Skip to content

Commit

Permalink
[HUDI-2086]redo the logical of mor_incremental_view for hive
Browse files Browse the repository at this point in the history
  • Loading branch information
xiarixiaoyao committed Nov 2, 2021
1 parent f897e6d commit 2182f54
Show file tree
Hide file tree
Showing 9 changed files with 1,005 additions and 32 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.hadoop;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
* Encode additional information in split to track matching log file and base files.
* Hence, this class tracks a log/base file split.
*/
public class BaseFileWithLogsSplit extends FileSplit {
// a flag to mark this split is produced by incremental query or not.
private boolean belongToIncrementalSplit = false;
// the log file paths of this split.
private List<String> deltaLogPaths = new ArrayList<>();
// max commit time of current split.
private String maxCommitTime = "";
// the basePath of current hoodie table.
private String basePath = "";
// the base file belong to this split.
private String baseFilePath = "";

public BaseFileWithLogsSplit(Path file, long start, long length, String[] hosts) {
super(file, start, length, hosts);
}

@Override
public void write(DataOutput out) throws IOException {
super.write(out);
out.writeBoolean(belongToIncrementalSplit);
Text.writeString(out, maxCommitTime);
Text.writeString(out, basePath);
Text.writeString(out, baseFilePath);
out.writeInt(deltaLogPaths.size());
for (String logPath : deltaLogPaths) {
Text.writeString(out, logPath);
}
}

@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
belongToIncrementalSplit = in.readBoolean();
maxCommitTime = Text.readString(in);
basePath = Text.readString(in);
baseFilePath = Text.readString(in);
int deltaLogSize = in.readInt();
List<String> tempDeltaLogs = new ArrayList<>();
for (int i = 0; i < deltaLogSize; i++) {
tempDeltaLogs.add(Text.readString(in));
}
deltaLogPaths = tempDeltaLogs;
}

public boolean getBelongToIncrementalSplit() {
return belongToIncrementalSplit;
}

public void setBelongToIncrementalSplit(boolean belongToIncrementalSplit) {
this.belongToIncrementalSplit = belongToIncrementalSplit;
}

public List<String> getDeltaLogPaths() {
return deltaLogPaths;
}

public void setDeltaLogPaths(List<String> deltaLogPaths) {
this.deltaLogPaths = deltaLogPaths;
}

public String getMaxCommitTime() {
return maxCommitTime;
}

public void setMaxCommitTime(String maxCommitTime) {
this.maxCommitTime = maxCommitTime;
}

public String getBasePath() {
return basePath;
}

public void setBasePath(String basePath) {
this.basePath = basePath;
}

public String getBaseFilePath() {
return baseFilePath;
}

public void setBaseFilePath(String baseFilePath) {
this.baseFilePath = baseFilePath;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline tim
return HoodieInputFormatUtils.filterInstantsTimeline(timeline);
}

protected FileStatus[] getStatus(JobConf job) throws IOException {
return super.listStatus(job);
}

protected boolean includeLogFilesForSnapShotView() {
return false;
}

@Override
public FileStatus[] listStatus(JobConf job) throws IOException {
// Segregate inputPaths[] to incremental, snapshot and non hoodie paths
Expand Down Expand Up @@ -108,7 +116,7 @@ public FileStatus[] listStatus(JobConf job) throws IOException {
// process snapshot queries next.
List<Path> snapshotPaths = inputPathHandler.getSnapshotPaths();
if (snapshotPaths.size() > 0) {
returns.addAll(HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, tableMetaClientMap, snapshotPaths));
returns.addAll(HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, tableMetaClientMap, snapshotPaths, includeLogFilesForSnapShotView()));
}
return returns.toArray(new FileStatus[0]);
}
Expand All @@ -120,7 +128,7 @@ public FileStatus[] listStatus(JobConf job) throws IOException {
* partitions and then filtering based on the commits of interest, this logic first extracts the
* partitions touched by the desired commits and then lists only those partitions.
*/
private List<FileStatus> listStatusForIncrementalMode(
protected List<FileStatus> listStatusForIncrementalMode(
JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
String tableName = tableMetaClient.getTableConfig().getTableName();
Job jobContext = Job.getInstance(job);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.hadoop;

import org.apache.hadoop.fs.Path;

import java.util.ArrayList;
import java.util.List;

/**
* Encode additional information in Path to track matching log file and base files.
* Hence, this class tracks a log/base file status.
*/
public class PathWithLogFilePath extends Path {
// a flag to mark this split is produced by incremental query or not.
private boolean belongToIncrementalPath = false;
// the log files belong this path.
private List<String> deltaLogPaths = new ArrayList<>();
// max commit time of current path.
private String maxCommitTime = "";
// the basePath of current hoodie table.
private String basePath = "";
// the base file belong to this path;
private String baseFilePath = "";
// the bootstrap file belong to this path.
// only if current query table is bootstrap table, this field is used.
private PathWithBootstrapFileStatus pathWithBootstrapFileStatus;

public PathWithLogFilePath(Path parent, String child) {
super(parent, child);
}

public void setBelongToIncrementalPath(boolean belongToIncrementalPath) {
this.belongToIncrementalPath = belongToIncrementalPath;
}

public List<String> getDeltaLogPaths() {
return deltaLogPaths;
}

public void setDeltaLogPaths(List<String> deltaLogPaths) {
this.deltaLogPaths = deltaLogPaths;
}

public String getMaxCommitTime() {
return maxCommitTime;
}

public void setMaxCommitTime(String maxCommitTime) {
this.maxCommitTime = maxCommitTime;
}

public String getBasePath() {
return basePath;
}

public void setBasePath(String basePath) {
this.basePath = basePath;
}

public void setBaseFilePath(String baseFilePath) {
this.baseFilePath = baseFilePath;
}

public boolean splitable() {
return !baseFilePath.isEmpty();
}

public PathWithBootstrapFileStatus getPathWithBootstrapFileStatus() {
return pathWithBootstrapFileStatus;
}

public void setPathWithBootstrapFileStatus(PathWithBootstrapFileStatus pathWithBootstrapFileStatus) {
this.pathWithBootstrapFileStatus = pathWithBootstrapFileStatus;
}

public boolean includeBootstrapFilePath() {
return pathWithBootstrapFileStatus != null;
}

public BaseFileWithLogsSplit buildSplit(Path file, long start, long length, String[] hosts) {
BaseFileWithLogsSplit bs = new BaseFileWithLogsSplit(file, start, length, hosts);
bs.setBelongToIncrementalSplit(belongToIncrementalPath);
bs.setDeltaLogPaths(deltaLogPaths);
bs.setMaxCommitTime(maxCommitTime);
bs.setBasePath(basePath);
bs.setBaseFilePath(baseFilePath);
return bs;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.hadoop;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
* With the base input format implementations in Hadoop/Hive,
* we need to encode additional information in Path to track base files and logs files for realtime read.
* Hence, this class tracks a log/base file status
* in Path.
*/
public class RealtimeFileStatus extends FileStatus {
// a flag to mark this split is produced by incremental query or not.
private boolean belongToIncrementalFileStatus = false;
// the log files belong this fileStatus.
private List<String> deltaLogPaths = new ArrayList<>();
// max commit time of current fileStatus.
private String maxCommitTime = "";
// the basePath of current hoodie table.
private String basePath = "";
// the base file belong to this status;
private String baseFilePath = "";
// the bootstrap file belong to this status.
// only if current query table is bootstrap table, this field is used.
private FileStatus bootStrapFileStatus;

public RealtimeFileStatus(FileStatus fileStatus) throws IOException {
super(fileStatus);
}

@Override
public Path getPath() {
Path path = super.getPath();
PathWithLogFilePath pathWithLogFilePath = new PathWithLogFilePath(path.getParent(), path.getName());
pathWithLogFilePath.setBelongToIncrementalPath(belongToIncrementalFileStatus);
pathWithLogFilePath.setDeltaLogPaths(deltaLogPaths);
pathWithLogFilePath.setMaxCommitTime(maxCommitTime);
pathWithLogFilePath.setBasePath(basePath);
pathWithLogFilePath.setBaseFilePath(baseFilePath);
if (bootStrapFileStatus != null) {
pathWithLogFilePath.setPathWithBootstrapFileStatus((PathWithBootstrapFileStatus)bootStrapFileStatus.getPath());
}
return pathWithLogFilePath;
}

public void setBelongToIncrementalFileStatus(boolean belongToIncrementalFileStatus) {
this.belongToIncrementalFileStatus = belongToIncrementalFileStatus;
}

public List<String> getDeltaLogPaths() {
return deltaLogPaths;
}

public void setDeltaLogPaths(List<String> deltaLogPaths) {
this.deltaLogPaths = deltaLogPaths;
}

public String getMaxCommitTime() {
return maxCommitTime;
}

public void setMaxCommitTime(String maxCommitTime) {
this.maxCommitTime = maxCommitTime;
}

public String getBasePath() {
return basePath;
}

public void setBasePath(String basePath) {
this.basePath = basePath;
}

public void setBaseFilePath(String baseFilePath) {
this.baseFilePath = baseFilePath;
}

public void setBootStrapFileStatus(FileStatus bootStrapFileStatus) {
this.bootStrapFileStatus = bootStrapFileStatus;
}
}
Loading

0 comments on commit 2182f54

Please sign in to comment.