Adds DeltasharingSource #201
Some initial comments.
spark/src/main/scala/io/delta/sharing/spark/DeltaSharingSource.scala
endOffset.tableVersion > 0, s"invalid tableVersion in endOffset: $endOffset")
// Load from snapshot `endOffset.tableVersion - 1L` if endOffset is not
// startingVersion
(endOffset.tableVersion - 1L, -1L, true, None)
Question:
Why do we load the snapshot from the previous version?
Do we want to use its metadata?
I don't think it's related to the metadata. The schema of the latest table version at the time the stream query starts is used for constructing the DataFrame.
I copied this logic from DeltaSource. My guess is:
It's more of a product behavior decision. When getBatch is called, startOffsetOption is not set, and endOffset is not a startingVersion, it means: 1) no startingVersion is provided, 2) the stream query is not starting from a checkpoint, 3) it's the first getBatch after the stream query started, 4) endOffset is from a previous latestOffset call, 5) endOffset is NOT a startingVersion, and 6) given 1), some version is the startingVersion in L410.
Now it needs to decide what the starting version is. I guess it's a product decision to use (endOffset.tableVersion - 1), or maybe we should use the version we used in L410?
@zsxwing Do you have an idea on this? Thanks!
Consider adding to the comment that we do this to maintain the same behavior as Delta.
Added the comment. I'll merge this first and still confirm with Ryan later.
This hack is for the following case:
When we start a streaming query, we load the latest version, X - 1. However, when latestOffset is called, we get an offset F in version X (new data was written in between).
Now getBatch is called with (startOffsetOption=None, end=the above offset). Since this is the first batch, we need to return the table data in version X - 1 plus the data in version X from offset 0 to offset F (note: we should not return data after offset F). So we use (offset=-1, version=X-1, isStartingVersion=true) to load the table data in version X - 1.
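The case described above can be sketched in Scala roughly as follows. This is a minimal illustration, not the code from this PR: `SketchOffset`, `ReadStart`, and `resolveStart` are hypothetical names, and the branch for an end offset that is itself a starting version is an assumption inferred from the code comment in the diff.

```scala
// Hypothetical, simplified stand-ins; these are NOT the real DeltaSharingSource types.
final case class SketchOffset(tableVersion: Long, index: Long, isStartingVersion: Boolean)

// Where the first read should begin: (version, index, isStartingVersion).
final case class ReadStart(version: Long, index: Long, isStartingVersion: Boolean)

object FirstBatchSketch {
  // Decide where getBatch should start reading from.
  def resolveStart(start: Option[SketchOffset], end: SketchOffset): ReadStart =
    start match {
      // A start offset exists (later batches or a checkpoint restart): continue from it.
      case Some(s) =>
        ReadStart(s.tableVersion, s.index, s.isStartingVersion)

      // Assumption: the end offset is still inside the starting snapshot,
      // so the snapshot at that same version is loaded from the beginning.
      case None if end.isStartingVersion =>
        ReadStart(end.tableVersion, -1L, isStartingVersion = true)

      // First batch, and new data landed in version X before latestOffset ran:
      // load the snapshot at X - 1; the caller then reads version X up to `end`.
      case None =>
        require(end.tableVersion > 0, s"invalid tableVersion in endOffset: $end")
        ReadStart(end.tableVersion - 1L, -1L, isStartingVersion = true)
    }
}
```

With no start offset and an end offset in version 5 that is not a starting version, this sketch starts from the snapshot at version 4 with index -1, which matches the (offset=-1, version=X-1, isStartingVersion=true) tuple in the explanation.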
@linzhou-db Consider adding Ryan's explanation as a comment.
Thanks! I just want to clarify and confirm:
This only happens on the first latestOffset and getBatch calls of a stream query without startingVersion, and the offset returned by latestOffset and used as end in getBatch can only be 1 version away from the startingVersion returned here, because it is loading the snapshot at startingVersion.
I'll add the comment in another PR.
Adds DeltasharingSource, similar to DeltaSource, supports streaming addfiles,
Not in this PR, will be supported in a separate PR