# 2. Pipeline Local - Exécution Interactive

Ce notebook exécute le pipeline complet de détection de fraude en mode local, cellule par cellule, pour permettre l'inspection intermédiaire des résultats.

In [1]:
import time
import os
import sys
import json
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.sql.functions import col, count

## [1/6] Initialisation de la Session Spark

In [2]:
pipeline_start_time = time.time()

spark = SparkSession.builder \
    .appName("FraudDetectionPipeline-local") \
    .master("local[*]") \
    .config("spark.driver.memory", "4g") \
    .config("spark.driver.extraJavaOptions", "--add-opens=java.base/java.lang=ALL-UNNAMED") \
    .config("spark.executor.extraJavaOptions", "--add-opens=java.base/java.lang=ALL-UNNAMED") \
    .getOrCreate()

sc = spark.sparkContext
print(f"✓ Session Spark initialisée (Version: {sc.version}, Master: {sc.master})")

25/11/06 18:32:32 WARN Utils: Your hostname, laureal-Inspiron-7306-2n1 resolves to a loopback address: 127.0.1.1; using 192.168.1.22 instead (on interface wlp0s20f3)
25/11/06 18:32:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/06 18:32:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/11/06 18:32:33 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


✓ Session Spark initialisée (Version: 3.4.1, Master: local[*])


## [2/6] Chargement et Préparation des Données

In [3]:
data_path = "../data/raw/creditcard.csv"

initial_df = spark.read.csv(data_path, header=True, inferSchema=True)

print(f"Dataset chargé: {initial_df.count()} lignes, {len(initial_df.columns)} colonnes")

# Division train/test (80/20)
train_df, test_df = initial_df.randomSplit([0.8, 0.2], seed=42)

print(f"Train: {train_df.count()} lignes")
print(f"Test: {test_df.count()} lignes")

                                                                                

Dataset chargé: 284807 lignes, 31 colonnes


25/11/06 18:32:55 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

Train: 228045 lignes




Test: 56762 lignes


                                                                                

### Exploration des Données d'Entraînement

In [4]:
train_df.printSchema()

class_dist = train_df.groupBy("Class").agg(count("*").alias("count"))
print("\nDistribution de la cible en entraînement :")
class_dist.show()

root
 |-- Time: double (nullable = true)
 |-- V1: double (nullable = true)
 |-- V2: double (nullable = true)
 |-- V3: double (nullable = true)
 |-- V4: double (nullable = true)
 |-- V5: double (nullable = true)
 |-- V6: double (nullable = true)
 |-- V7: double (nullable = true)
 |-- V8: double (nullable = true)
 |-- V9: double (nullable = true)
 |-- V10: double (nullable = true)
 |-- V11: double (nullable = true)
 |-- V12: double (nullable = true)
 |-- V13: double (nullable = true)
 |-- V14: double (nullable = true)
 |-- V15: double (nullable = true)
 |-- V16: double (nullable = true)
 |-- V17: double (nullable = true)
 |-- V18: double (nullable = true)
 |-- V19: double (nullable = true)
 |-- V20: double (nullable = true)
 |-- V21: double (nullable = true)
 |-- V22: double (nullable = true)
 |-- V23: double (nullable = true)
 |-- V24: double (nullable = true)
 |-- V25: double (nullable = true)
 |-- V26: double (nullable = true)
 |-- V27: double (nullable = true)
 |-- V28: double (nulla



+-----+------+
|Class| count|
+-----+------+
|    1|   400|
|    0|227645|
+-----+------+



                                                                                

## [3/6] Entraînement du Modèle

In [5]:
# Récupérer tous les noms de colonnes sauf 'Class' (la cible)
feature_cols = [col for col in train_df.columns if col != 'Class']

print(f"Colonnes de features : {feature_cols}")

# Créer un vecteur de features
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Normaliser les features
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")

# Créer le modèle RandomForest
rf = RandomForestClassifier(
    featuresCol="scaled_features",
    labelCol="Class",
    numTrees=10,
    maxDepth=10,
    seed=42
)

# Créer le pipeline
pipeline = Pipeline(stages=[assembler, scaler, rf])

print("Pipeline créé. Lancement de l'entraînement...")

training_start_time = time.time()
pipeline_model = pipeline.fit(train_df)
training_end_time = time.time()
training_duration = training_end_time - training_start_time

print(f"✓ Modèle entraîné en {training_duration:.2f} secondes.")

Colonnes de features : ['Time', 'V1', 'V2', 'V3', 'V4', 'V5', 'V6', 'V7', 'V8', 'V9', 'V10', 'V11', 'V12', 'V13', 'V14', 'V15', 'V16', 'V17', 'V18', 'V19', 'V20', 'V21', 'V22', 'V23', 'V24', 'V25', 'V26', 'V27', 'V28', 'Amount']
Pipeline créé. Lancement de l'entraînement...


                                                                                

✓ Modèle entraîné en 13.10 secondes.


## [4/6] Évaluation des Performances

In [6]:
# Faire des prédictions sur le jeu de test
predictions_df = pipeline_model.transform(test_df)

print("Prédictions effectuées.")
predictions_df.select("Class", "prediction", "probability").show(5)

Prédictions effectuées.
+-----+----------+--------------------+
|Class|prediction|         probability|
+-----+----------+--------------------+
|    0|       0.0|[0.99994613603415...|
|    0|       0.0|[0.99993223754481...|
|    0|       0.0|[0.99986023838505...|
|    0|       0.0|[0.99964341814314...|
|    0|       0.0|[0.99994840470464...|
+-----+----------+--------------------+
only showing top 5 rows



### Calcul des Métriques

In [7]:
# Évaluateur binaire (pour AUC-PR et AUC-ROC)
binary_evaluator = BinaryClassificationEvaluator(labelCol="Class", rawPredictionCol="rawPrediction")
auc_roc = binary_evaluator.setMetricName("areaUnderROC").evaluate(predictions_df)
auc_pr = binary_evaluator.setMetricName("areaUnderPR").evaluate(predictions_df)

# Évaluateur multi-classe (pour accuracy, precision, recall, f1)
multi_evaluator = MulticlassClassificationEvaluator(labelCol="Class", predictionCol="prediction")
accuracy = multi_evaluator.setMetricName("accuracy").evaluate(predictions_df)
precision = multi_evaluator.setMetricName("weightedPrecision").evaluate(predictions_df)
recall = multi_evaluator.setMetricName("weightedRecall").evaluate(predictions_df)
f1 = multi_evaluator.setMetricName("f1").evaluate(predictions_df)

print(f"\n=== MÉTRIQUES DE PERFORMANCE ===")
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1-Score: {f1:.4f}")
print(f"AUC-ROC: {auc_roc:.4f}")
print(f"AUC-PR: {auc_pr:.4f}")




=== MÉTRIQUES DE PERFORMANCE ===
Accuracy: 0.9995
Precision: 0.9995
Recall: 0.9995
F1-Score: 0.9995
AUC-ROC: 0.9769
AUC-PR: 0.8384


                                                                                

## [5/6] Sauvegarde des Résultats

In [8]:
# Créer le dictionnaire de métriques
metrics = {
    'accuracy': accuracy,
    'precision': precision,
    'recall': recall,
    'f1_score': f1,
    'areaUnderROC': auc_roc,
    'areaUnderPR': auc_pr,
    'total_execution_time_seconds': round(time.time() - pipeline_start_time, 2),
    'training_time_seconds': round(training_duration, 2)
}

# Créer les dossiers si nécessaire
os.makedirs("../results", exist_ok=True)
os.makedirs("../models", exist_ok=True)

# Sauvegarder les métriques en JSON
metrics_path = "../results/metrics_local_notebook.json"
with open(metrics_path, 'w') as f:
    json.dump(metrics, f, indent=4)

print(f"✓ Métriques sauvegardées dans : {metrics_path}")

# Sauvegarder le modèle
model_path = "../models/fraud_detection_model_local_notebook.spark"
pipeline_model.write().overwrite().save(model_path)
print(f"✓ Modèle sauvegardé dans : {model_path}")

✓ Métriques sauvegardées dans : ../results/metrics_local_notebook.json
✓ Modèle sauvegardé dans : ../models/fraud_detection_model_local_notebook.spark


## [6/6] Arrêt de la Session Spark

In [9]:
spark.stop()
print("\n=== PIPELINE TERMINÉ AVEC SUCCÈS ===")
print(f"Temps total d'exécution : {metrics['total_execution_time_seconds']:.2f} secondes")


=== PIPELINE TERMINÉ AVEC SUCCÈS ===
Temps total d'exécution : 79.25 secondes
