Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Allow dropping cleaned-up keys #911

Merged
merged 16 commits into from
May 5, 2023
36 changes: 24 additions & 12 deletions google/cloud/pubsub_v1/subscriber/_protocol/messages_on_hold.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,17 @@
# limitations under the License.

import collections
import logging
import typing
from typing import Any, Callable, Iterable, Optional

if typing.TYPE_CHECKING: # pragma: NO COVER
from google.cloud.pubsub_v1 import subscriber


_LOGGER = logging.getLogger(__name__)


class MessagesOnHold(object):
"""Tracks messages on hold by ordering key. Not thread-safe."""

Expand Down Expand Up @@ -113,14 +117,17 @@ def activate_ordering_keys(

Args:
ordering_keys:
The ordering keys to activate. May be empty.
The ordering keys to activate. May be empty, or contain duplicates.
schedule_message_callback:
The callback to call to schedule a message to be sent to the user.
"""
for key in ordering_keys:
assert (
self._pending_ordered_messages.get(key) is not None
), "A message queue should exist for every ordered message in flight."
pending_ordered_messages = self._pending_ordered_messages.get(key)
if pending_ordered_messages is None:
_LOGGER.warning(
"No message queue exists for message ordering key: %s.", key
)
continue
next_msg = self._get_next_for_ordering_key(key)
if next_msg:
# Schedule the next message because the previous was dropped.
Expand Down Expand Up @@ -154,15 +161,20 @@ def _get_next_for_ordering_key(
def _clean_up_ordering_key(self, ordering_key: str) -> None:
"""Clean up state for an ordering key with no pending messages.

Args:
Args
ordering_key: The ordering key to clean up.
"""
message_queue = self._pending_ordered_messages.get(ordering_key)
assert (
message_queue is not None
), "Cleaning up ordering key that does not exist."
assert not len(message_queue), (
"Ordering key must only be removed if there are no messages "
"left for that key."
)
if message_queue is None:
_LOGGER.warning(
"Tried to clean up ordering key that does not exist: %s", ordering_key
)
return
if len(message_queue) > 0:
_LOGGER.warning(
"Tried to clean up ordering key: %s with %d messages remaining.",
ordering_key,
len(message_queue),
)
return
del self._pending_ordered_messages[ordering_key]
102 changes: 102 additions & 0 deletions tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,72 @@ def test_ordered_messages_one_key():
assert moh.size == 0


def test_ordered_messages_drop_duplicate_keys(caplog):
moh = messages_on_hold.MessagesOnHold()

msg1 = make_message(ack_id="ack1", ordering_key="key1")
moh.put(msg1)
assert moh.size == 1

msg2 = make_message(ack_id="ack2", ordering_key="key1")
moh.put(msg2)
assert moh.size == 2

# Get first message for "key1"
assert moh.get() == msg1
assert moh.size == 1

# Still waiting on the previously-sent message for "key1", and there are no
# other messages, so return None.
assert moh.get() is None
assert moh.size == 1

# Activate "key1".
callback_tracker = ScheduleMessageCallbackTracker()
moh.activate_ordering_keys(["key1", "key1"], callback_tracker)
assert callback_tracker.called
assert callback_tracker.message == msg2
assert moh.size == 0
assert len(moh._pending_ordered_messages) == 0

# Activate "key1" again
callback_tracker = ScheduleMessageCallbackTracker()
moh.activate_ordering_keys(["key1"], callback_tracker)
assert not callback_tracker.called

# Activate "key1" again. There are no other messages for that key, so clean
# up state for that key.
callback_tracker = ScheduleMessageCallbackTracker()
moh.activate_ordering_keys(["key1"], callback_tracker)
assert not callback_tracker.called

msg3 = make_message(ack_id="ack3", ordering_key="key1")
moh.put(msg3)
assert moh.size == 1

# Get next message for "key1"
assert moh.get() == msg3
assert moh.size == 0

# Activate "key1".
callback_tracker = ScheduleMessageCallbackTracker()
moh.activate_ordering_keys(["key1"], callback_tracker)
assert not callback_tracker.called

# Activate "key1" again. There are no other messages for that key, so clean
# up state for that key.
callback_tracker = ScheduleMessageCallbackTracker()
moh.activate_ordering_keys(["key1"], callback_tracker)
assert not callback_tracker.called

# Activate "key1" again after being cleaned up. There are no other messages for that key, so clean
# up state for that key.
callback_tracker = ScheduleMessageCallbackTracker()
moh.activate_ordering_keys(["key1"], callback_tracker)
assert not callback_tracker.called
assert "No message queue exists for message ordering key: key1" in caplog.text


def test_ordered_messages_two_keys():
moh = messages_on_hold.MessagesOnHold()

Expand Down Expand Up @@ -278,3 +344,39 @@ def test_ordered_and_unordered_messages_interleaved():
# No messages left.
assert moh.get() is None
assert moh.size == 0


def test_cleanup_nonexistent_key(caplog):
moh = messages_on_hold.MessagesOnHold()
moh._clean_up_ordering_key("non-existent-key")
assert (
"Tried to clean up ordering key that does not exist: non-existent-key"
in caplog.text
)


def test_cleanup_key_with_messages(caplog):
moh = messages_on_hold.MessagesOnHold()

# Put message with "key1".
msg1 = make_message(ack_id="ack1", ordering_key="key1")
moh.put(msg1)
assert moh.size == 1

# Put another message "key1"
msg2 = make_message(ack_id="ack2", ordering_key="key1")
moh.put(msg2)
assert moh.size == 2

# Get first message for "key1"
assert moh.get() == msg1
assert moh.size == 1

# Get first message for "key1"
assert moh.get() is None
assert moh.size == 1

moh._clean_up_ordering_key("key1")
assert (
"Tried to clean up ordering key: key1 with 1 messages remaining." in caplog.text
)