In [1]:
!pip install dlt[duckdb]

Collecting dlt[duckdb]
  Downloading dlt-1.6.1-py3-none-any.whl.metadata (11 kB)
Collecting giturlparse>=0.10.0 (from dlt[duckdb])
  Downloading giturlparse-0.12.0-py2.py3-none-any.whl.metadata (4.5 kB)
Collecting hexbytes>=0.2.2 (from dlt[duckdb])
  Downloading hexbytes-1.3.0-py3-none-any.whl.metadata (3.3 kB)
Collecting jsonpath-ng>=1.5.3 (from dlt[duckdb])
  Downloading jsonpath_ng-1.7.0-py3-none-any.whl.metadata (18 kB)
Collecting makefun>=1.15.0 (from dlt[duckdb])
  Downloading makefun-1.15.6-py2.py3-none-any.whl.metadata (3.2 kB)
Collecting pathvalidate>=2.5.2 (from dlt[duckdb])
  Downloading pathvalidate-3.2.3-py3-none-any.whl.metadata (12 kB)
Collecting pendulum>=2.1.2 (from dlt[duckdb])
  Downloading pendulum-3.0.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.9 kB)
Collecting rich-argparse<2.0.0,>=1.6.0 (from dlt[duckdb])
  Downloading rich_argparse-1.7.0-py3-none-any.whl.metadata (14 kB)
Collecting semver>=3.0.0 (from dlt[duckdb])
  Downloading semve

In [2]:
# Confirm the `dlt` command-line tool is on PATH and report its version.
!dlt --version

[39mdlt 1.6.1[0m


In [3]:
import dlt
# Report the version of the dlt package imported into this kernel.
print(f"dlt version: {dlt.__version__}")

dlt version: 1.6.1


In [13]:
import dlt
import requests

# Define the API endpoint
BASE_URL = "https://us-central1-dlthub-analytics.cloudfunctions.net/data_engineering_zoomcamp_api"
# Seconds to wait for a single page request before giving up.
REQUEST_TIMEOUT = 30

# Define a resource to extract all pages of data
@dlt.resource
def ny_taxi():
    """Yield every page of records from the paginated zoomcamp API.

    Pages are requested one at a time starting at ``page=1``. Iteration stops at
    the first page whose decoded JSON body is empty (falsy).

    Raises:
        requests.HTTPError: if the API answers with a 4xx/5xx status. Without this
            check an error payload would be yielded as if it were data and
            pagination would keep going.
        requests.Timeout: if a page request takes longer than ``REQUEST_TIMEOUT``.
    """
    # One session for all pages so the HTTP connection is reused.
    with requests.Session() as session:
        page = 1  # Start from page 1
        while True:
            response = session.get(
                BASE_URL, params={"page": page}, timeout=REQUEST_TIMEOUT
            )
            response.raise_for_status()
            data = response.json()

            # Stop when an empty page is returned
            if not data:
                break

            yield data  # Return data for this page
            page += 1  # Go to the next page

# Create a pipeline for DuckDB. The database file location is reported in the
# load info printed below.
pipeline = dlt.pipeline(
    pipeline_name="ny_taxi_pipeline",
    destination="duckdb",
    dataset_name="ny_taxi_data"
)

# Run the pipeline and load data into DuckDB.
# write_disposition="replace" keeps this cell idempotent: with dlt's default
# ("append") every re-run of the cell or notebook would add another full copy of
# the rows, because the DuckDB file persists across kernel restarts.
load_info = pipeline.run(ny_taxi, write_disposition="replace")
print(load_info)


Pipeline ny_taxi_pipeline load step completed in 2.91 seconds
1 load package(s) were loaded to destination duckdb and into dataset ny_taxi_data
The duckdb destination used duckdb:////content/ny_taxi_pipeline.duckdb location to store data
Load package 1739260422.3189757 is LOADED and contains no failed jobs


In [14]:
import duckdb
from google.colab import data_table
# Turn on Colab's DataFrame table formatter for the outputs below.
data_table.enable_dataframe_formatter()

# The duckdb destination wrote '<pipeline_name>.duckdb' into the working directory,
# so a relative path opens that same database file.
db_path = f"{pipeline.pipeline_name}.duckdb"
conn = duckdb.connect(db_path)

# Resolve unqualified table names inside the pipeline's dataset (schema).
conn.sql(f"SET search_path = '{pipeline.dataset_name}'")

# One row per table in the dataset, with its column names and column types.
conn.sql("DESCRIBE").df()

Unnamed: 0,database,schema,name,column_names,column_types,temporary
0,ny_taxi_pipeline,ny_taxi_data,_dlt_loads,"[load_id, schema_name, status, inserted_at, sc...","[VARCHAR, VARCHAR, BIGINT, TIMESTAMP WITH TIME...",False
1,ny_taxi_pipeline,ny_taxi_data,_dlt_pipeline_state,"[version, engine_version, pipeline_name, state...","[BIGINT, BIGINT, VARCHAR, VARCHAR, TIMESTAMP W...",False
2,ny_taxi_pipeline,ny_taxi_data,_dlt_version,"[version, engine_version, inserted_at, schema_...","[BIGINT, BIGINT, TIMESTAMP WITH TIME ZONE, VAR...",False
3,ny_taxi_pipeline,ny_taxi_data,ny_taxi,"[end_lat, end_lon, fare_amt, passenger_count, ...","[DOUBLE, DOUBLE, DOUBLE, BIGINT, VARCHAR, DOUB...",False


In [15]:
# Show all tables visible on `conn`'s current search_path (the pipeline's dataset).
# Ending with the bare expression uses the notebook's rich table display instead of print().
tables = conn.sql("SHOW TABLES").df()
tables

                  name
0           _dlt_loads
1  _dlt_pipeline_state
2         _dlt_version
3              ny_taxi


In [23]:
# Load dlt's `_dlt_loads` bookkeeping table (load history: load_id, schema_name,
# status, inserted_at, schema_version_hash) as a DataFrame.
df1 = pipeline.dataset(dataset_type="default")._dlt_loads.df()

# Display the dataframe to inspect the data
df1


Unnamed: 0,load_id,schema_name,status,inserted_at,schema_version_hash
0,1739260422.3189757,ny_taxi,0,2025-02-11 07:54:21.302329+00:00,o0m+g67/XKuILD3Rfrk+8gQej5MheUDy9css+R7CsqI=


In [24]:
# Inspect `_dlt_pipeline_state`: the pipeline state dlt stores next to the data
# (version, pipeline_name, encoded state, and the load_id that wrote it).
df2 = (
    pipeline
    .dataset(dataset_type="default")
    ._dlt_pipeline_state
    .df()
)
df2

Unnamed: 0,version,engine_version,pipeline_name,state,created_at,version_hash,_dlt_load_id,_dlt_id
0,1,4,ny_taxi_pipeline,eNpdj08LgkAQR7/LnKVIJEjo1MH+IQWBh4hlcqd2yTZzx1...,2025-02-11 07:54:16.193804+00:00,zGKhWHLCtNWCz224trGPdG2IJNtTlGy/6QHWqhx9crg=,1739260422.3189757,EqT0diw9We6PDw


In [25]:
# Inspect `_dlt_version`: the stored schema versions (version, hash, schema JSON).
df3 = (
    pipeline
    .dataset(dataset_type="default")
    ._dlt_version
    .df()
)
df3

Unnamed: 0,version,engine_version,inserted_at,schema_name,version_hash,schema
0,2,11,2025-02-11 07:54:18.652877+00:00,ny_taxi,o0m+g67/XKuILD3Rfrk+8gQej5MheUDy9css+R7CsqI=,"{""version"":2,""version_hash"":""o0m+g67/XKuILD3Rf..."


In [27]:
# Materialize the loaded `ny_taxi` table as a DataFrame and preview its first rows.
df4 = (
    pipeline
    .dataset(dataset_type="default")
    .ny_taxi
    .df()
)
df4.head()

Unnamed: 0,end_lat,end_lon,fare_amt,passenger_count,payment_type,start_lat,start_lon,tip_amt,tolls_amt,total_amt,trip_distance,trip_dropoff_date_time,trip_pickup_date_time,surcharge,vendor_name,_dlt_load_id,_dlt_id,store_and_forward
0,40.742963,-73.980072,45.0,1,Credit,40.641525,-73.787442,9.0,4.15,58.15,17.52,2009-06-14 23:48:00+00:00,2009-06-14 23:23:00+00:00,0.0,VTS,1739260422.3189757,R2LOXXZkgH9tzQ,
1,40.740187,-74.005698,6.5,1,Credit,40.722065,-74.009767,1.0,0.0,8.5,1.56,2009-06-18 17:43:00+00:00,2009-06-18 17:35:00+00:00,1.0,VTS,1739260422.3189757,+xzLdiyHulJvoA,
2,40.718043,-74.004745,12.5,5,Credit,40.761945,-73.983038,2.0,0.0,15.5,3.37,2009-06-10 18:27:00+00:00,2009-06-10 18:08:00+00:00,1.0,VTS,1739260422.3189757,qoeS1F0nKW4VfQ,
3,40.739637,-73.985233,4.9,1,CASH,40.749802,-73.992247,0.0,0.0,5.4,1.11,2009-06-14 23:58:00+00:00,2009-06-14 23:54:00+00:00,0.5,VTS,1739260422.3189757,sgu1E1S6HQ7now,
4,40.730032,-73.852693,25.7,1,CASH,40.776825,-73.949233,0.0,4.15,29.85,11.09,2009-06-13 13:23:00+00:00,2009-06-13 13:01:00+00:00,0.0,VTS,1739260422.3189757,EfKtTeSCf5YWVg,


In [28]:
# Count rows inside DuckDB instead of pulling the whole table into pandas again just
# to call len() on it. execute_sql returns a list of row tuples; the count is the
# single cell of the single row.
with pipeline.sql_client() as client:
    total_records = client.execute_sql("SELECT COUNT(*) FROM ny_taxi")[0][0]
print(f"Total number of records extracted: {total_records}")

Total number of records extracted: 10000


In [30]:
# Average trip duration in minutes. DuckDB's date_diff('minute', a, b) counts minute
# boundaries crossed. This equals elapsed minutes only for whole-minute timestamps,
# which is what the sample rows above show (NOTE(review): confirm for all rows).
with pipeline.sql_client() as client:
    res = client.execute_sql(
        """
        SELECT
        AVG(date_diff('minute', trip_pickup_date_time, trip_dropoff_date_time))
        FROM ny_taxi;
        """
    )
# execute_sql returns a list of row tuples; the aggregate is the only cell of the only row.
avg_trip_minutes = res[0][0]
print(f"Average trip duration (minutes): {avg_trip_minutes}")

[(12.3049,)]
