In [None]:
import dlt
import duckdb
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import PageNumberPaginator

In [None]:
# dlt resource that streams NYC taxi rides from the Zoomcamp REST API
@dlt.resource(name="rides")  # resource name; dlt uses it as the destination table name
def ny_taxi():
    """Yield NYC taxi ride data from the REST API, one page per yielded item.

    Pages are requested by page number starting at ``base_page=1``.
    ``total_path=None`` means no total-page count is read from the response;
    presumably the paginator then stops at the first empty page -- confirm
    against the dlt ``PageNumberPaginator`` documentation.

    Yields:
        Whatever ``RESTClient.paginate`` produces for each page of the
        ``data_engineering_zoomcamp_api`` endpoint.
    """
    page_numbers = PageNumberPaginator(base_page=1, total_path=None)
    api = RESTClient(
        base_url="https://us-central1-dlthub-analytics.cloudfunctions.net/",
        paginator=page_numbers,
    )

    # Delegate to the paginator so pages are handed on lazily, one at a time,
    # instead of being collected in memory first.
    yield from api.paginate("data_engineering_zoomcamp_api")


# Define the dlt pipeline: load into a local DuckDB database, under the
# "ny_taxi_data" dataset.
pipeline = dlt.pipeline(
    destination="duckdb",
    pipeline_name="ny_taxi_pipeline",
    dataset_name="ny_taxi_data",
)

# Run the pipeline with the resource defined above.
# write_disposition="replace" makes this cell idempotent: dlt's default
# disposition appends, so re-running the cell (or Restart & Run All) against
# the DuckDB file left by a previous run would duplicate every row and
# silently change the row count and averages computed further down.
load_info = pipeline.run(ny_taxi, write_disposition="replace")
print(load_info)


In [None]:
# The pipeline run is expected to have left a '<pipeline_name>.duckdb' file in
# the working directory, so open that file directly with duckdb.
db_file = f"{pipeline.pipeline_name}.duckdb"
conn = duckdb.connect(db_file)

# Point the search path at the pipeline's dataset so its tables can be
# referenced without a schema prefix.
search_path_stmt = f"SET search_path = '{pipeline.dataset_name}'"
conn.sql(search_path_stmt)

# Last expression of the cell: the DESCRIBE result rendered as a DataFrame
conn.sql("DESCRIBE").df()


In [6]:
# Explore the loaded data: fetch the "rides" table from the pipeline's dataset,
# materialise it with .df(), and show how many rows were loaded.
rides_relation = pipeline.dataset(dataset_type="default").rides
df = rides_relation.df()
len(df)

10000

In [7]:
# Average difference, in minutes, between pickup and dropoff timestamps over
# every row of the rides table.
avg_duration_query = """
            SELECT
            AVG(date_diff('minute', trip_pickup_date_time, trip_dropoff_date_time))
            FROM rides;
            """

with pipeline.sql_client() as client:
    res = client.execute_sql(avg_duration_query)
    # res is the whole result set (a list of row tuples); this aggregate query
    # yields a single one-column row.
    print(res)

[(12.3049,)]
