From 9f2a4dde4415b726cc4d902d6447a9ca7d01a4fd Mon Sep 17 00:00:00 2001 From: Giulio Eulisse Date: Wed, 7 Jan 2015 15:06:24 +0100 Subject: [PATCH] Report status to an elasticsearch instance. If the two environment variables ES_HOSTNAME and ES_AUTH are properly defined, pushes status of each job to the elasticsearch instance specified in ES_HOSTNAME. This way we can track the status of all the jobs in realtime in kibana. Does not change behavior if either ES_HOSTNAME or ES_AUTH is not defined. --- .../python/WorkFlowRunner.py | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/Configuration/PyReleaseValidation/python/WorkFlowRunner.py b/Configuration/PyReleaseValidation/python/WorkFlowRunner.py index d99db7c839def..795a8c2d13b86 100644 --- a/Configuration/PyReleaseValidation/python/WorkFlowRunner.py +++ b/Configuration/PyReleaseValidation/python/WorkFlowRunner.py @@ -6,6 +6,39 @@ import shutil from subprocess import Popen from os.path import exists, basename, join +from os import getenv +from datetime import datetime +from hashlib import sha1 +import urllib2, base64, json + +# This is used to report results of the runTheMatrix to the elasticsearch +# instance used for IBs. This way we can track progress even if the logs are +# not available. +def esReportWorkflow(**kwds): + # Silently exit if we cannot contact elasticsearch + es_hostname = getenv("ES_HOSTNAME") + es_auth = getenv("ES_AUTH") + if not es_hostname and not es_auth: + return + payload = kwds + sha1_id = sha1(kwds["release"] + kwds["architecture"] + kwds["workflow"] + str(kwds["step"])).hexdigest() + d = datetime.now() + if "201" in kwds["release"]: + datepart = "201" + kwds["release"].split("201")[1] + d = datetime.strptime(datepart, "%Y-%m-%d-%H00") + url = "https://%s/ib-matrix.%s/runTheMatrix-data/%s" % (es_hostname, + d.strftime("%Y.%m"), + sha1_id) + request = urllib2.Request(url) + if es_auth: + base64string = base64.encodestring(es_auth).replace('\n', '') + request.add_header("Authorization", "Basic %s" % base64string) + request.get_method = lambda: 'PUT' + data = json.dumps(payload) + try: + result = urllib2.urlopen(request, data=data) + except HTTPError, e: + print e class WorkFlowRunner(Thread): def __init__(self, wf, noRun=False,dryRun=False,cafVeto=True,dasOptions="",jobReport=False): @@ -86,6 +119,12 @@ def closeCmd(i,ID): isInputOk=True istep=istepmone+1 cmd = preamble + esReportWorkflow(workflow=self.wf.nameId, + release=getenv("CMSSW_VERSION"), + architecture=getenv("SCRAM_ARCH"), + step=istep, + command=cmd, + status="STARTED") if aborted: self.npass.append(0) self.nfail.append(0) @@ -176,6 +215,12 @@ def closeCmd(i,ID): self.nfail.append(0) self.stat.append('PASSED') + esReportWorkflow(workflow=self.wf.nameId, + release=getenv("CMSSW_VERSION"), + architecture=getenv("SCRAM_ARCH"), + step=istep, + command=cmd, + status=self.stat[-1]) os.chdir(startDir)