In [1]:
# --- Cell 1: Notebook Header, Logging Configuration, and Library Imports ---

"""
Notebook: 03_build_gold_layer.ipynb

Purpose:
This notebook processes cleaned and transformed earthquake data from the Silver layer
and loads it into a dimensional model (star schema) within the Lakehouse's Gold layer.
The Gold layer, stored as Delta tables within the Lakehouse, is highly refined,
aggregated, and conformed for optimized analytical consumption, reporting, and business intelligence.
It represents the final, business-ready data set.

Dependencies:
- Python 3.x
- pyspark library (for distributed processing and DataFrame operations)

Execution Environment:
This script is designed to run within an Apache Spark environment,
specifically optimized for platforms like Azure Fabric where a SparkSession
('spark') is typically pre-initialized and available globally.
"""

# Configure a basic logging system for effective monitoring in production environments.
# This setup allows capturing informational messages, warnings, and errors throughout
# the script's execution, which is crucial for debugging and operational oversight.
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# PySpark libraries are imported for core DataFrame operations, built-in functions, and windowing.
from pyspark.sql import SparkSession            # The entry point for Spark functionality.
from pyspark.sql import functions as F          # Provides access to Spark SQL functions (e.g., F.col, F.year).
from pyspark.sql.window import Window           # Used for defining window specifications, e.g., for deduplication or ranking.
# Standard Python library for date and time manipulations.
from datetime import datetime, timedelta, date  # Used for generating the date dimension.

# Initialize Spark Session:
# In Azure Fabric notebooks, the 'spark' session is typically pre-initialized and available globally.
# However, using `SparkSession.builder.getOrCreate()` is a robust pattern as it
# either retrieves the existing session or creates a new one if necessary,
# ensuring the notebook can run standalone or within a broader data pipeline.
try:
    if 'spark' not in globals() or not isinstance(spark, SparkSession):
        logger.info("SparkSession 'spark' not found or not an instance of SparkSession. Attempting to get or create a new one.")
        spark = SparkSession.builder.appName("LoadLakehouseGold").getOrCreate()
        logger.info("Spark session initialized successfully.")
    else:
        logger.info("Spark session 'spark' is already initialized and available.")
except Exception as e:
    logger.error(f"FATAL ERROR: Failed to initialize or retrieve Spark session: {e}", exc_info=True)
    raise Exception(f"Failed to initialize Spark session: {e}")

StatementMeta(, df1f360c-6673-44aa-b572-53f9c07ecffb, 3, Finished, Available, Finished)

2025-07-31 21:55:12,116 - INFO - Spark session 'spark' is already initialized and available.


In [2]:
# --- Cell 2: Configuration Parameters ---

# This section defines all key parameters for the Gold layer loading process,
# including the source (Silver) table name and the target (Gold) table names
# for the dimensional model. Centralizing these values ensures easy modification,
# maintainability, and consistency across the Gold layer components.

# Source Table Name (Silver Layer):
# The fully qualified name of the Delta table in the Silver layer from which
# cleaned, validated, and semi-enriched data will be read to build the Gold layer.
SILVER_TABLE_NAME = "silver_earthquakes_cleaned"

# Gold Layer Naming Conventions within the Lakehouse:
# For organizing Gold layer tables directly within the Lakehouse.
# This often involves a logical "schema" or prefix to group related tables
# belonging to a specific analytical domain or dimensional model.
GOLD_TABLE_PREFIX = "gold_earthquake_" # Prefix for all tables in this analytical model.

# Construct full table names for each component of the Gold layer's dimensional model.
# This creates a clear mapping for where each dimension and fact table will reside.
GOLD_DIM_DATE_TABLE = f"{GOLD_TABLE_PREFIX}dim_date"
GOLD_DIM_LOCATION_TABLE = f"{GOLD_TABLE_PREFIX}dim_location"
GOLD_DIM_MAGNITUDE_TABLE = f"{GOLD_TABLE_PREFIX}dim_magnitude"
GOLD_DIM_EVENT_TYPE_TABLE = f"{GOLD_TABLE_PREFIX}dim_event_type"
GOLD_FACT_EVENTS_TABLE = f"{GOLD_TABLE_PREFIX}fact_earthquake_events"

# Log the configured parameters for traceability and debugging in production environments.
logger.info("Gold Layer Load Configuration Loaded:")
logger.info(f"  Reading from Silver table: {SILVER_TABLE_NAME}")
logger.info(f"  Writing to Gold tables in Lakehouse, prefix: '{GOLD_TABLE_PREFIX}'):")
logger.info(f"    - DimDate: {GOLD_DIM_DATE_TABLE}")
logger.info(f"    - DimLocation: {GOLD_DIM_LOCATION_TABLE}")
logger.info(f"    - DimMagnitude: {GOLD_DIM_MAGNITUDE_TABLE}")
logger.info(f"    - DimEventType: {GOLD_DIM_EVENT_TYPE_TABLE}")
logger.info(f"    - FactEvents: {GOLD_FACT_EVENTS_TABLE}")

StatementMeta(, df1f360c-6673-44aa-b572-53f9c07ecffb, 4, Finished, Available, Finished)

2025-07-31 21:55:12,483 - INFO - Gold Layer Load Configuration Loaded:
2025-07-31 21:55:12,484 - INFO -   Reading from Silver table: silver_earthquakes_cleaned
2025-07-31 21:55:12,485 - INFO -   Writing to Gold tables in Lakehouse, prefix: 'gold_earthquake_'):
2025-07-31 21:55:12,486 - INFO -     - DimDate: gold_earthquake_dim_date
2025-07-31 21:55:12,486 - INFO -     - DimLocation: gold_earthquake_dim_location
2025-07-31 21:55:12,487 - INFO -     - DimMagnitude: gold_earthquake_dim_magnitude
2025-07-31 21:55:12,487 - INFO -     - DimEventType: gold_earthquake_dim_event_type
2025-07-31 21:55:12,488 - INFO -     - FactEvents: gold_earthquake_fact_earthquake_events


In [3]:
# --- Cell 3: Load Silver Data ---

# This cell loads the pre-processed earthquake data from the Silver layer
# into a PySpark DataFrame. This DataFrame serves as the primary source
# for extracting data to populate the various dimension and fact tables in the Gold layer.

try:
    # Read the Silver layer Delta table into a Spark DataFrame.
    # This table contains validated, cleaned, and enriched data from the previous transformation step,
    # making it suitable for direct consumption by the Gold layer.
    df_silver = spark.table(SILVER_TABLE_NAME)
    
    # Materialize the count of records to log and check for emptiness.
    silver_record_count = df_silver.count()
    
    logger.info(f"Successfully loaded {silver_record_count} records from Silver table: {SILVER_TABLE_NAME}.")
    logger.info("Silver DataFrame Schema:")
    df_silver.printSchema()

    # Check if the Silver table is empty.
    # If the source data for the Gold layer is empty, subsequent dimensional modeling
    # and fact table population steps will also result in empty tables.
    if silver_record_count == 0:
        logger.warning(f"Silver table '{SILVER_TABLE_NAME}' is empty. No data to load into the Gold layer.")
        # Create an empty DataFrame with the expected schema to prevent subsequent errors
        # in transformations that expect a DataFrame, ensuring the pipeline can proceed gracefully.
        df_silver = spark.createDataFrame([], df_silver.schema)
        
except Exception as e:
    logger.error(f"FATAL ERROR: Failed to load Silver data from '{SILVER_TABLE_NAME}'. Gold layer loading cannot proceed. Error: {e}", exc_info=True)
    raise Exception(f"Failed to load Silver data: {e}")

StatementMeta(, df1f360c-6673-44aa-b572-53f9c07ecffb, 5, Finished, Available, Finished)

root
 |-- event_id: string (nullable = true)
 |-- event_timestamp_utc: timestamp (nullable = true)
 |-- updated_timestamp_utc: timestamp (nullable = true)
 |-- magnitude: double (nullable = true)
 |-- depth_km: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- place: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- magType: string (nullable = true)
 |-- significance: integer (nullable = true)
 |-- felt_reports: integer (nullable = true)
 |-- nst_stations: integer (nullable = true)
 |-- rms_travel_time: double (nullable = true)
 |-- gap_azimuthal: double (nullable = true)
 |-- alert: string (nullable = true)
 |-- status: string (nullable = true)
 |-- url: string (nullable = true)
 |-- title: string (nullable = true)
 |-- ingestion_timestamp_utc: timestamp (nullable = true)
 |-- magnitude_category: string (nullable = true)
 |-- depth_category: string (nullable = true)
 |-- hemisphere_ns: string (nullable 

In [4]:
# --- Cell 4: Populate DimDate Dimension Table ---

# This cell generates and populates the `DimDate` dimension table.

# Check if `df_silver` is empty or not defined before proceeding with date range determination.
if 'df_silver' not in locals() or df_silver.count() == 0:
    logger.warning("df_silver is empty or not defined. Skipping DimDate population.")
    # Create an empty DataFrame with a predefined schema to ensure consistency for downstream operations,
    # even if no data is generated for DimDate. The schema is implied by the `date_list` structure.
    df_dim_date = spark.createDataFrame([], schema="DateKey INT, FullDate DATE, Year INT, Quarter INT, Month INT, MonthName STRING, DayOfMonth INT, DayOfWeek INT, DayName STRING, IsWeekend INT, ISOWeekOfYear INT") 
else:
    logger.info("Determining date range for DimDate population from Silver data's event timestamps.")
    # Determine the minimum and maximum event dates from the `event_timestamp_utc` column in the Silver data.
    min_max_date_row = df_silver.select(F.min("event_timestamp_utc").alias("min_date"), 
                                        F.max("event_timestamp_utc").alias("max_date")).first()

    # Define the precise start and end dates for the `DimDate` generation.
    # Add a buffer (e.g., 30 days) to the `max_date` to account for potential future events or late-arriving data.
    # Include a fallback to a predefined historical and future range if no valid dates are found in `df_silver`.
    if min_max_date_row and min_max_date_row["min_date"] and min_max_date_row["max_date"]:
        start_date: date = min_max_date_row["min_date"].date()
        end_date: date = min_max_date_row["max_date"].date() + timedelta(days=30) # Add a 30-day buffer.
    else: 
        logger.warning("No valid min/max dates found in df_silver. Using default date range (2020-01-01 to 1 year from now) for DimDate.")
        start_date: date = datetime(2020, 1, 1).date()        # Default start from a reasonable historical date.
        end_date: date = datetime.now().date() + timedelta(days=365) # Default projection for a year into the future.

    logger.info(f"Generating DimDate from {start_date} to {end_date}.")

    # Generate a list of dictionaries, where each dictionary represents a day and its attributes.
    # This programmatic generation ensures all required date dimensions are present.
    date_list = []
    current_date: date = start_date
    while current_date <= end_date:
        date_list.append({
            'DateKey': int(current_date.strftime('%Y%m%d')), # YYYYMMDD as an INTEGER for efficient joins in data warehouses.
            'FullDate': current_date,                       # Full date as a Python date object.
            'Year': current_date.year,
            'Quarter': (current_date.month - 1) // 3 + 1,   # Calculate quarter (1-4).
            'Month': current_date.month,
            'MonthName': current_date.strftime('%B'),       # Full month name (e.g., "January").
            'DayOfMonth': current_date.day,
            # DayOfWeek: 1 (Sunday) to 7 (Saturday) to match common SQL DATEPART(dw,...) conventions.
            'DayOfWeek': current_date.isoweekday() % 7 + 1, # Monday=0, Sunday=6 in Python's weekday().
            'DayName': current_date.strftime('%A'),         # Full day name (e.g., "Monday").
            'IsWeekend': 1 if current_date.weekday() >= 5 else 0, # 1 for Saturday/Sunday, 0 otherwise.
            'ISOWeekOfYear': current_date.isocalendar()[1]  # ISO week number.
        })
        current_date += timedelta(days=1)                   # Increment to the next day.

    # Create Spark DataFrame from the list of date attributes.
    df_dim_date = spark.createDataFrame(date_list)
    
    # Persist DimDate to the Gold layer of the Lakehouse.
    # 'overwrite' mode is used here because DimDate is typically a static or slowly changing table
    # that can be fully rebuilt without losing historical context, or it follows a specific lifecycle.
    # 'overwriteSchema' allows schema evolution (e.g., adding new date attributes) if the definition changes.
    try:
        df_dim_date.write \
                   .format("delta") \
                   .mode("overwrite") \
                   .option("overwriteSchema", "true") \
                   .saveAsTable(GOLD_DIM_DATE_TABLE)
        logger.info(f"DimDate populated successfully with {df_dim_date.count()} records into table: {GOLD_DIM_DATE_TABLE}.")
        logger.info("Sample records from DimDate:")
        df_dim_date.show(5, truncate=False)
    except Exception as e:
        logger.error(f"Error populating DimDate to Gold layer table '{GOLD_DIM_DATE_TABLE}': {e}", exc_info=True)
        raise Exception(f"Failed to populate DimDate: {e}")

StatementMeta(, df1f360c-6673-44aa-b572-53f9c07ecffb, 6, Finished, Available, Finished)

2025-07-31 21:55:27,104 - INFO - Determining date range for DimDate population from Silver data's event timestamps.
2025-07-31 21:55:29,734 - INFO - Generating DimDate from 2024-07-30 to 2025-08-29.
2025-07-31 21:55:38,151 - INFO - DimDate populated successfully with 396 records into table: gold_earthquake_dim_date.
2025-07-31 21:55:38,153 - INFO - Sample records from DimDate:


+--------+---------+----------+---------+----------+-------------+---------+-----+---------+-------+----+
|DateKey |DayName  |DayOfMonth|DayOfWeek|FullDate  |ISOWeekOfYear|IsWeekend|Month|MonthName|Quarter|Year|
+--------+---------+----------+---------+----------+-------------+---------+-----+---------+-------+----+
|20240730|Tuesday  |30        |3        |2024-07-30|31           |0        |7    |July     |3      |2024|
|20240731|Wednesday|31        |4        |2024-07-31|31           |0        |7    |July     |3      |2024|
|20240801|Thursday |1         |5        |2024-08-01|31           |0        |8    |August   |3      |2024|
|20240802|Friday   |2         |6        |2024-08-02|31           |0        |8    |August   |3      |2024|
|20240803|Saturday |3         |7        |2024-08-03|31           |1        |8    |August   |3      |2024|
+--------+---------+----------+---------+----------+-------------+---------+-----+---------+-------+----+
only showing top 5 rows



In [5]:
# --- Cell 5: Populate DimLocation Dimension Table ---

# This cell populates the `DimLocation` dimension table.
# It extracts distinct location attributes (latitude, longitude, place description, extracted country/region, hemisphere)
# from the Silver layer and assigns a unique surrogate key for each unique combination.

# Check if `df_silver` is empty or not defined before proceeding with transformations.
if 'df_silver' not in locals() or df_silver.count() == 0:
    logger.warning("df_silver is empty or not defined. Skipping DimLocation population.")
    # Create an empty DataFrame with a placeholder schema for consistency.
    df_dim_location = spark.createDataFrame([], schema="LocationKey LONG, Latitude DOUBLE, Longitude DOUBLE, PlaceDescription STRING, ExtractedCountry STRING, ExtractedRegionDetail STRING, HemisphereNS STRING, HemisphereEW STRING")
else:
    logger.info("Extracting distinct location attributes for DimLocation population.")
    # Select distinct location attributes from the Silver DataFrame.
    # This ensures that each unique combination of geographical details gets a single entry in the dimension table.
    df_dim_location_source = df_silver.select(
        "latitude", "longitude", "place", 
        "extracted_country", "extracted_region_detail",
        "hemisphere_ns", "hemisphere_ew"
    ).distinct()

    # Assign a surrogate key (`LocationKey`) to each unique location record.
    df_dim_location = df_dim_location_source.withColumn("LocationKey", F.monotonically_increasing_id() + 1) \
        .select(
            F.col("LocationKey"),
            F.col("latitude").alias("Latitude"),
            F.col("longitude").alias("Longitude"),
            F.col("place").alias("PlaceDescription"),
            F.col("extracted_country").alias("ExtractedCountry"),
            F.col("extracted_region_detail").alias("ExtractedRegionDetail"),
            F.col("hemisphere_ns").alias("HemisphereNS"),
            F.col("hemisphere_ew").alias("HemisphereEW")
        )

    # Persist DimLocation to the Gold layer of the Lakehouse.
    # 'overwrite' mode is used for simplicity, assuming a full rebuild is acceptable for this dimension.
    try:
        df_dim_location.write \
                       .format("delta") \
                       .mode("overwrite") \
                       .option("overwriteSchema", "true") \
                       .saveAsTable(GOLD_DIM_LOCATION_TABLE)
        logger.info(f"DimLocation populated successfully with {df_dim_location.count()} records into table: {GOLD_DIM_LOCATION_TABLE}.")
        logger.info("Sample records from DimLocation:")
        df_dim_location.show(5, truncate=False)
    except Exception as e:
        logger.error(f"Error populating DimLocation to Gold layer table '{GOLD_DIM_LOCATION_TABLE}': {e}", exc_info=True)
        raise Exception(f"Failed to populate DimLocation: {e}")

StatementMeta(, df1f360c-6673-44aa-b572-53f9c07ecffb, 7, Finished, Available, Finished)

2025-07-31 21:55:41,179 - INFO - Extracting distinct location attributes for DimLocation population.


+-----------+----------------+-----------------+---------------------------------------+----------------+---------------------+------------+------------+
|LocationKey|Latitude        |Longitude        |PlaceDescription                       |ExtractedCountry|ExtractedRegionDetail|HemisphereNS|HemisphereEW|
+-----------+----------------+-----------------+---------------------------------------+----------------+---------------------+------------+------------+
|1          |31.5118333      |-114.3651667     |97 km SE of Estacion Coahuila, B.C., MX|B.C., MX        |B.C., MX             |Northern    |Western     |
|2          |19.3888333333333|-155.277333333333|7 km SW of Volcano, Hawaii             |Hawaii          |Hawaii               |Northern    |Western     |
|3          |-21.9587        |-63.7892         |13 km WNW of Yacuiba, Bolivia          |Bolivia         |Bolivia              |Southern    |Western     |
|4          |-17.4186        |-178.8261        |209 km ENE of Levuka, Fiji  

In [6]:
# --- Cell 6: Populate DimMagnitude Dimension Table ---

# This cell populates the `DimMagnitude` dimension table.
# This is typically a static dimension, meaning its data (magnitude categories, descriptions)
# is predefined and does not change based on the source data.
# It provides descriptive context for earthquake magnitudes, making analytical queries more readable.

logger.info("Populating DimMagnitude with static predefined categories.")

# Define the static data for magnitude categories.
# This data is hardcoded as it represents a fixed business classification of earthquake magnitudes.
magnitude_categories_data = [
    {"MagnitudeCategory": "Micro", "MinMagnitude": -2.0, "MaxMagnitude": 2.9, "Description": "Microearthquakes, not felt, or felt rarely by sensitive people."},
    {"MagnitudeCategory": "Minor", "MinMagnitude": 3.0, "MaxMagnitude": 3.9, "Description": "Often felt by people, but very rarely causes damage."},
    {"MagnitudeCategory": "Light", "MinMagnitude": 4.0, "MaxMagnitude": 4.9, "Description": "Felt indoors by many, outdoors by few. Light damage possible."},
    {"MagnitudeCategory": "Moderate", "MinMagnitude": 5.0, "MaxMagnitude": 5.9, "Description": "Felt by everyone. Slight damage to weak structures."},
    {"MagnitudeCategory": "Strong", "MinMagnitude": 6.0, "MaxMagnitude": 6.9, "Description": "Damage to a moderate number of well-built structures."},
    {"MagnitudeCategory": "Major", "MinMagnitude": 7.0, "MaxMagnitude": 7.9, "Description": "Causes damage to most buildings, some toppling."},
    {"MagnitudeCategory": "Great", "MinMagnitude": 8.0, "MaxMagnitude": 10.0, "Description": "Causes widespread destruction, severe damage or collapse."},
    {"MagnitudeCategory": "Unknown", "MinMagnitude": None, "MaxMagnitude": None, "Description": "Magnitude category could not be determined."} # Fallback for unexpected values.
]
df_dim_magnitude_static = spark.createDataFrame(magnitude_categories_data)

# Assign a surrogate key (`MagnitudeKey`) to each magnitude category.
# For static dimensions that are fully overwritten, `monotonically_increasing_id()` is acceptable,
# as key consistency across runs is not a strict requirement for a dimension that is always rebuilt.
df_dim_magnitude = df_dim_magnitude_static.withColumn("MagnitudeKey", F.monotonically_increasing_id() + 1) \
    .select("MagnitudeKey", "MagnitudeCategory", "MinMagnitude", "MaxMagnitude", "Description")

# Persist DimMagnitude to the Gold layer of the Lakehouse.
# 'overwrite' mode is appropriate for static or very slowly changing dimensions where a full refresh
# of the lookup data is always desired.
try:
    df_dim_magnitude.write \
                    .format("delta") \
                    .mode("overwrite") \
                    .option("overwriteSchema", "true") \
                    .saveAsTable(GOLD_DIM_MAGNITUDE_TABLE)
    logger.info(f"DimMagnitude populated successfully with {df_dim_magnitude.count()} records into table: {GOLD_DIM_MAGNITUDE_TABLE}.")
    logger.info("Sample records from DimMagnitude:")
    df_dim_magnitude.show(truncate=False) # Show all rows as it's a small static table.
except Exception as e:
    logger.error(f"Error populating DimMagnitude to Gold layer table '{GOLD_DIM_MAGNITUDE_TABLE}': {e}", exc_info=True)
    raise Exception(f"Failed to populate DimMagnitude: {e}")

StatementMeta(, df1f360c-6673-44aa-b572-53f9c07ecffb, 8, Finished, Available, Finished)

2025-07-31 21:55:50,644 - INFO - Populating DimMagnitude with static predefined categories.


+------------+-----------------+------------+------------+---------------------------------------------------------------+
|MagnitudeKey|MagnitudeCategory|MinMagnitude|MaxMagnitude|Description                                                    |
+------------+-----------------+------------+------------+---------------------------------------------------------------+
|1           |Micro            |-2.0        |2.9         |Microearthquakes, not felt, or felt rarely by sensitive people.|
|8589934593  |Minor            |3.0         |3.9         |Often felt by people, but very rarely causes damage.           |
|17179869185 |Light            |4.0         |4.9         |Felt indoors by many, outdoors by few. Light damage possible.  |
|25769803777 |Moderate         |5.0         |5.9         |Felt by everyone. Slight damage to weak structures.            |
|34359738369 |Strong           |6.0         |6.9         |Damage to a moderate number of well-built structures.          |
|42949672961 |Ma

In [7]:
# --- Cell 7: Populate DimEventType Dimension Table ---

# This cell populates the `DimEventType` dimension table.
# It extracts distinct combinations of `event_type` and `MagType` from the Silver layer
# to create a comprehensive lookup for different event classifications and their associated
# magnitude types.

# Check if `df_silver` is empty or not defined before proceeding.
if 'df_silver' not in locals() or df_silver.count() == 0:
    logger.warning("df_silver is empty or not defined. Skipping DimEventType population.")
    # Create an empty DataFrame with a placeholder schema for consistency.
    df_dim_event_type = spark.createDataFrame([], schema="EventTypeKey LONG, EventType STRING, MagType STRING")
else:
    logger.info("Extracting distinct event types and magnitude types for DimEventType population.")
    # Select distinct combinations of `event_type` and `MagType` from the Silver DataFrame.
    # This ensures that each unique combination is represented only once in the dimension.
    df_dim_event_type_source = df_silver.select("event_type", "MagType").distinct()

    # Assign a surrogate key (`EventTypeKey`) to each unique event type combination.
    # Similar considerations for `monotonically_increasing_id()` apply here as in `DimLocation`
    # regarding consistency across runs vs. full overwrite.
    df_dim_event_type = df_dim_event_type_source.withColumn("EventTypeKey", F.monotonically_increasing_id() + 1) \
        .select(
            F.col("EventTypeKey"),
            F.col("event_type").alias("EventType"), # Alias for clarity in the dimension.
            F.col("MagType")
        )

    # Persist DimEventType to the Gold layer of the Lakehouse.
    # 'overwrite' mode is used, suitable for dimensions that are fully refreshed or when
    # the entire set of event types is expected to be rebuilt.
    try:
        df_dim_event_type.write \
                         .format("delta") \
                         .mode("overwrite") \
                         .option("overwriteSchema", "true") \
                         .saveAsTable(GOLD_DIM_EVENT_TYPE_TABLE)
        logger.info(f"DimEventType populated successfully with {df_dim_event_type.count()} records into table: {GOLD_DIM_EVENT_TYPE_TABLE}.")
        logger.info("Sample records from DimEventType:")
        df_dim_event_type.show(truncate=False)
    except Exception as e:
        logger.error(f"Error populating DimEventType to Gold layer table '{GOLD_DIM_EVENT_TYPE_TABLE}': {e}", exc_info=True)
        raise Exception(f"Failed to populate DimEventType: {e}")

StatementMeta(, df1f360c-6673-44aa-b572-53f9c07ecffb, 9, Finished, Available, Finished)

2025-07-31 21:55:57,517 - INFO - Extracting distinct event types and magnitude types for DimEventType population.
2025-07-31 21:56:02,807 - INFO - DimEventType populated successfully with 15 records into table: gold_earthquake_dim_event_type.
2025-07-31 21:56:02,808 - INFO - Sample records from DimEventType:


+------------+-----------------+-------+
|EventTypeKey|EventType        |MagType|
+------------+-----------------+-------+
|1           |earthquake       |mww    |
|2           |earthquake       |mwr    |
|3           |earthquake       |mw     |
|4           |earthquake       |ml     |
|5           |earthquake       |md     |
|6           |earthquake       |mb     |
|7           |mining explosion |ml     |
|8           |earthquake       |mb_lg  |
|9           |earthquake       |mh     |
|10          |landslide        |ms_vx  |
|11          |volcanic eruption|ml     |
|12          |explosion        |mb     |
|13          |explosion        |ml     |
|14          |other event      |ml     |
|15          |earthquake       |mwb    |
+------------+-----------------+-------+



In [8]:
# --- Cell 8: Prepare Fact Table Data (Joins and Surrogate Key Lookups) ---

# This cell prepares the data for the `FactEarthquakeEvents` table.
# It involves joining the processed Silver layer data with the newly populated dimension tables
# (DimDate, DimLocation, DimMagnitude, DimEventType). 

# Check if `df_silver` is empty or not defined before proceeding.
if 'df_silver' not in locals() or df_silver.count() == 0:
    logger.warning("df_silver is empty or not defined. Skipping Fact table preparation.")
    # Create an empty DataFrame with a placeholder schema for consistency.
    # The schema should match the expected output of `df_fact_final`.
    df_fact_final = spark.createDataFrame([], schema="EarthquakeEventKey LONG, EventID STRING, DateKey INT, TimeOfDay STRING, LocationKey LONG, MagnitudeKey LONG, EventTypeKey LONG, Magnitude DOUBLE, DepthKm DOUBLE, TsunamiWarning BOOLEAN, Significance INT, FeltReports INT, NumberOfStations INT, RmsTravelTime DOUBLE, AzimuthalGap DOUBLE, SourceURL STRING, SilverProcessingTimestampUTC TIMESTAMP, DWLoadTimestampUTC TIMESTAMP")
else:
    logger.info("Loading dimension tables from Gold layer for fact table joins.")
    # Load Dimension tables back into Spark DataFrames from the Gold layer.
    # This ensures we are joining against the most current and accurate state of the dimensions.
    try:
        df_dim_date_loaded = spark.table(GOLD_DIM_DATE_TABLE)
        df_dim_location_loaded = spark.table(GOLD_DIM_LOCATION_TABLE)
        df_dim_magnitude_loaded = spark.table(GOLD_DIM_MAGNITUDE_TABLE)
        df_dim_event_type_loaded = spark.table(GOLD_DIM_EVENT_TYPE_TABLE)
        logger.info("Dimension tables loaded successfully from Gold layer for joining.")
    except Exception as e:
        # If any dimension table cannot be loaded, the fact table cannot be properly built.
        logger.error(f"FATAL ERROR: Failed to load dimension tables from Gold layer. Fact table preparation cannot proceed. Error: {e}", exc_info=True)
        raise Exception(f"Failed to load dimensions for fact table: {e}")

    # Prepare specific columns from the Silver DataFrame for joining and inclusion in the fact table.
    # - `FactDateKey`: Derived from `event_timestamp_utc` and cast to INT for joining with `DimDate`.
    # - `TimeOfDayString`: Formatted to "HH:mm:ss.SSSSSS" as a string. Spark does not have a native SQL `TIME` type,
    #   so storing it as a string is a common workaround when targeting a data warehouse that supports `TIME(6)`.
    df_fact_source = df_silver \
        .withColumn("FactDateKey", F.date_format(F.col("event_timestamp_utc"), "yyyyMMdd").cast("int")) \
        .withColumn("TimeOfDayString", F.date_format(F.col("event_timestamp_utc"), "HH:mm:ss.SSSSSS"))

    logger.info("Performing joins with dimension tables to resolve surrogate keys for the fact table.")
    # Join with DimDate: Link by the generated `DateKey`. An `inner` join ensures that only
    # event records that have a corresponding date entry in `DimDate` are included.
    df_fact_joined = df_fact_source.join(
        df_dim_date_loaded,
        df_fact_source.FactDateKey == df_dim_date_loaded.DateKey,
        "inner"
    ).select(df_fact_source["*"], df_dim_date_loaded["DateKey"]) # Select `DateKey` from the dimension for the fact.

    # Join with DimLocation: Link by Latitude, Longitude, and Place Description.
    # Important: When joining on multiple columns, especially strings, ensure handling of potential null values
    # (`(A.col IS NULL AND B.col IS NULL) OR (A.col = B.col)`) for robust matching.
    df_fact_joined = df_fact_joined.join(
        df_dim_location_loaded,
        (df_fact_joined.latitude == df_dim_location_loaded.Latitude) & \
        (df_fact_joined.longitude == df_dim_location_loaded.Longitude) & \
        ( (df_fact_joined.place.isNull() & df_dim_location_loaded.PlaceDescription.isNull()) | (df_fact_joined.place == df_dim_location_loaded.PlaceDescription) ),
        "inner"
    ).select(df_fact_joined["*"], df_dim_location_loaded["LocationKey"]) # Select `LocationKey`.

    # Join with DimMagnitude: Link by `MagnitudeCategory`.
    df_fact_joined = df_fact_joined.join(
        df_dim_magnitude_loaded,
        df_fact_joined.magnitude_category == df_dim_magnitude_loaded.MagnitudeCategory,
        "inner"
    ).select(df_fact_joined["*"], df_dim_magnitude_loaded["MagnitudeKey"]) # Select `MagnitudeKey`.

    # Join with DimEventType: Link by `EventType` and `MagType`.
    # Again, handle potential nulls in `magType` for complete matching.
    df_fact_joined = df_fact_joined.join(
        df_dim_event_type_loaded,
        (df_fact_joined.event_type == df_dim_event_type_loaded.EventType) & \
        ( (df_fact_joined.magType.isNull() & df_dim_event_type_loaded.MagType.isNull()) | (df_fact_joined.magType == df_dim_event_type_loaded.MagType) ),
        "inner"
    ).select(df_fact_joined["*"], df_dim_event_type_loaded["EventTypeKey"]) # Select `EventTypeKey`.

    # Generate `EarthquakeEventKey` for the fact table.
    # This serves as the primary key for each record in the fact table, generated by the ETL process.
    # `monotonically_increasing_id()` is suitable for full overwrite loads of the fact table.
    df_fact_joined = df_fact_joined.withColumn("EarthquakeEventKey", F.monotonically_increasing_id() + 1)

    # Select final columns for the fact table, aliasing them to match the desired Gold layer schema.
    # This step ensures the fact table has a clean, denormalized structure ready for analytics.
    # Ensure all columns designated as NOT NULL in the logical DW DDL are indeed non-null after joins/transformations.
    df_fact_final = df_fact_joined.select(
        F.col("EarthquakeEventKey"),                             # Fact table primary key.
        F.col("event_id").alias("EventID"),                      # Business key from source.
        F.col("DateKey"),                                        # Foreign Key to DimDate.
        F.col("TimeOfDayString").cast("string").alias("TimeOfDay"), # Time component, cast to string to align with typical DW TIME(6) type.
        F.col("LocationKey"),                                    # Foreign Key to DimLocation.
        F.col("MagnitudeKey"),                                   # Foreign Key to DimMagnitude.
        F.col("EventTypeKey"),                                   # Foreign Key to DimEventType.
        
        # Measures (aliased from Silver data for clarity)
        F.col("magnitude").alias("Magnitude"),
        F.col("depth_km").alias("DepthKm"),
        F.col("tsunami_warning").alias("TsunamiWarning"),
        F.col("significance").alias("Significance"),
        F.col("felt_reports").alias("FeltReports"),
        F.col("nst_stations").alias("NumberOfStations"),
        F.col("rms_travel_time").alias("RmsTravelTime"),
        F.col("gap_azimuthal").alias("AzimuthalGap"),

        # Metadata/Audit columns (aliased from Silver or generated)
        F.col("url").alias("SourceURL"),
        F.col("silver_processing_timestamp_utc").alias("SilverProcessingTimestampUTC"),
        F.current_timestamp().cast("timestamp").alias("DWLoadTimestampUTC") # Timestamp of when this record was loaded into the Gold layer.
    ) \
    .dropDuplicates(["EventID"]) # Ensure unique events by business key (`EventID`) before writing to fact table.

    logger.info(f"Fact table data prepped with {df_fact_final.count()} records after joins and deduplication.")
    logger.info("Fact DataFrame Schema (Gold Layer):")
    df_fact_final.printSchema()
    logger.info("Sample records from Fact Table:")
    df_fact_final.show(5, truncate=False)

StatementMeta(, df1f360c-6673-44aa-b572-53f9c07ecffb, 10, Finished, Available, Finished)

2025-07-31 21:56:05,631 - INFO - Loading dimension tables from Gold layer for fact table joins.
2025-07-31 21:56:06,281 - INFO - Dimension tables loaded successfully from Gold layer for joining.
2025-07-31 21:56:06,297 - INFO - Performing joins with dimension tables to resolve surrogate keys for the fact table.


root
 |-- EarthquakeEventKey: long (nullable = false)
 |-- EventID: string (nullable = true)
 |-- DateKey: long (nullable = true)
 |-- TimeOfDay: string (nullable = true)
 |-- LocationKey: long (nullable = true)
 |-- MagnitudeKey: long (nullable = true)
 |-- EventTypeKey: long (nullable = true)
 |-- Magnitude: double (nullable = true)
 |-- DepthKm: double (nullable = true)
 |-- Significance: integer (nullable = true)
 |-- FeltReports: integer (nullable = true)
 |-- NumberOfStations: integer (nullable = true)
 |-- RmsTravelTime: double (nullable = true)
 |-- AzimuthalGap: double (nullable = true)
 |-- SourceURL: string (nullable = true)
 |-- SilverProcessingTimestampUTC: timestamp (nullable = true)
 |-- DWLoadTimestampUTC: timestamp (nullable = false)

+------------------+------------+--------+---------------+-----------+------------+------------+---------+-------+--------------+------------+-----------+----------------+-------------+------------+----------------------------------------

In [9]:
# --- Cell 9: Load Fact Table ---

# This cell writes the prepared fact table data to the Gold layer of the Lakehouse.
# This is the final step in populating the Gold layer with earthquake events,
# making the data available for analytical consumption.

# Get the count of records to write for logging. This avoids re-triggering a count action if not needed.
records_to_write_count = 0
if 'df_fact_final' in locals():
    records_to_write_count = df_fact_final.count()

if records_to_write_count > 0: # Only attempt to write if there are records in the final DataFrame.
    try:
        # Write the Spark DataFrame (`df_fact_final`) to the `FactEarthquakeEvents` table
        # in the Gold layer of the Lakehouse. This will create a Delta table within
        # the 'Tables' section of your Lakehouse environment.
        #
        # `format("delta")`: Specifies Delta Lake format for reliable, ACID-compliant data storage.
        # `mode("overwrite")`: Replaces the entire fact table with the new data. This strategy is suitable
        #                      for full daily rebuilds, where all historical data is re-processed and
        #                      a fresh snapshot of the fact table is desired.
        #                      For incremental loads (e.g., only loading new events or updating existing ones),
        #                      a `MERGE INTO` operation (available in Delta Lake) would be the preferred approach
        #                      to perform upserts based on a business key.
        # `option("overwriteSchema", "true")`: Allows the schema of the target Delta table to be updated
        #                                       if the incoming DataFrame's schema differs. Use with caution
        #                                       in production environments to avoid unintended schema changes,
        #                                       but it's common for iterative development or early stages.
        # `saveAsTable(...)`: Persists the data as a named Delta table in the Lakehouse's catalog,
        #                     making it easily discoverable and queryable via Spark SQL or other integrated tools.
        logger.info(f"Writing {records_to_write_count} records to Gold fact table: {GOLD_FACT_EVENTS_TABLE}.")
        df_fact_final.write \
                     .format("delta") \
                     .mode("overwrite") \
                     .option("overwriteSchema", "true") \
                     .saveAsTable(GOLD_FACT_EVENTS_TABLE)
        
        logger.info(f"FactEarthquakeEvents populated successfully with {records_to_write_count} records in the Gold layer table: {GOLD_FACT_EVENTS_TABLE}.")
        
    except Exception as e:
        logger.error(f"FATAL ERROR: An error occurred while saving data to Gold fact table '{GOLD_FACT_EVENTS_TABLE}': {e}", exc_info=True)
        raise Exception(f"Failed to populate Fact table in Gold layer: {e}")
else:
    logger.warning("Skipping write to Gold fact table as the final DataFrame (`df_fact_final`) is empty. No data was available to be written.")

StatementMeta(, df1f360c-6673-44aa-b572-53f9c07ecffb, 11, Finished, Available, Finished)

2025-07-31 21:56:20,887 - INFO - Writing 25132 records to Gold fact table: gold_earthquake_fact_earthquake_events.
2025-07-31 21:56:27,236 - INFO - FactEarthquakeEvents populated successfully with 25132 records in the Gold layer table: gold_earthquake_fact_earthquake_events.


In [10]:
# This cell provides a quick visual verification of the data written to the Gold fact table.
# It displays a sample of the records and the schema, confirming successful population.
if 'df_fact_final' in locals() and records_to_write_count > 0:
    try:
        # Ensure SparkSession is available to query the table.
        if 'spark' not in globals() or not isinstance(spark, SparkSession):
            logger.error("SparkSession 'spark' is not initialized for table verification. Attempting to get or create one.")
            spark = SparkSession.builder.appName("GoldFactVerification").getOrCreate()
            
        logger.info(f"Displaying a sample of 5 records from the Gold fact table '{GOLD_FACT_EVENTS_TABLE}' for verification.")
        # Retrieve the fact table as a Spark DataFrame and show its first 5 rows.
        # `truncate=False` ensures that column values are not truncated in the output.
        spark.table(GOLD_FACT_EVENTS_TABLE).show(5, truncate=False)
        
        logger.info(f"Gold fact table '{GOLD_FACT_EVENTS_TABLE}' schema:")
        spark.table(GOLD_FACT_EVENTS_TABLE).printSchema()
        logger.info(f"Total records in Gold fact table '{GOLD_FACT_EVENTS_TABLE}': {spark.table(GOLD_FACT_EVENTS_TABLE).count()}")

    except Exception as e:
        logger.error(f"An error occurred while trying to read and display data from Gold fact table '{GOLD_FACT_EVENTS_TABLE}': {e}", exc_info=True)
else:
    logger.info("Skipping Gold fact table display as no data was processed and written to the table.")

StatementMeta(, df1f360c-6673-44aa-b572-53f9c07ecffb, 12, Finished, Available, Finished)

2025-07-31 21:56:27,810 - INFO - Displaying a sample of 5 records from the Gold fact table 'gold_earthquake_fact_earthquake_events' for verification.
2025-07-31 21:56:29,882 - INFO - Gold fact table 'gold_earthquake_fact_earthquake_events' schema:
2025-07-31 21:56:30,501 - INFO - Total records in Gold fact table 'gold_earthquake_fact_earthquake_events': 25132
