-
Notifications
You must be signed in to change notification settings - Fork 725
/
Copy pathheartbeat.py
140 lines (115 loc) · 5.11 KB
/
heartbeat.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2025, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
#########################################################################
"""Server heartbeat manager."""
import threading
import datetime
import config
from flask import session, current_app
from flask_babel import gettext
def log_server_heartbeat(data):
"""Log Server Heartbeat."""
from config import PG_DEFAULT_DRIVER
from pgadmin.utils.driver import get_driver
manager = get_driver(PG_DEFAULT_DRIVER).connection_manager(int(data['sid'])
)
_server_heartbeat = getattr(current_app, '_pgadmin_server_heartbeat', {})
session_id = session.sid
if session_id not in _server_heartbeat:
_server_heartbeat[session_id] = {}
if not manager:
stop_server_heartbeat(data)
msg = gettext("Manager not found. Stopped Heartbeat logging.")
current_app.logger.error(
f"Manager not found. Stopped Heartbeat logging for the "
f"session id: {session_id} and server id: {data['sid']}"
)
return False, msg
else:
_server_heartbeat[session_id][data['sid']] = {
'timestamp': datetime.datetime.now(),
'conn': manager.connections
}
current_app.logger.debug(
f"Heartbeat logged for the session id: {session_id} and "
f"server id: {data['sid']}"
)
setattr(current_app, '_pgadmin_server_heartbeat', _server_heartbeat)
return True, gettext("Heartbeat logged successfully.")
def stop_server_heartbeat(data):
"""Stop logging server heartbeat."""
_server_heartbeat = getattr(current_app, '_pgadmin_server_heartbeat', {})
if session.sid in _server_heartbeat and \
data['sid'] in _server_heartbeat[session.sid]:
_server_heartbeat[session.sid].pop(data['sid'])
current_app.logger.debug(
"Heartbeat logging stopped for the session"
" id##server id: {0}##{1}".format(session.sid, data['sid']))
setattr(current_app, '_pgadmin_server_heartbeat', _server_heartbeat)
return True, gettext("Stopped Heartbeat logging.")
def get_server_heartbeat(server_id):
_server_heartbeat = getattr(current_app, '_pgadmin_server_heartbeat', {})
if session.sid in _server_heartbeat and server_id in _server_heartbeat[
session.sid]:
return _server_heartbeat[session.sid][server_id]
else:
return None
class ServerHeartbeatTimer():
def __init__(self, sec, _app):
def func_wrapper():
self.t = threading.Timer(sec, func_wrapper)
self.t.start()
self.release_server_heartbeat()
self.t = threading.Timer(sec, func_wrapper)
self.t.daemon = True
self.t.start()
self._app = _app
def release_server_heartbeat(self):
with self._app.app_context():
_server_heartbeat = getattr(self._app,
'_pgadmin_server_heartbeat', {})
if len(_server_heartbeat) > 0:
for sess_id in list(_server_heartbeat):
for sid in list(_server_heartbeat[sess_id]):
last_heartbeat_time = _server_heartbeat[sess_id][sid][
'timestamp']
current_time = datetime.datetime.now()
diff = current_time - last_heartbeat_time
# Wait for 4 times then the timeout
if diff.total_seconds() > (
config.SERVER_HEARTBEAT_TIMEOUT * 4):
self._release_connections(
_server_heartbeat[sess_id][sid]['conn'],
sess_id, sid)
_server_heartbeat[sess_id].pop(sid)
if len(_server_heartbeat[sess_id]) == 0:
_server_heartbeat.pop(sess_id)
setattr(self._app, '_pgadmin_server_heartbeat',
_server_heartbeat)
@staticmethod
def _release_connections(server_conn, sess_id, sid):
for d in server_conn:
try:
# Release the connection only if it is connected
if server_conn[d].wasConnected:
server_conn[d]._release()
# Reconnect on the reload
server_conn[d].wasConnected = True
current_app.logger.debug(
"Heartbeat not received. Released "
"connection for the session "
"id##server id: {0}##{1}".format(
sess_id, sid))
except Exception as e:
current_app.logger.exception(e)
def cancel(self):
self.t.cancel()
def init_app(app):
setattr(app, '_pgadmin_server_heartbeat', {})
ServerHeartbeatTimer(sec=config.SERVER_HEARTBEAT_TIMEOUT,
_app=app)