Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 118 additions & 11 deletions WFM_Input_DashBoard/WFMonDBShort.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@
"""
This scripts creates the overall job reports for monitoring in SSB
Should be set as a cronjob @15 min
Creates the following files: SSB_siteInfo.json, SSB_voBoxInfo.json, Running*.txt and Pending*.txt ( * in types )
Creates the following files: SSB_siteInfo.json, SSB_voBoxInfo.json, CondorJobs_Workflows.json, Running*.txt and Pending*.txt ( * in types )
"""

import sys,os,re,urllib,urllib2,subprocess,time
import sys,os,re,urllib,urllib2,subprocess,time,smtplib,os
from datetime import datetime
from email.MIMEMultipart import MIMEMultipart
from email.MIMEBase import MIMEBase
from email.MIMEText import MIMEText
from email.Utils import COMMASPACE, formatdate
from email import Encoders
try:
import json
except ImportError:
Expand Down Expand Up @@ -34,6 +39,10 @@
overview_pending_vobox = {}# Pending per vobox
json_name_vobox = "SSBCERN_voBoxInfo.json" # Output json file name

##Counting Workflows
overview_workflows = {}
json_name_workflows = "CondorJobs_Workflows.json" # Output json file name

##SSB plot links
site_link = "http://dashb-ssb.cern.ch/dashboard/templates/sitePendingRunningJobs.html?site="
overalls_link = "http://dashb-ssb-dev.cern.ch/dashboard/templates/sitePendingRunningJobs.html?site=All%20"
Expand All @@ -42,6 +51,9 @@
jobTypes = ['Processing', 'Production', 'Skim', 'Harvest', 'Merge', 'LogCollect', 'Cleanup', 'RelVal', 'T0']
t0Types = ['Repack', 'Express', 'Reco']

# Mailing list for notifications
mailingList = ['cms-comp-ops-workflow-team@cern.ch']

def createSiteList():
"""
Creates a initial site list with the data from site status in Dashboard
Expand Down Expand Up @@ -171,6 +183,51 @@ def increasePendingVoBox(sched,type):
"""
overview_pending_vobox[sched][type] += 1

def increaseRunningWorkflow(workflow,siteToExtract):
"""
Increases the number of running jobs per workflow
"""
if workflow not in overview_workflows.keys():
addWorkflow(workflow)
if siteToExtract in overview_workflows[workflow]['runningJobs'].keys():
overview_workflows[workflow]['runningJobs'][siteToExtract] += 1
overview_workflows[workflow]['condorJobs'] += 1
else:
overview_workflows[workflow]['runningJobs'][siteToExtract] = 1
overview_workflows[workflow]['condorJobs'] += 1
else:
if siteToExtract in overview_workflows[workflow]['runningJobs'].keys():
overview_workflows[workflow]['runningJobs'][siteToExtract] += 1
overview_workflows[workflow]['condorJobs'] += 1
else:
overview_workflows[workflow]['runningJobs'][siteToExtract] = 1
overview_workflows[workflow]['condorJobs'] += 1

def increasePendingWorkflow(workflow,siteToExtract):
"""
Increases the number of pending jobs per workflow
"""
if workflow not in overview_workflows.keys():
addWorkflow(workflow)
overview_workflows[workflow]['condorJobs'] += 1
overview_workflows[workflow]['pendingJobs'] += 1
overview_workflows[workflow]['desiredSites'] = overview_workflows[workflow]['desiredSites'].union(set(siteToExtract))
else:
overview_workflows[workflow]['condorJobs'] += 1
overview_workflows[workflow]['pendingJobs'] += 1
overview_workflows[workflow]['desiredSites'] = overview_workflows[workflow]['desiredSites'].union(set(siteToExtract))

def addWorkflow(workflow):
"""
Add a new workflow to overview_workflows
"""
overview_workflows[workflow] = {
'condorJobs' : 0,
'runningJobs' : {},
'pendingJobs' : 0,
'desiredSites' : set()
}

def findTask(id,sched,typeToExtract):
"""
This deduces job type from given info about scheduler and taskName
Expand Down Expand Up @@ -404,6 +461,40 @@ def jsonDict(json_name,currTime,date,hour,key):
jsonfile.write(json.dumps(update,sort_keys=True, indent=3))
jsonfile.close()

def send_mail(send_from, send_to, subject, text, files=[], server="localhost"):
"""
Method to send emails
"""
assert isinstance(send_to, list)
assert isinstance(files, list)

msg = MIMEMultipart()
msg['From'] = send_from
msg['To'] = COMMASPACE.join(send_to)
msg['Date'] = formatdate(localtime=True)
msg['Subject'] = subject

msg.attach( MIMEText(text) )

for f in files:
part = MIMEBase('application', "octet-stream")
part.set_payload( open(f,"rb").read() )
Encoders.encode_base64(part)
part.add_header('Content-Disposition', 'attachment; filename="%s"' % os.path.basename(f))
msg.attach(part)

smtp = smtplib.SMTP(server)
smtp.sendmail(send_from, send_to, msg.as_string())
smtp.close()

def set_default(obj):
"""
JSON enconder doesnt support sets, parse them to lists
"""
if isinstance(obj, set):
return list(obj)
raise TypeError

def main():
"""
Main algorithm
Expand All @@ -430,10 +521,14 @@ def main():
out, err = proc.communicate()
for line in err.split('\n') :
if 'Error' in line:
listcommand="bash send_email.sh %s ; " % col
proc = subprocess.Popen(listcommand, stderr = subprocess.PIPE,stdout = subprocess.PIPE, shell = True)
out2, err2 = proc.communicate()
print 'ERROR: I find a problem while getting schedulers for collector %s, I will send an email' % col
body_text = 'There is a problem with one of the collectors! The monitoring scripts will give false information:\n\n'
body_text += ' /afs/cern.ch/user/c/cmst1/scratch0/WFM_Input_DashBoard/WFMonDBShort.py\n\n'
body_text += 'See the log file in the same directory for the error output\n'
send_mail('luis89@fnal.gov',
mailingList,
'[Monitoring] Condor Collector %s Error' % col,
body_text)
print 'ERROR: I find a problem while getting schedulers for collector %s, I will send an email to: %s' % (col, str(mailingList))
#print out2, '\n', "Error: ", '\n', err2
break
for line in out.split('\n') :
Expand Down Expand Up @@ -472,6 +567,7 @@ def main():
# --> new software len(array) ={6,7} depending if the job is already running in a site
id = array[0]
status = array[1]
workflow = array[2].split('/')[1]
task = array[2].split('/')[-1]
siteToExtract = array[3].replace(' ', '').split(",")

Expand All @@ -493,9 +589,11 @@ def main():
if status == "2":
increaseRunning(siteToExtract[0],type) # I assume one job can only run at one site
increaseRunningVoBox(sched.replace(".","_").strip(),type)
increaseRunningWorkflow(workflow,siteToExtract[0])
elif status == "1":
temp_pending.append([type,siteToExtract])
increasePendingVoBox(sched.replace(".","_").strip(),type)
increasePendingWorkflow(workflow,siteToExtract)
else: # We do not care about jobs in another status (condor job status: https://htcondor-wiki.cs.wisc.edu/index.cgi/wiki?p=MagicNumbers)
continue
print "INFO: Full condor status pooling is done"
Expand All @@ -522,11 +620,15 @@ def main():
print "INFO: Smart pending site counting done \n"

# Handling jobs that failed task extraction logic
if jobs_failedTypeLogic != {}:
command="bash failedLogic_email.sh \"%s\"" % str(jobs_failedTypeLogic)
proc = subprocess.Popen(command, stderr = subprocess.PIPE,stdout = subprocess.PIPE, shell = True)
out, err = proc.communicate()
print 'ERROR: I find jobs that failed the type assignment logic, I will send an email'
if jobs_failedTypeLogic != {}:
body_text = 'There is a problem with the logic to deduce job type from the condor data.n'
body_text += 'Please have a look to the following jobs:\n\n'
body_text += '%s' % str(jobs_failedTypeLogic)
send_mail('luis89@fnal.gov',
mailingList,
'[Monitoring] Failed task type logic problem',
body_text)
print 'ERROR: I find jobs that failed the type assignment logic, I will send an email to: %s' % str(mailingList)
#print out, '\n', "Error: ", '\n', err

# Adding sites not in either of running/pending overviews
Expand All @@ -549,6 +651,11 @@ def main():
# Creates json file (This is needed for plots per site)
jsonDict( json_name_sites, currTime, date, hour, 'site')

# Creates json file for jobs per workflow
jsonfile = open(json_name_workflows,'w+')
jsonfile.write(json.dumps(overview_workflows, default=set_default, sort_keys=True, indent=4))
jsonfile.close()

print 'INFO: The script has finished after: ', datetime.now()-starttime

if __name__ == "__main__":
Expand Down
12 changes: 0 additions & 12 deletions WFM_Input_DashBoard/failedLogic_email.sh

This file was deleted.

12 changes: 0 additions & 12 deletions WFM_Input_DashBoard/send_email.sh

This file was deleted.