Skip to content

Commit

Permalink
Use vineyardctl API to inject the vineyard sidecar (#2612)
Browse files Browse the repository at this point in the history
This pull request refactors the vineyard deployment in GraphScope to use
a sidecar container instead of a separate container in the engine pod.
This improves the performance and stability of the system and simplifies
the management of vineyard. It also fixes some typos and errors in the
documentation. The main changes are in
`coordinator/gscoordinator/cluster_builder.py` and
`coordinator/gscoordinator/kubernetes_launcher.py`.

---------
Signed-off-by: Ye Cao <caoye.cao@alibaba-inc.com>
  • Loading branch information
dashanji committed May 31, 2023
1 parent 7fd430b commit fdc626e
Show file tree
Hide file tree
Showing 4 changed files with 220 additions and 84 deletions.
63 changes: 4 additions & 59 deletions coordinator/gscoordinator/cluster_builder.py
Expand Up @@ -337,40 +337,6 @@ def get_learning_container(self, volume_mounts):
]
return container

def get_vineyard_container(self, volume_mounts):
# Build the container that runs `vineyardd` for the engine stateful set.
#
# `volume_mounts` is forwarded unchanged to get_engine_container_helper.
# Returns the container produced by that helper, with `ports` set to the
# vineyard service port and the etcd port.
name = self.vineyard_container_name
image = self._vineyard_image
sts_name = self.engine_stateful_set_name
svc_name = sts_name + "-headless"
# DNS name of replica 0, assembled from the stateful set name, the
# "<stateful set>-headless" service name and the namespace.
pod0_dns = f"{sts_name}-0.{svc_name}.{self._namespace}.svc.cluster.local"
vineyard_cmd = (
f"vineyardd -size {self._vineyard_shared_mem} -socket {self._sock}"
)
# Shell script run by the container. It takes the trailing number of the
# hostname as the replica ordinal (exiting with 1 if there is none).
# Ordinal 0 starts vineyardd against http://0.0.0.0:<etcd port>; every other
# replica retries `nslookup` once per second until pod 0 resolves, then
# starts vineyardd against pod 0's etcd port.
# `${{...}}` is the f-string escape that yields a literal `${...}` for bash.
args = f"""
[[ `hostname` =~ -([0-9]+)$ ]] || exit 1;
ordinal=${{BASH_REMATCH[1]}};
if (( $ordinal == 0 )); then
{vineyard_cmd} -etcd_endpoint http://0.0.0.0:{self._etcd_port}
else
until nslookup {pod0_dns}; do sleep 1; done;
{vineyard_cmd} -etcd_endpoint http://{pod0_dns}:{self._etcd_port}
fi;
"""
args = ["bash", "-c", args]
# NOTE(review): self._vineyard_requests is passed for both trailing helper
# arguments -- presumably resource requests and limits; confirm against
# get_engine_container_helper.
container = self.get_engine_container_helper(
name,
image,
args,
volume_mounts,
self._vineyard_requests,
self._vineyard_requests,
)
# Expose the vineyard service port and the etcd port on the container.
container.ports = [
kube_client.V1ContainerPort(container_port=self._vineyard_service_port),
kube_client.V1ContainerPort(container_port=self._etcd_port),
]
return container

def get_mars_container(self):
_ = self.mars_container_name
return
Expand All @@ -396,11 +362,9 @@ def get_engine_pod_spec(self):
containers = []
volumes = []

socket_volume = self.get_vineyard_socket_volume()
shm_volume = self.get_shm_volume()
volumes.extend([socket_volume[0], shm_volume[0]])

engine_volume_mounts = [socket_volume[2], shm_volume[2]]
volumes = [shm_volume[0]]
engine_volume_mounts = [shm_volume[2]]

if self._volumes and self._volumes is not None:
udf_volumes = ResourceBuilder.get_user_defined_volumes(self._volumes)
Expand Down Expand Up @@ -428,13 +392,6 @@ def get_engine_pod_spec(self):
self.get_learning_container(volume_mounts=engine_volume_mounts)
)

if self._vineyard_deployment is None:
containers.append(
self.get_vineyard_container(
volume_mounts=[socket_volume[1], shm_volume[1]]
)
)

if self._with_dataset:
dataset_volume = self.get_dataset_volume()
volumes.append(dataset_volume[0])
Expand Down Expand Up @@ -481,25 +438,14 @@ def get_engine_headless_service(self):
service_spec = ResourceBuilder.get_service_spec(
"ClusterIP", ports, self._engine_labels, None
)

# Necessary, create a headless service for statefulset
service_spec.cluster_ip = "None"
service = ResourceBuilder.get_service(
self._namespace, name, service_spec, self._engine_labels
)
return service

def get_vineyard_service(self):
    """Build the service object that exposes the vineyard service port.

    The service is named ``<vineyard prefix><instance id>``, uses the
    configured service type, and carries the engine labels both as the
    fourth argument of ``ResourceBuilder.get_service_spec`` and as the last
    argument of ``ResourceBuilder.get_service``.

    Returns:
        Whatever ``ResourceBuilder.get_service`` returns for this spec.
    """
    service_name = f"{self._vineyard_prefix}{self._instance_id}"
    # The single port entry reuses the service name as the port name.
    vineyard_port = kube_client.V1ServicePort(
        name=service_name, port=self._vineyard_service_port
    )
    spec = ResourceBuilder.get_service_spec(
        self._service_type, [vineyard_port], self._engine_labels, None
    )
    return ResourceBuilder.get_service(
        self._namespace, service_name, spec, self._engine_labels
    )

def get_learning_service(self, object_id, start_port):
service_type = self._service_type
num_workers = self._num_workers
Expand Down Expand Up @@ -530,15 +476,14 @@ def frontend_deployment_name(self):

@property
def vineyard_service_name(self):
return f"{self._vineyard_prefix}{self._instance_id}"
return f"{self.engine_stateful_set_name}-{self._instance_id}-vineyard-rpc"

def get_vineyard_service_endpoint(self, api_client):
# return f"{self.vineyard_service_name}:{self._vineyard_service_port}"
service_name = self.vineyard_service_name
service_type = self._service_type
if self.vineyard_deployment_exists():
service_name = self._vineyard_deployment + "-rpc"
service_type = "ClusterIP"
endpoints = get_service_endpoints(
api_client=api_client,
namespace=self._namespace,
Expand Down
223 changes: 207 additions & 16 deletions coordinator/gscoordinator/kubernetes_launcher.py
Expand Up @@ -152,6 +152,10 @@ def __init__(
self._engine_mem = engine_mem
self._vineyard_shared_mem = vineyard_shared_mem

self._vineyard_cpu = vineyard_cpu
self._vineyard_mem = vineyard_mem
self._vineyard_image = vineyard_image

self._with_dataset = with_dataset
self._preemptive = preemptive
self._service_type = service_type
Expand Down Expand Up @@ -432,22 +436,218 @@ def _create_mars_scheduler(self):
)
self._resource_object.append(response)

# Inject vineyard as a sidecar container into the given workload and return
# the new workload object, deserialized from the JSON emitted by vineyardctl.
#
# Assume we have a workload json as below:
#
# {
# "apiVersion": "apps/v1",
# "kind": "Deployment",
# "metadata": {
# "name": "nginx-deployment",
# "namespace": "vineyard-job"
# },
# "spec": {
# "selector": {
# "matchLabels": {
# "app": "nginx"
# }
# },
# "template": {
# "metadata": {
# "labels": {
# "app": "nginx"
# }
# },
# "spec": {
# "containers": [
# {
# "name": "nginx",
# "image": "nginx:1.14.2",
# "ports": [
# {
# "containerPort": 80
# }
# ]
# }
# ]
# }
# }
# }
# }
#
# The injected workload (before it is deserialized back into an object) looks like:
#
# {
# "apiVersion": "apps/v1",
# "kind": "Deployment",
# "metadata": {
# "creationTimestamp": null,
# "name": "nginx-deployment",
# "namespace": "vineyard-job"
# },
# "spec": {
# "selector": {
# "matchLabels": {
# "app": "nginx"
# }
# }
# },
# "template": {
# "metadata": null,
# "labels": {
# "app": "nginx",
# "app.vineyard.io/name": "vineyard-sidecar"
# },
# "spec": {
# "containers": [
# {
# "command": null,
# "image": "nginx:1.14.2",
# "name": "nginx",
# "ports": [
# {
# "containerPort": 80
# }
# ],
# "volumeMounts": [
# {
# "mountPath": "/var/run",
# "name": "vineyard-socket"
# }
# ]
# },
# {
# "command": [
# "/bin/bash",
# "-c",
# "/usr/bin/wait-for-it.sh -t 60 vineyard-sidecar-etcd-service.vineyard-job.svc.cluster.local:2379; \\\n
# sleep 1; /usr/local/bin/vineyardd --sync_crds true --socket /var/run/vineyard.sock --size 256Mi \\\n
# --stream_threshold 80 --etcd_cmd etcd --etcd_prefix /vineyard \\\n
# --etcd_endpoint http://vineyard-sidecar-etcd-service:2379\n"
# ],
# "env": [
# {
# "name": "VINEYARDD_UID",
# "value": null
# },
# {
# "name": "VINEYARDD_NAME",
# "value": "vineyard-sidecar"
# },
# {
# "name": "VINEYARDD_NAMESPACE",
# "value": "vineyard-job"
# }
# ],
# "image": "vineyardcloudnative/vineyardd:latest",
# "imagePullPolicy": "IfNotPresent",
# "name": "vineyard-sidecar",
# "ports": [
# {
# "containerPort": 9600,
# "name": "vineyard-rpc",
# "protocol": "TCP"
# }
# ],
# "volumeMounts": [
# {
# "mountPath": "/var/run",
# "name": "vineyard-socket"
# }
# ]
# }
# ],
# "volumes": [
# {
# "emptyDir": {},
# "name": "vineyard-socket"
# }
# ]
# }
# }
# }

def _inject_vineyard_as_sidecar(self, workload):
    """Return a copy of ``workload`` with the vineyard sidecar injected.

    The workload is serialized to JSON, passed to ``vineyardctl inject``
    through the ``vineyard`` Python package, and the ``"workload"`` entry of
    the command output is deserialized back into an object of the same type
    as the input.

    Args:
        workload: Kubernetes workload object whose pod template should
            receive the sidecar. Its template metadata gets empty
            ``annotations`` / ``labels`` dicts when those are ``None``.

    Returns:
        A new workload object of ``type(workload)``.
    """
    import vineyard

    # Make sure the pod template has annotation and label maps (possibly
    # empty) before the workload is serialized.
    template_meta = workload.spec.template.metadata
    if template_meta.annotations is None:
        template_meta.annotations = {}
    if template_meta.labels is None:
        template_meta.labels = {}

    serialized_workload = json.dumps(
        self._api_client.sanitize_for_serialization(workload)
    )

    # Only the first owner reference is forwarded to vineyardctl.
    owner = self._owner_references[0]
    owner_references_json = json.dumps(
        [
            {
                "apiVersion": owner.api_version,
                "kind": owner.kind,
                "name": owner.name,
                "uid": owner.uid,
            }
        ]
    )

    # `name` is the sidecar container name and also the labelSelector of the
    # rpc and etcd services; `apply_resources` makes vineyardctl apply its
    # resources to the cluster during the injection. See
    # https://github.com/v6d-io/v6d/tree/main/k8s/cmd#vineyardctl-inject
    stateful_set_name = (
        f"{self._engine_cluster.engine_stateful_set_name}-{self._instance_id}"
    )
    inject_output = vineyard.deploy.vineyardctl.inject(
        resource=serialized_workload,
        sidecar_volume_mountpath="/tmp/vineyard_workspace",
        name=stateful_set_name + "-vineyard",
        apply_resources=True,
        owner_references=owner_references_json,
        sidecar_image=self._vineyard_image,
        sidecar_size=self._vineyard_shared_mem,
        sidecar_cpu=self._vineyard_cpu,
        sidecar_memory=self._vineyard_mem,
        sidecar_service_type=self._service_type,
        output="json",
        capture=True,
    )

    # The output is a JSON document whose "workload" value is itself a
    # JSON-encoded string, hence the two decoding steps.
    injected_manifest = json.loads(json.loads(inject_output)["workload"])

    return self._api_client.deserialize(
        FakeKubeResponse(injected_manifest), type(workload)
    )

def _create_engine_stateful_set(self):
logger.info("Create engine headless services...")
service = self._engine_cluster.get_engine_headless_service()
service.metadata.owner_references = self._owner_references
response = self._core_api.create_namespaced_service(self._namespace, service)
self._resource_object.append(response)
logger.info("Creating engine pods...")

# we don't need to create the headless service for engines here,
# as the etcd service is already created by the vineyardctl
# service = self._engine_cluster.get_engine_headless_service()
# service.metadata.owner_references = self._owner_references
# response = self._core_api.create_namespaced_service(self._namespace, service)
# self._resource_object.append(response)
stateful_set = self._engine_cluster.get_engine_stateful_set()
if self.vineyard_deployment_exists():
# schedule engine statefulset to the same node with vineyard deployment
stateful_set = self._add_pod_affinity_for_vineyard_deployment(
workload=stateful_set
)
else:
stateful_set = self._inject_vineyard_as_sidecar(stateful_set)

stateful_set.metadata.owner_references = self._owner_references
response = self._apps_api.create_namespaced_stateful_set(
self._namespace, stateful_set
)
Expand All @@ -469,13 +669,6 @@ def _create_frontend_service(self):
response = self._core_api.create_namespaced_service(self._namespace, service)
self._resource_object.append(response)

def _create_vineyard_service(self):
    """Create the vineyard service in the namespace and record the response.

    The service comes from the engine cluster builder, gets this launcher's
    owner references attached, and the API response is appended to
    ``self._resource_object``.
    """
    logger.info("Creating vineyard service...")
    vineyard_service = self._engine_cluster.get_vineyard_service()
    vineyard_service.metadata.owner_references = self._owner_references
    created = self._core_api.create_namespaced_service(
        self._namespace, vineyard_service
    )
    self._resource_object.append(created)

def _create_learning_service(self, object_id):
logger.info("Creating learning service...")
service = self._engine_cluster.get_learning_service(
Expand Down Expand Up @@ -503,8 +696,6 @@ def _create_services(self):
if self._with_mars:
# scheduler used by Mars
self._create_mars_scheduler()
if self._vineyard_deployment is None:
self._create_vineyard_service()

def _waiting_for_services_ready(self):
logger.info("Waiting for services ready...")
Expand Down

0 comments on commit fdc626e

Please sign in to comment.