In [None]:
import argparse
import sys

import dgl
import numpy as np
import torch as th

# Define hyperparameters

In [None]:
ip_config = None
conf_path = 'standalone_data/ogbn-products.json'
num_epochs = 20
num_hidden = 16
num_layers = 2
batch_size = 1000
batch_size_eval = 100000
dropout = 0.5
lr = 0.001
standalone = True

In [None]:
# Command-line overrides for the hyperparameters defined in the previous cell,
# so the same code works both interactively and as an exported script.
# Every flag defaults to the value already set in the notebook; --standalone
# keeps its script default of False unless we are inside a kernel (see below).
parser = argparse.ArgumentParser(description='GCN')
parser.add_argument('--ip_config', type=str, default=ip_config,
                    help='The file for IP configuration')
parser.add_argument('--conf_path', type=str, default=conf_path,
                    help='The path to the partition config file')
parser.add_argument('--num-epochs', type=int, default=num_epochs)
parser.add_argument('--num-hidden', type=int, default=num_hidden)
parser.add_argument('--num-layers', type=int, default=num_layers)
parser.add_argument('--batch-size', type=int, default=batch_size)
parser.add_argument('--batch-size-eval', type=int, default=batch_size_eval)
parser.add_argument('--standalone', action='store_true')

# Inside a Jupyter kernel sys.argv holds the kernel's own flags (typically
# ``-f <connection file>``); handing those to argparse makes it exit with
# "unrecognized arguments".  In that case parse an empty argument list and
# keep the notebook's ``standalone`` value instead of resetting it to False.
running_in_kernel = 'ipykernel' in sys.modules
if running_in_kernel:
    args = parser.parse_args([])
    args.standalone = standalone
else:
    args = parser.parse_args()

ip_config = args.ip_config
conf_path = args.conf_path
num_epochs = args.num_epochs
num_hidden = args.num_hidden
num_layers = args.num_layers
batch_size = args.batch_size
batch_size_eval = args.batch_size_eval
standalone = args.standalone

# Create DistGraph

In [None]:
# Create the DistGraph for 'ogbn-products' from the partition config, then
# report its size.
g = dgl.distributed.DistGraph(ip_config, 'ogbn-products', conf_file=conf_path)
num_nodes, num_edges = g.number_of_nodes(), g.number_of_edges()
print('#nodes:', num_nodes)
print('#edges:', num_edges)

In [None]:
# Run node_split on each of the three masks stored in g.ndata, then print how
# many node IDs each resulting set contains.
train_nid, valid_nid, test_nid = (
    dgl.distributed.node_split(g.ndata[mask_name])
    for mask_name in ('train_mask', 'val_mask', 'test_mask')
)
for split_title, split_nids in (('train set:', train_nid),
                                ('valid set:', valid_nid),
                                ('test set:', test_nid)):
    print(split_title, len(split_nids))

In [None]:
# Read the 'labels' entry for all nodes and count its distinct values.
# NOTE(review): the explicit 0:n slice is kept exactly as written; an
# open-ended slice may not be accepted by the distributed tensor - confirm
# before simplifying.
total_nodes = g.number_of_nodes()
labels = g.ndata['labels'][0:total_nodes]
uniq_labels = th.unique(labels)
num_labels = uniq_labels.shape[0]
print('#labels:', num_labels)

# Define the model

In [None]:
import torch.nn as nn
import torch.nn.functional as F
import dgl.nn as dglnn

class SAGE(nn.Module):
    """GraphSAGE model: a stack of mean-aggregator ``SAGEConv`` layers.

    Parameters
    ----------
    in_feats : int
        Size of the input node features.
    n_hidden : int
        Output size of every layer except the last one.
    n_classes : int
        Output size of the last layer.
    n_layers : int
        Number of ``SAGEConv`` layers to build; must be at least 1.

    Raises
    ------
    ValueError
        If ``n_layers`` is smaller than 1.
    """

    def __init__(self, in_feats, n_hidden, n_classes, n_layers):
        super().__init__()
        if n_layers < 1:
            raise ValueError('n_layers must be at least 1, got {}'.format(n_layers))
        self.n_layers = n_layers
        self.n_hidden = n_hidden
        self.n_classes = n_classes
        # Layer i maps dims[i] -> dims[i + 1].  Building the sizes this way
        # yields exactly ``n_layers`` layers, including the single-layer case
        # (in_feats -> n_classes), which previously produced two layers.
        dims = [in_feats] + [n_hidden] * (n_layers - 1) + [n_classes]
        self.layers = nn.ModuleList(
            [dglnn.SAGEConv(d_in, d_out, 'mean')
             for d_in, d_out in zip(dims[:-1], dims[1:])]
        )

    def forward(self, blocks, x):
        """Apply one layer per block and return the last layer's raw output.

        ReLU is applied after every layer except the last.

        Raises
        ------
        ValueError
            If the number of blocks differs from the number of layers; zipping
            them would otherwise silently skip layers.
        """
        if len(blocks) != len(self.layers):
            raise ValueError(
                'expected {} blocks (one per layer), got {}'.format(
                    len(self.layers), len(blocks)))
        last = len(self.layers) - 1
        for l, (layer, block) in enumerate(zip(self.layers, blocks)):
            x = layer(block, x)
            if l != last:
                x = F.relu(x)
        return x

In [None]:
import torch.optim as optim

# The model's input width is the second dimension of the 'features' data.
in_feats = g.ndata['features'].shape[1]
print('#features:', in_feats)
model = SAGE(in_feats, num_hidden, num_labels, num_layers)

# Adam over all model parameters, trained against a cross-entropy loss.
optimizer = optim.Adam(model.parameters(), lr=lr)
loss_fcn = nn.CrossEntropyLoss()

# Distributed sampling

In [None]:
# Number of neighbors sampled per node, one entry per layer of the model.
fanouts = [10, 25]
sampler = dgl.dataloading.MultiLayerNeighborSampler(fanouts)

# Training mini-batches follow the ``batch_size`` hyperparameter (and thus the
# --batch-size flag); a hard-coded size here made that setting a no-op for the
# main training loop.
train_dataloader = dgl.dataloading.NodeDataLoader(
    g, train_nid, sampler,
    batch_size=batch_size,
    shuffle=True,
    drop_last=False,
    num_workers=0
)
# Validation keeps its fixed batch size and visits nodes in order.
valid_dataloader = dgl.dataloading.NodeDataLoader(
    g, valid_nid, sampler,
    batch_size=1024,
    shuffle=False,
    drop_last=False,
    num_workers=0
)

In [None]:
from torch.utils.data import DataLoader

class NeighborSampler(object):
    """Turns a batch of seed node IDs into a list of blocks.

    ``sample_neighbors`` is the callable used to draw neighbors; it is invoked
    as ``sample_neighbors(g, seeds, fanout, replace=True)`` once per entry of
    ``fanouts``.
    """

    def __init__(self, g, fanouts, sample_neighbors):
        self.g, self.fanouts, self.sample_neighbors = g, fanouts, sample_neighbors

    def sample_blocks(self, seeds):
        """Build one block per fanout; the block built last is returned first."""
        current_seeds = th.LongTensor(np.asarray(seeds))
        collected = []
        for num_picks in self.fanouts:
            # Sample ``num_picks`` neighbors (with replacement) per current seed.
            sampled = self.sample_neighbors(self.g, current_seeds, num_picks, replace=True)
            # Compact the sampled frontier into a block whose destination
            # nodes are the current seeds.
            block = dgl.to_block(sampled, current_seeds)
            # The block's source nodes become the seeds of the next round.
            current_seeds = block.srcdata[dgl.NID]
            collected.append(block)
        collected.reverse()
        return collected

# Custom sampler that draws neighbors through dgl.distributed.sample_neighbors
# with fanouts 10 and 25.
sampler = NeighborSampler(g, [10, 25], dgl.distributed.sample_neighbors)

# PyTorch DataLoader over the training node IDs.  Each shuffled batch of IDs
# is handed to ``sampler.sample_blocks`` (the collate function), so iterating
# the loader yields lists of blocks.
train_seed_ids = train_nid.numpy()
dataloader = DataLoader(
    dataset=train_seed_ids,
    collate_fn=sampler.sample_blocks,
    batch_size=batch_size,
    drop_last=False,
    shuffle=True,
)

# Training loop

In [None]:
import time

# Five training passes over ``dataloader``, which yields the list of blocks
# for each mini-batch.
start = time.time()
for epoch in range(5):
    start = time.time()
    losses = []
    for step, blocks in enumerate(dataloader):
        # Inputs are the source nodes of the first block; the nodes being
        # predicted are the destination nodes of the last block.
        seeds = blocks[-1].dstdata[dgl.NID]
        input_nodes = blocks[0].srcdata[dgl.NID]

        # Fetch input features and target labels for this mini-batch.
        batch_inputs = g.ndata['features'][input_nodes]
        batch_labels = g.ndata['labels'][seeds]

        # Forward pass, loss, backward pass.
        batch_pred = model(blocks, batch_inputs)
        loss = loss_fcn(batch_pred, batch_labels)
        optimizer.zero_grad()
        loss.backward()
        losses.append(loss.detach().cpu().numpy())

        # When not standalone, average each gradient across clients:
        # sum with all_reduce, then divide by the number of clients.
        if not standalone:
            with_grad = (p for p in model.parameters()
                         if p.requires_grad and p.grad is not None)
            for p in with_grad:
                th.distributed.all_reduce(p.grad.data,
                                          op=th.distributed.ReduceOp.SUM)
                p.grad.data /= dgl.distributed.get_num_client()

        optimizer.step()
    elapsed = time.time() - start
    print('epoch {}: training takes {:.3f} seconds, loss={:.3f}'.format(epoch, elapsed, np.mean(losses)))

In [None]:
import time
import sklearn.metrics

# Train for ``num_epochs`` epochs with ``train_dataloader`` and measure
# validation accuracy with ``valid_dataloader`` after every epoch.
start = time.time()
for epoch in range(num_epochs):
    # ---- training pass ----
    start = time.time()
    losses = []
    for step, (input_nodes, seeds, blocks) in enumerate(train_dataloader):
        # Fetch input features and target labels for this mini-batch.
        batch_inputs = g.ndata['features'][input_nodes]
        batch_labels = g.ndata['labels'][seeds]

        # Forward pass, loss, backward pass.
        batch_pred = model(blocks, batch_inputs)
        loss = loss_fcn(batch_pred, batch_labels)
        optimizer.zero_grad()
        loss.backward()
        losses.append(loss.detach().cpu().numpy())

        # When not standalone, average each gradient across clients:
        # sum with all_reduce, then divide by the number of clients.
        if not standalone:
            with_grad = (p for p in model.parameters()
                         if p.requires_grad and p.grad is not None)
            for p in with_grad:
                th.distributed.all_reduce(p.grad.data,
                                          op=th.distributed.ReduceOp.SUM)
                p.grad.data /= dgl.distributed.get_num_client()

        optimizer.step()
    train_seconds = time.time() - start
    print('Epoch {}: training takes {:.3f} seconds, loss={:.3f}'.format(epoch, train_seconds, np.mean(losses)))

    # ---- validation pass (no gradients) ----
    predictions, labels = [], []
    start = time.time()
    with th.no_grad():
        for step, (input_nodes, seeds, blocks) in enumerate(valid_dataloader):
            inputs = g.ndata['features'][input_nodes]
            labels.append(g.ndata['labels'][seeds].numpy())
            # Predicted class = index of the largest output per node.
            predictions.append(model(blocks, inputs).argmax(1).numpy())
        predictions = np.concatenate(predictions)
        labels = np.concatenate(labels)
        accuracy = sklearn.metrics.accuracy_score(labels, predictions)
        valid_seconds = time.time() - start
        print('Epoch {}: validation takes {:.3f} seconds, Validation Accuracy {}'.format(epoch, valid_seconds, accuracy))

# Inference

In [None]:
# Layer-wise inference over the whole graph: every layer is applied to all
# nodes (using all neighbors) before the next layer starts, and the outputs of
# each layer are stored in a distributed tensor.
nodes = dgl.distributed.node_split(np.ones(g.number_of_nodes()),
                                   g.get_partition_book(), force_even=True)

# A single-entry fanout list: each iteration yields one block for one layer.
sampler = dgl.dataloading.MultiLayerNeighborSampler([None])
test_dataloader = dgl.dataloading.NodeDataLoader(
    g, nodes, sampler,
    batch_size=10000,
    shuffle=False,
    drop_last=False,
    num_workers=0
)

start = time.time()
x = g.ndata['features']
last_layer = len(model.layers) - 1
for l, layer in enumerate(model.layers):
    # Every layer writes into its own freshly allocated tensor.  Sharing one
    # buffer between consecutive hidden layers would let a mini-batch
    # overwrite rows that later mini-batches of the same layer still need to
    # read as input (this happens once the model has three or more layers).
    out_dim = num_labels if l == last_layer else num_hidden
    y = dgl.distributed.DistTensor(g, (g.number_of_nodes(), out_dim), th.float32)
    for input_nodes, seeds, blocks in test_dataloader:
        block = blocks[0]
        h = x[input_nodes]
        with th.no_grad():
            h = layer(block, h)
            if l != last_layer:
                h = F.relu(h)
            y[seeds] = h
    # Synchronise before this layer's output becomes the next layer's input,
    # so the previous input is only dropped after the barrier.
    g.barrier()
    x = y

# Predicted class = index of the largest output per test node.
predictions = y[test_nid].argmax(1).numpy()
labels = g.ndata['labels'][test_nid]
accuracy = sklearn.metrics.accuracy_score(labels, predictions)
print('Test takes {:.3f} seconds, acc={:.3f}'.format(time.time() - start, accuracy))

In [None]:
# Evaluate on the test nodes with neighbor sampling (fanouts 10 and 25).
sampler = dgl.dataloading.MultiLayerNeighborSampler([10, 25])
test_dataloader = dgl.dataloading.NodeDataLoader(
    g, test_nid, sampler,
    batch_size=1024,
    shuffle=False,
    drop_last=False,
    num_workers=0
)

start = time.time()
predictions = []
labels = []
with th.no_grad():
    for step, (input_nodes, seeds, blocks) in enumerate(test_dataloader):
        inputs = g.ndata['features'][input_nodes]
        labels.append(g.ndata['labels'][seeds].numpy())
        # Predicted class = index of the largest output per node.
        predictions.append(model(blocks, inputs).argmax(1).numpy())
    predictions = np.concatenate(predictions)
    labels = np.concatenate(labels)
    accuracy = sklearn.metrics.accuracy_score(labels, predictions)
    # The test score is not tied to a training epoch, so the report no longer
    # formats ``epoch``, a loop variable left over from the training cells.
    print('Test Accuracy {}'.format(accuracy))
print('Test takes {:.3f} seconds'.format(time.time() - start))