Feat/delta kernel read#4607
Closed
schenksj wants to merge 74 commits into
Closed
Conversation
A field of a NULL struct must be NULL (Spark semantics). Arrow stores a StructArray's child arrays with their own validity, INDEPENDENT of the parent struct's null buffer, so the raw child value at a row where the struct itself is null can be non-null (e.g. parquet files where a logically-null struct column still carries a populated child buffer). GetStructField.evaluate returned the child column verbatim, so isnotnull(struct.field) wrongly evaluated TRUE for a null struct. Fix: union the parent struct's null mask into the extracted child (null where the struct is null OR the child is null). Adds a standalone unit test that fails without the fix and passes with it. Closes apache#4432 Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Spark's UnsafeRow.getUTF8String performs no UTF-8 validation, and cast(BinaryType -> StringType) is a zero-copy reinterpret, so a StringType column can legitimately hold arbitrary non-UTF-8 bytes. get_string decoded with from_utf8(..).unwrap(), which panics on such rows even though Spark treats them as opaque. Use from_utf8_lossy (returning Cow<str>): a zero-cost borrow for valid UTF-8 and a String with U+FFFD replacements otherwise -- defined behavior, no UB. Avoids from_utf8_unchecked, which would construct a &str from arbitrary bytes (UB) and propagate into downstream Arrow ops. Adds a standalone unit test that panics without the fix and passes with it. Closes apache#4521 Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…chemes Comet's native readers go through object_store, which only understands a fixed set of URL schemes. A custom Hadoop FileSystem (e.g. registered via spark.hadoop.fs.<scheme>.impl) crashes the native reader at execution with "Generic URL error: Unable to recognise URL", with no graceful recovery. Decline such scans at planning time so Spark's Hadoop-FS-aware reader handles them. Whether object_store recognizes a scheme is answered by the native layer itself (NativeBase.isObjectStoreSchemeSupported, backed by object_store's ObjectStoreScheme::parse -- the same path prepare_object_store_with_configs uses) rather than a hardcoded list, so the planner can't drift from object_store's actual support. The user's libhdfs scheme config (spark.hadoop.fs.comet.libhdfs.schemes) is unioned in on the JVM side; results are cached per scheme; if native can't be consulted the scheme is assumed supported rather than over-restricting. Adds CometScanSchemeFallbackSuite, which asserts a `fake://` scan falls back to Spark; it fails without the gate (Comet claims the scan) and passes with it. Closes apache#4520 Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
A left-deep chain of N associative boolean operands serializes to a proto nested N levels deep. With N > protobuf's default recursion limit (100), the message overflows when the serialized plan is re-parsed -- on the JVM via Operator.parseFrom (findShuffleScanIndices / explain) and in the Rust prost decoder -- failing an otherwise-supported query. Comet evaluates AND/OR vectorially (both sides always evaluated, no row-level short-circuit), so the chains are fully associative. Flatten each chain and rebuild it as a balanced O(log n) tree before serialization; this is semantically identical and only changes the proto's shape. Adds QueryPlanSerde.flattenAssociative + createBalancedBinaryExpr and routes CometAnd / CometOr through them. Closes apache#4526 Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Spark wraps file-source partition columns and other per-batch constants in ConstantColumnVector. When such a batch reaches Comet's serialization path (Utils.getBatchFieldVectors, used by broadcast/shuffle) or FFI export path (NativeUtil.exportBatch), it was rejected with "Comet execution only takes Arrow Arrays". Materialize the constant into a fresh Arrow FieldVector (the constant repeated numRows times) inline. The materializer reuses the existing per-type ArrowFieldWriters, so it covers every type -- scalars, decimal, timestamps, and complex struct/array/map -- and stays in sync with Spark's type handling. Adds ConstantColumnVectors.materialize (arrow package) + Utils.materializeConstantColumnVector, with new match arms in getBatchFieldVectors and exportBatch. Closes apache#4527 Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
DataFusion's make_array asserts strict element-type equality in MutableArrayData and panics on a mismatch. Spark's CreateArray coerces element types with `sameType`, which ignores nullability, so children that share a surface type but differ only in a nested struct field's nullability get no unifying cast (e.g. array(struct(a not null), struct(a nullable))). Native execution then panics: "Arrays with inconsistent types passed to MutableArrayData". DataFusion tolerates container nullability differences (ArrayType.containsNull / MapType.valueContainsNull are coerced), so decline only the cases that actually panic: children that still differ after normalizing container nullability while keeping struct field nullability significant. Those fall back to Spark's evaluator. Closes apache#4528 Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
PlanDataInjector.injectPlanData walked every operator in the tree against every registered injector (`for (injector <- injectors if injector.canInject(op))`) -- N operators x M injectors canInject calls -- even though most operators in any tree are non-scan and match no injector. Add `opStructCase` to the PlanDataInjector trait and key a Map[OpStructCase, PlanDataInjector]. Look up by op.getOpStructCase (O(1)) then a single canInject confirm; non-scan operators skip the iteration entirely. Pure performance change -- no behavior difference. Closes apache#4530 Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
When Comet's native DataFusion scan hits a corrupt footer, corrupt page/column
data, a truncated/empty file, or a deleted file, it rethrew the raw native
message instead of Spark's FAILED_READ_FILE. The native path does not go through
Spark's FileScanRDD, so the offending path was usually missing too.
Classify these failures by TYPED DataFusionError variant in the native error
path (ParquetError / ObjectStore / ArrowError-wrapping-ParquetError / IoError,
unwrapping Context/Shared) rather than by matching error-message prose -- the
strings come from three upstream crates (DataFusion, arrow-rs, object_store) and
drift across version bumps with no compile-time signal. The match arms are
checked by the compiler.
- native: new SparkError::CannotReadFile { file_path, message } variant; a typed
try_classify_file_read_error in the JNI bridge converts a file-read
DataFusionError into it, replacing the previous "not found"/"No such file"
string match. file_path is taken from object_store::Error::NotFound when
available. Deliberately does NOT match object_store Generic errors (also used
for non-file config errors that must surface as-is).
- JVM: the structured error crosses JNI as the existing CometQueryExecutionException
JSON payload; SparkErrorConverter decodes "CannotReadFile" and, when the native
error carried no path, fills it from the per-task file list threaded from
CometNativeScanExec via CometExecRDD. The shims wrap it via
QueryExecutionErrors.cannotReadFilesError. No JVM-side message matching.
Closes apache#4529
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
check-suites.py requires every *Suite.scala to appear in both pr_build_linux.yml and pr_build_macos.yml. Add the new PlanDataInjectorSuite alongside its sibling org.apache.spark.sql.comet suites. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
check-suites.py requires every *Suite.scala to appear in both pr_build_linux.yml and pr_build_macos.yml. Add the new CometScanSchemeFallbackSuite alongside its sibling org.apache.comet.rules suites. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…la 2.12 SQLTestUtils.withSQLConf returns Unit on Spark 3.5 but a value on Spark 4.x, so assigning its block result to `val sparkPlan: SparkPlan` failed to compile under the spark-3.5 profile (type mismatch: found Unit, required SparkPlan). Capture the plan via a var assigned inside the block, which is cross-version-safe. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Address review feedback: import java.lang.Boolean (as JBoolean), java.net.URI, java.util.Locale and java.util.concurrent.ConcurrentHashMap rather than referencing them with fully-qualified class names in the newly-added scheme-gating code. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…R WHERE Address review feedback on the deep-chain rebalancing PR: - flattenAssociative now uses an explicit work stack and an accumulating buffer instead of recursion. The chains that trigger this are left-deep and O(n) deep, so the prior recursive walk could itself overflow the JVM stack and the `++` accumulation was O(n^2). - The recursion-limit test now mixes a nullable column into the chains so the rebalanced tree is exercised under SQL three-valued logic, and adds a deep OR in a WHERE clause -- a common trigger that, unlike a top-level AND, Spark does not split and so stays deeply nested. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…che#4521) Address review feedback: add a Spark-level regression test demonstrating the bug. cast(binary -> string) is a zero-copy reinterpret in Spark, so a StringType column can hold arbitrary non-UTF-8 bytes. The test disables Comet's Cast so those raw bytes reach Comet's columnar (JVM) shuffle inside a JVM UnsafeRow, exercising the native row->Arrow get_string path that used to panic via from_utf8(..).unwrap() and now decodes lossily. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…rename) The unsupported-scheme fallback still called withInfo, the old name of withFallbackReason (renamed in apache#4508). It was the only remaining old-name call in the file and broke compilation after merging main; rename it to match the rest of CometScanRule. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
… nullable GetStructField::nullable() reported only the extracted field's own nullability, ignoring whether the parent struct can be null. A field of a null struct is null (Spark semantics, enforced here by project_field unioning the parent null mask), so a NON-nullable field of a NULLABLE struct must itself be reported nullable. Reporting the field's own flag under-declares: the projected column then carries the parent's nulls while claiming non-nullable, and Arrow RecordBatch validation rejects it downstream with "Column '...' is declared as non-nullable but contains null values" (e.g. once the column reaches a shuffle read-back or a projection over a final aggregate). This is the companion to the value-side null-mask propagation in this PR -- the value is now correct, this makes the declared nullability match. Mirrors Spark's GetStructField.nullable = child.nullable || field.nullable. Surfaced by Delta's action frame: each log row is exactly one action type, so the action columns (add, remove, ...) are nullable structs whose inner fields are declared NON-nullable by Delta's typed SingleAction schema (e.g. add.size). The non-AddFile rows leave add null, so add.size carries nulls while declared non-nullable, crashing Comet's native shuffle during OPTIMIZE / commit. Tests: - Rust unit tests for the nullability matrix (nullable/non-nullable parent x field). - A Spark repro in CometExpressionSuite that builds that exact shape with an explicit in-memory schema (a Parquet round-trip would mark every field nullable, and a CreateNamedStruct would be declined), shuffles it, and projects the non-nullable inner field. It fails with the above error before this fix and passes after. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…Error), not CannotReadFile
A genuinely-missing file (object_store NotFound) is distinct from a corrupt/truncated one:
Spark surfaces it via `readCurrentFileNotFoundError` ("It is possible the underlying files
have been updated."), not `cannotReadFilesError` (FAILED_READ_FILE). `try_classify_file_read_error`
mapped every per-file read failure -- including NotFound -- to `SparkError::CannotReadFile`, so a
file removed between planning and execution produced the wrong Spark error.
Classify object_store NotFound as `SparkError::FileNotFound` instead. The NotFound may arrive
directly (`DataFusionError::ObjectStore`) or wrapped by the parquet reader as
`ParquetError::External(..)` / `ArrowError::ParquetError`, so a `source_chain_has_object_store_not_found`
helper walks the typed source chain (never message text). Corrupt/truncated reads stay
CannotReadFile -> FAILED_READ_FILE. The JVM shim already maps the `FileNotFound` errorType to
`readCurrentFileNotFoundError`, so no shim change is needed.
Surfaced by Delta's CDC-after-VACUUM read: `DeltaVacuumSuite` "vacuum for cdc - update/merge" and
"... - delete tombstones" vacuum the `_change_data` files and assert the subsequent read throws
`readCurrentFileNotFoundError`; with the native scan these failed because Comet returned the
cannotReadFilesError message. Both pass with this fix (verified locally).
Tests:
- Rust unit tests for the classifier: object_store NotFound (direct and ParquetError::External-wrapped)
-> FileNotFound; corrupt ParquetError stays CannotReadFile.
- Spark `CometExecSuite` "native parquet read of a missing file surfaces readCurrentFileNotFoundError"
(red before, green after): reads a file deleted between planning and execution.
- Made the existing FAILED_READ_FILE corrupt-file assertion spark-version-stable (assert
"Encountered error while reading file" -- present on both 3.5 and 4.x; only 4.x prepends the
[FAILED_READ_FILE.NO_HINT] class tag), so the test passes under -Pspark-3.5 as well.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ons, IoError scoping Addresses @andygrove's review on apache#4536: - spark-3.4 shim: add the `CannotReadFile` case (it only existed in the 3.5 and 4.x shims), so a corrupt/truncated read is wrapped via `cannotReadFilesError` (FAILED_READ_FILE) on Spark 3.4 too. (The `FileNotFound` case was already present on 3.4.) - SparkErrorConverterSuite: assert on the version-stable message ("Encountered error while reading file ...") instead of the `FAILED_READ_FILE` literal, which only Spark 4.x prepends to getMessage as the error-class tag (3.4/3.5 render only the message). Fixes the two failing tests on 3.4/3.5; same version-stable style already applied to the CometExecSuite e2e test. - native classifier: stop treating a bare `DataFusionError::IoError` as a file read. Scans surface read failures as a typed ParquetError/ObjectStore error; a bare IoError can also come from non-scan paths (spill, shuffle), which must not be relabelled FAILED_READ_FILE with a per-task path attached. Test updated accordingly. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ow path Follow-up to @andygrove's review on apache#4536: - (point 3, wording) parquet-rs reports a bad magic / unreadable footer as "Invalid Parquet file. Corrupt footer", whereas Spark's reader -- and Spark's `ParquetQuerySuite` ("ignoreCorruptFiles", "ignoreMissingFiles using parquet") -- phrase it as "<file> is not a Parquet file". `cannot_read_file_message` now appends Spark's phrasing for the magic/footer case so the FAILED_READ_FILE cause carries it. The outer `cannotReadFilesError` wrapper ("Encountered error while reading file …") is unchanged, so this composes with Spark's tests and does not disturb the Delta shims that match Comet's outer message. Other read failures keep their native message. (On behavior: the native scan already declines and falls back to Spark when `spark.sql.files.ignoreCorruptFiles`/`ignoreMissingFiles` is enabled -- CometNativeScan.scala -- so the skip semantics are preserved; no behavior gap.) - (point 5, tidy) `try_classify_file_read_error` is no longer evaluated twice (`.is_some()` guard + `.unwrap()`): the DataFusion arm is a single `if let Some(..)`, and the generic fallback is extracted to `throw_generic_exception`. Tests: classifier unit tests for the magic/footer wording (added) vs other parquet errors (unchanged native message). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Replace `String::from_utf8_lossy` in `get_string` with `decode_utf8_spark_lossy`, which mirrors `sun.nio.cs.UTF_8.Decoder` (action REPLACE) byte-for-byte so a Comet columnar shuffle of arbitrary bytes renders identically to a Spark JVM shuffle. `from_utf8_lossy` follows the Unicode "maximal subpart" rule and can emit more than one U+FFFD per ill-formed multi-byte unit; the JDK collapses certain units (notably surrogate-range three-byte sequences `ED A0..BF ..`, e.g. CESU-8 / modified-UTF-8 supplementary chars) into a single U+FFFD. Valid UTF-8 still returns a zero-cost borrow via the fast path. Tests use JDK-17 `new String(bytes, UTF_8)` output as the oracle: a 7-case replacement-granularity table (incl. the `ED A0 80` -> single U+FFFD parity case), zero-copy borrow for valid UTF-8, and valid multibyte chars preserved around an invalid byte. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Address review feedback on apache#4525. When `spark.hadoop.fs.comet.libhdfs.schemes` is unset, the scheme gate now defaults `libhdfsSchemes` to `Set("hdfs")` rather than the empty set, mirroring the native default: `is_hdfs_scheme` (parquet_support.rs) treats `hdfs` as natively readable when the config is unset, and `create_hdfs_object_store` is in the default build (`default = ["hdfs-opendal"]`). Previously a plain `hdfs://` V1 scan was declined and silently fell back to Spark in the default HDFS configuration even though native could read it. `s3a`/`file` are unaffected (object_store recognizes them via `parse_url`); an explicit config value still takes over verbatim. Test: add `native scan claims hdfs:// when libhdfs.schemes is unset` to CometScanSchemeFallbackSuite, alongside the existing `fake://` decline case. It backs the `hdfs` scheme with a local FS (FakeHdfsSchemeFileSystem) so an `hdfs://` path is exercised without a live cluster, then asserts CometScanRule claims the scan. Verified RED (fails with `Set.empty`: scan falls back to Spark) -> GREEN (passes with `Set("hdfs")`) on Spark 3.5. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…into delta-integration-base-v2
…delta-integration-base-v2
…to delta-integration-base-v2
…imit' into delta-integration-base-v2
…vector' into delta-integration-base-v2
…d' into delta-integration-base-v2
…y default) Add an experimental "kernel reads" path for Delta scans, gated behind `spark.comet.delta.kernelRead.enabled` (default false). When enabled on a plain table, each data file is read through delta-kernel-rs's own read + transform + deletion-vector path (DeltaKernelScanExec) and bridged into Comet's Arrow plan, instead of the ParquetSource + DV-sweep + synthetic-columns stack. The old path stays the default; nothing is removed. Native: - read_file_via_kernel: per-file kernel read + DV. Takes an already-decoded selection vector (kernel DvInfo is non-serializable / pub(crate), so it can't cross JNI or be rebuilt executor-side). - arrow_bridge: kernel_batch_to_comet (arrow-57 -> arrow-58 over the Arrow C Data Interface; the FFI structs are ABI-identical) + comet_schema_to_kernel (arrow-58 Schema -> kernel StructType via the same FFI + kernel's converter). - DeltaKernelScanExec composing the above; plan_delta_scan branches to it when DeltaScanCommon.kernel_read is set (plain-table guarded). Proto: bool kernel_read on DeltaScanCommon (reuses existing per-file data, no new message). JVM: DeltaConf.COMET_DELTA_KERNEL_READ_ENABLED; CometDeltaNativeScan sets the flag only for plain tables (no column mapping / partitions / row-tracking / metadata columns). Tests: 5 Rust unit tests (each primitive + the full exec, against real Delta fixtures incl. a red-green DV-drop test); CometDeltaNativeSuite smoke test proving the kernel path reads correctly (== vanilla Spark) and actually engages (native proto carries kernel_read=true). Phase 1c will add column mapping / partitions / row-tracking (executor-side kernel Transform rebuild) and lazy streaming, then remove the old path. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ping (Phase 1c apache#44) Extend DeltaKernelScanExec to read column-mapped Delta tables (name mode, top-level columns) via kernel. The exec now takes a separate physical schema (physical column names) plus a needs_transform flag; under column mapping it reads each file by physical name and applies an identity kernel Transform (Transform::new_top_level) so the evaluator relabels physical -> logical via the physical/logical schema pair. This hands the historically bug-prone physicalisation to kernel instead of re-doing it in Comet. - plan_delta_kernel_scan builds the physical read schema by renaming each top-level required field to its physical name (from DeltaScanCommon. column_mappings) and sets needs_transform; guard relaxed to allow top-level name-mode mapping while still rejecting partitions, row-tracking, metadata columns, id-mode mapping (use_field_id), and nested types. - Scala eligibility relaxed from !columnMappingActive to !useFieldIdActive && !requiredSchemaHasNested, so name-mode tables route to the kernel path; id-mode and nested stay on the default reader. Tests: 2 new native unit tests (exec column-mapping rename + the transform spike proving identity-transform relabel from public kernel API); a CometDeltaNativeSuite smoke test on a renamed name-mode table (physical != logical) asserting correctness vs vanilla + kernel-read engagement. 14/14 suite tests pass. Still on the old path (next in apache#44): id-mode (parquet field-id matching) and nested column mapping. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…FFI bridge Phase 1c apache#44 id-mode/nested feasibility. Kernel matches parquet columns by the `parquet.field.id` metadata key (recursively, via get_indices), so id-mode column mapping on the kernel-read path hinges on the field-id metadata flowing arrow-58 -> FFI -> arrow-57 -> kernel StructType. This spike confirms it does: a field carrying `parquet.field.id` survives comet_schema_to_kernel intact. Recipe this unlocks for id-mode (incl. nested): - the Scala translateDeltaFieldIdToParquet already stamps PARQUET:field_id on every field (incl. nested), preserved through convert_spark_types_to_arrow_schema - remap PARQUET:field_id (arrow key) -> parquet.field.id (kernel key) when building the kernel schema, then kernel matches by field-id recursively - relax the id-mode guard (native use_field_id + Scala useFieldIdActive) name-mode nested remains separate: column_mappings is top-level only, so nested physical names need new driver-side plumbing (delta.columnMapping.physicalName per nested field). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ase 1c apache#44) Extend the kernel-read column-mapping support to id mode (top-level). id-mode tables route through the same rename-then-relabel path as name mode (column_mappings carries the logical<->physical pairs in both modes), so the exec reads by physical name and the identity transform relabels to logical. For robustness against id-mode tables whose parquet physical column name ever diverges from the column-mapping physical name, the physical read schema also carries parquet field ids, letting kernel fall back to field-id matching: - comet_schema_to_kernel now copies arrow's PARQUET:field_id onto kernel's parquet.field.id key (recursively) and re-types the value to a number, which is what kernel's match_parquet_fields requires (it ignores string field ids) - plan_delta_kernel_scan preserves field-id metadata through the physical-name rename and no longer rejects id-mode (use_field_id) - Scala eligibility drops the !useFieldIdActive gate (keeps the nested guard) Tests: native id_mode_matches_parquet_by_field_id (proves field-id matching retrieves the right data when names differ) + the metadata-survival spike; a CometDeltaNativeSuite id-mode smoke test on a renamed id-mode table. 15/15 suite tests pass (name + id mode). Nested column mapping remains on the old path (guarded): name-mode nested needs driver-side nested physical names; id-mode nested needs nested relabel handling. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…he#45) Support partitioned Delta tables via DeltaKernelScanExec. The proto's required_schema already contains the requested data columns followed by the partition columns (Spark's requiredSchema ++ partitionFieldsForRequired), and partition columns aren't stored in the data files -- so plan_delta_kernel_scan SPLITS required_schema by partition-column name: the data fields become the parquet read schema (renamed to physical names under column mapping), the partition fields are injected as constants, and the exec reassembles them into required_schema order. Because the exec reproduces the scan's exact output layout, projections, column reordering, and partition filters all work with no special handling -- nothing falls back for them. - KernelScanFile gains per-file partition_values; DeltaKernelScanExec gains a read_logical_schema (the data-only transform target, distinct from the final output_schema) and appends partition constants via parse_delta_partition_scalar (reused from the old path; honours the session timezone for TIMESTAMP values). - Splitting by name also covers a partition column that appears among the data fields, so no separate guard for that. - Scala eligibility allows partitioned tables; the only remaining partition gate is reading zero data columns (only-partition / count(*)), which leaves no column to drive the per-file row count -- tracked for apache#48. Test: CometDeltaNativeSuite partitioned-table case covering SELECT *, data-only and data+partition projections, reordering, and a partition filter. 16/16 pass. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…(Phase 1c apache#46) Support synthetic columns (row_index / is_row_deleted / row_id / row_commit_version) and _metadata.* on the kernel-read path by REUSING the existing DeltaSyntheticColumnsExec, stacked on top of DeltaKernelScanExec -- exactly as the old path stacks it on the parquet read. The synthetic exec already computes every synthetic and does DV drop-or-flag, so this is composition, not new logic. - DeltaKernelScanExec gains an apply_dv flag: when a synthetic exec wraps it (synthetics requested), the kernel read does NOT drop DV rows -- the wrapper drops them or surfaces is_row_deleted, computing row positions against the full physical rows. Standalone reads keep apply_dv = true. - plan_delta_kernel_scan builds the per-partition synthetic vectors (dv descriptors, base_row_ids, default_commit_versions, task metadata) from scan.tasks and wraps the kernel exec, then applies final_output_indices. - Per-file partitioning (needed so row_index resets per file) is the Spark side's job via CometDeltaNativeScanExec.oneTaskPerPartition, which already fires for DV / row-tracking tables; the native exec stays single-partition and reads whatever file subset it's injected. (An earlier attempt to make the native exec multi-partition fought that model and dropped files.) - Scala eligibility drops the emit_* / metadata gates; the native guard keeps only nested-types and zero-data-column (only-partition / only-synthetic / count(*)) cases, tracked for apache#48. DeltaSyntheticColumnsExec is now REUSED, so it is NOT deleted in apache#50. Test: CometDeltaNativeSuite covers a _metadata.file_path read and a DELETE that creates a deletion vector (exercising is_row_deleted + apply_dv=false). 17/17 suite tests pass. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Spark writes TIMESTAMP (LTZ) as parquet INT96 by default. delta-kernel 0.19's parquet handler reads with a default ArrowReaderOptions, so arrow-rs maps INT96 to Timestamp(Nanosecond) -- and i64 nanoseconds overflow at ~year 2262, so any later instant reads back as garbage (9999-12-31 -> 1816, the Gap 1 from the coverage probe). read_file_via_kernel now reads the parquet itself instead of going through kernel's read_parquet_files: it fetches the bytes via the engine's storage handler (same sync entry dv_reader uses, so no nested tokio runtime) and decodes them with ArrowReaderOptions::with_schema, supplying a schema that types INT96 columns as microseconds (coerce_int96_to_micros, ported from DataFusion's coerce_int96_to_resolution). This mirrors Comet's canonical parquet path (coerce_int96="us") and the Iceberg integration's custom-read + schema-adapt pattern. Columns are projected by physical name and aligned to the physical schema (align_batch_to_schema: reorder + cast + null-fill of schema-evolution columns absent from older files) before kernel's transform_to_logical runs unchanged. Because the read now matches columns by physical name rather than parquet field id, id-mode column mapping relies on the driver-supplied physical names (which equal the parquet column names, unique UUIDs); the repurposed unit test covers this. Verified with kernelRead on: 9 native unit tests, CometDeltaNativeSuite 17/17 (incl. schema evolution), CometDeltaTypeRoundTripAuditSuite 10/10 (incl. the year-9999 INT96 round-trip that was failing). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Gap 2 from the coverage probe. Now that the kernel-read path reads the
parquet itself (previous commit), a corrupt/truncated data file fails inside
read_file_via_kernel (e.g. "EOF: footer metadata requires 8 bytes") and
read_all turned that into a plain DataFusionError::Execution string -- so the
query surfaced a raw Arrow error instead of Spark's FAILED_READ_FILE.
Add map_file_read_error (sibling to map_dv_error_to_datafusion): a per-file
read failure becomes a typed SparkError::CannotReadFile carrying the data-file
path, which the JVM shim converts to Spark's cannotReadFilesError ("Encountered
error while reading file ...", a [FAILED_READ_FILE] SparkException on Spark
4.x) -- matching vanilla Spark+Delta and the old ParquetSource path (apache#4536). A
genuinely missing file still maps to FileNotFound, as on the DV path.
Verified with kernelRead on: CometDeltaEdgeCaseRegressionSuite "F6: reading a
corrupted file surfaces a Spark-compatible error (SC-8810)" passes, plus 2 new
native unit tests for the mapping.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Audits each kernel-read design decision against Comet's Iceberg integration and the main parquet path. Verdict: coherent. Reuses existing pieces (DV reader, DeltaSyntheticColumnsExec, partition scalar parsing, kernel's transform_to_logical, the typed SparkError plumbing) wherever the arrow-57/58 boundary allows; the two custom pieces (the parquet read and the pre-bridge align_batch_to_schema) match exactly what Iceberg and parquet_exec already do. Records why align_batch_to_schema can't reuse SparkSchemaAdapter (arrow-58 only; reconciliation must precede the arrow-57->58 bridge) and one cleanup item: the field-id remap in arrow_bridge is now dead at read time (name-based matching replaced kernel's field-id matcher), to be removed with the old path in the deletion phase. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…arrow bridge delta-kernel 0.24 supports arrow-58 (feature `arrow-58`), the same arrow Comet uses. Bumping the contrib from 0.19 unifies kernel onto Comet's arrow 58.3.0 / parquet 58.3.0 / object_store 0.13 and removes the arrow-version boundary that the kernel-read path was built around -- a large net deletion. Removed: - `arrow_bridge.rs` entirely. `kernel_batch_to_comet` (the Arrow C Data Interface FFI `transmute` between arrow-57 and arrow-58) is gone -- kernel `RecordBatch`es are Comet batches now (`batch.clone()`). This deletes the module's only `unsafe`. `comet_schema_to_kernel` collapses to a one-line `arrow_to_kernel_schema` (`StructType::try_from_arrow`), with no FFI and no `parquet.field.id` remap (the read matches columns by physical name). - the `object_store_kernel` 0.12 rename hack: kernel 0.24 uses object_store 0.13, matching Comet, so `engine.rs` / `error.rs` use the one `object_store`. kernel 0.24 API deltas (small): `DefaultEngine::new(store)` -> `DefaultEngine::builder(store).build()`; the transform builder moved (`Expression::transform(Transform::new_top_level())` -> `Expression::struct_patch(ExpressionStructPatch::new_top_level())`); `transform_to_logical`'s signature is unchanged. Renamed the now-misleading `*57` arrow aliases to `Arrow*` and refreshed the arrow-57/0.19 comments. Verified end-to-end with kernelRead on (kernel 0.24 dylib): 120 native unit tests, CometDeltaTypeRoundTripAuditSuite 10/10, CometDeltaNativeSuite 17/17, CometDeltaEdgeCaseRegressionSuite 2/2 (incl. the INT96 round-trip and the F6 FAILED_READ_FILE cases). Updates the coherence-audit doc, whose arrow-version -boundary rationale is now obsolete. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ve proto groundwork Two native spikes (apache#47) feed read_file_via_kernel a nested column-mapped read and confirm the read primitive already handles it on kernel 0.24: - spike_nested_struct_rename: a struct whose nested field is named physically ("phys_a") comes back relabeled to the logical name ("log_a") -- kernel's identity transform + logical output schema relabels nested fields recursively, not just top level. - spike_nested_struct_prune: a struct with two physical fields, requested with one, comes back pruned to the single logical field -- align_batch_to_schema's arrow_cast selects struct fields by name. So nested CM needs no read-path change, only plumbing: the native must build physical_schema with nested physical names. To carry those consistently (today's proto required_schema is a hybrid -- logical top-level name, physical nested names), DeltaColumnMapping gains a recursive `children` field: struct fields match by logical name, descending transparently through array/map. The field is inert until the planner wiring lands. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Drops the last big kernel-read guard (nested-typed columns). The read primitive already handled nested column mapping on kernel 0.24 -- kernel's identity transform relabels nested struct fields physical->logical recursively and align_batch_to_schema prunes nested children (proven by the spike_nested_struct_* tests). The only missing piece was plumbing the nested physical names. Makes the wire schema consistent (it was a hybrid -- logical top-level name, physical nested names): - DeltaColumnMapping is now recursive; scan.rs builds the full tree from the kernel snapshot schema (physicalName at every struct level, descending through array/map transparently). jni.rs/Scala relay it as-is (children preserved). - The proto `required_schema` is now PURE LOGICAL at every level. The native planner physicalises it from the recursive mappings: all levels for the kernel-read `physical_schema` (physicalise_field rename_top=true), and nested- only for the old path, which reconstructs its historical hybrid shape so its top-level rename ProjectionExec + ordinal-based nested access (apache#79) are unchanged. The Scala keeps the physicalised `requiredSchemaFields` only to build the old path's `data_schema`. - Scala kernelReadEligible no longer excludes nested schemas. Verified: old path green with kernelRead off (ColumnMapping 5/5, RoundTrip 10/10, PhysicalNameRepro 1/1); kernel path green with kernelRead on (RoundTrip 10/10 incl. name-mode + complex types, Native 17/17, ColumnMapping 5/5) plus a new explicit test renaming a nested field (s.a -> renamed_a) over a struct + array-of- struct, asserting kernel-read engagement and nested-projection parity. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Drops the last kernel-read guards. Partition-only reads (e.g.
groupBy(partition).agg(count("*"))) read no data column, so there was nothing to
drive the per-file row count. DeltaKernelScanExec now handles that: when the
physical (data) schema is empty it drives each file's row count from record_count
(Delta numRecords; the parquet footer via the storage handler as a fallback when
stats are absent), emits an empty-data batch of that length, applies the DV to
shorten the count (or leaves the full count for a wrapping DeltaSyntheticColumnsExec
to drop/flag), and appends partition constants. The native data_fields.is_empty()
guard and the Scala hasDataColumn eligibility check are removed -- kernelReadEligible
is now just the config flag.
Verified with kernelRead on: CometDeltaNativeSuite 19/19 (incl. a new partition-only
count test asserting kernel-read engagement) and CometDeltaCoverageSuite 24/24
(its groupBy-partition count(*) aggregates now run on the kernel path).
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Flips spark.comet.delta.kernelRead.enabled to true. The kernel-read path (DeltaKernelScanExec on delta-kernel 0.24 / arrow-58) now handles the full Delta read surface -- plain tables, column mapping (name + id, including nested), partitions, deletion vectors, row-tracking, _metadata columns, schema evolution, INT96 timestamps, zero-data-column reads, and FAILED_READ_FILE error mapping -- with no remaining guards. Verified with the default flipped on: all 24 contrib suites pass, 141 tests, 0 failures (1 environment-gated cancel in CometDeltaSpecialCharFilenameSuite that also cancels on the legacy path). The Spark-3.5 sbt regression is deferred. Set to false to fall back to the legacy ParquetSource reader (until that path is removed in the iceberg-style shrink). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Kernel-read is now the only Delta read path, so the legacy ParquetSource + DV-sweep + synthetic-columns + column-mapping-physicalisation stack is removed. - plan_delta_scan collapses to the kernel-read branch (967 -> ~380 lines): the whole non-kernel arm (init_datasource_exec / FileScanConfig, the physical-name rename ProjectionExec, the DV-sweep + synthetic stacking, filter rewriting, the empty-partition fast path) is gone. When the driver leaves kernel_read false (user disabled it) the planner errors and Comet falls back to vanilla Spark. - Deletes the old-path-only contrib modules: missing_file_tolerant.rs (IgnoreMissingFileSource) and the end_to_end.rs integration test that exercised plan_delta_scan -> build_delta_partitioned_files -> ParquetSource. Net ~1050 lines removed. DeltaSyntheticColumnsExec is REUSED by the kernel path and stays. Verified with the default (kernel-read) on: CometDeltaNativeSuite 19/19, CometDeltaCoverageSuite 24/24, RoundTrip 10/10, EdgeCase 2/2, ColumnMapping 5/5. Remaining old-path dead code is tracked for cleanup: planner.rs's now-orphaned build_delta_partitioned_files + ColumnMappingFilterRewriter (used only by their own tests), the Scala physicaliseRequiredField/physicalFileDataSchema emission, and the proto data_schema / projection_vector fields (apache#71). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The read path is now "kernel reads" (DeltaKernelScanExec on delta-kernel 0.24 / arrow-58); the legacy ParquetSource + DV-sweep + synthetic-columns + column-mapping-physicalisation stack has been removed. - README.md: a current-architecture banner up top, and the one-paragraph summary rewritten for the kernel-read path (per-file kernel read + transform, arrow-58 no-bridge, DV decode, DeltaSyntheticColumnsExec, INT96 coercion). - 01-overview.md: a note that the executor-side read path it describes is replaced (driver-side planning / deployment / "does NOT touch" sections still accurate). - 03-native-execution.md: marked SUPERSEDED (legacy execution tree), pointing to 10/11. - 04-design-decisions.md: read-path decisions marked partly superseded. - 10-iceberg-style-kernel-read.md: retitled and marked IMPLEMENTED + default, with the current feature coverage and the remaining custom pieces noted. The current design lives in 10 + 11 (coherence audit); 02-04 are retained for history. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…hanges keep/drop (apache#55-71) Completes the evaluation backlog. Verdicts: - Custom kernel-path pieces (apache#55-61): none removable in our code today. Almost all are blocked on two upstream delta-kernel gaps -- no reader-options / INT96 hook on ParquetHandler, and ScanFile / ScanMetadata / transform Expression / DvInfo aren't serializable so we can't ship per-file scan data over JNI and call Scan::execute executor-side. Two upstream asks would collapse most of the custom pipeline. - The 8 extracted core PRs (apache#62-69): all KEEP -- 4 general Comet fixes, 4 still exercised by the kernel-read path; none old-path-only. apache#4536 flipped from old-path to kernel-read dependency. - Delta core hooks (apache#70): all KEEP (marker + split-mode injection + DPP + routing are how Delta scans reach the native exec). - Proto/Scala (apache#71): the only concrete removable code -- data_schema, projection_vector, data_filters proto fields + the physicalFileDataSchema/physicaliseRequiredField Scala emission that feeds them are populated but never read by the kernel path (the apache#50 deletion residual). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…pache#71, part 1) The apache#50 old-path deletion left dead wire fields and helpers. This removes the ones that were populated but never read by the kernel-read path: - proto DeltaScanCommon: data_schema (2), data_filters (4), projection_vector (5) removed and reserved. The kernel-read path reads only required_schema (pure logical) + column_mappings; kernel does its own stats-based file pruning, so no pushed predicate is shipped. - Scala: drop the addAllDataSchema / addAllProjectionVector setters, the addPushedDataFilters function + call, and drop the removed fields from computeSourceKey's split-mode injection key (table_root + snapshot_version + required_schema + column_mappings still uniquely identify a scan). - contrib native planner.rs: delete the now-orphaned build_delta_partitioned_files and ColumnMappingFilterRewriter (+ their tests); keep parse_delta_partition_scalar / SessionTimezone (the kernel path uses them). Verified with the default on: CometDeltaNativeSuite 19/19 (incl. split-mode injection via partitioned/multi-file/DV), CometDeltaCoverageSuite 24/24. The dead Scala computation that fed the removed fields (physicalise* / physicalFileDataSchema / projection-vector) is removed in part 2. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…, part 2) Deletes the Scala computation that fed the now-removed data_schema / projection_vector proto fields (part 1): the physicalise* helpers (physicaliseNestedTypesOnly / physicaliseDataTypePreserving / physicaliseRequiredField and the orphaned physicaliseStructField / physicaliseDataType pair), requiredSchemaFields (physicalised), physicalFileDataSchemaFields, dataSchemaForProto, the projection-vector index computation (fileDataSchemaFields, materializedRowTrackingFields, customMaterializedColNames, partitionNames, requiredIndexes, partitionTailIndexes), and the stale comments referencing them. ~250 lines of dead code removed; the kernel-read path builds its schema from required_schema (pure logical) + column_mappings, so none of it was reachable. Verified with the default on: CometDeltaNativeSuite 19/19, Coverage 24/24, ColumnMapping 5/5, RoundTrip 10/10, RowTrackingMaterialized 5/5, MetadataColumnAudit 8/8. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…gine The kernel-read design is implemented and proven by the contrib + Delta own-suite coverage, so the Phase-1c spike tests that originally informed it are no longer needed: - spike_transform_renames_physical_to_logical - spike_nested_struct_rename - spike_nested_struct_prune Removing them drops the last non-test callers of the standalone `engine::create_engine` constructor (the genuine unit tests `engine_and_schema` and `id_mode_*` migrate to the cached `get_or_create_engine`, deref'd via `Arc::as_ref` at the `read_file_via_kernel` / `Snapshot::build` call sites), so `create_engine` and its `#[allow(dead_code)]` are deleted along with the now-dead `lib.rs` re-export. Also drops the two test imports (`Field`, `TreeNode`) the spike + earlier planner cleanup orphaned, and refreshes the DeltaKernelScanExec doc comment that referenced the removed spike + the deleted old path. Audit results behind this: `cargo clippy --all-targets` and a from-scratch `-Ywarn-unused` Scala compile are both clean of dead-code/unused warnings. Also documents two newly-found cloud-credential gaps in 08-known-limitations.md (A2d: DV-read + synthetic-columns paths ignore the threaded storage_config and use Default::default(); A2e: only static keys resolved, no IAM-role/IRSA/IMDS provider chain) for the post-cleanup credential audit. Verified: contrib native lib tests 110/110. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Pre-existing clippy style lints surfaced once test code was included
(`--all-targets`); none affect behavior:
- field_reassign_with_default (predicate.rs tests): `let mut x =
T::default(); x.f = ..` -> struct literals. Where all fields are set the
redundant `..Default::default()` is omitted (avoids needless_update).
- needless_range_loop (scan.rs, production `row_tracking_from_scan_files` /
DV-descriptor extraction): `for i in 0..bounded_rows { sel[i] }` ->
`for (i, &keep) in sel.iter().enumerate().take(bounded_rows)`.
- inconsistent_digit_grouping (planner.rs tests): `1705321845_000_000` ->
`1_705_321_845_000_000`.
- type_complexity (core delta_scan.rs): the ProjectionExec pair vec is now a
`ProjectionColumns` type alias.
`cargo clippy --all-targets` (contrib crate) and
`cargo clippy --all-targets --features contrib-delta` (core, PR files) are now
warning-free. Contrib native lib tests 110/110.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
… diff
The MergeIntoMetricsBase de-flake annotation the diff adds was a single 204-char
line, tripping Delta's own `testScalastyle` gate ("File line length exceeds 100
characters" at MergeIntoMetricsBase.scala:1049). Because `sbt <module>/test`
depends on testScalastyle, the gate failed the whole run before any test
executed -- zero tests ran. Split the comment across two <=100-char lines above
the `-1)` sentinel and bump the hunk header count (+1045,7 -> +1045,9).
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The executor data read threads the table's DeltaStorageConfig (from the proto's object_store_options) into get_or_create_engine, but the deletion-vector read (dv_reader::read_dv_indexes) was called with a hardcoded DeltaStorageConfig::default() from both executor DV sites -- DeltaKernelScanExec::read_all (where self.storage_config was in scope but unused) and DeltaSyntheticColumnsExec (which carried only table_root_url, not the config). On a statically-credentialed S3/Azure table WITH deletion vectors the data read would succeed but the DV read would build a separate, credential-less engine (distinct cache key) and fail. Fix: read_dv_indexes now takes &DeltaStorageConfig; DeltaKernelScanExec passes &self.storage_config, and DeltaSyntheticColumnsExec gains a storage_config field threaded from the core planner (cloned alongside the kernel exec's copy). A matching config means the DV read reuses the data read's cached engine instead of building a credential-less one. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
… audit apache#72) Aligns the Delta engine with core Comet's prepare_object_store_with_configs: - A2a/A2c (GCS + Azure): create_object_store now builds az/azure/abfs/abfss/ wasb/wasbs/gs/gcs through object_store::parse_url, sourcing credentials from the ambient environment (AZURE_*/GOOGLE_*/ADC/instance metadata) -- exactly as core does for non-S3 schemes. The hand-rolled MicrosoftAzureBuilder arm and the dead azure_* DeltaStorageConfig fields are removed; the object_store gcp/azure/aws/http features are enabled to match core. No bespoke fs.gs.*/fs.azure.* bridging (core doesn't bridge them either -- this is parity, not new capability). - A2b (per-bucket S3): delta_storage_config_from_map takes the table's S3 bucket (parsed from the table-root URL in jni.rs / delta_scan.rs) and prefers fs.s3a.bucket.<bucket>.<suffix> over the global fs.s3a.<suffix>. S3 stays hand-rolled (object_store::parse_url can't read Hadoop fs.s3a.* keys -- the same reason core has a custom S3 resolver). A2e (Hadoop explicit credential-provider classes: assumed-role / web-identity) remains a documented residual -- closing it needs core's s3 resolver exposed via comet-contrib-spi. Updated jni::tests (per-bucket matrix + azure/gcs-don't-leak guard), engine::tests (azure/gcs via parse_url), the Scala CometDeltaCredentialAuditSuite comments, and 08-known-limitations.md A2 (A2a-d FIXED, A2e residual). Verified: contrib lib tests 110/110, clippy --all-targets clean, core compiles with -F contrib-delta. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ead (apache#73) Dominant Delta 4.1 own-suite failure family (~95 tests): reading an array<struct> / nested-struct / map column through the kernel-read path during a schema-evolution MERGE panicked -- Invalid argument error: column types must match schema types, expected List(Struct("a","b")) but found List(Struct("a","b"), field: 'element') (and an equivalent assert_eq! / arrow-select coalesce.rs:539 panic on the unpartitioned pass-through). The kernel-read batch names nested Arrow fields by Spark convention (list element "element"), but DeltaKernelScanExec.output_schema -- derived from the proto required_schema via convert_spark_types_to_arrow_schema -- carries empty nested field names, so batch.schema() != output_schema and DataFusion rejects it. A plain read of a consistent array<struct> table is fine; the trigger is the schema-evolution read. The old ParquetSource path normalized nested names via Comet's SparkSchemaAdapter. Fix: DeltaKernelScanExec::append_partition_columns now reconciles every emitted column to output_schema's exact field type (incl. nested list/struct/map field names) via arrow_cast -- a metadata-only relabel, since the data is already in logical (output) order from the kernel transform. Applied on both the partitioned path and the previously pass-through unpartitioned path. Red->green proven by a Spark suite (CometDeltaNestedArrayStructReproSuite: MERGE array<struct> with reordered nested fields). Verified: that suite 1/1 (was a native panic before), contrib native lib 110/110, clippy --all-targets clean, and no regression across CometDelta{Coverage(24),TypeRoundTripAudit(10),Native(19),ColumnMapping(5)}. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…umn mapping, apache#47/apache#74) The kernel-read path reconstructed its physical schema Comet-side and physicalised only the top level, leaving nested fields with logical names and no field-ids. Kernel's `read_parquet_files` matches file columns by field-id (then name), so nested column-mapped columns went unmatched and were NULL-filled -- a nested-rename table read back `(id, null, null)`. Use delta-kernel's own primitives instead of re-implementing schema reconciliation: - `read_file_via_kernel` reads via `parquet_handler().read_parquet_files(physical_schema)` + `transform_to_logical(physical, logical, struct_patch)`; kernel's `fixup_parquet_read` handles reorder / null-fill / cast (added & reordered nested fields). Deletes the hand-rolled `align_batch_to_schema` / `reconcile_array` / `coerce_int96_to_micros`. - The driver projects the snapshot schema (`snapshot.schema().project(cols)`, which keeps the `delta.columnMapping.physicalName` annotations `with_schema` needs), builds the scan, and ships `scan.physical_schema()` / `scan.logical_schema()` (Arrow IPC, field-ids at every nesting level) to the executor via `DeltaScanCommon.kernel_physical_schema` / `kernel_logical_schema`. The kernel-driver `planDeltaScan` path returns them inline; the batch-file-index path (`PreparedDeltaFileIndex`) fetches them via the new schema-only `planDeltaReadSchemas`. `physicalise_field` remains only as a fallback. INT96 far-future (> ~year 2262) timestamps overflow under kernel's nanosecond decode (no coercion knob; filed delta-io/delta-kernel-rs#2709); documented as A6 and the type round-trip test's far-future TIMESTAMP capped to 2261. Guards (red->green, assert Comet matches Spark): CometDeltaNativeSuite apache#47, CometDeltaNestedArrayStructReproSuite, CometDeltaPercentFileNameReproSuite, CometParquetPercentPathSuite. All contrib Delta suites green. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…lumbing (apache#76) Now that the kernel-read path ships kernel's own `scan.physical_schema()` / `scan.logical_schema()` (physical names + field-ids resolved at every nesting level), the Comet-side column-mapping reimplementation and everything that fed it are dead. Removed: - `physicalise_field` / `physicalise_type` / `descend_elem` in core's `delta_scan.rs` (the recursive logical->physical rename). The kernel-read planner now requires the shipped kernel schemas for any data-column read (zero-data reads need none); a missing schema is a driver bug, surfaced as a clear error rather than papered over by re-deriving it (which got nested column mapping wrong). - `DeltaScanCommon.column_mappings` (proto field 13) and `DeltaColumnMapping`'s recursive `children` -- the executor's only consumer was `physicalise_field`. - `DeltaScanTaskList.column_mappings` (proto field 5) and the whole `DeltaColumnMapping` proto message: the only remaining use is the DRIVER-side partition-value key translation, which keeps a plain `(logical, physical)` list internal to `plan_delta_scan` -- nothing goes over the wire. - The JVM re-derivation of a column-mapping tree for the add-files path (`CometDeltaNativeScan`) and the two `addAllColumnMappings` relays. All contrib Delta suites stay green (CometDeltaNativeSuite 19/19, ColumnMapping 5/5, Coverage 24/24, NestedArrayStruct 4/4, TypeRoundTrip 10/0). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…che#78 schema-change-since-analysis) Regression from the kernel-schema-shipping work: DeltaColumnMappingSuite "column mapping batch scan should detect physical name changes" read the current data instead of null-filling, because the driver fed `ScanBuilder::with_schema` the LIVE snapshot schema. Kernel resolves physical names from the schema passed to `with_schema` (StateInfo::try_new -> StructField::make_physical), so it must be fed the schema the query was PLANNED with -- then kernel's field-id matching null-fills any column whose id changed since analysis (Delta's schema-on-read escape hatch), which a pure-kernel engine handles itself with no fallback. - The JVM ships the analysis-time read schema as Delta schema JSON (`StructType.json` from DeltaScanRule's stashed reference schema, carrying `delta.columnMapping.physicalName` + id at every level). The driver parses it (`serde_json` -> kernel `StructType`, the same format kernel reads from the log) and feeds it straight to `with_schema`; it falls back to projecting the live snapshot by column name only when no analysis-time schema is available. - The analysis-time JSON and the Arrow-IPC names are mutually exclusive on the wire (ship the JSON when present, else the names) -- no redundant double-ship. - Fix `planDeltaReadSchemas` to build kernel schemas when EITHER carrier is present (it previously gated on the IPC only, so a JSON-only payload silently produced no schemas). Red->green guard: CometDeltaSchemaChangeReproSuite (Comet returned data, Spark null-filled; now both null-fill). Own-suite "physical name changes" goes green; all contrib Delta suites stay green. (DeltaColumnMappingSuite "explicit id matching" -- a contrived manual field-id repoint -- is a separate kernel id-vs-name matching nuance, tracked in apache#79.) Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Contributor
Author
|
sorry wrong fork |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Which issue does this PR close?
Closes #.
Rationale for this change
What changes are included in this PR?
How are these changes tested?