Skip to content

Commit

Permalink
Change session status to disconnected when grpc failed to coordinator (
Browse files Browse the repository at this point in the history
  • Loading branch information
lidongze0629 committed Jan 25, 2021
1 parent d16ff63 commit e2854d1
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 3 deletions.
2 changes: 1 addition & 1 deletion python/graphscope/client/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def fetch_logs(self):
self._logs_fetching_thread.daemon = True
self._logs_fetching_thread.start()

@suppress_grpc_error
@catch_grpc_error
def send_heartbeat(self):
request = message_pb2.HeartBeatRequest()
return self._stub.HeartBeat(request)
Expand Down
15 changes: 13 additions & 2 deletions python/graphscope/client/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
from graphscope.deploy.kubernetes.cluster import KubernetesCluster
from graphscope.framework.errors import ConnectionError
from graphscope.framework.errors import FatalError
from graphscope.framework.errors import GRPCError
from graphscope.framework.errors import InteractiveEngineInternalError
from graphscope.framework.errors import InvalidArgumentError
from graphscope.framework.errors import K8sError
Expand Down Expand Up @@ -411,6 +412,8 @@ def __init__(
# create and connect session
self._proc, self._endpoint = self._connect()

self._disconnected = False

# heartbeat
self._heartbeat_interval_seconds = 5
self._heartbeat_sending_thread = threading.Thread(
Expand Down Expand Up @@ -441,7 +444,7 @@ def info(self):
info = {}
if self._closed:
info["status"] = "closed"
elif self._grpc_client is None:
elif self._grpc_client is None or self._disconnected:
info["status"] = "disconnected"
else:
info["status"] = "active"
Expand All @@ -465,7 +468,13 @@ def info(self):
def _send_heartbeat(self):
while not self._closed:
if self._grpc_client:
self._grpc_client.send_heartbeat()
try:
self._grpc_client.send_heartbeat()
except GRPCError as exc:
logger.warning(exc)
self._disconnected = True
else:
self._disconnected = False
time.sleep(self._heartbeat_interval_seconds)

def close(self):
Expand All @@ -486,6 +495,8 @@ def close(self):
)
self._heartbeat_sending_thread = None

self._disconnected = True

# close all interactive instances
for instance in self._interactive_instance_dict.values():
try:
Expand Down

0 comments on commit e2854d1

Please sign in to comment.