Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mgr/diskprediction: merge initial version of disk failure prediction #24104

Merged
merged 1 commit into from Sep 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions COPYING
Expand Up @@ -145,3 +145,8 @@ Files: src/include/timegm.h
Copyright (C) Copyright Howard Hinnant
Copyright (C) Copyright 2010-2011 Vicente J. Botet Escriba
License: Boost Software License, Version 1.0

Files: src/pybind/mgr/diskprediction/predictor/models/*
Copyright: None
License: Public domain

345 changes: 345 additions & 0 deletions doc/mgr/diskprediction.rst

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions doc/mgr/index.rst
Expand Up @@ -29,6 +29,7 @@ sensible.
Writing plugins <plugins>
Writing orchestrator plugins <orchestrator_modules>
Dashboard plugin <dashboard>
DiskPrediction plugin <diskprediction>
Local pool plugin <localpool>
RESTful plugin <restful>
Zabbix plugin <zabbix>
Expand Down
3 changes: 3 additions & 0 deletions qa/tasks/mgr/test_module_selftest.py
Expand Up @@ -47,6 +47,9 @@ def test_prometheus(self):
def test_influx(self):
self._selftest_plugin("influx")

def test_diskprediction(self):
self._selftest_plugin("diskprediction")

def test_telegraf(self):
self._selftest_plugin("telegraf")

Expand Down
2 changes: 2 additions & 0 deletions src/pybind/mgr/diskprediction/__init__.py
@@ -0,0 +1,2 @@
from __future__ import absolute_import
from .module import Module
38 changes: 38 additions & 0 deletions src/pybind/mgr/diskprediction/agent/__init__.py
@@ -0,0 +1,38 @@
from __future__ import absolute_import

from ..common import timeout, TimeoutError


class BaseAgent(object):
    """Base class for diskprediction agents.

    Subclasses set ``measurement`` and override the ``_collect_data`` and
    ``_run`` hooks; ``run()`` drives both and downgrades a task timeout to
    an error log instead of letting it propagate to the caller.
    """

    # measurement name reported by the concrete agent subclass
    measurement = ''

    def __init__(self, mgr_module, obj_sender, timeout=30):
        # NOTE: the ``timeout`` parameter shadows the imported ``timeout``
        # decorator inside this method body; name kept for interface
        # stability with existing callers.
        self.data = []
        # FIX: the original assigned ``self._client = None`` and then
        # immediately overwrote it -- the dead store is removed.
        self._client = obj_sender
        self._logger = mgr_module.log
        self._module_inst = mgr_module
        self._timeout = timeout

    def run(self):
        """Collect data then push it; a TimeoutError is logged, not raised."""
        try:
            self._collect_data()
            self._run()
        except TimeoutError:
            self._logger.error('{} failed to execute {} task'.format(
                __name__, self.measurement))

    def __nonzero__(self):
        # Truthy when either the module handle or the sender is set.
        # NOTE(review): the original condition (`not a and not b`) is only
        # falsy when BOTH are unset -- possibly "or" was intended in the
        # guard; the original semantics are preserved here.
        return bool(self._module_inst or self._client)

    # FIX: Python 3 uses __bool__ for the truth protocol; __nonzero__ alone
    # is Python-2-only, so alias it for forward compatibility.
    __bool__ = __nonzero__

    @timeout()
    def _run(self):
        # hook: push previously collected data; overridden by subclasses
        pass

    @timeout()
    def _collect_data(self):
        # hook: gather data into ``self.data``; overridden by subclasses
        pass
61 changes: 61 additions & 0 deletions src/pybind/mgr/diskprediction/agent/metrics/__init__.py
@@ -0,0 +1,61 @@
from __future__ import absolute_import

from .. import BaseAgent
from ...common import DP_MGR_STAT_FAILED, DP_MGR_STAT_WARNING, DP_MGR_STAT_OK

AGENT_VERSION = '1.0.0'


class MetricsField(object):
    """Container for a single measurement point: tags, fields, timestamp."""

    def __init__(self):
        self.tags = {}       # indexed metadata (e.g. cluster id)
        self.fields = {}     # measured values
        self.timestamp = None

    def __str__(self):
        rep = {
            'tags': self.tags,
            'fields': self.fields,
            'timestamp': self.timestamp
        }
        return str(rep)


class MetricsAgent(BaseAgent):
    """Agent that forwards collected metric points to the remote server
    and records the outcome in the module's status attribute.
    """

    def log_summary(self, status_info):
        """Log one summary line for a send_info() result; never raises."""
        try:
            if not status_info:
                return
            measurement = status_info['measurement']
            success_count = status_info['success_count']
            failure_count = status_info['failure_count']
            total_count = success_count + failure_count
            display_string = \
                '%s agent stats in total count: %s, success count: %s, failure count: %s.'
            self._logger.info(
                display_string % (measurement, total_count, success_count, failure_count)
            )
        except Exception as e:
            # best effort only -- a malformed status dict must not abort the run
            self._logger.error(str(e))

    def _run(self):
        """Send buffered metric data and update module status.

        Returns an empty dict (kept for interface compatibility).
        """
        collect_data = self.data
        result = {}
        if collect_data:
            status_info = self._client.send_info(collect_data, self.measurement)
            # show summary info
            self.log_summary(status_info)
            # FIX: the original indexed status_info unconditionally below,
            # crashing with TypeError whenever send_info returned a falsy
            # value; log_summary already allows for that case, so guard here.
            if not status_info:
                return result
            # write sub_agent buffer
            success_count = status_info.get('success_count', 0)
            failure_count = status_info.get('failure_count', 0)
            total_count = success_count + failure_count
            if total_count:
                if success_count == 0:
                    self._module_inst.status = \
                        {'status': DP_MGR_STAT_FAILED,
                         'reason': 'failed to send metrics data to the server'}
                elif failure_count == 0:
                    self._module_inst.status = \
                        {'status': DP_MGR_STAT_OK}
                else:
                    self._module_inst.status = \
                        {'status': DP_MGR_STAT_WARNING,
                         'reason': 'failed to send partial metrics data to the server'}
        return result
146 changes: 146 additions & 0 deletions src/pybind/mgr/diskprediction/agent/metrics/ceph_cluster.py
@@ -0,0 +1,146 @@
from __future__ import absolute_import

import socket

from . import MetricsAgent, MetricsField
from ...common.clusterdata import ClusterAPI


class CephCluster(MetricsField):
    """Measurement point describing overall Ceph cluster state."""
    measurement = 'ceph_cluster'

    def __init__(self):
        super(CephCluster, self).__init__()
        self.tags['cluster_id'] = None
        self.fields['agenthost'] = None
        self.tags['agenthost_domain_id'] = None
        self.fields['cluster_health'] = ''
        # Counter fields start unset; the tuple preserves the original
        # insertion order so __str__ output is unchanged.
        for name in ('num_mon', 'num_mon_quorum', 'num_osd', 'num_osd_up',
                     'num_osd_in', 'osd_epoch', 'osd_bytes', 'osd_bytes_used',
                     'osd_bytes_avail', 'num_pool', 'num_pg',
                     'num_pg_active_clean', 'num_pg_active', 'num_pg_peering',
                     'num_object', 'num_object_degraded',
                     'num_object_misplaced', 'num_object_unfound',
                     'num_bytes', 'num_mds_up', 'num_mds_in',
                     'num_mds_failed', 'mds_epoch'):
            self.fields[name] = None


class CephClusterAgent(MetricsAgent):
    """Collects cluster-wide statistics into ``ceph_cluster`` points."""

    measurement = 'ceph_cluster'

    def _collect_data(self):
        """Build one CephCluster point from the mgr APIs into self.data."""
        obj_api = ClusterAPI(self._module_inst)
        cluster_id = obj_api.get_cluster_id()

        c_data = CephCluster()
        cluster_state = obj_api.get_health_status()
        c_data.tags['cluster_id'] = cluster_id
        c_data.fields['cluster_health'] = str(cluster_state)
        c_data.fields['agenthost'] = socket.gethostname()
        c_data.tags['agenthost_domain_id'] = \
            '%s_%s' % (cluster_id, c_data.fields['agenthost'])
        c_data.fields['osd_epoch'] = obj_api.get_osd_epoch()
        c_data.fields['num_mon'] = len(obj_api.get_mons())
        c_data.fields['num_mon_quorum'] = \
            len(obj_api.get_mon_status().get('quorum', []))

        # OSD up/in counters.
        osds = obj_api.get_osds()
        num_osd_up = 0
        num_osd_in = 0
        for osd_data in osds:
            if osd_data.get('up'):
                num_osd_up += 1
            if osd_data.get('in'):
                num_osd_in += 1
        # osds may be None/empty -- guard len() accordingly
        c_data.fields['num_osd'] = len(osds) if osds else 0
        c_data.fields['num_osd_up'] = num_osd_up
        c_data.fields['num_osd_in'] = num_osd_in
        c_data.fields['num_pool'] = len(obj_api.get_osd_pools())

        # Capacity figures from df stats.
        df_stats = obj_api.get_df_stats()
        total_bytes = df_stats.get('total_bytes', 0)
        total_used_bytes = df_stats.get('total_used_bytes', 0)
        total_avail_bytes = df_stats.get('total_avail_bytes', 0)
        c_data.fields['osd_bytes'] = total_bytes
        c_data.fields['osd_bytes_used'] = total_used_bytes
        c_data.fields['osd_bytes_avail'] = total_avail_bytes
        if total_bytes and total_avail_bytes:
            c_data.fields['osd_bytes_used_percentage'] = \
                round(float(total_used_bytes) / float(total_bytes) * 100, 4)
        else:
            c_data.fields['osd_bytes_used_percentage'] = 0.0000

        # PG and object counters accumulated over all PG stats.
        pg_stats = obj_api.get_pg_stats()
        num_bytes = 0
        num_object = 0
        num_object_degraded = 0
        num_object_misplaced = 0
        num_object_unfound = 0
        num_pg_active = 0
        num_pg_active_clean = 0
        num_pg_peering = 0
        for pg_data in pg_stats:
            # FIX: default to empty container/string -- the original called
            # len()/`in` on a bare .get(), raising on a missing key.
            num_pg_active += len(pg_data.get('acting', []))
            state = pg_data.get('state', '')
            if 'active+clean' in state:
                num_pg_active_clean += 1
            if 'peering' in state:
                num_pg_peering += 1

            stat_sum = pg_data.get('stat_sum', {})
            num_object += stat_sum.get('num_objects', 0)
            num_object_degraded += stat_sum.get('num_objects_degraded', 0)
            num_object_misplaced += stat_sum.get('num_objects_misplaced', 0)
            num_object_unfound += stat_sum.get('num_objects_unfound', 0)
            num_bytes += stat_sum.get('num_bytes', 0)

        c_data.fields['num_pg'] = len(pg_stats)
        c_data.fields['num_object'] = num_object
        c_data.fields['num_object_degraded'] = num_object_degraded
        c_data.fields['num_object_misplaced'] = num_object_misplaced
        c_data.fields['num_object_unfound'] = num_object_unfound
        c_data.fields['num_bytes'] = num_bytes
        c_data.fields['num_pg_active'] = num_pg_active
        c_data.fields['num_pg_active_clean'] = num_pg_active_clean
        # BUG FIX: the original assigned num_pg_active_clean here, so the
        # peering count was never reported.
        c_data.fields['num_pg_peering'] = num_pg_peering

        # MDS counters aggregated across all filesystems.
        filesystems = obj_api.get_file_systems()
        num_mds_in = 0
        num_mds_up = 0
        num_mds_failed = 0
        mds_epoch = 0
        for fs_data in filesystems:
            mdsmap = fs_data.get('mdsmap', {})
            num_mds_in += len(mdsmap.get('in', []))
            num_mds_up += len(mdsmap.get('up', {}))
            num_mds_failed += len(mdsmap.get('failed', []))
            mds_epoch += mdsmap.get('epoch', 0)
        c_data.fields['num_mds_in'] = num_mds_in
        c_data.fields['num_mds_up'] = num_mds_up
        c_data.fields['num_mds_failed'] = num_mds_failed
        c_data.fields['mds_epoch'] = mds_epoch
        self.data.append(c_data)