### Event Time vs Processing Time
Think of it like **_mail delivery_**:
- **Event Time** = When the letter was written (_the actual event timestamp_)
- **Processing Time** = When the post office receives and processes the letter (_when Spark handles it_)
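
To make the analogy concrete, here is a minimal plain-Python sketch (no Spark required). The letters, their timestamps, and the helper `at` are invented for illustration. It shows that ordering by event time and ordering by processing time can disagree when one letter is delayed in transit.

```python
from datetime import datetime, timezone


def at(second: int) -> datetime:
    """Hypothetical helper: a UTC timestamp at 15:14:<second> on an example date."""
    return datetime(2026, 2, 22, 15, 14, second, tzinfo=timezone.utc)


# Hypothetical letters: (id, event_time = written, processing_time = received).
letters = [
    ("A", at(30), at(41)),  # written first, but delayed in transit
    ("B", at(33), at(35)),
    ("C", at(36), at(37)),
]

# Sort the same letters by each notion of time.
by_event_time = [name for name, _, _ in sorted(letters, key=lambda l: l[1])]
by_processing_time = [name for name, _, _ in sorted(letters, key=lambda l: l[2])]

print("order by event time:     ", by_event_time)
print("order by processing time:", by_processing_time)

# The lag between the two clocks differs per letter.
for name, written, received in letters:
    lag = (received - written).total_seconds()
    print(f"letter {name}: lag = {lag:.0f} s")
```

Letter A happened first but was handled last, which is why aggregations that should reflect what actually happened are usually keyed on event time.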

### **What Are Windows?**

Windows divide infinite streaming data into **finite chunks** so we can perform aggregations.

Think of it like a **conveyor belt in a factory**:
- Products (data) keep coming infinitely
- Windows are like **bins** that collect products for 5 minutes, then send them for counting/packaging
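
The bin idea can be sketched in plain Python: each product is dropped into the 5-minute bin that contains its arrival time, and the bins are then counted. The arrival minutes and the helper name `tumbling_window` are assumptions made for this illustration, not part of Spark.

```python
from collections import Counter
from typing import Tuple

WINDOW_MINUTES = 5  # bin size, matching the 5-minute bins in the analogy


def tumbling_window(minute: float, size: int = WINDOW_MINUTES) -> Tuple[int, int]:
    """Return the (start, end) of the fixed-size bin containing `minute`.

    The start is inclusive and the end is exclusive.
    """
    start = int(minute // size) * size
    return (start, start + size)


# Hypothetical arrival times of products on the belt, in minutes.
arrivals = [0.5, 1.2, 4.9, 5.0, 7.3, 12.8]

# Count how many products landed in each bin.
counts = Counter(tumbling_window(m) for m in arrivals)

for (start, end), n in sorted(counts.items()):
    print(f"bin [{start}, {end}): {n} product(s)")
```

A product arriving at exactly minute 5.0 goes into the `[5, 10)` bin, because each bin includes its start and excludes its end.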

### **Common Window Types**

1. **Tumbling Window**: Fixed-size, non-overlapping windows
   - Every 5 minutes: [0-5), [5-10), [10-15)... (start inclusive, end exclusive)

2. **Sliding Window**: Fixed-size, overlapping windows
   - 5-minute windows that start every 2 minutes: [0-5), [2-7), [4-9)...

3. **Session Window**: Groups events by activity gaps
   - User clicks: a gap of more than 10 minutes between clicks starts a new session
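
The sliding and session rules above can be sketched in plain Python. This is an illustration of the assignment logic only, not how Spark implements it; the helper names `sliding_windows` and `sessionize` and the sample click times are hypothetical.

```python
from typing import List, Tuple


def sliding_windows(t: float, size: int = 5, slide: int = 2) -> List[Tuple[int, int]]:
    """Return every [start, end) window of length `size`, starting on a
    multiple of `slide`, that contains time `t` (minutes)."""
    start = int(t // slide) * slide  # latest window start that is <= t
    windows = []
    while start > t - size:          # the window still covers t
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


def sessionize(times: List[float], gap: float = 10) -> List[List[float]]:
    """Group event times into sessions; a gap larger than `gap` minutes
    between consecutive events starts a new session."""
    sessions: List[List[float]] = []
    for t in sorted(times):
        if sessions and t - sessions[-1][-1] <= gap:
            sessions[-1].append(t)   # close enough: same session
        else:
            sessions.append([t])     # gap exceeded (or first event): new session
    return sessions


# One event can belong to several overlapping sliding windows.
print(sliding_windows(4.5))

# Hypothetical click times in minutes.
clicks = [0, 3, 9, 25, 28, 50]
print(sessionize(clicks))
```

An event at minute 4.5 falls into three sliding windows at once, whereas a tumbling window would assign it to exactly one. Session windows have no fixed size at all: their length depends on how long the activity continues.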

In [0]:
from pyspark.sql.functions import col, when, window

In [0]:
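# The built-in `rate` source emits rows with a `timestamp` and an incrementing `value`.
rate_df = (
    spark.readStream
    .format('rate')
    .option('rowsPerSecond', 1)  # generate one row per second
    .load()
)

# Turn each generated row into a fake coffee order.
order_df = (
    rate_df
    .select(
        col("value").alias('order_id'),
        # Cycle through three products based on the row counter.
        when(col("value") % 3 == 0, "Latte")
        .when(col("value") % 3 == 1, "Espresso")
        .otherwise("Cappuccino").alias("product"),
        # Use the source timestamp as the order's event time.
        col("timestamp").alias("event_time")
    )
)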

order_df.printSchema()


root
 |-- order_id: long (nullable = true)
 |-- product: string (nullable = false)
 |-- event_time: timestamp (nullable = true)



In [0]:
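windows_counts = (
    order_df
    # Tolerate events that arrive up to 5 seconds late (measured in event time).
    .withWatermark("event_time", "5 seconds")
    .groupBy(
        # 10-second tumbling windows on event time, counted per product.
        window(col("event_time"), "10 seconds"), col("product")
    )
    .count()
    .select(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        col("product"),
        col("count")
    )
)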
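
The query above combines a 5-second watermark with 10-second windows. The following plain-Python sketch is a simplified model of that idea, not Spark's implementation. It assumes the watermark is the largest event time seen so far minus the delay, and that an event is discarded once the end of its window is at or before the watermark. Spark advances the watermark between micro-batches; this sketch updates it after every event to stay short. The function name `simulate` and the sample event times are hypothetical.

```python
from typing import List, Optional, Tuple

WINDOW = 10  # window length in seconds, as in window(..., "10 seconds")
DELAY = 5    # allowed lateness in seconds, as in withWatermark(..., "5 seconds")


def simulate(event_times: List[int]) -> Tuple[List[int], List[int]]:
    """Process events in arrival order; return (accepted, dropped) event times."""
    max_seen: Optional[int] = None
    accepted: List[int] = []
    dropped: List[int] = []
    for t in event_times:
        # Watermark derived from events seen before this one (simplification).
        watermark = None if max_seen is None else max_seen - DELAY
        window_end = (t // WINDOW) * WINDOW + WINDOW
        if watermark is not None and window_end <= watermark:
            dropped.append(t)   # the window is already considered final
        else:
            accepted.append(t)  # on time, or late but within the allowed delay
        max_seen = t if max_seen is None else max(max_seen, t)
    return accepted, dropped


# Event times in seconds, listed in arrival order (8 and 3 arrive out of order).
accepted, dropped = simulate([1, 4, 12, 8, 17, 3])
print("accepted:", accepted)
print("dropped: ", dropped)
```

The event at second 8 arrives after the event at second 12 but is still counted, because its window `[0, 10)` has not yet fallen behind the watermark. The event at second 3 arrives after second 17 has been seen, so its window is already closed in this model.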

In [0]:
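query = (
    windows_counts
    .writeStream
    .queryName('in_memory')  # name of the in-memory table queried below
    .format("memory")        # keep results in driver memory (testing/debugging only)
    .outputMode('update')    # emit only the rows whose aggregate changed in a trigger
    .start()
)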

In [0]:
%sql
select * from in_memory order by window_start

window_start,window_end,product,count
2026-02-22T15:14:30Z,2026-02-22T15:14:40Z,Cappuccino,1
2026-02-22T15:14:30Z,2026-02-22T15:14:40Z,Latte,2
2026-02-22T15:14:30Z,2026-02-22T15:14:40Z,Espresso,2
2026-02-22T15:14:40Z,2026-02-22T15:14:50Z,Espresso,3
2026-02-22T15:14:40Z,2026-02-22T15:14:50Z,Latte,3
2026-02-22T15:14:40Z,2026-02-22T15:14:50Z,Cappuccino,4
2026-02-22T15:14:50Z,2026-02-22T15:15:00Z,Cappuccino,2
2026-02-22T15:14:50Z,2026-02-22T15:15:00Z,Espresso,2
2026-02-22T15:14:50Z,2026-02-22T15:15:00Z,Espresso,3
2026-02-22T15:14:50Z,2026-02-22T15:15:00Z,Latte,2
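
The result above contains two rows for Espresso in the 15:14:50 window (counts 2 and 3). This is consistent with `update` mode emitting a new row each time an aggregate changes while the in-memory table keeps every emitted row. A minimal sketch for collapsing such rows to the latest count per window and product, assuming counts only ever grow so the largest value is the most recent one:

```python
# Rows copied from the query result above: (window_start, window_end, product, count).
rows = [
    ("2026-02-22T15:14:30Z", "2026-02-22T15:14:40Z", "Cappuccino", 1),
    ("2026-02-22T15:14:30Z", "2026-02-22T15:14:40Z", "Latte", 2),
    ("2026-02-22T15:14:30Z", "2026-02-22T15:14:40Z", "Espresso", 2),
    ("2026-02-22T15:14:40Z", "2026-02-22T15:14:50Z", "Espresso", 3),
    ("2026-02-22T15:14:40Z", "2026-02-22T15:14:50Z", "Latte", 3),
    ("2026-02-22T15:14:40Z", "2026-02-22T15:14:50Z", "Cappuccino", 4),
    ("2026-02-22T15:14:50Z", "2026-02-22T15:15:00Z", "Cappuccino", 2),
    ("2026-02-22T15:14:50Z", "2026-02-22T15:15:00Z", "Espresso", 2),
    ("2026-02-22T15:14:50Z", "2026-02-22T15:15:00Z", "Espresso", 3),
    ("2026-02-22T15:14:50Z", "2026-02-22T15:15:00Z", "Latte", 2),
]

# Keep the highest count seen for each (window_start, window_end, product) key.
latest = {}
for start, end, product, count in rows:
    key = (start, end, product)
    latest[key] = max(count, latest.get(key, 0))

for (start, end, product), count in sorted(latest.items()):
    print(start, end, product, count)
```

The same collapse could be expressed in SQL by grouping on `window_start`, `window_end`, and `product` and selecting `max(count)`.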


In [0]:
for stream in spark.streams.active:
    print(stream.name)
    stream.stop()

in_memory


### Summary

✅ **Event Time vs Processing Time**: Event time = when the event happened, Processing time = when Spark handles it  
✅ **Windows**: Divide infinite streams into finite chunks for aggregation  
✅ **Window Assignment**: Based on event time, not processing time  
✅ **Rate Source**: `format('rate')` is an easy way to generate test streaming data  
✅ **Capturing All Times**: The output here carries the window start and end derived from event time; a processing-time column can be added as well, for example with `current_timestamp()`