
[Kernel] Add kernel support for v2 checkpoints #2826

Merged
merged 76 commits into delta-io:master on Apr 25, 2024

Conversation

chirag-s-db
Contributor

@chirag-s-db chirag-s-db commented Mar 29, 2024

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

Add support for V2 checkpoints. When reconstructing the LogSegment of a table at a given version, check if the checkpoint file to be read is a checkpoint manifest. If it is, include the sidecar files referenced by that manifest in the LogSegment checkpoint files. See #2232
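The description above hinges on telling a V2 checkpoint manifest apart from a classic checkpoint by its filename. A minimal, hypothetical sketch of that classification is below; the real logic lives in the kernel's CheckpointInstance, and the patterns follow the Delta protocol spec's naming scheme (class and method names here are illustrative only).

```java
import java.util.regex.Pattern;

// Hypothetical sketch of filename-based checkpoint classification; the real
// logic lives in the kernel's CheckpointInstance. Patterns follow the Delta
// protocol spec's naming scheme.
public class CheckpointKind {
    // Classic: 20-digit version, e.g. 00000000000000000010.checkpoint.parquet
    static final Pattern CLASSIC =
        Pattern.compile("\\d{20}\\.checkpoint\\.parquet");
    // Multi-part: part o of p, both zero-padded to 10 digits.
    static final Pattern MULTI_PART =
        Pattern.compile("\\d{20}\\.checkpoint\\.\\d{10}\\.\\d{10}\\.parquet");
    // V2 manifest: UUID-named, json or parquet.
    static final Pattern V2 = Pattern.compile(
        "\\d{20}\\.checkpoint\\.[0-9a-f]{8}(-[0-9a-f]{4}){3}-[0-9a-f]{12}\\.(json|parquet)");

    static String classify(String name) {
        if (CLASSIC.matcher(name).matches()) return "SINGLE_PART";
        if (MULTI_PART.matcher(name).matches()) return "MULTI_PART";
        if (V2.matcher(name).matches()) return "V2";
        return "INVALID";
    }

    public static void main(String[] args) {
        System.out.println(classify("00000000000000000010.checkpoint.parquet"));
        System.out.println(classify(
            "00000000000000000010.checkpoint.0000000002.parquet"));
    }
}
```

Under these patterns, a name like `00000000000000000010.checkpoint.0000000002.parquet` matches none of the forms, which is consistent with the test change in this PR that treats it as invalid.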

How was this patch tested?

See changes to LogReplaySuite, SnapshotManagementSuite, CheckpointerSuite, FileNamesSuite, and CheckpointInstanceSuite.

Does this PR introduce any user-facing changes?

No.

@chirag-s-db chirag-s-db changed the title [WIP][Kernel] Add kernel support for v2 checkpoints. [WIP][Kernel] Add kernel support for v2 checkpoints Mar 29, 2024

// invalid checkpoints
intercept[RuntimeException] {
  new CheckpointInstance(
    new Path(FAKE_DELTA_LOG_PATH,
      "00000000000000000010.checkpoint.0000000002.parquet").toString)
Contributor Author

Note: Had to change this to make it invalid - previously it was a valid V2 checkpoint path.

@chirag-s-db chirag-s-db changed the title [WIP][Kernel] Add kernel support for v2 checkpoints [Kernel] Add kernel support for v2 checkpoints Apr 1, 2024
@@ -598,6 +629,22 @@ protected Optional<LogSegment> getLogSegmentForVersion(
);
throw new IllegalStateException(msg);
}

// Reading sidecars only applies for v2 checkpoints.
if (newCheckpoint.format == CheckpointInstance.CheckpointFormat.V2) {
Contributor Author

Note: For a V2 checkpoint instance, do not include the compatibility file in the checkpoints in the LogSegment. Only include checkpoint manifest + sidecar files.

Contributor

This can potentially lead to incorrect results being served to the user:

  1. Table A only has one checkpoint, which happens to be a compatibility V2 checkpoint. Because of https://github.com/delta-io/delta/pull/2826/files#diff-12397663510cec22feeea64e8e3f0ddee114de86025e00495b02ab89cc3d2d01R69, newCheckpoint.format will be single_checkpoint.
  2. Since we only read sidecars if format == v2, no sidecars will be read for this checkpoint and only the non-file actions will be read. This could result in the construction of a snapshot which looks legal but is missing references to many AddFiles that are present in those sidecars.

Contributor Author

This should be fixed - now, we read the highest version checkpoint, preferring V2 over V1 checkpoints if multiple exist at the same version.
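The selection rule the author describes can be sketched as a simple ordering. This is illustrative only, not the kernel's actual code; `Ckpt` and `pick` are hypothetical names.

```java
import java.util.Comparator;
import java.util.List;

// Illustrative sketch: among candidate checkpoints, take the highest
// version, and prefer a V2 checkpoint over a V1 checkpoint when both
// exist at the same version.
public class CheckpointPick {
    record Ckpt(long version, boolean isV2, String name) {}

    static Ckpt pick(List<Ckpt> candidates) {
        return candidates.stream()
            .max(Comparator.comparingLong(Ckpt::version)
                .thenComparing(Ckpt::isV2)) // false < true, so V2 wins ties
            .orElseThrow();
    }

    public static void main(String[] args) {
        // A compat (V1-named) checkpoint and a V2 checkpoint at version 10:
        Ckpt best = pick(List.of(
            new Ckpt(10, false, "v1-compat"),
            new Ckpt(10, true, "v2-manifest")));
        System.out.println(best.name()); // v2-manifest
    }
}
```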


if (pathParts.length == 3 && pathParts[1].equals("checkpoint") &&
        pathParts[2].equals("parquet")) {
    // Classic checkpoint 00000000000000000010.checkpoint.parquet
    this.version = Long.parseLong(pathParts[0]);
    this.numParts = Optional.empty();
    this.format = CheckpointFormat.SINGLE_PART;
Contributor

With V2 checkpoints, a checkpoint that appears to be a single-part checkpoint can actually be a V2 checkpoint. The only way to be sure whether the checkpoint is V2 or classic is to read the checkpoint and look for the CheckpointManifest action.

Contributor Author

Got it, fixed.

Collaborator

@vkorukanti vkorukanti left a comment

Thanks for working on this!

A few questions:

  1. How does ActionsIterator.java handle a v2 checkpoint (of json format) or sidecar files when reading the actions?
  2. Can we add some integration tests that test this end-to-end? You can look at DeltaTableReadsSuite for examples.
  3. Do we need to read the sidecar file list within a checkpoint file while creating the LogSegment? Can this be delayed until we start reading the delta actions for table state construction?
  4. A small doc explaining the design and any decisions would be helpful in understanding them.

}
return Optional.of(files);
} catch (Exception e) {
logger.warn("Failed to load sidecars from file {}", checkpointPath, e);
Collaborator

Use logger.warn(new ParameterizedMessage(..., checkpointPath), e) instead.

// Either V2 Parquet manifest or V1 Parquet single checkpoint.
actionItr = tableClient.getParquetHandler().readParquetFiles(
singletonCloseableIterator(checkpointFile),
SidecarFile.READ_SCHEMA,
Collaborator

Question: is the read schema SidecarFile.READ_SCHEMA valid? From my understanding of the spec, the checkpoint parquet file will have top-level columns add, remove, sidecar, etc. The read schema should be sidecar: struct(path: String, sizeInBytes: Long, modificationTime: Long).
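The reviewer's point can be sketched without the kernel APIs: a checkpoint row is a union of top-level action columns (add, remove, sidecar, ...), so projecting sidecars means selecting the nested `sidecar` struct and skipping rows where it is null. The rows below are mocked as plain maps (a stand-in for the real columnar batches); field names follow the spec's sidecar action.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Mock sketch of projecting the nested `sidecar` column out of checkpoint
// rows. Real rows come from the ParquetHandler; maps stand in for columnar
// batches here.
public class SidecarProjection {
    static final List<Map<String, Object>> CHECKPOINT_ROWS = List.of(
        Map.<String, Object>of("add", "part-0001.parquet"),
        Map.<String, Object>of("sidecar",
            Map.of("path", "s1.parquet", "sizeInBytes", 10L, "modificationTime", 0L)),
        Map.<String, Object>of("remove", "part-0000.parquet"),
        Map.<String, Object>of("sidecar",
            Map.of("path", "s2.parquet", "sizeInBytes", 20L, "modificationTime", 0L)));

    @SuppressWarnings("unchecked")
    static List<String> sidecarPaths(List<Map<String, Object>> rows) {
        List<String> paths = new ArrayList<>();
        for (Map<String, Object> row : rows) {
            Object sidecar = row.get("sidecar"); // null unless this row is a sidecar action
            if (sidecar != null) {
                paths.add((String) ((Map<String, Object>) sidecar).get("path"));
            }
        }
        return paths;
    }

    public static void main(String[] args) {
        System.out.println(sidecarPaths(CHECKPOINT_ROWS));
    }
}
```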

Contributor Author

Fixed.

Comment on lines 635 to 647
if (newCheckpoint.format.usesSidecars()) {
    final List<FileStatus> sidecarFiles = listSidecars(tableClient);
    // Only read the checkpoint for sidecars if any sidecars exist in the table.
    if (!sidecarFiles.isEmpty()) {
        // Safe to call .get() here, as we know the CheckpointInstance was initialized
        // with a valid filepath.
        final Set<Path> referencedSidecars = new HashSet<>(
            newCheckpoint.getReferencedSidecars(tableClient, logPath).get());
        newCheckpointFileList.addAll(sidecarFiles.stream()
            .filter(f -> referencedSidecars.contains(new Path(f.getPath())))
            .collect(Collectors.toList()));
    }
}
Collaborator

Can we delay the loading of the sidecar files until we start reading?

Contributor Author

I think so - will update.

Collaborator

@vkorukanti vkorukanti left a comment

LGTM. A few minor changes.

@prakharjain09 @dhruvarya-db Please take a look at it.

Comment on lines 187 to 190
if (format == CheckpointFormat.V2) {
    return Objects.hash(version, numParts, format, filePath);
}
return Objects.hash(version, numParts, format, "");
Collaborator

Please see my other comment.

).foreach(ci => assert(ci.compareTo(CheckpointInstance.MAX_VALUE) < 0))
}

test("checkpoint instance equality") {
val single = new CheckpointInstance("01.checkpoint.parquet")
val multipartPart1 = new CheckpointInstance("01.checkpoint.01.02.parquet")
Collaborator

The Delta spec mandates that versions be zero-padded to 20 digits, so these are invalid names.

I think delta-spark also doesn't enforce this right now, and it should be fixed in both places. It should not block this PR, though.
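The padding rule can be sketched in a few lines; `classicCheckpoint` is a hypothetical helper, not an API in the kernel.

```java
// Sketch of the padding rule: log versions are zero-padded to 20 digits,
// so a valid classic checkpoint name for version 1 is built like this.
public class PaddedName {
    static String classicCheckpoint(long version) {
        return String.format("%020d.checkpoint.parquet", version);
    }

    public static void main(String[] args) {
        System.out.println(classicCheckpoint(1L));
        // 00000000000000000001.checkpoint.parquet, not "01.checkpoint.parquet"
    }
}
```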

}
}

test("v2 checkpoint support with multiple sidecars") {
Collaborator

Could we also test empty sidecars?

}
}

test("compatibility checkpoint with sidecar files") {
Collaborator

Can we also have a test case where both checkpoints (v2 and compat) are present, and assert that v2 is picked?

Contributor Author

This case is covered in SnapshotManagerSuite

Collaborator

Let's add it here also. From the SnapshotImpl you can get the LogSegment and verify it.

Collaborator

@chirag-s-db Is this handled?


// Validate snapshot and data.
validateSnapshot(path.toString, DeltaLog.forTable(spark, path.toString).update())
checkTable(
Collaborator

Can we also assert that the right checkpoint is being used for the snapshot?

Contributor Author

This should also be covered in SnapshotManagerSuite - this suite is end-to-end just to cover the overall read process.

Collaborator

Yes, it's already covered in UTs. It would be good to assert that the tests are exercising exactly the scenario they were supposed to; that helps avoid configuration-related issues, e.g. this one.

Contributor Author

Got it, this is now in the test as well.

Comment on lines 188 to 190
spark.sql("INSERT INTO tbl VALUES (1, 'a'), (2, 'b')")
spark.sql("INSERT INTO tbl VALUES (3, 'c'), (4, 'd')")
spark.sql("INSERT INTO tbl VALUES (5, 'e'), (6, 'f')")
Collaborator

Can we insert a bit more data, maybe 100 rows using Seq.range? Set the Parquet/JSON handler batch size to less than 100, so that we get multiple batches from the ParquetHandler when reading the checkpoint manifest or sidecar files. This exercises the iterator changes.

Contributor Author

Added.

}
}

test("compatibility checkpoint with sidecar files") {
Collaborator

@chirag-s-db Is this handled?

Comment on lines +191 to +193
// For V2 checkpoints, the filepath is included in the hash of the instance (as we consider
// different UUID checkpoints to be different checkpoint instances). Otherwise, ignore
// the filepath (which is empty) when hashing.
Collaborator

is this comment still needed?

Contributor Author

Yes, it has been updated to reflect that only V2 contains the filepath.

Comment on lines 48 to 51
public static StructType READ_SCHEMA = new StructType()
    .add("path", StringType.STRING, false /* nullable */)
    .add("sizeInBytes", LongType.LONG, false /* nullable */)
    .add("modificationTime", LongType.LONG, false /* nullable */);
Collaborator

move the public static fields to the top.

Contributor Author

Fixed.

@@ -224,7 +221,7 @@ class CheckpointerSuite extends AnyFunSuite
}
}

object CheckpointerSuite {
object CheckpointerSuite extends MockTableClientUtils with VectorTestUtils {
Collaborator

Why extend MockTableClientUtils? VectorTestUtils makes sense, as it contains stringVector etc.

Contributor Author

Fixed.

Comment on lines 103 to 104
"delta.kernel.default.parquet.reader.batch-size" -> "10",
"delta.kernel.default.json.reader.batch-size" -> "10",
Collaborator

Does setting configs like this work for Kernel?

I thought we needed to create a custom table client with a Configuration containing these, and pass that custom table client to checkTable.

Contributor Author

Fixed.

Comment on lines +73 to +74
val expectedV2CkptToRead =
ckptVersionExpected.getOrElse(snapshotFromSpark.version - (snapshotFromSpark.version % 2))
Collaborator

What is this check: snapshotFromSpark.version % 2?

Contributor Author

Checks that it is the most recent even version.
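That arithmetic can be sketched in one line, assuming (as the test setup implies) that a checkpoint is written every 2 commits; the class and method names here are hypothetical.

```java
// Sketch: assuming a checkpoint is written every 2 commits, the expected
// checkpoint version is the latest even version at or below the snapshot.
public class LatestEvenCheckpoint {
    static long expectedCheckpointVersion(long snapshotVersion) {
        return snapshotVersion - (snapshotVersion % 2);
    }

    public static void main(String[] args) {
        System.out.println(expectedCheckpointVersion(7L)); // 6
        System.out.println(expectedCheckpointVersion(8L)); // 8
    }
}
```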

Comment on lines 79 to 80
assert(snapshotImpl.getLogSegment.checkpoints.asScala.map(
f => new Path(f.getPath).getName.contains("-")).contains(expectV2CheckpointFormat))
Collaborator

can you use the regex added in FileNames to identify the v2 checkpoint conclusively?

Contributor Author

We don't have a check in FileNames to identify this conclusively; I used CheckpointInstance instead.

Collaborator

@prakharjain09 prakharjain09 left a comment

LGTM

@vkorukanti vkorukanti merged commit 40205ff into delta-io:master Apr 25, 2024
8 checks passed