# 05 - ETL Pipeline Example

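A realistic Extract, Transform, Load (ETL) pipeline: raw records are validated with Pydantic, transformed with Polars, and re-validated before loading. The input data is simulated inline, so the notebook runs without any external source.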

In [1]:
import math
from datetime import datetime
from typing import Optional, List

import polars as pl
from pydantic import BaseModel, Field
from poldantic import to_polars_schema, to_pydantic_model

## 1. Define Data Models

In [2]:
class RawOrder(BaseModel):
    """One order record as extracted from the source system.

    Numeric constraints reject obviously invalid rows at the Extract boundary
    instead of letting them flow into the Polars transforms.
    """

    order_id: int
    customer_name: str
    product: str
    quantity: int = Field(gt=0)  # an order must contain at least one unit
    price: float = Field(ge=0)  # unit price; free items (0.0) are allowed
    status: str


class ProcessedOrder(RawOrder):
    """A validated order enriched by the Transform step.

    Inherits every raw field (and its constraints) so the two models cannot
    drift apart, and adds the computed ``total`` produced in Polars
    as ``quantity * price``.
    """

    total: float


print('Models defined')

Models defined


## 2. Extract - Read and Validate Raw Data

In [3]:
# Simulate raw data extraction: in a real pipeline these records would be
# read from an API, file, or database instead of being written inline.
raw_data = [
    dict(order_id=1, customer_name='Alice', product='Widget', quantity=5, price=10.0, status='pending'),
    dict(order_id=2, customer_name='Bob', product='Gadget', quantity=3, price=25.0, status='completed'),
    dict(order_id=3, customer_name='Charlie', product='Widget', quantity=10, price=10.0, status='pending'),
    dict(order_id=4, customer_name='Diana', product='Gizmo', quantity=2, price=50.0, status='completed'),
]

# Run every raw record through the RawOrder model; a record that does not
# match the model raises a pydantic ValidationError at this point.
validated_orders = list(map(RawOrder.model_validate, raw_data))
print(f'Extracted {len(validated_orders)} orders')

Extracted 4 orders


## 3. Transform - Process with Polars

In [4]:
# Derive the Polars schema from the Pydantic model so the DataFrame's column
# dtypes come from the same definition that validated the input.
schema = to_polars_schema(RawOrder)
df = pl.DataFrame(
    data=[rec.model_dump() for rec in validated_orders],
    schema=schema,
)

print('Original data:', df, sep='\n')

Original data:
shape: (4, 6)
┌──────────┬───────────────┬─────────┬──────────┬───────┬───────────┐
│ order_id ┆ customer_name ┆ product ┆ quantity ┆ price ┆ status    │
│ ---      ┆ ---           ┆ ---     ┆ ---      ┆ ---   ┆ ---       │
│ i64      ┆ str           ┆ str     ┆ i64      ┆ f64   ┆ str       │
╞══════════╪═══════════════╪═════════╪══════════╪═══════╪═══════════╡
│ 1        ┆ Alice         ┆ Widget  ┆ 5        ┆ 10.0  ┆ pending   │
│ 2        ┆ Bob           ┆ Gadget  ┆ 3        ┆ 25.0  ┆ completed │
│ 3        ┆ Charlie       ┆ Widget  ┆ 10       ┆ 10.0  ┆ pending   │
│ 4        ┆ Diana         ┆ Gizmo   ┆ 2        ┆ 50.0  ┆ completed │
└──────────┴───────────────┴─────────┴──────────┴───────┴───────────┘


In [5]:
# Transform: append a `total` column holding quantity * price for each order.
df_transformed = df.with_columns(
    total=pl.col('quantity') * pl.col('price'),
)

print('\nWith totals:', df_transformed, sep='\n')


With totals:
shape: (4, 7)
┌──────────┬───────────────┬─────────┬──────────┬───────┬───────────┬───────┐
│ order_id ┆ customer_name ┆ product ┆ quantity ┆ price ┆ status    ┆ total │
│ ---      ┆ ---           ┆ ---     ┆ ---      ┆ ---   ┆ ---       ┆ ---   │
│ i64      ┆ str           ┆ str     ┆ i64      ┆ f64   ┆ str       ┆ f64   │
╞══════════╪═══════════════╪═════════╪══════════╪═══════╪═══════════╪═══════╡
│ 1        ┆ Alice         ┆ Widget  ┆ 5        ┆ 10.0  ┆ pending   ┆ 50.0  │
│ 2        ┆ Bob           ┆ Gadget  ┆ 3        ┆ 25.0  ┆ completed ┆ 75.0  │
│ 3        ┆ Charlie       ┆ Widget  ┆ 10       ┆ 10.0  ┆ pending   ┆ 100.0 │
│ 4        ┆ Diana         ┆ Gizmo   ┆ 2        ┆ 50.0  ┆ completed ┆ 100.0 │
└──────────┴───────────────┴─────────┴──────────┴───────┴───────────┴───────┘


In [6]:
# Keep only the rows whose status is exactly 'completed'.
df_completed = df_transformed.filter(pl.col('status').eq('completed'))

print('\nCompleted orders:', df_completed, sep='\n')


Completed orders:
shape: (2, 7)
┌──────────┬───────────────┬─────────┬──────────┬───────┬───────────┬───────┐
│ order_id ┆ customer_name ┆ product ┆ quantity ┆ price ┆ status    ┆ total │
│ ---      ┆ ---           ┆ ---     ┆ ---      ┆ ---   ┆ ---       ┆ ---   │
│ i64      ┆ str           ┆ str     ┆ i64      ┆ f64   ┆ str       ┆ f64   │
╞══════════╪═══════════════╪═════════╪══════════╪═══════╪═══════════╪═══════╡
│ 2        ┆ Bob           ┆ Gadget  ┆ 3        ┆ 25.0  ┆ completed ┆ 75.0  │
│ 4        ┆ Diana         ┆ Gizmo   ┆ 2        ┆ 50.0  ┆ completed ┆ 100.0 │
└──────────┴───────────────┴─────────┴──────────┴───────┴───────────┴───────┘


In [7]:
# Aggregate per-product statistics over all orders in df_transformed.
# NOTE(review): this includes 'pending' orders, so `total_revenue` is gross
# order value rather than realized revenue — use df_completed if only
# completed sales should count.
stats = (
    df_transformed
    .group_by('product')
    .agg(
        pl.col('quantity').sum().alias('total_quantity'),
        pl.col('total').sum().alias('total_revenue'),
        pl.col('order_id').count().alias('order_count'),
    )
    # group_by does not guarantee row order, so break revenue ties by product
    # name; otherwise tied rows could appear in a different order on each run.
    .sort(['total_revenue', 'product'], descending=[True, False])
)

print('\nProduct statistics:')
print(stats)


Product statistics:
shape: (3, 4)
┌─────────┬────────────────┬───────────────┬─────────────┐
│ product ┆ total_quantity ┆ total_revenue ┆ order_count │
│ ---     ┆ ---            ┆ ---           ┆ ---         │
│ str     ┆ i64            ┆ f64           ┆ u32         │
╞═════════╪════════════════╪═══════════════╪═════════════╡
│ Widget  ┆ 15             ┆ 150.0         ┆ 2           │
│ Gizmo   ┆ 2              ┆ 100.0         ┆ 1           │
│ Gadget  ┆ 3              ┆ 75.0          ┆ 1           │
└─────────┴────────────────┴───────────────┴─────────────┘


## 4. Load - Validate Output Before Loading

In [8]:
# Number of validated orders echoed below as a spot-check.
PREVIEW_ROWS = 3

# Convert back to Pydantic: every transformed row is re-validated against
# ProcessedOrder before it would be handed to the load target.
output_dicts = df_transformed.to_dicts()
validated_output = [ProcessedOrder(**order) for order in output_dicts]

# Invariants: no rows were lost in the round-trip, and every total still
# matches the Transform rule (quantity * price).
assert len(validated_output) == len(df_transformed), 'row count changed during validation'
assert all(
    math.isclose(order.total, order.quantity * order.price)
    for order in validated_output
), 'total does not equal quantity * price'

print(f'\nValidated {len(validated_output)} processed orders:')
for order in validated_output[:PREVIEW_ROWS]:
    print(f'  Order {order.order_id}: ${order.total:.2f}')


Validated 4 processed orders:
  Order 1: $50.00
  Order 2: $75.00
  Order 3: $100.00


## Summary

This ETL pipeline demonstrates:
1. ✅ Input validation with Pydantic
2. ✅ High-performance transformations with Polars
3. ✅ Schema conversion from Pydantic models to Polars dtypes (`to_polars_schema`)
4. ✅ Output validation before loading
5. ✅ Runtime schema checks at both ends of the pipeline (input and output)