Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure the correctness of retry (in the RunStep request) to avoid the strange "has no attribute" error #2094

Merged
merged 1 commit into from
Sep 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion coordinator/gscoordinator/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ def _RunStep(self, request_iterator, context):
responses[0].head.results.extend(head.head.results)
responses.extend(bodies)
except grpc.RpcError as exc:
# Not raised by graphscope, maybe socket closed, etc
# Not raised by graphscope, maybe socket closed, etc.
context.set_code(exc.code())
context.set_details(exc.details())
for response in responses:
Expand Down
14 changes: 8 additions & 6 deletions python/graphscope/client/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,7 @@ def __repr__(self):
return str(self)

def run(self, dag_def):
runstep_requests = self._grpc_utils.generate_runstep_requests(
self._session_id, dag_def
)
return self._run_step_impl(runstep_requests)
return self._run_step_impl(dag_def)

def fetch_logs(self):
if self._logs_fetching_thread is None:
Expand Down Expand Up @@ -178,8 +175,13 @@ def _close_session_impl(self):
response = self._stub.CloseSession(request)
return response

@handle_grpc_error
def _run_step_impl(self, runstep_requests):
@handle_grpc_error(False) # don't retry the "RunStep" request.
def _run_step_impl(self, dag_def):
# note that the "_impl" may be retried, thus the argument cannot be a
# generator or an iterator.
runstep_requests = self._grpc_utils.generate_runstep_requests(
self._session_id, dag_def
)
response = self._grpc_utils.parse_runstep_responses(
self._stub.RunStep(runstep_requests)
)
Expand Down
36 changes: 33 additions & 3 deletions python/graphscope/client/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,9 @@ def parse_runstep_responses(self, responses):
return response_head.head


def handle_grpc_error(fn):
"""Decorator to handle grpc error.
def handle_grpc_error_with_retry(fn, retry=True):
"""Decorator to handle grpc error. If retry is True, the function will
be retried for certain errors, e.g., network unavailable.

This function will retry max times with specific GRPC status.
See detail in `GRPC_MAX_RETRIES_BY_CODE`.
Expand All @@ -156,7 +157,7 @@ def with_grpc_catch(*args, **kwargs):
except grpc.RpcError as exc:
code = exc.code()
max_retries = GRPC_MAX_RETRIES_BY_CODE.get(code)
if max_retries is None:
if not retry or max_retries is None:
raise GRPCError(
"rpc %s failed: status %s" % (str(fn.__name__), exc)
)
Expand All @@ -177,6 +178,35 @@ def with_grpc_catch(*args, **kwargs):
return with_grpc_catch


def handle_grpc_error(fn_or_retry):
"""Decorator to handle grpc error, and accepts an optional arugment to control
whether the function should be retried for certain errors.

This decorator can be used as

.. code-block:: python

@handle_grpc_error
def fn(..)
...

or

.. code-block:: python

@handle_grpc_error(retry=True)
def fn(..)
...

The argument 'retry' by default is True, to keep consistent with the previous
behavior.
"""
if isinstance(fn_or_retry, bool):
return functools.partial(handle_grpc_error_with_retry, retry=fn_or_retry)
else:
return handle_grpc_error_with_retry(fn_or_retry)


def suppress_grpc_error(fn):
"""Decorator to suppress any grpc error."""

Expand Down