Skip to content

Commit

Permalink
🦉 Updates from OwlBot post-processor
Browse files Browse the repository at this point in the history
  • Loading branch information
gcf-owl-bot[bot] committed Apr 18, 2024
1 parent ab779ff commit ae30761
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 34 deletions.
52 changes: 26 additions & 26 deletions google/cloud/pubsub_v1/subscriber/_protocol/initial_modack.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,60 +13,60 @@
_INITIAL_MODACK_SLEEP_DURATION = 0.1
_LEASE_WORKER_NAME = "Thread-InitialModAck"


class InitialModack(object):
def __init__(self, manager: "StreamingPullManager"):
self._thread: Optional[threading.Thread] = None
self._manager = manager

# a lock used for start/stop operations, protecting the _thread attribute
self._operational_lock = threading.Lock()

self._pending_initial_modacks = iter(())

# A lock ensuring that add/remove operations are atomic and cannot be
# intertwined. Protects _pending_initial_modacks
self._add_remove_lock = threading.Lock()

self._stop_event = threading.Event()



def add(self, ack_ids) -> None:
with self._add_remove_lock:
self._pending_initial_modacks = itertools.chain(self._pending_initial_modacks, ack_ids)


self._pending_initial_modacks = itertools.chain(
self._pending_initial_modacks, ack_ids
)

def modack(self) -> None:
#print(f"mk: initial_modack.modack() called at {datetime.datetime.now()}")
# print(f"mk: initial_modack.modack() called at {datetime.datetime.now()}")
while not self._stop_event.is_set():

with self._add_remove_lock:
copy_pending_initial_modacks = itertools.chain(iter(()), self._pending_initial_modacks)
copy_pending_initial_modacks = itertools.chain(
iter(()), self._pending_initial_modacks
)
self._pending_initial_modacks = iter(())
#print(f"mk: initial_modack.modack() self._pending_initial_modacks: {list(self._pending_initial_modacks)}")
# print(f"mk: initial_modack.modack() self._pending_initial_modacks: {list(self._pending_initial_modacks)}")

self._manager._send_lease_modacks(
copy_pending_initial_modacks, self._manager.ack_deadline, warn_on_invalid=False
copy_pending_initial_modacks,
self._manager.ack_deadline,
warn_on_invalid=False,
)

self._stop_event.wait(timeout=_INITIAL_MODACK_SLEEP_DURATION)



def start(self) -> None:
with self._operational_lock:
if self._thread is not None:
raise ValueError("InitialModack is already running.")

# Create and start the helper thread.
self._stop_event.clear()
thread = threading.Thread(
name=_LEASE_WORKER_NAME, target=self.modack
)
thread = threading.Thread(name=_LEASE_WORKER_NAME, target=self.modack)
thread.daemon = True
thread.start()
#print("mk: initial_modack.start(): initial_modack thread started")
# print("mk: initial_modack.start(): initial_modack thread started")
self._thread = thread



def stop(self) -> None:
with self._operational_lock:
self._stop_event.set()
Expand All @@ -76,5 +76,5 @@ def stop(self) -> None:
# inactive.
self._thread.join()

#print("mk: initial_modack.stop(): initial_modack thread stopped")
self._thread = None
# print("mk: initial_modack.stop(): initial_modack thread stopped")
self._thread = None
1 change: 0 additions & 1 deletion google/cloud/pubsub_v1/subscriber/_protocol/leaser.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ def bytes(self) -> int:
"""The total size, in bytes, of all leased messages."""
return self._bytes


def add(self, items: Iterable[requests.LeaseRequest]) -> None:
"""Add messages to be managed by the leaser."""
with self._add_remove_lock:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ def dispatcher(self) -> Optional[dispatcher.Dispatcher]:
def leaser(self) -> Optional[leaser.Leaser]:
"""The leaser helper."""
return self._leaser

@property
def initial_modack(self) -> Optional[initial_modack.InitialModack]:
"""The InitialModack helper."""
Expand Down Expand Up @@ -884,7 +884,7 @@ def open(

# Start the lease maintainer thread.
self._leaser.start()

# Start the InitialModack maintainer thread.
self._initial_modack.start()

Expand Down Expand Up @@ -952,7 +952,7 @@ def _shutdown(self, reason: Any = None) -> None:
_LOGGER.debug("Stopping leaser.")
assert self._leaser is not None
self._leaser.stop()

# Stop the initial_modack thread.
assert self._initial_modack is not None
self._initial_modack.stop()
Expand Down Expand Up @@ -1060,7 +1060,7 @@ def _send_lease_modacks(
requests.ModAckRequest(ack_id, self.ack_deadline, None)
for ack_id in ack_ids
]
#if len(items) > 0:
# if len(items) > 0:
# print(f"mk:_send_lease_modacks items: {items}")
# print(f"mk:_send_lease_modacks items received at time: {datetime.datetime.now()}")
assert self._dispatcher is not None
Expand Down Expand Up @@ -1094,7 +1094,7 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None:
# IMPORTANT: Circumvent the wrapper class and operate on the raw underlying
# protobuf message to significantly gain on attribute access performance.
received_messages = response._pb.received_messages

current_time = Timestamp()
current_time.GetCurrentTime()
for rm in received_messages:
Expand Down Expand Up @@ -1130,9 +1130,9 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None:
ack_id_gen, self.ack_deadline, warn_on_invalid=False
)
else:
#print(f"mk: streaming_pull_manager: initial_modack called with: {list(ack_id_gen)}")
# print(f"mk: streaming_pull_manager: initial_modack called with: {list(ack_id_gen)}")
self._initial_modack.add(ack_id_gen)
#self._leaser.add2(ack_id_gen)
# self._leaser.add2(ack_id_gen)

with self._pause_resume_lock:
assert self._scheduler is not None
Expand Down

0 comments on commit ae30761

Please sign in to comment.