Skip to content

Commit

Permalink
Add nodeselector arguments to control where the pod will be launched (#…
Browse files Browse the repository at this point in the history
…2087)


Signed-off-by: Tao He <sighingnow@gmail.com>
  • Loading branch information
sighingnow committed Sep 28, 2022
1 parent 9f1f644 commit 2119d48
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 2 deletions.
17 changes: 16 additions & 1 deletion coordinator/gscoordinator/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ def __init__(
mars_worker_mem=None,
mars_scheduler_cpu=None,
mars_scheduler_mem=None,
etcd_pod_node_selector=None,
engine_pod_node_selector=None,
with_mars=False,
image_pull_policy=None,
image_pull_secrets=None,
Expand Down Expand Up @@ -236,7 +238,18 @@ def __init__(
else:
self._image_pull_secrets = []

self._volumes = json.loads(volumes)
if volumes:
self._volumes = json.loads(volumes)
else:
self._volumes = dict()
if etcd_pod_node_selector:
self._etcd_pod_node_selector = json.loads(etcd_pod_node_selector)
else:
self._etcd_pod_node_selector = dict()
if engine_pod_node_selector:
self._engine_pod_node_selector = json.loads(engine_pod_node_selector)
else:
self._engine_pod_node_selector = dict()

self._host0 = None
self._pod_name_list = None
Expand Down Expand Up @@ -546,6 +559,8 @@ def _create_engine_replicaset(self):
num_workers=self._num_workers,
image_pull_policy=self._saved_locals["image_pull_policy"],
)
if self._engine_pod_node_selector:
engine_builder.add_engine_pod_node_selector(self._engine_pod_node_selector)
# volume1 is for vineyard ipc socket
# MaxGraph: /home/maxgraph/data/vineyard
if self._exists_vineyard_daemonset(self._saved_locals["vineyard_daemonset"]):
Expand Down
14 changes: 14 additions & 0 deletions coordinator/gscoordinator/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1621,6 +1621,18 @@ def parse_sys_args():
default="2Gi",
help="Memory of Mars scheduler container, default: 2Gi",
)
parser.add_argument(
"--k8s_etcd_pod_node_selector",
type=str,
default="",
help="Node selector for etcd pods, default is None",
)
parser.add_argument(
"--k8s_engine_pod_node_selector",
type=str,
default="",
help="Node selector for engine pods, default is None",
)
parser.add_argument(
"--k8s_volumes",
type=str,
Expand Down Expand Up @@ -1713,6 +1725,8 @@ def launch_graphscope():
mars_worker_mem=args.k8s_mars_worker_mem,
mars_scheduler_cpu=args.k8s_mars_scheduler_cpu,
mars_scheduler_mem=args.k8s_mars_scheduler_mem,
etcd_pod_node_selector=args.k8s_etcd_pod_node_selector,
engine_pod_node_selector=args.k8s_engine_pod_node_selector,
with_mars=args.k8s_with_mars,
image_pull_policy=args.k8s_image_pull_policy,
image_pull_secrets=args.k8s_image_pull_secrets,
Expand Down
18 changes: 18 additions & 0 deletions python/graphscope/client/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,9 @@ def __init__(
k8s_mars_worker_mem=gs_config.mars_worker_mem,
k8s_mars_scheduler_cpu=gs_config.mars_scheduler_cpu,
k8s_mars_scheduler_mem=gs_config.mars_scheduler_mem,
k8s_coordinator_pod_node_selector=gs_config.k8s_coordinator_pod_node_selector,
k8s_etcd_pod_node_selector=gs_config.k8s_etcd_pod_node_selector,
k8s_engine_pod_node_selector=gs_config.k8s_engine_pod_node_selector,
k8s_volumes=gs_config.k8s_volumes,
k8s_waiting_for_delete=gs_config.k8s_waiting_for_delete,
timeout_seconds=gs_config.timeout_seconds,
Expand Down Expand Up @@ -450,6 +453,18 @@ def __init__(
k8s_mars_scheduler_mem (str, optional):
Minimum number of memory request for mars scheduler container. Defaults to '2Gi'.
k8s_coordinator_pod_node_selector (dict, optional):
Node selector to the coordinator pod on k8s. Default is None.
See also: https://tinyurl.com/3nx6k7ph
k8s_etcd_pod_node_selector (dict, optional):
Node selector to the etcd pod on k8s. Default is None.
See also: https://tinyurl.com/3nx6k7ph
k8s_engine_pod_node_selector = None
Node selector to the engine pod on k8s. Default is None.
See also: https://tinyurl.com/3nx6k7ph
with_mars (bool, optional):
Launch graphscope with mars. Defaults to False.
Expand Down Expand Up @@ -590,6 +605,9 @@ def __init__(
"k8s_mars_worker_mem",
"k8s_mars_scheduler_cpu",
"k8s_mars_scheduler_mem",
"k8s_coordinator_pod_node_selector",
"k8s_etcd_pod_node_selector",
"k8s_engine_pod_node_selector",
"with_mars",
"reconnect",
"k8s_volumes",
Expand Down
5 changes: 5 additions & 0 deletions python/graphscope/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ class GSConfig(object):
mars_scheduler_cpu = 0.2
mars_scheduler_mem = "2Mi"

# the node selector can be a dict, see also: https://tinyurl.com/3nx6k7ph
k8s_coordinator_pod_node_selector = None
k8s_etcd_pod_node_selector = None
k8s_engine_pod_node_selector = None

# launch graphscope with mars
with_mars = False

Expand Down
25 changes: 25 additions & 0 deletions python/graphscope/deploy/kubernetes/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ def __init__(
k8s_mars_worker_mem=None,
k8s_mars_scheduler_cpu=None,
k8s_mars_scheduler_mem=None,
k8s_coordinator_pod_node_selector=None,
k8s_etcd_pod_node_selector=None,
k8s_engine_pod_node_selector=None,
with_mars=None,
k8s_volumes=None,
timeout_seconds=None,
Expand Down Expand Up @@ -370,6 +373,10 @@ def _create_coordinator(self):
ports=ports,
module_name=self._coordinator_module_name,
)
if self._saved_locals["k8s_coordinator_pod_node_selector"] is not None:
coordinator_builder.add_coordinator_pod_node_selector(
self._saved_locals["k8s_coordinator_pod_node_selector"]
)

targets.append(
self._app_api.create_namespaced_deployment(
Expand Down Expand Up @@ -479,6 +486,24 @@ def _build_coordinator_cmd(self):
str(self._saved_locals["etcd_listening_peer_port"]),
]
)
if self._saved_locals["k8s_etcd_pod_node_selector"] is not None:
cmd.extend(
[
"--k8s_etcd_pod_node_selector",
"'{0}'".format(
json.dumps(self._saved_locals["k8s_etcd_pod_node_selector"])
),
]
)
if self._saved_locals["k8s_engine_pod_node_selector"] is not None:
cmd.extend(
[
"--k8s_engine_pod_node_selector",
"'{0}'".format(
json.dumps(self._saved_locals["k8s_engine_pod_node_selector"])
),
]
)
return ["-c", " ".join(cmd)]

def _create_services(self):
Expand Down
42 changes: 41 additions & 1 deletion python/graphscope/deploy/kubernetes/resource_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ def __init__(
self._envs = dict()
self._image_pull_secrets = []
self._host_network = False
self._node_selector = dict()

self.add_field_envs(BASE_MACHINE_ENVS)

Expand Down Expand Up @@ -507,13 +508,19 @@ def add_volume(self, vol):
def add_image_pull_secret(self, name):
self._image_pull_secrets.append(LocalObjectRefBuilder(name))

def add_pod_node_selector(self, node_selector):
if node_selector:
for k, v in node_selector.items():
self._node_selector[k] = v

def build_template_spec(self):
result = {
"hostNetwork": self._host_network,
"containers": [ctn for ctn in self._containers],
"volumes": [vol.build() for vol in self._volumes] or None,
"imagePullSecrets": [ips.build() for ips in self._image_pull_secrets]
or None,
"nodeSelector": self._node_selector or None,
}
return dict((k, v) for k, v in result.items() if v)

Expand Down Expand Up @@ -555,6 +562,7 @@ def __init__(self, name, labels, replicas, image_pull_policy):
self._annotations = dict()
self._image_pull_secrets = []
self._host_network = False
self._node_selector = dict()

self.add_field_envs(BASE_MACHINE_ENVS)

Expand Down Expand Up @@ -597,13 +605,19 @@ def add_volume(self, vol):
def add_image_pull_secret(self, name):
self._image_pull_secrets.append(LocalObjectRefBuilder(name))

def add_pod_node_selector(self, node_selector):
if node_selector:
for k, v in node_selector.items():
self._node_selector[k] = v

def build_pod_spec(self):
result = {
"hostNetwork": self._host_network,
"containers": [ctn for ctn in self._containers],
"volumes": [vol.build() for vol in self._volumes] or None,
"imagePullSecrets": [ips.build() for ips in self._image_pull_secrets]
or None,
"nodeSelector": self._node_selector or None,
}
return dict((k, v) for k, v in result.items() if v)

Expand Down Expand Up @@ -944,12 +958,22 @@ def add_mars_scheduler_container(

super().add_annotation("kubectl.kubernetes.io/default-container", name)

def add_engine_pod_node_selector(self, node_selector):
if node_selector:
super().add_pod_node_selector(node_selector)


class PodBuilder(object):
"""Base builder for k8s pod."""

def __init__(
self, name, labels, hostname=None, subdomain=None, restart_policy="Never"
self,
name,
labels,
hostname=None,
subdomain=None,
restart_policy="Never",
node_selector=None,
):
self._name = name
self._labels = labels
Expand All @@ -960,6 +984,10 @@ def __init__(
self._containers = []
self._image_pull_secrets = []
self._volumes = []
if node_selector:
self._node_selector = node_selector
else:
self._node_selector = dict()

def add_volume(self, vol):
if isinstance(vol, list):
Expand All @@ -983,6 +1011,7 @@ def build_pod_spec(self):
"imagePullSecrets": [ips.build() for ips in self._image_pull_secrets]
or None,
"restartPolicy": self._restart_policy,
"nodeSelector": self._node_selector or None,
}
)

Expand Down Expand Up @@ -1033,6 +1062,7 @@ def __init__(
self._restart_policy = restart_policy
self._image_pull_secrets = image_pull_secrets
self._max_txn_ops = 1024000
self._node_selector = dict()

self._envs = dict()
self._volumes = []
Expand Down Expand Up @@ -1076,6 +1106,7 @@ def build(self):
hostname=name,
subdomain=self._service_name,
restart_policy=self._restart_policy,
node_selector=self._node_selector,
)

# volumes
Expand Down Expand Up @@ -1166,6 +1197,11 @@ def build_readiness_probe(self):
self._listen_peer_service_port, timeout=15, period=10, failure_thresh=8
)

def add_etcd_pod_node_selector(self, node_selector):
if node_selector:
for k, v in node_selector.items():
self._node_selector[k] = v


class GSGraphManagerBuilder(DeploymentBuilder):
"""Builder for graphscope interactive graph manager."""
Expand Down Expand Up @@ -1309,5 +1345,9 @@ def add_coordinator_container(self, name, image, cpu, mem, preemptive, **kwargs)
)
)

def add_coordinator_pod_node_selector(self, node_selector):
if node_selector:
super().add_pod_node_selector(node_selector)

def build_readiness_probe(self, port):
return TcpProbeBuilder(port=port, timeout=15, period=10, failure_thresh=8)

0 comments on commit 2119d48

Please sign in to comment.