Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 39 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@
* [7.4.11.1. Configuring Flake ID Generator](#74111-configuring-flake-id-generator)
* [7.4.12. CP Subsystem](#7412-cp-subsystem)
* [7.4.12.1. Using AtomicLong](#74121-using-atomiclong)
* [7.4.12.2. Using AtomicReference](#74122-using-atomicreference)
* [7.4.12.2. Using CountDownLatch](#74122-using-countdownlatch)
* [7.4.12.3. Using AtomicReference](#74123-using-atomicreference)
* [7.5. Distributed Events](#75-distributed-events)
* [7.5.1. Cluster Events](#751-cluster-events)
* [7.5.1.1. Listening for Member Events](#7511-listening-for-member-events)
Expand Down Expand Up @@ -1609,7 +1610,43 @@ print ('CAS operation result:', result)
AtomicLong implementation does not offer exactly-once / effectively-once execution semantics. It goes with at-least-once execution semantics by default and can cause an API call to be committed multiple times in case of CP member failures.
It can be tuned to offer at-most-once execution semantics. Please see [`fail-on-indeterminate-operation-state`](https://docs.hazelcast.org/docs/latest/manual/html-single/index.html#cp-subsystem-configuration) server-side setting.

#### 7.4.12.2. Using AtomicReference
#### 7.4.12.2. Using CountDownLatch

Hazelcast `CountDownLatch` is the distributed implementation of a linearizable and distributed countdown latch.
This data structure is a cluster-wide synchronization aid that allows one or more callers to wait until a set of operations being performed in other callers completes.
This data structure is a part of CP Subsystem.

A basic CountDownLatch usage example is shown below.

```python
# Get a CountDownLatch called "my-latch"
latch = client.cp_subsystem.get_count_down_latch("my-latch").blocking()
# Try to initialize the latch
# (does nothing if the count is not zero)
initialized = latch.try_set_count(1)
print("Initialized:", initialized)
# Check count
count = latch.get_count()
print("Count:", count)
# Prints:
# Count: 1

# Bring the count down to zero after 10ms
def run():
time.sleep(0.01)
latch.count_down()

t = Thread(target=run)
t.start()

# Wait up to 1 second for the count to become zero up
count_is_zero = latch.await(1)
print("Count is zero:", count_is_zero)
```

> **NOTE: CountDownLatch count can be reset with `try_set_count()` after a countdown has finished, but not during an active count.**

#### 7.4.12.3. Using AtomicReference

Hazelcast `AtomicReference` is the distributed implementation of a linearizable object reference.
It provides a set of atomic operations allowing to modify the value behind the reference.
Expand Down
4 changes: 4 additions & 0 deletions docs/hazelcast.proxy.cp.atomic_reference.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
AtomicReference
===============

.. automodule:: hazelcast.proxy.cp.atomic_reference
4 changes: 4 additions & 0 deletions docs/hazelcast.proxy.cp.count_down_latch.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
CountDownLatch
==============

.. automodule:: hazelcast.proxy.cp.count_down_latch
4 changes: 3 additions & 1 deletion docs/hazelcast.proxy.cp.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@ CP Proxies
====================

.. toctree::
hazelcast.proxy.cp.atomic_long
hazelcast.proxy.cp.atomic_long
hazelcast.proxy.cp.atomic_reference
hazelcast.proxy.cp.count_down_latch
18 changes: 18 additions & 0 deletions examples/cp/count_down_latch_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import hazelcast

client = hazelcast.HazelcastClient()

latch = client.cp_subsystem.get_count_down_latch("my-latch")
initialized = latch.try_set_count(3).result()
print("Initialized:", initialized)
count = latch.get_count().result()
print("Count:", count)

latch.await_latch(10).add_done_callback(lambda f: print("Result of await:", f.result()))

for _ in range(3):
latch.count_down().result()
count = latch.get_count().result()
print("Current count:", count)

client.shutdown()
24 changes: 24 additions & 0 deletions hazelcast/cp.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from hazelcast.protocol.codec import cp_group_create_cp_group_codec
from hazelcast.proxy.cp.atomic_long import AtomicLong
from hazelcast.proxy.cp.atomic_reference import AtomicReference
from hazelcast.proxy.cp.count_down_latch import CountDownLatch
from hazelcast.util import check_true


Expand Down Expand Up @@ -74,6 +75,26 @@ def get_atomic_reference(self, name):
"""
return self._proxy_manager.get_or_create(ATOMIC_REFERENCE_SERVICE, name)

def get_count_down_latch(self, name):
"""Returns the distributed CountDownLatch instance with given name.

The instance is created on CP Subsystem.

If no group name is given within the ``name`` argument, then the
CountDownLatch instance will be created on the DEFAULT CP group.
If a group name is given, like ``.get_count_down_latch("myLatch@group1")``,
the given group will be initialized first, if not initialized
already, and then the instance will be created on this group.

Args:
name (str): Name of the CountDownLatch.

Returns:
hazelcast.proxy.cp.count_down_latch.CountDownLatch: The CountDownLatch
proxy for the given name.
"""
return self._proxy_manager.get_or_create(COUNT_DOWN_LATCH_SERVICE, name)


_DEFAULT_GROUP_NAME = "default"

Expand Down Expand Up @@ -105,6 +126,7 @@ def _get_object_name_for_proxy(name):

ATOMIC_LONG_SERVICE = "hz:raft:atomicLongService"
ATOMIC_REFERENCE_SERVICE = "hz:raft:atomicRefService"
COUNT_DOWN_LATCH_SERVICE = "hz:raft:countDownLatchService"


class CPProxyManager(object):
Expand All @@ -120,6 +142,8 @@ def get_or_create(self, service_name, proxy_name):
return AtomicLong(self._context, group_id, service_name, proxy_name, object_name)
elif service_name == ATOMIC_REFERENCE_SERVICE:
return AtomicReference(self._context, group_id, service_name, proxy_name, object_name)
elif service_name == COUNT_DOWN_LATCH_SERVICE:
return CountDownLatch(self._context, group_id, service_name, proxy_name, object_name)

def _get_group_id(self, proxy_name):
codec = cp_group_create_cp_group_codec
Expand Down
6 changes: 3 additions & 3 deletions hazelcast/invocation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import functools

from hazelcast.errors import create_error_from_message, HazelcastInstanceNotActiveError, is_retryable_error, \
HazelcastTimeoutError, TargetDisconnectedError, HazelcastClientNotActiveError, TargetNotMemberError, \
EXCEPTION_MESSAGE_TYPE, IndeterminateOperationStateError
TargetDisconnectedError, HazelcastClientNotActiveError, TargetNotMemberError, \
EXCEPTION_MESSAGE_TYPE, IndeterminateOperationStateError, OperationTimeoutError
from hazelcast.future import Future
from hazelcast.protocol.codec import client_local_backup_listener_codec
from hazelcast.util import AtomicInteger
Expand Down Expand Up @@ -231,7 +231,7 @@ def _notify_error(self, invocation, error):
if invocation.timeout < time.time():
self.logger.debug("Error will not be retried because invocation timed out: %s", error,
extra=self._logger_extras)
error = HazelcastTimeoutError("Request timed out because an error occurred "
error = OperationTimeoutError("Request timed out because an error occurred "
"after invocation timeout: %s" % error)
self._complete_with_error(invocation, error)
return
Expand Down
140 changes: 140 additions & 0 deletions hazelcast/proxy/cp/count_down_latch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
import uuid

from hazelcast.errors import OperationTimeoutError
from hazelcast.protocol.codec import count_down_latch_await_codec, count_down_latch_get_round_codec, \
count_down_latch_count_down_codec, count_down_latch_get_count_codec, count_down_latch_try_set_count_codec
from hazelcast.proxy.cp import BaseCPProxy
from hazelcast.util import to_millis, check_true, check_is_number, check_is_int


class CountDownLatch(BaseCPProxy):
"""A distributed, concurrent countdown latch data structure.

CountDownLatch is a cluster-wide synchronization aid
that allows one or more callers to wait until a set of operations being
performed in other callers completes.

CountDownLatch count can be reset using ``try_set_count()`` method after
a countdown has finished but not during an active count. This allows
the same latch instance to be reused.

There is no ``await_latch()`` method to do an unbound wait since this is undesirable
in a distributed application: for example, a cluster can split or the master
and replicas could all die. In most cases, it is best to configure
an explicit timeout so you have the ability to deal with these situations.

All of the API methods in the CountDownLatch offer the exactly-once
execution semantics. For instance, even if a ``count_down()`` call is
internally retried because of crashed Hazelcast member, the counter
value is decremented only once.
"""

def await_latch(self, timeout):
"""Causes the current thread to wait until the latch has counted down to
zero, or an exception is thrown, or the specified waiting time elapses.

If the current count is zero then this method returns ``True``.

If the current count is greater than zero, then the current
thread becomes disabled for thread scheduling purposes and lies
dormant until one of the following things happen:

- The count reaches zero due to invocations of the ``count_down()`` method
- This CountDownLatch instance is destroyed
- The countdown owner becomes disconnected
- The specified waiting time elapses

If the count reaches zero, then the method returns with the
value ``True``.

If the specified waiting time elapses then the value ``False``
is returned. If the time is less than or equal to zero, the method
will not wait at all.

Args:
timeout (int): The maximum time to wait in seconds

Returns:
hazelcast.future.Future[bool]: ``True`` if the count reached zero,
``False`` if the waiting time elapsed before the count reached zero
Raises:
IllegalStateError: If the Hazelcast instance was shut down while waiting.
"""
check_is_number(timeout)
timeout = max(0, timeout)
invocation_uuid = uuid.uuid4()
codec = count_down_latch_await_codec
request = codec.encode_request(self._group_id, self._object_name, invocation_uuid, to_millis(timeout))
return self._invoke(request, codec.decode_response)

def count_down(self):
"""Decrements the count of the latch, releasing all waiting threads if
the count reaches zero.

If the current count is greater than zero, then it is decremented.
If the new count is zero:

- All waiting threads are re-enabled for thread scheduling purposes
- Countdown owner is set to ``None``.

If the current count equals zero, then nothing happens.

Returns:
hazelcast.future.Future[None]:
"""
invocation_uuid = uuid.uuid4()

def handler(f):
return self._do_count_down(f.result(), invocation_uuid)

return self._get_round().continue_with(handler)

def get_count(self):
"""Returns the current count.

Returns:
hazelcast.future.Future[int]: The current count.
"""
codec = count_down_latch_get_count_codec
request = codec.encode_request(self._group_id, self._object_name)
return self._invoke(request, codec.decode_response)

def try_set_count(self, count):
"""Sets the count to the given value if the current count is zero.

If count is not zero, then this method does nothing and returns
``False``.

Args:
count (int): The number of times ``count_down()`` must be invoked
before callers can pass through ``await_latch()``.

Returns:
hazelcast.future.Future[bool]: ``True`` if the new count was set,
``False`` if the current count is not zero.
"""
check_is_int(count)
check_true(count > 0, "Count must be positive")
codec = count_down_latch_try_set_count_codec
request = codec.encode_request(self._group_id, self._object_name, count)
return self._invoke(request, codec.decode_response)

def _do_count_down(self, expected_round, invocation_uuid):
def handler(f):
try:
f.result()
except OperationTimeoutError:
# we can retry safely because the retry is idempotent
return self._do_count_down(expected_round, invocation_uuid)

return self._request_count_down(expected_round, invocation_uuid).continue_with(handler)

def _get_round(self):
codec = count_down_latch_get_round_codec
request = codec.encode_request(self._group_id, self._object_name)
return self._invoke(request, codec.decode_response)

def _request_count_down(self, expected_round, invocation_uuid):
codec = count_down_latch_count_down_codec
request = codec.encode_request(self._group_id, self._object_name, invocation_uuid, expected_round)
return self._invoke(request)
15 changes: 14 additions & 1 deletion hazelcast/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ def check_not_empty(collection, message):
raise AssertionError(message)


def check_is_number(val):
if not isinstance(val, number_types):
raise AssertionError("Number value expected")


def check_is_int(val):
if not isinstance(val, six.integer_types):
raise AssertionError("Int value expected")
Expand Down Expand Up @@ -80,13 +85,21 @@ def get_and_increment(self):
"""Returns the current value and increment it.

Returns:
int: current value of AtomicInteger.
int: Current value of AtomicInteger.
"""
with self._mux:
res = self._counter
self._counter += 1
return res

def get(self):
"""Returns the current value.

Returns:
int: The current value.
"""
with self._mux:
return self._counter

class ImmutableLazyDataList(Sequence):
def __init__(self, list_data, to_object):
Expand Down
4 changes: 2 additions & 2 deletions tests/invocation_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import hazelcast
from hazelcast.config import _Config
from hazelcast.errors import HazelcastTimeoutError, IndeterminateOperationStateError
from hazelcast.errors import IndeterminateOperationStateError, OperationTimeoutError
from hazelcast.invocation import Invocation, InvocationService
from hazelcast.protocol.client_message import OutboundMessage
from hazelcast.serialization import LE_INT
Expand Down Expand Up @@ -176,7 +176,7 @@ def mock(*_):
invocation_service._invoke_on_random_connection = MagicMock(return_value=False)

invocation_service.invoke(invocation)
with self.assertRaises(HazelcastTimeoutError):
with self.assertRaises(OperationTimeoutError):
invocation.future.result()

def test_invocation_not_timed_out_when_there_is_no_exception(self):
Expand Down
Loading