In [None]:
from pyspark.sql.functions import input_file_name, to_date, col
from pyspark.sql.types import DoubleType, IntegerType

# ------------------------------------
# 1. Claims Data
# ------------------------------------
claims_path = "abfss://fakepath/Files/RawData/Claims_Data/*.csv"

# Load every CSV matching the glob; the first row of each file is the header.
# No schema is supplied and inferSchema is not set, so columns arrive as strings.
claims_raw = spark.read.csv(claims_path, header=True)

# Tag each row with the file it was read from, then convert the amount and
# date columns from their raw string form.
df_claims = (
    claims_raw
    .withColumn("source_file", input_file_name())
    .withColumn("payment_amount", col("payment_amount").cast("double"))
    .withColumn("processed_date", to_date(col("processed_date"), "yyyy-MM-dd"))
)

# Append to the bronze Delta table with the mergeSchema write option enabled.
df_claims.write.saveAsTable(
    "bronze_claims",
    format="delta",
    mode="append",
    mergeSchema="true",
)

In [None]:
# ------------------------------------
# 2. Prescriptions Data
# ------------------------------------
# The URI needs "//" after the scheme (matching the claims, dispensing and
# inventory paths); "abfss:/..." has no authority component and does not
# resolve to a storage location.
prescriptions_path = "abfss://fakepath/Files/RawData/Prescriptions_Data/*.csv"

# Load every CSV matching the glob; the first row of each file is the header.
df_prescriptions = spark.read.option("header", True).csv(prescriptions_path)

# Tag each row with the file it was read from and parse rx_date from its
# "yyyy-MM-dd" string form into a date column.
df_prescriptions = (
    df_prescriptions
    .withColumn("source_file", input_file_name())
    .withColumn("rx_date", to_date("rx_date", "yyyy-MM-dd"))
)

# Append to the bronze Delta table with the mergeSchema write option enabled.
df_prescriptions.write.format("delta").mode("append").option("mergeSchema", "true").saveAsTable("bronze_prescriptions")

In [None]:
# ------------------------------------
# 3. Dispensing Data
# ------------------------------------
dispensing_path = "abfss://fakepath/Files/RawData/Dispensing_Data/*.csv"

# Load every CSV matching the glob; the first row of each file is the header.
dispensing_raw = spark.read.csv(dispensing_path, header=True)

# Tag each row with the file it was read from, then convert the date,
# quantity and payment columns from their raw string form.
df_dispensing = (
    dispensing_raw
    .withColumn("source_file", input_file_name())
    .withColumn("fill_date", to_date(col("fill_date"), "yyyy-MM-dd"))
    .withColumn("quantity", col("quantity").cast("int"))
    .withColumn("patient_pay", col("patient_pay").cast("double"))
    .withColumn("third_party_paid", col("third_party_paid").cast("double"))
)

# Append to the bronze Delta table with the mergeSchema write option enabled.
df_dispensing.write.saveAsTable(
    "bronze_dispensing",
    format="delta",
    mode="append",
    mergeSchema="true",
)

In [None]:
# ------------------------------------
# 4. Inventory Data
# ------------------------------------
inventory_path = "abfss://fakepath/Files/RawData/Inventory_Data/*.csv"

# Load every CSV matching the glob; the first row of each file is the header.
inventory_raw = spark.read.csv(inventory_path, header=True)

# Tag each row with the file it was read from, then convert the two date
# columns and the two quantity columns from their raw string form.
df_inventory = (
    inventory_raw
    .withColumn("source_file", input_file_name())
    .withColumn("order_date", to_date(col("order_date"), "yyyy-MM-dd"))
    .withColumn("received_date", to_date(col("received_date"), "yyyy-MM-dd"))
    .withColumn("quantity_ordered", col("quantity_ordered").cast("int"))
    .withColumn("quantity_received", col("quantity_received").cast("int"))
)

# Append to the bronze Delta table with the mergeSchema write option enabled.
df_inventory.write.saveAsTable(
    "bronze_inventory",
    format="delta",
    mode="append",
    mergeSchema="true",
)

In [None]:
# ------------------------------------
# 5. Patient Eligibility Data
# ------------------------------------
# The URI needs "//" after the scheme (matching the claims, dispensing and
# inventory paths); "abfss:/..." has no authority component and does not
# resolve to a storage location.
patient_eligibility_path = "abfss://fakepath/Files/RawData/Patient_Data/*.csv"

# Load every CSV matching the glob; the first row of each file is the header.
df_patient_eligibility = spark.read.option("header", True).csv(patient_eligibility_path)

# Tag each row with the file it was read from and parse encounter_date from
# its "yyyy-MM-dd" string form into a date column.
df_patient_eligibility = (
    df_patient_eligibility
    .withColumn("source_file", input_file_name())
    .withColumn("encounter_date", to_date("encounter_date", "yyyy-MM-dd"))
)

# Append to the bronze Delta table with the mergeSchema write option enabled.
df_patient_eligibility.write.format("delta").mode("append").option("mergeSchema", "true").saveAsTable("bronze_patient_eligibility")

In [None]:
# ------------------------------------
# 6. Drug Catalog - Append only new files based on source_file
# ------------------------------------
drug_catalog_path = "abfss://fakepath/Files/RawData/Drug_catalog/*.csv"
bronze_drug_table = "bronze_drug_catalog"

# Read every catalog CSV (first row is the header) and tag each row with the
# file it came from; source_file is the key used to skip already-loaded files.
df_drug_new = (
    spark.read.option("header", True).csv(drug_catalog_path)
    .withColumn("source_file", input_file_name())
)

# Only a missing table should trigger a full load. Checking for the table
# explicitly (rather than catching every Exception) means a transient read
# failure raises instead of silently re-appending files that were already loaded.
# NOTE: Catalog.tableExists requires PySpark 3.3+.
if spark.catalog.tableExists(bronze_drug_table):
    # Distinct source_file values already present in the bronze table.
    df_existing_sources = spark.read.table(bronze_drug_table).select("source_file").distinct()

    # Anti-join keeps only rows whose source_file has not been loaded yet.
    df_drug_filtered = df_drug_new.join(df_existing_sources, on="source_file", how="left_anti")
else:
    print(f"{bronze_drug_table} does not exist yet; loading all drug catalog files.")
    df_drug_filtered = df_drug_new

# head(1) is enough to know whether anything is left to load; a full count()
# would evaluate the entire anti-join just to compare the result against zero.
if df_drug_filtered.head(1):
    # Convert the three price columns from their raw string form to doubles.
    df_drug_catalog = df_drug_filtered
    for price_col in ("340b_price", "wac_price", "gpo_price"):
        df_drug_catalog = df_drug_catalog.withColumn(price_col, col(price_col).cast(DoubleType()))

    # Append to the bronze Delta table with the mergeSchema write option enabled.
    df_drug_catalog.write.format("delta").mode("append").option("mergeSchema", "true").saveAsTable(bronze_drug_table)

    # NOTE(review): display() re-evaluates the lazy plan after the append; if
    # the bronze table is re-read at that point the anti-join may now exclude
    # the rows just written — confirm the output shows what was loaded.
    display(df_drug_catalog)
else:
    print("No new drug catalog files to process.")