In [0]:
import pandas as pd
from pyspark.sql.functions import *
from pyspark.sql.functions import current_timestamp, to_date, col, when, row_number
from pyspark.sql.window import Window

In [0]:
# List the top-level contents of the project's S3 input bucket to confirm
# the expected source folders are present before any ingestion runs.
display(dbutils.fs.ls("s3://miniprojectinputfiles/"))


In [0]:
# Show the files inside the two source folders of the input bucket
# (patients first, then payers).
display(dbutils.fs.ls("s3://miniprojectinputfiles/Patients_Data/"))
display(dbutils.fs.ls("s3://miniprojectinputfiles/Payers_Data/"))

In [0]:
# Locations used by the patient bronze ingestion stream.
# Folder the patient stream loads its CSV files from.
source_path = "s3://miniprojectinputfiles/Patients_Data/"
# Passed as `cloudFiles.schemaLocation` on the patient read stream
# (despite the name, this is not the write-side checkpoint).
checkpoint_path = "s3://miniprojectinputfiles/CheckPointPathPatient/"
# Passed as `checkpointLocation` on the patient write stream.
bronze_checkpoint = "s3://miniprojectinputfiles/BronzeCPPatient/"

In [0]:
from datetime import datetime

def log_event(layer, table_name, event, record_count=0):
    """Append a single audit row describing a pipeline event to the ``logs`` table.

    Args:
        layer: Label stored in the ``layer`` column (e.g. ``"BRONZE"``).
        table_name: Name of the table the event refers to.
        event: Event label stored in the ``event`` column (e.g. ``"START"``, ``"END"``).
        record_count: Row count stored with the event. Defaults to 0.

    The ``event_time`` column holds ``datetime.now()`` (naive local time)
    rendered as an ISO-8601 string.
    """
    column_names = ["layer", "table_name", "event", "event_time", "record_count"]
    event_time = datetime.now().isoformat()
    entry = (layer, table_name, event, event_time, record_count)

    # One-row DataFrame, appended so earlier log entries are preserved.
    entry_df = spark.createDataFrame([entry], column_names)
    entry_df.write.mode("append").saveAsTable("logs")


In [0]:
# Mark the beginning of the bronze patient load in the audit log
# (record_count is left at its default of 0).
log_event("BRONZE", "bronze_patient_dtable", "START")

In [0]:
# Read patient CSV files from `source_path` as an Auto Loader ("cloudFiles")
# stream. Column types are inferred and the schema is tracked at
# `checkpoint_path`.
patient_df = (
    spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .option("cloudFiles.schemaLocation", checkpoint_path)
        .option("cloudFiles.inferColumnTypes", "true")
        .option("mergeSchema", "true")
        .load(source_path)
)

# Normalize headers: trim whitespace, replace inner spaces with underscores,
# and upper-case, so downstream code can use stable column names.
clean_cols = [col_name.strip().replace(" ", "_").upper() for col_name in patient_df.columns]

# Stamp every row with the load time and parse the date columns.
# NOTE(review): assumes BIRTHDATE / DEATHDATE arrive as "M/d/yyyy" strings —
# confirm against the inferred schema.
patient_df = (
    patient_df.toDF(*clean_cols)
        .withColumn("LOAD_CTL_KEY", current_timestamp())
        .withColumn("BIRTHDATE", to_date("BIRTHDATE", "M/d/yyyy"))
        .withColumn("DEATHDATE", to_date("DEATHDATE", "M/d/yyyy"))
)

# Start the one-shot append into the bronze Delta table and keep the query
# handle so the run can be awaited.
patient_query = (
    patient_df.writeStream
        .format("delta")
        .option("checkpointLocation", bronze_checkpoint)
        .outputMode("append")
        .trigger(once=True)
        .toTable("bronze_patient_dtable")
)

# toTable() returns as soon as the query has started. Block until the
# triggered run finishes so later cells (row count, END log entry, SQL
# preview) see the completed load, and so a failed stream raises here
# instead of passing silently.
patient_query.awaitTermination()


In [0]:
%sql
-- Preview the rows currently stored in the bronze patient table.
select * from bronze_patient_dtable

In [0]:
# Total number of rows now in the bronze patient table. This is the whole
# table, not only the rows added by the latest run.
bronze_count = spark.table("bronze_patient_dtable").count()

# Close out the bronze patient load in the audit log with that count.
log_event("BRONZE", "bronze_patient_dtable", "END", bronze_count)


In [0]:
%sql
--drop table if exists bronze_patient_dtable;

In [0]:
import re

In [0]:
# Locations used by the payer bronze ingestion stream.
# Folder the payer stream loads its CSV files from.
source_pathpayer = "s3://miniprojectinputfiles/Payers_Data/"
# Passed as `cloudFiles.schemaLocation` on the payer read stream
# (despite the name, this is not the write-side checkpoint).
checkpoint_pathpayer = "s3://miniprojectinputfiles/CheckPointPathPayer/"
# Passed as `checkpointLocation` on the payer write stream.
bronze_checkpointpayer = "s3://miniprojectinputfiles/BronzeCPPayer/"

In [0]:
# Read payer CSV files from `source_pathpayer` as an Auto Loader
# ("cloudFiles") stream. Column types are inferred and the schema is tracked
# at `checkpoint_pathpayer`.
payer_df = (
    spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .option("cloudFiles.schemaLocation", checkpoint_pathpayer)
        .option("cloudFiles.inferColumnTypes", "true")
        .option("mergeSchema", "true")
        .load(source_pathpayer)
)

# Normalize headers: trim whitespace, replace inner spaces with underscores,
# and upper-case, so downstream code can use stable column names.
clean_cols = [col_name.strip().replace(" ", "_").upper() for col_name in payer_df.columns]

# Stamp every row with the load time.
payer_df = (
    payer_df.toDF(*clean_cols)
        .withColumn("LOAD_CTL_KEY", current_timestamp())
)

# Start the one-shot append into the bronze Delta table and keep the query
# handle so the run can be awaited.
payer_query = (
    payer_df.writeStream
        .format("delta")
        .option("checkpointLocation", bronze_checkpointpayer)
        .outputMode("append")
        .trigger(once=True)
        .toTable("bronze_payer_dtable")
)

# toTable() returns as soon as the query has started. Block until the
# triggered run finishes so the following SQL preview sees the completed
# load, and so a failed stream raises here instead of passing silently.
payer_query.awaitTermination()



In [0]:
%sql
-- Preview the rows currently stored in the bronze payer table.
select * from bronze_payer_dtable;

In [0]:
%sql
-- Manual cleanup statements, deliberately left commented out; uncomment one to run it.
--drop table if exists bronze_patient_dtable;

--drop table bronze_payer_dtable