feat(blob): Support hudi BLOB inline/outline with Lance file format with transform #18556
rahil-c wants to merge 5 commits into
Covers steps 2, 3, and 4 of VC's 1.2 BLOB release plan:
2. Wire INLINE writes - testEndToEndInline drives SparkRDDWriteClient
insert+upsert with INLINE Avro records (data=bytes, reference=null)
against a Parquet-backed Hudi table.
3. read_blob() reads inline values - testInlineBlobRoundTrip runs
SELECT read_blob(col) over an in-memory INLINE DataFrame and
verifies each payload round-trips byte-for-byte.
4. Mixed datasets - testMixedInlineAndOutOfLine builds 10 rows
alternating INLINE and OUT_OF_LINE, points the range rows at
one shared file, and asserts the returned sequence matches input
order (stronger than TestBatchedBlobReader.testMixedBlobTypes,
which orders by record_id before asserting).
testInlineOnHudiBackedTable mirrors the cherry-picked
testReadBlobOnHudiBackedTable (OUT_OF_LINE) but writes INLINE rows
via spark.write.format("hudi") + bulk_insert, reads back through
spark.read.format("hudi"), and resolves via read_blob() - exercises
the full write -> HoodieFileIndex-backed read -> SQL path that the
cherry-picked BatchedBlobReadExec serialization fix unblocks.
No production code changes. BatchedBlobReader already dispatches
INLINE rows into its inline branch (collectBatch field-0 check)
and preserves row order via sortBy(index) in processNextBatch.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
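The order-preservation claim above (rows processed in a reader-friendly order, then restored via a sort on the input index) can be sketched in isolation. This is a minimal illustration only, not the BatchedBlobReader code: `OrderPreservingRead`, `Request`, and `readInInputOrder` are hypothetical names, and the string `"blob@<offset>"` stands in for the bytes that would be read.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class OrderPreservingRead {

  /** One row's read request, tagged with its position in the input batch (hypothetical type). */
  static final class Request {
    final int index;    // position of the row in the input batch
    final long offset;  // where the row's bytes start in the shared file
    String result;      // filled in during the offset-ordered pass
    Request(int index, long offset) { this.index = index; this.offset = offset; }
  }

  /** Visits rows in ascending offset order, then returns results in the caller's order. */
  static List<String> readInInputOrder(long[] offsets, List<Long> visitLog) {
    List<Request> requests = new ArrayList<>();
    for (int i = 0; i < offsets.length; i++) {
      requests.add(new Request(i, offsets[i]));
    }
    // Pass 1: process in offset order, as a sequential reader would prefer.
    requests.sort(Comparator.comparingLong((Request r) -> r.offset));
    for (Request r : requests) {
      visitLog.add(r.offset);
      r.result = "blob@" + r.offset; // stand-in for the bytes read at this offset
    }
    // Pass 2: restore the input order before handing rows back.
    requests.sort(Comparator.comparingInt((Request r) -> r.index));
    List<String> out = new ArrayList<>();
    for (Request r : requests) {
      out.add(r.result);
    }
    return out;
  }

  public static void main(String[] args) {
    List<Long> visitLog = new ArrayList<>();
    List<String> out = readInInputOrder(new long[] {300L, 100L, 200L}, visitLog);
    System.out.println(visitLog); // [100, 200, 300]
    System.out.println(out);      // [blob@300, blob@100, blob@200]
  }
}
```

An assertion on the unsorted output sequence, as testMixedInlineAndOutOfLine does, would catch a missing second pass; sorting by record_id before asserting would not.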
hudi-agent left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for working on this! The PR wires up INLINE BLOB support for Lance by annotating the nested data column with lance-encoding:blob=true, remapping INLINE descriptors into Hudi OUT_OF_LINE references at read time, and switching the compaction/merge reader to CONTENT mode. There are a couple of correctness questions worth double-checking in the inline comments, mainly around the semantics of the Lance DESCRIPTOR position field being treated as an absolute file offset, and how the CONTENT-mode compaction read path interacts with the Hudi-shape projection. Please take a look at the inline comments; after that, this should be ready for a Hudi committer or PMC member to take it from here.
There are also a few minor naming nits in the blob read path: an unused pattern binding that will likely produce a compiler warning, a no-op alias, and some abbreviated locals in the descriptor-to-reference rewrite.
| if (bsa != null && !bsa.isNullAt(rowId)) { | ||
| Long p = bsa.getPosition(rowId); | ||
| Long s = bsa.getSize(rowId); | ||
| if (p != null) { |
🤖 Could you confirm that the position returned by BlobStructAccessor.getPosition(rowId) in DESCRIPTOR mode is an absolute offset into the .lance file (i.e. something BatchedBlobReader can pread at directly)? If Lance ever treats it as relative to an internal blob-stream region, the synthesized OUT_OF_LINE reference pointing at the raw .lance path would resolve to the wrong bytes. A unit-level byte round-trip for INLINE (not just the shape assertion in testBlobInlineDescriptorShape) would make this tight.
- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
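The byte round-trip suggested above can be sketched without any Lance or Hudi dependency: write a payload at a known offset, then read it back with a positional read at `(position, size)` and compare. This is only an illustration of what "absolute offset" would have to mean; `BlobRangeRoundTrip`, `readRange`, and `roundTrips` are hypothetical names, and the temp file merely stands in for a .lance file.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;

final class BlobRangeRoundTrip {

  /** Reads exactly {@code size} bytes starting at absolute file offset {@code position}. */
  static byte[] readRange(Path file, long position, int size) throws IOException {
    try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
      ByteBuffer buffer = ByteBuffer.allocate(size);
      long offset = position;
      while (buffer.hasRemaining()) {
        int n = channel.read(buffer, offset); // positional read; channel cursor is untouched
        if (n < 0) {
          throw new IOException("Unexpected EOF at offset " + offset);
        }
        offset += n;
      }
      return buffer.array();
    }
  }

  /** True if the bytes at (position, expected.length) equal the expected payload. */
  static boolean roundTrips(Path file, long position, byte[] expected) throws IOException {
    return Arrays.equals(expected, readRange(file, position, expected.length));
  }

  public static void main(String[] args) throws IOException {
    byte[] header = "HDR-".getBytes(StandardCharsets.UTF_8);          // stands in for non-blob content
    byte[] payload = "blob-payload".getBytes(StandardCharsets.UTF_8); // stands in for an INLINE blob
    Path file = Files.createTempFile("blob-range", ".bin");
    try {
      Files.write(file, header);
      Files.write(file, payload, StandardOpenOption.APPEND);
      // Absolute offset: the payload starts right after the header.
      System.out.println(roundTrips(file, header.length, payload)); // true
      // An offset relative to the blob region (0 here) resolves to the wrong bytes.
      System.out.println(roundTrips(file, 0L, payload));            // false
    } finally {
      Files.deleteIfExists(file);
    }
  }
}
```

A real test would take the position and size from the DESCRIPTOR-mode read instead of computing them, which is exactly the part this sketch cannot confirm.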
| ArrowReader arrowReader = lanceReader.readAll(columnNames, null, DEFAULT_BATCH_SIZE); | ||
| // Use CONTENT mode so compaction/merge paths get actual blob bytes | ||
| // (DESCRIPTOR mode only returns position+size, which can't be re-written). | ||
| FileReadOptions readOpts = FileReadOptions.builder().blobReadMode(BlobReadMode.CONTENT).build(); |
🤖 Switching to BlobReadMode.CONTENT here means the blob data child will come back populated (as LargeBinary, since the writer now annotates it that way). The non-blob LanceRecordIterator constructor is used on this path, so the projection maps the column directly via requestedSparkSchema. Have you verified that LanceArrowColumnVector.getBinary() transparently handles LargeBinary when the Hudi schema declares the data child as Binary? Would be worth a test that exercises compaction/merge on an INLINE-written table to confirm the bytes round-trip.
| // UnsafeProjection.copy() detaches the row from any batch-backed storage, | ||
| // so reusing the ColumnarRow view is safe here. | ||
| return projection.apply(currentBatch.getRow(rowId)).copy(); | ||
| } |
🤖 The non-blob branch calls projection.apply(currentBatch.getRow(rowId)).copy() where currentBatch.getRow(rowId) returns a ColumnarRow and projection.apply(...) produces an UnsafeRow whose variable-length regions may reference buffers in the underlying Arrow/columnar batch. Does UnsafeRow.copy() fully detach off-heap-backed strings/binaries here, or could later calls to currentBatch.close() invalidate the copied row's backing memory?
| arrowField | ||
| } else { | ||
| // Arrow list / fixed-size-list has a single child field ("element"). | ||
| val elementArrow = arrowChildren.get(0) |
🤖 For ArrayType(elementType: StructType, _), the synthetic StructField(elementArrow.getName, elementType, ...) loses any blob metadata that may have been on the element. That's fine today (BLOBs are only top-level per the reader side), but if future schemas ever allow Array<BLOB>, the element here wouldn't be detected as a blob since isBlobField requires the metadata marker. Probably worth a TODO or a line in the scaladoc noting the current top-level-only assumption.
| } | ||
| private def rewriteField(sparkField: StructField, arrowField: Field): Field = { | ||
| sparkField.dataType match { |
🤖 nit: st is bound here but never used — the body goes straight to rewriteBlobStruct(arrowField). Could you change it to case _: StructType if isBlobField(sparkField) => to silence the likely unused-variable warning?
| ColumnVector dataVec = blobChildDataVecs[columnIdx]; | ||
| if (dataVec instanceof LanceArrowColumnVector) { | ||
| BlobStructAccessor bsa = ((LanceArrowColumnVector) dataVec).getBlobStructAccessor(); | ||
| if (bsa != null && !bsa.isNullAt(rowId)) { |
🤖 nit: bsa, p, and s pack a fair amount of meaning into very short names — something like accessor, rawPosition, and rawSize would let a reader understand the block without having to look up the method signatures.
| // iteratorSchema already matches forceAllFieldsNullable(requestSchema); reuse it | ||
| // for the padding/cast projections. | ||
| val nullableRequestSchema = iteratorSchema |
🤖 nit: nullableRequestSchema is just an alias for iteratorSchema — have you considered using iteratorSchema directly in the generateNullPaddingProjection call below and dropping the extra name?
hudi-agent left a comment
CodeRabbit Walkthrough: The PR introduces comprehensive support for Apache Hudi BLOB and VECTOR/VARIANT data types alongside Lance columnar format integration. It adds distributed Docker compose configurations, refactors transaction rollback with multi-writer heartbeat management, implements blob batched I/O optimization, extends Spark SQL parsers for new type syntax, and updates Lance/Flink dependencies.
Sequence Diagram (CodeRabbit):
sequenceDiagram
    participant App as Spark App
    participant Exec as SparkPlan Executor
    participant Strat as BatchedBlobReaderStrategy
    participant Reader as BatchedBlobReader
    participant Storage as HoodieStorage
    participant File as Data Files
    App->>Exec: execute(BatchedBlobReadExec)
    Exec->>Strat: get child RDD with blob config
    Exec->>Reader: processRDD(rdd, schema, storageConf, maxGapBytes)
    loop per partition
        Reader->>Storage: open connection
        Reader->>File: list blob file references
        loop batch rows
            Reader->>Reader: classify blob offsets<br/>(inline vs out-of-line)
            Reader->>Reader: merge nearby ranges<br/>(within maxGapBytes)
        end
        loop per merged range
            Reader->>File: seek(offset)
            Reader->>File: read(mergedSize)
            Reader->>Reader: slice bytes per row
        end
        Reader->>Storage: close
    end
    Exec->>App: return Row iterator with blob bytes
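The "merge nearby ranges (within maxGapBytes)" step in the diagram above can be illustrated with a small range coalescer. This is a sketch under assumptions, not the BatchedBlobReader implementation: `RangeCoalescer`, `Range`, and `mergeRanges` are hypothetical names, and the rule used here (merge when the gap between consecutive ranges is at most `maxGapBytes`) is one plausible reading of the diagram.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class RangeCoalescer {

  /** A byte range [offset, offset + length) within one file (hypothetical type). */
  static final class Range {
    final long offset;
    final long length;
    Range(long offset, long length) { this.offset = offset; this.length = length; }
    long end() { return offset + length; }
    @Override public String toString() { return "[" + offset + ", " + end() + ")"; }
  }

  /** Merges ranges whose gap to the previous range is at most maxGapBytes. */
  static List<Range> mergeRanges(List<Range> ranges, long maxGapBytes) {
    List<Range> sorted = new ArrayList<>(ranges);
    sorted.sort(Comparator.comparingLong((Range r) -> r.offset));
    List<Range> merged = new ArrayList<>();
    for (Range next : sorted) {
      if (!merged.isEmpty()) {
        Range last = merged.get(merged.size() - 1);
        // Overlapping ranges give a negative gap and are merged as well.
        if (next.offset - last.end() <= maxGapBytes) {
          long newEnd = Math.max(last.end(), next.end());
          merged.set(merged.size() - 1, new Range(last.offset, newEnd - last.offset));
          continue;
        }
      }
      merged.add(next);
    }
    return merged;
  }

  public static void main(String[] args) {
    List<Range> rows = Arrays.asList(
        new Range(100, 10), new Range(0, 10), new Range(15, 5), new Range(112, 8));
    // Four per-row reads collapse into two larger reads when gaps of up to 5 bytes are tolerated.
    System.out.println(mergeRanges(rows, 5)); // [[0, 20), [100, 120)]
  }
}
```

The trade-off is the usual one for coalesced I/O: a larger gap tolerance means fewer seeks but more bytes read and discarded when slicing per row.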
Sequence Diagram (CodeRabbit):
sequenceDiagram
    participant Client as Hudi Client
    participant TxnMgr as Transaction Manager
    participant Timeline as Timeline
    participant Heartbeat as Heartbeat Service
    participant Table as HoodieTable
    Client->>TxnMgr: rollback(commit)
    alt multi-writer mode
        TxnMgr->>Timeline: check pending rollback
        alt pending exists
            TxnMgr->>Heartbeat: is heartbeat active?
            alt heartbeat expired
                TxnMgr->>Timeline: reuse & complete pending
            else heartbeat active
                TxnMgr->>Client: return false (blocked)
            end
        else pending absent
            TxnMgr->>Timeline: schedule new rollback
            TxnMgr->>Heartbeat: acquire heartbeat lock
        end
    else single-writer
        TxnMgr->>Timeline: schedule rollback directly
    end
    Heartbeat->>Table: execute rollback
    Table->>Table: delete inflight/data files
    Heartbeat->>Heartbeat: release heartbeat
    TxnMgr->>Client: return true (completed)
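The branching in the rollback diagram above reduces to a three-input decision. The sketch below encodes only what the diagram shows; `RollbackDecision`, `Action`, and `decide` are hypothetical names and do not correspond to Hudi classes or methods.

```java
final class RollbackDecision {

  /** Outcomes taken from the branches of the diagram (names are illustrative). */
  enum Action {
    SCHEDULE_DIRECTLY,              // single-writer: schedule rollback directly
    SCHEDULE_AND_ACQUIRE_HEARTBEAT, // multi-writer, no pending rollback
    REUSE_PENDING,                  // multi-writer, pending rollback whose heartbeat expired
    BLOCKED                         // multi-writer, pending rollback with an active heartbeat
  }

  static Action decide(boolean multiWriter, boolean pendingRollbackExists, boolean heartbeatActive) {
    if (!multiWriter) {
      return Action.SCHEDULE_DIRECTLY;
    }
    if (!pendingRollbackExists) {
      return Action.SCHEDULE_AND_ACQUIRE_HEARTBEAT;
    }
    // Another writer may still be executing the pending rollback; only take over if it has gone quiet.
    return heartbeatActive ? Action.BLOCKED : Action.REUSE_PENDING;
  }

  public static void main(String[] args) {
    System.out.println(decide(true, true, true));   // BLOCKED
    System.out.println(decide(true, true, false));  // REUSE_PENDING
    System.out.println(decide(true, false, false)); // SCHEDULE_AND_ACQUIRE_HEARTBEAT
    System.out.println(decide(false, false, false)); // SCHEDULE_DIRECTLY
  }
}
```

In the diagram, only the BLOCKED branch returns false to the client; the other three proceed to execute the rollback and release the heartbeat.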
CodeRabbit: hudi-agent#17 (review)
| refBuf[1] = position; | ||
| refBuf[2] = size; | ||
| refBuf[3] = Boolean.TRUE; | ||
| return new GenericInternalRow(new Object[] {outOfLineTypeUtf8, null, new GenericInternalRow(refBuf)}); |
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
rg -n -C2 'BlobReadMode|FileReadOptions|new LanceRecordIterator|blobFieldNames' \
--glob '*HoodieSparkLanceReader.java' \
--glob '*SparkLanceReaderBase.scala' \
--glob '*LanceRecordIterator.java'
Repository: hudi-agent/hudi
Length of output: 12868
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Get full context around LanceRecordIterator constructor calls
echo "=== SparkLanceReaderBase.scala constructor call ==="
sed -n '148,165p' hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala
echo -e "\n=== HoodieSparkLanceReader.java constructor call ==="
sed -n '195,205p' hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java
echo -e "\n=== Check SparkLanceReaderBase hasBlobColumns logic ==="
sed -n '120,160p' hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala
Repository: hudi-agent/hudi
Length of output: 3002
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Find hasBlobColumns definition and its computation
echo "=== hasBlobColumns definition and assignment ==="
rg -n 'hasBlobColumns.*=' hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala
echo -e "\n=== All LanceRecordIterator instantiation sites ==="
rg -n 'new LanceRecordIterator' --type java --type scala
echo -e "\n=== LanceRecordIterator constructor signatures ==="
rg -n 'def __init__|public LanceRecordIterator' hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceRecordIterator.java | head -20
Repository: hudi-agent/hudi
Length of output: 830
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Get the full signatures of both constructors
echo "=== First constructor (line 120) ==="
sed -n '120,145p' hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceRecordIterator.java
echo -e "\n=== Second constructor (line 143) ==="
sed -n '143,160p' hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceRecordIterator.java
Repository: hudi-agent/hudi
Length of output: 2352
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Verify the buildBlobOutputRow method logic more carefully
echo "=== buildBlobOutputRow full method ==="
sed -n '309,345p' hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceRecordIterator.java
echo -e "\n=== Where buildBlobOutputRow is called ==="
rg -n 'buildBlobOutputRow' hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceRecordIterator.java
echo -e "\n=== Context around the call site ==="
sed -n '272,285p' hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceRecordIterator.java
echo -e "\n=== Check blobChildDataVecs initialization ==="
sed -n '242,256p' hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceRecordIterator.java
Repository: hudi-agent/hudi
Length of output: 3272
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Verify that the class comment documents the requirement
echo "=== Class-level JavaDoc for LanceRecordIterator ==="
sed -n '60,75p' hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceRecordIterator.java
# Check if there's any runtime assertion or validation
echo -e "\n=== Search for assertions or validation in constructor/buildBlobOutputRow ==="
rg -n 'assert|throw|requireNonNull|checkState|requireNonEmpty' hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceRecordIterator.java | head -20
# Verify BlobReadMode contract from Lance library perspective
echo -e "\n=== Check if Lance itself enforces descriptor mode when descriptors requested ==="
rg -n 'BlobReadMode.DESCRIPTOR|BlobStructAccessor' hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/
# Check what types the INLINE case can produce
echo -e "\n=== Check if there's any type-checking or validation for dataVec ==="
sed -n '299,345p' hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceRecordIterator.java | grep -A50 'buildBlobOutputRow'
Repository: hudi-agent/hudi
Length of output: 4592
Add runtime validation to fail fast if BLOB descriptor assumptions are violated.
The buildBlobOutputRow method silently produces [path, offset=0, length=0] references when dataVec is not a LanceArrowColumnVector or getBlobStructAccessor() returns null. Although current call sites correctly pair BlobReadMode.DESCRIPTOR with non-empty blobFieldNames, this code lacks defensive validation of that contract. If a future call site misconfigures Lance or the library changes behavior, this silently converts a configuration error into corrupted blob references instead of failing fast.
Add a check after line 325 to validate that the descriptor is available, matching the pattern at line 306:
if (!(dataVec instanceof LanceArrowColumnVector) || bsa == null) {
  throw new HoodieException("Malformed Lance BLOB configuration at rowId=" + rowId
      + ": expected BlobStructAccessor but got " + dataVec.getClass().getSimpleName());
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceRecordIterator.java`
around lines 321 - 342, In buildBlobOutputRow inside LanceRecordIterator, add a
defensive runtime check that fails fast when the blob descriptor is missing:
after resolving dataVec and bsa (the BlobStructAccessor) validate that dataVec
is an instance of LanceArrowColumnVector and that bsa is not null; if the check
fails throw a HoodieException with a clear message including rowId and
dataVec.getClass().getSimpleName() to mirror the pattern used earlier (so you
don't silently emit [path,0,0] refs when descriptors are misconfigured).
— CodeRabbit (original) (source:comment#3133951920)
| case (sparkField, arrowField) => rewriteField(sparkField, arrowField) | ||
| } | ||
| new Schema(rebuilt.toSeq.asJava) | ||
| } |
🧩 Analysis chain
🏁 Script executed:
find . -type f -name "BlobLanceSchemaSupport.scala" 2>/dev/null
Repository: hudi-agent/hudi
Length of output: 160
🏁 Script executed:
cd hudi-client/hudi-spark-client && find . -type f -name "*.scala" | grep -i "blob\|lance" | head -20
Repository: hudi-agent/hudi
Length of output: 130
🏁 Script executed:
git ls-files | grep -i "BlobLanceSchemaSupport"
Repository: hudi-agent/hudi
Length of output: 158
🏁 Script executed:
rg "BlobLanceSchemaSupport" --type scala
Repository: hudi-agent/hudi
Length of output: 987
🏁 Script executed:
rg "annotateBlobFieldsForLance" --type scala
Repository: hudi-agent/hudi
Length of output: 249
🏁 Script executed:
git ls-files | grep "hudi-client/hudi-spark-client.*\.scala$" | head -30
Repository: hudi-agent/hudi
Length of output: 2835
🏁 Script executed:
git ls-files | grep -i avro | grep scala
Repository: hudi-agent/hudi
Length of output: 3495
🏁 Script executed:
ls -la hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/ 2>/dev/null || echo "Path not found, checking alternatives..."
Repository: hudi-agent/hudi
Length of output: 491
🏁 Script executed:
find hudi-client -type d -name avro 2>/dev/null
Repository: hudi-agent/hudi
Length of output: 195
🏁 Script executed:
rg "def annotateBlobFieldsForLance"
Repository: hudi-agent/hudi
Length of output: 249
🏁 Script executed:
git log --oneline --all | grep -i "blob\|lance" | head -10
Repository: hudi-agent/hudi
Length of output: 927
🏁 Script executed:
git diff --stat HEAD | grep -i scala
Repository: hudi-agent/hudi
Length of output: 41
🏁 Script executed:
rg "new Schema" --type scala | grep -v ".*/test/" | head -20
Repository: hudi-agent/hudi
Length of output: 838
🏁 Script executed:
git log -1 --name-only --pretty="" | grep "\.scala$" | head -20
Repository: hudi-agent/hudi
Length of output: 1902
🏁 Script executed:
git status --short | grep scala
Repository: hudi-agent/hudi
Length of output: 41
🏁 Script executed:
git ls-files -- "**BlobLanceSchemaSupport*"
Repository: hudi-agent/hudi
Length of output: 158
🏁 Script executed:
git show HEAD:hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/BlobLanceSchemaSupport.scala 2>&1 | head -100
Repository: hudi-agent/hudi
Length of output: 4425
🏁 Script executed:
find . -name "*.scala" -path "*avro*" -type f 2>/dev/null
Repository: hudi-agent/hudi
Length of output: 2816
🏁 Script executed:
git log --all --oneline -- "*BlobLanceSchemaSupport*" 2>/dev/null | head -5
Repository: hudi-agent/hudi
Length of output: 96
🏁 Script executed:
git ls-files "hudi-client/**/avro/*.scala" | head -20
Repository: hudi-agent/hudi
Length of output: 461
🏁 Script executed:
git diff HEAD -- "*.scala" | grep -A 10 -B 5 "annotateBlobFieldsForLance" || echo "Not in diff"
Repository: hudi-agent/hudi
Length of output: 70
🏁 Script executed:
find . -name "BlobLanceSchemaSupport.scala" -o -name "*BlobLance*" 2>/dev/null
Repository: hudi-agent/hudi
Length of output: 160
🏁 Script executed:
rg -t scala "rewriteField.*sparkField.*arrowField" | head -5
Repository: hudi-agent/hudi
Length of output: 416
🌐 Web query:
Apache Arrow Java Schema class constructor getCustomMetadata
💡 Result:
The Apache Arrow Java Schema class is org.apache.arrow.vector.types.pojo.Schema. It has constructors that support custom metadata (Map<String, String>).
Key constructors (from official cookbook and API docs):
- Schema(List fields) - no metadata
- Schema(List fields, Map<String, String> metadata) - with schema-level custom metadata
Example from official Apache Arrow Java Cookbook (https://arrow.apache.org/cookbook/java/schema.html):
Map<String, String> metadataSchema = new HashMap<>();
metadataSchema.put("Key-1", "Value-1");
Schema schemaPerson = new Schema(asList(name, document, age, points), metadataSchema);
System.out.print(schemaPerson); // Schema<...>(metadata: {Key-1=Value-1})
To retrieve custom metadata, use the getCustomMetadata method (confirmed in older API docs like Arrow Vectors 15.0.0 and 0.5.0 javadocs). Example usage:
Map<String, String> metadata = schema.getCustomMetadata();
Fields also support metadata via the FieldType constructor: new FieldType(nullable, type, dictionary, metadata). This support was added around Apache Arrow Java 0.4.0 (ARROW-1045, resolved).
Official current API docs: https://arrow.apache.org/java/current/reference/org.apache.arrow.vector/org/apache/arrow/vector/types/pojo/Schema.html (lists constructors including with metadata).
Source code is available in the apache/arrow-java GitHub repo: vector/src/main/java/org/apache/arrow/vector/types/pojo/Schema.java
Citations:
- 1: https://arrow.apache.org/java/current/reference/org.apache.arrow.vector/org/apache/arrow/vector/types/pojo/Schema.html
- 2: https://arrow.apache.org/cookbook/java/schema.html
- 3: https://www.javadoc.io/doc/org.apache.arrow/arrow-vector/15.0.0/org/apache/arrow/vector/types/pojo/Schema.html?is-external=true
- 4: [JAVA] Add support for custom metadata in org.apache.arrow.vector.types.pojo.* arrow#15414
- 5: https://arrow.apache.org/docs/15.0/java/vector_schema_root.html
- 6: https://arrow.apache.org/java/main/vector_schema_root.html
- 7: https://www.javadoc.io/static/org.apache.arrow/arrow-vector/0.5.0/index.html?org%2Fapache%2Farrow%2Fvector%2Ftypes%2Fpojo%2FSchema.html=
- 8: https://github.com/apache/arrow-java/blob/ed81e5981a2bee40584b3a411ed755cb4cc5b91f/vector/src/main/java/org/apache/arrow/vector/types/pojo/Field.java
- 9: https://arrow.apache.org/java/current/reference/org.apache.arrow.vector/org/apache/arrow/vector/types/pojo/class-use/Schema.html
- 10: https://github.com/apache/arrow-java/
🌐 Web query:
org.apache.arrow.vector.types.pojo.Schema constructor metadata fields
💡 Result:
The org.apache.arrow.vector.types.pojo.Schema class in Apache Arrow Java has constructors that take a list of Field objects representing the columns, and optionally a Map<String, String> for schema-level metadata (key-value pairs).
Primary constructors:
- Schema(List fields) - Creates a schema with the given fields and no metadata.
- Schema(List fields, Map<String, String> metadata) - Creates a schema with the given fields and schema-wide metadata.
Example from official docs:
Map<String, String> metadata = new HashMap<>();
metadata.put("K1", "V1");
Field a = new Field("A", FieldType.nullable(new ArrowType.Int(32, true)), null);
Field b = new Field("B", FieldType.nullable(new ArrowType.Utf8()), null);
Schema schema = new Schema(asList(a, b), metadata);
The metadata parameter allows attaching custom key-value annotations to the entire schema, separate from per-field metadata in FieldType. This is consistent across Arrow Java documentation, cookbook examples, and source usage patterns up to recent versions (e.g., 19.0.0-SNAPSHOT, 20.0.0-SNAPSHOT).
Citations:
- 1: https://arrow.apache.org/cookbook/dev/java/schema.html
- 2: https://arrow.apache.org/docs/22.0/java/vector_schema_root.html
- 3: https://arrow.apache.org/cookbook/java/schema.html
- 4: https://arrow.apache.org/java/main/vector_schema_root.html
- 5: https://arrow.apache.org/java/current/quickstartguide.html
🏁 Script executed:
rg "new Schema.*getCustomMetadata" --type scala
Repository: hudi-agent/hudi
Length of output: 41
🏁 Script executed:
rg "Schema.*metadata" --type scala | grep -i "new\|constructor" | head -10
Repository: hudi-agent/hudi
Length of output: 2313
🏁 Script executed:
rg "new Schema\(" --type scala -A 2 | head -30
Repository: hudi-agent/hudi
Length of output: 400
🏁 Script executed:
rg "\.getCustomMetadata" --type scala
Repository: hudi-agent/hudi
Length of output: 41
🏁 Script executed:
git ls-files | grep -i "pom\|gradle\|build" | xargs grep -l "arrow" 2>/dev/null | head -5
Repository: hudi-agent/hudi
Length of output: 566
🏁 Script executed:
rg "org.apache.arrow" pom.xml
Repository: hudi-agent/hudi
Length of output: 146
🏁 Script executed:
find . -name "pom.xml" -path "*/hudi-client/*" -exec grep -l "arrow" {} \;
Repository: hudi-agent/hudi
Length of output: 41
🏁 Script executed:
cat pom.xml 2>/dev/null | grep -A 10 -B 2 "arrow" | head -30
Repository: hudi-agent/hudi
Length of output: 1374
🏁 Script executed:
rg "arrow-vector|arrow-format" pom.xml | head -5
Repository: hudi-agent/hudi
Length of output: 104
🏁 Script executed:
git ls-files | xargs grep "org.apache.arrow" 2>/dev/null | grep -v ".git" | head -20
Repository: hudi-agent/hudi
Length of output: 2626
🏁 Script executed:
git ls-files | grep -i avro | xargs head -50 2>/dev/null | grep -i "import.*arrow\|import.*schema"
Repository: hudi-agent/hudi
Length of output: 6525
🏁 Script executed:
python3 << 'EOF'
# Check Apache Arrow Java Schema API documentation reference
# Schema class from org.apache.arrow.vector.types.pojo.Schema
# Arrow Schema constructors (as of Arrow 8.x-12.x):
# 1. Schema(List<Field> fields)
# 2. Schema(List<Field> fields, Map<String, String> metadata)
# 3. Schema(List<Field> fields, Map<String, String> metadata, Map<String, String> customMetadata)
# The suggested fix uses: new Schema(rebuilt.toSeq.asJava, arrowSchema.getCustomMetadata())
# Key question: Does Schema have a 2-argument constructor with (List<Field>, Map<String,String>)?
# And does getCustomMetadata() exist and return the right type?
print("Arrow Schema API Analysis:")
print("=" * 50)
print("The suggested fix: new Schema(rebuilt.toSeq.asJava, arrowSchema.getCustomMetadata())")
print()
print("Key verification points:")
print("1. Schema(List<Field>, Map) constructor exists: YES (standard in Arrow)")
print("2. getCustomMetadata() method exists: UNCERTAIN - need to verify")
print("3. Metadata preservation concern: VALID - creating new Schema without metadata loses it")
print()
print("Alternative approaches that might work:")
print("- Schema.builder().setFields(fields).setCustomMetadata(metadata).build()")
print("- Preserving both metadata AND customMetadata if needed")
EOF
Repository: hudi-agent/hudi
Length of output: 642
🏁 Script executed:
rg "arrowSchema\.(get|set)" --type scala | grep -i "metadata\|custom" | head -10
Repository: hudi-agent/hudi
Length of output: 41
🏁 Script executed:
rg "\.getMetadata\(\)" --type scala | head -10
Repository: hudi-agent/hudi
Length of output: 41
🏁 Script executed:
rg "Schema.*getMetadata\|getCustomMetadata" --type scala -B 2 -A 2
Repository: hudi-agent/hudi
Length of output: 41
🏁 Script executed:
git log --all --oneline --name-status | grep "BlobLance\|annotateBlobFields" | head -10
Repository: hudi-agent/hudi
Length of output: 160
🏁 Script executed:
git show --name-status HEAD | grep -i scala | head -20
Repository: hudi-agent/hudi
Length of output: 1956
🏁 Script executed:
git log --all --pretty="" --name-only -20 | grep "\.scala$" | sort -u | head -20
Repository: hudi-agent/hudi
Length of output: 1902
🏁 Script executed:
git branch -a
Repository: hudi-agent/hudi
Length of output: 162
🏁 Script executed:
git log --all --oneline | head -10
Repository: hudi-agent/hudi
Length of output: 923
🏁 Script executed:
python3 << 'EOF'
import json
# Verify Arrow Schema behavior regarding metadata
arrow_schema_info = {
"concern": "Creating new Schema(List<Field>) drops existing metadata",
"validity": "VALID",
"reasoning": [
"Apache Arrow's Schema class stores metadata in two ways:",
"1. metadata: Map<String, String> - general schema metadata",
"2. customMetadata: Map<String, String> - additional custom metadata",
"",
"When calling new Schema(List<Field>) constructor:",
"- Only fields are copied",
"- Both metadata maps are NOT copied (default empty)",
"",
"This is a legitimate concern in data pipeline code"
],
"suggested_fix_validity": {
"fix": "new Schema(rebuilt.toSeq.asJava, arrowSchema.getCustomMetadata())",
"assessment": "INCOMPLETE",
"issues": [
"Suggested fix only preserves customMetadata",
"Does not preserve regular metadata (if present)",
"getCustomMetadata() returns Map<String, String>",
"Need to verify exact method signature in Arrow version used"
]
},
"better_fix": "new Schema(rebuilt.toSeq.asJava, arrowSchema.getMetadata(), arrowSchema.getCustomMetadata())",
"notes": "Arrow versions vary; need to check which constructor signature is available"
}
print(json.dumps(arrow_schema_info, indent=2))
EOF
Repository: hudi-agent/hudi
Length of output: 1211
🏁 Script executed:
rg "arrow" --type gradle --type toml --type xml | grep -i "version\|dependency" | head -20
Repository: hudi-agent/hudi
Length of output: 212
🏁 Script executed:
git ls-files | xargs grep -h "org.apache.arrow.*arrow" 2>/dev/null | grep -v "^Binary" | sort -u
Repository: hudi-agent/hudi
Length of output: 244
🏁 Script executed:
rg "BlobLanceSchemaSupport\|annotateBlobFieldsForLance" --type scala --type java
Repository: hudi-agent/hudi
Length of output: 41
🏁 Script executed:
wc -l hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/BlobLanceSchemaSupport.scala
Repository: hudi-agent/hudi
Length of output: 162
🏁 Script executed:
cat -n hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/BlobLanceSchemaSupport.scala
Repository: hudi-agent/hudi
Length of output: 7786
🏁 Script executed:
python3 << 'EOF'
# Verify Arrow 18.3.0 Schema API for metadata preservation
import json
arrow_18_3_0_api = {
"version": "18.3.0",
"schema_class": "org.apache.arrow.vector.types.pojo.Schema",
"constructors": [
{
"signature": "Schema(List<Field> fields)",
"metadata_preserved": False,
"behavior": "Creates schema with only fields, metadata is empty"
},
{
"signature": "Schema(List<Field> fields, Map<String, String> metadata)",
"metadata_preserved": True,
"behavior": "Creates schema with fields and custom metadata"
}
],
"metadata_access_methods": [
{
"method": "getCustomMetadata()",
"returns": "Map<String, String>",
"available_since": "Arrow 0.4.0+",
"note": "In 18.3.0, this is the primary metadata access method"
},
{
"method": "getMetadata()",
"available_in_18_3_0": "Likely available but may be alias/deprecated",
"note": "Need to verify exact implementation"
}
],
"suggested_fix_analysis": {
"fix": "new Schema(rebuilt.toSeq.asJava, arrowSchema.getCustomMetadata())",
"validity": "CORRECT for Arrow 18.3.0",
"reasoning": "Uses 2-argument constructor with proper metadata method",
"concern": "None identified for this version"
}
}
print(json.dumps(arrow_18_3_0_api, indent=2))
EOF
Repository: hudi-agent/hudi
Length of output: 1230
🏁 Script executed:
rg "Schema.*getMetadata\|Schema.*custom" --type scala --type java -B 2 -A 2 | head -40
Repository: hudi-agent/hudi
Length of output: 41
🏁 Script executed:
rg "arrowSchema\." hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/BlobLanceSchemaSupport.scala
Repository: hudi-agent/hudi
Length of output: 110
🏁 Script executed:
sed -n '65,85p' hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/BlobLanceSchemaSupport.scala
Repository: hudi-agent/hudi
Length of output: 1080
🏁 Script executed:
rg "getMetadata\|getCustomMetadata" --type scala --type java hudi-client/hudi-spark-client/ | head -10
Repository: hudi-agent/hudi
Length of output: 41
🏁 Script executed:
sed -n '1,40p' hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/BlobLanceSchemaSupport.scala
Repository: hudi-agent/hudi
Length of output: 1819
Preserve Arrow schema metadata when rebuilding the schema.
Line 77 creates a new Schema instance with only the rebuilt fields, discarding schema-level metadata from arrowSchema. Any metadata attached to the original schema will be silently lost, which can cause issues with Lance's encoding and other downstream consumers.
Suggested fix
- new Schema(rebuilt.toSeq.asJava)
+ new Schema(rebuilt.toSeq.asJava, arrowSchema.getCustomMetadata())
📝 Committable suggestion
}
def annotateBlobFieldsForLance(sparkSchema: StructType, arrowSchema: Schema): Schema = {
  val arrowFields = arrowSchema.getFields.asScala
  require(sparkSchema.fields.length == arrowFields.size,
    s"Spark/Arrow top-level field count mismatch: " +
      s"${sparkSchema.fields.length} vs ${arrowFields.size}")
  val rebuilt = sparkSchema.fields.zip(arrowFields).map {
    case (sparkField, arrowField) => rewriteField(sparkField, arrowField)
  }
  new Schema(rebuilt.toSeq.asJava, arrowSchema.getCustomMetadata())
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/BlobLanceSchemaSupport.scala`
around lines 69 - 78, annotateBlobFieldsForLance is rebuilding an Arrow Schema
but discards schema-level metadata by calling new Schema(rebuilt.toSeq.asJava)
using only fields; preserve the original arrowSchema's metadata when
constructing the new Schema (use the Arrow Schema constructor that accepts
fields plus the schema metadata from arrowSchema, e.g., pass
arrowSchema.getCustomMetadata / arrowSchema.getSchemaMetadata or the appropriate
metadata accessor alongside rebuilt) so the rebuilt Schema retains all original
schema-level metadata.
— CodeRabbit (original) (source:comment#3133951926)
Closing this in favor of #18575
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #18556 +/- ##
==========================================
Coverage 68.87% 68.88%
- Complexity 28482 28506 +24
==========================================
Files 2478 2479 +1
Lines 136699 136835 +136
Branches 16634 16671 +37
==========================================
+ Hits 94150 94256 +106
- Misses 34980 34990 +10
- Partials 7569 7589 +20
Describe the issue this Pull Request addresses
Core problem: Hudi's BLOB model and Lance's blob model are not the same, and nothing was translating between them.
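Hudi defines BLOB as a per-row-tagged struct where each row independently picks one of two storage modes:
- INLINE: the bytes are stored in `data`; `reference` is null.
- OUT_OF_LINE: `data` is null; `reference` points at a byte range in an external file.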
A single Hudi BLOB column can hold any mix of INLINE and OUT_OF_LINE rows in the same file (see `TestBlobSupport.testMixedInlineAndOutOfLine`).

Lance's blob storage is schema-level, not row-level. A column is either declared blob-encoded (via the Arrow metadata key `lance-encoding:blob=true` on a `LargeBinary` column) and routes bytes to a dedicated blob stream, or it isn't. On read, a blob-encoded column can be opened in:
- `BlobReadMode.CONTENT`: returns raw bytes.
- `BlobReadMode.DESCRIPTOR`: returns `{position, size}` descriptor structs pointing into the blob stream (used when the caller wants to defer the byte read).

These two models overlap but don't line up. Before this PR, nothing connected them, which produced three concrete failures on Lance-backed Hudi BLOB tables:
- `HoodieSparkLanceWriter` handed Lance a plain `Binary` schema for the BLOB `data` child, so INLINE bytes went through Lance's default column path. `read_blob()` later could not locate them in the expected descriptor form.
- `data` column came back as `Struct<position, size>`, but the downstream `UnsafeProjection` and `HoodieFileIndex` expect the Hudi shape `{type, data, reference}`. The projection would either fail or produce rows with the wrong column types.
- `data` (for OUT_OF_LINE) or null `reference` (for INLINE), and their nested children follow suit. Writes failed at validation.

This PR adds the translation layer so Hudi BLOB works end-to-end on Lance: writes, reads via `spark.read.format("hudi")`, and SQL `read_blob(col)`.

Summary and Changelog
What the translation looks like
Example rows and where the bytes physically live:
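| type | data | reference | Where the bytes physically live |
|---|---|---|---|
| INLINE | 0xDEADBEEF… | null | `{position, size}` in `data` slot on disk |
| OUT_OF_LINE | null | `{s3://a.jpg, 0, 2048, false}` | `data` null; `reference` populated as a regular struct |

Write path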
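A minimal sketch of the write-side schema annotation described in this PR, written with plain Java stand-ins instead of the real Arrow classes so it runs without dependencies. `FieldSketch`, `BlobAnnotationSketch`, and the `hudi_type=BLOB` marker are hypothetical names for illustration; the PR itself detects BLOB columns through `TYPE_METADATA_FIELD`, whose actual key and value are not shown here. Whether the top-level BLOB field's own nullability is widened is also an assumption; this sketch leaves it unchanged.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Stand-in for an Arrow field; NOT the real Arrow API. */
final class FieldSketch {
    final String name;
    final String type;                  // e.g. "Struct", "Binary", "LargeBinary", "Utf8"
    final boolean nullable;
    final Map<String, String> metadata;
    final List<FieldSketch> children;

    FieldSketch(String name, String type, boolean nullable,
                Map<String, String> metadata, List<FieldSketch> children) {
        this.name = name;
        this.type = type;
        this.nullable = nullable;
        this.metadata = metadata;
        this.children = children;
    }
}

final class BlobAnnotationSketch {
    // Hypothetical marker standing in for the TYPE_METADATA_FIELD check.
    static final String BLOB_MARKER_KEY = "hudi_type";
    static final String BLOB_MARKER_VALUE = "BLOB";
    // Arrow metadata key named in the PR description.
    static final String LANCE_BLOB_KEY = "lance-encoding:blob";

    static boolean isBlobField(FieldSketch f) {
        return BLOB_MARKER_VALUE.equals(f.metadata.get(BLOB_MARKER_KEY));
    }

    /** Rewrites top-level BLOB fields; every other field is returned as the same object. */
    static List<FieldSketch> annotate(List<FieldSketch> fields) {
        List<FieldSketch> out = new ArrayList<>();
        for (FieldSketch f : fields) {
            out.add(isBlobField(f) ? rewriteBlobSubtree(f) : f);
        }
        return out;
    }

    /** Blob-encodes the direct "data" child and widens nullability below the BLOB field. */
    private static FieldSketch rewriteBlobSubtree(FieldSketch blob) {
        List<FieldSketch> children = new ArrayList<>();
        for (FieldSketch c : blob.children) {
            if (c.name.equals("data")) {
                Map<String, String> md = new HashMap<>(c.metadata);
                md.put(LANCE_BLOB_KEY, "true");
                children.add(new FieldSketch("data", "LargeBinary", true, md, c.children));
            } else {
                children.add(widen(c));
            }
        }
        return new FieldSketch(blob.name, blob.type, blob.nullable, blob.metadata, children);
    }

    /** Marks a field and all of its descendants nullable. */
    private static FieldSketch widen(FieldSketch f) {
        List<FieldSketch> children = new ArrayList<>();
        for (FieldSketch c : f.children) {
            children.add(widen(c));
        }
        return new FieldSketch(f.name, f.type, true, f.metadata, children);
    }

    public static void main(String[] args) {
        FieldSketch data = new FieldSketch("data", "Binary", false, Map.of(), List.of());
        FieldSketch blob = new FieldSketch("payload", "Struct", false,
                Map.of(BLOB_MARKER_KEY, BLOB_MARKER_VALUE), List.of(data));
        FieldSketch rewritten = annotate(List.of(blob)).get(0).children.get(0);
        System.out.println(rewritten.type + " " + rewritten.metadata);
    }
}
```

Because non-BLOB fields are returned unchanged, a schema without BLOB columns passes through as a no-op, which matches the behaviour the changelog describes for `BlobLanceSchemaSupport`.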
Read path
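A minimal sketch of the read-side remap, again with plain Java stand-ins rather than the real Lance, Arrow, or Hudi types. It assumes each stored row carries its `type` tag plus either a Lance `{position, size}` descriptor (INLINE) or a Hudi reference (OUT_OF_LINE), and that the descriptor `position` can be used as an offset into the `.lance` file; the review on this PR flags that last point as something to verify. `StoredBlobSketch`, `BlobRowSketch`, and `BlobReadIteratorSketch` are hypothetical names, and `IllegalStateException` stands in for `HoodieException`.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** One row as stored in a batch read in descriptor mode (stand-in, not a Lance type). */
final class StoredBlobSketch {
    final String type;      // "INLINE" or "OUT_OF_LINE"
    final String refPath;   // set for OUT_OF_LINE rows only
    final long start;       // descriptor position (INLINE) or reference offset (OUT_OF_LINE)
    final long length;      // descriptor size (INLINE) or reference length (OUT_OF_LINE)

    StoredBlobSketch(String type, String refPath, long start, long length) {
        this.type = type;
        this.refPath = refPath;
        this.start = start;
        this.length = length;
    }

    static StoredBlobSketch inline(long position, long size) {
        return new StoredBlobSketch("INLINE", null, position, size);
    }

    static StoredBlobSketch outOfLine(String path, long offset, long length) {
        return new StoredBlobSketch("OUT_OF_LINE", path, offset, length);
    }
}

/** Hudi-shaped result {type, data, reference}; data is always null here, so it is omitted. */
final class BlobRowSketch {
    final String type;
    final String externalPath;
    final long offset;
    final long length;

    BlobRowSketch(String type, String externalPath, long offset, long length) {
        this.type = type;
        this.externalPath = externalPath;
        this.offset = offset;
        this.length = length;
    }
}

final class BlobReadIteratorSketch implements Iterator<BlobRowSketch> {
    private final Iterator<List<StoredBlobSketch>> batches;
    private final String lanceFilePath;
    private List<StoredBlobSketch> current = List.of();
    private int index = 0;

    BlobReadIteratorSketch(Iterator<List<StoredBlobSketch>> batches, String lanceFilePath) {
        this.batches = batches;
        this.lanceFilePath = lanceFilePath;
    }

    private boolean loadNextBatch() {
        if (!batches.hasNext()) {
            return false;
        }
        current = batches.next();
        index = 0;
        return true;
    }

    @Override
    public boolean hasNext() {
        if (index < current.size()) {
            return true;
        }
        // Keep loading: a zero-row batch must not end the iteration.
        while (loadNextBatch()) {
            if (!current.isEmpty()) {
                return true;
            }
        }
        return false;
    }

    @Override
    public BlobRowSketch next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        StoredBlobSketch s = current.get(index++);
        if ("INLINE".equals(s.type)) {
            // Remap the descriptor into an OUT_OF_LINE reference pointing at the Lance file.
            return new BlobRowSketch("OUT_OF_LINE", lanceFilePath, s.start, s.length);
        }
        if ("OUT_OF_LINE".equals(s.type)) {
            // The stored reference is passed through unchanged.
            return new BlobRowSketch("OUT_OF_LINE", s.refPath, s.start, s.length);
        }
        // Fail loudly instead of emitting a zero-byte reference.
        throw new IllegalStateException("Unexpected BLOB type: " + s.type);
    }

    public static void main(String[] args) {
        List<List<StoredBlobSketch>> batches = List.of(
                List.<StoredBlobSketch>of(),
                List.of(StoredBlobSketch.inline(128L, 4L)));
        BlobReadIteratorSketch it = new BlobReadIteratorSketch(batches.iterator(), "f1.lance");
        while (it.hasNext()) {
            BlobRowSketch r = it.next();
            System.out.println(r.type + " " + r.externalPath + " " + r.offset + " " + r.length);
        }
    }
}
```

Every row leaves the iterator in the Hudi shape, so code downstream of it never handles a Lance descriptor directly.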
Key invariant: the Spark projection only ever sees the Hudi shape. Lance's descriptor shape is contained entirely inside `LanceRecordIterator`; nothing downstream has to know Lance's storage format.

Changelog
- `BlobLanceSchemaSupport.scala` (new): purely functional Arrow schema rewrite. Finds Hudi BLOB columns via `isBlobField` (checks the `TYPE_METADATA_FIELD` marker) and rebuilds the nested `data` child as `LargeBinary + {lance-encoding:blob=true}`. Recursively widens nullability within BLOB subtrees. No-op for schemas without BLOB columns. Has `require(...)` guards against Spark↔Arrow field-count drift.
- `HoodieSparkLanceWriter.java`: routes the base Arrow schema through `BlobLanceSchemaSupport.annotateBlobFieldsForLance(...)` at writer-open time.
- `HoodieSparkLanceReader.java`: compaction/merge internal reader switched to `BlobReadMode.CONTENT` so it materializes bytes (not descriptors).
- `SparkLanceReaderBase.scala`: detects BLOB columns on the request schema and opens Lance in `BlobReadMode.DESCRIPTOR`. Nullability widening scoped to BLOB subtrees only (non-BLOB fields keep their original contracts).
- `LanceRecordIterator.java`: new blob-aware constructor `(allocator, lanceReader, arrowReader, outputSchema, path, blobFieldNames)`. For blob columns, caches Arrow child vectors per batch and reuses `outputRowBuffer` + per-column `refRowBuffers` so the per-row hot path is ~2 allocations instead of ~4. Also picks up two correctness fixes independent of the BLOB feature:
  - `hasNext()` now skips empty Arrow batches via `while (loadNextBatch())` instead of terminating on the first zero-row batch (latent bug pre-PR).
  - `type` now throws `HoodieException` instead of silently remapping to a zero-byte reference.
- `TestLanceDataSource.scala`: two new parameterized tests (COW + MOR):
  - `testBlobInlineDescriptorShape`: writes INLINE rows via `bulk_insert` and asserts the Hudi OUT_OF_LINE shape round-trips back (`type=OUT_OF_LINE`, `data=null`, `reference.external_path` ending in `.lance` with `length>0`). Also checks the Lance file carries `lance-encoding:blob=true` on the `data` child.
  - `testBlobOutline`: writes four OUT_OF_LINE rows pointing at two external `.bin` files with varying offsets/lengths; reads back through `spark.read.format("hudi")` and byte-compares `read_blob(payload)` against the expected slice of each external file.

Impact
- `hoodie.base.file.format=lance`. Existing Parquet-backed BLOB tables and non-BLOB Lance tables are unchanged.
- `lance-encoding:blob=true` on the `data` column. No impact on Lance files without BLOB columns.
- `GenericInternalRow` allocations plus an array lookup per blob column, amortized across the batched Arrow read.

Risk Level
Medium. Integration between two young subsystems (Hudi BLOB + Lance file format). Mitigated by:
- `read_blob()`).
- `isBlobField`/`blobFieldNames`; non-BLOB workloads see the pre-PR code path.

Documentation Update

None. No new configs or public APIs. Hudi BLOB is already documented; this PR just extends coverage to `hoodie.base.file.format=lance`.

Contributor's checklist