From 1e6e69759b45ca56ea1d209be45154f0a968665c Mon Sep 17 00:00:00 2001 From: Burak Sezer Date: Wed, 13 May 2020 13:52:29 +0300 Subject: [PATCH 1/9] Initial ReliableTopic implementation: #201 --- examples/reliable-topic/__init__.py | 0 .../reliable-topic/reliable_topic_example.py | 29 ++ hazelcast/proxy/reliable_topic.py | 329 +++++++++++++++++- hazelcast/serialization/reliable_topic.py | 27 ++ tests/proxy/hazelcast_topic.xml | 30 ++ tests/proxy/reliable_topic_test.py | 295 ++++++++++++++++ 6 files changed, 705 insertions(+), 5 deletions(-) create mode 100644 examples/reliable-topic/__init__.py create mode 100644 examples/reliable-topic/reliable_topic_example.py create mode 100644 hazelcast/serialization/reliable_topic.py create mode 100644 tests/proxy/hazelcast_topic.xml create mode 100644 tests/proxy/reliable_topic_test.py diff --git a/examples/reliable-topic/__init__.py b/examples/reliable-topic/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/examples/reliable-topic/reliable_topic_example.py b/examples/reliable-topic/reliable_topic_example.py new file mode 100644 index 0000000000..842227c36d --- /dev/null +++ b/examples/reliable-topic/reliable_topic_example.py @@ -0,0 +1,29 @@ +import time + +import hazelcast +from hazelcast import ClientConfig +from hazelcast.proxy.reliable_topic import ReliableMessageListener + + +class MyReliableMessageListener(ReliableMessageListener): + def on_message(self, event): + print("Got message: {}".format(event.message)) + print("Publish time: {}\n".format(event.publish_time)) + + +if __name__ == "__main__": + config = ClientConfig() + config.set_property("hazelcast.serialization.input.returns.bytearray", True) + client = hazelcast.HazelcastClient(config) + listener = MyReliableMessageListener() + + reliable_topic = client.get_reliable_topic("reliable-topic") + registration_id = reliable_topic.add_listener(listener) + + for i in range(10): + reliable_topic.publish("Message " + str(i)) + time.sleep(0.1) + + 
reliable_topic.destroy() + reliable_topic.remove_listener(registration_id) + client.shutdown() diff --git a/hazelcast/proxy/reliable_topic.py b/hazelcast/proxy/reliable_topic.py index b518f03735..f1dbf87f16 100644 --- a/hazelcast/proxy/reliable_topic.py +++ b/hazelcast/proxy/reliable_topic.py @@ -1,12 +1,331 @@ -from hazelcast.proxy.base import Proxy +import time +import threading +from uuid import uuid4 +from hazelcast.config import ReliableTopicConfig, TOPIC_OVERLOAD_POLICY +from hazelcast.exception import IllegalArgumentError, TopicOverflowError, HazelcastInstanceNotActiveError, \ + HazelcastClientNotActiveException, DistributedObjectDestroyedError, StaleSequenceError, OperationTimeoutError +from hazelcast.proxy.base import Proxy, TopicMessage +from hazelcast.proxy.ringbuffer import OVERFLOW_POLICY_FAIL, OVERFLOW_POLICY_OVERWRITE +from hazelcast.serialization.reliable_topic import ReliableTopicMessage +from hazelcast.util import current_time_in_millis +from hazelcast.six.moves import queue -class ReliableTopic(Proxy): - def add_listener(self, on_message=None): +_INITIAL_BACKOFF = 0.1 +_MAX_BACKOFF = 2 + + +class ReliableMessageListener(object): + def on_message(self, item): + """ + Invoked when a message is received for the added reliable topic. + + :param: message the message that is received for the added reliable topic + """ raise NotImplementedError + def retrieve_initial_sequence(self): + """ + Retrieves the initial sequence from which this ReliableMessageListener + should start. + + Return -1 if there is no initial sequence and you want to start + from the next published message. + + If you intend to create a durable subscriber so you continue from where + you stopped the previous time, load the previous sequence and add 1. + If you don't add one, then you will be receiving the same message twice. 
+ + :return: (int), the initial sequence + """ + return -1 + + def store_sequence(self, sequence): + """" + Informs the ReliableMessageListener that it should store the sequence. + This method is called before the message is processed. Can be used to + make a durable subscription. + + :param: (int) ``sequence`` the sequence + """ + pass + + def is_loss_tolerant(self): + """ + Checks if this ReliableMessageListener is able to deal with message loss. + Even though the reliable topic promises to be reliable, it can be that a + MessageListener is too slow. Eventually the message won't be available + anymore. + + If the ReliableMessageListener is not loss tolerant and the topic detects + that there are missing messages, it will terminate the + ReliableMessageListener. + + :return: (bool) ``True`` if the ReliableMessageListener is tolerant towards losing messages. + """ + return False + + def is_terminal(self): + """ + Checks if the ReliableMessageListener should be terminated based on an + exception thrown while calling on_message. + + :return: (bool) ``True` if the ReliableMessageListener should terminate itself, ``False`` if it should keep on running. + """ + raise False + + +class _MessageListener(object): + def __init__(self, uuid, proxy, to_object, listener): + self._id = uuid + self._proxy = proxy + self._to_object = to_object + self._listener = listener + self._cancelled_lock = threading.Lock() + self._cancelled = False + self._sequence = 0 + self._q = queue.Queue() + + def start(self): + tail_seq = self._proxy.ringbuffer.tail_sequence() + initial_seq = self._listener.retrieve_initial_sequence() + if initial_seq == -1: + initial_seq = tail_seq.result() + 1 + self._sequence = initial_seq + self._proxy.client.reactor.add_timer(0, self._next) + + def _handle_illegal_argument_error(self): + head_seq = self._proxy.ringbuffer.head_sequence().result() + self._proxy.logger.warning("MessageListener {} on topic {} requested a too large sequence. 
Jumping from old " + "sequence: {} to sequence: {}".format(self._id, self._proxy.name, self._sequence, + head_seq)) + self._sequence = head_seq + self._next() + + def _handle_stale_sequence_error(self): + head_seq = self._proxy.ringbuffer.head_sequence().result() + if self._listener.is_loss_tolerant: + self._sequence = head_seq + self._proxy.logger.warning("Topic {} ran into a stale sequence. Jumping from old sequence {} to new " + "sequence {}".format(self._proxy.name, self._sequence, head_seq)) + self._next() + return True + + self._proxy.logger.warning( + "Terminating Message Listener: {} on topic: {}. Reason: The listener was too slow or the retention " + "period of the message has been violated. Head: {}, sequence: {}".format(self._id, self._proxy.name, + head_seq, self._sequence)) + return False + + def _handle_operation_timeout_error(self): + self._proxy.logger.info("Message Listener ", self._proxy.id, "on topic: ", self._proxy.name, " timed out. " + + "Continuing from the last known sequence ", self._proxy.sequence) + self._next() + + def _handle_exception(self, exception): + base_msg = "Terminating Message Listener: " + self._id + " on topic: " + self._proxy.name + ". 
Reason: " + if isinstance(exception, IllegalArgumentError) and self._listener.is_loss_tolerant(): + self._handle_illegal_argument_error() + return + elif isinstance(exception, StaleSequenceError): + if self._handle_stale_sequence_error(): + return + elif isinstance(exception, OperationTimeoutError): + self._handle_operation_timeout_error() + return + elif isinstance(exception, HazelcastInstanceNotActiveError): + self._proxy.logger.info(base_msg + "HazelcastInstance is shutting down.") + elif isinstance(exception, HazelcastClientNotActiveException): + self._proxy.logger.info(base_msg + "HazelcastClient is shutting down.") + elif isinstance(exception, DistributedObjectDestroyedError): + self._proxy.logger.info(base_msg + "ReliableTopic is destroyed.") + else: + self._proxy.logger.warning(base_msg + "Unhandled error, message: " + str(exception)) + + self._cancel_and_remove_listener() + + def _terminate(self, exception): + with self._cancelled_lock: + if self._cancelled: + return True + + base_msg = "Terminating Message Listener: {} on topic: {}. Reason: ".format(self._id, self._proxy.name) + try: + terminate = self._listener.is_terminal() + if terminate: + self._proxy.logger.warning(base_msg + "Unhandled error: {}".format(str(exception))) + return True + + self._proxy.logger.warning("MessageListener {} on topic: {} ran into an error: {}". 
+ format(self._id, self._proxy.name, str(exception))) + return False + + except Exception as e: + self._proxy.logger.warning(base_msg + "Unhandled error while calling ReliableMessageListener.is_terminal() " + "method: {}".format(str(e))) + return True + + def _process(self, msg): + try: + self._listener.on_message(msg) + except BaseException as e: + if self._terminate(e): + self._cancel_and_remove_listener() + + def _on_response(self, res): + try: + for message in res.result(): + with self._cancelled_lock: + if self._cancelled: + return + + msg = TopicMessage( + self._proxy.name, + message.payload, + message.publish_time, + message.publisher_address, + self._to_object + ) + self._listener.store_sequence(self._sequence) + self._process(msg) + self._sequence += 1 + + # Await for new messages + self._next() + except Exception as e: + self._handle_exception(e) + + def _next(self): + def _read_many(): + with self._cancelled_lock: + if self._cancelled: + return + + future = self._proxy.ringbuffer.read_many(self._sequence, 1, self._proxy.config.read_batch_size) + future.continue_with(self._on_response) + + self._proxy.client.reactor.add_timer(0, _read_many) + + def cancel(self): + with self._cancelled_lock: + self._cancelled = True + + def _cancel_and_remove_listener(self): + try: + # _proxy.remove_listener calls listener.cancel function + self._proxy.remove_listener(self._id) + except IllegalArgumentError as e: + # This listener is already removed + self._proxy.logger.debug("Failed to remove listener. Reason: {}".format(str(e))) + + +class ReliableTopic(Proxy): + """ + Hazelcast provides distribution mechanism for publishing messages that are delivered to multiple subscribers, which + is also known as a publish/subscribe (pub/sub) messaging model. Publish and subscriptions are cluster-wide. 
When a + member subscribes for a topic, it is actually registering for messages published by any member in the cluster, + including the new members joined after you added the listener. + + Messages are ordered, meaning that listeners(subscribers) will process the messages in the order they are actually + published. + + Hazelcast's Reliable Topic uses the same Topic interface as a regular topic. The main difference is that Reliable + Topic is backed up by the Ringbuffer data structure, a replicated but not partitioned data structure that stores + its data in a ring-like structure. + """ + + def __init__(self, client, service_name, name): + super(ReliableTopic, self).__init__(client, service_name, name) + + config = client.config.reliable_topic_configs.get(name, None) + if config is None: + config = ReliableTopicConfig() + + self.client = client + self.config = config + self._topic_overload_policy = self.config.topic_overload_policy + self.ringbuffer = client.get_ringbuffer("_hz_rb_" + name) + self._message_listeners_lock = threading.RLock() + self._message_listeners = {} + + def add_listener(self, reliable_topic_listener): + """ + Subscribes to this reliable topic. When someone publishes a message on this topic, on_message() method of + ReliableTopicListener is called. + + :param ReliableTopicListener: (Class), class to be used when a message is published. + :return: (str), a registration id which is used as a key to remove the listener. 
+ """ + if not isinstance(reliable_topic_listener, ReliableMessageListener): + raise IllegalArgumentError("Message listener is not an instance of ReliableTopicListener") + + registration_id = str(uuid4()) + listener = _MessageListener(registration_id, self, self._to_object, reliable_topic_listener) + with self._message_listeners_lock: + self._message_listeners[registration_id] = listener + + listener.start() + return registration_id + + def _add_with_backoff(self, item): + sleep_time = _INITIAL_BACKOFF + while True: + seq_id = self.ringbuffer.add(item, overflow_policy=OVERFLOW_POLICY_FAIL).result() + if seq_id != -1: + return + time.sleep(sleep_time) + sleep_time *= 2 + if sleep_time > _MAX_BACKOFF: + sleep_time = _MAX_BACKOFF + + def _add_or_fail(self, item): + seq_id = self.ringbuffer.add(item, overflow_policy=OVERFLOW_POLICY_FAIL).result() + if seq_id == -1: + raise TopicOverflowError("failed to publish message to topic: " + self.name) + def publish(self, message): - raise NotImplementedError + """ + Publishes the message to all subscribers of this topic + + :param message: (object), the message to be published. + """ + # TODO: We need a publisher_address? + item = ReliableTopicMessage( + publish_time=current_time_in_millis(), + publisher_address="", + payload=self._to_data(message) + ) + if self._topic_overload_policy == TOPIC_OVERLOAD_POLICY.BLOCK: + self._add_with_backoff(item) + elif self._topic_overload_policy == TOPIC_OVERLOAD_POLICY.ERROR: + self._add_or_fail(item) + elif self._topic_overload_policy == TOPIC_OVERLOAD_POLICY.DISCARD_NEWEST: + self.ringbuffer.add(item, overflow_policy=OVERFLOW_POLICY_FAIL).result() + elif self._topic_overload_policy == TOPIC_OVERLOAD_POLICY.DISCARD_OLDEST: + self.ringbuffer.add(item, overflow_policy=OVERFLOW_POLICY_OVERWRITE).result() def remove_listener(self, registration_id): - raise NotImplementedError + """ + Stops receiving messages for the given message listener. 
If the given listener already removed, this method does + nothing. + + :param registration_id: (str), registration id of the listener to be removed. + :return: (bool), ``true`` if the listener is removed, ``false`` otherwise. + """ + with self._message_listeners_lock: + if registration_id not in self._message_listeners: + raise IllegalArgumentError("no listener is found with the given id : {}".format(registration_id)) + + listener = self._message_listeners[registration_id] + listener.cancel() + del self._message_listeners[registration_id] + return True + + return False + + def destroy(self): + """ + Destroys underlying Proxy and RingBuffer instances + """ + super(ReliableTopic, self).destroy() + return self.ringbuffer.destroy() diff --git a/hazelcast/serialization/reliable_topic.py b/hazelcast/serialization/reliable_topic.py new file mode 100644 index 0000000000..29ae6f0879 --- /dev/null +++ b/hazelcast/serialization/reliable_topic.py @@ -0,0 +1,27 @@ +from hazelcast.serialization.api import IdentifiedDataSerializable + + +class ReliableTopicMessage(IdentifiedDataSerializable): + FACTORY_ID = -18 + CLASS_ID = 2 + + def __init__(self, publish_time=None, publisher_address=None, payload=None): + self.publish_time = publish_time + self.publisher_address = publisher_address + self.payload = payload + + def read_data(self, object_data_input): + self.publish_time = object_data_input.read_long() + self.publisher_address = object_data_input.read_object() + self.payload = object_data_input.read_data() + + def write_data(self, object_data_output): + object_data_output.write_long(self.publish_time) + object_data_output.write_object(self.publisher_address) + object_data_output.write_data(self.payload) + + def get_factory_id(self): + return self.FACTORY_ID + + def get_class_id(self): + return self.CLASS_ID \ No newline at end of file diff --git a/tests/proxy/hazelcast_topic.xml b/tests/proxy/hazelcast_topic.xml new file mode 100644 index 0000000000..bdf5d31f33 --- /dev/null +++ 
b/tests/proxy/hazelcast_topic.xml @@ -0,0 +1,30 @@ + + + + + 3 + 10 + + + + 120 + 10 + + + + 0 + 10 + + + + 120 + 10 + + + + 120 + 10 + + \ No newline at end of file diff --git a/tests/proxy/reliable_topic_test.py b/tests/proxy/reliable_topic_test.py new file mode 100644 index 0000000000..d1aa44ce49 --- /dev/null +++ b/tests/proxy/reliable_topic_test.py @@ -0,0 +1,295 @@ +import os +import time +from datetime import datetime + +import hazelcast +from hazelcast import ClientConfig +from hazelcast.config import ReliableTopicConfig, TOPIC_OVERLOAD_POLICY +from hazelcast.exception import IllegalArgumentError, TopicOverflowError +from hazelcast.proxy.reliable_topic import ReliableMessageListener +from hazelcast.proxy.ringbuffer import OVERFLOW_POLICY_FAIL, OVERFLOW_POLICY_OVERWRITE +from hazelcast.serialization.reliable_topic import ReliableTopicMessage +from hazelcast.util import current_time_in_millis +from tests.base import SingleMemberTestCase +from tests.util import random_string, event_collector + + +class _ReliableTopicTestException(BaseException): + pass + + +class TestReliableMessageListener(ReliableMessageListener): + def __init__(self, collector): + self._collector = collector + + def on_message(self, event): + self._collector(event) + + +class ReliableTopicTest(SingleMemberTestCase): + @classmethod + def configure_cluster(cls): + path = os.path.abspath(__file__) + dir_path = os.path.dirname(path) + with open(os.path.join(dir_path, "hazelcast_topic.xml")) as f: + return f.read() + + def setUp(self): + config = ClientConfig() + config.set_property("hazelcast.serialization.input.returns.bytearray", True) + + discard_config = ReliableTopicConfig("discard") + discard_config.topic_overload_policy = TOPIC_OVERLOAD_POLICY.DISCARD_NEWEST + config.add_reliable_topic_config(discard_config) + + overwrite_config = ReliableTopicConfig("overwrite") + overwrite_config.topic_overload_policy = TOPIC_OVERLOAD_POLICY.DISCARD_OLDEST + 
config.add_reliable_topic_config(overwrite_config) + + error_config = ReliableTopicConfig("error") + error_config.topic_overload_policy = TOPIC_OVERLOAD_POLICY.ERROR + config.add_reliable_topic_config(error_config) + + stale_config = ReliableTopicConfig("stale") + stale_config.topic_overload_policy = TOPIC_OVERLOAD_POLICY.DISCARD_OLDEST + config.add_reliable_topic_config(stale_config) + + self.client = hazelcast.HazelcastClient(self.configure_client(config)) + self.reliable_topic = self.client.get_reliable_topic(random_string()).blocking() + self.registration_id = None + + def tearDown(self): + if self.registration_id is not None: + self.reliable_topic.remove_listener(self.registration_id) + + def test_add_listener(self): + collector = event_collector() + reliable_listener = TestReliableMessageListener(collector) + self.registration_id = self.reliable_topic.add_listener(reliable_listener) + self.reliable_topic.publish('item-value') + + def assert_event(): + self.assertEqual(len(collector.events), 1) + event = collector.events[0] + self.assertEqual(event.message, 'item-value') + self.assertGreater(event.publish_time, 0) + + self.assertTrueEventually(assert_event, 5) + + def test_remove_listener(self): + collector = event_collector() + reliable_listener = TestReliableMessageListener(collector) + + reg_id = self.reliable_topic.add_listener(reliable_listener) + removed = self.reliable_topic.remove_listener(reg_id) + self.assertTrue(removed, True) + + def test_none_listener(self): + with self.assertRaises(IllegalArgumentError): + self.reliable_topic.add_listener("invalid-listener") + + def test_remove_listener_when_does_not_exist(self): + with self.assertRaises(IllegalArgumentError): + self.reliable_topic.remove_listener("id") + + def test_remove_listener_when_already_removed(self): + collector = event_collector() + reliable_listener = TestReliableMessageListener(collector) + + reg_id = self.reliable_topic.add_listener(reliable_listener) + 
self.reliable_topic.remove_listener(reg_id) + + with self.assertRaises(IllegalArgumentError): + self.reliable_topic.remove_listener(reg_id) + + def test_error_on_message_not_terminal(self): + collector = event_collector() + + class TestReliableMessageListenerNotTerminal(ReliableMessageListener): + def __init__(self, _collector): + self._collector = _collector + + def on_message(self, event): + if event.message == "raise-exception": + raise _ReliableTopicTestException("test-exception") + + self._collector(event) + + def is_terminal(self): + # TODO: Gereksiz? + return False + + reliable_listener = TestReliableMessageListenerNotTerminal(collector) + self.registration_id = self.reliable_topic.add_listener(reliable_listener) + + self.reliable_topic.publish('raise-exception') + self.reliable_topic.publish('work-normally') + + def assert_event(): + self.assertEqual(len(collector.events), 1) + event = collector.events[0] + self.assertEqual(event.message, 'work-normally') + self.assertGreater(event.publish_time, 0) + + self.assertTrueEventually(assert_event, 5) + + def test_error_on_message_terminal(self): + collector = event_collector() + + class TestReliableMessageListenerTerminal(ReliableMessageListener): + def __init__(self, _collector): + self._collector = _collector + + def on_message(self, event): + if event.message == "raise-exception": + raise _ReliableTopicTestException("test-exception") + + self._collector(event) + + def is_terminal(self): + return True + + reliable_listener = TestReliableMessageListenerTerminal(collector) + # This listener will be removed by the ReliableTopic implementation + self.reliable_topic.add_listener(reliable_listener) + + self.reliable_topic.publish('raise-exception') + self.reliable_topic.publish('work-normally') + time.sleep(0.5) + self.assertEqual(len(collector.events), 0) + + def test_error_on_message_terminal_exception(self): + collector = event_collector() + + class TestReliableMessageListenerTerminal(ReliableMessageListener): + 
def __init__(self, _collector): + self._collector = _collector + + def on_message(self, event): + if event.message == "raise-exception": + raise _ReliableTopicTestException("test-exception in on_message") + + self._collector(event) + + def is_terminal(self): + raise _ReliableTopicTestException("is_terminal failed") + + reliable_listener = TestReliableMessageListenerTerminal(collector) + self.registration_id = self.reliable_topic.add_listener(reliable_listener) + + self.reliable_topic.publish('raise-exception') + self.reliable_topic.publish('work-normally') + time.sleep(0.5) + self.assertEqual(len(collector.events), 0) + + def test_publish_many(self): + collector = event_collector() + reliable_listener = TestReliableMessageListener(collector) + self.registration_id = self.reliable_topic.add_listener(reliable_listener) + for i in range(10): + self.reliable_topic.publish('message ' + str(i)) + + def assert_event(): + self.assertEqual(len(collector.events), 10) + + self.assertTrueEventually(assert_event, 10) + + def test_message_field_set_correctly(self): + collector = event_collector() + reliable_listener = TestReliableMessageListener(collector) + self.registration_id = self.reliable_topic.add_listener(reliable_listener) + + before_publish_time = current_time_in_millis() + time.sleep(0.1) + self.reliable_topic.publish('item-value') + time.sleep(0.1) + after_publish_time = current_time_in_millis() + + def assert_event(): + self.assertEqual(len(collector.events), 1) + event = collector.events[0] + self.assertEqual(event.message, 'item-value') + self.assertGreater(event.publish_time, before_publish_time) + self.assertLess(event.publish_time, after_publish_time) + + self.assertTrueEventually(assert_event, 5) + + def test_always_start_after_tail(self): + collector = event_collector() + reliable_listener = TestReliableMessageListener(collector) + self.reliable_topic.publish('1') + self.reliable_topic.publish('2') + self.reliable_topic.publish('3') + + self.registration_id = 
self.reliable_topic.add_listener(reliable_listener) + + self.reliable_topic.publish('4') + self.reliable_topic.publish('5') + self.reliable_topic.publish('6') + + def assert_event(): + self.assertEqual(len(collector.events), 3) + self.assertEqual(collector.events[0].message, "4") + self.assertEqual(collector.events[1].message, "5") + self.assertEqual(collector.events[2].message, "6") + + self.assertTrueEventually(assert_event, 5) + + def generate_items(self, n): + messages = [] + for i in range(n): + msg = ReliableTopicMessage( + publish_time=current_time_in_millis(), + publisher_address="", + payload=self.client.serialization_service.to_data(i+1) + ) + messages.append(msg) + + return messages + + def test_discard(self): + reliable_topic = self.client.get_reliable_topic("discard").blocking() + items = self.generate_items(10) + reliable_topic.ringbuffer.add_all(items, OVERFLOW_POLICY_FAIL) + + reliable_topic.publish(11) + seq = reliable_topic.ringbuffer.tail_sequence().result() + item = reliable_topic.ringbuffer.read_one(seq).result() + num = self.client.serialization_service.to_object(item.payload) + self.assertEqual(num, 10) + + def test_overwrite(self): + reliable_topic = self.client.get_reliable_topic("overwrite").blocking() + for i in range(10): + reliable_topic.publish(i+1) + + reliable_topic.publish(11) + seq = reliable_topic.ringbuffer.tail_sequence().result() + item = reliable_topic.ringbuffer.read_one(seq).result() + num = self.client.serialization_service.to_object(item.payload) + self.assertEqual(num, 11) + + def test_error(self): + reliable_topic = self.client.get_reliable_topic("error").blocking() + for i in range(10): + reliable_topic.publish(i+1) + + with self.assertRaises(TopicOverflowError): + reliable_topic.publish(11) + + def test_blocking(self): + reliable_topic = self.client.get_reliable_topic("blocking").blocking() + for i in range(10): + reliable_topic.publish(i+1) + + before = datetime.utcnow() + reliable_topic.publish(11) + time_diff = 
datetime.utcnow() - before + + seq = reliable_topic.ringbuffer.tail_sequence().result() + item = reliable_topic.ringbuffer.read_one(seq).result() + num = self.client.serialization_service.to_object(item.payload) + self.assertEqual(num, 11) + if time_diff.seconds <= 2: + self.fail("expected at least 2 seconds delay got %s" % time_diff.seconds) + From d44c3c15899d3658cd780d73ffbaed00f16aa120 Mon Sep 17 00:00:00 2001 From: Burak Sezer Date: Tue, 26 May 2020 22:34:41 +0300 Subject: [PATCH 2/9] Client does not have _listener, but instead listener as the reference to the listener service --- hazelcast/proxy/__init__.py | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/hazelcast/proxy/__init__.py b/hazelcast/proxy/__init__.py index 2f509b8644..ec33e2e94b 100644 --- a/hazelcast/proxy/__init__.py +++ b/hazelcast/proxy/__init__.py @@ -83,3 +83,33 @@ def destroy_proxy(self, service_name, name, destroy_on_remote=True): def get_distributed_objects(self): return to_list(self._proxies.values()) +<<<<<<< HEAD +======= + + def add_distributed_object_listener(self, listener_func): + is_smart = self._client.config.network_config.smart_routing + request = client_add_distributed_object_listener_codec.encode_request(is_smart) + + def handle_distributed_object_event(**kwargs): + event = DistributedObjectEvent(**kwargs) + listener_func(event) + + def event_handler(client_message): + return client_add_distributed_object_listener_codec.handle(client_message, handle_distributed_object_event) + + def decode_add_listener(response): + return client_add_distributed_object_listener_codec.decode_response(response)["response"] + + def encode_remove_listener(registration_id): + return client_remove_distributed_object_listener_codec.encode_request(registration_id) + + return self._client.listener.register_listener(request, decode_add_listener, + encode_remove_listener, event_handler) + + def remove_distributed_object_listener(self, registration_id): + return 
self._client.listener.deregister_listener(registration_id) + + def _find_next_proxy_address(self): + # TODO: filter out lite members + return self._client.load_balancer.next_address() +>>>>>>> Client does not have _listener, but instead listener as the reference to the listener service From b020aecb65845117185506423269924290b3f4c1 Mon Sep 17 00:00:00 2001 From: Burak Sezer Date: Tue, 26 May 2020 22:39:40 +0300 Subject: [PATCH 3/9] Remove leftovers and useless comments --- hazelcast/proxy/reliable_topic.py | 3 --- tests/proxy/reliable_topic_test.py | 1 - 2 files changed, 4 deletions(-) diff --git a/hazelcast/proxy/reliable_topic.py b/hazelcast/proxy/reliable_topic.py index f1dbf87f16..c6a759d5c4 100644 --- a/hazelcast/proxy/reliable_topic.py +++ b/hazelcast/proxy/reliable_topic.py @@ -9,7 +9,6 @@ from hazelcast.proxy.ringbuffer import OVERFLOW_POLICY_FAIL, OVERFLOW_POLICY_OVERWRITE from hazelcast.serialization.reliable_topic import ReliableTopicMessage from hazelcast.util import current_time_in_millis -from hazelcast.six.moves import queue _INITIAL_BACKOFF = 0.1 _MAX_BACKOFF = 2 @@ -84,7 +83,6 @@ def __init__(self, uuid, proxy, to_object, listener): self._cancelled_lock = threading.Lock() self._cancelled = False self._sequence = 0 - self._q = queue.Queue() def start(self): tail_seq = self._proxy.ringbuffer.tail_sequence() @@ -289,7 +287,6 @@ def publish(self, message): :param message: (object), the message to be published. """ - # TODO: We need a publisher_address? item = ReliableTopicMessage( publish_time=current_time_in_millis(), publisher_address="", diff --git a/tests/proxy/reliable_topic_test.py b/tests/proxy/reliable_topic_test.py index d1aa44ce49..25ed13a15e 100644 --- a/tests/proxy/reliable_topic_test.py +++ b/tests/proxy/reliable_topic_test.py @@ -116,7 +116,6 @@ def on_message(self, event): self._collector(event) def is_terminal(self): - # TODO: Gereksiz? 
return False reliable_listener = TestReliableMessageListenerNotTerminal(collector) From 4647b4aaa7acbd235b63490965a0b3a87917f930 Mon Sep 17 00:00:00 2001 From: Burak Sezer Date: Tue, 26 May 2020 22:40:14 +0300 Subject: [PATCH 4/9] Correct documentation: ReliableTopicListener -> reliable_topic_listener --- hazelcast/proxy/reliable_topic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hazelcast/proxy/reliable_topic.py b/hazelcast/proxy/reliable_topic.py index c6a759d5c4..f227de8bbe 100644 --- a/hazelcast/proxy/reliable_topic.py +++ b/hazelcast/proxy/reliable_topic.py @@ -251,7 +251,7 @@ def add_listener(self, reliable_topic_listener): Subscribes to this reliable topic. When someone publishes a message on this topic, on_message() method of ReliableTopicListener is called. - :param ReliableTopicListener: (Class), class to be used when a message is published. + :param reliable_topic_listener: (Class), class to be used when a message is published. :return: (str), a registration id which is used as a key to remove the listener. """ if not isinstance(reliable_topic_listener, ReliableMessageListener): From 09525dc894b1aae7c100134c97be6ef778374dd1 Mon Sep 17 00:00:00 2001 From: Burak Sezer Date: Sun, 31 May 2020 15:42:01 +0300 Subject: [PATCH 5/9] Prevent deadlock in error handler --- hazelcast/proxy/reliable_topic.py | 49 ++++++++++++++++++------------- 1 file changed, 29 insertions(+), 20 deletions(-) diff --git a/hazelcast/proxy/reliable_topic.py b/hazelcast/proxy/reliable_topic.py index f227de8bbe..5d3c79e9f3 100644 --- a/hazelcast/proxy/reliable_topic.py +++ b/hazelcast/proxy/reliable_topic.py @@ -93,27 +93,36 @@ def start(self): self._proxy.client.reactor.add_timer(0, self._next) def _handle_illegal_argument_error(self): - head_seq = self._proxy.ringbuffer.head_sequence().result() - self._proxy.logger.warning("MessageListener {} on topic {} requested a too large sequence. 
Jumping from old " - "sequence: {} to sequence: {}".format(self._id, self._proxy.name, self._sequence, - head_seq)) - self._sequence = head_seq - self._next() - - def _handle_stale_sequence_error(self): - head_seq = self._proxy.ringbuffer.head_sequence().result() - if self._listener.is_loss_tolerant: + def on_response(res): + head_seq = res.result() + self._proxy.logger.warning("MessageListener {} on topic {} requested a too large sequence. Jumping from " + "old sequence: {} to sequence: {}".format(self._id, self._proxy.name, + self._sequence, + head_seq)) self._sequence = head_seq - self._proxy.logger.warning("Topic {} ran into a stale sequence. Jumping from old sequence {} to new " - "sequence {}".format(self._proxy.name, self._sequence, head_seq)) self._next() - return True - self._proxy.logger.warning( - "Terminating Message Listener: {} on topic: {}. Reason: The listener was too slow or the retention " - "period of the message has been violated. Head: {}, sequence: {}".format(self._id, self._proxy.name, - head_seq, self._sequence)) - return False + future = self._proxy.ringbuffer.head_sequence() + future.add_done_callback(on_response) + + def _handle_stale_sequence_error(self): + def on_response(res): + head_seq = res.result() + if self._listener.is_loss_tolerant: + self._proxy.logger.warning("Topic {} ran into a stale sequence. Jumping from old sequence {} to new " + "sequence {}".format(self._proxy.name, self._sequence, head_seq)) + self._sequence = head_seq + self._next() + + else: + self._proxy.logger.warning( + "Terminating Message Listener: {} on topic: {}. Reason: The listener was too slow or the retention " + "period of the message has been violated. 
Head: {}, sequence: {}".format(self._id, self._proxy.name, + head_seq, self._sequence)) + self._cancel_and_remove_listener() + + future = self._proxy.ringbuffer.head_sequence() + future.add_done_callback(on_response) def _handle_operation_timeout_error(self): self._proxy.logger.info("Message Listener ", self._proxy.id, "on topic: ", self._proxy.name, " timed out. " + @@ -126,8 +135,8 @@ def _handle_exception(self, exception): self._handle_illegal_argument_error() return elif isinstance(exception, StaleSequenceError): - if self._handle_stale_sequence_error(): - return + self._handle_stale_sequence_error() + return elif isinstance(exception, OperationTimeoutError): self._handle_operation_timeout_error() return From c46a6b9cdc2d92e752136893c286f1541c3cd310 Mon Sep 17 00:00:00 2001 From: Burak Sezer Date: Sun, 31 May 2020 15:43:11 +0300 Subject: [PATCH 6/9] Add new tests: test_client_not_active_error, test_distributed_object_destroyed_error, test_distributed_object_destroyed, test_stale --- tests/proxy/reliable_topic_test.py | 85 +++++++++++++++++++++++++++++- 1 file changed, 84 insertions(+), 1 deletion(-) diff --git a/tests/proxy/reliable_topic_test.py b/tests/proxy/reliable_topic_test.py index 25ed13a15e..d6a92a6707 100644 --- a/tests/proxy/reliable_topic_test.py +++ b/tests/proxy/reliable_topic_test.py @@ -4,7 +4,7 @@ import hazelcast from hazelcast import ClientConfig -from hazelcast.config import ReliableTopicConfig, TOPIC_OVERLOAD_POLICY +from hazelcast.config import ReliableTopicConfig, TOPIC_OVERLOAD_POLICY, ClientProperties from hazelcast.exception import IllegalArgumentError, TopicOverflowError from hazelcast.proxy.reliable_topic import ReliableMessageListener from hazelcast.proxy.ringbuffer import OVERFLOW_POLICY_FAIL, OVERFLOW_POLICY_OVERWRITE @@ -26,6 +26,17 @@ def on_message(self, event): self._collector(event) +class TestReliableMessageListenerLossTolerant(ReliableMessageListener): + def __init__(self, collector): + self._collector = collector + + def 
on_message(self, event): + self._collector(event) + + def is_loss_tolerant(self): + return True + + class ReliableTopicTest(SingleMemberTestCase): @classmethod def configure_cluster(cls): @@ -292,3 +303,75 @@ def test_blocking(self): if time_diff.seconds <= 2: self.fail("expected at least 2 seconds delay got %s" % time_diff.seconds) + def test_stale(self): + collector = event_collector() + self.reliable_topic = self.client.get_reliable_topic("stale").blocking() + reliable_listener = TestReliableMessageListenerLossTolerant(collector) + self.registration_id = self.reliable_topic.add_listener(reliable_listener) + + items = self.generate_items(20) + self.reliable_topic.ringbuffer.add_all(items, overflow_policy=OVERFLOW_POLICY_OVERWRITE) + + def assert_event(): + self.assertEqual(len(collector.events), 10) + event = collector.events[9] + self.assertEqual(event.message, 20) + + self.assertTrueEventually(assert_event, 5) + + def test_distributed_object_destroyed(self): + config = ClientConfig() + config.network_config.connection_attempt_limit = 10 + config.set_property(ClientProperties.INVOCATION_TIMEOUT_SECONDS.name, 10) + config.set_property("hazelcast.serialization.input.returns.bytearray", True) + + + client_two = hazelcast.HazelcastClient(self.configure_client(config)) + # TODO: shutdown + + collector = event_collector() + self.reliable_topic = client_two.get_reliable_topic("x") + reliable_listener = TestReliableMessageListenerLossTolerant(collector) + self.registration_id = self.reliable_topic.add_listener(reliable_listener) + + self.rc.shutdownCluster(self.cluster.id) + self.cluster = self.create_cluster(self.rc, self.configure_cluster()) + self.cluster.start_member() + + self.reliable_topic.publish("aa") + + def assert_event(): + self.assertEqual(len(collector.events), 1) + event = collector.events[0] + self.assertEqual(event.message, "aa") + + self.assertTrueEventually(assert_event, 5) + + def test_distributed_object_destroyed_error(self): + collector = 
event_collector() + self.reliable_topic = self.client.get_reliable_topic("differentReliableTopic") + reliable_listener = TestReliableMessageListenerLossTolerant(collector) + self.registration_id = self.reliable_topic.add_listener(reliable_listener) + self.reliable_topic.destroy() + + def assert_event(): + self.assertEqual(len(collector.events), 0) + + self.assertTrueEventually(assert_event, 5) + + def test_client_not_active_error(self): + config = ClientConfig() + config.set_property("hazelcast.serialization.input.returns.bytearray", True) + client_two = hazelcast.HazelcastClient(self.configure_client(config)) + + collector = event_collector() + self.reliable_topic = self.client.get_reliable_topic("differentReliableTopic") + reliable_listener = TestReliableMessageListenerLossTolerant(collector) + self.registration_id = self.reliable_topic.add_listener(reliable_listener) + client_two.shutdown() + + def assert_event(): + self.assertEqual(len(collector.events), 0) + + self.assertTrueEventually(assert_event, 5) + From 773ab645e98cf34b03ca40dc25858bef6699dde0 Mon Sep 17 00:00:00 2001 From: mdumandag Date: Wed, 21 Apr 2021 10:46:42 +0300 Subject: [PATCH 7/9] Implement ReliableTopic Implemented ReliableTopic on top of the Ringbuffer. Like the interface provided in the Java-side, we are offering methods to add/remove listeners and publish messages in batches or one by one. The provided API is non-blocking just like the other parts of the client. Documentation and code samples will be sent in a separate PR. 
--- docs/api/config.rst | 1 + docs/api/proxy/modules.rst | 1 + docs/api/proxy/reliable_topic.rst | 4 + examples/reliable-topic/__init__.py | 0 .../reliable-topic/reliable_topic_example.py | 29 - hazelcast/client.py | 18 + hazelcast/config.py | 110 +++ hazelcast/proxy/__init__.py | 30 - hazelcast/proxy/base.py | 39 +- hazelcast/proxy/reliable_topic.py | 910 +++++++++++++----- hazelcast/serialization/objects.py | 54 ++ hazelcast/serialization/reliable_topic.py | 27 - hazelcast/serialization/service.py | 8 +- .../proxy/hazelcast_topic.xml | 23 +- .../proxy/reliable_topic_test.py | 448 +++++++++ tests/proxy/reliable_topic_test.py | 377 -------- tests/unit/config_test.py | 52 + tests/unit/proxy/reliable_topic_test.py | 134 +++ 18 files changed, 1544 insertions(+), 721 deletions(-) create mode 100644 docs/api/proxy/reliable_topic.rst delete mode 100644 examples/reliable-topic/__init__.py delete mode 100644 examples/reliable-topic/reliable_topic_example.py create mode 100644 hazelcast/serialization/objects.py delete mode 100644 hazelcast/serialization/reliable_topic.py rename tests/{ => integration/backward_compatible}/proxy/hazelcast_topic.xml (59%) create mode 100644 tests/integration/backward_compatible/proxy/reliable_topic_test.py delete mode 100644 tests/proxy/reliable_topic_test.py create mode 100644 tests/unit/proxy/reliable_topic_test.py diff --git a/docs/api/config.rst b/docs/api/config.rst index d00061b3cd..acb5d960a1 100644 --- a/docs/api/config.rst +++ b/docs/api/config.rst @@ -11,3 +11,4 @@ Config .. autoclass:: UniqueKeyTransformation .. autoclass:: IndexType .. autoclass:: ReconnectMode +.. 
autoclass:: TopicOverloadPolicy diff --git a/docs/api/proxy/modules.rst b/docs/api/proxy/modules.rst index 5c14a87642..59b4bd019c 100644 --- a/docs/api/proxy/modules.rst +++ b/docs/api/proxy/modules.rst @@ -12,6 +12,7 @@ Hazelcast Proxies multi_map queue pn_counter + reliable_topic replicated_map ringbuffer set diff --git a/docs/api/proxy/reliable_topic.rst b/docs/api/proxy/reliable_topic.rst new file mode 100644 index 0000000000..84bb7f4a11 --- /dev/null +++ b/docs/api/proxy/reliable_topic.rst @@ -0,0 +1,4 @@ +ReliableTopic +============= + +.. automodule:: hazelcast.proxy.reliable_topic diff --git a/examples/reliable-topic/__init__.py b/examples/reliable-topic/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/examples/reliable-topic/reliable_topic_example.py b/examples/reliable-topic/reliable_topic_example.py deleted file mode 100644 index 842227c36d..0000000000 --- a/examples/reliable-topic/reliable_topic_example.py +++ /dev/null @@ -1,29 +0,0 @@ -import time - -import hazelcast -from hazelcast import ClientConfig -from hazelcast.proxy.reliable_topic import ReliableMessageListener - - -class MyReliableMessageListener(ReliableMessageListener): - def on_message(self, event): - print("Got message: {}".format(event.message)) - print("Publish time: {}\n".format(event.publish_time)) - - -if __name__ == "__main__": - config = ClientConfig() - config.set_property("hazelcast.serialization.input.returns.bytearray", True) - client = hazelcast.HazelcastClient(config) - listener = MyReliableMessageListener() - - reliable_topic = client.get_reliable_topic("reliable-topic") - registration_id = reliable_topic.add_listener(listener) - - for i in range(10): - reliable_topic.publish("Message " + str(i)) - time.sleep(0.1) - - reliable_topic.destroy() - reliable_topic.remove_listener(registration_id) - client.shutdown() diff --git a/hazelcast/client.py b/hazelcast/client.py index c57daed94b..3fa3399589 100644 --- a/hazelcast/client.py +++ 
b/hazelcast/client.py @@ -268,6 +268,20 @@ class SomeClassSerializer(StreamSerializer): it will be much out of order. If you don't care about ordering, set this value to ``0`` for unlimited ID validity. + reliable_topics (dict[str, dict[str, any]]): Dictionary of reliable + topic names and the corresponding reliable topic configurations as + a dictionary. The reliable topic configurations contains the + following options. When an option is missing from the + configuration, it will be set to its default value. + + - **overload_policy** (int|str): Policy to handle an overloaded + topic. By default, set to ``BLOCK``. See the + :class:`hazelcast.config.TopicOverloadPolicy` for possible values. + - **read_batch_size** (int): Number of messages the reliable topic + will try to read in batch. It will get at least one, but if + there are more available, then it will try to get more to + increase throughput. By default, set to ``10``. + labels (`list[str]`): Labels for the client to be sent to the cluster. heartbeat_interval (float): Time interval between the heartbeats sent by the client to the member nodes in seconds. By default, set to ``5.0``. 
@@ -379,6 +393,7 @@ def __init__(self, **kwargs): def _init_context(self): self._context.init_context( + self, self._config, self._invocation_service, self._internal_partition_service, @@ -696,6 +711,7 @@ class _ClientContext(object): """ def __init__(self): + self.client = None self.config = None self.invocation_service = None self.partition_service = None @@ -712,6 +728,7 @@ def __init__(self): def init_context( self, + client, config, invocation_service, partition_service, @@ -726,6 +743,7 @@ def init_context( proxy_session_manager, reactor, ): + self.client = client self.config = config self.invocation_service = invocation_service self.partition_service = partition_service diff --git a/hazelcast/config.py b/hazelcast/config.py index bc02f315fe..5d28b44777 100644 --- a/hazelcast/config.py +++ b/hazelcast/config.py @@ -204,6 +204,52 @@ class ReconnectMode(object): """ +class TopicOverloadPolicy(object): + """A policy to deal with an overloaded topic; so topic where there is no + place to store new messages. + + The reliable topic uses a :class:`hazelcast.proxy.ringbuffer.Ringbuffer` to + store the messages. A ringbuffer doesn't track where readers are, so + it has no concept of a slow consumers. This provides many advantages like + high performance reads, but it also gives the ability to the reader to + re-read the same message multiple times in case of an error. + + A ringbuffer has a limited, fixed capacity. A fast producer may overwrite + old messages that are still being read by a slow consumer. To prevent + this, we may configure a time-to-live on the ringbuffer. + + Once the time-to-live is configured, the :class:`TopicOverloadPolicy` + controls how the publisher is going to deal with the situation that a + ringbuffer is full and the oldest item in the ringbuffer is not old + enough to get overwritten. 
+ + Keep in mind that this retention period (time-to-live) can keep messages + from being overwritten, even though all readers might have already completed + reading. + """ + + DISCARD_OLDEST = 0 + """Using this policy, a message that has not expired can be overwritten. + + No matter the retention period set, the overwrite will just overwrite + the item. + + This can be a problem for slow consumers because they were promised a + certain time window to process messages. But it will benefit producers + and fast consumers since they are able to continue. This policy sacrifices + the slow producer in favor of fast producers/consumers. + """ + + DISCARD_NEWEST = 1 + """The message that was to be published is discarded.""" + + BLOCK = 2 + """The caller will wait till there space in the ringbuffer.""" + + ERROR = 3 + """The publish call immediately fails.""" + + class BitmapIndexOptions(object): __slots__ = ("_unique_key", "_unique_key_transformation") @@ -513,6 +559,7 @@ class _Config(object): "_membership_listeners", "_lifecycle_listeners", "_flake_id_generators", + "_reliable_topics", "_labels", "_heartbeat_interval", "_heartbeat_timeout", @@ -563,6 +610,7 @@ def __init__(self): self._membership_listeners = [] self._lifecycle_listeners = [] self._flake_id_generators = {} + self._reliable_topics = {} self._labels = [] self._heartbeat_interval = _DEFAULT_HEARTBEAT_INTERVAL self._heartbeat_timeout = _DEFAULT_HEARTBEAT_TIMEOUT @@ -1083,6 +1131,27 @@ def flake_id_generators(self, value): else: raise TypeError("flake_id_generators must be a dict") + @property + def reliable_topics(self): + return self._reliable_topics + + @reliable_topics.setter + def reliable_topics(self, value): + if isinstance(value, dict): + configs = {} + for name, config in six.iteritems(value): + if not isinstance(name, six.string_types): + raise TypeError("Keys of reliable_topics must be strings") + + if not isinstance(config, dict): + raise TypeError("Values of reliable_topics must be dict") + + 
configs[name] = _ReliableTopicConfig.from_dict(config) + + self._reliable_topics = configs + else: + raise TypeError("reliable_topics must be a dict") + @property def labels(self): return self._labels @@ -1404,3 +1473,44 @@ def from_dict(cls, d): "Unrecognized config option for the flake id generator: %s" % k ) return config + + +class _ReliableTopicConfig(object): + __slots__ = ("_read_batch_size", "_overload_policy") + + def __init__(self): + self._read_batch_size = 10 + self._overload_policy = TopicOverloadPolicy.BLOCK + + @property + def read_batch_size(self): + return self._read_batch_size + + @read_batch_size.setter + def read_batch_size(self, value): + if isinstance(value, number_types): + if value <= 0: + raise ValueError("read_batch_size must be positive") + self._read_batch_size = value + else: + raise TypeError("read_batch_size must be a number") + + @property + def overload_policy(self): + return self._overload_policy + + @overload_policy.setter + def overload_policy(self, value): + self._overload_policy = try_to_get_enum_value(value, TopicOverloadPolicy) + + @classmethod + def from_dict(cls, d): + config = cls() + for k, v in six.iteritems(d): + try: + config.__setattr__(k, v) + except AttributeError: + raise InvalidConfigurationError( + "Unrecognized config option for the reliable topic: %s" % k + ) + return config diff --git a/hazelcast/proxy/__init__.py b/hazelcast/proxy/__init__.py index ec33e2e94b..2f509b8644 100644 --- a/hazelcast/proxy/__init__.py +++ b/hazelcast/proxy/__init__.py @@ -83,33 +83,3 @@ def destroy_proxy(self, service_name, name, destroy_on_remote=True): def get_distributed_objects(self): return to_list(self._proxies.values()) -<<<<<<< HEAD -======= - - def add_distributed_object_listener(self, listener_func): - is_smart = self._client.config.network_config.smart_routing - request = client_add_distributed_object_listener_codec.encode_request(is_smart) - - def handle_distributed_object_event(**kwargs): - event = 
DistributedObjectEvent(**kwargs) - listener_func(event) - - def event_handler(client_message): - return client_add_distributed_object_listener_codec.handle(client_message, handle_distributed_object_event) - - def decode_add_listener(response): - return client_add_distributed_object_listener_codec.decode_response(response)["response"] - - def encode_remove_listener(registration_id): - return client_remove_distributed_object_listener_codec.encode_request(registration_id) - - return self._client.listener.register_listener(request, decode_add_listener, - encode_remove_listener, event_handler) - - def remove_distributed_object_listener(self, registration_id): - return self._client.listener.deregister_listener(registration_id) - - def _find_next_proxy_address(self): - # TODO: filter out lite members - return self._client.load_balancer.next_address() ->>>>>>> Client does not have _listener, but instead listener as the reference to the listener service diff --git a/hazelcast/proxy/base.py b/hazelcast/proxy/base.py index dc00acff77..dcc4768dac 100644 --- a/hazelcast/proxy/base.py +++ b/hazelcast/proxy/base.py @@ -266,26 +266,45 @@ def __repr__(self): ) +_SENTINEL = object() + + class TopicMessage(object): - """Topic message. + """Topic message.""" - Attributes: - name (str): Name of the proxy that fired the event. - publish_time (int): UNIX time that the event is published as seconds. - member (hazelcast.core.MemberInfo): Member that fired the event. 
- """ + __slots__ = ("_name", "_message_data", "_message", "_publish_time", "_member", "_to_object") def __init__(self, name, message_data, publish_time, member, to_object): - self.name = name + self._name = name self._message_data = message_data - self.publish_time = publish_time - self.member = member + self._message = _SENTINEL + self._publish_time = publish_time + self._member = member self._to_object = to_object + @property + def name(self): + """str: Name of the proxy that fired the event.""" + return self._name + + @property + def publish_time(self): + """int: UNIX time that the event is published as seconds.""" + return self._publish_time + + @property + def member(self): + """hazelcast.core.MemberInfo: Member that fired the event.""" + return self._member + @property def message(self): """The message sent to Topic.""" - return self._to_object(self._message_data) + if self._message is not _SENTINEL: + return self._message + + self._message = self._to_object(self._message_data) + return self._message def get_entry_listener_flags(**kwargs): diff --git a/hazelcast/proxy/reliable_topic.py b/hazelcast/proxy/reliable_topic.py index 5d3c79e9f3..3649cbfd83 100644 --- a/hazelcast/proxy/reliable_topic.py +++ b/hazelcast/proxy/reliable_topic.py @@ -1,337 +1,781 @@ +import logging import time -import threading from uuid import uuid4 -from hazelcast.config import ReliableTopicConfig, TOPIC_OVERLOAD_POLICY -from hazelcast.exception import IllegalArgumentError, TopicOverflowError, HazelcastInstanceNotActiveError, \ - HazelcastClientNotActiveException, DistributedObjectDestroyedError, StaleSequenceError, OperationTimeoutError +from hazelcast import six +from hazelcast.config import _ReliableTopicConfig, TopicOverloadPolicy +from hazelcast.core import MemberInfo, MemberVersion +from hazelcast.errors import ( + OperationTimeoutError, + IllegalArgumentError, + HazelcastClientNotActiveError, + ClientOfflineError, + HazelcastInstanceNotActiveError, + 
DistributedObjectDestroyedError, + TopicOverloadError, +) +from hazelcast.future import ImmediateFuture, Future from hazelcast.proxy.base import Proxy, TopicMessage from hazelcast.proxy.ringbuffer import OVERFLOW_POLICY_FAIL, OVERFLOW_POLICY_OVERWRITE -from hazelcast.serialization.reliable_topic import ReliableTopicMessage -from hazelcast.util import current_time_in_millis +from hazelcast.serialization.objects import ReliableTopicMessage +from hazelcast.util import check_not_none _INITIAL_BACKOFF = 0.1 -_MAX_BACKOFF = 2 +_MAX_BACKOFF = 2.0 + +_RINGBUFFER_PREFIX = "_hz_rb_" + +_UNKNOWN_MEMBER_VERSION = MemberVersion(0, 0, 0) + +_logger = logging.getLogger(__name__) class ReliableMessageListener(object): - def on_message(self, item): + """A message listener for :class:`ReliableTopic`. + + A message listener will not be called concurrently (provided that it's + not registered twice). So there is no need to synchronize access to + the state it reads or writes. + + If a regular function is registered on a reliable topic, the message + listener works fine, but it can't do much more than listen to messages. + + This is an enhanced version of that to better integrate with the reliable + topic. + + **Durable Subscription** + + The ReliableMessageListener allows you to control where you want to start + processing a message when the listener is registered. This makes it + possible to create a durable subscription by storing the sequence of the + last message and using this as the sequence id to start from. + + **Error handling** + + The ReliableMessageListener also gives the ability to deal with errors + using the :func:`is_terminal` method. If a plain function is used, then it + won't terminate on errors and it will keep on running. But in some + cases it is better to stop running. + + **Global order** + + The ReliableMessageListener will always get all events in order (global + order). It will not get duplicates and there will only be gaps if it is + too slow. 
For more information see :func:`is_loss_tolerant`. + + **Delivery guarantees** + + Because the ReliableMessageListener controls which item it wants to + continue from upon restart, it is very easy to provide an at-least-once + or at-most-once delivery guarantee. The :func:`store_sequence` is always + called before a message is processed; so it can be persisted on some + non-volatile storage. When the :func:`retrieve_initial_sequence` returns + the stored sequence, then an at-least-once delivery is implemented since + the same item is now being processed twice. To implement an at-most-once + delivery guarantee, add 1 to the stored sequence when the + :func:`retrieve_initial_sequence` is called. + """ + + def on_message(self, message): """ Invoked when a message is received for the added reliable topic. - :param: message the message that is received for the added reliable topic + One should not block in this callback. If blocking is necessary, + consider delegating that task to an executor or a thread pool. + + + Args: + message (hazelcast.proxy.base.TopicMessage): The message that + is received for the topic """ - raise NotImplementedError + raise NotImplementedError("on_message") def retrieve_initial_sequence(self): """ Retrieves the initial sequence from which this ReliableMessageListener should start. - Return -1 if there is no initial sequence and you want to start + Return ``-1`` if there is no initial sequence and you want to start from the next published message. If you intend to create a durable subscriber so you continue from where - you stopped the previous time, load the previous sequence and add 1. + you stopped the previous time, load the previous sequence and add ``1``. If you don't add one, then you will be receiving the same message twice. - :return: (int), the initial sequence + Returns: + int: The initial sequence. 
""" - return -1 + raise NotImplementedError("retrieve_initial_sequence") def store_sequence(self, sequence): - """" + """ " Informs the ReliableMessageListener that it should store the sequence. This method is called before the message is processed. Can be used to make a durable subscription. - :param: (int) ``sequence`` the sequence + Args: + sequence (int): The sequence. """ - pass + raise NotImplementedError("store_sequence") def is_loss_tolerant(self): """ Checks if this ReliableMessageListener is able to deal with message loss. Even though the reliable topic promises to be reliable, it can be that a - MessageListener is too slow. Eventually the message won't be available - anymore. + ReliableMessageListener is too slow. Eventually the message won't be + available anymore. If the ReliableMessageListener is not loss tolerant and the topic detects that there are missing messages, it will terminate the ReliableMessageListener. - :return: (bool) ``True`` if the ReliableMessageListener is tolerant towards losing messages. + Returns: + bool: ``True`` if the ReliableMessageListener is tolerant towards + losing messages. """ - return False + raise NotImplementedError("is_loss_tolerant") - def is_terminal(self): + def is_terminal(self, error): """ Checks if the ReliableMessageListener should be terminated based on an - exception thrown while calling on_message. - - :return: (bool) ``True` if the ReliableMessageListener should terminate itself, ``False`` if it should keep on running. - """ - raise False + error raised while calling :func:`on_message`. + Args: + error (Exception): The error raised while calling + :func:`on_message` -class _MessageListener(object): - def __init__(self, uuid, proxy, to_object, listener): - self._id = uuid - self._proxy = proxy - self._to_object = to_object + Returns: + bool: ``True`` if the ReliableMessageListener should terminate itself, + ``False`` if it should keep on running. 
+ """ + raise NotImplementedError("is_terminal") + + +class _MessageRunner(object): + def __init__( + self, + registration_id, + listener, + ringbuffer, + topic_name, + read_batch_size, + to_object, + runners, + ): + self._registration_id = registration_id self._listener = listener - self._cancelled_lock = threading.Lock() + self._ringbuffer = ringbuffer + self._topic_name = topic_name + self._read_batch_size = read_batch_size + self._to_object = to_object + self._runners = runners + self._sequence = listener.retrieve_initial_sequence() self._cancelled = False - self._sequence = 0 def start(self): - tail_seq = self._proxy.ringbuffer.tail_sequence() - initial_seq = self._listener.retrieve_initial_sequence() - if initial_seq == -1: - initial_seq = tail_seq.result() + 1 - self._sequence = initial_seq - self._proxy.client.reactor.add_timer(0, self._next) - - def _handle_illegal_argument_error(self): - def on_response(res): - head_seq = res.result() - self._proxy.logger.warning("MessageListener {} on topic {} requested a too large sequence. Jumping from " - "old sequence: {} to sequence: {}".format(self._id, self._proxy.name, - self._sequence, - head_seq)) - self._sequence = head_seq - self._next() - - future = self._proxy.ringbuffer.head_sequence() - future.add_done_callback(on_response) - - def _handle_stale_sequence_error(self): - def on_response(res): - head_seq = res.result() - if self._listener.is_loss_tolerant: - self._proxy.logger.warning("Topic {} ran into a stale sequence. Jumping from old sequence {} to new " - "sequence {}".format(self._proxy.name, self._sequence, head_seq)) - self._sequence = head_seq - self._next() + """Starts the message runner by checking the given sequence. - else: - self._proxy.logger.warning( - "Terminating Message Listener: {} on topic: {}. Reason: The listener was too slow or the retention " - "period of the message has been violated. 
Head: {}, sequence: {}".format(self._id, self._proxy.name, - head_seq, self._sequence)) - self._cancel_and_remove_listener() - - future = self._proxy.ringbuffer.head_sequence() - future.add_done_callback(on_response) - - def _handle_operation_timeout_error(self): - self._proxy.logger.info("Message Listener ", self._proxy.id, "on topic: ", self._proxy.name, " timed out. " + - "Continuing from the last known sequence ", self._proxy.sequence) - self._next() - - def _handle_exception(self, exception): - base_msg = "Terminating Message Listener: " + self._id + " on topic: " + self._proxy.name + ". Reason: " - if isinstance(exception, IllegalArgumentError) and self._listener.is_loss_tolerant(): - self._handle_illegal_argument_error() - return - elif isinstance(exception, StaleSequenceError): - self._handle_stale_sequence_error() - return - elif isinstance(exception, OperationTimeoutError): - self._handle_operation_timeout_error() + If the user provided a initial sequence via listener, we will + use it as it is. If not, we will ask server to get the tail + sequence and use it. + + Returns: + hazelcast.future.Future[None]: + """ + if self._sequence != -1: + # User provided a sequence to start from + return ImmediateFuture(None) + + def continuation(future): + sequence = future.result() + self._sequence = sequence + 1 + + # We are going to listen to next publication. + # We don't care about what already has been published. + return self._ringbuffer.tail_sequence().continue_with(continuation) + + def next_batch(self): + """Tries to read the next batch from the ringbuffer + and call the listener on items when it is done. 
+ """ + if self._cancelled: return - elif isinstance(exception, HazelcastInstanceNotActiveError): - self._proxy.logger.info(base_msg + "HazelcastInstance is shutting down.") - elif isinstance(exception, HazelcastClientNotActiveException): - self._proxy.logger.info(base_msg + "HazelcastClient is shutting down.") - elif isinstance(exception, DistributedObjectDestroyedError): - self._proxy.logger.info(base_msg + "ReliableTopic is destroyed.") - else: - self._proxy.logger.warning(base_msg + "Unhandled error, message: " + str(exception)) - self._cancel_and_remove_listener() + self._ringbuffer.read_many(self._sequence, 1, self._read_batch_size).add_done_callback( + self._handle_next_batch + ) + + def cancel(self): + """Sets the cancelled flag and removes + the runner registration. + """ + self._cancelled = True + self._runners.pop(self._registration_id, None) + + def _handle_next_batch(self, future): + """Handles the result of the read_many request from + the ringbuffer. - def _terminate(self, exception): - with self._cancelled_lock: - if self._cancelled: - return True + Args: + future (hazelcast.future.Future): + """ + if self._cancelled: + return - base_msg = "Terminating Message Listener: {} on topic: {}. Reason: ".format(self._id, self._proxy.name) try: - terminate = self._listener.is_terminal() - if terminate: - self._proxy.logger.warning(base_msg + "Unhandled error: {}".format(str(exception))) - return True + result = future.result() - self._proxy.logger.warning("MessageListener {} on topic: {} ran into an error: {}". - format(self._id, self._proxy.name, str(exception))) - return False + # Check if there are any messages lost since the last read + # and whether or not the listener can tolerate that. + lost_count = (result.next_sequence_to_read_from - result.read_count) - self._sequence + if lost_count != 0 and not self._is_loss_tolerable(lost_count): + self.cancel() + return + + # Call the listener for each item read. 
+ for i in range(result.size): + try: + message = result[i] + self._listener.store_sequence(result.get_sequence(i)) + + topic_message = TopicMessage( + self._topic_name, + message.payload, + message.publish_time, + MemberInfo( + message.publisher_address, None, None, False, _UNKNOWN_MEMBER_VERSION + ), + self._to_object, + ) + self._listener.on_message(topic_message) + except Exception as e: + if self._terminate(e): + self.cancel() + return + self._sequence = result.next_sequence_to_read_from + self.next_batch() except Exception as e: - self._proxy.logger.warning(base_msg + "Unhandled error while calling ReliableMessageListener.is_terminal() " - "method: {}".format(str(e))) + # read_many request failed. + if not self._handle_internal_error(e): + self.cancel() + + def _is_loss_tolerable(self, loss_count): + """Called when message loss is detected. + + Checks if the listener is able to tolerate the loss. + + Args: + loss_count (int): Number of lost messages. + + Returns: + bool: ``True`` if the listener may continue reading. + """ + if self._listener.is_loss_tolerant(): + _logger.debug( + "MessageListener %s on topic %s lost %s messages.", + self._listener, + self._topic_name, + loss_count, + ) return True - def _process(self, msg): - try: - self._listener.on_message(msg) - except BaseException as e: - if self._terminate(e): - self._cancel_and_remove_listener() + _logger.warning( + "Terminating MessageListener %s on topic %s. " + "Reason: The listener was too slow or the retention period of the message has been violated. " + "%s messages lost.", + self._listener, + self._topic_name, + loss_count, + ) + return False - def _on_response(self, res): - try: - for message in res.result(): - with self._cancelled_lock: - if self._cancelled: - return + def _terminate(self, error): + """Checks if we should terminate the listener + based on the error we received while calling the + on_message for this message. 
- msg = TopicMessage( - self._proxy.name, - message.payload, - message.publish_time, - message.publisher_address, - self._to_object - ) - self._listener.store_sequence(self._sequence) - self._process(msg) - self._sequence += 1 + If the listener says that it should be terminated + for this error or it raises some error while + we were trying to call is_terminal, the listener + will be terminated. Otherwise, a log will be + printed and listener will continue. + + Args: + error (Exception): Error we received while + calling the listener. + + Returns: + bool: Should terminate the listener or not. + """ + if self._cancelled: + return True - # Await for new messages - self._next() + try: + terminate = self._listener.is_terminal(error) + if terminate: + _logger.warning( + "Terminating MessageListener %s on topic %s. Reason: Unhandled exception.", + self._listener, + self._topic_name, + exc_info=error, + ) + else: + _logger.debug( + "MessageListener %s on topic %s ran into an error.", + self._listener, + self._topic_name, + exc_info=error, + ) + return terminate except Exception as e: - self._handle_exception(e) + _logger.warning( + "Terminating MessageListener %s on topic %s. " + "Reason: Unhandled exception while calling is_terminal method", + self._listener, + self._topic_name, + exc_info=e, + ) + return True - def _next(self): - def _read_many(): - with self._cancelled_lock: - if self._cancelled: - return + def _handle_internal_error(self, error): + """Called when the read_many request is failed. - future = self._proxy.ringbuffer.read_many(self._sequence, 1, self._proxy.config.read_batch_size) - future.continue_with(self._on_response) + Based on the error we receive, we will act differently. - self._proxy.client.reactor.add_timer(0, _read_many) + If we can tolerate the error, we will call next_batch + here. The reasoning behind is that, on some cases, we + do not immediately call next_batch, but make a request + to the server, and based on that, call next_batch. 
- def cancel(self): - with self._cancelled_lock: - self._cancelled = True + Args: + error (Exception): The error we received. - def _cancel_and_remove_listener(self): - try: - # _proxy.remove_listener calls listener.cancel function - self._proxy.remove_listener(self._id) - except IllegalArgumentError as e: - # This listener is already removed - self._proxy.logger.debug("Failed to remove listener. Reason: {}".format(str(e))) + Returns: + bool: ``True`` if the error is handled internally. + ``False`` otherwise. When, ``False`` is returned, + listener should be cancelled. + """ + if isinstance(error, HazelcastClientNotActiveError): + return self._handle_client_not_active_error() + elif isinstance(error, ClientOfflineError): + return self._handle_client_offline_error() + elif isinstance(error, OperationTimeoutError): + return self._handle_timeout_error() + elif isinstance(error, IllegalArgumentError): + return self._handle_illegal_argument_error(error) + elif isinstance(error, HazelcastInstanceNotActiveError): + return self._handle_instance_not_active_error() + elif isinstance(error, DistributedObjectDestroyedError): + return self._handle_distributed_object_destroyed_error() + else: + return self._handle_generic_error(error) + + def _handle_generic_error(self, error): + # Received an error we do not expect. + _logger.warning( + "Terminating MessageListener %s on topic %s. Reason: Unhandled exception.", + self._listener, + self._topic_name, + exc_info=error, + ) + return False + + def _handle_distributed_object_destroyed_error(self): + # Underlying ringbuffer is destroyed. It should only + # happen when the user destroys the reliable topic + # associated with it. + _logger.debug( + "Terminating MessageListener %s on topic %s. Reason: Topic is destroyed.", + self._listener, + self._topic_name, + ) + return False + def _handle_instance_not_active_error(self): + # This error should be received from the server. + # We do not throw it anywhere on the client. 
+ _logger.debug( + "Terminating MessageListener %s on topic %s. Reason: Server is shutting down.", + self._listener, + self._topic_name, + ) + return False -class ReliableTopic(Proxy): + def _handle_client_offline_error(self): + # Client is reconnecting to cluster. + _logger.debug( + "MessageListener %s on topic %s got error. " + "Continuing from the last known sequence %s.", + self._listener, + self._topic_name, + self._sequence, + ) + self.next_batch() + return True + + def _handle_client_not_active_error(self): + # Client#shutdown is called. + _logger.debug( + "Terminating MessageListener %s on topic %s. Reason: Client is shutting down.", + self._listener, + self._topic_name, + ) + return False + + def _handle_timeout_error(self): + # read_many invocation to the server timed out. + _logger.debug( + "MessageListener %s on topic %s timed out. " + "Continuing from the last known sequence %s.", + self._listener, + self._topic_name, + self._sequence, + ) + self.next_batch() + return True + + def _handle_illegal_argument_error(self, error): + # Server sends this when it detects data loss + # on the underlying ringbuffer. + if self._listener.is_loss_tolerant(): + # Listener can tolerate message loss. Try + # to continue reading after getting head + # sequence, and try to read from there. + def on_response(future): + try: + head_sequence = future.result() + _logger.debug( + "MessageListener %s on topic %s requested a too large sequence. " + "Jumping from old sequence %s to sequence %s.", + self._listener, + self._topic_name, + self._sequence, + head_sequence, + exc_info=error, + ) + self._sequence = head_sequence + # We call next_batch only after getting the new head + # sequence and updating the our state with it. + self.next_batch() + except Exception as e: + _logger.warning( + "Terminating MessageListener %s on topic %s. 
" + "Reason: After the ring buffer data related " + "to reliable topic is lost, client tried to get the " + "current head sequence to continue since the listener" + "is loss tolerant, but that request failed.", + self._listener, + self._topic_name, + exc_info=e, + ) + # We said that we can handle that error so the listener + # is not cancelled. But, we could not continue since + # our request to the server is failed. We should cancel + # the listener. + self.cancel() + + self._ringbuffer.head_sequence().add_done_callback(on_response) + return True + + _logger.warning( + "Terminating MessageListener %s on topic %s. " + "Reason: Underlying ring buffer data related to reliable topic is lost.", + self._listener, + self._topic_name, + ) + + return False + + +class _ReliableMessageListenerAdapter(ReliableMessageListener): + """Used when the user provided a function as the listener. + + That means user does not care about the other properties + of the listener. They just want to listen messages. Fill + the methods with expected defaults. """ - Hazelcast provides distribution mechanism for publishing messages that are delivered to multiple subscribers, which - is also known as a publish/subscribe (pub/sub) messaging model. Publish and subscriptions are cluster-wide. When a - member subscribes for a topic, it is actually registering for messages published by any member in the cluster, - including the new members joined after you added the listener. - Messages are ordered, meaning that listeners(subscribers) will process the messages in the order they are actually - published. + def __init__(self, on_message_fn): + self._on_message_fn = on_message_fn + + def on_message(self, message): + self._on_message_fn(message) + + def retrieve_initial_sequence(self): + # -1 indicates start from next message. + return -1 + + def store_sequence(self, sequence): + # no-op + pass - Hazelcast's Reliable Topic uses the same Topic interface as a regular topic. 
The main difference is that Reliable - Topic is backed up by the Ringbuffer data structure, a replicated but not partitioned data structure that stores + def is_loss_tolerant(self): + # terminate the listener on message loss + return False + + def is_terminal(self, error): + # do not terminate the listener on errors + return False + + +def _no_op_continuation(future): + # Used when we just care about whether + # the ringbuffer request failed. We just + # check the result and return nothing. + future.result() + + +class ReliableTopic(Proxy): + """Hazelcast provides distribution mechanism for publishing messages that + are delivered to multiple subscribers, which is also known as a + publish/subscribe (pub/sub) messaging model. Publish and subscriptions are + cluster-wide. When a member subscribes for a topic, it is actually + registering for messages published by any member in the cluster, including + the new members joined after you added the listener. + + Messages are ordered, meaning that listeners(subscribers) will process the + messages in the order they are actually published. + + Hazelcast's Reliable Topic uses the same Topic interface as a regular topic. + The main difference is that Reliable Topic is backed up by the Ringbuffer + data structure, a replicated but not partitioned data structure that stores + its data in a ring-like structure. 
""" - def __init__(self, client, service_name, name): - super(ReliableTopic, self).__init__(client, service_name, name) + def __init__(self, service_name, name, context): + super(ReliableTopic, self).__init__(service_name, name, context) - config = client.config.reliable_topic_configs.get(name, None) + config = context.config.reliable_topics.get(name, None) if config is None: - config = ReliableTopicConfig() + config = _ReliableTopicConfig() - self.client = client - self.config = config - self._topic_overload_policy = self.config.topic_overload_policy - self.ringbuffer = client.get_ringbuffer("_hz_rb_" + name) - self._message_listeners_lock = threading.RLock() - self._message_listeners = {} + self._config = config + self._context = context + self._ringbuffer = context.client.get_ringbuffer(_RINGBUFFER_PREFIX + name) + self._runners = {} - def add_listener(self, reliable_topic_listener): - """ - Subscribes to this reliable topic. When someone publishes a message on this topic, on_message() method of - ReliableTopicListener is called. + def publish(self, message): + """Publishes the message to all subscribers of this topic. + + Args: + message: The message. - :param reliable_topic_listener: (Class), class to be used when a message is published. - :return: (str), a registration id which is used as a key to remove the listener. 
+ Returns: + hazelcast.future.Future[None]: """ - if not isinstance(reliable_topic_listener, ReliableMessageListener): - raise IllegalArgumentError("Message listener is not an instance of ReliableTopicListener") + check_not_none(message, "Message cannot be None") - registration_id = str(uuid4()) - listener = _MessageListener(registration_id, self, self._to_object, reliable_topic_listener) - with self._message_listeners_lock: - self._message_listeners[registration_id] = listener - - listener.start() - return registration_id - - def _add_with_backoff(self, item): - sleep_time = _INITIAL_BACKOFF - while True: - seq_id = self.ringbuffer.add(item, overflow_policy=OVERFLOW_POLICY_FAIL).result() - if seq_id != -1: - return - time.sleep(sleep_time) - sleep_time *= 2 - if sleep_time > _MAX_BACKOFF: - sleep_time = _MAX_BACKOFF + payload = self._to_data(message) + topic_message = ReliableTopicMessage(time.time(), None, payload) - def _add_or_fail(self, item): - seq_id = self.ringbuffer.add(item, overflow_policy=OVERFLOW_POLICY_FAIL).result() - if seq_id == -1: - raise TopicOverflowError("failed to publish message to topic: " + self.name) + overload_policy = self._config.overload_policy + if overload_policy == TopicOverloadPolicy.BLOCK: + return self._add_with_backoff(topic_message) + elif overload_policy == TopicOverloadPolicy.ERROR: + return self._add_or_fail(topic_message) + elif overload_policy == TopicOverloadPolicy.DISCARD_OLDEST: + return self._add_or_overwrite(topic_message) + elif overload_policy == TopicOverloadPolicy.DISCARD_NEWEST: + return self._add_or_discard(topic_message) - def publish(self, message): + def publish_all(self, messages): + """Publishes all messages to all subscribers of this topic. + + Args: + messages (list): Messages to publish. 
+ + Returns: + hazelcast.future.Future[None]: """ - Publishes the message to all subscribers of this topic + check_not_none(messages, "Messages cannot be None") + + topic_messages = [] + + for message in messages: + check_not_none(message, "Message cannot be None") + payload = self._to_data(message) + topic_messages.append(ReliableTopicMessage(time.time(), None, payload)) + + overload_policy = self._config.overload_policy + if overload_policy == TopicOverloadPolicy.BLOCK: + return self._add_messages_with_backoff(topic_messages) + elif overload_policy == TopicOverloadPolicy.ERROR: + return self._add_messages_or_fail(topic_messages) + elif overload_policy == TopicOverloadPolicy.DISCARD_OLDEST: + return self._add_messages_or_overwrite(topic_messages) + elif overload_policy == TopicOverloadPolicy.DISCARD_NEWEST: + return self._add_messages_or_discard(topic_messages) + + def add_listener(self, listener): + """Subscribes to this reliable topic. + + It can be either a simple function or an instance of an + :class:`ReliableMessageListener`. When a function is passed, a + :class:`ReliableMessageListener` is created out of that with + sensible default values. - :param message: (object), the message to be published. + When a message is published, the, + :func:`ReliableMessageListener.on_message` method of the given + listener (or the function passed) is called. + + More than one message listener can be added on one instance. + + Args: + listener (function or ReliableMessageListener): Listener to add. + + Returns: + hazelcast.future.Future[str]: The registration id. 
""" - item = ReliableTopicMessage( - publish_time=current_time_in_millis(), - publisher_address="", - payload=self._to_data(message) + check_not_none(listener, "None listener is not allowed") + + registration_id = str(uuid4()) + reliable_message_listener = self._to_reliable_message_listener(listener) + + runner = _MessageRunner( + registration_id, + reliable_message_listener, + self._ringbuffer, + self.name, + self._config.read_batch_size, + self._to_object, + self._runners, ) - if self._topic_overload_policy == TOPIC_OVERLOAD_POLICY.BLOCK: - self._add_with_backoff(item) - elif self._topic_overload_policy == TOPIC_OVERLOAD_POLICY.ERROR: - self._add_or_fail(item) - elif self._topic_overload_policy == TOPIC_OVERLOAD_POLICY.DISCARD_NEWEST: - self.ringbuffer.add(item, overflow_policy=OVERFLOW_POLICY_FAIL).result() - elif self._topic_overload_policy == TOPIC_OVERLOAD_POLICY.DISCARD_OLDEST: - self.ringbuffer.add(item, overflow_policy=OVERFLOW_POLICY_OVERWRITE).result() + + def continuation(future): + future.result() + # If the runner started successfully, register it. + self._runners[registration_id] = runner + runner.next_batch() + return registration_id + + return runner.start().continue_with(continuation) def remove_listener(self, registration_id): - """ - Stops receiving messages for the given message listener. If the given listener already removed, this method does - nothing. + """Stops receiving messages for the given message listener. - :param registration_id: (str), registration id of the listener to be removed. - :return: (bool), ``true`` if the listener is removed, ``false`` otherwise. - """ - with self._message_listeners_lock: - if registration_id not in self._message_listeners: - raise IllegalArgumentError("no listener is found with the given id : {}".format(registration_id)) + If the given listener already removed, this method does nothing. 
- listener = self._message_listeners[registration_id] - listener.cancel() - del self._message_listeners[registration_id] - return True + Args: + registration_id (str): ID of listener registration. - return False + Returns: + hazelcast.future.Future[bool]: ``True`` if registration is + removed, ``False`` otherwise. + """ + check_not_none(registration_id, "Registration id cannot be None") + runner = self._runners.get(registration_id, None) + if not runner: + return ImmediateFuture(False) + + runner.cancel() + return ImmediateFuture(True) def destroy(self): """ - Destroys underlying Proxy and RingBuffer instances + Destroys underlying Proxy and RingBuffer instances. """ + + for runner in list(six.itervalues(self._runners)): + runner.cancel() + + self._runners.clear() + super(ReliableTopic, self).destroy() - return self.ringbuffer.destroy() + return self._ringbuffer.destroy() + + def _add_or_fail(self, message): + def continuation(future): + sequence_id = future.result() + if sequence_id == -1: + raise TopicOverloadError( + "Failed to publish message %s on topic %s." % (message, self.name) + ) + + return self._ringbuffer.add(message, OVERFLOW_POLICY_FAIL).continue_with(continuation) + + def _add_messages_or_fail(self, messages): + def continuation(future): + sequence_id = future.result() + if sequence_id == -1: + raise TopicOverloadError("Failed to publish messages on topic %s." 
% self.name) + + return self._ringbuffer.add_all(messages, OVERFLOW_POLICY_FAIL).continue_with(continuation) + + def _add_or_overwrite(self, message): + return self._ringbuffer.add(message, OVERFLOW_POLICY_OVERWRITE).continue_with( + _no_op_continuation + ) + + def _add_messages_or_overwrite(self, messages): + return self._ringbuffer.add_all(messages, OVERFLOW_POLICY_OVERWRITE).continue_with( + _no_op_continuation + ) + + def _add_or_discard(self, message): + return self._ringbuffer.add(message, OVERFLOW_POLICY_FAIL).continue_with( + _no_op_continuation + ) + + def _add_messages_or_discard(self, messages): + return self._ringbuffer.add_all(messages, OVERFLOW_POLICY_FAIL).continue_with( + _no_op_continuation + ) + + def _add_with_backoff(self, message): + future = Future() + self._try_adding_with_backoff(message, _INITIAL_BACKOFF, future) + return future + + def _add_messages_with_backoff(self, messages): + future = Future() + self._try_adding_messages_with_backoff(messages, _INITIAL_BACKOFF, future) + return future + + def _try_adding_with_backoff(self, message, backoff, future): + def callback(add_future): + try: + sequence_id = add_future.result() + if sequence_id != -1: + future.set_result(None) + return + + self._context.reactor.add_timer( + backoff, + lambda: self._try_adding_with_backoff( + message, min(_MAX_BACKOFF, 2 * backoff), future + ), + ) + except Exception as e: + future.set_result(e) + + self._ringbuffer.add(message, OVERFLOW_POLICY_FAIL).add_done_callback(callback) + + def _try_adding_messages_with_backoff(self, messages, backoff, future): + def callback(add_future): + try: + sequence_id = add_future.result() + if sequence_id != -1: + future.set_result(None) + return + + self._context.reactor.add_timer( + backoff, + lambda: self._try_adding_messages_with_backoff( + messages, min(_MAX_BACKOFF, 2 * backoff), future + ), + ) + except Exception as e: + future.set_result(e) + + self._ringbuffer.add_all(messages, 
OVERFLOW_POLICY_FAIL).add_done_callback(callback) + + @staticmethod + def _to_reliable_message_listener(listener): + if isinstance(listener, ReliableMessageListener): + return listener + + if not callable(listener): + raise TypeError("Listener must be a callable") + + return _ReliableMessageListenerAdapter(listener) diff --git a/hazelcast/serialization/objects.py b/hazelcast/serialization/objects.py new file mode 100644 index 0000000000..f0bdd0a5d7 --- /dev/null +++ b/hazelcast/serialization/objects.py @@ -0,0 +1,54 @@ +from hazelcast.serialization.api import IdentifiedDataSerializable +from hazelcast.serialization.data import Data +from hazelcast.util import to_millis + + +# Had to put this class into the serialization module +# since it is required in two places. +# 1 - In the serialization service construction, since +# its factory is registered. +# 2 - In ReliableTopic proxy, since publish wraps messages +# into this type. +# If this class was at the proxy module, we would get +# cyclic dependencies. +class ReliableTopicMessage(IdentifiedDataSerializable): + """The Object that is going to be stored in the Ringbuffer. + It contains the actual message payload and some metadata. + """ + + FACTORY_ID = -9 + CLASS_ID = 2 + + def __init__(self, publish_time=None, publisher_address=None, payload=None): + # publish_time is in seconds but server sends/expects to receive + # it in milliseconds. 
+ self.publish_time = publish_time + self.publisher_address = publisher_address + self.payload = payload + + def read_data(self, object_data_input): + self.publish_time = object_data_input.read_long() / 1000.0 + self.publisher_address = object_data_input.read_object() + self.payload = _read_data_from(object_data_input) + + def write_data(self, object_data_output): + object_data_output.write_long(to_millis(self.publish_time)) + object_data_output.write_object(self.publisher_address) + _write_data_to(object_data_output, self.payload) + + def get_factory_id(self): + return self.FACTORY_ID + + def get_class_id(self): + return self.CLASS_ID + + +def _read_data_from(inp): + array = inp.read_byte_array() + if array is None: + return None + return Data(array) + + +def _write_data_to(out, data): + out.write_byte_array(data.to_bytes()) diff --git a/hazelcast/serialization/reliable_topic.py b/hazelcast/serialization/reliable_topic.py deleted file mode 100644 index 29ae6f0879..0000000000 --- a/hazelcast/serialization/reliable_topic.py +++ /dev/null @@ -1,27 +0,0 @@ -from hazelcast.serialization.api import IdentifiedDataSerializable - - -class ReliableTopicMessage(IdentifiedDataSerializable): - FACTORY_ID = -18 - CLASS_ID = 2 - - def __init__(self, publish_time=None, publisher_address=None, payload=None): - self.publish_time = publish_time - self.publisher_address = publisher_address - self.payload = payload - - def read_data(self, object_data_input): - self.publish_time = object_data_input.read_long() - self.publisher_address = object_data_input.read_object() - self.payload = object_data_input.read_data() - - def write_data(self, object_data_output): - object_data_output.write_long(self.publish_time) - object_data_output.write_object(self.publisher_address) - object_data_output.write_data(self.payload) - - def get_factory_id(self): - return self.FACTORY_ID - - def get_class_id(self): - return self.CLASS_ID \ No newline at end of file diff --git 
a/hazelcast/serialization/service.py b/hazelcast/serialization/service.py index bcfbc3d854..50299874ef 100644 --- a/hazelcast/serialization/service.py +++ b/hazelcast/serialization/service.py @@ -1,6 +1,7 @@ import uuid from hazelcast.serialization.base import BaseSerializationService +from hazelcast.serialization.objects import ReliableTopicMessage from hazelcast.serialization.portable.classdef import FieldType from hazelcast.serialization.portable.context import PortableContext from hazelcast.serialization.portable.serializer import PortableSerializer @@ -40,7 +41,7 @@ def __init__( ) # merge configured factories with built in ones - factories = {} + factories = self._get_builtin_identified_factories() factories.update(config.data_serializable_factories) self._registry._data_serializer = IdentifiedDataSerializer(factories) self._register_constant_serializers() @@ -54,6 +55,11 @@ def __init__( if global_serializer: self._registry._global_serializer = global_serializer() + def _get_builtin_identified_factories(self): + return { + ReliableTopicMessage.FACTORY_ID: {ReliableTopicMessage.CLASS_ID: ReliableTopicMessage} + } + def _register_constant_serializers(self): self._registry.register_constant_serializer(self._registry._null_serializer, type(None)) self._registry.register_constant_serializer(self._registry._data_serializer) diff --git a/tests/proxy/hazelcast_topic.xml b/tests/integration/backward_compatible/proxy/hazelcast_topic.xml similarity index 59% rename from tests/proxy/hazelcast_topic.xml rename to tests/integration/backward_compatible/proxy/hazelcast_topic.xml index bdf5d31f33..0bb31dc027 100644 --- a/tests/proxy/hazelcast_topic.xml +++ b/tests/integration/backward_compatible/proxy/hazelcast_topic.xml @@ -1,30 +1,25 @@ - + - - - 3 + 10 + 2 - 120 - 10 - - - - 0 10 + 120 - 120 10 + 120 - 120 10 + 120 \ No newline at end of file diff --git a/tests/integration/backward_compatible/proxy/reliable_topic_test.py 
b/tests/integration/backward_compatible/proxy/reliable_topic_test.py new file mode 100644 index 0000000000..2dd5fabc32 --- /dev/null +++ b/tests/integration/backward_compatible/proxy/reliable_topic_test.py @@ -0,0 +1,448 @@ +import os +import unittest + +from hazelcast.config import TopicOverloadPolicy +from hazelcast.errors import TopicOverloadError +from hazelcast.proxy.reliable_topic import ReliableMessageListener +from tests.base import SingleMemberTestCase +from tests.util import ( + is_client_version_older_than, + random_string, + event_collector, + get_current_timestamp, +) + +CAPACITY = 10 + + +@unittest.skipIf( + is_client_version_older_than("4.1"), "Tests the features added in 4.1 version of the client" +) +class ReliableTopicTest(SingleMemberTestCase): + @classmethod + def configure_cluster(cls): + path = os.path.abspath(__file__) + dir_path = os.path.dirname(path) + with open(os.path.join(dir_path, "hazelcast_topic.xml")) as f: + return f.read() + + @classmethod + def configure_client(cls, config): + config["cluster_name"] = cls.cluster.id + config["reliable_topics"] = { + "discard": { + "overload_policy": TopicOverloadPolicy.DISCARD_NEWEST, + }, + "overwrite": { + "overload_policy": TopicOverloadPolicy.DISCARD_OLDEST, + }, + "block": { + "overload_policy": TopicOverloadPolicy.BLOCK, + }, + "error": { + "overload_policy": TopicOverloadPolicy.ERROR, + }, + } + return config + + def setUp(self): + self.topics = [] + self.topic = self.get_topic(random_string()) + + def tearDown(self): + for topic in self.topics: + topic.destroy() + + def test_add_listener_with_function(self): + topic = self.get_topic(random_string()) + + collector = event_collector() + registration_id = topic.add_listener(collector) + self.assertIsNotNone(registration_id) + + topic.publish("a") + topic.publish("b") + + self.assertTrueEventually( + lambda: self.assertEqual(["a", "b"], list(map(lambda m: m.message, collector.events))) + ) + + def test_add_listener(self): + topic = 
self.get_topic(random_string()) + + messages = [] + + class Listener(ReliableMessageListener): + def on_message(self, message): + messages.append(message.message) + + def retrieve_initial_sequence(self): + return -1 + + def store_sequence(self, sequence): + pass + + def is_loss_tolerant(self): + return False + + def is_terminal(self, error): + return False + + registration_id = topic.add_listener(Listener()) + self.assertIsNotNone(registration_id) + + topic.publish("a") + topic.publish("b") + + self.assertTrueEventually(lambda: self.assertEqual(["a", "b"], messages)) + + def test_add_listener_with_retrieve_initial_sequence(self): + topic = self.get_topic(random_string()) + + messages = [] + + class Listener(ReliableMessageListener): + def on_message(self, message): + messages.append(message.message) + + def retrieve_initial_sequence(self): + return 5 + + def store_sequence(self, sequence): + pass + + def is_loss_tolerant(self): + return False + + def is_terminal(self, error): + return False + + registration_id = topic.add_listener(Listener()) + self.assertIsNotNone(registration_id) + + topic.publish_all(range(10)) + + self.assertTrueEventually(lambda: self.assertEqual(list(range(5, 10)), messages)) + + def test_add_listener_with_store_sequence(self): + topic = self.get_topic(random_string()) + + sequences = [] + + class Listener(ReliableMessageListener): + def on_message(self, message): + pass + + def retrieve_initial_sequence(self): + return -1 + + def store_sequence(self, sequence): + sequences.append(sequence) + + def is_loss_tolerant(self): + return False + + def is_terminal(self, error): + return False + + registration_id = topic.add_listener(Listener()) + self.assertIsNotNone(registration_id) + + topic.publish_all(["item-%s" % i for i in range(20)]) + + self.assertTrueEventually(lambda: self.assertEqual(list(range(20)), sequences)) + + def test_add_listener_with_loss_tolerant_listener_on_message_loss(self): + topic = self.get_topic("overwrite") # has capacity 
of 10 + + messages = [] + + class Listener(ReliableMessageListener): + def on_message(self, message): + messages.append(message.message) + + def retrieve_initial_sequence(self): + return -1 + + def store_sequence(self, sequence): + pass + + def is_loss_tolerant(self): + return True + + def is_terminal(self, error): + return False + + registration_id = topic.add_listener(Listener()) + self.assertIsNotNone(registration_id) + + # will overwrite first 10 messages, hence they will be lost + topic.publish_all(range(2 * CAPACITY)) + + self.assertTrueEventually( + lambda: self.assertEqual(list(range(CAPACITY, 2 * CAPACITY)), messages) + ) + + def test_add_listener_with_non_loss_tolerant_listener_on_message_loss(self): + topic = self.get_topic("overwrite") # has capacity of 10 + + messages = [] + + class Listener(ReliableMessageListener): + def on_message(self, message): + messages.append(message.message) + + def retrieve_initial_sequence(self): + return -1 + + def store_sequence(self, sequence): + pass + + def is_loss_tolerant(self): + return False + + def is_terminal(self, error): + return False + + registration_id = topic.add_listener(Listener()) + self.assertIsNotNone(registration_id) + + # will overwrite first 10 messages, hence they will be lost + topic.publish_all(range(2 * CAPACITY)) + + self.assertEqual(0, len(messages)) + + # Should be cancelled on message loss + self.assertTrueEventually(lambda: self.assertEqual(0, len(topic._runners))) + + def test_add_listener_when_on_message_raises_error(self): + topic = self.get_topic(random_string()) + + messages = [] + + class Listener(ReliableMessageListener): + def on_message(self, message): + message = message.message + if message < 5: + messages.append(message) + else: + raise ValueError("expected") + + def retrieve_initial_sequence(self): + return -1 + + def store_sequence(self, sequence): + pass + + def is_loss_tolerant(self): + return False + + def is_terminal(self, error): + return isinstance(error, ValueError) + + 
registration_id = topic.add_listener(Listener()) + self.assertIsNotNone(registration_id) + + topic.publish_all(range(10)) + + self.assertTrueEventually(lambda: self.assertEqual(list(range(5)), messages)) + + # Should be cancelled since on_message raised error + self.assertTrueEventually(lambda: self.assertEqual(0, len(topic._runners))) + + def test_add_listener_when_on_message_and_is_terminal_raises_error(self): + topic = self.get_topic(random_string()) + + messages = [] + + class Listener(ReliableMessageListener): + def on_message(self, message): + message = message.message + if message < 5: + messages.append(message) + else: + raise ValueError("expected") + + def retrieve_initial_sequence(self): + return -1 + + def store_sequence(self, sequence): + pass + + def is_loss_tolerant(self): + return False + + def is_terminal(self, error): + raise error + + registration_id = topic.add_listener(Listener()) + self.assertIsNotNone(registration_id) + + topic.publish_all(range(10)) + + self.assertTrueEventually(lambda: self.assertEqual(list(range(5)), messages)) + + # Should be cancelled since on_message raised error + self.assertTrueEventually(lambda: self.assertEqual(0, len(topic._runners))) + + def test_add_listener_with_non_callable(self): + topic = self.get_topic(random_string()) + with self.assertRaises(TypeError): + topic.add_listener(3) + + def test_remove_listener(self): + topic = self.get_topic(random_string()) + + registration_id = topic.add_listener(lambda m: m) + self.assertTrue(topic.remove_listener(registration_id)) + + def test_remove_listener_does_not_receive_messages_after_removal(self): + topic = self.get_topic(random_string()) + + collector = event_collector() + registration_id = topic.add_listener(collector) + self.assertTrue(topic.remove_listener(registration_id)) + + topic.publish_all(range(10)) + + self.assertEqual(0, len(collector.events)) + + def test_remove_listener_twice(self): + topic = self.get_topic(random_string()) + registration_id = 
topic.add_listener(lambda m: m) + self.assertTrue(topic.remove_listener(registration_id)) + self.assertFalse(topic.remove_listener(registration_id)) + + def test_publish_with_discard_newest_policy(self): + topic = self.get_topic("discard") + + collector = event_collector() + topic.add_listener(collector) + + for i in range(2 * CAPACITY): + topic.publish(i) + + self.assertTrueEventually(lambda: self.assertEqual(CAPACITY, len(collector.events))) + self.assertEqual(list(range(CAPACITY)), self.get_ringbuffer_data(topic)) + + def test_publish_with_discard_oldest_policy(self): + topic = self.get_topic("overwrite") + + collector = event_collector() + topic.add_listener(collector) + + for i in range(2 * CAPACITY): + topic.publish(i) + + self.assertTrueEventually(lambda: self.assertEqual(2 * CAPACITY, len(collector.events))) + self.assertEqual(list(range(CAPACITY, 2 * CAPACITY)), self.get_ringbuffer_data(topic)) + + def test_publish_with_block_policy(self): + topic = self.get_topic("block") + + collector = event_collector() + topic.add_listener(collector) + + for i in range(CAPACITY): + topic.publish(i) + + begin_time = get_current_timestamp() + + for i in range(CAPACITY, 2 * CAPACITY): + topic.publish(i) + + time_passed = get_current_timestamp() - begin_time + + # TTL is set in the XML config + self.assertTrue(time_passed >= 2.0) + + self.assertTrueEventually(lambda: self.assertEqual(2 * CAPACITY, len(collector.events))) + self.assertEqual(list(range(CAPACITY, CAPACITY * 2)), self.get_ringbuffer_data(topic)) + + def test_publish_with_error_policy(self): + topic = self.get_topic("error") + + collector = event_collector() + topic.add_listener(collector) + + for i in range(CAPACITY): + topic.publish(i) + + for i in range(CAPACITY, 2 * CAPACITY): + with self.assertRaises(TopicOverloadError): + topic.publish(i) + + self.assertTrueEventually(lambda: self.assertEqual(CAPACITY, len(collector.events))) + self.assertEqual(list(range(CAPACITY)), self.get_ringbuffer_data(topic)) + + 
def test_publish_all_with_discard_newest_policy(self): + topic = self.get_topic("discard") + + collector = event_collector() + topic.add_listener(collector) + + topic.publish_all(range(CAPACITY)) + topic.publish_all(range(CAPACITY, 2 * CAPACITY)) + + self.assertTrueEventually(lambda: self.assertEqual(CAPACITY, len(collector.events))) + self.assertEqual(list(range(CAPACITY)), self.get_ringbuffer_data(topic)) + + def test_publish_all_with_discard_oldest_policy(self): + topic = self.get_topic("overwrite") + + collector = event_collector() + topic.add_listener(collector) + + topic.publish_all(range(CAPACITY)) + topic.publish_all(range(CAPACITY, 2 * CAPACITY)) + + self.assertTrueEventually(lambda: self.assertEqual(2 * CAPACITY, len(collector.events))) + self.assertEqual(list(range(CAPACITY, 2 * CAPACITY)), self.get_ringbuffer_data(topic)) + + def test_publish_all_with_block_policy(self): + topic = self.get_topic("block") + + collector = event_collector() + topic.add_listener(collector) + + topic.publish_all(range(CAPACITY)) + + begin_time = get_current_timestamp() + topic.publish_all(range(CAPACITY, 2 * CAPACITY)) + time_passed = get_current_timestamp() - begin_time + + # TTL is set in the XML config + self.assertTrue(time_passed >= 2.0) + + self.assertTrueEventually(lambda: self.assertEqual(2 * CAPACITY, len(collector.events))) + self.assertEqual(list(range(CAPACITY, CAPACITY * 2)), self.get_ringbuffer_data(topic)) + + def test_publish_all_with_error_policy(self): + topic = self.get_topic("error") + + collector = event_collector() + topic.add_listener(collector) + + topic.publish_all(range(CAPACITY)) + + with self.assertRaises(TopicOverloadError): + topic.publish_all(range(CAPACITY, 2 * CAPACITY)) + + self.assertTrueEventually(lambda: self.assertEqual(CAPACITY, len(collector.events))) + self.assertEqual(list(range(CAPACITY)), self.get_ringbuffer_data(topic)) + + def get_ringbuffer_data(self, topic): + ringbuffer = topic._ringbuffer + return list( + map( + lambda m: 
topic._to_object(m.payload), + ringbuffer.read_many( + ringbuffer.head_sequence().result(), CAPACITY, CAPACITY + ).result(), + ) + ) + + def get_topic(self, name): + topic = self.client.get_reliable_topic(name) + self.topics.append(topic) + return topic.blocking() diff --git a/tests/proxy/reliable_topic_test.py b/tests/proxy/reliable_topic_test.py deleted file mode 100644 index d6a92a6707..0000000000 --- a/tests/proxy/reliable_topic_test.py +++ /dev/null @@ -1,377 +0,0 @@ -import os -import time -from datetime import datetime - -import hazelcast -from hazelcast import ClientConfig -from hazelcast.config import ReliableTopicConfig, TOPIC_OVERLOAD_POLICY, ClientProperties -from hazelcast.exception import IllegalArgumentError, TopicOverflowError -from hazelcast.proxy.reliable_topic import ReliableMessageListener -from hazelcast.proxy.ringbuffer import OVERFLOW_POLICY_FAIL, OVERFLOW_POLICY_OVERWRITE -from hazelcast.serialization.reliable_topic import ReliableTopicMessage -from hazelcast.util import current_time_in_millis -from tests.base import SingleMemberTestCase -from tests.util import random_string, event_collector - - -class _ReliableTopicTestException(BaseException): - pass - - -class TestReliableMessageListener(ReliableMessageListener): - def __init__(self, collector): - self._collector = collector - - def on_message(self, event): - self._collector(event) - - -class TestReliableMessageListenerLossTolerant(ReliableMessageListener): - def __init__(self, collector): - self._collector = collector - - def on_message(self, event): - self._collector(event) - - def is_loss_tolerant(self): - return True - - -class ReliableTopicTest(SingleMemberTestCase): - @classmethod - def configure_cluster(cls): - path = os.path.abspath(__file__) - dir_path = os.path.dirname(path) - with open(os.path.join(dir_path, "hazelcast_topic.xml")) as f: - return f.read() - - def setUp(self): - config = ClientConfig() - config.set_property("hazelcast.serialization.input.returns.bytearray", 
True) - - discard_config = ReliableTopicConfig("discard") - discard_config.topic_overload_policy = TOPIC_OVERLOAD_POLICY.DISCARD_NEWEST - config.add_reliable_topic_config(discard_config) - - overwrite_config = ReliableTopicConfig("overwrite") - overwrite_config.topic_overload_policy = TOPIC_OVERLOAD_POLICY.DISCARD_OLDEST - config.add_reliable_topic_config(overwrite_config) - - error_config = ReliableTopicConfig("error") - error_config.topic_overload_policy = TOPIC_OVERLOAD_POLICY.ERROR - config.add_reliable_topic_config(error_config) - - stale_config = ReliableTopicConfig("stale") - stale_config.topic_overload_policy = TOPIC_OVERLOAD_POLICY.DISCARD_OLDEST - config.add_reliable_topic_config(stale_config) - - self.client = hazelcast.HazelcastClient(self.configure_client(config)) - self.reliable_topic = self.client.get_reliable_topic(random_string()).blocking() - self.registration_id = None - - def tearDown(self): - if self.registration_id is not None: - self.reliable_topic.remove_listener(self.registration_id) - - def test_add_listener(self): - collector = event_collector() - reliable_listener = TestReliableMessageListener(collector) - self.registration_id = self.reliable_topic.add_listener(reliable_listener) - self.reliable_topic.publish('item-value') - - def assert_event(): - self.assertEqual(len(collector.events), 1) - event = collector.events[0] - self.assertEqual(event.message, 'item-value') - self.assertGreater(event.publish_time, 0) - - self.assertTrueEventually(assert_event, 5) - - def test_remove_listener(self): - collector = event_collector() - reliable_listener = TestReliableMessageListener(collector) - - reg_id = self.reliable_topic.add_listener(reliable_listener) - removed = self.reliable_topic.remove_listener(reg_id) - self.assertTrue(removed, True) - - def test_none_listener(self): - with self.assertRaises(IllegalArgumentError): - self.reliable_topic.add_listener("invalid-listener") - - def test_remove_listener_when_does_not_exist(self): - with 
self.assertRaises(IllegalArgumentError): - self.reliable_topic.remove_listener("id") - - def test_remove_listener_when_already_removed(self): - collector = event_collector() - reliable_listener = TestReliableMessageListener(collector) - - reg_id = self.reliable_topic.add_listener(reliable_listener) - self.reliable_topic.remove_listener(reg_id) - - with self.assertRaises(IllegalArgumentError): - self.reliable_topic.remove_listener(reg_id) - - def test_error_on_message_not_terminal(self): - collector = event_collector() - - class TestReliableMessageListenerNotTerminal(ReliableMessageListener): - def __init__(self, _collector): - self._collector = _collector - - def on_message(self, event): - if event.message == "raise-exception": - raise _ReliableTopicTestException("test-exception") - - self._collector(event) - - def is_terminal(self): - return False - - reliable_listener = TestReliableMessageListenerNotTerminal(collector) - self.registration_id = self.reliable_topic.add_listener(reliable_listener) - - self.reliable_topic.publish('raise-exception') - self.reliable_topic.publish('work-normally') - - def assert_event(): - self.assertEqual(len(collector.events), 1) - event = collector.events[0] - self.assertEqual(event.message, 'work-normally') - self.assertGreater(event.publish_time, 0) - - self.assertTrueEventually(assert_event, 5) - - def test_error_on_message_terminal(self): - collector = event_collector() - - class TestReliableMessageListenerTerminal(ReliableMessageListener): - def __init__(self, _collector): - self._collector = _collector - - def on_message(self, event): - if event.message == "raise-exception": - raise _ReliableTopicTestException("test-exception") - - self._collector(event) - - def is_terminal(self): - return True - - reliable_listener = TestReliableMessageListenerTerminal(collector) - # This listener will be removed by the ReliableTopic implementation - self.reliable_topic.add_listener(reliable_listener) - - 
self.reliable_topic.publish('raise-exception') - self.reliable_topic.publish('work-normally') - time.sleep(0.5) - self.assertEqual(len(collector.events), 0) - - def test_error_on_message_terminal_exception(self): - collector = event_collector() - - class TestReliableMessageListenerTerminal(ReliableMessageListener): - def __init__(self, _collector): - self._collector = _collector - - def on_message(self, event): - if event.message == "raise-exception": - raise _ReliableTopicTestException("test-exception in on_message") - - self._collector(event) - - def is_terminal(self): - raise _ReliableTopicTestException("is_terminal failed") - - reliable_listener = TestReliableMessageListenerTerminal(collector) - self.registration_id = self.reliable_topic.add_listener(reliable_listener) - - self.reliable_topic.publish('raise-exception') - self.reliable_topic.publish('work-normally') - time.sleep(0.5) - self.assertEqual(len(collector.events), 0) - - def test_publish_many(self): - collector = event_collector() - reliable_listener = TestReliableMessageListener(collector) - self.registration_id = self.reliable_topic.add_listener(reliable_listener) - for i in range(10): - self.reliable_topic.publish('message ' + str(i)) - - def assert_event(): - self.assertEqual(len(collector.events), 10) - - self.assertTrueEventually(assert_event, 10) - - def test_message_field_set_correctly(self): - collector = event_collector() - reliable_listener = TestReliableMessageListener(collector) - self.registration_id = self.reliable_topic.add_listener(reliable_listener) - - before_publish_time = current_time_in_millis() - time.sleep(0.1) - self.reliable_topic.publish('item-value') - time.sleep(0.1) - after_publish_time = current_time_in_millis() - - def assert_event(): - self.assertEqual(len(collector.events), 1) - event = collector.events[0] - self.assertEqual(event.message, 'item-value') - self.assertGreater(event.publish_time, before_publish_time) - self.assertLess(event.publish_time, 
after_publish_time) - - self.assertTrueEventually(assert_event, 5) - - def test_always_start_after_tail(self): - collector = event_collector() - reliable_listener = TestReliableMessageListener(collector) - self.reliable_topic.publish('1') - self.reliable_topic.publish('2') - self.reliable_topic.publish('3') - - self.registration_id = self.reliable_topic.add_listener(reliable_listener) - - self.reliable_topic.publish('4') - self.reliable_topic.publish('5') - self.reliable_topic.publish('6') - - def assert_event(): - self.assertEqual(len(collector.events), 3) - self.assertEqual(collector.events[0].message, "4") - self.assertEqual(collector.events[1].message, "5") - self.assertEqual(collector.events[2].message, "6") - - self.assertTrueEventually(assert_event, 5) - - def generate_items(self, n): - messages = [] - for i in range(n): - msg = ReliableTopicMessage( - publish_time=current_time_in_millis(), - publisher_address="", - payload=self.client.serialization_service.to_data(i+1) - ) - messages.append(msg) - - return messages - - def test_discard(self): - reliable_topic = self.client.get_reliable_topic("discard").blocking() - items = self.generate_items(10) - reliable_topic.ringbuffer.add_all(items, OVERFLOW_POLICY_FAIL) - - reliable_topic.publish(11) - seq = reliable_topic.ringbuffer.tail_sequence().result() - item = reliable_topic.ringbuffer.read_one(seq).result() - num = self.client.serialization_service.to_object(item.payload) - self.assertEqual(num, 10) - - def test_overwrite(self): - reliable_topic = self.client.get_reliable_topic("overwrite").blocking() - for i in range(10): - reliable_topic.publish(i+1) - - reliable_topic.publish(11) - seq = reliable_topic.ringbuffer.tail_sequence().result() - item = reliable_topic.ringbuffer.read_one(seq).result() - num = self.client.serialization_service.to_object(item.payload) - self.assertEqual(num, 11) - - def test_error(self): - reliable_topic = self.client.get_reliable_topic("error").blocking() - for i in range(10): - 
reliable_topic.publish(i+1) - - with self.assertRaises(TopicOverflowError): - reliable_topic.publish(11) - - def test_blocking(self): - reliable_topic = self.client.get_reliable_topic("blocking").blocking() - for i in range(10): - reliable_topic.publish(i+1) - - before = datetime.utcnow() - reliable_topic.publish(11) - time_diff = datetime.utcnow() - before - - seq = reliable_topic.ringbuffer.tail_sequence().result() - item = reliable_topic.ringbuffer.read_one(seq).result() - num = self.client.serialization_service.to_object(item.payload) - self.assertEqual(num, 11) - if time_diff.seconds <= 2: - self.fail("expected at least 2 seconds delay got %s" % time_diff.seconds) - - def test_stale(self): - collector = event_collector() - self.reliable_topic = self.client.get_reliable_topic("stale").blocking() - reliable_listener = TestReliableMessageListenerLossTolerant(collector) - self.registration_id = self.reliable_topic.add_listener(reliable_listener) - - items = self.generate_items(20) - self.reliable_topic.ringbuffer.add_all(items, overflow_policy=OVERFLOW_POLICY_OVERWRITE) - - def assert_event(): - self.assertEqual(len(collector.events), 10) - event = collector.events[9] - self.assertEqual(event.message, 20) - - self.assertTrueEventually(assert_event, 5) - - def test_distributed_object_destroyed(self): - config = ClientConfig() - config.network_config.connection_attempt_limit = 10 - config.set_property(ClientProperties.INVOCATION_TIMEOUT_SECONDS.name, 10) - config.set_property("hazelcast.serialization.input.returns.bytearray", True) - - - client_two = hazelcast.HazelcastClient(self.configure_client(config)) - # TODO: shutdown - - collector = event_collector() - self.reliable_topic = client_two.get_reliable_topic("x") - reliable_listener = TestReliableMessageListenerLossTolerant(collector) - self.registration_id = self.reliable_topic.add_listener(reliable_listener) - - self.rc.shutdownCluster(self.cluster.id) - self.cluster = self.create_cluster(self.rc, 
self.configure_cluster()) - self.cluster.start_member() - - self.reliable_topic.publish("aa") - - def assert_event(): - self.assertEqual(len(collector.events), 1) - event = collector.events[0] - self.assertEqual(event.message, "aa") - - self.assertTrueEventually(assert_event, 5) - - def test_distributed_object_destroyed_error(self): - collector = event_collector() - self.reliable_topic = self.client.get_reliable_topic("differentReliableTopic") - reliable_listener = TestReliableMessageListenerLossTolerant(collector) - self.registration_id = self.reliable_topic.add_listener(reliable_listener) - self.reliable_topic.destroy() - - def assert_event(): - self.assertEqual(len(collector.events), 0) - - self.assertTrueEventually(assert_event, 5) - - def test_client_not_active_error(self): - config = ClientConfig() - config.set_property("hazelcast.serialization.input.returns.bytearray", True) - client_two = hazelcast.HazelcastClient(self.configure_client(config)) - - collector = event_collector() - self.reliable_topic = self.client.get_reliable_topic("differentReliableTopic") - reliable_listener = TestReliableMessageListenerLossTolerant(collector) - self.registration_id = self.reliable_topic.add_listener(reliable_listener) - client_two.shutdown() - - def assert_event(): - self.assertEqual(len(collector.events), 0) - - self.assertTrueEventually(assert_event, 5) - diff --git a/tests/unit/config_test.py b/tests/unit/config_test.py index b31354671e..84f706395b 100644 --- a/tests/unit/config_test.py +++ b/tests/unit/config_test.py @@ -12,6 +12,7 @@ UniqueKeyTransformation, QueryConstants, BitmapIndexOptions, + TopicOverloadPolicy, ) from hazelcast.errors import InvalidConfigurationError from hazelcast.serialization.api import IdentifiedDataSerializable, Portable, StreamSerializer @@ -639,6 +640,57 @@ def test_flake_id_generators(self): self.assertEqual(20, fig_config.prefetch_count) self.assertEqual(30, fig_config.prefetch_validity) + def 
test_reliable_topics_invalid_configs(self): + config = self.config + self.assertEqual({}, config.reliable_topics) + + invalid_configs = [ + ({123: "123"}, TypeError), + ({"123": 123}, TypeError), + (None, TypeError), + ({"x": {"overload_policy": None}}, TypeError), + ({"x": {"overload_policy": -1}}, TypeError), + ({"x": {"read_batch_size": None}}, TypeError), + ({"x": {"read_batch_size": -1}}, ValueError), + ({"x": {"read_batch_size": 0}}, ValueError), + ({"x": {"invalid_option": -10}}, InvalidConfigurationError), + ] + + for c, e in invalid_configs: + with self.assertRaises(e): + config.reliable_topics = c + + def test_reliable_topics_defaults(self): + config = self.config + config.reliable_topics = {"a": {}} + topic_config = config.reliable_topics["a"] + self.assertEqual(TopicOverloadPolicy.BLOCK, topic_config.overload_policy) + self.assertEqual(10, topic_config.read_batch_size) + + def test_reliable_topics_with_a_few_changes(self): + config = self.config + config.reliable_topics = { + "a": { + "read_batch_size": 42, + } + } + topic_config = config.reliable_topics["a"] + self.assertEqual(TopicOverloadPolicy.BLOCK, topic_config.overload_policy) + self.assertEqual(42, topic_config.read_batch_size) + + def test_reliable_topics(self): + config = self.config + config.reliable_topics = { + "a": { + "overload_policy": TopicOverloadPolicy.ERROR, + "read_batch_size": 42, + } + } + + topic_config = config.reliable_topics["a"] + self.assertEqual(TopicOverloadPolicy.ERROR, topic_config.overload_policy) + self.assertEqual(42, topic_config.read_batch_size) + def test_labels(self): config = self.config self.assertEqual([], config.labels) diff --git a/tests/unit/proxy/reliable_topic_test.py b/tests/unit/proxy/reliable_topic_test.py new file mode 100644 index 0000000000..07d9307b24 --- /dev/null +++ b/tests/unit/proxy/reliable_topic_test.py @@ -0,0 +1,134 @@ +import itertools +import unittest + +from mock import MagicMock + +from hazelcast.errors import ( + 
HazelcastClientNotActiveError, + ClientOfflineError, + OperationTimeoutError, + IllegalArgumentError, + HazelcastInstanceNotActiveError, + DistributedObjectDestroyedError, +) +from hazelcast.future import ImmediateFuture, ImmediateExceptionFuture, Future +from hazelcast.proxy import ReliableTopic +from hazelcast.proxy.reliable_topic import ReliableMessageListener + + +def no_op(_): + pass + + +class ReliableTopicErrorTest(unittest.TestCase): + def setUp(self): + self.ringbuffer = MagicMock() + self.ringbuffer.tail_sequence.return_value = ImmediateFuture(0) + client = MagicMock() + client.get_ringbuffer.return_value = self.ringbuffer + context = MagicMock(client=client) + self.topic = ReliableTopic("", "", context) + + def test_read_many_on_client_not_active_error(self): + self.error_on_first_read_many(HazelcastClientNotActiveError("expected")) + self.topic.add_listener(no_op) + self.assertEqual(1, self.ringbuffer.read_many.call_count) + self.assertEqual(0, len(self.topic._runners)) + + def test_read_many_on_client_offline_error(self): + self.error_on_first_read_many(ClientOfflineError()) + self.topic.add_listener(no_op) + self.assertEqual(2, self.ringbuffer.read_many.call_count) + self.assertEqual(1, len(self.topic._runners)) + + def test_read_many_on_timeout_error(self): + self.error_on_first_read_many(OperationTimeoutError("expected")) + self.topic.add_listener(no_op) + self.assertEqual(2, self.ringbuffer.read_many.call_count) + self.assertEqual(1, len(self.topic._runners)) + + def test_read_many_on_illegal_argument_error(self): + self.error_on_first_read_many(IllegalArgumentError("expected")) + self.topic.add_listener(no_op) + self.assertEqual(1, self.ringbuffer.read_many.call_count) + self.assertEqual(0, len(self.topic._runners)) + + def test_read_many_on_illegal_argument_error_with_loss_tolerant_listener(self): + self.error_on_first_read_many(IllegalArgumentError("expected")) + self.ringbuffer.head_sequence.return_value = ImmediateFuture(0) + + class 
Listener(ReliableMessageListener): + def on_message(self, message): + pass + + def retrieve_initial_sequence(self): + return 0 + + def store_sequence(self, sequence): + pass + + def is_loss_tolerant(self): + return True + + def is_terminal(self, error): + pass + + self.topic.add_listener(Listener()) + self.assertEqual(2, self.ringbuffer.read_many.call_count) + self.assertEqual(1, len(self.topic._runners)) + + def test_read_many_on_illegal_argument_error_with_loss_tolerant_listener_when_head_sequence_fails( + self, + ): + self.error_on_first_read_many(IllegalArgumentError("expected")) + self.ringbuffer.head_sequence.return_value = ImmediateExceptionFuture( + RuntimeError("expected") + ) + + class Listener(ReliableMessageListener): + def on_message(self, message): + pass + + def retrieve_initial_sequence(self): + return 0 + + def store_sequence(self, sequence): + pass + + def is_loss_tolerant(self): + return True + + def is_terminal(self, error): + pass + + self.topic.add_listener(Listener()) + self.assertEqual(1, self.ringbuffer.read_many.call_count) + self.assertEqual(0, len(self.topic._runners)) + + def test_read_many_on_instance_not_active_error(self): + self.error_on_first_read_many(HazelcastInstanceNotActiveError("expected")) + self.topic.add_listener(no_op) + self.assertEqual(1, self.ringbuffer.read_many.call_count) + self.assertEqual(0, len(self.topic._runners)) + + def test_read_many_on_distributed_object_destroyed_error(self): + self.error_on_first_read_many(DistributedObjectDestroyedError("expected")) + self.topic.add_listener(no_op) + self.assertEqual(1, self.ringbuffer.read_many.call_count) + self.assertEqual(0, len(self.topic._runners)) + + def test_read_many_on_generic_error(self): + self.error_on_first_read_many(RuntimeError("expected")) + self.topic.add_listener(no_op) + self.assertEqual(1, self.ringbuffer.read_many.call_count) + self.assertEqual(0, len(self.topic._runners)) + + def error_on_first_read_many(self, error): + counter = itertools.count() 
+ + def read_many_mock(*_): + if next(counter) == 0: + return ImmediateExceptionFuture(error) + return Future() + + self.ringbuffer.read_many.side_effect = read_many_mock From c994ae1520b463768abf47a7da7933e7346bafd7 Mon Sep 17 00:00:00 2001 From: mdumandag Date: Mon, 26 Apr 2021 15:53:24 +0300 Subject: [PATCH 8/9] construct memberinfo only if the publisher_address is non-none --- hazelcast/proxy/reliable_topic.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/hazelcast/proxy/reliable_topic.py b/hazelcast/proxy/reliable_topic.py index 3649cbfd83..e9a26072ee 100644 --- a/hazelcast/proxy/reliable_topic.py +++ b/hazelcast/proxy/reliable_topic.py @@ -238,13 +238,17 @@ def _handle_next_batch(self, future): message = result[i] self._listener.store_sequence(result.get_sequence(i)) + member = None + if message.publisher_address: + member = MemberInfo( + message.publisher_address, None, None, False, _UNKNOWN_MEMBER_VERSION + ) + topic_message = TopicMessage( self._topic_name, message.payload, message.publish_time, - MemberInfo( - message.publisher_address, None, None, False, _UNKNOWN_MEMBER_VERSION - ), + member, self._to_object, ) self._listener.on_message(topic_message) From ffa573ecbd15b2b86877bf2c2c484447ced0dd71 Mon Sep 17 00:00:00 2001 From: mdumandag Date: Tue, 27 Apr 2021 12:11:51 +0300 Subject: [PATCH 9/9] add durable subscription test --- .../proxy/reliable_topic_test.py | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/tests/integration/backward_compatible/proxy/reliable_topic_test.py b/tests/integration/backward_compatible/proxy/reliable_topic_test.py index 2dd5fabc32..110af3502f 100644 --- a/tests/integration/backward_compatible/proxy/reliable_topic_test.py +++ b/tests/integration/backward_compatible/proxy/reliable_topic_test.py @@ -431,6 +431,55 @@ def test_publish_all_with_error_policy(self): self.assertTrueEventually(lambda: self.assertEqual(CAPACITY, len(collector.events))) 
self.assertEqual(list(range(CAPACITY)), self.get_ringbuffer_data(topic)) + def test_durable_subscription(self): + topic = self.get_topic(random_string()) + + class DurableListener(ReliableMessageListener): + def __init__(self): + self.objects = [] + self.sequences = [] + self.sequence = -1 + + def on_message(self, message): + self.objects.append(message.message) + + def retrieve_initial_sequence(self): + if self.sequence == -1: + return self.sequence + + # +1 to read the next item + return self.sequence + 1 + + def store_sequence(self, sequence): + self.sequences.append(sequence) + self.sequence = sequence + + def is_loss_tolerant(self): + return False + + def is_terminal(self, error): + return True + + listener = DurableListener() + + registration_id = topic.add_listener(listener) + topic.publish("item1") + + self.assertTrueEventually(lambda: self.assertEqual(["item1"], listener.objects)) + + self.assertTrue(topic.remove_listener(registration_id)) + + topic.publish("item2") + topic.publish("item3") + + topic.add_listener(listener) + + def assertion(): + self.assertEqual(["item1", "item2", "item3"], listener.objects) + self.assertEqual([0, 1, 2], listener.sequences) + + self.assertTrueEventually(assertion) + def get_ringbuffer_data(self, topic): ringbuffer = topic._ringbuffer return list(