<h1>This notebook retrieves from ES the jobs_archive information about the top 10 users, and raises an alarm if their usage is above certain thresholds</h1>

In [2]:
import numpy as np
import matplotlib.pyplot as plt
import re
import json
from elasticsearch import Elasticsearch, exceptions as es_exceptions
from elasticsearch.helpers import scan
from pandas.io.json import json_normalize
from IPython.display import display
from pandas import HDFStore,DataFrame
import pandas as pd
from datetime import datetime, timedelta
import datetime

%matplotlib inline

<h2>Retrieve all job indexes from ES</h2>

In [3]:
#define function to filter on time
def time_filter(indices, last_days=1, pattern=''):
    """Select the job-archive index names to query.

    Parameters
    ----------
    indices : iterable of str
        Index names, expected to look like ``jobs_archive_YYYY-MM-DD``.
        Trailing whitespace is stripped from every name that is returned.
    last_days : int
        Keep the indices whose date lies fewer than ``last_days`` days before
        today. ``0`` short-circuits to the wildcard ``["jobs_archive_*"]``.
    pattern : str
        When non-empty it takes precedence over the date filter: every index
        whose name contains ``pattern`` as a plain substring is kept.

    Returns
    -------
    list of str
        The selected index names, in the order they appear in ``indices``.

    Notes
    -----
    Relies on the name ``datetime`` referring to the *module*
    (``datetime.date`` and ``datetime.datetime`` are both used).
    """
    if last_days == 0:
        return ["jobs_archive_*"]
    if pattern:
        return [name.rstrip() for name in indices if pattern in name]

    today = datetime.date.today()
    datefmt = '%Y-%m-%d'
    filtered = []
    for name in indices:
        day = re.sub(r'jobs_archive_', '', name).rstrip()
        try:
            day = datetime.datetime.strptime(day, datefmt).date()
        except ValueError:
            # Not a dated index (e.g. 'jobs_archive_2016_status'): it cannot be
            # placed in time, so skip it instead of aborting the whole selection.
            continue
        if (today - day).days < last_days:
            filtered.append(name.rstrip())
    return filtered

In [4]:
# Elasticsearch client (60 s default timeout)
es = Elasticsearch(hosts=[{'host':'atlas-kibana.mwt2.org', 'port':9200}],timeout=60)

# Ask ES for the names of all job archive indices: the reply holds one name per
# line, so split on newlines, drop the empty entries and sort the names.
indices = sorted(
    name
    for name in es.cat.indices(index="jobs_archive_*", h="index", request_timeout=600).split('\n')
    if name != ''
)

# 'jobs_archive_2016_status' is dropped from the list when it is present
try:
    indices.remove('jobs_archive_2016_status')
except ValueError:
    pass

# To exclude a day affected by a central problem, remove its index the same way, e.g.:
#if 'jobs_archive_2016-12-29' in indices:
#    indices.remove('jobs_archive_2016-12-29')
#print(indices)

<h2>Retrieve job archives of interest from ES</h2>

In [5]:
#define function to create jobs object from scroll
def jobs_list(scroll, max=-1):
    """Collect the hits yielded by ``scroll`` into a list.

    Parameters
    ----------
    scroll : iterable
        Source of hits (e.g. the generator returned by a scan helper).
    max : int
        Maximum number of hits to collect; a negative value (the default)
        collects everything.

    Returns
    -------
    list
        The first ``max`` hits, or all of them when ``max`` is negative.

    Iteration stops as soon as ``max`` hits have been collected, so the rest of
    the scroll is not fetched. A progress line is printed every 100000 hits.
    """
    jobs = []
    if max == 0:
        # Nothing requested: do not even start consuming the scroll.
        return jobs

    for i, result in enumerate(scroll, start=1):
        jobs.append(result)
        if not i%100000:
            print('processing hit '+str(i)+'...')
        if max > 0 and i >= max:
            # Limit reached: stop here rather than draining the remaining hits.
            break

    return jobs

In [6]:
# Choose which indices to query: those of the last NDAYS days, or those whose
# name contains PATTERN (plain substring, no wildcard!).
# e.g. NDAYS=7 or PATTERN='2016-02'
NDAYS = 2     # other values used: 150, ''
PATTERN = ''  # other value used: '2016-03'

# ES takes several indices as a single comma-separated string
ind = ','.join(time_filter(indices, last_days=NDAYS, pattern=PATTERN))
print(ind)

jobs_archive_2017-05-11,jobs_archive_2017-05-12


<h2>Get results as in Kibana Table Bigger Users</h2>
<h3>Get all analysis jobs from the last 24 hours, aggregate them per user by the sums of walltime and inputfilebytes, and return the top 10 users by job count</h3>

In [6]:
# Job selection: the conditions sit in a bool/must clause so that all of them
# have to hold (logical AND). 'gangarbt' jobs and 'pmerge' processing are excluded.
job_selection = {
    'bool': {
        'must': [
            {"term": {"prodsourcelabel": "user"}},
            {'range': {'modificationtime': {"gte": "now-1d", "lt": "now"}}},
            {'bool': {
                'must_not': [
                    {"term": {"produsername": "gangarbt"}},
                    {"term": {"processingtype": "pmerge"}},
                ]
            }},
        ],
    }
}

# Inline script for walltime * corecount; falls back to the plain walltime when
# the core count is null.
walltime_times_cores = "def core=doc['actualcorecount'].value; if (core!=null) {return doc['wall_time'].value * core} else {return doc['wall_time'].value}"

# Metrics computed inside each user bucket
per_user_metrics = {
    "walltime_sum": {"sum": {"field": "wall_time"}},
    "walltime_core_sum": {"sum": {"script": {"inline": walltime_times_cores}}},
    "inputfilebytes_sum": {"sum": {"field": "inputfilebytes"}},
    "actualcorecount_avg": {"avg": {"field": "actualcorecount"}},
}

s = {
    "size": 1,  # a single job entry is returned, for debugging purposes only
    "_source": ["produsername", "modificationtime", "wall_time", "inputfilebytes"],  # fields returned per hit
    'query': job_selection,
    "aggs": {
        "users": {
            "terms": {"field": "produsername", "size": 10},
            "aggs": per_user_metrics,
        }
    },
}

res = es.search(index=ind, body=s, request_timeout=12000)

# Raw hits (at most one, see "size" above) and the per-user buckets
jobs = res['hits']['hits']
agg = res['aggregations']['users']['buckets']

# One row per user bucket
df_orig = json_normalize(agg)
display(df_orig)

Unnamed: 0,actualcorecount_avg.value,doc_count,inputfilebytes_sum.value,key,walltime_core_sum.value,walltime_sum.value
0,1.0,462416,21610130000000.0,Are Sivertsen Traeet,66455601.0,66456364.0
1,1.0,83976,0.0,Yasuyuki Okumura,188482344.0,188495320.0
2,1.0,62819,37689170000000.0,Katja Hannele Mankinen,6459865.0,6459865.0
3,1.0,60511,151968400000000.0,Ferdinand Schenck,267850325.0,267872225.0
4,1.0,57639,26432630000000.0,Danijela Bogavac,40236424.0,40264323.0
5,1.0,53874,6742656000000.0,Chris Malena Delitzsch,32947192.0,32950870.0
6,1.0,45794,100872300000000.0,Andrew Stephen Chisholm,137768819.0,137768819.0
7,1.0,39259,0.0,Pavel Starovoitov,384696297.0,384696297.0
8,1.0,28959,51913980000000.0,Aleksandr Gavriliuk,15158319.0,15158319.0
9,1.0,21989,195247600000000.0,Takuto Kunigo,10488221.0,10488221.0


In [7]:
def _seconds_to_years(seconds):
    """Whole days contained in `seconds`, expressed in years of 365.2 days."""
    return timedelta(seconds=int(seconds)).days/365.2


# Rename the columns to short, readable names
df = df_orig.rename(
    index=str,
    columns={
        "key": "user",
        "walltime_sum.value": "walltime",
        "walltime_core_sum.value": "walltime_core",
        "inputfilebytes_sum.value": "inputsize",
        "actualcorecount_avg.value": "corecount",
        "doc_count": "njobs",
    },
)

# walltime: summed walltime scaled by the average core count, converted to years
df["walltime"] = (df["walltime"] * df["corecount"]).apply(_seconds_to_years)

# walltime_core: per-job walltime * corecount summed by ES, converted to years
df['walltime_core'] = df['walltime_core'].apply(_seconds_to_years)

# Convert the input size from bytes to TB.
# NOTE(review): 0.89e-12 is neither 1e-12 (TB) nor 1/1024**4 ~ 0.91e-12 (TiB) - confirm the intended factor.
df['inputsize'] = df['inputsize'].apply(lambda nbytes: nbytes*0.00000000000089)

# Put the units in the column names
df = df.rename(
    index=str,
    columns={
        "inputsize": "inputsize[TB]",
        "walltime": "walltime[year]",
        "walltime_core": "walltime_core[year]",
    },
)

display(df)

Unnamed: 0,corecount,njobs,inputsize[TB],user,walltime_core[year],walltime[year]
0,1.0,462416,19.233015,Are Sivertsen Traeet,2.105696,2.105696
1,1.0,83976,0.0,Yasuyuki Okumura,5.97207,5.97207
2,1.0,62819,33.543362,Katja Hannele Mankinen,0.202629,0.202629
3,1.0,60511,135.251908,Ferdinand Schenck,8.488499,8.488499
4,1.0,57639,23.525044,Danijela Bogavac,1.273275,1.276013
5,1.0,53874,6.000964,Chris Malena Delitzsch,1.043264,1.043264
6,1.0,45794,89.776368,Andrew Stephen Chisholm,4.364732,4.364732
7,1.0,39259,0.0,Pavel Starovoitov,12.190581,12.190581
8,1.0,28959,46.203442,Aleksandr Gavriliuk,0.479189,0.479189
9,1.0,21989,173.770336,Takuto Kunigo,0.331325,0.331325


<h2>First Alarm</h2> 
<h3>Get the top 10 users of the last 24 hours by walltime*core, and keep only those whose summed walltime exceeds 15 years</h3>

In [8]:
# Alarm threshold on the summed walltime*core, in years
LIMIT_WALLTIME = 15 # 5 for testing

# Job selection: user jobs modified in the last 24 hours (all conditions ANDed),
# excluding 'gangarbt' jobs and 'pmerge' processing.
job_selection = {
    'bool': {
        'must': [
            {"term": {"prodsourcelabel": "user"}},
            {'range': {'modificationtime': {"gte": "now-1d", "lt": "now"}}},
            {'bool': {
                'must_not': [
                    {"term": {"produsername": "gangarbt"}},
                    {"term": {"processingtype": "pmerge"}},
                ]
            }},
        ],
    }
}

# Inline script for walltime * corecount; falls back to the plain walltime when
# the core count is null.
walltime_times_cores = "def core=doc['actualcorecount'].value; if (core!=null) {return doc['wall_time'].value * core} else {return doc['wall_time'].value}"

s = {
    "size": 0,  # no individual hits, only the aggregation
    'query': job_selection,
    "aggs": {
        "users": {
            # top 10 users, ranked by their summed walltime*core
            "terms": {
                "field": "produsername",
                "order": {"walltime_core_sum": "desc"},
                "size": 10,
            },
            "aggs": {
                "walltime_core_sum": {"sum": {"script": {"inline": walltime_times_cores}}},
            },
        }
    },
}

res = es.search(index=ind, body=s, request_timeout=12000)
agg = res['aggregations']['users']['buckets']

# One row per user; convert the summed seconds to years (whole days / 365.2)
walltime_col = 'walltime_core_sum.value'
df_w = json_normalize(agg)
df_w[walltime_col] = df_w[walltime_col].apply(
    lambda seconds: timedelta(seconds=int(seconds)).days/365.2
)

# Keep only the users above the threshold
above_limit = df_w[walltime_col] > LIMIT_WALLTIME
df_w = df_w[above_limit]
display(df_w)

Unnamed: 0,doc_count,key,walltime_core_sum.value


<h2>Second Alarm</h2> 
<h3>Get the top 10 users of the last 24 hours by inputfilebytes, and keep only those whose summed input size exceeds 500 TB</h3>

In [9]:
# Alarm threshold on the summed input size, in TB
LIMIT_INPUTSIZE = 500 # 5 for testing

# Job selection: user jobs modified in the last 24 hours (all conditions ANDed),
# excluding 'gangarbt' jobs and 'pmerge' processing.
job_selection = {
    'bool': {
        'must': [
            {"term": {"prodsourcelabel": "user"}},
            {'range': {'modificationtime': {"gte": "now-1d", "lt": "now"}}},
            {'bool': {
                'must_not': [
                    {"term": {"produsername": "gangarbt"}},
                    {"term": {"processingtype": "pmerge"}},
                ]
            }},
        ],
    }
}

s = {
    "size": 0,  # no individual hits, only the aggregation
    'query': job_selection,
    "aggs": {
        "users": {
            # top 10 users, ranked by their summed input size
            "terms": {
                "field": "produsername",
                "order": {"inputsize_sum": "desc"},
                "size": 10,
            },
            "aggs": {
                "inputsize_sum": {"sum": {"field": "inputfilebytes"}},
            },
        }
    },
}

res = es.search(index=ind, body=s, request_timeout=12000)
agg = res['aggregations']['users']['buckets']

# One row per user; scale the summed bytes with the same factor as the table above
inputsize_col = 'inputsize_sum.value'
df_i = json_normalize(agg)
df_i[inputsize_col] = df_i[inputsize_col].apply(lambda nbytes: nbytes*0.00000000000089)

# Keep only the users above the threshold
above_limit = df_i[inputsize_col] > LIMIT_INPUTSIZE
df_i = df_i[above_limit]
display(df_i)

Unnamed: 0,doc_count,inputsize_sum.value,key


<h2>Third Alarm</h2> 
<h3>Notify if user job efficiency drops below 70%</h3>

In [39]:
# Efficiency = cores of finished jobs / cores of (finished + failed) jobs over
# the last 24 hours; an alarm is printed when it falls below LIMIT_EFFICIENCY.
s = {
    "size": 0, # no individual hits, only the aggregation
    'query':{
        'bool':{
            'must':[
                { "term": {"prodsourcelabel":"user" } },
                { 'range' : {
                    'modificationtime' : {
                        "gte" : "now-1d",
                        "lt" :  "now"}
                    }
                },
                { 'bool' : {
                    # cancelled and closed jobs are left out of the efficiency
                    'must_not':[
                        { "term": {"produsername": "gangarbt" } },
                        { "term": {"processingtype":"pmerge" } } ,
                        { "term": {"jobstatus" :"cancelled" } } ,
                        { "term": {"jobstatus" :"closed"}}
                        ]
                    }
                }
            ],
        }
    },
    "aggs": {
        "status":{
            # one bucket per job status, ranked by summed core count
            "terms": {
                "field": "jobstatus",
                "order": {"corecount_sum": "desc"},
                "size": 5
            },
            "aggs": {
                "corecount_sum": {
                    "sum": { "field": "actualcorecount" }
                },
            }
        }
    }
}


res = es.search(index=ind, body=s, request_timeout=12000)
#print(res)

agg = res['aggregations']['status']['buckets']
#print(agg)

#create df
df_e = json_normalize(agg)
#display(df_e)

if df_e.empty:
    # No status bucket at all: the frame has no 'key' column to filter on.
    finished = failed = df_e
    successful = 0
    failed_cores = 0
else:
    finished = df_e[df_e['key']=='finished']
    failed = df_e[df_e['key']=='failed']
    # Summing an empty selection gives 0, so a missing 'finished' or 'failed'
    # bucket counts as zero cores instead of raising IndexError on .iloc[0].
    successful = finished['corecount_sum.value'].sum()
    failed_cores = failed['corecount_sum.value'].sum()
total = failed_cores + successful

LIMIT_EFFICIENCY = 0.7
Alarm = ''
if (total==0):
    Alarm = "Alarm, no finished user jobs in last 24 hours"
elif ((successful/total) < LIMIT_EFFICIENCY):
    Alarm = "Alarm, user job efficiency "+str(successful/total)

if (len(Alarm)>0):
    print(Alarm)

Alarm, user job efficiency 0.609734121631
