# Medallion Architecture Pipeline (Bronze -> Silver -> Gold)

This notebook builds a three-layer data pipeline with pandas, following the Medallion Architecture pattern.

1. **Bronze Layer**: Raw CSV data ingested as-is, tagged with an ingestion timestamp, and stored as Parquet.
2. **Silver Layer**: Deduplicated, type-converted, and filtered data.
3. **Gold Layer**: Aggregated business-level data.

In [1]:
import pandas as pd
import os
from datetime import datetime

# Define paths
BASE_DIR = '/home/sshyku/UCU/lastproject/big-data-project'
SOURCE_FILE = os.path.join(BASE_DIR, 'car_prices.csv')

BRONZE_DIR = os.path.join(BASE_DIR, 'data/bronze')
SILVER_DIR = os.path.join(BASE_DIR, 'data/silver')
GOLD_DIR = os.path.join(BASE_DIR, 'data/gold')

# Ensure directories exist
os.makedirs(BRONZE_DIR, exist_ok=True)
os.makedirs(SILVER_DIR, exist_ok=True)
os.makedirs(GOLD_DIR, exist_ok=True)

print("Directories ready.")

Directories ready.
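The setup cell hard-codes an absolute path, which ties the notebook to one machine. A minimal sketch of a more portable layout follows, assuming the base directory can come from an environment variable. The name `MEDALLION_BASE_DIR` and the helper `layer_dirs` are hypothetical and not part of the original notebook; a temporary directory is used as the fallback so the example runs anywhere.

```python
import os
import tempfile
from pathlib import Path


def layer_dirs(base_dir):
    """Create and return the bronze/silver/gold directories under base_dir."""
    base = Path(base_dir)
    dirs = {name: base / "data" / name for name in ("bronze", "silver", "gold")}
    for path in dirs.values():
        # parents=True also creates the intermediate "data" directory
        path.mkdir(parents=True, exist_ok=True)
    return dirs


# MEDALLION_BASE_DIR is a hypothetical variable name used only for this sketch.
demo_base = os.environ.get("MEDALLION_BASE_DIR") or tempfile.mkdtemp()
demo_dirs = layer_dirs(demo_base)
print(sorted(demo_dirs))
```

The returned dictionary can stand in for `BRONZE_DIR`, `SILVER_DIR`, and `GOLD_DIR`, since `pathlib.Path` objects are accepted by `os.path.join` and by pandas I/O functions.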


## Bronze Layer
**Goal**: Ingest raw CSV data, add metadata (ingestion time), and save as Parquet for efficient downstream processing.
**Action**: Read `car_prices.csv`, add `_ingestion_timestamp`, save to `data/bronze`.

In [2]:
# --- BRONZE LAYER ---

print(f"Reading raw data from {SOURCE_FILE}...")
# Read the CSV as UTF-8 first; on_bad_lines='skip' silently drops malformed rows so ingestion can proceed
try:
    df_raw = pd.read_csv(SOURCE_FILE, on_bad_lines='skip')
except UnicodeDecodeError as e:
    print(f"Error reading CSV: {e}")
    # Fall back to latin1, a common encoding for legacy exports
    df_raw = pd.read_csv(SOURCE_FILE, encoding='latin1', on_bad_lines='skip')

# Add metadata
df_raw['_ingestion_timestamp'] = datetime.now()

# Save to Bronze
bronze_path = os.path.join(BRONZE_DIR, 'car_prices_bronze.parquet')
df_raw.to_parquet(bronze_path, index=False)

print(f"Bronze layer saved to {bronze_path}")
print(f"Rows: {len(df_raw)}")
df_raw.head()

Reading raw data from /home/sshyku/UCU/lastproject/big-data-project/car_prices.csv...
Bronze layer saved to /home/sshyku/UCU/lastproject/big-data-project/data/bronze/car_prices_bronze.parquet
Rows: 558837


Unnamed: 0,year,make,model,trim,body,transmission,vin,state,condition,odometer,color,interior,seller,mmr,sellingprice,saledate,_ingestion_timestamp
0,2015,Kia,Sorento,LX,SUV,automatic,5xyktca69fg566472,ca,5.0,16639.0,white,black,kia motors america inc,20500.0,21500.0,Tue Dec 16 2014 12:30:00 GMT-0800 (PST),2025-12-16 12:45:06.042809
1,2015,Kia,Sorento,LX,SUV,automatic,5xyktca69fg561319,ca,5.0,9393.0,white,beige,kia motors america inc,20800.0,21500.0,Tue Dec 16 2014 12:30:00 GMT-0800 (PST),2025-12-16 12:45:06.042809
2,2014,BMW,3 Series,328i SULEV,Sedan,automatic,wba3c1c51ek116351,ca,45.0,1331.0,gray,black,financial services remarketing (lease),31900.0,30000.0,Thu Jan 15 2015 04:30:00 GMT-0800 (PST),2025-12-16 12:45:06.042809
3,2015,Volvo,S60,T5,Sedan,automatic,yv1612tb4f1310987,ca,41.0,14282.0,white,black,volvo na rep/world omni,27500.0,27750.0,Thu Jan 29 2015 04:30:00 GMT-0800 (PST),2025-12-16 12:45:06.042809
4,2014,BMW,6 Series Gran Coupe,650i,Sedan,automatic,wba6b2c57ed129731,ca,43.0,2641.0,gray,black,financial services remarketing (lease),66000.0,67000.0,Thu Dec 18 2014 12:30:00 GMT-0800 (PST),2025-12-16 12:45:06.042809


## Silver Layer
**Goal**: Clean and normalize data.
**Actions**:
1. Load from Bronze.
2. Standardize column names (lowercase, replace spaces with underscores).
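3. Convert data types (`saledate` to datetime; `sellingprice`, `mmr`, `odometer`, and `year` to numeric).
4. Drop duplicate rows and rows missing `vin` or `sellingprice`.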
5. Save to `data/silver`.
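The effect of these cleaning steps is easier to see on a few rows. The following is a toy sketch, not the notebook's data: the frame `toy` and its values are made up to show how name standardization, deduplication, numeric coercion, and the null filter interact.

```python
import pandas as pd

# Made-up rows: one exact duplicate, one missing VIN, one non-numeric price
toy = pd.DataFrame({
    "VIN": ["a1", "a1", None, "b2"],
    "Selling Price": ["100", "100", "250", "n/a"],
})

# Lowercase the names and replace spaces with underscores
toy.columns = [c.strip().lower().replace(" ", "_") for c in toy.columns]

# Drop exact duplicates, then coerce prices; unparseable values become NaN
toy = toy.drop_duplicates().copy()
toy["selling_price"] = pd.to_numeric(toy["selling_price"], errors="coerce")

# Keep only rows where both key fields are present
toy_clean = toy.dropna(subset=["vin", "selling_price"])
print(toy_clean)
```

Only the first row survives: the duplicate is removed, the row without a VIN is dropped, and the `n/a` price is coerced to NaN and then filtered out.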

In [3]:
# --- SILVER LAYER ---

# Read from Bronze
df_bronze = pd.read_parquet(bronze_path)

# 1. Standardize Column Names
df_bronze.columns = [c.lower().replace(' ', '_') for c in df_bronze.columns]

# 2. Data Cleaning
# Drop exact duplicate rows; copy so the column assignments below modify an independent frame
df_silver = df_bronze.drop_duplicates().copy()

# Convert 'saledate' to datetime if it exists
if 'saledate' in df_silver.columns:
    # 'saledate' strings carry a UTC offset (e.g. 'GMT-0800 (PST)'); convert to UTC and set unparseable values to NaT
    df_silver['saledate'] = pd.to_datetime(df_silver['saledate'], utc=True, errors='coerce')

# Convert numeric columns that might be strings
numeric_cols = ['sellingprice', 'mmr', 'odometer', 'year']
for col in numeric_cols:
    if col in df_silver.columns:
        df_silver[col] = pd.to_numeric(df_silver[col], errors='coerce')

# Drop rows where critical business keys are missing (e.g., vin, sellingprice)
df_silver = df_silver.dropna(subset=['vin', 'sellingprice'])

# Save to Silver
silver_path = os.path.join(SILVER_DIR, 'car_prices_silver.parquet')
df_silver.to_parquet(silver_path, index=False)

print(f"Silver layer saved to {silver_path}")
print(f"Rows after cleaning: {len(df_silver)}")
df_silver.head()


Silver layer saved to /home/sshyku/UCU/lastproject/big-data-project/data/silver/car_prices_silver.parquet
Rows after cleaning: 558821


Unnamed: 0,year,make,model,trim,body,transmission,vin,state,condition,odometer,color,interior,seller,mmr,sellingprice,saledate,_ingestion_timestamp
0,2015,Kia,Sorento,LX,SUV,automatic,5xyktca69fg566472,ca,5.0,16639.0,white,black,kia motors america inc,20500.0,21500.0,2014-12-16 04:30:00+00:00,2025-12-16 12:45:06.042809
1,2015,Kia,Sorento,LX,SUV,automatic,5xyktca69fg561319,ca,5.0,9393.0,white,beige,kia motors america inc,20800.0,21500.0,2014-12-16 04:30:00+00:00,2025-12-16 12:45:06.042809
2,2014,BMW,3 Series,328i SULEV,Sedan,automatic,wba3c1c51ek116351,ca,45.0,1331.0,gray,black,financial services remarketing (lease),31900.0,30000.0,2015-01-14 20:30:00+00:00,2025-12-16 12:45:06.042809
3,2015,Volvo,S60,T5,Sedan,automatic,yv1612tb4f1310987,ca,41.0,14282.0,white,black,volvo na rep/world omni,27500.0,27750.0,2015-01-28 20:30:00+00:00,2025-12-16 12:45:06.042809
4,2014,BMW,6 Series Gran Coupe,650i,Sedan,automatic,wba6b2c57ed129731,ca,43.0,2641.0,gray,black,financial services remarketing (lease),66000.0,67000.0,2014-12-18 04:30:00+00:00,2025-12-16 12:45:06.042809
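In the output above, a source value of 12:30 at `GMT-0800` appears as 04:30 UTC, where 20:30 UTC would be expected. This suggests the offset sign was inverted during format inference, which matches the POSIX-style reading that dateutil applies to `GMT±N` strings. A minimal sketch of one way around this is to strip the `GMT` prefix and the trailing zone name, then parse with an explicit format. The helper name `parse_saledate` and the regular expression are assumptions for illustration, not part of the original notebook.

```python
import pandas as pd


def parse_saledate(series):
    """Parse strings like 'Tue Dec 16 2014 12:30:00 GMT-0800 (PST)' into UTC timestamps."""
    # Reduce 'GMT-0800 (PST)' to a plain '-0800' offset that %z understands
    cleaned = series.str.replace(r"\s*GMT([+-]\d{4}).*$", r" \1", regex=True)
    return pd.to_datetime(
        cleaned,
        format="%a %b %d %Y %H:%M:%S %z",
        utc=True,
        errors="coerce",  # values that do not match the format become NaT
    )


sample = pd.Series(["Tue Dec 16 2014 12:30:00 GMT-0800 (PST)", "not a date"])
parsed = parse_saledate(sample)
print(parsed)
```

An explicit format also avoids per-element format inference, which tends to be slow on several hundred thousand rows.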


## Gold Layer
**Goal**: Business-level aggregations for reporting.
**Actions**:
1. Load from Silver.
2. Aggregate: average selling price, average MMR, and unit count by make and model.
3. Aggregate: total sales, average price, and unit count by vehicle model year (`year`).
4. Save to `data/gold`.

In [4]:
# --- GOLD LAYER ---

# Read from Silver
df_silver = pd.read_parquet(silver_path)

# Aggregation 1: Average Price by Make and Model
if {'make', 'model', 'sellingprice', 'mmr', 'vin'}.issubset(df_silver.columns):
    gold_make_model_stats = df_silver.groupby(['make', 'model']).agg(
        avg_selling_price=('sellingprice', 'mean'),
        avg_mmr=('mmr', 'mean'),
        count=('vin', 'count')
    ).reset_index()
    
    # Sort by count to see most popular cars
    gold_make_model_stats = gold_make_model_stats.sort_values('count', ascending=False)
    
    # Save Gold Table 1
    gold_path_1 = os.path.join(GOLD_DIR, 'avg_price_by_make_model.parquet')
    gold_make_model_stats.to_parquet(gold_path_1, index=False)
    print(f"Gold Table 1 saved: {gold_path_1}")
    print(gold_make_model_stats.head())

# Aggregation 2: Sales by vehicle model year ('year' is the model year, not the year of sale)
if {'year', 'sellingprice', 'vin'}.issubset(df_silver.columns):
    gold_yearly_trends = df_silver.groupby('year').agg(
        total_sales=('sellingprice', 'sum'),
        avg_price=('sellingprice', 'mean'),
        units_sold=('vin', 'count')
    ).reset_index().sort_values('year')

    # Save Gold Table 2
    gold_path_2 = os.path.join(GOLD_DIR, 'sales_trends_by_year.parquet')
    gold_yearly_trends.to_parquet(gold_path_2, index=False)
    print(f"\nGold Table 2 saved: {gold_path_2}")
    print(gold_yearly_trends.head())

Gold Table 1 saved: /home/sshyku/UCU/lastproject/big-data-project/data/gold/avg_price_by_make_model.parquet
       make   model  avg_selling_price       avg_mmr  count
592  Nissan  Altima       11421.055765  11496.502403  19349
254    Ford   F-150       18832.085020  19050.849506  14479
268    Ford  Fusion       12353.226728  12568.008884  12945
714  Toyota   Camry       11179.498446  11327.303707  12545
245    Ford  Escape       13985.734930  14161.620015  11861

Gold Table 2 saved: /home/sshyku/UCU/lastproject/big-data-project/data/gold/sales_trends_by_year.parquet
   year  total_sales     avg_price  units_sold
0  1982      25500.0  12750.000000           2
1  1983       5250.0   5250.000000           1
2  1984      10800.0   2160.000000           5
3  1985      35200.0   3520.000000          10
4  1986      27925.0   2538.636364          11
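The second Gold table groups by `year`, which in this dataset is the vehicle's model year (the first rows start in 1982, while the sales themselves took place in 2014 and 2015). To trend sales by when they happened, one option is to group by the year taken from `saledate` instead. The sketch below uses a made-up three-row frame; the column name `sale_year` is an assumption for illustration.

```python
import pandas as pd

# Made-up rows: 'year' is the model year, 'saledate' is when the car was sold
toy = pd.DataFrame({
    "vin": ["a", "b", "c"],
    "year": [2012, 2014, 2014],
    "sellingprice": [5000.0, 20000.0, 22000.0],
    "saledate": pd.to_datetime(
        ["2014-12-16 20:30", "2015-01-15 12:30", "2015-02-03 09:00"], utc=True
    ),
})

by_sale_year = (
    toy.assign(sale_year=toy["saledate"].dt.year)  # calendar year of the sale
    .groupby("sale_year")
    .agg(
        total_sales=("sellingprice", "sum"),
        units_sold=("vin", "count"),
    )
    .reset_index()
)
print(by_sale_year)
```

Rows whose `saledate` could not be parsed have no `sale_year` and are left out of the groups, so a count of missing dates is worth reporting alongside such a table.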


In [5]:
# Verify Outputs
print("Bronze Files:", os.listdir(BRONZE_DIR))
print("Silver Files:", os.listdir(SILVER_DIR))
print("Gold Files:", os.listdir(GOLD_DIR))

Bronze Files: ['car_prices_bronze.parquet']
Silver Files: ['car_prices_silver.parquet']
Gold Files: ['avg_price_by_make_model.parquet', 'sales_trends_by_year.parquet']
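Listing the files confirms that each layer wrote something, but not that the layers agree with each other. A rough sketch of a reconciliation check follows, assuming every Silver row has a non-null `year` and `vin` so that the Gold unit counts should add up to the Silver row count. The function name `reconcile` and the toy frames are hypothetical.

```python
import pandas as pd


def reconcile(bronze, silver, gold_by_year):
    """Return simple consistency checks between the three layers."""
    return {
        # Cleaning can only remove rows, never add them
        "silver_not_larger_than_bronze": len(silver) <= len(bronze),
        # Holds only if no Silver row was excluded from the groupby (no null year or vin)
        "gold_units_match_silver_rows": int(gold_by_year["units_sold"].sum()) == len(silver),
    }


# Toy frames standing in for the Parquet files
bronze_toy = pd.DataFrame({"vin": ["a", "b", None], "year": [2014, 2015, 2015]})
silver_toy = bronze_toy.dropna(subset=["vin"])
gold_toy = silver_toy.groupby("year").agg(units_sold=("vin", "count")).reset_index()

checks = reconcile(bronze_toy, silver_toy, gold_toy)
print(checks)
```

In the notebook, the same function could be called with frames loaded via `pd.read_parquet` from `bronze_path`, `silver_path`, and `gold_path_2`.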
