From cd2524be647f21d7555f7d44715e7518497baac4 Mon Sep 17 00:00:00 2001
From: mdumandag
Date: Fri, 2 Apr 2021 17:43:10 +0300
Subject: [PATCH 1/5] Simplify connection manager

Similar to the Java-side fix, we now use the member UUID as the source
of truth and allow connections to be opened to the same address more
than once. We just make sure that the active connections dictionary
holds a single entry per member, keyed by its UUID.

This change simplifies things a great deal. We no longer need a
dictionary for pending connections, and managing the state of the
connection manager is easier. With this change, we also make sure that
we don't end up with more than one active connection to the same member
in the scenario reported on the Java-side PR.

Also, improved the logging of connection errors.
---
 hazelcast/client.py                           |  14 +-
 hazelcast/connection.py                       | 383 +++++++++---------
 hazelcast/invocation.py                       |   2 +
 hazelcast/listener.py                         |   4 +-
 hazelcast/reactor.py                          |   4 +-
 .../backward_compatible/heartbeat_test.py     |   6 +-
 6 files changed, 218 insertions(+), 195 deletions(-)

diff --git a/hazelcast/client.py b/hazelcast/client.py
index d41ecfb144..c57daed94b 100644
--- a/hazelcast/client.py
+++ b/hazelcast/client.py
@@ -371,6 +371,9 @@ def __init__(self, **kwargs):
             self._invocation_service,
         )
         self._shutdown_lock = threading.RLock()
+        self._invocation_service.init(
+            self._internal_partition_service, self._connection_manager, self._listener_service
+        )
         self._init_context()
         self._start()
 
@@ -394,20 +397,19 @@ def _init_context(self):
     def _start(self):
         self._reactor.start()
         try:
-            self._invocation_service.init(
-                self._internal_partition_service, self._connection_manager, self._listener_service
-            )
             self._internal_lifecycle_service.start()
+            self._invocation_service.start()
             membership_listeners = self._config.membership_listeners
             self._internal_cluster_service.start(self._connection_manager, membership_listeners)
             self._cluster_view_listener.start()
             self._connection_manager.start(self._load_balancer)
-            if not self._config.async_start:
+            sync_start = not self._config.async_start
+            if sync_start:
                 self._internal_cluster_service.wait_initial_member_list_fetched()
-            self._connection_manager.connect_to_all_cluster_members()
+            self._connection_manager.connect_to_all_cluster_members(sync_start)
 
             self._listener_service.start()
-            self._invocation_service.start()
+            self._invocation_service.add_backup_listener()
             self._load_balancer.init(self.cluster_service)
             self._statistics.start()
         except:
diff --git a/hazelcast/connection.py b/hazelcast/connection.py
index 841d9db9f9..7a0470409c 100644
--- a/hazelcast/connection.py
+++ b/hazelcast/connection.py
@@ -6,7 +6,6 @@
 import threading
 import time
 import uuid
-from collections import OrderedDict
 
 from hazelcast import six, __version__
 from hazelcast.config import ReconnectMode
@@ -126,8 +125,6 @@ def __init__(
         self._connect_all_members_timer = None
         self._async_start = config.async_start
         self._connect_to_cluster_thread_running = False
-        self._pending_connections = {}  # must be modified under the _lock
-        self._addresses_to_connections = {}  # must be modified under the _lock
         self._shuffle_member_list = config.shuffle_member_list
         self._lock = threading.RLock()
         self._connection_id_generator = AtomicInteger()
@@ -149,9 +146,6 @@ def add_listener(self, on_connection_opened=None, on_connection_closed=None):
     def get_connection(self, member_uuid):
         return self.active_connections.get(member_uuid, None)
 
-    def get_connection_from_address(self, address):
-        return self._addresses_to_connections.get(address, None)
-
     def get_random_connection(self):
         if self._smart_routing_enabled:
             member = self._load_balancer.next()
@@ -175,8 +169,6 @@ def start(self, load_balancer):
         self._load_balancer = load_balancer
         self._heartbeat_manager.start()
         self._connect_to_cluster()
-        if self._smart_routing_enabled:
-            self._start_connect_all_members_timer()
 
     def shutdown(self):
         if not self.live:
@@ -188,77 +180,72 @@ def shutdown(self):
 
         self._heartbeat_manager.shutdown()
 
-        with self._lock:
-            for connection_future in six.itervalues(self._pending_connections):
-                connection_future.set_exception(
-                    HazelcastClientNotActiveError("Hazelcast client is shutting down")
-                )
-
-            # Need to create copy of connection values to avoid modification errors on runtime
-            for connection in list(six.itervalues(self.active_connections)):
-                connection.close("Hazelcast client is shutting down", None)
-
-            self.active_connections.clear()
-            self._addresses_to_connections.clear()
-            self._pending_connections.clear()
+        # Need to create copy of connection values to avoid modification errors on runtime
+        for connection in list(six.itervalues(self.active_connections)):
+            connection.close("Hazelcast client is shutting down", None)
+        self.active_connections.clear()
 
         del self._connection_listeners[:]
 
-    def connect_to_all_cluster_members(self):
+    def connect_to_all_cluster_members(self, sync_start):
         if not self._smart_routing_enabled:
             return
 
-        for member in self._cluster_service.get_members():
-            try:
-                self._get_or_connect(member.address).result()
-            except:
-                pass
+        if sync_start:
+            for member in self._cluster_service.get_members():
+                try:
+                    self._get_or_connect_to_member(member).result()
+                except:
+                    pass
+
+        self._start_connect_all_members_timer()
 
-    def on_connection_close(self, closed_connection, cause):
-        connected_address = closed_connection.connected_address
+    def on_connection_close(self, closed_connection):
         remote_uuid = closed_connection.remote_uuid
         remote_address = closed_connection.remote_address
 
-        if not connected_address:
+        if not remote_address:
             _logger.debug(
                 "Destroying %s, but it has no remote address, hence nothing is "
                 "removed from the connection dictionary",
                 closed_connection,
             )
+            return
 
         with self._lock:
-            pending = self._pending_connections.pop(connected_address, None)
-            connection = self.active_connections.pop(remote_uuid, None)
-            self._addresses_to_connections.pop(remote_address, None)
-
-            if pending:
-                pending.set_exception(cause)
-
-            if connection:
+            connection = self.active_connections.get(remote_uuid, None)
+            disconnected = False
+            removed = False
+            if connection == closed_connection:
+                self.active_connections.pop(remote_uuid, None)
+                removed = True
                 _logger.info(
                     "Removed connection to %s:%s, connection: %s",
-                    connected_address,
+                    remote_address,
                     remote_uuid,
                     connection,
                 )
+
                 if not self.active_connections:
-                    self._lifecycle_service.fire_lifecycle_event(LifecycleState.DISCONNECTED)
-                    self._trigger_cluster_reconnection()
+                    disconnected = True
 
-        if connection:
+        if disconnected:
+            self._lifecycle_service.fire_lifecycle_event(LifecycleState.DISCONNECTED)
+            self._trigger_cluster_reconnection()
+
+        if removed:
             for _, on_connection_closed in self._connection_listeners:
                 if on_connection_closed:
                     try:
-                        on_connection_closed(connection, cause)
+                        on_connection_closed(closed_connection)
                     except:
                         _logger.exception("Exception in connection listener")
         else:
-            if remote_uuid:
-                _logger.debug(
-                    "Destroying %s, but there is no mapping for %s in the connection dictionary",
-                    closed_connection,
-                    remote_uuid,
-                )
+            _logger.debug(
+                "Destroying %s, but there is no mapping for %s in the connection dictionary",
+                closed_connection,
+                remote_uuid,
+            )
 
     def check_invocation_allowed(self):
         if self.active_connections:
@@ -269,6 +256,50 @@ def check_invocation_allowed(self):
         else:
             raise IOError("No connection found to cluster")
 
+    def _get_or_connect_to_address(self, address):
+        for connection in six.itervalues(self.active_connections):
+            if connection.remote_address == address:
+                return ImmediateFuture(connection)
+
+        try:
+            translated = self._translate(address)
+            connection = self._create_connection(translated)
+            return self._authenticate(connection).continue_with(self._on_auth, connection)
+        except Exception as e:
+            return ImmediateExceptionFuture(e)
+
+    def _get_or_connect_to_member(self, member):
+        connection = self.active_connections.get(member.uuid, None)
+        if connection:
+            return ImmediateFuture(connection)
+
+        try:
+            translated = self._translate(member.address)
+            connection = self._create_connection(translated)
+            return self._authenticate(connection).continue_with(self._on_auth, connection)
+        except Exception as e:
+            return ImmediateExceptionFuture(e)
+
+    def _create_connection(self, address):
+        factory = self._reactor.connection_factory
+        return factory(
+            self,
+            self._connection_id_generator.get_and_increment(),
+            address,
+            self._config,
+            self._invocation_service.handle_client_message,
+        )
+
+    def _translate(self, address):
+        translated = self._address_provider.translate(address)
+        if not translated:
+            raise ValueError(
+                "Address provider \"%s\" could not translate address %s"
+                % (self._address_provider.__class__.__name__, address)
+            )
+
+        return translated
+
     def _trigger_cluster_reconnection(self):
         if self._reconnect_mode == ReconnectMode.OFF:
             _logger.info("Reconnect mode is OFF. Shutting down the client")
@@ -295,30 +326,31 @@ def _init_wait_strategy(self, config):
         )
 
     def _start_connect_all_members_timer(self):
-        connecting_addresses = set()
+        connecting_uuids = set()
 
         def run():
             if not self._lifecycle_service.running:
                 return
 
             for member in self._cluster_service.get_members():
-                address = member.address
+                member_uuid = member.uuid
 
-                if (
-                    not self.get_connection_from_address(address)
-                    and address not in connecting_addresses
-                ):
-                    connecting_addresses.add(address)
-                    if not self._lifecycle_service.running:
-                        break
+                if self.active_connections.get(member_uuid, None):
+                    continue
 
-                    if not self.get_connection(member.uuid):
-                        # Bind the address to the value
-                        # in this loop iteration
-                        def cb(_, address=address):
-                            connecting_addresses.discard(address)
+                if member_uuid in connecting_uuids:
+                    continue
 
-                        self._get_or_connect(address).add_done_callback(cb)
+                connecting_uuids.add(member_uuid)
+                if not self._lifecycle_service.running:
+                    break
+
+                # Bind the bound_member_uuid to the value
+                # in this loop iteration
+                def cb(_, bound_member_uuid=member_uuid):
+                    connecting_uuids.discard(bound_member_uuid)
+
+                self._get_or_connect_to_member(member).add_done_callback(cb)
 
             self._connect_all_members_timer = self._reactor.add_timer(1, run)
 
@@ -364,16 +396,37 @@ def _sync_connect_to_cluster(self):
         self._wait_strategy.reset()
         try:
             while True:
+                tried_addresses_per_attempt = set()
+                members = self._cluster_service.get_members()
+                if self._shuffle_member_list:
+                    random.shuffle(members)
+
+                for member in members:
+                    self._check_client_active()
+                    tried_addresses_per_attempt.add(member.address)
+                    connection = self._connect(member, self._get_or_connect_to_member)
+                    if connection:
+                        return
+
                 for address in self._get_possible_addresses():
                     self._check_client_active()
-                    tried_addresses.add(address)
-                    connection = self._connect(address)
+                    if address in tried_addresses_per_attempt:
+                        # We already tried this address on from the member list
+                        continue
+
+                    tried_addresses_per_attempt.add(address)
+                    connection = self._connect(address, self._get_or_connect_to_address)
                     if connection:
                         return
+
+                tried_addresses.update(tried_addresses_per_attempt)
+
                 # If the address providers load no addresses (which seems to be possible),
                 # then the above loop is not entered and the lifecycle check is missing,
                 # hence we need to repeat the same check at this point.
-                self._check_client_active()
+                if not tried_addresses_per_attempt:
+                    self._check_client_active()
+
                 if not self._wait_strategy.sleep():
                     break
         except (ClientNotAllowedInClusterError, InvalidConfigurationError):
@@ -393,57 +446,17 @@ def _sync_connect_to_cluster(self):
             msg = "Client is being shutdown"
             raise IllegalStateError(msg)
 
-    def _connect(self, address):
-        _logger.info("Trying to connect to %s", address)
+    def _connect(self, target, get_or_connect_func):
+        _logger.info("Trying to connect to %s", target)
         try:
-            return self._get_or_connect(address).result()
+            return get_or_connect_func(target).result()
         except (ClientNotAllowedInClusterError, InvalidConfigurationError) as e:
-            _logger.warning("Error during initial connection to %s: %s", address, e)
+            _logger.warning("Error during initial connection to %s", target, exc_info=True)
             raise e
-        except Exception as e:
-            _logger.warning("Error during initial connection to %s: %s", address, e)
+        except:
+            _logger.warning("Error during initial connection to %s", target, exc_info=True)
             return None
 
-    def _get_or_connect(self, address):
-        connection = self.get_connection_from_address(address)
-        if connection:
-            return ImmediateFuture(connection)
-
-        with self._lock:
-            connection = self.get_connection_from_address(address)
-            if connection:
-                return ImmediateFuture(connection)
-            else:
-                pending = self._pending_connections.get(address, None)
-                if pending:
-                    return pending
-                else:
-                    try:
-                        translated = self._address_provider.translate(address)
-                        if not translated:
-                            error = ValueError(
-                                "Address translator could not translate address %s" % address
-                            )
-                            return ImmediateExceptionFuture(error)
-
-                        factory = self._reactor.connection_factory
-                        connection = factory(
-                            self,
-                            self._connection_id_generator.get_and_increment(),
-                            translated,
-                            self._config,
-                            self._invocation_service.handle_client_message,
-                        )
-                    except IOError:
-                        error = sys.exc_info()
-                        return ImmediateExceptionFuture(error[1], error[2])
-
-                    future = self._authenticate(connection).continue_with(
-                        self._on_auth, connection, address
-                    )
-                    self._pending_connections[address] = future
-                    return future
-
     def _authenticate(self, connection):
         client = self._client
         cluster_name = self._config.cluster_name
@@ -466,70 +479,69 @@ def _authenticate(self, connection):
         self._invocation_service.invoke(invocation)
         return invocation.future
 
-    def _on_auth(self, response, connection, address):
-        if response.is_success():
+    def _on_auth(self, response, connection):
+        try:
             response = client_authentication_codec.decode_response(response.result())
-            status = response["status"]
-            if status == _AuthenticationStatus.AUTHENTICATED:
-                return self._handle_successful_auth(response, connection, address)
-
-            if status == _AuthenticationStatus.CREDENTIALS_FAILED:
-                err = AuthenticationError(
-                    "Authentication failed. The configured cluster name on "
-                    "the client does not match the one configured in the cluster."
-                )
-            elif status == _AuthenticationStatus.NOT_ALLOWED_IN_CLUSTER:
-                err = ClientNotAllowedInClusterError("Client is not allowed in the cluster")
-            elif status == _AuthenticationStatus.SERIALIZATION_VERSION_MISMATCH:
-                err = IllegalStateError("Server serialization version does not match to client")
-            else:
-                err = AuthenticationError(
-                    "Authentication status code not supported. status: %s" % status
-                )
-
+        except Exception as err:
             connection.close("Failed to authenticate connection", err)
             raise err
+
+        status = response["status"]
+        if status == _AuthenticationStatus.AUTHENTICATED:
+            return self._handle_successful_auth(response, connection)
+
+        if status == _AuthenticationStatus.CREDENTIALS_FAILED:
+            err = AuthenticationError(
+                "Authentication failed. The configured cluster name on "
+                "the client does not match the one configured in the cluster."
+            )
+        elif status == _AuthenticationStatus.NOT_ALLOWED_IN_CLUSTER:
+            err = ClientNotAllowedInClusterError("Client is not allowed in the cluster")
+        elif status == _AuthenticationStatus.SERIALIZATION_VERSION_MISMATCH:
+            err = IllegalStateError("Server serialization version does not match to client")
         else:
-            e = response.exception()
-            # This will set the exception for the pending connection future
-            connection.close("Failed to authenticate connection", e)
-            six.reraise(e.__class__, e, response.traceback())
+            err = AuthenticationError(
+                "Authentication status code not supported. status: %s" % status
+            )
 
-    def _handle_successful_auth(self, response, connection, address):
-        self._check_partition_count(response["partition_count"])
+        connection.close("Failed to authenticate connection", err)
+        raise err
 
-        server_version_str = response["server_hazelcast_version"]
-        remote_address = response["address"]
-        remote_uuid = response["member_uuid"]
+    def _handle_successful_auth(self, response, connection):
+        with self._lock:
+            self._check_partition_count(response["partition_count"])
 
-        connection.remote_address = remote_address
-        connection.server_version = calculate_version(server_version_str)
-        connection.remote_uuid = remote_uuid
+            server_version_str = response["server_hazelcast_version"]
+            remote_address = response["address"]
+            remote_uuid = response["member_uuid"]
 
-        new_cluster_id = response["cluster_id"]
+            connection.remote_address = remote_address
+            connection.server_version = calculate_version(server_version_str)
+            connection.remote_uuid = remote_uuid
 
-        is_initial_connection = not self.active_connections
-        changed_cluster = (
-            is_initial_connection
-            and self._cluster_id is not None
-            and self._cluster_id != new_cluster_id
-        )
-        if changed_cluster:
-            _logger.warning(
-                "Switching from current cluster: %s to new cluster: %s",
-                self._cluster_id,
-                new_cluster_id,
-            )
-            self._on_cluster_restart()
+            existing = self.active_connections.get(remote_uuid, None)
+            if existing:
+                connection.close(
+                    "Duplicate connection to same member with UUID: %s" % remote_uuid, None
+                )
+                return existing
+
+            new_cluster_id = response["cluster_id"]
+            changed_cluster = self._cluster_id is not None and self._cluster_id != new_cluster_id
+            if changed_cluster:
+                self._check_client_state_on_cluster_change(connection)
+                _logger.warning(
+                    "Switching from current cluster: %s to new cluster: %s",
+                    self._cluster_id,
+                    new_cluster_id,
+                )
+                self._on_cluster_restart()
 
-        with self._lock:
+            is_initial_connection = not self.active_connections
             self.active_connections[remote_uuid] = connection
-            self._addresses_to_connections[remote_address] = connection
-            self._pending_connections.pop(address, None)
-
-            if is_initial_connection:
-                self._cluster_id = new_cluster_id
-                self._lifecycle_service.fire_lifecycle_event(LifecycleState.CONNECTED)
+            if is_initial_connection:
+                self._cluster_id = new_cluster_id
+                self._lifecycle_service.fire_lifecycle_event(LifecycleState.CONNECTED)
 
         _logger.info(
             "Authenticated with server %s:%s, server version: %s, local address: %s",
@@ -547,10 +559,21 @@ def _handle_successful_auth(self, response, connection, address):
                 _logger.exception("Exception in connection listener")
 
         if not connection.live:
-            self.on_connection_close(connection, None)
+            self.on_connection_close(connection)
 
         return connection
 
+    def _check_client_state_on_cluster_change(self, connection):
+        if self.active_connections:
+            # If there are other connections, we must be connected to the wrong cluster.
+            # We should not stay connected to this new connection.
+            # Note that, in some racy scenarios, we might close a connection that
+            # we can operate on. In those scenarios, we rely on the fact that we will
+            # reopen the connections.
+            reason = "Connection does not belong to the cluster %s" % self._cluster_id
+            connection.close(reason, None)
+            raise ValueError(reason)
+
     def _on_cluster_restart(self):
         self._near_cache_manager.clear_near_caches()
         self._cluster_service.clear_member_list_version()
@@ -569,26 +592,20 @@ def _check_client_active(self):
             raise HazelcastClientNotActiveError()
 
     def _get_possible_addresses(self):
-        member_addresses = list(
-            map(lambda m: (m.address, None), self._cluster_service.get_members())
-        )
-
-        if self._shuffle_member_list:
-            random.shuffle(member_addresses)
-
-        addresses = OrderedDict(member_addresses)
         primaries, secondaries = self._address_provider.load_addresses()
         if self._shuffle_member_list:
+            # The relative order between primary and secondary addresses should
+            # not be changed. So we shuffle the lists separately and then add
+            # them to the final list so that secondary addresses are not tried
+            # before all primary addresses have been tried. Otherwise we can get
+            # startup delays
             random.shuffle(primaries)
             random.shuffle(secondaries)
 
-        for address in primaries:
-            addresses[address] = None
-
-        for address in secondaries:
-            addresses[address] = None
-
-        return six.iterkeys(addresses)
+        addresses = []
+        addresses.extend(primaries)
+        addresses.extend(secondaries)
+        return addresses
 
 
 class _HeartbeatManager(object):
@@ -770,7 +787,7 @@ def close(self, reason, cause):
                 self._inner_close()
             except:
                 _logger.exception("Error while closing the the connection %s", self)
-            self._connection_manager.on_connection_close(self, cause)
+            self._connection_manager.on_connection_close(self)
 
     def _log_close(self, reason, cause):
         msg = "%s closed. Reason: %s"
diff --git a/hazelcast/invocation.py b/hazelcast/invocation.py
index 68a71b9019..9d140ab14b 100644
--- a/hazelcast/invocation.py
+++ b/hazelcast/invocation.py
@@ -115,6 +115,8 @@ def init(self, partition_service, connection_manager, listener_service):
 
     def start(self):
         self._start_clean_resources_timer()
+
+    def add_backup_listener(self):
         if self._backup_ack_to_client_enabled:
             self._register_backup_listener()
 
diff --git a/hazelcast/listener.py b/hazelcast/listener.py
index ffea93e319..9f266a7a1b 100644
--- a/hazelcast/listener.py
+++ b/hazelcast/listener.py
@@ -176,7 +176,7 @@ def _connection_added(self, connection):
             for user_reg_id, listener_registration in six.iteritems(self._active_registrations):
                 self._register_on_connection(user_reg_id, listener_registration, connection)
 
-    def _connection_removed(self, connection, _):
+    def _connection_removed(self, connection):
         with self._registration_lock:
             for listener_registration in six.itervalues(self._active_registrations):
                 event_registration = listener_registration.connection_registrations.pop(
@@ -203,7 +203,7 @@ def start(self):
     def _connection_added(self, connection):
         self._try_register(connection)
 
-    def _connection_removed(self, connection, _):
+    def _connection_removed(self, connection):
         self._try_register_to_random_connection(connection)
 
     def _try_register_to_random_connection(self, old_connection):
diff --git a/hazelcast/reactor.py b/hazelcast/reactor.py
index 42c47cc6e7..96dd0905fe 100644
--- a/hazelcast/reactor.py
+++ b/hazelcast/reactor.py
@@ -486,8 +486,8 @@ def handle_error(self):
         # We handle retryable error codes inside the
         # handle_read/write. Anything else should be fatal.
         error = sys.exc_info()[1]
-        _logger.exception("Received error")
-        self.close(None, error)
+        _logger.debug("Received error", exc_info=True)
+        self.close(str(error), None)
 
     def readable(self):
         return self.live and self.sent_protocol_bytes
diff --git a/tests/integration/backward_compatible/heartbeat_test.py b/tests/integration/backward_compatible/heartbeat_test.py
index 6551cfafce..5e5da38a57 100644
--- a/tests/integration/backward_compatible/heartbeat_test.py
+++ b/tests/integration/backward_compatible/heartbeat_test.py
@@ -64,5 +64,7 @@ def assert_heartbeat_stopped_and_restored():
 
     @staticmethod
     def simulate_heartbeat_lost(client, address, timeout):
-        connection = client._connection_manager.get_connection_from_address(address)
-        connection.last_read_time -= timeout
+        for connection in client._connection_manager.active_connections.values():
+            if connection.remote_address == address:
+                connection.last_read_time -= timeout
+                break

From a5b561a322a1d5236e1b5f265bd9c7c99c14836d Mon Sep 17 00:00:00 2001
From: mdumandag
Date: Fri, 2 Apr 2021 18:36:25 +0300
Subject: [PATCH 2/5] call add_backup_listener on unit tests

---
 tests/unit/invocation_test.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/tests/unit/invocation_test.py b/tests/unit/invocation_test.py
index 8863367505..75e88be4e8 100644
--- a/tests/unit/invocation_test.py
+++ b/tests/unit/invocation_test.py
@@ -156,4 +156,5 @@ def _start_service(self, config=_Config()):
             c._internal_partition_service, c._connection_manager, c._listener_service
         )
         invocation_service.start()
+        invocation_service.add_backup_listener()
         return c, invocation_service

From d6ce8453c6012056d467ea20019a7059bdfdca73 Mon Sep 17 00:00:00 2001
From: mdumandag
Date: Mon, 5 Apr 2021 10:40:49 +0300
Subject: [PATCH 3/5] fix linter error

---
 hazelcast/connection.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hazelcast/connection.py b/hazelcast/connection.py
index 7a0470409c..2931c95035 100644
--- a/hazelcast/connection.py
+++ b/hazelcast/connection.py
@@ -294,7 +294,7 @@ def _translate(self, address):
         translated = self._address_provider.translate(address)
         if not translated:
             raise ValueError(
-                "Address provider \"%s\" could not translate address %s"
+                "Address provider %s could not translate address %s"
                 % (self._address_provider.__class__.__name__, address)
             )
 

From 1cd845c5f442380167f0b17c1c3ec00ffaeb0f7f Mon Sep 17 00:00:00 2001
From: mdumandag
Date: Mon, 5 Apr 2021 12:42:27 +0300
Subject: [PATCH 4/5] copy active connections while iterating to avoid
 concurrent modification errors

---
 hazelcast/connection.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hazelcast/connection.py b/hazelcast/connection.py
index 2931c95035..b9e7de1abb 100644
--- a/hazelcast/connection.py
+++ b/hazelcast/connection.py
@@ -257,7 +257,7 @@ def check_invocation_allowed(self):
             raise IOError("No connection found to cluster")
 
     def _get_or_connect_to_address(self, address):
-        for connection in six.itervalues(self.active_connections):
+        for connection in list(six.itervalues(self.active_connections)):
             if connection.remote_address == address:
                 return ImmediateFuture(connection)
 

From d3b60c88abbe82d1036c91055edf21b023f87572 Mon Sep 17 00:00:00 2001
From: mdumandag
Date: Mon, 5 Apr 2021 14:53:40 +0300
Subject: [PATCH 5/5] fire connected event while not holding the lock

---
 hazelcast/connection.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/hazelcast/connection.py b/hazelcast/connection.py
index b9e7de1abb..02150e6eb1 100644
--- a/hazelcast/connection.py
+++ b/hazelcast/connection.py
@@ -541,7 +541,9 @@ def _handle_successful_auth(self, response, connection):
             self.active_connections[remote_uuid] = connection
             if is_initial_connection:
                 self._cluster_id = new_cluster_id
-                self._lifecycle_service.fire_lifecycle_event(LifecycleState.CONNECTED)
+
+        if is_initial_connection:
+            self._lifecycle_service.fire_lifecycle_event(LifecycleState.CONNECTED)
 
         _logger.info(
             "Authenticated with server %s:%s, server version: %s, local address: %s",
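
For readers skimming the series rather than applying it, here is a minimal,
standalone sketch of the bookkeeping that PATCH 1 describes: active connections
are keyed by member UUID, several connection attempts to the same address are
allowed, and when authentication reveals that an entry for that UUID already
exists, the newcomer is closed and the existing connection is kept. This is an
illustration only; the names below (ConnectionRegistry, Connection) are invented
for the sketch and are not the Hazelcast client's actual API.

    import threading
    import uuid


    class Connection(object):
        """Stand-in for the client's connection object (illustrative only)."""

        def __init__(self, address):
            self.remote_address = address
            self.remote_uuid = None  # learned from the authentication response
            self.live = True

        def close(self, reason):
            self.live = False
            print("Closing connection to %s: %s" % (self.remote_address, reason))


    class ConnectionRegistry(object):
        """Keeps at most one active connection per member UUID."""

        def __init__(self):
            self._lock = threading.RLock()
            self.active_connections = {}  # member UUID -> Connection

        def on_authenticated(self, connection, member_uuid):
            # Called once the authentication response tells us which member
            # this connection actually belongs to.
            with self._lock:
                existing = self.active_connections.get(member_uuid)
                if existing:
                    # Another attempt (possibly to the same address) already won
                    # the race for this member; close the duplicate, keep the old one.
                    connection.close("Duplicate connection to member %s" % member_uuid)
                    return existing

                connection.remote_uuid = member_uuid
                self.active_connections[member_uuid] = connection
                return connection

        def on_connection_close(self, closed_connection):
            with self._lock:
                member_uuid = closed_connection.remote_uuid
                if self.active_connections.get(member_uuid) is closed_connection:
                    # Remove the entry only if it still points at the closed
                    # connection; a replacement may already be registered.
                    del self.active_connections[member_uuid]


    if __name__ == "__main__":
        registry = ConnectionRegistry()
        member = uuid.uuid4()

        first = Connection("10.0.0.1:5701")
        second = Connection("10.0.0.1:5701")  # a second attempt to the same address

        assert registry.on_authenticated(first, member) is first
        assert registry.on_authenticated(second, member) is first  # duplicate closed
        assert not second.live and first.live

        registry.on_connection_close(first)
        assert member not in registry.active_connections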