In [0]:
import pandas as pd
from pyspark.sql.functions import *

In [0]:
# Show the top-level contents of the project's S3 bucket.
bucket_listing = dbutils.fs.ls("s3://miniprojectinputfiles/")
display(bucket_listing)


In [0]:
# Show the files available in each input folder, patients first, then payers.
for input_dir in (
    "s3://miniprojectinputfiles/Patients_Data/",
    "s3://miniprojectinputfiles/Payers_Data/",
):
    display(dbutils.fs.ls(input_dir))

In [0]:
# Locations for the patient ingest. These names are read by the patient
# stream cell that follows, so this cell must run immediately before it.
bronze_checkpoint = "s3://miniprojectinputfiles/BronzeCPPatient/"      # write-stream checkpointLocation
checkpoint_path = "s3://miniprojectinputfiles/CheckPointPathPatient/"  # cloudFiles.schemaLocation
source_path = "s3://miniprojectinputfiles/Patients_Data/"              # folder the read stream loads from

In [0]:
# Bronze ingest for the patient feed: read the CSV files under `source_path`
# with Auto Loader, normalise the column names, stamp each row with the load
# time, parse the two date columns, and append the result to the Delta table
# `bronze_patient_dtable`.
patient_raw_df = (
    spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .option("cloudFiles.schemaLocation", checkpoint_path)
        .option("cloudFiles.inferColumnTypes", "true")
        .option("mergeSchema", "true")
        .load(source_path)
)

# Trim, replace spaces with underscores and upper-case every column name so
# downstream code can rely on one naming convention.
clean_cols = [col_name.strip().replace(" ", "_").upper() for col_name in patient_raw_df.columns]

patient_df = (
    patient_raw_df.toDF(*clean_cols)
        # Load timestamp recorded on every ingested row.
        .withColumn("LOAD_CTL_KEY", current_timestamp())
        # Dates are parsed with the pattern M/d/yyyy.
        .withColumn("BIRTHDATE", to_date("BIRTHDATE", "M/d/yyyy"))
        .withColumn("DEATHDATE", to_date("DEATHDATE", "M/d/yyyy"))
)

patient_query = (
    patient_df.writeStream
        .format("delta")
        .option("checkpointLocation", bronze_checkpoint)
        .outputMode("append")
        # availableNow processes everything currently available and then stops;
        # it replaces the deprecated once=True trigger.
        .trigger(availableNow=True)
        .toTable("bronze_patient_dtable")
)

# Block until the run has finished so that later cells querying
# bronze_patient_dtable see the ingested rows on a Run All.
patient_query.awaitTermination()



In [0]:
%sql
-- Disabled reset helper: uncomment the next line to drop the patient bronze table.
-- drop table if exists bronze_patient_dtable;

In [0]:
%sql
-- Preview the rows landed in the patient bronze table.
SELECT *
FROM bronze_patient_dtable;

In [0]:
import re

In [0]:
# Locations for the payer ingest. These rebind the same three names used for
# the patient ingest, so this cell must run immediately before the payer
# stream cell that follows.
bronze_checkpoint = "s3://miniprojectinputfiles/BronzeCPPayer/"      # write-stream checkpointLocation
checkpoint_path = "s3://miniprojectinputfiles/CheckPointPathPayer/"  # cloudFiles.schemaLocation
source_path = "s3://miniprojectinputfiles/Payers_Data/"              # folder the read stream loads from

In [0]:
# Bronze ingest for the payer feed: read the CSV files under `source_path`
# with Auto Loader, normalise the column names, stamp each row with the load
# time, and append the result to the Delta table `bronze_payer_dtable`.
payer_raw_df = (
    spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .option("cloudFiles.schemaLocation", checkpoint_path)
        .option("cloudFiles.inferColumnTypes", "true")
        .option("mergeSchema", "true")
        .load(source_path)
)

# Trim, replace spaces with underscores and upper-case every column name so
# downstream code can rely on one naming convention.
clean_cols = [col_name.strip().replace(" ", "_").upper() for col_name in payer_raw_df.columns]

payer_df = (
    payer_raw_df.toDF(*clean_cols)
        # Load timestamp recorded on every ingested row.
        .withColumn("LOAD_CTL_KEY", current_timestamp())
)

# The date parsing below was carried over from the patient ingest. It is only
# applied to columns that actually exist, because referencing a missing column
# in withColumn fails the whole cell.
# NOTE(review): payer files presumably have no BIRTHDATE/DEATHDATE - confirm
# against the payer schema and drop this loop if so.
for date_col in ("BIRTHDATE", "DEATHDATE"):
    if date_col in payer_df.columns:
        payer_df = payer_df.withColumn(date_col, to_date(date_col, "M/d/yyyy"))

payer_query = (
    payer_df.writeStream
        .format("delta")
        .option("checkpointLocation", bronze_checkpoint)
        .outputMode("append")
        # availableNow processes everything currently available and then stops;
        # it replaces the deprecated once=True trigger.
        .trigger(availableNow=True)
        .toTable("bronze_payer_dtable")
)

# Block until the run has finished so that later cells querying
# bronze_payer_dtable see the ingested rows on a Run All.
payer_query.awaitTermination()



In [0]:
%sql
-- Preview the rows landed in the payer bronze table.
SELECT *
FROM workspace.default.bronze_payer_dtable;

In [0]:
%sql
-- Disabled cleanup helpers: uncomment a statement below to drop that bronze table.
-- drop table if exists bronze_patient_dtable;

-- drop table bronze_payer_dtable