Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Intercept grpc.StatusCode.UNAVAILABLE and retry #1015

Merged
merged 1 commit into from
Sep 12, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 107 additions & 3 deletions rqd/rqd/rqnetwork.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from builtins import object
from concurrent import futures
from random import shuffle
import abc
import atexit
import logging as log
import os
Expand Down Expand Up @@ -223,11 +224,23 @@ def closeChannel(self):
def __getChannel(self):
# TODO(bcipriano) Add support for the facility nameserver or drop this concept? (Issue #152)
if self.channel is None:
# create interceptors
interceptors = (
RetryOnRpcErrorClientInterceptor(
max_attempts=4,
sleeping_policy=ExponentialBackoff(init_backoff_ms=100,
max_backoff_ms=1600,
multiplier=2),
status_for_retry=(grpc.StatusCode.UNAVAILABLE,),
),
)

cuebots = rqd.rqconstants.CUEBOT_HOSTNAME.split()
shuffle(cuebots)
for cuebotHostname in cuebots:
self.channel = grpc.insecure_channel('%s:%s' % (cuebotHostname,
rqd.rqconstants.CUEBOT_GRPC_PORT))
if len(cuebots) > 0:
self.channel = grpc.insecure_channel('%s:%s' % (cuebots[0],
rqd.rqconstants.CUEBOT_GRPC_PORT),
*interceptors)
atexit.register(self.closeChannel)

def __getReportStub(self):
Expand All @@ -253,3 +266,94 @@ def reportRunningFrameCompletion(self, report):
request = rqd.compiled_proto.report_pb2.RqdReportRunningFrameCompletionRequest(
frame_complete_report=report)
stub.ReportRunningFrameCompletion(request, timeout=rqd.rqconstants.RQD_TIMEOUT)


# Python 2/3 compatible implementation of ABC
ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()})


class SleepingPolicy(ABC):
"""
Implement policy for sleeping between API retries
"""
@abc.abstractmethod
def sleep(self, attempt):
"""
How long to sleep in milliseconds.
:param attempt: the number of attempt (starting from zero)
"""
assert attempt >= 0


class ExponentialBackoff(SleepingPolicy):
"""
Implement policy that will increase retry period by exponentially in every try
"""
def __init__(self,
init_backoff_ms,
max_backoff_ms,
multiplier=2):
"""
inputs in ms
"""
self._init_backoff = init_backoff_ms
self._max_backoff = max_backoff_ms
self._multiplier = multiplier

def sleep(self, attempt):
sleep_time_ms = min(
self._init_backoff * self._multiplier ** attempt,
self._max_backoff
)
time.sleep(sleep_time_ms / 1000.0)


class RetryOnRpcErrorClientInterceptor(
grpc.UnaryUnaryClientInterceptor,
grpc.StreamUnaryClientInterceptor
):
"""
Implement Client/Stream interceptors for GRPC channels to retry
calls that failed with retry-able states. This is required for
handling server interruptions that are not automatically handled
by grpc.insecure_channel
"""
def __init__(self,
max_attempts,
sleeping_policy,
status_for_retry=None):
self._max_attempts = max_attempts
self._sleeping_policy = sleeping_policy
self._retry_statuses = status_for_retry

def _intercept_call(self, continuation, client_call_details,
request_or_iterator):
for attempt in range(self._max_attempts):
try:
return continuation(client_call_details,
request_or_iterator)
except grpc.RpcError as response:
# Return if it was last attempt
if attempt == (self._max_attempts - 1):
return response

# If status code is not in retryable status codes
# pylint: disable=no-member
if self._retry_statuses \
and hasattr(response, 'code') \
and response.code() \
not in self._retry_statuses:
return response

self._sleeping_policy.sleep(attempt)

def intercept_unary_unary(self, continuation, client_call_details,
request):
return self._intercept_call(continuation, client_call_details,
request)

def intercept_stream_unary(
self, continuation, client_call_details, request_iterator
):
return self._intercept_call(continuation, client_call_details,
request_iterator)