Skip to content

Commit

Permalink
Merge pull request #4227 from ticoann/merge_wmagent_summary_delete
Browse files Browse the repository at this point in the history
deleting jobsummary from wmagent summary  when request is announced
  • Loading branch information
Stephen Foulkes committed Nov 13, 2012
2 parents 27e0f3d + ca944fe commit d6c68d7
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 32 deletions.
3 changes: 2 additions & 1 deletion src/couchapps/WMStats/views/jobsByStatusWorkflow/map.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ function(doc) {
}
}
listErrors.sort();
emit([doc.workflow, doc.task, doc.state, doc.exitcode, doc.site, listErrors], null);
emit([doc.workflow, doc.task, doc.state, doc.exitcode, doc.site, listErrors],
{'id': doc['_id'], 'rev': doc['_rev']});
}
}
92 changes: 71 additions & 21 deletions src/python/WMComponent/TaskArchiver/CleanCouchPoller.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,16 @@ def setup(self, parameters):
# set the connection for local couchDB call
self.useReqMgrForCompletionCheck = getattr(self.config.TaskArchiver, 'useReqMgrForCompletionCheck', True)
self.wmstatsCouchDB = WMStatsWriter(self.config.TaskArchiver.localWMStatsURL)
self.centralCouchDBWriter = WMStatsWriter(self.config.TaskArchiver.centralWMStatsURL)
self.centralCouchDBReader = WMStatsReader(self.config.TaskArchiver.centralWMStatsURL)

if self.useReqMgrForCompletionCheck:
self.deletableStates = ["announced"]
self.centralCouchDBWriter = WMStatsWriter(self.config.TaskArchiver.centralWMStatsURL)
else:
# Tier0 case
self.deletableStates = ["completed"]
self.centralCouchDBWriter = self.wmstatsCouchDB

jobDBurl = sanitizeURL(self.config.JobStateMachine.couchurl)['url']
jobDBName = self.config.JobStateMachine.couchDBName
self.jobCouchdb = CouchServer(jobDBurl)
Expand All @@ -56,25 +64,35 @@ def algorithm(self, parameters):
logging.info("%s docs deleted" % report)
logging.info("getting complete and announced requests")

#TODO: define what is deletable status. Also add the code to delet summary document,
# request summary and job summary
if self.useReqMgrForCompletionCheck:
deletableWorkflows = self.centralCouchDBReader.workflowsByStatus(["announced"])
else:
deletableWorkflows = self.centralCouchDBReader.workflowsByStatus(["completed"])
deletableWorkflows = self.centralCouchDBReader.workflowsByStatus(self.deletableStates)

logging.info("Ready to delete %s" % deletableWorkflows)
for workflowName in deletableWorkflows:
logging.info("Deleting %s from JobCouch" % workflowName)

report = self.deleteWorkflowFromJobCouch(workflowName, "JobDump")
logging.info("%s docs deleted from JobDump" % report)
report = self.deleteWorkflowFromJobCouch(workflowName, "FWJRDump")
logging.info("%s docs deleted from FWJRDump" % report)

self.centralCouchDBWriter.updateRequestStatus(workflowName, "archived")
logging.info("status updated to archived %s" % workflowName)

if self.cleanAllLocalCouchDB(workflowName):
self.centralCouchDBWriter.updateRequestStatus(workflowName, "normal-archived")
logging.info("status updated to normal-archived %s" % workflowName)

abortedWorkflows = self.centralCouchDBReader.workflowsByStatus(["aborted-completed"])
logging.info("Ready to delete aborted %s" % abortedWorkflows)
for workflowName in abortedWorkflows:
if self.cleanAllLocalCouchDB(workflowName):
self.centralCouchDBWriter.updateRequestStatus(workflowName, "aborted-archived")
logging.info("status updated to aborted-archived %s" % workflowName)

#TODO: the following code is temporary - remove after production archived data is cleaned
removableWorkflows = self.centralCouchDBReader.workflowsByStatus(["archived"])

logging.info("Ready to delete %s from wmagent_summary" % removableWorkflows)
for workflowName in removableWorkflows:
logging.info("Deleting %s from WMAgent Summary Couch" % workflowName)
report = self.deleteWorkflowFromJobCouch(workflowName, "WMStats")
logging.info("%s docs deleted from wmagent_summary" % report)
# only update the status when delete is successful
# TODO: need to handle the case when there are multiple agent running the same request.
if report["status"] == "ok":
self.centralCouchDBWriter.updateRequestStatus(workflowName, "normal-archived")
logging.info("status updated to normal-archived from archived (this is temp solution for production) %s" % workflowName)

except Exception, ex:
logging.error(str(ex))
logging.error("Error occurred, will try again next cycle")
Expand All @@ -95,8 +113,11 @@ def deleteWorkflowFromJobCouch(self, workflowName, db):
elif (db == "FWJRDump"):
couchDB = self.fwjrdatabase
view = "fwjrsByWorkflowName"

options = {"startkey": [workflowName], "endkey": [workflowName, {}], "stale": "ok"}
elif (db == "WMStats"):
couchDB = self.wmstatsCouchDB.getDBInstance()
view = "jobsByStatusWorkflow"

options = {"startkey": [workflowName], "endkey": [workflowName, {}], "stale": "ok", "reduce": False}
jobs = couchDB.loadView(db, view, options = options)['rows']
for j in jobs:
doc = {}
Expand All @@ -109,12 +130,41 @@ def deleteWorkflowFromJobCouch(self, workflowName, db):
#create the error report
errorReport = {}
deleted = 0
status = "ok"
for data in committed:
if data.has_key('error'):
errorReport.setdefault(data['error'], 0)
errorReport[data['error']] += 1
status = "error"
else:
deleted += 1
return {'delete': deleted, 'error': errorReport}
return {'status': status, 'delete': deleted, 'message': errorReport}
else:
return "nothing"
return {'status': 'warning', 'message': "no %s exist" % workflowName}


def cleanAllLocalCouchDB(self, workflowName):
    """
    Delete the documents of workflowName from the JobDump, FWJRDump and
    WMStats (wmagent_summary) databases through deleteWorkflowFromJobCouch.

    All three deletions are always attempted and logged, in that order.

    Returns False when any deletion reports status "error", or when all
    three report status "warning"; returns True in every other case.
    """
    logging.info("Deleting %s from JobCouch" % workflowName)

    # (database key handed to deleteWorkflowFromJobCouch, log line for its report)
    deletionSteps = (
        ("JobDump", "%s docs deleted from JobDump"),
        ("FWJRDump", "%s docs deleted from FWJRDump"),
        ("WMStats", "%s docs deleted from wmagent_summary"),
    )

    reports = []
    for dbKey, logLine in deletionSteps:
        outcome = self.deleteWorkflowFromJobCouch(workflowName, dbKey)
        logging.info(logLine % outcome)
        reports.append(outcome)

    # A single failed deletion means the clean-up is not complete.
    if any(outcome["status"] == "error" for outcome in reports):
        return False

    # Success requires that at least one database did not answer "warning".
    return not all(outcome["status"] == "warning" for outcome in reports)

8 changes: 8 additions & 0 deletions src/python/WMComponent/TaskArchiver/TaskArchiverPoller.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,9 @@ def __init__(self, config):
if not self.useReqMgrForCompletionCheck:
#sets the local monitor summary couch db
self.wmstatsCouchDB = WMStatsWriter(self.config.TaskArchiver.localWMStatsURL);
self.centralCouchDBWriter = self.wmstatsCouchDB;
else:
self.centralCouchDBWriter = WMStatsWriter(self.config.TaskArchiver.centralWMStatsURL);
# Start a couch server for getting job info
# from the FWJRs for committal to archive
try:
Expand Down Expand Up @@ -322,6 +325,7 @@ def archiveTasks(self):

#Only delete those where the upload and notification succeeded
logging.info("Found %d candidate workflows for deletion" % len(finishedwfs))
abortedWorkflows = self.centralCouchDBWriter.workflowsByStatus(["aborted"], format = "dict");
wfsToDelete = {}
for workflow in finishedwfs:
try:
Expand All @@ -343,6 +347,10 @@ def archiveTasks(self):
self.wmstatsCouchDB.updateRequestStatus(workflow, "completed")
logging.info("status updated to completed %s" % workflow)

if workflow in abortedWorkflows:
self.centralCouchDBWriter.updateRequestStatus(workflow, "aborted-completed")
logging.info("status updated to aborted-completed %s" % workflow)

wfsToDelete[workflow] = {"spec" : spec, "workflows": finishedwfs[workflow]["workflows"]}

except TaskArchiverPollerException, ex:
Expand Down
22 changes: 14 additions & 8 deletions src/python/WMCore/Services/WMStats/WMStatsReader.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,21 @@ def __init__(self, couchURL, dbName = None):
self.couchServer = CouchServer(self.couchURL)
self.couchDB = CouchServer(self.couchURL).connectDatabase(self.dbName, False)

def workflowsByStatus(self, statusList):
def workflowsByStatus(self, statusList, format = "list"):
keys = statusList
options = {"stale": "update_after"}
result = self.couchDB.loadView("WMStats", "requestByStatus", options, keys)
workflowList = []
for item in result["rows"]:
workflowList.append(item["id"])
return workflowList

def replicate(self, target):
self.couchServer.replicate(self.dbName, target,
continuous = True)
if format == "dict":
workflowDict = {}
for item in result["rows"]:
workflowDict[item["id"]] = None
return workflowDict
else:
workflowList = []
for item in result["rows"]:
workflowList.append(item["id"])
return workflowList

# Accessor: hands back the database object that __init__ stored in self.couchDB.
def getDBInstance(self):
return self.couchDB
9 changes: 7 additions & 2 deletions src/python/WMCore/Services/WMStats/WMStatsWriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from WMCore.Database.CMSCouch import CouchServer
from WMCore.Lexicon import splitCouchServiceURL, sanitizeURL
from WMCore.Wrappers.JsonWrapper import JSONEncoder
from WMCore.Services.WMStats.WMStatsReader import WMStatsReader

def monitorDocFromRequestSchema(schema):
"""
Expand Down Expand Up @@ -33,7 +34,7 @@ def monitorDocFromRequestSchema(schema):
return doc


class WMStatsWriter():
class WMStatsWriter(WMStatsReader):

def __init__(self, couchURL, dbName = None):
# set the connection for local couchDB call
Expand Down Expand Up @@ -142,5 +143,9 @@ def deleteOldDocs(self, days):
return "nothing"

def replicate(self, target):
self.couchServer.replicate(self.dbName, target, continuous = True,
return self.couchServer.replicate(self.dbName, target, continuous = True,
filter = 'WMStats/repfilter', useReplicator = True)

# Accessor: returns self.couchDB so a caller can query the database directly
# (CleanCouchPoller calls loadView on the returned object).
def getDBInstance(self):
return self.couchDB

0 comments on commit d6c68d7

Please sign in to comment.