Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 23 additions & 12 deletions hazelcast/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
IllegalStateError,
ClientOfflineError,
)
from hazelcast.future import ImmediateFuture, ImmediateExceptionFuture
from hazelcast.future import ImmediateFuture, ImmediateExceptionFuture, Future
from hazelcast.invocation import Invocation
from hazelcast.lifecycle import LifecycleState
from hazelcast.protocol.client_message import (
Expand Down Expand Up @@ -438,10 +438,23 @@ def _get_or_connect(self, address):
error = sys.exc_info()
return ImmediateExceptionFuture(error[1], error[2])

future = self._authenticate(connection).continue_with(
self._on_auth, connection, address
)
future = Future()
self._pending_connections[address] = future

def cb(response):
self._on_auth(response, connection, address, future)

try:
self._authenticate(connection).add_done_callback(cb)
except:
# We could not send authentication request
# successfully, clean up the request from the
# pending connections and set the
# result of the future appropriately.
error = sys.exc_info()
self._pending_connections.pop(address, None)
future.set_exception(error[1], error[2])

return future

def _authenticate(self, connection):
Expand All @@ -466,12 +479,13 @@ def _authenticate(self, connection):
self._invocation_service.invoke(invocation)
return invocation.future

def _on_auth(self, response, connection, address):
if response.is_success():
def _on_auth(self, response, connection, address, future):
try:
response = client_authentication_codec.decode_response(response.result())
status = response["status"]
if status == _AuthenticationStatus.AUTHENTICATED:
return self._handle_successful_auth(response, connection, address)
future.set_result(self._handle_successful_auth(response, connection, address))
return

if status == _AuthenticationStatus.CREDENTIALS_FAILED:
err = AuthenticationError(
Expand All @@ -487,13 +501,10 @@ def _on_auth(self, response, connection, address):
"Authentication status code not supported. status: %s" % status
)

connection.close("Failed to authenticate connection", err)
raise err
else:
e = response.exception()
# This will set the exception for the pending connection future
connection.close("Failed to authenticate connection", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this close calls cause self._authenticate(connection) call to raise exception. Is this right?

except Exception as e:
connection.close("Failed to authenticate connection", e)
six.reraise(e.__class__, e, response.traceback())

def _handle_successful_auth(self, response, connection, address):
self._check_partition_count(response["partition_count"])
Expand Down
2 changes: 1 addition & 1 deletion hazelcast/future.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def set_exception(self, exception, traceback=None):
"""Sets the exception for this Future in case of errors.

Args:
exception (Exception): Exception to be threw in case of error.
exception (BaseException): Exception to be threw in case of error.
traceback (function): Function to be called on traceback.
"""
if not isinstance(exception, BaseException):
Expand Down