In [0]:
%run ./Log_Files

In [0]:
from pyspark.sql.functions import *
from pyspark.sql import *

In [0]:
# from datetime import datetime

# def log_event(layer, table_name, event, record_count=0):
#     log_df = spark.createDataFrame(
#         [(layer,
#           table_name,
#           event,
#           datetime.now().isoformat(),
#           record_count
#           )
#         ],
#         ["layer", "table_name", "event", "event_time", "record_count"]
#     )

#     log_df.write.mode("append").saveAsTable("logs")

In [0]:
# log_event(
#     layer="SILVER",
#     table_name="silver_patient_dtable",
#     event="START"
# )

In [0]:
from datetime import datetime

# Record the START of the Silver run. log_event_file is not defined in this
# notebook — presumably it comes from the `%run ./Log_Files` cell; confirm there.
log_event_file(
    event_time=datetime.now(),
    status="STARTED",
    message="Silver pipeline execution started",
    record_count=0,
    pipeline_name="Patient_Insurance_SCD2",
    notebook_name="Staging_and_SCD2",
    layer_name="Silver",
)



In [0]:
bronze = spark.table("bronze_patient_dtable")

# Rows whose ACTIONIND is 'D' are flagged "Deactive"; every other value
# (including NULL, since a NULL comparison falls through to otherwise) is "Active".
_is_deactivated = col("ACTIONIND") == 'D'
patient_silver = bronze.withColumn(
    "ActivationInd",
    when(_is_deactivated, "Deactive").otherwise("Active"),
)

In [0]:
# Imported here as well so this cell does not silently depend on the
# START-log cell having been run first.
from datetime import datetime

try:
    # ---------------- CLEANING ----------------
    # Strip digits from the name parts, rebuild FULL_NAME, default a missing
    # REASONDESCRIPTION, then derive the surrogate key (PATIENT_KEY), an MD5
    # change-detection checksum (CHK_SUM_TXT) and the SCD2 housekeeping columns.
    # `patient_silver` is reassigned (not renamed) because a later cell displays it.
    # NOTE(review): concat_ws skips NULL inputs, so a NULL in one column shifts
    # the remaining values inside the hashed string. Changing that would alter
    # every existing checksum, so it is intentionally left as-is.
    patient_silver = (
        patient_silver
        .withColumn("FIRST", regexp_replace(col("FIRST"), "[0-9]", ""))
        .withColumn("LAST", regexp_replace(col("LAST"), "[0-9]", ""))
        .withColumn("FULL_NAME", concat_ws(" ", "FIRST", "LAST"))
        .withColumn("REASONDESCRIPTION",
                    when(col("REASONDESCRIPTION").isNull(), "Not Specified")
                    .otherwise(col("REASONDESCRIPTION")))
        .withColumn("PATIENT_KEY", md5(col("PATIENTID")))
        .withColumn(
            "CHK_SUM_TXT",
            md5(concat_ws(
                "||",
                col("FIRST"), col("BIRTHDATE"), col("DEATHDATE"),
                col("LAST"), col("MARITAL"), col("GENDER"),
                col("ADDRESS"), col("CITY"), col("STATE"),
                col("COUNTRY"), col("HOSPITALNAME"),
                col("REASONDESCRIPTION"), col("PAYERID")
            ))
        )
        .withColumn("CURR_RCD_IND", lit("Y"))
        .withColumn("CRET_TS", current_timestamp())
        .withColumn("UPDT_TS", to_timestamp(lit("9999-12-31")))
    )

    # ---------------- DEDUP BASED ON BUSINESS COLS ----------------
    # Rows identical on every business column collapse to the one with the
    # latest LOAD_CTL_KEY.
    business_cols = [
        "PATIENTID","BIRTHDATE","DEATHDATE","FIRST","LAST","MARITAL","GENDER",
        "BIRTHPLACE","ADDRESS","CITY","STATE","COUNTRY","HOSPITALNAME",
        "PAYERID","REASONDESCRIPTION","ACTIONIND"
    ]

    w = Window.partitionBy(business_cols).orderBy(col("LOAD_CTL_KEY").desc())

    patient_sliver_dedup = (
        patient_silver
        .withColumn("rn", row_number().over(w))
        .filter("rn = 1")
        .drop("rn")
    )

    # ---------------- PRIORITY LOGIC (U > I > D) ----------------
    # Keep one row per patient per load, preferring ACTIONIND U over I over D
    # (any other value ranks last). CHK_SUM_TXT is a secondary sort key: without
    # it, row_number() picks arbitrarily among rows sharing the winning action,
    # so the kept row could change between runs. Ordering by the checksum makes
    # the choice stable whenever those rows differ in checksummed columns.
    action_rank = (
        when(col("ACTIONIND") == "U", 3)
        .when(col("ACTIONIND") == "I", 2)
        .when(col("ACTIONIND") == "D", 1)
        .otherwise(0)
    )
    priority_window = Window.partitionBy(
        "PATIENT_KEY", "LOAD_CTL_KEY"
    ).orderBy(
        action_rank.desc(),
        col("CHK_SUM_TXT")
    )

    patient_sliver_dededup = (
        patient_sliver_dedup
        .withColumn("rn", row_number().over(priority_window))
        .filter(col("rn") == 1)
        .drop("rn")
    )

    # ---------------- WRITE SILVER ----------------
    patient_sliver_dededup.write.mode("overwrite").saveAsTable("silver_patient_dtable")

    # ---------------- SUCCESS LOG ----------------
    record_count = spark.table("silver_patient_dtable").count()

    log_event_file(
        pipeline_name="Patient_Insurance_SCD2",
        notebook_name="Staging_and_SCD2",
        layer_name="Silver",
        status="SUCCESS",
        message="Silver pipeline completed successfully",
        record_count=record_count,
        event_time=datetime.now()
    )
except Exception as e:
    # Record the failure, then re-raise so a Run All stops at this cell.
    log_event_file(
        pipeline_name="Patient_Insurance_SCD2",
        notebook_name="Staging_and_SCD2",
        layer_name="Silver",
        status="FAILED",
        message=str(e),
        record_count=0,
        event_time=datetime.now()
    )

    # Bare `raise` re-raises the active exception without adding a new frame.
    raise



In [0]:
%sql
-- Preview the Silver patient table written by the transform cell.
SELECT *
FROM silver_patient_dtable

In [0]:
# Imported locally so this cell does not depend on an earlier cell's import.
from datetime import datetime

# Count the rows now in the Silver table and record the END event with it.
records_read = spark.table("silver_patient_dtable").count()

log_event_file(
    pipeline_name="Patient_Insurance_SCD2",
    notebook_name="Staging_and_SCD2",
    layer_name="Silver",
    status="END",
    # Plain literal: the former f-string had no placeholders.
    message="Silver pipeline Finished",
    record_count=records_read,
    event_time=datetime.now(),
)


In [0]:
# bronze = spark.table("bronze_patient_dtable")
# patient_silver = (
#     bronze.withColumn(
#         "ActivationInd",
#         when(col("ACTIONIND") == 'D', "Deactive")
#         .otherwise("Active")
#     ))

In [0]:
# Preview `patient_silver` as left by the cleaning step in the transform cell,
# i.e. BEFORE both dedup passes, so duplicate patients may still appear here.
display(patient_silver)

In [0]:
#  # Read Bronze Patient Table
# patient_silver = (
#     patient_silver
#     .withColumn("FIRST", regexp_replace(col("FIRST"), "[0-9]", ""))
#     .withColumn("LAST",  regexp_replace(col("LAST"),  "[0-9]", ""))
#     .withColumn("FULL_NAME", concat_ws(" ", "FIRST", "LAST"))
#     .withColumn("REASONDESCRIPTION", when(col("REASONDESCRIPTION").isNull(), "Not Specified").otherwise(col("REASONDESCRIPTION")))
#     # Primary Key
#     .withColumn("PATIENT_KEY", md5(col("PATIENTID")))
#     # Checksum
#     .withColumn(
#         "CHK_SUM_TXT",
#         md5(
#             concat_ws(
#                 "||",
#                 col("FIRST"),
#                 col("BIRTHDATE"),
#                 col("DEATHDATE"),
#                 col("LAST"),
#                 col("MARITAL"),
#                 col("GENDER"),
#                 col("ADDRESS"),
#                 col("CITY"),
#                 col("STATE"),
#                 col("COUNTRY"),
#                 col("HOSPITALNAME"),
#                 col("REASONDESCRIPTION"),
#                 col("PAYERID")
#             )
#         )
#     )
#     #Transaction code
#     # .withColumn("TransactionCode", lit("I"))        
 
#     # Current Record Indicator 
#     .withColumn("CURR_RCD_IND", lit("Y")) 
 
#     #Row Created timestamp
#     .withColumn("CRET_TS", current_timestamp())
 
#     #Row Update timestamp (9999-12-31 as timestamp)
#     .withColumn("UPDT_TS", to_timestamp(lit("9999-12-31"))
# )
# )


**DEDUPLICATION OF PATIENT RECORDS**

In [0]:
# from pyspark.sql import Window
# from pyspark.sql.functions import row_number

# business_cols = [
#     "PATIENTID","BIRTHDATE","DEATHDATE","FIRST","LAST","MARITAL","GENDER",
#     "BIRTHPLACE","ADDRESS","CITY","STATE","COUNTRY","HOSPITALNAME","PAYERID",
#     "REASONDESCRIPTION","ACTIONIND"
# ]

# w = Window.partitionBy(business_cols).orderBy(col("LOAD_CTL_KEY").desc())

# patient_sliver_dedup = patient_silver\
#     .withColumn("rn", row_number().over(w)) \
#     .filter("rn = 1") \
#     .drop("rn")



In [0]:
# display(patient_sliver_dedup)

In [0]:
# priority_window = Window.partitionBy(
#     "PATIENT_KEY",
#     "LOAD_CTL_KEY"          # priority only within same load
# ).orderBy(
#     when(col("ACTIONIND") == "U", 3)
#     .when(col("ACTIONIND") == "I", 2)
#     .when(col("ACTIONIND") == "D", 1)
#     .otherwise(0)
#     .desc()
# )

# patient_sliver_dededup = (
#     patient_sliver_dedup
#     .withColumn("rn", row_number().over(priority_window))
#     .filter(col("rn") == 1)
#     .drop("rn")
# )


In [0]:
# display(patient_sliver_dededup)

In [0]:
# patient_sliver_dededup.write.mode("overwrite").saveAsTable("silver_patient_dtable")


In [0]:
# %sql
# select * from silver_patient_dtable

In [0]:
# silver_count = spark.table("silver_patient_dtable").count()

# log_event(
#     layer="SILVER",
#     table_name="silver_patient_dtable",
#     event="END",
#     record_count=silver_count
# )

In [0]:
# Target Gold SCD2 table for patients. CREATE TABLE IF NOT EXISTS leaves an
# existing table untouched, so edits to this DDL only affect a fresh table.
# NOTE(review): the Silver patient frame uses HOSPITALNAME (no underscore) and
# also carries BIRTHPLACE, while this schema declares HOSPITAL_NAME and has no
# BIRTHPLACE. Confirm the downstream load maps these columns explicitly.
gold_patient_ddl = """
CREATE TABLE IF NOT EXISTS gold_patient_scd2_dtable (
    PATIENTID STRING,
    FIRST STRING,
    LAST STRING,
    FULL_NAME STRING,
    BIRTHDATE  DATE,
    DEATHDATE DATE,
    MARITAL STRING,
    GENDER STRING,
    ADDRESS STRING,
    CITY STRING,
    STATE STRING,
    COUNTRY STRING,
    HOSPITAL_NAME STRING,
    REASONDESCRIPTION STRING,
    PAYERID STRING,
    CURR_RCD_IND STRING,
    CRET_TS TIMESTAMP,
    UPDT_TS TIMESTAMP,
    LOAD_CTL_KEY TIMESTAMP,
    PATIENT_KEY STRING,
    CHK_SUM_TXT STRING    
  
)
USING delta
"""

spark.sql(gold_patient_ddl)

In [0]:
from pyspark.sql.functions import *
from pyspark.sql import *
bronze = spark.table("bronze_payer_dtable")

# NULL placeholders, applied before hashing so the checksum sees the filled values.
_payer_null_defaults = [
    ("PHONE", "000-000-0000"),
    ("ZIP", "00000"),
    ("ADDRESS", "NOT SPECIFIED"),
    ("STATE_HEADQUARTERED", "NOT SPECIFIED"),
    ("CITY", "NOT SPECIFIED"),
]

_payer_filled = bronze
for _col_name, _default in _payer_null_defaults:
    _payer_filled = _payer_filled.withColumn(
        _col_name,
        when(col(_col_name).isNull(), _default).otherwise(col(_col_name)),
    )

# Columns hashed (in this order, "||"-separated) into the change-detection checksum.
_payer_chk_cols = ["NAME", "ADDRESS", "CITY", "STATE_HEADQUARTERED", "ZIP", "PHONE"]

payer_silver = (
    _payer_filled
    # BUSINESS KEYS
    .withColumn("PAYER_KEY", md5(col("PAYERID")))
    .withColumn(
        "CHK_SUM_TXT",
        md5(concat_ws("||", *[col(name) for name in _payer_chk_cols])),
    )
    # SCD2 METADATA
    .withColumn("TransactionCode", lit("I"))
    .withColumn("CURR_RCD_IND", lit("Y"))
    .withColumn(
        "ActivationInd",
        when(col("NAME") == "NO_INSURANCE", "Inactive").otherwise("Active"),
    )
    .withColumn("CRET_TS", current_timestamp())
    .withColumn("UPDT_TS", to_timestamp(lit("9999-12-31")))
)






**DEDUPLICATION OF PAYER RECORDS**

In [0]:
from pyspark.sql import Window
from pyspark.sql.functions import row_number

# Payer rows identical on every business column collapse to the one with the
# latest LOAD_CTL_KEY. Note: this rebinds `business_cols` and `w`, which the
# patient transform cell also uses.
business_cols = [
    "PAYERID", "NAME", "ADDRESS", "CITY", "STATE_HEADQUARTERED", "ZIP", "PHONE",
]

w = Window.partitionBy(business_cols).orderBy(col("LOAD_CTL_KEY").desc())

# The "sliver" spelling is kept because other cells refer to this exact name.
payer_sliver_dedup = (
    payer_silver
    .withColumn("rn", row_number().over(w))
    .where(col("rn") == 1)
    .drop("rn")
)


In [0]:
# payer_sliver_dedup.write.format("delta").mode("overwrite").saveAsTable("silver_payer_dtable")

In [0]:
# %sql

# select * from workspace.default.silver_payer_dtable

In [0]:
# Target Gold SCD2 table for payers. CREATE TABLE IF NOT EXISTS leaves an
# existing table untouched, so edits to this DDL only affect a fresh table.
# NOTE(review): the Silver payer frame also builds ActivationInd, which this
# schema does not declare. Confirm the downstream load selects columns explicitly.
gold_payer_ddl = """
CREATE TABLE IF NOT EXISTS gold_payer_scd2_dtable (
    PAYERID STRING,
    NAME STRING,
    ADDRESS STRING,
    CITY STRING,
    STATE_HEADQUARTERED STRING,
    ZIP STRING,
    PHONE STRING,
    PAYER_KEY STRING,
    CHK_SUM_TXT STRING,
    TransactionCode STRING,
    CURR_RCD_IND STRING,
    CRET_TS TIMESTAMP,
    UPDT_TS TIMESTAMP,
    LOAD_CTL_KEY TIMESTAMP
    
)
USING delta
"""

spark.sql(gold_payer_ddl)


In [0]:
%sql
--Drop table if exists gold_patient_scd2_dtable;
--Drop table if exists gold_payer_scd2_dtable

In [0]:
%sql
--drop table silver_patient_dtable
--drop table silver_payer_dtable