[HUDI-9120] Fix delete ordering comparison issue #12979
linliu-code wants to merge 2 commits into apache:master from
Conversation
    orderingField: StructField): Seq[StructField] = {
  val fields = ArrayBuffer[StructField]()
  fields ++= requiredSchema.fields
  fields ++= partitionSchema.fields.filter(f => mandatoryFields.contains(f.name))
@jonvex , do you remember why we need to do this filtering? Can we directly add all fields from mandatoryFields?
  fields ++= requiredSchema.fields
  fields ++= partitionSchema.fields.filter(f => mandatoryFields.contains(f.name))
  if (orderingField != null && !fields.contains(orderingField)) {
    fields.append(orderingField)
So the ordering field for the delete record is always in the last position.
requestedSchema is the internal schema used by the file group (fg) reader during merging. The output schema will be requiredSchema, so the position of orderingField should not affect the output.
Do we introduce an additional projection for the rows if the user's output schema does not match it?
I think Spark itself will do one projection anyway, but I need to confirm. This is the same as for the "row index" column we use for position-based merging.
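The append-last behavior discussed in this thread can be sketched as follows. This is a simplified, self-contained illustration, not Hudi's actual code: `StructField` here is a minimal stand-in for Spark's type, and `buildRequestedSchema` is a hypothetical name mirroring the diff above.

```scala
// Stand-in for Spark's StructField so the sketch runs on its own.
case class StructField(name: String, dataType: String)

object RequestedSchemaSketch {
  // Mirrors the PR's schema assembly: required fields first, then mandatory
  // partition fields, then the ordering field appended last if it is absent.
  def buildRequestedSchema(requiredFields: Seq[StructField],
                           partitionFields: Seq[StructField],
                           mandatoryFields: Set[String],
                           orderingField: StructField): Seq[StructField] = {
    val fields = scala.collection.mutable.ArrayBuffer[StructField]()
    fields ++= requiredFields
    fields ++= partitionFields.filter(f => mandatoryFields.contains(f.name))
    if (orderingField != null && !fields.contains(orderingField)) {
      fields.append(orderingField)
    }
    fields.toSeq
  }
}
```

Because the ordering field lands at the end (when it was not already requested), the fg reader can use it for merging while the user-facing requiredSchema remains a prefix of this schema, recovered by a final projection.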
  // schema that we want fg reader to output to us
- val requestedSchema = StructType(requiredSchema.fields ++ partitionSchema.fields.filter(f => mandatoryFields.contains(f.name)))
+ val requestedSchema = getRequestedSchema(options, dataSchema, partitionSchema, requiredSchema, mandatoryFields)
HoodieFileGroupReaderSchemaHandler#generateRequiredSchema has the logic to append all the fields necessary for merging in MOR, which is engine-independent. Could you check why that is not applied for the bug you discovered?
Also, could we avoid appending the fields necessary for merging at the Spark layer if possible, to reduce the complexity in this class?
Close this one since it is not a correct fix.
This is not needed since we have found a better fix: #12991
Change Logs
Root cause:
For queries using the event-time based merge mode, the requestedSchema may not contain the ordering field if the requiredSchema (output schema) does not contain it. In this case, when we merge base file records and log file records, the base file records have to use DEFAULT_ORDERING_VALUE (integer 0), while the ordering field from log file records keeps its original data type, such as long or float, so the comparison fails due to a type conflict.
Fix:
The fix is to add the ordering field to the requestedSchema when possible.
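The type conflict in the root cause can be reproduced with a minimal sketch (illustrative names, not Hudi's actual classes): comparing the boxed-integer default ordering value against a log record's ordering value of a different boxed type fails at runtime.

```scala
object OrderingConflictDemo {
  // Base file records missing the ordering field fall back to an integer 0
  // (mirroring the DEFAULT_ORDERING_VALUE described above).
  val baseOrdering: Comparable[_] = Integer.valueOf(0)
  // Log file records keep the field's original type, e.g. Long.
  val logOrdering: Comparable[_] = java.lang.Long.valueOf(5L)

  // A raw Comparable comparison, as happens during record merging.
  def compareOrdering(a: Comparable[_], b: Comparable[_]): Int =
    a.asInstanceOf[Comparable[Any]].compareTo(b)

  def main(args: Array[String]): Unit = {
    // Integer.compareTo expects an Integer; passing a Long throws
    // ClassCastException, so the merge comparison fails.
    try {
      compareOrdering(baseOrdering, logOrdering)
      println("comparison succeeded")
    } catch {
      case _: ClassCastException => println("type conflict: ClassCastException")
    }
  }
}
```

With the fix, the ordering field is read from the base file in its real type, so both sides of the comparison have the same type and no fallback to the integer default is needed.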
Follow up:
Why do we add the field for commit-time based queries as well?
Ideally we should not. We add it because: 1. these are corner cases (delete records are present but the ordering field is not requested); 2. it saves the driver a trip to storage to read the table config for the merge mode; 3. simplicity.
Impact
Fix a bug for event time based delete.
Risk level (write none, low, medium or high below)
Low.
Documentation Update
Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".
Contributor's checklist