fix: FileGroupReader drops mandatory partition columns from dataSchema #18570
Conversation
hudi-agent left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the thorough investigation and fix! This mirrors `requestedStructType`'s construction to ensure mandatory partition fields are retained in `dataSchema` before pruning, which cleanly addresses the null-partition-column regression on MOR base+log slices with CUSTOM mergers. The scoped filter on `mandatoryFields` keeps this a strict no-op for the common case, and the regression test exercises the exact combination (`CustomKeyGenerator` + `PostgresDebeziumAvroPayload` + `GLOBAL_SIMPLE` + `update.partition.path`) that triggered the bug.

No correctness issues found. A few style/readability suggestions in the inline comments. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. A couple of small nits below; overall the fix is clear and the regression test is well-documented.
cc @yihua
```scala
// downstream output converter then writes null for the field in every row. Most visible on MOR
// file slices with log files (which take the FileGroupReader path instead of the `readBaseFile`
// path that would otherwise append partition values from the directory path).
val dataStructTypeWithMandatoryPartition = StructType(dataStructType.fields ++ partitionSchema.fields.filter(f => mandatoryFields.contains(f.name)))
```
🤖 nit: could you rename this to `dataStructTypeWithMandatoryPartitionFields` (plural)? The filter can match multiple mandatory partition columns, so the singular `...Partition` is a little misleading for someone skimming the code later.
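For reference, the diff line above with the suggested rename applied (a sketch only; the filter and surrounding code are unchanged):

```scala
// Renamed to make the possibly-plural result explicit; the logic is identical.
val dataStructTypeWithMandatoryPartitionFields = StructType(
  dataStructType.fields ++
    partitionSchema.fields.filter(f => mandatoryFields.contains(f.name)))
```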
| assertEquals("US", rows(6L), "id=6 moved to US") | ||
|
|
||
| // id=4 was deleted. | ||
| assertEquals(false, rows.contains(4L), "id=4 was deleted") |
🤖 nit: `assertFalse(rows.contains(4L), "id=4 was deleted")` reads more naturally here than `assertEquals(false, ...)`; it just needs `assertFalse` added to the import alongside `assertEquals` and `assertNotNull`.
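A sketch of the suggested change, assuming the test imports its assertions from JUnit 5's `org.junit.jupiter.api.Assertions`:

```scala
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotNull}

// Reads as "id=4 must be absent" instead of comparing against a boolean literal.
assertFalse(rows.contains(4L), "id=4 was deleted")
```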
Force-pushed c4d08f8 to b6f6bca
HoodieFileGroupReaderBasedFileFormat.buildReaderWithPartitionValues builds two schemas side-by-side: requestedSchema (what to return to Spark) and dataSchema (what to read from parquet). It augments requestedSchema with any partition fields in mandatoryFields before pruning, but pipes dataStructType through unchanged. Spark's dataStructType excludes partition columns by convention, and HoodieSchemaUtils.pruneDataSchema iterates over its second arg, so any mandatory partition field is silently dropped from the resulting dataSchema.

The FileGroupReader then does not read the column from the parquet base file, and for non-projection-compatible CUSTOM mergers (e.g. PostgresDebeziumAvroPayload) the output converter writes null for every affected row. Most visible on MOR file slices that have both a base file and a log file, since the readBaseFile path (which would append partition values from the directory name) is skipped in favor of the FileGroupReader path.

Regression introduced by apache#13711 ("Improve Logical Type Handling on Col Stats"), which added the pruneDataSchema wrapping but only on the requested-schema side.

Fix: mirror requestedStructType's construction by augmenting dataStructType with the mandatory partition fields before pruning.

Also adds a regression test (TestFileGroupReaderPartitionColumn) that reproduces the scenario end-to-end: MOR + CustomKeyGenerator + PostgresDebeziumAvroPayload + GLOBAL_SIMPLE with update.partition.path=true, round-2 partition-key change producing a base+log slice, then verifies untouched records in that slice read back with the correct partition-column value.

Fixes: apache#18568

Signed-off-by: tiennguyen-onehouse <tien@onehouse.ai>
Force-pushed b6f6bca to 5562027
Codecov Report

✅ All modified and coverable lines are covered by tests.

```
@@             Coverage Diff              @@
##             master   #18570      +/-   ##
============================================
  Coverage     68.87%   68.87%
- Complexity    28482    28510      +28
============================================
  Files          2478     2478
  Lines        136699   136802     +103
  Branches      16634    16659      +25
============================================
+ Hits          94150    94223      +73
- Misses        34980    34990      +10
- Partials       7569     7589      +20
```

Flags with carried forward coverage won't be shown.
Describe the issue this Pull Request addresses
Closes #18568
Summary and Changelog
`HoodieFileGroupReaderBasedFileFormat.buildReaderWithPartitionValues` builds two schemas side by side: `requestedSchema` (what to return to Spark) and `dataSchema` (what to read from parquet). It augments `requestedStructType` with any partition fields in `mandatoryFields` before pruning, but pipes `dataStructType` through unchanged. Spark's `dataStructType` excludes partition columns by convention, and `HoodieSchemaUtils.pruneDataSchema` iterates over its second argument, so any mandatory partition field is silently dropped from the resulting `dataSchema`.

The FileGroupReader then does not read the partition column from the parquet base file, and for non-projection-compatible CUSTOM mergers (e.g. `PostgresDebeziumAvroPayload`) the output converter writes `null` for every affected row via `HoodieInternalRowUtils.genUnsafeStructWriter`'s `setNullAt` fallback.

Most visible on MOR file slices that have both a base file and a log file, since the `readBaseFile` path (which would append partition values from the directory name) is skipped in favor of the FileGroupReader path.

Regression introduced by #13711 ("Improve Logical Type Handling on Col Stats"), which added the `pruneDataSchema` wrapping but only on the requested-schema side.
Fix: mirror `requestedStructType`'s construction by augmenting `dataStructType` with the mandatory partition fields before pruning (sketched below). This is a strict no-op when `mandatoryFields.filter(partitionSchema)` is empty, which covers all non-CustomKeygen/non-TimestampKeygen tables. For affected tables the parquet reader projects one extra column (the mandatory partition column), which the base files already contain (precondition: `drop.partition.columns=false`).
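A minimal sketch of the mirrored construction, with the data-side line taken from the diff in this PR; the requested-side variable name is an assumption for illustration and may differ in the actual code:

```scala
// Requested side (pre-existing behavior): mandatory partition fields are kept
// before pruning. The variable name on this side is assumed for this sketch.
val requestedStructTypeWithMandatoryFields = StructType(
  requestedStructType.fields ++
    partitionSchema.fields.filter(f => mandatoryFields.contains(f.name)))

// Data side (the fix): mirror the same augmentation so that pruneDataSchema,
// which iterates over this schema's fields, no longer drops the mandatory
// partition column from the resulting dataSchema.
val dataStructTypeWithMandatoryPartition = StructType(
  dataStructType.fields ++
    partitionSchema.fields.filter(f => mandatoryFields.contains(f.name)))
```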
Matrix of when the bug fires (all must hold; a repro configuration is sketched after this list):

- `CustomKeyGenerator` or `TimestampBasedKeyGenerator` as the key generator (the regression test uses `CustomKeyGenerator`)
- `drop.partition.columns=false`
- a non-projection-compatible CUSTOM merger (e.g. `PostgresDebeziumAvroPayload`)
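For concreteness, a hypothetical Spark writer configuration that satisfies every row of this matrix (option keys are the standard Hudi datasource configs; verify names and defaults against your Hudi version):

```scala
// Hypothetical repro configuration; df, tableName and basePath are assumed.
df.write.format("hudi")
  .option("hoodie.table.name", tableName)
  .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
  .option("hoodie.datasource.write.keygenerator.class",
    "org.apache.hudi.keygen.CustomKeyGenerator")
  .option("hoodie.datasource.write.payload.class",
    "org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload")
  .option("hoodie.index.type", "GLOBAL_SIMPLE")
  .option("hoodie.simple.index.update.partition.path", "true")
  // default value; listed to make the matrix precondition explicit
  .option("hoodie.datasource.write.drop.partition.columns", "false")
  .mode("append")
  .save(basePath)
```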
Changes:

- `HoodieFileGroupReaderBasedFileFormat.scala`: augment `dataStructType` with mandatory partition fields before pruning.
- `TestFileGroupReaderPartitionColumn.scala` (new): regression test reproducing the scenario end-to-end: MOR + `CustomKeyGenerator` + `PostgresDebeziumAvroPayload` + `GLOBAL_SIMPLE` with `update.partition.path=true`, a round-2 partition-key change producing a base+log slice, then verification that untouched records in that slice read back with the correct partition-column value.

Impact
Silent data corruption (partition column returning `null`) on MOR reads for the matrix of tables described above. No schema or on-disk format change. Only touches `HoodieFileGroupReaderBasedFileFormat.buildReaderWithPartitionValues`; the RDD-based MOR path (`HoodieMergeOnReadRDDV2`) uses the raw `tableSchema` directly and never had this bug.

Risk Level
low
The fix is a strict no-op when `mandatoryFields.filter(partitionSchema)` is empty (all non-CustomKeygen/non-TimestampKeygen tables, and any table with `drop.partition.columns=true`). For non-empty cases it only adds the mandatory partition column to the set read from parquet; base files already contain this column as a precondition. Merger behavior is unchanged because `PostgresDebeziumAvroPayload.combineAndGetUpdateValue`/`preCombine` make decisions on the precombine key only.

Documentation Update
none
Contributor's checklist