In [None]:
# Bronze to Silver Layer (Pandas + Timezone-Safe + Change Counts + Custom Fill for synthetic_order_lifecycle)

import os
import pandas as pd
import numpy as np
from datetime import datetime
import warnings
warnings.filterwarnings("ignore")

from google.cloud import bigquery
from google.oauth2 import service_account
from google.api_core.exceptions import Conflict, NotFound

# ----------------------------
# 1. Setup & Authentication
# ----------------------------
# Identifiers of the GCP project and the Bronze (source) / Silver (target) datasets.
PROJECT_ID = "even-blueprint-441418-p2"
BRONZE_DATASET_ID = "ecommerce_Bronze_Layer"
SILVER_DATASET_ID = "ecommerce_Silver_Layer"

# Local service-account JSON key used to authenticate the BigQuery client.
KEY_PATH = r"C:\Users\Vishnu Vardhan\OneDrive\Desktop\Bigquery_Ecommerce\even-blueprint-441418-p2-043f8a9d855b.json"

# Build credentials from the key file and open a client bound to PROJECT_ID.
credentials = service_account.Credentials.from_service_account_file(KEY_PATH)
client = bigquery.Client(project=PROJECT_ID, credentials=credentials)
print("BigQuery client initialized successfully!")

# ----------------------------
# 2. Create Silver Dataset
# ----------------------------
# Create the Silver dataset in the "US" location. If BigQuery reports a
# Conflict (dataset already exists), the existing dataset is reused.
dataset_ref = bigquery.Dataset(f"{PROJECT_ID}.{SILVER_DATASET_ID}")
dataset_ref.location = "US"

try:
    client.create_dataset(dataset_ref)
except Conflict:
    print(f"Dataset '{SILVER_DATASET_ID}' already exists.")
else:
    print(f"Dataset '{SILVER_DATASET_ID}' created.")

# ----------------------------
# 3. Define Tables & Duration Logic
# ----------------------------
# Bronze tables copied into Silver (same table names), processed in this order.
TABLES = [
    "dim_customer",
    "dim_order",
    "dim_payments",
    "dim_sellers",
    "dim_products",
    "synthetic_order_lifecycle",
]

# Timestamp columns used by the duration step of the transform:
#   table name -> (start_col, end_col or None)
# Keys must match entries in TABLES exactly, otherwise the entry is never used.
# NOTE: with the "dim_order" entry active, rows whose purchase or
# delivered-customer timestamp is NULL/unparseable are dropped from Silver.
DATE_DURATION_CONFIG = {
    "dim_order": ("order_purchase_timestamp", "order_delivered_customer_date"),
    "synthetic_order_lifecycle": ("event_timestamp", None),
}

# ----------------------------
# 4. Safe Datetime Conversion
# ----------------------------
def to_utc_datetime(series: pd.Series, col_name: str) -> pd.Series:
    """Return *series* as timezone-aware UTC datetimes.

    - datetime64 columns: naive values are localized as UTC, tz-aware values
      are converted to UTC.
    - every other dtype (object, strings, numbers, ...) is parsed with
      ``pd.to_datetime(..., errors='coerce', utc=True)``; unparseable values
      become NaT.

    Args:
        series: Column to convert.
        col_name: Column name, used only in the warning message.

    Returns:
        The converted series. A warning is printed when every value is null
        (this includes an empty series).
    """
    if pd.api.types.is_datetime64_any_dtype(series):
        accessor = series.dt
        converted = (
            accessor.tz_localize('UTC') if accessor.tz is None
            else accessor.tz_convert('UTC')
        )
    else:
        converted = pd.to_datetime(series, errors='coerce', utc=True)

    if converted.isnull().all():
        print(f"   Warning: '{col_name}' has all NULL/invalid dates")
    return converted

# ----------------------------
# 5. Pandas Transformation Function (Custom Fill Logic)
# ----------------------------
def transform_with_pandas(
    df: pd.DataFrame,
    table_name: str,
    duration_config: "dict[str, tuple[str, str | None]] | None" = None,
) -> pd.DataFrame:
    """Clean one Bronze table for the Silver layer, logging every change.

    Steps, in order:
      1. Drop exact duplicate rows (first occurrence kept).
      2. Fill NULLs in numeric columns: 0 for ``synthetic_order_lifecycle``,
         otherwise the column median. A column that is entirely NULL has no
         median and is left as-is (a message is printed).
      3. Fill NULLs in object columns: the string ``"0"`` when the column name
         contains "postal" or "zip", ``"Unknown"`` otherwise. Columns listed in
         this table's duration config are skipped: step 5 parses them as
         timestamps and drops rows where they are missing.
      4. Stamp every row with a UTC ``load_timestamp``.
      5. If the table has a duration config entry and its start column exists,
         convert the configured columns to UTC, drop rows where they are
         NULL/unparseable, and add ``duration_days`` (start and end columns) or
         ``days_since_event`` (start column only).

    Args:
        df: Raw Bronze rows. Not modified; a cleaned copy is returned.
        table_name: Table name; selects the numeric fill strategy and the
            duration config entry.
        duration_config: Mapping ``table -> (start_col, end_col or None)``.
            Defaults to the module-level ``DATE_DURATION_CONFIG``.

    Returns:
        The cleaned DataFrame.
    """
    config = DATE_DURATION_CONFIG if duration_config is None else duration_config

    print(f"   Transforming {table_name}... (initial rows: {len(df):,})")
    initial_rows = len(df)
    total_changes = 0

    # 1. Remove duplicates (one duplicated() pass; .copy() so the caller's
    #    frame is never mutated by the column assignments below).
    dup_mask = df.duplicated()
    dup_count = int(dup_mask.sum())
    df = df.loc[~dup_mask].copy()
    total_changes += dup_count
    print(f"   -> Removed {dup_count:,} duplicate rows (now {len(df):,} rows)")

    # 2. Fill numeric NULLs -> 0 for synthetic_order_lifecycle, else median.
    #    Results are assigned back to df[col]: the chained
    #    df[col].fillna(..., inplace=True) form is a silent no-op under
    #    pandas Copy-on-Write.
    is_synthetic = table_name == "synthetic_order_lifecycle"
    num_fills_total = 0
    for col in df.select_dtypes(include=[np.number]).columns:
        null_count = int(df[col].isnull().sum())
        if null_count == 0:
            continue
        if is_synthetic:
            df[col] = df[col].fillna(0)
            print(f"   -> [SYNTHETIC] Filled {null_count:,} NULLs in '{col}' with 0")
        else:
            median_val = df[col].median()
            if pd.isna(median_val):
                # All values are NULL: there is no median, so nothing is filled.
                print(f"   -> Skipped '{col}': all values are NULL, no median available")
                continue
            df[col] = df[col].fillna(median_val)
            print(f"   -> Filled {null_count:,} NULLs in '{col}' with median: {median_val:.2f}")
        num_fills_total += null_count

    if num_fills_total > 0:
        fill_type = "0 (synthetic)" if is_synthetic else "median"
        print(f"   -> Total numeric NULLs filled ({fill_type}): {num_fills_total:,}")
        total_changes += num_fills_total

    # 3. Fill string NULLs. Configured timestamp columns are skipped; they are
    #    parsed and validated in step 5 instead of being padded with 'Unknown'.
    date_cols = {c for c in config.get(table_name, ()) if c}
    str_fills_total = 0
    zip_fills = 0
    for col in df.select_dtypes(include=['object']).columns:
        if col in date_cols:
            continue
        null_count = int(df[col].isnull().sum())
        if null_count == 0:
            continue
        if 'postal' in col.lower() or 'zip' in col.lower():
            # String "0", not int 0: an int mixed into a string column makes
            # the pyarrow conversion used for the BigQuery load fail.
            df[col] = df[col].fillna("0")
            zip_fills += null_count
            print(f"   -> Filled {null_count:,} NULLs in '{col}' with '0'")
        else:
            df[col] = df[col].fillna('Unknown')
            str_fills_total += null_count
            print(f"   -> Filled {null_count:,} NULLs in '{col}' with 'Unknown'")

    total_changes += str_fills_total + zip_fills
    if str_fills_total > 0:
        print(f"   -> Total string NULLs filled: {str_fills_total:,}")
    if zip_fills > 0:
        print(f"   -> Total zip/postal NULLs filled: {zip_fills:,}")

    # 4. Add load timestamp
    df['load_timestamp'] = pd.Timestamp.now(tz='UTC')
    print(f"   -> Added 'load_timestamp' to all {len(df):,} rows")

    # 5. Duration calculation
    if table_name in config:
        start_col, end_col = config[table_name]

        if start_col in df.columns:
            print(f"   -> Converting '{start_col}' to UTC")
            df[start_col] = to_utc_datetime(df[start_col], start_col)

            has_end = bool(end_col) and end_col in df.columns
            if has_end:
                print(f"   -> Converting '{end_col}' to UTC")
                df[end_col] = to_utc_datetime(df[end_col], end_col)
                mask = df[start_col].notna() & df[end_col].notna()
            else:
                mask = df[start_col].notna()

            date_drops = int((~mask).sum())
            df = df.loc[mask].copy()
            total_changes += date_drops
            if date_drops > 0:
                reason = "invalid dates" if has_end else f"invalid '{start_col}'"
                print(f"   -> Dropped {date_drops:,} rows with {reason}")

            if has_end:
                df['duration_days'] = (df[end_col] - df[start_col]).dt.days
                print(f"   -> Added 'duration_days' to {len(df):,} valid rows")
            else:
                now_utc = pd.Timestamp.now(tz='UTC')
                df['days_since_event'] = (now_utc - df[start_col]).dt.days
                print(f"   -> Added 'days_since_event' to {len(df):,} valid rows")

    # Final Summary
    print(f"   Summary for {table_name}: {total_changes:,} total changes applied")
    print(f"   -> Final rows: {len(df):,} (from {initial_rows:,})")

    return df

# ----------------------------
# 6. Main ETL Loop
# ----------------------------
# Tables that errored (query/transform/load) and tables skipped because they are
# missing or empty in Bronze. They are reported after the loop so that a partial
# failure is not announced as a clean run.
failed_tables = []
skipped_tables = []

for table_name in TABLES:
    print(f"\n{'='*70}")
    print(f"PROCESSING: {table_name.upper()}")
    print(f"{'='*70}")

    # --- Extract ---
    bronze_table = f"{PROJECT_ID}.{BRONZE_DATASET_ID}.{table_name}"
    try:
        query = f"SELECT * FROM `{bronze_table}`"
        df = client.query(query).to_dataframe()
        print(f"   Extracted {len(df):,} rows from Bronze")
    except NotFound:
        print(f"   Table `{table_name}` not found in Bronze. Skipping.")
        skipped_tables.append(table_name)
        continue
    except Exception as e:
        print(f"   Error querying {table_name}: {e}")
        failed_tables.append(table_name)
        continue

    if df.empty:
        print("   DataFrame is empty. Skipping.")
        skipped_tables.append(table_name)
        continue

    # --- Transform ---
    try:
        df_clean = transform_with_pandas(df.copy(), table_name)
    except Exception as e:
        print(f"   Transformation failed for {table_name}: {e}")
        failed_tables.append(table_name)
        continue

    # --- Load ---
    silver_table = f"{PROJECT_ID}.{SILVER_DATASET_ID}.{table_name}"

    # Create the table on first run; Conflict means it already exists, which is
    # fine because the WRITE_TRUNCATE load below replaces its contents.
    try:
        table = bigquery.Table(silver_table)
        client.create_table(table)
        print(f"   Silver table `{table_name}` created.")
    except Conflict:
        pass

    job_config = bigquery.LoadJobConfig(
        autodetect=True,
        write_disposition="WRITE_TRUNCATE",
    )

    try:
        job = client.load_table_from_dataframe(df_clean, silver_table, job_config=job_config)
        job.result()  # block until the load job finishes (raises on failure)
        final_rows = client.get_table(silver_table).num_rows
        print(f"   Successfully loaded {final_rows:,} rows into Silver `{table_name}`")
    except Exception as e:
        print(f"   Failed to load {table_name}: {e}")
        failed_tables.append(table_name)

if failed_tables:
    print(f"\nETL finished with errors. Failed tables: {', '.join(failed_tables)}")
else:
    print("\nALL DONE! Bronze to Silver ETL completed.")
if skipped_tables:
    print(f"Skipped tables (missing or empty in Bronze): {', '.join(skipped_tables)}")
print("Note: 'synthetic_order_lifecycle' numeric NULLs filled with 0, others with median.")

BigQuery client initialized successfully!
Dataset 'ecommerce_Silver_Layer' already exists.

PROCESSING: DIM_CUSTOMER
   Extracted 99,441 rows from Bronze
   Transforming dim_customer... (initial rows: 99,441)
   → Removed 0 duplicate rows (now 99,441 rows)
   → Filled 278 NULLs in 'Latitude' with median: -22.93
   → Filled 278 NULLs in 'Longitude' with median: -46.63
   → Total numeric NULLs filled (median): 556
   → Added 'load_timestamp' to all 99,441 rows
   📊 Summary for dim_customer: 556 total changes applied
   → Final rows: 99,441 (from 99,441)
   Successfully loaded 99,441 rows into Silver `dim_customer`

PROCESSING: DIM_ORDER
   Extracted 99,441 rows from Bronze
   Transforming dim_order... (initial rows: 99,441)
   → Removed 0 duplicate rows (now 99,441 rows)
   → Filled 160 NULLs in 'order_approved_at' with 'Unknown'
   → Filled 1,783 NULLs in 'order_delivered_carrier_date' with 'Unknown'
   â†’ Filled 2,965 NULLs in 'order_delivered_customer_date' with 