[Kernel] Update ConflictChecker to perform conflict resolution of ICT #3283
This PR is based on #3282; merge that one first.
kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ConflictChecker.java
if (i == actionBatchList.size() - 1) {
  CommitInfo commitInfo =
      getCommitInfo(batch.getColumnVector(COMMITINFO_ORDINAL));
  winningCommitInfoOpt = Optional.ofNullable(commitInfo);
}
If the goal is to get the last commit info, why not always look for the commit info, store it in a variable, and use the last value of that variable?
Also, we can't assume the commit info is present in the last action batch. Reading a single commit file can generate multiple batches, and the commit info may be in the first of those batches.
We can't always store the commit info in a variable and use its last value, because of the scenario mentioned by @dhruvarya-db. But you are right, it's not necessarily in the last action batch.
Does it make sense to check if a batch is generated from the last commit file by checking its version from ActionWrapper?
Pushed a fix that checks the version from ActionWrapper.
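For readers following the thread, here is a minimal sketch of the idea, assuming each batch is tagged with the version of the commit file it came from. `VersionedBatch`, its `commitInfo` field, and `pickLastWinningCommitInfo` are hypothetical stand-ins for illustration, not Kernel's actual ActionWrapper API. Matching on version rather than list position means it does not matter which batch of the last commit carries the commit info, and a commit info from an earlier commit is never carried over.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

final class LastCommitInfoSketch {
  /** Hypothetical stand-in for a batch of actions tagged with its commit version. */
  static final class VersionedBatch {
    final long version;
    final String commitInfo; // null when this batch has no commitInfo action

    VersionedBatch(long version, String commitInfo) {
      this.version = version;
      this.commitInfo = commitInfo;
    }
  }

  /** Returns the commit info found in any batch of the last winning version, if present. */
  static Optional<String> pickLastWinningCommitInfo(
      List<VersionedBatch> batches, long lastWinningVersion) {
    Optional<String> result = Optional.empty();
    for (VersionedBatch batch : batches) {
      // Match on the commit version, not on the batch's position in the list.
      if (batch.version == lastWinningVersion && batch.commitInfo != null) {
        result = Optional.of(batch.commitInfo);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<VersionedBatch> batches = Arrays.asList(
        new VersionedBatch(5, "commitInfo-v5"),
        new VersionedBatch(6, "commitInfo-v6"), // first batch of version 6
        new VersionedBatch(6, null));           // later batch of the same commit
    System.out.println(pickLastWinningCommitInfo(batches, 6));
  }
}
```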
long lastWinningVersion = getLastWinningTxnVersion(winningCommits);
return new TransactionRebaseState(
    lastWinningVersion,
    getLastCommitTimestamp(
Why do we need to fetch the ICT again? Aren't we already reading it above?
Above we only read the CommitInfo, not the ICT. We still need to extract the ICT from the CommitInfo.
@vkorukanti From what I understand, this will not perform an additional IO. It simply extracts the timestamp from the CommitInfo action (or uses the file modification timestamp if ICT is not enabled).
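To illustrate the point, here is a rough sketch of that selection logic, assuming the commit file's modification time and the parsed CommitInfo are already in memory. The method name and parameters (`ictEnabled`, `commitInfoIct`, `fileModificationTime`) are illustrative assumptions and not Kernel's actual signature.

```java
import java.util.Optional;

final class LastCommitTimestampSketch {
  /**
   * Picks the timestamp of the last winning commit from values already in memory,
   * so no additional file read is needed. All parameters are hypothetical.
   */
  static long lastCommitTimestamp(
      boolean ictEnabled, Optional<Long> commitInfoIct, long fileModificationTime) {
    if (!ictEnabled) {
      // Without ICT, fall back to the commit file's modification time.
      return fileModificationTime;
    }
    // With ICT enabled, the timestamp must come from the CommitInfo action.
    return commitInfoIct.orElseThrow(
        () -> new IllegalStateException("inCommitTimestamp missing from CommitInfo"));
  }

  public static void main(String[] args) {
    System.out.println(lastCommitTimestamp(false, Optional.empty(), 100L));
    System.out.println(lastCommitTimestamp(true, Optional.of(250L), 100L));
  }
}
```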
Got it. Thanks for clarifying.
return new TransactionRebaseState(
    lastWinningVersion,
    getLastCommitTimestamp(
        engine, lastWinningVersion, winningCommits, winningCommitInfoOpt.get()));
Do you want to check that winningCommitInfoOpt is not empty?
This get() is called on the AtomicReference, not on the Optional.
Still, how do we verify it is actually set? Do you want to start with null and then verify it is not null?
It starts as Optional.empty. The getLastCommitTimestamp function checks whether ICT is enabled; if it is and winningCommitInfoOpt is empty, CommitInfo.getRequiredInCommitTimestamp raises an error. This is the same logic as in Delta Spark.
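The distinction discussed in this thread can be shown with a small standalone sketch. It uses only standard `java.util` classes; `isSet` is a hypothetical helper added for illustration. `AtomicReference.get()` just returns whatever the reference currently holds, here an `Optional` that starts out empty, so it does not throw. Only `Optional.get()` on an empty value would.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

final class AtomicOptionalSketch {
  /** True once a value has been stored; get() on the reference itself never throws. */
  static boolean isSet(AtomicReference<Optional<String>> ref) {
    return ref.get().isPresent();
  }

  public static void main(String[] args) {
    // Start with an empty Optional rather than null.
    AtomicReference<Optional<String>> ref = new AtomicReference<>(Optional.empty());
    System.out.println(isSet(ref)); // reference holds Optional.empty()

    ref.set(Optional.of("commitInfo"));
    System.out.println(isSet(ref)); // reference now holds a value
  }
}
```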
@@ -88,10 +92,13 @@ public static TransactionRebaseState resolveConflicts(

public TransactionRebaseState resolveConflicts(Engine engine) throws ConcurrentWriteException {
  List<FileStatus> winningCommits = getWinningCommitFiles(engine);
  AtomicReference<Optional<CommitInfo>> winningCommitInfoOpt =
Why is this an atomic reference? Why can't we just use the Optional?
Because of this error:
#3283 (comment)
winningCommitTimestamp = CommitInfo.getRequiredInCommitTimestamp(
    winningCommitInfoOpt,
    String.valueOf(lastWinningVersion),
    snapshot.getDataPath());
Why do you need the data path here?
It is only used in the error message, which is kept consistent with Delta Spark.
} else {
  winningCommitTimestamp = CommitInfo.getRequiredInCommitTimestamp(
      winningCommitInfoOpt,
      String.valueOf(lastWinningVersion),
Why convert to a string?
It is only used in the error message, which is kept consistent with Delta Spark.
Rather than passing these as strings, maybe just prepare a single string that contains the lastWinningVersion and tablePath, and pass it as context to getRequiredInCommitTimestamp?
CommitInfo.getRequiredInCommitTimestamp(winningCommitInfoOpt, String.format("error...", ...))
getRequiredInCommitTimestamp has logic to check whether winningCommitInfoOpt is empty and whether it contains the ICT, and it raises a different error in each case. So I think it's better to let this function handle the error messages?
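As a sketch of why the version and table path are passed separately, the following shows a required-timestamp accessor that distinguishes the two failure cases. `CommitInfoStub`, `requiredInCommitTimestamp`, the exception type, and the message wording are all assumptions made for illustration; they are not the actual Kernel or Delta Spark implementation.

```java
import java.util.Optional;

final class RequiredIctSketch {
  /** Hypothetical, simplified stand-in for a commit info action. */
  static final class CommitInfoStub {
    final Optional<Long> inCommitTimestamp;

    CommitInfoStub(Optional<Long> inCommitTimestamp) {
      this.inCommitTimestamp = inCommitTimestamp;
    }
  }

  /** Returns the ICT, raising a different error for each way it can be missing. */
  static long requiredInCommitTimestamp(
      Optional<CommitInfoStub> commitInfoOpt, String version, String tablePath) {
    // Case 1: the commit has no commit info at all.
    CommitInfoStub commitInfo = commitInfoOpt.orElseThrow(
        () -> new IllegalStateException(String.format(
            "Missing commit info at version %s of table %s", version, tablePath)));
    // Case 2: commit info exists but carries no in-commit timestamp.
    return commitInfo.inCommitTimestamp.orElseThrow(
        () -> new IllegalStateException(String.format(
            "Missing inCommitTimestamp at version %s of table %s", version, tablePath)));
  }

  public static void main(String[] args) {
    CommitInfoStub withIct = new CommitInfoStub(Optional.of(1700000000000L));
    System.out.println(requiredInCommitTimestamp(Optional.of(withIct), "7", "/tmp/table"));
  }
}
```

Because the accessor builds both messages itself, the caller only supplies the raw version and path instead of a pre-formatted error string.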
LGTM. Thanks for working on this.
Last few comments. Once addressed, this PR is good to go.
IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION.fromMetadata(ver2Snapshot.getMetadata)
assert(observedEnablementTimestamp.get == ver1Snapshot.getTimestamp(engine) + 1)
assert(
  observedEnablementTimestamp.get == getInCommitTimestamp(engine, table, version = 2).get)
nit: you can remove .get from both sides.
They are actually different types: the left side is Optional[Long] and the right side is Option[Long].
Which Delta project/connector is this regarding?
Description
Update ConflictChecker to perform conflict resolution of inCommitTimestamp and complete the inCommitTimestamp support in Kernel.
How was this patch tested?
Added unit tests to verify the conflict resolution of timestamps and the enablement version.
Does this PR introduce any user-facing changes?
Yes. Users can enable monotonic inCommitTimestamp by enabling its property.