Skip to content
Browse files

Part of WorfklowTask rewritten

Better extensibility, added test.
  • Loading branch information...
1 parent e44279e commit db3e0748ce6ce56c6271bca4cccee5394d2b2ab6 @fstagni fstagni committed with
Showing with 183 additions and 91 deletions.
  1. +147 −88 TransformationSystem/Client/TaskManager.py
  2. +36 −3 TransformationSystem/Client/test/test_Client.py
View
235 TransformationSystem/Client/TaskManager.py
@@ -1,31 +1,22 @@
-"""
+""" TaskManager contains WorkflowsTasks and RequestTasks modules, for managing jobs and requests tasks
"""
__RCSID__ = "$Id$"
COMPONENT_NAME = 'TaskManager'
-from DIRAC import gConfig, S_OK, S_ERROR, gLogger
+import re, time, types, os, copy
+from DIRAC import gConfig, S_OK, S_ERROR, gLogger
from DIRAC.Core.Utilities.List import sortList, fromChar
-from DIRAC.Interfaces.API.Job import Job
-
-from DIRAC.RequestManagementSystem.Client.RequestContainer import RequestContainer
-from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.Core.Utilities.ModuleFactory import ModuleFactory
-from DIRAC.TransformationSystem.Client.TransformationClient import TransformationClient
-from DIRAC.RequestManagementSystem.Client.RequestClient import RequestClient
-from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient
-from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient
-from DIRAC.Core.Security.ProxyInfo import getProxyInfo
-from DIRAC.Core.Utilities.SiteSEMapping import getSitesForSE
-
-import re, time, types, os
+from DIRAC.RequestManagementSystem.Client.RequestContainer import RequestContainer
class TaskBase( object ):
def __init__( self, transClient = None, logger = None ):
if not transClient:
+ from DIRAC.TransformationSystem.Client.TransformationClient import TransformationClient
self.transClient = TransformationClient()
else:
self.transClient = transClient
@@ -78,6 +69,7 @@ def __init__( self, transClient = None, logger = None, requestClient = None ):
super( RequestTasks, self ).__init__( transClient, logger )
if not requestClient:
+ from DIRAC.RequestManagementSystem.Client.RequestClient import RequestClient
self.requestClient = RequestClient()
else:
self.requestClient = requestClient
@@ -129,6 +121,8 @@ def submitTransformationTasks( self, taskDict ):
return S_OK( taskDict )
def submitTaskToExternal( self, request ):
+ """ Submits a request using RequestClient
+ """
if type( request ) in types.StringTypes:
oRequest = RequestContainer( request )
name = oRequest.getRequestName()['Value']
@@ -203,9 +197,15 @@ def getSubmittedFileStatus( self, fileDicts ):
return S_OK( updateDict )
class WorkflowTasks( TaskBase ):
+ """ Handles jobs
+ """
def __init__( self, transClient = None, logger = None, submissionClient = None, jobMonitoringClient = None,
- outputDataModule = None ):
+ outputDataModule = None, jobClass = None, opsH = None ):
+ """ Generates some default objects.
+ jobClass is by default "DIRAC.Interfaces.API.Job.Job". An extension of it also works:
+ VOs can pass in their job class extension, if present
+ """
if not logger:
logger = gLogger.getSubLogger( 'WorkflowTasks' )
@@ -213,24 +213,41 @@ def __init__( self, transClient = None, logger = None, submissionClient = None,
super( WorkflowTasks, self ).__init__( transClient, logger )
if not submissionClient:
+ from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient
self.submissionClient = WMSClient()
else:
self.submissionClient = submissionClient
if not jobMonitoringClient:
+ from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient
self.jobMonitoringClient = JobMonitoringClient()
else:
self.jobMonitoringClient = jobMonitoringClient
if not outputDataModule:
- #FIXME: LHCb specific
- self.outputDataModule = gConfig.getValue( "/DIRAC/VOPolicy/OutputDataModule",
- "LHCbDIRAC.Core.Utilities.OutputDataPolicy" )
+ self.outputDataModule = gConfig.getValue( "/DIRAC/VOPolicy/OutputDataModule", "" )
else:
self.outputDataModule = outputDataModule
- def prepareTransformationTasks( self, transBody, taskDict, owner = '', ownerGroup = '', job = None ):
+ if not jobClass:
+ from DIRAC.Interfaces.API.Job import Job
+ self.jobClass = Job
+ else:
+ self.jobClass = jobClass
+
+ if not opsH:
+ from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
+ self.opsH = Operations()
+ else:
+ self.opsH = opsH
+
+
+ def prepareTransformationTasks( self, transBody, taskDict, owner = '', ownerGroup = '' ):
+ """ Prepare tasks, given a taskDict, that is created (with some manipulation) by the DB
+ jobClass is by default "DIRAC.Interfaces.API.Job.Job". An extension of it also works.
+ """
if ( not owner ) or ( not ownerGroup ):
+ from DIRAC.Core.Security.ProxyInfo import getProxyInfo
res = getProxyInfo( False, False )
if not res['OK']:
return res
@@ -238,10 +255,7 @@ def prepareTransformationTasks( self, transBody, taskDict, owner = '', ownerGrou
owner = proxyInfo['username']
ownerGroup = proxyInfo['group']
- if not job:
- oJob = Job( transBody )
- else:
- oJob = job( transBody )
+ oJob = self.jobClass( transBody )
for taskNumber in sortList( taskDict.keys() ):
paramsDict = taskDict[taskNumber]
@@ -258,71 +272,27 @@ def prepareTransformationTasks( self, transBody, taskDict, owner = '', ownerGrou
oJob._setParamValue( 'PRODUCTION_ID', str( transID ).zfill( 8 ) )
oJob._setParamValue( 'JOB_ID', str( taskNumber ).zfill( 8 ) )
inputData = None
- sites = []
- for paramName, paramValue in paramsDict.items():
- self.log.verbose( 'TransID: %s, TaskID: %s, ParamName: %s, ParamValue: %s' % ( transID, taskNumber,
- paramName, paramValue ) )
- if paramName == 'InputData':
- if paramValue:
- self.log.verbose( 'Setting input data to %s' % paramValue )
- oJob.setInputData( paramValue )
- elif paramName == 'Site':
- if paramValue:
- self.log.verbose( 'Setting allocated site to: %s' % ( paramValue ) )
- oJob.setDestination( paramValue )
- sites = fromChar( paramValue )
- elif paramValue:
- self.log.verbose( 'Setting %s to %s' % ( paramName, paramValue ) )
- oJob._addJDLParameter( paramName, paramValue )
- if 'TargetSE' in paramsDict:
- seSites = None
- seList = fromChar( paramsDict['TargetSE'] )
- for se in seList:
- res = getSitesForSE( se )
- if res['OK']:
- thisSESites = res['OK']['Value']
- if seSites == None:
- # If this is the first SE, initialize the vector
- seSites = thisSESites
- else:
- # If it is not the first SE, keep only those that are common
- for nSE in list( seSites ):
- if nSE not in thisSESites:
- seSites.remove( nSE )
- else:
- self.log.warn( 'Could not get Sites associated to SE', res['Message'] )
- seSites = []
- # Now we need to make the AND with the sites, if defined
- if seSites == None:
- seSites = []
- if sites:
- # Need to get the AND
- for nSE in list( seSites ):
- if nSE not in sites:
- seSites.remove( nSE )
- if not seSites:
- taskDict[taskNumber]['TaskObject'] = ''
- continue
-
- if not seSites:
- self.log.warn( 'Could not get a list a Sites for provided TargetSE', ', '.join( seList ) )
- taskDict[taskNumber]['TaskObject'] = ''
- continue
+ self.log.debug( 'TransID: %s, TaskID: %s, paramsDict: %s' % ( transID, taskNumber, str( paramsDict ) ) )
- sitesString = ', '.join( seSites )
- self.log.verbose( 'Setting Site according to TargetSE', sitesString )
+ #These helper functions do the real job
+ sites = self._handleDestination( paramsDict )
+ if not sites:
+ self.log.error( 'Could not get a list a sites', ', '.join( sites ) )
+ taskDict[taskNumber]['TaskObject'] = ''
+ continue
+ else:
+ sitesString = ', '.join( sites )
+ self.log.verbose( 'Setting Site: ', sitesString )
oJob.setDestination( sitesString )
- hospitalTrans = [int( x ) for x in Operations().getValue( "Hospital/Transformations", [] )]
+ self._handleInputs( oJob, paramsDict )
+ self._handleRest( oJob, paramsDict )
+
+ hospitalTrans = [int( x ) for x in self.opsH.getValue( "Hospital/Transformations", [] )]
if int( transID ) in hospitalTrans:
- hospitalSite = Operations().getValue( "Hospital/HospitalSite", 'DIRAC.JobDebugger.ch' )
- hospitalCEs = Operations().getValue( "Hospital/HospitalCEs", [] )
- oJob.setType( 'Hospital' )
- oJob.setDestination( hospitalSite )
- oJob.setInputDataPolicy( 'download', dataScheduling = False )
- if hospitalCEs:
- oJob._addJDLParameter( 'GridRequiredCEs', hospitalCEs )
+ self.handleHospital( oJob )
+
taskDict[taskNumber]['TaskObject'] = ''
res = self.getOutputData( {'Job':oJob._toXML(), 'TransformationID':transID,
'TaskID':taskNumber, 'InputData':inputData},
@@ -332,9 +302,96 @@ def prepareTransformationTasks( self, transBody, taskDict, owner = '', ownerGrou
continue
for name, output in res['Value'].items():
oJob._addJDLParameter( name, ';'.join( output ) )
- taskDict[taskNumber]['TaskObject'] = Job( oJob._toXML() )
+ taskDict[taskNumber]['TaskObject'] = self.jobClass( oJob._toXML() )
return S_OK( taskDict )
+ #############################################################################
+
+ def _handleDestination( self, paramsDict, getSitesForSE = None ):
+ """ Handle Sites and TargetSE in the parameters
+ """
+
+ try:
+ sites = ['ANY']
+ if paramsDict['Site']:
+ sites = fromChar( paramsDict['Site'] )
+ except KeyError:
+ pass
+
+ try:
+ seList = ['Unknown']
+ if paramsDict['TargetSE']:
+ seList = fromChar( paramsDict['TargetSE'] )
+ except KeyError:
+ pass
+
+ if not seList or seList == ['Unknown']:
+ return sites
+
+ if not getSitesForSE:
+ from DIRAC.Core.Utilities.SiteSEMapping import getSitesForSE
+
+ seSites = []
+ for se in seList:
+ res = getSitesForSE( se )
+ if not res['OK']:
+ self.log.warn( 'Could not get Sites associated to SE', res['Message'] )
+ else:
+ thisSESites = res['Value']
+ if not thisSESites:
+ continue
+ if seSites == []:
+ seSites = copy.deepcopy( thisSESites )
+ else:
+ # If it is not the first SE, keep only those that are common
+ for nSE in list( seSites ):
+ if nSE not in thisSESites:
+ seSites.remove( nSE )
+
+ # Now we need to make the AND with the sites, if defined
+ if sites == ['ANY']:
+ return seSites
+ else:
+ # Need to get the AND
+ for nSE in list( seSites ):
+ if nSE not in sites:
+ seSites.remove( nSE )
+
+ return seSites
+
+
+ def _handleInputs( self, oJob, paramsDict ):
+ """ set job inputs (+ metadata)
+ """
+ try:
+ if paramsDict['InputData']:
+ self.log.verbose( 'Setting input data to %s' % paramsDict['InputData'] )
+ oJob.setInputData( paramsDict['InputData'], runNumber = paramsDict['RunNumber'] )
+ except KeyError:
+ pass
+
+ def _handleRest( self, oJob, paramsDict ):
+ """ add as JDL parameters all the other parameters that are not for inputs or destination
+ """
+ for paramName, paramValue in paramsDict.items():
+ if paramName not in ( 'InputData', 'Site', 'TargetSE' ):
+ if paramValue:
+ self.log.verbose( 'Setting %s to %s' % ( paramName, paramValue ) )
+ oJob._addJDLParameter( paramName, paramValue )
+
+ def _handleHospital( self, oJob ):
+ """ Optional handle of hospital jobs
+ """
+ oJob.setType( 'Hospital' )
+ oJob.setInputDataPolicy( 'download', dataScheduling = False )
+ hospitalSite = self.opsH.getValue( "Hospital/HospitalSite", 'DIRAC.JobDebugger.ch' )
+ oJob.setDestination( hospitalSite )
+ hospitalCEs = self.opsH.getValue( "Hospital/HospitalCEs", [] )
+ if hospitalCEs:
+ oJob._addJDLParameter( 'GridRequiredCEs', hospitalCEs )
+
+ #############################################################################
+
def getOutputData( self, paramDict, moduleLocation ):
moduleFactory = ModuleFactory()
@@ -369,21 +426,23 @@ def submitTransformationTasks( self, taskDict ):
return S_OK( taskDict )
def submitTaskToExternal( self, job ):
+ """ Submits a job to the WMS.
+ """
if type( job ) in types.StringTypes:
try:
- job = Job( job )
+ oJob = self.jobClass( job )
except Exception, x:
self.log.exception( "Failed to create job object", '', x )
return S_ERROR( "Failed to create job object" )
elif type( job ) == types.InstanceType:
- pass
+ oJob = job
else:
self.log.error( "No valid job description found" )
return S_ERROR( "No valid job description found" )
workflowFile = open( "jobDescription.xml", 'w' )
- workflowFile.write( job._toXML() )
+ workflowFile.write( oJob._toXML() )
workflowFile.close()
- jdl = job._toJDL()
+ jdl = oJob._toJDL()
res = self.submissionClient.submitJob( jdl )
os.remove( "jobDescription.xml" )
return res
View
39 TransformationSystem/Client/test/test_Client.py
@@ -6,6 +6,12 @@
from mock import Mock
from DIRAC.TransformationSystem.Client.TaskManager import TaskBase, WorkflowTasks, RequestTasks
+def getSitesForSE( ses ):
+ if ses == 'pippo':
+ return {'OK':True, 'Value':['Site2', 'Site3']}
+ else:
+ return {'OK':True, 'Value':['Site3']}
+
#############################################################################
class ClientsTestCase( unittest.TestCase ):
@@ -26,11 +32,15 @@ def setUp( self ):
self.wfTasks = WorkflowTasks( transClient = self.mockTransClient,
submissionClient = self.WMSClientMock,
jobMonitoringClient = self.jobMonitoringClient,
- outputDataModule = "mock" )
+ outputDataModule = "mock",
+ jobClass = self.jobMock )
self.requestTasks = RequestTasks( transClient = self.mockTransClient,
requestClient = self.mockRequestClient
)
+ def tearDown( self ):
+ pass
+
#############################################################################
class TaskBaseSuccess( ClientsTestCase ):
@@ -48,9 +58,32 @@ def test_prepareTranformationTasks( self ):
2:{'TransformationID':1, 'a2':'aa2', 'b2':'bb2', 'InputData':['a1', 'a2']},
3:{'TransformationID':2, 'a3':'aa3', 'b3':'bb3'},
}
- res = self.wfTasks.prepareTransformationTasks( Mock(), taskDict, 'test_user', 'test_group', self.jobMock )
+ res = self.wfTasks.prepareTransformationTasks( Mock(), taskDict, 'test_user', 'test_group' )
+
+ self.assertEqual( res, {'OK': True,
+ 'Value': {1: {'a1': 'aa1', 'TaskObject': '', 'TransformationID': 1, 'b1': 'bb1', 'Site': 'MySite'},
+ 2: {'TaskObject': '', 'a2': 'aa2', 'TransformationID': 1, 'InputData': ['a1', 'a2'], 'b2': 'bb2'},
+ 3: {'TaskObject': '', 'a3': 'aa3', 'TransformationID': 2, 'b3': 'bb3'}}} )
+
+ def test__handleDestination( self ):
+ res = self.wfTasks._handleDestination( {'Site':'', 'TargetSE':''} )
+ self.assertEqual( res, ['ANY'] )
+ res = self.wfTasks._handleDestination( {'Site':'ANY', 'TargetSE':''} )
+ self.assertEqual( res, ['ANY'] )
+ res = self.wfTasks._handleDestination( {'TargetSE':'Unknown'} )
+ self.assertEqual( res, ['ANY'] )
+ res = self.wfTasks._handleDestination( {'Site':'Site1, Site2', 'TargetSE':''} )
+ self.assertEqual( res, ['Site1', 'Site2'] )
+ res = self.wfTasks._handleDestination( {'Site':'Site1, Site2', 'TargetSE':'pippo'}, getSitesForSE )
+ self.assertEqual( res, ['Site2'] )
+ res = self.wfTasks._handleDestination( {'Site':'Site1, Site2', 'TargetSE':'pippo, pluto'}, getSitesForSE )
+ self.assertEqual( res, [] )
+ res = self.wfTasks._handleDestination( {'Site':'Site1, Site2, Site3', 'TargetSE':'pippo, pluto'}, getSitesForSE )
+ self.assertEqual( res, ['Site3'] )
+ res = self.wfTasks._handleDestination( {'Site':'ANY', 'TargetSE':'pippo, pluto'}, getSitesForSE )
+ self.assertEqual( res, ['Site3'] )
+
- print res
#############################################################################

0 comments on commit db3e074

Please sign in to comment.
Something went wrong with that request. Please try again.