From 8258774df9f467bc9b6824e0f366f94a95942fa7 Mon Sep 17 00:00:00 2001 From: Diego Ballesteros Date: Fri, 9 Nov 2012 01:13:25 -0600 Subject: [PATCH] Tier0 Plugin implementation The Tier0 plugin for the AnalyticsDataCollector components calculates the state of Tier-0 workflows based on subscription information found in WMBS and updates the request document in central WMStats with this status. Additionally this patch fixes some bugs with the PluginInterface implementation patch and renames it from PlugIn to Plugin to conform with the same name as the BossAir package (PlugIn is a bit harder to remember). Fixed the AnalyticsDataCollector unittest and removed it from the allowed failing tests. This requires #4227 to work; it also fixes the unit test breaks from it. Fixes #4264 --- .gitignore | 2 + .../AnalyticsDataCollector/AnalyticsPoller.py | 14 +- .../PluginInterface.py} | 13 +- .../Plugins/Tier0Plugin.py | 255 +++++++++++++ .../Plugins/__init__.py | 0 .../WMCore/Services/WMStats/WMStatsReader.py | 6 +- .../Subscriptions/GetSemiFinishedTasks.py | 86 +++++ .../WMBS/MySQL/Workflow/GetFinishedTasks.py | 42 +++ .../Subscriptions/GetSemiFinishedTasks.py | 15 + .../WMBS/Oracle/Workflow/GetFinishedTasks.py | 15 + .../DataCollectorAPI.py | 18 +- standards/allowed_failing_tests.txt | 1 - .../AnalyticsDataCollector_t.py | 15 +- .../Plugins_t/Tier0Plugin_t.py | 354 ++++++++++++++++++ .../Plugins_t/__init__.py | 0 .../TaskArchiver_t/TaskArchiver_t.py | 16 +- 16 files changed, 822 insertions(+), 30 deletions(-) rename src/python/WMComponent/AnalyticsDataCollector/{PlugIns/PlugInInterface.py => Plugins/PluginInterface.py} (84%) create mode 100644 src/python/WMComponent/AnalyticsDataCollector/Plugins/Tier0Plugin.py create mode 100644 src/python/WMComponent/AnalyticsDataCollector/Plugins/__init__.py create mode 100644 src/python/WMCore/WMBS/MySQL/Subscriptions/GetSemiFinishedTasks.py create mode 100644 src/python/WMCore/WMBS/MySQL/Workflow/GetFinishedTasks.py create mode 100644 
src/python/WMCore/WMBS/Oracle/Subscriptions/GetSemiFinishedTasks.py create mode 100644 src/python/WMCore/WMBS/Oracle/Workflow/GetFinishedTasks.py create mode 100644 test/python/WMComponent_t/AnalyticsDataCollector_t/Plugins_t/Tier0Plugin_t.py create mode 100644 test/python/WMComponent_t/AnalyticsDataCollector_t/Plugins_t/__init__.py diff --git a/.gitignore b/.gitignore index 754486ea45..0bffea7da3 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,8 @@ WMCORE_JENKINS_REPLACEMENT_SOURCE.tar.gz .project .pydevproject +.couchapprc +.couchappignore .settings/ TEST_AREA/ list-tests-alerts.txt diff --git a/src/python/WMComponent/AnalyticsDataCollector/AnalyticsPoller.py b/src/python/WMComponent/AnalyticsDataCollector/AnalyticsPoller.py index 577daeeb3e..71075cee40 100644 --- a/src/python/WMComponent/AnalyticsDataCollector/AnalyticsPoller.py +++ b/src/python/WMComponent/AnalyticsDataCollector/AnalyticsPoller.py @@ -8,6 +8,7 @@ import threading import logging import time +import traceback from WMCore.WorkerThreads.BaseWorkerThread import BaseWorkerThread from WMCore.Services.WorkQueue.WorkQueue import WorkQueue as WorkQueueService from WMCore.Services.WMStats.WMStatsWriter import WMStatsWriter @@ -36,8 +37,8 @@ def __init__(self, config): # need to get campaign, user, owner info self.agentDocID = "agent+hostname" self.summaryLevel = (config.AnalyticsDataCollector.summaryLevel).lower() - self.pluginName = getattr(config.AnalyticsDataCollector.pluginName, None) - self.plugIn = None + self.pluginName = getattr(config.AnalyticsDataCollector, "pluginName", None) + self.plugin = None def setup(self, parameters): @@ -63,7 +64,7 @@ def setup(self, parameters): self.centralWMStatsCouchDB = WMStatsWriter(self.config.AnalyticsDataCollector.centralWMStatsURL) if self.pluginName != None: - pluginFactory = WMFactory("plugins", "WMComponent.AnalyticsDataCollector.PlugIns") + pluginFactory = WMFactory("plugins", "WMComponent.AnalyticsDataCollector.Plugins") self.plugin = 
pluginFactory.loadObject(classname = self.pluginName) def algorithm(self, parameters): @@ -105,8 +106,8 @@ def algorithm(self, parameters): self.agentInfo, uploadTime, self.summaryLevel) - if self.plugIn != None: - self.plugIn(requestDocs, self.localSummaryCouchDB, self.centralWMStatsCouchDB) + if self.plugin != None: + self.plugin(requestDocs, self.localSummaryCouchDB, self.centralWMStatsCouchDB) self.localSummaryCouchDB.uploadData(requestDocs) logging.info("Request data upload success\n %s request \n uploading agent data" % len(requestDocs)) @@ -120,5 +121,6 @@ def algorithm(self, parameters): logging.info("Agent data upload success\n %s request" % len(agentDocs)) except Exception, ex: - logging.error("Error occured: will retry later") + logging.error("Error occured, will retry later:") logging.error(str(ex)) + logging.error("Traceback: \n%s" % traceback.format_exc()) diff --git a/src/python/WMComponent/AnalyticsDataCollector/PlugIns/PlugInInterface.py b/src/python/WMComponent/AnalyticsDataCollector/Plugins/PluginInterface.py similarity index 84% rename from src/python/WMComponent/AnalyticsDataCollector/PlugIns/PlugInInterface.py rename to src/python/WMComponent/AnalyticsDataCollector/Plugins/PluginInterface.py index 5276cca52b..3caaf28368 100644 --- a/src/python/WMComponent/AnalyticsDataCollector/PlugIns/PlugInInterface.py +++ b/src/python/WMComponent/AnalyticsDataCollector/Plugins/PluginInterface.py @@ -1,10 +1,17 @@ +#!/usr/bin/env python +""" +_PluginInterface_ -class PlugInInterface(object): +Base class for AnalyticsDataCollector plug-ins + +""" + +class PluginInterface(object): """Interface for policies""" def __init__(self, **args): """ initialize args if needed""" pass - + def __call__(self, requestDocs, localSummaryCouchDB, centralWMStatsCouchDB): """ this needs to be overwritten @@ -17,4 +24,4 @@ def __call__(self, requestDocs, localSummaryCouchDB, centralWMStatsCouchDB): So don't push requestDocs to couchdb directly here. 
""" msg = "%s.__call__ is not implemented" % self.__class__.__name__ - raise NotImplementedError(msg) \ No newline at end of file + raise NotImplementedError(msg) diff --git a/src/python/WMComponent/AnalyticsDataCollector/Plugins/Tier0Plugin.py b/src/python/WMComponent/AnalyticsDataCollector/Plugins/Tier0Plugin.py new file mode 100644 index 0000000000..8e458ce7e6 --- /dev/null +++ b/src/python/WMComponent/AnalyticsDataCollector/Plugins/Tier0Plugin.py @@ -0,0 +1,255 @@ +#!/usr/bin/env python +""" +_Tier0Plugin_ + +Plugin which collects information from the agent and updates +the state of the current tier-0 workflows in the system. + +Created on Nov 2, 2012 + +@author: dballest +""" + +import threading +import traceback +import re + +from WMCore.DAOFactory import DAOFactory +from WMCore.WMException import WMException + +from WMComponent.AnalyticsDataCollector.Plugins.PluginInterface import PluginInterface +from WMCore.WMSpec.WMWorkload import WMWorkloadHelper + + +def getTier0Regex(): + """ + _getTier0Regex_ + + Define the Tier0 states and matching regex out of the object + so it can be fetched without instancing the plugin. 
+ This are the uncompiled regular expressions in the correct + order of the states + """ + regexDict = {'Repack' : [('Merge', [r'^/Repack_Run[0-9]+_Stream[\w]+/Repack$']), + ('Processing Done', [r'^/Repack_Run[0-9]+_Stream[\w]+/Repack/RepackMerge[\w]+$'])], + + 'PromptReco' : [('AlcaSkim', [r'^/PromptReco_Run[0-9]+_[\w]+/Reco$']), + ('Merge', [r'^/PromptReco_Run[0-9]+_[\w]+/Reco/AlcaSkim$']), + ('Harvesting', [r'^/PromptReco_Run[0-9]+_[\w]+/Reco/AlcaSkim/AlcaSkimMerge[\w]+$', + r'^/PromptReco_Run[0-9]+_[\w]+/Reco/RecoMerge[\w]+$']), + ('Processing Done', [r'^/PromptReco_Run[0-9]+_[\w]+/Reco/RecoMerge[\w]+/RecoMerge[\w]+DQMHarvest[\w]+$']) + ], + + 'Express' : [('Merge', [r'^/Express_Run[0-9]+_Stream[\w]+/Express$']), + ('Harvesting', [r'^/Express_Run[0-9]+_Stream[\w]+/Express/ExpressMerge[\w]+$', + r'^/Express_Run[0-9]+_Stream[\w]+/Express/ExpressAlcaSkim[\w]+$']), + ('Processing Done', [r'^/Express_Run[0-9]+_Stream[\w]+/Express/ExpressMerge[\w]+/ExpressMerge[\w]+DQMHarvest[\w]+$', + r'^/Express_Run[0-9]+_Stream[\w]+/Express/ExpressAlcaSkim[\w]+/ExpressAlcaSkim[\w]+AlcaHarvest[\w]+$']) + ]} + return regexDict + +class Tier0PluginError(WMException): + """ + _Tier0PluginError_ + + An error in the Tier0 plugin + """ + + def __init__(self, msg): + """ + __init__ + + Initialize the error, just accepts a message without error code + """ + WMException.__init__(self, msg) + return + +class Tier0Plugin(PluginInterface): + """ + _Tier0Plugin_ + + Tier0 plugin main class + """ + + def __init__(self): + """ + __init__ + + Initialize the plugin object + """ + PluginInterface.__init__(self) + + self.myThread = threading.currentThread() + self.daoFactory = DAOFactory(package = 'WMCore.WMBS', + logger = self.myThread.logger, + dbinterface = self.myThread.dbi) + + self.logger = self.myThread.logger + + # To get finished subscriptions it needs the cleanout state index + getCleanoutState = self.daoFactory(classname = 'Jobs.GetStateID') + self.cleanoutState = 
getCleanoutState.execute(state = 'cleanout') + + # Load the DAOs + self.getFinishedTasks = self.daoFactory(classname = 'Workflow.GetFinishedTasks') + self.getFinishedTasksNoInjection = self.daoFactory(classname = 'Subscriptions.GetSemiFinishedTasks') + + # To avoid doing many I/O operations, cache the list of tasks for each workflow + self.taskCache = {} + + self.setupStateRegex() + + return + + def setupStateRegex(self): + """ + _setupStateRegex_ + + Each state is mapped to a regular expression in the taskpaths + this depends on the current implementation of the T0. The regular + expressions obtained from the function Tier0Plugin.getTier0Regex + are compiled and stored in a dictionary. + + The structure of the dictionary is as follows: + + { : [(, [, ]), ...], + ... + } + """ + rawRegex = getTier0Regex() + + self.tier0Regex = {} + + for workflowType in rawRegex: + self.tier0Regex[workflowType] = [] + for pair in rawRegex[workflowType]: + compiledRegex = (pair[0], [re.compile(x).match for x in pair[1]]) + self.tier0Regex[workflowType].append(compiledRegex) + return + + def __call__(self, requestDocs, localSummaryCouchDB, centralWMStatsCouchDB): + """ + __call__ + + Call to this plugin, this method takes care of executing + the logic of the plugin, see guidelines in parent class documentation. + """ + # Get all current finished tasks (i.e. 
all associated subscriptions are finished) + finishedTasks = self.getFinishedTasks.execute() + self.logger.debug("Found %d finished tasks" % len(finishedTasks)) + # Repack workflows are not marked injected and therefore its subscriptions + # are not marked as finished until the injection happens (48h delay) + # get those tasks with a looser criteria + finishedTasksNoInjection = self.getFinishedTasksNoInjection.execute(state = self.cleanoutState, + pattern = '%Repack%') + self.logger.debug("Found %d finished Repack tasks" % len(finishedTasksNoInjection)) + # Get workflows which are not closed yet or completed + notClosedWorkflows = centralWMStatsCouchDB.workflowsByStatus(['new']) + + # Aggregate results by workflow name + finishedTasks.extend(finishedTasksNoInjection) + workflows = {} + for entry in finishedTasks: + workflowName = entry['name'] + if workflowName not in workflows: + workflows[workflowName] = [] + if workflowName not in self.taskCache: + self.loadTasks(workflowName, entry['spec']) + workflows[workflowName].append(entry['task']) + + self.logger.info('Updating the status of %d workflows' % len(workflows)) + #Go through the reported workflows + for workflowName in workflows: + try: + if workflowName in notClosedWorkflows: + # Nothing is completed for this workflow + continue + + # Sanity checks on workflow name + tokens = workflowName.split('_') + if not len(tokens): + self.logger.warning('This workflow does not match the Tier-0 naming structure, it will be ignored.') + continue + workflowType = tokens[0] + if workflowType not in self.tier0Regex: + self.logger.warning('This workflow does not match the Tier-0 naming structure, it will be ignored.') + continue + + completedTaskList = workflows[workflowName] + + workflowStatus = self.determineCurrentStatus(workflowName, workflowType, completedTaskList) + if workflowStatus is not None: + localSummaryCouchDB.updateRequestStatus(workflowName, workflowStatus) + + except Exception, ex: + # Plugins are meant to 
be not-critical + # If something fails then just keep going + self.logger.error('Error occurred while processing docs:\n%s' % str(ex)) + self.logger.error(traceback.format_exc()) + except Tier0PluginError, t0ex: + # More specific exception, just log it anyway + self.logger.error('Error ocurred while processing a doc:\n%s' % str(t0ex)) + + # Clean the task cache based on the documents we reported this cycle + self.cleanTaskCache([x['workflow'] for x in requestDocs]) + + return + + def loadTasks(self, workflowName, spec): + """ + _loadTasks_ + + Loads the list of tasks for the workflow, + stores them in the cache if not present + """ + if workflowName in self.taskCache: + return + try: + workloadHelper = WMWorkloadHelper() + workloadHelper.load(spec) + tasks = workloadHelper.listAllTaskPathNames() + self.taskCache[workflowName] = tasks + except IOError, ex: + msg = "Failed to load spec file %s\n" % spec + msg += "Original IOError: %s" % str(ex) + raise Tier0PluginError(msg) + return + + def cleanTaskCache(self, reportedWorkflows): + """ + _cleanTaskCache_ + + Keep the memory footprint of this plugin smaller + clean unused task lists + """ + self.logger.debug('Cleaning up task cache') + for workflow in self.taskCache.keys(): + if workflow not in reportedWorkflows: + self.taskCache.pop(workflow) + return + + def determineCurrentStatus(self, workflowName, workflowType, completedTasks): + """ + _determineCurrentStatus_ + + Process a completed task list for a workflow and + get the most advanced status of the workflow. 
+ """ + self.logger.debug('Calculating status for %s' % workflowName) + currentStatus = None + typeRegex = self.tier0Regex[workflowType] + fullTaskList = self.taskCache[workflowName] + for pair in typeRegex: + completedTasksForStatus = 0 + totalTasksForStatus = 0 + for regex in pair[1]: + completedTasksForStatus += len(filter(regex, completedTasks)) + totalTasksForStatus += len(filter(regex, fullTaskList)) + if completedTasksForStatus == totalTasksForStatus: + currentStatus = pair[0] + else: + break + self.logger.debug('Status is %s' % currentStatus) + self.logger.debug('Completed task list: %s' % str(completedTasksForStatus)) + + return currentStatus diff --git a/src/python/WMComponent/AnalyticsDataCollector/Plugins/__init__.py b/src/python/WMComponent/AnalyticsDataCollector/Plugins/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/python/WMCore/Services/WMStats/WMStatsReader.py b/src/python/WMCore/Services/WMStats/WMStatsReader.py index ab85c825cb..7db6bdc083 100644 --- a/src/python/WMCore/Services/WMStats/WMStatsReader.py +++ b/src/python/WMCore/Services/WMStats/WMStatsReader.py @@ -17,9 +17,11 @@ def __init__(self, couchURL, dbName = None): self.couchServer = CouchServer(self.couchURL) self.couchDB = CouchServer(self.couchURL).connectDatabase(self.dbName, False) - def workflowsByStatus(self, statusList, format = "list"): + def workflowsByStatus(self, statusList, format = "list", stale = "update_after"): keys = statusList - options = {"stale": "update_after"} + options = {} + if stale: + options = {"stale": stale} result = self.couchDB.loadView("WMStats", "requestByStatus", options, keys) if format == "dict": diff --git a/src/python/WMCore/WMBS/MySQL/Subscriptions/GetSemiFinishedTasks.py b/src/python/WMCore/WMBS/MySQL/Subscriptions/GetSemiFinishedTasks.py new file mode 100644 index 0000000000..52e523776a --- /dev/null +++ b/src/python/WMCore/WMBS/MySQL/Subscriptions/GetSemiFinishedTasks.py @@ -0,0 +1,86 @@ +""" 
+_GetSemiFinishedTasks_ + +MySQL implementation of Subscriptions.GetSemiFinishedTasks + +Created on Nov 7, 2012 + +@author: dballest +""" + +from time import time + +from WMCore.Database.DBFormatter import DBFormatter + +class GetSemiFinishedTasks(DBFormatter): + """ + _GetSemiFinishedTasks_ + + In some particular cases (e.g. Repack) the workflows + can't be marked as injected for some time which prevents + the GetAndMarkNewFinishedSubscriptions DAO from picking actually finished + subscriptions. This DAO is a more relaxed version of the former, but it doesn't mark + anything and returns tasks instead of subscriptions. + + The relaxation is present in the following checks: + + - Not injected workflows are admitted + - Doesn't check for files which are shared with other workflows + + It also allows a pattern to filter in the workflow name + """ + + sql = """SELECT complete_subscriptions.name, complete_subscriptions.task, + complete_subscriptions.spec + FROM + (SELECT wmbs_subscription.id AS sub_id, + wmbs_workflow.task AS task, + wmbs_workflow.name AS name, + wmbs_workflow.id AS workflow, + wmbs_workflow.spec AS spec + FROM wmbs_subscription + INNER JOIN wmbs_fileset ON + wmbs_fileset.id = wmbs_subscription.fileset AND + wmbs_fileset.open = 0 + INNER JOIN wmbs_workflow ON + wmbs_workflow.id = wmbs_subscription.workflow AND + wmbs_workflow.name LIKE :pattern + LEFT OUTER JOIN wmbs_sub_files_available ON + wmbs_sub_files_available.subscription = wmbs_subscription.id + LEFT OUTER JOIN wmbs_sub_files_acquired ON + wmbs_sub_files_acquired.subscription = wmbs_subscription.id + LEFT OUTER JOIN wmbs_jobgroup ON + wmbs_jobgroup.subscription = wmbs_subscription.id + LEFT OUTER JOIN wmbs_job ON + wmbs_job.jobgroup = wmbs_jobgroup.id AND + wmbs_job.state_time > :maxTime AND + wmbs_job.state != %d + WHERE wmbs_subscription.finished = 0 + GROUP BY wmbs_workflow.task, + wmbs_workflow.name, + wmbs_workflow.spec, + wmbs_workflow.id, + wmbs_subscription.id + HAVING 
COUNT(wmbs_sub_files_available.subscription) = 0 + AND COUNT(wmbs_sub_files_acquired.subscription) = 0 + AND COUNT(wmbs_job.id) = 0 ) complete_subscriptions + RIGHT OUTER JOIN wmbs_subscription ON + wmbs_subscription.workflow = complete_subscriptions.workflow + GROUP BY complete_subscriptions.name, complete_subscriptions.task, + complete_subscriptions.spec, complete_subscriptions.workflow + HAVING COUNT(complete_subscriptions.sub_id) = COUNT(wmbs_subscription.id) + """ + + def execute(self, state, pattern = '%Repack%', timeOut = None, conn = None, transaction = False): + + currentTime = int(time()) + if timeOut == None: + timeOut = currentTime + + binds = {'maxTime' : currentTime - timeOut, + 'pattern' : pattern} + + result = self.dbi.processData(self.sql % state, + binds, conn = conn, + transaction = transaction) + return self.formatDict(result) diff --git a/src/python/WMCore/WMBS/MySQL/Workflow/GetFinishedTasks.py b/src/python/WMCore/WMBS/MySQL/Workflow/GetFinishedTasks.py new file mode 100644 index 0000000000..8df21834d8 --- /dev/null +++ b/src/python/WMCore/WMBS/MySQL/Workflow/GetFinishedTasks.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python +""" +_GetFinishedTasks_ + +MySQL implementation of Workflow.GetFinishedTasks + +Created on Nov 7, 2012 + +@author: dballest +""" + +from WMCore.Database.DBFormatter import DBFormatter + +class GetFinishedTasks(DBFormatter): + """ + _GetFinishedTasks_ + + Returns those tasks where the associated subscriptions + are marked as finished + """ + + sql = """SELECT wmbs_workflow.name, wmbs_workflow.task, + wmbs_workflow.spec + FROM wmbs_workflow + LEFT OUTER JOIN wmbs_subscription ON + wmbs_subscription.workflow = wmbs_workflow.id AND + wmbs_subscription.finished = 0 + GROUP BY wmbs_workflow.name, wmbs_workflow.task, + wmbs_workflow.spec + HAVING COUNT(wmbs_subscription.id) = 0 + """ + + def execute(self, conn = None, transaction = False): + """ + _execute_ + + Returns a list of dictionaries with the following structure: + [ {'name' : 
, 'task' : }, ...] + """ + result = self.dbi.processData(self.sql, + conn = conn, transaction = transaction) + return self.formatDict(result) diff --git a/src/python/WMCore/WMBS/Oracle/Subscriptions/GetSemiFinishedTasks.py b/src/python/WMCore/WMBS/Oracle/Subscriptions/GetSemiFinishedTasks.py new file mode 100644 index 0000000000..c159651b63 --- /dev/null +++ b/src/python/WMCore/WMBS/Oracle/Subscriptions/GetSemiFinishedTasks.py @@ -0,0 +1,15 @@ +#!/usr/bin/env python +""" +_GetSemiFinishedTasks_ + +Oracle implementation of Subscriptions.GetSemiFinishedTasks + +Created on Nov 7, 2012 + +@author: dballest +""" + +from WMCore.WMBS.MySQL.Subscriptions.GetSemiFinishedTasks import GetSemiFinishedTasks as MySQLGetSemiFinishedTasks + +class GetSemiFinishedTasks(MySQLGetSemiFinishedTasks): + pass diff --git a/src/python/WMCore/WMBS/Oracle/Workflow/GetFinishedTasks.py b/src/python/WMCore/WMBS/Oracle/Workflow/GetFinishedTasks.py new file mode 100644 index 0000000000..ba92e449e8 --- /dev/null +++ b/src/python/WMCore/WMBS/Oracle/Workflow/GetFinishedTasks.py @@ -0,0 +1,15 @@ +#!/usr/bin/env python +""" +_GetFinishedTasks_ + +Oracle implementation of Workflow.GetFinishedTasks + +Created on Nov 7, 2012 + +@author: dballest +""" + +from WMCore.WMBS.MySQL.Workflow.GetFinishedTasks import GetFinishedTasks as MySQLGetFinishedTasks + +class GetFinishedTasks(MySQLGetFinishedTasks): + pass diff --git a/src/python/WMQuality/Emulators/AnalyticsDataCollector/DataCollectorAPI.py b/src/python/WMQuality/Emulators/AnalyticsDataCollector/DataCollectorAPI.py index f391a523a2..1d02661ce7 100644 --- a/src/python/WMQuality/Emulators/AnalyticsDataCollector/DataCollectorAPI.py +++ b/src/python/WMQuality/Emulators/AnalyticsDataCollector/DataCollectorAPI.py @@ -27,11 +27,12 @@ class LocalCouchDBData(): - def __init__(self, couchURL): + def __init__(self, couchURL, summaryLevel): # set the connection for local couchDB call print "Using LocalCouchDBData Emulator" self.couchURL = couchURL self.couchURLBase, 
self.dbName = splitCouchServiceURL(couchURL) + self.summaryLevel = summaryLevel logging.info("connect couch %s: %s" % (self.couchURLBase, self.dbName)) def getJobSummaryByWorkflowAndSite(self): @@ -61,6 +62,15 @@ def getJobSummaryByWorkflowAndSite(self): return doc + def getEventSummaryByWorkflow(self): + """ + _getEventSummaryByWorkflow_ + + Gets the event progress by workflow. + Returns an empty dict in the emulator + """ + return {} + class ReqMonDBData(): def __init__(self, couchURL): @@ -75,17 +85,15 @@ def uploadData(self, doc): class WMAgentDBData(): - def __init__(self, dbi, logger): + def __init__(self, summaryLevel, dbi, logger): # interface to WMBS/BossAir db print "Using %s Emulator" % self.__class__ - def getHeartBeatWarning(self, agentURL, acdcLink): + def getHeartBeatWarning(self): agentInfo = {} agentInfo['status'] = 'ok' - agentInfo['url'] = agentURL - agentInfo['acdc'] = acdcLink return agentInfo def getBatchJobInfo(self): diff --git a/standards/allowed_failing_tests.txt b/standards/allowed_failing_tests.txt index 3bf6e7b7bc..f2196662c3 100644 --- a/standards/allowed_failing_tests.txt +++ b/standards/allowed_failing_tests.txt @@ -28,7 +28,6 @@ UserFileCache_t.UserFileCacheTest.testChecksum WMBS_t.WorkQueueTest.testWorkQueueService WMComponent_t.JobAccountant_t.JobAccountant_t.JobAccountantTest.test4GMerge WMComponent_t.AlertGenerator_t.Pollers_t.Agent_t.AgentTest.testComponentsPollerBasic -WMComponent_t.AnalyticsDataCollector_t.AnalyticsDataCollector_t.AnalyticsDataCollector_t.testAnalyticsPoller WMComponent_t.JobSubmitter_t.JobSubmitter_t.JobSubmitterTest.testD_CreamCETest WMCore_t.BossAir_t.GLiteLIParser_t.LoggingInfoParserTest.testParseFile WMCore_t.JobSplitting_t.Harvest_t.HarvestTest.testHarvestEndOfRunTrigger diff --git a/test/python/WMComponent_t/AnalyticsDataCollector_t/AnalyticsDataCollector_t.py b/test/python/WMComponent_t/AnalyticsDataCollector_t/AnalyticsDataCollector_t.py index d37f8d2e1a..bdc5c0902c 100644 --- 
a/test/python/WMComponent_t/AnalyticsDataCollector_t/AnalyticsDataCollector_t.py +++ b/test/python/WMComponent_t/AnalyticsDataCollector_t/AnalyticsDataCollector_t.py @@ -52,7 +52,9 @@ def setUp(self): self.testInit = TestInit(__file__) self.testInit.setLogging() self.reqmonDBName = "wmstat_t" + self.localDBName = "wmstat_t_local" self.testInit.setupCouch(self.reqmonDBName, "WMStats") + self.testInit.setupCouch(self.localDBName, "WMStats") self.testDir = self.testInit.generateWorkDir() EmulatorHelper.setEmulators(localCouch = True, reqMon = False, wmagentDB = True) return @@ -89,14 +91,25 @@ def getConfig(self): config.Agent.useTrigger = False config.Agent.useHeartbeat = False + config.section_("ACDC") + config.ACDC.couchurl = couchURL + config.ACDC.database = "acdc" + config.component_("AnalyticsDataCollector") config.AnalyticsDataCollector.namespace = "WMComponent.AnalyticsDataCollector.AnalyticsDataCollector" - config.AnalyticsDataCollector.componentDir = config.General.workDir + "/AnalyticsDataCollector" + config.AnalyticsDataCollector.componentDir = config.General.workDir + "/AnalyticsDataCollector" config.AnalyticsDataCollector.logLevel = "DEBUG" config.AnalyticsDataCollector.pollInterval = 240 config.AnalyticsDataCollector.localCouchURL = "%s/%s" % (couchURL, "jobDump") config.AnalyticsDataCollector.localQueueURL = "%s/%s" % (couchURL, "workqueue") + config.AnalyticsDataCollector.localWMStatsURL = "%s/%s" % (couchURL, self.localDBName) + config.AnalyticsDataCollector.centralWMStatsURL = "%s/%s" % (couchURL, self.reqmonDBName) config.AnalyticsDataCollector.reqMonURL = "%s/%s" % (couchURL, self.reqmonDBName) + config.AnalyticsDataCollector.summaryLevel = "task" + + config.section_("WMBSService") + config.WMBSService.section_("Webtools") + config.WMBSService.Webtools.port = 9999 return config diff --git a/test/python/WMComponent_t/AnalyticsDataCollector_t/Plugins_t/Tier0Plugin_t.py b/test/python/WMComponent_t/AnalyticsDataCollector_t/Plugins_t/Tier0Plugin_t.py 
new file mode 100644 index 0000000000..9799152e5b --- /dev/null +++ b/test/python/WMComponent_t/AnalyticsDataCollector_t/Plugins_t/Tier0Plugin_t.py @@ -0,0 +1,354 @@ +""" +_Tier0Plugin_t_ + +Test the Tier-0 plugin for the AnalyticsDataCollector + +Created on Nov 7, 2012 + +@author: dballest +""" + +import os +import unittest + +from WMComponent.AnalyticsDataCollector.Plugins.Tier0Plugin import Tier0Plugin + +from WMCore.Services.WMStats.WMStatsWriter import WMStatsWriter +from WMCore.WMBS.Fileset import Fileset +from WMCore.WMBS.Subscription import Subscription +from WMCore.WMBS.Workflow import Workflow +from WMCore.WMSpec.WMWorkload import newWorkload +from WMCore.WMSpec.StdSpecs.PromptReco import getTestArguments, promptrecoWorkload + +from WMQuality.TestInitCouchApp import TestInitCouchApp as TestInit +from WMCore.WorkQueue.WMBSHelper import WMBSHelper + + + +class Tier0PluginTest(unittest.TestCase): + + + def setUp(self): + """ + _setUp_ + + Setup the test environment + """ + self.testInit = TestInit(__file__) + self.testInit.setDatabaseConnection() + self.testInit.setSchema(["WMCore.WMBS"]) + self.wmstatsCouchDB = 'wmstats_plugin_t' + self.testInit.setupCouch(self.wmstatsCouchDB, 'WMStats') + self.testDir = self.testInit.generateWorkDir() + + self.wmstatsWriter = WMStatsWriter(os.environ['COUCHURL'], self.wmstatsCouchDB) + + self.stateMap = {} + self.orderedStates = [] + self.plugin = None + + return + + def tearDown(self): + """ + _tearDown_ + + Clear databases and delete files + """ + self.testInit.tearDownCouch() + self.testInit.clearDatabase() + self.testInit.delWorkDir() + + return + + def setupRepackWorkflow(self): + """ + _setupRepackWorkflow_ + + Populate WMBS with a repack-like workflow, + every subscription must be unfinished at first + """ + + workflowName = 'Repack_Run481516_StreamZ' + mergeTasks = ['RepackMergewrite_QuadElectron_RAW', 'RepackMergewrite_TriPhoton_RAW', + 'RepackMergewrite_SingleNeutrino_RAW'] + + self.stateMap = {'Merge' : [], + 
'Processing Done' : []} + self.orderedStates = ['Merge', 'Processing Done'] + + # Populate WMStats + self.wmstatsWriter.insertGenericRequest({'_id' : workflowName}) + self.wmstatsWriter.updateRequestStatus(workflowName, 'Closed') + + # Create a wmspec in disk + workload = newWorkload(workflowName) + repackTask = workload.newTask('Repack') + for task in mergeTasks: + repackTask.addTask(task) + repackTask.addTask('RepackCleanupUnmergedwrite_QuadElectron_RAW') + + specPath = os.path.join(self.testDir, 'Repack.pkl') + workload.save(specPath) + + # Populate WMBS + topFileset = Fileset(name = 'TestStreamerFileset') + topFileset.create() + + options = {'spec' : specPath, 'owner' : 'ItsAMeMario', + 'name' : workflowName, 'wfType' : 'tier0'} + topLevelWorkflow = Workflow(task = '/%s/Repack' % workflowName, + **options) + topLevelWorkflow.create() + topLevelSub = Subscription(topFileset, topLevelWorkflow) + topLevelSub.create() + self.stateMap['Merge'].append(topFileset) + for task in mergeTasks: + mergeWorkflow = Workflow(task = '/%s/Repack/%s' % (workflowName, task), **options) + mergeWorkflow.create() + unmergedFileset = Fileset(name = 'TestUnmergedFileset%s' % task) + unmergedFileset.create() + mergeSub = Subscription(unmergedFileset, mergeWorkflow) + mergeSub.create() + self.stateMap['Processing Done'].append(unmergedFileset) + cleanupWorkflow = Workflow(task = '/Repack_Run481516_StreamZ/Repack/RepackCleanupUnmergedwrite_QuadElectron_RAW', + **options) + cleanupWorkflow.create() + unmergedFileset = Fileset(name = 'TestUnmergedFilesetToCleanup') + unmergedFileset.create() + cleanupSub = Subscription(unmergedFileset, cleanupWorkflow) + cleanupSub.create() + + return + + def setupExpressWorkflow(self): + """ + _setupExpressWorkflow_ + + Populate WMBS with a express-like workflow, + every subscription must be unfinished at first + """ + + workflowName = 'Express_Run481516_StreamZFast' + secondLevelTasks = ['ExpressMergewrite_StreamZFast_DQM', 
'ExpressMergewrite_ExpressPhysics_FEVT', + 'ExpressAlcaSkimwrite_StreamZFast_ALCARECO', 'ExpressCleanupUnmergedwrite_StreamZFast_DQM', + 'ExpressCleanupUnmergedwrite_ExpressPhysics_FEVT', 'ExpressCleanupUnmergedwrite_StreamZFast_ALCARECO'] + alcaHarvestTask = 'ExpressAlcaSkimwrite_StreamZFast_ALCARECOAlcaHarvestALCARECOStreamPromptCalibProd' + dqmHarvestTask = 'ExpressMergewrite_StreamZFast_DQMDQMHarvestMerged' + + self.stateMap = {'Merge' : [], + 'Harvesting' : [], + 'Processing Done' : []} + self.orderedStates = ['Merge', 'Harvesting', 'Processing Done'] + + # Populate WMStats + self.wmstatsWriter.insertGenericRequest({'_id' : workflowName}) + self.wmstatsWriter.updateRequestStatus(workflowName, 'Closed') + + # Create a wmspec in disk + workload = newWorkload(workflowName) + expressTask = workload.newTask('Express') + for task in secondLevelTasks: + secondLevelTask = expressTask.addTask(task) + if task == 'ExpressAlcaSkimwrite_StreamZFast_ALCARECO': + secondLevelTask.addTask(alcaHarvestTask) + elif task == 'ExpressMergewrite_StreamZFast_DQM': + secondLevelTask.addTask(dqmHarvestTask) + + specPath = os.path.join(self.testDir, 'Express.pkl') + workload.save(specPath) + + # Populate WMBS + sharedFileset = Fileset(name = 'TestFileset') + sharedFileset.create() + sharedFileset.markOpen(False) + + options = {'spec' : specPath, 'owner' : 'ItsAMeMario', + 'name' : workflowName, 'wfType' : 'tier0'} + topLevelWorkflow = Workflow(task = '/%s/Express' % workflowName, + **options) + topLevelWorkflow.create() + topLevelSub = Subscription(sharedFileset, topLevelWorkflow) + topLevelSub.create() + self.stateMap['Merge'].append(topLevelSub) + for task in filter(lambda x : not x.count('CleanupUnmerged'), secondLevelTasks): + secondLevelWorkflow = Workflow(task = '/%s/Express/%s' % (workflowName, task), **options) + secondLevelWorkflow.create() + mergeSub = Subscription(sharedFileset, secondLevelWorkflow) + mergeSub.create() + self.stateMap['Harvesting'].append(mergeSub) + + for 
(parent, child) in [('ExpressAlcaSkimwrite_StreamZFast_ALCARECO', alcaHarvestTask), + ('ExpressMergewrite_StreamZFast_DQM', dqmHarvestTask)]: + harvestingWorkflow = Workflow(task = '/%s/Express/%s/%s' % (workflowName, parent, child), + **options) + harvestingWorkflow.create() + harvestingSub = Subscription(sharedFileset, harvestingWorkflow) + harvestingSub.create() + self.stateMap['Processing Done'].append(harvestingSub) + + return + + def setupPromptRecoWorkflow(self): + """ + _setupPromptRecoWorkflow_ + + Populate WMBS with a real PromptReco workflow, + every subscription must be unfinished at first + """ + + # Populate disk and WMBS + testArguments = getTestArguments() + + workflowName = 'PromptReco_Run195360_Cosmics' + workload = promptrecoWorkload(workflowName, testArguments) + + wmbsHelper = WMBSHelper(workload, 'Reco', 'SomeBlock', cachepath = self.testDir) + wmbsHelper.createTopLevelFileset() + wmbsHelper.createSubscription(wmbsHelper.topLevelTask, wmbsHelper.topLevelFileset) + + self.stateMap = {'AlcaSkim' : [], + 'Merge' : [], + 'Harvesting' : [], + 'Processing Done' : []} + self.orderedStates = ['AlcaSkim', 'Merge', 'Harvesting', 'Processing Done'] + + # Populate WMStats + self.wmstatsWriter.insertGenericRequest({'_id' : workflowName}) + self.wmstatsWriter.updateRequestStatus(workflowName, 'Closed') + + topLevelTask = '/%s/Reco' % workflowName + alcaSkimTask = '%s/AlcaSkim' % topLevelTask + mergeTasks = ['%s/AlcaSkim/AlcaSkimMergeALCARECOStreamHcalCalHOCosmics', + '%s/AlcaSkim/AlcaSkimMergeALCARECOStreamTkAlCosmics0T', + '%s/AlcaSkim/AlcaSkimMergeALCARECOStreamMuAlGlobalCosmics', + '%s/RecoMergewrite_AOD', + '%s/RecoMergewrite_DQM', + '%s/RecoMergewrite_RECO'] + harvestingTask = '%s/RecoMergewrite_DQM/RecoMergewrite_DQMDQMHarvestMerged' % topLevelTask + + self.stateMap['AlcaSkim'].append(wmbsHelper.topLevelSubscription) + + alcaSkimWorkflow = Workflow(name = workflowName, task = alcaSkimTask) + alcaSkimWorkflow.load() + alcarecoFileset = Fileset(name = 
'/PromptReco_Run195360_Cosmics/Reco/unmerged-write_ALCARECO') + alcarecoFileset.load() + alcaSkimSub = Subscription(alcarecoFileset, alcaSkimWorkflow) + alcaSkimSub.load() + self.stateMap['Merge'].append(alcaSkimSub) + + for task in mergeTasks: + mergeTask = task % topLevelTask + mergeWorkflow = Workflow(name = workflowName, task = mergeTask) + mergeWorkflow.load() + if 'AlcaSkim' in mergeTask: + stream = mergeTask.split('/')[-1][13:] + unmergedFileset = Fileset(name = '%s/unmerged-%s' % (alcaSkimTask, stream)) + unmergedFileset.load() + else: + dataTier = mergeTask.split('/')[-1].split('_')[-1] + unmergedFileset = Fileset(name = '%s/unmerged-write_%s' % (topLevelTask, dataTier)) + unmergedFileset.load() + mergeSub = Subscription(unmergedFileset, mergeWorkflow) + mergeSub.load() + self.stateMap['Harvesting'].append(mergeSub) + + harvestingWorkflow = Workflow(name = workflowName, task = harvestingTask) + harvestingWorkflow.load() + harvestingFileset = Fileset(name = '/PromptReco_Run195360_Cosmics/Reco/RecoMergewrite_DQM/merged-Merged') + harvestingFileset.load() + harvestingSub = Subscription(harvestingFileset, harvestingWorkflow) + harvestingSub.load() + self.stateMap['Processing Done'].append(harvestingSub) + + return + + def verifyStateTransitions(self, transitionMethod = 'markFinished', transitionTrigger = True): + """ + _verifyStateTransitions_ + + Utility method which goes through the list of states in self.orderedStates and + finishes the tasks that demand a state transition in each step. This according + to the defined transition method and trigger. 
+ It verifies that the request document in WMStats is moving according to the transitions + """ + + for idx in range(0, len(self.orderedStates) * 2): + nextState = self.orderedStates[idx / 2] + if (idx / 2) == 0: + currentState = 'Closed' + else: + currentState = self.orderedStates[idx / 2 - 1] + if idx % 2 == 0: + for transitionObject in self.stateMap[nextState][:-1]: + method = getattr(transitionObject, transitionMethod) + method(transitionTrigger) + self.plugin([], self.wmstatsWriter, self.wmstatsWriter) + currentStateWorkflows = self.wmstatsWriter.workflowsByStatus([currentState], stale = False) + nextStateWorkflows = self.wmstatsWriter.workflowsByStatus([nextState], stale = False) + self.assertEqual(len(currentStateWorkflows), 1, 'Workflow moved incorrectly from %s' % currentState) + self.assertEqual(len(nextStateWorkflows), 0, 'Workflow moved incorrectly to %s' % nextState) + else: + transitionObject = self.stateMap[nextState][-1] + method = getattr(transitionObject, transitionMethod) + method(transitionTrigger) + self.plugin([], self.wmstatsWriter, self.wmstatsWriter) + currentStateWorkflows = self.wmstatsWriter.workflowsByStatus([currentState], stale = False) + nextStateWorkflows = self.wmstatsWriter.workflowsByStatus([nextState], stale = False) + self.assertEqual(len(currentStateWorkflows), 0, 'Workflow did not move correctly from %s' % currentState) + self.assertEqual(len(nextStateWorkflows), 1, 'Workflow did not move correctly to %s' % nextState) + return + + def testA_RepackStates(self): + """ + _testA_RepackStates_ + + Setup an environment with a Repack workflow + and traverse through the different states. + Check that the transitions are sane. 
+ """ + # Set the environment + self.setupRepackWorkflow() + self.plugin = Tier0Plugin() + + # Verify the transitions + self.verifyStateTransitions('markOpen', False) + + return + + def testB_ExpressStates(self): + """ + _testB_ExpressStates_ + + Setup an environment with a Express workflow + and traverse through the different states. + Check that the transitions are sane. + """ + # Set the environment + self.setupExpressWorkflow() + self.plugin = Tier0Plugin() + + # Verify the transitions + self.verifyStateTransitions() + + return + + def testC_PromptRecoStates(self): + """ + _testC_PromptRecoStates_ + + Setup an environment with a PromptReco workflow + and traverse through the different states. + Check that the transitions are sane. + """ + # Set the environment + self.setupPromptRecoWorkflow() + self.plugin = Tier0Plugin() + + # Verify the transitions + self.verifyStateTransitions() + + return + +if __name__ == "__main__": + unittest.main() diff --git a/test/python/WMComponent_t/AnalyticsDataCollector_t/Plugins_t/__init__.py b/test/python/WMComponent_t/AnalyticsDataCollector_t/Plugins_t/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/test/python/WMComponent_t/TaskArchiver_t/TaskArchiver_t.py b/test/python/WMComponent_t/TaskArchiver_t/TaskArchiver_t.py index a70b450a14..2a46596d19 100644 --- a/test/python/WMComponent_t/TaskArchiver_t/TaskArchiver_t.py +++ b/test/python/WMComponent_t/TaskArchiver_t/TaskArchiver_t.py @@ -75,7 +75,8 @@ def setUp(self): self.testInit.setupCouch("%s/workloadsummary" % self.databaseName, "WorkloadSummary") self.testInit.setupCouch("%s/jobs" % self.databaseName, "JobDump") self.testInit.setupCouch("%s/fwjrs" % self.databaseName, "FWJRDump") - + self.testInit.setupCouch("wmagent_summary_t", "WMStats") + self.testInit.setupCouch("wmagent_summary_central_t", "WMStats") self.daofactory = DAOFactory(package = "WMCore.WMBS", logger = myThread.logger, @@ -125,7 +126,7 @@ def getConfig(self): 
config.section_("JobStateMachine") config.JobStateMachine.couchurl = os.getenv("COUCHURL", "cmssrv52.fnal.gov:5984") config.JobStateMachine.couchDBName = self.databaseName - config.JobStateMachine.jobSummaryDBName = 'wmagent_summary' + config.JobStateMachine.jobSummaryDBName = 'wmagent_summary_t' config.component_("JobCreator") config.JobCreator.jobCacheDir = os.path.join(self.testDir, 'testDir') @@ -141,6 +142,7 @@ def getConfig(self): config.TaskArchiver.histogramLimit = 5 config.TaskArchiver.workloadSummaryCouchDBName = "%s/workloadsummary" % self.databaseName config.TaskArchiver.workloadSummaryCouchURL = config.JobStateMachine.couchurl + config.TaskArchiver.centralWMStatsURL = '%s/wmagent_summary_central_t' % config.JobStateMachine.couchurl config.TaskArchiver.requireCouch = True config.TaskArchiver.uploadPublishInfo = self.uploadPublishInfo config.TaskArchiver.uploadPublishDir = self.uploadPublishDir @@ -503,16 +505,6 @@ def testA_BasicFunctionTest(self): workloadSummary['performance']['/TestWorkload/ReReco']['cmsRun1'][x][y], places = 2) - # The TestWorkload should have no jobs left - workflowName = "TestWorkload" - jobs = jobdb.loadView("JobDump", "jobsByWorkflowName", - options = {"startkey": [workflowName], - "endkey": [workflowName, {}]})['rows'] - self.assertEqual(len(jobs), 0) - jobs = fwjrdb.loadView("FWJRDump", "fwjrsByWorkflowName", - options = {"startkey": [workflowName], - "endkey": [workflowName, {}]})['rows'] - self.assertEqual(len(jobs), 0) return def testB_testErrors(self):