From 0c4e97a8b752ff1fe53b29e8d40bc340d8d6cd7c Mon Sep 17 00:00:00 2001 From: mdumandag Date: Tue, 28 Aug 2018 15:28:42 +0300 Subject: [PATCH 1/4] server should not close clients when client only listens --- hazelcast/connection.py | 4 ++- hazelcast/reactor.py | 2 ++ tests/client_test.py | 54 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 59 insertions(+), 1 deletion(-) create mode 100644 tests/client_test.py diff --git a/hazelcast/connection.py b/hazelcast/connection.py index e37f7bb4ef..d28a8760a6 100644 --- a/hazelcast/connection.py +++ b/hazelcast/connection.py @@ -219,13 +219,14 @@ def _heartbeat(self): now = time.time() for connection in list(self._client.connection_manager.connections.values()): time_since_last_read = now - connection.last_read + time_since_last_write = now - connection.last_write if time_since_last_read > self._heartbeat_timeout: if connection.heartbeating: self.logger.warning( "Heartbeat: Did not hear back after %ss from %s" % (time_since_last_read, connection)) self._on_heartbeat_stopped(connection) - if time_since_last_read > self._heartbeat_interval: + if time_since_last_write > self._heartbeat_interval: request = client_ping_codec.encode_request() self._client.invoker.invoke_on_connection(request, connection, ignore_heartbeat=True) else: @@ -264,6 +265,7 @@ def __init__(self, address, connection_closed_callback, message_callback): self._builder = ClientMessageBuilder(message_callback) self._read_buffer = b"" self.last_read = time.time() + self.last_write = 0 def live(self): """ diff --git a/hazelcast/reactor.py b/hazelcast/reactor.py index 3e6e14ee95..d98311c7ea 100644 --- a/hazelcast/reactor.py +++ b/hazelcast/reactor.py @@ -150,6 +150,7 @@ def handle_write(self): except IndexError: return sent = self.send(data) + self.last_write = time.time() self.sent_protocol_bytes = True if sent < len(data): self._write_queue.appendleft(data[sent:]) @@ -175,6 +176,7 @@ def write(self, data): if len(self._write_queue) == 0 and self._write_lock.acquire(False): try: sent = self.send(data) + self.last_write = time.time() if sent < len(data): self.logger.info("adding to queue") self._write_queue.appendleft(data[sent:]) diff --git a/tests/client_test.py b/tests/client_test.py new file mode 100644 index 0000000000..28fe9f79d4 --- /dev/null +++ b/tests/client_test.py @@ -0,0 +1,54 @@ +import time +import logging + +from tests.base import HazelcastTestCase +from hazelcast.config import ClientConfig, PROPERTY_HEARTBEAT_INTERVAL +from hazelcast.client import HazelcastClient +from hazelcast.lifecycle import LIFECYCLE_STATE_DISCONNECTED + + +class ClientTest(HazelcastTestCase): + def test_client_only_listens(self): + rc = self.create_rc() + client_heartbeat_seconds = 8 + + cluster_config = """ + + {} + + """.format(client_heartbeat_seconds) + cluster = self.create_cluster(rc, cluster_config) + member = cluster.start_member() + + client_config = ClientConfig() + client_config._properties[PROPERTY_HEARTBEAT_INTERVAL] = 1000 + + client1 = HazelcastClient(client_config) + is_client_disconnected = [False] + + def lifecycle_listener(event, flag=is_client_disconnected): + if event == LIFECYCLE_STATE_DISCONNECTED: + flag[0] = True + + client1.lifecycle.add_listener(lifecycle_listener) + client2 = HazelcastClient() + + key = "topic-name" + topic = client1.get_topic(key).blocking() + + def message_listener(e): + pass + + topic.add_listener(message_listener) + + client2topic = client2.get_topic(key) + begin = time.time() + + while (time.time() - begin) < 2 * client_heartbeat_seconds: + client2topic.publish("message") + + self.assertFalse(is_client_disconnected[0]) + + rc.exit() From 064290d540d786bdb5538b291c091dbbdb91f6f1 Mon Sep 17 00:00:00 2001 From: mdumandag Date: Wed, 29 Aug 2018 12:57:43 +0300 Subject: [PATCH 2/4] add heartbeat tests --- tests/heartbeat_test.py | 104 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 104 insertions(+) create mode 100644 tests/heartbeat_test.py diff --git a/tests/heartbeat_test.py b/tests/heartbeat_test.py new file mode 100644 index 0000000000..8a061f7582 --- /dev/null +++ b/tests/heartbeat_test.py @@ -0,0 +1,104 @@ +from hazelcast import HazelcastClient +from hazelcast.core import Address +from tests.base import HazelcastTestCase +from hazelcast.config import ClientConfig, PROPERTY_HEARTBEAT_INTERVAL, PROPERTY_HEARTBEAT_TIMEOUT +from tests.util import configure_logging + + +class HeartbeatTest(HazelcastTestCase): + @classmethod + def setUpClass(cls): + configure_logging() + cls.rc = cls.create_rc() + + @classmethod + def tearDownClass(cls): + cls.rc.exit() + + def setUp(self): + self.cluster = self.create_cluster(self.rc) + + def tearDown(self): + self.rc.shutdownCluster(self.cluster.id) + + def test_heartbeat_stopped(self): + member = self.rc.startMember(self.cluster.id) + config = ClientConfig() + + config._properties[PROPERTY_HEARTBEAT_INTERVAL] = 500 + config._properties[PROPERTY_HEARTBEAT_TIMEOUT] = 2000 + + client = HazelcastClient(config) + + client.cluster.add_listener(member_added=lambda m: client.connection_manager.get_or_connect(m.address)) + + def heartbeat_stopped_collector(): + connections = [] + + def connection_collector(c): + connections.append(c) + + connection_collector.connections = connections + return connection_collector + + collector = heartbeat_stopped_collector() + + client.heartbeat.add_listener(on_heartbeat_stopped=collector) + + member2 = self.rc.startMember(self.cluster.id) + self.simulate_heartbeat_lost(client, Address(member2.host, member2.port), 2) + + def assert_connection(): + self.assertTrue(len(collector.connections) > 0) + connection = collector.connections[0] + self.assertEqual(connection._address, (member2.host, member2.port)) + + self.assertTrueEventually(assert_connection) + client.shutdown() + + def test_heartbeat_restored(self): + member = self.rc.startMember(self.cluster.id) + config = ClientConfig() + + config._properties[PROPERTY_HEARTBEAT_INTERVAL] = 500 + config._properties[PROPERTY_HEARTBEAT_TIMEOUT] = 2000 + + client = HazelcastClient(config) + + def member_added_func(m): + + def connection_callback(f): + conn = f.result() + self.simulate_heartbeat_lost(client, Address(conn._address[0], conn._address[1]), 2) + + client.connection_manager.get_or_connect(m.address).add_done_callback(connection_callback) + + client.cluster.add_listener(member_added=member_added_func) + + def heartbeat_restored_collector(): + connections = [] + + def connection_collector(c): + connections.append(c) + + connection_collector.connections = connections + return connection_collector + + collector = heartbeat_restored_collector() + + client.heartbeat.add_listener(on_heartbeat_restored=collector) + + member2 = self.rc.startMember(self.cluster.id) + + def assert_event(): + self.assertTrue(len(collector.connections) > 0) + connection = collector.connections[0] + self.assertEqual(connection._address, (member2.host, member2.port)) + + self.assertTrueEventually(assert_event) + client.shutdown() + + @staticmethod + def simulate_heartbeat_lost(client, address, timeout): + client.connection_manager.connections[address].last_read -= timeout + client.connection_manager.connections[address].last_write += timeout From 33ef0fb574f41bb9bc3b5fc177dbe4a36b503160 Mon Sep 17 00:00:00 2001 From: mdumandag Date: Wed, 5 Sep 2018 10:22:37 +0300 Subject: [PATCH 3/4] fix heartbeat resume condition --- hazelcast/connection.py | 6 ++--- tests/client_test.py | 25 ++++++++++++-------- tests/heartbeat_test.py | 51 +++++++++++++++++------------------------ 3 files changed, 40 insertions(+), 42 deletions(-) diff --git a/hazelcast/connection.py b/hazelcast/connection.py index d28a8760a6..65100498eb 100644 --- a/hazelcast/connection.py +++ b/hazelcast/connection.py @@ -225,13 +225,13 @@ def _heartbeat(self): self.logger.warning( "Heartbeat: Did not hear back after %ss from %s" % (time_since_last_read, connection)) self._on_heartbeat_stopped(connection) + else: + if not connection.heartbeating: + self._on_heartbeat_restored(connection) if time_since_last_write > self._heartbeat_interval: request = client_ping_codec.encode_request() self._client.invoker.invoke_on_connection(request, connection, ignore_heartbeat=True) - else: - if not connection.heartbeating: - self._on_heartbeat_restored(connection) def _on_heartbeat_restored(self, connection): self.logger.info("Heartbeat: Heartbeat restored for connection %s" % connection) diff --git a/tests/client_test.py b/tests/client_test.py index 28fe9f79d4..3cacc1ecda 100644 --- a/tests/client_test.py +++ b/tests/client_test.py @@ -1,5 +1,4 @@ import time -import logging from tests.base import HazelcastTestCase from hazelcast.config import ClientConfig, PROPERTY_HEARTBEAT_INTERVAL @@ -26,17 +25,23 @@ def test_client_only_listens(self): client_config._properties[PROPERTY_HEARTBEAT_INTERVAL] = 1000 client1 = HazelcastClient(client_config) - is_client_disconnected = [False] - def lifecycle_listener(event, flag=is_client_disconnected): - if event == LIFECYCLE_STATE_DISCONNECTED: - flag[0] = True + def lifecycle_event_collector(): + events = [] - client1.lifecycle.add_listener(lifecycle_listener) + def event_collector(e): + if e == LIFECYCLE_STATE_DISCONNECTED: + events.append(e) + + event_collector.events = events + return event_collector + + collector = lifecycle_event_collector() + client1.lifecycle.add_listener(collector) client2 = HazelcastClient() key = "topic-name" - topic = client1.get_topic(key).blocking() + topic = client1.get_topic(key) def message_listener(e): pass @@ -48,7 +53,9 @@ def message_listener(e): while (time.time() - begin) < 2 * client_heartbeat_seconds: client2topic.publish("message") + time.sleep(0.5) - self.assertFalse(is_client_disconnected[0]) - + self.assertEqual(0, len(collector.events)) + client1.shutdown() + client2.shutdown() rc.exit() diff --git a/tests/heartbeat_test.py b/tests/heartbeat_test.py index 8a061f7582..27cd6da721 100644 --- a/tests/heartbeat_test.py +++ b/tests/heartbeat_test.py @@ -17,20 +17,21 @@ def tearDownClass(cls): def setUp(self): self.cluster = self.create_cluster(self.rc) + self.member = self.rc.startMember(self.cluster.id) + self.config = ClientConfig() + + self.config._properties[PROPERTY_HEARTBEAT_INTERVAL] = 500 + self.config._properties[PROPERTY_HEARTBEAT_TIMEOUT] = 2000 + + self.client = HazelcastClient(self.config) def tearDown(self): + self.client.shutdown() self.rc.shutdownCluster(self.cluster.id) def test_heartbeat_stopped(self): - member = self.rc.startMember(self.cluster.id) - config = ClientConfig() - - config._properties[PROPERTY_HEARTBEAT_INTERVAL] = 500 - config._properties[PROPERTY_HEARTBEAT_TIMEOUT] = 2000 - client = HazelcastClient(config) - - client.cluster.add_listener(member_added=lambda m: client.connection_manager.get_or_connect(m.address)) + self.client.cluster.add_listener(member_added=lambda m: self.client.connection_manager.get_or_connect(m.address)) def heartbeat_stopped_collector(): connections = [] @@ -43,37 +44,29 @@ def connection_collector(c): collector = heartbeat_stopped_collector() - client.heartbeat.add_listener(on_heartbeat_stopped=collector) + self.client.heartbeat.add_listener(on_heartbeat_stopped=collector) member2 = self.rc.startMember(self.cluster.id) - self.simulate_heartbeat_lost(client, Address(member2.host, member2.port), 2) + self.simulate_heartbeat_lost(self.client, Address(member2.host, member2.port), 2) - def assert_connection(): - self.assertTrue(len(collector.connections) > 0) + def assert_heartbeat_stopped(): + self.assertEqual(1, len(collector.connections)) connection = collector.connections[0] self.assertEqual(connection._address, (member2.host, member2.port)) - self.assertTrueEventually(assert_connection) - client.shutdown() + self.assertTrueEventually(assert_heartbeat_stopped) def test_heartbeat_restored(self): - member = self.rc.startMember(self.cluster.id) - config = ClientConfig() - - config._properties[PROPERTY_HEARTBEAT_INTERVAL] = 500 - config._properties[PROPERTY_HEARTBEAT_TIMEOUT] = 2000 - - client = HazelcastClient(config) def member_added_func(m): def connection_callback(f): conn = f.result() - self.simulate_heartbeat_lost(client, Address(conn._address[0], conn._address[1]), 2) + self.simulate_heartbeat_lost(self.client, Address(conn._address[0], conn._address[1]), 2) - client.connection_manager.get_or_connect(m.address).add_done_callback(connection_callback) + self.client.connection_manager.get_or_connect(m.address).add_done_callback(connection_callback) - client.cluster.add_listener(member_added=member_added_func) + self.client.cluster.add_listener(member_added=member_added_func) def heartbeat_restored_collector(): connections = [] @@ -86,19 +79,17 @@ def connection_collector(c): collector = heartbeat_restored_collector() - client.heartbeat.add_listener(on_heartbeat_restored=collector) + self.client.heartbeat.add_listener(on_heartbeat_restored=collector) member2 = self.rc.startMember(self.cluster.id) - def assert_event(): - self.assertTrue(len(collector.connections) > 0) + def assert_heartbeat_restored(): + self.assertEqual(1, len(collector.connections)) connection = collector.connections[0] self.assertEqual(connection._address, (member2.host, member2.port)) - self.assertTrueEventually(assert_event) - client.shutdown() + self.assertTrueEventually(assert_heartbeat_restored) @staticmethod def simulate_heartbeat_lost(client, address, timeout): client.connection_manager.connections[address].last_read -= timeout - client.connection_manager.connections[address].last_write += timeout From d00fd8a6b1a3c18e178517796f43ca01cb6c3594 Mon Sep 17 00:00:00 2001 From: mdumandag Date: Wed, 19 Sep 2018 16:13:27 +0300 Subject: [PATCH 4/4] make client_test and heartbeat_test compatible with client properties class --- tests/client_test.py | 4 +-- tests/heartbeat_test.py | 60 ++++++++++++++++------------------------- tests/reconnect_test.py | 4 ++- 3 files changed, 28 insertions(+), 40 deletions(-) diff --git a/tests/client_test.py b/tests/client_test.py index 3cacc1ecda..05d664be17 100644 --- a/tests/client_test.py +++ b/tests/client_test.py @@ -1,7 +1,7 @@ import time from tests.base import HazelcastTestCase -from hazelcast.config import ClientConfig, PROPERTY_HEARTBEAT_INTERVAL +from hazelcast.config import ClientConfig, ClientProperties from hazelcast.client import HazelcastClient from hazelcast.lifecycle import LIFECYCLE_STATE_DISCONNECTED @@ -22,7 +22,7 @@ def test_client_only_listens(self): member = cluster.start_member() client_config = ClientConfig() - client_config._properties[PROPERTY_HEARTBEAT_INTERVAL] = 1000 + client_config.set_property(ClientProperties.HEARTBEAT_INTERVAL.name, 1000) client1 = HazelcastClient(client_config) diff --git a/tests/heartbeat_test.py b/tests/heartbeat_test.py index 27cd6da721..67c90531db 100644 --- a/tests/heartbeat_test.py +++ b/tests/heartbeat_test.py @@ -1,7 +1,7 @@ from hazelcast import HazelcastClient from hazelcast.core import Address from tests.base import HazelcastTestCase -from hazelcast.config import ClientConfig, PROPERTY_HEARTBEAT_INTERVAL, PROPERTY_HEARTBEAT_TIMEOUT +from hazelcast.config import ClientConfig, ClientProperties from tests.util import configure_logging @@ -20,8 +20,8 @@ def setUp(self): self.member = self.rc.startMember(self.cluster.id) self.config = ClientConfig() - self.config._properties[PROPERTY_HEARTBEAT_INTERVAL] = 500 - self.config._properties[PROPERTY_HEARTBEAT_TIMEOUT] = 2000 + self.config.set_property(ClientProperties.HEARTBEAT_INTERVAL.name, 500) + self.config.set_property(ClientProperties.HEARTBEAT_TIMEOUT.name, 2000) self.client = HazelcastClient(self.config) @@ -31,7 +31,14 @@ def tearDown(self): def test_heartbeat_stopped(self): - self.client.cluster.add_listener(member_added=lambda m: self.client.connection_manager.get_or_connect(m.address)) + def member_added_func(m): + def connection_callback(f): + conn = f.result() + self.simulate_heartbeat_lost(self.client, Address(conn._address[0], conn._address[1]), 2) + + self.client.connection_manager.get_or_connect(m.address).add_done_callback(connection_callback) + + self.client.cluster.add_listener(member_added=member_added_func) def heartbeat_stopped_collector(): connections = [] @@ -42,32 +49,6 @@ def connection_collector(c): connection_collector.connections = connections return connection_collector - collector = heartbeat_stopped_collector() - - self.client.heartbeat.add_listener(on_heartbeat_stopped=collector) - - member2 = self.rc.startMember(self.cluster.id) - self.simulate_heartbeat_lost(self.client, Address(member2.host, member2.port), 2) - - def assert_heartbeat_stopped(): - self.assertEqual(1, len(collector.connections)) - connection = collector.connections[0] - self.assertEqual(connection._address, (member2.host, member2.port)) - - self.assertTrueEventually(assert_heartbeat_stopped) - - def test_heartbeat_restored(self): - - def member_added_func(m): - - def connection_callback(f): - conn = f.result() - self.simulate_heartbeat_lost(self.client, Address(conn._address[0], conn._address[1]), 2) - - self.client.connection_manager.get_or_connect(m.address).add_done_callback(connection_callback) - - self.client.cluster.add_listener(member_added=member_added_func) - def heartbeat_restored_collector(): connections = [] @@ -77,18 +58,23 @@ def connection_collector(c): connection_collector.connections = connections return connection_collector - collector = heartbeat_restored_collector() + stopped_collector = heartbeat_stopped_collector() + restored_collector = heartbeat_restored_collector() - self.client.heartbeat.add_listener(on_heartbeat_restored=collector) + self.client.heartbeat.add_listener(on_heartbeat_stopped=stopped_collector, + on_heartbeat_restored=restored_collector) member2 = self.rc.startMember(self.cluster.id) - def assert_heartbeat_restored(): - self.assertEqual(1, len(collector.connections)) - connection = collector.connections[0] - self.assertEqual(connection._address, (member2.host, member2.port)) + def assert_heartbeat_stopped_and_restored(): + self.assertEqual(1, len(stopped_collector.connections)) + self.assertEqual(1, len(restored_collector.connections)) + connection_stopped = stopped_collector.connections[0] + connection_restored = restored_collector.connections[0] + self.assertEqual(connection_stopped._address, (member2.host, member2.port)) + self.assertEqual(connection_restored._address, (member2.host, member2.port)) - self.assertTrueEventually(assert_heartbeat_restored) + self.assertTrueEventually(assert_heartbeat_stopped_and_restored) @staticmethod def simulate_heartbeat_lost(client, address, timeout): diff --git a/tests/reconnect_test.py b/tests/reconnect_test.py index ed519e9c55..b1d76b1e80 100644 --- a/tests/reconnect_test.py +++ b/tests/reconnect_test.py @@ -32,10 +32,12 @@ def test_start_client_with_no_member(self): self.create_client(config) def test_start_client_before_member(self): - Thread(target=self.cluster.start_member).start() + t = Thread(target=self.cluster.start_member) + t.start() config = ClientConfig() config.network_config.connection_attempt_limit = 10 self.create_client(config) + t.join() def test_restart_member(self): member = self.cluster.start_member()