feat(blob): followup fixes for blob reader #18538
Conversation
Covers steps 2, 3, and 4 of VC's 1.2 BLOB release plan:
2. Wire INLINE writes - testEndToEndInline drives SparkRDDWriteClient
insert+upsert with INLINE Avro records (data=bytes, reference=null)
against a Parquet-backed Hudi table.
3. read_blob() reads inline values - testInlineBlobRoundTrip runs
SELECT read_blob(col) over an in-memory INLINE DataFrame and
verifies each payload round-trips byte-for-byte.
4. Mixed datasets - testMixedInlineAndOutOfLine builds 10 rows
alternating INLINE and OUT_OF_LINE, points the range rows at
one shared file, and asserts the returned sequence matches input
order (stronger than TestBatchedBlobReader.testMixedBlobTypes,
which orders by record_id before asserting).
testInlineOnHudiBackedTable mirrors the cherry-picked
testReadBlobOnHudiBackedTable (OUT_OF_LINE) but writes INLINE rows
via spark.write.format("hudi") + bulk_insert, reads back through
spark.read.format("hudi"), and resolves via read_blob() - exercises
the full write -> HoodieFileIndex-backed read -> SQL path that the
cherry-picked BatchedBlobReadExec serialization fix unblocks.
No production code changes. BatchedBlobReader already dispatches
INLINE rows into its inline branch (collectBatch field-0 check)
and preserves row order via sortBy(index) in processNextBatch.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
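For readers unfamiliar with the reader's batching behaviour described in the commit message above, here is a minimal, self-contained sketch of the dispatch-plus-reorder pattern (BlobRow, readRange, and the field layout are illustrative placeholders, not the actual BatchedBlobReader internals):

```scala
// Illustrative sketch only: index-tagging plus sortBy for input-order preservation,
// mirroring the behaviour described above (collectBatch dispatch + sortBy(index)).
// BlobRow and readRange are assumptions for this example, not Hudi APIs.
final case class BlobRow(index: Int, isInline: Boolean, payload: Array[Byte], ref: Option[String])

def processBatch(batch: Seq[BlobRow], readRange: String => Array[Byte]): Seq[Array[Byte]] =
  batch
    .map { row =>
      // Dispatch: INLINE rows already carry their bytes; OUT_OF_LINE rows are resolved from storage.
      val bytes = if (row.isInline) row.payload else readRange(row.ref.get)
      (row.index, bytes)
    }
    .sortBy(_._1) // restore original input order before emitting
    .map(_._2)
```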
Fixes UnsupportedOperationException thrown during deduceWriterSchema when converting a HoodieSchema containing a BLOB primitive into InternalSchema. BLOB is a fixed-shape RECORD (type / data / reference) that must round-trip through InternalSchema without losing its `blob` logical-type annotation. Sentinel negative field IDs (-10, -11, -12) tag the three children so the reverse path can detect the shape and rebuild via HoodieSchema.createBlob(). Cherry-picked from blob/lance-blob-support (4c5ed48); only the InternalSchemaConverter change is taken — Lance-specific changes are dropped. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
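As a rough illustration of the sentinel-ID idea (the Field/Record classes below are simplified stand-ins, not Hudi's actual InternalSchema/Types classes, and the nullability flags are assumptions):

```scala
// Simplified stand-ins for illustration; Hudi's InternalSchema types differ.
final case class Field(id: Int, name: String, nullable: Boolean)
final case class Record(fields: Seq[Field])

// Sentinel IDs from the description above: -10 / -11 / -12 tag the BLOB children.
val BlobTypeFieldId      = -10
val BlobDataFieldId      = -11
val BlobReferenceFieldId = -12

// The BLOB logical type rendered as a fixed-shape record (nullability illustrative).
val blobAsInternalRecord = Record(Seq(
  Field(BlobTypeFieldId,      "type",      nullable = false),
  Field(BlobDataFieldId,      "data",      nullable = true),
  Field(BlobReferenceFieldId, "reference", nullable = true)
))

// Reverse path: if a record's children carry exactly these sentinel IDs,
// treat it as a BLOB and rebuild via HoodieSchema.createBlob() instead of a plain struct.
def looksLikeBlob(r: Record): Boolean =
  r.fields.map(_.id).toSet == Set(BlobTypeFieldId, BlobDataFieldId, BlobReferenceFieldId)
```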
Spark's upstream iterator may return the same InternalRow instance across next() calls with the underlying buffer mutated in place. collectBatch() was stashing the reference into RowInfo.originalRow, which made every buffered row alias the last-seen row once the batch was emitted: inline bytes were captured correctly (getBinary copies), but non-blob columns like `id` collapsed to the tail value. Add a `copy` method on the RowAccessor type class (identity for Row, InternalRow.copy() for the internal variant) and invoke it when entering the batch. Manifests under Spark SQL via testMixedInlineAndOutOfLine and testEndToEndInline; existing OUT_OF_LINE-only tests didn't assert id/value on a per-row basis, so the bug was latent. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
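A minimal sketch of the type-class shape this commit message describes (names approximate the description; the real RowAccessor in BatchedBlobReader has more members than the `copy` op shown here):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow

// Type class exposing a copy operation so buffered rows are detached from
// Spark's reused InternalRow buffer before their references are cached.
trait RowAccessor[R] {
  def copy(row: R): R
}

object RowAccessor {
  // External Rows are immutable value objects, so identity is safe.
  implicit val forRow: RowAccessor[Row] = new RowAccessor[Row] {
    def copy(row: Row): Row = row
  }
  // InternalRow iterators may reuse one mutable buffer across next() calls,
  // so a defensive copy is required before buffering the reference.
  implicit val forInternalRow: RowAccessor[InternalRow] = new RowAccessor[InternalRow] {
    def copy(row: InternalRow): InternalRow = row.copy()
  }
}

// Called at the point rows enter the batch buffer (collectBatch in the real code).
def detach[R](row: R)(implicit acc: RowAccessor[R]): R = acc.copy(row)
```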
hudi-agent
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the follow-up fixes! This PR closes the INLINE write / SQL read / mixed-blob gaps from the original BLOB PR and fixes two latent correctness bugs in the OOL path — the InternalRow buffer-aliasing bug in collectBatch and the HoodieFileIndex serialization leak via the logical-plan field on BatchedBlobReadExec. Both root-causes and fixes look well-reasoned, the InternalSchemaConverter round-trip uses sentinel negative field IDs plus name checks that should avoid false positives on user schemas, and the new tests cover the previously unexercised paths. No correctness issues found. A few style/readability suggestions in the inline comments. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. A couple of small naming nits below.
cc @yihua
hudi-agent
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
CodeRabbit Walkthrough: This PR introduces blob streaming support for Spark SQL with a read_blob() scalar function, adds a continuous-sort write buffering option for Flink, migrates MapUtils to CollectionUtils across the codebase, adds VECTOR type DDL support, and includes lock provider/test utility robustness improvements.
Sequence Diagram (CodeRabbit):
sequenceDiagram
participant User as User/App
participant Parser as Spark Parser
participant Rule as ReadBlobRule
participant Optimizer as Optimizer
participant Planner as Planner
participant Executor as BatchedBlobReadExec
participant Reader as BatchedBlobReader
participant Storage as HoodieStorage
User->>Parser: SQL: SELECT read_blob(blob_col) FROM table
Parser->>Parser: Parse read_blob() as ReadBlobExpression
Parser->>Rule: Logical Plan with ReadBlobExpression
Rule->>Rule: Detect read_blob(), extract blob attributes
Rule->>Rule: Wrap child with BatchedBlobRead nodes
Rule->>Rule: Replace ReadBlobExpression with data attributes
Rule-->>Optimizer: Transformed Logical Plan
Optimizer->>Planner: Optimized Plan
Planner->>Executor: Pattern match BatchedBlobRead
Executor->>Executor: Broadcast storage config
Executor->>Executor: Build physical execution plan
Executor-->>User: Physical Plan Ready
Executor->>Reader: processRDD(rdd, schema, config)
Reader->>Reader: Buffer rows, extract blob references
Reader->>Reader: Merge consecutive byte ranges by file
Reader->>Storage: Read merged ranges (seekable)
Storage-->>Reader: Byte buffers
Reader->>Reader: Slice buffers to per-row byte arrays
Reader->>Reader: Restore original row order
Reader-->>Executor: RDD[InternalRow] with blob data
Executor-->>User: Result rows with materialized blob data
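Taken end to end, the user-facing step in the diagram above is just a scalar SQL call. A minimal usage sketch (table and column names are placeholders; an active SparkSession with the Hudi extensions that register read_blob() is assumed):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Placeholder table/column names; read_blob() resolves BLOB cells to their bytes
// (inline payloads directly, out-of-line references via merged range reads).
def resolveBlobs(spark: SparkSession): DataFrame =
  spark.sql("SELECT id, read_blob(blob_col) AS blob_bytes FROM blob_table")
```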
Sequence Diagram (CodeRabbit):
sequenceDiagram
participant Source as Data Source
participant Function as AppendWriteFunctionWithContinuousSort
participant TreeMap as TreeMap (Buffer)
participant Metrics as Metrics/Tracer
participant Drain as Drain Handler
participant Output as Output Sink
Source->>Function: processElement(record, context)
Function->>Function: Check object reuse, copy if needed
Function->>Function: Compute normalized sort key
Function->>TreeMap: Insert record with SortKey
Function->>Metrics: Update insertion sequence, memory size
Function->>Function: Check buffer/memory threshold exceeded?
alt Buffer Full
Function->>Drain: Drain buffered records
Drain->>TreeMap: Iterate in sorted order
Drain->>Output: Emit sorted records
Drain->>TreeMap: Clear buffer
Drain->>Metrics: Reset insertion sequence, tracer
else Continue Buffering
Function-->>Source: Ready for next record
end
Source->>Function: snapshotState() [Checkpoint]
Function->>Drain: Drain all remaining records
Drain->>Output: Emit final sorted batch
Function->>Function: Call superclass snapshotState()
Source->>Function: endInput() [Completion]
Function->>Drain: Drain residual records
Function->>Function: Call superclass endInput()
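A rough sketch of the buffering pattern in the second diagram, in generic Scala rather than the actual Flink AppendWriteFunctionWithContinuousSort; the key type, threshold, and emit callback are placeholders:

```scala
import scala.collection.mutable

// Buffer records keyed by a normalized sort key; drain in sorted order when the
// buffer crosses a threshold, and again at checkpoint/end-of-input so nothing is lost.
class ContinuousSortBuffer[K: Ordering, V](maxBuffered: Int, emit: V => Unit) {
  private val buffer = mutable.TreeMap.empty[K, mutable.ArrayBuffer[V]]
  private var size = 0

  def add(key: K, value: V): Unit = {
    buffer.getOrElseUpdate(key, mutable.ArrayBuffer.empty) += value
    size += 1
    if (size >= maxBuffered) drain() // threshold exceeded: flush sorted
  }

  // The snapshotState()/endInput() equivalents call this too.
  def drain(): Unit = {
    buffer.valuesIterator.foreach(_.foreach(emit)) // TreeMap iterates in key order
    buffer.clear()
    size = 0
  }
}
```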
CodeRabbit: hudi-agent#10 (review)
@yihua to take a look

static final int VARIANT_METADATA_FIELD_ID = -2;

// Sentinel field IDs used to mark Blob sub-fields in the internal schema representation.
// The BLOB logical type is itself a fixed-shape RECORD; we round-trip it through InternalSchema by
Non-blocker: my understanding is that InternalSchema is only used for schema evolution with the schema-on-read feature. It looks like an anti-pattern to use a round-trip conversion between HoodieSchema and InternalSchema (used by a different feature that is not turned on by default) to support the variant and blob types.
Also, I remember there was a plan to unify HoodieSchema and InternalSchema, since HoodieSchema should be the source of truth for schema representation. We should have a follow-up to revisit this.
And these negative field IDs are persisted in the InternalSchema written to storage if schema-on-read is enabled, correct? Will that cause issues?
@yihua yes, they are persisted. FileBasedInternalSchemaStorageManager.persistHistorySchemaStr writes the InternalSchema as JSON under .hoodie/.schema/, and SerDeHelper.toJson/fromJson round-trip field.fieldId() as a raw integer (negative values included).
It should be safe though:
- Auto-assigned IDs start at 0 and only increment, so positive vs. negative spaces can never overlap.
- InternalSchema.maxColumnId is computed as max(ids) and is unaffected by negative values (illustrated below).
- No code path validates fieldId >= 0; the only check is fieldId() < 0 in the reverse path, used intentionally to detect Variant/Blob sentinels.
- Variant has been using -1 / -2 since feat(blob): Read Blobs in Spark SQL #18098; the Blob sentinels just extend the same pattern.
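A tiny illustration of why the persisted negative sentinels are benign under the invariants listed above (values are made up for the example):

```scala
// Auto-assigned IDs are >= 0, so a max over all IDs never picks a sentinel,
// and the only sign-sensitive check is the intentional `< 0` sentinel detection.
val fieldIds    = Seq(0, 1, 2, -10, -11, -12) // example mix of real IDs and Blob sentinels
val maxColumnId = fieldIds.max                // 2: unaffected by the negative sentinels
val sentinelIds = fieldIds.filter(_ < 0)      // what the reverse path keys off
```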
The original issue I hit was the following, during some basic blob read/write testing (I'm not sure which SQL script I was using when it hit, but @voonhous also hit this). Hence this change.
java.lang.UnsupportedOperationException: Unsupported primitive type: BLOB
at org.apache.hudi.internal.schema.convert.InternalSchemaConverter.visitPrimitiveToBuildInternalType(InternalSchemaConverter.java:370)
at org.apache.hudi.internal.schema.convert.InternalSchemaConverter.visitSchemaToBuildType(InternalSchemaConverter.java:301)
at org.apache.hudi.internal.schema.convert.InternalSchemaConverter.lambda$visitSchemaToBuildType$2(InternalSchemaConverter.java:283)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
at org.apache.hudi.internal.schema.convert.InternalSchemaConverter.visitSchemaToBuildType(InternalSchemaConverter.java:282)
at org.apache.hudi.internal.schema.convert.InternalSchemaConverter.lambda$visitSchemaToBuildType$1(InternalSchemaConverter.java:262)
at java.base/java.lang.Iterable.forEach(Iterable.java:75)
at java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1085)
at org.apache.hudi.internal.schema.convert.InternalSchemaConverter.visitSchemaToBuildType(InternalSchemaConverter.java:261)
at org.apache.hudi.internal.schema.convert.InternalSchemaConverter.buildTypeFromHoodieSchema(InternalSchemaConverter.java:211)
at org.apache.hudi.internal.schema.convert.InternalSchemaConverter.convertToField(InternalSchemaConverter.java:185)
at org.apache.hudi.internal.schema.convert.InternalSchemaConverter.convert(InternalSchemaConverter.java:198)
at org.apache.hudi.internal.schema.convert.InternalSchemaConverter.fixNullOrdering(InternalSchemaConverter.java:158)
at org.apache.hudi.HoodieSchemaUtils$.deduceWriterSchema(HoodieSchemaUtils.scala:122)
at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:469)
at org.apache.hudi.HoodieSparkSqlWriterInternal.$anonfun$write$1(HoodieSparkSqlWriter.scala:187)
at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:205)
at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:127)
at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:166)
- InternalSchemaConverter: hoist the BLOB internal record type into a static constant since it is fully determined by the BLOB type definition.
- TestReadBlobSQL: rename testReadBlobOnHudiBackedTable to testReadOutOfLineBlobOnHudiBackedTable, drop the redundant precombine option, tighten the header/body comments, and assert blob byte content.
- TestBlobSupport: drop redundant hoodie.datasource.write.* keys from the meta-client Properties, fix the 0xA/0xB payload comments, and change testEndToEndInline's second commit to upsert only ids 5..9 so the merge path is exercised against non-updated records.
Codecov Report ❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #18538 +/- ##
============================================
+ Coverage 68.85% 68.86% +0.01%
- Complexity 28452 28478 +26
============================================
Files 2475 2475
Lines 136485 136658 +173
Branches 16590 16626 +36
============================================
+ Hits 93974 94111 +137
- Misses 34955 34974 +19
- Partials 7556 7573 +17
Flags with carried forward coverage won't be shown.
@yihua can we merge this?
(cherry picked from commit 4ef56e4)
Describe the issue this Pull Request addresses
Follow-up to #18098 (BLOB logical type). That PR landed the BLOB schema, read_blob() SQL function, and BatchedBlobReader for OUT_OF_LINE rows on Parquet. This PR closes three gaps from the 1.2 BLOB plan (steps 2/3/4) for the Parquet path:
- No end-to-end INLINE write coverage through SparkRDDWriteClient.
- read_blob() on INLINE rows was implemented in BatchedBlobReader.collectBatch but had no coverage at the SQL-plan layer.
- No mixed INLINE/OUT_OF_LINE coverage that proves input-row-order preservation (the existing TestBatchedBlobReader.testMixedBlobTypes ordered rows by record_id before asserting, which does not actually prove input-row-order preservation).

While writing those tests, two latent correctness bugs in the OOL path surfaced; both are fixed here.
Summary and Changelog
Correctness fixes (production code):
- BatchedBlobReader.scala: collectBatch buffered child-row references directly. Spark's upstream InternalRow iterators reuse a single mutable buffer across next() calls, so every cached RowInfo aliased the last row by emission time, collapsing non-blob columns (e.g. id) to the tail value. Added a copy op to the RowAccessor type class (identity for Row, row.copy() for InternalRow) and call it when buffering. INLINE byte payloads were accidentally safe because getBinary materializes; any adjacent non-blob column was corrupted.
- BatchedBlobReadExec.scala / BatchedBlobReaderStrategy.scala: the exec node held a reference to the BatchedBlobRead logical plan, which transitively pulled HoodieFileIndex (non-Serializable) into the Spark task payload, failing any Hudi-backed BLOB read. Replaced the logical-plan field with the two values the exec actually needs (blobAttrName: String, override val output: Seq[Attribute]) so only serializable state crosses the driver→executor boundary (see the sketch below).
- InternalSchemaConverter.java: added a BLOB case to visitPrimitiveToBuildInternalType and a reverse-detection branch in visitInternalRecordToBuildHoodieRecord, so BLOB round-trips through HoodieSchema ↔ InternalSchema without throwing UnsupportedOperationException: Unsupported primitive type: BLOB. Uses sentinel negative field IDs (-10/-11/-12) on the outer struct, mirroring the existing Variant convention (-1/-2).
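A condensed sketch of the serialization fix (simplified; the real exec node extends SparkPlan and has more members than shown, so this is only the shape of the state that crosses the boundary):

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute

// Simplified stand-in for the exec node's state. The point: instead of holding the
// BatchedBlobRead logical plan (which transitively captures the non-serializable
// HoodieFileIndex), carry only the serializable values the executor-side code needs.
final case class BlobReadExecState(
    blobAttrName: String,      // name of the BLOB attribute to resolve
    output: Seq[Attribute])    // output attributes, extracted at planning time
```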
Test coverage:
- TestBlobSupport.testEndToEndInline: @ParameterizedTest over both HoodieTableType values. Two commits (insert + upsert) through SparkRDDWriteClient with INLINE GenericData.Records (type=INLINE, inline_data as ByteBuffer, reference=null). Read back and assert both via direct struct-field access and via SELECT read_blob(data) FROM view.
- TestReadBlobSQL.testReadBlobOnHudiBackedTable: spark.write.format("hudi") bulk-insert + spark.read.format("hudi").load(...) + read_blob(), exercising the full logical→physical plan path against a real Hudi table (previously only covered over in-memory DataFrames); see the sketch below.
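Roughly, the Hudi-backed round trip exercised by that second test looks like the sketch below. The DataFrame, base path, table name, and option set are illustrative: `inlineDf` is assumed to already carry a BLOB-typed column named `data`, and a real table needs more write options (record key, precombine field, etc.) than shown here.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

// Write INLINE blob rows via bulk_insert, read back through the Hudi source,
// and resolve the payloads with read_blob() in SQL.
def writeAndReadInlineBlobs(spark: SparkSession, inlineDf: DataFrame, basePath: String): Array[Row] = {
  inlineDf.write
    .format("hudi")
    .option("hoodie.table.name", "blob_tbl")                    // placeholder table name
    .option("hoodie.datasource.write.operation", "bulk_insert") // bulk_insert path, as in the test
    .mode("overwrite")
    .save(basePath)

  spark.read.format("hudi").load(basePath).createOrReplaceTempView("blob_tbl_view")
  spark.sql("SELECT id, read_blob(data) AS blob_bytes FROM blob_tbl_view").collect()
}
```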
Impact
- read_blob() against Hudi-backed tables no longer hits NotSerializableException on HoodieFileIndex.

Risk Level
Low.
- One extra UnsafeRow copy per buffered input row during the BLOB batch window; bounded by batch size and required for correctness.
- Verified via mvn install -pl hudi-spark-datasource/hudi-spark-common -am -DskipTests -Dspark3.5 and the new + pre-existing tests in TestBlobSupport, TestReadBlobSQL, TestBatchedBlobReader.

Documentation Update
None. No new user-facing configs; BLOB type itself was introduced in #18098.
Contributor's checklist
read_blob() + existing tests remain green)