Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pubsub/google/cloud/pubsub_v1/subscriber/_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@

from six.moves import queue

from google.cloud.pubsub_v1.subscriber import _helper_threads
from google.cloud.pubsub_v1.subscriber._protocol import helper_threads


_LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -212,7 +212,7 @@ def __iter__(self):

# A call to consumer.close() signaled us to stop generating
# requests.
if item == _helper_threads.STOP:
if item == helper_threads.STOP:
_LOGGER.debug('Cleanly exiting request generator.')
return

Expand Down Expand Up @@ -453,7 +453,7 @@ def _stop_no_join(self):
self._stopped.set()
_LOGGER.debug('Stopping helper thread %s', self._consumer_thread.name)
# Signal the request generator RPC to exit cleanly.
self.send_request(_helper_threads.STOP)
self.send_request(helper_threads.STOP)
thread = self._consumer_thread
self._consumer_thread = None
return thread
Expand Down
100 changes: 100 additions & 0 deletions pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# Copyright 2017, Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import

import collections
import logging
import threading

from google.cloud.pubsub_v1.subscriber._protocol import helper_threads
from google.cloud.pubsub_v1.subscriber._protocol import requests


_LOGGER = logging.getLogger(__name__)
_CALLBACK_WORKER_NAME = 'Thread-CallbackRequestDispatcher'


class Dispatcher(object):
def __init__(self, queue, subscriber):
self._queue = queue
self._subscriber = subscriber
self._thread = None

def dispatch_callback(self, items):
"""Map the callback request to the appropriate gRPC request.

Args:
action (str): The method to be invoked.
kwargs (Dict[str, Any]): The keyword arguments for the method
specified by ``action``.

Raises:
ValueError: If ``action`` isn't one of the expected actions
"ack", "drop", "lease", "modify_ack_deadline" or "nack".
"""
if not self._subscriber.is_active:
return

batched_commands = collections.defaultdict(list)

for item in items:
batched_commands[item.__class__].append(item)

_LOGGER.debug('Handling %d batched requests', len(items))

if batched_commands[requests.LeaseRequest]:
self._subscriber.lease(batched_commands.pop(requests.LeaseRequest))
if batched_commands[requests.ModAckRequest]:
self._subscriber.modify_ack_deadline(
batched_commands.pop(requests.ModAckRequest))
# Note: Drop and ack *must* be after lease. It's possible to get both
# the lease the and ack/drop request in the same batch.
if batched_commands[requests.AckRequest]:
self._subscriber.ack(batched_commands.pop(requests.AckRequest))
if batched_commands[requests.NackRequest]:
self._subscriber.nack(batched_commands.pop(requests.NackRequest))
if batched_commands[requests.DropRequest]:
self._subscriber.drop(batched_commands.pop(requests.DropRequest))

def start(self):
"""Start a thread to dispatch requests queued up by callbacks.
Spawns a thread to run :meth:`dispatch_callback`.
"""
if self._thread is not None:
raise ValueError('Dispatcher is already running.')

worker = helper_threads.QueueCallbackWorker(
self._queue,
self.dispatch_callback,
max_items=self._subscriber.flow_control.max_request_batch_size,
max_latency=self._subscriber.flow_control.max_request_batch_latency
)
# Create and start the helper thread.
thread = threading.Thread(
name=_CALLBACK_WORKER_NAME,
target=worker,
)
thread.daemon = True
thread.start()
_LOGGER.debug('Started helper thread %s', thread.name)
self._thread = thread

def stop(self):
if self._thread is not None:
# Signal the worker to stop by queueing a "poison pill"
self._queue.put(helper_threads.STOP)
self._thread.join()

self._thread = None
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def _get_many(queue_, max_items=None, max_latency=0):


class QueueCallbackWorker(object):
"""A helper that executes a callback for every item in the queue.
"""A helper that executes a callback for items sent in a queue.

Calls a blocking ``get()`` on the ``queue`` until it encounters
:attr:`STOP`.
Expand Down
46 changes: 46 additions & 0 deletions pubsub/google/cloud/pubsub_v1/subscriber/_protocol/requests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Copyright 2017, Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Base class for concurrency policy."""

from __future__ import absolute_import, division

import collections

# Namedtuples for management requests. Used by the Message class to communicate
# items of work back to the policy.
AckRequest = collections.namedtuple(
'AckRequest',
['ack_id', 'byte_size', 'time_to_ack'],
)

DropRequest = collections.namedtuple(
'DropRequest',
['ack_id', 'byte_size'],
)

LeaseRequest = collections.namedtuple(
'LeaseRequest',
['ack_id', 'byte_size'],
)

ModAckRequest = collections.namedtuple(
'ModAckRequest',
['ack_id', 'seconds'],
)

NackRequest = collections.namedtuple(
'NackRequest',
['ack_id', 'byte_size'],
)
6 changes: 3 additions & 3 deletions pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from six.moves import queue as queue_mod

from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.subscriber import _helper_threads
from google.cloud.pubsub_v1.subscriber._protocol import helper_threads
from google.cloud.pubsub_v1.subscriber.futures import Future
from google.cloud.pubsub_v1.subscriber.policy import base
from google.cloud.pubsub_v1.subscriber.message import Message
Expand Down Expand Up @@ -158,7 +158,7 @@ def close(self):
raise ValueError('This policy has not been opened yet.')

# Stop consuming messages.
self._request_queue.put(_helper_threads.STOP)
self._request_queue.put(helper_threads.STOP)
self._dispatch_thread.join() # Wait until stopped.
self._dispatch_thread = None
self._consumer.stop_consuming()
Expand Down Expand Up @@ -186,7 +186,7 @@ def _start_dispatch(self):
"dispatch thread" member on the current policy.
"""
_LOGGER.debug('Starting callback requests worker.')
dispatch_worker = _helper_threads.QueueCallbackWorker(
dispatch_worker = helper_threads.QueueCallbackWorker(
self._request_queue,
self.dispatch_callback,
max_items=self.flow_control.max_request_batch_size,
Expand Down
83 changes: 83 additions & 0 deletions pubsub/google/cloud/pubsub_v1/subscriber/subscriber.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# Copyright 2017, Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from google.cloud.pubsub_v1 import types


class Subscriber(object):
"""A consumer class based on :class:`threading.Thread`.

This consumer handles the connection to the Pub/Sub service and all of
the concurrency needs.

Args:
client (~.pubsub_v1.subscriber.client): The subscriber client used
to create this instance.
subscription (str): The name of the subscription. The canonical
format for this is
``projects/{project}/subscriptions/{subscription}``.
flow_control (~google.cloud.pubsub_v1.types.FlowControl): The flow
control settings.
executor (~concurrent.futures.ThreadPoolExecutor): (Optional.) A
ThreadPoolExecutor instance, or anything duck-type compatible
with it.
queue (~queue.Queue): (Optional.) A Queue instance, appropriate
for crossing the concurrency boundary implemented by
``executor``.
"""

def __init__(self, client, subscription, flow_control=types.FlowControl(),
scheduler_cls=None):
raise NotImplementedError

@property
def is_active(self):
raise NotImplementedError

@property
def flow_control(self):
raise NotImplementedError

@property
def future(self):
raise NotImplementedError

#
# User-facing subscriber management methods.
#

def open(self, callback):
raise NotImplementedError

def close(self):
raise NotImplementedError

#
# Message management methods
#

def ack(self, items):
raise NotImplementedError

def drop(self, items):
raise NotImplementedError

def lease(self, items):
raise NotImplementedError

def modify_ack_deadline(self, items):
raise NotImplementedError

def nack(self, items):
raise NotImplementedError
4 changes: 2 additions & 2 deletions pubsub/tests/unit/pubsub_v1/subscriber/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
from google.cloud.pubsub_v1 import subscriber
from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.subscriber import _consumer
from google.cloud.pubsub_v1.subscriber._protocol import helper_threads
from google.cloud.pubsub_v1.subscriber.policy import base
from google.cloud.pubsub_v1.subscriber import _helper_threads


class Test_RequestQueueGenerator(object):
Expand Down Expand Up @@ -93,7 +93,7 @@ def test_exit_when_inactive_empty(self):

def test_exit_with_stop(self):
q = mock.create_autospec(queue.Queue, instance=True)
q.get.side_effect = [_helper_threads.STOP, queue.Empty()]
q.get.side_effect = [helper_threads.STOP, queue.Empty()]
rpc = mock.create_autospec(grpc.RpcContext, instance=True)
rpc.is_active.return_value = True

Expand Down
Loading