Skip to content

Commit

Permalink
Enable to connect to the existed ETCD cluster in GraphScope (#1359)
Browse files Browse the repository at this point in the history
Co-authored-by: siyuan0322 <siyuan0322@gmail.com>
  • Loading branch information
wuyueandrew and siyuan0322 committed Mar 11, 2022
1 parent 5ea7c0d commit 2c56d49
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 29 deletions.
51 changes: 31 additions & 20 deletions coordinator/gscoordinator/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import subprocess
import sys
import time
import uuid
import traceback

try:
from kubernetes import client as kube_client
Expand Down Expand Up @@ -171,6 +171,7 @@ def __init__(
dataset_image=None,
coordinator_name=None,
coordinator_service_name=None,
etcd_addrs=None,
etcd_num_pods=None,
etcd_cpu=None,
etcd_mem=None,
Expand Down Expand Up @@ -210,6 +211,7 @@ def __init__(

# random for multiple k8s cluster in the same namespace
self._engine_name = self._engine_name_prefix + self._saved_locals["instance_id"]
self._etcd_addrs = etcd_addrs
self._etcd_name = self._etcd_name_prefix + self._saved_locals["instance_id"]
self._etcd_service_name = (
self._etcd_service_name_prefix + self._saved_locals["instance_id"]
Expand Down Expand Up @@ -497,18 +499,14 @@ def _create_mars_scheduler(self):
if not self._exists_vineyard_daemonset(
self._saved_locals["vineyard_daemonset"]
):
port = self._random_etcd_listen_client_service_port
etcd_endpoints = ["http://%s:%s" % (self._etcd_service_name, port)]
for i in range(self._etcd_num_pods):
etcd_endpoints.append("http://%s-%d:%s" % (self._etcd_name, i, port))
scheduler_builder.add_vineyard_container(
name=self._vineyard_container_name,
image=self._saved_locals["gs_image"],
cpu=self._saved_locals["vineyard_cpu"],
mem=self._saved_locals["vineyard_mem"],
shared_mem=self._saved_locals["vineyard_shared_mem"],
preemptive=self._saved_locals["preemptive"],
etcd_endpoints=etcd_endpoints,
etcd_endpoints=self._get_etcd_endpoints(),
port=self._vineyard_service_port,
)

Expand Down Expand Up @@ -634,18 +632,14 @@ def _create_engine_replicaset(self):
if not self._exists_vineyard_daemonset(
self._saved_locals["vineyard_daemonset"]
):
port = self._random_etcd_listen_client_service_port
etcd_endpoints = ["http://%s:%s" % (self._etcd_service_name, port)]
for i in range(self._etcd_num_pods):
etcd_endpoints.append("http://%s-%d:%s" % (self._etcd_name, i, port))
engine_builder.add_vineyard_container(
name=self._vineyard_container_name,
image=self._saved_locals["gs_image"],
cpu=self._saved_locals["vineyard_cpu"],
mem=self._saved_locals["vineyard_mem"],
shared_mem=self._saved_locals["vineyard_shared_mem"],
preemptive=self._saved_locals["preemptive"],
etcd_endpoints=etcd_endpoints,
etcd_endpoints=self._get_etcd_endpoints(),
port=self._vineyard_service_port,
)

Expand Down Expand Up @@ -862,10 +856,7 @@ def _create_interactive_engine_service(self):
zetcd_exec = shutil.which("zetcd")
if not zetcd_exec:
raise RuntimeError("zetcd command not found.")
port = self._random_etcd_listen_client_service_port
etcd_endpoints = ["http://%s:%s" % (self._etcd_service_name, port)]
for i in range(self._etcd_num_pods):
etcd_endpoints.append("http://%s-%d:%s" % (self._etcd_name, i, port))
etcd_endpoints = self._get_etcd_endpoints()
cmd = [
zetcd_exec,
"--zkaddr",
Expand Down Expand Up @@ -908,10 +899,29 @@ def _create_interactive_engine_service(self):
)
)

def _config_etcd_endpoint(self):
if self._etcd_addrs is None:
self._create_etcd()
self._etcd_endpoint = self._get_etcd_service_endpoint()
logger.info("Etcd created, endpoint is %s", self._etcd_endpoint)
else:
self._etcd_endpoint = self._etcd_addrs
logger.info("External Etcd endpoint is %s", self._etcd_endpoint)

def _get_etcd_endpoints(self):
etcd_addrs = []
if self._etcd_addrs is None:
port = self._random_etcd_listen_client_service_port
etcd_addrs.append("%s:%s" % (self._etcd_service_name, port))
for i in range(self._etcd_num_pods):
etcd_addrs.append("%s-%d:%s" % (self._etcd_name, i, port))
else:
etcd_addrs = self._etcd_addrs.split(",")
etcd_endpoints = ["http://%s" % i for i in etcd_addrs if i]
return etcd_endpoints

def _create_services(self):
self._create_etcd()
self._etcd_endpoint = self._get_etcd_service_endpoint()
logger.info("Etcd is ready, endpoint is {}".format(self._etcd_endpoint))
self._config_etcd_endpoint()

# create interactive engine service
logger.info("Creating interactive engine service...")
Expand Down Expand Up @@ -1173,8 +1183,9 @@ def start(self):
except Exception as e:
time.sleep(1)
logger.error(
"Error when launching GraphScope on kubernetes cluster: %s",
str(e),
"Error when launching GraphScope on kubernetes cluster: %s, with traceback: %s",
repr(e),
traceback.format_exc(),
)
self.stop()
return False
Expand Down
8 changes: 8 additions & 0 deletions coordinator/gscoordinator/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1358,6 +1358,12 @@ def parse_sys_args():
default="256Mi",
help="Memory of engine container, suffix with ['Mi', 'Gi', 'Ti'].",
)
parser.add_argument(
"--etcd_addrs",
type=str,
default=None,
help="The addr of external etcd cluster, with formats like 'etcd01:port,etcd02:port,etcd03:port' ",
)
parser.add_argument(
"--k8s_etcd_num_pods",
type=int,
Expand Down Expand Up @@ -1470,6 +1476,7 @@ def launch_graphscope():
dataset_image=args.k8s_dataset_image,
coordinator_name=args.k8s_coordinator_name,
coordinator_service_name=args.k8s_coordinator_service_name,
etcd_addrs=args.etcd_addrs,
etcd_num_pods=args.k8s_etcd_num_pods,
etcd_cpu=args.k8s_etcd_cpu,
etcd_mem=args.k8s_etcd_mem,
Expand Down Expand Up @@ -1500,6 +1507,7 @@ def launch_graphscope():
launcher = LocalLauncher(
num_workers=args.num_workers,
hosts=args.hosts,
etcd_addrs=args.etcd_addrs,
vineyard_socket=args.vineyard_socket,
shared_mem=args.vineyard_shared_mem,
log_level=args.log_level,
Expand Down
23 changes: 14 additions & 9 deletions coordinator/gscoordinator/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ def __init__(
self,
num_workers,
hosts,
etcd_addrs,
vineyard_socket,
shared_mem,
log_level,
Expand All @@ -130,6 +131,7 @@ def __init__(
super().__init__()
self._num_workers = num_workers
self._hosts = hosts
self._etcd_addrs = etcd_addrs
self._vineyard_socket = vineyard_socket
self._shared_mem = shared_mem
self._glog_level = parse_as_glog_level(log_level)
Expand Down Expand Up @@ -310,10 +312,18 @@ def _find_etcd(self):
etcd = [etcd]
return etcd

def _config_etcd(self):
if self._etcd_addrs is None:
self._launch_etcd()
else:
self._etcd_endpoint = "http://" + self._etcd_addrs
logger.info("External Etcd endpoint is %s", self._etcd_endpoint)

def _launch_etcd(self):
etcd_exec = self._find_etcd()
self._etcd_peer_port = 2380 if is_free_port(2380) else get_free_port()
self._etcd_client_port = 2379 if is_free_port(2379) else get_free_port()
self._etcd_endpoint = "http://127.0.0.1:{0}".format(str(self._etcd_client_port))

env = os.environ.copy()
env.update({"ETCD_MAX_TXN_OPS": "102400"})
Expand All @@ -326,7 +336,7 @@ def _launch_etcd(self):
"--listen-client-urls",
"http://0.0.0.0:{0}".format(str(self._etcd_client_port)),
"--advertise-client-urls",
"http://127.0.0.1:{0}".format(str(self._etcd_client_port)),
self._etcd_endpoint,
"--initial-cluster",
"default=http://127.0.0.1:{0}".format(str(self._etcd_peer_port)),
"--initial-advertise-peer-urls",
Expand Down Expand Up @@ -375,7 +385,7 @@ def _launch_zetcd(self):
"--zkaddr",
"0.0.0.0:{}".format(self._zookeeper_port),
"--endpoints",
"localhost:{0}".format(self._etcd_client_port),
self._etcd_endpoint,
]

process = subprocess.Popen(
Expand Down Expand Up @@ -426,12 +436,7 @@ def _create_vineyard(self):
cmd = self._find_vineyardd()
cmd.extend(["--socket", vineyard_socket])
cmd.extend(["--size", self._shared_mem])
cmd.extend(
[
"-etcd_endpoint",
"http://localhost:{0}".format(self._etcd_client_port),
]
)
cmd.extend(["-etcd_endpoint", self._etcd_endpoint])
cmd.extend(["-etcd_prefix", f"vineyard.gsa.{ts}"])
env = os.environ.copy()
env["GLOG_v"] = str(self._glog_level)
Expand Down Expand Up @@ -477,7 +482,7 @@ def _create_vineyard(self):

def _create_services(self):
# create etcd
self._launch_etcd()
self._config_etcd()
# create vineyard
self._create_vineyard()
# create GAE rpc service
Expand Down
2 changes: 2 additions & 0 deletions python/graphscope/client/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ def __init__(
k8s_image_pull_secrets=gs_config.k8s_image_pull_secrets,
k8s_coordinator_cpu=gs_config.k8s_coordinator_cpu,
k8s_coordinator_mem=gs_config.k8s_coordinator_mem,
etcd_addrs=gs_config.etcd_addrs,
k8s_etcd_num_pods=gs_config.k8s_etcd_num_pods,
k8s_etcd_cpu=gs_config.k8s_etcd_cpu,
k8s_etcd_mem=gs_config.k8s_etcd_mem,
Expand Down Expand Up @@ -557,6 +558,7 @@ def __init__(
"k8s_image_pull_secrets",
"k8s_coordinator_cpu",
"k8s_coordinator_mem",
"etcd_addrs",
"k8s_etcd_num_pods",
"k8s_etcd_cpu",
"k8s_etcd_mem",
Expand Down
1 change: 1 addition & 0 deletions python/graphscope/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class GSConfig(object):
k8s_coordinator_mem = "2Gi"

# etcd resource configuration
etcd_addrs = None
k8s_etcd_num_pods = 1
k8s_etcd_cpu = 1.0
k8s_etcd_mem = "512Mi"
Expand Down
5 changes: 5 additions & 0 deletions python/graphscope/deploy/hosts/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def __init__(
self,
hosts=None,
port=None,
etcd_addrs=None,
num_workers=None,
vineyard_socket=None,
timeout_seconds=None,
Expand All @@ -61,6 +62,7 @@ def __init__(
):
self._hosts = hosts
self._port = port
self._etcd_addrs = etcd_addrs
self._num_workers = num_workers
self._vineyard_socket = vineyard_socket
self._timeout_seconds = timeout_seconds
Expand Down Expand Up @@ -105,6 +107,9 @@ def _launch_coordinator(self):
self._instance_id,
]

if self._etcd_addrs is not None:
cmd.extend(["--etcd_addrs", self._etcd_addrs])

if self._vineyard_shared_mem is not None:
cmd.extend(["--vineyard_shared_mem", self._vineyard_shared_mem])

Expand Down
3 changes: 3 additions & 0 deletions python/graphscope/deploy/kubernetes/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ def __init__(
k8s_engine_mem=None,
k8s_coordinator_cpu=None,
k8s_coordinator_mem=None,
etcd_addrs=None,
k8s_etcd_num_pods=None,
k8s_etcd_cpu=None,
k8s_etcd_mem=None,
Expand Down Expand Up @@ -460,6 +461,8 @@ def _build_coordinator_cmd(self):
self._saved_locals["k8s_dataset_image"],
]
)
if self._saved_locals["etcd_addrs"] is not None:
cmd.extend(["--etcd_addrs", self._saved_locals["etcd_addrs"]])
return ["-c", " ".join(cmd)]

def _create_services(self):
Expand Down

0 comments on commit 2c56d49

Please sign in to comment.