Skip to content
Draft

WIP #5280

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.delta.kernel.internal.annotation.VisibleForTesting;
import io.delta.kernel.internal.checksum.CRCInfo;
import io.delta.kernel.internal.clustering.ClusteringMetadataDomain;
import io.delta.kernel.internal.files.ParsedDeltaData;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.lang.Lazy;
import io.delta.kernel.internal.metrics.SnapshotQueryContext;
Expand All @@ -51,6 +52,117 @@

/** Implementation of {@link Snapshot}. */
public class SnapshotImpl implements Snapshot {

//////////////////////////////////////////
// Static factory methods and constants //
//////////////////////////////////////////

/**
 * Creates a post-commit Snapshot after a transaction.
 *
 * <p>The snapshot's protocol, metadata and in-commit timestamp are taken directly from the
 * transaction arguments rather than being re-read from the log.
 *
 * @param engine The engine to use for operations
 * @param dataPath The path to the table
 * @param previousSnapshot Optional previous snapshot (empty for CREATE transactions)
 * @param newlyCommittedDeltaFiles The delta files committed by the transaction. When {@code
 *     previousSnapshot} is present they are appended to its LogSegment; when it is empty (CREATE)
 *     the list must contain exactly one delta, at version 0.
 * @param committer The committer for the snapshot
 * @param txnEffectiveProtocol The effective protocol after the transaction
 * @param txnEffectiveMetadata The effective metadata after the transaction
 * @param txnInCommitTimestampOpt The in-commit timestamp from the transaction (if available);
 *     must not be null, use {@link Optional#empty()} when unavailable
 * @return A new post-commit Snapshot
 * @throws NullPointerException if {@code txnInCommitTimestampOpt} is null
 */
public static SnapshotImpl createPostCommitSnapshot(
    Engine engine,
    Path dataPath,
    Optional<SnapshotImpl> previousSnapshot,
    List<ParsedDeltaData> newlyCommittedDeltaFiles,
    Committer committer,
    Protocol txnEffectiveProtocol,
    Metadata txnEffectiveMetadata,
    Optional<Long> txnInCommitTimestampOpt) {
  // Fail fast: the constructor stores this value as-is, so a null would otherwise only surface
  // later, when the snapshot's in-commit timestamp is first used.
  requireNonNull(txnInCommitTimestampOpt, "txnInCommitTimestampOpt is null");

  // TODO: plumb through CRCInfo

  // TODO: Create SnapshotQueryContext.forPostCommitSnapshot
  final SnapshotQueryContext snapshotContext =
      SnapshotQueryContext.forLatestSnapshot(dataPath.toString());

  final LogSegment logSegment =
      createLogSegmentForPostCommit(dataPath, previousSnapshot, newlyCommittedDeltaFiles);

  // The segment is already materialized; a single Lazy wrapper is shared by LogReplay and the
  // Snapshot.
  final Lazy<LogSegment> lazyLogSegment = new Lazy<>(() -> logSegment);

  // TODO: We should make post-commit Snapshots be fully incremental. That is, we replay state
  // using any previously-computed state from the previous Snapshot / LogReplay.
  final LogReplay logReplay =
      new LogReplay(
          dataPath,
          engine,
          lazyLogSegment,
          Optional.empty(), // snapshotHint
          snapshotContext.getSnapshotMetrics());

  return new SnapshotImpl(
      dataPath,
      logSegment.getVersion(),
      lazyLogSegment,
      logReplay,
      txnEffectiveProtocol,
      txnEffectiveMetadata,
      committer,
      snapshotContext,
      txnInCommitTimestampOpt);
}

/**
 * Creates an initial Snapshot for a table at a specific version.
 *
 * <p>All arguments are forwarded unchanged to the {@link SnapshotImpl} constructor; the
 * post-commit in-commit timestamp is always left empty.
 */
public static SnapshotImpl createInitialSnapshot(
    Path dataPath,
    long version,
    Lazy<LogSegment> lazyLogSegment,
    LogReplay logReplay,
    Protocol protocol,
    Metadata metadata,
    Committer committer,
    SnapshotQueryContext snapshotContext) {
  // No transaction is involved here, so there is no txn-provided in-commit timestamp to pass.
  final Optional<Long> noPostCommitInCommitTimestamp = Optional.empty();

  return new SnapshotImpl(
      dataPath,
      version,
      lazyLogSegment,
      logReplay,
      protocol,
      metadata,
      committer,
      snapshotContext,
      noPostCommitInCommitTimestamp);
}

/**
 * Builds the LogSegment backing a post-commit Snapshot.
 *
 * <p>With a previous snapshot, its LogSegment is extended by the newly committed deltas. Without
 * one (CREATE), the segment is built from the single committed delta, which must be version 0.
 */
private static LogSegment createLogSegmentForPostCommit(
    Path dataPath,
    Optional<SnapshotImpl> previousSnapshot,
    List<ParsedDeltaData> newlyCommittedDeltaFiles) {
  // UPDATE/REPLACE case: append the new deltas to the segment we already have.
  if (previousSnapshot.isPresent()) {
    final LogSegment previousSegment = previousSnapshot.get().getLogSegment();
    return previousSegment.copyWithAdditionalDeltas(newlyCommittedDeltaFiles);
  }

  // CREATE case: the table did not exist before, so the commit is one delta at version 0.
  final int numDeltas = newlyCommittedDeltaFiles.size();
  checkArgument(
      numDeltas == 1, "CREATE transaction must have exactly one delta, got %d", numDeltas);

  final ParsedDeltaData createDelta = newlyCommittedDeltaFiles.get(0);
  checkArgument(
      createDelta.getVersion() == 0,
      "CREATE transaction requires version 0, got %s",
      createDelta.getVersion());

  final Path deltaLogPath = new Path(dataPath, "_delta_log");
  return LogSegment.fromSingleDelta(deltaLogPath, createDelta);
}

//////////////////////////////////
// Member methods and variables //
//////////////////////////////////

private final Path logPath;
private final Path dataPath;
private final long version;
Expand All @@ -71,7 +183,8 @@ public SnapshotImpl(
Protocol protocol,
Metadata metadata,
Committer committer,
SnapshotQueryContext snapshotContext) {
SnapshotQueryContext snapshotContext,
Optional<Long> postCommitInCommitTimestampOpt) {
checkArgument(version >= 0, "A snapshot cannot have version < 0");
this.logPath = new Path(dataPath, "_delta_log");
this.dataPath = dataPath;
Expand All @@ -81,7 +194,7 @@ public SnapshotImpl(
this.protocol = requireNonNull(protocol);
this.metadata = requireNonNull(metadata);
this.committer = committer;
this.inCommitTimestampOpt = Optional.empty();
this.inCommitTimestampOpt = postCommitInCommitTimestampOpt;

// We create the actual Snapshot report lazily (on first access) instead of eagerly in this
// constructor because some Snapshot metrics, like {@link
Expand Down Expand Up @@ -265,4 +378,5 @@ public CreateCheckpointIterator getCreateCheckpointIterator(Engine engine) {
public Optional<Long> getLatestTransactionVersion(Engine engine, String applicationId) {
  // Delegates the lookup for the given application id to this snapshot's LogReplay.
  final Optional<Long> latestTxnVersion =
      logReplay.getLatestTransactionIdentifier(engine, applicationId);
  return latestTxnVersion;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static java.util.Objects.requireNonNull;

import io.delta.kernel.internal.annotation.VisibleForTesting;
import io.delta.kernel.internal.files.ParsedDeltaData;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.lang.Lazy;
import io.delta.kernel.internal.lang.ListUtils;
Expand All @@ -34,8 +35,41 @@

public class LogSegment {

//////////////////////////////////////////
// Static factory methods and constants //
//////////////////////////////////////////

/**
 * Builds a LogSegment that consists of exactly one delta and has no checkpoints, no compactions
 * and no last-seen checksum. Used to construct a post-commit Snapshot after a CREATE transaction.
 *
 * @param logPath The path to the _delta_log directory
 * @param parsedDelta The only delta of the segment; must be file-based and at version 0
 * @return A new LogSegment with just this delta
 * @throws IllegalArgumentException if the ParsedDeltaData is not file-based or its version is
 *     not 0
 */
public static LogSegment fromSingleDelta(Path logPath, ParsedDeltaData parsedDelta) {
  checkArgument(parsedDelta.isFile(), "Currently, only file-based deltas are supported");

  final long deltaVersion = parsedDelta.getVersion();
  checkArgument(
      deltaVersion == 0L, "Version must be 0 for a LogSegment with only a single delta");

  final FileStatus onlyDelta = parsedDelta.getFileStatus();

  return new LogSegment(
      logPath,
      deltaVersion,
      Collections.singletonList(onlyDelta), // deltas
      Collections.emptyList(), // compactions
      Collections.emptyList(), // checkpoints
      onlyDelta,
      Optional.empty());
}

private static final Logger logger = LoggerFactory.getLogger(LogSegment.class);

//////////////////////////////////
// Member methods and variables //
//////////////////////////////////

private final Path logPath;
private final long version;
private final List<FileStatus> deltas;
Expand Down Expand Up @@ -237,6 +271,67 @@ public List<FileStatus> allFilesWithCompactionsReversed() {
return deltasCheckpointsCompactionsReversed.get();
}

/**
 * Returns a LogSegment that contains everything in this one plus the given deltas appended at
 * the end. Used to construct a post-commit Snapshot from a previous Snapshot.
 *
 * <p>The new deltas must be file-based, start at {@code version + 1} and increase by exactly one
 * from element to element. Checkpoints, compactions and the last-seen checksum are carried over
 * unchanged. An empty list yields this very instance.
 *
 * @param additionalDeltas List of ParsedDeltaData to add (must be contiguous after current
 *     version)
 * @return A new LogSegment with the additional deltas, or {@code this} if the list is empty
 * @throws IllegalArgumentException if a delta is not file-based, or if deltas are not contiguous
 *     or don't start at version + 1
 */
public LogSegment copyWithAdditionalDeltas(List<ParsedDeltaData> additionalDeltas) {
  // Nothing to append: this segment already is the result.
  if (additionalDeltas.isEmpty()) {
    return this;
  }

  // For now, we only support file-based deltas
  final boolean allFileBased = additionalDeltas.stream().allMatch(ParsedDeltaData::isFile);
  checkArgument(allFileBased, "Currently, only file-based deltas are supported");

  // The appended range has to begin right after the current end of this segment.
  final long nextVersion = version + 1;
  final long firstNewVersion = additionalDeltas.get(0).getVersion();
  checkArgument(
      firstNewVersion == nextVersion,
      "First additional delta version %d must equal current version + 1 (%d)",
      firstNewVersion,
      nextVersion);

  // Every element must sit exactly `index` versions after the first one (no gaps, no repeats).
  for (int i = 0; i < additionalDeltas.size(); i++) {
    final long expected = firstNewVersion + i;
    final long actual = additionalDeltas.get(i).getVersion();
    checkArgument(
        actual == expected,
        "Delta versions must be contiguous. Expected %d but got %d",
        expected,
        actual);
  }

  // The LogSegment constructor works on FileStatus, so unwrap each new delta while appending it
  // after the existing deltas.
  final List<FileStatus> extendedDeltas = new ArrayList<>(deltas);
  for (ParsedDeltaData delta : additionalDeltas) {
    extendedDeltas.add(delta.getFileStatus());
  }

  // The last appended delta defines the version of the new segment.
  final ParsedDeltaData tailDelta = ListUtils.getLast(additionalDeltas);

  return new LogSegment(
      logPath,
      tailDelta.getVersion(),
      extendedDeltas,
      compactions, // carried over as-is
      checkpoints, // carried over as-is
      tailDelta.getFileStatus(),
      lastSeenChecksum);
}

@Override
public String toString() {
return String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ private SnapshotImpl createSnapshot(
snapshotContext.getSnapshotMetrics());

final SnapshotImpl snapshot =
new SnapshotImpl(
SnapshotImpl.createInitialSnapshot(
tablePath,
initSegment.getVersion(),
new Lazy<>(() -> initSegment),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ private SnapshotImpl createSnapshot(Engine engine, SnapshotQueryContext snapshot
final Lazy<LogSegment> lazyLogSegment = getLazyLogSegment(engine, snapshotCtx, versionToLoad);
final LogReplay logReplay = getLogReplay(engine, lazyLogSegment, snapshotCtx);

return new SnapshotImpl(
return SnapshotImpl.createInitialSnapshot(
tablePath,
versionToLoad.orElseGet(() -> lazyLogSegment.get().getVersion()),
lazyLogSegment,
Expand Down
Loading
Loading