In [None]:
import os

#extract
os.environ['EXTRACT__WORKERS'] = '3'
#os.environ["EXTRACT__DATA_WRITER__FILE_MAX_ITEMS"] = "100000"
#normalize
os.environ['NORMALIZE__WORKERS'] = '5'
os.environ['NORMALIZE__DATA_WRITER__BUFFER_MAX_ITEMS'] = '5000'
os.environ["NORMALIZE__DATA_WRITER__FILE_MAX_ITEMS"] = "5000"
#load
os.environ['LOAD__WORKERS'] = '5'

In [11]:
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import HeaderLinkPaginator
from itertools import islice
from retrying import retry

def yield_chunks(iterable, chunk_size=10):
    iterator = iter(iterable)
    while chunk := islice(iterator, chunk_size):  # <--- we slice data into chunks
        yield chunk


@dlt.source
def jaffle_source():
    client = RESTClient(
            base_url="https://jaffle-shop.scalevector.ai/api/v1",
            paginator=HeaderLinkPaginator(),
    )

    @dlt.resource(table_name="jaffle_customers", write_disposition="merge", primary_key="id", parallelized=True)
    def jaffle_customers():
      for page in client.paginate("customers"):
        yield page

    #@retry(wait_random_min=1000, wait_random_max=2000, stop_max_attempt_number=10)
    @dlt.resource(table_name="jaffle_orders", write_disposition="merge", primary_key="id", parallelized=True)
    def jaffle_orders():
      params = {
            "page_size": 5000 #with the limit set to 100 the pipeline ran for approximately 10 minutes
        }
      for attempt in range(10):
        try:
          for page in client.paginate("orders", params=params):
            yield page
        except Exception as e:
          pass
        else:
          break
         
      #yield from yield_chunks(client.paginate("orders", params=params), chunk_size=10) #yielding chunks seems to overburden the api, causing excessively long load time, execution was stopped after 37 minutes

    @dlt.resource(table_name="jaffle_products", write_disposition="merge", primary_key="sku", parallelized=True)
    def jaffle_products():
      for page in client.paginate("products"):
        yield page

    return jaffle_customers,jaffle_orders,jaffle_products

In [12]:
pipeline = dlt.pipeline(
    pipeline_name="jaffle_shop_pipeline",
    destination="duckdb",
    dataset_name="jaffle_stage",
    dev_mode=True,
)

load_info = pipeline.run(jaffle_source())
print(pipeline.last_trace)

Run started at 2025-05-21 15:41:52.265628+00:00 and COMPLETED in 24 minutes and 30.13 seconds with 4 steps.
Step extract COMPLETED in 23 minutes and 39.07 seconds.

Load package 1747842112.3093712 is EXTRACTED and NOT YET LOADED to the destination and contains no failed jobs

Step normalize COMPLETED in 29.00 seconds.
Normalized data for the following tables:
- _dlt_pipeline_state: 1 row(s)
- jaffle_customers: 935 row(s)
- jaffle_products: 10 row(s)
- jaffle_orders: 86948 row(s)
- jaffle_orders__items: 128577 row(s)

Load package 1747842112.3093712 is NORMALIZED and NOT YET LOADED to the destination and contains no failed jobs

Step load COMPLETED in 22.03 seconds.
Pipeline jaffle_shop_pipeline load step completed in 21.98 seconds
1 load package(s) were loaded to destination duckdb and into dataset jaffle_stage_20250521034152
The duckdb destination used duckdb:////workspaces/docker_advanced_dlt_course/jaffle_shop_pipeline.duckdb location to store data
Load package 1747842112.3093712 is

####Performance testing

first perf trial with the following settings:
os.environ['EXTRACT__WORKERS'] = '3'
os.environ['NORMALIZE__WORKERS'] = '2'
os.environ['LOAD__WORKERS'] = '2'

runtime: 6min 46sec

output:
Run started at 2025-05-21 10:09:02.540887+00:00 and COMPLETED in 6 minutes and 45.37 seconds with 4 steps.
Step extract COMPLETED in 5 minutes and 53.30 seconds.

Load package 1747822142.5831456 is EXTRACTED and NOT YET LOADED to the destination and contains no failed jobs

Step normalize COMPLETED in 27.44 seconds.
Normalized data for the following tables:
- _dlt_pipeline_state: 1 row(s)
- jaffle_customers: 935 row(s)
- jaffle_orders: 61948 row(s)
- jaffle_orders__items: 90900 row(s)
- jaffle_products: 10 row(s)

Load package 1747822142.5831456 is NORMALIZED and NOT YET LOADED to the destination and contains no failed jobs

Step load COMPLETED in 24.51 seconds.
Pipeline jaffle_shop_pipeline load step completed in 24.46 seconds
1 load package(s) were loaded to destination duckdb and into dataset jaffle_stage_20250521100902
The duckdb destination used duckdb:////workspaces/docker_advanced_dlt_course/jaffle_shop_pipeline.duckdb location to store data
Load package 1747822142.5831456 is LOADED and contains no failed jobs

Step run COMPLETED in 6 minutes and 45.37 seconds.
Pipeline jaffle_shop_pipeline load step completed in 24.46 seconds
1 load package(s) were loaded to destination duckdb and into dataset jaffle_stage_20250521100902
The duckdb destination used duckdb:////workspaces/docker_advanced_dlt_course/jaffle_shop_pipeline.duckdb location to store data
Load package 1747822142.5831456 is LOADED and contains no failed jobs

####V3 keeping the parameters, but setting the size of the pages to 100 causes the extract step to be significantly slower.
Run started at 2025-05-21 12:41:11.649454+00:00 and COMPLETED in 6 minutes and 53.75 seconds with 4 steps.
Step extract COMPLETED in 6 minutes and 12.53 seconds.

Load package 1747831271.7042348 is EXTRACTED and NOT YET LOADED to the destination and contains no failed jobs

Step normalize COMPLETED in 22.03 seconds.
Normalized data for the following tables:
- _dlt_pipeline_state: 1 row(s)
- jaffle_customers: 935 row(s)
- jaffle_orders: 61948 row(s)
- jaffle_orders__items: 90900 row(s)
- jaffle_products: 10 row(s)

Load package 1747831271.7042348 is NORMALIZED and NOT YET LOADED to the destination and contains no failed jobs

Step load COMPLETED in 19.14 seconds.
Pipeline jaffle_shop_pipeline load step completed in 19.08 seconds
1 load package(s) were loaded to destination duckdb and into dataset jaffle_stage_20250521124111
The duckdb destination used duckdb:////workspaces/docker_advanced_dlt_course/jaffle_shop_pipeline.duckdb location to store data
Load package 1747831271.7042348 is LOADED and contains no failed jobs

Step run COMPLETED in 6 minutes and 53.74 seconds.
Pipeline jaffle_shop_pipeline load step completed in 19.08 seconds
1 load package(s) were loaded to destination duckdb and into dataset jaffle_stage_20250521124111
The duckdb destination used duckdb:////workspaces/docker_advanced_dlt_course/jaffle_shop_pipeline.duckdb location to store data
Load package 1747831271.7042348 is LOADED and contains no failed jobs

####V2
updating the buffer size and file rotation parameters yielded a small performance improvement
output:

Run started at 2025-05-21 10:32:08.689815+00:00 and COMPLETED in 6 minutes and 25.53 seconds with 4 steps.
Step extract COMPLETED in 5 minutes and 41.39 seconds.

Load package 1747823528.7447271 is EXTRACTED and NOT YET LOADED to the destination and contains no failed jobs

Step normalize COMPLETED in 23.37 seconds.
Normalized data for the following tables:
- _dlt_pipeline_state: 1 row(s)
- jaffle_customers: 935 row(s)
- jaffle_orders: 61948 row(s)
- jaffle_orders__items: 90900 row(s)
- jaffle_products: 10 row(s)

Load package 1747823528.7447271 is NORMALIZED and NOT YET LOADED to the destination and contains no failed jobs

Step load COMPLETED in 20.74 seconds.
Pipeline jaffle_shop_pipeline load step completed in 20.69 seconds
1 load package(s) were loaded to destination duckdb and into dataset jaffle_stage_20250521103208
The duckdb destination used duckdb:////workspaces/docker_advanced_dlt_course/jaffle_shop_pipeline.duckdb location to store data
Load package 1747823528.7447271 is LOADED and contains no failed jobs

Step run COMPLETED in 6 minutes and 25.53 seconds.
Pipeline jaffle_shop_pipeline load step completed in 20.69 seconds
1 load package(s) were loaded to destination duckdb and into dataset jaffle_stage_20250521103208
The duckdb destination used duckdb:////workspaces/docker_advanced_dlt_course/jaffle_shop_pipeline.duckdb location to store data
Load package 1747823528.7447271 is LOADED and contains no failed jobs