In [1]:
import nvtx
import torch.multiprocessing
import torch.nn.functional as F
import torchmetrics.functional as MF
import torch.distributed as dist
import gc, os
import argparse
import torch
import dgl
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.multiprocessing import spawn
from dgl.utils import pin_memory_inplace, gather_pinned_tensor_rows
from torch import Tensor
from dgl.heterograph import DGLBlock
from dgl.utils import pin_memory_inplace
from libra.util import load_dgl_graph, load_idx_split
from libra.timer import Timer
from libra.count_profiler import CountProfiler

In [2]:
# def load_partition_map(in_dir:str, is_pruned: bool, graph_name:str, world_size:int, mode:str, bal:str, is32: bool):
#     assert(mode in ["edge", "node", "samp"])
#     assert(bal in ["xbal", "bal"])
#     prefix = "pruned_" if is_pruned else ""
#     partition_map = torch.load(os.path.join(in_dir, f"{prefix}{graph_name}_w{world_size}_{mode}_{bal}.pt"))
#     if is32:
#         return partition_map.type(torch.int32)
#     else:
#         return partition_map.type(torch.int64)

# device = 0
# cur_dir = os.getcwd()
# cnt_dir = os.path.join(cur_dir, "cnt")
# graph_name = "com-friendster"
# data_dir = "/data/juelin/dataset/SNAP/"
# is_pruned = True
# prefix = "pruned_" if is_pruned else ""
    
# in_dir = os.path.join(data_dir, graph_name)
# train_idx, test_idx, valid_idx = load_idx_split(in_dir, is32=True)
# train_idx = train_idx.to(device)

# graph = load_dgl_graph(in_dir, is32=True, wsloop=True)
# v_num = graph.num_nodes()
# e_num = graph.num_edges()
# graph.create_formats_()
# graph.pin_memory_()
# print(f"{graph_name=} {v_num=} {e_num=}")

In [3]:
# pid_dir = os.path.join(cur_dir, "partition_ids")
# mode = "samp"
# partition_map_bal = load_partition_map(in_dir=pid_dir, is_pruned=is_pruned, graph_name=graph_name, world_size=4, mode=mode, bal="bal", is32=True).to(device)
# partition_map_xbal = load_partition_map(in_dir=pid_dir, is_pruned=is_pruned, graph_name=graph_name, world_size=4, mode=mode, bal="xbal", is32=True).to(device)
# partition_map_rand = torch.randint(0, 4, (v_num,), dtype = torch.int32).to(device)

In [4]:
# profiler = CountProfiler(partition_map_bal, partition_map_xbal, partition_map_rand)
# timer = Timer()
# epoch_num = 1
# num_edges = 0
# step = 0
# print(f"start sampling on graph {graph_name}")
# graph_sampler = dgl.dataloading.NeighborSampler(fanouts=[20, 20, 20])
# dataloader = dgl.dataloading.DataLoader(
#     graph=graph,               # The graph
#     indices=train_idx,         # The node IDs to iterate over in minibatches
#     graph_sampler=graph_sampler,     # The neighbor sampler
#     device=device,      # Put the sampled MFGs on CPU or GPU
#     use_ddp=False, # enable ddp if using mutiple gpus
#     # The following arguments are inherited from PyTorch DataLoader.
#     batch_size=1024,    # Batch size
#     shuffle=True,       # Whether to shuffle the nodes for every epoch
#     drop_last=False,    # Whether to drop the last incomplete batch
#     use_uva=graph.is_pinned(),
#     num_workers=0,
# )

# for epoch in range(epoch_num):
#     for input_nodes, output_nodes, blocks in dataloader:
#         step += 1
#         for layer, block in enumerate(blocks):
#             num_edges += block.num_edges()

#             src, dst = block.all_edges()
#             src = input_nodes[src]
#             dst = input_nodes[dst]
#             profiler.add_edges(src, dst)

#             unique_src = block.srcnodes()
#             unique_src = input_nodes[unique_src]
#             profiler.add_src(unique_src)

#             unique_dst = block.dstnodes()
#             unique_dst = input_nodes[unique_dst]
#             profiler.add_dst(unique_dst)

#         if step % 100 == 0:
#             print(f"{step=} {num_edges=}")
# print(f"duration = {timer.duration()} s")
# profiler.save(path=os.path.join(cnt_dir, f"{prefix}{graph_name}_{mode}.pt"))


In [5]:
def load_partition_map(in_dir:str, is_pruned: bool, graph_name:str, world_size:int, mode:str, bal:str, is32: bool):
    assert(mode in ["edge", "node", "samp"])
    assert(bal in ["xbal", "bal"])
    prefix = "pruned_" if is_pruned else ""
    partition_map = torch.load(os.path.join(in_dir, f"{prefix}{graph_name}_w{world_size}_{mode}_{bal}.pt"))
    if is32:
        return partition_map.type(torch.int32)
    else:
        return partition_map.type(torch.int64)

def prof(graph_name: str, data_dir: str, is_pruned: bool):
    device = 0
    cur_dir = os.getcwd()
    cnt_dir = os.path.join(cur_dir, "cnt")
    prefix = "pruned_" if is_pruned else ""

    in_dir = os.path.join(data_dir, graph_name)
    train_idx, test_idx, valid_idx = load_idx_split(in_dir, is32=True)
    train_idx = train_idx.to(device)

    graph = load_dgl_graph(in_dir, is32=True, wsloop=True)
    v_num = graph.num_nodes()
    e_num = graph.num_edges()
    graph.create_formats_()
    graph.pin_memory_()
    print(f"{graph_name=} {v_num=} {e_num=}")
    pid_dir = os.path.join(cur_dir, "partition_ids")
    partition_map_rand = torch.randint(0, 4, (v_num,), dtype = torch.int32).to(device)
    for mode in ["samp", "node", "edge"]:
        partition_map_bal = load_partition_map(in_dir=pid_dir, is_pruned=is_pruned, graph_name=graph_name, world_size=4, mode=mode, bal="bal", is32=True).to(device)
        partition_map_xbal = load_partition_map(in_dir=pid_dir, is_pruned=is_pruned, graph_name=graph_name, world_size=4, mode=mode, bal="xbal", is32=True).to(device)
        profiler = CountProfiler(partition_map_bal, partition_map_xbal, partition_map_rand)
        timer = Timer()
        epoch_num = 1
        num_edges = 0
        step = 0
        print(f"start sampling on graph {graph_name}")
        graph_sampler = dgl.dataloading.NeighborSampler(fanouts=[20, 20, 20])
        dataloader = dgl.dataloading.DataLoader(
            graph=graph,               # The graph
            indices=train_idx,         # The node IDs to iterate over in minibatches
            graph_sampler=graph_sampler,     # The neighbor sampler
            device=device,      # Put the sampled MFGs on CPU or GPU
            use_ddp=False, # enable ddp if using mutiple gpus
            # The following arguments are inherited from PyTorch DataLoader.
            batch_size=1024,    # Batch size
            shuffle=True,       # Whether to shuffle the nodes for every epoch
            drop_last=False,    # Whether to drop the last incomplete batch
            use_uva=graph.is_pinned(),
            num_workers=0,
        )

        for epoch in range(epoch_num):
            for input_nodes, output_nodes, blocks in dataloader:
                step += 1
                for layer, block in enumerate(blocks):
                    num_edges += block.num_edges()

                    src, dst = block.all_edges()
                    src = input_nodes[src]
                    dst = input_nodes[dst]
                    profiler.add_edges(src, dst)

                    unique_src = block.srcnodes()
                    unique_src = input_nodes[unique_src]
                    profiler.add_src(unique_src)

                    unique_dst = block.dstnodes()
                    unique_dst = input_nodes[unique_dst]
                    profiler.add_dst(unique_dst)

                if step % 100 == 0:
                    print(f"{step=} {num_edges=}")
        print(f"duration = {timer.duration()} s")
        profiler.save(path=os.path.join(cnt_dir, f"{prefix}{graph_name}_{mode}.pt"))

In [6]:
dataset = {
    "ogbn-products":   "/data/juelin/dataset/OGBN/processed",
    "ogbn-papers100M": "/data/juelin/dataset/OGBN/processed",
    "com-orkut": "/data/juelin/dataset/SNAP/",
    "com-friendster": "/data/juelin/dataset/SNAP/"
}

for graph_name, data_dir in dataset.items():
    for is_pruned in [True, False]:
        prof(graph_name, data_dir, is_pruned)
        prof(graph_name, data_dir, is_pruned)

graph_name='ogbn-products' v_num=2449029 e_num=126167053
start sampling on graph ogbn-products
step=100 num_edges=539207102
duration = 11.111 s
start sampling on graph ogbn-products
step=100 num_edges=539009834
duration = 11.06 s
start sampling on graph ogbn-products
step=100 num_edges=539079289
duration = 11.103 s
graph_name='ogbn-products' v_num=2449029 e_num=126167053
start sampling on graph ogbn-products
step=100 num_edges=538964659
duration = 11.13 s
start sampling on graph ogbn-products
step=100 num_edges=538629614
duration = 11.164 s
start sampling on graph ogbn-products
step=100 num_edges=539014129
duration = 11.159 s
graph_name='ogbn-products' v_num=2449029 e_num=126167053
start sampling on graph ogbn-products
step=100 num_edges=538901004
duration = 11.63 s
start sampling on graph ogbn-products
step=100 num_edges=538914203
duration = 11.363 s
start sampling on graph ogbn-products
step=100 num_edges=538867874
duration = 11.336 s
graph_name='ogbn-products' v_num=2449029 e_num=12