7 changes: 7 additions & 0 deletions python/pyspark/sql/connect/functions/builtin.py
@@ -3664,6 +3664,13 @@ def time_trunc(unit: "ColumnOrName", time: "ColumnOrName") -> Column:
time_trunc.__doc__ = pysparkfuncs.time_trunc.__doc__


def time_bucket(bucket_width: "ColumnOrName", time: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("time_bucket", bucket_width, time)


time_bucket.__doc__ = pysparkfuncs.time_bucket.__doc__


def timestamp_millis(col: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("timestamp_millis", col)

1 change: 1 addition & 0 deletions python/pyspark/sql/functions/__init__.py
@@ -248,6 +248,7 @@
"timestamp_micros",
"timestamp_millis",
"timestamp_seconds",
"time_bucket",
"time_diff",
"time_from_micros",
"time_from_millis",
73 changes: 73 additions & 0 deletions python/pyspark/sql/functions/builtin.py
@@ -12920,6 +12920,79 @@ def time_trunc(unit: "ColumnOrName", time: "ColumnOrName") -> Column:
    return _invoke_function_over_columns("time_trunc", unit, time)


@_try_remote_functions
def time_bucket(bucket_width: "ColumnOrName", time: "ColumnOrName") -> Column:
"""
Returns the start of the time bucket containing the input time value.
Buckets are aligned to midnight (00:00:00).

.. versionadded:: 4.2.0

Parameters
----------
bucket_width : :class:`~pyspark.sql.Column` or column name
A day-time interval specifying the width of each bucket.
Can be specified as a column expression like ``sf.expr("INTERVAL '15' MINUTE")``.
time : :class:`~pyspark.sql.Column` or column name
The time value to bucket. Must be of TIME type.

Returns
-------
:class:`~pyspark.sql.Column`
The start time of the bucket containing the input time.

See Also
--------
:meth:`pyspark.sql.functions.time_trunc`
:meth:`pyspark.sql.functions.date_trunc`

Examples
--------
Example 1: Basic 15-minute bucketing

>>> from pyspark.sql import functions as sf
>>> df = spark.createDataFrame([
... (1, '09:37:22'),
... (2, '14:47:00'),
... (3, '10:05:30')
... ], ['id', 't'])
>>> df = df.withColumn("t", sf.to_time(df.t))
>>> df.select(sf.time_bucket(sf.expr("INTERVAL '15' MINUTE"), df.t).alias("bucket")).show()
+--------+
| bucket|
+--------+
|09:30:00|
|14:45:00|
|10:00:00|
+--------+

Example 2: Hourly bucketing

>>> df.select(sf.time_bucket(sf.expr("INTERVAL '1' HOUR"), df.t).alias("bucket")).show()
+--------+
| bucket|
+--------+
|09:00:00|
|14:00:00|
|10:00:00|
+--------+

Example 3: Aggregation with time buckets

>>> df.groupBy(
... sf.time_bucket(sf.expr("INTERVAL '30' MINUTE"), df.t).alias("time_slot")
... ).count().orderBy("time_slot").show()
+---------+-----+
|time_slot|count|
+---------+-----+
| 09:30:00| 1|
| 10:00:00| 1|
| 14:30:00| 1|
+---------+-----+
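
    Example 4: Building the bucket width with :meth:`make_dt_interval`
    (an illustrative sketch added here for exposition, not part of the
    original change; it assumes only that ``make_dt_interval`` yields a
    literal day-time interval)

    >>> df.select(  # illustrative; not from the original change
    ...     sf.time_bucket(sf.make_dt_interval(mins=sf.lit(10)), df.t).alias("bucket")
    ... ).show()
    +--------+
    |  bucket|
    +--------+
    |09:30:00|
    |14:40:00|
    |10:00:00|
    +--------+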
"""
return _invoke_function_over_columns("time_bucket", bucket_width, time)


@_try_remote_functions
def timestamp_millis(col: "ColumnOrName") -> Column:
"""
Expand Down
30 changes: 30 additions & 0 deletions sql/api/src/main/scala/org/apache/spark/sql/functions.scala
@@ -6910,6 +6910,36 @@ object functions {
    Column.fn("time_trunc", unit, time)
  }

  /**
   * Returns the start of the time bucket containing the input time value. Buckets are aligned to
   * midnight (00:00:00).
   *
   * @param bucketWidth
   *   A Column representing a day-time interval (e.g., INTERVAL '15' MINUTE)
   * @param time
   *   The time column to bucket
   *
   * @group datetime_funcs
   * @since 4.2.0
   */
  def time_bucket(bucketWidth: Column, time: Column): Column =
    Column.fn("time_bucket", bucketWidth, time)

  /**
   * Returns the start of the time bucket containing the input time value. Buckets are aligned to
   * midnight (00:00:00).
   *
   * @param bucketWidth
   *   A string representing a day-time interval (e.g., "15 minutes")
   * @param time
   *   The time column to bucket
   *
   * @group datetime_funcs
   * @since 4.2.0
   */
  def time_bucket(bucketWidth: String, time: Column): Column =
    time_bucket(expr(s"INTERVAL '$bucketWidth'"), time)
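
  // A minimal usage sketch (illustrative, not part of the change): assuming a
  // DataFrame `df` with a TIME column `t`, the two overloads are equivalent:
  //   df.select(time_bucket(expr("INTERVAL '15' MINUTE"), col("t")))
  //   df.select(time_bucket("15 minutes", col("t")))
  // The String overload simply wraps its argument as INTERVAL '<bucketWidth>'.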

  /**
   * Creates a TIME from the number of seconds since midnight.
   *
@@ -701,6 +701,7 @@ object FunctionRegistry {
    expression[MakeDate]("make_date"),
    expression[MakeTime]("make_time"),
    expression[TimeTrunc]("time_trunc"),
    expression[TimeBucket]("time_bucket", setAlias = true),
    expression[TimeFromSeconds]("time_from_seconds"),
    expression[TimeFromMillis]("time_from_millis"),
    expression[TimeFromMicros]("time_from_micros"),
@@ -752,6 +752,111 @@ case class TimeTrunc(unit: Expression, time: Expression)
  }
}

// scalastyle:off line.size.limit
@ExpressionDescription(
  usage = """
    _FUNC_(bucket_width, time) - Returns the start of the time bucket containing `time`,
    where buckets are aligned to midnight (00:00:00).
  """,
  arguments = """
    Arguments:
      * bucket_width - A day-time interval specifying the width of each bucket (e.g., INTERVAL '15' MINUTE).
      * time - The time value to bucket. Must be of TIME type.
  """,
  examples = """
    Examples:
      > SELECT _FUNC_(INTERVAL '15' MINUTE, TIME'09:37:22');
       09:30:00
      > SELECT _FUNC_(INTERVAL '30' MINUTE, TIME'14:47:00');
       14:30:00
      > SELECT _FUNC_(INTERVAL '1' HOUR, TIME'16:35:00');
       16:00:00
      > SELECT _FUNC_(INTERVAL '2' HOUR, TIME'15:20:00');
       14:00:00
  """,
  note = """
    Notes:
      * Buckets are aligned to midnight (00:00:00) and cannot span across midnight.
      * The bucket width must be a positive day-time interval.
      * The function returns the start time of the bucket (inclusive lower bound).
  """,
  group = "datetime_funcs",
  since = "4.2.0")
// scalastyle:on line.size.limit
case class TimeBucket(
    bucketWidth: Expression,
    time: Expression)
  extends BinaryExpression with ImplicitCastInputTypes {

  override def left: Expression = bucketWidth
  override def right: Expression = time

  override def inputTypes: Seq[AbstractDataType] =
    Seq(DayTimeIntervalType, AnyTimeType)

  override def dataType: DataType = time.dataType

  override def nullable: Boolean = time.nullable || bucketWidth.nullable

  override def nullIntolerant: Boolean = true

  override def checkInputDataTypes(): TypeCheckResult = {
    super.checkInputDataTypes() match {
      case TypeCheckSuccess =>
        if (!bucketWidth.foldable) {
          DataTypeMismatch(
            errorSubClass = "NON_FOLDABLE_INPUT",
            messageParameters = Map(
              "inputName" -> toSQLId("bucket_width"),
              "inputType" -> toSQLType(bucketWidth.dataType),
              "inputExpr" -> toSQLExpr(bucketWidth)
            )
          )
        } else {
          val widthValue = bucketWidth.eval()
          if (widthValue != null) {
            val widthMicros = widthValue.asInstanceOf[Long]
            if (widthMicros <= 0) {
              DataTypeMismatch(
                errorSubClass = "VALUE_OUT_OF_RANGE",
                messageParameters = Map(
                  "exprName" -> "bucket_width",
                  "valueRange" -> s"(0, ${Long.MaxValue}]",
                  "currentValue" -> toSQLValue(widthMicros, LongType)
                )
              )
            } else {
              TypeCheckSuccess
            }
          } else {
            TypeCheckSuccess
          }
        }
      case failure => failure
    }
  }
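
  // Illustrative notes (not part of the change): a non-literal width such as
  // time_bucket(col("w"), col("t")) is rejected above with NON_FOLDABLE_INPUT,
  // and a non-positive literal width such as INTERVAL '0' MINUTE is rejected
  // with VALUE_OUT_OF_RANGE.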

  override def nullSafeEval(bucketWidthValue: Any, timeValue: Any): Any = {
    val bucketMicros = bucketWidthValue.asInstanceOf[Long]
    val timeNanos = timeValue.asInstanceOf[Long]
    val bucketNanos = bucketMicros * NANOS_PER_MICROS
    (timeNanos / bucketNanos) * bucketNanos
  }
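
  // Worked example of the floor arithmetic above (illustrative, not part of
  // the change): TIME values are nanoseconds since midnight and day-time
  // intervals are microseconds. For TIME'09:37:22' with a 15-minute bucket:
  //   timeNanos   = 34642L * 1000000000L   // 09:37:22 = 34,642 seconds
  //   bucketNanos = 900000000L * 1000L     // 15 min = 9e8 micros, scaled to nanos
  //   (timeNanos / bucketNanos) * bucketNanos == 34,200 seconds == 09:30:00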

  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    defineCodeGen(ctx, ev, (bucket, time) => {
      val scaleFactor = s"${NANOS_PER_MICROS}L"
      s"($time / ($bucket * $scaleFactor)) * ($bucket * $scaleFactor)"
    })
  }

  override def prettyName: String = "time_bucket"

  override protected def withNewChildrenInternal(
      newLeft: Expression, newRight: Expression): TimeBucket =
    copy(bucketWidth = newLeft, time = newRight)
}

abstract class IntegralToTimeBase
  extends UnaryExpression with ExpectsInputTypes with CodegenFallback {
  protected def upScaleFactor: Long
@@ -362,6 +362,7 @@
| org.apache.spark.sql.catalyst.expressions.ThetaIntersection | theta_intersection | SELECT theta_sketch_estimate(theta_intersection(theta_sketch_agg(col1), theta_sketch_agg(col2))) FROM VALUES (5, 4), (1, 4), (2, 5), (2, 5), (3, 1) tab(col1, col2) | struct<theta_sketch_estimate(theta_intersection(theta_sketch_agg(col1, 12), theta_sketch_agg(col2, 12))):bigint> |
| org.apache.spark.sql.catalyst.expressions.ThetaSketchEstimate | theta_sketch_estimate | SELECT theta_sketch_estimate(theta_sketch_agg(col)) FROM VALUES (1), (1), (2), (2), (3) tab(col) | struct<theta_sketch_estimate(theta_sketch_agg(col, 12)):bigint> |
| org.apache.spark.sql.catalyst.expressions.ThetaUnion | theta_union | SELECT theta_sketch_estimate(theta_union(theta_sketch_agg(col1), theta_sketch_agg(col2))) FROM VALUES (1, 4), (1, 4), (2, 5), (2, 5), (3, 6) tab(col1, col2) | struct<theta_sketch_estimate(theta_union(theta_sketch_agg(col1, 12), theta_sketch_agg(col2, 12), 12)):bigint> |
| org.apache.spark.sql.catalyst.expressions.TimeBucket | time_bucket | SELECT time_bucket(INTERVAL '15' MINUTE, TIME'09:37:22') | struct<time_bucket(INTERVAL '15' MINUTE, TIME '09:37:22'):time(6)> |
| org.apache.spark.sql.catalyst.expressions.TimeDiff | time_diff | SELECT time_diff('HOUR', TIME'20:30:29', TIME'21:30:28') | struct<time_diff(HOUR, TIME '20:30:29', TIME '21:30:28'):bigint> |
| org.apache.spark.sql.catalyst.expressions.TimeFromMicros | time_from_micros | SELECT time_from_micros(0) | struct<time_from_micros(0):time(6)> |
| org.apache.spark.sql.catalyst.expressions.TimeFromMillis | time_from_millis | SELECT time_from_millis(0) | struct<time_from_millis(0):time(6)> |