@@ -21,9 +21,12 @@ package org.apache.comet.shims

import scala.annotation.nowarn

import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.types.{DataType, StructType}

trait CometTypeShim {
@nowarn // Spark 4 feature; stubbed to false in Spark 3.x for compatibility.
def isStringCollationType(dt: DataType): Boolean = false

@nowarn // Spark 4 feature; Variant shredding doesn't exist in Spark 3.x.
def isVariantStruct(s: StructType): Boolean = false
}
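
For context, a minimal sketch of how the per-version shim is consumed (the caller object and method name below are assumed for illustration, not part of this PR): callers mix in `CometTypeShim` and stay source-compatible across Spark versions, with the Spark 3.x source set compiling the stubs above and the Spark 4.x source set compiling the real checks shown next.

```scala
// Hypothetical caller, for illustration only; names are not from this PR.
import org.apache.spark.sql.types.{DataType, StructType}

object ShimUsageSketch extends CometTypeShim {
  // Returns true when a type needs Spark fallback for reasons the shim can detect.
  def needsSparkFallback(dt: DataType): Boolean = dt match {
    case s: StructType => isVariantStruct(s) // always false when built against Spark 3.x
    case other         => isStringCollationType(other)
  }
}
```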
@@ -19,7 +19,8 @@

package org.apache.comet.shims

import org.apache.spark.sql.types.{DataType, StringType}
import org.apache.spark.sql.execution.datasources.VariantMetadata
import org.apache.spark.sql.types.{DataType, StringType, StructType}

trait CometTypeShim {
// A `StringType` carries collation metadata in Spark 4.0. Only non-default (non-UTF8_BINARY)
@@ -31,4 +32,11 @@ trait CometTypeShim {
case st: StringType => st.collationId != StringType.collationId
case _ => false
}

// Spark 4.0's `PushVariantIntoScan` rewrites `VariantType` columns into a `StructType` whose
// fields each carry `__VARIANT_METADATA_KEY` metadata, then pushes `variant_get` paths down as
// ordinary struct field accesses. Comet's native scans don't understand the on-disk Parquet
// variant shredding layout, so reading such a struct natively returns nulls. Detect the marker
// and force scan fallback.
def isVariantStruct(s: StructType): Boolean = VariantMetadata.isVariantStruct(s)
}
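
As an illustration of what the marker check amounts to (the real logic lives in Spark's `VariantMetadata`, which should always be used in real code; the constant name below is assumed from the comment above), a struct produced by `PushVariantIntoScan` can be recognized by the metadata key carried on its fields:

```scala
// Rough approximation, for illustration only; defer to VariantMetadata.isVariantStruct.
// Assumes the marker key string used by Spark 4.0's PushVariantIntoScan rewrite.
import org.apache.spark.sql.types.StructType

object VariantStructSketch {
  private val MarkerKey = "__VARIANT_METADATA_KEY" // assumed constant name

  def looksLikeVariantStruct(s: StructType): Boolean =
    s.fields.nonEmpty && s.fields.forall(_.metadata.contains(MarkerKey))
}
```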
dev/diffs/4.0.1.diff (39 changes: 0 additions & 39 deletions)
@@ -1333,21 +1333,6 @@ index 2e33f6505ab..54f5081e10a 100644
}

withTable("t1", "t2") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala
index fee375db10a..8c2c24e2c5f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala
@@ -33,7 +33,9 @@ import org.apache.spark.sql.types._
import org.apache.spark.types.variant._
import org.apache.spark.unsafe.types.{UTF8String, VariantVal}

-class VariantShreddingSuite extends QueryTest with SharedSparkSession with ParquetTest {
+class VariantShreddingSuite extends QueryTest with SharedSparkSession with ParquetTest
+ // TODO enable tests once https://github.com/apache/datafusion-comet/issues/2209 is fixed
+ with IgnoreCometSuite {
def parseJson(s: String): VariantVal = {
val v = VariantBuilder.parseJson(s, false)
new VariantVal(v.getValue, v.getMetadata)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala
index 11e9547dfc5..637411056ae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationSuite.scala
@@ -3130,30 +3115,6 @@ index 09ed6955a51..5cd856ff7b6 100644
) {
checkAllParquetReaders(
values = Seq("1.23", "10.34"),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala
index 458b5dfc0f4..d209f3c85bc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVariantShreddingSuite.scala
@@ -26,7 +26,7 @@ import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.parquet.schema.{LogicalTypeAnnotation, PrimitiveType}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName

-import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.{IgnoreCometSuite, QueryTest, Row}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
import org.apache.spark.sql.test.SharedSparkSession
@@ -35,7 +35,9 @@ import org.apache.spark.unsafe.types.VariantVal
/**
* Test shredding Variant values in the Parquet reader/writer.
*/
-class ParquetVariantShreddingSuite extends QueryTest with ParquetTest with SharedSparkSession {
+class ParquetVariantShreddingSuite extends QueryTest with ParquetTest with SharedSparkSession
+ // TODO enable tests once https://github.com/apache/datafusion-comet/issues/2209 is fixed
+ with IgnoreCometSuite {

private def testWithTempDir(name: String)(block: File => Unit): Unit = test(name) {
withTempDir { dir =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
index b8f3ea3c6f3..bbd44221288 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
@@ -713,6 +713,13 @@ case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport with C
// we don't need specific support for collation in scans, but this
// is a convenient place to force the whole query to fall back to Spark for now
false
case s: StructType if isVariantStruct(s) =>
// Spark 4.0's PushVariantIntoScan rewrites a VariantType column into a struct of typed
// fields plus per-field VariantMetadata, expecting the scan to honor Parquet variant
// shredding semantics. Comet's native scans don't, so fall back to Spark.
fallbackReasons +=
s"$scanImpl scan does not support shredded Variant reads (column $name)"
false
case s: StructType if s.fields.isEmpty =>
false
case _ =>
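
For reference, a hedged sketch of the kind of query this fallback targets (assuming a Spark 4.0 session with Comet enabled; the table name and literal are made up): extracting a typed path from a Variant column goes through `PushVariantIntoScan`, which rewrites the scan schema into the marker struct checked above, and the scan now falls back to Spark instead of returning nulls from the native reader.

```scala
// Illustrative repro sketch, not taken from this PR. parse_json and variant_get are
// Spark 4.0 SQL functions; the table name is hypothetical.
spark.sql("""CREATE TABLE variant_demo USING parquet AS SELECT parse_json('{"a": 1}') AS v""")

// PushVariantIntoScan turns this into a struct-field access on the rewritten scan schema;
// CometScanTypeChecker now reports the column as unsupported, so Spark's reader is used.
spark.sql("SELECT variant_get(v, '$.a', 'int') FROM variant_demo").show()
```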