In [1]:
import pandas as pd
import numpy as np
import ctypes
from joblib import Parallel, delayed
import uuid

In [6]:
# Synthetic-data configuration.
SEED = 42  # fixed so Restart & Run All regenerates exactly the same frame
n_rows = 1_000_000
n_groups = 10
n_accounts = 100

# One seeded generator drives every random column below.
rng = np.random.default_rng(SEED)

# uuid.uuid4() reads OS entropy and cannot be seeded, so the version-4 UUIDs
# are built from the seeded generator instead: 16 random bytes per row.
_hash_bytes = rng.bytes(16 * n_rows)

df = pd.DataFrame({
    'buyer': rng.integers(0, n_accounts, n_rows),
    'seller': rng.integers(0, n_accounts, n_rows),
    'transactionHash': [
        str(uuid.UUID(bytes=_hash_bytes[start:start + 16], version=4))
        for start in range(0, len(_hash_bytes), 16)
    ],
    'amount': rng.random(n_rows) * 1000,
    # Left as missing here; nothing in this cell assigns a label.
    'wash_label': pd.NA,
    'Group': rng.integers(0, n_groups, n_rows)
})

# The raw byte buffer is no longer needed once the hashes are strings.
del _hash_bytes

df.head()

Unnamed: 0,buyer,seller,transactionHash,amount,wash_label,Group
0,0,34,d86ac875-f527-4eb3-968e-6d1dadd972c8,407.52201,,1
1,19,10,185b2346-6e11-4239-b184-02e74ca2aad5,761.858093,,0
2,93,43,b05e5411-7f3e-46a8-b375-59f59b0ca2f1,224.237718,,9
3,69,33,208b6867-3c19-4fae-b4f2-070f45deeb32,615.789268,,3
4,99,26,8e1c6169-ad5a-47ba-80be-4f6411ecd77e,715.551534,,1


In [7]:
def detect_label_wash_trades(df: pd.DataFrame, margin: float = 0.01,
                             lib_path: str = './test.dll') -> list:
    """Run the native wash-trade detector on one frame of trades.

    Buyer and seller ids are remapped to dense integers (0..k-1, in order of
    first appearance across both columns) and handed, together with the
    amounts, to the ``detect_label_wash_trades`` symbol of the shared library
    at ``lib_path``. The library is expected to fill one int flag per row.

    Parameters
    ----------
    df : pd.DataFrame
        Must provide ``buyer``, ``seller``, ``amount`` and
        ``transactionHash`` columns.
    margin : float, default 0.01
        Forwarded unchanged to the native routine; its exact meaning is
        defined there and is not visible from this code.
    lib_path : str, default './test.dll'
        Location of the shared library to load.

    Returns
    -------
    list
        ``transactionHash`` values of the rows whose flag is non-zero;
        an empty list when ``df`` is empty.

    Raises
    ------
    OSError
        If the shared library cannot be loaded.
    """
    if df.empty:
        print("returning early")
        return []

    # Dense ids, numbered by first appearance over buyers then sellers.
    all_ids = pd.concat([df['buyer'], df['seller']])
    id_map = {id_: i for i, id_ in enumerate(all_ids.unique())}

    # The native side receives bare pointers and will walk them with unit
    # stride, so force C-contiguous buffers of exactly the declared C types.
    buyers_remapped = np.ascontiguousarray(
        df['buyer'].map(id_map).astype(np.int32).to_numpy()
    )
    sellers_remapped = np.ascontiguousarray(
        df['seller'].map(id_map).astype(np.int32).to_numpy()
    )
    amounts = np.ascontiguousarray(df['amount'].astype(np.float64).to_numpy())
    n = len(df)
    num_unique_ids = len(id_map)

    # Zero-initialised output buffer, one flag per input row.
    result_flags = np.zeros(n, dtype=np.int32)

    int_ptr = ctypes.POINTER(ctypes.c_int)
    double_ptr = ctypes.POINTER(ctypes.c_double)

    lib = ctypes.CDLL(lib_path)
    native_detect = lib.detect_label_wash_trades
    native_detect.argtypes = [
        int_ptr,          # buyers
        int_ptr,          # sellers
        double_ptr,       # amounts
        ctypes.c_int,     # number of rows
        ctypes.c_double,  # margin
        int_ptr,          # output flags
        ctypes.c_int,     # number of distinct ids
    ]
    native_detect.restype = ctypes.c_int

    # NOTE(review): the int return value is discarded, as before. If the
    # library uses it to report failure, that should be checked here - confirm
    # against the native source.
    native_detect(
        buyers_remapped.ctypes.data_as(int_ptr),
        sellers_remapped.ctypes.data_as(int_ptr),
        amounts.ctypes.data_as(double_ptr),
        n,
        margin,
        result_flags.ctypes.data_as(int_ptr),
        num_unique_ids,
    )

    # Any non-zero flag selects the row (positional boolean mask).
    wash_trade_hashes = df.loc[
        result_flags.astype(bool),
        'transactionHash'
    ].tolist()
    return wash_trade_hashes

In [8]:
# Split the trades by 'Group' and run the detector on each slice in parallel.
grouped = df.groupby('Group')

# Each worker gets its own copy of the slice, re-indexed from 0.
detection_jobs = (
    delayed(detect_label_wash_trades)(group.copy().reset_index(drop=True))
    for _, group in grouped
)
scc_wash_trades_all = Parallel(n_jobs=4)(detection_jobs)

# For debugging, the same work can be run serially by calling
# detect_label_wash_trades on each slice in a plain loop instead of Parallel.

# Merge the per-group hash lists into one flat list.
all_hashes = []
for group_hashes in scc_wash_trades_all:
    all_hashes.extend(group_hashes)

# Mark every flagged transaction; rows that were not flagged are left as-is.
flagged_rows = df['transactionHash'].isin(all_hashes)
df.loc[flagged_rows, 'wash_label'] = True

In [9]:
# Sanity check: a trade whose buyer equals its seller is used as the ground truth.
is_self_trade = df['buyer'] == df['seller']

# wash_label starts out as pd.NA and the labelling step only ever assigns True,
# so unflagged rows hold NA rather than False. Comparing against False would
# therefore never match and the "missed" count would always be 0. Treat
# everything that is not explicitly True as "not flagged".
is_flagged = df['wash_label'].eq(True).fillna(False).astype(bool)

matching_rows = df[is_self_trade]

correctly_flagged = df[is_self_trade & is_flagged]

missed_flags = df[is_self_trade & ~is_flagged]

incorrect_flags = df[~is_self_trade & is_flagged]

# Every self-trade must land in exactly one of the two buckets.
assert len(correctly_flagged) + len(missed_flags) == len(matching_rows)

print("Total buyer==seller:", matching_rows.shape[0])
print()
print("Correctly flagged:", correctly_flagged.shape[0])
print()
print("Missed (false negatives):", missed_flags.shape[0])
print()
print("Incorrectly (false positives):", incorrect_flags.shape[0])


Total buyer==seller: 10065

Correctly flagged: 10065

Missed (false negatives): 0

Incorrectly (false positives): 0
