Subscriber shutdown should optionally wait for the currently executing callbacks to complete #150
Description
Environment details
- OS type and version: osx
- Python version: 3.8
- google-cloud-pubsub version: 1.6.1
Steps to reproduce
When shutting down the subscriber, in-flight messages cannot be acked because the consumer stops first, and the dispatcher checks consumer.is_active before sending any requests. The shutdown sequence correctly waits for the in-flight callbacks, but their ack() requests are never sent.
- Start subscriber
- Shutdown subscriber by calling cancel() on the returned future
- In-flight messages are processed and the shutdown waits for their callbacks to return
- Calling ack() on those messages has no effect: the request is not sent to Pub/Sub because the consumer is already shutting down
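The sequence above can be modeled with a small, self-contained sketch. This is not the library's code: `ToyConsumer`, `ToyDispatcher`, and `sent_to_server` are hypothetical stand-ins that only mimic the behavior described in this report, namely that the dispatcher checks whether the consumer is active before sending a request.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class ToyConsumer:
    """Hypothetical stand-in for the streaming-pull consumer."""

    def __init__(self):
        self.is_active = True

    def stop(self):
        self.is_active = False


class ToyDispatcher:
    """Hypothetical stand-in: forwards acks only while the consumer is active."""

    def __init__(self, consumer):
        self._consumer = consumer
        self.sent_to_server = []

    def ack(self, message_id):
        if not self._consumer.is_active:
            return  # the request is silently dropped
        self.sent_to_server.append(message_id)


def simulate_shutdown():
    consumer = ToyConsumer()
    dispatcher = ToyDispatcher(consumer)
    started = threading.Event()
    release = threading.Event()

    def on_message(message_id):
        started.set()
        release.wait()  # stands in for slow processing
        dispatcher.ack(message_id)

    scheduler = ThreadPoolExecutor(max_workers=1)
    scheduler.submit(on_message, "msg-1")
    started.wait()                 # the callback is now in flight
    consumer.stop()                # the consumer is stopped first
    release.set()                  # the callback finishes its work
    scheduler.shutdown(wait=True)  # the shutdown waits for the callback
    return dispatcher.sent_to_server


print(simulate_shutdown())
```

In this toy model the callback runs to completion and the shutdown waits for it, yet the list of requests that reached the "server" stays empty, which matches the behavior reported here.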
Code example
import time

from google.cloud import pubsub_v1

project_id = "test"
subscription_id = "sub1"


def on_message(message):
    # Simulate slow processing so the callback is still running at shutdown.
    time.sleep(5)
    message.ack()


with pubsub_v1.SubscriberClient() as subscriber:
    subscription_path = subscriber.subscription_path(project_id, subscription_id)
    streaming_pull_future = subscriber.subscribe(subscription_path, callback=on_message)
    try:
        # Block until the stream fails or the process is interrupted.
        streaming_pull_future.result()
    except KeyboardInterrupt:
        streaming_pull_future.cancel()

Just after receiving a message, hit Ctrl-C. The process shuts down, waits for the 5-second sleep() in the on_message callback, and then exits.
The message.ack() call has no effect because it is never sent to the Pub/Sub server.
Is this expected? If so, there is no way to gracefully shut down the subscriber without losing the acks from currently executing callbacks.
I tried changing the shutdown sequence in streaming_pull_manager.py:510 to stop the scheduler first and the consumer second. This seems to let the callbacks finish and their ack() requests go through, but I fear it is only a partial workaround, because new messages may still come in.
Ideally, the consumer would stop receiving new messages, nack() all held messages, let the currently executing callbacks finish, process their ack/nack/drop requests, and only then continue shutting down.
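The ordering proposed above can be sketched with a toy model. Everything here is hypothetical (`ToySubscriber`, `held`, `sent`, `graceful_shutdown`) and is not the library's API or its actual shutdown code. It only illustrates the order of steps: stop receiving, nack held messages, wait for running callbacks, and stop dispatching last.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class ToySubscriber:
    """Hypothetical model of the proposed shutdown order; not library code."""

    def __init__(self):
        self.receiving = True    # the consumer accepts new messages
        self.dispatching = True  # the dispatcher forwards ack/nack requests
        self.held = []           # messages received but not yet scheduled
        self.sent = []           # (request, message_id) pairs "sent" to the server
        self.scheduler = ThreadPoolExecutor(max_workers=1)

    def send(self, request, message_id):
        if self.dispatching:
            self.sent.append((request, message_id))

    def graceful_shutdown(self):
        self.receiving = False              # 1. stop receiving new messages
        for message_id in self.held:        # 2. nack everything still held
            self.send("nack", message_id)
        self.held.clear()
        self.scheduler.shutdown(wait=True)  # 3. let running callbacks finish
        self.dispatching = False            # 4. only now stop sending requests


def demo():
    sub = ToySubscriber()
    sub.held.append("msg-2")  # received, but its callback never started
    started = threading.Event()

    def on_message(message_id):
        started.set()
        time.sleep(0.1)  # stands in for slow processing
        sub.send("ack", message_id)

    sub.scheduler.submit(on_message, "msg-1")
    started.wait()  # the callback for msg-1 is now in flight
    sub.graceful_shutdown()
    return sorted(sub.sent)


print(demo())
```

In this model the in-flight callback's ack and the held message's nack both reach the "server" because dispatching is turned off last. Whether the real manager can be reordered this way without side effects is a question for the maintainers.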