fix(vector): Preserve VECTOR/BLOB metadata on SQL INSERT path #18540

Merged

yihua merged 2 commits into apache:master from voonhous:fix-vector-insert-into on Apr 27, 2026
Conversation

@voonhous (Member) commented Apr 21, 2026

Describe the issue this Pull Request addresses

Closes: #18537
Closes: #18547
Closes: #18562

VECTOR and BLOB columns do not survive Spark SQL write-path rewrites on INSERT, UPDATE, and MERGE (Spark 3.3 through 4.0):

  • TableOutputResolver's Cast(...) and UPDATE's castIfNeeded strip the StructField metadata that marks custom Hudi types (see the sketch after this list). Downstream conversion then yields the backing physical type (plain ARRAY for VECTOR, plain STRUCT for BLOB) and the schema-compat check fails with MISSING_UNION_BRANCH.
  • Resolved query schemas mark columns nullable (VALUES literals, Cast outputs), diverging from catalog nullability (primaryKey columns are non-null). The DataFrame's Avro writer schema then shows writer=["null","long"] vs reader="long" and the compat check fails with TYPE_MISMATCH.
  • BLOB specifically: RFC-100 declares type, reference.external_path, and reference.managed as strictly non-null, but Spark's type system cannot express "required when parent is present". The strict declarations collide with Cast (strips metadata), TableOutputResolver (rejects nested nullable-source to non-null-target on Spark 3.4 at analysis time), and Cast.canCast on later versions.
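To make the stripping concrete, here is a small, self-contained Spark sketch (plain Spark StructField types, not Hudi code; the field name emb and the marker value "vector" are illustrative assumptions, while the hudi_type metadata key is the marker named above). It shows how the catalog field carries the custom-type marker while a Cast-rewritten query column no longer does:

import org.apache.spark.sql.types._

object MetadataStripDemo extends App {
  val hudiTypeMeta = new MetadataBuilder().putString("hudi_type", "vector").build()

  // Catalog-side field: VECTOR backed by a non-null array of floats, marked via metadata.
  val catalogField = StructField("emb", ArrayType(FloatType, containsNull = false),
    nullable = false, metadata = hudiTypeMeta)

  // What the resolved query column typically looks like after a Cast(...) rewrite:
  // same physical type, but empty metadata and relaxed nullability.
  val afterCast = StructField("emb", ArrayType(FloatType, containsNull = true),
    nullable = true, metadata = Metadata.empty)

  assert(catalogField.metadata.contains("hudi_type"))
  assert(!afterCast.metadata.contains("hudi_type")) // marker gone -> plain ARRAY downstream
}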

Summary and Changelog

Summary: VECTOR and BLOB columns now persist correctly and accept SQL INSERT, UPDATE, and MERGE on Spark 3.3 through 4.0. On-disk Avro remains RFC-100 compliant.

Changes:

  • Add HoodieSchemaConversionUtils.alignSchemaWithCatalog. Runs before the writer receives the DataFrame. Re-attaches custom-type metadata (hudi_type, for VECTOR and BLOB) that Spark's Cast rewrites strip, and optionally narrows source nullability (and nested containsNull / valueContainsNull) to match the catalog. A simplified sketch follows this list.
  • Gate nullability narrowing with alignNullability: Boolean. INSERT and UPDATE pass true (upstream TableOutputResolver / castIfNeeded have already asserted non-null). MERGE passes false because it hands raw pre-assignment source rows to this function (MERGE assignments run at write time inside ExpressionPayload), so narrowing there could relabel legitimately-null rows as non-null.
  • Project BLOB as nullable-everywhere at the Spark type layer in HoodieSparkSchemaConverters.toSqlType. The RFC-100 non-null invariants are enforced on the Avro write boundary through HoodieSchema.Blob#createBlob, independent of the Spark-side nullability.
  • Replace validateBlobStructure's exact-equality check with a purely structural recursive matchesStructure (names, types, position). Case-sensitivity-aware via SQLConf.caseSensitiveAnalysis.
  • Document on validateVariantStructure why VARIANT does not need the BLOB-style projection: Spark 3.x rejects VARIANT at resolution, and Spark 4.0+ exposes it as native VariantType populated via parse_json(...), not user-supplied named_struct. The non-null internal layout only appears via toSqlType, which produces the canonical shape the validator expects.
  • Wire alignNullability at all six call sites: InsertIntoHoodieTableCommand, UpdateHoodieTableCommand, and MergeIntoHoodieTableCommand under both hudi-spark3-common and hudi-spark4-common.
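A minimal, top-level-only sketch of what such an alignment pass can look like, assuming simplified signatures (the actual HoodieSchemaConversionUtils.alignSchemaWithCatalog also recurses into nested struct/array/map shapes and merges metadata rather than replacing it wholesale):

import java.util.Locale
import org.apache.spark.sql.types._

object AlignSketch extends App {
  // Top-level-only sketch: re-attach catalog metadata and optionally narrow nullability.
  def alignSchemaWithCatalog(source: StructType,
                             catalog: StructType,
                             caseSensitive: Boolean,
                             alignNullability: Boolean): StructType = {
    val lookupKey: String => String =
      if (caseSensitive) identity else (_: String).toLowerCase(Locale.ROOT)
    val catalogByName = catalog.fields.map(f => lookupKey(f.name) -> f).toMap
    StructType(source.fields.map { src =>
      catalogByName.get(lookupKey(src.name)) match {
        case Some(cat) =>
          // Keep the source's physical data type; restore the catalog's metadata
          // (e.g. the hudi_type marker) and, when requested, the catalog's nullability.
          val nullable = if (alignNullability) cat.nullable else src.nullable
          src.copy(nullable = nullable, metadata = cat.metadata)
        case None => src // columns absent from the catalog pass through untouched
      }
    })
  }

  // Example: a non-null catalog column vs. a nullable resolved query column.
  val catalogSchema = StructType(Seq(StructField("id", LongType, nullable = false)))
  val querySchema   = StructType(Seq(StructField("id", LongType, nullable = true)))
  assert(!alignSchemaWithCatalog(querySchema, catalogSchema, caseSensitive = false, alignNullability = true)
    .fields.head.nullable)  // INSERT/UPDATE path: narrowed to the catalog's non-null
  assert(alignSchemaWithCatalog(querySchema, catalogSchema, caseSensitive = false, alignNullability = false)
    .fields.head.nullable)  // MERGE path: source nullability preserved
}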

Tests:

  • Unit tests in TestHoodieSchemaConversionUtils:
    • alignSchemaWithCatalog narrows nullability end-to-end (nested struct, array-of-struct, map-of-struct) when alignNullability = true.
    • alignSchemaWithCatalog preserves source nullability at every nesting level when alignNullability = false, pinning the MERGE-path guard.
    • Custom-type metadata reattachment works regardless of the flag.
    • BLOB validator accepts mixed-case field names under case-insensitive analysis.
    • BLOB validator rejects wrong positional ordering even when field names match.
  • Integration tests cover BLOB INSERT in TestCreateTable, BLOB UPDATE in TestUpdateTable, and BLOB MERGE in TestMergeIntoTable, all with RFC-100-compliant sources.

No code copied from external sources.

Impact

  • No public API change. No config change.
  • User-facing: SQL INSERT / UPDATE / MERGE now work against tables with VECTOR or BLOB columns.
  • Performance: negligible. One extra StructType traversal per write command (small, bounded by catalog schema size).

Risk Level

Medium.

Scope is broad: the schema-alignment pass runs on every SQL INSERT / UPDATE / MERGE write, not just VECTOR / BLOB tables. BLOB's Spark catalog shape is flipped from strict non-null to nullable-everywhere (Avro side unchanged).

Mitigation:

  • Unit tests pin the alignNullability flag behavior in all nesting combinations, and pin the BLOB validator's case-insensitive matching and positional-ordering requirement.
  • Integration tests cover BLOB INSERT / UPDATE / MERGE end-to-end on all supported Spark versions.
  • Existing VECTOR tests continue to exercise the metadata reattachment path.
  • MERGE path explicitly passes alignNullability = false to avoid relabeling rows that may legitimately carry nulls.
  • RFC-100 on-disk contract is preserved: HoodieSchema.Blob#createBlob remains the canonical Avro source.

Documentation Update

None.

Contributor's checklist

  • Read through contributor's guide
  • Enough context is provided in the sections above
  • Adequate tests were added if applicable

@github-actions github-actions Bot added the size:S PR with lines of changes in (10, 100] label Apr 21, 2026
@hudi-agent (Contributor) left a comment


🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for the contribution! This PR restores hudi_type metadata (VECTOR/BLOB) that Spark's TableOutputResolver strips via Cast, so SQL INSERT INTO no longer fails the schema-compat check for custom logical types. One question worth considering inline around nested-field coverage. Please take a look at any inline comments, and this should be ready for a Hudi committer or PMC member to take it from here. One naming suggestion on the new utility method — everything else looks clean.

@voonhous voonhous requested review from rahil-c and yihua April 21, 2026 14:30
@rahil-c (Collaborator) commented Apr 21, 2026

(Screenshot attached.) I was reviewing this with Claude, and it raised the point that the same bug you found in the INSERT INTO path might also exist for the `UPDATE`, `DELETE`, and `MERGE` commands. Wondering if you can take a look and see whether we should batch them into this PR, since I think the changes would be small.

@voonhous (Member, Author):
Added tests for UPDATE/DELETE/MERGE-INTO

voonhous added a commit to voonhous/hudi that referenced this pull request Apr 22, 2026
Spark's TableOutputResolver aligns INSERT outputs by wrapping them in
Cast(...), which drops the StructField metadata that marks Hudi's custom
logical types (VECTOR, BLOB). As a result, the source schema handed to
HoodieSparkSqlWriter carried a plain ARRAY/BINARY instead of the original
custom type, and the schema-compat check against the persisted
tableCreateSchema failed with MISSING_UNION_BRANCH.

Re-attach the TYPE_METADATA_FIELD from the catalog table's schema onto
the query's StructType by field name at the InsertIntoHoodieTableCommand
choke point, right before constructing the DataFrame.

The data already matches the backing physical layout, so this is purely
a schema-metadata fix-up.

Addresses review on apache#18540:
- Rename `enrichWithCustomTypeMetadata` to `reattachCustomTypeMetadata` and
  make it recurse into nested StructType, ArrayType<Struct>, and
  MapType<_, Struct> so nested BLOB columns keep their `hudi_type` marker
  after Cast.
- Use case-sensitivity-aware field lookup driven by
  `spark.sql.caseSensitiveAnalysis`.
- Apply the same re-attach on the UPDATE and MERGE write paths: UPDATE's
  `castIfNeeded` and MERGE's source-side Avro conversion both strip custom-
  type metadata without this step.
- Add INSERT tests for top-level BLOB and BLOB nested in a struct, an
  UPDATE test on a VECTOR column, and a MERGE test exercising both
  MATCHED UPDATE and NOT MATCHED INSERT on a VECTOR column.
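For illustration, a rough sketch of the nested recursion the commit message above describes, assuming a lockstep walk of source vs. catalog data types; this is a simplified stand-in, not the PR's reattachCustomTypeMetadata implementation:

import org.apache.spark.sql.types._

object ReattachSketch {
  // Walk source and catalog data types in lockstep; wherever both sides are structs,
  // copy the catalog field's metadata (the hudi_type marker) onto the matching source
  // field, and keep descending through array elements and map values.
  def reattach(source: DataType, catalog: DataType): DataType = (source, catalog) match {
    case (s: StructType, c: StructType) =>
      StructType(s.fields.map { sf =>
        c.fields.find(_.name.equalsIgnoreCase(sf.name)) match { // simplified case-insensitive lookup
          case Some(cf) => sf.copy(dataType = reattach(sf.dataType, cf.dataType), metadata = cf.metadata)
          case None => sf
        }
      })
    case (s: ArrayType, c: ArrayType) => s.copy(elementType = reattach(s.elementType, c.elementType))
    case (s: MapType, c: MapType)     => s.copy(valueType = reattach(s.valueType, c.valueType))
    case _ => source // physical leaves (and mismatched shapes) are left untouched
  }
}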
@voonhous voonhous force-pushed the fix-vector-insert-into branch from 34b7555 to f104c57 on April 22, 2026 05:38
voonhous added a commit to voonhous/hudi that referenced this pull request Apr 22, 2026
@voonhous voonhous force-pushed the fix-vector-insert-into branch from f104c57 to 69b7c97 on April 22, 2026 05:57
@github-actions github-actions Bot added size:M PR with lines of changes in (100, 300] and removed size:S PR with lines of changes in (10, 100] labels Apr 22, 2026
voonhous added a commit to voonhous/hudi that referenced this pull request Apr 22, 2026
@github-actions github-actions Bot added size:L PR with lines of changes in (300, 1000] and removed size:M PR with lines of changes in (100, 300] labels Apr 22, 2026
@voonhous (Member, Author): The failing tests are fixed in #18538, so please merge the aforementioned PR first.

@hudi-agent (Contributor) left a comment

Thanks for the follow-up! This round addresses the earlier feedback by recursing into nested Struct/Array/Map<_, Struct> shapes, threading caseSensitive through the lookup, renaming appropriately, and extending coverage to UPDATE and MERGE paths (plus BLOB tests). I traced the INSERT/UPDATE/MERGE flows and the isAssignableTo nullability relaxation and didn't find new correctness issues beyond what's already been discussed. No new issues flagged from this automated pass — a Hudi committer or PMC member can take it from here for a final review. One naming/DRY nit in the new reattachCustomTypeMetadata method; otherwise the code is clean and well-documented.

cc @yihua

voonhous added a commit to voonhous/hudi that referenced this pull request Apr 23, 2026
@voonhous voonhous force-pushed the fix-vector-insert-into branch from bd7672c to a5c777d on April 23, 2026 08:45
voonhous added a commit to voonhous/hudi that referenced this pull request Apr 23, 2026
@voonhous voonhous force-pushed the fix-vector-insert-into branch from 4729eef to 266cbd9 on April 23, 2026 14:23
@hudi-agent (Contributor) left a comment

Thanks for the follow-up on metadata preservation! The new alignSchemaWithCatalog helper cleanly centralizes the fix across INSERT/UPDATE/MERGE paths, and the added tests cover the main scenarios. One concern worth double-checking in the inline comments around the nullability override path, especially in the MERGE source context. Please take a look at the inline comment, and this should be ready for a Hudi committer or PMC member to take it from here. A couple of small naming and simplification suggestions in HoodieSchemaConversionUtils; everything else looks clean.

@hudi-agent (Contributor) left a comment

CodeRabbit Walkthrough: This PR introduces Lance data format support, Spark SQL blob reading capabilities, schema conversion enhancements for custom types (VARIANT, BLOB, VECTOR), Flink continuous sorting, improved rollback/archival logic, and dependency upgrades. It includes new Docker Compose configurations, refactors utility classes, and adds extensive test coverage.

Sequence Diagram (CodeRabbit):

sequenceDiagram
    participant Client as Spark Query Client
    participant Optimizer as Spark Optimizer<br/>(ReadBlobRule)
    participant Planner as Spark Planner<br/>(BatchedBlobReaderStrategy)
    participant Executor as BatchedBlobReadExec
    participant Reader as BatchedBlobReader
    participant Storage as HoodieStorage

    Client->>Optimizer: Execute SQL with read_blob()
    Optimizer->>Optimizer: Pattern match & extract blob columns
    Optimizer->>Optimizer: Wrap child with BatchedBlobRead nodes
    Optimizer->>Optimizer: Rewrite read_blob() to resolved attributes
    Optimizer->>Planner: Pass logical plan
    
    Planner->>Planner: Match BatchedBlobRead nodes
    Planner->>Executor: Create physical execution nodes
    
    Executor->>Executor: child.execute() → child RDD
    Executor->>Executor: Broadcast StorageConfiguration
    Executor->>Reader: processRDD(rdd, schema, config, maxGapBytes, ...)
    
    Reader->>Reader: Process each partition's rows
    Reader->>Reader: Extract inline vs out-of-line blob data
    Reader->>Reader: Group out-of-line requests by file/offset
    Reader->>Reader: Merge overlapping ranges (maxGapBytes threshold)
    Reader->>Storage: Perform batched read for merged ranges
    Storage->>Storage: Single seek/read per merged range
    Reader->>Reader: Map merged data back to individual rows
    Reader->>Executor: Return enriched RDD[InternalRow]
    
    Executor->>Client: Return collected results with blob data

Sequence Diagram (CodeRabbit):

sequenceDiagram
    participant Writer as DML Command<br/>(Insert/Merge/Update)
    participant SchemaUtils as HoodieSchemaConversionUtils
    participant Catalog as Catalog Table
    participant Avro as Avro Converter
    participant HoodieWriter as HoodieSparkSqlWriter

    Writer->>Catalog: Read target catalog schema
    Writer->>SchemaUtils: alignSchemaWithCatalog(sourceSchema, catalogSchema)
    SchemaUtils->>SchemaUtils: Match fields by name
    SchemaUtils->>SchemaUtils: Recursively align nested types
    SchemaUtils->>SchemaUtils: Restore custom logical-type metadata
    SchemaUtils->>SchemaUtils: Enforce catalog's nullability
    SchemaUtils->>Writer: Return enriched schema
    
    Writer->>Writer: Create DataFrame with enriched schema
    Writer->>Avro: Convert enriched schema to Avro
    Avro->>Avro: Detect VARIANT type metadata
    Avro->>Avro: Map to Variant schema with metadata/value fields
    Writer->>HoodieWriter: write(enrichedDF, avroSchema, ...)
    HoodieWriter->>HoodieWriter: Serialize records with proper types
    HoodieWriter->>Writer: Return commit result

CodeRabbit: hudi-agent#15 (review)

@hudi-agent (Contributor) left a comment

Thanks for addressing the feedback! The alignNullability flag cleanly resolves the MERGE-path nullability concern by only narrowing for INSERT/UPDATE where Spark's resolver guarantees no-null inputs, and the lookupKey/alignedField readability nits are also addressed. The new relaxCustomTypeSubtreeNullability helper plus the case-sensitive validator fixes look like sensible additions. One interaction worth checking in the inline comment around combining the relax-pass with alignNullability=true on BLOB/VARIANT subtrees. Please take a look at the inline comment, and this should be ready for a Hudi committer or PMC member to take it from here.

query.schema,
catalogTable.tableSchemaWithoutMetaFields,
sparkSession.sessionState.conf.caseSensitiveAnalysis,
alignNullability = true)
hudi-agent (Contributor) commented inline:
🤖 For INSERT/UPDATE, resolveQueryOutputColumns is now called with the relaxed schema, so Spark's TableOutputResolver won't insert AssertNotNull for BLOB/VARIANT leaf fields (it sees them as nullable). Then alignSchemaWithCatalog(..., alignNullability=true) flips those same leaves back to non-null on the DataFrame. The Javadoc on alignSchemaWithCatalog says callers must guarantee the rows carry no nulls for non-null catalog columns — but that precondition is precisely what the relax-pass undoes for these subtrees. Have you considered what happens for something like named_struct('external_path', cast(null as string), 'managed', true) where an inner BLOB leaf is legitimately null? It looks like the schema would be relabeled non-null and the actual null would reach Avro encoding without an upfront check, potentially surfacing as a confusing runtime AvroRuntimeException or silently writing bad data. Worth either threading the relaxation through alignNullability (i.e. also skip alignment for BLOB/VARIANT leaves and rely on Spark's runtime AssertNotNull on 3.5+ / explicit validation on 3.4), or adding an explicit null-assertion step before the write.


@voonhous (Member, Author) commented Apr 23, 2026

Note to self: in a bug fix, we should always be fixing the specific issue raised.

If someone asks "have you tested this and that?", push back, and use this PR as an example of what happens when we try to cover other cases. We can add a test case, but if the test case surfaces a bug, create a new issue and file a separate PR to fix it.

Do not chase perfection within a PR. PRs are meant to be incremental. As can be seen with this PR, by adding in the other tests... look what it has become.

  1. It killed the momentum.
  2. Issues are no longer isolated.
  3. Instead of fixing one issue and moving on to another, the issue that is fixed is now blocked by two other issues sitting on this PR.

So, whenever someone tries to add scope to a PR, point them to this PR as an example of why the response in the attached screenshot is the correct answer.

@voonhous voonhous force-pushed the fix-vector-insert-into branch from 839ee51 to 288aad1 on April 23, 2026 20:13
@hudi-agent (Contributor) left a comment

Thanks for iterating on this! The latest revision layers the alignNullability flag cleanly, uses positional structural matching with case-insensitive name equality for BLOB validation, and projects BLOB as nullable-everywhere at the Spark type layer while keeping RFC-100 invariants enforced at the Avro boundary. Prior rounds have already surfaced the main concerns (nested custom-type metadata, case sensitivity, TableOutputResolver interaction) and they appear addressed. No new issues flagged from this automated pass — a Hudi committer or PMC member can take it from here for a final review. One naming nit on a private helper whose name undersells what it does.

cc @yihua

private def nameEquals(a: String, b: String, caseSensitive: Boolean): Boolean =
if (caseSensitive) a == b else a.equalsIgnoreCase(b)

private def withAllFieldsNullable(dataType: DataType): DataType = dataType match {
hudi-agent (Contributor) commented inline:
🤖 nit: withAllFieldsNullable implies it only touches struct fields, but the method also sets containsNull = true on arrays and valueContainsNull = true on maps — a future reader scanning the name alone might not expect those side effects. Something like makeFullyNullable or forceNullable would describe the full scope more accurately.
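For reference, a sketch of the recursive relaxation this nit is describing, under the assumption that it simply loosens every nesting level; the actual helper in HoodieSparkSchemaConverters may differ in detail:

import org.apache.spark.sql.types._

object NullableProjectionSketch {
  // Relax nullability at every nesting level: struct fields, array elements, map values.
  def withAllFieldsNullable(dataType: DataType): DataType = dataType match {
    case s: StructType =>
      StructType(s.fields.map(f => f.copy(dataType = withAllFieldsNullable(f.dataType), nullable = true)))
    case a: ArrayType =>
      a.copy(elementType = withAllFieldsNullable(a.elementType), containsNull = true)
    case m: MapType =>
      m.copy(valueType = withAllFieldsNullable(m.valueType), valueContainsNull = true)
    case other => other
  }
}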


voonhous added a commit to voonhous/hudi that referenced this pull request Apr 24, 2026
voonhous added a commit to voonhous/hudi that referenced this pull request Apr 24, 2026
voonhous added a commit to voonhous/hudi that referenced this pull request Apr 24, 2026
@hudi-agent (Contributor) left a comment

Thanks for the careful iteration on this! The round-6 changes look coherent — MERGE consistently uses enrichedSourceDF for both PAYLOAD_RECORD_AVRO_SCHEMA and the writer call, INSERT/UPDATE align schema with alignNullability=true (relying on TableOutputResolver/castIfNeeded for null assertions), and the BLOB nullable-everywhere projection is paired with an updated structural validator. No new issues flagged from this automated pass — a Hudi committer or PMC member can take it from here for a final review. One small naming inconsistency noted below; the new alignSchemaWithCatalog/alignField logic and supporting helpers are otherwise clean and well-documented.

cc @yihua

val fieldsByName = structType.fields.map(f => f.name -> f).toMap
val caseSensitive = SQLConf.get.caseSensitiveAnalysis
val key: String => String =
if (caseSensitive) identity else (_: String).toLowerCase(Locale.ROOT)
hudi-agent (Contributor) commented inline:
🤖 nit: key reads as a value (like a map key) rather than a normalizer function, and it's inconsistent with lookupKey used for the same pattern a few dozen lines up in alignSchemaWithCatalog. Could you rename it to lookupKey (or normalizeKey) here for consistency?


@hudi-agent (Contributor) left a comment

CodeRabbit: hudi-agent#15 (review)

voonhous added a commit to voonhous/hudi that referenced this pull request Apr 24, 2026
voonhous added a commit to voonhous/hudi that referenced this pull request Apr 25, 2026
voonhous added a commit to voonhous/hudi that referenced this pull request Apr 25, 2026
}
}

/**
Collaborator:
@voonhous do you think we can make this Javadoc comment more concise if possible?

voonhous (Member, Author):
Let's try to use multi-line selects next time.

(screenshot attached)

This is not showing everything...

voonhous (Member, Author):
Done.

}
val metadata = metadataBuilder.build()
StructField(f.name(), schemaType.dataType, schemaType.nullable, metadata)
// For BLOB: force nullable-everywhere at the Spark type layer. The RFC-100
Collaborator:
@yihua @voonhous I really feel like we should revisit the BLOB schema definition around which fields should be non-nullable, as this seems to cause complexity in this and other PRs, as well as the UX issue of having to provide values for sub-fields that may not be needed at write time.

Collaborator:
@yihua Should we file a GitHub issue for this, to maybe relax this constraint?

voonhous (Member, Author):
Uhm, KIV this please; I have encountered quite a few errors around this. This is not a one-day ordeal. Let's take more time to look into this after 1.2, as I've found that Spark 3.3, 3.4, 3.5, and 4.0 have different schema validation/check flows.

I have encountered quite a few errors in this area already, which I had to fix and patch.

Agreed that we should file an issue here and find the best way forward.

@voonhous voonhous force-pushed the fix-vector-insert-into branch from c9a5ccd to 4a72e7f on April 26, 2026 06:00
@hudi-agent (Contributor) left a comment

Thanks for working on this! The PR re-attaches custom-type metadata (and optionally narrows nullability) before the writer sees the DataFrame, with INSERT/UPDATE narrowing and MERGE preserving source nullability. A couple of edge cases worth double-checking in the inline comments. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here.

sourceDF.schema,
hoodieCatalogTable.tableSchemaWithoutMetaFields,
sparkSession.sessionState.conf.caseSensitiveAnalysis,
alignNullability = false)
hudi-agent (Contributor) commented inline:
🤖 enrichedSourceDF is built here at the top of executeUpsert, but the diff only updates two later references (enrichedSourceDF.schema at ~497 and enrichedSourceDF at ~508). Could you double-check that no other sourceDF reference in the unchanged portion of this method (e.g. building joined output, projections, schema bookkeeping) is still seeing the un-enriched schema? A mixed view would leave hudi_type metadata only partially threaded through. Same applies to the spark4-common copy.


target: StructField,
caseSensitive: Boolean,
alignNullability: Boolean): StructField = {
val alignedNullable = if (alignNullability) target.nullable else source.nullable
hudi-agent (Contributor) commented inline:
🤖 For VECTOR (ArrayType[FloatType]), neither element is a StructType, so this branch is skipped and only top-level nullability is adjusted; source's containsNull is preserved. In the MERGE path (alignNullability=false) with literals like array(cast(0.9 as float), ...) whose containsNull may be true, the metadata is re-attached on top of an array-of-nullable-floats. Have you verified that convertStructTypeToHoodieSchema emits the correct non-null VECTOR element schema in that case (rather than e.g. failing schema-compat with a nullable element union)?


@hudi-agent (Contributor) left a comment

CodeRabbit: hudi-agent#15 (review)

// schema stays RFC-100 compliant because the write path goes through
// HoodieSchema.Blob.createBlob(), which uses the canonical fields verbatim.
if (isBlob) {
StructField(f.name(), withAllFieldsNullable(schemaType.dataType), nullable = true, metadata)
Collaborator:
@voonhous @yihua Does the change now mean that users can provide something like this, where the user can pass a null type?

CREATE TABLE blob_t (id BIGINT, payload BLOB) USING hudi
  TBLPROPERTIES (primaryKey = 'id');

-- type=null: Spark side BLOB.type is now nullable, so analysis passes.
-- Avro side BLOB.type is the non-null enum {INLINE, OUT_OF_LINE} → fails at encode.
INSERT INTO blob_t VALUES (
  1,
  named_struct(
    'type', cast(null as string),
    'data', cast(X'010203' as binary),
    'reference', cast(null as struct<external_path:string,offset:bigint,length:bigint,managed:boolean>)
  )
);

@voonhous (Member, Author) commented Apr 26, 2026:
Your example should fail during the write stage, IIUC: on the write side, HoodieSchema's createBlob will treat a null TYPE as invalid, i.e. the writer side will fail.

Follow-up task with more details on why this is written this way: #18601

voonhous added a commit to voonhous/hudi that referenced this pull request Apr 26, 2026
@rahil-c (Collaborator) commented Apr 26, 2026

@yihua to take one pass.

…ERGE

Ensure that VECTOR and BLOB columns survive Spark's SQL write-path
rewrites on all three DML verbs (INSERT, UPDATE, MERGE) and on Spark 3.3
through 4.0.

Root-cause fix for BLOB: project BLOB as nullable-everywhere at the Spark
type layer (HoodieSparkSchemaConverters.toSqlType). The RFC-100 canonical
schema declares `type`, `reference.external_path`, and `reference.managed`
as strictly non-null, but Spark's type system can't express "required when
parent is present", so the strict declarations collided with every
write-path rewrite (Cast strips metadata, TableOutputResolver rejects
nested nullable-source -> non-null-target on Spark 3.4, Cast.canCast
likewise). On-disk physical schema stays RFC-100-compliant because the
write path goes through HoodieSchema.Blob#createBlob, which uses the
canonical non-null fields verbatim independent of the Spark-side
nullability.

Schema-alignment pass before handing DataFrames to the writer:
HoodieSchemaConversionUtils.alignSchemaWithCatalog re-attaches custom-type
metadata (hudi_type) that Cast/castIfNeeded strip, and optionally narrows
source nullability to match the catalog. Nullability narrowing is gated by
an `alignNullability` flag because MERGE passes raw pre-assignment source
rows (assignments run at write time inside ExpressionPayload); narrowing
there could relabel legitimately-null rows as non-null. INSERT/UPDATE pass
true (upstream TableOutputResolver / castIfNeeded have already asserted
non-null); MERGE passes false.

Validators made purely structural: validateBlobStructure compares field
names and types recursively (matchesStructure) ignoring nullability, and
is case-sensitivity-aware (SQLConf.caseSensitiveAnalysis) so mixed-case
source structs under Spark's default case-insensitive analysis are
accepted. validateVariantStructure documented: VARIANT is not projected
nullable-everywhere because its user-facing path is gated (Spark 3.x
rejects VARIANT at resolution; Spark 4.0+ uses native VariantType via
parse_json, never user-supplied named_struct).

Tests:
- Unit tests in TestHoodieSchemaConversionUtils pin the alignNullability
  flag behavior on nested structs, array-of-struct, and map-of-struct;
  pin metadata reattachment independence from the flag; pin the BLOB
  validator's case-insensitive matching and positional ordering
  requirement.
- Integration tests in TestCreateTable, TestUpdateTable, and
  TestMergeIntoTable cover BLOB INSERT / UPDATE / MERGE end-to-end with
  RFC-100-compliant sources.
- TestSchemaConverters.validateBlobFields updated to match the
  nullable-everywhere projection on the Spark side.

Touched Spark shim commands (InsertInto/Update/MergeInto) under both
hudi-spark3-common and hudi-spark4-common to pass the alignNullability
flag at each call site.
@voonhous voonhous force-pushed the fix-vector-insert-into branch from 4a72e7f to 48694ab on April 26, 2026 16:57
@hudi-agent (Contributor) left a comment

Thanks for the iterations on this! The PR re-attaches custom-type (hudi_type) metadata and conditionally aligns nullability between the resolved query schema and the catalog schema for INSERT/UPDATE/MERGE, plus relaxes BLOB to nullable-everywhere at the Spark type layer (with RFC-100 invariants enforced at the physical schema boundary). No new issues flagged from this automated pass — a Hudi committer or PMC member can take it from here for a final review.

cc @yihua

@hudi-agent (Contributor) left a comment

CodeRabbit: hudi-agent#15 (review)

fieldsByName.get(HoodieSchema.Variant.VARIANT_METADATA_FIELD).exists(f => f.dataType == BinaryType && !f.nullable) &&
fieldsByName.get(HoodieSchema.Variant.VARIANT_VALUE_FIELD).exists(f => f.dataType == BinaryType && !f.nullable)
fieldsByName.get(key(HoodieSchema.Variant.VARIANT_METADATA_FIELD)).exists(f => f.dataType == BinaryType && !f.nullable) &&
fieldsByName.get(key(HoodieSchema.Variant.VARIANT_VALUE_FIELD)).exists(f => f.dataType == BinaryType && !f.nullable)
hudi-agent (Contributor) commented inline:
⚠️ Potential issue | 🟠 Major

Reject reordered VARIANT physical structs.

This validator only checks for two named binary fields via a map, so struct<value: binary, metadata: binary> currently passes. The documented canonical layout is struct<metadata: binary, value: binary>, and Spark rows are positional, so accepting reordered fields can corrupt the metadata/value swap on write.

🔧 Suggested fix
   private def validateVariantStructure(structType: StructType): Unit = {
     val caseSensitive = SQLConf.get.caseSensitiveAnalysis
-    val key: String => String =
-      if (caseSensitive) identity else (_: String).toLowerCase(Locale.ROOT)
-    val fieldsByName = structType.fields.map(f => key(f.name) -> f).toMap
-    val ok = structType.length == 2 &&
-      fieldsByName.get(key(HoodieSchema.Variant.VARIANT_METADATA_FIELD)).exists(f => f.dataType == BinaryType && !f.nullable) &&
-      fieldsByName.get(key(HoodieSchema.Variant.VARIANT_VALUE_FIELD)).exists(f => f.dataType == BinaryType && !f.nullable)
+    val expectedFields = expectedVariantStructType.fields
+    val ok = structType.length == expectedFields.length &&
+      structType.fields.zip(expectedFields).forall { case (actual, expected) =>
+        nameEquals(actual.name, expected.name, caseSensitive) &&
+          actual.dataType == expected.dataType &&
+          actual.nullable == expected.nullable
+      }
     if (!ok) {
       throw new IllegalArgumentException(
         s"""Invalid variant schema structure. Expected schema:
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala`
around lines 434 - 441, The validator validateVariantStructure currently accepts
any two-field binary struct by name lookup, which allows reordered physical
structs like struct<value: binary, metadata: binary>; update
validateVariantStructure to enforce the canonical positional layout by checking
structType.fields length == 2 and then validating that the first field (apply
case-sensitive keying using SQLConf.get.caseSensitiveAnalysis and the key
function) has name equal to HoodieSchema.Variant.VARIANT_METADATA_FIELD and is
BinaryType non-nullable, and the second field has name equal to
HoodieSchema.Variant.VARIANT_VALUE_FIELD and is BinaryType non-nullable (do not
rely on a fieldsByName map for this check).

CodeRabbit (original) (source:comment#3143071733)

@hudi-bot (Collaborator):
CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

@yihua (Contributor) left a comment
LGTM overall to fix the SQL experience for VECTOR and BLOB support.

@yihua yihua merged commit fdf27db into apache:master Apr 27, 2026
56 checks passed
@voonhous voonhous deleted the fix-vector-insert-into branch April 27, 2026 10:43
@codecov-commenter commented:
Codecov Report

❌ Patch coverage is 70.68966% with 17 lines in your changes missing coverage. Please review.
✅ Project coverage is 68.90%. Comparing base (edaa168) to head (48694ab).
⚠️ Report is 18 commits behind head on master.

Files with missing lines Patch % Lines
...e/spark/sql/avro/HoodieSparkSchemaConverters.scala 53.57% 2 Missing and 11 partials ⚠️
.../org/apache/hudi/HoodieSchemaConversionUtils.scala 86.66% 0 Missing and 4 partials ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18540      +/-   ##
============================================
+ Coverage     68.88%   68.90%   +0.01%     
- Complexity    28532    28580      +48     
============================================
  Files          2479     2482       +3     
  Lines        136810   137105     +295     
  Branches      16660    16737      +77     
============================================
+ Hits          94244    94467     +223     
- Misses        34982    35018      +36     
- Partials       7584     7620      +36     
Flag Coverage Δ
common-and-other-modules 44.36% <1.72%> (-0.08%) ⬇️
hadoop-mr-java-client 44.83% <ø> (+0.08%) ⬆️
spark-client-hadoop-common 48.42% <1.72%> (-0.05%) ⬇️
spark-java-tests 49.51% <43.10%> (+0.03%) ⬆️
spark-scala-tests 45.32% <70.68%> (+0.04%) ⬆️
utilities 37.91% <1.72%> (-0.08%) ⬇️

Flags with carried forward coverage won't be shown.

Files with missing lines Coverage Δ
.../org/apache/hudi/HoodieSchemaConversionUtils.scala 75.43% <86.66%> (+4.01%) ⬆️
...e/spark/sql/avro/HoodieSparkSchemaConverters.scala 76.88% <53.57%> (-2.43%) ⬇️

... and 30 files with indirect coverage changes


Labels

size:L PR with lines of changes in (300, 1000]

Successfully merging this pull request may close these issues:

  • MERGE-INTO/UPDATE blob write error
  • Blob Write error
  • Vector Schema incompatibility during write

6 participants