Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 27 additions & 19 deletions hazelcast/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def __init__(self, client, reactor, address_provider, lifecycle_service,
partition_service, cluster_service, invocation_service,
near_cache_manager):
self.live = False
self.active_connections = dict()
self.active_connections = dict() # uuid to connection, must be modified under the _lock
self.client_uuid = uuid.uuid4()

self._client = client
Expand All @@ -95,7 +95,8 @@ def __init__(self, client, reactor, address_provider, lifecycle_service,
self._connect_all_members_timer = None
self._async_start = config.async_start
self._connect_to_cluster_thread_running = False
self._pending_connections = dict()
self._pending_connections = dict() # must be modified under the _lock
self._addresses_to_connections = dict() # address to connection, must be modified under the _lock
self._shuffle_member_list = config.shuffle_member_list
self._lock = threading.RLock()
self._connection_id_generator = AtomicInteger()
Expand All @@ -118,10 +119,7 @@ def get_connection(self, member_uuid):
return self.active_connections.get(member_uuid, None)

def get_connection_from_address(self, address):
for connection in six.itervalues(self.active_connections):
if address == connection.remote_address:
return connection
return None
return self._addresses_to_connections.get(address, None)

def get_random_connection(self):
if self._smart_routing_enabled:
Expand All @@ -131,7 +129,9 @@ def get_random_connection(self):
if connection:
return connection

for connection in six.itervalues(self.active_connections):
# We should not get to this point under normal circumstances.
# Therefore, copying the list should be OK.
for connection in list(six.itervalues(self.active_connections)):
return connection

return None
Expand All @@ -156,16 +156,20 @@ def shutdown(self):
self._connect_all_members_timer.cancel()

self._heartbeat_manager.shutdown()
for connection_future in six.itervalues(self._pending_connections):
connection_future.set_exception(HazelcastClientNotActiveError("Hazelcast client is shutting down"))

# Need to create copy of connection values to avoid modification errors on runtime
for connection in list(six.itervalues(self.active_connections)):
connection.close("Hazelcast client is shutting down", None)
with self._lock:
for connection_future in six.itervalues(self._pending_connections):
connection_future.set_exception(HazelcastClientNotActiveError("Hazelcast client is shutting down"))

self._connection_listeners = []
self.active_connections.clear()
self._pending_connections.clear()
# Need to create copy of connection values to avoid modification errors on runtime
for connection in list(six.itervalues(self.active_connections)):
connection.close("Hazelcast client is shutting down", None)

self.active_connections.clear()
self._addresses_to_connections.clear()
self._pending_connections.clear()

del self._connection_listeners[:]

def connect_to_all_cluster_members(self):
if not self._smart_routing_enabled:
Expand All @@ -180,6 +184,7 @@ def connect_to_all_cluster_members(self):
def on_connection_close(self, closed_connection, cause):
connected_address = closed_connection.connected_address
remote_uuid = closed_connection.remote_uuid
remote_address = closed_connection.remote_address

if not connected_address:
_logger.debug("Destroying %s, but it has no remote address, hence nothing is "
Expand All @@ -188,6 +193,7 @@ def on_connection_close(self, closed_connection, cause):
with self._lock:
pending = self._pending_connections.pop(connected_address, None)
connection = self.active_connections.pop(remote_uuid, None)
self._addresses_to_connections.pop(remote_address, None)

if pending:
pending.set_exception(cause)
Expand Down Expand Up @@ -395,8 +401,8 @@ def _on_auth(self, response, connection, address):
raise err
else:
e = response.exception()
# This will set the exception for the pending connection future
connection.close("Failed to authenticate connection", e)
self._pending_connections.pop(address, None)
six.reraise(e.__class__, e, response.traceback())

def _handle_successful_auth(self, response, connection, address):
Expand All @@ -420,7 +426,8 @@ def _handle_successful_auth(self, response, connection, address):
self._on_cluster_restart()

with self._lock:
self.active_connections[response["member_uuid"]] = connection
self.active_connections[remote_uuid] = connection
self._addresses_to_connections[remote_address] = connection
self._pending_connections.pop(address, None)

if is_initial_connection:
Expand Down Expand Up @@ -494,11 +501,12 @@ def start(self):
"""Starts sending periodic HeartBeat operations."""

def _heartbeat():
if not self._connection_manager.live:
conn_manager = self._connection_manager
if not conn_manager.live:
return

now = time.time()
for connection in list(self._connection_manager.active_connections.values()):
for connection in list(six.itervalues(conn_manager.active_connections)):
self._check_connection(now, connection)
self._heartbeat_timer = self._reactor.add_timer(self._heartbeat_interval, _heartbeat)

Expand Down
2 changes: 1 addition & 1 deletion hazelcast/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def register_listener(self, registration_request, decode_register_response, enco
self._active_registrations[registration_id] = registration

futures = []
for connection in six.itervalues(self._connection_manager.active_connections):
for connection in list(six.itervalues(self._connection_manager.active_connections)):
future = self._register_on_connection_async(registration_id, registration, connection)
futures.append(future)

Expand Down
Loading