[SPARK-30900][SS] FileStreamSource: Avoid reading compact metadata log twice if the query restarts from compact batch #27649
Conversation
@@ -122,8 +123,35 @@ class FileStreamSourceLog(
  }
  batches
}
def restore(): Array[FileEntry] = {
To avoid touching the existing semantics of allFiles(), I simply added a new method to cover the new semantics. I'll override allFiles() instead if that's preferred.
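To make the discussed shape concrete, here is a minimal, self-contained sketch of the idea: restore() returns the same result as allFiles() but additionally warms a cache keyed by the latest compact batch id. The names FileEntry and fileEntryCache mirror the PR; the class name, plain Scala collections, and the compact-batch arithmetic are stand-ins, not the actual Spark implementation.

```scala
import scala.collection.mutable

// Stand-in for Spark's FileEntry (path, timestamp, batchId).
case class FileEntry(path: String, timestamp: Long, batchId: Long)

class FileStreamSourceLogSketch(compactInterval: Int) {
  // Insertion-ordered batch log and a simple map standing in for fileEntryCache.
  private val log = mutable.LinkedHashMap.empty[Long, Array[FileEntry]]
  val fileEntryCache = mutable.Map.empty[Long, Array[FileEntry]]

  def add(batchId: Long, entries: Array[FileEntry]): Unit = log(batchId) = entries

  // Existing semantics: return every logged entry.
  def allFiles(): Array[FileEntry] = log.values.flatten.toArray

  // With compact interval N, batches N-1, 2N-1, ... are compact batches.
  private def isCompactBatch(batchId: Long): Boolean =
    (batchId + 1) % compactInterval == 0

  // New method: same result as allFiles(), but caches the latest compact
  // batch's entries so a restarted query doesn't re-read that file.
  def restore(): Array[FileEntry] = {
    val files = allFiles()
    log.keys.filter(isCompactBatch).lastOption.foreach { compactBatchId =>
      fileEntryCache(compactBatchId) = log(compactBatchId)
    }
    files
  }
}
```

Keeping restore() separate (rather than overriding allFiles()) preserves the old method's contract for callers that don't care about the cache.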
Test build #118720 has finished for PR 27649 at commit
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala Lines 92 to 95 in fc4e56a
We describe reading the latest compacted log file as "quite time-consuming", and this patch addresses another case of reading that file, so it would bring an actual benefit.
val allFiles = metadata.allFiles()

// batch 4 is a compact batch which logs would be cached in fileEntryCache
fileEntryCache.containsKey(4)
Don't we need some kind of assertion here?
lol I have no idea why I missed assert here. Nice finding. Will add.
Test build #120409 has finished for PR 27649 at commit
val allFiles = metadata.allFiles()

// batch 4 is a compact batch which logs would be cached in fileEntryCache
assert(fileEntryCache.containsKey(4L))
I think it would be good to test that only batch 4 was added:

(0 to 3).foreach { batchId =>
  assert(!fileEntryCache.containsKey(batchId))
}
// restore() will restore the logs for the latest compact batch into file entry cache
assert(metadata2.restore() === allFiles)
assert(fileEntryCache2.containsKey(4L))
Similar here.
files.lastOption.foreach { lastEntry =>
  val latestBatchId = lastEntry.batchId
  val latestCompactedBatchId = getAllValidBatches(latestBatchId, compactInterval)(0)
  if (latestCompactedBatchId > 0 &&
Seems like it's not working when one sets spark.sql.streaming.fileSource.log.compactInterval to 1.
That's just there to prune the case where it may not help much, but yeah, let's make it simple. It won't hurt either way.
(5 to 5 + FileStreamSourceLog.PREV_NUM_BATCHES_TO_READ_IN_RESTORE).foreach { batchId =>
  metadata2.add(batchId, createEntries(batchId, 100))
}
val allFiles2 = metadata2.allFiles()
This can be inlined, right?
Not sure I understand what "inline" means here.
I've only seen this val used in one place.
// if the latest batch is too far from latest compact batch, because it's unlikely Spark
// will request the batch for the start point.
assert(metadata2.restore() === allFiles2)
assert(!fileEntryCache3.containsKey(4L))
Similar here.
val latestBatchId = lastEntry.batchId
val latestCompactedBatchId = getAllValidBatches(latestBatchId, compactInterval)(0)
if (latestCompactedBatchId > 0 &&
    (latestBatchId - latestCompactedBatchId) < PREV_NUM_BATCHES_TO_READ_IN_RESTORE) {
Maybe a comment would be good explaining why this heuristic is useful.
I thought I forgot to explain it, but it looks like I already did:

// It doesn't know about offset / commit metadata in checkpoint so doesn't know which exactly
// batch to start from, but in practice, only couple of latest batches are candidates to
// be started. We leverage the fact to skip calculation if possible.

"only couple of latest batches" is the threshold; I heuristically took 2 here.
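The heuristic under discussion can be sketched as below. The constant name mirrors the PR snippet, the value 2 is the "couple of latest batches" threshold mentioned here, and the object and method names are illustrative only.

```scala
object RestoreHeuristic {
  // The "couple of latest batches" threshold; the PR heuristically picks 2.
  val PREV_NUM_BATCHES_TO_READ_IN_RESTORE = 2

  // Warm the file entry cache only when the latest batch is close enough to
  // the latest compact batch, since restarts almost always resume near the
  // tail of the log. Mirrors the condition quoted from the diff above.
  def shouldCacheCompactBatch(latestBatchId: Long, latestCompactedBatchId: Long): Boolean =
    latestCompactedBatchId > 0 &&
      (latestBatchId - latestCompactedBatchId) < PREV_NUM_BATCHES_TO_READ_IN_RESTORE
}
```

With this threshold, a restart one batch past the latest compact batch warms the cache, while a restart five batches past it skips the extra work.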
Thanks for reviewing. I believe I've addressed the review comments; please take a look again. Thanks!
Test build #120466 has finished for PR 27649 at commit
Seems unrelated.
retest this please
LGTM.
Test build #120478 has finished for PR 27649 at commit
Test build #120480 has finished for PR 27649 at commit
retest this, please
Test build #121230 has finished for PR 27649 at commit
retest this, please
Test build #122130 has finished for PR 27649 at commit
retest this, please
Test build #122151 has finished for PR 27649 at commit
retest this, please
Test build #123142 has finished for PR 27649 at commit
retest this, please
Test build #123210 has finished for PR 27649 at commit
Test build #123220 has finished for PR 27649 at commit
retest this, please
Test build #123988 has finished for PR 27649 at commit
retest this, please
Test build #124481 has finished for PR 27649 at commit
retest this, please
Test build #125727 has finished for PR 27649 at commit
retest this, please
Test build #127535 has finished for PR 27649 at commit
retest this, please
Test build #127550 has finished for PR 27649 at commit
retest this, please
Test build #128676 has finished for PR 27649 at commit
cc. @viirya @xuanyuanking as well, to widen the pool of potential reviewers.
retest this, please
Kubernetes integration test starting
Kubernetes integration test status success
Test build #130221 has finished for PR 27649 at commit
retest this, please
cc. @tdas @zsxwing @gaborgsomogyi @viirya @xuanyuanking Just a final reminder: I'll merge this early next week if there are no further comments, per the feedback from the dev@ mailing list.
Test build #131891 has finished for PR 27649 at commit
There has been no feedback so far. I'll merge once Jenkins is happy with the new build.
retest this, please
I see the Jenkins migration is happening. I'll kick off the GitHub Actions build instead.
GA passed. Merging to master.
What changes were proposed in this pull request?
This patch addresses the case where the compact metadata file is read twice in FileStreamSource when a query restarts.
When restarting a query, there is a case where the query starts from a compaction batch and that batch has a source metadata file to read. One such case is when the previous run succeeded in reading from the inputs but did not finalize the batch for various reasons.
The patch finds the latest compaction batch when restoring from the metadata log and puts the entries for that batch into the file entry cache, which avoids reading the compact batch file twice.
FileStreamSourceLog doesn't know about the offset / commit metadata in the checkpoint, so it doesn't know exactly which batch to start from; in practice, though, only a couple of the latest batches are candidates to be started from when a query restarts. This patch leverages that fact to skip the calculation when possible.
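For reference, the "latest compact batch for a given batch" arithmetic that getAllValidBatches(latestBatchId, compactInterval)(0) stands for in the snippets above can be sketched as follows. This is an assumed reconstruction of that semantics, not the actual Spark helper: with compact interval N, batches N-1, 2N-1, ... are compact batches, and -1 means no compact batch exists yet.

```scala
object CompactBatchMath {
  // Returns the id of the most recent compact batch at or before latestBatchId,
  // or -1 if no compact batch has happened yet. Assumed semantics only.
  def latestCompactBatchId(latestBatchId: Long, compactInterval: Int): Long =
    (latestBatchId + 1) / compactInterval * compactInterval - 1
}
```

For example, with a compact interval of 5, batch 4 is the latest compact batch for any latest batch id from 4 through 8, which is exactly the window the restore-time heuristic reasons about.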
Why are the changes needed?
Spark incurs the unnecessary cost of reading the compact metadata file twice in some cases, which may not be negligible when the query has processed a huge number of files so far.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
New UT.