## Workshop "Data Ingestion with dlt": Homework
Dataset & API
We’ll use NYC Taxi data via the same custom API from the workshop:

🔹 Base API URL: https://us-central1-dlthub-analytics.cloudfunctions.net/data_engineering_zoomcamp_api

🔹 Data format: paginated JSON (1,000 records per page).

🔹 API pagination: stop when an empty page is returned.
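The stop-on-empty rule amounts to a simple loop. This is a minimal sketch, not dlt's implementation: `fetch_page` is a hypothetical callable standing in for the HTTP request, and `fake_fetch_page` serves made-up records so the loop can run offline.

```python
from typing import Any, Callable, Dict, Iterator, List

Record = Dict[str, Any]


def paginate_until_empty(
    fetch_page: Callable[[int], List[Record]], first_page: int = 1
) -> Iterator[List[Record]]:
    """Yield pages starting at `first_page` until a page comes back empty."""
    page_number = first_page
    while True:
        records = fetch_page(page_number)
        if not records:  # an empty page signals the end of the data
            return
        yield records
        page_number += 1


# Hypothetical stand-in for the API: 2,500 fake records served 1,000 per page.
def fake_fetch_page(page_number: int, page_size: int = 1000, total: int = 2500) -> List[Record]:
    start = (page_number - 1) * page_size
    return [{"id": i} for i in range(start, min(start + page_size, total))]


pages = list(paginate_until_empty(fake_fetch_page))
print([len(p) for p in pages])  # [1000, 1000, 500]
```

Yielding one page at a time keeps only the current page in memory, which is the same reason the workshop code uses `yield`.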

Question 1: dlt Version
Install dlt with the DuckDB extra:

```python
!pip install dlt[duckdb]
```


```python
# Databricks only: restart Python so the freshly installed dlt package can be imported
dbutils.library.restartPython()
```

```python
!dlt --version
```

dlt 1.6.1


Answer 1: The installed dlt version is **1.6.1**.
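The same check can be done from Python without the CLI. A small sketch using the standard library's `importlib.metadata`; the helper name `installed_version` is ours, not part of dlt.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version string of `package`, or None if it is missing."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


print(installed_version("dlt"))  # e.g. "1.6.1" in the environment above, None if dlt is absent
```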

Question 2: Define & Run the Pipeline (NYC Taxi API)

```python
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import PageNumberPaginator


def paginated_getter():
    client = RESTClient(
        base_url="https://us-central1-dlthub-analytics.cloudfunctions.net",
        # Page-based pagination: pages are numbered 1, 2, 3, ...
        paginator=PageNumberPaginator(
            base_page=1,      # start from page 1
            total_path=None,  # the API returns no total page count; stop when a page has no items
        ),
    )

    # Endpoint that serves the taxi ride data
    for page in client.paginate("data_engineering_zoomcamp_api"):
        yield page  # yield one page at a time to keep memory usage low


# Fetch and print only the first page
for page_data in paginated_getter():
    print(page_data)
    break
```
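To see which requests the paginator makes, here is a sketch that builds the URLs for the first few pages. It assumes `PageNumberPaginator` sends the page number in its default `page` query parameter; `page_urls` is a hypothetical helper for illustration only, and it does not fetch anything.

```python
from typing import List
from urllib.parse import urlencode


def page_urls(base_url: str, endpoint: str, first_page: int = 1, count: int = 3) -> List[str]:
    """Build URLs for `count` consecutive pages, assuming a `page` query parameter."""
    return [
        f"{base_url.rstrip('/')}/{endpoint}?{urlencode({'page': n})}"
        for n in range(first_page, first_page + count)
    ]


for url in page_urls(
    "https://us-central1-dlthub-analytics.cloudfunctions.net",
    "data_engineering_zoomcamp_api",
):
    print(url)
```

With `base_page=1` the first request is for page 1; the real client keeps incrementing until it receives an empty page.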

Unexpected internal error when monkey patching dlt module: cannot import name 'overrides' from partially initialized module 'dlt' (most likely due to a circular import) (/local_disk0/.ephemeral_nfs/envs/pythonEnv-fbfb5295-9e85-460d-9b5b-73e278cb4f2b/lib/python3.11/site-packages/dlt/__init__.py)


[{'End_Lat': 40.742963, 'End_Lon': -73.980072, 'Fare_Amt': 45.0, 'Passenger_Count': 1, 'Payment_Type': 'Credit', 'Rate_Code': None, 'Start_Lat': 40.641525, 'Start_Lon': -73.787442, 'Tip_Amt': 9.0, 'Tolls_Amt': 4.15, 'Total_Amt': 58.15, 'Trip_Distance': 17.52, 'Trip_Dropoff_DateTime': '2009-06-14 23:48:00', 'Trip_Pickup_DateTime': '2009-06-14 23:23:00', 'mta_tax': None, 'store_and_forward': None, 'surcharge': 0.0, 'vendor_name': 'VTS'}, {'End_Lat': 40.740187, 'End_Lon': -74.005698, 'Fare_Amt': 6.5, 'Passenger_Count': 1, 'Payment_Type': 'Credit', 'Rate_Code': None, 'Start_Lat': 40.722065, 'Start_Lon': -74.009767, 'Tip_Amt': 1.0, 'Tolls_Amt': 0.0, 'Total_Amt': 8.5, 'Trip_Distance': 1.56, 'Trip_Dropoff_DateTime': '2009-06-18 17:43:00', 'Trip_Pickup_DateTime': '2009-06-18 17:35:00', 'mta_tax': None, 'store_and_forward': None, 'surcharge': 1.0, 'vendor_name': 'VTS'}, {'End_Lat': 40.718043, 'End_Lon': -74.004745, 'Fare_Amt': 12.5, 'Passenger_Count': 5, 'Payment_Type': 'Credit', 'Rate_Code': N

```python
item = page_data[0]
item
```

{'End_Lat': 40.742963,
 'End_Lon': -73.980072,
 'Fare_Amt': 45.0,
 'Passenger_Count': 1,
 'Payment_Type': 'Credit',
 'Rate_Code': None,
 'Start_Lat': 40.641525,
 'Start_Lon': -73.787442,
 'Tip_Amt': 9.0,
 'Tolls_Amt': 4.15,
 'Total_Amt': 58.15,
 'Trip_Distance': 17.52,
 'Trip_Dropoff_DateTime': '2009-06-14 23:48:00',
 'Trip_Pickup_DateTime': '2009-06-14 23:23:00',
 'mta_tax': None,
 'store_and_forward': None,
 'surcharge': 0.0,
 'vendor_name': 'VTS'}
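A quick sanity check on raw records: on the records printed here, `Total_Amt` equals `Fare_Amt + Tip_Amt + Tolls_Amt + surcharge` (with `mta_tax` being `None`). This is an observation on a couple of samples, not a documented rule of the API; the sketch compares with a floating-point tolerance.

```python
import math
from typing import Any, Dict

# Two records copied from the first page printed above (only the amount fields).
records = [
    {"Fare_Amt": 45.0, "Tip_Amt": 9.0, "Tolls_Amt": 4.15, "surcharge": 0.0, "mta_tax": None, "Total_Amt": 58.15},
    {"Fare_Amt": 6.5, "Tip_Amt": 1.0, "Tolls_Amt": 0.0, "surcharge": 1.0, "mta_tax": None, "Total_Amt": 8.5},
]


def components_sum(record: Dict[str, Any]) -> float:
    """Sum the fare components, treating missing or None values as 0."""
    parts = ("Fare_Amt", "Tip_Amt", "Tolls_Amt", "surcharge", "mta_tax")
    return sum(record.get(key) or 0.0 for key in parts)


for r in records:
    # Use a tolerance because values like 4.15 and 58.15 are not exact in binary floating point.
    print(math.isclose(components_sum(r), r["Total_Amt"], abs_tol=1e-9))
```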

```python
page_data[104]
```

{'End_Lat': 40.752725,
 'End_Lon': -73.986827,
 'Fare_Amt': 6.9,
 'Passenger_Count': 1,
 'Payment_Type': 'Credit',
 'Rate_Code': None,
 'Start_Lat': 40.750013,
 'Start_Lon': -73.9916,
 'Tip_Amt': 2.0,
 'Tolls_Amt': 0.0,
 'Total_Amt': 8.9,
 'Trip_Distance': 0.69,
 'Trip_Dropoff_DateTime': '2009-06-15 12:53:00',
 'Trip_Pickup_DateTime': '2009-06-15 12:43:00',
 'mta_tax': None,
 'store_and_forward': None,
 'surcharge': 0.0,
 'vendor_name': 'VTS'}
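The raw records use mixed-case keys such as `Trip_Pickup_DateTime`, while the table that dlt loads into DuckDB uses `trip_pickup_date_time`. dlt's default naming convention converts identifiers to snake_case; the function below is a simplified approximation written for these keys, not dlt's actual implementation.

```python
import re


def to_snake_case(name: str) -> str:
    """Approximate snake_case: split lower->Upper boundaries, collapse underscores, lowercase."""
    with_breaks = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name)
    return re.sub(r"_+", "_", with_breaks).lower()


raw_keys = ["End_Lat", "Passenger_Count", "Trip_Pickup_DateTime", "vendor_name"]
print([to_snake_case(k) for k in raw_keys])
# ['end_lat', 'passenger_count', 'trip_pickup_date_time', 'vendor_name']
```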

```python
%%capture
!pip install dlt
```

```python
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import PageNumberPaginator


# Define the API resource for NYC taxi data; the resource name becomes the table name
@dlt.resource(name="rides")
def ny_taxi():
    client = RESTClient(
        base_url="https://us-central1-dlthub-analytics.cloudfunctions.net",
        paginator=PageNumberPaginator(
            base_page=1,
            total_path=None,
        ),
    )

    # Endpoint that serves the taxi ride data
    for page in client.paginate("data_engineering_zoomcamp_api"):
        yield page  # yield page by page to manage memory


# Define a new dlt pipeline that loads into DuckDB
pipeline = dlt.pipeline(destination="duckdb")

# Run the pipeline with the new resource, replacing any previously loaded data
load_info = pipeline.run(ny_taxi, write_disposition="replace")
print(load_info)

# Explore the loaded data as a pandas DataFrame
pipeline.dataset(dataset_type="default").rides.df()
```

Pipeline dlt_db_ipykernel_launcher load step completed in 3.76 seconds
1 load package(s) were loaded to destination duckdb and into dataset dlt_db_ipykernel_launcher_dataset
The duckdb destination used duckdb:////Workspace/Users/revathy@calfrac.com/dlt_db_ipykernel_launcher.duckdb location to store data
Load package 1739309395.5686655 is LOADED and contains no failed jobs


| | end_lat | end_lon | fare_amt | passenger_count | payment_type | start_lat | start_lon | tip_amt | tolls_amt | total_amt | trip_distance | trip_dropoff_date_time | trip_pickup_date_time | surcharge | vendor_name | _dlt_load_id | _dlt_id | store_and_forward |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 40.742963 | -73.980072 | 45.0 | 1 | Credit | 40.641525 | -73.787442 | 9.0 | 4.15 | 58.15 | 17.52 | 2009-06-14 23:48:00+00:00 | 2009-06-14 23:23:00+00:00 | 0.0 | VTS | 1739309395.5686655 | yAmyKcJm/zm6Rg | |
| 1 | 40.740187 | -74.005698 | 6.5 | 1 | Credit | 40.722065 | -74.009767 | 1.0 | 0.00 | 8.50 | 1.56 | 2009-06-18 17:43:00+00:00 | 2009-06-18 17:35:00+00:00 | 1.0 | VTS | 1739309395.5686655 | cqoNX1QSepVWBQ | |
| 2 | 40.718043 | -74.004745 | 12.5 | 5 | Credit | 40.761945 | -73.983038 | 2.0 | 0.00 | 15.50 | 3.37 | 2009-06-10 18:27:00+00:00 | 2009-06-10 18:08:00+00:00 | 1.0 | VTS | 1739309395.5686655 | po80taABH4vZ2g | |
| 3 | 40.739637 | -73.985233 | 4.9 | 1 | CASH | 40.749802 | -73.992247 | 0.0 | 0.00 | 5.40 | 1.11 | 2009-06-14 23:58:00+00:00 | 2009-06-14 23:54:00+00:00 | 0.5 | VTS | 1739309395.5686655 | QlFlm/zMFPuINg | |
| 4 | 40.730032 | -73.852693 | 25.7 | 1 | CASH | 40.776825 | -73.949233 | 0.0 | 4.15 | 29.85 | 11.09 | 2009-06-13 13:23:00+00:00 | 2009-06-13 13:01:00+00:00 | 0.0 | VTS | 1739309395.5686655 | h/v7Nja59JfsbA | |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 9995 | 40.783522 | -73.970690 | 5.7 | 1 | CASH | 40.778560 | -73.953660 | 0.0 | 0.00 | 5.70 | 1.16 | 2009-06-19 11:28:00+00:00 | 2009-06-19 11:22:00+00:00 | 0.0 | VTS | 1739309395.5686655 | 0FAAwkiSOcGzKg | |
| 9996 | 40.777200 | -73.964197 | 4.1 | 1 | CASH | 40.779800 | -73.974297 | 0.0 | 0.00 | 4.10 | 0.89 | 2009-06-17 07:43:00+00:00 | 2009-06-17 07:41:00+00:00 | 0.0 | VTS | 1739309395.5686655 | wVnOApUBcjr07w | |
| 9997 | 40.780172 | -73.957617 | 6.1 | 1 | CASH | 40.788388 | -73.976758 | 0.0 | 0.00 | 6.10 | 1.30 | 2009-06-19 11:46:00+00:00 | 2009-06-19 11:39:00+00:00 | 0.0 | VTS | 1739309395.5686655 | 12Rmx/hj2I1gYg | |
| 9998 | 40.777342 | -73.957242 | 5.7 | 1 | CASH | 40.773828 | -73.956690 | 0.0 | 0.00 | 6.20 | 0.97 | 2009-06-17 04:19:00+00:00 | 2009-06-17 04:13:00+00:00 | 0.5 | VTS | 1739309395.5686655 | IM1oZtIZTlvvzg | |
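`write_disposition="replace"` makes each run overwrite the `rides` table instead of adding to it, so re-running the cell should leave 10,000 rows rather than doubling them. The toy model below only illustrates that difference with an in-memory dict; `load` is a hypothetical helper and this is not how dlt stores data. dlt also supports `"append"` (the default) and `"merge"`.

```python
from typing import Dict, List

Table = List[dict]


def load(tables: Dict[str, Table], name: str, rows: Table, disposition: str) -> None:
    """Toy model of two write dispositions on an in-memory 'database'."""
    if disposition == "replace":
        tables[name] = list(rows)  # drop previous contents, keep only this run
    elif disposition == "append":
        tables.setdefault(name, []).extend(rows)  # keep previous contents
    else:
        raise ValueError(f"unsupported disposition: {disposition}")


batch = [{"ride": i} for i in range(10_000)]

db_replace: Dict[str, Table] = {}
db_append: Dict[str, Table] = {}
for _ in range(2):  # simulate running the pipeline twice
    load(db_replace, "rides", batch, "replace")
    load(db_append, "rides", batch, "append")

print(len(db_replace["rides"]), len(db_append["rides"]))  # 10000 20000
```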


```python
import duckdb

# dlt created '<pipeline_name>.duckdb' in the working directory, so connect to it directly
conn = duckdb.connect(f"{pipeline.pipeline_name}.duckdb")

# Set the search path to the dataset (schema) created by the pipeline
conn.sql(f"SET search_path = '{pipeline.dataset_name}'")

# Describe the dataset
conn.sql("DESCRIBE").df()
```

| | database | schema | name | column_names | column_types | temporary |
|---|---|---|---|---|---|---|
| 0 | dlt_db_ipykernel_launcher | dlt_db_ipykernel_launcher_dataset | _dlt_loads | [load_id, schema_name, status, inserted_at, sc... | [VARCHAR, VARCHAR, BIGINT, TIMESTAMP WITH TIME... | False |
| 1 | dlt_db_ipykernel_launcher | dlt_db_ipykernel_launcher_dataset | _dlt_pipeline_state | [version, engine_version, pipeline_name, state... | [BIGINT, BIGINT, VARCHAR, VARCHAR, TIMESTAMP W... | False |
| 2 | dlt_db_ipykernel_launcher | dlt_db_ipykernel_launcher_dataset | _dlt_version | [version, engine_version, inserted_at, schema_... | [BIGINT, BIGINT, TIMESTAMP WITH TIME ZONE, VAR... | False |
| 3 | dlt_db_ipykernel_launcher | dlt_db_ipykernel_launcher_dataset | rides | [end_lat, end_lon, fare_amt, passenger_count, ... | [DOUBLE, DOUBLE, DOUBLE, BIGINT, VARCHAR, DOUB... | False |


```python
conn.sql("DESCRIBE rides").df()
```

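| | column_name | column_type | null | key | default | extra |
|---|---|---|---|---|---|---|
| 0 | end_lat | DOUBLE | YES | | | |
| 1 | end_lon | DOUBLE | YES | | | |
| 2 | fare_amt | DOUBLE | YES | | | |
| 3 | passenger_count | BIGINT | YES | | | |
| 4 | payment_type | VARCHAR | YES | | | |
| 5 | start_lat | DOUBLE | YES | | | |
| 6 | start_lon | DOUBLE | YES | | | |
| 7 | tip_amt | DOUBLE | YES | | | |
| 8 | tolls_amt | DOUBLE | YES | | | |
| 9 | total_amt | DOUBLE | YES | | | |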


```python
conn.sql("SHOW ALL TABLES").df()
```

| | database | schema | name | column_names | column_types | temporary |
|---|---|---|---|---|---|---|
| 0 | dlt_db_ipykernel_launcher | dlt_db_ipykernel_launcher_dataset | _dlt_loads | [load_id, schema_name, status, inserted_at, sc... | [VARCHAR, VARCHAR, BIGINT, TIMESTAMP WITH TIME... | False |
| 1 | dlt_db_ipykernel_launcher | dlt_db_ipykernel_launcher_dataset | _dlt_pipeline_state | [version, engine_version, pipeline_name, state... | [BIGINT, BIGINT, VARCHAR, VARCHAR, TIMESTAMP W... | False |
| 2 | dlt_db_ipykernel_launcher | dlt_db_ipykernel_launcher_dataset | _dlt_version | [version, engine_version, inserted_at, schema_... | [BIGINT, BIGINT, TIMESTAMP WITH TIME ZONE, VAR... | False |
| 3 | dlt_db_ipykernel_launcher | dlt_db_ipykernel_launcher_dataset | rides | [end_lat, end_lon, fare_amt, passenger_count, ... | [DOUBLE, DOUBLE, DOUBLE, BIGINT, VARCHAR, DOUB... | False |


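How many tables were created?

Answer 2: **4** tables were created: `rides`, which holds the taxi data, plus dlt's three internal tables `_dlt_loads`, `_dlt_pipeline_state`, and `_dlt_version`.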
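The count includes dlt's own bookkeeping tables. A small sketch that separates them from the data tables by their `_dlt_` prefix, using the table names copied from the `DESCRIBE` output:

```python
table_names = ["_dlt_loads", "_dlt_pipeline_state", "_dlt_version", "rides"]

# dlt prefixes its internal tables with "_dlt_"; everything else is loaded data
dlt_internal = [t for t in table_names if t.startswith("_dlt_")]
data_tables = [t for t in table_names if not t.startswith("_dlt_")]

print(len(table_names), dlt_internal, data_tables)
```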

Question 3: Explore the loaded data

```python
df = pipeline.dataset(dataset_type="default").rides.df()
```

```python
df.shape
```

(10000, 18)

What is the total number of records extracted?

Answer 3: **10000**
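The count is consistent with the pagination rules at the top, assuming every non-empty page holds exactly 1,000 records: 10,000 records means 10 non-empty pages, plus one more request that returns the empty page and ends the loop. Likewise, the 18 columns in `df.shape` are 16 data columns plus dlt's `_dlt_load_id` and `_dlt_id`. A quick check of that arithmetic:

```python
import math

total_records = 10_000  # df.shape[0] above
page_size = 1_000       # records per page, per the API description
columns = [
    "end_lat", "end_lon", "fare_amt", "passenger_count", "payment_type",
    "start_lat", "start_lon", "tip_amt", "tolls_amt", "total_amt",
    "trip_distance", "trip_dropoff_date_time", "trip_pickup_date_time",
    "surcharge", "vendor_name", "_dlt_load_id", "_dlt_id", "store_and_forward",
]

non_empty_pages = math.ceil(total_records / page_size)
requests_made = non_empty_pages + 1  # the final request returns the empty page
data_columns = [c for c in columns if not c.startswith("_dlt_")]

print(non_empty_pages, requests_made, len(columns), len(data_columns))  # 10 11 18 16
```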

Question 4: Trip Duration Analysis
Run the SQL query below to calculate the average trip duration in minutes.

```python
with pipeline.sql_client() as client:
    res = client.execute_sql(
        """
        SELECT
            AVG(date_diff('minute', trip_pickup_date_time, trip_dropoff_date_time))
        FROM rides;
        """
    )
    # execute_sql returns a list of row tuples; this query yields a single row
    print(res)
```

[(12.3049,)]
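As a cross-check outside SQL, the per-trip calculation can be reproduced in plain Python. This sketch uses only the five pickup/dropoff pairs shown in rows 0 to 4 of the `rides` table, so its average will differ from the full-table result.

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"

# (pickup, dropoff) pairs copied from rows 0-4 of the rides table above
trips = [
    ("2009-06-14 23:23:00", "2009-06-14 23:48:00"),
    ("2009-06-18 17:35:00", "2009-06-18 17:43:00"),
    ("2009-06-10 18:08:00", "2009-06-10 18:27:00"),
    ("2009-06-14 23:54:00", "2009-06-14 23:58:00"),
    ("2009-06-13 13:01:00", "2009-06-13 13:23:00"),
]


def duration_minutes(pickup: str, dropoff: str) -> int:
    """Whole minutes between two timestamps (these samples all fall on exact minutes)."""
    delta = datetime.strptime(dropoff, FMT) - datetime.strptime(pickup, FMT)
    return int(delta.total_seconds() // 60)


durations = [duration_minutes(p, d) for p, d in trips]
print(durations)                        # [25, 8, 19, 4, 22]
print(sum(durations) / len(durations))  # 15.6
```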


What is the average trip duration?

Answer 4: **12.3049** minutes
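A note on what the SQL measures: DuckDB's `date_diff('minute', start, end)` counts the minute boundaries crossed between two timestamps rather than elapsed time, so it can differ from the true duration when timestamps carry seconds. The timestamps shown above all fall on whole minutes, where the two definitions agree. The sketch contrasts them in plain Python; `minute_boundaries` is our own helper mimicking that boundary counting, not a DuckDB function.

```python
from datetime import datetime


def minute_boundaries(start: datetime, end: datetime) -> int:
    """Count minute boundaries crossed by truncating both timestamps to the minute."""
    floor_start = start.replace(second=0, microsecond=0)
    floor_end = end.replace(second=0, microsecond=0)
    return int((floor_end - floor_start).total_seconds() // 60)


def elapsed_minutes(start: datetime, end: datetime) -> float:
    """Elapsed time in fractional minutes."""
    return (end - start).total_seconds() / 60


start = datetime(2009, 6, 14, 23, 59, 50)
end = datetime(2009, 6, 15, 0, 0, 10)
print(minute_boundaries(start, end))          # 1 (one boundary crossed at midnight)
print(round(elapsed_minutes(start, end), 3))  # 0.333 (only 20 seconds elapsed)
```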