diff --git a/api/src/main/java/org/apache/iceberg/TableScan.java b/api/src/main/java/org/apache/iceberg/TableScan.java
index 5bbcf1395da7..70f2b2f2f501 100644
--- a/api/src/main/java/org/apache/iceberg/TableScan.java
+++ b/api/src/main/java/org/apache/iceberg/TableScan.java
@@ -131,6 +131,25 @@ default TableScan select(String... columns) {
*/
Expression filter();
+ /**
+ * Create a table scan that reads data appended after {@code fromSnapshotId},
+ * exclusive, up to {@code toSnapshotId}, inclusive.
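+ * <p>
+ * A usage sketch; {@code lastReadSnapshotId} is an illustrative caller-tracked variable, not part of the API:
+ * <pre>
+ * // lastReadSnapshotId is tracked by the caller; shown for illustration only
+ * TableScan scan = table.newScan()
+ * .appendsBetween(lastReadSnapshotId, table.currentSnapshot().snapshotId());
+ * </pre>
+ *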
+ * @param fromSnapshotId the last snapshot id read by the user, exclusive
+ * @param toSnapshotId read append data up to and including this snapshot id
+ * @return a table scan for appended data in the range ({@code fromSnapshotId}, {@code toSnapshotId}]
+ */
+ TableScan appendsBetween(long fromSnapshotId, long toSnapshotId);
+
+ /**
+ * Create a table scan that reads data appended after {@code fromSnapshotId},
+ * exclusive, up to the current snapshot, inclusive.
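+ * <p>
+ * A usage sketch; {@code lastReadSnapshotId} is an illustrative caller-tracked variable, not part of the API:
+ * <pre>
+ * // lastReadSnapshotId is tracked by the caller; shown for illustration only
+ * TableScan scan = table.newScan().appendsAfter(lastReadSnapshotId);
+ * </pre>
+ *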
+ * @param fromSnapshotId the last snapshot id read by the user, exclusive
+ * @return a table scan for appended data in the range ({@code fromSnapshotId}, currentSnapshotId]
+ */
+ TableScan appendsAfter(long fromSnapshotId);
+
/**
* Plan the {@link FileScanTask files} that will be read by this scan.
*
diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
index 84d852c4215a..fb0e5f7dc0ab 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
@@ -80,6 +80,26 @@ protected BaseTableScan(TableOperations ops, Table table, Long snapshotId, Schem
this.options = options != null ? options : ImmutableMap.of();
}
+ protected TableOperations tableOps() {
+ return ops;
+ }
+
+ protected Long snapshotId() {
+ return snapshotId;
+ }
+
+ protected boolean colStats() {
+ return colStats;
+ }
+
+ protected Collection<String> selectedColumns() {
+ return selectedColumns;
+ }
+
+ protected ImmutableMap<String, String> options() {
+ return options;
+ }
+
@SuppressWarnings("checkstyle:HiddenField")
protected abstract long targetSplitSize(TableOperations ops);
@@ -98,10 +118,20 @@ public Table table() {
return table;
}
+ @Override
+ public TableScan appendsBetween(long fromSnapshotId, long toSnapshotId) {
+ throw new UnsupportedOperationException("Incremental scan is not supported");
+ }
+
+ @Override
+ public TableScan appendsAfter(long fromSnapshotId) {
+ throw new UnsupportedOperationException("Incremental scan is not supported");
+ }
+
@Override
public TableScan useSnapshot(long scanSnapshotId) {
Preconditions.checkArgument(this.snapshotId == null,
- "Cannot override snapshot, already set to id=%s", scanSnapshotId);
+ "Cannot override snapshot, already set to id=%s", snapshotId);
Preconditions.checkArgument(ops.current().snapshot(scanSnapshotId) != null,
"Cannot find snapshot with ID %s", scanSnapshotId);
return newRefinedScan(
diff --git a/core/src/main/java/org/apache/iceberg/DataTableScan.java b/core/src/main/java/org/apache/iceberg/DataTableScan.java
index 019e24e15199..10f46d925eb5 100644
--- a/core/src/main/java/org/apache/iceberg/DataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/DataTableScan.java
@@ -19,17 +19,12 @@
package org.apache.iceberg;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
import java.util.Collection;
import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.expressions.ManifestEvaluator;
-import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.util.ParallelIterable;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,15 +32,15 @@
public class DataTableScan extends BaseTableScan {
private static final Logger LOG = LoggerFactory.getLogger(DataTableScan.class);
- private static final ImmutableList<String> SCAN_COLUMNS = ImmutableList.of(
+ static final ImmutableList<String> SCAN_COLUMNS = ImmutableList.of(
"snapshot_id", "file_path", "file_ordinal", "file_format", "block_size_in_bytes",
"file_size_in_bytes", "record_count", "partition", "key_metadata"
);
- private static final ImmutableList<String> SCAN_WITH_STATS_COLUMNS = ImmutableList.<String>builder()
+ static final ImmutableList<String> SCAN_WITH_STATS_COLUMNS = ImmutableList.<String>builder()
.addAll(SCAN_COLUMNS)
.add("value_counts", "null_value_counts", "lower_bounds", "upper_bounds", "column_sizes")
.build();
- private static final boolean PLAN_SCANS_WITH_WORKER_POOL =
+ static final boolean PLAN_SCANS_WITH_WORKER_POOL =
SystemProperties.getBoolean(SystemProperties.SCAN_THREAD_POOL_ENABLED, true);
public DataTableScan(TableOperations ops, Table table) {
@@ -58,6 +53,24 @@ protected DataTableScan(TableOperations ops, Table table, Long snapshotId, Schem
super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
}
+ @Override
+ public TableScan appendsBetween(long fromSnapshotId, long toSnapshotId) {
+ Long scanSnapshotId = snapshotId();
+ Preconditions.checkState(scanSnapshotId == null,
+ "Cannot enable incremental scan, scan-snapshot set to id=%s", scanSnapshotId);
+ return new IncrementalDataTableScan(
+ tableOps(), table(), schema(), filter(), isCaseSensitive(), colStats(), selectedColumns(), options(),
+ fromSnapshotId, toSnapshotId);
+ }
+
+ @Override
+ public TableScan appendsAfter(long fromSnapshotId) {
+ Snapshot currentSnapshot = table().currentSnapshot();
+ Preconditions.checkState(currentSnapshot != null, "Cannot scan appends after %s, there is no current snapshot",
+ fromSnapshotId);
+ return appendsBetween(fromSnapshotId, currentSnapshot.snapshotId());
+ }
+
@Override
protected TableScan newRefinedScan(
TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
@@ -70,39 +83,18 @@ protected TableScan newRefinedScan(
@Override
public CloseableIterable<FileScanTask> planFiles(TableOperations ops, Snapshot snapshot,
Expression rowFilter, boolean caseSensitive, boolean colStats) {
- LoadingCache<Integer, ManifestEvaluator> evalCache = Caffeine.newBuilder().build(specId -> {
- PartitionSpec spec = ops.current().spec(specId);
- return ManifestEvaluator.forRowFilter(rowFilter, spec, caseSensitive);
- });
-
- Iterable<ManifestFile> nonEmptyManifests = Iterables.filter(snapshot.manifests(),
- manifest -> manifest.hasAddedFiles() || manifest.hasExistingFiles());
- Iterable<ManifestFile> matchingManifests = Iterables.filter(nonEmptyManifests,
- manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));
-
- Iterable<CloseableIterable<FileScanTask>> readers = Iterables.transform(
- matchingManifests,
- manifest -> {
- ManifestReader reader = ManifestReader.read(
- ops.io().newInputFile(manifest.path()),
- ops.current().specsById());
- PartitionSpec spec = ops.current().spec(manifest.partitionSpecId());
- String schemaString = SchemaParser.toJson(spec.schema());
- String specString = PartitionSpecParser.toJson(spec);
- ResidualEvaluator residuals = ResidualEvaluator.of(spec, rowFilter, caseSensitive);
- return CloseableIterable.transform(
- reader.filterRows(rowFilter)
- .caseSensitive(caseSensitive)
- .select(colStats ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS),
- file -> new BaseFileScanTask(file, schemaString, specString, residuals)
- );
- });
+ ManifestGroup manifestGroup = new ManifestGroup(ops.io(), snapshot.manifests())
+ .caseSensitive(caseSensitive)
+ .select(colStats ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS)
+ .filterData(rowFilter)
+ .specsById(ops.current().specsById())
+ .ignoreDeleted();
if (PLAN_SCANS_WITH_WORKER_POOL && snapshot.manifests().size() > 1) {
- return new ParallelIterable<>(readers, ThreadPools.getWorkerPool());
- } else {
- return CloseableIterable.concat(readers);
+ manifestGroup = manifestGroup.planWith(ThreadPools.getWorkerPool());
}
+
+ return manifestGroup.planFiles();
}
@Override
diff --git a/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java b/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java
new file mode 100644
index 000000000000..aace44ad519e
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import java.util.Collection;
+import java.util.List;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.ThreadPools;
+
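+/**
+ * A {@link TableScan} for planning only the data files appended in the snapshot range
+ * (fromSnapshotId, toSnapshotId]. Replace and delete snapshots within the range are
+ * skipped; overwrite snapshots are rejected with an UnsupportedOperationException.
+ */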
+class IncrementalDataTableScan extends DataTableScan {
+ private long fromSnapshotId;
+ private long toSnapshotId;
+
+ IncrementalDataTableScan(TableOperations ops, Table table, Schema schema,
+ Expression rowFilter, boolean caseSensitive, boolean colStats,
+ Collection<String> selectedColumns, ImmutableMap<String, String> options,
+ long fromSnapshotId, long toSnapshotId) {
+ super(ops, table, null, schema, rowFilter, caseSensitive, colStats, selectedColumns, options);
+ Preconditions.checkArgument(fromSnapshotId != toSnapshotId, "fromSnapshotId and toSnapshotId cannot be the same");
+ this.fromSnapshotId = fromSnapshotId;
+ this.toSnapshotId = toSnapshotId;
+ }
+
+ @Override
+ public TableScan asOfTime(long timestampMillis) {
+ throw new UnsupportedOperationException(String.format(
+ "Cannot scan table as of time %s: configured for incremental data in snapshots (%s, %s]",
+ timestampMillis, fromSnapshotId, toSnapshotId));
+ }
+
+ @Override
+ public TableScan useSnapshot(long scanSnapshotId) {
+ throw new UnsupportedOperationException(String.format(
+ "Cannot scan table using scan snapshot id %s: configured for incremental data in snapshots (%s, %s]",
+ scanSnapshotId, fromSnapshotId, toSnapshotId));
+ }
+
+ @Override
+ public TableScan appendsBetween(long newFromSnapshotId, long newToSnapshotId) {
+ Preconditions.checkArgument(
+ table().snapshot(newFromSnapshotId) != null, "fromSnapshotId: %s does not exist", newFromSnapshotId);
+ Preconditions.checkArgument(
+ table().snapshot(newToSnapshotId) != null, "toSnapshotId: %s does not exist", newToSnapshotId);
+ return new IncrementalDataTableScan(
+ tableOps(), table(), schema(), filter(), isCaseSensitive(), colStats(), selectedColumns(), options(),
+ newFromSnapshotId, newToSnapshotId);
+ }
+
+ @Override
+ public TableScan appendsAfter(long newFromSnapshotId) {
+ final Snapshot currentSnapshot = table().currentSnapshot();
+ Preconditions.checkState(currentSnapshot != null,
+ "Cannot scan appends after %s, there is no current snapshot", newFromSnapshotId);
+ return appendsBetween(newFromSnapshotId, currentSnapshot.snapshotId());
+ }
+
+ @Override
+ public CloseableIterable<FileScanTask> planFiles() {
+ // TODO: publish an incremental appends scan event
+ List<Snapshot> snapshots = snapshotsWithin(table(), fromSnapshotId, toSnapshotId);
+ Iterable<CloseableIterable<FileScanTask>> files = Iterables.transform(snapshots, this::planFiles);
+ return CloseableIterable.concat(files);
+ }
+
+ private CloseableIterable<FileScanTask> planFiles(Snapshot snapshot) {
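+ // Keep only manifests written by this snapshot and, within those, only entries
+ // the snapshot ADDED, so each snapshot contributes exactly the files it appended.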
+ Predicate<ManifestFile> matchingManifests = manifest -> manifest.snapshotId() == snapshot.snapshotId();
+
+ Predicate<ManifestEntry> matchingManifestEntries =
+ manifestEntry ->
+ manifestEntry.snapshotId() == snapshot.snapshotId() &&
+ manifestEntry.status() == ManifestEntry.Status.ADDED;
+
+ ManifestGroup manifestGroup = new ManifestGroup(tableOps().io(), snapshot.manifests())
+ .caseSensitive(isCaseSensitive())
+ .select(colStats() ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS)
+ .filterData(filter())
+ .filterManifests(matchingManifests)
+ .filterManifestEntries(matchingManifestEntries)
+ .specsById(tableOps().current().specsById())
+ .ignoreDeleted();
+
+ if (PLAN_SCANS_WITH_WORKER_POOL && snapshot.manifests().size() > 1) {
+ manifestGroup = manifestGroup.planWith(ThreadPools.getWorkerPool());
+ }
+
+ return manifestGroup.planFiles();
+ }
+
+ @Override
+ @SuppressWarnings("checkstyle:HiddenField")
+ protected TableScan newRefinedScan(
+ TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
+ boolean caseSensitive, boolean colStats, Collection<String> selectedColumns,
+ ImmutableMap<String, String> options) {
+ return new IncrementalDataTableScan(
+ ops, table, schema, rowFilter, caseSensitive, colStats, selectedColumns, options,
+ fromSnapshotId, toSnapshotId);
+ }
+
+ private static List<Snapshot> snapshotsWithin(Table table, long fromSnapshotId, long toSnapshotId) {
+ List<Long> snapshotIds = SnapshotUtil.snapshotIdsBetween(table, fromSnapshotId, toSnapshotId);
+ List<Snapshot> snapshots = Lists.newArrayList();
+ for (Long snapshotId : snapshotIds) {
+ Snapshot snapshot = table.snapshot(snapshotId);
+ // for now, incremental scan supports only appends
+ if (snapshot.operation().equals(DataOperations.APPEND)) {
+ snapshots.add(snapshot);
+ } else if (snapshot.operation().equals(DataOperations.OVERWRITE)) {
+ throw new UnsupportedOperationException(
+ String.format("Found %s operation, cannot support incremental data in snapshots (%s, %s]",
+ DataOperations.OVERWRITE, fromSnapshotId, toSnapshotId));
+ }
+ }
+ return snapshots;
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/ManifestGroup.java b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
index 54a85de72333..968137a73c8a 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestGroup.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
@@ -21,27 +21,33 @@
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.function.BiFunction;
+import java.util.function.Predicate;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ManifestEvaluator;
import org.apache.iceberg.expressions.Projections;
+import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ParallelIterable;
class ManifestGroup {
private static final Types.StructType EMPTY_STRUCT = Types.StructType.of();
private final FileIO io;
private final Set<ManifestFile> manifests;
+ private Predicate<ManifestFile> manifestPredicate;
+ private Predicate<ManifestEntry> manifestEntryPredicate;
private Map<Integer, PartitionSpec> specsById;
private Expression dataFilter;
private Expression fileFilter;
@@ -50,6 +56,7 @@ class ManifestGroup {
private boolean ignoreExisting;
private List<String> columns;
private boolean caseSensitive;
+ private ExecutorService executorService;
ManifestGroup(FileIO io, Iterable<ManifestFile> manifests) {
this.io = io;
@@ -59,8 +66,10 @@ class ManifestGroup {
this.partitionFilter = Expressions.alwaysTrue();
this.ignoreDeleted = false;
this.ignoreExisting = false;
- this.columns = ImmutableList.of("*");
+ this.columns = ManifestReader.ALL_COLUMNS;
this.caseSensitive = true;
+ this.manifestPredicate = m -> true;
+ this.manifestEntryPredicate = e -> true;
}
ManifestGroup specsById(Map<Integer, PartitionSpec> newSpecsById) {
@@ -83,6 +92,16 @@ ManifestGroup filterPartitions(Expression newPartitionFilter) {
return this;
}
+ ManifestGroup filterManifests(Predicate<ManifestFile> newManifestPredicate) {
+ this.manifestPredicate = manifestPredicate.and(newManifestPredicate);
+ return this;
+ }
+
+ ManifestGroup filterManifestEntries(Predicate<ManifestEntry> newManifestEntryPredicate) {
+ this.manifestEntryPredicate = manifestEntryPredicate.and(newManifestEntryPredicate);
+ return this;
+ }
+
ManifestGroup ignoreDeleted() {
this.ignoreDeleted = true;
return this;
@@ -103,7 +122,35 @@ ManifestGroup caseSensitive(boolean newCaseSensitive) {
return this;
}
+ ManifestGroup planWith(ExecutorService newExecutorService) {
+ this.executorService = newExecutorService;
+ return this;
+ }
+
/**
+ * Returns an iterable of scan tasks. It is safe to add entries of this iterable
+ * to a collection because the {@link DataFile} in each {@link FileScanTask} is
+ * defensively copied.
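+ * <p>
+ * A usage sketch; {@code io}, {@code manifests} and {@code specs} are assumed to be in scope:
+ * <pre>
+ * // specs is the table's map of spec id to PartitionSpec, e.g. ops.current().specsById()
+ * CloseableIterable&lt;FileScanTask&gt; tasks = new ManifestGroup(io, manifests)
+ * .specsById(specs)
+ * .ignoreDeleted()
+ * .planFiles();
+ * </pre>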
+ * @return a {@link CloseableIterable} of {@link FileScanTask}
+ */
+ public CloseableIterable<FileScanTask> planFiles() {
+ Iterable<CloseableIterable<FileScanTask>> tasks = entries((manifest, entries) -> {
+ PartitionSpec spec = specsById.get(manifest.partitionSpecId());
+ String schemaString = SchemaParser.toJson(spec.schema());
+ String specString = PartitionSpecParser.toJson(spec);
+ ResidualEvaluator residuals = ResidualEvaluator.of(spec, dataFilter, caseSensitive);
+ return CloseableIterable.transform(entries, e -> new BaseFileScanTask(
+ e.copy().file(), schemaString, specString, residuals));
+ });
+
+ if (executorService != null) {
+ return new ParallelIterable<>(tasks, executorService);
+ } else {
+ return CloseableIterable.concat(tasks);
+ }
+ }
+
+ /**
* Returns an iterable for manifest entries in the set of manifests.
*
* Entries are not copied and it is the caller's responsibility to make defensive copies if
@@ -112,11 +159,16 @@ ManifestGroup caseSensitive(boolean newCaseSensitive) {
* @return a CloseableIterable of manifest entries.
*/
public CloseableIterable<ManifestEntry> entries() {
+ return CloseableIterable.concat(entries((manifest, entries) -> entries));
+ }
+
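+ // Applies entryFn to each matching manifest's filtered entries, producing one
+ // iterable per manifest so callers can concatenate serially or plan in parallel.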
+ private <T> Iterable<CloseableIterable<T>> entries(
+ BiFunction<ManifestFile, CloseableIterable<ManifestEntry>, CloseableIterable<T>> entryFn) {
LoadingCache<Integer, ManifestEvaluator> evalCache = specsById == null ?
null : Caffeine.newBuilder().build(specId -> {
PartitionSpec spec = specsById.get(specId);
return ManifestEvaluator.forPartitionFilter(
- Expressions.and(partitionFilter, Projections.inclusive(spec).project(dataFilter)),
+ Expressions.and(partitionFilter, Projections.inclusive(spec, caseSensitive).project(dataFilter)),
spec, caseSensitive);
});
@@ -141,7 +193,9 @@ public CloseableIterable<ManifestEntry> entries() {
manifest -> manifest.hasAddedFiles() || manifest.hasDeletedFiles());
}
- Iterable<CloseableIterable<ManifestEntry>> readers = Iterables.transform(
+ matchingManifests = Iterables.filter(matchingManifests, manifestPredicate::test);
+
+ Iterable<CloseableIterable<T>> readers = Iterables.transform(
matchingManifests,
manifest -> {
ManifestReader reader = ManifestReader.read(
@@ -151,6 +205,7 @@ public CloseableIterable<ManifestEntry> entries() {
FilteredManifest filtered = reader
.filterRows(dataFilter)
.filterPartitions(partitionFilter)
+ .caseSensitive(caseSensitive)
.select(columns);
CloseableIterable<ManifestEntry> entries = filtered.allEntries();
@@ -168,9 +223,10 @@ public CloseableIterable<ManifestEntry> entries() {
entry -> evaluator.eval((GenericDataFile) entry.file()));
}
- return entries;
+ entries = CloseableIterable.filter(entries, manifestEntryPredicate);
+ return entryFn.apply(manifest, entries);
});
- return CloseableIterable.concat(readers);
+ return readers;
}
}
diff --git a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
index ac38a86bc2ad..8886a5abc16c 100644
--- a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
@@ -42,6 +42,16 @@ public static List<Long> currentAncestors(Table table) {
return ancestorIds(table.currentSnapshot(), table::snapshot);
}
+ /**
+ * Returns the list of snapshot ids in the range ({@code fromSnapshotId}, {@code toSnapshotId}].
+ * <p>
+ * This method assumes that {@code fromSnapshotId} is an ancestor of {@code toSnapshotId}.
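+ * <p>
+ * For example, given a hypothetical linear history 1 &lt;- 2 &lt;- 3 &lt;- 4, calling
+ * {@code snapshotIdsBetween(table, 1, 4)} returns the ids 4, 3 and 2.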
+ */
+ public static List<Long> snapshotIdsBetween(Table table, long fromSnapshotId, long toSnapshotId) {
+ List<Long> snapshotIds = Lists.newArrayList(ancestorIds(table.snapshot(toSnapshotId),
+ snapshotId -> snapshotId != fromSnapshotId ? table.snapshot(snapshotId) : null));
+ return snapshotIds;
+ }
+
public static List<Long> ancestorIds(Snapshot snapshot, Function<Long, Snapshot> lookup) {
List<Long> ancestorIds = Lists.newArrayList();
Snapshot current = snapshot;
diff --git a/core/src/test/java/org/apache/iceberg/TestIncrementalDataTableScan.java b/core/src/test/java/org/apache/iceberg/TestIncrementalDataTableScan.java
new file mode 100644
index 000000000000..6831135cdc78
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestIncrementalDataTableScan.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import java.util.List;
+import java.util.Set;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestIncrementalDataTableScan extends TableTestBase {
+
+ @Before
+ public void setupTableProperties() {
+ table.updateProperties().set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "2").commit();
+ }
+
+ @Test
+ public void testInvalidScans() {
+ add(table.newAppend(), files("A"));
+ AssertHelpers.assertThrows(
+ "from and to snapshots cannot be the same, since from snapshot is exclusive and not part of the scan",
+ IllegalArgumentException.class, "fromSnapshotId and toSnapshotId cannot be the same",
+ () -> appendsBetweenScan(1, 1));
+ }
+
+ @Test
+ public void testAppends() {
+ add(table.newAppend(), files("A")); // 1
+ add(table.newAppend(), files("B"));
+ add(table.newAppend(), files("C"));
+ add(table.newAppend(), files("D"));
+ add(table.newAppend(), files("E")); // 5
+ Assert.assertEquals(Sets.newHashSet("B", "C", "D", "E"), appendsBetweenScan(1, 5));
+ Assert.assertEquals(Sets.newHashSet("C", "D", "E"), appendsBetweenScan(2, 5));
+ }
+
+ @Test
+ public void testReplaceOverwritesDeletes() {
+ add(table.newAppend(), files("A")); // 1
+ add(table.newAppend(), files("B"));
+ add(table.newAppend(), files("C"));
+ add(table.newAppend(), files("D"));
+ add(table.newAppend(), files("E")); // 5
+ Assert.assertEquals(Sets.newHashSet("B", "C", "D", "E"), appendsBetweenScan(1, 5));
+
+ replace(table.newRewrite(), files("A", "B", "C"), files("F", "G")); // 6
+ Assert.assertEquals("Replace commits are ignored", Sets.newHashSet("B", "C", "D", "E"), appendsBetweenScan(1, 6));
+ Assert.assertEquals(Sets.newHashSet("E"), appendsBetweenScan(4, 6));
+ // 6th snapshot is a replace. No new content is added
+ Assert.assertTrue("Replace commits are ignored", appendsBetweenScan(5, 6).isEmpty());
+ delete(table.newDelete(), files("D")); // 7
+ // 7th snapshot is a delete.
+ Assert.assertTrue("Replace and delete commits are ignored", appendsBetweenScan(5, 7).isEmpty());
+ Assert.assertTrue("Delete commits are ignored", appendsBetweenScan(6, 7).isEmpty());
+ add(table.newAppend(), files("I")); // 8
+ // snapshots 6 and 7 are ignored
+ Assert.assertEquals(Sets.newHashSet("B", "C", "D", "E", "I"), appendsBetweenScan(1, 8));
+ Assert.assertEquals(Sets.newHashSet("I"), appendsBetweenScan(6, 8));
+ Assert.assertEquals(Sets.newHashSet("I"), appendsBetweenScan(7, 8));
+
+ overwrite(table.newOverwrite(), files("H"), files("E")); // 9
+ AssertHelpers.assertThrows(
+ "Overwrites are not supported for Incremental scan", UnsupportedOperationException.class,
+ "Found overwrite operation, cannot support incremental data in snapshots (8, 9]",
+ () -> appendsBetweenScan(8, 9));
+ }
+
+ @Test
+ public void testTransactions() {
+ Transaction transaction = table.newTransaction();
+
+ add(transaction.newAppend(), files("A")); // 1
+ add(transaction.newAppend(), files("B"));
+ add(transaction.newAppend(), files("C"));
+ add(transaction.newAppend(), files("D"));
+ add(transaction.newAppend(), files("E")); // 5
+ transaction.commitTransaction();
+ Assert.assertEquals(Sets.newHashSet("B", "C", "D", "E"), appendsBetweenScan(1, 5));
+
+ transaction = table.newTransaction();
+ replace(transaction.newRewrite(), files("A", "B", "C"), files("F", "G")); // 6
+ transaction.commitTransaction();
+ Assert.assertEquals("Replace commits are ignored", Sets.newHashSet("B", "C", "D", "E"), appendsBetweenScan(1, 6));
+ Assert.assertEquals(Sets.newHashSet("E"), appendsBetweenScan(4, 6));
+ // 6th snapshot is a replace. No new content is added
+ Assert.assertTrue("Replace commits are ignored", appendsBetweenScan(5, 6).isEmpty());
+
+ transaction = table.newTransaction();
+ delete(transaction.newDelete(), files("D")); // 7
+ transaction.commitTransaction();
+ // 7th snapshot is a delete.
+ Assert.assertTrue("Replace and delete commits are ignored", appendsBetweenScan(5, 7).isEmpty());
+ Assert.assertTrue("Delete commits are ignored", appendsBetweenScan(6, 7).isEmpty());
+
+ transaction = table.newTransaction();
+ add(transaction.newAppend(), files("I")); // 8
+ transaction.commitTransaction();
+ // snapshots 6 and 7 are ignored
+ Assert.assertEquals(Sets.newHashSet("B", "C", "D", "E", "I"), appendsBetweenScan(1, 8));
+ Assert.assertEquals(Sets.newHashSet("I"), appendsBetweenScan(6, 8));
+ Assert.assertEquals(Sets.newHashSet("I"), appendsBetweenScan(7, 8));
+ }
+
+ @Test
+ public void testRollbacks() {
+ add(table.newAppend(), files("A")); // 1
+ add(table.newAppend(), files("B"));
+ add(table.newAppend(), files("C")); // 3
+ // Go back to snapshot "B"
+ table.rollback().toSnapshotId(2).commit(); // 2
+ Assert.assertEquals(2, table.currentSnapshot().snapshotId());
+ Assert.assertEquals(Sets.newHashSet("B"), appendsBetweenScan(1, 2));
+ Assert.assertEquals(Sets.newHashSet("B"), appendsAfterScan(1));
+
+ Transaction transaction = table.newTransaction();
+ add(transaction.newAppend(), files("D")); // 4
+ add(transaction.newAppend(), files("E")); // 5
+ add(transaction.newAppend(), files("F"));
+ transaction.commitTransaction();
+ // Go back to snapshot "E"
+ table.rollback().toSnapshotId(5).commit();
+ Assert.assertEquals(5, table.currentSnapshot().snapshotId());
+ Assert.assertEquals(Sets.newHashSet("B", "D", "E"), appendsBetweenScan(1, 5));
+ Assert.assertEquals(Sets.newHashSet("B", "D", "E"), appendsAfterScan(1));
+ }
+
+ private static DataFile file(String name) {
+ return DataFiles.builder(SPEC)
+ .withPath(name + ".parquet")
+ .withFileSizeInBytes(0)
+ .withPartitionPath("data_bucket=0") // easy way to set partition data for now
+ .withRecordCount(1)
+ .build();
+ }
+
+ private static void add(AppendFiles appendFiles, List<DataFile> adds) {
+ for (DataFile f : adds) {
+ appendFiles.appendFile(f);
+ }
+ appendFiles.commit();
+ }
+
+ private static void delete(DeleteFiles deleteFiles, List<DataFile> deletes) {
+ for (DataFile f : deletes) {
+ deleteFiles.deleteFile(f);
+ }
+ deleteFiles.commit();
+ }
+
+ private static void replace(RewriteFiles rewriteFiles, List<DataFile> deletes, List<DataFile> adds) {
+ rewriteFiles.rewriteFiles(Sets.newHashSet(deletes), Sets.newHashSet(adds));
+ rewriteFiles.commit();
+ }
+
+ private static void overwrite(OverwriteFiles overwriteFiles, List<DataFile> adds, List<DataFile> deletes) {
+ for (DataFile f : adds) {
+ overwriteFiles.addFile(f);
+ }
+ for (DataFile f : deletes) {
+ overwriteFiles.deleteFile(f);
+ }
+ overwriteFiles.commit();
+ }
+
+ private static List<DataFile> files(String... names) {
+ return Lists.transform(Lists.newArrayList(names), TestIncrementalDataTableScan::file);
+ }
+
+ private Set<String> appendsAfterScan(long fromSnapshotId) {
+ final TableScan appendsAfter = table.newScan().appendsAfter(fromSnapshotId);
+ return filesToScan(appendsAfter);
+ }
+
+ private Set<String> appendsBetweenScan(long fromSnapshotId, long toSnapshotId) {
+ Snapshot s1 = table.snapshot(fromSnapshotId);
+ Snapshot s2 = table.snapshot(toSnapshotId);
+ TableScan appendsBetween = table.newScan().appendsBetween(s1.snapshotId(), s2.snapshotId());
+ return filesToScan(appendsBetween);
+ }
+
+ private static Set<String> filesToScan(TableScan tableScan) {
+ Iterable<String> filesToRead = Iterables.transform(tableScan.planFiles(), t -> {
+ String path = t.file().path().toString();
+ return path.split("\\.")[0];
+ });
+ return Sets.newHashSet(filesToRead);
+ }
+}