# **Homework: Speed up your pipeline**

### **Goal**

Use the public **Jaffle Shop API** to build a `dlt` pipeline and apply everything you've learned about performance:

- Chunking
- Parallelism
- Buffer control
- File rotation
- Worker tuning

Your task is to **make the pipeline as fast as possible**, while keeping the results correct.

### **What you’ll need**

- API base: `https://jaffle-shop.scalevector.ai/api/v1`
- Docs: [https://jaffle-shop.scalevector.ai/docs](https://jaffle-shop.scalevector.ai/docs)
- Start with these endpoints:
  - `/customers`
  - `/orders`
  - `/products`

Each of them returns **paged responses**, so you'll need to handle pagination.
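Handling pagination means requesting a page, finding out where the next page is, and repeating until there is none. The sketch below shows that loop against a made-up in-memory API, assuming each response carries a link to the next page. `FAKE_API` and `paginate` are hypothetical names used only for illustration; in the homework, `RESTClient` with a paginator takes care of this loop.

```python
from collections.abc import Iterator
from typing import Optional

# Made-up responses: URL -> (rows in the page, URL of the next page or None).
FAKE_API: dict[str, tuple[list[int], Optional[str]]] = {
    "/items?page=1": ([1, 2, 3], "/items?page=2"),
    "/items?page=2": ([4, 5, 6], "/items?page=3"),
    "/items?page=3": ([7], None),
}


def paginate(first_url: str) -> Iterator[list[int]]:
    """Yield one page at a time, following next-page links until none is left."""
    url: Optional[str] = first_url
    while url is not None:
        rows, url = FAKE_API[url]
        yield rows


pages = list(paginate("/items?page=1"))
print(pages)
```

Note that the generator yields whole pages rather than single rows, which is the shape the chunking technique relies on.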

### **What to implement**

1. **Extract** from the API using `dlt`

   - Use `dlt.resource` and [`RESTClient`](https://dlthub.com/docs/devel/general-usage/http/rest-client) with proper pagination

2. **Apply all performance techniques**

   - Group resources into sources
   - Yield **chunks/pages**, not single rows
   - Use `parallelized=True`
   - Set `EXTRACT__WORKERS`, `NORMALIZE__WORKERS`, and `LOAD__WORKERS`
   - Tune buffer sizes and enable **file rotation**

3. **Measure performance**
   - Time the extract, normalize, and load stages separately
   - Compare a naive version vs. optimized version
   - Log thread info or `pipeline.last_trace` if helpful
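One way to time the stages separately is a small helper that runs each step in order and records its wall-clock duration. The sketch below is library-agnostic: `time_stages` is a hypothetical helper, and the lambdas are stand-ins that only sleep.

```python
import time
from collections.abc import Callable


def time_stages(stages: dict[str, Callable[[], object]]) -> dict[str, float]:
    """Run each stage in order and return its wall-clock duration in seconds."""
    timings: dict[str, float] = {}
    for name, run_stage in stages.items():
        start = time.perf_counter()
        run_stage()
        timings[name] = time.perf_counter() - start
    return timings


# Stand-in stages; each sleeps briefly to simulate work.
timings = time_stages(
    {
        "extract": lambda: time.sleep(0.03),
        "normalize": lambda: time.sleep(0.01),
        "load": lambda: time.sleep(0.02),
    }
)
for name, seconds in timings.items():
    print(f"{name}: {seconds:.2f} s")
```

With a real pipeline, the stand-ins could be replaced by calls to `pipeline.extract(...)`, `pipeline.normalize()`, and `pipeline.load()`, which run the three steps individually instead of through `pipeline.run`.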

### **Deliverables**

Share your code as a Google Colab notebook or [GitHub Gist](https://gist.github.com/) in the Homework Google Form. **This step is required for certification.**

It should include:

- Working pipeline for at least 2 endpoints
- Before/after timing comparison
- A short explanation of which changes made the biggest difference, if any


In [None]:
%%capture
%pip install dlt[duckdb]


## **Naive Version**

In this first version, the pipeline extracts data from three endpoints:

- `/customers`
- `/orders`
- `/products`

This implementation:

- Uses a small page size (`100`) for pagination.
- Yields every row individually (`yield from page`) instead of whole pages.
- Runs sequentially without parallelization.
- Uses default buffer and worker settings.

This serves as a baseline to compare the impact of later optimizations.


In [None]:
from collections.abc import Iterator
from typing import Any

import dlt
from dlt.sources.helpers.rest_client import PageData, RESTClient
from dlt.sources.helpers.rest_client.paginators import HeaderLinkPaginator


@dlt.resource(name="customers")
def get_customers() -> Iterator[Any]:
    client = RESTClient(
        base_url="https://jaffle-shop.scalevector.ai/api/v1",
        paginator=HeaderLinkPaginator(),
    )
    params = {"page_size": 100}  # small pages: many round trips
    for page in client.paginate("/customers", params=params):
        # Naive on purpose: emit rows one at a time instead of the whole page.
        yield from page


@dlt.resource(name="orders")
def get_orders() -> Iterator[Any]:
    client = RESTClient(
        base_url="https://jaffle-shop.scalevector.ai/api/v1",
        paginator=HeaderLinkPaginator(),
    )
    params = {"page_size": 100}  # small pages: many round trips
    for page in client.paginate("/orders", params=params):
        # Naive on purpose: emit rows one at a time instead of the whole page.
        yield from page


@dlt.resource(name="products")
def get_products() -> Iterator[Any]:
    client = RESTClient(
        base_url="https://jaffle-shop.scalevector.ai/api/v1",
        paginator=HeaderLinkPaginator(),
    )
    params = {"page_size": 100}  # small pages: many round trips
    for page in client.paginate("/products", params=params):
        # Naive on purpose: emit rows one at a time instead of the whole page.
        yield from page


pipeline = dlt.pipeline(
    pipeline_name="jaffle_shop_pipeline",
    destination="duckdb",
    dataset_name="jaffle_shop",
    dev_mode=True,
)

load_info = pipeline.run([get_customers, get_orders, get_products])
print(pipeline.last_trace)


Run started at 2025-07-30 21:03:42.431935+00:00 and COMPLETED in 10 minutes and 41.80 seconds with 4 steps.
Step extract COMPLETED in 10 minutes and 10.22 seconds.

Load package 1753909422.4734976 is EXTRACTED and NOT YET LOADED to the destination and contains no failed jobs

Step normalize COMPLETED in 13.39 seconds.
Normalized data for the following tables:
- _dlt_pipeline_state: 1 row(s)
- products: 10 row(s)
- orders: 61948 row(s)
- orders__items: 90900 row(s)
- customers: 935 row(s)

Load package 1753909422.4734976 is NORMALIZED and NOT YET LOADED to the destination and contains no failed jobs

Step load COMPLETED in 18.16 seconds.
Pipeline jaffle_shop_pipeline load step completed in 17.23 seconds
1 load package(s) were loaded to destination duckdb and into dataset jaffle_shop_20250730090342
The duckdb destination used duckdb:////content/jaffle_shop_pipeline.duckdb location to store data
Load package 1753909422.4734976 is LOADED and contains no failed jobs

Step run COMPLETED in 1

## **Optimized Version**

In this improved version, several optimizations are applied to speed up the pipeline:

1. **Chunking:**

   - Increased page size from 100 → 5000 to reduce the number of API calls and the overhead of small requests.
   - Instead of yielding individual rows, pages are yielded in bulk for faster processing.

2. **Parallelism:**

   - Added `parallelized=True` to resources, allowing concurrent data extraction from multiple endpoints.

3. **Worker tuning:**

   - Set explicit worker counts for the extract, normalize, and load stages (6, 4, and 4) so that each stage can process work concurrently.

4. **Buffer control and file rotation:**

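   - Increased buffer sizes so that data is flushed to disk less often (trading some memory for fewer writes), and enabled file rotation so that intermediate data is split across several files that later stages can process in parallel.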

5. **Grouping resources into a single source:**
   - Resources were grouped in a `@dlt.source` to keep the pipeline structure cleaner and run all related endpoints together.

These changes aim to make the extraction phase, which was the bottleneck in the naive version, much faster while keeping normalization and load times efficient.
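To see why page size matters, the number of requests per endpoint can be estimated from the row counts in the naive trace. This is a back-of-the-envelope sketch that assumes the API honors the requested `page_size`; `requests_needed` is a hypothetical helper.

```python
import math

# Top-level row counts reported in the normalize step of the trace.
row_counts = {"customers": 935, "orders": 61948, "products": 10}


def requests_needed(rows: int, page_size: int) -> int:
    """Number of page requests needed to fetch `rows` rows (at least one)."""
    return max(1, math.ceil(rows / page_size))


for endpoint, rows in row_counts.items():
    naive = requests_needed(rows, page_size=100)
    optimized = requests_needed(rows, page_size=5000)
    print(f"{endpoint}: {naive} requests at 100/page, {optimized} at 5000/page")
```

Under that assumption, `/orders` drops from 620 sequential requests to 13, while `/customers` and `/products` each fit in a single page.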


In [None]:
import os
from collections.abc import Iterator
from typing import Any

import dlt
from dlt.sources.helpers.rest_client import PageData, RESTClient  # noqa: F811
from dlt.sources.helpers.rest_client.paginators import HeaderLinkPaginator  # noqa: F811

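# Extract: size of the thread pool used for parallelized resources.
os.environ["EXTRACT__WORKERS"] = "6"
# Items held in memory before the writer flushes to disk (all stages unless overridden).
os.environ["DATA_WRITER__BUFFER_MAX_ITEMS"] = "10000"
# Rotate to a new intermediate file after this many items.
os.environ["EXTRACT__DATA_WRITER__FILE_MAX_ITEMS"] = "20000"

# Normalize: number of worker processes, plus stage-specific buffer and rotation limits.
os.environ["NORMALIZE__WORKERS"] = "4"
os.environ["NORMALIZE__DATA_WRITER__BUFFER_MAX_ITEMS"] = "20000"
os.environ["NORMALIZE__DATA_WRITER__FILE_MAX_ITEMS"] = "20000"

# Load: number of load jobs run concurrently.
os.environ["LOAD__WORKERS"] = "4"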


@dlt.source(name="jaffle_shop_source")
def jaffle_shop_source() -> tuple[Any, Any, Any]:
    @dlt.resource(name="customers", parallelized=True)
    def get_customers() -> Iterator[PageData[Any]]:
        client = RESTClient(
            base_url="https://jaffle-shop.scalevector.ai/api/v1",
            paginator=HeaderLinkPaginator(),
        )
        params = {"page_size": 5000}
        yield from client.paginate("/customers", params=params)

    @dlt.resource(name="orders", parallelized=True)
    def get_orders() -> Iterator[PageData[Any]]:
        client = RESTClient(
            base_url="https://jaffle-shop.scalevector.ai/api/v1",
            paginator=HeaderLinkPaginator(),
        )
        params = {"page_size": 5000}
        yield from client.paginate("/orders", params=params)

    @dlt.resource(name="products", parallelized=True)
    def get_products() -> Iterator[PageData[Any]]:
        client = RESTClient(
            base_url="https://jaffle-shop.scalevector.ai/api/v1",
            paginator=HeaderLinkPaginator(),
        )
        params = {"page_size": 5000}
        yield from client.paginate("/products", params=params)

    return get_customers, get_orders, get_products


pipeline = dlt.pipeline(
    pipeline_name="jaffle_shop_pipeline_optimized",
    destination="duckdb",
    dataset_name="jaffle_shop",
    dev_mode=True,
)

load_info = pipeline.run(jaffle_shop_source())
print(pipeline.last_trace)


Run started at 2025-07-30 21:31:04.998949+00:00 and COMPLETED in 2 minutes and 33.04 seconds with 4 steps.
Step extract COMPLETED in 2 minutes and 6.50 seconds.

Load package 1753911065.024928 is EXTRACTED and NOT YET LOADED to the destination and contains no failed jobs

Step normalize COMPLETED in 12.19 seconds.
Normalized data for the following tables:
- _dlt_pipeline_state: 1 row(s)
- customers: 935 row(s)
- products: 10 row(s)
- orders: 61948 row(s)
- orders__items: 90900 row(s)

Load package 1753911065.024928 is NORMALIZED and NOT YET LOADED to the destination and contains no failed jobs

Step load COMPLETED in 14.33 seconds.
Pipeline jaffle_shop_pipeline_optimized load step completed in 14.29 seconds
1 load package(s) were loaded to destination duckdb and into dataset jaffle_shop_20250730093104
The duckdb destination used duckdb:////content/jaffle_shop_pipeline_optimized.duckdb location to store data
Load package 1753911065.024928 is LOADED and contains no failed jobs

Step run 

## **Timing Comparison and Analysis**

| Version   | Total (mm:ss) | Extract (mm:ss) | Normalize | Load    |
| --------- | ------------- | --------------- | --------- | ------- |
| Naive     | 10:41.80   | 10:10.22 | 13.39 s   | 18.16 s |
| Optimized | 02:33.04   | 02:06.50 | 12.19 s   | 14.33 s |
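
The per-stage speedups follow directly from the table. The sketch below converts the reported durations to seconds and divides naive by optimized; `to_seconds` is a hypothetical helper written for the two duration formats used here.

```python
def to_seconds(duration: str) -> float:
    """Convert 'mm:ss.ss' or 'ss.ss s' into seconds."""
    text = duration.removesuffix("s").strip()
    if ":" in text:
        minutes, seconds = text.split(":")
        return int(minutes) * 60 + float(seconds)
    return float(text)


# (naive, optimized) durations copied from the table.
timings = {
    "total": ("10:41.80", "02:33.04"),
    "extract": ("10:10.22", "02:06.50"),
    "normalize": ("13.39 s", "12.19 s"),
    "load": ("18.16 s", "14.33 s"),
}

speedups = {
    stage: to_seconds(naive) / to_seconds(optimized)
    for stage, (naive, optimized) in timings.items()
}
for stage, factor in speedups.items():
    print(f"{stage}: {factor:.2f}x")
```

This gives roughly 4.2x overall and 4.8x for extract, while normalize and load improve by only about 1.1x and 1.3x.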


### **Observations**

- The naive pipeline spent most of its time in the **extract step**, as small page sizes resulted in many sequential API calls.
- The optimized pipeline:
  - Used **larger chunks of data (5000 rows)**.
  - Ran API requests in **parallel** with multiple workers.
  - Increased buffer sizes and allowed file rotation to speed up I/O.
  - Grouped resources into a `@dlt.source` for a cleaner, more structured pipeline definition.
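
The effect of parallel extraction can be illustrated without calling the API. The sketch below simulates three endpoints with `time.sleep` and fetches them first sequentially, then with a thread pool. It shows the general idea behind running I/O-bound resources concurrently, not how `dlt` implements `parallelized=True` internally; `fake_fetch` and the delays are made up.

```python
import time
from concurrent.futures import ThreadPoolExecutor

# Made-up latencies (seconds) standing in for real API calls.
DELAYS = {"customers": 0.05, "orders": 0.15, "products": 0.05}


def fake_fetch(endpoint: str) -> str:
    """Simulate a blocking, I/O-bound request to one endpoint."""
    time.sleep(DELAYS[endpoint])
    return f"{endpoint}: done"


endpoints = list(DELAYS)

start = time.perf_counter()
sequential = [fake_fetch(e) for e in endpoints]
sequential_time = time.perf_counter() - start

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=3) as pool:
    # map() preserves input order, so results line up with `endpoints`.
    parallel = list(pool.map(fake_fetch, endpoints))
parallel_time = time.perf_counter() - start

print(f"sequential: {sequential_time:.2f} s, parallel: {parallel_time:.2f} s")
```

The sequential run takes roughly the sum of the delays, while the threaded run takes roughly as long as the slowest endpoint. By the same reasoning, the optimized extract step is probably bounded by `/orders`, the largest endpoint.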


## **Result**

The optimized pipeline completed about **4 times faster** than the naive version (2:33 vs. 10:41) while producing the same row counts in every table. Nearly all of the gain came from the extract step (10:10 → 2:06), where **parallel extraction and larger chunk sizes** reduced the overhead of many small sequential API requests; normalize and load times changed only slightly.
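
A simple correctness check is to compare per-table row counts between the two runs. The counts below are copied from the two traces; `diff_row_counts` is a hypothetical helper.

```python
from typing import Optional

# Row counts per table, copied from the normalize step of each trace.
naive_counts = {"customers": 935, "orders": 61948, "orders__items": 90900, "products": 10}
optimized_counts = {"customers": 935, "orders": 61948, "orders__items": 90900, "products": 10}


def diff_row_counts(
    expected: dict[str, int], actual: dict[str, int]
) -> dict[str, tuple[Optional[int], Optional[int]]]:
    """Return tables whose counts differ, including tables missing on one side."""
    tables = sorted(expected.keys() | actual.keys())
    return {
        table: (expected.get(table), actual.get(table))
        for table in tables
        if expected.get(table) != actual.get(table)
    }


mismatches = diff_row_counts(naive_counts, optimized_counts)
print(mismatches or "row counts match")
```

Matching counts are a necessary check rather than a complete one; comparing a sample of rows or the sets of primary keys would give more confidence. In `dlt`, the counts can likely be read from `pipeline.last_trace.last_normalize_info.row_counts` instead of being typed in by hand.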
