In [1]:
import pandas as pd
import numpy as np
import tqdm
from pyspark.sql.window import Window
from sklearn.preprocessing import LabelEncoder
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark import SparkContext
from graphframes import GraphFrame
from pyspark.sql.types import *
import multiprocessing
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation
import seaborn as sns
import matplotlib.pyplot as plt
from pyspark.sql import functions as F
import os
import shutil

In [2]:
# Memory settings and the scratch directory used for intermediate parquet output.
spark_driver_memory = "10g"
spark_executor_memory = "6g"
spark_partial_results_folder = './partial_results'


# Local-mode session; the broadcast-join threshold is raised to 100 MiB.
spark = (
    SparkSession.builder
    .config("spark.driver.memory", spark_driver_memory)
    .config("spark.executor.memory", spark_executor_memory)
    .master("local[*]")
    .config("spark.sql.autoBroadcastJoinThreshold", 100 * 1024 * 1024)
    .config("spark.executor.cores", "10")
    .getOrCreate()
)
print("Spark session created")
sc = spark.sparkContext
print("Spark context created")


# Create the scratch directory on first use.
if not os.path.exists(spark_partial_results_folder):
    os.makedirs(spark_partial_results_folder)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/10 20:31:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark session created
Spark context created


In [3]:
# Column layout of the transactions CSV, in file order; every column is nullable.
transaction_columns = [
    ('timestamp', StringType()),
    ('from_bank', IntegerType()),
    ('from_account', StringType()),
    ('to_bank', IntegerType()),
    ('to_account', StringType()),
    ('amount_received', FloatType()),
    ('receiving_currency', StringType()),
    ('amount_paid', FloatType()),
    ('payment_currency', StringType()),
    ('payment_format', StringType()),
    ('is_laundering', IntegerType()),
]
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in transaction_columns
])

# header=False: the first line of the file is read as an ordinary data row.
df = spark.read.csv("../dataset/HI-Small_Trans.csv", schema=schema, header=False)

In [4]:
# Tag each row with a generated id and keep only the rows whose id is greater than zero.
# NOTE(review): presumably this discards the CSV header line, which was read as data
# (header=False) — confirm that the header row is always the one that receives id 0.
df = df.withColumn("index", F.monotonically_increasing_id()).filter(F.col("index") > 0)
df.show(5)

+----------------+---------+------------+-------+----------+---------------+------------------+-----------+----------------+--------------+-------------+-----+
|       timestamp|from_bank|from_account|to_bank|to_account|amount_received|receiving_currency|amount_paid|payment_currency|payment_format|is_laundering|index|
+----------------+---------+------------+-------+----------+---------------+------------------+-----------+----------------+--------------+-------------+-----+
|2022/09/01 00:20|       10|   8000EBD30|     10| 8000EBD30|        3697.34|         US Dollar|    3697.34|       US Dollar|  Reinvestment|            0|    1|
|2022/09/01 00:20|     3208|   8000F4580|      1| 8000F5340|           0.01|         US Dollar|       0.01|       US Dollar|        Cheque|            0|    2|
|2022/09/01 00:00|     3209|   8000F4670|   3209| 8000F4670|       14675.57|         US Dollar|   14675.57|       US Dollar|  Reinvestment|            0|    3|
|2022/09/01 00:02|       12|   8000F5030

In [5]:
# Parse the raw string into a proper timestamp (the source format has minute resolution).
df = df.withColumn("timestamp", F.to_timestamp("timestamp", "yyyy/MM/dd HH:mm"))

# Expose each calendar component of the timestamp as its own integer column.
for part_name, extract in (
    ("year", F.year),
    ("month", F.month),
    ("day", F.dayofmonth),
    ("hour", F.hour),
    ("minute", F.minute),
):
    df = df.withColumn(part_name, extract("timestamp"))

df.cache()

DataFrame[timestamp: timestamp, from_bank: int, from_account: string, to_bank: int, to_account: string, amount_received: float, receiving_currency: string, amount_paid: float, payment_currency: string, payment_format: string, is_laundering: int, index: bigint, year: int, month: int, day: int, hour: int, minute: int]

In [6]:
# Chronological 60% / 20% / 20% split into train, validation and test sets.
total_rows = df.count()
train_rows, validation_rows = int(total_rows * 0.6), int(total_rows * 0.2)
test_rows = total_rows - train_rows - validation_rows

# Number the rows in chronological order. Timestamps only have minute resolution, so many
# rows tie on "timestamp"; "index" is added as a tie-breaker so that the numbering is the
# same every time the (lazy) plan is evaluated. Without it the three filters below could
# see different numberings and the splits would not be guaranteed to be disjoint.
# The constant partition key puts every row in one window, i.e. a single Spark partition.
w = Window.partitionBy(lit(1)).orderBy("timestamp", "index")
ordered_df = df.withColumn("row_number", F.row_number().over(w))

# Row-number boundaries (inclusive) of the train and validation ranges.
train_end = train_rows
validation_end = train_rows + validation_rows

# Cut the numbered frame into the three ranges, drop the helper column and repartition.
train_df = ordered_df.filter(col("row_number") <= train_end).drop("row_number").repartition(48)
validation_df = ordered_df.filter(col("row_number").between(train_end + 1, validation_end)).drop("row_number").repartition(48)
test_df = ordered_df.filter(col("row_number") > validation_end).drop("row_number").repartition(48)

df.unpersist()

                                                                                

DataFrame[timestamp: timestamp, from_bank: int, from_account: string, to_bank: int, to_account: string, amount_received: float, receiving_currency: string, amount_paid: float, payment_currency: string, payment_format: string, is_laundering: int, index: bigint, year: int, month: int, day: int, hour: int, minute: int]

# Add features

In [7]:
def add_trans_received(df, name):
    """Add how many transactions each receiving account got per day, hour and minute.

    Three columns are appended: ``transaction_received_per_day``,
    ``transaction_received_per_hour`` and ``transaction_received_per_minute``. Each
    holds the number of rows that share the row's ``to_account`` and fall in the same
    calendar day / hour / minute as the row's ``timestamp``.

    The buckets are built by truncating the full timestamp, so 14:05 on two different
    days (or the 3rd of two different months) is never counted as the same period.

    Args:
        df: Spark DataFrame with a ``to_account`` column and a timestamp-typed
            ``timestamp`` column.
        name: Unused; kept so every pipeline step has the same signature.

    Returns:
        A new DataFrame with the three count columns, marked for caching.
        The input ``df`` is unpersisted as a side effect.
    """
    event_time = F.col("timestamp")

    df1 = df
    for unit in ("day", "hour", "minute"):
        # One window per (account, truncated timestamp) pair.
        bucket = Window.partitionBy("to_account", F.date_trunc(unit, event_time))
        df1 = df1.withColumn(f"transaction_received_per_{unit}", F.count("*").over(bucket))

    df.unpersist()
    return df1.cache()

In [8]:
def add_trans_send(df, name):
    """Add how many transactions each sending account made per day, hour and minute.

    Three columns are appended: ``transaction_send_per_day``,
    ``transaction_send_per_hour`` and ``transaction_send_per_minute``. Each holds the
    number of rows that share the row's ``from_account`` and fall in the same calendar
    day / hour / minute as the row's ``timestamp``.

    The buckets are built by truncating the full timestamp, so 14:05 on two different
    days (or the 3rd of two different months) is never counted as the same period.

    Args:
        df: Spark DataFrame with a ``from_account`` column and a timestamp-typed
            ``timestamp`` column.
        name: Unused; kept so every pipeline step has the same signature.

    Returns:
        A new DataFrame with the three count columns, marked for caching.
        The input ``df`` is unpersisted as a side effect.
    """
    event_time = F.col("timestamp")

    df1 = df
    for unit in ("day", "hour", "minute"):
        # One window per (account, truncated timestamp) pair.
        bucket = Window.partitionBy("from_account", F.date_trunc(unit, event_time))
        df1 = df1.withColumn(f"transaction_send_per_{unit}", F.count("*").over(bucket))

    df.unpersist()
    return df1.cache()

In [9]:
def add_time_since_last_trans(df, name):
    """Add ``minutes_since_last_transaction`` for each sending account.

    For every row the value is the number of minutes between its ``timestamp`` and the
    timestamp of the previous row of the same ``from_account`` (ordered by timestamp).
    Rows that have no previous row get -1.

    Args:
        df: Spark DataFrame with ``from_account`` and ``timestamp`` columns.
        name: Unused; kept so every pipeline step has the same signature.

    Returns:
        A new DataFrame with the extra column, marked for caching.
        The input ``df`` is unpersisted as a side effect.
    """
    sender_timeline = Window.partitionBy("from_account").orderBy("timestamp")

    # Timestamp of the sender's previous transaction (null for the first one).
    with_previous = df.withColumn(
        "prev_timestamp", F.lag(F.col("timestamp")).over(sender_timeline)
    )

    # Gap between the two timestamps, converted from seconds to minutes.
    gap_minutes = (
        F.unix_timestamp(F.col("timestamp")) - F.unix_timestamp(F.col("prev_timestamp"))
    ) / 60
    no_previous = F.isnull(F.col("prev_timestamp"))

    df1 = with_previous.withColumn(
        "minutes_since_last_transaction",
        F.when(no_previous, -1).otherwise(gap_minutes),
    ).drop('prev_timestamp')

    df.unpersist()
    return df1.cache()

In [10]:
def add_amount_variation(df, name):
    """Add the sender's running average amount and the row's deviation from it.

    ``avg_previous_amount`` is the mean ``amount_received`` over all earlier rows of the
    same ``from_account`` (ordered by timestamp, current row excluded), or 0 when there
    are none. ``amount_variation`` is ``amount_received`` minus that average, or 0 when
    the difference is null.

    Args:
        df: Spark DataFrame with ``from_account``, ``timestamp`` and ``amount_received``.
        name: Unused; kept so every pipeline step has the same signature.

    Returns:
        A new DataFrame with both columns, marked for caching.
        The input ``df`` is unpersisted as a side effect.
    """
    # Every row of the sender strictly before the current one.
    earlier_rows = (
        Window.partitionBy("from_account")
        .orderBy("timestamp")
        .rowsBetween(Window.unboundedPreceding, -1)
    )

    mean_so_far = F.avg("amount_received").over(earlier_rows)
    deviation = F.col("amount_received") - F.col("avg_previous_amount")

    df1 = (
        df.withColumn("avg_previous_amount", F.coalesce(mean_so_far, F.lit(0)))
        .withColumn("amount_variation", F.coalesce(deviation, F.lit(0)))
    )

    df.unpersist()
    return df1.cache()

In [11]:
def add_trans_recurrence(df, name):
    """Add ``transaction_recurrence``: a running count per (sender, receiver) pair.

    For each row the value counts the non-null timestamps of the same
    ``from_account`` / ``to_account`` pair from the start of the pair's history up to
    and including the current row (ordered by timestamp).

    Args:
        df: Spark DataFrame with ``from_account``, ``to_account`` and ``timestamp``.
        name: Unused; kept so every pipeline step has the same signature.

    Returns:
        A new DataFrame with the extra column, marked for caching.
        The input ``df`` is unpersisted as a side effect.
    """
    pair_history = (
        Window.partitionBy("from_account", "to_account")
        .orderBy("timestamp")
        .rowsBetween(Window.unboundedPreceding, Window.currentRow)
    )
    occurrences_so_far = F.count("timestamp").over(pair_history)

    df1 = df.withColumn("transaction_recurrence", occurrences_so_far)

    df.unpersist()
    return df1.cache()

In [12]:
def add_unique_payment_format_last_day(df, name):
    """Add ``num_unique_payment_formats_last_day`` for each sending account.

    The value is the number of distinct ``payment_format`` values used by the same
    ``from_account`` in the 1440 minutes (24 hours) up to and including the current row.

    A helper column ``timestamp_minutes`` (epoch time in minutes) is added and is NOT
    removed here; it is still present on the returned frame.

    Args:
        df: Spark DataFrame with ``from_account``, ``timestamp`` and ``payment_format``.
        name: Unused; kept so every pipeline step has the same signature.

    Returns:
        A new DataFrame with the extra columns, marked for caching.
        The input ``df`` is unpersisted as a side effect.
    """
    minutes_since_epoch = F.unix_timestamp("timestamp") / 60

    # Range frame over the ordering value: the previous 1440 minutes up to the current row.
    trailing_day = (
        Window.partitionBy("from_account")
        .orderBy("timestamp_minutes")
        .rangeBetween(-1440, Window.currentRow)
    )
    formats_seen = F.collect_set("payment_format").over(trailing_day)

    df1 = (
        df.withColumn("timestamp_minutes", minutes_since_epoch)
        .withColumn("unique_payment_formats_last_day", formats_seen)
        .withColumn("num_unique_payment_formats_last_day", F.size("unique_payment_formats_last_day"))
        .drop('unique_payment_formats_last_day')  # only the size is kept, not the set itself
    )

    df.unpersist()
    return df1.cache()

In [13]:
def add_unique_payment_currency_last_day(df, name):
    """Add ``num_unique_payment_currency_last_day`` for each sending account.

    The value is the number of distinct ``payment_currency`` values used by the same
    ``from_account`` in the 1440 minutes (24 hours) up to and including the current row.

    The window is ordered by a ``timestamp_minutes`` helper column (epoch time in
    minutes). If the input already carries that column it is reused; otherwise it is
    derived from ``timestamp`` here, so this step also works when called on its own.
    The helper column is removed from the result in both cases.

    Args:
        df: Spark DataFrame with ``from_account``, ``payment_currency`` and either
            ``timestamp_minutes`` or ``timestamp``.
        name: Unused; kept so every pipeline step has the same signature.

    Returns:
        A new DataFrame with the extra column, marked for caching.
        The input ``df`` is unpersisted as a side effect.
    """
    df1 = df
    if "timestamp_minutes" not in df.columns:
        df1 = df1.withColumn("timestamp_minutes", F.unix_timestamp("timestamp") / 60)

    # Range frame over the ordering value: the previous 1440 minutes up to the current row.
    windowSpec = Window.partitionBy("from_account").orderBy("timestamp_minutes")\
                .rangeBetween(-1440, Window.currentRow)

    # Distinct payment currencies used inside the time window.
    df1 = df1.withColumn("unique_payment_currency_last_day", F.collect_set("payment_currency").over(windowSpec))

    # Only the number of distinct currencies is kept as a feature.
    df1 = df1.withColumn("num_unique_payment_currency_last_day", F.size("unique_payment_currency_last_day"))

    # Remove the helper columns; dropping a column that is not present is a no-op.
    df1 = df1.drop('timestamp_minutes').drop('unique_payment_formats_last_day').drop('unique_payment_currency_last_day')

    df.unpersist()
    return df1.cache()

In [20]:
def create_features(df):
    """Add two pair/fan-out features and materialise the result in the cache.

    ``daily_trans_between_accounts`` counts the rows that share the row's
    ``from_account``, ``to_account`` and calendar date. ``num_dest_accounts`` is the
    number of distinct ``to_account`` values among rows that share the row's
    ``from_account`` and exact ``timestamp``.

    Args:
        df: Spark DataFrame with ``from_account``, ``to_account`` and ``timestamp``.

    Returns:
        A new DataFrame with both columns, already cached and counted once.
        The input ``df`` is unpersisted as a side effect.
    """
    same_pair_same_date = Window.partitionBy(
        "from_account", "to_account", F.date_format("timestamp", "yyyy-MM-dd")
    )
    same_sender_same_instant = Window.partitionBy("from_account", "timestamp")

    pair_count = F.count("from_account").over(same_pair_same_date)
    destinations = F.collect_set("to_account").over(same_sender_same_instant)

    df1 = df.select(
        "*",
        pair_count.alias("daily_trans_between_accounts"),
        F.size(destinations).alias("num_dest_accounts"),
    )

    # Running an action forces the cache to be filled before the input is released.
    df1.cache()
    df1.count()
    df.unpersist()
    return df1

In [15]:
def label_encoding(df, name):
    """Replace account, currency and payment-format values with integer codes and save.

    ``from_account``/``to_account`` share one code table, ``receiving_currency``/
    ``payment_currency`` share another, and ``payment_format`` has its own. Codes are
    assigned in sorted order of the distinct values, so they are the same on every run
    for the same data. The encoded frame is written as parquet to
    ``<spark_partial_results_folder>/partial_<name>`` (overwriting any previous output).

    Note that the code tables are built from the frame passed in, so different frames
    (e.g. different splits) get independent code tables.

    Args:
        df: Spark DataFrame containing the five columns named above.
        name: Suffix of the output folder name.

    Returns:
        The encoded (lazy) DataFrame that was written to disk.
        The input ``df`` is unpersisted as a side effect.
    """

    def label_columns(df, col1, col2=None):
        """Encode ``col1`` (and ``col2``, if given) with one shared value-to-code table."""
        shared_columns = [col1] if col2 is None else [col1, col2]

        # Distinct values over all the columns that share this code table.
        unique_values = set()
        for column_name in shared_columns:
            unique_values.update(row[0] for row in df.select(column_name).distinct().collect())

        # Sort so the mapping does not depend on set iteration order; None cannot be
        # compared with strings, so it is placed last when present.
        ordered_values = sorted(value for value in unique_values if value is not None)
        if None in unique_values:
            ordered_values.append(None)
        column_to_index = {value: index for index, value in enumerate(ordered_values)}

        column_to_index_udf = F.udf(lambda value: column_to_index[value], IntegerType())

        # Each encoded column replaces the original one and ends up as the last column.
        df_to_return = df
        for column_name in shared_columns:
            df_to_return = (
                df_to_return
                .withColumn(f"{column_name}_indexed", column_to_index_udf(F.col(column_name)))
                .drop(column_name)
                .withColumnRenamed(f"{column_name}_indexed", column_name)
            )
        return df_to_return

    df1 = label_columns(df, 'from_account', 'to_account')
    df1 = label_columns(df1, 'receiving_currency', 'payment_currency')
    df1 = label_columns(df1, 'payment_format')

    df1.coalesce(48).write.parquet(os.path.join(spark_partial_results_folder, f'partial_{name}'), mode='overwrite')
    df.unpersist()
    return df1
    

In [16]:
def save_as_parquet(name):
    """Turn a partial Spark result into the final model-ready parquet file.

    Reads ``<spark_partial_results_folder>/partial_<name>`` with pandas, adds the
    ``same_account`` / ``same_bank`` / ``same_currency`` / ``same_amount`` flags (as
    0/1 integers), removes identifier, timestamp and amount columns, label-encodes the
    currency and payment-format columns, moves ``is_laundering`` to the last position
    and writes the result to ``./preprocessed_data/<name>_small``.

    The output directory is created if it does not exist yet.

    Args:
        name: Name of the data split; selects the input folder and the output file.
    """
    df = pd.read_parquet(os.path.join(spark_partial_results_folder, f'partial_{name}'))

    # Equality flags must be computed before the columns they compare are dropped.
    df['same_account'] = df['from_account'] == df['to_account']
    df['same_bank'] = df['from_bank'] == df['to_bank']
    df['same_currency'] = df['receiving_currency'] == df['payment_currency']
    df['same_amount'] = df['amount_received'] == df['amount_paid']

    df = df.drop(columns=[
        'index', 'timestamp', 'from_bank', 'to_bank',
        'from_account', 'to_account', 'year', 'month',
    ])

    # Move the target to the last column.
    df['is_laundering'] = df.pop('is_laundering')

    # Both currency columns share one encoder so that equal values get equal codes.
    columns_to_encode = ['receiving_currency', 'payment_currency']
    currency_encoder = LabelEncoder()
    currency_encoder.fit(pd.unique(df[columns_to_encode].values.ravel()))
    for column in columns_to_encode:
        df[column] = currency_encoder.transform(df[column])

    df['payment_format'] = LabelEncoder().fit_transform(df['payment_format'])

    columns_to_convert = ['same_account', 'same_bank', 'same_currency', 'same_amount', 'is_laundering']
    df[columns_to_convert] = df[columns_to_convert].astype(int)

    df = df.drop(columns=['amount_received', 'amount_paid'])

    # Writing fails when the parent directory is missing, so create it up front.
    output_folder = './preprocessed_data'
    os.makedirs(output_folder, exist_ok=True)
    df.to_parquet(os.path.join(output_folder, f'{name}_small'), index=False)


In [17]:
def preprocess_data(df, name):
    """Run the whole feature pipeline on one data split and save the result.

    The steps are applied in a fixed order, each one receiving the output of the
    previous one, and a progress line is printed after every step. The last step
    writes the encoded frame to disk; ``save_as_parquet`` then reads it back by name.

    Args:
        df: Spark DataFrame holding one split of the transactions.
        name: Name of the split; used in progress messages and output paths.
    """
    # (step, progress message) pairs, in execution order.
    pipeline = (
        (add_trans_received, 'Added trans received'),
        (add_trans_send, 'Added trans send'),
        (add_time_since_last_trans, 'add_time_since_last_trans'),
        (add_amount_variation, 'add_amount_variation'),
        (add_unique_payment_format_last_day, 'add_unique_payment_format_last_day'),
        (add_unique_payment_currency_last_day, 'add_unique_payment_currency_last_day'),
        # create_features takes only the frame, so adapt it to the common call shape.
        (lambda frame, _split_name: create_features(frame), 'add_features_day'),
        (label_encoding, 'label_encoding'),
    )

    for step, message in pipeline:
        df = step(df, name)
        print(f'{name}: {message}')

    save_as_parquet(name)
    print(f'{name}: saved - finish')

In [21]:
# Run the full pipeline on each split, in train / validation / test order.
for split_df, split_name in (
    (train_df, 'train'),
    (validation_df, 'val'),
    (test_df, 'test'),
):
    preprocess_data(split_df, split_name)

train: Added trans received
train: Added trans send
train: add_time_since_last_trans
train: add_amount_variation
train: add_unique_payment_format_last_day
train: add_unique_payment_currency_last_day


23/08/10 20:31:47 WARN CacheManager: Asked to cache already cached data.
                                                                                

train: add_features_day


                                                                                

train: label_encoding
train: saved - finish
val: Added trans received
val: Added trans send
val: add_time_since_last_trans
val: add_amount_variation
val: add_unique_payment_format_last_day
val: add_unique_payment_currency_last_day


                                                                                

val: add_features_day


                                                                                

val: label_encoding
val: saved - finish
test: Added trans received
test: Added trans send
test: add_time_since_last_trans
test: add_amount_variation
test: add_unique_payment_format_last_day
test: add_unique_payment_currency_last_day


                                                                                

test: add_features_day


                                                                                

test: label_encoding
test: saved - finish


In [22]:
# Shut down the Spark session now that all splits have been processed and saved.
spark.stop()