Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(python): prevent the GAE been blocked if GIE failed to generate subgraphs during subgraph queries #3499

Merged
merged 1 commit into from
Jan 21, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 100 additions & 8 deletions coordinator/gscoordinator/op_executor.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import datetime
import itertools
import json
import logging
import os
import pickle
import random
import sys
import time
import traceback
import zipfile
from concurrent import futures
from io import BytesIO
Expand Down Expand Up @@ -458,7 +461,7 @@
self._op_result_pool[op.key] = op_result
return message_pb2.RunStepResponse(head=response_head), []

def _gremlin_to_subgraph(self, op: op_def_pb2.OpDef):
def _gremlin_to_subgraph(self, op: op_def_pb2.OpDef): # noqa: C901
gremlin_script = op.attr[types_pb2.GIE_GREMLIN_QUERY_MESSAGE].s.decode()
oid_type = op.attr[types_pb2.OID_TYPE].s.decode()
request_options = None
Expand Down Expand Up @@ -511,6 +514,8 @@
vertex_metadata["typename"] = "vineyard::ParallelStream"
vertex_metadata["__streams_-size"] = len(chunk_instances)

vertex_streams, edge_streams = [], []

# NB: we don't respect `num_workers`, instead, we create a substream
# on each vineyard instance.
#
Expand All @@ -532,6 +537,7 @@
edge = vineyard_client.create_metadata(edge_stream, instance_id)
vineyard_client.persist(edge.id)
edge_metadata.add_member("__streams_-%d" % worker, edge)
edge_streams.append(edge.id)

vertex_stream = vineyard.ObjectMeta()
vertex_stream["typename"] = "vineyard::RecordBatchStream"
Expand All @@ -545,6 +551,7 @@
vertex = vineyard_client.create_metadata(vertex_stream, instance_id)
vineyard_client.persist(vertex.id)
vertex_metadata.add_member("__streams_-%d" % worker, vertex)
vertex_streams.append(vertex.id)

chunk_stream = vineyard.ObjectMeta()
chunk_stream["typename"] = "vineyard::htap::PropertyGraphOutStream"
Expand All @@ -566,14 +573,56 @@
# build the parallel stream for edge
edge = vineyard_client.create_metadata(edge_metadata)
vineyard_client.persist(edge.id)
vineyard_client.put_name(edge.id, "__%s_edge_stream" % graph_name)
vineyard_client.put_name(edge.id, f"__{graph_name}_edge_stream")

# build the parallel stream for vertex
vertex = vineyard_client.create_metadata(vertex_metadata)
vineyard_client.persist(vertex.id)
vineyard_client.put_name(vertex.id, "__%s_vertex_stream" % graph_name)
vineyard_client.put_name(vertex.id, f"__{graph_name}_vertex_stream")

return (
repr(graph.id),
repr(edge.id),
repr(vertex.id),
vertex_streams,
edge_streams,
)

return repr(graph.id), repr(edge.id), repr(vertex.id)
def cleanup_stream(
graph_name,
vineyard_rpc_endpoint,
vertex_stream_id,
edge_stream_id,
vertex_streams,
edge_streams,
):
import vineyard

vineyard_client = vineyard.connect(*vineyard_rpc_endpoint.split(":"))

vertex_stream_id = vineyard.ObjectID(vertex_stream_id)
edge_stream_id = vineyard.ObjectID(edge_stream_id)
for s in itertools.chain(vertex_streams, edge_streams):
try:
vineyard_client.stop_stream(vineyard.ObjectID(s), failed=True)
except Exception: # noqa: E722, pylint: disable=broad-except
pass

Check notice on line 609 in coordinator/gscoordinator/op_executor.py

View check run for this annotation

codefactor.io / CodeFactor

coordinator/gscoordinator/op_executor.py#L608-L609

Try, Except, Pass detected. (B110)
try:
vineyard_client.drop_stream(vineyard.ObjectID(s))
except Exception: # noqa: E722, pylint: disable=broad-except
pass

Check notice on line 613 in coordinator/gscoordinator/op_executor.py

View check run for this annotation

codefactor.io / CodeFactor

coordinator/gscoordinator/op_executor.py#L612-L613

Try, Except, Pass detected. (B110)
try:
vineyard_client.drop_name(f"__{graph_name}_vertex_stream")
except Exception: # noqa: E722, pylint: disable=broad-except
pass

Check notice on line 617 in coordinator/gscoordinator/op_executor.py

View check run for this annotation

codefactor.io / CodeFactor

coordinator/gscoordinator/op_executor.py#L616-L617

Try, Except, Pass detected. (B110)
try:
vineyard_client.drop_name(f"__{graph_name}_edge_stream")
except Exception: # noqa: E722, pylint: disable=broad-except
pass

Check notice on line 621 in coordinator/gscoordinator/op_executor.py

View check run for this annotation

codefactor.io / CodeFactor

coordinator/gscoordinator/op_executor.py#L620-L621

Try, Except, Pass detected. (B110)
try:
vineyard_client.drop_name(graph_name)
except Exception: # noqa: E722, pylint: disable=broad-except
pass

Check notice on line 625 in coordinator/gscoordinator/op_executor.py

View check run for this annotation

codefactor.io / CodeFactor

coordinator/gscoordinator/op_executor.py#L624-L625

Try, Except, Pass detected. (B110)

def load_subgraph(
graph_name,
Expand Down Expand Up @@ -658,6 +707,8 @@
_graph_builder_id,
edge_stream_id,
vertex_stream_id,
vertex_streams,
edge_streams,
) = create_global_graph_builder(
graph_name,
executor_workers_num,
Expand All @@ -681,11 +732,52 @@
gremlin_script,
graph_name,
)
gremlin_client.submit(
subgraph_script, request_options=request_options
).all().result()

return subgraph_task.result()
gremlin_error_message, graph_loading_error_message = None, None

try:
gremlin_client.submit(
subgraph_script, request_options=request_options
).all().result()
except Exception: # noqa: E722, pylint: disable=broad-except
# # abort the streams
e, err, _ = sys.exc_info()
gremlin_error_message = (
f"Exception during subgraph's gremlin query execution: "
f"'{e}', '{err}', with traceback: {traceback.format_exc()}"
)
logger.error(gremlin_error_message)
# cancel the stream to let the analytical engine exit the current loop
logger.info("clean up stream ...")
cleanup_stream(
graph_name,
vineyard_rpc_endpoint,
vertex_stream_id,
edge_stream_id,
vertex_streams,
edge_streams,
)
logger.info("clean up stream finished ...")

subgraph_object = None
try:
subgraph_object = subgraph_task.result()
except Exception: # noqa: E722, pylint: disable=broad-except
e, err, _ = sys.exc_info()
graph_loading_error_message = (
f"Exception during subgraph's graph loading execution: "
f"'{e}', '{err}', with traceback: {traceback.format_exc()}"
)
logger.error(graph_loading_error_message)

if gremlin_error_message is not None or graph_loading_error_message is not None:
error_message = (
f"Error during subgraph execution, "
f'gremlin error: "{gremlin_error_message}", '
f'graph loading error: "{graph_loading_error_message}"'
)
raise RuntimeError(error_message)
return subgraph_object

# Learning engine related operations
# ==================================
Expand Down
Loading