Skip to content
Open
35 changes: 34 additions & 1 deletion spark/src/main/scala/org/apache/comet/serde/datetime.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package org.apache.comet.serde

import java.util.Locale

import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, LastDay, Literal, MakeDate, Minute, Month, NextDay, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year}
import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, Expression, GetDateField, Hour, LastDay, Literal, MakeDate, Minute, Month, NextDay, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year}
import org.apache.spark.sql.types.{DateType, IntegerType, StringType, TimestampType}
import org.apache.spark.unsafe.types.UTF8String

Expand Down Expand Up @@ -541,3 +541,36 @@ object CometDateFormat extends CometExpressionSerde[DateFormatClass] {
}
}
}

/**
 * Date/time serialization helpers that are mixed into the version-specific `CometExprShim`
 * traits.
 */
trait CommonDateTimeExprs {

  /**
   * Serializes an expression as a `Second` protobuf expression built from its first child.
   *
   * Only the first entry of `expr.children` is converted. When there is no child, or when the
   * child cannot be converted by `exprToProtoInternal`, the reason is reported through
   * `withInfo` and `None` is returned.
   *
   * @param expr
   *   the expression to serialize; the shims pass a `SecondsOfTime` instance
   * @param inputs
   *   attributes forwarded unchanged to `exprToProtoInternal`
   * @param binding
   *   flag forwarded unchanged to `exprToProtoInternal`
   * @return
   *   the serialized expression, or `None` when it could not be produced
   */
  def secondsOfTimeToProto(
      expr: Expression,
      inputs: Seq[Attribute],
      binding: Boolean): Option[Expr] =
    expr.children.headOption match {
      case None =>
        withInfo(expr, "SecondsOfTime has no child expression")
        None

      case Some(child) =>
        exprToProtoInternal(child, inputs, binding) match {
          case Some(childExpr) =>
            // The source expression carries no timeZoneId of its own, so UTC is assumed.
            val second = ExprOuterClass.Second
              .newBuilder()
              .setChild(childExpr)
              .setTimezone("UTC")

            Some(
              ExprOuterClass.Expr
                .newBuilder()
                .setSecond(second)
                .build())

          case None =>
            // Report the child that failed to convert so the fallback reason is visible.
            withInfo(expr, child)
            None
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ package org.apache.comet.shims
import org.apache.spark.sql.catalyst.expressions._

import org.apache.comet.expressions.CometEvalMode
import org.apache.comet.serde.CommonStringExprs
import org.apache.comet.serde.{CommonDateTimeExprs, CommonStringExprs}
import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr}

/**
* `CometExprShim` acts as a shim for parsing expressions from different Spark versions.
*/
trait CometExprShim extends CommonStringExprs {
trait CometExprShim extends CommonStringExprs with CommonDateTimeExprs {
protected def evalMode(c: Cast): CometEvalMode.Value =
CometEvalModeUtil.fromSparkEvalMode(c.evalMode)

Expand All @@ -43,6 +43,9 @@ trait CometExprShim extends CommonStringExprs {
// Right child is the encoding expression.
stringDecode(expr, s.charset, s.bin, inputs, binding)

case _: SecondsOfTime =>
secondsOfTimeToProto(e, inputs, binding)

case _ => None
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ import org.apache.spark.sql.types.DataTypes

import org.apache.comet.CometSparkSessionExtensions.withInfo
import org.apache.comet.expressions.{CometCast, CometEvalMode}
import org.apache.comet.serde.{CommonStringExprs, Compatible, ExprOuterClass, Incompatible}
import org.apache.comet.serde.{CommonDateTimeExprs, CommonStringExprs, Compatible, ExprOuterClass, Incompatible}
import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr}
import org.apache.comet.serde.QueryPlanSerde.exprToProtoInternal

/**
* `CometExprShim` acts as a shim for parsing expressions from different Spark versions.
*/
trait CometExprShim extends CommonStringExprs {
trait CometExprShim extends CommonStringExprs with CommonDateTimeExprs {
protected def evalMode(c: Cast): CometEvalMode.Value =
CometEvalModeUtil.fromSparkEvalMode(c.evalMode)

Expand Down Expand Up @@ -91,6 +91,9 @@ trait CometExprShim extends CommonStringExprs {
// val optExpr = scalarFunctionExprToProto("width_bucket", childExprs: _*)
// optExprWithInfo(optExpr, wb, wb.children: _*)

case _: SecondsOfTime =>
secondsOfTimeToProto(e, inputs, binding)

case _ => None
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ import org.apache.spark.sql.types.{BinaryType, BooleanType, DataTypes, StringTyp

import org.apache.comet.CometSparkSessionExtensions.withInfo
import org.apache.comet.expressions.{CometCast, CometEvalMode}
import org.apache.comet.serde.{CommonStringExprs, Compatible, ExprOuterClass, Incompatible}
import org.apache.comet.serde.{CommonDateTimeExprs, CommonStringExprs, Compatible, ExprOuterClass, Incompatible}
import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr}
import org.apache.comet.serde.QueryPlanSerde.exprToProtoInternal

/**
* `CometExprShim` acts as a shim for parsing expressions from different Spark versions.
*/
trait CometExprShim extends CommonStringExprs {
trait CometExprShim extends CommonStringExprs with CommonDateTimeExprs {
protected def evalMode(c: Cast): CometEvalMode.Value =
CometEvalModeUtil.fromSparkEvalMode(c.evalMode)

Expand Down Expand Up @@ -113,6 +113,9 @@ trait CometExprShim extends CommonStringExprs {
// val optExpr = scalarFunctionExprToProto("width_bucket", childExprs: _*)
// optExprWithInfo(optExpr, wb, wb.children: _*)

case _: SecondsOfTime =>
secondsOfTimeToProto(e, inputs, binding)

case _ => None
}
}
Expand Down
17 changes: 17 additions & 0 deletions spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,23 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
}

test("seconds_of_time expression") {
  // Checks that the SQL `second` function gives the same answer as Spark when applied to
  // column `_1` of a generated parquet file, with dictionary encoding both on and off.
  // If Spark plans this call as a SecondsOfTime expression (a RuntimeReplaceable expression),
  // the version-specific shim converts it to a Second proto.
  for (dictionaryEnabled <- Seq(true, false)) {
    withTempDir { dir =>
      val parquetPath = new Path(dir.toURI.toString, "part-r-0.parquet")
      makeRawTimeParquetFile(parquetPath, dictionaryEnabled = dictionaryEnabled, 10000)

      readParquetFile(parquetPath.toString) { df =>
        checkSparkAnswerAndOperator(df.select(expr("second(_1)")))
      }
    }
  }
}

test("hour on int96 timestamp column") {
import testImplicits._

Expand Down