In [2]:
import pandas as pd
import networkx as nx
from datetime import datetime
from typing import List, Dict, Any
import numpy as np
from concurrent.futures import ProcessPoolExecutor
from collections import defaultdict
import pyarrow as pa
import pyarrow.compute as pc
from tqdm import tqdm

In [None]:
class TransactionNetworkBuilder:
    """Build a directed transaction graph from a DataFrame and derive node features.

    Each transaction row becomes (part of) a directed edge pointing from the
    payer to the payee. Rows whose transaction type is listed in
    ``cash_transaction_identifiers`` are routed to or from a single synthetic
    cash node instead of the row's counterparty.

    Attributes:
        graph: ``nx.DiGraph`` holding the network topology.
        node_features: mapping ``node -> feature dict``; unseen nodes default
            to ``{'node_type': 'account'}``.
        edge_features: mapping ``(source, target) -> {'amount', 'count',
            'is_cash_transaction'}`` aggregated over all processed rows.
        CASH_NODE: label of the synthetic cash node.
    """

    def __init__(self, cash_node_identifier: str = "CASH"):
        """Create an empty builder.

        Args:
            cash_node_identifier: label used for the synthetic cash node.
        """
        self.graph = nx.DiGraph()
        # defaultdicts avoid explicit "is this key present yet?" checks.
        self.node_features = defaultdict(lambda: {'node_type': 'account'})
        self.edge_features = defaultdict(lambda: defaultdict(float))
        self.CASH_NODE = cash_node_identifier

        # Kept for backward compatibility: no method of this class reads or
        # writes this buffer.
        self.edge_buffer = defaultdict(lambda: {
            'amount': 0.0,
            'count': 0,
            'is_cash_transaction': False
        })

    def _process_batch(
        self,
        batch: pd.DataFrame,
        party_col: str,
        counterparty_col: str,
        amount_col: str,
        is_credit_col: str,
        transaction_type_col: str = None,
        cash_transaction_identifiers: Dict[str, str] = None
    ) -> Dict:
        """Aggregate one batch of transactions into per-edge totals.

        A row is a *cash* row when ``transaction_type_col`` is given and the
        row's type is a key of ``cash_transaction_identifiers``; every other
        row (including rows with a missing type, and all rows when no type
        column is configured) is a *regular* party/counterparty transfer.

        Args:
            batch: transactions to aggregate.
            party_col: column holding the account that owns the row.
            counterparty_col: column holding the other side of the transfer.
            amount_col: column holding the transaction amount.
            is_credit_col: column that is truthy when the party receives the
                money (edge runs counterparty -> party) and falsy when the
                party sends it. Missing values are treated as falsy.
            transaction_type_col: optional column holding the transaction type.
            cash_transaction_identifiers: optional mapping
                ``transaction type -> direction``. ``'out'`` creates an edge
                party -> cash node; any other value creates cash node -> party.

        Returns:
            A plain dict ``(source, target) -> {'amount', 'count',
            'is_cash_transaction'}`` for this batch only; ``self`` is not
            modified.
        """
        edge_updates = {}

        def accumulate(source, target, amount, is_cash):
            # Fold a single transaction into its edge's running totals.
            key = (source, target)
            entry = edge_updates.get(key)
            if entry is None:
                entry = edge_updates[key] = {
                    'amount': 0.0,
                    'count': 0,
                    'is_cash_transaction': False
                }
            entry['amount'] += amount
            entry['count'] += 1
            if is_cash:
                entry['is_cash_transaction'] = True

        # Cash handling needs both a type column and a non-empty mapping.
        has_cash_rules = bool(transaction_type_col and cash_transaction_identifiers)
        if has_cash_rules:
            cash_mask = batch[transaction_type_col].isin(
                list(cash_transaction_identifiers)
            ).to_numpy()
        else:
            cash_mask = np.zeros(len(batch), dtype=bool)

        # Regular transactions: everything that is not a configured cash type.
        regular = batch.loc[~cash_mask]
        if len(regular):
            flags = regular[is_credit_col]
            # Element-wise truthiness with missing values mapped to False, so
            # NaN/None/pd.NA never count as a credit.
            is_credit = np.fromiter(
                (bool(pd.notna(flag) and flag) for flag in flags),
                dtype=bool,
                count=len(flags)
            )
            parties = regular[party_col].to_numpy()
            counterparties = regular[counterparty_col].to_numpy()

            # Credit: money flows counterparty -> party; debit: the reverse.
            sources = np.where(is_credit, counterparties, parties)
            targets = np.where(is_credit, parties, counterparties)
            amounts = regular[amount_col].to_numpy()

            for source, target, amount in zip(sources, targets, amounts):
                accumulate(source, target, amount, False)

        # Cash transactions: one endpoint is always the synthetic cash node.
        if has_cash_rules:
            cash_rows = batch.loc[cash_mask]
            rows = zip(
                cash_rows[party_col].to_numpy(),
                cash_rows[transaction_type_col].to_numpy(),
                cash_rows[amount_col].to_numpy()
            )
            for party, tx_type, amount in rows:
                # The edge key is built directly rather than through an array
                # filled with the cash label, so non-string account ids work.
                if cash_transaction_identifiers[tx_type] == 'out':
                    accumulate(party, self.CASH_NODE, amount, True)
                else:
                    accumulate(self.CASH_NODE, party, amount, True)

        return edge_updates

    def create_network_from_df(
        self,
        df: pd.DataFrame,
        party_col: str,
        counterparty_col: str,
        amount_col: str,
        is_credit_col: str,
        batch_size: int = 100_000,
        n_workers: int = 4,
        **kwargs
    ):
        """Populate ``graph`` and ``edge_features`` from a transaction DataFrame.

        The frame is cut into consecutive slices of at most ``batch_size``
        rows, each slice is aggregated by ``_process_batch`` and the partial
        results are merged into this builder. Calling the method again adds
        to the existing totals.

        Args:
            df: transactions, one row per transaction.
            party_col: see ``_process_batch``.
            counterparty_col: see ``_process_batch``.
            amount_col: see ``_process_batch``.
            is_credit_col: see ``_process_batch``.
            batch_size: maximum number of rows per batch; must be positive.
            n_workers: number of worker threads; values of 1 or less process
                the batches sequentially in the calling thread.
            **kwargs: optional ``transaction_type_col`` and
                ``cash_transaction_identifiers``, forwarded to
                ``_process_batch``.

        Raises:
            ValueError: if ``batch_size`` is not positive.
        """
        if batch_size <= 0:
            raise ValueError("batch_size must be a positive integer")

        transaction_type_col = kwargs.get('transaction_type_col')
        cash_transaction_identifiers = kwargs.get('cash_transaction_identifiers')

        # Register the cash node's type up front when cash handling is on.
        if cash_transaction_identifiers:
            self.node_features[self.CASH_NODE] = {'node_type': 'cash_system'}

        # Consecutive row slices; an empty frame simply yields no batches.
        batches = [
            df.iloc[start:start + batch_size]
            for start in range(0, len(df), batch_size)
        ]

        def process(batch):
            return self._process_batch(
                batch, party_col, counterparty_col,
                amount_col, is_credit_col,
                transaction_type_col,
                cash_transaction_identifiers
            )

        if n_workers and n_workers > 1 and len(batches) > 1:
            # Threads instead of processes: a process pool must pickle the
            # callable and this instance, and neither a local function nor
            # the lambda-backed defaultdicts on ``self`` can be pickled.
            from concurrent.futures import ThreadPoolExecutor

            with ThreadPoolExecutor(max_workers=n_workers) as executor:
                batch_results = list(tqdm(
                    executor.map(process, batches),
                    total=len(batches),
                    desc="Processing batches"
                ))
        else:
            batch_results = [
                process(batch)
                for batch in tqdm(batches, desc="Processing batches")
            ]

        # Merge the per-batch aggregates into the builder's state.
        for batch_edges in batch_results:
            for (source, target), updates in batch_edges.items():
                edge_key = (source, target)
                if not self.graph.has_edge(source, target):
                    self.graph.add_edge(source, target)
                    self.edge_features[edge_key] = updates
                else:
                    self.edge_features[edge_key]['amount'] += updates['amount']
                    self.edge_features[edge_key]['count'] += updates['count']
                    if updates['is_cash_transaction']:
                        self.edge_features[edge_key]['is_cash_transaction'] = True

    def compute_network_features(self, batch_size: int = 1000):
        """Compute degree and flow features for every non-cash node.

        For each node the following keys are merged into its entry in
        ``node_features`` (existing keys such as ``node_type`` are kept):
        ``in_degree``, ``out_degree``, ``total_degree``, ``total_received``,
        ``total_sent`` and, when the cash node is part of the graph,
        ``total_cash_withdrawals``, ``total_cash_deposits`` and
        ``net_cash_flow`` (deposits minus withdrawals).

        Args:
            batch_size: number of nodes per progress-bar step; must be
                positive.

        Raises:
            ValueError: if ``batch_size`` is not positive.
        """
        if batch_size <= 0:
            raise ValueError("batch_size must be a positive integer")

        nodes = list(self.graph.nodes())
        node_batches = [
            nodes[i:i + batch_size]
            for i in range(0, len(nodes), batch_size)
        ]
        has_cash_node = self.CASH_NODE in self.graph

        for batch in tqdm(node_batches, desc="Computing node features"):
            for node in batch:
                # The cash node itself gets no account-level features.
                if node == self.CASH_NODE:
                    continue

                in_edges = np.array([
                    self.edge_features[(u, node)]['amount']
                    for u in self.graph.predecessors(node)
                ])
                out_edges = np.array([
                    self.edge_features[(node, v)]['amount']
                    for v in self.graph.successors(node)
                ])

                features = {
                    'in_degree': len(in_edges),
                    'out_degree': len(out_edges),
                    'total_degree': len(in_edges) + len(out_edges),
                    'total_received': np.sum(in_edges),
                    'total_sent': np.sum(out_edges)
                }

                # Cash-specific features: at most one edge exists in each
                # direction between a node and the cash node.
                if has_cash_node:
                    cash_withdrawals = (
                        self.edge_features[(node, self.CASH_NODE)]['amount']
                        if self.graph.has_edge(node, self.CASH_NODE) else 0
                    )
                    cash_deposits = (
                        self.edge_features[(self.CASH_NODE, node)]['amount']
                        if self.graph.has_edge(self.CASH_NODE, node) else 0
                    )
                    features.update({
                        'total_cash_withdrawals': cash_withdrawals,
                        'total_cash_deposits': cash_deposits,
                        'net_cash_flow': cash_deposits - cash_withdrawals
                    })

                # Merge into the node's entry instead of replacing it, so the
                # default 'node_type' survives.
                self.node_features[node].update(features)

    def get_node_features_df(self) -> pd.DataFrame:
        """Return ``node_features`` as a DataFrame with one row per node.

        The node identifiers form the index and the feature names form the
        columns.
        """
        return pd.DataFrame.from_dict(self.node_features, orient='index')

    def get_edge_list_df(self) -> pd.DataFrame:
        """Return ``edge_features`` as a DataFrame with one row per edge.

        The ``(source, target)`` keys are used as the index and then moved
        into ordinary columns by ``reset_index``; the remaining columns are
        the per-edge feature names.
        """
        return pd.DataFrame.from_dict(
            {(s, t): d for (s, t), d in self.edge_features.items()},
            orient='index'
        ).reset_index()

# Example usage: build a network from a synthetic transaction table.
if __name__ == "__main__":
    # Seeded generator so the synthetic dataset (and therefore the resulting
    # network) is identical on every run.
    rng = np.random.default_rng(42)

    # Create sample large dataset
    n_rows = 1_000_000
    data = {
        'party_id': rng.choice(['A', 'B', 'C', 'D', 'E'], n_rows),
        'counterparty_id': rng.choice(['B', 'C', 'D', 'E', 'F'], n_rows),
        'amount': rng.uniform(100, 10000, n_rows),
        'is_credit': rng.choice([True, False], n_rows),
        # None stands for a row without a recorded transaction type.
        'transaction_type': rng.choice(
            ['TRANSFER', 'ATM_WITHDRAWAL', 'CASH_DEPOSIT', None],
            n_rows,
            p=[0.7, 0.1, 0.1, 0.1]
        )
    }
    df = pd.DataFrame(data)

    # Initialize and process
    builder = TransactionNetworkBuilder()
    builder.create_network_from_df(
        df=df,
        party_col='party_id',
        counterparty_col='counterparty_id',
        amount_col='amount',
        is_credit_col='is_credit',
        transaction_type_col='transaction_type',
        # 'out': party -> cash node, 'in': cash node -> party.
        cash_transaction_identifiers={
            'ATM_WITHDRAWAL': 'out',
            'CASH_DEPOSIT': 'in'
        },
        batch_size=100_000,
        n_workers=4
    )

In [3]:
# Create an empty undirected networkx graph and bind it to G.
# NOTE(review): G is not referenced by any other code visible in this part of
# the notebook — confirm it is still needed.
G = nx.Graph()