Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
29 changes: 29 additions & 0 deletions examples/reliable-topic/reliable_topic_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import time

import hazelcast
from hazelcast import ClientConfig
from hazelcast.proxy.reliable_topic import ReliableMessageListener


class MyReliableMessageListener(ReliableMessageListener):
def on_message(self, event):
print("Got message: {}".format(event.message))
print("Publish time: {}\n".format(event.publish_time))


if __name__ == "__main__":
config = ClientConfig()
config.set_property("hazelcast.serialization.input.returns.bytearray", True)
client = hazelcast.HazelcastClient(config)
listener = MyReliableMessageListener()

reliable_topic = client.get_reliable_topic("reliable-topic")
registration_id = reliable_topic.add_listener(listener)

for i in range(10):
reliable_topic.publish("Message " + str(i))
time.sleep(0.1)

reliable_topic.destroy()
reliable_topic.remove_listener(registration_id)
client.shutdown()
50 changes: 50 additions & 0 deletions hazelcast/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@

MAXIMUM_PREFETCH_COUNT = 100000

TOPIC_OVERLOAD_POLICY = enum(
DISCARD_OLDEST=0,
DISCARD_NEWEST=1,
BLOCK=2,
ERROR=3
)


class ClientConfig(object):
"""
Expand Down Expand Up @@ -107,6 +114,9 @@ def __init__(self):
self.flake_id_generator_configs = {}
"""Flake ID generator configuration which maps "config-name" : FlakeIdGeneratorConfig """

self.reliable_topic_configs = {}
"""ReliableTopic configuration"""

self.serialization_config = SerializationConfig()
"""Hazelcast serialization configuration"""

Expand Down Expand Up @@ -162,6 +172,10 @@ def add_flake_id_generator_config(self, flake_id_generator_config):
self.flake_id_generator_configs[flake_id_generator_config.name] = flake_id_generator_config
return self

def add_reliable_topic_config(self, reliable_topic_config):
self.reliable_topic_configs[reliable_topic_config.name] = reliable_topic_config
return self

def get_property_or_default(self, key, default):
"""
Client property accessor with fallback to default value.
Expand Down Expand Up @@ -538,6 +552,42 @@ def __init__(self):
"""


class ReliableTopicConfig(object):
"""
ReliableTopicConfig contains the configuration for the client regarding
:class:`~hazelcast.proxy.reliable_topic.ReliableTopic`
"""

def __init__(self, name="default"):
self._name = name
self._read_batch_size = 10
self._topic_overload_policy = TOPIC_OVERLOAD_POLICY.BLOCK

@property
def name(self):
return self._name

@name.setter
def name(self, name):
self._name = name

@property
def read_batch_size(self):
return self._read_batch_size

@read_batch_size.setter
def read_batch_size(self, read_batch_size):
self._read_batch_size = read_batch_size

@property
def topic_overload_policy(self):
return self._topic_overload_policy

@topic_overload_policy.setter
def topic_overload_policy(self, topic_overload_policy):
self._topic_overload_policy = topic_overload_policy


class FlakeIdGeneratorConfig(object):
"""
FlakeIdGeneratorConfig contains the configuration for the client regarding
Expand Down
4 changes: 4 additions & 0 deletions hazelcast/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,10 @@ class TopicOverloadError(HazelcastError):
pass


class TopicOverflowError(HazelcastError):
pass


class TopologyChangedError(HazelcastError):
pass

Expand Down
2 changes: 1 addition & 1 deletion hazelcast/proxy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def encode_remove_listener(registration_id):
return client_remove_distributed_object_listener_codec.encode_request(registration_id)

return self._client.listener.register_listener(request, decode_add_listener,
encode_remove_listener, event_handler)
encode_remove_listener, event_handler)

def remove_distributed_object_listener(self, registration_id):
return self._client.listener.deregister_listener(registration_id)
Expand Down
Loading