Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make sure inactive request generator is stopped before spawning new one in Pub / Sub consumer. #4503

Merged
merged 9 commits into from Dec 1, 2017

Conversation

dhermes
Copy link
Contributor

@dhermes dhermes commented Nov 30, 2017

This makes the request_queue used by Consumer._request_generator_thread() local to the caller. This way on recovery a new queue can be used for the new thread.

@dhermes dhermes added the api: pubsub Issues related to the Pub/Sub API. label Nov 30, 2017
@googlebot googlebot added the cla: yes This human has signed the Contributor License Agreement. label Nov 30, 2017
dhermes added a commit to dhermes/google-cloud-pubsub-performance that referenced this pull request Nov 30, 2017
Copy link
Contributor

@lukesneeringer lukesneeringer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One item that I believe to be wrong, and one question.

@@ -244,7 +252,19 @@ def _blocking_consume(self):
break
except Exception as exc:
recover = self._policy.on_exception(exc)
if not recover:
if recover:
with threading.Lock():

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

@@ -60,12 +60,13 @@ def __init__(self):
def __contains__(self, needle):
return needle in self._helper_threads

def start(self, name, queue, target):
def start(self, name, queue_put, target):

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

with threading.Lock():
# Must lock so that ``self.send_request()`` cannot add
# more items to the queue while it is being replaced.
previous_queue = self._request_queue

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

@dhermes
Copy link
Contributor Author

dhermes commented Dec 1, 2017

@jonparrott @lukesneeringer PTAL

try:
request_generator.close()
except ValueError as exc:
if exc.args != ('generator already executing',):

This comment was marked as spam.

This comment was marked as spam.

"""
with self._put_lock:
try:
request_generator.close()

This comment was marked as spam.

@@ -60,12 +60,13 @@ def __init__(self):
def __contains__(self, needle):
return needle in self._helper_threads

def start(self, name, queue, target):
def start(self, name, queue_put, target):

This comment was marked as spam.

@dhermes dhermes changed the title Possible solution for zombie threads in Pub / Sub consumer. Make sure inactive request generator is stopped before spawning new one in Pub / Sub consumer. Dec 1, 2017
# queue **was** and remains empty.
self._request_queue.put(_helper_threads.STOP)
# Wait for the request generator to ``.get()`` the ``STOP``.
while not self._request_queue.empty():

This comment was marked as spam.

except ValueError as exc:
if exc.args != ('generator already executing',):
raise
if not self._request_queue.empty():

This comment was marked as spam.

@dhermes dhermes force-pushed the pubsub-zombie-1 branch 2 times, most recently from 640524c to bc44c1e Compare December 1, 2017 20:06
'Waiting for active request generator to receive STOP')
while not self._request_queue.empty():
pass
request_generator.close()

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

- Made the "already closed" test use an actual generator instead
  of a mock.
- Removed data race from `test_stop_request_generator_running()`
- Added tracking of the values received from the queue in
  test generator
- Dropping the "extra" check
- Making it return `True/False` rather than throwing an exception,
  this way the recovery can just be negated instead of a panic
- Added logging statements to indicate if it was stopped and when
  the helper is actually waiting on `Queue.empty()`
@dhermes
Copy link
Contributor Author

dhermes commented Dec 1, 2017

@lukesneeringer @jonparrott PTAL. Phew I think it's finally merge-able.

'request queue is not empty.')
return False
# If we **cannot** close the request generator,
# then there is no blocking get on the queue. Since

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

yield value


def test_stop_request_generator_not_running():

This comment was marked as spam.

Also updating `test_stop_request_generator_queue_non_empty()` to
use a real generator rather than a mock.
@dhermes
Copy link
Contributor Author

dhermes commented Dec 1, 2017

I added explanations for each unit test @jonparrott. Also updated one of them to use a real generator instead of a mock. Will merge once CI is green.

This code is no fun.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api: pubsub Issues related to the Pub/Sub API. cla: yes This human has signed the Contributor License Agreement.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants