In [1]:
import os
import sys
os.environ['JAVA_HOME'] = '/opt/homebrew/opt/openjdk@17'
os.environ['SPARK_DRIVER_MEMORY'] = '4g'

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import desc, asc, col
from pyspark.sql.functions import from_json, col, regexp_replace, explode, expr
from pyspark.sql.types import ArrayType, StringType

In [3]:

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("PatternAlarm-ETL") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/08 06:03:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# ETL Part

In [5]:
df_transactions = spark.read.json("../data/raw/transactions_samples.jsonl")

                                                                                

In [6]:
df_frauds_labels = spark.read.csv(
    "../data/raw/frauds_labels_samples.csv",
    header=True,
    inferSchema=True,
    escape='"',
    quote='"', 
    multiLine=True   
)

In [7]:
df_transactions.head()

25/11/08 06:03:16 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


Row(account_from=None, account_to=None, amount=149.84, billing_address=None, cart_items='[]', country_from=None, country_to=None, currency='EUR', customer_id='A100017', device_fingerprint=None, device_id='ZNHOU4FG7Z8A', domain='gaming', game_id='LOL2Electric', ip_address='45.142.124.124', is_fraud=True, item_name='Premium_weapon_720', item_type='weapon', pattern='fraud_account_takeover', payment_method='steam_wallet', player_id='A100017', purpose=None, session_duration_sec=None, session_length_sec=95, shipping_address=None, timestamp='2025-08-10T22:11:51.290468', timestamp_iso='2025-08-10T22:11:51.290468', transaction_id='txn_017491', transfer_type=None, user_id=None)

In [8]:
df_frauds_labels.head()

Row(alert_id=1, transaction_id='txn_017491', domain='gaming', actor_id='A100017', amount=149.84, transaction_count=1, timestamp=datetime.datetime(2025, 8, 10, 22, 11, 51, 290468), ip_address='45.142.124.124', pattern='fraud_account_takeover', fraud_label=True, fraud_type='fraud_account_takeover', confidence=0.91, label_source='rule_based', label_timestamp=datetime.datetime(2025, 8, 10, 22, 11, 51, 290468), related_transaction_ids='["txn_017491"]')

In [9]:
df_frauds_labels.select("related_transaction_ids").show(3, truncate=False)

+-----------------------+
|related_transaction_ids|
+-----------------------+
|["txn_017491"]         |
|["txn_017764"]         |
|["txn_017714"]         |
+-----------------------+
only showing top 3 rows


In [10]:
df_frauds_parsed = df_frauds_labels.withColumn(
    "related_ids_parsed",
    from_json(col("related_transaction_ids"), ArrayType(StringType()))
)


In [11]:
df_frauds_parsed.select("related_transaction_ids", "related_ids_parsed").show(3, truncate=False)

+-----------------------+------------------+
|related_transaction_ids|related_ids_parsed|
+-----------------------+------------------+
|["txn_017491"]         |[txn_017491]      |
|["txn_017764"]         |[txn_017764]      |
|["txn_017714"]         |[txn_017714]      |
+-----------------------+------------------+
only showing top 3 rows


In [12]:
df_frauds_exploded = df_frauds_parsed.withColumn(
    "related_txn_id",
    explode(col("related_ids_parsed"))
)

In [13]:
df_frauds_exploded.select("alert_id", "related_txn_id").show(5, truncate=False)

+--------+--------------+
|alert_id|related_txn_id|
+--------+--------------+
|1       |txn_017491    |
|2       |txn_017764    |
|3       |txn_017714    |
|4       |txn_017702    |
|5       |txn_118300    |
+--------+--------------+
only showing top 5 rows


In [14]:
df_frauds_renamed = df_frauds_exploded.select(
    col("alert_id"),
    col("transaction_id").alias("fraud_alert_txn_id"),  # Colonne pivot du fraud
    col("domain").alias("fraud_domain"),
    col("actor_id").alias("fraud_actor_id"),
    col("amount").alias("fraud_total_amount"),  # ‚úÖ Montant agr√©g√©
    col("transaction_count").alias("fraud_txn_count"),
    col("timestamp").alias("fraud_first_seen"),
    col("ip_address").alias("fraud_ip"),
    col("pattern").alias("fraud_pattern"),
    col("fraud_label"),
    col("fraud_type"),
    col("confidence"),
    col("label_source"),
    col("label_timestamp"),
    col("related_txn_id")  # ‚úÖ Cl√© de join
)

In [15]:
df_join = df_frauds_renamed.join(
    df_transactions,
    df_frauds_exploded.related_txn_id == df_transactions.transaction_id,
    "inner"
)

In [16]:
print(f"Frauds: {df_frauds_labels.count()}")
print(f"Frauds exploded: {df_frauds_exploded.count()}")
print(f"Transactions: {df_transactions.count()}")
print(f"Join result: {df_join.count()}")


Frauds: 194826


                                                                                

Frauds exploded: 200000


                                                                                

Transactions: 200000
Join result: 200000


                                                                                

In [18]:
df_join.write.mode("overwrite").partitionBy("domain").parquet("../data/processed/training_data.parquet")

                                                                                

In [19]:
# ML Part - Random Forest

In [20]:
df_training = spark.read.parquet("../data/processed/training_data.parquet")

In [21]:
df_training.groupBy("domain").count().show()
df_training.groupBy("fraud_pattern").count().show()

+---------+-----+
|   domain|count|
+---------+-----+
|ecommerce|65000|
|   gaming|70000|
|  fintech|65000|
+---------+-----+

+--------------------+-----+
|       fraud_pattern|count|
+--------------------+-----+
|fraud_friendly_fraud| 1725|
|regular_window_sh...|11849|
|  fraud_card_testing| 4054|
|     regular_shopper|44927|
|   fraud_promo_abuse| 2445|
|  fraud_gold_farming| 3596|
|fraud_chargeback_...| 1220|
|fraud_account_tak...| 4034|
|     regular_grinder|13109|
|regular_casual_pl...|32588|
|regular_whale_spe...|15453|
|       regular_saver|29914|
|fraud_synthetic_i...| 1450|
|  regular_bill_payer|28954|
|   fraud_structuring| 1949|
|fraud_money_laund...| 2733|
+--------------------+-----+



In [22]:
df_training.rdd.getNumPartitions()

9

In [23]:
df_training.printSchema()

root
 |-- alert_id: integer (nullable = true)
 |-- fraud_alert_txn_id: string (nullable = true)
 |-- fraud_domain: string (nullable = true)
 |-- fraud_actor_id: string (nullable = true)
 |-- fraud_total_amount: double (nullable = true)
 |-- fraud_txn_count: integer (nullable = true)
 |-- fraud_first_seen: timestamp (nullable = true)
 |-- fraud_ip: string (nullable = true)
 |-- fraud_pattern: string (nullable = true)
 |-- fraud_label: boolean (nullable = true)
 |-- fraud_type: string (nullable = true)
 |-- confidence: double (nullable = true)
 |-- label_source: string (nullable = true)
 |-- label_timestamp: timestamp (nullable = true)
 |-- related_txn_id: string (nullable = true)
 |-- account_from: string (nullable = true)
 |-- account_to: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- billing_address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- country: string (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- stre

In [24]:
from pyspark.sql.functions import (
    to_timestamp, col, when, coalesce, 
    hour, dayofweek, regexp_replace
)
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.types import ArrayType, StringType
import shutil
from pathlib import Path
from datetime import datetime
import shutil
from pathlib import Path
from datetime import datetime
import mlflow
import mlflow.spark

In [25]:
#1 - Load Data

df_training = spark.read.parquet("../data/processed/training_data.parquet")

print(f"‚úÖ Loaded {df_training.count():,} training samples")
df_training.groupBy("domain").count().show()


‚úÖ Loaded 200,000 training samples
+---------+-----+
|   domain|count|
+---------+-----+
|ecommerce|65000|
|   gaming|70000|
|  fintech|65000|
+---------+-----+



In [26]:
#3 : Feature Engineering

from pyspark.sql.functions import lit

# Constants
HIGH_RISK_COUNTRIES = ['KY', 'PA', 'BZ', 'VG']

# Fix timestamp type
df_fixed = df_training.withColumn(
    "timestamp",
    to_timestamp(col("timestamp"))
)

# Create all features
df_features = df_fixed \
    .withColumn("country_mismatch",
        when(
            col("country_from").isNotNull() & col("country_to").isNotNull(),
            (col("country_from") != col("country_to")).cast("int")
        ).otherwise(0)
    ) \
    .withColumn("hour_of_day", 
        hour(col("fraud_first_seen"))
    ) \
    .withColumn("day_of_week", 
        dayofweek(col("fraud_first_seen"))
    ) \
    .withColumn("is_weekend", 
        (dayofweek(col("fraud_first_seen")).isin([1, 7])).cast("int")
    ) \
    .withColumn("is_near_threshold", 
        ((col("amount") >= 9500) & (col("amount") < 10000)).cast("int")
    ) \
    .withColumn("involves_high_risk_country",
        when(
            col("country_to").isNotNull() | col("country_from").isNotNull(),
            (col("country_to").isin(HIGH_RISK_COUNTRIES) | 
             col("country_from").isin(HIGH_RISK_COUNTRIES)).cast("int")
        ).otherwise(0)
    ) \
    .withColumn("is_rapid_session",
        when(
            col("session_length_sec").isNotNull(),
            (col("session_length_sec") < 120).cast("int")
        ).otherwise(0)
    ) \
    .withColumn("amount_per_txn",
        col("amount") / (col("fraud_txn_count") + 1)
    ) \
    .withColumn("session_efficiency",
        col("amount") / (col("session_length_sec") + 1)
    ) \
    .withColumn("night_rapid_combo",
        (((col("hour_of_day") < 6) | (col("hour_of_day") > 22)) & 
         (col("is_rapid_session") == 1)).cast("int")
    ) \
    .withColumn("fraud_pattern_simplified",  # ‚úÖ NEW - Merge all regulars!
        when(col("fraud_pattern").startswith("regular_"), lit("regular"))
        .otherwise(col("fraud_pattern"))
    )

# Fill ALL NULL values
df_clean = df_features.na.fill({
    "session_length_sec": 0,
    "payment_method": "unknown",
    "hour_of_day": 12,
    "day_of_week": 3,
    "is_weekend": 0,
    "is_near_threshold": 0,
    "involves_high_risk_country": 0,
    "is_rapid_session": 0,
    "amount_per_txn": 0,
    "session_efficiency": 0,
    "night_rapid_combo": 0
})

print("‚úÖ Feature engineering complete (15 features)")
print(f"‚úÖ Classes: {df_clean.select('fraud_pattern_simplified').distinct().count()} (1 regular + 7 fraud types)")

‚úÖ Feature engineering complete (15 features)
‚úÖ Classes: 10 (1 regular + 7 fraud types)


In [27]:
#4: Compute class weights

# Compute class weights
print("\nüìä Computing class weights...")

class_counts = df_clean.groupBy("fraud_pattern").count().collect()
total = sum([row['count'] for row in class_counts])

# Create weight map
weight_map = {row['fraud_pattern']: total / (16 * row['count']) 
              for row in class_counts}

print("Class weights:")
for pattern, weight in sorted(weight_map.items(), key=lambda x: x[1], reverse=True)[:5]:
    print(f"  {pattern}: {weight:.2f}")

# Add weight column
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

@udf(returnType=DoubleType())
def get_weight(pattern):
    return weight_map.get(pattern, 1.0)

df_weighted = df_clean.withColumn("class_weight", get_weight(col("fraud_pattern")))

print("‚úÖ Class weights applied")


üìä Computing class weights...
Class weights:
  fraud_chargeback_fraud: 10.25
  fraud_synthetic_identity: 8.62
  fraud_friendly_fraud: 7.25
  fraud_structuring: 6.41
  fraud_promo_abuse: 5.11
‚úÖ Class weights applied


In [28]:
#5 : Pipeline Setup

# Feature columns (15 features)
feature_cols = [
    "amount",
    "fraud_txn_count", 
    "session_length_sec",
    "country_mismatch",
    "hour_of_day",
    "day_of_week",
    "is_weekend",
    "is_near_threshold",
    "involves_high_risk_country",
    "is_rapid_session",
    "amount_per_txn",       
    "session_efficiency",    
    "night_rapid_combo",  
    "payment_idx",
    "domain_idx"
]

# Pipeline
pipeline = Pipeline(stages=[
    StringIndexer(inputCol="payment_method", outputCol="payment_idx"),
    StringIndexer(inputCol="domain", outputCol="domain_idx"),
    StringIndexer(inputCol="fraud_pattern_simplified", outputCol="label_idx"),  # ‚úÖ Use simplified!
    VectorAssembler(inputCols=feature_cols, outputCol="features"),
    RandomForestClassifier(
        featuresCol="features",
        labelCol="label_idx",
        numTrees=150,
        maxDepth=16,
        minInstancesPerNode=3,
        maxBins=64,
        # ‚ùå NO weightCol
        seed=42
    )
])

print(f"‚úÖ Pipeline configured (15 features, 8 classes)")

‚úÖ Pipeline configured (15 features, 8 classes)


In [29]:
#5 : Train/Test Split & Training

# Split data (use df_weighted now)
train, test = df_clean.randomSplit([0.8, 0.2], seed=42)

print(f"Train: {train.count():,} samples")
print(f"Test: {test.count():,} samples")

# Show class distribution
print("\nüìä Class Distribution:")
train.groupBy("fraud_pattern_simplified").count().orderBy(col("count").desc()).show()

# Train model
print("\nüöÄ Training RandomForest (8 classes: 1 regular + 7 fraud types)...")
model = pipeline.fit(train)
print("‚úÖ Training complete!")

                                                                                

Train: 160,136 samples
Test: 39,864 samples

üìä Class Distribution:
+------------------------+------+
|fraud_pattern_simplified| count|
+------------------------+------+
|                 regular|141456|
|    fraud_account_tak...|  3280|
|      fraud_card_testing|  3259|
|      fraud_gold_farming|  2909|
|    fraud_money_laund...|  2182|
|       fraud_promo_abuse|  1953|
|       fraud_structuring|  1586|
|    fraud_friendly_fraud|  1377|
|    fraud_synthetic_i...|  1186|
|    fraud_chargeback_...|   948|
+------------------------+------+


üöÄ Training RandomForest (8 classes: 1 regular + 7 fraud types)...


25/11/08 06:04:10 WARN DAGScheduler: Broadcasting large task binary with size 1496.2 KiB
25/11/08 06:04:11 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
25/11/08 06:04:12 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
25/11/08 06:04:14 WARN DAGScheduler: Broadcasting large task binary with size 4.2 MiB
25/11/08 06:04:16 WARN DAGScheduler: Broadcasting large task binary with size 5.7 MiB
25/11/08 06:04:17 WARN DAGScheduler: Broadcasting large task binary with size 7.5 MiB
25/11/08 06:04:19 WARN DAGScheduler: Broadcasting large task binary with size 1009.7 KiB
25/11/08 06:04:19 WARN DAGScheduler: Broadcasting large task binary with size 9.9 MiB
25/11/08 06:04:21 WARN DAGScheduler: Broadcasting large task binary with size 1287.8 KiB
25/11/08 06:04:22 WARN DAGScheduler: Broadcasting large task binary with size 12.8 MiB
25/11/08 06:04:24 WARN DAGScheduler: Broadcasting large task binary with size 1597.5 KiB
25/11/08 06:04:24 WARN DAGScheduler: Broa

‚úÖ Training complete!


In [30]:
#6 : Predictions & Evaluation

# Generate predictions
predictions = model.transform(test)

# Show sample predictions
print("\nüìä Sample Predictions:")
predictions.select(
    "fraud_pattern", 
    "label_idx", 
    "prediction", 
    "probability"
).show(10, truncate=False)

# Accuracy
evaluator_acc = MulticlassClassificationEvaluator(
    labelCol="label_idx",
    predictionCol="prediction",
    metricName="accuracy"
)
accuracy = evaluator_acc.evaluate(predictions)

# F1 Score
evaluator_f1 = MulticlassClassificationEvaluator(
    labelCol="label_idx",
    predictionCol="prediction",
    metricName="f1"
)
f1 = evaluator_f1.evaluate(predictions)

print(f"\nüéØ Accuracy: {accuracy:.2%}")
print(f"üéØ F1 Score: {f1:.2%}")


üìä Sample Predictions:


25/11/08 06:04:56 WARN DAGScheduler: Broadcasting large task binary with size 8.3 MiB


+----------------------+---------+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|fraud_pattern         |label_idx|prediction|probability                                                                                                                                                                       |
+----------------------+---------+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|regular_shopper       |0.0      |0.0       |[0.9583338711940685,0.0,0.008638383340530307,0.0,0.0,0.02526986364419497,0.0,0.007738464373728143,1.9417447477910585E-5,0.0]                                                      |
|regular_shopper       |0.0      |0.0       |[0.9701939594729959,0.0,0.0029080417493700596,0.0,0.0,0

25/11/08 06:04:57 WARN DAGScheduler: Broadcasting large task binary with size 8.3 MiB
25/11/08 06:04:58 WARN DAGScheduler: Broadcasting large task binary with size 8.3 MiB


üéØ Accuracy: 97.50%
üéØ F1 Score: 97.27%


                                                                                

In [31]:
#7 Confusion Matrix & Error Analysis

# 7Ô∏è‚É£ Confusion Matrix & Error Analysis

from pyspark.mllib.evaluation import MulticlassMetrics  # ‚úÖ mllib, pas ml
import pandas as pd

# 1Ô∏è‚É£ Confusion Matrix
print("\nüìä Confusion Matrix Analysis:")

# Prepare data for MulticlassMetrics
predictionAndLabels = predictions.select("prediction", "label_idx").rdd.map(
    lambda row: (float(row.prediction), float(row.label_idx))
)

metrics = MulticlassMetrics(predictionAndLabels)

# Get label names
label_indexer = model.stages[2]  # StringIndexer for fraud_pattern
labels = label_indexer.labels

print("\nüîç Confusion Matrix:")
confusion_matrix = metrics.confusionMatrix().toArray()

# Print with labels (abbreviated)
print(f"\n{'':5s}", end="")
for i in range(len(labels)):
    print(f"{i:5d}", end="")
print()

for i in range(len(labels)):
    print(f"{i:3d} | ", end="")
    for j in range(len(labels)):
        print(f"{int(confusion_matrix[i][j]):5d}", end="")
    print(f"  | {labels[i][:30]}")

# 2Ô∏è‚É£ Per-class metrics
print("\nüìà Per-Class Performance:")
print(f"{'Class':30s} | {'Precision':>10s} | {'Recall':>10s} | {'F1':>10s}")
print("-" * 70)

for i, label in enumerate(labels):
    precision = metrics.precision(float(i))
    recall = metrics.recall(float(i))
    f1 = metrics.fMeasure(float(i))
    print(f"{label:30s} | {precision:10.2%} | {recall:10.2%} | {f1:10.2%}")

# 3Ô∏è‚É£ Worst performers
print("\n‚ùå Worst Performing Classes (F1 < 70%):")
worst = [(label, metrics.fMeasure(float(i))) 
         for i, label in enumerate(labels) 
         if metrics.fMeasure(float(i)) < 0.70]

for label, f1 in sorted(worst, key=lambda x: x[1]):
    print(f"  {label:30s}: {f1:.2%}")


üìä Confusion Matrix Analysis:


25/11/08 06:05:04 WARN DAGScheduler: Broadcasting large task binary with size 8.3 MiB
25/11/08 06:05:05 WARN DAGScheduler: Broadcasting large task binary with size 8.3 MiB



üîç Confusion Matrix:





         0    1    2    3    4    5    6    7    8    9
  0 | 35091    0  232    0    0    7    0    8    0    0  | regular
  1 |     0  752    0    0    0    0    0    0    0    2  | fraud_account_takeover
  2 |   179    0  615    0    0    1    0    0    0    0  | fraud_card_testing
  3 |     0    0    0  687    0    0    0    0    0    0  | fraud_gold_farming
  4 |     0    0    0    0  533    0    0    0   18    0  | fraud_money_laundering
  5 |   250    0   26    0    0  146    0   70    0    0  | fraud_promo_abuse
  6 |     0    0    0    0    0    0  363    0    0    0  | fraud_structuring
  7 |   125    0    1    0    0   48    0  174    0    0  | fraud_friendly_fraud
  8 |     0    0    0    0    8    0    4    0  252    0  | fraud_synthetic_identity
  9 |     0   17    0    0    0    0    0    0    0  255  | fraud_chargeback_fraud

üìà Per-Class Performance:
Class                          |  Precision |     Recall |         F1
-----------------------------------------------

                                                                                

In [34]:
# ============================================================================
# Save Model (Spark + MLflow)
# ============================================================================

import shutil
import mlflow
import mlflow.spark
from pathlib import Path
from datetime import datetime

# 1Ô∏è‚É£ Save Spark model locally
model_path = Path("../data/models/fraud_detector_v1")

if model_path.exists():
    shutil.rmtree(model_path)
    print(f"üóëÔ∏è  Deleted old model")

model.write().overwrite().save(str(model_path))
print(f"‚úÖ Spark model saved to {model_path}")

# 2Ô∏è‚É£ Log to MLflow (same parent folder)
mlflow_path = Path("../data/models/mlflow_tracking")
mlflow_path.mkdir(parents=True, exist_ok=True)

mlflow.set_tracking_uri(f"file:{mlflow_path.absolute()}")
mlflow.set_experiment("fraud_detection")

with mlflow.start_run() as run:
    mlflow.spark.log_model(model, "model")
    mlflow.log_metric("accuracy", 0.975)
    mlflow.log_metric("f1_score", 0.973)
    mlflow.log_param("num_trees", 150)
    mlflow.log_param("max_depth", 16)
    
    run_id = run.info.run_id
    model_uri = f"runs:/{run_id}/model"
    
    # Save URI for FastAPI
    (model_path.parent / "mlflow_model_uri.txt").write_text(model_uri)
    
    print(f"‚úÖ MLflow model logged")
    print(f"   Run ID: {run_id}")
    print(f"   URI: {model_uri}")

# 3Ô∏è‚É£ Save README
(model_path / "README.md").write_text(f"""# Fraud Detection Model v1
- **Accuracy: 97.50%**
- **F1 Score: 97.27%**
- **Classes: 8** (1 regular + 7 fraud types)
- **Training Date:** {datetime.now().strftime('%Y-%m-%d')}
""")

print("‚úÖ Done!")

üóëÔ∏è  Deleted old model


  return FileStore(store_uri, store_uri)
2025/11/08 06:27:38 INFO mlflow.tracking.fluent: Experiment with name 'fraud_detection' does not exist. Creating a new experiment.


‚úÖ Spark model saved to ../data/models/fraud_detector_v1
‚úÖ MLflow model logged
   Run ID: 0a8ee170f87a4350b4d03d21270f99b4
   URI: runs:/0a8ee170f87a4350b4d03d21270f99b4/model
‚úÖ Done!
