# **Performance Optimization in dlt pipelines** [![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/dlt-hub/dlt/blob/master/docs/education/dlt-advanced-course/lesson_9_performance_optimisation.ipynb) [![GitHub badge](https://img.shields.io/badge/github-view_source-2b3137?logo=github)](https://github.com/dlt-hub/dlt/blob/master/docs/education/dlt-advanced-course/lesson_9_performance_optimisation.ipynb)

### **Introduction**

Sometimes you have to slow down in order to speed up...

This lesson teaches you how to make dlt pipelines faster by optimizing each internal stage of execution. You’ll learn how to tune memory, enable parallelism, and reduce runtime using real examples.

We will walk through the internal steps of `pipeline.run()` again, but this time focusing only on performance optimization techniques.

Read more in the [dlt performance docs](https://dlthub.com/docs/general-usage/performance).



![Lesson_9_Performance_optimisation_img1](https://storage.googleapis.com/dlt-blog-images/dlt-advanced-course/Lesson_9_Performance_optimisation_img1.png)

### **Already covered in Fundamentals**

- Basic structure of `pipeline.run()`.

- Default behavior of **extract/normalize/load**.

- Example with nested rows and `items__nested` tables.

- Overview of file formats (jsonl, parquet, etc.).

- Progress logging and pipeline introspection.

### **In the Advanced Performance Optimization lesson**

- Optimize memory with buffer tuning.

- Yield pages instead of rows.

- Control threading and multiprocessing.

- Tune file rotation for parallelism.

- Run multiple pipelines in one process.

- Spawn method on Linux.

- Real GitHub pipeline performance demo.

---

## Understanding `pipeline.run()` for Performance

When you call `pipeline.run()`, dlt goes through three stages:

1. **Extract** – fetch data and write intermediary files.
2. **Normalize** – transform and flatten the data.
3. **Load** – load data into the destination.

We'll now look at how to optimize each of these stages individually.

> If you're unfamiliar with how `pipeline.run()` works under the hood, including the **extract/normalize/load** stages and intermediary files, please complete the [Fundamentals module "How dlt works"](https://colab.research.google.com/drive/1geSMNRkSwAelQJKd3e8vdoHCKiHMdmIo#forceEdit=true&sandboxMode=true) first.

![Lesson_9_Performance_optimisation_img2](https://storage.googleapis.com/dlt-blog-images/dlt-advanced-course/Lesson_9_Performance_optimisation_img2.png)


## **Before we dive into parallelism in dlt...**

To get the most out of parallelism features in `dlt`, it's helpful to quickly refresh how parallelism works in Python in general.

Python isn't truly multithreaded by default due to the Global Interpreter Lock (GIL), but there are multiple ways to run tasks concurrently: using **threads**, **processes**, or **async**.

Each has its own strengths, and `dlt` actually uses all of them depending on the stage: threads for extracting and loading, and processes for normalization.

Let’s take a quick look at how these work under the hood.

# **Parallelism in Python**


Python is single-threaded by default. That means only one operation happens at a time, even on multi-core CPUs. This becomes a bottleneck for:

- API calls
- file I/O
- database queries
- and anything that waits instead of computes

Parallelism solves this by doing *many things at once*. It’s essential when building efficient data pipelines, like those with `dlt`.


## **Types of parallelism in Python**

There are 3 main types. Each has different use cases.

![Lesson_9_Performance_optimisation_img3](https://storage.googleapis.com/dlt-blog-images/dlt-advanced-course/Lesson_9_Performance_optimisation_img3.webp)



### 1. **Threading**
- Best for I/O-bound tasks (e.g., reading from APIs or files).
- Uses the `threading` or [`concurrent.futures.ThreadPoolExecutor`](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor).



### **Why Python has multithreading — but only one thread runs Python code at a time**

- Python does support multithreading, and you can create multiple threads with `threading.Thread()`.

- But in CPython, the standard Python implementation, there’s something called the **Global Interpreter Lock (GIL)**.

- The GIL makes sure that only **one thread** can execute Python bytecode at a time — even on multi-core CPUs.

- So if you create 5 threads, Python will **run them one by one**, rapidly switching between them — not in true parallel.

- It still counts as “multithreading” because threads **exist and run**, but they’re **not truly concurrent** for Python code execution.

**Example 1:**

In this example, `threaded_function` prints the values zero to two that your for loop assigns to the loop variable number. Using a `ThreadPoolExecutor`, four threads are created to execute the threaded function. `ThreadPoolExecutor` is configured to run a maximum of four threads concurrently with `max_workers=4`, and each worker thread is named with a “Worker” prefix, as in `thread_name_prefix="Worker"`.

In [None]:
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def threaded_function() -> None:
    for number in range(3):
        print(f"Printing from {threading.current_thread().name}. {number=}")
        # Simulate slow API call
        time.sleep(0.1)


with ThreadPoolExecutor(max_workers=4, thread_name_prefix="Worker") as executor:
    for _ in range(4):
        executor.submit(threaded_function)

### 2. **Multiprocessing**
- Best for CPU-bound tasks (e.g., compressing, parsing, transforming).
- Uses `multiprocessing` or [`concurrent.futures.ProcessPoolExecutor`](https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor).

Example 1:

In this example, `compute_heavy_task` squares numbers from 0 to 2 and prints the process name it runs on. We use `ProcessPoolExecutor` to run 4 processes in parallel, each computing the task independently.

In [None]:
import multiprocessing
import time
from concurrent.futures import ProcessPoolExecutor


def compute_heavy_task() -> None:
    for number in range(3):
        print(f"Computing in {multiprocessing.current_process().name}. {number=} => {number**2}\n")
        time.sleep(0.1)


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=4) as process_executor:
        for _ in range(4):
            process_executor.submit(compute_heavy_task)

Example 2:

In [None]:
import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419,
]


def is_prime(n: int) -> bool:
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True


with concurrent.futures.ProcessPoolExecutor() as process_executor:
    for number, prime in zip(PRIMES, process_executor.map(is_prime, PRIMES)):
        print("%d is prime: %s" % (number, prime))

### 3. **AsyncIO**
- Great for many concurrent tasks that *wait* (e.g., HTTP, sockets).
- Lightweight and fast. Single-threaded but concurrent.



Example 1:

In [None]:
import asyncio


async def main() -> None:
    await asyncio.sleep(1)
    print("hello")


# In Colab, you'll need to get a handle of the current running loop first.
loop = asyncio.get_running_loop()
await loop.create_task(main())

Example 2:

The order of this output is **the heart of async IO**. Talking to each of the calls to `count()` is a single event loop, or coordinator.

When each task reaches `await asyncio.sleep(1)`, the function yells up to the event loop and gives control back to it, saying, **“I’m going to be sleeping for 1 second. Go ahead and let something else meaningful be done in the meantime.”**

In [None]:
import asyncio
import time


async def count() -> None:
    print("One")
    await asyncio.sleep(1)
    print("Two")


async def main() -> None:
    await asyncio.gather(count(), count(), count())


s = time.perf_counter()

# In Colab, you'll need to get a handle of the current running loop first.
loop = asyncio.get_running_loop()
await loop.create_task(main())

elapsed = time.perf_counter() - s
print(f"executed in {elapsed:0.2f} seconds.")


---

# **Parallelism in dlt**

In `dlt`, parallelism is baked in:

- **Extraction**: via threads (`parallelized=True` in `@dlt.resource`) or async generators.
- **Normalization**: via process pools.
- **Loading**: via threads.




---

## **Extract**

The extract stage fetches data and writes it to intermediary files. This phase is usually **I/O-bound** — lots of small writes or slow network calls can slow it down.



### **Default behaviour**

- The in-memory buffer is set to `5000` items.
- By default, **intermediary files are not rotated**. If you do not explicitly set a size for an intermediary file with `file_max_items=100000`, `dlt` will create a **single file** for a resource, regardless of the number of records it contains, even if it reaches millions.
- By default, intermediary files at the extract stage use a custom version of the JSONL format.

### **How to optimize extraction?**

- Control the [in-memory buffer size](#scrollTo=ffVpDFHfnqO-) for the extract stage
- Group `dlt` resources into `dlt` sources
- Specify the number of thread workers or..
- When using async generators, control the number of async functions/awaitables being evaluated in parallel
- Yield pages instead of rows
- Customize the [size of intermediary files](#scrollTo=g9AGWfLkoAMb) created in the extract stage to control file rotation

### **IMPORTANT: Start simple. dlt has smart defaults**

Before you dive into tuning buffers, tweaking file sizes, and parallelizing every little thing — consider this:

> **`dlt` comes with well-thought-out defaults that work great for most cases.**

The default settings are:
- Conservative enough to work on a laptop.
- Efficient enough to run production loads for many use cases.
- Safe to experiment with incrementally.

#### When to start tweaking?

Once you’ve:
- Run your pipeline end-to-end successfully.
- Noticed slowdowns at scale.
- Understood which part of the pipeline (extract, normalize, load) is the bottleneck.

> **Start with the defaults. Measure. Then tune.**


### **1. [Use a larger In-Memory Buffer](https://dlthub.com/docs/reference/performance#overall-memory-and-disk-management)**

dlt **buffers** data **in memory** to speed up processing and uses the file system to pass data between the **extract** and **normalize** stages.

You can control **the size of the buffers** and **the size and number of the files** to fine-tune memory and CPU usage. These settings also impact parallelism.

The size of the buffers is controlled by specifying **the number of data items** held in them. Data is appended to open files when the item buffer is full, after which the buffer is cleared.

By default, dlt **buffers 5000 items** before writing to disk. Increase this value to reduce disk I/O and improve speed.


In [None]:
%%capture
!pip install -U dlt

Example 1:

We set the buffer size to 1. dlt will extract data **row by row** and write each row to an intermediary file one by one.

This also **disables multithreading** — when the buffer size is 1, the number of extract workers is effectively limited to 1.

In [None]:
from typing import Dict, Iterator
import os
import dlt
from dlt.common.typing import TDataItems

os.environ["DATA_WRITER__BUFFER_MAX_ITEMS"] = "1"


def get_rows(limit: int) -> Iterator[Dict[str, int]]:
    yield from map(lambda n: {"row": n}, range(limit))


@dlt.resource()
def buffered_resource() -> TDataItems:
    for row in get_rows(500000):
        yield row


pipeline = dlt.pipeline(
    pipeline_name="extract_pipeline1",
    destination="duckdb",
    dataset_name="mydata",
    dev_mode=True,
)
load_info = pipeline.extract(buffered_resource)
print(pipeline.last_trace)

Example 2:

Increase the number of buffer items.

In [None]:
import os
import dlt

os.environ["DATA_WRITER__BUFFER_MAX_ITEMS"] = "5000"


def get_rows(limit: int) -> Iterator[Dict[str, int]]:
    yield from map(lambda n: {"row": n}, range(limit))


@dlt.resource()
def buffered_resource() -> TDataItems:
    for row in get_rows(500000):
        yield row


pipeline = dlt.pipeline(
    pipeline_name="extract_pipeline2",
    destination="duckdb",
    dataset_name="mydata",
    dev_mode=True,
)
load_info = pipeline.extract(buffered_resource)
print(pipeline.last_trace)

**Explanation:** The buffer collects many items in memory before writing them to disk. A larger buffer means fewer writes, which saves I/O time and makes the extract stage faster. This is especially helpful when extracting a large number of small records.

**Downside:** High buffer size increases memory usage. If the machine has limited RAM, it could cause memory pressure or crashes.


**IMPORTANT: Performance measurements in Google Colab may be unreliable.**

Even with large buffer sizes, timing results in Colab can vary significantly between runs. This is because Colab runs on shared cloud infrastructure, where CPU, memory, and disk I/O are not guaranteed and may fluctuate at any time.

You might observe:

- Slower or inconsistent extract times

- Unpredictable delays due to resource throttling or background activity

For **reliable performance** testing, always run your dlt pipelines on a **local machine**, where you control the environment and system resources are stable.

#### **Excercise 1**

Play with `BUFFER_MAX_ITEMS` parameter. Run your pipeline and measure the time.

Don’t expect linear speed-up — larger buffers may **slow things down** depending on your system.

At some point, increasing the buffer size will **stop making things faster**. After that threshold, you’ll hit diminishing returns, and performance may plateau or even degrade. The optimal value depends on your machine’s I/O and memory characteristics.

In [None]:
import os
import time
import dlt
import matplotlib.pyplot as plt


def get_rows(limit: int) -> Iterator[Dict[str, int]]:
    yield from map(lambda n: {"row": n}, range(limit))


def measure_extract_time(buffer_size: int) -> float:
    os.environ["DATA_WRITER__BUFFER_MAX_ITEMS"] = str(buffer_size)

    @dlt.resource()
    def buffered_resource() -> TDataItems:
        for row in get_rows(500000):
            yield row

    pipeline = dlt.pipeline(
        pipeline_name=f"extract_pipeline_{buffer_size}",
        destination="duckdb",
        dataset_name="mydata",
        dev_mode=True,
    )

    start_time = time.time()
    pipeline.extract(buffered_resource)
    return time.time() - start_time


# Try different buffer sizes
buffer_sizes = [1, 10, 100, 1000, 5000, 10000, 50000, 100000, 500000]
times = [measure_extract_time(size) for size in buffer_sizes]


plt.figure(figsize=(10, 6))
plt.plot(buffer_sizes, times, marker="o")
plt.xlabel("BUFFER_MAX_ITEMS")
plt.ylabel("Time to Extract (seconds)")
plt.title("Effect of Buffer Size on Extraction Time")
plt.grid(True)
plt.xscale("log")
plt.show()

### **2. Group Resources into Sources**

In `dlt`, each **resource** is treated as a separate unit during extraction. If you pass multiple resources directly to `pipeline.extract()`, `dlt` handles them independently — each with its own extract process and context.

To **optimize performance**, especially during the extract stage, it's often better to **group related resources into a single source**. This allows `dlt` to:
- Run extraction more efficiently
- Reuse shared context (like API sessions or connections)
- Avoid overhead from managing multiple resource objects individually
- Enable better parallelism and state management


Example without grouping:



In [None]:
exit()

In [None]:
import os
import dlt


def get_rows(limit: int) -> Iterator[Dict[str, int]]:
    yield from map(lambda n: {"row": n}, range(limit))


@dlt.resource(name="resource1")
def buffered_resource1() -> TDataItems:
    for row in get_rows(500000):
        yield row


@dlt.resource(name="resource2")
def buffered_resource2() -> TDataItems:
    for row in get_rows(500000):
        yield row


@dlt.resource(name="resource3")
def buffered_resource3() -> TDataItems:
    for row in get_rows(500000):
        yield row


pipeline = dlt.pipeline(
    pipeline_name="extract_pipeline4",
    destination="duckdb",
    dataset_name="mydata",
    dev_mode=True,
)

load_info = pipeline.extract([buffered_resource1, buffered_resource2, buffered_resource3])
print(pipeline.last_trace)

This works, but each resource is treated separately. For large datasets or many resources, this adds extract overhead.


Example with grouping:


In [None]:
from typing import Iterable
import os
import time
import dlt
from dlt.extract import DltResource
from threading import currentThread


@dlt.source
def source() -> Iterable[DltResource]:
    return buffered_resource1, buffered_resource2, buffered_resource3


pipeline = dlt.pipeline(
    pipeline_name="extract_pipeline4",
    destination="duckdb",
    dataset_name="mydata",
    dev_mode=True,
)

load_info = pipeline.extract(source())
print(pipeline.last_trace)


This version:
- Groups all resources into a single source
- Allows `dlt` to optimize scheduling and state tracking
- Reduces overhead during extraction and improves throughput

####  **What to expect**

- **Grouped resources** may not show a big speed increase in small examples.
- However, **it unlocks `dlt`'s parallel extraction engine**: when grouped into a single `@dlt.source`, `dlt` can schedule their execution in a shared thread pool.
- This is essential when working with:
  - Many resources
  - Slow APIs
  - IO-bound extractors
  - High data volumes

#### **Note**:
Even if timing results look similar in this example, grouping into a source is what **enables true concurrent resource execution**. Without it, `dlt` treats each resource as an isolated unit and may serialize extraction.


### **3. Enable parallel threaded extraction**

When extracting data from **multiple sources**, you usually want them to be processed **at the same time**, not one after another. This is especially useful when:

- Calling **slow APIs**
- Working with **multiple endpoints**
- Extracting from **databases with many tables**

Use multiple threads to fetch data from different resources with `parallelized=True`.

Set the number of parallel threads with:

```python
os.environ["EXTRACT__WORKERS"] = "3"
```

**Simulate Slow APIs with `time.sleep`**

We’ll simulate API latency by adding a `time.sleep(0.01)` delay before yielding each row. This mimics a network call taking ~10ms.

We’ll then parallelize the resources using `parallelized=True` and observe the thread behavior using `threading.currentThread()`.


In [None]:
import os
import dlt
import time
from threading import current_thread


def get_rows(limit: int) -> Iterator[Dict[str, int]]:
    yield from map(lambda n: {"row": n}, range(limit))


# Resource 1: slow generator, runs in a separate thread
@dlt.resource(name="resource1", parallelized=False)
def buffered_resource1() -> TDataItems:
    for row in get_rows(100):
        time.sleep(0.01)  # Simulate slow API call
        print(f"resource1 in thread {current_thread().name}")
        yield row


@dlt.resource(name="resource2", parallelized=False)
def buffered_resource2() -> TDataItems:
    for row in get_rows(100):
        time.sleep(0.01)
        print(f"resource2 in thread {current_thread().name}")
        yield row


@dlt.resource(name="resource3", parallelized=False)
def buffered_resource3() -> TDataItems:
    for row in get_rows(100):
        time.sleep(0.01)
        print(f"resource3 in thread {current_thread().name}")
        yield row


@dlt.source
def source() -> Iterable[DltResource]:
    return buffered_resource1, buffered_resource2, buffered_resource3


pipeline = dlt.pipeline(
    pipeline_name="extract_pipeline4",
    destination="duckdb",
    dataset_name="mydata",
    dev_mode=True,
)


load_info = pipeline.extract(source())
print(pipeline.last_trace)

**What does it mean?**

dlt is extracting rows in a [**round-robin**](https://dlthub.com/docs/reference/performance#resources-extraction-fifo-vs-round-robin) fashion — one row from each resource in turn — all within the `MainThread`. Since there’s no parallelization, the resources share a single thread and are executed sequentially.

**Now let’s enable multithreading.**

In the previous example, all resources ran sequentially in the main thread. This time, we add `parallelized=True` to each resource — allowing `dlt` to extract from all three **at the same time**, using separate threads.

You’ll see the difference immediately in the output: each resource prints from a different thread, confirming that extraction is now concurrent.

In [None]:
import os
import dlt
import time
from threading import current_thread


def get_rows(limit: int) -> Iterator[Dict[str, int]]:
    yield from map(lambda n: {"row": n}, range(limit))


# Resource 1: slow generator, runs in a separate thread
@dlt.resource(name="resource1", parallelized=True)
def buffered_resource1() -> TDataItems:
    for row in get_rows(100):
        time.sleep(0.01)  # Simulate slow API call
        print(f"resource1 in thread {current_thread().name}")
        yield row


@dlt.resource(name="resource2", parallelized=True)
def buffered_resource2() -> TDataItems:
    for row in get_rows(100):
        time.sleep(0.01)
        print(f"resource2 in thread {current_thread().name}")
        yield row


@dlt.resource(name="resource3", parallelized=True)
def buffered_resource3() -> TDataItems:
    for row in get_rows(100):
        time.sleep(0.01)
        print(f"resource3 in thread {current_thread().name}")
        yield row


@dlt.source
def source() -> Iterable[DltResource]:
    return buffered_resource1, buffered_resource2, buffered_resource3


pipeline = dlt.pipeline(
    pipeline_name="extract_pipeline4",
    destination="duckdb",
    dataset_name="mydata",
    dev_mode=True,
)


load_info = pipeline.extract(source())
print(pipeline.last_trace)

**Explanation:** Each worker runs in a separate thread, allowing several resources to extract data at the same time. This is critical for reducing bottlenecks when working with slow APIs or large resource sets.

**Downside:** More threads increase CPU load. Poorly written thread-unsafe code or thread contention may degrade performance instead of improving it.


### **Async**

The example below does the same but using an async generator as the main resource and async/await and futures pool for the transformer.

**Example 1 — Synchronous execution (sequential, slow)**

In [None]:
import time
import dlt
from dlt.common.typing import TDataItem


@dlt.resource
def sync_items() -> TDataItems:
    for i in range(10):
        time.sleep(0.5)  # Blocking call
        yield i


@dlt.transformer
def sync_transform(item: TDataItem) -> TDataItems:
    time.sleep(0.5)  # Also blocking
    return {"row": item}


start = time.time()
result = list(sync_items() | sync_transform)
print(f"Sync result: {result}")
print("Sync elapsed time:", round(time.time() - start, 2), "seconds")

**Example 2 — Asynchronous execution (concurrent, fast)**

In [None]:
import asyncio
import time
import dlt


@dlt.resource
async def async_items() -> TDataItems:
    for i in range(10):
        await asyncio.sleep(0.5)  # Blocking
        yield i


@dlt.transformer
async def async_transform(item) -> TDataItems:
    await asyncio.sleep(0.5)  # Non-blocking
    # just return the results, if you yield, generator will be evaluated in main thread
    return {"row": item}


start = time.time()
print(list(async_items() | async_transform))
print("Async elapsed time:", round(time.time() - start, 2), "seconds")

**Breakdown of time**

- `async_items()` yields 10 items → takes ~5s total (0.5s × 10)

- `async_transform()` is fast once it starts — runs in parallel

- So total time is:

  - ~5s to yield

  - `+` ~0.5s to process the last batch of transformer calls

  - ➜ ~5.5–6 seconds total

### **4. Yielding chunks instead of rows**

In real-world data extraction, especially from APIs, **data is typically returned in pages** — for example, 100 users per request. These pages are already **natural chunks**, so there's no reason to extract and yield each row from the page individually.

Instead of doing something like:

```python
for item in page:
    yield item  # ❌ inefficient
```

You should just do:

```python
yield page  # ✅ fast and efficient
```

This small change makes a big difference in performance. Yielding full pages (chunks) reduces the number of Python function calls and lets `dlt` process your data more efficiently — especially during buffering and writing stages.

#### **What is `yield` in Python?**

In Python, `yield` turns a function into a **generator**.

Instead of returning a full list of results at once, it gives back **one item at a time**, each time the function is called again.

This is useful when:
- You work with large datasets
- You don’t want to keep everything in memory
- You want to stream values as they are produced

#### Example

In [None]:
def count_up_to(n: int) -> Iterator[int]:
    for i in range(n):
        yield i

Calling `count_up_to(3)` returns a generator:

In [None]:
for number in count_up_to(3):
    print(number)

#### **Yielding rows in dlt**

This is what you usually see in basic educational `dlt` pipelines:

In [None]:
from typing import List


def fetch_users() -> List[Dict[str, int]]:
    return [{"id": 1}, {"id": 2}, {"id": 3}]


@dlt.resource
def get_users() -> TDataItems:
    for user in fetch_users():
        yield user  # yields one row at a time


#### Problem

This creates a **high number of `yield` calls** — each row is passed into the extract pipeline one at a time. While dlt buffers rows before writing, each row still incurs the cost of a Python function call and per-item processing inside the pipeline.

This adds overhead, especially with millions of rows.


#### **What does “Yield Chunks” mean?**

Instead of:

```python
yield {"id": 1}
yield {"id": 2}
```

Do this:

```python
yield [{"id": 1}, {"id": 2}]  # yield a list of rows
```

We call this **page/chunk-based yielding**.

You still use `yield`, but now each yield returns **a batch of rows**, not just one.




#### **How to yield chunks**

Here’s how you chunk your data with `islice` from `itertools`:

In [None]:
import os
import dlt
import time
from threading import current_thread
from itertools import islice


def get_rows(limit: int) -> Iterator[Dict[str, int]]:
    yield from map(lambda n: {"row": n}, range(limit))


def yield_chunks(iterator: Iterator[Dict[str, int]], chunk_size=10):
    iterator = iter(iterator)
    while chunk := list(islice(iterator, chunk_size)):  # <--- we slice data into chunks
        time.sleep(0.01)  # Simulate slow API call
        yield chunk


# Chunked resources, run in parallel threads
@dlt.resource(name="resource1", parallelized=True)
def buffered_resource1():
    yield from yield_chunks(get_rows(100), chunk_size=10)


@dlt.resource(name="resource2", parallelized=True)
def buffered_resource2():
    yield from yield_chunks(get_rows(100), chunk_size=10)


@dlt.resource(name="resource3", parallelized=True)
def buffered_resource3():
    yield from yield_chunks(get_rows(100), chunk_size=10)


@dlt.source
def source():
    return buffered_resource1, buffered_resource2, buffered_resource3


pipeline = dlt.pipeline(
    pipeline_name="extract_pipeline4",
    destination="duckdb",
    dataset_name="mydata",
    dev_mode=True,
)

load_info = pipeline.extract(source())
print(pipeline.last_trace)


Such a crazy speed improvement! You'll notice the difference even more as your data size grows.

### **5. Enable file rotation for large datasets**  

By default, `dlt` writes all extracted data from a resource into **one large intermediary file**. If your resource yields millions of rows, that means:
- Only **one normalization worker** will be able to process that file
- You’ll lose all benefits of **parallel processing** in later stages

To fix this, you can **enable file rotation** by setting a file size limit. For example:

```python
os.environ["EXTRACT__DATA_WRITER__FILE_MAX_ITEMS"] = "100000"
```

This means:
- Every 100,000 items, a new intermediary file will be created
- If you have 1,000,000 rows, you'll end up with 10 files
- Later, these files can be processed **in parallel** during normalization and load

File rotation is essential for scaling up performance when dealing with large datasets.



### **6. Avoid unnecessary transformation in the resource**  
   Keep your resource logic simple and fast — avoid costly computation or transformation in the generator itself.

## **Normalize**

### **What happens at the normalization stage?**

After data is extracted, `dlt` transforms it into a **relational format** suitable for loading into databases. This happens in the **normalize stage**:

1. Extracted files are passed to the **normalization process pool**.
2. Each file is read, schema is resolved, and data is transformed.
3. Rows are buffered and written into **normalized intermediary files**.
4. These files are then used in the **load** stage.


>If you’re not yet familiar with how the **normalization stage** works in `dlt`, we recommend reviewing the [**Normalization section in the dlt Fundamentals course**](https://colab.research.google.com/drive/1geSMNRkSwAelQJKd3e8vdoHCKiHMdmIo#forceEdit=true&sandboxMode=true&scrollTo=bCeUqaW_cRSh) before diving into performance tuning.  


### **Default behavior**

- **Buffer size**: 5,000 items
- **Parallelism**: Off by default (runs in main process)
- **File rotation**: Off by default — all rows written into one file
- **Compression**: On by default


### **Why normalization may be slow**

If you process a lot of data in one file and use just one CPU, normalization becomes a bottleneck:
- File parsing and transformation are **CPU-heavy**
- Without parallelism, large files block the pipeline
- Compression slows it further if not needed

> File parsing and transformation are **CPU-heavy**, especially when dealing with **deeply nested structures** (which must be flattened into multiple tables) and **automatic data type inference** (which inspects each value to determine its type).



### How to optimize normalization

1. **Enable parallelism**: Use multiple processes
   ```python
   os.environ['NORMALIZE__WORKERS'] = '3'
   ```

2. **Disable compression (for debugging or speed)**:
   ```python
   os.environ['NORMALIZE_DATA_WRITER__DISABLE_COMPRESSION'] = 'true'
   ```

3. **Control buffer size** (optional):
   ```python
   os.environ['NORMALIZE__DATA_WRITER__BUFFER_MAX_ITEMS'] = '10000'
   ```

4. **Enable file rotation** (if you have one big file):
   ```python
   os.environ['NORMALIZE__DATA_WRITER__FILE_MAX_ITEMS'] = '100000'
   ```


### **1. Parallel normalization**

Let’s measure normalization performance with and without parallelism.



#### **a. Normalize with 1 worker (default)**

First, we run extraction:

In [None]:
%%capture
!pip install -U dlt

In [None]:
import os
import dlt
import time
from threading import current_thread
from itertools import islice


def get_rows(limit):
    yield from map(lambda n: {"row": n}, range(limit))


def yield_chunks(iterable, chunk_size=10):
    iterator = iter(iterable)
    while chunk := list(islice(iterator, chunk_size)):  # <--- we slice data into chunks
        time.sleep(0.01)  # Simulate slow API call
        yield chunk


# Chunked resources, run in parallel threads
@dlt.resource(name="resource1", parallelized=True)
def buffered_resource1():
    yield from yield_chunks(get_rows(1000000), chunk_size=10000)


@dlt.resource(name="resource2", parallelized=True)
def buffered_resource2():
    yield from yield_chunks(get_rows(1000000), chunk_size=10000)


@dlt.source
def source():
    return buffered_resource1, buffered_resource2


pipeline = dlt.pipeline(
    pipeline_name="extract_pipeline_w1",
    destination="duckdb",
    dataset_name="mydata",
    dev_mode=True,
)

load_info = pipeline.extract(source())
print(pipeline.last_trace)

As mentioned earlier, each file created during the extract stage is sent to the process pool of the normalization stage. Since file rotation has not been enabled at the extract stage, each resource is written to a separate intermediary file. This results in **three files**, which can be **normalized in parallel**.

First, let's measure the time taken with a single process worker.

In [None]:
import os
import time

os.environ["NORMALIZE__WORKERS"] = "1"

load_info = pipeline.normalize()
print(pipeline.last_trace)

Oh, that took way longer than extraction, right?  

Yep, that’s totally normal. The **normalization step does the heavy lifting**:
- flattening nested data,
- figuring out types,
- generating tables.

It’s often **the slowest part** of the pipeline, so don’t be surprised if it takes most of the time.



#### **b. Normalize with 2 workers**

Now, let's try more process workers.

Unfortunately, Colab gives us only **2 CPU cores**.  
That means running normalization with more than 2 workers won’t help (and might even slow things down).  
Let’s stick with **2 workers** to get the best performance from what we’ve got!


Note that we are running the extract stage again with a new pipeline, because normalizing already normalized data would not be meaningful.

In [None]:
import os

os.cpu_count()

In [None]:
# Set the number of process workers to 2
os.environ["NORMALIZE__WORKERS"] = "2"

pipeline = dlt.pipeline(
    pipeline_name="extract_pipeline_w2",
    destination="duckdb",
    dataset_name="mydata",
    dev_mode=True,
)


load_info = pipeline.extract(source())
load_info = pipeline.normalize()

print(pipeline.last_trace)



#### What to expect

With parallel workers:
- The total time to normalize **drops significantly**
- CPU usage will increase (expected!)
- Logs may show multiple files being processed at the same time


#### ✅ Rule of thumb:
Use more workers and rotate files if you have:
- Large data
- Multiple extracted files
- A machine with multiple CPU cores


### **2. Enable file rotation for large datasets**  

By default, all normalized data goes into **one big file** — which means **only one process** can handle it. That kills parallelism.

To fix this, set:

```python
os.environ["NORMALIZE__DATA_WRITER__FILE_MAX_ITEMS"] = "100000"
```

Now `dlt` will:
- Split data into smaller files (e.g., 10 files for 1M rows)
- Load them **in parallel** using multiple workers
- Speed up loading




## **Load**



### **What happens at the loading stage?**



After data is normalized, `dlt` takes the resulting files and sends them to your **destination** (e.g., DuckDB, BigQuery, Redshift).

This stage uses a **thread pool**, where:
1. Each thread loads one normalized file at a time.
2. Files from the same source are bundled into a **load package**.
3. Packages are loaded into the destination concurrently.





### **Default behavior**

- `dlt` uses **20 threads** by default
- Each thread processes one file
- All file contents are already normalized — there’s no parsing or schema detection at this point, so it’s mostly **I/O-bound**

### **How to optimize loading?**


1. **Control the number of threads**  
   Set this based on your destination’s capacity:

   ```python
   os.environ["LOAD__WORKERS"] = "4"
   ```

2. **Rotate files during normalization**  
   If all your data is in **one big file**, you’ll still have only **one load job**. To unlock real parallelism:
   ```python
   os.environ["NORMALIZE__DATA_WRITER__FILE_MAX_ITEMS"] = "100000"
   ```


### **Example**

Now that we have two pipelines from the previous steps, let's use the first one to load data with only one thread. This means all normalized files will be loaded sequentially.

In [None]:
%%capture
!pip install -U dlt

In [None]:
import os
import dlt
import time
from threading import current_thread
from itertools import islice


def get_rows(limit):
    yield from map(lambda n: {"row": n}, range(limit))


def yield_chunks(iterable, chunk_size=10):
    iterator = iter(iterable)
    while chunk := list(islice(iterator, chunk_size)):  # <--- we slice data into chunks
        time.sleep(0.01)  # Simulate slow API call
        yield chunk


# Chunked resources, run in parallel threads
@dlt.resource(name="resource1", parallelized=True)
def buffered_resource1():
    yield from yield_chunks(get_rows(1000000), chunk_size=10000)


@dlt.resource(name="resource2", parallelized=True)
def buffered_resource2():
    yield from yield_chunks(get_rows(1000000), chunk_size=10000)


@dlt.resource(name="resource3", parallelized=True)
def buffered_resource3():
    yield from yield_chunks(get_rows(1000000), chunk_size=10000)


@dlt.source
def source():
    return buffered_resource1, buffered_resource2, buffered_resource3

In [None]:
# Set the number of thread workers to 1
os.environ["LOAD__WORKERS"] = "1"

pipeline = dlt.pipeline(
    pipeline_name="extract_pipeline_load1",
    destination="duckdb",
    dataset_name="mydata",
    dev_mode=True,
)


pipeline.extract(source())
pipeline.normalize()
pipeline.load()


print(pipeline.last_trace)

> Step load COMPLETED in 1 minute and 24.07 seconds.

Next, use the second pipeline to load data with 3 threads, allowing the normalized files to be loaded in parallel.

In [None]:
# Set the number of thread workers to 3
os.environ["LOAD__WORKERS"] = "3"

pipeline = dlt.pipeline(
    pipeline_name="extract_pipeline_load2",
    destination="duckdb",
    dataset_name="mydata",
    dev_mode=True,
)


pipeline.extract(source())
pipeline.normalize()
pipeline.load()


print(pipeline.last_trace)

Step load COMPLETED in 59.89 seconds.


Voila! ⭐




### What to expect

- More threads = faster load, **if you have enough files**
- If there’s only one file, you won’t see a speedup
- Use **file rotation** in normalization to split the load into chunks

> The **load stage is I/O-bound**, but that doesn't mean “more files is always better.”  
Reading and loading many small files adds overhead too.  
So use file rotation wisely: create **enough files to allow parallelism**, but not so many that it slows things down.  
**Look at how much data you have**, and tune `FILE_MAX_ITEMS` accordingly.




## **Can you spot the bottleneck?**

Ask yourself:
- Is my pipeline slow because of waiting on I/O or doing heavy computations?
- Am I yielding too many tiny objects one-by-one instead of batches?
- Is my API async? If not, can I enable `parallelized=True` safely in my resources?

# **GitHub example**

In this example, we'll optimize a pipeline that loads data from seven different GitHub endpoints.

In [2]:
# Install dlt if not already installed
%%capture
!pip install "dlt[duckdb]"

Clear the runtime to reset configurations:

In [None]:
exit()

We'll first define the resources without parallelization.

> Since we are already yielding pages, the chunking method is implemented. However, with a manageable number of entries, the impact of chunking may be negligible.

In [None]:
import dlt
import requests
from google.colab import userdata


github_token = userdata.get("ACCESS_TOKEN")

headers = {"Authorization": f"token {github_token}"}


def pagination(url):
    while True:
        response = requests.get(url, headers=headers)
        time.sleep(0.1)  # Simulate delay
        response.raise_for_status()
        yield response.json()  # Here we're yielding pages

        # Get next page
        if "next" not in response.links:
            break
        url = response.links["next"]["url"]


@dlt.resource(table_name="issues", write_disposition="merge", primary_key="id")
def get_issues(
    updated_at=dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z")
):
    url = f"https://api.github.com/repos/dlt-hub/dlt/issues?since={updated_at.last_value}&per_page=100sort=updated"
    yield pagination(url)


@dlt.resource(table_name="stargazers", write_disposition="merge", primary_key="id")
def get_stargazers():
    url = "https://api.github.com/repos/dlt-hub/dlt/stargazers?per_page=100"
    yield pagination(url)


@dlt.resource(table_name="pull_requests", write_disposition="merge", primary_key="id")
def get_pulls(
    updated_at=dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z")
):
    url = f"https://api.github.com/repos/dlt-hub/dlt/pulls?since={updated_at.last_value}&per_page=100&sort=updated"
    yield pagination(url)


@dlt.resource(table_name="commits", write_disposition="merge", primary_key="sha")
def get_commits():
    url = "https://api.github.com/repos/dlt-hub/dlt/commits?per_page=100"
    yield pagination(url)


@dlt.resource(table_name="branches", write_disposition="merge", primary_key="name")
def get_branches():
    url = "https://api.github.com/repos/dlt-hub/dlt/branches?per_page=100"
    yield pagination(url)


@dlt.resource(table_name="contributors", write_disposition="merge", primary_key="id")
def get_contributors():
    url = "https://api.github.com/repos/dlt-hub/dlt/contributors?per_page=100"
    yield pagination(url)


@dlt.resource(table_name="labels", write_disposition="merge", primary_key="id")
def get_labels():
    url = "https://api.github.com/repos/dlt-hub/dlt/labels?per_page=100"
    yield pagination(url)

Run the pipeline with the resources defined above:

In [None]:
pipeline = dlt.pipeline(
    pipeline_name="extract_pipeline_example1",
    destination="duckdb",
    dataset_name="mydata",
    dev_mode=True,
)

load_info = pipeline.run(
    [get_issues, get_stargazers, get_pulls, get_branches, get_contributors, get_labels, get_commits]
)

In [None]:
print(pipeline.last_trace)

Let's redefine our resources with parallelization, wrap them in a single source, and increase the number of normalization, as well as extract workers.

> Since the default number of load workers is by default set to 20, there's probably no need to modify it.

While we could optimize the configuration of intermediary file sizes more effectively if we knew the exact number of items each endpoint returns, let's start by experimenting with an arbitrary value of 200 for the data writers, which should be more or less suitable to enable enough parallelization.

In [None]:
import os

os.environ["EXTRACT__WORKERS"] = "7"
os.environ["NORMALIZE__WORKERS"] = "2"

In [None]:
import time


@dlt.resource(table_name="issues", write_disposition="merge", primary_key="id", parallelized=True)
def get_issues_2(
    updated_at=dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z")
):
    url = f"https://api.github.com/repos/dlt-hub/dlt/issues?since={updated_at.last_value}&per_page=100sort=updated"
    yield pagination(url)


@dlt.resource(
    table_name="stargazers", write_disposition="merge", primary_key="id", parallelized=True
)
def get_stargazers_2():
    url = "https://api.github.com/repos/dlt-hub/dlt/stargazers?per_page=100"
    yield pagination(url)


@dlt.resource(
    table_name="pull_requests", write_disposition="merge", primary_key="id", parallelized=True
)
def get_pulls_2(
    updated_at=dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z")
):
    url = f"https://api.github.com/repos/dlt-hub/dlt/pulls?since={updated_at.last_value}&per_page=100&sort=updated"
    yield pagination(url)


@dlt.resource(table_name="commits", write_disposition="merge", primary_key="sha", parallelized=True)
def get_commits_2():
    url = "https://api.github.com/repos/dlt-hub/dlt/commits?per_page=100"
    yield pagination(url)


@dlt.resource(
    table_name="branches", write_disposition="merge", primary_key="name", parallelized=True
)
def get_branches_2():
    url = "https://api.github.com/repos/dlt-hub/dlt/branches?per_page=100"
    yield pagination(url)


@dlt.resource(
    table_name="contributors", write_disposition="merge", primary_key="id", parallelized=True
)
def get_contributors_2():
    url = "https://api.github.com/repos/dlt-hub/dlt/contributors?per_page=100"
    yield pagination(url)


@dlt.resource(table_name="labels", write_disposition="merge", primary_key="id", parallelized=True)
def get_labels_2():
    url = "https://api.github.com/repos/dlt-hub/dlt/labels?per_page=100"
    yield pagination(url)


@dlt.source
def github_data():
    return (
        get_issues_2,
        get_stargazers_2,
        get_pulls_2,
        get_branches_2,
        get_contributors_2,
        get_labels_2,
        get_commits_2,
    )


improved_p = dlt.pipeline("test_pipeline_2", destination="duckdb")


pipeline = dlt.pipeline(
    pipeline_name="extract_pipeline_example2",
    destination="duckdb",
    dataset_name="mydata",
    dev_mode=True,
)

load_info = pipeline.run(github_data())
print(pipeline.last_trace)

# **Homework: Speed up your pipeline**

### **Goal**

Use the public **Jaffle Shop API** to build a `dlt` pipeline and apply everything you've learned about performance:

- Chunking
- Parallelism
- Buffer control
- File rotation
- Worker tuning

Your task is to **make the pipeline as fast as possible**, while keeping the results correct.



### **What you’ll need**

- API base: `https://jaffle-shop.scalevector.ai/api/v1`
- Docs: [https://jaffle-shop.scalevector.ai/docs](https://jaffle-shop.scalevector.ai/docs)
- Start with these endpoints:
  - `/customers`
  - `/orders`
  - `/products`

Each of them returns **paged responses** — so you'll need to handle pagination.



### **What to implement**

1. **Extract** from the API using `dlt`
   - Use `dlt.resource` and [`RESTClient`](https://dlthub.com/docs/devel/general-usage/http/rest-client) with proper pagination

2. **Apply all performance techniques**
   - Group resources into sources
   - Yield **chunks/pages**, not single rows
   - Use `parallelized=True`
   - Set `EXTRACT__WORKERS`, `NORMALIZE__WORKERS`, and `LOAD__WORKERS`
   - Tune buffer sizes and enable **file rotation**

3. **Measure performance**
   - Time the extract, normalize, and load stages separately
   - Compare a naive version vs. optimized version
   - Log thread info or `pipeline.last_trace` if helpful


### **Deliverables**

Share your code as a Google Colab or [GitHub Gist](https://gist.github.com/) in Homework Google Form. **This step is required for certification.**


It should include:
- Working pipeline for at least 2 endpoints
- Before/after timing comparison
- A short explanation of what changes made the biggest difference if there're any differences





And remember: **Start with the defaults. Measure. Then tune.**

In [3]:
"""
This script demonstrates how to build a dlt pipeline to extract data from the Jaffle Shop API.
It includes two modes: a 'naive' mode with default settings, and an 'optimized' mode that
applies performance-tuning techniques like parallelism, buffer control, and file rotation.

The script extracts data from three endpoints: /customers, /orders, and /products,
and loads it into a DuckDB destination. It then measures and compares the performance
of both pipeline runs.
"""

import dlt
import time
from typing import Any
import os
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import HeaderLinkPaginator

# --- Constants ---
BASE_URL = "https://jaffle-shop.scalevector.ai/api/v1"

# --- DLT Source ---

@dlt.source
def jaffle_shop_source(base_url=BASE_URL):
    """
    A dlt source for the Jaffle Shop API.

    This source retrieves data from the /customers, /orders, and /products endpoints.
    It is designed to be run in both a naive (sequential) and a parallelized manner.

    Args:
        base_url (str, optional): The base URL of the API. Defaults to BASE_URL.

    Returns:
        A tuple of dlt resources.
    """

    @dlt.resource(name="customers", write_disposition="merge", primary_key="id")
    def customers_resource():
        """Resource for the /customers endpoint."""
        client = RESTClient(base_url=base_url)
        paginator = HeaderLinkPaginator()
        yield client.paginate(path="/customers", paginator=paginator)

    @dlt.resource(name="orders", write_disposition="merge", primary_key="id")
    def orders_resource(ordered_at=dlt.sources.incremental("ordered_at", initial_value="2017-01-01T00:00:00Z")):
        """
        Resource for the /orders endpoint. Uses incremental loading based on 'ordered_at'.
        """
        client = RESTClient(base_url=base_url)
        paginator = HeaderLinkPaginator()
        yield client.paginate(
            path="/orders",
            paginator=paginator,
            params={"start_date": ordered_at.last_value}
        )

    @dlt.resource(name="products", primary_key="sku", write_disposition="merge")
    def products_resource():
        """Resource for the /products endpoint."""
        client = RESTClient(base_url=base_url)
        paginator = HeaderLinkPaginator()
        yield client.paginate(path="/products", paginator=paginator)

    return customers_resource, orders_resource, products_resource


def get_timing_stats(trace):
    """
    Extracts timing information from a dlt pipeline trace.
    """
    timings = {
        "extract": 0,
        "normalize": 0,
        "load": 0,
        "total": 0
    }
    for step in trace.steps:
        # Support both dict-like and object-like step records
        step_name = step.get("step_type") if isinstance(step, dict) else getattr(step, "step_type", None)
        if step_name in timings:
            started_at = step.get("started_at") if isinstance(step, dict) else getattr(step, "started_at", None)
            finished_at = step.get("finished_at") if isinstance(step, dict) else getattr(step, "finished_at", None)
            if started_at is not None and finished_at is not None:
                duration = (finished_at - started_at).total_seconds()
                timings[step_name] += duration

    timings["total"] = timings["extract"] + timings["normalize"] + timings["load"]
    return timings


def run_pipeline(pipeline_name: str, source: Any, parallel: bool = False):
    """
    Runs a dlt pipeline with a given source and configuration.

    Args:
        pipeline_name (str): The name of the pipeline.
        source (dlt.sources.DltSource): The dlt source to run.
        parallel (bool): Whether to run the pipeline in parallel.

    Returns:
        A dictionary with the timing statistics.
    """
    print(f"\n--- Running {pipeline_name.upper()} pipeline ---")

    if parallel:
        # Enable parallel execution of resources
        source.parallelized = True

        # Apply performance settings via environment variables
        # Yield chunks/pages: RESTClient.paginate already yields pages, not single rows

        # Workers
        os.environ["EXTRACT__WORKERS"] = "6"
        os.environ["NORMALIZE__WORKERS"] = "3"
        os.environ["LOAD__WORKERS"] = "3"

        # Buffer sizes and file rotation
        os.environ["DATA_WRITER__BUFFER_MAX_ITEMS"] = "10000"
        os.environ["EXTRACT__DATA_WRITER__FILE_MAX_ITEMS"] = "100000"
        os.environ["NORMALIZE__DATA_WRITER__BUFFER_MAX_ITEMS"] = "20000"
        os.environ["NORMALIZE__DATA_WRITER__FILE_MAX_ITEMS"] = "100000"
        os.environ["LOAD__DATA_WRITER__BUFFER_MAX_ITEMS"] = "20000"
    else:
        # Use default settings for naive run
        pass

    pipeline = dlt.pipeline(
        pipeline_name=pipeline_name,
        destination="duckdb",
        dataset_name=f"{pipeline_name}_data",
        progress="log"
    )

    start_time = time.time()
    info = pipeline.run(source)
    end_time = time.time()

    print(info)

    stats = get_timing_stats(pipeline.last_trace)
    stats["wall_clock"] = end_time - start_time

    return stats


def print_comparison(naive_stats, optimized_stats):
    """Prints a comparison of the naive and optimized pipeline runs."""
    print("\n\n--- Performance Comparison ---")
    print(f"{'Stage':<12} | {'Naive (s)':<12} | {'Optimized (s)':<15}")
    print("-" * 45)

    for stage in ["extract", "normalize", "load", "total", "wall_clock"]:
        naive_t = naive_stats.get(stage, 0)
        opt_t = optimized_stats.get(stage, 0)
        print(f"{stage.capitalize():<12} | {naive_t:<12.2f} | {opt_t:<15.2f}")

    print("-" * 45)

    if naive_stats["wall_clock"] > 0 and optimized_stats["wall_clock"] > 0:
        improvement = (naive_stats["wall_clock"] - optimized_stats["wall_clock"]) / naive_stats["wall_clock"] * 100
        speedup = naive_stats["wall_clock"] / optimized_stats["wall_clock"]
        print(f"\nOptimized pipeline was {improvement:.2f}% faster.")
        print(f"Speed-up factor: {speedup:.2f}x")


if __name__ == "__main__":
    print("Starting Jaffle Shop pipeline performance benchmark...")

    # Run the naive pipeline
    naive_source = jaffle_shop_source()
    naive_stats = run_pipeline("jaffle_shop_naive", naive_source, parallel=False)

    # Run the optimized pipeline
    optimized_source = jaffle_shop_source()
    optimized_stats = run_pipeline("jaffle_shop_optimized", optimized_source, parallel=True)

    # Print the performance comparison
    print_comparison(naive_stats, optimized_stats)

    print("\nBenchmark finished.")


Starting Jaffle Shop pipeline performance benchmark...

--- Running JAFFLE_SHOP_NAIVE pipeline ---
-------------------------- Extract jaffle_shop_source --------------------------
Resources: 0/3 (0.0%) | Time: 0.00s | Rate: 0.00/s
Memory usage: 253.47 MB (10.40%) | CPU usage: 0.00%

-------------------------- Extract jaffle_shop_source --------------------------
Resources: 0/3 (0.0%) | Time: 0.47s | Rate: 0.00/s
products: 10  | Time: 0.00s | Rate: 791378.11/s
Memory usage: 254.36 MB (10.40%) | CPU usage: 0.00%

-------------------------- Extract jaffle_shop_source --------------------------
Resources: 0/3 (0.0%) | Time: 1.43s | Rate: 0.00/s
products: 10  | Time: 0.96s | Rate: 10.43/s
orders: 100  | Time: 0.00s | Rate: 8388608.00/s
Memory usage: 254.87 MB (10.40%) | CPU usage: 0.00%

-------------------------- Extract jaffle_shop_source --------------------------
Resources: 0/3 (0.0%) | Time: 1.89s | Rate: 0.00/s
products: 10  | Time: 1.42s | Rate: 7.05/s
orders: 100  | Time: 0.46s | Ra

In [16]:
"""
Highly optimized dlt pipeline for Jaffle Shop API with aggressive performance tuning.
This version implements multiple optimization strategies for maximum throughput.
"""

import dlt
import time
import asyncio
import aiohttp
import json
from typing import Any, Iterator, List, Dict
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import HeaderLinkPaginator
import multiprocessing
import duckdb

# --- Constants ---
BASE_URL = "https://jaffle-shop.scalevector.ai/api/v1"

# Optimize for your system - adjust based on CPU cores
CPU_COUNT = multiprocessing.cpu_count()
MAX_WORKERS = min(CPU_COUNT * 2, 16)  # Cap at 16 to avoid overwhelming the API

# --- Optimized DLT Source (using synchronous RESTClient but with dlt optimizations) ---

def jaffle_shop_source_optimized(base_url=BASE_URL):
    """
    Highly optimized dlt source for the Jaffle Shop API using synchronous client
    but with dlt's parallelization, buffer, and file format optimizations.
    """

    @dlt.resource(
        name="customers",
        write_disposition="merge",
        primary_key="id",
        table_format="parquet"  # Use Parquet for better compression and speed
    )
    def customers_resource():
        """Optimized customers resource using synchronous RESTClient."""
        print("Fetching customers data synchronously with dlt parallelization...")
        client = RESTClient(base_url) # Corrected base_url passing
        paginator = HeaderLinkPaginator()
        # Yield pages directly from the synchronous client
        yield client.paginate(path="/customers", paginator=paginator)


    @dlt.resource(
        name="orders",
        write_disposition="merge",
        primary_key="id",
        table_format="parquet"
    )
    def orders_resource(ordered_at=dlt.sources.incremental("ordered_at", initial_value="2017-01-01T00:00:00Z")):
        """Optimized orders resource using synchronous RESTClient and incremental loading."""
        print(f"Fetching orders data from {ordered_at.last_value} synchronously with dlt parallelization...")
        client = RESTClient(base_url) # Corrected base_url passing
        paginator = HeaderLinkPaginator()
        # Yield pages directly from the synchronous client
        yield client.paginate(
            path="/orders",
            paginator=paginator,
            params={"start_date": ordered_at.last_value}
        )

    @dlt.resource(
        name="products",
        primary_key="sku",
        write_disposition="merge",
        table_format="parquet"
    )
    def products_resource():
        """Optimized products resource using synchronous RESTClient."""
        print("Fetching products data synchronously with dlt parallelization...")
        client = RESTClient(base_url) # Corrected base_url passing
        paginator = HeaderLinkPaginator()
        # Yield pages directly from the synchronous client
        yield client.paginate(path="/products", paginator=paginator)

    # Explicitly create DltSource and add resources
    source = dlt.sources.DltSource(name="jaffle_shop_source_optimized")
    source.add_resources(customers_resource, orders_resource, products_resource)
    # parallelized will be set in run_pipeline function for this source
    return source


def jaffle_shop_source_standard(base_url=BASE_URL):
    """Standard dlt source for comparison (your original implementation)."""

    @dlt.resource(name="customers", write_disposition="merge", primary_key="id")
    def customers_resource():
        client = RESTClient(base_url) # Corrected base_url passing
        paginator = HeaderLinkPaginator()
        yield client.paginate(path="/customers", paginator=paginator)

    @dlt.resource(name="orders", write_disposition="merge", primary_key="id")
    def orders_resource(ordered_at=dlt.sources.incremental("ordered_at", initial_value="2017-01-01T00:00:00Z")):
        client = RESTClient(base_url) # Corrected base_url passing
        paginator = HeaderLinkPaginator()
        yield client.paginate(
            path="/orders",
            paginator=paginator,
            params={"start_date": ordered_at.last_value}
        )

    @dlt.resource(name="products", primary_key="sku", write_disposition="merge")
    def products_resource():
        client = RESTClient(base_url) # Corrected base_url passing
        paginator = HeaderLinkPaginator()
        yield client.paginate(path="/products", paginator=paginator)

    # Explicitly create DltSource and add resources
    source = dlt.sources.DltSource(name="jaffle_shop_source_standard")
    source.add_resources(customers_resource, orders_resource, products_resource)
    # Do NOT set source.parallelized = True for the standard source
    return source


def set_aggressive_performance_settings():
    """Set aggressive performance optimization environment variables."""

    # Worker configurations - scale with CPU cores
    os.environ["EXTRACT__WORKERS"] = str(min(MAX_WORKERS, 12))
    os.environ["NORMALIZE__WORKERS"] = str(min(CPU_COUNT, 4)) # Use CPU_COUNT for normalize
    os.environ["LOAD__WORKERS"] = str(min(MAX_WORKERS // 2, 8))

    # Aggressive buffer sizes for high throughput
    os.environ["DATA_WRITER__BUFFER_MAX_ITEMS"] = "50000"  # 5x larger
    os.environ["DATA_WRITER__BUFFER_MAX_BYTES"] = "50000000"  # 50MB buffer

    # File rotation settings for better I/O
    os.environ["EXTRACT__DATA_WRITER__FILE_MAX_ITEMS"] = "500000"  # 5x larger
    os.environ["EXTRACT__DATA_WRITER__FILE_MAX_BYTES"] = "100000000"  # 100MB files

    # Normalize stage optimization
    os.environ["NORMALIZE__DATA_WRITER__BUFFER_MAX_ITEMS"] = "100000"  # 5x larger
    os.environ["NORMALIZE__DATA_WRITER__FILE_MAX_ITEMS"] = "500000"
    os.environ["NORMALIZE__DATA_WRITER__BUFFER_MAX_BYTES"] = "100000000"

    # Load stage optimization
    os.environ["LOAD__DATA_WRITER__BUFFER_MAX_ITEMS"] = "100000"  # 5x larger
    os.environ["LOAD__DATA_WRITER__BUFFER_MAX_BYTES"] = "100000000"

    # Connection and timeout settings
    os.environ["SOURCES__REST_CLIENT__REQUEST_TIMEOUT"] = "60"
    os.environ["SOURCES__REST_CLIENT__REQUEST_MAX_ATTEMPTS"] = "5"

    # Memory optimization
    os.environ["RUNTIME__MEMORY_LIMIT"] = "0"  # Disable memory limits

    # Enable all performance features
    os.environ["RUNTIME__DLTHUB_TELEMETRY"] = "False"  # Disable telemetry overhead

    print(f"Set performance settings: EXTRACT_WORKERS={os.environ['EXTRACT__WORKERS']}, NORMALIZE__WORKERS={os.environ['NORMALIZE__WORKERS']}, LOAD__WORKERS={os.environ['LOAD__WORKERS']}")
    print(f"Buffer settings: DATA_WRITER_BUFFER_MAX_ITEMS={os.environ['DATA_WRITER__BUFFER_MAX_ITEMS']}, EXTRACT_FILE_MAX_ITEMS={os.environ['EXTRACT__DATA_WRITER__FILE_MAX_ITEMS']}, NORMALIZE_FILE_MAX_ITEMS={os.environ['NORMALIZE__DATA_WRITER__FILE_MAX_ITEMS']}")


def get_timing_stats(trace):
    """Extract timing information from a dlt pipeline trace with better error handling."""
    timings = {
        "extract": 0,
        "normalize": 0,
        "load": 0,
        "total": 0
    }

    if not trace or not hasattr(trace, 'steps'):
        return timings

    for step in trace.steps:
        try:
            step_name = step.get("step") if isinstance(step, dict) else getattr(step, "step", None)
            if not step_name:
                step_name = step.get("step_type") if isinstance(step, dict) else getattr(step, "step_type", None)

            if step_name in timings:
                started_at = step.get("started_at") if isinstance(step, dict) else getattr(step, "started_at", None)
                finished_at = step.get("finished_at") if isinstance(step, dict) else getattr(step, "finished_at", None)

                if started_at and finished_at:
                    # Ensure started_at and finished_at are datetime objects or similar
                    # If they are strings, you might need to parse them first
                    # Assuming they are datetime objects based on dlt trace structure
                    duration = (finished_at - started_at).total_seconds()
                    timings[step_name] += duration
        except Exception as e:
            print(f"Error processing step timing: {e}")
            continue

    timings["total"] = timings["extract"] + timings["normalize"] + timings["load"]
    return timings


def run_pipeline(pipeline_name: str, source: Any, optimized: bool = False):
    """Run a dlt pipeline with performance monitoring."""
    print(f"\n--- Running {pipeline_name.upper()} pipeline ---")

    # Clear previous environment variables to ensure clean run
    for var in ["EXTRACT__WORKERS", "NORMALIZE__WORKERS", "LOAD__WORKERS",
                "DATA_WRITER__BUFFER_MAX_ITEMS", "DATA_WRITER__BUFFER_MAX_BYTES",
                "EXTRACT__DATA_WRITER__FILE_MAX_ITEMS", "EXTRACT__DATA_WRITER__FILE_MAX_BYTES",
                "NORMALIZE__DATA_WRITER__BUFFER_MAX_ITEMS", "NORMALIZE__DATA_WRITER__FILE_MAX_ITEMS",
                "NORMALIZE__DATA_WRITER__BUFFER_MAX_BYTES", "LOAD__DATA_WRITER__BUFFER_MAX_ITEMS",
                "LOAD__DATA_WRITER__BUFFER_MAX_BYTES", "SOURCES__REST_CLIENT__REQUEST_TIMEOUT",
                "SOURCES__REST_CLIENT__REQUEST_MAX_ATTEMPTS", "RUNTIME__MEMORY_LIMIT",
                "RUNTIME__DLTHUB_TELEMETRY"]:
        if var in os.environ:
            del os.environ[var]

    if optimized:
        source.parallelized = True # Set parallelized on the DltSource object
        set_aggressive_performance_settings()
        # Use file-based DuckDB destination for parallel runs
        destination = "duckdb"

    else:
        # Standard settings
        destination = "duckdb"
        # Ensure parallelized is False for the standard source (default)
        source.parallelized = False


    pipeline = dlt.pipeline(
        pipeline_name=pipeline_name,
        destination=destination,
        dataset_name=f"{pipeline_name}_data",
        progress="log",
        dev_mode=False  # Disable dev mode for better performance
    )

    start_time = time.time()

    try:
        info = pipeline.run(source, table_format="parquet" if optimized else None)
        end_time = time.time()

        print(f"\nPipeline completed successfully!")
        # The info object itself contains load metrics including record counts
        print(info)

    except Exception as e:
        end_time = time.time()
        print(f"Pipeline failed: {e}")
        # Re-raise the exception to show the full traceback
        raise

    # Get detailed timing stats
    stats = get_timing_stats(pipeline.last_trace)
    stats["wall_clock"] = end_time - start_time

    return stats


def print_comparison(naive_stats, optimized_stats):
    """Print detailed performance comparison."""
    print("\n" + "="*60)
    print("PERFORMANCE COMPARISON RESULTS")
    print("="*60)
    print(f"{'Stage':<15} | {'Naive (s)':<12} | {'Optimized (s)':<15} | {'Improvement':<12}")
    print("-" * 75)

    improvements = {}
    stages_to_compare = ["extract", "normalize", "load", "total", "wall_clock"]
    for stage in stages_to_compare:
        naive_t = naive_stats.get(stage, 0)
        opt_t = optimized_stats.get(stage, 0)

        if naive_t > 0:
            improvement = ((naive_t - opt_t) / naive_t) * 100
            improvements[stage] = improvement
            improvement_str = f"{improvement:.1f}%"
        elif opt_t > 0:
             # If naive was 0 (instant) but optimized took time
             improvement_str = "N/A"
        else:
            improvement_str = "N/A"


        print(f"{stage.replace('_', ' ').title():<15} | {naive_t:<12.2f} | {opt_t:<15.2f} | {improvement_str:<12}")

    print("-" * 75)

    naive_wall_clock = naive_stats.get("wall_clock", 0)
    optimized_wall_clock = optimized_stats.get("wall_clock", 0)

    if naive_wall_clock > 0 and optimized_wall_clock > 0:
        improvement = ((naive_wall_clock - optimized_wall_clock) / naive_wall_clock) * 100
        speedup = naive_wall_clock / optimized_wall_clock

        print(f"\n🚀 OPTIMIZATION RESULTS:")
        print(f"   • Total time improvement: {improvement:.1f}%")
        print(f"   • Speed-up factor: {speedup:.2f}x")
        print(f"   • Time saved: {naive_wall_clock - optimized_wall_clock:.1f} seconds")

        if improvement > 20:
            print(f"   • 🎉 Excellent optimization! Over 20% improvement achieved.")
        elif improvement > 10:
            print(f"   • ✅ Good optimization! Over 10% improvement achieved.")
        else:
            print(f"   • ⚠️  Modest improvement. Consider additional optimizations.")
    elif naive_wall_clock > 0 and optimized_wall_clock == 0:
         print("\n🚀 OPTIMIZATION RESULTS:")
         print(f"   • Optimized pipeline completed instantly (0s). Significant improvement!")
    elif naive_wall_clock == 0 and optimized_wall_clock > 0:
         print("\n🚀 OPTIMIZATION RESULTS:")
         print(f"   • Naive pipeline completed instantly (0s), optimized took {optimized_wall_clock:.2f}s.")
    else:
         print("\n🚀 OPTIMIZATION RESULTS:")
         print("   • Both pipelines completed instantly (0s). No significant difference observed.")



if __name__ == "__main__":
    print("🚀 Starting Advanced Jaffle Shop Pipeline Performance Benchmark")
    print(f"System: {CPU_COUNT} CPU cores detected")
    print("-" * 60)

    # Run the naive pipeline
    print("Phase 1: Running standard pipeline...")
    naive_source = jaffle_shop_source_standard()
    naive_stats = run_pipeline("jaffle_shop_naive", naive_source, optimized=False)

    # Run the highly optimized pipeline
    print("\nPhase 2: Running highly optimized pipeline...")
    optimized_source = jaffle_shop_source_optimized()
    optimized_stats = run_pipeline("jaffle_shop_optimized", optimized_source, optimized=True)

    # Print the performance comparison
    print_comparison(naive_stats, optimized_stats)

    print("\n✨ Benchmark completed!")

🚀 Starting Advanced Jaffle Shop Pipeline Performance Benchmark
System: 2 CPU cores detected
------------------------------------------------------------
Phase 1: Running standard pipeline...


TypeError: DltSource.__init__() got an unexpected keyword argument 'name'