Can not reopen Pub/Sub subscription with a different callback #4832

@bigunyak

I'm trying to do a simple thing: open a Pub/Sub subscription with one callback just to drain all the messages currently stored in the subscription, then reopen the same subscription with a different callback to do the proper processing. Unfortunately, that doesn't seem to work.

The first time, I open the subscription like this:

    drained_num = [0]

    def discard_callback(message):
        drained_num[0] += 1
        message.ack()

    future = subscription.open(discard_callback)

    try:
        future.result(timeout=2)
    except TimeoutError:
        pass
    except Exception as ex:
        logger.error("Caught unexpected exception!", exc_info=True)
    finally:
        logger.info("Discarded %d messages", drained_num[0])
        subscription.close()

After that, I try to reopen the subscription with a different callback:

    def callback(message):
        pass

    future = subscription.open(callback)

    try:
        future.result(timeout=3)
    except TimeoutError:
        logger.info("Time is up!")
    except Exception as ex:
        logger.error("Caught unexpected exception!", exc_info=True)
    finally:
        logger.info("Closing connection")
        subscription.close()

And this fails with the following error:

MainThread:139861846668480 [ERROR] Caught unexpected exception!
Traceback (most recent call last):
  File "scripts/publish-receive.py", line 85, in main
    future.result(timeout=args.timeout)
  File "PATH/venv/lib/python2.7/site-packages/google/cloud/pubsub_v1/futures.py", line 103, in result
    raise err
RuntimeError: cannot schedule new futures after shutdown
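
The wording of that error matches the message Python's `concurrent.futures.ThreadPoolExecutor` raises when work is submitted after `shutdown()`. The following is a minimal standard-library sketch of that behaviour only. It rests on an assumption that is not confirmed here: that `subscription.close()` shuts down an executor owned by the subscription object, which a later `open()` then tries to reuse.

```python
from concurrent.futures import ThreadPoolExecutor

# Stand-in for the executor that the subscription is ASSUMED to own.
executor = ThreadPoolExecutor(max_workers=1)

# First use works: a task can be scheduled and completes normally.
executor.submit(lambda: None).result()

# Roughly what close() is assumed to do under the hood.
executor.shutdown()

# Any further scheduling on the same executor is rejected.
try:
    executor.submit(lambda: None)
    error_message = None
except RuntimeError as exc:
    error_message = str(exc)

print(error_message)
```

If that assumption holds, the failure comes from reusing an object whose executor has already been shut down, not from the second callback itself.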

I also tried not calling subscription.close() after reading from the subscription the first time, but then I get a different error:

Traceback (most recent call last):
  File "scripts/publish-receive.py", line 98, in <module>
    main()
  File "scripts/publish-receive.py", line 81, in main
    future = subscription.open(callback)
  File "PATH/venv/lib/python2.7/site-packages/google/cloud/pubsub_v1/subscriber/policy/thread.py", line 261, in open
    raise ValueError('This policy has already been opened.')
ValueError: This policy has already been opened.

So my question is: is it even possible to reopen a subscription with a different callback?
I'm using google-cloud-pubsub-0.30.1.
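
One possible way to sidestep reopening altogether is to open the subscription once with a single callable that forwards to whichever handler is currently active. The sketch below is library-agnostic and only illustrates the idea: `SwitchableCallback`, `FakeMessage` and `process_callback` are hypothetical names introduced here, not part of google-cloud-pubsub, and the messages are simulated rather than received from Pub/Sub.

```python
import threading


class SwitchableCallback(object):
    """Hypothetical helper: forwards each message to a swappable handler."""

    def __init__(self, handler):
        self._lock = threading.Lock()
        self._handler = handler

    def set_handler(self, handler):
        # Swap the active handler; safe to call from another thread.
        with self._lock:
            self._handler = handler

    def __call__(self, message):
        # Read the current handler under the lock, then call it outside.
        with self._lock:
            handler = self._handler
        handler(message)


class FakeMessage(object):
    """Stand-in for a Pub/Sub message; only ack() is modelled."""

    def __init__(self, data):
        self.data = data
        self.acked = False

    def ack(self):
        self.acked = True


drained = []
processed = []


def discard_callback(message):
    # Drain phase: count and acknowledge, nothing else.
    drained.append(message.data)
    message.ack()


def process_callback(message):
    # Processing phase: placeholder for the real work.
    processed.append(message.data)
    message.ack()


dispatcher = SwitchableCallback(discard_callback)
# With the real client, the idea would be a single subscription.open(dispatcher).
dispatcher(FakeMessage("old-1"))
dispatcher(FakeMessage("old-2"))
dispatcher.set_handler(process_callback)  # switch without reopening
dispatcher(FakeMessage("new-1"))
```

Whether this fits depends on how the drain phase is supposed to end. Here the switch is triggered explicitly by `set_handler`, whereas the original snippets rely on a timeout.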

Labels

api: pubsub (Issues related to the Pub/Sub API.)
type: question (Request for information or clarification. Not an issue.)
