feat(flink): Backport Flink 2.1 nested Parquet column readers and INT64 timestamp dispatch (FLINK-35702)#18636
Conversation
hudi-agent left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This PR vendors Flink 2.1's NestedColumnReader / NestedPrimitiveColumnReader (FLINK-35702), adds Flink-2.1-compatible accessors to the existing heap-vector classes, and extends ParquetDataColumnReaderFactory to dispatch INT64 TIMESTAMP columns to a dedicated reader. A couple of points worth a closer look in the inline comments — chiefly that the factory change is not dead code at HEAD (it transitively affects the existing legacy ArrayColumnReader/MapColumnReader/RowColumnReader path via BaseVectorizedColumnReader). Please take a look at any inline comments, and this should be ready for a Hudi committer or PMC member to take it from here. A couple of minor typos in the vendored NestedPrimitiveColumnReader — one in a comment and one in a runtime exception message — are worth fixing before the code lands.
 * backwards compatibility with files written by older Hive / Spark / Impala versions.)
 *
 * <p>Used by {@link NestedPrimitiveColumnReader} when a TIMESTAMP column sits inside a
 * {@code Row}, {@code Array} or {@code Map}; the top-level path continues to use
🤖 Worth flagging that this factory change isn't actually dead code at HEAD — BaseVectorizedColumnReader (parent of the legacy ArrayColumnReader/MapColumnReader/RowColumnReader) calls getDataColumnReaderByType(...) and then invokes dataColumn.readTimestamp() from those subclasses' TIMESTAMP cases. Before this PR, INT64 timestamps inside ARRAY/MAP/ROW would have hit DefaultParquetDataColumnReader.readTimestamp() which throws "Unsupported operation"; with this PR they now decode correctly. That's a strict fix — but it does mean the factory change ships behaviour today, distinct from the truly-dormant NestedColumnReader. Could the PR description be updated, and any existing tests/snapshots that may have implicitly relied on the previous failure double-checked?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
      return dictionary.readInteger(dictionaryValue);
    case BOOLEAN:
      return dictionary.readBoolean(dictionaryValue) ? 1 : 0;
    case DOUBLE:
🤖 I think the dictionary path produces an Integer (1/0) for BOOLEAN here, but fillColumnVector casts the value list to List<Boolean> (heapBooleanVector.vector[i] = ((List<Boolean>) valueList).get(i);). On a dictionary-encoded BOOLEAN column the cast would throw ClassCastException. I realise this is a faithful port of upstream Flink, and Parquet writers rarely dictionary-encode BOOLEAN (the encoding offers no benefit), so it likely never triggers in practice — but it might be worth either noting it or aligning the dictionary path to return a Boolean to keep the assembly path safe.
This is a faithful copy from upstream flink, prefer to leave it as is and fix the latent bug in the future. As mentioned using dictionary to encode BOOLEAN field has no benefit and should be rare
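The mismatch flagged above can be reproduced in isolation. The following is a minimal sketch, assuming the dictionary path stores Integer 1/0 and the assembly path casts the value list to List<Boolean> as the review describes; the class and method names are illustrative, not the vendored reader's API:

```java
import java.util.ArrayList;
import java.util.List;

public class BooleanDictMismatch {
    // Mimics the reader's dictionary path, which stores BOOLEAN values as
    // Integer 1/0 (dictionary.readBoolean(v) ? 1 : 0) in an untyped value list.
    static List<Object> decodeDictionary() {
        List<Object> valueList = new ArrayList<>();
        valueList.add(1); // Integer, not Boolean
        return valueList;
    }

    // Mimics fillColumnVector's unchecked cast to List<Boolean>: the cast
    // itself succeeds under type erasure, but reading an element fails.
    @SuppressWarnings("unchecked")
    static boolean readAsBoolean(List<Object> valueList) {
        List<Boolean> booleans = (List<Boolean>) (List<?>) valueList;
        return booleans.get(0); // ClassCastException: Integer cannot be cast to Boolean
    }

    public static void main(String[] args) {
        try {
            readAsBoolean(decodeDictionary());
            System.out.println("no exception");
        } catch (ClassCastException e) {
            System.out.println("ClassCastException, as the review predicts");
        }
    }
}
```

Because the unchecked cast compiles cleanly, the bug only surfaces at read time on a dictionary-encoded BOOLEAN page, which matches the "latent, rarely triggered" assessment above.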
| } | ||
| } | ||
|
|
||
| // This won't call, will actually call readAndNewVector |
🤖 nit: the comment reads a bit oddly — "This won't call" is ambiguous (sounds like the method won't call anything internally). Could you rephrase to something like // Not invoked directly; callers use readAndNewVector instead.?
// This won't call, will actually call readAndNewVector
@Override
public void readToVector(int readNumber, WritableColumnVector vector) throws IOException {
  throw new UnsupportedOperationException("This function should no be called.");
🤖 nit: small typo in the exception message — "should no be called" → "should not be called". This surfaces in production logs so worth fixing even in vendored code.
Overall looks good. Do we have a plan to port these changes into the other Flink releases below 2.1?
Codecov Report

Additional details and impacted files:

@@             Coverage Diff              @@
##             master   #18636      +/-   ##
============================================
- Coverage     68.06%   67.88%    -0.18%
+ Complexity    28919    28918        -1
============================================
  Files          2518     2520        +2
  Lines        140570   140945      +375
  Branches      17416    17471       +55
============================================
+ Hits          95680    95687        +7
- Misses        37033    37402      +369
+ Partials       7857     7856        -1
hudi-agent left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This PR vendors the Flink 2.1 nested Parquet readers (NestedColumnReader / NestedPrimitiveColumnReader), adds the additive accessor surface on the heap-vector classes they depend on, and extends ParquetDataColumnReaderFactory to dispatch INT64 timestamp columns. Existing inline comments from prior rounds cover the main concerns (BOOLEAN dictionary type mismatch, factory dispatch affecting the legacy ArrayColumnReader path, and a couple of nit/typo fixes); I traced the read paths and the int64ToTimestamp dispatch and didn't surface new substantive issues from this round. Please take a look at any inline comments, and this should be ready for a Hudi committer or PMC member to take it from here.
cc @yihua
One scope question: this PR ports the nested Parquet reader work only into hudi-flink1.18.x. At minimum, could we document the rollout plan so users on other supported Flink versions know whether they will get the nested Parquet reader and INT64 timestamp dispatch fixes?
@danny0405 Yes, once I am done merging all changes into flink1.18.x, I will backport to other versions. Those versions will probably need only 1 PR per version, unlike 1.18.x, which I split into multiple PRs for easier review.
@cshuo Sure, I added a "Rollout plan" section to the PR description. We will backport to other versions as well.
The test failure is not related; will merge it first.
Describe the issue this Pull Request addresses
Hudi-flink's ParquetColumnarRowSplitReader raises ClassCastException for a range of nested Parquet schemas (ARRAY, MAP inside ARRAY, nested ROW subtrees, legacy 2-level list encodings). Flink 2.1 fixed this class of problems in FLINK-35702 by rewriting nested reads in Dremel form ("one primitive reader per leaf column" plus an assembler that stitches repetition / definition levels into offsets, lengths, and null flags). The Hudi PMC approved porting that fix into hudi-flink1.18.x while preserving Hudi's schema-evolution handling and custom primitive readers.
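As a rough illustration of the Dremel-style assembly described above, here is a much-simplified sketch for a single optional ARRAY column; the shapes and names are illustrative, not the vendored readers' actual API:

```java
import java.util.ArrayList;
import java.util.List;

public class DremelAssemblySketch {
    // Assembles {offsets, lengths, nullFlags (1 = null row)} for one optional
    // ARRAY column from its repetition/definition level streams.
    static int[][] assemble(int[] rep, int[] def, int maxDef) {
        List<Integer> offsets = new ArrayList<>();
        List<Integer> lengths = new ArrayList<>();
        List<Integer> nullFlags = new ArrayList<>();
        int valueIdx = 0; // position in the stream of materialized leaf values
        for (int i = 0; i < rep.length; i++) {
            if (rep[i] == 0) { // repetition level 0 starts a new top-level row
                offsets.add(valueIdx);
                lengths.add(0);
                nullFlags.add(def[i] == 0 ? 1 : 0); // def 0: the list itself is null
            }
            if (def[i] == maxDef) { // max definition level: a real leaf value exists
                lengths.set(lengths.size() - 1, lengths.get(lengths.size() - 1) + 1);
                valueIdx++;
            }
        }
        return new int[][] {toArray(offsets), toArray(lengths), toArray(nullFlags)};
    }

    static int[] toArray(List<Integer> l) {
        return l.stream().mapToInt(Integer::intValue).toArray();
    }

    public static void main(String[] args) {
        // Logical rows for ARRAY<INT>: [1, 2], null, [3]
        int[][] out = assemble(new int[] {0, 1, 0, 0}, new int[] {2, 2, 0, 2}, 2);
        System.out.println(java.util.Arrays.deepToString(out)); // [[0, 2, 2], [2, 0, 1], [0, 1, 0]]
    }
}
```

The real readers handle arbitrary nesting depth, empty-vs-null lists, and batch boundaries; this sketch only shows how repetition level 0 delimits rows and how the definition level separates nulls from values.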
This is the third PR in the series (after the ParquetDecimalVector sync and the data-holder / utility-class scaffolding). It vendors the two FLINK-35702 nested-column readers themselves, adds the additive accessor surface they require on the existing heap-vector classes, and extends ParquetDataColumnReaderFactory to dispatch INT64 timestamp columns to a dedicated reader. The new readers are dead code at HEAD: no production caller is rewired in this PR. Wiring ParquetSplitReaderUtil to construct them is the next PR.
Summary and Changelog
Faithful copies from Apache Flink 2.1:
Both readers are direct, line-by-line copies of the upstream Flink 2.1 sources delivered as part of FLINK-35702:
NestedPrimitiveColumnReader: vendored from org.apache.flink.formats.parquet.vector.reader.NestedPrimitiveColumnReader, reads a single primitive leaf column inside a nested (Dremel) structure, emitting both the values vector and the repetition / definition-level streams the assembler consumes.
NestedColumnReader: vendored from org.apache.flink.formats.parquet.vector.reader.NestedColumnReader; resolves Map / Array / Row group fields using the Dremel striping / assembly algorithm, dispatching to NestedPrimitiveColumnReader for the leaves.
Deliberate divergences from Flink 2.1:
NestedPrimitiveColumnReader: package + 3 import substitutions to point at Hudi-local equivalents introduced earlier in this series (ParquetDecimalVector, LevelDelegation, IntArrayList); plus two added default: clauses in the inner switch on descriptor.getPrimitiveType().getPrimitiveTypeName() for the DECIMAL case (in both readPrimitiveTypedRow and dictionaryDecodeValue). Upstream Flink leaves these inner switches open, which means a DECIMAL column whose physical type isn't INT32 / INT64 / BINARY / FIXED_LEN_BYTE_ARRAY would silently fall through into the outer switch's TIMESTAMP_* cases: a latent bug. Hudi's checkstyle (MissingSwitchDefault) requires explicit defaults; the added throw new RuntimeException(...) is therefore both compliant and a strict semantic improvement over upstream.
NestedColumnReader: imports retargeted at Hudi-local types (HeapRowColumnVector, HeapMapColumnVector, HeapArrayVector, ParquetField, ParquetGroupField, ParquetPrimitiveField, LevelDelegation, CollectionPosition, RowPosition, NestedPositionUtil), plus Hudi schema-evolution support in readRow: a ParquetGroupField representing a Flink RowType may contain null children (the corresponding logical field is absent from the Parquet file). Upstream Flink uses an ImmutableList-backed children and would NPE on this; the Hudi version skips null children, passing their pre-populated slot vector through unchanged and not contributing to the row's level stream. Documented in class Javadoc and inline.
Both classes' algorithmic bodies are otherwise faithful ports; cosmetic differences are limited to Hudi house style (2-space indent, single-space license header) and a provenance paragraph added to each class Javadoc.
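The null-children divergence in readRow can be sketched as follows; ChildReader and the plain int[] vectors are hypothetical stand-ins, not Hudi's actual classes:

```java
public class NullChildSkipSketch {
    interface ChildReader { int[] read(int n); } // stand-in for a per-field column reader

    // Sketch of the Hudi-side divergence: children may contain nulls when a
    // logical Row field is absent from the Parquet file; upstream Flink's
    // ImmutableList-backed children would fail on such entries.
    static int[][] readRow(ChildReader[] children, int[][] prePopulated, int n) {
        int[][] fieldVectors = new int[children.length][];
        for (int i = 0; i < children.length; i++) {
            if (children[i] == null) {
                // Field missing from the file: pass the pre-populated slot vector
                // through unchanged; it contributes nothing to the row's
                // repetition/definition level streams.
                fieldVectors[i] = prePopulated[i];
            } else {
                fieldVectors[i] = children[i].read(n);
            }
        }
        return fieldVectors;
    }

    public static void main(String[] args) {
        ChildReader[] children = { n -> new int[] {7, 8}, null };
        int[][] pre = { null, new int[] {0, 0} }; // slot 1 pre-filled with defaults
        int[][] out = readRow(children, pre, 2);
        System.out.println(java.util.Arrays.deepToString(out)); // [[7, 8], [0, 0]]
    }
}
```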
Additive modifications to existing classes:
These changes only add API surface; they do not remove or change the behavior of any existing call site (with the one explicit bug-fix exception called out under Impact below).
cow/vector/HeapArrayVector: added 6 accessors that mirror the existing public offsets / lengths / child fields: getOffsets / setOffsets, getLengths / setLengths, getChild / setChild. Existing public fields, Lombok @Getter/@Setter on size, getLen() and getArray(int) are untouched.
cow/vector/HeapMapColumnVector: removed final from keys / values to permit the new setters (Lombok @Getters and existing callers of getKeys() / getValues() are unaffected); added Dremel state (offsets, lengths, size); added 10 accessors: getOffsets, setOffsets, getLengths, setLengths, getSize, setSize, setKeys(WritableColumnVector), setValues(WritableColumnVector), getKeyColumnVector(): ColumnVector, getValueColumnVector(): ColumnVector. getMap(int) is unchanged: still routes through ColumnarGroupMapData; switching it to consume the new offsets/lengths via Flink's standard ColumnarMapData is part of the next PR's wiring switch, not this one.
cow/vector/HeapRowColumnVector: added 2 accessors mirroring the existing public vectors field: getFields() and setFields(WritableColumnVector[]). Constructor, getRow(int) and reset() unchanged.
cow/vector/reader/ParquetDataColumnReaderFactory: added TypesFromInt64PageReader (decodes Parquet INT64 timestamp columns into TimestampData parameterized by ChronoUnit and the UTC flag); added private helpers resolveInt64TimestampUnit(PrimitiveType): ChronoUnit (handles both modern LogicalTypeAnnotation.TimestampLogicalTypeAnnotation and legacy OriginalType.TIMESTAMP_MILLIS / TIMESTAMP_MICROS) and int64ToTimestamp(boolean, long, ChronoUnit): TimestampData; extended getDataColumnReaderByTypeHelper to dispatch INT64 timestamp columns to the new reader. INT96 dispatch and the DefaultParquetDataColumnReader fallback path are preserved bytewise; the two public entry points (getDataColumnReaderByType, getDataColumnReaderByTypeOnDictionary) and TypesFromInt96PageReader are unchanged.
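The unit-dependent decode can be sketched as follows. The method name mirrors the int64ToTimestamp helper described above, but the body is an illustrative reconstruction (UTC-flag handling omitted), not the factory's actual code:

```java
import java.time.temporal.ChronoUnit;

public class Int64TimestampSketch {
    // Illustrative decode of a raw INT64 timestamp into {epochMillis, nanoOfMilli},
    // the two components a Flink TimestampData is built from. floorDiv/floorMod
    // keep pre-epoch (negative) values correct.
    static long[] int64ToMillisAndNanos(long value, ChronoUnit unit) {
        switch (unit) {
            case MILLIS:
                return new long[] {value, 0L};
            case MICROS:
                return new long[] {Math.floorDiv(value, 1_000L),
                                   Math.floorMod(value, 1_000L) * 1_000L};
            case NANOS:
                return new long[] {Math.floorDiv(value, 1_000_000L),
                                   Math.floorMod(value, 1_000_000L)};
            default:
                throw new IllegalArgumentException("Unsupported unit: " + unit);
        }
    }

    public static void main(String[] args) {
        long[] micros = int64ToMillisAndNanos(1_234_567L, ChronoUnit.MICROS);
        System.out.println(micros[0] + " ms + " + micros[1] + " ns"); // 1234 ms + 567000 ns

        long[] nanos = int64ToMillisAndNanos(1_234_567_890L, ChronoUnit.NANOS);
        System.out.println(nanos[0] + " ms + " + nanos[1] + " ns"); // 1234 ms + 567890 ns
    }
}
```

This is also the shape the new decoding-correctness tests assert on, via TimestampData.getMillisecond() and getNanoOfMillisecond().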
Tests added:
TestParquetDataColumnReaderFactory: 13 tests covering the type-dispatch matrix (INT96, plain INT64, INT64 with each of TimestampLogicalTypeAnnotation MILLIS / MICROS / NANOS, INT64 with legacy OriginalType.TIMESTAMP_MILLIS / TIMESTAMP_MICROS, INT32) on both the values-reader and dictionary entry points, plus decoding-correctness tests asserting the returned TimestampData.getMillisecond() and getNanoOfMillisecond() for each ChronoUnit.
TestHeapColumnVectorAccessors — 4 tests pinning down the additive accessor contract across HeapArrayVector, HeapMapColumnVector, HeapRowColumnVector: the new getters/setters reflect underlying state; legacy public fields stay in sync with the new accessors (backward compatibility); HeapMapColumnVector's constructor pre-allocates offsets / lengths of the right size.
End-to-end correctness of the new readers' read path is exercised by integration tests once they are wired up in the follow-up PR.
Part of #18491.
Impact
No user-visible change for the new readers — they are dead code at HEAD and will be wired up in the next PR.
One intentional bug fix is mixed into the additive surface change in ParquetDataColumnReaderFactory. The new INT64 timestamp dispatch is consumed not just by the new NestedPrimitiveColumnReader but also by the pre-existing BaseVectorizedColumnReader / ArrayColumnReader path. So an INT64-encoded TIMESTAMP column inside an array previously failed at runtime; after this PR it reads correctly. INT96 timestamps and plain (non-timestamp) INT64 columns hit exactly the same code path as before — no behavior change there.
Risk Level
Low.
The two new readers are dead code at HEAD; no production caller instantiates them. The heap-vector changes are strictly additive on the API surface (the only field-level change is removing final from HeapMapColumnVector.keys / values, which has no observable consequence — getMap() still routes through the legacy ColumnarGroupMapData and existing callers of getKeys() / getValues() are unaffected). The factory dispatch change is a strict bug fix (runtime exception → correct decode) scoped to INT64 timestamp columns; INT96 and plain INT64 paths are bytewise unchanged. The previously vendored heap-vector classes are not yet privatized; existing public fields remain so this PR stays strictly additive on the API surface. The cleanup is reserved for a later contract-tightening PR.
mvn test-compile is clean (62 main + 11 test sources, checkstyle clean), the 17 new tests pass, and apache-rat:check is clean (74 / 74 approved).
Documentation Update
N/A
Contributor's checklist
Rollout Plan
The backport into Flink 1.18 will be split into 5 PRs for easier review. After they are all merged, we will backport to higher Flink versions (1.19.x, 1.20.x, 2.0.x and 2.1.x) as well, with 1 PR per version, since it will be easy to diff between hudi-flink-1.18.x and the higher versions.