# Notebook 4 - Graph Analysis Fraud Detection (Rebuilt for Spark 4.0.1)

**Objective:** Simulate fraud detection through graph analysis (PageRank) using only the native functionality of **PySpark DataFrames** (`pyspark.sql`), to work around the GraphFrames library's incompatibility with Spark 4.0.1.

In [7]:
# 🔹 Step 1: Spark initialization and imports
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, sum, explode

spark = SparkSession.builder \
    .appName("Fraud-Graph-Sim") \
    .getOrCreate()

print(f"Spark session (version {spark.version}) started without GraphFrames.")

Spark session (version 4.0.1) started without GraphFrames.


In [8]:
# 🔹 Step 2: Load the dataset (using the original CSV)
print("--- Loading the original CSV ---")
df = spark.read.csv(
    "hdfs:///user/hadoop/BigDataFraude_ML-GraphX/creditcard.csv",
    header=True,
    inferSchema=True
)
df.cache()
print("Data loaded.")

--- Loading the original CSV ---
Data loaded.


In [9]:
# 🔹 Step 3: Create the vertices (nodes)
# Use V1 (standing in for the client/card) as the ID.
# Note: distinct() applies to the whole (id, label, Amount) row, so an id
# can appear more than once if V1 repeats with a different label or amount.
print("--- Creating vertices (V1 clients) ---")
vertices = df.select(
    col("V1").alias("id"),
    col("Class").alias("label"),
    col("Amount")
).distinct()

# Add a column for the initial PageRank
vertices = vertices.withColumn("pagerank", lit(1.0))
vertices.cache()
vertices.show(5)

--- Creating vertices (V1 clients) ---
+----------------+-----+------+--------+
|              id|label|Amount|pagerank|
+----------------+-----+------+--------+
|-1.2768303373631|    0| 110.4|     1.0|
|1.21205680491093|    0|  2.28|     1.0|
|1.08102680841932|    0| 17.24|     1.0|
|1.49157444507907|    0|   2.0|     1.0|
|1.09337038677875|    0|  49.9|     1.0|
+----------------+-----+------+--------+
only showing top 5 rows
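
Because `distinct()` in step 3 runs over the full (`id`, `label`, `Amount`) row, the same `V1` value can survive as several vertex rows. A minimal plain-Python sketch of that effect, using invented rows (not taken from the dataset):

```python
# Hypothetical (V1, Class, Amount) rows; the values are invented for illustration.
rows = [
    (1.25, 0, 10.0),
    (1.25, 0, 10.0),   # exact duplicate: removed by a row-level distinct
    (1.25, 0, 99.0),   # same id, different Amount: kept as a separate row
    (-0.5, 1, 3.0),
]

# Row-level distinct, comparable to DataFrame.distinct()
distinct_rows = sorted(set(rows))
# Distinct on the id alone
distinct_ids = sorted({r[0] for r in rows})

print(len(distinct_rows))  # 3
print(len(distinct_ids))   # 2
```

If exactly one row per id is needed, deduplicating on `id` alone (for example with `dropDuplicates(["id"])` in PySpark) is one option, at the cost of keeping an arbitrary label and amount for repeated ids.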



In [10]:
# 🔹 Step 4: Create the edges (links)
# A link is created when two different clients (V1) each have a fraudulent
# transaction (Class=1) in the same hour bucket (Time / 3600 truncated to an integer).
print("--- Creating edges (co-occurring fraud links) ---")
edges = df.alias("t1").join(
    df.alias("t2"),
    (
        # 1. Both transactions are frauds
        (col("t1.Class") == 1) & (col("t2.Class") == 1) &

        # 2. They fall in the same hour bucket
        ((col("t1.Time") / 3600).cast("int") == (col("t2.Time") / 3600).cast("int")) &

        # 3. They belong to different entities/clients (src != dst)
        (col("t1.V1") != col("t2.V1"))
    )
).select(
    col("t1.V1").alias("src"),
    col("t2.V1").alias("dst")
).distinct()
edges.cache()
edges.show(5)

--- Creating edges (co-occurring fraud links) ---
+-----------------+-----------------+
|              src|              dst|
+-----------------+-----------------+
|0.314596589729515|-1.58550536691994|
|-4.72771265581559|-2.58961719821269|
|-16.5986647432584|-25.2663550194138|
|-19.8563223334433|  -27.84818067198|
|-2.78724793061533|  -27.84818067198|
+-----------------+-----------------+
only showing top 5 rows
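
The join condition in step 4 can be read as: bucket each fraudulent transaction by the integer part of `Time / 3600`, then link every ordered pair of distinct ids inside a bucket. A small plain-Python sketch with invented transactions (the tuples and the helper name `build_edges` are assumptions for illustration, not part of the notebook):

```python
from collections import defaultdict
from itertools import permutations

def build_edges(transactions):
    """Return directed (src, dst) pairs for frauds sharing an hour bucket.

    transactions: iterable of (time_seconds, v1, fraud_class) tuples.
    """
    buckets = defaultdict(set)
    for time_s, v1, cls in transactions:
        if cls == 1:                      # keep fraudulent transactions only
            buckets[int(time_s // 3600)].add(v1)
    edges = set()
    for ids in buckets.values():
        # every ordered pair of distinct ids in the same bucket
        edges.update(permutations(ids, 2))
    return edges

sample = [
    (100, "a", 1), (200, "b", 1), (3500, "c", 1),   # hour bucket 0
    (3700, "d", 1),                                 # hour bucket 1, alone
    (150, "e", 0),                                  # not a fraud
]
edges = build_edges(sample)
print(sorted(edges))
```

Every link appears in both directions, so each hour bucket forms a fully connected group of fraud ids, and ids that are alone in their bucket get no edges.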


In [11]:
# 🔹 Step 5: Compute out-degrees and normalize (PageRank preparation)
# PageRank needs the number of outgoing links per source.
print("--- Preparation: computing out-degrees ---")
out_degrees = edges.groupBy("src")\
                   .count()\
                   .withColumnRenamed("count", "outDegree")

# Join the degrees onto the edges for normalization
normalized_edges = edges.join(out_degrees, "src")\
                        .withColumn("weight", lit(1.0) / col("outDegree"))\
                        .select("src", "dst", "weight")
normalized_edges.cache()
normalized_edges.show(5)

--- Preparation: computing out-degrees ---
+-----------------+-----------------+------+
|              src|              dst|weight|
+-----------------+-----------------+------+
|-14.4744374924863|-16.3679230107968|   0.1|
|-14.4744374924863|-15.3988450085358|   0.1|
|-14.4744374924863|-12.2240206243564|   0.1|
|-14.4744374924863|-15.2713618637585|   0.1|
|-14.4744374924863|-14.1791651073631|   0.1|
+-----------------+-----------------+------+
only showing top 5 rows
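
Step 5 gives each edge the weight `1 / outDegree(src)`, so the weights leaving any source sum to 1. A weight of 0.1, as in the output above, therefore corresponds to a source with 10 outgoing links. A plain-Python sketch of the same normalization on an invented edge list (`normalize_edges` is a hypothetical helper name):

```python
from collections import Counter

def normalize_edges(edges):
    """Attach weight = 1 / out-degree(src) to each (src, dst) edge."""
    out_degree = Counter(src for src, _ in edges)
    return [(src, dst, 1.0 / out_degree[src]) for src, dst in edges]

edges = [("a", "b"), ("a", "c"), ("b", "a"), ("c", "a")]
weighted = normalize_edges(edges)
for row in weighted:
    print(row)
```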



In [13]:
# 🔹 Step 6: PageRank algorithm (simulated with PySpark DataFrames)

from pyspark.sql.functions import coalesce  # replaces missing contributions with 0

MAX_ITER = 5
RESET_PROBABILITY = 0.15

print(f"--- Simulated PageRank ({MAX_ITER} iterations) ---")

# Note: every vertex starts at pagerank 1.0 while the teleport term below is
# RESET_PROBABILITY / N, so the resulting scores are not normalized to sum to 1.
current_vertices = vertices.select("id", "label", "pagerank").cache()

for i in range(MAX_ITER):
    # 1. Contribution sent by each source along each edge (pagerank * weight)
    contributions = current_vertices.join(
        normalized_edges, current_vertices.id == normalized_edges.src
    ).withColumn(
        "contribution", col("pagerank") * col("weight")
    ).select(col("dst").alias("id"), "contribution")

    # 2. Sum the contributions per destination
    new_pageranks = contributions.groupBy("id").agg(sum("contribution").alias("sum_contribution"))

    # 3. Attach the sums to every vertex; vertices with no incoming edge get 0
    current_vertices = current_vertices.drop("pagerank")\
                                       .join(new_pageranks, "id", "left_outer")\
                                       .withColumn(
                                           "sum_contribution_clean",
                                           coalesce(col("sum_contribution"), lit(0.0))
                                       )

    # PR(new) = (1 - alpha) * sum(contributions) + alpha / N, with alpha = RESET_PROBABILITY
    N = float(current_vertices.count())

    # Use the null-free column "sum_contribution_clean"
    current_vertices = current_vertices.withColumn(
        "pagerank",
        lit(1.0 - RESET_PROBABILITY) * col("sum_contribution_clean") + lit(RESET_PROBABILITY / N)
    ).select("id", "label", "pagerank").cache()

    print(f"Iteration {i+1} completed.")

results = current_vertices.orderBy(col("pagerank").desc())
results.show(10)

--- Simulated PageRank (5 iterations) ---
Iteration 1 completed.
Iteration 2 completed.
Iteration 3 completed.
Iteration 4 completed.
Iteration 5 completed.
+-------------------+-----+------------------+
|                 id|label|          pagerank|
+-------------------+-----+------------------+
|  -1.58550536691994|    1|0.4437073305244993|
|  -3.49910753739178|    1|0.4437073305244993|
|  -0.25147096006823|    1|0.4437073305244993|
|0.00843036489558254|    1|0.4437073305244993|
|  -1.81328048476897|    1|0.4437073305244993|
|  0.725645739819857|    1|0.4437073305244993|
| 0.0267792264491516|    1|0.4437073305244993|
|  0.314596589729515|    1|0.4437073305244993|
|  0.857321003765953|    1|0.4437073305244993|
|  -1.78322883722709|    1|0.4437073305244993|
+-------------------+-----+------------------+
only showing top 10 rows
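
The update rule in step 6 is PR(new) = (1 - alpha) * sum(PR(src) * weight) + alpha / N. The following plain-Python sketch applies it to a tiny invented graph, starting every score at 1.0 as the notebook does (the function name `pagerank` and the sample graph are assumptions for illustration):

```python
def pagerank(nodes, weighted_edges, max_iter=5, reset=0.15, start=1.0):
    """Iterate PR(v) = (1 - reset) * sum(PR(u) * w(u, v)) + reset / N."""
    n = float(len(nodes))
    pr = {v: start for v in nodes}
    for _ in range(max_iter):
        incoming = {v: 0.0 for v in nodes}
        for src, dst, w in weighted_edges:
            incoming[dst] += pr[src] * w   # contribution along each edge
        pr = {v: (1.0 - reset) * incoming[v] + reset / n for v in nodes}
    return pr

# Three mutually linked nodes plus one isolated node (invented example).
nodes = ["a", "b", "c", "z"]
weighted_edges = [
    ("a", "b", 0.5), ("a", "c", 0.5),
    ("b", "a", 0.5), ("b", "c", 0.5),
    ("c", "a", 0.5), ("c", "b", 0.5),
]
scores = pagerank(nodes, weighted_edges)
print(scores)
```

In a fully connected group with symmetric links, each node receives exactly what it sends, so every iteration multiplies its score by 0.85 and adds `reset / N`. With a large N that added term is tiny, and linked nodes end near 0.85^5 ≈ 0.4437 after five iterations, which is consistent with the identical scores in the table above. Starting from `1 / N` instead of 1.0 would put the initial scores on the same scale as the `reset / N` term.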


In [14]:
# 🔹 Step 7: Most central fraudulent clients (final result)
print("--- Top 10 FRAUDULENT clients (label=1) with the highest PageRank ---")
suspect = results.filter(col("label") == 1).orderBy(col("pagerank").desc())
suspect.show(10)

--- Top 10 FRAUDULENT clients (label=1) with the highest PageRank ---
+-------------------+-----+------------------+
|                 id|label|          pagerank|
+-------------------+-----+------------------+
|  -1.58550536691994|    1|0.4437073305244993|
|  -3.49910753739178|    1|0.4437073305244993|
|0.00843036489558254|    1|0.4437073305244993|
|  -0.25147096006823|    1|0.4437073305244993|
|  0.725645739819857|    1|0.4437073305244993|
| 0.0267792264491516|    1|0.4437073305244993|
|  -1.81328048476897|    1|0.4437073305244993|
|  0.314596589729515|    1|0.4437073305244993|
|  0.857321003765953|    1|0.4437073305244993|
|  -1.78322883722709|    1|0.4437073305244993|
+-------------------+-----+------------------+
only showing top 10 rows

