HIVE-28102: Iceberg: Invoke validateDataFilesExist for RowDelta operations. (apache#5111). (Ayush Saxena, reviewed by Denys Kuzmenko)
ayushtkn authored and dengzhhu653 committed Mar 7, 2024
1 parent 723232c commit 841d4be
Showing 5 changed files with 55 additions and 24 deletions.
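Before the per-file hunks, a sketch of the race this patch closes (not from the patch; `table`, `snapshotId`, `deletes`, and `referencedPaths` are assumed handles, and the class name is made up). A task writes positional deletes that reference a data file by path; a concurrent compaction rewrites that file and commits first. `validateDataFilesExist` turns what would be a silently dangling delete into a failed, retryable commit:

```java
import java.util.Set;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Table;

class RowDeltaValidationSketch {
  // snapshotId is the snapshot the delete writer read from; referencedPaths
  // are the data-file paths its positional deletes point at.
  static void commitDeletes(Table table, long snapshotId,
      Iterable<DeleteFile> deletes, Set<CharSequence> referencedPaths) {
    RowDelta rowDelta = table.newRowDelta();
    deletes.forEach(rowDelta::addDeletes);
    rowDelta.validateFromSnapshot(snapshotId);
    // Fail the commit if any referenced file was rewritten or dropped by a
    // concurrent operation between snapshotId and the commit attempt.
    rowDelta.validateDataFilesExist(referencedPaths);
    rowDelta.commit();
  }
}
```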
@@ -33,29 +33,37 @@ public class FilesForCommit implements Serializable {

private final Collection<DataFile> dataFiles;
private final Collection<DeleteFile> deleteFiles;
private Collection<DataFile> referencedDataFiles;
private final Collection<DataFile> replacedDataFiles;
private final Collection<CharSequence> referencedDataFiles;

public FilesForCommit(Collection<DataFile> dataFiles, Collection<DeleteFile> deleteFiles) {
this(dataFiles, deleteFiles, Collections.emptyList());
}

public FilesForCommit(Collection<DataFile> dataFiles, Collection<DeleteFile> deleteFiles,
Collection<DataFile> referencedDataFiles) {
Collection<DataFile> replacedDataFiles, Collection<CharSequence> referencedDataFiles) {
this.dataFiles = dataFiles;
this.deleteFiles = deleteFiles;
this.replacedDataFiles = replacedDataFiles;
this.referencedDataFiles = referencedDataFiles;
}

public static FilesForCommit onlyDelete(Collection<DeleteFile> deleteFiles) {
return new FilesForCommit(Collections.emptyList(), deleteFiles);
public FilesForCommit(Collection<DataFile> dataFiles, Collection<DeleteFile> deleteFiles,
Collection<DataFile> replacedDataFiles) {
this(dataFiles, deleteFiles, replacedDataFiles, Collections.emptySet());
}

public static FilesForCommit onlyDelete(Collection<DeleteFile> deleteFiles,
Collection<CharSequence> referencedDataFiles) {
return new FilesForCommit(Collections.emptyList(), deleteFiles, Collections.emptyList(), referencedDataFiles);
}

public static FilesForCommit onlyData(Collection<DataFile> dataFiles) {
return new FilesForCommit(dataFiles, Collections.emptyList());
}

public static FilesForCommit onlyData(Collection<DataFile> dataFiles, Collection<DataFile> referencedDataFiles) {
return new FilesForCommit(dataFiles, Collections.emptyList(), referencedDataFiles);
public static FilesForCommit onlyData(Collection<DataFile> dataFiles, Collection<DataFile> replacedDataFiles) {
return new FilesForCommit(dataFiles, Collections.emptyList(), replacedDataFiles);
}

public static FilesForCommit empty() {
@@ -70,7 +78,11 @@ public Collection<DeleteFile> deleteFiles() {
return deleteFiles;
}

public Collection<DataFile> referencedDataFiles() {
public Collection<DataFile> replacedDataFiles() {
return replacedDataFiles;
}

public Collection<CharSequence> referencedDataFiles() {
return referencedDataFiles;
}

@@ -79,14 +91,15 @@ public Collection<? extends ContentFile> allFiles() {
}

public boolean isEmpty() {
return dataFiles.isEmpty() && deleteFiles.isEmpty() && referencedDataFiles.isEmpty();
return dataFiles.isEmpty() && deleteFiles.isEmpty() && replacedDataFiles.isEmpty();
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("dataFiles", dataFiles.toString())
.add("deleteFiles", deleteFiles.toString())
.add("replacedDataFiles", replacedDataFiles.toString())
.add("referencedDataFiles", referencedDataFiles.toString())
.toString();
}
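For orientation, a hedged usage sketch of the reshaped class (illustrative names; assumes the same package as `FilesForCommit`). `replacedDataFiles` carries whole `DataFile` handles that a copy-on-write commit will drop, while `referencedDataFiles` carries only the `CharSequence` paths that positional deletes point at:

```java
import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;

class FilesForCommitUsageSketch {
  // Copy-on-write: the rewritten file travels as a full DataFile, because the
  // committer must drop it and re-add its surviving rows from the new files.
  static FilesForCommit cow(List<DataFile> newFiles, DataFile rewritten) {
    return FilesForCommit.onlyData(newFiles, Lists.newArrayList(rewritten));
  }

  // Merge-on-read: positional deletes only reference files by path, so a
  // CharSequence per file is enough for validateDataFilesExist.
  static FilesForCommit mor(List<DeleteFile> deletes, CharSequence referencedPath) {
    return FilesForCommit.onlyDelete(deletes, Sets.newHashSet(referencedPath));
  }
}
```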
@@ -79,6 +79,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
@@ -142,16 +143,18 @@ public void commitTask(TaskAttemptContext originalContext) throws IOException {
String fileForCommitLocation = generateFileForCommitLocation(table.location(), jobConf,
attemptID.getJobID(), attemptID.getTaskID().getId());
if (writers.get(output) != null) {
Collection<DataFile> dataFiles = Lists.newArrayList();
Collection<DeleteFile> deleteFiles = Lists.newArrayList();
Collection<DataFile> referencedDataFiles = Lists.newArrayList();
List<DataFile> dataFiles = Lists.newArrayList();
List<DeleteFile> deleteFiles = Lists.newArrayList();
List<DataFile> replacedDataFiles = Lists.newArrayList();
Set<CharSequence> referencedDataFiles = Sets.newHashSet();
for (HiveIcebergWriter writer : writers.get(output)) {
FilesForCommit files = writer.files();
dataFiles.addAll(files.dataFiles());
deleteFiles.addAll(files.deleteFiles());
replacedDataFiles.addAll(files.replacedDataFiles());
referencedDataFiles.addAll(files.referencedDataFiles());
}
createFileForCommit(new FilesForCommit(dataFiles, deleteFiles, referencedDataFiles),
createFileForCommit(new FilesForCommit(dataFiles, deleteFiles, replacedDataFiles, referencedDataFiles),
fileForCommitLocation, table.io());
} else {
LOG.info("CommitTask found no writer for specific table: {}, attemptID: {}", output, attemptID);
@@ -412,7 +415,8 @@ private void commitTable(FileIO io, ExecutorService executor, OutputTable outputTable,
}
List<DataFile> dataFiles = Lists.newArrayList();
List<DeleteFile> deleteFiles = Lists.newArrayList();
List<DataFile> referencedDataFiles = Lists.newArrayList();
List<DataFile> replacedDataFiles = Lists.newArrayList();
Set<CharSequence> referencedDataFiles = Sets.newHashSet();

Table table = null;
String branchName = null;
@@ -439,10 +443,11 @@
numTasks, executor, outputTable.table.location(), jobContext, io, true);
dataFiles.addAll(writeResults.dataFiles());
deleteFiles.addAll(writeResults.deleteFiles());
replacedDataFiles.addAll(writeResults.replacedDataFiles());
referencedDataFiles.addAll(writeResults.referencedDataFiles());
}

FilesForCommit filesForCommit = new FilesForCommit(dataFiles, deleteFiles, referencedDataFiles);
FilesForCommit filesForCommit = new FilesForCommit(dataFiles, deleteFiles, replacedDataFiles, referencedDataFiles);
long startTime = System.currentTimeMillis();

if (Operation.IOW != operation) {
@@ -485,9 +490,9 @@ private Long getSnapshotId(Table table, String branchName) {
private void commitWrite(Table table, String branchName, Long snapshotId, long startTime,
FilesForCommit results, Operation operation) {

if (!results.referencedDataFiles().isEmpty()) {
if (!results.replacedDataFiles().isEmpty()) {
OverwriteFiles write = table.newOverwrite();
results.referencedDataFiles().forEach(write::deleteFile);
results.replacedDataFiles().forEach(write::deleteFile);
results.dataFiles().forEach(write::addFile);

if (StringUtils.isNotEmpty(branchName)) {
@@ -497,6 +502,7 @@ private void commitWrite(Table table, String branchName, Long snapshotId, long startTime,
write.validateFromSnapshot(snapshotId);
}
write.validateNoConflictingData();
write.validateNoConflictingDeletes();
write.commit();
return;
}
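The copy-on-write branch above condenses to the following sketch (branch handling and the snapshot-id null check omitted; handles assumed). It drops the rewritten originals, adds their replacements, and, with the `validateNoConflictingDeletes` call this patch adds, also rejects concurrent deletes in the validation window:

```java
import org.apache.iceberg.DataFile;
import org.apache.iceberg.OverwriteFiles;
import org.apache.iceberg.Table;

class CopyOnWriteCommitSketch {
  static void commitCow(Table table, long snapshotId,
      Iterable<DataFile> newFiles, Iterable<DataFile> replaced) {
    OverwriteFiles overwrite = table.newOverwrite();
    replaced.forEach(overwrite::deleteFile); // drop the rewritten originals
    newFiles.forEach(overwrite::addFile);    // add the files carrying their surviving rows
    overwrite.validateFromSnapshot(snapshotId);
    overwrite.validateNoConflictingData();    // no concurrent appends/overwrites
    overwrite.validateNoConflictingDeletes(); // added by this patch: no concurrent deletes
    overwrite.commit();
  }
}
```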
@@ -523,6 +529,7 @@ private void commitWrite(Table table, String branchName, Long snapshotId, long startTime,
write.validateDeletedFiles();
write.validateNoConflictingDeleteFiles();
}
write.validateDataFilesExist(results.referencedDataFiles());
write.validateNoConflictingDataFiles();
write.commit();
}
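When `validateDataFilesExist` trips, the commit surfaces as an Iceberg `ValidationException` and nothing lands. A sketch of the retry decision a caller can make (hypothetical helper; the exact exception message is not guaranteed):

```java
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.exceptions.ValidationException;

class RowDeltaRetrySketch {
  // Returns true if committed; false means the caller should re-plan the
  // deletes against the table's current snapshot and try again.
  static boolean tryCommit(RowDelta rowDelta) {
    try {
      rowDelta.commit();
      return true;
    } catch (ValidationException e) {
      // A referenced data file vanished, or another validation failed;
      // the table is unchanged.
      return false;
    }
  }
}
```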
@@ -660,7 +667,8 @@ private static FilesForCommit collectResults(int numTasks, ExecutorService executor,
// starting from 0.
Collection<DataFile> dataFiles = new ConcurrentLinkedQueue<>();
Collection<DeleteFile> deleteFiles = new ConcurrentLinkedQueue<>();
Collection<DataFile> referencedDataFiles = new ConcurrentLinkedQueue<>();
Collection<DataFile> replacedDataFiles = new ConcurrentLinkedQueue<>();
Collection<CharSequence> referencedDataFiles = new ConcurrentLinkedQueue<>();
Tasks.range(numTasks)
.throwFailureWhenFinished(throwOnFailure)
.executeWith(executor)
@@ -670,11 +678,11 @@ private static FilesForCommit collectResults(int numTasks, ExecutorService executor,
FilesForCommit files = readFileForCommit(taskFileName, io);
dataFiles.addAll(files.dataFiles());
deleteFiles.addAll(files.deleteFiles());
replacedDataFiles.addAll(files.replacedDataFiles());
referencedDataFiles.addAll(files.referencedDataFiles());

});

return new FilesForCommit(dataFiles, deleteFiles, referencedDataFiles);
return new FilesForCommit(dataFiles, deleteFiles, replacedDataFiles, referencedDataFiles);
}

/**
@@ -45,7 +45,7 @@ class HiveIcebergCopyOnWriteRecordWriter extends HiveIcebergWriterBase {
private final int currentSpecId;

private final GenericRecord rowDataTemplate;
private final List<DataFile> referencedDataFiles;
private final List<DataFile> replacedDataFiles;

HiveIcebergCopyOnWriteRecordWriter(Schema schema, Map<Integer, PartitionSpec> specs, int currentSpecId,
FileWriterFactory<Record> fileWriterFactory, OutputFileFactory fileFactory, FileIO io,
@@ -54,7 +54,7 @@ class HiveIcebergCopyOnWriteRecordWriter extends HiveIcebergWriterBase {
new ClusteredDataWriter<>(fileWriterFactory, fileFactory, io, targetFileSize));
this.currentSpecId = currentSpecId;
this.rowDataTemplate = GenericRecord.create(schema);
this.referencedDataFiles = Lists.newArrayList();
this.replacedDataFiles = Lists.newArrayList();
}

@Override
@@ -72,7 +72,7 @@ public void write(Writable row) throws IOException {
.withFileSizeInBytes(0)
.withRecordCount(0)
.build();
referencedDataFiles.add(dataFile);
replacedDataFiles.add(dataFile);
} else {
writer.write(rowData, specs.get(currentSpecId), partition(rowData, currentSpecId));
}
@@ -81,6 +81,6 @@ public void write(Writable row) throws IOException {
@Override
public FilesForCommit files() {
List<DataFile> dataFiles = ((DataWriteResult) writer.result()).dataFiles();
return FilesForCommit.onlyData(dataFiles, referencedDataFiles);
return FilesForCommit.onlyData(dataFiles, replacedDataFiles);
}
}
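The `write()` hunk above records each replaced file as a zero-stat marker `DataFile`. A condensed sketch of that trick (the unpartitioned spec and explicit PARQUET format are assumptions to keep it self-contained; the writer itself uses `specs.get(currentSpecId)` and the row's partition):

```java
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;

class ReplacedFileMarkerSketch {
  static DataFile marker(String filePath) { // path read from MetadataColumns.FILE_PATH
    return DataFiles.builder(PartitionSpec.unpartitioned())
        .withPath(filePath)
        .withFormat(FileFormat.PARQUET) // assumed; spares extension inference
        .withFileSizeInBytes(0)         // stats are irrelevant: only the path
        .withRecordCount(0)             //   identifies the file to replace
        .build();
  }
}
```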
@@ -22,6 +22,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.io.Writable;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.PartitionSpec;
@@ -68,6 +69,7 @@ public void write(Writable row) throws IOException {
@Override
public FilesForCommit files() {
List<DeleteFile> deleteFiles = ((DeleteWriteResult) writer.result()).deleteFiles();
return FilesForCommit.onlyDelete(deleteFiles);
Set<CharSequence> referencedDataFiles = ((DeleteWriteResult) writer.result()).referencedDataFiles();
return FilesForCommit.onlyDelete(deleteFiles, referencedDataFiles);
}
}
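Where those referenced paths originate, in a hedged sketch: every positional delete names the data file it applies to, and `DeleteWriteResult.referencedDataFiles()` reports exactly those paths back to `files()`:

```java
import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.PositionDelete;

class PositionDeleteSketch {
  // A positional delete is (data-file path, row position, optional row copy);
  // the path component is what referencedDataFiles() later aggregates.
  static PositionDelete<Record> delete(CharSequence dataFilePath, long rowPosition) {
    PositionDelete<Record> posDelete = PositionDelete.create();
    return posDelete.set(dataFilePath, rowPosition, null); // null: no row payload kept
  }
}
```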
@@ -20,6 +20,7 @@
package org.apache.iceberg.mr.hive.writer;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
@@ -35,6 +36,7 @@
import org.apache.iceberg.mr.mapred.Container;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.StructLikeSet;
import org.junit.Assert;
import org.junit.Test;
@@ -54,18 +56,24 @@ public void testDelete() throws IOException {
Collections.sort(deleteRecords,
Comparator.comparing(a -> a.getField(MetadataColumns.PARTITION_COLUMN_NAME).toString()));

CharSequenceSet expectedDataFiles = CharSequenceSet.empty();
Container<Record> container = new Container<>();
for (Record deleteRecord : deleteRecords) {
container.set(deleteRecord);
testWriter.write(container);
expectedDataFiles.add((String) deleteRecord.getField(MetadataColumns.FILE_PATH.name()));
}

testWriter.close(false);

RowDelta rowDelta = table.newRowDelta();
testWriter.files().deleteFiles().forEach(rowDelta::addDeletes);
Collection<CharSequence> actualDataFiles = testWriter.files().referencedDataFiles();
rowDelta.commit();

Assert.assertTrue("Actual :" + actualDataFiles + " Expected: " + expectedDataFiles,
actualDataFiles.containsAll(expectedDataFiles));

StructLikeSet expected = rowSetWithoutIds(RECORDS, DELETED_IDS);
StructLikeSet actual = actualRowSet(table);

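A note on the test's choice of `CharSequenceSet` over a plain `HashSet`, with a sketch (behavior as I understand `CharSequenceWrapper`; the path is made up): `CharSequence` implementations share no `equals`/`hashCode` contract, so content-based membership needs the wrapper set:

```java
import org.apache.iceberg.util.CharSequenceSet;

class CharSequenceSetSketch {
  static boolean containsByContent() {
    CharSequenceSet paths = CharSequenceSet.empty();
    paths.add("s3://warehouse/tbl/data/00000-0-abc.parquet");
    // Matches by character content: a StringBuilder with the same characters
    // is found even though StringBuilder.equals(String) is always false.
    return paths.contains(new StringBuilder("s3://warehouse/tbl/data/00000-0-abc.parquet"));
  }
}
```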
