diff --git a/.github/workflows/gss.yml b/.github/workflows/gss.yml
index db1928037bd6..98c3044152a9 100644
--- a/.github/workflows/gss.yml
+++ b/.github/workflows/gss.yml
@@ -151,19 +151,19 @@ jobs:
     - name: Prepare the log directory
       run: |
        # create the helm installation log directory
-        mkdir -p ${{ github.workspace }}/helm-installation-logs
+        mkdir -p ${{ github.workspace }}/k8s-ci-helm-installation-logs
        # create the demo fresh of helm installation log directory
-        mkdir -p ${{ github.workspace }}/demo-fresh-of-helm-installation-logs
+        mkdir -p ${{ github.workspace }}/k8s-ci-demo-fresh-of-helm-installation-logs
        # create the demo script of helm installation with pv log directory
-        mkdir -p ${{ github.workspace }}/demo-script-of-helm-installation-with-pv-logs
+        mkdir -p ${{ github.workspace }}/k8s-ci-demo-script-of-helm-installation-with-pv-logs
        # create the helm test of helm installation with pv log directory
-        mkdir -p ${{ github.workspace }}/helm-test-of-helm-installation-with-pv-logs
+        mkdir -p ${{ github.workspace }}/k8s-ci-helm-test-of-helm-installation-with-pv-logs
        # create the demo after restart of helm installation with pv log directory
-        mkdir -p ${{ github.workspace }}/demo-after-restart-of-helm-installation-with-pv-logs
+        mkdir -p ${{ github.workspace }}/k8s-ci-demo-after-restart-of-helm-installation-with-pv-logs

     - name: Setup SSH
       run: |
@@ -202,13 +202,6 @@ jobs:
         helm install ci --set image.tag=${SHORT_SHA} ./graphscope-store
         helm test ci --timeout 5m0s

-    - name: upload the k8s logs to artifact
-      if: ${{ always() }}
-      uses: actions/upload-artifact@v2
-      with:
-        name: helm-installation-logs
-        path: ${{ github.workspace }}/helm-installation-logs
-
     - name: Stop to export kubernetes logs
       uses: dashanji/kubernetes-log-export-action@v4
       env:
@@ -254,13 +247,6 @@ jobs:
         python3 setup.py build_proto
         python3 -m pytest -s -vvv graphscope/tests/kubernetes/test_store_service.py -k test_demo_fresh

-    - name: upload the k8s logs to artifact
-      if: ${{ always() }}
-      uses: actions/upload-artifact@v2
-      with:
-        name: demo-fresh-of-helm-installation-logs
-        path: ${{ github.workspace }}/demo-fresh-of-helm-installation-logs
-
     - name: Stop to export kubernetes logs
       uses: dashanji/kubernetes-log-export-action@v4
       env:
@@ -282,13 +268,6 @@ jobs:
         cd ${GITHUB_WORKSPACE}/charts
         helm install ci --set image.tag=${SHORT_SHA} ./graphscope-store

-    - name: upload the k8s logs to artifact
-      if: ${{ always() }}
-      uses: actions/upload-artifact@v2
-      with:
-        name: demo-script-of-helm-installation-with-pv-logs
-        path: ${{ github.workspace }}/demo-script-of-helm-installation-with-pv-logs
-
     - name: Stop to export kubernetes logs
       uses: dashanji/kubernetes-log-export-action@v4
       env:
@@ -310,13 +289,6 @@ jobs:
         export NODE_IP=$(kubectl get nodes --namespace default -o jsonpath="{.items[0].status.addresses[0].address}")
         helm test ci --timeout 10m0s

-    - name: upload the k8s logs to artifact
-      if: ${{ always() }}
-      uses: actions/upload-artifact@v2
-      with:
-        name: helm-test-of-helm-installation-with-pv-logs
-        path: ${{ github.workspace }}/helm-test-of-helm-installation-with-pv-logs
-
     - name: Stop to export kubernetes logs
       uses: dashanji/kubernetes-log-export-action@v4
       env:
@@ -340,10 +312,10 @@ jobs:

     - name: upload the k8s logs to artifact
       if: ${{ always() }}
-      uses: actions/upload-artifact@v2
+      uses: actions/upload-artifact@v3
       with:
-        name: demo-after-restart-of-helm-installation-with-pv-logs
-        path: ${{ github.workspace }}/demo-after-restart-of-helm-installation-with-pv-logs
+        name: k8s-test-logs
+        path: ${{ github.workspace }}/k8s-ci-*-logs

     - name: Stop to export kubernetes logs
       uses: dashanji/kubernetes-log-export-action@v4
diff --git a/coordinator/gscoordinator/cluster_builder.py b/coordinator/gscoordinator/cluster_builder.py
index cdbb31ffb38f..968a077cd234 100644
--- a/coordinator/gscoordinator/cluster_builder.py
+++ b/coordinator/gscoordinator/cluster_builder.py
@@ -54,6 +54,7 @@ def __init__(
         engine_cpu,
         engine_mem,
         engine_pod_node_selector,
+        engine_pod_prefix,
         glog_level,
         image_pull_policy,
         image_pull_secrets,
@@ -80,7 +81,7 @@ def __init__(
         with_mars,
         dataset_proxy,
     ):
-        self._gs_prefix = "gs-engine-"
+        self._gs_prefix = engine_pod_prefix
         self._analytical_prefix = "gs-analytical-"
         self._interactive_frontend_prefix = "gs-interactive-frontend-"

@@ -101,7 +102,9 @@ def __init__(
             "app.kubernetes.io/instance": self._instance_id,
             "app.kubernetes.io/version": __version__,
             "app.kubernetes.io/component": "engine",
+            "app.kubernetes.io/engine_selector": self.engine_stateful_set_name,
         }
+
         self._frontend_labels = self._engine_labels.copy()
         self._frontend_labels["app.kubernetes.io/component"] = "frontend"

@@ -216,25 +219,6 @@ def get_base_machine_env(self):
         ]
         return env

-    def get_vineyard_socket_volume(self):
-        name = "vineyard-ipc-socket"
-        volume = kube_client.V1Volume(name=name)
-        if self._vineyard_deployment is None:
-            empty_dir = kube_client.V1EmptyDirVolumeSource()
-            volume.empty_dir = empty_dir
-        else:
-            path = f"/var/run/vineyard-kubernetes/{self._namespace}/{self._vineyard_deployment}"
-            host_path = kube_client.V1HostPathVolumeSource(path=path)
-            host_path.type = "Directory"
-            volume.host_path = host_path
-
-        source_volume_mount = kube_client.V1VolumeMount(
-            name=name, mount_path="/tmp/vineyard_workspace"
-        )
-        destination_volume_mount = source_volume_mount
-
-        return volume, source_volume_mount, destination_volume_mount
-
     def get_shm_volume(self):
         name = "host-shm"
         volume = kube_client.V1Volume(name=name)
@@ -358,6 +342,20 @@ def get_dataset_container(self, volume_mounts):
         container.security_context = kube_client.V1SecurityContext(privileged=True)
         return container

+    def get_vineyard_socket_volume_from_vineyard_deployment(self):
+        name = "vineyard-ipc-socket"
+
+        # Notice: the path must be the same as the one in vineyardd_types.go
+        # https://github.com/v6d-io/v6d/blob/main/k8s/apis/k8s/v1alpha1/vineyardd_types.go#L125
+        path = f"/var/run/vineyard-kubernetes/{self._namespace}/{self._vineyard_deployment}"
+        host_path = kube_client.V1HostPathVolumeSource(path=path)
+        host_path.type = "Directory"
+        volume = kube_client.V1Volume(name=name, host_path=host_path)
+        volume_mount = kube_client.V1VolumeMount(
+            name=name, mount_path="/tmp/vineyard_workspace"
+        )
+        return volume, volume_mount
+
     def get_engine_pod_spec(self):
         containers = []
         volumes = []
@@ -366,6 +364,14 @@ def get_engine_pod_spec(self):
         volumes = [shm_volume[0]]
         engine_volume_mounts = [shm_volume[2]]

+        if self.vineyard_deployment_exists():
+            (
+                volume,
+                volume_mount,
+            ) = self.get_vineyard_socket_volume_from_vineyard_deployment()
+            volumes.append(volume)
+            engine_volume_mounts.append(volume_mount)
+
         if self._volumes and self._volumes is not None:
             udf_volumes = ResourceBuilder.get_user_defined_volumes(self._volumes)
             volumes.extend(udf_volumes[0])
diff --git a/coordinator/gscoordinator/coordinator.py b/coordinator/gscoordinator/coordinator.py
index 7738a98941bd..40f1ad629abf 100644
--- a/coordinator/gscoordinator/coordinator.py
+++ b/coordinator/gscoordinator/coordinator.py
@@ -865,6 +865,12 @@ def parse_sys_args():
         default=False,
         help="Mount the aliyun dataset bucket as a volume by ossfs.",
     )
+    parser.add_argument(
+        "--k8s_deploy_mode",
+        type=str,
+        default="eager",
+        help="The deploy mode of graphscope engines, eager or lazy.",
+    )
     parser.add_argument(
         "--monitor",
         type=str2bool,
@@ -933,6 +939,7 @@ def get_launcher(args):
             with_mars=args.k8s_with_mars,
             enabled_engines=args.k8s_enabled_engines,
             dataset_proxy=args.dataset_proxy,
+            deploy_mode=args.k8s_deploy_mode,
         )
     elif args.cluster_type == "hosts":
         launcher = LocalLauncher(
diff --git a/coordinator/gscoordinator/kubernetes_launcher.py b/coordinator/gscoordinator/kubernetes_launcher.py
index ea97cdc18c12..a9ea1adaa04e 100644
--- a/coordinator/gscoordinator/kubernetes_launcher.py
+++ b/coordinator/gscoordinator/kubernetes_launcher.py
@@ -109,6 +109,7 @@ def __init__(
         with_mars=False,
         enabled_engines="",
         dataset_proxy=None,
+        deploy_mode="eager",
         **kwargs,
     ):
         super().__init__()
@@ -128,33 +129,32 @@ def __init__(
         self._image_registry = image_registry
         self._image_repository = image_repository
         self._image_tag = image_tag
+        self._image_pull_policy = image_pull_policy

-        image_pull_secrets = image_pull_secrets.split(",") if image_pull_secrets else []
+        self._image_pull_secrets = (
+            image_pull_secrets.split(",") if image_pull_secrets else []
+        )

         self._glog_level = parse_as_glog_level(log_level)

+        self._engine_pod_prefix = "gs-engine-"
+
         self._num_workers = num_workers

-        self._vineyard_deployment = vineyard_deployment
+        self._vineyard_image = vineyard_image
+        self._vineyard_mem = vineyard_mem
+        self._vineyard_cpu = vineyard_cpu
+        self._vineyard_size = vineyard_shared_mem

-        if self.vineyard_deployment_exists():
-            try:
-                self._apps_api.read_namespaced_deployment(
-                    vineyard_deployment, self._namespace
-                )
-            except K8SApiException:
-                logger.exception(
-                    f"Vineyard deployment {self._namespace}/{vineyard_deployment} not found"
-                )
-                self._vineyard_deployment = None
+        self._vineyard_deployment = vineyard_deployment

         self._engine_cpu = engine_cpu
         self._engine_mem = engine_mem
+        self._engine_pod_node_selector = engine_pod_node_selector
         self._vineyard_shared_mem = vineyard_shared_mem

-        self._vineyard_cpu = vineyard_cpu
-        self._vineyard_mem = vineyard_mem
-        self._vineyard_image = vineyard_image
+        self._volumes = volumes
+        self._dataset_proxy = dataset_proxy

         self._with_dataset = with_dataset
         self._preemptive = preemptive
@@ -177,7 +177,7 @@ def __init__(
         )

         for item in engines:
-            if item not in valid_engines:
+            if item not in valid_engines and item != "":
                 raise ValueError(
                     f"Not a valid engine name: {item}, valid engines are {valid_engines}"
                 )
@@ -196,9 +196,64 @@ def __init__(
         self._mars_worker_cpu = mars_worker_cpu
         self._mars_worker_mem = mars_worker_mem

+        # check the validity of the deploy mode
+        self._deploy_mode = deploy_mode
+        if self._deploy_mode not in ["eager", "lazy"]:
+            logger.error(
+                "Invalid mode %s, choose from 'eager' or 'lazy'. Proceeding with default mode: 'eager'",
+                self._deploy_mode,
+            )
+            self._deploy_mode = "eager"
+
+        self._vineyard_pod_name_list = []
+
+        if self._vineyard_deployment is not None:
+            self._deploy_vineyard_deployment_if_not_exist()
+            # check again whether the vineyard deployment is ready
+            if not self._check_if_vineyard_deployment_exist():
+                # if not ready, then set the vineyard deployment to None
+                logger.error(
+                    "Vineyard deployment %s is not ready, please check the deployment status. "
+ "Proceeding with none vineyard deployment mode.", + self._vineyard_deployment, + ) + self._vineyard_deployment = None + + # if the vineyard deployment is not set and use the eager mode, + # which means deploy the engine as a single pod and there is no + # external vineyard deployment. The vineyard objects are not + # shared between the engine pods, so report an error here and set + # the mode to eager. + if self._deploy_mode == "lazy" and self._vineyard_deployment is None: + logger.error( + "Lazy mode is only possible with a vineyard deployment, " + "please add a vineyard deployment name by k8s_vineyard_deployment='vineyardd-sample'. " + "Proceeding with default mode: 'eager'" + ) + self._deploy_mode = "eager" + self._pod_name_list = [] self._pod_ip_list = [] - self._pod_host_ip_list = None + self._pod_host_ip_list = [] + + # analytical engine + self._analytical_pod_name = [] + self._analytical_pod_ip = [] + self._analytical_pod_host_ip = [] + # analytical java engine + self._analytical_java_pod_name = [] + self._analytical_java_pod_ip = [] + self._analytical_java_pod_host_ip = [] + # interactive engine + self._interactive_resource_object = {} + self._interactive_pod_name = {} + self._interactive_pod_ip = {} + self._interactive_pod_host_ip = {} + # learning engine + self._learning_resource_object = {} + self._learning_pod_name = {} + self._learning_pod_ip = {} + self._learning_pod_host_ip = {} self._analytical_engine_endpoint = None self._mars_service_endpoint = None @@ -223,35 +278,11 @@ def __init__( os.makedirs(self._instance_workspace, exist_ok=True) self._session_workspace = None - self._engine_cluster = EngineCluster( - engine_cpu=engine_cpu, - engine_mem=engine_mem, - engine_pod_node_selector=engine_pod_node_selector, - glog_level=self._glog_level, - image_pull_policy=image_pull_policy, - image_pull_secrets=image_pull_secrets, - image_registry=image_registry, - image_repository=image_repository, - image_tag=image_tag, - instance_id=instance_id, - learning_start_port=self._learning_start_port, - with_dataset=with_dataset, - namespace=namespace, - num_workers=num_workers, - preemptive=preemptive, - service_type=service_type, - vineyard_cpu=vineyard_cpu, - vineyard_deployment=vineyard_deployment, - vineyard_image=vineyard_image, - vineyard_mem=vineyard_mem, - vineyard_shared_mem=vineyard_shared_mem, - volumes=volumes, - with_mars=with_mars, - with_analytical=self._with_analytical, - with_analytical_java=self._with_analytical_java, - with_interactive=self._with_interactive, - with_learning=self._with_learning, - dataset_proxy=dataset_proxy, + self._engine_cluster = self._build_engine_cluster( + with_analytical_container=self._with_analytical, + with_analytical_java_container=self._with_analytical_java, + with_interactive_container=self._with_interactive, + with_learning_container=self._with_learning, ) self._vineyard_service_endpoint = None @@ -271,6 +302,47 @@ def type(self): def vineyard_deployment_exists(self): return self._vineyard_deployment is not None + # the argument `with_analytical_` means whether to add the analytical engine + # container to the engine statefulset, and the other three arguments are similar. 
+    def _build_engine_cluster(
+        self,
+        with_analytical_container: bool,
+        with_analytical_java_container: bool,
+        with_interactive_container: bool,
+        with_learning_container: bool,
+    ):
+        return EngineCluster(
+            engine_cpu=self._engine_cpu,
+            engine_mem=self._engine_mem,
+            engine_pod_node_selector=self._engine_pod_node_selector,
+            engine_pod_prefix=self._engine_pod_prefix,
+            glog_level=self._glog_level,
+            image_pull_policy=self._image_pull_policy,
+            image_pull_secrets=self._image_pull_secrets,
+            image_registry=self._image_registry,
+            image_repository=self._image_repository,
+            image_tag=self._image_tag,
+            instance_id=self._instance_id,
+            learning_start_port=self._learning_start_port,
+            with_dataset=self._with_dataset,
+            namespace=self._namespace,
+            num_workers=self._num_workers,
+            preemptive=self._preemptive,
+            service_type=self._service_type,
+            vineyard_cpu=self._vineyard_cpu,
+            vineyard_deployment=self._vineyard_deployment,
+            vineyard_image=self._vineyard_image,
+            vineyard_mem=self._vineyard_mem,
+            vineyard_shared_mem=self._vineyard_shared_mem,
+            volumes=self._volumes,
+            with_mars=self._with_mars,
+            with_analytical=with_analytical_container,
+            with_analytical_java=with_analytical_java_container,
+            with_interactive=with_interactive_container,
+            with_learning=with_learning_container,
+            dataset_proxy=self._dataset_proxy,
+        )
+
     def get_coordinator_owner_references(self):
         owner_references = []
         if self._coordinator_name:
@@ -299,6 +371,10 @@ def get_namespace(self):

     def get_vineyard_stream_info(self):
         hosts = [f"{self._namespace}:{host}" for host in self._pod_name_list]
+        if self._vineyard_deployment is not None:
+            hosts = [
+                f"{self._namespace}:{host}" for host in self._vineyard_pod_name_list
+            ]
         return "kubernetes", hosts

     def set_session_workspace(self, session_id):
@@ -322,10 +398,11 @@ def hosts(self):

     @property
     def hosts_list(self):
-        return self._pod_name_list
+        return self._get_analytical_hosts()

     def distribute_file(self, path):
-        for pod in self._pod_name_list:
+        pod_name_list, _, _ = self._allocate_analytical_engine()
+        for pod in pod_name_list:
             container = self._engine_cluster.analytical_container_name
             try:
                 # The library may exists in the analytical pod.
@@ -356,13 +433,167 @@ def close_vineyard(self):
         # Use delete deployment instead
         pass

-    def create_interactive_instance(self, object_id: int, schema_path: str):
+    def check_if_engine_exist(self, engine_type, object_id=None):
+        """Checks if the engine with the given type exists.
+
+        Args:
+            engine_type: The type of engine to check for.
+            object_id: The object id of the engine to check for.
+
+        Returns:
+            True if the engine exists, False otherwise.
+        """
+
+        if object_id:
+            engine_pod_name_dict = getattr(self, f"_{engine_type}_pod_name")
+            engine_pod_name_list = engine_pod_name_dict.get(object_id, [])
+            engine_pod_ip_dict = getattr(self, f"_{engine_type}_pod_ip")
+            engine_pod_ip_list = engine_pod_ip_dict.get(object_id, [])
+            engine_pod_host_ip_dict = getattr(self, f"_{engine_type}_pod_host_ip")
+            engine_pod_host_ip_list = engine_pod_host_ip_dict.get(object_id, [])
+        else:
+            engine_pod_name_list = getattr(self, f"_{engine_type}_pod_name")
+            engine_pod_ip_list = getattr(self, f"_{engine_type}_pod_ip")
+            engine_pod_host_ip_list = getattr(self, f"_{engine_type}_pod_host_ip")
+
+        return engine_pod_name_list and engine_pod_ip_list and engine_pod_host_ip_list
+
+    def deploy_engine(self, engine_type, object_id=None):
+        """Deploys the engine with the given type.
+
+        Args:
+            engine_type: The type of engine to deploy.
+            object_id: The object ID to deploy the engine with.
+
+        Returns:
+            A tuple of the pod names, IP addresses, and host IP addresses of
+            the deployed engine.
+        """
+
+        if not self.check_if_engine_exist(engine_type, object_id):
+            self._engine_pod_prefix = f"gs-{engine_type}-" + (
+                f"{object_id}-" if object_id else ""
+            ).replace("_", "-")
+            self._engine_cluster = self._build_engine_cluster(
+                with_analytical_container=engine_type == "analytical",
+                with_analytical_java_container=engine_type == "analytical-java",
+                with_interactive_container=engine_type == "interactive",
+                with_learning_container=engine_type == "learning",
+            )
+            response = self._create_engine_stateful_set()
+            self._waiting_for_services_ready()
+
+            if object_id:
+                resource_object = getattr(self, f"_{engine_type}_resource_object")
+                pod_name = getattr(self, f"_{engine_type}_pod_name")
+                pod_ip = getattr(self, f"_{engine_type}_pod_ip")
+                pod_host_ip = getattr(self, f"_{engine_type}_pod_host_ip")
+                resource_object[object_id] = response
+                pod_name[object_id] = self._pod_name_list
+                pod_ip[object_id] = self._pod_ip_list
+                pod_host_ip[object_id] = self._pod_host_ip_list
+            else:
+                # Set the engine pod info
+                setattr(self, f"_{engine_type}_pod_name", self._pod_name_list)
+                setattr(self, f"_{engine_type}_pod_ip", self._pod_ip_list)
+                setattr(self, f"_{engine_type}_pod_host_ip", self._pod_host_ip_list)
+
+        return (
+            getattr(self, f"_{engine_type}_pod_name")
+            if object_id is None
+            else getattr(self, f"_{engine_type}_pod_name")[object_id],
+            getattr(self, f"_{engine_type}_pod_ip")
+            if object_id is None
+            else getattr(self, f"_{engine_type}_pod_ip")[object_id],
+            getattr(self, f"_{engine_type}_pod_host_ip")
+            if object_id is None
+            else getattr(self, f"_{engine_type}_pod_host_ip")[object_id],
+        )
+
+    def delete_engine_stateful_set_with_object_id(self, engine_type, object_id):
+        """Delete the engine stateful set with the given object id.
+
+        Args:
+            engine_type: The type of engine to delete.
+            object_id (int): The object id of the engine to delete.
+ """ + resource_object = getattr(self, f"_{engine_type}_resource_object") + obj = resource_object.get(object_id, {}) + if obj: + delete_kubernetes_object( + api_client=self._api_client, + target=obj, + wait=self._waiting_for_delete, + timeout_seconds=self._timeout_seconds, + ) + + pod_name = getattr(self, f"_{engine_type}_pod_name") + pod_ip = getattr(self, f"_{engine_type}_pod_ip") + pod_host_ip = getattr(self, f"_{engine_type}_pod_host_ip") + del resource_object[object_id] + del pod_name[object_id] + del pod_ip[object_id] + del pod_host_ip[object_id] + + def deploy_analytical_engine(self): + return self.deploy_engine("analytical") + + def deploy_analytical_java_engine(self): + return self.deploy_engine("analytical-java") + + def deploy_interactive_engine(self, object_id): + pod_name_list, pod_ip_list, pod_host_ip_list = self.deploy_engine( + "interactive", object_id + ) + try: + response = self._core_api.read_namespaced_pod( + pod_name_list[0], self._namespace + ) + except K8SApiException: + logger.exception( + "Get pod %s error, please check if the pod is ready", + pod_name_list[0], + ) + owner_references = [ + kube_client.V1OwnerReference( + api_version=response.metadata.owner_references[0].api_version, + kind=response.metadata.owner_references[0].kind, + name=response.metadata.owner_references[0].name, + uid=response.metadata.owner_references[0].uid, + ) + ] + name = f"gs-interactive-frontend-{object_id}-{self._instance_id}" + self._create_frontend_deployment(name, owner_references) + + return pod_name_list, pod_ip_list, pod_host_ip_list + + def deploy_learning_engine(self, object_id): + return self.deploy_engine("learning", object_id) + + def delete_interactive_engine(self, object_id): + self.delete_engine_stateful_set_with_object_id("interactive", object_id) + + def delete_learning_engine(self, object_id): + self.delete_engine_stateful_set_with_object_id("learning", object_id) + + def _allocate_interactive_engine(self, object_id): + # check the interactive engine flag if not self._with_interactive: raise NotImplementedError("Interactive engine not enabled") + + # allocate analytical engine based on the mode + if self._deploy_mode == "eager": + return self._pod_name_list, self._pod_ip_list, self._pod_host_ip_list + return self.deploy_interactive_engine(object_id) + + def _distribute_interactive_process( + self, hosts, object_id: int, schema_path: str, engine_selector: str + ): """ Args: + hosts (str): hosts of the graph. object_id (int): object id of the graph. schema_path (str): path of the schema file. + engine_selector(str): the label selector of the engine. 
""" env = os.environ.copy() env["GRAPHSCOPE_HOME"] = GRAPHSCOPE_HOME @@ -373,12 +604,13 @@ def create_interactive_instance(self, object_id: int, schema_path: str): self._session_workspace, str(object_id), schema_path, - self.hosts, + hosts, container, str(self._interactive_port), # executor port str(self._interactive_port + 1), # executor rpc port str(self._interactive_port + 2), # frontend port self._coordinator_name, + engine_selector, ] self._interactive_port += 3 logger.info("Create GIE instance with command: %s", " ".join(cmd)) @@ -397,7 +629,29 @@ def create_interactive_instance(self, object_id: int, schema_path: str): ) return process + def create_interactive_instance(self, object_id: int, schema_path: str): + pod_name_list, _, _ = self._allocate_interactive_engine(object_id) + if not pod_name_list: + raise RuntimeError("Failed to allocate interactive engine") + hosts = ",".join(pod_name_list) + + engine_selector = "gs-engine-" + self._instance_id + if self._deploy_mode == "lazy": + engine_selector = ( + "gs-interactive-" + str(object_id) + "-" + self._instance_id + ) + + return self._distribute_interactive_process( + hosts, object_id, schema_path, engine_selector + ) + def close_interactive_instance(self, object_id): + if self._deploy_mode == "lazy": + logger.info("Close interactive instance with object id: %d", object_id) + self.delete_interactive_engine(object_id) + return None + pod_name_list, _, _ = self._allocate_interactive_engine(object_id) + hosts = ",".join(pod_name_list) env = os.environ.copy() env["GRAPHSCOPE_HOME"] = GRAPHSCOPE_HOME container = self._engine_cluster.interactive_executor_container_name @@ -406,7 +660,7 @@ def close_interactive_instance(self, object_id): "close_gremlin_instance_on_k8s", self._session_workspace, str(object_id), - self.hosts, + hosts, container, self._instance_id, ] @@ -632,14 +886,9 @@ def _inject_vineyard_as_sidecar(self, workload): def _create_engine_stateful_set(self): logger.info("Creating engine pods...") - # we don't need to create the headless service for engines here, - # as the etcd service is already created by the vineyardctl - # service = self._engine_cluster.get_engine_headless_service() - # service.metadata.owner_references = self._owner_references - # response = self._core_api.create_namespaced_service(self._namespace, service) - # self._resource_object.append(response) + stateful_set = self._engine_cluster.get_engine_stateful_set() - if self.vineyard_deployment_exists(): + if self._vineyard_deployment is not None: # schedule engine statefulset to the same node with vineyard deployment stateful_set = self._add_pod_affinity_for_vineyard_deployment( workload=stateful_set @@ -651,11 +900,14 @@ def _create_engine_stateful_set(self): self._namespace, stateful_set ) self._resource_object.append(response) + return response - def _create_frontend_deployment(self): + def _create_frontend_deployment(self, name=None, owner_references=None): logger.info("Creating frontend pods...") deployment = self._engine_cluster.get_interactive_frontend_deployment() - deployment.metadata.owner_references = self._owner_references + if name is not None: + deployment.metadata.name = name + deployment.metadata.owner_references = owner_references response = self._apps_api.create_namespaced_deployment( self._namespace, deployment ) @@ -763,7 +1015,6 @@ def _waiting_for_services_ready(self): self._analytical_engine_endpoint = ( f"{self._pod_ip_list[0]}:{self._random_analytical_engine_rpc_port}" ) - self._vineyard_service_endpoint = ( 
             self._engine_cluster.get_vineyard_service_endpoint(self._api_client)
         )
@@ -825,28 +1076,50 @@ def _dump_resource_object(self):
             resource[self._coordinator_service_name] = "Service"
         self._resource_object.dump(extra_resource=resource)

-    def create_analytical_instance(self):
-        if not (self._with_analytical or self._with_analytical_java):
-            raise NotImplementedError("Analytical engine not enabled")
-        logger.info(
-            "Starting GAE rpc service on %s ...", self._analytical_engine_endpoint
-        )
+    def _get_analytical_hosts(self):
+        pod_name_list = self._pod_name_list
+        if self._analytical_pod_name:
+            pod_name_list = self._analytical_pod_name
+        return pod_name_list
+
+    def _allocate_analytical_engine(self):
+        # check the engine flag
+        if self._with_analytical and self._with_analytical_java:
+            logger.info(
+                "The analytical engine and the analytical engine (java) cannot be enabled at the same time. "
+                "Proceeding with only the analytical engine (java) enabled."
+            )
+            self._with_analytical = False
+        elif not (self._with_analytical or self._with_analytical_java):
+            raise NotImplementedError(
+                "Neither the analytical engine nor the analytical engine (java) is enabled."
+            )
+
+        # allocate the analytical engine based on the deploy mode
+        if self._deploy_mode == "eager":
+            return self._pod_name_list, self._pod_ip_list, self._pod_host_ip_list
+        else:
+            if self._with_analytical:
+                return self.deploy_analytical_engine()
+            else:
+                return self.deploy_analytical_java_engine()
+
+    def _distribute_analytical_process(self, pod_name_list, pod_ip_list):
         # generate and distribute hostfile
         hosts = os.path.join(get_tempdir(), "kube_hosts")
         with open(hosts, "w") as f:
-            for i, pod_ip in enumerate(self._pod_ip_list):
-                f.write(f"{pod_ip} {self._pod_name_list[i]}\n")
+            for i, pod_ip in enumerate(pod_ip_list):
+                f.write(f"{pod_ip} {pod_name_list[i]}\n")

         container = self._engine_cluster.analytical_container_name
-        for pod in self._pod_name_list:
+        for pod in pod_name_list:
             logger.debug(
                 run_kube_cp_command(hosts, "/tmp/hosts_of_nodes", pod, container, True)
             )

         # launch engine
         rmcp = ResolveMPICmdPrefix(rsh_agent=True)
-        cmd, mpi_env = rmcp.resolve(self._num_workers, ",".join(self._pod_name_list))
+        cmd, mpi_env = rmcp.resolve(self._num_workers, ",".join(pod_name_list))

         cmd.append(ANALYTICAL_ENGINE_PATH)
         cmd.extend(["--host", "0.0.0.0"])
@@ -882,6 +1155,16 @@
         setattr(self._analytical_engine_process, "stdout_watcher", stdout_watcher)
         setattr(self._analytical_engine_process, "stderr_watcher", stderr_watcher)

+    def create_analytical_instance(self):
+        pod_name_list, pod_ip_list, _ = self._allocate_analytical_engine()
+        if not pod_name_list or not pod_ip_list:
+            raise RuntimeError("Failed to allocate analytical engine.")
+        logger.info(
+            "Starting GAE rpc service on %s ...", self._analytical_engine_endpoint
+        )
+
+        self._distribute_analytical_process(pod_name_list, pod_ip_list)
+
     def _delete_dangling_coordinator(self):
         # delete service
         try:
@@ -934,14 +1217,76 @@ def _delete_dangling_coordinator(self):
             self._coordinator_name,
         )

+    def _get_owner_reference_as_json(self):
+        owner_reference = [
+            {
+                "apiVersion": self._owner_references[0].api_version,
+                "kind": self._owner_references[0].kind,
+                "name": self._owner_references[0].name,
+                "uid": self._owner_references[0].uid,
+            }
+        ]
+        owner_reference_json = json.dumps(owner_reference)
+        return owner_reference_json
+
+    def _check_if_vineyard_deployment_exist(self):
+        try:
+            self._apps_api.read_namespaced_deployment(
+                self._vineyard_deployment, self._namespace
+            )
+        except K8SApiException:
+            logger.info(
+                f"Vineyard deployment {self._namespace}/{self._vineyard_deployment} does not exist"
+            )
+            return False
+        return True
+
+    def _deploy_vineyard_deployment_if_not_exist(self):
+        if not self._check_if_vineyard_deployment_exist():
+            self._deploy_vineyard_deployment()
+        else:
+            logger.info(
+                "The external vineyard deployment %s is ready. "
+                "Please make sure the type of the vineyard rpc service is the same as %s.",
+                self._vineyard_deployment,
+                self._service_type,
+            )
+
+    def _deploy_vineyard_deployment(self):
+        import vineyard
+
+        owner_reference_json = self._get_owner_reference_as_json()
+        vineyard.deploy.vineyardctl.deploy.vineyard_deployment(
+            name=self._vineyard_deployment,
+            namespace=self._namespace,
+            vineyard_replicas=self._num_workers,
+            vineyard_etcd_replicas=self._num_workers,
+            vineyardd_image=self._vineyard_image,
+            vineyardd_memory=self._vineyard_mem,
+            vineyardd_cpu=self._vineyard_cpu,
+            vineyardd_size=self._vineyard_shared_mem,
+            vineyardd_service_type=self._service_type,
+            owner_references=owner_reference_json,
+        )
+        vineyard_pods = self._core_api.list_namespaced_pod(
+            self._namespace,
+            label_selector="app.kubernetes.io/instance="
+            + self._namespace
+            + "-"
+            + self._vineyard_deployment,
+        )
+        for pod in vineyard_pods.items:
+            self._vineyard_pod_name_list.append(pod.metadata.name)
+
     def start(self):
         if self._serving:
             return True
         try:
-            self._create_services()
-            self._waiting_for_services_ready()
-            self._dump_resource_object()
-            self._serving = True
+            if self._deploy_mode == "eager":
+                self._create_services()
+                self._waiting_for_services_ready()
+                self._dump_resource_object()
+            self._serving = True
         except Exception:  # pylint: disable=broad-except
             time.sleep(1)
             logger.exception("Error when launching GraphScope on kubernetes cluster")
@@ -991,9 +1336,19 @@ def stop(self, is_dangling=False):
             self._serving = False
             logger.info("Kubernetes launcher stopped")

-    def create_learning_instance(self, object_id, handle, config):
+    def _allocate_learning_engine(self, object_id):
+        # check the learning engine flag
         if not self._with_learning:
             raise NotImplementedError("Learning engine not enabled")
+
+        # allocate the learning engine based on the deploy mode
+        if self._deploy_mode == "eager":
+            return self._pod_name_list, self._pod_ip_list, self._pod_host_ip_list
+        return self.deploy_learning_engine(object_id)
+
+    def _distribute_learning_process(
+        self, pod_name_list, pod_host_ip_list, object_id, handle, config
+    ):
         # allocate service for ports
         # prepare arguments
         handle = json.loads(
@@ -1005,7 +1360,7 @@
             [
                 f"{pod_name}:{port}"
                 for pod_name, port in zip(
-                    self._pod_name_list,
+                    pod_name_list,
                     self._engine_cluster.get_learning_ports(self._learning_start_port),
                 )
             ]
@@ -1043,13 +1398,24 @@
         # Create Service
         self._create_learning_service(object_id)
         # update the port usage record
-        self._learning_start_port += len(self._pod_name_list)
+        self._learning_start_port += len(pod_name_list)
         # parse the service hosts and ports
         return self._engine_cluster.get_graphlearn_service_endpoint(
-            self._api_client, object_id, self._pod_host_ip_list
+            self._api_client, object_id, pod_host_ip_list
         )
+
+    def create_learning_instance(self, object_id, handle, config):
+        pod_name_list, _, pod_host_ip_list = self._allocate_learning_engine(object_id)
+        if not pod_name_list or not pod_host_ip_list:
+            raise RuntimeError("Failed to allocate learning engine")
+        return self._distribute_learning_process(
+            pod_name_list, pod_host_ip_list, object_id, handle, config
+        )

     def close_learning_instance(self, object_id):
+        if self._deploy_mode == "lazy":
+            self.delete_learning_engine(object_id)
+            return
         if object_id not in self._learning_instance_processes:
             return
         # delete the services
diff --git a/interactive_engine/assembly/src/bin/graphscope/giectl b/interactive_engine/assembly/src/bin/graphscope/giectl
index de793def52bf..5a7c397f0208 100755
--- a/interactive_engine/assembly/src/bin/graphscope/giectl
+++ b/interactive_engine/assembly/src/bin/graphscope/giectl
@@ -250,6 +250,7 @@ create_gremlin_instance_on_local() {
 # executor_rpc_port
 # frontend_port
 # coordinator_name: name of coordinator deployment object in k8s
+# engine_selector: the label value used to select the engine pods
 ##########################
 create_gremlin_instance_on_k8s() {
   declare -r GRAPHSCOPE_RUNTIME=$1
@@ -264,22 +265,28 @@ create_gremlin_instance_on_k8s() {
   declare -r executor_rpc_port=$7
   declare -r frontend_port=$8
   declare -r coordinator_name=$9  # deployment name of coordinator
+  declare -r engine_selector=${10}

   instance_id=${coordinator_name#*-}
-  pod_ips=$(kubectl get pod -lapp.kubernetes.io/component=engine,app.kubernetes.io/instance=${instance_id} -o jsonpath='{.items[*].status.podIP}')
+  pod_ips=$(kubectl get pod -lapp.kubernetes.io/component=engine,app.kubernetes.io/instance=${instance_id},app.kubernetes.io/engine_selector=${engine_selector} -o jsonpath='{.items[*].status.podIP}')
+
   pegasus_hosts=""
   for pod in ${pod_ips}; do
     pegasus_hosts="${pegasus_hosts},${pod}:${executor_rpc_port}"
   done
   pegasus_hosts=${pegasus_hosts:1}

-  frontend_name=$(kubectl get pod -lapp.kubernetes.io/component=frontend,app.kubernetes.io/instance=${instance_id} -o jsonpath='{.items[*].metadata.name}')
+  frontend_name=$(kubectl get pod -lapp.kubernetes.io/component=frontend,app.kubernetes.io/instance=${instance_id},app.kubernetes.io/engine_selector=${engine_selector} -o jsonpath='{.items[*].metadata.name}')
+
+  log "Waiting for the frontend pod to be ready."
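+  # A note on the wait below: `kubectl wait` blocks until the pod reaches the
+  # Ready condition or the 60s timeout elapses, and exits non-zero on timeout,
+  # which fails faster and more clearly than letting the following
+  # `kubectl cp` and `kubectl exec` hit a pod that is not ready yet.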
+  kubectl wait --for=condition=Ready pod/${frontend_name} --timeout=60s

   launch_frontend_cmd="GRAPHSCOPE_HOME=${GRAPHSCOPE_HOME} \
     ${GRAPHSCOPE_HOME}/bin/giectl start_frontend \
     ${GRAPHSCOPE_RUNTIME} ${object_id} ${schema_path} ${pegasus_hosts} ${frontend_port}"
   kubectl cp ${schema_path} ${frontend_name}:${schema_path}
+
   kubectl exec ${frontend_name} -- /bin/bash -c "${launch_frontend_cmd}"

   network_servers=""
@@ -302,7 +309,7 @@ create_gremlin_instance_on_k8s() {
   timeout_seconds=60
   # random from range [50001, 51000) for interactive engine
   frontend_external_port=$(( ((RANDOM<<15)|RANDOM) % 1000 + 50000 ))
-  frontend_deployment_name=$(kubectl get deployment -lapp.kubernetes.io/component=frontend,app.kubernetes.io/instance=${instance_id} -o jsonpath='{.items[*].metadata.name}')
+  frontend_deployment_name=$(kubectl get deployment -lapp.kubernetes.io/component=frontend,app.kubernetes.io/instance=${instance_id},app.kubernetes.io/engine_selector=${engine_selector} -o jsonpath='{.items[*].metadata.name}')
   if [ "${GREMLIN_EXPOSE}" = "LoadBalancer" ]; then
     kubectl expose deployment ${frontend_deployment_name} --name=gremlin-${object_id} --port=${frontend_external_port} \
       --target-port=${frontend_port} --type=LoadBalancer 1>/dev/null 2>&1
diff --git a/python/graphscope/client/session.py b/python/graphscope/client/session.py
index f5a98f24a281..e23d287fd985 100755
--- a/python/graphscope/client/session.py
+++ b/python/graphscope/client/session.py
@@ -315,6 +315,7 @@ def __init__(
             k8s_engine_pod_node_selector=gs_config.k8s_engine_pod_node_selector,
             k8s_volumes=gs_config.k8s_volumes,
             k8s_waiting_for_delete=gs_config.k8s_waiting_for_delete,
+            k8s_deploy_mode=gs_config.k8s_deploy_mode,
             timeout_seconds=gs_config.timeout_seconds,
             dangling_timeout_seconds=gs_config.dangling_timeout_seconds,
             enabled_engines=gs_config.enabled_engines,
@@ -483,6 +484,10 @@ def __init__(
                 Expect this value to be greater than 5 (heartbeat interval).
                 Disable dangling check by setting -1.

+            k8s_deploy_mode (str, optional): The deploy mode of engines on the kubernetes cluster. Defaults to "eager".
+                eager: create all engine pods at once.
+                lazy: create engine pods only when the corresponding engine is requested.
+
             k8s_waiting_for_delete (bool, optional): Waiting for service delete or not. Defaults to False.

             **kw (dict, optional): Other optional parameters will be put to :code:`**kw`.
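For reference, a lazy-mode session would be created roughly as in the sketch below; the deployment name and data path are placeholders, and lazy mode expects a vineyard deployment to be available (otherwise the launcher above falls back to eager mode):

    import graphscope
    from graphscope.dataset import load_ldbc

    sess = graphscope.session(
        cluster_type="k8s",
        k8s_vineyard_deployment="vineyardd-sample",  # placeholder deployment name
        k8s_deploy_mode="lazy",  # engine pods are created on first use
    )
    graph = load_ldbc(sess, "/path/to/ldbc")  # hypothetical data path
    interactive = sess.gremlin(graph)  # the interactive engine pods start here
    interactive.close()  # in lazy mode this also deletes the engine pods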
@@ -569,6 +574,7 @@ def __init__(
             "reconnect",
             "k8s_volumes",
             "k8s_waiting_for_delete",
+            "k8s_deploy_mode",
             "timeout_seconds",
             "dangling_timeout_seconds",
             "with_mars",
@@ -614,6 +620,7 @@ def __init__(
                 ","
             )
         )
+
         for item in engines:
             if item not in valid_engines:
                 raise ValueError(
@@ -641,17 +648,7 @@ def __init__(
         if kw:
             raise ValueError("Value not recognized: ", list(kw.keys()))

-        if self._config_params["addr"]:
-            logger.info(
-                "Connecting graphscope session with address: %s",
-                self._config_params["addr"],
-            )
-        else:
-            logger.info(
-                "Initializing graphscope session with parameters: %s",
-                self._config_params,
-            )
-
+        self._log_session_info()
         self._closed = False

         # coordinator service endpoint
@@ -713,6 +710,18 @@ def session_id(self) -> str:
     def dag(self):
         return self._dag

+    def _log_session_info(self):
+        if self._config_params["addr"]:
+            logger.info(
+                "Connecting graphscope session with address: %s",
+                self._config_params["addr"],
+            )
+        else:
+            logger.info(
+                "Initializing graphscope session with parameters: %s",
+                self._config_params,
+            )
+
     def _load_config(self, path, silent=True):
         config_path = os.path.expandvars(os.path.expanduser(path))
         try:
@@ -1347,6 +1356,7 @@ def set_option(**kwargs):
         - k8s_waiting_for_delete
         - timeout_seconds
         - dataset_download_retries
+        - k8s_deploy_mode

     Args:
         kwargs: dict
@@ -1413,6 +1423,7 @@ def get_option(key):
         - k8s_waiting_for_delete
         - timeout_seconds
         - dataset_download_retries
+        - k8s_deploy_mode

     Args:
         key: str
diff --git a/python/graphscope/config.py b/python/graphscope/config.py
index f6d046e2ba99..685362a5149a 100644
--- a/python/graphscope/config.py
+++ b/python/graphscope/config.py
@@ -114,6 +114,11 @@ class GSConfig(object):
     # support resource preemption or resource guarantee
     preemptive = True

+    # the deploy mode of engines on the kubernetes cluster; defaults to eager
+    # eager: create all engine pods at once
+    # lazy: create engine pods only when requested
+    k8s_deploy_mode = "eager"
+
     k8s_waiting_for_delete = False

     num_workers = 2
     show_log = False
diff --git a/python/graphscope/deploy/kubernetes/cluster.py b/python/graphscope/deploy/kubernetes/cluster.py
index fe5210a37af7..c99940d77da0 100644
--- a/python/graphscope/deploy/kubernetes/cluster.py
+++ b/python/graphscope/deploy/kubernetes/cluster.py
@@ -95,6 +95,7 @@ def __init__(
         timeout_seconds=600,
         dangling_timeout_seconds=None,
         k8s_waiting_for_delete=False,
+        k8s_deploy_mode=None,
         with_dataset=False,
         **kwargs,
     ):
@@ -434,6 +435,14 @@ def _get_coordinator_args(self):
                 f"{self.base64_encode(json.dumps(self._saved_locals['k8s_engine_pod_node_selector']))}",
             ]
         )
+
+        if self._saved_locals["k8s_deploy_mode"] is not None:
+            args.extend(
+                [
+                    "--k8s_deploy_mode",
+                    str(self._saved_locals["k8s_deploy_mode"]),
+                ]
+            )
         print(args)
         return args
diff --git a/python/graphscope/tests/kubernetes/test_demo_script.py b/python/graphscope/tests/kubernetes/test_demo_script.py
index 55d1f413a333..36c270553c79 100644
--- a/python/graphscope/tests/kubernetes/test_demo_script.py
+++ b/python/graphscope/tests/kubernetes/test_demo_script.py
@@ -21,6 +21,7 @@
 import random
 import string
 import tempfile
+import time

 import numpy as np
 import pytest
@@ -60,6 +61,56 @@ def get_gs_tag_on_ci_env():
         return gs_config.k8s_image_tag


+# get the number of running engine pods whose name contains the given string
+def get_engine_pod_num(name, namespace):
+    from kubernetes import client
+    from kubernetes import config
+    from kubernetes.client.rest import ApiException
+
+    config.load_kube_config()
+    v1 = client.CoreV1Api()
+    num = 0
+    try:
+        pod_lists = v1.list_namespaced_pod(namespace=namespace)
+        for i in pod_lists.items:
+            if name in i.metadata.name and i.status.phase == "Running":
+                num += 1
+    except ApiException as e:
+        if e.status == 404:
+            return num
+        else:
+            raise e
+    return num
+
+
+def wait_for_pod_deletion(name, namespace, timeout=120):
+    from kubernetes import client
+    from kubernetes import config
+    from kubernetes.client.rest import ApiException
+
+    config.load_kube_config()
+    v1 = client.CoreV1Api()
+
+    # Wait for the pod to be deleted
+    start_time = time.time()
+    pod = ""
+    pod_lists = v1.list_namespaced_pod(namespace=namespace)
+    for i in pod_lists.items:
+        if name in i.metadata.name:
+            pod = i.metadata.name
+    logger.info("Waiting for pod %s to be deleted", pod)
+    while time.time() - start_time < timeout:
+        try:
+            _ = v1.read_namespaced_pod(pod, namespace)
+        except ApiException:
+            # The pod has been deleted
+            logger.info("Pod %s has been deleted", pod)
+            return
+
+        time.sleep(1)
+    raise Exception("Pod not deleted after {} seconds".format(timeout))
+
+
 @pytest.fixture
 def gs_session():
     sess = graphscope.session(
@@ -98,6 +149,28 @@ def gs_session_distributed():
     sess.close()


+@pytest.fixture
+def gs_session_with_lazy_mode():
+    sess = graphscope.session(
+        num_workers=1,
+        k8s_image_registry=get_gs_registry_on_ci_env(),
+        k8s_image_tag=get_gs_tag_on_ci_env(),
+        k8s_coordinator_cpu=2,
+        k8s_coordinator_mem="4Gi",
+        k8s_vineyard_cpu=2,
+        k8s_vineyard_mem="512Mi",
+        k8s_engine_cpu=2,
+        k8s_engine_mem="4Gi",
+        vineyard_shared_mem="4Gi",
+        k8s_vineyard_deployment="vineyardd-sample",
+        k8s_namespace="graphscope-system",
+        k8s_volumes=get_k8s_volumes(),
+        k8s_deploy_mode="lazy",
+    )
+    yield sess
+    sess.close()
+
+
 @pytest.fixture
 def create_vineyard_deployment_on_single_node():
     import vineyard
@@ -308,6 +381,85 @@ def test_demo_distribute(gs_session_distributed, data_dir, modern_graph_data_dir):
     # GNN engine


+def test_demo_with_lazy_mode(
+    gs_session_with_lazy_mode, data_dir, modern_graph_data_dir
+):
+    graph = load_ldbc(gs_session_with_lazy_mode, data_dir)
+
+    interactive_name = "interactive-" + str(graph.vineyard_id)
+    namespace = gs_session_with_lazy_mode.info["namespace"]
+    assert get_engine_pod_num(interactive_name, namespace) == 0
+    # Interactive engine
+    interactive = gs_session_with_lazy_mode.gremlin(graph)
+    sub_graph = interactive.subgraph(  # noqa: F841
+        'g.V().hasLabel("person").outE("knows")'
+    )
+    person_count = interactive.execute(
+        'g.V().hasLabel("person").outE("knows").bothV().dedup().count()'
+    ).all()[0]
+    knows_count = interactive.execute(
+        'g.V().hasLabel("person").outE("knows").count()'
+    ).all()[0]
+    assert get_engine_pod_num(interactive_name, namespace) == 1
+
+    interactive.close()
+    # wait for the engine pod to be deleted
+    wait_for_pod_deletion(interactive_name, namespace)
+    assert get_engine_pod_num(interactive_name, namespace) == 0
+
+    interactive2_name = "interactive-" + str(sub_graph.vineyard_id)
+    assert get_engine_pod_num(interactive2_name, namespace) == 0
+
+    interactive2 = gs_session_with_lazy_mode.gremlin(sub_graph)
+    assert get_engine_pod_num(interactive2_name, namespace) == 1
+
+    sub_person_count = interactive2.execute("g.V().count()").all()[0]
+    sub_knows_count = interactive2.execute("g.E().count()").all()[0]
+    assert person_count == sub_person_count
+    assert knows_count == sub_knows_count
+
+    interactive2.close()
+    # wait for the engine pod to be deleted
+    wait_for_pod_deletion(interactive2_name, namespace)
+    assert get_engine_pod_num(interactive2_name, namespace) == 0
+
+    # Analytical engine
+    # project the subgraph to a simple graph
+    simple_g = sub_graph.project(vertices={"person": []}, edges={"knows": []})
+
+    pr_result = graphscope.pagerank(simple_g, delta=0.8)
+    tc_result = graphscope.triangles(simple_g)
+
+    # add the PageRank and triangle-counting results as new columns to the property graph
+    sub_graph.add_column(pr_result, {"Ranking": "r"})
+    sub_graph.add_column(tc_result, {"TC": "r"})
+
+    # test subgraph on the modern graph
+    mgraph = load_modern_graph(gs_session_with_lazy_mode, modern_graph_data_dir)
+
+    minteractive_name = "interactive-" + str(mgraph.vineyard_id)
+    assert get_engine_pod_num(minteractive_name, namespace) == 0
+    # Interactive engine
+    minteractive = gs_session_with_lazy_mode.gremlin(mgraph)
+    assert get_engine_pod_num(minteractive_name, namespace) == 1
+    msub_graph = minteractive.subgraph(  # noqa: F841
+        'g.V().hasLabel("person").outE("knows")'
+    )
+    person_count = minteractive.execute(
+        'g.V().hasLabel("person").outE("knows").bothV().dedup().count()'
+    ).all()[0]
+
+    minteractive.close()
+    # wait for the engine pod to be deleted
+    wait_for_pod_deletion(minteractive_name, namespace)
+    assert get_engine_pod_num(minteractive_name, namespace) == 0
+    msub_interactive = gs_session_with_lazy_mode.gremlin(msub_graph)
+    sub_person_count = msub_interactive.execute("g.V().count()").all()[0]
+    assert person_count == sub_person_count
+
+    # GNN engine
+
+
 def test_multiple_session():
     namespace = "gs-multi-" + "".join(
         [random.choice(string.ascii_lowercase) for _ in range(6)]
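Since k8s_deploy_mode is also registered with set_option/get_option in session.py above, the mode can be toggled globally before a session is created; a minimal sketch:

    import graphscope

    graphscope.set_option(k8s_deploy_mode="lazy")
    assert graphscope.get_option("k8s_deploy_mode") == "lazy"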