Skip to content

Commit

Permalink
Core: Include all reachable snapshots with v1 format and REF snapshot…
Browse files Browse the repository at this point in the history
… mode
  • Loading branch information
nastra committed May 16, 2023
1 parent 42cf157 commit 8bf71d5
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 3 deletions.
21 changes: 19 additions & 2 deletions core/src/main/java/org/apache/iceberg/TableMetadata.java
Expand Up @@ -45,6 +45,7 @@
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.SerializableSupplier;
import org.apache.iceberg.util.SnapshotUtil;

/** Metadata for a table. */
public class TableMetadata implements Serializable {
Expand Down Expand Up @@ -499,9 +500,25 @@ private synchronized void ensureSnapshotsLoaded() {
List<Snapshot> loadedSnapshots = Lists.newArrayList(snapshotsSupplier.get());
loadedSnapshots.removeIf(s -> s.sequenceNumber() > lastSequenceNumber);

// Format version 1 does not have accurate sequence numbering, so remove based on timestamp
// Format version 1 does not have accurate sequence numbering, so remove based on
// ref reachability
if (this.formatVersion == 1) {
loadedSnapshots.removeIf(s -> s.timestampMillis() > currentSnapshot().timestampMillis());
Set<Long> snapshotsToKeep = Sets.newHashSet();

for (Long snapshotId :
refs().values().stream().map(SnapshotRef::snapshotId).collect(Collectors.toSet())) {
SnapshotUtil.ancestorsOf(
snapshotId,
id ->
loadedSnapshots.stream()
.filter(s -> s.snapshotId() == id)
.findFirst()
.orElse(null),
false)
.forEach(s -> snapshotsToKeep.add(s.snapshotId()));
}

loadedSnapshots.removeIf(s -> !snapshotsToKeep.contains(s.snapshotId()));
}

this.snapshots = ImmutableList.copyOf(loadedSnapshots);
Expand Down
10 changes: 9 additions & 1 deletion core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
Expand Up @@ -145,8 +145,16 @@ public static Snapshot oldestAncestorOf(long snapshotId, Function<Long, Snapshot
}

public static Iterable<Snapshot> ancestorsOf(long snapshotId, Function<Long, Snapshot> lookup) {
return ancestorsOf(snapshotId, lookup, true);
}

public static Iterable<Snapshot> ancestorsOf(
long snapshotId, Function<Long, Snapshot> lookup, boolean validateStartSnapshotId) {
Snapshot start = lookup.apply(snapshotId);
Preconditions.checkArgument(start != null, "Cannot find snapshot: %s", snapshotId);
if (validateStartSnapshotId) {
Preconditions.checkArgument(start != null, "Cannot find snapshot: %s", snapshotId);
}

return ancestorsOf(start, lookup);
}

Expand Down
130 changes: 130 additions & 0 deletions core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
Expand Up @@ -54,6 +54,7 @@
import org.apache.iceberg.metrics.MetricsReport;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.rest.RESTCatalogAdapter.HTTPMethod;
import org.apache.iceberg.rest.RESTSessionCatalog.SnapshotMode;
Expand All @@ -76,6 +77,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;

Expand Down Expand Up @@ -849,6 +852,133 @@ public void testTableSnapshotLoading() {
any());
}

@ParameterizedTest
@ValueSource(strings = {"1", "2"})
public void testTableSnapshotLoadingWithDivergedBranches(String formatVersion) {
RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));

RESTCatalog catalog =
new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter);
catalog.initialize(
"test",
ImmutableMap.of(
CatalogProperties.URI,
"ignored",
CatalogProperties.FILE_IO_IMPL,
"org.apache.iceberg.inmemory.InMemoryFileIO",
"snapshot-loading-mode",
"refs"));

Table table =
catalog.createTable(
TABLE,
SCHEMA,
PartitionSpec.unpartitioned(),
ImmutableMap.of("format-version", formatVersion));

table
.newFastAppend()
.appendFile(
DataFiles.builder(PartitionSpec.unpartitioned())
.withPath("/path/to/data-a.parquet")
.withFileSizeInBytes(10)
.withRecordCount(2)
.build())
.commit();

String branch = "divergedBranch";
table.manageSnapshots().createBranch(branch, table.currentSnapshot().snapshotId()).commit();

// branch and main are diverged now
table
.newFastAppend()
.appendFile(
DataFiles.builder(PartitionSpec.unpartitioned())
.withPath("/path/to/data-b.parquet")
.withFileSizeInBytes(10)
.withRecordCount(2)
.build())
.toBranch(branch)
.commit();

ResourcePaths paths = ResourcePaths.forCatalogProperties(Maps.newHashMap());

// Respond with only referenced snapshots
Answer<?> refsAnswer =
invocation -> {
LoadTableResponse originalResponse = (LoadTableResponse) invocation.callRealMethod();
TableMetadata fullTableMetadata = originalResponse.tableMetadata();

Set<Long> referencedSnapshotIds =
fullTableMetadata.refs().values().stream()
.map(SnapshotRef::snapshotId)
.collect(Collectors.toSet());

TableMetadata refsMetadata =
fullTableMetadata.removeSnapshotsIf(
s -> !referencedSnapshotIds.contains(s.snapshotId()));

return LoadTableResponse.builder()
.withTableMetadata(refsMetadata)
.addAllConfig(originalResponse.config())
.build();
};

Mockito.doAnswer(refsAnswer)
.when(adapter)
.execute(
eq(HTTPMethod.GET),
eq(paths.table(TABLE)),
eq(ImmutableMap.of("snapshots", "refs")),
any(),
eq(LoadTableResponse.class),
any(),
any());

Table refsTables = catalog.loadTable(TABLE);
assertThat(refsTables.currentSnapshot()).isEqualTo(table.currentSnapshot());

// verify that the table was loaded with the refs argument
verify(adapter, times(1))
.execute(
eq(HTTPMethod.GET),
eq(paths.table(TABLE)),
eq(ImmutableMap.of("snapshots", "refs")),
any(),
eq(LoadTableResponse.class),
any(),
any());

// verify that all snapshots are loaded when referenced
assertThat(catalog.loadTable(TABLE).snapshots())
.containsExactlyInAnyOrderElementsOf(table.snapshots());
verify(adapter, times(1))
.execute(
eq(HTTPMethod.GET),
eq(paths.table(TABLE)),
eq(ImmutableMap.of("snapshots", "all")),
any(),
eq(LoadTableResponse.class),
any(),
any());

// verify that committing to branch is possible
catalog
.loadTable(TABLE)
.newAppend()
.appendFile(
DataFiles.builder(PartitionSpec.unpartitioned())
.withPath("/path/to/data-c.parquet")
.withFileSizeInBytes(10)
.withRecordCount(2)
.build())
.toBranch(branch)
.commit();

assertThat(catalog.loadTable(TABLE).snapshots())
.hasSizeGreaterThan(Lists.newArrayList(table.snapshots()).size());
}

public void testTableAuth(
String catalogToken,
Map<String, String> credentials,
Expand Down

0 comments on commit 8bf71d5

Please sign in to comment.