diff --git a/hazelcast/connection.py b/hazelcast/connection.py index e37f7bb4ef..65100498eb 100644 --- a/hazelcast/connection.py +++ b/hazelcast/connection.py @@ -219,19 +219,20 @@ def _heartbeat(self): now = time.time() for connection in list(self._client.connection_manager.connections.values()): time_since_last_read = now - connection.last_read + time_since_last_write = now - connection.last_write if time_since_last_read > self._heartbeat_timeout: if connection.heartbeating: self.logger.warning( "Heartbeat: Did not hear back after %ss from %s" % (time_since_last_read, connection)) self._on_heartbeat_stopped(connection) - - if time_since_last_read > self._heartbeat_interval: - request = client_ping_codec.encode_request() - self._client.invoker.invoke_on_connection(request, connection, ignore_heartbeat=True) else: if not connection.heartbeating: self._on_heartbeat_restored(connection) + if time_since_last_write > self._heartbeat_interval: + request = client_ping_codec.encode_request() + self._client.invoker.invoke_on_connection(request, connection, ignore_heartbeat=True) + def _on_heartbeat_restored(self, connection): self.logger.info("Heartbeat: Heartbeat restored for connection %s" % connection) connection.heartbeating = True @@ -264,6 +265,7 @@ def __init__(self, address, connection_closed_callback, message_callback): self._builder = ClientMessageBuilder(message_callback) self._read_buffer = b"" self.last_read = time.time() + self.last_write = 0 def live(self): """ diff --git a/hazelcast/reactor.py b/hazelcast/reactor.py index 3e6e14ee95..d98311c7ea 100644 --- a/hazelcast/reactor.py +++ b/hazelcast/reactor.py @@ -150,6 +150,7 @@ def handle_write(self): except IndexError: return sent = self.send(data) + self.last_write = time.time() self.sent_protocol_bytes = True if sent < len(data): self._write_queue.appendleft(data[sent:]) @@ -175,6 +176,7 @@ def write(self, data): if len(self._write_queue) == 0 and self._write_lock.acquire(False): try: sent = self.send(data) + self.last_write = time.time() if sent < len(data): self.logger.info("adding to queue") self._write_queue.appendleft(data[sent:]) diff --git a/tests/client_test.py b/tests/client_test.py new file mode 100644 index 0000000000..05d664be17 --- /dev/null +++ b/tests/client_test.py @@ -0,0 +1,61 @@ +import time + +from tests.base import HazelcastTestCase +from hazelcast.config import ClientConfig, ClientProperties +from hazelcast.client import HazelcastClient +from hazelcast.lifecycle import LIFECYCLE_STATE_DISCONNECTED + + +class ClientTest(HazelcastTestCase): + def test_client_only_listens(self): + rc = self.create_rc() + client_heartbeat_seconds = 8 + + cluster_config = """ + + {} + + """.format(client_heartbeat_seconds) + cluster = self.create_cluster(rc, cluster_config) + member = cluster.start_member() + + client_config = ClientConfig() + client_config.set_property(ClientProperties.HEARTBEAT_INTERVAL.name, 1000) + + client1 = HazelcastClient(client_config) + + def lifecycle_event_collector(): + events = [] + + def event_collector(e): + if e == LIFECYCLE_STATE_DISCONNECTED: + events.append(e) + + event_collector.events = events + return event_collector + + collector = lifecycle_event_collector() + client1.lifecycle.add_listener(collector) + client2 = HazelcastClient() + + key = "topic-name" + topic = client1.get_topic(key) + + def message_listener(e): + pass + + topic.add_listener(message_listener) + + client2topic = client2.get_topic(key) + begin = time.time() + + while (time.time() - begin) < 2 * client_heartbeat_seconds: + client2topic.publish("message") + time.sleep(0.5) + + self.assertEqual(0, len(collector.events)) + client1.shutdown() + client2.shutdown() + rc.exit() diff --git a/tests/heartbeat_test.py b/tests/heartbeat_test.py new file mode 100644 index 0000000000..67c90531db --- /dev/null +++ b/tests/heartbeat_test.py @@ -0,0 +1,81 @@ +from hazelcast import HazelcastClient +from hazelcast.core import Address +from tests.base import HazelcastTestCase +from hazelcast.config import ClientConfig, ClientProperties +from tests.util import configure_logging + + +class HeartbeatTest(HazelcastTestCase): + @classmethod + def setUpClass(cls): + configure_logging() + cls.rc = cls.create_rc() + + @classmethod + def tearDownClass(cls): + cls.rc.exit() + + def setUp(self): + self.cluster = self.create_cluster(self.rc) + self.member = self.rc.startMember(self.cluster.id) + self.config = ClientConfig() + + self.config.set_property(ClientProperties.HEARTBEAT_INTERVAL.name, 500) + self.config.set_property(ClientProperties.HEARTBEAT_TIMEOUT.name, 2000) + + self.client = HazelcastClient(self.config) + + def tearDown(self): + self.client.shutdown() + self.rc.shutdownCluster(self.cluster.id) + + def test_heartbeat_stopped(self): + + def member_added_func(m): + def connection_callback(f): + conn = f.result() + self.simulate_heartbeat_lost(self.client, Address(conn._address[0], conn._address[1]), 2) + + self.client.connection_manager.get_or_connect(m.address).add_done_callback(connection_callback) + + self.client.cluster.add_listener(member_added=member_added_func) + + def heartbeat_stopped_collector(): + connections = [] + + def connection_collector(c): + connections.append(c) + + connection_collector.connections = connections + return connection_collector + + def heartbeat_restored_collector(): + connections = [] + + def connection_collector(c): + connections.append(c) + + connection_collector.connections = connections + return connection_collector + + stopped_collector = heartbeat_stopped_collector() + restored_collector = heartbeat_restored_collector() + + self.client.heartbeat.add_listener(on_heartbeat_stopped=stopped_collector, + on_heartbeat_restored=restored_collector) + + member2 = self.rc.startMember(self.cluster.id) + + def assert_heartbeat_stopped_and_restored(): + self.assertEqual(1, len(stopped_collector.connections)) + self.assertEqual(1, len(restored_collector.connections)) + connection_stopped = stopped_collector.connections[0] + connection_restored = restored_collector.connections[0] + self.assertEqual(connection_stopped._address, (member2.host, member2.port)) + self.assertEqual(connection_restored._address, (member2.host, member2.port)) + + self.assertTrueEventually(assert_heartbeat_stopped_and_restored) + + @staticmethod + def simulate_heartbeat_lost(client, address, timeout): + client.connection_manager.connections[address].last_read -= timeout diff --git a/tests/reconnect_test.py b/tests/reconnect_test.py index ed519e9c55..b1d76b1e80 100644 --- a/tests/reconnect_test.py +++ b/tests/reconnect_test.py @@ -32,10 +32,12 @@ def test_start_client_with_no_member(self): self.create_client(config) def test_start_client_before_member(self): - Thread(target=self.cluster.start_member).start() + t = Thread(target=self.cluster.start_member) + t.start() config = ClientConfig() config.network_config.connection_attempt_limit = 10 self.create_client(config) + t.join() def test_restart_member(self): member = self.cluster.start_member()