From 499740b480f88d58494cc538d32792ea90760175 Mon Sep 17 00:00:00 2001 From: Burak Sezer Date: Wed, 13 May 2020 13:52:29 +0300 Subject: [PATCH 1/7] Initial ReliableTopic implementation: #201 --- examples/reliable-topic/__init__.py | 0 .../reliable-topic/reliable_topic_example.py | 29 ++ hazelcast/config.py | 50 +++ hazelcast/exception.py | 4 + hazelcast/proxy/__init__.py | 6 +- hazelcast/proxy/reliable_topic.py | 329 +++++++++++++++++- hazelcast/proxy/ringbuffer.py | 2 +- hazelcast/serialization/reliable_topic.py | 27 ++ hazelcast/serialization/service.py | 11 +- tests/proxy/hazelcast_topic.xml | 30 ++ tests/proxy/reliable_topic_test.py | 295 ++++++++++++++++ 11 files changed, 771 insertions(+), 12 deletions(-) create mode 100644 examples/reliable-topic/__init__.py create mode 100644 examples/reliable-topic/reliable_topic_example.py create mode 100644 hazelcast/serialization/reliable_topic.py create mode 100644 tests/proxy/hazelcast_topic.xml create mode 100644 tests/proxy/reliable_topic_test.py diff --git a/examples/reliable-topic/__init__.py b/examples/reliable-topic/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/examples/reliable-topic/reliable_topic_example.py b/examples/reliable-topic/reliable_topic_example.py new file mode 100644 index 0000000000..842227c36d --- /dev/null +++ b/examples/reliable-topic/reliable_topic_example.py @@ -0,0 +1,29 @@ +import time + +import hazelcast +from hazelcast import ClientConfig +from hazelcast.proxy.reliable_topic import ReliableMessageListener + + +class MyReliableMessageListener(ReliableMessageListener): + def on_message(self, event): + print("Got message: {}".format(event.message)) + print("Publish time: {}\n".format(event.publish_time)) + + +if __name__ == "__main__": + config = ClientConfig() + config.set_property("hazelcast.serialization.input.returns.bytearray", True) + client = hazelcast.HazelcastClient(config) + listener = MyReliableMessageListener() + + reliable_topic = client.get_reliable_topic("reliable-topic") + registration_id = reliable_topic.add_listener(listener) + + for i in range(10): + reliable_topic.publish("Message " + str(i)) + time.sleep(0.1) + + reliable_topic.destroy() + reliable_topic.remove_listener(registration_id) + client.shutdown() diff --git a/hazelcast/config.py b/hazelcast/config.py index cba15d9e34..2e0f4a30ce 100644 --- a/hazelcast/config.py +++ b/hazelcast/config.py @@ -73,6 +73,13 @@ MAXIMUM_PREFETCH_COUNT = 100000 +TOPIC_OVERLOAD_POLICY = enum( + DISCARD_OLDEST=0, + DISCARD_NEWEST=1, + BLOCK=2, + ERROR=3 +) + class ClientConfig(object): """ @@ -107,6 +114,9 @@ def __init__(self): self.flake_id_generator_configs = {} """Flake ID generator configuration which maps "config-name" : FlakeIdGeneratorConfig """ + self.reliable_topic_configs = {} + """ReliableTopic configuration""" + self.serialization_config = SerializationConfig() """Hazelcast serialization configuration""" @@ -162,6 +172,10 @@ def add_flake_id_generator_config(self, flake_id_generator_config): self.flake_id_generator_configs[flake_id_generator_config.name] = flake_id_generator_config return self + def add_reliable_topic_config(self, reliable_topic_config): + self.reliable_topic_configs[reliable_topic_config.name] = reliable_topic_config + return self + def get_property_or_default(self, key, default): """ Client property accessor with fallback to default value. @@ -538,6 +552,42 @@ def __init__(self): """ +class ReliableTopicConfig(object): + """ + ReliableTopicConfig contains the configuration for the client regarding + :class:`~hazelcast.proxy.reliable_topic.ReliableTopic` + """ + + def __init__(self, name="default"): + self._name = name + self._read_batch_size = 10 + self._topic_overload_policy = TOPIC_OVERLOAD_POLICY.BLOCK + + @property + def name(self): + return self._name + + @name.setter + def name(self, name): + self._name = name + + @property + def read_batch_size(self): + return self._read_batch_size + + @read_batch_size.setter + def read_batch_size(self, read_batch_size): + self._read_batch_size = read_batch_size + + @property + def topic_overload_policy(self): + return self._topic_overload_policy + + @topic_overload_policy.setter + def topic_overload_policy(self, topic_overload_policy): + self._topic_overload_policy = topic_overload_policy + + class FlakeIdGeneratorConfig(object): """ FlakeIdGeneratorConfig contains the configuration for the client regarding diff --git a/hazelcast/exception.py b/hazelcast/exception.py index 92ebff81f7..0a52a85335 100644 --- a/hazelcast/exception.py +++ b/hazelcast/exception.py @@ -238,6 +238,10 @@ class TopicOverloadError(HazelcastError): pass +class TopicOverflowError(HazelcastError): + pass + + class TopologyChangedError(HazelcastError): pass diff --git a/hazelcast/proxy/__init__.py b/hazelcast/proxy/__init__.py index 030cbaa245..8e01c039af 100644 --- a/hazelcast/proxy/__init__.py +++ b/hazelcast/proxy/__init__.py @@ -117,11 +117,11 @@ def decode_add_listener(response): def encode_remove_listener(registration_id): return client_remove_distributed_object_listener_codec.encode_request(registration_id) - return self._client.listener.register_listener(request, decode_add_listener, - encode_remove_listener, event_handler) + return self._client._listener.register_listener(request, decode_add_listener, + encode_remove_listener, event_handler) def remove_distributed_object_listener(self, registration_id): - return self._client.listener.deregister_listener(registration_id) + return self._client._listener.deregister_listener(registration_id) def _find_next_proxy_address(self): # TODO: filter out lite members diff --git a/hazelcast/proxy/reliable_topic.py b/hazelcast/proxy/reliable_topic.py index b518f03735..f1dbf87f16 100644 --- a/hazelcast/proxy/reliable_topic.py +++ b/hazelcast/proxy/reliable_topic.py @@ -1,12 +1,331 @@ -from hazelcast.proxy.base import Proxy +import time +import threading +from uuid import uuid4 +from hazelcast.config import ReliableTopicConfig, TOPIC_OVERLOAD_POLICY +from hazelcast.exception import IllegalArgumentError, TopicOverflowError, HazelcastInstanceNotActiveError, \ + HazelcastClientNotActiveException, DistributedObjectDestroyedError, StaleSequenceError, OperationTimeoutError +from hazelcast.proxy.base import Proxy, TopicMessage +from hazelcast.proxy.ringbuffer import OVERFLOW_POLICY_FAIL, OVERFLOW_POLICY_OVERWRITE +from hazelcast.serialization.reliable_topic import ReliableTopicMessage +from hazelcast.util import current_time_in_millis +from hazelcast.six.moves import queue -class ReliableTopic(Proxy): - def add_listener(self, on_message=None): +_INITIAL_BACKOFF = 0.1 +_MAX_BACKOFF = 2 + + +class ReliableMessageListener(object): + def on_message(self, item): + """ + Invoked when a message is received for the added reliable topic. + + :param: message the message that is received for the added reliable topic + """ raise NotImplementedError + def retrieve_initial_sequence(self): + """ + Retrieves the initial sequence from which this ReliableMessageListener + should start. + + Return -1 if there is no initial sequence and you want to start + from the next published message. + + If you intend to create a durable subscriber so you continue from where + you stopped the previous time, load the previous sequence and add 1. + If you don't add one, then you will be receiving the same message twice. + + :return: (int), the initial sequence + """ + return -1 + + def store_sequence(self, sequence): + """" + Informs the ReliableMessageListener that it should store the sequence. + This method is called before the message is processed. Can be used to + make a durable subscription. + + :param: (int) ``sequence`` the sequence + """ + pass + + def is_loss_tolerant(self): + """ + Checks if this ReliableMessageListener is able to deal with message loss. + Even though the reliable topic promises to be reliable, it can be that a + MessageListener is too slow. Eventually the message won't be available + anymore. + + If the ReliableMessageListener is not loss tolerant and the topic detects + that there are missing messages, it will terminate the + ReliableMessageListener. + + :return: (bool) ``True`` if the ReliableMessageListener is tolerant towards losing messages. + """ + return False + + def is_terminal(self): + """ + Checks if the ReliableMessageListener should be terminated based on an + exception thrown while calling on_message. + + :return: (bool) ``True` if the ReliableMessageListener should terminate itself, ``False`` if it should keep on running. + """ + raise False + + +class _MessageListener(object): + def __init__(self, uuid, proxy, to_object, listener): + self._id = uuid + self._proxy = proxy + self._to_object = to_object + self._listener = listener + self._cancelled_lock = threading.Lock() + self._cancelled = False + self._sequence = 0 + self._q = queue.Queue() + + def start(self): + tail_seq = self._proxy.ringbuffer.tail_sequence() + initial_seq = self._listener.retrieve_initial_sequence() + if initial_seq == -1: + initial_seq = tail_seq.result() + 1 + self._sequence = initial_seq + self._proxy.client.reactor.add_timer(0, self._next) + + def _handle_illegal_argument_error(self): + head_seq = self._proxy.ringbuffer.head_sequence().result() + self._proxy.logger.warning("MessageListener {} on topic {} requested a too large sequence. Jumping from old " + "sequence: {} to sequence: {}".format(self._id, self._proxy.name, self._sequence, + head_seq)) + self._sequence = head_seq + self._next() + + def _handle_stale_sequence_error(self): + head_seq = self._proxy.ringbuffer.head_sequence().result() + if self._listener.is_loss_tolerant: + self._sequence = head_seq + self._proxy.logger.warning("Topic {} ran into a stale sequence. Jumping from old sequence {} to new " + "sequence {}".format(self._proxy.name, self._sequence, head_seq)) + self._next() + return True + + self._proxy.logger.warning( + "Terminating Message Listener: {} on topic: {}. Reason: The listener was too slow or the retention " + "period of the message has been violated. Head: {}, sequence: {}".format(self._id, self._proxy.name, + head_seq, self._sequence)) + return False + + def _handle_operation_timeout_error(self): + self._proxy.logger.info("Message Listener ", self._proxy.id, "on topic: ", self._proxy.name, " timed out. " + + "Continuing from the last known sequence ", self._proxy.sequence) + self._next() + + def _handle_exception(self, exception): + base_msg = "Terminating Message Listener: " + self._id + " on topic: " + self._proxy.name + ". Reason: " + if isinstance(exception, IllegalArgumentError) and self._listener.is_loss_tolerant(): + self._handle_illegal_argument_error() + return + elif isinstance(exception, StaleSequenceError): + if self._handle_stale_sequence_error(): + return + elif isinstance(exception, OperationTimeoutError): + self._handle_operation_timeout_error() + return + elif isinstance(exception, HazelcastInstanceNotActiveError): + self._proxy.logger.info(base_msg + "HazelcastInstance is shutting down.") + elif isinstance(exception, HazelcastClientNotActiveException): + self._proxy.logger.info(base_msg + "HazelcastClient is shutting down.") + elif isinstance(exception, DistributedObjectDestroyedError): + self._proxy.logger.info(base_msg + "ReliableTopic is destroyed.") + else: + self._proxy.logger.warning(base_msg + "Unhandled error, message: " + str(exception)) + + self._cancel_and_remove_listener() + + def _terminate(self, exception): + with self._cancelled_lock: + if self._cancelled: + return True + + base_msg = "Terminating Message Listener: {} on topic: {}. Reason: ".format(self._id, self._proxy.name) + try: + terminate = self._listener.is_terminal() + if terminate: + self._proxy.logger.warning(base_msg + "Unhandled error: {}".format(str(exception))) + return True + + self._proxy.logger.warning("MessageListener {} on topic: {} ran into an error: {}". + format(self._id, self._proxy.name, str(exception))) + return False + + except Exception as e: + self._proxy.logger.warning(base_msg + "Unhandled error while calling ReliableMessageListener.is_terminal() " + "method: {}".format(str(e))) + return True + + def _process(self, msg): + try: + self._listener.on_message(msg) + except BaseException as e: + if self._terminate(e): + self._cancel_and_remove_listener() + + def _on_response(self, res): + try: + for message in res.result(): + with self._cancelled_lock: + if self._cancelled: + return + + msg = TopicMessage( + self._proxy.name, + message.payload, + message.publish_time, + message.publisher_address, + self._to_object + ) + self._listener.store_sequence(self._sequence) + self._process(msg) + self._sequence += 1 + + # Await for new messages + self._next() + except Exception as e: + self._handle_exception(e) + + def _next(self): + def _read_many(): + with self._cancelled_lock: + if self._cancelled: + return + + future = self._proxy.ringbuffer.read_many(self._sequence, 1, self._proxy.config.read_batch_size) + future.continue_with(self._on_response) + + self._proxy.client.reactor.add_timer(0, _read_many) + + def cancel(self): + with self._cancelled_lock: + self._cancelled = True + + def _cancel_and_remove_listener(self): + try: + # _proxy.remove_listener calls listener.cancel function + self._proxy.remove_listener(self._id) + except IllegalArgumentError as e: + # This listener is already removed + self._proxy.logger.debug("Failed to remove listener. Reason: {}".format(str(e))) + + +class ReliableTopic(Proxy): + """ + Hazelcast provides distribution mechanism for publishing messages that are delivered to multiple subscribers, which + is also known as a publish/subscribe (pub/sub) messaging model. Publish and subscriptions are cluster-wide. When a + member subscribes for a topic, it is actually registering for messages published by any member in the cluster, + including the new members joined after you added the listener. + + Messages are ordered, meaning that listeners(subscribers) will process the messages in the order they are actually + published. + + Hazelcast's Reliable Topic uses the same Topic interface as a regular topic. The main difference is that Reliable + Topic is backed up by the Ringbuffer data structure, a replicated but not partitioned data structure that stores + its data in a ring-like structure. + """ + + def __init__(self, client, service_name, name): + super(ReliableTopic, self).__init__(client, service_name, name) + + config = client.config.reliable_topic_configs.get(name, None) + if config is None: + config = ReliableTopicConfig() + + self.client = client + self.config = config + self._topic_overload_policy = self.config.topic_overload_policy + self.ringbuffer = client.get_ringbuffer("_hz_rb_" + name) + self._message_listeners_lock = threading.RLock() + self._message_listeners = {} + + def add_listener(self, reliable_topic_listener): + """ + Subscribes to this reliable topic. When someone publishes a message on this topic, on_message() method of + ReliableTopicListener is called. + + :param ReliableTopicListener: (Class), class to be used when a message is published. + :return: (str), a registration id which is used as a key to remove the listener. + """ + if not isinstance(reliable_topic_listener, ReliableMessageListener): + raise IllegalArgumentError("Message listener is not an instance of ReliableTopicListener") + + registration_id = str(uuid4()) + listener = _MessageListener(registration_id, self, self._to_object, reliable_topic_listener) + with self._message_listeners_lock: + self._message_listeners[registration_id] = listener + + listener.start() + return registration_id + + def _add_with_backoff(self, item): + sleep_time = _INITIAL_BACKOFF + while True: + seq_id = self.ringbuffer.add(item, overflow_policy=OVERFLOW_POLICY_FAIL).result() + if seq_id != -1: + return + time.sleep(sleep_time) + sleep_time *= 2 + if sleep_time > _MAX_BACKOFF: + sleep_time = _MAX_BACKOFF + + def _add_or_fail(self, item): + seq_id = self.ringbuffer.add(item, overflow_policy=OVERFLOW_POLICY_FAIL).result() + if seq_id == -1: + raise TopicOverflowError("failed to publish message to topic: " + self.name) + def publish(self, message): - raise NotImplementedError + """ + Publishes the message to all subscribers of this topic + + :param message: (object), the message to be published. + """ + # TODO: We need a publisher_address? + item = ReliableTopicMessage( + publish_time=current_time_in_millis(), + publisher_address="", + payload=self._to_data(message) + ) + if self._topic_overload_policy == TOPIC_OVERLOAD_POLICY.BLOCK: + self._add_with_backoff(item) + elif self._topic_overload_policy == TOPIC_OVERLOAD_POLICY.ERROR: + self._add_or_fail(item) + elif self._topic_overload_policy == TOPIC_OVERLOAD_POLICY.DISCARD_NEWEST: + self.ringbuffer.add(item, overflow_policy=OVERFLOW_POLICY_FAIL).result() + elif self._topic_overload_policy == TOPIC_OVERLOAD_POLICY.DISCARD_OLDEST: + self.ringbuffer.add(item, overflow_policy=OVERFLOW_POLICY_OVERWRITE).result() def remove_listener(self, registration_id): - raise NotImplementedError + """ + Stops receiving messages for the given message listener. If the given listener already removed, this method does + nothing. + + :param registration_id: (str), registration id of the listener to be removed. + :return: (bool), ``true`` if the listener is removed, ``false`` otherwise. + """ + with self._message_listeners_lock: + if registration_id not in self._message_listeners: + raise IllegalArgumentError("no listener is found with the given id : {}".format(registration_id)) + + listener = self._message_listeners[registration_id] + listener.cancel() + del self._message_listeners[registration_id] + return True + + return False + + def destroy(self): + """ + Destroys underlying Proxy and RingBuffer instances + """ + super(ReliableTopic, self).destroy() + return self.ringbuffer.destroy() diff --git a/hazelcast/proxy/ringbuffer.py b/hazelcast/proxy/ringbuffer.py index c5e37ef890..57fd26fd32 100644 --- a/hazelcast/proxy/ringbuffer.py +++ b/hazelcast/proxy/ringbuffer.py @@ -157,7 +157,7 @@ def read_many(self, start_sequence, min_count, max_count): """ check_not_negative(start_sequence, "sequence can't be smaller than 0") check_true(max_count >= min_count, "max count should be greater or equal to min count") - check_true(min_count <= self.capacity().result(), "min count should be smaller or equal to capacity") + #check_true(min_count <= self.capacity().result(), "min count should be smaller or equal to capacity") check_true(max_count < MAX_BATCH_SIZE, "max count can't be greater than %d" % MAX_BATCH_SIZE) return self._encode_invoke(ringbuffer_read_many_codec, response_handler=self._read_many_response_handler, diff --git a/hazelcast/serialization/reliable_topic.py b/hazelcast/serialization/reliable_topic.py new file mode 100644 index 0000000000..29ae6f0879 --- /dev/null +++ b/hazelcast/serialization/reliable_topic.py @@ -0,0 +1,27 @@ +from hazelcast.serialization.api import IdentifiedDataSerializable + + +class ReliableTopicMessage(IdentifiedDataSerializable): + FACTORY_ID = -18 + CLASS_ID = 2 + + def __init__(self, publish_time=None, publisher_address=None, payload=None): + self.publish_time = publish_time + self.publisher_address = publisher_address + self.payload = payload + + def read_data(self, object_data_input): + self.publish_time = object_data_input.read_long() + self.publisher_address = object_data_input.read_object() + self.payload = object_data_input.read_data() + + def write_data(self, object_data_output): + object_data_output.write_long(self.publish_time) + object_data_output.write_object(self.publisher_address) + object_data_output.write_data(self.payload) + + def get_factory_id(self): + return self.FACTORY_ID + + def get_class_id(self): + return self.CLASS_ID \ No newline at end of file diff --git a/hazelcast/serialization/service.py b/hazelcast/serialization/service.py index 1ceda5c251..34ba02a605 100644 --- a/hazelcast/serialization/service.py +++ b/hazelcast/serialization/service.py @@ -1,8 +1,8 @@ -from hazelcast.exception import HazelcastSerializationError from hazelcast.serialization.base import BaseSerializationService from hazelcast.serialization.portable.classdef import FieldType from hazelcast.serialization.portable.context import PortableContext from hazelcast.serialization.portable.serializer import PortableSerializer +from hazelcast.serialization.reliable_topic import ReliableTopicMessage from hazelcast.serialization.serializer import * from hazelcast import six from hazelcast.config import ClientProperties @@ -16,6 +16,12 @@ def default_partition_strategy(key): return None +def _init_factories(data_serializable_factories): + factories = {ReliableTopicMessage.FACTORY_ID: {ReliableTopicMessage.CLASS_ID: ReliableTopicMessage}} + factories.update(data_serializable_factories) + return factories + + class SerializationServiceV1(BaseSerializationService): def __init__(self, serialization_config, properties=ClientProperties({}), version=1, global_partition_strategy=default_partition_strategy, @@ -28,8 +34,7 @@ def __init__(self, serialization_config, properties=ClientProperties({}), versio self._registry._portable_serializer = PortableSerializer(self._portable_context, serialization_config.portable_factories) # merge configured factories with built in ones - factories = {} - factories.update(serialization_config.data_serializable_factories) + factories = _init_factories(serialization_config.data_serializable_factories) self._registry._data_serializer = IdentifiedDataSerializer(factories) self._register_constant_serializers() diff --git a/tests/proxy/hazelcast_topic.xml b/tests/proxy/hazelcast_topic.xml new file mode 100644 index 0000000000..bdf5d31f33 --- /dev/null +++ b/tests/proxy/hazelcast_topic.xml @@ -0,0 +1,30 @@ + + + + + 3 + 10 + + + + 120 + 10 + + + + 0 + 10 + + + + 120 + 10 + + + + 120 + 10 + + \ No newline at end of file diff --git a/tests/proxy/reliable_topic_test.py b/tests/proxy/reliable_topic_test.py new file mode 100644 index 0000000000..d1aa44ce49 --- /dev/null +++ b/tests/proxy/reliable_topic_test.py @@ -0,0 +1,295 @@ +import os +import time +from datetime import datetime + +import hazelcast +from hazelcast import ClientConfig +from hazelcast.config import ReliableTopicConfig, TOPIC_OVERLOAD_POLICY +from hazelcast.exception import IllegalArgumentError, TopicOverflowError +from hazelcast.proxy.reliable_topic import ReliableMessageListener +from hazelcast.proxy.ringbuffer import OVERFLOW_POLICY_FAIL, OVERFLOW_POLICY_OVERWRITE +from hazelcast.serialization.reliable_topic import ReliableTopicMessage +from hazelcast.util import current_time_in_millis +from tests.base import SingleMemberTestCase +from tests.util import random_string, event_collector + + +class _ReliableTopicTestException(BaseException): + pass + + +class TestReliableMessageListener(ReliableMessageListener): + def __init__(self, collector): + self._collector = collector + + def on_message(self, event): + self._collector(event) + + +class ReliableTopicTest(SingleMemberTestCase): + @classmethod + def configure_cluster(cls): + path = os.path.abspath(__file__) + dir_path = os.path.dirname(path) + with open(os.path.join(dir_path, "hazelcast_topic.xml")) as f: + return f.read() + + def setUp(self): + config = ClientConfig() + config.set_property("hazelcast.serialization.input.returns.bytearray", True) + + discard_config = ReliableTopicConfig("discard") + discard_config.topic_overload_policy = TOPIC_OVERLOAD_POLICY.DISCARD_NEWEST + config.add_reliable_topic_config(discard_config) + + overwrite_config = ReliableTopicConfig("overwrite") + overwrite_config.topic_overload_policy = TOPIC_OVERLOAD_POLICY.DISCARD_OLDEST + config.add_reliable_topic_config(overwrite_config) + + error_config = ReliableTopicConfig("error") + error_config.topic_overload_policy = TOPIC_OVERLOAD_POLICY.ERROR + config.add_reliable_topic_config(error_config) + + stale_config = ReliableTopicConfig("stale") + stale_config.topic_overload_policy = TOPIC_OVERLOAD_POLICY.DISCARD_OLDEST + config.add_reliable_topic_config(stale_config) + + self.client = hazelcast.HazelcastClient(self.configure_client(config)) + self.reliable_topic = self.client.get_reliable_topic(random_string()).blocking() + self.registration_id = None + + def tearDown(self): + if self.registration_id is not None: + self.reliable_topic.remove_listener(self.registration_id) + + def test_add_listener(self): + collector = event_collector() + reliable_listener = TestReliableMessageListener(collector) + self.registration_id = self.reliable_topic.add_listener(reliable_listener) + self.reliable_topic.publish('item-value') + + def assert_event(): + self.assertEqual(len(collector.events), 1) + event = collector.events[0] + self.assertEqual(event.message, 'item-value') + self.assertGreater(event.publish_time, 0) + + self.assertTrueEventually(assert_event, 5) + + def test_remove_listener(self): + collector = event_collector() + reliable_listener = TestReliableMessageListener(collector) + + reg_id = self.reliable_topic.add_listener(reliable_listener) + removed = self.reliable_topic.remove_listener(reg_id) + self.assertTrue(removed, True) + + def test_none_listener(self): + with self.assertRaises(IllegalArgumentError): + self.reliable_topic.add_listener("invalid-listener") + + def test_remove_listener_when_does_not_exist(self): + with self.assertRaises(IllegalArgumentError): + self.reliable_topic.remove_listener("id") + + def test_remove_listener_when_already_removed(self): + collector = event_collector() + reliable_listener = TestReliableMessageListener(collector) + + reg_id = self.reliable_topic.add_listener(reliable_listener) + self.reliable_topic.remove_listener(reg_id) + + with self.assertRaises(IllegalArgumentError): + self.reliable_topic.remove_listener(reg_id) + + def test_error_on_message_not_terminal(self): + collector = event_collector() + + class TestReliableMessageListenerNotTerminal(ReliableMessageListener): + def __init__(self, _collector): + self._collector = _collector + + def on_message(self, event): + if event.message == "raise-exception": + raise _ReliableTopicTestException("test-exception") + + self._collector(event) + + def is_terminal(self): + # TODO: Gereksiz? + return False + + reliable_listener = TestReliableMessageListenerNotTerminal(collector) + self.registration_id = self.reliable_topic.add_listener(reliable_listener) + + self.reliable_topic.publish('raise-exception') + self.reliable_topic.publish('work-normally') + + def assert_event(): + self.assertEqual(len(collector.events), 1) + event = collector.events[0] + self.assertEqual(event.message, 'work-normally') + self.assertGreater(event.publish_time, 0) + + self.assertTrueEventually(assert_event, 5) + + def test_error_on_message_terminal(self): + collector = event_collector() + + class TestReliableMessageListenerTerminal(ReliableMessageListener): + def __init__(self, _collector): + self._collector = _collector + + def on_message(self, event): + if event.message == "raise-exception": + raise _ReliableTopicTestException("test-exception") + + self._collector(event) + + def is_terminal(self): + return True + + reliable_listener = TestReliableMessageListenerTerminal(collector) + # This listener will be removed by the ReliableTopic implementation + self.reliable_topic.add_listener(reliable_listener) + + self.reliable_topic.publish('raise-exception') + self.reliable_topic.publish('work-normally') + time.sleep(0.5) + self.assertEqual(len(collector.events), 0) + + def test_error_on_message_terminal_exception(self): + collector = event_collector() + + class TestReliableMessageListenerTerminal(ReliableMessageListener): + def __init__(self, _collector): + self._collector = _collector + + def on_message(self, event): + if event.message == "raise-exception": + raise _ReliableTopicTestException("test-exception in on_message") + + self._collector(event) + + def is_terminal(self): + raise _ReliableTopicTestException("is_terminal failed") + + reliable_listener = TestReliableMessageListenerTerminal(collector) + self.registration_id = self.reliable_topic.add_listener(reliable_listener) + + self.reliable_topic.publish('raise-exception') + self.reliable_topic.publish('work-normally') + time.sleep(0.5) + self.assertEqual(len(collector.events), 0) + + def test_publish_many(self): + collector = event_collector() + reliable_listener = TestReliableMessageListener(collector) + self.registration_id = self.reliable_topic.add_listener(reliable_listener) + for i in range(10): + self.reliable_topic.publish('message ' + str(i)) + + def assert_event(): + self.assertEqual(len(collector.events), 10) + + self.assertTrueEventually(assert_event, 10) + + def test_message_field_set_correctly(self): + collector = event_collector() + reliable_listener = TestReliableMessageListener(collector) + self.registration_id = self.reliable_topic.add_listener(reliable_listener) + + before_publish_time = current_time_in_millis() + time.sleep(0.1) + self.reliable_topic.publish('item-value') + time.sleep(0.1) + after_publish_time = current_time_in_millis() + + def assert_event(): + self.assertEqual(len(collector.events), 1) + event = collector.events[0] + self.assertEqual(event.message, 'item-value') + self.assertGreater(event.publish_time, before_publish_time) + self.assertLess(event.publish_time, after_publish_time) + + self.assertTrueEventually(assert_event, 5) + + def test_always_start_after_tail(self): + collector = event_collector() + reliable_listener = TestReliableMessageListener(collector) + self.reliable_topic.publish('1') + self.reliable_topic.publish('2') + self.reliable_topic.publish('3') + + self.registration_id = self.reliable_topic.add_listener(reliable_listener) + + self.reliable_topic.publish('4') + self.reliable_topic.publish('5') + self.reliable_topic.publish('6') + + def assert_event(): + self.assertEqual(len(collector.events), 3) + self.assertEqual(collector.events[0].message, "4") + self.assertEqual(collector.events[1].message, "5") + self.assertEqual(collector.events[2].message, "6") + + self.assertTrueEventually(assert_event, 5) + + def generate_items(self, n): + messages = [] + for i in range(n): + msg = ReliableTopicMessage( + publish_time=current_time_in_millis(), + publisher_address="", + payload=self.client.serialization_service.to_data(i+1) + ) + messages.append(msg) + + return messages + + def test_discard(self): + reliable_topic = self.client.get_reliable_topic("discard").blocking() + items = self.generate_items(10) + reliable_topic.ringbuffer.add_all(items, OVERFLOW_POLICY_FAIL) + + reliable_topic.publish(11) + seq = reliable_topic.ringbuffer.tail_sequence().result() + item = reliable_topic.ringbuffer.read_one(seq).result() + num = self.client.serialization_service.to_object(item.payload) + self.assertEqual(num, 10) + + def test_overwrite(self): + reliable_topic = self.client.get_reliable_topic("overwrite").blocking() + for i in range(10): + reliable_topic.publish(i+1) + + reliable_topic.publish(11) + seq = reliable_topic.ringbuffer.tail_sequence().result() + item = reliable_topic.ringbuffer.read_one(seq).result() + num = self.client.serialization_service.to_object(item.payload) + self.assertEqual(num, 11) + + def test_error(self): + reliable_topic = self.client.get_reliable_topic("error").blocking() + for i in range(10): + reliable_topic.publish(i+1) + + with self.assertRaises(TopicOverflowError): + reliable_topic.publish(11) + + def test_blocking(self): + reliable_topic = self.client.get_reliable_topic("blocking").blocking() + for i in range(10): + reliable_topic.publish(i+1) + + before = datetime.utcnow() + reliable_topic.publish(11) + time_diff = datetime.utcnow() - before + + seq = reliable_topic.ringbuffer.tail_sequence().result() + item = reliable_topic.ringbuffer.read_one(seq).result() + num = self.client.serialization_service.to_object(item.payload) + self.assertEqual(num, 11) + if time_diff.seconds <= 2: + self.fail("expected at least 2 seconds delay got %s" % time_diff.seconds) + From 656c6a34da09e250c47c89e321e92ec4e5c805cc Mon Sep 17 00:00:00 2001 From: Burak Sezer Date: Tue, 26 May 2020 22:34:41 +0300 Subject: [PATCH 2/7] Client does not have _listener, but instead listener as the reference to the listener service --- hazelcast/proxy/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hazelcast/proxy/__init__.py b/hazelcast/proxy/__init__.py index 8e01c039af..b7c122e198 100644 --- a/hazelcast/proxy/__init__.py +++ b/hazelcast/proxy/__init__.py @@ -117,11 +117,11 @@ def decode_add_listener(response): def encode_remove_listener(registration_id): return client_remove_distributed_object_listener_codec.encode_request(registration_id) - return self._client._listener.register_listener(request, decode_add_listener, + return self._client.listener.register_listener(request, decode_add_listener, encode_remove_listener, event_handler) def remove_distributed_object_listener(self, registration_id): - return self._client._listener.deregister_listener(registration_id) + return self._client.listener.deregister_listener(registration_id) def _find_next_proxy_address(self): # TODO: filter out lite members From af08775c476aa44e1d12df4b8c5402da67ceb68a Mon Sep 17 00:00:00 2001 From: Burak Sezer Date: Tue, 26 May 2020 22:39:40 +0300 Subject: [PATCH 3/7] Remove leftovers and useless comments --- hazelcast/proxy/reliable_topic.py | 3 --- tests/proxy/reliable_topic_test.py | 1 - 2 files changed, 4 deletions(-) diff --git a/hazelcast/proxy/reliable_topic.py b/hazelcast/proxy/reliable_topic.py index f1dbf87f16..c6a759d5c4 100644 --- a/hazelcast/proxy/reliable_topic.py +++ b/hazelcast/proxy/reliable_topic.py @@ -9,7 +9,6 @@ from hazelcast.proxy.ringbuffer import OVERFLOW_POLICY_FAIL, OVERFLOW_POLICY_OVERWRITE from hazelcast.serialization.reliable_topic import ReliableTopicMessage from hazelcast.util import current_time_in_millis -from hazelcast.six.moves import queue _INITIAL_BACKOFF = 0.1 _MAX_BACKOFF = 2 @@ -84,7 +83,6 @@ def __init__(self, uuid, proxy, to_object, listener): self._cancelled_lock = threading.Lock() self._cancelled = False self._sequence = 0 - self._q = queue.Queue() def start(self): tail_seq = self._proxy.ringbuffer.tail_sequence() @@ -289,7 +287,6 @@ def publish(self, message): :param message: (object), the message to be published. """ - # TODO: We need a publisher_address? item = ReliableTopicMessage( publish_time=current_time_in_millis(), publisher_address="", diff --git a/tests/proxy/reliable_topic_test.py b/tests/proxy/reliable_topic_test.py index d1aa44ce49..25ed13a15e 100644 --- a/tests/proxy/reliable_topic_test.py +++ b/tests/proxy/reliable_topic_test.py @@ -116,7 +116,6 @@ def on_message(self, event): self._collector(event) def is_terminal(self): - # TODO: Gereksiz? return False reliable_listener = TestReliableMessageListenerNotTerminal(collector) From a185ec63df96ed36d22ade61db4fdd7723430b40 Mon Sep 17 00:00:00 2001 From: Burak Sezer Date: Tue, 26 May 2020 22:40:14 +0300 Subject: [PATCH 4/7] Correct documentation: ReliableTopicListener -> reliable_topic_listener --- hazelcast/proxy/reliable_topic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hazelcast/proxy/reliable_topic.py b/hazelcast/proxy/reliable_topic.py index c6a759d5c4..f227de8bbe 100644 --- a/hazelcast/proxy/reliable_topic.py +++ b/hazelcast/proxy/reliable_topic.py @@ -251,7 +251,7 @@ def add_listener(self, reliable_topic_listener): Subscribes to this reliable topic. When someone publishes a message on this topic, on_message() method of ReliableTopicListener is called. - :param ReliableTopicListener: (Class), class to be used when a message is published. + :param reliable_topic_listener: (Class), class to be used when a message is published. :return: (str), a registration id which is used as a key to remove the listener. """ if not isinstance(reliable_topic_listener, ReliableMessageListener): From e2d4f1c69e13c7f3358131aa8de92e8452d67f52 Mon Sep 17 00:00:00 2001 From: Burak Sezer Date: Sun, 31 May 2020 15:42:01 +0300 Subject: [PATCH 5/7] Prevent deadlock in error handler --- hazelcast/proxy/reliable_topic.py | 49 ++++++++++++++++++------------- 1 file changed, 29 insertions(+), 20 deletions(-) diff --git a/hazelcast/proxy/reliable_topic.py b/hazelcast/proxy/reliable_topic.py index f227de8bbe..5d3c79e9f3 100644 --- a/hazelcast/proxy/reliable_topic.py +++ b/hazelcast/proxy/reliable_topic.py @@ -93,27 +93,36 @@ def start(self): self._proxy.client.reactor.add_timer(0, self._next) def _handle_illegal_argument_error(self): - head_seq = self._proxy.ringbuffer.head_sequence().result() - self._proxy.logger.warning("MessageListener {} on topic {} requested a too large sequence. Jumping from old " - "sequence: {} to sequence: {}".format(self._id, self._proxy.name, self._sequence, - head_seq)) - self._sequence = head_seq - self._next() - - def _handle_stale_sequence_error(self): - head_seq = self._proxy.ringbuffer.head_sequence().result() - if self._listener.is_loss_tolerant: + def on_response(res): + head_seq = res.result() + self._proxy.logger.warning("MessageListener {} on topic {} requested a too large sequence. Jumping from " + "old sequence: {} to sequence: {}".format(self._id, self._proxy.name, + self._sequence, + head_seq)) self._sequence = head_seq - self._proxy.logger.warning("Topic {} ran into a stale sequence. Jumping from old sequence {} to new " - "sequence {}".format(self._proxy.name, self._sequence, head_seq)) self._next() - return True - self._proxy.logger.warning( - "Terminating Message Listener: {} on topic: {}. Reason: The listener was too slow or the retention " - "period of the message has been violated. Head: {}, sequence: {}".format(self._id, self._proxy.name, - head_seq, self._sequence)) - return False + future = self._proxy.ringbuffer.head_sequence() + future.add_done_callback(on_response) + + def _handle_stale_sequence_error(self): + def on_response(res): + head_seq = res.result() + if self._listener.is_loss_tolerant: + self._proxy.logger.warning("Topic {} ran into a stale sequence. Jumping from old sequence {} to new " + "sequence {}".format(self._proxy.name, self._sequence, head_seq)) + self._sequence = head_seq + self._next() + + else: + self._proxy.logger.warning( + "Terminating Message Listener: {} on topic: {}. Reason: The listener was too slow or the retention " + "period of the message has been violated. Head: {}, sequence: {}".format(self._id, self._proxy.name, + head_seq, self._sequence)) + self._cancel_and_remove_listener() + + future = self._proxy.ringbuffer.head_sequence() + future.add_done_callback(on_response) def _handle_operation_timeout_error(self): self._proxy.logger.info("Message Listener ", self._proxy.id, "on topic: ", self._proxy.name, " timed out. " + @@ -126,8 +135,8 @@ def _handle_exception(self, exception): self._handle_illegal_argument_error() return elif isinstance(exception, StaleSequenceError): - if self._handle_stale_sequence_error(): - return + self._handle_stale_sequence_error() + return elif isinstance(exception, OperationTimeoutError): self._handle_operation_timeout_error() return From 2df48a690769f6813ffbac98bd2e635ae29011f4 Mon Sep 17 00:00:00 2001 From: Burak Sezer Date: Sun, 31 May 2020 15:43:11 +0300 Subject: [PATCH 6/7] Add new tests: test_client_not_active_error, test_distributed_object_destroyed_error, test_distributed_object_destroyed, test_stale --- tests/proxy/reliable_topic_test.py | 85 +++++++++++++++++++++++++++++- 1 file changed, 84 insertions(+), 1 deletion(-) diff --git a/tests/proxy/reliable_topic_test.py b/tests/proxy/reliable_topic_test.py index 25ed13a15e..d6a92a6707 100644 --- a/tests/proxy/reliable_topic_test.py +++ b/tests/proxy/reliable_topic_test.py @@ -4,7 +4,7 @@ import hazelcast from hazelcast import ClientConfig -from hazelcast.config import ReliableTopicConfig, TOPIC_OVERLOAD_POLICY +from hazelcast.config import ReliableTopicConfig, TOPIC_OVERLOAD_POLICY, ClientProperties from hazelcast.exception import IllegalArgumentError, TopicOverflowError from hazelcast.proxy.reliable_topic import ReliableMessageListener from hazelcast.proxy.ringbuffer import OVERFLOW_POLICY_FAIL, OVERFLOW_POLICY_OVERWRITE @@ -26,6 +26,17 @@ def on_message(self, event): self._collector(event) +class TestReliableMessageListenerLossTolerant(ReliableMessageListener): + def __init__(self, collector): + self._collector = collector + + def on_message(self, event): + self._collector(event) + + def is_loss_tolerant(self): + return True + + class ReliableTopicTest(SingleMemberTestCase): @classmethod def configure_cluster(cls): @@ -292,3 +303,75 @@ def test_blocking(self): if time_diff.seconds <= 2: self.fail("expected at least 2 seconds delay got %s" % time_diff.seconds) + def test_stale(self): + collector = event_collector() + self.reliable_topic = self.client.get_reliable_topic("stale").blocking() + reliable_listener = TestReliableMessageListenerLossTolerant(collector) + self.registration_id = self.reliable_topic.add_listener(reliable_listener) + + items = self.generate_items(20) + self.reliable_topic.ringbuffer.add_all(items, overflow_policy=OVERFLOW_POLICY_OVERWRITE) + + def assert_event(): + self.assertEqual(len(collector.events), 10) + event = collector.events[9] + self.assertEqual(event.message, 20) + + self.assertTrueEventually(assert_event, 5) + + def test_distributed_object_destroyed(self): + config = ClientConfig() + config.network_config.connection_attempt_limit = 10 + config.set_property(ClientProperties.INVOCATION_TIMEOUT_SECONDS.name, 10) + config.set_property("hazelcast.serialization.input.returns.bytearray", True) + + + client_two = hazelcast.HazelcastClient(self.configure_client(config)) + # TODO: shutdown + + collector = event_collector() + self.reliable_topic = client_two.get_reliable_topic("x") + reliable_listener = TestReliableMessageListenerLossTolerant(collector) + self.registration_id = self.reliable_topic.add_listener(reliable_listener) + + self.rc.shutdownCluster(self.cluster.id) + self.cluster = self.create_cluster(self.rc, self.configure_cluster()) + self.cluster.start_member() + + self.reliable_topic.publish("aa") + + def assert_event(): + self.assertEqual(len(collector.events), 1) + event = collector.events[0] + self.assertEqual(event.message, "aa") + + self.assertTrueEventually(assert_event, 5) + + def test_distributed_object_destroyed_error(self): + collector = event_collector() + self.reliable_topic = self.client.get_reliable_topic("differentReliableTopic") + reliable_listener = TestReliableMessageListenerLossTolerant(collector) + self.registration_id = self.reliable_topic.add_listener(reliable_listener) + self.reliable_topic.destroy() + + def assert_event(): + self.assertEqual(len(collector.events), 0) + + self.assertTrueEventually(assert_event, 5) + + def test_client_not_active_error(self): + config = ClientConfig() + config.set_property("hazelcast.serialization.input.returns.bytearray", True) + client_two = hazelcast.HazelcastClient(self.configure_client(config)) + + collector = event_collector() + self.reliable_topic = self.client.get_reliable_topic("differentReliableTopic") + reliable_listener = TestReliableMessageListenerLossTolerant(collector) + self.registration_id = self.reliable_topic.add_listener(reliable_listener) + client_two.shutdown() + + def assert_event(): + self.assertEqual(len(collector.events), 0) + + self.assertTrueEventually(assert_event, 5) + From d85da998880b07ee784bb152361c85a344c26eb2 Mon Sep 17 00:00:00 2001 From: Burak Sezer Date: Wed, 19 Aug 2020 10:46:48 +0300 Subject: [PATCH 7/7] Use add_done_callback for validation --- hazelcast/proxy/ringbuffer.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/hazelcast/proxy/ringbuffer.py b/hazelcast/proxy/ringbuffer.py index 57fd26fd32..36b46199a9 100644 --- a/hazelcast/proxy/ringbuffer.py +++ b/hazelcast/proxy/ringbuffer.py @@ -157,7 +157,12 @@ def read_many(self, start_sequence, min_count, max_count): """ check_not_negative(start_sequence, "sequence can't be smaller than 0") check_true(max_count >= min_count, "max count should be greater or equal to min count") - #check_true(min_count <= self.capacity().result(), "min count should be smaller or equal to capacity") + + def _capacity(res): + check_true(max_count <= res.result(), "max count should be smaller or equal to capacity") + + future = self.capacity() + future.add_done_callback(_capacity) check_true(max_count < MAX_BATCH_SIZE, "max count can't be greater than %d" % MAX_BATCH_SIZE) return self._encode_invoke(ringbuffer_read_many_codec, response_handler=self._read_many_response_handler,