#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module contains the CLI and the logic for gathering and
forwarding Postgres metrics into a metrics collector via Spotify FFWD.
https://github.com/spotify/ffwd
Notice that the README.md in the repository root contains short descriptions
of all the Python modules within this project.
"""
import argparse
import json
import os
import socket
import time
from postgresql_metrics import metrics_gatherer
from postgresql_metrics.postgres_queries import (
    get_db_name_from_connection,
    get_db_connection,
    get_major_version
)
from postgresql_metrics.prepare_db import prepare_databases_for_metrics
from postgresql_metrics.common import (
    init_logging_file,
    init_logging_stderr,
    init_logging_syslog,
    get_logger,
    find_and_parse_config
)
LOG = get_logger()
DEFAULT_CONFIG_PATH = "/etc/postgresql-metrics/postgresql-metrics.yml"
# LAST_RUN_TIMES_FOR_STATS is a dict of dicts, i.e. key pointing to a key pointing to a value:
# database name -> stats function pointer -> last run timestamp
# This means that we have separate last run timestamps per database and per stats function.
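# For illustration, after one scheduled run the structure could look like this
# (database name, function, and timestamp below are all hypothetical):
#   {'exampledb': {some_stats_function: 1466769225.0}}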
LAST_RUN_TIMES_FOR_STATS = {}
DEFAULT_FFWD_PORT = 19000
DEFAULT_FFWD_HOST = '127.0.0.1'
def push_to_ffwd(metric_dicts, ffwd_addr, data_formatter=json.dumps):
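    """Send each metric dict as one UDP datagram to the given FFWD address,
    serialized with the given data_formatter (JSON by default)."""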
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        for metric in metric_dicts:
            data = data_formatter(metric)
            LOG.debug('send UDP packet to {} with data:\n{}', ffwd_addr, data)
            s.sendto(bytes(data, 'UTF-8'), ffwd_addr)
    finally:
        s.close()

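# For illustration, pushing a single hypothetical metric dict to a local FFWD agent
# (the actual metric dicts are produced by the functions in metrics_gatherer):
#
#   push_to_ffwd([{'key': 'postgres', 'value': 42, 'attributes': {'what': 'example'}}],
#                (DEFAULT_FFWD_HOST, DEFAULT_FFWD_PORT))
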
def _is_time_to_call_stats_func_and_update_ts(database_name, metrics_func, run_interval_sec):
"""Check if it is time to schedule new metrics gathering call,
and assume the call will be made immediately if yes.
This means that the call timestamp for the given database_name and metrics_func
is updated within this function.
"""
last_run_timestamp = LAST_RUN_TIMES_FOR_STATS.get(database_name, {}).get(metrics_func, 0)
if time.time() - last_run_timestamp > run_interval_sec:
if database_name not in LAST_RUN_TIMES_FOR_STATS:
LAST_RUN_TIMES_FOR_STATS[database_name] = {}
LAST_RUN_TIMES_FOR_STATS[database_name][metrics_func] = time.time()
return True
return False
def _call_all_db_functions(db_parameter, db_stats_functions, schedule=False, db_name=None):
"""Iterates through all given statistics functions, calling them with the given parameter.
The db_parameter can be a database connection or a file path to Postgres data directory,
depending on the statistics function to call.
"""
metrics = []
for (db_metrics_func, interval_s) in db_stats_functions:
if schedule:
if not db_name:
# DB name is given only if we want to make database specific scheduling.
db_name = "__cluster_global__"
is_call_required = \
_is_time_to_call_stats_func_and_update_ts(db_name, db_metrics_func, interval_s)
else:
is_call_required = True
if is_call_required:
try:
LOG.debug('calling stats function {}', db_metrics_func.__name__)
metrics.extend(db_metrics_func(db_parameter))
except Exception:
LOG.exception('failed calling stats function: ' + db_metrics_func.__name__)
return metrics
def get_stats_functions_from_conf(func_key_name, conf):
"""Finds the statistics function configured, and ensures that the callables
are found from metrics_gatherer.py."""
stats_functions = []
if func_key_name in conf and conf[func_key_name] is not None:
for func_name, call_interval in conf[func_key_name]:
stats_func = getattr(metrics_gatherer, func_name)
if not stats_func or not callable(stats_func):
raise Exception("statistics function '" + func_name +
"' not found in configuration under key name: " + func_key_name)
stats_functions.append((stats_func, int(call_interval)))
return stats_functions
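# For illustration, each function list in the YAML configuration is expected to be
# a sequence of (function name, call interval in seconds) pairs; the function name
# below is a placeholder:
#
# db_functions:
#   - [some_stats_function, 180]
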
def get_all_stats_functions_from_conf(conf):
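    """Read the per-database, cluster-global, and data directory based
    statistics function lists from the configuration."""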
    db_functions = get_stats_functions_from_conf('db_functions', conf)
    global_db_functions = get_stats_functions_from_conf('global_db_functions', conf)
    data_dir_functions = get_stats_functions_from_conf('data_dir_functions', conf)
    return db_functions, global_db_functions, data_dir_functions

def get_all_metrics_now(db_connections, conf):
"""Get all the metrics immediately without any scheduling.
First gets the global stats with first available database connection,
and then gets the rest per database.
"""
db_functions, global_db_functions, data_dir_functions = get_all_stats_functions_from_conf(conf)
data_dir = figure_out_postgres_data_dir(db_connections[0], conf)
all_metrics = _call_all_db_functions(db_connections[0], global_db_functions)
all_metrics.extend(_call_all_db_functions(data_dir, data_dir_functions))
for db_connection in db_connections:
all_metrics.extend(_call_all_db_functions(db_connection, db_functions))
return all_metrics
def get_all_metrics_scheduled(db_connections, conf):
"""Get all the metrics in scheduled manner, not calling all the functions every time.
First gets the global stats with first available database connection,
and then gets the rest per database.
"""
db_functions, global_db_functions, data_dir_functions = get_all_stats_functions_from_conf(conf)
data_dir = figure_out_postgres_data_dir(db_connections[0], conf)
all_metrics = _call_all_db_functions(db_connections[0], global_db_functions, schedule=True)
all_metrics.extend(_call_all_db_functions(data_dir, data_dir_functions, schedule=True))
for db_connection in db_connections:
db_name = get_db_name_from_connection(db_connection)
all_metrics.extend(_call_all_db_functions(db_connection, db_functions,
schedule=True, db_name=db_name))
return all_metrics
def run_long_running_ffwd(conf):
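    """Run an infinite polling loop that gathers the configured statistics
    on schedule and pushes the results to FFWD over UDP."""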
    db_connections = get_db_connections_with_conf(conf)
    ffwd_address = (DEFAULT_FFWD_HOST, DEFAULT_FFWD_PORT)
    if conf and conf.get('ffwd'):
        ffwd_address = (conf['ffwd'].get('host', DEFAULT_FFWD_HOST),
                        int(conf['ffwd'].get('port', DEFAULT_FFWD_PORT)))
    try:
        LOG.info("starting a long running statistics polling loop with {} database(s)",
                 len(db_connections))
        while True:
            try:
                # Notice that the scheduling is separate from this few second sleep,
                # but as the scheduling granularity is in tens of seconds,
                # a sleep of a few seconds is enough.
                time.sleep(5.0)
                db_connections = confirm_connections_work(conf, db_connections)
                metrics = get_all_metrics_scheduled(db_connections, conf)
                if metrics:
                    LOG.info("sending {} metrics to ffwd...", len(metrics))
                    push_to_ffwd(metrics, ffwd_address)
            except (KeyboardInterrupt, SystemExit):
                LOG.warn('*** keyboard interrupt / system exit ***')
                raise
            except Exception:
                LOG.exception('metrics check failed')
    finally:
        for db_connection in db_connections:
            if not db_connection.closed:
                db_connection.close()

def confirm_connections_work(conf, db_connections):
"""Call this to confirm that all connections are still alive before using them.
Will recreate any closed connections."""
confirmed_connections = []
for db_connection in db_connections:
if db_connection.closed:
db_name = get_db_name_from_connection(db_connection)
LOG.warn("database connection is closed to db '{}', reconnecting", db_name)
confirmed_connections.append(connect_to_single_db_with_conf(conf, db_name))
else:
confirmed_connections.append(db_connection)
return confirmed_connections
def connect_to_single_db_with_conf(conf, database_name):
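    """Open a single database connection using the credentials and host details
    from the 'postgres' section of the configuration."""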
LOG.info("open database connection to {}:{}, user '{}', database '{}'",
conf['postgres']['host'], conf['postgres']['port'],
conf['postgres']['user'], database_name)
return get_db_connection(database_name,
conf['postgres']['user'],
conf['postgres']['password'],
host=conf['postgres']['host'],
port=int(conf['postgres']['port']))
def get_db_connections_with_conf(conf):
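    """Open a connection to each database listed in the configuration."""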
    connections = []
    if 'databases' in conf['postgres']:
        if not conf['postgres']['databases']:
            raise Exception("no target databases defined in configuration")
        for database_name in conf['postgres']['databases']:
            connections.append(connect_to_single_db_with_conf(conf, database_name))
    elif 'database' in conf['postgres']:
        # kept just for backward compatibility: the single 'database' key predates
        # the 'databases' option handled above
        connections.append(connect_to_single_db_with_conf(conf, conf['postgres']['database']))
    if not connections:
        raise Exception("could not connect to database with configuration:\n" + str(conf))
    return connections

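# For illustration, a minimal 'postgres' configuration section matching the keys
# read above (host, credentials, and database names are placeholder values):
#
# postgres:
#   host: localhost
#   port: 5432
#   user: postgres_metrics
#   password: secret
#   databases:
#     - exampledb
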
def figure_out_postgres_data_dir(db_connection, conf):
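    """Resolve the Postgres data directory, either as configured or derived
    from the server's major version, returning None if the resolved directory
    does not exist on the local filesystem."""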
    # use .get() so a missing 'data_dir' key falls back to the version based default
    data_dir = conf['postgres'].get('data_dir')
    if not data_dir:
        db_version = get_major_version(db_connection)
        data_dir = "/var/lib/postgresql/{0}/main".format(db_version)
    if not os.path.isdir(data_dir):
        LOG.debug("data directory '{}' doesn't exist", data_dir)
        data_dir = None
    else:
        LOG.debug('using postgres data directory: {}', data_dir)
    return data_dir

DESCRIPTION = """Spotify PostgreSQL Metrics
This tool fetches metrics from a Postgres database cluster,
and returns the results in Metrics 2.0 compatible JSON format.

You can run the 'long-running-ffwd' command as a background process that keeps
sending the gathered metrics into FFWD as configured, or you can call
this CLI tool directly to simply print out the metrics for other
purposes.

Run the prepare-db command to prepare your monitored databases in
the Postgres cluster for the statistics gathering. You need to run
the prepare-db command with database super-user credentials.
"""
USAGE = """Usage: postgresql-metrics <command>
<command> can be:
  all                Show all available metrics
  long-running-ffwd  Run in an infinite loop, sending metrics to FFWD
  prepare-db         Create the required users, extensions, and functions for metrics
"""
def main():
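    """Parse the command line arguments, load the configuration,
    and dispatch to the requested command."""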
    parser = argparse.ArgumentParser(description=DESCRIPTION, usage=USAGE)
    parser.add_argument("command", help="the command to run")
    parser.add_argument("-c", "--config-path", default=DEFAULT_CONFIG_PATH,
                        help="configuration path; also checks the folder 'default' "
                             "under the given path [{}]".format(DEFAULT_CONFIG_PATH))
    args = parser.parse_args()
    conf = find_and_parse_config(args.config_path)
    if 'postgres' not in conf:
        raise Exception("failed parsing configuration from: " + args.config_path)
    log_level = conf.get('log', {}).get('log_level', 'debug')
    if args.command == 'all':
        init_logging_stderr(log_level)
        db_connections = get_db_connections_with_conf(conf)
        # the first round just primes the derivative metrics; its results are discarded
        get_all_metrics_now(db_connections, conf)
        print("# sleep 5 s to get diffs on derivative metrics")
        time.sleep(5.0)
        for metric in get_all_metrics_now(db_connections, conf):
            print(metric)
    elif args.command == 'long-running-ffwd':
        if conf['log']['log_to_stderr'] is True:
            init_logging_stderr(log_level)
        if conf['log']['log_to_file'] is True:
            init_logging_file(conf['log']['filename'], log_level,
                              conf['log']['rotate_file_log'],
                              conf['log']['file_rotate_max_size'])
        if conf['log']['log_to_syslog'] is True:
            init_logging_syslog(log_level, facility=conf['log']['syslog_facility'])
        run_long_running_ffwd(conf)
    elif args.command == 'prepare-db':
        init_logging_stderr(log_level)
        prepare_databases_for_metrics(conf)
    else:
        parser.error("unknown command: " + args.command)

if __name__ == '__main__': # if this script is called from command line
    main()