In [0]:
adls_account = "insurancedatalake01"
storage_key = os.environ.get("AZURE_STORAGE_KEY")

spark.conf.set(
    f"fs.azure.account.key.{adls_account}.dfs.core.windows.net",
    storage_key
)

In [0]:
from pyspark.sql import functions as F
from pyspark.sql import types as T

adls_account = "insurancedatalake01"
container = "datalake"
raw_path = f"abfss://{container}@{adls_account}.dfs.core.windows.net/raw"
bronze_path = f"abfss://{container}@{adls_account}.dfs.core.windows.net/bronze"
silver_path = f"abfss://{container}@{adls_account}.dfs.core.windows.net/silver"
gold_path = f"abfss://{container}@{adls_account}.dfs.core.windows.net/gold"

dim_customer_silver = spark.read.format("delta").load(f"{silver_path}/dim_customer")
dim_policy_silver   = spark.read.format("delta").load(f"{silver_path}/dim_policy")
dim_handler_silver  = spark.read.format("delta").load(f"{silver_path}/dim_handler")
dim_date_silver     = spark.read.format("delta").load(f"{silver_path}/dim_date")

fact_claim_raw = spark.read.format("delta").load(f"{silver_path}/fact_claims")

display(fact_claim_raw.limit(5))

In [0]:
# =========================
# COPY DIMS SILVER -> GOLD
# =========================

dim_customer_silver.write.format("delta").mode("overwrite").option("overwriteSchema", "true") \
    .save(f"{gold_path}/dim_customer")

dim_policy_silver.write.format("delta").mode("overwrite").option("overwriteSchema", "true") \
    .save(f"{gold_path}/dim_policy")

dim_handler_silver.write.format("delta").mode("overwrite").option("overwriteSchema", "true") \
    .save(f"{gold_path}/dim_handler")

dim_date_silver.write.format("delta").mode("overwrite").option("overwriteSchema", "true") \
    .save(f"{gold_path}/dim_date")


In [0]:
# =========================
# BUILD FactClaim (CLEAN)
# =========================

# good DQ
good_dq_condition = (
    (F.col("IsClosedBeforeCreated") == 0) &
    (F.col("IsClaimAmountNegative") == 0) &
    (F.col("IsVehicleCountInvalid") == 0) &
    (F.col("IsSeverityInvalid") == 0) &
    (F.col("IsIncidentOutsidePolicy") == 0) &
    (F.col("IsHandlerEmploymentInconsistent") == 0) &
    (F.col("IsCustomerMissing") == 0) &
    (F.col("IsPolicyMissing") == 0) &
    (F.col("IsHandlerMissing") == 0) &
    (F.col("ClaimAmount").isNotNull()) &
    (F.col("IncidentDate").isNotNull())
)

fact_claim_clean = (
    fact_claim_raw
    .filter(good_dq_condition)
    .select(
        "ClaimNumber",
        "CustomerID",
        "PolicyNumber",
        "HandlerID",
        "IncidentDate",
        "ClaimCreateDate",
        "ClaimClosedDate",
        "IncidentType",
        "IncidentSeverity",
        "NoOfVehiclesInvolved",
        "ClaimAmount",
        "AnnualPremium",
        "IsOpen",
        "ClaimDuration",
        "CustomerAgeAtIncident",
        # keep a few flags for BI analysis if needed
        "IsClaimOutlier",
        "IsClaimAmountMissing",
        "WasNoOfVehiclesImputed"
    )
)

# Map date -> date key
fact_claim_clean = (
    fact_claim_clean
    .withColumn("IncidentDateKey",     F.col("IncidentDate"))
    .withColumn("ClaimCreateDateKey",  F.col("ClaimCreateDate"))
    .withColumn("ClaimClosedDateKey",  F.col("ClaimClosedDate"))
)

display(fact_claim_clean.limit(10))

# Ghi ra GOLD
fact_claim_clean.write.format("delta").mode("overwrite").option("overwriteSchema", "true") \
    .save(f"{gold_path}/fact_claims")


ClaimNumber,CustomerID,PolicyNumber,HandlerID,IncidentDate,ClaimCreateDate,ClaimClosedDate,IncidentType,IncidentSeverity,NoOfVehiclesInvolved,ClaimAmount,AnnualPremium,IsOpen,ClaimDuration,CustomerAgeAtIncident,IsClaimOutlier,IsClaimAmountMissing,WasNoOfVehiclesImputed,IncidentDateKey,ClaimCreateDateKey,ClaimClosedDateKey
CL0000009,C000028,P0000225,H0035,2023-07-17,2023-07-25,2023-10-04,Fire,Severe,4,2591.88,2663.1,0,71.0,37,0,0,0,2023-07-17,2023-07-25,2023-10-04
CL0000010,C000561,P0000754,H0044,2026-05-16,2026-05-19,2026-08-09,Vandalism,Total Loss,4,1181.85,329.76,0,82.0,46,0,0,0,2026-05-16,2026-05-19,2026-08-09
CL0000012,C000006,P0001380,H0012,2025-07-12,2025-07-25,2025-10-25,Vandalism,Minor,1,1212.42,1127.46,0,92.0,63,0,0,0,2025-07-12,2025-07-25,2025-10-25
CL0000013,C000679,P0001093,H0003,2023-12-01,2023-12-04,2024-03-11,Weather,Total Loss,4,1722.64,2091.61,0,98.0,30,0,0,0,2023-12-01,2023-12-04,2024-03-11
CL0000023,C000956,P0001440,H0050,2025-12-06,2025-12-14,2026-03-16,Collision,Severe,2,1066.03,416.38,0,92.0,37,0,0,0,2025-12-06,2025-12-14,2026-03-16
CL0000027,C000327,P0001877,H0016,2023-06-07,2023-06-19,2023-07-17,Glass Damage,Moderate,1,659.26,1014.48,0,28.0,64,0,0,0,2023-06-07,2023-06-19,2023-07-17
CL0000028,C000313,P0000966,H0017,2025-12-28,2026-01-08,,Vandalism,Severe,1,7789.75,2796.95,1,,66,0,0,0,2025-12-28,2026-01-08,
CL0000029,C000785,P0000826,H0009,2023-08-31,2023-09-03,2023-12-11,Theft,Minor,2,1855.36,2962.24,0,99.0,70,0,0,0,2023-08-31,2023-09-03,2023-12-11
CL0000031,C000036,P0000332,H0027,2024-10-24,2024-10-31,2025-01-05,Theft,Total Loss,3,5706.56,2044.22,0,66.0,34,0,0,0,2024-10-24,2024-10-31,2025-01-05
CL0000038,C000212,P0000182,H0017,2026-07-08,2026-07-15,2026-08-25,Weather,Moderate,3,1376.08,1763.5,0,41.0,23,0,0,0,2026-07-08,2026-07-15,2026-08-25


In [0]:
#--FACT:FactClaimDaily--
fact_claim_daily = (
    fact_claim_clean
    .groupBy("IncidentDate")
    .agg(
        F.count("*").alias("TotalClaims"),
        F.sum("ClaimAmount").alias("TotalClaimAmount"),
        F.avg("ClaimAmount").alias("AvgClaimAmount"),
        F.sum(F.when(F.col("IsOpen") == 1, 1).otherwise(0)).alias("OpenClaims"),
        F.sum(F.when(F.col("IsOpen") == 0, 1).otherwise(0)).alias("ClosedClaims"),
        F.avg("ClaimDuration").alias("AvgClaimDuration")
    )
    .withColumnRenamed("IncidentDate", "Date")   # để join với DimDate.Date
)

display(fact_claim_daily.limit(10))

fact_claim_daily.write.format("delta").mode("overwrite").option("overwriteSchema", "true") \
    .save(f"{gold_path}/fact_claims_daily")


Date,TotalClaims,TotalClaimAmount,AvgClaimAmount,OpenClaims,ClosedClaims,AvgClaimDuration
2025-10-21,2,2599.34,1299.67,1,1,75.0
2023-07-15,2,3861.49,1930.745,0,2,63.0
2023-06-22,1,5866.87,5866.87,0,1,29.0
2024-09-18,2,3032.28,1516.14,0,2,61.0
2026-10-09,3,1413.7299999999998,471.2433333333333,0,3,50.0
2026-08-18,1,13456.19,13456.19,1,0,
2026-02-13,2,6073.780000000001,3036.8900000000003,0,2,77.0
2024-05-30,1,206.43,206.43,0,1,48.0
2025-11-04,3,8921.08,2973.693333333333,0,3,50.66666666666666
2024-02-05,2,21345.910000000003,10672.955000000002,2,0,


In [0]:
# =========================
# MART: MartCustomerClaims
# =========================

# Join fact + dim customer để có attributes
fact_cust = (
    fact_claim_clean.alias("f")
    .join(
        dim_customer_silver.select(
            "CustomerID",
            "DOB",
            "MaritalStatus",
            "EmploymentStatus",
            "VehicleType",
            "CreditScore",
            "AnnualMileage"
        ).alias("c"),
        on="CustomerID",
        how="left"
    )
)

mart_customer_claims = (
    fact_cust
    .groupBy("CustomerID", "MaritalStatus", "EmploymentStatus", "VehicleType")
    .agg(
        F.count("*").alias("TotalClaims"),
        F.sum("ClaimAmount").alias("TotalClaimAmount"),
        F.avg("ClaimAmount").alias("AvgClaimAmount"),
        F.max("ClaimCreateDate").alias("LastClaimDate"),
        F.min("ClaimCreateDate").alias("FirstClaimDate"),
        F.avg("CustomerAgeAtIncident").alias("AvgAgeAtIncident")
    )
)

display(mart_customer_claims.limit(10))

mart_customer_claims.write.format("delta").mode("overwrite").option("overwriteSchema", "true") \
    .save(f"{gold_path}/mart_customer_claims")


CustomerID,MaritalStatus,EmploymentStatus,VehicleType,TotalClaims,TotalClaimAmount,AvgClaimAmount,LastClaimDate,FirstClaimDate,AvgAgeAtIncident
C000550,Widowed,Employed,Coupe,3,6532.37,2177.4566666666665,2025-07-22,2024-10-19,48.66666666666666
C000005,Divorced,Self-employed,Coupe,2,4740.53,2370.265,2024-02-23,2023-12-31,36.0
C000560,Widowed,Self-employed,Hatchback,1,4398.65,4398.65,2024-08-20,2024-08-20,46.0
C000762,Divorced,Employed,SUV,5,9484.2,1896.84,2026-06-25,2025-09-10,29.0
C000102,Married,Retired,Hatchback,1,1999.81,1999.81,2024-02-11,2024-02-11,52.0
C000437,Married,Self-employed,Truck,1,1724.34,1724.34,2026-03-22,2026-03-22,74.0
C000097,Single,Unemployed,Sedan,5,12616.36,2523.272,2025-09-12,2024-11-04,54.4
C000471,Widowed,Self-employed,Hatchback,3,7762.13,2587.3766666666666,2024-11-14,2024-09-09,72.33333333333333
C000706,Widowed,Self-employed,Sedan,5,26667.67,5333.534000000001,2025-01-05,2024-08-14,72.0
C000575,Divorced,Employed,Coupe,1,3598.07,3598.07,2026-04-13,2026-04-13,56.0


In [0]:
# =========================
# MART: MartHandlerPerformance
# =========================

fact_handler = (
    fact_claim_clean.alias("f")
    .join(
        dim_handler_silver.select(
            "HandlerID", "FullName", "Location", "ExperienceLevel"
        ).alias("h"),
        on="HandlerID",
        how="left"
    )
)

mart_handler_perf = (
    fact_handler
    .groupBy("HandlerID", "FullName", "Location", "ExperienceLevel")
    .agg(
        F.count("*").alias("TotalClaimsHandled"),
        F.sum("ClaimAmount").alias("TotalClaimAmountHandled"),
        F.avg("ClaimAmount").alias("AvgClaimAmountHandled"),
        F.avg("ClaimDuration").alias("AvgClaimDuration"),
        F.sum(F.when(F.col("IsOpen") == 1, 1).otherwise(0)).alias("OpenClaims"),
        F.sum(F.when(F.col("IsClaimOutlier") == 1, 1).otherwise(0)).alias("OutlierClaims")
    )
)

display(mart_handler_perf.limit(10))

mart_handler_perf.write.format("delta").mode("overwrite").option("overwriteSchema", "true") \
    .save(f"{gold_path}/mart_handler_performance")


HandlerID,FullName,Location,ExperienceLevel,TotalClaimsHandled,TotalClaimAmountHandled,AvgClaimAmountHandled,AvgClaimDuration,OpenClaims,OutlierClaims
H0024,Ryan Buckley,Brisbane,Junior,23,55396.99000000001,2408.564782608696,55.66666666666666,2,0
H0032,Brenda Sellers,Perth,Junior,42,129240.14999999998,3077.1464285714283,69.34210526315789,4,0
H0038,Anthony Ellis,Perth,Senior,5,17040.62,3408.124,67.2,0,0
H0007,Brian Erickson,Adelaide,Junior,37,88790.87999999999,2399.753513513513,70.58064516129032,6,0
H0002,Roger Rasmussen,Melbourne,Lead,9,14980.88,1664.5422222222223,49.333333333333336,3,0
H0012,Aaron Porter,Adelaide,Junior,19,66127.03,3480.37,69.05263157894737,0,0
H0029,Cheryl Thomas,Brisbane,Mid,21,89195.98999999999,4247.428095238095,56.35,1,0
H0044,Emma Green,Melbourne,Mid,38,147019.29000000004,3868.928684210527,62.8235294117647,4,0
H0015,Daisy Delacruz,Sydney,Lead,25,88214.95000000001,3528.5980000000004,51.95,5,0
H0013,Stephanie Davis,Melbourne,Mid,21,88240.23999999999,4201.91619047619,53.333333333333336,3,0


In [0]:
# =========================
# MART: MartPolicyPerformance (optional)
# =========================

fact_policy = (
    fact_claim_clean.alias("f")
    .join(
        dim_policy_silver.select(
            "PolicyNumber", "CustomerID", "PolicyCreateDate", "PolicyExpirationDate",
            "PolicyDuration"
        ).alias("p"),
        on=["PolicyNumber","CustomerID"],
        how="left"
    )
)

mart_policy_perf = (
    fact_policy
    .groupBy("PolicyNumber", "CustomerID")
    .agg(
        F.sum("AnnualPremium").alias("TotalPremium"),
        F.count("ClaimNumber").alias("TotalClaims"),
        F.sum("ClaimAmount").alias("TotalClaimAmount"),
        (F.sum("ClaimAmount") / F.sum("AnnualPremium")).alias("LossRatio")
    )
)

display(mart_policy_perf.limit(10))

mart_policy_perf.write.format("delta").mode("overwrite").option("overwriteSchema", "true") \
    .save(f"{gold_path}/mart_policy_performance")


PolicyNumber,CustomerID,TotalPremium,TotalClaims,TotalClaimAmount,LossRatio
P0000265,C000726,8928.08,4,10274.559999999998,1.1508140608059063
P0001711,C000272,2393.02,2,3723.47,1.5559711159956875
P0000080,C000164,6305.879999999999,6,17700.630000000005,2.8070039391805754
P0001715,C000291,2935.07,1,3541.39,1.2065776966137092
P0001877,C000327,6086.879999999999,6,7350.09,1.2075299660910024
P0000503,C000703,913.59,3,852.27,0.9328801760089318
P0001199,C000739,2658.45,3,4544.2,1.7093419097594464
P0000140,C000532,1261.96,1,2400.52,1.9022156011284037
P0000242,C000723,1245.88,1,2053.89,1.6485456063184254
P0001103,C000056,5930.72,2,20969.52,3.535746081420131


In [0]:
print("GOLD tables/folders:")
display(dbutils.fs.ls(gold_path))

print("FactClaim sample:")
display(spark.read.format("delta").load(f"{gold_path}/fact_claims").limit(5))

print("FactClaimDaily sample:")
display(spark.read.format("delta").load(f"{gold_path}/fact_claims_daily").limit(5))


GOLD tables/folders:


path,name,size,modificationTime
abfss://datalake@insurancedatalake01.dfs.core.windows.net/gold/dim_customer/,dim_customer/,0,1765040104000
abfss://datalake@insurancedatalake01.dfs.core.windows.net/gold/dim_date/,dim_date/,0,1765040109000
abfss://datalake@insurancedatalake01.dfs.core.windows.net/gold/dim_handler/,dim_handler/,0,1765040108000
abfss://datalake@insurancedatalake01.dfs.core.windows.net/gold/dim_policy/,dim_policy/,0,1765040106000
abfss://datalake@insurancedatalake01.dfs.core.windows.net/gold/fact_claims/,fact_claims/,0,1765040242000
abfss://datalake@insurancedatalake01.dfs.core.windows.net/gold/fact_claims_daily/,fact_claims_daily/,0,1765187855000
abfss://datalake@insurancedatalake01.dfs.core.windows.net/gold/mart_customer_claims/,mart_customer_claims/,0,1765210542000
abfss://datalake@insurancedatalake01.dfs.core.windows.net/gold/mart_handler_performance/,mart_handler_performance/,0,1765210570000
abfss://datalake@insurancedatalake01.dfs.core.windows.net/gold/mart_policy_performance/,mart_policy_performance/,0,1765211030000


FactClaim sample:


ClaimNumber,CustomerID,PolicyNumber,HandlerID,IncidentDate,ClaimCreateDate,ClaimClosedDate,IncidentType,IncidentSeverity,NoOfVehiclesInvolved,ClaimAmount,AnnualPremium,IsOpen,ClaimDuration,CustomerAgeAtIncident,IsClaimOutlier,IsClaimAmountMissing,WasNoOfVehiclesImputed,IncidentDateKey,ClaimCreateDateKey,ClaimClosedDateKey
CL0000009,C000028,P0000225,H0035,2023-07-17,2023-07-25,2023-10-04,Fire,Severe,4,2591.88,2663.1,0,71,37,0,0,0,2023-07-17,2023-07-25,2023-10-04
CL0000010,C000561,P0000754,H0044,2026-05-16,2026-05-19,2026-08-09,Vandalism,Total Loss,4,1181.85,329.76,0,82,46,0,0,0,2026-05-16,2026-05-19,2026-08-09
CL0000012,C000006,P0001380,H0012,2025-07-12,2025-07-25,2025-10-25,Vandalism,Minor,1,1212.42,1127.46,0,92,63,0,0,0,2025-07-12,2025-07-25,2025-10-25
CL0000013,C000679,P0001093,H0003,2023-12-01,2023-12-04,2024-03-11,Weather,Total Loss,4,1722.64,2091.61,0,98,30,0,0,0,2023-12-01,2023-12-04,2024-03-11
CL0000023,C000956,P0001440,H0050,2025-12-06,2025-12-14,2026-03-16,Collision,Severe,2,1066.03,416.38,0,92,37,0,0,0,2025-12-06,2025-12-14,2026-03-16


FactClaimDaily sample:


Date,TotalClaims,TotalClaimAmount,AvgClaimAmount,OpenClaims,ClosedClaims,AvgClaimDuration
2025-10-21,2,2599.34,1299.67,1,1,75.0
2023-07-15,2,3861.49,1930.745,0,2,63.0
2023-06-22,1,5866.87,5866.87,0,1,29.0
2024-09-18,2,3032.28,1516.14,0,2,61.0
2026-10-09,3,1413.7299999999998,471.2433333333333,0,3,50.0
