From 48ecbda3f52b54581052e772f2f49b5d2f1090c9 Mon Sep 17 00:00:00 2001
From: mdumandag
Date: Tue, 20 Apr 2021 18:45:03 +0300
Subject: [PATCH 1/3] Improve Ringbuffer#read_many

Before this, read_many was returning an immutable lazy list of the items
read. However, the server was sending more information that we were
omitting. Now, read_many returns an instance of ReadResult, from which
the user can get the read count, size, next sequence to read from, and
item sequences.

Note that this is not a breaking change. ReadResult inherits from
ImmutableLazyDataList and hence is a list. So, users of this API can keep
using their code as-is when they upgrade to 4.1. New users might benefit
from the new APIs provided.

Also added a filter parameter to read_many, refactored the implementation,
and updated some docstrings.
---
 hazelcast/proxy/ringbuffer.py                 | 268 +++++++++++++-----
 .../backward_compatible/proxy/hazelcast.xml   |   2 +
 .../proxy/ringbuffer_test.py                  | 144 +++++++++-
 tests/util.py                                 |   7 +
 4 files changed, 351 insertions(+), 70 deletions(-)

diff --git a/hazelcast/proxy/ringbuffer.py b/hazelcast/proxy/ringbuffer.py
index 0f83cea12b..e5fe24430e 100644
--- a/hazelcast/proxy/ringbuffer.py
+++ b/hazelcast/proxy/ringbuffer.py
@@ -41,6 +41,7 @@
 >>> sleep(sleepMS / 1000)
 >>> sleepMS *= 2
 """
+
 MAX_BATCH_SIZE = 1000
 """
 The maximum number of items to be added to RingBuffer or read from RingBuffer at a time.
 """
@@ -48,24 +49,35 @@
 
 class Ringbuffer(PartitionSpecificProxy):
-    """A Ringbuffer is a data-structure where the content is stored in a ring like structure.
-
-    A Ringbuffer has a capacity so it won't grow beyond that capacity and endanger the stability of the system.
-    If that capacity is exceeded, than the oldest item in the Ringbuffer is overwritten.
-    The Ringbuffer has 2 always incrementing sequences:
-
-    - Tail_sequence: This is the side where the youngest item is found. So the tail is the side of the Ringbuffer
-      where items are added to.
-    - Head_sequence: This is the side where the oldest items are found. So the head is the side where items gets
-      discarded.
-
-    The items in the Ringbuffer can be found by a sequence that is in between (inclusive) the head and tail sequence.
-
-    A Ringbuffer currently is not a distributed data-structure. So all data is stored in a single partition; comparable
-    to the IQueue implementation. But we'll provide an option to partition the data in the near future. A Ringbuffer
-    can be used in a similar way as a queue, but one of the key differences is that a queue.take is destructive,
-    meaning that only 1 thread is able to take an item. A Ringbuffer.read is not destructive, so you can have multiple
-    threads reading the same item multiple times.
+    """A Ringbuffer is a data-structure where the content is stored in a
+    ring like structure.
+
+    A ringbuffer has a capacity so it won't grow beyond that capacity and
+    endanger the stability of the system. If that capacity is exceeded, than
+    the oldest item in the ringbuffer is overwritten. The ringbuffer has two
+    always incrementing sequences:
+
+    - :func:`tail_sequence`: This is the side where the youngest item is found.
+      So the tail is the side of the ringbuffer where items are added to.
+    - :func:`head_sequence`: This is the side where the oldest items are found.
+      So the head is the side where items get discarded.
+
+    The items in the ringbuffer can be found by a sequence that is in between
+    (inclusive) the head and tail sequence.
+ + If data is read from a ringbuffer with a sequence that is smaller than the + head sequence, it means that the data is not available anymore and a + :class:`hazelcast.errors.StaleSequenceError` is thrown. + + A Ringbuffer currently is a replicated, but not partitioned data structure. + So all data is stored in a single partition, similarly to the + :class:`hazelcast.proxy.queue.Queue` implementation. + + A Ringbuffer can be used in a way similar to the Queue, but one of the key + differences is that a :func:`hazelcast.proxy.queue.Queue.take` is destructive, + meaning that only 1 thread is able to take an item. A :func:`read_one` is not + destructive, so you can have multiple threads reading the same item multiple + times. """ def __init__(self, service_name, name, context): @@ -86,6 +98,7 @@ def handler(message): request = ringbuffer_capacity_codec.encode_request(self.name) return self._invoke(request, handler) + return ImmediateFuture(self._capacity) def size(self): @@ -100,7 +113,8 @@ def size(self): def tail_sequence(self): """Returns the sequence of the tail. - The tail is the side of the Ringbuffer where the items are added to. The initial value of the tail is -1. + The tail is the side of the Ringbuffer where the items are added to. + The initial value of the tail is ``-1``. Returns: hazelcast.future.Future[int]: The sequence of the tail. @@ -111,9 +125,10 @@ def tail_sequence(self): def head_sequence(self): """Returns the sequence of the head. - The head is the side of the Ringbuffer where the oldest items in the Ringbuffer are found. - If the Ringbuffer is empty, the head will be one more than the tail. The initial value of - the head is 0 (1 more than tail). + The head is the side of the Ringbuffer where the oldest items in the + Ringbuffer are found. If the Ringbuffer is empty, the head will be one + more than the tail. The initial value of the head is ``0`` (``1`` more + than tail). Returns: hazelcast.future.Future[int]: The sequence of the head. @@ -133,36 +148,44 @@ def remaining_capacity(self): def add(self, item, overflow_policy=OVERFLOW_POLICY_OVERWRITE): """Adds the specified item to the tail of the Ringbuffer. - If there is no space in the Ringbuffer, the action is determined by overflow policy - as ``OVERFLOW_POLICY_OVERWRITE`` or ``OVERFLOW_POLICY_FAIL``. + If there is no space in the Ringbuffer, the action is determined by + overflow policy as ``OVERFLOW_POLICY_OVERWRITE`` or + ``OVERFLOW_POLICY_FAIL``. Args: item: The specified item to be added. - overflow_policy (int): the OverflowPolicy to be used when there is no space. + overflow_policy (int): the OverflowPolicy to be used when there is + no space. Returns: - hazelcast.future.Future[int]: The sequenceId of the added item, or ``-1`` if the add failed. + hazelcast.future.Future[int]: The sequenceId of the added item, or + ``-1`` if the add failed. """ item_data = self._to_data(item) request = ringbuffer_add_codec.encode_request(self.name, overflow_policy, item_data) return self._invoke(request, ringbuffer_add_codec.decode_response) def add_all(self, items, overflow_policy=OVERFLOW_POLICY_OVERWRITE): - """Adds all of the item in the specified collection to the tail of the Ringbuffer. + """Adds all of the item in the specified collection to the tail of the + Ringbuffer. - An add_all is likely to outperform multiple calls to add(object) due to better io utilization - and a reduced number of executed operations. The items are added in the order of the Iterator of the collection. 
+ This is likely to outperform multiple calls to :func:`add` due + to better io utilization and a reduced number of executed operations. + The items are added in the order of the Iterator of the collection. - If there is no space in the Ringbuffer, the action is determined by overflow policy - as ``OVERFLOW_POLICY_OVERWRITE`` or ``OVERFLOW_POLICY_FAIL``. + If there is no space in the Ringbuffer, the action is determined by + overflow policy as ``OVERFLOW_POLICY_OVERWRITE`` or + ``OVERFLOW_POLICY_FAIL``. Args: - items (list): The specified collection which contains the items to be added. - overflow_policy (int): The OverflowPolicy to be used when there is no space. + items (list): The specified collection which contains the items + to be added. + overflow_policy (int): The OverflowPolicy to be used when there + is no space. Returns: - hazelcast.future.Future[int]: The sequenceId of the last written item, or ``-1`` - of the last write is failed. + hazelcast.future.Future[int]: The sequenceId of the last written item, + or ``-1`` of the last write is failed. """ check_not_empty(items, "items can't be empty") if len(items) > MAX_BATCH_SIZE: @@ -181,8 +204,9 @@ def add_all(self, items, overflow_policy=OVERFLOW_POLICY_OVERWRITE): def read_one(self, sequence): """Reads one item from the Ringbuffer. - If the sequence is one beyond the current tail, this call blocks until an item is added. - Currently it isn't possible to control how long this call is going to block. + If the sequence is one beyond the current tail, this call blocks until + an item is added. Currently it isn't possible to control how long + this call is going to block. Args: sequence (int): The sequence of the item to read. @@ -198,56 +222,162 @@ def handler(message): request = ringbuffer_read_one_codec.encode_request(self.name, sequence) return self._invoke(request, handler) - def read_many(self, start_sequence, min_count, max_count): + def read_many(self, start_sequence, min_count, max_count, filter=None): """Reads a batch of items from the Ringbuffer. - If the number of available items after the first read item is smaller than the max_count, - these items are returned. So it could be the number of items read is smaller than the max_count. - If there are less items available than min_count, then this call blocks. Reading a batch of items - is likely to perform better because less overhead is involved. + If the number of available items after the first read item is smaller + than the ``max_count``, these items are returned. So it could be the + number of items read is smaller than the ``max_count``. If there are + less items available than ``min_count``, then this call blocks. + + Warnings: + These blocking calls consume server memory and if there are many + calls, it can be possible to see leaking memory or + ``OutOfMemoryError`` s on the server. + + Reading a batch of items is likely to perform better because less + overhead is involved. + + A filter can be provided to only select items that need to be read. If + the filter is ``None``, all items are read. If the filter is not + ``None``, only items where the filter function returns true are + returned. Using filters is a good way to prevent getting items that + are of no value to the receiver. This reduces the amount of IO and the + number of operations being executed, and can result in a significant + performance improvement. Note that, filtering logic must be defined + on the server-side. 
+ + If the ``start_sequence`` is smaller than the smallest sequence still + available in the Ringbuffer (:func:`head_sequence`), then the smallest + available sequence will be used as the start sequence and the + minimum/maximum number of items will be attempted to be read from there + on. + + If the ``start_sequence`` is bigger than the last available sequence + in the Ringbuffer (:func:`tail_sequence`), then the last available + sequence plus one will be used as the start sequence and the call will + block until further items become available and it can read at least the + minimum number of items. Args: - start_sequence (int): The start_sequence of the first item to read. + start_sequence (int): The start sequence of the first item to read. min_count (int): The minimum number of items to read. max_count (int): The maximum number of items to read. + filter: Filter to select returned elements. Returns: - hazelcast.future.Future[list]: The list of read items. + hazelcast.future.Future[ReadResult]: The list of read items. """ check_not_negative(start_sequence, "sequence can't be smaller than 0") + check_not_negative(min_count, "min count can't be smaller than 0") check_true(max_count >= min_count, "max count should be greater or equal to min count") check_true( max_count < MAX_BATCH_SIZE, "max count can't be greater than %d" % MAX_BATCH_SIZE ) - future = Future() request = ringbuffer_read_many_codec.encode_request( - self.name, start_sequence, min_count, max_count, None + self.name, start_sequence, min_count, max_count, self._to_data(filter) ) def handler(message): - return ImmutableLazyDataList( - ringbuffer_read_many_codec.decode_response(message)["items"], self._to_object + response = ringbuffer_read_many_codec.decode_response(message) + read_count = response["read_count"] + next_seq = response["next_seq"] + items = response["items"] + item_seqs = response["item_seqs"] + + return ReadResult(read_count, next_seq, items, item_seqs, self._to_object) + + def continuation(future): + # Since the first call to capacity + # is cached on the client-side, doing + # a capacity check each time should not + # be a problem + capacity = future.result() + + check_true( + min_count <= capacity, + "min count: %d should be smaller or equal to capacity: %d" % (min_count, capacity), ) - def check_capacity(capacity): - try: - capacity = capacity.result() - check_true( - min_count <= capacity, - "min count: %d should be smaller or equal to capacity: %d" - % (min_count, capacity), - ) - f = self._invoke(request, handler) - f.add_done_callback(set_result) - except Exception as e: - future.set_exception(e) - - def set_result(f): - try: - future.set_result(f.result()) - except Exception as e: - future.set_exception(e) - - self.capacity().add_done_callback(check_capacity) - return future + return self._invoke(request, handler) + + return self.capacity().continue_with(continuation) + + +class ReadResult(ImmutableLazyDataList): + """Defines the result of a :func:`Ringbuffer.read_many` operation.""" + + SEQUENCE_UNAVAILABLE = -1 + """Value returned from methods returning a sequence number when the + information is not available (e.g. because of rolling upgrade and some + members not returning the sequence). 
+ """ + + def __init__(self, read_count, next_seq, items, item_seqs, to_object): + super(ReadResult, self).__init__(items, to_object) + self._read_count = read_count + self._next_seq = next_seq + self._item_seqs = item_seqs + + @property + def read_count(self): + """int: The number of items that have been read before filtering. + + If no filter is set, then the :attr:`read_count` will be equal to + :attr:`size`. + + But if a filter is applied, it could be that items are read, but are + filtered out. So if you are trying to make another read based on the + this then you should increment the sequence by :attr:`read_count` and + not by :attr:`size`. + + Otherwise you will be re-reading the same filtered messages. + """ + return self._read_count + + @property + def size(self): + """int: The result set size. + + See Also: + :attr:`read_count` + """ + return len(self._list_data) + + @property + def next_sequence_to_read_from(self): + """int: The sequence of the item following the last read item. + + This sequence can then be used to read items following the ones + returned by this result set. + + Usually this sequence is equal to the sequence used to retrieve this + result set incremented by the :attr:`read_count`. In cases when the + reader tolerates lost items, this is not the case. + + For instance, if the reader requests an item with a stale sequence (one + which has already been overwritten), the read will jump to the oldest + sequence and read from there. + + Similarly, if the reader requests an item in the future (e.g. because + the partition was lost and the reader was unaware of this), the read + method will jump back to the newest available sequence. + + Because of these jumps and only in the case when the reader is loss + tolerant, the next sequence must be retrieved using this method. + A return value of :const:`SEQUENCE_UNAVAILABLE` means that the + information is not available. + """ + return self._next_seq + + def get_sequence(self, index): + """Return the sequence number for the item at the given index. + + Args: + index (int): The index. + + Returns: + int: The sequence number for the ringbuffer item. 
+ """ + return self._item_seqs[index] diff --git a/tests/integration/backward_compatible/proxy/hazelcast.xml b/tests/integration/backward_compatible/proxy/hazelcast.xml index 83049a30f9..4654287390 100644 --- a/tests/integration/backward_compatible/proxy/hazelcast.xml +++ b/tests/integration/backward_compatible/proxy/hazelcast.xml @@ -19,6 +19,8 @@ com.hazelcast.client.test.IdentifiedFactory + com.hazelcast.client.test.IdentifiedDataSerializableFactory + com.hazelcast.client.test.PortableFactory diff --git a/tests/integration/backward_compatible/proxy/ringbuffer_test.py b/tests/integration/backward_compatible/proxy/ringbuffer_test.py index a21b147b0d..be4130639b 100644 --- a/tests/integration/backward_compatible/proxy/ringbuffer_test.py +++ b/tests/integration/backward_compatible/proxy/ringbuffer_test.py @@ -1,8 +1,11 @@ import os +import time +import unittest from hazelcast.proxy.ringbuffer import OVERFLOW_POLICY_FAIL, MAX_BATCH_SIZE +from hazelcast.serialization.api import IdentifiedDataSerializable from tests.base import SingleMemberTestCase -from tests.util import random_string +from tests.util import random_string, is_client_version_older_than, get_current_timestamp from hazelcast.six.moves import range CAPACITY = 10 @@ -114,3 +117,142 @@ def fill_ringbuffer(self, n=CAPACITY): def test_str(self): self.assertTrue(str(self.ringbuffer).startswith("Ringbuffer")) + + +@unittest.skipIf(is_client_version_older_than("4.1"), "Tests the features added in 4.1 version of the client") +class RingbufferReadManyTest(SingleMemberTestCase): + @classmethod + def configure_client(cls, config): + config["cluster_name"] = cls.cluster.id + return config + + @classmethod + def configure_cluster(cls): + path = os.path.abspath(__file__) + dir_path = os.path.dirname(path) + with open(os.path.join(dir_path, "hazelcast.xml")) as f: + return f.read() + + def setUp(self): + self.ringbuffer = self.client.get_ringbuffer( + "ClientRingbufferTestWithTTL-" + random_string() + ).blocking() + + def tearDown(self): + self.ringbuffer.destroy() + + def test_when_start_sequence_is_no_longer_available_gets_clamped(self): + self.fill_ringbuffer() + + result_set = self.ringbuffer.read_many(0, 1, CAPACITY) + self.assertEqual(CAPACITY, result_set.read_count) + self.assertEqual(CAPACITY, result_set.size) + self.assertEqual(CAPACITY, result_set.next_sequence_to_read_from) + + for i in range(CAPACITY): + self.assertEqual(i, result_set[i]) + self.assertEqual(i, result_set.get_sequence(i)) + + def test_when_start_sequence_is_equal_to_tail_sequence(self): + self.fill_ringbuffer() + + result_set = self.ringbuffer.read_many(CAPACITY - 1, 1, CAPACITY) + self.assertEqual(1, result_set.read_count) + self.assertEqual(1, result_set.size) + self.assertEqual(CAPACITY, result_set.next_sequence_to_read_from) + self.assertEqual(CAPACITY - 1, result_set[0]) + self.assertEqual(CAPACITY - 1, result_set.get_sequence(0)) + + def test_when_start_sequence_is_beyond_tail_sequence_then_blocks(self): + self.fill_ringbuffer() + + begin_time = get_current_timestamp() + result_set_future = self.ringbuffer._wrapped.read_many(CAPACITY + 1, 1, CAPACITY) + time.sleep(0.5) + self.assertFalse(result_set_future.done()) + time_passed = get_current_timestamp() - begin_time + self.assertTrue(time_passed >= 0.5) + + def test_when_min_count_items_are_not_available_then_blocks(self): + self.fill_ringbuffer() + + begin_time = get_current_timestamp() + result_set_future = self.ringbuffer._wrapped.read_many(CAPACITY, 2, 3) + time.sleep(0.5) + 
self.assertFalse(result_set_future.done()) + time_passed = get_current_timestamp() - begin_time + self.assertTrue(time_passed >= 0.5) + + def test_max_count(self): + # If more results are available than needed, the surplus results + # should not be read. + self.fill_ringbuffer() + + max_count = CAPACITY // 2 + result_set = self.ringbuffer.read_many(0, 0, max_count) + self.assertEqual(max_count, result_set.read_count) + self.assertEqual(max_count, result_set.size) + self.assertEqual(max_count, result_set.next_sequence_to_read_from) + + for i in range(max_count): + self.assertEqual(i, result_set[i]) + self.assertEqual(i, result_set.get_sequence(i)) + + def test_filter(self): + def item_factory(i): + if i % 2 == 0: + return "good%s" % i + return "bad%s" % i + + self.fill_ringbuffer(item_factory) + + expected_size = CAPACITY // 2 + + result_set = self.ringbuffer.read_many(0, 0, CAPACITY, PrefixFilter("good")) + self.assertEqual(CAPACITY, result_set.read_count) + self.assertEqual(expected_size, result_set.size) + self.assertEqual(CAPACITY, result_set.next_sequence_to_read_from) + + for i in range(expected_size): + self.assertEqual(item_factory(i * 2), result_set[i]) + self.assertEqual(i * 2, result_set.get_sequence(i)) + + def test_filter_with_max_count(self): + def item_factory(i): + if i % 2 == 0: + return "good%s" % i + return "bad%s" % i + + self.fill_ringbuffer(item_factory) + + expected_size = 3 + + result_set = self.ringbuffer.read_many(0, 0, expected_size, PrefixFilter("good")) + self.assertEqual(expected_size * 2 - 1, result_set.read_count) + self.assertEqual(expected_size, result_set.size) + self.assertEqual(expected_size * 2 - 1, result_set.next_sequence_to_read_from) + + for i in range(expected_size): + self.assertEqual(item_factory(i * 2), result_set[i]) + self.assertEqual(i * 2, result_set.get_sequence(i)) + + def fill_ringbuffer(self, item_factory=lambda i: i): + for i in range(0, CAPACITY): + self.ringbuffer.add(item_factory(i)) + + +class PrefixFilter(IdentifiedDataSerializable): + def __init__(self, prefix): + self.prefix = prefix + + def write_data(self, object_data_output): + object_data_output.write_string(self.prefix) + + def read_data(self, object_data_input): + self.prefix = object_data_input.read_string() + + def get_factory_id(self): + return 666 + + def get_class_id(self): + return 14 diff --git a/tests/util.py b/tests/util.py index 909b7c0c24..93423d9e7b 100644 --- a/tests/util.py +++ b/tests/util.py @@ -2,6 +2,7 @@ import time from uuid import uuid4 +from hazelcast import __version__ from hazelcast.config import SSLProtocol from hazelcast.util import calculate_version @@ -97,6 +98,12 @@ def mark_server_version_at_least(test, client, expected_version): test.skipTest("Expected a newer server") +def is_client_version_older_than(expected_version): + version = calculate_version(__version__) + expected_version = calculate_version(expected_version) + return version < expected_version + + def open_connection_to_address(client, uuid): key = generate_key_owned_by_instance(client, uuid) m = client.get_map(random_string()).blocking() From 8fab8b3cea4323ee86e968b99a3d36f344c4ce34 Mon Sep 17 00:00:00 2001 From: mdumandag Date: Wed, 21 Apr 2021 10:34:05 +0300 Subject: [PATCH 2/3] lint the test code --- .../integration/backward_compatible/proxy/ringbuffer_test.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/integration/backward_compatible/proxy/ringbuffer_test.py b/tests/integration/backward_compatible/proxy/ringbuffer_test.py index 
be4130639b..b94e70b5f7 100644 --- a/tests/integration/backward_compatible/proxy/ringbuffer_test.py +++ b/tests/integration/backward_compatible/proxy/ringbuffer_test.py @@ -119,7 +119,9 @@ def test_str(self): self.assertTrue(str(self.ringbuffer).startswith("Ringbuffer")) -@unittest.skipIf(is_client_version_older_than("4.1"), "Tests the features added in 4.1 version of the client") +@unittest.skipIf( + is_client_version_older_than("4.1"), "Tests the features added in 4.1 version of the client" +) class RingbufferReadManyTest(SingleMemberTestCase): @classmethod def configure_client(cls, config): From ac8819b58aedfbc1211a907b80d32bcca12f51b0 Mon Sep 17 00:00:00 2001 From: mdumandag Date: Mon, 26 Apr 2021 17:59:08 +0300 Subject: [PATCH 3/3] address review comments --- hazelcast/proxy/ringbuffer.py | 24 ++++---- .../proxy/ringbuffer_test.py | 56 ++++++++++++++----- 2 files changed, 54 insertions(+), 26 deletions(-) diff --git a/hazelcast/proxy/ringbuffer.py b/hazelcast/proxy/ringbuffer.py index e5fe24430e..52343ef9e5 100644 --- a/hazelcast/proxy/ringbuffer.py +++ b/hazelcast/proxy/ringbuffer.py @@ -49,8 +49,8 @@ class Ringbuffer(PartitionSpecificProxy): - """A Ringbuffer is a data-structure where the content is stored in a - ring like structure. + """A Ringbuffer is an append-only data-structure where the content is + stored in a ring like structure. A ringbuffer has a capacity so it won't grow beyond that capacity and endanger the stability of the system. If that capacity is exceeded, than @@ -149,8 +149,8 @@ def add(self, item, overflow_policy=OVERFLOW_POLICY_OVERWRITE): """Adds the specified item to the tail of the Ringbuffer. If there is no space in the Ringbuffer, the action is determined by - overflow policy as ``OVERFLOW_POLICY_OVERWRITE`` or - ``OVERFLOW_POLICY_FAIL``. + ``overflow_policy`` as :const:`OVERFLOW_POLICY_OVERWRITE` or + :const:`OVERFLOW_POLICY_FAIL`. Args: item: The specified item to be added. @@ -174,8 +174,8 @@ def add_all(self, items, overflow_policy=OVERFLOW_POLICY_OVERWRITE): The items are added in the order of the Iterator of the collection. If there is no space in the Ringbuffer, the action is determined by - overflow policy as ``OVERFLOW_POLICY_OVERWRITE`` or - ``OVERFLOW_POLICY_FAIL``. + ``overflow_policy`` as :const:`OVERFLOW_POLICY_OVERWRITE` or + :const:`OVERFLOW_POLICY_FAIL`. Args: items (list): The specified collection which contains the items @@ -240,9 +240,9 @@ def read_many(self, start_sequence, min_count, max_count, filter=None): A filter can be provided to only select items that need to be read. If the filter is ``None``, all items are read. If the filter is not - ``None``, only items where the filter function returns true are + ``None``, only items where the filter function returns true are returned. Using filters is a good way to prevent getting items that - are of no value to the receiver. This reduces the amount of IO and the + are of no value to the receiver. This reduces the amount of IO and the number of operations being executed, and can result in a significant performance improvement. Note that, filtering logic must be defined on the server-side. @@ -296,8 +296,8 @@ def continuation(future): capacity = future.result() check_true( - min_count <= capacity, - "min count: %d should be smaller or equal to capacity: %d" % (min_count, capacity), + max_count <= capacity, + "max count: %d should be smaller or equal to capacity: %d" % (max_count, capacity), ) return self._invoke(request, handler) @@ -328,8 +328,8 @@ def read_count(self): :attr:`size`. 
But if a filter is applied, it could be that items are read, but are - filtered out. So if you are trying to make another read based on the - this then you should increment the sequence by :attr:`read_count` and + filtered out. So, if you are trying to make another read based on + this, then you should increment the sequence by :attr:`read_count` and not by :attr:`size`. Otherwise you will be re-reading the same filtered messages. diff --git a/tests/integration/backward_compatible/proxy/ringbuffer_test.py b/tests/integration/backward_compatible/proxy/ringbuffer_test.py index b94e70b5f7..c30d12590f 100644 --- a/tests/integration/backward_compatible/proxy/ringbuffer_test.py +++ b/tests/integration/backward_compatible/proxy/ringbuffer_test.py @@ -144,16 +144,16 @@ def tearDown(self): self.ringbuffer.destroy() def test_when_start_sequence_is_no_longer_available_gets_clamped(self): - self.fill_ringbuffer() + self.fill_ringbuffer(item_count=CAPACITY + 1) result_set = self.ringbuffer.read_many(0, 1, CAPACITY) self.assertEqual(CAPACITY, result_set.read_count) self.assertEqual(CAPACITY, result_set.size) - self.assertEqual(CAPACITY, result_set.next_sequence_to_read_from) + self.assertEqual(CAPACITY + 1, result_set.next_sequence_to_read_from) - for i in range(CAPACITY): - self.assertEqual(i, result_set[i]) - self.assertEqual(i, result_set.get_sequence(i)) + for i in range(1, CAPACITY + 1): + self.assertEqual(i, result_set[i - 1]) + self.assertEqual(i, result_set.get_sequence(i - 1)) def test_when_start_sequence_is_equal_to_tail_sequence(self): self.fill_ringbuffer() @@ -168,22 +168,50 @@ def test_when_start_sequence_is_equal_to_tail_sequence(self): def test_when_start_sequence_is_beyond_tail_sequence_then_blocks(self): self.fill_ringbuffer() - begin_time = get_current_timestamp() result_set_future = self.ringbuffer._wrapped.read_many(CAPACITY + 1, 1, CAPACITY) time.sleep(0.5) self.assertFalse(result_set_future.done()) - time_passed = get_current_timestamp() - begin_time - self.assertTrue(time_passed >= 0.5) def test_when_min_count_items_are_not_available_then_blocks(self): self.fill_ringbuffer() - begin_time = get_current_timestamp() - result_set_future = self.ringbuffer._wrapped.read_many(CAPACITY, 2, 3) + result_set_future = self.ringbuffer._wrapped.read_many(CAPACITY - 1, 2, 3) time.sleep(0.5) self.assertFalse(result_set_future.done()) - time_passed = get_current_timestamp() - begin_time - self.assertTrue(time_passed >= 0.5) + + def test_when_some_waiting_needed(self): + self.fill_ringbuffer() + + result_set_future = self.ringbuffer._wrapped.read_many(CAPACITY - 1, 2, 3) + time.sleep(0.5) + self.assertFalse(result_set_future.done()) + + self.ringbuffer.add(CAPACITY) + + self.assertTrueEventually(lambda: self.assertTrue(result_set_future.done())) + + result_set = result_set_future.result() + self.assertEqual(2, result_set.read_count) + self.assertEqual(2, result_set.size) + self.assertEqual(CAPACITY + 1, result_set.next_sequence_to_read_from) + self.assertEqual(CAPACITY - 1, result_set[0]) + self.assertEqual(CAPACITY - 1, result_set.get_sequence(0)) + self.assertEqual(CAPACITY, result_set[1]) + self.assertEqual(CAPACITY, result_set.get_sequence(1)) + + def test_min_zero_when_item_available(self): + self.fill_ringbuffer() + + result_set = self.ringbuffer.read_many(0, 0, 1) + + self.assertEqual(1, result_set.read_count) + self.assertEqual(1, result_set.size) + + def test_min_zero_when_no_item_available(self): + result_set = self.ringbuffer.read_many(0, 0, 1) + + self.assertEqual(0, 
result_set.read_count) + self.assertEqual(0, result_set.size) def test_max_count(self): # If more results are available than needed, the surplus results @@ -238,8 +266,8 @@ def item_factory(i): self.assertEqual(item_factory(i * 2), result_set[i]) self.assertEqual(i * 2, result_set.get_sequence(i)) - def fill_ringbuffer(self, item_factory=lambda i: i): - for i in range(0, CAPACITY): + def fill_ringbuffer(self, item_factory=lambda i: i, item_count=CAPACITY): + for i in range(0, item_count): self.ringbuffer.add(item_factory(i))
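
For reference, a minimal usage sketch of the read_many/ReadResult API introduced by this
patch series (not part of the patches themselves). The ringbuffer name and the default client
configuration are illustrative and assume a reachable cluster; the optional filter argument is
omitted because a filter must be an object whose filtering logic is registered on the server
side, like the PrefixFilter test helper above.

import hazelcast

client = hazelcast.HazelcastClient()
ringbuffer = client.get_ringbuffer("my-ringbuffer").blocking()

# Append a small batch; OVERFLOW_POLICY_OVERWRITE is the default policy.
ringbuffer.add_all(["item-%d" % i for i in range(10)])

# Read up to 10 items starting from the head, waiting for at least one item.
result_set = ringbuffer.read_many(ringbuffer.head_sequence(), 1, 10)
for i in range(result_set.size):
    print("%s -> %s" % (result_set.get_sequence(i), result_set[i]))

# For a follow-up read, advance by read_count (not size) so that items dropped
# by a server-side filter are not re-read; a loss-tolerant reader should use
# next_sequence_to_read_from instead.
next_sequence = result_set.next_sequence_to_read_from

client.shutdown()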