In [0]:
# Databricks notebook source
# MAGIC %md
# MAGIC # Silver Layer - Data Transformation
# MAGIC Clean and enrich the Bronze data, then write it to the Silver layer.

# COMMAND ----------

# Import configuration
import sys
sys.path.append("/Workspace/Users/yahyasanbati.mail@gmail.com/GREEN-IT-DATA-PLATFORM/X002_Databricks")
from config import *

import pandas as pd
from datetime import datetime
import os

# Run banner: the job title framed by two rules, then the input and output
# paths (BRONZE_FILE / SILVER_FILE are presumably supplied by `config`).
banner_rule = "=" * 70
for banner_line in (banner_rule, "SILVER LAYER - DATA TRANSFORMATION", banner_rule):
    print(banner_line)
print(f"\nSource: {BRONZE_FILE}")
print(f"Destination: {SILVER_FILE}")

# COMMAND ----------

# Load Bronze data
# Read the Bronze Parquet file into the working DataFrame `df_silver`.
# When the file is missing, report it and ask Databricks to end the run.
print("\nLoading Bronze data...")

if not os.path.exists(BRONZE_FILE):
    print(f"ERROR: Bronze file not found at {BRONZE_FILE}")
    dbutils.notebook.exit("Bronze file not found")
else:
    df_silver = pd.read_parquet(BRONZE_FILE)
    print(f"Loaded {len(df_silver):,} rows from Bronze")

# COMMAND ----------

# Check data types
# List every column with the dtype it has right after loading, before any
# of the conversions below are applied.
print("\nOriginal Data Types:")
for column_name, column_dtype in zip(df_silver.columns, df_silver.dtypes):
    print(f"  {column_name}: {column_dtype}")

# COMMAND ----------

# Data Cleaning - Text columns
# Normalise the categorical text columns to lower-case, whitespace-trimmed
# strings so that later exact-match comparisons (such as the renewable-source
# lookup) are not defeated by casing or padding differences.
print("\nCleaning text columns...")

text_columns = [
    'WORKLOAD_TYPE',
    'ENERGY_SOURCE',
    'SECURITY_LEVEL',
    'WORKLOAD_SCENARIO',
    'SCENARIO_STRATEGY'
]

for col in text_columns:
    if col in df_silver.columns:
        raw_values = df_silver[col]
        cleaned_values = raw_values.astype(str).str.lower().str.strip()
        # Keep missing values missing. A bare astype(str) turns None/NaN into
        # the literal strings "none"/"nan", which would hide them from the
        # NULL check in the data quality report and store fake categories.
        df_silver[col] = cleaned_values.where(raw_values.notna())
        print(f"  Cleaned: {col}")

# COMMAND ----------

# Data Type Conversion - Numeric columns
# Cast the measure columns to numeric dtypes. errors='coerce' makes pandas
# substitute NaN for any value it cannot parse instead of raising.
print("\nConverting numeric columns...")

numeric_columns = [
    'ENERGY_CONSUMPTION_KWH',
    'CARBON_EMISSIONS_KGCO2',
    'OPERATIONAL_COST_USD'
]

for col in numeric_columns:
    # Columns absent from this dataset are skipped silently.
    if col not in df_silver.columns:
        continue
    df_silver[col] = pd.to_numeric(df_silver[col], errors='coerce')
    print(f"  Converted: {col}")

# COMMAND ----------

# FIX DATETIME COLUMNS
# Parse the timestamp columns into pandas datetimes. errors='coerce' turns
# unparseable values into NaT instead of raising.
print("\nFixing datetime columns...")

datetime_columns = ['LOAD_DATE', 'bronze_ingestion_timestamp']

# Only touch the columns this dataset actually contains.
present_datetime_columns = [name for name in datetime_columns if name in df_silver.columns]
for col in present_datetime_columns:
    df_silver[col] = pd.to_datetime(df_silver[col], errors='coerce')
    print(f"  Converted: {col}")

# COMMAND ----------

# Business Rules - Calculate derived metrics
# Adds three derived columns when their inputs are present:
#   CARBON_INTENSITY = CARBON_EMISSIONS_KGCO2 / ENERGY_CONSUMPTION_KWH
#   IS_RENEWABLE     = 1 when ENERGY_SOURCE is in the renewable list, else 0
#   COST_PER_KWH     = OPERATIONAL_COST_USD / ENERGY_CONSUMPTION_KWH
print("\nApplying business rules...")

has_energy = 'ENERGY_CONSUMPTION_KWH' in df_silver.columns
if has_energy:
    # A per-kWh ratio is undefined when no energy was consumed. Substituting
    # 1 for a zero denominator would report the raw emissions/cost total as
    # if it were a ratio, so zero-consumption rows get NaN instead and show
    # up in the NULL check of the data quality report.
    energy_kwh = df_silver['ENERGY_CONSUMPTION_KWH']
    energy_kwh_nonzero = energy_kwh.where(energy_kwh != 0)

# Carbon Intensity
if has_energy and 'CARBON_EMISSIONS_KGCO2' in df_silver.columns:
    df_silver['CARBON_INTENSITY'] = df_silver['CARBON_EMISSIONS_KGCO2'] / energy_kwh_nonzero
    print("  Calculated: CARBON_INTENSITY")

# Renewable Energy Flag
if 'ENERGY_SOURCE' in df_silver.columns:
    # Values are compared exactly, so they must match the lower-cased,
    # trimmed form produced by the text-cleaning step.
    renewable_sources = ['solar', 'wind', 'renewable boost']
    df_silver['IS_RENEWABLE'] = df_silver['ENERGY_SOURCE'].isin(renewable_sources).astype(int)
    print("  Created: IS_RENEWABLE")

# Cost per kWh
if has_energy and 'OPERATIONAL_COST_USD' in df_silver.columns:
    df_silver['COST_PER_KWH'] = df_silver['OPERATIONAL_COST_USD'] / energy_kwh_nonzero
    print("  Calculated: COST_PER_KWH")

# COMMAND ----------

# Add Silver metadata
# Stamp every row with the processing time (taken once, so all rows share
# the same value) and a data-quality score, which is a constant 1.0 here.
print("\nAdding Silver metadata...")

silver_metadata = {
    'silver_processed_at': datetime.now(),
    'data_quality_score': 1.0,
}
for metadata_column, metadata_value in silver_metadata.items():
    df_silver[metadata_column] = metadata_value

print("Metadata columns added")

# COMMAND ----------

# Verify data types before saving
# Print each column's dtype after all cleaning, conversion and enrichment.
print("\nFinal Data Types:")
final_dtype_lines = [
    f"  {column_name}: {column_dtype}"
    for column_name, column_dtype in df_silver.dtypes.items()
]
for dtype_line in final_dtype_lines:
    print(dtype_line)

# COMMAND ----------

# Data Quality Report
# Print the DataFrame's shape, then list each column that contains nulls
# with its null count and share of rows.
report_rule = "=" * 70
total_rows = len(df_silver)

print("\nSilver Data Quality Report:")
print(report_rule)

print(f"\nTotal Rows: {total_rows:,}")
print(f"Total Columns: {len(df_silver.columns)}")

# NULL check
null_counts = df_silver.isnull().sum()
cols_with_nulls = null_counts[null_counts > 0]

if cols_with_nulls.empty:
    print("\nNo NULL values")
else:
    print("\nColumns with NULL values:")
    for null_column, null_count in cols_with_nulls.items():
        null_pct = (null_count / total_rows) * 100
        print(f"  - {null_column}: {null_count:,} ({null_pct:.2f}%)")

print(report_rule)

# COMMAND ----------

# Save to Silver
# Write the Silver DataFrame to SILVER_FILE as Parquet (pyarrow engine).
# If the write fails, each column is serialised on its own to show which
# ones cannot be written, and the original error is then re-raised so the
# notebook run fails instead of going on to report a completed Silver load.
print("\nSaving to Silver layer...")

os.makedirs(SILVER_PATH, exist_ok=True)

try:
    df_silver.to_parquet(SILVER_FILE, index=False, engine='pyarrow')
except Exception as e:
    print(f"ERROR saving Parquet: {e}")
    print("\nTrying to identify problematic columns...")

    import io  # local import: only needed on this diagnostic path

    for col in df_silver.columns:
        try:
            # Serialise into memory so the probe leaves no stray file behind
            # and is not affected by filesystem problems.
            df_silver[[col]].to_parquet(io.BytesIO(), index=False, engine='pyarrow')
        except Exception as col_error:
            print(f"  Problem with column: {col} - {col_error}")

    # Nothing was saved: stop here rather than let later cells run as if
    # the Silver file existed.
    raise
else:
    # Only reached when the write succeeded.
    file_size = os.path.getsize(SILVER_FILE) / (1024 * 1024)
    print(f"Saved to: {SILVER_FILE}")
    print(f"File size: {file_size:.2f} MB")

# COMMAND ----------

# Display Silver data
# Hand the finished DataFrame to the notebook's display() helper under a
# short heading.
silver_preview_heading = "\nFinal Silver Data:"
print(silver_preview_heading)
display(df_silver)

# COMMAND ----------

# Silver validation summary
# Closing recap: row and column counts, the saved file's size when the save
# step recorded one, and the output location.
summary_rule = "=" * 70

print("\n" + summary_rule)
print("SILVER LAYER VALIDATION")
print(summary_rule)

print(f"\nDataset Statistics:")
print(f"  Total Rows: {len(df_silver):,}")
print(f"  Total Columns: {len(df_silver.columns)}")
# file_size only exists if the save step got as far as measuring the file.
if 'file_size' in locals():
    print(f"  File Size: {file_size:.2f} MB")
print(f"  Location: {SILVER_FILE}")

print("\n" + summary_rule)
print("SILVER LAYER COMPLETE")
print(summary_rule)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Silver Layer Complete
# MAGIC 
# MAGIC Next: Run `03_gold_star_schema` notebook