diff --git a/hazelcast/reactor.py b/hazelcast/reactor.py index 04189d3484..05f7c35536 100644 --- a/hazelcast/reactor.py +++ b/hazelcast/reactor.py @@ -351,7 +351,7 @@ def connection_factory(self, connection_manager, connection_id, address, network class AsyncoreConnection(Connection, asyncore.dispatcher): sent_protocol_bytes = False - read_buffer_size = _BUFFER_SIZE + receive_buffer_size = _BUFFER_SIZE def __init__(self, reactor, connection_manager, connection_id, address, config, message_callback): @@ -375,11 +375,11 @@ def __init__(self, reactor, connection_manager, connection_id, address, self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, _BUFFER_SIZE) self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, _BUFFER_SIZE) - for socket_option in config.socket_options: - if socket_option.option is socket.SO_RCVBUF: - self.read_buffer_size = socket_option.value + for level, option_name, value in config.socket_options: + if option_name is socket.SO_RCVBUF: + self.receive_buffer_size = value - self.socket.setsockopt(socket_option.level, socket_option.option, socket_option.value) + self.socket.setsockopt(level, option_name, value) self.connect((address.host, address.port)) @@ -433,11 +433,12 @@ def handle_connect(self): def handle_read(self): reader = self._reader + receive_buffer_size = self.receive_buffer_size while True: - data = self.recv(self.read_buffer_size) + data = self.recv(receive_buffer_size) reader.read(data) self.last_read_time = time.time() - if len(data) < self.read_buffer_size: + if len(data) < receive_buffer_size: break if reader.length: diff --git a/tests/reactor_test.py b/tests/reactor_test.py index f9ceebb17f..38ec37df2a 100644 --- a/tests/reactor_test.py +++ b/tests/reactor_test.py @@ -8,7 +8,9 @@ from parameterized import parameterized from hazelcast import six -from hazelcast.reactor import AsyncoreReactor, _WakeableLoop, _SocketedWaker, _PipedWaker, _BasicLoop +from hazelcast.config import _Config +from hazelcast.reactor import AsyncoreReactor, _WakeableLoop, _SocketedWaker, _PipedWaker, _BasicLoop, \ + AsyncoreConnection from hazelcast.util import AtomicInteger from tests.base import HazelcastTestCase @@ -259,3 +261,45 @@ def test_close(self): with self.assertRaises(OSError): os.read(r_fd, 1) + + +class AsyncoreConnectionTest(HazelcastTestCase): + @classmethod + def setUpClass(cls): + cls.rc = cls.create_rc() + cls.cluster = cls.create_cluster(cls.rc) + cls.member = cls.cluster.start_member() + + @classmethod + def tearDownClass(cls): + cls.rc.terminateCluster(cls.cluster.id) + cls.rc.exit() + + def test_socket_options(self): + config = _Config() + config.socket_options = [ + (socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + ] + conn = AsyncoreConnection(MagicMock(map=dict()), None, None, self.member.address, config, None) + + try: + # By default this is set to 0 + self.assertEqual(1, conn.socket.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)) + finally: + conn._inner_close() + + def test_receive_buffer_size(self): + # When the SO_RCVBUF option is set, we should try + # to use that value while trying to read something. + config = _Config() + size = 64 * 1024 + config.socket_options = [ + (socket.SOL_SOCKET, socket.SO_RCVBUF, size) + ] + conn = AsyncoreConnection(MagicMock(map=dict()), None, None, self.member.address, config, None) + + try: + # By default this is set to 128000 + self.assertEqual(size, conn.receive_buffer_size) + finally: + conn._inner_close()