diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala index 622168bcc9..df0f11baa6 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometLocalTableScanExec.scala @@ -27,12 +27,13 @@ import org.apache.spark.sql.comet.CometLocalTableScanExec.createMetricsIterator import org.apache.spark.sql.comet.execution.arrow.CometArrowConverters import org.apache.spark.sql.execution.{LeafExecNode, LocalTableScanExec} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} +import org.apache.spark.sql.types.NullType import org.apache.spark.sql.vectorized.ColumnarBatch import com.google.common.base.Objects import org.apache.comet.{CometConf, ConfigEntry} -import org.apache.comet.serde.OperatorOuterClass.Operator +import org.apache.comet.serde.{Compatible, OperatorOuterClass, SupportLevel, Unsupported} import org.apache.comet.serde.operator.CometSink case class CometLocalTableScanExec( @@ -112,7 +113,17 @@ object CometLocalTableScanExec extends CometSink[LocalTableScanExec] { override def enabledConfig: Option[ConfigEntry[Boolean]] = Some( CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED) - override def createExec(nativeOp: Operator, op: LocalTableScanExec): CometNativeExec = { + override def getSupportLevel(op: LocalTableScanExec): SupportLevel = { + if (op.output.exists(_.dataType == NullType)) { + Unsupported(Some("Unsupported data type: NullType")) + } else { + Compatible() + } + } + + override def createExec( + nativeOp: OperatorOuterClass.Operator, + op: LocalTableScanExec): CometNativeExec = { CometScanWrapper(nativeOp, CometLocalTableScanExec(op, op.rows, op.output)) } diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala index 16601d056b..58d43f4aba 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala @@ -3882,6 +3882,12 @@ class CometExecSuite extends CometTestBase { } } + test("LocalTableScanExec with NullType aggregate falls back without crashing") { + withSQLConf(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true") { + checkAnswer(sql("SELECT max(col) FROM VALUES (NULL), (NULL) AS t(col)"), Seq(Row(null))) + } + } + test("SparkToColumnar with timestamps in non-UTC timezone") { withTempDir { dir => val path = new java.io.File(dir, "data").getAbsolutePath