From 47ba43ddb3ccb22729b3f415f86878463b3abb6c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 25 Apr 2026 09:24:39 -0600 Subject: [PATCH] fix: fall back for shredded Variant scans on Spark 4.0 Spark 4.0's PushVariantIntoScan rewrites VariantType columns into a StructType whose fields carry __VARIANT_METADATA_KEY metadata, then pushes variant_get paths down as ordinary struct field accesses. By the time CometScanRule runs, the requiredSchema looks like a normal struct of primitives, so Comet scans natively but does not honor the on-disk variant shredding layout, returning nulls for typed paths. Detect the marker via VariantMetadata.isVariantStruct in the Spark 4.0 type shim and reject those structs in CometScanTypeChecker so the scan falls back to Spark. Stop ignoring VariantShreddingSuite and ParquetVariantShreddingSuite in the 4.0.1 diff. Closes #2209. --- .../apache/comet/shims/CometTypeShim.scala | 5 ++- .../apache/comet/shims/CometTypeShim.scala | 10 ++++- dev/diffs/4.0.1.diff | 39 ------------------- .../apache/comet/rules/CometScanRule.scala | 7 ++++ 4 files changed, 20 insertions(+), 41 deletions(-) diff --git a/common/src/main/spark-3.x/org/apache/comet/shims/CometTypeShim.scala b/common/src/main/spark-3.x/org/apache/comet/shims/CometTypeShim.scala index 46a81818a0..b26267f0b1 100644 --- a/common/src/main/spark-3.x/org/apache/comet/shims/CometTypeShim.scala +++ b/common/src/main/spark-3.x/org/apache/comet/shims/CometTypeShim.scala @@ -21,9 +21,12 @@ package org.apache.comet.shims import scala.annotation.nowarn -import org.apache.spark.sql.types.DataType +import org.apache.spark.sql.types.{DataType, StructType} trait CometTypeShim { @nowarn // Spark 4 feature; stubbed to false in Spark 3.x for compatibility. def isStringCollationType(dt: DataType): Boolean = false + + @nowarn // Spark 4 feature; Variant shredding doesn't exist in Spark 3.x. + def isVariantStruct(s: StructType): Boolean = false } diff --git a/common/src/main/spark-4.0/org/apache/comet/shims/CometTypeShim.scala b/common/src/main/spark-4.0/org/apache/comet/shims/CometTypeShim.scala index 1b82c04b20..f75fde7de9 100644 --- a/common/src/main/spark-4.0/org/apache/comet/shims/CometTypeShim.scala +++ b/common/src/main/spark-4.0/org/apache/comet/shims/CometTypeShim.scala @@ -19,7 +19,8 @@ package org.apache.comet.shims -import org.apache.spark.sql.types.{DataType, StringType} +import org.apache.spark.sql.execution.datasources.VariantMetadata +import org.apache.spark.sql.types.{DataType, StringType, StructType} trait CometTypeShim { // A `StringType` carries collation metadata in Spark 4.0. Only non-default (non-UTF8_BINARY) @@ -31,4 +32,11 @@ trait CometTypeShim { case st: StringType => st.collationId != StringType.collationId case _ => false } + + // Spark 4.0's `PushVariantIntoScan` rewrites `VariantType` columns into a `StructType` whose + // fields each carry `__VARIANT_METADATA_KEY` metadata, then pushes `variant_get` paths down as + // ordinary struct field accesses. Comet's native scans don't understand the on-disk Parquet + // variant shredding layout, so reading such a struct natively returns nulls. Detect the marker + // and force scan fallback. + def isVariantStruct(s: StructType): Boolean = VariantMetadata.isVariantStruct(s) } diff --git a/dev/diffs/4.0.1.diff b/dev/diffs/4.0.1.diff index 0da594b58d..336be60b83 100644 --- a/dev/diffs/4.0.1.diff +++ b/dev/diffs/4.0.1.diff @@ -1333,21 +1333,6 @@ index 2e33f6505ab..54f5081e10a 100644 } withTable("t1", "t2") { -diff --git a/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala -index fee375db10a..8c2c24e2c5f 100644 ---- a/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala -+++ b/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala -@@ -33,7 +33,9 @@ import org.apache.spark.sql.types._ - import org.apache.spark.types.variant._ - import org.apache.spark.unsafe.types.{UTF8String, VariantVal} - --class VariantShreddingSuite extends QueryTest with SharedSparkSession with ParquetTest { -+class VariantShreddingSuite extends QueryTest with SharedSparkSession with ParquetTest -+ // TODO enable tests once https://github.com/apache/datafusion-comet/issues/2209 is fixed -+ with IgnoreCometSuite { - def parseJson(s: String): VariantVal = { - val v = VariantBuilder.parseJson(s, false) - new VariantVal(v.getValue, v.getMetadata) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala index 11e9547dfc5..637411056ae 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala @@ -3130,30 +3115,6 @@ index 09ed6955a51..5cd856ff7b6 100644 ) { checkAllParquetReaders( values = Seq("1.23", "10.34"), -diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala -index 458b5dfc0f4..d209f3c85bc 100644 ---- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala -+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala -@@ -26,7 +26,7 @@ import org.apache.parquet.hadoop.util.HadoopInputFile - import org.apache.parquet.schema.{LogicalTypeAnnotation, PrimitiveType} - import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName - --import org.apache.spark.sql.{QueryTest, Row} -+import org.apache.spark.sql.{IgnoreCometSuite, QueryTest, Row} - import org.apache.spark.sql.internal.SQLConf - import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType - import org.apache.spark.sql.test.SharedSparkSession -@@ -35,7 +35,9 @@ import org.apache.spark.unsafe.types.VariantVal - /** - * Test shredding Variant values in the Parquet reader/writer. - */ --class ParquetVariantShreddingSuite extends QueryTest with ParquetTest with SharedSparkSession { -+class ParquetVariantShreddingSuite extends QueryTest with ParquetTest with SharedSparkSession -+ // TODO enable tests once https://github.com/apache/datafusion-comet/issues/2209 is fixed -+ with IgnoreCometSuite { - - private def testWithTempDir(name: String)(block: File => Unit): Unit = test(name) { - withTempDir { dir => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala index b8f3ea3c6f3..bbd44221288 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index bba3925c94..c12a012630 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -713,6 +713,13 @@ case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport with C // we don't need specific support for collation in scans, but this // is a convenient place to force the whole query to fall back to Spark for now false + case s: StructType if isVariantStruct(s) => + // Spark 4.0's PushVariantIntoScan rewrites a VariantType column into a struct of typed + // fields plus per-field VariantMetadata, expecting the scan to honor Parquet variant + // shredding semantics. Comet's native scans don't, so fall back to Spark. + fallbackReasons += + s"$scanImpl scan does not support shredded Variant reads (column $name)" + false case s: StructType if s.fields.isEmpty => false case _ =>