diff --git a/hazelcast/client.py b/hazelcast/client.py
index d41ecfb144..c57daed94b 100644
--- a/hazelcast/client.py
+++ b/hazelcast/client.py
@@ -371,6 +371,9 @@ def __init__(self, **kwargs):
             self._invocation_service,
         )
         self._shutdown_lock = threading.RLock()
+        self._invocation_service.init(
+            self._internal_partition_service, self._connection_manager, self._listener_service
+        )
         self._init_context()
         self._start()
 
@@ -394,20 +397,19 @@ def _init_context(self):
     def _start(self):
         self._reactor.start()
         try:
-            self._invocation_service.init(
-                self._internal_partition_service, self._connection_manager, self._listener_service
-            )
             self._internal_lifecycle_service.start()
+            self._invocation_service.start()
             membership_listeners = self._config.membership_listeners
             self._internal_cluster_service.start(self._connection_manager, membership_listeners)
             self._cluster_view_listener.start()
             self._connection_manager.start(self._load_balancer)
-            if not self._config.async_start:
+            sync_start = not self._config.async_start
+            if sync_start:
                 self._internal_cluster_service.wait_initial_member_list_fetched()
 
-            self._connection_manager.connect_to_all_cluster_members()
+            self._connection_manager.connect_to_all_cluster_members(sync_start)
             self._listener_service.start()
-            self._invocation_service.start()
+            self._invocation_service.add_backup_listener()
             self._load_balancer.init(self.cluster_service)
             self._statistics.start()
         except:
diff --git a/hazelcast/connection.py b/hazelcast/connection.py
index 841d9db9f9..02150e6eb1 100644
--- a/hazelcast/connection.py
+++ b/hazelcast/connection.py
@@ -6,7 +6,6 @@
 import threading
 import time
 import uuid
-from collections import OrderedDict
 
 from hazelcast import six, __version__
 from hazelcast.config import ReconnectMode
@@ -126,8 +125,6 @@ def __init__(
         self._connect_all_members_timer = None
         self._async_start = config.async_start
         self._connect_to_cluster_thread_running = False
-        self._pending_connections = {}  # must be modified under the _lock
-        self._addresses_to_connections = {}  # must be modified under the _lock
         self._shuffle_member_list = config.shuffle_member_list
         self._lock = threading.RLock()
         self._connection_id_generator = AtomicInteger()
@@ -149,9 +146,6 @@ def add_listener(self, on_connection_opened=None, on_connection_closed=None):
     def get_connection(self, member_uuid):
         return self.active_connections.get(member_uuid, None)
 
-    def get_connection_from_address(self, address):
-        return self._addresses_to_connections.get(address, None)
-
     def get_random_connection(self):
         if self._smart_routing_enabled:
             member = self._load_balancer.next()
@@ -175,8 +169,6 @@ def start(self, load_balancer):
         self._load_balancer = load_balancer
         self._heartbeat_manager.start()
         self._connect_to_cluster()
-        if self._smart_routing_enabled:
-            self._start_connect_all_members_timer()
 
     def shutdown(self):
         if not self.live:
@@ -188,77 +180,72 @@ def shutdown(self):
 
         self._heartbeat_manager.shutdown()
 
-        with self._lock:
-            for connection_future in six.itervalues(self._pending_connections):
-                connection_future.set_exception(
-                    HazelcastClientNotActiveError("Hazelcast client is shutting down")
-                )
-
-            # Need to create copy of connection values to avoid modification errors on runtime
-            for connection in list(six.itervalues(self.active_connections)):
-                connection.close("Hazelcast client is shutting down", None)
-
-            self.active_connections.clear()
-            self._addresses_to_connections.clear()
-            self._pending_connections.clear()
+        # Need to create copy of connection values to avoid modification errors on runtime
+        for connection in list(six.itervalues(self.active_connections)):
+            connection.close("Hazelcast client is shutting down", None)
+        self.active_connections.clear()
 
         del self._connection_listeners[:]
 
-    def connect_to_all_cluster_members(self):
+    def connect_to_all_cluster_members(self, sync_start):
         if not self._smart_routing_enabled:
             return
 
-        for member in self._cluster_service.get_members():
-            try:
-                self._get_or_connect(member.address).result()
-            except:
-                pass
+        if sync_start:
+            for member in self._cluster_service.get_members():
+                try:
+                    self._get_or_connect_to_member(member).result()
+                except:
+                    pass
+
+        self._start_connect_all_members_timer()
 
-    def on_connection_close(self, closed_connection, cause):
-        connected_address = closed_connection.connected_address
+    def on_connection_close(self, closed_connection):
         remote_uuid = closed_connection.remote_uuid
         remote_address = closed_connection.remote_address
 
-        if not connected_address:
+        if not remote_address:
             _logger.debug(
                 "Destroying %s, but it has no remote address, hence nothing is "
                 "removed from the connection dictionary",
                 closed_connection,
             )
+            return
 
         with self._lock:
-            pending = self._pending_connections.pop(connected_address, None)
-            connection = self.active_connections.pop(remote_uuid, None)
-            self._addresses_to_connections.pop(remote_address, None)
-
-            if pending:
-                pending.set_exception(cause)
-
-            if connection:
+            connection = self.active_connections.get(remote_uuid, None)
+            disconnected = False
+            removed = False
+            if connection == closed_connection:
+                self.active_connections.pop(remote_uuid, None)
+                removed = True
                 _logger.info(
                     "Removed connection to %s:%s, connection: %s",
-                    connected_address,
+                    remote_address,
                     remote_uuid,
                     connection,
                 )
+
                 if not self.active_connections:
-                    self._lifecycle_service.fire_lifecycle_event(LifecycleState.DISCONNECTED)
-                    self._trigger_cluster_reconnection()
+                    disconnected = True
 
-        if connection:
+        if disconnected:
+            self._lifecycle_service.fire_lifecycle_event(LifecycleState.DISCONNECTED)
+            self._trigger_cluster_reconnection()
+
+        if removed:
             for _, on_connection_closed in self._connection_listeners:
                 if on_connection_closed:
                     try:
-                        on_connection_closed(connection, cause)
+                        on_connection_closed(closed_connection)
                     except:
                         _logger.exception("Exception in connection listener")
         else:
-            if remote_uuid:
-                _logger.debug(
-                    "Destroying %s, but there is no mapping for %s in the connection dictionary",
-                    closed_connection,
-                    remote_uuid,
-                )
+            _logger.debug(
+                "Destroying %s, but there is no mapping for %s in the connection dictionary",
+                closed_connection,
+                remote_uuid,
+            )
 
     def check_invocation_allowed(self):
         if self.active_connections:
@@ -269,6 +256,50 @@ def check_invocation_allowed(self):
         else:
             raise IOError("No connection found to cluster")
 
+    def _get_or_connect_to_address(self, address):
+        for connection in list(six.itervalues(self.active_connections)):
+            if connection.remote_address == address:
+                return ImmediateFuture(connection)
+
+        try:
+            translated = self._translate(address)
+            connection = self._create_connection(translated)
+            return self._authenticate(connection).continue_with(self._on_auth, connection)
+        except Exception as e:
+            return ImmediateExceptionFuture(e)
+
+    def _get_or_connect_to_member(self, member):
+        connection = self.active_connections.get(member.uuid, None)
+        if connection:
+            return ImmediateFuture(connection)
+
+        try:
+            translated = self._translate(member.address)
+            connection = self._create_connection(translated)
+            return self._authenticate(connection).continue_with(self._on_auth, connection)
+        except Exception as e:
+            return ImmediateExceptionFuture(e)
+
+    def _create_connection(self, address):
+        factory = self._reactor.connection_factory
+        return factory(
+            self,
+            self._connection_id_generator.get_and_increment(),
+            address,
+            self._config,
+            self._invocation_service.handle_client_message,
+        )
+
+    def _translate(self, address):
+        translated = self._address_provider.translate(address)
+        if not translated:
+            raise ValueError(
+                "Address provider %s could not translate address %s"
+                % (self._address_provider.__class__.__name__, address)
+            )
+
+        return translated
+
     def _trigger_cluster_reconnection(self):
         if self._reconnect_mode == ReconnectMode.OFF:
             _logger.info("Reconnect mode is OFF. Shutting down the client")
@@ -295,30 +326,31 @@ def _init_wait_strategy(self, config):
         )
 
     def _start_connect_all_members_timer(self):
-        connecting_addresses = set()
+        connecting_uuids = set()
 
         def run():
             if not self._lifecycle_service.running:
                 return
 
             for member in self._cluster_service.get_members():
-                address = member.address
+                member_uuid = member.uuid
+
+                if self.active_connections.get(member_uuid, None):
+                    continue
 
-                if (
-                    not self.get_connection_from_address(address)
-                    and address not in connecting_addresses
-                ):
-                    connecting_addresses.add(address)
-                    if not self._lifecycle_service.running:
-                        break
+                if member_uuid in connecting_uuids:
+                    continue
 
-                    if not self.get_connection(member.uuid):
-                        # Bind the address to the value
-                        # in this loop iteration
-                        def cb(_, address=address):
-                            connecting_addresses.discard(address)
+                connecting_uuids.add(member_uuid)
+                if not self._lifecycle_service.running:
+                    break
 
-                        self._get_or_connect(address).add_done_callback(cb)
+                # Bind the bound_member_uuid to the value
+                # in this loop iteration
+                def cb(_, bound_member_uuid=member_uuid):
+                    connecting_uuids.discard(bound_member_uuid)
+
+                self._get_or_connect_to_member(member).add_done_callback(cb)
 
         self._connect_all_members_timer = self._reactor.add_timer(1, run)
 
@@ -364,16 +396,37 @@ def _sync_connect_to_cluster(self):
         self._wait_strategy.reset()
         try:
             while True:
+                tried_addresses_per_attempt = set()
+                members = self._cluster_service.get_members()
+                if self._shuffle_member_list:
+                    random.shuffle(members)
+
+                for member in members:
+                    self._check_client_active()
+                    tried_addresses_per_attempt.add(member.address)
+                    connection = self._connect(member, self._get_or_connect_to_member)
+                    if connection:
+                        return
+
                 for address in self._get_possible_addresses():
                     self._check_client_active()
-                    tried_addresses.add(address)
-                    connection = self._connect(address)
+                    if address in tried_addresses_per_attempt:
+                        # We already tried this address from the member list
+                        continue
+
+                    tried_addresses_per_attempt.add(address)
+                    connection = self._connect(address, self._get_or_connect_to_address)
                     if connection:
                         return
+
+                tried_addresses.update(tried_addresses_per_attempt)
+
                 # If the address providers load no addresses (which seems to be possible),
                 # then the above loop is not entered and the lifecycle check is missing,
                 # hence we need to repeat the same check at this point.
-                self._check_client_active()
+                if not tried_addresses_per_attempt:
+                    self._check_client_active()
+
                 if not self._wait_strategy.sleep():
                     break
         except (ClientNotAllowedInClusterError, InvalidConfigurationError):
@@ -393,57 +446,17 @@ def _sync_connect_to_cluster(self):
             msg = "Client is being shutdown"
         raise IllegalStateError(msg)
 
-    def _connect(self, address):
-        _logger.info("Trying to connect to %s", address)
+    def _connect(self, target, get_or_connect_func):
+        _logger.info("Trying to connect to %s", target)
         try:
-            return self._get_or_connect(address).result()
+            return get_or_connect_func(target).result()
         except (ClientNotAllowedInClusterError, InvalidConfigurationError) as e:
-            _logger.warning("Error during initial connection to %s: %s", address, e)
+            _logger.warning("Error during initial connection to %s", target, exc_info=True)
             raise e
-        except Exception as e:
-            _logger.warning("Error during initial connection to %s: %s", address, e)
+        except:
+            _logger.warning("Error during initial connection to %s", target, exc_info=True)
             return None
 
-    def _get_or_connect(self, address):
-        connection = self.get_connection_from_address(address)
-        if connection:
-            return ImmediateFuture(connection)
-
-        with self._lock:
-            connection = self.get_connection_from_address(address)
-            if connection:
-                return ImmediateFuture(connection)
-            else:
-                pending = self._pending_connections.get(address, None)
-                if pending:
-                    return pending
-                else:
-                    try:
-                        translated = self._address_provider.translate(address)
-                        if not translated:
-                            error = ValueError(
-                                "Address translator could not translate address %s" % address
-                            )
-                            return ImmediateExceptionFuture(error)
-
-                        factory = self._reactor.connection_factory
-                        connection = factory(
-                            self,
-                            self._connection_id_generator.get_and_increment(),
-                            translated,
-                            self._config,
-                            self._invocation_service.handle_client_message,
-                        )
-                    except IOError:
-                        error = sys.exc_info()
-                        return ImmediateExceptionFuture(error[1], error[2])
-
-                    future = self._authenticate(connection).continue_with(
-                        self._on_auth, connection, address
-                    )
-                    self._pending_connections[address] = future
-                    return future
-
     def _authenticate(self, connection):
         client = self._client
         cluster_name = self._config.cluster_name
@@ -466,69 +479,70 @@ def _authenticate(self, connection):
         self._invocation_service.invoke(invocation)
         return invocation.future
 
-    def _on_auth(self, response, connection, address):
-        if response.is_success():
+    def _on_auth(self, response, connection):
+        try:
             response = client_authentication_codec.decode_response(response.result())
-            status = response["status"]
-            if status == _AuthenticationStatus.AUTHENTICATED:
-                return self._handle_successful_auth(response, connection, address)
-
-            if status == _AuthenticationStatus.CREDENTIALS_FAILED:
-                err = AuthenticationError(
-                    "Authentication failed. The configured cluster name on "
-                    "the client does not match the one configured in the cluster."
-                )
-            elif status == _AuthenticationStatus.NOT_ALLOWED_IN_CLUSTER:
-                err = ClientNotAllowedInClusterError("Client is not allowed in the cluster")
-            elif status == _AuthenticationStatus.SERIALIZATION_VERSION_MISMATCH:
-                err = IllegalStateError("Server serialization version does not match to client")
-            else:
-                err = AuthenticationError(
-                    "Authentication status code not supported. status: %s" % status
status: %s" % status - ) - + except Exception as err: connection.close("Failed to authenticate connection", err) raise err + + status = response["status"] + if status == _AuthenticationStatus.AUTHENTICATED: + return self._handle_successful_auth(response, connection) + + if status == _AuthenticationStatus.CREDENTIALS_FAILED: + err = AuthenticationError( + "Authentication failed. The configured cluster name on " + "the client does not match the one configured in the cluster." + ) + elif status == _AuthenticationStatus.NOT_ALLOWED_IN_CLUSTER: + err = ClientNotAllowedInClusterError("Client is not allowed in the cluster") + elif status == _AuthenticationStatus.SERIALIZATION_VERSION_MISMATCH: + err = IllegalStateError("Server serialization version does not match to client") else: - e = response.exception() - # This will set the exception for the pending connection future - connection.close("Failed to authenticate connection", e) - six.reraise(e.__class__, e, response.traceback()) + err = AuthenticationError( + "Authentication status code not supported. status: %s" % status + ) - def _handle_successful_auth(self, response, connection, address): - self._check_partition_count(response["partition_count"]) + connection.close("Failed to authenticate connection", err) + raise err - server_version_str = response["server_hazelcast_version"] - remote_address = response["address"] - remote_uuid = response["member_uuid"] + def _handle_successful_auth(self, response, connection): + with self._lock: + self._check_partition_count(response["partition_count"]) - connection.remote_address = remote_address - connection.server_version = calculate_version(server_version_str) - connection.remote_uuid = remote_uuid + server_version_str = response["server_hazelcast_version"] + remote_address = response["address"] + remote_uuid = response["member_uuid"] - new_cluster_id = response["cluster_id"] + connection.remote_address = remote_address + connection.server_version = calculate_version(server_version_str) + connection.remote_uuid = remote_uuid - is_initial_connection = not self.active_connections - changed_cluster = ( - is_initial_connection - and self._cluster_id is not None - and self._cluster_id != new_cluster_id - ) - if changed_cluster: - _logger.warning( - "Switching from current cluster: %s to new cluster: %s", - self._cluster_id, - new_cluster_id, - ) - self._on_cluster_restart() + existing = self.active_connections.get(remote_uuid, None) + if existing: + connection.close( + "Duplicate connection to same member with UUID: %s" % remote_uuid, None + ) + return existing + + new_cluster_id = response["cluster_id"] + changed_cluster = self._cluster_id is not None and self._cluster_id != new_cluster_id + if changed_cluster: + self._check_client_state_on_cluster_change(connection) + _logger.warning( + "Switching from current cluster: %s to new cluster: %s", + self._cluster_id, + new_cluster_id, + ) + self._on_cluster_restart() - with self._lock: + is_initial_connection = not self.active_connections self.active_connections[remote_uuid] = connection - self._addresses_to_connections[remote_address] = connection - self._pending_connections.pop(address, None) + if is_initial_connection: + self._cluster_id = new_cluster_id if is_initial_connection: - self._cluster_id = new_cluster_id self._lifecycle_service.fire_lifecycle_event(LifecycleState.CONNECTED) _logger.info( @@ -547,10 +561,21 @@ def _handle_successful_auth(self, response, connection, address): _logger.exception("Exception in connection listener") if not 
-            self.on_connection_close(connection, None)
+            self.on_connection_close(connection)
 
         return connection
 
+    def _check_client_state_on_cluster_change(self, connection):
+        if self.active_connections:
+            # If there are other connections, we must be connected to the wrong cluster.
+            # We should not stay connected to this new connection.
+            # Note that, in some racy scenarios, we might close a connection that
+            # we can operate on. In those scenarios, we rely on the fact that we will
+            # reopen the connections.
+            reason = "Connection does not belong to the cluster %s" % self._cluster_id
+            connection.close(reason, None)
+            raise ValueError(reason)
+
     def _on_cluster_restart(self):
         self._near_cache_manager.clear_near_caches()
         self._cluster_service.clear_member_list_version()
@@ -569,26 +594,20 @@ def _check_client_active(self):
             raise HazelcastClientNotActiveError()
 
     def _get_possible_addresses(self):
-        member_addresses = list(
-            map(lambda m: (m.address, None), self._cluster_service.get_members())
-        )
-
-        if self._shuffle_member_list:
-            random.shuffle(member_addresses)
-
-        addresses = OrderedDict(member_addresses)
         primaries, secondaries = self._address_provider.load_addresses()
         if self._shuffle_member_list:
+            # The relative order between primary and secondary addresses should
+            # not be changed. So we shuffle the lists separately and then add
+            # them to the final list so that secondary addresses are not tried
+            # before all primary addresses have been tried. Otherwise we can get
+            # startup delays
             random.shuffle(primaries)
             random.shuffle(secondaries)
 
-        for address in primaries:
-            addresses[address] = None
-
-        for address in secondaries:
-            addresses[address] = None
-
-        return six.iterkeys(addresses)
+        addresses = []
+        addresses.extend(primaries)
+        addresses.extend(secondaries)
+        return addresses
 
 
 class _HeartbeatManager(object):
@@ -770,7 +789,7 @@ def close(self, reason, cause):
                 self._inner_close()
             except:
                 _logger.exception("Error while closing the the connection %s", self)
-            self._connection_manager.on_connection_close(self, cause)
+            self._connection_manager.on_connection_close(self)
 
     def _log_close(self, reason, cause):
         msg = "%s closed. Reason: %s"
Reason: %s" diff --git a/hazelcast/invocation.py b/hazelcast/invocation.py index 68a71b9019..9d140ab14b 100644 --- a/hazelcast/invocation.py +++ b/hazelcast/invocation.py @@ -115,6 +115,8 @@ def init(self, partition_service, connection_manager, listener_service): def start(self): self._start_clean_resources_timer() + + def add_backup_listener(self): if self._backup_ack_to_client_enabled: self._register_backup_listener() diff --git a/hazelcast/listener.py b/hazelcast/listener.py index ffea93e319..9f266a7a1b 100644 --- a/hazelcast/listener.py +++ b/hazelcast/listener.py @@ -176,7 +176,7 @@ def _connection_added(self, connection): for user_reg_id, listener_registration in six.iteritems(self._active_registrations): self._register_on_connection(user_reg_id, listener_registration, connection) - def _connection_removed(self, connection, _): + def _connection_removed(self, connection): with self._registration_lock: for listener_registration in six.itervalues(self._active_registrations): event_registration = listener_registration.connection_registrations.pop( @@ -203,7 +203,7 @@ def start(self): def _connection_added(self, connection): self._try_register(connection) - def _connection_removed(self, connection, _): + def _connection_removed(self, connection): self._try_register_to_random_connection(connection) def _try_register_to_random_connection(self, old_connection): diff --git a/hazelcast/reactor.py b/hazelcast/reactor.py index 42c47cc6e7..96dd0905fe 100644 --- a/hazelcast/reactor.py +++ b/hazelcast/reactor.py @@ -486,8 +486,8 @@ def handle_error(self): # We handle retryable error codes inside the # handle_read/write. Anything else should be fatal. error = sys.exc_info()[1] - _logger.exception("Received error") - self.close(None, error) + _logger.debug("Received error", exc_info=True) + self.close(str(error), None) def readable(self): return self.live and self.sent_protocol_bytes diff --git a/tests/integration/backward_compatible/heartbeat_test.py b/tests/integration/backward_compatible/heartbeat_test.py index 6551cfafce..5e5da38a57 100644 --- a/tests/integration/backward_compatible/heartbeat_test.py +++ b/tests/integration/backward_compatible/heartbeat_test.py @@ -64,5 +64,7 @@ def assert_heartbeat_stopped_and_restored(): @staticmethod def simulate_heartbeat_lost(client, address, timeout): - connection = client._connection_manager.get_connection_from_address(address) - connection.last_read_time -= timeout + for connection in client._connection_manager.active_connections.values(): + if connection.remote_address == address: + connection.last_read_time -= timeout + break diff --git a/tests/unit/invocation_test.py b/tests/unit/invocation_test.py index 8863367505..75e88be4e8 100644 --- a/tests/unit/invocation_test.py +++ b/tests/unit/invocation_test.py @@ -156,4 +156,5 @@ def _start_service(self, config=_Config()): c._internal_partition_service, c._connection_manager, c._listener_service ) invocation_service.start() + invocation_service.add_backup_listener() return c, invocation_service