In [2]:
from tenacity import retry, stop_never, wait_fixed, retry_if_exception
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import lit

StatementMeta(, 62daa86b-3829-4c13-a613-e7d7325ad4ca, 4, Finished, Available, Finished)

In [3]:
# CONSTANTS
# Host name of the Fabric Warehouse SQL endpoint used for the JDBC connection.
SERVER_NAME = (
    'dwpbgshdgicevmv2xhwrfh6yam-hgg4xv4ptkse7pco4qgre7rpri'
    '.datawarehouse.fabric.microsoft.com'
)
# Warehouse (database) that the ETL metadata statements run against.
DATABASE_NAME = 'sample_warehouse'



StatementMeta(, 62daa86b-3829-4c13-a613-e7d7325ad4ca, 5, Finished, Available, Finished)

In [8]:
# -----------------------------------------------------------
# 1. Execute Dynamic SQL Against Fabric Warehouse
# -----------------------------------------------------------
def execute_dynamic_dw_query(query: str, max_rows: int = 20) -> None:
    """
    Execute a SQL statement against the Fabric Warehouse over JDBC, using the
    JVM behind the active Spark session. Used for all ETL metadata updates.

    When the statement returns a result set, its column headers and at most
    ``max_rows`` rows are printed (plus a notice if more rows were available);
    otherwise the update count reported by the driver is printed.

    Parameters:
        query (str): SQL statement to execute. It is sent verbatim, so any
            values interpolated into it must already be quoted/escaped.
        max_rows (int): Maximum number of result rows to print (default 20).

    Raises:
        Whatever the JDBC driver raises (surfaced through py4j) is propagated
        unchanged; the result set, statement and connection are closed first.
    """
    port = 1433
    jdbc_url = f"jdbc:sqlserver://{SERVER_NAME}:{port};database={DATABASE_NAME}"

    # Authentication token for SQL Endpoint (Fabric)
    token = mssparkutils.credentials.getToken(
        'https://analysis.windows.net/powerbi/api'
    )

    # JDBC connection properties
    java_props = spark._jvm.java.util.Properties()
    java_props.setProperty("accessToken", token)
    java_props.setProperty("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")

    conn = spark._jvm.java.sql.DriverManager.getConnection(jdbc_url, java_props)
    # Callers may retry on failure, so a failing statement must never leak
    # its JDBC connection/statement: close everything in finally blocks.
    try:
        stmt = conn.createStatement()
        try:
            has_result_set = stmt.execute(query)

            if has_result_set:
                rs = stmt.getResultSet()
                try:
                    rs_meta = rs.getMetaData()
                    col_count = rs_meta.getColumnCount()
                    col_range = range(1, col_count + 1)

                    # Print column headers
                    print(" | ".join(rs_meta.getColumnName(i) for i in col_range))
                    print("-" * 80)

                    # Check the limit only after a row is known to exist, so an
                    # exact-size result is not mis-reported as truncated.
                    row_count = 0
                    truncated = False
                    while rs.next():
                        if row_count >= max_rows:
                            truncated = True
                            break
                        print(" | ".join(str(rs.getObject(i)) for i in col_range))
                        row_count += 1

                    if truncated:
                        print(f"... output truncated after {max_rows} rows")
                finally:
                    rs.close()
            else:
                # For UPDATE / INSERT / DELETE
                update_count = stmt.getUpdateCount()
                print(f"Statement executed successfully. Rows affected: {update_count}")
        finally:
            stmt.close()
    finally:
        conn.close()

    print(f"SQL executed successfully: {query}")

StatementMeta(, 62daa86b-3829-4c13-a613-e7d7325ad4ca, 10, Finished, Available, Finished)

In [9]:
# query = """SELECT TOP (100) [GeographyID],
# 			[ZipCodeBKey],
# 			[County],
# 			[City],
# 			[State],
# 			[Country],
# 			[ZipCode]
# FROM [sample_warehouse].[dbo].[Geography]"""

# execute_dynamic_dw_query(query)

StatementMeta(, 62daa86b-3829-4c13-a613-e7d7325ad4ca, 11, Finished, Available, Finished)

GeographyID | ZipCodeBKey | County | City | State | Country | ZipCode
--------------------------------------------------------------------------------
10 | 90001 | Manhattan County | New York | NY | USA | 10010
1 | 10001 | Manhattan County | New York | NY | USA | 10001
2 | 10002 | Manhattan County | New York | NY | USA | 10002
3 | 10003 | Manhattan County | New York | NY | USA | 10003
4 | 10004 | Manhattan County | New York | NY | USA | 10004
5 | 10005 | Manhattan County | New York | NY | USA | 10005
6 | 10006 | Manhattan County | New York | NY | USA | 10006
7 | 10007 | Manhattan County | New York | NY | USA | 10007
8 | 10008 | Manhattan County | New York | NY | USA | 10008
9 | 10009 | Manhattan County | New York | NY | USA | 10009
11 | 11201 | Kings County | New York | NY | USA | 11201
12 | 11202 | Kings County | New York | NY | USA | 11202
13 | 11203 | Kings County | New York | NY | USA | 11203
14 | 11204 | Kings County | New York | NY | USA | 11204
15 | 11205 | Kings County | New Yo

In [10]:
# -----------------------------------------------------------
# 2. Helper Functions for Exception Detection
# -----------------------------------------------------------
def is_snapshot_isolation(exception):
    """
    Return True when the text of *exception* mentions "Snapshot isolation".

    Accepts an exception object or a plain string; the match is case-sensitive.
    """
    message = str(exception)
    return message.find("Snapshot isolation") != -1


def is_invalid_object_exception(exception):
    """
    Return True when the text of *exception* contains "Invalid object name",
    i.e. the SQL Endpoint reports that a table is not (yet) found.

    Accepts an exception object or a plain string; the match is case-sensitive.
    """
    text = str(exception)
    return text.find("Invalid object name") >= 0

StatementMeta(, 62daa86b-3829-4c13-a613-e7d7325ad4ca, 12, Finished, Available, Finished)

In [11]:
# -----------------------------------------------------------
# 3. Update ETL_TASK_EXECUTION_LOG with Retry on Snapshot Errors
# -----------------------------------------------------------
@retry(
    stop=stop_never,          # Retry indefinitely
    wait=wait_fixed(20),      # 20 sec pause between attempts
    retry=retry_if_exception(is_snapshot_isolation),
)
def retryable_update(run_id, task_ids, status, task_error_message=None, predecessor_task_id=None):
    """
    Set END_TIME, STATUS and TASK_ERROR_MESSAGE for one or more tasks of a run
    in dbo.ETL_TASK_EXECUTION_LOG.

    Snapshot-isolation conflicts are re-raised so Tenacity retries the whole
    update (every 20 s, with no attempt limit). Any other failure is printed
    and a fallback UPDATE records the same STATUS with the message
    'FAILED WHILE UPDATING METADATA'.

    Parameters:
        run_id (int): Instance ID (matched against INSTANCE_ID).
        task_ids (int | list | tuple): One or more task IDs.
        status (str): SUCCESS / FAILED / BLOCKED.
        task_error_message (str): Optional error message; stored only when
            status is FAILED. Embedded single quotes are escaped.
        predecessor_task_id: Task named in the message when status is BLOCKED.

    Raises:
        ValueError: if task_ids is not an int/list/tuple, or is empty.
    """
    def _sql_str(value):
        # T-SQL string literal: embedded single quotes are doubled so messages
        # like "Invalid object name 'dbo.X'" cannot break the UPDATE statement.
        return "'" + str(value).replace("'", "''") + "'"

    # Normalize task_ids to always be a non-empty list
    if isinstance(task_ids, int):
        task_ids = [task_ids]
    elif isinstance(task_ids, tuple):
        task_ids = list(task_ids)
    elif not isinstance(task_ids, list):
        raise ValueError("task_ids should be an int, a list or a tuple")
    if not task_ids:
        # "IN ()" is invalid T-SQL and would make the fallback UPDATE fail too.
        raise ValueError("task_ids must not be empty")

    task_id_str = ", ".join(map(str, task_ids))
    status_sql = _sql_str(status)

    # TASK_ERROR_MESSAGE: caller's message on FAILED, predecessor note on
    # BLOCKED, NULL otherwise.
    if status == "FAILED":
        error_sql = _sql_str(task_error_message) if task_error_message else "NULL"
    elif status == "BLOCKED":
        error_sql = _sql_str(f"PREDECESSOR TASK {predecessor_task_id} FAILED")
    else:
        error_sql = "NULL"

    sql = f"""
        UPDATE dbo.ETL_TASK_EXECUTION_LOG
        SET END_TIME = CURRENT_TIMESTAMP,
            STATUS = {status_sql},
            TASK_ERROR_MESSAGE = {error_sql}
        WHERE INSTANCE_ID = {run_id}
        AND TASK_ID IN ({task_id_str})
    """

    print(f"Generated SQL: {sql}")

    try:
        execute_dynamic_dw_query(sql)

    except Exception as e:
        if is_snapshot_isolation(e):
            # Re-raise the original error (type and traceback intact); the
            # Tenacity predicate matches on its text and schedules a retry.
            print(f"Snapshot isolation conflict detected for tasks {task_ids}. Retrying...")
            raise

        # Any other error: surface it, then record the status with a generic
        # message so the log row is still closed out.
        print(f"Metadata update failed for tasks {task_ids}: {e}")
        fallback_sql = f"""
            UPDATE dbo.ETL_TASK_EXECUTION_LOG
            SET END_TIME = CURRENT_TIMESTAMP,
                STATUS = {status_sql},
                TASK_ERROR_MESSAGE = 'FAILED WHILE UPDATING METADATA'
            WHERE INSTANCE_ID = {run_id}
            AND TASK_ID IN ({task_id_str})
        """
        print("Executing fallback update...")
        execute_dynamic_dw_query(fallback_sql)

    print(f"Updated task execution log for Run ID {run_id}, Task(s) {task_ids}.")


# -----------------------------------------------------------
# 4. Check if Lakehouse Temp Table Appears on SQL Endpoint + Update Params Table
# -----------------------------------------------------------
@retry(
    stop=stop_never,          # Keep polling until the table is visible
    wait=wait_fixed(20),      # 20 sec pause between attempts
    retry=retry_if_exception(lambda e: is_invalid_object_exception(e) or is_snapshot_isolation(e)),
)
def check_for_lakehousetable_update_params_table(instance_id, task_id, lakehouse_name="sample_lakehouse"):
    """
    Poll the SQL Endpoint until the DYNA_PARAMS_<instance_id>_<task_id> table
    is visible, then copy its values into dbo.ETL_PARAMS.

    Retried every 20 seconds, without limit, when the error is either:
      - "Invalid object name" (SQL Endpoint propagation delay), or
      - a snapshot-isolation conflict.
    On retry the whole function re-runs, so the probe and the UPDATE are
    repeated together.

    Parameters:
        instance_id: Run instance ID, part of the staging table name.
        task_id: Task ID, part of the staging table name.
        lakehouse_name (str): Lakehouse exposing the staging table
            (default "sample_lakehouse").
    """

    print("Checking if dynamic params table is available in SQL Endpoint...")

    staging_table = f"{lakehouse_name}.dbo.DYNA_PARAMS_{instance_id}_{task_id}"

    # Existence probe: TOP 0 still fails name binding with "Invalid object
    # name" while the table is missing, but reads/prints no rows.
    # NOTE(review): this confirms the table is visible, not that every row has
    # synced to the SQL Endpoint yet.
    execute_dynamic_dw_query(f"SELECT TOP 0 * FROM {staging_table}")
    print("Dynamic params table detected. Proceeding with ETL_PARAMS update.")

    # Copy staged values onto the matching 'dynamic' parameters
    sql = f"""
        UPDATE dbo.[ETL_PARAMS]
        SET LAST_UPDATED_DATE = CAST(GETDATE() AS DATE),
            PREV_PARAMETER_VALUE = stg.PREV_PARAM_VALUE,
            PARAMETER_VALUE = stg.CUR_PARAM_VALUE
        FROM {staging_table} stg
        INNER JOIN dbo.[ETL_PARAMS] tg
            ON stg.PLAN_ID = tg.PLAN_ID
            AND stg.TASK_ID = tg.TASK_ID
            AND stg.PARAMETER_NAME = tg.PARAMETER_NAME
        WHERE tg.PARAMETER_TYPE = 'dynamic';
    """

    print(f"Generated Update SQL: {sql}")
    execute_dynamic_dw_query(sql)

    print(f"ETL_PARAMS updated successfully for Run ID {instance_id}, Task {task_id}.")


# -----------------------------------------------------------
# 5. Save Dynamic Params to Lakehouse → Wait for Sync → Update Warehouse → Clean Up
# -----------------------------------------------------------
def save_dynamic_params(op_json, instance_id, task_id, plan_id):
    """
    Stage dynamic parameters in a Lakehouse temp location, wait for them to be
    visible on the SQL Endpoint, update ETL_PARAMS from them, and always remove
    the temporary location afterwards (even if the update fails).

    Parameters:
        op_json: Parsed JSON whose 'dyna_params' entry holds the parameter rows
            (fields PLAN_ID, TASK_ID, PARAMETER_NAME, PREV_PARAM_VALUE,
            CUR_PARAM_VALUE).
        instance_id (int): Run instance ID, used in the temp table name.
        task_id (int): Task ID; overrides TASK_ID on every staged row.
        plan_id (int): Plan ID; overrides PLAN_ID on every staged row.

    NOTE(review): relies on the global METADATA_LAKEHOUSE_TEMP_LOC, which is not
    set in the CONSTANTS cell — confirm it is defined before this runs, and that
    it maps to the tables folder read as sample_lakehouse.dbo on the endpoint.
    """

    print(f"Creating dataframe for dynamic parameters: {op_json['dyna_params']}")

    # All columns are read as strings; TASK_ID / PLAN_ID are then replaced by
    # the caller-supplied values.
    columns = ['PLAN_ID', 'TASK_ID', 'PARAMETER_NAME', 'PREV_PARAM_VALUE', 'CUR_PARAM_VALUE']
    schema = StructType([StructField(c, StringType(), True) for c in columns])

    params_df = (
        spark.createDataFrame(op_json['dyna_params'], schema)
        .withColumn("TASK_ID", lit(task_id))
        .withColumn("PLAN_ID", lit(plan_id))
    )

    lakehouse_path = f"{METADATA_LAKEHOUSE_TEMP_LOC}/DYNA_PARAMS_{instance_id}_{task_id}"

    params_df.write.mode("overwrite").option("overwriteSchema", "True").save(lakehouse_path)

    print(f"Written dynamic params to temp Lakehouse location: {lakehouse_path}")

    try:
        # Wait for SQL Endpoint sync → Then update ETL_PARAMS
        check_for_lakehousetable_update_params_table(instance_id, task_id)
    finally:
        # Cleanup Lakehouse temp folder even when the update raised, so failed
        # runs do not leave staging tables behind.
        print(f"Removing temporary Lakehouse folder: {lakehouse_path}")
        notebookutils.fs.rm(lakehouse_path, True)

    print("Dynamic parameter update completed successfully.")


StatementMeta(, 62daa86b-3829-4c13-a613-e7d7325ad4ca, 13, Finished, Available, Finished)