Set up GCP credentials and GCS bucket

In [2]:
import os
import json

# Load the service-account key file referenced by GOOGLE_APPLICATION_CREDENTIALS
# and expose its contents to dlt as a JSON string; dlt resolves the destination
# credentials from DESTINATION__CREDENTIALS (used by the GCS and BigQuery runs below).
gcp_path = os.environ["GOOGLE_APPLICATION_CREDENTIALS"]  # KeyError if the variable is unset
with open(gcp_path, "r", encoding="utf-8") as f:
    # Parse first so a malformed key file fails here with a clear JSONDecodeError
    # instead of deep inside a dlt load; then serialize back to a JSON string.
    gcp_json_str = json.dumps(json.load(f))
os.environ["DESTINATION__CREDENTIALS"] = gcp_json_str

# GCS bucket used by the filesystem destination (picked up by dlt as BUCKET_URL)
os.environ["BUCKET_URL"] = "gs://ny_taxi_485500_bucket"

In a terminal, create a virtual environment named `.venv`, activate it, and install the packages required for production:

```bash
# Adjust the interpreter path if Python 3 is not installed via Homebrew
/opt/homebrew/bin/python3 -m venv .venv
source .venv/bin/activate
python -m pip install --upgrade pip setuptools wheel
python -m pip install \
    ipykernel \
    pandas \
    pyarrow \
    requests \
    duckdb \
    gcsfs \
    "dlt[duckdb,gs,bigquery]"
```

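Verify that the notebook kernel is the `.venv` interpreter, so that packages installed with pip are available to this notebook:

```python
import sys
print(sys.executable)
```

The printed path should now point inside `.venv` (e.g. `.../.venv/bin/python`).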


In [3]:
import dlt
import requests
import pandas as pd
from dlt.destinations import filesystem
from io import BytesIO

Ingest the monthly Parquet files into the GCS bucket.

In [22]:
# Define a dlt source that downloads each monthly Parquet file and yields it as its
# own resource (one table per file)
@dlt.source(name="rides")
def download_parquet():
    """Yield one dlt resource per month of 2024 yellow-taxi trip data (January-June).

    Each file is downloaded fully into memory, read with pandas, and wrapped in a
    resource named after the source file (e.g. ``yellow_tripdata_2024-01.parquet``).

    Raises:
        requests.HTTPError: if a monthly file cannot be downloaded.
    """
    prefix = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata"
    for month in range(1, 7):
        # Zero-pad to two digits so the URL pattern also holds for months 10-12.
        url = f"{prefix}_2024-{month:02d}.parquet"
        file_name = f"yellow_tripdata_2024-{month:02d}.parquet"

        # Time out on a stalled connection and fail loudly on HTTP errors instead of
        # handing an error page to pandas.
        response = requests.get(url, timeout=60)
        response.raise_for_status()

        df = pd.read_parquet(BytesIO(response.content))

        # Yield the dataframe as a dlt resource for ingestion
        yield dlt.resource(df, name=file_name)


# Initialize the pipeline. The filesystem destination writes to the bucket given by
# BUCKET_URL; the layout stores each table as <schema_name>/<table_name>.<ext>.
# NOTE(review): the BigQuery pipeline below reuses pipeline_name="rides_pipeline"
# with a different destination, so both runs share local pipeline state; consider
# distinct names.
pipeline = dlt.pipeline(
    pipeline_name="rides_pipeline",
    destination=filesystem(layout="{schema_name}/{table_name}.{ext}"),
    dataset_name="rides_dataset",
)

# Run the pipeline to write the data to the GCS bucket as Parquet files
load_info = pipeline.run(download_parquet(), loader_file_format="parquet")

# Print the load summary
print(load_info)


Pipeline rides_pipeline load step completed in 1 minute and 26.99 seconds
1 load package(s) were loaded to destination filesystem and into dataset rides_dataset
The filesystem destination used gs://ny_taxi_485500_bucket location to store data
Load package 1770667338.531616 is LOADED and contains no failed jobs


Ingest the data into a database (BigQuery, or DuckDB for local testing).

In [4]:
# Define a dlt resource that downloads the monthly Parquet files into a single table
# ("rides"); write_disposition="replace" replaces the table's contents on each run.
@dlt.resource(name="rides", write_disposition="replace")
def download_parquet():
    """Yield one DataFrame per month of 2024 yellow-taxi trip data (January-June).

    All DataFrames are loaded into the same "rides" table.

    Raises:
        requests.HTTPError: if a monthly file cannot be downloaded.
    """
    prefix = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata'

    for month in range(1, 7):
        # Zero-pad to two digits so the URL pattern also holds for months 10-12.
        url = f"{prefix}_2024-{month:02d}.parquet"

        # Time out on a stalled connection and fail loudly on HTTP errors instead of
        # handing an error page to pandas.
        response = requests.get(url, timeout=60)
        response.raise_for_status()

        yield pd.read_parquet(BytesIO(response.content))


# Initialize the pipeline. For local testing, swap the two destination lines to use
# "duckdb"; the DuckDB inspection cell below reads <pipeline_name>.duckdb.
pipeline = dlt.pipeline(
    pipeline_name="rides_pipeline",
    #destination="duckdb",  # Use DuckDB for testing
    destination="bigquery",  # Use BigQuery for production
    dataset_name="rides_dataset",
)

# Run the pipeline to load the Parquet data into the configured destination (BigQuery)
info = pipeline.run(download_parquet)

# Print the load summary
print(info)




Pipeline rides_pipeline load step completed in 1 minute and 56.34 seconds
1 load package(s) were loaded to destination bigquery and into dataset rides_dataset
The bigquery destination used dtc-de-course@dtc-de-course-485500.iam.gserviceaccount.com@dtc-de-course-485500 location to store data
Load package 1770669230.398151 is LOADED and contains no failed jobs


In [None]:
import duckdb

# Inspect the local DuckDB database. This only applies when the pipeline above was
# run with destination="duckdb"; the file is named after the pipeline.
db_path = f"{pipeline.pipeline_name}.duckdb"
if not os.path.exists(db_path):
    # duckdb.connect() would silently create a new, empty database file here (e.g.
    # after a BigQuery run), so report the situation instead.
    print(f"{db_path} not found; run the pipeline with destination='duckdb' first.")
else:
    # The context manager closes the connection and releases the database file.
    with duckdb.connect(db_path) as conn:
        # Set search path to the dataset so unqualified names resolve to its tables
        conn.sql(f"SET search_path = '{pipeline.dataset_name}'")
        # Describe the dataset to see loaded tables (.df() materializes the result)
        res = conn.sql("DESCRIBE").df()
    print(res)

In [None]:
# Count the loaded rows through dlt's SQL client for the pipeline's destination.
# The table name is the resource name ("rides").
with pipeline.sql_client() as client:
    with client.execute_query("SELECT count(1) FROM rides") as cursor:
        data = cursor.df()
print(data)