diff --git a/README.md b/README.md
index 5e9c168ebb..b36b1e6c98 100644
--- a/README.md
+++ b/README.md
@@ -57,7 +57,8 @@
     * [7.4.11.1. Configuring Flake ID Generator](#74111-configuring-flake-id-generator)
   * [7.4.12. CP Subsystem](#7412-cp-subsystem)
     * [7.4.12.1. Using AtomicLong](#74121-using-atomiclong)
-    * [7.4.12.2. Using AtomicReference](#74122-using-atomicreference)
+    * [7.4.12.2. Using CountDownLatch](#74122-using-countdownlatch)
+    * [7.4.12.3. Using AtomicReference](#74123-using-atomicreference)
 * [7.5. Distributed Events](#75-distributed-events)
   * [7.5.1. Cluster Events](#751-cluster-events)
     * [7.5.1.1. Listening for Member Events](#7511-listening-for-member-events)
@@ -1609,7 +1610,43 @@ print ('CAS operation result:', result)
 
 AtomicLong implementation does not offer exactly-once / effectively-once execution semantics. It goes with at-least-once execution semantics by default and can cause an API call to be committed multiple times in case of CP member failures. It can be tuned to offer at-most-once execution semantics. Please see [`fail-on-indeterminate-operation-state`](https://docs.hazelcast.org/docs/latest/manual/html-single/index.html#cp-subsystem-configuration) server-side setting.
 
-#### 7.4.12.2. Using AtomicReference
+#### 7.4.12.2. Using CountDownLatch
+
+Hazelcast `CountDownLatch` is the distributed implementation of a linearizable countdown latch.
+This data structure is a cluster-wide synchronization aid that allows one or more callers to wait until a set of operations being performed in other callers completes.
+This data structure is a part of CP Subsystem.
+
+A basic CountDownLatch usage example is shown below.
+
+```python
+import time
+from threading import Thread
+
+# Get a CountDownLatch called "my-latch"
+latch = client.cp_subsystem.get_count_down_latch("my-latch").blocking()
+# Try to initialize the latch
+# (does nothing if the count is not zero)
+initialized = latch.try_set_count(1)
+print("Initialized:", initialized)
+# Check count
+count = latch.get_count()
+print("Count:", count)
+# Prints:
+# Count: 1
+
+# Bring the count down to zero after 10ms
+def run():
+    time.sleep(0.01)
+    latch.count_down()
+
+t = Thread(target=run)
+t.start()
+
+# Wait up to 1 second for the count to reach zero
+count_is_zero = latch.await_latch(1)
+print("Count is zero:", count_is_zero)
+```
+
+> **NOTE: CountDownLatch count can be reset with `try_set_count()` after a countdown has finished, but not during an active count.**
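+
+Since the count can be reset once it reaches zero, the same latch instance can be reused for successive rounds of work. Below is a minimal sketch of such a reset, continuing with the blocking `latch` proxy from the example above.
+
+```python
+# The countdown above has finished, so the latch can be re-armed
+print("Re-armed:", latch.try_set_count(2))
+# Prints:
+# Re-armed: True
+
+# While a count is active, try_set_count() does nothing and returns False
+print("Re-armed again:", latch.try_set_count(5))
+# Prints:
+# Re-armed again: False
+
+latch.count_down()
+latch.count_down()
+print("Count:", latch.get_count())
+# Prints:
+# Count: 0
+```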
+
+#### 7.4.12.3. Using AtomicReference
 
 Hazelcast `AtomicReference` is the distributed implementation of a linearizable object reference. It provides a set of atomic operations allowing to modify the value behind the reference.
diff --git a/docs/hazelcast.proxy.cp.atomic_reference.rst b/docs/hazelcast.proxy.cp.atomic_reference.rst
new file mode 100644
index 0000000000..a11e7fb92f
--- /dev/null
+++ b/docs/hazelcast.proxy.cp.atomic_reference.rst
@@ -0,0 +1,4 @@
+AtomicReference
+===============
+
+.. automodule:: hazelcast.proxy.cp.atomic_reference
diff --git a/docs/hazelcast.proxy.cp.count_down_latch.rst b/docs/hazelcast.proxy.cp.count_down_latch.rst
new file mode 100644
index 0000000000..9c7a1e7a0b
--- /dev/null
+++ b/docs/hazelcast.proxy.cp.count_down_latch.rst
@@ -0,0 +1,4 @@
+CountDownLatch
+==============
+
+.. automodule:: hazelcast.proxy.cp.count_down_latch
diff --git a/docs/hazelcast.proxy.cp.rst b/docs/hazelcast.proxy.cp.rst
index d709c25a3a..69d4e16856 100644
--- a/docs/hazelcast.proxy.cp.rst
+++ b/docs/hazelcast.proxy.cp.rst
@@ -2,4 +2,6 @@ CP Proxies
 ====================
 
 .. toctree::
-   hazelcast.proxy.cp.atomic_long
\ No newline at end of file
+   hazelcast.proxy.cp.atomic_long
+   hazelcast.proxy.cp.atomic_reference
+   hazelcast.proxy.cp.count_down_latch
\ No newline at end of file
diff --git a/examples/cp/count_down_latch_example.py b/examples/cp/count_down_latch_example.py
new file mode 100644
index 0000000000..c8c725da0d
--- /dev/null
+++ b/examples/cp/count_down_latch_example.py
@@ -0,0 +1,18 @@
+import hazelcast
+
+client = hazelcast.HazelcastClient()
+
+latch = client.cp_subsystem.get_count_down_latch("my-latch")
+initialized = latch.try_set_count(3).result()
+print("Initialized:", initialized)
+count = latch.get_count().result()
+print("Count:", count)
+
+latch.await_latch(10).add_done_callback(lambda f: print("Result of await:", f.result()))
+
+for _ in range(3):
+    latch.count_down().result()
+    count = latch.get_count().result()
+    print("Current count:", count)
+
+client.shutdown()
diff --git a/hazelcast/cp.py b/hazelcast/cp.py
index 4268023b0e..e84be91d3a 100644
--- a/hazelcast/cp.py
+++ b/hazelcast/cp.py
@@ -2,6 +2,7 @@
 from hazelcast.protocol.codec import cp_group_create_cp_group_codec
 from hazelcast.proxy.cp.atomic_long import AtomicLong
 from hazelcast.proxy.cp.atomic_reference import AtomicReference
+from hazelcast.proxy.cp.count_down_latch import CountDownLatch
 from hazelcast.util import check_true
 
 
@@ -74,6 +75,26 @@ def get_atomic_reference(self, name):
         """
         return self._proxy_manager.get_or_create(ATOMIC_REFERENCE_SERVICE, name)
 
+    def get_count_down_latch(self, name):
+        """Returns the distributed CountDownLatch instance with the given name.
+
+        The instance is created on CP Subsystem.
+
+        If no group name is given within the ``name`` argument, then the
+        CountDownLatch instance will be created on the DEFAULT CP group.
+        If a group name is given, like ``.get_count_down_latch("myLatch@group1")``,
+        the given group will be initialized first, if not initialized
+        already, and then the instance will be created on this group.
+
+        Args:
+            name (str): Name of the CountDownLatch.
+
+        Returns:
+            hazelcast.proxy.cp.count_down_latch.CountDownLatch: The CountDownLatch
+            proxy for the given name.
+        """
+        return self._proxy_manager.get_or_create(COUNT_DOWN_LATCH_SERVICE, name)
+
 
 _DEFAULT_GROUP_NAME = "default"
 
@@ -105,6 +126,7 @@ def _get_object_name_for_proxy(name):
 
 ATOMIC_LONG_SERVICE = "hz:raft:atomicLongService"
 ATOMIC_REFERENCE_SERVICE = "hz:raft:atomicRefService"
+COUNT_DOWN_LATCH_SERVICE = "hz:raft:countDownLatchService"
 
 
 class CPProxyManager(object):
@@ -120,6 +142,8 @@ def get_or_create(self, service_name, proxy_name):
             return AtomicLong(self._context, group_id, service_name, proxy_name, object_name)
         elif service_name == ATOMIC_REFERENCE_SERVICE:
             return AtomicReference(self._context, group_id, service_name, proxy_name, object_name)
+        elif service_name == COUNT_DOWN_LATCH_SERVICE:
+            return CountDownLatch(self._context, group_id, service_name, proxy_name, object_name)
 
     def _get_group_id(self, proxy_name):
         codec = cp_group_create_cp_group_codec
diff --git a/hazelcast/invocation.py b/hazelcast/invocation.py
index 3b18890816..1c7129f3ff 100644
--- a/hazelcast/invocation.py
+++ b/hazelcast/invocation.py
@@ -3,8 +3,8 @@
 import functools
 
 from hazelcast.errors import create_error_from_message, HazelcastInstanceNotActiveError, is_retryable_error, \
-    HazelcastTimeoutError, TargetDisconnectedError, HazelcastClientNotActiveError, TargetNotMemberError, \
-    EXCEPTION_MESSAGE_TYPE, IndeterminateOperationStateError
+    TargetDisconnectedError, HazelcastClientNotActiveError, TargetNotMemberError, \
+    EXCEPTION_MESSAGE_TYPE, IndeterminateOperationStateError, OperationTimeoutError
 from hazelcast.future import Future
 from hazelcast.protocol.codec import client_local_backup_listener_codec
 from hazelcast.util import AtomicInteger
@@ -231,7 +231,7 @@ def _notify_error(self, invocation, error):
         if invocation.timeout < time.time():
             self.logger.debug("Error will not be retried because invocation timed out: %s", error,
                               extra=self._logger_extras)
-            error = HazelcastTimeoutError("Request timed out because an error occurred "
+            error = OperationTimeoutError("Request timed out because an error occurred "
                                           "after invocation timeout: %s" % error)
             self._complete_with_error(invocation, error)
             return
diff --git a/hazelcast/proxy/cp/count_down_latch.py b/hazelcast/proxy/cp/count_down_latch.py
new file mode 100644
index 0000000000..2b94e6ee45
--- /dev/null
+++ b/hazelcast/proxy/cp/count_down_latch.py
@@ -0,0 +1,140 @@
+import uuid
+
+from hazelcast.errors import OperationTimeoutError
+from hazelcast.protocol.codec import count_down_latch_await_codec, count_down_latch_get_round_codec, \
+    count_down_latch_count_down_codec, count_down_latch_get_count_codec, count_down_latch_try_set_count_codec
+from hazelcast.proxy.cp import BaseCPProxy
+from hazelcast.util import to_millis, check_true, check_is_number, check_is_int
+
+
+class CountDownLatch(BaseCPProxy):
+    """A distributed, concurrent countdown latch data structure.
+
+    CountDownLatch is a cluster-wide synchronization aid
+    that allows one or more callers to wait until a set of operations being
+    performed in other callers completes.
+
+    The count can be reset using the ``try_set_count()`` method after
+    a countdown has finished, but not during an active count. This allows
+    the same latch instance to be reused.
+
+    ``await_latch()`` does not support an unbounded wait, since waiting
+    indefinitely is undesirable in a distributed application: for example,
+    a cluster can split, or the master and replicas could all die. In most
+    cases, it is best to configure an explicit timeout so you have the
+    ability to deal with these situations.
+
+    All of the API methods in the CountDownLatch offer exactly-once
+    execution semantics. For instance, even if a ``count_down()`` call is
+    internally retried because of a crashed Hazelcast member, the counter
+    value is decremented only once.
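+
+    Example:
+        A minimal sketch of the typical flow, assuming ``client`` is a
+        started ``HazelcastClient``::
+
+            latch = client.cp_subsystem.get_count_down_latch("my-latch").blocking()
+            latch.try_set_count(3)  # arm the latch with a count of 3
+            # ... other callers perform work and call latch.count_down() ...
+            completed = latch.await_latch(5)  # wait up to 5 seconds for zero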
+    """
+
+    def await_latch(self, timeout):
+        """Causes the current thread to wait until the latch has counted down
+        to zero, an exception is raised, or the specified waiting time elapses.
+
+        If the current count is zero, then this method returns ``True``.
+
+        If the current count is greater than zero, then the current
+        thread becomes disabled for thread scheduling purposes and lies
+        dormant until one of the following things happens:
+
+        - The count reaches zero due to invocations of the ``count_down()`` method
+        - This CountDownLatch instance is destroyed
+        - The countdown owner becomes disconnected
+        - The specified waiting time elapses
+
+        If the count reaches zero, then the method returns with the
+        value ``True``.
+
+        If the specified waiting time elapses, then the value ``False``
+        is returned. If the time is less than or equal to zero, the method
+        will not wait at all.
+
+        Args:
+            timeout (int|float): The maximum time to wait in seconds.
+
+        Returns:
+            hazelcast.future.Future[bool]: ``True`` if the count reached zero,
+            ``False`` if the waiting time elapsed before the count reached zero.
+
+        Raises:
+            IllegalStateError: If the Hazelcast instance was shut down while waiting.
+        """
+        check_is_number(timeout)
+        timeout = max(0, timeout)
+        invocation_uuid = uuid.uuid4()
+        codec = count_down_latch_await_codec
+        request = codec.encode_request(self._group_id, self._object_name, invocation_uuid, to_millis(timeout))
+        return self._invoke(request, codec.decode_response)
+
+    def count_down(self):
+        """Decrements the count of the latch, releasing all waiting threads if
+        the count reaches zero.
+
+        If the current count is greater than zero, then it is decremented.
+        If the new count is zero:
+
+        - All waiting threads are re-enabled for thread scheduling purposes
+        - The countdown owner is set to ``None``
+
+        If the current count equals zero, then nothing happens.
+
+        Returns:
+            hazelcast.future.Future[None]:
+        """
+        invocation_uuid = uuid.uuid4()
+
+        def handler(f):
+            return self._do_count_down(f.result(), invocation_uuid)
+
+        return self._get_round().continue_with(handler)
+
+    def get_count(self):
+        """Returns the current count.
+
+        Returns:
+            hazelcast.future.Future[int]: The current count.
+        """
+        codec = count_down_latch_get_count_codec
+        request = codec.encode_request(self._group_id, self._object_name)
+        return self._invoke(request, codec.decode_response)
+
+    def try_set_count(self, count):
+        """Sets the count to the given value if the current count is zero.
+
+        If the current count is not zero, then this method does nothing
+        and returns ``False``.
+
+        Args:
+            count (int): The number of times ``count_down()`` must be invoked
+                before callers can pass through ``await_latch()``.
+
+        Returns:
+            hazelcast.future.Future[bool]: ``True`` if the new count was set,
+            ``False`` if the current count is not zero.
+        """
+        check_is_int(count)
+        check_true(count > 0, "Count must be positive")
+        codec = count_down_latch_try_set_count_codec
+        request = codec.encode_request(self._group_id, self._object_name, count)
+        return self._invoke(request, codec.decode_response)
+
+    def _do_count_down(self, expected_round, invocation_uuid):
+        def handler(f):
+            try:
+                f.result()
+            except OperationTimeoutError:
+                # We can retry safely because the count down request is
+                # idempotent: it carries the same round number and
+                # invocation UUID on every attempt.
+                return self._do_count_down(expected_round, invocation_uuid)
+
+        return self._request_count_down(expected_round, invocation_uuid).continue_with(handler)
+
+    def _get_round(self):
+        # Fetches the current round of the latch; count_down() sends this
+        # round along with a unique invocation UUID so that retried
+        # requests can be detected on the server side.
+        codec = count_down_latch_get_round_codec
+        request = codec.encode_request(self._group_id, self._object_name)
+        return self._invoke(request, codec.decode_response)
+
+    def _request_count_down(self, expected_round, invocation_uuid):
+        codec = count_down_latch_count_down_codec
+        request = codec.encode_request(self._group_id, self._object_name, invocation_uuid, expected_round)
+        return self._invoke(request)
diff --git a/hazelcast/util.py b/hazelcast/util.py
index 55c77b74a9..60ef819a99 100644
--- a/hazelcast/util.py
+++ b/hazelcast/util.py
@@ -34,6 +34,11 @@ def check_not_empty(collection, message):
         raise AssertionError(message)
 
 
+def check_is_number(val):
+    if not isinstance(val, number_types):
+        raise AssertionError("Number value expected")
+
+
 def check_is_int(val):
     if not isinstance(val, six.integer_types):
         raise AssertionError("Int value expected")
@@ -80,13 +85,21 @@ def get_and_increment(self):
         """Returns the current value and increment it.
 
         Returns:
-            int: current value of AtomicInteger.
+            int: Current value of AtomicInteger.
         """
         with self._mux:
             res = self._counter
             self._counter += 1
             return res
 
+    def get(self):
+        """Returns the current value.
+
+        Returns:
+            int: The current value.
+ """ + with self._mux: + return self._counter class ImmutableLazyDataList(Sequence): def __init__(self, list_data, to_object): diff --git a/tests/invocation_test.py b/tests/invocation_test.py index 70bf9e8fb5..0d4c459ec5 100644 --- a/tests/invocation_test.py +++ b/tests/invocation_test.py @@ -5,7 +5,7 @@ import hazelcast from hazelcast.config import _Config -from hazelcast.errors import HazelcastTimeoutError, IndeterminateOperationStateError +from hazelcast.errors import IndeterminateOperationStateError, OperationTimeoutError from hazelcast.invocation import Invocation, InvocationService from hazelcast.protocol.client_message import OutboundMessage from hazelcast.serialization import LE_INT @@ -176,7 +176,7 @@ def mock(*_): invocation_service._invoke_on_random_connection = MagicMock(return_value=False) invocation_service.invoke(invocation) - with self.assertRaises(HazelcastTimeoutError): + with self.assertRaises(OperationTimeoutError): invocation.future.result() def test_invocation_not_timed_out_when_there_is_no_exception(self): diff --git a/tests/proxy/cp/count_down_latch_test.py b/tests/proxy/cp/count_down_latch_test.py new file mode 100644 index 0000000000..b05da4a549 --- /dev/null +++ b/tests/proxy/cp/count_down_latch_test.py @@ -0,0 +1,175 @@ +import time +import unittest +from threading import Thread + +from mock import MagicMock + +from hazelcast.errors import DistributedObjectDestroyedError, OperationTimeoutError +from hazelcast.future import ImmediateExceptionFuture +from hazelcast.proxy.cp.count_down_latch import CountDownLatch +from hazelcast.util import AtomicInteger +from tests.proxy.cp import CPTestCase +from tests.util import random_string + +inf = 2 ** 31 - 1 + + +class CountDownLatchTest(CPTestCase): + def test_latch_in_another_group(self): + latch = self._get_latch() + another_latch = self.client.cp_subsystem.get_count_down_latch(latch._proxy_name + "@another").blocking() + + another_latch.try_set_count(42) + self.assertEqual(42, another_latch.get_count()) + self.assertNotEqual(42, latch.get_count()) + + def test_use_after_destroy(self): + latch = self._get_latch() + latch.destroy() + # the next destroy call should be ignored + latch.destroy() + + with self.assertRaises(DistributedObjectDestroyedError): + latch.get_count() + + latch2 = self.client.cp_subsystem.get_count_down_latch(latch._proxy_name).blocking() + + with self.assertRaises(DistributedObjectDestroyedError): + latch2.get_count() + + def test_await_latch_negative_timeout(self): + latch = self._get_latch(1) + self.assertFalse(latch.await_latch(-1)) + + def test_await_latch_zero_timeout(self): + latch = self._get_latch(1) + self.assertFalse(latch.await_latch(0)) + + def test_await_latch_with_timeout(self): + latch = self._get_latch(1) + start = time.time() + self.assertFalse(latch.await_latch(0.1)) + time_passed = time.time() - start + self.assertTrue(time_passed > 0.1) + + def test_await_latch_multiple_waiters(self): + latch = self._get_latch(1) + + completed = AtomicInteger() + + def run(): + latch.await_latch(inf) + completed.get_and_increment() + + count = 10 + threads = [] + for _ in range(count): + t = Thread(target=run) + threads.append(t) + t.start() + + latch.count_down() + + def assertion(): + self.assertEqual(count, completed.get()) + + self.assertTrueEventually(assertion) + + for i in range(count): + threads[i].join() + + def test_await_latch_response_on_count_down(self): + latch = self._get_latch() + self.assertTrue(latch.await_latch(inf)) + self.assertTrue(latch.try_set_count(1)) + + # make a 
non-blocking request
+        future = latch._wrapped.await_latch(inf)
+        t = Thread(target=lambda: latch.count_down())
+        t.start()
+        t.join()
+
+        def assertion():
+            self.assertTrue(future.done())
+            self.assertTrue(future.result())
+
+        self.assertTrueEventually(assertion)
+
+    def test_count_down(self):
+        latch = self._get_latch(10)
+
+        for i in range(9, -1, -1):
+            self.assertIsNone(latch.count_down())
+            self.assertEqual(i, latch.get_count())
+
+    def test_count_down_retry_on_timeout(self):
+        latch = self._get_latch(1)
+
+        original = latch._wrapped._request_count_down
+        called_count = AtomicInteger()
+
+        def mock(expected_round, invocation_uuid):
+            if called_count.get_and_increment() < 2:
+                return ImmediateExceptionFuture(OperationTimeoutError("xx"))
+            return original(expected_round, invocation_uuid)
+
+        latch._wrapped._request_count_down = mock
+
+        latch.count_down()
+        self.assertEqual(3, called_count.get())  # resolves on its third call; the first two raise a timeout error
+        self.assertEqual(0, latch.get_count())
+
+    def test_get_count(self):
+        latch = self._get_latch(1)
+        self.assertEqual(1, latch.get_count())
+        latch.count_down()
+        self.assertEqual(0, latch.get_count())
+        latch.try_set_count(10)
+        self.assertEqual(10, latch.get_count())
+
+    def test_try_set_count(self):
+        latch = self._get_latch()
+        self.assertTrue(latch.try_set_count(3))
+        self.assertEqual(3, latch.get_count())
+
+    def test_try_set_count_when_count_is_already_set(self):
+        latch = self._get_latch(1)
+        self.assertFalse(latch.try_set_count(10))
+        self.assertFalse(latch.try_set_count(20))
+        self.assertEqual(1, latch.get_count())
+
+    def test_try_set_count_when_count_goes_to_zero(self):
+        latch = self._get_latch(1)
+        latch.count_down()
+        self.assertEqual(0, latch.get_count())
+        self.assertTrue(latch.try_set_count(3))
+        self.assertEqual(3, latch.get_count())
+
+    def _get_latch(self, initial_count=None):
+        latch = self.client.cp_subsystem.get_count_down_latch("latch-" + random_string()).blocking()
+        if initial_count is not None:
+            self.assertTrue(latch.try_set_count(initial_count))
+        return latch
+
+
+class CountDownLatchInvalidInputTest(unittest.TestCase):
+    def setUp(self):
+        self.latch = CountDownLatch(MagicMock(), None, None, None, None)
+
+    def test_await_latch(self):
+        self._check_error("await_latch", "a")
+
+    def test_try_set_count(self):
+        self._check_error("try_set_count", 1.1)
+
+    def test_try_set_count_on_negative_input(self):
+        self._check_error("try_set_count", -1)
+
+    def test_try_set_count_on_zero(self):
+        self._check_error("try_set_count", 0)
+
+    def _check_error(self, method_name, *args):
+        fn = getattr(self.latch, method_name)
+        self.assertTrue(callable(fn))
+        with self.assertRaises(AssertionError):
+            fn(*args)