From aa38c9632b89f0c27ae8e7809b540dc5fec3f8ad Mon Sep 17 00:00:00 2001 From: mdumandag Date: Thu, 1 Oct 2020 18:15:30 +0300 Subject: [PATCH 1/6] Implement CP Fenced Lock Implementation, tests, documentation and code samples for the Fenced Lock is added. Also, the session manager for session aware CP proxies is implemented fully along with its test suit. The unused parts of the session manager will be used with the Semaphore proxy. --- README.md | 66 +- examples/cp/fenced_lock_example.py | 25 + hazelcast/client.py | 16 +- hazelcast/cp.py | 252 ++++++- hazelcast/errors.py | 48 +- hazelcast/future.py | 4 +- hazelcast/listener.py | 2 +- hazelcast/protocol/__init__.py | 3 + hazelcast/proxy/cp/__init__.py | 40 + hazelcast/proxy/cp/fenced_lock.py | 533 ++++++++++++++ hazelcast/proxy/executor.py | 2 +- hazelcast/proxy/map.py | 4 +- hazelcast/util.py | 9 + tests/cp_test.py | 231 ++++++ tests/future_test.py | 11 +- tests/proxy/cp/fenced_lock_test.py | 1099 ++++++++++++++++++++++++++++ 16 files changed, 2320 insertions(+), 25 deletions(-) create mode 100644 examples/cp/fenced_lock_example.py create mode 100644 hazelcast/proxy/cp/fenced_lock.py create mode 100644 tests/cp_test.py create mode 100644 tests/proxy/cp/fenced_lock_test.py diff --git a/README.md b/README.md index b36b1e6c98..ad21f0ff3e 100644 --- a/README.md +++ b/README.md @@ -57,8 +57,9 @@ * [7.4.11.1. Configuring Flake ID Generator](#74111-configuring-flake-id-generator) * [7.4.12. CP Subsystem](#7412-cp-subsystem) * [7.4.12.1. Using AtomicLong](#74121-using-atomiclong) - * [7.4.12.2. Using CountDownLatch](#74122-using-countdownlatch) - * [7.4.12.3. Using AtomicReference](#74123-using-atomicreference) + * [7.4.12.2. Using Lock](#74122-using-lock) + * [7.4.12.3. Using CountDownLatch](#74123-using-countdownlatch) + * [7.4.12.4. Using AtomicReference](#74124-using-atomicreference) * [7.5. Distributed Events](#75-distributed-events) * [7.5.1. Cluster Events](#751-cluster-events) * [7.5.1.1. Listening for Member Events](#7511-listening-for-member-events) @@ -1577,7 +1578,7 @@ Refer to [CP Subsystem](https://docs.hazelcast.org/docs/latest/manual/html-singl Data structures in CP Subsystem run in CP groups. Each CP group elects its own Raft leader and runs the Raft consensus algorithm independently. The CP data structures differ from the other Hazelcast data structures in two aspects. First, an internal commit is performed on the METADATA CP group every time you fetch a proxy from this interface. -Hence, callers should cache returned proxy objects. Second, if you call `DistributedObject.destroy()` on a CP data structure proxy, +Hence, callers should cache returned proxy objects. Second, if you call `distributed_object.destroy()` on a CP data structure proxy, that data structure is terminated on the underlying CP group and cannot be reinitialized until the CP group is force-destroyed. For this reason, please make sure that you are completely done with a CP data structure before destroying its proxy. @@ -1610,7 +1611,62 @@ print ('CAS operation result:', result) AtomicLong implementation does not offer exactly-once / effectively-once execution semantics. It goes with at-least-once execution semantics by default and can cause an API call to be committed multiple times in case of CP member failures. It can be tuned to offer at-most-once execution semantics. Please see [`fail-on-indeterminate-operation-state`](https://docs.hazelcast.org/docs/latest/manual/html-single/index.html#cp-subsystem-configuration) server-side setting. -#### 7.4.12.2. 
Using CountDownLatch
+#### 7.4.12.2. Using Lock
+
+Hazelcast `FencedLock` is the distributed and reentrant implementation of a linearizable lock.
+It is CP with respect to the CAP principle. It works on top of the Raft consensus algorithm.
+It offers linearizability during crash-stop failures and network partitions.
+If a network partition occurs, it remains available on at most one side of the partition.
+
+A basic Lock usage example is shown below.
+
+```python
+# Get a FencedLock called "my-lock"
+lock = client.cp_subsystem.get_lock("my-lock").blocking()
+# Acquire the lock
+lock.lock()
+try:
+    # Your guarded code goes here
+    pass
+finally:
+    # Make sure to release the lock
+    lock.unlock()
+```
+
+FencedLock works on top of CP sessions. It keeps a CP session open while the lock is acquired. Please refer to [CP Session](https://docs.hazelcast.org/docs/latest/manual/html-single/index.html#cp-sessions) documentation for more information.
+
+By default, FencedLock is reentrant. Once a caller acquires the lock, it can acquire the lock reentrantly as many times as it wants in a linearizable manner.
+You can configure the reentrancy behavior on the member side. For instance, reentrancy can be disabled and FencedLock can work as a non-reentrant mutex.
+You can also set a custom reentrancy limit. When the reentrancy limit is reached, FencedLock does not block a lock call.
+Instead, it fails with ``LockAcquireLimitReachedError`` or a specified return value.
+
+Distributed locks are unfortunately *not equivalent* to single-node mutexes because of the complexities in distributed systems, such as uncertain communication patterns and independent and partial failures.
+In an asynchronous network, no lock service can guarantee mutual exclusion, because there is no way to distinguish between a slow and a crashed process.
+Consider the following scenario, where a Hazelcast client acquires a FencedLock, then hits a long pause.
+Since it will not be able to commit session heartbeats while paused, its CP session will eventually be closed.
+After this moment, another Hazelcast client can acquire this lock.
+If the first client wakes up again, it may not immediately notice that it has lost ownership of the lock.
+In this case, multiple clients think they hold the lock. If they attempt to perform an operation on a shared resource, they can break the system.
+To prevent such situations, you can choose to use an infinite session timeout, but then you will probably have to deal with liveness issues.
+For the scenario above, even if the first client actually crashes, requests sent by the two clients can be re-ordered in the network and hit the external resource in reverse order.
+
+There is a simple solution for this problem. Lock holders are ordered by a monotonic fencing token, which increments each time the lock is assigned to a new owner.
+This fencing token can be passed to external services or resources to ensure sequential execution of side effects performed by lock holders.
+
+The following diagram illustrates the idea. Client-1 acquires the lock first and receives ``1`` as its fencing token.
+Then, it passes this token to the external service, which is our shared resource in this scenario.
+Just after that, Client-1 hits a long GC pause and eventually loses ownership of the lock because it fails to commit CP session heartbeats.
+Then, Client-2 chimes in and acquires the lock. Similar to Client-1, Client-2 passes its fencing token to the external service.
+After that, once Client-1 comes back alive, its write request will be rejected by the external service, and only Client-2 will be able to safely talk to it. + +![CP Fenced Lock diagram](https://docs.hazelcast.org/docs/latest/manual/html-single/images/FencedLock.png) + +You can read more about the fencing token idea in Martin Kleppmann's "How to do distributed locking" blog post and Google's Chubby paper. + +To get fencing token, one may use ``lock.lock_and_get_fence()`` or ``lock.try_lock_and_get_fence()`` utility methods, or ``lock.get_fence()`` method +while holding the lock. + +#### 7.4.12.3. Using CountDownLatch Hazelcast `CountDownLatch` is the distributed implementation of a linearizable and distributed countdown latch. This data structure is a cluster-wide synchronization aid that allows one or more callers to wait until a set of operations being performed in other callers completes. @@ -1646,7 +1702,7 @@ print("Count is zero:", count_is_zero) > **NOTE: CountDownLatch count can be reset with `try_set_count()` after a countdown has finished, but not during an active count.** -#### 7.4.12.3. Using AtomicReference +#### 7.4.12.4. Using AtomicReference Hazelcast `AtomicReference` is the distributed implementation of a linearizable object reference. It provides a set of atomic operations allowing to modify the value behind the reference. diff --git a/examples/cp/fenced_lock_example.py b/examples/cp/fenced_lock_example.py new file mode 100644 index 0000000000..9152006822 --- /dev/null +++ b/examples/cp/fenced_lock_example.py @@ -0,0 +1,25 @@ +import hazelcast + +client = hazelcast.HazelcastClient() + +lock = client.cp_subsystem.get_lock("my-lock").blocking() + +locked = lock.is_locked() +print("Locked initially:", locked) + +fence = lock.lock_and_get_fence() +print("Fence token:", fence) +try: + locked = lock.is_locked() + print("Locked after lock:", locked) + + locked = lock.try_lock() + print("Locked reentratly:", locked) + + # more guarded code +finally: + # unlock must be called for each successful lock request + lock.unlock() + lock.unlock() + +client.shutdown() diff --git a/hazelcast/client.py b/hazelcast/client.py index f30ec309e7..86fd9b664f 100644 --- a/hazelcast/client.py +++ b/hazelcast/client.py @@ -7,7 +7,7 @@ from hazelcast.config import _Config from hazelcast.connection import ConnectionManager, DefaultAddressProvider from hazelcast.core import DistributedObjectInfo, DistributedObjectEvent -from hazelcast.cp import CPSubsystem +from hazelcast.cp import CPSubsystem, ProxySessionManager from hazelcast.invocation import InvocationService, Invocation from hazelcast.listener import ListenerService, ClusterViewListenerService from hazelcast.lifecycle import LifecycleService, LifecycleState, _InternalLifecycleService @@ -331,6 +331,7 @@ def __init__(self, **kwargs): self._logger_extras) self._proxy_manager = ProxyManager(self._context) self.cp_subsystem = CPSubsystem(self._context) + self._proxy_session_manager = ProxySessionManager(self._context) self._transaction_manager = TransactionManager(self._context, self._logger_extras) self._lock_reference_id_generator = AtomicInteger(1) self._statistics = Statistics(self, self._reactor, self._connection_manager, @@ -348,7 +349,8 @@ def _init_context(self): self._context.init_context(self.config, self._invocation_service, self._internal_partition_service, self._internal_cluster_service, self._connection_manager, self._serialization_service, self._listener_service, self._proxy_manager, - self._near_cache_manager, 
self._lock_reference_id_generator, self._logger_extras) + self._near_cache_manager, self._lock_reference_id_generator, self._logger_extras, + self.name, self._proxy_session_manager, self._reactor) def _start(self): self._reactor.start() @@ -597,6 +599,7 @@ def shutdown(self): if self._internal_lifecycle_service.running: self._internal_lifecycle_service.fire_lifecycle_event(LifecycleState.SHUTTING_DOWN) self._internal_lifecycle_service.shutdown() + self._proxy_session_manager.shutdown().result() self._near_cache_manager.destroy_near_caches() self._connection_manager.shutdown() self._invocation_service.shutdown() @@ -666,11 +669,15 @@ def __init__(self): self.near_cache_manager = None self.lock_reference_id_generator = None self.logger_extras = None + self.name = None + self.proxy_session_manager = None + self.reactor = None def init_context(self, config, invocation_service, partition_service, cluster_service, connection_manager, serialization_service, listener_service, proxy_manager, near_cache_manager, - lock_reference_id_generator, logger_extras): + lock_reference_id_generator, logger_extras, name, + proxy_session_manager, reactor): self.config = config self.invocation_service = invocation_service self.partition_service = partition_service @@ -682,3 +689,6 @@ def init_context(self, config, invocation_service, partition_service, self.near_cache_manager = near_cache_manager self.lock_reference_id_generator = lock_reference_id_generator self.logger_extras = logger_extras + self.name = name + self.proxy_session_manager = proxy_session_manager + self.reactor = reactor diff --git a/hazelcast/cp.py b/hazelcast/cp.py index e84be91d3a..cf6db56cbd 100644 --- a/hazelcast/cp.py +++ b/hazelcast/cp.py @@ -1,9 +1,17 @@ +import time +from threading import RLock + +from hazelcast import six +from hazelcast.errors import SessionExpiredError, CPGroupDestroyedError, HazelcastClientNotActiveError +from hazelcast.future import ImmediateExceptionFuture, ImmediateFuture, combine_futures from hazelcast.invocation import Invocation -from hazelcast.protocol.codec import cp_group_create_cp_group_codec +from hazelcast.protocol.codec import cp_group_create_cp_group_codec, cp_session_heartbeat_session_codec, \ + cp_session_create_session_codec, cp_session_close_session_codec, cp_session_generate_thread_id_codec from hazelcast.proxy.cp.atomic_long import AtomicLong from hazelcast.proxy.cp.atomic_reference import AtomicReference from hazelcast.proxy.cp.count_down_latch import CountDownLatch -from hazelcast.util import check_true +from hazelcast.proxy.cp.fenced_lock import FencedLock +from hazelcast.util import check_true, AtomicInteger, thread_id class CPSubsystem(object): @@ -95,6 +103,26 @@ def get_count_down_latch(self, name): """ return self._proxy_manager.get_or_create(COUNT_DOWN_LATCH_SERVICE, name) + def get_lock(self, name): + """Returns the distributed FencedLock instance instance with given name. + + The instance is created on CP Subsystem. + + If no group name is given within the `name` argument, then the + FencedLock instance will be created on the DEFAULT CP group. + If a group name is given, like ``.get_lock("myLock@group1")``, + the given group will be initialized first, if not initialized + already, and then the instance will be created on this group. + + Args: + name (str): Name of the FencedLock + + Returns: + hazelcast.proxy.cp.fenced_lock.FencedLock: The FencedLock proxy + for the given name. 
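+
+        A minimal usage sketch (the lock name here is illustrative)::
+
+            lock = client.cp_subsystem.get_lock("my-lock").blocking()
+            lock.lock()
+            try:
+                pass  # guarded section
+            finally:
+                lock.unlock()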
+ """ + return self._proxy_manager.get_or_create(LOCK_SERVICE, name) + _DEFAULT_GROUP_NAME = "default" @@ -127,11 +155,13 @@ def _get_object_name_for_proxy(name): ATOMIC_LONG_SERVICE = "hz:raft:atomicLongService" ATOMIC_REFERENCE_SERVICE = "hz:raft:atomicRefService" COUNT_DOWN_LATCH_SERVICE = "hz:raft:countDownLatchService" +LOCK_SERVICE = "hz:raft:lockService" class CPProxyManager(object): def __init__(self, context): self._context = context + self._lock_proxies = dict() # proxy_name to FencedLock def get_or_create(self, service_name, proxy_name): proxy_name = _without_default_group_name(proxy_name) @@ -144,6 +174,24 @@ def get_or_create(self, service_name, proxy_name): return AtomicReference(self._context, group_id, service_name, proxy_name, object_name) elif service_name == COUNT_DOWN_LATCH_SERVICE: return CountDownLatch(self._context, group_id, service_name, proxy_name, object_name) + elif service_name == LOCK_SERVICE: + return self._create_fenced_lock(group_id, proxy_name, object_name) + + def _create_fenced_lock(self, group_id, proxy_name, object_name): + while True: + proxy = self._lock_proxies.get(proxy_name, None) + if proxy: + if proxy.get_group_id() != group_id: + self._lock_proxies.pop(proxy_name, None) + else: + return proxy + + proxy = FencedLock(self._context, group_id, LOCK_SERVICE, proxy_name, object_name) + existing = self._lock_proxies.setdefault(proxy_name, proxy) + if existing.get_group_id() == proxy.get_group_id(): + return existing + + group_id = self._get_group_id(proxy_name) def _get_group_id(self, proxy_name): codec = cp_group_create_cp_group_codec @@ -152,3 +200,203 @@ def _get_group_id(self, proxy_name): invocation_service = self._context.invocation_service invocation_service.invoke(invocation) return invocation.future.result() + + +class _SessionState(object): + __slots__ = ("id", "group_id", "ttl", "creation_time", "acquire_count") + + def __init__(self, state_id, group_id, ttl): + self.id = state_id + self.ttl = ttl + self.group_id = group_id + self.creation_time = time.time() + self.acquire_count = AtomicInteger() + + def acquire(self, count): + self.acquire_count.add(count) + return self.id + + def release(self, count): + self.acquire_count.add(-count) + + def is_valid(self): + return self.is_in_use() or not self._is_expired(time.time()) + + def is_in_use(self): + return self.acquire_count.get() > 0 + + def _is_expired(self, timestamp): + expiration_time = self.creation_time + self.ttl + if expiration_time < 0: + expiration_time = six.MAXSIZE + return timestamp > expiration_time + + def __eq__(self, other): + return isinstance(other, _SessionState) and self.id == other.id + + def __ne__(self, other): + return not self.__eq__(other) + + +_NO_SESSION_ID = -1 + + +class ProxySessionManager(object): + def __init__(self, context): + self._context = context + self._mutexes = dict() # RaftGroupId to RLock + self._sessions = dict() # RaftGroupId to SessionState + self._thread_ids = dict() # (RaftGroupId, thread_id) to global thread id + self._heartbeat_timer = None + self._shutdown = False + self._lock = RLock() + + def get_session_id(self, group_id): + session = self._sessions.get(group_id, None) + if session: + return session.id + return _NO_SESSION_ID + + def acquire_session(self, group_id, count): + return self._get_or_crate_session(group_id).continue_with(lambda state: state.result().acquire(count)) + + def release_session(self, group_id, session_id, count): + session = self._sessions.get(group_id, None) + if session and session.id == session_id: + 
session.release(count) + + def invalidate_session(self, group_id, session_id): + session = self._sessions.get(group_id, None) + if session and session.id == session_id: + self._sessions.pop(group_id, None) + + def get_or_create_unique_thread_id(self, group_id): + with self._lock: + if self._shutdown: + error = HazelcastClientNotActiveError("Session manager is already shut down!") + return ImmediateExceptionFuture(error) + + key = (group_id, thread_id()) + global_thread_id = self._thread_ids.get(key) + if global_thread_id: + return ImmediateFuture(global_thread_id) + + return self._request_generate_thread_id(group_id).continue_with( + lambda t_id: self._thread_ids.setdefault(key, t_id.result())) + + def shutdown(self): + with self._lock: + if self._shutdown: + return ImmediateFuture(None) + + self._shutdown = True + if self._heartbeat_timer: + self._heartbeat_timer.cancel() + + futures = [] + for session in list(self._sessions.values()): + future = self._request_close_session(session.group_id, session.id) + futures.append(future) + + def clear(_): + self._sessions.clear() + self._mutexes.clear() + self._thread_ids.clear() + + return combine_futures(futures).continue_with(clear) + + def _request_generate_thread_id(self, group_id): + codec = cp_session_generate_thread_id_codec + request = codec.encode_request(group_id) + invocation = Invocation(request, response_handler=codec.decode_response) + self._context.invocation_service.invoke(invocation) + return invocation.future + + def _request_close_session(self, group_id, session_id): + codec = cp_session_close_session_codec + request = codec.encode_request(group_id, session_id) + invocation = Invocation(request, response_handler=codec.decode_response) + self._context.invocation_service.invoke(invocation) + return invocation.future + + def _get_or_crate_session(self, group_id): + with self._lock: + if self._shutdown: + error = HazelcastClientNotActiveError("Session manager is already shut down!") + return ImmediateExceptionFuture(error) + + session = self._sessions.get(group_id, None) + if session is None or not session.is_valid(): + with self._mutex(group_id): + session = self._sessions.get(group_id) + if session is None or not session.is_valid(): + return self._create_new_session(group_id) + return ImmediateFuture(session) + + def _create_new_session(self, group_id): + f = self._request_new_session(group_id) + return f.continue_with(self._do_create_new_session, group_id) + + def _do_create_new_session(self, response, group_id): + # called from the reactor thread only + response = response.result() + session = _SessionState(response["session_id"], group_id, response["ttl_millis"] / 1000.0) + self._sessions[group_id] = session + self._start_heartbeat_timer(response["heartbeat_millis"] / 1000.0) + return session + + def _request_new_session(self, group_id): + codec = cp_session_create_session_codec + request = codec.encode_request(group_id, self._context.name) + invocation = Invocation(request, response_handler=codec.decode_response) + self._context.invocation_service.invoke(invocation) + return invocation.future + + def _mutex(self, group_id): + mutex = self._mutexes.get(group_id, None) + if mutex: + return mutex + + mutex = RLock() + current = self._mutexes.setdefault(group_id, mutex) + return current + + def _start_heartbeat_timer(self, period): + if self._heartbeat_timer is not None: + return + + def heartbeat(): + if self._shutdown: + return + + for session in list(self._sessions.values()): + if session.is_in_use(): + + def 
cb(heartbeat_future): + if heartbeat_future.is_success(): + return + + error = heartbeat_future.exception() + if isinstance(error, (SessionExpiredError, CPGroupDestroyedError)): + self._invalidate_session(session.group_id, session.id) + + f = self._request_heartbeat(session.group_id, session.id) + f.add_done_callback(cb) + + r = self._context.reactor + self._heartbeat_timer = r.add_timer(period, heartbeat) + + reactor = self._context.reactor + self._heartbeat_timer = reactor.add_timer(period, heartbeat) + + def _request_heartbeat(self, group_id, session_id): + codec = cp_session_heartbeat_session_codec + request = codec.encode_request(group_id, session_id) + invocation = Invocation(request) + self._context.invocation_service.invoke(invocation) + return invocation.future + + def _invalidate_session(self, group_id, session_id): + session = self._sessions.get(group_id, None) + if session and session.id == session_id: + self._sessions.pop(group_id, None) diff --git a/hazelcast/errors.py b/hazelcast/errors.py index 6f9de0ac44..24d742150f 100644 --- a/hazelcast/errors.py +++ b/hazelcast/errors.py @@ -353,7 +353,7 @@ class ConsistencyLostError(HazelcastError): pass -class HazelcastClientNotActiveError(ValueError): +class HazelcastClientNotActiveError(HazelcastError): def __init__(self, message="Client is not active"): super(HazelcastClientNotActiveError, self).__init__(message) @@ -399,6 +399,43 @@ class UndefinedErrorCodeError(HazelcastError): pass +class SessionExpiredError(HazelcastError): + pass + + +class WaitKeyCancelledError(HazelcastError): + pass + + +class LockAcquireLimitReachedError(HazelcastError): + pass + + +class LockOwnershipLostError(HazelcastError): + pass + + +class CPGroupDestroyedError(HazelcastError): + pass + + +@retryable +class CannotReplicateError(HazelcastError): + pass + + +class LeaderDemotedError(HazelcastError): + pass + + +class StaleAppendRequestError(HazelcastError): + pass + + +class NotLeaderError(HazelcastError): + pass + + # Error Codes _UNDEFINED = 0 _ARRAY_INDEX_OUT_OF_BOUNDS = 1 @@ -582,6 +619,15 @@ class UndefinedErrorCodeError(HazelcastError): _TARGET_NOT_REPLICA_EXCEPTION: TargetNotReplicaError, _MUTATION_DISALLOWED_EXCEPTION: MutationDisallowedError, _CONSISTENCY_LOST_EXCEPTION: ConsistencyLostError, + _SESSION_EXPIRED_EXCEPTION: SessionExpiredError, + _WAIT_KEY_CANCELLED_EXCEPTION: WaitKeyCancelledError, + _LOCK_ACQUIRE_LIMIT_REACHED_EXCEPTION: LockAcquireLimitReachedError, + _LOCK_OWNERSHIP_LOST_EXCEPTION: LockOwnershipLostError, + _CP_GROUP_DESTROYED_EXCEPTION: CPGroupDestroyedError, + _CANNOT_REPLICATE_EXCEPTION: CannotReplicateError, + _LEADER_DEMOTED_EXCEPTION: LeaderDemotedError, + _STALE_APPEND_REQUEST_EXCEPTION: StaleAppendRequestError, + _NOT_LEADER_EXCEPTION: NotLeaderError, _VERSION_MISMATCH_EXCEPTION: VersionMismatchError, _NO_SUCH_METHOD_ERROR: NoSuchMethodError, _NO_SUCH_METHOD_EXCEPTION: NoSuchMethodException, diff --git a/hazelcast/future.py b/hazelcast/future.py index 7c72c3f698..5cb25fcbca 100644 --- a/hazelcast/future.py +++ b/hazelcast/future.py @@ -249,11 +249,11 @@ def add_done_callback(self, callback): self._invoke_cb(callback) -def combine_futures(*futures): +def combine_futures(futures): """Combines set of Futures. Args: - *futures (Future): Futures to be combined. + futures (list[Future]): List of Futures to be combined. Returns: Future: Result of the combination. 
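The following is a minimal sketch of the new list-based `combine_futures` call style introduced above; the futures and values are illustrative:

```python
from hazelcast.future import Future, combine_futures

f1, f2, f3 = Future(), Future(), Future()
# combine_futures now takes a single list argument instead of varargs
combined = combine_futures([f1, f2, f3])

f1.set_result("a")
f2.set_result("b")
f3.set_result("c")
# completes once all inputs complete, with a list of the individual results
print(combined.result())
```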
diff --git a/hazelcast/listener.py b/hazelcast/listener.py index b219e38599..08bc7a79c8 100644 --- a/hazelcast/listener.py +++ b/hazelcast/listener.py @@ -59,7 +59,7 @@ def register_listener(self, registration_request, decode_register_response, enco futures.append(future) try: - combine_futures(*futures).result() + combine_futures(futures).result() except: self.deregister_listener(registration_id) raise HazelcastError("Listener cannot be added") diff --git a/hazelcast/protocol/__init__.py b/hazelcast/protocol/__init__.py index 25f20308b4..cc31e4f8c9 100644 --- a/hazelcast/protocol/__init__.py +++ b/hazelcast/protocol/__init__.py @@ -60,3 +60,6 @@ def __ne__(self, other): def __hash__(self): return hash((self.name, self.seed, self.id)) + + def __repr__(self): + return "RaftGroupId(name=%s, seed=%s, id=%s)" % (self.name, self.seed, self.id) diff --git a/hazelcast/proxy/cp/__init__.py b/hazelcast/proxy/cp/__init__.py index 68b665716d..20d11e3978 100644 --- a/hazelcast/proxy/cp/__init__.py +++ b/hazelcast/proxy/cp/__init__.py @@ -33,3 +33,43 @@ def _invoke(self, request, response_handler=_no_op_response_handler): invocation = Invocation(request, response_handler=response_handler) self._invocation_service.invoke(invocation) return invocation.future + + +class SessionAwareCPProxy(BaseCPProxy): + def __init__(self, context, group_id, service_name, proxy_name, object_name): + super(SessionAwareCPProxy, self).__init__(context, group_id, service_name, proxy_name, object_name) + self._session_manager = context.proxy_session_manager + + def get_group_id(self): + """ + Returns: + hazelcast.protocol.RaftGroupId: Id of the CP group that runs this proxy. + """ + return self._group_id + + def _get_or_create_unique_thread_id(self): + """ + Returns: + hazelcast.future.Future[int]: Cluster-wide unique thread id. + """ + return self._session_manager.get_or_create_unique_thread_id(self._group_id) + + def _get_session_id(self): + """ + Returns: + int: Session id. + """ + return self._session_manager.get_session_id(self._group_id) + + def _acquire_session(self, count=1): + """ + Returns: + hazelcast.future.Future[int]: Session id. + """ + return self._session_manager.acquire_session(self._group_id, count) + + def _release_session(self, session_id, count=1): + self._session_manager.release_session(self._group_id, session_id, count) + + def _invalidate_session(self, session_id): + self._session_manager.invalidate_session(self._group_id, session_id) diff --git a/hazelcast/proxy/cp/fenced_lock.py b/hazelcast/proxy/cp/fenced_lock.py new file mode 100644 index 0000000000..867b71b470 --- /dev/null +++ b/hazelcast/proxy/cp/fenced_lock.py @@ -0,0 +1,533 @@ +import time +import uuid + +from hazelcast.errors import LockOwnershipLostError, LockAcquireLimitReachedError, SessionExpiredError, \ + WaitKeyCancelledError, IllegalMonitorStateError +from hazelcast.future import ImmediateExceptionFuture +from hazelcast.protocol.codec import fenced_lock_lock_codec, fenced_lock_try_lock_codec, fenced_lock_unlock_codec, \ + fenced_lock_get_lock_ownership_codec +from hazelcast.proxy.cp import SessionAwareCPProxy +from hazelcast.util import thread_id, to_millis + +_NO_SESSION_ID = -1 + + +class FencedLock(SessionAwareCPProxy): + """A linearizable, distributed lock. + + FencedLock is CP with respect to the CAP principle. It works on top + of the Raft consensus algorithm. It offers linearizability during crash-stop + failures and network partitions. If a network partition occurs, it remains + available on at most one side of the partition. 
+ + FencedLock works on top of CP sessions. Please refer to CP Session + IMDG documentation section for more information. + + By default, {@link FencedLock} is reentrant. Once a caller acquires + the lock, it can acquire the lock reentrantly as many times as it wants + in a linearizable manner. You can configure the reentrancy behaviour + on the member side. For instance, reentrancy can be disabled and + FencedLock can work as a non-reentrant mutex. One can also set + a custom reentrancy limit. When the reentrancy limit is reached, + FencedLock does not block a lock call. Instead, it fails with + ``LockAcquireLimitReachedException`` or a specified return value. + Please check the locking methods to see details about the behaviour. + """ + + INVALID_FENCE = 0 + + def __init__(self, context, group_id, service_name, proxy_name, object_name): + super(FencedLock, self).__init__(context, group_id, service_name, proxy_name, object_name) + self._lock_session_ids = dict() # thread-id to session id that has acquired the lock + + def lock(self): + """Acquires the lock. + + When the caller already holds the lock and the current ``lock()`` call is + reentrant, the call can fail with ``LockAcquireLimitReachedException`` + if the lock acquire limit is already reached. + + If the lock is not available then the current thread becomes disabled + for thread scheduling purposes and lies dormant until the lock has been + acquired. + + Returns: + hazelcast.future.Future[None]: + + Raises: + LockOwnershipLostError: If the underlying CP session was + closed before the client releases the lock + LockAcquireLimitReachedException: If the lock call is reentrant + and the configured lock acquire limit is already reached. + """ + def handler(f): + f.result() + return None + + return self.lock_and_get_fence().continue_with(handler) + + def lock_and_get_fence(self): + """Acquires the lock and returns the fencing token assigned to the current + thread for this lock acquire. + + If the lock is acquired reentrantly, the same fencing token is returned, + or the ``lock()`` call can fail with ``LockAcquireLimitReachedException`` + if the lock acquire limit is already reached. + + If the lock is not available then the current thread becomes disabled + for thread scheduling purposes and lies dormant until the lock has been + acquired. + + This is a convenience method for the following pattern :: + + lock = client.cp_subsystem.get_lock("lock").blocking() + lock.lock() + fence = lock.get_fence() + + Fencing tokens are monotonic numbers that are incremented each time + the lock switches from the free state to the acquired state. They are + simply used for ordering lock holders. A lock holder can pass + its fencing to the shared resource to fence off previous lock holders. + When this resource receives an operation, it can validate the fencing + token in the operation. + + Consider the following scenario where the lock is free initially :: + + lock = client.cp_subsystem.get_lock("lock").blocking() + fence1 = lock.lock_and_get_fence() # (1) + fence2 = lock.lock_and_get_fence() # (2) + assert fence1 == fence2 + lock.unlock() + lock.unlock() + fence3 = lock.lock_and_get_fence() # (3) + assert fence3 > fence1 + + In this scenario, the lock is acquired by a thread in the cluster. Then, + the same thread reentrantly acquires the lock again. The fencing token + returned from the second acquire is equal to the one returned from the + first acquire, because of reentrancy. 
After the second acquire, the lock + is released 2 times, hence becomes free. There is a third lock acquire + here, which returns a new fencing token. Because this last lock acquire + is not reentrant, its fencing token is guaranteed to be larger than the + previous tokens, independent of the thread that has acquired the lock. + + Returns: + hazelcast.future.Future[int]: The fencing token. + + Raises: + LockOwnershipLostError: If the underlying CP session was + closed before the client releases the lock + LockAcquireLimitReachedException: If the lock call is reentrant + and the configured lock acquire limit is already reached. + """ + current_thread_id = thread_id() + invocation_uuid = uuid.uuid4() + return self._do_lock(current_thread_id, invocation_uuid) + + def try_lock(self, timeout=0): + """Acquires the lock if it is free within the given waiting time, + or already held by the current thread. + + If the lock is available, this method returns immediately with the value + ``True``. When the call is reentrant, it immediately returns + ``True`` if the lock acquire limit is not exceeded. Otherwise, + it returns ``False`` on the reentrant lock attempt if the acquire + limit is exceeded. + + If the lock is not available then the current thread becomes disabled + for thread scheduling purposes and lies dormant until the lock is + acquired by the current thread or the specified waiting time elapses. + + If the lock is acquired, then the value ``True`` is returned. + + If the specified waiting time elapses, then the value ``False`` + is returned. If the time is less than or equal to zero, the method does + not wait at all. By default, timeout is set to zero. + + A typical usage idiom for this method would be :: + + lock = client.cp_subsystem.get_lock("lock").blocking() + if lock.try_lock(): + try: + # manipulate the protected state + finally: + lock.unlock() + else: + # perform another action + + This usage ensures that the lock is unlocked if it was acquired, + and doesn't try to unlock if the lock was not acquired. + + Args: + timeout (int): The maximum time to wait for the lock in seconds. + + Returns: + hazelcast.future.Future[bool]: ``True`` if the lock was acquired and ``False`` + if the waiting time elapsed before the lock was acquired. + + Raises: + LockOwnershipLostError: If the underlying CP session was + closed before the client releases the lock + """ + return self.try_lock_and_get_fence(timeout).continue_with(lambda f: f.result() != self.INVALID_FENCE) + + def try_lock_and_get_fence(self, timeout=0): + """Acquires the lock if it is free within the given waiting time, + or already held by the current thread at the time of invocation and, + the acquire limit is not exceeded, and returns the fencing token + assigned to the current thread for this lock acquire. + + If the lock is acquired reentrantly, the same fencing token is returned. + If the lock acquire limit is exceeded, then this method immediately returns + :const:`INVALID_FENCE` that represents a failed lock attempt. + + If the lock is not available then the current thread becomes disabled + for thread scheduling purposes and lies dormant until the lock is + acquired by the current thread or the specified waiting time elapses. + + If the specified waiting time elapses, then :const:`INVALID_FENCE` + is returned. If the time is less than or equal to zero, the method does + not wait at all. By default, timeout is set to zero. 
+ + This is a convenience method for the following pattern :: + + lock = client.cp_subsystem.get_lock("lock").blocking() + if lock.try_lock(): + fence = lock.get_fence() + else: + fence = lock.INVALID_FENCE + + See Also: + :func:`lock_and_get_fence` function for more information about fences. + + Args: + timeout (int): The maximum time to wait for the lock in seconds. + + Returns: + hazelcast.future.Future[int]: The fencing token if the lock was acquired and + :const:`INVALID_FENCE` otherwise. + + Raises: + LockOwnershipLostError: If the underlying CP session was + closed before the client releases the lock + """ + current_thread_id = thread_id() + invocation_uuid = uuid.uuid4() + timeout = max(0, timeout) + return self._do_try_lock(current_thread_id, invocation_uuid, timeout) + + def unlock(self): + """Releases the lock if the lock is currently held by the current thread. + + Returns: + hazelcast.future.Future[None]: + + Raises: + LockOwnershipLostError: If the underlying CP session was + closed before the client releases the lock + IllegalMonitorStateError: If the lock is not held by + the current thread + """ + current_thread_id = thread_id() + session_id = self._get_session_id() + + # the order of the following checks is important + try: + self._verify_locked_session_id_if_present(current_thread_id, session_id, False) + except LockOwnershipLostError as e: + return ImmediateExceptionFuture(e) + + if session_id == _NO_SESSION_ID: + self._lock_session_ids.pop(current_thread_id, None) + return ImmediateExceptionFuture(self._new_illegal_monitor_state_error()) + + def check_response(f): + try: + still_locked_by_the_current_thread = f.result() + if still_locked_by_the_current_thread: + self._lock_session_ids[current_thread_id] = session_id + else: + self._lock_session_ids.pop(current_thread_id, None) + + self._release_session(session_id) + except SessionExpiredError: + self._invalidate_session(session_id) + self._lock_session_ids.pop(current_thread_id, None) + raise self._new_lock_ownership_lost_error(session_id) + except IllegalMonitorStateError as e: + self._lock_session_ids.pop(current_thread_id, None) + raise e + + return self._request_unlock(session_id, current_thread_id, uuid.uuid4()).continue_with(check_response) + + def get_fence(self): + """Returns the fencing token if the lock is held by the current thread. + + Fencing tokens are monotonic numbers that are incremented each time + the lock switches from the free state to the acquired state. They are + simply used for ordering lock holders. A lock holder can pass + its fencing to the shared resource to fence off previous lock holders. + When this resource receives an operation, it can validate the fencing + token in the operation. 
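+
+        A minimal sketch, assuming a blocking proxy; ``external_resource``
+        is a stand-in for a fencing-aware service of your own::
+
+            lock = client.cp_subsystem.get_lock("my-lock").blocking()
+            lock.lock()
+            try:
+                fence = lock.get_fence()
+                # the service can use the fence to reject stale lock holders
+                external_resource.write(payload, fence)
+            finally:
+                lock.unlock()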
+ + Returns: + hazelcast.future.Future[int]: The fencing token if the lock is held + by the current thread + + Raises: + LockOwnershipLostError: If the underlying CP session was + closed before the client releases the lock + IllegalMonitorStateError: If the lock is not held by + the current thread + """ + current_thread_id = thread_id() + session_id = self._get_session_id() + + # the order of the following checks is important + try: + self._verify_locked_session_id_if_present(current_thread_id, session_id, False) + except LockOwnershipLostError as e: + return ImmediateExceptionFuture(e) + + if session_id == _NO_SESSION_ID: + self._lock_session_ids.pop(current_thread_id, None) + return ImmediateExceptionFuture(self._new_illegal_monitor_state_error()) + + def check_response(f): + state = _LockOwnershipState(f.result()) + if state.is_locked_by(session_id, current_thread_id): + self._lock_session_ids[current_thread_id] = session_id + return state.fence + + self._verify_no_locked_session_id_present(current_thread_id) + raise self._new_illegal_monitor_state_error() + + return self._request_get_lock_ownership_state().continue_with(check_response) + + def is_locked(self): + """Returns whether this lock is locked or not. + + Returns: + hazelcast.future.Future[bool]: ``True`` if this lock is locked by any thread + in the cluster, ``False`` otherwise. + + Raises: + LockOwnershipLostError: If the underlying CP session was + closed before the client releases the lock + """ + current_thread_id = thread_id() + session_id = self._get_session_id() + + try: + self._verify_locked_session_id_if_present(current_thread_id, session_id, False) + except LockOwnershipLostError as e: + return ImmediateExceptionFuture(e) + + def check_response(f): + state = _LockOwnershipState(f.result()) + if state.is_locked_by(session_id, current_thread_id): + self._lock_session_ids[current_thread_id] = session_id + return True + + self._verify_no_locked_session_id_present(current_thread_id) + return state.is_locked() + + return self._request_get_lock_ownership_state().continue_with(check_response) + + def is_locked_by_current_thread(self): + """Returns whether the lock is held by the current thread or not. + + Returns: + hazelcast.future.Future[bool]: ``True`` if the lock is held by the current thread, + ``False`` otherwise. + + Raises: + LockOwnershipLostError: If the underlying CP session was + closed before the client releases the lock + """ + current_thread_id = thread_id() + session_id = self._get_session_id() + + try: + self._verify_locked_session_id_if_present(current_thread_id, session_id, False) + except LockOwnershipLostError as e: + return ImmediateExceptionFuture(e) + + def check_response(f): + state = _LockOwnershipState(f.result()) + locked_by_the_current_thread = state.is_locked_by(session_id, current_thread_id) + if locked_by_the_current_thread: + self._lock_session_ids[current_thread_id] = session_id + else: + self._verify_no_locked_session_id_present(current_thread_id) + + return locked_by_the_current_thread + + return self._request_get_lock_ownership_state().continue_with(check_response) + + def get_lock_count(self): + """Returns the reentrant lock count if the lock is held by any thread + in the cluster. 
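+
+        A minimal sketch, assuming a blocking proxy::
+
+            lock = client.cp_subsystem.get_lock("my-lock").blocking()
+            lock.lock()
+            lock.lock()
+            assert lock.get_lock_count() == 2
+            lock.unlock()
+            lock.unlock()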
+ + Returns: + hazelcast.future.Future[int]: The reentrant lock count if the lock is held + by any thread in the cluster + + Raises: + LockOwnershipLostError: If the underlying CP session was + closed before the client releases the lock + """ + current_thread_id = thread_id() + session_id = self._get_session_id() + + try: + self._verify_locked_session_id_if_present(current_thread_id, session_id, False) + except LockOwnershipLostError as e: + return ImmediateExceptionFuture(e) + + def check_response(f): + state = _LockOwnershipState(f.result()) + if state.is_locked_by(session_id, current_thread_id): + self._lock_session_ids[current_thread_id] = session_id + else: + self._verify_no_locked_session_id_present(current_thread_id) + + return state.lock_count + + return self._request_get_lock_ownership_state().continue_with(check_response) + + def destroy(self): + self._lock_session_ids.clear() + return super(FencedLock, self).destroy() + + def _do_lock(self, current_thread_id, invocation_uuid): + def do_lock_once(session_id): + session_id = session_id.result() + self._verify_locked_session_id_if_present(current_thread_id, session_id, True) + + def check_fence(fence): + try: + fence = fence.result() + except SessionExpiredError: + self._invalidate_session(session_id) + self._verify_no_locked_session_id_present(current_thread_id) + return self._do_lock(current_thread_id, invocation_uuid) + except WaitKeyCancelledError: + self._release_session(session_id) + error = IllegalMonitorStateError("Lock(%s) not acquired because the lock call on the CP group " + "is cancelled, possibly because of another indeterminate call " + "from the same thread." % self._object_name) + raise error + except Exception as e: + self._release_session(session_id) + raise e + + if fence != self.INVALID_FENCE: + self._lock_session_ids[current_thread_id] = session_id + return fence + + self._release_session(session_id) + error = LockAcquireLimitReachedError("Lock(%s) reentrant lock limit is already reached!" 
+ % self._object_name) + raise error + + return self._request_lock(session_id, current_thread_id, invocation_uuid).continue_with(check_fence) + + return self._acquire_session().continue_with(do_lock_once) + + def _do_try_lock(self, current_thread_id, invocation_uuid, timeout): + start = time.time() + + def do_try_lock_once(session_id): + session_id = session_id.result() + self._verify_locked_session_id_if_present(current_thread_id, session_id, True) + + def check_fence(fence): + try: + fence = fence.result() + except SessionExpiredError: + self._invalidate_session(session_id) + self._verify_no_locked_session_id_present(current_thread_id) + + remaining_timeout = timeout - (time.time() - start) + if remaining_timeout <= 0: + return self.INVALID_FENCE + return self._do_try_lock(current_thread_id, invocation_uuid, remaining_timeout) + except WaitKeyCancelledError: + self._release_session(session_id) + return self.INVALID_FENCE + except Exception as e: + self._release_session(session_id) + raise e + + if fence != self.INVALID_FENCE: + self._lock_session_ids[current_thread_id] = session_id + else: + self._release_session(session_id) + + return fence + + return self._request_try_lock(session_id, current_thread_id, invocation_uuid, timeout).continue_with( + check_fence) + + return self._acquire_session().continue_with(do_try_lock_once) + + def _verify_locked_session_id_if_present(self, current_thread_id, session_id, release_session): + lock_session_id = self._lock_session_ids.get(current_thread_id, None) + if lock_session_id and lock_session_id != session_id: + self._lock_session_ids.pop(current_thread_id, None) + if release_session: + self._release_session(session_id) + + raise self._new_lock_ownership_lost_error(lock_session_id) + + def _verify_no_locked_session_id_present(self, current_thread_id): + lock_session_id = self._lock_session_ids.pop(current_thread_id, None) + if lock_session_id: + raise self._new_lock_ownership_lost_error(lock_session_id) + + def _new_lock_ownership_lost_error(self, lock_session_id): + error = LockOwnershipLostError("Current thread is not the owner of the Lock(%s) because its " + "Session(%s) is closed by the server." 
% (self._proxy_name, lock_session_id)) + return error + + def _new_illegal_monitor_state_error(self): + error = IllegalMonitorStateError("Current thread is not the owner of the Lock(%s)" % self._proxy_name) + return error + + def _request_lock(self, session_id, current_thread_id, invocation_uuid): + codec = fenced_lock_lock_codec + request = codec.encode_request(self._group_id, self._object_name, session_id, current_thread_id, + invocation_uuid) + return self._invoke(request, codec.decode_response) + + def _request_try_lock(self, session_id, current_thread_id, invocation_uuid, timeout): + codec = fenced_lock_try_lock_codec + request = codec.encode_request(self._group_id, self._object_name, session_id, current_thread_id, + invocation_uuid, to_millis(timeout)) + return self._invoke(request, codec.decode_response) + + def _request_unlock(self, session_id, current_thread_id, invocation_uuid): + codec = fenced_lock_unlock_codec + request = codec.encode_request(self._group_id, self._object_name, session_id, current_thread_id, + invocation_uuid) + return self._invoke(request, codec.decode_response) + + def _request_get_lock_ownership_state(self): + codec = fenced_lock_get_lock_ownership_codec + request = codec.encode_request(self._group_id, self._object_name) + return self._invoke(request, codec.decode_response) + + +class _LockOwnershipState(object): + def __init__(self, state): + self.fence = state["fence"] + self.lock_count = state["lock_count"] + self.session_id = state["session_id"] + self.thread_id = state["thread_id"] + + def is_locked(self): + return self.fence != FencedLock.INVALID_FENCE + + def is_locked_by(self, session_id, current_thread_id): + return self.is_locked() and self.session_id == session_id and self.thread_id == current_thread_id diff --git a/hazelcast/proxy/executor.py b/hazelcast/proxy/executor.py index 35b452f17f..df28427b61 100644 --- a/hazelcast/proxy/executor.py +++ b/hazelcast/proxy/executor.py @@ -66,7 +66,7 @@ def execute_on_members(self, members, task): for member in members: f = self._execute_on_member(uuid, task_data, member.uuid) futures.append(f) - return future.combine_futures(*futures) + return future.combine_futures(futures) def execute_on_all_members(self, task): """Executes a task on all of the known cluster members. diff --git a/hazelcast/proxy/map.py b/hazelcast/proxy/map.py index 241453d02f..d94f0353c1 100644 --- a/hazelcast/proxy/map.py +++ b/hazelcast/proxy/map.py @@ -696,7 +696,7 @@ def put_all(self, map): future = self._invoke_on_partition(request, partition_id) futures.append(future) - return combine_futures(*futures) + return combine_futures(futures) def put_if_absent(self, key, value, ttl=-1): """Associates the specified key with the given value if it is not already associated. @@ -1085,7 +1085,7 @@ def handler(message): def merge(f): return dict(itertools.chain(*f.result())) - return combine_futures(*futures).continue_with(merge) + return combine_futures(futures).continue_with(merge) def _remove_internal(self, key_data): def handler(message): diff --git a/hazelcast/util.py b/hazelcast/util.py index 60ef819a99..cbf93b3fed 100644 --- a/hazelcast/util.py +++ b/hazelcast/util.py @@ -101,6 +101,15 @@ def get(self): with self._mux: return self._counter + def add(self, count): + """Adds the given value to the current value. + Args: + count (int): The value to add. 
+ """ + with self._mux: + self._counter += count + + class ImmutableLazyDataList(Sequence): def __init__(self, list_data, to_object): super(ImmutableLazyDataList, self).__init__() diff --git a/tests/cp_test.py b/tests/cp_test.py new file mode 100644 index 0000000000..b983c3caa6 --- /dev/null +++ b/tests/cp_test.py @@ -0,0 +1,231 @@ +import time +import unittest + +from mock import MagicMock + +from hazelcast.cp import _SessionState, ProxySessionManager +from hazelcast.errors import HazelcastClientNotActiveError, SessionExpiredError +from hazelcast.future import ImmediateFuture, ImmediateExceptionFuture +from hazelcast.protocol import RaftGroupId +from hazelcast.reactor import AsyncoreReactor +from hazelcast.util import thread_id + + +class SessionStateTest(unittest.TestCase): + def setUp(self): + self.state = _SessionState(42, None, 0.5) + + def test_acquire(self): + self.assertEqual(0, self.state.acquire_count.get()) + self.assertEqual(42, self.state.acquire(5)) # session id + self.assertEqual(5, self.state.acquire_count.get()) + + def test_release(self): + self.state.acquire(5) + self.state.release(4) + self.assertEqual(1, self.state.acquire_count.get()) + + def test_is_in_use(self): + self.assertFalse(self.state.is_in_use()) + self.state.acquire(5) + self.assertTrue(self.state.is_in_use()) + + def test_is_valid(self): + self.assertTrue(self.state.is_valid()) # not timed out + time.sleep(1) + self.assertFalse(self.state.is_valid()) # timed out and there is no acquire + self.state.acquire(5) + self.assertTrue(self.state.is_valid()) # timed out but acquired + + +class SessionManagerTest(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.group_id = 42 + cls.session_id = 24 + cls.raft_group_id = RaftGroupId("test", 0, cls.group_id) + + def setUp(self): + self.context = MagicMock() + self.manager = ProxySessionManager(self.context) + + def tearDown(self): + self.manager._sessions.clear() + self.manager.shutdown().result() + + def test_get_session_id(self): + self.assertEqual(-1, self.manager.get_session_id(self.raft_group_id)) + self.set_session(self.prepare_state()) + self.assertEqual(self.session_id, self.manager.get_session_id(self.raft_group_id)) + + def test_acquire_session_after_shutdown(self): + self.manager.shutdown().result() + + with self.assertRaises(HazelcastClientNotActiveError): + self.manager.acquire_session(self.raft_group_id, 1).result() + + def test_acquire_session_with_unknown_group_id(self): + m = self.mock_request_new_session() + self.assertEqual(self.session_id, self.manager.acquire_session(self.raft_group_id, 3).result()) + self.assertEqual(3, self.get_acquire_count()) + m.assert_called_once_with(self.raft_group_id) + + def test_acquire_session_with_existing_invalid_session(self): + m = self.mock_request_new_session() + state = MagicMock(is_valid=lambda: False) + self.set_session(state) + + self.assertEqual(self.session_id, self.manager.acquire_session(self.raft_group_id, 1).result()) + m.assert_called_once_with(self.raft_group_id) + self.assertEqual(1, self.get_acquire_count()) + + def test_acquire_session_for_valid_session(self): + m = self.mock_request_new_session() + self.set_session(self.prepare_state()) + + self.assertEqual(self.session_id, self.manager.acquire_session(self.raft_group_id, 10).result()) + m.assert_not_called() + self.assertEqual(10, self.get_acquire_count()) + + def test_release_session(self): + self.set_session(self.prepare_state()) + + self.manager.release_session(self.raft_group_id, self.session_id, 3) + self.assertEqual(-3, 
self.get_acquire_count()) + + def test_release_session_with_unknown_session(self): + self.set_session(self.prepare_state()) + + self.manager.release_session(self.raft_group_id, -1, 3) + self.assertEqual(0, self.get_acquire_count()) + + def test_invalidate_session(self): + self.set_session(self.prepare_state()) + + self.manager.invalidate_session(self.raft_group_id, self.session_id) + self.assertEqual(0, len(self.manager._sessions)) + + def test_invalidate_session_with_unknown_session(self): + self.set_session(self.prepare_state()) + + self.manager.invalidate_session(self.raft_group_id, self.session_id - 1) + self.assertEqual(1, len(self.manager._sessions)) + + def test_create_thread_id_after_shutdown(self): + self.manager.shutdown().result() + + with self.assertRaises(HazelcastClientNotActiveError): + self.manager.get_or_create_unique_thread_id(self.raft_group_id).result() + + def test_create_thread_id(self): + m = self.mock_request_generate_thread_id(5) + self.assertEqual(5, self.manager.get_or_create_unique_thread_id(self.raft_group_id).result()) + m.assert_called_once_with(self.raft_group_id) + self.assertEqual(5, self.manager._thread_ids.get((self.raft_group_id, thread_id()))) + + def test_create_thread_id_with_known_group_id(self): + m = self.mock_request_generate_thread_id(12) + self.set_thread_id(13) + self.assertEqual(13, self.manager.get_or_create_unique_thread_id(self.raft_group_id).result()) + m.assert_not_called() + self.assertEqual(13, self.manager._thread_ids.get((self.raft_group_id, thread_id()))) + + def test_shutdown(self): + self.set_session(self.prepare_state()) + self.set_thread_id(123) + self.manager._mutexes[self.raft_group_id] = object() + m = MagicMock(return_value=ImmediateFuture(True)) + self.manager._request_close_session = m + + self.manager.shutdown().result() + m.assert_called_once_with(self.raft_group_id, self.session_id) + self.assertEqual(0, len(self.manager._sessions)) + self.assertEqual(0, len(self.manager._mutexes)) + self.assertEqual(0, len(self.manager._thread_ids)) + + def test_heartbeat(self): + reactor = self.mock_reactor() + self.mock_request_new_session() + + r = MagicMock(return_value=ImmediateFuture(None)) + self.manager._request_heartbeat = r + self.manager.acquire_session(self.raft_group_id, 1).result() + time.sleep(2) + self.manager.shutdown() + reactor.shutdown() + self.assertGreater(self.context.reactor.add_timer.call_count, 1) # assert that the heartbeat task is executed + r.assert_called() + r.assert_called_with(self.raft_group_id, self.session_id) + self.assertEqual(1, len(self.manager._sessions)) + + def test_heartbeat_when_session_is_released(self): + reactor = self.mock_reactor() + self.mock_request_new_session() + + r = MagicMock(return_value=ImmediateFuture(None)) + self.manager._request_heartbeat = r + self.manager.acquire_session(self.raft_group_id, 1).add_done_callback( + lambda _: self.manager.release_session(self.raft_group_id, self.session_id, 1)) + time.sleep(2) + self.manager.shutdown() + reactor.shutdown() + self.assertGreater(self.context.reactor.add_timer.call_count, 1) # assert that the heartbeat task is executed + r.assert_not_called() + self.assertEqual(1, len(self.manager._sessions)) + + def test_heartbeat_on_failure(self): + reactor = self.mock_reactor() + self.mock_request_new_session() + self.manager._request_heartbeat = MagicMock(return_value=ImmediateExceptionFuture(SessionExpiredError())) + + m = MagicMock(side_effect=self.manager._invalidate_session) + self.manager._invalidate_session = m + + 
self.manager.acquire_session(self.raft_group_id, 1).result() + time.sleep(2) + self.manager.shutdown() + reactor.shutdown() + self.assertGreater(self.context.reactor.add_timer.call_count, 1) # assert that the heartbeat task is executed + m.assert_called_once_with(self.raft_group_id, self.session_id) + self.assertEqual(0, len(self.manager._sessions)) + + def mock_request_generate_thread_id(self, t_id): + def mock(*_, **__): + return ImmediateFuture(t_id) + + m = MagicMock(side_effect=mock) + self.manager._request_generate_thread_id = m + return m + + def mock_request_new_session(self, ): + def mock(*_, **__): + d = { + "session_id": self.session_id, + "ttl_millis": 1000, + "heartbeat_millis": 100, + } + return ImmediateFuture(d) + + m = MagicMock(side_effect=mock) + self.manager._request_new_session = m + return m + + def prepare_state(self): + return _SessionState(self.session_id, self.raft_group_id, 1) + + def get_acquire_count(self): + return self.manager._sessions[self.raft_group_id].acquire_count.get() + + def set_session(self, state): + self.manager._sessions[self.raft_group_id] = state + + def set_thread_id(self, global_t_id): + self.manager._thread_ids[(self.raft_group_id, thread_id())] = global_t_id + + def mock_reactor(self): + r = AsyncoreReactor({}) + r.start() + m = MagicMock() + m.add_timer = MagicMock(side_effect=lambda d, c: r.add_timer(d, c)) + self.context.reactor = m + return r diff --git a/tests/future_test.py b/tests/future_test.py index 587dbfae89..4850f78178 100644 --- a/tests/future_test.py +++ b/tests/future_test.py @@ -391,7 +391,7 @@ class CombineFutureTest(unittest.TestCase): def test_combine_futures(self): f1, f2, f3 = Future(), Future(), Future() - combined = combine_futures(f1, f2, f3) + combined = combine_futures([f1, f2, f3]) f1.set_result("done1") self.assertFalse(combined.done()) @@ -405,7 +405,7 @@ def test_combine_futures(self): def test_combine_futures_exception(self): f1, f2, f3 = Future(), Future(), Future() - combined = combine_futures(f1, f2, f3) + combined = combine_futures([f1, f2, f3]) e = RuntimeError("error") f1.set_result("done") @@ -415,14 +415,9 @@ def test_combine_futures_exception(self): self.assertEqual(e, combined.exception()) def test_combine_futures_with_empty_list(self): - combined = combine_futures() - combined2 = combine_futures(*[]) - + combined = combine_futures([]) self.assertTrue(combined.done()) - self.assertTrue(combined2.done()) - self.assertEqual([], combined.result()) - self.assertEqual([], combined2.result()) class MakeBlockingTest(unittest.TestCase): diff --git a/tests/proxy/cp/fenced_lock_test.py b/tests/proxy/cp/fenced_lock_test.py new file mode 100644 index 0000000000..edd57085e3 --- /dev/null +++ b/tests/proxy/cp/fenced_lock_test.py @@ -0,0 +1,1099 @@ +import time +import unittest + +from mock import MagicMock + +from hazelcast import HazelcastClient +from hazelcast.cp import LOCK_SERVICE +from hazelcast.errors import DistributedObjectDestroyedError, IllegalMonitorStateError, \ + LockOwnershipLostError, LockAcquireLimitReachedError, SessionExpiredError, WaitKeyCancelledError, \ + HazelcastRuntimeError +from hazelcast.future import ImmediateFuture, ImmediateExceptionFuture +from hazelcast.protocol import RaftGroupId +from hazelcast.proxy.cp.fenced_lock import FencedLock +from hazelcast.util import AtomicInteger, thread_id +from tests.proxy.cp import CPTestCase +from tests.util import random_string + + +class FencedLockTest(CPTestCase): + def setUp(self): + self.lock = 
self.client.cp_subsystem.get_lock(random_string()).blocking() + self.initial_acquire_count = self.get_initial_acquire_count() + + def tearDown(self): + self.lock.destroy() + + def test_lock_in_another_group(self): + another_lock = self.client.cp_subsystem.get_lock(self.lock._proxy_name + "@another").blocking() + another_lock.lock() + try: + self.assertTrue(another_lock.is_locked()) + self.assertFalse(self.lock.is_locked()) + finally: + another_lock.unlock() + + def test_lock_after_client_shutdown(self): + another_client = HazelcastClient(cluster_name=self.cluster.id) + another_lock = another_client.cp_subsystem.get_lock(self.lock._proxy_name).blocking() + another_lock.lock() + self.assertTrue(another_lock.is_locked()) + self.assertTrue(self.lock.is_locked()) + another_client.shutdown() + + def assertion(): + self.assertFalse(self.lock.is_locked()) + + self.assertTrueEventually(assertion) + + def test_use_after_destroy(self): + self.lock.destroy() + # the next destroy call should be ignored + self.lock.destroy() + + with self.assertRaises(DistributedObjectDestroyedError): + self.lock.lock_and_get_fence() + + lock2 = self.client.cp_subsystem.get_lock(self.lock._proxy_name).blocking() + + with self.assertRaises(DistributedObjectDestroyedError): + lock2.lock_and_get_fence() + + def test_lock_when_not_locked(self): + self.assertIsNone(self.lock.lock()) + self.assertTrue(self.lock.is_locked()) + self.assertTrue(self.lock.is_locked_by_current_thread()) + self.assertEqual(1, self.lock.get_lock_count()) + self.assert_session_acquire_count(1) + + def test_lock_when_locked_by_self(self): + self.lock.lock() + self.lock.lock() + self.assert_lock(True, True, 2) + self.assert_session_acquire_count(2) + + def test_lock_when_locked_by_another_thread(self): + t = self.start_new_thread(lambda: self.lock.lock()) + t.join() + self.assert_lock(True, False, 1) + future = self.lock._wrapped.lock() + time.sleep(2) + self.assertFalse(future.done()) # lock request made in the main thread should not complete + self.assert_session_acquire_count(2) + self.lock.destroy() + + with self.assertRaises(DistributedObjectDestroyedError): + future.result() + + def test_lock_and_get_fence_when_not_locked(self): + self.assert_valid_fence(self.lock.lock_and_get_fence()) + self.assert_lock(True, True, 1) + self.assert_session_acquire_count(1) + + def test_lock_and_get_fence_when_locked_by_self(self): + fence = self.lock.lock_and_get_fence() + self.assert_valid_fence(fence) + fence2 = self.lock.lock_and_get_fence() + self.assertEqual(fence, fence2) + self.assert_lock(True, True, 2) + self.assert_session_acquire_count(2) + self.lock.unlock() + self.lock.unlock() + self.assert_session_acquire_count(0) + fence3 = self.lock.lock_and_get_fence() + self.assert_valid_fence(fence3) + self.assertGreater(fence3, fence) + self.assert_session_acquire_count(1) + + def test_lock_and_get_fence_when_locked_by_another_thread(self): + t = self.start_new_thread(lambda: self.assert_valid_fence(self.lock.lock_and_get_fence())) + t.join() + self.assert_lock(True, False, 1) + future = self.lock._wrapped.lock_and_get_fence() + time.sleep(2) + self.assertFalse(future.done()) # lock request made in the main thread should not complete + self.assert_session_acquire_count(2) + self.lock.destroy() + + with self.assertRaises(DistributedObjectDestroyedError): + future.result() + + def test_try_lock_when_free(self): + self.assertTrue(self.lock.try_lock()) + self.assert_lock(True, True, 1) + self.assert_session_acquire_count(1) + + def 
test_try_lock_when_free_with_timeout(self): + self.assertTrue(self.lock.try_lock(1)) + self.assert_lock(True, True, 1) + self.assert_session_acquire_count(1) + + def test_try_lock_when_locked_by_self(self): + self.lock.lock() + self.assertTrue(self.lock.try_lock()) + self.assert_lock(True, True, 2) + self.assert_session_acquire_count(2) + + def test_try_lock_when_locked_by_self_with_timeout(self): + self.lock.lock() + self.assertTrue(self.lock.try_lock(2)) + self.assert_lock(True, True, 2) + self.assert_session_acquire_count(2) + + def test_try_lock_when_locked_by_another_thread(self): + t = self.start_new_thread(lambda: self.lock.try_lock()) + t.join() + self.assertFalse(self.lock.try_lock()) + self.assert_lock(True, False, 1) + self.assert_session_acquire_count(1) + + def test_try_lock_when_locked_by_another_thread_with_timeout(self): + t = self.start_new_thread(lambda: self.lock.try_lock()) + t.join() + self.assertFalse(self.lock.try_lock(1)) + self.assert_lock(True, False, 1) + self.assert_session_acquire_count(1) + + def test_try_lock_and_get_fence_when_free(self): + self.assert_valid_fence(self.lock.try_lock_and_get_fence()) + self.assert_lock(True, True, 1) + self.assert_session_acquire_count(1) + + def test_try_lock_and_get_fence_when_free_with_timeout(self): + self.assert_valid_fence(self.lock.try_lock(1)) + self.assert_lock(True, True, 1) + self.assert_session_acquire_count(1) + + def test_try_lock_and_get_fence_when_locked_by_self(self): + fence = self.lock.try_lock_and_get_fence() + self.assert_valid_fence(fence) + fence2 = self.lock.try_lock_and_get_fence() + self.assertEqual(fence, fence2) + self.assert_lock(True, True, 2) + self.assert_session_acquire_count(2) + self.lock.unlock() + self.lock.unlock() + self.assert_session_acquire_count(0) + fence3 = self.lock.try_lock_and_get_fence() + self.assert_valid_fence(fence3) + self.assertGreater(fence3, fence) + self.assert_session_acquire_count(1) + + def test_try_lock_and_get_fence_when_locked_by_self_with_timeout(self): + fence = self.lock.try_lock_and_get_fence(1) + self.assert_valid_fence(fence) + fence2 = self.lock.try_lock_and_get_fence(2) + self.assertEqual(fence, fence2) + self.assert_lock(True, True, 2) + self.assert_session_acquire_count(2) + self.lock.unlock() + self.lock.unlock() + self.assert_session_acquire_count(0) + fence3 = self.lock.try_lock_and_get_fence() + self.assert_valid_fence(fence3) + self.assertGreater(fence3, fence) + self.assert_session_acquire_count(1) + + def test_try_lock_and_get_fence_when_locked_by_another_thread(self): + t = self.start_new_thread(lambda: self.assert_valid_fence(self.lock.try_lock_and_get_fence())) + t.join() + self.assert_not_valid_fence(self.lock.try_lock_and_get_fence()) + self.assert_lock(True, False, 1) + self.assert_session_acquire_count(1) + + def test_try_lock_and_get_fence_when_locked_by_another_thread_with_timeout(self): + t = self.start_new_thread(lambda: self.assert_valid_fence(self.lock.try_lock_and_get_fence(1))) + t.join() + self.assert_not_valid_fence(self.lock.try_lock_and_get_fence(1)) + self.assert_lock(True, False, 1) + self.assert_session_acquire_count(1) + + def test_unlock_when_free(self): + with self.assertRaises(IllegalMonitorStateError): + self.lock.unlock() + + self.assert_lock(False, False, 0) + self.assert_session_acquire_count(0) + + def test_unlock_when_locked_by_self(self): + self.lock.lock() + self.assertIsNone(self.lock.unlock()) + + with self.assertRaises(IllegalMonitorStateError): + self.lock.unlock() + + self.assert_lock(False, False, 0) + 
self.assert_session_acquire_count(0) + + def test_unlock_multiple_times(self): + self.lock.lock() + self.lock.lock() + self.lock.unlock() + self.lock.unlock() + self.assert_lock(False, False, 0) + self.assert_session_acquire_count(0) + + def test_unlock_when_locked_by_another_thread(self): + t = self.start_new_thread(lambda: self.lock.lock()) + t.join() + + with self.assertRaises(IllegalMonitorStateError): + self.lock.unlock() + + self.assert_lock(True, False, 1) + self.assert_session_acquire_count(1) + + def test_get_fence_when_free(self): + with self.assertRaises(IllegalMonitorStateError): + self.lock.get_fence() + + self.assert_session_acquire_count(0) + + def test_get_fence_when_locked_by_self(self): + self.lock.lock() + fence = self.lock.get_fence() + self.assert_valid_fence(fence) + self.lock.lock() + fence2 = self.lock.get_fence() + self.assertEqual(fence, fence2) + self.lock.unlock() + self.lock.unlock() + self.lock.lock() + fence3 = self.lock.get_fence() + self.assert_valid_fence(fence3) + self.assertGreater(fence3, fence) + self.lock.unlock() + + with self.assertRaises(IllegalMonitorStateError): + self.lock.get_fence() + + self.assert_session_acquire_count(0) + + def test_get_fence_when_locked_by_another_thread(self): + t = self.start_new_thread(lambda: self.lock.lock()) + t.join() + + with self.assertRaises(IllegalMonitorStateError): + self.lock.get_fence() + + self.assert_session_acquire_count(1) + + def test_is_locked_when_free(self): + self.assertFalse(self.lock.is_locked()) + self.assert_session_acquire_count(0) + + def test_is_locked_when_locked_by_self(self): + self.lock.lock() + self.assertTrue(self.lock.is_locked()) + self.lock.unlock() + self.assertFalse(self.lock.is_locked()) + self.assert_session_acquire_count(0) + + def test_is_locked_when_locked_by_another_thread(self): + t = self.start_new_thread(lambda: self.lock.lock()) + t.join() + + self.assertTrue(self.lock.is_locked()) + self.assert_session_acquire_count(1) + + def test_is_locked_by_current_thread_when_free(self): + self.assertFalse(self.lock.is_locked_by_current_thread()) + self.assert_session_acquire_count(0) + + def test_is_locked_by_current_thread_when_locked_by_self(self): + self.lock.lock() + self.assertTrue(self.lock.is_locked_by_current_thread()) + self.lock.unlock() + self.assertFalse(self.lock.is_locked_by_current_thread()) + self.assert_session_acquire_count(0) + + def test_is_locked_by_current_thread_when_locked_by_another_thread(self): + t = self.start_new_thread(lambda: self.lock.lock()) + t.join() + + self.assertFalse(self.lock.is_locked_by_current_thread()) + self.assert_session_acquire_count(1) + + def assert_lock(self, is_locked, is_locked_by_current_thread, lock_count): + self.assertEqual(is_locked, self.lock.is_locked()) + self.assertEqual(is_locked_by_current_thread, self.lock.is_locked_by_current_thread()) + self.assertEqual(lock_count, self.lock.get_lock_count()) + + def assert_valid_fence(self, fence): + self.assertNotEqual(FencedLock.INVALID_FENCE, fence) + + def assert_not_valid_fence(self, fence): + self.assertEqual(FencedLock.INVALID_FENCE, fence) + + def assert_session_acquire_count(self, expected_acquire_count): + session = self.client._proxy_session_manager._sessions.get(self.lock.get_group_id(), None) + if not session: + actual = 0 + else: + actual = session.acquire_count.get() + + self.assertEqual(self.initial_acquire_count + expected_acquire_count, actual) + + def get_initial_acquire_count(self): + session = 
self.client._proxy_session_manager._sessions.get(self.lock.get_group_id(), None) + if not session: + return 0 + return session.acquire_count.get() + + +class FencedLockMockTest(unittest.TestCase): + def setUp(self): + self.acquire_session = MagicMock() + self.release_session = MagicMock() + self.invalidate_session = MagicMock() + self.session_manager = MagicMock(acquire_session=self.acquire_session, release_session=self.release_session, + invalidate_session=self.invalidate_session) + context = MagicMock(proxy_session_manager=self.session_manager) + group_id = RaftGroupId("test", 0, 42) + self.proxy = FencedLock(context, group_id, LOCK_SERVICE, "mylock@mygroup", "mylock").blocking() + + def test_lock(self): + # Everything succeeds + self.prepare_acquire_session(1) + self.mock_request_lock(2) + self.proxy.lock() + self.assert_call_counts(1, 0, 0) + self.assert_lock_session_id(1) + + def test_lock_when_acquire_session_fails(self): + # First call to acquire session fails, should not retry + self.prepare_acquire_session(-1, HazelcastRuntimeError("server_error")) + + with self.assertRaises(HazelcastRuntimeError): + self.proxy.lock() + + self.assert_call_counts(1, 0, 0) + self.assert_no_lock_session_id() + + def test_lock_when_server_closes_old_session(self): + # Same thread issues a new lock call while holding a lock. + # Server closes session related to the first lock, should not retry + self.prepare_acquire_session(2) + self.prepare_lock_session_ids(1) + + with self.assertRaises(LockOwnershipLostError): + self.proxy.lock() + + self.assert_call_counts(1, 1, 0) + self.assert_no_lock_session_id() + + def test_lock_when_lock_acquire_limit_reached(self): + # Lock acquire limit is reached, server returns invalid fence, should not retry + self.prepare_acquire_session(1) + self.mock_request_lock(FencedLock.INVALID_FENCE) + + with self.assertRaises(LockAcquireLimitReachedError): + self.proxy.lock() + + self.assert_call_counts(1, 1, 0) + self.assert_no_lock_session_id() + + def test_lock_on_session_expired_error(self): + # Session expired error comes from the server on lock request, retries and gets valid fence + self.prepare_acquire_session(1) + self.mock_request_lock(2, SessionExpiredError()) + self.proxy.lock() + self.assert_call_counts(2, 0, 1) + self.assert_lock_session_id(1) + + def test_lock_on_session_expired_error_on_reentrant_lock_request(self): + # Session expired error comes from the server on second lock request, + # while holding a lock, should not retry + self.prepare_acquire_session(1) + self.prepare_lock_session_ids(1) + self.mock_request_lock(3, SessionExpiredError()) + + with self.assertRaises(LockOwnershipLostError): + self.proxy.lock() + + self.assert_call_counts(1, 0, 1) + self.assert_no_lock_session_id() + + def test_lock_on_wait_key_cancelled_error(self): + # Wait key cancelled error comes from the server, should not retry + self.prepare_acquire_session(1) + self.mock_request_lock(2, WaitKeyCancelledError()) + + with self.assertRaises(IllegalMonitorStateError): + self.proxy.lock() + + self.assert_call_counts(1, 1, 0) + self.assert_no_lock_session_id() + + def test_lock_on_unspecified_error(self): + # Server sends another error, should not retry + self.prepare_acquire_session(1) + self.mock_request_lock(-1, HazelcastRuntimeError("expected")) + + with self.assertRaises(HazelcastRuntimeError): + self.proxy.lock() + + self.assert_call_counts(1, 1, 0) + self.assert_no_lock_session_id() + + def test_lock_and_get_fence(self): + # Everything succeeds + self.prepare_acquire_session(1) + 
self.mock_request_lock(2) + self.assertEqual(2, self.proxy.lock_and_get_fence()) + self.assert_call_counts(1, 0, 0) + self.assert_lock_session_id(1) + + def test_lock_and_get_fence_when_acquire_session_fails(self): + # First call to acquire session fails, should not retry + self.prepare_acquire_session(-1, HazelcastRuntimeError("server_error")) + + with self.assertRaises(HazelcastRuntimeError): + self.proxy.lock_and_get_fence() + + self.assert_call_counts(1, 0, 0) + self.assert_no_lock_session_id() + + def test_lock_and_get_fence_when_server_closes_old_session(self): + # Same thread issues a new lock call while holding a lock. + # Server closes session related to the first lock, should not retry + self.prepare_acquire_session(2) + self.prepare_lock_session_ids(1) + + with self.assertRaises(LockOwnershipLostError): + self.proxy.lock_and_get_fence() + + self.assert_call_counts(1, 1, 0) + self.assert_no_lock_session_id() + + def test_lock_and_get_fence_when_lock_acquire_limit_reached(self): + # Lock acquire limit is reached, server returns invalid fence, should not retry + self.prepare_acquire_session(1) + self.mock_request_lock(FencedLock.INVALID_FENCE) + + with self.assertRaises(LockAcquireLimitReachedError): + self.proxy.lock_and_get_fence() + + self.assert_call_counts(1, 1, 0) + self.assert_no_lock_session_id() + + def test_lock_and_get_fence_on_session_expired_error(self): + # Session expired error comes from the server on lock request, retries and gets valid fence + self.prepare_acquire_session(1) + self.mock_request_lock(2, SessionExpiredError()) + self.assertEqual(2, self.proxy.lock_and_get_fence()) + self.assert_call_counts(2, 0, 1) + self.assert_lock_session_id(1) + + def test_lock_and_get_fence_on_session_expired_error_on_reentrant_lock_request(self): + # Session expired error comes from the server on second lock request, + # while holding a lock, should not retry + self.prepare_acquire_session(1) + self.prepare_lock_session_ids(1) + self.mock_request_lock(3, SessionExpiredError()) + + with self.assertRaises(LockOwnershipLostError): + self.proxy.lock_and_get_fence() + + self.assert_call_counts(1, 0, 1) + self.assert_no_lock_session_id() + + def test_lock_and_get_fence_on_wait_key_cancelled_error(self): + # Wait key cancelled error comes from the server, should not retry + self.prepare_acquire_session(1) + self.mock_request_lock(2, WaitKeyCancelledError()) + + with self.assertRaises(IllegalMonitorStateError): + self.proxy.lock_and_get_fence() + + self.assert_call_counts(1, 1, 0) + self.assert_no_lock_session_id() + + def test_lock_and_get_fence_on_unspecified_error(self): + # Server sends another error, should not retry + self.prepare_acquire_session(1) + self.mock_request_lock(-1, HazelcastRuntimeError("expected")) + + with self.assertRaises(HazelcastRuntimeError): + self.proxy.lock_and_get_fence() + + self.assert_call_counts(1, 1, 0) + self.assert_no_lock_session_id() + + def test_try_lock(self): + # Everything succeeds + self.prepare_acquire_session(1) + self.mock_request_try_lock(2) + self.assertTrue(self.proxy.try_lock()) + self.assert_call_counts(1, 0, 0) + self.assert_lock_session_id(1) + + def test_try_lock_when_acquire_session_fails(self): + # First call to acquire session fails, should not retry + self.prepare_acquire_session(-1, HazelcastRuntimeError("server_error")) + + with self.assertRaises(HazelcastRuntimeError): + self.proxy.try_lock() + + self.assert_call_counts(1, 0, 0) + self.assert_no_lock_session_id() + + def test_try_lock_when_server_closes_old_session(self): + 
# Same thread issues a new lock call while holding a lock. + # Server closes session related to the first lock, should not retry + self.prepare_acquire_session(2) + self.prepare_lock_session_ids(1) + + with self.assertRaises(LockOwnershipLostError): + self.proxy.try_lock() + + self.assert_call_counts(1, 1, 0) + self.assert_no_lock_session_id() + + def test_try_lock_when_lock_acquire_limit_reached(self): + # Lock acquire limit is reached, server returns invalid fence + self.prepare_acquire_session(1) + self.mock_request_try_lock(FencedLock.INVALID_FENCE) + self.assertFalse(self.proxy.try_lock()) + self.assert_call_counts(1, 1, 0) + self.assert_no_lock_session_id() + + def test_try_lock_on_session_expired_error(self): + # Session expired error comes from the server on lock request, + # client determines the timeout and returns invalid fence + self.prepare_acquire_session(1) + self.mock_request_try_lock(2, SessionExpiredError()) + self.assertFalse(self.proxy.try_lock()) + self.assert_call_counts(1, 0, 1) + self.assert_no_lock_session_id() + + def test_try_lock_on_session_expired_error_when_not_timed_out(self): + # Session expired error comes from the server on lock request, + # client retries due to not reaching timeout and succeeds + self.prepare_acquire_session(1) + self.mock_request_try_lock(2, SessionExpiredError()) + self.assertTrue(self.proxy.try_lock(100)) + self.assert_call_counts(2, 0, 1) + self.assert_lock_session_id(1) + + def test_try_lock_on_session_expired_error_on_reentrant_lock_request(self): + # Session expired error comes from the server on second lock request, + # while holding a lock, should not retry + self.prepare_acquire_session(1) + self.prepare_lock_session_ids(1) + self.mock_request_try_lock(3, SessionExpiredError()) + + with self.assertRaises(LockOwnershipLostError): + self.proxy.try_lock() + + self.assert_call_counts(1, 0, 1) + self.assert_no_lock_session_id() + + def test_try_lock_on_wait_key_cancelled_error(self): + # Wait key cancelled error comes from the server, invalid fence is returned + self.prepare_acquire_session(1) + self.mock_request_try_lock(2, WaitKeyCancelledError()) + self.assertFalse(self.proxy.try_lock()) + self.assert_call_counts(1, 1, 0) + self.assert_no_lock_session_id() + + def test_try_lock_on_unspecified_error(self): + # Server sends another error, should not retry + self.prepare_acquire_session(1) + self.mock_request_try_lock(-1, HazelcastRuntimeError("expected")) + + with self.assertRaises(HazelcastRuntimeError): + self.proxy.try_lock() + + self.assert_call_counts(1, 1, 0) + self.assert_no_lock_session_id() + + def test_try_lock_and_get_fence(self): + # Everything succeeds + self.prepare_acquire_session(1) + self.mock_request_try_lock(2) + self.assertEqual(2, self.proxy.try_lock_and_get_fence()) + self.assert_call_counts(1, 0, 0) + self.assert_lock_session_id(1) + + def test_try_lock_and_get_fence_when_acquire_session_fails(self): + # First call to acquire session fails, should not retry + self.prepare_acquire_session(-1, HazelcastRuntimeError("server_error")) + + with self.assertRaises(HazelcastRuntimeError): + self.proxy.try_lock_and_get_fence() + + self.assert_call_counts(1, 0, 0) + self.assert_no_lock_session_id() + + def test_try_lock_and_get_fence_when_server_closes_old_session(self): + # Same thread issues a new lock call while holding a lock. 
+ # Server closes session related to the first lock, should not retry + self.prepare_acquire_session(2) + self.prepare_lock_session_ids(1) + + with self.assertRaises(LockOwnershipLostError): + self.proxy.try_lock_and_get_fence() + + self.assert_call_counts(1, 1, 0) + self.assert_no_lock_session_id() + + def test_try_lock_and_get_fence_when_lock_acquire_limit_reached(self): + # Lock acquire limit is reached, server returns invalid fence + self.prepare_acquire_session(1) + self.mock_request_try_lock(FencedLock.INVALID_FENCE) + self.assertEqual(FencedLock.INVALID_FENCE, self.proxy.try_lock_and_get_fence()) + self.assert_call_counts(1, 1, 0) + self.assert_no_lock_session_id() + + def test_try_lock_and_get_fence_on_session_expired_error(self): + # Session expired error comes from the server on lock request, + # client determines the timeout and returns invalid fence + self.prepare_acquire_session(1) + self.mock_request_try_lock(2, SessionExpiredError()) + self.assertEqual(FencedLock.INVALID_FENCE, self.proxy.try_lock_and_get_fence()) + self.assert_call_counts(1, 0, 1) + self.assert_no_lock_session_id() + + def test_try_lock_and_get_fence_on_session_expired_error_when_not_timed_out(self): + # Session expired error comes from the server on lock request, + # client retries due to not reaching timeout and succeeds + self.prepare_acquire_session(1) + self.mock_request_try_lock(2, SessionExpiredError()) + self.assertEqual(2, self.proxy.try_lock_and_get_fence(100)) + self.assert_call_counts(2, 0, 1) + self.assert_lock_session_id(1) + + def test_try_lock_and_get_fence_on_session_expired_error_on_reentrant_lock_request(self): + # Session expired error comes from the server on second lock request, + # while holding a lock, should not retry + self.prepare_acquire_session(1) + self.prepare_lock_session_ids(1) + self.mock_request_try_lock(3, SessionExpiredError()) + + with self.assertRaises(LockOwnershipLostError): + self.proxy.try_lock_and_get_fence() + + self.assert_call_counts(1, 0, 1) + self.assert_no_lock_session_id() + + def test_try_lock_and_get_fence_on_wait_key_cancelled_error(self): + # Wait key cancelled error comes from the server, invalid fence is returned + self.prepare_acquire_session(1) + self.mock_request_try_lock(2, WaitKeyCancelledError()) + self.assertEqual(FencedLock.INVALID_FENCE, self.proxy.try_lock_and_get_fence()) + self.assert_call_counts(1, 1, 0) + self.assert_no_lock_session_id() + + def test_try_lock_and_get_fence_on_unspecified_error(self): + # Server sends another error, should not retry + self.prepare_acquire_session(1) + self.mock_request_try_lock(-1, HazelcastRuntimeError("expected")) + + with self.assertRaises(HazelcastRuntimeError): + self.proxy.try_lock_and_get_fence() + + self.assert_call_counts(1, 1, 0) + self.assert_no_lock_session_id() + + def test_unlock(self): + # Everything succeeds + self.prepare_get_session(2) + self.mock_request_unlock(True) + self.proxy.unlock() + self.assert_call_counts(0, 1, 0) + self.assert_lock_session_id(2) # Server sent true, client still holds the lock after unlock + + def test_unlock_when_server_closes_old_session(self): + # Session id is different than what we store in the + # dict. The old session must be closed while we were + # holding the lock. 
+ self.prepare_get_session(2) + self.prepare_lock_session_ids(1) + + with self.assertRaises(LockOwnershipLostError): + self.proxy.unlock() + + self.assert_call_counts(0, 0, 0) + self.assert_no_lock_session_id() + + def test_unlock_when_there_is_no_session(self): + # No active session for the current thread. + self.prepare_get_session(-1) + + with self.assertRaises(IllegalMonitorStateError): + self.proxy.unlock() + + self.assert_call_counts(0, 0, 0) + self.assert_no_lock_session_id() + + def test_unlock_when_client_unlocked_the_locked(self): + # After the unlock, lock is free + self.prepare_get_session(1) + self.mock_request_unlock(False) + self.proxy.unlock() + self.assert_call_counts(0, 1, 0) + self.assert_no_lock_session_id() + + def test_unlock_on_session_expired_error(self): + # Server sends session expired error + self.prepare_get_session(1) + self.mock_request_unlock(None, SessionExpiredError()) + + with self.assertRaises(LockOwnershipLostError): + self.proxy.unlock() + + self.assert_call_counts(0, 0, 1) + self.assert_no_lock_session_id() + + def test_unlock_on_illegal_monitor_state_error(self): + # Lock is not held by the current thread, but client + # thinks that it holds it and sends the request. + # Server sends illegal monitor state error in response. + self.prepare_get_session(1) + self.mock_request_unlock(None, IllegalMonitorStateError()) + + with self.assertRaises(IllegalMonitorStateError): + self.proxy.unlock() + + self.assert_call_counts(0, 0, 0) + self.assert_no_lock_session_id() + + def test_unlock_on_unspecified_error(self): + # Server sends an unspecified error + self.prepare_get_session(1) + self.mock_request_unlock(None, HazelcastRuntimeError()) + + with self.assertRaises(HazelcastRuntimeError): + self.proxy.unlock() + + self.assert_call_counts(0, 0, 0) + self.assert_no_lock_session_id() + + def test_get_fence(self): + # Everything succeeds + self.prepare_get_session(2) + state = self.prepare_state(3, 1, 2, thread_id()) + self.mock_request_get_lock_ownership_state(state) + self.assertEqual(3, self.proxy.get_fence()) + self.assert_call_counts(0, 0, 0) + self.assert_lock_session_id(2) + + def test_get_fence_when_server_closes_old_session(self): + # Session id is different than what we store in the + # dict. The old session must be closed while we were + # holding the lock. + self.prepare_get_session(2) + self.prepare_lock_session_ids(1) + + with self.assertRaises(LockOwnershipLostError): + self.proxy.get_fence() + + self.assert_call_counts(0, 0, 0) + self.assert_no_lock_session_id() + + def test_get_fence_when_there_is_no_session(self): + # No active session for the current thread. + self.prepare_get_session(-1) + + with self.assertRaises(IllegalMonitorStateError): + self.proxy.get_fence() + + self.assert_call_counts(0, 0, 0) + self.assert_no_lock_session_id() + + def test_get_fence_when_server_returns_a_different_thread_id_for_lock_holder(self): + # Client thinks that it holds the lock, but server + # says it's not. + self.prepare_get_session(1) + self.prepare_lock_session_ids(1) + state = self.prepare_state(3, 1, 2, thread_id() - 1) + self.mock_request_get_lock_ownership_state(state) + + with self.assertRaises(LockOwnershipLostError): + self.proxy.get_fence() + + self.assert_call_counts(0, 0, 0) + self.assert_no_lock_session_id() + + def test_get_fence_when_not_holding_the_lock(self): + # Client is not holding the lock. 
+ self.prepare_get_session(1) + state = self.prepare_state(3, 1, 2, thread_id() - 1) + self.mock_request_get_lock_ownership_state(state) + + with self.assertRaises(IllegalMonitorStateError): + self.proxy.get_fence() + + self.assert_call_counts(0, 0, 0) + self.assert_no_lock_session_id() + + def test_get_fence_on_unspecified_error(self): + # Server sends an unspecified error + self.prepare_get_session(1) + self.mock_request_get_lock_ownership_state(None, HazelcastRuntimeError()) + + with self.assertRaises(HazelcastRuntimeError): + self.proxy.get_fence() + + self.assert_call_counts(0, 0, 0) + self.assert_no_lock_session_id() + + def test_is_locked(self): + # Everything succeeds, client holds the lock + self.prepare_get_session(2) + state = self.prepare_state(3, 1, 2, thread_id()) + self.mock_request_get_lock_ownership_state(state) + self.assertTrue(self.proxy.is_locked()) + self.assert_call_counts(0, 0, 0) + self.assert_lock_session_id(2) + + def test_is_locked_when_it_is_locked_by_another_thread(self): + # Client is not holding the lock, but someone else does. + self.prepare_get_session(1) + state = self.prepare_state(3, 1, 2, thread_id() - 1) + self.mock_request_get_lock_ownership_state(state) + self.assertTrue(self.proxy.is_locked()) + self.assert_call_counts(0, 0, 0) + self.assert_no_lock_session_id() + + def test_is_locked_when_free(self): + # No one holds the lock + self.prepare_get_session(1) + state = self.prepare_state(FencedLock.INVALID_FENCE, 0, -1, -1) + self.mock_request_get_lock_ownership_state(state) + self.assertFalse(self.proxy.is_locked()) + self.assert_call_counts(0, 0, 0) + self.assert_no_lock_session_id() + + def test_is_locked_when_server_closes_old_session(self): + # Session id is different than what we store in the + # dict. The old session must be closed while we were + # holding the lock. + self.prepare_get_session(2) + self.prepare_lock_session_ids(1) + + with self.assertRaises(LockOwnershipLostError): + self.proxy.is_locked() + + self.assert_call_counts(0, 0, 0) + self.assert_no_lock_session_id() + + def test_is_locked_when_server_returns_a_different_thread_id_for_lock_holder(self): + # Client thinks that it holds the lock, but server + # says it's not. + self.prepare_get_session(1) + self.prepare_lock_session_ids(1) + state = self.prepare_state(3, 1, 2, thread_id() - 1) + self.mock_request_get_lock_ownership_state(state) + + with self.assertRaises(LockOwnershipLostError): + self.proxy.is_locked() + + self.assert_call_counts(0, 0, 0) + self.assert_no_lock_session_id() + + def test_is_locked_on_unspecified_error(self): + # Server sends an unspecified error + self.prepare_get_session(1) + self.mock_request_get_lock_ownership_state(None, HazelcastRuntimeError()) + + with self.assertRaises(HazelcastRuntimeError): + self.proxy.is_locked() + + self.assert_call_counts(0, 0, 0) + self.assert_no_lock_session_id() + + def test_is_locked_by_current_thread(self): + # Everything succeeds, client holds the lock + self.prepare_get_session(2) + state = self.prepare_state(3, 1, 2, thread_id()) + self.mock_request_get_lock_ownership_state(state) + self.assertTrue(self.proxy.is_locked_by_current_thread()) + self.assert_call_counts(0, 0, 0) + self.assert_lock_session_id(2) + + def test_is_locked_by_current_thread_when_it_is_locked_by_another_thread(self): + # Client is not holding the lock, but someone else does. 
+ self.prepare_get_session(1) + state = self.prepare_state(3, 1, 2, thread_id() - 1) + self.mock_request_get_lock_ownership_state(state) + self.assertFalse(self.proxy.is_locked_by_current_thread()) + self.assert_call_counts(0, 0, 0) + self.assert_no_lock_session_id() + + def test_is_locked_by_current_thread_when_free(self): + # No one holds the lock + self.prepare_get_session(1) + state = self.prepare_state(FencedLock.INVALID_FENCE, 0, -1, -1) + self.mock_request_get_lock_ownership_state(state) + self.assertFalse(self.proxy.is_locked_by_current_thread()) + self.assert_call_counts(0, 0, 0) + self.assert_no_lock_session_id() + + def test_is_locked_by_current_thread_when_server_closes_old_session(self): + # Session id is different than what we store in the + # dict. The old session must be closed while we were + # holding the lock. + self.prepare_get_session(2) + self.prepare_lock_session_ids(1) + + with self.assertRaises(LockOwnershipLostError): + self.proxy.is_locked_by_current_thread() + + self.assert_call_counts(0, 0, 0) + self.assert_no_lock_session_id() + + def test_is_locked_by_current_thread_when_server_returns_a_different_thread_id_for_lock_holder(self): + # Client thinks that it holds the lock, but server + # says it's not. + self.prepare_get_session(1) + self.prepare_lock_session_ids(1) + state = self.prepare_state(3, 1, 2, thread_id() - 1) + self.mock_request_get_lock_ownership_state(state) + + with self.assertRaises(LockOwnershipLostError): + self.proxy.is_locked_by_current_thread() + + self.assert_call_counts(0, 0, 0) + self.assert_no_lock_session_id() + + def test_is_locked_by_current_thread_on_unspecified_error(self): + # Server sends an unspecified error + self.prepare_get_session(1) + self.mock_request_get_lock_ownership_state(None, HazelcastRuntimeError()) + + with self.assertRaises(HazelcastRuntimeError): + self.proxy.is_locked_by_current_thread() + + self.assert_call_counts(0, 0, 0) + self.assert_no_lock_session_id() + + def test_get_lock_count(self): + # Everything succeeds, client holds the lock + self.prepare_get_session(2) + state = self.prepare_state(3, 123, 2, thread_id()) + self.mock_request_get_lock_ownership_state(state) + self.assertEqual(123, self.proxy.get_lock_count()) + self.assert_call_counts(0, 0, 0) + self.assert_lock_session_id(2) + + def test_get_lock_count_when_it_is_locked_by_another_thread(self): + # Client is not holding the lock, but someone else does. + self.prepare_get_session(1) + state = self.prepare_state(3, 1, 2, thread_id() - 1) + self.mock_request_get_lock_ownership_state(state) + self.assertEqual(1, self.proxy.get_lock_count()) + self.assert_call_counts(0, 0, 0) + self.assert_no_lock_session_id() + + def test_get_lock_count_when_free(self): + # No one holds the lock + self.prepare_get_session(1) + state = self.prepare_state(FencedLock.INVALID_FENCE, 0, -1, -1) + self.mock_request_get_lock_ownership_state(state) + self.assertEqual(0, self.proxy.get_lock_count()) + self.assert_call_counts(0, 0, 0) + self.assert_no_lock_session_id() + + def test_get_lock_count_when_server_closes_old_session(self): + # Session id is different than what we store in the + # dict. The old session must be closed while we were + # holding the lock. 
+ self.prepare_get_session(2) + self.prepare_lock_session_ids(1) + + with self.assertRaises(LockOwnershipLostError): + self.proxy.get_lock_count() + + self.assert_call_counts(0, 0, 0) + self.assert_no_lock_session_id() + + def test_get_lock_count_when_server_returns_a_different_thread_id_for_lock_holder(self): + # Client thinks that it holds the lock, but server + # says it's not. + self.prepare_get_session(1) + self.prepare_lock_session_ids(1) + state = self.prepare_state(3, 1, 2, thread_id() - 1) + self.mock_request_get_lock_ownership_state(state) + + with self.assertRaises(LockOwnershipLostError): + self.proxy.get_lock_count() + + self.assert_call_counts(0, 0, 0) + self.assert_no_lock_session_id() + + def test_get_lock_count_on_unspecified_error(self): + # Server sends an unspecified error + self.prepare_get_session(1) + self.mock_request_get_lock_ownership_state(None, HazelcastRuntimeError()) + + with self.assertRaises(HazelcastRuntimeError): + self.proxy.get_lock_count() + + self.assert_call_counts(0, 0, 0) + self.assert_no_lock_session_id() + + def prepare_lock_session_ids(self, session_id): + self.proxy._wrapped._lock_session_ids[thread_id()] = session_id + + def prepare_acquire_session(self, session_id, err=None): + if err: + val = ImmediateExceptionFuture(err) + else: + val = ImmediateFuture(session_id) + + acquire_mock = MagicMock(return_value=val) + release_mock = MagicMock() + invalidate_mock = MagicMock() + self.session_manager.acquire_session = acquire_mock + self.session_manager.release_session = release_mock + self.session_manager.invalidate_session = invalidate_mock + self.acquire_session = acquire_mock + self.release_session = release_mock + self.invalidate_session = invalidate_mock + + def prepare_get_session(self, session_id): + self.session_manager.get_session_id = MagicMock(return_value=session_id) + + def mock_request_lock(self, fence, first_call_err=None): + self._mock_request("_request_lock", fence, first_call_err) + + def mock_request_unlock(self, result, first_call_err=None): + self._mock_request("_request_unlock", result, first_call_err) + + def mock_request_try_lock(self, fence, first_call_err=None): + self._mock_request("_request_try_lock", fence, first_call_err) + + def mock_request_get_lock_ownership_state(self, state, first_call_err=None): + self._mock_request("_request_get_lock_ownership_state", state, first_call_err) + + def _mock_request(self, method_name, result, first_call_err): + called = AtomicInteger() + + def mock(*_, **__): + if called.get_and_increment() == 0 and first_call_err: + return ImmediateExceptionFuture(first_call_err) + return ImmediateFuture(result) + + setattr(self.proxy._wrapped, method_name, MagicMock(side_effect=mock)) + + def assert_call_counts(self, acquire, release, invalidate): + self.assertEqual(acquire, self.acquire_session.call_count) + self.assertEqual(release, self.release_session.call_count) + self.assertEqual(invalidate, self.invalidate_session.call_count) + + def prepare_state(self, fence, lock_count, session_id, t_id): + return { + "fence": fence, + "lock_count": lock_count, + "session_id": session_id, + "thread_id": t_id, + } + + def assert_no_lock_session_id(self): + self.assertEqual(0, len(self.proxy._wrapped._lock_session_ids)) + + def assert_lock_session_id(self, session_id): + s_id = self.proxy._wrapped._lock_session_ids.get(thread_id(), None) + self.assertEqual(session_id, s_id) From c19a60df70e4017f5dd50b3de97f775e57c42913 Mon Sep 17 00:00:00 2001 From: mdumandag Date: Tue, 27 Oct 2020 16:00:40 +0300 Subject: 
[PATCH 2/6] Add API documentation and fix some docstrings --- docs/hazelcast.proxy.cp.fenced_lock.rst | 4 ++++ docs/hazelcast.proxy.cp.rst | 3 ++- hazelcast/proxy/cp/fenced_lock.py | 12 ++++++------ 3 files changed, 12 insertions(+), 7 deletions(-) create mode 100644 docs/hazelcast.proxy.cp.fenced_lock.rst diff --git a/docs/hazelcast.proxy.cp.fenced_lock.rst b/docs/hazelcast.proxy.cp.fenced_lock.rst new file mode 100644 index 0000000000..9c0770a34f --- /dev/null +++ b/docs/hazelcast.proxy.cp.fenced_lock.rst @@ -0,0 +1,4 @@ +FencedLock +============== + +.. automodule:: hazelcast.proxy.cp.fenced_lock diff --git a/docs/hazelcast.proxy.cp.rst b/docs/hazelcast.proxy.cp.rst index 69d4e16856..896428e0e2 100644 --- a/docs/hazelcast.proxy.cp.rst +++ b/docs/hazelcast.proxy.cp.rst @@ -4,4 +4,5 @@ CP Proxies .. toctree:: hazelcast.proxy.cp.atomic_long hazelcast.proxy.cp.atomic_reference - hazelcast.proxy.cp.count_down_latch \ No newline at end of file + hazelcast.proxy.cp.count_down_latch + hazelcast.proxy.cp.fenced_lock \ No newline at end of file diff --git a/hazelcast/proxy/cp/fenced_lock.py b/hazelcast/proxy/cp/fenced_lock.py index 867b71b470..a89471e7bf 100644 --- a/hazelcast/proxy/cp/fenced_lock.py +++ b/hazelcast/proxy/cp/fenced_lock.py @@ -23,14 +23,14 @@ class FencedLock(SessionAwareCPProxy): FencedLock works on top of CP sessions. Please refer to CP Session IMDG documentation section for more information. - By default, {@link FencedLock} is reentrant. Once a caller acquires + By default, FencedLock is reentrant. Once a caller acquires the lock, it can acquire the lock reentrantly as many times as it wants in a linearizable manner. You can configure the reentrancy behaviour on the member side. For instance, reentrancy can be disabled and FencedLock can work as a non-reentrant mutex. One can also set a custom reentrancy limit. When the reentrancy limit is reached, FencedLock does not block a lock call. Instead, it fails with - ``LockAcquireLimitReachedException`` or a specified return value. + ``LockAcquireLimitReachedError`` or a specified return value. Please check the locking methods to see details about the behaviour. """ @@ -44,7 +44,7 @@ def lock(self): """Acquires the lock. When the caller already holds the lock and the current ``lock()`` call is - reentrant, the call can fail with ``LockAcquireLimitReachedException`` + reentrant, the call can fail with ``LockAcquireLimitReachedError`` if the lock acquire limit is already reached. If the lock is not available then the current thread becomes disabled @@ -57,7 +57,7 @@ def lock(self): Raises: LockOwnershipLostError: If the underlying CP session was closed before the client releases the lock - LockAcquireLimitReachedException: If the lock call is reentrant + LockAcquireLimitReachedError: If the lock call is reentrant and the configured lock acquire limit is already reached. """ def handler(f): @@ -71,7 +71,7 @@ def lock_and_get_fence(self): thread for this lock acquire. If the lock is acquired reentrantly, the same fencing token is returned, - or the ``lock()`` call can fail with ``LockAcquireLimitReachedException`` + or the ``lock()`` call can fail with ``LockAcquireLimitReachedError`` if the lock acquire limit is already reached. 
If the lock is not available then the current thread becomes disabled @@ -117,7 +117,7 @@ def lock_and_get_fence(self): Raises: LockOwnershipLostError: If the underlying CP session was closed before the client releases the lock - LockAcquireLimitReachedException: If the lock call is reentrant + LockAcquireLimitReachedError: If the lock call is reentrant and the configured lock acquire limit is already reached. """ current_thread_id = thread_id() From f9555d353f7831503558ed64da337892899f0994 Mon Sep 17 00:00:00 2001 From: mdumandag Date: Wed, 28 Oct 2020 18:27:12 +0300 Subject: [PATCH 3/6] fix typos --- examples/cp/fenced_lock_example.py | 2 +- hazelcast/cp.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/cp/fenced_lock_example.py b/examples/cp/fenced_lock_example.py index 9152006822..624c2cd2cd 100644 --- a/examples/cp/fenced_lock_example.py +++ b/examples/cp/fenced_lock_example.py @@ -14,7 +14,7 @@ print("Locked after lock:", locked) locked = lock.try_lock() - print("Locked reentratly:", locked) + print("Locked reentrantly:", locked) # more guarded code finally: diff --git a/hazelcast/cp.py b/hazelcast/cp.py index cf6db56cbd..530916b2c1 100644 --- a/hazelcast/cp.py +++ b/hazelcast/cp.py @@ -258,7 +258,7 @@ def get_session_id(self, group_id): return _NO_SESSION_ID def acquire_session(self, group_id, count): - return self._get_or_crate_session(group_id).continue_with(lambda state: state.result().acquire(count)) + return self._get_or_create_session(group_id).continue_with(lambda state: state.result().acquire(count)) def release_session(self, group_id, session_id, count): session = self._sessions.get(group_id, None) @@ -319,7 +319,7 @@ def _request_close_session(self, group_id, session_id): self._context.invocation_service.invoke(invocation) return invocation.future - def _get_or_crate_session(self, group_id): + def _get_or_create_session(self, group_id): with self._lock: if self._shutdown: error = HazelcastClientNotActiveError("Session manager is already shut down!") From 6a58a38d91b65b89469f59ee796cb29d20528d75 Mon Sep 17 00:00:00 2001 From: mdumandag Date: Fri, 30 Oct 2020 11:23:51 +0300 Subject: [PATCH 4/6] address review comments --- hazelcast/cp.py | 20 +++++++------------- tests/cp_test.py | 4 ++-- 2 files changed, 9 insertions(+), 15 deletions(-) diff --git a/hazelcast/cp.py b/hazelcast/cp.py index 530916b2c1..c7361d1f40 100644 --- a/hazelcast/cp.py +++ b/hazelcast/cp.py @@ -1,5 +1,5 @@ import time -from threading import RLock +from threading import RLock, Lock from hazelcast import six from hazelcast.errors import SessionExpiredError, CPGroupDestroyedError, HazelcastClientNotActiveError @@ -162,6 +162,7 @@ class CPProxyManager(object): def __init__(self, context): self._context = context self._lock_proxies = dict() # proxy_name to FencedLock + self._mux = Lock() # Guards the _lock_proxies def get_or_create(self, service_name, proxy_name): proxy_name = _without_default_group_name(proxy_name) @@ -178,7 +179,7 @@ def get_or_create(self, service_name, proxy_name): return self._create_fenced_lock(group_id, proxy_name, object_name) def _create_fenced_lock(self, group_id, proxy_name, object_name): - while True: + with self._mux: proxy = self._lock_proxies.get(proxy_name, None) if proxy: if proxy.get_group_id() != group_id: @@ -187,11 +188,8 @@ def _create_fenced_lock(self, group_id, proxy_name, object_name): return proxy proxy = FencedLock(self._context, group_id, LOCK_SERVICE, proxy_name, object_name) - existing = self._lock_proxies.setdefault(proxy_name, 
proxy) - if existing.get_group_id() == proxy.get_group_id(): - return existing - - group_id = self._get_group_id(proxy_name) + self._lock_proxies[proxy_name] = proxy + return proxy def _get_group_id(self, proxy_name): codec = cp_group_create_cp_group_codec @@ -266,6 +264,7 @@ def release_session(self, group_id, session_id, count): session.release(count) def invalidate_session(self, group_id, session_id): + # called from the reactor thread only session = self._sessions.get(group_id, None) if session and session.id == session_id: self._sessions.pop(group_id, None) @@ -378,7 +377,7 @@ def cb(heartbeat_future): error = heartbeat_future.exception() if isinstance(error, (SessionExpiredError, CPGroupDestroyedError)): - self._invalidate_session(session.group_id, session.id) + self.invalidate_session(session.group_id, session.id) f = self._request_heartbeat(session.group_id, session.id) f.add_done_callback(cb) @@ -395,8 +394,3 @@ def _request_heartbeat(self, group_id, session_id): invocation = Invocation(request) self._context.invocation_service.invoke(invocation) return invocation.future - - def _invalidate_session(self, group_id, session_id): - session = self._sessions.get(group_id, None) - if session and session.id == session_id: - self._sessions.pop(group_id, None) diff --git a/tests/cp_test.py b/tests/cp_test.py index b983c3caa6..7a6333edd1 100644 --- a/tests/cp_test.py +++ b/tests/cp_test.py @@ -178,8 +178,8 @@ def test_heartbeat_on_failure(self): self.mock_request_new_session() self.manager._request_heartbeat = MagicMock(return_value=ImmediateExceptionFuture(SessionExpiredError())) - m = MagicMock(side_effect=self.manager._invalidate_session) - self.manager._invalidate_session = m + m = MagicMock(side_effect=self.manager.invalidate_session) + self.manager.invalidate_session = m self.manager.acquire_session(self.raft_group_id, 1).result() time.sleep(2) From 229a35fd943d0be703ebe819aabba455b546a2bd Mon Sep 17 00:00:00 2001 From: mdumandag Date: Fri, 30 Oct 2020 13:51:19 +0300 Subject: [PATCH 5/6] remove lock_and_get_fence, try_lock_and_get_fence and get_fence methods --- README.md | 7 +- examples/cp/fenced_lock_example.py | 6 +- examples/org-website/lock_sample.py | 14 +- hazelcast/proxy/cp/fenced_lock.py | 142 +--------- tests/proxy/cp/fenced_lock_test.py | 406 +++------------------------- 5 files changed, 59 insertions(+), 516 deletions(-) diff --git a/README.md b/README.md index ad21f0ff3e..0f36893047 100644 --- a/README.md +++ b/README.md @@ -1623,8 +1623,8 @@ A basic Lock usage example is shown below. ```python # Get a FencedLock called "my-lock" lock = client.cp_subsystem.get_lock("my-lock").blocking() -# Acquire the lock -lock.lock() +# Acquire the lock and get the fencing token +fence = lock.lock() try: # Your guarded code goes here pass @@ -1663,9 +1663,6 @@ After that, once Client-1 comes back alive, its write request will be rejected b You can read more about the fencing token idea in Martin Kleppmann's "How to do distributed locking" blog post and Google's Chubby paper. -To get fencing token, one may use ``lock.lock_and_get_fence()`` or ``lock.try_lock_and_get_fence()`` utility methods, or ``lock.get_fence()`` method -while holding the lock. - #### 7.4.12.3. Using CountDownLatch Hazelcast `CountDownLatch` is the distributed implementation of a linearizable and distributed countdown latch. 
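The fencing pattern described in the README portion of this patch can be sketched with the updated API, in which `lock()` itself returns the token. This is only an illustrative sketch: the `ExternalService` class and its `write` method are hypothetical stand-ins for a shared resource that remembers the highest fence it has seen; only the `cp_subsystem.get_lock(...)` usage comes from the client API.

```python
import hazelcast

client = hazelcast.HazelcastClient()


class ExternalService(object):
    """Hypothetical shared resource that fences off stale lock holders."""

    def __init__(self):
        self.highest_fence = None

    def write(self, fence, value):
        # Reject a request carrying a fence older than one already observed,
        # i.e. a request from a previous (possibly paused or crashed) holder.
        if self.highest_fence is not None and fence < self.highest_fence:
            raise RuntimeError("Stale fence %s, write rejected" % fence)
        self.highest_fence = fence
        # ... apply the write to the underlying resource ...


service = ExternalService()
lock = client.cp_subsystem.get_lock("my-lock").blocking()
fence = lock.lock()
try:
    service.write(fence, "some value")
finally:
    lock.unlock()
```

Because fencing tokens increase monotonically every time the lock switches to a new holder, a write carrying an old token can be detected and rejected even if its sender still believes it holds the lock.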
diff --git a/examples/cp/fenced_lock_example.py b/examples/cp/fenced_lock_example.py index 624c2cd2cd..d1dc3e60a8 100644 --- a/examples/cp/fenced_lock_example.py +++ b/examples/cp/fenced_lock_example.py @@ -7,14 +7,14 @@ locked = lock.is_locked() print("Locked initially:", locked) -fence = lock.lock_and_get_fence() +fence = lock.lock() print("Fence token:", fence) try: locked = lock.is_locked() print("Locked after lock:", locked) - locked = lock.try_lock() - print("Locked reentrantly:", locked) + fence = lock.try_lock() + print("Locked reentrantly:", fence != lock.INVALID_FENCE) # more guarded code finally: diff --git a/examples/org-website/lock_sample.py b/examples/org-website/lock_sample.py index a4403f9640..875771ddfe 100644 --- a/examples/org-website/lock_sample.py +++ b/examples/org-website/lock_sample.py @@ -1,13 +1,13 @@ -# TODO Fix this when we add CP Lock - import hazelcast -# Start the Hazelcast Client and connect to an already running Hazelcast Cluster on 127.0.0.1 +# Start the Hazelcast Client and connect to an already running +# Hazelcast Cluster on 127.0.0.1 hz = hazelcast.HazelcastClient() -# Get a distributed lock called "my-distributed-lock" -lock = hz.get_lock("my-distributed-lock").blocking() -# Now create a lock and execute some guarded code. -lock.lock() +# Get the Distributed Lock from CP Subsystem +lock = hz.cp_subsystem.get_lock("my-distributed-lock").blocking() +# Now acquire the lock and execute some guarded code +fence = lock.lock() +print("Fence token:", fence) try: # do something here pass diff --git a/hazelcast/proxy/cp/fenced_lock.py b/hazelcast/proxy/cp/fenced_lock.py index a89471e7bf..fb572ad30a 100644 --- a/hazelcast/proxy/cp/fenced_lock.py +++ b/hazelcast/proxy/cp/fenced_lock.py @@ -41,32 +41,6 @@ def __init__(self, context, group_id, service_name, proxy_name, object_name): self._lock_session_ids = dict() # thread-id to session id that has acquired the lock def lock(self): - """Acquires the lock. - - When the caller already holds the lock and the current ``lock()`` call is - reentrant, the call can fail with ``LockAcquireLimitReachedError`` - if the lock acquire limit is already reached. - - If the lock is not available then the current thread becomes disabled - for thread scheduling purposes and lies dormant until the lock has been - acquired. - - Returns: - hazelcast.future.Future[None]: - - Raises: - LockOwnershipLostError: If the underlying CP session was - closed before the client releases the lock - LockAcquireLimitReachedError: If the lock call is reentrant - and the configured lock acquire limit is already reached. - """ - def handler(f): - f.result() - return None - - return self.lock_and_get_fence().continue_with(handler) - - def lock_and_get_fence(self): """Acquires the lock and returns the fencing token assigned to the current thread for this lock acquire. @@ -78,12 +52,6 @@ def lock_and_get_fence(self): for thread scheduling purposes and lies dormant until the lock has been acquired. - This is a convenience method for the following pattern :: - - lock = client.cp_subsystem.get_lock("lock").blocking() - lock.lock() - fence = lock.get_fence() - Fencing tokens are monotonic numbers that are incremented each time the lock switches from the free state to the acquired state. They are simply used for ordering lock holders. 
A lock holder can pass @@ -94,12 +62,12 @@ def lock_and_get_fence(self): Consider the following scenario where the lock is free initially :: lock = client.cp_subsystem.get_lock("lock").blocking() - fence1 = lock.lock_and_get_fence() # (1) - fence2 = lock.lock_and_get_fence() # (2) + fence1 = lock.lock() # (1) + fence2 = lock.lock() # (2) assert fence1 == fence2 lock.unlock() lock.unlock() - fence3 = lock.lock_and_get_fence() # (3) + fence3 = lock.lock() # (3) assert fence3 > fence1 In this scenario, the lock is acquired by a thread in the cluster. Then, @@ -126,28 +94,27 @@ def lock_and_get_fence(self): def try_lock(self, timeout=0): """Acquires the lock if it is free within the given waiting time, - or already held by the current thread. + or already held by the current thread at the time of invocation and, + the acquire limit is not exceeded, and returns the fencing token + assigned to the current thread for this lock acquire. - If the lock is available, this method returns immediately with the value - ``True``. When the call is reentrant, it immediately returns - ``True`` if the lock acquire limit is not exceeded. Otherwise, - it returns ``False`` on the reentrant lock attempt if the acquire - limit is exceeded. + If the lock is acquired reentrantly, the same fencing token is returned. + If the lock acquire limit is exceeded, then this method immediately returns + :const:`INVALID_FENCE` that represents a failed lock attempt. If the lock is not available then the current thread becomes disabled for thread scheduling purposes and lies dormant until the lock is acquired by the current thread or the specified waiting time elapses. - If the lock is acquired, then the value ``True`` is returned. - - If the specified waiting time elapses, then the value ``False`` + If the specified waiting time elapses, then :const:`INVALID_FENCE` is returned. If the time is less than or equal to zero, the method does not wait at all. By default, timeout is set to zero. A typical usage idiom for this method would be :: lock = client.cp_subsystem.get_lock("lock").blocking() - if lock.try_lock(): + fence = lock.try_lock() + if fence != lock.INVALID_FENCE: try: # manipulate the protected state finally: @@ -158,47 +125,8 @@ def try_lock(self, timeout=0): This usage ensures that the lock is unlocked if it was acquired, and doesn't try to unlock if the lock was not acquired. - Args: - timeout (int): The maximum time to wait for the lock in seconds. - - Returns: - hazelcast.future.Future[bool]: ``True`` if the lock was acquired and ``False`` - if the waiting time elapsed before the lock was acquired. - - Raises: - LockOwnershipLostError: If the underlying CP session was - closed before the client releases the lock - """ - return self.try_lock_and_get_fence(timeout).continue_with(lambda f: f.result() != self.INVALID_FENCE) - - def try_lock_and_get_fence(self, timeout=0): - """Acquires the lock if it is free within the given waiting time, - or already held by the current thread at the time of invocation and, - the acquire limit is not exceeded, and returns the fencing token - assigned to the current thread for this lock acquire. - - If the lock is acquired reentrantly, the same fencing token is returned. - If the lock acquire limit is exceeded, then this method immediately returns - :const:`INVALID_FENCE` that represents a failed lock attempt. 
- - If the lock is not available then the current thread becomes disabled - for thread scheduling purposes and lies dormant until the lock is - acquired by the current thread or the specified waiting time elapses. - - If the specified waiting time elapses, then :const:`INVALID_FENCE` - is returned. If the time is less than or equal to zero, the method does - not wait at all. By default, timeout is set to zero. - - This is a convenience method for the following pattern :: - - lock = client.cp_subsystem.get_lock("lock").blocking() - if lock.try_lock(): - fence = lock.get_fence() - else: - fence = lock.INVALID_FENCE - See Also: - :func:`lock_and_get_fence` function for more information about fences. + :func:`lock` function for more information about fences. Args: timeout (int): The maximum time to wait for the lock in seconds. @@ -260,50 +188,6 @@ def check_response(f): return self._request_unlock(session_id, current_thread_id, uuid.uuid4()).continue_with(check_response) - def get_fence(self): - """Returns the fencing token if the lock is held by the current thread. - - Fencing tokens are monotonic numbers that are incremented each time - the lock switches from the free state to the acquired state. They are - simply used for ordering lock holders. A lock holder can pass - its fencing to the shared resource to fence off previous lock holders. - When this resource receives an operation, it can validate the fencing - token in the operation. - - Returns: - hazelcast.future.Future[int]: The fencing token if the lock is held - by the current thread - - Raises: - LockOwnershipLostError: If the underlying CP session was - closed before the client releases the lock - IllegalMonitorStateError: If the lock is not held by - the current thread - """ - current_thread_id = thread_id() - session_id = self._get_session_id() - - # the order of the following checks is important - try: - self._verify_locked_session_id_if_present(current_thread_id, session_id, False) - except LockOwnershipLostError as e: - return ImmediateExceptionFuture(e) - - if session_id == _NO_SESSION_ID: - self._lock_session_ids.pop(current_thread_id, None) - return ImmediateExceptionFuture(self._new_illegal_monitor_state_error()) - - def check_response(f): - state = _LockOwnershipState(f.result()) - if state.is_locked_by(session_id, current_thread_id): - self._lock_session_ids[current_thread_id] = session_id - return state.fence - - self._verify_no_locked_session_id_present(current_thread_id) - raise self._new_illegal_monitor_state_error() - - return self._request_get_lock_ownership_state().continue_with(check_response) - def is_locked(self): """Returns whether this lock is locked or not. 
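As a usage note on top of the blocking proxy shown in these docstrings, the acquire/release pair can be wrapped in a small context manager so that the lock is always released; the `holding` helper below is not part of the client API, just a sketch of that pattern.

```python
from contextlib import contextmanager

import hazelcast


@contextmanager
def holding(lock):
    # Acquire the FencedLock, yield its fencing token, and always release it,
    # mirroring the try/finally idiom from the docstrings above.
    fence = lock.lock()
    try:
        yield fence
    finally:
        lock.unlock()


client = hazelcast.HazelcastClient()
lock = client.cp_subsystem.get_lock("my-lock").blocking()

with holding(lock) as fence:
    print("Guarded section running with fence:", fence)

client.shutdown()
```

Since both the acquire and the release happen on the caller's thread inside the `with` block, this keeps the per-thread ownership semantics of `FencedLock` intact.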
diff --git a/tests/proxy/cp/fenced_lock_test.py b/tests/proxy/cp/fenced_lock_test.py index edd57085e3..448643e8c7 100644 --- a/tests/proxy/cp/fenced_lock_test.py +++ b/tests/proxy/cp/fenced_lock_test.py @@ -26,7 +26,7 @@ def tearDown(self): def test_lock_in_another_group(self): another_lock = self.client.cp_subsystem.get_lock(self.lock._proxy_name + "@another").blocking() - another_lock.lock() + self.assert_valid_fence(another_lock.lock()) try: self.assertTrue(another_lock.is_locked()) self.assertFalse(self.lock.is_locked()) @@ -36,7 +36,7 @@ def test_lock_in_another_group(self): def test_lock_after_client_shutdown(self): another_client = HazelcastClient(cluster_name=self.cluster.id) another_lock = another_client.cp_subsystem.get_lock(self.lock._proxy_name).blocking() - another_lock.lock() + self.assert_valid_fence(another_lock.lock()) self.assertTrue(another_lock.is_locked()) self.assertTrue(self.lock.is_locked()) another_client.shutdown() @@ -52,64 +52,38 @@ def test_use_after_destroy(self): self.lock.destroy() with self.assertRaises(DistributedObjectDestroyedError): - self.lock.lock_and_get_fence() + self.lock.lock() lock2 = self.client.cp_subsystem.get_lock(self.lock._proxy_name).blocking() with self.assertRaises(DistributedObjectDestroyedError): - lock2.lock_and_get_fence() + lock2.lock() def test_lock_when_not_locked(self): - self.assertIsNone(self.lock.lock()) - self.assertTrue(self.lock.is_locked()) - self.assertTrue(self.lock.is_locked_by_current_thread()) - self.assertEqual(1, self.lock.get_lock_count()) - self.assert_session_acquire_count(1) - - def test_lock_when_locked_by_self(self): - self.lock.lock() - self.lock.lock() - self.assert_lock(True, True, 2) - self.assert_session_acquire_count(2) - - def test_lock_when_locked_by_another_thread(self): - t = self.start_new_thread(lambda: self.lock.lock()) - t.join() - self.assert_lock(True, False, 1) - future = self.lock._wrapped.lock() - time.sleep(2) - self.assertFalse(future.done()) # lock request made in the main thread should not complete - self.assert_session_acquire_count(2) - self.lock.destroy() - - with self.assertRaises(DistributedObjectDestroyedError): - future.result() - - def test_lock_and_get_fence_when_not_locked(self): - self.assert_valid_fence(self.lock.lock_and_get_fence()) + self.assert_valid_fence(self.lock.lock()) self.assert_lock(True, True, 1) self.assert_session_acquire_count(1) - def test_lock_and_get_fence_when_locked_by_self(self): - fence = self.lock.lock_and_get_fence() + def test_lock_when_locked_by_self(self): + fence = self.lock.lock() self.assert_valid_fence(fence) - fence2 = self.lock.lock_and_get_fence() + fence2 = self.lock.lock() self.assertEqual(fence, fence2) self.assert_lock(True, True, 2) self.assert_session_acquire_count(2) self.lock.unlock() self.lock.unlock() self.assert_session_acquire_count(0) - fence3 = self.lock.lock_and_get_fence() + fence3 = self.lock.lock() self.assert_valid_fence(fence3) self.assertGreater(fence3, fence) self.assert_session_acquire_count(1) - def test_lock_and_get_fence_when_locked_by_another_thread(self): - t = self.start_new_thread(lambda: self.assert_valid_fence(self.lock.lock_and_get_fence())) + def test_lock_when_locked_by_another_thread(self): + t = self.start_new_thread(lambda: self.assert_valid_fence(self.lock.lock())) t.join() self.assert_lock(True, False, 1) - future = self.lock._wrapped.lock_and_get_fence() + future = self.lock._wrapped.lock() time.sleep(2) self.assertFalse(future.done()) # lock request made in the main thread should not complete 
self.assert_session_acquire_count(2) @@ -119,92 +93,56 @@ def test_lock_and_get_fence_when_locked_by_another_thread(self): future.result() def test_try_lock_when_free(self): - self.assertTrue(self.lock.try_lock()) + self.assert_valid_fence(self.lock.try_lock()) self.assert_lock(True, True, 1) self.assert_session_acquire_count(1) def test_try_lock_when_free_with_timeout(self): - self.assertTrue(self.lock.try_lock(1)) - self.assert_lock(True, True, 1) - self.assert_session_acquire_count(1) - - def test_try_lock_when_locked_by_self(self): - self.lock.lock() - self.assertTrue(self.lock.try_lock()) - self.assert_lock(True, True, 2) - self.assert_session_acquire_count(2) - - def test_try_lock_when_locked_by_self_with_timeout(self): - self.lock.lock() - self.assertTrue(self.lock.try_lock(2)) - self.assert_lock(True, True, 2) - self.assert_session_acquire_count(2) - - def test_try_lock_when_locked_by_another_thread(self): - t = self.start_new_thread(lambda: self.lock.try_lock()) - t.join() - self.assertFalse(self.lock.try_lock()) - self.assert_lock(True, False, 1) - self.assert_session_acquire_count(1) - - def test_try_lock_when_locked_by_another_thread_with_timeout(self): - t = self.start_new_thread(lambda: self.lock.try_lock()) - t.join() - self.assertFalse(self.lock.try_lock(1)) - self.assert_lock(True, False, 1) - self.assert_session_acquire_count(1) - - def test_try_lock_and_get_fence_when_free(self): - self.assert_valid_fence(self.lock.try_lock_and_get_fence()) - self.assert_lock(True, True, 1) - self.assert_session_acquire_count(1) - - def test_try_lock_and_get_fence_when_free_with_timeout(self): self.assert_valid_fence(self.lock.try_lock(1)) self.assert_lock(True, True, 1) self.assert_session_acquire_count(1) - def test_try_lock_and_get_fence_when_locked_by_self(self): - fence = self.lock.try_lock_and_get_fence() + def test_try_lock_when_locked_by_self(self): + fence = self.lock.lock() self.assert_valid_fence(fence) - fence2 = self.lock.try_lock_and_get_fence() + fence2 = self.lock.try_lock() self.assertEqual(fence, fence2) self.assert_lock(True, True, 2) self.assert_session_acquire_count(2) self.lock.unlock() self.lock.unlock() self.assert_session_acquire_count(0) - fence3 = self.lock.try_lock_and_get_fence() + fence3 = self.lock.try_lock() self.assert_valid_fence(fence3) self.assertGreater(fence3, fence) self.assert_session_acquire_count(1) - def test_try_lock_and_get_fence_when_locked_by_self_with_timeout(self): - fence = self.lock.try_lock_and_get_fence(1) + def test_try_lock_when_locked_by_self_with_timeout(self): + fence = self.lock.lock() self.assert_valid_fence(fence) - fence2 = self.lock.try_lock_and_get_fence(2) + fence2 = self.lock.try_lock(2) self.assertEqual(fence, fence2) self.assert_lock(True, True, 2) self.assert_session_acquire_count(2) self.lock.unlock() self.lock.unlock() self.assert_session_acquire_count(0) - fence3 = self.lock.try_lock_and_get_fence() + fence3 = self.lock.try_lock(2) self.assert_valid_fence(fence3) self.assertGreater(fence3, fence) self.assert_session_acquire_count(1) - def test_try_lock_and_get_fence_when_locked_by_another_thread(self): - t = self.start_new_thread(lambda: self.assert_valid_fence(self.lock.try_lock_and_get_fence())) + def test_try_lock_when_locked_by_another_thread(self): + t = self.start_new_thread(lambda: self.lock.try_lock()) t.join() - self.assert_not_valid_fence(self.lock.try_lock_and_get_fence()) + self.assertFalse(self.lock.try_lock()) self.assert_lock(True, False, 1) self.assert_session_acquire_count(1) - def 
test_try_lock_and_get_fence_when_locked_by_another_thread_with_timeout(self): - t = self.start_new_thread(lambda: self.assert_valid_fence(self.lock.try_lock_and_get_fence(1))) + def test_try_lock_when_locked_by_another_thread_with_timeout(self): + t = self.start_new_thread(lambda: self.assert_valid_fence(self.lock.try_lock())) t.join() - self.assert_not_valid_fence(self.lock.try_lock_and_get_fence(1)) + self.assert_not_valid_fence(self.lock.try_lock(1)) self.assert_lock(True, False, 1) self.assert_session_acquire_count(1) @@ -243,41 +181,6 @@ def test_unlock_when_locked_by_another_thread(self): self.assert_lock(True, False, 1) self.assert_session_acquire_count(1) - def test_get_fence_when_free(self): - with self.assertRaises(IllegalMonitorStateError): - self.lock.get_fence() - - self.assert_session_acquire_count(0) - - def test_get_fence_when_locked_by_self(self): - self.lock.lock() - fence = self.lock.get_fence() - self.assert_valid_fence(fence) - self.lock.lock() - fence2 = self.lock.get_fence() - self.assertEqual(fence, fence2) - self.lock.unlock() - self.lock.unlock() - self.lock.lock() - fence3 = self.lock.get_fence() - self.assert_valid_fence(fence3) - self.assertGreater(fence3, fence) - self.lock.unlock() - - with self.assertRaises(IllegalMonitorStateError): - self.lock.get_fence() - - self.assert_session_acquire_count(0) - - def test_get_fence_when_locked_by_another_thread(self): - t = self.start_new_thread(lambda: self.lock.lock()) - t.join() - - with self.assertRaises(IllegalMonitorStateError): - self.lock.get_fence() - - self.assert_session_acquire_count(1) - def test_is_locked_when_free(self): self.assertFalse(self.lock.is_locked()) self.assert_session_acquire_count(0) @@ -356,7 +259,7 @@ def test_lock(self): # Everything succeeds self.prepare_acquire_session(1) self.mock_request_lock(2) - self.proxy.lock() + self.assertEqual(2, self.proxy.lock()) self.assert_call_counts(1, 0, 0) self.assert_lock_session_id(1) @@ -397,7 +300,7 @@ def test_lock_on_session_expired_error(self): # Session expired error comes from the server on lock request, retries and gets valid fence self.prepare_acquire_session(1) self.mock_request_lock(2, SessionExpiredError()) - self.proxy.lock() + self.assertEqual(2, self.proxy.lock()) self.assert_call_counts(2, 0, 1) self.assert_lock_session_id(1) @@ -436,95 +339,11 @@ def test_lock_on_unspecified_error(self): self.assert_call_counts(1, 1, 0) self.assert_no_lock_session_id() - def test_lock_and_get_fence(self): - # Everything succeeds - self.prepare_acquire_session(1) - self.mock_request_lock(2) - self.assertEqual(2, self.proxy.lock_and_get_fence()) - self.assert_call_counts(1, 0, 0) - self.assert_lock_session_id(1) - - def test_lock_and_get_fence_when_acquire_session_fails(self): - # First call to acquire session fails, should not retry - self.prepare_acquire_session(-1, HazelcastRuntimeError("server_error")) - - with self.assertRaises(HazelcastRuntimeError): - self.proxy.lock_and_get_fence() - - self.assert_call_counts(1, 0, 0) - self.assert_no_lock_session_id() - - def test_lock_and_get_fence_when_server_closes_old_session(self): - # Same thread issues a new lock call while holding a lock. 
- # Server closes session related to the first lock, should not retry - self.prepare_acquire_session(2) - self.prepare_lock_session_ids(1) - - with self.assertRaises(LockOwnershipLostError): - self.proxy.lock_and_get_fence() - - self.assert_call_counts(1, 1, 0) - self.assert_no_lock_session_id() - - def test_lock_and_get_fence_when_lock_acquire_limit_reached(self): - # Lock acquire limit is reached, server returns invalid fence, should not retry - self.prepare_acquire_session(1) - self.mock_request_lock(FencedLock.INVALID_FENCE) - - with self.assertRaises(LockAcquireLimitReachedError): - self.proxy.lock_and_get_fence() - - self.assert_call_counts(1, 1, 0) - self.assert_no_lock_session_id() - - def test_lock_and_get_fence_on_session_expired_error(self): - # Session expired error comes from the server on lock request, retries and gets valid fence - self.prepare_acquire_session(1) - self.mock_request_lock(2, SessionExpiredError()) - self.assertEqual(2, self.proxy.lock_and_get_fence()) - self.assert_call_counts(2, 0, 1) - self.assert_lock_session_id(1) - - def test_lock_and_get_fence_on_session_expired_error_on_reentrant_lock_request(self): - # Session expired error comes from the server on second lock request, - # while holding a lock, should not retry - self.prepare_acquire_session(1) - self.prepare_lock_session_ids(1) - self.mock_request_lock(3, SessionExpiredError()) - - with self.assertRaises(LockOwnershipLostError): - self.proxy.lock_and_get_fence() - - self.assert_call_counts(1, 0, 1) - self.assert_no_lock_session_id() - - def test_lock_and_get_fence_on_wait_key_cancelled_error(self): - # Wait key cancelled error comes from the server, should not retry - self.prepare_acquire_session(1) - self.mock_request_lock(2, WaitKeyCancelledError()) - - with self.assertRaises(IllegalMonitorStateError): - self.proxy.lock_and_get_fence() - - self.assert_call_counts(1, 1, 0) - self.assert_no_lock_session_id() - - def test_lock_and_get_fence_on_unspecified_error(self): - # Server sends another error, should not retry - self.prepare_acquire_session(1) - self.mock_request_lock(-1, HazelcastRuntimeError("expected")) - - with self.assertRaises(HazelcastRuntimeError): - self.proxy.lock_and_get_fence() - - self.assert_call_counts(1, 1, 0) - self.assert_no_lock_session_id() - def test_try_lock(self): # Everything succeeds self.prepare_acquire_session(1) self.mock_request_try_lock(2) - self.assertTrue(self.proxy.try_lock()) + self.assertEqual(2, self.proxy.try_lock()) self.assert_call_counts(1, 0, 0) self.assert_lock_session_id(1) @@ -554,7 +373,7 @@ def test_try_lock_when_lock_acquire_limit_reached(self): # Lock acquire limit is reached, server returns invalid fence self.prepare_acquire_session(1) self.mock_request_try_lock(FencedLock.INVALID_FENCE) - self.assertFalse(self.proxy.try_lock()) + self.assertEqual(FencedLock.INVALID_FENCE, self.proxy.try_lock()) self.assert_call_counts(1, 1, 0) self.assert_no_lock_session_id() @@ -563,7 +382,7 @@ def test_try_lock_on_session_expired_error(self): # client determines the timeout and returns invalid fence self.prepare_acquire_session(1) self.mock_request_try_lock(2, SessionExpiredError()) - self.assertFalse(self.proxy.try_lock()) + self.assertEqual(FencedLock.INVALID_FENCE, self.proxy.try_lock()) self.assert_call_counts(1, 0, 1) self.assert_no_lock_session_id() @@ -572,7 +391,7 @@ def test_try_lock_on_session_expired_error_when_not_timed_out(self): # client retries due to not reaching timeout and succeeds self.prepare_acquire_session(1) self.mock_request_try_lock(2, 
SessionExpiredError()) - self.assertTrue(self.proxy.try_lock(100)) + self.assertEqual(2, self.proxy.try_lock(100)) self.assert_call_counts(2, 0, 1) self.assert_lock_session_id(1) @@ -593,7 +412,7 @@ def test_try_lock_on_wait_key_cancelled_error(self): # Wait key cancelled error comes from the server, invalid fence is returned self.prepare_acquire_session(1) self.mock_request_try_lock(2, WaitKeyCancelledError()) - self.assertFalse(self.proxy.try_lock()) + self.assertEqual(FencedLock.INVALID_FENCE, self.proxy.try_lock()) self.assert_call_counts(1, 1, 0) self.assert_no_lock_session_id() @@ -608,94 +427,6 @@ def test_try_lock_on_unspecified_error(self): self.assert_call_counts(1, 1, 0) self.assert_no_lock_session_id() - def test_try_lock_and_get_fence(self): - # Everything succeeds - self.prepare_acquire_session(1) - self.mock_request_try_lock(2) - self.assertEqual(2, self.proxy.try_lock_and_get_fence()) - self.assert_call_counts(1, 0, 0) - self.assert_lock_session_id(1) - - def test_try_lock_and_get_fence_when_acquire_session_fails(self): - # First call to acquire session fails, should not retry - self.prepare_acquire_session(-1, HazelcastRuntimeError("server_error")) - - with self.assertRaises(HazelcastRuntimeError): - self.proxy.try_lock_and_get_fence() - - self.assert_call_counts(1, 0, 0) - self.assert_no_lock_session_id() - - def test_try_lock_and_get_fence_when_server_closes_old_session(self): - # Same thread issues a new lock call while holding a lock. - # Server closes session related to the first lock, should not retry - self.prepare_acquire_session(2) - self.prepare_lock_session_ids(1) - - with self.assertRaises(LockOwnershipLostError): - self.proxy.try_lock_and_get_fence() - - self.assert_call_counts(1, 1, 0) - self.assert_no_lock_session_id() - - def test_try_lock_and_get_fence_when_lock_acquire_limit_reached(self): - # Lock acquire limit is reached, server returns invalid fence - self.prepare_acquire_session(1) - self.mock_request_try_lock(FencedLock.INVALID_FENCE) - self.assertEqual(FencedLock.INVALID_FENCE, self.proxy.try_lock_and_get_fence()) - self.assert_call_counts(1, 1, 0) - self.assert_no_lock_session_id() - - def test_try_lock_and_get_fence_on_session_expired_error(self): - # Session expired error comes from the server on lock request, - # client determines the timeout and returns invalid fence - self.prepare_acquire_session(1) - self.mock_request_try_lock(2, SessionExpiredError()) - self.assertEqual(FencedLock.INVALID_FENCE, self.proxy.try_lock_and_get_fence()) - self.assert_call_counts(1, 0, 1) - self.assert_no_lock_session_id() - - def test_try_lock_and_get_fence_on_session_expired_error_when_not_timed_out(self): - # Session expired error comes from the server on lock request, - # client retries due to not reaching timeout and succeeds - self.prepare_acquire_session(1) - self.mock_request_try_lock(2, SessionExpiredError()) - self.assertEqual(2, self.proxy.try_lock_and_get_fence(100)) - self.assert_call_counts(2, 0, 1) - self.assert_lock_session_id(1) - - def test_try_lock_and_get_fence_on_session_expired_error_on_reentrant_lock_request(self): - # Session expired error comes from the server on second lock request, - # while holding a lock, should not retry - self.prepare_acquire_session(1) - self.prepare_lock_session_ids(1) - self.mock_request_try_lock(3, SessionExpiredError()) - - with self.assertRaises(LockOwnershipLostError): - self.proxy.try_lock_and_get_fence() - - self.assert_call_counts(1, 0, 1) - self.assert_no_lock_session_id() - - def 
test_try_lock_and_get_fence_on_wait_key_cancelled_error(self): - # Wait key cancelled error comes from the server, invalid fence is returned - self.prepare_acquire_session(1) - self.mock_request_try_lock(2, WaitKeyCancelledError()) - self.assertEqual(FencedLock.INVALID_FENCE, self.proxy.try_lock_and_get_fence()) - self.assert_call_counts(1, 1, 0) - self.assert_no_lock_session_id() - - def test_try_lock_and_get_fence_on_unspecified_error(self): - # Server sends another error, should not retry - self.prepare_acquire_session(1) - self.mock_request_try_lock(-1, HazelcastRuntimeError("expected")) - - with self.assertRaises(HazelcastRuntimeError): - self.proxy.try_lock_and_get_fence() - - self.assert_call_counts(1, 1, 0) - self.assert_no_lock_session_id() - def test_unlock(self): # Everything succeeds self.prepare_get_session(2) @@ -770,75 +501,6 @@ def test_unlock_on_unspecified_error(self): self.assert_call_counts(0, 0, 0) self.assert_no_lock_session_id() - def test_get_fence(self): - # Everything succeeds - self.prepare_get_session(2) - state = self.prepare_state(3, 1, 2, thread_id()) - self.mock_request_get_lock_ownership_state(state) - self.assertEqual(3, self.proxy.get_fence()) - self.assert_call_counts(0, 0, 0) - self.assert_lock_session_id(2) - - def test_get_fence_when_server_closes_old_session(self): - # Session id is different than what we store in the - # dict. The old session must be closed while we were - # holding the lock. - self.prepare_get_session(2) - self.prepare_lock_session_ids(1) - - with self.assertRaises(LockOwnershipLostError): - self.proxy.get_fence() - - self.assert_call_counts(0, 0, 0) - self.assert_no_lock_session_id() - - def test_get_fence_when_there_is_no_session(self): - # No active session for the current thread. - self.prepare_get_session(-1) - - with self.assertRaises(IllegalMonitorStateError): - self.proxy.get_fence() - - self.assert_call_counts(0, 0, 0) - self.assert_no_lock_session_id() - - def test_get_fence_when_server_returns_a_different_thread_id_for_lock_holder(self): - # Client thinks that it holds the lock, but server - # says it's not. - self.prepare_get_session(1) - self.prepare_lock_session_ids(1) - state = self.prepare_state(3, 1, 2, thread_id() - 1) - self.mock_request_get_lock_ownership_state(state) - - with self.assertRaises(LockOwnershipLostError): - self.proxy.get_fence() - - self.assert_call_counts(0, 0, 0) - self.assert_no_lock_session_id() - - def test_get_fence_when_not_holding_the_lock(self): - # Client is not holding the lock. 
-        self.prepare_get_session(1)
-        state = self.prepare_state(3, 1, 2, thread_id() - 1)
-        self.mock_request_get_lock_ownership_state(state)
-
-        with self.assertRaises(IllegalMonitorStateError):
-            self.proxy.get_fence()
-
-        self.assert_call_counts(0, 0, 0)
-        self.assert_no_lock_session_id()
-
-    def test_get_fence_on_unspecified_error(self):
-        # Server sends an unspecified error
-        self.prepare_get_session(1)
-        self.mock_request_get_lock_ownership_state(None, HazelcastRuntimeError())
-
-        with self.assertRaises(HazelcastRuntimeError):
-            self.proxy.get_fence()
-
-        self.assert_call_counts(0, 0, 0)
-        self.assert_no_lock_session_id()
-
     def test_is_locked(self):
         # Everything succeeds, client holds the lock
         self.prepare_get_session(2)

From 740ddb51fbf27925ae642560eb3f92187355aee3 Mon Sep 17 00:00:00 2001
From: mdumandag
Date: Mon, 2 Nov 2020 10:12:23 +0300
Subject: [PATCH 6/6] advise using the blocking mode

---
 hazelcast/proxy/cp/fenced_lock.py | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/hazelcast/proxy/cp/fenced_lock.py b/hazelcast/proxy/cp/fenced_lock.py
index fb572ad30a..ec27aa431a 100644
--- a/hazelcast/proxy/cp/fenced_lock.py
+++ b/hazelcast/proxy/cp/fenced_lock.py
@@ -32,6 +32,22 @@ class FencedLock(SessionAwareCPProxy):
     FencedLock does not block a lock call. Instead, it fails with
     ``LockAcquireLimitReachedError`` or a specified return value. Please check
     the locking methods to see details about the behaviour.
+
+    It is advised to use this proxy in the blocking mode. Although it is
+    possible, non-blocking usage requires extra care. FencedLock uses
+    the id of the thread that makes the request to distinguish lock
+    owners. When used in the non-blocking mode, added callbacks or
+    continuations are generally not executed in the thread that makes the
+    request. That causes the code below to fail most of the time, since
+    the lock is acquired on the main thread but the unlock request is
+    made from another thread. ::
+
+        lock = client.cp_subsystem.get_lock("lock")
+
+        def cb(_):
+            lock.unlock()
+
+        lock.lock().add_done_callback(cb)
     """
 
     INVALID_FENCE = 0
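As a complement to the failing non-blocking snippet in the new docstring, here is a minimal sketch of the blocking usage it recommends, where the lock is acquired and released on the same thread so the thread-id based ownership check holds. This sketch is not part of the patch and assumes a reachable cluster with the CP Subsystem enabled and default client configuration.

```python
import hazelcast

client = hazelcast.HazelcastClient()
# blocking() returns a wrapper whose calls wait for the result,
# so lock() and unlock() both run on the calling thread.
lock = client.cp_subsystem.get_lock("lock").blocking()

fence = lock.lock()  # blocks until acquired, returns the fencing token
try:
    print("Holding the lock with fence", fence)
finally:
    # Released on the same thread that acquired it.
    lock.unlock()

client.shutdown()
```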