Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions pubsub/google/cloud/pubsub_v1/subscriber/_protocol/heartbeater.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Copyright 2018, Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import

import logging
import threading


_LOGGER = logging.getLogger(__name__)
_HEARTBEAT_WORKER_NAME = 'Thread-Heartbeater'
# How often to send heartbeats in seconds. Determined as half the period of
# time where the Pub/Sub server will close the stream as inactive, which is
# 60 seconds.
_DEFAULT_PERIOD = 30


class Heartbeater(object):
def __init__(self, manager, period=_DEFAULT_PERIOD):
self._thread = None
self._operational_lock = threading.Lock()
self._manager = manager
self._stop_event = threading.Event()
self._period = period

def heartbeat(self):
"""Periodically send heartbeats."""
while self._manager.is_active and not self._stop_event.is_set():
self._manager.heartbeat()
_LOGGER.debug('Sent heartbeat.')
self._stop_event.wait(timeout=self._period)

_LOGGER.info('%s exiting.', _HEARTBEAT_WORKER_NAME)

def start(self):
with self._operational_lock:
if self._thread is not None:
raise ValueError('Heartbeater is already running.')

# Create and start the helper thread.
self._stop_event.clear()
thread = threading.Thread(
name=_HEARTBEAT_WORKER_NAME,
target=self.heartbeat)
thread.daemon = True
thread.start()
_LOGGER.debug('Started helper thread %s', thread.name)
self._thread = thread

def stop(self):
with self._operational_lock:
self._stop_event.set()

This comment was marked as spam.

This comment was marked as spam.


if self._thread is not None:
# The thread should automatically exit when the consumer is
# inactive.
self._thread.join()

self._thread = None
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.subscriber._protocol import bidi
from google.cloud.pubsub_v1.subscriber._protocol import dispatcher
from google.cloud.pubsub_v1.subscriber._protocol import heartbeater
from google.cloud.pubsub_v1.subscriber._protocol import histogram
from google.cloud.pubsub_v1.subscriber._protocol import leaser
from google.cloud.pubsub_v1.subscriber._protocol import requests
Expand Down Expand Up @@ -114,6 +115,7 @@ def __init__(self, client, subscription, flow_control=types.FlowControl(),
self._dispatcher = None
self._leaser = None
self._consumer = None
self._heartbeater = None

@property
def is_active(self):
Expand Down Expand Up @@ -262,6 +264,15 @@ def send(self, request):
else:
self._rpc.send(request)

def heartbeat(self):
"""Sends an empty request over the streaming pull RPC.

This always sends over the stream, regardless of if
``self._UNARY_REQUESTS`` is set or not.
"""
if self._rpc is not None and self._rpc.is_active:
self._rpc.send(types.StreamingPullRequest())

def open(self, callback):
"""Begin consuming messages.

Expand Down Expand Up @@ -291,6 +302,7 @@ def open(self, callback):
self._consumer = bidi.BackgroundConsumer(
self._rpc, self._on_response)
self._leaser = leaser.Leaser(self)
self._heartbeater = heartbeater.Heartbeater(self)

# Start the thread to pass the requests.
self._dispatcher.start()
Expand All @@ -301,6 +313,9 @@ def open(self, callback):
# Start the lease maintainer thread.
self._leaser.start()

# Start the stream heartbeater thread.
self._heartbeater.start()

def close(self, reason=None):
"""Stop consuming messages and shutdown all helper threads.

Expand Down Expand Up @@ -331,6 +346,9 @@ def close(self, reason=None):
_LOGGER.debug('Stopping dispatcher.')
self._dispatcher.stop()
self._dispatcher = None
_LOGGER.debug('Stopping heartbeater.')
self._heartbeater.stop()
self._heartbeater = None

self._rpc = None
self._closed = True
Expand Down
119 changes: 119 additions & 0 deletions pubsub/tests/unit/pubsub_v1/subscriber/test_heartbeater.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
# Copyright 2018, Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import threading

from google.cloud.pubsub_v1.subscriber._protocol import heartbeater
from google.cloud.pubsub_v1.subscriber._protocol import streaming_pull_manager

import mock
import pytest


def test_heartbeat_inactive(caplog):
caplog.set_level(logging.INFO)
manager = mock.create_autospec(
streaming_pull_manager.StreamingPullManager, instance=True)
manager.is_active = False

heartbeater_ = heartbeater.Heartbeater(manager)

heartbeater_.heartbeat()

assert 'exiting' in caplog.text


def test_heartbeat_stopped(caplog):
caplog.set_level(logging.INFO)
manager = mock.create_autospec(
streaming_pull_manager.StreamingPullManager, instance=True)

heartbeater_ = heartbeater.Heartbeater(manager)
heartbeater_.stop()

heartbeater_.heartbeat()

assert 'exiting' in caplog.text


def make_sleep_mark_manager_as_inactive(heartbeater):
# Make sleep mark the manager as inactive so that heartbeat()
# exits at the end of the first run.
def trigger_inactive(timeout):
assert timeout
heartbeater._manager.is_active = False

heartbeater._stop_event.wait = trigger_inactive


def test_heartbeat_once():
manager = mock.create_autospec(
streaming_pull_manager.StreamingPullManager, instance=True)
heartbeater_ = heartbeater.Heartbeater(manager)
make_sleep_mark_manager_as_inactive(heartbeater_)

heartbeater_.heartbeat()

manager.heartbeat.assert_called_once()


@mock.patch('threading.Thread', autospec=True)
def test_start(thread):
manager = mock.create_autospec(
streaming_pull_manager.StreamingPullManager, instance=True)
heartbeater_ = heartbeater.Heartbeater(manager)

heartbeater_.start()

thread.assert_called_once_with(
name=heartbeater._HEARTBEAT_WORKER_NAME,
target=heartbeater_.heartbeat)

thread.return_value.start.assert_called_once()

assert heartbeater_._thread is not None


@mock.patch('threading.Thread', autospec=True)
def test_start_already_started(thread):
manager = mock.create_autospec(
streaming_pull_manager.StreamingPullManager, instance=True)
heartbeater_ = heartbeater.Heartbeater(manager)
heartbeater_._thread = mock.sentinel.thread

with pytest.raises(ValueError):
heartbeater_.start()

thread.assert_not_called()


def test_stop():
manager = mock.create_autospec(
streaming_pull_manager.StreamingPullManager, instance=True)
heartbeater_ = heartbeater.Heartbeater(manager)
thread = mock.create_autospec(threading.Thread, instance=True)
heartbeater_._thread = thread

heartbeater_.stop()

assert heartbeater_._stop_event.is_set()
thread.join.assert_called_once()
assert heartbeater_._thread is None


def test_stop_no_join():
heartbeater_ = heartbeater.Heartbeater(mock.sentinel.manager)

heartbeater_.stop()
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from google.cloud.pubsub_v1.subscriber import scheduler
from google.cloud.pubsub_v1.subscriber._protocol import bidi
from google.cloud.pubsub_v1.subscriber._protocol import dispatcher
from google.cloud.pubsub_v1.subscriber._protocol import heartbeater
from google.cloud.pubsub_v1.subscriber._protocol import leaser
from google.cloud.pubsub_v1.subscriber._protocol import requests
from google.cloud.pubsub_v1.subscriber._protocol import streaming_pull_manager
Expand Down Expand Up @@ -216,6 +217,26 @@ def test_send_streaming():
manager._rpc.send.assert_called_once_with(mock.sentinel.request)


def test_heartbeat():
manager = make_manager()
manager._rpc = mock.create_autospec(bidi.BidiRpc, instance=True)
manager._rpc.is_active = True

manager.heartbeat()

manager._rpc.send.assert_called_once_with(types.StreamingPullRequest())


def test_heartbeat_inactive():
manager = make_manager()
manager._rpc = mock.create_autospec(bidi.BidiRpc, instance=True)
manager._rpc.is_active = False

manager.heartbeat()

manager._rpc.send.assert_not_called()


@mock.patch(
'google.cloud.pubsub_v1.subscriber._protocol.bidi.ResumableBidiRpc',
autospec=True)
Expand All @@ -228,11 +249,20 @@ def test_send_streaming():
@mock.patch(
'google.cloud.pubsub_v1.subscriber._protocol.dispatcher.Dispatcher',
autospec=True)
def test_open(dispatcher, leaser, background_consumer, resumable_bidi_rpc):
@mock.patch(
'google.cloud.pubsub_v1.subscriber._protocol.heartbeater.Heartbeater',
autospec=True)
def test_open(
heartbeater, dispatcher, leaser, background_consumer,
resumable_bidi_rpc):
manager = make_manager()

manager.open(mock.sentinel.callback)

heartbeater.assert_called_once_with(manager)
heartbeater.return_value.start.assert_called_once()
assert manager._heartbeater == heartbeater.return_value

dispatcher.assert_called_once_with(manager, manager._scheduler.queue)
dispatcher.return_value.start.assert_called_once()
assert manager._dispatcher == dispatcher.return_value
Expand Down Expand Up @@ -285,39 +315,45 @@ def make_running_manager():
dispatcher.Dispatcher, instance=True)
manager._leaser = mock.create_autospec(
leaser.Leaser, instance=True)
manager._heartbeater = mock.create_autospec(
heartbeater.Heartbeater, instance=True)

return (
manager, manager._consumer, manager._dispatcher, manager._leaser,
manager._scheduler)
manager._heartbeater, manager._scheduler)


def test_close():
manager, consumer, dispatcher, leaser, scheduler = make_running_manager()
manager, consumer, dispatcher, leaser, heartbeater, scheduler = (
make_running_manager())

manager.close()

consumer.stop.assert_called_once()
leaser.stop.assert_called_once()
dispatcher.stop.assert_called_once()
heartbeater.stop.assert_called_once()
scheduler.shutdown.assert_called_once()

assert manager.is_active is False


def test_close_inactive_consumer():
manager, consumer, dispatcher, leaser, scheduler = make_running_manager()
manager, consumer, dispatcher, leaser, heartbeater, scheduler = (
make_running_manager())
consumer.is_active = False

manager.close()

consumer.stop.assert_not_called()
leaser.stop.assert_called_once()
dispatcher.stop.assert_called_once()
heartbeater.stop.assert_called_once()
scheduler.shutdown.assert_called_once()


def test_close_idempotent():
manager, _, _, _, scheduler = make_running_manager()
manager, _, _, _, _, scheduler = make_running_manager()

manager.close()
manager.close()
Expand All @@ -326,7 +362,7 @@ def test_close_idempotent():


def test_close_callbacks():
manager, _, _, _, _ = make_running_manager()
manager, _, _, _, _, _ = make_running_manager()

callback = mock.Mock()

Expand All @@ -352,7 +388,7 @@ def test__get_initial_request():


def test_on_response():
manager, _, dispatcher, _, scheduler = make_running_manager()
manager, _, dispatcher, _, _, scheduler = make_running_manager()
manager._callback = mock.sentinel.callback

# Set up the messages.
Expand Down