In [4]:
"""
SILVER TRANSFORMATION - COMPLETE VERSION WITH ECONOMIC DATA
Based on your original working code, adds World Bank GDP and ECB FX transformations
Preserves your existing table creation logic
"""

from pyspark.sql import SparkSession
# Wildcard import kept for the existing code; note it shadows Python builtins such
# as min, max, sum, round and abs with their Spark column-function versions.
from pyspark.sql.functions import *
# Raised by spark.table() when a table is missing; used by the "skip if exists" checks.
from pyspark.sql.utils import AnalysisException
# Imported after the wildcard so no name from pyspark.sql.functions can shadow it.
from datetime import datetime

# Reuse the notebook's active Spark session (or create one) and stamp the run start.
spark = SparkSession.builder.getOrCreate()
print("Started: " + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
print("=" * 80)

# ============================================================================
# 1. LOAD ALL BRONZE TABLES
# ============================================================================
print("üì• LOADING ALL BRONZE TABLES...")
print("-" * 80)

# Names of the tables visible to SHOW TABLES (the current database/schema).
all_tables = [t.tableName for t in spark.sql("SHOW TABLES").collect()]
print(f"Found {len(all_tables)} tables in database:")

# Best-effort load of every Bronze-looking table (name contains 'bronze' or
# 'worldbank', case-insensitive). A table is registered in bronze_tables only
# after it has been read and counted successfully.
# NOTE(review): bronze_tables is not read again in this cell; later sections
# re-open their tables via spark.table(...). Kept in case later cells use it.
bronze_tables = {}
for table_name in all_tables:
    lower_name = table_name.lower()
    if 'bronze' in lower_name or 'worldbank' in lower_name:
        try:
            bronze_df = spark.table(table_name)
            row_count = bronze_df.count()
            bronze_tables[table_name] = bronze_df
            print(f"‚úÖ Loaded: {table_name} ({row_count:,} rows)")
        except Exception as e:
            # Keep going on a bad table, but report why instead of swallowing the
            # error (a bare except would also trap KeyboardInterrupt/SystemExit).
            print(f"‚ùå Could not load: {table_name} ({str(e)[:100]})")

# Air-quality source: the first table whose name contains 'bronze_open_air'.
air_quality_tables = [name for name in all_tables if 'bronze_open_air' in name.lower()]
if not air_quality_tables:
    print("‚ùå No air quality table found")
    bronze_air = None
else:
    print(f"\nFound air quality table: {air_quality_tables[0]}")
    bronze_air = spark.table(air_quality_tables[0])
    print(f"‚úÖ Using '{air_quality_tables[0]}': {bronze_air.count():,} rows")

# Taxi trips are read directly from Parquet files at a relative path
# (presumably the lakehouse Files area - note the doubled "Files/Files").
print("\nüöñ LOADING TAXI DATA FROM FILES...")
taxi_path = "Files/Files/raw/nyc_taxi"
try:
    bronze_taxi = spark.read.parquet(taxi_path)
    print(f"‚úÖ Taxi data from {taxi_path}: {bronze_taxi.count():,} rows")
except Exception as err:
    print(f"‚ùå Could not load taxi data: {str(err)[:100]}")
    bronze_taxi = None

# ============================================================================
# 2. AIR QUALITY CLEANING
# ============================================================================
if bronze_air is not None:
    print("\nüå´Ô∏è CLEANING AIR QUALITY DATA...")
    print("-" * 80)

    # Keep only positive readings that have a date and both coordinates.
    silver_air = bronze_air.filter(
        (col("value") > 0) &
        col("measurement_date").isNotNull() &
        col("latitude").isNotNull() &
        col("longitude").isNotNull()
    )

    # Surrogate key per (city, latitude, longitude).
    # NOTE(review): concat() has no separator, so different coordinate pairs can map
    # to the same string (e.g. "1.2"+"34.5" and "1.23"+"4.5"), and a NULL city makes
    # the whole key NULL. concat_ws("|", ...) would fix both but changes existing key
    # values that downstream tables may already use - confirm before switching.
    silver_air = silver_air.withColumn(
        "location_id",
        md5(concat(col("city"), col("latitude").cast("string"), col("longitude").cast("string")))
    )

    # Expose the measurement date under the common "date" column name.
    silver_air = silver_air.withColumn("date", col("measurement_date"))

    print(f"Cleaned air quality: {silver_air.count():,} rows")

    # Write only when the table is missing. Only AnalysisException (what spark.table
    # raises for an unknown table) means "missing"; with a bare except, any other
    # failure would fall through to an overwrite of a table that actually exists.
    try:
        spark.table("silver_air_quality_cleaned")
        print("‚ÑπÔ∏è silver_air_quality_cleaned already exists - skipping")
    except AnalysisException:
        silver_air.write.mode("overwrite").format("delta").option("overwriteSchema", "true").saveAsTable("silver_air_quality_cleaned")
        print("‚úÖ Saved: silver_air_quality_cleaned")

else:
    silver_air = None
    print("\n‚ÑπÔ∏è Skipping air quality (no data)")

# ============================================================================
# 3. TAXI DATA CLEANING
# ============================================================================
if bronze_taxi is not None:
    print("\nüöñ CLEANING TAXI DATA...")
    print("-" * 80)

    # Drop trips with non-positive fare/distance/passengers or missing timestamps.
    silver_taxi = bronze_taxi.filter(
        (col("fare_amount") > 0) &
        (col("trip_distance") > 0) &
        (col("passenger_count") > 0) &
        (col("tpep_pickup_datetime").isNotNull()) &
        (col("tpep_dropoff_datetime").isNotNull())
    )

    # NOTE(review): date_trunc returns a TIMESTAMP truncated to midnight, not a DATE,
    # so this "date" column differs in type from a to_date(...) column. Kept as-is
    # because downstream tables may depend on it - confirm before changing.
    silver_taxi = silver_taxi.withColumn(
        "date",
        date_trunc("day", col("tpep_pickup_datetime"))
    )

    # Duration in minutes from the epoch-second difference. Trips that do not move
    # forward in time are filtered out first, so the speed division is always safe
    # (the former when(duration > 0, ...) guard could never be false).
    silver_taxi = silver_taxi.withColumn(
        "trip_duration_minutes",
        (unix_timestamp(col("tpep_dropoff_datetime")) - unix_timestamp(col("tpep_pickup_datetime"))) / 60
    ).filter(
        col("trip_duration_minutes") > 0
    ).withColumn(
        "speed_mph",
        col("trip_distance") / (col("trip_duration_minutes") / 60)
    )

    print(f"Cleaned taxi trips: {silver_taxi.count():,} rows")
    # Both bounds in one aggregation job instead of two separate scans.
    # min/max here are the pyspark.sql.functions versions (wildcard import).
    # NOTE(review): a recorded run shows a minimum of 2002-12-31, so out-of-range
    # pickup timestamps survive this cleaning; consider bounding the date range.
    date_bounds = silver_taxi.select(min("date"), max("date")).first()
    print(f"Date range: {date_bounds[0]} to {date_bounds[1]}")

    # Write only when the table is missing; any error other than "table not found"
    # propagates instead of triggering an overwrite.
    try:
        spark.table("silver_taxi_cleaned")
        print("‚ÑπÔ∏è silver_taxi_cleaned already exists - skipping")
    except AnalysisException:
        silver_taxi.write.mode("overwrite").format("delta").option("overwriteSchema", "true").saveAsTable("silver_taxi_cleaned")
        print("‚úÖ Saved: silver_taxi_cleaned")

else:
    silver_taxi = None
    print("\n‚ÑπÔ∏è Skipping taxi (no data)")

# ============================================================================
# 4. WORLD BANK GDP TRANSFORMATION
# ============================================================================
print("\nüåç TRANSFORMING WORLD BANK GDP DATA...")
print("-" * 80)

# Use the first table whose name contains 'worldbank' (case-insensitive).
worldbank_tables = [t for t in all_tables if 'worldbank' in t.lower()]
if worldbank_tables:
    bronze_gdp = spark.table(worldbank_tables[0])
    print(f"Found World Bank GDP: {bronze_gdp.count():,} rows")
    print("GDP columns:", bronze_gdp.columns)  # Debug: show column names

    try:
        # Write only when silver_gdp_cleaned is missing.
        spark.table("silver_gdp_cleaned")
        print("‚ÑπÔ∏è silver_gdp_cleaned already exists - skipping")
    except AnalysisException:
        # Column names that contain a dot (country.value, indicator.value) must be
        # back-quoted, otherwise Spark reads the dot as struct-field access.
        # The former selectExpr "fallback" used the very same back-quoted names, so it
        # could only fail the same way; it was removed together with its bare except.
        silver_gdp = bronze_gdp.select(
            col("year").cast("int").alias("year"),
            col("countryiso3code").alias("country_code"),
            col("`country.value`").alias("country_name"),
            col("gdp_value_usd").cast("decimal(20,2)").alias("gdp_usd"),
            col("`indicator.value`").alias("indicator_name"),
            lit("World Bank").alias("source"),
            current_timestamp().alias("processed_at")
        ).filter(
            col("gdp_usd").isNotNull()
        )

        # overwriteSchema matches the other Silver writes in this notebook.
        silver_gdp.write.mode("overwrite").format("delta").option("overwriteSchema", "true").saveAsTable("silver_gdp_cleaned")
        print(f"‚úÖ Created silver_gdp_cleaned: {silver_gdp.count():,} rows")
        silver_gdp.show(5, truncate=False)
else:
    print("‚ÑπÔ∏è No World Bank GDP data found")
# ============================================================================
# 5. ECB FX TRANSFORMATION
# ============================================================================
print("\nüí± TRANSFORMING ECB FX DATA...")
print("-" * 80)

# Use the first table whose name contains 'ecb_fx_usd_eur_bronze' (case-insensitive).
ecb_tables = [t for t in all_tables if 'ecb_fx_usd_eur_bronze' in t.lower()]
if ecb_tables:
    bronze_fx = spark.table(ecb_tables[0])
    print(f"Found ECB FX data: {bronze_fx.count():,} rows")

    # DEBUG: Show column names and types
    print("FX columns:", bronze_fx.columns)
    bronze_fx.printSchema()

    try:
        # Write only when silver_fx_cleaned is missing.
        spark.table("silver_fx_cleaned")
        print("‚ÑπÔ∏è silver_fx_cleaned already exists - skipping")
    except AnalysisException:
        # Filter on the *cast* output columns: a TIME_PERIOD/OBS_VALUE that is present
        # but not castable becomes NULL, which the old source-column filter let through.
        # NOTE(review): ECB reference rates are normally quoted as units of foreign
        # currency per 1 EUR; confirm the from/to labels below match that direction.
        # NOTE(review): dropDuplicates keeps an arbitrary row per rate_date, which is only
        # safe if the Bronze table holds a single series (see the CURRENCY checks below).
        silver_fx = bronze_fx.select(
            col("TIME_PERIOD").cast("date").alias("rate_date"),
            col("OBS_VALUE").cast("double").alias("usd_eur_rate"),
            lit("USD").alias("from_currency"),
            lit("EUR").alias("to_currency"),
            lit("ECB").alias("source"),
            current_timestamp().alias("processed_at")
        ).filter(
            col("rate_date").isNotNull() &
            col("usd_eur_rate").isNotNull()
        ).dropDuplicates(["rate_date"])

        # Show sample data
        print("FX Sample Data:")
        silver_fx.orderBy(col("rate_date").desc()).show(5, truncate=False)

        silver_fx.write.mode("overwrite").format("delta").saveAsTable("silver_fx_cleaned")
        print(f"‚úÖ Created silver_fx_cleaned: {silver_fx.count():,} rows")

        # Diagnostics: list currency codes, if present, to decide whether an explicit
        # USD/EUR filter is needed before the de-duplication above.
        print("\nüìä Checking if we need to filter for USD/EUR specifically...")

        if 'CURRENCY' in bronze_fx.columns:
            print("Unique CURRENCY values:", bronze_fx.select("CURRENCY").distinct().collect())
        if 'CURRENCY_DENOM' in bronze_fx.columns:
            print("Unique CURRENCY_DENOM values:", bronze_fx.select("CURRENCY_DENOM").distinct().collect())

else:
    print("‚ÑπÔ∏è No ECB FX data found")

# ============================================================================
# 6. CREATE DAILY AGGREGATES
# ============================================================================
print("\nüìä CREATING DAILY AGGREGATES...")
print("-" * 80)

# Air Quality Daily: one row per (date, location_id, city) with per-pollutant means
# and the total number of measurements of any parameter.
# NOTE(review): built from the in-memory silver_air, not from the persisted
# silver_air_quality_cleaned table, so the two can diverge if that table predates this run.
if silver_air is not None:
    try:
        spark.table("silver_air_quality_daily")
        print("‚ÑπÔ∏è silver_air_quality_daily already exists - skipping")
    except AnalysisException:
        # Best-effort: a failed aggregate is reported, not raised.
        try:
            air_daily = silver_air.groupBy("date", "location_id", "city").agg(
                avg(when(col("param_name") == "PM2.5", col("value"))).alias("avg_pm25"),
                avg(when(col("param_name") == "PM10", col("value"))).alias("avg_pm10"),
                avg(when(col("param_name") == "NO2", col("value"))).alias("avg_no2"),
                avg(when(col("param_name") == "O3", col("value"))).alias("avg_o3"),
                count("*").alias("total_measurements")
            )
            air_daily.write.mode("overwrite").format("delta").option("overwriteSchema", "true").saveAsTable("silver_air_quality_daily")
            print(f"‚úÖ Air daily aggregates: {air_daily.count():,} rows")
        except Exception as e:
            print(f"‚ùå Could not create air daily aggregates: {str(e)}")

# Taxi Daily: trip counts plus fare/distance/passenger/duration/speed stats per day.
# sum/avg/count are the pyspark.sql.functions versions (wildcard import).
if silver_taxi is not None:
    try:
        spark.table("silver_taxi_daily")
        print("‚ÑπÔ∏è silver_taxi_daily already exists - skipping")
    except AnalysisException:
        # Only a missing table reaches here; other lookup errors now propagate
        # instead of triggering an overwrite.
        taxi_daily = silver_taxi.groupBy("date").agg(
            count("*").alias("total_trips"),
            sum("fare_amount").alias("total_fare"),
            avg("fare_amount").alias("avg_fare"),
            avg("trip_distance").alias("avg_distance"),
            avg("passenger_count").alias("avg_passengers"),
            sum("trip_distance").alias("total_distance"),
            avg("trip_duration_minutes").alias("avg_duration_minutes"),
            avg("speed_mph").alias("avg_speed_mph")
        )
        taxi_daily.write.mode("overwrite").format("delta").option("overwriteSchema", "true").saveAsTable("silver_taxi_daily")
        print(f"‚úÖ Taxi daily aggregates: {taxi_daily.count():,} rows")

# FX Daily Aggregates: mean rate and row count per rate_date from silver_fx_cleaned.
# NOTE(review): silver_fx_cleaned is de-duplicated on rate_date, so record_count is
# always 1 and avg_rate equals the single rate for that day.
try:
    spark.table("silver_fx_daily")
    print("‚ÑπÔ∏è silver_fx_daily already exists - skipping")
except AnalysisException:
    # Distinguish "no FX source table" from a failure while aggregating/writing,
    # which the old catch-all reported as "No FX data".
    try:
        fx_df = spark.table("silver_fx_cleaned")
    except AnalysisException:
        fx_df = None
        print("‚ÑπÔ∏è No FX data for daily aggregates")
    if fx_df is not None:
        try:
            fx_daily = fx_df.groupBy("rate_date").agg(
                avg("usd_eur_rate").alias("avg_rate"),
                count("*").alias("record_count")
            )
            fx_daily.write.mode("overwrite").format("delta").saveAsTable("silver_fx_daily")
            print(f"‚úÖ FX daily aggregates: {fx_daily.count():,} rows")
        except Exception as e:
            print(f"‚ùå Could not create FX daily aggregates: {str(e)}")

# ============================================================================
# 7. VERIFICATION
# ============================================================================
print(f"""
{'='*80}
‚úÖ SILVER TRANSFORMATION COMPLETE!
{'='*80}

üìä ALL SILVER TABLES:
""")

# Row count for every table whose name contains 'silver', sorted by name.
silver_tables_list = [t for t in spark.sql("SHOW TABLES").collect() if 'silver' in t.tableName.lower()]
for table in sorted(silver_tables_list, key=lambda x: x.tableName):
    try:
        df = spark.table(table.tableName)
        print(f"‚Ä¢ {table.tableName:30} {df.count():>10,} rows")
    except Exception:
        print(f"‚Ä¢ {table.tableName:30} {'ERROR':>10}")

# Derive each source's status from the tables that actually exist instead of a
# hard-coded "Ready": any section above can be skipped (missing Bronze data,
# failed aggregate) and the summary must not claim otherwise.
existing_silver = {t.tableName.lower() for t in silver_tables_list}
silver_sources = {
    "air": ("silver_air_quality_cleaned", "silver_air_quality_daily"),
    "taxi": ("silver_taxi_cleaned", "silver_taxi_daily"),
    "gdp": ("silver_gdp_cleaned",),
    "fx": ("silver_fx_cleaned",),
}
silver_status = {
    source: "‚úÖ Ready" if set(names) <= existing_silver else "‚ùå Missing"
    for source, names in silver_sources.items()
}

print(f"""
üìà DATA READY FOR GOLD LAYER:

Your Silver layer contains:
‚Ä¢ Air Quality data (cleaned and daily aggregates) - {silver_status['air']}
‚Ä¢ Taxi Trip data (cleaned and daily aggregates)  - {silver_status['taxi']}
‚Ä¢ World Bank GDP data                            - {silver_status['gdp']}
‚Ä¢ ECB FX Rate data                               - {silver_status['fx']}

Next: Run GOLD transformation to create:
1. Dimension tables from all Silver sources
2. Fact tables with proper relationships
3. Star schema for analytics

üèÅ Completed: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
{'='*80}
""")

StatementMeta(, 4c26237b-2281-4c4c-b822-bf82c0cef037, 6, Finished, Available, Finished)

Started: 2025-12-20 16:05:38
üì• LOADING ALL BRONZE TABLES...
--------------------------------------------------------------------------------
Found 23 tables in database:
‚úÖ Loaded: Bronze_NO2 (500 rows)
‚úÖ Loaded: Bronze_O3 (500 rows)
‚úÖ Loaded: Bronze_PM10 (500 rows)
‚úÖ Loaded: Bronze_PM25 (500 rows)
‚úÖ Loaded: ECB_FX_USD_EUR_Bronze (6,966 rows)
‚úÖ Loaded: WorldBank (50 rows)
‚úÖ Loaded: bronze_open_air (2,000 rows)

Found air quality table: bronze_open_air
‚úÖ Using 'bronze_open_air': 2,000 rows

üöñ LOADING TAXI DATA FROM FILES...
‚úÖ Taxi data from Files/Files/raw/nyc_taxi: 2,964,624 rows

üå´Ô∏è CLEANING AIR QUALITY DATA...
--------------------------------------------------------------------------------
Cleaned air quality: 1,896 rows
‚ÑπÔ∏è silver_air_quality_cleaned already exists - skipping

üöñ CLEANING TAXI DATA...
--------------------------------------------------------------------------------
Cleaned taxi trips: 2,723,750 rows
Date range: 2002-12-31 00:00:00 to 

In [5]:
# Quick inventory: every table in the current schema, up to 100 rows, names untruncated.
tables = spark.sql("SHOW TABLES")
tables.show(n=100, truncate=False)

StatementMeta(, 4c26237b-2281-4c4c-b822-bf82c0cef037, 7, Finished, Available, Finished)

+--------------------------+----------------------------+-----------+
|namespace                 |tableName                   |isTemporary|
+--------------------------+----------------------------+-----------+
|`My workspace`.LH_Core.dbo|Bronze_NO2                  |false      |
|`My workspace`.LH_Core.dbo|Bronze_O3                   |false      |
|`My workspace`.LH_Core.dbo|Bronze_PM10                 |false      |
|`My workspace`.LH_Core.dbo|Bronze_PM25                 |false      |
|`My workspace`.LH_Core.dbo|ECB_FX_USD_EUR_Bronze       |false      |
|`My workspace`.LH_Core.dbo|WorldBank                   |false      |
|`My workspace`.LH_Core.dbo|bronze_open_air             |false      |
|`My workspace`.LH_Core.dbo|gold_bridge_taxi_air_quality|false      |
|`My workspace`.LH_Core.dbo|gold_dim_date               |false      |
|`My workspace`.LH_Core.dbo|gold_dim_fx                 |false      |
|`My workspace`.LH_Core.dbo|gold_dim_location           |false      |
|`My workspace`.LH_C