From 6d808bcbbeff69edea8af0798034a40e4727419c Mon Sep 17 00:00:00 2001 From: mdumandag Date: Mon, 26 Apr 2021 16:29:54 +0300 Subject: [PATCH 1/5] Add documentation and code samples for the ReliableTopic Apart from that, also corrected a few docstring issues. --- ...sing_python_client_with_hazelcast_imdg.rst | 49 ++++++++++++++++ examples/org-website/reliable_topic_sample.py | 16 +++++ .../reliable-topic/reliable_topic_example.py | 58 +++++++++++++++++++ hazelcast/proxy/base.py | 8 +++ hazelcast/proxy/reliable_topic.py | 2 +- 5 files changed, 132 insertions(+), 1 deletion(-) create mode 100644 examples/org-website/reliable_topic_sample.py create mode 100644 examples/reliable-topic/reliable_topic_example.py diff --git a/docs/using_python_client_with_hazelcast_imdg.rst b/docs/using_python_client_with_hazelcast_imdg.rst index 7b8ab05bd8..1167112c99 100644 --- a/docs/using_python_client_with_hazelcast_imdg.rst +++ b/docs/using_python_client_with_hazelcast_imdg.rst @@ -356,6 +356,55 @@ A Ringbuffer usage example is shown below. sequence += 1 print(ringbuffer.read_one(sequence)) # Outputs '200' +Using ReliableTopic +~~~~~~~~~~~~~~~~~~~ + +Hazelcast ReliableTopic is a distributed topic implementation backed up by the Ringbuffer +data structure. For details, see the +`Reliable Topic section `__ +in the Hazelcast IMDG Reference Manual. + +A Reliable Topic usage example is shown below. + +.. code:: python + + # Get a Topic called "my-distributed-topic" + topic = client.get_reliable_topic("my-distributed-topic").blocking() + + # Add a Listener to the Topic + topic.add_listener(lambda message: print(message)) + + # Publish a message to the Topic + topic.publish("Hello to distributed world") + +Configuring Reliable Topic +^^^^^^^^^^^^^^^^^^^^^^^^^^ + +You may configure Reliable Topics using the ``reliable_topics`` +argument: + +.. code:: python + + client = hazelcast.HazelcastClient( + reliable_topics={ + "my-topic": { + "overload_policy": TopicOverloadPolicy.DISCARD_OLDEST, + "read_batch_size": 20, + } + } + ) + +The following are the descriptions of configuration elements and +attributes: + +- keys of the dictionary: Name of the Reliable Topic. +- ``overload_policy``: Policy to handle an overloaded topic. By default, + set to ``BLOCK``. +- ``read_batch_size``: Number of messages the reliable topic will try to + read in batch. It will get at least one, but if there are more + available, then it will try to get more to increase throughput. + By default, set to ``10``. + Using Topic ~~~~~~~~~~~ diff --git a/examples/org-website/reliable_topic_sample.py b/examples/org-website/reliable_topic_sample.py new file mode 100644 index 0000000000..0e10d8a5a0 --- /dev/null +++ b/examples/org-website/reliable_topic_sample.py @@ -0,0 +1,16 @@ +import hazelcast + +# Start the Hazelcast Client and connect to an already running Hazelcast Cluster on 127.0.0.1 +client = hazelcast.HazelcastClient() + +# Get a Topic called "my-distributed-topic" +topic = client.get_reliable_topic("my-distributed-topic").blocking() + +# Add a Listener to the Topic +topic.add_listener(lambda message: print(message)) + +# Publish a message to the Topic +topic.publish("Hello to distributed world") + +# Shutdown this Hazelcast Client +client.shutdown() diff --git a/examples/reliable-topic/reliable_topic_example.py b/examples/reliable-topic/reliable_topic_example.py new file mode 100644 index 0000000000..f1d0207e15 --- /dev/null +++ b/examples/reliable-topic/reliable_topic_example.py @@ -0,0 +1,58 @@ +import hazelcast +from hazelcast.config import TopicOverloadPolicy +from hazelcast.proxy.reliable_topic import ReliableMessageListener + +# Customize the reliable topic +client = hazelcast.HazelcastClient( + reliable_topics={ + "my-topic": { + "overload_policy": TopicOverloadPolicy.DISCARD_OLDEST, + "read_batch_size": 20, + } + } +) + +topic = client.get_reliable_topic("my-topic").blocking() + +# Add a listener with a callable +reg_id = topic.add_listener(lambda m: print("First listener:", m)) + + +# Or, customize the behaviour of the listener +# via ReliableMessageListener +class MyListener(ReliableMessageListener): + def on_message(self, message): + print("Second listener:", message) + + def retrieve_initial_sequence(self): + return 0 + + def store_sequence(self, sequence): + pass + + def is_loss_tolerant(self): + return True + + def is_terminal(self, error): + return False + + +# Add a custom ReliableMessageListener +topic.add_listener(MyListener()) + + +for i in range(100): + # Publish messages one-by-one + topic.publish(i) + + +messages = range(100, 200) + +# Publish message in batch +topic.publish_all(messages) + +# Remove listener so that it won't receive +# messages anymore +topic.remove_listener(reg_id) + +client.shutdown() diff --git a/hazelcast/proxy/base.py b/hazelcast/proxy/base.py index dcc4768dac..cd430f98dc 100644 --- a/hazelcast/proxy/base.py +++ b/hazelcast/proxy/base.py @@ -306,6 +306,14 @@ def message(self): self._message = self._to_object(self._message_data) return self._message + def __repr__(self): + return "TopicMessage(message=%s, publish_time=%s, topic_name=%s, publishing_member=%s)" % ( + self.message, + self.publish_time, + self.name, + self.member, + ) + def get_entry_listener_flags(**kwargs): flags = 0 diff --git a/hazelcast/proxy/reliable_topic.py b/hazelcast/proxy/reliable_topic.py index e9a26072ee..a925a69c6b 100644 --- a/hazelcast/proxy/reliable_topic.py +++ b/hazelcast/proxy/reliable_topic.py @@ -108,7 +108,7 @@ def retrieve_initial_sequence(self): raise NotImplementedError("retrieve_initial_sequence") def store_sequence(self, sequence): - """ " + """ Informs the ReliableMessageListener that it should store the sequence. This method is called before the message is processed. Can be used to make a durable subscription. From ab88ecf2021746efce8c7b835bde110b7adf1fca Mon Sep 17 00:00:00 2001 From: mdumandag Date: Tue, 27 Apr 2021 13:09:17 +0300 Subject: [PATCH 2/5] small doc corrections --- hazelcast/client.py | 2 +- hazelcast/config.py | 2 +- hazelcast/proxy/reliable_topic.py | 3 +-- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/hazelcast/client.py b/hazelcast/client.py index 3fa3399589..32c47eca8e 100644 --- a/hazelcast/client.py +++ b/hazelcast/client.py @@ -270,7 +270,7 @@ class SomeClassSerializer(StreamSerializer): reliable_topics (dict[str, dict[str, any]]): Dictionary of reliable topic names and the corresponding reliable topic configurations as - a dictionary. The reliable topic configurations contains the + a dictionary. The reliable topic configurations contain the following options. When an option is missing from the configuration, it will be set to its default value. diff --git a/hazelcast/config.py b/hazelcast/config.py index 5d28b44777..e97b7c7198 100644 --- a/hazelcast/config.py +++ b/hazelcast/config.py @@ -205,7 +205,7 @@ class ReconnectMode(object): class TopicOverloadPolicy(object): - """A policy to deal with an overloaded topic; so topic where there is no + """A policy to deal with an overloaded topic; a topic where there is no place to store new messages. The reliable topic uses a :class:`hazelcast.proxy.ringbuffer.Ringbuffer` to diff --git a/hazelcast/proxy/reliable_topic.py b/hazelcast/proxy/reliable_topic.py index a925a69c6b..53d4beb4e7 100644 --- a/hazelcast/proxy/reliable_topic.py +++ b/hazelcast/proxy/reliable_topic.py @@ -83,10 +83,9 @@ def on_message(self, message): One should not block in this callback. If blocking is necessary, consider delegating that task to an executor or a thread pool. - Args: message (hazelcast.proxy.base.TopicMessage): The message that - is received for the topic + is received for the topic """ raise NotImplementedError("on_message") From ecf062493cfb1e324efce93fcdebb05fdb5c376b Mon Sep 17 00:00:00 2001 From: mdumandag Date: Tue, 27 Apr 2021 13:14:34 +0300 Subject: [PATCH 3/5] add reliable topic to feature list --- docs/features.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/features.rst b/docs/features.rst index 8ca7847cc6..7c4a637094 100644 --- a/docs/features.rst +++ b/docs/features.rst @@ -11,6 +11,7 @@ features: - MultiMap - Replicated Map - Ringbuffer +- ReliableTopic - Topic - CRDT PN Counter - Flake Id Generator From b2edb9a3d4015f1850100bfe86abbea1019baba1 Mon Sep 17 00:00:00 2001 From: mdumandag Date: Tue, 27 Apr 2021 13:55:31 +0300 Subject: [PATCH 4/5] add listener after publish --- .../backward_compatible/proxy/reliable_topic_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/backward_compatible/proxy/reliable_topic_test.py b/tests/integration/backward_compatible/proxy/reliable_topic_test.py index 110af3502f..65b415e865 100644 --- a/tests/integration/backward_compatible/proxy/reliable_topic_test.py +++ b/tests/integration/backward_compatible/proxy/reliable_topic_test.py @@ -117,11 +117,11 @@ def is_loss_tolerant(self): def is_terminal(self, error): return False + topic.publish_all(range(10)) + registration_id = topic.add_listener(Listener()) self.assertIsNotNone(registration_id) - topic.publish_all(range(10)) - self.assertTrueEventually(lambda: self.assertEqual(list(range(5, 10)), messages)) def test_add_listener_with_store_sequence(self): From b67dc473750616b32451e753c353c647713c3c05 Mon Sep 17 00:00:00 2001 From: mdumandag Date: Tue, 27 Apr 2021 14:17:21 +0300 Subject: [PATCH 5/5] fix the ssl test in this test, we were using non-blocking map and trying to put elements to it, in non-blocking manner. therefore, the size() call might not see all the elements we put into the map. fixed it to use blocking map instead. --- tests/integration/backward_compatible/ssl_tests/ssl_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/backward_compatible/ssl_tests/ssl_test.py b/tests/integration/backward_compatible/ssl_tests/ssl_test.py index 56f7d58586..4dbc4b2b09 100644 --- a/tests/integration/backward_compatible/ssl_tests/ssl_test.py +++ b/tests/integration/backward_compatible/ssl_tests/ssl_test.py @@ -65,9 +65,9 @@ def test_ssl_enabled_map_size(self): cluster.id, True, get_abs_path(self.current_directory, "server1-cert.pem") ) ) - test_map = client.get_map("test_map") + test_map = client.get_map("test_map").blocking() fill_map(test_map, 10) - self.assertEqual(test_map.size().result(), 10) + self.assertEqual(test_map.size(), 10) client.shutdown() def test_ssl_enabled_with_custom_ciphers(self):