# Windowing in Apache Iceberg

In [16]:
from pyspark.sql import SparkSession
import os

In [17]:
spark = (
    SparkSession.builder
    .appName("Windowing in Apache Iceberg")
    .master("spark://spark:7077") 
    .getOrCreate()
)

First, let's look at when the orders arrived. Each order's `timestamp` is the event time that all of the windowing queries below are based on.

In [18]:
spark.sql("""
SELECT 
    id AS order_id,
    timestamp AS event_time,
    customer_id
FROM ice.demo.orders
""").show(5)

+--------------------+-------------------+-----------+
|            order_id|         event_time|customer_id|
+--------------------+-------------------+-----------+
|36d8d5cf-b6c3-4c1...|2025-10-03 10:58:05|         12|
|36d8d5cf-b6c3-4c1...|2025-10-03 10:58:05|         12|
|61b24c9a-4f2e-4b3...|2025-10-04 18:03:17|          8|
|3f06bb49-b6cd-4a3...|2025-10-01 09:13:45|          3|
|52d2ce38-3fc1-4a4...|2025-10-02 14:25:30|          7|
+--------------------+-------------------+-----------+
only showing top 5 rows



## Fixed Windows

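Fixed (tumbling) windows divide the timeline into consecutive, non-overlapping intervals of equal length (e.g., every day or every week), so each event belongs to exactly one window. Passing a single duration to Spark SQL's `window` function creates tumbling windows. The query below sums revenue in 2-day windows.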

In [19]:
spark.sql("""
SELECT
    window.start AS window_start,
    window.end   AS window_end,
    ROUND(SUM(oi.quantity * p.price), 2) AS total_revenue
FROM ice.demo.order_items oi
JOIN ice.demo.orders o ON oi.order_id = o.id
JOIN ice.demo.products p ON oi.product_id = p.product_id
GROUP BY window(o.timestamp, '2 days')
ORDER BY window_start
""").show(truncate=False)

+-------------------+-------------------+-------------+
|window_start       |window_end         |total_revenue|
+-------------------+-------------------+-------------+
|2025-10-01 00:00:00|2025-10-03 00:00:00|2459.6       |
|2025-10-03 00:00:00|2025-10-05 00:00:00|2359.76      |
|2025-10-05 00:00:00|2025-10-07 00:00:00|1139.8       |
|2025-10-07 00:00:00|2025-10-09 00:00:00|2839.72      |
|2025-10-09 00:00:00|2025-10-11 00:00:00|3159.64      |
|2025-10-11 00:00:00|2025-10-13 00:00:00|1499.76      |
|2025-10-13 00:00:00|2025-10-15 00:00:00|2059.84      |
|2025-10-15 00:00:00|2025-10-17 00:00:00|1339.76      |
|2025-10-17 00:00:00|2025-10-19 00:00:00|799.84       |
+-------------------+-------------------+-------------+
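
To make the bucketing concrete, here is a plain-Python sketch (no Spark needed) of how an event is assigned to a tumbling window. The window start is the timestamp rounded down to a multiple of the window length, counted from the Unix epoch. This mirrors how Spark aligns windows to 1970-01-01 00:00:00 UTC when no `startTime` offset is given. The helper `tumbling_window` is hypothetical, and the sketch assumes all timestamps are in UTC, as the midnight-aligned boundaries in the output above suggest.

```python
from datetime import datetime, timedelta, timezone

# Windows are aligned to the Unix epoch (assumption: timestamps are UTC).
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def tumbling_window(ts: datetime, size: timedelta) -> tuple[datetime, datetime]:
    """Return the half-open [start, end) tumbling window that contains ts."""
    # Time elapsed since the current window opened.
    offset = (ts - EPOCH) % size
    start = ts - offset
    return start, start + size


# Customer 3's order from the first query's output, treated as UTC.
order_time = datetime(2025, 10, 1, 9, 13, 45, tzinfo=timezone.utc)
start, end = tumbling_window(order_time, timedelta(days=2))
print(start.isoformat(), end.isoformat())
# → 2025-10-01T00:00:00+00:00 2025-10-03T00:00:00+00:00
```

Because every window start is an exact multiple of the window length after the epoch, two independent queries with the same window size always produce the same boundaries.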



## Hopping Windows

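Hopping (sliding) windows have a fixed length but start at a regular interval shorter than that length, so consecutive windows overlap and one event can fall into several of them, e.g., “every day, looking back 3 days”. The third argument to `window` is the slide interval: `window(o.timestamp, '3 days', '1 day')` creates 3-day windows that start every day.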

In [20]:
spark.sql("""
SELECT
    window.start AS window_start,
    window.end AS window_end,
    ROUND(SUM(oi.quantity * p.price), 2) AS revenue
FROM ice.demo.order_items oi
JOIN ice.demo.orders o ON oi.order_id = o.id
JOIN ice.demo.products p ON oi.product_id = p.product_id
GROUP BY window(o.timestamp, '3 days', '1 day')
ORDER BY window_start
""").show(truncate=False)

+-------------------+-------------------+-------+
|window_start       |window_end         |revenue|
+-------------------+-------------------+-------+
|2025-09-29 00:00:00|2025-10-02 00:00:00|519.88 |
|2025-09-30 00:00:00|2025-10-03 00:00:00|2459.6 |
|2025-10-01 00:00:00|2025-10-04 00:00:00|3859.48|
|2025-10-02 00:00:00|2025-10-05 00:00:00|4299.48|
|2025-10-03 00:00:00|2025-10-06 00:00:00|3099.68|
|2025-10-04 00:00:00|2025-10-07 00:00:00|2099.68|
|2025-10-05 00:00:00|2025-10-08 00:00:00|2919.72|
|2025-10-06 00:00:00|2025-10-09 00:00:00|3239.6 |
|2025-10-07 00:00:00|2025-10-10 00:00:00|3719.6 |
|2025-10-08 00:00:00|2025-10-11 00:00:00|4219.44|
|2025-10-09 00:00:00|2025-10-12 00:00:00|4079.52|
|2025-10-10 00:00:00|2025-10-13 00:00:00|3779.52|
|2025-10-11 00:00:00|2025-10-14 00:00:00|3239.68|
|2025-10-12 00:00:00|2025-10-15 00:00:00|2639.72|
|2025-10-13 00:00:00|2025-10-16 00:00:00|2899.72|
|2025-10-14 00:00:00|2025-10-17 00:00:00|1659.68|
|2025-10-15 00:00:00|2025-10-18 00:00:00|2139.6 |
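
A hopping window assigns each event to every window that contains it. The plain-Python sketch below (the helper `hopping_windows` is hypothetical, and timestamps are assumed to be UTC) finds the latest epoch-aligned window start at or before the event, then steps backwards one slide at a time while the window still covers the event. With a 3-day size and a 1-day slide, each event lands in three windows, which is why the first window in the output starts on 2025-09-29, before any order was placed.

```python
from datetime import datetime, timedelta, timezone

# Windows are aligned to the Unix epoch (assumption: timestamps are UTC).
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def hopping_windows(ts: datetime, size: timedelta, slide: timedelta) -> list[tuple[datetime, datetime]]:
    """Return every [start, end) window of length `size`, starting every `slide`, that contains ts."""
    # Latest slide-aligned window start that is <= ts.
    start = ts - (ts - EPOCH) % slide
    windows = []
    # Walk backwards while the window still ends after ts.
    while start + size > ts:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


order_time = datetime(2025, 10, 1, 9, 13, 45, tzinfo=timezone.utc)
for start, end in hopping_windows(order_time, timedelta(days=3), timedelta(days=1)):
    print(start.date(), "->", end.date())
# 2025-09-29 -> 2025-10-02
# 2025-09-30 -> 2025-10-03
# 2025-10-01 -> 2025-10-04
```

Since each event is counted in `size / slide` windows, the hopping revenue figures add up to roughly three times the fixed-window totals; they are rolling sums, not a partition of the revenue.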


## Session Windows

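Session windows are dynamic, gap-based windows: a window stays open while events keep arriving and closes once activity pauses for longer than a threshold (the gap duration). The query below groups all order events from the same customer into sessions separated by 12 hours of inactivity. A session ends one gap after its last event, which is why each session in the output ends 12 hours after the customer's last order in it.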

In [21]:
spark.sql("""
SELECT
    o.customer_id,
    session_window.start AS session_start,
    session_window.end AS session_end,
    ROUND(SUM(oi.quantity * p.price), 2) AS session_revenue
FROM ice.demo.orders o
JOIN ice.demo.order_items oi ON o.id = oi.order_id
JOIN ice.demo.products p ON oi.product_id = p.product_id
GROUP BY o.customer_id, session_window(o.timestamp, '12 hours')
ORDER BY o.customer_id, session_start
""").show(truncate=False)

+-----------+-------------------+-------------------+---------------+
|customer_id|session_start      |session_end        |session_revenue|
+-----------+-------------------+-------------------+---------------+
|1          |2025-10-10 11:55:08|2025-10-10 23:55:08|1679.84        |
|2          |2025-10-06 08:24:09|2025-10-06 20:24:09|399.88         |
|3          |2025-10-01 09:13:45|2025-10-01 21:13:45|519.88         |
|4          |2025-10-10 16:21:55|2025-10-11 04:21:55|599.92         |
|5          |2025-10-08 09:35:33|2025-10-08 21:35:33|719.88         |
|6          |2025-10-15 13:59:41|2025-10-16 01:59:41|839.88         |
|7          |2025-10-02 14:25:30|2025-10-03 02:25:30|819.84         |
|8          |2025-10-04 18:03:17|2025-10-05 06:03:17|959.88         |
|9          |2025-10-09 13:10:27|2025-10-10 01:10:27|879.88         |
|10         |2025-10-14 11:18:47|2025-10-14 23:18:47|319.92         |
|11         |2025-10-16 09:11:34|2025-10-16 21:11:34|499.88         |
|12         |2025-10
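
The merging logic behind a session window can be sketched in plain Python for a single customer's events: each event opens a tentative window `[ts, ts + gap)`, and an event that arrives before the current session expires extends that session. The helper `sessionize` and the event times are hypothetical (not taken from the dataset). This sketch starts a new session when an event arrives exactly at the current session's end; Spark's handling of that boundary case may differ.

```python
from datetime import datetime, timedelta


def sessionize(timestamps: list[datetime], gap: timedelta) -> list[tuple[datetime, datetime]]:
    """Group one customer's event times into gap-based [start, end) sessions."""
    sessions: list[tuple[datetime, datetime]] = []
    for ts in sorted(timestamps):
        if sessions and ts < sessions[-1][1]:
            # Event arrives before the session expires: push its end out to ts + gap.
            start, _ = sessions[-1]
            sessions[-1] = (start, ts + gap)
        else:
            # First event, or the gap has elapsed: open a new session.
            sessions.append((ts, ts + gap))
    return sessions


# Hypothetical events for one customer.
events = [
    datetime(2025, 10, 1, 9, 0),
    datetime(2025, 10, 1, 15, 0),
    datetime(2025, 10, 2, 10, 0),
]
for start, end in sessionize(events, timedelta(hours=12)):
    print(start, "->", end)
# 2025-10-01 09:00:00 -> 2025-10-02 03:00:00
# 2025-10-02 10:00:00 -> 2025-10-02 22:00:00
```

In the SQL version, `GROUP BY o.customer_id, session_window(...)` applies this merging separately to each customer's events.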

## `OVER`

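`OVER` turns an aggregate such as `SUM` into a window function. Instead of collapsing rows into one result per group, as `GROUP BY` does, it computes a value for every row from a set of related rows (the window). In the query below, `PARTITION BY o.customer_id` limits each window to one customer's rows, `ORDER BY o.timestamp` orders them, and `ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW` extends the window from the customer's first row up to the current row, producing a running total of revenue per customer.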

In [26]:
spark.sql("""
SELECT
    o.customer_id,
    o.timestamp,
    ROUND(SUM(oi.quantity * p.price)
          OVER (PARTITION BY o.customer_id ORDER BY o.timestamp
                ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), 2) AS running_total
FROM ice.demo.orders o
JOIN ice.demo.order_items oi ON o.id = oi.order_id
JOIN ice.demo.products p ON oi.product_id = p.product_id
ORDER BY o.customer_id, o.timestamp
""").show(100, truncate=False)

+-----------+-------------------+-------------+
|customer_id|timestamp          |running_total|
+-----------+-------------------+-------------+
|1          |2025-10-10 11:55:08|299.99       |
|1          |2025-10-10 11:55:08|599.98       |
|1          |2025-10-10 11:55:08|719.95       |
|1          |2025-10-10 11:55:08|839.92       |
|1          |2025-10-10 11:55:08|1139.91      |
|1          |2025-10-10 11:55:08|1439.9       |
|1          |2025-10-10 11:55:08|1559.87      |
|1          |2025-10-10 11:55:08|1679.84      |
|2          |2025-10-06 08:24:09|69.98        |
|2          |2025-10-06 08:24:09|139.96       |
|2          |2025-10-06 08:24:09|169.95       |
|2          |2025-10-06 08:24:09|199.94       |
|2          |2025-10-06 08:24:09|269.92       |
|2          |2025-10-06 08:24:09|339.9        |
|2          |2025-10-06 08:24:09|369.89       |
|2          |2025-10-06 08:24:09|399.88       |
|3          |2025-10-01 09:13:45|79.98        |
|3          |2025-10-01 09:13:45|159.96 
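
One subtlety is visible in the output: all of customer 1's item rows share the same `timestamp`, so `ORDER BY o.timestamp` does not decide which of them comes first, and the intermediate running totals for those tied rows depend on an arbitrary row order (the final total per customer is the same either way). When `ORDER BY` is given without a frame clause, Spark SQL, like standard SQL, defaults to `RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`, under which tied rows (peers) all receive the same total. The plain-Python sketch below contrasts the two frames on a small, made-up set of `(customer_id, timestamp, amount)` rows; the helper names are hypothetical.

```python
from itertools import accumulate, groupby
from operator import itemgetter

# Hypothetical rows, already sorted by customer_id and timestamp.
rows = [
    (1, "2025-10-10 11:55:08", 300),
    (1, "2025-10-10 11:55:08", 120),
    (1, "2025-10-11 09:00:00", 50),
    (2, "2025-10-06 08:24:09", 70),
    (2, "2025-10-06 08:24:09", 30),
]


def running_total_rows(rows):
    """ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: one step per physical row."""
    out = []
    for _, part in groupby(rows, key=itemgetter(0)):  # PARTITION BY customer_id
        out.extend(accumulate(amount for _, _, amount in part))
    return out


def running_total_range(rows):
    """RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: peers share one total."""
    out = []
    for _, part in groupby(rows, key=itemgetter(0)):  # PARTITION BY customer_id
        total = 0
        for _, peers in groupby(part, key=itemgetter(1)):  # rows with equal timestamps
            peers = list(peers)
            total += sum(amount for _, _, amount in peers)
            out.extend([total] * len(peers))
    return out


print(running_total_rows(rows))   # [300, 420, 470, 70, 100]
print(running_total_range(rows))  # [420, 420, 470, 100, 100]
```

Adding a unique tie-breaker to the `ORDER BY` (for example an item identifier, if the table has one) would make the `ROWS` running totals deterministic.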

### `RANK` and `WITH`

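`RANK` numbers the rows within each partition according to the `ORDER BY` expression, and rows that tie receive the same rank. The query also uses `WITH` to define common table expressions (CTEs): named subqueries (`order_totals`, then `ranked`) that later parts of the statement can reference like tables. Here `order_totals` computes the value of each order, `ranked` ranks each customer's orders from highest to lowest value, and the final `SELECT` keeps the top three orders per customer (`rnk <= 3`).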

In [33]:
spark.sql("""
WITH order_totals AS (
  SELECT
      o.customer_id,
      o.id AS order_id,
      SUM(oi.quantity * p.price) AS order_total
  FROM ice.demo.orders o
  JOIN ice.demo.order_items oi ON o.id = oi.order_id
  JOIN ice.demo.products p ON oi.product_id = p.product_id
  GROUP BY o.customer_id, o.id
),
ranked AS (
  SELECT
      customer_id,
      order_id,
      ROUND(order_total, 2) AS order_total,
      RANK() OVER (
        PARTITION BY customer_id
        ORDER BY order_total DESC
      ) AS rnk
  FROM order_totals
)
SELECT customer_id, order_id, order_total, rnk
FROM ranked
WHERE rnk <= 3
ORDER BY customer_id, rnk, order_total DESC
""").show(truncate=False)

+-----------+------------------------------------+-----------+---+
|customer_id|order_id                            |order_total|rnk|
+-----------+------------------------------------+-----------+---+
|1          |adbc34fb-f7d7-42ef-8e09-2c8658adca6f|1679.84    |1  |
|2          |2df2f3f4-c20c-47da-bd4f-7135cb7b8b9b|399.88     |1  |
|3          |3f06bb49-b6cd-4a3b-8f74-2e1f8ebf27a5|519.88     |1  |
|4          |9dc64a8e-cb1a-4b3b-a303-31bcd9e127f9|599.92     |1  |
|5          |32ed0e61-0287-4a68-8103-01fc23592c62|719.88     |1  |
|6          |ea8a2722-25b3-4b74-a02d-76c6ac960b8b|839.88     |1  |
|7          |52d2ce38-3fc1-4a41-bcf1-c82829c68729|819.84     |1  |
|8          |61b24c9a-4f2e-4b3a-bf53-236e305a6a80|959.88     |1  |
|9          |a6630e05-21b2-4b92-940f-9f03dcb4a8c1|879.88     |1  |
|10         |40a1e68b-2682-4b7e-88d9-71c6a004bf2b|319.92     |1  |
|11         |9df8ce5c-10aa-4f45-9f9c-f4a64dc17616|499.88     |1  |
|12         |36d8d5cf-b6c3-4c1a-a9b7-14dc9f8249b5|1399.88    |
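
Because ties share a rank, `RANK` leaves gaps after them, and a filter such as `rnk <= 3` can return more than three rows per customer. The plain-Python sketch below, applied to hypothetical order totals for a single customer, mimics `RANK()`, `DENSE_RANK()` and `ROW_NUMBER()` over `ORDER BY order_total DESC`. The helper `rank_variants` is illustrative and not a Spark API.

```python
def rank_variants(values):
    """Return (value, rank, dense_rank, row_number) tuples for values sorted descending."""
    ordered = sorted(values, reverse=True)
    result = []
    rank = dense = 0
    for i, v in enumerate(ordered, start=1):
        if i == 1 or v != ordered[i - 2]:
            rank = i     # RANK jumps to the row's position, leaving gaps after ties
            dense += 1   # DENSE_RANK moves to the next integer, with no gaps
        result.append((v, rank, dense, i))  # ROW_NUMBER is always the position
    return result


# Hypothetical order totals for one customer.
result = rank_variants([300, 500, 300, 400, 100])
for row in result:
    print(row)
# (500, 1, 1, 1)
# (400, 2, 2, 2)
# (300, 3, 3, 3)
# (300, 3, 3, 4)
# (100, 5, 4, 5)

top_three = [row for row in result if row[1] <= 3]
print(len(top_three))  # 4
```

If exactly three rows per customer are required regardless of ties, `ROW_NUMBER()` is the usual choice instead of `RANK()`.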

## Generating New Insights

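Which customers haven't returned in the last 90 days? Answering this produces new data that could drive new events, for example a marketing campaign aimed at past customers who haven't come back. The query below finds each customer's most recent purchase and counts the customers whose last purchase is more than 90 days before today (`current_date()`).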

In [34]:
spark.sql("""
WITH last_order AS (
  SELECT
      customer_id,
      MAX(timestamp) AS last_purchase
  FROM ice.demo.orders
  GROUP BY customer_id
)
SELECT
    COUNT(*) AS churned_customers
FROM last_order
WHERE last_purchase < date_sub(current_date(), 90)
""").show()

+-----------------+
|churned_customers|
+-----------------+
|                0|
+-----------------+
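
The same churn logic can be sketched in plain Python with a fixed "today" so the result is reproducible: keep the latest purchase date per customer (the `MAX(timestamp) ... GROUP BY customer_id` step), then count those older than the cutoff (the `date_sub(current_date(), 90)` step). The orders and dates below are hypothetical, and the sketch compares calendar dates rather than timestamps for simplicity. The notebook's count of 0 is presumably because every sample order falls within 90 days of the date the notebook was run, as the second call illustrates.

```python
from datetime import date, timedelta


def churned_customers(orders, today, days=90):
    """Count customers whose most recent order date is more than `days` before `today`."""
    last_purchase = {}
    for customer_id, order_date in orders:
        # Keep the latest order date seen for each customer.
        if customer_id not in last_purchase or order_date > last_purchase[customer_id]:
            last_purchase[customer_id] = order_date
    cutoff = today - timedelta(days=days)
    return sum(1 for d in last_purchase.values() if d < cutoff)


# Hypothetical (customer_id, order_date) pairs.
orders = [
    (1, date(2025, 10, 10)),
    (2, date(2025, 10, 6)),
    (2, date(2026, 1, 20)),
    (3, date(2025, 10, 1)),
]
print(churned_customers(orders, today=date(2026, 2, 1)))   # 2 (cutoff 2025-11-03)
print(churned_customers(orders, today=date(2025, 12, 1)))  # 0 (cutoff 2025-09-02)
```

Returning the customer IDs instead of a count (in SQL, selecting `customer_id` rather than `COUNT(*)`) would give a list that a campaign could actually target.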



In [36]:
spark.stop()