Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate vineyard operator python API with Graphscope #2458

Merged
merged 9 commits into from
Mar 20, 2023
7 changes: 6 additions & 1 deletion .github/workflows/k8s-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,12 @@ jobs:

minikube start --base-image='registry-vpc.cn-hongkong.aliyuncs.com/graphscope/kicbase:v0.0.30' \
--cpus='12' --memory='32000mb' --disk-size='40000mb' \
siyuan0322 marked this conversation as resolved.
Show resolved Hide resolved
--mount=true --mount-string="${GS_TEST_DIR}:${GS_TEST_DIR}"

# create a kubernetes cluster with 2 nodes
#TODO(caoye)
#minikube start --base-image='registry-vpc.cn-hongkong.aliyuncs.com/graphscope/kicbase:v0.0.32' \
# --nodes 2 --cpus='6' --memory='16000mb' --disk-size='20000mb' \
# --mount=true --mount-string="${GS_TEST_DIR}:${GS_TEST_DIR}"

export GS_REGISTRY=""
export GS_TAG=${SHORT_SHA}
Expand Down
1 change: 0 additions & 1 deletion README-zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,6 @@ GraphScope 遵循 [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2
- Wenfei Fan, Tao He, Longbin Lai, Xue Li, Yong Li, Zhao Li, Zhengping Qian, Chao Tian, Lei Wang, Jingbo Xu, Youyang Yao, Qiang Yin, Wenyuan Yu, Jingren Zhou, Diwen Zhu, Rong Zhu. [GraphScope: A Unified Engine For Big Graph Processing](http://vldb.org/pvldb/vol14/p2879-qian.pdf). The 47th International Conference on Very Large Data Bases (VLDB), industry, 2021.
- Jingbo Xu, Zhanning Bai, Wenfei Fan, Longbin Lai, Xue Li, Zhao Li, Zhengping Qian, Lei Wang, Yanyan Wang, Wenyuan Yu, Jingren Zhou. [GraphScope: A One-Stop Large Graph Processing System](http://vldb.org/pvldb/vol14/p2703-xu.pdf). The 47th International Conference on Very Large Data Bases (VLDB), demo, 2021


## 贡献

我们热忱欢迎和感谢来自社区的各种贡献!
Expand Down
6 changes: 3 additions & 3 deletions charts/graphscope/templates/coordinator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ spec:
- {{ printf "%s-%s" "coordinator" $fullname | quote }}
- "--k8s_coordinator_service_name"
- {{ printf "%s-%s" "coordinator-service" $fullname | quote }}
{{- if .Values.vineyard.daemonset }}
- "--k8s_vineyard_daemonset"
- {{ .Values.vineyard.daemonset }}
{{- if .Values.vineyard.deployment }}
- "--k8s_vineyard_deployment"
- {{ .Values.vineyard.deployment }}
{{- end }}
- "--k8s_vineyard_image"
- {{ .Values.vineyard.image.name }}:{{ .Values.vineyard.image.tag }}
Expand Down
8 changes: 4 additions & 4 deletions charts/graphscope/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,12 @@ engines:
memory: 2Gi

vineyard:
# When `vineyard.daemonset` is set to the Helm release name, the coordinator will
# tries to discover the vineyard DaemonSet in current namespace, then use it if
# When `vineyard.deployment` is set to the Helm release name,
# the coordinator will try to discover the vineyard deployment in the current namespace, then use it if
# found, and fall back to the bundled vineyard container otherwise.
#
# The vineyard IPC socket is placed on host at /var/run/vineyard-{namespace}-{release}.
daemonset: ""
# The vineyard IPC socket is placed on host at /var/run/vineyard-kubernetes/{namespace}/{deployment}.
deployment: ""
image:
name: vineyardcloudnative/vineyardd
# Overrides the image tag whose default is the chart appVersion.
Expand Down
25 changes: 18 additions & 7 deletions coordinator/gscoordinator/cluster_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def __init__(
preemptive,
service_type,
vineyard_cpu,
vineyard_daemonset,
vineyard_deployment,
vineyard_image,
vineyard_mem,
vineyard_shared_mem,
Expand Down Expand Up @@ -126,7 +126,7 @@ def __init__(
self._image_pull_policy = image_pull_policy
self._image_pull_secrets = image_pull_secrets

self._vineyard_daemonset = vineyard_daemonset
self._vineyard_deployment = vineyard_deployment

self._with_analytical = with_analytical
self._with_analytical_java = with_analytical_java
Expand Down Expand Up @@ -182,6 +182,9 @@ def __init__(
def vineyard_ipc_socket(self):
    """Return the vineyard IPC socket value stored on this instance (`_sock`)."""
    ipc_socket = self._sock
    return ipc_socket

def vineyard_deployment_exists(self):
    """Tell whether a vineyard deployment name has been configured.

    Returns:
        True when `_vineyard_deployment` is anything other than None
        (an empty string therefore still counts as configured).
    """
    deployment_name = self._vineyard_deployment
    return deployment_name is not None

def base64_decode(self, string):
    """Decode a base64-encoded value into UTF-8 text.

    Bytes that are not valid UTF-8 are silently dropped from the result
    (`errors="ignore"`).
    """
    raw_bytes = base64.b64decode(string)
    return raw_bytes.decode("utf-8", errors="ignore")

Expand Down Expand Up @@ -216,11 +219,11 @@ def get_base_machine_env(self):
def get_vineyard_socket_volume(self):
name = "vineyard-ipc-socket"
volume = kube_client.V1Volume(name=name)
if self._vineyard_daemonset is None:
if self._vineyard_deployment is None:
empty_dir = kube_client.V1EmptyDirVolumeSource()
volume.empty_dir = empty_dir
else:
path = f"/var/run/vineyard-{self._namespace}-{self._vineyard_daemonset}"
path = f"/var/run/vineyard-kubernetes/{self._namespace}/{self._vineyard_deployment}"
host_path = kube_client.V1HostPathVolumeSource(path=path)
host_path.type = "Directory"
volume.host_path = host_path
Expand Down Expand Up @@ -396,6 +399,7 @@ def get_engine_pod_spec(self):
socket_volume = self.get_vineyard_socket_volume()
shm_volume = self.get_shm_volume()
volumes.extend([socket_volume[0], shm_volume[0]])

engine_volume_mounts = [socket_volume[2], shm_volume[2]]

if self._volumes and self._volumes is not None:
Expand Down Expand Up @@ -424,12 +428,13 @@ def get_engine_pod_spec(self):
self.get_learning_container(volume_mounts=engine_volume_mounts)
)

if self._vineyard_daemonset is None:
if self._vineyard_deployment is None:
containers.append(
self.get_vineyard_container(
volume_mounts=[socket_volume[1], shm_volume[1]]
)
)

if self._with_dataset:
dataset_volume = self.get_dataset_volume()
volumes.append(dataset_volume[0])
Expand Down Expand Up @@ -529,8 +534,11 @@ def vineyard_service_name(self):

def get_vineyard_service_endpoint(self, api_client):
# return f"{self.vineyard_service_name}:{self._vineyard_service_port}"
service_type = self._service_type
service_name = self.vineyard_service_name
service_type = self._service_type
if self.vineyard_deployment_exists():
service_name = self._vineyard_deployment + "-rpc"
service_type = "ClusterIP"
endpoints = get_service_endpoints(
api_client=api_client,
namespace=self._namespace,
Expand Down Expand Up @@ -585,8 +593,11 @@ def get_interactive_frontend_container(self):

def get_interactive_frontend_deployment(self, replicas=1):
name = self.frontend_deployment_name

container = self.get_interactive_frontend_container()
pod_spec = ResourceBuilder.get_pod_spec(containers=[container])
pod_spec = ResourceBuilder.get_pod_spec(
containers=[container],
)
template_spec = ResourceBuilder.get_pod_template_spec(
pod_spec, self._frontend_labels
)
Expand Down
14 changes: 7 additions & 7 deletions coordinator/gscoordinator/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,12 @@ def parse_sys_args():
default="NodePort",
help="Service type, choose from 'NodePort' or 'LoadBalancer'.",
)
parser.add_argument(
"--k8s_vineyard_deployment",
type=str,
default=None,
help="The name of vineyard deployment, it should exist as expected.",
)
parser.add_argument(
"--k8s_coordinator_name",
type=str,
Expand All @@ -720,12 +726,6 @@ def parse_sys_args():
default="",
help="A list of comma separated secrets to pull image.",
)
parser.add_argument(
"--k8s_vineyard_daemonset",
type=str,
default=None,
help="Use the existing vineyard DaemonSet with name 'k8s_vineyard_daemonset'.",
)
parser.add_argument(
"--k8s_vineyard_cpu",
type=float,
Expand Down Expand Up @@ -925,7 +925,7 @@ def get_launcher(args):
service_type=args.k8s_service_type,
timeout_seconds=args.timeout_seconds,
vineyard_cpu=args.k8s_vineyard_cpu,
vineyard_daemonset=args.k8s_vineyard_daemonset,
vineyard_deployment=args.k8s_vineyard_deployment,
vineyard_image=args.k8s_vineyard_image,
vineyard_mem=args.k8s_vineyard_mem,
vineyard_shared_mem=args.vineyard_shared_mem,
Expand Down
73 changes: 62 additions & 11 deletions coordinator/gscoordinator/kubernetes_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@
logger = logging.getLogger("graphscope")


class FakeKubeResponse:
    """Minimal response-like wrapper that exposes an object as JSON text.

    The wrapped object is serialized once at construction time and stored
    in the `data` attribute.
    """

    def __init__(self, obj):
        serialized = json.dumps(obj)
        self.data = serialized


class KubernetesClusterLauncher(AbstractLauncher):
def __init__(
self,
Expand Down Expand Up @@ -95,7 +100,7 @@ def __init__(
service_type=None,
timeout_seconds=None,
vineyard_cpu=None,
vineyard_daemonset=None,
vineyard_deployment=None,
vineyard_image=None,
vineyard_mem=None,
vineyard_shared_mem=None,
Expand Down Expand Up @@ -131,15 +136,18 @@ def __init__(

self._num_workers = num_workers

self._vineyard_daemonset = vineyard_daemonset
if vineyard_daemonset is not None:
self._vineyard_deployment = vineyard_deployment

if self.vineyard_deployment_exists():
try:
self._apps_api.read_namespaced_daemon_set(
vineyard_daemonset, self._namespace
self._apps_api.read_namespaced_deployment(
vineyard_deployment, self._namespace
)
except K8SApiException:
logger.error(f"Vineyard daemonset {vineyard_daemonset} not found")
self._vineyard_daemonset = None
logger.exception(
f"Vineyard deployment {self._namespace}/{vineyard_deployment} not found"
)
self._vineyard_deployment = None

self._engine_cpu = engine_cpu
self._engine_mem = engine_mem
Expand Down Expand Up @@ -230,7 +238,7 @@ def __init__(
preemptive=preemptive,
service_type=service_type,
vineyard_cpu=vineyard_cpu,
vineyard_daemonset=vineyard_daemonset,
vineyard_deployment=vineyard_deployment,
vineyard_image=vineyard_image,
vineyard_mem=vineyard_mem,
vineyard_shared_mem=vineyard_shared_mem,
Expand All @@ -244,7 +252,7 @@ def __init__(
)

self._vineyard_service_endpoint = None
self.vineyard_internal_service_endpoint = None
self._vineyard_internal_service_endpoint = None
self._mars_service_endpoint = None
if self._with_mars:
self._mars_cluster = MarsCluster(
Expand All @@ -257,6 +265,9 @@ def __del__(self):
def type(self):
    """Return the launcher type constant for this launcher (`types_pb2.K8S`)."""
    launcher_type = types_pb2.K8S
    return launcher_type

def vineyard_deployment_exists(self):
    """Tell whether this launcher currently has a vineyard deployment set.

    Returns:
        True when `_vineyard_deployment` is not None, False otherwise.
    """
    deployment_name = self._vineyard_deployment
    return deployment_name is not None

def get_coordinator_owner_references(self):
owner_references = []
if self._coordinator_name:
Expand Down Expand Up @@ -429,7 +440,14 @@ def _create_engine_stateful_set(self):
response = self._core_api.create_namespaced_service(self._namespace, service)
self._resource_object.append(response)
logger.info("Creating engine pods...")

stateful_set = self._engine_cluster.get_engine_stateful_set()
if self.vineyard_deployment_exists():
# schedule engine statefulset to the same node with vineyard deployment
stateful_set = self._add_pod_affinity_for_vineyard_deployment(
workload=stateful_set
)

stateful_set.metadata.owner_references = self._owner_references
response = self._apps_api.create_namespaced_stateful_set(
self._namespace, stateful_set
Expand Down Expand Up @@ -486,7 +504,7 @@ def _create_services(self):
if self._with_mars:
# scheduler used by Mars
self._create_mars_scheduler()
if self._vineyard_daemonset is None:
if self._vineyard_deployment is None:
self._create_vineyard_service()

def _waiting_for_services_ready(self):
Expand Down Expand Up @@ -560,7 +578,7 @@ def _waiting_for_services_ready(self):
self._vineyard_service_endpoint = (
self._engine_cluster.get_vineyard_service_endpoint(self._api_client)
)
self.vineyard_internal_endpoint = (
self._vineyard_internal_endpoint = (
f"{self._pod_ip_list[0]}:{self._engine_cluster._vineyard_service_port}"
)

Expand All @@ -575,6 +593,39 @@ def _waiting_for_services_ready(self):
)
logger.info("Mars service endpoint: %s", self._mars_service_endpoint)

# Add a podAffinity to the engine workload so that the workload is scheduled
# onto the same node as the vineyard deployment.
# e.g. if the vineyard deployment is named "vineyard-deployment" and the namespace is "graphscope-system",
# the following podAffinity is added to the engine workload:
# spec:
#   affinity:
#     podAffinity:
#       requiredDuringSchedulingIgnoredDuringExecution:
#       - labelSelector:
#           matchExpressions:
#           - key: app.kubernetes.io/instance
#             operator: In
#             values:
#             - graphscope-system-vineyard-deployment # [vineyard deployment namespace]-[vineyard deployment name]
#         topologyKey: kubernetes.io/hostname
def _add_pod_affinity_for_vineyard_deployment(self, workload):
    """Return a new workload object produced by vineyardctl's scheduler.

    The workload is serialized to JSON, handed to
    `vineyard.deploy.vineyardctl.schedule.workload` together with the
    vineyard deployment name and this launcher's namespace, and the JSON
    it returns is deserialized back into an object of the same type as
    `workload`.

    Args:
        workload: A Kubernetes client model object (e.g. the engine
            stateful set) accepted by the API client's serializer.

    Returns:
        A freshly deserialized object of `type(workload)`.
    """
    # Imported lazily, only when this method is actually called.
    import vineyard

    serializable = self._api_client.sanitize_for_serialization(workload)
    scheduled_json = vineyard.deploy.vineyardctl.schedule.workload(
        resource=json.dumps(serializable),
        vineyardd_name=self._vineyard_deployment,
        vineyardd_namespace=self._namespace,
        capture=True,
    )

    # Round-trip through json so the payload is re-dumped in normalized form,
    # then wrap it in a response-like object carrying the JSON text in `.data`.
    # NOTE(review): presumably `deserialize` reads `.data` from its first argument - confirm.
    response = FakeKubeResponse(json.loads(scheduled_json))
    return self._api_client.deserialize(response, type(workload))

def _dump_resource_object(self):
resource = {}
if self._delete_namespace:
Expand Down
11 changes: 9 additions & 2 deletions coordinator/gscoordinator/op_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,10 @@ def load_subgraph(
vineyard_rpc_endpoint = engine_config["vineyard_rpc_endpoint"]
else:
vineyard_rpc_endpoint = self._launcher.vineyard_internal_endpoint
if self._launcher.vineyard_deployment_exists():
vineyard_rpc_endpoint = self._launcher._vineyard_service_endpoint
else:
vineyard_rpc_endpoint = self._launcher._vineyard_internal_endpoint
total_builder_chunks = executor_workers_num * threads_per_executor

(
Expand Down Expand Up @@ -749,7 +753,7 @@ def _process_data_sink(self, op: op_def_pb2.OpDef):
if self._launcher.type() == types_pb2.HOSTS:
vineyard_endpoint = engine_config["vineyard_rpc_endpoint"]
else:
vineyard_endpoint = self._launcher.vineyard_internal_endpoint
vineyard_endpoint = self._launcher._vineyard_internal_endpoint
vineyard_ipc_socket = engine_config["vineyard_socket"]
deployment, hosts = self._launcher.get_vineyard_stream_info()
dfstream = vineyard.io.open(
Expand Down Expand Up @@ -848,7 +852,10 @@ def _process_loader_func(loader, vineyard_endpoint, vineyard_ipc_socket):
if self._launcher.type() == types_pb2.HOSTS:
vineyard_endpoint = engine_config["vineyard_rpc_endpoint"]
else:
vineyard_endpoint = self._launcher.vineyard_internal_endpoint
if self._launcher.vineyard_deployment_exists():
vineyard_endpoint = self._launcher._vineyard_service_endpoint
else:
vineyard_endpoint = self._launcher._vineyard_internal_endpoint
vineyard_ipc_socket = engine_config["vineyard_socket"]

for loader in op.large_attr.chunk_meta_list.items:
Expand Down
Loading