Skip to content

Commit

Permalink
Core: Include all reachable snapshots with v1 format and REF snapshot…
Browse files Browse the repository at this point in the history
… mode
  • Loading branch information
nastra committed Jun 17, 2023
1 parent 5584426 commit a1b5c68
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 5 deletions.
5 changes: 0 additions & 5 deletions core/src/main/java/org/apache/iceberg/TableMetadata.java
Expand Up @@ -499,11 +499,6 @@ private synchronized void ensureSnapshotsLoaded() {
List<Snapshot> loadedSnapshots = Lists.newArrayList(snapshotsSupplier.get());
loadedSnapshots.removeIf(s -> s.sequenceNumber() > lastSequenceNumber);

// Format version 1 does not have accurate sequence numbering, so remove based on timestamp
if (this.formatVersion == 1) {
loadedSnapshots.removeIf(s -> s.timestampMillis() > currentSnapshot().timestampMillis());
}

this.snapshots = ImmutableList.copyOf(loadedSnapshots);
this.snapshotsById = indexAndValidateSnapshots(snapshots, lastSequenceNumber);
validateCurrentSnapshot();
Expand Down
130 changes: 130 additions & 0 deletions core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
Expand Up @@ -54,6 +54,7 @@
import org.apache.iceberg.metrics.MetricsReport;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.rest.RESTCatalogAdapter.HTTPMethod;
import org.apache.iceberg.rest.RESTSessionCatalog.SnapshotMode;
Expand All @@ -76,6 +77,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;

Expand Down Expand Up @@ -849,6 +852,133 @@ public void testTableSnapshotLoading() {
any());
}

/**
 * Exercises table loading through a REST catalog configured with {@code snapshot-loading-mode}
 * set to {@code refs}, for a table whose branch has received a commit that main does not have.
 *
 * <p>The spied adapter is stubbed so that a GET of the table with {@code snapshots=refs} returns
 * metadata from which every snapshot not pointed to by a ref has been removed. The test then
 * checks that the current snapshot matches, that exactly one {@code refs} request and one {@code
 * all} request were issued, that the full snapshot list equals the original table's, and that a
 * further append to the branch increases the number of snapshots.
 *
 * @param formatVersion value written to the {@code format-version} table property ("1" or "2")
 */
@ParameterizedTest
@ValueSource(strings = {"1", "2"})
public void testTableSnapshotLoadingWithDivergedBranches(String formatVersion) {
  RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));

  RESTCatalog catalog =
      new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), config -> adapter);
  ImmutableMap<String, String> catalogProperties =
      ImmutableMap.of(
          CatalogProperties.URI,
          "ignored",
          CatalogProperties.FILE_IO_IMPL,
          "org.apache.iceberg.inmemory.InMemoryFileIO",
          "snapshot-loading-mode",
          "refs");
  catalog.initialize("test", catalogProperties);

  ImmutableMap<String, String> tableProperties =
      ImmutableMap.of("format-version", formatVersion);
  Table table =
      catalog.createTable(TABLE, SCHEMA, PartitionSpec.unpartitioned(), tableProperties);

  // first commit goes to main
  table
      .newFastAppend()
      .appendFile(
          DataFiles.builder(PartitionSpec.unpartitioned())
              .withPath("/path/to/data-a.parquet")
              .withFileSizeInBytes(10)
              .withRecordCount(2)
              .build())
      .commit();

  // create the branch at the snapshot main currently points to
  String divergedBranch = "divergedBranch";
  long branchStartSnapshotId = table.currentSnapshot().snapshotId();
  table.manageSnapshots().createBranch(divergedBranch, branchStartSnapshotId).commit();

  // committing only to the branch makes it diverge from main
  table
      .newFastAppend()
      .appendFile(
          DataFiles.builder(PartitionSpec.unpartitioned())
              .withPath("/path/to/data-b.parquet")
              .withFileSizeInBytes(10)
              .withRecordCount(2)
              .build())
      .toBranch(divergedBranch)
      .commit();

  ResourcePaths resourcePaths = ResourcePaths.forCatalogProperties(Maps.newHashMap());
  String tablePath = resourcePaths.table(TABLE);
  ImmutableMap<String, String> refsQuery = ImmutableMap.of("snapshots", "refs");
  ImmutableMap<String, String> allQuery = ImmutableMap.of("snapshots", "all");

  // Answer that calls the real adapter and then drops every snapshot no ref points to
  Answer<?> onlyReferencedSnapshots =
      invocation -> {
        LoadTableResponse fullResponse = (LoadTableResponse) invocation.callRealMethod();
        TableMetadata completeMetadata = fullResponse.tableMetadata();

        Set<Long> idsHeldByRefs =
            completeMetadata.refs().values().stream()
                .map(SnapshotRef::snapshotId)
                .collect(Collectors.toSet());

        TableMetadata trimmedMetadata =
            completeMetadata.removeSnapshotsIf(
                snapshot -> !idsHeldByRefs.contains(snapshot.snapshotId()));

        return LoadTableResponse.builder()
            .withTableMetadata(trimmedMetadata)
            .addAllConfig(fullResponse.config())
            .build();
      };

  Mockito.doAnswer(onlyReferencedSnapshots)
      .when(adapter)
      .execute(
          eq(HTTPMethod.GET),
          eq(tablePath),
          eq(refsQuery),
          any(),
          eq(LoadTableResponse.class),
          any(),
          any());

  Table refsTable = catalog.loadTable(TABLE);
  assertThat(refsTable.currentSnapshot()).isEqualTo(table.currentSnapshot());

  // the load above must have been issued with snapshots=refs, exactly once
  verify(adapter, times(1))
      .execute(
          eq(HTTPMethod.GET),
          eq(tablePath),
          eq(refsQuery),
          any(),
          eq(LoadTableResponse.class),
          any(),
          any());

  // asking for the snapshot list must yield every snapshot and trigger one snapshots=all request
  assertThat(catalog.loadTable(TABLE).snapshots())
      .containsExactlyInAnyOrderElementsOf(table.snapshots());
  verify(adapter, times(1))
      .execute(
          eq(HTTPMethod.GET),
          eq(tablePath),
          eq(allQuery),
          any(),
          eq(LoadTableResponse.class),
          any(),
          any());

  // a commit to the diverged branch through a freshly loaded table must succeed
  catalog
      .loadTable(TABLE)
      .newAppend()
      .appendFile(
          DataFiles.builder(PartitionSpec.unpartitioned())
              .withPath("/path/to/data-c.parquet")
              .withFileSizeInBytes(10)
              .withRecordCount(2)
              .build())
      .toBranch(divergedBranch)
      .commit();

  assertThat(catalog.loadTable(TABLE).snapshots())
      .hasSizeGreaterThan(Lists.newArrayList(table.snapshots()).size());
}

public void testTableAuth(
String catalogToken,
Map<String, String> credentials,
Expand Down

0 comments on commit a1b5c68

Please sign in to comment.