[Feature] Support Spark expression: window_time #3138

@andygrove

Description

What is the problem the feature request solves?

Note: This issue was generated with AI assistance. The specification details have been extracted from Spark documentation and may need verification.

Comet does not currently support the Spark window_time function, causing queries using this function to fall back to Spark's JVM execution instead of running natively on DataFusion.

WindowTime is a Spark Catalyst expression that extracts the timestamp from a window structure column created by windowing operations. This expression is used internally to access the time component of window aggregation results and is replaced during the analysis phase rather than being directly evaluated.

Supporting this expression would allow more Spark workloads to benefit from Comet's native acceleration.

Describe the potential solution

Spark Specification

Syntax:

window_time(window_column)
// DataFrame API (internal usage through analyzer transformation)
window_time(col("window_column"))

Arguments:

  • windowColumn (StructType): a window structure column containing timestamp information from windowing operations

Return Type: Returns the same data type as the first field of the input struct (typically TimestampType).

Supported Data Types:

  • Input: StructType (specifically window structures from windowing operations)

  • Output: The data type of the first field in the window struct (usually TimestampType)
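For intuition, Spark documents window_time(w) as returning w.end - 1 microsecond: the latest instant that still falls inside the half-open window [start, end). A minimal sketch of that semantics, with a plain struct standing in for Spark's window struct (not the actual Catalyst representation):

```rust
// Stand-in for Spark's window struct: start/end as microseconds since epoch.
struct Window {
    start: i64,
    end: i64,
}

// window_time(w) = w.end - 1 microsecond, the largest timestamp that still
// belongs to the half-open interval [start, end).
fn window_time(w: &Window) -> i64 {
    w.end - 1
}

fn main() {
    // A 5-minute tumbling window starting at the epoch: [0, 300 s).
    let w = Window { start: 0, end: 300_000_000 };
    println!("start={} window_time={}", w.start, window_time(&w)); // 299999999 µs = 00:04:59.999999
}
```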

Edge Cases:

  • Null handling: Behavior depends on the analyzer replacement logic

  • Invalid window structure: Will fail during analysis if input is not a proper window column

  • Non-struct input: Enforced by inputTypes validation requiring StructType

  • Resolution: The expression's resolved property always returns false, forcing the analyzer to replace it before execution

Examples:

-- Example SQL usage (after a windowing operation)
SELECT window_time(window), cnt
FROM (
  SELECT window(timestamp_col, '5 minutes') AS window, count(*) AS cnt
  FROM events
  GROUP BY window(timestamp_col, '5 minutes')
)
// Example DataFrame API usage
import org.apache.spark.sql.functions._

val windowedDF = df
  .groupBy(window(col("timestamp"), "5 minutes"))
  .count()

// window_time would be applied internally by the analyzer
// when accessing window timestamp information

Implementation Approach

See the Comet guide on adding new expressions for detailed instructions.

  1. Scala Serde: Add expression handler in spark/src/main/scala/org/apache/comet/serde/
  2. Register: Add to appropriate map in QueryPlanSerde.scala
  3. Protobuf: Add message type in native/proto/src/proto/expr.proto if needed
  4. Rust: Implement in native/spark-expr/src/ (check if DataFusion has built-in support first)
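Since window_time(w) reduces to w.end - 1 microsecond over the window struct's end field, step 4 can start from a simple elementwise kernel. A sketch using plain vectors as stand-ins for Arrow arrays (the real code in native/spark-expr/src/ would operate on Arrow StructArray/timestamp arrays; null propagation here is an assumption, since Spark resolves nulls through the analyzer rewrite):

```rust
// Stand-in for the window struct's "end" field: timestamps in microseconds
// since the epoch, with None marking SQL NULL.
type TimestampMicros = Option<i64>;

// Kernel sketch: window_time = end - 1 microsecond, applied per row.
// NULL inputs propagate to NULL outputs (an assumption for this sketch).
fn window_time_kernel(end_field: &[TimestampMicros]) -> Vec<TimestampMicros> {
    end_field.iter().map(|v| v.map(|us| us - 1)).collect()
}

fn main() {
    let ends = vec![Some(300_000_000), None, Some(600_000_000)];
    println!("{:?}", window_time_kernel(&ends));
}
```

Note that because WindowTime is normally rewritten away during analysis (its resolved property is always false), it is worth checking whether Comet's serde ever sees this node in a physical plan before investing in a native kernel.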

Additional context

Difficulty: Medium
Spark Expression Class: org.apache.spark.sql.catalyst.expressions.WindowTime

Related:

  • Window aggregation functions
  • TimeWindow expression
  • Windowing operations in Spark SQL
  • StructType field extraction expressions

This issue was auto-generated from Spark reference documentation.
