In [0]:
#define varibles use in the scripts
catalog_name = "capstone_aimie_dbk"
schema_name = "medisure"

#storage path
input_path = f"/Volumes/{catalog_name}/{schema_name}/inputs"
schem_path = f"/Volumes/{catalog_name}/{schema_name}/schem"
bronze_path = f"/Volumes/{catalog_name}/{schema_name}/schem/bronze"

In [0]:
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {catalog_name}.{schema_name}.inputs")

In [0]:
#creation of Bronze tables 
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType, DateType, ArrayType, BooleanType

# Define explicit schemas for enforcement
claims_batch_schema = StructType([
    StructField("ClaimID", StringType(), True),
    StructField("MemberID", StringType(), True),
    StructField("ProviderID", StringType(), True),
    StructField("ClaimDate", StringType(), True),       
    StructField("ServiceDate", StringType(), True),    
    StructField("Amount", DoubleType(), True),
    StructField("Status", StringType(), True),
    StructField("ICD10Codes", StringType(), True),       
    StructField("CPTCodes", StringType(), True),        
    StructField("ClaimType", StringType(), True),
    StructField("SubmissionChannel", StringType(), True),
    StructField("Notes", StringType(), True),
    StructField("IngestTimestamp", StringType(), True)   
])

claims_stream_schema = StructType([
    StructField("ClaimID", StringType(), True),
    StructField("MemberID", StringType(), True),
    StructField("ProviderID", StringType(), True),
    StructField("ClaimDate", StringType(), True),  
    StructField("Amount", DoubleType(), True),
    StructField("Status", StringType(), True),
    StructField("ICD10Codes", StringType(), True), 
    StructField("CPTCodes", StringType(), True),   
    StructField("EventTimestamp", StringType(), True)
])

diagnosis_schema = StructType([
    StructField("Code", StringType(), True),
    StructField("Description", StringType(), True)
])

members_schema = StructType([
    StructField("MemberID", StringType(), True),
    StructField("Name", StringType(), True),
    StructField("DOB", StringType(), True),               
    StructField("Gender", StringType(), True),
    StructField("Region", StringType(), True),
    StructField("PlanType", StringType(), True),
    StructField("EffectiveDate", StringType(), True),     
    StructField("Email", StringType(), True),
    StructField("IsActive", DoubleType(), True),         
    StructField("LastUpdated", StringType(), True)      
])

providers_schema = StructType([
    StructField("ProviderID", StringType(), True),
    StructField("Name", StringType(), True),
    StructField("Specialties", ArrayType(StringType()), True),
    StructField("Locations", ArrayType(StructType([
        StructField("Address", StringType(), True),
        StructField("City", StringType(), True),
        StructField("State", StringType(), True)
    ])), True),
    StructField("IsActive", BooleanType(), True),
    StructField("TIN", StringType(), True),
    StructField("LastVerified", DateType(), True)
])

bronze_claims_batch_df = (spark.read
            .format("csv")
            .schema(claims_batch_schema)
            .option("header", "true")
            .option("timestampFormat", "yyyy-MM-dd[ HH:mm:ss]")
            .load(f"{input_path}/claims_batch.csv"))

bronze_claims_stream_df = (spark.read
                    .schema(claims_stream_schema)
                    .json(f"{input_path}/claims_stream.json"))

bronze_diagnosis_df = (spark.read
               .format("csv")
               .schema(diagnosis_schema)
               .option("header", "true")
               .load(f"{input_path}/diagnosis_ref.csv"))

bronze_members_df = (spark.read
                .format("csv")
                .schema(members_schema)
                .option("header", "true")
                .load(f"{input_path}/members.csv"))

bronze_providers_df = (spark.read
                    .schema(providers_schema)
                    .json(f"{input_path}/providers.json"))

(bronze_claims_batch_df.write.format("delta").mode("overwrite").saveAsTable("medisure.bronze_claims_batch"))
(bronze_claims_stream_df.write.format("delta").mode("overwrite").saveAsTable("medisure.bronze_claims_stream"))
(bronze_diagnosis_df.write.format("delta").mode("overwrite").saveAsTable("medisure.bronze_diagnosis_ref"))
(bronze_members_df.write.format("delta").mode("overwrite").saveAsTable("medisure.bronze_members"))
(bronze_providers_df.write.format("delta").mode("overwrite").saveAsTable("medisure.bronze_providers"))

In [0]:
from pyspark.sql.functions import col, explode, expr

silver_claims_batch_df = (
    bronze_claims_batch_df
    .withColumn("ClaimDate", col("ClaimDate").cast("date"))
    .withColumn("ServiceDate", col("ServiceDate").cast("date"))
    .withColumn("IngestTimestamp", col("IngestTimestamp").cast("timestamp"))
    .withColumn("ICD10Code", explode(expr("split(ICD10Codes, ',')")))
    .withColumn("CPTCode", explode(expr("split(CPTCodes, ',')")))
    .select(
        col("ClaimID"),
        col("MemberID"),
        col("ProviderID"),
        col("ClaimDate"),
        col("Amount"),
        col("Status"),
        col("ICD10Code").alias("ICD10Codes"),
        col("CPTCode").alias("CPTCodes"),
        col("IngestTimestamp")
    )
).dropna(subset=["ClaimID", "MemberID", "ProviderID"]).dropDuplicates(["ClaimID", "MemberID", "ProviderID"]).filter((col("ClaimID").isNotNull()) & (col("MemberID").isNotNull()) & (col("ProviderID").isNotNull()))

silver_claims_stream_df = (
    bronze_claims_stream_df
    .withColumn("ClaimDate", col("ClaimDate").cast("date"))
    .withColumn("IngestTimestamp", col("EventTimestamp").cast("timestamp"))
    .select(
        col("ClaimID"),
        col("MemberID"),
        col("ProviderID"),
        col("ClaimDate"),
        col("Amount"),
        col("Status"),
        col("ICD10Codes"),
        col("CPTCodes"),
        col("IngestTimestamp")
    )
).dropna(subset=["ClaimID", "MemberID", "ProviderID"]).dropDuplicates(["ClaimID", "MemberID", "ProviderID"]).filter((col("ClaimID").isNotNull()) & (col("MemberID").isNotNull()) & (col("ProviderID").isNotNull()))

silver_diagnosis_df = (
    bronze_diagnosis_df
).dropna(subset=["Code"]).dropDuplicates(["Code"]).filter((col("Code").isNotNull()) & (col("Description").isNotNull()))

silver_members_df = (
    bronze_members_df
    .withColumn("DOB", col("DOB").cast("date"))
    .withColumn("EffectiveDate", col("EffectiveDate").cast("date"))
    .withColumn("LastUpdated", col("LastUpdated").cast("date"))
).dropna(subset=["MemberID"]).dropDuplicates(["MemberID"]).filter((col("MemberID").isNotNull()))

silver_providers_df = (
    bronze_providers_df
    .withColumn("LastVerified", col("LastVerified").cast("date"))
    .withColumn("Specialties", explode(col("Specialties")))
    .withColumn("Location", explode(col("Locations")))
    .select(
        col("ProviderID"),
        col("Name").alias("ProviderName"),
        col("Specialties"),
        col("Location.Address").alias("Address"),
        col("Location.City").alias("City"),
        col("Location.State").alias("State"),
        col("IsActive").alias("IsActiveFlag"),
        col("TIN"),
        col("LastVerified")
    )
).dropna(subset=["ProviderID"]).dropDuplicates(["ProviderID"]).filter((col("ProviderID").isNotNull()))

(silver_claims_batch_df.write.format("delta").mode("overwrite").saveAsTable("medisure.silver_claims_batch"))
(silver_claims_stream_df.write.format("delta").mode("overwrite").saveAsTable("medisure.silver_claims_stream"))
(silver_diagnosis_df.write.format("delta").mode("overwrite").saveAsTable("medisure.silver_diagnosis_ref"))
(silver_members_df.write.format("delta").mode("overwrite").saveAsTable("medisure.silver_members"))
(silver_providers_df.write.format("delta").mode("overwrite").saveAsTable("medisure.silver_providers"))

In [0]:
from pyspark.sql.functions import col, when, udf, lit

# Add source flag to each DataFrame
claims_batch_with_source = silver_claims_batch_df.withColumn("Source", lit("Batch"))
claims_stream_with_source = silver_claims_stream_df.withColumn("Source", lit("Stream"))

# Union the DataFrames
combined_claims_df = claims_batch_with_source.unionByName(claims_stream_with_source)

(combined_claims_df.write.format("delta").mode("overwrite").saveAsTable("medisure.silver_claims_transform"))

In [0]:
from pyspark.sql.functions import col, when, udf, lit
from pyspark.sql.types import IntegerType

# Define a UDF for fraud scoring
@udf(returnType=IntegerType())
def fraud_score(amount, status):
    if amount is None or status is None:
        return 0
    if amount > 10000 and status == "Pending":
        return 1
    elif amount > 5000 and status == "Approved":
        return 0.5
    else:
        return 0

# Join datasets
gold_enriched_claims_df = (
    combined_claims_df
    .join(silver_members_df, "MemberID", "inner")
    .join(silver_providers_df, "ProviderID", "inner")
    .join(silver_diagnosis_df, col("ICD10Codes") == col("Code"), "left")
    .withColumn("FraudScore", fraud_score(col("Amount"), col("Status")))
    .withColumn("IsValid", when((col("FraudScore") > 0) & (col("Status") == "Pending"), "No")
                          .when((col("FraudScore") == 0) & (col("Status") == "Approved"), "Yes")
                          .otherwise("Unknown"))
)

spark.sql("""
CREATE TABLE IF NOT EXISTS medisure.gold_enriched_claims (
    ClaimID STRING,
    MemberID STRING,
    ProviderID STRING,
    ClaimDate DATE,
    Amount DOUBLE,
    Status STRING,
    ICD10Codes STRING,
    CPTCodes STRING,
    IngestTimestamp TIMESTAMP,
    Source STRING,
    Name STRING,
    DOB DATE,
    Gender STRING,
    Region STRING,
    PlanType STRING,
    EffectiveDate DATE,
    Email STRING,
    IsActive DOUBLE,
    LastUpdated DATE,
    ProviderName STRING,
    Specialties STRING,
    Address STRING,
    City STRING,
    State STRING,
    IsActiveFlag BOOLEAN,
    TIN STRING,
    LastVerified DATE,
    Description STRING,
    FraudScore INT,
    IsValid STRING
)
USING DELTA
""")

from pyspark.sql.functions import col

# Merge into the gold_enriched_claims table
gold_enriched_claims_df.createOrReplaceTempView("gold_enriched_claims_df")

spark.sql("""
MERGE INTO medisure.gold_enriched_claims AS target
USING gold_enriched_claims_df AS source
ON target.ClaimID = source.ClaimID
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
""")