In [0]:
# Declare the notebook's text widgets as (widget name, display label) pairs.
# Every widget starts with an empty default value.
for _widget_name, _widget_label in (
    ("p_cdc_instance_name", "CDC Instance Name"),
    ("p_timestamp", "PipelineStartTime"),
    ("p_product_id", "Product ID"),
    ("p_environment_name", "Environment Name"),
):
    dbutils.widgets.text(_widget_name, "", _widget_label)

In [0]:
def get_widget_values():
    """Read the four notebook widgets and return their stripped values.

    Returns:
        dict: keys ``filter_product_id``, ``filter_timestamp``,
        ``filter_cdc_instance_name`` and ``filter_environment_name`` mapped to
        the whitespace-stripped widget text.

    Raises:
        ValueError: if any of the values is empty after stripping. The message
            names the first offending dictionary key.
    """
    # Result key -> widget it is read from; insertion order is also the validation order.
    widget_name_by_key = {
        "filter_product_id": "p_product_id",
        "filter_timestamp": "p_timestamp",
        "filter_cdc_instance_name": "p_cdc_instance_name",
        "filter_environment_name": "p_environment_name",
    }

    v_widget_values_dict = {
        result_key: dbutils.widgets.get(widget_name).strip()
        for result_key, widget_name in widget_name_by_key.items()
    }

    # All four values are mandatory.
    for widget, value in v_widget_values_dict.items():
        if not value:
            raise ValueError(f"The widget '{widget}' must be populated before continuing.")

    return v_widget_values_dict

# Validate the widget inputs once, then expose each value as a notebook-level variable.
try:
    v_widget_values = get_widget_values()
except ValueError as e:
    # Print the validation message, then re-raise so the notebook run stops here.
    print(f"Error: {str(e)}")
    raise
else:
    v_product_id = v_widget_values["filter_product_id"]
    v_timestamp = v_widget_values["filter_timestamp"]
    v_cdc_instance_name = v_widget_values["filter_cdc_instance_name"]
    v_environment = v_widget_values["filter_environment_name"]

    print("All required widgets are populated. Proceeding with the script.")


In [0]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import (
    col,
    max,
    expr,
    current_timestamp,
    regexp_replace,
    upper,
    when,
    lit,
    lower,
    concat
)
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from pyspark.sql.utils import AnalysisException

from delta.tables import DeltaTable

from datetime import datetime
import threading

import concurrent.futures
from concurrent.futures import ThreadPoolExecutor, as_completed

from functools import reduce
import traceback
from typing import Dict
import time


In [0]:
%run "../common/DataFabricCommonFunctions"


In [0]:
# Look up the locations configured for the selected environment.
v_locations_dict = get_locations_by_env(v_environment)

(
    v_file_location,
    v_archive_location,
    v_key_vault_location,
    v_unity_catalog,
) = (
    v_locations_dict['source'],
    v_locations_dict['archive'],
    v_locations_dict['keyvault'],
    v_locations_dict['unity_catalog'],
)

# Parse the pipeline start time (fails fast if it is not "YYYY-MM-DD HH:MM:SS.ffffff")
# and re-render it with the last three fractional digits cut off.
v_timestamp = (
    datetime.strptime(v_timestamp, "%Y-%m-%d %H:%M:%S.%f")
    .strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
)

### Object Filtering Strategy: SQL Server vs Unity Catalog

After retrieving the list of source objects (tables/views) from SQL Server, we apply a filtering step to ensure that only the relevant and existing objects in **Unity Catalog** are processed.

#### Why This Matters
- Prevents unnecessary computation on tables that don't exist in Unity Catalog.
- Reduces risk of errors from referencing non-existent schemas or tables.
- Improves performance by limiting the scope of the loop to valid targets.

#### Process Overview
1. **Extract Object Metadata** from SQL Server (e.g., `Database`, `TableName`).
2. **List Available Objects** in Unity Catalog using Spark’s catalog introspection.
3. **Match & Filter**: Perform an inner join or intersection to retain only those objects that exist in both sources.
4. **Run Loop**: Iterate only on the matched objects for extraction, transformation, and loading (ETL).

This ensures your pipeline remains resilient and efficient while aligning strictly with Unity Catalog's metadata registry.


In [0]:
# Table-process rows for the requested CDC instance, with source-side column names
# renamed to the names the rest of the notebook uses and lower-cased
# schema/table names added for the join against Unity Catalog.
v_sql_server_df = (
    get_table_process_list(v_environment)
    .where(col("CDCDestinationSchema") == v_cdc_instance_name)
    .withColumnRenamed("SourceFacilityId", "SiteId")
    .withColumnRenamed("SourceServerName1", "OriginalInstance")
    .withColumnRenamed("SourceDatabaseName1", "DatabaseName")
    .withColumnRenamed("CDCKeyColumns", "PK")
    .withColumn("UC_SchemaName", lower(col("CDCDestinationSchema")))
    .withColumn("TableName", lower(col("SourceTable")))
)

In [0]:
# List every schema in the catalog.
v_uc_schema_df = sql(f"SHOW SCHEMAS IN {v_unity_catalog}")
v_uc_schema_list = [schema_row['databaseName'] for schema_row in v_uc_schema_df.collect()]

# One (Database, TableName) listing per schema ...
v_all_schema_tables_list = [
    sql(f"SHOW TABLES IN {v_unity_catalog}.{schema_name}").select('Database', 'TableName')
    for schema_name in v_uc_schema_list
]

# ... stacked into a single DataFrame covering the whole catalog.
v_uc_tables_df = reduce(
    lambda stacked_df, next_df: stacked_df.unionByName(next_df),
    v_all_schema_tables_list,
)

In [0]:
# Rank source databases: "Archive" = 1, "Primary" = 2, anything else = 3.
v_sql_server_with_priority_df = v_sql_server_df.withColumn(
    "db_priority",
    when(col("DatabaseName") == "Archive", 1)
    .when(col("DatabaseName") == "Primary", 2)
    .otherwise(3),
)

# Keep only source objects that also exist in Unity Catalog (inner join on
# schema + table name), de-duplicate, and sort by priority.
v_execution_target_obj_df = (
    v_sql_server_with_priority_df
    .join(
        v_uc_tables_df.withColumnRenamed("Database", "UC_SchemaName"),
        on=["UC_SchemaName", "TableName"],
        how="inner",
    )
    .dropDuplicates()
    .sort("db_priority", "UC_SchemaName", "TableName", "InternalClientId")
)


In [0]:
# Collapse the execution targets to one row per schema/database/client/table/datasource.
# `max` here is pyspark.sql.functions.max (it shadows the builtin in this notebook).
v_agg_is_historical_df = (
    v_execution_target_obj_df
    .groupBy("UC_SchemaName", "DatabaseName", "InternalClientId", "TableName", "DataSourceShortName")
    .agg(max("IsHistorical").alias("IsHistorical"))
)

# For the historical groups, derive the target path:
# <catalog>.db_<InternalClientId>.<DataSourceShortName>_ods_<TableName>
v_agg_is_historical_true_df = (
    v_agg_is_historical_df
    .where(col("IsHistorical") == True)
    .withColumn(
        "table_path",
        concat(
            lit(v_unity_catalog),
            lit(".db_"),
            col("InternalClientId"),
            lit("."),
            col("DataSourceShortName"),
            lit("_ods_"),
            col("TableName"),
        ),
    )
)

# Distinct target paths as a plain Python list.
v_overwrite_table_list = (
    v_agg_is_historical_true_df.select("table_path").distinct().toPandas()["table_path"].tolist()
)


In [0]:
# Turn on Delta schema auto-merge for this session and make the target catalog current.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
spark.sql(f"USE CATALOG {v_unity_catalog}")

# Make sure a `db_<InternalClientId>` schema exists for every client in the execution set.
# A failure is printed and the loop moves on to the next client.
for row in v_execution_target_obj_df.select("InternalClientId").dropDuplicates().collect():
    v_client_id = row["InternalClientId"]
    v_target_db_name = f"db_{v_client_id}"
    v_managed_location = f"{v_file_location}{v_client_id}"
    try:
        v_create_schema_stmt = f"CREATE SCHEMA IF NOT EXISTS {v_unity_catalog}.{v_target_db_name} MANAGED LOCATION '{v_managed_location}'"
        print(v_create_schema_stmt)
        spark.sql(v_create_schema_stmt)
    except Exception as e:
        print(f"Failed to create schema for {v_client_id}: {e}")


In [0]:
# Make sure the shared `db_common` schema exists under `<file location>common`.
# A failure is printed and the notebook continues.
v_common_db_name = "db_common"
v_common_managed_location = f"{v_file_location}common"
try:
    v_create_schema_stmt = (
        f"CREATE SCHEMA IF NOT EXISTS {v_unity_catalog}.{v_common_db_name} "
        f"MANAGED LOCATION '{v_common_managed_location}'"
    )
    print(v_create_schema_stmt)
    spark.sql(v_create_schema_stmt)
except Exception as e:
    # Report the schema this cell targets. The per-client loop variable is a
    # different schema, and is undefined when that loop had no rows.
    print(f"Failed to create schema for {v_common_db_name}: {e}")


## Data Split ETL Functions – Reference
---

### Decision Guide for `handle_merge`
- **No validation errors:** `handle_merge` does not raise when `partition_keys`, `pk_str` or `merge_condition` is missing; it only chooses between the actions below.
- **Actions:**  
  - If *historical* **or** no `pk_str` → first `DELETE` rows with `InternalFacilityId = facility_id` **and** `UC_SchemaName = schema_name`, **then** proceed.  
    (⚠️ the `WHERE` clause is only added when `schema_name` is set and `facility_id` is not `None`; otherwise the `DELETE` covers the whole table.)  
  - If `merge_condition` **and** `pk_str` are both provided → `MERGE` (`whenMatchedUpdate` with all columns except `created_by_user` and `process_timestamp`; `whenNotMatchedInsertAll`).  
  - Else → `append` (`mode="append"`).

> **Note:** The delete assumes the target table has `InternalFacilityId` and `UC_SchemaName`. If it doesn’t, the `DELETE` will fail.

---

### Function Details

#### `format_query(query_str, placeholder_dict, uc_schema_name, table_name)`
**Purpose:** Return a fully formatted SQL string.  
**Behavior:**
- If `query_str` is provided, returns `query_str.format(**placeholder_dict)`.  
  - ⚠️ Missing keys in `placeholder_dict` will raise a `KeyError`.  
- Else returns `SELECT * FROM {uc_schema_name}.{table_name}`.

---

#### `add_audit_columns(df, client_id, facility_id, timestamp_str, schema_name)`
**Purpose:** Stamp audit/lineage columns.  
**Returns:** a new DF (also when `df` is empty; no row count is taken) with:
- `created_by_user = current_user()`
- `datetime_last_modified = timestamp_str`
- `modified_by_user = current_user()`
- `process_timestamp = timestamp_str`
- `InternalClientId = client_id`
- `InternalFacilityId = facility_id`
- `UC_SchemaName = schema_name`

---

#### `get_merge_condition(pk_str, df, facility_id)`
**Purpose:** Build a SQL `AND`-joined predicate for `DeltaTable.merge`.  
**Behavior:**
- From `pk_str="A,B,C"`, produces `target.A = source.A AND ...`
- If `InternalFacilityId` is a column **and** `facility_id` is not `None`, appends  
  `target.InternalFacilityId = {facility_id} AND source.InternalFacilityId = {facility_id}`.
- Returns `None` if no clauses built.

---

#### `update_watermark_value(v_product_id, original_instance_name, step_name_str, timestamp_str)`
**Purpose:** Update status/watermark in your config DB via stored procedure.  
**Notes:** Requires `execute_dbconfig_stored_procedure` and `v_environment`.

---

#### `handle_merge(df, table_path_str, merge_condition_str, partition_keys, facility_id, is_historical_flag, schema_name, pk_str)`
**Purpose:** Centralized **merge/append** (and optional **delete-before-load**) into a Delta table.

**Key logic:**
- **No validation errors:** nothing is raised for a missing `merge_condition_str`, `pk_str` or `partition_keys`; `partition_keys` is accepted but not used inside the function.
- **Facility filter:** if `df` has `InternalFacilityId` and `facility_id` is not `None`, `df` is first filtered to that facility.
- **Delete-before-load:** if historical **or** `pk_str` is empty →  
  `DELETE FROM {table} WHERE InternalFacilityId = {facility_id} and UC_SchemaName = '{schema_name}'`  
  (⚠️ the `WHERE` clause is only added when `schema_name` is set and `facility_id` is not `None`; otherwise the whole table is deleted)
- **Merge path:** if `merge_condition_str` and `pk_str` are both non-blank → `MERGE`:
  - `whenMatchedUpdate(set=update_map)`  
    - `update_map` includes **all** columns from `df` **except** `created_by_user` and `process_timestamp`  
      (⚠️ exclusion is **case-sensitive**: `{"created_by_user","process_timestamp"}`)
  - `whenNotMatchedInsertAll()`
- **Append path:** otherwise → `df.write.format("delta").mode("append").saveAsTable(...)`

---

#### `overwrite_table_with_partition(df, table_path_str, v_partition_keys)`
**Purpose:** Full overwrite write, optionally partitioned.  
**Behavior:**  
- If `v_partition_keys` non-empty → `partitionBy(*v_partition_keys)`  
- Else → non-partitioned overwrite.

---

#### `get_partition_keys(df)`
**Purpose:** Recommend partition keys present in DF.  
**Current rule:** Returns `["InternalFacilityId"]` if available; else `[]`.

---

#### `check_partitions(table_path_str, partition_keys)`
**Purpose:** Determine if repartitioning is required.  
**Behavior:**  
- Returns `False` if `partition_keys ⊆ current_partition_columns` (no change needed)  
- Returns `True` otherwise (table should be rewritten with desired partitions)

---

#### `change_partition_of(table_path_str, partition_keys, client_id, facility_id, v_timestamp, uc_schema_name)`
**Purpose:** Rewrite table **with new partition spec** (and refresh audit columns).  
**Behavior:**  
- `spark.read.table(...)` → `add_audit_columns(...)` → `overwrite` with `partitionBy(*partition_keys)` and `option("overwriteSchema","true")`.  
- ⚠️ Rewrites **entire** table; ensure you intend to repartition all data.

---

### Gotchas & Tips
- `add_audit_columns` always returns a DataFrame, even for empty input; check `df.count()` yourself if empty loads need special handling.  
- `update_map` exclusion in `handle_merge` is **case-sensitive**; ensure column names exactly match.  
- `format_query` uses Python `str.format` – supply all placeholders or you’ll get a `KeyError`.  
- `check_partitions` only checks subset/superset, not order; repartition if you need an exact match.  

### Design Highlights
✅ Modular and reusable functions  
✅ Supports both merge and overwrite logic  
✅ Thread-safe dictionary updates using lock  
✅ Parallel execution for performance  
✅ Audit tracking and watermarking for data lineage  


In [0]:
def format_query(query_str, placeholder_dict, uc_schema_name, table_name):
    """Return the SQL text to run for one table.

    Args:
        query_str: Query template using ``str.format`` placeholders, or a
            falsy value (``None`` / ``""``) when no template is configured.
        placeholder_dict: Values substituted into ``query_str``.
        uc_schema_name: Schema used for the fallback query.
        table_name: Table used for the fallback query.

    Returns:
        str: the formatted template, or ``SELECT * FROM <schema>.<table>``
        when no template was given.

    Raises:
        KeyError: if the template references a placeholder that is missing
            from ``placeholder_dict``.
    """
    if not query_str:
        return f"SELECT * FROM {uc_schema_name}.{table_name}"
    return query_str.format(**placeholder_dict)


def add_audit_columns(df, client_id, facility_id, timestamp_str, schema_name):
    """Return ``df`` with the audit/lineage columns added (or overwritten).

    Args:
        df: DataFrame to stamp.
        client_id: Literal value for ``InternalClientId``.
        facility_id: Literal value for ``InternalFacilityId``.
        timestamp_str: Literal value for ``datetime_last_modified`` and
            ``process_timestamp``.
        schema_name: Literal value for ``UC_SchemaName``.

    Returns:
        The DataFrame produced by applying the seven ``withColumn`` calls in
        the order listed below.
    """
    audit_columns = [
        ("created_by_user", expr("current_user()")),
        ("datetime_last_modified", lit(timestamp_str)),
        ("modified_by_user", expr("current_user()")),
        ("process_timestamp", lit(timestamp_str)),
        ("InternalClientId", lit(client_id)),
        ("InternalFacilityId", lit(facility_id)),
        ("UC_SchemaName", lit(schema_name)),
    ]

    stamped_df = df
    for column_name, column_value in audit_columns:
        stamped_df = stamped_df.withColumn(column_name, column_value)
    return stamped_df

def get_merge_condition(pk_str, df, facility_id):
    """Build the ``target``/``source`` predicate used for a Delta merge.

    Args:
        pk_str: Comma-separated key column names; blank entries are skipped.
            May be ``None`` or empty.
        df: Object exposing ``columns``; only checked for ``InternalFacilityId``.
        facility_id: Value pinned on both sides of the merge when ``df`` has
            an ``InternalFacilityId`` column; ``None`` disables that clause.

    Returns:
        str | None: the clauses joined with ``" AND "``, or ``None`` when no
        clause could be built.
    """
    key_columns = []
    if pk_str:
        key_columns = [part.strip() for part in pk_str.split(",") if part.strip()]

    predicates = [f"target.{key} = source.{key}" for key in key_columns]

    # Restrict both sides of the merge to the facility being loaded.
    if "InternalFacilityId" in df.columns and facility_id is not None:
        predicates.append(f"target.InternalFacilityId = {facility_id} and source.InternalFacilityId = {facility_id}")

    return " AND ".join(predicates) if predicates else None


def update_watermark_value(v_product_id, original_instance_name, step_name_str, timestamp_str):
    """Run ``cfg.usp_UpdateSQLProcessStatusFacilityDetailWithWatermarkValue``.

    The four arguments are interpolated as quoted literals into the ``EXEC``
    statement, which is handed to ``execute_dbconfig_stored_procedure``
    together with the notebook-level ``v_environment``.

    Args:
        v_product_id: Value for ``@InternalProductId``.
        original_instance_name: Value for ``@SourceInstanceServerName``.
        step_name_str: Value for ``@StepName``.
        timestamp_str: Value for ``@WatermarkValue``.
    """
    # NOTE(review): values are spliced into the statement without escaping;
    # this assumes they never contain a single quote - confirm against callers.
    exec_statement = f"""EXEC cfg.usp_UpdateSQLProcessStatusFacilityDetailWithWatermarkValue
        @InternalProductId = '{v_product_id}',
        @SourceInstanceServerName = '{original_instance_name}',
        @StepName = '{step_name_str}',
        @WatermarkValue = '{timestamp_str}'"""

    execute_dbconfig_stored_procedure(exec_statement, v_environment)

def handle_merge(df, table_path_str, merge_condition_str, partition_keys, facility_id, is_historical_flag, schema_name, pk_str):
    """Load ``df`` into an existing Delta table by delete and/or merge/append.

    Steps, in order:
      1. If ``df`` has ``InternalFacilityId`` and ``facility_id`` is not
         ``None``, keep only that facility's rows.
      2. If the load is historical, or no usable primary key is configured,
         delete the rows currently stored for this facility and schema.
      3. If both a primary key and a merge condition are available, MERGE
         (update matched rows, insert unmatched rows); otherwise append.

    A primary key or merge condition counts as missing when it is ``None`` or,
    ignoring case and surrounding whitespace, one of ``""``, ``"none"``,
    ``"null"``. The same test is used for the delete decision and the merge
    decision, so a blank key can no longer append without first clearing the
    previous load.

    Args:
        df: Source DataFrame.
        table_path_str: Fully qualified target table name.
        merge_condition_str: Predicate for ``DeltaTable.merge``.
        partition_keys: Accepted for call compatibility; not used here.
        facility_id: Facility being loaded; interpolated unquoted into the
            DELETE predicate.
        is_historical_flag: Truthy for a historical (full) load.
        schema_name: Source schema name used to scope the DELETE.
        pk_str: Comma-separated primary key columns.

    Raises:
        Whatever ``DeltaTable.forName``, ``spark.sql`` or the Delta writer
        raise; nothing is caught here.
    """

    def _is_blank(s):
        return s is None or str(s).strip().lower() in {"", "none", "null"}

    # Matched rows keep their original creation stamp: these two columns are
    # left out of the UPDATE set (exact, case-sensitive names).
    excluded = {"created_by_user", "process_timestamp"}
    update_map = {c: f"source.{c}" for c in df.columns if c not in excluded}
    # Resolved up front, so a missing or non-Delta target fails before any delete.
    delta_table = DeltaTable.forName(spark, table_path_str)

    if "InternalFacilityId" in df.columns and facility_id is not None:
        df = df.filter(col("InternalFacilityId") == facility_id)

    has_primary_key = not _is_blank(pk_str)
    do_delete = bool(is_historical_flag) or not has_primary_key
    do_merge = has_primary_key and not _is_blank(merge_condition_str)

    if do_delete:
        delete_query = f"""DELETE FROM {table_path_str}"""
        # WARNING: without both a schema name and a facility id the DELETE is
        # unscoped and clears the whole table.
        if schema_name and facility_id is not None:
            delete_query += f""" WHERE InternalFacilityId = {facility_id} and UC_SchemaName = '{schema_name}'"""
        spark.sql(delete_query)

    # NOTE: the delete above and the write below are separate statements, so
    # a failure in between leaves the facility's rows deleted.
    if do_merge:
        (
            delta_table.alias("target")
            .merge(df.alias("source"), merge_condition_str)
            .whenMatchedUpdate(set=update_map)
            .whenNotMatchedInsertAll()
            .execute()
        )
    else:
        df.write.format("delta").mode("append").saveAsTable(table_path_str)
        


def overwrite_table_with_partition(df, table_path_str, v_partition_keys):
    """Overwrite ``table_path_str`` with ``df`` as a Delta table.

    Args:
        df: DataFrame to write.
        table_path_str: Target table name passed to ``saveAsTable``.
        v_partition_keys: Columns to partition by; a falsy value writes the
            table without ``partitionBy``.
    """
    writer = df.write.format("delta").mode("overwrite")
    if v_partition_keys:
        writer = writer.partitionBy(*v_partition_keys)
    writer.saveAsTable(table_path_str)


def get_partition_keys(df):
    """Return the candidate partition columns that are present in ``df``.

    The only candidate is ``InternalFacilityId``.

    Returns:
        list[str]: ``["InternalFacilityId"]`` when ``df`` has that column,
        otherwise an empty list.
    """
    candidate_keys = ("InternalFacilityId",)
    return [key for key in candidate_keys if key in df.columns]

def check_partitions(table_path_str, partition_keys):
    """Tell whether the table is missing any of the wanted partition columns.

    Runs ``DESCRIBE DETAIL`` on the table and compares its
    ``partitionColumns`` (treated as empty when falsy) with ``partition_keys``.

    Returns:
        bool: ``False`` when every wanted key is already a partition column,
        ``True`` otherwise. Only membership is compared, not order.
    """
    table_detail = spark.sql(f"DESCRIBE DETAIL {table_path_str}").first()
    existing_partitions = set(table_detail["partitionColumns"] or [])
    return not (set(partition_keys) <= existing_partitions)

def change_partition_of(table_path_str, partition_keys, client_id, facility_id, v_timestamp, uc_schema_name):
    """Rewrite the whole table partitioned by ``partition_keys``.

    Reads the full table, passes it through ``add_audit_columns`` with the
    given client/facility/timestamp/schema values, and overwrites the table
    (``overwriteSchema`` enabled) with the new partitioning.

    NOTE(review): every row read from the table goes through
    ``add_audit_columns`` with this single ``client_id`` / ``facility_id`` -
    confirm that is intended when the table holds several facilities.
    """
    print(f"Updating partition of {table_path_str}")
    restamped_df = add_audit_columns(
        spark.read.table(table_path_str),
        client_id,
        facility_id,
        v_timestamp,
        uc_schema_name,
    )
    overwrite_writer = (
        restamped_df.write.format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
    )
    overwrite_writer.partitionBy(*partition_keys).saveAsTable(table_path_str)

In [0]:
def process_row(
    row_dict,
    v_unity_catalog,
    v_product_id,
    v_timestamp,
    overwrite_table_list
):
    """Extract one source table for one facility and load it into its target table.

    The row's historical or incremental query (or ``SELECT *`` when none is
    configured) is executed, the result is stamped with audit columns, and
    then written to ``<catalog>.<db>.<DataSourceShortName>_ods_<TableName>``,
    where ``<db>`` is ``db_common`` for client id ``0`` and ``db_<client id>``
    otherwise:

    * existing table - repartition it first if ``check_partitions`` says so,
      then ``handle_merge``;
    * missing table - create it with ``overwrite_table_with_partition``; if
      that fails, log the reason and fall back to ``handle_merge``.

    The outcome is recorded through ``update_process_status_facility_detail``
    (``'S'`` on success, ``'F'`` on failure) using the notebook-level
    ``v_environment``.

    Args:
        row_dict: Mapping-like row read with ``row_dict["..."]``.
        v_unity_catalog: Target catalog name.
        v_product_id: Product id passed to the status procedure.
        v_timestamp: Timestamp string stamped into the audit columns.
        overwrite_table_list: Accepted for call compatibility; not used here.

    Returns:
        list[str]: log lines describing the run.

    Raises:
        ValueError: for any failure. The original exception is chained as
            ``__cause__`` and the log lines collected so far are attached as
            ``.logs``.
    """
    log_messages = []

    # Pre-bind everything the failure handler reports, so an error raised while
    # reading row_dict cannot turn into an UnboundLocalError that hides it.
    step_name_str = None
    v_internal_client_id = None
    v_internal_facility_id = None
    v_datasource_id = None

    try:
        # Identifiers first: they are needed to record a failure status.
        v_internal_client_id = row_dict["InternalClientId"]
        v_internal_facility_id = row_dict["InternalFacilityId"]
        v_datasource_id = row_dict["DataSourceId"]
        database_name = row_dict["DatabaseName"]
        table_name = row_dict["TableName"]
        step_name_str = f"{database_name}.{table_name}"

        uc_schema_name = row_dict["UC_SchemaName"]
        pk_str = row_dict["PK"]
        site_id = row_dict["SiteId"]
        inc_query_str = row_dict["IncrementalExtractQuery"]
        hist_query_str = row_dict["HistoricalExtractQuery"]
        watermark_value = row_dict["WatermarkValue"]
        is_historical_flag = row_dict["IsHistorical"]
        datasource_name = row_dict["DataSourceShortName"]

        query_str = hist_query_str if is_historical_flag else inc_query_str
        placeholder_dict = {
            "UC_SchemaName": uc_schema_name,
            "TableName": table_name,
            "SiteId": site_id,
            "WatermarkValue": f"'{watermark_value}'",
        }
        formatted_query_str = format_query(query_str, placeholder_dict, uc_schema_name, table_name)
        log_messages.append(f"query = {formatted_query_str}")
        df = spark.sql(formatted_query_str)

        db_name = "db_common" if v_internal_client_id == 0 else f"db_{v_internal_client_id}"
        table_path_str = f"{v_unity_catalog}.{db_name}.{datasource_name}_ods_{table_name}"
        df = add_audit_columns(df, v_internal_client_id, v_internal_facility_id, v_timestamp, uc_schema_name)
        # NOTE: count() runs the extract once just for this log line; the
        # write below evaluates it again.
        row_count = df.count()
        log_messages.append(f"DF record count = {row_count}")

        merge_condition_str = get_merge_condition(pk_str, df, v_internal_facility_id)
        log_messages.append(f"Merge_condition = {merge_condition_str}")
        v_partition_keys = get_partition_keys(df)

        table_exists = spark.catalog.tableExists(table_path_str)

        if table_exists:
            if v_partition_keys and check_partitions(table_path_str, v_partition_keys):
                change_partition_of(table_path_str, v_partition_keys, v_internal_client_id, v_internal_facility_id, v_timestamp, uc_schema_name)
            handle_merge(df, table_path_str, merge_condition_str, v_partition_keys, v_internal_facility_id, is_historical_flag, uc_schema_name, pk_str)
            log_messages.append(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ==> Table {table_path_str} updated successfully.")
        else:
            v_table_state = 'created'
            try:
                overwrite_table_with_partition(df, table_path_str, v_partition_keys)
            except Exception as create_error:
                # Keep the reason for the failed create instead of swallowing it,
                # then retry through the merge path.
                log_messages.append(f"Create of {table_path_str} failed ({create_error}); falling back to merge.")
                handle_merge(df, table_path_str, merge_condition_str, v_partition_keys, v_internal_facility_id, is_historical_flag, uc_schema_name, pk_str)
                v_table_state = 'updated'
            log_messages.append(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ==> Table {table_path_str} was {v_table_state} successfully.")

        update_process_status_facility_detail('S',
                                        v_product_id,
                                        v_internal_client_id,
                                        v_internal_facility_id,
                                        step_name_str,
                                        v_datasource_id,
                                        v_environment)
        return log_messages

    except Exception as e:
        log_messages.append(f"Exception occurred: {str(e)}")
        try:
            update_process_status_facility_detail('F',
                                                    v_product_id,
                                                    v_internal_client_id,
                                                    v_internal_facility_id,
                                                    step_name_str,
                                                    v_datasource_id,
                                                    v_environment)
        except Exception as status_error:
            # A failing status update must not replace the error being reported.
            log_messages.append(f"Failed to record failure status: {status_error}")
        err = ValueError(f"Error occurred: {str(e)}")
        err.logs = log_messages
        raise err from e

In [0]:
def _select_summary_line(log_lines):
    """Return the last line carrying a result marker, else the last line, else None."""
    result_markers = ("ERROR:", "updated successfully", "was ")
    for line in reversed(log_lines):
        if any(marker in line for marker in result_markers):
            return line
    return log_lines[-1] if log_lines else None


def process_row_with_retries(row_dict, v_unity_catalog, v_product_id, v_timestamp,
                             overwrite_table_list, max_retries=3, retry_delay_seconds=5):
    """Run ``process_row`` for one row, retrying on failure.

    Args:
        row_dict: Row passed through to ``process_row``; also read here for
            ``UC_SchemaName``, ``TableName``, ``InternalClientId``, ``SiteId``.
        v_unity_catalog, v_product_id, v_timestamp, overwrite_table_list:
            Passed through to ``process_row`` unchanged.
        max_retries: Maximum number of attempts (at least one is always made).
        retry_delay_seconds: Pause between attempts, in seconds.

    Returns:
        dict: ``{"status": "success", "attempts", "logs"}`` on success, or
        ``{"status": "failed", "error", "attempts", "logs"}`` once the
        attempts are exhausted. The collected log is also printed.
    """
    attempt = 1
    row_label = f"{row_dict['UC_SchemaName']}.{row_dict['TableName']}"
    outer_logs = [
        f"==============> Processing row: {row_label}, "
        f"ClientId={row_dict['InternalClientId']}, SiteId={row_dict['SiteId']}"
    ]

    while True:
        # Only the load itself is inside the try: an error while summarising a
        # successful attempt must never cause the row to be loaded again.
        try:
            inner_logs = process_row(row_dict, v_unity_catalog, v_product_id, v_timestamp, overwrite_table_list) or []
        except Exception as e:
            # Failure: keep only a short result line from this attempt.
            inner_logs = getattr(e, "logs", None) or []
            summary_line = _select_summary_line(inner_logs)
            if summary_line:
                outer_logs.append(summary_line)

            if attempt >= max_retries:
                outer_logs.append(f"[Attempt {attempt}/{max_retries}] ❌ {row_label} failed")
                print("\n".join(outer_logs))
                return {"status": "failed", "error": str(e), "attempts": attempt, "logs": outer_logs}

            outer_logs.append(f"[Attempt {attempt}/{max_retries}] ⚠️ {row_label} error: {e} — retrying in {retry_delay_seconds}s")
            time.sleep(retry_delay_seconds)
            attempt += 1
            continue

        # Success: full logs on the first attempt, short summary after retries.
        if attempt == 1:
            outer_logs.extend(inner_logs)
        else:
            summary_line = _select_summary_line(inner_logs)
            if summary_line is not None:
                outer_logs.append(summary_line)

        outer_logs.append(f"[Attempt {attempt}/{max_retries}] ✅ {row_label}")
        print("\n".join(outer_logs))
        return {"status": "success", "attempts": attempt, "logs": outer_logs}

In [0]:
# Fan the per-row loads out over a thread pool (default worker count).
# Each task handles its own retries and returns a status dict.
with concurrent.futures.ThreadPoolExecutor() as v_executor:
    v_futures_list = [
        v_executor.submit(
            process_row_with_retries,
            row,
            v_unity_catalog,
            v_product_id,
            v_timestamp,
            v_overwrite_table_list,
            max_retries=3
        )
        # v_execution_target_obj_df is already de-duplicated and sorted by
        # db_priority. A second .distinct() here would re-shuffle the rows and
        # discard that submission order, so it is collected as-is.
        for row in v_execution_target_obj_df.collect()
    ]

    concurrent.futures.wait(v_futures_list)

    # NOTE(review): rows that end with status "failed" are only visible in the
    # printed logs and in v_results; the notebook itself still completes.
    v_results = []
    for future in v_futures_list:
        try:
            v_results.append(future.result())
        except Exception as e:
            print(f"[Thread] unexpected error: {e}")
