Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 63 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@
* [7.4.12. CP Subsystem](#7412-cp-subsystem)
* [7.4.12.1. Using AtomicLong](#74121-using-atomiclong)
* [7.4.12.2. Using Lock](#74122-using-lock)
* [7.4.12.3. Using CountDownLatch](#74123-using-countdownlatch)
* [7.4.12.4. Using AtomicReference](#74124-using-atomicreference)
* [7.4.12.3. Using Semaphore](#74123-using-semaphore)
* [7.4.12.3. Using CountDownLatch](#74124-using-countdownlatch)
* [7.4.12.4. Using AtomicReference](#74125-using-atomicreference)
* [7.5. Distributed Events](#75-distributed-events)
* [7.5.1. Cluster Events](#751-cluster-events)
* [7.5.1.1. Listening for Member Events](#7511-listening-for-member-events)
Expand Down Expand Up @@ -1663,7 +1664,65 @@ After that, once Client-1 comes back alive, its write request will be rejected b

You can read more about the fencing token idea in Martin Kleppmann's "How to do distributed locking" blog post and Google's Chubby paper.

#### 7.4.12.3. Using CountDownLatch

#### 7.4.12.3. Using Semaphore

Hazelcast `Semaphore` is the distributed implementation of a linearizable and distributed semaphore. It offers multiple operations for acquiring the permits.
This data structure is a part of CP Subsystem.

Semaphore is a cluster-wide counting semaphore. Conceptually, it maintains a set of permits. Each `acquire()` waits if necessary until a permit is available, and then takes it.
Dually, each `release()` adds a permit, potentially releasing a waiting acquirer. However, no actual permit objects are used; the semaphore just keeps a count of the number available and acts accordingly.

A basic Semaphore usage example is shown below.

```python
# Get a Semaphore called "my-semaphore"
semaphore = client.cp_subsystem.get_semaphore("my-semaphore").blocking()
# Try to initialize the semaphore
# (does nothing if the semaphore is already initialized)
semaphore.init(3)
# Acquire 3 permits out of 3
semaphore.acquire(3)
# Release 2 permits
semaphore.release(2)
# Check available permits
available = semaphore.available_permits()
print("Available:", available)
# Prints:
# Available: 2
```

Beware of the increased risk of indefinite postponement when using the multiple-permit acquire. If permits are released one by one, a caller waiting for one permit will acquire it before a caller waiting for multiple permits regardless of the call order.
Correct usage of a semaphore is established by programming convention in the application.

As an alternative, potentially safer approach to the multiple-permit acquire, you can use the `try_acquire()` method of Semaphore.
It tries to acquire the permits in optimistic manner and immediately returns with a `bool` operation result. It also accepts an optional `timeout` argument which specifies the timeout in seconds to acquire the permits before giving up.

```python
# Try to acquire 2 permits
success = semaphore.try_acquire(2)
# Check for the result of the acquire request
if success:
try:
pass
# Your guarded code goes here
finally:
# Make sure to release the permits
semaphore.release(2)
```

Semaphore data structure has two variations:

* The default implementation is session-aware. In this one, when a caller makes its very first `acquire()` call,
it starts a new CP session with the underlying CP group. Then, liveliness of the caller is tracked via this CP session.
When the caller fails, permits acquired by this caller are automatically and safely released.
However, the session-aware version comes with a limitation, that is, a Hazelcast client cannot release permits before acquiring them first.
In other words, a client can release only the permits it has acquired earlier.
* The second implementation is sessionless. This one does not perform auto-cleanup of acquired permits on failures.
Acquired permits are not bound to callers and permits can be released without acquiring first. However, you need to handle failed permit owners on your own.
If a Hazelcast server or a client fails while holding some permits, they will not be automatically released. You can use the sessionless CP Semaphore implementation by enabling JDK compatibility `jdk-compatible` server-side setting. Refer to [Semaphore configuration](https://docs.hazelcast.org/docs/latest/manual/html-single/index.html#semaphore-configuration) documentation for more details.

#### 7.4.12.4. Using CountDownLatch

Hazelcast `CountDownLatch` is the distributed implementation of a linearizable and distributed countdown latch.
This data structure is a cluster-wide synchronization aid that allows one or more callers to wait until a set of operations being performed in other callers completes.
Expand Down Expand Up @@ -1699,7 +1758,7 @@ print("Count is zero:", count_is_zero)

> **NOTE: CountDownLatch count can be reset with `try_set_count()` after a countdown has finished, but not during an active count.**

#### 7.4.12.4. Using AtomicReference
#### 7.4.12.5. Using AtomicReference

Hazelcast `AtomicReference` is the distributed implementation of a linearizable object reference.
It provides a set of atomic operations allowing to modify the value behind the reference.
Expand Down
3 changes: 2 additions & 1 deletion docs/hazelcast.proxy.cp.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ CP Proxies
hazelcast.proxy.cp.atomic_long
hazelcast.proxy.cp.atomic_reference
hazelcast.proxy.cp.count_down_latch
hazelcast.proxy.cp.fenced_lock
hazelcast.proxy.cp.fenced_lock
hazelcast.proxy.cp.semaphore
4 changes: 4 additions & 0 deletions docs/hazelcast.proxy.cp.semaphore.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Semaphore
=========

.. autoclass:: hazelcast.proxy.cp.semaphore.Semaphore
20 changes: 20 additions & 0 deletions examples/cp/semaphore_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import hazelcast

client = hazelcast.HazelcastClient()

semaphore = client.cp_subsystem.get_semaphore("my-semaphore").blocking()

initialized = semaphore.init(3)
print("Initialized:", initialized)
available = semaphore.available_permits()
print("Available:", available)

semaphore.acquire(3)
available = semaphore.available_permits()
print("Available after acquire:", available)

semaphore.release(2)
available = semaphore.available_permits()
print("Available after release:", available)

client.shutdown()
42 changes: 40 additions & 2 deletions hazelcast/cp.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
from hazelcast.future import ImmediateExceptionFuture, ImmediateFuture, combine_futures
from hazelcast.invocation import Invocation
from hazelcast.protocol.codec import cp_group_create_cp_group_codec, cp_session_heartbeat_session_codec, \
cp_session_create_session_codec, cp_session_close_session_codec, cp_session_generate_thread_id_codec
cp_session_create_session_codec, cp_session_close_session_codec, cp_session_generate_thread_id_codec, \
semaphore_get_semaphore_type_codec
from hazelcast.proxy.cp.atomic_long import AtomicLong
from hazelcast.proxy.cp.atomic_reference import AtomicReference
from hazelcast.proxy.cp.count_down_latch import CountDownLatch
from hazelcast.proxy.cp.fenced_lock import FencedLock
from hazelcast.proxy.cp.semaphore import SessionAwareSemaphore, SessionlessSemaphore
from hazelcast.util import check_true, AtomicInteger, thread_id


Expand Down Expand Up @@ -108,7 +110,7 @@ def get_lock(self, name):

The instance is created on CP Subsystem.

If no group name is given within the `name` argument, then the
If no group name is given within the ``name`` argument, then the
FencedLock instance will be created on the DEFAULT CP group.
If a group name is given, like ``.get_lock("myLock@group1")``,
the given group will be initialized first, if not initialized
Expand All @@ -123,6 +125,25 @@ def get_lock(self, name):
"""
return self._proxy_manager.get_or_create(LOCK_SERVICE, name)

def get_semaphore(self, name):
"""Returns the distributed Semaphore instance instance with given name.

The instance is created on CP Subsystem.

If no group name is given within the ``name`` argument, then the
Semaphore instance will be created on the DEFAULT CP group.
If a group name is given, like ``.get_semaphore("mySemaphore@group1")``,
the given group will be initialized first, if not initialized
already, and then the instance will be created on this group.

Args:
name (str): Name of the Semaphore

Returns:
hazelcast.proxy.cp.semaphore.Semaphore: The Semaphore proxy for the given name.
"""
return self._proxy_manager.get_or_create(SEMAPHORE_SERVICE, name)


_DEFAULT_GROUP_NAME = "default"

Expand Down Expand Up @@ -156,6 +177,7 @@ def _get_object_name_for_proxy(name):
ATOMIC_REFERENCE_SERVICE = "hz:raft:atomicRefService"
COUNT_DOWN_LATCH_SERVICE = "hz:raft:countDownLatchService"
LOCK_SERVICE = "hz:raft:lockService"
SEMAPHORE_SERVICE = "hz:raft:semaphoreService"


class CPProxyManager(object):
Expand All @@ -177,6 +199,10 @@ def get_or_create(self, service_name, proxy_name):
return CountDownLatch(self._context, group_id, service_name, proxy_name, object_name)
elif service_name == LOCK_SERVICE:
return self._create_fenced_lock(group_id, proxy_name, object_name)
elif service_name == SEMAPHORE_SERVICE:
return self._create_semaphore(group_id, proxy_name, object_name)
else:
raise ValueError("Unknown service name: %s" % service_name)

def _create_fenced_lock(self, group_id, proxy_name, object_name):
with self._mux:
Expand All @@ -191,6 +217,18 @@ def _create_fenced_lock(self, group_id, proxy_name, object_name):
self._lock_proxies[proxy_name] = proxy
return proxy

def _create_semaphore(self, group_id, proxy_name, object_name):
codec = semaphore_get_semaphore_type_codec
request = codec.encode_request(proxy_name)
invocation = Invocation(request, response_handler=codec.decode_response)
invocation_service = self._context.invocation_service
invocation_service.invoke(invocation)
jdk_compatible = invocation.future.result()
if jdk_compatible:
return SessionlessSemaphore(self._context, group_id, SEMAPHORE_SERVICE, proxy_name, object_name)
else:
return SessionAwareSemaphore(self._context, group_id, SEMAPHORE_SERVICE, proxy_name, object_name)

def _get_group_id(self, proxy_name):
codec = cp_group_create_cp_group_codec
request = codec.encode_request(proxy_name)
Expand Down
7 changes: 0 additions & 7 deletions hazelcast/proxy/cp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,6 @@ def get_group_id(self):
"""
return self._group_id

def _get_or_create_unique_thread_id(self):
"""
Returns:
hazelcast.future.Future[int]: Cluster-wide unique thread id.
"""
return self._session_manager.get_or_create_unique_thread_id(self._group_id)

def _get_session_id(self):
"""
Returns:
Expand Down
Loading