In [1]:

import random
import numpy as np
import pandas as pd
import torch
from torch_geometric.data import Data
from neo4j import GraphDatabase

URI = "neo4j://localhost"
AUTH = ("neo4j", "password")

NODE_TYPE_MAPPINGS = {
    'account': 0,
    'transaction': 1,
    'user': 2,
    'country': 3,
    'lob': 4,
    'sector': 5
}

TXN_TYPE_MAPPINGS = {
    'DEPOSIT-CASH': 0,
    'DEPOSIT-CHECK': 1,
    'EXCHANGE': 2,
    'MAKE-PAYMENT': 3,
    'MOVE-FUNDS': 4,
    'PAY-CHECK': 5,
    'QUICK-PAYMENT': 6,
    'WITHDRAWAL': 7,
}

with GraphDatabase.driver(URI, auth=AUTH) as driver:
    driver.verify_connectivity()

# src dst ts src_label src_type dst_type seed graph_edge_type
df_edges = pd.DataFrame({'src': [], 'dst': [], 'ts': [], 'src_label': [], 'src_type': [], 'dst_type': [], 'seed': [], 'graph_edge_type': []})
fstore = {}

In [2]:
def extract_features(node):
    node_type = next(iter(node.labels))
    node_id = ''
    if node.get('id') is not None:
        node_id = node.get('id')
    else:
        node_id = node.get('name')
    if node_type == 'Transaction':
        import time
        from datetime import datetime

        date_time_str = node.get('ts')
        date_time_obj = datetime.strptime(
            date_time_str, '%Y-%m-%d %H:%M:%S')

        timestamp = int(time.mktime(date_time_obj.timetuple()))

        return node_id, node_type, [NODE_TYPE_MAPPINGS[node_type.lower()], float(node.get('amount')), timestamp, TXN_TYPE_MAPPINGS[node.get('type')]]
    else:
        return node_id, node_type, []

def extract_data(relationship):
    src = relationship.start_node
    dst = relationship.end_node
    edge_type = relationship.type
    
    src_id, src_type, src_features = extract_features(src)
    dst_id, dst_type, dst_features = extract_features(dst)
    fstore[src_id] = src_features
    fstore[dst_id] = dst_features
    src_label = -1
    if src_type == 'Transaction':
        src_label = int(src.get('isFraud'))
    df_edges.loc[len(df_edges)] = [src_id, dst_id, df_edges.shape[0], src_label, src_type, dst_type, 1, edge_type]

def fetch_edges(tx):
    belongs_to = list(tx.run(f"MATCH p=()-[r:BELONGS_TO]->() RETURN p limit 50"))
    from_country = list(tx.run("MATCH p=()-[r:FROM]->() RETURN p limit 50"))
    lob_in = list(tx.run("MATCH p=()-[r:LOB_IN]->() RETURN p limit 50"))
    received_by = list(
        tx.run(f"MATCH p=()-[r:RECEIVED_BY]->() RETURN p limit 50"))
    transferred_by = list(
        tx.run(f"MATCH p=()-[r:TRANSFERRED_BY]->() RETURN p limit 50"))
    works_in = list(tx.run("MATCH p=()-[r:WORKS_IN]->() RETURN p limit 50"))
    edges = belongs_to + from_country + lob_in + received_by + transferred_by + works_in
    for edge in edges:
        extract_data(edge[0].relationships[0])


In [3]:
with driver.session() as session:
    session.execute_read(fetch_edges)

In [4]:
df_edges.head(50)

Unnamed: 0,src,dst,ts,src_label,src_type,dst_type,seed,graph_edge_type
0,ACCOUNT-3731,COMPANY-3724,0,-1,Account,User,1,BELONGS_TO
1,ACCOUNT-1105,JPMC-COMPANY-1097,1,-1,Account,User,1,BELONGS_TO
2,ACCOUNT-7627,JPMC-COMPANY-7619,2,-1,Account,User,1,BELONGS_TO
3,ACCOUNT-5168,CLIENT-5160,3,-1,Account,User,1,BELONGS_TO
4,ACCOUNT-9269,BILL-COMPANY-9268,4,-1,Account,User,1,BELONGS_TO
5,ACCOUNT-3163,COMPANY-3157,5,-1,Account,User,1,BELONGS_TO
6,ACCOUNT-4854,CLIENT-4853,6,-1,Account,User,1,BELONGS_TO
7,ACCOUNT-9499,JPMC-COMPANY-9490,7,-1,Account,User,1,BELONGS_TO
8,ACCOUNT-6426,COMPANY-6418,8,-1,Account,User,1,BELONGS_TO
9,ACCOUNT-8196,CLIENT-8195,9,-1,Account,User,1,BELONGS_TO


In [5]:
import os
import sys
module_path = os.path.abspath(os.path.join('..'))
if module_path not in sys.path:
    sys.path.append(module_path)


In [6]:
from custom_xfraud.utils import timeit
from custom_xfraud.models import GNN, HetNet as Net, HetNetLogi as NetLogi
from custom_xfraud.graph_loader import NaiveHetDataLoader, NaiveHetGraph
from custom_xfraud.data_utils import create_naive_het_graph_from_edges as _create_naive_het_graph_from_edges
from ignite.contrib.handlers.param_scheduler import CosineAnnealingScheduler
from ignite.contrib.metrics import AveragePrecision, ROC_AUC
from ignite.handlers import EarlyStopping, ModelCheckpoint, Timer
from ignite.metrics import Accuracy, Loss
from ignite.engine import Events, create_supervised_trainer, create_supervised_evaluator, Engine
from ignite.utils import convert_tensor
import torch.nn.functional as F
import torch.nn as nn
import torch
import datetime
import pandas as pd
import numpy as np
import joblib
import tqdm
import fire
import os
import logging
import tempfile
import subprocess
from functools import partial
import glob

logging.basicConfig(
    format='%(asctime)s | %(levelname)s | %(name)s | %(message)s')
logger = logging.getLogger('exp')
logger.setLevel(logging.INFO)

# torch
mem = joblib.Memory('./data/cache')
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

create_naive_het_graph_from_edges = mem.cache(_create_naive_het_graph_from_edges)


def prepare_batch(batch, ts_range, fstore, default_feature, g, device=device, non_blocking=False):
    encoded_seeds, encoded_ids, edge_ids = batch
    encoded_seeds = set(encoded_seeds)
    encode_to_new = dict((e, i) for i, e in enumerate(encoded_ids))
    mask = np.asarray([e in encoded_seeds for e in encoded_ids])
    decoded_ids = [g.node_decode[e] for e in encoded_ids]

    x = np.asarray([
        fstore.get(e, default_feature) for e in decoded_ids
    ])
    x = convert_tensor(torch.FloatTensor(x), device=device,
                       non_blocking=non_blocking)

    edge_list = [g.edge_list_encoded[:, idx] for idx in edge_ids]
    def f(x): return encode_to_new[x]
    f = np.vectorize(f)
    edge_list = [f(e) for e in edge_list]
    edge_list = [
        convert_tensor(torch.LongTensor(e), device=device,
                       non_blocking=non_blocking)
        for e in edge_list]

    y = np.asarray([
        -1 if e not in encoded_seeds else g.seed_label_encoded[e]
        for e in encoded_ids
    ])
    assert (y >= 0).sum() == len(encoded_seeds)

    y = torch.LongTensor(y)
    y = convert_tensor(y, device=device, non_blocking=non_blocking)
    mask = torch.BoolTensor(mask)
    mask = convert_tensor(mask, device=device, non_blocking=non_blocking)

    y = y[mask]

    node_type_encode = g.node_type_encode
    node_type = [node_type_encode[g.node_type[e]] for e in decoded_ids]
    node_type = torch.LongTensor(np.asarray(node_type))
    node_type = convert_tensor(
        node_type, device=device, non_blocking=non_blocking)

    edge_type = [[g.edge_list_type_encoded[eid]
                  for eid in list_] for list_ in edge_ids]
    edge_type = [torch.LongTensor(np.asarray(e)) for e in edge_type]
    edge_type = [convert_tensor(
        e, device=device, non_blocking=non_blocking) for e in edge_type]

    return ((mask, x, edge_list, node_type, edge_type), y)


def main(df_edges, fstore, path_result='exp_result.csv',
         dir_model='./model',
         conv_name='gcn', sample_method='sage',
         batch_size=(64, 16),
         width=16, depth=6,
         n_hid=400, n_heads=8, n_layers=6, dropout=0.2,
         optimizer='adamw', clip=0.25,
         n_batch=32, max_epochs=10, patience=8,
         seed_epoch=False, num_workers=0,
         seed=2020, debug=False, continue_training=False):

    if conv_name == '' or conv_name == 'logi':
        width, depth = 1, 1

    stats = dict(
        batch_size=batch_size,
        width=width, depth=depth,
        n_hid=n_hid, n_heads=n_heads, n_layers=n_layers, dropout=dropout,
        conv_name=conv_name, optimizer=str(optimizer), clip=clip,
        max_epochs=max_epochs, patience=patience,
        seed=seed, sample_method=sample_method
    )
    logger.info('Param %s', stats)

    
    if debug:
        logger.info('Main in debug mode.')
        df_edges = df_edges.iloc[:10000]
    if 'seed' not in df_edges:
        df_edges['seed'] = 1
    with timeit(logger, 'g-init'):
        g = create_naive_het_graph_from_edges(df_edges)

    seed_set = set(df_edges.query('seed>0')['src'])
    logger.info('#seed %d', len(seed_set))

    times = pd.Series(df_edges['ts'].unique())
    times_train_valid_split = times.quantile(0.7)
    times_valid_test_split = times.quantile(0.9)
    train_range = set(t for t in times
                        if t is not None and t <= times_train_valid_split)
    valid_range = set(t for t in times
                        if t is not None and times_train_valid_split < t <= times_valid_test_split)
    test_range = set(t for t in times
                        if t is not None and t > times_valid_test_split)
    
    logger.info('Range Train %s\t Valid %s\t Test %s',
                train_range, valid_range, test_range)

    x0 = fstore.get(g.get_seed_nodes(train_range)[0], [])
    assert x0 is not None
    num_feat = x0.shape[0]

    np.random.seed(seed)
    torch.manual_seed(seed)

    dl_train = NaiveHetDataLoader(
        width=width, depth=depth,
        g=g, ts_range=train_range, method=sample_method,
        batch_size=batch_size, n_batch=n_batch,
        seed_epoch=seed_epoch, num_workers=num_workers, shuffle=True)

    dl_valid = NaiveHetDataLoader(
        width=width, depth=depth,
        g=g, ts_range=valid_range, method=sample_method,
        batch_size=batch_size, n_batch=n_batch,
        seed_epoch=True, num_workers=num_workers, shuffle=False,
        cache_result=True)

    dl_test = NaiveHetDataLoader(
        width=width, depth=depth,
        g=g, ts_range=test_range, method=sample_method,
        batch_size=batch_size, n_batch=n_batch,
        seed_epoch=True, num_workers=num_workers, shuffle=False,
        cache_result=True)

    logger.info('Len dl train %d, valid %d, test %d.',
                len(dl_train), len(dl_valid), len(dl_test))
    for _ in tqdm.tqdm(dl_test, desc='gen-test-dl', ncols=80):
        pass

    num_node_type = len(g.node_type_encode)
    num_edge_type = len(g.edge_type_encode)
    logger.info('#node_type %d, #edge_type %d',
                num_node_type, num_edge_type)

    if conv_name != 'logi':
        if conv_name == '':
            gnn = None
        else:
            gnn = GNN(conv_name=conv_name,
                        n_in=num_feat,
                        n_hid=n_hid, n_heads=n_heads, n_layers=n_layers,
                        dropout=dropout,
                        num_node_type=num_node_type,
                        num_edge_type=num_edge_type
                        )

        model = Net(gnn, num_feat, num_embed=n_hid, n_hidden=n_hid)
    else:
        model = NetLogi(num_feat)
    model.to(device)

    model_loaded = False
    if continue_training:
        files = glob.glob(f'{dir_model}/model-{conv_name}-{seed}*')
        if len(files) > 0:
            files.sort(key=os.path.getmtime)
            load_file = files[-1]
            logger.info(f'Continue training from checkpoint {load_file}')
            model.load_state_dict(torch.load(load_file))
            model_loaded = True

    if optimizer == 'adamw':
        optimizer = torch.optim.AdamW(model.parameters())
    elif optimizer == 'adam':
        optimizer = torch.optim.Adam(model.parameters())
    elif optimizer == 'sgd':
        optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
    elif optimizer == 'adagrad':
        optimizer = torch.optim.Adagrad(model.parameters())

    pb = partial(
        prepare_batch, g=g, fstore=fstore, ts_range=train_range,
        default_feature=np.zeros_like(x0))

    loss = nn.CrossEntropyLoss()

    from ignite.engine import EventEnum, _prepare_batch
    from ignite.distributed import utils as idist
    from ignite.engine.deterministic import DeterministicEngine

    from typing import Any, Callable, Dict, Optional, Sequence, Tuple, Union

    class ForwardEvents(EventEnum):
        FORWARD_STARTED = 'forward_started'
        FORWARD_COMPLETED = 'forward_completed'

    """create UDF trainer to register forward events"""
    def udf_supervised_trainer(
            model: torch.nn.Module,
            optimizer: torch.optim.Optimizer,
            loss_fn: Union[Callable, torch.nn.Module],
            device: Optional[Union[str, torch.device]] = None,
            non_blocking: bool = False,
            prepare_batch: Callable = _prepare_batch,
            output_transform: Callable = lambda x, y, y_pred, loss: loss.item(),
            deterministic: bool = False,
    ) -> Engine:
        device_type = device.type if isinstance(
            device, torch.device) else device
        on_tpu = "xla" in device_type if device_type is not None else False

        if on_tpu and not idist.has_xla_support:
            raise RuntimeError(
                "In order to run on TPU, please install PyTorch XLA")

        def _update(engine: Engine, batch: Sequence[torch.Tensor]) -> Union[Any, Tuple[torch.Tensor]]:
            model.train()
            optimizer.zero_grad()
            x, y = prepare_batch(batch, device=device,
                                    non_blocking=non_blocking)

            engine.fire_event(ForwardEvents.FORWARD_STARTED)
            y_pred = model(x)
            engine.fire_event(ForwardEvents.FORWARD_COMPLETED)

            loss = loss_fn(y_pred, y)
            loss.backward()

            optimizer.step()

            return output_transform(x, y, y_pred, loss)

        trainer = Engine(
            _update) if not deterministic else DeterministicEngine(_update)
        trainer.register_events(*ForwardEvents)

        return trainer

    trainer = udf_supervised_trainer(
        model, optimizer, loss,
        device=device, prepare_batch=pb)

    pb = partial(
        prepare_batch, g=g, fstore=fstore, ts_range=valid_range,
        default_feature=np.zeros_like(x0))
    evaluator = create_supervised_evaluator(model,
                                            metrics={
                                                'accuracy': Accuracy(),
                                                'loss': Loss(loss),
                                                'ap': AveragePrecision(
                                                    output_transform=lambda out: (out[0][:, 1], out[1])),
                                                'auc': ROC_AUC(
                                                    output_transform=lambda out: (out[0][:, 1], out[1])),
                                            }, device=device, prepare_batch=pb)

    if model_loaded:
        with torch.no_grad():
            evaluator.run(dl_test)
        metrics = evaluator.state.metrics
        logger.info(
            'Loaded model stat: Test\tLoss %.2f\tAccuracy %.2f\tAUC %.4f\tAP %.4f',
            metrics['loss'], metrics['accuracy'],
            metrics['auc'], metrics['ap']
        )

    scheduler = CosineAnnealingScheduler(
        optimizer, 'lr',
        start_value=0.05, end_value=1e-4,
        cycle_size=len(dl_train) * max_epochs)
    trainer.add_event_handler(Events.ITERATION_STARTED, scheduler)

    pbar_train = tqdm.tqdm(desc='train', total=len(dl_train), ncols=100)
    t_epoch = Timer(average=True)
    t_epoch.pause()

    t_iter = Timer(average=True)
    t_iter.pause()

    @trainer.on(ForwardEvents.FORWARD_STARTED)
    def resume_timer(engine):
        t_epoch.resume()
        t_iter.resume()

    @trainer.on(ForwardEvents.FORWARD_COMPLETED)
    def pause_timer(engine):
        t_epoch.pause()
        t_iter.pause()
        t_iter.step()

    @trainer.on(Events.EPOCH_STARTED)
    def log_training_loss(engine):
        pbar_train.refresh()
        pbar_train.reset()

    @trainer.on(Events.ITERATION_COMPLETED)
    def log_training_loss(engine):
        pbar_train.update(1)
        pbar_train.set_description(
            'Train [Eopch %03d] Loss %.4f T-iter %.4f' % (
                engine.state.epoch, engine.state.output, t_iter.value()
            )
        )

    @trainer.on(Events.EPOCH_COMPLETED)
    def log_training_results(engine):
        t_epoch.step()
        evaluator.run(dl_valid)
        metrics = evaluator.state.metrics
        logger.info(
            '[Epoch %03d]\tLoss %.4f\tAccuracy %.4f\tAUC %.4f\tAP %.4f \tTime %.2f / %03d',
            engine.state.epoch,
            metrics['loss'], metrics['accuracy'],
            metrics['auc'], metrics['ap'],
            t_epoch.value(), t_epoch.step_count
        )
        t_iter.reset()
        t_epoch.pause()
        t_iter.pause()

    def score_function(engine):
        return engine.state.metrics['auc']

    handler = EarlyStopping(
        patience=patience, score_function=score_function, trainer=trainer)
    evaluator.add_event_handler(Events.COMPLETED, handler)

    cp = ModelCheckpoint(dir_model, f'model-{conv_name}-{seed}', n_saved=1,
                            create_dir=True,
                            score_function=lambda e: evaluator.state.metrics['auc'],
                            require_empty=False)
    trainer.add_event_handler(
        Events.EPOCH_COMPLETED, cp, {conv_name: model})

    trainer.run(dl_train, max_epochs=max_epochs)

    path_model = cp.last_checkpoint
    model.load_state_dict(torch.load(path_model))
    model.eval()
    with torch.no_grad():
        evaluator.run(dl_test)
    metrics = evaluator.state.metrics
    logger.info(
        'Test\tLoss %.2f\tAccuracy %.2f\tAUC %.4f\tAP %.4f',
        metrics['loss'], metrics['accuracy'],
        metrics['auc'], metrics['ap']
    )

    stats.update(dict(metrics))

    stats['epoch'] = trainer.state.epoch,

    row = pd.DataFrame([stats])
    if os.path.exists(path_result):
        result = pd.read_csv(path_result)
    else:
        result = pd.DataFrame()
    result = result.append(row)
    result.to_csv(path_result, index=False)


 Please refer to the documentation for more details.
  from ignite.contrib.handlers.param_scheduler import CosineAnnealingScheduler


In [7]:
class FeatureStore(object):

    def __init__(self, store):
        self.db = store

    def _key(self, key):
        return key

    def put(self, key, value):
        self.db[self._key(key)] = value

    def get(self, key, default_value):
        if self._key(key) not in self.db:
            return np.array(default_value)
        return np.array(self.db[self._key(key)])

In [8]:
main(df_edges, FeatureStore(fstore))

INFO:exp:Param {'batch_size': (64, 16), 'width': 16, 'depth': 6, 'n_hid': 400, 'n_heads': 8, 'n_layers': 6, 'dropout': 0.2, 'conv_name': 'gcn', 'optimizer': 'adamw', 'clip': 0.25, 'max_epochs': 10, 'patience': 8, 'seed': 2020, 'sample_method': 'sage'}
INFO:exp:Started task g-init ...
INFO:factory-naive-het-graph:Started task node-type-init ...
INFO:factory-naive-het-graph:Completed task node-type-init - 0.008 sec.
INFO:factory-naive-het-graph:Started task edge-list-init ...
INFO:factory-naive-het-graph:Completed task edge-list-init - 0.003 sec.
INFO:native-het-g:Started task node-enc-init ...
INFO:native-het-g:Completed task node-enc-init - 0.000 sec.
INFO:native-het-g:Started task edge-type-init ...
INFO:native-het-g:Completed task edge-type-init - 0.000 sec.


________________________________________________________________________________
[Memory] Calling custom_xfraud.data_utils.create_naive_het_graph_from_edges...
create_naive_het_graph_from_edges(              src                dst   ts  src_label src_type dst_type  seed  \
0    ACCOUNT-3731       COMPANY-3724    0         -1  Account     User     1   
1    ACCOUNT-1105  JPMC-COMPANY-1097    1         -1  Account     User     1   
2    ACCOUNT-7627  JPMC-COMPANY-7619    2         -1  Account     User     1   
3    ACCOUNT-5168        CLIENT-5160    3         -1  Account     User     1   
4    ACCOUNT-9269  BILL-COMPANY-9268    4         -1  Account     User     1   
..            ...                ...  ...        ...      ...      ...   ...   
295  ACCOUNT-4207            35810.0  295         -1  Account   Sector     1   
296  ACCOUNT-4031            24763.0  296         -1  Account   Sector     1   
297  ACCOUNT-4030            26974.0  297         -1  Account   Sector     1   
298  A

edge-init: 100%|██████████| 600/600 [00:00<00:00, 791876.15it/s]
INFO:native-het-g:Started task seed-label-init ...
INFO:native-het-g:Completed task seed-label-init - 0.000 sec.
INFO:exp:Completed task g-init - 0.072 sec.
INFO:exp:#seed 166
INFO:exp:Range Train {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 16

________________________________create_naive_het_graph_from_edges - 0.0s, 0.0min


AssertionError: 