In [1]:
# Import required libraries and initialize Spark session with Delta Lake
import os
import warnings
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from delta import *

# Suppress Python-side warnings for the rest of the session.
warnings.filterwarnings("ignore")

print("=== V2 GOLD CLIMATE EMISSIONS PROCESSOR ===")

# Session settings, applied to the builder in the order listed: Delta Lake SQL
# extension and catalog, driver/executor memory, adaptive query execution, and
# the Delta optimize-write / auto-compact switches.
_session_settings = (
    ("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension"),
    ("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog"),
    ("spark.driver.memory", "4g"),
    ("spark.executor.memory", "4g"),
    ("spark.sql.adaptive.enabled", "true"),
    ("spark.sql.adaptive.coalescePartitions.enabled", "true"),
    ("spark.databricks.delta.optimizeWrite.enabled", "true"),
    ("spark.databricks.delta.autoCompact.enabled", "true"),
)

# Initialize Spark session with Delta Lake support
builder = SparkSession.builder.appName("v2-Gold-Climate-Emissions")
for _setting_name, _setting_value in _session_settings:
    builder = builder.config(_setting_name, _setting_value)

# The builder is passed through configure_spark_with_delta_pip before the
# session is created; driver-side logging is then limited to errors.
spark = configure_spark_with_delta_pip(builder).getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

# Startup banner
for _banner_line in (
    f"Spark Version: {spark.version}",
    "Delta Lake support enabled",
    f"Processing timestamp: {datetime.now()}",
):
    print(_banner_line)

=== V2 GOLD CLIMATE EMISSIONS PROCESSOR ===


25/08/28 17:53:12 WARN Utils: Your hostname, 3rnese resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/08/28 17:53:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/ernese/miniconda3/envs/SO/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/ernese/.ivy2/cache
The jars for the packages stored in: /home/ernese/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-c8b41067-6f3b-4bfe-a53e-e36db4dc9b07;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.4.0 in central
	found io.delta#delta-storage;2.4.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 148ms :: artifacts dl 6ms
	:: modules in use:
	io.delta#delta-core_2.12;2.4.0 from central in [default]
	io.delta#delta-storage;2.4.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0   |

Spark Version: 3.4.0
Delta Lake support enabled
Processing timestamp: 2025-08-28 17:53:14.443104


In [2]:
# Configuration: layer locations and the run timestamp.
# The defaults are the original local paths. Export CLIMATE_SILVER_PATH and/or
# CLIMATE_GOLD_PATH to point the notebook at a different location without
# editing this cell (an unset or empty variable falls back to the default).
SILVER_PATH = (
    os.environ.get("CLIMATE_SILVER_PATH")
    or "/home/ernese/miniconda3/envs/SO/New_SO/final-spark-silver"
)
GOLD_PATH = (
    os.environ.get("CLIMATE_GOLD_PATH")
    or "/home/ernese/miniconda3/envs/SO/New_SO/final-spark-gold"
)
# Captured once here and reused by later cells (e.g. created_at / updated_at).
PROCESSING_TIMESTAMP = datetime.now()

# Create gold directory if it doesn't exist
os.makedirs(GOLD_PATH, exist_ok=True)

print("=== CONFIGURATION ===")
print(f"Silver Path: {SILVER_PATH}")
print(f"Gold Path: {GOLD_PATH}")
print(f"Processing Time: {PROCESSING_TIMESTAMP}")

# Silver-layer input tables
CLIMATE_TABLE_PATH = os.path.join(SILVER_PATH, "fact_climate_weather_v2")
LOCATION_TABLE_PATH = os.path.join(SILVER_PATH, "dim_location_v2")

print(f"Climate data: {CLIMATE_TABLE_PATH}")
print(f"Location data: {LOCATION_TABLE_PATH}")

# Verify paths exist. This is report-only: a missing table is printed here and
# surfaces as a read error when the loading cell runs.
for _table_label, _table_path in (
    ("Climate table", CLIMATE_TABLE_PATH),
    ("Location dimension", LOCATION_TABLE_PATH),
):
    if os.path.exists(_table_path):
        print(f"✓ {_table_label} found")
    else:
        print(f"✗ {_table_label} NOT found")

=== CONFIGURATION ===
Silver Path: /home/ernese/miniconda3/envs/SO/New_SO/final-spark-silver
Gold Path: /home/ernese/miniconda3/envs/SO/New_SO/final-spark-gold
Processing Time: 2025-08-28 17:53:23.151883
Climate data: /home/ernese/miniconda3/envs/SO/New_SO/final-spark-silver/fact_climate_weather_v2
Location data: /home/ernese/miniconda3/envs/SO/New_SO/final-spark-silver/dim_location_v2
✓ Climate table found
✓ Location dimension found


In [3]:
def load_climate_data_with_locations():
    """Load the silver climate facts and enrich them with the location dimension.

    Reads the Delta tables at ``CLIMATE_TABLE_PATH`` and ``LOCATION_TABLE_PATH``,
    adds any missing location columns as placeholders, left-joins the facts to
    the dimension on ``location_id``, caches the joined frame and prints a
    short quality report.

    Returns:
        tuple: ``(climate_with_locations, location_dim, joined_count, join_quality)``.
        ``join_quality`` is the percentage of joined rows whose ``location_name``
        is not null. If the location dimension is empty, a warning is printed
        and ``(None, None, 0, 0)`` is returned instead.

    Raises:
        ValueError: if the climate table contains no rows.
        Exception: any error raised while reading either table is printed and
            re-raised unchanged.
    """

    print("=== DATA LOADING ===")

    # Load climate fact data
    try:
        climate_data = spark.read.format("delta").load(CLIMATE_TABLE_PATH)
        climate_count = climate_data.count()
        print(f"Climate data loaded: {climate_count:,} records")

        if climate_count == 0:
            raise ValueError("No climate data available")

        print("Climate data schema:")
        climate_data.printSchema()

    except Exception as e:
        print(f"ERROR: Cannot load climate data: {e}")
        raise

    # Load location dimension
    try:
        location_dim = spark.read.format("delta").load(LOCATION_TABLE_PATH)
        location_count = location_dim.count()
        print(f"Location dimension loaded: {location_count:,} records")

        if location_count == 0:
            print("WARNING: No location dimension data")
            return None, None, 0, 0

        print("Location dimension schema:")
        location_dim.printSchema()

        # Show location distribution
        print("Location types distribution:")
        location_dim.groupBy("location_type").agg(
            count("*").alias("count"),
            sum(when(col("latitude").isNotNull(), 1).otherwise(0)).alias("with_coordinates")
        ).orderBy(desc("count")).show()

    except Exception as e:
        print(f"ERROR: Cannot load location dimension: {e}")
        raise

    # Join climate data with location dimension
    print("Creating enriched dataset...")

    # Required location columns for analysis
    required_location_cols = [
        "location_id", "location_name", "display_name", "full_name",
        "location_type", "latitude", "longitude", "population",
        "region_code", "province_code", "iso_code"
    ]

    # Missing numeric columns become typed nulls; any other missing column is
    # filled with the literal "unknown" so the select below cannot fail.
    typed_null_defaults = {
        "latitude": DoubleType(),
        "longitude": DoubleType(),
        "population": LongType(),
    }
    for col_name in required_location_cols:
        if col_name in location_dim.columns:
            continue
        if col_name in typed_null_defaults:
            placeholder = lit(None).cast(typed_null_defaults[col_name])
        else:
            placeholder = lit("unknown")
        location_dim = location_dim.withColumn(col_name, placeholder)

    # Perform join (left join: every fact row is kept even without a match)
    climate_with_locations = climate_data.join(
        location_dim.select(*required_location_cols),
        "location_id",
        "left"
    )

    # Cache the joined dataset; the count below materialises it
    climate_with_locations.cache()
    joined_count = climate_with_locations.count()
    print(f"Enriched dataset: {joined_count:,} records")

    # A left join can only add rows, so a changed count means some location_id
    # matched more than one dimension row and fact rows were multiplied.
    if joined_count != climate_count:
        print(
            f"WARNING: Join changed the row count ({climate_count:,} -> {joined_count:,}); "
            "location_id may not be unique in the location dimension"
        )

    # Validate join quality
    null_locations = climate_with_locations.filter(col("location_name").isNull()).count()
    join_quality = ((joined_count - null_locations) / joined_count * 100) if joined_count > 0 else 0
    print(f"Location join quality: {join_quality:.1f}% ({null_locations:,} records without location data)")

    if join_quality < 95:
        print("WARNING: Low location join quality may affect mapping")

    # Show sample joined data
    print("Sample enriched data:")
    climate_with_locations.select(
        "measurement_date", "location_name", "metric_code", "measurement_value",
        "latitude", "longitude", "region_code", "location_type"
    ).show(5, truncate=False)

    return climate_with_locations, location_dim, joined_count, join_quality

# Execute the function
climate_data, location_dim, total_records, location_quality = load_climate_data_with_locations()

# The loader returns None frames when the location dimension is empty. Stop
# here with a clear message instead of letting a later cell fail with an
# unrelated AttributeError on None.
if climate_data is None or location_dim is None:
    raise RuntimeError("Location dimension is empty; cannot build gold-layer aggregations")

=== DATA LOADING ===


                                                                                

Climate data loaded: 12,880,169 records
Climate data schema:
root
 |-- climate_weather_id: integer (nullable = true)
 |-- location_id: integer (nullable = true)
 |-- date_id: long (nullable = true)
 |-- indicator_id: integer (nullable = true)
 |-- measurement_date: date (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- climate_metric: string (nullable = true)
 |-- metric_code: string (nullable = true)
 |-- measurement_value: double (nullable = true)
 |-- temperature_celsius: double (nullable = true)
 |-- precipitation_mm: double (nullable = true)
 |-- pressure_pascals: double (nullable = true)
 |-- humidity_percentage: double (nullable = true)
 |-- unit_of_measure: string (nullable = true)
 |-- measurement_type: string (nullable = true)
 |-- quality_flag: string (nullable = true)
 |-- data_quality_score: double (nullable = true)
 |-- data_source: string (nullable = true)
 |-- source_dataset: string (nul



Enriched dataset: 12,880,169 records
Location join quality: 100.0% (0 records without location data)
Sample enriched data:
+----------------+-------------+-----------+-----------------+--------+---------+-----------+-------------+
|measurement_date|location_name|metric_code|measurement_value|latitude|longitude|region_code|location_type|
+----------------+-------------+-----------+-----------------+--------+---------+-----------+-------------+
|2011-01-01      |Antipolo     |C09        |5.36             |14.5873 |121.1759 |IV-A       |city         |
|2011-01-02      |Antipolo     |C09        |2.56             |14.5873 |121.1759 |IV-A       |city         |
|2011-01-01      |Antipolo     |C09        |5.36             |14.5873 |121.1759 |IV-A       |city         |
|2011-01-02      |Antipolo     |C09        |2.56             |14.5873 |121.1759 |IV-A       |city         |
|2011-01-01      |Antipolo     |C09        |5.36             |14.5873 |121.1759 |IV-A       |city         |
+------------

                                                                                

In [4]:
print("=== DATA QUALITY ANALYSIS ===")
try:
    # Headline figure carried over from the loading step
    print(f"Total records: {total_records:,}")

    # One aggregation pass for coverage counts, date span and value statistics
    _coverage_exprs = [
        countDistinct("location_id").alias("unique_locations"),
        countDistinct("region_code").alias("unique_regions"),
        countDistinct("province_code").alias("unique_provinces"),
        countDistinct("metric_code").alias("unique_metrics"),
        sum(when(col("latitude").isNotNull(), 1).otherwise(0)).alias("records_with_coordinates"),
        min("measurement_date").alias("earliest_date"),
        max("measurement_date").alias("latest_date"),
        avg("measurement_value").alias("avg_value"),
        min("measurement_value").alias("min_value"),
        max("measurement_value").alias("max_value"),
    ]
    geo_coverage = climate_data.agg(*_coverage_exprs).collect()[0]

    print("\nGeographic Coverage:")
    print(f"  Unique locations: {geo_coverage['unique_locations']:,}")
    print(f"  Unique regions: {geo_coverage['unique_regions']:,}")
    print(f"  Unique provinces: {geo_coverage['unique_provinces']:,}")
    print(f"  Climate metrics: {geo_coverage['unique_metrics']:,}")
    print(f"  Records with coordinates: {geo_coverage['records_with_coordinates']:,}")

    # Share of rows that carry a latitude; guarded against an empty dataset
    if total_records > 0:
        coord_percentage = geo_coverage['records_with_coordinates'] / total_records * 100
    else:
        coord_percentage = 0
    print(f"  Coordinate coverage: {coord_percentage:.1f}%")

    print("\nData Coverage:")
    print(f"  Date range: {geo_coverage['earliest_date']} to {geo_coverage['latest_date']}")
    print(f"  Value range: {geo_coverage['min_value']:.2f} to {geo_coverage['max_value']:.2f}")
    print(f"  Average measurement: {geo_coverage['avg_value']:.2f}")

    # Per-metric record counts and value statistics
    print("\nClimate metrics distribution:")
    metric_dist = (
        climate_data.groupBy("metric_code")
        .agg(
            count("*").alias("record_count"),
            countDistinct("location_id").alias("unique_locations"),
            avg("measurement_value").alias("avg_value"),
            min("measurement_value").alias("min_value"),
            max("measurement_value").alias("max_value"),
        )
        .orderBy("metric_code")
    )
    metric_dist.show(20, truncate=False)

    # The location-type, regional and yearly breakdowns share these aggregates
    _breakdown_aggs = [
        count("*").alias("total_records"),
        countDistinct("location_id").alias("locations"),
        countDistinct("metric_code").alias("metrics"),
        avg("measurement_value").alias("avg_value"),
    ]

    # Breakdown by location type
    print("\nLocation type distribution:")
    location_dist = (
        climate_data.groupBy("location_type")
        .agg(*_breakdown_aggs)
        .orderBy(desc("total_records"))
    )
    location_dist.show(20, truncate=False)

    # Breakdown by region, shown only when more than one region is present
    if geo_coverage['unique_regions'] > 1:
        print("\nRegional distribution:")
        regional_dist = (
            climate_data.filter(col("region_code") != "unknown")
            .groupBy("region_code")
            .agg(*_breakdown_aggs)
            .orderBy(desc("total_records"))
        )
        regional_dist.show(20, truncate=False)

    # Breakdown by year
    print("\nTemporal distribution:")
    temporal_dist = (
        climate_data.groupBy("year")
        .agg(*_breakdown_aggs)
        .orderBy("year")
    )
    temporal_dist.show(20, truncate=False)

    print("\nDATA QUALITY: VALIDATED")
    print("Dataset ready for aggregation and visualization")

except Exception as e:
    # Report the failure with a traceback and let the notebook continue
    print(f"Error in data quality analysis: {e}")
    import traceback
    traceback.print_exc()

=== DATA QUALITY ANALYSIS ===
Total records: 12,880,169


                                                                                


Geographic Coverage:
  Unique locations: 27
  Unique regions: 15
  Unique provinces: 12
  Climate metrics: 7
  Records with coordinates: 12,880,169
  Coordinate coverage: 100.0%

Data Coverage:
  Date range: 1981-01-01 to 2025-07-11
  Value range: 0.00 to 573.72
  Average measurement: 44.35

Climate metrics distribution:


                                                                                

+-----------+------------+----------------+------------------+---------+---------+
|metric_code|record_count|unique_locations|avg_value         |min_value|max_value|
+-----------+------------+----------------+------------------+---------+---------+
|C01        |2146584     |27              |26.00481007032577 |13.37    |33.16    |
|C03        |2146584     |27              |29.18326591458813 |15.68    |42.2     |
|C04        |2146584     |27              |23.704765958378733|9.11     |30.28    |
|C09        |2146584     |27              |6.204945429575948 |0.0      |573.72   |
|C12        |2146716     |27              |98.4753380419185  |90.39    |102.6    |
|C13        |2146584     |27              |82.53220199162902 |42.94    |98.51    |
|C23        |533         |1               |27.771707317073172|25.13    |30.02    |
+-----------+------------+----------------+------------------+---------+---------+


Location type distribution:


                                                                                

+-------------+-------------+---------+-------+-----------------+
|location_type|total_records|locations|metrics|avg_value        |
+-------------+-------------+---------+-------+-----------------+
|city         |11709293     |15       |7      |44.32972416865778|
|province     |1170876      |12       |6      |44.56108955175462|
+-------------+-------------+---------+-------+-----------------+


Regional distribution:


                                                                                

+-----------+-------------+---------+-------+------------------+
|region_code|total_records|locations|metrics|avg_value         |
+-----------+-------------+---------+-------+------------------+
|IV-A       |10733563     |5        |7      |44.32887569672768 |
|VI         |390292       |4        |6      |44.41441630881502 |
|III        |292719       |3        |6      |44.43002029249902 |
|VII        |195146       |2        |6      |44.6679963719472  |
|NCR        |195146       |2        |6      |44.682634540292966|
|XI         |195146       |2        |6      |44.607504945015634|
|CAR        |97573        |1        |6      |42.38717288594179 |
|XIII       |97573        |1        |6      |44.69028839945477 |
|IV-B       |97573        |1        |6      |44.66710739651339 |
|V          |97573        |1        |6      |45.27187562132963 |
|IX         |97573        |1        |6      |44.54227532206656 |
|X          |97573        |1        |6      |44.587479938097644|
|I          |97573       



+----+-------------+---------+-------+------------------+
|year|total_records|locations|metrics|avg_value         |
+----+-------------+---------+-------+------------------+
|1981|289092       |27       |7      |43.74847439569366 |
|1982|289092       |27       |7      |43.44326684930749 |
|1983|289092       |27       |7      |43.0613443471281  |
|1984|289884       |27       |7      |43.39610092312773 |
|1985|289092       |27       |7      |43.54828051277818 |
|1986|289092       |27       |7      |43.66883251698429 |
|1987|289092       |27       |7      |43.48654985264187 |
|1988|289884       |27       |7      |44.06459263015544 |
|1989|289092       |27       |7      |44.21164688057769 |
|1990|289092       |27       |7      |44.07338947463139 |
|1991|289092       |27       |7      |43.78850085785809 |
|1992|289884       |27       |7      |43.51043296628976 |
|1993|289092       |27       |7      |43.86579562907306 |
|1994|289092       |27       |7      |44.03189057462701 |
|1995|289092  

                                                                                

In [5]:
# Quarterly aggregations with complete geographic context
print("=== QUARTERLY AGGREGATIONS ===")

# Bound before the try so the cleanup in `finally` never has to guess whether
# the base frame was created.
quarterly_base = None

try:
    print("Creating quarterly aggregations...")

    # Prepare base dataset: keep only the columns needed downstream and drop
    # rows lacking a value, date, metric or resolved location.
    quarterly_base = climate_data.select(
        "location_id", "indicator_id", "measurement_date", "year", "metric_code", "climate_metric",
        "measurement_value", "unit_of_measure", "location_name", "display_name", "full_name",
        "location_type", "latitude", "longitude", "population", "region_code", "province_code", "iso_code"
    ).filter(
        col("measurement_value").isNotNull() &
        col("measurement_date").isNotNull() &
        col("metric_code").isNotNull() &
        col("location_id").isNotNull() &
        col("location_name").isNotNull()
    )

    # Add temporal components ("quarter" is rendered like "1981-Q1").
    # NOTE(review): per the PySpark docs date_trunc() returns a timestamp, so
    # quarter_start_date is timestamp-typed despite its name. Confirm what
    # downstream consumers expect before changing the type.
    quarterly_base = quarterly_base.withColumn(
        "quarter_number", quarter(col("measurement_date"))
    ).withColumn(
        "quarter", concat(col("year"), lit("-Q"), col("quarter_number"))
    ).withColumn(
        "quarter_start_date", date_trunc("quarter", col("measurement_date"))
    )

    # Cache and validate
    quarterly_base.cache()
    base_count = quarterly_base.count()
    print(f"Valid records for quarterly aggregation: {base_count:,}")

    if base_count == 0:
        raise ValueError("No valid records for quarterly aggregation")

    # Create comprehensive quarterly aggregations
    quarterly_climate = quarterly_base.groupBy(
        # Core identifiers
        "location_id", "indicator_id", "year", "quarter", "quarter_number",
        "quarter_start_date", "metric_code", "climate_metric",
        # Geographic dimensions
        "location_name", "display_name", "full_name", "location_type",
        "latitude", "longitude", "population", "region_code", "province_code", "iso_code"
    ).agg(
        avg("measurement_value").alias("quarterly_value"),
        min("measurement_value").alias("min_daily_value"),
        max("measurement_value").alias("max_daily_value"),
        stddev("measurement_value").alias("stddev_daily_value"),
        # NOTE(review): this counts input rows, not distinct days. The sample
        # printed by the loading cell repeats the same date/location/metric/value,
        # so if the fact table holds duplicates this count is inflated. Confirm.
        count("measurement_value").alias("daily_records_count"),
        min("measurement_date").alias("period_start_date"),
        max("measurement_date").alias("period_end_date"),
        # Skip nulls so one row with a missing unit cannot blank the group's unit
        first("unit_of_measure", ignorenulls=True).alias("unit_of_measure")
    )

    # Region code -> display name; codes not listed fall back to the raw code
    region_names = {
        "NCR": "National Capital Region",
        "CAR": "Cordillera Administrative Region",
        "I": "Ilocos Region",
        "II": "Cagayan Valley",
        "III": "Central Luzon",
        "IV-A": "Calabarzon",
        "IV-B": "MIMAROPA",
        "V": "Bicol Region",
        "VI": "Western Visayas",
        "VII": "Central Visayas",
        "VIII": "Eastern Visayas",
        "IX": "Zamboanga Peninsula",
        "X": "Northern Mindanao",
        "XI": "Davao Region",
        "XII": "SOCCSKSARGEN",
        "XIII": "Caraga",
        "BARMM": "Bangsamoro Autonomous Region in Muslim Mindanao",
    }
    region_name_expr = None
    for region_code_value, region_label in region_names.items():
        is_region = col("region_code") == region_code_value
        if region_name_expr is None:
            region_name_expr = when(is_region, region_label)
        else:
            region_name_expr = region_name_expr.when(is_region, region_label)
    region_name_expr = region_name_expr.otherwise(col("region_code"))

    # Single global window for the surrogate key. indicator_id is appended as a
    # tie-breaker so id assignment is stable whenever these five columns
    # identify a group.
    quarterly_id_window = Window.partitionBy(lit(1)).orderBy(
        "year", "quarter_number", "location_id", "metric_code", "indicator_id"
    )

    # Add metadata
    quarterly_climate = quarterly_climate.withColumn(
        "aggregation_type", lit("AVG")
    ).withColumn(
        "quarterly_climate_id", row_number().over(quarterly_id_window)
    ).withColumn(
        "created_at", lit(PROCESSING_TIMESTAMP)
    ).withColumn(
        "updated_at", lit(PROCESSING_TIMESTAMP)
    ).withColumn(
        "processing_version", lit("V2")
    ).withColumn(
        "country_name", lit("Philippines")
    ).withColumn(
        "region_name", region_name_expr
    ).withColumn(
        "coordinate_text",
        when(col("latitude").isNotNull() & col("longitude").isNotNull(),
            concat(format_number(col("latitude"), 4), lit(", "), format_number(col("longitude"), 4))
        ).otherwise(lit("No coordinates"))
    )

    # Final column selection
    quarterly_columns = [
        "quarterly_climate_id", "location_id", "indicator_id", "year", "quarter",
        "quarter_number", "quarter_start_date", "period_start_date", "period_end_date",
        "metric_code", "climate_metric", "quarterly_value", "aggregation_type",
        "unit_of_measure", "min_daily_value", "max_daily_value", "stddev_daily_value",
        "daily_records_count",
        # Geographic columns
        "location_name", "display_name", "full_name", "location_type",
        "latitude", "longitude", "coordinate_text", "population",
        "country_name", "region_code", "region_name", "province_code", "iso_code",
        # Metadata
        "processing_version", "created_at", "updated_at"
    ]

    quarterly_climate = quarterly_climate.select(*quarterly_columns)

    # Cache and validate
    quarterly_climate.cache()
    quarterly_count = quarterly_climate.count()
    print(f"Quarterly aggregations created: {quarterly_count:,} records")

    if quarterly_count == 0:
        raise ValueError("No quarterly aggregations created")

    # Show sample
    print("Sample quarterly data:")
    quarterly_climate.select(
        "quarter", "location_name", "metric_code", "quarterly_value",
        "latitude", "longitude", "region_name"
    ).show(5, truncate=False)

    print("QUARTERLY AGGREGATION: SUCCESS")

except Exception as e:
    print(f"ERROR in quarterly aggregation: {e}")
    raise

finally:
    # Best-effort release of the cached base frame. A cleanup failure is
    # reported but must not hide the outcome of the aggregation itself.
    if quarterly_base is not None:
        try:
            quarterly_base.unpersist()
        except Exception as cleanup_error:
            print(f"WARNING: could not unpersist quarterly base data: {cleanup_error}")

=== QUARTERLY AGGREGATIONS ===
Creating quarterly aggregations...


                                                                                

Valid records for quarterly aggregation: 12,880,169


                                                                                

Quarterly aggregations created: 29,176 records
Sample quarterly data:
+-------+-------------+-----------+------------------+--------+---------+-----------+
|quarter|location_name|metric_code|quarterly_value   |latitude|longitude|region_name|
+-------+-------------+-----------+------------------+--------+---------+-----------+
|1981-Q1|Antipolo     |C01        |24.557411949685434|14.5873 |121.1759 |Calabarzon |
|1981-Q1|Antipolo     |C03        |28.37868658280918 |14.5873 |121.1759 |Calabarzon |
|1981-Q1|Antipolo     |C04        |21.841535639413213|14.5873 |121.1759 |Calabarzon |
|1981-Q1|Antipolo     |C09        |2.6336299790355975|14.5873 |121.1759 |Calabarzon |
|1981-Q1|Antipolo     |C12        |98.57743501048324 |14.5873 |121.1759 |Calabarzon |
+-------+-------------+-----------+------------------+--------+---------+-----------+
only showing top 5 rows

QUARTERLY AGGREGATION: SUCCESS


In [6]:
# Annual aggregations with complete geographic context
print("=== ANNUAL AGGREGATIONS ===")

# Bound before the try so the cleanup in `finally` never has to guess whether
# the base frame was created.
annual_base = None

try:
    print("Creating annual aggregations...")

    # Prepare base dataset: keep only the columns needed downstream and drop
    # rows lacking a value, date, metric or resolved location.
    annual_base = climate_data.select(
        "location_id", "indicator_id", "measurement_date", "year", "metric_code", "climate_metric",
        "measurement_value", "unit_of_measure", "location_name", "display_name", "full_name",
        "location_type", "latitude", "longitude", "population", "region_code", "province_code", "iso_code"
    ).filter(
        col("measurement_value").isNotNull() &
        col("measurement_date").isNotNull() &
        col("metric_code").isNotNull() &
        col("location_id").isNotNull() &
        col("location_name").isNotNull()
    )

    # Add year start date.
    # NOTE(review): per the PySpark docs date_trunc() returns a timestamp, so
    # year_start_date is timestamp-typed despite its name. Confirm what
    # downstream consumers expect before changing the type.
    annual_base = annual_base.withColumn(
        "year_start_date", date_trunc("year", col("measurement_date"))
    )

    # Cache and validate
    annual_base.cache()
    base_count = annual_base.count()
    print(f"Valid records for annual aggregation: {base_count:,}")

    if base_count == 0:
        raise ValueError("No valid records for annual aggregation")

    # Perform annual aggregation
    annual_climate = annual_base.groupBy(
        "location_id", "indicator_id", "year", "year_start_date",
        "metric_code", "climate_metric",
        "location_name", "display_name", "full_name", "location_type",
        "latitude", "longitude", "population", "region_code", "province_code", "iso_code"
    ).agg(
        avg("measurement_value").alias("annual_value"),
        min("measurement_value").alias("min_daily_value"),
        max("measurement_value").alias("max_daily_value"),
        stddev("measurement_value").alias("stddev_daily_value"),
        # NOTE(review): this counts input rows, not distinct days. The sample
        # printed by the loading cell repeats the same date/location/metric/value,
        # so if the fact table holds duplicates this count is inflated. Confirm.
        count("measurement_value").alias("daily_records_count"),
        min("measurement_date").alias("period_start_date"),
        max("measurement_date").alias("period_end_date"),
        # Skip nulls so one row with a missing unit cannot blank the group's unit
        first("unit_of_measure", ignorenulls=True).alias("unit_of_measure")
    )

    # Region code -> display name; codes not listed fall back to the raw code
    region_names = {
        "NCR": "National Capital Region",
        "CAR": "Cordillera Administrative Region",
        "I": "Ilocos Region",
        "II": "Cagayan Valley",
        "III": "Central Luzon",
        "IV-A": "Calabarzon",
        "IV-B": "MIMAROPA",
        "V": "Bicol Region",
        "VI": "Western Visayas",
        "VII": "Central Visayas",
        "VIII": "Eastern Visayas",
        "IX": "Zamboanga Peninsula",
        "X": "Northern Mindanao",
        "XI": "Davao Region",
        "XII": "SOCCSKSARGEN",
        "XIII": "Caraga",
        "BARMM": "Bangsamoro Autonomous Region in Muslim Mindanao",
    }
    region_name_expr = None
    for region_code_value, region_label in region_names.items():
        is_region = col("region_code") == region_code_value
        if region_name_expr is None:
            region_name_expr = when(is_region, region_label)
        else:
            region_name_expr = region_name_expr.when(is_region, region_label)
    region_name_expr = region_name_expr.otherwise(col("region_code"))

    # Single global window for the surrogate key. indicator_id is appended as a
    # tie-breaker so id assignment is stable whenever these four columns
    # identify a group.
    annual_id_window = Window.partitionBy(lit(1)).orderBy(
        "year", "location_id", "metric_code", "indicator_id"
    )

    # Add metadata columns
    annual_climate = (
        annual_climate
        .withColumn("aggregation_type", lit("AVG"))
        .withColumn("annual_climate_id", row_number().over(annual_id_window))
        .withColumn("created_at", lit(PROCESSING_TIMESTAMP))
        .withColumn("updated_at", lit(PROCESSING_TIMESTAMP))
        .withColumn("processing_version", lit("V2"))
        .withColumn("country_name", lit("Philippines"))
        .withColumn("region_name", region_name_expr)
        .withColumn(
            "coordinate_text",
            when(col("latitude").isNotNull() & col("longitude").isNotNull(),
                 concat(format_number(col("latitude"), 4), lit(", "), format_number(col("longitude"), 4))
                 ).otherwise(lit("No coordinates"))
        )
    )

    # Final column selection
    annual_columns = [
        "annual_climate_id", "location_id", "indicator_id", "year", "year_start_date",
        "period_start_date", "period_end_date", "metric_code", "climate_metric",
        "annual_value", "aggregation_type", "unit_of_measure", "min_daily_value",
        "max_daily_value", "stddev_daily_value", "daily_records_count",
        "location_name", "display_name", "full_name", "location_type",
        "latitude", "longitude", "coordinate_text", "population",
        "country_name", "region_code", "region_name", "province_code", "iso_code",
        "processing_version", "created_at", "updated_at"
    ]

    annual_climate = annual_climate.select(*annual_columns)

    # Cache and validate
    annual_climate.cache()
    annual_count = annual_climate.count()
    print(f"Annual aggregations created: {annual_count:,} records")

    if annual_count == 0:
        raise ValueError("No annual aggregations created")

    # Show sample
    print("Sample annual data:")
    annual_climate.select(
        "year", "location_name", "metric_code", "annual_value",
        "latitude", "longitude", "region_name"
    ).show(5, truncate=False)

    print("ANNUAL AGGREGATION: SUCCESS")

except Exception as e:
    print(f"ERROR in annual aggregation: {e}")
    raise

finally:
    # Best-effort release of the cached base frame. A cleanup failure is
    # reported but must not hide the outcome of the aggregation itself.
    if annual_base is not None:
        try:
            annual_base.unpersist()
        except Exception as cleanup_error:
            print(f"WARNING: could not unpersist annual base data: {cleanup_error}")

=== ANNUAL AGGREGATIONS ===
Creating annual aggregations...


                                                                                

Valid records for annual aggregation: 12,880,169


                                                                                

Annual aggregations created: 7,335 records
Sample annual data:
+----+-------------+-----------+------------------+--------+---------+-----------+
|year|location_name|metric_code|annual_value      |latitude|longitude|region_name|
+----+-------------+-----------+------------------+--------+---------+-----------+
|1981|Antipolo     |C01        |25.897904368053315|14.5873 |121.1759 |Calabarzon |
|1981|Antipolo     |C03        |29.29851641251008 |14.5873 |121.1759 |Calabarzon |
|1981|Antipolo     |C04        |23.47222176272974 |14.5873 |121.1759 |Calabarzon |
|1981|Antipolo     |C09        |4.490132850865767 |14.5873 |121.1759 |Calabarzon |
|1981|Antipolo     |C12        |98.3710049108307  |14.5873 |121.1759 |Calabarzon |
+----+-------------+-----------+------------------+--------+---------+-----------+
only showing top 5 rows

ANNUAL AGGREGATION: SUCCESS


In [7]:
def save_gold_table(df, table_name, partition_cols=None):
    """Save a Spark DataFrame as a Delta table in the Gold layer.

    The table is written to ``GOLD_PATH/<table_name>`` in overwrite mode
    (schema overwrite enabled) and then read back so the persisted row
    count can be reported and compared with the input row count.

    Args:
        df: DataFrame to persist. It is unpersisted before this function
            returns, whether or not the save succeeded.
        table_name: Name of the table directory under ``GOLD_PATH``.
        partition_cols: Optional list of column names to partition by.
            Names that are not columns of ``df`` are skipped with a warning.

    Returns:
        tuple: ``(success, saved_count)``. ``(False, 0)`` when ``df`` is
        None, has no rows, or the write raised an exception; otherwise
        ``(True, <row count read back from the saved table>)``.

    Exceptions raised while saving are caught, printed with a traceback,
    and reported through the ``False`` return value rather than re-raised.
    """
    output_path = os.path.join(GOLD_PATH, table_name)

    print(f"\n=== SAVING GOLD TABLE: {table_name.upper()} ===")
    print(f"Target path: {output_path}")

    try:
        # Validate DataFrame
        if df is None:
            print("ERROR: Input DataFrame is None")
            return False, 0

        record_count = df.count()
        print(f"Records to save: {record_count:,}")

        if record_count == 0:
            print("WARNING: No data to save.")
            return False, 0

        # Configure writer with optimizations.
        # NOTE(review): the captured output of this notebook shows Delta
        # reporting both delta.autoOptimize.* properties as "not recognized
        # by this version of Delta"; they are kept so the written table
        # properties stay unchanged, but they may be no-ops here.
        writer = (
            df.write.format("delta")
            .mode("overwrite")
            .option("overwriteSchema", "true")
            .option("delta.autoOptimize.optimizeWrite", "true")
            .option("delta.autoOptimize.autoCompact", "true")
        )

        # Apply partitioning if specified
        if partition_cols:
            available_cols = set(df.columns)
            valid_partition_cols = [c for c in partition_cols if c in available_cols]
            missing_partition_cols = [c for c in partition_cols if c not in available_cols]

            # Surface dropped columns: otherwise the table silently ends up
            # with a different layout than the caller asked for.
            if missing_partition_cols:
                print(
                    "WARNING: Partition columns not found in DataFrame, "
                    f"skipped: {missing_partition_cols}"
                )

            if valid_partition_cols:
                writer = writer.partitionBy(*valid_partition_cols)
                print(f"Partitioning by: {valid_partition_cols}")

        # Save to Delta table
        print("Writing data to Delta...")
        writer.save(output_path)
        print("Save completed.")

        # Validate saved data by reading the table back
        saved_count = spark.read.format("delta").load(output_path).count()
        print(f"Records saved: {saved_count:,}")

        if saved_count != record_count:
            print(
                f"WARNING: Saved record count ({saved_count:,}) does not match "
                f"input record count ({record_count:,})"
            )

        return True, saved_count

    except Exception as e:
        print(f"ERROR saving table {table_name}: {e}")
        import traceback
        traceback.print_exc()
        return False, 0

    finally:
        # Best-effort cache release; a cleanup failure must not mask the
        # save result, but it should not be swallowed silently either.
        if df is not None:
            try:
                df.unpersist()
            except Exception as cleanup_error:
                print(f"WARNING: Could not unpersist DataFrame: {cleanup_error}")

print("Save function ready")

Save function ready


In [9]:
# Allow Delta table properties that this Delta version does not recognize
# (the writer in save_gold_table sets delta.autoOptimize.* options).
spark.conf.set("spark.databricks.delta.allowArbitraryProperties.enabled", "true")

print("=== GOLD LAYER SAVES ===")

# Both Gold tables use the same partitioning scheme.
gold_partition_cols = ["year", "region_code"]

# Save quarterly table
quarterly_success, quarterly_saved_count = save_gold_table(
    quarterly_climate,
    "gold_climate_quarterly_v2",
    gold_partition_cols
)

# Save annual table
annual_success, annual_saved_count = save_gold_table(
    annual_climate,
    "gold_climate_annual_v2",
    gold_partition_cols
)

# Final processing summary
print("\n=== PROCESSING SUMMARY ===")
print(f"Source records: {total_records:,}")
print(f"Location join quality: {location_quality:.1f}%")

# Geographic coverage summary
unique_locations = climate_data.select("location_name").distinct().count()
unique_regions = climate_data.agg(countDistinct("region_code")).collect()[0][0]
print(f"Geographic coverage: {unique_locations} locations across {unique_regions} regions")

# Per-table report: (heading, success flag, saved row count, table name)
table_results = [
    ("QUARTERLY", quarterly_success, quarterly_saved_count, "gold_climate_quarterly_v2"),
    ("ANNUAL", annual_success, annual_saved_count, "gold_climate_annual_v2"),
]
for heading, succeeded, saved_count, gold_table in table_results:
    print(f"\n{heading} TABLE:")
    print(f"  Status: {'SUCCESS' if succeeded else 'FAILED'}")
    print(f"  Records saved: {saved_count:,}")
    print(f"  Table: {gold_table}")
    print(f"  Partitioning: {', '.join(gold_partition_cols)}")

# Distinguish "both failed" from "one failed": previously a total failure was
# reported as PARTIAL COMPLETION right before "No data was saved".
if quarterly_success and annual_success:
    overall_status = "SUCCESS"
elif quarterly_success or annual_success:
    overall_status = "PARTIAL COMPLETION"
else:
    overall_status = "FAILED"
print(f"\nOVERALL STATUS: {overall_status}")

if quarterly_success or annual_success:
    total_saved = quarterly_saved_count + annual_saved_count
    print(f"Total saved records: {total_saved:,}")
    print(f"Gold layer path: {GOLD_PATH}")
    print("Processing version: V2")
    print(f"Processing timestamp: {PROCESSING_TIMESTAMP}")
else:
    print("No data was saved - check error logs above")

=== GOLD LAYER SAVES ===

=== SAVING GOLD TABLE: GOLD_CLIMATE_QUARTERLY_V2 ===
Target path: /home/ernese/miniconda3/envs/SO/New_SO/final-spark-gold/gold_climate_quarterly_v2
Records to save: 29,176
Partitioning by: ['year', 'region_code']
Writing data to Delta...
You are setting a property: delta.autooptimize.autocompact that is not recognized by this version of Delta
You are setting a property: delta.autooptimize.optimizewrite that is not recognized by this version of Delta


                                                                                

Save completed.
Records saved: 29,176

=== SAVING GOLD TABLE: GOLD_CLIMATE_ANNUAL_V2 ===
Target path: /home/ernese/miniconda3/envs/SO/New_SO/final-spark-gold/gold_climate_annual_v2


                                                                                

Records to save: 7,335
Partitioning by: ['year', 'region_code']
Writing data to Delta...
You are setting a property: delta.autooptimize.autocompact that is not recognized by this version of Delta
You are setting a property: delta.autooptimize.optimizewrite that is not recognized by this version of Delta


                                                                                

Save completed.
Records saved: 7,335

=== PROCESSING SUMMARY ===
Source records: 12,880,169
Location join quality: 100.0%


                                                                                

Geographic coverage: 27 locations across 15 regions

QUARTERLY TABLE:
  Status: SUCCESS
  Records saved: 29,176
  Table: gold_climate_quarterly_v2
  Partitioning: year, region_code

ANNUAL TABLE:
  Status: SUCCESS
  Records saved: 7,335
  Table: gold_climate_annual_v2
  Partitioning: year, region_code

OVERALL STATUS: SUCCESS
Total saved records: 36,511
Gold layer path: /home/ernese/miniconda3/envs/SO/New_SO/final-spark-gold
Processing version: V2
Processing timestamp: 2025-08-28 17:53:23.151883
