diff --git a/common/src/main/spark-3.x/org/apache/comet/shims/CometTypeShim.scala b/common/src/main/spark-3.x/org/apache/comet/shims/CometTypeShim.scala
index 46a81818a0..b26267f0b1 100644
--- a/common/src/main/spark-3.x/org/apache/comet/shims/CometTypeShim.scala
+++ b/common/src/main/spark-3.x/org/apache/comet/shims/CometTypeShim.scala
@@ -21,9 +21,12 @@ package org.apache.comet.shims
 
 import scala.annotation.nowarn
 
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types.{DataType, StructType}
 
 trait CometTypeShim {
   @nowarn // Spark 4 feature; stubbed to false in Spark 3.x for compatibility.
   def isStringCollationType(dt: DataType): Boolean = false
+
+  @nowarn // Spark 4 feature; Variant shredding doesn't exist in Spark 3.x.
+  def isVariantStruct(s: StructType): Boolean = false
 }
diff --git a/common/src/main/spark-4.0/org/apache/comet/shims/CometTypeShim.scala b/common/src/main/spark-4.0/org/apache/comet/shims/CometTypeShim.scala
index 1b82c04b20..f75fde7de9 100644
--- a/common/src/main/spark-4.0/org/apache/comet/shims/CometTypeShim.scala
+++ b/common/src/main/spark-4.0/org/apache/comet/shims/CometTypeShim.scala
@@ -19,7 +19,8 @@
 
 package org.apache.comet.shims
 
-import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.sql.execution.datasources.VariantMetadata
+import org.apache.spark.sql.types.{DataType, StringType, StructType}
 
 trait CometTypeShim {
   // A `StringType` carries collation metadata in Spark 4.0. Only non-default (non-UTF8_BINARY)
@@ -31,4 +32,11 @@
     case st: StringType => st.collationId != StringType.collationId
     case _ => false
   }
+
+  // Spark 4.0's `PushVariantIntoScan` rewrites `VariantType` columns into a `StructType` whose
+  // fields each carry `__VARIANT_METADATA_KEY` metadata, then pushes `variant_get` paths down as
+  // ordinary struct field accesses. Comet's native scans don't understand the on-disk Parquet
+  // variant shredding layout, so reading such a struct natively returns nulls. Detect the marker
+  // and force scan fallback.
+  def isVariantStruct(s: StructType): Boolean = VariantMetadata.isVariantStruct(s)
 }
diff --git a/dev/diffs/4.0.1.diff b/dev/diffs/4.0.1.diff
index 0da594b58d..336be60b83 100644
--- a/dev/diffs/4.0.1.diff
+++ b/dev/diffs/4.0.1.diff
@@ -1333,21 +1333,6 @@ index 2e33f6505ab..54f5081e10a 100644
  
     }
     withTable("t1", "t2") {
-diff --git a/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala
-index fee375db10a..8c2c24e2c5f 100644
---- a/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala
-+++ b/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala
-@@ -33,7 +33,9 @@ import org.apache.spark.sql.types._
- import org.apache.spark.types.variant._
- import org.apache.spark.unsafe.types.{UTF8String, VariantVal}
- 
--class VariantShreddingSuite extends QueryTest with SharedSparkSession with ParquetTest {
-+class VariantShreddingSuite extends QueryTest with SharedSparkSession with ParquetTest
-+  // TODO enable tests once https://github.com/apache/datafusion-comet/issues/2209 is fixed
-+  with IgnoreCometSuite {
-   def parseJson(s: String): VariantVal = {
-     val v = VariantBuilder.parseJson(s, false)
-     new VariantVal(v.getValue, v.getMetadata)
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala
 index 11e9547dfc5..637411056ae 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala
@@ -3130,30 +3115,6 @@ index 09ed6955a51..5cd856ff7b6 100644
     ) {
       checkAllParquetReaders(
         values = Seq("1.23", "10.34"),
-diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala
-index 458b5dfc0f4..d209f3c85bc 100644
---- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala
-+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala
-@@ -26,7 +26,7 @@ import org.apache.parquet.hadoop.util.HadoopInputFile
- import org.apache.parquet.schema.{LogicalTypeAnnotation, PrimitiveType}
- import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
- 
--import org.apache.spark.sql.{QueryTest, Row}
-+import org.apache.spark.sql.{IgnoreCometSuite, QueryTest, Row}
- import org.apache.spark.sql.internal.SQLConf
- import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
- import org.apache.spark.sql.test.SharedSparkSession
-@@ -35,7 +35,9 @@ import org.apache.spark.unsafe.types.VariantVal
- /**
-  * Test shredding Variant values in the Parquet reader/writer.
-  */
--class ParquetVariantShreddingSuite extends QueryTest with ParquetTest with SharedSparkSession {
-+class ParquetVariantShreddingSuite extends QueryTest with ParquetTest with SharedSparkSession
-+  // TODO enable tests once https://github.com/apache/datafusion-comet/issues/2209 is fixed
-+  with IgnoreCometSuite {
- 
-   private def testWithTempDir(name: String)(block: File => Unit): Unit = test(name) {
-     withTempDir { dir =>
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
 index b8f3ea3c6f3..bbd44221288 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index bba3925c94..c12a012630 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -713,6 +713,13 @@ case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport with C
       // we don't need specific support for collation in scans, but this
       // is a convenient place to force the whole query to fall back to Spark for now
       false
+    case s: StructType if isVariantStruct(s) =>
+      // Spark 4.0's PushVariantIntoScan rewrites a VariantType column into a struct of typed
+      // fields plus per-field VariantMetadata, expecting the scan to honor Parquet variant
+      // shredding semantics. Comet's native scans don't, so fall back to Spark.
+      fallbackReasons +=
+        s"$scanImpl scan does not support shredded Variant reads (column $name)"
+      false
     case s: StructType if s.fields.isEmpty => false
     case _ =>