In [0]:
import pandas as pd
import uuid
import random
from typing import Iterator
from pyspark.sql.types import *
from pyspark.sql.streaming import StatefulProcessor
from pyspark.sql.functions import current_timestamp, current_date, lit

class OrderEventCollector(StatefulProcessor):
    """Buffer every event of an order and emit them together once it is delivered.

    Per grouping key (the stream is grouped by ``order_id``), a ListState named
    "events" holds one tuple per received event. As soon as the buffered events
    contain a "delivered" event, one output row is yielded with the order id,
    the location and the buffered events, and the state for that key is cleared.

    NOTE(review): no TTL or timer is registered for the state, so orders that
    never receive a "delivered" event keep their buffered events indefinitely.
    """

    # Input columns copied into state, in the same order as the state schema.
    _STATE_COLUMNS = ["event_type", "ts", "body", "order_id", "location"]

    def init(self, handle) -> None:
        """Register the per-key list state that buffers incoming events."""
        # One stored element per event; field order matches _STATE_COLUMNS.
        event_schema = StructType([
            StructField("event_type", StringType()),
            StructField("ts", TimestampType()),
            StructField("body", StringType()),
            StructField("order_id", StringType()),
            StructField("location", StringType()),
        ])
        self.events_state = handle.getListState("events", event_schema)

    def handleInputRows(self, key, rows: Iterator[pd.DataFrame], timerValues) -> Iterator[pd.DataFrame]:
        """Append this batch's events to state and emit the order once delivered.

        Args:
            key: grouping key of this group (the order id).
            rows: pandas DataFrames holding the new input rows for this key.
            timerValues: timer information supplied by Spark (unused here).

        Yields:
            A single-row DataFrame (order_id, location, events) when a
            "delivered" event has been buffered; nothing otherwise.
        """
        # Gather the whole batch first so state is appended once, not once per row.
        new_events = [
            record
            for pdf in rows
            for record in pdf[self._STATE_COLUMNS].itertuples(index=False, name=None)
        ]
        if new_events:
            self.events_state.appendList(new_events)

        # All buffered events for this order_id, including earlier batches.
        events = list(self.events_state.get())

        # NOTE(review): treats "delivered" as the final event of an order; any
        # event arriving for the same order_id afterwards starts a new buffer.
        if any(event[0] == "delivered" for event in events):
            output = pd.DataFrame([{
                "order_id": events[0][3],  # every buffered event shares this key
                "location": events[0][4],  # taken from the first buffered event
                # The output event struct only has (event_type, ts, body); emitting
                # the full 5-field state tuples would not match that schema.
                "events": [
                    {"event_type": event[0], "ts": event[1], "body": event[2]}
                    for event in events
                ],
            }])
            # Drop the buffered events for this order now that it has been emitted.
            self.events_state.clear()

            yield output

    def close(self) -> None:
        """No resources to release."""
        pass

In [0]:
# Schema of the rows produced by OrderEventCollector: the order id and location,
# plus that order's buffered events as an array of (event_type, ts, body) structs.
output_schema = (
    StructType()
    .add("order_id", StringType())
    .add("location", StringType())
    .add(
        "events",
        ArrayType(
            StructType()
            .add("event_type", StringType())
            .add("ts", TimestampType())
            .add("body", StringType())
        ),
    )
)

In [0]:
from pyspark.sql.functions import col

# Read the raw event stream and project the columns OrderEventCollector stores.
# No filter on event_type is applied: every event in the source table is buffered.
# TODO: add a .where(...) on event_type here if only some event types are relevant.
events_stream = (
    spark.readStream.table("gk_demo.lakeflow.all_events")
        .select(
            col("event_type"),
            # Cast so the values match the TimestampType "ts" field used in state/output.
            col("ts").cast("timestamp").alias("ts"),
            col("body"),
            col("order_id"),
            col("location")
        )
)

# Apply the stateful transformation: events are grouped per order_id and buffered
# by OrderEventCollector until a "delivered" event is seen.
# timeMode="None" means no timers are available, so state for orders that never
# receive a "delivered" event is never expired.
processed_stream = (
    events_stream
    .groupBy("order_id")
    .transformWithStateInPandas(
        statefulProcessor=OrderEventCollector(),
        outputStructType=output_schema,
        outputMode="Append",
        timeMode="None"
    )
)

In [0]:
# Write completed orders to the target table. trigger(availableNow=True) processes
# all data available at start time and then stops the query.
# The checkpoint also stores the per-order state; changing this path restarts the
# stream from scratch with empty state.
(
  processed_stream.writeStream
    .option("checkpointLocation", "/Volumes/gk_demo/default/checkpoints/orders_in_progress")
    .trigger(availableNow=True)
    .toTable("gk_demo.default.completed_orders")
)