From e732af8756e0e35f5ead1f9310380fc0312654e8 Mon Sep 17 00:00:00 2001 From: Your Name Date: Mon, 2 Mar 2026 11:44:02 +0800 Subject: [PATCH 1/7] Support SecondOfTime expression via version-specific shims --- .../org/apache/comet/serde/strings.scala | 46 +++++++++++++++++++ .../apache/comet/shims/CometExprShim.scala | 5 ++ .../apache/comet/shims/CometExprShim.scala | 3 ++ .../apache/comet/shims/CometExprShim.scala | 3 ++ .../apache/comet/CometExpressionSuite.scala | 17 +++++++ 5 files changed, 74 insertions(+) diff --git a/spark/src/main/scala/org/apache/comet/serde/strings.scala b/spark/src/main/scala/org/apache/comet/serde/strings.scala index 64ba644048..b7648d8023 100644 --- a/spark/src/main/scala/org/apache/comet/serde/strings.scala +++ b/spark/src/main/scala/org/apache/comet/serde/strings.scala @@ -408,4 +408,50 @@ trait CommonStringExprs { None } } + + def secondOfTimeToProto( + expr: Expression, + inputs: Seq[Attribute], + binding: Boolean): Option[Expr] = { + val childOpt = expr.children.headOption.orElse { + withInfo(expr, "SecondOfTime has no child expression") + None + } + + childOpt.flatMap { child => + val timeZoneId = { + val exprClass = expr.getClass + try { + val timeZoneIdMethod = exprClass.getMethod("timeZoneId") + timeZoneIdMethod.invoke(expr).asInstanceOf[Option[String]] + } catch { + case _: NoSuchMethodException => + try { + val timeZoneIdField = exprClass.getField("timeZoneId") + timeZoneIdField.get(expr).asInstanceOf[Option[String]] + } catch { + case _: NoSuchFieldException | _: SecurityException => None + } + } + } + + exprToProtoInternal(child, inputs, binding) + .map { childExpr => + val builder = ExprOuterClass.Second.newBuilder() + builder.setChild(childExpr) + + val timeZone = timeZoneId.getOrElse("UTC") + builder.setTimezone(timeZone) + + ExprOuterClass.Expr + .newBuilder() + .setSecond(builder) + .build() + } + .orElse { + withInfo(expr, child) + None + } + } + } } diff --git 
a/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala index 600931c346..8a1a399ca2 100644 --- a/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala @@ -21,9 +21,11 @@ package org.apache.comet.shims import org.apache.spark.sql.catalyst.expressions._ +import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.expressions.CometEvalMode import org.apache.comet.serde.CommonStringExprs import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr} +import org.apache.comet.serde.QueryPlanSerde.exprToProtoInternal /** * `CometExprShim` acts as a shim for parsing expressions from different Spark versions. @@ -43,6 +45,9 @@ trait CometExprShim extends CommonStringExprs { // Right child is the encoding expression. stringDecode(expr, s.charset, s.bin, inputs, binding) + case _ if expr.getClass.getSimpleName == "SecondOfTime" => + secondOfTimeToProto(expr, inputs, binding) + case _ => None } } diff --git a/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala index 8e9cb1c07b..7f97f61606 100644 --- a/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala @@ -91,6 +91,9 @@ trait CometExprShim extends CommonStringExprs { // val optExpr = scalarFunctionExprToProto("width_bucket", childExprs: _*) // optExprWithInfo(optExpr, wb, wb.children: _*) + case _ if expr.getClass.getSimpleName == "SecondOfTime" => + secondOfTimeToProto(expr, inputs, binding) + case _ => None } } diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala index 2c5cebd166..53f3b9e0bb 100644 --- 
a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala @@ -113,6 +113,9 @@ trait CometExprShim extends CommonStringExprs { // val optExpr = scalarFunctionExprToProto("width_bucket", childExprs: _*) // optExprWithInfo(optExpr, wb, wb.children: _*) + case _ if expr.getClass.getSimpleName == "SecondOfTime" => + secondOfTimeToProto(expr, inputs, binding) + case _ => None } } diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index f0f022868f..fb7b096b84 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -572,6 +572,23 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } + test("seconds_of_time expression") { + // This test verifies that the second() function works correctly with timestamp columns. + // If Spark generates a SecondsOfTime expression (a RuntimeReplaceable expression), + // it will be handled by the version-specific shim and converted to a Second proto. 
+ Seq(true, false).foreach { dictionaryEnabled => + withTempDir { dir => + val path = new Path(dir.toURI.toString, "part-r-0.parquet") + makeRawTimeParquetFile(path, dictionaryEnabled = dictionaryEnabled, 10000) + readParquetFile(path.toString) { df => + val query = df.select(expr("second(_1)")) + + checkSparkAnswerAndOperator(query) + } + } + } + } + test("hour on int96 timestamp column") { import testImplicits._ From b30aead545ce94bc1f764ea76a082c8bc73118ff Mon Sep 17 00:00:00 2001 From: ChenChen Lai <72776271+0lai0@users.noreply.github.com> Date: Tue, 3 Mar 2026 10:29:17 +0800 Subject: [PATCH 2/7] Update spark/src/main/scala/org/apache/comet/serde/strings.scala Co-authored-by: Martin Grigorov --- spark/src/main/scala/org/apache/comet/serde/strings.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/strings.scala b/spark/src/main/scala/org/apache/comet/serde/strings.scala index b7648d8023..74e7f69f2a 100644 --- a/spark/src/main/scala/org/apache/comet/serde/strings.scala +++ b/spark/src/main/scala/org/apache/comet/serde/strings.scala @@ -425,7 +425,7 @@ trait CommonStringExprs { val timeZoneIdMethod = exprClass.getMethod("timeZoneId") timeZoneIdMethod.invoke(expr).asInstanceOf[Option[String]] } catch { - case _: NoSuchMethodException => + case _: NoSuchMethodException | _: SecurityException => try { val timeZoneIdField = exprClass.getField("timeZoneId") timeZoneIdField.get(expr).asInstanceOf[Option[String]] From 69cb09dccbb9a81b8a773c66a307f5859025f21d Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 3 Mar 2026 16:55:17 +0800 Subject: [PATCH 3/7] modify string.scala to datatime --- .../org/apache/comet/serde/datetime.scala | 51 ++++++++++++++++++- .../org/apache/comet/serde/strings.scala | 46 ----------------- .../apache/comet/shims/CometExprShim.scala | 10 ++-- .../apache/comet/shims/CometExprShim.scala | 8 +-- .../apache/comet/shims/CometExprShim.scala | 8 +-- 5 files changed, 62 insertions(+), 
61 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/datetime.scala b/spark/src/main/scala/org/apache/comet/serde/datetime.scala index d36b6a3b40..db984ba180 100644 --- a/spark/src/main/scala/org/apache/comet/serde/datetime.scala +++ b/spark/src/main/scala/org/apache/comet/serde/datetime.scala @@ -21,7 +21,7 @@ package org.apache.comet.serde import java.util.Locale -import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, LastDay, Literal, MakeDate, Minute, Month, NextDay, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year} +import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, Expression, GetDateField, Hour, LastDay, Literal, MakeDate, Minute, Month, NextDay, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year} import org.apache.spark.sql.types.{DateType, IntegerType, StringType, TimestampType} import org.apache.spark.unsafe.types.UTF8String @@ -541,3 +541,52 @@ object CometDateFormat extends CometExpressionSerde[DateFormatClass] { } } } + +trait CommonDateTimeExprs { + + def secondsOfTimeToProto( + expr: Expression, + inputs: Seq[Attribute], + binding: Boolean): Option[Expr] = { + val childOpt = expr.children.headOption.orElse { + withInfo(expr, "SecondsOfTime has no child expression") + None + } + + childOpt.flatMap { child => + val timeZoneId = { + val exprClass = expr.getClass + try { + val timeZoneIdMethod = exprClass.getMethod("timeZoneId") + timeZoneIdMethod.invoke(expr).asInstanceOf[Option[String]] + } catch { + case _: NoSuchMethodException | _: SecurityException => + try { + val timeZoneIdField = exprClass.getField("timeZoneId") + timeZoneIdField.get(expr).asInstanceOf[Option[String]] + } catch { + case _: NoSuchFieldException | _: SecurityException => None + } + 
} + } + + exprToProtoInternal(child, inputs, binding) + .map { childExpr => + val builder = ExprOuterClass.Second.newBuilder() + builder.setChild(childExpr) + + val timeZone = timeZoneId.getOrElse("UTC") + builder.setTimezone(timeZone) + + ExprOuterClass.Expr + .newBuilder() + .setSecond(builder) + .build() + } + .orElse { + withInfo(expr, child) + None + } + } + } +} diff --git a/spark/src/main/scala/org/apache/comet/serde/strings.scala b/spark/src/main/scala/org/apache/comet/serde/strings.scala index 74e7f69f2a..64ba644048 100644 --- a/spark/src/main/scala/org/apache/comet/serde/strings.scala +++ b/spark/src/main/scala/org/apache/comet/serde/strings.scala @@ -408,50 +408,4 @@ trait CommonStringExprs { None } } - - def secondOfTimeToProto( - expr: Expression, - inputs: Seq[Attribute], - binding: Boolean): Option[Expr] = { - val childOpt = expr.children.headOption.orElse { - withInfo(expr, "SecondOfTime has no child expression") - None - } - - childOpt.flatMap { child => - val timeZoneId = { - val exprClass = expr.getClass - try { - val timeZoneIdMethod = exprClass.getMethod("timeZoneId") - timeZoneIdMethod.invoke(expr).asInstanceOf[Option[String]] - } catch { - case _: NoSuchMethodException | _: SecurityException => - try { - val timeZoneIdField = exprClass.getField("timeZoneId") - timeZoneIdField.get(expr).asInstanceOf[Option[String]] - } catch { - case _: NoSuchFieldException | _: SecurityException => None - } - } - } - - exprToProtoInternal(child, inputs, binding) - .map { childExpr => - val builder = ExprOuterClass.Second.newBuilder() - builder.setChild(childExpr) - - val timeZone = timeZoneId.getOrElse("UTC") - builder.setTimezone(timeZone) - - ExprOuterClass.Expr - .newBuilder() - .setSecond(builder) - .build() - } - .orElse { - withInfo(expr, child) - None - } - } - } } diff --git a/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala index 8a1a399ca2..99787897b2 100644 --- 
a/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala @@ -21,16 +21,14 @@ package org.apache.comet.shims import org.apache.spark.sql.catalyst.expressions._ -import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.expressions.CometEvalMode -import org.apache.comet.serde.CommonStringExprs +import org.apache.comet.serde.{CommonDateTimeExprs, CommonStringExprs} import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr} -import org.apache.comet.serde.QueryPlanSerde.exprToProtoInternal /** * `CometExprShim` acts as a shim for parsing expressions from different Spark versions. */ -trait CometExprShim extends CommonStringExprs { +trait CometExprShim extends CommonStringExprs with CommonDateTimeExprs { protected def evalMode(c: Cast): CometEvalMode.Value = CometEvalModeUtil.fromSparkEvalMode(c.evalMode) @@ -45,8 +43,8 @@ trait CometExprShim extends CommonStringExprs { // Right child is the encoding expression. 
stringDecode(expr, s.charset, s.bin, inputs, binding) - case _ if expr.getClass.getSimpleName == "SecondOfTime" => - secondOfTimeToProto(expr, inputs, binding) + case e if e.getClass.getName == "org.apache.spark.sql.catalyst.expressions.SecondsOfTime" => + secondsOfTimeToProto(e, inputs, binding) case _ => None } diff --git a/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala index 7f97f61606..13b0835e13 100644 --- a/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala @@ -24,14 +24,14 @@ import org.apache.spark.sql.types.DataTypes import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.expressions.{CometCast, CometEvalMode} -import org.apache.comet.serde.{CommonStringExprs, Compatible, ExprOuterClass, Incompatible} +import org.apache.comet.serde.{CommonDateTimeExprs, CommonStringExprs, Compatible, ExprOuterClass, Incompatible} import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr} import org.apache.comet.serde.QueryPlanSerde.exprToProtoInternal /** * `CometExprShim` acts as a shim for parsing expressions from different Spark versions. 
*/ -trait CometExprShim extends CommonStringExprs { +trait CometExprShim extends CommonStringExprs with CommonDateTimeExprs { protected def evalMode(c: Cast): CometEvalMode.Value = CometEvalModeUtil.fromSparkEvalMode(c.evalMode) @@ -91,8 +91,8 @@ trait CometExprShim extends CommonStringExprs { // val optExpr = scalarFunctionExprToProto("width_bucket", childExprs: _*) // optExprWithInfo(optExpr, wb, wb.children: _*) - case _ if expr.getClass.getSimpleName == "SecondOfTime" => - secondOfTimeToProto(expr, inputs, binding) + case e if e.getClass.getName == "org.apache.spark.sql.catalyst.expressions.SecondsOfTime" => + secondsOfTimeToProto(e, inputs, binding) case _ => None } diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala index 53f3b9e0bb..48640e55e1 100644 --- a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala @@ -27,14 +27,14 @@ import org.apache.spark.sql.types.{BinaryType, BooleanType, DataTypes, StringTyp import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.expressions.{CometCast, CometEvalMode} -import org.apache.comet.serde.{CommonStringExprs, Compatible, ExprOuterClass, Incompatible} +import org.apache.comet.serde.{CommonDateTimeExprs, CommonStringExprs, Compatible, ExprOuterClass, Incompatible} import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr} import org.apache.comet.serde.QueryPlanSerde.exprToProtoInternal /** * `CometExprShim` acts as a shim for parsing expressions from different Spark versions. 
*/ -trait CometExprShim extends CommonStringExprs { +trait CometExprShim extends CommonStringExprs with CommonDateTimeExprs { protected def evalMode(c: Cast): CometEvalMode.Value = CometEvalModeUtil.fromSparkEvalMode(c.evalMode) @@ -113,8 +113,8 @@ trait CometExprShim extends CommonStringExprs { // val optExpr = scalarFunctionExprToProto("width_bucket", childExprs: _*) // optExprWithInfo(optExpr, wb, wb.children: _*) - case _ if expr.getClass.getSimpleName == "SecondOfTime" => - secondOfTimeToProto(expr, inputs, binding) + case e if e.getClass.getName == "org.apache.spark.sql.catalyst.expressions.SecondsOfTime" => + secondsOfTimeToProto(e, inputs, binding) case _ => None } From cbecd163f3ea172be4b8371df6eefc698888a56a Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 3 Mar 2026 23:47:59 +0800 Subject: [PATCH 4/7] remove timeZoneId refactor --- .../org/apache/comet/serde/datetime.scala | 20 ++----------------- 1 file changed, 2 insertions(+), 18 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/serde/datetime.scala b/spark/src/main/scala/org/apache/comet/serde/datetime.scala index db984ba180..f093b6a74c 100644 --- a/spark/src/main/scala/org/apache/comet/serde/datetime.scala +++ b/spark/src/main/scala/org/apache/comet/serde/datetime.scala @@ -554,29 +554,13 @@ trait CommonDateTimeExprs { } childOpt.flatMap { child => - val timeZoneId = { - val exprClass = expr.getClass - try { - val timeZoneIdMethod = exprClass.getMethod("timeZoneId") - timeZoneIdMethod.invoke(expr).asInstanceOf[Option[String]] - } catch { - case _: NoSuchMethodException | _: SecurityException => - try { - val timeZoneIdField = exprClass.getField("timeZoneId") - timeZoneIdField.get(expr).asInstanceOf[Option[String]] - } catch { - case _: NoSuchFieldException | _: SecurityException => None - } - } - } - exprToProtoInternal(child, inputs, binding) .map { childExpr => val builder = ExprOuterClass.Second.newBuilder() builder.setChild(childExpr) - val timeZone = timeZoneId.getOrElse("UTC") 
- builder.setTimezone(timeZone) + // SecondsOfTime does not carry a timeZoneId; assume UTC. + builder.setTimezone("UTC") ExprOuterClass.Expr .newBuilder() From 9191b3e6b6aa2a5d3a1f97d8058a1c972b3950ae Mon Sep 17 00:00:00 2001 From: ChenChen Lai <72776271+0lai0@users.noreply.github.com> Date: Thu, 5 Mar 2026 15:33:06 +0800 Subject: [PATCH 5/7] Update spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala Co-authored-by: Martin Grigorov --- .../main/spark-3.4/org/apache/comet/shims/CometExprShim.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala index 99787897b2..78bcd79a4d 100644 --- a/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala @@ -43,7 +43,7 @@ trait CometExprShim extends CommonStringExprs with CommonDateTimeExprs { // Right child is the encoding expression. 
stringDecode(expr, s.charset, s.bin, inputs, binding) - case e if e.getClass.getName == "org.apache.spark.sql.catalyst.expressions.SecondsOfTime" => + case e: SecondsOfTime => secondsOfTimeToProto(e, inputs, binding) case _ => None From 4447a453125faebe040f3bbff6adef634f38ad57 Mon Sep 17 00:00:00 2001 From: ChenChen Lai <72776271+0lai0@users.noreply.github.com> Date: Thu, 5 Mar 2026 15:33:15 +0800 Subject: [PATCH 6/7] Update spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala Co-authored-by: Martin Grigorov --- .../main/spark-4.0/org/apache/comet/shims/CometExprShim.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala index 48640e55e1..1f37e47e29 100644 --- a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala @@ -113,7 +113,7 @@ trait CometExprShim extends CommonStringExprs with CommonDateTimeExprs { // val optExpr = scalarFunctionExprToProto("width_bucket", childExprs: _*) // optExprWithInfo(optExpr, wb, wb.children: _*) - case e if e.getClass.getName == "org.apache.spark.sql.catalyst.expressions.SecondsOfTime" => + case e: SecondsOfTime => secondsOfTimeToProto(e, inputs, binding) case _ => None From d26f8978f38fec298a85f95e67f5a3cc1030b8c7 Mon Sep 17 00:00:00 2001 From: ChenChen Lai <72776271+0lai0@users.noreply.github.com> Date: Thu, 5 Mar 2026 15:33:23 +0800 Subject: [PATCH 7/7] Update spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala Co-authored-by: Martin Grigorov --- .../main/spark-3.5/org/apache/comet/shims/CometExprShim.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala index 13b0835e13..c3027bb12a 100644 --- 
a/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala @@ -91,7 +91,7 @@ trait CometExprShim extends CommonStringExprs with CommonDateTimeExprs { // val optExpr = scalarFunctionExprToProto("width_bucket", childExprs: _*) // optExprWithInfo(optExpr, wb, wb.children: _*) - case e if e.getClass.getName == "org.apache.spark.sql.catalyst.expressions.SecondsOfTime" => + case e: SecondsOfTime => secondsOfTimeToProto(e, inputs, binding) case _ => None