
feat(flink): write/read (only unshredded) variant to Flink parquet file writers/readers using Flink's Variant type #18539

Open

kbuci wants to merge 5 commits into apache:master from kbuci:flink-write-unshredded

Conversation

@kbuci
Contributor

@kbuci kbuci commented Apr 21, 2026

Describe the issue this Pull Request addresses

Add support for reading and writing unshredded Variant data types through the Flink write/read paths in Hudi. This enables Flink-based ingestion pipelines to both write new Variant data (e.g., via PARSE_JSON() in Flink SQL) and read existing Variant data (including tables written by Spark 4.0 / PR #18036). Flink Variant writes produce the same canonical Parquet layout ({metadata: required binary, value: required binary}) as Spark, ensuring full cross-engine interoperability.

On Flink 2.1+, Variant columns are exposed as native VARIANT LogicalType (enabling SQL functions like PARSE_JSON and correct DESCRIBE TABLE output). Pre-2.1 Flink does not support Variant — all Variant code paths throw UnsupportedOperationException with a clear message indicating Flink 2.1+ is required (similar to how Spark pre-4.0 rejects Variant usage).
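
For illustration, a minimal sketch of the intended Flink 2.1+ usage (the table name, path, and connector options below are hypothetical; only the VARIANT column type and PARSE_JSON come from the behavior described above):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class VariantIngestSketch {
      public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Hypothetical Hudi table with an unshredded Variant column.
        tEnv.executeSql(
            "CREATE TABLE hudi_variant_tbl ("
                + "  id STRING PRIMARY KEY NOT ENFORCED,"
                + "  payload VARIANT"
                + ") WITH ("
                + "  'connector' = 'hudi',"
                + "  'path' = 'file:///tmp/hudi_variant_tbl'"
                + ")");
        // PARSE_JSON produces a native Flink Variant; the writer persists it with the
        // canonical {metadata, value} Parquet layout described above.
        tEnv.executeSql(
            "INSERT INTO hudi_variant_tbl "
                + "SELECT 'k1', PARSE_JSON('{\"a\": 1, \"b\": [1, 2, 3]}')");
      }
    }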

Shredded Variant is explicitly blocked with clear error messages until full support is added in a follow-up.

We explicitly want to block unsupported Variant use cases for Flink:

  • Using a pre-2.1 Flink build
  • Using shredded variant (temporarily until we add support)
  • Nesting variants in vectorized column reader related code paths (temporarily until we add support)

with explicit error messages, to avoid hard-to-debug failures and silently incorrect data.

Summary and Changelog

  • Native Flink 2.1+ VARIANT type via DataTypeAdapter: HoodieSchemaConverter.convertVariant() emits Flink's native VariantType on 2.1+ using the multi-version DataTypeAdapter shim (compiled per-Flink-version in hudi-flink2.1.x). This avoids runtime reflection entirely — DataTypeAdapter.createVariantType() provides the VariantType instance, and companion methods (createVariant(), getVariant(), getVariantMetadata(), getVariantValue()) handle RowData access. On pre-2.1 Flink, throws UnsupportedOperationException.

  • Reverse VARIANT mapping: HoodieSchemaConverter.convertToSchema() handles LogicalTypeRoot.VARIANT (by name string comparison, to avoid compile-time dependency on Flink 2.1) and maps it back to HoodieSchema.createVariant().

  • Factory-based schema injection for Flink Parquet readers: HoodieFileReaderFactory gains a new getFileReader(config, path, schemaOption) overload and a newParquetFileReader(path, schemaOption) extension point (with a backward-compatible default that delegates to the old newParquetFileReader(path)). The Flink factory (HoodieRowDataFileReaderFactory) overrides the new method to pass the Option<HoodieSchema> to the HoodieRowDataParquetReader constructor. This ensures schema flows through at construction time — not via a post-construction setter — so that:

    • The read path (FlinkRowDataReaderContext, ClusteringOperator) supplies the table schema for correct Variant/Blob/Vector type inference.
    • The write/merge path (HoodieMergeHelper, HoodieWriteMergeHandle) supplies the writer schema, fixing upsert failures where the reader previously couldn't derive the correct RowType for Variant columns.
    • The log block read path (HoodieParquetDataBlock) supplies the writer schema from the block header.
    • Metadata-only callers (bloom filter lookups, min/max key reads) pass Option.empty() via the existing 2-arg getFileReader(config, path) and never invoke schema-dependent methods.

    No changes to Spark, Avro, or Trino factories — they inherit the default which delegates to their existing newParquetFileReader(path) override.

  • Fail-fast schema enforcement: HoodieRowDataParquetReader.getSchema() and getRowType() throw IllegalStateException if the schema was not supplied at construction time. This prevents silent mis-inference of Variant/Blob/Vector columns as ordinary BYTES/ROW during record reading. The error message directs callers to use the schema-aware factory overload.

  • Parquet schema conversion (write): ParquetSchemaConverter.convertVariantToParquetType() produces the canonical unshredded Parquet Variant group ({metadata: required binary, value: required binary}); a sketch of this layout follows the changelog list below.

  • Defense-in-depth Variant annotation detection: ParquetSchemaConverter detects the Parquet VARIANT logical type annotation (parquet-java 1.15.2+) via class-name string matching to avoid compile-time dependency. Shredded variants (with value_shredded field) are rejected with clear errors.

  • Vectorized VARIANT read (Flink 2.1.x): ParquetSplitReaderUtil in hudi-flink2.1.x adds case VARIANT: in createColumnReader, createWritableColumnVector, and createVectorFromConstant. The reader produces a HeapRowColumnVector with child vectors ordered [value, metadata] to match Flink's VectorizedColumnBatch.getVariant().

  • Nested Variant support:

    • Supported: Nested Variant inside complex types (e.g., ARRAY<VARIANT>, MAP<STRING, VARIANT>) works through the Avro converter and Parquet schema write paths.
    • Not yet supported: Nested Variant in the vectorized columnar reader (ColumnarGroupRowData.getVariant(), ColumnarGroupArrayData.getVariant()) throws UnsupportedOperationException.
  • RowData ↔ Avro converters: Both RowDataToAvroConverters and AvroToRowDataConverters detect VARIANT LogicalTypeRoot at runtime (by name) and handle native Flink Variant objects via DataTypeAdapter methods. On pre-2.1 Flink the VARIANT case never fires because the schema conversion would have already thrown.

  • Shredded Variant guards: All Flink Variant code paths — HoodieSchemaConverter.convertVariant(), RowDataToAvroConverters.convertVariantToAvro(), and ParquetSchemaConverter.isShreddedVariant() — throw UnsupportedOperationException for shredded Variant schemas until full support is added.

  • Unit tests: TestRowDataToAvroConvertersVariant (shredded rejection, unshredded Avro round-trip, nested variant in arrays/maps), TestHoodieSchemaConverter (shredded rejection, pre-2.1 rejection for standalone/nested/record variants), TestParquetSchemaConverter (annotation-based detection, shredded rejection with annotation, pre-2.1 rejection).
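
As a concrete illustration of the canonical unshredded layout referenced in the Parquet schema conversion bullet above, here is a sketch built directly with the parquet-java Types API (the class and helper names are hypothetical; the actual Hudi converter code differs):

    import org.apache.parquet.schema.GroupType;
    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
    import org.apache.parquet.schema.Type;
    import org.apache.parquet.schema.Types;

    public class VariantParquetLayoutSketch {
      // Canonical unshredded Variant group: metadata first, then value, both required binary.
      static GroupType unshreddedVariant(String columnName, boolean nullable) {
        return Types.buildGroup(nullable ? Type.Repetition.OPTIONAL : Type.Repetition.REQUIRED)
            .required(PrimitiveTypeName.BINARY).named("metadata")
            .required(PrimitiveTypeName.BINARY).named("value")
            .named(columnName);
      }
    }

The OPTIONAL/REQUIRED choice at the group level tracks the column's nullability, while the two child fields are always required.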

Impact

  • No public API changes. Variant support is transparent to users — tables with Variant columns can now be both written and read by Flink 2.1+ pipelines, with full cross-engine interoperability with Spark.
  • Flink 2.1+ users see native VARIANT type in DESCRIBE TABLE output and can use Variant SQL functions.
  • Pre-2.1 Flink users will get UnsupportedOperationException if they attempt to use Variant columns, with a clear message that Flink 2.1+ is required.
  • Shredded Variant operations fail fast with clear error messages rather than producing silently incorrect data.
  • No regression risk for non-Variant tables: the factory schema plumbing is additive (callers that don't pass schema get the same behavior as before via the default delegation). Variant detection requires either a Parquet annotation or an explicit HoodieSchema declaration.
  • The HoodieFileReaderFactory schema overload is a cross-engine addition (in hudi-common) but only the Flink factory overrides it. Spark/Avro/Trino factories are untouched.

Risk Level

Low. Changes are additive and scoped to the Flink Variant code path. Non-Variant schemas are completely unaffected — the new logic only activates when HoodieSchemaType.VARIANT is detected. The DataTypeAdapter shim avoids reflection entirely. The factory newParquetFileReader(path, schemaOption) default delegates to the old method, preserving all existing behavior for non-Flink engines.
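
To make the factory delegation concrete, here is a minimal sketch of the extension-point shape described above, using placeholder type parameters instead of the real Hudi classes (HoodieFileReader, HoodieSchema, StoragePath, Option); the actual signatures in hudi-common differ:

    import java.util.Optional;

    public abstract class FileReaderFactorySketch<READER, PATH, SCHEMA> {

      // New overload: callers that know the table/writer schema pass it through.
      public READER getFileReader(PATH path, Optional<SCHEMA> schemaOption) {
        return newParquetFileReader(path, schemaOption);
      }

      // New extension point with a backward-compatible default: engines that do not
      // override it (Spark/Avro/Trino) keep delegating to the old single-argument method.
      protected READER newParquetFileReader(PATH path, Optional<SCHEMA> schemaOption) {
        return newParquetFileReader(path);
      }

      // Pre-existing per-engine factory method.
      protected abstract READER newParquetFileReader(PATH path);
    }

In this shape, only the Flink factory overrides the two-argument extension point, so it can hand the schema to HoodieRowDataParquetReader at construction time while every other engine keeps its current behavior.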

Documentation Update

None. Variant type support is an internal storage capability that works transparently with existing Flink SQL / DataStream APIs.

Contributor's checklist

  • Read through contributor's guide
  • Enough context is provided in the sections above
  • Adequate tests were added if applicable

@github-actions github-actions Bot added the size:M PR with lines of changes in (100, 300] label Apr 21, 2026
@kbuci kbuci force-pushed the flink-write-unshredded branch from 92cb6af to ad65566 on April 21, 2026 07:28
@github-actions github-actions Bot added size:L PR with lines of changes in (300, 1000] and removed size:M PR with lines of changes in (100, 300] labels Apr 22, 2026
@kbuci kbuci force-pushed the flink-write-unshredded branch 2 times, most recently from 37f47b6 to 01b9b5a on April 28, 2026 00:37
@kbuci kbuci marked this pull request as ready for review April 28, 2026 00:41
@kbuci kbuci changed the title feat(flink) write/read unshredded variant to Flink parquet file writers feat(flink): write/read (only unshredded) variant to Flink parquet file writers Apr 28, 2026
@kbuci kbuci changed the title feat(flink): write/read (only unshredded) variant to Flink parquet file writers feat(flink): write/read (only unshredded) variant to Flink parquet file writers/readers Apr 28, 2026
Contributor

@hudi-agent hudi-agent left a comment

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for working on this! This PR threads HoodieSchema through the Flink Parquet write path so unshredded Variant columns can be detected and written with the canonical Parquet layout, and unblocks getVariant on the read side via BinaryVariant reflection. A few correctness concerns worth double-checking in the inline comments — notably the broad catch (Exception e) around the 4-arg write-support reflection (which can silently fall back and lose Variant schema info) and null handling in the new getVariant implementations. Please take a look at any inline comments, and this should be ready for a Hudi committer or PMC member to take it from here. A couple of small readability nits below.

writeSupport = (HoodieRowDataParquetWriteSupport) ReflectionUtils.loadClass(
writeSupportClass,
new Class<?>[] {Configuration.class, RowType.class, BloomFilter.class},
conf, rowType, filter);

🤖 This catch (Exception e) is quite broad — it falls back to the 3-arg constructor not just when the 4-arg signature is missing, but also if the 4-arg constructor itself throws (e.g. a runtime issue inside RowDataParquetWriteSupport/ParquetSchemaConverter while building the schema for a Variant column). In that case we'd silently downgrade to a 3-arg writer that ignores HoodieSchema, so a Variant column would be emitted as a plain ROW rather than the canonical Variant Parquet group — silent schema corruption. Could you narrow this to NoSuchMethodException (or check getConstructor upfront) so genuine instantiation errors propagate? Same pattern at line 76-90 in newParquetFileWriter(... OutputStream ...).

- AI-generated; verify before applying. React 👍/👎 to flag quality.

public Variant getVariant(int i) {
throw new UnsupportedOperationException("Variant is not supported yet.");
public Variant getVariant(int ordinal) {
RowData variantRow = row.getRow(rebaseOrdinal(ordinal), 2);

🤖 Could a null Variant cell reach this path? row.getRow(rebaseOrdinal(ordinal), 2) will return null when the underlying field is null, and HoodieVariant.fromRowData(null) would then NPE on variantRow.isNullAt(0). Most other accessors here (getString, getDecimal, etc.) guard with isNullAt at the call site, but it might be safer to either return null up front when row.isNullAt(rebaseOrdinal(ordinal)) is true, or null-check inside fromRowData. Same concern in BootstrapRowData.getVariant.

- AI-generated; verify before applying. React 👍/👎 to flag quality.
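
A minimal sketch of the guard being suggested (field and helper names come from the snippet above; how the HoodieVariant maps back to Flink's Variant is assumed and shown as a placeholder call):

    @Override
    public Variant getVariant(int ordinal) {
      int pos = rebaseOrdinal(ordinal);
      if (row.isNullAt(pos)) {
        // Propagate SQL NULL instead of letting fromRowData NPE on a null nested row.
        return null;
      }
      return HoodieVariant.fromRowData(row.getRow(pos, 2)).toFlinkVariant(); // placeholder conversion
    }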

public Variant getVariant(int i) {
throw new UnsupportedOperationException("Variant is not supported yet.");
public Variant getVariant(int pos) {
throw new UnsupportedOperationException(

🤖 Curious about the read path here — the PR description says read support is added, but ColumnarGroupRowData (vectorized read) and ColumnarGroupArrayData still throw on getVariant. Is the columnar/vectorized reader path expected to be exercised for Variant columns, and if so, will callers fall back to getRow(pos, 2) correctly? It might be worth either implementing this consistently or documenting which read path supports Variant in this PR vs. follow-ups.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

* <p>Variant is stored as a struct with two binary fields: metadata and value.
* Field order follows the Parquet spec and Iceberg convention (metadata first, value second).</p>
*/
public static HoodieVariant fromRowData(org.apache.flink.table.data.RowData variantRow) {

🤖 nit: could you add a proper import org.apache.flink.table.data.RowData; and use the simple name here? A fully-qualified class name in a method signature is unusual enough that readers will wonder if there's a deliberate module-boundary reason — if there is, a brief inline comment would help; if not, the import keeps it consistent with the rest of the file.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

* Resolves the HoodieSchema for a field at the given index, unwrapping nullable unions.
* Returns null if the field list is null or the index is out of bounds.
*/
private static HoodieSchema resolveFieldSchema(List<HoodieSchemaField> fields, int index, String fieldName) {

🤖 nit: fieldName is accepted here but never read — the resolution is purely positional via index. Could you either drop the parameter or actually use it (e.g. as a cross-check or name-based fallback)? As written it gives callers the impression that the name is being validated, which could mask field-ordering bugs silently.

- AI-generated; verify before applying. React 👍/👎 to flag quality.


@Override
public Object convert(HoodieSchema schema, Object object) {
if (schema.getType() == HoodieSchemaType.VARIANT) {

why not add a new field converter for variant data type?

this.rowType = rowType;
this.hadoopConf = new Configuration(config);
this.schema = ParquetSchemaConverter.convertToParquetMessageType("flink_schema", rowType);
this.schema = ParquetSchemaConverter.convertToParquetMessageType(

Is it because the Flink RowType cannot represent the variant type with integrity?

Contributor Author

Yeah, I couldn't figure out a way to "annotate"/"tag" a Flink type to indicate that it should be a variant, though I considered RawType/DistinctType.
To be transparent, I'm still new to Flink, so if there's a different/out-of-the-box approach we can use to elegantly have the Flink type "indicate" that it's a variant (without having to pass the Hoodie Avro schema), then I would much rather do that.

Contributor Author

@kbuci kbuci Apr 28, 2026

Actually based on other comments #18539 (comment) let me see if we can avoid having to rely on RowType and Hoodie schema passing (by limiting support to Flink 2.1)

@cshuo
Collaborator

cshuo commented Apr 28, 2026

Thanks for adding this. I had one question about the integration with Flink native VARIANT.

From the current implementation, it looks like Hudi VARIANT is exposed to Flink as ROW<metadata BYTES, value BYTES>, and the write path also seems to expect the input value as a two-field RowData with metadata and value, rather than the flink native Variant data.

I’m wondering how this is expected to work with Flink 2.1+ native VARIANT support. Flink already has native variant semantics and SQL functions/operators such as PARSE_JSON, JSON_STRING(variant), variant['INT'], and variant['STRING']. If the column type is exposed as ROW<metadata BYTES, value BYTES>, would Flink’s query engine still be able to recognize it as a variant and apply those functions/operators?

My understanding is that we probably need to keep these two layers separate:

  • Flink SQL/logical layer: native VARIANT for Flink 2.1+
  • Storage/Parquet layer: canonical variant physical layout with rows containing metadata and value field

And we can just throw unsupported exception for flink versions before 2.1.

Could you clarify whether the current ROW<metadata BYTES, value BYTES> mapping is meant only as the internal physical representation, or whether it is also intended to be the user-facing Flink type?

Contributor

@hudi-agent hudi-agent left a comment

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for working on this! The PR adds unshredded Variant write/read support for Flink by threading HoodieSchema through the Parquet write path and intercepting Variant types in the Avro converters, with shredded Variant explicitly blocked. One edge case worth double-checking around the isVariantGroup heuristic on the read path. Please take a look at the inline comment, and this should be ready for a Hudi committer or PMC member to take it from here. A couple of duplication nits below — same write-support reflection logic appears in two methods, and BinaryVariant reflection is reimplemented twice across the variant codepaths.

if (inner.getType() != HoodieSchemaType.NULL) {
return inner.getType();
}
}

🤖 Could this misidentify a regular user struct as a Variant? Any group with two BINARY fields literally named metadata and value will match here, and the read-side then forces both fields to notNull() and drops any other field metadata. Until the LogicalTypeAnnotation.variantType() annotation is emitted (the TODO above), would it be worth at least also requiring both fields to be REQUIRED (matching what convertVariantToParquetType writes) and the group to have exactly 2 fields? Otherwise an existing parquet file with optional binary metadata + optional binary value could be silently re-interpreted as a Variant ROW with non-null fields, which would mismatch the actual nullability of the data.

- AI-generated; verify before applying. React 👍/👎 to flag quality.
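
A sketch of the stricter structural check being suggested, built on the parquet-java GroupType API (the helper name is hypothetical; later revisions of this PR dropped structural detection entirely in favor of the annotation/schema-hint approach noted in subsequent review rounds):

    import org.apache.parquet.schema.GroupType;
    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
    import org.apache.parquet.schema.Type;

    public final class VariantStructureCheckSketch {
      // Only treat a group as an unshredded Variant when it has exactly the two
      // required BINARY fields the writer emits: metadata and value.
      static boolean isVariantByStructure(GroupType group) {
        if (group.getFieldCount() != 2
            || !group.containsField("metadata")
            || !group.containsField("value")) {
          return false;
        }
        for (String name : new String[] {"metadata", "value"}) {
          Type field = group.getType(name);
          if (!field.isPrimitive()
              || field.asPrimitiveType().getPrimitiveTypeName() != PrimitiveTypeName.BINARY
              || !field.isRepetition(Type.Repetition.REQUIRED)) {
            return false;
          }
        }
        return true;
      }
    }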

String writeSupportClass = config.getStringOrDefault(HoodieStorageConfig.HOODIE_PARQUET_FLINK_ROW_DATA_WRITE_SUPPORT_CLASS);
HoodieRowDataParquetWriteSupport writeSupport;
if (ReflectionUtils.hasConstructor(writeSupportClass,
new Class<?>[] {Configuration.class, RowType.class, BloomFilter.class, Option.class})) {

🤖 nit: this 4-arg-with-fallback-to-3-arg writeSupport construction block is duplicated almost verbatim in the other newParquetFileWriter overload below (~line 156). Could you extract a small private helper like loadWriteSupport(conf, rowType, filter, hoodieSchema, configClass) so both call sites share one implementation? Easier to maintain when the constructor signature evolves.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

@kbuci
Contributor Author

kbuci commented Apr 28, 2026

@cshuo

My understanding is that we probably need to keep these two layers separate:

  • Flink SQL/logical layer: native VARIANT for Flink 2.1+
  • Storage/Parquet layer: canonical variant physical layout with rows containing metadata and value field

And we can just throw unsupported exception for flink versions before 2.1.

Could you clarify whether the current ROW<metadata BYTES, value BYTES> mapping is meant only as the internal physical representation, or whether it is also intended to be the user-facing Flink type?

Oh good point. Currently in this PR it is also user-facing, but I think you're right and we shouldn't do it this way. When I made this PR I was trying to keep the existing Flink logic of treating variant as Flink ROW<metadata BYTES, value BYTES> as a "fallback". But taking a step back, I guess this doesn't give us much benefit in the long term; we should go with your suggested approach of marking this as unsupported for pre-2.1 Flink (instead of trying to make it "fall back" to a ROW<metadata BYTES, value BYTES> for older Flink versions). Let me update the PR to try this approach.

@kbuci kbuci marked this pull request as draft April 28, 2026 07:55
@github-actions github-actions Bot added the size:XL (PR with lines of changes > 1000) and size:L (PR with lines of changes in (300, 1000]) labels and removed the size:L and size:XL labels Apr 28, 2026
@kbuci kbuci force-pushed the flink-write-unshredded branch from 8cb53b8 to 49c1833 on April 29, 2026 04:40
@kbuci kbuci changed the title feat(flink): write/read (only unshredded) variant to Flink parquet file writers/readers feat(flink): write/read (only unshredded) variant to Flink parquet file writers/readers using Flink's Variant type Apr 30, 2026
@kbuci kbuci force-pushed the flink-write-unshredded branch from 1e72dc2 to 133edc7 on April 30, 2026 03:09
@kbuci
Contributor Author

kbuci commented May 1, 2026

My understanding is that we probably need to keep these two layers separate:

  • Flink SQL/logical layer: native VARIANT for Flink 2.1+
  • Storage/Parquet layer: canonical variant physical layout with rows containing metadata and value field

And we can just throw unsupported exception for flink versions before 2.1.
Could you clarify whether the current ROW<metadata BYTES, value BYTES> mapping is meant only as the internal physical representation, or whether it is also intended to be the user-facing Flink type?

Oh good point. Currently in this PR it is also user-facing, but I think you're right and we shouldn't do it this way. When I made this PR I was trying to keep the existing Flink logic of treating variant as Flink ROW<metadata BYTES, value BYTES> as a "fallback". But taking a step back, I guess this doesn't give us much benefit in the long term; we should go with your suggested approach of marking this as unsupported for pre-2.1 Flink (instead of trying to make it "fall back" to a ROW<metadata BYTES, value BYTES> for older Flink versions). Let me update the PR to try this approach.

@voonhous What are your thoughts on this #18539 (comment) (not allowing HUDI readers/writers with Flink versions older than 2.1 to read/write variant)? I personally agree and think that, instead of trying to fall back to ROW<metadata BYTES, value BYTES> (as the code currently seems to do), we should just explicitly support it only for Flink 2.1+. That simplifies the logic and prevents us from having to worry about certain edge cases (like a pre-2.1 Flink writer rewriting a variant in a record to a Flink Row type). But I wanted to see if you had any concerns about only supporting Flink 2.1+ for variant in HUDI.

@kbuci kbuci marked this pull request as ready for review May 2, 2026 05:46
@kbuci kbuci marked this pull request as draft May 2, 2026 05:51
Contributor

@hudi-agent hudi-agent left a comment

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for working on this! The PR adds Flink read/write support for unshredded Variant via reflection-based detection of Flink 2.1+'s native VariantType. One nullability handling issue worth double-checking in the inline comments — please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. A couple of small naming and simplification suggestions below.

"VARIANT type is only supported in Flink 2.1+. "
+ "VariantType class not found on the classpath.");
}
dataType = variantDataType.notNull();

🤖 I think this .notNull() makes the variant case inconsistent with the other group branches (ARRAY/MAP/ROW above) which produce a nullable dataType and rely on the trailing if (REQUIRED) dataType.notNull() at line 192. Since that trailing check only adds notNull and never makes a type nullable, an OPTIONAL variant group ends up as a non-nullable VariantType in the resulting RowField. Could you drop the .notNull() here so OPTIONAL → nullable / REQUIRED → notNull works the same way as ROW?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

// to match the BinaryVariant(value, metadata) constructor.
GroupType variantGroup = physicalType.asGroupType();
int valueIdx = variantGroup.getFieldIndex(HoodieSchema.Variant.VARIANT_VALUE_FIELD);
int metadataIdx = variantGroup.getFieldIndex(HoodieSchema.Variant.VARIANT_METADATA_FIELD);

🤖 nit: variantDescs only holds descriptors for the value field specifically — could you rename it to valueDescs (and variantDescs.get(0) becomes valueDescs.get(0) on the next line) to match what it actually contains? As-is it reads like it covers the full variant group, which could trip up a future reader.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

* fields named {@code metadata} and {@code value}.
*/
public static boolean isVariantGroup(GroupType groupType, LogicalTypeAnnotation logicalType) {
if (hasVariantAnnotation(logicalType)) {

🤖 nit: the if/return true / return false pattern here could be collapsed to return hasVariantAnnotation(logicalType) || isVariantByStructure(groupType); — saves a few lines and reads more naturally for a boolean predicate method.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

@kbuci kbuci marked this pull request as ready for review May 4, 2026 07:36
* Looks up the HoodieSchema for a named field within a record schema.
* Returns null if the record schema is null or does not contain the field.
*/
static HoodieSchema resolveFieldSchema(HoodieSchema recordSchema, String fieldName) {
Contributor Author

@kbuci kbuci May 4, 2026

Ideally I didn't want to make such an expansive change in this PR, but I wanted to make sure that Flink would correctly infer the Parquet variant group written by Spark 4.0 (when Parquet variant annotations aren't available), without always assuming that a Parquet group with metadata/value fields is a variant.

Contributor

@hudi-agent hudi-agent left a comment

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for working on this! This adds Flink read/write support for unshredded Variant data with reflection-based handling for the Flink 1.20 vs 2.1+ split, schema-driven variant detection for Spark 4.0 compatibility, and clear error messages for unsupported configurations (shredded, nested vectorized, pre-2.1 Flink). After tracing through the variant detection paths (annotation + schema-driven), the vectorized read child ordering, the null-handling wrapper around the AvroToRowData/RowDataToAvro converters, and the TableSchemaResolver propagation in ClusteringOperator and FlinkRowDataReaderContext, no new issues were flagged from this automated pass beyond what prior rounds have already raised — a Hudi committer or PMC member can take it from here for a final review. A few readability suggestions around an unused parameter and reflection caching consistency.

cc @yihua

GroupType groupType, LogicalTypeAnnotation logicalType, HoodieSchemaType schemaHint) {
if (hasVariantAnnotation(logicalType)) {
return true;
}

🤖 nit: groupType is unused in isVariantGroup — only logicalType and schemaHint are read. Could you either drop the parameter or wire it into a structural check, since right now its presence implies it's part of the detection logic?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

* or {@code null} if the class is not on the classpath (pre-2.1 Flink).
* The reflection result is cached so the class lookup happens at most once per JVM.
* Only called during schema conversion (cold path), never per row.
*/

🤖 nit: could you use the lazy-holder idiom here (like GetVariantHolder in AbstractHoodieRowData)? That avoids the synchronized on every call and keeps the two reflection-cache patterns in this PR consistent.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

if (recordSchema == null || recordSchema.getType() != HoodieSchemaType.RECORD) {
return null;
}
org.apache.hudi.common.util.Option<HoodieSchemaField> field = recordSchema.getField(fieldName);

🤖 nit: could you import org.apache.hudi.common.util.Option instead of fully-qualifying it inline? It would match the import style used elsewhere in this file.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

Contributor

@hudi-agent hudi-agent left a comment

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for the contribution! This PR adds native Flink 2.1+ Variant support to the parquet write/read paths via reflection, while keeping the shared module compiled against Flink 1.20. The schema-driven variant detection (annotation OR HoodieSchema hint, with no structural fallback) is a nice safe design that avoids misidentifying regular ROW<metadata,value> columns. One small thread-safety question worth a look in the inline comment. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. A couple of consistency/dedup suggestions around the reflection-based VARIANT handling, otherwise the code reads cleanly.

if (metadataMethod == null) {
metadataMethod = object.getClass().getMethod("getMetadata");
valueMethod = object.getClass().getMethod("getValue");
}

🤖 The lazy init of metadataMethod / valueMethod here isn't thread-safe — if two threads race in convert(), one could see metadataMethod != null after the first assignment but before valueMethod is set, and then NPE on valueMethod.invoke(...). In normal Flink (single-threaded operator task) this can't fire, but since the sibling AvroToRowDataConverters.createVariantConverter() already resolves its Constructor upfront against a known class, would it make sense to do the same here against BinaryVariant.class? That would also remove the per-call getMethod lookup on the first invocation.

- AI-generated; verify before applying. React 👍/👎 to flag quality.
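
A sketch of the eager, holder-style resolution being suggested (the BinaryVariant package name is an assumption; the getMetadata/getValue accessors and their byte[] results come from the snippets quoted in this thread):

    import java.lang.reflect.Method;

    final class BinaryVariantAccessSketch {
      // Lazy-holder idiom: reflection runs once, on first use, and is safely published.
      private static final class Holder {
        static final Method GET_METADATA;
        static final Method GET_VALUE;

        static {
          try {
            // Class and package name assumed for Flink 2.1's binary Variant implementation.
            Class<?> clazz = Class.forName("org.apache.flink.types.variant.BinaryVariant");
            GET_METADATA = clazz.getMethod("getMetadata");
            GET_VALUE = clazz.getMethod("getValue");
          } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Flink 2.1+ Variant classes not on the classpath", e);
          }
        }
      }

      static byte[] metadataOf(Object variant) throws ReflectiveOperationException {
        return (byte[]) Holder.GET_METADATA.invoke(variant);
      }

      static byte[] valueOf(Object variant) throws ReflectiveOperationException {
        return (byte[]) Holder.GET_VALUE.invoke(variant);
      }
    }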


case RAW:
default:
if ("VARIANT".equals(logicalType.getTypeRoot().name())) {

🤖 nit: the "VARIANT".equals(logicalType.getTypeRoot().name()) check is duplicated across HoodieSchemaConverter, ParquetSchemaConverter, AvroToRowDataConverters, and RowDataToAvroConverters. Could you extract a small helper (e.g. HoodieSchemaConverter.isVariantLogicalType(LogicalType)) so the intent is obvious in one place and future tweaks (e.g. once a hard dependency on Flink 2.1+ lands) only touch one site?

- AI-generated; verify before applying. React 👍/👎 to flag quality.
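
The suggested helper could be as small as the following sketch (the method's home and name are the reviewer's suggestion, not the final code):

    import org.apache.flink.table.types.logical.LogicalType;

    final class VariantTypeCheckSketch {
      // Name-based check keeps the shared module compiling against pre-2.1 Flink,
      // where LogicalTypeRoot.VARIANT does not exist.
      static boolean isVariantLogicalType(LogicalType type) {
        return "VARIANT".equals(type.getTypeRoot().name());
      }
    }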

* already rejects shredded variants before a Flink type or converter is ever constructed,
* and Flink 2.1 itself only supports unshredded variants (FLIP-521).
*/
private static RowDataToAvroConverter createVariantConverter() {

🤖 nit: for consistency with AvroToRowDataConverters.createVariantConverter() (which resolves the BinaryVariant constructor once at converter-creation time), could we resolve getMetadata/getValue Methods eagerly here too? The current per-instance lazy resolution adds a null check on every call and diverges from the sibling helper without an obvious reason.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

Contributor

@hudi-agent hudi-agent left a comment

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for working on this! This PR adds Flink read/write support for unshredded Variant data, with native VARIANT type on Flink 2.1+ via reflection (and clear UnsupportedOperationException on pre-2.1). The variant detection (annotation + schema-hint) and reflection patterns look reasonable to me, and the withTableSchema() plumbing has been applied at all current HoodieRowDataParquetReader call sites. Prior review rounds have already flagged the key concerns I identified. No new issues flagged from this automated pass — a Hudi committer or PMC member can take it from here for a final review. One minor consistency nit below; otherwise the code is clean and well-documented.

cc @yihua

byte[] value = (byte[]) valueMethod.invoke(object);

final GenericRecord record = new GenericData.Record(schema.toAvroSchema());
record.put("metadata", ByteBuffer.wrap(metadata));

🤖 nit: could you use HoodieSchema.Variant.VARIANT_METADATA_FIELD and VARIANT_VALUE_FIELD here (and on the next line) instead of the raw "metadata"/"value" strings? ParquetSchemaConverter.convertVariantToParquetType already references those constants, so using them here keeps the field names consistent across the codebase.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

@kbuci kbuci requested a review from danny0405 May 7, 2026 22:57
@danny0405
Contributor

hi @kbuci, I have made a PR to add an adapter class for variant type access: #18702, so that we can get rid of those Java reflections. It mainly covers the schema converters; I actually copied some logic from your PR, hope it helps.
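
For context, a rough sketch of what such a version-specific adapter surface might look like (method names follow the DataTypeAdapter description in the PR summary; the signatures and the use of Object for the Variant value are assumptions):

    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.types.logical.LogicalType;

    // Compiled per Flink version (e.g. in hudi-flink2.1.x) so the shared module never
    // references Flink 2.1-only classes directly; a pre-2.1 implementation would throw
    // UnsupportedOperationException from each method instead.
    public interface FlinkVariantAdapterSketch {
      LogicalType createVariantType();                     // Flink 2.1+: the native VariantType
      Object createVariant(byte[] metadata, byte[] value); // wrap raw bytes into Flink's Variant
      Object getVariant(RowData row, int pos);             // read a Variant field from RowData
      byte[] getVariantMetadata(Object variant);
      byte[] getVariantValue(Object variant);
    }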

@kbuci
Contributor Author

kbuci commented May 8, 2026

hi @kbuci, I have made a PR to add an adapter class for variant type access: #18702, so that we can get rid of those Java reflections. It mainly covers the schema converters; I actually copied some logic from your PR, hope it helps.

@danny0405 Thanks for sharing! I guess it would then be better if I wait for that to land and rebase this PR off of it, right?

@cshuo @danny0405 After taking a step back, I think we should first get consensus on how we want Flink in HUDI to always infer the correct Flink type to use for Variant, Vector, and Blob. Can we discuss on #18711? I can then work on and land initial PR(s) that do the needed "wiring", so that when we start tackling Blob and Vector we can use a similar approach as Variant. Although with Variant we are a bit "lucky" in the sense that Parquet has an official variant annotation, it would be good to have a common solution and agree on what kind of "backwards compatibility" we want (like HUDI 1.3 Flink being able to read data written by Spark 4.0 HUDI 1.2 builds).

@kbuci kbuci force-pushed the flink-write-unshredded branch from 04028a1 to 5a83603 on May 11, 2026 17:48
Krishen Bhan and others added 3 commits May 11, 2026 10:57
…ers/readers using Flink's Variant type

Co-authored-by: Cursor <cursoragent@cursor.com>
@kbuci kbuci force-pushed the flink-write-unshredded branch from 5a83603 to 56adff0 on May 11, 2026 18:39
@hudi-bot
Collaborator

CI report:

Bot commands supported by @hudi-bot:
  • @hudi-bot run azure: re-run the last Azure build

@codecov-commenter

Codecov Report

❌ Patch coverage is 37.93103% with 18 lines in your changes missing coverage. Please review.
✅ Project coverage is 58.25%. Comparing base (e72b19b) to head (af3cac9).

Files with missing lines                               | Patch % | Lines
.../hudi/table/format/HoodieRowDataParquetReader.java | 0.00%   | 9 Missing ⚠️
...pache/hudi/sink/clustering/ClusteringOperator.java | 0.00%   | 5 Missing ⚠️
...i/table/format/HoodieRowDataFileReaderFactory.java | 0.00%   | 2 Missing ⚠️
...he/hudi/table/action/commit/HoodieMergeHelper.java | 50.00%  | 1 Missing ⚠️
...e/hudi/table/format/FlinkRowDataReaderContext.java | 0.00%   | 1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18539      +/-   ##
============================================
- Coverage     68.07%   58.25%   -9.82%     
+ Complexity    29108    25250    -3858     
============================================
  Files          2528     2528              
  Lines        141510   141517       +7     
  Branches      17552    17549       -3     
============================================
- Hits          96329    82441   -13888     
- Misses        37255    51987   +14732     
+ Partials       7926     7089     -837     
Flag                       | Coverage Δ
common-and-other-modules   | 23.58% <13.79%> (-20.82%) ⬇️
hadoop-mr-java-client      | 44.99% <66.66%> (-0.03%) ⬇️
spark-client-hadoop-common | 48.34% <66.66%> (-0.01%) ⬇️
spark-java-tests           | 49.01% <75.00%> (+0.01%) ⬆️
spark-scala-tests          | 44.90% <50.00%> (+<0.01%) ⬆️
utilities                  | 37.62% <50.00%> (-0.01%) ⬇️

Flags with carried forward coverage won't be shown.

Files with missing lines                               | Coverage Δ
...ava/org/apache/hudi/io/HoodieWriteMergeHandle.java  | 81.06% <100.00%> (ø)
...io/storage/row/parquet/ParquetSchemaConverter.java  | 68.57% <ø> (-2.86%) ⬇️
...va/org/apache/hudi/util/HoodieSchemaConverter.java  | 64.96% <ø> (-1.58%) ⬇️
...common/table/log/block/HoodieParquetDataBlock.java  | 93.47% <100.00%> (ø)
...pache/hudi/io/storage/HoodieFileReaderFactory.java  | 60.52% <100.00%> (+2.19%) ⬆️
...he/hudi/table/action/commit/HoodieMergeHelper.java  | 62.92% <50.00%> (ø)
...e/hudi/table/format/FlinkRowDataReaderContext.java  | 0.00% <0.00%> (-82.20%) ⬇️
...i/table/format/HoodieRowDataFileReaderFactory.java  | 0.00% <0.00%> (-100.00%) ⬇️
...pache/hudi/sink/clustering/ClusteringOperator.java  | 0.00% <0.00%> (-48.44%) ⬇️
.../hudi/table/format/HoodieRowDataParquetReader.java  | 0.00% <0.00%> (-62.97%) ⬇️

... and 469 files with indirect coverage changes


@kbuci
Contributor Author

kbuci commented May 12, 2026

@danny0405 @cshuo Based on our discussion in #18711 I updated this PR
Note that I had to make some "Schema passing" changes just for the merge handle related logic


Labels

size:L PR with lines of changes in (300, 1000]

6 participants