Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions pubsub/google/cloud/pubsub_v1/subscriber/_protocol/bidi.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,7 @@ def __init__(self, bidi_rpc, on_response):
self._paused = False
self._wake = threading.Condition()
self._thread = None
self._operational_lock = threading.Lock()

def _on_call_done(self, future):
# Resume the thread if it's paused, this prevents blocking forever
Expand Down Expand Up @@ -447,22 +448,26 @@ def _thread_main(self):

def start(self):
"""Start the background thread and begin consuming the thread."""
thread = threading.Thread(
name=_BIDIRECTIONAL_CONSUMER_NAME,
target=self._thread_main)
thread.daemon = True
thread.start()
self._thread = thread
_LOGGER.debug('Started helper thread %s', thread.name)
with self._operational_lock:
thread = threading.Thread(
name=_BIDIRECTIONAL_CONSUMER_NAME,
target=self._thread_main)
thread.daemon = True
thread.start()
self._thread = thread
_LOGGER.debug('Started helper thread %s', thread.name)

def stop(self):
"""Stop consuming the stream and shutdown the background thread."""
self._bidi_rpc.close()
with self._operational_lock:
self._bidi_rpc.close()

if self._thread is not None:
self._thread.join()
if self._thread is not None:
# Resume the thread to wake it up in case it is sleeping.
self.resume()
self._thread.join()

self._thread = None
self._thread = None

@property
def is_active(self):
Expand Down
137 changes: 103 additions & 34 deletions pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import logging
import threading

from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.subscriber._protocol import helper_threads
from google.cloud.pubsub_v1.subscriber._protocol import requests

Expand All @@ -27,10 +28,45 @@


class Dispatcher(object):
def __init__(self, queue, subscriber):
def __init__(self, manager, queue):

This comment was marked as spam.

This comment was marked as spam.

self._manager = manager
self._queue = queue
self._subscriber = subscriber
self._thread = None
self._operational_lock = threading.Lock()

def start(self):
"""Start a thread to dispatch requests queued up by callbacks.
Spawns a thread to run :meth:`dispatch_callback`.
"""
with self._operational_lock:
if self._thread is not None:
raise ValueError('Dispatcher is already running.')

flow_control = self._manager.flow_control
worker = helper_threads.QueueCallbackWorker(
self._queue,
self.dispatch_callback,
max_items=flow_control.max_request_batch_size,
max_latency=flow_control.max_request_batch_latency
)
# Create and start the helper thread.
thread = threading.Thread(
name=_CALLBACK_WORKER_NAME,
target=worker,
)
thread.daemon = True
thread.start()
_LOGGER.debug('Started helper thread %s', thread.name)
self._thread = thread

def stop(self):
with self._operational_lock:
if self._thread is not None:
# Signal the worker to stop by queueing a "poison pill"
self._queue.put(helper_threads.STOP)
self._thread.join()

self._thread = None

def dispatch_callback(self, items):
"""Map the callback request to the appropriate gRPC request.
Expand All @@ -44,7 +80,7 @@ def dispatch_callback(self, items):
ValueError: If ``action`` isn't one of the expected actions
"ack", "drop", "lease", "modify_ack_deadline" or "nack".
"""
if not self._subscriber.is_active:
if not self._manager.is_active:
return

batched_commands = collections.defaultdict(list)
Expand All @@ -55,46 +91,79 @@ def dispatch_callback(self, items):
_LOGGER.debug('Handling %d batched requests', len(items))

if batched_commands[requests.LeaseRequest]:
self._subscriber.lease(batched_commands.pop(requests.LeaseRequest))
self.lease(batched_commands.pop(requests.LeaseRequest))
if batched_commands[requests.ModAckRequest]:
self._subscriber.modify_ack_deadline(
self.modify_ack_deadline(
batched_commands.pop(requests.ModAckRequest))
# Note: Drop and ack *must* be after lease. It's possible to get both
# the lease the and ack/drop request in the same batch.
if batched_commands[requests.AckRequest]:
self._subscriber.ack(batched_commands.pop(requests.AckRequest))
self.ack(batched_commands.pop(requests.AckRequest))
if batched_commands[requests.NackRequest]:
self._subscriber.nack(batched_commands.pop(requests.NackRequest))
self.nack(batched_commands.pop(requests.NackRequest))
if batched_commands[requests.DropRequest]:
self._subscriber.drop(batched_commands.pop(requests.DropRequest))
self.drop(batched_commands.pop(requests.DropRequest))

def start(self):
"""Start a thread to dispatch requests queued up by callbacks.
Spawns a thread to run :meth:`dispatch_callback`.
def ack(self, items):
"""Acknowledge the given messages.

Args:
items(Sequence[AckRequest]): The items to acknowledge.
"""
if self._thread is not None:
raise ValueError('Dispatcher is already running.')

worker = helper_threads.QueueCallbackWorker(
self._queue,
self.dispatch_callback,
max_items=self._subscriber.flow_control.max_request_batch_size,
max_latency=self._subscriber.flow_control.max_request_batch_latency
)
# Create and start the helper thread.
thread = threading.Thread(
name=_CALLBACK_WORKER_NAME,
target=worker,
# If we got timing information, add it to the histogram.
for item in items:
time_to_ack = item.time_to_ack
if time_to_ack is not None:
self._manager.ack_histogram.add(time_to_ack)

ack_ids = [item.ack_id for item in items]
request = types.StreamingPullRequest(ack_ids=ack_ids)
self._manager.send(request)

# Remove the message from lease management.
self.drop(items)

def drop(self, items):
"""Remove the given messages from lease management.

Args:
items(Sequence[DropRequest]): The items to drop.
"""
self._manager.leaser.remove(items)
self._manager.maybe_resume_consumer()

def lease(self, items):
"""Add the given messages to lease management.

Args:
items(Sequence[LeaseRequest]): The items to lease.
"""
self._manager.leaser.add(items)
self._manager.maybe_pause_consumer()

def modify_ack_deadline(self, items):
"""Modify the ack deadline for the given messages.

Args:
items(Sequence[ModAckRequest]): The items to modify.
"""
ack_ids = [item.ack_id for item in items]
seconds = [item.seconds for item in items]

request = types.StreamingPullRequest(
modify_deadline_ack_ids=ack_ids,
modify_deadline_seconds=seconds,
)
thread.daemon = True
thread.start()
_LOGGER.debug('Started helper thread %s', thread.name)
self._thread = thread
self._manager.send(request)

def stop(self):
if self._thread is not None:
# Signal the worker to stop by queueing a "poison pill"
self._queue.put(helper_threads.STOP)
self._thread.join()
def nack(self, items):
"""Explicitly deny receipt of messages.

self._thread = None
Args:
items(Sequence[NackRequest]): The items to deny.
"""
self.modify_ack_deadline([
requests.ModAckRequest(ack_id=item.ack_id, seconds=0)
for item in items])
self.drop(
[requests.DropRequest(*item) for item in items])
57 changes: 30 additions & 27 deletions pubsub/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@


class Leaser(object):
def __init__(self, subscriber):
def __init__(self, manager):
self._thread = None
self._subscriber = subscriber
self._operational_lock = threading.Lock()
self._manager = manager

self._leased_messages = {}
"""dict[str, float]: A mapping of ack IDs to the local time when the
Expand Down Expand Up @@ -93,17 +94,17 @@ def remove(self, items):
self._bytes = 0

def maintain_leases(self):
"""Maintain all of the leases being managed by the subscriber.
"""Maintain all of the leases being managed.

This method modifies the ack deadline for all of the managed
ack IDs, then waits for most of that time (but with jitter), and
repeats.
"""
while self._subscriber.is_active and not self._stop_event.is_set():
while self._manager.is_active and not self._stop_event.is_set():
# Determine the appropriate duration for the lease. This is
# based off of how long previous messages have taken to ack, with
# a sensible default and within the ranges allowed by Pub/Sub.
p99 = self._subscriber.ack_histogram.percentile(99)
p99 = self._manager.ack_histogram.percentile(99)
_LOGGER.debug('The current p99 value is %d seconds.', p99)

# Make a copy of the leased messages. This is needed because it's
Expand All @@ -116,7 +117,7 @@ def maintain_leases(self):
# drop messages and allow Pub/Sub to resend them.
cutoff = (
time.time() -
self._subscriber.flow_control.max_lease_duration)
self._manager.flow_control.max_lease_duration)
to_drop = [
requests.DropRequest(ack_id, item.size)
for ack_id, item
Expand All @@ -127,11 +128,11 @@ def maintain_leases(self):
_LOGGER.warning(
'Dropping %s items because they were leased too long.',
len(to_drop))
self._subscriber.drop(to_drop)
self._manager.dispatcher.drop(to_drop)

# Remove dropped items from our copy of the leased messages (they
# have already been removed from the real one by
# self._subscriber.drop(), which calls self.remove()).
# self._manager.drop(), which calls self.remove()).
for item in to_drop:
leased_messages.pop(item.ack_id)

Expand All @@ -147,7 +148,7 @@ def maintain_leases(self):
# without any sort of race condition would require a
# way for ``send_request`` to fail when the consumer
# is inactive.
self._subscriber.modify_ack_deadline([
self._manager.dispatcher.modify_ack_deadline([
requests.ModAckRequest(ack_id, p99) for ack_id in ack_ids])

# Now wait an appropriate period of time and do this again.
Expand All @@ -163,25 +164,27 @@ def maintain_leases(self):
_LOGGER.info('%s exiting.', _LEASE_WORKER_NAME)

def start(self):
if self._thread is not None:
raise ValueError('Leaser is already running.')

# Create and start the helper thread.
self._stop_event.clear()
thread = threading.Thread(
name=_LEASE_WORKER_NAME,
target=self.maintain_leases)
thread.daemon = True
thread.start()
_LOGGER.debug('Started helper thread %s', thread.name)
self._thread = thread
with self._operational_lock:
if self._thread is not None:
raise ValueError('Leaser is already running.')

# Create and start the helper thread.
self._stop_event.clear()
thread = threading.Thread(
name=_LEASE_WORKER_NAME,
target=self.maintain_leases)
thread.daemon = True
thread.start()
_LOGGER.debug('Started helper thread %s', thread.name)
self._thread = thread

def stop(self):
self._stop_event.set()
with self._operational_lock:
self._stop_event.set()

if self._thread is not None:
# The thread should automatically exit when the consumer is
# inactive.
self._thread.join()
if self._thread is not None:
# The thread should automatically exit when the consumer is
# inactive.
self._thread.join()

self._thread = None
self._thread = None
Loading