# Pagination API_Test:

In [None]:
import requests
BASE_API_URL = "https://us-central1-dlthub-analytics.cloudfunctions.net/data_engineering_zoomcamp_api"
page_number = 1
while True:
    params= {"page": page_number}
    response = requests.get(BASE_API_URL, params=params)
    data = response.json()
    if len(data) == 0:
        break
    print(data)
    # for row in data:
    #     print(row)
    page_number += 1
    # limit the number of pages for testing
    if page_number > 2:
      break

# Generator
#### When yield is encountered

- The function pauses execution and returns page_json to the caller.
- The function remembers its state, so when called again, it resumes from where it left off (after yield).

In [None]:
import requests

BASE_API_URL = "https://us-central1-dlthub-analytics.cloudfunctions.net/data_engineering_zoomcamp_api"

def paginated_getter():
    page_number = 1
    while True:
        params = {'page': page_number}
        response = requests.get(BASE_API_URL, params=params)
        response.raise_for_status()
        page_json = response.json()
        print(f'Got page {page_number} with {len(page_json)} records')

        if page_json:
            yield page_json
            page_number += 1
        else:
            break


for i in paginated_getter():
    print(i)
    break

# dIt REST API 

In [None]:
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import PageNumberPaginator


def paginated_getter():
    client = RESTClient(
        base_url="https://us-central1-dlthub-analytics.cloudfunctions.net",
        # Define pagination strategy - page-based pagination
        paginator=PageNumberPaginator(   # <--- Pages are numbered (1, 2, 3, ...)
            base_page=1,   # <--- Start from page 1
            total_path=None    # <--- No total count of pages provided by API, pagination should stop when a page contains no result items
        )
    )

    for page in client.paginate("data_engineering_zoomcamp_api"):    # <--- API endpoint for retrieving taxi ride data
        yield page   # remember about memory management and yield data

for page_data in paginated_getter():
    print(page_data)
    break

# Normalizing Data

In [17]:
page_data[0]

{'End_Lat': 40.742963,
 'End_Lon': -73.980072,
 'Fare_Amt': 45.0,
 'Passenger_Count': 1,
 'Payment_Type': 'Credit',
 'Rate_Code': None,
 'Start_Lat': 40.641525,
 'Start_Lon': -73.787442,
 'Tip_Amt': 9.0,
 'Tolls_Amt': 4.15,
 'Total_Amt': 58.15,
 'Trip_Distance': 17.52,
 'Trip_Dropoff_DateTime': '2009-06-14 23:48:00',
 'Trip_Pickup_DateTime': '2009-06-14 23:23:00',
 'mta_tax': None,
 'store_and_forward': None,
 'surcharge': 0.0,
 'vendor_name': 'VTS'}

In [18]:
data = [
    {
        "vendor_name": "VTS",
        "record_hash": "b00361a396177a9cb410ff61f20015ad",
        "time": {
            "pickup": "2009-06-14 23:23:00",
            "dropoff": "2009-06-14 23:48:00"
        },
        "coordinates": {
            "start": {"lon": -73.787442, "lat": 40.641525},
            "end": {"lon": -73.980072, "lat": 40.742963}
        },
        "passengers": [
            {"name": "John", "rating": 4.9},
            {"name": "Jack", "rating": 3.9}
        ]
    }
]

In [19]:
data

[{'vendor_name': 'VTS',
  'record_hash': 'b00361a396177a9cb410ff61f20015ad',
  'time': {'pickup': '2009-06-14 23:23:00', 'dropoff': '2009-06-14 23:48:00'},
  'coordinates': {'start': {'lon': -73.787442, 'lat': 40.641525},
   'end': {'lon': -73.980072, 'lat': 40.742963}},
  'passengers': [{'name': 'John', 'rating': 4.9},
   {'name': 'Jack', 'rating': 3.9}]}]

In [None]:
import dlt

# Define a dlt pipeline with automatic normalization
pipeline = dlt.pipeline(
    pipeline_name="ny_taxi_data",
    destination="duckdb",
    dataset_name="taxi_rides"
)

# Run the pipeline with raw nested data
info = pipeline.run(data, table_name="rides", write_disposition="replace")

# Print the load summary
print(info)

print(pipeline.last_trace)

Pipeline ny_taxi_data load step completed in 1.21 seconds
1 load package(s) were loaded to destination duckdb and into dataset taxi_rides
The duckdb destination used duckdb:///C:\Users\karee\ny_taxi_data.duckdb location to store data
Load package 1739718716.4614036 is LOADED and contains no failed jobs
Run started at 2025-02-16 15:11:56.099019+00:00 and COMPLETED in 1.94 seconds with 4 steps.
Step extract COMPLETED in 0.20 seconds.

Load package 1739718716.4614036 is EXTRACTED and NOT YET LOADED to the destination and contains no failed jobs

Step normalize COMPLETED in 0.18 seconds.
Normalized data for the following tables:
- rides: 1 row(s)
- rides__passengers: 2 row(s)

Load package 1739718716.4614036 is NORMALIZED and NOT YET LOADED to the destination and contains no failed jobs

Step load COMPLETED in 1.29 seconds.
Pipeline ny_taxi_data load step completed in 1.21 seconds
1 load package(s) were loaded to destination duckdb and into dataset taxi_rides
The duckdb destination used du

In [24]:
pipeline.dataset(dataset_type="default").rides.df().head()


Unnamed: 0,vendor_name,record_hash,time__pickup,time__dropoff,coordinates__start__lon,coordinates__start__lat,coordinates__end__lon,coordinates__end__lat,_dlt_load_id,_dlt_id
0,VTS,b00361a396177a9cb410ff61f20015ad,2009-06-14 23:23:00+00:00,2009-06-14 23:48:00+00:00,-73.787442,40.641525,-73.980072,40.742963,1739713860.50545,Pv5FH9dwVitJNA


In [25]:
pipeline.dataset(dataset_type="default").rides__passengers.df().head()


Unnamed: 0,name,rating,_dlt_parent_id,_dlt_list_idx,_dlt_id
0,John,4.9,Pv5FH9dwVitJNA,0,u9Bvh5LOSGeHEg
1,Jack,3.9,Pv5FH9dwVitJNA,1,UB5GV/uNKBEbRQ


In [26]:
pipeline.dataset(dataset_type="default")._dlt_pipeline_state.df().head()


Unnamed: 0,version,engine_version,pipeline_name,state,created_at,version_hash,_dlt_load_id,_dlt_id
0,1,4,ny_taxi_data,eNptj0GLwkAMhf/LnIuLIroWPIgI6sKKehKRIXZSO22dDk...,2025-02-16 13:51:00.567377+00:00,j/tok8O2AY7spnjQCvYEOSes6kpreJNHjIfRlQ7xK9c=,1739713860.50545,nKQfLfv1BELeAg


# First Datapipeline (api -> normalization -> duckdb)

In [27]:
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import PageNumberPaginator

# Define the API resource for NYC taxi data
@dlt.resource(name="rides")   # <--- The name of the resource (will be used as the table name)
def ny_taxi():
    client = RESTClient(
        base_url="https://us-central1-dlthub-analytics.cloudfunctions.net",
        paginator=PageNumberPaginator(
            base_page=1,
            total_path=None
        )
    )

    for page in client.paginate("data_engineering_zoomcamp_api"):    # <--- API endpoint for retrieving taxi ride data
        yield page   # <--- yield data to manage memory

# define new dlt pipeline
pipeline = dlt.pipeline(destination="duckdb")

# run the pipeline with the new resource
load_info = pipeline.run(ny_taxi, write_disposition="replace")
print(load_info)

# explore loaded data
pipeline.dataset(dataset_type="default").rides.df()

Pipeline dlt_ipykernel_launcher load step completed in 3.52 seconds
1 load package(s) were loaded to destination duckdb and into dataset dlt_ipykernel_launcher_dataset
The duckdb destination used duckdb:///C:\Users\karee\dlt_ipykernel_launcher.duckdb location to store data
Load package 1739716571.970795 is LOADED and contains no failed jobs


Unnamed: 0,end_lat,end_lon,fare_amt,passenger_count,payment_type,start_lat,start_lon,tip_amt,tolls_amt,total_amt,trip_distance,trip_dropoff_date_time,trip_pickup_date_time,surcharge,vendor_name,_dlt_load_id,_dlt_id,store_and_forward
0,40.742963,-73.980072,45.0,1,Credit,40.641525,-73.787442,9.0,4.15,58.15,17.52,2009-06-14 23:48:00+00:00,2009-06-14 23:23:00+00:00,0.0,VTS,1739716571.970795,NQb7vhO0FyIWrw,
1,40.740187,-74.005698,6.5,1,Credit,40.722065,-74.009767,1.0,0.00,8.50,1.56,2009-06-18 17:43:00+00:00,2009-06-18 17:35:00+00:00,1.0,VTS,1739716571.970795,1rWfDxb7SnasQw,
2,40.718043,-74.004745,12.5,5,Credit,40.761945,-73.983038,2.0,0.00,15.50,3.37,2009-06-10 18:27:00+00:00,2009-06-10 18:08:00+00:00,1.0,VTS,1739716571.970795,aIMv9hocWn5myA,
3,40.739637,-73.985233,4.9,1,CASH,40.749802,-73.992247,0.0,0.00,5.40,1.11,2009-06-14 23:58:00+00:00,2009-06-14 23:54:00+00:00,0.5,VTS,1739716571.970795,XWfMYeJ3QlY9AQ,
4,40.730032,-73.852693,25.7,1,CASH,40.776825,-73.949233,0.0,4.15,29.85,11.09,2009-06-13 13:23:00+00:00,2009-06-13 13:01:00+00:00,0.0,VTS,1739716571.970795,vGSV1K8dSzGCpw,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9995,40.783522,-73.970690,5.7,1,CASH,40.778560,-73.953660,0.0,0.00,5.70,1.16,2009-06-19 11:28:00+00:00,2009-06-19 11:22:00+00:00,0.0,VTS,1739716571.970795,aZIojQVmVmJF4Q,
9996,40.777200,-73.964197,4.1,1,CASH,40.779800,-73.974297,0.0,0.00,4.10,0.89,2009-06-17 07:43:00+00:00,2009-06-17 07:41:00+00:00,0.0,VTS,1739716571.970795,Lwy5H+Mk96G/Qg,
9997,40.780172,-73.957617,6.1,1,CASH,40.788388,-73.976758,0.0,0.00,6.10,1.30,2009-06-19 11:46:00+00:00,2009-06-19 11:39:00+00:00,0.0,VTS,1739716571.970795,broYohRno131Hg,
9998,40.777342,-73.957242,5.7,1,CASH,40.773828,-73.956690,0.0,0.00,6.20,0.97,2009-06-17 04:19:00+00:00,2009-06-17 04:13:00+00:00,0.5,VTS,1739716571.970795,R29j4CdZL6eFnQ,


In [29]:
print(pipeline.last_trace)

Run started at 2025-02-16 14:36:11.747013+00:00 and COMPLETED in 37.66 seconds with 4 steps.
Step extract COMPLETED in 26.54 seconds.

Load package 1739716571.970795 is EXTRACTED and NOT YET LOADED to the destination and contains no failed jobs

Step normalize COMPLETED in 7.23 seconds.
Normalized data for the following tables:
- rides: 10000 row(s)
- _dlt_pipeline_state: 1 row(s)

Load package 1739716571.970795 is NORMALIZED and NOT YET LOADED to the destination and contains no failed jobs

Step load COMPLETED in 3.68 seconds.
Pipeline dlt_ipykernel_launcher load step completed in 3.52 seconds
1 load package(s) were loaded to destination duckdb and into dataset dlt_ipykernel_launcher_dataset
The duckdb destination used duckdb:///C:\Users\karee\dlt_ipykernel_launcher.duckdb location to store data
Load package 1739716571.970795 is LOADED and contains no failed jobs

Step run COMPLETED in 37.66 seconds.
Pipeline dlt_ipykernel_launcher load step completed in 3.52 seconds
1 load package(s)

# Incremental Load

In [35]:
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import PageNumberPaginator


@dlt.resource(name="rides", write_disposition="append")
def ny_taxi(
    cursor_date=dlt.sources.incremental(
        "Trip_Dropoff_DateTime",   # <--- field to track, our timestamp
        initial_value="2009-06-15",   # <--- start date June 15, 2009
        )
    ):
    client = RESTClient(
        base_url="https://us-central1-dlthub-analytics.cloudfunctions.net",
        paginator=PageNumberPaginator(
            base_page=1,
            total_path=None
        )
    )

    for page in client.paginate("data_engineering_zoomcamp_api"):
        yield page

# define new dlt pipeline
pipeline = dlt.pipeline(pipeline_name="ny_taxi", destination="duckdb", dataset_name="ny_taxi_data")

# run the pipeline with the new resource
load_info = pipeline.run(ny_taxi)
print(pipeline.last_trace)

Run started at 2025-02-16 15:24:17.825355+00:00 and COMPLETED in 25.73 seconds with 4 steps.
Step extract COMPLETED in 25.32 seconds.

Load package 1739719458.0970223 is EXTRACTED and NOT YET LOADED to the destination and contains no failed jobs

Step normalize COMPLETED in 0.16 seconds.
No data found to normalize

Step load COMPLETED in 0.06 seconds.
Pipeline ny_taxi load step completed in ---
0 load package(s) were loaded to destination duckdb and into dataset None
The duckdb destination used duckdb:///C:\Users\karee\ny_taxi.duckdb location to store data

Step run COMPLETED in 25.72 seconds.
Pipeline ny_taxi load step completed in ---
0 load package(s) were loaded to destination duckdb and into dataset None
The duckdb destination used duckdb:///C:\Users\karee\ny_taxi.duckdb location to store data


In [36]:
pipeline.dataset(dataset_type="default").rides.df()

Unnamed: 0,end_lat,end_lon,fare_amt,passenger_count,payment_type,start_lat,start_lon,tip_amt,tolls_amt,total_amt,trip_distance,trip_dropoff_date_time,trip_pickup_date_time,surcharge,vendor_name,_dlt_load_id,_dlt_id,store_and_forward
0,40.740187,-74.005698,6.5,1,Credit,40.722065,-74.009767,1.0,0.0,8.5,1.56,2009-06-18 17:43:00+00:00,2009-06-18 17:35:00+00:00,1.0,VTS,1739718868.592805,XhiNDJCA/GHRwA,
1,40.741075,-73.986537,4.1,5,CASH,40.752078,-73.978600,0.0,0.0,4.1,0.88,2009-06-16 12:59:00+00:00,2009-06-16 12:56:00+00:00,0.0,VTS,1739718868.592805,rQlPO3TS/W9NHg,
2,40.728683,-73.981595,11.7,1,Credit,40.759070,-73.984905,4.0,0.0,15.7,2.82,2009-06-16 12:58:00+00:00,2009-06-16 12:39:00+00:00,0.0,VTS,1739718868.592805,VBRhi7vVYN5V1g,
3,40.745173,-73.998107,10.1,1,CASH,40.760320,-73.964637,0.0,0.0,10.6,2.83,2009-06-15 20:18:00+00:00,2009-06-15 20:05:00+00:00,0.5,VTS,1739718868.592805,Zju6Y6ZbNJ0XZA,
4,40.779377,-73.987643,9.7,1,CASH,40.761615,-73.966648,0.0,0.0,9.7,2.08,2009-06-16 12:59:00+00:00,2009-06-16 12:44:00+00:00,0.0,VTS,1739718868.592805,o6DnUyNYl6fz4A,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
5320,40.783522,-73.970690,5.7,1,CASH,40.778560,-73.953660,0.0,0.0,5.7,1.16,2009-06-19 11:28:00+00:00,2009-06-19 11:22:00+00:00,0.0,VTS,1739718868.592805,kGmd2hdmceWGEA,
5321,40.777200,-73.964197,4.1,1,CASH,40.779800,-73.974297,0.0,0.0,4.1,0.89,2009-06-17 07:43:00+00:00,2009-06-17 07:41:00+00:00,0.0,VTS,1739718868.592805,RAUYQfi0qm6s2Q,
5322,40.780172,-73.957617,6.1,1,CASH,40.788388,-73.976758,0.0,0.0,6.1,1.30,2009-06-19 11:46:00+00:00,2009-06-19 11:39:00+00:00,0.0,VTS,1739718868.592805,gk/g83d9R3hi2A,
5323,40.777342,-73.957242,5.7,1,CASH,40.773828,-73.956690,0.0,0.0,6.2,0.97,2009-06-17 04:19:00+00:00,2009-06-17 04:13:00+00:00,0.5,VTS,1739718868.592805,9MUbAOvabufOlA,


In [37]:
with pipeline.sql_client() as client:
    res = client.execute_sql(
            """
            SELECT
            count(1)
            FROM rides;
            """
        )
    print(res)

[(5325,)]


# Homework

### 1. First Question


In [None]:
import dlt
dlt.__version__

'1.6.1'

### 2. Second Question

In [8]:
#2.
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import PageNumberPaginator


# your code is here
@dlt.resource(name="rides", write_disposition="replace")
def ny_taxi():
    client = RESTClient(
    base_url="https://us-central1-dlthub-analytics.cloudfunctions.net",
    paginator=PageNumberPaginator(
        base_page=1,
        total_path=None
        )
    )
    for page in client.paginate("data_engineering_zoomcamp_api"):
        yield page

pipeline = dlt.pipeline(
    pipeline_name="ny_taxi_pipeline",
    destination="duckdb",
    dataset_name="ny_taxi_data"
)

load_info = pipeline.run(ny_taxi)
print(load_info)

pipeline.dataset(dataset_type="default").rides.df()



Pipeline ny_taxi_pipeline load step completed in 3.56 seconds
1 load package(s) were loaded to destination duckdb and into dataset ny_taxi_data
The duckdb destination used duckdb:///C:\Users\karee\ny_taxi_pipeline.duckdb location to store data
Load package 1739723449.3523223 is LOADED and contains no failed jobs


Unnamed: 0,end_lat,end_lon,fare_amt,passenger_count,payment_type,start_lat,start_lon,tip_amt,tolls_amt,total_amt,trip_distance,trip_dropoff_date_time,trip_pickup_date_time,surcharge,vendor_name,_dlt_load_id,_dlt_id,store_and_forward
0,40.742963,-73.980072,45.0,1,Credit,40.641525,-73.787442,9.0,4.15,58.15,17.52,2009-06-14 23:48:00+00:00,2009-06-14 23:23:00+00:00,0.0,VTS,1739723449.3523223,52UXT24oSR3zdQ,
1,40.740187,-74.005698,6.5,1,Credit,40.722065,-74.009767,1.0,0.00,8.50,1.56,2009-06-18 17:43:00+00:00,2009-06-18 17:35:00+00:00,1.0,VTS,1739723449.3523223,OPgdX13bzCCNUw,
2,40.718043,-74.004745,12.5,5,Credit,40.761945,-73.983038,2.0,0.00,15.50,3.37,2009-06-10 18:27:00+00:00,2009-06-10 18:08:00+00:00,1.0,VTS,1739723449.3523223,KTtTYZm8FwSxVw,
3,40.739637,-73.985233,4.9,1,CASH,40.749802,-73.992247,0.0,0.00,5.40,1.11,2009-06-14 23:58:00+00:00,2009-06-14 23:54:00+00:00,0.5,VTS,1739723449.3523223,5SGOEBeEmLFflQ,
4,40.730032,-73.852693,25.7,1,CASH,40.776825,-73.949233,0.0,4.15,29.85,11.09,2009-06-13 13:23:00+00:00,2009-06-13 13:01:00+00:00,0.0,VTS,1739723449.3523223,TRgml5dzM0O1OQ,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9995,40.783522,-73.970690,5.7,1,CASH,40.778560,-73.953660,0.0,0.00,5.70,1.16,2009-06-19 11:28:00+00:00,2009-06-19 11:22:00+00:00,0.0,VTS,1739723449.3523223,lgRscAZX5Y+aZA,
9996,40.777200,-73.964197,4.1,1,CASH,40.779800,-73.974297,0.0,0.00,4.10,0.89,2009-06-17 07:43:00+00:00,2009-06-17 07:41:00+00:00,0.0,VTS,1739723449.3523223,0C/lFIbV2PwReA,
9997,40.780172,-73.957617,6.1,1,CASH,40.788388,-73.976758,0.0,0.00,6.10,1.30,2009-06-19 11:46:00+00:00,2009-06-19 11:39:00+00:00,0.0,VTS,1739723449.3523223,xH2vDdJ/SkSqjw,
9998,40.777342,-73.957242,5.7,1,CASH,40.773828,-73.956690,0.0,0.00,6.20,0.97,2009-06-17 04:19:00+00:00,2009-06-17 04:13:00+00:00,0.5,VTS,1739723449.3523223,b1LJICaUJCiJDQ,


In [None]:
import duckdb

# Connect to the DuckDB database
conn = duckdb.connect("ny_taxi_pipeline.duckdb")

# Set search path to the dataset
conn.sql("SET search_path = 'ny_taxi_data'")

# Describe the dataset
conn.sql("DESCRIBE").df()


Unnamed: 0,database,schema,name,column_names,column_types,temporary
0,ny_taxi_pipeline,ny_taxi_data,_dlt_loads,"[load_id, schema_name, status, inserted_at, sc...","[VARCHAR, VARCHAR, BIGINT, TIMESTAMP WITH TIME...",False
1,ny_taxi_pipeline,ny_taxi_data,_dlt_pipeline_state,"[version, engine_version, pipeline_name, state...","[BIGINT, BIGINT, VARCHAR, VARCHAR, TIMESTAMP W...",False
2,ny_taxi_pipeline,ny_taxi_data,_dlt_version,"[version, engine_version, inserted_at, schema_...","[BIGINT, BIGINT, TIMESTAMP WITH TIME ZONE, VAR...",False
3,ny_taxi_pipeline,ny_taxi_data,rides,"[end_lat, end_lon, fare_amt, passenger_count, ...","[DOUBLE, DOUBLE, DOUBLE, BIGINT, VARCHAR, DOUB...",False


### 3. Third Question

In [16]:
with pipeline.sql_client() as client:
    res = client.execute_sql(
            """
            SELECT
            Count(1)
            FROM rides;
            """
        )
    # Prints column values of the first row
    print(res)

[(10000,)]


### 4. Fourth Question

In [15]:
with pipeline.sql_client() as client:
    res = client.execute_sql(
            """
            SELECT
            AVG(date_diff('minute', trip_pickup_date_time, trip_dropoff_date_time))
            FROM rides;
            """
        )
    # Prints column values of the first row
    print(res)

[(12.3049,)]
