[HUDI-8433] Fix not update issuedOffset when stream read empty commits#12166
Conversation
| IncrementalInputSplits.Result result = | ||
| incrementalInputSplits.inputSplits(metaClient, this.issuedOffset, this.cdcEnabled); | ||
| if (result.isEmpty()) { | ||
| if (result.equals(IncrementalInputSplits.Result.EMPTY)) { |
There was a problem hiding this comment.
should we always update the issuedInstant if it is not null?
There was a problem hiding this comment.
If value of read.speed.limit is smaller than number of empty instants , result.isEmpty is true and issuedInstant is not updated.
There was a problem hiding this comment.
it is not updated but can still be used to reset the variable.
There was a problem hiding this comment.
In speed limit scenario, it can not be updated any more. I try to give an example to explain this:
assume current active timeline is instant1、instant2、instant3、instant4、instant5、instant6 ...
If the issuedInstant is instant2 and read.speed.limit=2 and instant3 and instant4 are empty commits, then the inputSplits method return Result.instance(inputSplits, endInstant, offsetToIssue).
In this return, inputSplits is Collections.emptyList() , endInstant is instant4, offsetToIssue is max(instant3 completionTime, instant4.completionTime).
So result.isEmpty() is true and monitorDirAndForwardSplits() returned, but offsetToIssue and issuedInstant are not updated. Next time when monitorDirAndForwardSplits() is called, the inputSplits method return the same as above, endless loop.
@danny0405
There was a problem hiding this comment.
I meant just to use instant4.completionTime to update the issuedInstant in this case.
There was a problem hiding this comment.
This bug is easy to be reproduced. Let us take UT TestStreamReadMonitoringFunction#testConsumeForSpeedLimitWhenEmptyCommitExists() as an example:
Step1: create 4 empty commit
Step2: trigger streaming read from first instant and set READ_COMMITS_LIMIT 2
Step3: assert current IssuedOffset couldn't be null.
Base on IncrementalInputSplits#inputSplits => .startCompletionTime(issuedOffset != null ? issuedOffset : this.conf.getString(FlinkOptions.READ_START_COMMIT))
If IssuedOffset still was null, hudi would take FlinkOptions.READ_START_COMMIT again, which means streaming read is blocked.
Without this PR this UT will be failed which means streaming reading empty commits will be blocked.
There was a problem hiding this comment.
We only update the issuedOffset with non-null issuedInstant, does it make sense?
| } | ||
|
|
||
| private Integer intervalBetween2Instants(HoodieTimeline timeline, String instant1, String instant2) { | ||
| Integer idxInstant1 = getInstantIdxInTimeline(timeline, instant1); |
There was a problem hiding this comment.
Can we just filter the timeline with proper range and count the instants instead?
There was a problem hiding this comment.
not called, just remove it.
| // Step3: assert current IssuedOffset couldn't be null. | ||
| // Base on "IncrementalInputSplits#inputSplits => .startCompletionTime(issuedOffset != null ? issuedOffset : this.conf.getString(FlinkOptions.READ_START_COMMIT))" | ||
| // If IssuedOffset still was null, hudi would take FlinkOptions.READ_START_COMMIT again, which means streaming read is blocked. | ||
| assertTrue(function.getIssuedOffset() != null); |
|
|
||
| if (result.isEmpty() && StringUtils.isNullOrEmpty(result.getEndInstant())) { | ||
| // no new instants, returns early | ||
| LOG.info("result is empty, do not update issuedInstant."); |
There was a problem hiding this comment.
result -> Result,and use log.warn instead, maybe we just eliminate the log message because it's almost useless.
|
The Azure failure is not related. |
got it. thank you so much for Teacher Danny's advice and is there anything esle to revise? |
Change Logs
When empty commits are allowed , if number of continuous empty commits exceeds 'read.commits.limit' , IssuedInstant will not be updated which will block stream read.
Impact
hudi-flink-datasource
Risk level (write none, low medium or high below)
low
Documentation Update
None
Contributor's checklist