feat(blob): add support for lance blob inline descriptor reading#18586
Conversation
Covers steps 2, 3, and 4 of VC's 1.2 BLOB release plan:
2. Wire INLINE writes - testEndToEndInline drives SparkRDDWriteClient
insert+upsert with INLINE Avro records (data=bytes, reference=null)
against a Parquet-backed Hudi table.
3. read_blob() reads inline values - testInlineBlobRoundTrip runs
SELECT read_blob(col) over an in-memory INLINE DataFrame and
verifies each payload round-trips byte-for-byte.
4. Mixed datasets - testMixedInlineAndOutOfLine builds 10 rows
alternating INLINE and OUT_OF_LINE, pointing the range rows at
one shared file and asserts the returned sequence matches input
order (stronger than TestBatchedBlobReader.testMixedBlobTypes,
which orders by record_id before asserting).
testInlineOnHudiBackedTable mirrors the cherry-picked
testReadBlobOnHudiBackedTable (OUT_OF_LINE) but writes INLINE rows
via spark.write.format("hudi") + bulk_insert, reads back through
spark.read.format("hudi"), and resolves via read_blob() - exercises
the full write -> HoodieFileIndex-backed read -> SQL path that the
cherry-picked BatchedBlobReadExec serialization fix unblocks.
No production code changes. BatchedBlobReader already dispatches
INLINE rows into its inline branch (collectBatch field-0 check)
and preserves row order via sortBy(index) in processNextBatch.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
fb60d32 to
c2be9ff
Compare
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This PR wires the DESCRIPTOR blob read mode through the Lance reader path with a dedicated rewrite iterator, and shares the close logic across the existing and new iterators. A couple of edge cases around the close path and INLINE descriptor handling worth double-checking in the inline comments. Please take a look at any inline comments, and this should be ready for a Hudi committer or PMC member to take it from here. A couple of minor naming and style suggestions below.
591ac80 to
b5b06f1
Compare
| // Build ColumnVector[] in Spark-schema order by looking each field up by name; | ||
| // lance-spark 0.4.0's VectorSchemaRoot may return the file's on-disk order, which | ||
| // would misalign the UnsafeProjection. Cached on the first batch and reused thereafter. | ||
| if (columnVectors == null) { |
There was a problem hiding this comment.
Something to revisit separately: the columnVectors is only assigned once, for all batches. This is reused, and I see that BlobDescriptorTransform use it across different batches. Have we test multiple batches from arrow reader and see if it works?
There was a problem hiding this comment.
It looks like the test cases are loose here. So it would be good to add stronger tests in a follow-up.
There was a problem hiding this comment.
Follow up sounds like a good idea
| + "CONTENT returns the raw inline bytes (default). " | ||
| + "A future DESCRIPTOR mode will return a {position, size} pointer instead of materializing " | ||
| + "the bytes, so callers can defer the byte read."); | ||
| + "CONTENT (default) returns the raw inline bytes. " |
There was a problem hiding this comment.
@yihua just to confirm then we want default to always read the raw bytes for both lance and parquet?
There was a problem hiding this comment.
I think ideally DESCRIPTOR mode should be the default. I kept it as is in this PR.
| Exception lanceException = null; | ||
|
|
||
| if (currentBatch != null) { | ||
| currentBatch.close(); |
There was a problem hiding this comment.
This is not a regression from you but more so maybe a miss in general in this code path. I was reviewing this again and Im wondering if this current batch close needs to be in try/catch as well. Claude was pointing this out to me as a potential issue and i think it seems accurate for this case.
2.1 Resource leak when currentBatch.close() or allocator.close() throws — LanceResourceCloser.java:48-67
if (currentBatch != null) {
currentBatch.close(); // not in try/catch
}
if (arrowReader != null) { try { ... } catch (...) }
if (lanceReader != null) { try { ... } catch (...) }
if (allocator != null) {
allocator.close(); // not in try/catch
}
If currentBatch.close() throws (a ColumnarBatch close cascades to its child column vectors and can throw), the arrowReader,
lanceReader, and allocator are never closed → buffer / file-handle leak. The same applies if allocator.close() throws
(BufferAllocator.close() throws IllegalStateException when buffers are still outstanding) — the throw escapes without
rethrowing the prior captured Arrow/Lance exceptions.
The whole point of consolidating this helper was to make sure "a failed reader close never leaks the allocator" (per the class
Javadoc). The current code only protects against arrowReader/lanceReader failures, not the other two.
Fix: wrap all four closes in try/catch and aggregate via addSuppressed.
There was a problem hiding this comment.
I checked currentBatch.close() default implementation should not throw exception as it's in-memory processing so I didn't address the try/catch. It's good to be defensive here and add the additional try/catch. Let's do a separate fix as this PR is not changing this part.
There was a problem hiding this comment.
We can follow up on this in seperate pr so it doesnt block ci
| @@ -888,14 +888,103 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { | |||
| } | |||
| } | |||
|
|
|||
There was a problem hiding this comment.
I am thinking a followup which i or someone can pick up on is a test for doing mixed INLINE and OUTLINE BLOBs within a table?
|
@yihua thanks for the help on this, for the most part this LGTM |
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #18586 +/- ##
=========================================
Coverage 68.89% 68.90%
- Complexity 28576 28581 +5
=========================================
Files 2480 2482 +2
Lines 136995 137053 +58
Branches 16694 16713 +19
=========================================
+ Hits 94389 94436 +47
- Misses 35007 35009 +2
- Partials 7599 7608 +9
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
…che#18586) Co-authored-by: Y Ethan Guo <ethan.guoyihua@gmail.com>
Describe the issue this Pull Request addresses
Core problem: Hudi's BLOB model and Lance's blob encoding don't line up out of the box, and without a bridge Hudi BLOB tables can't use
hoodie.base.file.format=lance.Hudi BLOB is a per-row-tagged struct where each row independently picks INLINE (data in-place) or OUT_OF_LINE (reference to external file). Lance's blob encoding is schema-level: a column is either blob-encoded or it isn't.
Before this PR there was no translation layer, causing three failures on Lance-backed BLOB tables:
data(OUT_OF_LINE) or nullreference(INLINE) tripped Lance's strict child-nullability check.Summary and Changelog
This PR adds the Hudi-BLOB ↔ Lance translation layer and a read-mode config so BLOB works end-to-end on Lance.
Config
hoodie.read.blob.inline.mode(advanced, defaultCONTENT):dataread_blob()and compaction/mergetype=INLINEbut setsdata=nulland populatesreferencewith the Lance blob-stream position/size for user-level lazy resolutionread_blob()which reads with CONTENT modeThe internal compaction/merge reader (
HoodieSparkLanceReader) andread_blob()stay pinned to CONTENT regardless of this config.Write path
HoodieSparkLanceWriterenriches the Arrow schema: each BLOBdatachild becomesLargeBinary + lance-encoding:blob=true, and nullability is widened inside the BLOB subtree so Lance accepts null children on valid rows.Read path
SparkLanceReaderBaseresolves the config, widens nullability inside BLOB subtrees, and composes aBlobDescriptorTransformintoLanceRecordIteratorwhen in DESCRIPTOR mode.LanceRecordIteratoris a singlefinaliterator for all Lance reads. An optionalBlobDescriptorTransform(composition pattern) handles per-row BLOB rewriting in DESCRIPTOR mode.BlobDescriptorTransformreads type/nulls from theInternalRow(always accurate per-row); the Lance-specificBlobStructAccessorfor reading{position, size}on INLINE rows is obtained fresh per row from the column vectors to avoid stale references across batches. The originaltypeis preserved (INLINEstaysINLINE) so users see the storage mode they wrote.Per-file changelog
BlobDescriptorTransform.java(new) — Transform composed intoLanceRecordIterator. Static-final UTF8 constants,Set<Integer>for blob column indices. Type/null checks useInternalRow;BlobStructAccessorobtained fresh per INLINE row from column vectors. Explicit INLINE/OUT_OF_LINE type check (throws on unknown), defensive null-data handling.LanceRecordIterator.java— Singlefinaliterator for all Lance reads. Accepts optionalBlobDescriptorTransformvia constructor (composition, not inheritance). UsesrowIteratorfor batch iteration.LanceResourceCloser.java(renamed fromLanceCloseables.java) — Name clarified. Attaches suppressed exception when both Arrow and Lance close fail.SparkLanceReaderBase.scala— CreatesBlobDescriptorTransformwhen DESCRIPTOR + BLOB columns present. Idiomatic Scalacollectfor blob field names.HoodieSparkLanceWriter.java— Routes Arrow schema through blob annotation at writer-open time.HoodieSparkLanceReader.java— Pinned to CONTENT mode for compaction/merge.HoodieReaderConfig.java— Addshoodie.read.blob.inline.mode.TestLanceDataSource.scala— Parameterized tests (COW + MOR):testBlobInlineRoundTrip— CONTENT mode byte round-trip.testBlobOutOfLine— Parameterized on read mode (default/CONTENT/DESCRIPTOR); OUT_OF_LINE references survive unchanged;read_blob()resolves correct bytes via CONTENT mode.testBlobInlineDescriptorMode— DESCRIPTOR on INLINE rows: type stays INLINE, reference points at.lancefile,read_blob()reads original bytes via CONTENT mode.Impact
hoodie.base.file.format=lanceend-to-end.hoodie.read.blob.inline.mode— advanced, defaultCONTENT. No action required for existing users.lance-encoding:blob=trueon the BLOBdatachild. No impact on non-BLOB Lance files or Parquet tables.Object[]allocations per BLOB column per row;BlobStructAccessorreads are direct Arrow buffer accesses.Risk Level
Medium. Integration between two relatively young subsystems (Hudi BLOB + Lance). Mitigated by:
BlobReadMode.DESCRIPTOR && blobFieldNames.nonEmpty; non-BLOB tables traverse the pre-PR code path exactly.read_blob()SQL resolution.Documentation Update
None beyond the in-config documentation on
hoodie.read.blob.inline.mode.Contributor's checklist