#**Projet**: **Big** **Data** **Analytics**.

---



---


                     Detéction de Faude dans les transactions Financières.
---

---

**Réalisé par:**

*   Fadila Alaoui.(IIN)
*   Safouan Garaaouch.(ICSD)









---





*   **Contexte du Projet:**

Le secteur financier est confronté à un défi croissant en matière de fraude dans les transactions. Les fraudes financières peuvent avoir un impact significatif sur les institutions financières ainsi que sur les clients. Dans ce contexte, le projet de Big Data Analytics vise à développer un système robuste de détection de fraude basé sur l'analyse avancée de données provenant de transactions financières en utlisant Pysaprk et MLlib.


---






---


*   **Descreption** **du jeu** **de** **donnée:**


> **Objectif**:

 le jeu de données utlisé dans notre projet est une collection synthétique de transactions financières,créée dans le but de détecter la fraude. générés à l'aide de de l'IA générative pour produire des données réalistes mais non réelles qui vont être utilisées pour former des modèles d'apprentissage automatique sans les préoccupations de confidentialité et de sécurité liées à l'utilisation de données clients réelles.


> **Dictionnaire du jeu de données:**

Le jeu de données contient des informations sur des transactions financières avec les colonnes suivantes :

**Transaction_ID :** Un identifiant unique pour chaque transaction.

**Date :** La date et l'heure auxquelles la transaction a eu lieu.

**Montant :** Le montant d'argent impliqué dans la transaction.

**Commerçant :** Le type de commerçant où la transaction a eu lieu (par exemple, Restaurant, Supermarché).

**Emplacement :** La localisation géographique de la transaction (par exemple, Phoenix, New York).

**User_ID :** Un identifiant pour l'utilisateur qui a effectué la transaction.

**Frauduleux :** Un indicateur signalant si la transaction était frauduleuse (0 pour non, 1 pour oui).

**Type_Transaction :** La nature de la transaction (par exemple, Dépôt, Paiement, Retrait).

**Mode_Saisie :** Comment la transaction a été saisie (par exemple, Puce, En ligne, Saisie manuelle).

Age_Compte : L'âge du compte en années au moment de la transaction.

**Transactions_Précédentes :** Le nombre de transactions précédentes que l'utilisateur avait effectuées.


---









> Installation de Pyspark dans notre environnement PySpark qui fournit une API Python conviviale pour interagir avec les fonctionnalités pour traiter les données. En utlisant Pip


---



In [1]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=6488abf21dfeee4c0a0959f1deb10e4fd6a0075765244b6e423727183e29aba3
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [None]:
#Importation des bibliothèques nécessaires:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.stat import Statistics
from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql.types import DoubleType, FloatType
from pyspark.sql.functions import col, to_date, date_format
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import random
from pyspark.sql.functions import year, month, dayofmonth, dayofweek, hour
from pyspark.ml.feature import StandardScaler


In [None]:
# Initialize a Spark session
spark = SparkSession.builder.appName("Fraud Detection").getOrCreate()

In [None]:
# Load the data
df = spark.read.csv('/content/corrected_enriched_financial_transactions_30k.csv', header=True, inferSchema=True)



# 1.   Analyse Exploratoire des données:

---




In [None]:
#Afficher les cinq premières lignes du DataFrame.
df.show(5)

+--------------+--------------------+------------------+-----------+--------+-------+----------+----------------+------------+------------------+---------------------+
|Transaction_ID|                Date|            Amount|   Merchant|Location|User_ID|Fraudulent|Transaction_Type|  Entry_Mode|       Account_Age|Previous_Transactions|
+--------------+--------------------+------------------+-----------+--------+-------+----------+----------------+------------+------------------+---------------------+
|           447|2023-11-27 15:00:...|1029.1933632539153| Restaurant| Phoenix|   1788|         0|         Deposit|        Chip|3.5163073310123765|   16.933340293078178|
|           683|2023-09-28 15:00:...| 228.9851910465608|Supermarket|New York|   1862|         0|         Payment|        Chip| 5.572630448789214|    9.596356135771336|
|           804|2023-08-16 15:00:...| 909.8757369368176| Restaurant|New York|   1390|         0|      Withdrawal|        Chip| 3.761729866948193|   11.991090296

In [None]:
#afficher le schéma d'un DataFrame.
df.printSchema()


root
 |-- Transaction_ID: integer (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Merchant: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- User_ID: integer (nullable = true)
 |-- Fraudulent: integer (nullable = true)
 |-- Transaction_Type: string (nullable = true)
 |-- Entry_Mode: string (nullable = true)
 |-- Account_Age: double (nullable = true)
 |-- Previous_Transactions: double (nullable = true)



In [None]:
#générer un résumé statistique des colonnes  du DataFrame.
df.describe().show()


+-------+------------------+------------------+-----------+--------+------------------+------------------+----------------+----------+------------------+---------------------+
|summary|    Transaction_ID|            Amount|   Merchant|Location|           User_ID|        Fraudulent|Transaction_Type|Entry_Mode|       Account_Age|Previous_Transactions|
+-------+------------------+------------------+-----------+--------+------------------+------------------+----------------+----------+------------------+---------------------+
|  count|             30000|             30000|      30000|   30000|             30000|             30000|           30000|     30000|             30000|                30000|
|   mean| 514.3299333333333|2515.0564320899994|       NULL|    NULL|         1486.5497|            0.0322|            NULL|      NULL|4.5091770873363535|   10.031656883173788|
| stddev|296.81802501756613|1454.5520169491997|       NULL|    NULL|285.64160673968127|0.1765338460653573|            NU

In [None]:
#Code ajusté pour gérer correctement les colonnes non numériques
df.select([
    count(when(col(c).isNull(), c)).alias(c) if df.schema[c].dataType not in [DoubleType(), FloatType()]
    else count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns
]).show()



+--------------+----+------+--------+--------+-------+----------+----------------+----------+-----------+---------------------+
|Transaction_ID|Date|Amount|Merchant|Location|User_ID|Fraudulent|Transaction_Type|Entry_Mode|Account_Age|Previous_Transactions|
+--------------+----+------+--------+--------+-------+----------+----------------+----------+-----------+---------------------+
|             0|   0|     0|       0|       0|      0|         0|               0|         0|          0|                    0|
+--------------+----+------+--------+--------+-------+----------+----------------+----------+-----------+---------------------+



In [None]:
# effectuer des opérations de regroupement (group-by) sur différentes colonnes du DataFrame
df.groupBy('Transaction_Type').count().show()
df.groupBy('Merchant').count().show()
df.groupBy('Location').count().show()
df.groupBy('Entry_Mode').count().show()


+----------------+-----+
|Transaction_Type|count|
+----------------+-----+
|         Deposit| 8076|
|        Transfer| 7886|
|         Payment| 6972|
|      Withdrawal| 7066|
+----------------+-----+

+------------+-----+
|    Merchant|count|
+------------+-----+
| Supermarket| 5575|
| Electronics| 5726|
|    Clothing| 6155|
|  Restaurant| 6065|
|Online Store| 6479|
+------------+-----+

+-----------+-----+
|   Location|count|
+-----------+-----+
|    Phoenix| 5926|
|Los Angeles| 6305|
|    Chicago| 5520|
|    Houston| 5305|
|   New York| 6944|
+-----------+-----+

+---------------+-----+
|     Entry_Mode|count|
+---------------+-----+
|           Chip| 7466|
|Magnetic Stripe| 7035|
|   Manual Entry| 7539|
|         Online| 7960|
+---------------+-----+



In [None]:
# Select only numerical columns and convert to RDD
numerical_data = df.select(['Amount', 'User_ID','Account_Age','Previous_Transactions','Transaction_ID']).rdd.map(lambda row: row[0:])

# Calculate the correlation matrix
correlation_matrix = Statistics.corr(numerical_data, method="pearson")

# Print the correlation matrix
print(correlation_matrix)


[[ 1.00000000e+00  4.17715755e-02 -6.20428009e-02 -8.63122712e-03
  -6.29605324e-03]
 [ 4.17715755e-02  1.00000000e+00  9.27034915e-04  2.75727597e-02
  -9.34355520e-03]
 [-6.20428009e-02  9.27034915e-04  1.00000000e+00 -1.50231108e-02
   4.52745178e-02]
 [-8.63122712e-03  2.75727597e-02 -1.50231108e-02  1.00000000e+00
   4.81353658e-02]
 [-6.29605324e-03 -9.34355520e-03  4.52745178e-02  4.81353658e-02
   1.00000000e+00]]




> Commentaire et Conclusions sur le code ci-dessus:

Le code réalise une analyse de corrélation entre les colonnes numériques spécifiques du notre DataFrame en utilisant PySpark,en utlisant La corrélation de Pearson qui est un indicateur statistique qui mesure la relation linéaire entre deux variables continues, et dans ce contexte, elle est utilisée pour évaluer les associations entre les différentes colonnes numériques sélectionnées.

Et d'après la sortie de ce code il s'apparait les correlations sont faible entre les varaiable assignées, donc on va appter à utliser des modèle de classifications de la bibliothèque MLlib pour prédire la fraude dans les transactions financières.



# 2.Preprocessing Feature selection:


---



Le prétraitement des données et la sélection des caractéristiques (feature selection) sont deux étapes cruciales dans notre projet afin de construire des modèles performant concernant la détection de fraude dans les transactions financières.

Ci-dessous on a effectuer préprocesser le jeu de donnée pour qu'il soit adaptable à notre objectif et par la suite séléctionner les caractéristiques qui vont nous permettre de construire trois modèle de detection de fraude:

---



In [None]:
# Modifier la colonne 'Date' pour ne contenir que la partie date.
df = df.withColumn("Date", to_date(col("Date")))
# Ajouter une nouvelle colonne 'Time' en extrayant la partie temporelle au format HH:mm:ss.
df = df.withColumn("Time", date_format(col("Date"), "HH:mm:ss"))


In [None]:
#Afficher les colonnes Date et time après la modification effctuer sur le la colonne initiale Date.
df.select("Date", "Time").show(5)

+----------+--------+
|      Date|    Time|
+----------+--------+
|2023-11-27|00:00:00|
|2023-09-28|00:00:00|
|2023-08-16|00:00:00|
|2023-04-07|00:00:00|
|2023-03-20|00:00:00|
+----------+--------+
only showing top 5 rows





* Définir une UDF qui va nous permettre  de définir nos propres fonctions pour traiter les données d'une manière spécifique à notre besoins.Dans ce cas c'est pour générer des heures aléatoires pour résoudre le problème de temps encontourer après l'ajout de la nouvelle colonne Time.



In [None]:
def random_time():
    hour = random.randint(0, 23)
    minute = random.randint(0, 59)
    second = random.randint(0, 59)
    return f"{hour:02d}:{minute:02d}:{second:02d}"

# Enregistrer la UDF (User-defined functions).
random_time_udf = udf(random_time, StringType())

In [None]:
#Afficher la colonne time après la definition de notre nouvelle fonction qui génère le temps aléatoirement.
df = df.withColumn("Time", random_time_udf())
df.select("Time").show(5)


+--------+
|    Time|
+--------+
|20:42:26|
|02:56:06|
|16:31:38|
|09:34:00|
|11:03:05|
+--------+
only showing top 5 rows



In [None]:
#Extraire l'heure à partir de la colonne 'Time' .
df = df.withColumn("Hour", hour(df["Time"]))
#Extraire (Year,Month,Day) à partire de la colonne "Date" en mettant chachune dans une nouvelle colonne.
df = df.withColumn("Year", year(col("Date")))
df = df.withColumn("Month", month(col("Date")))
df = df.withColumn("Day", dayofweek(col("Date")))
#ajouter une nouvelle colonne "HourOfDay" au DataFrame df.
df = df.withColumn("HourOfDay", hour(col("Time")))


In [None]:
# génère une liste d'indexeurs de chaînes pour les colonnes spécifiées dans le DataFrame. Ces indexeurs seront utilisés pour transformer les valeurs catégoriques de ces colonnes en indices numériques.
#Chaque indexeur créé sera utilisé ultérieurement pour appliquer cette transformation aux données réelles.
indexers = [
    StringIndexer(inputCol=column, outputCol=column + "_Indexed").fit(df)
    for column in ["Transaction_Type", "Location", "Merchant"]
]


-Ce code Ci-dessus assemble les caractéristiques numériques et catégoriques, puis les met à l'échelle pour préparer les données en vue d'une utilisation du modèle d'apprentissage automatique.Le processus d'assemblage est nécessaire car de nombreux algorithmes d'apprentissage automatique s'attendent à recevoir toutes les caractéristiques dans un seul vecteur,tandis que la mise à l'échelle est souvent nécessaire pour normaliser les caractéristiques numériques.

In [None]:
categorical_columns = ["Location", "Merchant", "Transaction_Type"]
#'features' inclura toutes les colonnes de caractéristiques après le prétraitement.
assembler = VectorAssembler(inputCols=["Amount", "Year", "Month", 'Day', "HourOfDay"] + [c + "_Index" for c in categorical_columns], outputCol="assembled_features")

# Scale features pour  la mise à l'échelle des caractéristiques (features) dans l'ensemble de données.
scaler = StandardScaler(inputCol="assembled_features", outputCol="scaledFeatures")


In [None]:
# Convertir'Fraudulent'to double type for the classifier
#prépare la variable cible  en s'assurant qu'elle est du type de données approprié, ici, DoubleType().
df = df.withColumn("label", col("Fraudulent").cast(DoubleType()))

# 3.construction du modèle:

---



Après le préprocessing du jeu de données et la séléction de ses différents caractéristiques, dans cette étape on va construire notre modèle en suivant les le processus suivant:




> Choix de modèle.

> Division du jeu de donnée en un ensemble de test et d'entrainnement.

> Entrainement du modèle.

> Evaluation du modèle




---






In [None]:
#Importations des bibliothèques nécessaires:
from pyspark.ml.classification import GBTClassifier, DecisionTreeClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, to_date, dayofweek, hour
from pyspark.sql.types import StringType, DoubleType
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
# Classifier
rf = RandomForestClassifier(featuresCol="scaledFeatures", labelCol="label")
# Pipeline
pipeline = Pipeline(stages=indexers + [assembler, scaler, rf])

In [None]:
#Diviser le jeu de donnée en un ensemble d'entrainnement et de tests:
train_data, test_data = df.randomSplit([0.7, 0.3], seed=42)



> Le code ci-dessous  en PySpark utilise trois modèles différents de classification d'arbres pour entraîner des classificateurs et évaluer leurs performances sur L'ensemble de données.



In [None]:
from pyspark.ml.classification import GBTClassifier, DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Gradient-Boosted Tree Classifier
gbt = GBTClassifier(featuresCol="scaledFeatures", labelCol="label", maxIter=10)

# Decision Tree Classifier
dt = DecisionTreeClassifier(featuresCol="scaledFeatures", labelCol="label")
# Pipeline for Random Forest
rf_pipeline = Pipeline(stages=indexers + [assembler, scaler, rf])

# Pipeline pour Gradient-Boosted Trees
gbt_pipeline = Pipeline(stages=indexers + [assembler, scaler, gbt])

# Pipeline pour Decision Tree
dt_pipeline = Pipeline(stages=indexers + [assembler, scaler, dt])
# List of model pipelines
model_pipelines = [("Random Forest", rf_pipeline),
                   ("Gradient-Boosted Trees", gbt_pipeline),
                   ("Decision Tree", dt_pipeline)]

for model_name, pipeline in model_pipelines:
    # Fit the model
    fitted_model = pipeline.fit(train_data)

    # Make predictions on the test data
    predictions = fitted_model.transform(test_data)

    # Evaluate the model using AUC as the metric
    evaluator = BinaryClassificationEvaluator(labelCol="label")
    auc = evaluator.evaluate(predictions)

    # Print the evaluation results
    print(f"{model_name} AUC: {auc}")


Random Forest AUC: 0.8208788511361574
Gradient-Boosted Trees AUC: 0.9487648311028234
Decision Tree AUC: 0.40607477021072946




> Ce code ci- dessous en PySpark évalue la performance de modèles de classification en utilisant différents métriques d'évaluation, tels que l'AUC (Area Under the ROC Curve), la précision, le rappel et le score F1



In [None]:
binary_evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
multi_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

for model_name, pipeline in model_pipelines:
    # Fit the model
    fitted_model = pipeline.fit(train_data)

    # faire des predictions sur l'ensemble de test
    predictions = fitted_model.transform(test_data)

    # Evaluer le modele en utlisant AUC

    auc = binary_evaluator.evaluate(predictions, {binary_evaluator.metricName: "areaUnderROC"})

    # Evaluer le  modele en utlisant Precision
    precision = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "precisionByLabel"})

    # Evaluer le modele en utilisant Recall
    recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "recallByLabel"})

    # Evaluer le modele en utlisant F1 Score
    f1_score = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "f1"})

    # Print the evaluation results
    print(f"{model_name} Results:")
    print(f"  AUC: {auc}")
    print(f"  Precision: {precision}")
    print(f"  Recall: {recall}")
    print(f"  F1 Score: {f1_score}\n")


Random Forest Results:
  AUC: 0.8071742107955573
  Precision: 0.9696460573476703
  Recall: 1.0
  F1 Score: 0.9556929191120129

Gradient-Boosted Trees Results:
  AUC: 0.9487648311028234
  Precision: 0.9711689477226835
  Recall: 1.0
  F1 Score: 0.9592578874069271

Decision Tree Results:
  AUC: 0.40607477021072946
  Precision: 0.97131931166348
  Recall: 0.9975742173963267
  F1 Score: 0.958239341500121



In [None]:
import pandas as pd

# Define the data for the table
data = {
    "Modèle": ["Random Forest", "Gradient-Boosted Trees", "Decision Tree"],
    "AUC": [0.8071, 0.9487, 0.4060],
    "Précision": [0.9696, 0.9711, 0.9713],
    "Rappel": [1.0, 1.0, 0.9975],
    "F1 Score": [0.9556, 0.9952, 0.9582]
}

# Create the DataFrame
df = pd.DataFrame(data)

# Print the DataFrame to display the table
print(df)
