In [None]:
import datatable as dt
import numpy as np
from datetime import datetime
from datatable import f, join, sort
import pandas as pd
import dask.dataframe as dd
import sys
import os
from sklearn.model_selection import train_test_split

def initial_preprocessing(raw_data, first_timestamp, currency_dict=None, payment_format_dict=None,
                          bank_account_dict=None, account_dict=None):
    """Encode raw transaction rows as numbers and wrap them in a Dask DataFrame.

    Every row of ``raw_data`` is read cell by cell: the ``Timestamp`` string is
    parsed with the format ``%Y/%m/%d %H:%M`` and made relative to
    ``first_timestamp``; currencies, payment formats and accounts are replaced
    by integer codes; amounts are converted to ``float`` and the label to
    ``int``.  Account keys are the ``From Bank`` / ``To Bank`` value
    concatenated with the cell at column position 2 / 4 (presumably the
    sender / receiver account columns -- confirm against the CSV header).

    Parameters
    ----------
    raw_data
        Frame exposing ``.nrows`` and ``raw_data[row, column]`` access whose
        cells are strings.
    first_timestamp
        Offset in seconds subtracted from every parsed timestamp.  Pass ``-1``
        to derive it from the first row: midnight of that row's date minus
        10 seconds.
    currency_dict, payment_format_dict, bank_account_dict, account_dict
        Optional existing ``name -> code`` mappings to extend in place.  When
        omitted, fresh empty mappings are created.  ``bank_account_dict`` is
        never written by this function; it is only passed through.

    Returns
    -------
    tuple
        ``(ddf, first_timestamp, currency_dict, payment_format_dict,
        bank_account_dict, account_dict)`` where ``ddf`` is a Dask DataFrame
        with 2 partitions.
    """
    currency_dict = {} if currency_dict is None else currency_dict
    payment_format_dict = {} if payment_format_dict is None else payment_format_dict
    bank_account_dict = {} if bank_account_dict is None else bank_account_dict
    account_dict = {} if account_dict is None else account_dict

    columns = ['Index', 'From_ID', 'To_ID', 'Timestamp', 'Amount_Paid', 'Payment_Currency',
               'Amount_Received', 'Receiving_Currency', 'Payment_Format', 'Is_Laundering']

    def get_dict_value(name, collection):
        """Return the code stored for ``name``, assigning the next free code if it is new."""
        return collection.setdefault(name, len(collection))

    data = []
    for i in range(raw_data.nrows):
        datetime_object = datetime.strptime(raw_data[i, "Timestamp"], '%Y/%m/%d %H:%M')
        timestamp = datetime_object.timestamp()

        if first_timestamp == -1:
            # Anchor time at midnight of the first processed row's date, minus 10 seconds.
            start_time = datetime(datetime_object.year, datetime_object.month, datetime_object.day)
            first_timestamp = start_time.timestamp() - 10

        timestamp = timestamp - first_timestamp

        # Receiving and payment currencies share one code table.
        receiving_currency = get_dict_value(raw_data[i, "Receiving Currency"], currency_dict)
        payment_currency = get_dict_value(raw_data[i, "Payment Currency"], currency_dict)

        payment_format = get_dict_value(raw_data[i, "Payment Format"], payment_format_dict)

        # Senders and receivers share one account code table.
        from_acc_id_str = raw_data[i, "From Bank"] + raw_data[i, 2]
        from_id = get_dict_value(from_acc_id_str, account_dict)

        to_acc_id_str = raw_data[i, "To Bank"] + raw_data[i, 4]
        to_id = get_dict_value(to_acc_id_str, account_dict)

        amount_received = float(raw_data[i, "Amount Received"])
        amount_paid = float(raw_data[i, "Amount Paid"])

        is_laundering = int(raw_data[i, "Is Laundering"])

        data.append([i, from_id, to_id, timestamp, amount_paid, payment_currency, amount_received,
                     receiving_currency, payment_format, is_laundering])

    # Build the frames once, after the loop: rebuilding them for every row was
    # quadratic in the number of rows and left ``ddf`` undefined for empty input.
    pandas_df = pd.DataFrame(data, columns=columns)
    ddf = dd.from_pandas(pandas_df, npartitions=2)

    return ddf, first_timestamp, currency_dict, payment_format_dict, bank_account_dict, account_dict


In [None]:
def add_edges_to_graph(G, ddf):
    """Add one edge per transaction row of ``ddf`` to the graph ``G``.

    Parameters
    ----------
    G
        Graph object exposing ``add_edge(u, v, **attrs)``; it is mutated in place.
    ddf
        Dask DataFrame with the columns ``From_ID``, ``To_ID``, ``Timestamp``,
        ``Amount_Paid``, ``Amount_Received``, ``Receiving_Currency`` and
        ``Payment_Format``.

    Returns
    -------
    The same ``ddf`` that was passed in, unchanged.

    Notes
    -----
    The partitions are materialised one at a time in the calling process
    instead of going through ``map_partitions``: without an explicit ``meta``
    Dask may call the mapped function on dummy data to infer the output
    schema, which would insert a fake edge into ``G``.

    On a graph type that keeps a single edge per node pair, a later row for
    the same ``(From_ID, To_ID)`` replaces the attributes of the earlier one.
    """
    edge_columns = ['From_ID', 'To_ID', 'Timestamp', 'Amount_Paid',
                    'Amount_Received', 'Receiving_Currency', 'Payment_Format']

    for delayed_partition in ddf.to_delayed():
        partition = delayed_partition.compute()
        # itertuples keeps each column's own dtype, so integer node ids stay
        # integers (iterrows would upcast the whole row to float).
        for (from_id, to_id, timestamp, amount_paid, amount_received,
             receiving_currency, payment_format) in partition[edge_columns].itertuples(index=False, name=None):
            G.add_edge(from_id, to_id,
                       timestamp=timestamp,
                       amount_sent=amount_paid,
                       amount_received=amount_received,
                       received_currency=receiving_currency,
                       payment_format=payment_format)

    return ddf

In [None]:
import networkx as nx
import pandas as pd
import dask.dataframe as dd
import numpy as np

def create_graph(ddf):
    """Create an empty ``nx.DiGraph`` and fill it with the transactions in ``ddf``.

    Returns a ``(graph, ddf)`` pair: the populated graph and whatever
    ``add_edges_to_graph`` returns for ``ddf``.
    """
    transaction_graph = nx.DiGraph()
    return transaction_graph, add_edges_to_graph(transaction_graph, ddf)

In [None]:
import networkx as nx
import pandas as pd
import dask.dataframe as dd
import numpy as np

def extract_features(node, graph=None):
    """Compute structural features of ``node``.

    Parameters
    ----------
    node
        Node identifier; it must already be present in the graph.
    graph
        Graph to inspect.  Defaults to the notebook-level graph ``G`` so that
        existing one-argument calls keep working.

    Returns
    -------
    dict
        Keys ``degree``, ``in_degree``, ``out_degree``,
        ``clustering_coefficient`` and ``degree_centrality``.
    """
    if graph is None:
        graph = G

    degree = graph.degree[node]

    # Degree centrality of a single node is degree / (n - 1).  Computing it
    # directly avoids rebuilding the centrality of *every* node on each call,
    # which made extracting features for all nodes quadratic.  The expression
    # mirrors the one used by nx.degree_centrality, including its convention
    # of returning 1 when the graph has at most one node.
    node_count = len(graph)
    if node_count > 1:
        degree_centrality = degree * (1.0 / (node_count - 1.0))
    else:
        degree_centrality = 1

    return {
        'degree': degree,
        'in_degree': graph.in_degree[node],
        'out_degree': graph.out_degree[node],
        'clustering_coefficient': nx.clustering(graph, node),
        'degree_centrality': degree_centrality,
    }


In [None]:
def merge_trans_with_gf(transactions_ddf, graph_ddf):
    """Attach sender and receiver graph features to every transaction.

    Parameters
    ----------
    transactions_ddf
        Dask DataFrame of transactions with ``From_ID`` and ``To_ID`` columns.
    graph_ddf
        Per-node feature table (Dask or pandas DataFrame) with the columns
        ``degree``, ``in_degree``, ``out_degree``, ``clustering_coefficient``
        and ``degree_centrality``.  The node id is taken from a ``Node``
        column when present, otherwise from the index.

    Returns
    -------
    The result of ``transactions_ddf.map_partitions(...)``: the transactions
    with ten extra columns, ``from_*`` looked up through ``From_ID`` and
    ``to_*`` looked up through ``To_ID``.  Placeholder columns of the same
    names that already exist are replaced.  Transactions whose node has no
    feature row get missing values.  The input partitions are not mutated.
    """
    # Feature column -> suffix used in the output column names.
    feature_suffixes = {
        'degree': 'degree',
        'in_degree': 'in_degree',
        'out_degree': 'out_degree',
        'clustering_coefficient': 'clustering_coeff',
        'degree_centrality': 'degree_centrality',
    }

    # Materialise the per-node table once so every partition can join against it.
    features = graph_ddf.compute() if hasattr(graph_ddf, 'compute') else graph_ddf
    if 'Node' in features.columns:
        features = features.set_index('Node')
    features = features[list(feature_suffixes)]
    # A join needs unique keys; keep the last row per node, as a dict built
    # from (node, row) pairs would.
    features = features[~features.index.duplicated(keep='last')]

    # One renamed copy of the feature table per transaction endpoint.
    side_tables = {
        key_column: features.rename(
            columns={name: f'{prefix}_{suffix}' for name, suffix in feature_suffixes.items()}
        )
        for prefix, key_column in (('from', 'From_ID'), ('to', 'To_ID'))
    }
    output_columns = [column for table in side_tables.values() for column in table.columns]

    def merge_partition(partition):
        """Left-join the sender and receiver features onto one partition."""
        merged = partition.drop(columns=output_columns, errors='ignore')
        for key_column, side_table in side_tables.items():
            merged = merged.join(side_table, on=key_column)
        return merged

    # Apply the function to each partition
    return transactions_ddf.map_partitions(merge_partition)

# read data and train test split

In [None]:
# Load the transactions CSV with every column read as a string (str32).
input_file = "HI-Small_Trans.csv"
raw_data = dt.fread(input_file, columns=dt.str32, fill=True)

# scikit-learn splits pandas objects, so go through a pandas DataFrame.
raw_data_df = raw_data.to_pandas()
print(raw_data_df.head())

# Stratified 80/20 split on the label so both parts keep the same laundering rate.
# NOTE(review): the split shuffles rows, so the first row of train_dt is not
# necessarily the earliest transaction, while initial_preprocessing anchors
# first_timestamp on the first row it sees -- relative timestamps may be negative.
train_df, test_df = train_test_split(
    raw_data_df,
    test_size=0.2,
    random_state=42,
    stratify=raw_data_df['Is Laundering'],
)

# Back to datatable Frames, the input type used by initial_preprocessing.
train_dt, test_dt = (dt.Frame(split_df) for split_df in (train_df, test_df))

# train set prep

In [None]:
# Encode the training split.  first_timestamp=-1 asks initial_preprocessing to
# derive the time origin from the first training row.  The target list is
# parenthesised so the assignment is a single statement (a line starting with
# "=" is a SyntaxError).
(
    initial_preprocessed_ddf,
    first_timestamp,
    currency_dict,
    payment_format_dict,
    bank_account_dict,
    account_dict,
) = initial_preprocessing(train_dt, first_timestamp=-1)
initial_preprocessed_ddf.head()

In [None]:
# Build the directed transaction graph G from the encoded training rows;
# create_graph also hands the transactions DataFrame back as its second value.
G, train_graph_ddf = create_graph(initial_preprocessed_ddf)
train_graph_ddf.head()

In [None]:
# Report the size of the training graph.
print(f"Number of nodes: {G.number_of_nodes()}")
print(f"Number of edges: {G.number_of_edges()}")

In [None]:
# Every account id that appears as sender or receiver in the training transactions.
unique_nodes = sorted(
    set(train_graph_ddf['From_ID'].unique().compute())
    | set(train_graph_ddf['To_ID'].unique().compute())
)

# TODO: append to unique_nodes whenever new accounts from the test set come up
unique_nodes_dd = dd.from_pandas(pd.DataFrame(unique_nodes, columns=['Node']), npartitions=2)

# Output schema of the per-node feature table.  'Node' is kept so the features
# can later be joined back onto the transactions.
graph_features_meta = {
    'Node': 'int64',
    'degree': 'float64',
    'in_degree': 'float64',
    'out_degree': 'float64',
    'clustering_coefficient': 'float64',
    'degree_centrality': 'float64',
}

# Apply extract_features to each unique node.  Each partition is turned into a
# real DataFrame with one column per feature; DataFrame.apply(axis=1) with a
# dict-returning function would produce a single column of dicts instead,
# contradicting the declared meta.
graph_features = unique_nodes_dd.map_partitions(
    lambda df: pd.DataFrame(
        [{'Node': node, **extract_features(node)} for node in df['Node']],
        index=df.index,
        columns=list(graph_features_meta),
    ).astype(graph_features_meta),
    meta=graph_features_meta,
)

# Persist the result in memory
graph_features = graph_features.persist()

# Display the first few rows of the resulting Dask DataFrame
print(graph_features.head())

In [None]:
# Add empty placeholder columns for the sender/receiver graph features to the
# training transactions.
for placeholder_column in (
    'from_degree',
    'from_in_degree',
    'from_out_degree',
    'from_clustering_coeff',
    'from_degree_centrality',
    'to_degree',
    'to_in_degree',
    'to_out_degree',
    'to_clustering_coeff',
    'to_degree_centrality',
):
    train_graph_ddf[placeholder_column] = None
    

In [None]:
# Preview the training transactions, including the placeholder feature columns.
train_graph_ddf.head()

In [None]:
# Attach the sender/receiver graph features to every training transaction.
preprocessed_train_df = merge_trans_with_gf(train_graph_ddf, graph_features)
# TODO: normalize the dataset, then train

# test set prep

In [None]:
import datatable as dt
import numpy as np
from datetime import datetime
from datatable import f, join, sort
import pandas as pd
import dask.dataframe as dd
import sys
import os
from sklearn.model_selection import train_test_split

def initial_preprocessing(raw_data, first_timestamp, currency_dict=None, payment_format_dict=None,
                          bank_account_dict=None, account_dict=None):
    """Encode raw transaction rows as numbers, reusing existing code mappings.

    Every row of ``raw_data`` is read cell by cell: the ``Timestamp`` string is
    parsed with the format ``%Y/%m/%d %H:%M`` and made relative to
    ``first_timestamp``; currencies, payment formats and accounts are replaced
    by integer codes; amounts are converted to ``float`` and the label to
    ``int``.  Account keys are the ``From Bank`` / ``To Bank`` value
    concatenated with the cell at column position 2 / 4 (presumably the
    sender / receiver account columns -- confirm against the CSV header).

    Parameters
    ----------
    raw_data
        Frame exposing ``.nrows`` and ``raw_data[row, column]`` access whose
        cells are strings.
    first_timestamp
        Offset in seconds subtracted from every parsed timestamp.  Pass ``-1``
        to derive it from the first row: midnight of that row's date minus
        10 seconds.
    currency_dict, payment_format_dict, bank_account_dict, account_dict
        ``name -> code`` mappings to extend in place; pass the ones returned
        for the training split so that both splits share the same codes.
        Each defaults to a fresh empty mapping when omitted, so a
        two-argument call is also valid.  ``bank_account_dict`` is never
        written by this function; it is only passed through.

    Returns
    -------
    tuple
        ``(ddf, first_timestamp, currency_dict, payment_format_dict,
        bank_account_dict, account_dict)`` where ``ddf`` is a Dask DataFrame
        with 2 partitions.
    """
    currency_dict = {} if currency_dict is None else currency_dict
    payment_format_dict = {} if payment_format_dict is None else payment_format_dict
    bank_account_dict = {} if bank_account_dict is None else bank_account_dict
    account_dict = {} if account_dict is None else account_dict

    columns = ['Index', 'From_ID', 'To_ID', 'Timestamp', 'Amount_Paid', 'Payment_Currency',
               'Amount_Received', 'Receiving_Currency', 'Payment_Format', 'Is_Laundering']

    def get_dict_value(name, collection):
        """Return the code stored for ``name``, assigning the next free code if it is new."""
        return collection.setdefault(name, len(collection))

    data = []
    for i in range(raw_data.nrows):
        datetime_object = datetime.strptime(raw_data[i, "Timestamp"], '%Y/%m/%d %H:%M')
        timestamp = datetime_object.timestamp()

        if first_timestamp == -1:
            # Anchor time at midnight of the first processed row's date, minus 10 seconds.
            start_time = datetime(datetime_object.year, datetime_object.month, datetime_object.day)
            first_timestamp = start_time.timestamp() - 10

        timestamp = timestamp - first_timestamp

        # Receiving and payment currencies share one code table.
        receiving_currency = get_dict_value(raw_data[i, "Receiving Currency"], currency_dict)
        payment_currency = get_dict_value(raw_data[i, "Payment Currency"], currency_dict)

        payment_format = get_dict_value(raw_data[i, "Payment Format"], payment_format_dict)

        # Senders and receivers share one account code table.
        from_acc_id_str = raw_data[i, "From Bank"] + raw_data[i, 2]
        from_id = get_dict_value(from_acc_id_str, account_dict)

        to_acc_id_str = raw_data[i, "To Bank"] + raw_data[i, 4]
        to_id = get_dict_value(to_acc_id_str, account_dict)

        amount_received = float(raw_data[i, "Amount Received"])
        amount_paid = float(raw_data[i, "Amount Paid"])

        is_laundering = int(raw_data[i, "Is Laundering"])

        data.append([i, from_id, to_id, timestamp, amount_paid, payment_currency, amount_received,
                     receiving_currency, payment_format, is_laundering])

    # Build the frames once, after the loop: rebuilding them for every row was
    # quadratic in the number of rows and left ``ddf`` undefined for empty input.
    pandas_df = pd.DataFrame(data, columns=columns)
    ddf = dd.from_pandas(pandas_df, npartitions=2)

    return ddf, first_timestamp, currency_dict, payment_format_dict, bank_account_dict, account_dict


In [None]:
# Encode the test split with the time origin and the code mappings produced for
# the training split, so that both splits share the same encoding.  The target
# list is parenthesised so the assignment is a single statement (a line
# starting with "=" is a SyntaxError).
(
    test_initial_preprocessed_ddf,
    first_timestamp,
    currency_dict,
    payment_format_dict,
    bank_account_dict,
    account_dict,
) = initial_preprocessing(
    test_dt,
    first_timestamp,
    currency_dict,
    payment_format_dict,
    bank_account_dict,
    account_dict,
)
test_initial_preprocessed_ddf.head()

In [None]:
# Add the test transactions to the existing graph G.  The helper defined in this
# notebook is add_edges_to_graph; "add_edge_to_graph" does not exist.
test_graph_ddf = add_edges_to_graph(G, test_initial_preprocessed_ddf)

In [None]:
# Every account id that appears as sender or receiver in the test transactions.
unique_nodes_test = sorted(
    set(test_graph_ddf['From_ID'].unique().compute())
    | set(test_graph_ddf['To_ID'].unique().compute())
)

# Build the node table from the *test* nodes (the training list unique_nodes was
# used here before, which left unique_nodes_test unused).
unique_nodes_dd = dd.from_pandas(pd.DataFrame(unique_nodes_test, columns=['Node']), npartitions=2)

# Output schema of the per-node feature table.  'Node' is kept so the features
# can later be joined back onto the transactions.
graph_features_meta = {
    'Node': 'int64',
    'degree': 'float64',
    'in_degree': 'float64',
    'out_degree': 'float64',
    'clustering_coefficient': 'float64',
    'degree_centrality': 'float64',
}

# Apply extract_features to each unique node.  Each partition is turned into a
# real DataFrame with one column per feature; DataFrame.apply(axis=1) with a
# dict-returning function would produce a single column of dicts instead,
# contradicting the declared meta.
graph_features = unique_nodes_dd.map_partitions(
    lambda df: pd.DataFrame(
        [{'Node': node, **extract_features(node)} for node in df['Node']],
        index=df.index,
        columns=list(graph_features_meta),
    ).astype(graph_features_meta),
    meta=graph_features_meta,
)

# Persist the result in memory
graph_features = graph_features.persist()

# Display the first few rows of the resulting Dask DataFrame
print(graph_features.head())

In [None]:
# Add empty placeholder columns for the sender/receiver graph features to the
# test transactions.
for placeholder_column in (
    'from_degree',
    'from_in_degree',
    'from_out_degree',
    'from_clustering_coeff',
    'from_degree_centrality',
    'to_degree',
    'to_in_degree',
    'to_out_degree',
    'to_clustering_coeff',
    'to_degree_centrality',
):
    test_graph_ddf[placeholder_column] = None

In [None]:
# Attach the sender/receiver graph features to every test transaction.
preprocessed_test_df = merge_trans_with_gf(test_graph_ddf, graph_features)

In [None]:
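# TODO -- planned final training step (pseudocode, not runnable yet):
#   1. train = train + test, without the graph features
#   2. graph_features = extract_gf(all nodes from train)
#   3. append the graph features to train
#   4. model.fit(train)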