In [7]:
# Create a gold_layer.py file with the same functionality

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
from pyspark.sql import types as T
from datetime import datetime, timedelta
import json
import os

# Initialize Spark Session
def initialize_spark():
    """Get (or lazily create) the SparkSession used by the gold layer, app name "F1GoldLayer"."""
    return SparkSession.builder.appName("F1GoldLayer").getOrCreate()

def get_last_processed_timestamp(spark, checkpoint_path):
    """
    Get the last processed timestamp from checkpoint

    Reads the checkpoint rows under ``checkpoint_path`` and returns the
    ``last_processed_timestamp`` of the row with the greatest
    ``processing_timestamp``.

    Parameters:
    - spark: active SparkSession
    - checkpoint_path: directory written by ``update_checkpoint``

    Returns:
    - The stored watermark, in whatever type it was written with (a datetime or
      an ISO string), or ``"1970-01-01T00:00:00"`` when the checkpoint cannot
      be read (missing or empty directory) or holds no usable value.
    """
    default_ts = datetime(1970, 1, 1).isoformat()

    # Only a failed read means "no checkpoint yet". Errors after that point
    # propagate instead of silently resetting the watermark to the epoch.
    try:
        checkpoint_df = spark.read.parquet(checkpoint_path)
    except Exception:
        return default_ts

    last_checkpoint = checkpoint_df.orderBy("processing_timestamp", ascending=False).first()
    if last_checkpoint is None or last_checkpoint.last_processed_timestamp is None:
        return default_ts
    return last_checkpoint.last_processed_timestamp

def update_checkpoint(spark, checkpoint_path, last_processed_timestamp):
    """
    Append a single checkpoint row for this run.

    The row carries ``processing_timestamp`` (wall-clock time of this call as an
    ISO string) and ``last_processed_timestamp`` (the watermark passed in). It is
    appended as parquet under ``checkpoint_path``.
    """
    record = {
        "processing_timestamp": datetime.now().isoformat(),
        "last_processed_timestamp": last_processed_timestamp,
    }
    writer = spark.createDataFrame([record]).write
    writer.mode("append").parquet(checkpoint_path)
    print(f"Checkpoint updated with timestamp: {last_processed_timestamp}")

def apply_scd_type2(spark, current_df, new_df, key_columns, track_columns,
                   effective_from_col="effective_from",
                   effective_to_col="effective_to",
                   current_flag_col="is_current"):
    """
    Apply SCD Type 2 logic to merge current and new data

    Parameters:
    - spark: SparkSession (kept for interface compatibility; the merge uses the
      DataFrame API and registers no temp views)
    - current_df: Current dimension table (business columns plus the three SCD
      columns named below)
    - new_df: New data to be merged (business columns only)
    - key_columns: List of columns that uniquely identify a record
    - track_columns: List of columns to track changes
    - effective_from_col: Column name for effective from date
    - effective_to_col: Column name for effective to date
    - current_flag_col: Column name for current flag

    Returns:
    - DataFrame with SCD Type 2 applied, with the columns of current_df:
      * history rows (flag not TRUE) are kept as-is;
      * active rows whose tracked columns changed are expired (flag FALSE,
        effective_to = now) and a new active version is added;
      * active rows that did not change, including keys absent from new_df,
        are kept as-is;
      * incoming keys with no active row become new active rows.

    Raises:
    - ValueError: if key_columns is empty.

    NOTE: incoming rows are de-duplicated on key + tracked columns. If one batch
    carries several *different* tracked values for the same key, each of them
    becomes an active version.
    """
    key_columns = list(key_columns)
    track_columns = list(track_columns)
    if not key_columns:
        raise ValueError("key_columns must contain at least one column")

    # Repeated (key, tracked attributes) rows, e.g. the same driver on many race
    # rows with different processed_timestamp values, describe one member.
    incoming = new_df.dropDuplicates(key_columns + track_columns)

    def _open_version(df):
        # Stamp rows as the current, open-ended version.
        return (df.withColumn(effective_from_col, F.current_timestamp())
                  .withColumn(effective_to_col, F.lit(None).cast("timestamp"))
                  .withColumn(current_flag_col, F.lit(True)))

    # If current_df is empty, initialize with new data
    if current_df.rdd.isEmpty():
        return _open_version(incoming)

    is_active = F.col(current_flag_col) == F.lit(True)
    active = current_df.filter(is_active)
    # Anything not explicitly TRUE (FALSE or NULL) is kept as history.
    history = current_df.filter(~F.coalesce(is_active, F.lit(False)))

    join_cond = None
    for col_name in key_columns:
        cond = F.col(f"cur.{col_name}") == F.col(f"new.{col_name}")
        join_cond = cond if join_cond is None else join_cond & cond

    # Null-safe comparison: NULL <-> value counts as a change, NULL vs NULL does
    # not (plain `<>` yields NULL there and would drop the row from every branch).
    change_cond = F.lit(False)
    for col_name in track_columns:
        change_cond = change_cond | ~F.col(f"cur.{col_name}").eqNullSafe(F.col(f"new.{col_name}"))

    changed = (active.alias("cur")
               .join(incoming.alias("new"), join_cond, "inner")
               .filter(change_cond))

    changed_keys = changed.select(
        *[F.col(f"cur.{c}").alias(c) for c in key_columns]
    ).distinct()

    # 1. Expire active records that have changes (columns replaced in place,
    #    so the schema matches current_df exactly)
    expired_records = (active.join(changed_keys, on=key_columns, how="left_semi")
                       .withColumn(current_flag_col, F.lit(False))
                       .withColumn(effective_to_col, F.current_timestamp()))

    # 2. Insert new versions of changed records
    new_versions = _open_version(changed.select("new.*"))

    # 3. Insert records whose key has no active version yet
    new_records = _open_version(
        incoming.join(active.select(*key_columns), on=key_columns, how="left_anti")
    )

    # 4. Keep active records untouched by this batch (unchanged or absent from it)
    unchanged_records = active.join(changed_keys, on=key_columns, how="left_anti")

    # Union by column name so differing column order cannot misalign values
    result_df = (expired_records
                 .unionByName(new_versions)
                 .unionByName(new_records)
                 .unionByName(unchanged_records)
                 .unionByName(history))

    return result_df

def _refresh_scd_dimension(spark, source_df, dim_path, label, key_columns, track_columns):
    """
    Merge one batch of dimension rows into the SCD Type 2 table at ``dim_path``.

    Parameters:
    - spark: active SparkSession
    - source_df: batch rows (business columns only)
    - dim_path: parquet directory of the dimension
    - label: lower-case name used in progress messages (e.g. "driver")
    - key_columns / track_columns: passed through to ``apply_scd_type2``

    Only a failure to *read* the existing dimension means "not initialized yet".
    Errors raised while merging propagate, instead of triggering a
    re-initialization that would overwrite all history with this batch alone.
    """
    try:
        current_dim = spark.read.parquet(dim_path)
    except Exception:
        current_dim = None

    if current_dim is None:
        # Initialize dimension if it doesn't exist (one row per key/attributes)
        print(f"Initializing {label} dimension")
        updated_dim = (source_df.dropDuplicates(key_columns + track_columns)
                       .withColumn("effective_from", F.current_timestamp())
                       .withColumn("effective_to", F.lit(None).cast("timestamp"))
                       .withColumn("is_current", F.lit(True)))
    else:
        merged = apply_scd_type2(
            spark,
            current_dim,
            source_df,
            key_columns=key_columns,
            track_columns=track_columns,
        )
        # The merged plan still reads from dim_path. Overwriting a path the
        # DataFrame lazily reads from either fails analysis or deletes the input
        # before it is read, so materialize the result and cut its lineage first.
        updated_dim = merged.localCheckpoint(eager=True)

    # Write dimension table
    updated_dim.write.mode("overwrite").parquet(dim_path)
    print(f"{label.capitalize()} dimension written to {dim_path}")

def process_driver_dimension(spark, silver_df, gold_path):
    """Process driver dimension with SCD Type 2 from the given silver batch."""
    driver_df = silver_df.select(
        "driver_id",
        "driver_given_name",
        "driver_family_name",
        "driver_full_name",
        "processed_timestamp",
    ).distinct()

    _refresh_scd_dimension(
        spark,
        driver_df,
        f"{gold_path}/dim_driver",
        "driver",
        key_columns=["driver_id"],
        track_columns=["driver_given_name", "driver_family_name", "driver_full_name"],
    )

def process_constructor_dimension(spark, silver_df, gold_path):
    """Process constructor dimension with SCD Type 2 from the given silver batch."""
    constructor_df = silver_df.select(
        "constructor_id",
        "constructor_name",
        "processed_timestamp",
    ).distinct()

    _refresh_scd_dimension(
        spark,
        constructor_df,
        f"{gold_path}/dim_constructor",
        "constructor",
        key_columns=["constructor_id"],
        track_columns=["constructor_name"],
    )

def process_circuit_dimension(spark, silver_df, gold_path):
    """Process circuit dimension with SCD Type 2 from the given silver batch."""
    circuit_df = silver_df.select(
        "circuit_id",
        "circuit_name",
        "circuit_lat",
        "circuit_long",
        "circuit_locality",
        "circuit_country",
        "processed_timestamp",
    ).distinct()

    _refresh_scd_dimension(
        spark,
        circuit_df,
        f"{gold_path}/dim_circuit",
        "circuit",
        key_columns=["circuit_id"],
        track_columns=["circuit_name", "circuit_lat", "circuit_long", "circuit_locality", "circuit_country"],
    )

def process_dimension_tables(spark, silver_path, gold_path, checkpoint_path):
    """
    Incrementally refresh the driver, constructor and circuit SCD Type 2 dimensions.

    Only silver rows whose ``processed_timestamp`` is newer than the
    ``dim_tables`` checkpoint are used. Once all three dimensions are written,
    the checkpoint advances to the batch's maximum ``processed_timestamp``.
    Returns early, without touching the checkpoint, when there is nothing new.
    """
    dim_checkpoint = f"{checkpoint_path}/dim_tables"

    watermark = get_last_processed_timestamp(spark, dim_checkpoint)
    print(f"Last processed timestamp: {watermark}")

    is_new = F.col("processed_timestamp") > F.lit(watermark)
    batch = spark.read.parquet(silver_path).filter(is_new)

    if batch.rdd.isEmpty():
        print("No new data to process")
        return

    print(f"Processing {batch.count()} new records")

    # Watermark to persist after every dimension has been written
    new_watermark = batch.agg(F.max("processed_timestamp")).collect()[0][0]

    # Expose the batch as the `silver_data` temp view for SQL consumers
    batch.createOrReplaceTempView("silver_data")

    for build_dimension in (process_driver_dimension,
                            process_constructor_dimension,
                            process_circuit_dimension):
        build_dimension(spark, batch, gold_path)

    update_checkpoint(spark, dim_checkpoint, new_watermark)

    print("Dimension tables processing complete")

def process_race_results_fact(spark, silver_df, gold_path):
    """
    Process race results fact table incrementally.

    Rows come from the ``silver_data`` temp view (valid records only) and are
    keyed by ``result_id`` = ``season-round-driver_id``. Ids repeated within the
    batch are collapsed to one row, and ids already stored under
    ``{gold_path}/fact_race_results`` are skipped. The table is partitioned by
    ``season``.

    Parameters:
    - spark: active SparkSession
    - silver_df: current silver batch (unused directly; the SQL reads the
      ``silver_data`` view the caller registers from it)
    - gold_path: gold root directory
    """
    # Extract race results data
    race_results_df = spark.sql("""
        SELECT
            CONCAT(season, '-', round, '-', driver_id) as result_id,
            season,
            round,
            race_timestamp,
            circuit_id,
            driver_id,
            constructor_id,
            CAST(position as INT) as position,
            CAST(points as DOUBLE) as points,
            CAST(grid as INT) as grid,
            CAST(laps as INT) as laps,
            status,
            finish_time,
            processed_timestamp
        FROM silver_data
        WHERE is_valid_record = TRUE
    """).dropDuplicates(["result_id"])

    # Define path for race results fact
    race_results_path = f"{gold_path}/fact_race_results"

    # Only a failed read means the table does not exist yet. Errors raised while
    # appending must not fall through to the initial (errorifexists) write.
    try:
        existing_results = spark.read.parquet(race_results_path)
    except Exception:
        existing_results = None

    if existing_results is None:
        # Initialize fact table if it doesn't exist
        print("Initializing race results fact table")
        race_results_df.write.partitionBy("season").parquet(race_results_path)
        print(f"Race results fact written to {race_results_path}")
        return

    # Only keep records that don't exist in the target
    new_results = race_results_df.join(
        existing_results.select("result_id"),
        on=["result_id"],
        how="left_anti",
    )

    # Count once, before the append, so the reported number does not depend on
    # re-reading the target after it has been modified.
    new_count = new_results.count()
    if new_count > 0:
        new_results.write.mode("append").partitionBy("season").parquet(race_results_path)
        print(f"Added {new_count} new records to race results fact")
    else:
        print("No new race results to add")

def process_driver_standings_fact(spark, silver_df, gold_path):
    """
    Process driver standings fact table incrementally.

    For each (season, round, driver) row in the ``silver_data`` temp view (valid
    records only), computes the points of that race, the running season total up
    to that round, and the driver's position within the round ordered by that
    total (ties broken by driver_id). Rows are keyed by ``standing_id`` =
    ``season-round-driver_id``. Ids already stored under
    ``{gold_path}/fact_driver_standings`` are skipped. Partitioned by season.

    Parameters:
    - spark: active SparkSession
    - silver_df: current silver batch (unused directly; the SQL reads the
      ``silver_data`` view the caller registers from it)
    - gold_path: gold root directory

    NOTE(review): running totals only cover rows in this batch. If a batch starts
    mid-season, totals restart from its first round, and standings that are
    already stored are never recomputed.
    """
    # The ranking runs in its own CTE instead of nesting a window function inside
    # another window's ORDER BY. Rounds are ordered numerically in case `round`
    # is stored as a string ("10" would otherwise sort before "2").
    driver_standings_df = spark.sql("""
        WITH race_points AS (
            SELECT
                season,
                round,
                driver_id,
                CAST(points as DOUBLE) as race_points
            FROM silver_data
            WHERE is_valid_record = TRUE
        ),
        cumulative_points AS (
            SELECT
                season,
                round,
                driver_id,
                race_points,
                SUM(race_points) OVER (
                    PARTITION BY season, driver_id
                    ORDER BY CAST(round as INT)
                    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
                ) as total_points
            FROM race_points
        ),
        ranked_points AS (
            SELECT
                *,
                ROW_NUMBER() OVER (
                    PARTITION BY season, round
                    ORDER BY total_points DESC, driver_id
                ) as position
            FROM cumulative_points
        )
        SELECT
            CONCAT(season, '-', round, '-', driver_id) as standing_id,
            season,
            round,
            driver_id,
            race_points,
            total_points,
            position,
            current_timestamp() as processed_timestamp
        FROM ranked_points
    """)

    # Define path for driver standings fact
    driver_standings_path = f"{gold_path}/fact_driver_standings"

    # Only a failed read means the table does not exist yet
    try:
        existing_standings = spark.read.parquet(driver_standings_path)
    except Exception:
        existing_standings = None

    if existing_standings is None:
        # Initialize fact table if it doesn't exist
        print("Initializing driver standings fact table")
        driver_standings_df.write.partitionBy("season").parquet(driver_standings_path)
        print(f"Driver standings fact written to {driver_standings_path}")
        return

    # Only keep records that don't exist in the target
    new_standings = driver_standings_df.join(
        existing_standings.select("standing_id"),
        on=["standing_id"],
        how="left_anti",
    )

    # Count once, before the append modifies the target
    new_count = new_standings.count()
    if new_count > 0:
        new_standings.write.mode("append").partitionBy("season").parquet(driver_standings_path)
        print(f"Added {new_count} new records to driver standings fact")
    else:
        print("No new driver standings to add")

def process_constructor_standings_fact(spark, silver_df, gold_path):
    """
    Process constructor standings fact table incrementally.

    Sums each constructor's points per (season, round) from the ``silver_data``
    temp view (valid records only). It then computes the running season total up
    to that round and the constructor's position within the round ordered by that
    total (ties broken by constructor_id). Rows are keyed by ``standing_id`` =
    ``season-round-constructor_id``. Ids already stored under
    ``{gold_path}/fact_constructor_standings`` are skipped. Partitioned by season.

    Parameters:
    - spark: active SparkSession
    - silver_df: current silver batch (unused directly; the SQL reads the
      ``silver_data`` view the caller registers from it)
    - gold_path: gold root directory

    NOTE(review): running totals only cover rows in this batch. If a batch starts
    mid-season, totals restart from its first round, and standings that are
    already stored are never recomputed.
    """
    # The ranking runs in its own CTE instead of nesting a window function inside
    # another window's ORDER BY. Rounds are ordered numerically in case `round`
    # is stored as a string.
    constructor_standings_df = spark.sql("""
        WITH constructor_points AS (
            SELECT
                season,
                round,
                constructor_id,
                SUM(CAST(points as DOUBLE)) as race_points
            FROM silver_data
            WHERE is_valid_record = TRUE
            GROUP BY season, round, constructor_id
        ),
        cumulative_points AS (
            SELECT
                season,
                round,
                constructor_id,
                race_points,
                SUM(race_points) OVER (
                    PARTITION BY season, constructor_id
                    ORDER BY CAST(round as INT)
                    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
                ) as total_points
            FROM constructor_points
        ),
        ranked_points AS (
            SELECT
                *,
                ROW_NUMBER() OVER (
                    PARTITION BY season, round
                    ORDER BY total_points DESC, constructor_id
                ) as position
            FROM cumulative_points
        )
        SELECT
            CONCAT(season, '-', round, '-', constructor_id) as standing_id,
            season,
            round,
            constructor_id,
            race_points,
            total_points,
            position,
            current_timestamp() as processed_timestamp
        FROM ranked_points
    """)

    # Define path for constructor standings fact
    constructor_standings_path = f"{gold_path}/fact_constructor_standings"

    # Only a failed read means the table does not exist yet
    try:
        existing_standings = spark.read.parquet(constructor_standings_path)
    except Exception:
        existing_standings = None

    if existing_standings is None:
        # Initialize fact table if it doesn't exist
        print("Initializing constructor standings fact table")
        constructor_standings_df.write.partitionBy("season").parquet(constructor_standings_path)
        print(f"Constructor standings fact written to {constructor_standings_path}")
        return

    # Only keep records that don't exist in the target
    new_standings = constructor_standings_df.join(
        existing_standings.select("standing_id"),
        on=["standing_id"],
        how="left_anti",
    )

    # Count once, before the append modifies the target
    new_count = new_standings.count()
    if new_count > 0:
        new_standings.write.mode("append").partitionBy("season").parquet(constructor_standings_path)
        print(f"Added {new_count} new records to constructor standings fact")
    else:
        print("No new constructor standings to add")

def process_fact_tables(spark, silver_path, gold_path, checkpoint_path):
    """
    Incrementally append to the race-results and standings fact tables.

    Uses its own ``fact_tables`` checkpoint. Silver rows newer than the stored
    watermark form the batch. After the three fact builders run, the watermark
    advances to the batch's maximum ``processed_timestamp``. Returns early,
    without touching the checkpoint, when there is nothing new.
    """
    fact_checkpoint = f"{checkpoint_path}/fact_tables"

    watermark = get_last_processed_timestamp(spark, fact_checkpoint)
    print(f"Last processed timestamp for facts: {watermark}")

    batch = spark.read.parquet(silver_path).filter(
        F.col("processed_timestamp") > F.lit(watermark)
    )

    if batch.rdd.isEmpty():
        print("No new data to process for fact tables")
        return

    print(f"Processing {batch.count()} new records for fact tables")

    # Watermark to persist after every fact table has been written
    new_watermark = batch.agg(F.max("processed_timestamp")).collect()[0][0]

    # The fact builders query the batch through the `silver_data` temp view
    batch.createOrReplaceTempView("silver_data")

    for build_fact in (process_race_results_fact,
                       process_driver_standings_fact,
                       process_constructor_standings_fact):
        build_fact(spark, batch, gold_path)

    update_checkpoint(spark, fact_checkpoint, new_watermark)

    print("Fact tables processing complete")

def create_season_summary_view(spark, gold_path):
    """Create season summary view.

    Builds one row per season found in ``fact_race_results``. Each row has the
    distinct race, driver and constructor counts, plus the driver and the
    constructor holding position 1 of the standings at the season's highest
    ``round`` (LEFT JOINed, so they are NULL when no such standing exists).

    The SQL reads the temp views ``fact_driver_standings``, ``dim_driver``,
    ``fact_constructor_standings``, ``dim_constructor`` and
    ``fact_race_results``, which must already be registered (for example by
    ``create_aggregated_views``).

    Parameters:
    - spark: active SparkSession
    - gold_path: gold root; output goes to ``{gold_path}/view_season_summary``
      and is overwritten on every call

    NOTE(review): if ``round`` is stored as a string, ``MAX(round)`` is
    lexicographic ("9" > "10"). Confirm the column type.
    NOTE(review): more than one ``dim_driver`` / ``dim_constructor`` row per id
    would repeat the season row once per matching dimension row.
    """
    season_summary_df = spark.sql("""
        WITH season_winners AS (
            SELECT
                fds.season,
                dd.driver_full_name as champion_driver,
                fds.driver_id as champion_driver_id,
                fds.total_points as champion_points
            FROM fact_driver_standings fds
            JOIN dim_driver dd ON fds.driver_id = dd.driver_id
            WHERE fds.round = (
                SELECT MAX(round)
                FROM fact_driver_standings
                WHERE season = fds.season
            )
            AND fds.position = 1
        ),
        constructor_winners AS (
            SELECT
                fcs.season,
                dc.constructor_name as champion_team,
                fcs.constructor_id as champion_team_id,
                fcs.total_points as champion_team_points
            FROM fact_constructor_standings fcs
            JOIN dim_constructor dc ON fcs.constructor_id = dc.constructor_id
            WHERE fcs.round = (
                SELECT MAX(round)
                FROM fact_constructor_standings
                WHERE season = fcs.season
            )
            AND fcs.position = 1
        ),
        season_stats AS (
            SELECT
                season,
                COUNT(DISTINCT round) as total_races,
                COUNT(DISTINCT driver_id) as total_drivers,
                COUNT(DISTINCT constructor_id) as total_constructors
            FROM fact_race_results
            GROUP BY season
        )
        SELECT
            ss.season,
            ss.total_races,
            ss.total_drivers,
            ss.total_constructors,
            sw.champion_driver,
            sw.champion_driver_id,
            sw.champion_points,
            cw.champion_team,
            cw.champion_team_id,
            cw.champion_team_points
        FROM season_stats ss
        LEFT JOIN season_winners sw ON ss.season = sw.season
        LEFT JOIN constructor_winners cw ON ss.season = cw.season
        ORDER BY ss.season DESC
    """)

    # Write the view (full rebuild: overwrite mode)
    season_summary_df.write.mode("overwrite").parquet(f"{gold_path}/view_season_summary")
    print("Season summary view created")

def create_driver_performance_view(spark, gold_path):
    """Create driver performance view.

    Aggregates ``fact_race_results`` per (season, driver). The aggregates are:
    - distinct races;
    - wins / P2 / P3 / podiums (positions 1-3);
    - finishes in positions 1-10 (``points_finishes``);
    - total points, average and best finishing position;
    - average grid slot, and starts from grid 1 (``poles``).

    It then derives win and podium percentages and points per race, rounded to
    2 decimals.

    Reads the temp views ``fact_race_results`` and ``dim_driver``, which must
    already be registered (for example by ``create_aggregated_views``).

    Parameters:
    - spark: active SparkSession
    - gold_path: gold root; output goes to
      ``{gold_path}/view_driver_performance``, partitioned by season and
      overwritten on every call

    NOTE(review): several ``dim_driver`` rows per driver_id would inflate the
    SUM-based counts (COUNT DISTINCT races is unaffected).
    NOTE(review): the ORDER BY likely has no lasting effect once the result is
    written partitioned. Confirm whether it is needed.
    """
    driver_performance_df = spark.sql("""
        WITH driver_stats AS (
            SELECT
                frr.season,
                frr.driver_id,
                dd.driver_full_name,
                COUNT(DISTINCT CONCAT(frr.season, '-', frr.round)) as races,
                SUM(CASE WHEN frr.position = 1 THEN 1 ELSE 0 END) as wins,
                SUM(CASE WHEN frr.position = 2 THEN 1 ELSE 0 END) as p2,
                SUM(CASE WHEN frr.position = 3 THEN 1 ELSE 0 END) as p3,
                SUM(CASE WHEN frr.position BETWEEN 1 AND 3 THEN 1 ELSE 0 END) as podiums,
                SUM(CASE WHEN frr.position BETWEEN 1 AND 10 THEN 1 ELSE 0 END) as points_finishes,
                SUM(frr.points) as total_points,
                AVG(CAST(frr.position as DOUBLE)) as avg_position,
                MIN(CAST(frr.position as INT)) as best_position,
                AVG(CAST(frr.grid as DOUBLE)) as avg_grid,
                SUM(CASE WHEN frr.grid = 1 THEN 1 ELSE 0 END) as poles
            FROM fact_race_results frr
            JOIN dim_driver dd ON frr.driver_id = dd.driver_id
            GROUP BY frr.season, frr.driver_id, dd.driver_full_name
        )
        SELECT
            ds.*,
            ROUND(ds.wins / NULLIF(ds.races, 0) * 100, 2) as win_percentage,
            ROUND(ds.podiums / NULLIF(ds.races, 0) * 100, 2) as podium_percentage,
            ROUND(ds.total_points / NULLIF(ds.races, 0), 2) as points_per_race
        FROM driver_stats ds
        ORDER BY ds.season DESC, ds.total_points DESC
    """)

    # Write the view (full rebuild, one directory per season)
    driver_performance_df.write.mode("overwrite").partitionBy("season").parquet(f"{gold_path}/view_driver_performance")
    print("Driver performance view created")

def create_constructor_performance_view(spark, gold_path):
    """Create constructor performance view.

    Aggregates ``fact_race_results`` per (season, constructor). The aggregates
    are:
    - distinct races and wins;
    - podium rows (positions 1-3) and rows finishing in positions 1-10;
    - total points and distinct drivers used.

    It then derives win and podium percentages and points per race, rounded to
    2 decimals.

    Reads the temp views ``fact_race_results`` and ``dim_constructor``, which
    must already be registered (for example by ``create_aggregated_views``).

    Parameters:
    - spark: active SparkSession
    - gold_path: gold root; output goes to
      ``{gold_path}/view_constructor_performance``, partitioned by season and
      overwritten on every call

    NOTE(review): ``podiums`` counts result rows, so two cars of one constructor
    on the same podium count twice. ``podium_percentage`` (podiums / distinct
    races) can therefore exceed 100.
    """
    constructor_performance_df = spark.sql("""
        WITH constructor_stats AS (
            SELECT
                frr.season,
                frr.constructor_id,
                dc.constructor_name,
                COUNT(DISTINCT CONCAT(frr.season, '-', frr.round)) as races,
                SUM(CASE WHEN frr.position = 1 THEN 1 ELSE 0 END) as wins,
                SUM(CASE WHEN frr.position BETWEEN 1 AND 3 THEN 1 ELSE 0 END) as podiums,
                SUM(CASE WHEN frr.position BETWEEN 1 AND 10 THEN 1 ELSE 0 END) as points_finishes,
                SUM(frr.points) as total_points,
                COUNT(DISTINCT frr.driver_id) as drivers_used
            FROM fact_race_results frr
            JOIN dim_constructor dc ON frr.constructor_id = dc.constructor_id
            GROUP BY frr.season, frr.constructor_id, dc.constructor_name
        )
        SELECT
            cs.*,
            ROUND(cs.wins / NULLIF(cs.races, 0) * 100, 2) as win_percentage,
            ROUND(cs.podiums / NULLIF(cs.races, 0) * 100, 2) as podium_percentage,
            ROUND(cs.total_points / NULLIF(cs.races, 0), 2) as points_per_race
        FROM constructor_stats cs
        ORDER BY cs.season DESC, cs.total_points DESC
    """)

    # Write the view (full rebuild, one directory per season)
    constructor_performance_df.write.mode("overwrite").partitionBy("season").parquet(f"{gold_path}/view_constructor_performance")
    print("Constructor performance view created")

def create_circuit_statistics_view(spark, gold_path):
    """Create circuit statistics view.

    Uses only winning rows (``position = 1``) of ``fact_race_results``. Per
    circuit it reports:
    - seasons used and total races;
    - average laps of the winner;
    - the number of distinct winning drivers and constructors;
    - the driver and constructor with the most wins there.

    Reads the temp views ``fact_race_results``, ``dim_circuit``, ``dim_driver``
    and ``dim_constructor``, which must already be registered (for example by
    ``create_aggregated_views``).

    Parameters:
    - spark: active SparkSession
    - gold_path: gold root; output goes to
      ``{gold_path}/view_circuit_statistics`` and is overwritten on every call

    NOTE(review): races without a position-1 row are not counted in
    ``total_races``. Ties for "most successful" are broken arbitrarily by
    ROW_NUMBER.
    """
    circuit_statistics_df = spark.sql("""
        WITH circuit_stats AS (
            SELECT
                frr.circuit_id,
                dc.circuit_name,
                dc.circuit_country,
                COUNT(DISTINCT frr.season) as seasons_used,
                COUNT(DISTINCT CONCAT(frr.season, '-', frr.round)) as total_races,
                AVG(CAST(frr.laps as DOUBLE)) as avg_race_laps,
                COUNT(DISTINCT frr.driver_id) as different_winners,
                COUNT(DISTINCT frr.constructor_id) as different_winning_constructors
            FROM fact_race_results frr
            JOIN dim_circuit dc ON frr.circuit_id = dc.circuit_id
            WHERE frr.position = 1
            GROUP BY frr.circuit_id, dc.circuit_name, dc.circuit_country
        ),
        most_successful_drivers AS (
            SELECT
                frr.circuit_id,
                dd.driver_full_name,
                COUNT(*) as wins,
                ROW_NUMBER() OVER (PARTITION BY frr.circuit_id ORDER BY COUNT(*) DESC) as rank
            FROM fact_race_results frr
            JOIN dim_driver dd ON frr.driver_id = dd.driver_id
            WHERE frr.position = 1
            GROUP BY frr.circuit_id, dd.driver_full_name
        ),
        most_successful_constructors AS (
            SELECT
                frr.circuit_id,
                dc.constructor_name,
                COUNT(*) as wins,
                ROW_NUMBER() OVER (PARTITION BY frr.circuit_id ORDER BY COUNT(*) DESC) as rank
            FROM fact_race_results frr
            JOIN dim_constructor dc ON frr.constructor_id = dc.constructor_id
            WHERE frr.position = 1
            GROUP BY frr.circuit_id, dc.constructor_name
        )
        SELECT
            cs.*,
            msd.driver_full_name as most_successful_driver,
            msd.wins as driver_wins,
            msc.constructor_name as most_successful_constructor,
            msc.wins as constructor_wins
        FROM circuit_stats cs
        LEFT JOIN most_successful_drivers msd ON cs.circuit_id = msd.circuit_id AND msd.rank = 1
        LEFT JOIN most_successful_constructors msc ON cs.circuit_id = msc.circuit_id AND msc.rank = 1
        ORDER BY cs.total_races DESC
    """)

    # Write the view (full rebuild: overwrite mode)
    circuit_statistics_df.write.mode("overwrite").parquet(f"{gold_path}/view_circuit_statistics")
    print("Circuit statistics view created")

def create_aggregated_views(spark, gold_path, checkpoint_path):
    """
    Rebuild every analytics view from the gold dimension and fact tables.

    All six gold tables are read first. Dimensions are then exposed as temp views
    filtered to ``is_current == True``, and facts as unfiltered temp views. Each
    analytics view is fully recomputed, and the ``aggregated_views`` checkpoint
    records the wall-clock time of the run. The stored watermark is only
    printed; it does not limit the rebuild. Any error is printed and re-raised.
    """
    views_checkpoint = f"{checkpoint_path}/aggregated_views"

    previous_run = get_last_processed_timestamp(spark, views_checkpoint)
    print(f"Last processed timestamp for aggregated views: {previous_run}")

    dimension_names = ("dim_driver", "dim_constructor", "dim_circuit")
    fact_names = ("fact_race_results", "fact_driver_standings", "fact_constructor_standings")

    try:
        # Read every table before registering any view
        tables = {
            name: spark.read.parquet(f"{gold_path}/{name}")
            for name in dimension_names + fact_names
        }

        for name in dimension_names:
            tables[name].filter(F.col("is_current") == True).createOrReplaceTempView(name)
        for name in fact_names:
            tables[name].createOrReplaceTempView(name)

        for build_view in (create_season_summary_view,
                           create_driver_performance_view,
                           create_constructor_performance_view,
                           create_circuit_statistics_view):
            build_view(spark, gold_path)

        update_checkpoint(spark, views_checkpoint, datetime.now().isoformat())

        print("Aggregated views creation complete")

    except Exception as e:
        print(f"Error creating aggregated views: {str(e)}")
        raise

def ensure_directories_exist(spark, paths):
    """
    Best-effort creation of each directory in ``paths`` via the Hadoop FileSystem API.

    Directories that already exist are left alone. A failure on one path is
    printed and does not stop processing of the remaining paths.
    """
    for directory in paths:
        try:
            jvm_path = spark._jvm.org.apache.hadoop.fs.Path(directory)
            filesystem = jvm_path.getFileSystem(spark._jsc.hadoopConfiguration())
            if filesystem.exists(jvm_path):
                continue
            filesystem.mkdirs(jvm_path)
            print(f"Created directory: {directory}")
        except Exception as e:
            print(f"Error creating directory {directory}: {str(e)}")

def process_gold_layer_incremental(silver_path, gold_path, checkpoint_path, sample_season=2024):
    """
    Main function to process the gold layer incrementally

    Runs, in order: dimension tables (SCD Type 2), fact tables, and aggregated
    views. It then prints small samples of the results.

    Parameters:
    - silver_path: parquet directory of the silver layer
    - gold_path: root directory for gold tables and views
    - checkpoint_path: root directory for the per-stage checkpoints
    - sample_season: season shown in the fact-table sample (default 2024)

    Any exception is printed and re-raised.
    """
    # Initialize Spark
    spark = initialize_spark()

    # Create directories if they don't exist
    ensure_directories_exist(spark, [
        gold_path,
        checkpoint_path,
        f"{checkpoint_path}/dim_tables",
        f"{checkpoint_path}/fact_tables",
        f"{checkpoint_path}/aggregated_views"
    ])

    try:
        print("Starting Gold Layer Processing...")

        # "\n" (not "\\n") so a real blank line precedes each stage header
        print("\nProcessing dimension tables...")
        process_dimension_tables(spark, silver_path, gold_path, checkpoint_path)

        print("\nProcessing fact tables...")
        process_fact_tables(spark, silver_path, gold_path, checkpoint_path)

        print("\nCreating aggregated views...")
        create_aggregated_views(spark, gold_path, checkpoint_path)

        print("\nGold Layer Processing Complete!")

        # Show sample of processed data
        print("\nSample of dimension data:")
        spark.read.parquet(f"{gold_path}/dim_driver").filter(F.col("is_current") == True).show(5)

        print("\nSample of fact data:")
        spark.read.parquet(f"{gold_path}/fact_race_results").filter(F.col("season") == sample_season).show(5)

        print("\nSample of aggregated view:")
        spark.read.parquet(f"{gold_path}/view_season_summary").show(5)

    except Exception as e:
        print(f"Error in Gold Layer Processing: {str(e)}")
        raise

if __name__ == "__main__":
    # Google Drive locations of the Colab capstone layers. Kept as module-level
    # names so later notebook cells can reuse them.
    silver_path = "/content/drive/MyDrive/Capstone/silver"
    gold_path = "/content/drive/MyDrive/Capstone/gold"
    checkpoint_path = "/content/drive/MyDrive/Capstone/checkpoint/gold"

    # Run the full incremental gold-layer pipeline
    process_gold_layer_incremental(
        silver_path=silver_path,
        gold_path=gold_path,
        checkpoint_path=checkpoint_path,
    )

Created directory: /content/drive/MyDrive/Capstone/gold
Created directory: /content/drive/MyDrive/Capstone/checkpoint/gold
Created directory: /content/drive/MyDrive/Capstone/checkpoint/gold/dim_tables
Created directory: /content/drive/MyDrive/Capstone/checkpoint/gold/fact_tables
Created directory: /content/drive/MyDrive/Capstone/checkpoint/gold/aggregated_views
Starting Gold Layer Processing...
\nProcessing dimension tables...
Last processed timestamp: 1970-01-01T00:00:00
Processing 25514 new records
Initializing driver dimension
Driver dimension written to /content/drive/MyDrive/Capstone/gold/dim_driver
Initializing constructor dimension
Constructor dimension written to /content/drive/MyDrive/Capstone/gold/dim_constructor
Initializing circuit dimension
Circuit dimension written to /content/drive/MyDrive/Capstone/gold/dim_circuit
Checkpoint updated with timestamp: 2025-05-13 05:35:38.509089
Dimension tables processing complete
\nProcessing fact tables...
Last processed timestamp for fa