In [0]:
%pip install scikit-learn imbalanced-learn
%restart_python

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
# Cell 2: ETL - Silver Layer (Data Cleansing & Feature Engineering)
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier, IsolationForest
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import warnings
warnings.filterwarnings('ignore')

print("Loading Bronze Tables...")
claims_bronze = spark.table("fraud_detection.claims_bronze")
patients = spark.table("fraud_detection.patients")
providers = spark.table("fraud_detection.providers")

print("\n=== SILVER LAYER: DATA ENRICHMENT ===")

# Rename conflicting columns before join
patients = patients.withColumnRenamed("state", "patient_state")
providers = providers.withColumnRenamed("state", "provider_state")

# Join claims with patients and providers
claims_enriched = claims_bronze \
    .join(patients, "patient_id", "left") \
    .join(providers, "provider_id", "left")

# Feature Engineering
claims_silver = claims_enriched.withColumn(
    "claim_year", F.year(F.col("claim_date"))
).withColumn(
    "claim_month_num", F.month(F.col("claim_date"))
).withColumn(
    "claim_day_of_week", F.dayofweek(F.col("claim_date"))
).withColumn(
    "amount_log", F.log1p(F.col("billed_amount"))
)

# Calculate provider-level statistics
provider_stats = claims_silver.groupBy("provider_id").agg(
    F.count("claim_id").alias("provider_claim_count"),
    F.avg("billed_amount").alias("provider_avg_amount"),
    F.stddev("billed_amount").alias("provider_stddev_amount"),
    F.sum(F.when(F.col("is_fraud") == 1, 1).otherwise(0)).alias("provider_fraud_count")
)

# Calculate patient-level statistics
patient_stats = claims_silver.groupBy("patient_id").agg(
    F.count("claim_id").alias("patient_claim_count"),
    F.avg("billed_amount").alias("patient_avg_amount")
)

# Join statistics back
claims_silver = claims_silver \
    .join(provider_stats, "provider_id", "left") \
    .join(patient_stats, "patient_id", "left")

# Calculate amount deviation from provider average
claims_silver = claims_silver.withColumn(
    "amount_deviation_pct",
    F.when(F.col("provider_avg_amount") > 0,
           ((F.col("billed_amount") - F.col("provider_avg_amount")) / F.col("provider_avg_amount")) * 100
    ).otherwise(0)
)

# Save Silver Table
claims_silver.write.format("delta").mode("overwrite").saveAsTable("fraud_detection.claims_silver")

print(f"✅ Silver table created with {claims_silver.count()} enriched records")
print("\nSample enriched data:")
claims_silver.select(
    "claim_id", "billed_amount", "specialty", "provider_claim_count", 
    "amount_deviation_pct", "is_fraud"
).show(5)

Loading Bronze Tables...

=== SILVER LAYER: DATA ENRICHMENT ===
✅ Silver table created with 5000 enriched records

Sample enriched data:
+--------+-------------+---------+--------------------+--------------------+--------+
|claim_id|billed_amount|specialty|provider_claim_count|amount_deviation_pct|is_fraud|
+--------+-------------+---------+--------------------+--------------------+--------+
| C003001|      3690.81|Neurology|                 106|   -9.42524532166707|       0|
| C003002|     21543.58| Oncology|                 116|  161.71920854175633|       0|
| C003003|      2001.87|Neurology|                 106| -27.771445150972557|       0|
| C003004|       3499.5|Neurology|                 100|  12.630412065097197|       0|
| C003005|      3286.98|Neurology|                 103|   5.389043396259714|       0|
+--------+-------------+---------+--------------------+--------------------+--------+
only showing top 5 rows


In [0]:
# Cell 3: ML-Based Fraud Detection (Gold Layer)

print("\n=== GOLD LAYER: ML FRAUD DETECTION ===")

# Load silver data
claims_silver_df = spark.table("fraud_detection.claims_silver").toPandas()

print(f"Total claims: {len(claims_silver_df)}")
print(f"Known fraud cases: {claims_silver_df['is_fraud'].sum()}")

# Feature preparation
feature_columns = [
    'billed_amount', 'amount_log', 'claim_month_num', 'claim_day_of_week',
    'provider_claim_count', 'provider_avg_amount', 'provider_fraud_count',
    'patient_claim_count', 'amount_deviation_pct'
]

# Handle missing values
claims_silver_df[feature_columns] = claims_silver_df[feature_columns].fillna(0)

# Encode categorical features
le_specialty = LabelEncoder()
le_procedure = LabelEncoder()

claims_silver_df['specialty_encoded'] = le_specialty.fit_transform(claims_silver_df['specialty'].fillna('Unknown'))
claims_silver_df['procedure_encoded'] = le_procedure.fit_transform(claims_silver_df['procedure_code'].fillna('Unknown'))

# Final feature set
ml_features = feature_columns + ['specialty_encoded', 'procedure_encoded']
X = claims_silver_df[ml_features]
y = claims_silver_df['is_fraud']

# Train-test split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42, stratify=y)

print(f"\nTraining set: {len(X_train)} claims")
print(f"Test set: {len(X_test)} claims")

# Train Random Forest Classifier
print("\n🤖 Training Random Forest Classifier...")
rf_model = RandomForestClassifier(
    n_estimators=100,
    max_depth=10,
    min_samples_split=20,
    class_weight='balanced',
    random_state=42,
    n_jobs=-1
)
rf_model.fit(X_train, y_train)

# Predictions
y_pred = rf_model.predict(X_test)
y_pred_proba = rf_model.predict_proba(X_test)[:, 1]

print("\n=== MODEL PERFORMANCE ===")
print(classification_report(y_test, y_pred, target_names=['Normal', 'Fraud']))

# Feature importance
feature_importance = pd.DataFrame({
    'feature': ml_features,
    'importance': rf_model.feature_importances_
}).sort_values('importance', ascending=False)

print("\n=== TOP 10 IMPORTANT FEATURES ===")
print(feature_importance.head(10))

# Add predictions to full dataset
claims_silver_df['fraud_score'] = rf_model.predict_proba(X)[:, 1]
claims_silver_df['ml_prediction'] = rf_model.predict(X)

# Anomaly Detection using Isolation Forest (unsupervised)
print("\n🔍 Running Isolation Forest for Anomaly Detection...")
iso_forest = IsolationForest(contamination=0.08, random_state=42)
claims_silver_df['anomaly_score'] = iso_forest.fit_predict(X)
claims_silver_df['is_anomaly'] = (claims_silver_df['anomaly_score'] == -1).astype(int)

# Combined fraud flag
claims_silver_df['final_fraud_flag'] = (
    (claims_silver_df['is_fraud'] == 1) | 
    (claims_silver_df['ml_prediction'] == 1) | 
    (claims_silver_df['is_anomaly'] == 1)
).astype(int)

# Generate fraud explanation
def generate_fraud_explanation(row):
    reasons = []
    
    if row['fraud_type'] is not None:
        reasons.append(f"Known fraud: {row['fraud_reason']}")
    
    if row['ml_prediction'] == 1:
        reasons.append(f"ML detected (confidence: {row['fraud_score']:.2%})")
    
    if row['is_anomaly'] == 1:
        reasons.append("Statistical anomaly detected")
    
    if abs(row['amount_deviation_pct']) > 200:
        reasons.append(f"Amount {row['amount_deviation_pct']:.0f}% deviation from provider average")
    
    if row['provider_fraud_count'] > 5:
        reasons.append(f"Provider has {row['provider_fraud_count']} known fraud cases")
    
    return " | ".join(reasons) if reasons else "No fraud detected"

claims_silver_df['fraud_explanation'] = claims_silver_df.apply(generate_fraud_explanation, axis=1)

# Select columns for Gold table
gold_columns = [
    'claim_id', 'patient_id', 'provider_id', 'claim_date', 'procedure_code',
    'diagnosis_code', 'billed_amount', 'specialty', 'patient_name', 'provider_name',
    'is_fraud', 'fraud_type', 'ml_prediction', 'fraud_score', 'is_anomaly',
    'final_fraud_flag', 'fraud_explanation', 'claim_month', 'amount_deviation_pct',
    'provider_fraud_count'
]

claims_gold_df = claims_silver_df[gold_columns]

# Save to Gold table
claims_gold_spark = spark.createDataFrame(claims_gold_df)
claims_gold_spark.write.format("delta").mode("overwrite").saveAsTable("fraud_detection.claims_gold")

print(f"\n✅ Gold table created with fraud analysis")
print(f"Total suspicious claims: {claims_gold_df['final_fraud_flag'].sum()}")
print(f"Detection rate: {claims_gold_df['final_fraud_flag'].sum() / len(claims_gold_df) * 100:.1f}%")

# Show sample suspicious claims
print("\n=== SAMPLE SUSPICIOUS CLAIMS ===")
suspicious = claims_gold_df[claims_gold_df['final_fraud_flag'] == 1].head(10)
print(suspicious[['claim_id', 'specialty', 'billed_amount', 'fraud_score', 'fraud_explanation']])


=== GOLD LAYER: ML FRAUD DETECTION ===
Total claims: 5000
Known fraud cases: 400

Training set: 3500 claims
Test set: 1500 claims

🤖 Training Random Forest Classifier...

=== MODEL PERFORMANCE ===
              precision    recall  f1-score   support

      Normal       0.96      0.98      0.97      1380
       Fraud       0.77      0.57      0.66       120

    accuracy                           0.95      1500
   macro avg       0.87      0.78      0.82      1500
weighted avg       0.95      0.95      0.95      1500


=== TOP 10 IMPORTANT FEATURES ===
                 feature  importance
8   amount_deviation_pct    0.284977
1             amount_log    0.147584
0          billed_amount    0.140629
10     procedure_encoded    0.104953
6   provider_fraud_count    0.074563
5    provider_avg_amount    0.063078
7    patient_claim_count    0.048826
4   provider_claim_count    0.039367
2        claim_month_num    0.037378
9      specialty_encoded    0.034715

🔍 Running Isolation Forest for A