Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pubsub/google/cloud/pubsub_v1/subscriber/_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,9 @@

from google.cloud.pubsub_v1.subscriber import _helper_threads


_LOGGER = logging.getLogger(__name__)
_BIDIRECTIONAL_CONSUMER_NAME = 'ConsumeBidirectionalStream'


class Consumer(object):
Expand Down Expand Up @@ -250,7 +252,7 @@ def start_consuming(self):
self.active = True
self._exiting.clear()
self.helper_threads.start(
'ConsumeBidirectionalStream',
_BIDIRECTIONAL_CONSUMER_NAME,
self._request_queue,
self._blocking_consume,
)
Expand Down
32 changes: 23 additions & 9 deletions pubsub/google/cloud/pubsub_v1/subscriber/_helper_threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def start(self, name, queue, target):

# Keep track of the helper thread, so we are able to stop it.
self._helper_threads[name] = _HelperThread(name, thread, queue)
_LOGGER.debug('Started helper thread {}'.format(name))
_LOGGER.debug('Started helper thread %s', name)
return thread

def stop(self, name):
Expand All @@ -86,7 +86,7 @@ def stop(self, name):

# Join the thread if it is still alive.
if helper_thread.thread.is_alive():
_LOGGER.debug('Stopping helper thread {}'.format(name))
_LOGGER.debug('Stopping helper thread %s', name)
helper_thread.queue.put(STOP)
helper_thread.thread.join()

Expand All @@ -102,9 +102,25 @@ def stop_all(self):


class QueueCallbackThread(object):
"""A helper thread that executes a callback for every item in
the queue.
"""A helper that executes a callback for every item in the queue.

.. note::

This is not actually a thread, but it is intended to be a target
for a thread.

Calls a blocking ``get()`` on the ``queue`` until it encounters
:attr:`STOP`.

Args:
queue (~queue.Queue): A Queue instance, appropriate for crossing the
concurrency boundary implemented by ``executor``. Items will
be popped off (with a blocking ``get()``) until :attr:`STOP`
is encountered.
callback (Callable): A callback that can process items pulled off
of the queue.
"""

def __init__(self, queue, callback):
self.queue = queue
self._callback = callback
Expand All @@ -113,14 +129,12 @@ def __call__(self):
while True:
item = self.queue.get()
if item == STOP:
break
_LOGGER.debug('Exiting the QueueCallbackThread.')
return

# Run the callback. If any exceptions occur, log them and
# continue.
try:
self._callback(item)
except Exception as exc:
_LOGGER.error('{class_}: {message}'.format(
class_=exc.__class__.__name__,
message=str(exc),
))
_LOGGER.error('%s: %s', exc.__class__.__name__, exc)
68 changes: 47 additions & 21 deletions pubsub/google/cloud/pubsub_v1/subscriber/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class Message(object):
publish_time (datetime): The time that this message was originally
published.
"""

def __init__(self, message, ack_id, request_queue):
"""Construct the Message.

Expand Down Expand Up @@ -128,11 +129,16 @@ def ack(self):
receive any given message more than once.
"""
time_to_ack = math.ceil(time.time() - self._received_timestamp)
self._request_queue.put(('ack', {
'ack_id': self._ack_id,
'byte_size': self.size,
'time_to_ack': time_to_ack,
}))
self._request_queue.put(
(
'ack',
{
'ack_id': self._ack_id,
'byte_size': self.size,
'time_to_ack': time_to_ack,
},
),
)

def drop(self):
"""Release the message from lease management.
Expand All @@ -147,10 +153,15 @@ def drop(self):
both call this one. You probably do not want to call this method
directly.
"""
self._request_queue.put(('drop', {
'ack_id': self._ack_id,
'byte_size': self.size,
}))
self._request_queue.put(
(
'drop',
{
'ack_id': self._ack_id,
'byte_size': self.size,
},
),
)

def lease(self):
"""Inform the policy to lease this message continually.
Expand All @@ -159,10 +170,15 @@ def lease(self):
This method is called by the constructor, and you should never
need to call it manually.
"""
self._request_queue.put(('lease', {
'ack_id': self._ack_id,
'byte_size': self.size,
}))
self._request_queue.put(
(
'lease',
{
'ack_id': self._ack_id,
'byte_size': self.size,
},
),
)

def modify_ack_deadline(self, seconds):
"""Set the deadline for acknowledgement to the given value.
Expand All @@ -182,17 +198,27 @@ def modify_ack_deadline(self, seconds):
to. This should be between 0 and 600. Due to network latency,
values below 10 are advised against.
"""
self._request_queue.put(('modify_ack_deadline', {
'ack_id': self._ack_id,
'seconds': seconds,
}))
self._request_queue.put(
(
'modify_ack_deadline',
{
'ack_id': self._ack_id,
'seconds': seconds,
},
),
)

def nack(self):
"""Decline to acknowldge the given message.

This will cause the message to be re-delivered to the subscription.
"""
self._request_queue.put(('nack', {
'ack_id': self._ack_id,
'byte_size': self.size,
}))
self._request_queue.put(
(
'nack',
{
'ack_id': self._ack_id,
'byte_size': self.size,
},
),
)
11 changes: 6 additions & 5 deletions pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@


_LOGGER = logging.getLogger(__name__)
_CALLBACK_WORKER_NAME = 'CallbackRequestsWorker'


def _callback_completed(future):
Expand Down Expand Up @@ -104,10 +105,10 @@ def __init__(self, client, subscription, flow_control=types.FlowControl(),
)

# Also maintain a request queue and an executor.
_LOGGER.debug('Creating callback requests thread (not starting).')
if executor is None:
executor = futures.ThreadPoolExecutor(max_workers=10)
self._executor = executor
_LOGGER.debug('Creating callback requests thread (not starting).')
self._callback_requests = _helper_threads.QueueCallbackThread(
self._request_queue,
self.on_callback_request,
Expand All @@ -116,7 +117,7 @@ def __init__(self, client, subscription, flow_control=types.FlowControl(),
def close(self):
"""Close the existing connection."""
# Stop consuming messages.
self._consumer.helper_threads.stop('callback requests worker')
self._consumer.helper_threads.stop(_CALLBACK_WORKER_NAME)
self._consumer.stop_consuming()

# The subscription is closing cleanly; resolve the future if it is not
Expand Down Expand Up @@ -149,7 +150,7 @@ def open(self, callback):
_LOGGER.debug('Starting callback requests worker.')
self._callback = callback
self._consumer.helper_threads.start(
'CallbackRequestsWorker',
_CALLBACK_WORKER_NAME,
self._request_queue,
self._callback_requests,
)
Expand All @@ -159,7 +160,7 @@ def open(self, callback):

# Spawn a helper thread that maintains all of the leases for
# this policy.
_LOGGER.debug('Spawning lease maintenance worker.')
_LOGGER.debug('Starting lease maintenance worker.')
self._leaser = threading.Thread(
name='Thread-LeaseMaintenance',
target=self.maintain_leases,
Expand All @@ -171,7 +172,7 @@ def open(self, callback):
return self._future

def on_callback_request(self, callback_request):
"""Map the callback request to the appropriate GRPC request."""
"""Map the callback request to the appropriate gRPC request."""
action, kwargs = callback_request[0], callback_request[1]
getattr(self, action)(**kwargs)

Expand Down