Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion hazelcast/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,8 @@ def _get_or_connect(self, address):
translated, self._client.config,
self._invocation_service.handle_client_message)
except IOError:
return ImmediateExceptionFuture(sys.exc_info()[1], sys.exc_info()[2])
error = sys.exc_info()
return ImmediateExceptionFuture(error[1], error[2])

future = self._authenticate(connection).continue_with(self._on_auth, connection, address)
self._pending_connections[address] = future
Expand Down
10 changes: 3 additions & 7 deletions hazelcast/discovery.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
import json
import logging
import ssl

from hazelcast.errors import HazelcastCertificationError
from hazelcast.core import AddressHelper
from hazelcast.six.moves import http_client

try:
import ssl
except ImportError:
ssl = None

_logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -93,8 +89,8 @@ def discover_nodes(self):
context=self._ctx)
https_connection.request(method="GET", url=self._url, headers={"Accept-Charset": "UTF-8"})
https_response = https_connection.getresponse()
except ssl.SSLError as ex:
raise HazelcastCertificationError(str(ex))
except ssl.SSLError as err:
raise HazelcastCertificationError(str(err))
self._check_error(https_response)
return self._parse_response(https_response)

Expand Down
208 changes: 125 additions & 83 deletions hazelcast/reactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import os
import select
import socket
import ssl
import sys
import threading
import time
Expand All @@ -13,18 +14,12 @@
from functools import total_ordering
from heapq import heappush, heappop

from hazelcast import six
from hazelcast.config import SSLProtocol
from hazelcast.connection import Connection
from hazelcast.core import Address
from hazelcast.errors import HazelcastError
from hazelcast.future import Future

try:
import ssl
except ImportError:
ssl = None

try:
import fcntl
except ImportError:
Expand All @@ -38,6 +33,23 @@

_logger = logging.getLogger(__name__)

# We should retry receiving/sending the message in case of these errors
# EAGAIN: Resource temporarily unavailable
# EWOULDBLOCK: The read/write would block
# EDEADLK: Was added before, retrying it just to make sure that
# client behaves the same on some edge cases.
# SSL_ERROR_WANT_READ/WRITE: The socket could not satisfy the
# needs of the SSL_read/write. During the negotiation process
# SSL_read/write may also want to write/read data, hence may also
# raise SSL_ERROR_WANT_WRITE/READ.
_RETRYABLE_ERROR_CODES = (
errno.EAGAIN,
errno.EWOULDBLOCK,
errno.EDEADLK,
ssl.SSL_ERROR_WANT_WRITE,
ssl.SSL_ERROR_WANT_READ
)


def _set_nonblocking(fd):
if not fcntl:
Expand Down Expand Up @@ -354,6 +366,7 @@ class AsyncoreConnection(Connection, asyncore.dispatcher):
sent_protocol_bytes = False
receive_buffer_size = _BUFFER_SIZE
send_buffer_size = _BUFFER_SIZE
_close_timer = None

def __init__(self, reactor, connection_manager, connection_id, address,
config, message_callback):
Expand All @@ -364,87 +377,44 @@ def __init__(self, reactor, connection_manager, connection_id, address,
self.connected_address = address
self._write_queue = deque()
self._write_buf = io.BytesIO()
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)

timeout = config.connection_timeout
if not timeout:
timeout = six.MAXSIZE

self.socket.settimeout(timeout)

# set tcp no delay
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
# set socket buffer
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, _BUFFER_SIZE)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, _BUFFER_SIZE)

for level, option_name, value in config.socket_options:
if option_name is socket.SO_RCVBUF:
self.receive_buffer_size = value
elif option_name is socket.SO_SNDBUF:
self.send_buffer_size = value

self.socket.setsockopt(level, option_name, value)
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
# set the socket timeout to 0 explicitly
self.socket.settimeout(0)
self._set_socket_options(config)
if config.ssl_enabled:
self._wrap_as_ssl_socket(config)

self.connect((address.host, address.port))

if ssl and config.ssl_enabled:
ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)

protocol = config.ssl_protocol

# Use only the configured protocol
try:
if protocol != SSLProtocol.SSLv2:
ssl_context.options |= ssl.OP_NO_SSLv2
if protocol != SSLProtocol.SSLv3:
ssl_context.options |= ssl.OP_NO_SSLv3
if protocol != SSLProtocol.TLSv1:
ssl_context.options |= ssl.OP_NO_TLSv1
if protocol != SSLProtocol.TLSv1_1:
ssl_context.options |= ssl.OP_NO_TLSv1_1
if protocol != SSLProtocol.TLSv1_2:
ssl_context.options |= ssl.OP_NO_TLSv1_2
if protocol != SSLProtocol.TLSv1_3:
ssl_context.options |= ssl.OP_NO_TLSv1_3
except AttributeError:
pass

ssl_context.verify_mode = ssl.CERT_REQUIRED

if config.ssl_cafile:
ssl_context.load_verify_locations(config.ssl_cafile)
else:
ssl_context.load_default_certs()

if config.ssl_certfile:
ssl_context.load_cert_chain(config.ssl_certfile, config.ssl_keyfile, config.ssl_password)

if config.ssl_ciphers:
ssl_context.set_ciphers(config.ssl_ciphers)

self.socket = ssl_context.wrap_socket(self.socket)

# the socket should be non-blocking from now on
self.socket.settimeout(0)
timeout = config.connection_timeout
if timeout > 0:
self._close_timer = reactor.add_timer(timeout, self._close_timer_cb)

self.local_address = Address(*self.socket.getsockname())

self._write_queue.append(b"CP2")

def handle_connect(self):
if self._close_timer:
self._close_timer.cancel()

self.start_time = time.time()
_logger.debug("Connected to %s", self.connected_address)

def handle_read(self):
reader = self._reader
receive_buffer_size = self.receive_buffer_size
while True:
data = self.recv(receive_buffer_size)
reader.read(data)
self.last_read_time = time.time()
if len(data) < receive_buffer_size:
break
try:
while True:
data = self.recv(receive_buffer_size)
reader.read(data)
self.last_read_time = time.time()
if len(data) < receive_buffer_size:
break
except socket.error as err:
if err.args[0] not in _RETRYABLE_ERROR_CODES:
# Other error codes are fatal, should close the connection
self.close(None, err)

if reader.length:
reader.process()
Expand Down Expand Up @@ -476,25 +446,34 @@ def handle_write(self):
bytes_ = buf.getvalue()
buf.truncate(0)

sent = self.send(bytes_)
self.last_write_time = time.time()
self.sent_protocol_bytes = True
try:
sent = self.send(bytes_)
except socket.error as err:
if err.args[0] in _RETRYABLE_ERROR_CODES:
# Couldn't write the bytes but we should
# retry it.
self._write_queue.appendleft(bytes_)
else:
# Other error codes are fatal, should close the connection
self.close(None, err)
else:
# No exception is thrown during the send
self.last_write_time = time.time()
self.sent_protocol_bytes = True

if sent < len(bytes_):
write_queue.appendleft(bytes_[sent:])
if sent < len(bytes_):
write_queue.appendleft(bytes_[sent:])

def handle_close(self):
_logger.warning("Connection closed by server")
self.close(None, IOError("Connection closed by server"))

def handle_error(self):
# We handle retryable error codes inside the
# handle_read/write. Anything else should be fatal.
error = sys.exc_info()[1]
if sys.exc_info()[0] is socket.error:
if error.errno != errno.EAGAIN and error.errno != errno.EDEADLK:
_logger.exception("Received error")
self.close(None, IOError(error))
else:
_logger.exception("Received unexpected error: %s", error)
_logger.exception("Received error")
self.close(None, error)

def readable(self):
return self.live and self.sent_protocol_bytes
Expand All @@ -507,9 +486,72 @@ def writable(self):
return len(self._write_queue) > 0

def _inner_close(self):
if self._close_timer:
# It might be the case that connection
# is closed before the timer. If we are
# closing via the timer, this call has
# no effects.
self._close_timer.cancel()

asyncore.dispatcher.close(self)
self._write_buf.close()

def _close_timer_cb(self):
if not self.connected:
self.close(None, IOError("Connection timed out"))

def _set_socket_options(self, config):
# set tcp no delay
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
# set socket buffer
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, _BUFFER_SIZE)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, _BUFFER_SIZE)

for level, option_name, value in config.socket_options:
if option_name is socket.SO_RCVBUF:
self.receive_buffer_size = value
elif option_name is socket.SO_SNDBUF:
self.send_buffer_size = value

self.socket.setsockopt(level, option_name, value)

def _wrap_as_ssl_socket(self, config):
ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)

protocol = config.ssl_protocol

# Use only the configured protocol
try:
if protocol != SSLProtocol.SSLv2:
ssl_context.options |= ssl.OP_NO_SSLv2
if protocol != SSLProtocol.SSLv3:
ssl_context.options |= ssl.OP_NO_SSLv3
if protocol != SSLProtocol.TLSv1:
ssl_context.options |= ssl.OP_NO_TLSv1
if protocol != SSLProtocol.TLSv1_1:
ssl_context.options |= ssl.OP_NO_TLSv1_1
if protocol != SSLProtocol.TLSv1_2:
ssl_context.options |= ssl.OP_NO_TLSv1_2
if protocol != SSLProtocol.TLSv1_3:
ssl_context.options |= ssl.OP_NO_TLSv1_3
except AttributeError:
pass

ssl_context.verify_mode = ssl.CERT_REQUIRED

if config.ssl_cafile:
ssl_context.load_verify_locations(config.ssl_cafile)
else:
ssl_context.load_default_certs()

if config.ssl_certfile:
ssl_context.load_cert_chain(config.ssl_certfile, config.ssl_keyfile, config.ssl_password)

if config.ssl_ciphers:
ssl_context.set_ciphers(config.ssl_ciphers)

self.socket = ssl_context.wrap_socket(self.socket)

def __repr__(self):
return "Connection(id=%s, live=%s, remote_address=%s)" % (self._id, self.live, self.remote_address)

Expand Down
13 changes: 13 additions & 0 deletions tests/reactor_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from hazelcast import six
from hazelcast.config import _Config
from hazelcast.core import Address
from hazelcast.reactor import AsyncoreReactor, _WakeableLoop, _SocketedWaker, _PipedWaker, _BasicLoop, \
AsyncoreConnection
from hazelcast.util import AtomicInteger
Expand Down Expand Up @@ -319,3 +320,15 @@ def test_send_buffer_size(self):
self.assertEqual(size, conn.send_buffer_size)
finally:
conn._inner_close()

def test_constructor_with_unreachable_addresses(self):
addr = Address("192.168.0.1", 5701)
config = _Config()
start = time.time()
conn = AsyncoreConnection(MagicMock(map=dict()), MagicMock(), None, addr, config, None)
try:
# Server is unreachable, but this call should return
# before connection timeout
self.assertLess(time.time() - start, config.connection_timeout)
finally:
conn.close(None, None)
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.