<h1>This notebook retrieves job information for the top 10 users from the jobs_archive indices in Elasticsearch (ES) and sends an alarm if their usage exceeds certain thresholds</h1>

In [88]:
import numpy as np
import re
import subprocess
import json
from elasticsearch import Elasticsearch, exceptions as es_exceptions
from pandas.io.json import json_normalize
from IPython.display import display
from pandas import DataFrame
import pandas as pd
from datetime import datetime, timedelta
import datetime

<h2>Retrieve all job indexes from ES</h2>

In [89]:
#define function to filter on time
def time_filter(indices, last_days=1, pattern=''):
    """Select the job-archive indices to query.

    Parameters
    ----------
    indices : iterable of str
        Index names of the form ``jobs_archive_YYYY-MM-DD``, optionally
        carrying a ``_reindexed`` suffix and surrounding whitespace.
    last_days : int
        Keep indices whose date is fewer than ``last_days`` days before
        today. ``0`` short-circuits to the wildcard ``["jobs_archive_*"]``.
    pattern : str
        When non-empty (and ``last_days`` is not 0), keep every index whose
        name contains this substring; ``last_days`` is then not used.

    Returns
    -------
    list of str
        The selected index names, trailing whitespace removed, in input order.
    """
    if last_days == 0:
        return ["jobs_archive_*"]
    if pattern:
        return [name.rstrip() for name in indices if pattern in name]

    today = datetime.date.today()
    datefmt = '%Y-%m-%d'
    filtered = []
    for name in indices:
        day = name.replace('jobs_archive_', '').replace('_reindexed', '').strip()
        try:
            day = datetime.datetime.strptime(day, datefmt).date()
        except ValueError:
            # Not a dated index (e.g. jobs_archive_2016_status): it cannot
            # fall inside a day window, so skip it instead of crashing.
            continue
        if (today - day).days < last_days:
            filtered.append(name.rstrip())
    return filtered

In [90]:
es = Elasticsearch(
    hosts=[{'host': 'atlas-kibana.mwt2.org', 'port': 9200, 'scheme': 'https'}],
    timeout=60,
)

# Ask the cat API for the names of all jobs_archive_* indices; the reply is
# split on newlines, blank entries are dropped and the names are sorted.
index_listing = es.cat.indices(index="jobs_archive_*", h="index", request_timeout=600)
indices = sorted(name for name in index_listing.split('\n') if name != '')

# This index name does not end in a YYYY-MM-DD date, so the date parsing in
# time_filter cannot handle it; drop it when present.
try:
    indices.remove('jobs_archive_2016_status')
except ValueError:
    pass

<h2>Retrieve job archives of interest from ES</h2>

In [91]:
# Pick the indices to query: by default those of the last 2 days.
# Alternatives: a larger window (e.g. last_days=7) or a substring of the
# index name (e.g. pattern='2016-02', without any wildcard).
NDAYS = 2      # other values tried: 150, ''
PATTERN = ''   # e.g. '2016-03'

# Elasticsearch receives the selection as one comma-separated string.
ind = ','.join(time_filter(indices, last_days=NDAYS, pattern=PATTERN))
print(ind)

jobs_archive_2018-09-20,jobs_archive_2018-09-21


## Alerts and Alarms

In [92]:
# Helper modules that are imported from outside this notebook.
from subscribers import subscribers
import alerts

# S is used by the alarm cells below to look up who subscribed to each alarm
# (via S.get_immediate_subscribers).
S = subscribers()
# A is used by the alarm cells below to send mails and register alerts
# (via A.sendMail and A.addAlert).
A = alerts.alerts()

<h2>First Alarm</h2> 
<h3>Get the top 10 users of the last 24 hours ranked by walltime*core, and keep those whose summed walltime exceeds 15 years</h3>
<h3>Convert the walltime into the number of cores used per day, assuming all jobs are single core</h3>

In [64]:
# First alarm input: for user jobs modified in the last 24 hours (excluding
# produsername "gangarbt", pmerge jobs and jobs that carry a workinggroup),
# rank users by summed wall_time * actualcorecount and keep the top 10.
s = {
    "size": 0,  # aggregations only, no individual job documents
    'query': {
        'bool': {
            'must': [
                {"term": {"prodsourcelabel": "user"}},
                {'range': {'modificationtime': {"gte": "now-1d", "lt": "now"}}},
                {'bool': {
                    'must_not': [
                        {"term": {"produsername": "gangarbt"}},
                        {"term": {"processingtype": "pmerge"}},
                        # only users without workinggroup privileges
                        {'exists': {"field": "workinggroup"}},
                    ]
                }},
            ],
        }
    },
    "aggs": {
        "users": {
            "terms": {
                "field": "produsername",
                "order": {"walltime_core_sum": "desc"},
                "size": 10
            },
            "aggs": {
                "walltime_core_sum": {
                    "sum": {
                        # scripted value: wall_time weighted by the core count
                        # when one is recorded, plain wall_time otherwise
                        "script": {
                            "inline": "def core=doc['actualcorecount'].value; if (core!=null) {return doc['wall_time'].value * core} else {return doc['wall_time'].value}"
                        }
                    }
                },
            }
        }
    }
}

res = es.search(index=ind, body=s, request_timeout=12000)
agg = res['aggregations']['users']['buckets']

# POST the raw buckets to the usersJIRA.py CGI endpoint; the reply is read
# but not used in this cell.
jsondata = json.dumps(agg)
process = subprocess.Popen(
    ["curl", "-D-", "-H", "Content-Type:application/json", "-X", "POST",
     "--data", jsondata,
     "http://test-jgarcian.web.cern.ch/test-jgarcian/cgi-bin/usersJIRA.py"],
    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = process.communicate()

# create df
df_w = json_normalize(agg)
if df_w.empty:
    # No user jobs matched: keep the expected columns so that the steps
    # below yield an empty table instead of raising KeyError.
    df_w = DataFrame(columns=['doc_count', 'key', 'walltime_core_sum.value'])

# seconds -> whole days -> years
df_w['walltime_core_sum.value'] = df_w['walltime_core_sum.value'].apply(
    lambda x: timedelta(seconds=int(x)).days / 365.2)
# walltime[years] * 365 = core-days used within the one-day window
df_w['ncores'] = df_w['walltime_core_sum.value'].apply(lambda x: x * 365.)

LIMIT_WALLTIME = 15  # 5 for testing
df_w = df_w[df_w["walltime_core_sum.value"] > LIMIT_WALLTIME]

display(df_w)
# Relabel by column name, not by position: the column order produced by
# json_normalize is not guaranteed, and a positional relabel could swap the
# user and job-count headers.
df_w = df_w[['doc_count', 'key', 'walltime_core_sum.value', 'ncores']].rename(columns={
    'doc_count': 'jobs',
    'key': 'user',
    'walltime_core_sum.value': 'walltime used [years]',
    'ncores': 'number of cores',
})
print(df_w.to_string())


Unnamed: 0,doc_count,key,walltime_core_sum.value,ncores


Empty DataFrame
Columns: [jobs, user, walltime used [years], number of cores]
Index: []


In [72]:
# Wall-time alarm: compose one mail per subscriber when at least one user
# exceeded the limit. Sending and alert registration are commented out.
if len(df_w) == 0:
    print('No Alarm')
else:
    test_name = 'Top Analysis users [Large wall time]'
    for u in S.get_immediate_subscribers(test_name):
        body = ''.join([
            'Dear ', u.name, ',\n\n',
            'the following users used substantial wall time (more than 15 years/last 24 hours, corresponding to 5475 cores/day):\n\n',
            df_w.to_string(), '\n',
            '\n To get more information about this alert message and its interpretation, please visit:\n',
            'http://atlas-kibana.mwt2.org:5601/app/kibana#/dashboard/FL-Analysis-User',
            '\nhttps://its.cern.ch/jira/browse/ADCDPA-1',
            '\n To change your alerts preferences please use the following link:\n', u.link,
            '\n\nBest regards,\nATLAS Alarm & Alert Service',
        ])
        #A.sendMail(test_name, u.email, body)
        #print(body)
    #A.addAlert(test_name, u.name, str(df_w.shape[0])+' users with huge walltime.')

Dear Ilija Vukotic,

the following users used substantial wall time (more than 15 years/last 24 hours, corresponding to 5475 cores/day):

    avg attempt  cpusonsumption [years]  failed jobs  jeditaskid                      User
1      1.725502                0.079409        43268  [15367137]             Ondrej Hladik
3     15.271063                1.013143        12344  [15220398]    Jackson Carl Burzynski
4      1.505893                1.547097        10860  [15376188]         Andrea Helen Knue
7      1.992601                6.078861         6893  [15358086]           Andrey Minaenko
8      1.615543                2.976451         6331  [15369330]        Paul Philipp Gadow
9      5.213180                0.073932         6267  [15368039]                 Chen Wang
12     1.910445                0.098576         4701  [15371710]  Benjamin Henry Hooberman
17     1.833138                1.278751         2565  [15367797]            Vincent Kitali
18     6.262295                0.019168    

<h2>Second Alarm</h2> 
<h3>Get the top 10 users of the last 24 hours ranked by inputfilebytes, and keep those whose summed input size exceeds 500 TB</h3>

In [66]:
# Second alarm input: for user jobs modified in the last 24 hours (excluding
# produsername "gangarbt", pmerge, closed/cancelled jobs and jobs that carry
# a workinggroup), rank users by summed inputfilebytes and keep the top 10.
s = {
    "size": 0,  # aggregations only, no individual job documents
    'query': {
        'bool': {
            'must': [
                {"term": {"prodsourcelabel": "user"}},
                {'range': {'modificationtime': {"gte": "now-1d", "lt": "now"}}},
                {'bool': {
                    'must_not': [
                        {"term": {"produsername": "gangarbt"}},
                        {"term": {"processingtype": "pmerge"}},
                        {"term": {"jobstatus": "closed"}},
                        {"term": {"jobstatus": "cancelled"}},
                        {'exists': {"field": "workinggroup"}},
                    ]
                }},
            ],
        }
    },
    "aggs": {
        "users": {
            "terms": {
                "field": "produsername",
                "order": {"inputsize_sum": "desc"},
                "size": 10
            },
            "aggs": {
                "inputsize_sum": {
                    "sum": {"field": "inputfilebytes"}
                },
            }
        }
    }
}

res = es.search(index=ind, body=s, request_timeout=12000)
agg = res['aggregations']['users']['buckets']

# POST the raw buckets to the usersJIRA.py CGI endpoint and show its reply.
jsondata = json.dumps(agg)
process = subprocess.Popen(
    ["curl", "-D-", "-H", "Content-Type:application/json", "-X", "POST",
     "--data", jsondata,
     "http://test-jgarcian.web.cern.ch/test-jgarcian/cgi-bin/usersJIRA.py"],
    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = process.communicate()
print(stdout)

# create df
df_i = json_normalize(agg)
if df_i.empty:
    # No user jobs matched: keep the expected columns so that the steps
    # below yield an empty table instead of raising KeyError.
    df_i = DataFrame(columns=['doc_count', 'inputsize_sum.value', 'key'])

# bytes -> TB. The previous factor (0.89e-12) matched neither TB (1e-12) nor
# TiB (about 0.909e-12) and under-reported sizes; the column is labelled [TB].
BYTES_PER_TB = 1e12
df_i['inputsize_sum.value'] = df_i['inputsize_sum.value'].apply(lambda x: x / BYTES_PER_TB)

LIMIT_INPUTSIZE = 500  # 5 for testing
df_i = df_i[df_i["inputsize_sum.value"] > LIMIT_INPUTSIZE]

# Relabel by column name, not by position: the column order produced by
# json_normalize is not guaranteed, and a positional relabel could swap the
# user and job-count headers.
df_i = df_i[['doc_count', 'inputsize_sum.value', 'key']].rename(columns={
    'doc_count': 'jobs',
    'inputsize_sum.value': 'input size [TB]',
    'key': 'user',
})
print(df_i.to_string())

b'HTTP/1.1 200 OK\r\nDate: Fri, 14 Sep 2018 10:38:17 GMT\r\nServer: Apache\r\nVary: Accept-Encoding\r\nConnection: close\r\nTransfer-Encoding: chunked\r\nContent-Type: text/html\r\n\r\n\n'
Empty DataFrame
Columns: [jobs, input size [TB], user]
Index: []


In [67]:
# Input-size alarm: compose one mail per subscriber when at least one user
# exceeded the limit. Sending and alert registration are commented out.
if len(df_i) == 0:
    print('No Alarm')
else:
    test_name = 'Top Analysis users [Large input data size]'
    for u in S.get_immediate_subscribers(test_name):
        body = ''.join([
            'Dear ', u.name, ',\n\n',
            'the following users processed rather substantial input data (>500 TB/last 24 hours):\n\n',
            df_i.to_string(), '\n',
            '\n To get more information about this alert message and its interpretation, please visit:\n',
            'http://atlas-kibana.mwt2.org:5601/app/kibana#/dashboard/FL-Analysis-User',
            '\nhttps://its.cern.ch/jira/browse/ADCDPA-1',
            '\n To change your alerts preferences please use the following link:\n', u.link,
            '\n\nBest regards,\nATLAS Alarm & Alert Service',
        ])
        #A.sendMail(test_name, u.email, body)
        #print(body)
        #A.addAlert(test_name, u.name, str(df_w.shape[0])+' users with huge walltime.')

No Alarm


<h2>Third Alarm</h2> 
<h3>Notify if user job efficiency drops below 70%</h3>

In [68]:
# Third alarm input: for user jobs modified in the last 24 hours (excluding
# produsername "gangarbt", pmerge and cancelled/closed jobs), sum
# actualcorecount per jobstatus.
# NOTE(review): the ratio below is built from summed core counts, while the
# alarm mail describes the efficiency as a walltime ratio - confirm which
# one is intended.
s = {
    "size": 0,  # aggregations only, no individual job documents
    'query': {
        'bool': {
            'must': [
                {"term": {"prodsourcelabel": "user"}},
                {'range': {'modificationtime': {"gte": "now-1d", "lt": "now"}}},
                {'bool': {
                    'must_not': [
                        {"term": {"produsername": "gangarbt"}},
                        {"term": {"processingtype": "pmerge"}},
                        {"term": {"jobstatus": "cancelled"}},
                        {"term": {"jobstatus": "closed"}},
                    ]
                }},
            ],
        }
    },
    "aggs": {
        "status": {
            "terms": {
                "field": "jobstatus",
                "order": {"corecount_sum": "desc"},
                "size": 5
            },
            "aggs": {
                "corecount_sum": {
                    "sum": {"field": "actualcorecount"}
                },
            }
        }
    }
}

res = es.search(index=ind, body=s, request_timeout=12000)
agg = res['aggregations']['status']['buckets']

# create df (kept for inspection; the numbers below come straight from the buckets)
df_e = json_normalize(agg)

# A status with no jobs has no bucket at all. Default it to 0 so the
# "no finished jobs" alarm below can fire; indexing the missing row used to
# raise IndexError first.
core_by_status = {bucket['key']: bucket['corecount_sum']['value'] for bucket in agg}
successful = core_by_status.get('finished', 0)
total = core_by_status.get('failed', 0) + successful

LIMIT_EFFICIENCY = 0.7
Alarm = ''
if total == 0:
    Alarm = "Alarm, no finished user jobs in last 24 hours"
else:
    efficiency = successful / total
    print(str(efficiency))
    if efficiency < LIMIT_EFFICIENCY:
        Alarm = "Alarm, user job efficiency is " + str(round(efficiency, 1))

if len(Alarm) > 0:
    print(Alarm)

0.8455087889050154


In [69]:
# Efficiency alarm: when an alarm text was produced, mail it to every
# subscriber and register the alert for that subscriber.
if Alarm == '':
    print('No Alarm') 
else:
    test_name = 'Top Analysis users [Low efficiency]'
    for u in S.get_immediate_subscribers(test_name):
        body = ''.join([
            'Dear ', u.name, ',\n\n',
            'the following alarm was raised regarding the global user job efficiency in the last 24 hours:\n\n',
            Alarm, '\n',
            '\n The efficiency is defined as walltime of successful jobs divided by the walltime of successful plus failed jobs',
            '\n The efficiency is calculated on all user jobs in the last 24 hours.',
            '\n To get more information about this alert message and its interpretation, please visit:\n',
            'http://atlas-kibana.mwt2.org:5601/app/kibana#/dashboard/FL-Analysis',
            '\nhttp://atlas-kibana.mwt2.org:5601/app/kibana#/dashboard/FL-Analysis-User',
            '\n To change your alerts preferences please use the following link:\n', u.link,
            '\n\nBest regards,\nATLAS Alarm & Alert Service',
        ])
        A.sendMail(test_name, u.email, body)
        #print(body)
        A.addAlert(test_name, u.name, Alarm)

No Alarm


<h2>Fourth alarm</h2> 
<h3>Users with large number of failing jobs (>1000) and retries (>1.5) </h3>

In [93]:
# Fourth alarm input: for user jobs modified in the last 24 hours (excluding
# produsername "gangarbt" and pmerge jobs), take the 20 users with the most
# jobs and report, per user: job count, summed cpuconsumptiontime, mean
# attemptnr and the highest jeditaskid.
# NOTE(review): the query has no jobstatus filter, so doc_count counts every
# matching job although the column is labelled 'failed jobs' - confirm.
# NOTE(review): this cell rebinds df_w, which the first-alarm cells also
# use; running cells out of order mixes the two tables.
s = {
    "size": 0,
    "_source": {
        "excludes": []
    },
    "aggs": {
        "users": {
            "terms": {
                "field": "produsername",
                "size": 20,
                "order": {"_count": "desc"}
            },
            "aggs": {
                "cpuconsumptiontime": {
                    "sum": {"field": "cpuconsumptiontime"}
                },
                "attemptnr": {
                    "avg": {"field": "attemptnr"}
                },
                "jeditaskid": {
                    # single top hit sorted by jeditaskid descending
                    "top_hits": {
                        "docvalue_fields": ["jeditaskid"],
                        "_source": "jeditaskid",
                        "size": 1,
                        "sort": [
                            {"jeditaskid": {"order": "desc"}}
                        ]
                    }
                }
            }
        }
    },
    "query": {
        "bool": {
            "must": [
                {
                    "query_string": {
                        "query": "(prodsourcelabel:user) AND (NOT produsername:\"gangarbt\") AND (NOT processingtype:pmerge)",
                        "analyze_wildcard": "true",
                        "lowercase_expanded_terms": "false"
                    }
                },
                {
                    "range": {
                        'modificationtime': {"gte": "now-1d", "lt": "now"}
                    }
                }
            ],
            "filter": [],
            "should": [],
            "must_not": []
        }
    }
}

res = es.search(index=ind, body=s, request_timeout=12000)
agg = res['aggregations']['users']['buckets']


def _flatten_task_id(value):
    """Reduce the per-user jeditaskid entry to a plain value for the table."""
    if isinstance(value, dict):
        # top_hits response: use the sort values of the single hit
        return value['hits']['hits'][0]['sort']
    if isinstance(value, list):
        return value[0]
    return value


# Build flattened copies rather than mutating the ES response in place.
nagg = [dict(bucket, jeditaskid=_flatten_task_id(bucket['jeditaskid'])) for bucket in agg]

# create df
df_w = json_normalize(nagg)
if df_w.empty:
    # No user jobs matched: keep the expected columns so that the steps
    # below yield an empty table instead of raising KeyError.
    df_w = DataFrame(columns=['attemptnr.value', 'cpuconsumptiontime.value',
                              'doc_count', 'jeditaskid', 'key'])

# seconds -> whole days -> years
df_w['cpuconsumptiontime.value'] = df_w['cpuconsumptiontime.value'].apply(
    lambda x: timedelta(seconds=int(x)).days / 365.2)

LIMIT_FAILURES = 1000  # 5 for testing
LIMIT_ATTEMPT = 1.5  # 5 for testing

df_w = df_w[df_w["doc_count"] > LIMIT_FAILURES]
df_w = df_w[df_w["attemptnr.value"] > LIMIT_ATTEMPT]

display(df_w)
# Relabel by column name, not by position: the column order produced by
# json_normalize is not guaranteed, and a positional relabel could attach
# the headers to the wrong data.
df_w = df_w[['attemptnr.value', 'cpuconsumptiontime.value', 'doc_count', 'jeditaskid', 'key']].rename(columns={
    'attemptnr.value': 'avg attempt',
    'cpuconsumptiontime.value': 'cpusonsumption [years]',
    'doc_count': 'failed jobs',
    'jeditaskid': 'jeditaskid',
    'key': 'User',
})
print(df_w.to_string())

Unnamed: 0,attemptnr.value,cpuconsumptiontime.value,doc_count,jeditaskid,key
0,3.585699,1.519715,44837,[15448962],Johannes Junggeburth
1,3.586317,5.309419,38967,[15442115],Timothee Theveneaux-Pelzer
2,3.451228,1.711391,38424,[15418167],Masahiro Yamatani
15,4.431541,1.48138,6398,[15368764],Paul Philipp Gadow
16,10.885815,0.098576,5929,[15425194],Benjamin Henry Hooberman
17,3.457489,0.136911,5281,[15454659],Daniel Scheirich


    avg attempt  cpusonsumption [years]  failed jobs  jeditaskid                        User
0      3.585699                1.519715        44837  [15448962]        Johannes Junggeburth
1      3.586317                5.309419        38967  [15442115]  Timothee Theveneaux-Pelzer
2      3.451228                1.711391        38424  [15418167]           Masahiro Yamatani
15     4.431541                1.481380         6398  [15368764]          Paul Philipp Gadow
16    10.885815                0.098576         5929  [15425194]    Benjamin Henry Hooberman
17     3.457489                0.136911         5281  [15454659]            Daniel Scheirich


In [94]:
# Retrial alarm: when the table has rows, mail it to every subscriber and
# register the alert for that subscriber.
if len(df_w) == 0:
    print('No Alarm')
else:
    test_name = 'Top Analysis users [Retrial attempts]'
    for u in S.get_immediate_subscribers(test_name):
        body = ''.join([
            'Dear ', u.name, ',\n\n',
            'the following users have tasks having troubles running:\n\n',
            df_w.to_string(), '\n\n',
            '\n\nBest regards,\nATLAS Alarm & Alert Service',
        ])
        print(u.name)
        A.sendMail(test_name, u.email, body)
        #print(body)
        A.addAlert(test_name, u.name, str(df_w.shape[0])+' large number of failures.')

Alessandro di Girolamo
Content-Type: text/plain; charset="us-ascii"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
Subject: Top Analysis users [Retrial attempts]
From: AAAS@mwt2.org
To: Alessandro.Di.Girolamo@cern.ch

Dear Alessandro di Girolamo,

the following users have tasks having troubles running:

    avg attempt  cpusonsumption [years]  failed jobs  jeditaskid                        User
0      3.585699                1.519715        44837  [15448962]        Johannes Junggeburth
1      3.586317                5.309419        38967  [15442115]  Timothee Theveneaux-Pelzer
2      3.451228                1.711391        38424  [15418167]           Masahiro Yamatani
15     4.431541                1.481380         6398  [15368764]          Paul Philipp Gadow
16    10.885815                0.098576         5929  [15425194]    Benjamin Henry Hooberman
17     3.457489                0.136911         5281  [15454659]            Daniel Scheirich



Best regards,
ATLAS Alarm & Alert Serv