feat(learning): Integrate GLTorch into GraphScope (#3230)
This PR integrates
[GLTorch](https://github.com/alibaba/graphlearn-for-pytorch) into
GraphScope, enabling GNN training with GLTorch in GraphScope's local mode.

---------

Co-authored-by: Hongyi ZHANG <50618951+Zhanghyi@users.noreply.github.com>
Co-authored-by: Jia Zhibin <56682441+Jia-zb@users.noreply.github.com>
3 people committed Sep 25, 2023
1 parent 7314f8c commit 3374342
Showing 21 changed files with 906 additions and 20 deletions.
4 changes: 4 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -5,3 +5,7 @@
[submodule "flex/grin"]
	path = flex/grin
	url = https://github.com/GraphScope/GRIN.git
+
+[submodule "learning_engine/graphlearn-for-pytorch"]
+	path = learning_engine/graphlearn-for-pytorch
+	url = https://github.com/alibaba/graphlearn-for-pytorch.git
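After checking out this commit, the new submodule can be fetched with the standard `git submodule update --init learning_engine/graphlearn-for-pytorch` before building.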
5 changes: 5 additions & 0 deletions Makefile
@@ -18,6 +18,8 @@ NETWORKX ?= ON
# testing build option
BUILD_TEST ?= OFF

+# whether to build the graphlearn-torch extension (graphlearn is built by default)
+WITH_GLTORCH ?= ON

# INSTALL_PREFIX is an environment variable; if it is not set, use a default value
ifeq ($(INSTALL_PREFIX),)
@@ -76,6 +78,9 @@ client: learning
	python3 -m pip install -r requirements.txt -r requirements-dev.txt --user && \
	export PATH=$(PATH):$(HOME)/.local/bin && \
	python3 setup.py build_ext --inplace --user && \
+	if [ $(WITH_GLTORCH) = ON ]; then \
+		python3 setup.py build_gltorch_ext --inplace --user; \
+	fi && \
	python3 -m pip install --user --no-build-isolation --editable $(CLIENT_DIR) && \
	rm -rf $(CLIENT_DIR)/*.egg-info
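Since `WITH_GLTORCH` defaults to `ON`, `make client` builds the GLTorch extension automatically; overriding it on the command line, e.g. `make client WITH_GLTORCH=OFF`, skips the `build_gltorch_ext` step. (The override syntax is standard Make behavior, not something added by this diff.)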

8 changes: 6 additions & 2 deletions coordinator/gscoordinator/coordinator.py
@@ -531,10 +531,14 @@ def _match_frontend_endpoint(pattern, lines):
    def CreateLearningInstance(self, request, context):
        object_id = request.object_id
        logger.info("Create learning instance with object id %ld", object_id)
-       handle, config = request.handle, request.config
+       handle, config, learning_backend = (
+           request.handle,
+           request.config,
+           request.learning_backend,
+       )
        try:
            endpoints = self._launcher.create_learning_instance(
-               object_id, handle, config
+               object_id, handle, config, learning_backend
            )
            self._object_manager.put(object_id, LearningInstanceManager(object_id))
        except Exception as e:
4 changes: 2 additions & 2 deletions coordinator/gscoordinator/kubernetes_launcher.py
@@ -1291,7 +1291,7 @@ def _distribute_learning_process(
        self._learning_instance_processes[object_id] = []
        for pod_index, pod in enumerate(self._pod_name_list):
            container = LEARNING_CONTAINER_NAME
-           sub_cmd = f"python3 -m gscoordinator.learning {handle} {config} {pod_index}"
+           sub_cmd = f"python3 -m gscoordinator.launch_graphlearn {handle} {config} {pod_index}"
            cmd = f"kubectl -n {self._namespace} exec -it -c {container} {pod} -- {sub_cmd}"
            logger.debug("launching learning server: %s", cmd)
            proc = subprocess.Popen(
@@ -1321,7 +1321,7 @@ def _distribute_learning_process(
            self._api_client, object_id, pod_host_ip_list
        )

-   def create_learning_instance(self, object_id, handle, config):
+   def create_learning_instance(self, object_id, handle, config, learning_backend):
        pod_name_list, _, pod_host_ip_list = self._allocate_learning_engine(object_id)
        if not pod_name_list or not pod_host_ip_list:
            raise RuntimeError("Failed to allocate learning engine")
coordinator/gscoordinator/launch_graphlearn.py (renamed from learning.py)
@@ -67,7 +67,10 @@ def launch_server(handle, config, server_index):

if __name__ == "__main__":
    if len(sys.argv) < 4:
-       print("Usage: ./learning.py <handle> <config> <server_index>", file=sys.stderr)
+       print(
+           "Usage: ./launch_graphlearn.py <handle> <config> <server_index>",
+           file=sys.stderr,
+       )
        sys.exit(-1)

    handle = decode_arg(sys.argv[1])
93 changes: 93 additions & 0 deletions coordinator/gscoordinator/launch_graphlearn_torch.py
@@ -0,0 +1,93 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import base64
import json
import logging
import os.path as osp
import sys

import graphscope.learning.graphlearn_torch as glt
import torch
from graphscope.learning.gl_torch_graph import GLTorchGraph

logger = logging.getLogger("graphscope")


def decode_arg(arg):
    if isinstance(arg, dict):
        return arg
    return json.loads(
        base64.b64decode(arg.encode("utf-8", errors="ignore")).decode(
            "utf-8", errors="ignore"
        )
    )


def run_server_proc(proc_rank, handle, config, server_rank, dataset):
    glt.distributed.init_server(
        num_servers=handle["num_servers"],
        server_rank=server_rank,
        dataset=dataset,
        master_addr=handle["master_addr"],
        master_port=handle["server_client_master_port"],
        num_rpc_threads=16,
        is_dynamic=True,
    )
    logger.info(f"-- [Server {server_rank}] Waiting for exit ...")
    glt.distributed.wait_and_shutdown_server()
    logger.info(f"-- [Server {server_rank}] Exited ...")


def launch_graphlearn_torch_server(handle, config, server_rank):
    logger.info(f"-- [Server {server_rank}] Initializing server ...")

    edge_dir = config.pop("edge_dir")
    random_node_split = config.pop("random_node_split")
    dataset = glt.distributed.DistDataset(edge_dir=edge_dir)
    dataset.load_vineyard(
        vineyard_id=str(handle["vineyard_id"]),
        vineyard_socket=handle["vineyard_socket"],
        **config,
    )
    if random_node_split is not None:
        dataset.random_node_split(**random_node_split)
    logger.info(f"-- [Server {server_rank}] Dataset loaded, starting server ...")

    torch.multiprocessing.spawn(
        fn=run_server_proc, args=(handle, config, server_rank, dataset), nprocs=1
    )


if __name__ == "__main__":
    if len(sys.argv) < 4:
        print(
            "Usage: ./launch_graphlearn_torch.py <handle> <config> <server_index>",
            file=sys.stderr,
        )
        sys.exit(-1)

    handle = decode_arg(sys.argv[1])
    config = decode_arg(sys.argv[2])
    server_index = int(sys.argv[3])
    config = GLTorchGraph.reverse_transform_config(config)

    logger.info(
        f"launch_graphlearn_torch_server handle: {handle} config: {config} server_index: {server_index}"
    )
    launch_graphlearn_torch_server(handle, config, server_index)
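For reference, here is the encode side that pairs with `decode_arg` above; the local launcher below performs exactly this base64-over-JSON round trip before passing `handle` on the command line. A minimal sketch using only the standard library (the `encode_arg` helper name is ours, not the repo's):

```python
import base64
import json


def encode_arg(arg: dict) -> str:
    # Inverse of decode_arg: JSON-serialize, then base64-encode so the
    # payload survives being passed as a single argv element.
    return base64.b64encode(json.dumps(arg).encode("utf-8")).decode("utf-8")


# Round trip: what the launcher encodes, the server script can decode.
handle = {"num_servers": 1, "master_addr": "localhost"}
assert json.loads(base64.b64decode(encode_arg(handle))) == handle
```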
4 changes: 3 additions & 1 deletion coordinator/gscoordinator/launcher.py
@@ -95,7 +95,9 @@ def create_interactive_instance(
        pass

    @abstractmethod
-   def create_learning_instance(self, object_id: int, handle: str, config: str):
+   def create_learning_instance(
+       self, object_id: int, handle: str, config: str, learning_backend: int
+   ):
        pass

    @abstractmethod
79 changes: 76 additions & 3 deletions coordinator/gscoordinator/local_launcher.py
@@ -33,6 +33,7 @@
from graphscope.framework.utils import get_java_version
from graphscope.framework.utils import get_tempdir
from graphscope.framework.utils import is_free_port
+from graphscope.proto import message_pb2
from graphscope.proto import types_pb2

from gscoordinator.launcher import AbstractLauncher
@@ -245,7 +246,19 @@ def _popen_helper(cmd, cwd, env, stdout=None, stderr=None):
        )
        return process

-   def create_learning_instance(self, object_id, handle, config):
+   def create_learning_instance(self, object_id, handle, config, learning_backend):
+       if learning_backend == message_pb2.LearningBackend.GRAPHLEARN:
+           return self._create_graphlearn_instance(
+               object_id=object_id, handle=handle, config=config
+           )
+       elif learning_backend == message_pb2.LearningBackend.GRAPHLEARN_TORCH:
+           return self._create_graphlearn_torch_instance(
+               object_id=object_id, handle=handle, config=config
+           )
+       else:
+           raise ValueError("invalid learning backend")
+
+   def _create_graphlearn_instance(self, object_id, handle, config):
        # prepare argument
        handle = json.loads(
            base64.b64decode(handle.encode("utf-8", errors="ignore")).decode(
@@ -275,12 +288,12 @@ def create_learning_instance(self, object_id, handle, config):
            cmd = [
                sys.executable,
                "-m",
-               "gscoordinator.learning",
+               "gscoordinator.launch_graphlearn",
                handle,
                config,
                str(index),
            ]
-           logger.debug("launching learning server: %s", " ".join(cmd))
+           logger.debug("launching graphlearn server: %s", " ".join(cmd))

            proc = self._popen_helper(cmd, cwd=None, env=env)
            stdout_watcher = PipeWatcher(proc.stdout, sys.stdout)
@@ -289,6 +302,66 @@ def create_learning_instance(self, object_id, handle, config):
            self._learning_instance_processes[object_id].append(proc)
        return server_list

+   def _create_graphlearn_torch_instance(self, object_id, handle, config):
+       handle = json.loads(
+           base64.b64decode(handle.encode("utf-8", errors="ignore")).decode(
+               "utf-8", errors="ignore"
+           )
+       )
+
+       server_client_master_port = get_free_port("localhost")
+       handle["server_client_master_port"] = server_client_master_port
+
+       server_list = [f"localhost:{server_client_master_port}"]
+       # one extra loader master port each for train, val and test
+       for _ in range(3):
+           server_list.append("localhost:" + str(get_free_port("localhost")))
+
+       handle = base64.b64encode(
+           json.dumps(handle).encode("utf-8", errors="ignore")
+       ).decode("utf-8", errors="ignore")
+
+       # launch the server
+       env = os.environ.copy()
+       # set coordinator dir to PYTHONPATH
+       python_path = (
+           env.get("PYTHONPATH", "")
+           + os.pathsep
+           + os.path.dirname(os.path.dirname(__file__))
+       )
+       env["PYTHONPATH"] = python_path
+
+       self._learning_instance_processes[object_id] = []
+       for index in range(self._num_workers):
+           cmd = [
+               sys.executable,
+               "-m",
+               "gscoordinator.launch_graphlearn_torch",
+               handle,
+               config,
+               str(index),
+           ]
+           logger.debug("launching graphlearn_torch server: %s", " ".join(cmd))
+
+           proc = subprocess.Popen(
+               cmd,
+               env=env,
+               stdout=subprocess.PIPE,
+               stderr=subprocess.STDOUT,
+               encoding="utf-8",
+               errors="replace",
+               universal_newlines=True,
+               bufsize=1,
+           )
+           stdout_watcher = PipeWatcher(
+               proc.stdout,
+               sys.stdout,
+               suppressed=(not logger.isEnabledFor(logging.DEBUG)),
+           )
+           setattr(proc, "stdout_watcher", stdout_watcher)
+           self._learning_instance_processes[object_id].append(proc)
+       return server_list
+
    def close_analytical_instance(self):
        self._stop_subprocess(self._analytical_engine_process, kill=True)
        self._analytical_engine_endpoint = None
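The dispatch above keys on the `LearningBackend` enum from `graphscope.proto.message_pb2`; `GRAPHLEARN` and `GRAPHLEARN_TORCH` are the two values this diff uses. A minimal caller-side sketch (the `select_backend` helper is hypothetical):

```python
from graphscope.proto import message_pb2


def select_backend(name: str) -> int:
    # Hypothetical helper mapping a user-facing string onto the proto enum
    # that create_learning_instance() dispatches on.
    mapping = {
        "graphlearn": message_pb2.LearningBackend.GRAPHLEARN,
        "graphlearn_torch": message_pb2.LearningBackend.GRAPHLEARN_TORCH,
    }
    return mapping[name]


backend = select_backend("graphlearn_torch")
```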
29 changes: 26 additions & 3 deletions docs/learning_engine/guide_and_examples.md
@@ -14,10 +14,12 @@ tutorial_node_classification_k8s
This section contains a guide for the learning engine and a number of examples.

```{tip}
-We assume you has read the [getting_started](getting_started.md) section and know how to launch a GraphScope session.
+We assume you have read the [getting_started](getting_started.md) section and
+know how to launch a GraphScope session.
```

-We present an end-to-end example, demonstrating how GLE trains a node classification model on a citation network using the local mode of GraphScope.
+We present an end-to-end example, demonstrating how GLE trains a node
+classification model on a citation network using the local mode of GraphScope.

````{panels}
:header: text-center
@@ -31,7 +33,11 @@ We present an end-to-end example, demonstrating how GLE trains a node classifica
Training a Node Classification Model on Your Local Machine.
````

-GraphScope is designed for processing large graphs, which are usually hard to fit in the memory of a single machine. With vineyard as the distributed in-memory data manager, GraphScope supports run on a cluster managed by Kubernetes(k8s). Next, we revisit the example we present in the first tutorial, showing how GraphScope process the node classification task on a Kubernetes cluster.
+GraphScope is designed for processing large graphs, which are usually hard to
+fit in the memory of a single machine. With vineyard as the distributed
+in-memory data manager, GraphScope supports running on a cluster managed by
+Kubernetes (k8s). Next, we revisit the example presented in the first tutorial,
+showing how GraphScope processes the node classification task on a Kubernetes cluster.


````{panels}
@@ -45,3 +51,20 @@ GraphScope is designed for processing large graphs, which are usually hard to fi
^^^^^^^^^^^^^^
Training a Node Classification Model on K8s Cluster
````


GraphScope is also compatible with PyG models; the following example shows
how to train a PyG model using GraphScope on your local machine. A generic
PyG model sketch follows the panel below.


````{panels}
:header: text-center
:column: col-lg-12 p-2
```{link-button} tutorial_node_classification_pyg_local.html
:text: Tutorial
:classes: btn-block stretched-link
```
^^^^^^^^^^^^^^
Training a Node Classification Model(PyG) on Your Local Machine
````
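For orientation, the kind of model such a tutorial trains can be as simple as the generic two-layer GCN below. This is a plain `torch_geometric` sketch, not code taken from the tutorial:

```python
import torch
import torch.nn.functional as F
from torch_geometric.nn import GCNConv


class GCN(torch.nn.Module):
    def __init__(self, in_channels, hidden_channels, out_channels):
        super().__init__()
        self.conv1 = GCNConv(in_channels, hidden_channels)
        self.conv2 = GCNConv(hidden_channels, out_channels)

    def forward(self, x, edge_index):
        # Two graph convolutions with ReLU and dropout in between.
        x = F.relu(self.conv1(x, edge_index))
        x = F.dropout(x, p=0.5, training=self.training)
        return self.conv2(x, edge_index)
```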