In [0]:
from pyspark.sql import SparkSession, functions as F
from datetime import datetime

# ---------------------------------------------
#  Initialize Spark Session
# ---------------------------------------------
spark = SparkSession.builder.appName("SilverToSQL_Load_AllTables").getOrCreate()

# ---------------------------------------------
#  Base Silver Path
# ---------------------------------------------
silver_base_path = "/mnt/bronze-mount/Silver"

# ---------------------------------------------
#  Azure SQL Connection Details
# ---------------------------------------------
sql_server_name = "azuresqlansdb.database.windows.net"
sql_database_name = "srinipocdb"
sql_user = "ANS"
sql_password = "@srinu1001"

jdbc_url = (
    f"jdbc:sqlserver://{sql_server_name}:1433;"
    f"database={sql_database_name};"
    "encrypt=true;"
    "trustServerCertificate=false;"
    "hostNameInCertificate=*.database.windows.net;"
    "loginTimeout=30;"
)

# ---------------------------------------------
#  Table Lists (by Load Type)
# ---------------------------------------------
incremental_tables = ["Patients", "Procedures", "Encounters"]
full_load_tables = ["Payers", "Organizations"]

# ---------------------------------------------
#  Process Incremental Tables
# ---------------------------------------------
for table_name in incremental_tables:
    print(f"\n==============================")
    print(f"üöÄ Incremental Load: {table_name}")
    print(f"==============================")

    silver_path = f"{silver_base_path}/{table_name}"

    # üïì Step 1 ‚Äî Get Last Watermark & Incremental Column
    config_query = f"""
    SELECT LastWatermarkValue, IncrementalColumn
    FROM dbo.configtable
    WHERE SourceObjectName = '{table_name}'
      AND DestinationZone = 'Goldprestage'
    """

    watermark_df = (
        spark.read.format("jdbc")
        .option("url", jdbc_url)
        .option("query", config_query)
        .option("user", sql_user)
        .option("password", sql_password)
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
        .load()
    )

    if watermark_df.count() == 0:
        print(f"No watermark record found for {table_name}. Skipping...")
        continue

    last_watermark = watermark_df.collect()[0]["LastWatermarkValue"]
    incremental_column_ds = watermark_df.collect()[0]["IncrementalColumn"]

    print(f"üïì Last Watermark for {table_name}: {last_watermark}")
    print(f"üìÖ Incremental Column: {incremental_column_ds}")

    # Step 2 ‚Äî Read Delta Table
    try:
        df_silver = spark.read.format("delta").load(silver_path)
    except Exception as e:
        print(f"Error reading Delta table for {table_name}: {str(e)}")
        continue

    if incremental_column_ds not in df_silver.columns:
        print(f" Skipping {table_name}: Missing '{incremental_column_ds}' column.")
        continue

    #  Step 3 ‚Äî Find Max Timestamp
    max_cleaned_ts = df_silver.agg(F.max(F.col(incremental_column_ds))).collect()[0][0]
    if not max_cleaned_ts:
        print(f"No valid '{incremental_column_ds}' found for {table_name}.")
        continue

    #  Convert max_cleaned_ts to SQL-compatible string
    if isinstance(max_cleaned_ts, datetime):
        max_cleaned_ts_str = max_cleaned_ts.strftime("%Y-%m-%d %H:%M:%S")
    else:
        max_cleaned_ts_str = str(max_cleaned_ts)[:19]  # fallback

    #  Step 4 ‚Äî Filter Incremental Records
    df_incremental = df_silver.filter(
        (F.col(incremental_column_ds) > F.to_timestamp(F.lit(last_watermark)))
        & (F.col(incremental_column_ds) <= F.lit(max_cleaned_ts))
    )

    incr_count = df_incremental.count()
    print(f" Incremental Records Found: {incr_count}")

    if incr_count == 0:
        print(f"No new data for {table_name}. Skipping.")
        continue

    #  Step 5 ‚Äî Write Incremental Data to SQL
    target_table = f"dbo.{table_name}_Prestage"
    try:
        df_incremental.write.format("jdbc") \
            .option("url", jdbc_url) \
            .option("dbtable", target_table) \
            .option("user", sql_user) \
            .option("password", sql_password) \
            .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
            .mode("append") \
            .save()

        print(f" {incr_count} records written to {target_table}.")
    except Exception as e:
        print(f"Failed writing {table_name} to SQL: {str(e)}")
        continue

    #  Step 6 ‚Äî Update Watermark using JVM JDBC (fixed timestamp format)
    try:
        conn = spark._sc._gateway.jvm.java.sql.DriverManager.getConnection(
            jdbc_url, sql_user, sql_password
        )
        stmt = conn.createStatement()

        update_sql = f"""
        UPDATE dbo.configtable
        SET LastWatermarkValue = CONVERT(datetime, '{max_cleaned_ts_str}', 120)
        WHERE SourceObjectName = '{table_name}' AND DestinationZone = 'Goldprestage'
        """

        stmt.executeUpdate(update_sql)
        conn.commit()
        conn.close()
        print(f" Watermark updated for {table_name} ‚Üí {max_cleaned_ts_str}")
    except Exception as e:
        print(f" Failed to update watermark for {table_name}: {str(e)}")

# ---------------------------------------------
#  Process Full Load Tables (Overwrite Mode)
# ---------------------------------------------
for table_name in full_load_tables:
    print(f"\n==============================")
    print(f" Full Load: {table_name}")
    print(f"==============================")

    silver_path = f"{silver_base_path}/{table_name}"

    try:
        df_silver = spark.read.format("delta").load(silver_path)
    except Exception as e:
        print(f"Error reading Delta table for {table_name}: {str(e)}")
        continue

    record_count = df_silver.count()
    print(f" Total Records to Load: {record_count}")

    target_table = f"dbo.{table_name}_Prestage"

    try:
        df_silver.write.format("jdbc") \
            .option("url", jdbc_url) \
            .option("dbtable", target_table) \
            .option("user", sql_user) \
            .option("password", sql_password) \
            .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
            .mode("overwrite") \
            .save()

        print(f"Full load completed for {table_name}: {record_count} records written.")
    except Exception as e:
        print(f" Error writing full load for {table_name}: {str(e)}")
        continue

print("\n All Silver tables processed successfully (Incremental + Full loads)!")



üöÄ Incremental Load: Patients
üïì Last Watermark for Patients: 2025-11-11 05:42:00
üìÖ Incremental Column: Last_Modified
‚úÖ Incremental Records Found: 974
üíæ 974 records written to dbo.Patients_Prestage.
‚úÖ Watermark updated for Patients ‚Üí 2025-11-11 05:42:00

üöÄ Incremental Load: Procedures
üïì Last Watermark for Procedures: 2022-01-30 02:05:37
üìÖ Incremental Column: start
‚úÖ Incremental Records Found: 0
‚ÑπÔ∏è No new data for Procedures. Skipping.

üöÄ Incremental Load: Encounters
üïì Last Watermark for Encounters: 2022-02-06 01:57:36
üìÖ Incremental Column: start
‚úÖ Incremental Records Found: 0
‚ÑπÔ∏è No new data for Encounters. Skipping.

üöÄ Full Load: Payers
‚úÖ Total Records to Load: 10
üíæ Full load completed for Payers: 10 records written.

üöÄ Full Load: Organizations
‚úÖ Total Records to Load: 1
üíæ Full load completed for Organizations: 1 records written.

üéâ All Silver tables processed successfully (Incremental + Full loads)!


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StringType
from pyspark.sql.functions import col, trim, regexp_replace, when, length, lpad

# Initialize Spark session
spark = SparkSession.builder.appName("BronzeToSilver_Cleaning").getOrCreate()

# Base Bronze & Silver paths
bronze_base_path = "/mnt/bronze-mount/Bronze"
silver_base_path = "/mnt/bronze-mount/Silver"

# ---------------------------------------------------------
# Function: Get latest (Year/Month/Day) folder in ADLS
# ---------------------------------------------------------
def get_latest_folder(base_path: str):
    try:
        years = [f.name.replace("/", "") for f in dbutils.fs.ls(base_path)]
        latest_year = max(years)

        months = [f.name.replace("/", "") for f in dbutils.fs.ls(f"{base_path}/{latest_year}")]
        latest_month = max(months)

        days = [f.name.replace("/", "") for f in dbutils.fs.ls(f"{base_path}/{latest_year}/{latest_month}")]
        latest_day = max(days)

        latest_path = f"{base_path}/{latest_year}/{latest_month}/{latest_day}/"
        return latest_path
    except Exception as e:
        print(f" Error locating latest folder in {base_path}: {e}")
        return None


# ---------------------------------------------------------
# Iterate over all entities under Bronze
# ---------------------------------------------------------
entities = [f.name.replace("/", "") for f in dbutils.fs.ls(bronze_base_path)]

for entity in entities:
    entity_path = f"{bronze_base_path}/{entity}"
    latest_path = get_latest_folder(entity_path)

    if not latest_path:
        print(f"No valid folder found for {entity}")
        continue

    files = [f.name for f in dbutils.fs.ls(latest_path)]

    for file_name in files:
        file_name_clean = file_name.replace(".Parquet", "").replace(".parquet", "")
        file_path = latest_path + file_name

        # ====================================================
        #  Entity-specific cleaning logic
        # ====================================================

        # -------------------------------
        #  PATIENTS
        # -------------------------------
        if file_name_clean.lower() == "patients":
            print(f" Cleaning Patients data from {file_path}")
            df = spark.read.parquet(file_path)

            # 1Ô∏è Standardize column names
            for c in df.columns:
                df = df.withColumnRenamed(c, c.lower().strip().replace(" ", "_"))

            # 2Ô∏è Trim all string columns
            string_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, StringType)]
            for c in string_cols:
                df = df.withColumn(c, F.trim(F.col(c)))

            # 3Ô∏è Clean ZIP codes
            if "zip" in df.columns:
                df = df.withColumn("zip", F.regexp_replace(F.col("zip").cast("string"), r"\.0$", ""))
                df = df.withColumn("zip", F.when(F.length(F.col("zip")) < 5, F.lpad(F.col("zip"), 5, "0")).otherwise(F.col("zip")))
                df = df.withColumn("zip", F.when(F.col("zip").isNull() | (F.trim(F.col("zip")) == ""), "00000").otherwise(F.col("zip")))

            # 4Ô∏è Normalize name-related columns
            for c in ["prefix", "first", "last", "maiden", "suffix"]:
                if c in df.columns:
                    df = df.withColumn(c, F.initcap(F.trim(F.col(c))))
                    default_value = "N/A" if c in ["suffix", "maiden"] else "Not Provided"
                    df = df.withColumn(c, F.when(F.col(c).isNull() | (F.trim(F.col(c)) == ""), default_value).otherwise(F.col(c)))

            # 5Ô∏è Standardize categorical columns
            for c in ["marital", "race", "ethnicity", "gender"]:
                if c in df.columns:
                    df = df.withColumn(c, F.upper(F.trim(F.col(c))))
                    df = df.withColumn(c, F.when(F.col(c).isNull() | (F.col(c) == ""), "UNKNOWN").otherwise(F.col(c)))

            # 6Ô∏è Convert BIRTHDATE/DEATHDATE to timestamp and compute AGE
            if "birthdate" in df.columns:
                df = df.withColumn("birthdate", F.to_timestamp("birthdate", "yyyy-MM-dd"))
                df = df.withColumn("age", F.floor(F.datediff(F.current_date(), F.col("birthdate")) / 365.25))
            if "deathdate" in df.columns:
                df = df.withColumn("deathdate", F.to_timestamp("deathdate", "yyyy-MM-dd"))

            # 7Ô∏è Clean address/location fields
            for c in ["address", "city", "state", "county", "birthplace"]:
                if c in df.columns:
                    df = df.withColumn(c, F.initcap(F.trim(F.regexp_replace(F.col(c), r"\s+", " "))))
                    df = df.withColumn(c, F.when(F.col(c).isNull() | (F.col(c) == ""), "Unknown").otherwise(F.col(c)))

            # 8Ô∏è Validate coordinates
            for c in ["lat", "lon"]:
                if c in df.columns:
                    df = df.withColumn(c, F.when(F.col(c).isNull(), 0.0).otherwise(F.col(c)))

            # 9Ô∏è Validate patient ID
            if "id" in df.columns:
                df = df.withColumn("valid_uuid",
                    F.when(F.regexp_replace(F.col("id"), "-", "").rlike("^[0-9a-fA-F]{32}$"), "Valid").otherwise("Invalid"))

            #  Remove non-printable characters
            for c in string_cols:
                df = df.withColumn(c, F.regexp_replace(F.col(c), "[^\\x20-\\x7E]", "")) 
            df = df.withColumn("Last_Modified", F.current_timestamp())    

            df_Patients = df
            print(f" Cleaned Patients | Records: {df_Patients.count()}")

            #  Write to Silver Delta table
            silver_path = f"{silver_base_path}/Patients"
            df_Patients.write.format("delta").mode("append").save(silver_path)
            print(f" Patients Delta table saved to {silver_path}")

        # -------------------------------
        #  PAYERS
        # -------------------------------
        elif file_name_clean.lower() == "payers":
            print(f" Cleaning Payers data from {file_path}")
            df = spark.read.parquet(file_path)

            for c in df.columns:
                df = df.withColumnRenamed(c, c.upper())

            string_cols = [f.name for f in df.schema.fields if f.dataType == StringType()]
            for c in string_cols:
                df = df.withColumn(c, trim(col(c)))

            if "ZIP" in df.columns:
                df = df.withColumn("ZIP", regexp_replace(col("ZIP").cast(StringType()), r"\.0$", ""))
                df = df.withColumn("ZIP", when(length(col("ZIP")) < 5, lpad(col("ZIP"), 5, "0")).otherwise(col("ZIP")))
                df = df.withColumn("ZIP", when(col("ZIP").isNull(), "00000").otherwise(col("ZIP")))

            if "PHONE" in df.columns:
                df = df.withColumn("PHONE", regexp_replace(col("PHONE").cast(StringType()), r"[^0-9]", ""))
                df = df.withColumn("PHONE", when(col("PHONE") == "", "Not Provided").otherwise(col("PHONE")))

            fill_defaults = {
                "ADDRESS": "Not Provided",
                "CITY": "Unknown",
                "STATE_HEADQUARTERED": "Unknown",
                "PHONE": "Not Provided",
                "ZIP": "00000"
            }
            for k, v in fill_defaults.items():
                if k in df.columns:
                    df = df.withColumn(k, when(col(k).isNull() | (trim(col(k)) == ""), v).otherwise(col(k)))

            if "NAME" in df.columns:
                df = df.withColumn("NAME", regexp_replace(trim(col("NAME")), r"\s+", " "))
            if "CITY" in df.columns:
                df = df.withColumn("CITY", regexp_replace(trim(col("CITY")), r"\s+", " "))

            if "ID" in df.columns:
                df = df.withColumn("VALID_UUID",
                    when(regexp_replace(col("ID"), "-", "").rlike("^[0-9a-fA-F]{32}$"), "Valid").otherwise("Invalid"))

            df_Payers = df
            print(f" Cleaned Payers | Records: {df_Payers.count()}")

            #  Write to Silver Delta table
            silver_path = f"{silver_base_path}/Payers"
            df_Payers.write.format("delta").mode("append").save(silver_path)
            print(f" Payers Delta table saved to {silver_path}")

        # -------------------------------
        #  PROCEDURES
        # -------------------------------
        elif file_name_clean.lower() == "procedures":
            print(f" Cleaning Procedures data from {file_path}")
            df = spark.read.parquet(file_path)

            for c in df.columns:
                df = df.withColumnRenamed(c, c.lower().strip().replace(" ", "_"))

            df = df.withColumn("start", F.to_timestamp("start", "yyyy-MM-dd'T'HH:mm:ss'Z'")) \
                   .withColumn("stop", F.to_timestamp("stop", "yyyy-MM-dd'T'HH:mm:ss'Z'"))
            df = df.fillna({"reasoncode": 0, "reasondescription": "Unknown Reason"})

            for c in ["description", "reasondescription"]:
                if c in df.columns:
                    df = df.withColumn(c, F.trim(F.initcap(F.col(c))))
                    df = df.withColumn(c, F.regexp_replace(F.col(c), "[^\\x20-\\x7E]", ""))

            if "base_cost" in df.columns:
                df = df.withColumn("base_cost", F.when(F.col("base_cost") < 0, 0).otherwise(F.col("base_cost")))

            if all(col in df.columns for col in ["start", "stop"]):
                df = df.withColumn("duration_minutes", (F.col("stop").cast("long") - F.col("start").cast("long")) / 60)
            df = df.withColumn("Procedure_Id", F.monotonically_increasing_id())
            df_Procedures = df
            print(f" Cleaned Procedures | Records: {df_Procedures.count()}")
            

            #  Write to Silver Delta table
            silver_path = f"{silver_base_path}/Procedures"
            df_Procedures.write.format("delta").mode("append").save(silver_path)
            print(f" Procedures Delta table saved to {silver_path}")

        # -------------------------------
        # ENCOUNTERS
        # -------------------------------
        elif file_name_clean.lower() == "encounters":
            print(f" Cleaning Encounters data from {file_path}")
            df = spark.read.parquet(file_path)

            for c in df.columns:
                df = df.withColumnRenamed(c, c.lower().strip().replace(" ", "_"))

            df = df.withColumn("start", F.to_timestamp("start", "yyyy-MM-dd'T'HH:mm:ss'Z'")) \
                   .withColumn("stop", F.to_timestamp("stop", "yyyy-MM-dd'T'HH:mm:ss'Z'"))

            fill_values = {
                "reasoncode": 0,
                "reasondescription": "Unknown Reason",
                "payer_coverage": 0,
                "base_encounter_cost": 0,
                "total_claim_cost": 0
            }
            df = df.fillna(fill_values)

            for c in ["description", "reasondescription"]:
                if c in df.columns:
                    df = df.withColumn(c, F.trim(F.initcap(F.col(c))))
                    df = df.withColumn(c, F.regexp_replace(F.col(c), "[^\\x20-\\x7E]", ""))

            if "start" in df.columns and "stop" in df.columns:
                df = df.withColumn("duration_minutes", (F.col("stop").cast("long") - F.col("start").cast("long")) / 60)

            df_Encounters = df
            print(f" Cleaned Encounters | Records: {df_Encounters.count()}")

            #  Write to Silver Delta table
            silver_path = f"{silver_base_path}/Encounters"
            df_Encounters.write.format("delta").mode("append").save(silver_path)
            print(f" Encounters Delta table saved to {silver_path}")

        # -------------------------------
        #  ORGANIZATIONS
        # -------------------------------
        elif file_name_clean.lower() == "organizations":
            print(f"Cleaning Organizations data from {file_path}")
            df = spark.read.parquet(file_path)

            for c in df.columns:
                df = df.withColumnRenamed(c, c.lower().strip().replace(" ", "_"))

            string_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, StringType)]
            for c in string_cols:
                df = df.withColumn(c, F.trim(F.col(c)))

            if "zip" in df.columns:
                df = df.withColumn("zip", F.col("zip").cast("string"))
                df = df.withColumn("zip", F.regexp_replace(F.col("zip"), r"\.0$", ""))
                df = df.withColumn("zip", F.when(F.length(F.col("zip")) < 5, F.lpad(F.col("zip"), 5, "0")).otherwise(F.col("zip")))
                df = df.withColumn("zip", F.when(F.col("zip").isNull() | (F.col("zip") == ""), "00000").otherwise(F.col("zip")))

            if "name" in df.columns:
                df = df.withColumn("name", F.initcap(F.trim(F.regexp_replace(F.col("name"), r"\s+", " "))))
                df = df.withColumn("name", F.when(F.col("name").isNull() | (F.col("name") == ""), "Unknown Organization").otherwise(F.col("name")))

            df_Organizations = df
            print(f" Cleaned Organizations | Records: {df_Organizations.count()}")

            #  Write to Silver Delta table
            silver_path = f"{silver_base_path}/Organizations"
            df_Organizations.write.format("delta").mode("append").save(silver_path)
            print(f" Organizations Delta table saved to {silver_path}")

        else:
            print(f" No cleaning logic defined for {file_name_clean}")
