Skip to content
1 change: 1 addition & 0 deletions docs/api/config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ Config
.. autoclass:: UniqueKeyTransformation
.. autoclass:: IndexType
.. autoclass:: ReconnectMode
.. autoclass:: TopicOverloadPolicy
1 change: 1 addition & 0 deletions docs/api/proxy/modules.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Hazelcast Proxies
multi_map
queue
pn_counter
reliable_topic
replicated_map
ringbuffer
set
Expand Down
4 changes: 4 additions & 0 deletions docs/api/proxy/reliable_topic.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
ReliableTopic
=============

.. automodule:: hazelcast.proxy.reliable_topic
18 changes: 18 additions & 0 deletions hazelcast/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,20 @@ class SomeClassSerializer(StreamSerializer):
it will be much out of order. If you don't care about ordering, set this
value to ``0`` for unlimited ID validity.

reliable_topics (dict[str, dict[str, any]]): Dictionary of reliable
topic names and the corresponding reliable topic configurations as
a dictionary. The reliable topic configurations contains the
following options. When an option is missing from the
configuration, it will be set to its default value.

- **overload_policy** (int|str): Policy to handle an overloaded
topic. By default, set to ``BLOCK``. See the
:class:`hazelcast.config.TopicOverloadPolicy` for possible values.
- **read_batch_size** (int): Number of messages the reliable topic
will try to read in batch. It will get at least one, but if
there are more available, then it will try to get more to
increase throughput. By default, set to ``10``.

labels (`list[str]`): Labels for the client to be sent to the cluster.
heartbeat_interval (float): Time interval between the heartbeats sent by the
client to the member nodes in seconds. By default, set to ``5.0``.
Expand Down Expand Up @@ -379,6 +393,7 @@ def __init__(self, **kwargs):

def _init_context(self):
self._context.init_context(
self,
self._config,
self._invocation_service,
self._internal_partition_service,
Expand Down Expand Up @@ -696,6 +711,7 @@ class _ClientContext(object):
"""

def __init__(self):
self.client = None
self.config = None
self.invocation_service = None
self.partition_service = None
Expand All @@ -712,6 +728,7 @@ def __init__(self):

def init_context(
self,
client,
config,
invocation_service,
partition_service,
Expand All @@ -726,6 +743,7 @@ def init_context(
proxy_session_manager,
reactor,
):
self.client = client
self.config = config
self.invocation_service = invocation_service
self.partition_service = partition_service
Expand Down
110 changes: 110 additions & 0 deletions hazelcast/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,52 @@ class ReconnectMode(object):
"""


class TopicOverloadPolicy(object):
"""A policy to deal with an overloaded topic; so topic where there is no
place to store new messages.

The reliable topic uses a :class:`hazelcast.proxy.ringbuffer.Ringbuffer` to
store the messages. A ringbuffer doesn't track where readers are, so
it has no concept of a slow consumers. This provides many advantages like
high performance reads, but it also gives the ability to the reader to
re-read the same message multiple times in case of an error.

A ringbuffer has a limited, fixed capacity. A fast producer may overwrite
old messages that are still being read by a slow consumer. To prevent
this, we may configure a time-to-live on the ringbuffer.

Once the time-to-live is configured, the :class:`TopicOverloadPolicy`
controls how the publisher is going to deal with the situation that a
ringbuffer is full and the oldest item in the ringbuffer is not old
enough to get overwritten.

Keep in mind that this retention period (time-to-live) can keep messages
from being overwritten, even though all readers might have already completed
reading.
"""

DISCARD_OLDEST = 0
"""Using this policy, a message that has not expired can be overwritten.

No matter the retention period set, the overwrite will just overwrite
the item.

This can be a problem for slow consumers because they were promised a
certain time window to process messages. But it will benefit producers
and fast consumers since they are able to continue. This policy sacrifices
the slow producer in favor of fast producers/consumers.
"""

DISCARD_NEWEST = 1
"""The message that was to be published is discarded."""

BLOCK = 2
"""The caller will wait till there space in the ringbuffer."""

ERROR = 3
"""The publish call immediately fails."""


class BitmapIndexOptions(object):
__slots__ = ("_unique_key", "_unique_key_transformation")

Expand Down Expand Up @@ -513,6 +559,7 @@ class _Config(object):
"_membership_listeners",
"_lifecycle_listeners",
"_flake_id_generators",
"_reliable_topics",
"_labels",
"_heartbeat_interval",
"_heartbeat_timeout",
Expand Down Expand Up @@ -563,6 +610,7 @@ def __init__(self):
self._membership_listeners = []
self._lifecycle_listeners = []
self._flake_id_generators = {}
self._reliable_topics = {}
self._labels = []
self._heartbeat_interval = _DEFAULT_HEARTBEAT_INTERVAL
self._heartbeat_timeout = _DEFAULT_HEARTBEAT_TIMEOUT
Expand Down Expand Up @@ -1083,6 +1131,27 @@ def flake_id_generators(self, value):
else:
raise TypeError("flake_id_generators must be a dict")

@property
def reliable_topics(self):
return self._reliable_topics

@reliable_topics.setter
def reliable_topics(self, value):
if isinstance(value, dict):
configs = {}
for name, config in six.iteritems(value):
if not isinstance(name, six.string_types):
raise TypeError("Keys of reliable_topics must be strings")

if not isinstance(config, dict):
raise TypeError("Values of reliable_topics must be dict")

configs[name] = _ReliableTopicConfig.from_dict(config)

self._reliable_topics = configs
else:
raise TypeError("reliable_topics must be a dict")

@property
def labels(self):
return self._labels
Expand Down Expand Up @@ -1404,3 +1473,44 @@ def from_dict(cls, d):
"Unrecognized config option for the flake id generator: %s" % k
)
return config


class _ReliableTopicConfig(object):
__slots__ = ("_read_batch_size", "_overload_policy")

def __init__(self):
self._read_batch_size = 10
self._overload_policy = TopicOverloadPolicy.BLOCK

@property
def read_batch_size(self):
return self._read_batch_size

@read_batch_size.setter
def read_batch_size(self, value):
if isinstance(value, number_types):
if value <= 0:
raise ValueError("read_batch_size must be positive")
self._read_batch_size = value
else:
raise TypeError("read_batch_size must be a number")

@property
def overload_policy(self):
return self._overload_policy

@overload_policy.setter
def overload_policy(self, value):
self._overload_policy = try_to_get_enum_value(value, TopicOverloadPolicy)

@classmethod
def from_dict(cls, d):
config = cls()
for k, v in six.iteritems(d):
try:
config.__setattr__(k, v)
except AttributeError:
raise InvalidConfigurationError(
"Unrecognized config option for the reliable topic: %s" % k
)
return config
39 changes: 29 additions & 10 deletions hazelcast/proxy/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,26 +266,45 @@ def __repr__(self):
)


_SENTINEL = object()


class TopicMessage(object):
"""Topic message.
"""Topic message."""

Attributes:
name (str): Name of the proxy that fired the event.
publish_time (int): UNIX time that the event is published as seconds.
member (hazelcast.core.MemberInfo): Member that fired the event.
"""
__slots__ = ("_name", "_message_data", "_message", "_publish_time", "_member", "_to_object")

def __init__(self, name, message_data, publish_time, member, to_object):
self.name = name
self._name = name
self._message_data = message_data
self.publish_time = publish_time
self.member = member
self._message = _SENTINEL
self._publish_time = publish_time
self._member = member
self._to_object = to_object

@property
def name(self):
"""str: Name of the proxy that fired the event."""
return self._name

@property
def publish_time(self):
"""int: UNIX time that the event is published as seconds."""
return self._publish_time

@property
def member(self):
"""hazelcast.core.MemberInfo: Member that fired the event."""
return self._member

@property
def message(self):
"""The message sent to Topic."""
return self._to_object(self._message_data)
if self._message is not _SENTINEL:
return self._message

self._message = self._to_object(self._message_data)
return self._message


def get_entry_listener_flags(**kwargs):
Expand Down
Loading