# Anti-Money Laundering

A brain dump of anti-money-laundering (AML) code snippets for the Data Science capstone project.


## Setup

Captures the set of Python imports the Notebook requires, as well as any constants defined for the analysis.

In [None]:
import calendar
import hashlib
import sys
import os
import random
from copy import deepcopy
from datetime import datetime
from pprint import pprint
from time import monotonic
from typing import Union

import ipywidgets as widgets
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from google.colab import drive
from IPython import get_ipython
from IPython.display import display

content_base = "/content/drive"
data_dir = os.path.join(content_base, "My Drive/capstone/data")
dataset = "LI-Small_Trans.csv"
data_file = os.path.join(data_dir, dataset)

# Some portions of the analysis are skipped because they are costly to run
# or only needed to be executed once
check_dataset_uniqueness = False
create_unique_identifiers = False
compare_dataset_conversion_rates = False
get_new_usd_conversion_rates = True
plot_exchange_rate_time_series = True
plot_exchange_rate_heat_map = True

### Notebook Stuff

Not important to the project at all, just modifying aspects of the notebook runtime for my own use. This assumes you're running the notebook in Google Colab.

In [None]:
# Google Colaboratory executes in an environment whose file system has a
# Linux-style layout, where the user is expected to work under the
# `/content` directory
COLAB_ROOT = "/content"

REPO_URL = "https://github.com/engie4800/dsi-capstone-spring-2025-TD-anti-money-laundering.git"
REPO_ROOT = os.path.join(COLAB_ROOT, REPO_URL.split("/")[-1].split(".")[0])
REPO_BRANCH = "colin"

# Clones the repository at `/content/dsi-capstone-spring-2025-TD-anti-money-laundering`
if not os.path.exists(REPO_ROOT):
  os.chdir(COLAB_ROOT)
  !git clone {REPO_URL}

# Pulls the latest code from the provided branch and adds the
# analysis pipeline source code to the Python system path
os.chdir(REPO_ROOT)
!git pull
!git checkout {REPO_BRANCH}
sys.path.append(os.path.join(REPO_ROOT, "Code/src"))
os.chdir(COLAB_ROOT)

In [None]:
from helpers import add_cell_timer

add_cell_timer()

### Load Data

In [None]:
drive.mount(content_base)
files = os.listdir(data_dir)
print("\nData files available:")
pprint(files)

In [None]:
df = pd.read_csv(data_file)

## Data Overview

Explore aspects of the data without applying any transformations or doing any feature engineering.

### Features

The selected data set has the following features.

In [None]:
df.dtypes

Rename the features to be more explicit with the hope of avoiding common mistakes, e.g. mistaking `Account` and `Account.1`. The new names use snake case because we're in Python.

In [None]:
df.rename(
    columns={
        "Timestamp": "timestamp",
        "From Bank": "from_bank",
        "Account": "from_account",
        "To Bank": "to_bank",
        "Account.1": "to_account",
        "Amount Received": "received_amount",
        "Receiving Currency": "received_currency",
        "Amount Paid": "sent_amount",
        "Payment Currency": "sent_currency",
        "Payment Format": "payment_type",
        "Is Laundering": "is_laundering",
    },
    inplace=True,
)

### Data Description

Provides a general overview of the data.

In [None]:
df.select_dtypes(include=["number"]).describe()

In [None]:
df.select_dtypes(include=["object", "category"]).drop(columns="timestamp").describe()

In [None]:
df.head()

### Missing Values

Determines whether there are missing values. The initial exploration of the data found none, so the following cell causes the notebook to fail if null values are present, since they would violate an assumption made by subsequent steps in the analysis.

In [None]:
# Wrapped in `display` because only a cell's final expression is rendered
display(df.isnull().sum())

if df.isnull().values.any():
    raise ValueError(
        "Initial analysis showed that there were no null values in the data "
        "set, and the subsequent work was done under this assumption. "
        "However, null values were detected. Does the dataset now need to be "
        "cleaned prior to analysis?"
    )

## Transaction-Level Transform

Some of the columns we will add to the dataset are simple transformations of values within each row. Because they never look at other rows, they can be added prior to any train-test split. It might be preferable to add them early, so they can be included in some of the exploratory data analysis below.

### Day of Week, Hour of Day, Month

Convert the timestamp into day of week, hour of day, and month categorical variables.

In [None]:
df['datetime'] = pd.to_datetime(df['timestamp'], format='%Y/%m/%d %H:%M')

df['day_of_week'] = df['datetime'].dt.day_name()
df['hour_of_day'] = df['datetime'].dt.hour.astype(str)
df['month'] = df['datetime'].dt.month_name()

df[[
    "timestamp",
    "day_of_week",
    "hour_of_day",
    "month",
]].head()

## Data Imbalance

Consider the shape of the data, and explore how the base features are balanced.

### Laundering Rate

Most anti-money-laundering datasets have an imbalance between the number of licit and illicit transactions. For imbalanced datasets, care needs to be taken in the metric chosen to measure model performance or experimental results.
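
To see why the choice of metric matters, the sketch below scores a hypothetical classifier that labels every transaction as licit. The labels and the 0.1% laundering rate are made up for illustration and are not taken from the AMLworld data.

```python
# Hypothetical labels: 1 = laundering, 0 = licit (0.1% illicit, illustrative)
y_true = [1] * 10 + [0] * 9990

# A trivial "model" that predicts licit for every transaction
y_pred = [0] * len(y_true)

true_positives = sum(1 for t, p in zip(y_true, y_pred) if t == 1 and p == 1)
false_negatives = sum(1 for t, p in zip(y_true, y_pred) if t == 1 and p == 0)
correct = sum(1 for t, p in zip(y_true, y_pred) if t == p)

# Accuracy rewards the majority class; recall only looks at launderers
accuracy = correct / len(y_true)
recall = true_positives / (true_positives + false_negatives)

print(f"Accuracy: {accuracy:.3%}")
print(f"Recall on laundering: {recall:.1%}")
```

The trivial model is almost always "right" yet never flags a single laundering transaction, which is why recall, precision, or precision-recall curves tend to be more informative here than accuracy.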

In [None]:
print(f'Laundering rate: {round(100*(df["is_laundering"].sum() / len(df["is_laundering"])), 3)}%')

### Categorical Imbalance

The balance in categorical features can be handled directly. Numerical features need to be binned prior to demonstrating imbalance.
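
As a rough illustration of the binning and per-class normalization idea, here is a minimal standard-library sketch. The transactions, bin edges, and helper names (`bin_label`, `class_proportions`) are hypothetical; the notebook itself uses `pd.cut` for the same purpose.

```python
from bisect import bisect_left
from collections import Counter

# Hypothetical (amount, is_laundering) pairs, illustrative only
transactions = [
    (5.0, 0), (50.0, 0), (75.0, 0), (500.0, 0),
    (50.0, 1), (5000.0, 1),
]

# Right-closed bins: (0, 10], (10, 100], (100, 1000], (1000, 10000], rest
edges = [10, 100, 1000, 10000]
labels = ["0 - 10", "10 - 100", "100 - 1000", "1000 - 10000", "10000 - ∞"]


def bin_label(amount: float) -> str:
    # bisect_left finds the first edge >= amount, i.e. a right-closed bin
    return labels[bisect_left(edges, amount)]


def class_proportions(is_laundering: int) -> dict[str, float]:
    # Percentage of one class's transactions that fall into each bin
    counts = Counter(
        bin_label(amount) for amount, y in transactions if y == is_laundering
    )
    total = sum(counts.values())
    return {label: 100 * counts[label] / total for label in labels}


licit = class_proportions(0)
illicit = class_proportions(1)

# Share of each stacked bar attributed to the illicit class; empty bins
# are skipped to avoid dividing by zero
illicit_share = {
    label: 100 * illicit[label] / (licit[label] + illicit[label])
    for label in labels
    if licit[label] + illicit[label] > 0
}
print(illicit_share)
```

Because proportions are computed within each class first, a bar that sits at 50% means the bin is equally common among licit and illicit transactions, regardless of how heavily licit transactions outnumber illicit ones overall.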

In [None]:
def plot_column_imbalance(
    df: pd.DataFrame,
    column: str,
    label: str,
    bins: list[Union[int, float]]=[0, 10, 100, 1000, 10000, np.inf],
) -> None:
    if pd.api.types.is_numeric_dtype(df[column]):
        # Custom bins for numerical data
        df["binned"] = pd.cut(
            df[column],
            bins=bins,
            include_lowest=True,
        )
        bin_labels = {
            interval: f"{int(interval.left)} - {int(interval.right) if interval.right != np.inf else '∞'}"
            for interval in df["binned"].cat.categories
        }

        df["binned"] = df["binned"].map(bin_labels)
        all_types = sorted(
            bin_labels.values(),
            key=lambda x: int(x.split(" - ")[0]),
        )
        data_column = "binned"
    else:
        if column == "hour_of_day":
            all_types = sorted(df[column].unique(), key=int)
        else:
            all_types = sorted(df[column].unique())
        data_column = column

    df_licit = df[df["is_laundering"] == 0]
    proportion_licit = df_licit[data_column].value_counts(normalize=True) * 100
    proportion_licit = proportion_licit.reindex(all_types, fill_value=0)

    df_illicit = df[df["is_laundering"] == 1]
    proportion_illicit = df_illicit[data_column].value_counts(normalize=True) * 100
    proportion_illicit = proportion_illicit.reindex(all_types, fill_value=0)

    total_proportion = proportion_licit + proportion_illicit
    licit_normalized = (proportion_licit / total_proportion) * 100
    illicit_normalized = (proportion_illicit / total_proportion) * 100

    fig, ax = plt.subplots(figsize=(7, 4))
    y_pos = np.arange(len(all_types))

    ax.barh(y_pos, licit_normalized, color="#76c7c0", label="Licit")
    ax.barh(y_pos, illicit_normalized, left=licit_normalized, color="#f4a261", label="Illicit")
    ax.axvline(50, linestyle="--", color="gray", linewidth=1)

    ax.set_yticks(y_pos)
    ax.set_yticklabels(all_types)
    ax.set_xlabel("Proportion")
    ax.set_title(f"{label}, Licit vs. Illicit")
    ax.legend(loc="upper left", bbox_to_anchor=(1, 1))
    plt.gca().invert_yaxis()
    plt.show()


column_mapping_imbalance = {
    "Payment Type": "payment_type",
    "Sent Currency": "sent_currency",
    "Received Currency": "received_currency",
    "Sent Amount": "sent_amount",
    "Received Amount": "received_amount",
    "Day of Week": "day_of_week",
    "Hour of Day": "hour_of_day",
    "Month": "month",
}
dropdown_imbalance = widgets.Dropdown(
    options=column_mapping_imbalance.keys(),
    description="Column:",
    style={"description_width": "initial"},
)

def update_plot(column_label: str) -> None:
    column = column_mapping_imbalance[column_label]
    plot_column_imbalance(df, column, column_label)

widgets.interactive(update_plot, column_label=dropdown_imbalance)

## Bank+Account Uniqueness

Are account numbers unique between the datasets compared in the following subsection? Given that they aren't, provide a method to make them unique.

### Inter-dataset Uniqueness

The goal is to determine whether the AMLworld synthetic data uses account numbers that are unique to each dataset, which, if so, might give us the opportunity to train on one dataset and test on another. The data files are streamed line by line rather than loaded into a data frame, because the larger files do not fit into memory in many computation environments. Even so, this takes a long time.
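
The streaming idea can be sketched with Python sets, which is a simplification of the dictionary-based bookkeeping used in the cell that follows. The in-memory `io.StringIO` objects and their rows are hypothetical stand-ins for the dataset files, and `stream_accounts` is an illustrative helper, not part of the project code.

```python
import csv
import io


def stream_accounts(file) -> set[str]:
    """Collect bank_account keys from a CSV stream, one row at a time."""
    reader = csv.reader(file)
    next(reader)  # Skip the header row
    accounts = set()
    for row in reader:
        accounts.add(f"{row[1]}_{row[2]}")  # From Bank, Account
        accounts.add(f"{row[3]}_{row[4]}")  # To Bank, Account
    return accounts


# Hypothetical stand-ins for two dataset files; with real data, pass
# open(path, "r", encoding="utf-8", newline="") instead
header = "Timestamp,From Bank,Account,To Bank,Account\n"
dataset_a = io.StringIO(header + "2022/09/01 00:08,11,A1,22,B2\n")
dataset_b = io.StringIO(header + "2022/09/01 00:09,11,A1,33,C3\n")

accounts_a = stream_accounts(dataset_a)
accounts_b = stream_accounts(dataset_b)

# Keys that appear in both datasets are not unique between them
shared = accounts_a & accounts_b
print(f"Shared bank_account keys: {sorted(shared)}")
```

Only the set of identifiers is held in memory, never the full rows, which is what keeps the approach feasible for the larger files.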

In [None]:
def check_pairwise_dataset_uniqueness(
    dataset_a: str,
    dataset_b: str,
) -> None:
    hash_map = {}
    hash_map_aggregate = {}
    poor_account_uniqueness = "Account Uniqueness header mismatch"
    for i, dataset in enumerate([dataset_a, dataset_b]):
        print(f"Hashing: {dataset}")
        with open(
            os.path.join(data_dir, dataset), "r", encoding="utf-8",
        ) as file:
            header = True
            for line in file:
                columns = line.strip().split(",")

                # Checks that each data set is formatted with the account data
                # in the same location
                if header:
                    header = False
                    if (
                        columns[1] != "From Bank" or
                        columns[2] != "Account" or
                        columns[3] != "To Bank" or
                        columns[4] != "Account"
                    ):
                        raise ValueError(poor_account_uniqueness)
                    continue

                # Hash on both the from and the to account, keeping track of an
                # enumerated dataset
                for account in [columns[2], columns[4]]:
                    if account not in hash_map:
                        hash_map[account] = [i]
                    elif i not in hash_map[account]:
                        hash_map[account].append(i)

                # Hash on a combination of bank and account, for both the from
                # and to account/bank
                for bank_account in [
                    f"{columns[1]}_{columns[2]}",
                    f"{columns[3]}_{columns[4]}",
                ]:
                    if bank_account not in hash_map_aggregate:
                        hash_map_aggregate[bank_account] = [i]
                    elif i not in hash_map_aggregate[bank_account]:
                        hash_map_aggregate[bank_account].append(i)

    # Checks for duplicate accounts
    count = 0
    for account, datasets in hash_map.items():
        if len(datasets) > 1:
            count += 1
    n = len(hash_map)
    print(f"Hash map by account: {n}, duplicate accounts: {count}")
    print(f"Uniqueness by account: {round(100*(n-count)/n, 3)}%")

    # Checks for duplicate account, bank pairs
    count = 0
    for account, datasets in hash_map_aggregate.items():
        if len(datasets) > 1:
            count += 1
    n = len(hash_map_aggregate)
    print(f"Hash map by bank_account: {n}, duplicates: {count}")
    print(f"Uniqueness by bank_account: {round(100*(n-count)/n, 3)}%")

if check_dataset_uniqueness:
    check_pairwise_dataset_uniqueness(
        "LI-Medium_Trans.csv",
        "LI-Small_Trans.csv",
    )
    print("")
    check_pairwise_dataset_uniqueness(
        "HI-Medium_Trans.csv",
        "LI-Medium_Trans.csv",
    )
    print("")
    check_pairwise_dataset_uniqueness(
        "LI-Large_Trans.csv",
        "LI-Medium_Trans.csv",
    )
else:
    print("Skipped potentially lengthy uniqueness check.")

    # Keeping a snapshot of a previous analysis
    print("Data from a previous run:")
    print("""
Hashing: LI-Medium_Trans.csv
Hashing: LI-Small_Trans.csv
Hash map by account: 2721565, duplicate accounts: 16399
Uniqueness by account: 99.397%
Hash map by bank_account: 2737985, duplicates: 17
Uniqueness by bank_account: 99.999%

Hashing: HI-Medium_Trans.csv
Hashing: LI-Medium_Trans.csv
Hash map by account: 4047087, duplicate accounts: 61973
Uniqueness by account: 98.469%
Hash map by bank_account: 4094704, duplicates: 14414
Uniqueness by bank_account: 99.648%

Hashing: LI-Large_Trans.csv
Hashing: LI-Medium_Trans.csv
Hash map by account: 2054565, duplicate accounts: 2031886
Uniqueness by account: 1.104%
Hash map by bank_account: 2071157, duplicates: 2031918
Uniqueness by bank_account: 1.895%
    """)

### Create Uniqueness

For the given data set name, and for each bank and account number, create a unique identifier. This will ensure that if models are trained on one dataset, they can be transferred to or tested on other datasets without worrying that the model learned identifiers that happen to be non-distinct between the AMLworld datasets.

If the following is used to create unique entity identifiers between datasets, it needs to be applied to two datasets and tested (applying it to one of the larger datasets will be computationally intensive).
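
A small check along these lines might look like the sketch below. It mirrors the truncated SHA-256 approach of the next cell, but `short_hash`, `unique_id`, and the bank and account values are hypothetical names and data chosen for illustration.

```python
import hashlib


def short_hash(value: str, length: int = 8) -> str:
    # Truncated SHA-256 digest; shorter lengths raise the collision risk
    return hashlib.sha256(value.encode()).hexdigest()[:length]


def unique_id(dataset_name: str, bank: str, account: str) -> str:
    # Dataset, bank, and account are hashed separately, then joined
    return "_".join(
        short_hash(part) for part in (dataset_name, bank, account)
    )


# The same bank and account number appearing in two datasets (made up)
id_small = unique_id("LI-Small_Trans.csv", "11", "8000ECA90")
id_medium = unique_id("LI-Medium_Trans.csv", "11", "8000ECA90")

print(id_small)
print(id_medium)
```

The two identifiers share their bank and account segments but differ in the dataset prefix, so the same account number in two datasets no longer maps to the same entity.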

In [None]:
def h(value: str, length=8):
    return hashlib.sha256(value.encode()).hexdigest()[:length]

def generate_unique_identifiers(dataset_name, df):
    d = h(dataset_name)

    df["from_bank_hash"] = df["from_bank"].astype(str).map(h)
    df["from_account_hash"] = df["from_account"].astype(str).map(h)
    df["to_bank_hash"] = df["to_bank"].astype(str).map(h)
    df["to_account_hash"] = df["to_account"].astype(str).map(h)

    df["from_unique"] = d + "_" + df["from_bank_hash"] + "_" + df["from_account_hash"]
    df["to_unique"] = d + "_" + df["to_bank_hash"] + "_" + df["to_account_hash"]

    # Drop intermediate hash columns
    df.drop(
        columns=[
            "from_bank_hash",
            "from_account_hash",
            "to_bank_hash",
            "to_account_hash",
        ],
        inplace=True,
    )

    return df

if create_unique_identifiers:
    dataset_name = data_file.split("/")[-1]
    df = generate_unique_identifiers(dataset_name, df)
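    # Wrapped in `display` because expressions inside an `if` block are not
    # rendered automatically
    display(df[[
        "from_bank",
        "from_account",
        "from_unique",
        "to_bank",
        "to_account",
        "to_unique",
    ]].head())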
else:
    print("Skipped creating unique identifiers.")

## Customer Data

The dataset consists of a series of timestamped transactions, each with a sending and a receiving customer. This section explores the length (in time) of the dataset and the distribution of data available per customer.

### Currency Conversion

The given transaction dataset contains different currencies. To convert between these currencies, e.g. to convert every transaction into U.S. dollars, the currency conversion rates need to be derived from the dataset (live currency conversions will not apply to the dataset).
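
As a minimal sketch of the idea, a cross-currency transaction implies an exchange rate: the amount received divided by the amount paid. The transactions and rates below are made up, and `to_usd` is a hypothetical helper.

```python
# Hypothetical cross-currency transactions: (amount_paid, payment_currency,
# amount_received, receiving_currency); the values are made up
transactions = [
    (100.0, "US Dollar", 85.0, "Euro"),
    (200.0, "US Dollar", 21000.0, "Yen"),
]

# Units of each currency per one U.S. dollar, implied by the transactions
usd_rates = {"US Dollar": 1.0}
for paid, paid_currency, received, received_currency in transactions:
    if paid_currency == "US Dollar" and paid > 0:
        # Keep the first rate seen for each currency
        usd_rates.setdefault(received_currency, received / paid)


def to_usd(amount: float, currency: str) -> float:
    # Dividing by "units per dollar" converts an amount back into dollars
    return amount / usd_rates[currency]


print(usd_rates)
print(to_usd(170.0, "Euro"))
```

Keeping only the first observed rate treats the exchange rate as constant over the dataset, which is an approximation.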

In [None]:
def extract_usd_conversion(
    currency_conversion: dict[str, dict[str, float]],
) -> dict[str, float]:
    """
    Given the result of `get_usd_conversion`, that is a dictionary that maps
    each currency to its currency conversion rates, where each set of
    conversion rates may be incomplete, attempt to return a complete set
    of currency conversion rates for U.S. dollars
    """
    raise NotImplementedError(
        "This 'extract_usd_conversion' has not been implemented, because the "
        "data from the small transaction set provides a complete set of "
        "U.S. dollar conversion rates."
    )


def exchange_rate_time_series(df: pd.DataFrame, ylabel: str) -> None:
    plt.figure(figsize=(14, 6))

    i = 0
    cmap = plt.get_cmap("tab20")
    for currency in df.columns:
        color = cmap(i)
        plt.plot(
            df.index,
            df[currency],
            marker=".",
            markersize=8,
            label=currency,
            color=color,
        )
        i += 1

    plt.xlabel("Date")
    plt.ylabel(ylabel)
    plt.title("Exchange Rates to USD Per Transaction")
    plt.legend(ncol=2)
    plt.xticks(rotation=45)
    plt.grid(True)
    plt.show()


def exchange_rate_heat_map(df: pd.DataFrame) -> None:
    plt.figure(figsize=(14, 6))
    ax = sns.heatmap(
        df,
        annot=True,
        cmap="PiYG",
        fmt=".1f",
        linewidths=0.5,
        vmin=-3,
        vmax=3,
    )
    plt.xticks(rotation=45, ha="right")
    plt.xlabel("Currencies")
    plt.ylabel("Dates")
    plt.title("Exchange Rates to USD, Daily % Change")
    plt.show()


def get_usd_conversion(
    dataset_dir: str,
    plot_exchange_rate_heat_map=False,
) -> dict[str, float]:
    """
    Given the path to a dataset file, returns a currency conversion dictionary
    that maps each currency to its exchange rate against the U.S. dollar. The
    keys of the resultant dictionary are the magic currency strings used in
    the AMLworld datasets
    """
    poor_account_uniqueness = "Currency Conversion header mismatch"

    # If we can't assume that every currency converts to every other currency
    # in the dataset, we need to create a partial conversion map for each
    # currency, and then aggregate the results into a single map for U.S.
    # dollar conversions
    currencies = set()
    currency_conversion = {}
    conversion_series = {}
    with open(dataset_dir, "r", encoding="utf-8") as file:
        header = True
        for line in file:
            columns = line.strip().split(",")

            # Checks that each data set is formatted with the expected data in
            # the same position
            if header:
                header = False
                if (
                    columns[5] != "Amount Received" or
                    columns[6] != "Receiving Currency" or
                    columns[7] != "Amount Paid" or
                    columns[8] != "Payment Currency"
                ):
                    raise ValueError(poor_account_uniqueness)
                continue

            dt = datetime.strptime(columns[0], '%Y/%m/%d %H:%M')
            date = dt.strftime("%Y-%m-%d")

            if date not in currency_conversion:
                currency_conversion[date] = {}

            sent_amount = columns[7]
            sent_currency = columns[8]
            received_amount = columns[5]
            received_currency = columns[6]

            currencies.add(sent_currency)
            currencies.add(received_currency)

            # To convert the sent currency to the received currency, multiply
            # it by this value
            conversion_rate = float(received_amount)/float(sent_amount)

            if sent_currency not in currency_conversion[date]:
                currency_conversion[date][sent_currency] = {
                    sent_currency: 1.0
                }
            if received_currency not in currency_conversion[date][sent_currency]:
                currency_conversion[date][sent_currency][
                    received_currency
                ] = conversion_rate

            # Keep track of all exchange rates
            if sent_currency not in conversion_series:
                conversion_series[sent_currency] = {}
            if received_currency not in conversion_series[sent_currency]:
                conversion_series[sent_currency][
                    received_currency
                ] = []
            conversion_series[sent_currency][received_currency].append(
                (dt, conversion_rate)
            )

    if plot_exchange_rate_time_series:
        data = []
        for currency, values in conversion_series["US Dollar"].items():
            for dt, rate in values:
                data.append((dt, currency, rate))

        dft = pd.DataFrame(data, columns=["t", "Currency", "Exchange Rate"])
        df_pivot = dft.pivot_table(
            index="t",
            columns="Currency",
            values="Exchange Rate",
            aggfunc="mean",
        )
        exchange_rate_time_series(df_pivot, ylabel="Exchange Rates")

        dft["t"] = pd.to_datetime(dft["t"])
        df_pivot = dft.pivot_table(
            index="t",
            columns="Currency",
            values="Exchange Rate",
            aggfunc="mean",
        )
        df_pivot = df_pivot[df_pivot.index <= "2022-09-10"]
        df_pivot = df_pivot - df_pivot.mean()
        exchange_rate_time_series(df_pivot, ylabel="Zero-Mean Exchange Rates")


    if plot_exchange_rate_heat_map:
        data = []
        for currency, values in conversion_series["US Dollar"].items():
            for dt, rate in values:
                data.append((dt, currency, rate))

        dft = pd.DataFrame(data, columns=["t", "Currency", "Exchange Rate"])
        dft["t"] = pd.to_datetime(dft["t"])
        dft.set_index("t", inplace=True)
        dft = dft.groupby([pd.Grouper(freq="D"), "Currency"]).mean().reset_index()
        dft["t"] = dft["t"].dt.date
        df_pivot = dft.pivot_table(
            index="t",
            columns="Currency",
            values="Exchange Rate",
            aggfunc="mean",
        )

        # Sets the order to be aesthetic (the latter few days only have US
        # Dollar and Euro exchanges), although also eurocentric
        all_currencies = list(df_pivot.columns)
        fixed_order = ["US Dollar", "Euro", "Yuan"]
        remaining_currencies = [c for c in all_currencies if c not in fixed_order]
        random.shuffle(remaining_currencies)

        sorted_columns = fixed_order + remaining_currencies
        df_pivot = df_pivot[sorted_columns]

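        # Percent change from the previous day, matching the plot title;
        # `fill_method=None` leaves missing days as NaN instead of padding
        exchange_rate_heat_map(df_pivot.pct_change(fill_method=None) * 100)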

    # See if we have a complete currency conversion map for U.S. dollars from
    # the first day, and use it as an approximation for the entire dataset
    usd_conversion = currency_conversion[
        list(currency_conversion.keys())[0]
    ]["US Dollar"]
    if set(usd_conversion.keys()) == currencies:
        return usd_conversion
    return extract_usd_conversion(currency_conversion)


def compare_usd_conversions(dataset_a: str, dataset_b: str) -> None:
    usd_conversion_a = get_usd_conversion(
        os.path.join(data_dir, dataset_a),
    )
    usd_conversion_b = get_usd_conversion(
        os.path.join(data_dir, dataset_b),
    )

    def compare_dictionaries(
        dict_a: dict[str, float], dict_b: dict[str, float],
    ) -> dict[str, str]:
        if set(dict_a.keys()) != set(dict_b.keys()):
            raise ValueError(
                "Cannot compare dictionaries with different sets of keys."
            )

        delta = {}
        for k, v in dict_a.items():
            delta[k] = f"{round(100*abs(v - dict_b[k])/v, 2)}%"

        return delta

    print(f"Percent difference between {dataset_a}, {dataset_b}:")
    pprint(compare_dictionaries(usd_conversion_a, usd_conversion_b))


if compare_dataset_conversion_rates:
    compare_usd_conversions("LI-Small_Trans.csv", "LI-Medium_Trans.csv")
    compare_usd_conversions("LI-Medium_Trans.csv", "LI-Large_Trans.csv")

if get_new_usd_conversion_rates:
    usd_conversion = get_usd_conversion(
        os.path.join(data_dir, dataset),
        plot_exchange_rate_heat_map=plot_exchange_rate_heat_map,
    )
else:
    usd_conversion = {
        "US Dollar": 1.0,
        "Euro": 0.8533787417099838,
        "Yuan": 6.697677681891531,
        "Yen": 105.3976841187823,
        "UK Pound": 0.7739872068230277,
        "Brazil Real": 5.646327447497649,
        "Australian Dollar": 1.4127728666786938,
        "Canadian Dollar": 1.319260431085624,
        "Ruble": 77.79226317392629,
        "Mexican Peso": 21.1287988422576,
        "Rupee": 73.44399970830806,
        "Swiss Franc": 0.9149993127687566,
        "Shekel": 3.3769999188170305,
        "Saudi Riyal": 3.751098012020342,
        "Bitcoin": 8.333333333333333e-05,
    }

### Extract Customer Snapshots

Creates a new dataframe where each row is a customer, and each column is an aggregate of that customer's transaction data. The goal is to train a non-graph-based classification model on customer snapshots that can detect good (licit) customers, and to filter these out prior to feeding data into the graph-based model.

This is meant to be a form of knowledge-driven undersampling, where the filter model detects licit customers (positives) while maximizing the true negative rate, so that illicit customers (negatives) make it to the graph model.
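
The trade-off can be sketched with a confidence threshold on a hypothetical filter model's output. The customers, scores, and `split_by_threshold` helper below are made up for illustration; no such model has been trained at this point in the notebook.

```python
# Hypothetical filter-model scores: (customer, P(licit), is_launderer);
# the values are made up
scored_customers = [
    ("bank1_acctA", 0.99, 0),
    ("bank1_acctB", 0.97, 0),
    ("bank2_acctC", 0.60, 0),
    ("bank2_acctD", 0.40, 1),
    ("bank3_acctE", 0.96, 1),
]


def split_by_threshold(threshold: float):
    # Customers scored at or above the threshold are treated as licit and
    # removed; everyone else is passed on to the graph-based model
    removed = [c for c in scored_customers if c[1] >= threshold]
    passed_on = [c for c in scored_customers if c[1] < threshold]
    return removed, passed_on


launderers_total = sum(c[2] for c in scored_customers)
for threshold in (0.95, 0.98):
    removed, passed_on = split_by_threshold(threshold)
    launderers_kept = sum(c[2] for c in passed_on)
    print(
        f"threshold={threshold}: removed {len(removed)} customers, "
        f"kept {launderers_kept}/{launderers_total} launderers"
    )
```

In this toy data, the stricter threshold removes fewer licit customers but keeps every launderer in the data passed to the graph model, which is the behavior the filter is meant to favor.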

In [None]:
def aggregate_customer_data(
    dataset: str,
    conversion: dict[str, float],
) -> pd.DataFrame:
    hash_map = {}
    base_customer_data = {
        "n_sent": 0,
        "total_sent": 0,
        "first_sent": float("inf"),
        "last_sent": float("-inf"),
        "sent_type_counts": dict(),
        "n_destinations": set(),
        "n_received": 0,
        "total_received": 0,
        "first_received": float("inf"),
        "last_received": float("-inf"),
        "received_type_counts": dict(),
        "n_sources": set(),
        "is_launderer": 0,
    }
    payment_type_map = {
        "Reinvestment": "reinvestment",
        "Cheque": "cheque",
        "ACH": "ach",
        "Credit Card": "credit_card",
        "Wire": "wire",
        "Cash": "cash",
        "Bitcoin": "bitcoin",
    }
    currency_map = {
        "US Dollar": "usd",
        "Euro": "euro",
        "Yuan": "yuan",
        "Yen": "yen",
        "UK Pound": "pound",
        "Brazil Real": "real",
        "Australian Dollar": "ausd",
        "Canadian Dollar": "cd",
        "Ruble": "ruble",
        "Mexican Peso": "peso",
        "Rupee": "rupee",
        "Swiss Franc": "franc",
        "Shekel": "shekel",
        "Saudi Riyal": "riyal",
        "Bitcoin": "btc",
    }
    day_names = [x.lower() for x in list(calendar.day_name)]
    hours = [f"hour_{x}" for x in range(24)]

    # Add sent and received currency and day of week columns; currency will be
    # one-hot encoded for which currencies each customer uses, day of week will
    # be the count of sent and received transactions on those days
    for prefix in ["sent", "received"]:
        for k, v in currency_map.items():
            base_customer_data[f"{prefix}_{v}"] = 0
        for day_of_week in day_names:
            base_customer_data[f"{prefix}_{day_of_week}"] = 0
        for hour in hours:
            base_customer_data[f"{prefix}_{hour}"] = 0

    poor_account_uniqueness = "Customer Aggregation header mismatch"

    with open(
        os.path.join(data_dir, dataset), "r", encoding="utf-8",
    ) as file:
        header = True
        for line in file:
            columns = line.strip().split(",")

            # Checks that each data set is formatted with the expected data in
            # the same position
            if header:
                header = False
                if (
                    columns[1] != "From Bank" or
                    columns[2] != "Account" or
                    columns[3] != "To Bank" or
                    columns[4] != "Account" or
                    columns[5] != "Amount Received" or
                    columns[6] != "Receiving Currency" or
                    columns[7] != "Amount Paid" or
                    columns[8] != "Payment Currency" or
                    columns[9] != "Payment Format" or
                    columns[10] != "Is Laundering"
                ):
                    raise ValueError(poor_account_uniqueness)
                continue

            id_from = f"{columns[1]}_{columns[2]}"
            id_to = f"{columns[3]}_{columns[4]}"

            if id_from not in hash_map:
                hash_map[id_from] = deepcopy(base_customer_data)
            if id_to not in hash_map:
                hash_map[id_to] = deepcopy(base_customer_data)

            hash_map[id_from]["n_destinations"].add(id_to)
            hash_map[id_to]["n_sources"].add(id_from)

            sent_currency = currency_map[columns[8]]
            hash_map[id_from][f"sent_{sent_currency}"] = 1
            received_currency = currency_map[columns[6]]
            # The received currency belongs to the receiving account
            hash_map[id_to][f"received_{received_currency}"] = 1

            amount_sent = round(float(columns[7])/conversion[columns[8]], 1)
            amount_received = round(float(columns[5])/conversion[columns[6]], 1)

            hash_map[id_from]["n_sent"] += 1
            hash_map[id_to]["n_received"] += 1
            hash_map[id_from]["total_sent"] += amount_sent
            hash_map[id_to]["total_received"] += amount_received


            dt = datetime.strptime(columns[0], '%Y/%m/%d %H:%M')
            unix_time = int(dt.timestamp())
            day_of_week = dt.strftime("%A").lower()
            hour_of_day = f"hour_{dt.hour}"

            if unix_time < hash_map[id_from]["first_sent"]:
                hash_map[id_from]["first_sent"] = unix_time
            if unix_time > hash_map[id_from]["last_sent"]:
                hash_map[id_from]["last_sent"] = unix_time
            if unix_time < hash_map[id_to]["first_received"]:
                hash_map[id_to]["first_received"] = unix_time
            if unix_time > hash_map[id_to]["last_received"]:
                hash_map[id_to]["last_received"] = unix_time

            hash_map[id_from][f"sent_{day_of_week}"] += 1
            hash_map[id_to][f"received_{day_of_week}"] += 1

            hash_map[id_from][f"sent_{hour_of_day}"] += 1
            hash_map[id_to][f"received_{hour_of_day}"] += 1

            payment_type = payment_type_map[columns[9]]
            if payment_type not in hash_map[id_from]["sent_type_counts"]:
                hash_map[id_from]["sent_type_counts"][payment_type] = 1
            else:
                hash_map[id_from]["sent_type_counts"][payment_type] += 1
            if payment_type not in hash_map[id_to]["received_type_counts"]:
                hash_map[id_to]["received_type_counts"][payment_type] = 1
            else:
                hash_map[id_to]["received_type_counts"][payment_type] += 1

            # TODO: do we consider both the sender and receiver a money
            # launderer for participating in the transaction?
            if int(columns[10]):
                hash_map[id_from]["is_launderer"] = 1
                hash_map[id_to]["is_launderer"] = 1

    # Each customer becomes a row, indexed by its bank_account identifier
    return pd.DataFrame.from_dict(hash_map, orient="index")


dfc = aggregate_customer_data(
    dataset=dataset,
    conversion=usd_conversion,
)

# Name the index; it shouldn't be used in an analysis
dfc.index.name = "bank_account"

# Don't keep track of sets of sources or destinations, just the number of each
dfc["n_destinations"] = dfc["n_destinations"].apply(len)
dfc["n_sources"] = dfc["n_sources"].apply(len)

# Currencies were already one-hot encoded in `aggregate_customer_data`

# Get the amount of time the account has been active.
# Then remove the first and last sent and received columns
last = ["last_sent", "last_received"]
first = ["first_sent", "first_received"]
dfc["transaction_length"] = dfc[last].max(axis=1) - dfc[first].min(axis=1)
dfc = dfc.drop(columns=last+first)

# Include the average sent and received transaction sizes
dfc.fillna(0, inplace=True)
dfc["avg_sent"] = np.where(
    dfc["n_sent"] < 1,
    0,
    dfc["total_sent"] / dfc["n_sent"],
)
dfc["avg_received"] = np.where(
    dfc["n_received"] < 1,
    0,
    dfc["total_received"] / dfc["n_received"],
)

# Since we are dealing with currency, round each column to 2 decimal places
dfc = dfc.round(2)

# Create a separate column or feature for each payment type count
dfc_sent_types = (
    pd.json_normalize(dfc["sent_type_counts"])
        .add_prefix("n_sent_")
        .set_index(dfc.index)
)
dfc_sent_types.fillna(0, inplace=True)
dfc = dfc.drop(columns=["sent_type_counts"]).join(dfc_sent_types)
dfc_received_types = (
    pd.json_normalize(dfc["received_type_counts"])
    .add_prefix("n_received_")
    .set_index(dfc.index)
)
dfc_received_types.fillna(0, inplace=True)
dfc = dfc.drop(columns=["received_type_counts"]).join(dfc_received_types)

# The "positive" class for a knowledge-driven undersampler is a customer who
# does not launder
dfc["is_good_customer"] = dfc["is_launderer"] ^ 1
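
The payment-type expansion above can be tried in isolation on a toy frame. This is a minimal sketch, not the pipeline itself: the frame `toy`, the account names `acct_a` and `acct_b`, and the payment types `ACH` and `Wire` are made up for illustration and are not taken from the dataset.

```python
import pandas as pd

# Hypothetical per-account counts; names and values are illustrative only
toy = pd.DataFrame(
    {"sent_type_counts": [{"ACH": 2, "Wire": 1}, {"ACH": 5}]},
    index=["acct_a", "acct_b"],
)

# Expand each dictionary into its own columns, one per payment type.
# Payment types an account never used come back as NaN, so fill with 0
expanded = (
    pd.json_normalize(toy["sent_type_counts"].tolist())
    .add_prefix("n_sent_")
    .set_index(toy.index)
    .fillna(0)
    .astype(int)
)

# Swap the dictionary column for the expanded count columns
toy = toy.drop(columns=["sent_type_counts"]).join(expanded)
print(toy)
```

Passing a plain list of dictionaries to `pd.json_normalize` sidesteps any dependence on the index of the original column, which is reattached afterwards with `set_index`.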

In [None]:
# Prints a single row in the dataframe, in case it helps to spot check the data
row_dict = {
    col: list(val.values())[0]
    if isinstance(val, dict)
    else val
    for col, val in dfc.iloc[0].to_dict().items()
}
pprint(row_dict)

In [None]:
column_selector = widgets.Dropdown(
    options=list(dfc.columns),
    description="Feature:",
    style={'description_width': 'initial'}
)

n_drop_slider = widgets.IntSlider(
    value=25,
    min=0,
    max=10000,
    step=1,
    description="Exclude top:",
    style={'description_width': 'initial'}
)

n_bins_slider = widgets.IntSlider(
    value=50,
    min=2,
    max=250,
    step=1,
    description="Histogram bins:",
    style={'description_width': 'initial'}
)


def plot_customer_histogram(column, n_drop=25, n_bins=50):
    dfiltered = dfc.drop(dfc.nlargest(n_drop, column).index)[column]

    plt.figure(figsize=(8, 5))
    plt.hist(dfiltered, bins=n_bins, alpha=0.7, edgecolor='black')
    plt.xlabel(column)
    plt.ylabel("Frequency")
    plt.title(f"Histogram of {column}")
    plt.grid(True)
    plt.show()


widgets.interactive(
    plot_customer_histogram,
    column=column_selector,
    n_drop=n_drop_slider,
    n_bins=n_bins_slider,
)

In [None]:
# TODO: add a normalization step here that normalizes each column. Do we need
# to use a log-based normalization, given how imbalanced the distributions in
# the above histograms are?
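
One possible shape for the log-based normalization mentioned in the TODO above is `log1p` followed by standardization. This is a sketch under assumptions rather than a decision for the project: the frame `toy` and its values are made up, and in a real pipeline the mean and standard deviation would presumably be computed on the training split only.

```python
import numpy as np
import pandas as pd

# Hypothetical heavy-tailed feature values; not taken from the dataset
toy = pd.DataFrame({"total_sent": [0.0, 10.0, 100.0, 1_000_000.0]})

# log1p compresses the long right tail and is defined at zero
logged = np.log1p(toy)

# Standardize each column to zero mean and unit (population) variance
normalized = (logged - logged.mean()) / logged.std(ddof=0)
print(normalized.round(3))
```

The transform is monotonic, so the ordering of accounts within a column is preserved while the largest values no longer dominate the scale.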

In [None]:
# TODO: can we abstract this portion of the code away, similar to what is done
# in this Kaggle notebook:
#
#   - <https://www.kaggle.com/code/caesarmario/listen-to-your-heart-a-disease-prediction>
#
from xgboost import XGBClassifier
from sklearn.metrics import (
    average_precision_score,
    auc,
    classification_report,
    confusion_matrix,
    precision_recall_curve,
    roc_auc_score,
    roc_curve,
)
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression

set_model = "xgb"


X = dfc.drop(columns=["is_launderer", "is_good_customer"])
y = dfc["is_good_customer"]

X_train, X_test, y_train, y_test = train_test_split(
    X, y,
    test_size=0.2,
    random_state=42,
    stratify=y,
)

if set_model == "logistic":
    model = LogisticRegression(
        solver="liblinear",
        class_weight="balanced",
        max_iter=500,
    )
elif set_model == "xgb":
    model = XGBClassifier(
        objective="binary:logistic",
        # scale_pos_weight=y_train.value_counts()[0] / y_train.value_counts()[1],
        eval_metric="logloss",
        n_estimators=100,
        learning_rate=0.1,
        max_depth=8,
        random_state=42,
    )

model.fit(X_train, y_train)

y_pred_test = model.predict(X_test)
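
The commented-out `scale_pos_weight` argument above is the ratio of negative to positive training labels. A minimal sketch of that ratio on made-up labels follows; `y_toy` is hypothetical and only mirrors the convention used here, where the positive class (`is_good_customer == 1`) is the majority.

```python
import pandas as pd

# Hypothetical labels: 1 = good customer (majority), 0 = launderer
y_toy = pd.Series([1, 1, 1, 1, 1, 1, 1, 1, 0, 0])

counts = y_toy.value_counts()
n_negative = counts.get(0, 0)
n_positive = counts.get(1, 0)

# Ratio of negative to positive examples
scale_pos_weight = n_negative / n_positive
print(scale_pos_weight)
```

Because the positive class is the majority in this setup, the ratio comes out below 1, which down-weights good customers relative to launderers.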

In [None]:
from ipywidgets import interact, FloatSlider

y_probs = model.predict_proba(X_test)[:, 1]

def update_confusion_matrix(threshold):
    y_pred_test = (y_probs >= threshold).astype(int)

    tn, fp, fn, tp = confusion_matrix(y_test, y_pred_test).ravel()

    print(f"Threshold: {threshold:.3f}")
    print(f"True Negatives (TN): {tn}")
    print(f"False Positives (FP): {fp}")
    print(f"False Negatives (FN): {fn}")
    print(f"True Positives (TP): {tp}")


interact(
    update_confusion_matrix,
    threshold=FloatSlider(
        min=0.0,
        max=1.0,
        step=0.001,
        value=0.8,
        readout_format=".3f",
    ),
)
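
The effect of the threshold slider can be checked by hand on a few made-up points. The sketch below recomputes the four confusion-matrix counts with NumPy only; `y_true`, `y_score`, and `confusion_counts` are hypothetical names introduced for illustration.

```python
import numpy as np

# Hypothetical labels and predicted probabilities, for illustration only
y_true = np.array([0, 0, 1, 1, 1, 1])
y_score = np.array([0.2, 0.9, 0.4, 0.7, 0.85, 0.95])


def confusion_counts(threshold):
    """Returns (tn, fp, fn, tp) for predictions at the given threshold."""
    y_hat = (y_score >= threshold).astype(int)
    tn = int(np.sum((y_true == 0) & (y_hat == 0)))
    fp = int(np.sum((y_true == 0) & (y_hat == 1)))
    fn = int(np.sum((y_true == 1) & (y_hat == 0)))
    tp = int(np.sum((y_true == 1) & (y_hat == 1)))
    return tn, fp, fn, tp


# Raising the threshold moves borderline positives into the negatives
for threshold in (0.5, 0.8):
    print(threshold, confusion_counts(threshold))
```

On this toy data, moving the threshold from 0.5 to 0.8 trades one true positive for one false negative and leaves the negatives unchanged.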

In [None]:
print(classification_report(y_test, y_pred_test))

In [None]:
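# Use the predicted probabilities; hard 0/1 predictions would reduce the
# curve to a single operating point
fpr, tpr, _ = roc_curve(y_test, y_probs)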
roc_auc = auc(fpr, tpr)

plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, color="blue", lw=2, label=f"ROC curve (AUC = {roc_auc:.2f})")
plt.plot([0, 1], [0, 1], color="gray", linestyle="--")  # Random classifier
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("Receiver Operating Characteristic (ROC) Curve")
plt.legend(loc="lower right")
plt.show()
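
ROC AUC measures how well the scores rank positives above negatives, so it generally differs depending on whether it is computed from probabilities or from hard 0/1 predictions. The sketch below shows this on made-up data using the pairwise (Mann-Whitney) formulation of AUC in plain NumPy, rather than the scikit-learn calls used above; `pairwise_auc` and the toy arrays are hypothetical.

```python
import numpy as np


def pairwise_auc(y_true, scores):
    """Fraction of (positive, negative) pairs ranked correctly, ties as 0.5."""
    pos = scores[y_true == 1]
    neg = scores[y_true == 0]
    diff = pos[:, None] - neg[None, :]
    return float(np.mean((diff > 0) + 0.5 * (diff == 0)))


# Hypothetical labels and predicted probabilities, for illustration only
y_true = np.array([0, 0, 0, 1, 1, 1])
y_score = np.array([0.1, 0.6, 0.7, 0.55, 0.8, 0.9])

# Thresholding at 0.5 throws away the ranking within each predicted class
y_hard = (y_score >= 0.5).astype(float)

auc_from_scores = pairwise_auc(y_true, y_score)
auc_from_labels = pairwise_auc(y_true, y_hard)
print(round(auc_from_scores, 3), round(auc_from_labels, 3))
```

On this toy data the score-based value is 7/9 while the label-based value is 6/9, because thresholding turns many informative comparisons into ties.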

In [None]:
# Use the predicted probabilities so that the curve covers every threshold
precision, recall, _ = precision_recall_curve(y_test, y_probs)
avg_precision = average_precision_score(y_test, y_probs)

# Plot the PR curve
plt.figure(figsize=(8, 6))
plt.plot(recall, precision, marker='.', label=f'AP = {avg_precision:.2f}')
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.title('Precision-Recall Curve')
plt.legend()
plt.grid()
plt.show()

In [None]:
# Some next steps to consider:
#
#   - Confirm that `from_unique` is the same for the same from bank and account
#   - For each `from_unique` get the first and last transaction
#   - Show a distribution of this to get a sense of customer history
#   - Do logistic regression on these customer snapshots
#
#   - Be sure to one-hot encode categorical features
#
# Once categorical data is converted into numerical data, do:
#
#   df.corr()
#
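
The last two next steps (one-hot encoding, then `df.corr()`) can be sketched on a toy frame as follows. The column names and values in `toy` are made up for illustration and do not come from the dataset.

```python
import pandas as pd

# Hypothetical snapshot with one categorical feature; values are made up
toy = pd.DataFrame(
    {
        "currency": ["USD", "EUR", "USD", "EUR"],
        "total_sent": [100.0, 250.0, 50.0, 400.0],
        "is_launderer": [0, 1, 0, 1],
    }
)

# One-hot encode the categorical column so every feature is numeric
encoded = pd.get_dummies(toy, columns=["currency"], dtype=int)

# Pairwise Pearson correlation between all numeric columns
correlations = encoded.corr()
print(correlations.round(2))
```

In this contrived example `currency_EUR` lines up exactly with `is_launderer`, so their correlation is 1; real data would be expected to show far weaker relationships.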