In [None]:
!pip install dlt[duckdb]

In [1]:
# Sanity check: confirm the dlt CLI is on PATH and show which version is installed
!dlt --version

dlt 1.6.1


In [4]:
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import PageNumberPaginator


# your code is here

# dlt resource that streams NYC taxi rides from the Zoomcamp demo API
@dlt.resource(name="rides")   # "rides" is the resource name (used as the table name)
def ny_taxi():
    """Yield NYC taxi ride data from the paginated REST endpoint, one page per yield.

    Pages are requested by page number starting at 1. No total-page field is
    configured (``total_path=None``), so the stop condition is left to the
    paginator's own defaults.
    NOTE(review): presumably iteration ends on the first empty page -- confirm
    against the dlt paginator documentation for the installed version.

    Yields:
        Whatever ``RESTClient.paginate`` produces for each page of the
        ``data_engineering_zoomcamp_api`` endpoint.
    """
    page_paginator = PageNumberPaginator(base_page=1, total_path=None)
    api_client = RESTClient(
        base_url="https://us-central1-dlthub-analytics.cloudfunctions.net",
        paginator=page_paginator,
    )

    # Hand pages over one at a time instead of collecting them all in memory first
    for rides_page in api_client.paginate("data_engineering_zoomcamp_api"):
        yield rides_page

# Pipeline that loads into DuckDB under the "ny_taxi_data" dataset
pipeline = dlt.pipeline(
    destination="duckdb",
    dataset_name="ny_taxi_data",
    pipeline_name="ny_taxi_pipeline",
)

# Run the pipeline on the taxi resource; write_disposition="replace" asks dlt to
# replace previously loaded table data instead of appending to it
load_info = pipeline.run(
    ny_taxi,
    write_disposition="replace",
)
print(load_info)

Pipeline ny_taxi_pipeline load step completed in 5.59 seconds
1 load package(s) were loaded to destination duckdb and into dataset ny_taxi_data
The duckdb destination used duckdb:///c:\Users\igor\Documentos\repos\de_zoomcamp\repo\de-zoomcamp\workshop-dlt\ny_taxi_pipeline.duckdb location to store data
Load package 1739390351.7024229 is LOADED and contains no failed jobs


In [6]:
import duckdb

# The pipeline run left a '<pipeline_name>.duckdb' file in the working directory;
# open that file directly with DuckDB
db_file = f"{pipeline.pipeline_name}.duckdb"
conn = duckdb.connect(db_file)

# Point the search path at the pipeline's dataset so tables can be named unqualified
conn.sql(f"SET search_path = '{pipeline.dataset_name}'")

# Overview of the tables in the database, returned as a DataFrame for display
conn.sql("DESCRIBE").df()

Unnamed: 0,database,schema,name,column_names,column_types,temporary
0,ny_taxi_pipeline,ny_taxi_data,_dlt_loads,"[load_id, schema_name, status, inserted_at, sc...","[VARCHAR, VARCHAR, BIGINT, TIMESTAMP WITH TIME...",False
1,ny_taxi_pipeline,ny_taxi_data,_dlt_pipeline_state,"[version, engine_version, pipeline_name, state...","[BIGINT, BIGINT, VARCHAR, VARCHAR, TIMESTAMP W...",False
2,ny_taxi_pipeline,ny_taxi_data,_dlt_version,"[version, engine_version, inserted_at, schema_...","[BIGINT, BIGINT, TIMESTAMP WITH TIME ZONE, VAR...",False
3,ny_taxi_pipeline,ny_taxi_data,rides,"[end_lat, end_lon, fare_amt, passenger_count, ...","[DOUBLE, DOUBLE, DOUBLE, BIGINT, VARCHAR, DOUB...",False


In [8]:
# Read the loaded "rides" table back into a pandas DataFrame and report its size
df_rides = pipeline.dataset(dataset_type="default").rides.df()
row_count, column_count = df_rides.shape
print(f"{row_count} rows and {column_count} columns")

10000 rows and 18 columns


In [14]:
# Average pickup-to-dropoff difference in minutes, computed by the destination database
avg_trip_minutes_query = """
            SELECT
            AVG(date_diff('minute', trip_pickup_date_time, trip_dropoff_date_time))
            FROM rides;
            """

with pipeline.sql_client() as client:
    res = client.execute_sql(avg_trip_minutes_query)
    # Print the whole result set: a list of row tuples (a single one-column row here)
    print(res)

[(12.3049,)]


In [11]:
# Inspect column dtypes, e.g. to check that the pickup/dropoff columns are datetimes
df_rides.dtypes

end_lat                               float64
end_lon                               float64
fare_amt                              float64
passenger_count                         int64
payment_type                           object
start_lat                             float64
start_lon                             float64
tip_amt                               float64
tolls_amt                             float64
total_amt                             float64
trip_distance                         float64
trip_dropoff_date_time    datetime64[us, UTC]
trip_pickup_date_time     datetime64[us, UTC]
surcharge                             float64
vendor_name                            object
_dlt_load_id                           object
_dlt_id                                object
store_and_forward                     float64
dtype: object

In [13]:
# Per-trip duration (dropoff minus pickup), stored as a new column, then averaged
pickup_times = df_rides['trip_pickup_date_time']
dropoff_times = df_rides['trip_dropoff_date_time']
df_rides['trip_duration'] = dropoff_times - pickup_times

avg_trip_duration = df_rides['trip_duration'].mean()
print(f"AVG trip duration: {avg_trip_duration}")

AVG trip duration: 0 days 00:12:18.295100
