[HUDI-7235] Fix checkpoint bug for S3/GCS Incremental Source #10336
bvaradar merged 8 commits into apache:master
Conversation
bvaradar left a comment
@vinishjail97: Very good catch. A few questions and nit comments. Please take a look at the failing tests, including validate-PR.
  if (!checkPointAndDataset.getRight().isPresent()) {
-   LOG.info("Empty source, returning endpoint:" + queryInfo.getEndInstant());
-   return Pair.of(Option.empty(), queryInfo.getEndInstant());
+   LOG.info("Empty source, returning endpoint:" + checkPointAndDataset.getLeft().toString());
nit: .toString() is not needed in log message
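The nit above rests on a general Java rule: when an object is concatenated to a string with `+`, it is converted as if by `String.valueOf`, so an explicit `.toString()` adds nothing and only introduces a `NullPointerException` risk. A minimal sketch, assuming a hypothetical `Checkpoint` class that stands in for whatever `getLeft()` returns (it is not Hudi's class):

```java
final class ConcatSketch {
    // Hypothetical stand-in for a checkpoint object; not Hudi's actual class.
    static final class Checkpoint {
        private final String commit;
        private final String key;

        Checkpoint(String commit, String key) {
            this.commit = commit;
            this.key = key;
        }

        @Override
        public String toString() {
            return key == null ? commit : commit + "#" + key;
        }
    }

    // Concatenation converts the operand to a string implicitly.
    static String implicit(Checkpoint c) {
        return "Empty source, returning endpoint:" + c;
    }

    // Same message, but this version throws NullPointerException when c is null.
    static String explicit(Checkpoint c) {
        return "Empty source, returning endpoint:" + c.toString();
    }

    public static void main(String[] args) {
        Checkpoint c = new Checkpoint("20231206150423946", null);
        // Both forms produce the same message for a non-null object.
        System.out.println(implicit(c).equals(explicit(c)));
        // The implicit form tolerates null and renders it as the text "null".
        System.out.println(implicit(null));
    }
}
```

For a non-null checkpoint the two methods return identical messages, so dropping `.toString()` does not change the log output.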
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
  if (sourceData.isEmpty()) {
-   return Pair.of(cloudObjectIncrCheckpoint, Option.empty());
+   // There is no file matching the prefix.
+   return Pair.of(new CloudObjectIncrCheckpoint(queryInfo.getEndInstant(), null), Option.empty());
Why can't we use the cloudObjectIncrCheckpoint that is passed to this function?
Fixed the code to pick the right checkpoint
@vinishjail97: Can you address these comments and land it?
7f685a6 to de49a9d
Updated the diff after addressing review comments.
0656fa6 to 1bb5e22
1bb5e22 to 5edeb0f
bvaradar left a comment
I have fixed the issue in an additional place: the case where no file matches the prefix.
Change Logs
Fix a bug in the checkpointing logic for S3/GCS in the empty-dataset use case. The cause of the bug was as follows:
The 1st delta commit processed 3 files and recorded its checkpoint.
The 2nd delta commit was an empty one, and the checkpoint it returned was 20231206150423946. This is not a valid checkpoint progression: a checkpoint must either stay equal to the previous one or increase (in lexicographical order).
Because the previous commit's checkpoint was faulty, the 3rd commit read the same set of files again and wrote duplicate data.
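The invariant described above (a checkpoint may stay equal or move forward in lexicographical order, never backward) can be sketched in a few lines of Java. This is an illustration only, not the code merged in this PR: the class and method names are hypothetical, checkpoints are treated as plain strings, and the "previous" value in `main` is made up for the example. Only 20231206150423946 comes from the description.

```java
final class CheckpointProgressionSketch {
    // A progression is valid when there is no previous checkpoint, or when the
    // next checkpoint is lexicographically greater than or equal to it.
    static boolean isValidProgression(String previous, String next) {
        if (previous == null) {
            return true;
        }
        return next != null && next.compareTo(previous) >= 0;
    }

    // Hypothetical guard: keep the previous checkpoint when the candidate
    // would move backward, for example after an empty batch.
    static String guardedNext(String previous, String candidate) {
        return isValidProgression(previous, candidate) ? candidate : previous;
    }

    public static void main(String[] args) {
        // Made-up previous checkpoint, chosen to sort after the faulty one.
        String previous = "20231206150500000";
        // Checkpoint returned by the empty 2nd delta commit in the description.
        String faulty = "20231206150423946";

        System.out.println(isValidProgression(previous, faulty));
        System.out.println(guardedNext(previous, faulty));
    }
}
```

With a guard of this kind, an empty batch cannot move the checkpoint backward, so the following commit would not pick up files that were already processed.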
Impact
Bug Fix
Risk level (write none, low, medium, or high below)
Medium
Documentation Update
Describe any necessary documentation update if there is any new feature, config, or user-facing change
None, this is a bug fix for an existing feature.
Contributor's checklist