[Kernel] Update the Kernel read with coordinated commit support #3381
List<Commit> unbackfilledCommits = commitCoordinatorClientHandlerOpt
    .map(commitCoordinatorClientHandler -> commitCoordinatorClientHandler
        .getCommits(
            logPath.toString(),
            coordinatedCommitsTableConf,
            startVersion,
            versionToLoad.orElse(null))
        .getCommits())
    .orElse(Collections.emptyList());
How do you handle errors here?
For example, if the commit coordinator fails due to an intermittent issue or a misconfiguration, do you fail the query?
Also, can you move this to a separate method and add some log statements before making a request to the commit coordinator? You can follow the log statements added in Delta-Spark.
I don't think we have a checked exception for the getCommits API, but I've added some log statements for it.
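To make the request above concrete, here is a minimal sketch of what a separate, logged lookup method could look like. It is not the code that landed in this PR. `CommitLookup` is a hypothetical, simplified stand-in for the coordinator handler that returns commit versions only, and `java.util.logging` is used only to keep the example self-contained. Rethrowing the unchecked exception, which fails the read, is an assumption about the desired behavior rather than something settled in this thread.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;

final class UnbackfilledCommitsSketch {
    private static final Logger LOG =
        Logger.getLogger(UnbackfilledCommitsSketch.class.getName());

    /** Hypothetical, simplified stand-in for the coordinator handler (versions only). */
    interface CommitLookup {
        List<Long> getCommits(
            String logPath, Map<String, String> tableConf, Long startVersion, Long endVersion);
    }

    /** Returns an empty list when no coordinator is configured; logs around the request. */
    static List<Long> getUnbackfilledCommits(
            Optional<CommitLookup> lookupOpt,
            String logPath,
            Map<String, String> tableConf,
            long startVersion,
            Optional<Long> versionToLoad) {
        if (!lookupOpt.isPresent()) {
            return Collections.emptyList();
        }
        LOG.info("Requesting commits from coordinator for " + logPath
            + " starting at version " + startVersion);
        try {
            List<Long> commits = lookupOpt.get()
                .getCommits(logPath, tableConf, startVersion, versionToLoad.orElse(null));
            LOG.info("Coordinator returned " + commits.size() + " commit(s) for " + logPath);
            return commits;
        } catch (RuntimeException e) {
            // Assumption: an unchecked failure is logged and rethrown, so the read fails.
            LOG.log(Level.SEVERE, "Coordinator request failed for " + logPath, e);
            throw e;
        }
    }
}
```

With this shape, an intermittent coordinator failure shows up in the logs with the table path attached before the exception reaches the caller.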
Please check if we need to make any changes for the snapshot protocol and metadata optimization to work in this new model of snapshot loading.
...api/src/main/java/io/delta/kernel/internal/snapshot/TableCommitCoordinatorClientHandler.java
public TableCommitCoordinatorClientHandler(
    CommitCoordinatorClientHandler commitCoordinatorClientHandler,
    String logPath,
    Map<String, String> tableConf) {
@dhruvarya-db @prakharjain09 Why do we need to pass the tableConf to each API on the commit coordinator? Isn't it used when creating the commit coordinator?
@vkorukanti The commit coordinator is supposed to be reusable across multiple tables (like JSONHandler). tableConf can be thought of as a tablePath/tableId, which is used to uniquely identify the table.
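The point about reuse can be illustrated with a small sketch: one coordinator instance serves many tables, and a per-table wrapper binds the logPath and tableConf once so callers do not have to repeat them. `SharedCoordinator` and `TableBound` are hypothetical names for illustration and are far simpler than the real `CommitCoordinatorClientHandler` and `TableCommitCoordinatorClientHandler`. The `"tableId"` key used in the usage note is also an assumption.

```java
import java.util.List;
import java.util.Map;

final class TableBoundHandlerSketch {

    /** Hypothetical coordinator shared by many tables; each call names the table. */
    interface SharedCoordinator {
        List<Long> getCommits(String logPath, Map<String, String> tableConf);
    }

    /** Binds one table's logPath and tableConf to a shared coordinator. */
    static final class TableBound {
        private final SharedCoordinator coordinator;
        private final String logPath;
        private final Map<String, String> tableConf;

        TableBound(SharedCoordinator coordinator, String logPath, Map<String, String> tableConf) {
            this.coordinator = coordinator;
            this.logPath = logPath;
            this.tableConf = tableConf;
        }

        /** Forwards to the shared coordinator with this table's identity filled in. */
        List<Long> getCommits() {
            return coordinator.getCommits(logPath, tableConf);
        }
    }
}
```

Two `TableBound` instances built from the same `SharedCoordinator` but with different tableConf values (for example, different `"tableId"` entries) would each see only their own table's commits, which is why the table identity has to travel with every call.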
LGTM, thanks for working on this!
Thank you for working through the comments. One last pending issue is the regression of time travel for tables that do not have coordinated commits enabled.
Looks great!
 * This can be optimized by making snapshot hint optimization to work with coordinated commits.
 * @see <a href="https://github.com/delta-io/delta/issues/3437">issue #3437</a>.
 */
private SnapshotImpl getCoordinatedCommitsAwareSnapshot(
For a coordinated commits table, do we then list the log dir twice? Can we not reuse the log segment we've already built? If this is because there may have been changes, I'm wondering if it's possible (maybe too much additional complexity) to list from the last file we've seen and reuse the existing segment.
We must list the log dir twice; otherwise there might be a gap because of this issue.
So the logic should be: unbackfilled commit list -> deltalog list -> filter out duplicated unbackfilled commits using the results from the deltalog list. Otherwise there might be gaps or duplicated commits.
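The ordering described above can be sketched as a small merge step. This is an illustration under stated assumptions, not the PR's implementation: `VersionedFile` is a hypothetical stand-in for a delta file, both inputs are assumed to be sorted by ascending version, and the coordinator is assumed to have been queried before the log directory was listed. A commit that was backfilled between the two calls then appears in both lists and is dropped as a duplicate, rather than being missing from both.

```java
import java.util.ArrayList;
import java.util.List;

final class CommitMergeSketch {

    /** Hypothetical stand-in for a delta file: its version and where it was found. */
    static final class VersionedFile {
        final long version;
        final String path;

        VersionedFile(long version, String path) {
            this.version = version;
            this.path = path;
        }
    }

    /**
     * Merges delta files found by listing the log directory with unbackfilled commits
     * returned by the coordinator. Listed files win when a version appears in both.
     */
    static List<VersionedFile> merge(
            List<VersionedFile> listedDeltas, List<VersionedFile> unbackfilledCommits) {
        List<VersionedFile> merged = new ArrayList<>(listedDeltas);
        for (VersionedFile commit : unbackfilledCommits) {
            if (!merged.isEmpty()) {
                long lastVersion = merged.get(merged.size() - 1).version;
                if (commit.version <= lastVersion) {
                    continue; // already seen in the listing: skip the duplicate
                }
                if (commit.version != lastVersion + 1) {
                    throw new IllegalStateException(
                        "Gap between version " + lastVersion + " and " + commit.version);
                }
            }
            merged.add(commit);
        }
        return merged;
    }
}
```

The gap check is an added safeguard in this sketch: if the merged versions are not contiguous, it fails loudly instead of building a snapshot from an incomplete log.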
Thank you for adding this feature to Delta Kernel!
public static <T> T last(List<T> list) {
    return list.get(list.size() - 1);
}
Is it actually used? If not, please remove it in a follow-up PR. We could remove it in this PR itself, but it would take another 4+ hours for the tests to finish.
Which Delta project/connector is this regarding?
Description
Update the Kernel read with coordinated commit support and prepare for Kernel coordinated commit write support.
How was this patch tested?
Unit tests
Does this PR introduce any user-facing changes?
Yes. Users can enable coordinated commit read support by configuring its properties.