Skip to content
Permalink
Browse files
Core: Add reference_snapshot_id filter column to all_manifests table (#…
  • Loading branch information
szehon-ho committed Jun 29, 2022
1 parent 3663497 commit 7e30bec5022924c64c6256eed3c570a86a0bbfd2
Show file tree
Hide file tree
Showing 7 changed files with 1,084 additions and 189 deletions.
@@ -20,18 +20,27 @@
package org.apache.iceberg;

import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Binder;
import org.apache.iceberg.expressions.BoundReference;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.ExpressionVisitors;
import org.apache.iceberg.expressions.ExpressionVisitors.BoundExpressionVisitor;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructProjection;

@@ -43,6 +52,7 @@
* This table may return duplicate rows.
*/
public class AllManifestsTable extends BaseMetadataTable {
private static final int REF_SNAPSHOT_ID = 18;
private static final Schema MANIFEST_FILE_SCHEMA = new Schema(
Types.NestedField.required(14, "content", Types.IntegerType.get()),
Types.NestedField.required(1, "path", Types.StringType.get()),
@@ -60,7 +70,8 @@ public class AllManifestsTable extends BaseMetadataTable {
Types.NestedField.required(11, "contains_nan", Types.BooleanType.get()),
Types.NestedField.optional(12, "lower_bound", Types.StringType.get()),
Types.NestedField.optional(13, "upper_bound", Types.StringType.get())
)))
))),
Types.NestedField.required(REF_SNAPSHOT_ID, "reference_snapshot_id", Types.LongType.get())
);

AllManifestsTable(TableOperations ops, Table table) {
@@ -112,21 +123,25 @@ protected CloseableIterable<FileScanTask> doPlanFiles() {
Expression filter = shouldIgnoreResiduals() ? Expressions.alwaysTrue() : filter();
ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);

return CloseableIterable.withNoopClose(Iterables.transform(table().snapshots(), snap -> {
SnapshotEvaluator snapshotEvaluator = new SnapshotEvaluator(filter, MANIFEST_FILE_SCHEMA.asStruct(),
isCaseSensitive());
Iterable<Snapshot> filteredSnapshots = Iterables.filter(table().snapshots(), snapshotEvaluator::eval);

return CloseableIterable.withNoopClose(Iterables.transform(filteredSnapshots, snap -> {
if (snap.manifestListLocation() != null) {
DataFile manifestListAsDataFile = DataFiles.builder(PartitionSpec.unpartitioned())
.withInputFile(io.newInputFile(snap.manifestListLocation()))
.withRecordCount(1)
.withFormat(FileFormat.AVRO)
.build();
return new ManifestListReadTask(io, schema(), specs, new BaseFileScanTask(
manifestListAsDataFile, null,
schemaString, specString, residuals));
return new ManifestListReadTask(io, schema(), specs,
new BaseFileScanTask(manifestListAsDataFile, null, schemaString, specString, residuals),
snap.snapshotId());
} else {
return StaticDataTask.of(
io.newInputFile(tableOps().current().metadataFileLocation()),
MANIFEST_FILE_SCHEMA, schema(), snap.allManifests(io),
manifest -> ManifestsTable.manifestFileToRow(specs.get(manifest.partitionSpecId()), manifest)
manifest -> manifestFileToRow(specs.get(manifest.partitionSpecId()), manifest, snap.snapshotId())
);
}
}));
@@ -138,12 +153,15 @@ static class ManifestListReadTask implements DataTask {
private final Schema schema;
private final Map<Integer, PartitionSpec> specs;
private final FileScanTask manifestListTask;
private final long referenceSnapshotId;

ManifestListReadTask(FileIO io, Schema schema, Map<Integer, PartitionSpec> specs, FileScanTask manifestListTask) {
ManifestListReadTask(FileIO io, Schema schema, Map<Integer, PartitionSpec> specs, FileScanTask manifestListTask,
long referenceSnapshotId) {
this.io = io;
this.schema = schema;
this.specs = specs;
this.manifestListTask = manifestListTask;
this.referenceSnapshotId = referenceSnapshotId;
}

@Override
@@ -164,7 +182,7 @@ public CloseableIterable<StructLike> rows() {
.build()) {

CloseableIterable<StructLike> rowIterable = CloseableIterable.transform(manifests,
manifest -> ManifestsTable.manifestFileToRow(specs.get(manifest.partitionSpecId()), manifest));
manifest -> manifestFileToRow(specs.get(manifest.partitionSpecId()), manifest, referenceSnapshotId));

StructProjection projection = StructProjection.create(MANIFEST_FILE_SCHEMA, schema);
return CloseableIterable.transform(rowIterable, projection::wrap);
@@ -204,4 +222,188 @@ public Iterable<FileScanTask> split(long splitSize) {
return ImmutableList.of(this); // don't split
}
}

static StaticDataTask.Row manifestFileToRow(PartitionSpec spec, ManifestFile manifest, long referenceSnapshotId) {
return StaticDataTask.Row.of(
manifest.content().id(),
manifest.path(),
manifest.length(),
manifest.partitionSpecId(),
manifest.snapshotId(),
manifest.content() == ManifestContent.DATA ? manifest.addedFilesCount() : 0,
manifest.content() == ManifestContent.DATA ? manifest.existingFilesCount() : 0,
manifest.content() == ManifestContent.DATA ? manifest.deletedFilesCount() : 0,
manifest.content() == ManifestContent.DELETES ? manifest.addedFilesCount() : 0,
manifest.content() == ManifestContent.DELETES ? manifest.existingFilesCount() : 0,
manifest.content() == ManifestContent.DELETES ? manifest.deletedFilesCount() : 0,
ManifestsTable.partitionSummariesToRows(spec, manifest.partitions()),
referenceSnapshotId
);
}

private static class SnapshotEvaluator {

private final Expression boundExpr;

private SnapshotEvaluator(Expression expr, Types.StructType structType, boolean caseSensitive) {
this.boundExpr = Binder.bind(structType, expr, caseSensitive);
}

private boolean eval(Snapshot snapshot) {
return new SnapshotEvalVisitor().eval(snapshot);
}

private class SnapshotEvalVisitor extends BoundExpressionVisitor<Boolean> {

private long snapshotId;
private static final boolean ROWS_MIGHT_MATCH = true;
private static final boolean ROWS_CANNOT_MATCH = false;

private boolean eval(Snapshot snapshot) {
this.snapshotId = snapshot.snapshotId();
return ExpressionVisitors.visitEvaluator(boundExpr, this);
}

@Override
public Boolean alwaysTrue() {
return ROWS_MIGHT_MATCH;
}

@Override
public Boolean alwaysFalse() {
return ROWS_CANNOT_MATCH;
}

@Override
public Boolean not(Boolean result) {
return !result;
}

@Override
public Boolean and(Boolean leftResult, Boolean rightResult) {
return leftResult && rightResult;
}

@Override
public Boolean or(Boolean leftResult, Boolean rightResult) {
return leftResult || rightResult;
}

@Override
public <T> Boolean isNull(BoundReference<T> ref) {
if (isSnapshotRef(ref)) {
return ROWS_CANNOT_MATCH; // reference_snapshot_id is never null
} else {
return ROWS_MIGHT_MATCH;
}
}

@Override
public <T> Boolean notNull(BoundReference<T> ref) {
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean isNaN(BoundReference<T> ref) {
if (isSnapshotRef(ref)) {
return ROWS_CANNOT_MATCH; // reference_snapshot_id is never nan
} else {
return ROWS_MIGHT_MATCH;
}
}

@Override
public <T> Boolean notNaN(BoundReference<T> ref) {
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
return compareSnapshotRef(ref, lit, compareResult -> compareResult < 0);
}

@Override
public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
return compareSnapshotRef(ref, lit, compareResult -> compareResult <= 0);
}

@Override
public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
return compareSnapshotRef(ref, lit, compareResult -> compareResult > 0);
}

@Override
public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
return compareSnapshotRef(ref, lit, compareResult -> compareResult >= 0);
}

@Override
public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
return compareSnapshotRef(ref, lit, compareResult -> compareResult == 0);
}

@Override
public <T> Boolean notEq(BoundReference<T> ref, Literal<T> lit) {
return compareSnapshotRef(ref, lit, compareResult -> compareResult != 0);
}

@Override
public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) {
if (isSnapshotRef(ref)) {
Comparator<Object> longComparator = Comparators.forType(Types.LongType.get());
boolean noneMatch = literalSet.stream().noneMatch(lit -> longComparator.compare(snapshotId, lit) == 0);
if (noneMatch) {
return ROWS_CANNOT_MATCH;
}
}
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean notIn(BoundReference<T> ref, Set<T> literalSet) {
if (isSnapshotRef(ref)) {
Comparator<Object> longComparator = Comparators.forType(Types.LongType.get());
boolean anyMatch = literalSet.stream().anyMatch(lit -> longComparator.compare(snapshotId, lit) == 0);
if (anyMatch) {
return ROWS_CANNOT_MATCH;
}
}
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean startsWith(BoundReference<T> ref, Literal<T> lit) {
return ROWS_MIGHT_MATCH;
}

@Override
public <T> Boolean notStartsWith(BoundReference<T> ref, Literal<T> lit) {
return ROWS_MIGHT_MATCH;
}

/**
* Comparison of snapshot reference and literal, using long comparator.
*
* @param ref bound reference, comparison attempted only if reference is for reference_snapshot_id
* @param lit literal value to compare with snapshot id.
* @param desiredResult function to apply to long comparator result, returns true if result is as expected.
* @return false if comparator does not achieve desired result, true otherwise
*/
private <T> Boolean compareSnapshotRef(BoundReference<T> ref, Literal<T> lit,
Function<Integer, Boolean> desiredResult) {
if (isSnapshotRef(ref)) {
Literal<Long> longLit = lit.to(Types.LongType.get());
int cmp = longLit.comparator().compare(snapshotId, longLit.value());
if (!desiredResult.apply(cmp)) {
return ROWS_CANNOT_MATCH;
}
}
return ROWS_MIGHT_MATCH;
}

private <T> boolean isSnapshotRef(BoundReference<T> ref) {
return ref.fieldId() == REF_SNAPSHOT_ID;
}
}
}
}

0 comments on commit 7e30bec

Please sign in to comment.