Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core: Mitigate busy reopen loop in ResumableBidiRpc consuming 100% CPU #8193

Merged
merged 4 commits into from Jun 18, 2019

Conversation

plamut
Copy link
Contributor

@plamut plamut commented May 30, 2019

Closes #7910.

This PR fixes the issue with ResumableBidiRpc that can enter a busy re-open loop consuming lots of CPU in the process . The comment on the issue explains this in more detail.

How to test

Steps to reproduce:

  • (optional) Configure more verbose logging in the subscriber script:
import logging 
log_format = (
    "%(levelname)-8s [%(asctime)s] %(threadName)-33s "
    "[%(name)s] [%(filename)s:%(lineno)d][%(funcName)s] %(message)s"
)
logging.basicConfig(
    level=logging.DEBUG,
    format=log_format,
)
  • Disable your internet connection
  • Start the streaming pull using a susbscriber client instance:
subscriber.subscribe(SUBSCRIPTION_PATH, callback=my_callback)

while True:
    try:
        time.sleep(60)
    except KeyboardInterrupt:
        break

Actual result (before the fix):
The ResumableBidiRpc class tries to re-establish the stream many times in rapid succession, resulting in a 100% CPU spike and a ton of log output:

...
DEBUG    [2019-05-15 10:06:01,440] Thread-ConsumeBidirectionalStream [_recoverable] Call to retryable <bound method ResumableBidiRpc._recv of <google.api_core.bidi.ResumableBidiRpc object at 0x7fb5eb91d630>> caused 503 DNS resolution failed.
INFO     [2019-05-15 10:06:01,440] Thread-ConsumeBidirectionalStream [_should_recover] Observed recoverable stream error 503 DNS resolution failed
DEBUG    [2019-05-15 10:06:01,440] Thread-ConsumeBidirectionalStream [_recoverable] Re-opening stream from retryable <bound method ResumableBidiRpc._recv of <google.api_core.bidi.ResumableBidiRpc object at 0x7fb5eb91d630>>.
INFO     [2019-05-15 10:06:01,441] Thread-ConsumeBidirectionalStream [_reopen] Re-established stream
INFO     [2019-05-15 10:06:01,441] Thread-1                          [_should_recover] Observed recoverable stream error 503 DNS resolution failed
DEBUG    [2019-05-15 10:06:01,441] Thread-1                          [_on_call_done] Re-opening stream from gRPC callback.
DEBUG    [2019-05-15 10:06:01,441] Thread-1                          [_reopen] Stream was already re-established.
INFO     [2019-05-15 10:06:01,441] Thread-1                          [_should_recover] Observed recoverable stream error 503 DNS resolution failed
DEBUG    [2019-05-15 10:06:01,441] Thread-1                          [_on_call_done] Re-opening stream from gRPC callback.
INFO     [2019-05-15 10:06:01,441] Thread-1                          [_reopen] Re-established stream
DEBUG    [2019-05-15 10:06:01,441] Thread-ConsumeBidirectionalStream [_recoverable] Call to retryable <bound method ResumableBidiRpc._recv of <google.api_core.bidi.ResumableBidiRpc object at 0x7fb5eb91d630>> caused 503 DNS resolution failed.
INFO     [2019-05-15 10:06:01,442] Thread-ConsumeBidirectionalStream [_should_recover] Observed recoverable stream error 503 DNS resolution failed
DEBUG    [2019-05-15 10:06:01,442] Thread-ConsumeBidirectionalStream [_recoverable] Re-opening stream from retryable <bound method ResumableBidiRpc._recv of <google.api_core.bidi.ResumableBidiRpc object at 0x7fb5eb91d630>>.
INFO     [2019-05-15 10:06:01,442] Thread-ConsumeBidirectionalStream [_reopen] Re-established stream
INFO     [2019-05-15 10:06:01,442] Thread-1                          [_should_recover] Observed recoverable stream error 503 DNS resolution failed
DEBUG    [2019-05-15 10:06:01,442] Thread-1                          [_on_call_done] Re-opening stream from gRPC callback.
DEBUG    [2019-05-15 10:06:01,442] Thread-1                          [_reopen] Stream was already re-established.
INFO     [2019-05-15 10:06:01,442] Thread-1                          [_should_recover] Observed recoverable stream error 503 DNS resolution failed
DEBUG    [2019-05-15 10:06:01,442] Thread-1                          [_on_call_done] Re-opening stream from gRPC callback.
INFO     [2019-05-15 10:06:01,443] Thread-1                          [_reopen] Re-established stream
...

This also happens if the streaming pull is running normally, and the internet connection is shut down in the middle of it - eventually the busy re-open loop starts. What's worse, this happens more than a single thread - both the gRPC channel thread (?) and the consumer helper thread try to re-establish the stream.

Expected result (after the fix):
The re-open calls are throttled, and the CPU consumption is "normal".

Things to do/discuss:

  • Fix test coverage (one branch appears to be missing).
  • What should the default throttle params be set to?
  • ResumalbeBidiRpc is also used in the Firestore client, does this PR (negatively) affect it in any way?
    ANSWER: No, because this PR does not change the former's default behavior.
  • Tests rely on timing, and even with a sizeable safety buffer, this is not ideal. Would there be any objections to adding freezegun as an API core dependency?

@plamut plamut added api: pubsub Issues related to the Pub/Sub API. api: core labels May 30, 2019
@plamut plamut requested a review from crwilcox as a code owner May 30, 2019 12:23
@googlebot googlebot added the cla: yes This human has signed the Contributor License Agreement. label May 30, 2019
@plamut plamut changed the title Core: ISS-7910 Core: Mitigate busy reopen loop in ResumableBidiRpc consuming 100% CPU May 30, 2019
@sduskis sduskis requested review from busunkim96 and removed request for crwilcox May 30, 2019 12:39
@plamut plamut force-pushed the iss-7910 branch 2 times, most recently from 2839d0b to d201ab1 Compare May 30, 2019 14:10
@busunkim96 busunkim96 requested a review from crwilcox May 31, 2019 20:33
@busunkim96
Copy link
Contributor

@crwilcox Could you comment on the impact to Firestore?

@sduskis sduskis requested review from crwilcox and removed request for crwilcox June 4, 2019 13:45
@yoshi-automation yoshi-automation added the 🚨 This issue needs some love. label Jun 6, 2019
@plamut
Copy link
Contributor Author

plamut commented Jun 11, 2019

FWIW, I modified the ResumalbeBidiRpc class to not use throttling by default, meaning that Firestore will not be affected by this PR.

@sduskis
Copy link
Contributor

sduskis commented Jun 17, 2019

@crwilcox, what's the good word on this PR?

@crwilcox
Copy link
Contributor

@sduskis I had left comments in the python chat for Peter last week. Let me copy them here.

self._entry_lock = threading.Lock()

def __enter__(self):
with self._entry_lock:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a lock, but deque should be threadsafe. Was this done to keep things more straightforward? There is reference to the first element and then removing it in a check, which would have to be changed if we moved to a non-locking impl I think?

Copy link
Contributor Author

@plamut plamut Jun 18, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The initial idea was that the lock would serve as a gatekeeper, and if any thread needs to sleep while holding it, other threads trying to enter would also be forced to wait. With a lock, achieving that correctly would be straightforward. The lock was placed there before implementing the rest of the logic - one could say to avoid accidentally overlooking an edge case withing the logic, yes.

(deque operations themselves are thread-safe, but the rest of the logic surrounding it might not be, thus I went with a conservative approach)

Here's one risky scenario, assume _past_entries == [1, 2, 3, 4] and access_limit == 3:

T   1    2    3    4    5
----|----|----|----|----|--
      |_____________|
                    ↑A
        |____________|
                     ↑B
  • Thread A enters the manager at T=4+ε, computes its cutoff_time, and determines that entry 1 is now irrelevant.
  • Just before left-popping an item, thread B enters at T=4+2ε, determines the same, and removes 1 from the queue. It then figures out that there are already three entries in the window, and goes to sleep.
  • Thread A is resumed, and pops the leftmost item, but that's actually entry 2 now! With only 3 and 4 left in the queue, thread proceeds without waiting, because it only saw two items in its window.

Since the queue length and the entries in it are essentially a shared state, having a lock around that avoids tricky scenarios. Besides, the lock is released almost immediately after new entries are again allowed, as the lock-holding thread is put to sleep for just the right amount of time.

Do you concur, or have I overlooked something?

(BTW, double checking concurrency logic is always appreciated, so thanks!)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lock is by far the easier approach here. There would be some complicated bits to make this lock free. I just was just curious if this was considered :)

api_core/google/api_core/bidi.py Outdated Show resolved Hide resolved
The commit renames the entry_cap parameter to access_limit, and
changes the type of the time_window argument from float to timedelta.
@lidizheng
Copy link

I might be wrong about the usage of this feature, but I want to suggest an alternative algorithm.

Since _pastEntries is private, I guess users don't need to keep the timestamps. In this case, there is a well-known rate limiting algorithm, token bucket, might apply well to this situation.

  1. It doesn't need to keep the timestamp, so there is no need for complex data structure;
  2. It's fast, the time/space complexity are both O(1).

I found an Python implementation at rate_limit.py.

self._entry_lock = threading.Lock()

def __enter__(self):
with self._entry_lock:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lock is by far the easier approach here. There would be some complicated bits to make this lock free. I just was just curious if this was considered :)

@sduskis sduskis merged commit 6b0c5ad into googleapis:master Jun 18, 2019
@TheKevJames
Copy link

Updating to google-cloud-pubsub==0.42.0 seems to be throwing a TypeError which looks to be related to this PR:

TypeError: __init__() got an unexpected keyword argument 'throttle_reopen'
  File "main.py", line 48, in main
    flow_control=get_flow_control())
  File "/usr/local/lib/python3.6/site-packages/google/cloud/pubsub_v1/subscriber/client.py", line 212, in subscribe
    manager.open(callback=callback, on_callback_error=future.set_exception)
  File "/usr/local/lib/python3.6/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py", line 386, in open
    throttle_reopen=True,

Not overly familiar with this code base, but looks like there might need to be a version bump to api-core as well, since pubsub 0.42.0 now relies on the new code added in this PR? For reference, installing google-cloud-pubsub==0.42.0 seems to have picked up the following relevant library versions:

google-api-core | 1.11.1
google-auth | 1.6.3
google-cloud-pubsub | 0.42.0
googleapis-common-protos | 1.6.0
grpc-google-iam-v1 | 0.11.4
grpcio | 1.21.1

@busunkim96
Copy link
Contributor

@TheKevJames We've made that bump and cut another release, please use 0.42.1. Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api: core api: pubsub Issues related to the Pub/Sub API. cla: yes This human has signed the Contributor License Agreement. 🚨 This issue needs some love.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Pubsub: Pull Subscriber unable to re-connect after a while
8 participants