# ⚙️ Demo: Using dlt and Dagster to Ingest, Orchestrate, and Parallelize API Pipelines

In this demo, we’ll put dlt and Dagster into action by building a simple, production-style data pipeline — extracting data from the **Jaffle Shop API**, processing it with `dlt`, and materializing it into a local **DuckDB** database.

---

## 🔍 What We'll Cover

This demo is broken into two parts:

### 📝 **In This Notebook**
We’ll focus on the first part of the workflow:
- Creating a `dlt` pipeline using `rest_api_source`
- Configuring and running the pipeline locally
- Inspecting the loaded data in DuckDB

### 🧑‍💻 **Live Walkthrough (During the Demo)**
The following topics will be demonstrated live:
- Registering the dlt pipeline as Dagster assets using `dlt_assets`
- Scheduling, monitoring, and lineage tracking in the Dagster UI
- Running multiple dlt pipelines in **parallel** using Dagster orchestration

---

By the end of this demo, you’ll see how dlt and Dagster work together to streamline data ingestion, asset management, and parallel execution — all with minimal boilerplate and maximum visibility.


### Installation

In [1]:
!uv pip install "dlt[duckdb]"

In [2]:
import dlt
import requests

## About the API: Jaffle Shop API

In this demo, we’ll load data from the **Jaffle Shop API** — a fictional e-commerce service designed for testing and analytics workflows.  
It provides RESTful endpoints for common entities such as:

- `customers`
- `orders`
- `items`
- `products`
- `supplies`


This makes it a perfect sandbox for demonstrating data ingestion pipelines and transformations using `dlt`.

Check out the Jaffle Shop API documentation:  
https://jaffle-shop.scalevector.ai/docs#/

In [None]:
# Build a sample endpoint URL
base_url = "https://jaffle-shop.scalevector.ai/api/v1/"
endpoint = "orders"
url = f"{base_url}{endpoint}"
url

'https://jaffle-shop.scalevector.ai/api/v1/orders'

In [None]:
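# Sample API response: fetch the orders endpoint and show the first record
response = requests.get(url, timeout=30)
response.raise_for_status()  # fail fast on HTTP errors
data = response.json()
data[0]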

{'id': '9bed808a-5074-4dfb-b1eb-388e2e60a6da',
 'customer_id': '50a2d1c4-d788-4498-a6f7-dd75d4db588f',
 'store_id': '4b6c2304-2b9e-41e4-942a-cf11a1819378',
 'ordered_at': '2016-09-01T15:01:00',
 'subtotal': '700',
 'tax_paid': '42',
 'order_total': '742',
 'items': [{'id': '2e3cb58a-c73c-4216-9d70-66e91bb2ca32',
   'order_id': '9bed808a-5074-4dfb-b1eb-388e2e60a6da',
   'sku': 'BEV-004'}]}

### Loading Data from REST APIs with `rest_api_source`

If you're working with external APIs, `dlt` provides a built-in utility called **`rest_api_source`** to make REST data ingestion simple and production-ready.

This function helps you:

- Handle various pagination styles automatically (offset, cursor, page-based, etc.)
- Manage retries and rate limits gracefully
- Load data incrementally with built-in state tracking
- Normalize and infer schema automatically

It’s ideal for wrapping external services into reusable, declarative data sources.
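
For context, the loop below is a minimal sketch of the page-number pagination that a declarative source spares you from writing by hand. It uses only the standard library and an in-memory stand-in for an API. `FAKE_ROWS`, `fetch_page`, and `paginate` are hypothetical names for illustration and are not part of `dlt`.

```python
from typing import Iterator

# Hypothetical in-memory "API": 250 rows served in pages.
FAKE_ROWS = [{"id": i} for i in range(250)]


def fetch_page(page: int, page_size: int) -> list[dict]:
    """Stand-in for an HTTP GET with page and page_size params (pages start at 1)."""
    start = (page - 1) * page_size
    return FAKE_ROWS[start:start + page_size]


def paginate(page_size: int = 100) -> Iterator[list[dict]]:
    """Yield one page at a time until the API returns an empty page."""
    page = 1
    while True:
        rows = fetch_page(page, page_size)
        if not rows:
            break
        yield rows
        page += 1


pages = list(paginate(page_size=100))
print([len(p) for p in pages])  # [100, 100, 50]
```

Real APIs differ in how they signal the last page (an empty page, a `next` link, a total count), which is why a configurable paginator is more convenient than a hand-written loop per endpoint.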

You can find a thorough explanation of how to use `dlt` to load data from a REST API in the official tutorial:  
https://dlthub.com/docs/tutorial/rest-api


In [5]:
from dlt.sources.rest_api import rest_api_source

config = {
    "client": {"base_url": "https://jaffle-shop.scalevector.ai/api/v1/"},
    # Defaults applied to every resource unless overridden below
    "resource_defaults": {
        "primary_key": "id",
        "endpoint": {"params": {"page_size": 100}},
    },
    "resources": [
        {"name": "customers", "endpoint": {"path": "customers"}},
        {"name": "orders", "endpoint": {"path": "orders"}},
        {"name": "items", "endpoint": {"path": "items"}},
        # products are keyed by SKU rather than id
        {"name": "products", "primary_key": "sku", "endpoint": {"path": "products"}},
        {"name": "supplies", "endpoint": {"path": "supplies"}},
    ],
}


In [None]:
# create the source
dlt_source = rest_api_source(config)
dlt_source

<@dlt.source(name='rest_api', n_resources=5, resources=['customers', 'orders', 'items', 'products', 'supplies'])>

#### 🛑 Limiting API Pagination for the Demo

To keep the demo lightweight and fast, we limit how many pages of data are fetched from each endpoint in the Jaffle Shop API. This is especially useful when working with paginated REST APIs that may return large datasets by default.
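
Since the limit here counts pages, it caps the number of pages fetched rather than the number of rows. The sketch below illustrates that idea with `itertools.islice` on a plain generator. `fake_pages` and `PAGE_LIMIT` are hypothetical names, and this shows the concept only, not how `dlt` implements `add_limit`.

```python
from itertools import islice
from typing import Iterator


def fake_pages(total_rows: int, page_size: int) -> Iterator[list[int]]:
    """Yield row ids in pages, the way a paginated endpoint would."""
    for start in range(0, total_rows, page_size):
        yield list(range(start, min(start + page_size, total_rows)))


PAGE_LIMIT = 10  # same value as the demo's page limit

# 5,000 rows at 100 per page would be 50 pages; stop after the first 10.
limited = list(islice(fake_pages(total_rows=5_000, page_size=100), PAGE_LIMIT))
print(len(limited), sum(len(page) for page in limited))  # 10 1000

# A small endpoint is untouched: 65 rows fit in one page, well under the limit.
small = list(islice(fake_pages(total_rows=65, page_size=100), PAGE_LIMIT))
print(len(small), sum(len(page) for page in small))  # 1 65
```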


In [7]:
# Maximum number of pages to fetch per endpoint
RESOURCE_LIMIT = 10

# Apply the page limit to each resource in the source
dlt_source.customers.add_limit(RESOURCE_LIMIT)
dlt_source.orders.add_limit(RESOURCE_LIMIT)
dlt_source.items.add_limit(RESOURCE_LIMIT)
dlt_source.products.add_limit(RESOURCE_LIMIT)
dlt_source.supplies.add_limit(RESOURCE_LIMIT)

<@dlt.resource(name='supplies', primary_key='id', limit=10, write_disposition='append', incremental=<dlt.extract.incremental.IncrementalResourceWrapper object at 0x7b9b1925c550>, n_steps=3, steps=['function', 'IncrementalResourceWrapper', 'LimitItem'])>

### 🔄 The `dlt.pipeline`: The Core of Every dlt Workflow

At the heart of every `dlt` project is the **`dlt.pipeline`**, a high-level object that orchestrates the full process of extracting, normalizing, and loading data.

With `dlt.pipeline`, you can:

- Define the pipeline name, destination, and configuration
- Run data through the sources
- Automatically handle schema generation, state management, and data loading
- Track and inspect run metrics and logs

In [8]:
jaffle_shop_pipeline = dlt.pipeline(
    pipeline_name="jaffle_shop",
    destination="duckdb",
    dataset_name="jaffle_shop_dataset",
    progress="log",  # print periodic progress logs during the run
)

In [9]:
load_info = jaffle_shop_pipeline.run(dlt_source, write_disposition="replace")

------------------------------- Extract rest_api -------------------------------
Resources: 0/5 (0.0%) | Time: 0.00s | Rate: 0.00/s
Memory usage: 258.37 MB (8.90%) | CPU usage: 0.00%

------------------------------- Extract rest_api -------------------------------
Resources: 0/5 (0.0%) | Time: 0.68s | Rate: 0.00/s
customers: 100  | Time: 0.00s | Rate: 15534459.26/s
Memory usage: 258.89 MB (8.90%) | CPU usage: 0.00%

------------------------------- Extract rest_api -------------------------------
Resources: 0/5 (0.0%) | Time: 1.83s | Rate: 0.00/s
customers: 100  | Time: 1.15s | Rate: 86.91/s
orders: 100  | Time: 0.00s | Rate: 11037642.11/s
Memory usage: 259.40 MB (8.90%) | CPU usage: 0.00%

------------------------------- Extract rest_api -------------------------------
Resources: 0/5 (0.0%) | Time: 2.59s | Rate: 0.00/s
customers: 100  | Time: 1.91s | Rate: 52.44/s
orders: 100  | Time: 0.76s | Rate: 132.26/s
items: 100  | Time: 0.00s | Rate: 6078701.45/s
Memory usage: 259.66 MB (9.20%) 



------------------------------- Extract rest_api -------------------------------
Resources: 0/5 (0.0%) | Time: 3.34s | Rate: 0.00/s
customers: 100  | Time: 2.65s | Rate: 37.69/s
orders: 100  | Time: 1.50s | Rate: 66.56/s
items: 100  | Time: 0.75s | Rate: 134.00/s
products: 10  | Time: 0.00s | Rate: 1075462.56/s
Memory usage: 259.92 MB (9.30%) | CPU usage: 0.00%





------------------------------- Extract rest_api -------------------------------
Resources: 0/5 (0.0%) | Time: 4.03s | Rate: 0.00/s
customers: 100  | Time: 3.35s | Rate: 29.86/s
orders: 100  | Time: 2.20s | Rate: 45.49/s
items: 100  | Time: 1.44s | Rate: 69.33/s
products: 10  | Time: 0.70s | Rate: 14.37/s
supplies: 65  | Time: 0.00s | Rate: 7573048.89/s
Memory usage: 259.92 MB (9.30%) | CPU usage: 0.00%

------------------------------- Extract rest_api -------------------------------
Resources: 0/5 (0.0%) | Time: 5.74s | Rate: 0.00/s
customers: 200  | Time: 5.06s | Rate: 39.52/s
orders: 200  | Time: 3.91s | Rate: 51.15/s
items: 100  | Time: 3.15s | Rate: 31.71/s
products: 10  | Time: 2.41s | Rate: 4.15/s
supplies: 65  | Time: 1.71s | Rate: 37.98/s
Memory usage: 260.43 MB (9.20%) | CPU usage: 0.00%

------------------------------- Extract rest_api -------------------------------
Resources: 2/5 (40.0%) | Time: 7.07s | Rate: 0.28/s
customers: 200  | Time: 6.39s | Rate: 31.31/s
orders: 200

### 📊 Accessing Pipeline and Dataset Info After a Run

Once your `dlt` pipeline run has completed, you can programmatically access its metadata and the loaded data through the pipeline object (here, `jaffle_shop_pipeline`).

To learn more about accessing loaded datasets with dlt, check out the following: https://dlthub.com/docs/general-usage/dataset-access/dataset

In [17]:
# Pipeline object
jaffle_shop_pipeline

<dlt.pipeline(pipeline_name='jaffle_shop', destination='duckdb', dataset_name='jaffle_shop_dataset', default_schema_name='rest_api', schema_names=['rest_api'], pipelines_dir='/var/dlt/pipelines', working_dir='/var/dlt/pipelines/jaffle_shop')>

In [11]:
dataset = jaffle_shop_pipeline.dataset()
dataset # Displays basic metadata about the dataset

<dlt.dataset(dataset_name='jaffle_shop_dataset', destination="<dlt.destinations.duckdb(destination_type='duckdb', staging_dataset_name_layout='%s_staging', enable_dataset_name_normalization=True, info_tables_query_threshold=1000, truncate_tables_on_staging_destination_before_load=True, local_dir='/content', pipeline_name='jaffle_shop', pipeline_working_dir='/var/dlt/pipelines/jaffle_shop', create_indexes=False)>", schema="<dlt.Schema(name='rest_api', version=2, tables=['_dlt_version', '_dlt_loads', 'customers', 'orders', 'items', 'products', 'supplies', '_dlt_pipeline_state', 'orders__items'], version_hash='00/FulEVgWGfUc5jjPGiahQMqb4iW4rvwQpTN8pXFIw=')>")>

In [12]:
# List all user-defined table names in the schema (exclude internal _dlt tables)
[table for table in dataset.schema.tables.keys() if not table.startswith("_dlt")]

['customers', 'orders', 'items', 'products', 'supplies', 'orders__items']

In [13]:
# Load the 'orders' table into a DataFrame for inspection
orders = dataset.orders
orders_df = orders.df()
orders_df.head()

| | id | customer_id | store_id | ordered_at | subtotal | tax_paid | order_total | _dlt_load_id | _dlt_id |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 9bed808a-5074-4dfb-b1eb-388e2e60a6da | 50a2d1c4-d788-4498-a6f7-dd75d4db588f | 4b6c2304-2b9e-41e4-942a-cf11a1819378 | 2016-09-01 15:01:00+00:00 | 700 | 42 | 742 | 1752050339.3941944 | N7rYizVFCc7nuw |
| 1 | b83630c1-0fdc-4cd2-818d-0b6d4384ce86 | 438005c2-dd1d-48aa-8bfd-7fb06851b5f8 | 4b6c2304-2b9e-41e4-942a-cf11a1819378 | 2016-09-01 10:39:00+00:00 | 700 | 42 | 742 | 1752050339.3941944 | e1/HvbeQQWEJKQ |
| 2 | 3b4a03db-7b23-4673-a88a-7f51b01ca497 | 5261268c-aa94-438a-921a-05efc0d414ac | 4b6c2304-2b9e-41e4-942a-cf11a1819378 | 2016-09-01 07:46:00+00:00 | 700 | 42 | 742 | 1752050339.3941944 | odhosTlXncKZSw |
| 3 | 3368b213-6687-4338-bf73-b927ae72340f | f8486fce-bc07-4a4f-a6e9-ed6a06ba996c | 4b6c2304-2b9e-41e4-942a-cf11a1819378 | 2016-09-01 14:39:00+00:00 | 600 | 36 | 636 | 1752050339.3941944 | EsO8JVw8ifjoRQ |
| 4 | 739bf2f3-8d20-4159-9124-5451f1e4b136 | 341ed9b2-1760-4720-a1b1-42681d273c63 | 4b6c2304-2b9e-41e4-942a-cf11a1819378 | 2016-09-01 10:57:00+00:00 | 1100 | 65 | 1165 | 1752050339.3941944 | z1idYUaAcgW2jQ |


In [16]:
# Load the 'orders__items' nested table into a DataFrame
orders_items = dataset.orders__items
orders_items_df = orders_items.df()
orders_items_df.head()

| | id | order_id | sku | _dlt_parent_id | _dlt_list_idx | _dlt_id |
|---|---|---|---|---|---|---|
| 0 | 2e3cb58a-c73c-4216-9d70-66e91bb2ca32 | 9bed808a-5074-4dfb-b1eb-388e2e60a6da | BEV-004 | N7rYizVFCc7nuw | 0 | 11C/oWRo2vigfQ |
| 1 | 0d005cee-30f1-4426-a786-833fcc77ae34 | b83630c1-0fdc-4cd2-818d-0b6d4384ce86 | BEV-004 | e1/HvbeQQWEJKQ | 0 | km5fpW34QReVNA |
| 2 | 0b8661ef-9337-44cd-a12d-f2aa9badda01 | 3b4a03db-7b23-4673-a88a-7f51b01ca497 | BEV-004 | odhosTlXncKZSw | 0 | K1yK2ZLRG+sa9A |
| 3 | 0e6118fd-4563-4feb-9cee-9a40135db550 | 3368b213-6687-4338-bf73-b927ae72340f | BEV-003 | EsO8JVw8ifjoRQ | 0 | w7jwutGqDczEiA |
| 4 | b7b8e822-c78f-482b-8a18-8a86ad6d281e | 739bf2f3-8d20-4159-9124-5451f1e4b136 | BEV-005 | z1idYUaAcgW2jQ | 0 | jUuWGVvfubOKuw |
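
The `orders__items` table holds the nested `items` list from each order, and the two outputs above show how the tables are linked: each child row's `_dlt_parent_id` equals the `_dlt_id` of its parent row in `orders`. Below is a minimal sketch of that join in plain Python, using the first two rows of each output. The variable names are illustrative only.

```python
# First two rows of each table, copied from the outputs above (columns trimmed).
orders_rows = [
    {"id": "9bed808a-5074-4dfb-b1eb-388e2e60a6da", "order_total": 742, "_dlt_id": "N7rYizVFCc7nuw"},
    {"id": "b83630c1-0fdc-4cd2-818d-0b6d4384ce86", "order_total": 742, "_dlt_id": "e1/HvbeQQWEJKQ"},
]
order_items_rows = [
    {"sku": "BEV-004", "_dlt_parent_id": "N7rYizVFCc7nuw", "_dlt_list_idx": 0},
    {"sku": "BEV-004", "_dlt_parent_id": "e1/HvbeQQWEJKQ", "_dlt_list_idx": 0},
]

# Index parents by _dlt_id, then attach each child row to its parent order.
orders_by_dlt_id = {row["_dlt_id"]: row for row in orders_rows}
joined = [
    {"order_id": orders_by_dlt_id[item["_dlt_parent_id"]]["id"], "sku": item["sku"]}
    for item in order_items_rows
]

for row in joined:
    print(row)
```

The same relationship can be expressed as a SQL join on `orders._dlt_id = orders__items._dlt_parent_id` when querying the DuckDB file directly.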


### ⚙️ Transitioning from `dlt` to `dlt_assets`: Scheduling and Multiprocessing with Dagster

So far, we’ve used **`dlt`** to build a working pipeline — ideal for data ingestion, transformation, and loading with minimal configuration.

But what if you want to:

- **Schedule your pipeline** to run hourly, daily, or on custom intervals?
- **Parallelize execution** across multiple workers for faster performance?
- Leverage a production-grade orchestration framework with a **UI for observability, logs, retries**, and more?

That’s where **`dlt_assets`** comes in.

`dlt_assets` lets you define your `dlt` pipelines as **Dagster assets**, enabling seamless integration with **Dagster’s orchestration engine**. This unlocks:

- **Scheduling**: Run your pipelines on a defined cadence using Dagster’s scheduler  
- **Multiprocessing**: Use multiple workers to run pipeline components in parallel, improving throughput and scalability  
- **Production reliability**: Retry logic, logging, alerting, and monitoring — all via Dagster's tooling  
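
The idea behind the parallelism is simple: ingestion jobs that share no state can run at the same time. The sketch below illustrates this with the standard library only. It is not how Dagster executes assets; `run_ingestion`, the job names, and the sleep standing in for network I/O are all hypothetical.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def run_ingestion(name: str) -> str:
    """Hypothetical stand-in for one independent pipeline run."""
    time.sleep(0.2)  # pretend to wait on an API
    return f"{name}: done"


job_names = ["customers", "orders", "items"]

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=len(job_names)) as pool:
    # map() preserves input order even though the jobs overlap in time.
    results = list(pool.map(run_ingestion, job_names))
elapsed = time.perf_counter() - start

print(results)
print(f"elapsed: {elapsed:.2f}s")  # expected to be closer to one job's duration than three
```

An orchestrator adds what this sketch lacks: dependency tracking between jobs, retries, and a record of each run.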

---

Next, we'll wrap our existing `dlt` pipeline with `dlt_assets` and show how to connect it to Dagster.