HIVE-28258: Use Iceberg semantics for Merge task
SourabhBadhya committed May 21, 2024 · 1 parent 77debdb · commit f7ec736
Showing 19 changed files with 523 additions and 123 deletions.
@@ -24,6 +24,7 @@
import java.util.Collections;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
@@ -35,27 +36,31 @@ public class FilesForCommit implements Serializable {
private final Collection<DeleteFile> deleteFiles;
private final Collection<DataFile> replacedDataFiles;
private final Collection<CharSequence> referencedDataFiles;
private final Collection<Path> mergedAndDeletedFiles;

public FilesForCommit(Collection<DataFile> dataFiles, Collection<DeleteFile> deleteFiles) {
this(dataFiles, deleteFiles, Collections.emptyList());
}

public FilesForCommit(Collection<DataFile> dataFiles, Collection<DeleteFile> deleteFiles,
Collection<DataFile> replacedDataFiles, Collection<CharSequence> referencedDataFiles) {
Collection<DataFile> replacedDataFiles, Collection<CharSequence> referencedDataFiles,
Collection<Path> mergedAndDeletedFiles) {
this.dataFiles = dataFiles;
this.deleteFiles = deleteFiles;
this.replacedDataFiles = replacedDataFiles;
this.referencedDataFiles = referencedDataFiles;
this.mergedAndDeletedFiles = mergedAndDeletedFiles;
}

public FilesForCommit(Collection<DataFile> dataFiles, Collection<DeleteFile> deleteFiles,
Collection<DataFile> replacedDataFiles) {
this(dataFiles, deleteFiles, replacedDataFiles, Collections.emptySet());
this(dataFiles, deleteFiles, replacedDataFiles, Collections.emptySet(), Collections.emptySet());
}

public static FilesForCommit onlyDelete(Collection<DeleteFile> deleteFiles,
Collection<CharSequence> referencedDataFiles) {
return new FilesForCommit(Collections.emptyList(), deleteFiles, Collections.emptyList(), referencedDataFiles);
return new FilesForCommit(Collections.emptyList(), deleteFiles, Collections.emptyList(),
referencedDataFiles, Collections.emptySet());
}

public static FilesForCommit onlyData(Collection<DataFile> dataFiles) {
@@ -86,6 +91,10 @@ public Collection<CharSequence> referencedDataFiles() {
return referencedDataFiles;
}

public Collection<Path> mergedAndDeletedFiles() {
return mergedAndDeletedFiles;
}

public Collection<? extends ContentFile> allFiles() {
return Stream.concat(dataFiles.stream(), deleteFiles.stream()).collect(Collectors.toList());
}
@@ -101,6 +110,7 @@ public String toString() {
.add("deleteFiles", deleteFiles.toString())
.add("replacedDataFiles", replacedDataFiles.toString())
.add("referencedDataFiles", referencedDataFiles.toString())
.add("mergedAndDeletedFiles", mergedAndDeletedFiles.toString())
.toString();
}

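The hunk above threads a new mergedAndDeletedFiles collection through FilesForCommit. As a hedged illustration (not part of the patch), a merge task could hand the consumed input paths to the committer roughly like this, assuming the sketch lives in the same package as FilesForCommit; the helper class and variable names are hypothetical:

import java.util.Collections;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;

final class MergeCommitPayloadExample {
  private MergeCommitPayloadExample() {}

  // Build the per-task commit payload for a merge: the freshly written files plus the paths
  // of the inputs the merge consumed, so the job committer can exclude them from the snapshot.
  static FilesForCommit payloadForMergeTask(List<DataFile> mergedDataFiles,
      List<DeleteFile> mergedDeleteFiles, List<Path> consumedInputPaths) {
    return new FilesForCommit(mergedDataFiles, mergedDeleteFiles,
        Collections.emptyList(),  // no replaced data files for a plain merge (assumption)
        Collections.emptySet(),   // no referenced data files either (assumption)
        consumedInputPaths);      // the new mergedAndDeletedFiles argument
  }
}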
@@ -33,12 +33,14 @@
import org.apache.hadoop.hive.ql.exec.vector.VectorizedSupport;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
import org.apache.hadoop.hive.ql.io.LlapCacheOnlyInputFormatInterface;
import org.apache.hadoop.hive.ql.io.MergeSplitProperties;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
@@ -56,6 +58,7 @@
import org.apache.iceberg.mr.mapred.Container;
import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat;
import org.apache.iceberg.mr.mapreduce.IcebergInputFormat;
import org.apache.iceberg.mr.mapreduce.IcebergMergeSplit;
import org.apache.iceberg.mr.mapreduce.IcebergSplit;
import org.apache.iceberg.mr.mapreduce.IcebergSplitContainer;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -65,7 +68,7 @@

public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
implements CombineHiveInputFormat.AvoidSplitCombination, VectorizedInputFormatInterface,
LlapCacheOnlyInputFormatInterface.VectorizedOnly {
LlapCacheOnlyInputFormatInterface.VectorizedOnly, CombineHiveInputFormat.MergeSplit {

private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergInputFormat.class);
private static final String HIVE_VECTORIZED_RECORDREADER_CLASS =
@@ -225,4 +228,11 @@ public static String getVectorizationConfName(String tableName) {
String dbAndTableName = TableName.fromString(tableName, null, null).getNotEmptyDbTable();
return ICEBERG_DISABLE_VECTORIZATION_PREFIX + dbAndTableName;
}

@Override
public FileSplit createMergeSplit(Configuration conf,
CombineHiveInputFormat.CombineHiveInputSplit split,
Integer partition, MergeSplitProperties properties) throws IOException {
return new IcebergMergeSplit(conf, split, partition, properties);
}
}
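With HiveIcebergInputFormat now implementing CombineHiveInputFormat.MergeSplit, a caller on the merge path can ask the input format for a format-specific split. A minimal sketch of that dispatch, assuming the arguments are already in scope; the helper class and the fallback behaviour are assumptions, only the createMergeSplit signature comes from the patch:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
import org.apache.hadoop.hive.ql.io.MergeSplitProperties;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;

final class MergeSplitDispatchSketch {
  private MergeSplitDispatchSketch() {}

  // If the table's input format supports the new MergeSplit hook, let it build its own split
  // (an IcebergMergeSplit for Iceberg tables); otherwise signal that the caller should keep
  // its existing merge path.
  static FileSplit maybeCreateMergeSplit(InputFormat<?, ?> inputFormat, Configuration conf,
      CombineHiveInputFormat.CombineHiveInputSplit split, Integer partition,
      MergeSplitProperties properties) throws IOException {
    if (inputFormat instanceof CombineHiveInputFormat.MergeSplit) {
      return ((CombineHiveInputFormat.MergeSplit) inputFormat)
          .createMergeSplit(conf, split, partition, properties);
    }
    return null;  // hypothetical fallback; real callers decide what to do here
  }
}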
@@ -58,6 +58,7 @@
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.DeleteFiles;
@@ -83,6 +84,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.iceberg.util.Tasks;
@@ -126,6 +128,13 @@ public void commitTask(TaskAttemptContext originalContext) throws IOException {

TaskAttemptID attemptID = context.getTaskAttemptID();
JobConf jobConf = context.getJobConf();
Collection<Path> mergedPaths = Collections.emptySet();
if (jobConf.getInputFormat().getClass().isAssignableFrom(CombineHiveInputFormat.class)) {
MapWork mrwork = Utilities.getMapWork(jobConf);
if (mrwork != null && mrwork.getInputPaths() != null) {
mergedPaths = mrwork.getInputPaths();
}
}
Set<String> outputs = HiveIcebergStorageHandler.outputTables(context.getJobConf());
Map<String, List<HiveIcebergWriter>> writers = Optional.ofNullable(WriterRegistry.writers(attemptID))
.orElseGet(() -> {
@@ -136,6 +145,7 @@ public void commitTask(TaskAttemptContext originalContext) throws IOException {
ExecutorService tableExecutor = tableExecutor(jobConf, outputs.size());
try {
// Generates commit files for the target tables in parallel
Collection<Path> finalMergedPaths = new ConcurrentLinkedQueue<>(mergedPaths);
Tasks.foreach(outputs)
.retry(3)
.stopOnFailure()
@@ -158,7 +168,8 @@ public void commitTask(TaskAttemptContext originalContext) throws IOException {
replacedDataFiles.addAll(files.replacedDataFiles());
referencedDataFiles.addAll(files.referencedDataFiles());
}
createFileForCommit(new FilesForCommit(dataFiles, deleteFiles, replacedDataFiles, referencedDataFiles),
createFileForCommit(new FilesForCommit(dataFiles, deleteFiles, replacedDataFiles, referencedDataFiles,
finalMergedPaths),
fileForCommitLocation, table.io());
} else {
LOG.info("CommitTask found no writer for specific table: {}, attemptID: {}", output, attemptID);
@@ -170,8 +181,6 @@ public void commitTask(TaskAttemptContext originalContext) throws IOException {
LOG.info("CommitTask found no serialized table in config for table: {}.", output);
}
}, IOException.class);

cleanMergeTaskInputFiles(jobConf, tableExecutor, context);
} finally {
if (tableExecutor != null) {
tableExecutor.shutdown();
@@ -281,6 +290,11 @@ public void commitJobs(List<JobContext> originalContextList, Operation operation
.collect(Collectors.toList()));
commitTable(table.io(), fileExecutor, output, operation);
});

// Cleanup any merge input files.
for (JobContext jobContext : jobContextList) {
cleanMergeTaskInputFiles(jobContext.getJobConf(), tableExecutor);
}
} finally {
fileExecutor.shutdown();
if (tableExecutor != null) {
@@ -423,6 +437,7 @@ private void commitTable(FileIO io, ExecutorService executor, OutputTable output
List<DeleteFile> deleteFiles = Lists.newArrayList();
List<DataFile> replacedDataFiles = Lists.newArrayList();
Set<CharSequence> referencedDataFiles = Sets.newHashSet();
Set<Path> mergedAndDeletedFiles = Sets.newHashSet();

Table table = null;
String branchName = null;
@@ -459,9 +474,14 @@ private void commitTable(FileIO io, ExecutorService executor, OutputTable output
deleteFiles.addAll(writeResults.deleteFiles());
replacedDataFiles.addAll(writeResults.replacedDataFiles());
referencedDataFiles.addAll(writeResults.referencedDataFiles());
mergedAndDeletedFiles.addAll(writeResults.mergedAndDeletedFiles());
}

FilesForCommit filesForCommit = new FilesForCommit(dataFiles, deleteFiles, replacedDataFiles, referencedDataFiles);
dataFiles.removeIf(dataFile -> mergedAndDeletedFiles.contains(new Path(String.valueOf(dataFile.path()))));
deleteFiles.removeIf(deleteFile -> mergedAndDeletedFiles.contains(new Path(String.valueOf(deleteFile.path()))));

FilesForCommit filesForCommit = new FilesForCommit(dataFiles, deleteFiles, replacedDataFiles, referencedDataFiles,
Collections.emptySet());
long startTime = System.currentTimeMillis();

if (Operation.IOW != operation) {
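The commitTable hunk above subtracts merge inputs from the commit: any data or delete file whose path appears in mergedAndDeletedFiles has already been rewritten by a merge task and must not be added to the snapshot. A small standalone restatement of that rule; the class and method names are illustrative, not from the patch:

import java.util.Collection;
import java.util.Set;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.ContentFile;

final class MergedInputFilterSketch {
  private MergedInputFilterSketch() {}

  // Drop every content file whose physical path was consumed (and deleted) by a merge task,
  // mirroring the removeIf calls in commitTable above.
  static <F extends ContentFile<F>> void dropMergedInputs(Collection<F> files,
      Set<Path> mergedAndDeletedFiles) {
    files.removeIf(file -> mergedAndDeletedFiles.contains(new Path(String.valueOf(file.path()))));
  }
}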
@@ -687,6 +707,7 @@ private static FilesForCommit collectResults(int numTasks, ExecutorService execu
Collection<DeleteFile> deleteFiles = new ConcurrentLinkedQueue<>();
Collection<DataFile> replacedDataFiles = new ConcurrentLinkedQueue<>();
Collection<CharSequence> referencedDataFiles = new ConcurrentLinkedQueue<>();
Collection<Path> mergedAndDeletedFiles = new ConcurrentLinkedQueue<>();
Tasks.range(numTasks)
.throwFailureWhenFinished(throwOnFailure)
.executeWith(executor)
@@ -698,9 +719,10 @@ private static FilesForCommit collectResults(int numTasks, ExecutorService execu
deleteFiles.addAll(files.deleteFiles());
replacedDataFiles.addAll(files.replacedDataFiles());
referencedDataFiles.addAll(files.referencedDataFiles());
mergedAndDeletedFiles.addAll(files.mergedAndDeletedFiles());
});

return new FilesForCommit(dataFiles, deleteFiles, replacedDataFiles, referencedDataFiles);
return new FilesForCommit(dataFiles, deleteFiles, replacedDataFiles, referencedDataFiles, mergedAndDeletedFiles);
}

/**
@@ -751,7 +773,8 @@ public List<FileStatus> getOutputFiles(List<JobContext> jobContexts) throws IOEx
List<OutputTable> outputs = collectOutputs(jobContexts);
ExecutorService fileExecutor = fileExecutor(jobContexts.get(0).getJobConf());
ExecutorService tableExecutor = tableExecutor(jobContexts.get(0).getJobConf(), outputs.size());
Collection<FileStatus> dataFiles = new ConcurrentLinkedQueue<>();
Map<Path, List<FileStatus>> parentDirToDataFile = Maps.newConcurrentMap();
Map<Path, List<FileStatus>> parentDirToDeleteFile = Maps.newConcurrentMap();
try {
Tasks.foreach(outputs.stream().flatMap(kv -> kv.jobContexts.stream()
.map(jobContext -> new SimpleImmutableEntry<>(kv.table, jobContext))))
@@ -773,8 +796,22 @@ public List<FileStatus> getOutputFiles(List<JobContext> jobContexts) throws IOEx
FilesForCommit results = collectResults(numTasks, fileExecutor, table.location(), jobContext,
table.io(), false);
for (DataFile dataFile : results.dataFiles()) {
FileStatus fileStatus = fileSystem.getFileStatus(new Path(dataFile.path().toString()));
dataFiles.add(fileStatus);
Path filePath = new Path(dataFile.path().toString());
FileStatus fileStatus = fileSystem.getFileStatus(filePath);
parentDirToDataFile.merge(
filePath.getParent(), Lists.newArrayList(fileStatus), (oldList, newList) -> {
oldList.addAll(newList);
return oldList;
});
}
for (DeleteFile deleteFile : results.deleteFiles()) {
Path filePath = new Path(deleteFile.path().toString());
FileStatus fileStatus = fileSystem.getFileStatus(filePath);
parentDirToDeleteFile.merge(
filePath.getParent(), Lists.newArrayList(fileStatus), (oldList, newList) -> {
oldList.addAll(newList);
return oldList;
});
}
}, IOException.class);
} finally {
@@ -783,12 +820,59 @@ public List<FileStatus> getOutputFiles(List<JobContext> jobContexts) throws IOEx
tableExecutor.shutdown();
}
}
Collection<FileStatus> dataFiles = new ConcurrentLinkedQueue<>();
parentDirToDataFile.forEach((parentDir, fileStatusList) -> {
if (fileStatusList.size() > 1) {
dataFiles.addAll(fileStatusList);
}
});
parentDirToDeleteFile.forEach((parentDir, fileStatusList) -> {
if (fileStatusList.size() > 1) {
dataFiles.addAll(fileStatusList);
}
});
return Lists.newArrayList(dataFiles);
}
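getOutputFiles now buckets task outputs by parent directory and keeps only buckets with more than one file, since a directory holding a single output has nothing to merge. The same selection rule, restated as a standalone helper for clarity (names are illustrative):

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;

final class MergeCandidateSelectionSketch {
  private MergeCandidateSelectionSketch() {}

  // Keep only directories that received more than one output file; singletons are not merged.
  static List<FileStatus> selectMergeCandidates(Map<Path, List<FileStatus>> byParentDir) {
    return byParentDir.values().stream()
        .filter(group -> group.size() > 1)
        .flatMap(List::stream)
        .collect(Collectors.toList());
  }
}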

public List<ContentFile> getOutputContentFiles(List<JobContext> jobContexts) throws IOException {
List<OutputTable> outputs = collectOutputs(jobContexts);
ExecutorService fileExecutor = fileExecutor(jobContexts.get(0).getJobConf());
ExecutorService tableExecutor = tableExecutor(jobContexts.get(0).getJobConf(), outputs.size());
Collection<ContentFile> files = new ConcurrentLinkedQueue<>();
try {
Tasks.foreach(outputs.stream().flatMap(kv -> kv.jobContexts.stream()
.map(jobContext -> new SimpleImmutableEntry<>(kv.table, jobContext))))
.suppressFailureWhenFinished()
.executeWith(tableExecutor)
.onFailure((output, exc) -> LOG.warn("Failed to retrieve merge input file for the table {}", output, exc))
.run(output -> {
JobContext jobContext = output.getValue();
JobConf jobConf = jobContext.getJobConf();
LOG.info("Cleaning job for jobID: {}, table: {}", jobContext.getJobID(), output);

Table table = output.getKey();
FileSystem fileSystem = new Path(table.location()).getFileSystem(jobConf);
String jobLocation = generateJobLocation(table.location(), jobConf, jobContext.getJobID());
// list jobLocation to get number of forCommit files
// we do this because map/reduce num in jobConf is unreliable
// and we have no access to vertex status info
int numTasks = listForCommits(jobConf, jobLocation).size();
FilesForCommit results = collectResults(numTasks, fileExecutor, table.location(), jobContext,
table.io(), false);
files.addAll(results.deleteFiles());
files.addAll(results.dataFiles());
}, IOException.class);
} finally {
fileExecutor.shutdown();
if (tableExecutor != null) {
tableExecutor.shutdown();
}
}
return Lists.newArrayList(files);
}

private void cleanMergeTaskInputFiles(JobConf jobConf,
ExecutorService tableExecutor,
TaskAttemptContext context) throws IOException {
ExecutorService tableExecutor) throws IOException {
// Merge task has merged several files into one. Hence we need to remove the stale files.
// At this stage the file is written and task-committed, but the old files are still present.
if (jobConf.getInputFormat().getClass().isAssignableFrom(CombineHiveInputFormat.class)) {
@@ -800,8 +884,10 @@ private void cleanMergeTaskInputFiles(JobConf jobConf,
.retry(3)
.executeWith(tableExecutor)
.run(path -> {
FileSystem fs = path.getFileSystem(context.getJobConf());
fs.delete(path, true);
FileSystem fs = path.getFileSystem(jobConf);
if (fs.exists(path)) {
fs.delete(path, true);
}
}, IOException.class);
}
}
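Cleanup of the consumed inputs now runs once from commitJobs, after every table commit has succeeded, and the exists() guard keeps retried deletes idempotent. A self-contained sketch of that cleanup loop; the utility class name is an assumption, while the Tasks and FileSystem calls mirror the patch:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.util.Tasks;

final class MergeInputCleanupSketch {
  private MergeInputCleanupSketch() {}

  // Delete the original inputs of a merge task after the job-level commit; the exists() check
  // makes retries safe when an earlier attempt already removed a path.
  static void deleteMergedInputs(Iterable<Path> inputPaths, Configuration conf) throws IOException {
    Tasks.foreach(inputPaths)
        .retry(3)
        .run(path -> {
          FileSystem fs = path.getFileSystem(conf);
          if (fs.exists(path)) {
            fs.delete(path, true);
          }
        }, IOException.class);
  }
}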