From a69d884f3b9cf7ac3ecf7444e1114011de62d37d Mon Sep 17 00:00:00 2001 From: Tao He Date: Wed, 28 Sep 2022 13:19:56 +0800 Subject: [PATCH] Closing the session if hearbeat fails for many times (when engine crashes) (#2088) Signed-off-by: Tao He --- .../{foo => graphscope_runtime}/__init__.py | 0 coordinator/gscoordinator/coordinator.py | 4 +++ coordinator/setup.py | 2 +- python/graphscope/client/session.py | 26 +++++++++++++++---- 4 files changed, 26 insertions(+), 6 deletions(-) rename coordinator/{foo => graphscope_runtime}/__init__.py (100%) diff --git a/coordinator/foo/__init__.py b/coordinator/graphscope_runtime/__init__.py similarity index 100% rename from coordinator/foo/__init__.py rename to coordinator/graphscope_runtime/__init__.py diff --git a/coordinator/gscoordinator/coordinator.py b/coordinator/gscoordinator/coordinator.py index 462a3e43b221..1705a782b911 100644 --- a/coordinator/gscoordinator/coordinator.py +++ b/coordinator/gscoordinator/coordinator.py @@ -356,6 +356,10 @@ def _HeartBeat(self, request, context): # analytical engine request = message_pb2.HeartBeatRequest() + if self._analytical_engine_stub is None: + raise RuntimeError( + "Analytical engine is not launched or has already been terminated." + ) return self._analytical_engine_stub.HeartBeat(request) HeartBeatWrapped = catch_unknown_errors(message_pb2.HeartBeatResponse())(_HeartBeat) diff --git a/coordinator/setup.py b/coordinator/setup.py index 80f36e607e0b..3e8a911d4c31 100644 --- a/coordinator/setup.py +++ b/coordinator/setup.py @@ -288,7 +288,7 @@ def parsed_packages(): name = os.environ.get("package_name", "gs-coordinator") if name == "gs-coordinator": return find_packages(".") - return ["foo"] + return ["graphscope_runtime"] def parsed_package_data(): diff --git a/python/graphscope/client/session.py b/python/graphscope/client/session.py index d910c8a7091e..87c5bf31b87d 100755 --- a/python/graphscope/client/session.py +++ b/python/graphscope/client/session.py @@ -731,6 +731,7 @@ def __init__( ) self._heartbeat_sending_thread.daemon = True self._heartbeat_sending_thread.start() + self._heartbeat_maximum_failures = 3 # networkx module self._nx = None @@ -812,14 +813,26 @@ def eager(self): return self._config_params["mode"] == "eager" def _send_heartbeat(self): + # >1: failure, 0: reset when success + heartbeat_failure_count = 0 while not self._closed: if self._grpc_client: try: self._grpc_client.send_heartbeat() except Exception as exc: - logger.warning(exc) + if heartbeat_failure_count == 0: + logger.warning(exc) + heartbeat_failure_count = heartbeat_failure_count + 1 + if heartbeat_failure_count > self._heartbeat_maximum_failures: + logger.error( + "The connection between coordinator has lost after %d times " + "of heartbeat failure, closing the session ...", + heartbeat_failure_count, + ) + self.close() self._disconnected = True else: + heartbeat_failure_count = 0 self._disconnected = False time.sleep(self._heartbeat_interval_seconds) @@ -844,7 +857,7 @@ def close(self): else: self._close() - def _close(self): + def _close(self): # noqa: C901 if self._closed: return self._closed = True @@ -853,9 +866,12 @@ def _close(self): self._deregister_default() if self._heartbeat_sending_thread: - self._heartbeat_sending_thread.join( - timeout=self._heartbeat_interval_seconds - ) + try: + self._heartbeat_sending_thread.join( + timeout=self._heartbeat_interval_seconds + ) + except RuntimeError: # ignore the "cannot join current thread" error + pass self._heartbeat_sending_thread = None self._disconnected = True