**Before running: store your GCP service-account credentials JSON as the `GCP_CREDENTIALS` secret.**

In [1]:
import os
from google.cloud import storage, bigquery

# GCS locations used throughout this notebook (edit here, not inline).
BUCKET_NAME = "dtc-de-zoomcamp-didac"
# JSON manifest listing the "<color>_<YYYY>-<MM>" periods already ingested.
MANIFEST_PATH = "nytaxi/_manifests/module4_processed_periods.json"

# No explicit credentials are passed, so both clients rely on the environment's
# default Google credentials.
storage_client = storage.Client()
bq_client = bigquery.Client()  # NOTE(review): not used by any cell shown here
bucket = storage_client.bucket(BUCKET_NAME)
blob = bucket.blob(MANIFEST_PATH)

In [4]:
import dlt
import requests
import pandas as pd
import json
from dlt.destinations import filesystem
from io import BytesIO

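Ingest the gzip-compressed NYC taxi CSV files into GCS (dlt writes them as CSV), recording already-loaded months in a JSON manifest so they are skipped on later runs.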

In [5]:
def load_manifest(manifest_blob=None) -> dict:
    """Read the processed-periods manifest from GCS.

    Args:
        manifest_blob: blob holding the JSON manifest. Defaults to the
            module-level ``blob`` (this notebook's manifest object).

    Returns:
        The parsed manifest, always containing a ``"processed_periods"`` list.
        If the manifest does not exist yet, ``{"processed_periods": []}``.
    """
    if manifest_blob is None:
        manifest_blob = blob
    if not manifest_blob.exists():
        return {"processed_periods": []}
    manifest = json.loads(manifest_blob.download_as_text())
    # Guard against a manifest saved without the key so callers can rely on it.
    manifest.setdefault("processed_periods", [])
    return manifest

def save_manifest(manifest: dict, manifest_blob=None) -> None:
    """Overwrite the processed-periods manifest in GCS with ``manifest`` as indented JSON.

    Args:
        manifest: JSON-serialisable manifest, normally ``{"processed_periods": [...]}``.
        manifest_blob: destination blob. Defaults to the module-level ``blob``.
    """
    if manifest_blob is None:
        manifest_blob = blob
    manifest_blob.upload_from_string(
        json.dumps(manifest, indent=2),
        content_type="application/json",
    )

    
# ---- Ingestion settings ---------------------------------------------------
# Periods to ingest. Widen MONTHS to range(1, 13) to load whole years.
COLORS = ("yellow", "green")
YEARS = (2019, 2020)
MONTHS = range(1, 2)  # January only
# requests' timeout bounds each connect/read wait (not the whole download);
# without one, a stalled connection would block the cell indefinitely.
DOWNLOAD_TIMEOUT_S = 300


def _period_key(color: str, year: int, month: int) -> str:
    """Manifest key for one monthly file, e.g. ``"yellow_2019-01"``."""
    return f"{color}_{year}-{month:02d}"


def _pending_periods(processed, colors=COLORS, years=YEARS, months=MONTHS) -> list:
    """List the ``(color, year, month)`` periods whose key is not in ``processed``.

    Iterates colour, then year, then month, and prints ``Skipping <key>`` for
    every period already present in ``processed``.
    """
    pending = []
    for color in colors:
        for year in years:
            for month in months:
                key = _period_key(color, year, month)
                if key in processed:
                    print(f"Skipping {key}")
                    continue
                pending.append((color, year, month))
    return pending


def _download_period(color: str, year: int, month: int, timeout: float = DOWNLOAD_TIMEOUT_S) -> pd.DataFrame:
    """Download one monthly ``.csv.gz`` file from the DataTalksClub mirror as a DataFrame.

    Adds ``year``, ``month`` and ``color`` columns so rows stay traceable after
    the per-colour concatenation. Raises ``requests.HTTPError`` on an HTTP error
    status.
    """
    file_name = f"{color}_tripdata_{year}-{month:02d}.csv.gz"
    url = f"https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{color}/{file_name}"
    print(f"Downloading {file_name}")

    response = requests.get(url, timeout=timeout)
    response.raise_for_status()

    df = pd.read_csv(BytesIO(response.content), compression="gzip", dtype={"store_and_fwd_flag": "string"})
    return df.assign(year=year, month=month, color=color)


# Define a dlt source that downloads the gz-compressed CSV files and yields them as resources.
@dlt.source(name="module4_github_rides")
def download_gz_csv(periods=None):
    """Yield one ``<color>_tripdata`` resource per colour that has data to load.

    Args:
        periods: ``(color, year, month)`` tuples to download. When omitted, every
            period from COLORS/YEARS/MONTHS not yet in the GCS manifest is used.

    This source deliberately does NOT update the manifest. Marking periods here
    would persist them before dlt has extracted or loaded the yielded data, so a
    failed load would leave them marked as processed and they would be skipped
    on every later run. The caller marks them only after a successful run.
    """
    if periods is None:
        processed = set(load_manifest().get("processed_periods", []))
        periods = _pending_periods(processed)

    # One resource (= one destination table) per colour, so group frames by colour.
    frames_by_color = {}
    for color, year, month in periods:
        frames_by_color.setdefault(color, []).append(_download_period(color, year, month))

    for color, frames in frames_by_color.items():
        yield dlt.resource(pd.concat(frames, ignore_index=True), name=f"{color}_tripdata")


# Initialize the pipeline.
# NOTE(review): this layout has no {load_id}/{file_id} placeholder, so a later run that
# appends new months to the same table may target the same object name. Confirm how
# dlt's filesystem destination handles that before widening MONTHS.
pipeline = dlt.pipeline(
    pipeline_name="module4_github_rides_pipeline",
    destination=filesystem(layout="{schema_name}/{table_name}.{ext}"),
    dataset_name="nytaxi",
)

manifest = load_manifest()
pending = _pending_periods(set(manifest.get("processed_periods", [])))

# Run the pipeline; dlt writes each table as CSV to the filesystem destination.
load_info = pipeline.run(download_gz_csv(periods=pending), loader_file_format="csv")

# Print the results
print(load_info)

# Record the periods only after the load has succeeded, so a failure is retried next run.
load_info.raise_on_failed_jobs()
if pending:
    done = set(manifest.get("processed_periods", []))
    done.update(_period_key(*period) for period in pending)
    manifest["processed_periods"] = sorted(done)
    save_manifest(manifest)


Downloading yellow_tripdata_2019-01.csv.gz
Downloading yellow_tripdata_2020-01.csv.gz
Downloading green_tripdata_2019-01.csv.gz
Downloading green_tripdata_2020-01.csv.gz
Pipeline module4_github_rides_pipeline load step completed in 5.25 seconds
1 load package(s) were loaded to destination filesystem and into dataset nytaxi
The filesystem destination used gs://dtc-de-zoomcamp-didac location to store data
Load package 1771016474.0163293 is LOADED and contains no failed jobs


In [None]:
# Attach to the same pipeline and print its state. Use the same filesystem layout
# as the load cell so this handle matches the configuration the data was written with
# (a bare destination="filesystem" would fall back to a different layout).
pipeline = dlt.pipeline(
    pipeline_name="module4_github_rides_pipeline",
    destination=filesystem(layout="{schema_name}/{table_name}.{ext}"),
    dataset_name="nytaxi",
)

print(pipeline.state)

In [None]:
import json


# Print every object stored under the dataset's _dlt_pipeline_state/ prefix.
state_prefix = "nytaxi/_dlt_pipeline_state/"
blobs = list(bucket.list_blobs(prefix=state_prefix))

for b in blobs:
    print("Reading:", b.name)
    content = b.download_as_text()
    print(content)

In [6]:
from dlt.destinations import filesystem
import dlt, json

# Attach to the pipeline with the same destination/layout used for loading and
# dump its state as JSON (truncated to 4000 chars to keep the output readable).
pipeline = dlt.pipeline(
    pipeline_name="module4_github_rides_pipeline",
    dataset_name="nytaxi",
    destination=filesystem(layout="{schema_name}/{table_name}.{ext}"),
)

state_text = json.dumps(pipeline.state, indent=2, default=str)
print(state_text[:4000])

{
  "_state_version": 1,
  "_state_engine_version": 4,
  "_local": {
    "first_run": false,
    "initial_cwd": "/app",
    "last_run_context": {
      "settings_dir": "/app/.dlt",
      "local_dir": "/app",
      "run_dir": "/app",
      "uri": "file:///app"
    },
    "_last_extracted_at": "2026-02-13 13:26:08.335818+00:00",
    "_last_extracted_hash": "Af3XU4WclFqPkiAQb69xI82XW2SIhH2qSN36lSkvMO0="
  },
  "schema_names": [
    "module4_github_rides"
  ],
  "pipeline_name": "module4_github_rides_pipeline",
  "dataset_name": "nytaxi",
  "default_schema_name": "module4_github_rides",
  "destination_type": "dlt.destinations.filesystem",
  "destination_name": null,
  "_version_hash": "Af3XU4WclFqPkiAQb69xI82XW2SIhH2qSN36lSkvMO0="
}


In [7]:
def find_key(obj, key="processed_periods", path=""):
    """Collect every ``(path, value)`` pair whose dict key equals ``key``.

    Walks nested dicts and lists depth-first. Paths use ``.`` between dict keys
    and ``[i]`` for list positions. A matching key's own value is searched as well,
    so nested matches are reported after their parent.
    """
    def walk(node, prefix):
        if isinstance(node, dict):
            for name, child in node.items():
                child_path = f"{prefix}.{name}" if prefix else name
                if name == key:
                    yield child_path, child
                yield from walk(child, child_path)
        elif isinstance(node, list):
            for index, child in enumerate(node):
                yield from walk(child, f"{prefix}[{index}]")

    return list(walk(obj, path))

# Search dlt's own pipeline state for the key. An empty result is expected: the
# processed periods are kept in the GCS manifest blob, not in dlt state.
hits = find_key(pipeline.state, key="processed_periods")
print(hits)

[]
