In [1]:
import pandas as pd
import os
from anomaly.tools import tsdb, slurm
from anomaly import utils

In [2]:
from ipywidgets import widgets
from IPython.display import display, clear_output, Image

In [3]:
from __future__ import print_function
from ipywidgets import interact, interactive, fixed, interact_manual
import ipywidgets as widgets

In [4]:
from bokeh.layouts import column, row, widgetbox, gridplot
from bokeh.palettes import inferno, magma, viridis
from bokeh.plotting import figure, curdoc
from bokeh.models import ColumnDataSource, Span, Band, HoverTool, Button, Select, Legend
from bokeh.core.properties import field, value
from bokeh.plotting import figure, output_file, show
from bokeh.io import output_notebook, push_notebook
output_notebook()

## DEFINICIÓN DOS MÉTODOS

In [5]:
def chunks(l, n):
    """Lazily split ``l`` into consecutive slices of at most ``n`` items.

    Used to break the node list into rows so the per-node time-series
    legends stay readable. The final slice is shorter when ``len(l)`` is
    not a multiple of ``n``.
    """
    for start in range(0, len(l), n):
        stop = start + n
        yield l[start:stop]

In [6]:
def plot(metric, ts, legend_row_size=13):
    """Build a Bokeh figure with one line per node plus min-max and IQR bands.

    Parameters
    ----------
    metric : str
        Text used as the figure title.
    ts : pandas.DataFrame
        One column per node; the index supplies the x axis, which is
        rendered as a datetime axis. The frame is NOT modified: summary
        statistics are added to an internal copy.
    legend_row_size : int, optional
        Maximum number of nodes listed in each horizontal legend row placed
        below the plot (default 13).

    Returns
    -------
    The figure created with ``bokeh.plotting.figure``.
    """
    nodes = sorted(ts.columns)

    # Every statistic is computed from the node columns only. Computing them
    # on a frame that already holds 'min', 'max', ... would fold those
    # summary columns into the later quantiles/median/mean and skew them.
    node_values = ts[nodes]
    summary = ts.copy()  # keep the caller's frame untouched so plot() is re-runnable
    summary['min'] = node_values.min(axis=1)
    summary['max'] = node_values.max(axis=1)
    summary['p25'] = node_values.quantile(q=0.25, axis=1)
    summary['p75'] = node_values.quantile(q=0.75, axis=1)
    summary['median'] = node_values.median(axis=1)
    summary['mean'] = node_values.mean(axis=1)
    ds_ts = ColumnDataSource(data=summary)

    p = figure(title=metric, x_axis_type="datetime", height=800, width=900)
    p.xaxis.axis_label = 'Date'
    p.yaxis.axis_label = 'Value'

    palette = viridis(len(nodes))

    # One semi-transparent line per node, each with its own hover tool so the
    # tooltip can name the node the cursor is on.
    renderers = {}
    for position, node in enumerate(nodes):
        line = p.line(x='index', y=node, alpha=0.3, line_color=palette[position], line_width=2, source=ds_ts)
        renderers[node] = line
        hover = HoverTool(tooltips=[('origin', node), ('date', '@index{%F %T}'), ('value', '$y')], renderers=[line])
        # NOTE(review): the expected formatter key differs between Bokeh
        # releases ("index" vs "@index") — confirm against the installed version.
        hover.formatters = {"index": "datetime"}
        p.add_tools(hover)

    # Min-Max band (drawn beneath the lines, default fill colour)
    band1 = Band(base='index', lower='min', upper='max', source=ds_ts, level='underlay',
                fill_alpha=1.0)
    p.add_layout(band1)

    # IQR band (25th to 75th percentile across nodes)
    band2 = Band(base='index', lower='p25', upper='p75', source=ds_ts, level='underlay',
                fill_alpha=1.0, line_width=1, line_color='black', fill_color='palegreen')
    p.add_layout(band2)

    # Split the legend into rows so a long node list does not overflow the
    # plot width; clicking an entry hides the corresponding line.
    for chunk in chunks(nodes, legend_row_size):
        legend = Legend(items=[(name, [renderers[name]]) for name in chunk],
                        orientation="horizontal", click_policy="hide")
        p.add_layout(legend, 'below')

    return p

In [7]:
def generate_plots(job_id, partition, metric, frequency):
    """Fetch one metric for a job, resample it and display the figure.

    The job is looked up with ``slurm.job`` (the first result is used), its
    series for ``metric`` is retrieved through ``tsdb.job_series`` and turned
    into a DataFrame by ``utils.to_dataframe``. The frame is averaged over
    ``frequency`` windows and handed to ``plot`` with ``metric.alias`` as the
    title.
    """
    matching_job = slurm.job(job_id, partition)[0]
    frame = utils.to_dataframe(tsdb.job_series(matching_job, metric))
    resampled = frame.resample(frequency).mean()
    show(plot(metric.alias, resampled))
    push_notebook()

## XERACIÓN DA SERIE TEMPORAL: ENTENDEMENTO DE COMO PINTALAS

    - As series temporais evalúanse por traballo, polo que deberemos de buscalo mediante un identificador (JOB_ID). 
    - Ademais, o traballo elixido para avaliar pode estar duplicado na base de datos, polo que deberemos indicar tamén a partición. Se o traballo non está duplicado, ponse PARTITION = None
    - É necesario establecer a frecuencia da serie temporal.
    - As métricas extráense mediante a API OpenTSDB. Hai un montón de métricas xeradas nos sensores que poden ser analizadas (Ficheiro metrics.xls), pero para este proxecto realizaremos a análise daquelas métricas máis representativas á hora de detectar unha anomalía

In [8]:
# Job under study and the partition it ran on.
JOB_ID = '1029475'
PARTITION = 'thinnodes'

# Resampling windows available for the series; RESAMPLE_FREQ[0] is the one
# used by the single-metric example further down.
RESAMPLE_FREQ = ['15min', '5min', '10min', '30min', '1h']

# Metrics explored in this notebook. Each entry pairs a TSDB metric name with
# the short alias used as the plot title.
METRICS = [
    tsdb.Metric('numa.node0.vmpage_action.numa_miss', alias='numa0_miss'),
    # NOTE(review): this entry previously queried 'numa.node0...' under the
    # numa1 alias, duplicating the line above — assumed copy/paste slip;
    # confirm the node1 metric name exists in the TSDB.
    tsdb.Metric('numa.node1.vmpage_action.numa_miss', alias='numa1_miss'),
    # Per-process metrics, currently disabled:
    #tsdb.Metric('proc.cpu.user', alias='proc_cpu_user'),
    #tsdb.Metric('proc.cpu.kernel', alias='proc_cpu_kernel'),
    #tsdb.Metric('proc.disk.reads.mb', alias='proc_disk_reads'),
    #tsdb.Metric('proc.disk.writes.mb', alias='proc_disk_writes'),
    #tsdb.Metric('proc.mem.swap', alias='proc_mem_swap'),
    #tsdb.Metric('proc.mem.resident', alias='proc_mem_rss'),
    #tsdb.Metric('proc.mem.virtual', alias='proc_mem_virtual'),
    #tsdb.Metric('processes.running.ps_state', alias='processes_running'),
    #tsdb.Metric('processes.blocked.ps_state', alias='processes_blocked'),
    #tsdb.Metric('processes.sleeping.ps_state', alias='processes_sleeping'),
    #tsdb.Metric('processes.stopped.ps_state', alias='processes_stopped'),
    #tsdb.Metric('processes.zombies.ps_state', alias='processes_zombies'),
    tsdb.Metric('memcpy.verylittle', filters={'socket': tsdb.Literal(0)}, alias='memcpy-socket0'),
    tsdb.Metric('memcpy.verylittle', filters={'socket': tsdb.Literal(1)}, alias='memcpy-socket1'),
    tsdb.Metric('load.load.shortterm', alias='load'),
    tsdb.Metric('memory.cached.memory', alias='mem_cached'),
    tsdb.Metric('memory.used.memory', alias='mem_used'),
    tsdb.Metric('aggregation.cpu-average.percent.wait', alias='cpu_wait'),
    tsdb.Metric('aggregation.cpu-average.percent.user', alias='cpu_user'),
    tsdb.Metric('aggregation.cpu-average.percent.system', alias='cpu_system'),
    tsdb.Metric('ipmi.PW_consumption', alias='power'),
    tsdb.Metric('ipmi.CPU1_Temp', alias='cpu1_temp'),
    tsdb.Metric('ipmi.CPU2_Temp', alias='cpu2_temp'),
]

Creamos a instancia que representa o traballo JOB_ID na PARTITION. Como vemos, este obxecto almacena información sobre o período de tempo no que se avalía a serie temporal, e os nodos sobre os que se executa o traballo.

In [9]:
# Look up the job by id and partition. The result of slurm.job is indexed
# with [0], so only its first entry is kept.
JOB = slurm.job(JOB_ID, PARTITION)[0]
# Bare expression: displays the job's repr as the cell output.
JOB

Job(id=1029475, partition=u'thinnodes', start=1519509615, end=1519768835, nodes=[u'c6923', u'c6941', u'c7308', u'c7312', u'c7313', u'c7314', u'c7315', u'c7326', u'c7341', u'c7342'])

Xeramos o dataframe correspondente a serie temporal relacionada co traballo JOB representada pola métrica load.load.shortterm. Posteriormente o que facemos é o resample dos datos en base a frecuencia definida (15 minutos). 

Cada unha das columnas representa os nodos nos que o JOB é executado

In [11]:
# Worked example with a single metric: the short-term load of every node of JOB.
metric = tsdb.Metric('load.load.shortterm', alias='load')
series = tsdb.job_series(JOB, metric)

# Convert to a DataFrame and average the samples over windows of the first
# configured resampling frequency.
ts = (
    utils.to_dataframe(series)
    .resample(RESAMPLE_FREQ[0])
    .mean()
)
ts.head()

Unnamed: 0,c7313,c7308,c6923,c7315,c7314,c7326,c7312,c7341,c7342,c6941
2018-02-24 22:00:00,21.906,23.225334,21.482,21.496667,21.527333,22.94,21.912667,21.494667,21.488667,21.543333
2018-02-24 22:15:00,24.011333,24.002667,24.024,24.024,24.018,24.026,24.094,24.029333,24.024667,24.026667
2018-02-24 22:30:00,24.014,24.002667,24.006667,24.009333,24.006,24.037333,24.006,24.048,24.013334,24.022667
2018-02-24 22:45:00,24.006,24.003333,24.013333,24.008667,24.015333,24.055999,24.011333,24.061333,24.006667,24.012667
2018-02-24 23:00:00,24.014,24.0,24.02,24.006,24.010667,24.018,24.017333,24.011333,24.004667,24.062


Pintamos a gráfica das series temporais de todos os nodos. Na parte inferior da gráfica podemos desactivar ou activar a liña correspondente a cada nodo. Se desactivamos a de todos os nodos, veremos dúas franxas, unha amarela e outra verde. A verde representa o rango intercuartil, mentres que a amarela o máximo e o mínimo.

In [12]:
# NOTE(review): `tools` is not passed to plot() or gridplot() in this cell;
# kept in case a later cell reads it.
tools = ['save', 'lasso_select', "pan", "box_zoom", "box_select", "reset"]

# Hand plot() a copy so that plotting can never alter the frame displayed in
# the previous cell; this keeps the cell safe to run more than once.
grid = gridplot([plot(metric.alias, ts.copy())], ncols=1)
show(grid)

## EXPLORACIÓN DAS MÉTRICAS

A continuación avaliaremos varias métricas (Predeterminadas para o proxecto), co obxetivo de visualizalas:

In [13]:
# Frequencies offered in the interactive explorer. The hourly option is spelled
# '1h', the pandas offset alias already used in the first RESAMPLE_FREQ list;
# '1hour' is not a pandas offset alias and would fail inside resample().
RESAMPLE_FREQ = ['15min', '30min', '1h']

# Each list argument becomes a dropdown; generate_plots runs again whenever a
# widget value changes. The trailing ';' hides the function repr in the output.
interact(generate_plots, job_id=JOB_ID, partition=PARTITION, metric=METRICS, frequency=RESAMPLE_FREQ);

aW50ZXJhY3RpdmUoY2hpbGRyZW49KFRleHQodmFsdWU9dScxMDI5NDc1JywgZGVzY3JpcHRpb249dSdqb2JfaWQnKSwgVGV4dCh2YWx1ZT11J3RoaW5ub2RlcycsIGRlc2NyaXB0aW9uPXUncGHigKY=


<function __main__.generate_plots>