This repository was archived by the owner on Mar 9, 2026. It is now read-only.

Multiple subscribers with flow control #237

@osmanbaskaya


Hello,

I'd like to use two subscribers to process messages from one specific topic. These two subscribers run on different Compute Engine instances. I also want each subscriber to hold only one message at a time. In other words, if a subscriber already has a message to work on (one it hasn't acked yet), it shouldn't pull another one.
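For comparison, the "at most one unacked message per subscriber" behavior described here can be expressed directly with synchronous pull: request a single message, process it, ack it, and only then pull again. This is a minimal sketch, not a fix for the streaming subscriber. It assumes the request-dict form of `SubscriberClient.pull` and `SubscriberClient.acknowledge` from google-cloud-pubsub 2.x; `pull_one_at_a_time` and its `max_iterations` parameter are hypothetical names introduced here. The client is passed in so the loop can also be tried with a fake object.

```python
from typing import Callable, Optional


def pull_one_at_a_time(
    subscriber,
    subscription_path: str,
    handle: Callable[[bytes], None],
    max_iterations: Optional[int] = None,
) -> int:
    """Hypothetical helper: process messages strictly one at a time via synchronous pull.

    Assumption: `subscriber` behaves like google.cloud.pubsub_v1.SubscriberClient
    (only `pull` and `acknowledge` are used). Returns the number of messages handled.
    """
    handled = 0
    iterations = 0
    while max_iterations is None or iterations < max_iterations:
        iterations += 1
        # Ask for at most one message; this worker pulls nothing else until it acks.
        response = subscriber.pull(
            request={"subscription": subscription_path, "max_messages": 1}
        )
        for received in response.received_messages:
            handle(received.message.data)
            # Ack only after the work is done, then loop around to pull the next one.
            subscriber.acknowledge(
                request={"subscription": subscription_path, "ack_ids": [received.ack_id]}
            )
            handled += 1
    return handled
```

With a real client this would be called as `pull_one_at_a_time(pubsub_v1.SubscriberClient(), subscription_path, handle)`. One caveat: unlike `subscribe()`, this loop does not extend the ack deadline, so a job that runs longer than the subscription's ack deadline may be redelivered (possibly to the other instance); `modify_ack_deadline` would be needed in that case.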

When I set max_messages in the flow-control settings to 1, only one subscriber actively works, even though multiple subscribers are running on different instances. When I set max_messages=2, the same subscriber occasionally takes 2 messages and starves the other one. I'm fairly sure what I described above is achievable and that I'm missing something. Could you help me figure this out?
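One way to picture the two symptoms above: a per-client limit such as `max_messages` caps how many messages the callback is working on, but it may not by itself stop a client from having extra messages delivered and held locally. The toy model below is hypothetical (`ToySubscriber`, `deliver` and `batch_size` are invented for illustration and are not google-cloud-pubsub code). It assumes that messages can arrive in batches, that a client keeps whatever it cannot dispatch yet, and that the "server" prefers the first subscriber that still has a free slot.

```python
from collections import deque
from typing import Deque, Dict, List


class ToySubscriber:
    """Hypothetical stand-in for one subscriber process (not the real client)."""

    def __init__(self, name: str, max_messages: int) -> None:
        self.name = name
        self.max_messages = max_messages    # client-side flow-control limit
        self.in_flight: List[str] = []      # messages handed to the callback
        self.on_hold: Deque[str] = deque()  # delivered, waiting for a free slot

    def receive(self, batch: List[str]) -> None:
        # Everything delivered is kept; only dispatch to the callback is limited.
        self.on_hold.extend(batch)
        self._dispatch()

    def ack_one(self) -> None:
        # Finishing a message frees a slot for the next held one.
        if self.in_flight:
            self.in_flight.pop(0)
        self._dispatch()

    def _dispatch(self) -> None:
        while self.on_hold and len(self.in_flight) < self.max_messages:
            self.in_flight.append(self.on_hold.popleft())

    def holding(self) -> int:
        return len(self.in_flight) + len(self.on_hold)


def deliver(backlog: List[str], subscribers: List[ToySubscriber], batch_size: int) -> Dict[str, int]:
    """Toy 'server': send each batch to the first subscriber with a free callback slot."""
    queue = deque(backlog)
    while queue:
        target = next((s for s in subscribers if len(s.in_flight) < s.max_messages), None)
        if target is None:
            break  # every subscriber is busy; the rest stays on the server
        batch = [queue.popleft() for _ in range(min(batch_size, len(queue)))]
        target.receive(batch)
    return {s.name: s.holding() for s in subscribers}
```

In the model, two messages are split evenly only with `batch_size=1` and `max_messages=1`, i.e. when a subscriber can never hold more than the one message it is working on. The real service's delivery policy is not modeled here.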

Env details and code snippets are below:

Environment details

  1. API: Pub/Sub
  2. OS: macOS Catalina 10.15.7
  3. Python version: 3.8.0
  4. google-cloud-pubsub version: 2.1.0
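The same environment details can be collected from Python itself. This is a small sketch using only the standard library (`importlib.metadata` exists since Python 3.8); `environment_report` is a hypothetical helper name, not part of any library.

```python
import platform
from importlib import metadata  # standard library since Python 3.8


def environment_report(package: str = "google-cloud-pubsub") -> dict:
    """Hypothetical helper: OS, Python version and an installed package's version."""
    try:
        package_version = metadata.version(package)
    except metadata.PackageNotFoundError:
        package_version = "not installed"
    return {
        "os": platform.platform(),  # e.g. includes the macOS version on a Mac
        "python": platform.python_version(),
        package: package_version,
    }


if __name__ == "__main__":
    for key, value in environment_report().items():
        print(f"{key}: {value}")
```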

Code example

# concurrent.futures.TimeoutError is what StreamingPullFuture.result(timeout=...) raises;
# before Python 3.11 it is a different class from the builtin TimeoutError.
from concurrent.futures import TimeoutError
import logging
import time

from google.cloud import pubsub_v1

logger = logging.getLogger(__name__)


class Consumer:
    def __init__(self, project, subscription_id=None, consumer_fn=None):
        if subscription_id is None:
            raise ValueError("Subscription ID cannot be None.")

        self.project = project
        self.subscriber_client = pubsub_v1.SubscriberClient()

        self.subscription_id = subscription_id

        self.subscription_path = self.subscriber_client.subscription_path(
            self.project, self.subscription_id
        )
        # Fall back to the default (static) consumer_fn when none is passed in.
        self.consumer_fn = consumer_fn or self.consumer_fn

    @staticmethod
    def consumer_fn(points):
        # do something here (placeholder: sum the range, then simulate a slow job)
        total = sum(points)
        time.sleep(10)
        return total

    def process_message(self, message):
        # Payload is two space-separated integers: "start end".
        data = message.data.decode("utf-8")
        start, end = (int(point) for point in data.split())
        self.consumer_fn(range(start, end))
        # Ack only after the work is done.
        message.ack()

    def consume(self, max_messages_in_flow=1):
        # Client-side limit on received-but-unprocessed messages before the stream pauses.
        flow_control = pubsub_v1.types.FlowControl(max_messages=max_messages_in_flow)
        while True:
            streaming_pull_future = self.subscriber_client.subscribe(
                self.subscription_path,
                callback=self.process_message,
                flow_control=flow_control,
            )
            try:
                logger.info(f"Listening for messages on {self.subscription_path}")
                streaming_pull_future.result()
            except TimeoutError:
                streaming_pull_future.cancel()
            except KeyboardInterrupt:
                streaming_pull_future.cancel()
                self.subscriber_client.close()
                logger.info("Consumer will stop gracefully.")
                break
            except Exception as e:
                # The stream has shut down; log it and re-subscribe on the next iteration.
                logger.error(f"Got error while processing the message: {e}")
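Because the callback only touches `message.data` and `message.ack()`, the payload handling in `process_message` can be exercised without Pub/Sub by passing a stand-in object. This is a sketch: `FakeMessage` and `handle_payload` are hypothetical names, and the parsing assumes the same "start end" payload format as the snippet above.

```python
from typing import Callable, Iterable


class FakeMessage:
    """Hypothetical stand-in exposing only what the callback uses: .data and .ack()."""

    def __init__(self, data: bytes) -> None:
        self.data = data
        self.acked = False

    def ack(self) -> None:
        self.acked = True


def handle_payload(message, consumer_fn: Callable[[Iterable[int]], object]) -> object:
    """Same steps as process_message: decode "start end", run the work, then ack."""
    start, end = (int(point) for point in message.data.decode("utf-8").split())
    result = consumer_fn(range(start, end))
    message.ack()  # ack only after the work has finished
    return result


if __name__ == "__main__":
    msg = FakeMessage(b"1 5")
    print(handle_payload(msg, sum), msg.acked)
```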

Thanks!

Labels

api: pubsub (issues related to the googleapis/python-pubsub API)
type: question (request for information or clarification; not an issue)
