Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/features.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ features:
- MultiMap
- Replicated Map
- Ringbuffer
- ReliableTopic
- Topic
- CRDT PN Counter
- Flake Id Generator
Expand Down
49 changes: 49 additions & 0 deletions docs/using_python_client_with_hazelcast_imdg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,55 @@ A Ringbuffer usage example is shown below.
sequence += 1
print(ringbuffer.read_one(sequence)) # Outputs '200'

Using ReliableTopic
~~~~~~~~~~~~~~~~~~~

Hazelcast ReliableTopic is a distributed topic implementation backed up by the Ringbuffer
data structure. For details, see the
`Reliable Topic section <https://docs.hazelcast.com/imdg/latest/data-structures/reliable-topic.html>`__
in the Hazelcast IMDG Reference Manual.

A Reliable Topic usage example is shown below.

.. code:: python

# Get a Topic called "my-distributed-topic"
topic = client.get_reliable_topic("my-distributed-topic").blocking()

# Add a Listener to the Topic
topic.add_listener(lambda message: print(message))

# Publish a message to the Topic
topic.publish("Hello to distributed world")

Configuring Reliable Topic
^^^^^^^^^^^^^^^^^^^^^^^^^^

You may configure Reliable Topics using the ``reliable_topics``
argument:

.. code:: python

client = hazelcast.HazelcastClient(
reliable_topics={
"my-topic": {
"overload_policy": TopicOverloadPolicy.DISCARD_OLDEST,
"read_batch_size": 20,
}
}
)

The following are the descriptions of configuration elements and
attributes:

- keys of the dictionary: Name of the Reliable Topic.
- ``overload_policy``: Policy to handle an overloaded topic. By default,
set to ``BLOCK``.
- ``read_batch_size``: Number of messages the reliable topic will try to
read in batch. It will get at least one, but if there are more
available, then it will try to get more to increase throughput.
By default, set to ``10``.

Using Topic
~~~~~~~~~~~

Expand Down
16 changes: 16 additions & 0 deletions examples/org-website/reliable_topic_sample.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import hazelcast

# Start the Hazelcast Client and connect to an already running Hazelcast Cluster on 127.0.0.1
client = hazelcast.HazelcastClient()

# Get a Topic called "my-distributed-topic"
topic = client.get_reliable_topic("my-distributed-topic").blocking()

# Add a Listener to the Topic
topic.add_listener(lambda message: print(message))

# Publish a message to the Topic
topic.publish("Hello to distributed world")

# Shutdown this Hazelcast Client
client.shutdown()
58 changes: 58 additions & 0 deletions examples/reliable-topic/reliable_topic_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import hazelcast
from hazelcast.config import TopicOverloadPolicy
from hazelcast.proxy.reliable_topic import ReliableMessageListener

# Customize the reliable topic
client = hazelcast.HazelcastClient(
reliable_topics={
"my-topic": {
"overload_policy": TopicOverloadPolicy.DISCARD_OLDEST,
"read_batch_size": 20,
}
}
)

topic = client.get_reliable_topic("my-topic").blocking()

# Add a listener with a callable
reg_id = topic.add_listener(lambda m: print("First listener:", m))


# Or, customize the behaviour of the listener
# via ReliableMessageListener
class MyListener(ReliableMessageListener):
def on_message(self, message):
print("Second listener:", message)

def retrieve_initial_sequence(self):
return 0

def store_sequence(self, sequence):
pass

def is_loss_tolerant(self):
return True

def is_terminal(self, error):
return False


# Add a custom ReliableMessageListener
topic.add_listener(MyListener())


for i in range(100):
# Publish messages one-by-one
topic.publish(i)


messages = range(100, 200)

# Publish message in batch
topic.publish_all(messages)

# Remove listener so that it won't receive
# messages anymore
topic.remove_listener(reg_id)

client.shutdown()
2 changes: 1 addition & 1 deletion hazelcast/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ class SomeClassSerializer(StreamSerializer):

reliable_topics (dict[str, dict[str, any]]): Dictionary of reliable
topic names and the corresponding reliable topic configurations as
a dictionary. The reliable topic configurations contains the
a dictionary. The reliable topic configurations contain the
following options. When an option is missing from the
configuration, it will be set to its default value.

Expand Down
2 changes: 1 addition & 1 deletion hazelcast/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ class ReconnectMode(object):


class TopicOverloadPolicy(object):
"""A policy to deal with an overloaded topic; so topic where there is no
"""A policy to deal with an overloaded topic; a topic where there is no
place to store new messages.

The reliable topic uses a :class:`hazelcast.proxy.ringbuffer.Ringbuffer` to
Expand Down
8 changes: 8 additions & 0 deletions hazelcast/proxy/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,14 @@ def message(self):
self._message = self._to_object(self._message_data)
return self._message

def __repr__(self):
return "TopicMessage(message=%s, publish_time=%s, topic_name=%s, publishing_member=%s)" % (
self.message,
self.publish_time,
self.name,
self.member,
)


def get_entry_listener_flags(**kwargs):
flags = 0
Expand Down
5 changes: 2 additions & 3 deletions hazelcast/proxy/reliable_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,9 @@ def on_message(self, message):
One should not block in this callback. If blocking is necessary,
consider delegating that task to an executor or a thread pool.


Args:
message (hazelcast.proxy.base.TopicMessage): The message that
is received for the topic
is received for the topic
"""
raise NotImplementedError("on_message")

Expand All @@ -108,7 +107,7 @@ def retrieve_initial_sequence(self):
raise NotImplementedError("retrieve_initial_sequence")

def store_sequence(self, sequence):
""" "
"""
Informs the ReliableMessageListener that it should store the sequence.
This method is called before the message is processed. Can be used to
make a durable subscription.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,11 @@ def is_loss_tolerant(self):
def is_terminal(self, error):
return False

topic.publish_all(range(10))

registration_id = topic.add_listener(Listener())
self.assertIsNotNone(registration_id)

topic.publish_all(range(10))

self.assertTrueEventually(lambda: self.assertEqual(list(range(5, 10)), messages))

def test_add_listener_with_store_sequence(self):
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/backward_compatible/ssl_tests/ssl_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ def test_ssl_enabled_map_size(self):
cluster.id, True, get_abs_path(self.current_directory, "server1-cert.pem")
)
)
test_map = client.get_map("test_map")
test_map = client.get_map("test_map").blocking()
fill_map(test_map, 10)
self.assertEqual(test_map.size().result(), 10)
self.assertEqual(test_map.size(), 10)
client.shutdown()

def test_ssl_enabled_with_custom_ciphers(self):
Expand Down