# Data Pipeline: Bronze and Silver Layer Exploration

This notebook shows:
1. **Bronze Input**: Raw data files
2. **Bronze Output**: Data after quality checks
3. **Silver Input**: Bronze output (passed records)
4. **Silver Output**: Curated/transformed data


In [30]:
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds
from pathlib import Path

# Base paths
base_path = Path("data-pipelines/data")
raw_path = base_path / "raw"
bronze_path = base_path / "bronze"
silver_path = base_path / "silver"

def read_parquet_sample(path, limit=5000, columns=None):
    dataset = ds.dataset(path, format="parquet")
    batches = []
    rows = 0
    for batch in dataset.to_batches(columns=columns):
        batches.append(batch)
        rows += batch.num_rows
        if rows >= limit:
            break
    if not batches:
        return pd.DataFrame(columns=columns or [])
    table = pa.Table.from_batches(batches)
    if table.num_rows > limit:
        table = table.slice(0, limit)
    return table.to_pandas()



## 1. Bronze Layer - Input (Raw Data)

The Bronze layer receives raw data from external sources.

In [31]:
# Bronze Input 1: Taxi Zone CSV
taxi_zone_raw = pd.read_csv(raw_path / "taxi_zone.csv")
print("🔵 BRONZE INPUT - Taxi Zone (Raw CSV)")
print(f"Shape: {taxi_zone_raw.shape}")
print("\nFirst 10 rows:")
taxi_zone_raw.head(10)

🔵 BRONZE INPUT - Taxi Zone (Raw CSV)
Shape: (265, 4)

First 10 rows:


Unnamed: 0,LocationID,Borough,Zone,service_zone
0,1,EWR,Newark Airport,EWR
1,2,Queens,Jamaica Bay,Boro Zone
2,3,Bronx,Allerton/Pelham Gardens,Boro Zone
3,4,Manhattan,Alphabet City,Yellow Zone
4,5,Staten Island,Arden Heights,Boro Zone
5,6,Staten Island,Arrochar/Fort Wadsworth,Boro Zone
6,7,Queens,Astoria,Boro Zone
7,8,Queens,Astoria Park,Boro Zone
8,9,Queens,Auburndale,Boro Zone
9,10,Queens,Baisley Park,Boro Zone


In [34]:
# Bronze Input 2: Yellow Tripdata Parquet
raw_trip_parquet = raw_path / "yellow_tripdata_2025-01.parquet"
tripdata_raw = read_parquet_sample(raw_trip_parquet, limit=5000)
raw_total_rows = pq.ParquetFile(raw_trip_parquet).metadata.num_rows
print("🔵 BRONZE INPUT - Yellow Tripdata (Raw Parquet)")
print(f"Sample shape: {tripdata_raw.shape} (showing up to 5000 rows)")
print(f"Total rows on disk: {raw_total_rows}")
print(f"\nColumns: {list(tripdata_raw.columns)}")
print("\nFirst 5 rows:")
tripdata_raw.head()



🔵 BRONZE INPUT - Yellow Tripdata (Raw Parquet)
Sample shape: (5000, 20) (showing up to 5000 rows)
Total rows on disk: 3475226

Columns: ['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'congestion_surcharge', 'Airport_fee', 'cbd_congestion_fee']

First 5 rows:


Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee,cbd_congestion_fee
0,1,2025-01-01 00:18:38,2025-01-01 00:26:59,1,1.6,1,N,229,237,1,10.0,3.5,0.5,3.0,0.0,1.0,18.0,2.5,0.0,0.0
1,1,2025-01-01 00:32:40,2025-01-01 00:35:13,1,0.5,1,N,236,237,1,5.1,3.5,0.5,2.02,0.0,1.0,12.12,2.5,0.0,0.0
2,1,2025-01-01 00:44:04,2025-01-01 00:46:01,1,0.6,1,N,141,141,1,5.1,3.5,0.5,2.0,0.0,1.0,12.1,2.5,0.0,0.0
3,2,2025-01-01 00:14:27,2025-01-01 00:20:01,3,0.52,1,N,244,244,2,7.2,1.0,0.5,0.0,0.0,1.0,9.7,0.0,0.0,0.0
4,2,2025-01-01 00:21:34,2025-01-01 00:25:06,3,0.66,1,N,244,116,2,5.8,1.0,0.5,0.0,0.0,1.0,8.3,0.0,0.0,0.0


## 2. Bronze Layer - Output (After Quality Checks)

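The Bronze layer applies data quality checks and writes the records that pass to a `pass` directory; rejected records are stored separately. In the sample read below, the passed records keep the raw schema and carry no extra quality-metric columns.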
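
To make the idea of a pass/reject split concrete, here is a minimal pandas sketch. The function name `split_by_quality`, the required columns, and every threshold are assumptions chosen for illustration; the notebook does not show the actual Bronze rules.

```python
import pandas as pd


def split_by_quality(trips: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Split trips into (passed, rejected) using illustrative rules only."""
    # Null check on a hypothetical set of required columns.
    required = ["tpep_pickup_datetime", "tpep_dropoff_datetime", "passenger_count"]
    not_null = trips[required].notna().all(axis=1)

    # Domain ranges (hypothetical thresholds, not the pipeline's real ones).
    in_range = (
        trips["passenger_count"].between(1, 8)
        & (trips["trip_distance"] > 0)
        & (trips["fare_amount"] >= 0)
    )

    # A trip must end after it starts.
    ordered = trips["tpep_dropoff_datetime"] > trips["tpep_pickup_datetime"]

    passed_mask = not_null & in_range & ordered
    return trips[passed_mask], trips[~passed_mask]


# Four made-up rows: one clean, one with a null, one with zero distance,
# and one whose dropoff precedes its pickup.
sample = pd.DataFrame({
    "tpep_pickup_datetime": pd.to_datetime([
        "2025-01-01 00:18:38", "2025-01-01 00:32:40",
        "2025-01-01 00:44:04", "2025-01-01 00:14:27",
    ]),
    "tpep_dropoff_datetime": pd.to_datetime([
        "2025-01-01 00:26:59", "2025-01-01 00:35:13",
        "2025-01-01 00:46:01", "2025-01-01 00:10:00",
    ]),
    "passenger_count": [1, None, 1, 3],
    "trip_distance": [1.6, 0.5, 0.0, 0.52],
    "fare_amount": [10.0, 5.1, 5.1, 7.2],
})

passed, rejected = split_by_quality(sample)
print(f"passed={len(passed)}, rejected={len(rejected)}")
```

Keeping the rejected rows instead of dropping them makes it possible to audit why records failed and to reconcile row counts between layers.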

In [36]:
# Bronze Output: Yellow Tripdata (Passed Quality Checks)
bronze_output_path = bronze_path / "bronze_yellow_tripdata" / "pass"
bronze_dataset = ds.dataset(bronze_output_path, format="parquet")
bronze_total_rows = bronze_dataset.count_rows()
bronze_output = read_parquet_sample(bronze_output_path, limit=5000)

print("🟢 BRONZE OUTPUT - Yellow Tripdata (Passed QC)")
print(f"Sample shape: {bronze_output.shape} (showing up to 5000 rows)")
print(f"Total rows on disk: {bronze_total_rows}")
print(f"\nColumns: {list(bronze_output.columns)}")
print("\nFirst 5 rows:")
bronze_output.head()



🟢 BRONZE OUTPUT - Yellow Tripdata (Passed QC)
Sample shape: (5000, 20) (showing up to 5000 rows)
Total rows on disk: 2850787

Columns: ['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'congestion_surcharge', 'Airport_fee', 'cbd_congestion_fee']

First 5 rows:


Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee,cbd_congestion_fee
0,1,2025-01-01 00:18:38,2025-01-01 00:26:59,1,1.6,1,N,229,237,1,10.0,3.5,0.5,3.0,0.0,1.0,18.0,2.5,0.0,0.0
1,1,2025-01-01 00:32:40,2025-01-01 00:35:13,1,0.5,1,N,236,237,1,5.1,3.5,0.5,2.02,0.0,1.0,12.12,2.5,0.0,0.0
2,1,2025-01-01 00:44:04,2025-01-01 00:46:01,1,0.6,1,N,141,141,1,5.1,3.5,0.5,2.0,0.0,1.0,12.1,2.5,0.0,0.0
3,2,2025-01-01 00:14:27,2025-01-01 00:20:01,3,0.52,1,N,244,244,2,7.2,1.0,0.5,0.0,0.0,1.0,9.7,0.0,0.0,0.0
4,2,2025-01-01 00:21:34,2025-01-01 00:25:06,3,0.66,1,N,244,116,2,5.8,1.0,0.5,0.0,0.0,1.0,8.3,0.0,0.0,0.0


In [38]:
# Check for quality check columns added by Bronze layer (based on sample)
print("📊 Quality Check Columns Added by Bronze (sample view):")
bronze_columns = set(bronze_output.columns)
raw_columns = set(tripdata_raw.columns)
new_columns = bronze_columns - raw_columns
print(f"New columns detected in sample: {sorted(new_columns)}")

if new_columns:
    print("\nSample of quality check columns:")
    # Sort first: set iteration order is arbitrary, so list(new_columns)[:5]
    # could show different columns on each run.
    sample_cols = sorted(new_columns)[:5]
    display(bronze_output[sample_cols].head())



📊 Quality Check Columns Added by Bronze (sample view):
New columns detected in sample: []


## 3. Silver Layer - Input (Bronze Output)

The Silver layer takes the Bronze output (passed records) as input.

In [39]:
# Silver Input = Bronze Output (the same 5,000-row sample loaded above)
silver_input = bronze_output.copy()

print("🔷 SILVER INPUT (= Bronze Output)")
print(f"Shape: {silver_input.shape}")
print(f"Columns: {list(silver_input.columns)[:10]}...")  # Show first 10 columns
print(f"\nTotal columns: {len(silver_input.columns)}")
print("\nData types:")
print(silver_input.dtypes)

🔷 SILVER INPUT (= Bronze Output)
Shape: (5000, 20)
Columns: ['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 'DOLocationID', 'payment_type']...

Total columns: 20

Data types:
VendorID                          int32
tpep_pickup_datetime     datetime64[us]
tpep_dropoff_datetime    datetime64[us]
passenger_count                   int64
trip_distance                   float64
RatecodeID                        int64
store_and_fwd_flag               object
PULocationID                      int32
DOLocationID                      int32
payment_type                      int64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
improvement_surcharge           float64
total_amount                    float64
congestion_surcharge            float64


## 4. Silver Layer - Output (Curated Data)

The Silver layer transforms and curates the data for analytics.

### Silver Layer Transformations

The Silver pipeline: 
- Casts pickup/dropoff timestamps to `TimestampType` for consistent timezone handling.
- Standardizes numeric IDs (e.g., `pu_location_id`, `do_location_id`) and trims text fields.
- Enriches trips with pickup/dropoff zone, borough, and service zone metadata.
- Derives KPI-ready fields such as `pickup_week_id`, `trip_minutes`, `revenue_per_mile`, `is_peak_hour`, and `is_night`.
- Deduplicates rides using deterministic hashing to emit a curated fact table.
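
The steps above can be sketched in pandas as follows. This is not the pipeline's code: the function `curate`, the `KEY_COLS` used for hashing, and the choice of SHA-256 are assumptions, and the sample row uses illustrative values. The ratio formulas (`revenue_per_mile = total_amount / trip_distance`, and so on) are inferred from the numbers shown in the Silver output rather than taken from the pipeline source.

```python
import hashlib

import pandas as pd

# Hypothetical key: the fields assumed to identify one ride.
KEY_COLS = ["vendor_id", "pickup_datetime", "dropoff_datetime",
            "pu_location_id", "do_location_id", "total_amount"]


def curate(trips: pd.DataFrame, zones: pd.DataFrame) -> pd.DataFrame:
    out = trips.copy()

    # Enrichment: left-join the zone lookup once for pickup, once for dropoff.
    for side in ("pu", "do"):
        lookup = zones.rename(columns={
            "LocationID": f"{side}_location_id",
            "Borough": f"{side}_borough",
            "Zone": f"{side}_zone",
            "service_zone": f"{side}_service_zone",
        })
        out = out.merge(lookup, on=f"{side}_location_id", how="left")

    # Derived fields. Zero distance or zero duration would yield inf here;
    # a real pipeline would need to guard against that.
    minutes = (out["dropoff_datetime"] - out["pickup_datetime"]).dt.total_seconds() / 60
    out["trip_minutes"] = minutes
    out["revenue_per_mile"] = out["total_amount"] / out["trip_distance"]
    out["revenue_per_minute"] = out["total_amount"] / minutes
    out["trip_mph"] = out["trip_distance"] / (minutes / 60)
    out["is_weekend"] = out["pickup_datetime"].dt.dayofweek >= 5  # Sat=5, Sun=6

    # Deterministic id: the same key fields always hash to the same ride_id,
    # so exact repeats collapse under drop_duplicates.
    keys = out[KEY_COLS].astype(str).apply(lambda r: "|".join(r), axis=1)
    out["ride_id"] = keys.map(lambda k: hashlib.sha256(k.encode("utf-8")).hexdigest())
    return out.drop_duplicates(subset="ride_id").reset_index(drop=True)


zones = pd.DataFrame({
    "LocationID": [4, 7],
    "Borough": ["Manhattan", "Queens"],
    "Zone": ["Alphabet City", "Astoria"],
    "service_zone": ["Yellow Zone", "Boro Zone"],
})
ride = {
    "vendor_id": 1,
    "pickup_datetime": pd.Timestamp("2025-01-05 07:40:29"),
    "dropoff_datetime": pd.Timestamp("2025-01-05 07:44:37"),
    "pu_location_id": 4,
    "do_location_id": 7,
    "trip_distance": 0.6,
    "total_amount": 11.8,
}
# Two identical rides plus one that differs only by vendor.
trips = pd.DataFrame([ride, ride, {**ride, "vendor_id": 2}])

curated = curate(trips, zones)
print(curated[["ride_id", "pu_borough", "trip_minutes", "revenue_per_mile"]])
```

Because the id depends only on the key fields, reprocessing the same input produces the same `ride_id` values, which keeps reruns idempotent.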


In [41]:
# Silver Output: Curated Data
silver_output_path = silver_path / "silver_yellow_tripdata" / "curated"
silver_dataset = ds.dataset(silver_output_path, format="parquet")
silver_total_rows = silver_dataset.count_rows()
silver_output = read_parquet_sample(silver_output_path, limit=5000)

print("✨ SILVER OUTPUT - Curated Yellow Tripdata")
print(f"Sample shape: {silver_output.shape} (showing up to 5000 rows)")
print(f"Total rows on disk: {silver_total_rows}")
print(f"\nColumns: {list(silver_output.columns)}")
print("\nFirst 5 rows:")
silver_output.head()



✨ SILVER OUTPUT - Curated Yellow Tripdata
Sample shape: (5000, 42) (showing up to 5000 rows)
Total rows on disk: 2850733

Columns: ['ride_id', 'vendor_id', 'pickup_datetime', 'dropoff_datetime', 'pickup_date', 'pickup_hour', 'pickup_day_of_week', 'pickup_year', 'pickup_week_of_year', 'pickup_week_start', 'pickup_week_id', 'passenger_count', 'trip_distance', 'trip_minutes', 'trip_hours', 'rate_code_id', 'store_and_fwd_flag', 'pu_location_id', 'pu_borough', 'pu_zone', 'pu_service_zone', 'do_location_id', 'do_borough', 'do_zone', 'do_service_zone', 'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'congestion_surcharge', 'airport_fee', 'cbd_congestion_fee', 'revenue_per_mile', 'revenue_per_minute', 'trip_mph', 'is_peak_hour', 'is_night', 'is_weekend']

First 5 rows:


Unnamed: 0,ride_id,vendor_id,pickup_datetime,dropoff_datetime,pickup_date,pickup_hour,pickup_day_of_week,pickup_year,pickup_week_of_year,pickup_week_start,...,total_amount,congestion_surcharge,airport_fee,cbd_congestion_fee,revenue_per_mile,revenue_per_minute,trip_mph,is_peak_hour,is_night,is_weekend
0,00000d85a65d9501ec215248721d467daa4617e0e12009...,2,2025-01-01 17:49:47,2025-01-01 18:03:10,2025-01-01,22,Wed,2025,1,2024-12-30,...,20.2,0.0,0.0,0.0,6.453674,1.50934,14.032379,False,True,False
1,000022307b1b27bb2d0f971726614d51112730b69ef6e5...,2,2025-01-15 19:02:53,2025-01-15 19:12:09,2025-01-16,0,Thu,2025,3,2025-01-13,...,22.26,2.5,0.0,0.75,10.353488,2.402158,13.920863,False,True,False
2,0000412b95f5fe7ae3d469d2ecfdc0cc35292e5a0330a5...,2,2025-01-09 06:48:31,2025-01-09 07:03:19,2025-01-09,11,Thu,2025,2,2025-01-06,...,22.74,2.5,0.0,0.75,13.068966,1.536486,7.054054,False,False,False
3,00009bc79271470fe59aee1cc044002486ba95df99fb3a...,1,2025-01-05 07:40:29,2025-01-05 07:44:37,2025-01-05,12,Sun,2025,1,2024-12-30,...,11.8,2.5,0.0,0.0,19.666667,2.854839,8.709677,False,False,True
4,0000c70509347540a574f78e918facaf8189fb47ab7096...,1,2025-01-14 17:07:41,2025-01-14 17:16:03,2025-01-14,22,Tue,2025,3,2025-01-13,...,24.15,0.0,1.75,0.0,7.790323,2.886454,22.231076,False,True,False


In [44]:
# Compare Silver Input vs Output (based on samples)
print("🔄 TRANSFORMATION: Silver Input → Silver Output")
print("\nColumn Changes:")
silver_input_cols = set(bronze_output.columns)
silver_output_cols = set(silver_output.columns)

removed_cols = silver_input_cols - silver_output_cols
added_cols = silver_output_cols - silver_input_cols

print(f"\n❌ Removed columns (sample view) ({len(removed_cols)}): {sorted(removed_cols)}")
print(f"\n✅ Added columns (sample view) ({len(added_cols)}): {sorted(added_cols)}")

print(f"\nRow count change: sample {len(bronze_output)} (total {bronze_total_rows}) → sample {len(silver_output)} (total {silver_total_rows})")



🔄 TRANSFORMATION: Silver Input → Silver Output

Column Changes:

❌ Removed columns (sample view) (7): ['Airport_fee', 'DOLocationID', 'PULocationID', 'RatecodeID', 'VendorID', 'tpep_dropoff_datetime', 'tpep_pickup_datetime']

✅ Added columns (sample view) (29): ['airport_fee', 'do_borough', 'do_location_id', 'do_service_zone', 'do_zone', 'dropoff_datetime', 'is_night', 'is_peak_hour', 'is_weekend', 'pickup_date', 'pickup_datetime', 'pickup_day_of_week', 'pickup_hour', 'pickup_week_id', 'pickup_week_of_year', 'pickup_week_start', 'pickup_year', 'pu_borough', 'pu_location_id', 'pu_service_zone', 'pu_zone', 'rate_code_id', 'revenue_per_mile', 'revenue_per_minute', 'ride_id', 'trip_hours', 'trip_minutes', 'trip_mph', 'vendor_id']

Row count change: sample 5000 (total 2850787) → sample 5000 (total 2850733)
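
The totals printed in this notebook are enough to work out how many rows each layer drops. The short calculation below uses only those three printed numbers; attributing the Bronze loss to quality checks and the Silver loss to deduplication is an assumption, since the notebook does not break the drops down by cause.

```python
# Row totals as printed earlier in this notebook.
raw_total = 3_475_226
bronze_total = 2_850_787
silver_total = 2_850_733

# Rows that did not make it from raw into the Bronze "pass" output.
bronze_rejected = raw_total - bronze_total
# Rows removed between Bronze and Silver (presumably duplicates).
silver_dropped = bronze_total - silver_total

pass_rate = bronze_total / raw_total

print(f"Bronze rejected: {bronze_rejected:,} rows ({1 - pass_rate:.2%} of raw)")
print(f"Silver dropped:  {silver_dropped:,} rows")
```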


## Summary: Data Flow

```
RAW DATA (Input)
    ↓
🔵 BRONZE INPUT
    - taxi_zone.csv (265 zones)
    - yellow_tripdata_2025-01.parquet (trip records)
    ↓
[Quality Checks: nulls, domain ranges, counts]
    ↓
🟢 BRONZE OUTPUT
    - Passed QC records (same 20 columns as the raw input)
    - Rejected records stored separately
    ↓
🔷 SILVER INPUT (= Bronze Output)
    - All columns from Bronze
    ↓
[Transformations: cleaning, enrichment, aggregation]
    ↓
✨ SILVER OUTPUT
    - Curated, analytics-ready data
    - Cleaned columns
    - Business logic applied
```

In [45]:
# Weekly KPI snapshot derived from Silver output
weekly_summary_path = silver_path / 'silver_yellow_tripdata' / '_run_summary_weekly'
weekly_summary = pd.read_parquet(weekly_summary_path)
print('📈 WEEKLY KPI SUMMARY (full dataset; small size)')
weekly_summary.sort_values('pickup_week_start')



📈 WEEKLY KPI SUMMARY (full dataset; small size)


Unnamed: 0,pickup_week_id,pickup_week_start,row_count,total_revenue,total_miles,run_ts
1,2024-W01,2024-12-30,21,589.17,76.81,2025-11-05 12:13:22.671622
3,2025-W01,2024-12-30,398693,11491386.57,1428831.05,2025-11-05 12:13:22.671622
0,2025-W02,2025-01-06,639595,17486361.59,2010467.28,2025-11-05 12:13:22.671622
4,2025-W03,2025-01-13,670967,18169573.57,2039605.38,2025-11-05 12:13:22.671622
2,2025-W04,2025-01-20,656789,18768570.32,2028180.97,2025-11-05 12:13:22.671622
5,2025-W05,2025-01-27,484668,13278345.15,1593693.04,2025-11-05 12:13:22.671622
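
The summary lists both `2024-W01` and `2025-W01` with the same `pickup_week_start` of 2024-12-30. One plausible explanation, which is an assumption and not confirmed by the pipeline source, is that `pickup_week_id` combines the calendar year with the ISO week number, so trips picked up on 2024-12-30 or 2024-12-31 get a 2024 prefix. The sketch below compares that construction with one based on the ISO year; the function `week_fields` and its key names are hypothetical.

```python
from datetime import date, timedelta


def week_fields(d: date) -> dict:
    """Return two candidate week ids and the Monday that starts d's ISO week."""
    iso_year, iso_week, iso_weekday = d.isocalendar()
    week_start = d - timedelta(days=iso_weekday - 1)  # ISO weekday: Monday = 1
    return {
        # Calendar year + ISO week: can mislabel days around New Year.
        "calendar_year_id": f"{d.year}-W{iso_week:02d}",
        # ISO year + ISO week: consistent across the year boundary.
        "iso_year_id": f"{iso_year}-W{iso_week:02d}",
        "week_start": week_start,
    }


new_years_eve = week_fields(date(2024, 12, 31))
new_years_day = week_fields(date(2025, 1, 1))
print(new_years_eve)
print(new_years_day)
```

If the ISO-year form were used, the 21 rows under `2024-W01` would fall into `2025-W01` and each week start would map to a single week id.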
