Skip to content

Commit

Permalink
Merge pull request #7754 from ktf/mt-rm
Browse files Browse the repository at this point in the history
Add support for multithreading to runTheMatrix
  • Loading branch information
davidlange6 committed Feb 24, 2015
2 parents bcf11c3 + c176470 commit f418ee6
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 9 deletions.
5 changes: 3 additions & 2 deletions Configuration/PyReleaseValidation/python/MatrixInjector.py
Expand Up @@ -10,11 +10,11 @@ def performInjectionOptionTest(opt):
sys.exit(-1)
if opt.wmcontrol=='init':
#init means it'll be in test mode
opt.nThreads=0
opt.nProcs=0
if opt.wmcontrol=='test':
#means the wf were created already, and we just dryRun it.
opt.dryRun=True
if opt.wmcontrol=='submit' and opt.nThreads==0:
if opt.wmcontrol=='submit' and opt.nProcs==0:
print 'Not injecting to wmagent in -j 0 mode. Need to run the worklfows.'
sys.exit(-1)
if opt.wmcontrol=='force':
Expand Down Expand Up @@ -104,6 +104,7 @@ def __init__(self,opt,mode='init',options=''):
"unmergedLFNBase" : "/store/unmerged",
"mergedLFNBase" : "/store/relval",
"dashboardActivity" : "relval",
"Multicore" : opt.nThreads,
"Memory" : 2400,
"SizePerEvent" : 1234,
"TimePerEvent" : 20
Expand Down
5 changes: 3 additions & 2 deletions Configuration/PyReleaseValidation/python/MatrixRunner.py
Expand Up @@ -9,12 +9,13 @@

class MatrixRunner(object):

def __init__(self, wfIn=None, nThrMax=4):
def __init__(self, wfIn=None, nThrMax=4, nThreads=1):

self.workFlows = wfIn

self.threadList = []
self.maxThreads = nThrMax
self.nThreads = nThreads

#the directories in which it happened
self.runDirs={}
Expand Down Expand Up @@ -60,7 +61,7 @@ def runTests(self, opt):

print '\nPreparing to run %s %s' % (wf.numId, item)

current = WorkFlowRunner(wf,noRun,dryRun,cafVeto, opt.dasOptions, opt.jobReports)
current = WorkFlowRunner(wf,noRun,dryRun,cafVeto, opt.dasOptions, opt.jobReports, opt.nThreads)
self.threadList.append(current)
current.start()
if not dryRun:
Expand Down
5 changes: 4 additions & 1 deletion Configuration/PyReleaseValidation/python/WorkFlowRunner.py
Expand Up @@ -86,7 +86,7 @@ def esReportWorkflow(**kwds):
pass

class WorkFlowRunner(Thread):
def __init__(self, wf, noRun=False,dryRun=False,cafVeto=True,dasOptions="",jobReport=False):
def __init__(self, wf, noRun=False,dryRun=False,cafVeto=True,dasOptions="",jobReport=False, nThreads=1):
Thread.__init__(self)
self.wf = wf

Expand All @@ -99,6 +99,7 @@ def __init__(self, wf, noRun=False,dryRun=False,cafVeto=True,dasOptions="",jobRe
self.cafVeto=cafVeto
self.dasOptions=dasOptions
self.jobReport=jobReport
self.nThreads=nThreads

self.wfDir=str(self.wf.numId)+'_'+self.wf.nameId
return
Expand Down Expand Up @@ -227,6 +228,8 @@ def closeCmd(i,ID):
cmd+=' --fileout file:step%s.root '%(istep,)
if self.jobReport:
cmd += ' --suffix "-j JobReport%s.xml " ' % istep
if self.nThreads > 1:
cmd += ' --nThreads %s' % self.nThreads
cmd+=closeCmd(istep,self.wf.nameId)

esReportWorkflow(workflow=self.wf.nameId,
Expand Down
14 changes: 10 additions & 4 deletions Configuration/PyReleaseValidation/scripts/runTheMatrix.py
@@ -1,5 +1,4 @@
#!/usr/bin/env python

import sys

from Configuration.PyReleaseValidation.MatrixReader import MatrixReader
Expand Down Expand Up @@ -27,7 +26,7 @@ def runSelected(opt):
mrd.show(opt.testList,opt.extended)
if opt.testList : print 'testListected items:', opt.testList
else:
mRunnerHi = MatrixRunner(mrd.workFlows, opt.nThreads)
mRunnerHi = MatrixRunner(mrd.workFlows, opt.nProcs, opt.nThreads)
ret = mRunnerHi.runTests(opt)

if opt.wmcontrol:
Expand Down Expand Up @@ -71,10 +70,16 @@ def runSelected(opt):
parser = optparse.OptionParser(usage)

parser.add_option('-j','--nproc',
help='number of threads. 0 Will use 4 threads, not execute anything but create the wfs',
dest='nThreads',
help='number of processes. 0 Will use 4 processes, not execute anything but create the wfs',
dest='nProcs',
default=4
)
parser.add_option('-t','--nThreads',
help='number of threads per process to use in cmsRun.',
dest='nThreads',
default=1
)

parser.add_option('-n','--showMatrix',
help='Only show the worflows. Use --ext to show more',
dest='show',
Expand Down Expand Up @@ -244,6 +249,7 @@ def stepOrIndex(s):

if opt.useInput: opt.useInput = opt.useInput.split(',')
if opt.fromScratch: opt.fromScratch = opt.fromScratch.split(',')
if opt.nProcs: opt.nProcs=int(opt.nProcs)
if opt.nThreads: opt.nThreads=int(opt.nThreads)

if opt.wmcontrol:
Expand Down

0 comments on commit f418ee6

Please sign in to comment.