In [None]:
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import PageNumberPaginator

@dlt.resource(name="rides", write_disposition="replace")
def ny_taxi():
    client = RESTClient(
        base_url="https://us-central1-dlthub-analytics.cloudfunctions.net",
        paginator=PageNumberPaginator(
            base_page=1,
            total_path=None
        )
    )

    for page in client.paginate("data_engineering_zoomcamp_api"):
        yield page


pipeline = dlt.pipeline(
    pipeline_name="ny_taxi_pipeline",
    destination="duckdb",
    dataset_name="ny_taxi_data"
)

load_info = pipeline.run(ny_taxi, write_disposition="replace")
print(load_info)

Pipeline ny_taxi_pipeline load step completed in 2.19 seconds
1 load package(s) were loaded to destination duckdb and into dataset ny_taxi_data
The duckdb destination used duckdb:////Users/alexanderheinz/Library/Mobile Documents/com~apple~CloudDocs/Kurse/data-engineering-zoomcamp/workshop/ny_taxi_pipeline.duckdb location to store data
Load package 1739453400.087819 is LOADED and contains no failed jobs


In [5]:
print(load_info)

Pipeline ny_taxi_pipeline load step completed in 2.19 seconds
1 load package(s) were loaded to destination duckdb and into dataset ny_taxi_data
The duckdb destination used duckdb:////Users/alexanderheinz/Library/Mobile Documents/com~apple~CloudDocs/Kurse/data-engineering-zoomcamp/workshop/ny_taxi_pipeline.duckdb location to store data
Load package 1739453400.087819 is LOADED and contains no failed jobs


In [2]:
from google.cloud import bigquery
client = bigquery.Client()
dataset_name = "ny_taxi_data"
tables = list(client.list_tables(dataset_name))


In [4]:
print(f"Tables in dataset {dataset_name}:")
for table in tables:
    print(f"- {table.table_id}")

Tables in dataset ny_taxi_data:
- _dlt_loads
- _dlt_pipeline_state
- _dlt_version
- rides


In [6]:
# Count rows in each table
for table in tables:
    table_ref = f"{client.project}.{dataset_name}.{table.table_id}"
    query = f"SELECT COUNT(*) AS row_count FROM `{table_ref}`"
    result = client.query(query).result()
    row_count = list(result)[0]["row_count"]
    print(f"Table {table.table_id} has {row_count} rows")

Table _dlt_loads has 3 rows
Table _dlt_pipeline_state has 1 rows
Table _dlt_version has 1 rows
Table rides has 10000 rows


In [None]:
# Count rows in each table
table_ref = f"{client.project}.{dataset_name}.{tables[3].table_id}"
query = f"SELECT AVG() AS row_count FROM `{table_ref}`"
result = client.query(query).result()
row_count = list(result)[0]["row_count"]
print(f"Table {table.table_id} has {row_count} rows")

In [11]:
pipeline.dataset(dataset_type="default").rides.df()

Unnamed: 0,end_lat,end_lon,fare_amt,passenger_count,payment_type,start_lat,start_lon,tip_amt,tolls_amt,total_amt,trip_distance,trip_dropoff_date_time,trip_pickup_date_time,surcharge,vendor_name,_dlt_load_id,_dlt_id,store_and_forward
0,40.758275,-73.937818,2.5,208,CASH,40.758278,-73.937825,0.00,0.00,2.50,0.00,2009-06-17 13:07:00+00:00,2009-06-17 13:07:00+00:00,0.0,VTS,1739450779.319147,kI/iOLjeI8BNBA,
1,40.745425,-73.972203,2.5,1,CASH,40.745427,-73.972203,0.00,0.00,2.50,0.00,2009-06-11 06:50:00+00:00,2009-06-11 06:50:00+00:00,0.0,VTS,1739450779.319147,exxxu+iiq0NZMA,
2,40.753018,-73.971008,2.5,1,CASH,40.753052,-73.970980,0.00,0.00,2.50,0.00,2009-06-14 17:57:00+00:00,2009-06-14 17:56:00+00:00,0.0,VTS,1739450779.319147,eISJJj77jAnLaQ,
3,40.758170,-73.937450,2.5,1,CASH,40.758175,-73.937452,0.00,0.00,2.50,0.00,2009-06-11 13:55:00+00:00,2009-06-11 13:55:00+00:00,0.0,VTS,1739450779.319147,iJzZzqhupq9yMw,
4,40.802125,-73.950702,2.5,5,CASH,40.802125,-73.950702,0.00,0.00,2.50,0.00,2009-06-22 13:43:00+00:00,2009-06-22 13:43:00+00:00,0.0,VTS,1739450779.319147,6cxJCyj7BJFa5Q,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9995,40.870670,-73.883128,55.3,1,Credit,40.774185,-73.874622,16.89,4.15,77.34,13.79,2009-06-26 17:50:00+00:00,2009-06-26 16:03:00+00:00,1.0,VTS,1739450779.319147,aOxEakvLOGgsow,
9996,40.695272,-74.177487,78.0,1,Credit,40.695270,-74.177483,19.50,0.00,97.50,0.00,2009-06-19 05:28:00+00:00,2009-06-19 05:27:00+00:00,0.0,VTS,1739450779.319147,XmLi61bgsOZ5Iw,
9997,41.069102,-73.548183,170.0,1,Credit,41.069102,-73.548183,20.00,0.00,190.00,0.00,2009-06-02 14:16:00+00:00,2009-06-02 14:15:00+00:00,0.0,VTS,1739450779.319147,FIhLkGAh+HF5jw,
9998,40.912662,-73.783763,90.0,1,Credit,40.912662,-73.783763,22.50,0.00,112.50,0.00,2009-06-29 02:32:00+00:00,2009-06-29 02:31:00+00:00,0.0,VTS,1739450779.319147,aFjPq1YQGGQ8Fg,


In [14]:
with pipeline.sql_client() as client:
    res = client.execute_sql(
            """
            SELECT
            AVG(date_diff('minute', trip_pickup_date_time, trip_dropoff_date_time))
            FROM rides;
            """
        )
    # Prints column values of the first row
    print(res)

[(12.3049,)]
