Skip to content

Commit

Permalink
Cgroup metric plugin: attempt to deal with multiline params, and store
Browse files Browse the repository at this point in the history
numeric values in the numeric table. Also fix a display bug.
  • Loading branch information
natefoo committed Apr 26, 2018
1 parent dbb08fe commit 84af648
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 28 deletions.
83 changes: 66 additions & 17 deletions lib/galaxy/jobs/metrics/instrumenters/cgroup.py
@@ -1,5 +1,7 @@
"""The module describes the ``cgroup`` job metrics plugin."""
import logging
import numbers
from collections import namedtuple

from galaxy.util import asbool, nice_size
from ..instrumenters import InstrumentPlugin
Expand All @@ -14,21 +16,20 @@
"memory.memsw.limit_in_bytes": "Memory limit on cgroup (MEM+SWP)",
"memory.soft_limit_in_bytes": "Memory softlimit on cgroup",
"memory.failcnt": "Failed to allocate memory count",
"memory.oom_control": "OOM Control enabled",
"under_oom": "Was OOM Killer active?",
"memory.oom_control.oom_kill_disable": "OOM Control enabled",
"memory.oom_control.under_oom": "Was OOM Killer active?",
"cpuacct.usage": "CPU Time"
}
CONVERSION = {
"memory.memsw.max_usage_in_bytes": nice_size,
"memory.max_usage_in_bytes": nice_size,
"memory.limit_in_bytes": nice_size,
"memory.memsw.limit_in_bytes": nice_size,
"memory.soft_limit_in_bytes": nice_size,
"under_oom": lambda x: "Yes" if x == "1" else "No",
"cpuacct.usage": lambda x: formatting.seconds_to_str(int(x) / 10**9) # convert nanoseconds
"memory.oom_control.oom_kill_disable": lambda x: "No" if x == 1 else "Yes",
"memory.oom_control.under_oom": lambda x: "Yes" if x == 1 else "No",
"cpuacct.usage": lambda x: formatting.seconds_to_str(x / 10**9) # convert nanoseconds
}


Metric = namedtuple("Metric", ("key", "subkey", "value"))


class CgroupPluginFormatter(formatting.JobMetricFormatter):

def format(self, key, value):
Expand All @@ -37,9 +38,11 @@ def format(self, key, value):
return title, CONVERSION[key](value)
elif key.endswith("_bytes"):
try:
return title, nice_size(key)
return title, nice_size(value)
except ValueError:
pass
elif isinstance(value, numbers.Number) and value == int(value):
value = int(value)
return title, value


Expand Down Expand Up @@ -73,17 +76,63 @@ def __cgroup_metrics_file(self, job_directory):

def __read_metrics(self, path):
metrics = {}
prev_metric = None
with open(path, "r") as infile:
for line in infile:
line = line.strip()
try:
key, value = line.split(": ")
if key in TITLES or self.verbose:
metrics[key] = value
except ValueError:
if line.startswith("under_oom"):
metrics["under_oom"] = line.split(" ")[1]
metric, prev_metric = self.__read_key_value(line, prev_metric)
except Exception:
log.exception("Caught exception attempting to read metric from cgroup line: %s", line)
metric = None
if not metric:
continue
self.__add_metric(metrics, prev_metric)
prev_metric = metric
self.__add_metric(metrics, prev_metric)
return metrics

def __add_metric(self, metrics, metric):
if metric and (metric.subkey in TITLES or self.verbose):
metrics[metric.subkey] = metric.value

def __read_key_value(self, line, prev_metric):
if not line.startswith('\t'):
# line is a single-line param or the first line of a multi-line param
try:
subkey, value = line.strip().split(": ", 1)
key = subkey
except ValueError:
# or not a param line at all, ignore
return None, prev_metric
else:
# line is a subsequent line of a multi-line param
subkey, value = line.strip().split(" ", 1)
key = prev_metric.key
subkey = ".".join((key, subkey))
prev_metric = self.__fix_prev_metric(prev_metric)
value = self.__type_value(value)
return (Metric(key, subkey, value), prev_metric)

def __fix_prev_metric(self, metric):
# we can't determine whether a param is single-line or multi-line until we read the second line, after which, we
# must go back and fix the first param to be subkeyed
if metric.key == metric.subkey:
try:
subkey, value = metric.value.split(" ", 1)
subkey = ".".join((metric.key, subkey))
metric = Metric(metric.key, subkey, self.__type_value(value))
except ValueError:
pass
return metric

def __type_value(self, value):
try:
try:
return int(value)
except ValueError:
return float(value)
except ValueError:
return value


__all__ = ('CgroupPlugin', )
17 changes: 14 additions & 3 deletions lib/galaxy/model/__init__.py
Expand Up @@ -65,6 +65,11 @@
# this be unlimited - filter in Python if over this limit.
MAX_IN_FILTER_LENGTH = 100

# The column sizes for job metrics
JOB_METRIC_MAX_LENGTH = 1023
JOB_METRIC_PRECISION = 22
JOB_METRIC_SCALE = 7


class NoConverterException(Exception):
def __init__(self, value):
Expand Down Expand Up @@ -149,22 +154,28 @@ def seconds_since_created(self):

class JobLike(object):

MAX_NUMERIC = 10**(JOB_METRIC_PRECISION - JOB_METRIC_SCALE) - 1

def _init_metrics(self):
self.text_metrics = []
self.numeric_metrics = []

def add_metric(self, plugin, metric_name, metric_value):
plugin = unicodify(plugin, 'utf-8')
metric_name = unicodify(metric_name, 'utf-8')
if isinstance(metric_value, numbers.Number):
number = isinstance(metric_value, numbers.Number)
if number and int(metric_value) <= JobLike.MAX_NUMERIC:
metric = self._numeric_metric(plugin, metric_name, metric_value)
self.numeric_metrics.append(metric)
elif number:
log.warning("Cannot store metric due to database column overflow (max: %s): %s: %s",
JobLike.MAX_NUMERIC, metric_name, metric_value)
else:
metric_value = unicodify(metric_value, 'utf-8')
if len(metric_value) > 1022:
if len(metric_value) > (JOB_METRIC_MAX_LENGTH - 1):
# Truncate these values - not needed with sqlite
# but other backends must need it.
metric_value = metric_value[:1022]
metric_value = metric_value[:(JOB_METRIC_MAX_LENGTH - 1)]
metric = self._text_metric(plugin, metric_name, metric_value)
self.text_metrics.append(metric)

Expand Down
11 changes: 4 additions & 7 deletions lib/galaxy/model/mapping.py
Expand Up @@ -665,40 +665,37 @@
Column("history_id", Integer, ForeignKey("history.id"), index=True),
Column("archive_dir", TEXT))


JOB_METRIC_MAX_LENGTH = 1023

model.JobMetricText.table = Table(
"job_metric_text", metadata,
Column("id", Integer, primary_key=True),
Column("job_id", Integer, ForeignKey("job.id"), index=True),
Column("plugin", Unicode(255)),
Column("metric_name", Unicode(255)),
Column("metric_value", Unicode(JOB_METRIC_MAX_LENGTH)))
Column("metric_value", Unicode(model.JOB_METRIC_MAX_LENGTH)))

model.TaskMetricText.table = Table(
"task_metric_text", metadata,
Column("id", Integer, primary_key=True),
Column("task_id", Integer, ForeignKey("task.id"), index=True),
Column("plugin", Unicode(255)),
Column("metric_name", Unicode(255)),
Column("metric_value", Unicode(JOB_METRIC_MAX_LENGTH)))
Column("metric_value", Unicode(model.JOB_METRIC_MAX_LENGTH)))

model.JobMetricNumeric.table = Table(
"job_metric_numeric", metadata,
Column("id", Integer, primary_key=True),
Column("job_id", Integer, ForeignKey("job.id"), index=True),
Column("plugin", Unicode(255)),
Column("metric_name", Unicode(255)),
Column("metric_value", Numeric(22, 7)))
Column("metric_value", Numeric(model.JOB_METRIC_PRECISION, model.JOB_METRIC_SCALE)))

model.TaskMetricNumeric.table = Table(
"task_metric_numeric", metadata,
Column("id", Integer, primary_key=True),
Column("task_id", Integer, ForeignKey("task.id"), index=True),
Column("plugin", Unicode(255)),
Column("metric_name", Unicode(255)),
Column("metric_value", Numeric(22, 7)))
Column("metric_value", Numeric(model.JOB_METRIC_PRECISION, model.JOB_METRIC_SCALE)))


model.GenomeIndexToolData.table = Table(
Expand Down
2 changes: 1 addition & 1 deletion lib/galaxy/util/__init__.py
Expand Up @@ -1270,7 +1270,7 @@ def nice_size(size):
>>> nice_size(100000000)
'95.4 MB'
"""
words = ['bytes', 'KB', 'MB', 'GB', 'TB']
words = ['bytes', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB']
prefix = ''
try:
size = float(size)
Expand Down

0 comments on commit 84af648

Please sign in to comment.