Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mgr/influx: PEP-8 and other fixes to Influx module #19229

Merged
merged 4 commits into from
Dec 13, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
160 changes: 121 additions & 39 deletions src/pybind/mgr/influx/module.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@

from datetime import datetime
from threading import Event
import json
import errno
import time

from mgr_module import MgrModule

Expand All @@ -12,29 +12,56 @@
except ImportError:
InfluxDBClient = None


class Module(MgrModule):
COMMANDS = [
{
"cmd": "influx config-set name=key,type=CephString "
"name=value,type=CephString",
"desc": "Set a configuration value",
"perm": "rw"
},
{
"cmd": "influx config-show",
"desc": "Show current configuration",
"perm": "r"
},
{
"cmd": "influx send",
"desc": "Force sending data to Influx",
"perm": "rw"
},
{
"cmd": "influx self-test",
"desc": "debug the module",
"perm": "rw"
"perm": "rw"
},
]

config_keys = {
'hostname': None,
'port': 8086,
'database': 'ceph',
'username': None,
'password': None,
'interval': 5
}

def __init__(self, *args, **kwargs):
super(Module, self).__init__(*args, **kwargs)
self.event = Event()
self.run = True
self.run = True
self.config = dict()

def get_fsid(self):
return self.get('mon_map')['fsid']

def get_latest(self, daemon_type, daemon_name, stat):
data = self.get_counter(daemon_type, daemon_name, stat)[stat]
if data:
return data[-1][1]
else:
return 0

return 0

def get_df_stats(self):
df = self.get("df")
Expand All @@ -55,15 +82,15 @@ def get_df_stats(self):
point = {
"measurement": "ceph_pool_stats",
"tags": {
"pool_name" : pool['name'],
"pool_id" : pool['id'],
"type_instance" : df_type,
"mgr_id" : self.get_mgr_id(),
"pool_name": pool['name'],
"pool_id": pool['id'],
"type_instance": df_type,
"fsid": self.get_fsid()
},
"time" : datetime.utcnow().isoformat() + 'Z',
"fields": {
"value" : pool['stats'][df_type],
}
"time": datetime.utcnow().isoformat() + 'Z',
"fields": {
"value": pool['stats'][df_type],
}
}
data.append(point)
return data
Expand All @@ -86,7 +113,8 @@ def get_daemon_stats(self):
"tags": {
"ceph_daemon": daemon,
"type_instance": path,
"host": metadata['hostname']
"host": metadata['hostname'],
"fsid": self.get_fsid()
},
"time": datetime.utcnow().isoformat() + 'Z',
"fields": {
Expand All @@ -96,31 +124,67 @@ def get_daemon_stats(self):

return data

def set_config_option(self, option, value):
if option not in self.config_keys.keys():
raise RuntimeError('{0} is a unknown configuration '
'option'.format(option))

if option in ['port', 'interval']:
try:
value = int(value)
except (ValueError, TypeError):
raise RuntimeError('invalid {0} configured. Please specify '
'a valid integer'.format(option))

if option == 'interval' and value < 5:
raise RuntimeError('interval should be set to at least 5 seconds')

self.config[option] = value

def init_module_config(self):
self.config['hostname'] = \
self.get_config("hostname", default=self.config_keys['hostname'])
self.config['port'] = \
int(self.get_config("port", default=self.config_keys['port']))
self.config['database'] = \
self.get_config("database", default=self.config_keys['database'])
self.config['username'] = \
self.get_config("username", default=self.config_keys['username'])
self.config['password'] = \
self.get_config("password", default=self.config_keys['password'])
self.config['interval'] = \
int(self.get_config("interval",
default=self.config_keys['interval']))

def send_to_influx(self):
host = self.get_config("hostname")
if not host:
self.log.error("No InfluxDB server configured, please set"
"`hostname` configuration key.")
if not self.config['hostname']:
self.log.error("No Influx server configured, please set one using: "
"ceph influx config-set hostname <hostname>")
return

port = int(self.get_config("port", default="8086"))
database = self.get_config("database", default="ceph")

# If influx server has authentication turned off then
# missing username/password is valid.
username = self.get_config("username", default="")
password = self.get_config("password", default="")

client = InfluxDBClient(host, port, username, password, database)

# using influx client get_list_database requires admin privs, instead we'll catch the not found exception and inform the user if db can't be created
self.log.debug("Sending data to Influx host: %s",
self.config['hostname'])
client = InfluxDBClient(self.config['hostname'], self.config['port'],
self.config['username'],
self.config['password'],
self.config['database'])

# using influx client get_list_database requires admin privs,
# instead we'll catch the not found exception and inform the user if
# db can not be created
try:
client.write_points(self.get_df_stats(), 'ms')
client.write_points(self.get_daemon_stats(), 'ms')
except InfluxDBClientError as e:
if e.code == 404:
self.log.info("Database '{0}' not found, trying to create (requires admin privs). You can also create manually and grant write privs to user '{1}'".format(database,username))
client.create_database(database)
self.log.info("Database '%s' not found, trying to create "
"(requires admin privs). You can also create "
"manually and grant write privs to user "
"'%s'", self.config['database'],
self.config['username'])
client.create_database(self.config['database'])
else:
raise

Expand All @@ -130,18 +194,35 @@ def shutdown(self):
self.event.set()

def handle_command(self, cmd):
if cmd['prefix'] == 'influx config-show':
return 0, json.dumps(self.config), ''
elif cmd['prefix'] == 'influx config-set':
key = cmd['key']
value = cmd['value']
if not value:
return -errno.EINVAL, '', 'Value should not be empty or None'

self.log.debug('Setting configuration option %s to %s', key, value)
self.set_config_option(key, value)
self.set_config(key, value)
return 0, 'Configuration option {0} updated'.format(key), ''
elif cmd['prefix'] == 'influx send':
self.send_to_influx()
return 0, 'Sending data to Influx', ''
if cmd['prefix'] == 'influx self-test':
daemon_stats = self.get_daemon_stats()
assert len(daemon_stats)
df_stats = self.get_df_stats()

result = {
'daemon_stats': daemon_stats,
'df_stats': df_stats
}

return 0, json.dumps(result, indent=2), 'Self-test OK'
else:
return (-errno.EINVAL, '',
"Command not found '{0}'".format(cmd['prefix']))

return (-errno.EINVAL, '',
"Command not found '{0}'".format(cmd['prefix']))

def serve(self):
if InfluxDBClient is None:
Expand All @@ -150,13 +231,14 @@ def serve(self):
return

self.log.info('Starting influx module')
self.init_module_config()
self.run = True

while self.run:
start = time.time()
self.send_to_influx()
self.log.debug("Running interval loop")
interval = self.get_config("interval")
if interval is None:
interval = 5
self.log.debug("sleeping for %d seconds",interval)
self.event.wait(interval)

runtime = time.time() - start
self.log.debug('Finished sending data in Influx in %.3f seconds',
runtime)
self.log.debug("Sleeping for %d seconds", self.config['interval'])
self.event.wait(self.config['interval'])