diff --git a/README.md b/README.md index 0f36893047..e57e95edc6 100644 --- a/README.md +++ b/README.md @@ -58,8 +58,9 @@ * [7.4.12. CP Subsystem](#7412-cp-subsystem) * [7.4.12.1. Using AtomicLong](#74121-using-atomiclong) * [7.4.12.2. Using Lock](#74122-using-lock) - * [7.4.12.3. Using CountDownLatch](#74123-using-countdownlatch) - * [7.4.12.4. Using AtomicReference](#74124-using-atomicreference) + * [7.4.12.3. Using Semaphore](#74123-using-semaphore) + * [7.4.12.4. Using CountDownLatch](#74124-using-countdownlatch) + * [7.4.12.5. Using AtomicReference](#74125-using-atomicreference) * [7.5. Distributed Events](#75-distributed-events) * [7.5.1. Cluster Events](#751-cluster-events) * [7.5.1.1. Listening for Member Events](#7511-listening-for-member-events) @@ -1663,7 +1664,65 @@ After that, once Client-1 comes back alive, its write request will be rejected b You can read more about the fencing token idea in Martin Kleppmann's "How to do distributed locking" blog post and Google's Chubby paper. -#### 7.4.12.3. Using CountDownLatch + +#### 7.4.12.3. Using Semaphore + +Hazelcast `Semaphore` is the distributed implementation of a linearizable and distributed semaphore. It offers multiple operations for acquiring the permits. +This data structure is a part of CP Subsystem. + +Semaphore is a cluster-wide counting semaphore. Conceptually, it maintains a set of permits. Each `acquire()` waits if necessary until a permit is available, and then takes it. +Dually, each `release()` adds a permit, potentially releasing a waiting acquirer. However, no actual permit objects are used; the semaphore just keeps a count of the number available and acts accordingly. + +A basic Semaphore usage example is shown below.
+ +```python +# Get a Semaphore called "my-semaphore" +semaphore = client.cp_subsystem.get_semaphore("my-semaphore").blocking() +# Try to initialize the semaphore +# (does nothing if the semaphore is already initialized) +semaphore.init(3) +# Acquire 3 permits out of 3 +semaphore.acquire(3) +# Release 2 permits +semaphore.release(2) +# Check available permits +available = semaphore.available_permits() +print("Available:", available) +# Prints: +# Available: 2 +``` + +Beware of the increased risk of indefinite postponement when using the multiple-permit acquire. If permits are released one by one, a caller waiting for one permit will acquire it before a caller waiting for multiple permits regardless of the call order. +Correct usage of a semaphore is established by programming convention in the application. + +As an alternative, potentially safer approach to the multiple-permit acquire, you can use the `try_acquire()` method of Semaphore. +It tries to acquire the permits in an optimistic manner and immediately returns with a `bool` operation result. It also accepts an optional `timeout` argument which specifies the timeout in seconds to acquire the permits before giving up. + +```python +# Try to acquire 2 permits +success = semaphore.try_acquire(2) +# Check for the result of the acquire request +if success: + try: + pass + # Your guarded code goes here + finally: + # Make sure to release the permits + semaphore.release(2) +``` + + Semaphore data structure has two variations: + + * The default implementation is session-aware. In this one, when a caller makes its very first `acquire()` call, + it starts a new CP session with the underlying CP group. Then, liveness of the caller is tracked via this CP session. + When the caller fails, permits acquired by this caller are automatically and safely released. + However, the session-aware version comes with a limitation, that is, a Hazelcast client cannot release permits before acquiring them first.
+ In other words, a client can release only the permits it has acquired earlier. + * The second implementation is sessionless. This one does not perform auto-cleanup of acquired permits on failures. + Acquired permits are not bound to callers and permits can be released without acquiring first. However, you need to handle failed permit owners on your own. + If a Hazelcast server or a client fails while holding some permits, they will not be automatically released. You can use the sessionless CP Semaphore implementation by enabling JDK compatibility `jdk-compatible` server-side setting. Refer to [Semaphore configuration](https://docs.hazelcast.org/docs/latest/manual/html-single/index.html#semaphore-configuration) documentation for more details. + +#### 7.4.12.4. Using CountDownLatch Hazelcast `CountDownLatch` is the distributed implementation of a linearizable and distributed countdown latch. This data structure is a cluster-wide synchronization aid that allows one or more callers to wait until a set of operations being performed in other callers completes. @@ -1699,7 +1758,7 @@ print("Count is zero:", count_is_zero) > **NOTE: CountDownLatch count can be reset with `try_set_count()` after a countdown has finished, but not during an active count.** -#### 7.4.12.4. Using AtomicReference +#### 7.4.12.5. Using AtomicReference Hazelcast `AtomicReference` is the distributed implementation of a linearizable object reference. It provides a set of atomic operations allowing to modify the value behind the reference. 
diff --git a/docs/hazelcast.proxy.cp.rst b/docs/hazelcast.proxy.cp.rst index 896428e0e2..bb00dd1d94 100644 --- a/docs/hazelcast.proxy.cp.rst +++ b/docs/hazelcast.proxy.cp.rst @@ -5,4 +5,5 @@ CP Proxies hazelcast.proxy.cp.atomic_long hazelcast.proxy.cp.atomic_reference hazelcast.proxy.cp.count_down_latch - hazelcast.proxy.cp.fenced_lock \ No newline at end of file + hazelcast.proxy.cp.fenced_lock + hazelcast.proxy.cp.semaphore \ No newline at end of file diff --git a/docs/hazelcast.proxy.cp.semaphore.rst b/docs/hazelcast.proxy.cp.semaphore.rst new file mode 100644 index 0000000000..6f5846c841 --- /dev/null +++ b/docs/hazelcast.proxy.cp.semaphore.rst @@ -0,0 +1,4 @@ +Semaphore +========= + +.. autoclass:: hazelcast.proxy.cp.semaphore.Semaphore diff --git a/examples/cp/semaphore_example.py b/examples/cp/semaphore_example.py new file mode 100644 index 0000000000..29c420dedf --- /dev/null +++ b/examples/cp/semaphore_example.py @@ -0,0 +1,20 @@ +import hazelcast + +client = hazelcast.HazelcastClient() + +semaphore = client.cp_subsystem.get_semaphore("my-semaphore").blocking() + +initialized = semaphore.init(3) +print("Initialized:", initialized) +available = semaphore.available_permits() +print("Available:", available) + +semaphore.acquire(3) +available = semaphore.available_permits() +print("Available after acquire:", available) + +semaphore.release(2) +available = semaphore.available_permits() +print("Available after release:", available) + +client.shutdown() diff --git a/hazelcast/cp.py b/hazelcast/cp.py index c7361d1f40..5375aef0df 100644 --- a/hazelcast/cp.py +++ b/hazelcast/cp.py @@ -6,11 +6,13 @@ from hazelcast.future import ImmediateExceptionFuture, ImmediateFuture, combine_futures from hazelcast.invocation import Invocation from hazelcast.protocol.codec import cp_group_create_cp_group_codec, cp_session_heartbeat_session_codec, \ - cp_session_create_session_codec, cp_session_close_session_codec, cp_session_generate_thread_id_codec + 
 cp_session_create_session_codec, cp_session_close_session_codec, cp_session_generate_thread_id_codec, \ + semaphore_get_semaphore_type_codec from hazelcast.proxy.cp.atomic_long import AtomicLong from hazelcast.proxy.cp.atomic_reference import AtomicReference from hazelcast.proxy.cp.count_down_latch import CountDownLatch from hazelcast.proxy.cp.fenced_lock import FencedLock +from hazelcast.proxy.cp.semaphore import SessionAwareSemaphore, SessionlessSemaphore from hazelcast.util import check_true, AtomicInteger, thread_id @@ -108,7 +110,7 @@ def get_lock(self, name): The instance is created on CP Subsystem. - If no group name is given within the `name` argument, then the + If no group name is given within the ``name`` argument, then the FencedLock instance will be created on the DEFAULT CP group. If a group name is given, like ``.get_lock("myLock@group1")``, the given group will be initialized first, if not initialized @@ -123,6 +125,25 @@ def get_lock(self, name): """ return self._proxy_manager.get_or_create(LOCK_SERVICE, name) + def get_semaphore(self, name): + """Returns the distributed Semaphore instance with the given name. + + The instance is created on CP Subsystem. + + If no group name is given within the ``name`` argument, then the + Semaphore instance will be created on the DEFAULT CP group. + If a group name is given, like ``.get_semaphore("mySemaphore@group1")``, + the given group will be initialized first, if not initialized + already, and then the instance will be created on this group. + + Args: + name (str): Name of the Semaphore + + Returns: + hazelcast.proxy.cp.semaphore.Semaphore: The Semaphore proxy for the given name.
+ """ + return self._proxy_manager.get_or_create(SEMAPHORE_SERVICE, name) + _DEFAULT_GROUP_NAME = "default" @@ -156,6 +177,7 @@ def _get_object_name_for_proxy(name): ATOMIC_REFERENCE_SERVICE = "hz:raft:atomicRefService" COUNT_DOWN_LATCH_SERVICE = "hz:raft:countDownLatchService" LOCK_SERVICE = "hz:raft:lockService" +SEMAPHORE_SERVICE = "hz:raft:semaphoreService" class CPProxyManager(object): @@ -177,6 +199,10 @@ def get_or_create(self, service_name, proxy_name): return CountDownLatch(self._context, group_id, service_name, proxy_name, object_name) elif service_name == LOCK_SERVICE: return self._create_fenced_lock(group_id, proxy_name, object_name) + elif service_name == SEMAPHORE_SERVICE: + return self._create_semaphore(group_id, proxy_name, object_name) + else: + raise ValueError("Unknown service name: %s" % service_name) def _create_fenced_lock(self, group_id, proxy_name, object_name): with self._mux: @@ -191,6 +217,18 @@ def _create_fenced_lock(self, group_id, proxy_name, object_name): self._lock_proxies[proxy_name] = proxy return proxy + def _create_semaphore(self, group_id, proxy_name, object_name): + codec = semaphore_get_semaphore_type_codec + request = codec.encode_request(proxy_name) + invocation = Invocation(request, response_handler=codec.decode_response) + invocation_service = self._context.invocation_service + invocation_service.invoke(invocation) + jdk_compatible = invocation.future.result() + if jdk_compatible: + return SessionlessSemaphore(self._context, group_id, SEMAPHORE_SERVICE, proxy_name, object_name) + else: + return SessionAwareSemaphore(self._context, group_id, SEMAPHORE_SERVICE, proxy_name, object_name) + def _get_group_id(self, proxy_name): codec = cp_group_create_cp_group_codec request = codec.encode_request(proxy_name) diff --git a/hazelcast/proxy/cp/__init__.py b/hazelcast/proxy/cp/__init__.py index 20d11e3978..3e0960d056 100644 --- a/hazelcast/proxy/cp/__init__.py +++ b/hazelcast/proxy/cp/__init__.py @@ -47,13 +47,6 @@ def 
get_group_id(self): """ return self._group_id - def _get_or_create_unique_thread_id(self): - """ - Returns: - hazelcast.future.Future[int]: Cluster-wide unique thread id. - """ - return self._session_manager.get_or_create_unique_thread_id(self._group_id) - def _get_session_id(self): """ Returns: diff --git a/hazelcast/proxy/cp/semaphore.py b/hazelcast/proxy/cp/semaphore.py new file mode 100644 index 0000000000..a215537a0a --- /dev/null +++ b/hazelcast/proxy/cp/semaphore.py @@ -0,0 +1,526 @@ +import time +import uuid + +from hazelcast.errors import SessionExpiredError, WaitKeyCancelledError, IllegalStateError +from hazelcast.future import ImmediateFuture, ImmediateExceptionFuture +from hazelcast.protocol.codec import semaphore_init_codec, semaphore_acquire_codec, semaphore_available_permits_codec, \ + semaphore_drain_codec, semaphore_change_codec, semaphore_release_codec +from hazelcast.proxy.cp import SessionAwareCPProxy, BaseCPProxy +from hazelcast.util import check_not_negative, check_true, thread_id, to_millis + +_DRAIN_SESSION_ACQ_COUNT = 1024 +""" +Since a proxy does not know how many permits will be drained on +the Raft group, it uses this constant to increment its local session +acquire count. Then, it adjusts the local session acquire count after +the drain response is returned. +""" + +_NO_SESSION_ID = -1 + + +class Semaphore(BaseCPProxy): + """A linearizable, distributed semaphore. + + Semaphores are often used to restrict the number of callers that can access + some physical or logical resource. + + Semaphore is a cluster-wide counting semaphore. Conceptually, it maintains + a set of permits. Each ``acquire()`` blocks if necessary until a permit + is available, and then takes it. Dually, each ``release()`` adds a + permit, potentially releasing a blocking acquirer. However, no actual permit + objects are used; the semaphore just keeps a count of the number available + and acts accordingly. 
+ + Hazelcast's distributed semaphore implementation guarantees that callers + invoking any of the ``acquire()`` methods are selected to + obtain permits in the order of their invocations (first-in-first-out; FIFO). + Note that FIFO ordering refers to the order in which the primary replica of a + Semaphore receives these acquire requests. Therefore, it is + possible for one member to invoke ``acquire()`` before another member, + but its request hits the primary replica after the other member. + + This class also provides convenient ways to work with multiple permits at once. + Beware of the increased risk of indefinite postponement when using the + multiple-permit acquire. If permits are released one by one, a caller waiting + for one permit will acquire it before a caller waiting for multiple permits + regardless of the call order. + + Correct usage of a semaphore is established by programming convention + in the application. + + It works on top of the Raft consensus algorithm. It offers linearizability during crash + failures and network partitions. It is CP with respect to the CAP principle. + If a network partition occurs, it remains available on at most one side of + the partition. + + It has 2 variations: + + - The default implementation accessed via ``cp_subsystem`` is session-aware. In this + one, when a caller makes its very first ``acquire()`` call, it starts + a new CP session with the underlying CP group. Then, liveness of the + caller is tracked via this CP session. When the caller fails, permits + acquired by this caller are automatically and safely released. However, + the session-aware version comes with a limitation, that is, a client cannot + release permits before acquiring them first. In other words, a client can release + only the permits it has acquired earlier. It means, you can acquire a permit + from one thread and release it from another thread using the same Hazelcast client, + but not different instances of Hazelcast client.
You can use the session-aware + CP Semaphore implementation by disabling JDK compatibility via ``jdk-compatible`` + server-side setting. Although the session-aware implementation has a minor + difference to the JDK Semaphore, we think it is a better fit for distributed + environments because of its safe auto-cleanup mechanism for acquired permits. + - The second implementation offered by ``cp_subsystem`` is sessionless. This + implementation does not perform auto-cleanup of acquired permits on failures. + Acquired permits are not bound to threads and permits can be released without + acquiring first. However, you need to handle failed permit owners on your own. + If a Hazelcast server or a client fails while holding some permits, they will not be + automatically released. You can use the sessionless CP Semaphore implementation + by enabling JDK compatibility via ``jdk-compatible`` server-side setting. + + There is a subtle difference between the lock and semaphore abstractions. + A lock can be assigned to at most one endpoint at a time, so we have a total + order among its holders. However, permits of a semaphore can be assigned to + multiple endpoints at a time, which implies that we may not have a total + order among permit holders. In fact, permit holders are partially ordered. + For this reason, the fencing token approach, which is explained in + :class:`~hazelcast.proxy.cp.fenced_lock.FencedLock`, does not work for the + semaphore abstraction. Moreover, each permit is an independent entity. + Multiple permit acquires and reentrant lock acquires of a single endpoint are + not equivalent. The only case where a semaphore behaves like a lock is the + binary case, where the semaphore has only 1 permit. In this case, the semaphore + works like a non-reentrant lock. + + All of the API methods in the new CP Semaphore implementation offer + the exactly-once execution semantics for the session-aware version. 
+ For instance, even if a ``release()`` call is internally retried + because of a crashed Hazelcast member, the permit is released only once. + However, this guarantee is not given for the sessionless, a.k.a. + JDK-compatible CP Semaphore. + """ + + def init(self, permits): + """Tries to initialize this Semaphore instance with the given permit count. + + Args: + permits (int): The given permit count. + + Returns: + hazelcast.future.Future[bool]: ``True`` if the initialization succeeds, + ``False`` if already initialized. + + Raises: + AssertionError: If the ``permits`` is negative. + """ + check_not_negative(permits, "Permits must be non-negative") + codec = semaphore_init_codec + request = codec.encode_request(self._group_id, self._object_name, permits) + return self._invoke(request, codec.decode_response) + + def acquire(self, permits=1): + """Acquires the given number of permits if they are available, + and returns immediately, reducing the number of available permits + by the given amount. + + If insufficient permits are available then the result of the returned + future is not set until one of the following things happens: + + - Some other caller invokes one of the ``release`` + methods for this semaphore, the current caller is next to be assigned + permits and the number of available permits satisfies this request, + - This Semaphore instance is destroyed + + Args: + permits (int): Optional number of permits to acquire; defaults to ``1`` + when not specified + + Returns: + hazelcast.future.Future[None]: + + Raises: + AssertionError: If the ``permits`` is not positive. + """ + raise NotImplementedError("acquire") + + def available_permits(self): + """Returns the number of permits currently available in this semaphore. + + This method is typically used for debugging and testing purposes. + + Returns: + hazelcast.future.Future[int]: The number of permits available in this semaphore.
+ """ + codec = semaphore_available_permits_codec + request = codec.encode_request(self._group_id, self._object_name) + return self._invoke(request, codec.decode_response) + + def drain_permits(self): + """Acquires and returns all permits that are available at invocation time. + + Returns: + hazelcast.future.Future[int]: The number of permits drained. + """ + raise NotImplementedError("drain_permits") + + def reduce_permits(self, reduction): + """Reduces the number of available permits by the indicated amount. + + This method differs from ``acquire`` as it does not block until permits + become available. Similarly, if the caller has acquired some permits, + they are not released with this call. + + Args: + reduction (int): The number of permits to reduce. + + Returns: + hazelcast.future.Future[None]: + + Raises: + AssertionError: If the ``reduction`` is negative. + """ + check_not_negative(reduction, "Reduction must be non-negative") + if reduction == 0: + return ImmediateFuture(None) + + return self._do_change_permits(-reduction) + + def increase_permits(self, increase): + """Increases the number of available permits by the indicated amount. + + If there are some callers waiting for permits to become available, they + will be notified. Moreover, if the caller has acquired some permits, + they are not released with this call. + + Args: + increase (int): The number of permits to increase. + + Returns: + hazelcast.future.Future[None]: + + Raises: + AssertionError: If ``increase`` is negative. + """ + check_not_negative(increase, "Increase must be non-negative") + if increase == 0: + return ImmediateFuture(None) + + return self._do_change_permits(increase) + + def release(self, permits=1): + """Releases the given number of permits and increases the number of + available permits by that amount. + + If some callers in the cluster are blocked for acquiring permits, + they will be notified. 
+ + If the underlying Semaphore implementation is non-JDK-compatible + (configured via ``jdk-compatible`` server-side setting), then a + client can only release a permit which it has acquired before. + In other words, a client cannot release a permit without acquiring + it first. + + Otherwise, which means the underlying implementation is JDK compatible + (configured via ``jdk-compatible`` server-side setting), there is no + requirement that a client that releases a permit must have acquired + that permit by calling one of the ``acquire()`` methods. A client can + freely release a permit without acquiring it first. In this case, + correct usage of a semaphore is established by programming convention + in the application. + + Args: + permits (int): Optional number of permits to release; defaults to ``1`` + when not specified. + + Returns: + hazelcast.future.Future[None]: + + Raises: + AssertionError: If the ``permits`` is not positive. + IllegalStateError: if the Semaphore is non-JDK-compatible and the caller + does not have a permit + """ + raise NotImplementedError("release") + + def try_acquire(self, permits=1, timeout=0): + """Acquires the given number of permits and returns ``True``, if they + become available during the given waiting time. + + If permits are acquired, the number of available permits in the Semaphore + instance is also reduced by the given amount. + + If no sufficient permits are available, then the result of the returned + future is not set until one of the following things happens: + + - Permits are released by other callers, the current caller is next to + be assigned permits and the number of available permits satisfies this + request + - The specified waiting time elapses + + Args: + permits (int): The number of permits to acquire; defaults to ``1`` + when not specified. 
+ timeout (int): Optional timeout in seconds to wait for the permits; + when it's not specified the operation will return + immediately after the acquire attempt + + Returns: + hazelcast.future.Future[bool]: ``True`` if all permits were acquired, + ``False`` if the waiting time elapsed before all permits could be + acquired + + Raises: + AssertionError: If the ``permits`` is not positive. + """ + raise NotImplementedError("try_acquire") + + def _do_change_permits(self, permits): + raise NotImplementedError("_do_change_permits") + + +class SessionAwareSemaphore(Semaphore, SessionAwareCPProxy): + def acquire(self, permits=1): + check_true(permits > 0, "Permits must be positive") + current_thread_id = thread_id() + invocation_uuid = uuid.uuid4() + return self._do_acquire(current_thread_id, invocation_uuid, permits) + + def drain_permits(self): + current_thread_id = thread_id() + invocation_uuid = uuid.uuid4() + return self._do_drain(current_thread_id, invocation_uuid) + + def release(self, permits=1): + check_true(permits > 0, "Permits must be positive") + session_id = self._get_session_id() + if session_id == _NO_SESSION_ID: + return ImmediateExceptionFuture(self._new_illegal_state_error()) + + current_thread_id = thread_id() + invocation_uuid = uuid.uuid4() + + def check_response(response): + try: + response.result() + except SessionExpiredError as e: + self._invalidate_session(session_id) + raise self._new_illegal_state_error(e) + finally: + self._release_session(session_id, permits) + + return self._request_release(session_id, current_thread_id, invocation_uuid, permits).continue_with( + check_response) + + def try_acquire(self, permits=1, timeout=0): + check_true(permits > 0, "Permits must be positive") + timeout = max(0, timeout) + current_thread_id = thread_id() + invocation_uuid = uuid.uuid4() + return self._do_try_acquire(current_thread_id, invocation_uuid, permits, timeout) + + def _do_acquire(self, current_thread_id, invocation_uuid, permits): + def
do_acquire_once(session_id): + session_id = session_id.result() + + def check_response(response): + try: + response.result() + except SessionExpiredError: + self._invalidate_session(session_id) + return self._do_acquire(current_thread_id, invocation_uuid, permits) + except WaitKeyCancelledError: + self._release_session(session_id, permits) + error = IllegalStateError("Semaphore(\"%s\") not acquired because the acquire call on the CP " + "group is cancelled, possibly because of another indeterminate call " + "from the same thread." % self._object_name) + raise error + except Exception as e: + self._release_session(session_id, permits) + raise e + + return self._request_acquire(session_id, current_thread_id, invocation_uuid, permits, -1).continue_with( + check_response) + + return self._acquire_session(permits).continue_with(do_acquire_once) + + def _do_drain(self, current_thread_id, invocation_uuid): + def do_drain_once(session_id): + session_id = session_id.result() + + def check_count(count): + try: + count = count.result() + self._release_session(session_id, _DRAIN_SESSION_ACQ_COUNT - count) + return count + except SessionExpiredError: + self._invalidate_session(session_id) + return self._do_drain(current_thread_id, invocation_uuid) + except Exception as e: + self._release_session(session_id, _DRAIN_SESSION_ACQ_COUNT) + raise e + + return self._request_drain(session_id, current_thread_id, invocation_uuid).continue_with(check_count) + + return self._acquire_session(_DRAIN_SESSION_ACQ_COUNT).continue_with(do_drain_once) + + def _do_change_permits(self, delta): + current_thread_id = thread_id() + invocation_uuid = uuid.uuid4() + + def do_change_permits_once(session_id): + session_id = session_id.result() + + def check_response(response): + try: + response.result() + except SessionExpiredError as e: + self._invalidate_session(session_id) + raise self._new_illegal_state_error(e) + finally: + self._release_session(session_id) + + return self._request_change(session_id, 
current_thread_id, invocation_uuid, delta).continue_with( + check_response) + + return self._acquire_session().continue_with(do_change_permits_once) + + def _do_try_acquire(self, current_thread_id, invocation_uuid, permits, timeout): + start = time.time() + + def do_try_acquire_once(session_id): + session_id = session_id.result() + + def check_response(response): + try: + acquired = response.result() + if not acquired: + self._release_session(session_id, permits) + return acquired + except SessionExpiredError: + self._invalidate_session(session_id) + remaining_timeout = timeout - (time.time() - start) + if remaining_timeout <= 0: + return False + return self._do_try_acquire(current_thread_id, invocation_uuid, permits, remaining_timeout) + except WaitKeyCancelledError: + self._release_session(session_id, permits) + return False + except Exception as e: + self._release_session(session_id, permits) + raise e + + return self._request_acquire(session_id, current_thread_id, invocation_uuid, permits, + timeout).continue_with(check_response) + + return self._acquire_session(permits).continue_with(do_try_acquire_once) + + def _new_illegal_state_error(self, cause=None): + error = IllegalStateError("Semaphore[\"%s\"] has no valid session!" 
% self._object_name, cause) + return error + + def _request_acquire(self, session_id, current_thread_id, invocation_uuid, permits, timeout): + codec = semaphore_acquire_codec + if timeout > 0: + timeout = to_millis(timeout) + + request = codec.encode_request(self._group_id, self._object_name, session_id, current_thread_id, + invocation_uuid, permits, timeout) + return self._invoke(request, codec.decode_response) + + def _request_drain(self, session_id, current_thread_id, invocation_uuid): + codec = semaphore_drain_codec + request = codec.encode_request(self._group_id, self._object_name, session_id, current_thread_id, + invocation_uuid) + return self._invoke(request, codec.decode_response) + + def _request_change(self, session_id, current_thread_id, invocation_uuid, delta): + codec = semaphore_change_codec + request = codec.encode_request(self._group_id, self._object_name, session_id, current_thread_id, + invocation_uuid, delta) + return self._invoke(request) + + def _request_release(self, session_id, current_thread_id, invocation_uuid, permits): + codec = semaphore_release_codec + request = codec.encode_request(self._group_id, self._object_name, session_id, current_thread_id, + invocation_uuid, permits) + return self._invoke(request) + + +class SessionlessSemaphore(Semaphore): + def __init__(self, context, group_id, service_name, proxy_name, object_name): + super(SessionlessSemaphore, self).__init__(context, group_id, service_name, proxy_name, object_name) + self._session_manager = context.proxy_session_manager + + def acquire(self, permits=1): + check_true(permits > 0, "Permits must be positive") + + def handler(f): + f.result() + return None + + return self._get_thread_id().continue_with(self._do_try_acquire, permits, -1).continue_with(handler) + + def drain_permits(self): + return self._get_thread_id().continue_with(self._do_drain_permits) + + def release(self, permits=1): + check_true(permits > 0, "Permits must be positive") + invocation_uuid = uuid.uuid4() + 
return self._get_thread_id().continue_with(self._request_release, invocation_uuid, permits) + + def try_acquire(self, permits=1, timeout=0): + check_true(permits > 0, "Permits must be positive") + timeout = max(0, timeout) + return self._get_thread_id().continue_with(self._do_try_acquire, permits, timeout) + + def _do_try_acquire(self, global_thread_id, permits, timeout): + global_thread_id = global_thread_id.result() + invocation_uuid = uuid.uuid4() + return self._request_acquire(global_thread_id, invocation_uuid, permits, timeout).continue_with( + self._check_acquire_response) + + def _do_drain_permits(self, global_thread_id): + global_thread_id = global_thread_id.result() + invocation_uuid = uuid.uuid4() + codec = semaphore_drain_codec + request = codec.encode_request(self._group_id, self._object_name, _NO_SESSION_ID, global_thread_id, + invocation_uuid) + return self._invoke(request, codec.decode_response) + + def _do_change_permits(self, permits): + invocation_uuid = uuid.uuid4() + return self._get_thread_id().continue_with(self._request_change, invocation_uuid, permits) + + def _request_acquire(self, global_thread_id, invocation_uuid, permits, timeout): + codec = semaphore_acquire_codec + if timeout > 0: + timeout = to_millis(timeout) + + request = codec.encode_request(self._group_id, self._object_name, _NO_SESSION_ID, global_thread_id, + invocation_uuid, permits, timeout) + return self._invoke(request, codec.decode_response) + + def _request_change(self, global_thread_id, invocation_uuid, permits): + global_thread_id = global_thread_id.result() + codec = semaphore_change_codec + request = codec.encode_request(self._group_id, self._object_name, _NO_SESSION_ID, global_thread_id, + invocation_uuid, permits) + return self._invoke(request) + + def _request_release(self, global_thread_id, invocation_uuid, permits): + global_thread_id = global_thread_id.result() + codec = semaphore_release_codec + request = codec.encode_request(self._group_id, self._object_name, 
_NO_SESSION_ID, global_thread_id, + invocation_uuid, permits) + return self._invoke(request) + + def _check_acquire_response(self, response): + try: + return response.result() + except WaitKeyCancelledError: + error = IllegalStateError("Semaphore(\"%s\") not acquired because the acquire call on the " + "CP group is cancelled, possibly because of another indeterminate " + "call from the same thread." % self._object_name) + raise error + + def _get_thread_id(self): + return self._session_manager.get_or_create_unique_thread_id(self._group_id) diff --git a/setup.py b/setup.py index 9e31473544..65d964bb16 100644 --- a/setup.py +++ b/setup.py @@ -58,5 +58,5 @@ package_data={'hazelcast': ["git_info.json"]}, install_requires=[], extras_require=extras, - tests_require=['thrift', 'nose', 'coverage', 'psutil', 'mock'], + tests_require=['thrift', 'nose', 'coverage', 'psutil', 'mock', 'parameterized'], ) diff --git a/test-requirements.txt b/test-requirements.txt index aa1fa60068..689bbd595b 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -3,3 +3,4 @@ nose==1.3.7 coverage==4.5.1 psutil>=5.4.8 mock==3.0.5 +parameterized==0.7.4 diff --git a/tests/proxy/cp/hazelcast_cpsubsystem.xml b/tests/proxy/cp/hazelcast_cpsubsystem.xml index 19af200079..979c682b28 100644 --- a/tests/proxy/cp/hazelcast_cpsubsystem.xml +++ b/tests/proxy/cp/hazelcast_cpsubsystem.xml @@ -23,11 +23,11 @@ 3 - sessionless + sessionless* true - sessionaware + sessionaware* false diff --git a/tests/proxy/cp/semaphore_test.py b/tests/proxy/cp/semaphore_test.py new file mode 100644 index 0000000000..9e4a04510a --- /dev/null +++ b/tests/proxy/cp/semaphore_test.py @@ -0,0 +1,772 @@ +import threading +import time +import unittest + +from mock import MagicMock +from parameterized import parameterized + +from hazelcast import HazelcastClient +from hazelcast.cp import SEMAPHORE_SERVICE +from hazelcast.errors import DistributedObjectDestroyedError, IllegalStateError, HazelcastRuntimeError, \ + 
SessionExpiredError, WaitKeyCancelledError +from hazelcast.future import ImmediateExceptionFuture, ImmediateFuture +from hazelcast.protocol import RaftGroupId +from hazelcast.proxy.cp.semaphore import SessionlessSemaphore, SessionAwareSemaphore +from hazelcast.util import AtomicInteger +from tests.proxy.cp import CPTestCase +from tests.util import random_string + +SEMAPHORE_TYPES = [ + "sessionless", + "sessionaware", +] + + +class SemaphoreTest(CPTestCase): + def setUp(self): + self.semaphore = None + + def tearDown(self): + if self.semaphore: + self.semaphore.destroy() + + @parameterized.expand(SEMAPHORE_TYPES) + def test_semaphore_in_another_group(self, semaphore_type): + semaphore = self.get_semaphore(semaphore_type, 1) + another_semaphore = self.client.cp_subsystem.get_semaphore(semaphore._proxy_name + "@another").blocking() + + self.assertEqual(1, semaphore.available_permits()) + self.assertEqual(0, another_semaphore.available_permits()) + semaphore.acquire() + self.assertEqual(0, semaphore.available_permits()) + self.assertEqual(0, another_semaphore.available_permits()) + + @parameterized.expand(SEMAPHORE_TYPES) + def test_use_after_destroy(self, semaphore_type): + semaphore = self.get_semaphore(semaphore_type) + semaphore.destroy() + # the next destroy call should be ignored + semaphore.destroy() + + with self.assertRaises(DistributedObjectDestroyedError): + semaphore.init(1) + + semaphore2 = self.client.cp_subsystem.get_semaphore(semaphore._proxy_name).blocking() + + with self.assertRaises(DistributedObjectDestroyedError): + semaphore2.init(1) + + def test_session_aware_semaphore_after_client_shutdown(self): + semaphore = self.get_semaphore("sessionaware", 1) + another_client = HazelcastClient(cluster_name=self.cluster.id) + another_semaphore = another_client.cp_subsystem.get_semaphore(semaphore._proxy_name).blocking() + another_semaphore.acquire(1) + self.assertEqual(0, another_semaphore.available_permits()) + self.assertEqual(0, semaphore.available_permits()) +
another_client.shutdown() + + def assertion(): + self.assertEqual(1, semaphore.available_permits()) + + self.assertTrueEventually(assertion) + + @parameterized.expand(SEMAPHORE_TYPES) + def test_init(self, semaphore_type): + semaphore = self.get_semaphore(semaphore_type) + self.assertEqual(0, semaphore.available_permits()) + self.assertTrue(semaphore.init(10)) + self.assertEqual(10, semaphore.available_permits()) + + @parameterized.expand(SEMAPHORE_TYPES) + def test_init_when_already_initialized(self, semaphore_type): + semaphore = self.get_semaphore(semaphore_type) + self.assertTrue(semaphore.init(5)) + self.assertFalse(semaphore.init(7)) + self.assertEqual(5, semaphore.available_permits()) + + @parameterized.expand(SEMAPHORE_TYPES) + def test_acquire(self, semaphore_type): + semaphore = self.get_semaphore(semaphore_type, 42) + self.assertIsNone(semaphore.acquire(2)) + self.assertEqual(40, semaphore.available_permits()) + self.assertIsNone(semaphore.acquire()) + self.assertEqual(39, semaphore.available_permits()) + + @parameterized.expand(SEMAPHORE_TYPES) + def test_acquire_when_not_enough_permits(self, semaphore_type): + semaphore = self.get_semaphore(semaphore_type, 5) + f = semaphore._wrapped.acquire(10) + self.assertFalse(f.done()) + time.sleep(2) + self.assertFalse(f.done()) + semaphore.destroy() + + with self.assertRaises(DistributedObjectDestroyedError): + f.result() + + @parameterized.expand(SEMAPHORE_TYPES) + def test_acquire_blocks_until_someone_releases(self, semaphore_type): + semaphore = self.get_semaphore(semaphore_type, 1) + event = threading.Event() + event2 = threading.Event() + + def run(): + semaphore.acquire(1) + event.set() + event2.wait() + time.sleep(1) + semaphore.release() + + t = threading.Thread(target=run) + t.start() + event.wait() + start = time.time() + f = semaphore._wrapped.acquire() + event2.set() + f.result() + self.assertGreaterEqual(time.time() - start, 1) + t.join() + + @parameterized.expand(SEMAPHORE_TYPES) + def 
test_acquire_blocks_until_semaphore_is_destroyed(self, semaphore_type): + semaphore = self.get_semaphore(semaphore_type, 1) + event = threading.Event() + event2 = threading.Event() + + def run(): + semaphore.acquire(1) + event.set() + event2.wait() + time.sleep(1) + semaphore.destroy() + + t = threading.Thread(target=run) + t.start() + event.wait() + start = time.time() + f = semaphore._wrapped.acquire() + event2.set() + + with self.assertRaises(DistributedObjectDestroyedError): + f.result() + + self.assertGreaterEqual(time.time() - start, 1) + t.join() + + @parameterized.expand(SEMAPHORE_TYPES) + def test_available_permits(self, semaphore_type): + semaphore = self.get_semaphore(semaphore_type) + self.assertEqual(0, semaphore.available_permits()) + semaphore.init(5) + self.assertEqual(5, semaphore.available_permits()) + semaphore.acquire(3) + self.assertEqual(2, semaphore.available_permits()) + + @parameterized.expand(SEMAPHORE_TYPES) + def test_drain_permits(self, semaphore_type): + semaphore = self.get_semaphore(semaphore_type, 20) + semaphore.acquire(5) + self.assertEqual(15, semaphore.drain_permits()) + self.assertEqual(0, semaphore.available_permits()) + + @parameterized.expand(SEMAPHORE_TYPES) + def test_drain_permits_when_no_permits(self, semaphore_type): + semaphore = self.get_semaphore(semaphore_type, 0) + self.assertEqual(0, semaphore.drain_permits()) + + @parameterized.expand(SEMAPHORE_TYPES) + def test_reduce_permits(self, semaphore_type): + semaphore = self.get_semaphore(semaphore_type, 10) + self.assertIsNone(semaphore.reduce_permits(5)) + self.assertEqual(5, semaphore.available_permits()) + self.assertIsNone(semaphore.reduce_permits(0)) + self.assertEqual(5, semaphore.available_permits()) + + def test_reduce_permits_on_negative_permits_counter_sessionless(self): + semaphore = self.get_semaphore("sessionless", 10) + semaphore.reduce_permits(15) + self.assertEqual(-5, semaphore.available_permits()) + semaphore.release(10) + self.assertEqual(5, 
semaphore.available_permits()) + + def test_reduce_permits_on_negative_permits_counter_juc_sessionless(self): + semaphore = self.get_semaphore("sessionless", 0) + semaphore.reduce_permits(100) + semaphore.release(10) + self.assertEqual(-90, semaphore.available_permits()) + self.assertEqual(-90, semaphore.drain_permits()) + semaphore.release(10) + self.assertEqual(10, semaphore.available_permits()) + self.assertEqual(10, semaphore.drain_permits()) + + def test_reduce_permits_on_negative_permits_counter_session_aware(self): + semaphore = self.get_semaphore("sessionaware", 10) + semaphore.reduce_permits(15) + self.assertEqual(-5, semaphore.available_permits()) + + def test_reduce_permits_on_negative_permits_counter_juc_session_aware(self): + semaphore = self.get_semaphore("sessionaware", 0) + semaphore.reduce_permits(100) + self.assertEqual(-100, semaphore.available_permits()) + self.assertEqual(-100, semaphore.drain_permits()) + + @parameterized.expand(SEMAPHORE_TYPES) + def test_increase_permits(self, semaphore_type): + semaphore = self.get_semaphore(semaphore_type, 10) + self.assertEqual(10, semaphore.available_permits()) + self.assertIsNone(semaphore.increase_permits(100)) + self.assertEqual(110, semaphore.available_permits()) + self.assertIsNone(semaphore.increase_permits(0)) + self.assertEqual(110, semaphore.available_permits()) + + @parameterized.expand(SEMAPHORE_TYPES) + def test_release(self, semaphore_type): + semaphore = self.get_semaphore(semaphore_type, 2) + semaphore.acquire(2) + self.assertIsNone(semaphore.release(2)) + self.assertEqual(2, semaphore.available_permits()) + + def test_release_when_acquired_by_another_client_sessionless(self): + semaphore = self.get_semaphore("sessionless") + another_client = HazelcastClient(cluster_name=self.cluster.id) + another_semaphore = another_client.cp_subsystem.get_semaphore(semaphore._proxy_name).blocking() + self.assertTrue(another_semaphore.init(1)) + another_semaphore.acquire() + + try: + semaphore.release(1) 
+ self.assertEqual(1, semaphore.available_permits()) + finally: + another_client.shutdown() + + def test_release_when_not_acquired_session_aware(self): + semaphore = self.get_semaphore("sessionaware", 3) + semaphore.acquire(1) + + with self.assertRaises(IllegalStateError): + semaphore.release(2) + + def test_release_when_there_is_no_session_session_aware(self): + semaphore = self.get_semaphore("sessionaware", 3) + + with self.assertRaises(IllegalStateError): + semaphore.release() + + @parameterized.expand(SEMAPHORE_TYPES) + def test_try_acquire(self, semaphore_type): + semaphore = self.get_semaphore(semaphore_type, 5) + self.assertTrue(semaphore.try_acquire()) + self.assertEqual(4, semaphore.available_permits()) + + @parameterized.expand(SEMAPHORE_TYPES) + def test_try_acquire_with_given_permits(self, semaphore_type): + semaphore = self.get_semaphore(semaphore_type, 5) + self.assertTrue(semaphore.try_acquire(3)) + self.assertEqual(2, semaphore.available_permits()) + + @parameterized.expand(SEMAPHORE_TYPES) + def test_try_acquire_when_not_enough_permits(self, semaphore_type): + semaphore = self.get_semaphore(semaphore_type, 1) + self.assertFalse(semaphore.try_acquire(2)) + self.assertEqual(1, semaphore.available_permits()) + + @parameterized.expand(SEMAPHORE_TYPES) + def test_try_acquire_when_not_enough_permits_with_timeout(self, semaphore_type): + semaphore = self.get_semaphore(semaphore_type, 1) + start = time.time() + self.assertFalse(semaphore.try_acquire(2, 1)) + self.assertGreaterEqual(time.time() - start, 1) + self.assertEqual(1, semaphore.available_permits()) + + def get_semaphore(self, semaphore_type, initialize_with=None): + semaphore = self.client.cp_subsystem.get_semaphore(semaphore_type + random_string()).blocking() + if initialize_with is not None: + semaphore.init(initialize_with) + self.semaphore = semaphore + return semaphore + + +class SemaphoreIllegalArgumentTest(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.group_id = 
RaftGroupId("test", 0, 42) + + @parameterized.expand(SEMAPHORE_TYPES) + def test_init_with_negative(self, semaphore_type): + semaphore = self.get_semaphore(semaphore_type) + + with self.assertRaises(AssertionError): + semaphore.init(-1) + + @parameterized.expand(SEMAPHORE_TYPES) + def test_acquire_with_zero(self, semaphore_type): + semaphore = self.get_semaphore(semaphore_type) + + with self.assertRaises(AssertionError): + semaphore.acquire(0) + + @parameterized.expand(SEMAPHORE_TYPES) + def test_acquire_with_negative(self, semaphore_type): + semaphore = self.get_semaphore(semaphore_type) + + with self.assertRaises(AssertionError): + semaphore.acquire(-100) + + @parameterized.expand(SEMAPHORE_TYPES) + def test_reduce_permits_with_negative(self, semaphore_type): + semaphore = self.get_semaphore(semaphore_type) + + with self.assertRaises(AssertionError): + semaphore.reduce_permits(-11) + + @parameterized.expand(SEMAPHORE_TYPES) + def test_increase_permits_with_negative(self, semaphore_type): + semaphore = self.get_semaphore(semaphore_type) + + with self.assertRaises(AssertionError): + semaphore.increase_permits(-11) + + @parameterized.expand(SEMAPHORE_TYPES) + def test_release_with_zero(self, semaphore_type): + semaphore = self.get_semaphore(semaphore_type) + + with self.assertRaises(AssertionError): + semaphore.release(0) + + @parameterized.expand(SEMAPHORE_TYPES) + def test_release_with_negative(self, semaphore_type): + semaphore = self.get_semaphore(semaphore_type) + + with self.assertRaises(AssertionError): + semaphore.release(-5) + + @parameterized.expand(SEMAPHORE_TYPES) + def test_try_acquire_with_zero(self, semaphore_type): + semaphore = self.get_semaphore(semaphore_type) + + with self.assertRaises(AssertionError): + semaphore.try_acquire(0) + + @parameterized.expand(SEMAPHORE_TYPES) + def test_try_acquire_with_negative(self, semaphore_type): + semaphore = self.get_semaphore(semaphore_type) + + with self.assertRaises(AssertionError): + 
semaphore.try_acquire(-112) + + def get_semaphore(self, semaphore_type): + context = MagicMock() + proxy_name = "semaphore@mygroup" + object_name = "semaphore" + if semaphore_type == "sessionless": + return SessionlessSemaphore(context, self.group_id, SEMAPHORE_SERVICE, proxy_name, object_name) + elif semaphore_type == "sessionaware": + return SessionAwareSemaphore(context, self.group_id, SEMAPHORE_SERVICE, proxy_name, object_name) + else: + self.fail("Unknown semaphore type") + + +DRAIN_SESSION_ACQ_COUNT = 1024 + + +class SessionAwareSemaphoreMockTest(unittest.TestCase): + def setUp(self): + self.acquire_session = MagicMock() + self.release_session = MagicMock() + self.invalidate_session = MagicMock() + self.session_manager = MagicMock(acquire_session=self.acquire_session, release_session=self.release_session, + invalidate_session=self.invalidate_session) + context = MagicMock(proxy_session_manager=self.session_manager) + self.group_id = RaftGroupId("test", 0, 42) + self.semaphore = SessionAwareSemaphore(context, self.group_id, SEMAPHORE_SERVICE, "semaphore@mygroup", + "semaphore").blocking() + + def test_acquire(self): + # Everything works + self.prepare_acquire_session(1) + self.mock_request_acquire(True) + self.assertIsNone(self.semaphore.acquire()) + self.assert_call_counts(1, 0, 0) + self.assert_acquire_count(1) + + def test_acquire_when_acquire_session_fails(self): + # First call to acquire session fails, should not retry + self.prepare_acquire_session(-1, HazelcastRuntimeError()) + + with self.assertRaises(HazelcastRuntimeError): + self.semaphore.acquire() + + self.assert_call_counts(1, 0, 0) + self.assert_acquire_count(1) + + def test_acquire_on_session_expired_error(self): + # Session expired error comes from the server on acquire request, + # retries and succeeds + self.prepare_acquire_session(1) + self.mock_request_acquire(True, SessionExpiredError()) + self.assertIsNone(self.semaphore.acquire()) + self.assert_call_counts(2, 0, 1) + 
self.assert_acquire_count(1) + + def test_acquire_on_wait_key_cancelled_error(self): + # Wait key cancelled error comes from the server, should not retry + self.prepare_acquire_session(12) + self.mock_request_acquire(True, WaitKeyCancelledError()) + + with self.assertRaises(IllegalStateError): + self.semaphore.acquire() + + self.assert_call_counts(1, 1, 0) + self.assert_acquire_count(1) + self.assert_release_count(12, 1) + + def test_acquire_on_unspecified_error(self): + # Server sends another error, should not retry + self.prepare_acquire_session(123) + self.mock_request_acquire(False, HazelcastRuntimeError()) + + with self.assertRaises(HazelcastRuntimeError): + self.semaphore.acquire(15) + + self.assert_call_counts(1, 1, 0) + self.assert_acquire_count(15) + self.assert_release_count(123, 15) + + def test_drain(self): + # Everything works + self.prepare_acquire_session(42) + self.mock_request_drain(10) + self.assertEqual(10, self.semaphore.drain_permits()) + self.assert_call_counts(1, 1, 0) + self.assert_acquire_count(DRAIN_SESSION_ACQ_COUNT) + self.assert_release_count(42, DRAIN_SESSION_ACQ_COUNT - 10) + + def test_drain_when_acquire_session_fails(self): + # First call to acquire session fails, should not retry + self.prepare_acquire_session(-1, HazelcastRuntimeError()) + + with self.assertRaises(HazelcastRuntimeError): + self.semaphore.drain_permits() + + self.assert_call_counts(1, 0, 0) + self.assert_acquire_count(DRAIN_SESSION_ACQ_COUNT) + + def test_drain_on_session_expired_error(self): + # Session expired error comes from the server on drain request, + # retries and succeeds + self.prepare_acquire_session(99) + self.mock_request_drain(101, SessionExpiredError()) + self.assertEqual(101, self.semaphore.drain_permits()) + self.assert_call_counts(2, 1, 1) + self.assert_acquire_count(DRAIN_SESSION_ACQ_COUNT) + self.assert_release_count(99, DRAIN_SESSION_ACQ_COUNT - 101) + + def test_drain_on_unspecified_error(self): + # Server sends another error, should not 
retry + self.prepare_acquire_session(123) + self.mock_request_drain(False, HazelcastRuntimeError()) + + with self.assertRaises(HazelcastRuntimeError): + self.semaphore.drain_permits() + + self.assert_call_counts(1, 1, 0) + self.assert_acquire_count(DRAIN_SESSION_ACQ_COUNT) + self.assert_release_count(123, DRAIN_SESSION_ACQ_COUNT) + + def test_reduce_permits(self): + # Everything works + self.prepare_acquire_session(42) + self.mock_request_change() + self.assertIsNone(self.semaphore.reduce_permits(15)) + self.assert_call_counts(1, 1, 0) + self.assert_acquire_count(1) + self.assert_release_count(42, 1) + + def test_reduce_permits_when_acquire_session_fails(self): + # First call to acquire session fails, should not retry + self.prepare_acquire_session(-1, HazelcastRuntimeError()) + + with self.assertRaises(HazelcastRuntimeError): + self.semaphore.reduce_permits(12) + + self.assert_call_counts(1, 0, 0) + self.assert_acquire_count(1) + + def test_reduce_permits_on_session_expired_error(self): + # Session expired error comes from the server on change request + self.prepare_acquire_session(99) + self.mock_request_change(SessionExpiredError()) + + with self.assertRaises(IllegalStateError): + self.semaphore.reduce_permits(123) + + self.assert_call_counts(1, 1, 1) + # Session will be invalidated before released, so release call is actually a no-op + self.assert_acquire_count(1) + self.assert_release_count(99, 1) + + def test_reduce_permits_on_unspecified_error(self): + # Server sends another error + self.prepare_acquire_session(1123) + self.mock_request_change(HazelcastRuntimeError()) + + with self.assertRaises(HazelcastRuntimeError): + self.semaphore.reduce_permits(54) + + self.assert_call_counts(1, 1, 0) + self.assert_acquire_count(1) + self.assert_release_count(1123, 1) + + def test_increase_permits(self): + # Everything works + self.prepare_acquire_session(42) + self.mock_request_change() + self.assertIsNone(self.semaphore.increase_permits(15)) + 
self.assert_call_counts(1, 1, 0) + self.assert_acquire_count(1) + self.assert_release_count(42, 1) + + def test_increase_permits_when_acquire_session_fails(self): + # First call to acquire session fails, should not retry + self.prepare_acquire_session(-1, HazelcastRuntimeError()) + + with self.assertRaises(HazelcastRuntimeError): + self.semaphore.increase_permits(12) + + self.assert_call_counts(1, 0, 0) + self.assert_acquire_count(1) + + def test_increase_permits_on_session_expired_error(self): + # Session expired error comes from the server on change request + self.prepare_acquire_session(99) + self.mock_request_change(SessionExpiredError()) + + with self.assertRaises(IllegalStateError): + self.semaphore.increase_permits(123) + + self.assert_call_counts(1, 1, 1) + # Session will be invalidated before released, so release call is actually a no-op + self.assert_acquire_count(1) + self.assert_release_count(99, 1) + + def test_increase_permits_on_unspecified_error(self): + # Server sends another error + self.prepare_acquire_session(1123) + self.mock_request_change(HazelcastRuntimeError()) + + with self.assertRaises(HazelcastRuntimeError): + self.semaphore.increase_permits(54) + + self.assert_call_counts(1, 1, 0) + self.assert_acquire_count(1) + self.assert_release_count(1123, 1) + + def test_release(self): + # Everything works + self.prepare_get_session(42) + self.mock_request_release() + self.assertIsNone(self.semaphore.release(15)) + self.assert_call_counts(0, 1, 0) + self.assert_release_count(42, 15) + + def test_release_no_session(self): + # No session found for the release request on the client. 
+ self.prepare_get_session(-1) + + with self.assertRaises(IllegalStateError): + self.semaphore.release() + + self.assert_call_counts(0, 0, 0) + + def test_release_on_session_expired_error(self): + # Session expired error comes from the server on release request, + self.prepare_get_session(99) + self.mock_request_release(SessionExpiredError()) + + with self.assertRaises(IllegalStateError): + self.semaphore.release(123) + + self.assert_call_counts(0, 1, 1) + # Session will be invalidated before released, so release call is actually a no-op + self.assert_release_count(99, 123) + + def test_release_on_unspecified_error(self): + # Server sends another error + self.prepare_get_session(1123) + self.mock_request_release(HazelcastRuntimeError()) + + with self.assertRaises(HazelcastRuntimeError): + self.semaphore.release(54) + + self.assert_call_counts(0, 1, 0) + self.assert_release_count(1123, 54) + + def test_try_acquire(self): + # Everything works + self.prepare_acquire_session(1) + self.mock_request_acquire(True) + self.assertTrue(self.semaphore.try_acquire(15)) + self.assert_call_counts(1, 0, 0) + self.assert_acquire_count(15) + + def test_try_acquire_when_acquire_session_fails(self): + # First call to acquire session fails, should not retry + self.prepare_acquire_session(-1, HazelcastRuntimeError()) + + with self.assertRaises(HazelcastRuntimeError): + self.semaphore.try_acquire() + + self.assert_call_counts(1, 0, 0) + self.assert_acquire_count(1) + + def test_try_acquire_on_session_expired_error(self): + # Session expired error comes from the server on acquire request, + # determines the timeout + self.prepare_acquire_session(1) + self.mock_request_acquire(True, SessionExpiredError()) + self.assertFalse(self.semaphore.try_acquire()) + self.assert_call_counts(1, 0, 1) + self.assert_acquire_count(1) + + def test_try_acquire_on_session_expired_error_when_not_timed_out(self): + # Session expired error comes from the server on acquire request, + # retries and succeeds + 
self.prepare_acquire_session(123) + self.mock_request_acquire(True, SessionExpiredError()) + self.assertTrue(self.semaphore.try_acquire(15, 3)) + self.assert_call_counts(2, 0, 1) + self.assert_acquire_count(15) + + def test_try_acquire_on_wait_key_cancelled_error(self): + # Wait key cancelled error comes from the server, should not retry + self.prepare_acquire_session(12) + self.mock_request_acquire(True, WaitKeyCancelledError()) + self.assertFalse(self.semaphore.try_acquire()) + self.assert_call_counts(1, 1, 0) + self.assert_acquire_count(1) + self.assert_release_count(12, 1) + + def test_try_acquire_on_unspecified_error(self): + # Server sends another error, should not retry + self.prepare_acquire_session(123) + self.mock_request_acquire(False, HazelcastRuntimeError()) + + with self.assertRaises(HazelcastRuntimeError): + self.semaphore.try_acquire() + + self.assert_call_counts(1, 1, 0) + self.assert_acquire_count(1) + self.assert_release_count(123, 1) + + def assert_call_counts(self, acquire, release, invalidate): + self.assertEqual(acquire, self.acquire_session.call_count) + self.assertEqual(release, self.release_session.call_count) + self.assertEqual(invalidate, self.invalidate_session.call_count) + + def assert_acquire_count(self, count): + self.acquire_session.assert_called_with(self.group_id, count) + + def assert_release_count(self, session_id, count): + self.release_session.assert_called_with(self.group_id, session_id, count) + + def prepare_acquire_session(self, session_id, err=None): + if err: + val = ImmediateExceptionFuture(err) + else: + val = ImmediateFuture(session_id) + + acquire_mock = MagicMock(return_value=val) + release_mock = MagicMock() + invalidate_mock = MagicMock() + self.session_manager.acquire_session = acquire_mock + self.session_manager.release_session = release_mock + self.session_manager.invalidate_session = invalidate_mock + self.acquire_session = acquire_mock + self.release_session = release_mock + self.invalidate_session = 
invalidate_mock + + def prepare_get_session(self, session_id): + self.session_manager.get_session_id = MagicMock(return_value=session_id) + + def mock_request_acquire(self, acquired, first_call_err=None): + mock_request(self.semaphore, "_request_acquire", acquired, first_call_err) + + def mock_request_drain(self, count, first_call_err=None): + mock_request(self.semaphore, "_request_drain", count, first_call_err) + + def mock_request_change(self, first_call_err=None): + mock_request(self.semaphore, "_request_change", None, first_call_err) + + def mock_request_release(self, first_call_err=None): + mock_request(self.semaphore, "_request_release", None, first_call_err) + + +class SessionlessSemaphoreProxy(unittest.TestCase): + def setUp(self): + self.session_manager = MagicMock() + self.context = MagicMock(proxy_session_manager=self.session_manager) + self.semaphore = SessionlessSemaphore(self.context, RaftGroupId("name", 0, 42), SEMAPHORE_SERVICE, + "semaphore@mygroup", "semaphore").blocking() + + def test_acquire(self): + # Everything works + self.prepare_thread_id(12) + self.mock_acquire_request(True) + self.assertIsNone(self.semaphore.acquire()) + + def test_acquire_when_get_thread_id_fails(self): + # Client cannot even get the thread id + self.prepare_thread_id(-1, HazelcastRuntimeError()) + + with self.assertRaises(HazelcastRuntimeError): + self.semaphore.acquire() + + def test_acquire_on_wait_key_cancelled_error(self): + # Server sends wait key cancelled error, should not retry + self.prepare_thread_id(23) + self.mock_acquire_request(False, WaitKeyCancelledError()) + + with self.assertRaises(IllegalStateError): + self.semaphore.acquire() + + def test_try_acquire(self): + # Everything works + self.prepare_thread_id(12) + self.mock_acquire_request(True) + self.assertTrue(self.semaphore.try_acquire()) + + def test_try_acquire_when_get_thread_id_fails(self): + # Client cannot even get the thread id + self.prepare_thread_id(-1, HazelcastRuntimeError()) + + with 
self.assertRaises(HazelcastRuntimeError): + self.semaphore.try_acquire() + + def test_try_acquire_on_wait_key_cancelled_error(self): + # Server sends wait key cancelled error, should not retry + self.prepare_thread_id(23) + self.mock_acquire_request(False, WaitKeyCancelledError()) + + with self.assertRaises(IllegalStateError): + self.semaphore.try_acquire() + + def prepare_thread_id(self, thread_id, err=None): + if err: + value = ImmediateExceptionFuture(err) + else: + value = ImmediateFuture(thread_id) + self.session_manager.get_or_create_unique_thread_id = MagicMock(return_value=value) + + def mock_acquire_request(self, acquired, first_call_err=None): + mock_request(self.semaphore, "_request_acquire", acquired, first_call_err) + + +def mock_request(semaphore, method_name, result, first_call_err): + called = AtomicInteger() + + def mock(*_, **__): + if called.get_and_increment() == 0 and first_call_err: + return ImmediateExceptionFuture(first_call_err) + return ImmediateFuture(result) + + setattr(semaphore._wrapped, method_name, MagicMock(side_effect=mock))
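
Note on the `mock_request` helper that closes the test module: it makes only the first call to the patched method fail, which is how the mock tests exercise the retry paths. The sketch below is a minimal, self-contained illustration of that idea and is not the code from this patch. `FakeSemaphore` and `fail_first_call` are hypothetical names, `itertools.count` stands in for `hazelcast.util.AtomicInteger`, and the stub raises the error directly, whereas the patch returns an `ImmediateExceptionFuture` or `ImmediateFuture`.

```python
from itertools import count
from unittest.mock import MagicMock


class FakeSemaphore:
    """Hypothetical stand-in for the proxy under test; not a Hazelcast class."""

    def _request_acquire(self, permits):
        raise NotImplementedError("replaced by the mock in tests")


def fail_first_call(target, method_name, result, first_call_err=None):
    """Patch target.method_name so that only the first call raises first_call_err."""
    calls = count()  # assumption: used here in place of AtomicInteger

    def side_effect(*_args, **_kwargs):
        # Call number 0 fails when an error was supplied; later calls succeed.
        if next(calls) == 0 and first_call_err is not None:
            raise first_call_err
        return result

    mock = MagicMock(side_effect=side_effect)
    setattr(target, method_name, mock)
    return mock


semaphore = FakeSemaphore()
mock = fail_first_call(semaphore, "_request_acquire", True, RuntimeError("session expired"))

try:
    semaphore._request_acquire(1)
    first_outcome = "no error"
except RuntimeError as e:
    first_outcome = str(e)

second_outcome = semaphore._request_acquire(1)
print(first_outcome, second_outcome, mock.call_count)
```

Because the mock records every call, including the failing one, assertions on `call_count` can confirm that a retry happened, in the same spirit as `assert_call_counts` in the session-aware mock tests.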