Skip to content

Commit

Permalink
Tier0 Plugin implementation
Browse files Browse the repository at this point in the history
The Tier0 plugin for the AnalyticsDataCollector components
calculates the state of Tier-0 workflows based on subscription
information found in WMBS and updates the request document
in central WMStats with this status.

Additionally, this patch fixes some bugs in the PluginInterface implementation
and renames it from PlugIn to Plugin, to match the naming used by
the BossAir package (PlugIn is a bit harder to remember).

Fixes the AnalyticsDataCollector unittest and removes it from the
list of allowed failing tests.

This requires #4227 to work; it also fixes the unit test breakage introduced by #4227.

Fixes #4264
  • Loading branch information
dballesteros7 committed Nov 13, 2012
1 parent 2f5e7f8 commit 8258774
Show file tree
Hide file tree
Showing 16 changed files with 822 additions and 30 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Expand Up @@ -9,6 +9,8 @@ WMCORE_JENKINS_REPLACEMENT_SOURCE.tar.gz

.project
.pydevproject
.couchapprc
.couchappignore
.settings/
TEST_AREA/
list-tests-alerts.txt
Expand Down
14 changes: 8 additions & 6 deletions src/python/WMComponent/AnalyticsDataCollector/AnalyticsPoller.py
Expand Up @@ -8,6 +8,7 @@
import threading
import logging
import time
import traceback
from WMCore.WorkerThreads.BaseWorkerThread import BaseWorkerThread
from WMCore.Services.WorkQueue.WorkQueue import WorkQueue as WorkQueueService
from WMCore.Services.WMStats.WMStatsWriter import WMStatsWriter
Expand Down Expand Up @@ -36,8 +37,8 @@ def __init__(self, config):
# need to get campaign, user, owner info
self.agentDocID = "agent+hostname"
self.summaryLevel = (config.AnalyticsDataCollector.summaryLevel).lower()
self.pluginName = getattr(config.AnalyticsDataCollector.pluginName, None)
self.plugIn = None
self.pluginName = getattr(config.AnalyticsDataCollector, "pluginName", None)
self.plugin = None


def setup(self, parameters):
Expand All @@ -63,7 +64,7 @@ def setup(self, parameters):
self.centralWMStatsCouchDB = WMStatsWriter(self.config.AnalyticsDataCollector.centralWMStatsURL)

if self.pluginName != None:
pluginFactory = WMFactory("plugins", "WMComponent.AnalyticsDataCollector.PlugIns")
pluginFactory = WMFactory("plugins", "WMComponent.AnalyticsDataCollector.Plugins")
self.plugin = pluginFactory.loadObject(classname = self.pluginName)

def algorithm(self, parameters):
Expand Down Expand Up @@ -105,8 +106,8 @@ def algorithm(self, parameters):
self.agentInfo, uploadTime, self.summaryLevel)


if self.plugIn != None:
self.plugIn(requestDocs, self.localSummaryCouchDB, self.centralWMStatsCouchDB)
if self.plugin != None:
self.plugin(requestDocs, self.localSummaryCouchDB, self.centralWMStatsCouchDB)

self.localSummaryCouchDB.uploadData(requestDocs)
logging.info("Request data upload success\n %s request \n uploading agent data" % len(requestDocs))
Expand All @@ -120,5 +121,6 @@ def algorithm(self, parameters):
logging.info("Agent data upload success\n %s request" % len(agentDocs))

except Exception, ex:
logging.error("Error occured: will retry later")
logging.error("Error occured, will retry later:")
logging.error(str(ex))
logging.error("Traceback: \n%s" % traceback.format_exc())
@@ -1,10 +1,17 @@
#!/usr/bin/env python
"""
_PluginInterface_
class PlugInInterface(object):
Base class for AnalyticsDataCollector plug-ins
"""

class PluginInterface(object):
"""Interface for policies"""
def __init__(self, **args):
""" initialize args if needed"""
pass

def __call__(self, requestDocs, localSummaryCouchDB, centralWMStatsCouchDB):
"""
this needs to be overwritten
Expand All @@ -17,4 +24,4 @@ def __call__(self, requestDocs, localSummaryCouchDB, centralWMStatsCouchDB):
So don't push requestDocs to couchdb directly here.
"""
msg = "%s.__call__ is not implemented" % self.__class__.__name__
raise NotImplementedError(msg)
raise NotImplementedError(msg)
255 changes: 255 additions & 0 deletions src/python/WMComponent/AnalyticsDataCollector/Plugins/Tier0Plugin.py
@@ -0,0 +1,255 @@
#!/usr/bin/env python
"""
_Tier0Plugin_
Plugin which collects information from the agent and updates
the state of the current tier-0 workflows in the system.
Created on Nov 2, 2012
@author: dballest
"""

import threading
import traceback
import re

from WMCore.DAOFactory import DAOFactory
from WMCore.WMException import WMException

from WMComponent.AnalyticsDataCollector.Plugins.PluginInterface import PluginInterface
from WMCore.WMSpec.WMWorkload import WMWorkloadHelper


def getTier0Regex():
    """
    _getTier0Regex_

    Return the uncompiled Tier0 state regular expressions.

    This lives at module level so the definitions can be fetched without
    instancing the plugin. The result maps each workflow type to a list
    of (state, [task path regex, ...]) pairs, listed in the order of the
    states.
    """
    repackStates = [
        ('Merge', [r'^/Repack_Run[0-9]+_Stream[\w]+/Repack$']),
        ('Processing Done', [r'^/Repack_Run[0-9]+_Stream[\w]+/Repack/RepackMerge[\w]+$']),
    ]

    promptRecoStates = [
        ('AlcaSkim', [r'^/PromptReco_Run[0-9]+_[\w]+/Reco$']),
        ('Merge', [r'^/PromptReco_Run[0-9]+_[\w]+/Reco/AlcaSkim$']),
        ('Harvesting', [
            r'^/PromptReco_Run[0-9]+_[\w]+/Reco/AlcaSkim/AlcaSkimMerge[\w]+$',
            r'^/PromptReco_Run[0-9]+_[\w]+/Reco/RecoMerge[\w]+$',
        ]),
        ('Processing Done', [
            r'^/PromptReco_Run[0-9]+_[\w]+/Reco/RecoMerge[\w]+/RecoMerge[\w]+DQMHarvest[\w]+$',
        ]),
    ]

    expressStates = [
        ('Merge', [r'^/Express_Run[0-9]+_Stream[\w]+/Express$']),
        ('Harvesting', [
            r'^/Express_Run[0-9]+_Stream[\w]+/Express/ExpressMerge[\w]+$',
            r'^/Express_Run[0-9]+_Stream[\w]+/Express/ExpressAlcaSkim[\w]+$',
        ]),
        ('Processing Done', [
            r'^/Express_Run[0-9]+_Stream[\w]+/Express/ExpressMerge[\w]+/ExpressMerge[\w]+DQMHarvest[\w]+$',
            r'^/Express_Run[0-9]+_Stream[\w]+/Express/ExpressAlcaSkim[\w]+/ExpressAlcaSkim[\w]+AlcaHarvest[\w]+$',
        ]),
    ]

    return {'Repack': repackStates,
            'PromptReco': promptRecoStates,
            'Express': expressStates}

class Tier0PluginError(WMException):
    """
    _Tier0PluginError_

    Exception raised for failures inside the Tier0 plugin.
    """

    def __init__(self, msg):
        """
        __init__

        Build the exception from a plain message; no error code is
        passed on to WMException.
        """
        WMException.__init__(self, msg)

class Tier0Plugin(PluginInterface):
    """
    _Tier0Plugin_

    Tier0 plugin main class.

    On every call it collects the finished tasks known to WMBS, works out
    the most advanced Tier-0 state reached by each workflow and reports
    that state through localSummaryCouchDB.updateRequestStatus.
    """

    def __init__(self):
        """
        __init__

        Initialize the plugin object: build the WMBS DAO factory from the
        logger and dbi of the current thread, look up the 'cleanout' job
        state, load the DAOs and compile the state regular expressions.
        """
        PluginInterface.__init__(self)

        self.myThread = threading.currentThread()
        self.daoFactory = DAOFactory(package = 'WMCore.WMBS',
                                     logger = self.myThread.logger,
                                     dbinterface = self.myThread.dbi)

        self.logger = self.myThread.logger

        # To get finished subscriptions it needs the cleanout state index
        getCleanoutState = self.daoFactory(classname = 'Jobs.GetStateID')
        self.cleanoutState = getCleanoutState.execute(state = 'cleanout')

        # Load the DAOs
        self.getFinishedTasks = self.daoFactory(classname = 'Workflow.GetFinishedTasks')
        self.getFinishedTasksNoInjection = self.daoFactory(classname = 'Subscriptions.GetSemiFinishedTasks')

        # To avoid doing many I/O operations, cache the list of tasks for each workflow
        self.taskCache = {}

        self.setupStateRegex()

        return

    def setupStateRegex(self):
        """
        _setupStateRegex_

        Each state is mapped to regular expressions on the task paths;
        this depends on the current implementation of the T0. The regular
        expressions obtained from getTier0Regex are compiled and their
        bound match methods are stored in self.tier0Regex.

        The structure of the dictionary is as follows:
        { <workflowType> : [(<state>, [<matcher>, <matcher>]), ...],
          ...
        }
        where each <matcher> is the .match method of a compiled pattern.
        """
        rawRegex = getTier0Regex()

        self.tier0Regex = {}

        for workflowType in rawRegex:
            self.tier0Regex[workflowType] = [
                (state, [re.compile(pattern).match for pattern in patterns])
                for state, patterns in rawRegex[workflowType]]
        return

    def __call__(self, requestDocs, localSummaryCouchDB, centralWMStatsCouchDB):
        """
        __call__

        Call to this plugin, this method takes care of executing
        the logic of the plugin, see guidelines in parent class documentation.

        requestDocs           -- documents reported this cycle; only their
                                 'workflow' key is read, to prune the task cache
        localSummaryCouchDB   -- receives updateRequestStatus(name, status)
        centralWMStatsCouchDB -- queried with workflowsByStatus(['new'])

        A failure while processing one workflow is logged and does not stop
        the processing of the remaining ones.
        """
        # Get all current finished tasks (i.e. all associated subscriptions are finished)
        finishedTasks = self.getFinishedTasks.execute()
        self.logger.debug("Found %d finished tasks" % len(finishedTasks))
        # Repack workflows are not marked injected and therefore its subscriptions
        # are not marked as finished until the injection happens (48h delay)
        # get those tasks with a looser criteria
        finishedTasksNoInjection = self.getFinishedTasksNoInjection.execute(state = self.cleanoutState,
                                                                            pattern = '%Repack%')
        self.logger.debug("Found %d finished Repack tasks" % len(finishedTasksNoInjection))
        # Get workflows which are not closed yet or completed
        notClosedWorkflows = centralWMStatsCouchDB.workflowsByStatus(['new'])

        # Aggregate results by workflow name, remembering the first spec
        # seen for each workflow so its task list can be loaded later
        finishedTasks.extend(finishedTasksNoInjection)
        workflows = {}
        specs = {}
        for entry in finishedTasks:
            workflowName = entry['name']
            workflows.setdefault(workflowName, []).append(entry['task'])
            specs.setdefault(workflowName, entry['spec'])

        self.logger.info('Updating the status of %d workflows' % len(workflows))
        # Go through the reported workflows
        for workflowName in workflows:
            try:
                if workflowName in notClosedWorkflows:
                    # Nothing is completed for this workflow
                    continue

                # Sanity check on workflow name; split() always yields at
                # least one token, so only the type lookup can fail
                workflowType = workflowName.split('_')[0]
                if workflowType not in self.tier0Regex:
                    self.logger.warning('This workflow does not match the Tier-0 naming structure, it will be ignored.')
                    continue

                # Loading the spec happens inside the try block so that an
                # unreadable spec only affects this workflow
                self.loadTasks(workflowName, specs[workflowName])

                completedTaskList = workflows[workflowName]

                workflowStatus = self.determineCurrentStatus(workflowName, workflowType, completedTaskList)
                if workflowStatus is not None:
                    localSummaryCouchDB.updateRequestStatus(workflowName, workflowStatus)

            except Tier0PluginError as t0ex:
                # More specific exception, must be listed before the
                # generic handler to be reachable; just log it
                self.logger.error('Error ocurred while processing a doc:\n%s' % str(t0ex))
            except Exception as ex:
                # Plugins are meant to be not-critical
                # If something fails then just keep going
                self.logger.error('Error occurred while processing docs:\n%s' % str(ex))
                self.logger.error(traceback.format_exc())

        # Clean the task cache based on the documents we reported this cycle
        self.cleanTaskCache([x['workflow'] for x in requestDocs])

        return

    def loadTasks(self, workflowName, spec):
        """
        _loadTasks_

        Loads the list of task path names for the workflow from its spec
        and stores it in the cache; does nothing if already cached.

        Raises Tier0PluginError if loading the spec fails with an IOError.
        """
        if workflowName in self.taskCache:
            return
        try:
            workloadHelper = WMWorkloadHelper()
            workloadHelper.load(spec)
            tasks = workloadHelper.listAllTaskPathNames()
            self.taskCache[workflowName] = tasks
        except IOError as ex:
            msg = "Failed to load spec file %s\n" % spec
            msg += "Original IOError: %s" % str(ex)
            raise Tier0PluginError(msg)
        return

    def cleanTaskCache(self, reportedWorkflows):
        """
        _cleanTaskCache_

        Keep the memory footprint of this plugin smaller:
        drop the cached task lists of workflows not in reportedWorkflows.
        """
        self.logger.debug('Cleaning up task cache')
        reported = set(reportedWorkflows)
        # Iterate over a snapshot of the keys, the cache is modified in the loop
        for workflow in list(self.taskCache):
            if workflow not in reported:
                del self.taskCache[workflow]
        return

    def determineCurrentStatus(self, workflowName, workflowType, completedTasks):
        """
        _determineCurrentStatus_

        Process a completed task list for a workflow and
        get the most advanced status of the workflow.

        The states of the workflow type are walked in order; a state is
        reached when the number of completed tasks matching its regular
        expressions equals the number of matching tasks in the cached full
        task list. The walk stops at the first state not reached.

        Returns the last state reached, or None if the first one is not.
        The task list of workflowName must already be in the cache.
        """
        self.logger.debug('Calculating status for %s' % workflowName)
        currentStatus = None
        fullTaskList = self.taskCache[workflowName]
        # The caller concatenates the results of two DAOs, so the same task
        # may be listed twice; count each completed task only once
        uniqueCompleted = set(completedTasks)
        for state, matchers in self.tier0Regex[workflowType]:
            completedTasksForStatus = 0
            totalTasksForStatus = 0
            for matcher in matchers:
                completedTasksForStatus += sum(1 for task in uniqueCompleted if matcher(task))
                totalTasksForStatus += sum(1 for task in fullTaskList if matcher(task))
            if completedTasksForStatus != totalTasksForStatus:
                break
            currentStatus = state
        self.logger.debug('Status is %s' % currentStatus)
        self.logger.debug('Completed task list: %s' % str(completedTasks))

        return currentStatus
Empty file.
6 changes: 4 additions & 2 deletions src/python/WMCore/Services/WMStats/WMStatsReader.py
Expand Up @@ -17,9 +17,11 @@ def __init__(self, couchURL, dbName = None):
self.couchServer = CouchServer(self.couchURL)
self.couchDB = CouchServer(self.couchURL).connectDatabase(self.dbName, False)

def workflowsByStatus(self, statusList, format = "list"):
def workflowsByStatus(self, statusList, format = "list", stale = "update_after"):
keys = statusList
options = {"stale": "update_after"}
options = {}
if stale:
options = {"stale": stale}
result = self.couchDB.loadView("WMStats", "requestByStatus", options, keys)

if format == "dict":
Expand Down

0 comments on commit 8258774

Please sign in to comment.