In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, isnan, lit
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

In [2]:
spark = SparkSession.builder \
    .appName("Bank Fraud Detection") \
    .getOrCreate()

In [3]:
df = spark.read.csv("hdfs://namenode:9000/user/fraude/input/frauddetectionsmall.csv", header=True, inferSchema=True)

In [4]:
df.printSchema()

root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)



In [5]:
display(df)

DataFrame[step: int, type: string, amount: double, nameOrig: string, oldbalanceOrg: double, newbalanceOrig: double, nameDest: string, oldbalanceDest: double, newbalanceDest: double, isFraud: int, isFlaggedFraud: int]

In [6]:
# 2) Combien de lignes / doublons ?
total    = df.count()
distinct = df.distinct().count()
print(f"Lignes totales      : {total}")
print(f"Lignes distinctes   : {distinct}")
print(f"Doublons supprimés : {total - distinct}")

# 3) Null / NaN par colonne
missing = df.select([
    count(when(col(c).isNull() | isnan(c), c)).alias(c)
    for c in df.columns
])
missing.show()

# 4) Répartition de la cible
df.groupBy("isFraud").count().orderBy("isFraud").show()

Lignes totales      : 10200
Lignes distinctes   : 10200
Doublons supprimés : 0
+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+
|step|type|amount|nameOrig|oldbalanceOrg|newbalanceOrig|nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+
|   0|   0|     0|       0|            0|             0|       0|             0|             0|      0|             0|
+----+----+------+--------+-------------+--------------+--------+--------------+--------------+-------+--------------+

+-------+-----+
|isFraud|count|
+-------+-----+
|      0|10132|
|      1|   68|
+-------+-----+



In [7]:
df = df.drop("nameOrig", "nameDest", "isFlaggedFraud","oldbalanceDest","newbalanceDest")

In [8]:
numeric_cols = ["step","amount","oldbalanceOrg","newbalanceOrig"]

indexer = StringIndexer(inputCol="type",    outputCol="typeIdx", handleInvalid="keep")
encoder = OneHotEncoder(inputCols=["typeIdx"], outputCols=["typeVec"], dropLast=True)
assembler =VectorAssembler(inputCols=["typeVec"] + numeric_cols, outputCol="features")

pipeline = Pipeline(stages=[indexer, encoder, assembler])
prep_model = pipeline.fit(df)
df_prepared = prep_model.transform(df)

In [9]:
n0 = df_prepared.filter("isFraud=0").count()
n1 = df_prepared.filter("isFraud=1").count()
ratio = n0 / n1

# 5) On l’ajoute au DataFrame
df_prepared = df_prepared.withColumn(
    "classWeight",
    when(col("isFraud") == 1, lit(ratio)).otherwise(lit(1.0))
)

In [10]:
train, test = df_prepared.randomSplit([0.8,0.2], seed=42)

In [11]:
train.show()

+----+-------+--------+-------------+--------------+-------+-------+-------------+--------------------+-----------+
|step|   type|  amount|oldbalanceOrg|newbalanceOrig|isFraud|typeIdx|      typeVec|            features|classWeight|
+----+-------+--------+-------------+--------------+-------+-------+-------------+--------------------+-----------+
|   1|CASH_IN|  270.78|   4184966.65|    4185237.43|      0|    1.0|(5,[1],[1.0])|[0.0,1.0,0.0,0.0,...|        1.0|
|   1|CASH_IN|  484.57|   5422437.76|    5422922.33|      0|    1.0|(5,[1],[1.0])|[0.0,1.0,0.0,0.0,...|        1.0|
|   1|CASH_IN|  863.08|   9290756.54|    9291619.62|      0|    1.0|(5,[1],[1.0])|[0.0,1.0,0.0,0.0,...|        1.0|
|   1|CASH_IN|  911.76|   1335635.48|    1336547.24|      0|    1.0|(5,[1],[1.0])|[0.0,1.0,0.0,0.0,...|        1.0|
|   1|CASH_IN| 1076.27|   3538789.28|    3539865.55|      0|    1.0|(5,[1],[1.0])|[0.0,1.0,0.0,0.0,...|        1.0|
|   1|CASH_IN| 2099.59|   7096554.61|     7098654.2|      0|    1.0|(5,[

In [12]:
lr = LogisticRegression(featuresCol="features", labelCol="isFraud", weightCol="classWeight")
lr_model = lr.fit(train)

In [13]:
dt = DecisionTreeClassifier(featuresCol="features", labelCol="isFraud", weightCol="classWeight")
dt_model = dt.fit(train)

In [14]:
rf = RandomForestClassifier(featuresCol="features", labelCol="isFraud", weightCol="classWeight", numTrees=50, maxDepth=10)
rf_model = rf.fit(train)

In [15]:
models = {"Logistic Regression": lr_model, "Decision Tree": dt_model, "Random Forest": rf_model}
evaluator_roc = BinaryClassificationEvaluator(
    labelCol="isFraud", rawPredictionCol="rawPrediction", metricName="areaUnderROC")

evaluator_pr = BinaryClassificationEvaluator(
    labelCol="isFraud", rawPredictionCol="rawPrediction", metricName="areaUnderPR")

for name, model in models.items():
    pred = model.transform(test)
    acc = MulticlassClassificationEvaluator(labelCol="isFraud", predictionCol="prediction", metricName="accuracy").evaluate(pred)
    auc_roc = BinaryClassificationEvaluator(labelCol="isFraud", rawPredictionCol="rawPrediction", metricName="areaUnderROC").evaluate(pred)
    auc_pr = evaluator_pr.evaluate(pred)
    print(f"📌 {name}")
    print(f"✅ {name} - Accuracy: {acc:.4f}")
    print(f"   ✔️ areaUnderROC : {auc_roc:.4f}")
    print(f"   ✔️ areaUnderPR  : {auc_pr:.4f}\n")

📌 Logistic Regression
✅ Logistic Regression - Accuracy: 0.9103
   ✔️ areaUnderROC : 0.9925
   ✔️ areaUnderPR  : 0.5628

📌 Decision Tree
✅ Decision Tree - Accuracy: 0.9450
   ✔️ areaUnderROC : 0.9774
   ✔️ areaUnderPR  : 0.1125

📌 Random Forest
✅ Random Forest - Accuracy: 0.9745
   ✔️ areaUnderROC : 0.9910
   ✔️ areaUnderPR  : 0.6137



In [16]:
# Matrice de confusion
pred_rf = rf_model.transform(test)
rdd = pred_rf.select("prediction", "isFraud").rdd.map(lambda r: (float(r.prediction), float(r.isFraud)))
metrics = MulticlassMetrics(rdd)
print("Confusion Matrix:\n", metrics.confusionMatrix().toArray())
print(f"Precision (1): {metrics.precision(1.0):.4f}")
print(f"Recall    (1): {metrics.recall(1.0):.4f}")



Confusion Matrix:
 [[1904.   48.]
 [   2.    9.]]
Precision (1): 0.1579
Recall    (1): 0.8182


In [17]:
# Exemple avec Random Forest (ou remplace par ton modèle préféré)
predictions = rf_model.transform(test)

# Affichage des colonnes utiles
predictions.select("features", "prediction", "probability", "isFraud").show(30, truncate=False)
predictions.filter("isFraud = 1").select("features", "prediction", "probability", "isFraud").show(30, truncate=False)

+--------------------------------------------------------+----------+---------------------------------------+-------+
|features                                                |prediction|probability                            |isFraud|
+--------------------------------------------------------+----------+---------------------------------------+-------+
|[0.0,1.0,0.0,0.0,0.0,1.0,783.31,8150331.93,8151115.24]  |0.0       |[1.0,0.0]                              |0      |
|[0.0,1.0,0.0,0.0,0.0,1.0,1271.77,6973823.5,6975095.27]  |0.0       |[1.0,0.0]                              |0      |
|[0.0,1.0,0.0,0.0,0.0,1.0,2643.45,6434890.26,6437533.71] |0.0       |[1.0,0.0]                              |0      |
|[0.0,1.0,0.0,0.0,0.0,1.0,6284.18,7858787.73,7865071.9]  |0.0       |[1.0,0.0]                              |0      |
|[0.0,1.0,0.0,0.0,0.0,1.0,8679.13,7087875.47,7096554.61] |0.0       |[1.0,0.0]                              |0      |
|[0.0,1.0,0.0,0.0,0.0,1.0,9577.45,519812.39,529389.85]  

In [21]:
# Sauvegarde correcte dans HDFS avec nom de service valide
rf_model.save("hdfs://namenode:9000/user/jovyan/models/RF_fraud_model")