In [253]:
# 该版确保id的输入正确

import pandas as pd
import numpy as np
# from sklearn.preprocessing import LabelEncoder
import scipy.sparse as sp
import dgl
import torch
from dgl.data import AsNodePredDataset#, AsLinkPredDataset
import numpy as np 
import torch.nn as nn 
import torch.nn.functional as F
from dgl.nn import SAGEConv
import dgl.function as fn
import tqdm
import sklearn.metrics
from sklearn.metrics import roc_auc_score
import os

In [94]:
gpu_device = torch.device('cuda')
cpu_device = torch.device('cpu')
print(gpu_device, cpu_device)

cuda cpu


In [4]:
def load_cora(data_path):
    data0 = dgl.data.CSVDataset(data_path,force_reload=True)
    data = AsNodePredDataset(data0, split_ratio=(0.5,0.2,0.3))
    g = data[0]
    g.ndata["features"] = g.ndata.pop("feat").float()
    g.ndata["labels"] = g.ndata.pop("label")
    return g, data.num_classes

In [5]:
data_path = '../../dataset/cora_csv_idrank/'
raw_g, n_classes = load_cora(data_path)

Done saving data into cached files.


In [20]:
g = dgl.add_reverse_edges(raw_g)

In [95]:
node_features = g.ndata['features']
node_labels = g.ndata['labels']
num_features = node_features.shape[1]
num_classes = (node_labels.max() + 1).item()
print('Number of classes: {:d}'.format(num_classes))

Number of classes: 7


In [21]:
g.edges(), '  ', g.edges()[0].shape

((tensor([   0,    0,    0,  ..., 2586, 1874, 2707]),
  tensor([  21,  905,  906,  ..., 1874, 1876, 1897])),
 '  ',
 torch.Size([10858]))

In [14]:
g.nodes()

tensor([   0,    1,    2,  ..., 2705, 2706, 2707])

In [26]:
g.edges()[1][:10858//2], g.edges()[0][10858//2:]

(tensor([  21,  905,  906,  ..., 2586, 1874, 2707]),
 tensor([  21,  905,  906,  ..., 2586, 1874, 2707]))

In [31]:
# reverse id 的规定
"""
reverse_eids (Tensor or dict[etype, Tensor], optional) –
A tensor of reverse edge ID mapping. The i-th element indicates the ID of the i-th edge’s reverse edge.
"""
E = g.number_of_edges()
reverse_eids = torch.cat([torch.arange(E//2,  E), torch.arange(0, E//2)])
reverse_eids

tensor([5429, 5430, 5431,  ..., 5426, 5427, 5428])

In [79]:
# 规定提取的边
eids = np.arange(g.number_of_edges())
eids = np.random.permutation(eids)

train_size = int(0.7 * len(eids))
train_eid = eids[:train_size]
# train_g = dgl.remove_edges(g, eids[train_size:])

test_eid = eids[train_size:]
# test_g = dgl.remove_edges(g, eids[:train_size])

In [81]:
train_eid.shape, test_eid.shape

((7600,), (3258,))

In [120]:
# 建立样本迭代器-base
# https://docs.dgl.ai/generated/dgl.dataloading.DataLoader.html
# https://docs.dgl.ai/en/0.8.x/generated/dgl.dataloading.as_edge_prediction_sampler.html#dgl.dataloading.as_edge_prediction_sampler

negative_sampler = dgl.dataloading.negative_sampler.Uniform(3)  # N = 3
neighbor_sampler = dgl.dataloading.MultiLayerNeighborSampler([10,5])
edge_sampler = dgl.dataloading.as_edge_prediction_sampler(
    sampler=neighbor_sampler,
    negative_sampler=negative_sampler
)
edge_sampler_excl = dgl.dataloading.as_edge_prediction_sampler(
    sampler=neighbor_sampler,
    negative_sampler=negative_sampler,
    exclude='reverse_id',
    reverse_eids=reverse_eids
)

In [284]:
train_dataloader = dgl.dataloading.DataLoader(
    graph = g, 
    indices = train_eid, # indices=torch.arange(g.number_of_edges()), 
    graph_sampler=edge_sampler_excl,
    batch_size=512,
    shuffle=True,
    drop_last=False,
    num_workers=4,
    device='cpu'
    # use_prefetch_thread=True,
    # pin_prefetcher=True, 
    # use_ddp=True       # Make it work with distributed data parallel
)

In [285]:
test_dataloader = dgl.dataloading.DataLoader(
    graph=g,
    indices = test_eid, #indices=torch.arange(g.number_of_edges()),
    graph_sampler=edge_sampler,
    batch_size=512,
    shuffle=False,
    drop_last=False,
    num_workers=4,
    device='cpu'
)

In [286]:
input_nodes, pos_graph, neg_graph, mfgs = next(iter(train_dataloader))
print('Number of input nodes:', len(input_nodes))
print('Positive graph # nodes:', pos_graph.number_of_nodes(), '# edges:', pos_graph.number_of_edges())
print('Negative graph # nodes:', neg_graph.number_of_nodes(), '# edges:', neg_graph.number_of_edges())
print(mfgs)

# 128 * 3 = 384

Number of input nodes: 2624
Positive graph # nodes: 1625 # edges: 512
Negative graph # nodes: 1625 # edges: 1536
[Block(num_src_nodes=2624, num_dst_nodes=2432, num_edges=8331), Block(num_src_nodes=2432, num_dst_nodes=1625, num_edges=4576)]


In [287]:
input_nodes, pos_graph, neg_graph, mfgs = next(iter(test_dataloader))
print('Number of input nodes:', len(input_nodes))
print('Positive graph # nodes:', pos_graph.number_of_nodes(), '# edges:', pos_graph.number_of_edges())
print('Negative graph # nodes:', neg_graph.number_of_nodes(), '# edges:', neg_graph.number_of_edges())
print(mfgs)

Number of input nodes: 2634
Positive graph # nodes: 1644 # edges: 512
Negative graph # nodes: 1644 # edges: 1536
[Block(num_src_nodes=2634, num_dst_nodes=2449, num_edges=9262), Block(num_src_nodes=2449, num_dst_nodes=1644, num_edges=5402)]


In [289]:
# 制定model
class Model(nn.Module):
    def __init__(self, in_feats, h_feats):
        super(Model, self).__init__()
        self.h_feats = h_feats
        self.in_feats = in_feats
        self.conv1 = SAGEConv(self.in_feats, self.h_feats, aggregator_type='mean')
        self.conv2 = SAGEConv(self.h_feats, self.h_feats, aggregator_type='mean')
        
    def forward(self, mfgs, x):
        h_dst = x[: mfgs[0].num_dst_nodes()]
        h = self.conv1(mfgs[0], (x, h_dst))
        h = F.relu(h)
        h_dst = h[: mfgs[1].num_dst_nodes()]
        h = self.conv2(mfgs[1], (h, h_dst))
        return h
    

class DotPredictor(nn.Module):
    def forward(self, g, h):
        with g.local_scope():
            g.ndata['h'] = h
            g.apply_edges(fn.u_dot_v('h', 'h', 'score'))
            # return g.edata['score'][:,0]
            return torch.sigmoid(g.edata['score'][:,0]) # 相比原版接了sigmoid缩放到0、1区间
        

class MLPPredictor(nn.Module):
    def __init__(self, h_feats):
        super().__init__()
        self.W1 = nn.Linear(h_feats * 2, h_feats)
        self.W2 = nn.Linear(h_feats, 1)

    def apply_edges_method(self, edges):
        """
        Computes a scalar score for each edge of the given graph.

        Parameters
        ----------
        edges :
            Has three members ``src``, ``dst`` and ``data``, each of
            which is a dictionary representing the features of the
            source nodes, the destination nodes, and the edges
            themselves.

        Returns
        -------
        dict
            A dictionary of new edge features.
        """
        h = torch.cat([edges.src['h'], edges.dst['h']], 1)
        return {'score': self.W2(F.relu(self.W1(h))).squeeze(1)}

    def forward(self, g, h):
        with g.local_scope():
            g.ndata['h'] = h
            g.apply_edges(self.apply_edges_method)
            return g.edata['score'][:,0]

In [291]:
# 评估模块
def compute_auc(pos_score, neg_score):
    with torch.no_grad():
        scores = torch.cat([pos_score, neg_score]).numpy()
        labels = torch.cat([torch.ones_like(pos_score), torch.zeros_like(neg_score)])
        return roc_auc_score(labels, scores)

In [292]:
def get_test_result(model, predictor, test_dataloader, threds):
    model.eval()
    with torch.no_grad(), test_dataloader.enable_cpu_affinity():
        pred_all, pred01_all, label_all = torch.Tensor(), torch.Tensor(), torch.Tensor()
        loss_all = 0
        for step, (input_nodes, pos_graph, neg_graph, mfgs) in enumerate(test_dataloader):
            pos_graph = pos_graph.to(gpu_device)
            neg_graph = neg_graph.to(gpu_device)
            mfgs = [mfg.int().to(gpu_device) for mfg in mfgs]

            inputs = mfgs[0].srcdata['features']
            outputs = model(mfgs, inputs)
            pos_score = predictor(pos_graph, outputs)
            neg_score = predictor(neg_graph, outputs)

            # the score and label of edges (real and non-existent)
            score = torch.cat([pos_score, neg_score])
            label = torch.cat([torch.ones_like(pos_score), torch.zeros_like(neg_score)])
            loss = F.binary_cross_entropy_with_logits(score, label)

            pred = score.cpu()
            pred01 = pred.detach()
            pred01[pred01>=threds] = 1
            pred01[pred01<threds] = 0
            
            pred_all = torch.cat([pred_all, pred])
            pred01_all = torch.cat([pred01_all, pred01])
            label_all = torch.cat([label_all, label.cpu()])
            loss_all += loss
            
        accu = sklearn.metrics.accuracy_score(label_all.cpu().numpy(),pred01_all)
        auc = roc_auc_score(label_all.cpu().numpy(), pred_all)
        size = len(test_dataloader)
            # tq.set_postfix({'test-loss': '%.03f' % loss.item(), 
            #                 'test-accu': '%0.3f'%accu.item(),
            #                 'test-auc': '%0.3f'%auc.item()
            #                }, refresh=False)
    return accu, auc, pred_all, pred01_all, label_all, loss_all, size

In [293]:
# 实例化
model = Model(num_features, 256).to(gpu_device)
predictor = DotPredictor().to(gpu_device)
opt = torch.optim.Adam(list(model.parameters()) + list(predictor.parameters()))

In [294]:
# 训练过程
best_test_auc = 0
best_model_path = './../demo1/models/'


for epoch in range(10):
    with train_dataloader.enable_cpu_affinity(), tqdm.tqdm(train_dataloader) as tq:
        # with tqdm.tqdm(train_dataloader) as tq:
        for step, (input_nodes, pos_graph, neg_graph, mfgs) in enumerate(tq):
            model.train()
            pos_graph = pos_graph.to(gpu_device)
            neg_graph = neg_graph.to(gpu_device)
            mfgs = [mfg.int().to(gpu_device) for mfg in mfgs]

            inputs = mfgs[0].srcdata['features']
            outputs = model(mfgs, inputs)
            pos_score = predictor(pos_graph, outputs)
            neg_score = predictor(neg_graph, outputs)

            # the score and label of edges (real and non-existent)
            score = torch.cat([pos_score, neg_score])
            label = torch.cat([torch.ones_like(pos_score), torch.zeros_like(neg_score)])
            loss = F.binary_cross_entropy_with_logits(score, label)

            pred = score.cpu()
            pred01 = pred.detach()
            threds = 0.5
            pred01[pred01>=threds] = 1
            pred01[pred01<threds] = 0
            with torch.no_grad():
                accu = sklearn.metrics.accuracy_score(label.cpu().numpy(),pred01)
                auc = roc_auc_score(label.cpu().numpy(), score.detach().cpu())

            opt.zero_grad()
            loss.backward()
            opt.step()
                
            tq.set_postfix({
                'train:':'-----',
                'loss': '%.03f' % loss.item(), 
                'accu': '%0.3f'%accu.item(),
                'auc': '%0.3f'%auc.item()
                           }, refresh=False)
            LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
            DATE_FORMAT = "%m/%d/%Y %H:%M:%S %p"

            logging.basicConfig(filename='my.log', level=logging.INFO, format=LOG_FORMAT, datefmt=DATE_FORMAT)
            logging.info(epoch,loss,accu,auc)
        
        # train里的 每 N 个step执行一次
        if (epoch+1)%3==0:
            test_accu, test_auc, test_pred_all, test_pred01_all, test_label_all, test_loss_all, test_size = get_test_result(
                model, predictor, test_dataloader, threds)
            print("------------ 执行测试集 ------------ :")
            print("test_loss={:.3f}, test_accu={:.3f}, test_auc={:.3f}".format(test_loss_all/test_size, test_accu, test_auc))
            if(best_test_auc < test_auc):
                best_test_auc = test_auc
                # model save
                torch.save({'state':model.state_dict(), 'optimizer':opt.state_dict(), 'epoch':epoch, 'size':len(train_dataloader)}, 
                          os.path.join("./models/model-demo3"+"batch_size="+str(len(train_dataloader))+"epoch="+str(epoch)+".pt"))

            # https://docs.dgl.ai/en/0.8.x/tutorials/blitz/4_link_predict.html 最终test预测到 auc=0.865 程度

4 DL workers are assigned to cpus [0, 1, 2, 3], main process will use cpus [4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]


100%|██████████| 15/15 [00:08<00:00,  1.72it/s, train:=-----, loss=0.847, accu=0.559, auc=0.820]


4 DL workers are assigned to cpus [0, 1, 2, 3], main process will use cpus [4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]


100%|██████████| 15/15 [00:09<00:00,  1.63it/s, train:=-----, loss=0.823, accu=0.627, auc=0.890]


4 DL workers are assigned to cpus [0, 1, 2, 3], main process will use cpus [4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]


100%|██████████| 15/15 [00:08<00:00,  1.70it/s, train:=-----, loss=0.815, accu=0.648, auc=0.905]

4 DL workers are assigned to cpus [0, 1, 2, 3], main process will use cpus [4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]





------------ 执行测试集 ------------ :
test_loss=0.807, test_accu=0.665, test_auc=0.773
4 DL workers are assigned to cpus [0, 1, 2, 3], main process will use cpus [4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]


100%|██████████| 15/15 [00:08<00:00,  1.78it/s, train:=-----, loss=0.797, accu=0.691, auc=0.918]


4 DL workers are assigned to cpus [0, 1, 2, 3], main process will use cpus [4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]


100%|██████████| 15/15 [00:08<00:00,  1.76it/s, train:=-----, loss=0.775, accu=0.719, auc=0.910]


4 DL workers are assigned to cpus [0, 1, 2, 3], main process will use cpus [4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]


100%|██████████| 15/15 [00:09<00:00,  1.61it/s, train:=-----, loss=0.759, accu=0.739, auc=0.903]

4 DL workers are assigned to cpus [0, 1, 2, 3], main process will use cpus [4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]





------------ 执行测试集 ------------ :
test_loss=0.750, test_accu=0.758, test_auc=0.837
4 DL workers are assigned to cpus [0, 1, 2, 3], main process will use cpus [4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]


100%|██████████| 15/15 [00:08<00:00,  1.67it/s, train:=-----, loss=0.735, accu=0.784, auc=0.900]


4 DL workers are assigned to cpus [0, 1, 2, 3], main process will use cpus [4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]


100%|██████████| 15/15 [00:08<00:00,  1.81it/s, train:=-----, loss=0.734, accu=0.783, auc=0.891]


4 DL workers are assigned to cpus [0, 1, 2, 3], main process will use cpus [4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]


100%|██████████| 15/15 [00:05<00:00,  2.74it/s, train:=-----, loss=0.716, accu=0.815, auc=0.887]

4 DL workers are assigned to cpus [0, 1, 2, 3], main process will use cpus [4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]





------------ 执行测试集 ------------ :
test_loss=0.722, test_accu=0.802, test_auc=0.864
4 DL workers are assigned to cpus [0, 1, 2, 3], main process will use cpus [4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]


100%|██████████| 15/15 [00:08<00:00,  1.69it/s, train:=-----, loss=0.718, accu=0.806, auc=0.884]


In [337]:
import logging
LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
DATE_FORMAT = "%m/%d/%Y %H:%M:%S %p"

logging.basicConfig(filename='my.log', level=logging.DEBUG, format=LOG_FORMAT, datefmt=DATE_FORMAT)
logging.info('aaa',exc_info=True, stack_info=True,extra={'epoch':str(epoch)})#,loss,accu,auc)

In [339]:
def get_logger(filename, verbosity=1, name=None):
    level_dict = {0: logging.DEBUG, 1: logging.INFO, 2: logging.WARNING}
    formatter = logging.Formatter(
        "[%(asctime)s][%(filename)s][line:%(lineno)d][%(levelname)s] %(message)s"
    )
    logger = logging.getLogger(name)
    logger.setLevel(level_dict[verbosity])

    # Output to file
    fh = logging.FileHandler(filename, "w")
    fh.setFormatter(formatter)
    logger.addHandler(fh)

    # Output to terminal
    sh = logging.StreamHandler()
    sh.setFormatter(formatter)
    logger.addHandler(sh)

    return logger


In [340]:
logger = get_logger('./demo3-train.log')

logger.info('start training!')
logger.info('Epoch:[{}/{}]\t loss={:.5f}\t acc={:.3f}'.format(epoch, 10, loss, accu))
logger.info('finish training!')

[2023-03-08 20:10:22,335][1856998988.py][line:3][INFO] start training!
[2023-03-08 20:10:22,350][1856998988.py][line:4][INFO] Epoch:[9/10]	 loss=0.71763	 acc=0.806
[2023-03-08 20:10:22,351][1856998988.py][line:5][INFO] finish training!


In [295]:
# 实例化
model1 = Model(num_features, 256).to(gpu_device)
predictor1 = DotPredictor().to(gpu_device)
opt1 = torch.optim.Adam(list(model.parameters()) + list(predictor.parameters()))

model1.load_state_dict(torch.load("./models/model-demo3epoch=8.pt")['state'])

<All keys matched successfully>

In [296]:
test_accu, test_auc, test_pred_all, test_pred01_all, test_label_all, test_loss_all, test_size = get_test_result(
    model1, predictor, test_dataloader, threds)
print("------------ 执行测试集 ------------ :")
print("test_loss={:.3f}, test_accu={:.3f}, test_auc={:.3f}".format(test_loss_all/test_size, test_accu, test_auc))

4 DL workers are assigned to cpus [0, 1, 2, 3], main process will use cpus [4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
------------ 执行测试集 ------------ :
test_loss=0.721, test_accu=0.802, test_auc=0.863


In [314]:
# 推断 emb 模块
def inference(model, graph, infer_device, num_workers=4):
    with torch.no_grad():
        nodes = torch.arange(graph.number_of_nodes())
        # 保持跟上方一致。但也可以全采样，参考：https://docs.dgl.ai/guide/minibatch-inference.html
        sampler = dgl.dataloading.MultiLayerNeighborSampler([10,5]) 
        # dgl.dataloading.NodeDataLoader 在 0.8 以上版本已 depreciate
        train_dataloader = dgl.dataloading.DataLoader(
            graph=graph,
            indices=torch.arange(graph.number_of_nodes()),
            graph_sampler=sampler,
            batch_size=1024,
            shuffle=False,
            drop_last=False,
            num_workers=num_workers,
            device='cpu'
        )
        with train_dataloader.enable_cpu_affinity():
            result = []
            for input_nodes, output_nodes, mfgs in train_dataloader:
                mfgs = [mfg.int().to(infer_device) for mfg in mfgs]
                inputs = mfgs[0].srcdata['features']
                result.append(model(mfgs, inputs))
                mfgs = [mfg.int().to('cpu') for mfg in mfgs]
        return torch.cat(result)

In [315]:
model1.eval()
model1 = model1.cuda()

In [326]:
%%time
emb = inference(model1, graph=g, infer_device=gpu_device, num_workers=30)

30 DL workers are assigned to cpus [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29], main process will use cpus [30, 31]
CPU times: user 191 ms, sys: 1.89 s, total: 2.08 s
Wall time: 6.12 s


In [321]:
%%time
emb = inference(model1, graph=g, infer_device=gpu_device, num_workers=4)

4 DL workers are assigned to cpus [0, 1, 2, 3], main process will use cpus [4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
CPU times: user 892 ms, sys: 388 ms, total: 1.28 s
Wall time: 2.99 s


In [322]:
%%time
emb = inference(model1, graph=g, infer_device=gpu_device, num_workers=8)

8 DL workers are assigned to cpus [0, 1, 2, 3, 4, 5, 6, 7], main process will use cpus [8, 9, 10, 11, 12, 13, 14, 15]
CPU times: user 703 ms, sys: 664 ms, total: 1.37 s
Wall time: 3.24 s


In [343]:
round(0.33333,3)

0.333