In [16]:
"Sources, Resources & Transformers"

'Sources, Resources & Transformers'

In [2]:
import dlt
import pandas as pd
from dlt.common.typing import TDataItems

In [3]:
pipeline = dlt.pipeline(
    pipeline_name="csv_pipeline",
    destination="duckdb",
    dataset_name="csv_data"
)

In [4]:
@dlt.resource(table_name="df_data")
def my_df() -> TDataItems:
    df = pd.read_csv("https://people.sc.fsu.edu/~jburkardt/data/csv/hw_200.csv")
    yield df.to_dict(orient="records")

In [5]:
load_info = pipeline.run(my_df())
print(load_info)

Pipeline csv_pipeline load step completed in 0.59 seconds
1 load package(s) were loaded to destination duckdb and into dataset csv_data
The duckdb destination used duckdb:///c:\Users\HP\Desktop\dlt_test\csv_pipeline.duckdb location to store data
Load package 1767169029.7679594 is LOADED and contains no failed jobs


In [6]:
pipeline.dataset().df_data.df()

Unnamed: 0,index,height_inchesx,_weight_poundsx,_dlt_load_id,_dlt_id
0,1,65.78,112.99,1767169029.7679594,+/zIwOuqtDpr6w
1,2,71.52,136.49,1767169029.7679594,hrCRdMs+EyY5Pw
2,3,69.40,153.03,1767169029.7679594,cCaoWSrJ9loiUw
3,4,68.22,142.34,1767169029.7679594,jVO1npBhCMRZ5w
4,5,67.79,144.30,1767169029.7679594,8X7RYnYDMfnbCw
...,...,...,...,...,...
195,196,65.80,120.84,1767169029.7679594,GZQ6BFoa03vrug
196,197,66.11,115.78,1767169029.7679594,GpUEPCRdffHOUw
197,198,68.24,128.30,1767169029.7679594,Qvmh86E/v7iLkg
198,199,68.02,127.47,1767169029.7679594,pE+uiY/d+N9isg


In [12]:
"PostgreSQL -> DuckDB"

'PostgreSQL -> DuckDB'

In [7]:
import dlt
from sqlalchemy import create_engine, text
from dlt.common.typing import TDataItems

pipeline = dlt.pipeline(
    pipeline_name="mall_customers_pipeline",
    destination="duckdb",
    dataset_name="mall_data"
)

In [8]:
@dlt.resource(table_name="mall_customers")
def get_mall_customers() -> TDataItems:
    engine = create_engine("postgresql+psycopg2://postgres:aliza123@localhost:5432/Test_DB")

    with engine.connect() as conn:
        result = conn.execution_options(stream_results=True).execute(
            text("SELECT * FROM mall_customers")
        )
        for row in result:
            yield dict(row._mapping)  # convert row -> dict for dlt

In [9]:
load_info = pipeline.run(get_mall_customers())
print(load_info)

Pipeline mall_customers_pipeline load step completed in 0.52 seconds
1 load package(s) were loaded to destination duckdb and into dataset mall_data
The duckdb destination used duckdb:///c:\Users\HP\Desktop\dlt_test\mall_customers_pipeline.duckdb location to store data
Load package 1767170110.8082888 is LOADED and contains no failed jobs


In [11]:
df = pipeline.dataset().mall_customers.df()
df.head()

Unnamed: 0,customer_id,genre,age,annual_income_kx,spending_score_1_100x,_dlt_load_id,_dlt_id
0,1,Male,19,15,39,1767170110.8082888,eNUM3abqMUv8FQ
1,2,Male,21,15,81,1767170110.8082888,3N/rLsok2FrbFQ
2,3,Female,20,16,6,1767170110.8082888,Qx8veff7jKhbBg
3,4,Female,23,16,77,1767170110.8082888,+meomfNu44Kr1w
4,5,Female,31,17,40,1767170110.8082888,bSLhrSFyropLNw


In [13]:
# List all table names from the database
with pipeline.sql_client() as client:
    with client.execute_query(
        "SELECT table_name FROM information_schema.tables"
    ) as table:
        print(table.df())

            table_name
0       mall_customers
1           _dlt_loads
2  _dlt_pipeline_state
3         _dlt_version


In [14]:
"Rest APIs Endpoints"

'Rest APIs Endpoints'

In [15]:
import dlt
from dlt.sources.helpers import requests
from dlt.common.typing import TDataItems

pipeline = dlt.pipeline(
    pipeline_name="pokemon_pipeline",
    destination="duckdb",
    dataset_name="pokemon_data"
)

@dlt.resource(table_name="pokemon_api")
def get_pokemon() -> TDataItems:
    url = "https://pokeapi.co/api/v2/pokemon"
    response = requests.get(url)
    yield response.json()["results"]

load_info = pipeline.run(get_pokemon())
print(load_info)

pipeline.dataset().pokemon_api.df()

Pipeline pokemon_pipeline load step completed in 0.47 seconds
1 load package(s) were loaded to destination duckdb and into dataset pokemon_data
The duckdb destination used duckdb:///c:\Users\HP\Desktop\dlt_test\pokemon_pipeline.duckdb location to store data
Load package 1767170408.6445942 is LOADED and contains no failed jobs


Unnamed: 0,name,url,_dlt_load_id,_dlt_id
0,bulbasaur,https://pokeapi.co/api/v2/pokemon/1/,1767170408.6445942,6Nx/lYDl2Cj9oA
1,ivysaur,https://pokeapi.co/api/v2/pokemon/2/,1767170408.6445942,wFARKTZljJnxiQ
2,venusaur,https://pokeapi.co/api/v2/pokemon/3/,1767170408.6445942,8qbSCRatKOJvdQ
3,charmander,https://pokeapi.co/api/v2/pokemon/4/,1767170408.6445942,N5CLyEjcmAYhbA
4,charmeleon,https://pokeapi.co/api/v2/pokemon/5/,1767170408.6445942,GjmAJ/udTvgEbg
5,charizard,https://pokeapi.co/api/v2/pokemon/6/,1767170408.6445942,x6aj8aLrM2RpYg
6,squirtle,https://pokeapi.co/api/v2/pokemon/7/,1767170408.6445942,o3RmbOqpMQDTGQ
7,wartortle,https://pokeapi.co/api/v2/pokemon/8/,1767170408.6445942,cMZHV3D1a4pM9g
8,blastoise,https://pokeapi.co/api/v2/pokemon/9/,1767170408.6445942,D7lD4SVNlZtkmg
9,caterpie,https://pokeapi.co/api/v2/pokemon/10/,1767170408.6445942,JKrvIVf4cfIN4g


In [19]:
import dlt
import requests
from dlt.common.typing import TDataItems

data = [
    {"id": "1", "name": "bulbasaur", "size": {"weight": 6.9, "height": 0.7}},
    {"id": "4", "name": "charmander", "size": {"weight": 8.5, "height": 0.6}},
    {"id": "25", "name": "pikachu", "size": {"weight": 6, "height": 0.4}},
]

@dlt.resource(table_name="pokemon")
def my_dict_list() -> TDataItems:
    yield data

@dlt.transformer(data_from=my_dict_list, table_name="detailed_info")
def poke_details(items: TDataItems) -> TDataItems:
    for item in items:
        item_id = item["id"]
        url = f"https://pokeapi.co/api/v2/pokemon/{item_id}"
        details = requests.get(url, timeout=30).json()

        yield {
            "id": item_id,
            "name": details.get("name"),
            "height": details.get("height"),
            "weight": details.get("weight"),
            "base_experience": details.get("base_experience"),
            "types": [t["type"]["name"] for t in details.get("types", [])],
        }

def main():
    pipeline = dlt.pipeline(
        pipeline_name="quick_start",
        destination="duckdb",
        dataset_name="pokedata",
        dev_mode=True,
    )

    load_info = pipeline.run([
        my_dict_list(),               # loads "pokemon"
        my_dict_list() | poke_details # loads "detailed_info"
    ])
    print(load_info)

    #Query loaded tables
    ds = pipeline.dataset()
    print("\nAvailable tables:", ds.tables)

    print("\n--- pokemon (raw) ---")
    print(ds.pokemon.df().head())

    print("\n--- detailed_info (enriched) ---")
    print(ds.detailed_info.df().head())

    if "detailed_info__types" in ds.tables:
        print("\n--- detailed_info__types (normalized types) ---")
        print(ds.detailed_info__types.df().head())

if __name__ == "__main__":
    main()



Pipeline quick_start load step completed in 0.55 seconds
1 load package(s) were loaded to destination duckdb and into dataset pokedata_20251231102456
The duckdb destination used duckdb:///c:\Users\HP\Desktop\dlt_test\quick_start.duckdb location to store data
Load package 1767176696.6941404 is LOADED and contains no failed jobs

Available tables: ['pokemon', 'detailed_info', 'detailed_info__types', '_dlt_version', '_dlt_loads', '_dlt_pipeline_state']

--- pokemon (raw) ---
   id        name  size__weight  size__height        _dlt_load_id  \
0   1   bulbasaur           6.9           0.7  1767176696.6941404   
1   4  charmander           8.5           0.6  1767176696.6941404   
2  25     pikachu           6.0           0.4  1767176696.6941404   

          _dlt_id  
0  PblrKsDs9b5I8g  
1  SaKGOydh0HE+EQ  
2  iaElcmooUVG3DQ  

--- detailed_info (enriched) ---
   id        name  height  weight  base_experience        _dlt_load_id  \
0   1   bulbasaur       7      69               64  176717

In [20]:
import dlt
from dlt.common.typing import TDataItems

# Customers table
@dlt.resource(table_name="customers")
def customers() -> TDataItems:
    yield [
        {"id": 1, "name": "Alice"},
        {"id": 2, "name": "Bob"},
    ]

# Orders table
@dlt.resource(table_name="orders")
def orders() -> TDataItems:
    yield [
        {"order_id": 101, "customer_id": 1, "amount": 250},
        {"order_id": 102, "customer_id": 2, "amount": 400},
    ]

# Source: group related resources
@dlt.source
def shop_source():
    return customers(), orders()

# Pipeline: where data will be loaded
pipeline = dlt.pipeline(
    pipeline_name="shop_pipeline",
    destination="duckdb",
    dataset_name="shop_data"
)

# Running the source (loads BOTH tables together)
pipeline.run(shop_source())

# Query results
ds = pipeline.dataset()
print(ds.customers.df())
print(ds.orders.df())

   id   name        _dlt_load_id         _dlt_id
0   1  Alice  1767176811.3139133  Zh21ypZJagX6yQ
1   2    Bob  1767176811.3139133  2Dsxm7rRmeIFag
   order_id  customer_id  amount        _dlt_load_id         _dlt_id
0       101            1     250  1767176811.3139133  7CkN3BQjOgbB5A
1       102            2     400  1767176811.3139133  rIobiA7kua01SQ
