feat(lance): support simplified path for lance blob inline reading#18575

Merged
yihua merged 10 commits into
apache:masterfrom
rahil-c:rahil/blob-lance-simplify
Apr 25, 2026
Conversation

@rahil-c
Collaborator

@rahil-c rahil-c commented Apr 23, 2026

Describe the issue this Pull Request addresses

Core problem: Hudi's BLOB model and Lance's blob encoding don't line up out of the box, and without a bridge Hudi BLOB tables can't use hoodie.base.file.format=lance.

  • Hudi BLOB is a per-row-tagged struct: every row independently picks between two storage modes.
    Hudi BLOB row  =  { type:      "INLINE" | "OUT_OF_LINE",
                        data:      Binary  | null,                        // INLINE payload
                        reference: {path, offset, length, managed} | null } // OUT_OF_LINE pointer
    
  • Lance's blob encoding is schema-level: a column is either declared blob-encoded (via the Arrow metadata key lance-encoding:blob=true on a LargeBinary column, routing bytes into a dedicated blob stream) or it isn't.
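The per-row tagging described above can be sketched as a small value type. This is a minimal illustration only: `BlobRowSketch`, its factory methods, and `isWellFormed` are hypothetical names for this example, not Hudi classes, and the field names simply mirror the struct shown in the description.

```java
import java.util.Objects;

/** Hypothetical model of one Hudi BLOB row; illustrative only, not Hudi's API. */
final class BlobRowSketch {
    enum StorageType { INLINE, OUT_OF_LINE }

    /** OUT_OF_LINE pointer, mirroring {path, offset, length, managed}. */
    static final class Reference {
        final String path;
        final long offset;
        final long length;
        final boolean managed;

        Reference(String path, long offset, long length, boolean managed) {
            this.path = Objects.requireNonNull(path, "path");
            this.offset = offset;
            this.length = length;
            this.managed = managed;
        }
    }

    final StorageType type;
    final byte[] data;          // populated only for INLINE rows
    final Reference reference;  // populated only for OUT_OF_LINE rows

    private BlobRowSketch(StorageType type, byte[] data, Reference reference) {
        this.type = type;
        this.data = data;
        this.reference = reference;
    }

    static BlobRowSketch inline(byte[] bytes) {
        return new BlobRowSketch(StorageType.INLINE, Objects.requireNonNull(bytes, "bytes"), null);
    }

    static BlobRowSketch outOfLine(Reference ref) {
        return new BlobRowSketch(StorageType.OUT_OF_LINE, null, Objects.requireNonNull(ref, "ref"));
    }

    /** True when exactly the child that matches the tag is populated. */
    boolean isWellFormed() {
        return type == StorageType.INLINE
            ? data != null && reference == null
            : data == null && reference != null;
    }
}
```

Because the tag lives on each row, two rows in the same column can legitimately have opposite children set to null, which is what a schema-level encoding has to tolerate.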

Before this PR, nothing connected the two:

  1. Writer didn't activate Lance's blob encoding. HoodieSparkLanceWriter handed Lance a plain Binary for the BLOB data child, so INLINE bytes went through Lance's default column path. The dedicated blob-stream optimization that's the whole point of picking Lance was never engaged.
  2. Lance rejected valid Hudi BLOB rows at validation. Hudi BLOB rows legitimately have null data (OUT_OF_LINE) or null reference (INLINE), and their nested children follow suit. Lance's child-nullability check rejected them.
  3. Nothing on the read side declared how INLINE bytes should come back. Future work will want to defer inline-byte reads (return {position, size} instead of materializing bytes) — but there was no config surface to express that intent.

Summary and Changelog

Design (simplified)

Hudi BLOB row shape                                Lance on-disk layout
-------------------                                --------------------
┌─────────────────────────────────────┐           ┌──────────────────────────────────┐
│ type:      "INLINE" | "OUT_OF_LINE" │           │ main columns:  type, reference   │
│ data:      Binary | null            │  ───▶     │ blob stream:   INLINE bytes      │
│ reference: {path,off,len,mgd}| null │           │   (data child carries key        │
└─────────────────────────────────────┘           │    `lance-encoding:blob = true`) │
                                                   └──────────────────────────────────┘

Per-row example:

| row | type | data | reference | in Lance |
|---|---|---|---|---|
| #1 INLINE | INLINE | 0xDEADBEEF… | null | bytes in blob stream; data references those bytes |
| #2 OUT_OF_LINE | OUT_OF_LINE | null | {s3://a.jpg, 0, 2048, false} | data null; reference populated as a regular struct |
| #3 mixed-batch | either | either | either | same file, same schema, handled per-row |

Write path

spark.write.format("hudi").save(...)
               │
               ▼
  HoodieSparkLanceWriter constructor
     arrowSchema = LanceArrowUtils.toArrowSchema(sparkSchema)
                     │
                     ▼
     BlobLanceSchemaSupport.annotateBlobFieldsForLance(sparkSchema, arrowSchema)
        • find every Hudi BLOB column (isBlobField)
        • rewrite its nested `data` child as LargeBinary + {lance-encoding:blob=true}
        • force every field inside the BLOB subtree to nullable so Lance accepts null `data`
          (OUT_OF_LINE) and null `reference` (INLINE)
                     │
                     ▼
     Lance's blob writer activates for the `data` column
        INLINE rows:      bytes go to the blob stream
        OUT_OF_LINE rows: data is null, reference populated as a regular struct
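The write-path rewrite above can be sketched on a toy field tree. This is a hedged illustration, not the PR's `BlobLanceSchemaSupport`: the `Field` class here is a hypothetical stand-in for an Arrow field (type names are plain strings), and making the BLOB struct itself nullable is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for an Arrow field tree; not the Arrow or Hudi API. */
final class BlobAnnotateSketch {
    static final String BLOB_KEY = "lance-encoding:blob";

    static final class Field {
        final String name;
        final String type;
        final boolean nullable;
        final Map<String, String> metadata;
        final List<Field> children;

        Field(String name, String type, boolean nullable,
              Map<String, String> metadata, List<Field> children) {
            this.name = name;
            this.type = type;
            this.nullable = nullable;
            this.metadata = Collections.unmodifiableMap(new HashMap<>(metadata));
            this.children = Collections.unmodifiableList(new ArrayList<>(children));
        }

        static Field of(String name, String type, boolean nullable, Field... children) {
            return new Field(name, type, nullable,
                Collections.<String, String>emptyMap(), Arrays.asList(children));
        }
    }

    /** Rewrite one BLOB struct: widen every descendant, then re-type and tag `data`. */
    static Field annotateBlob(Field blob) {
        List<Field> kids = new ArrayList<>();
        for (Field child : blob.children) {
            Field widened = forceNullable(child);
            if ("data".equals(child.name)) {
                Map<String, String> meta = new HashMap<>(widened.metadata);
                meta.put(BLOB_KEY, "true");
                widened = new Field(widened.name, "LargeBinary", true, meta, widened.children);
            }
            kids.add(widened);
        }
        // Assumption: the BLOB struct itself is also made nullable.
        return new Field(blob.name, blob.type, true, blob.metadata, kids);
    }

    /** Rebuild a subtree with every field marked nullable. */
    static Field forceNullable(Field f) {
        List<Field> kids = new ArrayList<>();
        for (Field c : f.children) {
            kids.add(forceNullable(c));
        }
        return new Field(f.name, f.type, true, f.metadata, kids);
    }
}
```

The tree is rebuilt rather than mutated, which matches how immutable schema objects are usually handled; a schema with no BLOB columns would never reach `annotateBlob` and so stays untouched.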

Read path

spark.read.format("hudi").load(tablePath)
               │
               ▼
  SparkLanceReaderBase.read()
     • resolveBlobReadMode(storageConf) reads `hoodie.read.blob.inline.mode`
       (today: CONTENT — the only valid value; DESCRIPTOR reserved for a future PR)
     • widenBlobSubtreeNullability(requestSchema): flips fields inside BLOB subtrees to
       nullable=true. Lance returns a null BLOB sub-struct as a non-null parent with null
       children; widening prevents the codegen UnsafeProjection from NPE-ing writing those
       null leaves into slots the Hudi schema declared non-nullable. Scoped to BLOB subtrees
       so non-BLOB contracts are untouched.
     • opens Lance with FileReadOptions(BlobReadMode.CONTENT); blob-encoded columns
       materialize as raw bytes, already matching Hudi's BLOB row shape.
               │
               ▼
  LanceRecordIterator  (unchanged pass-through; no BLOB-aware code)
               │
               ▼
  projection → padding → cast → UnsafeRow
               │
               ▼
  read_blob(payload)
     INLINE      → return `data` bytes directly
     OUT_OF_LINE → pread(external_path, offset, length)
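The final `read_blob` dispatch in the diagram can be sketched as follows. This is a simplified illustration under stated assumptions: `ReadBlobSketch` and `readBlob` are hypothetical names, the external path is a local file rather than a Hudi storage path, and the positional `FileChannel.read` stands in for `pread`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical sketch of resolving a BLOB value; not Hudi's read_blob implementation. */
final class ReadBlobSketch {

    /** INLINE returns `data` directly; OUT_OF_LINE reads [offset, offset + length) from the file. */
    static byte[] readBlob(String type, byte[] data, Path externalPath, long offset, int length) {
        if ("INLINE".equals(type)) {
            return data;
        }
        try (FileChannel ch = FileChannel.open(externalPath, StandardOpenOption.READ)) {
            ByteBuffer buf = ByteBuffer.allocate(length);
            long pos = offset;
            while (buf.hasRemaining()) {
                int n = ch.read(buf, pos); // positional read; does not move the channel position
                if (n < 0) {
                    throw new IOException("EOF before " + length + " bytes at offset " + offset);
                }
                pos += n;
            }
            return buf.array();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Demo: write 8 bytes to a temp file and read the 3-byte slice starting at offset 2. */
    static byte[] demo() {
        try {
            Path tmp = Files.createTempFile("blob", ".bin");
            try {
                Files.write(tmp, new byte[] {0, 1, 2, 3, 4, 5, 6, 7});
                return readBlob("OUT_OF_LINE", null, tmp, 2L, 3);
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Since rows arrive from Lance already in this shape, the same dispatch applies regardless of whether the base file is Parquet or Lance.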

Key invariant: the iterator has no BLOB-aware branches. Rows come off Lance already in the Hudi BLOB shape, so the existing read_blob() SQL function resolves them exactly the same way it does for Parquet-backed BLOB tables.

Changelog

  • BlobLanceSchemaSupport.scala (new, in org.apache.hudi.io.storage) — write-path Arrow schema rewriter. For every Hudi BLOB column in the Spark schema, rebuilds the nested data child as LargeBinary + {lance-encoding:blob=true} and recursively widens nullability within the BLOB subtree. Structural no-op for schemas without BLOB columns.
  • HoodieSparkLanceWriter.java — routes the base Arrow schema through BlobLanceSchemaSupport.annotateBlobFieldsForLance(...) at writer-open time.
  • HoodieSparkLanceReader.java (internal compaction/merge reader) — opens Lance in BlobReadMode.CONTENT because compaction/merge/log-replay paths need actual bytes to rewrite. Pinned to CONTENT regardless of user config; the datasource reader is where the user knob is honored.
  • HoodieReaderConfig.java — adds hoodie.read.blob.inline.mode (default CONTENT, valid values {CONTENT} today). This is the forward seam for a future DESCRIPTOR mode where INLINE bytes are surfaced as {position, size} pointers for deferred reads; landing the config now makes that change a one-line enum expansion instead of a schema-plumbing refactor.
  • SparkLanceReaderBase.scala (datasource read path) — reads the new config to pick the Lance read mode, widens nullability inside BLOB subtrees (see read-path diagram above), and hands the iterator a schema Lance's batch layout can populate without tripping the codegen projection.
  • LanceRecordIterator.java — single unrelated fix: hasNext() now loops through empty Arrow batches instead of terminating on the first zero-row batch. The original if (loadNextBatch()) would silently drop subsequent non-empty batches after any empty one (e.g. after filter pushdown).
  • TestLanceDataSource.scala — two new parameterized tests (COW + MOR):
    • testBlobInlineRoundTrip — writes INLINE rows, verifies the Lance file carries lance-encoding:blob=true on the data child, and byte-asserts the payloads round-trip through both the raw data column and read_blob(payload).
    • testBlobOutline — writes OUT_OF_LINE rows pointing at external .bin files with varying offsets/lengths; byte-compares read_blob(payload) against the expected slice of each external file.
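The `hasNext()` fix listed for LanceRecordIterator.java follows a common pattern: loop until a batch yields a row or the input is exhausted. A minimal sketch of that pattern, using plain Java lists as batches; `BatchSkippingIterator` is a hypothetical name for this example and is not the actual iterator.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Flattens batches into rows, skipping empty batches; illustrative stand-in only. */
final class BatchSkippingIterator<T> implements Iterator<T> {
    private final Iterator<List<T>> batches;
    private Iterator<T> current = Collections.emptyIterator();

    BatchSkippingIterator(Iterator<List<T>> batches) {
        this.batches = batches;
    }

    @Override
    public boolean hasNext() {
        // `while`, not `if`: an empty batch must not end iteration early.
        while (!current.hasNext()) {
            if (!batches.hasNext()) {
                return false;
            }
            current = batches.next().iterator();
        }
        return true;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return current.next();
    }
}
```

With a single `if`, the input `[1, 2], [], [3]` would stop after `2`; with the loop, the trailing `3` is still returned.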

Impact

  • New functionality. Hudi BLOB columns now work end-to-end with hoodie.base.file.format=lance. Existing Parquet-backed BLOB tables and non-BLOB Lance tables are unchanged.
  • One new config. hoodie.read.blob.inline.mode — advanced, default CONTENT, single valid value today. Matches pre-PR behavior out of the box; no user action needed.
  • Forward-only on-disk schema change for BLOB-on-Lance files. Lance files written through this path carry lance-encoding:blob=true on the BLOB data child. No impact on Lance files without BLOB columns.
  • No performance change on the hot path. Non-BLOB Lance reads/writes and all Parquet paths stay exactly on the pre-PR code path. BLOB-on-Lance reads add a schema copy at open time (nullability widening) — amortized across the whole read; no per-row cost.
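How a single-valued config like `hoodie.read.blob.inline.mode` might be resolved can be sketched as below. This is an illustration, not the code in HoodieReaderConfig.java: the class name, the plain `Map` used as configuration, and the case-insensitive parsing are all assumptions of this example.

```java
import java.util.Locale;
import java.util.Map;

/** Hypothetical resolver for the inline-read mode; not Hudi's config plumbing. */
final class BlobInlineModeSketch {
    static final String KEY = "hoodie.read.blob.inline.mode";

    /** Only CONTENT is valid today; a future DESCRIPTOR value would be added here. */
    enum Mode { CONTENT }

    static Mode resolve(Map<String, String> conf) {
        String raw = conf.getOrDefault(KEY, Mode.CONTENT.name());
        try {
            // Assumption: values are matched case-insensitively.
            return Mode.valueOf(raw.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Unsupported value for " + KEY + ": " + raw, e);
        }
    }
}
```

Rejecting unknown values up front means a reader never silently falls back to a mode the user did not ask for.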

Risk Level

Medium. Integration between two relatively young subsystems (Hudi BLOB + Lance file format). Mitigated by:

  • End-to-end coverage for both storage modes: INLINE byte round-trip through data and read_blob(payload); OUT_OF_LINE byte round-trip through read_blob(payload) against external .bin files with varying offsets/lengths.
  • Both tests parameterized across COW and MOR.
  • All BLOB-aware logic on the read side is gated on BlobLanceSchemaSupport.isBlobField (metadata marker on the Spark field). Schemas without BLOB columns see the pre-PR code path exactly.
  • No changes to Hudi BLOB semantics, the Parquet BLOB write/read path, or read_blob() SQL resolution — only the Lance↔Hudi translation seam.

Documentation Update

None beyond the in-config documentation on hoodie.read.blob.inline.mode. No public API changes. Hudi BLOB is already documented; this PR extends coverage to hoodie.base.file.format=lance.

Contributor's checklist

  • Read through contributor's guide
  • Enough context is provided in the sections above
  • Adequate tests were added if applicable

rahil-c and others added 6 commits April 22, 2026 21:29
Covers steps 2, 3, and 4 of VC's 1.2 BLOB release plan:

  2. Wire INLINE writes  - testEndToEndInline drives SparkRDDWriteClient
     insert+upsert with INLINE Avro records (data=bytes, reference=null)
     against a Parquet-backed Hudi table.
  3. read_blob() reads inline values  - testInlineBlobRoundTrip runs
     SELECT read_blob(col) over an in-memory INLINE DataFrame and
     verifies each payload round-trips byte-for-byte.
  4. Mixed datasets  - testMixedInlineAndOutOfLine builds 10 rows
     alternating INLINE and OUT_OF_LINE, pointing the range rows at
     one shared file and asserts the returned sequence matches input
     order (stronger than TestBatchedBlobReader.testMixedBlobTypes,
     which orders by record_id before asserting).

testInlineOnHudiBackedTable mirrors the cherry-picked
testReadBlobOnHudiBackedTable (OUT_OF_LINE) but writes INLINE rows
via spark.write.format("hudi") + bulk_insert, reads back through
spark.read.format("hudi"), and resolves via read_blob() - exercises
the full write -> HoodieFileIndex-backed read -> SQL path that the
cherry-picked BatchedBlobReadExec serialization fix unblocks.

No production code changes. BatchedBlobReader already dispatches
INLINE rows into its inline branch (collectBatch field-0 check)
and preserves row order via sortBy(index) in processNextBatch.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@rahil-c rahil-c requested a review from yihua April 23, 2026 23:48
@rahil-c rahil-c marked this pull request as ready for review April 23, 2026 23:57
@github-actions github-actions Bot added the size:L PR with lines of changes in (300, 1000] label Apr 24, 2026
Contributor

@hudi-agent hudi-agent left a comment

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for the contribution! This PR adds a simplified inline-read path for Lance blobs: a user-facing hoodie.read.blob.inline.mode config (today limited to CONTENT), writer-side schema annotation so the nested data bytes route through Lance's dedicated blob writer, and reader-side nullability widening to tolerate Lance's inline-vs-outline materialization shape. The refactor of LanceRecordIterator to skip empty batches in a loop is a small but real correctness improvement. No correctness issues found. Two small suggestions are in the inline comments: a redundant .toSeq pattern in BlobLanceSchemaSupport and a hard-coded metadata key string in the test that should use the shared constant. Please take a look; after that, a Hudi committer or PMC member can take it from here.

cc @yihua

forceNullableRecursively(child)
}
}
new Field(arrowField.getName, arrowField.getFieldType, newChildren.toSeq.asJava)
Contributor

🤖 nit: newChildren is already a Seq[Field] (produced by .asScala.toSeq.map on line 130), so the second .toSeq before .asJava is a no-op — same redundancy in forceNullableRecursively at line 154. Could you drop the extra .toSeq in both spots?

- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

@rahil-c rahil-c changed the title Support simplified path for lance blob inline reading feat(lance): support simplified path for lance blob inline reading Apr 24, 2026
Contributor

@hudi-agent hudi-agent left a comment

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for the updates! This round is mostly polish: the BlobLanceSchemaSupport class moved out of org.apache.spark.sql.avro into org.apache.hudi.io.storage (a cleaner home, since it doesn't rely on Spark Avro internals), LanceRecordIterator was refactored to drive batch iteration through ColumnarBatch.rowIterator() instead of a manual rowIdInBatch counter (logic is equivalent — empty-batch skip and columnVector caching are preserved), and several comments were tightened. No new issues flagged from this automated pass, and the two prior nits (redundant .toSeq calls in BlobLanceSchemaSupport, hard-coded "lance-encoding:blob" string in the test) remain open but are low-priority — a Hudi committer or PMC member can take it from here for a final review.

Contributor

@hudi-agent hudi-agent left a comment

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

CodeRabbit Walkthrough: This pull request introduces comprehensive support for Apache Hudi across multiple layers: adds Hadoop/Hive/Spark development environments via Docker Compose, implements Lance file format read/write capabilities with vector metadata, introduces Blob and Variant logical types with schema conversion, enhances multi-writer rollback semantics with heartbeat ownership, extends timeline archival with clean-based retention boundaries, migrates map utilities to CollectionUtils, and adds Flink continuous-sort buffering with dictionary-encoded partition paths.

Sequence Diagram (CodeRabbit):

sequenceDiagram
    participant Writer1 as Writer 1
    participant LM as Lock Manager
    participant TM as Timeline
    participant Writer2 as Writer 2
    participant HB as Heartbeat

    Writer1->>LM: Acquire scheduling lock (skipLocking=false)
    LM-->>Writer1: Lock acquired
    Writer1->>TM: Read timeline, check for pending rollback
    alt No pending rollback exists
        Writer1->>TM: Schedule rollback plan
        TM-->>Writer1: Plan scheduled
    else Pending rollback exists
        Writer1->>Writer1: Return false (avoid duplicate)
    end
    Writer1->>LM: Release scheduling lock
    
    Writer2->>LM: Acquire execution lock (multi-writer heartbeat)
    LM-->>Writer2: Lock acquired
    Writer2->>HB: Acquire rollback heartbeat ownership
    alt Heartbeat doesn't exist or inactive
        HB-->>Writer2: Ownership granted
        Writer2->>TM: Execute rollback (call table.rollback)
        TM-->>Writer2: Rollback completed
        Writer2->>HB: Stop heartbeat (finally block)
    else Heartbeat active from other writer
        HB-->>Writer2: Ownership denied
        Writer2->>Writer2: Return false (skip execution)
    end
    Writer2->>LM: Release execution lock

CodeRabbit: hudi-agent#19 (review)

valueType = forceTypeNullable(m.valueType),
valueContainsNull = true)
case other => other
}
Contributor

⚠️ Potential issue | 🟠 Major

Recurse into non-BLOB containers when widening nullability.

This only rewrites top-level fields. For a column such as media STRUCT<title: STRING, content: BLOB>, content is left unchanged because media is not itself a BLOB field, so nested inline/null BLOB materialization can still hit the same projection NPE you're trying to avoid.

Suggested fix
 private def widenBlobSubtreeNullability(schema: StructType): StructType = {
-  StructType(schema.fields.map { f =>
-    if (BlobLanceSchemaSupport.isBlobField(f)) {
-      f.copy(nullable = true, dataType = forceTypeNullable(f.dataType))
-    } else {
-      f
-    }
-  })
+  StructType(schema.fields.map(rewriteField))
 }
 
+private def rewriteField(field: StructField): StructField =
+  if (BlobLanceSchemaSupport.isBlobField(field)) {
+    field.copy(nullable = true, dataType = forceTypeNullable(field.dataType))
+  } else {
+    field.copy(dataType = rewriteNested(field.dataType))
+  }
+
+private def rewriteNested(dt: DataType): DataType = dt match {
+  case s: StructType => StructType(s.fields.map(rewriteField))
+  case a: ArrayType => a.copy(elementType = rewriteNested(a.elementType))
+  case m: MapType => m.copy(
+    keyType = rewriteNested(m.keyType),
+    valueType = rewriteNested(m.valueType))
+  case other => other
+}
+
 private def forceFieldNullable(field: StructField): StructField =
   field.copy(nullable = true, dataType = forceTypeNullable(field.dataType))
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
}
private def widenBlobSubtreeNullability(schema: StructType): StructType = {
  StructType(schema.fields.map(rewriteField))
}
private def rewriteField(field: StructField): StructField =
  if (BlobLanceSchemaSupport.isBlobField(field)) {
    field.copy(nullable = true, dataType = forceTypeNullable(field.dataType))
  } else {
    field.copy(dataType = rewriteNested(field.dataType))
  }
private def rewriteNested(dt: DataType): DataType = dt match {
  case s: StructType => StructType(s.fields.map(rewriteField))
  case a: ArrayType => a.copy(elementType = rewriteNested(a.elementType))
  case m: MapType => m.copy(
    keyType = rewriteNested(m.keyType),
    valueType = rewriteNested(m.valueType))
  case other => other
}
private def forceFieldNullable(field: StructField): StructField =
  field.copy(nullable = true, dataType = forceTypeNullable(field.dataType))
private def forceTypeNullable(dt: DataType): DataType = dt match {
  case s: StructType => StructType(s.fields.map(forceFieldNullable))
  case a: ArrayType => a.copy(elementType = forceTypeNullable(a.elementType), containsNull = true)
  case m: MapType => m.copy(
    keyType = forceTypeNullable(m.keyType),
    valueType = forceTypeNullable(m.valueType),
    valueContainsNull = true)
  case other => other
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala`
around lines 223 - 244, widenBlobSubtreeNullability only marks top-level fields
that are directly blob fields, so nested BLOBs inside structs/arrays/maps are
missed; add a helper (e.g., containsBlob(dt: DataType): Boolean) that
recursively inspects DataType (StructType/ArrayType/MapType) and returns true if
any nested StructField satisfies BlobLanceSchemaSupport.isBlobField, then change
widenBlobSubtreeNullability to mark a field nullable (using
forceFieldNullable/forceTypeNullable) when either
BlobLanceSchemaSupport.isBlobField(field) OR containsBlob(field.dataType) is
true so non-BLOB containers that contain BLOBs are widened transitively. Ensure
containsBlob recurses into StructType.fields, ArrayType.elementType, and
MapType.key/value types.
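The `containsBlob` helper described in the prompt above is a plain recursive walk over container types. A minimal Java sketch on a toy type model follows; the `Type`, `Struct`, `Array`, `MapT`, and `Fld` classes are hypothetical stand-ins for Spark's `DataType` hierarchy, and the `blob` flag stands in for the `BlobLanceSchemaSupport.isBlobField` metadata check.

```java
import java.util.List;

/** Toy type model for illustrating the recursion; not Spark's DataType API. */
final class ContainsBlobSketch {
    interface Type {}

    static final class Leaf implements Type {}

    static final class Array implements Type {
        final Type element;
        Array(Type element) { this.element = element; }
    }

    static final class MapT implements Type {
        final Type key;
        final Type value;
        MapT(Type key, Type value) { this.key = key; this.value = value; }
    }

    static final class Struct implements Type {
        final List<Fld> fields;
        Struct(List<Fld> fields) { this.fields = fields; }
    }

    /** A named struct field; `blob` stands in for the BLOB metadata marker. */
    static final class Fld {
        final String name;
        final Type type;
        final boolean blob;
        Fld(String name, Type type, boolean blob) {
            this.name = name;
            this.type = type;
            this.blob = blob;
        }
    }

    /** True if any field reachable through structs, arrays, or maps is marked as a BLOB. */
    static boolean containsBlob(Type t) {
        if (t instanceof Struct) {
            for (Fld f : ((Struct) t).fields) {
                if (f.blob || containsBlob(f.type)) {
                    return true;
                }
            }
            return false;
        }
        if (t instanceof Array) {
            return containsBlob(((Array) t).element);
        }
        if (t instanceof MapT) {
            MapT m = (MapT) t;
            return containsBlob(m.key) || containsBlob(m.value);
        }
        return false;
    }
}
```

Note the trade-off between the two proposals in this comment: widening every container that contains a BLOB loosens more of the schema than rewriting only the BLOB subtrees found during recursion.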

CodeRabbit (original) (source:comment#3139418945)

Contributor

@rahil-c do we only support top-level BLOB fields as I remember?

Collaborator Author

Actually BLOB in hudi can be nested into other types like ARRAY and MAP
Screenshot 2026-04-25 at 10 31 01 AM

Contributor

Got it. Not a blocker for this PR: @rahil-c @voonhous let's make sure BLOB also works when nested inside other types such as STRUCT, ARRAY, and MAP.

val lanceFile = Files.walk(Paths.get(tablePath))
  .filter(p => p.toString.endsWith(".lance"))
  .findFirst()
  .orElseThrow(() => new AssertionError(s"No .lance files found under $tablePath"))
Contributor

⚠️ Potential issue | 🟡 Minor

Close the Files.walk stream.

This leaves the directory stream open for the rest of the helper, which can make temp-dir cleanup flaky.

♻️ Suggested fix
-    val lanceFile = Files.walk(Paths.get(tablePath))
-      .filter(p => p.toString.endsWith(".lance"))
-      .findFirst()
-      .orElseThrow(() => new AssertionError(s"No .lance files found under $tablePath"))
+    val walk = Files.walk(Paths.get(tablePath))
+    val lanceFile = try {
+      walk
+        .filter(p => p.toString.endsWith(".lance"))
+        .findFirst()
+        .orElseThrow(() => new AssertionError(s"No .lance files found under $tablePath"))
+    } finally {
+      walk.close()
+    }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala`
around lines 969 - 972, The Files.walk(...) stream isn't closed; capture the
stream into a local val (e.g., val stream = Files.walk(Paths.get(tablePath)))
and use a try/finally (or try-with-resources equivalent) to perform the
filter/findFirst on that stream and then call stream.close() in the finally
block so the directory stream is always closed after computing lanceFile; update
the code that currently uses the inline Files.walk(...) call (the lanceFile
binding) to use this scoped stream variable.
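For comparison, the same fix written in Java uses try-with-resources, since the `Stream` returned by `Files.walk` is `AutoCloseable`. This is a sketch only: `FindLanceFileSketch` and its methods are hypothetical names, and the temp-directory layout in `demo` is made up for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;
import java.util.stream.Stream;

/** Hypothetical helper showing a scoped Files.walk; not part of the PR's test code. */
final class FindLanceFileSketch {

    /** Returns the first path under `root` ending in ".lance", closing the walk stream afterwards. */
    static Optional<Path> findFirstLanceFile(Path root) {
        try (Stream<Path> walk = Files.walk(root)) {
            return walk.filter(p -> p.toString().endsWith(".lance")).findFirst();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Demo: create a temp tree with one .lance file, find it, then delete the tree. */
    static String demo() {
        try {
            Path root = Files.createTempDirectory("lance-walk");
            Path dir = Files.createDirectories(root.resolve("p=1"));
            Path file = Files.createFile(dir.resolve("part-0.lance"));
            try {
                return findFirstLanceFile(root)
                    .map(p -> p.getFileName().toString())
                    .orElse("");
            } finally {
                // Cleanup works because the directory stream was closed above.
                Files.delete(file);
                Files.delete(dir);
                Files.delete(root);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The Scala test cannot use try-with-resources directly, so the explicit try/finally in the suggested fix is the equivalent there.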

CodeRabbit (original) (source:comment#3139418962)

Contributor

@yihua yihua left a comment

@hudi-agent review once

Contributor

@hudi-agent hudi-agent left a comment

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for the iteration on this! The round-3 changes (loop on empty batches in LanceRecordIterator, schema annotation helper, nullability widening on read) look reasonable, and the prior rounds have already covered the main concerns. No new critical correctness issues flagged from this automated pass — a Hudi committer or PMC member can take it from here for a final review. A few minor naming and simplification suggestions below.

cc @yihua

Contributor

@hudi-agent hudi-agent left a comment

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

super(file, DEFAULT_BATCH_SIZE, bloomFilterOpt.map(HoodieBloomFilterRowWriteSupport::new));
this.sparkSchema = enrichSparkSchemaForLanceVectors(sparkSchema);
this.arrowSchema = LanceArrowUtils.toArrowSchema(this.sparkSchema, DEFAULT_TIMEZONE, true);
Schema baseArrow = LanceArrowUtils.toArrowSchema(this.sparkSchema, DEFAULT_TIMEZONE, true);
Contributor

nit: fold this line into BlobLanceSchemaSupport.annotateBlobFieldsForLance?

Contributor

Should the Lance Java API's LanceArrowUtils.toArrowSchema handle using LargeBinary for lance-encoding:blob=true?

Contributor

Looking at LanceArrowUtils.toArrowSchema, it already has the logic below. So enriching the Spark schema with ENCODING_BLOB should enable the LargeBinary conversion automatically?

if (metadata != null) {
  if (metadata.contains(ENCODING_BLOB)
    && metadata.getString(ENCODING_BLOB).equalsIgnoreCase("true")) {
    large = true
  }
  if (metadata.contains(ARROW_LARGE_VAR_CHAR_KEY)
    && metadata.getString(ARROW_LARGE_VAR_CHAR_KEY).equalsIgnoreCase("true")) {
    large = true
  }

  implicit val formats: Formats = DefaultFormats
  meta = metadata.jsonValue.extract[Map[String, Object]].map { case (k, v) =>
    (k, String.valueOf(v))
  }
}

Schema baseArrow = LanceArrowUtils.toArrowSchema(this.sparkSchema, DEFAULT_TIMEZONE, true);
// annotate Hudi BLOB fields so the nested `data` bytes column uses Lance's blob writer (the
// metadata key `lance-encoding:blob=true` on a LargeBinary column).
this.arrowSchema = BlobLanceSchemaSupport.annotateBlobFieldsForLance(sparkSchema, baseArrow);
Contributor

Should this be sparkSchema or this.sparkSchema? They are different.

Collaborator Author

let me make variable names more clear

Schema baseArrow = LanceArrowUtils.toArrowSchema(this.sparkSchema, DEFAULT_TIMEZONE, true);
// annotate Hudi BLOB fields so the nested `data` bytes column uses Lance's blob writer (the
// metadata key `lance-encoding:blob=true` on a LargeBinary column).
this.arrowSchema = BlobLanceSchemaSupport.annotateBlobFieldsForLance(sparkSchema, baseArrow);
Contributor

Can annotateBlobFieldsForLance happen along with enrichSparkSchemaForLanceVectors? enrichSparkSchemaForLanceVectors already does traversal on fields once for vectors. It can be used for blob fields as well to avoid another loop.

Collaborator Author

discussed offline

Comment on lines +141 to +152
/**
 * Recursively rebuild an Arrow field tree so every field is marked nullable.
 * Lance validates child non-nullability even when the parent struct value is
 * null; for BLOB structs, INLINE rows have a null `reference` and OUT_OF_LINE
 * rows have a null `data`, so all BLOB descendants must tolerate nulls.
 */
private def forceNullableRecursively(arrowField: Field): Field = {
  val oldType = arrowField.getFieldType
  val newType = new FieldType(true, oldType.getType, oldType.getDictionary, oldType.getMetadata)
  val children = arrowField.getChildren.asScala.toSeq.map(forceNullableRecursively)
  new Field(arrowField.getName, newType, children.toSeq.asJava)
}
Contributor

This looks like a Lance bug: the validation is overly strict. Lance checks child-field nullability even when the parent struct value is null. If the parent is null, the children are irrelevant. A null reference struct should be valid regardless of whether external_path is declared nullable.

The forceNullableRecursively workaround is papering over this Lance behavior by lying about the schema, declaring external_path and managed as nullable when they conceptually aren't. This works but loosens the schema contract: Lance would now accept a non-null reference struct with a null external_path, which is invalid in Hudi's BLOB model.

Collaborator Author

noted

Comment on lines +147 to +152
private def forceNullableRecursively(arrowField: Field): Field = {
  val oldType = arrowField.getFieldType
  val newType = new FieldType(true, oldType.getType, oldType.getDictionary, oldType.getMetadata)
  val children = arrowField.getChildren.asScala.toSeq.map(forceNullableRecursively)
  new Field(arrowField.getName, newType, children.toSeq.asJava)
}
Contributor

Instead of tweaking the Arrow schema, could we change the Spark schema to make the relevant fields nullable, which is much easier? Then we can get rid of this class.

* on if the Hudi schema declares those leaves non-nullable. Non-blob fields keep their
* original nullability so their contracts aren't silently loosened.
*/
private def widenBlobSubtreeNullability(schema: StructType): StructType = {

Contributor

Could we reuse this on the writer side as well, so that only the Spark StructType schema is modified, for consistency?
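
As a rough illustration of widening only the BLOB subtree at the schema level, here is a sketch over a toy stand-in for a Spark schema. `BlobWideningSketch`, its `Field` class, the `blob` marker flag, and `sampleSchema` are hypothetical and not part of Spark or Hudi. The sketch also assumes BLOB fields appear only at the top level, which is a simplification.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Toy stand-in for a Spark StructType; not Spark's actual classes. */
final class BlobWideningSketch {

  static final class Field {
    final String name;
    final boolean nullable;
    final boolean blob;            // hypothetical marker: this field is a Hudi BLOB struct
    final List<Field> children;

    Field(String name, boolean nullable, boolean blob, List<Field> children) {
      this.name = name;
      this.nullable = nullable;
      this.blob = blob;
      this.children = children;
    }
  }

  static Field leaf(String name, boolean nullable) {
    return new Field(name, nullable, false, Collections.<Field>emptyList());
  }

  /** Marks a field and all of its descendants nullable, returning a new tree. */
  static Field forceNullable(Field field) {
    List<Field> widened = new ArrayList<>();
    for (Field child : field.children) {
      widened.add(forceNullable(child));
    }
    return new Field(field.name, true, field.blob, widened);
  }

  /** Widens top-level BLOB fields only; every other field is returned untouched. */
  static List<Field> widenBlobSubtrees(List<Field> schema) {
    List<Field> result = new ArrayList<>();
    for (Field field : schema) {
      result.add(field.blob ? forceNullable(field) : field);
    }
    return result;
  }

  /** Sample schema: a non-nullable id column plus one BLOB column. */
  static List<Field> sampleSchema() {
    Field reference = new Field("reference", true, false,
        Arrays.asList(leaf("external_path", false), leaf("managed", false)));
    Field payload = new Field("payload", false, true,
        Arrays.asList(leaf("type", false), leaf("data", true), reference));
    return Arrays.asList(leaf("id", false), payload);
  }

  public static void main(String[] args) {
    for (Field field : widenBlobSubtrees(sampleSchema())) {
      System.out.println(field.name + " nullable=" + field.nullable);
    }
  }
}
```

Because the function takes a schema and returns a new one, the same helper could in principle be applied on both the reader and the writer path, which is the consistency argument made in the comment above.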

valueType = forceTypeNullable(m.valueType),
valueContainsNull = true)
case other => other
}

Contributor

@rahil-c do we only support top-level BLOB fields, as I remember?

*/
@ParameterizedTest
@EnumSource(value = classOf[HoodieTableType])
def testBlobInlineRoundTrip(tableType: HoodieTableType): Unit = {

Contributor

nit: I think we should add a single BLOB type test class so that the testing logic is consistent across both Parquet and Lance and easier to maintain. We can do the refactoring later; @rahil-c to add a tracking issue.

Collaborator Author

makes sense

@codecov-commenter

Codecov Report

❌ Patch coverage is 74.31193% with 28 lines in your changes missing coverage. Please review.
✅ Project coverage is 68.89%. Comparing base (1e64662) to head (bc05cf5).
⚠️ Report is 9 commits behind head on master.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...apache/hudi/io/storage/HoodieSparkLanceWriter.java | 77.58% | 8 Missing and 5 partials ⚠️ |
| ...ution/datasources/lance/SparkLanceReaderBase.scala | 67.56% | 6 Missing and 6 partials ⚠️ |
| ...rg/apache/hudi/io/storage/LanceRecordIterator.java | 40.00% | 2 Missing and 1 partial ⚠️ |

Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18575      +/-   ##
============================================
+ Coverage     68.87%   68.89%   +0.01%     
- Complexity    28512    28561      +49     
============================================
  Files          2478     2480       +2     
  Lines        136801   136995     +194     
  Branches      16659    16697      +38     
============================================
+ Hits          94225    94382     +157     
- Misses        34990    35009      +19     
- Partials       7586     7604      +18     

| Flag | Coverage Δ |
|---|---|
| common-and-other-modules | 44.40% <6.42%> (-0.04%) ⬇️ |
| hadoop-mr-java-client | 44.75% <100.00%> (-0.02%) ⬇️ |
| spark-client-hadoop-common | 48.49% <9.72%> (+0.02%) ⬆️ |
| spark-java-tests | 49.48% <74.31%> (+0.04%) ⬆️ |
| spark-scala-tests | 45.25% <6.42%> (-0.03%) ⬇️ |
| utilities | 37.96% <6.42%> (-0.04%) ⬇️ |

| Files with missing lines | Coverage Δ |
|---|---|
| ...apache/hudi/io/storage/HoodieSparkLanceReader.java | 74.07% <100.00%> (+2.82%) ⬆️ |
| .../apache/hudi/common/config/HoodieReaderConfig.java | 100.00% <100.00%> (ø) |
| ...rg/apache/hudi/io/storage/LanceRecordIterator.java | 60.27% <40.00%> (-3.50%) ⬇️ |
| ...ution/datasources/lance/SparkLanceReaderBase.scala | 78.57% <67.56%> (-8.47%) ⬇️ |
| ...apache/hudi/io/storage/HoodieSparkLanceWriter.java | 87.60% <77.58%> (-5.99%) ⬇️ |

... and 25 files with indirect coverage changes

@hudi-bot
Collaborator

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

Contributor

@yihua left a comment

LGTM!

Comment on lines +193 to +195
  newFields[i] = enrichVectorField(field, vec);
} else if (isBlobField(field)) {
  newFields[i] = enrichBlobField(field);

Contributor

nit: we could possibly improve the logic by extracting the type from the schema metadata once and then checking it, instead of parsing the field multiple times.
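
One way to read this suggestion is to classify each field once and then dispatch on the result. The sketch below assumes a hypothetical metadata key (`logical_type`) and hypothetical type strings (`BLOB`, `VECTOR(...)`); the real key and values used by Hudi may differ, and `FieldKindSketch`, `classify`, and `enrich` are illustrative names rather than the PR's methods.

```java
import java.util.Collections;
import java.util.Locale;
import java.util.Map;

/** Illustrative only; key names and type strings are assumptions, not Hudi's. */
final class FieldKindSketch {

  /** Hypothetical metadata key carrying the declared logical type. */
  static final String TYPE_KEY = "logical_type";

  enum Kind { VECTOR, BLOB, OTHER }

  /** Reads the metadata a single time and maps it to a Kind. */
  static Kind classify(Map<String, String> metadata) {
    String declared = metadata.get(TYPE_KEY);
    if (declared == null) {
      return Kind.OTHER;
    }
    String upper = declared.toUpperCase(Locale.ROOT);
    if (upper.startsWith("VECTOR")) {
      return Kind.VECTOR;
    }
    if (upper.equals("BLOB")) {
      return Kind.BLOB;
    }
    return Kind.OTHER;
  }

  /** Dispatches on the precomputed kind instead of re-parsing per predicate. */
  static String enrich(String fieldName, Map<String, String> metadata) {
    switch (classify(metadata)) {
      case VECTOR:
        return fieldName + ":vector-enriched";
      case BLOB:
        return fieldName + ":blob-enriched";
      default:
        return fieldName;
    }
  }

  public static void main(String[] args) {
    System.out.println(enrich("image", Collections.singletonMap(TYPE_KEY, "BLOB")));
    System.out.println(enrich("id", Collections.<String, String>emptyMap()));
  }
}
```

The design point is that each field's metadata is parsed in exactly one place, so adding another logical type means adding an enum constant and a branch rather than another per-field predicate.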

Comment on lines +176 to +177
* Arrow-side in {@link #annotateBlobDataChildren}, since lance-spark's
* {@code toArrowSchema} doesn't propagate Spark field metadata to struct children.</li>

Contributor

It looks like this should be fixed upstream in LanceArrowUtils#toArrowField, where Spark field metadata is not passed down to a struct's children:

case StructType(fields) =>
  val fieldType = new FieldType(nullable, ArrowType.Struct.INSTANCE, null, meta.asJava)
  new Field(
    name,
    fieldType,
    fields.map { field =>
      toArrowField(field.name, field.dataType, field.nullable, timeZoneId)
    }.toSeq.asJava)

@yihua yihua merged commit 8530644 into apache:master Apr 25, 2026
55 of 56 checks passed

Labels

size:L PR with lines of changes in (300, 1000]

5 participants