Unexpected StatusCode.UNAVAILABLE: Connection reset by peer #19514
Comments
|
A self-made retry mechanism with exponential backoff helped mitigate this issue. |
|
How can we reproduce this problem? Do you have a minimal reproduction case? Does this problem occur in earlier versions of gRPC? |
|
@yutkin Would you be willing to share your solution? |
|
@geoffjukes You have to define an interceptor for RPC errors. It retries failed calls with an exponential backoff sleeping policy:

```python
import abc
import logging
import time
from random import randint
from typing import Optional, Tuple

import grpc

logger = logging.getLogger(__name__)


class SleepingPolicy(abc.ABC):
    @abc.abstractmethod
    def sleep(self, try_i: int):
        """
        How long to sleep in milliseconds.
        :param try_i: the number of the retry (starting from zero)
        """
        assert try_i >= 0


class ExponentialBackoff(SleepingPolicy):
    def __init__(self, *, init_backoff_ms: int, max_backoff_ms: int, multiplier: int):
        self.init_backoff = randint(0, init_backoff_ms)
        self.max_backoff = max_backoff_ms
        self.multiplier = multiplier

    def sleep(self, try_i: int):
        sleep_range = min(
            self.init_backoff * self.multiplier ** try_i, self.max_backoff
        )
        sleep_ms = randint(0, sleep_range)
        logger.debug(f"Sleeping for {sleep_ms}")
        time.sleep(sleep_ms / 1000)


class RetryOnRpcErrorClientInterceptor(
    grpc.UnaryUnaryClientInterceptor, grpc.StreamUnaryClientInterceptor
):
    def __init__(
        self,
        *,
        max_attempts: int,
        sleeping_policy: SleepingPolicy,
        status_for_retry: Optional[Tuple[grpc.StatusCode]] = None,
    ):
        self.max_attempts = max_attempts
        self.sleeping_policy = sleeping_policy
        self.status_for_retry = status_for_retry

    def _intercept_call(self, continuation, client_call_details, request_or_iterator):
        for try_i in range(self.max_attempts):
            response = continuation(client_call_details, request_or_iterator)

            if isinstance(response, grpc.RpcError):

                # Return if it was the last attempt
                if try_i == (self.max_attempts - 1):
                    return response

                # Return if the status code is not in the retryable status codes
                if (
                    self.status_for_retry
                    and response.code() not in self.status_for_retry
                ):
                    return response

                self.sleeping_policy.sleep(try_i)
            else:
                return response

    def intercept_unary_unary(self, continuation, client_call_details, request):
        return self._intercept_call(continuation, client_call_details, request)

    def intercept_stream_unary(
        self, continuation, client_call_details, request_iterator
    ):
        return self._intercept_call(continuation, client_call_details, request_iterator)
```

Usage example:

```python
interceptors = (
    RetryOnRpcErrorClientInterceptor(
        max_attempts=4,
        sleeping_policy=ExponentialBackoff(init_backoff_ms=100, max_backoff_ms=1600, multiplier=2),
        status_for_retry=(grpc.StatusCode.UNAVAILABLE,),
    ),
)

stub = YourStub(
    grpc.intercept_channel(grpc.insecure_channel("service-hostname:8100"), *interceptors)
)
```
|
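A small illustration of how the sleep ceiling grows with the settings from the usage example above; the randomization (full jitter) spreads retries from many clients so they do not all retry at the same instant:

```python
# Illustration only: the ceiling of the randomized sleep per retry, using the
# usage-example settings (init_backoff_ms=100, multiplier=2, max_backoff_ms=1600).
# init_backoff is itself randomized in __init__, so real ceilings are at most
# these values, and no sleep follows the final attempt.
init_backoff_ms, multiplier, max_backoff_ms = 100, 2, 1600
for try_i in range(4):
    ceiling = min(init_backoff_ms * multiplier ** try_i, max_backoff_ms)
    print(f"attempt {try_i}: sleep drawn from [0, {ceiling}] ms")
# attempt 0: sleep drawn from [0, 100] ms
# attempt 1: sleep drawn from [0, 200] ms
# attempt 2: sleep drawn from [0, 400] ms
# attempt 3: sleep drawn from [0, 800] ms
```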
|
Thank you so much for sharing @yutkin, this is incredibly helpful. |
|
@yutkin Thanks for the detailed interceptor! Retries are not currently handled automatically within the gRPC library, either in Python or in C++, so doing retries like this within your application is definitely the way to go. If you'd like to contribute this interceptor, we have a Github project called |
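For later readers: newer grpcio releases can also retry transparently at the channel level through a service config, which avoids a hand-written interceptor. A minimal sketch, assuming a grpcio version with built-in retry support; the limits and target below are illustrative:

```python
import json

import grpc

# Channel-level retry policy; an empty "name" entry applies it to all methods.
service_config = json.dumps({
    "methodConfig": [{
        "name": [{}],
        "retryPolicy": {
            "maxAttempts": 4,
            "initialBackoff": "0.1s",
            "maxBackoff": "1.6s",
            "backoffMultiplier": 2,
            "retryableStatusCodes": ["UNAVAILABLE"],
        },
    }]
})

channel = grpc.insecure_channel(
    "service-hostname:8100",  # illustrative target, as in the interceptor example
    options=[
        ("grpc.enable_retries", 1),
        ("grpc.service_config", service_config),
    ],
)
```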
|
What I don't understand is why this would happen in the first place. Is this a normal possibility in the Python gRPC library, or does it say something about the service that is responding? It's also not clear from https://grpc.github.io/grpc/python/_modules/grpc.html#intercept_channel whether interceptors are supported on secure channels. Finally, does anyone know what the "experimental API" label on interceptors means in practice? |
|
@iamliamc It says something about the connection between the client and the server. Yes, interceptors do indeed work with secure_channels. Interceptors are currently considered an experimental API, which means that we technically reserve the right to change the API without making it a major version bump. In practice, however, so many things have come to depend upon the current form of the interceptor API that it's highly unlikely we'll do this. In fact, we probably need to make it a priority to graduate the API from experimental in the not-too-distant future. |
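To make the secure-channel point concrete, a minimal sketch that reuses the `interceptors` tuple and `YourStub` from the earlier example; the CA file path and hostname are placeholders:

```python
import grpc

# Load the CA certificate used to verify the server (placeholder path).
with open("ca.pem", "rb") as f:
    credentials = grpc.ssl_channel_credentials(root_certificates=f.read())

secure_channel = grpc.secure_channel("service-hostname:8100", credentials)

# intercept_channel works the same way for secure and insecure channels.
stub = YourStub(grpc.intercept_channel(secure_channel, *interceptors))
```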
HTTP:
- Use a persistent HTTP session that is opened on object initialization rather than on each request.
- The default `persist_connection=False` adds a header ensuring the connection is closed after each request; `persist_connection=True` leaves the connection open, which is more performant for frequent requests.

GRPC:
- Use a persistent channel opened on object initialization, similar to HTTP, but there is no `persist_connection=False` here - in gRPC the channel is always persistent (no way to force close it or disable keepalive).
- Upgraded grpc to 1.26.0, which supports interceptors, and added interceptors with exponential retries upon UNAVAILABLE response errors on the channel. See: grpc/grpc#19514
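A possibly related knob for the persistent channel described above: client-side keepalive can be tuned through channel arguments so that dead connections on flaky networks are detected sooner. The values below are illustrative and not taken from the commit:

```python
import grpc

# Keepalive-related channel arguments; values are illustrative.
keepalive_options = [
    ("grpc.keepalive_time_ms", 30000),           # send a keepalive ping every 30 s
    ("grpc.keepalive_timeout_ms", 10000),        # wait up to 10 s for the ping ack
    ("grpc.keepalive_permit_without_calls", 1),  # allow pings while the channel is idle
    ("grpc.http2.max_pings_without_data", 0),    # do not limit pings sent without data
]

channel = grpc.insecure_channel("service-hostname:8100", options=keepalive_options)
```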
|
This issue/PR has been automatically marked as stale because it has not had any update (including commits, comments, labels, milestones, etc.) for 30 days. It will be closed automatically if no further update occurs in 7 days. Thank you for your contributions! |
Grpc connection is sometimes reset on flaky networks, and this is not handled by the python grpc library. Solved by intercepting UNAVAILABLE responses and retrying the command. Adapted from this issue in python grpc repo: grpc/grpc#19514
|
For anyone trying this approach with the asyncio API (`grpc.aio`), here is an adaptation:

```python
from typing import Optional, Tuple

import grpc

# SleepingPolicy is the abstract class from the earlier comment in this thread.


class RetryOnRpcErrorClientInterceptor:
    def __init__(
        self,
        *,
        max_attempts: int,
        sleeping_policy: SleepingPolicy,
        status_for_retry: Optional[Tuple[grpc.StatusCode]] = None,
    ):
        self.max_attempts = max_attempts
        self.sleeping_policy = sleeping_policy
        self.status_for_retry = status_for_retry

    def _intercept_call(self, continuation, client_call_details, request_or_iterator):
        for try_i in range(self.max_attempts):
            response = continuation(client_call_details, request_or_iterator)

            if isinstance(response, grpc.RpcError):

                # Return if it was the last attempt
                if try_i == (self.max_attempts - 1):
                    return response

                # Return if the status code is not in the retryable status codes
                if (
                    self.status_for_retry
                    and response.code() not in self.status_for_retry
                ):
                    return response

                self.sleeping_policy.sleep(try_i)
            else:
                return response


class UnaryUnaryRetryOnRpcErrorClientInterceptor(
    RetryOnRpcErrorClientInterceptor,
    grpc.aio.UnaryUnaryClientInterceptor,
):
    def intercept_unary_unary(self, continuation, client_call_details, request):
        return self._intercept_call(continuation, client_call_details, request)


class StreamUnaryRetryOnRpcErrorClientInterceptor(
    RetryOnRpcErrorClientInterceptor,
    grpc.aio.StreamUnaryClientInterceptor,
):
    def intercept_stream_unary(self, continuation, client_call_details, request_iterator):
        return self._intercept_call(continuation, client_call_details, request_iterator)
```
|
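A hedged usage sketch for the asyncio variant above, assuming the generated `YourStub` and the `ExponentialBackoff` policy from the earlier comment; `grpc.aio` channels accept interceptors at construction time instead of `grpc.intercept_channel`. Whether the synchronous-style retry loop behaves identically under `grpc.aio` is not verified here:

```python
import asyncio

import grpc

interceptors = [
    UnaryUnaryRetryOnRpcErrorClientInterceptor(
        max_attempts=4,
        sleeping_policy=ExponentialBackoff(
            init_backoff_ms=100, max_backoff_ms=1600, multiplier=2
        ),
        status_for_retry=(grpc.StatusCode.UNAVAILABLE,),
    ),
]


async def main():
    # grpc.aio channels take interceptors directly as a constructor argument.
    async with grpc.aio.insecure_channel(
        "service-hostname:8100", interceptors=interceptors
    ) as channel:
        stub = YourStub(channel)  # placeholder generated stub
        # response = await stub.YourMethod(request)  # placeholder method


asyncio.run(main())
```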
|
We weirdly only experience this on versions > 1.25. We are also building from source now since we need to use OpenSSL instead of BoringSSL (our certs are somehow incompatible with BoringSSL but work fine with OpenSSL. We're working on replacing them with BoringSSL compatible certs), but I can't see how this issue would be related to OpenSSL. Has anyone else experienced this in the latest versions of gRPC (1.46.3)? |
|
Is the randomness in the sleep needed in this grpc case? |
What version of gRPC and what language are you using?
Python 3.6
grpcio==1.21.1
What operating system (Linux, Windows,...) and version?
Ubuntu 18 in Docker
What runtime / compiler are you using (e.g. python version or version of gcc)
Running in Kubernetes with Envoy as a proxy
What did you do?
Invoke a function via gRPC
What did you expect to see?
Normal invocation and computed result
What did you see instead?
Sometimes (in ~1-5 % of calls) I get StatusCode.UNAVAILABLE with "Connection reset by peer".
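For context on what that failure looks like at the call site, a minimal sketch of catching and inspecting the error; the stub and method names are placeholders:

```python
import grpc

try:
    response = stub.YourMethod(request)  # placeholder stub and method
except grpc.RpcError as rpc_error:
    # For this failure, code() is grpc.StatusCode.UNAVAILABLE and
    # details() carries the "Connection reset by peer" message.
    print(rpc_error.code(), rpc_error.details())
    raise
```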