Spark 3.4: Support distributed planning
aokolnychyi committed Jul 21, 2023
1 parent 1d88e80 commit d55167d
Showing 15 changed files with 1,314 additions and 47 deletions.
1 change: 1 addition & 0 deletions .baseline/checkstyle/checkstyle.xml
@@ -120,6 +120,7 @@
org.apache.iceberg.NullOrder.*,
org.apache.iceberg.MetadataTableType.*,
org.apache.iceberg.MetadataColumns.*,
org.apache.iceberg.PlanningMode.*,
org.apache.iceberg.SortDirection.*,
org.apache.iceberg.TableProperties.*,
org.apache.iceberg.types.Type.*,
11 changes: 10 additions & 1 deletion core/src/main/java/org/apache/iceberg/BaseScan.java
@@ -27,6 +27,7 @@
import org.apache.iceberg.expressions.Binder;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -58,7 +59,7 @@ abstract class BaseScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>>
"upper_bounds",
"column_sizes");

private static final List<String> SCAN_WITH_STATS_COLUMNS =
protected static final List<String> SCAN_WITH_STATS_COLUMNS =
ImmutableList.<String>builder().addAll(SCAN_COLUMNS).addAll(STATS_COLUMNS).build();

protected static final List<String> DELETE_SCAN_COLUMNS =
@@ -95,6 +96,10 @@ public Table table() {
return table;
}

public FileIO io() {
return table.io();
}

protected Schema tableSchema() {
return schema;
}
@@ -115,6 +120,10 @@ protected boolean shouldIgnoreResiduals() {
return context().ignoreResiduals();
}

protected Expression residualFilter() {
return shouldIgnoreResiduals() ? Expressions.alwaysTrue() : filter();
}

protected boolean shouldPlanWithExecutor() {
return PLAN_SCANS_WITH_WORKER_POOL || context().planWithCustomizedExecutor();
}
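Note (illustrative, not part of the commit): the new residualFilter() helper collapses to Expressions.alwaysTrue() when residuals are ignored, so scan implementations no longer need to branch on shouldIgnoreResiduals() themselves. A minimal sketch of how a BaseScan subclass might consume it, assuming the existing ResidualEvaluator API and the context().caseSensitive() flag (neither is introduced by this diff):

// Hypothetical helper inside a BaseScan subclass (org.apache.iceberg package).
private org.apache.iceberg.expressions.ResidualEvaluator newResidualEvaluator(PartitionSpec spec) {
  // alwaysTrue() when residuals are ignored, otherwise the scan's row filter
  return org.apache.iceberg.expressions.ResidualEvaluator.of(
      spec, residualFilter(), context().caseSensitive());
}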
9 changes: 2 additions & 7 deletions core/src/main/java/org/apache/iceberg/DataTableScan.java
@@ -22,7 +22,6 @@
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.SnapshotUtil;

public class DataTableScan extends BaseTableScan {
protected DataTableScan(Table table, Schema schema, TableScanContext context) {
@@ -52,12 +51,8 @@ public TableScan appendsAfter(long fromSnapshotId) {
}

@Override
public TableScan useSnapshot(long scanSnapshotId) {
// call method in superclass just for the side effect of argument validation;
// we do not use its return value
super.useSnapshot(scanSnapshotId);
Schema snapshotSchema = SnapshotUtil.schemaFor(table(), scanSnapshotId);
return newRefinedScan(table(), snapshotSchema, context().useSnapshotId(scanSnapshotId));
protected boolean useSnapshotSchema() {
return true;
}

@Override
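Note (illustrative, not part of the commit): the removed useSnapshot override suggests snapshot-schema resolution now lives in the shared base scan and is toggled by the new useSnapshotSchema() hook. A rough sketch of how such a hook could be consulted, reusing the SnapshotUtil.schemaFor helper the removed code called; the actual base-class wiring is in files not shown on this page:

// Hypothetical base-scan logic, not the implementation from this commit:
// use the schema as of the scanned snapshot only when the subclass opts in.
private Schema schemaFor(Table table, long snapshotId) {
  if (useSnapshotSchema()) {
    return org.apache.iceberg.util.SnapshotUtil.schemaFor(table, snapshotId); // schema at that snapshot
  } else {
    return table.schema(); // current table schema
  }
}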
105 changes: 71 additions & 34 deletions core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
@@ -23,6 +23,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
@@ -53,6 +54,7 @@
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ArrayUtil;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PartitionSet;
import org.apache.iceberg.util.StructLikeWrapper;
@@ -66,13 +68,16 @@
* file.
*/
class DeleteFileIndex {
private static final DeleteFile[] NO_DELETES = new DeleteFile[0];

private final Map<Integer, PartitionSpec> specsById;
private final Map<Integer, Types.StructType> partitionTypeById;
private final Map<Integer, ThreadLocal<StructLikeWrapper>> wrapperById;
private final long[] globalSeqs;
private final DeleteFile[] globalDeletes;
private final Map<Pair<Integer, StructLikeWrapper>, Pair<long[], DeleteFile[]>>
sortedDeletesByPartition;
private final boolean isEmpty;

DeleteFileIndex(
Map<Integer, PartitionSpec> specsById,
@@ -87,11 +92,11 @@ class DeleteFileIndex {
this.globalSeqs = globalSeqs;
this.globalDeletes = globalDeletes;
this.sortedDeletesByPartition = sortedDeletesByPartition;
this.isEmpty = ArrayUtil.isEmpty(globalDeletes) && sortedDeletesByPartition.isEmpty();
}

public boolean isEmpty() {
return (globalDeletes == null || globalDeletes.length == 0)
&& sortedDeletesByPartition.isEmpty();
return isEmpty;
}

public Iterable<DeleteFile> referencedDeleteFiles() {
@@ -122,7 +127,18 @@ DeleteFile[] forEntry(ManifestEntry<DataFile> entry) {
return forDataFile(entry.dataSequenceNumber(), entry.file());
}

DeleteFile[] forDataFile(DataFile file) {
Preconditions.checkArgument(
file.dataSequenceNumber() != null,
"Cannot assign delete files for data files with unknown data sequence numbers");
return forDataFile(file.dataSequenceNumber(), file);
}

DeleteFile[] forDataFile(long sequenceNumber, DataFile file) {
if (isEmpty) {
return NO_DELETES;
}

Pair<Integer, StructLikeWrapper> partition = partition(file.specId(), file.partition());
Pair<long[], DeleteFile[]> partitionDeletes = sortedDeletesByPartition.get(partition);

@@ -354,9 +370,14 @@ static Builder builderFor(FileIO io, Iterable<ManifestFile> deleteManifests) {
return new Builder(io, Sets.newHashSet(deleteManifests));
}

static Builder builderForFiles(FileIO io, Iterable<DeleteFile> deleteFiles) {
return new Builder(io, deleteFiles);
}

static class Builder {
private final FileIO io;
private final Set<ManifestFile> deleteManifests;
private final Set<DeleteFile> deleteFiles;
private long minSequenceNumber = 0L;
private Map<Integer, PartitionSpec> specsById = null;
private Expression dataFilter = Expressions.alwaysTrue();
@@ -369,6 +390,13 @@ static class Builder {
Builder(FileIO io, Set<ManifestFile> deleteManifests) {
this.io = io;
this.deleteManifests = Sets.newHashSet(deleteManifests);
this.deleteFiles = null;
}

Builder(FileIO io, Iterable<DeleteFile> deleteFiles) {
this.io = io;
this.deleteManifests = null;
this.deleteFiles = Sets.newHashSet(deleteFiles);
}

Builder afterSequenceNumber(long seq) {
@@ -411,10 +439,14 @@ Builder scanMetrics(ScanMetrics newScanMetrics) {
return this;
}

DeleteFileIndex build() {
private Collection<DeleteFile> planFiles() {
if (deleteFiles != null) {
return deleteFiles;
}

// read all of the matching delete manifests in parallel and accumulate the matching files in
// a queue
Queue<ManifestEntry<DeleteFile>> deleteEntries = new ConcurrentLinkedQueue<>();
Queue<DeleteFile> files = new ConcurrentLinkedQueue<>();
Tasks.foreach(deleteManifestReaders())
.stopOnFailure()
.throwFailureWhenFinished()
@@ -425,27 +457,32 @@ DeleteFileIndex build() {
for (ManifestEntry<DeleteFile> entry : reader) {
if (entry.dataSequenceNumber() > minSequenceNumber) {
// copy with stats for better filtering against data file stats
deleteEntries.add(entry.copy());
files.add(entry.file().copy());
}
}
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close");
}
});

return files;
}

DeleteFileIndex build() {
Collection<DeleteFile> files = planFiles();

// build a map from (specId, partition) to delete file entries
Map<Integer, StructLikeWrapper> wrappersBySpecId = Maps.newHashMap();
ListMultimap<Pair<Integer, StructLikeWrapper>, ManifestEntry<DeleteFile>>
deleteFilesByPartition =
Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList);
for (ManifestEntry<DeleteFile> entry : deleteEntries) {
int specId = entry.file().specId();
ListMultimap<Pair<Integer, StructLikeWrapper>, DeleteFile> deleteFilesByPartition =
Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList);
for (DeleteFile file : files) {
int specId = file.specId();
StructLikeWrapper wrapper =
wrappersBySpecId
.computeIfAbsent(
specId, id -> StructLikeWrapper.forType(specsById.get(id).partitionType()))
.copyFor(entry.file().partition());
deleteFilesByPartition.put(Pair.of(specId, wrapper), entry);
.copyFor(file.partition());
deleteFilesByPartition.put(Pair.of(specId, wrapper), file);
}

// sort the entries in each map value by sequence number and split into sequence numbers and
Expand All @@ -463,11 +500,9 @@ DeleteFileIndex build() {

List<Pair<Long, DeleteFile>> eqFilesSortedBySeq =
deleteFilesByPartition.get(partition).stream()
.filter(entry -> entry.file().content() == FileContent.EQUALITY_DELETES)
.map(
entry ->
// a delete file is indexed by the sequence number it should be applied to
Pair.of(entry.dataSequenceNumber() - 1, entry.file()))
.filter(file -> file.content() == FileContent.EQUALITY_DELETES)
// a delete file is indexed by the sequence number it should be applied to
.map(file -> Pair.of(file.dataSequenceNumber() - 1, file))
.sorted(Comparator.comparingLong(Pair::first))
.collect(Collectors.toList());

@@ -476,45 +511,39 @@

List<Pair<Long, DeleteFile>> posFilesSortedBySeq =
deleteFilesByPartition.get(partition).stream()
.filter(entry -> entry.file().content() == FileContent.POSITION_DELETES)
.map(entry -> Pair.of(entry.dataSequenceNumber(), entry.file()))
.filter(file -> file.content() == FileContent.POSITION_DELETES)
.map(file -> Pair.of(file.dataSequenceNumber(), file))
.sorted(Comparator.comparingLong(Pair::first))
.collect(Collectors.toList());

long[] seqs = posFilesSortedBySeq.stream().mapToLong(Pair::first).toArray();
DeleteFile[] files =
DeleteFile[] sortedFiles =
posFilesSortedBySeq.stream().map(Pair::second).toArray(DeleteFile[]::new);

sortedDeletesByPartition.put(partition, Pair.of(seqs, files));
sortedDeletesByPartition.put(partition, Pair.of(seqs, sortedFiles));

} else {
List<Pair<Long, DeleteFile>> filesSortedBySeq =
deleteFilesByPartition.get(partition).stream()
.map(
entry -> {
// a delete file is indexed by the sequence number it should be applied to
long applySeq =
entry.dataSequenceNumber()
- (entry.file().content() == FileContent.EQUALITY_DELETES ? 1 : 0);
return Pair.of(applySeq, entry.file());
})
// a delete file is indexed by the sequence number it should be applied to
.map(file -> Pair.of(applySeq(file), file))
.sorted(Comparator.comparingLong(Pair::first))
.collect(Collectors.toList());

long[] seqs = filesSortedBySeq.stream().mapToLong(Pair::first).toArray();
DeleteFile[] files =
DeleteFile[] sortedFiles =
filesSortedBySeq.stream().map(Pair::second).toArray(DeleteFile[]::new);

sortedDeletesByPartition.put(partition, Pair.of(seqs, files));
sortedDeletesByPartition.put(partition, Pair.of(seqs, sortedFiles));
}
}

scanMetrics.indexedDeleteFiles().increment(deleteEntries.size());
scanMetrics.indexedDeleteFiles().increment(files.size());
deleteFilesByPartition
.values()
.forEach(
entry -> {
FileContent content = entry.file().content();
file -> {
FileContent content = file.content();
if (content == FileContent.EQUALITY_DELETES) {
scanMetrics.equalityDeleteFiles().increment();
} else if (content == FileContent.POSITION_DELETES) {
@@ -526,6 +555,14 @@ DeleteFileIndex build() {
specsById, globalApplySeqs, globalDeletes, sortedDeletesByPartition);
}

private long applySeq(DeleteFile file) {
if (file.content() == FileContent.EQUALITY_DELETES) {
return file.dataSequenceNumber() - 1;
} else {
return file.dataSequenceNumber();
}
}

private Iterable<CloseableIterable<ManifestEntry<DeleteFile>>> deleteManifestReaders() {
LoadingCache<Integer, ManifestEvaluator> evalCache =
specsById == null
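Note (illustrative, not part of the commit): the new builderForFiles(io, deleteFiles) entry point lets a caller that has already collected the relevant delete files, for example a distributed planner that gathered them on executors, build the index without re-reading delete manifests. A minimal same-package sketch, assuming the builder's existing specsById(...) setter, which is not shown in this diff:

// Hypothetical usage inside org.apache.iceberg (DeleteFileIndex is package-private).
static DeleteFileIndex indexCollectedDeletes(Table table, Iterable<DeleteFile> collectedDeleteFiles) {
  return DeleteFileIndex.builderForFiles(table.io(), collectedDeleteFiles)
      .specsById(table.specs())
      .build();
}

// The new forDataFile(DataFile) overload then matches deletes against the file's own
// data sequence number, which must be known on the file:
// DeleteFile[] deletes = index.forDataFile(dataFile);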
