feat(contrib): Native Delta Lake scan via delta-kernel-rs (Iceberg-style contrib)#4366

Draft
schenksj wants to merge 19 commits into apache:main from schenksj:contrib-delta-direct

@schenksj schenksj commented May 19, 2026

Briefing

This PR lands a native Delta Lake scan for Comet. It supersedes #3932 — the
SPI/registry design there was rejected in favor of the Iceberg-style contrib
pattern this PR uses (typed proto variant + ~40 lines of feature-gated core
touchpoints + standalone contrib/delta/ tree). Default builds are entirely
unaware of this code: no SPI lookups, no ServiceLoader scans, no contrib
surface at runtime. Only when the -Pcontrib-delta Maven profile (and parallel
contrib-delta Cargo feature) is activated do the contrib classes land on the
classpath and the bridge resolve.

The integration reads Delta metadata via delta-kernel-rs on the driver,
encodes the resolved file list (with column mappings, DV info, partition
values) into a typed OpStruct::DeltaScan proto, and executes via DataFusion's
parquet reader on each executor. Deletion vectors, column mapping (name mode,
not id), type widening, row tracking via materialised columns, and
multi-task-per-partition packing are all supported. Drop-in for any Delta 4.1
table that doesn't use unsupported features.

Shape

| Layer | Path | Lives in |
| --- | --- | --- |
| Typed proto variant delta_scan = 117 | native/proto/src/proto/operator.proto | Core |
| Reflection bridge | spark/.../comet/rules/DeltaIntegration.scala | Core |
| Scan-rule arm | spark/.../comet/rules/CometScanRule.scala | Core (one block) |
| Exec-rule arm | spark/.../comet/rules/CometExecRule.scala | Core (one case) |
| PlanDataInjector.opStructCase | spark/.../comet/sql/comet/operators.scala | Core (one method) |
| Per-partition file paths | CometExecRDD, CometNativeScanExec, CometExecIterator, ShimSparkErrorConverter | Core (load-bearing for input_file_name() and FAILED_READ_FILE.NO_HINT wrapping in any native scan, not just Delta) |
| Delta scan rule, exec, serde | contrib/delta/src/main/scala/... | Contrib |
| Kernel-rs engine + cache, scan, DV filter, planner | contrib/delta/native/src/*.rs + native/core/src/execution/planner/contrib_delta_scan.rs | Contrib + one core dispatcher arm |
| Maven profile, Cargo feature | spark/pom.xml, contrib/delta/native/Cargo.toml, native/core/Cargo.toml | Build |
| Regression harness | contrib/delta/dev/run-regression.sh + dev/diffs/delta/4.1.0.diff | Contrib |

Key design decisions

Iceberg-style contrib, not SPI. Static helper objects with stable names
(DeltaScanRule.transformV1IfDelta, CometDeltaNativeScan.MODULE$); a single
reflection bridge in core resolves and caches Method handles once per JVM.
No registry, no ServiceLoader, no extension points beyond what core already
exposes. The contrib is just classpath-or-not.
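The resolve-once idea behind the bridge can be sketched as follows. The real bridge is Scala and uses JVM reflection; Rust is used here only to keep the examples in one language, and every name (`TransformFn`, `resolve`, `RESOLVE_CALLS`, the `delta:` prefix) is hypothetical.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::OnceLock;

/// Stand-in for the resolved contrib entry point (hypothetical signature).
type TransformFn = fn(&str) -> Option<String>;

/// Counts how often the expensive lookup runs; for illustration only.
static RESOLVE_CALLS: AtomicUsize = AtomicUsize::new(0);

/// Cached binding. `None` models a default build without the contrib.
static BINDING: OnceLock<Option<TransformFn>> = OnceLock::new();

/// Hypothetical contrib-side rule: claims only plans tagged as Delta.
fn delta_transform(plan: &str) -> Option<String> {
    if plan.starts_with("delta:") {
        Some(format!("native({plan})"))
    } else {
        None
    }
}

/// Models the one-time lookup (class and method resolution in the Scala bridge).
fn resolve(contrib_present: bool) -> Option<TransformFn> {
    RESOLVE_CALLS.fetch_add(1, Ordering::SeqCst);
    if contrib_present {
        Some(delta_transform as TransformFn)
    } else {
        None
    }
}

/// Per-scan entry point: one cached read, then a direct call.
fn transform_v1_if_delta(plan: &str) -> Option<String> {
    let binding = BINDING.get_or_init(|| resolve(true));
    (*binding).and_then(|f| f(plan))
}
```

Calling `transform_v1_if_delta` repeatedly runs `resolve` only on the first call; later calls pay for the cached read and the call itself.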

Typed proto, not an envelope. OpStruct::DeltaScan is a first-class
variant alongside IcebergScan and NativeScan. Avoids the
ContribOp { kind, payload } envelope discussed in #3932 — type safety,
IDE refactoring, and PlanDataInjector can key by OpStructCase for O(1)
dispatch instead of string match.

Split-mode plan serialization. CometDeltaNativeScan.convert emits a
DeltaScan proto with the common block only (schemas, table root, filters);
each partition's tasks ride in a per-partition byte array via
PlanDataInjector at execution time. Avoids closure-capturing every file in
every partition, which is what makes scans of large Delta tables tractable.
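A minimal sketch of the split, assuming simplified stand-ins for the proto messages. The struct fields below are illustrative, and the real per-partition payload is a serialized byte array, not a `Vec`.

```rust
/// Fields shared by every partition (the PR lists schemas, table root, filters).
#[derive(Clone, Debug, PartialEq)]
struct DeltaScanCommon {
    table_root: String,
    columns: Vec<String>,
}

/// One file to read; the real task also carries DV info and partition values.
#[derive(Clone, Debug, PartialEq)]
struct DeltaScanTask {
    path: String,
}

/// What the native side decodes: the common block plus one partition's tasks.
#[derive(Clone, Debug, PartialEq)]
struct DeltaScan {
    common: DeltaScanCommon,
    tasks: Vec<DeltaScanTask>,
}

/// Driver-side planning result: one common block, one task list per partition.
struct SplitPlan {
    common: DeltaScanCommon,
    per_partition: Vec<Vec<DeltaScanTask>>,
}

impl SplitPlan {
    /// Assembles the operator for a single partition at execution time.
    /// Returns `None` when the partition index is out of range.
    fn scan_for_partition(&self, partition: usize) -> Option<DeltaScan> {
        let tasks = self.per_partition.get(partition)?;
        Some(DeltaScan {
            common: self.common.clone(),
            tasks: tasks.clone(),
        })
    }
}
```

Each partition only ever materializes its own task list, so the size of what a task receives scales with the files in that partition and not with the whole table.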

InputFileBlockHolder thread-local hook in CometExecRDD.compute.
Comet's native scans bypass Spark's FileScanRDD, so the standard
input_file_name() thread-local would otherwise be empty for any native
scan (not just Delta). This is a small but load-bearing change: it fixes
both Delta's UPDATE/DELETE/MERGE flows (which use input_file_name() to
identify touched files) and the FAILED_READ_FILE.NO_HINT error
wrapping (which embeds the file path in the message).

Engine cache by (scheme, authority, DeltaStorageConfig). kernel-rs's
DefaultEngine<TokioBackgroundExecutor> spawns one OS thread per executor
that hosts a tokio runtime with a blocking pool. Without caching, hundreds
of scans per minute leaked threads faster than tokio reaped them, tripping
pthread_create EAGAIN about 2 hours into the regression run. The cache bounds live thread
count by table-storage diversity instead of by request count.
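The caching scheme can be sketched like this. `Engine` is a placeholder for the kernel engine, `EngineKey` and `cached_engine_count` are hypothetical names, and `storage_config` stands in for `DeltaStorageConfig` as a list of key/value pairs so the key stays hashable.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, OnceLock};

/// Cache key: one engine per distinct storage target and configuration.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct EngineKey {
    scheme: String,
    authority: String,
    /// Stand-in for DeltaStorageConfig (assumed to be kept in a stable order).
    storage_config: Vec<(String, String)>,
}

/// Placeholder for the kernel engine, which in the PR owns a thread and a runtime.
#[derive(Debug)]
struct Engine {
    key: EngineKey,
}

/// Process-wide cache, created lazily on first use.
static ENGINES: OnceLock<Mutex<HashMap<EngineKey, Arc<Engine>>>> = OnceLock::new();

/// Returns the cached engine for `key`, constructing it on the first miss.
fn get_or_create_engine(key: EngineKey) -> Arc<Engine> {
    let cache = ENGINES.get_or_init(|| Mutex::new(HashMap::new()));
    let mut guard = cache.lock().expect("engine cache mutex poisoned");
    let engine = guard
        .entry(key.clone())
        .or_insert_with(|| Arc::new(Engine { key }));
    Arc::clone(engine)
}

/// Number of live engines: bounded by distinct keys, not by request count.
fn cached_engine_count() -> usize {
    ENGINES
        .get()
        .map(|cache| cache.lock().expect("engine cache mutex poisoned").len())
        .unwrap_or(0)
}
```

Entries are never evicted in this sketch, which mirrors the follow-up noted in this PR about TTL and credential-rotation eviction.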

DV filter ordering safeguards. DeltaDvFilterExec tracks
current_row_offset across batches, which assumes physical-order input.
The exec overrides maintains_input_order() = [true] and
benefits_from_input_partitioning() = [false] so any future optimizer that
wants to insert a RepartitionExec is forced to bail rather than silently
re-order rows out from under the DV index map.
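Why the offset tracking depends on physical order can be seen in a small sketch. This is not the `DeltaDvFilterExec` implementation: `DvFilter`, `keep_mask`, and the plain `Vec<bool>` mask are illustrative, and the deleted row indexes are assumed to be already resolved for one file.

```rust
/// Tracks the absolute row offset across batches and masks deleted rows.
struct DvFilter {
    /// Deleted row indexes for one file, sorted ascending and deduplicated.
    deleted: Vec<u64>,
    /// Position of the next entry in `deleted` that has not been consumed.
    cursor: usize,
    /// Absolute index, within the file, of the first row of the next batch.
    current_row_offset: u64,
}

impl DvFilter {
    fn new(mut deleted: Vec<u64>) -> Self {
        deleted.sort_unstable();
        deleted.dedup();
        DvFilter { deleted, cursor: 0, current_row_offset: 0 }
    }

    /// Returns a keep mask for the next batch of `num_rows` rows.
    /// Correct only if batches arrive in the file's physical row order.
    fn keep_mask(&mut self, num_rows: usize) -> Vec<bool> {
        let start = self.current_row_offset;
        let end = start + num_rows as u64;
        let mut mask = vec![true; num_rows];
        while self.cursor < self.deleted.len() && self.deleted[self.cursor] < end {
            let idx = self.deleted[self.cursor];
            if idx >= start {
                mask[(idx - start) as usize] = false;
            }
            self.cursor += 1;
        }
        self.current_row_offset = end;
        mask
    }
}
```

If an upstream operator reordered or repartitioned the batches, `current_row_offset` would no longer match the row positions the deletion vector refers to, and the mask would drop the wrong rows without any error.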

Decline-and-fallback everywhere, never silently wrong.

  • DV-bearing reads when the Phase 6 reader-feature gate is hit
  • Column mapping mode id (parquet field-ID resolution not yet wired)
  • Custom Hadoop filesystem schemes (fake:// etc) on V1 scans
  • CreateArray with mismatched child types (CDF struct nullability)
  • Schema element types Comet doesn't support

Each path emits a withInfo reason so explain-fallback surfaces why.

No SPI/registry/traits, but one new trait method.
PlanDataInjector.opStructCase is the one core API addition. It keys the
existing injector map for O(1) dispatch and lets the contrib declare its
op kind without adding any runtime surface.
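A sketch of keyed dispatch, translated to Rust for consistency with the other examples (the real trait is Scala). The `Op` struct, the `Filter` variant, and the method signatures are simplified assumptions.

```rust
use std::collections::HashMap;

/// Operator kinds; mirrors the idea of the proto's OpStructCase.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum OpStructCase {
    DeltaScan,
    Filter,
}

/// Simplified operator: `tasks` is `None` until plan data is injected.
struct Op {
    case: OpStructCase,
    tasks: Option<Vec<String>>,
}

trait PlanDataInjector {
    /// The one added method: declares which operator kind this injector handles.
    fn op_struct_case(&self) -> OpStructCase;
    fn can_inject(&self, op: &Op) -> bool;
    fn inject(&self, op: &mut Op, partition_tasks: &[String]);
}

struct DeltaInjector;

impl PlanDataInjector for DeltaInjector {
    fn op_struct_case(&self) -> OpStructCase {
        OpStructCase::DeltaScan
    }
    /// Rejects operators that already carry tasks, so injection is idempotent.
    fn can_inject(&self, op: &Op) -> bool {
        op.case == OpStructCase::DeltaScan && op.tasks.is_none()
    }
    fn inject(&self, op: &mut Op, partition_tasks: &[String]) {
        op.tasks = Some(partition_tasks.to_vec());
    }
}

/// Built once: maps each operator kind to its injector.
fn build_index(
    injectors: Vec<Box<dyn PlanDataInjector>>,
) -> HashMap<OpStructCase, Box<dyn PlanDataInjector>> {
    injectors.into_iter().map(|i| (i.op_struct_case(), i)).collect()
}

/// One map lookup per operator instead of asking every injector in turn.
fn inject_plan_data(
    index: &HashMap<OpStructCase, Box<dyn PlanDataInjector>>,
    op: &mut Op,
    partition_tasks: &[String],
) -> bool {
    match index.get(&op.case) {
        Some(injector) if injector.can_inject(op) => {
            injector.inject(op, partition_tasks);
            true
        }
        _ => false,
    }
}
```

Operators with no registered injector cost a single failed lookup, which is the common case in a plan that is mostly non-scan operators.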

Review strategy

Suggest reviewing in this order, with different bars:

  1. Core touchpoints (~5 minutes, high bar). The new core surface area
    is small and worth careful reading because it ships in the default build:

    • native/proto/src/proto/operator.proto (one variant added)
    • spark/.../comet/rules/DeltaIntegration.scala (whole file — reflection bridge)
    • The new arm in CometScanRule.transformV1Scan and the new case in
      CometExecRule.transform
    • CometExecRDD + CometExecIterator + CometNativeScanExec diffs
      (per-partition file paths, InputFileBlockHolder hook)
    • ShimSparkErrorConverter.wrapNativeParquetError
    • spark/.../comet/serde/arrays.scala (CreateArray decline)
  2. Contrib Scala (~30 minutes, contrib bar). Bigger and lives entirely
    in contrib/delta/. Walk in this order:

    • DeltaScanRule.scala — entry point, feature gates, scheme allowlist,
      input_file_name() detection
    • CometDeltaNativeScan.scala — split serde, kernel-rs call, task
      prune/split/pack, column-mapping fixup
    • CometDeltaNativeScanExec.scala — exec wrapper, DPP partition pruning,
      metric reporting
    • DeltaPlanDataInjector.scala, DeltaInputFileBlockHolder.scala
    • DeltaReflection.scala, RowTrackingAugmentedFileIndex.scala
  3. Contrib Rust (~30 minutes, contrib bar):

    • contrib/delta/native/src/engine.rs: kernel-rs engine + cache
    • contrib/delta/native/src/scan.rs: plan_delta_scan entry,
      DV row-index resolution
    • contrib/delta/native/src/planner.rs: build_delta_partitioned_files,
      SessionTimezone, ColumnMappingFilterRewriter
    • contrib/delta/native/src/dv_filter.rs: DeltaDvFilterExec
    • contrib/delta/native/src/jni.rs: planDeltaScan JNI entry
    • native/core/src/execution/planner/contrib_delta_scan.rs: the
      core-side dispatcher arm
  4. Build / regression infra (~5 minutes):

    • spark/pom.xml -Pcontrib-delta profile
    • native/core/Cargo.toml contrib-delta feature
    • contrib/delta/native/Cargo.toml (standalone, not in workspace —
      intentional to avoid arrow-57 / arrow-58 cross-contamination)
    • contrib/delta/dev/run-regression.sh + dev/diffs/delta/4.1.0.diff

The git log --oneline main..HEAD is also a useful walk — commits are
labeled by phase (P7a..P7n) and each commit message documents the
specific concern it addresses.

What's not in this PR (follow-ups)

  • Column-mapping id mode (requires parquet field-ID resolution in
    Comet's parquet reader)
  • RowTracking materialisation for tables without materialised columns
  • The remaining TypeWidening cases DataFusion's schema adapter doesn't
    already handle
  • ProjectionExec column-mapping rename pushdown into ParquetSource's
    schema adapter (item #4 from the in-PR perf sweep)
  • Engine cache TTL / credential-rotation eviction (fine for validation;
    would block long-lived production drivers using STS)

Test plan

  • Default builds (no -Pcontrib-delta): mvn -pl spark -am test-compile green
  • -Pcontrib-delta builds green (Maven + Cargo)
  • Targeted retest of every cluster surfaced during validation:
    • DescribeDeltaHistorySuite "replaceWhere on data column" — 8/8
    • DeltaTableHadoopOptionsSuite "dropFeatureSupport - with filesystem options" — 1/1
    • SnapshotManagementSuite "should not recover when the current checkpoint is broken..." — 2/2
  • Engine-cache fix verified end-to-end (no more pthread_create EAGAIN)
  • Full Delta 4.1 regression sweep against this branch (relaunched after the post-review fix bundle landed)
  • CI: default + -Pcontrib-delta build paths exercised

Upstream issue

apache/datafusion#22366: filed for make_array element-type strictness.
The CometCreateArray decline in this PR is a caller-side workaround until
upstream relaxes.

🤖 Generated with Claude Code

schenksj and others added 19 commits May 18, 2026 20:01
Initial scaffolding for the direct Delta integration that replaces the
generic contrib SPI proposed in apache#4339. Mirrors Iceberg's pattern:

  - native/proto/src/proto/operator.proto: typed `DeltaScan delta_scan = 117`
    variant on `OpStruct`, with the six message definitions (DeltaScanCommon,
    DeltaScan, DeltaScanTask, DeltaPartitionValue, DeltaScanTaskList,
    DeltaColumnMapping) inlined next to the IcebergScan group. Field numbers
    preserved from the contrib-delta-pr2 branch.

  - native/core/src/execution/planner.rs: unconditional `OpStruct::DeltaScan`
    dispatcher arm with feature-gated body. Default builds return a clear
    "rebuild with --features contrib-delta" error; the feature-on arm is a
    `todo!` stub today and gets filled in as the implementation ports over.

  - native/core/src/execution/jni_api.rs + planner/operator_registry.rs: extend
    the existing `OpStruct` match sites so default builds compile exhaustively.

  - native/core/Cargo.toml: new optional `contrib-delta` feature backed by an
    optional path dep on `comet-contrib-delta`. Default builds carry zero Delta
    surface (verified: `cargo check` builds clean without the feature, and the
    Delta crate is not in the workspace `members` list).

  - native/Cargo.toml: explicit `exclude = ["../contrib"]` so the workspace
    doesn't try to absorb the contrib crate (which would fail -- workspace
    members must live hierarchically under the workspace root).

  - contrib/delta/native/{Cargo.toml,src/lib.rs}: skeleton crate that re-exports
    the typed Delta proto messages so contrib-internal code has a stable short
    alias. Real implementation (kernel-rs log replay, DV filter, column
    mapping, partition parsing) ports over from contrib-delta-pr2 in follow-up
    commits.
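The "unconditional arm, feature-gated body" shape described in the planner.rs item above can be sketched as below. The enum, function names, and string results are simplified stand-ins for the planner's real types; only the feature name and the error hint come from this commit message.

```rust
/// Simplified operator enum; the real OpStruct has many more variants.
#[derive(Debug)]
enum OpStruct {
    Filter,
    DeltaScan,
}

/// The match arm exists in every build, so the match stays exhaustive.
fn plan(op: &OpStruct) -> Result<String, String> {
    match op {
        OpStruct::Filter => Ok("FilterExec".to_string()),
        OpStruct::DeltaScan => plan_delta_scan(),
    }
}

/// Feature-on body: would call into the contrib crate in the real planner.
#[cfg(feature = "contrib-delta")]
fn plan_delta_scan() -> Result<String, String> {
    Ok("DeltaScanExec".to_string())
}

/// Default body: no Delta code is compiled in, only a clear error.
#[cfg(not(feature = "contrib-delta"))]
fn plan_delta_scan() -> Result<String, String> {
    Err("DeltaScan is not available; rebuild with --features contrib-delta".to_string())
}
```

Only one of the two `plan_delta_scan` bodies is compiled, so a default build carries the dispatcher arm but none of the Delta implementation.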

Build verification:
  cargo check  -p datafusion-comet                        # default: green
  cargo check  -p datafusion-comet --features contrib-delta # green

This addresses Parth's review on apache#4339: ~40 lines of core touchpoints all
behind a feature gate, no SPI/registry/traits/runtime dispatch.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Brings the working delta-kernel-rs integration over from contrib-delta-pr2
without the contrib SPI plumbing Parth flagged on apache#4339.

contrib/delta/native/:
  - jni.rs, scan.rs, engine.rs, error.rs, predicate.rs, dv_filter.rs --
    ported verbatim from contrib-delta-pr2 (only crate::proto::* import paths
    needed adjustment, handled via lib.rs re-export of the typed messages
    that now live in core's proto crate)
  - planner.rs -- Delta-specific helpers (build_delta_partitioned_files,
    parse_delta_partition_scalar with the DATE -> TIMESTAMP_NTZ widening
    fallback already inlined, ColumnMappingFilterRewriter) exposed as
    pure-DataFusion functions that core's dispatcher arm composes onto the
    standard parquet datasource path. NO ContribOperatorPlanner trait, NO
    ContribPlannerContext, NO ParquetDatasourceParams -- the contrib crate
    is now a plain library with public functions.
  - lib.rs -- module decls + a `pub mod proto` re-export of the six typed
    Delta messages from `datafusion_comet_proto::spark_operator`. No
    `#[ctor]` and no `register_contrib_planner` call.
  - Cargo.toml -- standalone (outside the native/ workspace root), no
    comet-contrib-spi dep, all delta-specific deps stay confined here.

native/core/src/execution/planner/contrib_delta_scan.rs (new):
  - `PhysicalPlanner::plan_delta_scan` -- the `OpStruct::DeltaScan` arm body
    extracted into its own file (~210 lines, mirrors `OpStruct::IcebergScan`
    in size and shape). Gated `#[cfg(feature = "contrib-delta")]`; calls
    core's `init_datasource_exec`, `prepare_object_store_with_configs`,
    `convert_spark_types_to_arrow_schema` directly + comet-contrib-delta's
    helpers for the Delta-specific pieces.

native/core/src/execution/planner.rs:
  - `OpStruct::DeltaScan` arm: 6-line dispatcher that calls into
    `self.plan_delta_scan(...)` under `#[cfg(feature = "contrib-delta")]`.

native/core/src/parquet/parquet_exec.rs:
  - New `ignore_missing_files: bool` arg on `init_datasource_exec`.
    Threaded through to `IgnoreMissingFileSource` wrapper (ported verbatim
    from PR2's native/core/src/parquet/missing_file_tolerant.rs) which
    decorates the final FileSource so its FileOpener swallows object-store
    NotFound errors as empty streams. Matches Spark's
    `spark.sql.files.ignoreMissingFiles=true` semantics. All existing call
    sites updated to pass `false`.

Build verification (both checked clean):
  cargo check  -p datafusion-comet                          # default
  cargo check  -p datafusion-comet --features contrib-delta

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
These five files port verbatim from contrib-delta-pr2 -- they touch only
Spark APIs (via reflection) and standard Scala, none of the rejected SPI
surface:

  - DeltaConf.scala               Config keys (COMET_DELTA_NATIVE_ENABLED, ...)
  - Native.scala                  JNI bridge for planDeltaScan
  - DeltaReflection.scala         Reflective access to spark-delta internals
                                  (isDeltaFileFormat, isBatchFileIndex,
                                  extractBatchAddFiles, ...)
  - RowTrackingAugmentedFileIndex Wraps a FileIndex to inject row-tracking
                                  metadata columns
  - DeltaInputFileBlockHolder     Thread-local replacement for
                                  InputFileBlockHolder on the Delta scan path

Plus the regression infrastructure (4.1.0.diff, run-test.sh,
run-regression.sh).

The remaining five files (CometDeltaNativeScan, CometDeltaNativeScanExec,
DeltaScanRuleExtension, DeltaOperatorSerdeExtension, DeltaPlanDataInjector)
each reference the rejected SPI surface (CometOperatorSerde,
CometScanRuleExtension, ContribOp envelope, PlanDataSource, PlanDataInjector).
Those need rewriting before they can compile against main -- queued as the
next commit on this branch:
  - drop the `extends CometOperatorSerde[CometScanExec]` trait bound;
    expose `convert(...)` as a static method
  - replace ContribOp envelope with the typed OpStruct::DeltaScan
  - drop the SPI extension class wrappers; integrate detection directly
    into CometScanRule.scala + CometExecRule.scala (Iceberg-style)
  - bake DeltaPlanDataInjector logic directly into CometDeltaNativeScanExec

Maven `-Pcontrib-delta` profile, scalastyle wiring, and the SPI rewrite
all land together in the follow-up commit so the contrib compiles
end-to-end against main.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ction bridge

The four SPI-touching files from contrib-delta-pr2 rewritten to drop the
rejected SPI base classes and use the typed `OpStruct::DeltaScan` proto
variant directly:

  - CometDeltaNativeScan.scala  no longer `extends CometOperatorSerde`;
    plain object with `convert(scan, builder, childOp*)` static method.
    All `ContribOp` envelope wrapping replaced with
    `builder.setDeltaScan(...)`. DeltaOperator.* imports redirected to
    core's `org.apache.comet.serde.OperatorOuterClass`.
  - CometDeltaNativeScanExec.scala  no longer `with PlanDataSource`;
    public accessors (planDataSourceKey, planDataCommonBytes,
    planDataPerPartitionBytes) stay so core's CometExecRDD can read them
    directly. `nativeOp.getContribOp.getPayload` calls collapse to the
    typed `nativeOp.getDeltaScan` accessor.
  - DeltaScanRule.scala  was `class DeltaScanRuleExtension extends
    CometScanRuleExtension`; now a plain `object DeltaScanRule` with a
    single static entry point `transformV1IfDelta(plan, session,
    scanExec, relation): Option[SparkPlan]`. The private
    `CometScanRule.isSchemaSupported` is unreachable from contrib, so
    inline the equivalent check (CometScanTypeChecker + fallback-reason
    emission).
  - The DeltaOperatorSerdeExtension + DeltaPlanDataInjector files are
    not ported -- their roles fold into the next commit's CometExecRule
    Delta serde dispatch and into CometDeltaNativeScanExec respectively.

Core wiring:
  - spark/pom.xml: new `<profile id="contrib-delta">` adds
    contrib/delta/src/main/scala/ as a compile source on comet-spark and
    pulls in `io.delta:delta-spark_2.13:4.1.0` at provided scope.
  - CometScanRule.scala: 5-line Delta detection block at the head of
    `transformV1Scan`'s HadoopFsRelation case (Iceberg-style; calls into
    `DeltaIntegration.transformV1IfDelta` which is a no-op when the
    contrib isn't bundled).
  - DeltaIntegration.scala (new): reflection bridge that resolves the
    contrib's `DeltaScanRule` + `CometDeltaNativeScan` companion objects
    by class name. Default builds get `None`; -Pcontrib-delta builds get
    a working delegate. No SPI / ServiceLoader / registry.

Build verification:
  mvn compile                  # default: still green
  mvn compile -Pcontrib-delta  # GREEN -- this is the milestone

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…tics

Spark's UnsafeRow.getUTF8String wraps bytes via UTF8String.fromAddress with
no UTF-8 validation, and cast(BinaryType -> StringType) is a zero-copy
reinterpret that leaves arbitrary bytes in a StringType column. Delta's
Z-Order uses interleave_bits(...).cast(StringType) for opaque sort keys,
which panicked Comet's strict from_utf8(...).unwrap() and cascaded into
JVM classloader errors (60+ ServiceConfigurationError tests in the
contrib-delta-pr2 regression run).

Switch to from_utf8_unchecked since the bytes flow directly into Arrow's
StringBuilder::append_value and are never introspected as a &str.

Verified on contrib-delta-pr2: OptimizeZOrderScalaSuite "interleaving"
4/4 PASS after this fix.

Pure core fix -- independent of the contrib/delta integration. Lands on
this branch because it's a prerequisite for the Delta regression to be
meaningful (without it the Z-Order panic poisons every following test).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Connects core's CometExecRule to the contrib's Delta scan serde so the
Delta-marker CometScanExec produced by CometScanRule flows through the
same `convertToComet(scan, handler)` path as Iceberg / NativeScan / etc.

  - CometDeltaNativeScan re-extends core's `CometOperatorSerde` trait
    (the trait itself is core, not part of the rejected extension SPI;
    every Comet operator handler implements it). `getSupportLevel` /
    `enabledConfig` / `convert` now properly override.
  - DeltaIntegration.scanHandler: a single reflective lookup exposes
    the contrib's companion as a `CometOperatorSerde[CometScanExec]`.
    Returns None on default builds.
  - CometExecRule.transform: new case beside the SCAN_NATIVE_DATAFUSION
    one that recognises the Delta scan marker (scanImpl ==
    "native_delta_compat") and dispatches via the handler.

Build verification:
  mvn compile                  GREEN
  mvn compile -Pcontrib-delta  GREEN

Still pending for end-to-end:
  - per-partition task-list injection (replaces PR2's DeltaPlanDataInjector
    SPI) -- baked into CometExecRDD via another small reflection hook
  - live smoke test once the dylib is rebuilt with --features contrib-delta
    and bundled into the jar

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Connects the contrib's per-partition Delta task-list serialisation into
core's existing `PlanDataInjector.injectPlanData` pipeline. Without this
the native side decodes a tasks-empty `DeltaScan` and returns `EmptyExec`
(0 rows) for every Delta scan.

  - contrib/delta/.../DeltaPlanDataInjector.scala: implements core's
    `PlanDataInjector` trait. `canInject` checks `op.hasDeltaScan` and
    rejects already-injected operators (idempotent). `inject` splices the
    partition's tasks into the operator's common-only DeltaScan envelope
    via `op.toBuilder.setDeltaScan(...)` -- pure typed-proto operations,
    no `ContribOp` envelope.
  - spark/.../operators.scala: `PlanDataInjector.injectors` Seq now
    appends the contrib injector via one reflective Class.forName lookup.
    Default builds get None (no contrib classes on classpath) so the
    list is unchanged; -Pcontrib-delta builds get the Delta injector.

Build verification:
  mvn compile -Pcontrib-delta  GREEN

End-to-end Scala+Maven integration is now complete. Remaining work:
  - rebuild native dylib with `--features contrib-delta` and bundle
    into comet-spark.jar
  - run an isolated test (e.g. OptimizeZOrderScalaSuite "interleaving")
    to confirm the end-to-end path works

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Wrap Class.forName calls in `// scalastyle:off classforname`, change
Option[Class[_]] to Option[Class[AnyRef]] to avoid existential type
warnings, reword the doc comment so the verbatim string Class.forName
doesn't trip scalastyle's source-pattern check.

  mvn scalastyle:check -Pcontrib-delta  GREEN

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…th file path

CometExecIterator was wrapping native Parquet failures (e.g. corrupt-footer
errors from kernel-rs reading a broken Delta checkpoint) in `_LEGACY_ERROR_TEMP_2254`,
whose message is literally "Data read failed." -- no file path, no useful context.

That broke tests that mirror Spark/Delta's standard parquet-failure shape, e.g.
SnapshotManagementSuite "should not recover when the current checkpoint is broken"
which asserts the resulting SparkException's message contains both the file path
and "Encountered error while reading file" -- the format
`QueryExecutionErrors.cannotReadFilesError` produces.

Switch the wrapping to `cannotReadFilesError(cause, filePath)` via a new helper
on ShimSparkErrorConverter (which lives in the spark package and can reach the
private InputFileBlockHolder / QueryExecutionErrors). File path is read from
InputFileBlockHolder, with an empty-string fallback when the thread-local
isn't set; the static phrasing still satisfies the test assertion.

Pure core fix -- benefits every native parquet read, not just Delta.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
DeltaTable.forPath(spark, path, fsOptions) with a Hadoop custom-fs scheme
(e.g. fake://) was being claimed by CometScanRule for V1 parquet scans on
the _delta_log/checkpoint.parquet files Delta reads internally. The native
side then crashed at executePlan with `Generic URL error: Unable to
recognise URL "fake:///..."` since object_store doesn't know the custom
scheme.

Add a scheme allowlist check (same set already used in the Iceberg branch
and the contrib Delta path) at the top of the HadoopFsRelation arm; decline
via withInfo when any rootPaths scheme is outside the allowlist so Spark's
Hadoop-FS-aware reader handles the scan.

Fixes DeltaTableSuite "dropFeatureSupport - with filesystem options" and is
also a baseline fix (the same crash reproduces on main per
full-20260415-222735.log).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Each `plan_delta_scan` JNI call was creating a fresh `DefaultEngine`. Kernel's
`DefaultEngine<TokioBackgroundExecutor>` spawns one std::thread per executor that
hosts a current_thread tokio runtime, and that runtime's blocking pool (used by
kernel for parquet metadata IO and object_store reads) keeps `spawn_blocking`
worker threads alive for ~10s after each task. Under regression load (hundreds
of Delta scans/minute, each spawning a handful of blocking IO tasks) this
accumulates OS threads faster than tokio reaps them, eventually hitting the
per-process `ulimit -u` (~1300 on macOS) — visible in the log as
`pthread_create EAGAIN` aborts of GenerateIdentityValuesSuite and
MergeIntoUnlimitedMergeClausesScalaSuite ~2 hours into the run.

Replace the per-call `create_engine` with `get_or_create_engine` that returns
an `Arc<DeltaEngine>` from a static cache keyed by `(scheme, authority,
DeltaStorageConfig)`. Engines are constructed lazily on first miss per key and
reused for the lifetime of the JVM, bounding live OS threads by table-storage
diversity rather than by request count. The standalone `create_engine` is kept
(behind `#[allow(dead_code)]`) for tests that want a fresh engine.

`scan.rs` updated to deref `Arc<DeltaEngine>` to `&dyn Engine` at each kernel
call (`builder.build`, `scan.scan_metadata`, `dv.get_row_indexes`).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
DataFusion's `make_array_inner` asserts strict element-type equality (down to
nested field nullability) via `MutableArrayData::with_capacities`. Spark's
`CreateArray` is more permissive: when the analyzer doesn't insert coercion
casts, children can share the same surface struct type but disagree on a
nested field's nullability. Delta's CDF write path builds
`array(struct(id, b, _change_type=lit("delete")), struct(id, b, _change_type=col))`
manually -- one arm's `_change_type` is `Utf8` non-nullable (from a literal),
another is `Utf8` nullable -- and Comet's native serde happily emitted a
`make_array` call. Native execution then panicked:

  assertion `left == right` failed: Arrays with inconsistent types passed to
  MutableArrayData
   left: Struct([..., Field { name: "_change_type", data_type: Utf8 }])
  right: Struct([..., Field { name: "_change_type", data_type: Utf8, nullable: true }])

Decline in `CometCreateArray` when `children.map(_.dataType).distinct.size > 1`
so the JVM evaluator (which doesn't have this strictness) handles it. Fixes 4
`DescribeDeltaHistorySuite "replaceWhere on data column ... enableCDF=true"`
failures.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… kind

Two perf-sweep items from apache#135:

apache#7 parse_delta_partition_scalar TZ parse-once. The per-row
chrono_tz::Tz::from_str (or fixed-offset parse) was happening inside
parse_delta_partition_scalar for every TIMESTAMP partition value, but the
session TZ string doesn't change within a scan. Introduce SessionTimezone
enum (Tz | Offset | Invalid), parse once in build_delta_partitioned_files,
pass the parsed value through. parse_delta_partition_scalar's signature gains
&SessionTimezone and keeps session_tz: &str only for the error message.
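
A parse-once sketch under simplifying assumptions: the named-zone variant holds a `String` in place of `chrono_tz::Tz`, anything containing `/` is treated as a named zone, and the helper names (`parse_session_tz`, `local_to_utc_micros`, `convert_all`) are hypothetical.

```rust
/// Session time zone, parsed once per scan and reused for every value.
#[derive(Clone, Debug, PartialEq)]
enum SessionTimezone {
    /// Named zone; a String stands in for a real tz-database handle.
    Tz(String),
    /// Fixed offset from UTC, in seconds.
    Offset(i32),
    Invalid,
}

/// Parses "+HH:MM" or "-HH:MM" into seconds east of UTC.
fn parse_fixed_offset(s: &str) -> Option<i32> {
    let sign = match *s.as_bytes().first()? {
        b'+' => 1,
        b'-' => -1,
        _ => return None,
    };
    let (h, m) = s[1..].split_once(':')?;
    let h: i32 = h.parse().ok()?;
    let m: i32 = m.parse().ok()?;
    if !(0..=18).contains(&h) || !(0..=59).contains(&m) {
        return None;
    }
    Some(sign * (h * 3600 + m * 60))
}

fn parse_session_tz(s: &str) -> SessionTimezone {
    if s == "UTC" {
        return SessionTimezone::Offset(0);
    }
    if let Some(secs) = parse_fixed_offset(s) {
        return SessionTimezone::Offset(secs);
    }
    // Assumption for this sketch: "Area/City" strings are named zones.
    if s.contains('/') {
        SessionTimezone::Tz(s.to_string())
    } else {
        SessionTimezone::Invalid
    }
}

/// Per-value conversion takes the parsed zone; the raw string is only for errors.
fn local_to_utc_micros(local: i64, tz: &SessionTimezone, session_tz: &str) -> Result<i64, String> {
    match tz {
        SessionTimezone::Offset(secs) => Ok(local - i64::from(*secs) * 1_000_000),
        SessionTimezone::Tz(name) => Err(format!("named zone {name} needs a tz database, omitted here")),
        SessionTimezone::Invalid => Err(format!("invalid session time zone: {session_tz}")),
    }
}

/// Parses the zone once, then converts every partition value with it.
fn convert_all(values: &[i64], session_tz: &str) -> Result<Vec<i64>, String> {
    let tz = parse_session_tz(session_tz);
    values.iter().map(|v| local_to_utc_micros(*v, &tz, session_tz)).collect()
}
```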

apache#2 PlanDataInjector lookup by op kind. injectPlanData was running
`for (injector <- injectors if injector.canInject(op))` against every
operator in the tree; for a 50-op plan with 3 injectors that's 150
canInject calls just to find no match on most ops. Add `opStructCase` to the
PlanDataInjector trait, build a Map[OpStructCase, PlanDataInjector] once at
object init, and look up by op.getOpStructCase before any canInject call.
Iceberg/NativeScan/Delta injectors set their own opStructCase.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…tion

Perf-sweep #1 from apache#135. `DeltaIntegration.transformV1IfDelta` is invoked for
every V1 scan in every plan (the bridge is called unconditionally by
CometScanRule before the contrib's own Delta-format check). On
-Pcontrib-delta builds each call was doing `getField MODULE$` +
`getMethod("transformV1IfDelta", ...)` + 4-arg Method.invoke -- a reflection
round-trip per scan.

Cache the resolved (module, method) binding once per JVM as
`transformV1IfDeltaBinding: Option[(AnyRef, Method)]`, single OnceLock-style
volatile. Steady-state per-scan cost drops to one volatile read + one
Method.invoke.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Perf-sweep apache#5 from apache#135. `isSchemaCometCompatible` was allocating a fresh
CometScanTypeChecker(CometDeltaNativeScan.ScanImpl) on every scan. The
checker is stateless w.r.t. its scanImpl tag and is safe to share. Promote
it to a private val on DeltaScanRule; the per-scan fallback-reasons
ListBuffer remains per-call (it's the only mutable input).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…artitioned_files

Perf-audit apache#137 finding #1. The inner `partition_schema.fields()` loop was
calling `.iter().find()` on `task.partition_values` for every field --
O(width × values) per task. Pre-build a per-task HashMap<&str, &str> once,
then O(1) gets. The map is reused across tasks via clear() so the allocation
amortises across all DeltaScanTasks in the scan.
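
A sketch of the lookup-map reuse, with a simplified `DeltaScanTask` that stores partition values as string pairs. The function name and return shape are illustrative and do not match the signature of `build_delta_partitioned_files`.

```rust
use std::collections::HashMap;

/// Simplified task: partition values as (column name, raw string value) pairs.
struct DeltaScanTask {
    partition_values: Vec<(String, String)>,
}

/// For each task, resolves every partition field to its raw value (or None).
fn resolve_partition_values<'a>(
    tasks: &'a [DeltaScanTask],
    partition_fields: &[&str],
) -> Vec<Vec<Option<&'a str>>> {
    // One map for the whole scan; clear() keeps the allocation between tasks.
    let mut lookup: HashMap<&'a str, &'a str> = HashMap::new();
    let mut out = Vec::with_capacity(tasks.len());
    for task in tasks {
        lookup.clear();
        lookup.extend(
            task.partition_values
                .iter()
                .map(|(k, v)| (k.as_str(), v.as_str())),
        );
        // O(1) lookup per field instead of a linear find over the values.
        let row: Vec<Option<&'a str>> = partition_fields
            .iter()
            .map(|field| lookup.get(*field).copied())
            .collect();
        out.push(row);
    }
    out
}
```

The `clear()` call matters for correctness as well as allocation reuse: without it, a value from an earlier task could leak into a later task that lacks that column.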

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
SnapshotManagementSuite "should not recover when the current checkpoint is
broken..." asserts the wrapped FAILED_READ_FILE.NO_HINT SparkException message
contains the file path (e.g. "0001.checkpoint"). de9e0d3 got the error class
right but left the path empty because:

  1. Comet's native scan path does NOT go through Spark's FileScanRDD, so the
     standard InputFileBlockHolder thread-local is never populated.
  2. ShimSparkErrorConverter.wrapNativeParquetError was reading from
     InputFileBlockHolder, getting null, and passing "" to
     cannotReadFilesError -- producing "Encountered error while reading file . "
     (with the empty path), which the test rejected.

Plumb per-partition file paths from CometNativeScanExec (where they're known
at planning time) -> CometExecRDD -> CometExecPartition -> CometExecIterator
-> wrapNativeParquetError. CometNativeExec.doExecuteColumnar (the actual call
site that constructs the iterator for query trees with a scan) collects file
paths from any CometNativeScanExec leaves and passes them through the same
CometExecRDD parameter.

Verified with a /tmp/cometdiag.log file sentinel that the existing logWarning
diags were being silently dropped by the test's `quietly { ... }` block,
which is why my earlier "the wrap isn't being reached" conclusion was wrong.

Test results after fix: SnapshotManagementSuite checkpoint-broken 2/2 PASS
(was 0/2 with empty path). The other 3 fix clusters
(de9e0d3+effe5f76+56c2b011) continue to pass: replaceWhere CDF 8/8,
dropFeatureSupport 1/1.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…+ safeguards

Five fixes from the comprehensive code review of contrib-delta-direct:

1. Implement the missing InputFileBlockHolder hook in CometExecRDD.compute.
   Several docs referenced `CometExecRDD.setInputFileForDeltaScan` but no such
   method existed and nothing called `DeltaInputFileBlockHolder.set`, leaving
   Delta's UPDATE/DELETE/MERGE flows (which use `input_file_name()` to find
   touched files) silently looking at an empty path. Now set the thread-local
   to the partition's first file (one-per-partition is enforced by
   DeltaScanRule when input_file_name() is referenced), unset on task
   completion. Stale doc references updated to point at the real call site.

2. DV filter ordering safeguards. DeltaDvFilterExec's `current_row_offset`
   tracking assumes physical row ordering from the parquet scan. Override
   `maintains_input_order() = [true]` and
   `benefits_from_input_partitioning() = [false]` so any future optimizer
   that wants to insert a RepartitionExec / SortPreservingMergeExec is
   forced to bail rather than silently re-order rows.

3. Tighten IgnoreMissingFileSource's `is_not_found` Display fallback. The
   prior `msg.contains("not found")` would match unrelated parquet messages
   like "row group statistics not found" or "page index not found" and
   silently swallow them as missing-file (returning empty results instead
   of failing). Restrict to recognised NotFound prefixes from object_store /
   S3 / FS error formats.

4. Multi-line regex for native parquet errors in CometExecIterator. Native
   parquet errors with embedded newlines (e.g. footer hex dumps) would slip
   past the single-line `^Parquet error: .*$` and surface as bare
   CometNativeException. Add `(?s)` so `.` spans newlines.
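
The difference between the loose and tightened checks in item 3 can be sketched as below. The prefixes in `NOT_FOUND_PREFIXES` are placeholders chosen for illustration and are an assumption, not the list used by `IgnoreMissingFileSource`.

```rust
/// Placeholder prefixes (assumed); the real list follows the object_store,
/// S3, and filesystem error formats the commit refers to.
const NOT_FOUND_PREFIXES: &[&str] = &["Object at location ", "NotFound: "];

/// The prior behavior: any message mentioning "not found" counts as a missing file.
fn is_not_found_loose(msg: &str) -> bool {
    msg.contains("not found")
}

/// The tightened behavior: only messages that start with a recognised prefix.
fn is_not_found_strict(msg: &str) -> bool {
    NOT_FOUND_PREFIXES.iter().any(|prefix| msg.starts_with(prefix))
}
```

With the loose check, a message such as "Parquet error: page index not found" would be swallowed as a missing file and turned into an empty result; the strict check lets it fail.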

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The post-review fixes added/modified scaladoc that broke spotless line-length
rules. Apply spotless:apply across the three touched files. Verified with
test-compile.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>