From 84e0afe9c3ce77b621958479d635e312cd2b984b Mon Sep 17 00:00:00 2001
From: Ye Cao
Date: Wed, 24 May 2023 11:28:15 +0800
Subject: [PATCH] * Add a new pytest on kubernetes to test the lazy mode. * Add a new label for engine pods to indicate the specific engine.

Signed-off-by: Ye Cao
---
 coordinator/gscoordinator/cluster_builder.py  | 18 +++++
 .../gscoordinator/kubernetes_launcher.py      | 21 ++++--
 .../assembly/src/bin/graphscope/giectl        | 13 ++--
 .../tests/kubernetes/test_demo_script.py      | 71 +++++++++++++++++++
 4 files changed, 107 insertions(+), 16 deletions(-)

diff --git a/coordinator/gscoordinator/cluster_builder.py b/coordinator/gscoordinator/cluster_builder.py
index 0bec1b77f3d9..00d5021d90d5 100644
--- a/coordinator/gscoordinator/cluster_builder.py
+++ b/coordinator/gscoordinator/cluster_builder.py
@@ -102,7 +102,11 @@ def __init__(
             "app.kubernetes.io/instance": self._instance_id,
             "app.kubernetes.io/version": __version__,
             "app.kubernetes.io/component": "engine",
+            "app.kubernetes.io/engine": self.get_engine_type_from_prefix_name(
+                engine_pod_prefix
+            ),
         }
+
         self._frontend_labels = self._engine_labels.copy()
         self._frontend_labels["app.kubernetes.io/component"] = "frontend"
 
@@ -186,6 +190,20 @@ def vineyard_ipc_socket(self):
     def vineyard_deployment_exists(self):
         return self._vineyard_deployment is not None
 
+    def get_engine_type_from_prefix_name(self, prefix_name):
+        """Map an engine pod name prefix to its engine-type label value."""
+        # "analytical" is a substring of "analytical-java": test the longer first.
+        if "analytical-java" in prefix_name:
+            return "analytical-java"
+        elif "analytical" in prefix_name:
+            return "analytical"
+        elif "interactive" in prefix_name:
+            return "interactive"
+        elif "learning" in prefix_name:
+            return "learning"
+        else:
+            return "engine"
+
     def base64_decode(self, string):
         return base64.b64decode(string).decode("utf-8", errors="ignore")
 
diff --git a/coordinator/gscoordinator/kubernetes_launcher.py b/coordinator/gscoordinator/kubernetes_launcher.py
index d63c84f7100d..1e0f3cc43ad4 100644
--- a/coordinator/gscoordinator/kubernetes_launcher.py
+++ b/coordinator/gscoordinator/kubernetes_launcher.py
@@ -214,6 +214,13 @@ def __init__(
             # if not ready, then set the vineyard deployment to None
             self._vineyard_deployment = None
 
+        # In lazy mode the engines run as standalone pods, so an external
+        # vineyard deployment is required: without one the vineyard
+        # objects are not shared between the engine pods. Fail fast here
+        # instead of launching engines that cannot share data.
+        if self._mode == "lazy" and self._vineyard_deployment is None:
+            raise ValueError("If the mode is lazy, the vineyard deployment must be set")
+
         self._pod_name_list = []
         self._pod_ip_list = None
         self._pod_host_ip_list = None
@@ -494,14 +501,14 @@ def _allocate_interactive_engine(self):
         return self.deploy_interactive_engine()
 
     def _distribute_interactive_process(
-        self, hosts, filter_name, object_id: int, schema_path: str
+        self, hosts, object_id: int, schema_path: str, engine_type: str
     ):
         """
         Args:
             hosts (str): hosts of the graph.
-            filter_name (str): filter name of the pod name.
             object_id (int): object id of the graph.
             schema_path (str): path of the schema file.
+            engine_type (str): the engine_type of the engine pods' label.
         """
         env = os.environ.copy()
         env["GRAPHSCOPE_HOME"] = GRAPHSCOPE_HOME
@@ -518,7 +525,7 @@ def _distribute_interactive_process(
             str(self._interactive_port + 1),  # executor rpc port
             str(self._interactive_port + 2),  # frontend port
             self._coordinator_name,
-            filter_name,
+            engine_type,
         ]
         self._interactive_port += 3
         logger.info("Create GIE instance with command: %s", " ".join(cmd))
@@ -543,12 +550,12 @@ def create_interactive_instance(self, object_id: int, schema_path: str):
             raise RuntimeError("Failed to allocate interactive engine")
         hosts = ",".join(pod_name_list)
 
-        filter_name = "gs-engine"
-        if self._mode == "laze":
-            filter_name = "gs-interactive"
+        engine_type = "engine"
+        if self._mode == "lazy":
+            engine_type = "interactive"
 
         return self._distribute_interactive_process(
-            hosts, filter_name, object_id, schema_path
+            hosts, object_id, schema_path, engine_type
         )
 
     def close_interactive_instance(self, object_id):
diff --git a/interactive_engine/assembly/src/bin/graphscope/giectl b/interactive_engine/assembly/src/bin/graphscope/giectl
index f3a2ce5b5348..18b468839fb9 100755
--- a/interactive_engine/assembly/src/bin/graphscope/giectl
+++ b/interactive_engine/assembly/src/bin/graphscope/giectl
@@ -250,7 +250,8 @@ create_gremlin_instance_on_local() {
 #   executor_rpc_port
 #   frontend_port
 #   coordinator_name: name of coordinator deployment object in k8s
-#   filter_name: the filter name of engine pod
+#   engine_type: the label name of engine selector, if set to "engine", the engine is a bundled one
+#                if set to "interactive", the engine is a standalone interactive engine
 ##########################
 create_gremlin_instance_on_k8s() {
   declare -r GRAPHSCOPE_RUNTIME=$1
@@ -265,17 +266,11 @@ create_gremlin_instance_on_k8s() {
   declare -r executor_rpc_port=$7
   declare -r frontend_port=$8
   declare -r coordinator_name=$9 # deployment name of coordinator
-  declare -r filter_name=${10} # filter name of engine pod
+  declare -r engine_type=${10} # engine type, "engine" or "interactive"
 
   instance_id=${coordinator_name#*-}
 
-  pod_names=$(kubectl get pod -lapp.kubernetes.io/component=engine,app.kubernetes.io/instance=${instance_id} -ojsonpath='{.items[*].metadata.name}' | grep ${filter_name} )
-
-  pod_ips=""
-  for pod_name in ${pod_names}; do
-    pod_ip=$(kubectl get pod ${pod_name} -o jsonpath='{.status.podIP}')
-    pod_ips="${pod_ips}${pod_ip} "
-  done
+  pod_ips=$(kubectl get pod -lapp.kubernetes.io/component=engine,app.kubernetes.io/instance=${instance_id},app.kubernetes.io/engine=${engine_type} -o jsonpath='{.items[*].status.podIP}')
 
   pegasus_hosts=""
   for pod in ${pod_ips}; do
diff --git a/python/graphscope/tests/kubernetes/test_demo_script.py b/python/graphscope/tests/kubernetes/test_demo_script.py
index 55d1f413a333..13c346f6a13b 100644
--- a/python/graphscope/tests/kubernetes/test_demo_script.py
+++ b/python/graphscope/tests/kubernetes/test_demo_script.py
@@ -98,6 +98,26 @@ def gs_session_distributed():
     sess.close()
 
 
+@pytest.fixture
+def gs_session_with_lazy_mode():
+    sess = graphscope.session(
+        num_workers=1,
+        k8s_image_registry=get_gs_registry_on_ci_env(),
+        k8s_image_tag=get_gs_tag_on_ci_env(),
+        k8s_coordinator_cpu=2,
+        k8s_coordinator_mem="4Gi",
+        k8s_vineyard_cpu=2,
+        k8s_vineyard_mem="512Mi",
+        k8s_engine_cpu=2,
+        k8s_engine_mem="4Gi",
+        vineyard_shared_mem="4Gi",
+        k8s_volumes=get_k8s_volumes(),
+        mode="lazy",
+    )
+    yield sess
+    sess.close()
+
+
 @pytest.fixture
 def create_vineyard_deployment_on_single_node():
     import vineyard
@@ -308,6 +328,57 @@ def test_demo_distribute(gs_session_distributed, data_dir, modern_graph_data_dir
     # GNN engine
 
 
+def test_demo_with_lazy_mode(
+    gs_session_with_lazy_mode, data_dir, modern_graph_data_dir
+):
+    graph = load_ldbc(gs_session_with_lazy_mode, data_dir)
+
+    # Interactive engine
+    interactive = gs_session_with_lazy_mode.gremlin(graph)
+    sub_graph = interactive.subgraph(  # noqa: F841
+        'g.V().hasLabel("person").outE("knows")'
+    )
+    person_count = interactive.execute(
+        'g.V().hasLabel("person").outE("knows").bothV().dedup().count()'
+    ).all()[0]
+    knows_count = interactive.execute(
+        'g.V().hasLabel("person").outE("knows").count()'
+    ).all()[0]
+    interactive2 = gs_session_with_lazy_mode.gremlin(sub_graph)
+    sub_person_count = interactive2.execute("g.V().count()").all()[0]
+    sub_knows_count = interactive2.execute("g.E().count()").all()[0]
+    assert person_count == sub_person_count
+    assert knows_count == sub_knows_count
+
+    # Analytical engine
+    # project the projected graph to simple graph.
+    simple_g = sub_graph.project(vertices={"person": []}, edges={"knows": []})
+
+    pr_result = graphscope.pagerank(simple_g, delta=0.8)
+    tc_result = graphscope.triangles(simple_g)
+
+    # add the PageRank and triangle-counting results as new columns to the property graph
+    sub_graph.add_column(pr_result, {"Ranking": "r"})
+    sub_graph.add_column(tc_result, {"TC": "r"})
+
+    # test subgraph on modern graph
+    mgraph = load_modern_graph(gs_session_with_lazy_mode, modern_graph_data_dir)
+
+    # Interactive engine
+    minteractive = gs_session_with_lazy_mode.gremlin(mgraph)
+    msub_graph = minteractive.subgraph(  # noqa: F841
+        'g.V().hasLabel("person").outE("knows")'
+    )
+    person_count = minteractive.execute(
+        'g.V().hasLabel("person").outE("knows").bothV().dedup().count()'
+    ).all()[0]
+    msub_interactive = gs_session_with_lazy_mode.gremlin(msub_graph)
+    sub_person_count = msub_interactive.execute("g.V().count()").all()[0]
+    assert person_count == sub_person_count
+
+    # GNN engine
+
+
 def test_multiple_session():
     namespace = "gs-multi-" + "".join(
         [random.choice(string.ascii_lowercase) for _ in range(6)]