# Question 1

In [1]:
# Show the version reported by the `dlt` command-line tool found on the shell PATH.
# NOTE(review): if several Python environments are installed, this CLI may not be the
# same `dlt` package the kernel imports; the `%watermark -iv` cell at the end of the
# notebook reports the version actually imported by this kernel.
!dlt --version

[39mdlt 1.6.1[0m


# Question 2

In [None]:
import dlt
import duckdb
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import PageNumberPaginator


@dlt.resource(name="rides")
def ny_taxi():
    """Stream NYC taxi ride records from the Zoomcamp demo API into the `rides` table.

    Pages are requested by page number starting at 1. No total-page field is
    configured (`total_path=None`), so the paginator must decide on its own when
    to stop — presumably at the first empty page; confirm against dlt's
    `PageNumberPaginator` documentation.

    Yields:
        Whatever `RESTClient.paginate` produces for each page (passed through as-is).
    """
    page_numbers = PageNumberPaginator(base_page=1, total_path=None)
    api_client = RESTClient(
        base_url="https://us-central1-dlthub-analytics.cloudfunctions.net",
        paginator=page_numbers,
    )

    # Delegate directly so dlt drives the underlying paginator generator.
    yield from api_client.paginate("data_engineering_zoomcamp_api")


# Pipeline that loads into a local DuckDB database; the loaded tables are placed
# in the `ny_taxi_data` dataset (schema).
pipeline = dlt.pipeline(
    pipeline_name="ny_taxi_pipeline",
    destination="duckdb",
    dataset_name="ny_taxi_data",
)

In [3]:
# Load the `rides` resource into DuckDB.
# `write_disposition="replace"` keeps re-runs idempotent: dlt's default ("append")
# would add another copy of every ride each time this cell (or Restart & Run All)
# executes, inflating the row count and skewing the averages computed below.
load_info = pipeline.run(ny_taxi, write_disposition="replace")
print(load_info)

Pipeline ny_taxi_pipeline load step completed in 0.77 seconds
1 load package(s) were loaded to destination duckdb and into dataset ny_taxi_data
The duckdb destination used duckdb:////home/sergei/learning/dezoomcamp-hw/dlt_workshop/ny_taxi_pipeline.duckdb location to store data
Load package 1739734698.1978729 is LOADED and contains no failed jobs


In [4]:
# dlt's duckdb destination stored the data in '<pipeline_name>.duckdb' in the working
# directory (the load output reports this location), so connect to that file directly.
# The `with` block closes the connection afterwards, so no extra handle on the
# database file stays open while later cells query it through dlt.
with duckdb.connect(f"{pipeline.pipeline_name}.duckdb") as conn:
    # Resolve unqualified table names against the pipeline's dataset (schema)
    conn.sql(f"SET search_path = '{pipeline.dataset_name}'")
    # Overview of the tables with their column names and types
    tables_overview = conn.sql("DESCRIBE").df()

tables_overview

Unnamed: 0,database,schema,name,column_names,column_types,temporary
0,ny_taxi_pipeline,ny_taxi_data,_dlt_loads,"[load_id, schema_name, status, inserted_at, sc...","[VARCHAR, VARCHAR, BIGINT, TIMESTAMP WITH TIME...",False
1,ny_taxi_pipeline,ny_taxi_data,_dlt_pipeline_state,"[version, engine_version, pipeline_name, state...","[BIGINT, BIGINT, VARCHAR, VARCHAR, TIMESTAMP W...",False
2,ny_taxi_pipeline,ny_taxi_data,_dlt_version,"[version, engine_version, inserted_at, schema_...","[BIGINT, BIGINT, TIMESTAMP WITH TIME ZONE, VAR...",False
3,ny_taxi_pipeline,ny_taxi_data,rides,"[end_lat, end_lon, fare_amt, passenger_count, ...","[DOUBLE, DOUBLE, DOUBLE, BIGINT, VARCHAR, DOUB...",False


We see that 4 tables were created: 3 internal `dlt` tables (`_dlt_loads`, `_dlt_pipeline_state`, `_dlt_version`) and the `rides` table, which contains the actual data.

# Question 3

In [7]:
# Read the `rides` table into a pandas DataFrame through dlt's dataset interface.
df = pipeline.dataset(dataset_type="default").rides.df()

# `describe()` reports non-null counts per numeric column only (e.g.
# `store_and_forward` has just 135 values), so it is not a reliable row count.
display(df.describe())

# Number of rows in the `rides` table (the Question 3 answer)
len(df)

Unnamed: 0,end_lat,end_lon,fare_amt,passenger_count,start_lat,start_lon,tip_amt,tolls_amt,total_amt,trip_distance,surcharge,store_and_forward
count,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0,10000.0,135.0
mean,40.381455,-73.30134,10.066001,2.0853,40.356752,-73.25825,0.606341,0.159485,11.165727,2.73583,0.3339,0.044444
std,3.870109,7.024986,8.245156,2.580095,3.994385,7.250808,1.451652,0.857254,9.448327,3.145651,0.37192,0.206848
min,0.0,-74.330058,2.5,1.0,0.0,-75.233332,0.0,0.0,2.5,0.0,0.0,0.0
25%,40.736812,-73.991246,5.7,1.0,40.737349,-73.991837,0.0,0.0,6.2,1.02,0.0,0.0
50%,40.754705,-73.979959,7.7,1.0,40.754095,-73.981639,0.0,0.0,8.5,1.73,0.5,0.0
75%,40.76911,-73.964989,11.3,3.0,40.768396,-73.967952,0.8,0.0,12.5,3.04,0.5,0.0
max,41.310787,0.005538,194.0,208.0,41.156413,0.001023,38.8,16.0,232.8,40.21,1.0,1.0


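We see that the table has 10,000 rows (the `count` row of `describe()`; columns with missing values, such as `store_and_forward`, report fewer).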

# Question 4

In [6]:
# Average trip duration in minutes, computed inside DuckDB.
# NOTE: DuckDB's `date_diff('minute', start, end)` counts the minute boundaries
# crossed between the two timestamps (not elapsed seconds / 60), so this is the
# average of those whole-minute counts.
with pipeline.sql_client() as client:
    res = client.execute_sql(
        """
        SELECT AVG(date_diff('minute', trip_pickup_date_time, trip_dropoff_date_time))
        FROM rides;
        """
    )

# `execute_sql` returned the result rows as a list of tuples; this aggregate
# query produces exactly one row with one column, so unwrap the scalar.
avg_trip_minutes = res[0][0]
avg_trip_minutes

[(12.3049,)]


Thus the average trip duration is 12.3049 minutes (as measured by `date_diff('minute', …)`, which counts minute boundaries crossed).

In [9]:
# Load the `watermark` IPython extension, which provides the %watermark magic.
%load_ext watermark

In [12]:
# Record the environment for reproducibility: Python/IPython versions (-v)
# and the versions of packages imported in this notebook (-iv).
%watermark -iv -v

Python implementation: CPython
Python version       : 3.13.2
IPython version      : 8.32.0

dlt   : 1.6.1
duckdb: 1.2.0

