#### Dataset Refresh Facts

##### Data ingestion strategy:
<mark style="background: #88D5FF;">**MERGE**</mark>

##### Related pipeline:

**Ext_Load_PBI_Dataset_Refresh_E2E**

##### Source:

**Files** in the FUAM_Ext_Lakehouse folder given by the **bronze_file_location** variable

##### Target:

**1 Delta table** in FUAM_Ext_Lakehouse
- named by the **gold_table_name** variable


In [None]:
## Parameters
# Defaults for interactive/debug runs.
# NOTE(review): presumably overridden by the calling pipeline — confirm.
display_data = True

bronze_file_location = "Files/raw/dataset_refresh/"
gold_table_name = "dataset_refreshes"

# Sample dataset IDs, one per test scenario; uncomment exactly one.

# use case valid-data
# dataset_id = "2cfa112f-405c-4367-876f-d5591a270bab"

# use case empty-array
# dataset_id = "1ECDC217-48FF-4308-8320-CD30305E1F3C"

# use case service-exception
# dataset_id = "0270E8BE-EA7A-47DC-B698-6FD6A8D1A372"

# use case expand-array-issue
dataset_id = "5A03CB75-6CB9-410A-B5BC-13244F6C2D5E"

print("Successfully configured all parameters for this run.")

In [None]:
## Import all packages used in this notebook
import datetime
from delta.tables import DeltaTable
from notebookutils import mssparkutils # type: ignore
from pyspark.sql.functions import * # type ignore
from pyspark.sql.types import * # type ignore
from pyspark.sql.window import Window # type ignore
from pyspark.sql import SparkSession # type: ignore
import random
import re
import time

print("Successfully imported all packages for this notebook.")

In [None]:
#
# Create (or reuse) the Spark session for this notebook
#
app_name = "TransferDatasetRefreshes"

# getOrCreate() hands back the already-active session when one exists
session_builder = SparkSession.builder.appName(app_name)
spark = session_builder.getOrCreate()

print(f"Spark session {app_name} has been created successfully.")

In [None]:
#
# Function to check whether the gold layer table already exists
#
def gold_table_exists(gold_table_name: str, spark) -> bool:
    """
    Report whether ``gold_table_name`` is registered in the FUAM_Ext_Lakehouse catalog.

    Args:
        gold_table_name (str): Unqualified name of the table to look up.
        spark (SparkSession): Active Spark session used to reach the catalog.

    Returns:
        bool: True when the table exists, otherwise False.
    """
    # NOTE: this goes through the JVM-side catalog via the non-public `_jsparkSession` handle.
    jvm_catalog = spark._jsparkSession.catalog()
    return jvm_catalog.tableExists('FUAM_Ext_Lakehouse', gold_table_name)

print("The function 'gold_table_exists' has been created successfully.")

In [None]:
#
# Function to get the refresh date for UPSERT processing from the SILVER and GOLD layers
#
def get_refresh_date(gold_table_name: str, dataset_id: str, silver_df, spark) -> "datetime.date | None":
    """
    Determine the earliest CreationDate that must be (re)merged into the gold table.

    If the gold table already holds rows for this dataset, the latest CreationDate
    stored there is returned: the most recent loaded day is reprocessed and anything
    newer is added. Otherwise the earliest non-null CreationDate in ``silver_df`` is used.

    Args:
        gold_table_name (str): Gold table name inside FUAM_Ext_Lakehouse.
        dataset_id (str): Dataset ID. It is compared upper-cased because the silver
            layer writes DatasetId with upper().
        silver_df (DataFrame): Silver data with a 'CreationDate' (date) column.
        spark (SparkSession): Kept for backward compatibility; the session that owns
            ``silver_df`` is the one actually used.

    Returns:
        datetime.date | None: The start date, or None when neither layer yields a
        date (e.g. ``silver_df`` is empty).
    """
    from pyspark.sql import functions as F

    spark = silver_df.sparkSession  # Use the session that owns silver_df

    def _silver_earliest_date():
        # min() ignores NULLs; an ascending sort + first() would return a NULL date first.
        # A global aggregate always yields exactly one row (NULL when silver_df is empty).
        return silver_df.agg(F.min("CreationDate").alias("d")).first()["d"]

    if gold_table_exists(gold_table_name, spark):
        # Latest date already loaded for this dataset. The DataFrame API is used rather
        # than f-string SQL so dataset_id is never spliced into query text.
        gold_latest_date = (
            spark.table(f"FUAM_Ext_Lakehouse.{gold_table_name}")
            .filter(F.col("DatasetId") == dataset_id.upper())
            .agg(F.max("CreationDate").alias("d"))
            .first()["d"]
        )

        if gold_latest_date is None:
            print("No existing records in gold table. Using date from silver_df.")
            refresh_date = _silver_earliest_date()
        else:
            print("Using date from gold.")
            refresh_date = gold_latest_date
    else:
        print("Using date from silver_df.")
        refresh_date = _silver_earliest_date()

    print(f"Refresh start date: {refresh_date}")
    return refresh_date

print("The function 'get_refresh_date' has been created successfully.")

In [None]:
#
# Explodes the 'refreshAttempts' array while preserving refreshes that have no attempts.
# Returns a flattened DataFrame with one row per attempt, or one null-padded row if none exist.
#
def flatten_refresh_attempts(input_df):
    """
    Flatten the 'refreshAttempts' array of structs into one row per attempt.

    A refresh whose 'refreshAttempts' is empty, NULL, absent from the schema, or
    not an array is kept as a single row with NULL attempt fields. Attempt fields
    missing from the element struct (or a non-struct element type) are NULL-padded.
    All attempt fields are emitted as strings so the output schema does not depend
    on which refreshes happen to carry attempts.

    Args:
        input_df (DataFrame): One row per refresh, optionally with 'refreshAttempts'.

    Returns:
        DataFrame: the input columns (minus 'refreshAttempts') followed by
        'attemptId', 'startTime', 'endTime' and 'type'.
    """
    from pyspark.sql.functions import col, explode_outer, lit
    from pyspark.sql.types import ArrayType, StringType, StructType

    attempt_fields = ["attemptId", "startTime", "endTime", "type"]
    null_string = lit(None).cast(StringType())

    attempts_type = (
        input_df.schema["refreshAttempts"].dataType
        if "refreshAttempts" in input_df.columns
        else None
    )

    if not isinstance(attempts_type, ArrayType):
        # No usable attempts array: keep every refresh row and NULL-pad the attempt fields.
        flat_df = input_df.drop("refreshAttempts")
        for field in attempt_fields:
            flat_df = flat_df.withColumn(field, null_string)
        return flat_df

    # explode_outer (unlike explode) emits one row with a NULL 'attempt' when the array
    # is empty or NULL, so refreshes without attempts are kept instead of being dropped.
    exploded_df = input_df.withColumn("attempt", explode_outer("refreshAttempts"))

    # Guard against the element type not being a struct (e.g. JSON inferred array<string>)
    element_type = attempts_type.elementType
    struct_fields = set(element_type.fieldNames()) if isinstance(element_type, StructType) else set()

    for field in attempt_fields:
        if field in struct_fields:
            # Cast to string so present and NULL-padded values share one type
            exploded_df = exploded_df.withColumn(field, col(f"attempt.{field}").cast(StringType()))
        else:
            exploded_df = exploded_df.withColumn(field, null_string)

    return exploded_df.drop("attempt", "refreshAttempts")

print("The function 'flatten_refresh_attempts' has been created successfully.")

In [None]:
#
# Parses the 'serviceExceptionJson' column (when present) and surfaces its 'errorCode'.
# The returned DataFrame always carries an 'errorCode' column.
#
def extract_service_exception_fields(input_df):
    """
    Surface the 'errorCode' field from the 'serviceExceptionJson' string column.

    When 'serviceExceptionJson' exists, it is parsed as JSON, 'errorCode' is copied
    into its own column, and the raw JSON column is dropped. When it does not exist,
    a NULL string 'errorCode' column is added instead.
    """
    if "serviceExceptionJson" not in input_df.columns:
        return input_df.withColumn("errorCode", lit(None).cast(StringType()))

    # Only 'errorCode' is read from the JSON payload
    exception_schema = StructType([StructField("errorCode", StringType(), True)])

    parsed_df = input_df.withColumn(
        "exception_struct", from_json(col("serviceExceptionJson"), exception_schema)
    )
    return (
        parsed_df
        .withColumn("errorCode", col("exception_struct.errorCode"))
        .drop("exception_struct", "serviceExceptionJson")
    )

print("The function 'extract_service_exception_fields' has been created successfully.")

In [None]:
#
# Read this dataset's refresh JSON file from the BRONZE layer
#
# The bronze file name is the upper-cased dataset ID plus '.json'
raw_location = bronze_file_location + dataset_id.upper() + ".json"
bronze_df = spark.read.json(raw_location)

print(f"Bronze data from {raw_location} has been read successfully.")

In [None]:
# Preview the raw bronze DataFrame when display output is enabled
if display_data:
    display(bronze_df)

In [None]:
#
# Exit early when the bronze file holds a single record whose refresh array is empty
#
is_empty_payload = (
    bronze_df.count() == 1
    and bronze_df.filter(size(col("value")) == 0).count() == 1
)

if not is_empty_payload:
    print(" ✅ Dataset refreshes exist--continue processing.")
else:
    print("❗️ No data to process--exiting notebook gracefully.")
    mssparkutils.notebook.exit("Completed: no-op")


In [None]:
#
# Parse the JSON content from the bronze layer
#

# Turn the top-level 'value' array into one row per refresh
refresh_df = bronze_df.select(explode("value").alias("refreshes"))

# Field names that actually exist in this file's refresh struct
available_fields = refresh_df.select("refreshes.*").schema.names

# (source field, output column) pairs to pull when present.
# extendedStatus and serviceExceptionJson are optional; refreshAttempts feeds the explode step.
field_aliases = [
    ("startTime", "refreshStartTime"),
    ("endTime", "refreshEndTime"),
    ("id", "refreshId"),
    ("status", "refreshStatus"),
    ("requestId", "requestId"),
    ("extendedStatus", "extendedStatus"),
    ("serviceExceptionJson", "serviceExceptionJson"),
    ("refreshType", "refreshType"),
    ("refreshAttempts", "refreshAttempts"),
]

refresh_columns = []
for source_name, output_name in field_aliases:
    if source_name not in available_fields:
        continue  # field absent from this payload
    refresh_columns.append(col(f"refreshes.{source_name}").alias(output_name))

refresh_struct_df = refresh_df.select(*refresh_columns)

# One row per refresh attempt, then lift 'errorCode' out of serviceExceptionJson
flat_df = flatten_refresh_attempts(refresh_struct_df)
flat_df = extract_service_exception_fields(flat_df)

print(f"Bronze data from {raw_location} has been extracted and transformed.")

In [None]:
# Preview the flattened refresh/attempt rows when display output is enabled
if display_data:
    display(flat_df)

In [None]:
#
# Create the silver layer dataframe
# Dataset IDs in the Lakehouse are expected to be uppercase
#
silver_df = flat_df

# Cast every column whose name contains 'time' (case-insensitive) to a timestamp
time_columns = [name for name in flat_df.columns if "time" in name.lower()]
for time_column in time_columns:
    silver_df = silver_df.withColumn(time_column, col(time_column).cast(TimestampType()))

silver_df = (
    silver_df
    # Upper-cased DatasetId, matching how IDs are stored in the Lakehouse
    .withColumn("DatasetId", upper(lit(dataset_id)))
    # Calendar date of the refresh start, used for table partitioning
    .withColumn("CreationDate", to_date("refreshStartTime"))
    # HHMM integer key (e.g. 14:05 -> 1405) used to join to the 'time' dimension table
    .withColumn(
        "TimeId",
        (hour("refreshStartTime") * 100 + minute("refreshStartTime")).cast("int"),
    )
)

# Column order: DatasetId first, CreationDate last, everything else in between
middle_columns = [name for name in silver_df.columns if name not in ("DatasetId", "CreationDate")]
silver_df = silver_df.select("DatasetId", *middle_columns, "CreationDate")

print(f"Silver dataframe has been created successfully with {silver_df.count()} rows.")

In [None]:
# Preview the silver DataFrame when display output is enabled
if display_data:
    display(silver_df)

In [None]:
#
# Create the partitioned Delta table in the Fabric lakehouse if it does not already exist;
# otherwise narrow silver_df to the dates that still need merging
#
if gold_table_exists(gold_table_name, spark):
    # Earliest date to (re)process during the merge
    refresh_date = get_refresh_date(gold_table_name, dataset_id, silver_df, spark)

    # Keep only rows on or after the refresh date
    silver_df = silver_df.filter(col("CreationDate") >= lit(refresh_date))

    print(f"Silver dataframe has been filtered successfully on/after {refresh_date} with {silver_df.count()} rows.")
else:
    (
        silver_df.write
        .format("delta")
        .partitionBy("CreationDate", "DatasetId")
        .mode("errorifexists")
        .saveAsTable(gold_table_name)
    )

    print(f"Gold table {gold_table_name} has been created successfully.")
    # NOTE(review): exiting here means the later history-copy cell does not run on a
    # first load — confirm this is intended.
    mssparkutils.notebook.exit("Completed: new-table")

In [None]:
#
# Configure the MERGE process
#
max_retries = 5
retry_delay = 60  # upper bound, in seconds, of the random back-off between retries
success = False  # set to True by the MERGE cell once a merge attempt succeeds

# Business key of one refresh attempt. '<=>' (null-safe equality) is used instead of '='
# because refreshes without attempts are NULL-padded for attemptId/type; with '=' such
# rows never match and would be re-inserted as duplicates on every run.
match_criteria = """
        target.DatasetId <=> source.DatasetId AND
        target.refreshStartTime <=> source.refreshStartTime AND
        target.refreshId <=> source.refreshId AND
        target.attemptId <=> source.attemptId AND
        target.type <=> source.type
    """

# Collapse all whitespace (including newlines) into single spaces and trim both ends
match_criteria = " ".join(match_criteria.split())

print("Successfully configured the merge process.")

In [None]:
#
# Dynamically align the source SILVER layer to the Delta GOLD table schema for MERGE
#
target_fields = spark.table(gold_table_name).schema.fields
target_schema = [field.name for field in target_fields]

# Add columns the gold table has but this batch lacks. Each one is typed like the
# target column (rather than always StringType) so the MERGE assigns matching types.
for target_field in target_fields:
    if target_field.name not in silver_df.columns:
        silver_df = silver_df.withColumn(target_field.name, lit(None).cast(target_field.dataType))

# Align column order to the target (source-only columns are not selected)
silver_df = silver_df.select(target_schema)

print(f"Successfully aligned the source to the {gold_table_name} gold table target schema.")

In [None]:
#
# Perform the merge to insert new and update existing records (UPSERT approach)
#

# Load the target Delta table
delta_table = DeltaTable.forName(spark, gold_table_name)

# Only ConcurrentAppendException is retried (with a random back-off of 1..retry_delay
# seconds); any other error is re-raised immediately.
for attempt in range(max_retries):
    try:
        (
            delta_table.alias("target")
            .merge(silver_df.alias("source"), match_criteria)
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )
        success = True
        break  # Exit loop if successful
    except Exception as e:
        if "ConcurrentAppendException" not in str(e):
            raise  # Re-raise other errors with their original traceback
        if attempt + 1 < max_retries:
            wait_time = random.randint(1, retry_delay)
            print(f"Retrying due to concurrent append conflict... Attempt {attempt + 1}, sleeping {wait_time} seconds")
            time.sleep(wait_time)
        # No sleep after the final attempt: nothing follows it but the failure below

if not success:
    # Fail the run instead of only printing, so the caller sees the merge did not happen
    # and the following history-copy cell does not archive unmerged bronze files.
    failure_message = f"Merge operation for gold table {gold_table_name} failed after {max_retries} retries."
    print(failure_message)
    raise RuntimeError(failure_message)

print(f"Gold table {gold_table_name} has been merged successfully.")

In [None]:
#
# Write history of bronze files
#
# Strip any '*/' wildcard segment, then mirror the raw path under Files/history/
raw_path = bronze_file_location.replace("*/", "")
history_path = raw_path.replace("Files/raw/", "Files/history/")

# Recursive copy into a dated YYYY/MM/DD sub-folder of the history path
dated_history_path = f"{history_path}{datetime.datetime.now():%Y/%m/%d}/"
mssparkutils.fs.cp(raw_path, dated_history_path, True)  # type: ignore

print(f"History data copied to {history_path} successfully.")