# Homework: Speed Up Your dlt Pipeline

## Goal
Build a `dlt` pipeline using the **Jaffle Shop API** and apply performance optimization techniques:
- Chunking (yielding pages instead of rows)
- Parallelism (`parallelized=True`)
- Buffer control
- File rotation
- Worker tuning

## API Information
- **Base URL**: `https://jaffle-shop.scalevector.ai/api/v1`
- **Docs**: https://jaffle-shop.scalevector.ai/docs
- **Endpoints**: `/customers`, `/orders`, `/products`
- **Pagination**: `page` and `page_size` query parameters; the `Link` response header indicates whether a next page exists
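`requests` parses the `Link` header into `response.links`, which is what the pagination loops below check for a `next` entry. Here is a minimal offline sketch of that parsing with `requests.utils.parse_header_links`. The header value is a hypothetical example, not one captured from the Jaffle Shop API.

```python
from requests.utils import parse_header_links

# Hypothetical Link header value; the real API may return additional relations
link_header = (
    '<https://jaffle-shop.scalevector.ai/api/v1/orders?page=2&page_size=100>; rel="next"'
)

# Index links by their "rel" value, similar to what requests exposes as response.links
links = {
    link.get("rel") or link.get("url"): link
    for link in parse_header_links(link_header)
}

next_url = links.get("next", {}).get("url")
print(next_url)
```

Following `next_url` directly would be an alternative to incrementing `page` by hand. The notebook's paginators only use the header to decide when to stop.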

## Setup

In [None]:
# Install required packages
# !pip install -U "dlt[duckdb]" requests

In [1]:
import os
import time
import dlt
import requests
from typing import Iterator, Dict, Any, List
from threading import current_thread
from dlt.common.typing import TDataItems

---
# Part 1: Naive Implementation (Baseline)

This version uses:
- No parallelization
- Yielding one row at a time
- Default worker settings
- No file rotation

In [2]:
# Reset environment variables to defaults
for key in ["EXTRACT__WORKERS", "NORMALIZE__WORKERS", "LOAD__WORKERS", 
            "DATA_WRITER__BUFFER_MAX_ITEMS", "EXTRACT__DATA_WRITER__FILE_MAX_ITEMS",
            "NORMALIZE__DATA_WRITER__FILE_MAX_ITEMS"]:
    os.environ.pop(key, None)
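The reset cell above pops the tuning variables, which also discards any values set before the notebook ran. The following minimal stdlib sketch scopes the settings to a block instead. `scoped_env` is a hypothetical helper. The sketch assumes dlt reads these variables when a pipeline step runs, which matches how this notebook sets them before calling `run()`, so the `run()` call would go inside the `with` block.

```python
import os
from contextlib import contextmanager
from typing import Dict, Iterator


@contextmanager
def scoped_env(settings: Dict[str, str]) -> Iterator[None]:
    """Hypothetical helper: set env vars for the duration of a block, then restore them."""
    previous = {key: os.environ.get(key) for key in settings}
    os.environ.update(settings)
    try:
        yield
    finally:
        for key, old_value in previous.items():
            if old_value is None:
                os.environ.pop(key, None)  # variable did not exist before the block
            else:
                os.environ[key] = old_value  # restore the original value


before = os.environ.get("EXTRACT__WORKERS")
with scoped_env({"EXTRACT__WORKERS": "3", "NORMALIZE__WORKERS": "4"}):
    # a pipeline.run(...) call would go here so it sees the scoped settings
    inside = (os.environ["EXTRACT__WORKERS"], os.environ["NORMALIZE__WORKERS"])
after = os.environ.get("EXTRACT__WORKERS")
print(inside, before == after)
```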

In [3]:
API_BASE = "https://jaffle-shop.scalevector.ai/api/v1"


def naive_paginate(endpoint: str) -> Iterator[Dict[str, Any]]:
    """Naive pagination - yields one row at a time (inefficient)."""
    page = 1
    while True:
        response = requests.get(
            f"{API_BASE}/{endpoint}",
            params={"page": page, "page_size": 100},
            timeout=30,  # avoid hanging indefinitely on a stalled request
        )
        response.raise_for_status()
        data = response.json()
        
        if not data:
            break
        
        # Yielding one row at a time - INEFFICIENT!
        for item in data:
            yield item
        
        # Check for next page using Link header
        if "next" not in response.links:
            break
        page += 1


@dlt.resource(table_name="customers", write_disposition="replace", primary_key="id")
def naive_customers() -> TDataItems:
    """Naive customers resource - no parallelization, yields rows."""
    yield from naive_paginate("customers")


@dlt.resource(table_name="orders", write_disposition="replace", primary_key="id")
def naive_orders() -> TDataItems:
    """Naive orders resource - no parallelization, yields rows."""
    yield from naive_paginate("orders")


@dlt.resource(table_name="products", write_disposition="replace", primary_key="sku")
def naive_products() -> TDataItems:
    """Naive products resource - no parallelization, yields rows."""
    yield from naive_paginate("products")

In [4]:
# Run the naive pipeline
print("=" * 60)
print("NAIVE PIPELINE (Baseline)")
print("=" * 60)

naive_pipeline = dlt.pipeline(
    pipeline_name="jaffle_shop_naive",
    destination="duckdb",
    dataset_name="jaffle_naive",
    dev_mode=True,
)

naive_start = time.perf_counter()

# Run the full pipeline
load_info = naive_pipeline.run([naive_customers, naive_orders, naive_products])

naive_total_time = time.perf_counter() - naive_start

print(f"\n{'=' * 60}")
print(f"NAIVE TOTAL TIME: {naive_total_time:.2f} seconds")
print(f"{'=' * 60}")
print(naive_pipeline.last_trace)

NAIVE PIPELINE (Baseline)

NAIVE TOTAL TIME: 394.52 seconds
Run started at 2026-01-15 11:04:59.984777+00:00 and COMPLETED in 6 minutes and 34.50 seconds with 4 steps.
Step extract COMPLETED in 6 minutes and 23.53 seconds.

Load package 1768475099.997995 is EXTRACTED and NOT YET LOADED to the destination and contains no failed jobs

Step normalize COMPLETED in 3.83 seconds.
Normalized data for the following tables:
- products: 10 row(s)
- orders: 61948 row(s)
- orders__items: 90900 row(s)
- customers: 935 row(s)
- _dlt_pipeline_state: 1 row(s)

Load package 1768475099.997995 is NORMALIZED and NOT YET LOADED to the destination and contains no failed jobs

Step load COMPLETED in 7.14 seconds.
Pipeline jaffle_shop_naive load step completed in 6.21 seconds
1 load package(s) were loaded to destination duckdb and into dataset jaffle_naive_20260115110459
The duckdb destination used duckdb:////Users/bartoszturkowyd/Projects/Python/dlt-fundamentals-cert/jaffle_shop_naive.duckdb location to store

---
# Part 2: Optimized Implementation

This version applies all performance techniques:
1. **Chunking**: Yield pages instead of individual rows
2. **Parallelism**: Use `parallelized=True` on resources
3. **Source grouping**: Group resources into a single source
4. **Worker tuning**: Configure extract, normalize, and load workers
5. **Buffer control**: Optimize buffer sizes
6. **File rotation**: Split intermediate data into multiple files so normalize and load workers can process them in parallel
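The Jaffle Shop API already returns data in pages, so the optimized paginator can yield each page directly. When a data source only produces individual rows, you can get a similar effect by batching rows before yielding them. Below is a minimal sketch using `itertools.islice`. `batched_rows` is a hypothetical helper, not part of dlt. On Python 3.12+, `itertools.batched` does much the same but yields tuples.

```python
from itertools import islice
from typing import Any, Dict, Iterable, Iterator, List


def batched_rows(
    rows: Iterable[Dict[str, Any]], size: int = 100
) -> Iterator[List[Dict[str, Any]]]:
    """Group an iterable of rows into lists of at most `size` rows."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


# 935 fake rows (the customers row count from this notebook) become 10 chunks
fake_customers = ({"id": i} for i in range(935))
chunks = list(batched_rows(fake_customers, size=100))
print(len(chunks), len(chunks[-1]))  # 10 35
```

Inside a `@dlt.resource`, `yield from batched_rows(row_generator)` would hand dlt one list per batch instead of one dict per row.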

In [5]:
# Configure performance settings
os.environ["EXTRACT__WORKERS"] = "3"  # 3 resources = 3 extract workers
os.environ["NORMALIZE__WORKERS"] = str(os.cpu_count() or 2)  # Use all CPU cores
os.environ["LOAD__WORKERS"] = "5"  # Multiple load threads
os.environ["DATA_WRITER__BUFFER_MAX_ITEMS"] = "10000"  # Larger buffer for fewer writes
os.environ["EXTRACT__DATA_WRITER__FILE_MAX_ITEMS"] = "5000"  # Enable file rotation
os.environ["NORMALIZE__DATA_WRITER__FILE_MAX_ITEMS"] = "5000"  # Enable file rotation

print("Configured workers:")
print(f"  - Extract workers: {os.environ['EXTRACT__WORKERS']}")
print(f"  - Normalize workers: {os.environ['NORMALIZE__WORKERS']}")
print(f"  - Load workers: {os.environ['LOAD__WORKERS']}")
print(f"  - Buffer max items: {os.environ['DATA_WRITER__BUFFER_MAX_ITEMS']}")
print(f"  - Extract file rotation: {os.environ['EXTRACT__DATA_WRITER__FILE_MAX_ITEMS']} items")
print(f"  - Normalize file rotation: {os.environ['NORMALIZE__DATA_WRITER__FILE_MAX_ITEMS']} items")

Configured workers:
  - Extract workers: 3
  - Normalize workers: 8
  - Load workers: 5
  - Buffer max items: 10000
  - Extract file rotation: 5000 items
  - Normalize file rotation: 5000 items


In [6]:
def optimized_paginate(endpoint: str) -> Iterator[List[Dict[str, Any]]]:
    """Optimized pagination - yields entire pages (chunks) instead of rows."""
    page = 1
    while True:
        response = requests.get(
            f"{API_BASE}/{endpoint}",
            params={"page": page, "page_size": 100},
            timeout=30,  # avoid hanging indefinitely on a stalled request
        )
        response.raise_for_status()
        data = response.json()
        
        if not data:
            break
        
        # Yield the entire page as a chunk - EFFICIENT!
        yield data
        
        # Check for next page using Link header
        if "next" not in response.links:
            break
        page += 1


@dlt.resource(
    table_name="customers",
    write_disposition="replace",
    primary_key="id",
    parallelized=True  # Enable parallel extraction
)
def optimized_customers() -> TDataItems:
    """Optimized customers resource - parallelized, yields pages."""
    print(f"  [customers] Running in thread: {current_thread().name}")
    yield from optimized_paginate("customers")


@dlt.resource(
    table_name="orders",
    write_disposition="replace",
    primary_key="id",
    parallelized=True  # Enable parallel extraction
)
def optimized_orders() -> TDataItems:
    """Optimized orders resource - parallelized, yields pages."""
    print(f"  [orders] Running in thread: {current_thread().name}")
    yield from optimized_paginate("orders")


@dlt.resource(
    table_name="products",
    write_disposition="replace",
    primary_key="sku",
    parallelized=True  # Enable parallel extraction
)
def optimized_products() -> TDataItems:
    """Optimized products resource - parallelized, yields pages."""
    print(f"  [products] Running in thread: {current_thread().name}")
    yield from optimized_paginate("products")


@dlt.source
def jaffle_shop_source():
    """Group all resources into a single source for better scheduling."""
    return optimized_customers, optimized_orders, optimized_products
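`parallelized=True` lets dlt run each resource's generator in its own extract thread, as the thread names printed in the next cell's output show. Pages within a single resource, however, are still fetched one after another. The following stdlib-only sketch uses simulated latency to show the consequence: with concurrency, total time tracks the slowest resource rather than the sum. `fetch_pages`, `PAGES`, and the 1 ms latency are hypothetical stand-ins. The page counts are derived from this notebook's row counts at 100 rows per page.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Dict

# Pages per resource assuming 100-row pages (935, 61,948 and 10 rows in this notebook)
PAGES: Dict[str, int] = {"customers": 10, "orders": 620, "products": 1}
LATENCY_S = 0.001  # simulated per-request latency, much smaller than a real API call


def fetch_pages(name: str) -> int:
    """Simulate requesting every page of one resource, one after another."""
    for _ in range(PAGES[name]):
        time.sleep(LATENCY_S)
    return PAGES[name]


# All resources in one thread, as in the naive pipeline
start = time.perf_counter()
for name in PAGES:
    fetch_pages(name)
sequential_s = time.perf_counter() - start

# One thread per resource, loosely mimicking parallelized=True with 3 extract workers
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=3) as pool:
    fetched = sum(pool.map(fetch_pages, PAGES))
parallel_s = time.perf_counter() - start

print(f"pages: {fetched}, sequential: {sequential_s:.2f}s, parallel: {parallel_s:.2f}s")
```

The thread handling `orders` has to make 620 of the 631 requests on its own, so the parallel time cannot drop below the `orders` time. Concurrency across resources pays off most when resources are similar in size.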

In [7]:
# Run the optimized pipeline
print("=" * 60)
print("OPTIMIZED PIPELINE")
print("=" * 60)

optimized_pipeline = dlt.pipeline(
    pipeline_name="jaffle_shop_optimized",
    destination="duckdb",
    dataset_name="jaffle_optimized",
    dev_mode=True,
)

optimized_start = time.perf_counter()

# Run the full pipeline using the source
load_info = optimized_pipeline.run(jaffle_shop_source())

optimized_total_time = time.perf_counter() - optimized_start

print(f"\n{'=' * 60}")
print(f"OPTIMIZED TOTAL TIME: {optimized_total_time:.2f} seconds")
print(f"{'=' * 60}")
print(optimized_pipeline.last_trace)

OPTIMIZED PIPELINE
  [customers] Running in thread: dlt-pool-8673609728-extract-threads_0
  [orders] Running in thread: dlt-pool-8673609728-extract-threads_1
  [products] Running in thread: dlt-pool-8673609728-extract-threads_2

OPTIMIZED TOTAL TIME: 234.01 seconds
Run started at 2026-01-15 11:11:46.857589+00:00 and COMPLETED in 3 minutes and 54.02 seconds with 4 steps.
Step extract COMPLETED in 3 minutes and 43.28 seconds.

Load package 1768475506.865561 is EXTRACTED and NOT YET LOADED to the destination and contains no failed jobs

Step normalize COMPLETED in 4.80 seconds.
Normalized data for the following tables:
- _dlt_pipeline_state: 1 row(s)
- customers: 935 row(s)
- products: 10 row(s)
- orders: 61948 row(s)
- orders__items: 90900 row(s)

Load package 1768475506.865561 is NORMALIZED and NOT YET LOADED to the destination and contains no failed jobs

Step load COMPLETED in 5.93 seconds.
Pipeline jaffle_shop_optimized load step completed in 5.92 seconds
1 load package(s) were loade

---
# Part 3: Performance Comparison

In [None]:
print("\n" + "=" * 60)
print("PERFORMANCE COMPARISON SUMMARY")
print("=" * 60)
print(f"\nNaive Pipeline:     {naive_total_time:.2f} seconds")
print(f"Optimized Pipeline: {optimized_total_time:.2f} seconds")

if naive_total_time > optimized_total_time:
    speedup = naive_total_time / optimized_total_time
    print(f"\nSpeedup: {speedup:.2f}x faster")
    print(f"Time saved: {naive_total_time - optimized_total_time:.2f} seconds")
else:
    print("\nThe optimized version was not faster.")
    print("This can happen with small datasets where parallelization overhead")
    print("exceeds the benefits.")

print("\n" + "=" * 60)


PERFORMANCE COMPARISON SUMMARY

Naive Pipeline:     394.52 seconds
Optimized Pipeline: 234.01 seconds

🚀 Speedup: 1.69x faster!
   Time saved: 160.51 seconds
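Each configuration above was timed once, so network latency during a given run can shift the comparison. Below is a minimal sketch of repeating a measurement and reporting the median. `time_runs` is a hypothetical helper, and the workload is a placeholder `time.sleep` call. In this notebook it could wrap a fresh `dev_mode=True` pipeline run.

```python
import statistics
import time
from typing import Callable, List


def time_runs(fn: Callable[[], object], repeats: int = 3) -> List[float]:
    """Hypothetical helper: call `fn` several times and return wall-clock durations."""
    durations = []
    for _ in range(repeats):
        start = time.perf_counter()
        fn()
        durations.append(time.perf_counter() - start)
    return durations


# Placeholder workload standing in for a full pipeline run
durations = time_runs(lambda: time.sleep(0.01), repeats=5)
print(f"median: {statistics.median(durations):.3f}s, min: {min(durations):.3f}s")
```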



---
# Part 4: Data Verification

Verify that both pipelines loaded the same number of rows into each top-level table.

In [None]:
import duckdb

# Connect to the DuckDB databases
naive_conn = duckdb.connect(f"{naive_pipeline.pipeline_name}.duckdb")
optimized_conn = duckdb.connect(f"{optimized_pipeline.pipeline_name}.duckdb")

# Get the actual dataset names from the pipelines
naive_dataset = naive_pipeline.dataset_name
optimized_dataset = optimized_pipeline.dataset_name

print(f"Naive dataset: {naive_dataset}")
print(f"Optimized dataset: {optimized_dataset}")
print("\nRow counts comparison:")
print("-" * 40)

for table in ["customers", "orders", "products"]:
    naive_count = naive_conn.execute(f"SELECT COUNT(*) FROM {naive_dataset}.{table}").fetchone()[0]
    optimized_count = optimized_conn.execute(f"SELECT COUNT(*) FROM {optimized_dataset}.{table}").fetchone()[0]
    match = "MATCH" if naive_count == optimized_count else "MISMATCH"
    print(f"{table:12} | Naive: {naive_count:5} | Optimized: {optimized_count:5} | {match}")

naive_conn.close()
optimized_conn.close()

Naive dataset: jaffle_naive_20260115110459
Optimized dataset: jaffle_optimized_20260115111146

Row counts comparison:
----------------------------------------
customers    | Naive:   935 | Optimized:   935 | ✅
orders       | Naive: 61948 | Optimized: 61948 | ✅
products     | Naive:    10 | Optimized:    10 | ✅


---
# Explanation: What Changes Made the Biggest Difference?

The optimized pipeline achieved a 1.69x speedup (394 seconds to 234 seconds).

## Key Changes by Impact

**High Impact:**
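1. Yielding pages instead of rows - reduced the number of yielded items from 62,893 rows to about 631 pages
2. Parallel extraction (`parallelized=True`) - each resource's generator runs in its own extract thread, so the three resources are fetched concurrently (pages within a resource are still requested one after another)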

**Medium Impact:**
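3. Worker configuration - 3 extract workers, 8 normalize workers, 5 load workers (per the traces above, normalize and load together took about 11 seconds in both runs, so nearly all of the time saved came from the extract step)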
4. Source grouping - enables dlt's resource scheduling

**Low Impact:**
5. Buffer size increase (5,000 to 10,000 items)
6. File rotation (limited effect at this data volume)
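The figures in this section can be reproduced from the row counts reported by the normalize step, assuming the 100-row page size used in both paginators. The sketch below also computes a rough upper bound for resource-level concurrency under a simplifying assumption: every page takes the same time. Because `orders` holds 620 of the 631 pages, overlapping the three resources alone could shorten extraction by only about 2%.

```python
import math

PAGE_SIZE = 100
# Row counts from the normalize step output above
rows = {"customers": 935, "orders": 61_948, "products": 10}

pages = {table: math.ceil(count / PAGE_SIZE) for table, count in rows.items()}
total_rows = sum(rows.values())
total_pages = sum(pages.values())

# Upper bound on extract speedup from running resources concurrently,
# assuming equal cost per page and sequential pages within each resource
concurrency_bound = total_pages / max(pages.values())

speedup = 394.52 / 234.01  # measured total times from Part 3

print(total_rows, total_pages, pages["orders"])  # 62893 631 620
print(f"{concurrency_bound:.3f} {speedup:.2f}")  # 1.018 1.69
```

Under that equal-cost assumption, most of the measured 1.69x presumably came from something other than overlapping the three resources. Candidates include lower per-item overhead, or HTTP waits overlapping with dlt's own processing. The notebook does not isolate these effects.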

## Conclusion

The extract stage was the primary bottleneck. Chunking and parallelization provided the most significant improvements by reducing function call overhead and enabling concurrent API requests.