In [1]:
%load_ext autoreload
%autoreload 2

In [4]:
import pandas as pd
import numpy as np
import torch
import dgl
# import dgl.graphbolt as gb
from dataclasses import dataclass
from functools import partial
# from tqdm import tqdm
from IPython.display import display
from typing import Dict, Tuple, Optional, List
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from eda_src.data_loader import load_transactions, load_accounts, load_patterns
from eda_src.feature_engineering import apply_feature_eng_accounts, apply_feature_eng_transactions
from models import AMLModelTrainer, GraphAttentionNetwork
from sklearn.model_selection import train_test_split

In [5]:
dataset_name = "HI-Small"

print(f"Loading {dataset_name}...\n")
transactions_df = apply_feature_eng_transactions(load_transactions(dataset_size=dataset_name))
accounts_df = apply_feature_eng_accounts(load_accounts(dataset_size=dataset_name))
# patterns_df = load_patterns(dataset_size=dataset_name)

display(transactions_df.head(1))
display(accounts_df.head(1))
# display(patterns_df.head(1))

Loading HI-Small...


Loading transactions from: /home/bernard/.cache/kagglehub/datasets/ealtman2019/ibm-transactions-for-anti-money-laundering-aml/versions/8/HI-Small_Trans.csv
File size: 453.6 MB

Loaded 5,078,345 transactions
Date range: 2022-09-01 00:00:00 to 2022-09-18 16:18:00
Laundering transactions: 5,177 (0.102%)

Loading accounts from: /home/bernard/.cache/kagglehub/datasets/ealtman2019/ibm-transactions-for-anti-money-laundering-aml/versions/8/HI-Small_accounts.csv

Loaded 518,581 accounts from 30470 banks


Unnamed: 0,from_bank_account_id,to_bank_account_id,log_amount_paid,log_amount_received,amount_ratio,amount_diff,currency_match,same_bank,is_laundering
0,10_8000EBD30,10_8000EBD30,3.567889,3.567889,-0.005122,0.002653,1,1,0


Unnamed: 0,bank_account_id,entity_type_Corporation,entity_type_Country,entity_type_Direct,entity_type_Individual,entity_type_Partnership,entity_type_Sole Proprietorship
0,331579_80B779D80,0,0,0,0,0,1


In [None]:
class AMLGraphLoader:
    """
    Handles loading data, building the DGL graph, and creating data loaders.
    
    The full graph 'g' is kept on the CPU. The EdgeDataLoader will sample
    neighborhoods and create mini-batch 'blocks' that are moved to the GPU
    during training, respecting the memory constraints.
    """
    def __init__(self, accounts_df, transactions_df):
        self.accounts_df = accounts_df
        self.transactions_df = transactions_df
        self.g = None
        self._build_graph()
        self.train_mask = None
        self.val_mask = None
        self.test_mask = None

    def _build_graph(self):
        print("Building graph...")
        
        # 1. Map string account IDs to contiguous integer node IDs
        # Using pd.Categorical is efficient for this
        self.accounts_df['node_id'] = self.accounts_df['bank_account_id'].astype('category').cat.codes
        account_id_map = self.accounts_df.set_index('bank_account_id')['node_id']
        
        self.transactions_df['src_nid'] = self.transactions_df['from_bank_account_id'].map(account_id_map)
        self.transactions_df['dst_nid'] = self.transactions_df['to_bank_account_id'].map(account_id_map)

        # Drop any transactions where mapping failed (e.g., account not in accounts_df)
        self.transactions_df = self.transactions_df.dropna(subset=['src_nid', 'dst_nid'])
        
        # Get source and destination node IDs as tensors
        src_nids = torch.tensor(self.transactions_df['src_nid'].values, dtype=torch.long)
        dst_nids = torch.tensor(self.transactions_df['dst_nid'].values, dtype=torch.long)
        
        # 2. Create the DGL graph on the CPU
        num_nodes = len(self.accounts_df)
        self.g = dgl.graph((src_nids, dst_nids), num_nodes=num_nodes)
        
        # 3. Add node features
        # Ensure features are sorted by the new 'node_id'
        node_features_df = self.accounts_df.sort_values('node_id')
        node_feats_tensor = torch.tensor(np.stack(node_features_df[
            [
                'entity_type_Corporation', 
                'entity_type_Country',
                'entity_type_Direct', 
                'entity_type_Individual',
                'entity_type_Partnership', 
                'entity_type_Sole Proprietorship'
            ]].values), dtype=torch.float32)
        self.g.ndata['feat'] = node_feats_tensor
        
        # 4. Add edge features and labels
        # Tensors are created in the same order as the transactions_df, which matches edge order
        edge_feats_tensor = torch.tensor(np.stack(self.transactions_df[
            [
                'log_amount_paid',
                'log_amount_received', 
                'amount_ratio', 
                'amount_diff', 
                'currency_match',
                'same_bank', 
                # 'payment_format_ACH',
                # 'payment_format_Bitcoin', 
                # 'payment_format_Cash',
                # 'payment_format_Cheque', 
                # 'payment_format_Credit Card',
                # 'payment_format_Reinvestment', 
                # 'payment_format_Wire',
                # 'receiving_currency_Australian Dollar', 
                # 'receiving_currency_Bitcoin',
                # 'receiving_currency_Canadian Dollar',
                # 'receiving_currency_Euro',
                # 'receiving_currency_Mexican Peso',
                # 'receiving_currency_Ruble',
                # 'receiving_currency_Rupee',
                # 'receiving_currency_UK Pound',
                # 'receiving_currency_US Dollar',
                # 'receiving_currency_Yen',
                # 'receiving_currency_Yuan',
                # 'receiving_currency_Brazil Real',
                # 'receiving_currency_Saudi Riyal', 
                # 'receiving_currency_Shekel',
                # 'receiving_currency_Swiss Franc', 
                # 'payment_currency_Australian Dollar',
                # 'payment_currency_Bitcoin', 
                # 'payment_currency_Canadian Dollar',
                # 'payment_currency_Euro', 
                # 'payment_currency_Mexican Peso',
                # 'payment_currency_Ruble', 
                # 'payment_currency_Rupee',
                # 'payment_currency_UK Pound', 
                # 'payment_currency_US Dollar',
                # 'payment_currency_Yen', 
                # 'payment_currency_Yuan',
                # 'payment_currency_Brazil Real', 
                # 'payment_currency_Saudi Riyal',
                # 'payment_currency_Shekel', 
                # 'payment_currency_Swiss Franc'
            ]].values), dtype=torch.float32)
        edge_labels_tensor = torch.tensor(self.transactions_df['is_laundering'].values, dtype=torch.long)
        
        self.g.edata['feat'] = edge_feats_tensor
        self.g.edata['label'] = edge_labels_tensor
        
        # 5. Create train/validation/test masks for edges
        num_edges = self.g.num_edges()
        eids = np.arange(num_edges)
        
        # Stratify split to ensure both classes are represented
        try:
            train_eids, test_eids, train_labels, test_labels = train_test_split(
                eids, self.transactions_df['is_laundering'].values, test_size=0.3, random_state=42, stratify=self.transactions_df['is_laundering'].values
            )
            val_eids, test_eids, val_labels, test_labels = train_test_split(
                test_eids, test_labels, test_size=0.5, random_state=42, stratify=test_labels
            )
        except ValueError:
            # Fallback for small datasets where stratification fails
            print("Warning: Stratification failed, using non-stratified split.")
            train_eids, test_eids = train_test_split(eids, test_size=0.3, random_state=42)
            val_eids, test_eids = train_test_split(test_eids, test_size=0.5, random_state=42)
            
        train_mask = torch.zeros(num_edges, dtype=torch.bool)
        val_mask = torch.zeros(num_edges, dtype=torch.bool)
        test_mask = torch.zeros(num_edges, dtype=torch.bool)
        
        train_mask[train_eids] = True
        val_mask[val_eids] = True
        test_mask[test_eids] = True
        
        self.g.edata['train_mask'] = train_mask
        self.g.edata['val_mask'] = val_mask
        self.g.edata['test_mask'] = test_mask

        print("Graph built successfully on CPU.")
        print(f"  Nodes: {self.g.num_nodes()}")
        print(f"  Edges: {self.g.num_edges()}")
        print(f"  Train Edges: {train_mask.sum().item()}")
        print(f"  Val Edges:   {val_mask.sum().item()}")
        print(f"  Test Edges:  {test_mask.sum().item()}")
        
    def get_dataloaders(self, batch_size, num_layers=2, fanouts=[15, 10]):
        """
        Creates and returns the DGL EdgeDataLoaders for training, validation, and testing.
        
        Args:
            batch_size (int): Number of seed edges per mini-batch.
            num_layers (int): Number of GNN layers.
            fanouts (list): List of neighbor sampling sizes for each layer, from out-to-in.
                            Must match num_layers.
        
        Returns:
            tuple: (train_loader, val_loader, test_loader, full_graph)
        """
        if len(fanouts) != num_layers:
            raise ValueError(f"Length of fanouts ({len(fanouts)}) must match num_layers ({num_layers}).")
            
        # 1. Define the neighborhood sampler
        # This samples neighborhoods for the src/dst nodes of the seed edges
        sampler = dgl.dataloading.MultiLayerNeighborSampler(fanouts)

        # 2. Get the Edge IDs for each set
        # We use g.eids('all') to get the original edge IDs corresponding to the masks
        train_eids = self.g.eids('all')[self.g.edata['train_mask']]
        val_eids = self.g.eids('all')[self.g.edata['val_mask']]
        test_eids = self.g.eids('all')[self.g.edata['test_mask']]

        if mode == 'train':
            mask = graph_data.train_mask
        elif mode == 'val':
            mask = graph_data.val_mask
        else:
            mask = graph_data.test_mask
        
        edge_ids = torch.where(mask)[0]
        
        # 3. Create the EdgeDataLoaders
        # We set device='cpu' (default) and num_workers > 0
        # The data loader will run sampling in parallel on CPU
        # The main training loop will manually move blocks to the GPU
        train_loader = dgl.dataloading.EdgeDataLoader(
            self.g, train_eids, sampler,
            batch_size=batch_size,
            shuffle=True,
            drop_last=False,
            num_workers=4  # Adjust based on your CPU cores
        )
        
        val_loader = dgl.dataloading.EdgeDataLoader(
            self.g, val_eids, sampler,
            batch_size=batch_size,
            shuffle=False,
            drop_last=False,
            num_workers=4
        )
        
        test_loader = dgl.dataloading.EdgeDataLoader(
            self.g, test_eids, sampler,
            batch_size=batch_size,
            shuffle=False,
            drop_last=False,
            num_workers=4
        )
        
        print("DataLoaders created.")
        return train_loader, val_loader, test_loader, self.g

In [7]:
# 2. Initialize the loader
loader_util = AMLGraphLoader(accounts_df, transactions_df)

# 3. Get dataloaders
train_loader, val_loader, test_loader, full_graph = loader_util.get_dataloaders(
    batch_size=1024, 
    num_layers=2, 
    fanouts=[15, 10]
)

Building graph...
Graph built successfully on CPU.
  Nodes: 518581
  Edges: 5078345
  Train Edges: 3554841
  Val Edges:   761752
  Test Edges:  761752


AttributeError: 'DGLGraph' object has no attribute 'eids'

### DGL Pipeline

In [4]:
@dataclass
class GraphData:
    """Container for processed AML graph data ready for GNN training."""
    graph: dgl.DGLGraph
    labels: torch.Tensor
    train_mask: torch.Tensor
    val_mask: torch.Tensor
    test_mask: torch.Tensor
    num_classes: int
    node_features_dim: int
    edge_features_dim: int
    account_mapping: Dict[str, int]

In [5]:
class DataLoader:
    """
    Data loader for Anti-Money Laundering graph construction with DGL.    
    Supports GIN, PNA, and GAT models with temporal and edge features.
    """
    
    def __init__(
        self,
        transactions_df: pd.DataFrame,
        accounts_df: pd.DataFrame,
        add_self_loops: bool = False
    ):
        """
        Initialize AML data loader.
        
        Args:
            transactions_df: Transaction data from load_transactions()
            accounts_df: Account metadata from load_accounts()
            add_self_loops: Whether to add self-loops to graph
        """
        self.transactions = transactions_df.copy(deep=True)
        self.accounts = accounts_df.copy(deep=True)
        self.add_self_loops = add_self_loops

        print("Initializing AML Data Loader...")
        print(f"Transactions: {len(self.transactions):,}")
        print(f"Accounts: {len(self.accounts):,}")
    
    def _create_account_mapping(self) -> Dict[str, int]:
        """Create mapping from account IDs to node indices."""
        # Get unique accounts from both source and target
        all_accounts = pd.concat([
            self.transactions['from_bank_account_id'],
            self.transactions['to_bank_account_id']
        ]).unique()
        
        account_mapping = {acc: idx for idx, acc in enumerate(all_accounts)}
        print(f"Created account mapping: {len(account_mapping):,} unique accounts")
        return account_mapping
    
    def _encode_edge_features(self) -> np.ndarray:
        """
        Encode edge features from transaction and account data.
        
        Returns:
            Array of shape (n_edges, edge_features_dim)
        """
        features_list = []
        
        # 1. Amount features (log transform to reduce skew)
        amount_received = self.transactions['amount_received'].values.reshape(-1, 1)
        amount_paid = self.transactions['amount_paid'].values.reshape(-1, 1)

        log_amount_received = np.log10(amount_received)
        log_amount_paid = np.log10(amount_paid)
        
        features_list.extend([log_amount_received, log_amount_paid])
        
        # 2. Amount ratio and difference
        amount_ratio = (amount_received / (amount_paid + 1e-8)).reshape(-1, 1)
        amount_diff = (amount_received - amount_paid).reshape(-1, 1)
        
        scaler_ratio = StandardScaler()
        scaler_diff = StandardScaler()
        
        features_list.append(scaler_ratio.fit_transform(amount_ratio))
        features_list.append(scaler_diff.fit_transform(amount_diff))
        
        # 3. Categorical features
        one_hot_array = pd.get_dummies(self.transactions[['payment_format', 'receiving_currency', 'payment_currency']], dtype=float).to_numpy()
        features_list.append(one_hot_array)
        
        # 4. Currency matching flags
        currency_match = (
            self.transactions['receiving_currency'] == self.transactions['payment_currency']
        ).astype(float).values.reshape(-1, 1)
        features_list.append(currency_match)
        
        # 5. Bank relationship features
        same_bank = (
            self.transactions['from_bank'] == self.transactions['to_bank']
        ).astype(float).values.reshape(-1, 1)
        features_list.append(same_bank)
        
        # Concatenate all features
        edge_features = np.hstack(features_list)
        
        print(f"Edge features shape: {edge_features.shape}")
        return edge_features
    
    def _create_node_features(
        self,
        account_mapping: Dict[str, int]
    ) -> np.ndarray:
        """
        Create node features from account metadata and transaction statistics.
        
        Args:
            account_mapping: Mapping from account IDs to node indices
            
        Returns:
            Array of shape (n_nodes, node_features_dim)
        """        
        node_features_df = pd.DataFrame.from_dict(account_mapping, orient='index', columns=['node_id']).reset_index().rename(columns={'index': 'bank_account_id'})
        
        node_features_df = node_features_df.merge(
            self.accounts,
            on='bank_account_id',
            how='left'
        ).drop_duplicates()

        # print('test: ', node_features_df)
        node_features = pd.get_dummies(node_features_df[['entity_type']], dtype=float).to_numpy()

        # Add degree centrality features (?)
        print(f"Node features shape: {node_features.shape}")
        return node_features
    
    def build_graph(
        self,
        train_ratio: float = 0.7,
        val_ratio: float = 0.15,
        test_ratio: float = 0.15,
        seed: int = 42
    ) -> GraphData:
        """
        Build DGL heterogeneous graph for AML detection.
        
        Args:
            train_ratio: Proportion of edges for training
            val_ratio: Proportion of edges for validation
            test_ratio: Proportion of edges for testing
            seed: Random seed for reproducibility
            
        Returns:
            AMLGraphData object containing graph and associated data
        """
        np.random.seed(seed)
        torch.manual_seed(seed)
        
        print("\n" + "="*60)
        print("Building DGL Graph for AML Detection")
        print("="*60)
        
        # 1. Create account mapping
        account_mapping = self._create_account_mapping()
        
        # 2. Create edge list
        src_nodes = [account_mapping[acc] for acc in self.transactions['from_bank_account_id']]
        dst_nodes = [account_mapping[acc] for acc in self.transactions['to_bank_account_id']]

        # 3. Create edge features
        edge_features = self._encode_edge_features()
        
        # 4. Create node features
        node_features = self._create_node_features(account_mapping)
        
        # 5. Build DGL graph
        graph = dgl.graph((src_nodes, dst_nodes), num_nodes=len(account_mapping))
        
        # Add self-loops if requested
        if self.add_self_loops:
            graph = dgl.add_self_loop(graph)
            print(f"Added self-loops to graph")
        
        # 6. Add features to graph
        graph.ndata['feat'] = torch.FloatTensor(node_features)
        graph.edata['feat'] = torch.FloatTensor(edge_features)

        # 6a. Compute additional graph statistics
        in_degrees = graph.in_degrees().float()
        out_degrees = graph.out_degrees().float()
        
        # Add degree features to nodes
        degree_features = torch.stack([in_degrees, out_degrees], dim=1)
        graph.ndata['feat'] = torch.cat([graph.ndata['feat'], degree_features], dim=1)
        
        # 7. Add edge labels (laundering indicator)
        edge_labels = torch.LongTensor(self.transactions['is_laundering'].values)
        graph.edata['label'] = edge_labels
        
        # 8. Create train/val/test masks for edges
        n_edges = graph.num_edges()
        indices = np.random.permutation(n_edges)
        
        train_size = int(train_ratio * n_edges)
        val_size = int(val_ratio * n_edges)
        
        train_idx = indices[:train_size]
        val_idx = indices[train_size:train_size + val_size]
        test_idx = indices[train_size + val_size:]
        
        train_mask = torch.zeros(n_edges, dtype=torch.bool)
        val_mask = torch.zeros(n_edges, dtype=torch.bool)
        test_mask = torch.zeros(n_edges, dtype=torch.bool)
        
        train_mask[train_idx] = True
        val_mask[val_idx] = True
        test_mask[test_idx] = True
        
        graph.edata['train_mask'] = train_mask
        graph.edata['val_mask'] = val_mask
        graph.edata['test_mask'] = test_mask
        
        print("\n" + "-"*60)
        print("Graph Statistics:")
        print("-"*60)
        print(f"Nodes: {graph.num_nodes():,}")
        print(f"Edges: {graph.num_edges():,}")
        print(f"Node feature dim: {graph.ndata['feat'].shape[1]}")
        print(f"Edge feature dim: {graph.edata['feat'].shape[1]}")
        print(f"Average degree: {graph.num_edges() / graph.num_nodes():.2f}")
        print(f"\nTrain edges: {train_mask.sum().item():,} ({train_ratio*100:.1f}%)")
        print(f"Val edges: {val_mask.sum().item():,} ({val_ratio*100:.1f}%)")
        print(f"Test edges: {test_mask.sum().item():,} ({test_ratio*100:.1f}%)")
        print(f"\nLaundering edges: {edge_labels.sum().item():,} ({edge_labels.float().mean()*100:.3f}%)")
        print("="*60 + "\n")
        
        return GraphData(
            graph=graph,
            labels=edge_labels,
            train_mask=train_mask,
            val_mask=val_mask,
            test_mask=test_mask,
            num_classes=2,
            node_features_dim=graph.ndata['feat'].shape[1],
            edge_features_dim=graph.edata['feat'].shape[1],
            account_mapping=account_mapping
        )
    
    def get_subgraph_sampler(
        self,
        graph_data: GraphData,
        batch_size: int = 1024,
        num_neighbors: List[int] = [10, 5],
        mode: str = 'train'
    ) -> dgl.dataloading.DataLoader:
        """
        Create a neighborhood sampler for mini-batch training.
        
        Args:
            graph_data: Processed graph data
            batch_size: Number of edges per batch
            num_neighbors: Number of neighbors to sample per layer
            mode: One of 'train', 'val', or 'test'
            
        Returns:
            DGL DataLoader for mini-batch training
        """
        if mode == 'train':
            mask = graph_data.train_mask
        elif mode == 'val':
            mask = graph_data.val_mask
        else:
            mask = graph_data.test_mask
        
        edge_ids = torch.where(mask)[0]
        
        sampler = dgl.dataloading.MultiLayerFullNeighborSampler(len(num_neighbors))
        
        # For edge classification, we need EdgeDataLoader
        # Convert edge IDs to node pairs
        # src, dst = graph_data.graph.find_edges(edge_ids)
        edge_sampler = dgl.dataloading.as_edge_prediction_sampler(
            sampler,
            negative_sampler=None  # No negative sampling for supervised classification
        )
        
        dataloader = dgl.dataloading.DataLoader(
            graph_data.graph,
            edge_ids,
            edge_sampler,
            batch_size=batch_size,
            shuffle=(mode == 'train'),
            drop_last=False,
            num_workers=2
        )
        
        return dataloader

In [7]:
loader = DataLoader(trans_df.head(100000), accounts_df, add_self_loops=False)
graph_data = loader.build_graph()
subgraph_sampler = loader.get_subgraph_sampler(graph_data)

Initializing AML Data Loader...
Transactions: 100,000
Accounts: 518,581

Building DGL Graph for AML Detection
Created account mapping: 81,352 unique accounts
Edge features shape: (100000, 43)
Node features shape: (81352, 6)

------------------------------------------------------------
Graph Statistics:
------------------------------------------------------------
Nodes: 81,352
Edges: 100,000
Node feature dim: 8
Edge feature dim: 43
Average degree: 1.23

Train edges: 70,000 (70.0%)
Val edges: 15,000 (15.0%)
Test edges: 15,000 (15.0%)

Laundering edges: 5 (0.005%)



In [11]:
for i in subgraph_sampler:
    print(i)
    break



(tensor([149601, 273850,  81204,  ...,   1921, 266964, 198585]), Graph(num_nodes=1402, num_edges=1024,
      ndata_schemes={'feat': Scheme(shape=(8,), dtype=torch.float32), '_ID': Scheme(shape=(), dtype=torch.int64)}
      edata_schemes={'feat': Scheme(shape=(43,), dtype=torch.float32), 'label': Scheme(shape=(), dtype=torch.int64), 'train_mask': Scheme(shape=(), dtype=torch.bool), 'val_mask': Scheme(shape=(), dtype=torch.bool), 'test_mask': Scheme(shape=(), dtype=torch.bool), '_ID': Scheme(shape=(), dtype=torch.int64)}), [Block(num_src_nodes=1970, num_dst_nodes=1823, num_edges=3478), Block(num_src_nodes=1823, num_dst_nodes=1402, num_edges=3060)])


In [8]:
def create_gat_model(
    node_feat_dim: int,
    edge_feat_dim: int,
    hidden_dims: List[int] = [128, 64, 32],
    num_heads: List[int] = [8, 8, 1],
    **kwargs
) -> GraphAttentionNetwork:
    """Create GAT model for AML detection."""
    return GraphAttentionNetwork(
        in_node_feats=node_feat_dim,
        in_edge_feats=edge_feat_dim,
        hidden_dims=hidden_dims,
        num_heads=num_heads,
        **kwargs
    )

In [9]:
model = create_gat_model(
        node_feat_dim=graph_data.node_features_dim,
        edge_feat_dim=graph_data.edge_features_dim,
        hidden_dims=[128, 64, 32],
        num_heads=[8, 8, 1]
    )
    
# Train
trainer = AMLModelTrainer(model, learning_rate=0.001)
history = trainer.train(graph_data.graph, num_epochs=100)




Training GraphAttentionNetwork


DGLError: There are 0-in-degree nodes in the graph, output for those nodes will be invalid. This is harmful for some applications, causing silent performance regression. Adding self-loop on the input graph by calling `g = dgl.add_self_loop(g)` will resolve the issue. Setting ``allow_zero_in_degree`` to be `True` when constructing this module will suppress the check and let the code run.