In [0]:
# ===============================================================
# 🚀 TELECOM SIMULATION FRAUD DETECTION PIPELINE - End-to-End (Managed Table)
# Input: workspace.default.fraud_data
# Database / Schema: sim_detection (auto-created if missing)
# ===============================================================

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, when, count, avg, sum, round, lit, expr, concat_ws, to_timestamp
)

# -----------------------------
# 1️⃣ SETUP SPARK SESSION
# -----------------------------
# Configure the builder in named steps, then obtain the session from it.
session_builder = SparkSession.builder.appName("SimFraudDetectionPipeline")
session_builder = session_builder.enableHiveSupport()
spark = session_builder.getOrCreate()

# -----------------------------
# 2️⃣ INGESTION - Read Managed Table (user-provided)
# -----------------------------
df = spark.table("workspace.default.fraud_data")
print("✅ Data Ingested Successfully (workspace.default.fraud_data)")

# Quick look at what was loaded: column types first, then five untruncated rows.
df.printSchema()
df.show(5, truncate=False)

# Pass event_time through to_timestamp whenever the column exists, so that a
# string-typed source column ends up as a proper timestamp downstream.
has_event_time = "event_time" in df.columns
df = df.withColumn("event_time", to_timestamp(col("event_time"))) if has_event_time else df

# -----------------------------
# 3️⃣ CREATE DATABASE / SCHEMA (sim_detection)
# -----------------------------
# Create the target schema if it is missing, then make it the current one.
for schema_statement in (
    "CREATE DATABASE IF NOT EXISTS sim_detection",
    "USE sim_detection",
):
    spark.sql(schema_statement)
print("✅ Using schema: sim_detection")

# -----------------------------
# 4️⃣ BRONZE / SILVER / GOLD TABLES
# -----------------------------
def _overwrite_delta_table(frame, table_name):
    """Replace the managed Delta table `table_name` with the contents of `frame`.

    `overwriteSchema` is enabled because the gold column set depends on which
    optional input columns exist; without it, a re-run after the input schema
    changed is rejected by Delta with a schema-mismatch error instead of
    replacing the table.
    """
    (
        frame.write
        .mode("overwrite")
        .format("delta")
        .option("overwriteSchema", "true")
        .saveAsTable(table_name)
    )


def _add_threshold_flag(frame, source_col, flag_col, threshold):
    """Return `frame` with an integer 0/1 column `flag_col` appended.

    The flag is 1 where `source_col >= threshold` and 0 otherwise (including
    rows where `source_col` is NULL). When `source_col` is absent from the
    frame, the flag is a constant 0 so the gold schema stays stable.
    """
    if source_col not in frame.columns:
        return frame.withColumn(flag_col, lit(0))
    return frame.withColumn(
        flag_col,
        when(col(source_col) >= threshold, lit(1)).otherwise(lit(0)),
    )


# Bronze - the ingested data exactly as held in `df` at this point
# (i.e. including any column casting already applied to `df` upstream).
_overwrite_delta_table(df, "sim_detection.bronze")
print("✅ Bronze table written: sim_detection.bronze")

# Silver - clean data: drop rows with null critical fields and deduplicate.
# Rows are filtered before de-duplication so the shuffle only handles rows
# that will be kept; the resulting row set is the same either way.
critical_cols = ["event_id", "event_time", "subscriber_id", "sim_serial"]
present_critical_cols = [c for c in critical_cols if c in df.columns]
df_silver = df.na.drop(subset=present_critical_cols).dropDuplicates()
_overwrite_delta_table(df_silver, "sim_detection.silver")
print("✅ Silver table written: sim_detection.silver")

# Gold - feature engineering / derived columns
df_gold = df_silver

# Create the derived "subscriber|sim" key only when it is not already present
# and both of its source columns exist.
if "key" not in df_gold.columns and {"subscriber_id", "sim_serial"}.issubset(df_gold.columns):
    df_gold = df_gold.withColumn("key", concat_ws("|", col("subscriber_id"), col("sim_serial")))

# Example engineered flags as (source column, flag column, threshold);
# tweak thresholds as needed.
threshold_flags = [
    ("fraud_alert_score", "high_fraud_alert_flag", 0.7),
    ("recent_sim_swaps_30d", "recent_sim_swap_flag", 1),
    ("failed_verification_attempts", "many_failed_verifications_flag", 3),
]
for source_col, flag_col, threshold in threshold_flags:
    df_gold = _add_threshold_flag(df_gold, source_col, flag_col, threshold)

_overwrite_delta_table(df_gold, "sim_detection.gold")
print("✅ Gold table written: sim_detection.gold")

# -----------------------------
# 5️⃣ PROCESSING & KPIs
# -----------------------------
# NOTE: `round` and `sum` below are the pyspark.sql.functions versions brought
# in by the file-level import, not the Python builtins.
def _flag_share_summary(frame, flag_col, count_alias, pct_alias):
    """Build a one-row summary of how many rows have boolean `flag_col` set.

    The result has three columns: `total_events` (row count), `count_alias`
    (rows where the flag is true) and `pct_alias` (flagged share in percent,
    rounded to 2 decimals).

    When `flag_col` is missing from `frame`, a single all-zero row is returned.
    An empty `frame` also produces zeros rather than NULLs, so both cases
    look the same to consumers of the summary table.
    """
    if flag_col not in frame.columns:
        return spark.createDataFrame(
            [(0, 0, 0.0)],
            schema=f"total_events long, {count_alias} long, {pct_alias} double",
        )

    # when(...) without otherwise() yields NULL for false/NULL flags, and
    # count() skips NULLs, so this counts exactly the rows whose flag is true
    # and returns 0 (not NULL) when there are no rows at all.
    flagged = count(when(col(flag_col), lit(1))).alias(count_alias)
    share = when(
        col("total_events") > 0,
        round(col(count_alias) / col("total_events") * 100, 2),
    ).otherwise(lit(0.0))
    return (
        frame.groupBy()
        .agg(count("*").alias("total_events"), flagged)
        .withColumn(pct_alias, share)
    )


# KPI 1: Count by city (largest first)
if "city" in df_gold.columns:
    kpi_city = (
        df_gold.groupBy("city")
        .agg(count("*").alias("total_events"))
        .orderBy(col("total_events").desc())
    )
else:
    kpi_city = spark.createDataFrame([], schema="city string, total_events long")

# KPI 2: Count by KYC status
if "kyc_status" in df_gold.columns:
    kpi_kyc = df_gold.groupBy("kyc_status").agg(count("*").alias("count_by_kyc"))
else:
    kpi_kyc = spark.createDataFrame([], schema="kyc_status string, count_by_kyc long")

# KPI 3: Average fraud_alert_score by city (rounded to 4 decimals)
if "fraud_alert_score" in df_gold.columns and "city" in df_gold.columns:
    kpi_avg_fraud = df_gold.groupBy("city").agg(
        round(avg("fraud_alert_score"), 4).alias("avg_fraud_alert_score")
    )
else:
    kpi_avg_fraud = spark.createDataFrame([], schema="city string, avg_fraud_alert_score double")

# KPI 4: Suspicious transactions count (suspicious_transaction_flag)
kpi_suspicious = _flag_share_summary(
    df_gold, "suspicious_transaction_flag", "suspicious_count", "suspicious_percentage"
)

# KPI 5: High risk country percentage
kpi_high_risk = _flag_share_summary(
    df_gold, "high_risk_country_flag", "high_risk_count", "high_risk_percentage"
)

print("✅ KPIs computed")

# -----------------------------
# 6️⃣ SAVE KPI RESULTS AS MANAGED TABLES (under sim_detection)
# -----------------------------
# Target table name -> KPI DataFrame to persist there.
kpi_tables = {
    "sim_detection.kpi_city_counts": kpi_city,
    "sim_detection.kpi_kyc_counts": kpi_kyc,
    "sim_detection.kpi_avg_fraud_by_city": kpi_avg_fraud,
    "sim_detection.kpi_suspicious_summary": kpi_suspicious,
    "sim_detection.kpi_high_risk_summary": kpi_high_risk,
}

for kpi_table_name, kpi_frame in kpi_tables.items():
    # Written the same way as the bronze/silver/gold tables: explicit Delta
    # format rather than whatever the session default happens to be.
    # overwriteSchema lets a full overwrite replace the table even if a
    # previous run stored it with a different column layout.
    (
        kpi_frame.write
        .mode("overwrite")
        .format("delta")
        .option("overwriteSchema", "true")
        .saveAsTable(kpi_table_name)
    )

print("✅ KPI tables saved under sim_detection.*")

# -----------------------------
# 7️⃣ BUSINESS INSIGHTS
# -----------------------------
# NOTE: `round` and `sum` below are the pyspark.sql.functions versions brought
# in by the file-level import, not the Python builtins.
def _first_row_or_none(frame, label, order_by=None):
    """Return the first row of `frame`, or None if it is empty or cannot be read.

    `order_by`, when given, is a Column (or column name) the frame is sorted by
    before the first row is taken. Insights are best-effort: a failure is
    reported with a warning naming `label` and does not abort the pipeline.
    """
    try:
        ordered = frame if order_by is None else frame.orderBy(order_by)
        rows = ordered.limit(1).collect()
    except Exception as exc:  # deliberate catch-all: reporting must not stop the run
        print(f"⚠️ Skipping insight '{label}': {exc}")
        return None
    return rows[0] if rows else None


print("✅ Business Insights:")

# Top city by events. Sort explicitly instead of relying on whatever order
# kpi_city was built with.
top_city = _first_row_or_none(
    kpi_city, "top city by events", order_by=col("total_events").desc()
)
if top_city is not None:
    print(f"Top City: {top_city['city']} with {top_city['total_events']} events")

# City with highest average fraud alert score
tfc = _first_row_or_none(
    kpi_avg_fraud,
    "city with highest average fraud alert score",
    order_by=col("avg_fraud_alert_score").desc(),
)
if tfc is not None:
    print(f"City with highest average fraud alert score: {tfc['city']} (avg score = {tfc['avg_fraud_alert_score']})")

# Suspicious percentage overall
susp_row = _first_row_or_none(kpi_suspicious, "suspicious events summary")
if susp_row is not None:
    print(f"Suspicious events: {susp_row['suspicious_count']} / {susp_row['total_events']} ({susp_row['suspicious_percentage']}%)")

# High risk country percentage overall
hr_row = _first_row_or_none(kpi_high_risk, "high-risk country summary")
if hr_row is not None:
    print(f"High-risk country flagged events: {hr_row['high_risk_count']} / {hr_row['total_events']} ({hr_row['high_risk_percentage']}%)")

# Overall average fraud_alert_score across dataset (rounded to 4 decimals)
if "fraud_alert_score" in df_gold.columns:
    overall_avg_fraud = df_gold.agg(round(avg("fraud_alert_score"), 4).alias("overall_avg")).collect()[0]["overall_avg"]
    print(f"Average Fraud Alert Score across all events: {overall_avg_fraud}")

# Fraud stats summary: total events, events with high_fraud_alert_flag set,
# and their share in percent.
fraud_stats = (
    df_gold
    .select(
        count("*").alias("total_events"),
        sum(col("high_fraud_alert_flag")).alias("high_alert_events")
    )
    .withColumn("high_alert_percentage", round(col("high_alert_events") / col("total_events") * 100, 2))
)
fraud_stats.show(truncate=False)

# -----------------------------
# 8️⃣ VISUALIZATION PREPARATION
# -----------------------------
# Persist the per-city counts under a dedicated export table for BI tools.
city_export_writer = kpi_city.write.mode("overwrite")
city_export_writer.saveAsTable("sim_detection.kpi_export_city")
print("✅ KPI data ready for visualization tools (sim_detection.kpi_export_city)")

# -----------------------------
# 🎯 PIPELINE COMPLETE
# -----------------------------
print("🎯 End-to-End Simulated Telecom Fraud Detection Pipeline Completed Successfully")


✅ Data Ingested Successfully (workspace.default.fraud_data)
root
 |-- event_id: string (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- city: string (nullable = true)
 |-- subscriber_id: string (nullable = true)
 |-- sim_serial: string (nullable = true)
 |-- device_imei: string (nullable = true)
 |-- activation_channel: string (nullable = true)
 |-- kyc_status: string (nullable = true)
 |-- identity_document_type: string (nullable = true)
 |-- identity_mismatch_flag: boolean (nullable = true)
 |-- location_variance_km: double (nullable = true)
 |-- failed_verification_attempts: integer (nullable = true)
 |-- recent_sim_swaps_30d: integer (nullable = true)
 |-- account_age_days: integer (nullable = true)
 |-- high_risk_country_flag: boolean (nullable = true)
 |-- topup_amount_last_24h: double (nullable = true)
 |-- suspicious_transaction_flag: boolean (nullable = true)
 |-- fraud_alert_score: double (nullable = true)
 |-- travel_pattern_score: double (nullable = true)

In [0]:
# Display the main pipeline tables in a single cell (one after the other),
# showing at most 10 rows of each.

tables = [
    "sim_detection.bronze",
    "sim_detection.silver",
    "sim_detection.gold",
    "sim_detection.kpi_city_counts",
    "sim_detection.kpi_avg_fraud_by_city",
    "sim_detection.kpi_suspicious_summary",
    "sim_detection.kpi_high_risk_summary"
]

for table_name in tables:
    print(f"\n===== Showing: {table_name} =====\n")
    # Use a dedicated name for the preview: notebook cells share globals, and
    # binding the preview to `df` would replace the ingested DataFrame that
    # the pipeline cell stores under that name.
    preview_df = spark.table(table_name).limit(10)
    display(preview_df)



===== Showing: sim_detection.bronze =====



event_id,event_time,city,subscriber_id,sim_serial,device_imei,activation_channel,kyc_status,identity_document_type,identity_mismatch_flag,location_variance_km,failed_verification_attempts,recent_sim_swaps_30d,account_age_days,high_risk_country_flag,topup_amount_last_24h,suspicious_transaction_flag,fraud_alert_score,travel_pattern_score,blacklist_match_flag,escalated_to_fraud_team,key
264860c9-2d2f-46a6-adbb-35ee244d630d,2025-01-25T04:12:38.785Z,Bengaluru,SUB-44586820,SIM-179761784122699,IMEI-375820637648073,Retail,Verified,National ID,False,1.7522113992049366,0,0,2358,False,91.33797203447774,False,0.5965045607842397,0.5206090038155781,False,False,SUB-44586820|SIM-179761784122699
28d22734-eff8-4176-bbc1-20d39346fdca,2025-01-16T15:15:39.437Z,Delhi,SUB-52030257,SIM-018362428755129,IMEI-936298954193378,Retail,Verified,National ID,False,1.4618678967293075,5,2,1096,False,8.82473601213473,False,0.3550614469278791,0.4332003138771612,False,False,SUB-52030257|SIM-018362428755129
272f9269-593c-42b6-9db1-5bb1e248dba5,2025-01-29T00:14:24.045Z,Chennai,SUB-21992727,SIM-072333343985685,IMEI-162112162126165,Retail,Verified,National ID,False,2.334414717261056,1,2,688,False,3.856922363733377,False,0.5147191402061156,0.5001225106774085,False,False,SUB-21992727|SIM-072333343985685
8ad96c83-ba4c-468b-bb82-120a7257f963,2025-01-20T04:06:02.161Z,Bengaluru,SUB-26776348,SIM-218605319134711,IMEI-095813122267511,Retail,Verified,Driver License,False,2.3974215146233147,3,0,3288,False,51.285650291176175,False,0.6912696439954293,0.4607614901799043,False,False,SUB-26776348|SIM-218605319134711
8e536be3-4b10-4645-ab07-e6700aec3d59,2025-02-28T19:51:30.285Z,Delhi,SUB-35632018,SIM-964143599638861,IMEI-077865044878489,Retail,Verified,Driver License,False,29.801462972645496,2,0,1582,False,41.64016630751814,True,0.1415517577964142,0.8915326292942939,False,False,SUB-35632018|SIM-964143599638861
dcb28b61-a728-4c34-8ce4-16bda64f3173,2025-01-20T02:04:43.885Z,Bengaluru,SUB-22551312,SIM-478482367601474,IMEI-559355121501655,Retail,Verified,National ID,False,1.3477353199557114,1,2,3476,False,15.11198175213041,False,0.1538477427752227,0.9296105147274588,False,False,SUB-22551312|SIM-478482367601474
1afd9be5-75b2-4b64-bbab-4b9acfc2a34a,2025-01-20T03:54:22.063Z,Bengaluru,SUB-58473810,SIM-209336393196114,IMEI-755421536391283,Retail,Verified,National ID,False,2.3120293764884448,4,0,2998,False,9.121308528874035,False,0.3905862341569189,0.5085200580187177,False,False,SUB-58473810|SIM-209336393196114
5b8f83e5-7846-43ca-8455-f5f170eb1653,2025-01-25T20:29:36.177Z,Delhi,SUB-11583877,SIM-164841266635045,IMEI-744563508052454,Retail,Verified,Passport,False,0.6341398770354794,0,0,1929,False,7.375431973913924,False,0.8848914400092779,0.6408138339191507,False,False,SUB-11583877|SIM-164841266635045
cf16a279-41ce-4fac-96f1-69d120aa96a9,2025-01-25T17:46:52.513Z,Chennai,SUB-47049001,SIM-728009854024476,IMEI-810217141360674,Retail,Verified,National ID,False,4.332670869223251,2,1,1627,False,8.605111291867159,False,0.1013948266986782,0.9142832836031328,False,False,SUB-47049001|SIM-728009854024476
3d023d86-7ab9-4e0e-a5fe-c274aa275209,2025-02-21T00:21:32.652Z,Hyderabad,SUB-85629030,SIM-740783000925574,IMEI-264561209725957,Call Center,Pending,National ID,False,2.157068849057223,0,2,2511,False,1.5266793977746125,False,0.0535602542617155,0.7143126442134037,False,False,SUB-85629030|SIM-740783000925574



===== Showing: sim_detection.silver =====



event_id,event_time,city,subscriber_id,sim_serial,device_imei,activation_channel,kyc_status,identity_document_type,identity_mismatch_flag,location_variance_km,failed_verification_attempts,recent_sim_swaps_30d,account_age_days,high_risk_country_flag,topup_amount_last_24h,suspicious_transaction_flag,fraud_alert_score,travel_pattern_score,blacklist_match_flag,escalated_to_fraud_team,key
9cd424e7-9f76-47ff-bf4f-4792dfd67d4c,2025-01-21T22:46:39.470Z,Hyderabad,SUB-93885844,SIM-188262137418499,IMEI-357227697300543,Online,Verified,Passport,False,5.967494081879707,0,0,3467,False,115.79477149243756,False,0.1330686907865594,0.8902204621980692,False,False,SUB-93885844|SIM-188262137418499
b285765e-5077-45e6-8af8-55e5b32be38b,2025-01-06T07:55:33.425Z,Mumbai,SUB-47269321,SIM-068438383236590,IMEI-833253840448928,Call Center,Verified,National ID,False,4.993648879705308,0,1,2739,False,10.142157785654526,False,0.6730549345844213,0.0957333290250043,False,False,SUB-47269321|SIM-068438383236590
a5dbab30-dc17-4a5c-ac24-f2267a5361b7,2025-01-26T09:32:29.648Z,Chennai,SUB-73971245,SIM-193122386341361,IMEI-618539206262913,Online,Verified,Driver License,False,4.545890243414115,0,3,660,False,30.89977479177045,False,0.9931145247345728,0.1636600882120271,False,False,SUB-73971245|SIM-193122386341361
a0e93f3f-3921-4e18-b6e3-28cb36af5144,2025-01-24T01:16:20.027Z,Bengaluru,SUB-64155290,SIM-734943405568631,IMEI-180215207707254,Retail,Verified,Driver License,False,2.469830383601289,0,0,1846,False,24.9920899709159,False,0.9560431109149438,0.2017996092189877,False,False,SUB-64155290|SIM-734943405568631
720d6919-6af5-4e07-bf75-d862c6b2eac6,2025-01-07T00:15:33.454Z,Chennai,SUB-60088511,SIM-444228905547664,IMEI-319870895760135,Retail,Verified,National ID,False,3.6309313955822855,0,1,1813,False,18.222938334694685,False,0.8967713852364857,0.202094727497831,False,False,SUB-60088511|SIM-444228905547664
1fa2aab3-45f1-4c7a-a6dd-2fd3011b5a28,2025-02-13T07:55:48.775Z,Mumbai,SUB-75420201,SIM-900265736555835,IMEI-571869052152941,Retail,Verified,Driver License,False,5.793916416838977,0,0,499,False,3.164832009094706,False,0.0138620038059061,0.910094293357502,False,False,SUB-75420201|SIM-900265736555835
3a57ee0a-929c-4299-ad1e-77cbd2bd65f8,2025-02-12T06:59:58.245Z,Hyderabad,SUB-05605783,SIM-494510489305358,IMEI-226510349382861,Call Center,Verified,National ID,False,7.558378331167349,0,1,2697,False,4.1428859544711845,False,0.2472780991114355,0.3286724698394208,False,False,SUB-05605783|SIM-494510489305358
2307c7fa-4d33-4f4d-9cc4-af7fe0475a38,2025-01-12T20:09:45.131Z,Hyderabad,SUB-83602936,SIM-609956716396842,IMEI-038697894756325,Retail,Verified,Driver License,False,1.184860362902835,0,0,2025,False,7.460800905874767,False,0.3932168990637372,0.4902720184289985,False,False,SUB-83602936|SIM-609956716396842
5c22eb72-7efa-4b5e-b5c9-c1a9f814441a,2025-02-13T05:22:09.479Z,Mumbai,SUB-44052720,SIM-657504623254414,IMEI-832525879454645,Online,Verified,National ID,False,1.5423603277490257,2,2,2101,False,23.81182318450461,False,0.4631964709523835,0.7195789734409006,False,False,SUB-44052720|SIM-657504623254414
aa726502-2349-4c2a-9255-91a5cad7662b,2025-02-11T10:12:52.102Z,Mumbai,SUB-15401948,SIM-907398563679218,IMEI-341639481516543,Retail,Verified,National ID,False,2.784364536264868,0,1,1571,False,12.834176315308294,False,0.6563443818019027,0.4859352241865075,False,False,SUB-15401948|SIM-907398563679218



===== Showing: sim_detection.gold =====



event_id,event_time,city,subscriber_id,sim_serial,device_imei,activation_channel,kyc_status,identity_document_type,identity_mismatch_flag,location_variance_km,failed_verification_attempts,recent_sim_swaps_30d,account_age_days,high_risk_country_flag,topup_amount_last_24h,suspicious_transaction_flag,fraud_alert_score,travel_pattern_score,blacklist_match_flag,escalated_to_fraud_team,key,high_fraud_alert_flag,recent_sim_swap_flag,many_failed_verifications_flag
9cd424e7-9f76-47ff-bf4f-4792dfd67d4c,2025-01-21T22:46:39.470Z,Hyderabad,SUB-93885844,SIM-188262137418499,IMEI-357227697300543,Online,Verified,Passport,False,5.967494081879707,0,0,3467,False,115.79477149243756,False,0.1330686907865594,0.8902204621980692,False,False,SUB-93885844|SIM-188262137418499,0,0,0
b285765e-5077-45e6-8af8-55e5b32be38b,2025-01-06T07:55:33.425Z,Mumbai,SUB-47269321,SIM-068438383236590,IMEI-833253840448928,Call Center,Verified,National ID,False,4.993648879705308,0,1,2739,False,10.142157785654526,False,0.6730549345844213,0.0957333290250043,False,False,SUB-47269321|SIM-068438383236590,0,1,0
a5dbab30-dc17-4a5c-ac24-f2267a5361b7,2025-01-26T09:32:29.648Z,Chennai,SUB-73971245,SIM-193122386341361,IMEI-618539206262913,Online,Verified,Driver License,False,4.545890243414115,0,3,660,False,30.89977479177045,False,0.9931145247345728,0.1636600882120271,False,False,SUB-73971245|SIM-193122386341361,1,1,0
a0e93f3f-3921-4e18-b6e3-28cb36af5144,2025-01-24T01:16:20.027Z,Bengaluru,SUB-64155290,SIM-734943405568631,IMEI-180215207707254,Retail,Verified,Driver License,False,2.469830383601289,0,0,1846,False,24.9920899709159,False,0.9560431109149438,0.2017996092189877,False,False,SUB-64155290|SIM-734943405568631,1,0,0
720d6919-6af5-4e07-bf75-d862c6b2eac6,2025-01-07T00:15:33.454Z,Chennai,SUB-60088511,SIM-444228905547664,IMEI-319870895760135,Retail,Verified,National ID,False,3.6309313955822855,0,1,1813,False,18.222938334694685,False,0.8967713852364857,0.202094727497831,False,False,SUB-60088511|SIM-444228905547664,1,1,0
1fa2aab3-45f1-4c7a-a6dd-2fd3011b5a28,2025-02-13T07:55:48.775Z,Mumbai,SUB-75420201,SIM-900265736555835,IMEI-571869052152941,Retail,Verified,Driver License,False,5.793916416838977,0,0,499,False,3.164832009094706,False,0.0138620038059061,0.910094293357502,False,False,SUB-75420201|SIM-900265736555835,0,0,0
3a57ee0a-929c-4299-ad1e-77cbd2bd65f8,2025-02-12T06:59:58.245Z,Hyderabad,SUB-05605783,SIM-494510489305358,IMEI-226510349382861,Call Center,Verified,National ID,False,7.558378331167349,0,1,2697,False,4.1428859544711845,False,0.2472780991114355,0.3286724698394208,False,False,SUB-05605783|SIM-494510489305358,0,1,0
2307c7fa-4d33-4f4d-9cc4-af7fe0475a38,2025-01-12T20:09:45.131Z,Hyderabad,SUB-83602936,SIM-609956716396842,IMEI-038697894756325,Retail,Verified,Driver License,False,1.184860362902835,0,0,2025,False,7.460800905874767,False,0.3932168990637372,0.4902720184289985,False,False,SUB-83602936|SIM-609956716396842,0,0,0
5c22eb72-7efa-4b5e-b5c9-c1a9f814441a,2025-02-13T05:22:09.479Z,Mumbai,SUB-44052720,SIM-657504623254414,IMEI-832525879454645,Online,Verified,National ID,False,1.5423603277490257,2,2,2101,False,23.81182318450461,False,0.4631964709523835,0.7195789734409006,False,False,SUB-44052720|SIM-657504623254414,0,1,0
aa726502-2349-4c2a-9255-91a5cad7662b,2025-02-11T10:12:52.102Z,Mumbai,SUB-15401948,SIM-907398563679218,IMEI-341639481516543,Retail,Verified,National ID,False,2.784364536264868,0,1,1571,False,12.834176315308294,False,0.6563443818019027,0.4859352241865075,False,False,SUB-15401948|SIM-907398563679218,0,1,0



===== Showing: sim_detection.kpi_city_counts =====



city,total_events
Bengaluru,150630
Chennai,100454
Delhi,99843
Mumbai,99631
Hyderabad,49442



===== Showing: sim_detection.kpi_avg_fraud_by_city =====



city,avg_fraud_alert_score
Chennai,0.5008
Delhi,0.5002
Mumbai,0.4995
Bengaluru,0.5006
Hyderabad,0.4981



===== Showing: sim_detection.kpi_suspicious_summary =====



total_events,suspicious_count,suspicious_percentage
500000,25045,5.01



===== Showing: sim_detection.kpi_high_risk_summary =====



total_events,high_risk_count,high_risk_percentage
500000,30127,6.03
