Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Kernel][LogReplay] Make a single read request for all checkpoint files #2701

Merged
merged 5 commits into from
Mar 12, 2024

Conversation

vkorukanti
Copy link
Collaborator

@vkorukanti vkorukanti commented Feb 29, 2024

Description

Currently, the kernel-api reads one file (either checkpoint or commit file) at a time. Once the file is fully read, then the next file is read request is issued. This makes reading large checkpoints split over multiple files slower. Instead kernel-api could issue read requests for all checkpoint files at once (in case of multi-part checkpoints) using the ParquetHandler.readParquetFiles and let the implementations of the ParquetHandler prefetch or using multiple threads to read the checkpoint parts concurrently.

This PR makes the change to kernel-api to issue one read request for all checkpoint files that need to be read for state reconstructions.

Resolves #2668
Resolves #1965

How was this patch tested?

Existing tests and a benchmark with a test only parallel parquet reader. Here are the sample benchmark results with the test only parallel Parquet reader. Score tells the average time to construct the Delta table state. parallelReaderCount indicates the number of parallel Parquet reading threads used.

Benchmark                                     (parallelReaderCount)  Mode  Cnt     Score     Error  Units
BenchmarkParallelCheckpointReading.benchmark                      0  avgt    5  1565.520 ±  20.551  ms/op
BenchmarkParallelCheckpointReading.benchmark                      1  avgt    5  1064.850 ±  19.699  ms/op
BenchmarkParallelCheckpointReading.benchmark                      2  avgt    5   785.918 ± 176.285  ms/op
BenchmarkParallelCheckpointReading.benchmark                      4  avgt    5   729.487 ±  51.470  ms/op
BenchmarkParallelCheckpointReading.benchmark                     10  avgt    5   693.757 ±  41.252  ms/op
BenchmarkParallelCheckpointReading.benchmark                     20  avgt    5   702.656 ±  19.145  ms/op

@vkorukanti vkorukanti added this to the 3.2.0 milestone Mar 4, 2024
@vkorukanti vkorukanti force-pushed the multiFileReadRequest branch 2 times, most recently from a0a53cf to 844a719 Compare March 11, 2024 18:40
@@ -378,6 +378,7 @@ class GoldenTables extends QueryTest with SharedSparkSession {

val commitInfoFile = CommitInfo(
version = Some(0L),
inCommitTimestamp = None,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is broken on current master.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does this mean?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there was a commit in delta-spark that updated the argument list. Our current CI setup just runs the delta-spark tests and not its dependencies. We need to revisit our CI trigger job to make sure all run tests for all dependent modules.

I can make a separate PR for this change if you would like.

@@ -149,23 +151,22 @@ private void tryEnsureNextActionsIterIsReady() {
}

/**
* Get the next file from `filesIter` (.json or .checkpoint.parquet), contextualize it
* Get the next file from `filesList` (.json or .checkpoint.parquet), contextualize it
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we still contextualize?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No we don't. Updating the comment.

// We can not read multiple JSON files in parallel (like the checkpoint files),
// because each one has a different version, and we need to associate the version
// with actions read from the JSON file for further optimizations later on.

final CloseableIterator<ColumnarBatch> dataIter =
tableClient.getJsonHandler().readJsonFiles(
Copy link
Contributor

@tdas tdas Mar 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this an optimization that we should consider (not in this PR)? that the returned columnar batches has the file status of the file from which it was read? that would allow parallel json reads.

// Read that file
// We can not read multiple JSON files in parallel (like the checkpoint files),
// because each one has a different version, and we need to associate the version
// with actions read from the JSON file for further optimizations later on.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: what optimizations need the version for each columnar batch?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is basically the faster metadata and protocol loading with a snapshot hint. The optimizations made for the Flink faster job start. Added in the comments.

}

// Remove the files from the list
filesList.removeAll(checkpointFiles);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you need removeAll when you are already doing filesList.pop()?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not needed.

* $ build/sbt
* sbt:delta> project kernelDefaults
* sbt:delta> set fork in run := true
* sbt:delta> test:runMain io.delta.kernel.defaults.benchmarks.BenchmarkParallelCheckpointReading
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add the results you got here. so that its in the code rather than in the github PR which will be harder to find when browsing code.

add some specs of your machine as well

Copy link
Contributor

@tdas tdas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just one comment.

Copy link
Collaborator

@allisonport-db allisonport-db left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor comments LGTM

FileStatus checkpointFile,
long version) {

// Filter out all the files that are not part of the same checkpoint
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this mean?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated the comment. It should have been: // Find the contiguous parquet files that are part of the same checkpoint


FileStatus peek = filesList.peek();
while (peek != null &&
peek.getPath().endsWith(".parquet") &&
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use FileNames.isCheckpointFile here instead?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated here and the existing code.

ScanBuilder scanBuilder = snapshot.getScanBuilder(tableClient);
Scan scan = scanBuilder.build();

Row row = scan.getScanState(tableClient);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is fine if it's to mimic the real case but could you add a comment explaining what we're doing in this block/the intention?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just to mimic the real case. Added comments.

@vkorukanti vkorukanti merged commit d0477bb into delta-io:master Mar 12, 2024
7 of 8 checks passed
@vkorukanti vkorukanti deleted the multiFileReadRequest branch May 9, 2024 02:42
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
3 participants