Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[1.3.x] Core: Include all reachable snapshots with v1 format and REF snapshot mode #8027

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 0 additions & 5 deletions core/src/main/java/org/apache/iceberg/TableMetadata.java
Expand Up @@ -499,11 +499,6 @@ private synchronized void ensureSnapshotsLoaded() {
List<Snapshot> loadedSnapshots = Lists.newArrayList(snapshotsSupplier.get());
loadedSnapshots.removeIf(s -> s.sequenceNumber() > lastSequenceNumber);

// Format version 1 does not have accurate sequence numbering, so remove based on timestamp
if (this.formatVersion == 1) {
loadedSnapshots.removeIf(s -> s.timestampMillis() > currentSnapshot().timestampMillis());
}

this.snapshots = ImmutableList.copyOf(loadedSnapshots);
this.snapshotsById = indexAndValidateSnapshots(snapshots, lastSequenceNumber);
validateCurrentSnapshot();
Expand Down
Expand Up @@ -31,6 +31,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.SerializableSupplier;
import org.assertj.core.api.Assumptions;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -131,6 +132,10 @@ public void testCurrentTableScanDoesNotLoad() {

@Test
public void testFutureSnapshotsAreRemoved() {
Assumptions.assumeThat(formatVersion)
.as("Future snapshots are only removed for V2 tables")
.isGreaterThan(1);

table.newFastAppend().appendFile(FILE_C).commit();

TableMetadata futureTableMetadata =
Expand Down
130 changes: 130 additions & 0 deletions core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
Expand Up @@ -54,6 +54,7 @@
import org.apache.iceberg.metrics.MetricsReport;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.rest.RESTCatalogAdapter.HTTPMethod;
import org.apache.iceberg.rest.RESTSessionCatalog.SnapshotMode;
Expand All @@ -76,6 +77,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;

Expand Down Expand Up @@ -849,6 +852,133 @@ public void testTableSnapshotLoading() {
any());
}

@ParameterizedTest
@ValueSource(strings = {"1", "2"})
public void testTableSnapshotLoadingWithDivergedBranches(String formatVersion) {
RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));

RESTCatalog catalog =
new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter);
catalog.initialize(
"test",
ImmutableMap.of(
CatalogProperties.URI,
"ignored",
CatalogProperties.FILE_IO_IMPL,
"org.apache.iceberg.inmemory.InMemoryFileIO",
"snapshot-loading-mode",
"refs"));

Table table =
catalog.createTable(
TABLE,
SCHEMA,
PartitionSpec.unpartitioned(),
ImmutableMap.of("format-version", formatVersion));

table
.newFastAppend()
.appendFile(
DataFiles.builder(PartitionSpec.unpartitioned())
.withPath("/path/to/data-a.parquet")
.withFileSizeInBytes(10)
.withRecordCount(2)
.build())
.commit();

String branch = "divergedBranch";
table.manageSnapshots().createBranch(branch, table.currentSnapshot().snapshotId()).commit();

// branch and main are diverged now
table
.newFastAppend()
.appendFile(
DataFiles.builder(PartitionSpec.unpartitioned())
.withPath("/path/to/data-b.parquet")
.withFileSizeInBytes(10)
.withRecordCount(2)
.build())
.toBranch(branch)
.commit();

ResourcePaths paths = ResourcePaths.forCatalogProperties(Maps.newHashMap());

// Respond with only referenced snapshots
Answer<?> refsAnswer =
invocation -> {
LoadTableResponse originalResponse = (LoadTableResponse) invocation.callRealMethod();
TableMetadata fullTableMetadata = originalResponse.tableMetadata();

Set<Long> referencedSnapshotIds =
fullTableMetadata.refs().values().stream()
.map(SnapshotRef::snapshotId)
.collect(Collectors.toSet());

TableMetadata refsMetadata =
fullTableMetadata.removeSnapshotsIf(
s -> !referencedSnapshotIds.contains(s.snapshotId()));

return LoadTableResponse.builder()
.withTableMetadata(refsMetadata)
.addAllConfig(originalResponse.config())
.build();
};

Mockito.doAnswer(refsAnswer)
.when(adapter)
.execute(
eq(HTTPMethod.GET),
eq(paths.table(TABLE)),
eq(ImmutableMap.of("snapshots", "refs")),
any(),
eq(LoadTableResponse.class),
any(),
any());

Table refsTables = catalog.loadTable(TABLE);
assertThat(refsTables.currentSnapshot()).isEqualTo(table.currentSnapshot());

// verify that the table was loaded with the refs argument
verify(adapter, times(1))
.execute(
eq(HTTPMethod.GET),
eq(paths.table(TABLE)),
eq(ImmutableMap.of("snapshots", "refs")),
any(),
eq(LoadTableResponse.class),
any(),
any());

// verify that all snapshots are loaded when referenced
assertThat(catalog.loadTable(TABLE).snapshots())
.containsExactlyInAnyOrderElementsOf(table.snapshots());
verify(adapter, times(1))
.execute(
eq(HTTPMethod.GET),
eq(paths.table(TABLE)),
eq(ImmutableMap.of("snapshots", "all")),
any(),
eq(LoadTableResponse.class),
any(),
any());

// verify that committing to branch is possible
catalog
.loadTable(TABLE)
.newAppend()
.appendFile(
DataFiles.builder(PartitionSpec.unpartitioned())
.withPath("/path/to/data-c.parquet")
.withFileSizeInBytes(10)
.withRecordCount(2)
.build())
.toBranch(branch)
.commit();

assertThat(catalog.loadTable(TABLE).snapshots())
.hasSizeGreaterThan(Lists.newArrayList(table.snapshots()).size());
}

public void testTableAuth(
String catalogToken,
Map<String, String> credentials,
Expand Down