From a16a81a94e9f122a4eeefe640dc263540d5985fe Mon Sep 17 00:00:00 2001 From: vinodkc Date: Mon, 24 Nov 2025 18:42:29 -0800 Subject: [PATCH] Add time_bucket method --- .../pyspark/sql/connect/functions/builtin.py | 7 + python/pyspark/sql/functions/__init__.py | 1 + python/pyspark/sql/functions/builtin.py | 73 +++++++ .../org/apache/spark/sql/functions.scala | 30 +++ .../catalyst/analysis/FunctionRegistry.scala | 1 + .../expressions/timeExpressions.scala | 105 ++++++++++ .../sql-functions/sql-expression-schema.md | 1 + .../sql-tests/analyzer-results/time.sql.out | 172 ++++++++++++++++ .../test/resources/sql-tests/inputs/time.sql | 44 +++++ .../resources/sql-tests/results/time.sql.out | 186 ++++++++++++++++++ .../spark/sql/TimeFunctionsSuiteBase.scala | 153 ++++++++++++++ .../CollationExpressionWalkerSuite.scala | 7 + 12 files changed, 780 insertions(+) diff --git a/python/pyspark/sql/connect/functions/builtin.py b/python/pyspark/sql/connect/functions/builtin.py index 4dfee6b96487..785813f5a19d 100644 --- a/python/pyspark/sql/connect/functions/builtin.py +++ b/python/pyspark/sql/connect/functions/builtin.py @@ -3664,6 +3664,13 @@ def time_trunc(unit: "ColumnOrName", time: "ColumnOrName") -> Column: time_trunc.__doc__ = pysparkfuncs.time_trunc.__doc__ +def time_bucket(bucket_width: "ColumnOrName", time: "ColumnOrName") -> Column: + return _invoke_function_over_columns("time_bucket", bucket_width, time) + + +time_bucket.__doc__ = pysparkfuncs.time_bucket.__doc__ + + def timestamp_millis(col: "ColumnOrName") -> Column: return _invoke_function_over_columns("timestamp_millis", col) diff --git a/python/pyspark/sql/functions/__init__.py b/python/pyspark/sql/functions/__init__.py index ac9ae67ac446..5b35f1ea72c1 100644 --- a/python/pyspark/sql/functions/__init__.py +++ b/python/pyspark/sql/functions/__init__.py @@ -248,6 +248,7 @@ "timestamp_micros", "timestamp_millis", "timestamp_seconds", + "time_bucket", "time_diff", "time_from_micros", "time_from_millis", diff --git a/python/pyspark/sql/functions/builtin.py b/python/pyspark/sql/functions/builtin.py index 17eda26f76e1..1499221360b5 100644 --- a/python/pyspark/sql/functions/builtin.py +++ b/python/pyspark/sql/functions/builtin.py @@ -12920,6 +12920,79 @@ def time_trunc(unit: "ColumnOrName", time: "ColumnOrName") -> Column: return _invoke_function_over_columns("time_trunc", unit, time) +@_try_remote_functions +def time_bucket(bucket_width: "ColumnOrName", time: "ColumnOrName") -> Column: + """ + Returns the start of the time bucket containing the input time value. + Buckets are aligned to midnight (00:00:00). + + .. versionadded:: 4.2.0 + + Parameters + ---------- + bucket_width : :class:`~pyspark.sql.Column` or column name + A day-time interval specifying the width of each bucket. + Can be specified as a column expression like ``sf.expr("INTERVAL '15' MINUTE")``. + time : :class:`~pyspark.sql.Column` or column name + The time value to bucket. Must be of TIME type. + + Returns + ------- + :class:`~pyspark.sql.Column` + The start time of the bucket containing the input time. + + See Also + -------- + :meth:`pyspark.sql.functions.time_trunc` + :meth:`pyspark.sql.functions.date_trunc` + + Examples + -------- + Example 1: Basic 15-minute bucketing + + >>> from pyspark.sql import functions as sf + >>> df = spark.createDataFrame([ + ... (1, '09:37:22'), + ... (2, '14:47:00'), + ... (3, '10:05:30') + ... 
], ['id', 't']) + >>> df = df.withColumn("t", sf.to_time(df.t)) + >>> df.select(sf.time_bucket(sf.expr("INTERVAL '15' MINUTE"), df.t).alias("bucket")).show() + +--------+ + | bucket| + +--------+ + |09:30:00| + |14:45:00| + |10:00:00| + +--------+ + + Example 2: Hourly bucketing + + >>> df.select(sf.time_bucket(sf.expr("INTERVAL '1' HOUR"), df.t).alias("bucket")).show() + +--------+ + | bucket| + +--------+ + |09:00:00| + |14:00:00| + |10:00:00| + +--------+ + + Example 3: Aggregation with time buckets + + >>> df.groupBy( + ... sf.time_bucket(sf.expr("INTERVAL '30' MINUTE"), df.t).alias("time_slot") + ... ).count().orderBy("time_slot").show() + +---------+-----+ + |time_slot|count| + +---------+-----+ + | 09:30:00| 1| + | 10:00:00| 1| + | 14:30:00| 1| + +---------+-----+ + """ + return _invoke_function_over_columns("time_bucket", bucket_width, time) + + @_try_remote_functions def timestamp_millis(col: "ColumnOrName") -> Column: """ diff --git a/sql/api/src/main/scala/org/apache/spark/sql/functions.scala b/sql/api/src/main/scala/org/apache/spark/sql/functions.scala index 78675364d841..a0557fabe31e 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/functions.scala @@ -6910,6 +6910,36 @@ object functions { Column.fn("time_trunc", unit, time) } + /** + * Returns the start of the time bucket containing the input time value. Buckets are aligned to + * midnight (00:00:00). + * + * @param bucketWidth + * A Column representing a day-time interval (e.g., INTERVAL '15' MINUTE) + * @param time + * The time column to bucket + * + * @group datetime_funcs + * @since 4.2.0 + */ + def time_bucket(bucketWidth: Column, time: Column): Column = + Column.fn("time_bucket", bucketWidth, time) + + /** + * Returns the start of the time bucket containing the input time value. Buckets are aligned to + * midnight (00:00:00). + * + * @param bucketWidth + * A string representing a day-time interval (e.g., "15 minutes") + * @param time + * The time column to bucket + * + * @group datetime_funcs + * @since 4.2.0 + */ + def time_bucket(bucketWidth: String, time: Column): Column = + time_bucket(expr(s"INTERVAL '$bucketWidth'"), time) + /** * Creates a TIME from the number of seconds since midnight. 
* diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 199306104d85..a857c73e94b9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -701,6 +701,7 @@ object FunctionRegistry { expression[MakeDate]("make_date"), expression[MakeTime]("make_time"), expression[TimeTrunc]("time_trunc"), + expression[TimeBucket]("time_bucket", setAlias = true), expression[TimeFromSeconds]("time_from_seconds"), expression[TimeFromMillis]("time_from_millis"), expression[TimeFromMicros]("time_from_micros"), diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/timeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/timeExpressions.scala index f5fc953bff87..055af9c9387e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/timeExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/timeExpressions.scala @@ -752,6 +752,111 @@ case class TimeTrunc(unit: Expression, time: Expression) } } +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = """ + _FUNC_(bucket_width, time) - Returns the start of the time bucket containing `time`, + where buckets are aligned to midnight (00:00:00). + """, + arguments = """ + Arguments: + * bucket_width - A day-time interval specifying the width of each bucket (e.g., INTERVAL '15' MINUTE). + * time - The time value to bucket. Must be of TIME type. + """, + examples = """ + Examples: + > SELECT _FUNC_(INTERVAL '15' MINUTE, TIME'09:37:22'); + 09:30:00 + > SELECT _FUNC_(INTERVAL '30' MINUTE, TIME'14:47:00'); + 14:30:00 + > SELECT _FUNC_(INTERVAL '1' HOUR, TIME'16:35:00'); + 16:00:00 + > SELECT _FUNC_(INTERVAL '2' HOUR, TIME'15:20:00'); + 14:00:00 + """, + note = """ + Notes: + * Buckets are aligned to midnight (00:00:00) and cannot span across midnight. + * The bucket width must be a positive day-time interval. + * The function returns the start time of the bucket (inclusive lower bound). 
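+    * The bucket start is computed as floor(time / bucket_width) * bucket_width,
+      so a time that falls exactly on a bucket boundary maps to itself.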
+ """, + group = "datetime_funcs", + since = "4.2.0") +// scalastyle:on line.size.limit +case class TimeBucket( + bucketWidth: Expression, + time: Expression) + extends BinaryExpression with ImplicitCastInputTypes { + + override def left: Expression = bucketWidth + override def right: Expression = time + + override def inputTypes: Seq[AbstractDataType] = + Seq(DayTimeIntervalType, AnyTimeType) + + override def dataType: DataType = time.dataType + + override def nullable: Boolean = time.nullable || bucketWidth.nullable + + override def nullIntolerant: Boolean = true + + override def checkInputDataTypes(): TypeCheckResult = { + super.checkInputDataTypes() match { + case TypeCheckSuccess => + if (!bucketWidth.foldable) { + DataTypeMismatch( + errorSubClass = "NON_FOLDABLE_INPUT", + messageParameters = Map( + "inputName" -> toSQLId("bucket_width"), + "inputType" -> toSQLType(bucketWidth.dataType), + "inputExpr" -> toSQLExpr(bucketWidth) + ) + ) + } else { + val widthValue = bucketWidth.eval() + if (widthValue != null) { + val widthMicros = widthValue.asInstanceOf[Long] + if (widthMicros <= 0) { + DataTypeMismatch( + errorSubClass = "VALUE_OUT_OF_RANGE", + messageParameters = Map( + "exprName" -> "bucket_width", + "valueRange" -> s"(0, ${Long.MaxValue}]", + "currentValue" -> toSQLValue(widthMicros, LongType) + ) + ) + } else { + TypeCheckSuccess + } + } else { + TypeCheckSuccess + } + } + case failure => failure + } + } + + override def nullSafeEval(bucketWidthValue: Any, timeValue: Any): Any = { + val bucketMicros = bucketWidthValue.asInstanceOf[Long] + val timeNanos = timeValue.asInstanceOf[Long] + val bucketNanos = bucketMicros * NANOS_PER_MICROS + (timeNanos / bucketNanos) * bucketNanos + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + defineCodeGen(ctx, ev, (bucket, time) => { + val scaleFactor = s"${NANOS_PER_MICROS}L" + s"($time / ($bucket * $scaleFactor)) * ($bucket * $scaleFactor)" + }) + } + + override def prettyName: String = "time_bucket" + + override protected def withNewChildrenInternal( + newLeft: Expression, newRight: Expression): TimeBucket = + copy(bucketWidth = newLeft, time = newRight) +} + abstract class IntegralToTimeBase extends UnaryExpression with ExpectsInputTypes with CodegenFallback { protected def upScaleFactor: Long diff --git a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md index cd4538162b8d..52b32758a41b 100644 --- a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md +++ b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md @@ -362,6 +362,7 @@ | org.apache.spark.sql.catalyst.expressions.ThetaIntersection | theta_intersection | SELECT theta_sketch_estimate(theta_intersection(theta_sketch_agg(col1), theta_sketch_agg(col2))) FROM VALUES (5, 4), (1, 4), (2, 5), (2, 5), (3, 1) tab(col1, col2) | struct | | org.apache.spark.sql.catalyst.expressions.ThetaSketchEstimate | theta_sketch_estimate | SELECT theta_sketch_estimate(theta_sketch_agg(col)) FROM VALUES (1), (1), (2), (2), (3) tab(col) | struct | | org.apache.spark.sql.catalyst.expressions.ThetaUnion | theta_union | SELECT theta_sketch_estimate(theta_union(theta_sketch_agg(col1), theta_sketch_agg(col2))) FROM VALUES (1, 4), (1, 4), (2, 5), (2, 5), (3, 6) tab(col1, col2) | struct | +| org.apache.spark.sql.catalyst.expressions.TimeBucket | time_bucket | SELECT time_bucket(INTERVAL '15' MINUTE, TIME'09:37:22') | struct | | 
org.apache.spark.sql.catalyst.expressions.TimeDiff | time_diff | SELECT time_diff('HOUR', TIME'20:30:29', TIME'21:30:28') | struct | | org.apache.spark.sql.catalyst.expressions.TimeFromMicros | time_from_micros | SELECT time_from_micros(0) | struct | | org.apache.spark.sql.catalyst.expressions.TimeFromMillis | time_from_millis | SELECT time_from_millis(0) | struct | diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/time.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/time.sql.out index 7075a9f8c4b4..bdb423cb9e05 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/time.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/time.sql.out @@ -2294,3 +2294,175 @@ SELECT time_from_micros(time_to_micros(TIME'14:30:00.5')) -- !query analysis Project [time_from_micros(time_to_micros(14:30:00.5)) AS time_from_micros(time_to_micros(TIME '14:30:00.5'))#x] +- OneRowRelation + + +-- !query +SELECT time_bucket(INTERVAL '15' MINUTE, TIME'09:37:22.123456') +-- !query analysis +Project [time_bucket(INTERVAL '15' MINUTE, 09:37:22.123456) AS time_bucket(INTERVAL '15' MINUTE, TIME '09:37:22.123456')#x] ++- OneRowRelation + + +-- !query +SELECT time_bucket(INTERVAL '30' MINUTE, TIME'14:47:00.987654') +-- !query analysis +Project [time_bucket(INTERVAL '30' MINUTE, 14:47:00.987654) AS time_bucket(INTERVAL '30' MINUTE, TIME '14:47:00.987654')#x] ++- OneRowRelation + + +-- !query +SELECT time_bucket(INTERVAL '1' HOUR, TIME'16:35:00.500000') +-- !query analysis +Project [time_bucket(INTERVAL '01' HOUR, 16:35:00.5) AS time_bucket(INTERVAL '01' HOUR, TIME '16:35:00.5')#x] ++- OneRowRelation + + +-- !query +SELECT time_bucket(INTERVAL '2' HOUR, TIME'15:20:00.123000') +-- !query analysis +Project [time_bucket(INTERVAL '02' HOUR, 15:20:00.123) AS time_bucket(INTERVAL '02' HOUR, TIME '15:20:00.123')#x] ++- OneRowRelation + + +-- !query +SELECT time_bucket(INTERVAL '90' MINUTE, TIME'10:45:00.456789') +-- !query analysis +Project [time_bucket(INTERVAL '90' MINUTE, 10:45:00.456789) AS time_bucket(INTERVAL '90' MINUTE, TIME '10:45:00.456789')#x] ++- OneRowRelation + + +-- !query +SELECT time_bucket(INTERVAL '1' HOUR, TIME'00:00:00.123456') +-- !query analysis +Project [time_bucket(INTERVAL '01' HOUR, 00:00:00.123456) AS time_bucket(INTERVAL '01' HOUR, TIME '00:00:00.123456')#x] ++- OneRowRelation + + +-- !query +SELECT time_bucket(INTERVAL '30' MINUTE, TIME'23:59:59.888888') +-- !query analysis +Project [time_bucket(INTERVAL '30' MINUTE, 23:59:59.888888) AS time_bucket(INTERVAL '30' MINUTE, TIME '23:59:59.888888')#x] ++- OneRowRelation + + +-- !query +SELECT time_bucket(INTERVAL '15' MINUTE, TIME'09:30:00.555555') +-- !query analysis +Project [time_bucket(INTERVAL '15' MINUTE, 09:30:00.555555) AS time_bucket(INTERVAL '15' MINUTE, TIME '09:30:00.555555')#x] ++- OneRowRelation + + +-- !query +SELECT time_bucket(INTERVAL '1' SECOND, TIME'00:00:00.000001') +-- !query analysis +Project [time_bucket(INTERVAL '01' SECOND, 00:00:00.000001) AS time_bucket(INTERVAL '01' SECOND, TIME '00:00:00.000001')#x] ++- OneRowRelation + + +-- !query +SELECT time_bucket(INTERVAL '100' MILLISECOND, TIME'12:34:56.789123') +-- !query analysis +Project [time_bucket(INTERVAL '00.1' SECOND, 12:34:56.789123) AS time_bucket(INTERVAL '00.1' SECOND, TIME '12:34:56.789123')#x] ++- OneRowRelation + + +-- !query +SELECT time_bucket(INTERVAL '1000' MICROSECOND, TIME'14:30:00.555555') +-- !query analysis +Project [time_bucket(INTERVAL '00.001' SECOND, 14:30:00.555555) AS time_bucket(INTERVAL 
'00.001' SECOND, TIME '14:30:00.555555')#x] ++- OneRowRelation + + +-- !query +SELECT time_bucket(INTERVAL '15' MINUTE, NULL) +-- !query analysis +Project [time_bucket(INTERVAL '15' MINUTE, cast(null as time(6))) AS time_bucket(INTERVAL '15' MINUTE, NULL)#x] ++- OneRowRelation + + +-- !query +SELECT time_bucket(NULL, TIME'12:34:56.789012') +-- !query analysis +Project [time_bucket(cast(null as interval day to second), 12:34:56.789012) AS time_bucket(NULL, TIME '12:34:56.789012')#x] ++- OneRowRelation + + +-- !query +SELECT time_bucket(NULL, NULL) +-- !query analysis +Project [time_bucket(cast(null as interval day to second), cast(null as time(6))) AS time_bucket(NULL, NULL)#x] ++- OneRowRelation + + +-- !query +WITH time_bucket_data AS ( + SELECT * FROM VALUES + (1, TIME'09:15:30.123456'), + (2, TIME'09:37:45.654321'), + (3, TIME'10:05:12.987654'), + (4, TIME'14:42:00.111222'), + (5, TIME'14:55:30.333444') + AS t(id, event_time) +) +SELECT time_bucket(INTERVAL '30' MINUTE, event_time) AS time_slot, + COUNT(*) AS cnt +FROM time_bucket_data +GROUP BY time_slot +ORDER BY time_slot +-- !query analysis +WithCTE +:- CTERelationDef xxxx, false +: +- SubqueryAlias time_bucket_data +: +- Project [id#x, event_time#x] +: +- SubqueryAlias t +: +- LocalRelation [id#x, event_time#x] ++- Sort [time_slot#x ASC NULLS FIRST], true + +- Aggregate [time_bucket(INTERVAL '30' MINUTE, event_time#x)], [time_bucket(INTERVAL '30' MINUTE, event_time#x) AS time_slot#x, count(1) AS cnt#xL] + +- SubqueryAlias time_bucket_data + +- CTERelationRef xxxx, true, [id#x, event_time#x], false, false, 5 + + +-- !query +SELECT time_bucket(INTERVAL '0' MINUTE, TIME'12:34:56.789123') +-- !query analysis +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.VALUE_OUT_OF_RANGE", + "sqlState" : "42K09", + "messageParameters" : { + "currentValue" : "0L", + "exprName" : "bucket_width", + "sqlExpr" : "\"time_bucket(INTERVAL '00' MINUTE, TIME '12:34:56.789123')\"", + "valueRange" : "(0, 9223372036854775807]" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 62, + "fragment" : "time_bucket(INTERVAL '0' MINUTE, TIME'12:34:56.789123')" + } ] +} + + +-- !query +SELECT time_bucket(INTERVAL '-15' MINUTE, TIME'12:34:56.456789') +-- !query analysis +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.VALUE_OUT_OF_RANGE", + "sqlState" : "42K09", + "messageParameters" : { + "currentValue" : "-900000000L", + "exprName" : "bucket_width", + "sqlExpr" : "\"time_bucket(INTERVAL '-15' MINUTE, TIME '12:34:56.456789')\"", + "valueRange" : "(0, 9223372036854775807]" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 64, + "fragment" : "time_bucket(INTERVAL '-15' MINUTE, TIME'12:34:56.456789')" + } ] +} diff --git a/sql/core/src/test/resources/sql-tests/inputs/time.sql b/sql/core/src/test/resources/sql-tests/inputs/time.sql index 3e1b62f84cbb..86c3cc40d3e2 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/time.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/time.sql @@ -365,3 +365,47 @@ SELECT time_to_millis(time_from_millis(52200500)); SELECT time_from_millis(time_to_millis(TIME'14:30:00.5')); SELECT time_to_micros(time_from_micros(52200500000)); SELECT time_from_micros(time_to_micros(TIME'14:30:00.5')); + +-- time_bucket function tests + +-- Basic bucketing with various intervals +SELECT time_bucket(INTERVAL '15' MINUTE, TIME'09:37:22.123456'); 
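+-- (bucket start = floor(time / bucket_width) * bucket_width, so 09:37:22 floors to 09:30:00)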
+SELECT time_bucket(INTERVAL '30' MINUTE, TIME'14:47:00.987654'); +SELECT time_bucket(INTERVAL '1' HOUR, TIME'16:35:00.500000'); +SELECT time_bucket(INTERVAL '2' HOUR, TIME'15:20:00.123000'); +SELECT time_bucket(INTERVAL '90' MINUTE, TIME'10:45:00.456789'); + +-- Edge cases (midnight, end of day, exact bucket boundary) +SELECT time_bucket(INTERVAL '1' HOUR, TIME'00:00:00.123456'); +SELECT time_bucket(INTERVAL '30' MINUTE, TIME'23:59:59.888888'); +SELECT time_bucket(INTERVAL '15' MINUTE, TIME'09:30:00.555555'); + +-- Sub-second buckets +SELECT time_bucket(INTERVAL '1' SECOND, TIME'00:00:00.000001'); +SELECT time_bucket(INTERVAL '100' MILLISECOND, TIME'12:34:56.789123'); +SELECT time_bucket(INTERVAL '1000' MICROSECOND, TIME'14:30:00.555555'); + +-- Null handling +SELECT time_bucket(INTERVAL '15' MINUTE, NULL); +SELECT time_bucket(NULL, TIME'12:34:56.789012'); +SELECT time_bucket(NULL, NULL); + +-- Aggregation and grouping with CTE +WITH time_bucket_data AS ( + SELECT * FROM VALUES + (1, TIME'09:15:30.123456'), + (2, TIME'09:37:45.654321'), + (3, TIME'10:05:12.987654'), + (4, TIME'14:42:00.111222'), + (5, TIME'14:55:30.333444') + AS t(id, event_time) +) +SELECT time_bucket(INTERVAL '30' MINUTE, event_time) AS time_slot, + COUNT(*) AS cnt +FROM time_bucket_data +GROUP BY time_slot +ORDER BY time_slot; + +-- Error cases +SELECT time_bucket(INTERVAL '0' MINUTE, TIME'12:34:56.789123'); +SELECT time_bucket(INTERVAL '-15' MINUTE, TIME'12:34:56.456789'); diff --git a/sql/core/src/test/resources/sql-tests/results/time.sql.out b/sql/core/src/test/resources/sql-tests/results/time.sql.out index 802c9a358b55..88084ce1d78f 100644 --- a/sql/core/src/test/resources/sql-tests/results/time.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/time.sql.out @@ -2761,3 +2761,189 @@ SELECT time_from_micros(time_to_micros(TIME'14:30:00.5')) struct -- !query output 14:30:00.5 + + +-- !query +SELECT time_bucket(INTERVAL '15' MINUTE, TIME'09:37:22.123456') +-- !query schema +struct +-- !query output +09:30:00 + + +-- !query +SELECT time_bucket(INTERVAL '30' MINUTE, TIME'14:47:00.987654') +-- !query schema +struct +-- !query output +14:30:00 + + +-- !query +SELECT time_bucket(INTERVAL '1' HOUR, TIME'16:35:00.500000') +-- !query schema +struct +-- !query output +16:00:00 + + +-- !query +SELECT time_bucket(INTERVAL '2' HOUR, TIME'15:20:00.123000') +-- !query schema +struct +-- !query output +14:00:00 + + +-- !query +SELECT time_bucket(INTERVAL '90' MINUTE, TIME'10:45:00.456789') +-- !query schema +struct +-- !query output +10:30:00 + + +-- !query +SELECT time_bucket(INTERVAL '1' HOUR, TIME'00:00:00.123456') +-- !query schema +struct +-- !query output +00:00:00 + + +-- !query +SELECT time_bucket(INTERVAL '30' MINUTE, TIME'23:59:59.888888') +-- !query schema +struct +-- !query output +23:30:00 + + +-- !query +SELECT time_bucket(INTERVAL '15' MINUTE, TIME'09:30:00.555555') +-- !query schema +struct +-- !query output +09:30:00 + + +-- !query +SELECT time_bucket(INTERVAL '1' SECOND, TIME'00:00:00.000001') +-- !query schema +struct +-- !query output +00:00:00 + + +-- !query +SELECT time_bucket(INTERVAL '100' MILLISECOND, TIME'12:34:56.789123') +-- !query schema +struct +-- !query output +12:34:56.7 + + +-- !query +SELECT time_bucket(INTERVAL '1000' MICROSECOND, TIME'14:30:00.555555') +-- !query schema +struct +-- !query output +14:30:00.555 + + +-- !query +SELECT time_bucket(INTERVAL '15' MINUTE, NULL) +-- !query schema +struct +-- !query output +NULL + + +-- !query +SELECT time_bucket(NULL, TIME'12:34:56.789012') +-- 
!query schema +struct +-- !query output +NULL + + +-- !query +SELECT time_bucket(NULL, NULL) +-- !query schema +struct +-- !query output +NULL + + +-- !query +WITH time_bucket_data AS ( + SELECT * FROM VALUES + (1, TIME'09:15:30.123456'), + (2, TIME'09:37:45.654321'), + (3, TIME'10:05:12.987654'), + (4, TIME'14:42:00.111222'), + (5, TIME'14:55:30.333444') + AS t(id, event_time) +) +SELECT time_bucket(INTERVAL '30' MINUTE, event_time) AS time_slot, + COUNT(*) AS cnt +FROM time_bucket_data +GROUP BY time_slot +ORDER BY time_slot +-- !query schema +struct +-- !query output +09:00:00 1 +09:30:00 1 +10:00:00 1 +14:30:00 2 + + +-- !query +SELECT time_bucket(INTERVAL '0' MINUTE, TIME'12:34:56.789123') +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.VALUE_OUT_OF_RANGE", + "sqlState" : "42K09", + "messageParameters" : { + "currentValue" : "0L", + "exprName" : "bucket_width", + "sqlExpr" : "\"time_bucket(INTERVAL '00' MINUTE, TIME '12:34:56.789123')\"", + "valueRange" : "(0, 9223372036854775807]" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 62, + "fragment" : "time_bucket(INTERVAL '0' MINUTE, TIME'12:34:56.789123')" + } ] +} + + +-- !query +SELECT time_bucket(INTERVAL '-15' MINUTE, TIME'12:34:56.456789') +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.VALUE_OUT_OF_RANGE", + "sqlState" : "42K09", + "messageParameters" : { + "currentValue" : "-900000000L", + "exprName" : "bucket_width", + "sqlExpr" : "\"time_bucket(INTERVAL '-15' MINUTE, TIME '12:34:56.456789')\"", + "valueRange" : "(0, 9223372036854775807]" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 64, + "fragment" : "time_bucket(INTERVAL '-15' MINUTE, TIME'12:34:56.456789')" + } ] +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TimeFunctionsSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/TimeFunctionsSuiteBase.scala index 57356324d005..c43d063f18a3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/TimeFunctionsSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/TimeFunctionsSuiteBase.scala @@ -478,6 +478,159 @@ abstract class TimeFunctionsSuiteBase extends QueryTest with SharedSparkSession ) } + test("time_bucket with various intervals") { + // 15-minute buckets + checkAnswer( + sql("SELECT time_bucket(INTERVAL '15' MINUTE, TIME'09:37:22')"), + Row(LocalTime.of(9, 30, 0)) :: Nil) + + // 30-minute buckets + checkAnswer( + sql("SELECT time_bucket(INTERVAL '30' MINUTE, TIME'14:47:00')"), + Row(LocalTime.of(14, 30, 0)) :: Nil) + + // 1-hour buckets + checkAnswer( + sql("SELECT time_bucket(INTERVAL '1' HOUR, TIME'16:35:00')"), + Row(LocalTime.of(16, 0, 0)) :: Nil) + + // 2-hour buckets + checkAnswer( + sql("SELECT time_bucket(INTERVAL '2' HOUR, TIME'15:20:00')"), + Row(LocalTime.of(14, 0, 0)) :: Nil) + + // Complex interval (90 minutes = 1.5 hours) + checkAnswer( + sql("SELECT time_bucket(INTERVAL '90' MINUTE, TIME'10:45:00')"), + Row(LocalTime.of(10, 30, 0)) :: Nil) + } + + test("time_bucket edge cases") { + // Midnight (start of day) + checkAnswer( + sql("SELECT time_bucket(INTERVAL '1' HOUR, TIME'00:00:00')"), + Row(LocalTime.of(0, 0, 0)) :: Nil) + + // End of day + checkAnswer( + sql("SELECT time_bucket(INTERVAL '30' MINUTE, TIME'23:59:59.999999')"), + Row(LocalTime.of(23, 30, 0)) :: Nil) + + 
// Exact bucket boundary + checkAnswer( + sql("SELECT time_bucket(INTERVAL '15' MINUTE, TIME'09:30:00')"), + Row(LocalTime.of(9, 30, 0)) :: Nil) + } + + test("time_bucket with null inputs") { + checkAnswer( + sql("SELECT time_bucket(INTERVAL '15' MINUTE, NULL)"), + Row(null) :: Nil) + + checkAnswer( + sql("SELECT time_bucket(NULL, TIME'12:34:56')"), + Row(null) :: Nil) + + checkAnswer( + sql("SELECT time_bucket(NULL, NULL)"), + Row(null) :: Nil) + } + + test("time_bucket with table data") { + withTable("time_bucket_test") { + sql(""" + CREATE TABLE time_bucket_test ( + id INT, + event_time TIME(6) + ) USING parquet + """) + + sql(""" + INSERT INTO time_bucket_test VALUES + (1, TIME'09:15:30'), + (2, TIME'09:37:45'), + (3, TIME'10:05:12'), + (4, TIME'14:42:00'), + (5, TIME'14:55:30') + """) + + // Group by 30-minute buckets + checkAnswer( + sql(""" + SELECT time_bucket(INTERVAL '30' MINUTE, event_time) AS time_slot, + COUNT(*) AS cnt + FROM time_bucket_test + GROUP BY time_slot + ORDER BY time_slot + """), + Row(LocalTime.of(9, 0, 0), 1L) :: + Row(LocalTime.of(9, 30, 0), 1L) :: + Row(LocalTime.of(10, 0, 0), 1L) :: + Row(LocalTime.of(14, 30, 0), 2L) :: Nil) + } + } + + test("time_bucket error handling - zero interval") { + checkError( + exception = intercept[AnalysisException] { + sql("SELECT time_bucket(INTERVAL '0' MINUTE, TIME'12:34:56')").collect() + }, + condition = "DATATYPE_MISMATCH.VALUE_OUT_OF_RANGE", + parameters = Map( + "sqlExpr" -> "\"time_bucket(INTERVAL '00' MINUTE, TIME '12:34:56')\"", + "exprName" -> "bucket_width", + "valueRange" -> "(0, 9223372036854775807]", + "currentValue" -> "0L" + ), + context = ExpectedContext( + fragment = "time_bucket(INTERVAL '0' MINUTE, TIME'12:34:56')", + start = 7, + stop = 54 + ) + ) + } + + test("time_bucket error handling - negative interval") { + checkError( + exception = intercept[AnalysisException] { + sql("SELECT time_bucket(INTERVAL '-15' MINUTE, TIME'12:34:56')").collect() + }, + condition = "DATATYPE_MISMATCH.VALUE_OUT_OF_RANGE", + parameters = Map( + "sqlExpr" -> "\"time_bucket(INTERVAL '-15' MINUTE, TIME '12:34:56')\"", + "exprName" -> "bucket_width", + "valueRange" -> "(0, 9223372036854775807]", + "currentValue" -> "-900000000L" + ), + context = ExpectedContext( + fragment = "time_bucket(INTERVAL '-15' MINUTE, TIME'12:34:56')", + start = 7, + stop = 56 + ) + ) + } + + test("time_bucket DataFrame API") { + val df = Seq( + (1, "09:37:22"), + (2, "14:47:00"), + (3, "10:05:30") + ).toDF("id", "t") + .withColumn("t", to_time(col("t"))) + + // Using expr for interval + val result = df.withColumn( + "bucket", + time_bucket(expr("INTERVAL '30' MINUTE"), col("t")) + ) + + checkAnswer( + result.select("bucket"), + Row(LocalTime.of(9, 30, 0)) :: + Row(LocalTime.of(14, 30, 0)) :: + Row(LocalTime.of(10, 0, 0)) :: Nil) + } + test("SPARK-52883: to_time function without format") { // Input data for the function. 
val schema = StructType(Seq( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationExpressionWalkerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationExpressionWalkerSuite.scala index 71961988b2f3..2701aab0ef29 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationExpressionWalkerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/collation/CollationExpressionWalkerSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.collation import java.sql.Timestamp +import java.time.Duration import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.analysis.ExpressionBuilder @@ -98,6 +99,7 @@ class CollationExpressionWalkerSuite extends SparkFunSuite with SharedSparkSessi inputType match { // TODO: Try to make this a bit more random. case AnyTimestampType => Literal(Timestamp.valueOf("2009-07-30 12:58:59")) + case AnyTimeType => Literal(java.time.LocalTime.of(12, 30, 0)) case BinaryType => collationType match { case Utf8Binary => Literal.create("dummy string".getBytes) @@ -107,6 +109,7 @@ class CollationExpressionWalkerSuite extends SparkFunSuite with SharedSparkSessi case BooleanType => Literal(true) case ByteType => Literal(5.toByte) case _: DatetimeType => Literal(Timestamp.valueOf("2009-07-30 12:58:59")) + case DayTimeIntervalType => Literal(Duration.ofMinutes(15)) case DecimalType => Literal((new Decimal).set(5)) case _: DecimalType => Literal((new Decimal).set(5)) case _: DoubleType => Literal(5.0) @@ -179,6 +182,7 @@ class CollationExpressionWalkerSuite extends SparkFunSuite with SharedSparkSessi inputType match { // TODO: Try to make this a bit more random. case AnyTimestampType => "TIMESTAMP'2009-07-30 12:58:59'" + case AnyTimeType => "TIME'12:30:00'" case BinaryType => collationType match { case Utf8Binary => "Cast('dummy string' collate utf8_binary as BINARY)" @@ -187,6 +191,7 @@ class CollationExpressionWalkerSuite extends SparkFunSuite with SharedSparkSessi case BooleanType => "True" case ByteType => "cast(5 as tinyint)" case _: DatetimeType => "date'2016-04-08'" + case DayTimeIntervalType => "CAST(INTERVAL '15' MINUTE AS INTERVAL DAY TO SECOND)" case DecimalType => "5.0" case _: DecimalType => "5.0" case _: DoubleType => "5.0" @@ -244,10 +249,12 @@ class CollationExpressionWalkerSuite extends SparkFunSuite with SharedSparkSessi collationType: CollationType): String = inputType match { case AnyTimestampType => "TIMESTAMP" + case AnyTimeType => "TIME" case BinaryType => "BINARY" case BooleanType => "BOOLEAN" case ByteType => "TINYINT" case _: DatetimeType => "DATE" + case DayTimeIntervalType => "INTERVAL DAY TO SECOND" case DecimalType => "DECIMAL(2, 1)" case _: DecimalType => "DECIMAL(2, 1)" case _: DoubleType => "DOUBLE"
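
A minimal usage sketch of the two new Scala overloads (the sample data and the
10-minute width are illustrative, not part of the patch; `spark` is an active
SparkSession):

    import org.apache.spark.sql.functions._
    import spark.implicits._

    // Build a TIME column the same way the DataFrame API test above does.
    val events = Seq((1, "09:37:22"), (2, "14:47:00")).toDF("id", "t")
      .withColumn("t", to_time(col("t")))

    // Column overload: pass an explicit day-time interval expression.
    events.select(time_bucket(expr("INTERVAL '10' MINUTE"), col("t"))).show()

    // String overload: the width is wrapped as INTERVAL '10 minutes'.
    events.select(time_bucket("10 minutes", col("t"))).show()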