## Introduction

In the real world, many graphs, such as social networks and citation networks, are too large to be loaded on a single machine.

To handle such graphs, PGL provides a Distributed Graph Engine framework that supports graph sampling on large-scale graphs for distributed GNN training.

In this tutorial, we walk through the steps of using the distributed Graph Engine for graph sampling.

We also provide a launch script for starting a distributed Graph Engine. For more examples of distributed GNN training, please see the [PGL examples](https://github.com/PaddlePaddle/PGL/tree/main/examples).


## Requirements

paddlepaddle>=2.1.0

pgl>=2.1.4

## Example: starting a distributed graph engine service

Suppose we have the following graph, which has two types of nodes (`u` and `t`).

First, we create a configuration file and a file that specifies the IP address of each machine.
Here we use two ports on the same host to simulate two machines.

After creating the configuration file and the IP address file, we can start two graph servers.

Then we can use a client to sample neighbors or nodes from the graph servers.

```python
import os

from pgl.distributed import DistGraphClient, DistGraphServer
```

```
"The Distributed Graph Engine is experimental, we will officially release it soon"
```


```python
# tab-separated edge list: source id, destination id, weight
edges_file = """37	45	0.34
37	145	0.31
37	112	0.21
96	48	1.4
96	247	0.31
96	111	1.21
59	45	0.34
59	145	0.31
59	122	0.21
97	48	0.34
98	247	0.31
7	222	0.91
7	234	0.09
37	333	0.21
47	211	0.21
47	113	0.21
47	191	0.21
34	131	0.21
34	121	0.21
39	131	0.21"""

# tab-separated node list: node type, node id
node_file = """u	98
u	97
u	96
u	7
u	59
t	48
u	47
t	45
u	39
u	37
u	34
t	333
t	247
t	234
t	222
t	211
t	191
t	145
t	131
t	122
t	121
t	113
t	112
t	111"""


tmp_path = "./tmp_distgraph_test"
os.makedirs(tmp_path, exist_ok=True)

with open(os.path.join(tmp_path, "edges.txt"), 'w') as f:
    f.write(edges_file)

with open(os.path.join(tmp_path, "node_types.txt"), 'w') as f:
    f.write(node_file)
```
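Both data files use simple tab-separated formats: each line of `edges.txt` is `src<TAB>dst<TAB>weight` (we read the third column as an edge weight), and each line of `node_types.txt` is `type<TAB>node_id`. The sketch below is plain Python, not part of PGL, and the helpers `parse_edges` and `parse_node_types` are hypothetical; it reads the formats back into Python structures, which can help sanity-check data before handing it to the graph engine.

```python
from collections import defaultdict


def parse_edges(text):
    """Parse 'src<TAB>dst<TAB>weight' lines into (src, dst, weight) tuples.

    Treating the third column as a float weight is our reading of the
    sample data, not a documented PGL format guarantee.
    """
    edges = []
    for line in text.strip().splitlines():
        src, dst, weight = line.split("\t")
        edges.append((int(src), int(dst), float(weight)))
    return edges


def parse_node_types(text):
    """Group 'type<TAB>node_id' lines into {type: [node_id, ...]}."""
    groups = defaultdict(list)
    for line in text.strip().splitlines():
        ntype, nid = line.split("\t")
        groups[ntype].append(int(nid))
    return dict(groups)


sample_edges = "37\t45\t0.34\n96\t48\t1.4"
sample_nodes = "u\t37\nu\t96\nt\t45\nt\t48"
print(parse_edges(sample_edges))       # [(37, 45, 0.34), (96, 48, 1.4)]
print(parse_node_types(sample_nodes))  # {'u': [37, 96], 't': [45, 48]}
```

In the notebook, `parse_edges(edges_file)` would return the 20 edges defined above, and `parse_node_types(node_file)` would return 9 `u` ids and 15 `t` ids.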

```python
# configuration file: maps each edge/node type to its data file
config = """
etype2files: "u2e2t:./tmp_distgraph_test/edges.txt"
symmetry: True

ntype2files: "u:./tmp_distgraph_test/node_types.txt,t:./tmp_distgraph_test/node_types.txt"

"""

# one "ip:port" endpoint per line, one line per graph server
ip_addr = """127.0.0.1:8342
127.0.0.1:8343"""


with open(os.path.join(tmp_path, "config.yaml"), 'w') as f:
    f.write(config)

with open(os.path.join(tmp_path, "ip_addr.txt"), 'w') as f:
    f.write(ip_addr)
```
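Both `etype2files` and `ntype2files` map a type name to a file using `type:path` pairs, with multiple pairs separated by commas, and `ip_addr.txt` lists one `host:port` endpoint per line. The hypothetical helpers below (`parse_type2files`, `parse_ip_list`) parse these two formats as they appear in this tutorial; PGL's own parser may differ in details such as whitespace handling. We also assume, without having checked PGL's source, that `server_id=i` in the next step refers to the i-th endpoint in this list.

```python
def parse_type2files(value):
    """Split a 'type:path,type:path' string into {type: path}.

    Hypothetical helper mirroring the format used in config.yaml above.
    Only the first ':' of each item separates the type from the path.
    """
    mapping = {}
    for item in value.split(","):
        ntype, path = item.split(":", 1)
        mapping[ntype.strip()] = path.strip()
    return mapping


def parse_ip_list(text):
    """Return a list of (host, port) pairs, one per non-empty line."""
    endpoints = []
    for line in text.strip().splitlines():
        host, port = line.strip().rsplit(":", 1)
        endpoints.append((host, int(port)))
    return endpoints


ntype2files = parse_type2files(
    "u:./tmp_distgraph_test/node_types.txt,t:./tmp_distgraph_test/node_types.txt"
)
print(ntype2files)
endpoints = parse_ip_list("127.0.0.1:8342\n127.0.0.1:8343")
print(endpoints)  # [('127.0.0.1', 8342), ('127.0.0.1', 8343)]
```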

```python
config = os.path.join(tmp_path, "config.yaml")
ip_addr = os.path.join(tmp_path, "ip_addr.txt")
shard_num = 10

# start two graph servers (server_id 0 and 1), one per endpoint in ip_addr.txt
gserver1 = DistGraphServer(config, shard_num, ip_addr, server_id=0)
gserver2 = DistGraphServer(config, shard_num, ip_addr, server_id=1)
```
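`shard_num` splits the node id space into shards that are spread across the servers. The exact placement rule lives inside the graph engine and is not described in this tutorial, so the sketch below assumes a simple one: shard = `node_id % shard_num`, with each server owning a contiguous block of `shard_num // server_num` shards. The helpers `shard_of` and `server_of` are hypothetical. This rule happens to reproduce how the `t` nodes appear to be grouped by server in the `node_batch_iter` output later in this tutorial, but treat it as an illustration, not as PGL's guaranteed behavior.

```python
def shard_of(node_id, shard_num):
    """Hypothetical shard assignment: hash a node id by modulo."""
    return node_id % shard_num


def server_of(node_id, shard_num, server_num):
    """Hypothetical placement: each server owns a contiguous block of shards."""
    if shard_num % server_num != 0:
        # with uneven blocks the server index could fall out of range
        raise ValueError("shard_num must be divisible by server_num")
    shards_per_server = shard_num // server_num
    return shard_of(node_id, shard_num) // shards_per_server


shard_num, server_num = 10, 2
t_nodes = [48, 45, 333, 247, 234, 222, 211, 191, 145, 131, 122, 121, 113, 112, 111]
by_server = {}
for nid in t_nodes:
    by_server.setdefault(server_of(nid, shard_num, server_num), []).append(nid)
print(by_server)
```

Under this assumption, server 1 holds `t` nodes 48, 45, 247 and 145 and server 0 holds the other eleven.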



```python
client1 = DistGraphClient(config, shard_num=shard_num, ip_config=ip_addr, client_id=0)

# load the edge and node-type files listed in config.yaml
client1.load_edges()
client1.load_node_types()
print("data loading finished")
```

```
[INFO] 2021-06-18 18:56:30,655 [dist_graph.py:  200]:	load edges of type u2e2t from ./tmp_distgraph_test/edges.txt
[INFO] 2021-06-18 18:56:30,658 [dist_graph.py:  210]:	load nodes of type u from ./tmp_distgraph_test/node_types.txt
[INFO] 2021-06-18 18:56:30,658 [dist_graph.py:  210]:	load nodes of type t from ./tmp_distgraph_test/node_types.txt

data loading finished
```
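The configuration sets `symmetry: True`. We read this as asking the engine to also store each edge in the reverse direction, so that `t` nodes can reach their `u` neighbors; this is an assumption about the flag's meaning that this tutorial does not demonstrate. The sketch below builds an in-memory adjacency list that way using a hypothetical `build_adjacency` helper, which can serve as a small local reference when checking sampling results.

```python
from collections import defaultdict


def build_adjacency(edges, symmetry=False):
    """Build {node: [neighbor, ...]} from (src, dst, weight) tuples.

    With symmetry=True every edge is also added in reverse, which is our
    assumed meaning of the `symmetry` option in config.yaml.
    """
    adj = defaultdict(list)
    for src, dst, _weight in edges:
        adj[src].append(dst)
        if symmetry:
            adj[dst].append(src)
    return dict(adj)


edges = [(37, 45, 0.34), (59, 45, 0.34), (7, 222, 0.91), (7, 234, 0.09)]
adj = build_adjacency(edges, symmetry=True)
print(adj[7])   # [222, 234]
print(adj[45])  # [37, 59]
```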


```python
# randomly sample nodes of a given node type
client1.random_sample_nodes(node_type="u", size=3)
```

```
[34, 59, 98]
```
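`random_sample_nodes` returns `size` randomly chosen node ids of the requested type. The output above does not show whether the engine samples with or without replacement, so the local stand-in below simply assumes sampling without replacement via `random.Random.sample`. The helper name `random_sample_nodes_local` is hypothetical, and capping the result at the number of available nodes is our own choice.

```python
import random


def random_sample_nodes_local(nodes_by_type, node_type, size, seed=None):
    """Hypothetical local stand-in: sample up to `size` distinct ids of one type."""
    candidates = nodes_by_type.get(node_type, [])
    rng = random.Random(seed)
    # cap the sample size so random.sample never raises ValueError
    return rng.sample(candidates, min(size, len(candidates)))


nodes_by_type = {"u": [98, 97, 96, 7, 59, 47, 39, 37, 34], "t": [48, 45]}
picked = random_sample_nodes_local(nodes_by_type, "u", 3, seed=0)
print(picked)
```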

```python
# traverse all nodes of type "t" from each server, in batches of 3
node_generator = client1.node_batch_iter(batch_size=3, node_type="t", shuffle=True)
for nodes in node_generator:
    print(nodes)
```

```
[333, 111, 121]
[234, 113, 191]
[131, 122, 222]
[211, 112]
[45, 145, 247]
[48]
```
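The output above suggests that batches are formed per server: eleven `t` nodes arrive in batches of 3, 3, 3 and 2, and the remaining four in batches of 3 and 1, so no batch appears to mix nodes from different servers. A generator with that behavior can be sketched as follows. `node_batch_iter_local` and its per-server input layout are hypothetical, and shuffling here happens only within each server's node list.

```python
import random


def node_batch_iter_local(nodes_per_server, batch_size, shuffle=False, seed=None):
    """Yield batches of node ids, one server at a time.

    Hypothetical sketch: batches never span two servers, so the last batch
    of each server may be smaller than `batch_size`.
    """
    rng = random.Random(seed)
    for server_nodes in nodes_per_server:
        nodes = list(server_nodes)  # copy so shuffling leaves the input intact
        if shuffle:
            rng.shuffle(nodes)
        for start in range(0, len(nodes), batch_size):
            yield nodes[start:start + batch_size]


nodes_per_server = [
    [333, 111, 121, 234, 113, 191, 131, 122, 222, 211, 112],  # server 0
    [45, 145, 247, 48],                                       # server 1
]
batches = list(node_batch_iter_local(nodes_per_server, batch_size=3))
print([len(b) for b in batches])  # [3, 3, 3, 2, 3, 1]
```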


```python
# sample neighbors
# note that the edge type "u2e2t" is defined in the config.yaml file
nodes = [98, 7]
neighs = client1.sample_successor(nodes, max_degree=10, edge_type="u2e2t")
print(neighs)
```

```
[[247], [222, 234]]
```
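`sample_successor` returns, for each input node, a list of its successors along `edge_type`, keeping at most `max_degree` of them. In the output above both nodes have fewer than 10 neighbors, so all of them are returned. The local sketch below assumes that nodes with more than `max_degree` neighbors are downsampled uniformly without replacement; how PGL actually picks neighbors in that case (for example, whether edge weights matter) is not shown in this tutorial. The helper `sample_successor_local` is hypothetical.

```python
import random


def sample_successor_local(adj, nodes, max_degree, seed=None):
    """Hypothetical stand-in for neighbor sampling on an in-memory adjacency.

    Returns one neighbor list per input node, in input order. Nodes with at
    most `max_degree` neighbors return all of them; larger neighborhoods are
    sampled uniformly without replacement (an assumption, see above).
    """
    rng = random.Random(seed)
    result = []
    for node in nodes:
        neighbors = adj.get(node, [])
        if len(neighbors) <= max_degree:
            result.append(list(neighbors))
        else:
            result.append(rng.sample(neighbors, max_degree))
    return result


adj = {98: [247], 7: [222, 234], 37: [45, 145, 112, 333]}
print(sample_successor_local(adj, [98, 7], max_degree=10))  # [[247], [222, 234]]
print(sample_successor_local(adj, [37], max_degree=2, seed=1))
```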
