Pub/Sub Subscriber does not catch & retry UNAVAILABLE errors #4234

Closed
kir-titievsky opened this issue Oct 20, 2017 · 59 comments
Assignees
Labels
api: pubsub Issues related to the Pub/Sub API. priority: p1 Important issue which blocks shipping the next release. Will be fixed prior to next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns.


@kir-titievsky

A basic Pub/Sub message consumer stops consuming messages after a retryable error (see stack trace below, but in short: _Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.UNAVAILABLE, The service was unable to fulfill your request. Please try again. [code=8a75])>). The app does not crash, but the stream never recovers, and no further messages are received. Interesting observations:

  • If I simply turn off WiFi on my laptop and run the same code, it keeps retrying until the machine is reconnected to the network, then functions as expected. This tells me the failure is a reaction to this specific StatusCode.
  • The exception sometimes happens on startup, sometimes mid-stream.

Expected behavior:

  • The application code would keep retrying the streamingPull connection and eventually recover and receive messages.
  • This would be handled and surfaced as a warning, rather than as a thread-killing exception.

This might be the same issue as 2683. This comment, in particular, seems like the solution that I would expect the client library to implement.
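For illustration, a minimal sketch of the retry classification I would expect the library to perform. The helper name and the set of retryable codes are hypothetical, not the client library's actual API:

```python
# Hypothetical helper: classify which gRPC status codes should cause the
# subscriber to rebuild the streaming pull instead of killing its thread.
RETRYABLE_STATUS_NAMES = frozenset({"UNAVAILABLE", "DEADLINE_EXCEEDED"})

def should_recover(exc):
    """Return True if the consumer should silently retry the stream."""
    code = getattr(exc, "code", None)
    if not callable(code):  # plain exceptions carry no status-code method
        return False
    status = code()
    name = getattr(status, "name", str(status))
    return name in RETRYABLE_STATUS_NAMES
```

A policy's on_exception could then log a warning and return, instead of re-raising, whenever should_recover(exc) is true.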

Answers to standard questions:

  1. OS type and version
    MacOS Sierra 10.12.6
  2. Python version and virtual environment information (python --version)
    Python 2.7.10 (running in virtualenv)
  3. google-cloud-python version (pip show google-cloud, pip show google-<service>, or pip freeze)
$ pip show google-cloud
Name: google-cloud
Version: 0.27.0
pip show google-cloud-pubsub
Name: google-cloud-pubsub
Version: 0.28.4
  4. Stack trace, if available
Exception in thread Consumer helper: consume bidirectional stream:
Traceback (most recent call last):
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 810, in __bootstrap_inner
    self.run()
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 763, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/Users/kir/cloud/env/lib/python2.7/site-packages/google/cloud/pubsub_v1/subscriber/_consumer.py", line 248, in _blocking_consume
    self._policy.on_exception(exc)
  File "/Users/kir/cloud/env/lib/python2.7/site-packages/google/cloud/pubsub_v1/subscriber/policy/thread.py", line 140, in on_exception
    raise exception
_Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.UNAVAILABLE, The service was unable to fulfill your request. Please try again. [code=8a75])>
  5. Steps to reproduce
  • I was not able to reproduce this consistently. But it would happen ~1 in 10 times I ran the code.
  6. Code example
import sys
import time

from google.cloud import pubsub_v1 as pubsub

subscription_name = "projects/%s/subscriptions/%s" % (sys.argv[1], sys.argv[2])
sleep_time_ms = 0
try:
    sleep_time_ms = int(sys.argv[3])
except Exception:
    print "Could not parse custom sleep time."
print "Using sleep time %g ms" % sleep_time_ms

def callback(message):
    # Simulate per-message processing time before acking.
    t = time.time()
    time.sleep(float(sleep_time_ms) / 1000)
    print "Message " + message.data + " acked in %g seconds" % (time.time() - t)
    message.ack()

# subscribe() returns a policy; open() starts the streaming pull in
# background threads, so the main thread just sleeps.
subscription = pubsub.SubscriberClient().subscribe(subscription_name).open(callback=callback)
time.sleep(10000)
@theacodes
Contributor

Regenerating pubsub to use api_core should solve this, I think.

@kir-titievsky
Author

How would I do that?

@theacodes
Contributor

We're working on it. :)

@gcbirzan
Contributor

We're able to reproduce this consistently within 1-2 minutes of starting a snippet virtually identical to the one on https://cloud.google.com/pubsub/docs/pull#pubsub-pull-messages-async-python. There are no messages published on the topic. Our stacktrace is a bit different, though:

Exception in thread Consumer helper: consume bidirectional stream:
Traceback (most recent call last):
  File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/home/gcbirzan/.virtualenvs/pubsub/lib/python3.6/site-packages/google/cloud/pubsub_v1/subscriber/_consumer.py", line 248, in _blocking_consume
    self._policy.on_exception(exc)
  File "/home/gcbirzan/.virtualenvs/pubsub/lib/python3.6/site-packages/google/cloud/pubsub_v1/subscriber/policy/thread.py", line 140, in on_exception
    raise exception
  File "/home/gcbirzan/.virtualenvs/pubsub/lib/python3.6/site-packages/google/cloud/pubsub_v1/subscriber/_consumer.py", line 234, in _blocking_consume
    for response in response_generator:
  File "/home/gcbirzan/.virtualenvs/pubsub/lib/python3.6/site-packages/grpc/_channel.py", line 363, in __next__
    return self._next()
  File "/home/gcbirzan/.virtualenvs/pubsub/lib/python3.6/site-packages/grpc/_channel.py", line 357, in _next
    raise self
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.UNAVAILABLE, The service was unable to fulfill your request. Please try again. [code=8a75])>

@thomas-vl

Any update about this issue?

@anroopak

Any update about this issue?

@a13x6au3r

Any update about this issue?

@murataksoy

@kir-titievsky, the solution you mentioned seems to work at first, but the problem with this approach is that the CPU usage goes up every time the UNAVAILABLE exception is handled this way. Thus, a proper fix seems necessary.

@lukesneeringer lukesneeringer added the api: pubsub Issues related to the Pub/Sub API. label Oct 26, 2017
@lukesneeringer
Contributor

Hey folks,
I have an update on this issue. :-) I am working on it today.
As @murataksoy mentioned, it is not as simple as adding an if branch to return None on UNAVAILABLE, because threads build up. (#3965)

Hoping to have a fix today or tomorrow.

@sss721

sss721 commented Oct 27, 2017

Any update on this issue?

@ckmvishnu

I commented out the exception-raising code, but event delivery is not consistent. I currently have subscriptions for insert/delete/preempt VM operations, but I consistently miss preempt events.

My edit was to skip self._policy.on_exception(exc) in the _blocking_consume method.

@lukesneeringer
Contributor

Any update on this issue?

Sorry, I forgot to tag this issue. #4265 fixed this, and has been merged. A new version will be cut soon (within a day or two).

@lukesneeringer
Contributor

Wait, sorry, I got confused. I still need to do the api_core integration to properly fix this. Right now the exception is bubbling up properly (that is what #4265 did), but the client is not retrying.

Re-opening, and a fix will be in soon.

@VuongNM

VuongNM commented Nov 3, 2017

How is this going now Luke @lukesneeringer ?

@tseaver tseaver added the priority: p1 Important issue which blocks shipping the next release. Will be fixed prior to next release. label Nov 6, 2017
@shangyilim

I am also hitting this issue using the code samples.

@sachin-shetty

Do we have any workaround? My consumer gets stuck completely and does not even recover after calling open again.

@murataksoy

One workaround that seemed to work for me was to generate a publisher client that sends a dummy pubsub message once every, e.g., 10 seconds to the subscriber client just to prevent it from throwing the exception. Not sure if that would work for everyone though...
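A rough sketch of that keep-alive idea, with the publish call injected so the loop itself stays library-agnostic. The topic wiring shown in the docstring is an assumption about how one would bind it in real use:

```python
import time

def keepalive_loop(publish, interval_s=10.0, stop=lambda: False, sleep=time.sleep):
    """Publish a dummy message every `interval_s` seconds until `stop()` is True.

    In real use, `publish` would be something like
    `lambda data: publisher.publish(topic_path, data)` bound to a
    PublisherClient; that wiring is a placeholder here.
    """
    sent = 0
    while not stop():
        publish(b"keepalive")  # subscribers should recognize and ack these
        sent += 1
        sleep(interval_s)
    return sent
```

The callback on the subscriber side then needs to recognize and discard the dummy messages, which is part of why this workaround is fragile.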

@makrusak

@sachin-shetty @murataksoy I tried the solution with a dummy message repeated every 30 seconds, but got a crash after 2 days.
The workaround does function for a while, but over time it consumes too much CPU.
So I'm searching for a workaround too.

@murataksoy

@makrusak thanks for the feedback, I should watch out for this too, I guess. One workaround I considered (but never implemented) was to stop/start the client itself, again every ~10 seconds. From what I understand, a client does not have to be listening at the moment a message is published; it can receive the message later, as long as it starts listening before a certain timeout is reached after the message is sent. Could that be an alternative solution?

@makrusak

@murataksoy It could be a workaround, but all these approaches are too fragile. I can't imagine using them in a production environment and not losing sleep :)
Regarding "a client does not have to be listening at the time a pubsub message is sent": of course, that's one of the main purposes of Pub/Sub; otherwise it would be useless.

@elinesterov

I opened a ticket a few weeks ago asking whether I should upgrade to the latest version or wait until all these issues are resolved. Although I was advised to stay on the latest version, the number of issues and workarounds I have to put in my code drives me nuts. So in production I am still on a fairly old version, 0.24.0.

@murataksoy

@makrusak I totally agree, a fix is necessary. These are just some workarounds that some people might be willing to consider depending on their application until this bug is fixed.

@frankcarey

Hi, Any updates or ETAs here?

@dhermes
Contributor

dhermes commented Nov 27, 2017

@frankcarey You can follow along on #4444 for now. I am working actively to squash this and related bugs.

dhermes added a commit to dhermes/google-cloud-python that referenced this issue Nov 27, 2017
- Adding special handling for API core exceptions
- Retrying on both types of idempotent error
- Also doing a "drive-by" hygiene fix changing a global from
  `logger` to `_LOGGER`

Towards googleapis#4234.
dhermes added a commit that referenced this issue Nov 27, 2017
- Adding special handling for API core exceptions
- Retrying on both types of idempotent error

Towards #4234.
@dhermes
Contributor

dhermes commented Nov 27, 2017

Fixed by #4444. I ran a reproducible test case against the current master for 659 seconds and it did not fail with UNAVAILABLE (typically had failed within 90-200s):

...
00638401:DEBUG:google.cloud.pubsub_v1.subscriber.policy.base:Thread-9              :The current p99 value is 10 seconds.
00638401:DEBUG:google.cloud.pubsub_v1.subscriber.policy.base:Thread-9              :Renewing lease for 0 ack IDs.
00638401:DEBUG:google.cloud.pubsub_v1.subscriber.policy.base:Thread-9              :Snoozing lease management for 8.997289 seconds.
00647408:DEBUG:google.cloud.pubsub_v1.subscriber.policy.base:Thread-9              :The current p99 value is 10 seconds.
00647408:DEBUG:google.cloud.pubsub_v1.subscriber.policy.base:Thread-9              :Renewing lease for 0 ack IDs.
00647408:DEBUG:google.cloud.pubsub_v1.subscriber.policy.base:Thread-9              :Snoozing lease management for 4.038803 seconds.
00651451:DEBUG:google.cloud.pubsub_v1.subscriber.policy.base:Thread-9              :The current p99 value is 10 seconds.
00651452:DEBUG:google.cloud.pubsub_v1.subscriber.policy.base:Thread-9              :Renewing lease for 0 ack IDs.
00651452:DEBUG:google.cloud.pubsub_v1.subscriber.policy.base:Thread-9              :Snoozing lease management for 7.508992 seconds.
00658687:DEBUG:google.cloud.pubsub_v1.subscriber._consumer:Thread-41             :Sending initial request: subscription: "projects/precise-truck-742/subscriptions/s-djh-local-1511820764773"
stream_ack_deadline_seconds: 10

00658966:DEBUG:google.cloud.pubsub_v1.subscriber.policy.base:Thread-9              :The current p99 value is 10 seconds.
00658966:DEBUG:google.cloud.pubsub_v1.subscriber.policy.base:Thread-9              :Renewing lease for 0 ack IDs.
00658966:DEBUG:google.cloud.pubsub_v1.subscriber.policy.base:Thread-9              :Snoozing lease management for 5.914027 seconds.

@dhermes dhermes closed this as completed Nov 27, 2017
@dhermes
Contributor

dhermes commented Nov 27, 2017

FYI for all those following this issue, I have pushed a release (0.29.1) and it is being built right now (the build will end with a push to PyPI).

I don't think all issues have been resolved, but this at least will "gracefully" handle inactivity. (The UNAVAILABLE in the case of inactivity is almost certainly caused by the local gRPC client, not the backend.)

@makrusak

makrusak commented Nov 29, 2017

Why is this issue closed?
With the new version I still see a lot of dangling threads and 100% consumption of one core on my machines. This behavior appears after some running time.
I don't really want to have to restart my workers periodically to reduce useless CPU consumption.

@dhermes
Contributor

dhermes commented Nov 29, 2017

@makrusak This issue was resolved because the implementation now correctly handles UNAVAILABLE errors.

There are still a few very important issues to be tackled, though #3965 seems closest to what you're describing and isn't currently marked as "p1" (but it probably should be).

A few questions for you:

  • How much time is "some running time"?
  • When you say "With new version" you mean 0.29.1, yes?
  • Would you mind sharing / describing some code that I can use to reproduce? (I've been using a gist for tracking this, and also #4463, "PubSub: set_exception can only be called once. can still occur 'erroneously'".)
  • How are you diagnosing the "dangling threads"? (@jonparrott and I suspect there is some thread leakage, and I have added names to almost all of the threads we create to make this a bit easier to track down)

@makrusak

@dhermes Yes, I installed and deployed 0.29.1 a few minutes after your comment.
It has been working without failures or UNAVAILABLE exceptions since then.
However, its CPU consumption is rising slowly and has now reached 93% of a single core, with no load and no messages in the topic.
Diagnosis: ps aux -T | grep "python3 subscriber.py" | wc -l
It returns 177 now. (I'm running it on Container Engine in isolation, so 177 is the real number of threads of a single subscriber process.)
My code is super basic, like the sample from the guide. If you want, I can copy/paste the guide example, deploy it the same way, wait 2 days, and send you a report.
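Besides ps, the same diagnosis can be done from inside the process with the standard library. A rough sketch that groups live threads by name prefix (useful given the thread names mentioned above):

```python
import threading
from collections import Counter

def thread_report():
    """Count live threads grouped by name prefix, to spot which kind leaks.

    E.g. a steadily growing count for a "Thread" or "ConsumerHelper" prefix
    after each UNAVAILABLE recovery points at leaked consumer threads.
    """
    return Counter(t.name.split("-")[0] for t in threading.enumerate())
```

Logging thread_report() periodically alongside message throughput shows whether a particular thread kind accumulates over time.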

@dhermes
Contributor

dhermes commented Nov 29, 2017

Which guide are you referring to? (I'm not on the team that writes the guides and I wasn't part of the original implementation of Pub / Sub, so sorry for my ignorance.)

@makrusak

makrusak commented Nov 29, 2017

@dhermes My logs look like this: https://pastebin.com/Dn9Lnynb (and so on). As you can see, the initial request is re-sent roughly every 1.5 minutes.
The Python sample is from here, for example: https://cloud.google.com/pubsub/docs/pull.
Instead of the sample's sleep loop, I am just waiting on the future like this:

while True:
    subscription = subscriber.subscribe(subscription_path, callback=run_combine_process)
    subscription.future.result()

@dhermes
Contributor

dhermes commented Nov 30, 2017

@makrusak I just cut another release https://pypi.org/project/google-cloud-pubsub/0.29.2/. This will at least stop some of the threads that weren't previously being stopped. However, it won't close the bidirectional consumer(s) on failure. You can do that by calling Policy.close() after the future returned from Policy.open() is done().
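That open/result/close advice can be sketched as a supervision loop. Here `make_policy` is a hypothetical factory standing in for `subscriber.subscribe(...)`, and the open()/close() surface is as described above; this is a sketch, not the library's own retry mechanism:

```python
def consume_with_restart(make_policy, callback, max_restarts=5):
    """Wait on the streaming-pull future; on failure, close() the policy
    before re-opening so its consumer threads are not leaked."""
    restarts = 0
    while True:
        policy = make_policy()
        future = policy.open(callback)
        try:
            return future.result()  # blocks until the stream dies
        except Exception:
            policy.close()  # stop the bidirectional consumer threads
            restarts += 1
            if restarts > max_restarts:
                raise
```

Capping the restarts keeps a persistent backend failure from looping forever; production code would likely also add backoff between attempts.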

@makrusak

@dhermes Thanks! I'll validate it today and hope that all critical bugs will be fixed soon.

@dhermes
Contributor

dhermes commented Nov 30, 2017

@makrusak I've definitely traced down at least one place where threads are leaked, but I can't get the CPU usage to spike much above 3% after running for 300 seconds (which ends up recovering from UNAVAILABLE 3 times, leaving 3 zombie threads).

@itamar-resonai

I've updated from 0.28.4 to 0.29.2, and now I get the following error exactly 90 seconds after the client starts listening:

E1203 07:05:39.795156557      17 chttp2_transport.c:1005]    Received a GOAWAY with error code ENHANCE_YOUR_CALM and debug data equal to "too_many_pings"
Exception in thread Thread-ConsumerHelper-ConsumeBidirectionalStream:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 754, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/pubsub_v1/subscriber/_consumer.py", line 246, in _blocking_consume
    recover = self._policy.on_exception(exc)
  File "/usr/src/app/cloudapi/pubsub.py", line 28, in on_exception
    code_value = getattr(exc, 'code', lambda: None)()
TypeError: 'int' object is not callable

@dhermes
Contributor

dhermes commented Dec 4, 2017

@itamar-resonai That line is from 0.29.0, not from 0.29.2.

Maybe there is an issue with your install?

@itamar-resonai

Sorry, my bad.

It was indeed 0.29.2, but I hadn't disabled my IgnoreRpcFailureHack policy workaround, so that's what happened...

@brianbaquiran

brianbaquiran commented May 29, 2018

It seems the error still persists with pubsub 0.35.0. My pip freeze:

google-api-core==1.2.0
googleapis-common-protos==1.5.3
google-auth==1.4.1
google-cloud-core==0.28.1
google-cloud-error-reporting==0.30.0
google-cloud-logging==1.6.0
google-cloud-pubsub==0.35.0
google-cloud-storage==1.10.0
google-resumable-media==0.3.1
grpc-google-iam-v1==0.11.4
grpcio==1.12.0

I am seeing the traceback:

Traceback (most recent call last):
  File "/usr/local/lib/python3.5/site-packages/google/api_core/grpc_helpers.py", line 76, in next
    return six.next(self._wrapped)
  File "/usr/local/lib/python3.5/site-packages/grpc/_channel.py", line 341, in __next__
    return self._next()
  File "/usr/local/lib/python3.5/site-packages/grpc/_channel.py", line 335, in _next
    raise self
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.UNAVAILABLE, The service was unable to fulfill your request. Please try again. [code=8a75])>

In case it matters, I'm running my code on GKE.

@tseaver tseaver added the type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. label May 29, 2018
@tseaver tseaver assigned theacodes and unassigned lukesneeringer May 29, 2018
@brianbaquiran

Now, testing with 0.35.1, I am seeing the same error. Cut and pasted from Stackdriver logs:

RPC termination has signaled streaming pull manager shutdown.
Stopping consumer.
Thread-ConsumeBidirectionalStream caught unexpected exception cannot join current thread and will exit.
Traceback (most recent call last):
  File "/usr/local/lib/python3.5/site-packages/google/api_core/grpc_helpers.py", line 76, in next
    return six.next(self._wrapped)
  File "/usr/local/lib/python3.5/site-packages/grpc/_channel.py", line 341, in __next__
    return self._next()
  File "/usr/local/lib/python3.5/site-packages/grpc/_channel.py", line 335, in _next
    raise self
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.UNAVAILABLE, The service was unable to fulfill your request. Please try again. [code=8a75])>

The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/usr/local/lib/python3.5/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/bidi.py", line 369, in _recoverable
    return method(*args, **kwargs)
  File "/usr/local/lib/python3.5/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/bidi.py", line 258, in recv
    return next(self.call)
  File "/usr/local/lib/python3.5/site-packages/google/api_core/grpc_helpers.py", line 78, in next
    six.raise_from(exceptions.from_grpc_error(exc), exc)
  File "<string>", line 3, in raise_from
google.api_core.exceptions.ServiceUnavailable: 503 The service was unable to fulfill your request. Please try again. [code=8a75]

During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/usr/local/lib/python3.5/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/bidi.py", line 456, in _thread_main
    response = self._bidi_rpc.recv()
  File "/usr/local/lib/python3.5/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/bidi.py", line 382, in recv
    super(ResumableBidiRpc, self).recv)
  File "/usr/local/lib/python3.5/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/bidi.py", line 373, in _recoverable
    self._finalize(exc)
  File "/usr/local/lib/python3.5/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/bidi.py", line 323, in _finalize
    callback(result)
  File "/usr/local/lib/python3.5/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py", line 424, in _on_rpc_done
    self.close(reason=future)
  File "/usr/local/lib/python3.5/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py", line 321, in close
    self._consumer.stop()
  File "/usr/local/lib/python3.5/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/bidi.py", line 493, in stop
    self._thread.join()
  File "/usr/local/lib/python3.5/threading.py", line 1051, in join
    raise RuntimeError("cannot join current thread")
RuntimeError: cannot join current thread

Thread-ConsumeBidirectionalStream exiting
Cleanly exiting request generator.
Thread-LeaseMaintainer exiting.

My relevant pip freeze:

google-api-core==1.2.0
google-auth==1.4.2
google-cloud-core==0.28.1
google-cloud-error-reporting==0.30.0
google-cloud-logging==1.6.0
google-cloud-pubsub==0.35.1
google-cloud-storage==1.10.0
google-resumable-media==0.3.1
googleapis-common-protos==1.5.3
grpc-google-iam-v1==0.11.4
grpcio==1.12.0

@theacodes
Contributor

Thanks, @brianbaquiran, let's track this over at #4234.

@jakeczyz

jakeczyz commented Apr 6, 2019

@theacodes , the issue you linked to in the comment above is this self-same issue. :) Is there another issue you meant to link that discusses catching internal-thread pubsub exceptions? I haven't been able to find a satisfactory answer. Thanks.

https://stackoverflow.com/q/55552606/379037
