Skip to content

Commit

Permalink
Closing the session if heartbeat fails too many times (when engine cra…
Browse files Browse the repository at this point in the history
…shes) (#2088)


Signed-off-by: Tao He <sighingnow@gmail.com>
  • Loading branch information
sighingnow committed Sep 28, 2022
1 parent 2119d48 commit a69d884
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 6 deletions.
File renamed without changes.
4 changes: 4 additions & 0 deletions coordinator/gscoordinator/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,10 @@ def _HeartBeat(self, request, context):

# analytical engine
request = message_pb2.HeartBeatRequest()
if self._analytical_engine_stub is None:
raise RuntimeError(
"Analytical engine is not launched or has already been terminated."
)
return self._analytical_engine_stub.HeartBeat(request)

HeartBeatWrapped = catch_unknown_errors(message_pb2.HeartBeatResponse())(_HeartBeat)
Expand Down
2 changes: 1 addition & 1 deletion coordinator/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ def parsed_packages():
name = os.environ.get("package_name", "gs-coordinator")
if name == "gs-coordinator":
return find_packages(".")
return ["foo"]
return ["graphscope_runtime"]


def parsed_package_data():
Expand Down
26 changes: 21 additions & 5 deletions python/graphscope/client/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,7 @@ def __init__(
)
self._heartbeat_sending_thread.daemon = True
self._heartbeat_sending_thread.start()
self._heartbeat_maximum_failures = 3

# networkx module
self._nx = None
Expand Down Expand Up @@ -812,14 +813,26 @@ def eager(self):
return self._config_params["mode"] == "eager"

def _send_heartbeat(self):
    """Send heartbeats until the session is closed.

    Loops while ``self._closed`` is false. On each iteration, if a gRPC
    client is present, it calls ``self._grpc_client.send_heartbeat()`` and
    records the outcome in ``self._disconnected``. The loop then sleeps for
    ``self._heartbeat_interval_seconds``.

    Consecutive failures are counted. Only the first failure of a streak is
    logged as a warning, so a dead peer does not flood the log. Once the
    streak exceeds ``self._heartbeat_maximum_failures``, an error is logged,
    ``self.close()`` is called and the loop ends.

    NOTE(review): presumably this is the target of
    ``self._heartbeat_sending_thread``; ``_close`` tolerates joining the
    current thread, which is consistent with that -- confirm.
    """
    # Length of the current failure streak: 0 after a success, >= 1 while failing.
    heartbeat_failure_count = 0
    while not self._closed:
        if self._grpc_client:
            try:
                self._grpc_client.send_heartbeat()
            except Exception as exc:
                # Log only the first failure of a streak.
                if heartbeat_failure_count == 0:
                    logger.warning(exc)
                heartbeat_failure_count += 1
                # Mark the session disconnected before any shutdown attempt,
                # so the flag is correct even if close() raises.
                self._disconnected = True
                if heartbeat_failure_count > self._heartbeat_maximum_failures:
                    logger.error(
                        "The connection between coordinator has lost after %d times "
                        "of heartbeat failure, closing the session ...",
                        heartbeat_failure_count,
                    )
                    self.close()
                    # The session is shut down; do not sleep another interval
                    # just to observe `_closed` on the next loop check.
                    return
            else:
                heartbeat_failure_count = 0
                self._disconnected = False
        time.sleep(self._heartbeat_interval_seconds)

Expand All @@ -844,7 +857,7 @@ def close(self):
else:
self._close()

def _close(self):
def _close(self): # noqa: C901
if self._closed:
return
self._closed = True
Expand All @@ -853,9 +866,12 @@ def _close(self):
self._deregister_default()

if self._heartbeat_sending_thread:
self._heartbeat_sending_thread.join(
timeout=self._heartbeat_interval_seconds
)
try:
self._heartbeat_sending_thread.join(
timeout=self._heartbeat_interval_seconds
)
except RuntimeError: # ignore the "cannot join current thread" error
pass
self._heartbeat_sending_thread = None

self._disconnected = True
Expand Down

0 comments on commit a69d884

Please sign in to comment.