@vinodkc (Contributor) commented Nov 26, 2025

What changes were proposed in this pull request?

This PR adds a new time_bucket() SQL function that buckets TIME values into fixed-width intervals, returning the start time of each bucket. This enables histogram generation and time-of-day pattern analysis for TIME columns.

Why are the changes needed?

The TIME type currently lacks a bucketing function for aggregation and analysis. Users cannot easily group TIME values by arbitrary intervals (e.g., 15-minute or 1-hour buckets) without complex manual calculations.

Current Gap:

Existing functions don't support TIME bucketing:

  • window(): only works with TIMESTAMP, not TIME, and returns a struct rather than a scalar
  • date_trunc(): does not support the TIME type
  • time_trunc(): only supports fixed calendar units (HOUR, MINUTE), not arbitrary intervals such as "15 minutes" or "90 minutes"

Current workarounds are error-prone and hard to maintain:

-- Manual calculation (error-prone, hard to maintain)
SELECT TIME(FLOOR(TIME_TO_SECONDS(event_time) / 900) * 900) as bucket FROM events;

Use Cases:

This function addresses common real-world analytics needs:

  1. Retail Analytics: Analyze customer traffic by 30-minute slots to optimize staffing
  2. Healthcare: Group appointments by 15-minute intervals for scheduling optimization
  3. Manufacturing: Aggregate sensor readings by hourly buckets to detect production patterns
  4. DevOps: Bucket system events by 5-minute intervals for performance monitoring
  5. Business Intelligence: Create time-of-day histograms for reporting

Industry Precedent:

  • SQL Server 2022: DATE_BUCKET() supports TIME type bucketing
  • TimescaleDB: time_bucket() is one of their most popular functions for time-series analytics
  • Adding time_bucket() fills this gap in Spark's TIME type functionality and brings it on par with these databases

Does this PR introduce any user-facing change?

Yes. This PR adds a new function, time_bucket(), available in SQL, the Scala and Python DataFrame APIs, and Spark Connect.

Function Signature

time_bucket(bucket_width, time) -> TIME

Parameters:

  • bucket_width: A day-time interval expression (e.g., INTERVAL '15' MINUTE)
  • time: A TIME value to bucket
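
The DataFrame side of the API is only implied by the examples later in this description; a plausible reading of those examples is something like the following Scala overloads (an assumption, not confirmed against the patch):

// Hypothetical Scala signatures, inferred from the usage shown in the
// Scala API section below; the overloads actually added to
// org.apache.spark.sql.functions may differ.
def time_bucket(bucketWidth: Column, time: Column): Column
def time_bucket(bucketWidth: String, time: Column): Column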

Behavior:

  • Returns the start of the time bucket containing the input time
  • Buckets are aligned to midnight (00:00:00)
  • Buckets cannot span across midnight
  • Returns the same precision as the input TIME type
  • Returns NULL if either input is NULL
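
Putting the rules above together, bucketing reduces to flooring the time's offset from midnight to a multiple of the bucket width. A minimal sketch in plain Scala, assuming microsecond arithmetic (bucketStart is a hypothetical helper; the actual Catalyst implementation may differ):

import java.time.LocalTime
import java.time.temporal.ChronoUnit

// Hypothetical helper illustrating the rule above: floor the time's
// microseconds-since-midnight to a multiple of the bucket width, then
// convert back to a time of day. Buckets are aligned to midnight and can
// never cross it, since the result stays between 00:00:00 and the input time.
def bucketStart(time: LocalTime, bucketWidthMicros: Long): LocalTime = {
  val micros = ChronoUnit.MICROS.between(LocalTime.MIDNIGHT, time)
  LocalTime.MIDNIGHT.plus((micros / bucketWidthMicros) * bucketWidthMicros, ChronoUnit.MICROS)
}

// 15-minute buckets: 09:37:22 falls into the bucket starting at 09:30:00,
// matching the first SQL example below.
bucketStart(LocalTime.of(9, 37, 22), 15L * 60 * 1000 * 1000)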

Examples

Example 1: Basic Bucketing

-- 15-minute buckets
SELECT time_bucket(INTERVAL '15' MINUTE, TIME'09:37:22');
-- Result: 09:30:00

-- 30-minute buckets
SELECT time_bucket(INTERVAL '30' MINUTE, TIME'14:47:00');
-- Result: 14:30:00

-- 1-hour buckets
SELECT time_bucket(INTERVAL '1' HOUR, TIME'16:35:00');
-- Result: 16:00:00

-- 2-hour buckets
SELECT time_bucket(INTERVAL '2' HOUR, TIME'15:20:00');
-- Result: 14:00:00

Example 2: Retail Analytics - Peak Shopping Hours

-- Find busiest 30-minute slots in a store
SELECT time_bucket(INTERVAL '30' MINUTE, purchase_time) AS time_slot,
       COUNT(*) AS customer_count,
       SUM(total_amount) AS revenue
FROM sales
WHERE date = '2024-01-15'
GROUP BY time_slot
ORDER BY customer_count DESC
LIMIT 10;

-- Sample Output:
-- +----------+---------------+---------+
-- |time_slot |customer_count |revenue  |
-- +----------+---------------+---------+
-- |14:00:00  |           245 | 12450.50|
-- |14:30:00  |           231 | 11890.25|
-- |12:00:00  |           198 |  9875.00|
-- +----------+---------------+---------+

Example 3: Healthcare - Appointment Scheduling

-- Analyze appointment distribution by 15-minute slots
SELECT time_bucket(INTERVAL '15' MINUTE, appointment_time) AS slot,
       COUNT(*) AS appointments,
       AVG(duration_minutes) AS avg_duration,
       SUM(CASE WHEN status = 'no_show' THEN 1 ELSE 0 END) AS no_shows
FROM appointments
WHERE appointment_date BETWEEN '2024-01-01' AND '2024-01-31'
GROUP BY slot
ORDER BY slot;

-- Sample Output:
-- +----------+-------------+-------------+---------+
-- |slot      |appointments |avg_duration |no_shows |
-- +----------+-------------+-------------+---------+
-- |08:00:00  |          45 |        22.3 |       2 |
-- |08:15:00  |          48 |        24.1 |       3 |
-- |08:30:00  |          52 |        21.8 |       1 |
-- +----------+-------------+-------------+---------+

Example 4: Edge Cases

-- Midnight (start of day)
SELECT time_bucket(INTERVAL '1' HOUR, TIME'00:00:00');
-- Result: 00:00:00

-- Just before midnight
SELECT time_bucket(INTERVAL '1' HOUR, TIME'23:59:59.999999');
-- Result: 23:00:00

-- Microsecond precision
SELECT time_bucket(INTERVAL '1' MICROSECOND, TIME'12:34:56.123456');
-- Result: 12:34:56.123456

-- Millisecond buckets
SELECT time_bucket(INTERVAL '100' MILLISECOND, TIME'12:34:56.789123');
-- Result: 12:34:56.700000

-- Null handling
SELECT time_bucket(INTERVAL '15' MINUTE, NULL);
-- Result: NULL

SELECT time_bucket(NULL, TIME'12:34:56');
-- Result: NULL

Scala API

import org.apache.spark.sql.functions._
import java.time.LocalTime

// Needed for the Seq(...).toDF(...) conversion below
// (already in scope when running in spark-shell).
import spark.implicits._

val events = Seq(
  (1, LocalTime.of(9, 5, 30), 45, 150.0),
  (2, LocalTime.of(9, 37, 45), 67, 175.0),
  (3, LocalTime.of(10, 12, 0), 28, 225.0)
).toDF("event_id", "event_time", "duration", "value")

events.createOrReplaceTempView("events")
val df = spark.table("events")

// Example 1: count events per 15-minute bucket
df.groupBy(time_bucket(expr("INTERVAL '15' MINUTE"), col("event_time")).as("bucket"))
  .agg(count("*").as("count"))
  .orderBy("bucket")
  .show()

// Example 2: count events per 30-minute bucket (string bucket-width overload)
df.groupBy(time_bucket("30 minutes", col("event_time")).as("bucket"))
  .count()
  .show()

// Example 3: hourly aggregates (event count, average duration, max value)
df.groupBy(time_bucket("1 hour", col("event_time")).as("hour"))
  .agg(
    count("*").as("total_events"),
    avg("duration").as("avg_duration"),
    max("value").as("max_value")
  )
  .show()

Python API

from pyspark.sql import functions as F

# Example 1: Basic bucketing
df = spark.table("events")
df.groupBy(F.time_bucket(F.expr("INTERVAL '15' MINUTE"), "event_time").alias("bucket")) \
  .count() \
  .show()

# Example 2: Histogram generation
df.groupBy(F.time_bucket(F.expr("INTERVAL '30' MINUTE"), "event_time").alias("slot")) \
  .agg(
      F.count("*").alias("count"),
      F.avg("value").alias("avg_value"),
      F.stddev("value").alias("stddev_value")
  ) \
  .orderBy("slot") \
  .show()

# Example 3: Peak detection - hourly buckets whose total value exceeds a threshold
# (uses the event_time and value columns of the events view defined above)
peak_hours = df.groupBy(
    F.time_bucket(F.expr("INTERVAL '1' HOUR"), "event_time").alias("hour")
).agg(
    F.sum("value").alias("total_value")
).filter(
    F.col("total_value") > 300
).orderBy(F.desc("total_value"))

peak_hours.show()

How was this patch tested?

Added tests in TimeFunctionsSuiteBase and sql-tests/inputs/time.sql

Was this patch authored or co-authored using generative AI tooling?

No
