Skip to content

Commit

Permalink
* Add a new pytest on kubernetes to test the lazy mode.
Browse files Browse the repository at this point in the history
* Add a new label for engine pods to indicate the specific engine.

Signed-off-by: Ye Cao <caoye.cao@alibaba-inc.com>
  • Loading branch information
dashanji committed May 24, 2023
1 parent 5c86233 commit 84e0afe
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 16 deletions.
16 changes: 16 additions & 0 deletions coordinator/gscoordinator/cluster_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,11 @@ def __init__(
"app.kubernetes.io/instance": self._instance_id,
"app.kubernetes.io/version": __version__,
"app.kubernetes.io/component": "engine",
"app.kubernetes.io/engine": self.get_engine_type_from_prefix_name(
engine_pod_prefix
),
}

self._frontend_labels = self._engine_labels.copy()
self._frontend_labels["app.kubernetes.io/component"] = "frontend"

Expand Down Expand Up @@ -186,6 +190,18 @@ def vineyard_ipc_socket(self):
def vineyard_deployment_exists(self):
return self._vineyard_deployment is not None

def get_engine_type_from_prefix_name(self, prefix_name):
if "analytical" in prefix_name:
return "analytical"
elif "analytical-java" in prefix_name:
return "analytical-java"
elif "interactive" in prefix_name:
return "interactive"
elif "learning" in prefix_name:
return "learning"
else:
return "engine"

def base64_decode(self, string):
return base64.b64decode(string).decode("utf-8", errors="ignore")

Expand Down
21 changes: 14 additions & 7 deletions coordinator/gscoordinator/kubernetes_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,13 @@ def __init__(
# if not ready, then set the vineyard deployment to None
self._vineyard_deployment = None

# if the vineyard deployment is not set and use the eager mode,
# which means deploy the engine as a single pod and there is no
# external vineyard deployment. The vineyard objects are not
# shared between the engine pods, so raise an error here.
if self._mode == "lazy" and self._vineyard_deployment is None:
raise ValueError("If the mode is lazy, the vineyard deployment must be set")

self._pod_name_list = []
self._pod_ip_list = None
self._pod_host_ip_list = None
Expand Down Expand Up @@ -494,14 +501,14 @@ def _allocate_interactive_engine(self):
return self.deploy_interactive_engine()

def _distribute_interactive_process(
self, hosts, filter_name, object_id: int, schema_path: str
self, hosts, object_id: int, schema_path: str, engine_type: str
):
"""
Args:
hosts (str): hosts of the graph.
filter_name (str): filter name of the pod name.
object_id (int): object id of the graph.
schema_path (str): path of the schema file.
engine_type (str): the engine_type of the engine pods' label.
"""
env = os.environ.copy()
env["GRAPHSCOPE_HOME"] = GRAPHSCOPE_HOME
Expand All @@ -518,7 +525,7 @@ def _distribute_interactive_process(
str(self._interactive_port + 1), # executor rpc port
str(self._interactive_port + 2), # frontend port
self._coordinator_name,
filter_name,
engine_type,
]
self._interactive_port += 3
logger.info("Create GIE instance with command: %s", " ".join(cmd))
Expand All @@ -543,12 +550,12 @@ def create_interactive_instance(self, object_id: int, schema_path: str):
raise RuntimeError("Failed to allocate interactive engine")
hosts = ",".join(pod_name_list)

filter_name = "gs-engine"
if self._mode == "laze":
filter_name = "gs-interactive"
engine_type = "engine"
if self._mode == "lazy":
engine_type = "interactive"

return self._distribute_interactive_process(
hosts, filter_name, object_id, schema_path
hosts, object_id, schema_path, engine_type
)

def close_interactive_instance(self, object_id):
Expand Down
13 changes: 4 additions & 9 deletions interactive_engine/assembly/src/bin/graphscope/giectl
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,8 @@ create_gremlin_instance_on_local() {
# executor_rpc_port
# frontend_port
# coordinator_name: name of coordinator deployment object in k8s
# filter_name: the filter name of engine pod
# engine_type: the label name of engine selector, if set to "engine", the engine is a bundled one
# if set to "interactive", the engine is a standalone interactive engine
##########################
create_gremlin_instance_on_k8s() {
declare -r GRAPHSCOPE_RUNTIME=$1
Expand All @@ -265,17 +266,11 @@ create_gremlin_instance_on_k8s() {
declare -r executor_rpc_port=$7
declare -r frontend_port=$8
declare -r coordinator_name=$9 # deployment name of coordinator
declare -r filter_name=${10} # filter name of engine pod
declare -r engine_type=${10} # engine type, "engine" or "interactive"

instance_id=${coordinator_name#*-}

pod_names=$(kubectl get pod -lapp.kubernetes.io/component=engine,app.kubernetes.io/instance=${instance_id} -ojsonpath='{.items[*].metadata.name}' | grep ${filter_name} )

pod_ips=""
for pod_name in ${pod_names}; do
pod_ip=$(kubectl get pod ${pod_name} -o jsonpath='{.status.podIP}')
pod_ips="${pod_ips}${pod_ip} "
done
pod_ips=$(kubectl get pod -lapp.kubernetes.io/component=engine,app.kubernetes.io/instance=${instance_id},app.kubernetes.io/engine=${engine_type} -o jsonpath='{.items[*].status.podIP}')

pegasus_hosts=""
for pod in ${pod_ips}; do
Expand Down
69 changes: 69 additions & 0 deletions python/graphscope/tests/kubernetes/test_demo_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,26 @@ def gs_session_distributed():
sess.close()


@pytest.fixture
def gs_session_with_lazy_mode():
sess = graphscope.session(
num_workers=1,
k8s_image_registry=get_gs_registry_on_ci_env(),
k8s_image_tag=get_gs_tag_on_ci_env(),
k8s_coordinator_cpu=2,
k8s_coordinator_mem="4Gi",
k8s_vineyard_cpu=2,
k8s_vineyard_mem="512Mi",
k8s_engine_cpu=2,
k8s_engine_mem="4Gi",
vineyard_shared_mem="4Gi",
k8s_volumes=get_k8s_volumes(),
mode="lazy",
)
yield sess
sess.close()


@pytest.fixture
def create_vineyard_deployment_on_single_node():
import vineyard
Expand Down Expand Up @@ -308,6 +328,55 @@ def test_demo_distribute(gs_session_distributed, data_dir, modern_graph_data_dir
# GNN engine


def test_demo_with_lazy_mode():
graph = load_ldbc(gs_session_with_lazy_mode, data_dir)

# Interactive engine
interactive = gs_session_with_lazy_mode.gremlin(graph)
sub_graph = interactive.subgraph( # noqa: F841
'g.V().hasLabel("person").outE("knows")'
)
person_count = interactive.execute(
'g.V().hasLabel("person").outE("knows").bothV().dedup().count()'
).all()[0]
knows_count = interactive.execute(
'g.V().hasLabel("person").outE("knows").count()'
).all()[0]
interactive2 = gs_session_with_lazy_mode.gremlin(sub_graph)
sub_person_count = interactive2.execute("g.V().count()").all()[0]
sub_knows_count = interactive2.execute("g.E().count()").all()[0]
assert person_count == sub_person_count
assert knows_count == sub_knows_count

# Analytical engine
# project the projected graph to simple graph.
simple_g = sub_graph.project(vertices={"person": []}, edges={"knows": []})

pr_result = graphscope.pagerank(simple_g, delta=0.8)
tc_result = graphscope.triangles(simple_g)

# add the PageRank and triangle-counting results as new columns to the property graph
sub_graph.add_column(pr_result, {"Ranking": "r"})
sub_graph.add_column(tc_result, {"TC": "r"})

# test subgraph on modern graph
mgraph = load_modern_graph(gs_session_with_lazy_mode, modern_graph_data_dir)

# Interactive engine
minteractive = gs_session_with_lazy_mode.gremlin(mgraph)
msub_graph = minteractive.subgraph( # noqa: F841
'g.V().hasLabel("person").outE("knows")'
)
person_count = minteractive.execute(
'g.V().hasLabel("person").outE("knows").bothV().dedup().count()'
).all()[0]
msub_interactive = gs_session_with_lazy_mode.gremlin(msub_graph)
sub_person_count = msub_interactive.execute("g.V().count()").all()[0]
assert person_count == sub_person_count

# GNN engine


def test_multiple_session():
namespace = "gs-multi-" + "".join(
[random.choice(string.ascii_lowercase) for _ in range(6)]
Expand Down

0 comments on commit 84e0afe

Please sign in to comment.