In [0]:
import dlt
from pyspark.sql.functions import col,regexp_replace,when,explode_outer,from_json, lit,to_date,to_timestamp,split,size, explode, udf
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, IntegerType

In [0]:
@dlt.expect_or_drop("memberid_not_null","MemberID IS NOT NULL")
@dlt.table(
    name="silver_members_view",
    comment="Cleaned members data with quality checks",
    table_properties={
        "quality": "silver",
        "pipelines.autoOptimize.managed": "true"
    }
)
def silver_members():
    """Return the bronze members table with one row kept per MemberID.

    The ``memberid_not_null`` expectation attached to this table drops rows
    whose MemberID is null.
    """
    bronze_members = spark.read.table("capstone.medisure.bronze_members")
    deduplicated = bronze_members.dropDuplicates(["MemberID"])
    return deduplicated

# Quarantine → invalid rows having memberid equal to null
@dlt.table(
    name="silver_members_quarantine",
    comment="Members rows that failed quality checks (null MemberID)"
)
def silver_members_quarantine():
    """Return the bronze member rows whose MemberID is null."""
    member_id_missing = col("MemberID").isNull()
    return spark.read.table("capstone.medisure.bronze_members").filter(member_id_missing)

In [0]:
# Silver table with the expectation that Code is not null
@dlt.expect_or_drop("Code_not_null", "Code IS NOT NULL")
@dlt.table(
    name="silver_diagnosis_ref_view",
    comment="Cleaned diagnosis_ref data with Code enforced as integer format if decimal",
    table_properties={
        "quality": "silver",
        "pipelines.autoOptimize.managed": "true"
    }
)
def silver_diagnosis_ref():
    """Return the bronze diagnosis reference table as-is.

    The ``Code_not_null`` expectation attached to this table drops rows whose
    Code is null.

    NOTE(review): the table comment mentions enforcing an integer format on
    Code, but no transformation of Code is applied here -- confirm whether
    that step is still required.
    """
    return spark.read.table("capstone.medisure.bronze_diagnosis_ref")

# Quarantine → invalid rows having code equal to null
@dlt.table(
    name="silver_diagnosis_ref_quarantine",
    comment="Diagnosis reference rows that failed quality checks (null Code)"
)
def silver_diagnosis_ref_quarantine():
    """Return the bronze diagnosis reference rows whose Code is null."""
    code_missing = col("Code").isNull()
    return spark.read.table("capstone.medisure.bronze_diagnosis_ref").filter(code_missing)

In [0]:

# Schema used to parse the Locations JSON string: an array of structs, each
# with three nullable string fields.
location_schema = ArrayType(
    StructType(
        [
            StructField(field_name, StringType(), True)
            for field_name in ("Address", "City", "State")
        ]
    )
)

@dlt.expect_or_drop("providerid_not_null", "ProviderID IS NOT NULL")
@dlt.table(
    name="silver_providers_view",
    comment="Normalized provider directory with flattened locations and specialties",
    table_properties={"quality": "silver"}
)
def silver_providers():
    """Return bronze providers flattened to one row per location/specialty pair.

    Locations and Specialties are parsed from JSON strings into arrays and
    each array is exploded (outer, so providers with an empty or null array
    are kept). The result is de-duplicated on ProviderID, Address and
    Specialty. Rows with a null ProviderID are dropped by the
    ``providerid_not_null`` expectation attached to this table.
    """
    raw_providers = spark.read.table("capstone.medisure.bronze_providers")

    # Turn the JSON strings into array<struct> / array<string> columns.
    parsed = raw_providers.withColumn(
        "Locations", from_json(col("Locations"), location_schema)
    ).withColumn(
        "Specialties", from_json(col("Specialties"), ArrayType(StringType()))
    )

    # One output row per (location, specialty) combination of each provider.
    exploded = parsed.withColumn(
        "location", explode_outer("Locations")
    ).withColumn(
        "Specialty", explode_outer("Specialties")
    )

    flattened = exploded.select(
        "ProviderID",
        "Name",
        "TIN",
        "IsActive",
        "LastVerified",
        col("location.Address").alias("Address"),
        col("location.City").alias("City"),
        col("location.State").alias("State"),
        "Specialty",
    )

    return flattened.dropDuplicates(["ProviderID", "Address", "Specialty"])


# Quarantine → invalid rows having providerid equal to null
@dlt.table(
    name="silver_providers_quarantine",
    comment="Providers rows that failed quality checks (null ProviderID)"
)
def silver_providers_quarantine():
    """Return the bronze provider rows whose ProviderID is null."""
    provider_id_missing = col("ProviderID").isNull()
    return spark.read.table("capstone.medisure.bronze_providers").filter(provider_id_missing)



In [0]:

# Bronze Claims Stream → View
@dlt.view(
    name="silver_claims_stream",
    comment="Intermediate cleaned claims stream data view aligned to batch schema"
)
def silver_claims_view():
    """Return the bronze claims stream table reshaped to match the batch claims.

    Adds null-valued ServiceDate, ClaimType, SubmissionChannel and Notes
    columns, parses ClaimDate as a date, and replaces EventTimestamp with a
    timestamp-typed IngestTimestamp column.
    """
    claims = spark.read.table("capstone.medisure.bronze_claims_stream")

    # Columns present in the batch source but not in the stream source are
    # added as typed nulls, in this order.
    placeholder_types = {
        "ServiceDate": "date",
        "ClaimType": "string",
        "SubmissionChannel": "string",
        "Notes": "string",
    }
    for column_name, type_name in placeholder_types.items():
        claims = claims.withColumn(column_name, lit(None).cast(type_name))

    # Cast ClaimDate to a proper date.
    claims = claims.withColumn("ClaimDate", to_date(col("ClaimDate"), "yyyy-MM-dd"))

    # Carry EventTimestamp over as IngestTimestamp, then drop the original.
    claims = claims.withColumn("IngestTimestamp", to_timestamp("EventTimestamp"))
    return claims.drop("EventTimestamp")

# Bronze Claims Batch → View
@dlt.view(
    name="silver_claims_batch",
    comment="Intermediate cleaned claims batch data view"
)
def silver_claims_batch():
    """Return the bronze claims batch table with ClaimDate parsed as a date."""
    batch_claims = spark.read.table("capstone.medisure.bronze_claims_batch")
    parsed_claim_date = to_date(col("ClaimDate"), "yyyy-MM-dd")
    return batch_claims.withColumn("ClaimDate", parsed_claim_date)


# Union View (Batch + Stream Claims View)
@dlt.view(
    name="silver_claims_union",
    comment="Union of batch and stream claims"
)
def silver_claims_union():
    """Return batch claims followed by stream claims, matched by column name."""
    return dlt.read("silver_claims_batch").unionByName(
        dlt.read("silver_claims_stream")
    )

# Quarantine all invalid claim records before loading the final silver claims table
@dlt.table(
    name="silver_claims_quarantine",
    comment="Claims that failed validation or FK checks",
    table_properties={
        "pipelines.autoOptimize.managed": "true"
    }
)
def quarantine_claims():
    """Return claims that fail key or foreign-key validation, tagged with a reason.

    A claim is quarantined when its ClaimID is null, when its MemberID is null
    or has no match in ``silver_members_view``, or when its ProviderID is null
    or has no match in ``silver_providers_view``. ``QuarantineReason`` records
    the first failing check, evaluated in that order.
    """
    df = dlt.read("silver_claims_union")

    # Deduplicate
    df = df.dropDuplicates(["ClaimID", "MemberID", "ProviderID"])

    # Data type enforcement
    df = df.withColumn("Amount", col("Amount").cast("decimal(12,2)")) \
           .withColumn("ICD10Codes", split(col("ICD10Codes"), ";")) \
           .withColumn("CPTCodes", split(col("CPTCodes"), ";"))

    # FK validation.
    # A left join on a shared key name keeps the claim's own key value, so a
    # null check on MemberID / ProviderID after the join cannot detect a key
    # that is missing from the reference table. Each lookup therefore carries
    # a marker column that is null exactly when no reference row matched.
    # distinct() keeps the lookups to one row per key so the join cannot
    # multiply claim rows (the providers table holds several rows per
    # ProviderID).
    members = (
        dlt.read("silver_members_view")
        .select("MemberID")
        .distinct()
        .withColumn("_member_found", lit(True))
    )
    providers = (
        dlt.read("silver_providers_view")
        .select("ProviderID")
        .distinct()
        .withColumn("_provider_found", lit(True))
    )
    df = df.join(members, "MemberID", "left").join(providers, "ProviderID", "left")

    claim_id_missing = col("ClaimID").isNull()
    # A null key never matches in the join, so these also cover null IDs.
    member_invalid = col("_member_found").isNull()
    provider_invalid = col("_provider_found").isNull()

    # Filter records failing validations
    df_quarantine = df.filter(
        claim_id_missing | member_invalid | provider_invalid
    ).withColumn(
        "QuarantineReason",
        when(claim_id_missing, lit("Missing ClaimID"))
        .when(member_invalid, lit("Invalid MemberID"))
        .when(provider_invalid, lit("Invalid ProviderID"))
    )

    # The marker columns are internal to the validation; keep the output schema
    # limited to the claim columns plus QuarantineReason.
    return df_quarantine.drop("_member_found", "_provider_found")


# Load validated, fraud-scored claims into the silver claims table
@dlt.table(
    name="silver_claims_view",
    comment="Cleaned claims with FK validation, fraud scoring; invalid records sent to quarantine",
    table_properties={
        "quality": "silver",
        "pipelines.autoOptimize.managed": "true"
    }
)
def silver_claims():
    """Return validated claims enriched with member/provider details and a fraud score.

    Steps:
      1. Cast Amount to decimal(12,2) and split ICD10Codes / CPTCodes on ";".
      2. Keep only claims whose MemberID and ProviderID exist in
         ``silver_members_view`` / ``silver_providers_view`` and whose ClaimID
         is not null.
      3. Add ``FraudScoreNumeric`` (additive rule score) and ``FraudRisk``
         (HIGH >= 10, MEDIUM >= 5, otherwise LOW).
      4. Keep one row per (ClaimID, MemberID, ProviderID).
    """
    df = dlt.read("silver_claims_union")

    # Data type enforcement
    df = df.withColumn("Amount", col("Amount").cast("decimal(12,2)")) \
           .withColumn("ICD10Codes", split(col("ICD10Codes"), ";")) \
           .withColumn("CPTCodes", split(col("CPTCodes"), ";"))

    # Reference data used for FK validation and enrichment
    members = (
        dlt.read("silver_members_view")
        .select(
            "MemberID",
            col("Name").alias("MemberName"),
            col("PlanType").alias("MemberPlanType")
        )
    )

    # The providers table holds one row per location/specialty combination.
    # Reduce it to one row per ProviderID before joining so claims are not
    # multiplied; the final de-duplication kept a single arbitrary provider
    # row per claim anyway.
    providers = (
        dlt.read("silver_providers_view")
        .select(
            "ProviderID",
            col("Name").alias("ProviderName"),
            col("Specialty").alias("ProviderSpecialties")
        )
        .dropDuplicates(["ProviderID"])
    )

    # FK validation: inner joins drop claims whose MemberID / ProviderID is
    # null or absent from the reference tables. A left join would keep the
    # claim's own key value, so a later null check could not catch orphans.
    df = df.join(members, "MemberID", "inner").join(providers, "ProviderID", "inner")

    # MemberID / ProviderID are guaranteed non-null by the inner joins above.
    df = df.filter(col("ClaimID").isNotNull())

    # Fraud scoring UDF: each rule that fires adds to the score; the higher
    # the score, the higher the fraud risk.
    def base_fraud_score(amount, cpt_codes, submission_channel, service_date, claim_date):
        score = 0
        if amount is not None and amount > 10000:
            score += 5
        if cpt_codes is not None and len(cpt_codes) > 5:
            score += 3
        # NOTE(review): the original condition combined these two checks with
        # `and`, which can never be true; `or` is assumed to be the intent
        # (paper or unknown channel) -- confirm.
        if submission_channel is None or submission_channel == "Paper":
            score += 2
        if service_date is not None and claim_date is not None and service_date > claim_date:
            score += 4
        return score

    # Wrap the scoring function as a Spark UDF returning an integer
    fraud_udf = udf(base_fraud_score, IntegerType())

    # Apply the UDF, then map the numeric score to HIGH / MEDIUM / LOW
    df = df.withColumn(
        "FraudScoreNumeric",
        fraud_udf(
            col("Amount"),
            col("CPTCodes"),
            col("SubmissionChannel"),
            col("ServiceDate"),
            col("ClaimDate")
        )
    ).withColumn(
        "FraudRisk",
        when(col("FraudScoreNumeric") >= 10, lit("HIGH"))
        .when(col("FraudScoreNumeric") >= 5, lit("MEDIUM"))
        .otherwise(lit("LOW"))
    )

    # The same claim may arrive through both the batch and stream sources.
    return df.dropDuplicates(["ClaimID", "MemberID", "ProviderID"])




In [0]:
# Copy each silver output (<name>_view) into a <name>_delta_table table in the
# same catalog and schema, skipping any source that does not exist.
silver_views = ["silver_members", "silver_claims", "silver_providers", "silver_diagnosis_ref"]

for view in silver_views:
    full_view_name = f"capstone.medisure.{view}_view"
    new_table_name = f"capstone.medisure.{view}_delta_table"

    # Check if the view exists via information_schema.
    # The interpolated names come only from the hard-coded list above.
    exists = spark.sql(f"""
        SELECT COUNT(*) 
        FROM system.information_schema.tables
        WHERE table_schema = 'medisure'
          AND table_name = '{view}_view'
          AND table_catalog = 'capstone'
    """).collect()[0][0]

    if exists > 0:
        df = spark.table(full_view_name)
        df.write.mode("overwrite").saveAsTable(new_table_name)
        # Report the names actually read and written; the previous message
        # named a "{view}_table" that is never created.
        print(f"Converted {full_view_name} into {new_table_name} (Delta table)")
    else:
        print(f"View {full_view_name} does not exist, skipping...")
