In [1]:
if 'google.colab' in str(get_ipython()):
    import os
    # Colab-only settings, later written into .env by the cell below.
    # Credentials are read from the environment (e.g. `%env KAGGLE_KEY=...`)
    # instead of being typed into the notebook, so they never end up in a
    # saved/shared copy. Missing values fall back to "" as before.
    KAGGLE_USER = os.environ.get("KAGGLE_USER", "")
    KAGGLE_KEY = os.environ.get("KAGGLE_KEY", "")
    VIEW = "default-viewer"
    VERBOSE = "0"

In [2]:
if 'google.colab' in str(get_ipython()):
    GITHUB_TOKEN = ""
    !rm -rf anti-money-laundering
    #!git clone https://{GITHUB_TOKEN}@github.com/FedericoBruzzone/anti-money-laundering.git
    !git clone https://github.com/FedericoBruzzone/anti-money-laundering.git
    !mv anti-money-laundering/.* .
    !mv anti-money-laundering/* .
    !rm -rf ./anti-money-laundering
    !pip install -e .

In [3]:
if 'google.colab' in str(get_ipython()):
    !cp ./.env.example ./.env
    with open('.env', 'r') as f:
        mod = f.read().splitlines()
        mod = mod[2:-2]
        mod.append(f'KAGGLE_USER={KAGGLE_USER}')
        mod.append(f'KAGGLE_KEY={KAGGLE_KEY}')
        mod.append(f'VIEW={VIEW}')
        mod.append(f'VERBOSE={VERBOSE}')
    with open('.env', 'w') as f:
        for i in mod:
            f.write(i + '\n')

In [4]:
import os
import time
import warnings

import pandas as pd
from dotenv import load_dotenv

# Populate os.environ from .env before the project modules below are
# imported, in case any of them reads configuration at import time.
load_dotenv()

# Keep the experiment output readable: hide deprecation/user/future warnings.
warnings.filterwarnings("ignore", category=DeprecationWarning)
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=FutureWarning)

from src.utils.kaggle_config            import setup_kaggle
from src.utils.kaggle_config            import download_dataset

from src.utils.datasets_handler         import (get_train_and_test,
                                                get_X_and_Y,
                                                print_dataset,
                                                label_encoder,
                                                split_timestamp)
from src.utils.performance_measures     import calculate_performances
from src.utils.dataset_sampling_methods import (oversampling,
                                                undersampling,
                                                bootstrap_sampling)

from src.utils.print_utils              import (printLBlue, printGreen)

from src.decision_tree.decision_tree    import CustomDecisionTree
from src.decision_tree.ID3              import DecisionTreeID3
from src.decision_tree.C45              import DecisionTreeC45
from src.decision_tree.entropy_type     import EntropyType
from src.decision_tree.criterion_type   import CriterionType

from IPython.display import Image, display

from src.utils.spark_config import get_spark_session
from pyspark import TaskContext

# VERBOSE may be unset (no .env) or empty; fall back to 0 instead of
# crashing in int(None) / int("").
VERBOSE = int(os.getenv('VERBOSE') or 0)
VIEW = os.getenv('VIEW')

setup_kaggle()
print("Downloading dataset...")
download_dataset("iammustafatz/diabetes-prediction-dataset")
download_dataset("ealtman2019/ibm-transactions-for-anti-money-laundering-aml")
print("Done.")

# CSV file names; hi_small_trans is the one loaded by get_train_and_test below.
hi_small_trans = "HI-Small_Trans.csv"
diabetes = "diabetes_prediction_dataset.csv"

Downloading dataset...
Done.


In [5]:
index_tree = 0

# Tree families selectable through `index_tree` / `tree_index`. The factories
# are lambdas so the project classes are only looked up when a tree is built.
_TREE_FACTORIES = {
    0: lambda: DecisionTreeID3(max_depth=8,
                               num_thresholds_numerical_attr=2,
                               VERBOSE=False),
    1: lambda: CustomDecisionTree(criterion=EntropyType.SHANNON,
                                  type_criterion=CriterionType.BEST,
                                  max_depth=20,
                                  min_samples_split=100,
                                  num_thresholds_numerical_attr=2,
                                  VERBOSE=False),
    2: lambda: DecisionTreeC45(max_depth=4,
                               min_samples_split=100,
                               VERBOSE=False),
}

# Categorical columns that are label-encoded before fitting.
_CATEGORICAL_COLUMNS = ('Timestamp', 'Account', 'Account.1',
                        'Receiving Currency', 'Payment Currency', 'Payment Format')


def create_trees(partition_elements, verbose=False, tree_index=None):
    """Fit one decision tree on the rows of a single RDD partition.

    Intended for ``rdd.mapPartitions(create_trees)``. Spark calls it with the
    partition iterator only, so the tree family comes from the global
    ``index_tree`` unless ``tree_index`` is given. Note that the second
    positional argument of ``RDD.mapPartitions`` is ``preservesPartitioning``,
    not ``verbose``.

    Args:
        partition_elements: iterator of Spark ``Row`` objects whose fields
            are the columns listed in ``COLUMNS_NAME``.
        verbose: if True, write dot/PNG files for the fitted tree, named
            after the partition id.
        tree_index: 0 = ID3, 1 = CustomDecisionTree, 2 = C4.5; ``None``
            means "use the global ``index_tree``".

    Yields:
        The fitted tree: exactly one per non-empty partition, nothing for an
        empty partition.

    Raises:
        ValueError: if the tree index is not 0, 1 or 2 (previously this
            surfaced as an UnboundLocalError on ``decision_tree``).
    """
    if tree_index is None:
        tree_index = index_tree
    if tree_index not in _TREE_FACTORIES:
        raise ValueError(
            f"index_tree must be 0 (ID3), 1 (custom) or 2 (C4.5), got {tree_index!r}")

    records = [row.asDict() for row in partition_elements]
    if not records:
        # Nothing to learn from: skip the partition instead of fitting a tree
        # on an empty frame.
        return

    part_df = pd.DataFrame(records, columns=COLUMNS_NAME)
    X_train, y_train = get_X_and_Y(part_df, verbose=VERBOSE)
    # NOTE(review): the encoder is fit on this partition only, while X_test is
    # encoded separately in the data-prep cell. Unless label_encoder uses a
    # fixed/global mapping, category codes may not line up between the
    # training partitions and the test set — verify.
    X_train, _ = label_encoder(X_train, list(_CATEGORICAL_COLUMNS))

    decision_tree = _TREE_FACTORIES[tree_index]()
    decision_tree.fit(X_train, y_train)

    if verbose:
        partition_id = TaskContext.get().partitionId()
        decision_tree.create_dot_files(filename="tree" + str(partition_id),
                                       generate_png=True,
                                       view="default-viewer")
    yield decision_tree

def predict_trees(new_line):
    """Build a Spark-mappable function that classifies one sample.

    Args:
        new_line: the sample handed to each tree's ``predict``.

    Returns:
        A callable mapping ``tree -> tree.predict(new_line)``.
    """
    return lambda tree: tree.predict(new_line)

def predict_trees_all(X_test):
    """Build a Spark-mappable function that classifies a whole test set.

    Args:
        X_test: the feature table handed to each tree's
            ``predict_test_no_gen``.

    Returns:
        A callable mapping ``tree -> tree.predict_test_no_gen(X_test)``.
    """
    return lambda tree: tree.predict_test_no_gen(X_test)

In [6]:
name = "AntiMoneyLaundering"
spark = get_spark_session(name, VERBOSE)

df_train, df_test = get_train_and_test(hi_small_trans, verbose=VERBOSE)

# Keep every laundering transaction, keep a 10% sample of the legitimate
# ones, then oversample and bootstrap the combined frame.
is_laundering = df_train["Is Laundering"]
df_train_p = df_train.loc[is_laundering == 1]
df_train_n = df_train.loc[is_laundering == 0].sample(frac=0.1, random_state=2)
df_train = pd.concat([df_train_p, df_train_n])

df_train = oversampling(df_train, VERBOSE=False)
df_train = bootstrap_sampling(df_train)
print("len(df_train):", len(df_train))

COLUMNS_NAME: list = df_train.columns.tolist()
X_train, y_train = get_X_and_Y(df_train, verbose=VERBOSE)
X_test, y_test = get_X_and_Y(df_test, verbose=VERBOSE)

# Training features are label-encoded per partition inside create_trees;
# only the test features are encoded here.
# NOTE(review): separate encoder fits may assign different codes to the same
# category in train vs. test unless label_encoder is deterministic — verify.
X_test, _ = label_encoder(X_test, ['Timestamp', 'Account', 'Account.1', 'Receiving Currency', 'Payment Currency', 'Payment Format'])

23/10/01 17:09:23 WARN Utils: Your hostname, federicobruzzone resolves to a loopback address: 127.0.1.1; using 192.168.1.4 instead (on interface wlo1)
23/10/01 17:09:23 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/01 17:09:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


len(df_train): 670558


In [7]:
# Hand the resampled pandas training frame to Spark; create_trees later fits
# one tree per partition of this RDD.
df = spark.createDataFrame(df_train)
rdd = df.rdd

print("Printing spark dataframe...")
df.show()

Printing spark dataframe...


[Stage 0:>                                                          (0 + 1) / 1]

+----------------+---------+---------+-------+---------+---------------+------------------+-----------+-----------------+--------------+-------------+
|       Timestamp|From Bank|  Account|To Bank|Account.1|Amount Received|Receiving Currency|Amount Paid| Payment Currency|Payment Format|Is Laundering|
+----------------+---------+---------+-------+---------+---------------+------------------+-----------+-----------------+--------------+-------------+
|2022/09/02 18:03|     3305|800419340|   1362|800419480|         196.87|         US Dollar|     196.87|        US Dollar|        Cheque|            0|
|2022/09/02 04:18|   238845|80E265080| 238190|80E992DA0|         602.86|       Swiss Franc|     602.86|      Swiss Franc|   Credit Card|            0|
|2022/09/10 09:23|    11107|801B6EE90|  23525|811B0B300|         719.41|              Euro|     719.41|             Euro|   Credit Card|            0|
|2022/09/02 10:30|       28|80B7B8090| 131086|80B7B84C0|       32805.32|      Mexican Peso|   

                                                                                

In [8]:
def map_to_column_value_pairs(row):
    """Tag each prediction of one tree with its test-row position.

    Returns a list of ``(position, row[position])`` pairs for every position
    ``0 .. len(row) - 1``, indexing ``row`` directly.
    """
    pairs = []
    for position in range(len(row)):
        pairs.append((position, row[position]))
    return pairs

def count_values(a, b):
    """Reducer that combines two values with ``+``.

    Used for both vote counts (ints are summed) and per-row vote lists
    (lists are concatenated).
    """
    combined = a + b
    return combined

In [1]:
# ID3
# Fit one ID3 tree per RDD partition, let every tree predict the whole test
# set, and take a per-row majority vote.
# mapPartitions' optional 2nd argument is `preservesPartitioning` (default
# False), not create_trees' `verbose`, so it is omitted here.
# reduceByKey shuffles by hash of the row index, so without sorting collect()
# would return votes in partition order; sortByKey() restores row order so
# predictions[i] lines up with y_test[i].
index_tree = 0
start_time = time.time()
predictions = rdd.mapPartitions(create_trees) \
                 .map(predict_trees_all(X_test)) \
                 .flatMap(map_to_column_value_pairs) \
                 .map(lambda x: (x, 1)) \
                 .reduceByKey(count_values) \
                 .map(lambda x: (x[0][0], [(x[0][1], x[1])])) \
                 .reduceByKey(count_values) \
                 .map(lambda x: (x[0], max(x[1], key=lambda item: item[1]))) \
                 .sortByKey() \
                 .map(lambda x: x[1][0]) \
                 .collect()
end_time = time.time()
assert len(predictions) == len(y_test), "one voted prediction per test row expected"
print("\nFit time + Predict Time: %.2f minutes" % ((end_time - start_time) / 60))
calculate_performances(predictions, y_test, verbose=True)

NameError: name 'time' is not defined

In [10]:
# Custom
# Fit one CustomDecisionTree per RDD partition, let every tree predict the
# whole test set, and take a per-row majority vote.
# mapPartitions' optional 2nd argument is `preservesPartitioning` (default
# False), not create_trees' `verbose`, so it is omitted here.
# reduceByKey shuffles by hash of the row index, so without sorting collect()
# would return votes in partition order; sortByKey() restores row order so
# predictions[i] lines up with y_test[i].
index_tree = 1
start_time = time.time()
predictions = rdd.mapPartitions(create_trees) \
                 .map(predict_trees_all(X_test)) \
                 .flatMap(map_to_column_value_pairs) \
                 .map(lambda x: (x, 1)) \
                 .reduceByKey(count_values) \
                 .map(lambda x: (x[0][0], [(x[0][1], x[1])])) \
                 .reduceByKey(count_values) \
                 .map(lambda x: (x[0], max(x[1], key=lambda item: item[1]))) \
                 .sortByKey() \
                 .map(lambda x: x[1][0]) \
                 .collect()
end_time = time.time()
assert len(predictions) == len(y_test), "one voted prediction per test row expected"
print("\nFit time + Predict Time: %.2f minutes" % ((end_time - start_time) / 60))
calculate_performances(predictions, y_test, verbose=True)

ERROR:root:KeyboardInterrupt while sending command.               (0 + 12) / 12]
Traceback (most recent call last):
  File "/home/fcb/Documents/anti-money-laundering/venv/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/fcb/Documents/anti-money-laundering/venv/lib/python3.10/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

[Stage 4:>                                                        (0 + 12) / 12]

In [None]:
# C4.5
# Fit one C4.5 tree per RDD partition, let every tree predict the whole test
# set, and take a per-row majority vote.
# mapPartitions' optional 2nd argument is `preservesPartitioning` (default
# False), not create_trees' `verbose`, so it is omitted here.
# reduceByKey shuffles by hash of the row index, so without sorting collect()
# would return votes in partition order; sortByKey() restores row order so
# predictions[i] lines up with y_test[i].
index_tree = 2
start_time = time.time()
predictions = rdd.mapPartitions(create_trees) \
                 .map(predict_trees_all(X_test)) \
                 .flatMap(map_to_column_value_pairs) \
                 .map(lambda x: (x, 1)) \
                 .reduceByKey(count_values) \
                 .map(lambda x: (x[0][0], [(x[0][1], x[1])])) \
                 .reduceByKey(count_values) \
                 .map(lambda x: (x[0], max(x[1], key=lambda item: item[1]))) \
                 .sortByKey() \
                 .map(lambda x: x[1][0]) \
                 .collect()
end_time = time.time()
assert len(predictions) == len(y_test), "one voted prediction per test row expected"
print("\nFit time + Predict Time: %.2f minutes" % ((end_time - start_time) / 60))
calculate_performances(predictions, y_test, verbose=True)

In [None]:
# Shut down the Spark session obtained from get_spark_session; run this last,
# since every training/prediction cell needs the session.
spark.stop()