Skip to content

Commit

Permalink
WIP: implement heartbeat to keep connection alive
Browse files Browse the repository at this point in the history
  • Loading branch information
chaudum committed Jul 14, 2020
1 parent 8e4b6d0 commit 1b3dcf2
Showing 1 changed file with 34 additions and 2 deletions.
36 changes: 34 additions & 2 deletions crate/crash/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@
import os
import re
import sys
import time
from argparse import ArgumentParser, ArgumentTypeError
from collections import namedtuple
from distutils.version import StrictVersion
from getpass import getpass
from logging import NullHandler
from operator import itemgetter
from threading import Thread

import urllib3
from appdirs import user_config_dir, user_data_dir
Expand Down Expand Up @@ -72,6 +74,26 @@
TABLE_TYPE_MIN_VERSION = StrictVersion("2.0.0")


class Heartbeat(Thread):

def __init__(self, connection, interval, **kwargs):
self.connection = connection
self.interval = interval
super().__init__(**kwargs)

def run(self):
cursor = self.connection.cursor()
while not self.connection._closed:
try:
cursor.execute("SELECT 1 AS heartbeat")
except ConnectionError:
pass
time.sleep(self.interval)

def stop(self):
self.join(timeout=0)


def parse_config_path(args=sys.argv):
"""
Preprocess sys.argv and extract --config argument.
Expand Down Expand Up @@ -250,6 +272,7 @@ def __init__(self,
# establish connection
self.cursor = None
self.connection = None
self.heartbeat = None
self._connect(crate_hosts)

def __enter__(self):
Expand Down Expand Up @@ -302,6 +325,9 @@ def close(self):
if self.connection:
self.connection.close()
self.connection = None
if self.heartbeat:
self.heartbeat.stop()
self.heartbeat = None

def is_closed(self):
return not (self.cursor and self.connection)
Expand Down Expand Up @@ -334,8 +360,8 @@ def is_conn_available(self):

def _connect(self, servers):
self.last_connected_servers = servers
if self.cursor or self.connection:
self.close() # reset open cursor and connection
if self.heartbeat or self.cursor or self.connection:
self.close() # reset heartbeat, open cursor and connection
self.connection = connect(servers,
error_trace=self.error_trace,
verify_ssl_cert=self.verify_ssl,
Expand All @@ -347,6 +373,7 @@ def _connect(self, servers):
schema=self.schema,
timeout=self.timeout)
self.cursor = self.connection.cursor()
self.heartbeat = self._start_heartbeat()
self._fetch_session_info()

def _connect_and_print_result(self, servers):
Expand Down Expand Up @@ -416,6 +443,11 @@ def _fetch_session_info(self):
else:
self.connect_info = ConnectionMeta(None, None, None)

def _start_heartbeat(self):
heartbeat = Heartbeat(self.connection, 5, daemon=True)
heartbeat.start()
return heartbeat

def _try_exec_cmd(self, line):
words = line.split(' ', 1)
if not words or not words[0]:
Expand Down

0 comments on commit 1b3dcf2

Please sign in to comment.