Skip to content

Pub/Sub: future.result() doesn't raise exceptions from callback #5812

@gruzewski

Description

@gruzewski

Hi. I am trying to setup up a simple client that subscribes to a PubSub and consumes messages. I have a problem with handling exceptions raised in callback method. I would like to be able to handle them in the main thread. The reason why I would like to do it is those exceptions are impossible to recover from and the app should crash and not consume messages. In the subscribe method's docstring of Pub/Sub Client I have found that:

This method starts the receiver in the background and returns a
        *Future* representing its execution. Waiting on the future (calling
        ``result()``) will block forever or until a non-recoverable error
        is encountered (such as loss of network connectivity). Cancelling the
        future will signal the process to shutdown gracefully and exit.

And example code snippet:

from google.cloud.pubsub_v1 import subscriber

            subscriber_client = pubsub.SubscriberClient()

            # existing subscription
            subscription = subscriber_client.subscription_path(
                'my-project-id', 'my-subscription')

            def callback(message):
                print(message)
                message.ack()

            future = subscriber.subscribe(
                subscription, callback)

            try:
                future.result()
            except KeyboardInterrupt:
                future.cancel()

Unfortunately I cannot get my code to capture any exceptions and cancel future. I am pretty sure I am doing something wrong.

OS type and version: macOs High Sierra (10.13.6)
Python version: Python 3.6.5
Google Cloud Lib versions:

google-api-core (1.3.0)
google-auth (1.5.1)
google-cloud-pubsub (0.37.1)
googleapis-common-protos (1.5.3)
grpc-google-iam-v1 (0.11.4)
grpcio (1.14.1)
protobuf (3.6.1)

Stacktrace:

INFO:__main__:Message {
  data: b'{"id":"02121f45-4608-4b4d-858e-31004d6ea12e","proj...'
  attributes: {
    "buildId": "02121f45-4608-4b4d-858e-31004d6ea12e",
    "status": "WORKING"
  }
}
ERROR:google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager:Top-level exception occurred in callback while processing a message
Traceback (most recent call last):
  File "/Users/gruzewski/.pyenv/versions/3.6.5/lib/python3.6/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py", line 63, in _wrap_callback_errors
    callback(message)
  File "/Users/gruzewski/Library/Preferences/PyCharm2018.1/scratches/scratch.py", line 12, in callback
    raise RuntimeError('Test')
RuntimeError: Test
INFO:__main__:Message {
  data: b'{"id":"02121f45-4608-4b4d-858e-31004d6ea12e","proj...'
  attributes: {
    "buildId": "02121f45-4608-4b4d-858e-31004d6ea12e",
    "status": "SUCCESS"
  }
}

Steps to reproduce:

1. Run the example code.
2. Turn off wifi.

Code example:

import logging

from google.cloud import pubsub_v1


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def callback(message):
    logger.info(message)
    raise RuntimeError('Test')


def main():
    subscriber = pubsub_v1.SubscriberClient()

    future = subscriber.subscribe('subscription/path', callback=callback)

    try:
        future.result()
        logger.error('I should not be here.')
    except RuntimeError as e:
        logger.exception('Exception during listing for messages. Shutting down. Exception: {}'.format(e))
        future.cancel()
        exit(1)

if __name__ == "__main__":
    main()

Metadata

Metadata

Assignees

Labels

No labels
No labels

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions