Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 104 additions & 41 deletions hazelcast/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,26 @@
from hazelcast.listener import ListenerService, ClusterViewListenerService
from hazelcast.lifecycle import LifecycleService, LifecycleState, _InternalLifecycleService
from hazelcast.partition import PartitionService, _InternalPartitionService
from hazelcast.protocol.codec import client_get_distributed_objects_codec, \
client_add_distributed_object_listener_codec, client_remove_distributed_object_listener_codec
from hazelcast.proxy import ProxyManager, MAP_SERVICE, QUEUE_SERVICE, LIST_SERVICE, SET_SERVICE, MULTI_MAP_SERVICE, \
REPLICATED_MAP_SERVICE, RINGBUFFER_SERVICE, \
TOPIC_SERVICE, RELIABLE_TOPIC_SERVICE, \
EXECUTOR_SERVICE, PN_COUNTER_SERVICE, FLAKE_ID_GENERATOR_SERVICE
from hazelcast.protocol.codec import (
client_get_distributed_objects_codec,
client_add_distributed_object_listener_codec,
client_remove_distributed_object_listener_codec,
)
from hazelcast.proxy import (
ProxyManager,
MAP_SERVICE,
QUEUE_SERVICE,
LIST_SERVICE,
SET_SERVICE,
MULTI_MAP_SERVICE,
REPLICATED_MAP_SERVICE,
RINGBUFFER_SERVICE,
TOPIC_SERVICE,
RELIABLE_TOPIC_SERVICE,
EXECUTOR_SERVICE,
PN_COUNTER_SERVICE,
FLAKE_ID_GENERATOR_SERVICE,
)
from hazelcast.near_cache import NearCacheManager
from hazelcast.reactor import AsyncoreReactor
from hazelcast.serialization import SerializationServiceV1
Expand Down Expand Up @@ -313,45 +327,71 @@ def __init__(self, **kwargs):
self._invocation_service = InvocationService(self, self._reactor)
self._address_provider = self._create_address_provider()
self._internal_partition_service = _InternalPartitionService(self)
self.partition_service = PartitionService(self._internal_partition_service, self._serialization_service)
self.partition_service = PartitionService(
self._internal_partition_service, self._serialization_service
)
self._internal_cluster_service = _InternalClusterService(self)
self.cluster_service = ClusterService(self._internal_cluster_service)
self._connection_manager = ConnectionManager(self, self._reactor, self._address_provider,
self._internal_lifecycle_service,
self._internal_partition_service,
self._internal_cluster_service,
self._invocation_service,
self._near_cache_manager)
self._connection_manager = ConnectionManager(
self,
self._reactor,
self._address_provider,
self._internal_lifecycle_service,
self._internal_partition_service,
self._internal_cluster_service,
self._invocation_service,
self._near_cache_manager,
)
self._load_balancer = self._init_load_balancer(config)
self._listener_service = ListenerService(self, self._connection_manager,
self._invocation_service)
self._listener_service = ListenerService(
self, self._connection_manager, self._invocation_service
)
self._proxy_manager = ProxyManager(self._context)
self.cp_subsystem = CPSubsystem(self._context)
self._proxy_session_manager = ProxySessionManager(self._context)
self._transaction_manager = TransactionManager(self._context)
self._lock_reference_id_generator = AtomicInteger(1)
self._statistics = Statistics(self, self._reactor, self._connection_manager,
self._invocation_service, self._near_cache_manager)
self._cluster_view_listener = ClusterViewListenerService(self, self._connection_manager,
self._internal_partition_service,
self._internal_cluster_service,
self._invocation_service)
self._statistics = Statistics(
self,
self._reactor,
self._connection_manager,
self._invocation_service,
self._near_cache_manager,
)
self._cluster_view_listener = ClusterViewListenerService(
self,
self._connection_manager,
self._internal_partition_service,
self._internal_cluster_service,
self._invocation_service,
)
self._shutdown_lock = threading.RLock()
self._init_context()
self._start()

def _init_context(self):
self._context.init_context(self.config, self._invocation_service, self._internal_partition_service,
self._internal_cluster_service, self._connection_manager,
self._serialization_service, self._listener_service, self._proxy_manager,
self._near_cache_manager, self._lock_reference_id_generator,
self.name, self._proxy_session_manager, self._reactor)
self._context.init_context(
self.config,
self._invocation_service,
self._internal_partition_service,
self._internal_cluster_service,
self._connection_manager,
self._serialization_service,
self._listener_service,
self._proxy_manager,
self._near_cache_manager,
self._lock_reference_id_generator,
self.name,
self._proxy_session_manager,
self._reactor,
)

def _start(self):
self._reactor.start()
try:
self._invocation_service.init(self._internal_partition_service, self._connection_manager,
self._listener_service)
self._invocation_service.init(
self._internal_partition_service, self._connection_manager, self._listener_service
)
self._internal_lifecycle_service.start()
membership_listeners = self.config.membership_listeners
self._internal_cluster_service.start(self._connection_manager, membership_listeners)
Expand Down Expand Up @@ -540,9 +580,12 @@ def handle_distributed_object_event(name, service_name, event_type, source):
def event_handler(client_message):
return codec.handle(client_message, handle_distributed_object_event)

return self._listener_service.register_listener(request, codec.decode_response,
client_remove_distributed_object_listener_codec.encode_request,
event_handler)
return self._listener_service.register_listener(
request,
codec.decode_response,
client_remove_distributed_object_listener_codec.encode_request,
event_handler,
)

def remove_distributed_object_listener(self, registration_id):
"""Removes the specified distributed object listener.
Expand Down Expand Up @@ -573,14 +616,20 @@ def get_distributed_objects(self):
distributed_objects = self._proxy_manager.get_distributed_objects()
local_distributed_object_infos = set()
for dist_obj in distributed_objects:
local_distributed_object_infos.add(DistributedObjectInfo(dist_obj.service_name, dist_obj.name))
local_distributed_object_infos.add(
DistributedObjectInfo(dist_obj.service_name, dist_obj.name)
)

for dist_obj_info in response:
local_distributed_object_infos.discard(dist_obj_info)
self._proxy_manager.get_or_create(dist_obj_info.service_name, dist_obj_info.name, create_on_remote=False)
self._proxy_manager.get_or_create(
dist_obj_info.service_name, dist_obj_info.name, create_on_remote=False
)

for dist_obj_info in local_distributed_object_infos:
self._proxy_manager.destroy_proxy(dist_obj_info.service_name, dist_obj_info.name, destroy_on_remote=False)
self._proxy_manager.destroy_proxy(
dist_obj_info.service_name, dist_obj_info.name, destroy_on_remote=False
)

return self._proxy_manager.get_distributed_objects()

Expand All @@ -605,9 +654,11 @@ def _create_address_provider(self):
cloud_discovery_token = config.cloud_discovery_token
cloud_enabled = cloud_discovery_token is not None
if address_list_provided and cloud_enabled:
raise IllegalStateError("Only one discovery method can be enabled at a time. "
"Cluster members given explicitly: %s, Hazelcast Cloud enabled: %s"
% (address_list_provided, cloud_enabled))
raise IllegalStateError(
"Only one discovery method can be enabled at a time. "
"Cluster members given explicitly: %s, Hazelcast Cloud enabled: %s"
% (address_list_provided, cloud_enabled)
)

if cloud_enabled:
connection_timeout = self._get_connection_timeout(config)
Expand Down Expand Up @@ -654,10 +705,22 @@ def __init__(self):
self.proxy_session_manager = None
self.reactor = None

def init_context(self, config, invocation_service, partition_service,
cluster_service, connection_manager, serialization_service,
listener_service, proxy_manager, near_cache_manager,
lock_reference_id_generator, name, proxy_session_manager, reactor):
def init_context(
self,
config,
invocation_service,
partition_service,
cluster_service,
connection_manager,
serialization_service,
listener_service,
proxy_manager,
near_cache_manager,
lock_reference_id_generator,
name,
proxy_session_manager,
reactor,
):
self.config = config
self.invocation_service = invocation_service
self.partition_service = partition_service
Expand Down
36 changes: 25 additions & 11 deletions hazelcast/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,12 @@ def __init__(self, client_uuid, address, name, labels):
self.labels = labels

def __repr__(self):
return "ClientInfo(uuid=%s, address=%s, name=%s, labels=%s)" % (self.uuid, self.address, self.name, self.labels)
return "ClientInfo(uuid=%s, address=%s, name=%s, labels=%s)" % (
self.uuid,
self.address,
self.name,
self.labels,
)


_EMPTY_SNAPSHOT = _MemberListSnapshot(-1, OrderedDict())
Expand Down Expand Up @@ -103,7 +108,6 @@ def get_members(self, member_selector=None):


class _InternalClusterService(object):

def __init__(self, client):
self._client = client
self._connection_manager = None
Expand Down Expand Up @@ -150,7 +154,9 @@ def get_local_client(self):
connection_manager = self._connection_manager
connection = connection_manager.get_random_connection()
local_address = None if not connection else connection.local_address
return ClientInfo(connection_manager.client_uuid, local_address, self._client.name, self._labels)
return ClientInfo(
connection_manager.client_uuid, local_address, self._client.name, self._labels
)

def add_listener(self, member_added=None, member_removed=None, fire_for_existing=False):
registration_id = str(uuid.uuid4())
Expand Down Expand Up @@ -192,8 +198,11 @@ def clear_member_list_version(self):
def handle_members_view_event(self, version, member_infos):
snapshot = self._create_snapshot(version, member_infos)
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug("Handling new snapshot with membership version: %s, member string: %s",
version, self._members_string(snapshot))
_logger.debug(
"Handling new snapshot with membership version: %s, member string: %s",
version,
self._members_string(snapshot),
)

current = self._member_list_snapshot
if version >= current.version:
Expand Down Expand Up @@ -235,9 +244,14 @@ def _detect_membership_events(self, old, new):
for dead_member in dead_members:
connection = self._connection_manager.get_connection(dead_member.uuid)
if connection:
connection.close(None, TargetDisconnectedError("The client has closed the connection to this member, "
"after receiving a member left event from the cluster. "
"%s" % connection))
connection.close(
None,
TargetDisconnectedError(
"The client has closed the connection to this member, "
"after receiving a member left event from the cluster. "
"%s" % connection
),
)

if (len(new_members) + len(dead_members)) > 0:
if len(new.members) > 0:
Expand All @@ -261,7 +275,7 @@ def _create_snapshot(version, member_infos):

class VectorClock(object):
"""Vector clock consisting of distinct replica logical clocks.

The vector clock may be read from different thread but concurrent
updates must be synchronized externally. There is no guarantee for
concurrent updates.
Expand Down Expand Up @@ -309,15 +323,15 @@ def entry_set(self):
"""Returns the entry set of the replica timestamps in a format of list of tuples.

Each tuple contains the replica ID and the timestamp associated with it.

Returns:
list: List of tuples.
"""
return list(self._replica_timestamps.items())

def size(self):
"""Returns the number of timestamps that are in the replica timestamps dictionary.

Returns:
int: Number of timestamps in the replica timestamps.
"""
Expand Down
Loading