Skip to content

Commit

Permalink
Core: Support lazy snapshot loading in TableMetadata (#6811)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielcweeks committed Feb 12, 2023
1 parent b6b9972 commit 1521296
Show file tree
Hide file tree
Showing 4 changed files with 305 additions and 8 deletions.
60 changes: 52 additions & 8 deletions core/src/main/java/org/apache/iceberg/TableMetadata.java
Expand Up @@ -44,6 +44,7 @@
import org.apache.iceberg.util.LocationUtil;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.SerializableSupplier;

/** Metadata for a table. */
public class TableMetadata implements Serializable {
Expand Down Expand Up @@ -235,16 +236,18 @@ public String toString() {
private final List<SortOrder> sortOrders;
private final Map<String, String> properties;
private final long currentSnapshotId;
private final List<Snapshot> snapshots;
private final Map<Long, Snapshot> snapshotsById;
private final Map<Integer, Schema> schemasById;
private final Map<Integer, PartitionSpec> specsById;
private final Map<Integer, SortOrder> sortOrdersById;
private final List<HistoryEntry> snapshotLog;
private final List<MetadataLogEntry> previousFiles;
private final Map<String, SnapshotRef> refs;
private final List<StatisticsFile> statisticsFiles;
private final List<MetadataUpdate> changes;
private SerializableSupplier<List<Snapshot>> snapshotsSupplier;
private volatile List<Snapshot> snapshots;
private volatile Map<Long, Snapshot> snapshotsById;
private volatile Map<String, SnapshotRef> refs;
private volatile boolean snapshotsLoaded;

@SuppressWarnings("checkstyle:CyclomaticComplexity")
TableMetadata(
Expand All @@ -265,6 +268,7 @@ public String toString() {
Map<String, String> properties,
long currentSnapshotId,
List<Snapshot> snapshots,
SerializableSupplier<List<Snapshot>> snapshotsSupplier,
List<HistoryEntry> snapshotLog,
List<MetadataLogEntry> previousFiles,
Map<String, SnapshotRef> refs,
Expand Down Expand Up @@ -305,6 +309,8 @@ public String toString() {
this.properties = properties;
this.currentSnapshotId = currentSnapshotId;
this.snapshots = snapshots;
this.snapshotsSupplier = snapshotsSupplier;
this.snapshotsLoaded = snapshotsSupplier == null;
this.snapshotLog = snapshotLog;
this.previousFiles = previousFiles;

Expand Down Expand Up @@ -359,9 +365,7 @@ public String toString() {
previous.timestampMillis);
}

Preconditions.checkArgument(
currentSnapshotId < 0 || snapshotsById.containsKey(currentSnapshotId),
"Invalid table metadata: Cannot find current version");
validateCurrentSnapshot();
}

public int formatVersion() {
Expand Down Expand Up @@ -473,6 +477,10 @@ public long propertyAsLong(String property, long defaultValue) {
}

public Snapshot snapshot(long snapshotId) {
if (!snapshotsById.containsKey(snapshotId)) {
ensureSnapshotsLoaded();
}

return snapshotsById.get(snapshotId);
}

Expand All @@ -481,9 +489,32 @@ public Snapshot currentSnapshot() {
}

public List<Snapshot> snapshots() {
ensureSnapshotsLoaded();

return snapshots;
}

private synchronized void ensureSnapshotsLoaded() {
if (!snapshotsLoaded) {
List<Snapshot> loadedSnapshots = Lists.newArrayList(snapshotsSupplier.get());
loadedSnapshots.removeIf(s -> s.sequenceNumber() > lastSequenceNumber);

// Format version 1 does not have accurate sequence numbering, so remove based on timestamp
if (this.formatVersion == 1) {
loadedSnapshots.removeIf(s -> s.timestampMillis() > currentSnapshot().timestampMillis());
}

this.snapshots = ImmutableList.copyOf(loadedSnapshots);
this.snapshotsById = indexAndValidateSnapshots(snapshots, lastSequenceNumber);
validateCurrentSnapshot();

this.refs = validateRefs(currentSnapshotId, refs, snapshotsById);

this.snapshotsLoaded = true;
this.snapshotsSupplier = null;
}
}

public SnapshotRef ref(String name) {
return refs.get(name);
}
Expand Down Expand Up @@ -526,7 +557,7 @@ public TableMetadata replaceSortOrder(SortOrder newOrder) {
}

public TableMetadata removeSnapshotsIf(Predicate<Snapshot> removeIf) {
List<Snapshot> toRemove = snapshots.stream().filter(removeIf).collect(Collectors.toList());
List<Snapshot> toRemove = snapshots().stream().filter(removeIf).collect(Collectors.toList());
return new Builder(this).removeSnapshots(toRemove).build();
}

Expand Down Expand Up @@ -554,6 +585,12 @@ public TableMetadata replaceProperties(Map<String, String> rawProperties) {
.build();
}

private void validateCurrentSnapshot() {
Preconditions.checkArgument(
currentSnapshotId < 0 || snapshotsById.containsKey(currentSnapshotId),
"Invalid table metadata: Cannot find current version");
}

private PartitionSpec reassignPartitionIds(PartitionSpec partitionSpec, TypeUtil.NextID nextID) {
PartitionSpec.Builder specBuilder =
PartitionSpec.builderFor(partitionSpec.schema()).withSpecId(partitionSpec.specId());
Expand Down Expand Up @@ -823,6 +860,7 @@ public static class Builder {
private final Map<String, String> properties;
private long currentSnapshotId;
private List<Snapshot> snapshots;
private SerializableSupplier<List<Snapshot>> snapshotsSupplier;
private final Map<String, SnapshotRef> refs;
private final Map<Long, List<StatisticsFile>> statisticsFiles;

Expand Down Expand Up @@ -885,7 +923,7 @@ private Builder(TableMetadata base) {
this.sortOrders = Lists.newArrayList(base.sortOrders);
this.properties = Maps.newHashMap(base.properties);
this.currentSnapshotId = base.currentSnapshotId;
this.snapshots = Lists.newArrayList(base.snapshots);
this.snapshots = Lists.newArrayList(base.snapshots());
this.changes = Lists.newArrayList(base.changes);
this.startingChangeCount = changes.size();

Expand Down Expand Up @@ -1105,6 +1143,11 @@ public Builder addSnapshot(Snapshot snapshot) {
return this;
}

public Builder setSnapshotsSupplier(SerializableSupplier<List<Snapshot>> snapshotsSupplier) {
this.snapshotsSupplier = snapshotsSupplier;
return this;
}

public Builder setBranchSnapshot(Snapshot snapshot, String branch) {
addSnapshot(snapshot);
setBranchSnapshotInternal(snapshot, branch);
Expand Down Expand Up @@ -1332,6 +1375,7 @@ public TableMetadata build() {
ImmutableMap.copyOf(properties),
currentSnapshotId,
ImmutableList.copyOf(snapshots),
snapshotsSupplier,
ImmutableList.copyOf(newSnapshotLog),
ImmutableList.copyOf(metadataHistory),
ImmutableMap.copyOf(refs),
Expand Down
Expand Up @@ -507,6 +507,7 @@ static TableMetadata fromJson(String metadataLocation, JsonNode node) {
properties,
currentSnapshotId,
snapshots,
null,
entries.build(),
metadataEntries.build(),
refs,
Expand Down

0 comments on commit 1521296

Please sign in to comment.