In [1]:
import pandas as pd
import duckdb
from datetime import datetime

In [2]:
# DuckDB database file inspected by this notebook (a relative path, resolved
# against the kernel's current working directory).
DB_PATH = "duckdb.db"

In [3]:
def _quote_identifier(name):
    """Return ``name`` as a double-quoted SQL identifier, doubling any embedded quotes."""
    return '"' + name.replace('"', '""') + '"'


def analyze_duckdb_database(db_path=None, skip_schemas=('main',)):
    """Print the row count of every base table in a DuckDB database, grouped by schema.

    Parameters
    ----------
    db_path : str, optional
        Database file to open. When omitted, the module-level ``DB_PATH`` is
        used, so ``analyze_duckdb_database()`` behaves as before.
    skip_schemas : iterable of str, optional
        Schema names left out of the report; ``('main',)`` by default.
        ``information_schema`` and ``pg_catalog`` are always excluded by the query.

    Returns
    -------
    None
        The report is printed to stdout.
    """
    if db_path is None:
        db_path = DB_PATH
    skipped = set(skip_schemas)

    conn = duckdb.connect(db_path)
    try:
        schemas = conn.execute("""
            SELECT schema_name
            FROM information_schema.schemata
            WHERE schema_name NOT IN ('information_schema', 'pg_catalog')
            ORDER BY schema_name
        """).fetchall()

        for (schema_name,) in schemas:
            if schema_name in skipped:
                continue

            print(f"Schema: {schema_name}")
            print("-" * 40)

            # Bind the schema name as a parameter rather than splicing it into the SQL text.
            tables = conn.execute("""
                SELECT table_name
                FROM information_schema.tables
                WHERE table_schema = ?
                  AND table_type = 'BASE TABLE'
                ORDER BY table_name
            """, [schema_name]).fetchall()

            for (table_name,) in tables:
                # Identifiers cannot be bound as parameters, so quote and escape them explicitly.
                qualified = f"{_quote_identifier(schema_name)}.{_quote_identifier(table_name)}"
                row_count = conn.execute(f"SELECT COUNT(*) FROM {qualified}").fetchone()[0]
                print(f"  Table: {table_name}: row count {row_count:,}")

            print("-" * 40)
    finally:
        # Close even when a query raises, so the connection is never leaked.
        conn.close()

In [4]:
analyze_duckdb_database()

Schema: ingestion
----------------------------------------
  Table: _dlt_loads: row count 2
  Table: _dlt_pipeline_state: row count 2
  Table: _dlt_version: row count 2
  Table: payment_lookup: row count 7
  Table: trips: row count 3,044,066
----------------------------------------
Schema: reports
----------------------------------------
  Table: trips_report: row count 829
----------------------------------------
Schema: staging
----------------------------------------
  Table: trips: row count 3,043,842
----------------------------------------


In [5]:
# Date window for the fetch loop. `end` is exclusive because the loop runs
# while current < end, so this covers January and February 2019.
start = datetime(2019, 1, 1)
end = datetime(2019, 3, 1)
# Taxi colours to download for each month in the window.
taxi_types = ['green']

In [None]:
def fetch_data(taxi, year, month,
               url_template="https://raw.githubusercontent.com/Avisprof/data-engineering-zoomcamp-2026/main/bruin/{taxi}_tripdata_{year}-{month:02d}.parquet"):
    """Download one month of taxi trip data as a DataFrame with standardised column names.

    Parameters
    ----------
    taxi : str
        Taxi type used in the file name, e.g. ``'green'`` or ``'yellow'``.
    year : int
        Four-digit year of the data file.
    month : int
        Month number 1-12 (zero-padded to two digits in the default URL).
    url_template : str, optional
        ``str.format`` template with ``{taxi}``, ``{year}`` and ``{month}`` fields.
        Other sources that follow the same file naming (availability not
        verified here):
        ``https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{taxi}_tripdata_{year}-{month:02d}.parquet``
        ``https://d37ci6vzurychx.cloudfront.net/trip-data/{taxi}_tripdata_{year}-{month:02d}.parquet``

    Returns
    -------
    pandas.DataFrame
        The file's rows, with pickup/dropoff datetime and location columns
        renamed to ``pickup_datetime``, ``dropoff_datetime``,
        ``pickup_location_id`` and ``dropoff_location_id``, plus a constant
        ``taxi_type`` column.

    Raises
    ------
    ValueError
        If ``month`` is outside 1-12 (checked before any download).
    """
    if not 1 <= month <= 12:
        raise ValueError(f"month must be in 1..12, got {month!r}")

    url = url_template.format(taxi=taxi, year=year, month=month)
    print(url)
    df = pd.read_parquet(url)
    print(f"fetched {len(df)} rows from {url}")

    # Standardize column names: yellow has a tpep_ prefix, green an lpep_ prefix.
    # DataFrame.rename ignores keys that are absent, so one map serves every taxi type.
    rename_map = {
        'tpep_pickup_datetime': 'pickup_datetime',
        'tpep_dropoff_datetime': 'dropoff_datetime',
        'lpep_pickup_datetime': 'pickup_datetime',
        'lpep_dropoff_datetime': 'dropoff_datetime',
        'PULocationID': 'pickup_location_id',
        'DOLocationID': 'dropoff_location_id',
    }
    df = df.rename(columns=rename_map)
    df['taxi_type'] = taxi  # add column to distinguish yellow vs green
    return df

In [15]:
# Fetch one parquet file per (month, taxi type) in [start, end) and collect the
# DataFrames; fetch_data adds the taxi_type lineage column to each one.
dfs = []
current = start
while current < end:
    year, month = current.year, current.month
    # Fetch every configured taxi type for this month (no early exit).
    for taxi in taxi_types:
        df = fetch_data(taxi, year, month)
        dfs.append(df)
    # advance to the first day of the next month, rolling over the year after December
    current = datetime(year + 1, 1, 1) if month == 12 else datetime(year, month + 1, 1)

../green_tripdata_2019-01.parquet


fetched 672105 rows from ../green_tripdata_2019-01.parquet
../green_tripdata_2019-02.parquet


FileNotFoundError: [Errno 2] No such file or directory: '../green_tripdata_2019-02.parquet'

In [11]:
# Stack the monthly frames; ignore_index gives the combined frame a fresh 0..n-1 index.
result = pd.concat(dfs, ignore_index=True)
total_rows = len(result)
print("final row count:", total_rows)
result.head()

final row count: 1287699


Unnamed: 0,VendorID,pickup_datetime,dropoff_datetime,store_and_fwd_flag,RatecodeID,pickup_location_id,dropoff_location_id,passenger_count,trip_distance,fare_amount,...,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge,taxi_type
0,2,2018-12-21 15:17:29,2018-12-21 15:18:57,N,1.0,264,264,5.0,0.0,3.0,...,0.5,0.0,0.0,,0.3,4.3,2.0,1.0,,green
1,2,2019-01-01 00:10:16,2019-01-01 00:16:32,N,1.0,97,49,2.0,0.86,6.0,...,0.5,0.0,0.0,,0.3,7.3,2.0,1.0,,green
2,2,2019-01-01 00:27:11,2019-01-01 00:31:38,N,1.0,49,189,2.0,0.66,4.5,...,0.5,0.0,0.0,,0.3,5.8,1.0,1.0,,green
3,2,2019-01-01 00:46:20,2019-01-01 01:04:54,N,1.0,189,17,2.0,2.68,13.5,...,0.5,2.96,0.0,,0.3,19.71,1.0,1.0,,green
4,2,2019-01-01 00:19:06,2019-01-01 00:39:43,N,1.0,82,258,1.0,4.53,18.0,...,0.5,0.0,0.0,,0.3,19.3,2.0,1.0,,green


In [12]:
# One connection shared by the ad-hoc inspection cells below; closed in the last cell.
conn = duckdb.connect(database=DB_PATH)

In [13]:
# Row count of a single table.
schema_name, table_name = 'ingestion', 'trips'
count_query = f"""
                SELECT COUNT(*) FROM "{schema_name}"."{table_name}"
            """
(row_count,) = conn.execute(count_query).fetchone()

print(f"  Table: {table_name}: row count {row_count:,}")

  Table: trips: row count 3,044,066


In [14]:
# Peek at the first five rows of the table selected in the previous cell.
preview_sql = f"""
                SELECT 
                  * 
                FROM "{schema_name}"."{table_name}"
                LIMIT 5
                """
df = conn.execute(preview_sql).fetchdf()
df.head()

Unnamed: 0,vendor_id,pickup_datetime,dropoff_datetime,store_and_fwd_flag,ratecode_id,pickup_location_id,dropoff_location_id,passenger_count,trip_distance,fare_amount,...,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge,taxi_type
0,2,2018-12-21 15:17:29+00:00,2018-12-21 15:18:57+00:00,N,1.0,264,264,5.0,0.0,3.0,...,0.5,0.0,0.0,,0.3,4.3,2.0,1.0,,green
1,2,2019-01-01 00:10:16+00:00,2019-01-01 00:16:32+00:00,N,1.0,97,49,2.0,0.86,6.0,...,0.5,0.0,0.0,,0.3,7.3,2.0,1.0,,green
2,2,2019-01-01 00:27:11+00:00,2019-01-01 00:31:38+00:00,N,1.0,49,189,2.0,0.66,4.5,...,0.5,0.0,0.0,,0.3,5.8,1.0,1.0,,green
3,2,2019-01-01 00:46:20+00:00,2019-01-01 01:04:54+00:00,N,1.0,189,17,2.0,2.68,13.5,...,0.5,2.96,0.0,,0.3,19.71,1.0,1.0,,green
4,2,2019-01-01 00:19:06+00:00,2019-01-01 00:39:43+00:00,N,1.0,82,258,1.0,4.53,18.0,...,0.5,0.0,0.0,,0.3,19.3,2.0,1.0,,green


In [15]:
# Load the full payment-type lookup table and show its first rows.
schema_name, table_name = 'ingestion', 'payment_lookup'
lookup_sql = f"""
                SELECT 
                  * 
                FROM "{schema_name}"."{table_name}"
                """
df = conn.execute(lookup_sql).fetchdf()
df.head()

Unnamed: 0,payment_type_id,payment_type_name,_dlt_load_id,_dlt_id
0,0,flex_fare,1772305904.0973055,dsnV5DH2HfXK3g
1,1,credit_card,1772305904.0973055,Ov4ow96TCRSo8A
2,2,cash,1772305904.0973055,NhSkQHg66L1y0A
3,3,no_charge,1772305904.0973055,0vp5ryYO5yepBg
4,4,dispute,1772305904.0973055,M7kDErGGfih8Zg


In [16]:
# Close the connection opened with duckdb.connect(DB_PATH) for the ad-hoc queries.
conn.close()