From b8cb7c404ed38841d46bf2cd35d8fe3fa812bf21 Mon Sep 17 00:00:00 2001 From: DongZe Li <9546726@qq.com> Date: Mon, 22 Mar 2021 23:21:59 +0800 Subject: [PATCH] Distributed ETCD with 3 pods. (#197) * Deploy ETCD with 3 pods * Expose etcd pod's number as a param named k8s_etcd_num_pods to client --- coordinator/gscoordinator/cluster.py | 32 +-- coordinator/gscoordinator/coordinator.py | 7 + python/graphscope/client/session.py | 5 + python/graphscope/config.py | 1 + .../graphscope/deploy/kubernetes/cluster.py | 6 + .../deploy/kubernetes/resource_builder.py | 258 +++++++++++++----- 6 files changed, 225 insertions(+), 84 deletions(-) diff --git a/coordinator/gscoordinator/cluster.py b/coordinator/gscoordinator/cluster.py index 1af99c017fd3..449bc287a985 100644 --- a/coordinator/gscoordinator/cluster.py +++ b/coordinator/gscoordinator/cluster.py @@ -161,6 +161,7 @@ def __init__( gie_graph_manager_image=None, coordinator_name=None, coordinator_service_name=None, + etcd_num_pods=None, etcd_cpu=None, etcd_mem=None, zookeeper_cpu=None, @@ -231,6 +232,7 @@ def __init__( # etcd pod info self._etcd_image = etcd_image + self._etcd_num_pods = etcd_num_pods self._etcd_cpu = etcd_cpu self._etcd_mem = etcd_mem @@ -390,32 +392,30 @@ def _create_etcd(self): time.sleep(1) - # create etcd deployment + # create etcd cluster etcd_builder = self._gs_etcd_builder_cls( - name=self._etcd_name, - labels=labels, - replicas=1, - image_pull_policy=self._image_pull_policy, - ) - - for name in self._image_pull_secrets: - etcd_builder.add_image_pull_secret(name) - - etcd_builder.add_etcd_container( - name=self._etcd_container_name, + name_prefix=self._etcd_name, + container_name=self._etcd_container_name, service_name=self._etcd_service_name, image=self._etcd_image, cpu=self._etcd_cpu, mem=self._etcd_mem, preemptive=self._preemptive, + labels=labels, + image_pull_policy=self._image_pull_policy, + num_pods=self._etcd_num_pods, + restart_policy="Always", + image_pull_secrets=self._image_pull_secrets, 
listen_peer_service_port=self._random_etcd_listen_peer_service_port, listen_client_service_port=self._random_etcd_listen_client_service_port, ) - self._resource_object.append( - self._app_api.create_namespaced_deployment( - self._namespace, etcd_builder.build() + + for pod_builder in etcd_builder.build(): + self._resource_object.append( + self._core_api.create_namespaced_pod( + self._namespace, pod_builder.build() + ) ) - ) def _create_vineyard_service(self): labels = {"name": self._engine_name} # vineyard in engine pod diff --git a/coordinator/gscoordinator/coordinator.py b/coordinator/gscoordinator/coordinator.py index 1d7a7b5df837..c023251c62ed 100644 --- a/coordinator/gscoordinator/coordinator.py +++ b/coordinator/gscoordinator/coordinator.py @@ -831,6 +831,12 @@ def parse_sys_args(): default="256Mi", help="Memory of engine container, suffix with ['Mi', 'Gi', 'Ti'].", ) + parser.add_argument( + "--k8s_etcd_num_pods", + type=int, + default=3, + help="The number of etcd pods.", + ) parser.add_argument( "--k8s_etcd_cpu", type=float, @@ -918,6 +924,7 @@ def launch_graphscope(): gie_graph_manager_image=args.k8s_gie_graph_manager_image, coordinator_name=args.k8s_coordinator_name, coordinator_service_name=args.k8s_coordinator_service_name, + etcd_num_pods=args.k8s_etcd_num_pods, etcd_cpu=args.k8s_etcd_cpu, etcd_mem=args.k8s_etcd_mem, zookeeper_cpu=args.k8s_zookeeper_cpu, diff --git a/python/graphscope/client/session.py b/python/graphscope/client/session.py index ec6a51fbea32..917257075c50 100755 --- a/python/graphscope/client/session.py +++ b/python/graphscope/client/session.py @@ -160,6 +160,7 @@ def __init__( k8s_image_pull_secrets=gs_config.k8s_image_pull_secrets, k8s_coordinator_cpu=gs_config.k8s_coordinator_cpu, k8s_coordinator_mem=gs_config.k8s_coordinator_mem, + k8s_etcd_num_pods=gs_config.k8s_etcd_num_pods, k8s_etcd_cpu=gs_config.k8s_etcd_cpu, k8s_etcd_mem=gs_config.k8s_etcd_mem, k8s_zookeeper_cpu=gs_config.k8s_zookeeper_cpu, @@ -230,6 +231,8 @@ def __init__( 
k8s_coordinator_mem (str, optional): Minimum number of memory request for coordinator pod. Defaults to '4Gi'. + k8s_etcd_num_pods (int, optional): The number of etcd pods. Defaults to 3. + k8s_etcd_cpu (float, optional): Minimum number of CPU cores request for etcd pod. Defaults to 0.5. k8s_etcd_mem (str, optional): Minimum number of memory request for etcd pod. Defaults to '128Mi'. @@ -343,6 +346,7 @@ def __init__( "k8s_zookeeper_image", "k8s_coordinator_cpu", "k8s_coordinator_mem", + "k8s_etcd_num_pods", "k8s_etcd_cpu", "k8s_etcd_mem", "k8s_zookeeper_cpu", @@ -728,6 +732,7 @@ def _connect(self): vineyard_cpu=self._config_params["k8s_vineyard_cpu"], vineyard_mem=self._config_params["k8s_vineyard_mem"], vineyard_shared_mem=self._config_params["k8s_vineyard_shared_mem"], + etcd_num_pods=self._config_params["k8s_etcd_num_pods"], etcd_cpu=self._config_params["k8s_etcd_cpu"], etcd_mem=self._config_params["k8s_etcd_mem"], zookeeper_cpu=self._config_params["k8s_zookeeper_cpu"], diff --git a/python/graphscope/config.py b/python/graphscope/config.py index aa3a501e942a..ffc34f8691f1 100644 --- a/python/graphscope/config.py +++ b/python/graphscope/config.py @@ -47,6 +47,7 @@ class GSConfig(object): k8s_coordinator_mem = "4Gi" # etcd resource configuration + k8s_etcd_num_pods = 3 k8s_etcd_cpu = 0.5 k8s_etcd_mem = "128Mi" diff --git a/python/graphscope/deploy/kubernetes/cluster.py b/python/graphscope/deploy/kubernetes/cluster.py index 643ba0687965..7a1a1aed7cca 100644 --- a/python/graphscope/deploy/kubernetes/cluster.py +++ b/python/graphscope/deploy/kubernetes/cluster.py @@ -89,6 +89,9 @@ class KubernetesCluster(object): image_pull_secrets: list of str, optional A list of secret name used to pulling image. Defaults to None. + etcd_num_pods: int + The number of etcd pods. + etcd_cpu: float Minimum number of CPU cores request for etcd pod. 
@@ -178,6 +181,7 @@ def __init__( engine_mem=None, coordinator_cpu=None, coordinator_mem=None, + etcd_num_pods=None, etcd_cpu=None, etcd_mem=None, zookeeper_cpu=None, @@ -211,6 +215,7 @@ def __init__( elif not isinstance(self._image_pull_secrets, list): self._image_pull_secrets = [self._image_pull_secrets] + self._etcd_num_pods = etcd_num_pods self._etcd_cpu = etcd_cpu self._etcd_mem = etcd_mem self._zookeeper_cpu = zookeeper_cpu @@ -452,6 +457,7 @@ def _create_coordinator(self): coordinator_cpu=self._coordinator_cpu, coordinator_mem=self._coordinator_mem, coordinator_service_name=self._coordinator_service_name, + etcd_num_pods=self._etcd_num_pods, etcd_cpu=self._etcd_cpu, etcd_mem=self._etcd_mem, zookeeper_cpu=self._zookeeper_cpu, diff --git a/python/graphscope/deploy/kubernetes/resource_builder.py b/python/graphscope/deploy/kubernetes/resource_builder.py index f31d1d305f42..10a105612e46 100644 --- a/python/graphscope/deploy/kubernetes/resource_builder.py +++ b/python/graphscope/deploy/kubernetes/resource_builder.py @@ -693,24 +693,65 @@ def add_engine_container(self, name, image, cpu, mem, preemptive, **kwargs): ) -class GSEtcdBuilder(DeploymentBuilder): - """Builder for graphscope etcd.""" - - _requests_cpu = 0.5 - _requests_mem = "128Mi" +class PodBuilder(object): + """Base builder for k8s pod.""" - def __init__(self, name, labels, image_pull_policy, replicas=1): + def __init__( + self, name, labels, hostname=None, subdomain=None, restart_policy="Never" + ): self._name = name self._labels = labels - self._replicas = replicas - self._image_pull_policy = image_pull_policy - super().__init__( - self._name, self._labels, self._replicas, self._image_pull_policy + self._hostname = hostname + self._subdomain = subdomain + self._restart_policy = restart_policy + + self._containers = [] + self._image_pull_secrets = [] + self._volumes = [] + + def add_volume(self, vol): + if isinstance(vol, list): + self._volumes.extend(vol) + else: + self._volumes.append(vol) + + def 
add_container(self, ctn): + self._containers.append(ctn) + + def add_image_pull_secret(self, name): + self._image_pull_secrets.append(LocalObjectRefBuilder(name)) + + def build_pod_spec(self): + return _remove_nones( + { + "hostname": self._hostname, + "subdomain": self._subdomain, + "containers": [ctn for ctn in self._containers], + "volumes": [vol.build() for vol in self._volumes] or None, + "imagePullSecrets": [ips.build() for ips in self._image_pull_secrets] + or None, + "restartPolicy": self._restart_policy, + } ) - def add_etcd_container( + def build(self): + return { + "kind": "Pod", + "metadata": {"name": self._name, "labels": self._labels}, + "spec": self.build_pod_spec(), + } + + +class GSEtcdBuilder(object): + """Builder for graphscope etcd.""" + + _requests_cpu = 0.5 + _requests_mem = "128Mi" + + def __init__( self, - name, + name_prefix, + container_name, service_name, image, cpu, @@ -718,73 +759,150 @@ def add_etcd_container( preemptive, listen_peer_service_port, listen_client_service_port, + labels, + image_pull_policy, + num_pods=3, + restart_policy="Always", + image_pull_secrets=None, max_txn_ops=1024000, ): - cmd = [ - "etcd", - "--name", - service_name, - "--max-txn-ops=%s" % max_txn_ops, - "--initial-advertise-peer-urls", - "http://%s:%s" % (service_name, str(listen_peer_service_port)), - "--advertise-client-urls=http://%s:%s" - % (service_name, str(listen_client_service_port)), - "--data-dir=/var/lib/etcd", - "--listen-client-urls=http://0.0.0.0:%s" % str(listen_client_service_port), - "--listen-peer-urls=http://0.0.0.0:%s" % str(listen_peer_service_port), - "--initial-cluster", - "%s=http://%s:%s" - % ( - service_name, - service_name, - str(listen_peer_service_port), - ), - "--initial-cluster-state", - "new", - ] + self._name_prefix = name_prefix + self._container_name = container_name + self._service_name = service_name + self._image = image + self._cpu = cpu + self._mem = mem + self._preemptive = preemptive + self._listen_peer_service_port = 
listen_peer_service_port
+        self._listen_client_service_port = listen_client_service_port
+        self._labels = labels
+        self._image_pull_policy = image_pull_policy
+        self._num_pods = num_pods
+        self._restart_policy = restart_policy
+        self._image_pull_secrets = image_pull_secrets
+        self._max_txn_ops = max_txn_ops

-        resources_dict = {
-            "requests": ResourceBuilder(self._requests_cpu, self._requests_mem).build()
-            if preemptive
-            else ResourceBuilder(cpu, mem).build(),
-            "limits": ResourceBuilder(cpu, mem).build(),
-        }
+        self._envs = dict()
+        self._volumes = []

-        volumeMounts = []
-        for vol in self._volumes:
-            for vol_mount in vol.build_mount():
-                volumeMounts.append(vol_mount)
+    def add_volume(self, vol):
+        if isinstance(vol, list):
+            self._volumes.extend(vol)
+        else:
+            self._volumes.append(vol)

-        super().add_container(
-            _remove_nones(
-                {
-                    "command": cmd,
-                    "env": [env.build() for env in self._envs.values()] or None,
-                    "image": image,
-                    "name": name,
-                    "imagePullPolicy": self._image_pull_policy,
-                    "resources": dict((k, v) for k, v in resources_dict.items() if v)
-                    or None,
-                    "ports": [
-                        PortBuilder(listen_peer_service_port).build(),
-                        PortBuilder(listen_client_service_port).build(),
-                    ],
-                    "volumeMounts": volumeMounts or None,
-                    "livenessProbe": self.build_liveness_probe(
-                        listen_client_service_port
-                    ).build(),
-                    "readinessProbe": None,
-                    "lifecycle": None,
-                }
+    def add_env(self, name, value=None):
+        self._envs[name] = ContainerEnvBuilder(name, value)
+
+    def add_simple_envs(self, envs):
+        for k, v in envs.items() or ():
+            self.add_env(k, v)
+
+    def build(self):
+        """
+        Returns: a list of :class:`PodBuilder`. 
+ """ + pods_name = [] + initial_cluster = "" + for i in range(self._num_pods): + name = "%s-%s" % (self._name_prefix, str(i)) + pods_name.append(name) + initial_cluster += "%s=%s," % ( + name, + "http://%s:%s" + % ( + "%s.%s" % (name, self._service_name), + str(self._listen_peer_service_port), + ), ) - ) + # remove last comma + initial_cluster = initial_cluster[0:-1] + + pods_builder = [] + for _, name in enumerate(pods_name): + pod_builder = PodBuilder( + name=name, + labels=self._labels, + hostname=name, + subdomain=self._service_name, + restart_policy=self._restart_policy, + ) + + # volumes + pod_builder.add_volume(self._volumes) + + cmd = [ + "etcd", + "--name", + name, + "--max-txn-ops=%s" % self._max_txn_ops, + "--initial-advertise-peer-urls", + "http://%s:%s" + % ( + "%s.%s" % (name, self._service_name), + str(self._listen_peer_service_port), + ), + "--advertise-client-urls=http://%s:%s" + % (name, str(self._listen_client_service_port)), + "--data-dir=/var/lib/etcd", + "--listen-client-urls=http://0.0.0.0:%s" + % str(self._listen_client_service_port), + "--listen-peer-urls=http://0.0.0.0:%s" + % str(self._listen_peer_service_port), + "--initial-cluster", + initial_cluster, + "--initial-cluster-state", + "new", + ] + + resources_dict = { + "requests": ResourceBuilder( + self._requests_cpu, self._requests_mem + ).build() + if self._preemptive + else ResourceBuilder(self._cpu, self._mem).build(), + "limits": ResourceBuilder(self._cpu, self._mem).build(), + } + + volumeMounts = [] + for vol in self._volumes: + for vol_mount in vol.build_mount(): + volumeMounts.append(vol_mount) + + pod_builder.add_container( + _remove_nones( + { + "command": cmd, + "env": [env.build() for env in self._envs.values()] or None, + "image": self._image, + "name": self._container_name, + "imagePullPolicy": self._image_pull_policy, + "resources": dict( + (k, v) for k, v in resources_dict.items() if v + ) + or None, + "ports": [ + PortBuilder(self._listen_peer_service_port).build(), + 
PortBuilder(self._listen_client_service_port).build(), + ], + "volumeMounts": volumeMounts or None, + "livenessProbe": self.build_liveness_probe().build(), + "readinessProbe": None, + "lifecycle": None, + } + ) + ) + + pods_builder.append(pod_builder) + + return pods_builder - def build_liveness_probe(self, listen_client_service_port): + def build_liveness_probe(self): liveness_cmd = [ "/bin/sh", "-ec", "ETCDCTL_API=3 etcdctl --endpoints=http://[127.0.0.1]:%s get foo" - % str(listen_client_service_port), + % str(self._listen_client_service_port), ] return ExecProbeBuilder(liveness_cmd, timeout=15, failure_thresh=8) @@ -931,6 +1049,7 @@ def add_coordinator_container( coordinator_cpu, coordinator_mem, coordinator_service_name, + etcd_num_pods, etcd_cpu, etcd_mem, zookeeper_cpu, @@ -965,6 +1084,7 @@ def add_coordinator_container( self._coordinator_cpu = coordinator_cpu self._coordinator_mem = coordinator_mem self._coordinator_service_name = coordinator_service_name + self._etcd_num_pods = etcd_num_pods self._etcd_cpu = etcd_cpu self._etcd_mem = etcd_mem self._zookeeper_cpu = zookeeper_cpu @@ -1066,6 +1186,8 @@ def build_container_command(self): self._coordinator_name, "--k8s_coordinator_service_name", self._coordinator_service_name, + "--k8s_etcd_num_pods", + str(self._etcd_num_pods), "--k8s_etcd_cpu", str(self._etcd_cpu), "--k8s_etcd_mem",