[HUDI-2086] Refactor hive mor_incremental_view #3203

Merged (1 commit) · Nov 10, 2021
BaseFileWithLogsSplit.java (new file)
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.hadoop;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
* Encode additional information in the split to track the matching log files and base file.
* Hence, this class tracks a log/base file split.
*/
public class BaseFileWithLogsSplit extends FileSplit {
// a flag to mark whether this split was produced by an incremental query.
private boolean belongToIncrementalSplit = false;
Reviewer (Member): Minor: Rename to isIncrementalSplit. Sorry for nitpicking on the name, but I feel if the comment is clear enough then the name can be a bit less verbose.

Author: agree, thanks
// the log file paths of this split.
private List<String> deltaLogPaths = new ArrayList<>();
Reviewer (Contributor): the log files of this split. => the log file paths of this split.

// max commit time of current split.
private String maxCommitTime = "";
// the basePath of current hoodie table.
private String basePath = "";
// the base file belonging to this split.
private String baseFilePath = "";
Reviewer (Contributor): We don't need a bootstrap-related variable in this class, do we?

Author: Right, we will deal with the bootstrap split in HoodieParquetRealtimeInputFormat.makeSplit, so there is no need to add a bootstrap variable here.


public BaseFileWithLogsSplit(Path file, long start, long length, String[] hosts) {
super(file, start, length, hosts);
}

@Override
public void write(DataOutput out) throws IOException {
super.write(out);
out.writeBoolean(belongToIncrementalSplit);
Text.writeString(out, maxCommitTime);
Text.writeString(out, basePath);
Text.writeString(out, baseFilePath);
out.writeInt(deltaLogPaths.size());
for (String logPath : deltaLogPaths) {
Text.writeString(out, logPath);
}
}

@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
belongToIncrementalSplit = in.readBoolean();
maxCommitTime = Text.readString(in);
basePath = Text.readString(in);
baseFilePath = Text.readString(in);
int deltaLogSize = in.readInt();
List<String> tempDeltaLogs = new ArrayList<>();
for (int i = 0; i < deltaLogSize; i++) {
tempDeltaLogs.add(Text.readString(in));
}
deltaLogPaths = tempDeltaLogs;
}

public boolean getBelongToIncrementalSplit() {
return belongToIncrementalSplit;
}

public void setBelongToIncrementalSplit(boolean belongToIncrementalSplit) {
this.belongToIncrementalSplit = belongToIncrementalSplit;
}

public List<String> getDeltaLogPaths() {
return deltaLogPaths;
}

public void setDeltaLogPaths(List<String> deltaLogPaths) {
this.deltaLogPaths = deltaLogPaths;
}

public String getMaxCommitTime() {
return maxCommitTime;
}

public void setMaxCommitTime(String maxCommitTime) {
this.maxCommitTime = maxCommitTime;
}

public String getBasePath() {
return basePath;
}

public void setBasePath(String basePath) {
this.basePath = basePath;
}

public String getBaseFilePath() {
return baseFilePath;
}

public void setBaseFilePath(String baseFilePath) {
this.baseFilePath = baseFilePath;
}
}
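Since MapReduce ships splits to tasks through the Writable contract, the extra fields above only survive if write and readFields stay symmetric. Below is a minimal round-trip sketch; all paths, timestamps, and the class name BaseFileWithLogsSplitRoundTrip are invented for illustration, and only the BaseFileWithLogsSplit API shown above is assumed.

```java
import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

public class BaseFileWithLogsSplitRoundTrip {
  public static void main(String[] args) throws IOException {
    // Build a split for a hypothetical base file with one attached log file.
    BaseFileWithLogsSplit split = new BaseFileWithLogsSplit(
        new Path("/tmp/hudi_table/2021/11/10/base-file.parquet"), 0L, 1024L, new String[0]);
    split.setBelongToIncrementalSplit(true);
    split.setDeltaLogPaths(Arrays.asList("/tmp/hudi_table/2021/11/10/.fileid_20211110.log.1"));
    split.setMaxCommitTime("20211110000000");
    split.setBasePath("/tmp/hudi_table");
    split.setBaseFilePath("/tmp/hudi_table/2021/11/10/base-file.parquet");

    // Serialize the split the same way the framework would before shipping it to a task.
    DataOutputBuffer out = new DataOutputBuffer();
    split.write(out);

    // Deserialize into a fresh instance; the placeholder constructor args are overwritten by readFields.
    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    BaseFileWithLogsSplit copy = new BaseFileWithLogsSplit(new Path("/placeholder"), 0L, 0L, new String[0]);
    copy.readFields(in);

    System.out.println(copy.getMaxCommitTime());  // 20211110000000
    System.out.println(copy.getDeltaLogPaths());  // [/tmp/hudi_table/2021/11/10/.fileid_20211110.log.1]
  }
}
```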
@@ -73,6 +73,14 @@ protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
return HoodieInputFormatUtils.filterInstantsTimeline(timeline);
}

protected FileStatus[] getStatus(JobConf job) throws IOException {
return super.listStatus(job);
}

Reviewer (Contributor): Would we remove this method and call listStatus directly?

Author: No, we need to call ParquetInputFormat.listStatus in HoodieParquetRealtimeInputFormat.java, but we cannot access that method directly, so we override getStatus in ParquetRealtimeInputFormat.java and call listStatus in it.

protected boolean includeLogFilesForSnapShotView() {
return false;
}

@Override
public FileStatus[] listStatus(JobConf job) throws IOException {
// Segregate inputPaths[] to incremental, snapshot and non hoodie paths
@@ -108,7 +116,7 @@ public FileStatus[] listStatus(JobConf job) throws IOException {
// process snapshot queries next.
List<Path> snapshotPaths = inputPathHandler.getSnapshotPaths();
if (snapshotPaths.size() > 0) {
- returns.addAll(HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, tableMetaClientMap, snapshotPaths));
+ returns.addAll(HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, tableMetaClientMap, snapshotPaths, includeLogFilesForSnapShotView()));
}
return returns.toArray(new FileStatus[0]);
}
@@ -120,7 +128,7 @@ public FileStatus[] listStatus(JobConf job) throws IOException {
* partitions and then filtering based on the commits of interest, this logic first extracts the
* partitions touched by the desired commits and then lists only those partitions.
*/
- private List<FileStatus> listStatusForIncrementalMode(
+ protected List<FileStatus> listStatusForIncrementalMode(
JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
String tableName = tableMetaClient.getTableConfig().getTableName();
Job jobContext = Job.getInstance(job);
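The two protected hooks added above let a realtime subclass reach ParquetInputFormat.listStatus (via getStatus) and switch log files on for snapshot reads (via includeLogFilesForSnapShotView), as discussed in the review thread. The sketch below shows how a hypothetical subclass might consume them; the class name ExampleRealtimeInputFormat is made up, and the parent class name HoodieParquetInputFormat is an assumption based on the methods visible in this diff.

```java
import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.mapred.JobConf;

// Hypothetical subclass, not part of this PR, illustrating how the new hooks are consumed.
public class ExampleRealtimeInputFormat extends HoodieParquetInputFormat {

  @Override
  protected boolean includeLogFilesForSnapShotView() {
    // Snapshot (realtime) reads of a MOR table must also surface log files,
    // so filterFileStatusForSnapshotMode is asked to keep them.
    return true;
  }

  @Override
  public FileStatus[] listStatus(JobConf job) throws IOException {
    // getStatus() is the way for a subclass to reach ParquetInputFormat.listStatus,
    // since the overridden listStatus in the parent cannot be bypassed directly.
    FileStatus[] parquetStatuses = getStatus(job);
    // A real implementation would wrap these into RealtimeFileStatus objects here;
    // they are returned unchanged purely for illustration.
    return parquetStatuses;
  }
}
```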
PathWithLogFilePath.java (new file)
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.hadoop;

import org.apache.hadoop.fs.Path;

import java.util.ArrayList;
import java.util.List;

/**
* Encode additional information in Path to track the matching log files and base file.
* Hence, this class tracks a log/base file path.
*/
public class PathWithLogFilePath extends Path {
// a flag to mark whether this path was produced by an incremental query.
private boolean belongToIncrementalPath = false;
// the log file paths belonging to this path.
private List<String> deltaLogPaths = new ArrayList<>();
// max commit time of current path.
private String maxCommitTime = "";
// the basePath of current hoodie table.
private String basePath = "";
// the base file belonging to this path.
private String baseFilePath = "";
Reviewer (Contributor): the base file path belong to this path.

// the bootstrap file belonging to this path.
// this field is used only when the table being queried is a bootstrap table.
private PathWithBootstrapFileStatus pathWithBootstrapFileStatus;

public PathWithLogFilePath(Path parent, String child) {
super(parent, child);
}

public void setBelongToIncrementalPath(boolean belongToIncrementalPath) {
this.belongToIncrementalPath = belongToIncrementalPath;
}

public List<String> getDeltaLogPaths() {
return deltaLogPaths;
}

public void setDeltaLogPaths(List<String> deltaLogPaths) {
this.deltaLogPaths = deltaLogPaths;
}

public String getMaxCommitTime() {
return maxCommitTime;
}

public void setMaxCommitTime(String maxCommitTime) {
this.maxCommitTime = maxCommitTime;
}

public String getBasePath() {
return basePath;
}

public void setBasePath(String basePath) {
this.basePath = basePath;
}

public void setBaseFilePath(String baseFilePath) {
this.baseFilePath = baseFilePath;
}

public boolean splitable() {
return !baseFilePath.isEmpty();
}

public PathWithBootstrapFileStatus getPathWithBootstrapFileStatus() {
return pathWithBootstrapFileStatus;
}

public void setPathWithBootstrapFileStatus(PathWithBootstrapFileStatus pathWithBootstrapFileStatus) {
this.pathWithBootstrapFileStatus = pathWithBootstrapFileStatus;
}

public boolean includeBootstrapFilePath() {
return pathWithBootstrapFileStatus != null;
}

public BaseFileWithLogsSplit buildSplit(Path file, long start, long length, String[] hosts) {
BaseFileWithLogsSplit bs = new BaseFileWithLogsSplit(file, start, length, hosts);
bs.setBelongToIncrementalSplit(belongToIncrementalPath);
bs.setDeltaLogPaths(deltaLogPaths);
bs.setMaxCommitTime(maxCommitTime);
bs.setBasePath(basePath);
bs.setBaseFilePath(baseFilePath);
return bs;
}
}
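Once Hadoop hands a PathWithLogFilePath back to the input format, buildSplit turns it into the matching BaseFileWithLogsSplit. The helper below is a hypothetical sketch of that hand-off (the class and method names are made up); per the discussion above, a path carrying a bootstrap file status would instead be handled by the realtime input format's makeSplit.

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;

// Hypothetical helper, not part of this PR, showing how a Path is mapped back to a split.
public final class PathToSplitExample {

  private PathToSplitExample() {
  }

  static FileSplit toSplit(Path file, long start, long length, String[] hosts) {
    if (file instanceof PathWithLogFilePath) {
      PathWithLogFilePath pathWithLogFilePath = (PathWithLogFilePath) file;
      // Bootstrap paths carry a PathWithBootstrapFileStatus and would be split elsewhere.
      if (!pathWithLogFilePath.includeBootstrapFilePath()) {
        return pathWithLogFilePath.buildSplit(file, start, length, hosts);
      }
    }
    // Regular paths fall back to a plain FileSplit.
    return new FileSplit(file, start, length, hosts);
  }
}
```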
RealtimeFileStatus.java (new file)
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.hadoop;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
* With the base input format implementations in Hadoop/Hive,
* we need to encode additional information in Path to track base files and log files for realtime read.
* Hence, this class tracks a log/base file status
* in Path.
*/
public class RealtimeFileStatus extends FileStatus {
Reviewer (Contributor): java docs.

Author: ok

Reviewer (Contributor): an -> a

Author: fixed

// a flag to mark whether this file status was produced by an incremental query.
private boolean belongToIncrementalFileStatus = false;
// the log file paths belonging to this fileStatus.
private List<String> deltaLogPaths = new ArrayList<>();
// max commit time of current fileStatus.
private String maxCommitTime = "";
// the basePath of current hoodie table.
private String basePath = "";
// the base file belonging to this status.
private String baseFilePath = "";
// the bootstrap file belonging to this status.
// this field is used only when the table being queried is a bootstrap table.
private FileStatus bootStrapFileStatus;

public RealtimeFileStatus(FileStatus fileStatus) throws IOException {
super(fileStatus);
}

@Override
public Path getPath() {
Path path = super.getPath();
PathWithLogFilePath pathWithLogFilePath = new PathWithLogFilePath(path.getParent(), path.getName());
pathWithLogFilePath.setBelongToIncrementalPath(belongToIncrementalFileStatus);
pathWithLogFilePath.setDeltaLogPaths(deltaLogPaths);
pathWithLogFilePath.setMaxCommitTime(maxCommitTime);
pathWithLogFilePath.setBasePath(basePath);
pathWithLogFilePath.setBaseFilePath(baseFilePath);
if (bootStrapFileStatus != null) {
pathWithLogFilePath.setPathWithBootstrapFileStatus((PathWithBootstrapFileStatus)bootStrapFileStatus.getPath());
}
return pathWithLogFilePath;
}

public void setBelongToIncrementalFileStatus(boolean belongToIncrementalFileStatus) {
this.belongToIncrementalFileStatus = belongToIncrementalFileStatus;
}

public List<String> getDeltaLogPaths() {
return deltaLogPaths;
}

public void setDeltaLogPaths(List<String> deltaLogPaths) {
this.deltaLogPaths = deltaLogPaths;
}

public String getMaxCommitTime() {
return maxCommitTime;
}

public void setMaxCommitTime(String maxCommitTime) {
this.maxCommitTime = maxCommitTime;
}

public String getBasePath() {
return basePath;
}

public void setBasePath(String basePath) {
this.basePath = basePath;
}

public void setBaseFilePath(String baseFilePath) {
this.baseFilePath = baseFilePath;
}

public void setBootStrapFileStatus(FileStatus bootStrapFileStatus) {
this.bootStrapFileStatus = bootStrapFileStatus;
}
}
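Putting the three classes together: the listing phase wraps a plain FileStatus into a RealtimeFileStatus, getPath() then yields a PathWithLogFilePath carrying the same information, and buildSplit finally produces a BaseFileWithLogsSplit for the realtime reader. Below is a minimal sketch of that chain; all paths, sizes, and timestamps are invented for illustration.

```java
import java.io.IOException;
import java.util.Collections;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;

public class RealtimeFileStatusChainExample {
  public static void main(String[] args) throws IOException {
    // A plain status for a hypothetical base file (length, blocksize, mtime are arbitrary).
    FileStatus baseStatus = new FileStatus(1024L, false, 1, 128L * 1024 * 1024,
        System.currentTimeMillis(), new Path("/tmp/hudi_table/2021/11/10/base-file.parquet"));

    // Wrap it so the log files and commit time travel with the status.
    RealtimeFileStatus realtimeStatus = new RealtimeFileStatus(baseStatus);
    realtimeStatus.setBelongToIncrementalFileStatus(true);
    realtimeStatus.setDeltaLogPaths(
        Collections.singletonList("/tmp/hudi_table/2021/11/10/.fileid_20211110.log.1"));
    realtimeStatus.setMaxCommitTime("20211110000000");
    realtimeStatus.setBasePath("/tmp/hudi_table");
    realtimeStatus.setBaseFilePath("/tmp/hudi_table/2021/11/10/base-file.parquet");

    // getPath() re-packages everything into a PathWithLogFilePath ...
    PathWithLogFilePath pathWithLogs = (PathWithLogFilePath) realtimeStatus.getPath();

    // ... from which the matching realtime split can be built.
    BaseFileWithLogsSplit split = pathWithLogs.buildSplit(pathWithLogs, 0L, 1024L, new String[0]);
    System.out.println(split.getDeltaLogPaths());  // [/tmp/hudi_table/2021/11/10/.fileid_20211110.log.1]
    System.out.println(split.getMaxCommitTime());  // 20211110000000
  }
}
```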