In [None]:
#!pip install duckdb
#!pip install dlt[duckdb]

In [7]:
import dlt
print("dlt version:", dlt.__version__)

dlt version: 1.6.1


### Define the Data Extraction Logic
We'll define the API source using the @dlt.resource decorator and use the built-in RESTClient to handle pagination.

In [18]:
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import PageNumberPaginator


@dlt.resource(name="rides", write_disposition="replace")
def ny_taxi():
    client = RESTClient(
        base_url="https://us-central1-dlthub-analytics.cloudfunctions.net",
        paginator=PageNumberPaginator(
            base_page=1,
            total_path=None
        )
    )

    for page in client.paginate("data_engineering_zoomcamp_api"):
        yield page

### Create the DLT Pipeline
We'll define the dlt.pipeline() to load the extracted data into DuckDB:

In [19]:
# Create the pipeline to load data into DuckDB
pipeline = dlt.pipeline(
    pipeline_name="ny_taxi_pipeline",
    destination="duckdb",
    dataset_name="ny_taxi_data"
)


# Extract data and load it into DuckDB
load_info = pipeline.run(ny_taxi())

# Display the load summary
print(load_info)

Pipeline ny_taxi_pipeline load step completed in 2.62 seconds
1 load package(s) were loaded to destination duckdb and into dataset ny_taxi_data
The duckdb destination used duckdb:///C:\Users\niky4\ny_taxi_pipeline.duckdb location to store data
Load package 1739811607.3348522 is LOADED and contains no failed jobs


### Establish connection in DuckDB
This connects to the DuckDB database file (ny_taxi_pipeline.duckdb) created by the dlt pipeline and sets the search path to the dataset (ny_taxi_data).

In [20]:
import duckdb

# Connect to the DuckDB database created by the pipeline
conn = duckdb.connect("ny_taxi_pipeline.duckdb")

# Set search path to the dataset
conn.execute("SET search_path = 'ny_taxi_data'")

<duckdb.duckdb.DuckDBPyConnection at 0x1770de72ab0>

[]


### List All Tables
To see the tables created in the database, run:

In [21]:
# List all tables in the current dataset
tables = conn.execute("SHOW TABLES").fetchdf()

# Display the tables
print(tables)

                  name
0           _dlt_loads
1  _dlt_pipeline_state
2         _dlt_version
3                rides


### Describe the Tables
To check the schema of the tables:

In [23]:
# Describe the rides table schema
schema = conn.execute("DESCRIBE rides").fetchdf()

# Display the schema details
print(schema)

               column_name               column_type null   key default extra
0                  end_lat                    DOUBLE  YES  None    None  None
1                  end_lon                    DOUBLE  YES  None    None  None
2                 fare_amt                    DOUBLE  YES  None    None  None
3          passenger_count                    BIGINT  YES  None    None  None
4             payment_type                   VARCHAR  YES  None    None  None
5                start_lat                    DOUBLE  YES  None    None  None
6                start_lon                    DOUBLE  YES  None    None  None
7                  tip_amt                    DOUBLE  YES  None    None  None
8                tolls_amt                    DOUBLE  YES  None    None  None
9                total_amt                    DOUBLE  YES  None    None  None
10           trip_distance                    DOUBLE  YES  None    None  None
11  trip_dropoff_date_time  TIMESTAMP WITH TIME ZONE  YES  None 

### View Sample Data
We can also look at the first few rows of data to get a feel for the content:

In [24]:
# Preview the first 5 rows of the rides table
sample_data = conn.execute("SELECT * FROM rides LIMIT 5").fetchdf()

# Display the sample data
print(sample_data)


     end_lat    end_lon  fare_amt  passenger_count payment_type  start_lat  \
0  40.742963 -73.980072      45.0                1       Credit  40.641525   
1  40.740187 -74.005698       6.5                1       Credit  40.722065   
2  40.718043 -74.004745      12.5                5       Credit  40.761945   
3  40.739637 -73.985233       4.9                1         CASH  40.749802   
4  40.730032 -73.852693      25.7                1         CASH  40.776825   

   start_lon  tip_amt  tolls_amt  total_amt  trip_distance  \
0 -73.787442      9.0       4.15      58.15          17.52   
1 -74.009767      1.0       0.00       8.50           1.56   
2 -73.983038      2.0       0.00      15.50           3.37   
3 -73.992247      0.0       0.00       5.40           1.11   
4 -73.949233      0.0       4.15      29.85          11.09   

     trip_dropoff_date_time     trip_pickup_date_time  surcharge vendor_name  \
0 2009-06-15 00:48:00+01:00 2009-06-15 00:23:00+01:00        0.0         VTS  

### Explore the loaded data
Inspect the table ride:

In [25]:
df = pipeline.dataset(dataset_type="default").rides.df()
df

Unnamed: 0,end_lat,end_lon,fare_amt,passenger_count,payment_type,start_lat,start_lon,tip_amt,tolls_amt,total_amt,trip_distance,trip_dropoff_date_time,trip_pickup_date_time,surcharge,vendor_name,_dlt_load_id,_dlt_id,store_and_forward
0,40.742963,-73.980072,45.0,1,Credit,40.641525,-73.787442,9.0,4.15,58.15,17.52,2009-06-14 23:48:00+00:00,2009-06-14 23:23:00+00:00,0.0,VTS,1739811607.3348522,nSxIHxP2FNfcIw,
1,40.740187,-74.005698,6.5,1,Credit,40.722065,-74.009767,1.0,0.00,8.50,1.56,2009-06-18 17:43:00+00:00,2009-06-18 17:35:00+00:00,1.0,VTS,1739811607.3348522,CsHkNSwo0c54BA,
2,40.718043,-74.004745,12.5,5,Credit,40.761945,-73.983038,2.0,0.00,15.50,3.37,2009-06-10 18:27:00+00:00,2009-06-10 18:08:00+00:00,1.0,VTS,1739811607.3348522,fYpxXR2rn2+zVw,
3,40.739637,-73.985233,4.9,1,CASH,40.749802,-73.992247,0.0,0.00,5.40,1.11,2009-06-14 23:58:00+00:00,2009-06-14 23:54:00+00:00,0.5,VTS,1739811607.3348522,D6bW8XFgEX3ZuQ,
4,40.730032,-73.852693,25.7,1,CASH,40.776825,-73.949233,0.0,4.15,29.85,11.09,2009-06-13 13:23:00+00:00,2009-06-13 13:01:00+00:00,0.0,VTS,1739811607.3348522,g0Ivj9Wd2HqpHg,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9995,40.783522,-73.970690,5.7,1,CASH,40.778560,-73.953660,0.0,0.00,5.70,1.16,2009-06-19 11:28:00+00:00,2009-06-19 11:22:00+00:00,0.0,VTS,1739811607.3348522,szWMVA6dR7Fbog,
9996,40.777200,-73.964197,4.1,1,CASH,40.779800,-73.974297,0.0,0.00,4.10,0.89,2009-06-17 07:43:00+00:00,2009-06-17 07:41:00+00:00,0.0,VTS,1739811607.3348522,c9yCtaTcVO0k2Q,
9997,40.780172,-73.957617,6.1,1,CASH,40.788388,-73.976758,0.0,0.00,6.10,1.30,2009-06-19 11:46:00+00:00,2009-06-19 11:39:00+00:00,0.0,VTS,1739811607.3348522,IveSXUpN+Nqx1Q,
9998,40.777342,-73.957242,5.7,1,CASH,40.773828,-73.956690,0.0,0.00,6.20,0.97,2009-06-17 04:19:00+00:00,2009-06-17 04:13:00+00:00,0.5,VTS,1739811607.3348522,t+FUSi5A6C2CBA,


### Trip Duration Analysis
Lets Calculate the average trip duration in minutes.

In [26]:
with pipeline.sql_client() as client:
    res = client.execute_sql(
            """
            SELECT
            AVG(date_diff('minute', trip_pickup_date_time, trip_dropoff_date_time))
            FROM rides;
            """
        )
    # Prints column values of the first row
    print(res)

[(12.3049,)]
