# Session 3: Lazy Evaluation and Query Optimization

This session covers Polars' most powerful feature: **lazy evaluation**. You'll learn how to write optimized queries that can handle massive datasets efficiently.

## Learning Objectives

By the end of this session, you will be able to:
1. Understand the difference between eager and lazy execution
2. Create and work with LazyFrames
3. Inspect and understand query plans
4. Apply query optimization techniques
5. Use streaming for large datasets
6. Benchmark Polars performance vs Pandas

In [None]:
import polars as pl
import pandas as pd
import time

print(f"Polars version: {pl.__version__}")

## 1. Eager vs Lazy Execution

### What is Eager Execution?

**Eager** execution means operations are performed immediately when called. This is how Pandas works, and it's the default mode for Polars DataFrames.

```python
# Each operation executes immediately
df = pl.read_csv("data.csv")  # Reads entire file now
df = df.filter(...)            # Filters all rows now
df = df.select(...)            # Selects columns now
```

### What is Lazy Execution?

**Lazy** execution means operations are recorded but not executed until you explicitly request the result. This allows Polars to optimize the entire query plan.

```python
# Operations are recorded, not executed
lf = pl.scan_csv("data.csv")   # Doesn't read file yet
lf = lf.filter(...)            # Adds filter to plan
lf = lf.select(...)            # Adds select to plan
df = lf.collect()              # NOW everything executes (optimized!)
```

### Benefits of Lazy Evaluation

1. **Automatic optimization**: Polars can reorder and combine operations
2. **Predicate pushdown**: Filters are pushed to the earliest possible point
3. **Projection pushdown**: Only needed columns are read from disk
4. **Memory efficiency**: Intermediate results aren't stored
5. **Parallelization**: Operations can be automatically parallelized

### Pandas Comparison: Execution Models

| Aspect | Pandas | Polars Eager | Polars Lazy |
|--------|--------|--------------|-------------|
| Execution | Immediate | Immediate | Deferred |
| Optimization | None | None | Automatic |
| Memory | Stores intermediates | Stores intermediates | Optimized |
| File reading | Full file at once | Full file at once | Only needed columns |

**Pandas equivalent of lazy evaluation**: Pandas doesn't have native lazy evaluation. The closest concept is using generators or Dask for deferred computation. Polars' lazy mode is a built-in feature that requires no additional libraries.

## 2. Creating LazyFrames

There are two ways to create a LazyFrame:

In [None]:
# Method 1: Scan a file (recommended for large files)
lf = pl.scan_csv("data/large_transactions.csv")
print(f"Type: {type(lf)}")
print(lf)

In [None]:
# Method 2: Convert existing DataFrame to lazy
df = pl.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
lf = df.lazy()
print(f"Type: {type(lf)}")

### Pandas Comparison: File Reading

| Polars Lazy | Polars Eager | Pandas |
|-------------|--------------|--------|
| `pl.scan_csv()` | `pl.read_csv()` | `pd.read_csv()` |
| `pl.scan_parquet()` | `pl.read_parquet()` | `pd.read_parquet()` |
| `pl.scan_ndjson()` | `pl.read_ndjson()` | `pd.read_json(lines=True)` |

**Key difference**: Pandas has no equivalent to `scan_*` functions. It always reads the entire file immediately. To achieve similar lazy behavior in Pandas, you would need external libraries like Dask (`dask.dataframe.read_csv()`).

### Key Differences: scan_* vs read_*

| Eager | Lazy | Description |
|-------|------|-------------|
| `pl.read_csv()` | `pl.scan_csv()` | CSV files |
| `pl.read_parquet()` | `pl.scan_parquet()` | Parquet files |
| `pl.read_ndjson()` | `pl.scan_ndjson()` | NDJSON (newline-delimited JSON) files |

The `scan_*` functions don't load the file's data. They return a LazyFrame that describes how to read it, and the actual read happens when you call `collect()`.

## 3. Executing Queries: `collect()`

The `collect()` method executes the lazy query and returns a DataFrame.

In [None]:
# Build a lazy query
lf = (
    pl.scan_csv("data/large_transactions.csv")
    .filter(pl.col("transaction_type") == "purchase")
    .filter(pl.col("amount") > 100)
    .select("transaction_id", "account_id", "amount", "merchant")
)

# At this point, no computation has happened!
print("Query built (not executed yet)")
print(f"Type: {type(lf)}")

In [None]:
# Execute the query
result = lf.collect()
print(f"Result shape: {result.shape}")
result.head()

## 4. Inspecting Query Plans: `explain()`

One of the most powerful features of lazy evaluation is the ability to see and understand the query plan.

In [None]:
# Build a query
lf = (
    pl.scan_csv("data/large_transactions.csv")
    .filter(pl.col("transaction_type") == "purchase")
    .filter(pl.col("amount") > 100)
    .group_by("merchant")
    .agg(
        pl.len().alias("count"),
        pl.col("amount").sum().alias("total")
    )
    .sort("total", descending=True)
)

# Show the optimized plan
print("OPTIMIZED QUERY PLAN:")
print(lf.explain())

### Pandas Comparison: Query Plans

| Feature | Pandas | Polars |
|---------|--------|--------|
| View query plan | Not available | `lf.explain()` |
| Optimize queries | Manual only | Automatic |
| Predicate pushdown | Not available | Automatic |
| Projection pushdown | Not available | Automatic |

**Why Pandas can't do this**: Pandas executes operations immediately, so there's no "plan" to inspect or optimize. Each operation runs as soon as it's called. Polars' lazy evaluation records operations first, then optimizes the entire chain before execution.

In [None]:
# Show the unoptimized (naive) plan
print("UNOPTIMIZED QUERY PLAN:")
print(lf.explain(optimized=False))

### Understanding Query Plans

Key elements in query plans (the exact labels can vary between Polars versions):

- **Csv SCAN**: Reading from CSV file
- **SELECTION**: Filter predicates pushed down
- **PROJECT**: Columns being read/selected
- **AGGREGATE**: Groupby operations
- **SORT**: Sorting operations

Notice how the optimized plan pushes the filters down to the file scan level!

## 5. Query Optimizations

### 5.1 Predicate Pushdown

Filters are pushed as close to the data source as possible.

In [None]:
# Example: These two filters will be combined and pushed to the scan
lf = (
    pl.scan_csv("data/large_transactions.csv")
    .with_columns(pl.col("amount").abs().alias("abs_amount"))
    .filter(pl.col("transaction_type") == "purchase")  # Filter 1
    .select("transaction_id", "amount", "merchant")
    .filter(pl.col("amount") > 200)  # Filter 2
)

print(lf.explain())

### 5.2 Projection Pushdown

Only the columns you actually need are read from disk.

In [None]:
# Even though the file has many columns, only 3 will be read
lf = (
    pl.scan_csv("data/large_transactions.csv")
    .select("transaction_id", "amount", "merchant")
)

print(lf.explain())

### 5.3 Common Subexpression Elimination

Polars detects when the same computation is used multiple times and computes it only once.

In [None]:
# The mean is computed once, not twice
lf = (
    pl.scan_csv("data/large_transactions.csv")
    .filter(pl.col("amount") > 0)
    .select(
        pl.col("amount"),
        pl.col("amount").mean().alias("avg_amount"),
        (pl.col("amount") - pl.col("amount").mean()).alias("diff_from_avg")
    )
)

print(lf.explain())

## 6. Performance Benchmarking

Let's compare performance between Pandas, Polars Eager, and Polars Lazy.

In [None]:
def benchmark_pandas():
    """Benchmark Pandas operations."""
    df = pd.read_csv("data/large_transactions.csv")
    df = df[df["transaction_type"] == "purchase"]
    df = df[df["amount"] > 100]
    result = df.groupby("merchant")["amount"].agg(["count", "sum"])
    result = result.sort_values("sum", ascending=False)
    return result

def benchmark_polars_eager():
    """Benchmark Polars eager operations."""
    df = pl.read_csv("data/large_transactions.csv")
    df = df.filter(pl.col("transaction_type") == "purchase")
    df = df.filter(pl.col("amount") > 100)
    result = df.group_by("merchant").agg(
        pl.len().alias("count"),
        pl.col("amount").sum().alias("sum")
    ).sort("sum", descending=True)
    return result

def benchmark_polars_lazy():
    """Benchmark Polars lazy operations."""
    result = (
        pl.scan_csv("data/large_transactions.csv")
        .filter(pl.col("transaction_type") == "purchase")
        .filter(pl.col("amount") > 100)
        .group_by("merchant")
        .agg(
            pl.len().alias("count"),
            pl.col("amount").sum().alias("sum")
        )
        .sort("sum", descending=True)
        .collect()
    )
    return result

In [None]:
# Run benchmarks
print("Running benchmarks (3 iterations each)...\n")

# Pandas benchmark
times_pandas = []
for _ in range(3):
    start = time.perf_counter()  # monotonic, high-resolution timer
    result_pandas = benchmark_pandas()
    times_pandas.append(time.perf_counter() - start)

# Polars eager benchmark
times_eager = []
for _ in range(3):
    start = time.perf_counter()
    result_eager = benchmark_polars_eager()
    times_eager.append(time.perf_counter() - start)

# Polars lazy benchmark
times_lazy = []
for _ in range(3):
    start = time.perf_counter()
    result_lazy = benchmark_polars_lazy()
    times_lazy.append(time.perf_counter() - start)

# Print results
print(f"{'Method':<20} {'Avg Time (s)':<15} {'Speedup vs Pandas'}")
print("-" * 55)

avg_pandas = sum(times_pandas) / len(times_pandas)
avg_eager = sum(times_eager) / len(times_eager)
avg_lazy = sum(times_lazy) / len(times_lazy)

print(f"{'Pandas':<20} {avg_pandas:<15.4f} {'1.0x (baseline)'}")
print(f"{'Polars (eager)':<20} {avg_eager:<15.4f} {avg_pandas/avg_eager:.1f}x")
print(f"{'Polars (lazy)':<20} {avg_lazy:<15.4f} {avg_pandas/avg_lazy:.1f}x")
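
The three timing loops above repeat the same pattern, so they could be factored into a small helper. The sketch below is one possible version; `time_call` is a hypothetical name, not part of Polars or the course code, and it is demonstrated on a trivial function rather than the real benchmarks.

```python
import statistics
import time


def time_call(func, repeats=3):
    """Return (mean, best) wall-clock seconds over `repeats` runs of func()."""
    durations = []
    for _ in range(repeats):
        start = time.perf_counter()
        func()
        durations.append(time.perf_counter() - start)
    return statistics.mean(durations), min(durations)


# Demonstration on a trivial workload; pass benchmark_pandas etc. in practice
mean_s, best_s = time_call(lambda: sum(range(100_000)))
print(f"mean={mean_s:.6f}s best={best_s:.6f}s")
```

Reporting the best run alongside the mean helps separate the cost of the operation from one-off effects such as a cold file cache on the first iteration.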

### Pandas Comparison: Large Dataset Processing

| Approach | Pandas | Polars |
|----------|--------|--------|
| Process in chunks | `pd.read_csv(chunksize=1000)` | `lf.collect(streaming=True)` |
| Memory efficiency | Manual chunk management | Automatic |
| Code complexity | Requires iteration logic | Single line change |

```python
# Pandas chunked processing (manual)
chunks = []
for chunk in pd.read_csv("large.csv", chunksize=10000):
    result = chunk.groupby("col")["value"].sum()  # same column as the Polars version
    chunks.append(result)
final = pd.concat(chunks).groupby(level=0).sum()

# Polars streaming (automatic)
result = (
    pl.scan_csv("large.csv")
    .group_by("col").agg(pl.sum("value"))
    .collect(streaming=True)
)
```

## 7. Streaming for Large Datasets

For datasets that don't fit in memory, Polars supports **streaming** execution.

In [None]:
# Streaming processes data in chunks
result = (
    pl.scan_csv("data/large_transactions.csv")
    .filter(pl.col("amount") > 0)
    .group_by("transaction_type")
    .agg(
        pl.len().alias("count"),
        pl.col("amount").sum().alias("total"),
        pl.col("amount").mean().alias("avg")
    )
    .collect(streaming=True)  # Enable streaming
)

result

### When to Use Streaming

Use `collect(streaming=True)` when:
- Dataset is larger than available memory
- You want to limit memory usage
- Processing very large files

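Note: Not all operations support streaming. Polars will fall back to non-streaming if needed. Newer Polars releases deprecate the `streaming=True` argument in favor of `collect(engine="streaming")`, so check the version printed at the top of this notebook if you see a deprecation warning.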

## 8. Window Functions with `over()`

Window functions allow calculations across groups without reducing the number of rows.

In [None]:
# Load data
transactions = pl.read_csv("data/large_transactions.csv").head(1000)  # Sample for demo

# Window functions: compute stats per group while keeping all rows
result = transactions.with_columns(
    # Average amount per account
    pl.col("amount").mean().over("account_id").alias("account_avg"),
    
    # Rank within each account (by amount)
    pl.col("amount").rank().over("account_id").alias("rank_in_account"),
    
    # Count of transactions per account
    pl.len().over("account_id").alias("account_tx_count"),
    
    # Difference from account average
    (pl.col("amount") - pl.col("amount").mean().over("account_id")).alias("diff_from_avg")
)

result.select(
    "transaction_id", "account_id", "amount", 
    "account_avg", "rank_in_account", "account_tx_count", "diff_from_avg"
).head(10)

### Pandas Comparison: Window Functions

```python
# Pandas uses transform with groupby
df["account_avg"] = df.groupby("account_id")["amount"].transform("mean")
df["rank"] = df.groupby("account_id")["amount"].rank()
```

Polars' `.over()` works on any expression, so several window calculations can sit side by side in a single `with_columns()` call instead of one `groupby().transform()` per column.

## 9. Best Practices for Query Optimization

### DO:
1. **Use lazy mode** (`scan_*`) for large files
2. **Filter early** - put filters before expensive operations
3. **Select only needed columns** - projection pushdown saves I/O
4. **Chain operations** - allows better optimization
5. **Use `explain()`** to understand your query plan
6. **Use Parquet** instead of CSV for large datasets

### DON'T:
1. **Don't call `collect()` multiple times** - execute once at the end
2. **Don't mix eager and lazy unnecessarily**
3. **Don't use `map_elements()` (formerly `apply()`) with Python functions** - it's slow
4. **Don't iterate row by row** - use vectorized operations

In [None]:
# Example: Good query structure
good_query = (
    pl.scan_csv("data/large_transactions.csv")
    # Filter early (pushdown)
    .filter(pl.col("transaction_type") == "purchase")
    .filter(pl.col("amount") > 50)
    # Select only needed columns (projection pushdown)
    .select("account_id", "amount", "merchant")
    # Aggregate
    .group_by("merchant")
    .agg(
        pl.len().alias("count"),
        pl.col("amount").sum().alias("total")
    )
    .sort("total", descending=True)
    .head(10)
    # Execute once at the end
    .collect()
)

good_query

## 10. Practical Example: Full Analysis Pipeline

Let's build a complete analysis pipeline using lazy evaluation.

In [None]:
# Comprehensive transaction analysis
analysis = (
    pl.scan_csv("data/large_transactions.csv")
    # Parse timestamp
    .with_columns(
        pl.col("timestamp").str.to_datetime().alias("datetime")
    )
    # Extract date components
    .with_columns(
        pl.col("datetime").dt.month().alias("month"),
        pl.col("datetime").dt.weekday().alias("weekday"),
        pl.col("datetime").dt.hour().alias("hour")
    )
    # Filter to purchases only
    .filter(pl.col("transaction_type") == "purchase")
    # Aggregate by month
    .group_by("month")
    .agg(
        pl.len().alias("transaction_count"),
        pl.col("amount").sum().alias("total_revenue"),
        pl.col("amount").mean().alias("avg_transaction"),
        pl.col("account_id").n_unique().alias("unique_customers"),
        pl.col("is_flagged").sum().alias("flagged_count")
    )
    .sort("month")
    .collect()
)

print("Monthly Analysis:")
analysis

## Summary: Lazy Evaluation Cheat Sheet

| Eager | Lazy | Description |
|-------|------|-------------|
| `pl.read_csv()` | `pl.scan_csv()` | Read/scan CSV |
| `pl.read_parquet()` | `pl.scan_parquet()` | Read/scan Parquet |
| `DataFrame` | `LazyFrame` | Data structure |
| Operations execute immediately | Operations are deferred | Execution model |
| N/A | `lf.collect()` | Execute lazy query |
| N/A | `lf.explain()` | View query plan |
| `df.lazy()` | N/A | Convert to lazy |

### Key Optimizations

- **Predicate pushdown**: Filters applied at scan time
- **Projection pushdown**: Only needed columns read
- **CSE**: Common subexpressions computed once
- **Streaming**: Process larger-than-memory data

## Practice Exercises

1. Write a lazy query that finds the top 5 merchants by transaction count
2. Use `explain()` to compare the optimized vs unoptimized plan
3. Benchmark lazy vs eager for a groupby aggregation on the full dataset
4. Use window functions to calculate each transaction's percentile within its account

In [None]:
# Exercise 1: Top 5 merchants


In [None]:
# Exercise 2: Compare plans


In [None]:
# Exercise 3: Benchmark


In [None]:
# Exercise 4: Window percentiles


## Course Conclusion

Congratulations! You've completed the Advanced Tech Track on Polars. You've learned:

1. **Session 1**: Polars basics, DataFrames, expressions, and the Pandas comparison
2. **Session 2**: Data manipulation - filtering, transformations, groupby, joins
3. **Session 3**: Lazy evaluation, query optimization, and performance

### Next Steps

- Practice with your own datasets
- Explore the [Polars documentation](https://docs.pola.rs/)
- Join the [Polars Discord community](https://discord.gg/4UfP5cfBE7)
- Try more advanced features: custom expressions, plugins, cloud integration