In [0]:
%sql
USE CATALOG hive_metastore;
USE mini_proj_logs;


In [0]:
# dbutils.fs.rm("s3a://mini-project-sathwik/logs/etl_log", True)


In [0]:
%run ./05_Logging_Utility


In [0]:
# Databricks Notebook
# ---------------------------------------------------------
# Notebook 02: Clean, Transform and Prepare Silver Data
# ---------------------------------------------------------

from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# ---------------------------------------------
# Paths
# ---------------------------------------------
bronze_patient_path = "s3a://mini-project-sathwik/bronze/patient_bronze"
bronze_insurance_path = "s3a://mini-project-sathwik/bronze/insurance_bronze"

silver_patient_path = "s3a://mini-project-sathwik/silver/patient_silver"
silver_insurance_path = "s3a://mini-project-sathwik/silver/insurance_silver"

In [0]:
# ---------------------------------------------
# 1. Read Bronze Tables
# ---------------------------------------------
df_patient_raw = spark.read.format("delta").load(bronze_patient_path)
df_insurance_raw = spark.read.format("delta").load(bronze_insurance_path)

print("Bronze Patient Schema:")
df_patient_raw.printSchema()

In [0]:
# ---------------------------------------------
# 2. Clean + Transform Patient Data (Silver)
# ---------------------------------------------
df_patient_silver = (
    df_patient_raw
    .withColumn("patient_id", trim(col("patient_id")))
    .withColumn("name", trim(col("name")))
    .withColumn("age", col("age").cast("int"))
    .withColumn("address", trim(col("address")))
    .withColumn("phone_number", regexp_replace(trim(col("phone_number")), "\\s+", ""))
    .withColumn("bill_amount", col("bill_amount").cast("double"))
    .withColumn("insurance_provider", trim(col("insurance_provider")))
    .withColumn("policy_id", trim(col("policy_id")))

    # Split name into first_name and last_name
    .withColumn("first_name", split(col("name"), " ").getItem(0))
    .withColumn("last_name", split(col("name"), " ").getItem(1))

    # Business key (MD5)
    .withColumn("primary_key", md5(concat_ws("|", col("patient_id"), col("name"))))

    # Checksum of all changeable attributes
    .withColumn(
        "checksum_txt",
        md5(concat_ws("|",
            col("address"),
            col("age"),
            col("phone_number"),
            col("bill_amount"),
            col("insurance_provider"),
            col("policy_id")
        ))
    )

    .withColumn("create_timestamp", current_timestamp())
    .withColumn("update_timestamp",lit("9999-12-31 23:59:59").cast("timestamp"))
    .withColumn("load_ctl_key", date_format(current_timestamp(), "yyyyMMddHHmmss"))
    .withColumn("start_date", current_timestamp())
    .withColumn("end_date", lit("9999-12-31"))
)

df_patient_silver.createOrReplaceTempView("patient_landing_cleaned")

print("Silver Patient Schema:")
df_patient_silver.printSchema()

In [0]:
# ---------------------------------------------
# 3. Write Silver Patient Delta Table
# ---------------------------------------------
dbutils.fs.rm(silver_patient_path, True)
df_patient_silver.write.mode("overwrite").format("delta").save(silver_patient_path)

In [0]:
spark.sql("DROP TABLE IF EXISTS patient_silver")
spark.sql("""
    CREATE TABLE IF NOT EXISTS patient_silver
    USING DELTA
    LOCATION 's3a://mini-project-sathwik/silver/patient_silver'
""")

print("Silver Patient Table Created Successfully.")

In [0]:
# ---------------------------------------------
# 4. Insurance Silver (Simple Clean)    abc101  abc101 a-b-c101(insurance_id) 
# ---------------------------------------------
from pyspark.sql.functions import (
    col, trim, current_timestamp, lit, date_format,
    md5, concat_ws
)

df_insurance_silver = (
    df_insurance_raw
        .withColumn("policy_id", trim(col("policy_id")))
        .withColumn("insurance_provider", trim(col("insurance_provider")))
        .withColumn("claim_status", trim(col("claim_status")))
        .withColumn("amount_covered", col("amount_covered").cast("double"))
        .withColumn("checksum_txt", md5(concat_ws("|",
            col("insurance_provider"),
            col("amount_covered"),
            col("claim_status")
        )))
        .withColumn("primary_key", md5(col("policy_id")))
        .withColumn("create_timestamp", current_timestamp())
        .withColumn("update_timestamp",lit("9999-12-31 23:59:59").cast("timestamp"))
        .withColumn("load_ctl_key", date_format(current_timestamp(), "yyyyMMddHHmmss"))
        .withColumn("start_date", current_date())
        .withColumn("end_date", lit("9999-12-31"))
)

dbutils.fs.rm(silver_insurance_path, True)
df_insurance_silver.write.mode("overwrite").format("delta").save(silver_insurance_path)


In [0]:
spark.sql("DROP TABLE IF EXISTS insurance_silver")
spark.sql("""
    CREATE TABLE IF NOT EXISTS insurance_silver
    USING DELTA
    LOCATION 's3a://mini-project-sathwik/silver/insurance_silver'
""")

print("Silver Insurance Table Created Successfully.")

In [0]:
%sql
-- ---------------------------------------------
-- 1. Load Silver Table into Temp View
-- ---------------------------------------------
CREATE OR REPLACE TEMP VIEW patient_silver AS
SELECT *
FROM delta.`s3a://mini-project-sathwik/silver/patient_silver`;

In [0]:
%sql
desc patient_silver;

In [0]:
%sql
-- ---------------------------------------------
-- 1. Load Silver Table into Temp View
-- ---------------------------------------------
CREATE OR REPLACE TEMP VIEW insurance_silver AS
SELECT *
FROM delta.`s3a://mini-project-sathwik/silver/insurance_silver`;

In [0]:
# %sql
# DROP TABLE IF EXISTS patient_dim;

In [0]:
%sql
CREATE TABLE if not exists patient_dim (
    patient_id STRING,
    first_name STRING,
    last_name STRING,
    age INT,
    address STRING,
    phone_number STRING,
    bill_amount DOUBLE,
    insurance_provider STRING,
    policy_id STRING,
    checksum_txt STRING,
    load_ctl_key STRING,
    create_timestamp TIMESTAMP,
    update_timestamp TIMESTAMP,
    is_current STRING
)
USING DELTA
LOCATION 's3a://mini-project-sathwik/gold/patient_dim'
;


In [0]:
# %sql
# DROP TABLE IF EXISTS insurance_dim;

In [0]:
%sql
CREATE TABLE IF NOT EXISTS insurance_dim (
    policy_id STRING,                -- business key
    insurance_provider STRING,
    amount_covered DOUBLE,
    claim_status STRING,
    checksum_txt STRING,             -- detect changes
    load_ctl_key STRING,                -- audit field
    create_timestamp TIMESTAMP,    -- SCD2 validity start
    update_timestamp TIMESTAMP,      -- SCD2 validity end
    is_current STRING                -- 'Y' or 'N'
)
USING DELTA
LOCATION 's3a://mini-project-sathwik/gold/insurance_dim'
;


In [0]:
%sql
-- 3. Create Transaction Indicator (I/U/N)
-- ---------------------------------------------
CREATE OR REPLACE TABLE mini_proj_logs.patient_silver_addTranscationindicator 
using delta
LOCATION "s3a://mini-project-sathwik/silver/patient_silver_addTransactionIndicator"
AS
SELECT
    src.*,
    CASE
        WHEN dim.patient_id IS NULL THEN 'I'
        WHEN dim.checksum_txt <> src.checksum_txt THEN 'U'
        ELSE 'N'
    END AS transaction_ind
FROM patient_silver src
LEFT JOIN patient_dim dim
    ON src.patient_id = dim.patient_id
    AND dim.is_current = 'Y';

In [0]:
%sql
-- 3. Create Transaction Indicator (I/U/N) for Insurance
-- -----------------------------------------------------
CREATE OR REPLACE TABLE mini_proj_logs.insurance_silver_addTranscationindicator 
using delta
LOCATION "s3a://mini-project-sathwik/silver/insurance_silver_addTransactionIndicator" 
AS
SELECT
    src.*,
    CASE
        WHEN dim.policy_id IS NULL THEN 'I'              
        WHEN dim.checksum_txt <> src.checksum_txt THEN 'U'  
        ELSE 'N'                                         
    END AS transaction_ind
FROM insurance_silver src
LEFT JOIN insurance_dim dim
    ON src.policy_id = dim.policy_id
    AND dim.is_current = 'Y';



In [0]:
%sql
DROP TABLE IF EXISTS patient_prefinal_silver;

In [0]:
%sql
DROP TABLE IF EXISTS insurance_prefinal_silver;

In [0]:
# %sql
# drop table if exists patient_final_silver;


In [0]:
# %sql
# drop table if exists insurance_final_silver;

In [0]:
dbutils.fs.rm("s3a://mini-project-sathwik/silver/patient_prefinal_silver", recurse=True)
dbutils.fs.rm("s3a://mini-project-sathwik/silver/insurance_prefinal_silver", recurse=True)


In [0]:
%sql
-- ---------------------------------------------------
-- 2. Create Patient Final Silver as Delta Table
-- ---------------------------------------------------
CREATE TABLE IF NOT EXISTS patient_prefinal_silver
USING DELTA
LOCATION 's3a://mini-project-sathwik/silver/patient_prefinal_silver'
AS
SELECT * EXCEPT (rn)
FROM (
    SELECT *,
           ROW_NUMBER() OVER (
               PARTITION BY patient_id
               ORDER BY row_sequence DESC
           ) AS rn
    FROM mini_proj_logs.patient_silver_addTranscationindicator
) t
WHERE rn = 1;




In [0]:
%sql
select * from patient_prefinal_silver;

In [0]:
%sql
CREATE TABLE IF NOT EXISTS patient_final_silver
USING DELTA
LOCATION 's3a://mini-project-sathwik/silver/patient_final_silver'
AS 
SELECT *
FROM patient_prefinal_silver
WHERE 1 = 0;


In [0]:
table = "patient_final_silver"
column = "activation_ind"

# Get list of columns from table
cols = [c.name for c in spark.table(table).schema]

if column not in cols:
    spark.sql(f"ALTER TABLE {table} ADD COLUMN {column} STRING")
    print(f"Column '{column}' added.")
else:
    print(f"Column '{column}' already exists — nothing to do.")


In [0]:
%sql
desc patient_prefinal_silver

In [0]:
%sql
SHOW TABLES IN mini_proj_logs;


In [0]:
from datetime import date
batch_id = date.today().strftime("%Y-%m-%d")
file_name = "patient_batch1"

try:
    spark.sql("""
        MERGE INTO patient_final_silver AS tgt
        USING patient_prefinal_silver AS src
        ON tgt.patient_id = src.patient_id
        AND tgt.end_date = '9999-12-31'

        WHEN MATCHED AND tgt.checksum_txt <> src.checksum_txt THEN
          UPDATE SET
             tgt.end_date = current_date(),
             tgt.update_timestamp = current_timestamp(),
             tgt.activation_ind = 'U'

        WHEN MATCHED AND tgt.checksum_txt = src.checksum_txt THEN
          UPDATE SET
             tgt.transaction_ind = 'N'
    """)

    log_etl(
        pipeline_name="Patient Silver - UPDATE",
        batch_id=batch_id,
        file_name=file_name,
        layer="silver",
        status="SUCCESS",
        row_count=0,
        message="UPDATE step completed"
    )

except Exception as e:
    log_etl(
        pipeline_name="Patient Silver - UPDATE",
        batch_id=batch_id,
        file_name=file_name,
        layer="silver",
        status="FAILED",
        row_count=0,
        message=str(e)
    )
    raise e


In [0]:
from datetime import date
batch_id = date.today().strftime("%Y-%m-%d")
file_name="patient_batch1"
try:
    spark.sql("""
    MERGE INTO patient_final_silver AS tgt
    USING patient_prefinal_silver AS src
    ON tgt.patient_id = src.patient_id
    AND tgt.end_date = '9999-12-31'                 -- match active rows
    WHEN NOT MATCHED THEN
    INSERT (
        patient_id,
        name,
        age,
        address,
        phone_number,
        bill_amount,
        insurance_provider,
        policy_id,
        ingest_time,
        source_file,
        row_sequence,
        first_name,
        last_name,
        primary_key,
        checksum_txt,
        create_timestamp,
        update_timestamp,
        load_ctl_key,
        transaction_ind,
        activation_ind,
        start_date,
        end_date
    )
    VALUES (
        src.patient_id,
        src.name,
        src.age,
        src.address,
        src.phone_number,
        src.bill_amount,
        src.insurance_provider,
        src.policy_id,
        src.ingest_time,
        src.source_file,
        src.row_sequence,
        src.first_name,
        src.last_name,
        src.primary_key,
        src.checksum_txt,
        current_timestamp(),
        src.update_timestamp,
        src.load_ctl_key,
        src.transaction_ind,
        'I',                    -- new active version
        current_date(),
        '9999-12-31'
    );
    """)
    log_etl(
        pipeline_name="Patient Silver - INSERT",
        batch_id=batch_id,
        file_name=file_name,
        layer="silver",
        status="SUCCESS",
        row_count=0,
        message="INSERT step completed"
    )
except Exception as e:
    log_etl(
        pipeline_name="Patient Silver - INSERT",
        batch_id=batch_id,
        file_name=file_name,
        layer="silver",
        status="FAILED",
        row_count=0,
        message=str(e)
    )
    raise e

In [0]:

from datetime import date
batch_id = date.today().strftime("%Y-%m-%d")
file_name="patient_batch1"
try:
    spark.sql("""
    UPDATE patient_final_silver tgt
    SET 
        tgt.activation_ind = 'D',
        tgt.update_timestamp = current_timestamp()
    WHERE tgt.end_date = '9999-12-31'      -- only active rows
    AND tgt.patient_id NOT IN (SELECT patient_id FROM patient_prefinal_silver);
    """)
    log_etl(
        pipeline_name="Patient Silver - DELETE",
        batch_id=batch_id,
        file_name=file_name,
        layer="silver",
        status="SUCCESS",
        row_count=0,
        message="DELETE step completed"
    )
except Exception as e:
    log_etl(
        pipeline_name="Patient Silver - DELETE",
        batch_id=batch_id,
        file_name=file_name,
        layer="silver",
        status="FAILURE",
        row_count=0,
        message="DELETE step failed"
    )
    


In [0]:
%sql
select * from patient_final_silver;

In [0]:
%sql
DROP TABLE IF EXISTS insurance_prefinal_silver; CREATE TABLE IF NOT EXISTS insurance_prefinal_silver
USING DELTA
LOCATION 's3a://mini-project-sathwik/silver/insurance_prefinal_silver'
AS
SELECT * EXCEPT (rn)
FROM (
    SELECT *,
           ROW_NUMBER() OVER (
               PARTITION BY policy_id
               ORDER BY row_sequence DESC
           ) AS rn
    FROM mini_proj_logs.insurance_silver_addTranscationindicator
) t
WHERE rn = 1;


In [0]:
%sql
CREATE TABLE IF NOT EXISTS insurance_final_silver
USING DELTA
LOCATION 's3a://mini-project-sathwik/silver/insurance_final_silver'
AS 
SELECT 
    *
FROM insurance_prefinal_silver
WHERE 1 = 0;


In [0]:
table = "insurance_final_silver"
column = "activation_ind"

# Get list of columns from table
cols = [c.name for c in spark.table(table).schema]

if column not in cols:
    spark.sql(f"ALTER TABLE {table} ADD COLUMN {column} STRING")
    print(f"Column '{column}' added.")
else:
    print(f"Column '{column}' already exists — nothing to do.")


In [0]:
from datetime import date
batch_id = date.today().strftime("%Y-%m-%d")
file_name = "insurance_batch1"

try:
    spark.sql("""
        MERGE INTO insurance_final_silver AS tgt
        USING insurance_prefinal_silver AS src
        ON tgt.policy_id = src.policy_id
        AND tgt.end_date = '9999-12-31'

        WHEN MATCHED AND tgt.checksum_txt <> src.checksum_txt THEN
          UPDATE SET
            tgt.end_date         = current_date(),
            tgt.update_timestamp = current_timestamp(),
            tgt.activation_ind   = 'U'

        WHEN MATCHED AND tgt.checksum_txt = src.checksum_txt THEN
          UPDATE SET
            tgt.transaction_ind = 'N'
    """)

    log_etl(
        pipeline_name="Insurance Silver - UPDATE",
        batch_id=batch_id,
        file_name=file_name,
        layer="silver",
        status="SUCCESS",
        row_count=0,
        message="UPDATE step completed"
    )

except Exception as e:
    log_etl(
        pipeline_name="Insurance Silver - UPDATE",
        batch_id=batch_id,
        file_name=file_name,
        layer="silver",
        status="FAILURE",
        row_count=0,
        message=str(e)
    )
    raise e


In [0]:
from datetime import date
batch_id = date.today().strftime("%Y-%m-%d")
file_name="insurance_batch1"
try:
    spark.sql("""
    MERGE INTO insurance_final_silver AS tgt
    USING insurance_prefinal_silver AS src
    ON tgt.policy_id = src.policy_id
    AND tgt.end_date = '9999-12-31'  
    WHEN NOT MATCHED THEN
    INSERT (
        policy_id,
        insurance_provider,
        amount_covered,
        claim_status,
        ingest_time,
        source_file,
        row_sequence,
        checksum_txt,
        primary_key,
        create_timestamp,
        update_timestamp,
        load_ctl_key,
        transaction_ind,
        activation_ind,
        start_date,
        end_date
    )
    VALUES (
        src.policy_id,
        src.insurance_provider,
        src.amount_covered,
        src.claim_status,
        src.ingest_time,
        src.source_file,
        src.row_sequence,
        src.checksum_txt,
        src.primary_key,
        src.create_timestamp,
        src.update_timestamp,
        src.load_ctl_key,
        src.transaction_ind,
        'I',
        current_date(),
        '9999-12-31'
    );
    """)
    log_etl(
        pipeline_name="Insurance Silver - INSERT",
        batch_id=batch_id,
        file_name=file_name,
        layer="silver",
        status="SUCCESS",
        row_count=0,
        message="INSERT step completed"
    )
except Exception as e:
    log_etl(
        pipeline_name="Insurance Silver - INSERT",
        batch_id=batch_id,
        file_name=file_name,
        layer="silver",
        status="FAILURE",
        row_count=0,
        message="INSERT step failed"
    )


In [0]:
from datetime import date
batch_id = date.today().strftime("%Y-%m-%d")
file_name="insurance_batch1"
try:
    spark.sql("""
    UPDATE insurance_final_silver tgt
    SET 
        tgt.activation_ind = 'D',
        tgt.update_timestamp = current_timestamp()
    WHERE tgt.end_date = '9999-12-31'   -- only active rows
    AND tgt.policy_id NOT IN (SELECT policy_id FROM insurance_prefinal_silver);
    """)
    log_etl(
        pipeline_name="Insurance Silver - DELETE",
        batch_id=batch_id,
        file_name=file_name,
        layer="silver",
        status="SUCCESS",
        row_count=0,
        message="DELETE step completed"
    )
except Exception as e:
    log_etl(
        pipeline_name="Insurance Silver - DELETE",
        batch_id=batch_id,       
        file_name=file_name,
        layer="silver",
        status="FAILURE",
        row_count=0,
        message="DELETE step failed"
    )


In [0]:
%sql
SELECT patient_id, transaction_ind, checksum_txt
FROM mini_proj_logs.patient_silver_addTranscationindicator;


In [0]:
%sql
SELECT *
FROM patient_final_silver;


In [0]:
%sql
SELECT *
FROM insurance_final_silver;


In [0]:
spark.sql("DESCRIBE patient_final_silver").show()