Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 58 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,9 @@
* [7.4.11.1. Configuring Flake ID Generator](#74111-configuring-flake-id-generator)
* [7.4.12. CP Subsystem](#7412-cp-subsystem)
* [7.4.12.1. Using AtomicLong](#74121-using-atomiclong)
* [7.4.12.2. Using CountDownLatch](#74122-using-countdownlatch)
* [7.4.12.3. Using AtomicReference](#74123-using-atomicreference)
* [7.4.12.2. Using Lock](#74122-using-lock)
* [7.4.12.3. Using CountDownLatch](#74123-using-countdownlatch)
* [7.4.12.4. Using AtomicReference](#74124-using-atomicreference)
* [7.5. Distributed Events](#75-distributed-events)
* [7.5.1. Cluster Events](#751-cluster-events)
* [7.5.1.1. Listening for Member Events](#7511-listening-for-member-events)
Expand Down Expand Up @@ -1577,7 +1578,7 @@ Refer to [CP Subsystem](https://docs.hazelcast.org/docs/latest/manual/html-singl
Data structures in CP Subsystem run in CP groups. Each CP group elects its own Raft leader and runs the Raft consensus algorithm independently.
The CP data structures differ from the other Hazelcast data structures in two aspects.
First, an internal commit is performed on the METADATA CP group every time you fetch a proxy from this interface.
Hence, callers should cache returned proxy objects. Second, if you call `DistributedObject.destroy()` on a CP data structure proxy,
Hence, callers should cache returned proxy objects. Second, if you call `distributed_object.destroy()` on a CP data structure proxy,
that data structure is terminated on the underlying CP group and cannot be reinitialized until the CP group is force-destroyed.
For this reason, please make sure that you are completely done with a CP data structure before destroying its proxy.

Expand Down Expand Up @@ -1610,7 +1611,59 @@ print ('CAS operation result:', result)
AtomicLong implementation does not offer exactly-once / effectively-once execution semantics. It goes with at-least-once execution semantics by default and can cause an API call to be committed multiple times in case of CP member failures.
It can be tuned to offer at-most-once execution semantics. Please see [`fail-on-indeterminate-operation-state`](https://docs.hazelcast.org/docs/latest/manual/html-single/index.html#cp-subsystem-configuration) server-side setting.

#### 7.4.12.2. Using CountDownLatch
#### 7.4.12.2. Using Lock

Hazelcast `FencedLock` is the distributed and reentrant implementation of a linearizable lock.
It is CP with respect to the CAP principle. It works on top of the Raft consensus algorithm.
It offers linearizability during crash-stop failures and network partitions.
If a network partition occurs, it remains available on at most one side of the partition.

A basic Lock usage example is shown below.

```python
# Get a FencedLock called "my-lock"
lock = client.cp_subsystem.get_lock("my-lock").blocking()
# Acquire the lock and get the fencing token
fence = lock.lock()
try:
# Your guarded code goes here
pass
finally:
# Make sure to release the lock
lock.unlock()
```

FencedLock works on top of CP sessions. It keeps a CP session open while the lock is acquired. Please refer to [CP Session](https://docs.hazelcast.org/docs/latest/manual/html-single/index.html#cp-sessions) documentation for more information.

By default, FencedLock is reentrant. Once a caller acquires the lock, it can acquire the lock reentrantly as many times as it wants in a linearizable manner.
You can configure the reentrancy behavior on the member side. For instance, reentrancy can be disabled and FencedLock can work as a non-reentrant mutex.
You can also set a custom reentrancy limit. When the reentrancy limit is already reached, FencedLock does not block a lock call.
Instead, it fails with ``LockAcquireLimitReachedError`` or a specified return value.

Distributed locks are unfortunately *not equivalent* to single-node mutexes because of the complexities in distributed systems, such as uncertain communication patterns, and independent and partial failures.
In an asynchronous network, no lock service can guarantee mutual exclusion, because there is no way to distinguish between a slow and a crashed process.
Consider the following scenario, where a Hazelcast client acquires a FencedLock, then hits a long pause.
Since it will not be able to commit session heartbeats while paused, its CP session will be eventually closed.
After this moment, another Hazelcast client can acquire this lock.
If the first client wakes up again, it may not immediately notice that it has lost ownership of the lock.
In this case, multiple clients think they hold the lock. If they attempt to perform an operation on a shared resource, they can break the system.
To prevent such situations, you can choose to use an infinite session timeout, but this time probably you are going to deal with liveliness issues.
For the scenario above, even if the first client actually crashes, requests sent by 2 clients can be re-ordered in the network and hit the external resource in reverse order.

There is a simple solution for this problem. Lock holders are ordered by a monotonic fencing token, which increments each time the lock is assigned to a new owner.
This fencing token can be passed to external services or resources to ensure sequential execution of side effects performed by lock holders.

The following diagram illustrates the idea. Client-1 acquires the lock first and receives ``1`` as its fencing token.
Then, it passes this token to the external service, which is our shared resource in this scenario.
Just after that, Client-1 hits a long GC pause and eventually loses ownership of the lock because it misses to commit CP session heartbeats.
Then, Client-2 chimes in and acquires the lock. Similar to Client-1, Client-2 passes its fencing token to the external service.
After that, once Client-1 comes back alive, its write request will be rejected by the external service, and only Client-2 will be able to safely talk to it.

![CP Fenced Lock diagram](https://docs.hazelcast.org/docs/latest/manual/html-single/images/FencedLock.png)

You can read more about the fencing token idea in Martin Kleppmann's "How to do distributed locking" blog post and Google's Chubby paper.

#### 7.4.12.3. Using CountDownLatch

Hazelcast `CountDownLatch` is the distributed implementation of a linearizable and distributed countdown latch.
This data structure is a cluster-wide synchronization aid that allows one or more callers to wait until a set of operations being performed in other callers completes.
Expand Down Expand Up @@ -1646,7 +1699,7 @@ print("Count is zero:", count_is_zero)

> **NOTE: CountDownLatch count can be reset with `try_set_count()` after a countdown has finished, but not during an active count.**

#### 7.4.12.3. Using AtomicReference
#### 7.4.12.4. Using AtomicReference

Hazelcast `AtomicReference` is the distributed implementation of a linearizable object reference.
It provides a set of atomic operations allowing to modify the value behind the reference.
Expand Down
4 changes: 4 additions & 0 deletions docs/hazelcast.proxy.cp.fenced_lock.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
FencedLock
==============

.. automodule:: hazelcast.proxy.cp.fenced_lock
3 changes: 2 additions & 1 deletion docs/hazelcast.proxy.cp.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ CP Proxies
.. toctree::
hazelcast.proxy.cp.atomic_long
hazelcast.proxy.cp.atomic_reference
hazelcast.proxy.cp.count_down_latch
hazelcast.proxy.cp.count_down_latch
hazelcast.proxy.cp.fenced_lock
25 changes: 25 additions & 0 deletions examples/cp/fenced_lock_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import hazelcast

client = hazelcast.HazelcastClient()

lock = client.cp_subsystem.get_lock("my-lock").blocking()

locked = lock.is_locked()
print("Locked initially:", locked)

fence = lock.lock()
print("Fence token:", fence)
try:
locked = lock.is_locked()
print("Locked after lock:", locked)

fence = lock.try_lock()
print("Locked reentrantly:", fence != lock.INVALID_FENCE)

# more guarded code
finally:
# unlock must be called for each successful lock request
lock.unlock()
lock.unlock()

client.shutdown()
14 changes: 7 additions & 7 deletions examples/org-website/lock_sample.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
# TODO Fix this when we add CP Lock

import hazelcast

# Start the Hazelcast Client and connect to an already running Hazelcast Cluster on 127.0.0.1
# Start the Hazelcast Client and connect to an already running
# Hazelcast Cluster on 127.0.0.1
hz = hazelcast.HazelcastClient()
# Get a distributed lock called "my-distributed-lock"
lock = hz.get_lock("my-distributed-lock").blocking()
# Now create a lock and execute some guarded code.
lock.lock()
# Get the Distributed Lock from CP Subsystem
lock = hz.cp_subsystem.get_lock("my-distributed-lock").blocking()
# Now acquire the lock and execute some guarded code
fence = lock.lock()
print("Fence token:", fence)
try:
# do something here
pass
Expand Down
16 changes: 13 additions & 3 deletions hazelcast/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from hazelcast.config import _Config
from hazelcast.connection import ConnectionManager, DefaultAddressProvider
from hazelcast.core import DistributedObjectInfo, DistributedObjectEvent
from hazelcast.cp import CPSubsystem
from hazelcast.cp import CPSubsystem, ProxySessionManager
from hazelcast.invocation import InvocationService, Invocation
from hazelcast.listener import ListenerService, ClusterViewListenerService
from hazelcast.lifecycle import LifecycleService, LifecycleState, _InternalLifecycleService
Expand Down Expand Up @@ -331,6 +331,7 @@ def __init__(self, **kwargs):
self._logger_extras)
self._proxy_manager = ProxyManager(self._context)
self.cp_subsystem = CPSubsystem(self._context)
self._proxy_session_manager = ProxySessionManager(self._context)
self._transaction_manager = TransactionManager(self._context, self._logger_extras)
self._lock_reference_id_generator = AtomicInteger(1)
self._statistics = Statistics(self, self._reactor, self._connection_manager,
Expand All @@ -348,7 +349,8 @@ def _init_context(self):
self._context.init_context(self.config, self._invocation_service, self._internal_partition_service,
self._internal_cluster_service, self._connection_manager,
self._serialization_service, self._listener_service, self._proxy_manager,
self._near_cache_manager, self._lock_reference_id_generator, self._logger_extras)
self._near_cache_manager, self._lock_reference_id_generator, self._logger_extras,
self.name, self._proxy_session_manager, self._reactor)

def _start(self):
self._reactor.start()
Expand Down Expand Up @@ -597,6 +599,7 @@ def shutdown(self):
if self._internal_lifecycle_service.running:
self._internal_lifecycle_service.fire_lifecycle_event(LifecycleState.SHUTTING_DOWN)
self._internal_lifecycle_service.shutdown()
self._proxy_session_manager.shutdown().result()
self._near_cache_manager.destroy_near_caches()
self._connection_manager.shutdown()
self._invocation_service.shutdown()
Expand Down Expand Up @@ -666,11 +669,15 @@ def __init__(self):
self.near_cache_manager = None
self.lock_reference_id_generator = None
self.logger_extras = None
self.name = None
self.proxy_session_manager = None
self.reactor = None

def init_context(self, config, invocation_service, partition_service,
cluster_service, connection_manager, serialization_service,
listener_service, proxy_manager, near_cache_manager,
lock_reference_id_generator, logger_extras):
lock_reference_id_generator, logger_extras, name,
proxy_session_manager, reactor):
self.config = config
self.invocation_service = invocation_service
self.partition_service = partition_service
Expand All @@ -682,3 +689,6 @@ def init_context(self, config, invocation_service, partition_service,
self.near_cache_manager = near_cache_manager
self.lock_reference_id_generator = lock_reference_id_generator
self.logger_extras = logger_extras
self.name = name
self.proxy_session_manager = proxy_session_manager
self.reactor = reactor
Loading