Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@ import org.apache.spark.sql.comet.CometLocalTableScanExec.createMetricsIterator
import org.apache.spark.sql.comet.execution.arrow.CometArrowConverters
import org.apache.spark.sql.execution.{LeafExecNode, LocalTableScanExec}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.types.NullType
import org.apache.spark.sql.vectorized.ColumnarBatch

import com.google.common.base.Objects

import org.apache.comet.{CometConf, ConfigEntry}
import org.apache.comet.serde.OperatorOuterClass.Operator
import org.apache.comet.serde.{Compatible, OperatorOuterClass, SupportLevel, Unsupported}
import org.apache.comet.serde.operator.CometSink

case class CometLocalTableScanExec(
Expand Down Expand Up @@ -112,7 +113,17 @@ object CometLocalTableScanExec extends CometSink[LocalTableScanExec] {
override def enabledConfig: Option[ConfigEntry[Boolean]] = Some(
CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED)

override def createExec(nativeOp: Operator, op: LocalTableScanExec): CometNativeExec = {
override def getSupportLevel(op: LocalTableScanExec): SupportLevel = {
if (op.output.exists(_.dataType == NullType)) {
Unsupported(Some("Unsupported data type: NullType"))
} else {
Compatible()
}
}

override def createExec(
nativeOp: OperatorOuterClass.Operator,
op: LocalTableScanExec): CometNativeExec = {
CometScanWrapper(nativeOp, CometLocalTableScanExec(op, op.rows, op.output))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3882,6 +3882,12 @@ class CometExecSuite extends CometTestBase {
}
}

test("LocalTableScanExec with NullType aggregate falls back without crashing") {
withSQLConf(CometConf.COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED.key -> "true") {
checkAnswer(sql("SELECT max(col) FROM VALUES (NULL), (NULL) AS t(col)"), Seq(Row(null)))
}
}

test("SparkToColumnar with timestamps in non-UTC timezone") {
withTempDir { dir =>
val path = new java.io.File(dir, "data").getAbsolutePath
Expand Down