feat(flink): Wire Flink 2.1 nested Parquet readers into the Hudi read path (FLINK-35702)#18700

Merged
danny0405 merged 1 commit into apache:master from skywalker0618:shihuanl/rewire_parquet_parsing_functions on May 8, 2026

Conversation

@skywalker0618
Contributor

skywalker0618 commented May 8, 2026

Describe the issue this Pull Request addresses

Hudi-flink's ParquetColumnarRowSplitReader read path still routes nested types (ARRAY, MAP, MULTISET, ROW) through the legacy ArrayColumnReader / MapColumnReader / RowColumnReader chain that PR 3 left untouched, so the ClassCastException class of bugs from FLINK-35702 (nested-in-nested ARRAY / MAP, ROW subtrees, legacy 2-level list encoding) is still reachable in production. PR 3 vendored the two Dremel-style readers but parked them as dead code. This PR rewires the read path to actually use them, while preserving Hudi's schema-evolution handling and the Hudi-specific "row-collapse-on-all-null-children" semantic that the legacy RowColumnReader implemented.

This is the 4th PR in the series. After this change the legacy ArrayColumnReader / MapColumnReader / RowColumnReader and the ColumnarGroup* data classes are no longer reachable from the read path, but they are left in the tree; their removal is deferred to PR 5 so this PR stays narrowly scoped to the wiring switch.

Summary and Changelog

Faithful port from Apache Flink 2.1:

ParquetSplitReaderUtil is rewritten in line with the FLINK-35702 ParquetSplitReaderUtil:

  • New createColumnReader(..., ParquetField field) overload that routes ARRAY / MAP / MULTISET / ROW through the vendored NestedColumnReader and keeps Hudi's specialized primitive readers (Int64TimestampColumnReader, FixedLenBytesColumnReader, HeapDecimalVector) on the primitive path (see the routing sketch after this list).
  • @deprecated 5-arg shim retained for backward compat; primitive callers are unaffected, and callers projecting nested types are now expected to use the 6-arg overload.
  • createPrimitiveColumnReader extracted from the old createColumnReader body verbatim.
  • createWritableColumnVector simplified: drops the legacy HeapArrayGroupColumnVector branches, adds MULTISET, and (Hudi-specific divergence) pre-fills missing ROW children with null-filled vectors.
  • createVectorFromConstant extended to handle MULTISET and pre-fill ROW children for the partition / missing-column slot path.
  • buildFieldsList / constructField / lookupColumnByName / getMapKeyValueColumn / getArrayElementColumn ported from upstream to build a ParquetField tree per split.
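The routing switch from the first bullet is sketched below. This is a condensed illustration, not the merged code: the parameter list, the NestedColumnReader constructor arguments, and the class locations are assumptions drawn from the description above; only the routing decision (nested types to NestedColumnReader, primitives to createPrimitiveColumnReader) is what this PR asserts.

```java
import java.io.IOException;
import java.util.List;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.schema.Type;
// ColumnReader, NestedColumnReader, ParquetField: Hudi's vendored cow-reader classes (packages assumed).

public static ColumnReader createColumnReader(
    boolean utcTimestamp,
    LogicalType fieldType,
    Type physicalType,
    List<ColumnDescriptor> descriptors,
    PageReadStore pages,
    ParquetField field) throws IOException {
  switch (fieldType.getTypeRoot()) {
    case ARRAY:
    case MAP:
    case MULTISET:
    case ROW:
      // Dremel-style assembly for every nested shape, including nested-in-nested
      // ARRAY/MAP, ROW subtrees, and legacy 2-level lists (the FLINK-35702 fixes).
      return new NestedColumnReader(utcTimestamp, pages, field);
    default:
      // Hudi's specialized primitive readers (Int64TimestampColumnReader,
      // FixedLenBytesColumnReader, ...) stay on this path, unchanged.
      return createPrimitiveColumnReader(utcTimestamp, fieldType, physicalType, descriptors, pages);
  }
}
```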

ParquetColumnarRowSplitReader: builds the ParquetField tree once per split via new ColumnIOFactory().getColumnIO(requestedSchema), then feeds each per-column ParquetField into the new createColumnReader overload from readNextRowGroup. The pre-existing field-name / type bookkeeping (clipParquetSchema, checkSchema, patchedVector) is unchanged, as is the missing-top-level-column patching that was already in place.
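A condensed sketch of that wiring follows. The local names (projectedRowType, projectedTypes, rowGroupPages, columnReaders) are illustrative assumptions; ColumnIOFactory/getColumnIO are the Parquet APIs named above, and the buildFieldsList call follows the upstream shape but is not guaranteed to match the merged code exactly.

```java
// Built once per split, after the existing clipParquetSchema/checkSchema bookkeeping:
MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(requestedSchema);
List<ParquetField> fields =
    ParquetSplitReaderUtil.buildFieldsList(
        projectedRowType.getFields(), projectedRowType.getFieldNames(), columnIO);

// Later, in readNextRowGroup(), each per-column ParquetField feeds the new 6-arg overload:
for (int i = 0; i < columnReaders.length; i++) {
  columnReaders[i] = ParquetSplitReaderUtil.createColumnReader(
      utcTimestamp,
      projectedTypes[i],             // Flink LogicalType of the projected column
      requestedSchema.getType(i),    // Parquet type of the same column
      requestedSchema.getColumns(),  // ColumnDescriptors of the requested schema
      rowGroupPages,                 // PageReadStore of the current row group
      fields.get(i));                // ParquetField subtree built above
}
```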

HeapMapColumnVector.getMap(int): switched from the legacy ColumnarGroupMapData(keys, values, rowId) to the standard ColumnarMapData(keys, values, offset, length) wired through the offsets / lengths populated by NestedColumnReader (PR 3 added that state and the new accessors; this PR consumes them). Field types and the Lombok-generated getKeys() / getValues() accessors are kept as WritableColumnVector to avoid an API break on the surface PR 3 exposed.
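A sketch of the switched accessor: the offsets/lengths field names are assumptions about the vendored vector (the PR only states that PR 3 added this state and that this PR consumes it), while ColumnarMapData(keys, values, offset, numElements) is the standard Flink constructor.

```java
// HeapMapColumnVector (sketch): keys/values stay typed as WritableColumnVector,
// so the surface PR 3 exposed is unchanged; only getMap() is rewired.
@Override
public MapData getMap(int rowId) {
  // offsets/lengths are populated by NestedColumnReader during Dremel assembly.
  return new ColumnarMapData(keys, values, (int) offsets[rowId], (int) lengths[rowId]);
}
```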

Deliberate divergences from Flink 2.1:

ParquetSplitReaderUtil#constructField (ROW branch): a logical child may be absent from the Parquet file (Hudi schema evolution); this is handled via a case-insensitive lookupColumnByNameOrNull that yields a null ParquetField for the absent child. Upstream Flink throws on missing children. Companion behavior: createWritableColumnVector ROW branch slots a null-filled vector for the same child via createVectorFromConstant, so the Dremel assembler in NestedColumnReader.readRow passes the slot through unchanged when its ParquetField is null.
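A sketch of the ROW branch under that divergence; the loop shape and the children list are illustrative, and lookupColumnByNameOrNull is the helper named above.

```java
// ROW branch of constructField (sketch): tolerate children that the logical
// (possibly evolved) schema has but this Parquet file does not.
List<ParquetField> children = new ArrayList<>();
for (RowType.RowField child : rowType.getFields()) {
  ColumnIO childIO = lookupColumnByNameOrNull(groupIO, child.getName()); // case-insensitive
  if (childIO == null) {
    // Absent in the file: keep a null ParquetField. createWritableColumnVector has
    // already slotted a null-filled vector for this child, and NestedColumnReader.readRow
    // passes that slot through unchanged when its ParquetField is null.
    children.add(null);
  } else {
    children.add(constructField(child, childIO));
  }
}
```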

NestedColumnReader#readRow (Hudi backward-compat divergence): after the Dremel-driven null assignment via setFieldNullFlag, run an extra pass that collapses a present row whose every child is null into a null row. Flink 2.1 would surface row(null, ..., null) as Row(null, ..., null); the legacy Hudi RowColumnReader collapsed it to NULL, and existing Hudi users' SQL (the ITTestHoodieDataSource#testParquetNullChildColumnsRowTypes integration test in particular) depends on this. Documented inline at the divergence site.
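A sketch of that extra pass (Hudi-only; variable names are illustrative):

```java
// After setFieldNullFlag has applied the Dremel-driven nulls: collapse any row
// whose children are all null into a null row, matching the legacy RowColumnReader.
for (int i = 0; i < readRows; i++) {
  if (rowVector.isNullAt(i)) {
    continue; // already null at the row level
  }
  boolean allChildrenNull = true;
  for (WritableColumnVector child : childVectors) {
    if (!child.isNullAt(i)) {
      allChildrenNull = false;
      break;
    }
  }
  if (allChildrenNull) {
    rowVector.setNullAt(i); // row(null, ..., null) -> NULL, the legacy Hudi semantic
  }
}
```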

Tests added:

None new in this PR. Hudi's existing integration tests in ITTestHoodieDataSource cover the rewired read path end-to-end at the SQL level (across insert / upsert / bulk_insert):

  • testParquetComplexTypes — flat ARRAY / MAP / ROW
  • testParquetComplexNestedRowTypes — ROW(ARRAY, ROW) and array-of-int
  • testParquetArrayMapOfRowTypes — ARRAY, MAP<varchar, ROW> with an in-test ALTER TABLE adding a new field (schema evolution)
  • testParquetNullChildColumnsRowTypes — Hudi's row-collapse-on-all-nulls case that drove the divergence above

These are the same tests that exercised the legacy readers, so any regression in nesting / schema-evolution / null-handling between the legacy and the vendored Dremel path surfaces here. The upstream FLINK-35702 PR added a single unit test (testNestedRead in ParquetColumnarRowSplitReaderTest), which ports awkwardly into Hudi: Hudi has no upstream test class to extend, and the upstream test pulls in a flink-shaded-guava31 transitive dependency that Hudi's hudi-flink1.18.x test classpath doesn't ship. The IT coverage above subsumes its scope.

Part of #18491.

Impact

ARRAY / MAP / MULTISET / ROW reads on hudi-flink1.18.x now go through the vendored Dremel-style NestedColumnReader instead of the legacy ArrayColumnReader / MapColumnReader / RowColumnReader. The previously known ClassCastException failures from FLINK-35702 (nested-in-nested ARRAY, MAP-inside-ARRAY, ROW subtrees, legacy 2-level list encoding) are fixed.

Pre-existing Hudi-specific behaviors are preserved by the divergences listed above:

  • Schema-evolution null-tolerant ROW-child handling (already in PR 3 at the reader level; this PR completes it at the field-tree construction and vector-pre-allocation level).
  • row(null, ..., null) → NULL collapse on read.

No public API is removed; the deprecated 5-arg createColumnReader entry point is preserved for primitive-only callers.

Risk Level

Medium-low.

  • The hudi-flink1.18.x split reader path is now strictly the vendored Dremel path for nested types. The legacy ArrayColumnReader / MapColumnReader / RowColumnReader are unreachable from this path but still on disk; PR 5 will remove them.
  • 12/12 nested-type IT cases (4 IT methods × {insert, upsert, bulk_insert}) in ITTestHoodieDataSource pass; one regression surfaced and was fixed during validation (the row-collapse divergence above), which is exactly the kind of issue this IT layer is here to catch.
  • Other Flink shim modules (1.17.x / 1.19.x / 1.20.x / 2.0.x / 2.1.x) are untouched in this PR; their port lands in the per-version PRs once the 1.18.x series is merged.

Documentation Update

N/A

Contributor's checklist

  • Read through contributor's guide
  • Enough context is provided in the sections above
  • Adequate tests were added if applicable

Rollout plan

The backport into Flink 1.18 will be split into 5 PRs for easier review. After they are all merged, we will backport to the higher Flink versions (1.19.x, 1.20.x, 2.0.x, and 2.1.x) as well, one PR per version, since diffing hudi-flink1.18.x against each higher version will then be straightforward.

@codecov-commenter

Codecov Report

❌ Patch coverage is 45.45455% with 6 lines in your changes missing coverage. Please review.
✅ Project coverage is 68.14%. Comparing base (4029560) to head (1789969).
⚠️ Report is 3 commits behind head on master.

Files with missing lines | Patch % | Lines
...e/format/cow/vector/reader/NestedColumnReader.java | 45.45% | 2 Missing and 4 partials ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18700      +/-   ##
============================================
+ Coverage     67.90%   68.14%   +0.23%     
- Complexity    28958    29078     +120     
============================================
  Files          2521     2522       +1     
  Lines        141039   141177     +138     
  Branches      17480    17514      +34     
============================================
+ Hits          95777    96209     +432     
+ Misses        37401    37060     -341     
- Partials       7861     7908      +47     
Flag | Coverage Δ
common-and-other-modules | 44.42% <45.45%> (+0.19%) ⬆️
hadoop-mr-java-client | 45.00% <ø> (+0.12%) ⬆️
spark-client-hadoop-common | 48.35% <ø> (-0.08%) ⬇️
spark-java-tests | 49.01% <ø> (+0.34%) ⬆️
spark-scala-tests | 44.90% <ø> (+0.13%) ⬆️
utilities | 37.63% <ø> (-0.05%) ⬇️

Flags with carried forward coverage won't be shown.

Files with missing lines | Coverage Δ
...e/format/cow/vector/reader/NestedColumnReader.java | 73.45% <45.45%> (+73.45%) ⬆️

... and 26 files with indirect coverage changes


@hudi-bot
Collaborator

hudi-bot commented May 8, 2026

CI report:

@hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@danny0405 danny0405 merged commit 47bf4e4 into apache:master May 8, 2026
64 of 65 checks passed

Labels

size:L PR with lines of changes in (300, 1000]
