# Multi-Workers

In [580]:
from torch_geometric.datasets import Flickr
from torch.utils.data import DataLoader
import torch_geometric.transforms as T
import torch

# Flickr with the AddSelfLoops transform applied; `data` is the first graph
# of the dataset.
transform = T.Compose([T.AddSelfLoops()])
dataset = Flickr(
    "/mnt/nfs-ssd/raw-datasets/pyg-format/Flickr",
    transform=transform,
)
data = dataset[0]

# Loader options: batches of 64 drawn by six persistent worker processes.
kwargs = dict(batch_size=64, num_workers=6, persistent_workers=True)

# Positions where train_mask is set, flattened to a 1-D tensor.
node_index = torch.nonzero(data.train_mask, as_tuple=False).reshape(-1)

# Shuffled mini-batches of training-node ids; the iterator is kept so that
# other cells can pull batches from it.
loader = DataLoader(
    dataset=node_index.tolist(),
    shuffle=True,
    pin_memory=True,
    **kwargs,
)
iter_loader = iter(loader)

In [None]:
# Ask PyTorch for the DataLoader worker context of the current process.
# Per the PyTorch docs this is None when called outside a worker process,
# which is the case when the cell runs in the notebook kernel itself.
torch.utils.data.get_worker_info()

# SubGraph

In [24]:
# Restrict the graph to the nodes selected by train_mask and show how many
# nodes remain.
train_data = data.subgraph(data.train_mask)
train_data.num_nodes

44625

In [None]:
from torch_geometric.utils import subgraph

# Edge-level counterpart of Data.subgraph: select the edges for the nodes in
# train_mask. The second return value is bound to `attr` (no edge attributes
# are passed in here, so presumably it is None — confirm).
subg, attr = subgraph(data.train_mask, data.edge_index)
subg.size()

# EgoSampler
- 设置最大hop数目，可以限制时间开销随节点budget线性增长。
- 时间开销随batch_size线性增长。

实测耗时（格式：`(batch_size) 节点budget: 每个batch的耗时`，单位应为秒）：

- (64) 100: 1.66; 200: 3.0; 400: 4.0
- (32) 100: 0.7
- (1) 100: 0.02

In [3]:
import sys

# Directory added to the import path so that the `src.*` imports used in this
# notebook can resolve (presumably the package lives under it — confirm).
# NOTE(review): machine-specific absolute path; adjust when running elsewhere.
NOTEBOOK_ROOT = '/home/xhh/notebooks/GNN/pytorch-template/notebooks/'

# Guarded so that re-running the cell does not pile up duplicate entries.
if NOTEBOOK_ROOT not in sys.path:
    sys.path.append(NOTEBOOK_ROOT)

('/home/xhh', 'notebooks')


In [16]:
from src.datamodules.datasets.loader import EgoGraphLoader
from src.models.components.assort_sampler import AdaptiveSampler

# In-process loading: batches of 32, shuffled, no workers, no pinned memory.
kwargs = dict(
    batch_size=32,
    num_workers=0,
    persistent_workers=False,
    pin_memory=False,
    shuffle=True,
)

# NOTE(review): the meaning of the positional `50` is not visible from this
# notebook — presumably the node budget of the sampler; confirm.
sampler = AdaptiveSampler(data, 50, max_hop=10)
ego_loader = EgoGraphLoader(data.train_mask, sampler, **kwargs)

# Keep the iterator (the timing cell pulls more batches from it) and display
# the first batch.
iter_graphs = iter(ego_loader)
next(iter_graphs)

EgoDataBatch(x=[1538, 500], y=[32], p=[1538], hop=[32], ego_ptr=[32], batch=[1538], ptr=[33], batch_size=32, adj_t=[1538, 1538, nnz=3304])

## Time Analysis

In [75]:
# `time` stays imported in case other cells rely on the name; the measurement
# itself uses perf_counter(), the monotonic high-resolution clock meant for
# timing intervals (time() can jump when the system clock is adjusted).
from time import perf_counter, time

# Average number of seconds needed to draw one batch from `iter_graphs`.
runs = 5
t = perf_counter()
for _run in range(runs):
    batch = next(iter_graphs)

print(f'{(perf_counter() - t) / runs: .2f}s')

 0.72s


## Tensor Batch

In [594]:
from torch_sparse import SparseTensor
import copy
from src.datamodules.datasets.loader import to_sparse

# Shallow copy of the graph object; its edge_index is moved to the CPU and
# unpacked into its two rows.
nd = copy.copy(data)
row, col = nd.edge_index.cpu()

# Transposed sparse adjacency. The value stored for every edge is its
# position in edge_index, so sampled edges can be mapped back later.
self_adj_t = SparseTensor(
    row=row,
    col=col,
    value=torch.arange(col.numel()),
    sparse_sizes=(data.num_nodes,) * 2,
).t()

# Small shuffled batches of training-node ids.
loader = DataLoader(node_index.tolist(), shuffle=True, batch_size=10)
iter_loader = iter(loader)

# One batch of seed nodes; initially seed i is owned by batch slot i.
batch_nodes = next(iter_loader)
batch_size = batch_nodes.size(0)
batch_ptr = torch.arange(batch_size)

虽然mask的长度等于每一层的edge数目，但同一个节点的不同边共享相同的采样概率，因此本质上还是对节点采样。

In [595]:
# hop loop — hop 1: expand every seed node to all of its neighbours, then
# randomly keep part of the discovered nodes together with the edges that
# point at them.
adj_t_1, v = self_adj_t.sample_adj(batch_nodes, -1, replace=False)
print(adj_t_1)

# COO view of the sampled block: `row` is the seed slot, `col` indexes into
# `v`, and `layer_e` carries the stored value (edge position in edge_index).
row, col, layer_e = adj_t_1.coo()

v_idx, ptrs, edge_inv = [], [], []
inc = 0  # number of unique nodes collected for the previous seeds
for bn in range(batch_size):
    # In the first hop row `bn` belongs to seed `bn`, so the seed's edges are
    # the slice [first edge of row bn, first edge of row bn + 1). The cell
    # deliberately does not read `batch_ptr` (which it overwrites below), so
    # it can be re-run. searchsorted relies on `row` being sorted and, unlike
    # an equality lookup, also works when a row has no edges.
    ptr_start, ptr_end = torch.searchsorted(row, torch.tensor([bn, bn + 1])).tolist()
    idx, inv = torch.unique(col[ptr_start:ptr_end], return_inverse=True)
    v_idx.append(idx)
    # Offset by the number of unique nodes (not by the edge position) so that
    # edge_inv indexes the concatenated v_idx even if a row repeats a column;
    # this matches the hop-2 cell.
    edge_inv.append(inc + inv)
    ptrs.append(torch.full((len(idx),), bn))
    inc += len(idx)
edge_inv = torch.cat(edge_inv)
v_idx = torch.cat(v_idx)
batch_idx = torch.cat(ptrs)

true_v = v[v_idx]
p_v = torch.rand(true_v.size(0))

# Random keep-mask over the candidate nodes (kept where the draw exceeds 0.5).
# NOTE(review): the mask is drawn independently of p_v — confirm that is intended.
mask = torch.rand(true_v.size(0)) > 0.5

saved_p = p_v[mask]
saved_v = true_v[mask]
batch_ptr = batch_idx[mask]
# An edge survives when the node it maps to through edge_inv was kept.
saved_e = layer_e[mask[edge_inv]]
batch_e_ptr = batch_idx[edge_inv][mask[edge_inv]]

SparseTensor(row=tensor([0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
                           2, 2, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4,
                           4, 4, 5, 5, 5, 5, 5, 5, 5, 5, 5, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 7, 7, 7,
                           7, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 9]),
             col=tensor([ 0, 10, 11, 12, 13, 14, 15,  1, 16, 17, 18, 19, 20,  2, 21, 22, 23, 24,
                           25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39,  3, 40, 41,
                           42, 43, 44, 45, 46, 47, 48,  4, 49, 50, 51, 52, 53, 54,  5, 55, 56, 57,
                           58, 59, 60, 61, 62,  6, 63, 64, 65, 66, 67, 68,  7, 69, 70, 71, 72, 73,
                           74, 75, 76, 77,  8, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87,  9, 88, 89,
                           90, 91, 92]),
             val=tensor([905388, 126264, 143733, 191227, 295451, 468778, 501083

In [657]:
# Hop 2: expand the nodes kept in hop 1 (`saved_v`, owned by the seeds listed
# in `batch_ptr`) and again randomly keep part of what was discovered.
adj_t_2, v = self_adj_t.sample_adj(saved_v, -1, replace=False)
print(adj_t_2)
row, col, layer_e = adj_t_2.coo()

v_idx, ptrs, edge_inv = [], [], []
inc = 0  # number of unique nodes collected for the previous seeds
for bn in range(batch_size):
    # Rows of the sampled block that belong to seed `bn`.
    ptr = (batch_ptr == bn).nonzero(as_tuple=True)[0]
    if ptr.numel() == 0:
        # Every hop-1 node of this seed was dropped by the random mask, so
        # there is nothing to expand (indexing ptr[0] would raise).
        continue
    # Edge slice covering rows ptr[0] .. ptr[-1]. searchsorted relies on
    # `row` being sorted and also works when a boundary row has no edges.
    bounds = torch.stack([ptr[0], ptr[-1] + 1])
    ptr_start, ptr_end = torch.searchsorted(row, bounds).tolist()
    idx, inv = torch.unique(col[ptr_start:ptr_end], return_inverse=True)
    v_idx.append(idx)
    # Offset the local inverse so it indexes the concatenated v_idx.
    edge_inv.append(inc + inv)
    ptrs.append(torch.full((len(idx),), bn))
    inc += len(idx)
edge_inv = torch.cat(edge_inv)
v_idx = torch.cat(v_idx)
batch_idx = torch.cat(ptrs)

true_v = v[v_idx]
p_v = torch.rand(true_v.size(0))

# Random keep-mask over the candidate nodes (kept where the draw exceeds 0.5).
mask = torch.rand(true_v.size(0)) > 0.5

saved_p2 = p_v[mask]
saved_v2 = true_v[mask]
batch_ptr2 = batch_idx[mask]
# An edge survives when the node it maps to through edge_inv was kept.
saved_e2 = layer_e[mask[edge_inv]]
batch_e_ptr2 = batch_idx[edge_inv][mask[edge_inv]]

SparseTensor(row=tensor([ 0,  0,  0,  ..., 41, 41, 41]),
             col=tensor([   0,    1,    2,  ..., 7606, 7607, 7608]),
             val=tensor([905388, 126264, 295451,  ..., 769512, 789102, 882301]),
             size=(42, 7609), nnz=7781, density=2.43%)


In [597]:
# Stack the seeds and the two sampled hops into flat per-node / per-edge
# arrays. The seeds come first with value 1 in n_p, and seed i is owned by
# batch slot i.
n_p = torch.cat((torch.ones(batch_size), saved_p, saved_p2))
n_id = torch.cat((batch_nodes, saved_v, saved_v2))
n_batch = torch.cat((torch.arange(batch_size, dtype=torch.long), batch_ptr, batch_ptr2))

# Edge ids and the batch slot owning each edge.
e_id = torch.cat((saved_e, saved_e2))
e_batch = torch.cat((batch_e_ptr, batch_e_ptr2))

In [606]:
from torch_geometric.data import Batch
from src.models.components.assort_sampler import EgoData
from torch_scatter import scatter
from torch_geometric.utils import sort_edge_index
edge_index = data.edge_index

# Build one EgoData per seed, then collate them into a single batch.
egos = []
for bn in range(batch_size):
    # Nodes collected for this seed, de-duplicated; values of n_p that fall
    # on the same node are summed.
    bn_mask = n_batch == bn
    n_idx, inv_ptr = torch.unique(n_id[bn_mask], return_inverse=True)
    p = scatter(n_p[bn_mask], inv_ptr, dim=-1, reduce='sum')

    # The inverse indices returned by unique() serve directly as local edge
    # endpoints, and the sorted unique ids line up with the node order of `p`.
    # NOTE(review): this assumes the nodes touched by the selected edges are
    # exactly `n_idx` — confirm.
    sub_edge_index = edge_index[:, e_id[e_batch == bn]]
    local_e = sort_edge_index(torch.unique(sub_edge_index, return_inverse=True)[1])

    # The seed is the first entry of n_id[bn_mask], so its local position is
    # the first inverse index.
    seed_pos = inv_ptr[0]
    ego_data = EgoData(data.x[n_idx], local_e, data.y[n_idx[seed_pos]], p)
    ego_data.ego_ptr = seed_pos
    egos.append(ego_data)

batch_data = Batch.from_data_list(egos)
# Shift each seed position by the node offset of its graph inside the batch.
batch_data.ego_ptr = batch_data.ego_ptr + batch_data.ptr[:-1]
batch_data.batch_size = batch_data.ego_ptr.size(0)
batch_data

EgoDataBatch(x=[3890, 500], edge_index=[2, 3936], y=[10], p=[3890], ego_ptr=[10], batch=[3890], ptr=[11], batch_size=10)

得到完整的edge-id以及对应的batch_ptr，使用scatter来为每个batch-node构造子图。

In [527]:
# Row-shaped ([1, N]) versions of the hop-1 edge ids and their batch slots.
# reshape() is used instead of the in-place unsqueeze_(): the originals keep
# their 1-D shape for the other cells that concatenate them, and re-running
# this cell no longer adds another dimension each time.
e_ptr_2d = batch_e_ptr.reshape(1, -1)
e_id_2d = saved_e.reshape(1, -1)
print(e_ptr_2d, e_id_2d)

# One row per batch slot: column j holds edge id j in the row of the slot
# that owns it, and -1 in every other row.
split = torch.full((batch_size, e_ptr_2d.size(-1)), -1, dtype=e_id_2d.dtype).scatter_(0, e_ptr_2d, e_id_2d)
split

tensor([[0, 0, 0, 0, 0, 1, 1, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 5,
         5, 5, 6, 6, 6, 6, 7, 7, 7, 7, 8, 8, 9, 9, 9, 9]]) tensor([[289655, 413177, 423719, 475812, 790157,  95388, 510110, 211813, 236211,
         557364, 202023, 267505, 376447, 388192, 436089, 728903, 799493,  48105,
         142453, 215252, 528591, 566319, 877079, 181362, 752320, 811426, 270179,
         287372, 308300, 556524,  35324,  90130, 107216, 113986, 213174, 356122,
         179921, 196460, 343852, 669161]])


In [427]:
# ego score (outline)
# - compute the score of each batch node
# - compute the score of the nodes u in every layer
# - compute cos according to batch_idx (presumably cosine similarity)

# A [3, 1] column whose single column is broadcast to four columns by expand().
x = torch.Tensor([1, 2, 3]).unsqueeze(-1)
x.expand(-1, 4)

tensor([[1., 1., 1., 1.],
        [2., 2., 2., 2.],
        [3., 3., 3., 3.]])

In [45]:
from torch.nn.utils.rnn import pack_sequence

# Three 1-D sequences of decreasing length (3, 2, 1).
x1, x2, x3 = (torch.Tensor([1, 2, 3][:length]) for length in (3, 2, 1))

# Pack them into one PackedSequence and display it.
ps = pack_sequence([x1, x2, x3])
ps

PackedSequence(data=tensor([1., 1., 1., 2., 2., 3.]), batch_sizes=tensor([3, 2, 1]), sorted_indices=None, unsorted_indices=None)

# Graph Loader

In [8]:
from src.datamodules.datasets.data import get_data

# Only the first value returned by get_data is used here. The remaining
# values go into `_rest` rather than `_`: once `_` is assigned in a notebook,
# IPython stops updating it with the last cell output.
nd, *_rest = get_data('cora')
nd

Data(x=[2708, 1433], edge_index=[2, 13264], y=[2708], train_mask=[2708], val_mask=[2708], test_mask=[2708])

In [9]:
from torch_geometric.utils import k_hop_subgraph

# k_hop_subgraph takes the seed nodes as indices. Passing the boolean mask
# directly lets its 0/1 entries be treated as node ids, so the mask is
# converted to indices first.
val_index = nd.val_mask.nonzero(as_tuple=False).view(-1)
sub = k_hop_subgraph(val_index, 2, nd.edge_index)

# Per the PyG docs the fourth return value is the edge mask, so the sum is
# the number of edges inside the 2-hop neighbourhood of the validation nodes.
sub[3].sum()

tensor(11857)

In [11]:
from torch_geometric.loader import ClusterData
from src.datamodules.datasets.loader import NeighborLoader, ClusterLoader, SaintRwLoader, ShadowLoader

# Options shared by the loaders below.
kwargs = dict(batch_size=512, shuffle=True)

# Neighbour sampling from the training nodes: up to 25 neighbours in the
# first hop and 10 in the second.
train_loader = NeighborLoader(nd, input_nodes=nd.train_mask, num_neighbors=[25, 10], **kwargs)

# Alternative loaders kept for reference.
# NOTE(review): these lines refer to `dataset` / `data`, which are the Flickr
# objects defined earlier in the notebook, not the graph `nd` used here —
# check before re-enabling.
# train_loader = ClusterLoader(ClusterData(nd, num_parts=1500, recursive=False, save_dir=dataset.processed_dir,), **kwargs)
# train_loader = SaintRwLoader(nd, batch_size=6000, walk_length=2, num_steps=5, sample_coverage=100, save_dir=dataset.processed_dir)
# train_loader = ShadowLoader(nd, depth=2, num_neighbors=10, node_idx=data.train_mask, **kwargs)

# Draw and display a single batch.
batch = next(iter(train_loader))
batch

Data(x=[1355, 1433], y=[140], train_mask=[1355], val_mask=[1355], test_mask=[1355], batch_size=140, adj_t=[1355, 1355, nnz=3556], ego_ptr=[140])