
### Librairies

In [1]:
import findspark
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as func
from pyspark.sql.window import Window
import argparse
from datetime import datetime
import pandas as pd
import re

### Traitements

#### Initialiser le SparkContext

In [2]:
findspark.init()  # Trouve les exécutables dans le dossier SPARK_HOME

sc = SparkContext(master="local[*]")  # Créé un SparkContext local
sql_c = SQLContext(sc)  # Instancie un SQLContext

24/06/28 14:49:49 WARN Utils: Your hostname, knackibot resolves to a loopback address: 127.0.1.1; using 192.168.1.192 instead (on interface wlp3s0)
24/06/28 14:49:49 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/28 14:49:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


#### Lire l'échantillon des transactions

In [3]:
data = sql_c.read.option("header", True).csv("../data/transactions-small.csv")

#### Lire et formater l'échantillon des transactions frauduleuses

In [4]:
# Lire le fichier texte
with open('../data/laundering-small.txt', 'r') as file:
    file_content = file.read()

# Séparer les différentes sections de tentatives de blanchiment d'argent
attempts = re.split(r'BEGIN LAUNDERING ATTEMPT - .+\n', file_content)
attempts = [attempt.strip() for attempt in attempts if attempt.strip()]  # Supprimer les chaînes vides

# Initialiser la liste pour stocker toutes les transactions frauduleuses
frauds = []

# Lire les transactions frauduleuses de chaque section
for attempt in attempts:
    for line in attempt.split('\n'):
        if not line.startswith('END LAUNDERING ATTEMPT'):
            frauds.append(line.split(','))

# Formater dans un dataframe
frauds = pd.DataFrame(frauds,
                            columns= [ \
                                "Timestamp","From Bank","Account2","To Bank","Account4",
                                "Amount Received","Receiving Currency","Amount Paid",
                                "Payment Currency","Payment Format","status"
                                ])

# Agréger la table des transactions frauduleuses à la maille (compte x devis)
frauds.drop_duplicates(["Account2","Receiving Currency"])
frauds = frauds[["Account2","Receiving Currency"]]
frauds["has_laundering"] = True
frauds.head()

# Convertir frauds en spark dataframe
frauds = sql_c.createDataFrame(frauds)

frauds.show(5)


[Stage 1:>                                                          (0 + 1) / 1]

+---------+------------------+--------------+
| Account2|Receiving Currency|has_laundering|
+---------+------------------+--------------+
|80279F810| Australian Dollar|          true|
|80279F8B0| Australian Dollar|          true|
|800E36A50| Australian Dollar|          true|
|801BF8E70|              Euro|          true|
|80074C7E0|              Euro|          true|
+---------+------------------+--------------+
only showing top 5 rows



                                                                                

#### Consulter les premières lignes de la table des transactions

In [5]:
data.show(5)

+----------------+---------+---------+-------+---------+---------------+------------------+-----------+----------------+--------------+
|       Timestamp|From Bank| Account2|To Bank| Account4|Amount Received|Receiving Currency|Amount Paid|Payment Currency|Payment Format|
+----------------+---------+---------+-------+---------+---------------+------------------+-----------+----------------+--------------+
|2022/09/01 00:08|      011|8000ECA90|    011|8000ECA90|     3195403.00|         US Dollar| 3195403.00|       US Dollar|  Reinvestment|
|2022/09/01 00:21|    03402|80021DAD0|  03402|80021DAD0|        1858.96|         US Dollar|    1858.96|       US Dollar|  Reinvestment|
|2022/09/01 00:00|      011|8000ECA90| 001120|8006AA910|      592571.00|         US Dollar|  592571.00|       US Dollar|        Cheque|
|2022/09/01 00:16|    03814|8006AD080|  03814|8006AD080|          12.32|         US Dollar|      12.32|       US Dollar|  Reinvestment|
|2022/09/01 00:00|      020|8006AD530|    020|80

#### Convertir certaines variables

In [6]:
data = data \
    .withColumn("Timestamp", func.to_timestamp("Timestamp","yyyy/MM/dd HH:mm")) \
    .withColumn("Amount Received", func.col("Amount Received").cast("double")) \
    .withColumn("Amount Paid", func.col("Amount Paid").cast("double"))

data.printSchema()

root
 |-- Timestamp: timestamp (nullable = true)
 |-- From Bank: string (nullable = true)
 |-- Account2: string (nullable = true)
 |-- To Bank: string (nullable = true)
 |-- Account4: string (nullable = true)
 |-- Amount Received: double (nullable = true)
 |-- Receiving Currency: string (nullable = true)
 |-- Amount Paid: double (nullable = true)
 |-- Payment Currency: string (nullable = true)
 |-- Payment Format: string (nullable = true)



#### Créer une fenêtre temporelle sur le maillage (compte x devise)

In [7]:
windowSpec = Window.partitionBy("Account2","Receiving Currency").orderBy("Timestamp")     

#### Calculer le délai entre les transactions à la maille (compte x devise)

In [8]:
data = data \
    .withColumn("previous_Timestamp",
                func.lag("Timestamp").over(windowSpec)
                ) \
    .withColumn("delay_transaction",
                func.when(
                    func.col("previous_Timestamp").isNotNull(),
                    func.col("Timestamp").cast("long") - func.col("previous_Timestamp").cast("long")
                    ) \
                    .otherwise(None)
                )
data.show(5)

[Stage 5:>                                                          (0 + 1) / 1]

+-------------------+---------+---------+-------+---------+---------------+------------------+-----------+----------------+--------------+-------------------+-----------------+
|          Timestamp|From Bank| Account2|To Bank| Account4|Amount Received|Receiving Currency|Amount Paid|Payment Currency|Payment Format| previous_Timestamp|delay_transaction|
+-------------------+---------+---------+-------+---------+---------------+------------------+-----------+----------------+--------------+-------------------+-----------------+
|2022-09-01 00:00:00|      070|10042B6F0|    012|80123D560|       30774.69|              Yuan|   30774.69|            Yuan|   Credit Card|               null|             null|
|2022-09-01 00:00:00|      070|10042B6F0|    015|803A33A80|       42452.75|              Yuan|   42452.75|            Yuan|   Credit Card|2022-09-01 00:00:00|                0|
|2022-09-01 00:00:00|      070|10042B6F0|    015|803A33A80|     1221938.71|              Yuan| 1221938.71|         

                                                                                

#### Calculer le montant total reçu pour chaque maille (compte x devise)

In [9]:
data_receiver = data \
    .groupby("Account4","Payment Currency") \
    .agg(
        func.sum("Amount Paid").alias("received")
    )

data_receiver.show(5)



+---------+----------------+--------------------+
| Account4|Payment Currency|            received|
+---------+----------------+--------------------+
|800C2E610|       US Dollar|  14792.869999999999|
|80018AC80|       US Dollar|2.1719611776999998E8|
|804397DC0|       US Dollar|           175191.34|
|8001FC2C0|       US Dollar|           149586.38|
|800201980|       US Dollar|  1667720.6100000006|
+---------+----------------+--------------------+
only showing top 5 rows



                                                                                

#### Calculer, à la maille (compte x devise), le nombre de transactions émises, le délai moyen entre celles-ci, et le montant total émis

In [10]:
data_sender = data \
    .groupby("Account2","Receiving Currency") \
    .agg(
        func.count("*").alias("num_transactions"),
        func.round(func.avg("delay_transaction")).cast("int").alias("avg_delay_transactions"),
        func.sum("Amount Received").alias("withdrawals")
    )

# Libérer de la mémoire
data = None

data_sender.show(5)

[Stage 11:>                                                         (0 + 1) / 1]

+---------+------------------+----------------+----------------------+--------------------+
| Account2|Receiving Currency|num_transactions|avg_delay_transactions|         withdrawals|
+---------+------------------+----------------+----------------------+--------------------+
|10042B6F0|              Yuan|           42385|                    20|1.059043235620106...|
|10042BA08|       Saudi Riyal|           22417|                    39|2.331883999062970...|
|800044900|         US Dollar|             135|                  6253|  1852232.4500000016|
|8000496E0|             Ruble|              51|                 15434| 5.072754238999996E8|
|80004B700|   Canadian Dollar|              54|                 15481|   370375.8500000001|
+---------+------------------+----------------+----------------------+--------------------+
only showing top 5 rows



                                                                                

#### Joindre toutes les colonnes dans une même table de sortie

In [11]:
out_table = data_sender \
    .join(
        data_receiver,
        (data_sender["Account2"] == data_receiver["Account4"]) & \
            (data_sender["Receiving Currency"] == data_receiver["Payment Currency"]),
        "left"
        ) \
    .join(
        frauds,
        ["Account2","Receiving Currency"],
        "left"
    ) \
    .fillna(False,["has_laundering"]) \
    .drop(*["Account4","Payment Currency"])

data_receiver, data_sender, frauds= None, None, None

out_table.show(5)

[Stage 19:>                                                         (0 + 1) / 1]

+---------+------------------+----------------+----------------------+--------------------+-------------------+--------------+
| Account2|Receiving Currency|num_transactions|avg_delay_transactions|         withdrawals|           received|has_laundering|
+---------+------------------+----------------+----------------------+--------------------+-------------------+--------------+
|10042B6F0|              Yuan|           42385|                    20|1.059043235620106...| 1421533.4500000004|         false|
|10042BA08|       Saudi Riyal|           22417|                    39|2.331883999062970...| 353520.13000000006|         false|
|800044900|         US Dollar|             135|                  6253|  1852232.4500000016| 40177.900000000016|         false|
|8000496E0|             Ruble|              51|                 15434| 5.072754238999996E8|8.019665816000001E7|         false|
|80004B700|   Canadian Dollar|              54|                 15481|   370375.8500000001|            9885.53|

                                                                                

#### Quelques statistiques descriptives

In [12]:
num_cols = [name for name,type in out_table.dtypes if type not in ['string','boolean']]    
out_table[num_cols].describe().show()

24/06/28 14:50:48 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
24/06/28 14:51:06 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.

+-------+-----------------+----------------------+--------------------+-------------------+
|summary| num_transactions|avg_delay_transactions|         withdrawals|           received|
+-------+-----------------+----------------------+--------------------+-------------------+
|  count|           690906|                478257|              690906|             550527|
|   mean|10.03275554127479|    199478.70502888615|6.3386390574794784E7|5.632632744592764E7|
| stddev|328.9970842586872|    239709.79695164334|1.065735430877576...|6.232381278807476E9|
|    min|                1|                     0|              1.0E-6|             1.0E-6|
|    max|           222037|               1101960| 7.29023328537458E12|3.64511664480207E12|
+-------+-----------------+----------------------+--------------------+-------------------+



                                                                                

In [14]:
out_table \
    .withColumn("has_laundering",func.col("has_laundering").cast("int")) \
    .agg(func.sum("has_laundering").alias("Nombre de mailles concernées par une transaction frauduleuse")) \
    .show()


                                                                                

+------------------------------------------------------------+
|Nombre de mailles concernées par une transaction frauduleuse|
+------------------------------------------------------------+
|                                                        1023|
+------------------------------------------------------------+

