HIVE-28102: Iceberg: Invoke validateDataFilesExist for RowDelta operations. #5111

Merged: 2 commits, merged Mar 4, 2024
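In brief, as read from the diff below: FilesForCommit's old referencedDataFiles field actually held the DataFiles replaced by copy-on-write writes, so it is renamed to replacedDataFiles, and a genuinely new referencedDataFiles collection of CharSequence paths is introduced. The paths come from each position-delete writer's DeleteWriteResult, travel through the per-task commit files, and at job commit the RowDelta path calls validateDataFilesExist on them, while the copy-on-write OverwriteFiles path additionally calls validateNoConflictingDeletes. Either way, a concurrent commit that removed a file the write depends on now fails the Hive operation instead of being silently ignored.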
@@ -33,29 +33,37 @@ public class FilesForCommit implements Serializable {
 
   private final Collection<DataFile> dataFiles;
   private final Collection<DeleteFile> deleteFiles;
-  private Collection<DataFile> referencedDataFiles;
+  private final Collection<DataFile> replacedDataFiles;
+  private final Collection<CharSequence> referencedDataFiles;
 
   public FilesForCommit(Collection<DataFile> dataFiles, Collection<DeleteFile> deleteFiles) {
     this(dataFiles, deleteFiles, Collections.emptyList());
   }
 
   public FilesForCommit(Collection<DataFile> dataFiles, Collection<DeleteFile> deleteFiles,
-      Collection<DataFile> referencedDataFiles) {
+      Collection<DataFile> replacedDataFiles, Collection<CharSequence> referencedDataFiles) {
     this.dataFiles = dataFiles;
     this.deleteFiles = deleteFiles;
+    this.replacedDataFiles = replacedDataFiles;
     this.referencedDataFiles = referencedDataFiles;
   }
 
-  public static FilesForCommit onlyDelete(Collection<DeleteFile> deleteFiles) {
-    return new FilesForCommit(Collections.emptyList(), deleteFiles);
+  public FilesForCommit(Collection<DataFile> dataFiles, Collection<DeleteFile> deleteFiles,
+      Collection<DataFile> replacedDataFiles) {
+    this(dataFiles, deleteFiles, replacedDataFiles, Collections.emptySet());
   }
 
+  public static FilesForCommit onlyDelete(Collection<DeleteFile> deleteFiles,
+      Collection<CharSequence> referencedDataFiles) {
+    return new FilesForCommit(Collections.emptyList(), deleteFiles, Collections.emptyList(), referencedDataFiles);
+  }
+
   public static FilesForCommit onlyData(Collection<DataFile> dataFiles) {
     return new FilesForCommit(dataFiles, Collections.emptyList());
   }
 
-  public static FilesForCommit onlyData(Collection<DataFile> dataFiles, Collection<DataFile> referencedDataFiles) {
-    return new FilesForCommit(dataFiles, Collections.emptyList(), referencedDataFiles);
+  public static FilesForCommit onlyData(Collection<DataFile> dataFiles, Collection<DataFile> replacedDataFiles) {
+    return new FilesForCommit(dataFiles, Collections.emptyList(), replacedDataFiles);
   }
 
   public static FilesForCommit empty() {
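
The two collections now carry different things, which the old single field conflated. A hedged illustration of the intended call sites (the variables are hypothetical, not from this patch):

  // Copy-on-write rewrite: full DataFile metadata is needed, because the
  // replaced files are removed from the table via OverwriteFiles.deleteFile.
  FilesForCommit cowResult = FilesForCommit.onlyData(newDataFiles, replacedDataFiles);

  // Merge-on-read delete: only the CharSequence paths of the files that the
  // position deletes point at are needed, purely for commit-time validation.
  FilesForCommit morResult = FilesForCommit.onlyDelete(positionDeleteFiles, referencedPaths);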
@@ -70,7 +78,11 @@ public Collection<DeleteFile> deleteFiles() {
     return deleteFiles;
   }
 
-  public Collection<DataFile> referencedDataFiles() {
+  public Collection<DataFile> replacedDataFiles() {
+    return replacedDataFiles;
+  }
+
+  public Collection<CharSequence> referencedDataFiles() {
     return referencedDataFiles;
   }
 
@@ -79,14 +91,15 @@ public Collection<? extends ContentFile> allFiles() {
   }
 
   public boolean isEmpty() {
-    return dataFiles.isEmpty() && deleteFiles.isEmpty() && referencedDataFiles.isEmpty();
+    return dataFiles.isEmpty() && deleteFiles.isEmpty() && replacedDataFiles.isEmpty();
   }
 
   @Override
   public String toString() {
     return MoreObjects.toStringHelper(this)
         .add("dataFiles", dataFiles.toString())
         .add("deleteFiles", deleteFiles.toString())
+        .add("replacedDataFiles", replacedDataFiles.toString())
         .add("referencedDataFiles", referencedDataFiles.toString())
         .toString();
   }
 
@@ -79,6 +79,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.iceberg.util.Tasks;
 import org.slf4j.Logger;
@@ -142,16 +143,18 @@ public void commitTask(TaskAttemptContext originalContext) throws IOException {
       String fileForCommitLocation = generateFileForCommitLocation(table.location(), jobConf,
           attemptID.getJobID(), attemptID.getTaskID().getId());
       if (writers.get(output) != null) {
-        Collection<DataFile> dataFiles = Lists.newArrayList();
-        Collection<DeleteFile> deleteFiles = Lists.newArrayList();
-        Collection<DataFile> referencedDataFiles = Lists.newArrayList();
+        List<DataFile> dataFiles = Lists.newArrayList();
+        List<DeleteFile> deleteFiles = Lists.newArrayList();
+        List<DataFile> replacedDataFiles = Lists.newArrayList();
+        Set<CharSequence> referencedDataFiles = Sets.newHashSet();
         for (HiveIcebergWriter writer : writers.get(output)) {
           FilesForCommit files = writer.files();
           dataFiles.addAll(files.dataFiles());
           deleteFiles.addAll(files.deleteFiles());
+          replacedDataFiles.addAll(files.replacedDataFiles());
           referencedDataFiles.addAll(files.referencedDataFiles());
         }
-        createFileForCommit(new FilesForCommit(dataFiles, deleteFiles, referencedDataFiles),
+        createFileForCommit(new FilesForCommit(dataFiles, deleteFiles, replacedDataFiles, referencedDataFiles),
             fileForCommitLocation, table.io());
       } else {
         LOG.info("CommitTask found no writer for specific table: {}, attemptID: {}", output, attemptID);
@@ -412,7 +415,8 @@ private void commitTable(FileIO io, ExecutorService executor, OutputTable output
     }
     List<DataFile> dataFiles = Lists.newArrayList();
     List<DeleteFile> deleteFiles = Lists.newArrayList();
-    List<DataFile> referencedDataFiles = Lists.newArrayList();
+    List<DataFile> replacedDataFiles = Lists.newArrayList();
+    Set<CharSequence> referencedDataFiles = Sets.newHashSet();
 
     Table table = null;
     String branchName = null;
@@ -439,10 +443,11 @@ private void commitTable(FileIO io, ExecutorService executor, OutputTable output
           numTasks, executor, outputTable.table.location(), jobContext, io, true);
       dataFiles.addAll(writeResults.dataFiles());
       deleteFiles.addAll(writeResults.deleteFiles());
+      replacedDataFiles.addAll(writeResults.replacedDataFiles());
       referencedDataFiles.addAll(writeResults.referencedDataFiles());
     }
 
-    FilesForCommit filesForCommit = new FilesForCommit(dataFiles, deleteFiles, referencedDataFiles);
+    FilesForCommit filesForCommit = new FilesForCommit(dataFiles, deleteFiles, replacedDataFiles, referencedDataFiles);
     long startTime = System.currentTimeMillis();
 
     if (Operation.IOW != operation) {
@@ -485,9 +490,9 @@ private Long getSnapshotId(Table table, String branchName) {
   private void commitWrite(Table table, String branchName, Long snapshotId, long startTime,
       FilesForCommit results, Operation operation) {
 
-    if (!results.referencedDataFiles().isEmpty()) {
+    if (!results.replacedDataFiles().isEmpty()) {
       OverwriteFiles write = table.newOverwrite();
-      results.referencedDataFiles().forEach(write::deleteFile);
+      results.replacedDataFiles().forEach(write::deleteFile);
       results.dataFiles().forEach(write::addFile);
 
       if (StringUtils.isNotEmpty(branchName)) {
@@ -497,6 +502,7 @@ private void commitWrite(Table table, String branchName, Long snapshotId, long s
         write.validateFromSnapshot(snapshotId);
       }
       write.validateNoConflictingData();
+      write.validateNoConflictingDeletes();
       write.commit();
       return;
     }
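
For this copy-on-write branch, the added validateNoConflictingDeletes() makes the overwrite fail if a concurrent commit added deletes that could apply to the files being replaced, complementing the existing validateNoConflictingData() check. Condensed into one sketch (branch handling elided; names are illustrative, not the committer's actual locals):

  void commitCopyOnWrite(Table table, long snapshotId,
      List<DataFile> newDataFiles, List<DataFile> replacedDataFiles) {
    OverwriteFiles write = table.newOverwrite();
    replacedDataFiles.forEach(write::deleteFile);  // drop the rewritten files
    newDataFiles.forEach(write::addFile);          // add their replacements
    write.validateFromSnapshot(snapshotId);
    write.validateNoConflictingData();     // no concurrent appends to the affected partitions
    write.validateNoConflictingDeletes();  // new: no concurrent deletes against the replaced files
    write.commit();
  }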
@@ -523,6 +529,7 @@ private void commitWrite(Table table, String branchName, Long snapshotId, long s
         write.validateDeletedFiles();
         write.validateNoConflictingDeleteFiles();
       }
+      write.validateDataFilesExist(results.referencedDataFiles());
       write.validateNoConflictingDataFiles();
       write.commit();
     }
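
This hunk is the fix named in the PR title: the RowDelta already validated conflicting data and delete files, but never checked that the data files its position deletes reference still exist at commit time. Condensed (branch handling elided; names are illustrative):

  void commitRowDelta(Table table, long snapshotId,
      List<DeleteFile> deleteFiles, Set<CharSequence> referencedDataFiles) {
    RowDelta write = table.newRowDelta();
    deleteFiles.forEach(write::addDeletes);
    write.validateFromSnapshot(snapshotId);
    write.validateDeletedFiles();
    write.validateNoConflictingDeleteFiles();
    write.validateDataFilesExist(referencedDataFiles);  // new: fail if a referenced file vanished
    write.validateNoConflictingDataFiles();
    write.commit();
  }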
@@ -660,7 +667,8 @@ private static FilesForCommit collectResults(int numTasks, ExecutorService execu
     // starting from 0.
     Collection<DataFile> dataFiles = new ConcurrentLinkedQueue<>();
     Collection<DeleteFile> deleteFiles = new ConcurrentLinkedQueue<>();
-    Collection<DataFile> referencedDataFiles = new ConcurrentLinkedQueue<>();
+    Collection<DataFile> replacedDataFiles = new ConcurrentLinkedQueue<>();
+    Collection<CharSequence> referencedDataFiles = new ConcurrentLinkedQueue<>();
     Tasks.range(numTasks)
         .throwFailureWhenFinished(throwOnFailure)
         .executeWith(executor)
@@ -670,11 +678,11 @@ private static FilesForCommit collectResults(int numTasks, ExecutorService execu
           FilesForCommit files = readFileForCommit(taskFileName, io);
           dataFiles.addAll(files.dataFiles());
           deleteFiles.addAll(files.deleteFiles());
+          replacedDataFiles.addAll(files.replacedDataFiles());
           referencedDataFiles.addAll(files.referencedDataFiles());
 
         });
 
-    return new FilesForCommit(dataFiles, deleteFiles, referencedDataFiles);
+    return new FilesForCommit(dataFiles, deleteFiles, replacedDataFiles, referencedDataFiles);
   }
 
   /**
 
@@ -45,7 +45,7 @@ class HiveIcebergCopyOnWriteRecordWriter extends HiveIcebergWriterBase {
   private final int currentSpecId;
 
   private final GenericRecord rowDataTemplate;
-  private final List<DataFile> referencedDataFiles;
+  private final List<DataFile> replacedDataFiles;
 
   HiveIcebergCopyOnWriteRecordWriter(Schema schema, Map<Integer, PartitionSpec> specs, int currentSpecId,
       FileWriterFactory<Record> fileWriterFactory, OutputFileFactory fileFactory, FileIO io,
@@ -54,7 +54,7 @@ class HiveIcebergCopyOnWriteRecordWriter extends HiveIcebergWriterBase {
         new ClusteredDataWriter<>(fileWriterFactory, fileFactory, io, targetFileSize));
     this.currentSpecId = currentSpecId;
     this.rowDataTemplate = GenericRecord.create(schema);
-    this.referencedDataFiles = Lists.newArrayList();
+    this.replacedDataFiles = Lists.newArrayList();
   }
 
   @Override
@@ -72,7 +72,7 @@ public void write(Writable row) throws IOException {
           .withFileSizeInBytes(0)
           .withRecordCount(0)
           .build();
-      referencedDataFiles.add(dataFile);
+      replacedDataFiles.add(dataFile);
     } else {
      writer.write(rowData, specs.get(currentSpecId), partition(rowData, currentSpecId));
     }
@@ -81,6 +81,6 @@ public void write(Writable row) throws IOException {
   @Override
   public FilesForCommit files() {
     List<DataFile> dataFiles = ((DataWriteResult) writer.result()).dataFiles();
-    return FilesForCommit.onlyData(dataFiles, referencedDataFiles);
+    return FilesForCommit.onlyData(dataFiles, replacedDataFiles);
   }
 }
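
Context for the rename in this writer: when it sees a row to delete, it does not emit a delete file; it records the row's source file as replaced, using a metadata-only DataFile whose size and record count are zero. A hedged reconstruction of the builder call the hunk above partially shows (path and partition sources are assumptions):

  // Only the location matters here: OverwriteFiles.deleteFile matches by path.
  DataFile dataFile = DataFiles.builder(specs.get(currentSpecId))
      .withPath(filePath)        // assumed: the row's FILE_PATH metadata column
      .withPartition(partition)  // assumed: derived from the incoming row
      .withFileSizeInBytes(0)    // metadata-only marker, no content stats
      .withRecordCount(0)
      .build();
  replacedDataFiles.add(dataFile);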
 
@@ -22,6 +22,7 @@
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.hadoop.io.Writable;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.PartitionSpec;
@@ -68,6 +69,7 @@ public void write(Writable row) throws IOException {
   @Override
   public FilesForCommit files() {
     List<DeleteFile> deleteFiles = ((DeleteWriteResult) writer.result()).deleteFiles();
-    return FilesForCommit.onlyDelete(deleteFiles);
+    Set<CharSequence> referencedDataFiles = ((DeleteWriteResult) writer.result()).referencedDataFiles();
+    return FilesForCommit.onlyDelete(deleteFiles, referencedDataFiles);
   }
 }
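
No extra bookkeeping is needed in this writer: Iceberg's position-delete writers already track which data files their deletes target and expose them on the result object, so the change just forwards that set. Roughly (a sketch of the org.apache.iceberg.io API, not this class's exact code):

  DeleteWriteResult result = (DeleteWriteResult) writer.result();
  List<DeleteFile> deleteFiles = result.deleteFiles();
  CharSequenceSet referenced = result.referencedDataFiles();  // paths the position deletes point at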
 
@@ -20,6 +20,7 @@
 package org.apache.iceberg.mr.hive.writer;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
@@ -35,6 +36,7 @@
 import org.apache.iceberg.mr.mapred.Container;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.CharSequenceSet;
 import org.apache.iceberg.util.StructLikeSet;
 import org.junit.Assert;
 import org.junit.Test;
@@ -54,18 +56,24 @@ public void testDelete() throws IOException {
     Collections.sort(deleteRecords,
         Comparator.comparing(a -> a.getField(MetadataColumns.PARTITION_COLUMN_NAME).toString()));
 
+    CharSequenceSet expectedDataFiles = CharSequenceSet.empty();
     Container<Record> container = new Container<>();
     for (Record deleteRecord : deleteRecords) {
       container.set(deleteRecord);
       testWriter.write(container);
+      expectedDataFiles.add((String) deleteRecord.getField(MetadataColumns.FILE_PATH.name()));
     }
 
     testWriter.close(false);
 
     RowDelta rowDelta = table.newRowDelta();
     testWriter.files().deleteFiles().forEach(rowDelta::addDeletes);
+    Collection<CharSequence> actualDataFiles = testWriter.files().referencedDataFiles();
     rowDelta.commit();
 
+    Assert.assertTrue("Actual :" + actualDataFiles + " Expected: " + expectedDataFiles,
+        actualDataFiles.containsAll(expectedDataFiles));
+
     StructLikeSet expected = rowSetWithoutIds(RECORDS, DELETED_IDS);
     StructLikeSet actual = actualRowSet(table);
 
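
The new assertions pin down the writer-side half of the contract: every FILE_PATH targeted by a position-delete record must appear in referencedDataFiles(). The committer-side half turns the following race into a clean failure (illustrative timeline; variable names are assumptions):

  // t1: a Hive DELETE writes position deletes referencing data file F
  // t2: a concurrent compaction commits, removing F from the table
  // t1 then commits:
  RowDelta delta = table.newRowDelta();
  deleteFiles.forEach(delta::addDeletes);
  delta.validateFromSnapshot(scanSnapshotId)
      .validateDataFilesExist(referencedDataFiles)
      .commit();  // throws ValidationException instead of committing dangling deletes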