From 94155a89873b25094372f2de53e3c9346e7e6296 Mon Sep 17 00:00:00 2001 From: Scott Sandre Date: Fri, 26 Sep 2025 19:59:58 +0000 Subject: [PATCH 1/2] done first pass w unit tests --- .../kernel/internal/snapshot/LogSegment.java | 95 +++++++++++++++ .../internal/snapshot/LogSegmentSuite.scala | 115 +++++++++++++++++- 2 files changed, 208 insertions(+), 2 deletions(-) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/LogSegment.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/LogSegment.java index b3ae748b388..06884612cea 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/LogSegment.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/LogSegment.java @@ -20,6 +20,7 @@ import static java.util.Objects.requireNonNull; import io.delta.kernel.internal.annotation.VisibleForTesting; +import io.delta.kernel.internal.files.ParsedDeltaData; import io.delta.kernel.internal.fs.Path; import io.delta.kernel.internal.lang.Lazy; import io.delta.kernel.internal.lang.ListUtils; @@ -34,8 +35,41 @@ public class LogSegment { + ////////////////////////////////////////// + // Static factory methods and constants // + ////////////////////////////////////////// + + /** + * Creates a LogSegment from a single ParsedDeltaData. Used to construct a post-commit Snapshot + * after a CREATE transaction. + * + * @param logPath The path to the _delta_log directory + * @param parsedDelta The ParsedDeltaData (e.g., version 0) + * @return A new LogSegment with just this delta + * @throws IllegalArgumentException if the ParsedDeltaData is not file-based + */ + public static LogSegment fromSingleDelta(Path logPath, ParsedDeltaData parsedDelta) { + checkArgument(parsedDelta.isFile(), "Currently, only file-based deltas are supported"); + checkArgument( + parsedDelta.getVersion() == 0L, + "Version must be 0 for a LogSegment with only a single delta"); + + final FileStatus deltaFile = parsedDelta.getFileStatus(); + final long version = parsedDelta.getVersion(); + final List deltas = Collections.singletonList(deltaFile); + final List checkpoints = Collections.emptyList(); + final List compactions = Collections.emptyList(); + + return new LogSegment( + logPath, version, deltas, compactions, checkpoints, deltaFile, Optional.empty()); + } + private static final Logger logger = LoggerFactory.getLogger(LogSegment.class); + ////////////////////////////////// + // Member methods and variables // + ////////////////////////////////// + private final Path logPath; private final long version; private final List deltas; @@ -237,6 +271,67 @@ public List allFilesWithCompactionsReversed() { return deltasCheckpointsCompactionsReversed.get(); } + /** + * Creates a new LogSegment by extending this LogSegment with additional deltas. Used to construct + * a post-commit Snapshot from a previous Snapshot. + * + *

The additional deltas must be contiguous and start at version + 1. + * + * @param additionalDeltas List of ParsedDeltaData to add (must be contiguous after current + * version) + * @return A new LogSegment with the additional deltas + * @throws IllegalArgumentException if deltas are not contiguous or don't start at version + 1 + */ + public LogSegment copyWithAdditionalDeltas(List additionalDeltas) { + if (additionalDeltas.isEmpty()) { + return this; + } + + // For now, we only support file-based deltas + checkArgument( + additionalDeltas.stream().allMatch(ParsedDeltaData::isFile), + "Currently, only file-based deltas are supported"); + + // Validate that the first new delta starts at version + 1 + final long firstNewVersion = additionalDeltas.get(0).getVersion(); + checkArgument( + firstNewVersion == version + 1, + "First additional delta version %d must equal current version + 1 (%d)", + firstNewVersion, + version + 1); + + // Validate that new deltas are contiguous + long expectedVersion = firstNewVersion; + for (ParsedDeltaData delta : additionalDeltas) { + checkArgument( + delta.getVersion() == expectedVersion, + "Delta versions must be contiguous. Expected %d but got %d", + expectedVersion, + delta.getVersion()); + expectedVersion++; + } + + // Convert ParsedDeltaData to FileStatus to comply with existing LogSegment constructor + final List newDeltaFileStatuses = + additionalDeltas.stream().map(ParsedDeltaData::getFileStatus).collect(Collectors.toList()); + + // Create combined deltas list + final List combinedDeltas = new ArrayList<>(deltas); + combinedDeltas.addAll(newDeltaFileStatuses); + + // The new version is the version of the last additional delta + final ParsedDeltaData lastAdditionalDelta = ListUtils.getLast(additionalDeltas); + + return new LogSegment( + logPath, + lastAdditionalDelta.getVersion(), + combinedDeltas, + compactions, // Keep existing compactions + checkpoints, // Keep existing checkpoints + lastAdditionalDelta.getFileStatus(), + lastSeenChecksum); + } + @Override public String toString() { return String.format( diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/snapshot/LogSegmentSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/snapshot/LogSegmentSuite.scala index 0290ae0190c..48cacda22dc 100644 --- a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/snapshot/LogSegmentSuite.scala +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/snapshot/LogSegmentSuite.scala @@ -20,18 +20,21 @@ import java.util.{Collections, List => JList, Optional} import scala.collection.JavaConverters._ +import io.delta.kernel.internal.files.ParsedDeltaData import io.delta.kernel.internal.fs.Path -import io.delta.kernel.test.MockFileSystemClientUtils +import io.delta.kernel.test.{MockFileSystemClientUtils, VectorTestUtils} import io.delta.kernel.utils.FileStatus import org.scalatest.funsuite.AnyFunSuite -class LogSegmentSuite extends AnyFunSuite with MockFileSystemClientUtils { +class LogSegmentSuite extends AnyFunSuite with MockFileSystemClientUtils with VectorTestUtils { private val checkpointFs10List = singularCheckpointFileStatuses(Seq(10)).toList.asJava private val checksumAtVersion10 = checksumFileStatus(10) private val deltaFs11List = deltaFileStatuses(Seq(11)).toList.asJava private val deltaFs12List = deltaFileStatuses(Seq(12)).toList.asJava private val deltasFs11To12List = deltaFileStatuses(Seq(11, 12)).toList.asJava + private val parsedRatifiedCommits11To12List = + Seq(11, 12).map(v => ParsedDeltaData.forFileStatus(stagedCommitFile(v))).asJava private val compactionFs11To12List = compactedFileStatuses(Seq((11, 12))).toList.asJava private val badJsonsList = Collections.singletonList( FileStatus.of(s"${logPath.toString}/gibberish.json", 1, 1)) @@ -442,4 +445,112 @@ class LogSegmentSuite extends AnyFunSuite with MockFileSystemClientUtils { s"doesn't belong in the transaction log at $tablePath")) } + //////////////////////////////////// + // copyWithAdditionalDeltas tests // + //////////////////////////////// + + test("copyWithAdditionalDeltas: single additional delta") { + val baseSegment = createLogSegmentForTest( + version = 10, + checkpoints = checkpointFs10List) + + val additionalDeltas = List(ParsedDeltaData.forFileStatus(stagedCommitFile(11))).asJava + val updated = baseSegment.copyWithAdditionalDeltas(additionalDeltas) + + assert(updated.getVersion === 11) + assert(updated.getDeltas.size() === 1) + } + + test("copyWithAdditionalDeltas: multiple additional deltas") { + val baseSegment = createLogSegmentForTest( + version = 10, + checkpoints = checkpointFs10List) + + val updated = baseSegment.copyWithAdditionalDeltas(parsedRatifiedCommits11To12List) + + assert(updated.getVersion === 12) + assert(updated.getDeltas.size() === 2) + } + + test("copyWithAdditionalDeltas: empty list returns same segment") { + val baseSegment = createLogSegmentForTest( + version = 10, + checkpoints = checkpointFs10List) + + val updated = baseSegment.copyWithAdditionalDeltas(Collections.emptyList()) + assert(updated eq baseSegment) + } + + test("copyWithAdditionalDeltas: first delta must be version + 1") { + val baseSegment = createLogSegmentForTest( + version = 10, + checkpoints = checkpointFs10List) + + val wrongVersionDeltas = List(ParsedDeltaData.forFileStatus(stagedCommitFile(12))).asJava + val exMsg = intercept[IllegalArgumentException] { + baseSegment.copyWithAdditionalDeltas(wrongVersionDeltas) + }.getMessage + assert(exMsg.contains("First additional delta version 12 must equal current version + 1 (11)")) + } + + test("copyWithAdditionalDeltas: deltas must be contiguous") { + val baseSegment = createLogSegmentForTest( + version = 10, + checkpoints = checkpointFs10List) + + val nonContiguousDeltas = List( + ParsedDeltaData.forFileStatus(stagedCommitFile(11)), + ParsedDeltaData.forFileStatus(stagedCommitFile(13))).asJava + val exMsg = intercept[IllegalArgumentException] { + baseSegment.copyWithAdditionalDeltas(nonContiguousDeltas) + }.getMessage + assert(exMsg.contains("Delta versions must be contiguous. Expected 12 but got 13")) + } + + test("copyWithAdditionalDeltas: inline delta fails") { + val baseSegment = createLogSegmentForTest( + version = 10, + checkpoints = checkpointFs10List) + + // Create inline ParsedDeltaData (not file-based) + val inlineDelta = ParsedDeltaData.forInlineData(11, emptyColumnarBatch) + val inlineDeltas = List(inlineDelta).asJava + + val exMsg = intercept[IllegalArgumentException] { + baseSegment.copyWithAdditionalDeltas(inlineDeltas) + }.getMessage + assert(exMsg.contains("Currently, only file-based deltas are supported")) + } + + /////////////////////////// + // fromSingleDelta tests // + /////////////////////////// + + test("fromSingleDelta -- creates valid LogSegment") { + val deltaData = ParsedDeltaData.forFileStatus(deltaFileStatus(0)) + val logSegment = LogSegment.fromSingleDelta(logPath, deltaData) + + assert(logSegment.getVersion === 0) + assert(logSegment.getDeltas.size() === 1) + assert(logSegment.getCheckpoints.isEmpty) + assert(logSegment.getCompactions.isEmpty) + assert(logSegment.getLastSeenChecksum === Optional.empty()) + } + + test("fromSingleDelta -- non-zero version fails") { + val deltaData = ParsedDeltaData.forFileStatus(deltaFileStatus(1)) + val exMsg = intercept[IllegalArgumentException] { + LogSegment.fromSingleDelta(logPath, deltaData) + }.getMessage + assert(exMsg.contains("Version must be 0 for a LogSegment with only a single delta")) + } + + test("fromSingleDelta -- inline delta fails") { + val inlineDelta = ParsedDeltaData.forInlineData(0, emptyColumnarBatch) + val exMsg = intercept[IllegalArgumentException] { + LogSegment.fromSingleDelta(logPath, inlineDelta) + }.getMessage + assert(exMsg.contains("Currently, only file-based deltas are supported")) + } + } From 4b5dd1cec83f66a4d7ae70461d9ad479b3263944 Mon Sep 17 00:00:00 2001 From: Scott Sandre Date: Tue, 30 Sep 2025 18:31:01 +0000 Subject: [PATCH 2/2] WIP --- .../delta/kernel/internal/SnapshotImpl.java | 118 +++++++++++++++++- .../internal/snapshot/SnapshotManager.java | 2 +- .../internal/table/SnapshotFactory.java | 2 +- 3 files changed, 118 insertions(+), 4 deletions(-) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java index 1d8ebf29aad..5af8ce72d22 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java @@ -33,6 +33,7 @@ import io.delta.kernel.internal.annotation.VisibleForTesting; import io.delta.kernel.internal.checksum.CRCInfo; import io.delta.kernel.internal.clustering.ClusteringMetadataDomain; +import io.delta.kernel.internal.files.ParsedDeltaData; import io.delta.kernel.internal.fs.Path; import io.delta.kernel.internal.lang.Lazy; import io.delta.kernel.internal.metrics.SnapshotQueryContext; @@ -51,6 +52,117 @@ /** Implementation of {@link Snapshot}. */ public class SnapshotImpl implements Snapshot { + + ////////////////////////////////////////// + // Static factory methods and constants // + ////////////////////////////////////////// + + /** + * Creates a post-commit Snapshot after a transaction. + * + * @param engine The engine to use for operations + * @param dataPath The path to the table + * @param previousSnapshot Optional previous snapshot (empty for CREATE transactions) + * @param newlyCommittedDeltaFiles XXXX + * @param committer The committer for the snapshot + * @param txnEffectiveProtocol The effective protocol after the transaction + * @param txnEffectiveMetadata The effective metadata after the transaction + * @param txnInCommitTimestampOpt The in-commit timestamp from the transaction (if available) + * @return A new post-commit Snapshot + */ + public static SnapshotImpl createPostCommitSnapshot( + Engine engine, + Path dataPath, + Optional previousSnapshot, + List newlyCommittedDeltaFiles, + Committer committer, + Protocol txnEffectiveProtocol, + Metadata txnEffectiveMetadata, + Optional txnInCommitTimestampOpt) { + // TODO: plumb through CRCInfo + + // TODO: Create SnapshotQueryContext.forPostCommitSnapshot + final SnapshotQueryContext snapshotContext = + SnapshotQueryContext.forLatestSnapshot(dataPath.toString()); + + final LogSegment logSegment = + createLogSegmentForPostCommit(dataPath, previousSnapshot, newlyCommittedDeltaFiles); + + // TODO: We should make post-commit Snapshots be fully incremental. That is, we replay state + // using any previously-computed state from the previous Snapshot / LogReplay. + final LogReplay logReplay = + new LogReplay( + dataPath, + engine, + new Lazy<>(() -> logSegment), + Optional.empty(), // snapshotHint + snapshotContext.getSnapshotMetrics()); + + return new SnapshotImpl( + dataPath, + logSegment.getVersion(), + new Lazy<>(() -> logSegment), + logReplay, + txnEffectiveProtocol, + txnEffectiveMetadata, + committer, + snapshotContext, + txnInCommitTimestampOpt); + } + + /** Creates an initial Snapshot for a table at a specific version. */ + public static SnapshotImpl createInitialSnapshot( + Path dataPath, + long version, + Lazy lazyLogSegment, + LogReplay logReplay, + Protocol protocol, + Metadata metadata, + Committer committer, + SnapshotQueryContext snapshotContext) { + return new SnapshotImpl( + dataPath, + version, + lazyLogSegment, + logReplay, + protocol, + metadata, + committer, + snapshotContext, + Optional.empty()); // postCommitInCommitTimestampOpt + } + + private static LogSegment createLogSegmentForPostCommit( + Path dataPath, + Optional previousSnapshot, + List newlyCommittedDeltaFiles) { + if (previousSnapshot.isPresent()) { + // UPDATE/REPLACE case: extend existing LogSegment + return previousSnapshot + .get() + .getLogSegment() + .copyWithAdditionalDeltas(newlyCommittedDeltaFiles); + } else { + // CREATE case: single delta at version 0 + checkArgument( + newlyCommittedDeltaFiles.size() == 1, + "CREATE transaction must have exactly one delta, got %d", + newlyCommittedDeltaFiles.size()); + final ParsedDeltaData firstDelta = newlyCommittedDeltaFiles.get(0); + + checkArgument( + firstDelta.getVersion() == 0, + "CREATE transaction requires version 0, got %s", + firstDelta.getVersion()); + + return LogSegment.fromSingleDelta(new Path(dataPath, "_delta_log"), firstDelta); + } + } + + ////////////////////////////////// + // Member methods and variables // + ////////////////////////////////// + private final Path logPath; private final Path dataPath; private final long version; @@ -71,7 +183,8 @@ public SnapshotImpl( Protocol protocol, Metadata metadata, Committer committer, - SnapshotQueryContext snapshotContext) { + SnapshotQueryContext snapshotContext, + Optional postCommitInCommitTimestampOpt) { checkArgument(version >= 0, "A snapshot cannot have version < 0"); this.logPath = new Path(dataPath, "_delta_log"); this.dataPath = dataPath; @@ -81,7 +194,7 @@ public SnapshotImpl( this.protocol = requireNonNull(protocol); this.metadata = requireNonNull(metadata); this.committer = committer; - this.inCommitTimestampOpt = Optional.empty(); + this.inCommitTimestampOpt = postCommitInCommitTimestampOpt; // We create the actual Snapshot report lazily (on first access) instead of eagerly in this // constructor because some Snapshot metrics, like {@link @@ -265,4 +378,5 @@ public CreateCheckpointIterator getCreateCheckpointIterator(Engine engine) { public Optional getLatestTransactionVersion(Engine engine, String applicationId) { return logReplay.getLatestTransactionIdentifier(engine, applicationId); } + } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java index 8281ab831d6..1a18a2ff637 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java @@ -192,7 +192,7 @@ private SnapshotImpl createSnapshot( snapshotContext.getSnapshotMetrics()); final SnapshotImpl snapshot = - new SnapshotImpl( + SnapshotImpl.createInitialSnapshot( tablePath, initSegment.getVersion(), new Lazy<>(() -> initSegment), diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/table/SnapshotFactory.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/table/SnapshotFactory.java index 71dd74be3d4..a144e9c6936 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/table/SnapshotFactory.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/table/SnapshotFactory.java @@ -137,7 +137,7 @@ private SnapshotImpl createSnapshot(Engine engine, SnapshotQueryContext snapshot final Lazy lazyLogSegment = getLazyLogSegment(engine, snapshotCtx, versionToLoad); final LogReplay logReplay = getLogReplay(engine, lazyLogSegment, snapshotCtx); - return new SnapshotImpl( + return SnapshotImpl.createInitialSnapshot( tablePath, versionToLoad.orElseGet(() -> lazyLogSegment.get().getVersion()), lazyLogSegment,