# Q1: dlt Version

In [1]:
# import dlt
!pip install dlt[duckdb]

Collecting dlt[duckdb]
  Downloading dlt-1.6.1-py3-none-any.whl.metadata (11 kB)
Collecting click>=7.1 (from dlt[duckdb])
  Downloading click-8.1.8-py3-none-any.whl.metadata (2.3 kB)
Collecting duckdb>=0.9 (from dlt[duckdb])
  Downloading duckdb-1.2.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (966 bytes)
Collecting fsspec>=2022.4.0 (from dlt[duckdb])
  Downloading fsspec-2025.2.0-py3-none-any.whl.metadata (11 kB)
Collecting gitpython>=3.1.29 (from dlt[duckdb])
  Downloading GitPython-3.1.44-py3-none-any.whl.metadata (13 kB)
Collecting giturlparse>=0.10.0 (from dlt[duckdb])
  Downloading giturlparse-0.12.0-py2.py3-none-any.whl.metadata (4.5 kB)
Collecting hexbytes>=0.2.2 (from dlt[duckdb])
  Downloading hexbytes-1.3.0-py3-none-any.whl.metadata (3.3 kB)
Collecting humanize>=4.4.0 (from dlt[duckdb])
  Downloading humanize-4.11.0-py3-none-any.whl.metadata (7.8 kB)
Collecting jsonpath-ng>=1.5.3 (from dlt[duckdb])
  Downloading jsonpath_ng-1.7.0-py3-none-any.whl.met

In [2]:
# Ask the dlt command-line tool for its version (this is the answer to Q1)
!dlt --version

dlt 1.6.1


# Q2: Define & Run the Pipeline (NYC Taxi API)

In [3]:
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import PageNumberPaginator

# STEP 01: Declare the NYC taxi API as a dlt resource with the @dlt.resource decorator.
# STEP 02: Let dlt's built-in REST client take care of pagination.
@dlt.resource(name="rides")   # <--- resource name, also used as the destination table name
def ny_taxi():
    """Yield NYC taxi ride data from the Zoomcamp API, one page at a time.

    Pages are requested by page number starting at 1. No total-page field is
    read from the response (``total_path=None``); per dlt's paginator docs the
    client then stops once the API returns an empty page.

    Yields:
        One page of the endpoint's response per iteration, exactly as produced
        by ``RESTClient.paginate``.
    """
    taxi_api = RESTClient(
        base_url="https://us-central1-dlthub-analytics.cloudfunctions.net",
        paginator=PageNumberPaginator(base_page=1, total_path=None),
    )

    # Hand each page straight to dlt as it arrives instead of collecting them all,
    # so only a single page has to be held in memory at any time.
    yield from taxi_api.paginate("data_engineering_zoomcamp_api")   # <--- taxi ride endpoint

# Define the dlt pipeline: its name, the destination engine (DuckDB) and the
# dataset (schema) that the loaded tables are written into.
pipeline = dlt.pipeline(
    pipeline_name="ny_taxi_pipeline",
    destination="duckdb",
    dataset_name="ny_taxi_data"
)

# STEP 03: Load the extracted data into DuckDB for querying.
# dlt appends by default, so re-running this cell would load every ride a second
# time and inflate all later counts. write_disposition="replace" makes the cell
# idempotent: each run leaves the tables holding exactly one copy of the data.
load_info = pipeline.run(ny_taxi, write_disposition="replace")
print(load_info)

Pipeline ny_taxi_pipeline load step completed in 2.32 seconds
1 load package(s) were loaded to destination duckdb and into dataset ny_taxi_data
The duckdb destination used duckdb:////home/jovyan/ny_taxi_pipeline.duckdb location to store data
Load package 1739508783.060955 is LOADED and contains no failed jobs


In [6]:
import duckdb
#from google.colab import data_table
#data_table.enable_dataframe_formatter()

# The pipeline run created a '<pipeline_name>.duckdb' file in the working directory,
# so the relative path below only resolves when this cell runs from that same directory.

# Open a connection to the database file written by the pipeline
conn = duckdb.connect(f"{pipeline.pipeline_name}.duckdb")

# Make the pipeline's dataset the active schema so its tables can be queried unqualified
conn.sql(f"SET search_path = '{pipeline.dataset_name}'")

# List the tables in the database (name, column names, column types) as a DataFrame
df = conn.sql("DESCRIBE").df()

print(df)

           database        schema                 name  \
0  ny_taxi_pipeline  ny_taxi_data           _dlt_loads   
1  ny_taxi_pipeline  ny_taxi_data  _dlt_pipeline_state   
2  ny_taxi_pipeline  ny_taxi_data         _dlt_version   
3  ny_taxi_pipeline  ny_taxi_data                rides   

                                        column_names  \
0  [load_id, schema_name, status, inserted_at, sc...   
1  [version, engine_version, pipeline_name, state...   
2  [version, engine_version, inserted_at, schema_...   
3  [end_lat, end_lon, fare_amt, passenger_count, ...   

                                        column_types  temporary  
0  [VARCHAR, VARCHAR, BIGINT, TIMESTAMP WITH TIME...      False  
1  [BIGINT, BIGINT, VARCHAR, VARCHAR, TIMESTAMP W...      False  
2  [BIGINT, BIGINT, TIMESTAMP WITH TIME ZONE, VAR...      False  
3  [DOUBLE, DOUBLE, DOUBLE, BIGINT, VARCHAR, DOUB...      False  


# Q3: Explore the loaded data