Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
8fdc05d
contrib(delta): scaffold typed-proto dispatch + skeleton crate
schenksj May 19, 2026
c41e915
contrib(delta): port native implementation from PR2 (minus SPI)
schenksj May 19, 2026
381282d
contrib(delta): port reflection-only Scala files + dev scripts
schenksj May 19, 2026
91e1d5c
contrib(delta): port SPI-touching Scala files + Maven profile + refle…
schenksj May 19, 2026
93f0f54
fix(shuffle): get_string matches Spark's UTF8String no-validate seman…
schenksj May 19, 2026
53fb5c0
contrib(delta): wire CometExecRule serde dispatch
schenksj May 19, 2026
1ee31b2
contrib(delta): wire per-partition DeltaPlanDataInjector
schenksj May 19, 2026
53f6cb4
contrib(delta): scalastyle fixes for DeltaIntegration + operators
schenksj May 19, 2026
de9e0d3
fix(error): wrap native Parquet errors as FAILED_READ_FILE.NO_HINT wi…
schenksj May 19, 2026
effe5f7
fix(contrib-delta) P7h: decline native scan for unsupported FS schemes
schenksj May 19, 2026
9096957
fix(contrib-delta) P7i: cache DeltaEngine per (scheme,authority,config)
schenksj May 19, 2026
56c2b01
fix(serde): decline CreateArray with mismatched child data types
schenksj May 19, 2026
fea28d7
perf(contrib-delta) P7j: pre-parse session TZ and key injectors by op…
schenksj May 19, 2026
7e9249f
perf(contrib-delta) P7k: cache resolved Method handle in DeltaIntegra…
schenksj May 19, 2026
a805f81
perf(contrib-delta) P7l: hoist CometScanTypeChecker out of per-scan loop
schenksj May 19, 2026
e346776
perf(contrib-delta) P7m: O(1) partition-value lookup in build_delta_p…
schenksj May 19, 2026
ed0d8ac
fix(error): thread file paths to FAILED_READ_FILE.NO_HINT wrapping
schenksj May 19, 2026
43768c1
fix(contrib-delta) P7n: review findings -- InputFileBlockHolder hook …
schenksj May 19, 2026
3005d6e
chore: spotless:apply (no behavioral change)
schenksj May 19, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 232 additions & 0 deletions contrib/delta/dev/diffs/4.1.0.diff
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
diff --git a/build.sbt b/build.sbt
index 6f16864..e623a08 100644
--- a/build.sbt
+++ b/build.sbt
@@ -50,6 +50,9 @@ val internalModuleNames = settingKey[Set[String]]("Internal module artifact name

// Spark version to delta-spark and its dependent modules
// For more information see CrossSparkVersions.scala
+// Comet regression testing (added by dev/diffs/delta/4.1.0.diff in Apache DataFusion Comet)
+val cometVersion = "0.17.0-SNAPSHOT"
+ThisBuild / resolvers += Resolver.mavenLocal
val sparkVersion = settingKey[String]("Spark version")

// Dependent library versions
@@ -617,6 +620,19 @@ lazy val spark = (project in file("spark-unified"))
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-hive" % sparkVersion.value % "test" classifier "tests",
"org.mockito" % "mockito-inline" % "4.11.0" % "test",
+ // Comet regression testing — pulls comet-spark for Spark 4.1 + Scala 2.13 from
+ // the user's mavenLocal repo. The published artifact is what `mvn install
+ // -Pspark-4.1 -Pcontrib-delta` produces; the contrib's Delta wiring is bundled
+ // into that JAR. No separate Delta-specific Comet artifact.
+ //
+ // `exclude(comet-contrib-delta-deps)`: the published comet-spark pom lists this
+ // as a transitive (advertising `delta-spark` provided-scope to end users), but
+ // its own pom has un-interpolated `${spark.version.short}` / `${scala.binary.version}`
+ // in `<artifactId>` and `<parent>`. Maven re-interpolates from filename, SBT
+ // doesn't — so SBT fails resolving the parent. Delta's own build already
+ // supplies delta-spark on the test classpath, so the exclude is safe here.
+ ("org.apache.datafusion" % s"comet-spark-spark4.1_${scalaBinaryVersion.value}" % cometVersion % "test")
+ .exclude("org.apache.datafusion", s"comet-contrib-delta-deps-spark4.1_${scalaBinaryVersion.value}"),
),

Test / testOptions += Tests.Argument("-oDF"),
@@ -636,7 +652,15 @@ lazy val spark = (project in file("spark-unified"))
"-Ddelta.log.cacheSize=3",
"-Dspark.databricks.delta.delta.log.cacheSize=3",
"-Dspark.sql.sources.parallelPartitionDiscovery.parallelism=5",
- "-Xmx1024m"
+ // Bumped from 1024m: 1GB is too tight for Spark + Delta + parquet I/O + Comet
+ // native library + offheap. Caused GC thrashing on memory-heavy suites (DV,
+ // Merge, CDC) under the 3.3.2 regression sweep — keeping 4g for 4.1.0.
+ "-Xmx4g",
+ // Match stock Delta CI: run on UTC so Delta's force-verify-all-files-in-CRC
+ // path (triggered by non-UTC user.timezone) stays disabled. Otherwise
+ // ChecksumSuite tests fail because `TimeZone.setDefault(UTC)` doesn't
+ // update `System.getProperty("user.timezone")`.
+ "-Duser.timezone=UTC"
),

// Required for testing table features see https://github.com/delta-io/delta/issues/1602
diff --git a/build/sbt-config/repositories b/build/sbt-config/repositories
index 2853417..62ff4b6 100644
--- a/build/sbt-config/repositories
+++ b/build/sbt-config/repositories
@@ -13,3 +13,12 @@
typesafe-releases: https://repo.typesafe.com/typesafe/releases/
apache-snapshot: https://repository.apache.org/content/groups/snapshots/
jitpack: https://jitpack.io
+ # Comet regression testing: dev/run-delta-regression.sh publishes Comet's just-built
+ # artifacts to an ISOLATED dir under $TMPDIR rather than ~/.m2/repository/. Pointing
+ # SBT directly at ~/.m2 triggers a coursier sticky-resolver bug: when an unrelated
+ # transitive (parquet/guava/azure/...) has an orphan pom-only entry in ~/.m2 from a
+ # prior `mvn` run, coursier resolves the POM at local-m2, then refuses to fall
+ # through to maven-central for the JAR — failing the build on artifacts that have
+ # nothing to do with Comet. The isolated dir contains only `org.apache.datafusion:*`
+ # so there are no unrelated POMs to mistakenly match.
+ local-comet: file:///tmp/comet-published-4.1/
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/CometSmokeTest.scala b/spark/src/test/scala/org/apache/spark/sql/delta/CometSmokeTest.scala
new file mode 100644
index 0000000..013ee98
--- /dev/null
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/CometSmokeTest.scala
@@ -0,0 +1,82 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta
+
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * Smoke test asserting that Comet is actually loaded and executing queries when the
+ * Delta regression diff is applied. Catches silent configuration drift where Comet
+ * is on the classpath but not wired into the physical plan (e.g. a typo in
+ * `spark.plugins` that Spark silently ignores).
+ *
+ * Added by the Comet regression diff at `dev/diffs/delta/4.1.0.diff` in
+ * Apache DataFusion Comet.
+ */
+class CometSmokeTest extends QueryTest with SharedSparkSession with DeltaSQLCommandTest {
+
+ test("Comet plugin is registered in SparkConf") {
+ val plugins = spark.conf.get("spark.plugins")
+ assert(plugins.contains("CometPlugin"),
+ s"Comet plugin not registered. spark.plugins=$plugins")
+ }
+
+ test("Delta streaming source read returns the micro-batch AddFiles") {
+ // Regression guard for the pre-materialized FileIndex path: Delta's streaming
+ // micro-batch gives us an exact `TahoeBatchFileIndex` with the AddFiles for
+ // [startOffset, endOffset]. The contrib's native Delta scan must honour that
+ // list instead of re-running kernel log replay against the snapshot root
+ // (which would return an empty or different set for a streaming batch).
+ val src = "comet_stream_src"
+ try {
+ spark.sql(s"CREATE TABLE $src (key INT, value INT) USING DELTA")
+ var collected: Seq[org.apache.spark.sql.Row] = Seq.empty
+ val sw = spark.readStream.table(src).writeStream
+ .format("console")
+ .foreachBatch { (df: org.apache.spark.sql.DataFrame, _: Long) =>
+ collected = df.collect().toSeq
+ }
+ .outputMode("append")
+ .start()
+ spark.sql(s"INSERT INTO $src(key, value) VALUES(0, 42)")
+ sw.processAllAvailable()
+ sw.stop()
+ assert(collected.nonEmpty, "Streaming Delta source produced no rows")
+ assert(collected.map(r => (r.getInt(0), r.getInt(1))) == Seq((0, 42)))
+ } finally {
+ spark.sql(s"DROP TABLE IF EXISTS $src")
+ }
+ }
+
+ test("Comet operators appear in Delta query physical plan") {
+ withTempDir { dir =>
+ val path = dir.getCanonicalPath
+ spark.range(10).toDF("id")
+ .write.format("delta").save(path)
+
+ val df = spark.read.format("delta").load(path).filter("id > 2")
+ val planString = df.queryExecution.executedPlan.toString
+
+ assert(planString.contains("Comet"),
+ s"No Comet operators found in physical plan. Is Comet actually wired in?\n$planString")
+
+ checkAnswer(df, (3 until 10).map(i => Row(i)))
+ }
+ }
+}
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/test/DeltaSQLCommandTest.scala b/spark/src/test/scala/org/apache/spark/sql/delta/test/DeltaSQLCommandTest.scala
index cb28a4f..44a2a37 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/test/DeltaSQLCommandTest.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/test/DeltaSQLCommandTest.scala
@@ -35,5 +35,22 @@ trait DeltaSQLCommandTest extends SharedSparkSession {
classOf[DeltaSparkSessionExtension].getName)
.set(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION.key,
classOf[DeltaCatalog].getName)
+ // --- Comet regression wiring (added by dev/diffs/delta/4.1.0.diff) ---
+ // CometPlugin registers the native lib + memory manager. ServiceLoader pulls
+ // the Delta contrib's DeltaScanRuleExtension / DeltaOperatorSerdeExtension out
+ // of META-INF/services in comet-spark.jar at first use.
+ .set("spark.plugins", "org.apache.spark.CometPlugin")
+ .set("spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .set("spark.comet.enabled", "true")
+ .set("spark.comet.exec.enabled", "true")
+ .set("spark.comet.exec.shuffle.enabled", "true")
+ .set("spark.comet.scan.enabled", "true")
+ // Enable the contrib's native Delta scan path. Key is defined in
+ // org.apache.comet.contrib.delta.DeltaConf.
+ .set("spark.comet.scan.deltaNative.enabled", "true")
+ .set("spark.comet.explainFallback.enabled", "true")
+ .set("spark.memory.offHeap.enabled", "true")
+ .set("spark.memory.offHeap.size", "10g")
}
}
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/test/ScanReportHelper.scala b/spark/src/test/scala/org/apache/spark/sql/delta/test/ScanReportHelper.scala
index e7c4b90..32246a2 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/test/ScanReportHelper.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/test/ScanReportHelper.scala
@@ -44,6 +44,11 @@ trait ScanReportHelper extends SharedSparkSession with AdaptiveSparkPlanHelper {
collectWithSubqueries(plan)({
case fs: FileSourceScanExec => Seq(fs)
case cached: InMemoryTableScanExec => collectScans(cached.relation.cacheBuilder.cachedPlan)
+ // Comet regression: Comet rewrites `FileSourceScanExec` into its own leaf
+ // variants. Tests that inspect scan metrics would otherwise see an empty
+ // list and fail with `MatchError: ArrayBuffer()`.
+ case c: org.apache.spark.sql.comet.CometScanExec => Seq(c.wrapped)
+ case c: org.apache.spark.sql.comet.CometDeltaNativeScanExec => Seq(c.originalPlan)
}).flatten
}

diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/test/TestsStatistics.scala b/spark/src/test/scala/org/apache/spark/sql/delta/test/TestsStatistics.scala
index 93c7da9..755fae9 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/test/TestsStatistics.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/test/TestsStatistics.scala
@@ -79,6 +79,13 @@ trait TestsStatistics { self: DeltaSQLTestUtils =>
case f: FileSourceScanExec => Some(f)
case InputAdapter(f: FileSourceScanExec) => Some(f)
case ColumnarToRowExec(InputAdapter(f: FileSourceScanExec)) => Some(f)
+ // Comet regression: unwrap the Comet scan variants Comet's rules produce in
+ // place of Spark's `FileSourceScanExec`. Tests that search for the scan in
+ // the executed plan (e.g. to read its metrics) are otherwise blind to them.
+ case c: org.apache.spark.sql.comet.CometScanExec => Some(c.wrapped)
+ case c: org.apache.spark.sql.comet.CometDeltaNativeScanExec => Some(c.originalPlan)
+ case org.apache.spark.sql.comet.CometNativeColumnarToRowExec(inner) =>
+ unapply(inner)
case _ => None
}
}
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala
@@ -118,6 +118,11 @@
val query = spark.read.format("delta").load(testPath).where("part = 1")
val fileScans = query.queryExecution.executedPlan.collect {
case f: FileSourceScanExec => f
+ // Comet regression: Comet's planner replaces `FileSourceScanExec` with
+ // `CometDeltaNativeScanExec`. Its `metrics` map aliases `numFiles` to the
+ // post-pruning task count so this assertion exercises the same partition-skip
+ // semantics from the Comet path.
+ case c: org.apache.spark.sql.comet.CometDeltaNativeScanExec => c
}

// Force the query to read files and generate metrics
Loading