
Pub / Sub Publisher: Async publish breaks around 1000 messages #4575

Closed
anorth2 opened this issue Dec 12, 2017 · 9 comments
Labels:
- api: pubsub (Issues related to the Pub/Sub API)
- priority: p1 (Important issue which blocks shipping the next release; will be fixed prior to next release)
- type: bug (Error or flaw in code with unintended results or allowing sub-optimal usage patterns)

Comments

anorth2 commented Dec 12, 2017

I'm unsure whether this is a bug (I can't find documentation explaining this behavior).
I can send up to 999 Pub/Sub messages and wait for their results before continuing, but if I try to send 1000, none of them complete. However, if I run in debug mode they do complete (a breakpoint in the code allows more execution time).

The failure occurs both in the main thread and when calling Publisher.publish from a thread.

  1. OS type and version
     MacOS High Sierra

  2. Python version and virtual environment information (`python --version`)
     Python 3.6.3

  3. google-cloud-python version (`pip show google-cloud`, `pip show google-<service>`, or `pip freeze`):

```
google-api-core==0.1.2
google-auth==1.2.1
google-cloud-core==0.28.0
google-cloud-pubsub==0.29.2
google-gax==0.15.16
googleapis-common-protos==1.5.3
grpc-google-iam-v1==0.11.4
```

  4. Stacktrace if available
     None; the call hangs with no exception raised.

  5. Steps to reproduce
     Publish 1000 messages, storing their futures. None of the futures will complete.

  6. Code example
     The goal here is to send a specified number of messages per second and then sleep.
```python
# Assumes: import math, time; from concurrent import futures;
# from threading import current_thread

    def publish_function(self):
        # Save all futures so we can wait for them to complete.
        # Callbacks receive the future as their only argument, as defined in
        # the Future interface.
        def callback(future):
            message_id = future.result()
            self.logger.debug(f"Sent message: {message_id}")

        # The callback is added once you get the future. If you add a callback
        # and the future is already done, it is simply executed immediately.
        future = self.publisher.publish(self.topic_path, data=self.message)
        print(future)
        future.add_done_callback(callback)
        self.results.append(future)

    def publish(self, total_messages, messages_per_second):
        i = 0
        sleep_time = 1
        if messages_per_second < 1:
            sleep_time = float(sleep_time) / float(messages_per_second)
        logger.debug(f"Sleep time is: {sleep_time}")
        while i < total_messages and not self.exit:
            start = time.time()
            # Number of sends per second; if less than one, the sleep timer
            # handles the wait and we just send one message here.
            for j in range(int(math.ceil(messages_per_second))):
                self.publish_function()
                i += 1
                self.next_id()
                # Exit the inner loop if we hit the total to send.
                if i >= total_messages or self.exit:
                    self.stop()
                    break

            # Wait on every future in self.results; Future.result blocks until
            # the future it's called on completes. list() forces the map to
            # execute, since map returns an iterator.
            list(map(futures.Future.result, self.results))  # <--- *** HANGS HERE ***

            end = time.time()
            logger.info(
                f"Took {end - start} to send {messages_per_second} messages "
                f"in thread {current_thread().getName()}")
            logger.debug(f"If greater than 0 seconds, sleeping for {sleep_time - (end - start)}")
            if sleep_time - (end - start) > 0:
                time.sleep(sleep_time - (end - start))
```
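The publish-then-wait pattern above can be exercised without Pub/Sub using plain `concurrent.futures`; a minimal sketch with a local executor standing in for the publisher (no network involved, so the futures always complete; `fake_publish` is a hypothetical stand-in, not the client API):

```python
import concurrent.futures

def fake_publish(executor, message):
    # Stand-in for publisher.publish: returns a Future that resolves
    # to a message id once the "publish" work finishes.
    return executor.submit(lambda: "id-{}".format(message))

results = []
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    for n in range(1000):
        future = fake_publish(executor, n)
        # Done-callbacks receive the future as their only argument.
        future.add_done_callback(lambda f: f.result())
        results.append(future)
    # Blocks until every future completes -- the same thing the
    # list(map(futures.Future.result, self.results)) call is meant to do.
    message_ids = list(map(concurrent.futures.Future.result, results))

print(len(message_ids))  # 1000
```

With a healthy publisher, sending 1000 messages should behave just like this local version: every stored future eventually resolves to a message id.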
@dhermes dhermes added the api: pubsub Issues related to the Pub/Sub API. label Dec 12, 2017

dhermes commented Dec 12, 2017

@anorth2 I'm fairly certain it's a bug. I ran into it yesterday when reproducing a subscriber issue that required a large backlog of published messages. I got as high as 1375 messages through before failure, but not consistently.

Thanks for filing.

@dhermes dhermes added priority: p1 Important issue which blocks shipping the next release. Will be fixed prior to next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. labels Dec 12, 2017
@dhermes dhermes self-assigned this Dec 12, 2017

anorth2 commented Dec 12, 2017

@dhermes Also wanted to ask: are there expected side effects from calling publisher.publish from a thread?

If I use `list(map(Future.result, running_futures))` (where `Future` is from `concurrent.futures._base`), and `running_futures` is a list of futures for threads that each execute `publisher.publish`, I get an error from the publisher where `len(message_ids) != len(futures)`.
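One pattern that keeps the future count equal to the message count by construction is to submit each publish call to an executor and wait on the executor's futures; a sketch with a hypothetical thread-safe `publish` stand-in (not the Pub/Sub client):

```python
import concurrent.futures
import threading

lock = threading.Lock()
published = []

def publish(data):
    # Stand-in for publisher.publish: records the message and returns its id.
    # The lock makes the append and id assignment atomic across threads.
    with lock:
        published.append(data)
        return "id-{}".format(len(published))

with concurrent.futures.ThreadPoolExecutor(max_workers=8) as pool:
    # Each submitted call returns a Future whose result is a message id,
    # so there is exactly one id per future.
    running_futures = [pool.submit(publish, n) for n in range(100)]
    message_ids = [f.result() for f in running_futures]

assert len(message_ids) == len(running_futures)
```

This only illustrates the bookkeeping; whether the real Pub/Sub client's internal batching is safe under this usage is exactly the open question above.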


dhermes commented Dec 12, 2017

@anorth2 To provide a quick answer: I don't know. Since I've started the "hardening" process on the highly concurrent Pub / Sub implementation (i.e. 0.28.x and beyond) I have only worked on the subscriber. But I hope to know more later this week.

@dhermes dhermes changed the title PubSub Publisher Client: Question (possible bug?) with Publisher Client Pub / Sub Publisher: Async publish breaks around 1000 messages Dec 15, 2017

dhermes commented Dec 16, 2017

I have a reproducible sample that I am debugging here: https://github.com/dhermes/google-cloud-pubsub-performance/tree/master/publish-many

dhermes added a commit to dhermes/google-cloud-python that referenced this issue Dec 18, 2017
dhermes added a commit to dhermes/google-cloud-python that referenced this issue Dec 18, 2017
Unit tests will be coming in the next commit.

Fixes googleapis#4575.
dhermes added a commit to dhermes/google-cloud-python that referenced this issue Dec 19, 2017
Unit tests will be coming in the next commit.

Fixes googleapis#4575.

kir-titievsky commented Dec 19, 2017 via email


dhermes commented Dec 19, 2017

Soon: see #4619


dhermes commented Dec 19, 2017


kir-titievsky commented Dec 19, 2017

This is still not working for me, but with a new error:

```python
import time
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient(batch_settings)
topic_path = publisher.topic_path('XXXX', 'XXX')

for n in range(1000):
    data = u'Message number {}'.format(n)

    def dt(n, t):
        def g(message_id):
            print "%g\t%g\t%g" % (n, time.clock() - t, t)
        return g

    data = data.encode('utf-8')
    publisher.publish(topic_path, data=data).add_done_callback(dt(n, time.clock()))
```
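For reference, the per-message closure pattern used here (binding `n` and a start time into the done-callback) can be written in Python 3, where `time.clock()` is removed and `time.monotonic()` is the usual replacement; `make_callback` is an illustrative name, not part of the client:

```python
import time

def make_callback(n, start):
    # Returns a per-message done-callback. It receives one argument when
    # the publish completes and logs the elapsed time for message n.
    def callback(result):
        line = "{}\t{:.6f}".format(n, time.monotonic() - start)
        print(line)
        return line
    return callback

# Usage with any Future-returning publish call:
#   future = publisher.publish(topic_path, data=data)
#   future.add_done_callback(make_callback(n, time.monotonic()))
```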
```
$ pip show google-cloud-pubsub
Name: google-cloud-pubsub
Version: 0.30.0
Summary: Python Client for Google Cloud Pub/Sub
Home-page: https://github.com/GoogleCloudPlatform/google-cloud-python
Author: Google Cloud Platform
Author-email: googleapis-packages@google.com
License: Apache 2.0
Location: /Users/kir/cloud/env/lib/python2.7/site-packages
Requires: psutil, google-auth, google-api-core, grpc-google-iam-v1
```

Errors (many of these):

```
ERROR:root:AuthMetadataPluginCallback "<google.auth.transport.grpc.AuthMetadataPlugin object at 0x108c85410>" raised exception!
Traceback (most recent call last):
  File "/Users/kir/cloud/env/lib/python2.7/site-packages/grpc/_plugin_wrapping.py", line 74, in __call__
  File "/Users/kir/cloud/env/lib/python2.7/site-packages/google/auth/transport/grpc.py", line 77, in __call__
  File "/Users/kir/cloud/env/lib/python2.7/site-packages/google/auth/transport/grpc.py", line 65, in _get_authorization_headers
  File "/Users/kir/cloud/env/lib/python2.7/site-packages/google/auth/credentials.py", line 121, in before_request
  File "/Users/kir/cloud/env/lib/python2.7/site-packages/google/oauth2/credentials.py", line 117, in refresh
  File "/Users/kir/cloud/env/lib/python2.7/site-packages/google/oauth2/_client.py", line 191, in refresh_grant
  File "/Users/kir/cloud/env/lib/python2.7/site-packages/google/oauth2/_client.py", line 105, in _token_endpoint_request
  File "/Users/kir/cloud/env/lib/python2.7/site-packages/google/auth/transport/requests.py", line 122, in __call__
  File "/Users/kir/cloud/env/lib/python2.7/site-packages/six.py", line 737, in raise_from
TransportError: HTTPSConnectionPool(host='accounts.google.com', port=443): Max retries exceeded with url: /o/oauth2/token (Caused by NewConnectionError('<urllib3.connection.VerifiedHTTPSConnection object at 0x10958cd90>: Failed to establish a new connection: [Errno 8] nodename nor servname provided, or not known',))
```


dhermes commented Dec 19, 2017

Discussed with @kir-titievsky on Hangouts. Seems to be an intermittent HTTP issue (the error indicates that https://accounts.google.com was unreachable).
