From a0fcff904671beb8513ed1e4561a7c63f0d004c2 Mon Sep 17 00:00:00 2001 From: mdumandag Date: Wed, 4 Nov 2020 14:58:36 +0300 Subject: [PATCH 01/15] Make writes non-blocking Formerly, writes were checking the size of the write queue, and if there are no items in it, were writing directly to the socket. That is against the nature of the non-blocking API we provide. Also, it restains us from optimizing the writes for non-blocking usage (multiple client messages can be concatenated into single buffer before writing to the socket to decrease the number of send calls). To solve that problem, connections are adding buffers into the write queue. Reactor thread pops elements from that queue and writes into the socket. (The optimization I mentioned above can be implemented later). Of course, this approach will kill the performance of blocking usage since the reactor thread will be in sleep most of the time on asyncore.loop call. We need to wake the reactor thread each time we want to write something if it is not awake. To do this, a pipe is added to the reactor's dispatchers map. Each time we want to write something, we check whether or not it is awake and if not, we write a single byte into the pipe. That wakes the reactor thread and makes it process the message we added to its write queue. The pipe works great on UNIX, but pipes are not selectable or pollable on Windows. We fall back into sockets in case the client is running on Windows. On the reactor, we try to create the loop, check if the added waker (pipe or socket) works or not. If it does, we use the wakable loop. If not, we fall back to the loop that does busy waiting. --- hazelcast/reactor.py | 286 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 241 insertions(+), 45 deletions(-) diff --git a/hazelcast/reactor.py b/hazelcast/reactor.py index b2faf652c7..61eb989058 100644 --- a/hazelcast/reactor.py +++ b/hazelcast/reactor.py @@ -1,6 +1,7 @@ import asyncore import errno import logging +import os import select import socket import sys @@ -23,17 +24,128 @@ except ImportError: ssl = None +try: + import fcntl +except ImportError: + fcntl = None + _logger = logging.getLogger(__name__) -class AsyncoreReactor(object): - _thread = None - _is_live = False +def _set_nonblocking(fd): + if not fcntl: + return - def __init__(self): + flags = fcntl.fcntl(fd, fcntl.F_GETFL) + fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) + + +class _FileWrapper(object): + def __init__(self, fd): + self._fd = fd + + def fileno(self): + return self._fd + + def close(self): + os.close(self._fd) + + def getsockopt(self, level, optname, buflen=None): + if level == socket.SOL_SOCKET and optname == socket.SO_ERROR and not buflen: + return 0 + raise NotImplementedError("Only asyncore specific behaviour is implemented.") + + +class _AbstractWaker(asyncore.dispatcher): + def __init__(self, map): + asyncore.dispatcher.__init__(self, map=map) + self.awake = False + + def writable(self): + return False + + def wake(self): + raise NotImplementedError("wake") + + +class _PipedWaker(_AbstractWaker): + def __init__(self, map): + _AbstractWaker.__init__(self, map) + self._read_fd, self._write_fd = os.pipe() + self.set_socket(_FileWrapper(self._read_fd)) + _set_nonblocking(self._read_fd) + _set_nonblocking(self._write_fd) + + def wake(self): + if not self.awake: + self.awake = True + try: + os.write(self._write_fd, b"x") + except (IOError, ValueError): + pass + + def handle_read(self): + try: + while len(os.read(self._read_fd, 4096)) == 4096: + pass + except IOError: + pass + self.awake = False + + def close(self): + _AbstractWaker.close(self) + os.close(self._write_fd) + + +class _SocketedWaker(_AbstractWaker): + def __init__(self, map): + _AbstractWaker.__init__(self, map) + self._writer = socket.socket() + self._writer.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + + a = socket.socket() + a.bind(("127.0.0.1", 0)) + a.listen(1) + addr = a.getsockname() + + try: + self._writer.connect(addr) + self._reader, _ = a.accept() + finally: + a.close() + + self.set_socket(self._reader) + self._writer.settimeout(0) + self._reader.settimeout(0) + + def wake(self): + if not self.awake: + self.awake = True + try: + self._writer.send(b"x") + except (IOError, socket.error, ValueError): + pass + + def handle_read(self): + try: + while len(self._reader.recv(4096)) == 4096: + pass + except (IOError, socket.error): + pass + self.awake = False + + def close(self): + _AbstractWaker.close(self) + self._writer.close() + + +class _AbstractLoop(object): + def __init__(self, map): + self._map = map self._timers = [] # Accessed only from the reactor thread self._new_timers = deque() # Popped only from the reactor thread - self._map = {} + self._is_live = False + self._thread = None def start(self): self._is_live = True @@ -46,9 +158,9 @@ def _loop(self): Future._threading_locals.is_reactor_thread = True while self._is_live: try: - asyncore.loop(count=1, timeout=0.01, map=self._map) + self.run_loop() self._check_timers() - except select.error as err: + except select.error: # TODO: parse error type to catch only error "9" _logger.warning("Connection closed by server") pass @@ -59,6 +171,11 @@ def _loop(self): _logger.debug("Reactor Thread exited") self._cleanup_all_timers() + def add_timer(self, delay, callback): + timer = Timer(delay + time.time(), callback) + self._new_timers.append((timer.end, timer)) + return timer + def _check_timers(self): timers = self._timers @@ -83,13 +200,53 @@ def _check_timers(self): # timers in the heap. return - def add_timer_absolute(self, timeout, callback): - timer = Timer(timeout, callback) - self._new_timers.append((timer.end, timer)) - return timer + def _cleanup_all_timers(self): + timers = self._timers + new_timers = self._new_timers + + while timers: + _, timer = timers.pop() + timer.timer_ended_cb() + + # Although it is not the case with the current code base, + # the timers ended above may add new timers. So, the order + # is important. + while new_timers: + _, timer = new_timers.popleft() + timer.timer_ended_cb() + + def check_loop(self): + raise NotImplementedError("check_loop") + + def run_loop(self): + raise NotImplementedError("run_loop") + + def wake_loop(self): + raise NotImplementedError("wake_loop") + + def shutdown(self): + raise NotImplementedError("shutdown") - def add_timer(self, delay, callback): - return self.add_timer_absolute(delay + time.time(), callback) + +class _WakeableLoop(_AbstractLoop): + _waker_class = _PipedWaker if os.name != 'nt' else _SocketedWaker + + def __init__(self, map): + _AbstractLoop.__init__(self, map) + self.waker = _WakeableLoop._waker_class(map) + + def check_loop(self): + assert not self.waker.awake + self.wake_loop() + assert self.waker.awake + self.run_loop() + assert not self.waker.awake + + def run_loop(self): + asyncore.loop(timeout=0.1, use_poll=True, map=self._map, count=1) + + def wake_loop(self): + self.waker.wake() def shutdown(self): if not self._is_live: @@ -101,6 +258,9 @@ def shutdown(self): self._thread.join() for connection in list(self._map.values()): + if connection is self.waker: + continue + try: connection.close(None, HazelcastError("Client is shutting down")) except OSError as connection: @@ -108,26 +268,71 @@ def shutdown(self): pass else: raise + + self.waker.close() self._map.clear() - def connection_factory(self, connection_manager, connection_id, address, network_config, message_callback): - return AsyncoreConnection(self._map, connection_manager, connection_id, - address, network_config, message_callback) - def _cleanup_all_timers(self): - timers = self._timers - new_timers = self._new_timers +class _BusyWaitLoop(_AbstractLoop): + def check_loop(self): + pass - while timers: - _, timer = timers.pop() - timer.timer_ended_cb() + def run_loop(self): + if not self._map: + time.sleep(0.005) + asyncore.loop(timeout=0.01, use_poll=True, map=self._map, count=1) - # Although it is not the case with the current code base, - # the timers ended above may add new timers. So, the order - # is important. - while new_timers: - _, timer = new_timers.popleft() - timer.timer_ended_cb() + def wake_loop(self): + pass + + def shutdown(self): + if not self._is_live: + return + + self._is_live = False + + if self._thread is not threading.current_thread(): + self._thread.join() + + for connection in list(self._map.values()): + try: + connection.close(None, HazelcastError("Client is shutting down")) + except OSError as connection: + if connection.args[0] == socket.EBADF: + pass + else: + raise + + self._map.clear() + + +class AsyncoreReactor(object): + def __init__(self): + self.map = {} + loop = None + try: + loop = _WakeableLoop(self.map) + loop.check_loop() + except: + if loop: + loop.shutdown() + loop = _BusyWaitLoop(self.map) + self._loop = loop + + def start(self): + self._loop.start() + + def add_timer(self, delay, callback): + return self._loop.add_timer(delay, callback) + + def wake_loop(self): + self._loop.wake_loop() + + def shutdown(self): + self._loop.shutdown() + + def connection_factory(self, connection_manager, connection_id, address, network_config, message_callback): + return AsyncoreConnection(self, connection_manager, connection_id, address, network_config, message_callback) _BUFFER_SIZE = 128000 @@ -137,13 +342,13 @@ class AsyncoreConnection(Connection, asyncore.dispatcher): sent_protocol_bytes = False read_buffer_size = _BUFFER_SIZE - def __init__(self, dispatcher_map, connection_manager, connection_id, address, + def __init__(self, reactor, connection_manager, connection_id, address, config, message_callback): - asyncore.dispatcher.__init__(self, map=dispatcher_map) + asyncore.dispatcher.__init__(self, map=reactor.map) Connection.__init__(self, connection_manager, connection_id, message_callback) - self.connected_address = address - self._write_lock = threading.Lock() + self._reactor = reactor + self.connected_address = address self._write_queue = deque() self.create_socket(socket.AF_INET, socket.SOCK_STREAM) @@ -228,11 +433,12 @@ def handle_read(self): reader.process() def handle_write(self): - with self._write_lock: + while True: try: data = self._write_queue.popleft() except IndexError: return + sent = self.send(data) self.last_write_time = time.time() self.sent_protocol_bytes = True @@ -256,18 +462,8 @@ def readable(self): return self.live and self.sent_protocol_bytes def _write(self, buf): - # if write queue is empty, send the data right away, otherwise add to queue - if len(self._write_queue) == 0 and self._write_lock.acquire(False): - try: - sent = self.send(buf) - self.last_write_time = time.time() - if sent < len(buf): - _logger.info("Adding to queue") - self._write_queue.appendleft(buf[sent:]) - finally: - self._write_lock.release() - else: - self._write_queue.append(buf) + self._write_queue.append(buf) + self._reactor.wake_loop() def writable(self): return len(self._write_queue) > 0 From c6685c023a9f087f9c79269f33f4834c5eb1f1a8 Mon Sep 17 00:00:00 2001 From: mdumandag Date: Tue, 10 Nov 2020 15:14:40 +0300 Subject: [PATCH 02/15] Return early from the handle_write if no data is written It may be the case that client is disconnected while we are in the loop, an no data is sent. In that case, we were adding the data to the write queue again, and try to pop from it within the same loop. That was causing an infinite loop. An early return is added to solve this problem. When disconnected, dispatcher will be closed, and handle_write won't be called again by the asyncore. --- hazelcast/reactor.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/hazelcast/reactor.py b/hazelcast/reactor.py index 61eb989058..7b0bc01898 100644 --- a/hazelcast/reactor.py +++ b/hazelcast/reactor.py @@ -445,6 +445,9 @@ def handle_write(self): if sent < len(data): self._write_queue.appendleft(data[sent:]) + if sent == 0: + return + def handle_close(self): _logger.warning("Connection closed by server") self.close(None, IOError("Connection closed by server")) From 980a5268809d71efb8a472d4be9c34e687c46e02 Mon Sep 17 00:00:00 2001 From: mdumandag Date: Wed, 11 Nov 2020 15:16:58 +0300 Subject: [PATCH 03/15] add tests --- hazelcast/reactor.py | 12 +- tests/reactor_test.py | 258 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 264 insertions(+), 6 deletions(-) create mode 100644 tests/reactor_test.py diff --git a/hazelcast/reactor.py b/hazelcast/reactor.py index 7b0bc01898..a032a107e7 100644 --- a/hazelcast/reactor.py +++ b/hazelcast/reactor.py @@ -40,7 +40,7 @@ def _set_nonblocking(fd): fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) -class _FileWrapper(object): +class _SocketAdapter(object): def __init__(self, fd): self._fd = fd @@ -72,7 +72,7 @@ class _PipedWaker(_AbstractWaker): def __init__(self, map): _AbstractWaker.__init__(self, map) self._read_fd, self._write_fd = os.pipe() - self.set_socket(_FileWrapper(self._read_fd)) + self.set_socket(_SocketAdapter(self._read_fd)) _set_nonblocking(self._read_fd) _set_nonblocking(self._write_fd) @@ -88,12 +88,12 @@ def handle_read(self): try: while len(os.read(self._read_fd, 4096)) == 4096: pass - except IOError: + except (IOError, OSError): pass self.awake = False def close(self): - _AbstractWaker.close(self) + _AbstractWaker.close(self) # Will close the reader os.close(self._write_fd) @@ -135,7 +135,7 @@ def handle_read(self): self.awake = False def close(self): - _AbstractWaker.close(self) + _AbstractWaker.close(self) # Will close the reader self._writer.close() @@ -233,7 +233,7 @@ class _WakeableLoop(_AbstractLoop): def __init__(self, map): _AbstractLoop.__init__(self, map) - self.waker = _WakeableLoop._waker_class(map) + self.waker = self._waker_class(map) def check_loop(self): assert not self.waker.awake diff --git a/tests/reactor_test.py b/tests/reactor_test.py new file mode 100644 index 0000000000..a3441a9b43 --- /dev/null +++ b/tests/reactor_test.py @@ -0,0 +1,258 @@ +import os +import socket +import threading +import time +from collections import OrderedDict + +from mock import MagicMock +from parameterized import parameterized + +from hazelcast import six +from hazelcast.reactor import AsyncoreReactor, _WakeableLoop, _SocketedWaker, _PipedWaker, _BusyWaitLoop +from hazelcast.util import AtomicInteger +from tests.base import HazelcastTestCase + + +class ReactorTest(HazelcastTestCase): + def test_default_loop_is_wakeable(self): + reactor = AsyncoreReactor() + self.assertIsInstance(reactor._loop, _WakeableLoop) + + def test_reactor_lifetime(self): + t_count = threading.active_count() + reactor = AsyncoreReactor() + reactor.start() + try: + self.assertEqual(t_count + 1, threading.active_count()) # reactor thread + finally: + reactor.shutdown() + self.assertEqual(t_count, threading.active_count()) + + +LOOP_CLASSES = [ + ("wakeable", _WakeableLoop,), + ("busy_wait", _BusyWaitLoop,), +] + + +class LoopTest(HazelcastTestCase): + def test_wakeable_loop_default_waker(self): + loop = _WakeableLoop({}) + try: + if os.name == "nt": + self.assertIsInstance(loop.waker, _SocketedWaker) + else: + self.assertIsInstance(loop.waker, _PipedWaker) + finally: + loop.waker.close() + + def test_wakeable_loop_waker_closes_last(self): + dispatchers = OrderedDict() + loop = _WakeableLoop(dispatchers) # Waker comes first in the dict, + + mock_dispatcher = MagicMock(readable=lambda: False, writeable=lambda: False) + dispatchers[loop.waker._fileno + 1] = mock_dispatcher + + original_close = loop.waker.close + + def assertion(): + mock_dispatcher.close.assert_called() + original_close() + + loop.waker.close = assertion + + loop.shutdown() + + @parameterized.expand(LOOP_CLASSES) + def test_check_loop(self, _, cls): + loop = cls({}) + t_count = threading.active_count() + loop.start() + try: + loop.check_loop() + self.assertEqual(t_count + 1, threading.active_count()) + finally: + loop.shutdown() + self.assertEqual(t_count, threading.active_count()) + + @parameterized.expand(LOOP_CLASSES) + def test_add_timer(self, _, cls): + call_count = AtomicInteger() + + def callback(): + call_count.add(1) + + loop = cls({}) + loop.start() + loop.add_timer(0, callback) # already expired, should be run immediately + + def assertion(): + self.assertEqual(1, call_count.get()) + + try: + self.assertTrueEventually(assertion) + finally: + loop.shutdown() + + @parameterized.expand(LOOP_CLASSES) + def test_timer_cleanup(self, _, cls): + call_count = AtomicInteger() + + def callback(): + call_count.add(1) + + loop = cls({}) + loop.start() + loop.add_timer(float('inf'), callback) # never expired, must be cleaned up + time.sleep(1) + try: + self.assertEqual(0, call_count.get()) + finally: + loop.shutdown() + + def assertion(): + self.assertEqual(1, call_count.get()) + + self.assertTrueEventually(assertion) + + @parameterized.expand(LOOP_CLASSES) + def test_timer_that_adds_another_timer(self, _, cls): + loop = cls({}) + loop.start() + + call_count = AtomicInteger() + + def callback(): + if call_count.get() == 0: + loop.add_timer(0, callback) + call_count.add(1) + + loop.add_timer(float('inf'), callback) + + loop.shutdown() + + def assertion(): + self.assertEqual(2, call_count.get()) # newly added timer must also be cleaned up + + self.assertTrueEventually(assertion) + + @parameterized.expand(LOOP_CLASSES) + def test_timer_that_shutdowns_loop(self, _, cls): + # It may be the case that, we want to shutdown the client(hence, the loop) in timers + loop = cls({}) + loop.start() + + loop.add_timer(0, lambda: loop.shutdown()) + + def assertion(): + self.assertFalse(loop._is_live) + + try: + self.assertTrueEventually(assertion) + finally: + loop.shutdown() # Should be no op + + +class SocketedWakerTest(HazelcastTestCase): + def setUp(self): + self.waker = _SocketedWaker({}) + + def tearDown(self): + try: + self.waker.close() + except: + pass + + def test_wake(self): + waker = self.waker + self.assertFalse(waker.awake) + waker.wake() + self.assertTrue(waker.awake) + self.assertEqual(b"x", waker._reader.recv(1)) + + def test_wake_while_awake(self): + waker = self.waker + waker.wake() + waker.wake() + self.assertTrue(waker.awake) + self.assertEqual(b"x", waker._reader.recv(2)) # only the first one should write + + def test_handle_read(self): + waker = self.waker + waker.wake() + self.assertTrue(waker.awake) + waker.handle_read() + self.assertFalse(waker.awake) + + with self.assertRaises((IOError, socket.error)): # BlockingIOError on Py3, socket.error on Py2 + waker._reader.recv(1) # handle_read should consume the socket, there should be nothing + + def test_close(self): + waker = self.waker + writer = waker._writer + reader = waker._reader + self.assertNotEqual(-1, writer.fileno()) + self.assertNotEqual(-1, reader.fileno()) + + waker.close() + + if six.PY3: + self.assertEqual(-1, writer.fileno()) + self.assertEqual(-1, reader.fileno()) + else: + # Closed sockets raise socket.error with EBADF error code in Python2 + with self.assertRaises(socket.error): + writer.fileno() + + with self.assertRaises(socket.error): + reader.fileno() + + +class PipedWakerTest(HazelcastTestCase): + def setUp(self): + self.waker = _PipedWaker({}) + + def tearDown(self): + try: + self.waker.close() + except: + pass + + def test_wake(self): + waker = self.waker + self.assertFalse(waker.awake) + waker.wake() + self.assertTrue(waker.awake) + self.assertEqual(b"x", os.read(waker._read_fd, 1)) + + def test_wake_while_awake(self): + waker = self.waker + waker.wake() + waker.wake() + self.assertTrue(waker.awake) + self.assertEqual(b"x", os.read(waker._read_fd, 1)) # only the first one should write + + def test_handle_read(self): + waker = self.waker + waker.wake() + self.assertTrue(waker.awake) + waker.handle_read() + self.assertFalse(waker.awake) + + with self.assertRaises((IOError, OSError)): # BlockingIOError on Py3, OSError on Py2 + os.read(waker._read_fd, 1) # handle_read should consume the pipe, there should be nothing + + def test_close(self): + waker = self.waker + w_fd = waker._write_fd + r_fd = waker._read_fd + self.assertEqual(1, os.write(w_fd, b"x")) + self.assertEqual(b"x", os.read(r_fd, 1)) + + waker.close() + + with self.assertRaises(OSError): + os.write(w_fd, b"x") + + with self.assertRaises(OSError): + os.read(r_fd, 1) From 84508388d1bad518988af5b2199e441a941619d4 Mon Sep 17 00:00:00 2001 From: mdumandag Date: Wed, 11 Nov 2020 16:08:54 +0300 Subject: [PATCH 04/15] fix the tests for Windows --- tests/reactor_test.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/reactor_test.py b/tests/reactor_test.py index a3441a9b43..a19db37984 100644 --- a/tests/reactor_test.py +++ b/tests/reactor_test.py @@ -230,7 +230,7 @@ def test_wake_while_awake(self): waker.wake() waker.wake() self.assertTrue(waker.awake) - self.assertEqual(b"x", os.read(waker._read_fd, 1)) # only the first one should write + self.assertEqual(b"x", os.read(waker._read_fd, 2)) # only the first one should write def test_handle_read(self): waker = self.waker @@ -239,6 +239,9 @@ def test_handle_read(self): waker.handle_read() self.assertFalse(waker.awake) + if os.name == "nt": + return # pipes are not non-blocking on Windows, assertion below blocks forever on Windows + with self.assertRaises((IOError, OSError)): # BlockingIOError on Py3, OSError on Py2 os.read(waker._read_fd, 1) # handle_read should consume the pipe, there should be nothing From 2d08eff6af7639602983fcf050d8f232cb4fc7f4 Mon Sep 17 00:00:00 2001 From: mdumandag Date: Wed, 11 Nov 2020 17:51:38 +0300 Subject: [PATCH 05/15] set awake to False before reading --- hazelcast/reactor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hazelcast/reactor.py b/hazelcast/reactor.py index a032a107e7..55b5eddf7f 100644 --- a/hazelcast/reactor.py +++ b/hazelcast/reactor.py @@ -85,12 +85,12 @@ def wake(self): pass def handle_read(self): + self.awake = False try: while len(os.read(self._read_fd, 4096)) == 4096: pass except (IOError, OSError): pass - self.awake = False def close(self): _AbstractWaker.close(self) # Will close the reader @@ -127,12 +127,12 @@ def wake(self): pass def handle_read(self): + self.awake = False try: while len(self._reader.recv(4096)) == 4096: pass except (IOError, socket.error): pass - self.awake = False def close(self): _AbstractWaker.close(self) # Will close the reader From f66ef41523090b8173700e3c7e660f3d6a3a7256 Mon Sep 17 00:00:00 2001 From: mdumandag Date: Wed, 11 Nov 2020 17:51:51 +0300 Subject: [PATCH 06/15] remove unnecessary sleep --- tests/reconnect_test.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/reconnect_test.py b/tests/reconnect_test.py index 08b25031f2..4a73157b0e 100644 --- a/tests/reconnect_test.py +++ b/tests/reconnect_test.py @@ -1,6 +1,5 @@ import time from threading import Thread -from time import sleep from hazelcast.errors import HazelcastError, TargetDisconnectedError from hazelcast.lifecycle import LifecycleState @@ -76,7 +75,6 @@ def test_listener_re_register(self): reg_id = map.add_entry_listener(added_func=collector) self.logger.info("Registered listener with id %s", reg_id) member.shutdown() - sleep(3) self.cluster.start_member() count = AtomicInteger() From 97bbb6ef0c96d74258cc4712d52e0463c2c40d8f Mon Sep 17 00:00:00 2001 From: mdumandag Date: Mon, 16 Nov 2020 17:53:26 +0300 Subject: [PATCH 07/15] synchronize access to connection futures --- hazelcast/connection.py | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/hazelcast/connection.py b/hazelcast/connection.py index 6822fb1598..6cec3583fc 100644 --- a/hazelcast/connection.py +++ b/hazelcast/connection.py @@ -156,16 +156,19 @@ def shutdown(self): self._connect_all_members_timer.cancel() self._heartbeat_manager.shutdown() - for connection_future in six.itervalues(self._pending_connections): - connection_future.set_exception(HazelcastClientNotActiveError("Hazelcast client is shutting down")) - # Need to create copy of connection values to avoid modification errors on runtime - for connection in list(six.itervalues(self.active_connections)): - connection.close("Hazelcast client is shutting down", None) + with self._lock: + for connection_future in six.itervalues(self._pending_connections): + connection_future.set_exception(HazelcastClientNotActiveError("Hazelcast client is shutting down")) - self._connection_listeners = [] - self.active_connections.clear() - self._pending_connections.clear() + # Need to create copy of connection values to avoid modification errors on runtime + for connection in list(six.itervalues(self.active_connections)): + connection.close("Hazelcast client is shutting down", None) + + self.active_connections.clear() + self._pending_connections.clear() + + del self._connection_listeners[:] def connect_to_all_cluster_members(self): if not self._smart_routing_enabled: @@ -395,8 +398,8 @@ def _on_auth(self, response, connection, address): raise err else: e = response.exception() + # This will set the exception for the pending connection future connection.close("Failed to authenticate connection", e) - self._pending_connections.pop(address, None) six.reraise(e.__class__, e, response.traceback()) def _handle_successful_auth(self, response, connection, address): From 57e7f4c792321726fb4a5ec8747022c03c69df09 Mon Sep 17 00:00:00 2001 From: mdumandag Date: Mon, 16 Nov 2020 18:07:21 +0300 Subject: [PATCH 08/15] update hazelcast version --- run-tests.ps1 | 2 +- run-tests.sh | 2 +- start-rc.sh | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/run-tests.ps1 b/run-tests.ps1 index 5b796d6b57..c8cac5380a 100644 --- a/run-tests.ps1 +++ b/run-tests.ps1 @@ -1,4 +1,4 @@ -$serverVersion = "4.1-SNAPSHOT" +$serverVersion = "4.1" $hazelcastTestVersion=$serverVersion $hazelcastEnterpriseTestVersion=$serverVersion diff --git a/run-tests.sh b/run-tests.sh index 244e6f4e85..1ad58b34e6 100644 --- a/run-tests.sh +++ b/run-tests.sh @@ -20,7 +20,7 @@ else USER="" fi -HZ_VERSION="4.1-SNAPSHOT" +HZ_VERSION="4.1" HAZELCAST_TEST_VERSION=${HZ_VERSION} HAZELCAST_ENTERPRISE_TEST_VERSION=${HZ_VERSION} diff --git a/start-rc.sh b/start-rc.sh index 02998f7654..368b0116a6 100644 --- a/start-rc.sh +++ b/start-rc.sh @@ -20,7 +20,7 @@ else USER="" fi -HZ_VERSION="4.1-SNAPSHOT" +HZ_VERSION="4.1" HAZELCAST_TEST_VERSION=${HZ_VERSION} HAZELCAST_ENTERPRISE_TEST_VERSION=${HZ_VERSION} From 76cd191255707dbb4e1b475e77e2ea5e8ffbc755 Mon Sep 17 00:00:00 2001 From: mdumandag Date: Tue, 17 Nov 2020 12:26:43 +0300 Subject: [PATCH 09/15] avoid concurrent modification errors on active_connection dict --- hazelcast/connection.py | 25 +++++++++++++++---------- hazelcast/listener.py | 2 +- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/hazelcast/connection.py b/hazelcast/connection.py index 6cec3583fc..350ec44b7a 100644 --- a/hazelcast/connection.py +++ b/hazelcast/connection.py @@ -75,7 +75,7 @@ def __init__(self, client, reactor, address_provider, lifecycle_service, partition_service, cluster_service, invocation_service, near_cache_manager): self.live = False - self.active_connections = dict() + self.active_connections = dict() # uuid to connection, must be modified under the _lock self.client_uuid = uuid.uuid4() self._client = client @@ -95,7 +95,8 @@ def __init__(self, client, reactor, address_provider, lifecycle_service, self._connect_all_members_timer = None self._async_start = config.async_start self._connect_to_cluster_thread_running = False - self._pending_connections = dict() + self._pending_connections = dict() # must be modified under the _lock + self._active_connections = dict() # address to connection, must be modified under the _lock self._shuffle_member_list = config.shuffle_member_list self._lock = threading.RLock() self._connection_id_generator = AtomicInteger() @@ -118,10 +119,7 @@ def get_connection(self, member_uuid): return self.active_connections.get(member_uuid, None) def get_connection_from_address(self, address): - for connection in six.itervalues(self.active_connections): - if address == connection.remote_address: - return connection - return None + return self._active_connections.get(address, None) def get_random_connection(self): if self._smart_routing_enabled: @@ -131,7 +129,9 @@ def get_random_connection(self): if connection: return connection - for connection in six.itervalues(self.active_connections): + # We should not get to this point under normal circumstances. + # Therefore, copying the list should be OK. + for connection in list(six.itervalues(self.active_connections)): return connection return None @@ -166,6 +166,7 @@ def shutdown(self): connection.close("Hazelcast client is shutting down", None) self.active_connections.clear() + self._active_connections.clear() self._pending_connections.clear() del self._connection_listeners[:] @@ -183,6 +184,7 @@ def connect_to_all_cluster_members(self): def on_connection_close(self, closed_connection, cause): connected_address = closed_connection.connected_address remote_uuid = closed_connection.remote_uuid + remote_address = closed_connection.remote_address if not connected_address: _logger.debug("Destroying %s, but it has no remote address, hence nothing is " @@ -191,6 +193,7 @@ def on_connection_close(self, closed_connection, cause): with self._lock: pending = self._pending_connections.pop(connected_address, None) connection = self.active_connections.pop(remote_uuid, None) + self._active_connections.pop(remote_address, None) if pending: pending.set_exception(cause) @@ -423,7 +426,8 @@ def _handle_successful_auth(self, response, connection, address): self._on_cluster_restart() with self._lock: - self.active_connections[response["member_uuid"]] = connection + self.active_connections[remote_uuid] = connection + self._active_connections[remote_address] = connection self._pending_connections.pop(address, None) if is_initial_connection: @@ -497,11 +501,12 @@ def start(self): """Starts sending periodic HeartBeat operations.""" def _heartbeat(): - if not self._connection_manager.live: + conn_manager = self._connection_manager + if not conn_manager.live: return now = time.time() - for connection in list(self._connection_manager.active_connections.values()): + for connection in list(six.itervalues(conn_manager.active_connections)): self._check_connection(now, connection) self._heartbeat_timer = self._reactor.add_timer(self._heartbeat_interval, _heartbeat) diff --git a/hazelcast/listener.py b/hazelcast/listener.py index 6e10418d5a..7575ff573f 100644 --- a/hazelcast/listener.py +++ b/hazelcast/listener.py @@ -53,7 +53,7 @@ def register_listener(self, registration_request, decode_register_response, enco self._active_registrations[registration_id] = registration futures = [] - for connection in six.itervalues(self._connection_manager.active_connections): + for connection in list(six.itervalues(self._connection_manager.active_connections)): future = self._register_on_connection_async(registration_id, registration, connection) futures.append(future) From f65b564ddd1001d1c752cad0fadac18424d0fdaa Mon Sep 17 00:00:00 2001 From: mdumandag Date: Tue, 17 Nov 2020 18:14:40 +0300 Subject: [PATCH 10/15] address review comments --- hazelcast/connection.py | 10 +++++----- hazelcast/reactor.py | 8 ++++++-- tests/reactor_test.py | 8 ++++---- 3 files changed, 15 insertions(+), 11 deletions(-) diff --git a/hazelcast/connection.py b/hazelcast/connection.py index 350ec44b7a..4f408167bb 100644 --- a/hazelcast/connection.py +++ b/hazelcast/connection.py @@ -96,7 +96,7 @@ def __init__(self, client, reactor, address_provider, lifecycle_service, self._async_start = config.async_start self._connect_to_cluster_thread_running = False self._pending_connections = dict() # must be modified under the _lock - self._active_connections = dict() # address to connection, must be modified under the _lock + self._addresses_to_connections = dict() # address to connection, must be modified under the _lock self._shuffle_member_list = config.shuffle_member_list self._lock = threading.RLock() self._connection_id_generator = AtomicInteger() @@ -119,7 +119,7 @@ def get_connection(self, member_uuid): return self.active_connections.get(member_uuid, None) def get_connection_from_address(self, address): - return self._active_connections.get(address, None) + return self._addresses_to_connections.get(address, None) def get_random_connection(self): if self._smart_routing_enabled: @@ -166,7 +166,7 @@ def shutdown(self): connection.close("Hazelcast client is shutting down", None) self.active_connections.clear() - self._active_connections.clear() + self._addresses_to_connections.clear() self._pending_connections.clear() del self._connection_listeners[:] @@ -193,7 +193,7 @@ def on_connection_close(self, closed_connection, cause): with self._lock: pending = self._pending_connections.pop(connected_address, None) connection = self.active_connections.pop(remote_uuid, None) - self._active_connections.pop(remote_address, None) + self._addresses_to_connections.pop(remote_address, None) if pending: pending.set_exception(cause) @@ -427,7 +427,7 @@ def _handle_successful_auth(self, response, connection, address): with self._lock: self.active_connections[remote_uuid] = connection - self._active_connections[remote_address] = connection + self._addresses_to_connections[remote_address] = connection self._pending_connections.pop(address, None) if is_initial_connection: diff --git a/hazelcast/reactor.py b/hazelcast/reactor.py index 55b5eddf7f..f4557a39db 100644 --- a/hazelcast/reactor.py +++ b/hazelcast/reactor.py @@ -273,7 +273,7 @@ def shutdown(self): self._map.clear() -class _BusyWaitLoop(_AbstractLoop): +class _DormantLoop(_AbstractLoop): def check_loop(self): pass @@ -314,9 +314,13 @@ def __init__(self): loop = _WakeableLoop(self.map) loop.check_loop() except: + _logger.exception("Failed to initialize the wakeable loop. " + "Using the dormant loop instead. " + "When used in the blocking mode, client" + "may have sub-optimal performance.") if loop: loop.shutdown() - loop = _BusyWaitLoop(self.map) + loop = _DormantLoop(self.map) self._loop = loop def start(self): diff --git a/tests/reactor_test.py b/tests/reactor_test.py index a19db37984..e667f5e266 100644 --- a/tests/reactor_test.py +++ b/tests/reactor_test.py @@ -8,7 +8,7 @@ from parameterized import parameterized from hazelcast import six -from hazelcast.reactor import AsyncoreReactor, _WakeableLoop, _SocketedWaker, _PipedWaker, _BusyWaitLoop +from hazelcast.reactor import AsyncoreReactor, _WakeableLoop, _SocketedWaker, _PipedWaker, _DormantLoop from hazelcast.util import AtomicInteger from tests.base import HazelcastTestCase @@ -31,7 +31,7 @@ def test_reactor_lifetime(self): LOOP_CLASSES = [ ("wakeable", _WakeableLoop,), - ("busy_wait", _BusyWaitLoop,), + ("busy_wait", _DormantLoop,), ] @@ -48,7 +48,7 @@ def test_wakeable_loop_default_waker(self): def test_wakeable_loop_waker_closes_last(self): dispatchers = OrderedDict() - loop = _WakeableLoop(dispatchers) # Waker comes first in the dict, + loop = _WakeableLoop(dispatchers) # Waker comes first in the dict mock_dispatcher = MagicMock(readable=lambda: False, writeable=lambda: False) dispatchers[loop.waker._fileno + 1] = mock_dispatcher @@ -137,7 +137,7 @@ def assertion(): self.assertTrueEventually(assertion) @parameterized.expand(LOOP_CLASSES) - def test_timer_that_shutdowns_loop(self, _, cls): + def test_timer_that_shuts_down_loop(self, _, cls): # It may be the case that, we want to shutdown the client(hence, the loop) in timers loop = cls({}) loop.start() From 377bdf96290cc5a562c9b34ee8f776f8e86d1884 Mon Sep 17 00:00:00 2001 From: mdumandag Date: Wed, 18 Nov 2020 10:14:08 +0300 Subject: [PATCH 11/15] address review comments --- hazelcast/reactor.py | 12 +++++------- tests/reactor_test.py | 4 ++-- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/hazelcast/reactor.py b/hazelcast/reactor.py index f4557a39db..a25d3cb73a 100644 --- a/hazelcast/reactor.py +++ b/hazelcast/reactor.py @@ -243,7 +243,7 @@ def check_loop(self): assert not self.waker.awake def run_loop(self): - asyncore.loop(timeout=0.1, use_poll=True, map=self._map, count=1) + asyncore.loop(timeout=0.01, use_poll=True, map=self._map, count=1) def wake_loop(self): self.waker.wake() @@ -273,14 +273,12 @@ def shutdown(self): self._map.clear() -class _DormantLoop(_AbstractLoop): +class _BasicLoop(_AbstractLoop): def check_loop(self): pass def run_loop(self): - if not self._map: - time.sleep(0.005) - asyncore.loop(timeout=0.01, use_poll=True, map=self._map, count=1) + asyncore.loop(timeout=0.001, use_poll=True, map=self._map, count=1) def wake_loop(self): pass @@ -315,12 +313,12 @@ def __init__(self): loop.check_loop() except: _logger.exception("Failed to initialize the wakeable loop. " - "Using the dormant loop instead. " + "Using the basic loop instead. " "When used in the blocking mode, client" "may have sub-optimal performance.") if loop: loop.shutdown() - loop = _DormantLoop(self.map) + loop = _BasicLoop(self.map) self._loop = loop def start(self): diff --git a/tests/reactor_test.py b/tests/reactor_test.py index e667f5e266..ba108f462a 100644 --- a/tests/reactor_test.py +++ b/tests/reactor_test.py @@ -8,7 +8,7 @@ from parameterized import parameterized from hazelcast import six -from hazelcast.reactor import AsyncoreReactor, _WakeableLoop, _SocketedWaker, _PipedWaker, _DormantLoop +from hazelcast.reactor import AsyncoreReactor, _WakeableLoop, _SocketedWaker, _PipedWaker, _BasicLoop from hazelcast.util import AtomicInteger from tests.base import HazelcastTestCase @@ -31,7 +31,7 @@ def test_reactor_lifetime(self): LOOP_CLASSES = [ ("wakeable", _WakeableLoop,), - ("busy_wait", _DormantLoop,), + ("basic", _BasicLoop,), ] From 8f13df542727cebdbb559ab01c72476d89b942c9 Mon Sep 17 00:00:00 2001 From: mdumandag Date: Wed, 18 Nov 2020 10:53:27 +0300 Subject: [PATCH 12/15] wake only if we are not in the reactor thread --- hazelcast/reactor.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hazelcast/reactor.py b/hazelcast/reactor.py index a25d3cb73a..2f42b1ed89 100644 --- a/hazelcast/reactor.py +++ b/hazelcast/reactor.py @@ -246,7 +246,8 @@ def run_loop(self): asyncore.loop(timeout=0.01, use_poll=True, map=self._map, count=1) def wake_loop(self): - self.waker.wake() + if self._thread is not threading.current_thread(): + self.waker.wake() def shutdown(self): if not self._is_live: From 5753ed3e816b79cada5956db8301998e9d975e2e Mon Sep 17 00:00:00 2001 From: mdumandag Date: Wed, 18 Nov 2020 10:54:11 +0300 Subject: [PATCH 13/15] fix the flaky check_loop test --- tests/reactor_test.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/tests/reactor_test.py b/tests/reactor_test.py index ba108f462a..3d5f8d2d53 100644 --- a/tests/reactor_test.py +++ b/tests/reactor_test.py @@ -66,14 +66,7 @@ def assertion(): @parameterized.expand(LOOP_CLASSES) def test_check_loop(self, _, cls): loop = cls({}) - t_count = threading.active_count() - loop.start() - try: - loop.check_loop() - self.assertEqual(t_count + 1, threading.active_count()) - finally: - loop.shutdown() - self.assertEqual(t_count, threading.active_count()) + loop.check_loop() @parameterized.expand(LOOP_CLASSES) def test_add_timer(self, _, cls): From 3c8341d7ffdae164ffa25cb1f1fe724fd2f7b1df Mon Sep 17 00:00:00 2001 From: mdumandag Date: Wed, 18 Nov 2020 11:28:38 +0300 Subject: [PATCH 14/15] use get_ident directly instead of the current_thread --- hazelcast/reactor.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/hazelcast/reactor.py b/hazelcast/reactor.py index 2f42b1ed89..04189d3484 100644 --- a/hazelcast/reactor.py +++ b/hazelcast/reactor.py @@ -29,6 +29,12 @@ except ImportError: fcntl = None +try: + from _thread import get_ident +except ImportError: + # Python2 + from thread import get_ident + _logger = logging.getLogger(__name__) @@ -146,12 +152,14 @@ def __init__(self, map): self._new_timers = deque() # Popped only from the reactor thread self._is_live = False self._thread = None + self._ident = -1 def start(self): self._is_live = True self._thread = threading.Thread(target=self._loop, name="hazelcast-reactor") self._thread.daemon = True self._thread.start() + self._ident = self._thread.ident def _loop(self): _logger.debug("Starting Reactor Thread") @@ -246,7 +254,7 @@ def run_loop(self): asyncore.loop(timeout=0.01, use_poll=True, map=self._map, count=1) def wake_loop(self): - if self._thread is not threading.current_thread(): + if self._ident != get_ident(): self.waker.wake() def shutdown(self): @@ -255,7 +263,7 @@ def shutdown(self): self._is_live = False - if self._thread is not threading.current_thread(): + if self._ident != get_ident(): self._thread.join() for connection in list(self._map.values()): @@ -290,7 +298,7 @@ def shutdown(self): self._is_live = False - if self._thread is not threading.current_thread(): + if self._ident != get_ident(): self._thread.join() for connection in list(self._map.values()): From 43ff8379e521f318cca16e63616e05b8da578c08 Mon Sep 17 00:00:00 2001 From: mdumandag Date: Wed, 18 Nov 2020 11:47:46 +0300 Subject: [PATCH 15/15] clarify the check_loop test --- tests/reactor_test.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tests/reactor_test.py b/tests/reactor_test.py index 3d5f8d2d53..f9ceebb17f 100644 --- a/tests/reactor_test.py +++ b/tests/reactor_test.py @@ -66,6 +66,13 @@ def assertion(): @parameterized.expand(LOOP_CLASSES) def test_check_loop(self, _, cls): loop = cls({}) + # For the WakeableLoop, we are checking that + # the loop can be waken up, and once the reactor + # handles the written bytes, it is not awake + # anymore. Assertions are in the method + # implementation. For, the BasicLoop, this should + # be no-op, just checking it is not raising any + # error. loop.check_loop() @parameterized.expand(LOOP_CLASSES)