diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/heartbeater.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/heartbeater.py new file mode 100644 index 000000000000..38d2ae8dc505 --- /dev/null +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/heartbeater.py @@ -0,0 +1,70 @@ +# Copyright 2018, Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import + +import logging +import threading + + +_LOGGER = logging.getLogger(__name__) +_HEARTBEAT_WORKER_NAME = 'Thread-Heartbeater' +# How often to send heartbeats in seconds. Determined as half the period of +# time where the Pub/Sub server will close the stream as inactive, which is +# 60 seconds. +_DEFAULT_PERIOD = 30 + + +class Heartbeater(object): + def __init__(self, manager, period=_DEFAULT_PERIOD): + self._thread = None + self._operational_lock = threading.Lock() + self._manager = manager + self._stop_event = threading.Event() + self._period = period + + def heartbeat(self): + """Periodically send heartbeats.""" + while self._manager.is_active and not self._stop_event.is_set(): + self._manager.heartbeat() + _LOGGER.debug('Sent heartbeat.') + self._stop_event.wait(timeout=self._period) + + _LOGGER.info('%s exiting.', _HEARTBEAT_WORKER_NAME) + + def start(self): + with self._operational_lock: + if self._thread is not None: + raise ValueError('Heartbeater is already running.') + + # Create and start the helper thread. + self._stop_event.clear() + thread = threading.Thread( + name=_HEARTBEAT_WORKER_NAME, + target=self.heartbeat) + thread.daemon = True + thread.start() + _LOGGER.debug('Started helper thread %s', thread.name) + self._thread = thread + + def stop(self): + with self._operational_lock: + self._stop_event.set() + + if self._thread is not None: + # The thread should automatically exit when the consumer is + # inactive. + self._thread.join() + + self._thread = None diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index aa0876798ad9..e9ffb794f82a 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -26,6 +26,7 @@ from google.cloud.pubsub_v1 import types from google.cloud.pubsub_v1.subscriber._protocol import bidi from google.cloud.pubsub_v1.subscriber._protocol import dispatcher +from google.cloud.pubsub_v1.subscriber._protocol import heartbeater from google.cloud.pubsub_v1.subscriber._protocol import histogram from google.cloud.pubsub_v1.subscriber._protocol import leaser from google.cloud.pubsub_v1.subscriber._protocol import requests @@ -114,6 +115,7 @@ def __init__(self, client, subscription, flow_control=types.FlowControl(), self._dispatcher = None self._leaser = None self._consumer = None + self._heartbeater = None @property def is_active(self): @@ -262,6 +264,15 @@ def send(self, request): else: self._rpc.send(request) + def heartbeat(self): + """Sends an empty request over the streaming pull RPC. + + This always sends over the stream, regardless of if + ``self._UNARY_REQUESTS`` is set or not. + """ + if self._rpc is not None and self._rpc.is_active: + self._rpc.send(types.StreamingPullRequest()) + def open(self, callback): """Begin consuming messages. @@ -291,6 +302,7 @@ def open(self, callback): self._consumer = bidi.BackgroundConsumer( self._rpc, self._on_response) self._leaser = leaser.Leaser(self) + self._heartbeater = heartbeater.Heartbeater(self) # Start the thread to pass the requests. self._dispatcher.start() @@ -301,6 +313,9 @@ def open(self, callback): # Start the lease maintainer thread. self._leaser.start() + # Start the stream heartbeater thread. + self._heartbeater.start() + def close(self, reason=None): """Stop consuming messages and shutdown all helper threads. @@ -331,6 +346,9 @@ def close(self, reason=None): _LOGGER.debug('Stopping dispatcher.') self._dispatcher.stop() self._dispatcher = None + _LOGGER.debug('Stopping heartbeater.') + self._heartbeater.stop() + self._heartbeater = None self._rpc = None self._closed = True diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_heartbeater.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_heartbeater.py new file mode 100644 index 000000000000..f9147a4d7e39 --- /dev/null +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_heartbeater.py @@ -0,0 +1,119 @@ +# Copyright 2018, Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import threading + +from google.cloud.pubsub_v1.subscriber._protocol import heartbeater +from google.cloud.pubsub_v1.subscriber._protocol import streaming_pull_manager + +import mock +import pytest + + +def test_heartbeat_inactive(caplog): + caplog.set_level(logging.INFO) + manager = mock.create_autospec( + streaming_pull_manager.StreamingPullManager, instance=True) + manager.is_active = False + + heartbeater_ = heartbeater.Heartbeater(manager) + + heartbeater_.heartbeat() + + assert 'exiting' in caplog.text + + +def test_heartbeat_stopped(caplog): + caplog.set_level(logging.INFO) + manager = mock.create_autospec( + streaming_pull_manager.StreamingPullManager, instance=True) + + heartbeater_ = heartbeater.Heartbeater(manager) + heartbeater_.stop() + + heartbeater_.heartbeat() + + assert 'exiting' in caplog.text + + +def make_sleep_mark_manager_as_inactive(heartbeater): + # Make sleep mark the manager as inactive so that heartbeat() + # exits at the end of the first run. + def trigger_inactive(timeout): + assert timeout + heartbeater._manager.is_active = False + + heartbeater._stop_event.wait = trigger_inactive + + +def test_heartbeat_once(): + manager = mock.create_autospec( + streaming_pull_manager.StreamingPullManager, instance=True) + heartbeater_ = heartbeater.Heartbeater(manager) + make_sleep_mark_manager_as_inactive(heartbeater_) + + heartbeater_.heartbeat() + + manager.heartbeat.assert_called_once() + + +@mock.patch('threading.Thread', autospec=True) +def test_start(thread): + manager = mock.create_autospec( + streaming_pull_manager.StreamingPullManager, instance=True) + heartbeater_ = heartbeater.Heartbeater(manager) + + heartbeater_.start() + + thread.assert_called_once_with( + name=heartbeater._HEARTBEAT_WORKER_NAME, + target=heartbeater_.heartbeat) + + thread.return_value.start.assert_called_once() + + assert heartbeater_._thread is not None + + +@mock.patch('threading.Thread', autospec=True) +def test_start_already_started(thread): + manager = mock.create_autospec( + streaming_pull_manager.StreamingPullManager, instance=True) + heartbeater_ = heartbeater.Heartbeater(manager) + heartbeater_._thread = mock.sentinel.thread + + with pytest.raises(ValueError): + heartbeater_.start() + + thread.assert_not_called() + + +def test_stop(): + manager = mock.create_autospec( + streaming_pull_manager.StreamingPullManager, instance=True) + heartbeater_ = heartbeater.Heartbeater(manager) + thread = mock.create_autospec(threading.Thread, instance=True) + heartbeater_._thread = thread + + heartbeater_.stop() + + assert heartbeater_._stop_event.is_set() + thread.join.assert_called_once() + assert heartbeater_._thread is None + + +def test_stop_no_join(): + heartbeater_ = heartbeater.Heartbeater(mock.sentinel.manager) + + heartbeater_.stop() diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 61d040a26fc1..829cc7580b42 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -23,6 +23,7 @@ from google.cloud.pubsub_v1.subscriber import scheduler from google.cloud.pubsub_v1.subscriber._protocol import bidi from google.cloud.pubsub_v1.subscriber._protocol import dispatcher +from google.cloud.pubsub_v1.subscriber._protocol import heartbeater from google.cloud.pubsub_v1.subscriber._protocol import leaser from google.cloud.pubsub_v1.subscriber._protocol import requests from google.cloud.pubsub_v1.subscriber._protocol import streaming_pull_manager @@ -216,6 +217,26 @@ def test_send_streaming(): manager._rpc.send.assert_called_once_with(mock.sentinel.request) +def test_heartbeat(): + manager = make_manager() + manager._rpc = mock.create_autospec(bidi.BidiRpc, instance=True) + manager._rpc.is_active = True + + manager.heartbeat() + + manager._rpc.send.assert_called_once_with(types.StreamingPullRequest()) + + +def test_heartbeat_inactive(): + manager = make_manager() + manager._rpc = mock.create_autospec(bidi.BidiRpc, instance=True) + manager._rpc.is_active = False + + manager.heartbeat() + + manager._rpc.send.assert_not_called() + + @mock.patch( 'google.cloud.pubsub_v1.subscriber._protocol.bidi.ResumableBidiRpc', autospec=True) @@ -228,11 +249,20 @@ def test_send_streaming(): @mock.patch( 'google.cloud.pubsub_v1.subscriber._protocol.dispatcher.Dispatcher', autospec=True) -def test_open(dispatcher, leaser, background_consumer, resumable_bidi_rpc): +@mock.patch( + 'google.cloud.pubsub_v1.subscriber._protocol.heartbeater.Heartbeater', + autospec=True) +def test_open( + heartbeater, dispatcher, leaser, background_consumer, + resumable_bidi_rpc): manager = make_manager() manager.open(mock.sentinel.callback) + heartbeater.assert_called_once_with(manager) + heartbeater.return_value.start.assert_called_once() + assert manager._heartbeater == heartbeater.return_value + dispatcher.assert_called_once_with(manager, manager._scheduler.queue) dispatcher.return_value.start.assert_called_once() assert manager._dispatcher == dispatcher.return_value @@ -285,27 +315,32 @@ def make_running_manager(): dispatcher.Dispatcher, instance=True) manager._leaser = mock.create_autospec( leaser.Leaser, instance=True) + manager._heartbeater = mock.create_autospec( + heartbeater.Heartbeater, instance=True) return ( manager, manager._consumer, manager._dispatcher, manager._leaser, - manager._scheduler) + manager._heartbeater, manager._scheduler) def test_close(): - manager, consumer, dispatcher, leaser, scheduler = make_running_manager() + manager, consumer, dispatcher, leaser, heartbeater, scheduler = ( + make_running_manager()) manager.close() consumer.stop.assert_called_once() leaser.stop.assert_called_once() dispatcher.stop.assert_called_once() + heartbeater.stop.assert_called_once() scheduler.shutdown.assert_called_once() assert manager.is_active is False def test_close_inactive_consumer(): - manager, consumer, dispatcher, leaser, scheduler = make_running_manager() + manager, consumer, dispatcher, leaser, heartbeater, scheduler = ( + make_running_manager()) consumer.is_active = False manager.close() @@ -313,11 +348,12 @@ def test_close_inactive_consumer(): consumer.stop.assert_not_called() leaser.stop.assert_called_once() dispatcher.stop.assert_called_once() + heartbeater.stop.assert_called_once() scheduler.shutdown.assert_called_once() def test_close_idempotent(): - manager, _, _, _, scheduler = make_running_manager() + manager, _, _, _, _, scheduler = make_running_manager() manager.close() manager.close() @@ -326,7 +362,7 @@ def test_close_idempotent(): def test_close_callbacks(): - manager, _, _, _, _ = make_running_manager() + manager, _, _, _, _, _ = make_running_manager() callback = mock.Mock() @@ -352,7 +388,7 @@ def test__get_initial_request(): def test_on_response(): - manager, _, dispatcher, _, scheduler = make_running_manager() + manager, _, dispatcher, _, _, scheduler = make_running_manager() manager._callback = mock.sentinel.callback # Set up the messages.