feat: add variant type adapter for Flink #18702
Conversation
hudi-agent
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This PR introduces a DataTypeAdapter indirection for Flink's Variant type, wires it through the Avro/RowData/Parquet/Schema converters, and provides a real implementation for Flink 2.1.x with stub-throwing variants for older Flink versions. The shredded-variant rejection path looks consistent across HoodieSchemaConverter and ParquetSchemaConverter, and the Avro field order assumption (metadata at index 0, value at index 1) matches HoodieSchema.Variant's contract.

No correctness issues found. A few style/readability suggestions in the inline comments: a couple of spots use raw positional indices or string literals where the named HoodieSchema.Variant field constants (already used in ParquetSchemaConverter in this same PR) would make the intent clearer. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here.
cc @yihua
```java
 * GenericRecord carrying metadata/value binary fields and produces a Flink
 * {@code BinaryVariant}.
 */
private static AvroToRowDataConverter createVariantConverter() {
```
🤖 nit: record.get(0) and record.get(1) are opaque — could you cast to GenericRecord and use HoodieSchema.Variant.VARIANT_METADATA_FIELD / VARIANT_VALUE_FIELD for named access? The same constants are already used in ParquetSchemaConverter in this PR.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
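For the named-access suggestion above, a minimal sketch of the difference between positional and named field access. A plain Map stands in for Avro's GenericRecord here, and the constant values "metadata" and "value" are assumptions about what HoodieSchema.Variant defines:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class VariantFieldAccess {
    // Assumed values; the real constants live in HoodieSchema.Variant.
    static final String VARIANT_METADATA_FIELD = "metadata";
    static final String VARIANT_VALUE_FIELD = "value";

    // Name-based access instead of the opaque record.get(0) / record.get(1);
    // a Map is used here as a stand-in for Avro's GenericRecord.
    static byte[] getMetadata(Map<String, Object> record) {
        return (byte[]) record.get(VARIANT_METADATA_FIELD);
    }

    static byte[] getValue(Map<String, Object> record) {
        return (byte[]) record.get(VARIANT_VALUE_FIELD);
    }

    public static void main(String[] args) {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put(VARIANT_METADATA_FIELD, new byte[]{1});
        record.put(VARIANT_VALUE_FIELD, new byte[]{2});
        System.out.println(getMetadata(record)[0] + "," + getValue(record)[0]);
    }
}
```

With named constants, a reordering of the Avro fields would surface as a lookup miss rather than silently swapping metadata and value.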
```java
return new RowDataToAvroConverter() {
  private static final long serialVersionUID = 1L;

  @Override
```
🤖 nit: the string literals "metadata" and "value" here could use HoodieSchema.Variant.VARIANT_METADATA_FIELD / VARIANT_VALUE_FIELD to stay consistent with how ParquetSchemaConverter references the same fields in this PR.
```java
try {
  return DataTypeAdapter.createVariant(value, metadata);
} catch (Exception e) {
  throw new RuntimeException("Failed to create Flink BinaryVariant via reflection.", e);
```
This is a direct constructor call, not reflection.
```java
 * Uses class-name matching so this compiles against parquet-java versions that predate the
 * {@code VariantLogicalTypeAnnotation} class (< 1.15.2).
 */
private static boolean hasVariantAnnotation(LogicalTypeAnnotation logicalType) {
```
Since the writer does not attach the variant logical annotation (TODO at line 227), won't a Flink-written variant column read back as a plain row, so HoodieSchemaConverter.convertToSchema() loses the VARIANT type?
Yeah, based on my understanding: to make sure a Flink-written variant field (or any variant field written without the Parquet variant annotation, which is all current Hudi writes, since the code has a TODO to add it, as you pointed out) is re-read as a variant in Flink, we would need a follow-up PR like #18539 that adds HoodieSchema-based detection, or we would need to actually attach the annotation at write time.
Maybe we can add a TODO comment here, just so that readers know the current state of Flink variant integration (in case for some reason we don't fully "fix" this by 1.3)?
Yeah, let's add a TODO to attach the variant annotation on the writer side first.
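The class-name matching discussed in the excerpt above can be sketched like this. The check and the dummy annotation class are illustrative stand-ins, not the actual Hudi or parquet-java code:

```java
public class VariantAnnotationCheck {
    // Matching on the simple class name avoids a compile-time dependency on
    // VariantLogicalTypeAnnotation (assumed to exist only in parquet-java >= 1.15.2).
    static boolean hasVariantAnnotation(Object logicalType) {
        return logicalType != null
            && logicalType.getClass().getSimpleName().equals("VariantLogicalTypeAnnotation");
    }

    // Dummy stand-in used only to exercise the check; the real class lives in parquet-java.
    static class VariantLogicalTypeAnnotation {}

    public static void main(String[] args) {
        System.out.println(hasVariantAnnotation(new VariantLogicalTypeAnnotation())); // true
        System.out.println(hasVariantAnnotation("not an annotation"));                // false
    }
}
```

The trade-off is that a string comparison is invisible to the compiler and to refactoring tools, which is why it is reserved for classes that may not exist on the classpath at build time.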
```java
DataTypes.FIELD("metadata", DataTypes.BYTES().notNull()),
DataTypes.FIELD("value", DataTypes.BYTES().notNull())
).notNull();
if (schema instanceof HoodieSchema.Variant
```
This check can be removed, since line 569 already ensures the type is VARIANT.
Codecov Report

❌ Patch coverage is … Additional details and impacted files:

```
@@             Coverage Diff              @@
##             master   #18702      +/-   ##
============================================
- Coverage     68.14%   68.13%    -0.02%
+ Complexity    29077    29074        -3
============================================
  Files          2522     2522
  Lines        141177   141207       +30
  Branches      17514    17522        +8
============================================
- Hits          96208    96205        -3
- Misses        37061    37092       +31
- Partials       7908     7910        +2
```

Flags with carried forward coverage won't be shown.
Describe the issue this Pull Request addresses

Add a new utils class DataTypeAdapter that can work as an adapter for variant type related APIs.

Summary and Changelog

- Add DataTypeAdapter;

Impact

None.

Risk Level

Low.

Documentation Update

Might need to update the site for variant data type support for Flink.
Contributor's checklist