Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion hazelcast/proxy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
SET_SERVICE: Set,
TOPIC_SERVICE: Topic,
PN_COUNTER_SERVICE: PNCounter,
FLAKE_ID_GENERATOR_SERVICE: FlakeIdGenerator
FLAKE_ID_GENERATOR_SERVICE: FlakeIdGenerator,
}


Expand Down
48 changes: 37 additions & 11 deletions hazelcast/proxy/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from hazelcast import six
from hazelcast.util import get_attr_name

MAX_SIZE = float('inf')
MAX_SIZE = float("inf")


def _no_op_response_handler(_):
Expand Down Expand Up @@ -55,12 +55,16 @@ def _invoke_on_target(self, request, uuid, response_handler=_no_op_response_hand

def _invoke_on_key(self, request, key_data, response_handler=_no_op_response_handler):
partition_id = self._partition_service.get_partition_id(key_data)
invocation = Invocation(request, partition_id=partition_id, response_handler=response_handler)
invocation = Invocation(
request, partition_id=partition_id, response_handler=response_handler
)
self._invocation_service.invoke(invocation)
return invocation.future

def _invoke_on_partition(self, request, partition_id, response_handler=_no_op_response_handler):
invocation = Invocation(request, partition_id=partition_id, response_handler=response_handler)
invocation = Invocation(
request, partition_id=partition_id, response_handler=response_handler
)
self._invocation_service.invoke(invocation)
return invocation.future

Expand All @@ -78,7 +82,9 @@ def __init__(self, service_name, name, context):
self._partition_id = context.partition_service.get_partition_id(partition_key)

def _invoke(self, request, response_handler=_no_op_response_handler):
invocation = Invocation(request, partition_id=self._partition_id, response_handler=response_handler)
invocation = Invocation(
request, partition_id=self._partition_id, response_handler=response_handler
)
self._invocation_service.invoke(invocation)
return invocation.future

Expand All @@ -95,7 +101,9 @@ def __init__(self, name, transaction, context):
self._to_data = serialization_service.to_data

def _invoke(self, request, response_handler=_no_op_response_handler):
invocation = Invocation(request, connection=self.transaction.connection, response_handler=response_handler)
invocation = Invocation(
request, connection=self.transaction.connection, response_handler=response_handler
)
self._invocation_service.invoke(invocation)
return invocation.future

Expand Down Expand Up @@ -202,8 +210,17 @@ class EntryEvent(object):
number_of_affected_entries (int): Number of affected entries by this event.
"""

def __init__(self, to_object, key, value, old_value, merging_value, event_type, uuid,
number_of_affected_entries):
def __init__(
self,
to_object,
key,
value,
old_value,
merging_value,
event_type,
uuid,
number_of_affected_entries,
):
self._to_object = to_object
self._key_data = key
self._value_data = value
Expand Down Expand Up @@ -234,10 +251,19 @@ def merging_value(self):
return self._to_object(self._merging_value_data)

def __repr__(self):
return "EntryEvent(key=%s, value=%s, old_value=%s, merging_value=%s, event_type=%s, uuid=%s, " \
"number_of_affected_entries=%s)" % (self.key, self.value, self.old_value, self.merging_value,
get_attr_name(EntryEventType, self.event_type), self.uuid,
self.number_of_affected_entries)
return (
"EntryEvent(key=%s, value=%s, old_value=%s, merging_value=%s, event_type=%s, uuid=%s, "
"number_of_affected_entries=%s)"
% (
self.key,
self.value,
self.old_value,
self.merging_value,
get_attr_name(EntryEventType, self.event_type),
self.uuid,
self.number_of_affected_entries,
)
)


class TopicMessage(object):
Expand Down
4 changes: 3 additions & 1 deletion hazelcast/proxy/cp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ def _invoke(self, request, response_handler=_no_op_response_handler):

class SessionAwareCPProxy(BaseCPProxy):
def __init__(self, context, group_id, service_name, proxy_name, object_name):
super(SessionAwareCPProxy, self).__init__(context, group_id, service_name, proxy_name, object_name)
super(SessionAwareCPProxy, self).__init__(
context, group_id, service_name, proxy_name, object_name
)
self._session_manager = context.proxy_session_manager

def get_group_id(self):
Expand Down
12 changes: 9 additions & 3 deletions hazelcast/proxy/cp/atomic_long.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
from hazelcast.protocol.codec import atomic_long_add_and_get_codec, atomic_long_compare_and_set_codec, \
atomic_long_get_codec, atomic_long_get_and_add_codec, atomic_long_get_and_set_codec, atomic_long_alter_codec, \
atomic_long_apply_codec
from hazelcast.protocol.codec import (
atomic_long_add_and_get_codec,
atomic_long_compare_and_set_codec,
atomic_long_get_codec,
atomic_long_get_and_add_codec,
atomic_long_get_and_set_codec,
atomic_long_alter_codec,
atomic_long_apply_codec,
)
from hazelcast.proxy.cp import BaseCPProxy
from hazelcast.util import check_not_none, check_is_int

Expand Down
9 changes: 7 additions & 2 deletions hazelcast/proxy/cp/atomic_reference.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
from hazelcast.protocol.codec import atomic_ref_compare_and_set_codec, atomic_ref_get_codec, atomic_ref_set_codec, \
atomic_ref_contains_codec, atomic_ref_apply_codec
from hazelcast.protocol.codec import (
atomic_ref_compare_and_set_codec,
atomic_ref_get_codec,
atomic_ref_set_codec,
atomic_ref_contains_codec,
atomic_ref_apply_codec,
)
from hazelcast.proxy.cp import BaseCPProxy
from hazelcast.util import check_true, check_not_none

Expand Down
17 changes: 13 additions & 4 deletions hazelcast/proxy/cp/count_down_latch.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
import uuid

from hazelcast.errors import OperationTimeoutError
from hazelcast.protocol.codec import count_down_latch_await_codec, count_down_latch_get_round_codec, \
count_down_latch_count_down_codec, count_down_latch_get_count_codec, count_down_latch_try_set_count_codec
from hazelcast.protocol.codec import (
count_down_latch_await_codec,
count_down_latch_get_round_codec,
count_down_latch_count_down_codec,
count_down_latch_get_count_codec,
count_down_latch_try_set_count_codec,
)
from hazelcast.proxy.cp import BaseCPProxy
from hazelcast.util import to_millis, check_true, check_is_number, check_is_int

Expand Down Expand Up @@ -64,7 +69,9 @@ def await_latch(self, timeout):
timeout = max(0, timeout)
invocation_uuid = uuid.uuid4()
codec = count_down_latch_await_codec
request = codec.encode_request(self._group_id, self._object_name, invocation_uuid, to_millis(timeout))
request = codec.encode_request(
self._group_id, self._object_name, invocation_uuid, to_millis(timeout)
)
return self._invoke(request, codec.decode_response)

def count_down(self):
Expand Down Expand Up @@ -136,5 +143,7 @@ def _get_round(self):

def _request_count_down(self, expected_round, invocation_uuid):
codec = count_down_latch_count_down_codec
request = codec.encode_request(self._group_id, self._object_name, invocation_uuid, expected_round)
request = codec.encode_request(
self._group_id, self._object_name, invocation_uuid, expected_round
)
return self._invoke(request)
79 changes: 56 additions & 23 deletions hazelcast/proxy/cp/fenced_lock.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,20 @@
import time
import uuid

from hazelcast.errors import LockOwnershipLostError, LockAcquireLimitReachedError, SessionExpiredError, \
WaitKeyCancelledError, IllegalMonitorStateError
from hazelcast.errors import (
LockOwnershipLostError,
LockAcquireLimitReachedError,
SessionExpiredError,
WaitKeyCancelledError,
IllegalMonitorStateError,
)
from hazelcast.future import ImmediateExceptionFuture
from hazelcast.protocol.codec import fenced_lock_lock_codec, fenced_lock_try_lock_codec, fenced_lock_unlock_codec, \
fenced_lock_get_lock_ownership_codec
from hazelcast.protocol.codec import (
fenced_lock_lock_codec,
fenced_lock_try_lock_codec,
fenced_lock_unlock_codec,
fenced_lock_get_lock_ownership_codec,
)
from hazelcast.proxy.cp import SessionAwareCPProxy
from hazelcast.util import thread_id, to_millis

Expand Down Expand Up @@ -202,7 +211,9 @@ def check_response(f):
self._lock_session_ids.pop(current_thread_id, None)
raise e

return self._request_unlock(session_id, current_thread_id, uuid.uuid4()).continue_with(check_response)
return self._request_unlock(session_id, current_thread_id, uuid.uuid4()).continue_with(
check_response
)

def is_locked(self):
"""Returns whether this lock is locked or not.
Expand Down Expand Up @@ -314,9 +325,11 @@ def check_fence(fence):
return self._do_lock(current_thread_id, invocation_uuid)
except WaitKeyCancelledError:
self._release_session(session_id)
error = IllegalMonitorStateError("Lock(%s) not acquired because the lock call on the CP group "
"is cancelled, possibly because of another indeterminate call "
"from the same thread." % self._object_name)
error = IllegalMonitorStateError(
"Lock(%s) not acquired because the lock call on the CP group "
"is cancelled, possibly because of another indeterminate call "
"from the same thread." % self._object_name
)
raise error
except Exception as e:
self._release_session(session_id)
Expand All @@ -327,11 +340,14 @@ def check_fence(fence):
return fence

self._release_session(session_id)
error = LockAcquireLimitReachedError("Lock(%s) reentrant lock limit is already reached!"
% self._object_name)
error = LockAcquireLimitReachedError(
"Lock(%s) reentrant lock limit is already reached!" % self._object_name
)
raise error

return self._request_lock(session_id, current_thread_id, invocation_uuid).continue_with(check_fence)
return self._request_lock(session_id, current_thread_id, invocation_uuid).continue_with(
check_fence
)

return self._acquire_session().continue_with(do_lock_once)

Expand Down Expand Up @@ -367,8 +383,9 @@ def check_fence(fence):

return fence

return self._request_try_lock(session_id, current_thread_id, invocation_uuid, timeout).continue_with(
check_fence)
return self._request_try_lock(
session_id, current_thread_id, invocation_uuid, timeout
).continue_with(check_fence)

return self._acquire_session().continue_with(do_try_lock_once)

Expand All @@ -387,30 +404,42 @@ def _verify_no_locked_session_id_present(self, current_thread_id):
raise self._new_lock_ownership_lost_error(lock_session_id)

def _new_lock_ownership_lost_error(self, lock_session_id):
error = LockOwnershipLostError("Current thread is not the owner of the Lock(%s) because its "
"Session(%s) is closed by the server." % (self._proxy_name, lock_session_id))
error = LockOwnershipLostError(
"Current thread is not the owner of the Lock(%s) because its "
"Session(%s) is closed by the server." % (self._proxy_name, lock_session_id)
)
return error

def _new_illegal_monitor_state_error(self):
error = IllegalMonitorStateError("Current thread is not the owner of the Lock(%s)" % self._proxy_name)
error = IllegalMonitorStateError(
"Current thread is not the owner of the Lock(%s)" % self._proxy_name
)
return error

def _request_lock(self, session_id, current_thread_id, invocation_uuid):
codec = fenced_lock_lock_codec
request = codec.encode_request(self._group_id, self._object_name, session_id, current_thread_id,
invocation_uuid)
request = codec.encode_request(
self._group_id, self._object_name, session_id, current_thread_id, invocation_uuid
)
return self._invoke(request, codec.decode_response)

def _request_try_lock(self, session_id, current_thread_id, invocation_uuid, timeout):
codec = fenced_lock_try_lock_codec
request = codec.encode_request(self._group_id, self._object_name, session_id, current_thread_id,
invocation_uuid, to_millis(timeout))
request = codec.encode_request(
self._group_id,
self._object_name,
session_id,
current_thread_id,
invocation_uuid,
to_millis(timeout),
)
return self._invoke(request, codec.decode_response)

def _request_unlock(self, session_id, current_thread_id, invocation_uuid):
codec = fenced_lock_unlock_codec
request = codec.encode_request(self._group_id, self._object_name, session_id, current_thread_id,
invocation_uuid)
request = codec.encode_request(
self._group_id, self._object_name, session_id, current_thread_id, invocation_uuid
)
return self._invoke(request, codec.decode_response)

def _request_get_lock_ownership_state(self):
Expand All @@ -430,4 +459,8 @@ def is_locked(self):
return self.fence != FencedLock.INVALID_FENCE

def is_locked_by(self, session_id, current_thread_id):
return self.is_locked() and self.session_id == session_id and self.thread_id == current_thread_id
return (
self.is_locked()
and self.session_id == session_id
and self.thread_id == current_thread_id
)
Loading