From 177f2bb5b06a7da5acdea5e2ed2bcfb6cf16cb10 Mon Sep 17 00:00:00 2001 From: mdumandag Date: Wed, 10 Mar 2021 15:49:36 +0300 Subject: [PATCH] Fix authentication mechanism As of now, during the authentication process, we, more or less, do the following: - Send an authentication request - Put a future that will be resolved after the authentication request completes to the pending connections that performs client-side authentication logic (ConnectionManager#on_auth) - Return the future The problem is that, through unlucky timing, we might perform the authentication logic before putting the future to the pending connections map. Such as, - Clients initiates get_or_connect in the main thread during startup - We send the authentication request in the main thread - Before we return the invocation future, and add a callback to it, Python schedules the reactor thread, we write the request to wire, and get back the response. - Python schedules the main thread again, it is now trying to add a callback to a completed future. Since it is completed, it executes the authentication logic immediately. One of the steps of the client side authentication logic is to remove the connection from the pending connections map. Since the future is not put into the map yet, nothing is removed. - We then add this future to the pending connections map, but since it is already completed, no one is going to remove it from the pending connections map. - During the shutdown, we see an unexpected entry in the pending connections map and fail. To solve this, we put a future directly into the pending connections and perform the logic afterward. --- hazelcast/connection.py | 35 +++++++++++++++++++++++------------ hazelcast/future.py | 2 +- 2 files changed, 24 insertions(+), 13 deletions(-) diff --git a/hazelcast/connection.py b/hazelcast/connection.py index 841d9db9f9..2156a2064a 100644 --- a/hazelcast/connection.py +++ b/hazelcast/connection.py @@ -20,7 +20,7 @@ IllegalStateError, ClientOfflineError, ) -from hazelcast.future import ImmediateFuture, ImmediateExceptionFuture +from hazelcast.future import ImmediateFuture, ImmediateExceptionFuture, Future from hazelcast.invocation import Invocation from hazelcast.lifecycle import LifecycleState from hazelcast.protocol.client_message import ( @@ -438,10 +438,23 @@ def _get_or_connect(self, address): error = sys.exc_info() return ImmediateExceptionFuture(error[1], error[2]) - future = self._authenticate(connection).continue_with( - self._on_auth, connection, address - ) + future = Future() self._pending_connections[address] = future + + def cb(response): + self._on_auth(response, connection, address, future) + + try: + self._authenticate(connection).add_done_callback(cb) + except: + # We could not send authentication request + # successfully, clean up the request from the + # pending connections and set the + # result of the future appropriately. + error = sys.exc_info() + self._pending_connections.pop(address, None) + future.set_exception(error[1], error[2]) + return future def _authenticate(self, connection): @@ -466,12 +479,13 @@ def _authenticate(self, connection): self._invocation_service.invoke(invocation) return invocation.future - def _on_auth(self, response, connection, address): - if response.is_success(): + def _on_auth(self, response, connection, address, future): + try: response = client_authentication_codec.decode_response(response.result()) status = response["status"] if status == _AuthenticationStatus.AUTHENTICATED: - return self._handle_successful_auth(response, connection, address) + future.set_result(self._handle_successful_auth(response, connection, address)) + return if status == _AuthenticationStatus.CREDENTIALS_FAILED: err = AuthenticationError( @@ -487,13 +501,10 @@ def _on_auth(self, response, connection, address): "Authentication status code not supported. status: %s" % status ) - connection.close("Failed to authenticate connection", err) - raise err - else: - e = response.exception() # This will set the exception for the pending connection future + connection.close("Failed to authenticate connection", err) + except Exception as e: connection.close("Failed to authenticate connection", e) - six.reraise(e.__class__, e, response.traceback()) def _handle_successful_auth(self, response, connection, address): self._check_partition_count(response["partition_count"]) diff --git a/hazelcast/future.py b/hazelcast/future.py index 740f9343d9..1e079b6658 100644 --- a/hazelcast/future.py +++ b/hazelcast/future.py @@ -38,7 +38,7 @@ def set_exception(self, exception, traceback=None): """Sets the exception for this Future in case of errors. Args: - exception (Exception): Exception to be threw in case of error. + exception (BaseException): Exception to be threw in case of error. traceback (function): Function to be called on traceback. """ if not isinstance(exception, BaseException):