Pub/Sub has no way to track errors from the subscriber thread. #3888

Closed
theacodes opened this issue Aug 28, 2017 · 4 comments · Fixed by #4265
Labels
api: pubsub Issues related to the Pub/Sub API.

@theacodes
Contributor

Presently, if you subscribe and an error occurs in the consumer's thread, there's no way to respond to the error on the main thread.

It just logs the exception and hangs:

Exception in thread Consumer helper: consume bidirectional stream:
Traceback (most recent call last):
  File "/Users/jonwayne/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/Users/jonwayne/.pyenv/versions/3.6.0/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/jonwayne/workspace/python-docs-samples/pubsub/cloud-client/env/lib/python3.6/site-packages/google/cloud/pubsub_v1/subscriber/_consumer.py", line 248, in _blocking_consume
    self._policy.on_exception(exc)
  File "/Users/jonwayne/workspace/python-docs-samples/pubsub/cloud-client/env/lib/python3.6/site-packages/google/cloud/pubsub_v1/subscriber/policy/thread.py", line 135, in on_exception
    raise exception
  File "/Users/jonwayne/workspace/python-docs-samples/pubsub/cloud-client/env/lib/python3.6/site-packages/google/cloud/pubsub_v1/subscriber/_consumer.py", line 234, in _blocking_consume
    for response in response_generator:
  File "/Users/jonwayne/workspace/python-docs-samples/pubsub/cloud-client/env/lib/python3.6/site-packages/grpc/_channel.py", line 363, in __next__
    return self._next()
  File "/Users/jonwayne/workspace/python-docs-samples/pubsub/cloud-client/env/lib/python3.6/site-packages/grpc/_channel.py", line 357, in _next
    raise self
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.NOT_FOUND, Resource not found (resource=my-subs).)>

It seems that consumer.open should return a future. This future could be used both to block the main thread while messages are consumed and to pass exceptions through to it.

So the current sample:

def receive_messages(project, subscription_name):
    """Receives messages from a pull subscription."""
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project, subscription_name)

    def callback(message):
        print('Received message: {}'.format(message))
        message.ack()

    subscriber.subscribe(subscription_path, callback=callback)

    # The subscriber is non-blocking, so we must keep the main thread from
    # exiting to allow it to process messages in the background.
    print('Listening for messages on {}'.format(subscription_path))
    while True:
        time.sleep(60)

Could instead be:

def receive_messages(project, subscription_name):
    """Receives messages from a pull subscription."""
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project, subscription_name)

    def callback(message):
        print('Received message: {}'.format(message))
        message.ack()

    future = subscriber.subscribe(subscription_path, callback=callback)

    print('Listening for messages on {}'.format(subscription_path))
    # The subscriber is non-blocking but returns a future that allows us
    # to block the main thread and allow messages to process in the
    # background. This effectively blocks forever unless an error
    # occurs in the subscriber thread in which case it'll be
    # re-raised here.
    future.result()
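
To make the proposal concrete, here is a minimal standard-library sketch of the mechanism, not the Pub/Sub client's implementation. `start_consumer` and `failing_consume` are hypothetical names standing in for the consumer's blocking loop. The background thread sets the exception on a `concurrent.futures.Future`, and `future.result()` re-raises it on the main thread.

```python
import threading
from concurrent.futures import Future


def start_consumer(consume):
    """Run ``consume`` in a background thread and return a Future.

    ``consume`` is a hypothetical stand-in for the blocking consume loop.
    """
    future = Future()

    def run():
        try:
            consume()
        except Exception as exc:
            # Forward the failure to whoever is waiting on the future.
            future.set_exception(exc)
        else:
            future.set_result(None)

    thread = threading.Thread(target=run, name="consumer", daemon=True)
    thread.start()
    return future


def failing_consume():
    # Simulates the NOT_FOUND failure from the traceback above.
    raise RuntimeError("Resource not found (resource=my-subs)")


future = start_consumer(failing_consume)
caught = None
try:
    # Blocks the main thread until the consumer finishes or fails.
    future.result(timeout=5)
except RuntimeError as exc:
    caught = exc
    print("Subscriber failed: {}".format(exc))
```

With this shape the main thread no longer needs a sleep loop, and the caller decides whether to log, retry, or exit.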
@lukesneeringer
Contributor

I think I agree with this but it is worth asking: Is there any situation where there are multiple exceptions? Futures can only have single resolutions.

I do think it is fine to have a future that never has a result (or only gets one if you explicitly close), but I am operating under the assumption that when the exception occurs, we will cause that thread to exit and set the exception on the future.
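
One way to handle the "multiple exceptions" question is to let the first error resolve the future and drop the rest. The sketch below assumes that policy; `FirstErrorFuture` is a hypothetical helper, not part of any library.

```python
import threading
from concurrent.futures import Future


class FirstErrorFuture(object):
    """Hypothetical wrapper: only the first reported error resolves the future."""

    def __init__(self):
        self.future = Future()
        self._lock = threading.Lock()

    def report(self, exc):
        """Return True if ``exc`` resolved the future, False if it was dropped."""
        with self._lock:
            if self.future.done():
                return False
            self.future.set_exception(exc)
            return True


holder = FirstErrorFuture()
first = holder.report(ValueError("first failure"))
second = holder.report(KeyError("second failure"))
print(first, second)
print(repr(holder.future.exception()))
```

The lock makes the check-then-set step atomic, so two threads failing at the same moment cannot both try to resolve the future.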

@theacodes
Contributor Author

theacodes commented Aug 29, 2017 via email

@duilio

duilio commented Sep 29, 2017

A workaround would be to override the default Policy used by the Client. For instance:

from google.cloud.pubsub_v1.subscriber.policy.thread import Policy


class Custom(Policy):
    def on_exception(self, exception):
        try:
            return super(Custom, self).on_exception(exception)
        except Exception:
            # handle consumer exceptions here, for instance reject a promise or
            # set threading.Event to communicate the failure to the main thread.
            raise
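
As a rough illustration of the `threading.Event` variant mentioned in the comment above, the sketch below uses a local `BasePolicy` class as a stand-in for the library's `Policy` so that it runs on its own. `BasePolicy`, `SignallingPolicy`, and the simulated consumer are all assumptions made for the example.

```python
import threading


class BasePolicy(object):
    """Stand-in for the library's Policy; here on_exception just re-raises."""

    def on_exception(self, exception):
        raise exception


class SignallingPolicy(BasePolicy):
    """Records the failure and sets an Event before re-raising."""

    def __init__(self):
        self.failed = threading.Event()
        self.error = None

    def on_exception(self, exception):
        try:
            return super(SignallingPolicy, self).on_exception(exception)
        except Exception as exc:
            # Store the error first, then signal, so a waiter always sees it.
            self.error = exc
            self.failed.set()
            raise


policy = SignallingPolicy()


def consumer():
    # Simulated consumer thread: the re-raised error ends the thread,
    # while the Event carries the news to the main thread.
    try:
        policy.on_exception(RuntimeError("stream closed"))
    except RuntimeError:
        pass


worker = threading.Thread(target=consumer)
worker.start()
signalled = policy.failed.wait(timeout=5)
worker.join()
print(signalled, policy.error)
```

The main thread can replace its `time.sleep` loop with `policy.failed.wait()` and react as soon as the consumer fails.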

@cgthayer

cgthayer commented Oct 3, 2017

Having a future to check would probably be a whole lot cleaner. At least it would be clear that control was being passed around, whereas overriding on_exception() is vague and requires some understanding of the internals.

We've had to do something similar because we see this error, which never seems to recover (running in Google Container Engine):

grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.UNAVAILABLE, OS Error)>

I'm inclined to believe that a connection broke or shut down, causing a socket to trigger an "OS Error".
This is not the only error we see, but since they show up sporadically, it's hard to know which ones wedge us and which are properly handled. For example, DEADLINE_EXCEEDED is "normal" and processing goes on.

To work around the wedging, our code does something like this. We set a variable to indicate to the outer loop that the client might be bad.

import logging

import grpc
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.subscriber.policy import thread

logger = logging.getLogger(__name__)


class OurPolicy(thread.Policy):
    """
    We occasionally see errors that google code doesn't recover from, so we
    set a flag that lets the outer thread respond by restarting the client.

    grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.UNAVAILABLE, OS Error)>

    """
    _exception_caught = None

    def __init__(self, *args, **kws):
        logger.info('Initializing our PubSub Policy Wrapper')
        super(OurPolicy, self).__init__(*args, **kws)

    def on_exception(self, exc):
        # If this is DEADLINE_EXCEEDED, then we want to retry by returning None instead of raise-ing
        deadline_exceeded = grpc.StatusCode.DEADLINE_EXCEEDED
        code_value = getattr(exc, 'code', lambda: None)()
        logger.error(f'Caught Exception in PubSub Policy Wrapper: code={code_value} exc={exc}')
        if code_value == deadline_exceeded:
            return
        OurPolicy._exception_caught = exc
        # will just raise exc
        return super(OurPolicy, self).on_exception(exc)


def get_publisher_client():
    return pubsub_v1.PublisherClient(
        batch_settings=types.BatchSettings(max_latency=0.001, max_messages=1))


def get_subscriber_client():
    return pubsub_v1.SubscriberClient(policy_class=OurPolicy)

then our listener can tell when we've hit an exception that would wedge our server, and instead it restarts the client:

class Worker(object):
    # [..lots deleted..]

    def listen(self):
        logger.info('Listening, press Ctrl+C to exit.')
        subscriber = get_subscriber_client()
        subscription_path = subscriber.subscription_path(NOTIFY_PROJECT, self.subscription_name)
        while True:
            if not self.sub_client:
                logger.warning('Restarting pubsub subscriber client')
                self.sub_client = get_subscriber_client()
            self.sub_client.subscribe(subscription_path, callback=self.message_callback)
            try:
                # The subscriber is non-blocking, so we must keep the main thread from
                # exiting to allow it to process messages in the background.
                logger.info(f'Listening for messages on topic {self.topic_name} @ {subscription_path}')
                while True:
                    time.sleep(1)
                    if OurPolicy._exception_caught:
                        exc = OurPolicy._exception_caught
                        OurPolicy._exception_caught = None
                        raise exc
            except KeyboardInterrupt:
                logger.info('Stopped listening for tasks.')
                break
            except Exception as e:
                logger.exception(e)
                self.sub_client = None
                from raven.contrib.django.raven_compat.models import client
                client.captureException()

That said, I'm not sure whether we should add some mutexes here to be more thread-safe. This may also have other issues, since it's unclear whether the old client ever shuts down cleanly. There's nothing in the API docs about it, and Python can be lazy about this kind of resource cleanup, especially since worker threads may linger with a reference to the old client object.

https://googlecloudplatform.github.io/google-cloud-python/latest/pubsub/publisher/api/client.html
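
On the thread-safety question, one option that avoids hand-written mutexes is to pass errors through a `queue.Queue`, which does its own locking. This is only a sketch of that idea; `report_error` and `wait_for_error` are hypothetical helpers, with `report_error` playing the role of the code that would run inside `on_exception`.

```python
import queue
import threading

# queue.Queue is thread-safe, so no explicit mutex is needed.
errors = queue.Queue()


def report_error(exc):
    """Hypothetical hook, called from the consumer thread on failure."""
    errors.put(exc)


def wait_for_error(timeout):
    """Block the calling thread until an error arrives; None if none did."""
    try:
        return errors.get(timeout=timeout)
    except queue.Empty:
        return None


# Simulate a consumer thread that hits the UNAVAILABLE "OS Error" case.
worker = threading.Thread(target=report_error, args=(OSError("OS Error"),))
worker.start()
worker.join()

received = wait_for_error(timeout=1)
nothing = wait_for_error(timeout=0.1)
print(repr(received), nothing)
```

Compared with a shared class attribute, the queue cannot lose an error when two arrive close together, and the outer loop can block on `get` instead of polling every second.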
