In [1]:
!pip install dlt[duckdb]


Collecting dlt[duckdb]
  Using cached dlt-1.6.1-py3-none-any.whl.metadata (11 kB)
Collecting PyYAML>=5.4.1 (from dlt[duckdb])
  Downloading PyYAML-6.0.2-cp312-cp312-win_amd64.whl.metadata (2.1 kB)
Collecting click>=7.1 (from dlt[duckdb])
  Downloading click-8.1.8-py3-none-any.whl.metadata (2.3 kB)
Collecting duckdb>=0.9 (from dlt[duckdb])
  Using cached duckdb-1.2.0-cp312-cp312-win_amd64.whl.metadata (995 bytes)
Collecting fsspec>=2022.4.0 (from dlt[duckdb])
  Downloading fsspec-2025.2.0-py3-none-any.whl.metadata (11 kB)
Collecting gitpython>=3.1.29 (from dlt[duckdb])
  Using cached GitPython-3.1.44-py3-none-any.whl.metadata (13 kB)
Collecting giturlparse>=0.10.0 (from dlt[duckdb])
  Using cached giturlparse-0.12.0-py2.py3-none-any.whl.metadata (4.5 kB)
Collecting hexbytes>=0.2.2 (from dlt[duckdb])
  Using cached hexbytes-1.3.0-py3-none-any.whl.metadata (3.3 kB)
Collecting humanize>=4.4.0 (from dlt[duckdb])
  Using cached humanize-4.12.0-py3-none-any.whl.metadata (7.8 kB)
Collecting js

In [2]:
!dlt --version

[39mdlt 1.6.1[0m


In [3]:
import dlt
print("dlt version:", dlt.__version__)

dlt version: 1.6.1


In [5]:
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import PageNumberPaginator

# Define the API resource for NYC taxi data
@dlt.resource(name="rides")   # <--- The name of the resource (will be used as the table name)
def ny_taxi():
    client = RESTClient(
        base_url="https://us-central1-dlthub-analytics.cloudfunctions.net",
        paginator=PageNumberPaginator(
            base_page=1,
            total_path=None
        )
    )

    for page in client.paginate("data_engineering_zoomcamp_api"):    # <--- API endpoint for retrieving taxi ride data
        yield page   # <--- yield data to manage memory

pipeline = dlt.pipeline(
    pipeline_name="ny_taxi_pipeline",
    destination="duckdb",
    dataset_name="ny_taxi_data"
)

In [6]:
load_info = pipeline.run(ny_taxi)
print(load_info)

Pipeline ny_taxi_pipeline load step completed in 2.51 seconds
1 load package(s) were loaded to destination duckdb and into dataset ny_taxi_data
The duckdb destination used duckdb:///C:\Users\pc\Documents\GitHub\data-engineering-zoomcamp\workshops-dlt\ny_taxi_pipeline.duckdb location to store data
Load package 1739795617.6136181 is LOADED and contains no failed jobs


In [9]:
import duckdb

# A database '.duckdb' was created in working directory so just connect to it

# Connect to the DuckDB database
conn = duckdb.connect(f"{pipeline.pipeline_name}.duckdb")

# Set search path to the dataset
conn.sql(f"SET search_path = '{pipeline.dataset_name}'")

# Describe the dataset
conn.sql("DESCRIBE").df()

Unnamed: 0,database,schema,name,column_names,column_types,temporary
0,ny_taxi_pipeline,ny_taxi_data,_dlt_loads,"[load_id, schema_name, status, inserted_at, sc...","[VARCHAR, VARCHAR, BIGINT, TIMESTAMP WITH TIME...",False
1,ny_taxi_pipeline,ny_taxi_data,_dlt_pipeline_state,"[version, engine_version, pipeline_name, state...","[BIGINT, BIGINT, VARCHAR, VARCHAR, TIMESTAMP W...",False
2,ny_taxi_pipeline,ny_taxi_data,_dlt_version,"[version, engine_version, inserted_at, schema_...","[BIGINT, BIGINT, TIMESTAMP WITH TIME ZONE, VAR...",False
3,ny_taxi_pipeline,ny_taxi_data,rides,"[end_lat, end_lon, fare_amt, passenger_count, ...","[DOUBLE, DOUBLE, DOUBLE, BIGINT, VARCHAR, DOUB...",False


In [10]:
conn.sql("select count(1) from rides")

┌──────────┐
│ count(1) │
│  int64   │
├──────────┤
│    10000 │
└──────────┘

In [11]:
with pipeline.sql_client() as client:
    res = client.execute_sql(
            """
            SELECT
            AVG(date_diff('minute', trip_pickup_date_time, trip_dropoff_date_time))
            FROM rides;
            """
        )
    # Prints column values of the first row
    print(res)

[(12.3049,)]
