Skip to content

Commit

Permalink
fix(python): prevent the GAE been blocked if GIE failed to generate s…
Browse files Browse the repository at this point in the history
…ubgraphs during subgraph queries (#3499)

Signed-off-by: Tao He <linzhu.ht@alibaba-inc.com>
  • Loading branch information
sighingnow authored Jan 21, 2024
1 parent a383a34 commit 6346651
Showing 1 changed file with 100 additions and 8 deletions.
108 changes: 100 additions & 8 deletions coordinator/gscoordinator/op_executor.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import datetime
import itertools
import json
import logging
import os
import pickle
import random
import sys
import time
import traceback
import zipfile
from concurrent import futures
from io import BytesIO
Expand Down Expand Up @@ -458,7 +461,7 @@ def run_on_interactive_engine(self, dag_def: op_def_pb2.DagDef):
self._op_result_pool[op.key] = op_result
return message_pb2.RunStepResponse(head=response_head), []

def _gremlin_to_subgraph(self, op: op_def_pb2.OpDef):
def _gremlin_to_subgraph(self, op: op_def_pb2.OpDef): # noqa: C901
gremlin_script = op.attr[types_pb2.GIE_GREMLIN_QUERY_MESSAGE].s.decode()
oid_type = op.attr[types_pb2.OID_TYPE].s.decode()
request_options = None
Expand Down Expand Up @@ -511,6 +514,8 @@ def create_global_graph_builder(
vertex_metadata["typename"] = "vineyard::ParallelStream"
vertex_metadata["__streams_-size"] = len(chunk_instances)

vertex_streams, edge_streams = [], []

# NB: we don't respect `num_workers`, instead, we create a substream
# on each vineyard instance.
#
Expand All @@ -532,6 +537,7 @@ def create_global_graph_builder(
edge = vineyard_client.create_metadata(edge_stream, instance_id)
vineyard_client.persist(edge.id)
edge_metadata.add_member("__streams_-%d" % worker, edge)
edge_streams.append(edge.id)

vertex_stream = vineyard.ObjectMeta()
vertex_stream["typename"] = "vineyard::RecordBatchStream"
Expand All @@ -545,6 +551,7 @@ def create_global_graph_builder(
vertex = vineyard_client.create_metadata(vertex_stream, instance_id)
vineyard_client.persist(vertex.id)
vertex_metadata.add_member("__streams_-%d" % worker, vertex)
vertex_streams.append(vertex.id)

chunk_stream = vineyard.ObjectMeta()
chunk_stream["typename"] = "vineyard::htap::PropertyGraphOutStream"
Expand All @@ -566,14 +573,56 @@ def create_global_graph_builder(
# build the parallel stream for edge
edge = vineyard_client.create_metadata(edge_metadata)
vineyard_client.persist(edge.id)
vineyard_client.put_name(edge.id, "__%s_edge_stream" % graph_name)
vineyard_client.put_name(edge.id, f"__{graph_name}_edge_stream")

# build the parallel stream for vertex
vertex = vineyard_client.create_metadata(vertex_metadata)
vineyard_client.persist(vertex.id)
vineyard_client.put_name(vertex.id, "__%s_vertex_stream" % graph_name)
vineyard_client.put_name(vertex.id, f"__{graph_name}_vertex_stream")

return (
repr(graph.id),
repr(edge.id),
repr(vertex.id),
vertex_streams,
edge_streams,
)

return repr(graph.id), repr(edge.id), repr(vertex.id)
def cleanup_stream(
graph_name,
vineyard_rpc_endpoint,
vertex_stream_id,
edge_stream_id,
vertex_streams,
edge_streams,
):
import vineyard

vineyard_client = vineyard.connect(*vineyard_rpc_endpoint.split(":"))

vertex_stream_id = vineyard.ObjectID(vertex_stream_id)
edge_stream_id = vineyard.ObjectID(edge_stream_id)
for s in itertools.chain(vertex_streams, edge_streams):
try:
vineyard_client.stop_stream(vineyard.ObjectID(s), failed=True)
except Exception: # noqa: E722, pylint: disable=broad-except
pass
try:
vineyard_client.drop_stream(vineyard.ObjectID(s))
except Exception: # noqa: E722, pylint: disable=broad-except
pass
try:
vineyard_client.drop_name(f"__{graph_name}_vertex_stream")
except Exception: # noqa: E722, pylint: disable=broad-except
pass
try:
vineyard_client.drop_name(f"__{graph_name}_edge_stream")
except Exception: # noqa: E722, pylint: disable=broad-except
pass
try:
vineyard_client.drop_name(graph_name)
except Exception: # noqa: E722, pylint: disable=broad-except
pass

def load_subgraph(
graph_name,
Expand Down Expand Up @@ -658,6 +707,8 @@ def load_subgraph(
_graph_builder_id,
edge_stream_id,
vertex_stream_id,
vertex_streams,
edge_streams,
) = create_global_graph_builder(
graph_name,
executor_workers_num,
Expand All @@ -681,11 +732,52 @@ def load_subgraph(
gremlin_script,
graph_name,
)
gremlin_client.submit(
subgraph_script, request_options=request_options
).all().result()

return subgraph_task.result()
gremlin_error_message, graph_loading_error_message = None, None

try:
gremlin_client.submit(
subgraph_script, request_options=request_options
).all().result()
except Exception: # noqa: E722, pylint: disable=broad-except
# # abort the streams
e, err, _ = sys.exc_info()
gremlin_error_message = (
f"Exception during subgraph's gremlin query execution: "
f"'{e}', '{err}', with traceback: {traceback.format_exc()}"
)
logger.error(gremlin_error_message)
# cancel the stream to let the analytical engine exit the current loop
logger.info("clean up stream ...")
cleanup_stream(
graph_name,
vineyard_rpc_endpoint,
vertex_stream_id,
edge_stream_id,
vertex_streams,
edge_streams,
)
logger.info("clean up stream finished ...")

subgraph_object = None
try:
subgraph_object = subgraph_task.result()
except Exception: # noqa: E722, pylint: disable=broad-except
e, err, _ = sys.exc_info()
graph_loading_error_message = (
f"Exception during subgraph's graph loading execution: "
f"'{e}', '{err}', with traceback: {traceback.format_exc()}"
)
logger.error(graph_loading_error_message)

if gremlin_error_message is not None or graph_loading_error_message is not None:
error_message = (
f"Error during subgraph execution, "
f'gremlin error: "{gremlin_error_message}", '
f'graph loading error: "{graph_loading_error_message}"'
)
raise RuntimeError(error_message)
return subgraph_object

# Learning engine related operations
# ==================================
Expand Down

0 comments on commit 6346651

Please sign in to comment.