From 67ed1f28754a02ce97eaa82dc066dfaf76d27cfd Mon Sep 17 00:00:00 2001
From: Ratandeep Ratt
Date: Sun, 12 Jan 2020 14:53:38 -0800
Subject: [PATCH] Incremental scan implementation

---
 .../java/org/apache/iceberg/TableScan.java    |  20 ++
 .../org/apache/iceberg/BaseTableScan.java     |  32 ++-
 .../org/apache/iceberg/DataTableScan.java     |  70 +++---
 .../iceberg/IncrementalDataTableScan.java     | 140 ++++++++++++
 .../org/apache/iceberg/ManifestGroup.java     |  68 +++++-
 .../org/apache/iceberg/util/SnapshotUtil.java |  10 +
 .../iceberg/TestIncrementalDataTableScan.java | 209 ++++++++++++++++++
 7 files changed, 503 insertions(+), 46 deletions(-)
 create mode 100644 core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java
 create mode 100644 core/src/test/java/org/apache/iceberg/TestIncrementalDataTableScan.java

diff --git a/api/src/main/java/org/apache/iceberg/TableScan.java b/api/src/main/java/org/apache/iceberg/TableScan.java
index 5bbcf1395da7..70f2b2f2f501 100644
--- a/api/src/main/java/org/apache/iceberg/TableScan.java
+++ b/api/src/main/java/org/apache/iceberg/TableScan.java
@@ -131,6 +131,25 @@ default TableScan select(String... columns) {
    */
   Expression filter();
 
+  /**
+   * Create a table scan that reads append data from {@code fromSnapshotId},
+   * exclusive, up to {@code toSnapshotId}, inclusive.
+   * @param fromSnapshotId the last snapshot id read by the user, exclusive
+   * @param toSnapshotId read append data up to and including this snapshot id
+   * @return a table scan which can read append data from {@code fromSnapshotId},
+   * exclusive, up to {@code toSnapshotId}, inclusive
+   */
+  TableScan appendsBetween(long fromSnapshotId, long toSnapshotId);
+
+  /**
+   * Create a table scan that reads append data from {@code fromSnapshotId},
+   * exclusive, up to the current snapshot, inclusive.
+   * @param fromSnapshotId the last snapshot id read by the user, exclusive
+   * @return a table scan which can read append data from {@code fromSnapshotId},
+   * exclusive, up to the current snapshot, inclusive
+   */
+  TableScan appendsAfter(long fromSnapshotId);
+
   /**
    * Plan the {@link FileScanTask files} that will be read by this scan.
    *

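For orientation, appendsBetween and appendsAfter above are the only public API additions in this patch; the remaining files are implementation and tests. A minimal caller-side sketch of the intended usage (illustrative only, not part of the patch; the table instance and the snapshot ids are assumed to exist):

import java.io.IOException;
import java.io.UncheckedIOException;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.io.CloseableIterable;

public class ReadAppendsExample {
  // Prints the data files appended in the snapshot range (fromSnapshotId, toSnapshotId].
  static void printAppendedFiles(Table table, long fromSnapshotId, long toSnapshotId) {
    TableScan scan = table.newScan().appendsBetween(fromSnapshotId, toSnapshotId);
    try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
      for (FileScanTask task : tasks) {
        System.out.println("appended file: " + task.file().path());
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}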
@@ -180,4 +199,5 @@ default TableScan select(String... columns) { * @return true if case sensitive, false otherwise. */ boolean isCaseSensitive(); + } diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java b/core/src/main/java/org/apache/iceberg/BaseTableScan.java index 84d852c4215a..fb0e5f7dc0ab 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java +++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java @@ -80,6 +80,26 @@ protected BaseTableScan(TableOperations ops, Table table, Long snapshotId, Schem this.options = options != null ? options : ImmutableMap.of(); } + protected TableOperations tableOps() { + return ops; + } + + protected Long snapshotId() { + return snapshotId; + } + + protected boolean colStats() { + return colStats; + } + + protected Collection selectedColumns() { + return selectedColumns; + } + + protected ImmutableMap options() { + return options; + } + @SuppressWarnings("checkstyle:HiddenField") protected abstract long targetSplitSize(TableOperations ops); @@ -98,10 +118,20 @@ public Table table() { return table; } + @Override + public TableScan appendsBetween(long fromSnapshotId, long toSnapshotId) { + throw new UnsupportedOperationException("Incremental scan is not supported"); + } + + @Override + public TableScan appendsAfter(long fromSnapshotId) { + throw new UnsupportedOperationException("Incremental scan is not supported"); + } + @Override public TableScan useSnapshot(long scanSnapshotId) { Preconditions.checkArgument(this.snapshotId == null, - "Cannot override snapshot, already set to id=%s", scanSnapshotId); + "Cannot override snapshot, already set to id=%s", snapshotId); Preconditions.checkArgument(ops.current().snapshot(scanSnapshotId) != null, "Cannot find snapshot with ID %s", scanSnapshotId); return newRefinedScan( diff --git a/core/src/main/java/org/apache/iceberg/DataTableScan.java b/core/src/main/java/org/apache/iceberg/DataTableScan.java index 019e24e15199..10f46d925eb5 100644 --- a/core/src/main/java/org/apache/iceberg/DataTableScan.java +++ b/core/src/main/java/org/apache/iceberg/DataTableScan.java @@ -19,17 +19,12 @@ package org.apache.iceberg; -import com.github.benmanes.caffeine.cache.Caffeine; -import com.github.benmanes.caffeine.cache.LoadingCache; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; import java.util.Collection; import org.apache.iceberg.expressions.Expression; -import org.apache.iceberg.expressions.ManifestEvaluator; -import org.apache.iceberg.expressions.ResidualEvaluator; import org.apache.iceberg.io.CloseableIterable; -import org.apache.iceberg.util.ParallelIterable; import org.apache.iceberg.util.ThreadPools; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,15 +32,15 @@ public class DataTableScan extends BaseTableScan { private static final Logger LOG = LoggerFactory.getLogger(DataTableScan.class); - private static final ImmutableList SCAN_COLUMNS = ImmutableList.of( + static final ImmutableList SCAN_COLUMNS = ImmutableList.of( "snapshot_id", "file_path", "file_ordinal", "file_format", "block_size_in_bytes", "file_size_in_bytes", "record_count", "partition", "key_metadata" ); - private static final ImmutableList SCAN_WITH_STATS_COLUMNS = ImmutableList.builder() + static final ImmutableList SCAN_WITH_STATS_COLUMNS = ImmutableList.builder() .addAll(SCAN_COLUMNS) .add("value_counts", "null_value_counts", "lower_bounds", "upper_bounds", 
"column_sizes") .build(); - private static final boolean PLAN_SCANS_WITH_WORKER_POOL = + static final boolean PLAN_SCANS_WITH_WORKER_POOL = SystemProperties.getBoolean(SystemProperties.SCAN_THREAD_POOL_ENABLED, true); public DataTableScan(TableOperations ops, Table table) { @@ -58,6 +53,24 @@ protected DataTableScan(TableOperations ops, Table table, Long snapshotId, Schem super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, options); } + @Override + public TableScan appendsBetween(long fromSnapshotId, long toSnapshotId) { + Long scanSnapshotId = snapshotId(); + Preconditions.checkState(scanSnapshotId == null, + "Cannot enable incremental scan, scan-snapshot set to id=%s", scanSnapshotId); + return new IncrementalDataTableScan( + tableOps(), table(), schema(), filter(), isCaseSensitive(), colStats(), selectedColumns(), options(), + fromSnapshotId, toSnapshotId); + } + + @Override + public TableScan appendsAfter(long fromSnapshotId) { + Snapshot currentSnapshot = table().currentSnapshot(); + Preconditions.checkState(currentSnapshot != null, "Cannot scan appends after %s, there is no current snapshot", + fromSnapshotId); + return appendsBetween(fromSnapshotId, currentSnapshot.snapshotId()); + } + @Override protected TableScan newRefinedScan( TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter, @@ -70,39 +83,18 @@ protected TableScan newRefinedScan( @Override public CloseableIterable planFiles(TableOperations ops, Snapshot snapshot, Expression rowFilter, boolean caseSensitive, boolean colStats) { - LoadingCache evalCache = Caffeine.newBuilder().build(specId -> { - PartitionSpec spec = ops.current().spec(specId); - return ManifestEvaluator.forRowFilter(rowFilter, spec, caseSensitive); - }); - - Iterable nonEmptyManifests = Iterables.filter(snapshot.manifests(), - manifest -> manifest.hasAddedFiles() || manifest.hasExistingFiles()); - Iterable matchingManifests = Iterables.filter(nonEmptyManifests, - manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest)); - - Iterable> readers = Iterables.transform( - matchingManifests, - manifest -> { - ManifestReader reader = ManifestReader.read( - ops.io().newInputFile(manifest.path()), - ops.current().specsById()); - PartitionSpec spec = ops.current().spec(manifest.partitionSpecId()); - String schemaString = SchemaParser.toJson(spec.schema()); - String specString = PartitionSpecParser.toJson(spec); - ResidualEvaluator residuals = ResidualEvaluator.of(spec, rowFilter, caseSensitive); - return CloseableIterable.transform( - reader.filterRows(rowFilter) - .caseSensitive(caseSensitive) - .select(colStats ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS), - file -> new BaseFileScanTask(file, schemaString, specString, residuals) - ); - }); + ManifestGroup manifestGroup = new ManifestGroup(ops.io(), snapshot.manifests()) + .caseSensitive(caseSensitive) + .select(colStats ? 
SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS) + .filterData(rowFilter) + .specsById(ops.current().specsById()) + .ignoreDeleted(); if (PLAN_SCANS_WITH_WORKER_POOL && snapshot.manifests().size() > 1) { - return new ParallelIterable<>(readers, ThreadPools.getWorkerPool()); - } else { - return CloseableIterable.concat(readers); + manifestGroup = manifestGroup.planWith(ThreadPools.getWorkerPool()); } + + return manifestGroup.planFiles(); } @Override diff --git a/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java b/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java new file mode 100644 index 000000000000..aace44ad519e --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import java.util.Collection; +import java.util.List; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.util.SnapshotUtil; +import org.apache.iceberg.util.ThreadPools; + +class IncrementalDataTableScan extends DataTableScan { + private long fromSnapshotId; + private long toSnapshotId; + + IncrementalDataTableScan(TableOperations ops, Table table, Schema schema, + Expression rowFilter, boolean caseSensitive, boolean colStats, + Collection selectedColumns, ImmutableMap options, + long fromSnapshotId, long toSnapshotId) { + super(ops, table, null, schema, rowFilter, caseSensitive, colStats, selectedColumns, options); + Preconditions.checkArgument(fromSnapshotId != toSnapshotId, "fromSnapshotId and toSnapshotId cannot be the same"); + this.fromSnapshotId = fromSnapshotId; + this.toSnapshotId = toSnapshotId; + } + + @Override + public TableScan asOfTime(long timestampMillis) { + throw new UnsupportedOperationException(String.format( + "Cannot scan table as of time %s: configured for incremental data in snapshots (%s, %s]", + timestampMillis, fromSnapshotId, toSnapshotId)); + } + + @Override + public TableScan useSnapshot(long scanSnapshotId) { + throw new UnsupportedOperationException(String.format( + "Cannot scan table using scan snapshot id %s: configured for incremental data in snapshots (%s, %s]", + scanSnapshotId, fromSnapshotId, toSnapshotId)); + } + + @Override + public TableScan appendsBetween(long newFromSnapshotId, long newToSnapshotId) { + Preconditions.checkArgument( + table().snapshot(newFromSnapshotId) != null, "fromSnapshotId: %s does not exist", newFromSnapshotId); + Preconditions.checkArgument( + 
table().snapshot(newToSnapshotId) != null, "toSnapshotId: %s does not exist", newToSnapshotId); + return new IncrementalDataTableScan( + tableOps(), table(), schema(), filter(), isCaseSensitive(), colStats(), selectedColumns(), options(), + newFromSnapshotId, newToSnapshotId); + } + + @Override + public TableScan appendsAfter(long newFromSnapshotId) { + final Snapshot currentSnapshot = table().currentSnapshot(); + Preconditions.checkState(currentSnapshot != null, + "Cannot scan appends after %s, there is no current snapshot", newFromSnapshotId); + return appendsBetween(newFromSnapshotId, currentSnapshot.snapshotId()); + } + + @Override + public CloseableIterable planFiles() { + //TODO publish an incremental appends scan event + List snapshots = snapshotsWithin(table(), fromSnapshotId, toSnapshotId); + Iterable> files = Iterables.transform(snapshots, this::planFiles); + return CloseableIterable.concat(files); + } + + private CloseableIterable planFiles(Snapshot snapshot) { + Predicate matchingManifests = manifest -> manifest.snapshotId() == snapshot.snapshotId(); + + Predicate matchingManifestEntries = + manifestEntry -> + manifestEntry.snapshotId() == snapshot.snapshotId() && + manifestEntry.status() == ManifestEntry.Status.ADDED; + + ManifestGroup manifestGroup = new ManifestGroup(tableOps().io(), snapshot.manifests()) + .caseSensitive(isCaseSensitive()) + .select(colStats() ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS) + .filterData(filter()) + .filterManifests(matchingManifests) + .filterManifestEntries(matchingManifestEntries) + .specsById(tableOps().current().specsById()) + .ignoreDeleted(); + + if (PLAN_SCANS_WITH_WORKER_POOL && snapshot.manifests().size() > 1) { + manifestGroup = manifestGroup.planWith(ThreadPools.getWorkerPool()); + } + + return manifestGroup.planFiles(); + } + + @Override + @SuppressWarnings("checkstyle:HiddenField") + protected TableScan newRefinedScan( + TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter, + boolean caseSensitive, boolean colStats, Collection selectedColumns, + ImmutableMap options) { + return new IncrementalDataTableScan( + ops, table, schema, rowFilter, caseSensitive, colStats, selectedColumns, options, + fromSnapshotId, toSnapshotId); + } + + private static List snapshotsWithin(Table table, long fromSnapshotId, long toSnapshotId) { + List snapshotIds = SnapshotUtil.snapshotIdsBetween(table, fromSnapshotId, toSnapshotId); + List snapshots = Lists.newArrayList(); + for (Long snapshotId : snapshotIds) { + Snapshot snapshot = table.snapshot(snapshotId); + // for now, incremental scan supports only appends + if (snapshot.operation().equals(DataOperations.APPEND)) { + snapshots.add(snapshot); + } else if (snapshot.operation().equals(DataOperations.OVERWRITE)) { + throw new UnsupportedOperationException( + String.format("Found %s operation, cannot support incremental data in snapshots (%s, %s]", + DataOperations.OVERWRITE, fromSnapshotId, toSnapshotId)); + } + } + return snapshots; + } +} diff --git a/core/src/main/java/org/apache/iceberg/ManifestGroup.java b/core/src/main/java/org/apache/iceberg/ManifestGroup.java index 54a85de72333..968137a73c8a 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestGroup.java +++ b/core/src/main/java/org/apache/iceberg/ManifestGroup.java @@ -21,27 +21,33 @@ import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import 
com.google.common.collect.Lists; import com.google.common.collect.Sets; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.function.BiFunction; +import java.util.function.Predicate; import org.apache.iceberg.expressions.Evaluator; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.expressions.ManifestEvaluator; import org.apache.iceberg.expressions.Projections; +import org.apache.iceberg.expressions.ResidualEvaluator; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ParallelIterable; class ManifestGroup { private static final Types.StructType EMPTY_STRUCT = Types.StructType.of(); private final FileIO io; private final Set manifests; + private Predicate manifestPredicate; + private Predicate manifestEntryPredicate; private Map specsById; private Expression dataFilter; private Expression fileFilter; @@ -50,6 +56,7 @@ class ManifestGroup { private boolean ignoreExisting; private List columns; private boolean caseSensitive; + private ExecutorService executorService; ManifestGroup(FileIO io, Iterable manifests) { this.io = io; @@ -59,8 +66,10 @@ class ManifestGroup { this.partitionFilter = Expressions.alwaysTrue(); this.ignoreDeleted = false; this.ignoreExisting = false; - this.columns = ImmutableList.of("*"); + this.columns = ManifestReader.ALL_COLUMNS; this.caseSensitive = true; + this.manifestPredicate = m -> true; + this.manifestEntryPredicate = e -> true; } ManifestGroup specsById(Map newSpecsById) { @@ -83,6 +92,16 @@ ManifestGroup filterPartitions(Expression newPartitionFilter) { return this; } + ManifestGroup filterManifests(Predicate newManifestPredicate) { + this.manifestPredicate = manifestPredicate.and(newManifestPredicate); + return this; + } + + ManifestGroup filterManifestEntries(Predicate newManifestEntryPredicate) { + this.manifestEntryPredicate = manifestEntryPredicate.and(newManifestEntryPredicate); + return this; + } + ManifestGroup ignoreDeleted() { this.ignoreDeleted = true; return this; @@ -103,7 +122,35 @@ ManifestGroup caseSensitive(boolean newCaseSensitive) { return this; } + ManifestGroup planWith(ExecutorService newExecutorService) { + this.executorService = newExecutorService; + return this; + } + /** + * Returns a iterable of scan tasks. It is safe to add entries of this iterable + * to a collection as {@link DataFile} in each {@link FileScanTask} is defensively + * copied. + * @return a {@link CloseableIterable} of {@link FileScanTask} + */ + public CloseableIterable planFiles() { + Iterable> tasks = entries((manifest, entries) -> { + PartitionSpec spec = specsById.get(manifest.partitionSpecId()); + String schemaString = SchemaParser.toJson(spec.schema()); + String specString = PartitionSpecParser.toJson(spec); + ResidualEvaluator residuals = ResidualEvaluator.of(spec, dataFilter, caseSensitive); + return CloseableIterable.transform(entries, e -> new BaseFileScanTask( + e.copy().file(), schemaString, specString, residuals)); + }); + + if (executorService != null) { + return new ParallelIterable<>(tasks, executorService); + } else { + return CloseableIterable.concat(tasks); + } + } + + /** * Returns an iterable for manifest entries in the set of manifests. *

* Entries are not copied and it is the caller's responsibility to make defensive copies if @@ -112,11 +159,16 @@ ManifestGroup caseSensitive(boolean newCaseSensitive) { * @return a CloseableIterable of manifest entries. */ public CloseableIterable entries() { + return CloseableIterable.concat(entries((manifest, entries) -> entries)); + } + + private Iterable> entries( + BiFunction, CloseableIterable> entryFn) { LoadingCache evalCache = specsById == null ? null : Caffeine.newBuilder().build(specId -> { PartitionSpec spec = specsById.get(specId); return ManifestEvaluator.forPartitionFilter( - Expressions.and(partitionFilter, Projections.inclusive(spec).project(dataFilter)), + Expressions.and(partitionFilter, Projections.inclusive(spec, caseSensitive).project(dataFilter)), spec, caseSensitive); }); @@ -141,7 +193,9 @@ public CloseableIterable entries() { manifest -> manifest.hasAddedFiles() || manifest.hasDeletedFiles()); } - Iterable> readers = Iterables.transform( + matchingManifests = Iterables.filter(matchingManifests, manifestPredicate::test); + + Iterable> readers = Iterables.transform( matchingManifests, manifest -> { ManifestReader reader = ManifestReader.read( @@ -151,6 +205,7 @@ public CloseableIterable entries() { FilteredManifest filtered = reader .filterRows(dataFilter) .filterPartitions(partitionFilter) + .caseSensitive(caseSensitive) .select(columns); CloseableIterable entries = filtered.allEntries(); @@ -168,9 +223,10 @@ public CloseableIterable entries() { entry -> evaluator.eval((GenericDataFile) entry.file())); } - return entries; + entries = CloseableIterable.filter(entries, manifestEntryPredicate); + return entryFn.apply(manifest, entries); }); - return CloseableIterable.concat(readers); + return readers; } } diff --git a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java index ac38a86bc2ad..8886a5abc16c 100644 --- a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java @@ -42,6 +42,16 @@ public static List currentAncestors(Table table) { return ancestorIds(table.currentSnapshot(), table::snapshot); } + /** + * @return List of snapshot ids in the range - (fromSnapshotId, toSnapshotId] + * This method assumes that fromSnapshotId is an ancestor of toSnapshotId + */ + public static List snapshotIdsBetween(Table table, long fromSnapshotId, long toSnapshotId) { + List snapshotIds = Lists.newArrayList(ancestorIds(table.snapshot(toSnapshotId), + snapshotId -> snapshotId != fromSnapshotId ? table.snapshot(snapshotId) : null)); + return snapshotIds; + } + public static List ancestorIds(Snapshot snapshot, Function lookup) { List ancestorIds = Lists.newArrayList(); Snapshot current = snapshot; diff --git a/core/src/test/java/org/apache/iceberg/TestIncrementalDataTableScan.java b/core/src/test/java/org/apache/iceberg/TestIncrementalDataTableScan.java new file mode 100644 index 000000000000..6831135cdc78 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestIncrementalDataTableScan.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import java.util.List; +import java.util.Set; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestIncrementalDataTableScan extends TableTestBase { + + @Before + public void setupTableProperties() { + table.updateProperties().set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "2").commit(); + } + + @Test + public void testInvalidScans() { + add(table.newAppend(), files("A")); + AssertHelpers.assertThrows( + "from and to snapshots cannot be the same, since from snapshot is exclusive and not part of the scan", + IllegalArgumentException.class, "fromSnapshotId and toSnapshotId cannot be the same", + () -> appendsBetweenScan(1, 1)); + } + + @Test + public void testAppends() { + add(table.newAppend(), files("A")); // 1 + add(table.newAppend(), files("B")); + add(table.newAppend(), files("C")); + add(table.newAppend(), files("D")); + add(table.newAppend(), files("E")); // 5 + Assert.assertEquals(Sets.newHashSet("B", "C", "D", "E"), appendsBetweenScan(1, 5)); + Assert.assertEquals(Sets.newHashSet("C", "D", "E"), appendsBetweenScan(2, 5)); + } + + @Test + public void testReplaceOverwritesDeletes() { + add(table.newAppend(), files("A")); // 1 + add(table.newAppend(), files("B")); + add(table.newAppend(), files("C")); + add(table.newAppend(), files("D")); + add(table.newAppend(), files("E")); // 5 + Assert.assertEquals(Sets.newHashSet("B", "C", "D", "E"), appendsBetweenScan(1, 5)); + + replace(table.newRewrite(), files("A", "B", "C"), files("F", "G")); // 6 + Assert.assertEquals("Replace commits are ignored", Sets.newHashSet("B", "C", "D", "E"), appendsBetweenScan(1, 6)); + Assert.assertEquals(Sets.newHashSet("E"), appendsBetweenScan(4, 6)); + // 6th snapshot is a replace. No new content is added + Assert.assertTrue("Replace commits are ignored", appendsBetweenScan(5, 6).isEmpty()); + delete(table.newDelete(), files("D")); // 7 + // 7th snapshot is a delete. 
+ Assert.assertTrue("Replace and delete commits are ignored", appendsBetweenScan(5, 7).isEmpty()); + Assert.assertTrue("Delete commits are ignored", appendsBetweenScan(6, 7).isEmpty()); + add(table.newAppend(), files("I")); // 8 + // snapshots 6 and 7 are ignored + Assert.assertEquals(Sets.newHashSet("B", "C", "D", "E", "I"), appendsBetweenScan(1, 8)); + Assert.assertEquals(Sets.newHashSet("I"), appendsBetweenScan(6, 8)); + Assert.assertEquals(Sets.newHashSet("I"), appendsBetweenScan(7, 8)); + + overwrite(table.newOverwrite(), files("H"), files("E")); // 9 + AssertHelpers.assertThrows( + "Overwrites are not supported for Incremental scan", UnsupportedOperationException.class, + "Found overwrite operation, cannot support incremental data in snapshots (8, 9]", + () -> appendsBetweenScan(8, 9)); + } + + @Test + public void testTransactions() { + Transaction transaction = table.newTransaction(); + + add(transaction.newAppend(), files("A")); // 1 + add(transaction.newAppend(), files("B")); + add(transaction.newAppend(), files("C")); + add(transaction.newAppend(), files("D")); + add(transaction.newAppend(), files("E")); // 5 + transaction.commitTransaction(); + Assert.assertEquals(Sets.newHashSet("B", "C", "D", "E"), appendsBetweenScan(1, 5)); + + transaction = table.newTransaction(); + replace(transaction.newRewrite(), files("A", "B", "C"), files("F", "G")); // 6 + transaction.commitTransaction(); + Assert.assertEquals("Replace commits are ignored", Sets.newHashSet("B", "C", "D", "E"), appendsBetweenScan(1, 6)); + Assert.assertEquals(Sets.newHashSet("E"), appendsBetweenScan(4, 6)); + // 6th snapshot is a replace. No new content is added + Assert.assertTrue("Replace commits are ignored", appendsBetweenScan(5, 6).isEmpty()); + + transaction = table.newTransaction(); + delete(transaction.newDelete(), files("D")); // 7 + transaction.commitTransaction(); + // 7th snapshot is a delete. 
+ Assert.assertTrue("Replace and delete commits are ignored", appendsBetweenScan(5, 7).isEmpty()); + Assert.assertTrue("Delete commits are ignored", appendsBetweenScan(6, 7).isEmpty()); + + transaction = table.newTransaction(); + add(transaction.newAppend(), files("I")); // 8 + transaction.commitTransaction(); + // snapshots 6, 7 and 8 are ignored + Assert.assertEquals(Sets.newHashSet("B", "C", "D", "E", "I"), appendsBetweenScan(1, 8)); + Assert.assertEquals(Sets.newHashSet("I"), appendsBetweenScan(6, 8)); + Assert.assertEquals(Sets.newHashSet("I"), appendsBetweenScan(7, 8)); + } + + @Test + public void testRollbacks() { + add(table.newAppend(), files("A")); // 1 + add(table.newAppend(), files("B")); + add(table.newAppend(), files("C")); // 3 + // Go back to snapshot "B" + table.rollback().toSnapshotId(2).commit(); // 2 + Assert.assertEquals(2, table.currentSnapshot().snapshotId()); + Assert.assertEquals(Sets.newHashSet("B"), appendsBetweenScan(1, 2)); + Assert.assertEquals(Sets.newHashSet("B"), appendsAfterScan(1)); + + Transaction transaction = table.newTransaction(); + add(transaction.newAppend(), files("D")); // 4 + add(transaction.newAppend(), files("E")); // 5 + add(transaction.newAppend(), files("F")); + transaction.commitTransaction(); + // Go back to snapshot "E" + table.rollback().toSnapshotId(5).commit(); + Assert.assertEquals(5, table.currentSnapshot().snapshotId()); + Assert.assertEquals(Sets.newHashSet("B", "D", "E"), appendsBetweenScan(1, 5)); + Assert.assertEquals(Sets.newHashSet("B", "D", "E"), appendsAfterScan(1)); + } + + private static DataFile file(String name) { + return DataFiles.builder(SPEC) + .withPath(name + ".parquet") + .withFileSizeInBytes(0) + .withPartitionPath("data_bucket=0") // easy way to set partition data for now + .withRecordCount(1) + .build(); + } + + private static void add(AppendFiles appendFiles, List adds) { + for (DataFile f : adds) { + appendFiles.appendFile(f); + } + appendFiles.commit(); + } + + private static void delete(DeleteFiles deleteFiles, List deletes) { + for (DataFile f : deletes) { + deleteFiles.deleteFile(f); + } + deleteFiles.commit(); + } + + private static void replace(RewriteFiles rewriteFiles, List deletes, List adds) { + rewriteFiles.rewriteFiles(Sets.newHashSet(deletes), Sets.newHashSet(adds)); + rewriteFiles.commit(); + } + + private static void overwrite(OverwriteFiles overwriteFiles, List adds, List deletes) { + for (DataFile f : adds) { + overwriteFiles.addFile(f); + } + for (DataFile f : deletes) { + overwriteFiles.deleteFile(f); + } + overwriteFiles.commit(); + } + + private static List files(String... names) { + return Lists.transform(Lists.newArrayList(names), TestIncrementalDataTableScan::file); + } + + private Set appendsAfterScan(long fromSnapshotId) { + final TableScan appendsAfter = table.newScan().appendsAfter(fromSnapshotId); + return filesToScan(appendsAfter); + } + + private Set appendsBetweenScan(long fromSnapshotId, long toSnapshotId) { + Snapshot s1 = table.snapshot(fromSnapshotId); + Snapshot s2 = table.snapshot(toSnapshotId); + TableScan appendsBetween = table.newScan().appendsBetween(s1.snapshotId(), s2.snapshotId()); + return filesToScan(appendsBetween); + } + + private static Set filesToScan(TableScan tableScan) { + Iterable filesToRead = Iterables.transform(tableScan.planFiles(), t -> { + String path = t.file().path().toString(); + return path.split("\\.")[0]; + }); + return Sets.newHashSet(filesToRead); + } +}