HIVE-28258: Use Iceberg semantics for Merge task
SourabhBadhya committed Jun 18, 2024
1 parent 33cadc5 commit de494e1
Showing 20 changed files with 985 additions and 491 deletions.
FilesForCommit.java
@@ -24,6 +24,7 @@
 import java.util.Collections;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
@@ -35,27 +36,31 @@ public class FilesForCommit implements Serializable {
   private final Collection<DeleteFile> deleteFiles;
   private final Collection<DataFile> replacedDataFiles;
   private final Collection<CharSequence> referencedDataFiles;
+  private final Collection<Path> mergedAndDeletedFiles;
 
   public FilesForCommit(Collection<DataFile> dataFiles, Collection<DeleteFile> deleteFiles) {
     this(dataFiles, deleteFiles, Collections.emptyList());
   }
 
   public FilesForCommit(Collection<DataFile> dataFiles, Collection<DeleteFile> deleteFiles,
-      Collection<DataFile> replacedDataFiles, Collection<CharSequence> referencedDataFiles) {
+      Collection<DataFile> replacedDataFiles, Collection<CharSequence> referencedDataFiles,
+      Collection<Path> mergedAndDeletedFiles) {
     this.dataFiles = dataFiles;
     this.deleteFiles = deleteFiles;
     this.replacedDataFiles = replacedDataFiles;
     this.referencedDataFiles = referencedDataFiles;
+    this.mergedAndDeletedFiles = mergedAndDeletedFiles;
   }
 
   public FilesForCommit(Collection<DataFile> dataFiles, Collection<DeleteFile> deleteFiles,
       Collection<DataFile> replacedDataFiles) {
-    this(dataFiles, deleteFiles, replacedDataFiles, Collections.emptySet());
+    this(dataFiles, deleteFiles, replacedDataFiles, Collections.emptySet(), Collections.emptySet());
   }
 
   public static FilesForCommit onlyDelete(Collection<DeleteFile> deleteFiles,
       Collection<CharSequence> referencedDataFiles) {
-    return new FilesForCommit(Collections.emptyList(), deleteFiles, Collections.emptyList(), referencedDataFiles);
+    return new FilesForCommit(Collections.emptyList(), deleteFiles, Collections.emptyList(),
+        referencedDataFiles, Collections.emptySet());
   }
 
   public static FilesForCommit onlyData(Collection<DataFile> dataFiles) {
@@ -86,6 +91,10 @@ public Collection<CharSequence> referencedDataFiles() {
     return referencedDataFiles;
   }
 
+  public Collection<Path> mergedAndDeletedFiles() {
+    return mergedAndDeletedFiles;
+  }
+
   public Collection<? extends ContentFile> allFiles() {
     return Stream.concat(dataFiles.stream(), deleteFiles.stream()).collect(Collectors.toList());
   }
@@ -101,6 +110,7 @@ public String toString() {
         .add("deleteFiles", deleteFiles.toString())
         .add("replacedDataFiles", replacedDataFiles.toString())
         .add("referencedDataFiles", referencedDataFiles.toString())
+        .add("mergedAndDeletedFiles", mergedAndDeletedFiles.toString())
         .toString();
   }
 
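For orientation, here is a minimal usage sketch of the widened API, assuming the handler's usual org.apache.iceberg.mr.hive package. The MergeCommitSketch class and the file paths are hypothetical; only the five-argument constructor and the mergedAndDeletedFiles() accessor come from this commit.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.mr.hive.FilesForCommit; // assumed package of the handler

public class MergeCommitSketch {
  public static void main(String[] args) {
    // Paths of the small files a merge task compacted away; values are illustrative.
    List<Path> mergedAway = Arrays.asList(
        new Path("/warehouse/tbl/data/00000-0-a.orc"),
        new Path("/warehouse/tbl/data/00001-0-b.orc"));

    // The new five-argument constructor carries the merged-and-deleted paths
    // alongside the existing file collections.
    FilesForCommit files = new FilesForCommit(
        Collections.emptyList(), // dataFiles
        Collections.emptyList(), // deleteFiles
        Collections.emptyList(), // replacedDataFiles
        Collections.emptySet(),  // referencedDataFiles
        mergedAway);             // mergedAndDeletedFiles (new in this commit)

    System.out.println(files.mergedAndDeletedFiles());
  }
}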
HiveIcebergInputFormat.java
@@ -21,6 +21,7 @@
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Properties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.TableName;
Expand All @@ -39,6 +40,7 @@
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
Expand All @@ -56,6 +58,7 @@
import org.apache.iceberg.mr.mapred.Container;
import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat;
import org.apache.iceberg.mr.mapreduce.IcebergInputFormat;
import org.apache.iceberg.mr.mapreduce.IcebergMergeSplit;
import org.apache.iceberg.mr.mapreduce.IcebergSplit;
import org.apache.iceberg.mr.mapreduce.IcebergSplitContainer;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
Expand All @@ -64,7 +67,7 @@
import org.slf4j.LoggerFactory;

public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
implements CombineHiveInputFormat.AvoidSplitCombination, VectorizedInputFormatInterface,
implements CombineHiveInputFormat.MergeSplits, VectorizedInputFormatInterface,
LlapCacheOnlyInputFormatInterface.VectorizedOnly {

private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergInputFormat.class);
@@ -225,4 +228,11 @@ public static String getVectorizationConfName(String tableName) {
     String dbAndTableName = TableName.fromString(tableName, null, null).getNotEmptyDbTable();
     return ICEBERG_DISABLE_VECTORIZATION_PREFIX + dbAndTableName;
   }
+
+  @Override
+  public FileSplit createMergeSplit(Configuration conf,
+      CombineHiveInputFormat.CombineHiveInputSplit split,
+      Integer partition, Properties properties) throws IOException {
+    return new IcebergMergeSplit(conf, split, partition, properties);
+  }
 }
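The caller's side of the new hook can be sketched as follows; this is an illustration, not code from the commit. The planMergeSplit helper, the partition index 0, and the assumption that a combined split and table properties are already in hand are all hypothetical, while createMergeSplit and CombineHiveInputSplit are as shown in the diff above.

import java.io.IOException;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.iceberg.mr.hive.HiveIcebergInputFormat; // assumed package of the handler

public class MergeSplitSketch {
  // Given a combined split covering several small files of one Iceberg table,
  // ask the input format for an Iceberg-aware merge split rather than merging
  // the underlying files with plain file-level semantics.
  static FileSplit planMergeSplit(Configuration conf,
      CombineHiveInputFormat.CombineHiveInputSplit split,
      Properties tableProperties) throws IOException {
    HiveIcebergInputFormat inputFormat = new HiveIcebergInputFormat();
    // Partition index 0 is illustrative; the real caller supplies the split's
    // position within the combined plan.
    return inputFormat.createMergeSplit(conf, split, 0, tableProperties);
  }
}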
