##### 1. Import Required Libraries

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType, 
    StructField, 
    LongType, 
    StringType, 
    BooleanType, 
    TimestampType, 
    IntegerType, 
    FloatType, 
    DoubleType
)
from pyspark.sql.functions import (
    lit, 
    current_timestamp, 
    col, 
    when, 
    row_number, 
    lag, 
    broadcast
)
from pyspark.sql import functions as F
from pyspark.sql.window import Window

StatementMeta(, 360bc22a-55d4-49d9-99af-7aa4b0b05708, 5, Finished, Available, Finished)

##### 2. Creating Dimension Tables

In [4]:
def create_dim_author(sdf, existing_dim_table=None):
    """
    Implements SCD Type 2 logic for the author dimension without using window functions.

    A current dimension row counts as changed when the incoming ``author_name``
    for the same ``sk_author_id`` differs from the stored one. The comparison is
    null-safe, so a name going from NULL to a value (or back) is also a change.

    :param sdf: Source DataFrame containing new author records; must expose
                ``pk_author_id``, ``sk_author_id`` and ``author_name``
    :param existing_dim_table: Full table name of existing dimension (e.g., "mydb.dim_author").
                When falsy, or when the table does not exist, the incoming
                batch is returned as the initial load.
    :return: DataFrame with updated SCD2-compliant data: historical rows,
             untouched current rows, newly expired rows, and the rows to insert
             (brand-new authors plus new versions of changed authors)
    """
    key = "sk_author_id"
    # Column expression, evaluated when the resulting DataFrame is executed.
    now = current_timestamp()

    # Prepare new incoming data: one open-ended, current row per author key.
    new_data = (
        sdf.select("pk_author_id", key, "author_name")
        .withColumn("valid_from", now)
        .withColumn("valid_to", lit('9999-12-31 23:59:59.999').cast("timestamp"))
        .withColumn("is_current", lit(True))
        .dropDuplicates([key])
    )

    if not existing_dim_table or not spark.catalog.tableExists(existing_dim_table):
        return new_data

    # Load existing dimension table and split it into current vs. historical rows.
    existing_data = spark.table(existing_dim_table)
    current_records = existing_data.filter("is_current = True")
    # Everything that is not current (including a NULL flag) is carried over
    # untouched, so earlier versions of a key are never lost from the result.
    historical_records = existing_data.filter("NOT (is_current <=> True)")

    # Keys whose current row differs from the incoming row. `<=>` is the
    # null-safe equality operator; plain `!=` yields NULL when either side is
    # NULL and would silently skip such changes.
    changed_keys = (
        new_data.alias("n")
        .join(current_records.alias("e"), on=key)
        .filter("NOT (e.author_name <=> n.author_name)")
        .select(key)
    )

    # Current rows being superseded: close them as of `now`.
    records_to_expire = (
        current_records.join(changed_keys, on=key, how="left_semi")
        .withColumn("is_current", lit(False))
        .withColumn("valid_to", now)
    )

    # New versions of changed records.
    changed_records = new_data.join(changed_keys, on=key, how="left_semi")

    # Truly new records (sk_author_id not in the current rows).
    new_sk_ids = new_data.join(current_records, on=key, how="left_anti")

    # Current rows that this batch leaves untouched.
    unchanged_current = current_records.join(changed_keys, on=key, how="left_anti")

    # unionByName aligns columns by name, so the differing column order produced
    # by the joins does not matter; the result follows the table's column order.
    final_dim_df = (
        historical_records
        .unionByName(unchanged_current)
        .unionByName(records_to_expire)
        .unionByName(new_sk_ids)
        .unionByName(changed_records)
    )

    return final_dim_df



def create_dim_date(sdf):
    """
    Project the date dimension out of the source rows.

    :param sdf: Source DataFrame exposing the date columns selected below
    :return: DataFrame with the date columns, de-duplicated on ``sk_date_id``
    """
    date_columns = ["pk_date_id", "sk_date_id", "year", "month", "day", "weekday"]
    projected = sdf.select(*date_columns)
    return projected.dropDuplicates(["sk_date_id"])

def create_dim_time(sdf):
    """
    Project the time dimension out of the source rows.

    :param sdf: Source DataFrame exposing the time columns selected below
    :return: DataFrame with the time columns, de-duplicated on ``sk_time_id``
    """
    time_columns = ["pk_time_id", "sk_time_id", "hour"]
    projected = sdf.select(*time_columns)
    return projected.dropDuplicates(["sk_time_id"])

def create_dim_post(sdf):
    """
    Project the post dimension out of the source rows.

    :param sdf: Source DataFrame exposing the post columns selected below
    :return: DataFrame with the post columns, de-duplicated on ``sk_post_id``
    """
    post_columns = [
        "pk_post_id",
        "sk_post_id",
        "title",
        "title_length",
        "link_flair_text",
        "url",
    ]
    projected = sdf.select(*post_columns)
    return projected.dropDuplicates(["sk_post_id"])

def create_dim_engagement_attributes(sdf):
    """
    Project the engagement-attributes dimension out of the source rows.

    :param sdf: Source DataFrame exposing the engagement columns selected below
    :return: DataFrame with the engagement columns, de-duplicated on
             ``sk_engagement_id``
    """
    engagement_columns = [
        "pk_engagement_id",
        "sk_engagement_id",
        "is_adult_content",
        "is_spoiler",
        "is_stickied",
        "has_awards",
        "has_crossposts",
    ]
    projected = sdf.select(*engagement_columns)
    return projected.dropDuplicates(["sk_engagement_id"])

StatementMeta(, 360bc22a-55d4-49d9-99af-7aa4b0b05708, 6, Finished, Available, Finished)

##### 3. Save dimension tables using merge


In [5]:
def save_dim_table(df, table_name, database_name, partition_columns=None, match_condition=None):
    """
    Persist a dimension DataFrame as a Delta table, creating it on first use
    and upserting into it with a Spark SQL MERGE afterwards.

    :param df: DataFrame to save
    :param table_name: Table name (without the database prefix)
    :param database_name: Database name; created if it does not exist
    :param partition_columns: List of columns to partition by (only applied
                              when the table is created)
    :param match_condition: SQL condition for matching records
                            (e.g., "target.id = source.id"); required once the
                            table already exists
    :raises ValueError: if the table exists and no match_condition was given
    """
    spark.sql(f"CREATE DATABASE IF NOT EXISTS {database_name}")
    full_table_name = f"{database_name}.{table_name}"

    # First load: there is nothing to merge into, so write the table directly.
    if not spark.catalog.tableExists(full_table_name):
        delta_writer = df.write.format("delta")
        if partition_columns:
            delta_writer = delta_writer.partitionBy(*partition_columns)
        delta_writer.mode("overwrite").saveAsTable(full_table_name)
        print(f"Table {full_table_name} created.")
        return

    # Subsequent loads: upsert, which needs to know how rows are matched.
    if not match_condition:
        raise ValueError("match_condition is required for upsert operations.")

    # MERGE is issued through SQL, so the source frame is exposed as a temp view.
    df.createOrReplaceTempView("source_view")

    merge_sql = f"""
        MERGE INTO {full_table_name} AS target
        USING source_view AS source
        ON {match_condition}
        WHEN MATCHED THEN
          UPDATE SET *
        WHEN NOT MATCHED THEN
          INSERT *
        """
    spark.sql(merge_sql)

    print(f"Upsert completed for table {full_table_name}.")

def save_SCD_2_dim_table(df, table_name, database_name, partition_columns=None, match_condition=None):
    """
    Save an SCD Type 2 dimension DataFrame as a Delta table in Microsoft Fabric
    or Databricks, creating it on first use and merging into it afterwards.

    ``df`` is expected to hold row *versions*: for a changed key it contains
    both the expired old version and the new current version. Versions are
    therefore matched on the business key (``match_condition``) *and*
    ``valid_from``. Matching on the key and ``target.is_current`` alone would
    make both source versions hit the same target row, which Delta's MERGE
    rejects, and would overwrite the current row in place instead of keeping
    history.

    With this matching:
      * an expired version matches its stored row and closes it
        (``valid_to`` / ``is_current`` are updated),
      * an untouched version matches itself (no-op update),
      * a new version or a new key matches nothing and is inserted.

    :param df: DataFrame to save; must contain a ``valid_from`` column when
               the table already exists
    :param table_name: Table name
    :param database_name: Database name; created if it does not exist
    :param partition_columns: List of columns to partition by (only applied
                              when the table is created)
    :param match_condition: SQL condition for matching on the business key
                            (e.g., "target.sk_author_id = source.sk_author_id")
    :raises ValueError: if the table exists and ``match_condition`` is missing,
                        or ``df`` has no ``valid_from`` column
    """
    spark.sql(f"CREATE DATABASE IF NOT EXISTS {database_name}")
    full_table_name = f"{database_name}.{table_name}"

    if not spark.catalog.tableExists(full_table_name):
        writer = df.write.format("delta")
        if partition_columns:
            writer = writer.partitionBy(*partition_columns)
        writer.mode("overwrite").saveAsTable(full_table_name)
        print(f"Table {full_table_name} created.")
        return

    if not match_condition:
        raise ValueError("match_condition is required for upsert operations.")

    columns = df.columns
    if "valid_from" not in columns:
        raise ValueError("df must contain a 'valid_from' column to identify SCD2 row versions.")

    # Register the source DataFrame so it can be referenced from SQL.
    df.createOrReplaceTempView("source_view")

    # Dynamically build column mappings. Names are back-quoted so unusual
    # column names stay valid SQL; the loop variable is not called `col` to
    # avoid confusion with pyspark.sql.functions.col.
    update_clause = ", ".join(f"target.`{name}` = source.`{name}`" for name in columns)
    insert_cols = ", ".join(f"`{name}`" for name in columns)
    insert_vals = ", ".join(f"source.`{name}`" for name in columns)

    # The caller's condition is parenthesised so an OR inside it cannot escape
    # the version match that follows.
    merge_sql = f"""
    MERGE INTO {full_table_name} AS target
    USING source_view AS source
    ON ({match_condition}) AND target.valid_from = source.valid_from
    WHEN MATCHED THEN
      UPDATE SET {update_clause}
    WHEN NOT MATCHED THEN
      INSERT ({insert_cols}) VALUES ({insert_vals})
    """
    spark.sql(merge_sql)
    print(f"Upsert completed for table {full_table_name}.")



StatementMeta(, 360bc22a-55d4-49d9-99af-7aa4b0b05708, 7, Finished, Available, Finished)

##### 4. Create the fact table


In [6]:
def create_fact_post_metrics(sdf, dim_author, dim_date, dim_time, dim_post, dim_engagement_attributes):
    """
    Build the post-metrics fact rows by looking up each dimension's primary key.

    Every source row is left-joined to each dimension on its ``sk_*`` key, so a
    source row without a matching dimension row is kept with a NULL ``pk_*``.

    :param sdf: Source DataFrame holding the ``sk_*`` keys and the measures
    :param dim_author: Author dimension; when it carries an ``is_current``
                       column (SCD Type 2) only current rows are used
    :param dim_date: Date dimension
    :param dim_time: Time dimension
    :param dim_post: Post dimension
    :param dim_engagement_attributes: Engagement-attributes dimension
    :return: DataFrame with the dimension primary keys, the ``sk_*`` keys and
             the measure columns
    """
    # An SCD2 dimension holds several versions per sk_author_id (expired and
    # current). Joining on the key alone would emit one fact row per version,
    # so the lookup is restricted to the current version.
    if "is_current" in dim_author.columns:
        dim_author = dim_author.filter("is_current = True")

    # Defining alias
    dim_author = dim_author.alias("da")
    dim_date = dim_date.alias("dd")
    dim_time = dim_time.alias("dt")
    dim_post = dim_post.alias("dp")
    dim_engagement_attributes = dim_engagement_attributes.alias("de")
    new_data = sdf.alias("nd")

    # Perform joins. Every dimension is hinted for broadcast.
    # NOTE(review): dim_post is de-duplicated per post and may not be small
    # enough to broadcast — confirm its size.
    fact_table = new_data \
        .join(broadcast(dim_author), col("nd.sk_author_id") == col("da.sk_author_id"), "left") \
        .join(broadcast(dim_date), col("nd.sk_date_id") == col("dd.sk_date_id"), "left") \
        .join(broadcast(dim_time), col("nd.sk_time_id") == col("dt.sk_time_id"), "left") \
        .join(broadcast(dim_post), col("nd.sk_post_id") == col("dp.sk_post_id"), "left") \
        .join(broadcast(dim_engagement_attributes), col("nd.sk_engagement_id") == col("de.sk_engagement_id"), "left") \
        .select(
            col("da.pk_author_id"),
            col("dd.pk_date_id"),
            col("dt.pk_time_id"),
            col("dp.pk_post_id"),
            col("de.pk_engagement_id"),
            col("nd.Submission_Fct_id"),
            col("nd.sk_author_id"),
            col("nd.sk_date_id"),
            col("nd.sk_time_id"),
            col("nd.sk_post_id"),
            col("nd.sk_engagement_id"),
            col("nd.engagement_score"),
            col("nd.award_rate"),
            col("nd.score_upvote_ratio"),
            col("nd.engagement_ratio"),
            col("nd.Total_Awards_Received"),
            col("nd.Gilded_Count"),
            col("nd.Number_of_Crossposts"),
            col("nd.score"),
            col("nd.upvote_ratio"),
            col("nd.num_comments")
        )

    return fact_table

StatementMeta(, 360bc22a-55d4-49d9-99af-7aa4b0b05708, 8, Finished, Available, Finished)

##### 5. Incrementally save fact table

In [7]:
def incremental_load_fact_table(new_data, table_name, database_name, partition_columns=None):
    """
    SCD Type 2 incremental load with support for multiple source rows per key.

    On first use the table is created from ``new_data``. Afterwards the load
    runs in two separate steps: active target rows whose ``Submission_Fct_id``
    appears in the batch are expired with a MERGE, then the whole batch is
    appended as the new current rows. The two steps are separate writes, so
    they are not applied atomically.

    :param new_data: DataFrame of fact rows; must contain ``Submission_Fct_id``
    :param table_name: Table name
    :param database_name: Database name; created if it does not exist
    :param partition_columns: List of columns to partition by (only applied
                              when the table is created)
    """
    # Same guard as the dimension savers, so this function also works when it
    # is the first writer into a fresh database.
    spark.sql(f"CREATE DATABASE IF NOT EXISTS {database_name}")

    full_table_name = f"{database_name}.{table_name}"
    table_exists = spark.catalog.tableExists(full_table_name)

    # Add SCD2 columns to new data.
    # NOTE(review): valid_to is written as a string literal here (no cast to
    # timestamp, unlike the author dimension). It is left as-is because the
    # MERGE below compares against the same string and an existing table
    # already has this column type.
    new_data_with_flags = new_data.withColumn("valid_from", current_timestamp()) \
                                  .withColumn("valid_to", lit("9999-12-31 23:59:59.999")) \
                                  .withColumn("is_current", lit(True))

    if not table_exists:
        writer = new_data_with_flags.write.format("delta").mode("overwrite")
        if partition_columns:
            writer = writer.partitionBy(*partition_columns)
        writer.saveAsTable(full_table_name)
        print(f"✅ Created table: {full_table_name}")
        return

    # Step 1: Deduplicate source keys for safe merge. MERGE must not see two
    # source rows for the same target row, so only distinct keys are used.
    new_data_with_flags.select("Submission_Fct_id").distinct().createOrReplaceTempView("deduped_source_keys")

    # Step 2: Expire active target records matching deduped source keys
    spark.sql(f"""
    MERGE INTO {full_table_name} AS target
    USING deduped_source_keys AS source
    ON target.Submission_Fct_id = source.Submission_Fct_id
        AND target.valid_to = '9999-12-31 23:59:59.999'
        AND target.is_current = true
    WHEN MATCHED THEN UPDATE SET
        target.valid_to = current_timestamp(),
        target.is_current = false
    """)

    # Step 3: Append all new source records (including duplicates)
    new_data_with_flags.write.format("delta").mode("append").saveAsTable(full_table_name)
    print(f"✅ Incremental SCD2 load completed for table: {full_table_name}")


StatementMeta(, 360bc22a-55d4-49d9-99af-7aa4b0b05708, 9, Finished, Available, Finished)

##### 6. Execute full pipeline

In [8]:

# Select Silver_Layer.transformed_data to perform DWH loading.
sdf = spark.sql("SELECT * FROM Silver_Layer.transformed_data")

# Create and save all dimension and fact tables in the gold database.
# Each table is created on the first run. On later runs the dimensions are
# merged (upserted) into their tables, and the fact table has its matching
# active rows expired before the new batch is appended.
database_name = "gold_dimensional_modeling"

# Create dimension tables. The existing author dimension is looked up in the
# same database the tables are saved to, so the name is derived from
# database_name rather than repeated as a literal.
dim_author = create_dim_author(sdf, f"{database_name}.dim_author")
dim_date = create_dim_date(sdf)
dim_time = create_dim_time(sdf)
dim_post = create_dim_post(sdf)
dim_engagement_attributes = create_dim_engagement_attributes(sdf)

# Save dimension tables (create on first run, merge afterwards)
save_SCD_2_dim_table(dim_author, table_name="dim_author", database_name=database_name, partition_columns=["valid_from", "valid_to"], match_condition="target.sk_author_id = source.sk_author_id")
save_dim_table(dim_date, "dim_date", database_name, partition_columns=["year", "month"], match_condition="target.sk_date_id == source.sk_date_id")
save_dim_table(dim_time, "dim_time", database_name, match_condition="target.sk_time_id == source.sk_time_id")
save_dim_table(dim_post, "dim_post", database_name, match_condition="target.sk_post_id == source.sk_post_id")
save_dim_table(dim_engagement_attributes, "dim_engagement_attributes", database_name, match_condition="target.sk_engagement_id == source.sk_engagement_id")


# Create fact table
new_data = create_fact_post_metrics(sdf, dim_author, dim_date, dim_time, dim_post, dim_engagement_attributes)

# Save fact table (with existence check and partitioning)
incremental_load_fact_table(new_data, "fact_Submissions", database_name=database_name, partition_columns= ["valid_from", "valid_to"])

StatementMeta(, 360bc22a-55d4-49d9-99af-7aa4b0b05708, 10, Finished, Available, Finished)

Upsert completed for table gold_dimensional_modeling.dim_author.
Upsert completed for table gold_dimensional_modeling.dim_date.
Upsert completed for table gold_dimensional_modeling.dim_time.
Upsert completed for table gold_dimensional_modeling.dim_post.
Upsert completed for table gold_dimensional_modeling.dim_engagement_attributes.
✅ Incremental SCD2 load completed for table: gold_dimensional_modeling.fact_Submissions
