In [1]:
!pip install dlt[duckdb]



In [2]:
# Import required libraries
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import PageNumberPaginator
import pandas as pd

# Record which dlt release produced the outputs below.
installed_dlt_version = dlt.__version__
print("dlt version:", installed_dlt_version)

dlt version: 1.6.1


In [3]:
from typing import Any, Dict, Iterator, List, Optional

import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import PageNumberPaginator

@dlt.resource(write_disposition="replace", name="rides")
def ny_taxi(client: Optional[RESTClient] = None) -> Iterator[List[Dict[str, Any]]]:
    """Stream NY taxi ride records from the paginated Zoomcamp API.

    Args:
        client: REST client used to call the API. When omitted, a client
            pointing at the public dlthub-analytics cloud function is created.

    Yields:
        One list of ride records per API page. dlt unpacks each list into
        rows, so the loaded table matches what per-record yields would
        produce, without the per-item generator overhead.

    Raises:
        An HTTP error for non-success responses (``paginate`` checks the
        response status, per the dlt RESTClient docs), instead of silently
        iterating over an error payload.
    """
    if client is None:
        client = RESTClient(
            base_url="https://us-central1-dlthub-analytics.cloudfunctions.net"
        )

    # The API is 1-indexed via the "page" query parameter and reports no total
    # page count, so the paginator stops at the first empty page. Passing the
    # paginator per call keeps this correct for caller-supplied clients that
    # were configured with a different default paginator.
    pages = client.paginate(
        "/data_engineering_zoomcamp_api",
        paginator=PageNumberPaginator(base_page=1, total_path=None),
    )
    for page in pages:
        yield page

# Build a pipeline that writes into a local DuckDB database.
pipeline = dlt.pipeline(
    pipeline_name="ny_taxi_pipeline",
    destination="duckdb",
    dataset_name="ny_taxi_data",
)

# Execute extract -> normalize -> load for the rides resource and
# show the resulting load summary.
load_info = pipeline.run(ny_taxi)
print("Pipeline load info:", load_info, sep="\n")

# Sanity check: pull the loaded "rides" table into a DataFrame and count its rows.
print("\nTotal number of records:")
rides_relation = pipeline.dataset(dataset_type="default").rides
df = rides_relation.df()
print(len(df))

# Question 4: Trip Duration Analysis
# Mean pickup-to-dropoff time in whole minutes, computed inside DuckDB.
AVG_TRIP_DURATION_SQL = """
        SELECT
        AVG(date_diff('minute', trip_pickup_date_time, trip_dropoff_date_time))
        FROM rides;
        """

print("\nAverage trip duration:")
with pipeline.sql_client() as client:
    res = client.execute_sql(AVG_TRIP_DURATION_SQL)
    print(res)

Pipeline load info:
Pipeline ny_taxi_pipeline load step completed in 3.11 seconds
1 load package(s) were loaded to destination duckdb and into dataset ny_taxi_data
The duckdb destination used duckdb:////content/ny_taxi_pipeline.duckdb location to store data
Load package 1739812417.5334835 is LOADED and contains no failed jobs

Total number of records:
10000

Average trip duration:
[(12.3049,)]
