# Polars Notebook — Unsolved

Cheat sheet: https://franzdiebold.github.io/polars-cheat-sheet/Polars_cheat_sheet.pdf

## 0. Setup — Working at Scale

We are now working with **larger datasets**.

Goal:
- Understand lazy execution
- Optimize query plans
- Work with joins, windows and multi-step pipelines

We generate:
- `products`: small dimension table
- `sales`: large fact table

Both tables are written to Parquet so that later sections can read them lazily with `pl.scan_parquet`.


In [None]:
import polars as pl
import polars.selectors as cs
from datetime import datetime

In [None]:
# ----------------------------
# Dataset sizes
# ----------------------------
N_SALES = 300_000
N_PRODUCTS = 2_000

cities = ["Valencia", "Madrid", "Sevilla", "Bilbao", "Barcelona"]
categories = ["Electronics", "Fashion", "Home", "Sports", "Toys"]


In [None]:
# ----------------------------
# Products dimension
# ----------------------------
products = pl.DataFrame({
    "product_id": pl.int_range(1, N_PRODUCTS + 1, eager=True),
    "category": [categories[i % len(categories)] for i in range(N_PRODUCTS)],
    "is_discontinued": (pl.int_range(1, N_PRODUCTS + 1, eager=True) % 40 == 0),
})

products.write_parquet("products.parquet")

In [None]:
print("Products — sample")
products.head(10)

Products — sample


| product_id | category | is_discontinued |
| --- | --- | --- |
| i64 | str | bool |
| 1 | Electronics | False |
| 2 | Fashion | False |
| 3 | Home | False |
| 4 | Sports | False |
| 5 | Toys | False |
| 6 | Electronics | False |
| 7 | Fashion | False |
| 8 | Home | False |
| 9 | Sports | False |
| 10 | Toys | False |


In [None]:
# ----------------------------
# Sales fact table
# ----------------------------
sales = pl.DataFrame({
    "sale_id": pl.int_range(1, N_SALES + 1, eager=True),
    "product_id": (pl.int_range(1, N_SALES + 1, eager=True) * 37 % (N_PRODUCTS + 200)) + 1,
    "user_id": (pl.int_range(1, N_SALES + 1, eager=True) * 13) % 50_000,
    "units": (pl.int_range(1, N_SALES + 1, eager=True) * 7 % 5) + 1,
    "city": [cities[i % len(cities)] for i in range(N_SALES)],
    "has_discount": (pl.int_range(1, N_SALES + 1, eager=True) % 10 == 0),
    "ts": pl.datetime_range(
        datetime(2025, 1, 1),
        datetime(2025, 3, 31),
        interval="1s",
        eager=True,
    )[:N_SALES],
})

sales = sales.with_columns(
    (pl.col("units") * ((pl.col("sale_id") % 200) + 5)).alias("gross_value")
)

sales.write_parquet("sales.parquet")


In [None]:
print("Sales — sample")
sales.head(10)


Sales — sample


| sale_id | product_id | user_id | units | city | has_discount | ts | gross_value |
| --- | --- | --- | --- | --- | --- | --- | --- |
| i64 | i64 | i64 | i64 | str | bool | datetime[μs] | i64 |
| 1 | 38 | 13 | 3 | Valencia | False | 2025-01-01 00:00:00 | 18 |
| 2 | 75 | 26 | 5 | Madrid | False | 2025-01-01 00:00:01 | 35 |
| 3 | 112 | 39 | 2 | Sevilla | False | 2025-01-01 00:00:02 | 16 |
| 4 | 149 | 52 | 4 | Bilbao | False | 2025-01-01 00:00:03 | 36 |
| 5 | 186 | 65 | 1 | Barcelona | False | 2025-01-01 00:00:04 | 10 |
| 6 | 223 | 78 | 3 | Valencia | False | 2025-01-01 00:00:05 | 33 |
| 7 | 260 | 91 | 5 | Madrid | False | 2025-01-01 00:00:06 | 60 |
| 8 | 297 | 104 | 2 | Sevilla | False | 2025-01-01 00:00:07 | 26 |
| 9 | 334 | 117 | 4 | Bilbao | False | 2025-01-01 00:00:08 | 56 |
| 10 | 371 | 130 | 1 | Barcelona | True | 2025-01-01 00:00:09 | 15 |


## 1. Lazy Pipeline Optimization

Build a lazy pipeline that:
- filters data
- creates derived columns
- aggregates results
- sorts output

Then inspect the execution plan with `explain()`.


In [None]:
# TODO:
# 1) scan sales.parquet lazily
# 2) filter rows with high gross_value (> 1000)
# 3) create net_value: gross_value * 0.9 when has_discount is true, otherwise gross_value
# 4) group by city
# 5) aggregate:
#    - number of orders
#    - total revenue
# 6) sort by revenue descending
# 7) inspect the execution plan

lazy_sales = pl.scan_parquet("sales.parquet")

pipeline = (
    lazy_sales
    # TODO: filter by gross_value
    # TODO: create net_value column
    # TODO: group by city
    # TODO: aggregate orders and revenue
    # TODO: sort by revenue
)

# TODO: inspect execution plan
# TODO: execute pipeline


SORT BY [col("revenue")]
  AGGREGATE[maintain_order: false]
    [len().alias("orders"), col("net_value").sum().alias("revenue")] BY [col("city")]
    FROM
     WITH_COLUMNS:
     [when(col("has_discount")).then([(col("gross_value").cast(Unknown(Float))) * (dyn float: 0.9)]).otherwise(col("gross_value").strict_cast(Unknown(Float))).alias("net_value")] 
      Parquet SCAN [sales.parquet] [id: 137483413454384]
      PROJECT 3/8 COLUMNS
      SELECTION: [(col("gross_value")) > (1000)]


| city | orders | revenue |
| --- | --- | --- |
| str | u32 | f64 |
| Madrid | 1500 | 1515000.0 |


## 2. Advanced Joins

Join sales with products.

Goal:
- enrich data
- detect invalid product references


In [None]:
# TODO:
# - scan sales.parquet lazily
# - scan products.parquet lazily
# - perform an inner join to enrich sales
# - perform an anti join to detect invalid product references
# - count rows in both results

sales_l = pl.scan_parquet("sales.parquet")
products_l = pl.scan_parquet("products.parquet")

# TODO: inner join
# TODO: anti join

# TODO: collect row counts for both joins

(shape: (1, 1)
 ┌────────┐
 │ len    │
 │ ---    │
 │ u32    │
 ╞════════╡
 │ 272730 │
 └────────┘,
 shape: (1, 1)
 ┌───────┐
 │ len   │
 │ ---   │
 │ u32   │
 ╞═══════╡
 │ 27270 │
 └───────┘)

## 3. Window Functions

Compute metrics that depend on group context:
- running totals
- lag values


In [None]:
# TODO:
# - create net_value (10% off gross_value when has_discount is true)
# - sort rows by timestamp
# - compute running total per user
# - compute previous value per user

cleaned = (
    sales_l
    # TODO: create net_value
    # TODO: sort by timestamp
)

windows = cleaned.with_columns([
    # TODO: running total per user
    # TODO: lag (previous value) per user
])

windows.select(
    ["user_id", "net_value", "running_total", "prev_value"]
).head().collect()

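| user_id | net_value | running_total | prev_value |
| --- | --- | --- | --- |
| i64 | f64 | f64 | f64 |
| 13 | 18.0 | 18.0 | null |
| 26 | 35.0 | 35.0 | null |
| 39 | 16.0 | 16.0 | null |
| 52 | 36.0 | 36.0 | null |
| 65 | 10.0 | 10.0 | null |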


## 4. String & Date Processing

Clean and extract features from text and timestamps.


In [None]:
# TODO:
# - normalize city to lowercase
# - extract year from timestamp
# - extract month from timestamp

sales_l.with_columns([
    # TODO: normalized city
    # TODO: year
    # TODO: month
]).select(
    ["city", "city_norm", "year", "month"]
).head().collect()


| city | city_norm | year | month |
| --- | --- | --- | --- |
| str | str | i32 | i8 |
| Valencia | valencia | 2025 | 1 |
| Madrid | madrid | 2025 | 1 |
| Sevilla | sevilla | 2025 | 1 |
| Bilbao | bilbao | 2025 | 1 |
| Barcelona | barcelona | 2025 | 1 |


## 5. End-to-End Pipeline

Build a single optimized pipeline that:
- cleans data
- joins products
- computes revenue
- aggregates KPIs


In [None]:
# TODO:
# - join sales with products
# - filter out discontinued products
# - compute net_value (10% off gross_value when has_discount is true)
# - group by category
# - aggregate:
#   - number of orders
#   - total revenue
# - sort results by revenue descending

final = (
    sales_l
    # TODO: join with products
    # TODO: filter discontinued products
    # TODO: create net_value
    # TODO: group by category
    # TODO: aggregate KPIs
    # TODO: sort by revenue
)

# TODO: execute pipeline


| category | orders | revenue |
| --- | --- | --- |
| str | u32 | f64 |
| Toys | 47726 | 24680685.0 |
| Sports | 54546 | 23236576.0 |
| Home | 54546 | 16936503.0 |
| Fashion | 54546 | 11508916.0 |
| Electronics | 54546 | 5318092.5 |


## 6. Predicate & Projection Pushdown

Build a lazy pipeline that:

- reads sales data
- selects only the necessary columns
- filters rows as early as possible

Inspect the execution plan and identify:
- predicate pushdown
- projection pruning


In [None]:
# TODO:
# - scan sales.parquet lazily
# - select only: city, gross_value, has_discount
# - filter gross_value > 500 (a threshold of 5000 would return no rows: gross_value tops out at 1010 here)
# - inspect execution plan

pipeline = (
    pl.scan_parquet("sales.parquet")
    # TODO
)

print(pipeline.explain())
pipeline.collect()


## 7. Join + Filter Pushdown

Join sales with products and filter out discontinued products.

Inspect the execution plan and verify:
- join reordering
- filter pushdown on the dimension table


In [None]:
# TODO:
# - join sales and products
# - filter discontinued products
# - inspect execution plan

pipeline = (
    sales_l
    # TODO
)

print(pipeline.explain())
pipeline.collect()


## 8. Window Aggregation vs GroupBy

Compute total revenue per city using:

- a group_by aggregation
- a window function

Compare the results conceptually.


In [None]:
# TODO:
# - compute net_value
# - compute revenue per city using group_by
# - compute revenue per city using window functions

# group_by version
grouped = (
    sales_l
    # TODO
)

# window version
windowed = (
    sales_l
    # TODO
)

grouped.collect(), windowed.select(["city", "revenue"]).head().collect()


## 9. Top-N per Group

For each city, return the top 3 users by total revenue.

Use window functions to rank users within each city.


In [None]:
# TODO:
# - compute net_value
# - aggregate revenue per city + user
# - rank users within each city
# - keep only top 3

pipeline = (
    sales_l
    # TODO
)

pipeline.collect()


## 10. Multi-step Optimization Challenge

Build a fully optimized lazy pipeline that:

- joins sales and products
- filters discontinued products
- computes net_value
- aggregates revenue per category
- sorts results

Inspect the execution plan and explain:
- which optimizations are applied


In [None]:
# TODO:
# - full lazy pipeline
# - minimal intermediate steps
# - inspect execution plan

pipeline = (
    sales_l
    # TODO
)

print(pipeline.explain())
pipeline.collect()
