
[HUDI-9267] Fix the file group reader log file read sequence#13115

Merged
danny0405 merged 5 commits intoapache:masterfrom
danny0405:HUDI-9267
Apr 10, 2025

Conversation

danny0405 (Contributor) commented Apr 9, 2025

Change Logs

Fix the file group reader log file sequence to be in ascending order, so as to keep the "processing_time" merging semantics for streaming scenarios: always choose the latest incoming record if the ordering values are equal.

This semantics works now for both COMMIT_TIME and EVENT_TIME merging modes after the fix.

Also fix some other issues:

  • the unnecessary copy of rows for position-based merging;
  • the event-time merging sequence for the CUSTOM merger;
  • the null ordering value in DeleteBlock coming from HoodieEmptyRecord;
  • the position-based merging fallback read under COMMIT_TIME merging mode.
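The headline fix (ascending log-file read order, with ties favoring the newer record) can be illustrated with a small self-contained sketch. The `Entry` record and `fold` helper below are illustrative stand-ins, not Hudi's actual reader API:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LogReadOrderSketch {
    // Illustrative stand-in for a log record: key, ordering value, payload.
    record Entry(String key, int orderingVal, String payload) {}

    // Fold entries in read order using the "keep newer if its ordering value
    // is >= the existing one" rule (ties favor whatever arrives later).
    static Map<String, Entry> fold(List<Entry> entriesInReadOrder) {
        Map<String, Entry> state = new HashMap<>();
        for (Entry e : entriesInReadOrder) {
            Entry old = state.get(e.key());
            if (old == null || e.orderingVal() >= old.orderingVal()) {
                state.put(e.key(), e);
            }
        }
        return state;
    }

    public static void main(String[] args) {
        List<Entry> ascending = List.of(
            new Entry("k", 7, "from-older-commit"),
            new Entry("k", 7, "from-newer-commit")); // same ordering value
        // Ascending read order: the newer commit wins the tie.
        System.out.println(fold(ascending).get("k").payload()); // from-newer-commit
        // A reversed (descending) read order silently flips the tie-break,
        // which is the behavior this PR fixes.
        List<Entry> descending = new ArrayList<>(ascending);
        Collections.reverse(descending);
        System.out.println(fold(descending).get("k").payload()); // from-older-commit
    }
}
```

The same tie-breaking rule yields different winners purely based on the order the log files are scanned, which is why the read sequence matters.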

Impact

no impact

Risk level (write none, low medium or high below)

low

Documentation Update

Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".

  • The config description must be updated if new configs are added or the default value of the configs are changed
  • Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the
    ticket number here and follow the instruction to make
    changes to the website.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@danny0405 danny0405 added release-1.0.2 priority:critical Production degraded; pipelines stalled labels Apr 9, 2025
@github-project-automation github-project-automation bot moved this to 🆕 New in Hudi PR Support Apr 9, 2025
@github-actions github-actions bot added the size:M PR with lines of changes in (100, 300] label Apr 9, 2025
@danny0405 danny0405 added priority:blocker Production down; release blocker and removed priority:critical Production degraded; pipelines stalled labels Apr 9, 2025
return true;
}
Comparable oldOrderingVal = readerContext.getOrderingValue(oldVal, oldMetadata, readerSchema, orderingFieldName);
return newOrderingVal.compareTo(oldOrderingVal) >= 0;
Collaborator:

0 should be DEFAULT_ORDERING_VALUE.

Contributor:

This could be a very corner case and we may not have a solution, but I wanted to bring it up: what if the old record was missing its ordering value, so we treat it as 0, and the new one has a negative ordering value? With L542 we will favor the old record, even though the old one did not have an ordering value and the new one has a valid value.

Not looking to fix this in this patch; I was just curious whether this is a known limitation.
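The corner case above can be made concrete with a tiny sketch; `shouldKeepNewerRecord` here is a hypothetical stand-in for the L542 comparison, assuming a missing ordering value defaults to 0:

```java
public class OrderingValCornerCase {
    // Hypothetical stand-in for the L542 comparison: a null ordering value is
    // treated as the default 0, and ties favor the newer record.
    static boolean shouldKeepNewerRecord(Integer oldOrderingVal, Integer newOrderingVal) {
        int oldV = (oldOrderingVal == null) ? 0 : oldOrderingVal;
        int newV = (newOrderingVal == null) ? 0 : newOrderingVal;
        return newV >= oldV;
    }

    public static void main(String[] args) {
        // Old record missing its ordering value (defaults to 0), new record with
        // a valid but negative ordering value: the old record wins, which is the
        // surprising behavior described above.
        System.out.println(shouldKeepNewerRecord(null, -5)); // false -> old kept
        System.out.println(shouldKeepNewerRecord(3, 3));     // true  -> tie favors newer
    }
}
```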

Collaborator:

My assumption is that orderingValue is > 0, since the ordering field is generally a ts, and only a delete could have ordering value = 0. Therefore, L542 favors new records.

But if the ordering field could be a random column, then for delete, we probably need to handle delete differently from here.

So my question is: which case should we really worry about in Hudi? orderingValue > 0, or any values. Remember the orderingField can be a string also.

Collaborator:

Meanwhile, a more fundamental question: why is the merge mode based solely on time (commit time, event time)? Should the ordering field be allowed to be any arbitrary column?

// IMPORTANT:
// This should be kept in line with EmptyHoodieRecordPayload
// default natural order
this.orderingVal = 0;
Collaborator:

0 -> DEFAULT_ORDERING_VALUE.

// Note that the incoming `record` is from an older commit, so it should be put as
// the `older` in the merge API
if (payloadClass.isPresent()) {
if (existingRecordMetadataPair.getLeft().isEmpty()
Collaborator:

Please note that this implies a delete is always commit-time based, even in custom merge mode. This is consistent with commit-time merge mode. I just wonder: should we also do this for event-time merge mode, i.e., no matter the ordering value of the delete, always treat it as a commit-time-based delete? CC: @danny0405, @nsivabalan, @yihua

Contributor (Author):

I think we can do it for event-time merging; the records coming from DELETE FROM statements should be padded with the ordering values so that we can do event-time merging with them. This would be a quite cool feature, and we could remove the hacky constant 0 and the same-type class check.
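A minimal sketch of what that padding could look like; the names (`padDelete`, `ORDERING_KEY`) are hypothetical, not an existing Hudi API:

```java
import java.util.HashMap;
import java.util.Map;

public class PaddedDeleteSketch {
    static final String ORDERING_KEY = "orderingVal"; // hypothetical metadata key

    // Attach the source row's ordering value to the delete marker's metadata
    // instead of the hard-coded constant 0, so event-time merging can compare it.
    static Map<String, Object> padDelete(Comparable<?> orderingValFromSource) {
        Map<String, Object> meta = new HashMap<>();
        meta.put(ORDERING_KEY, orderingValFromSource);
        meta.put("isDelete", true);
        return meta;
    }
}
```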

Collaborator:

are there any use cases where event-time based delete is really meaningful?

Comment on lines 305 to 311
if (existingRecordMetadataPair.getLeft().isEmpty()
&& shouldKeepNewerRecord(existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight(), Option.ofNullable(record), metadata)) {
// IMPORTANT:
// this is needed when the fallback HoodieAvroRecordMerger got used, the merger would
// return Option.empty when the old payload data is empty(a delete) and ignores its ordering value directly.
return Option.of(Pair.of(Option.of(record), metadata));
}
Collaborator:

DRP with L287-L293

}
return Option.empty();
} else {
if (older.isEmpty() || newer.isEmpty()) {
Collaborator:

DRP from L480 - L490


switch (recordMergeMode) {
case COMMIT_TIME_ORDERING:
int recordIndex1 = 0;
Collaborator:

Bad variable name.

linliu-code (Collaborator):

PR looks reasonable for me overall.

* @param existingRecordMetadataPair The existing record metadata pair
*
* @return The pair of the record that needs to be updated with and its metadata,
* returns empty to skip the update.
Contributor:

Can we enhance the Javadocs of this method? I would like to see what the expected behavior is in all the different cases.

Say, for an event-time-based merge:

| old | new | expected behavior |
| --- | --- | --- |
| valid rec w/ valid ord value | valid rec w/ same ord value | new |
| valid rec w/ valid ord value | valid rec w/ higher ord value | new |
| valid rec w/ valid ord value | valid rec w/ lower ord value | old |
| valid rec w/ valid ord value | valid rec w/ missing ord value | new |
| valid rec w/ valid ord value | deleted w/ same ord value | deleted, but what's the expected return value from this method? |
| valid rec w/ valid ord value | deleted w/ higher ord value | deleted, but what's the expected return value from this method? |
| valid rec w/ valid ord value | deleted w/ lower ord value | old |
| valid rec w/ valid ord value | deleted w/ missing ord value | deleted, but what's the expected return value from this method? |
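One way to read the decision table above as code. This is a hedged sketch under simplifying assumptions (Integer ordering values, null meaning "missing", and a missing value on the new record always favoring the new side); it encodes the reviewer's expected behavior, not necessarily the current implementation:

```java
import java.util.Optional;

public class EventTimeMergeSketch {
    // Illustrative record: a name tag, an ordering value (null = missing), a delete flag.
    record Rec(String name, Integer orderingVal, boolean isDelete) {}

    // Decide the surviving record under the event-time rules of the table above;
    // Optional.empty() means the key ends up deleted.
    static Optional<Rec> merge(Rec oldRec, Rec newRec) {
        boolean keepNew = newRec.orderingVal() == null // missing on the new side favors new
                || newRec.orderingVal() >= (oldRec.orderingVal() == null ? 0 : oldRec.orderingVal());
        Rec winner = keepNew ? newRec : oldRec;
        return winner.isDelete() ? Optional.empty() : Optional.of(winner);
    }
}
```

Returning `Optional.empty()` for the "deleted" rows is one possible answer to the "what's the expected return value" question, though the real method also needs to propagate the delete's metadata.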

Contributor:

Similar to this, we need to prepare a table and document it for all 3 merge modes, so that anyone touching this code is well aware of what is expected from this method. We have made at least 4 or 5 bug fixes around this so far.

Collaborator:

Here the new record cannot be a delete record, since records with delete markers have been routed to a different path.

* @return The pair of the record that needs to be updated with and its metadata,
* returns empty to skip the update.
*/
protected Option<Pair<Option<T>, Map<String, Object>>> doProcessNextDataRecord(T record,
Contributor:

Can we change the 1st arg to "newRecord" to avoid confusion?


if (shouldKeepNewerRecord(existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight(), Option.ofNullable(record), metadata)) {
return Option.of(Pair.of(Option.of(record), metadata));
}
return Option.empty();
Contributor:

The logic used in L278-282 and L481-488 differs slightly, specifically in the else block. Is that intended?

Contributor (Author):

Yes, the incoming record is never empty for a data block.

switch (recordMergeMode) {
case COMMIT_TIME_ORDERING:
return Option.empty();
return Option.of(deleteRecord);
Contributor:

do we have tests for this case?

Collaborator:

We should have many functional tests accessing this path. But for unit tests, we probably don't.

danny0405 (Contributor, Author):

readerContext.getSchemaFromMetadata(metadata),
readerSchema,
props);
if (!combinedRecordAndSchemaOpt.isPresent()) {
Contributor:

I still have a tough time understanding the expected value here. For example, after we call recordMerger.get().partialMerge(), what is the expected return value if the record is deleted? And if it's deleted, don't we need to return the deletion to the caller so that the hash map gets updated?

But in L262, we are returning Option.empty.

Contributor:

Anyway, not blocking the PR, but we should add more documentation to make it easier to understand.

Contributor:

Also, can we introduce a simple POJO to represent a record in this class? As of now, we are using

Pair<Option<T>, Map<String, Object>>

where the key is the record key or position, and the Map contains metadata like the ordering value and schema version id.

Thinking about how it might look if we had something like:

FileGroupRecord {
  Option<T> record;
  Map<String, Object> recordMetadata;
  boolean isDeleted;
}

With this, whenever we call doProcessNextDataRecord(newRecord, existingRecord), we can make sure it returns an entry if the existing record is not null. Returning an entry means it could refer to a valid record or a deleted record, but essentially the return value will blindly update the hashmap of records we maintain.

For a delete record, FileGroupRecord.record might be Option.empty and FileGroupRecord.isDeleted will be true, and optionally there could be an ordering value in the map.

Unless existingRecord is null when we call doProcessNextDataRecord, we should always return something from doProcessNextDataRecord. I feel this might help us keep the code maintainable.
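A hedged sketch of the proposed abstraction. No FileGroupRecord class exists in Hudi; everything below is the reviewer's suggestion fleshed out with assumed factory methods:

```java
import java.util.Map;
import java.util.Optional;

public final class FileGroupRecord<T> {
    private final Optional<T> record;                 // empty for a delete
    private final Map<String, Object> recordMetadata; // e.g. ordering value, schema version id
    private final boolean isDeleted;

    private FileGroupRecord(Optional<T> record, Map<String, Object> metadata, boolean isDeleted) {
        this.record = record;
        this.recordMetadata = metadata;
        this.isDeleted = isDeleted;
    }

    static <T> FileGroupRecord<T> data(T record, Map<String, Object> metadata) {
        return new FileGroupRecord<>(Optional.of(record), metadata, false);
    }

    // A delete carries no payload but may still carry an ordering value in its metadata.
    static <T> FileGroupRecord<T> delete(Map<String, Object> metadata) {
        return new FileGroupRecord<>(Optional.empty(), metadata, true);
    }

    Optional<T> getRecord() { return record; }
    Map<String, Object> getRecordMetadata() { return recordMetadata; }
    boolean isDeleted() { return isDeleted; }
}
```

With factory methods like these, data records and deletes flow through the cached map in one uniform shape, which is the maintainability point being made.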

Contributor:

I guess we are trying to optimize to avoid returning from doProcessNextDataRecord when we do not want to update the map (e.g., if the new record has a lower ordering value than the existing record). I feel those are probably < 5% of cases. Let's keep it simple and make it foolproof; it's ok to always return a value from doProcessNextDataRecord and update the map with the same record as the previous version.

nsivabalan (Contributor) commented Apr 10, 2025:

Also, thinking about whether we should standardize how a deleted record is represented in the FG reader: a deleted record from a delete block, HoodieEmptyRecordPayload, etc.

And if we have the above abstraction in place (FileGroupRecord), we can also see if we can avoid using DeleteRecord.

It's again causing some confusion, since doProcessNextDeletedRecord returns Option<DeleteRecord>, but the caller processes it and updates the records map, which eventually results in the format Pair<Option<T>, Map<String, Object>>.

So, in the end, every record is represented in the same format in the cached map of records.

Contributor (Author):

its ok to always return value from doProcessNextDataRecord and update the map w/ the same record as prev version.

Not really; for the spillable map, fewer put/get calls mean less ser/de of the records.

try {
// Iterate over the paths
logFormatReaderWrapper = new HoodieLogFormatReverseReader(storage,
logFormatReaderWrapper = new HoodieLogFormatReader(storage,
Contributor:

Did we already have an inconsistency b/w scanInternalV1 and scanInternalV2? :(
Looks like V2 was already doing ascending.

@github-project-automation github-project-automation bot moved this from 🆕 New to 🛬 Near landing in Hudi PR Support Apr 10, 2025
hudi-bot (Collaborator):

CI report:

@hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@danny0405 danny0405 merged commit 0eac555 into apache:master Apr 10, 2025
60 checks passed
@github-project-automation github-project-automation bot moved this from 🛬 Near landing to ✅ Done in Hudi PR Support Apr 10, 2025
voonhous pushed a commit to voonhous/hudi that referenced this pull request Apr 11, 2025
…13115)

* [HUDI-9267] Fix the file group reader log file read sequence

Fix the file group reader log file sequence to be in ascending order, so as to keep the "processing_time"
merging semantics for streaming scenarios: always choose the latest incoming record if the ordering values are equal.

This semantics works now for both `COMMIT_TIME` and `EVENT_TIME` merging modes after the fix.

Also fix some other issues:

* the unnecessary copy of rows for position based merging;
* the event time merging sequence for CUSTOM merger.
* the HoodieEmptyRecord default ordering value
* the fallback strategy read for position based merging

---------

Co-authored-by: sivabalan <n.siva.b@gmail.com>
(cherry picked from commit 0eac555)
voonhous pushed a commit to voonhous/hudi that referenced this pull request Apr 15, 2025 (cherry picked from commit 0eac555)
voonhous pushed a commit to voonhous/hudi that referenced this pull request Apr 16, 2025 (cherry picked from commit 0eac555)
voonhous pushed a commit to voonhous/hudi that referenced this pull request Apr 16, 2025 (cherry picked from commit 0eac555)

Labels

priority:blocker Production down; release blocker release-1.0.2 size:M PR with lines of changes in (100, 300]

Projects

Status: ✅ Done

Development

Successfully merging this pull request may close these issues.

4 participants