diff --git a/src/couchapps/WMStats/views/jobsByStatusWorkflow/map.js b/src/couchapps/WMStats/views/jobsByStatusWorkflow/map.js index e2572ff9f1..232993464e 100644 --- a/src/couchapps/WMStats/views/jobsByStatusWorkflow/map.js +++ b/src/couchapps/WMStats/views/jobsByStatusWorkflow/map.js @@ -9,6 +9,7 @@ function(doc) { } } listErrors.sort(); - emit([doc.workflow, doc.task, doc.state, doc.exitcode, doc.site, listErrors], null); + emit([doc.workflow, doc.task, doc.state, doc.exitcode, doc.site, listErrors], + {'id': doc['_id'], 'rev': doc['_rev']}); } } diff --git a/src/python/WMComponent/TaskArchiver/CleanCouchPoller.py b/src/python/WMComponent/TaskArchiver/CleanCouchPoller.py index 78f41dffcb..8a145fcf4b 100644 --- a/src/python/WMComponent/TaskArchiver/CleanCouchPoller.py +++ b/src/python/WMComponent/TaskArchiver/CleanCouchPoller.py @@ -38,8 +38,16 @@ def setup(self, parameters): # set the connection for local couchDB call self.useReqMgrForCompletionCheck = getattr(self.config.TaskArchiver, 'useReqMgrForCompletionCheck', True) self.wmstatsCouchDB = WMStatsWriter(self.config.TaskArchiver.localWMStatsURL) - self.centralCouchDBWriter = WMStatsWriter(self.config.TaskArchiver.centralWMStatsURL) self.centralCouchDBReader = WMStatsReader(self.config.TaskArchiver.centralWMStatsURL) + + if self.useReqMgrForCompletionCheck: + self.deletableStates = ["announced"] + self.centralCouchDBWriter = WMStatsWriter(self.config.TaskArchiver.centralWMStatsURL) + else: + # Tier0 case + self.deletableStates = ["completed"] + self.centralCouchDBWriter = self.wmstatsCouchDB + jobDBurl = sanitizeURL(self.config.JobStateMachine.couchurl)['url'] jobDBName = self.config.JobStateMachine.couchDBName self.jobCouchdb = CouchServer(jobDBurl) @@ -56,25 +64,35 @@ def algorithm(self, parameters): logging.info("%s docs deleted" % report) logging.info("getting complete and announced requests") - #TODO: define what is deletable status. Also add the code to delet summary document, - # request summary and job summary - if self.useReqMgrForCompletionCheck: - deletableWorkflows = self.centralCouchDBReader.workflowsByStatus(["announced"]) - else: - deletableWorkflows = self.centralCouchDBReader.workflowsByStatus(["completed"]) + deletableWorkflows = self.centralCouchDBReader.workflowsByStatus(self.deletableStates) logging.info("Ready to delete %s" % deletableWorkflows) for workflowName in deletableWorkflows: - logging.info("Deleting %s from JobCouch" % workflowName) - - report = self.deleteWorkflowFromJobCouch(workflowName, "JobDump") - logging.info("%s docs deleted from JobDump" % report) - report = self.deleteWorkflowFromJobCouch(workflowName, "FWJRDump") - logging.info("%s docs deleted from FWJRDump" % report) - - self.centralCouchDBWriter.updateRequestStatus(workflowName, "archived") - logging.info("status updated to archived %s" % workflowName) - + if self.cleanAllLocalCouchDB(workflowName): + self.centralCouchDBWriter.updateRequestStatus(workflowName, "normal-archived") + logging.info("status updated to normal-archived %s" % workflowName) + + abortedWorkflows = self.centralCouchDBReader.workflowsByStatus(["aborted-completed"]) + logging.info("Ready to delete aborted %s" % abortedWorkflows) + for workflowName in abortedWorkflows: + if self.cleanAllLocalCouchDB(workflowName): + self.centralCouchDBWriter.updateRequestStatus(workflowName, "aborted-archived") + logging.info("status updated to aborted-archived %s" % workflowName) + + #TODO: following code is temproraly - remove after production archived data is cleaned + removableWorkflows = self.centralCouchDBReader.workflowsByStatus(["archived"]) + + logging.info("Ready to delete %s from wmagent_summary" % removableWorkflows) + for workflowName in removableWorkflows: + logging.info("Deleting %s from WMAgent Summary Couch" % workflowName) + report = self.deleteWorkflowFromJobCouch(workflowName, "WMStats") + logging.info("%s docs deleted from wmagent_summary" % report) + # only updatet he status when delete is successful + # TODO: need to handle the case when there are multiple agent running the same request. + if report["status"] == "ok": + self.centralCouchDBWriter.updateRequestStatus(workflowName, "normal-archived") + logging.info("status updated to normal-archived from archived (this is temp solution for production) %s" % workflowName) + except Exception, ex: logging.error(str(ex)) logging.error("Error occurred, will try again next cycle") @@ -95,8 +113,11 @@ def deleteWorkflowFromJobCouch(self, workflowName, db): elif (db == "FWJRDump"): couchDB = self.fwjrdatabase view = "fwjrsByWorkflowName" - - options = {"startkey": [workflowName], "endkey": [workflowName, {}], "stale": "ok"} + elif (db == "WMStats"): + couchDB = self.wmstatsCouchDB.getDBInstance() + view = "jobsByStatusWorkflow" + + options = {"startkey": [workflowName], "endkey": [workflowName, {}], "stale": "ok", "reduce": False} jobs = couchDB.loadView(db, view, options = options)['rows'] for j in jobs: doc = {} @@ -109,12 +130,41 @@ def deleteWorkflowFromJobCouch(self, workflowName, db): #create the error report errorReport = {} deleted = 0 + status = "ok" for data in committed: if data.has_key('error'): errorReport.setdefault(data['error'], 0) errorReport[data['error']] += 1 + status = "error" else: deleted += 1 - return {'delete': deleted, 'error': errorReport} + return {'status': status, 'delete': deleted, 'message': errorReport} else: - return "nothing" + return {'status': 'warning', 'message': "no %s exist" % workflowName} + + + def cleanAllLocalCouchDB(self, workflowName): + logging.info("Deleting %s from JobCouch" % workflowName) + + jobReport = self.deleteWorkflowFromJobCouch(workflowName, "JobDump") + logging.info("%s docs deleted from JobDump" % jobReport) + + fwjrReport = self.deleteWorkflowFromJobCouch(workflowName, "FWJRDump") + logging.info("%s docs deleted from FWJRDump" % fwjrReport) + + wmstatsReport = self.deleteWorkflowFromJobCouch(workflowName, "WMStats") + logging.info("%s docs deleted from wmagent_summary" % wmstatsReport) + + # if one of the procedure fails return False + if (jobReport["status"] == "error" or fwjrReport["status"] == "error" or + wmstatsReport["status"] == "error"): + return False + + # if the data doesn't exist on all the db return False + if (jobReport["status"] == "warning" and + fwjrReport["status"] == "warning" and + wmstatsReport["status"] == "warning"): + return False + # other wise return True. + return True + \ No newline at end of file diff --git a/src/python/WMComponent/TaskArchiver/TaskArchiverPoller.py b/src/python/WMComponent/TaskArchiver/TaskArchiverPoller.py index 33086380db..2fcb72b253 100644 --- a/src/python/WMComponent/TaskArchiver/TaskArchiverPoller.py +++ b/src/python/WMComponent/TaskArchiver/TaskArchiverPoller.py @@ -211,6 +211,9 @@ def __init__(self, config): if not self.useReqMgrForCompletionCheck: #sets the local monitor summary couch db self.wmstatsCouchDB = WMStatsWriter(self.config.TaskArchiver.localWMStatsURL); + self.centralCouchDBWriter = self.wmstatsCouchDB; + else: + self.centralCouchDBWriter = WMStatsWriter(self.config.TaskArchiver.centralWMStatsURL); # Start a couch server for getting job info # from the FWJRs for committal to archive try: @@ -322,6 +325,7 @@ def archiveTasks(self): #Only delete those where the upload and notification succeeded logging.info("Found %d candidate workflows for deletion" % len(finishedwfs)) + abortedWorkflows = self.centralCouchDBWriter.workflowsByStatus(["aborted"], format = "dict"); wfsToDelete = {} for workflow in finishedwfs: try: @@ -343,6 +347,10 @@ def archiveTasks(self): self.wmstatsCouchDB.updateRequestStatus(workflow, "completed") logging.info("status updated to completed %s" % workflow) + if workflow in abortedWorkflows: + self.centralCouchDBWriter.updateRequestStatus(workflow, "aborted-completed") + logging.info("status updated to aborted-completed %s" % workflow) + wfsToDelete[workflow] = {"spec" : spec, "workflows": finishedwfs[workflow]["workflows"]} except TaskArchiverPollerException, ex: diff --git a/src/python/WMCore/Services/WMStats/WMStatsReader.py b/src/python/WMCore/Services/WMStats/WMStatsReader.py index 35420a6d98..ab85c825cb 100644 --- a/src/python/WMCore/Services/WMStats/WMStatsReader.py +++ b/src/python/WMCore/Services/WMStats/WMStatsReader.py @@ -17,15 +17,21 @@ def __init__(self, couchURL, dbName = None): self.couchServer = CouchServer(self.couchURL) self.couchDB = CouchServer(self.couchURL).connectDatabase(self.dbName, False) - def workflowsByStatus(self, statusList): + def workflowsByStatus(self, statusList, format = "list"): keys = statusList options = {"stale": "update_after"} result = self.couchDB.loadView("WMStats", "requestByStatus", options, keys) - workflowList = [] - for item in result["rows"]: - workflowList.append(item["id"]) - return workflowList - def replicate(self, target): - self.couchServer.replicate(self.dbName, target, - continuous = True) + if format == "dict": + workflowDict = {} + for item in result["rows"]: + workflowDict[item["id"]] = None + return workflowDict + else: + workflowList = [] + for item in result["rows"]: + workflowList.append(item["id"]) + return workflowList + + def getDBInstance(self): + return self.couchDB \ No newline at end of file diff --git a/src/python/WMCore/Services/WMStats/WMStatsWriter.py b/src/python/WMCore/Services/WMStats/WMStatsWriter.py index dd43782ba4..d3fa9b49cf 100644 --- a/src/python/WMCore/Services/WMStats/WMStatsWriter.py +++ b/src/python/WMCore/Services/WMStats/WMStatsWriter.py @@ -3,6 +3,7 @@ from WMCore.Database.CMSCouch import CouchServer from WMCore.Lexicon import splitCouchServiceURL, sanitizeURL from WMCore.Wrappers.JsonWrapper import JSONEncoder +from WMCore.Services.WMStats.WMStatsReader import WMStatsReader def monitorDocFromRequestSchema(schema): """ @@ -33,7 +34,7 @@ def monitorDocFromRequestSchema(schema): return doc -class WMStatsWriter(): +class WMStatsWriter(WMStatsReader): def __init__(self, couchURL, dbName = None): # set the connection for local couchDB call @@ -142,5 +143,9 @@ def deleteOldDocs(self, days): return "nothing" def replicate(self, target): - self.couchServer.replicate(self.dbName, target, continuous = True, + return self.couchServer.replicate(self.dbName, target, continuous = True, filter = 'WMStats/repfilter', useReplicator = True) + + def getDBInstance(self): + return self.couchDB +