In [1]:
# TODO: Data Processing
# ==============================================================
# Reads:   raw/transactions_raw.parquet
#          labels/labels.parquet
#          processed/nodes.parquet   (used to filter out contracts)
# Writes:  processed/formatted_transactions.parquet
#          processed/node_labels.parquet
#          processed/data_splits.json
# (paths are relative to DATA_DIR, configured in the next cell)
# ==============================================================
import gc
import json

import pandas as pd
from google.colab import drive

# Colab: expose Google Drive under /content/drive.
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import os
import pyarrow.parquet as pq

# Work from the shared course folder on Drive.
shared_folder_path = '/content/drive/MyDrive/Math_168_Folder'
os.chdir(shared_folder_path)

# All pipeline data lives under <shared folder>/data; outputs go to data/processed.
DATA_DIR = os.path.join(shared_folder_path, "data")
OUTPUT_DIR = os.path.join(DATA_DIR, "processed")

# Input files (from Rust pipeline)
TRANSACTIONS_PATH = os.path.join(DATA_DIR, "raw", "transactions_raw.parquet")
LABELS_PATH = os.path.join(DATA_DIR, "labels", "labels.parquet")
NODES_PATH = os.path.join(OUTPUT_DIR, "nodes.parquet")

# Cheap sanity check of the big input: size on disk plus the parquet
# footer metadata (row/column counts) without reading any row data.
size_gb = os.path.getsize(TRANSACTIONS_PATH) / 1e9
print(f"File size: {size_gb:.2f} GB")

meta = pq.read_metadata(TRANSACTIONS_PATH)
print(f"Rows: {meta.num_rows:,}")
print(f"Columns: {meta.num_columns}")

File size: 5.58 GB
Rows: 90,728,163
Columns: 5


In [3]:
# ================================================================
# Load raw data
# ================================================================
# Read the two input parquet files produced by the Rust pipeline.
# transactions_raw columns: tx_hash, block_timestamp, from_address,
#                           to_address, value_wei  (all strings)
# labels columns: address, label, label_source, source_url, retrieved_at
#
# Rows with a null/empty from_address or to_address cannot become graph
# edges, so they are dropped batch-by-batch while streaming (keeps peak
# memory low). The data-quality counts are tallied BEFORE that filter;
# counting on the already-filtered frame would always report 0.
# ================================================================

def _is_blank_address(addresses: pd.Series) -> pd.Series:
    """Boolean mask: True where an address is null or the empty string."""
    return addresses.isna() | (addresses == "")


parquet_file = pq.ParquetFile(TRANSACTIONS_PATH)

chunks = []
n_raw_rows = 0
n_empty_from = 0
n_empty_to = 0

for batch in parquet_file.iter_batches(batch_size=1_000_000):
    chunk = batch.to_pandas()
    blank_from = _is_blank_address(chunk["from_address"])
    blank_to = _is_blank_address(chunk["to_address"])

    # Tally data-quality stats on the raw batch, before filtering.
    n_raw_rows += len(chunk)
    n_empty_from += int(blank_from.sum())
    n_empty_to += int(blank_to.sum())

    # Filter invalid rows immediately to save memory
    chunk = chunk[~(blank_from | blank_to)]
    chunks.append(chunk)
    print(f"  Processed batch — kept {len(chunk):,} rows", end="\r")

# pd.concat([]) raises, so fall back to an empty frame with the file's schema.
df = (
    pd.concat(chunks, ignore_index=True)
    if chunks
    else parquet_file.schema_arrow.empty_table().to_pandas()
)
del chunks
gc.collect()

print(f"\nLoaded {len(df):,} valid rows")

labels = pd.read_parquet(LABELS_PATH)

print(f"Raw transactions loaded: {n_raw_rows:,} rows")
print(f"Labels loaded:           {len(labels):,} rows "
      f"({labels['address'].nunique():,} unique sanctioned addresses)")
print()

# Data-quality report on the RAW rows (pre-filter)
print(f"Empty/null from_address: {n_empty_from:,} / {n_raw_rows:,}")
print(f"Empty/null to_address:   {n_empty_to:,} / {n_raw_rows:,}")
print(f"Rows dropped (either endpoint empty): {n_raw_rows - len(df):,}")
print()


Loaded 90,728,163 valid rows
Raw transactions loaded: 90,728,163 rows
Labels loaded:           82 sanctioned addresses

Empty/null from_address: 0 / 90,728,163
Empty/null to_address:   0 / 90,728,163



In [4]:
# ================================================================
# Step 1: Filter Out Contract Addresses
# ================================================================
# Load nodes.parquet from the Rust pipeline (Stage 4: build-nodes).
# It contains: address, is_contract (bool), degree_in, degree_out, node_type.
# Remove all transaction rows where EITHER from_address or to_address
# is flagged as a smart contract in nodes.parquet. Addresses that are not
# listed there (if any) are kept, so the result is EOA-to-EOA only to the
# extent nodes.parquet covers every address.
#
# Caveat: sanctioned addresses that are themselves contracts lose all of
# their edges here, so they never reach the graph or the node labels.
# That count is reported below.
# ================================================================

nodes_pf = pq.ParquetFile(NODES_PATH)

# Only read the columns we need to save memory
contract_addresses = set()
for batch in nodes_pf.iter_batches(batch_size=1_000_000, columns=["address", "is_contract"]):
    chunk = batch.to_pandas()
    # .eq(True) is the idiomatic form of `== True`; null flags are not treated as contracts
    contract_addresses.update(chunk.loc[chunk["is_contract"].eq(True), "address"])
    print(f"  Scanned batch — found {len(contract_addresses):,} contracts so far", end="\r")

print(f"\nTotal contract addresses found: {len(contract_addresses):,}")

# How much label signal the contract filter removes.
n_sanctioned_contracts = int(labels["address"].isin(contract_addresses).sum())

before = len(df)
df = df[
    ~df["from_address"].isin(contract_addresses) &
    ~df["to_address"].isin(contract_addresses)
]
df = df.reset_index(drop=True)
after = len(df)
removed = before - after
# Guard against an empty input frame (avoids ZeroDivisionError).
pct_removed = removed / before * 100 if before else 0.0

print(f"\nStep 1 — Filter contract addresses")
print(f"  Before: {before:,}  After: {after:,}  Removed: {removed:,}")
print(f"  Percentage removed: {pct_removed:.2f}%")
print(f"  Sanctioned addresses that are contracts (edges dropped): {n_sanctioned_contracts:,}")
print()

del contract_addresses
gc.collect()

  Scanned batch — found 721,891 contracts so far
Total contract addresses found: 721,891

Step 1 — Filter contract addresses
  Before: 90,728,163  After: 69,657,566  Removed: 21,070,597
  Percentage removed: 23.22%



0

In [5]:
# Redundant with the first cell: Drive is already mounted there, and calling
# drive.mount() again on the same path only prints an "already mounted"
# notice (see this cell's output). Presumably kept to recover a dropped
# mount mid-session; safe to skip on a clean Run-All.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [6]:
# ================================================================
# Step 2: Create Address-to-ID Mapping
# ================================================================
# Every address seen as a sender or receiver gets a dense integer ID.
# IDs follow lexicographic (sorted) address order, so the mapping is
# deterministic regardless of row order: the smallest address gets 0.
# The new from_id / to_id columns hold each endpoint's ID.
# ================================================================

all_addresses = pd.concat([df["from_address"], df["to_address"]]).unique()
address_to_id = dict(zip(sorted(all_addresses), range(len(all_addresses))))

for endpoint in ("from", "to"):
    df[f"{endpoint}_id"] = df[f"{endpoint}_address"].map(address_to_id)

print("Step 2 — Address-to-ID mapping")
print(f"  Unique addresses: {len(address_to_id):,}")
print(f"  ID range: 0 .. {len(address_to_id) - 1}")
print()

Step 2 — Address-to-ID mapping
  Unique addresses: 30,378,276
  ID range: 0 .. 30378275



In [7]:
# ================================================================
# Step 3: Convert Timestamps
# ================================================================
# Parse block_timestamp from ISO-8601 strings to tz-aware UTC datetimes.
# Sort the dataframe by timestamp ascending (earliest first).
# Subtract the earliest timestamp so the first row has Timestamp=0
# and all other values are seconds elapsed since that first tx.
#
# The sort is STABLE: transactions sharing a timestamp keep their original
# file order instead of an order chosen by the sort algorithm. Row order
# determines EdgeIDs (Step 6) and the temporal split boundary (Step 8), so
# tie handling must be well-defined.
# ================================================================

# utc=True yields one datetime64[ns, UTC] dtype even if some strings carry
# different offsets (without it pandas may fall back to object dtype or
# raise, and the .dt accessor below would fail).
df["block_timestamp"] = pd.to_datetime(df["block_timestamp"], utc=True)
df = df.sort_values("block_timestamp", kind="stable").reset_index(drop=True)

t_min = df["block_timestamp"].min()
df["Timestamp"] = (df["block_timestamp"] - t_min).dt.total_seconds().astype(int)

print("Step 3 — Convert timestamps")
print(f"  Earliest: {t_min}")
print(f"  Latest:   {df['block_timestamp'].max()}")
print(f"  Span:     {df['Timestamp'].max():,} seconds")
print(f"  First Timestamp value: {df['Timestamp'].iloc[0]}")
print()

df = df.drop(columns=["block_timestamp"])
gc.collect()

Step 3 — Convert timestamps
  Earliest: 2015-08-07 05:01:09+00:00
  Latest:   2026-02-19 19:21:47+00:00
  Span:     332,605,238 seconds
  First Timestamp value: 0



17

In [8]:
# ================================================================
# Step 4: Convert Values (Wei -> ETH)
# ================================================================
# value_wei holds wei amounts as strings (1 ETH = 10**18 wei). They are
# parsed to float64 and scaled to ETH. For a direct ETH transfer the sent
# and received amounts are equal, so one value fills both "Amount Sent"
# and "Amount Received".
# Note: float64 keeps ~15-16 significant digits, so very long wei strings
# are rounded when parsed.
# ================================================================

WEI_PER_ETH = 10**18

df["Amount Sent"] = df["value_wei"].astype(float).div(WEI_PER_ETH)
df["Amount Received"] = df["Amount Sent"].copy()

amount_eth = df["Amount Sent"]
print("Step 4 — Wei to ETH conversion")
print(f"  Min:  {amount_eth.min():.8f} ETH")
print(f"  Max:  {amount_eth.max():.8f} ETH")
print(f"  Mean: {amount_eth.mean():.8f} ETH")
print()
del amount_eth

df = df.drop(columns=["value_wei"])
gc.collect()


Step 4 — Wei to ETH conversion
  Min:  0.00000000 ETH
  Max:  1400000.00000000 ETH
  Mean: 14.61243847 ETH



7

In [9]:
# ================================================================
# Step 5: Add Edge Labels
# ================================================================
# An edge is labelled by whether it touches any address in `labels`
# (the OFAC-sanctioned list loaded earlier):
#   Is Laundering = 1  when from_address or to_address is sanctioned
#   Is Laundering = 0  otherwise
# Clean-to-clean edges are deliberately kept with label 0 so the GNN
# also sees normal graph structure.
# The raw address columns are dropped afterwards; from_id / to_id
# carry the graph structure from here on.
# ================================================================

sanctioned_set = set(labels["address"])

from_hit = df["from_address"].isin(sanctioned_set)
to_hit = df["to_address"].isin(sanctioned_set)
df["Is Laundering"] = (from_hit | to_hit).astype(int)
del from_hit, to_hit

n_launder = (df["Is Laundering"] == 1).sum()
n_clean = len(df) - n_launder  # labels are strictly 0/1
print("Step 5 — Edge labels")
print(f"  Sanctioned addresses: {len(sanctioned_set)}")
print(f"  Laundering edges (1): {n_launder:,}")
print(f"  Clean edges (0):      {n_clean:,}")
print()

df = df.drop(columns=["from_address", "to_address"])
gc.collect()

Step 5 — Edge labels
  Sanctioned addresses: 82
  Laundering edges (1): 9,602
  Clean edges (0):      69,647,964



7

In [10]:
# ================================================================
# Step 6: Build Formatted Transactions File
# ================================================================
# Target layout (IBM Multi-GNN formatted_transactions), in column order:
#   EdgeID, from_id, to_id, Timestamp, Amount Sent, Sent Currency,
#   Amount Received, Received Currency, Payment Format, Is Laundering
#
# - EdgeID is the 0-based row position (rows were time-sorted in Step 3)
# - Sent Currency, Received Currency and Payment Format are the constant 1:
#   the data is single-chain ETH, so these columns exist only for format
#   compatibility
#
# Written to OUTPUT_DIR/formatted_transactions.parquet, after which `df`
# and `formatted` are released; only the edge count is kept for Step 8.
# ================================================================

SENT_CURRENCY_CODE = 1
RECEIVED_CURRENCY_CODE = 1
PAYMENT_FORMAT_CODE = 1

# Insertion order of this dict defines the output column order.
edge_columns = {"EdgeID": range(len(df))}
edge_columns["from_id"] = df["from_id"].astype(int)
edge_columns["to_id"] = df["to_id"].astype(int)
edge_columns["Timestamp"] = df["Timestamp"].astype(int)
edge_columns["Amount Sent"] = df["Amount Sent"].astype(float)
edge_columns["Sent Currency"] = SENT_CURRENCY_CODE
edge_columns["Amount Received"] = df["Amount Received"].astype(float)
edge_columns["Received Currency"] = RECEIVED_CURRENCY_CODE
edge_columns["Payment Format"] = PAYMENT_FORMAT_CODE
edge_columns["Is Laundering"] = df["Is Laundering"].astype(int)
formatted = pd.DataFrame(edge_columns)
del edge_columns

formatted_path = os.path.join(OUTPUT_DIR, "formatted_transactions.parquet")
formatted.to_parquet(formatted_path, index=False)

summary_lines = [
    "Step 6 — Formatted transactions saved",
    f"  Path: {formatted_path}",
    f"  Rows: {len(formatted):,}",
    f"  Columns: {list(formatted.columns)}",
    "",
]
print("\n".join(summary_lines))

# Step 8 needs the edge count after `formatted` has been freed.
n_edges = len(formatted)

del df, formatted
gc.collect()
print("Memory freed before Step 7")

Step 6 — Formatted transactions saved
  Path: /content/drive/MyDrive/Math_168_Folder/data/processed/formatted_transactions.parquet
  Rows: 69,657,566
  Columns: ['EdgeID', 'from_id', 'to_id', 'Timestamp', 'Amount Sent', 'Sent Currency', 'Amount Received', 'Received Currency', 'Payment Format', 'Is Laundering']

Memory freed before Step 7


In [11]:
# ================================================================
# Step 7: Build Node Labels File
# ================================================================
# One row per address in address_to_id:
#   node_id       = address_to_id[address]
#   is_sanctioned = 1 if the address is in sanctioned_set, else 0
#
# Only addresses that survived the Step 1 contract filter appear here,
# because address_to_id was built from the filtered edges.
#
# Columns: node_id, is_sanctioned
# Save to: OUTPUT_DIR/node_labels.parquet
# ================================================================

# Vectorized membership test instead of a per-address Python loop.
node_addresses = pd.Series(list(address_to_id.keys()))
node_labels = (
    pd.DataFrame({
        "node_id": list(address_to_id.values()),
        "is_sanctioned": node_addresses.isin(sanctioned_set).astype(int),
    })
    .sort_values("node_id")
    .reset_index(drop=True)
)
del node_addresses

node_labels_path = os.path.join(OUTPUT_DIR, "node_labels.parquet")
node_labels.to_parquet(node_labels_path, index=False)

n_total = len(node_labels)
n_sanctioned = node_labels["is_sanctioned"].sum()
n_clean_nodes = n_total - n_sanctioned
# max(..., 1) avoids division by zero when no sanctioned node is present.
ratio = n_clean_nodes / max(n_sanctioned, 1)

print("Step 7 — Node labels saved")
print(f"  Path: {node_labels_path}")
print(f"  Rows (nodes): {n_total:,}")
print(f"  Sanctioned nodes: {n_sanctioned:,}")
print(f"  Non-sanctioned nodes: {n_clean_nodes:,}")
print(f"  Class imbalance ratio: {ratio:,.0f}:1")
print()

Step 7 — Node labels saved
  Path: /content/drive/MyDrive/Math_168_Folder/data/processed/node_labels.parquet
  Rows (nodes): 30,378,276
  Sanctioned nodes: 69
  Non-sanctioned nodes: 30,378,207
  Class imbalance ratio: 440,264:1



In [12]:
# ================================================================
# Step 8: Create Train/Validation Split
# ================================================================
# Data is sorted by time (Step 3). Temporal split:
#   - Train = first 80% of edges (chronologically earliest)
#   - Val   = last 20% of edges (chronologically latest)
# This prevents temporal data leakage.
# (The cut is by EdgeID position, not by a timestamp cut-off, so edges
#  sharing the boundary timestamp may land on either side.)
#
# No test split — in a transfer learning setup the test set comes
# from a different domain (different blockchain).
#
# The ID lists are contiguous, so they are kept as `range` objects and
# streamed to JSON in chunks. Materialising ~70M Python ints as lists
# costs several GB of RAM, and json.dump runs them through the pure-Python
# encoder element by element. The file written is byte-identical to
# json.dump({"train_edge_ids": [...], "val_edge_ids": [...]}, f).
#
# Save as: OUTPUT_DIR/data_splits.json
# ================================================================

TRAIN_FRACTION = 0.8


def _write_json_int_range(f, ids: range, chunk_size: int = 1_000_000) -> None:
    """Write `ids` to text file `f` exactly as json.dump(list(ids), f) would."""
    f.write("[")
    for start in range(0, len(ids), chunk_size):
        if start:
            f.write(", ")
        f.write(", ".join(map(str, ids[start:start + chunk_size])))
    f.write("]")


def _pct(count: int) -> float:
    """Percentage of all edges; 0.0 when there are no edges."""
    return count / n_edges * 100 if n_edges else 0.0


split_point = int(n_edges * TRAIN_FRACTION)
train_edge_ids = range(0, split_point)
val_edge_ids   = range(split_point, n_edges)

splits = {
    "train_edge_ids": train_edge_ids,
    "val_edge_ids": val_edge_ids,
}

splits_path = os.path.join(OUTPUT_DIR, "data_splits.json")
with open(splits_path, "w") as f:
    f.write("{")
    for i, (key, ids) in enumerate(splits.items()):
        if i:
            f.write(", ")
        f.write(f"{json.dumps(key)}: ")
        _write_json_int_range(f, ids)
    f.write("}")

print("Step 8 — Data splits saved")
print(f"  Path: {splits_path}")
print(f"  Train edges: {len(train_edge_ids):,} ({_pct(len(train_edge_ids)):.1f}%)")
print(f"  Val edges:   {len(val_edge_ids):,} ({_pct(len(val_edge_ids)):.1f}%)")
print()

Step 8 — Data splits saved
  Path: /content/drive/MyDrive/Math_168_Folder/data/processed/data_splits.json
  Train edges: 55,726,052 (80.0%)
  Val edges:   13,931,514 (20.0%)

