Skip to content

Commit

Permalink
Merging in development branch
Browse files Browse the repository at this point in the history
  • Loading branch information
benedictpaten committed Jun 28, 2013
1 parent 4892040 commit 76a4328
Show file tree
Hide file tree
Showing 32 changed files with 875 additions and 1,051 deletions.
8 changes: 4 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
binPath = ./bin
files:=jobTreeRun jobTreeStatus jobTreeKill jobTreeSlave jobTreeStats multijob jobTreeTest_Dependencies.py scriptTreeTest_Wrapper.py scriptTreeTest_Wrapper2.py scriptTreeTest_Sort.py
files:=jobTreeRun jobTreeStatus jobTreeKill jobTreeStats multijob jobTreeTest_Dependencies.py scriptTreeTest_Wrapper.py scriptTreeTest_Wrapper2.py scriptTreeTest_Sort.py

.PHONY: all test clean

Expand All @@ -17,13 +17,13 @@ ${binPath}/multijob : batchSystems/multijob.py
mv $@.tmp $@
chmod +x $@

${binPath}/jobTreeTest_%.py : test/jobTree/jobTreeTest_%.py
${binPath}/jobTreeTest_%.py : test/jobTreeTest_%.py
mkdir -p $(dir $@)
cp $< $@.tmp
mv $@.tmp $@
chmod +x $@

${binPath}/scriptTreeTest_%.py : test/scriptTree/scriptTreeTest_%.py
${binPath}/scriptTreeTest_%.py : test/scriptTreeTest_%.py
mkdir -p $(dir $@)
cp $< $@.tmp
mv $@.tmp $@
Expand All @@ -40,4 +40,4 @@ clean :

test :
#Running python allTests.py
PYTHONPATH=.. PATH=../../bin:$$PATH python allTests.py --testLength=SHORT --logDebug
PYTHONPATH=.. PATH=../../bin:$$PATH python allTests.py --testLength=SHORT
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,13 @@ jobTree replicates the environment in which jobTree or scriptTree is invoked and

Ideally when issuing children the parent job could just go to sleep on the cluster. But unless it frees the machine it's sleeping on, then the cluster soon jams up with sleeping jobs. This design is a pragmatic way of designing simple parallel code. It isn't heavy duty, it isn't map-reduce, but it has its niche.

* _What do you mean 'crash only' software?_
* _How robust is jobTree to failures of nodes and/or the master?_

This is just a fancy way of saying that jobTree checkpoints its state on disk, so that it or the job manager can be wiped out and restarted. There is some gnarly test code to show how this works, it will keep crashing everything, at random points, but eventually everything will complete okay. As a consumer you needn't worry about any of this, but your child jobs must be atomic (as with all batch systems), and must follow the convention regarding input files.
JobTree checkpoints its state on disk, so that it or the job manager can be wiped out and restarted. There is some gnarly test code to show how this works; it will keep crashing everything at random points, but eventually everything will complete okay. As a user you needn't worry about any of this, but your child jobs must be atomic (as with all batch systems), and must follow the convention regarding input files.

* _How scalable?_

Probably not very, but it could be. You should be safe to have a 1000 concurrent jobs running, depending on your file-system and batch system.
Probably not very. You should be safe to have 1000 concurrent jobs running, depending on your file-system and batch system.

* _Can you support my XYZ batch system?_

Expand Down
8 changes: 4 additions & 4 deletions allTests.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
#THE SOFTWARE.

import unittest
from jobTree.test.jobTree.jobTreeTest import TestCase as jobTreeTest
from jobTree.test.jobTree.jobTest import TestCase as jobTest
from jobTree.test.scriptTree.scriptTreeTest import TestCase as scriptTreeTest
from jobTree.test.jobTreeTest import TestCase as jobTreeTest
from jobTree.test.jobTest import TestCase as jobTest
from jobTree.test.scriptTreeTest import TestCase as scriptTreeTest
from jobTree.test.sort.sortTest import TestCase as sortTest
from jobTree.test.utilities.statsTest import TestCase as statsTest
from jobTree.test.statsTest import TestCase as statsTest
#import jobTree.test.jobTreeParasolCrashTest.TestCase as jobTreeParasolCrashTest

from sonLib.bioio import parseSuiteTestOptions
Expand Down
30 changes: 29 additions & 1 deletion batchSystems/abstractBatchSystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,31 @@
#OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
#THE SOFTWARE.

from Queue import Empty

class AbstractBatchSystem:
"""An abstract (as far as python currently allows) base class
to represent the interface the batch system must provide to the jobTree.
"""
def __init__(self, config):
def __init__(self, config, maxCpus, maxMemory):
"""This method must be called.
The config object is setup by the jobTreeSetup script and
has configuration parameters for the job tree. You can add stuff
to that script to get parameters for your batch system.
"""
self.config = config
self.maxCpus = maxCpus
self.maxMemory = maxMemory

def checkResourceRequest(self, memory, cpu):
    """Check that a resource request is not greater than what is available.

    memory -- the requested memory, compared against self.maxMemory.
    cpu -- the requested number of cpus, compared against self.maxCpus.

    Raises an AssertionError if either argument is None and a RuntimeError
    if either request exceeds the corresponding maximum. Returns nothing.
    """
    #Test for None by identity, so that an argument with unusual equality
    #semantics can not slip past the check.
    assert memory is not None
    assert cpu is not None
    if cpu > self.maxCpus:
        raise RuntimeError("Requesting more cpus than available. Requested: %s, Available: %s" % (cpu, self.maxCpus))
    if memory > self.maxMemory:
        raise RuntimeError("Requesting more memory than available. Requested: %s, Available: %s" % (memory, self.maxMemory))

def issueJob(self, command, memory, cpu):
"""Issues the following command returning a unique jobID. Command
Expand Down Expand Up @@ -70,6 +84,20 @@ def getRescueJobFrequency(self):
missing/overlong jobs.
"""
raise RuntimeError("Abstract method")


def getFromQueueSafely(self, queue, maxWait):
    """Fetches one object from the given queue, or None if the queue stays empty.

    queue -- the queue to read from.
    maxWait -- if zero or less the queue is polled without blocking, otherwise
    it is passed to the queue as the timeout of a blocking get.

    The non-blocking path exists, according to the original author, to avoid a
    nasty bug in some versions of the python multiprocessing queue.
    """
    pollOnly = maxWait <= 0
    try:
        if pollOnly:
            return queue.get(block=False)
        return queue.get(timeout=maxWait)
    except Empty:
        #Nothing turned up within the permitted wait.
        return None

def main():
pass
Expand Down
44 changes: 33 additions & 11 deletions batchSystems/combinedBatchSystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,36 +30,58 @@ class CombinedBatchSystem(AbstractBatchSystem):
"""Takes two batch systems and a choice function to decide which to issue to.
"""
def __init__(self, config, batchSystem1, batchSystem2, batchSystemChoiceFn):
AbstractBatchSystem.__init__(self, config) #Call the parent constructor
AbstractBatchSystem.__init__(self, config, 0, 0) #Call the parent constructor
self.batchSystem1 = batchSystem1
self.batchSystem2 = batchSystem2
self.batchSystemChoiceFn = batchSystemChoiceFn

def _jobIDForBatchSystem1(self, id):
    """Tags an ID as belonging to batch system 1, giving the pair (1, id).
    """
    taggedID = 1, id
    return taggedID

def _isJobIDForBatchSystem1(self, id):
    """Returns True if the first element of the tagged ID is 1.
    """
    tag = id[0]
    return tag == 1

def _jobIDForBatchSystem2(self, id):
    """Tags an ID as belonging to batch system 2, giving the pair (2, id).
    """
    taggedID = 2, id
    return taggedID

def _isJobIDForBatchSystem2(self, id):
    """Returns True if the first element of the tagged ID is 2.
    """
    tag = id[0]
    return tag == 2

def _strip(self, id):
    """Removes the batch system tag, returning the second element of the tagged ID.
    """
    untaggedID = id[1]
    return untaggedID

def issueJob(self, command, memory, cpu):
if self.batchSystemChoiceFn(command, memory, cpu):
return self.batchSystem1.issueJob(command, memory, cpu)
return self._jobIDForBatchSystem1(self.batchSystem1.issueJob(command, memory, cpu))
else:
return self.batchSystem2.issueJob(command, memory, cpu)
return self._jobIDForBatchSystem2(self.batchSystem2.issueJob(command, memory, cpu))

def killJobs(self, jobIDs):
self.batchSystem1.killJobs(jobIDs)
self.batchSystem2.killJobs(jobIDs)
l, l2 = [], []
for jobID in jobIDs:
if self._isJobIDForBatchSystem1(jobID):
l.append(self._strip(jobID))
else:
assert self._isJobIDForBatchSystem2(jobID)
l2.append(self._strip(jobID))
self.batchSystem1.killJobs(l)
self.batchSystem2.killJobs(l2)

def getIssuedJobIDs(self):
return self.batchSystem1.getIssuedJobIDs() + self.batchSystem2.getIssuedJobIDs()
return [ self._jobIDForBatchSystem1(id) for id in self.batchSystem1.getIssuedJobIDs() ] + [ self._jobIDForBatchSystem2(id) for id in self.batchSystem2.getIssuedJobIDs() ]

def getRunningJobIDs(self):
return self.batchSystem1.getRunningJobIDs() + self.batchSystem2.getRunningJobIDs()
return [ self._jobIDForBatchSystem1(id) for id in self.batchSystem1.getRunningJobIDs() ] + [ self._jobIDForBatchSystem2(id) for id in self.batchSystem2.getRunningJobIDs() ]

def getUpdatedJob(self, maxWait):
endTime = time.time() + maxWait
while 1:
updatedJob = self.batchSystem2.getUpdatedJob(0)
updatedJob = self.batchSystem2.getUpdatedJob(0) #Small positive values of wait seem to
if updatedJob != None:
return updatedJob
return (self._jobIDForBatchSystem2(updatedJob[0]), updatedJob[1])
updatedJob = self.batchSystem1.getUpdatedJob(0)
if updatedJob != None:
return updatedJob
return (self._jobIDForBatchSystem1(updatedJob[0]), updatedJob[1])
remaining = endTime - time.time()
if remaining <= 0:
return None
Expand Down
20 changes: 9 additions & 11 deletions batchSystems/gridengine.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ class GridengineBatchSystem(AbstractBatchSystem):
"""The interface for gridengine.
"""

def __init__(self, config):
AbstractBatchSystem.__init__(self, config) #Call the parent constructor
def __init__(self, config, maxCpus, maxMemory):
AbstractBatchSystem.__init__(self, config, maxCpus, maxMemory) #Call the parent constructor
self.gridengineResultsFile = getParasolResultsFileName(config.attrib["job_tree"])
#Reset the job queue and results (initially, we do this again once we've killed the jobs)
self.gridengineResultsFileHandle = open(self.gridengineResultsFile, 'w')
Expand All @@ -156,6 +156,7 @@ def __des__(self):
self.gridengineResultsFileHandle.close() #Close the results file, cos were done.

def issueJob(self, command, memory, cpu):
self.checkResourceRequest(memory, cpu)
jobID = self.nextJobID
self.nextJobID += 1

Expand Down Expand Up @@ -217,15 +218,12 @@ def getRunningJobIDs(self):
return times

def getUpdatedJob(self, maxWait):
    """Returns a (jobID, return code) pair for a finished job, or None if no
    result turns up in the updated jobs queue within maxWait.

    The entries of the updated jobs queue are (gridengine job ID, return code)
    pairs. The gridengine job ID is mapped through self.jobIDs before being
    returned, and the mapped ID is removed from self.currentjobs.
    """
    #Read from the same queue that task_done is called on below.
    update = self.getFromQueueSafely(self.updatedJobsQueue, maxWait)
    if update is None:
        return None
    sgeJobID, retcode = update
    self.updatedJobsQueue.task_done()
    #Return the mapped ID, not the raw gridengine one, as was done before
    #the switch to getFromQueueSafely.
    jobID = self.jobIDs[sgeJobID]
    self.currentjobs -= set([jobID])
    return (jobID, retcode)

def getWaitDuration(self):
Expand Down
19 changes: 8 additions & 11 deletions batchSystems/parasol.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,10 @@ def getUpdatedJob(parasolResultsFile, outputQueue1, outputQueue2):
class ParasolBatchSystem(AbstractBatchSystem):
"""The interface for Parasol.
"""
def __init__(self, config):
AbstractBatchSystem.__init__(self, config) #Call the parent constructor
def __init__(self, config, maxCpus, maxMemory):
AbstractBatchSystem.__init__(self, config, maxCpus, maxMemory) #Call the parent constructor
if maxMemory != sys.maxint:
logger.critical("A max memory has been specified for the parasol batch system class of %i, but currently this batchsystem interface does not support such limiting" % maxMemory)
#Keep the name of the results file for the pstat2 command..
self.parasolCommand = config.attrib["parasol_command"]
self.parasolResultsFile = getParasolResultsFileName(config.attrib["job_tree"])
Expand Down Expand Up @@ -122,14 +124,12 @@ def __init__(self, config):
worker.daemon = True
worker.start()
self.usedCpus = 0
self.maxCpus = int(config.attrib["max_jobs"])
self.jobIDsToCpu = {}

def issueJob(self, command, memory, cpu):
"""Issues parasol with job commands.
"""
assert memory != None
assert cpu != None
self.checkResourceRequest(memory, cpu)
pattern = re.compile("your job ([0-9]+).*")
parasolCommand = "%s -verbose -ram=%i -cpu=%i -results=%s add job '%s'" % (self.parasolCommand, memory, cpu, self.parasolResultsFile, command)
#Deal with the cpus
Expand Down Expand Up @@ -209,13 +209,10 @@ def getRunningJobIDs(self):
return runningJobs

def getUpdatedJob(self, maxWait):
i = None
try:
i = self.outputQueue2.get(timeout=maxWait)
jobID = self.getFromQueueSafely(self.outputQueue2, maxWait)
if jobID != None:
self.outputQueue2.task_done()
except Empty:
pass
return i
return jobID

def getRescueJobFrequency(self):
"""Parasol leaks jobs, but rescuing jobs involves calls to parasol list jobs and pstat2,
Expand Down
45 changes: 21 additions & 24 deletions batchSystems/singleMachine.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,19 @@ class SingleMachineBatchSystem(AbstractBatchSystem):
"""The interface for running jobs on a single machine, runs all the jobs you
give it as they come in, but in parallel.
"""
def __init__(self, config, workerFn=worker):
AbstractBatchSystem.__init__(self, config) #Call the parent constructor
def __init__(self, config, maxCpus, maxMemory, workerFn=worker):
AbstractBatchSystem.__init__(self, config, maxCpus, maxMemory) #Call the parent constructor
self.jobIndex = 0
self.jobs = {}
self.maxThreads = int(config.attrib["max_threads"])
self.maxCpus = int(config.attrib["max_jobs"])
logger.info("Setting up the thread pool with %i threads given the max threads %i and the max cpus %i" % (min(self.maxThreads, self.maxCpus), self.maxThreads, self.maxCpus))
self.maxThreads = min(self.maxThreads, self.maxCpus)
self.cpusPerThread = float(self.maxCpus) / float(self.maxThreads)
self.memoryPerThread = self.maxThreads + float(self.maxMemory) / float(self.maxThreads) #Add the maxThreads to avoid losing memory by rounding.
assert self.cpusPerThread >= 1
assert self.maxThreads >= 1
assert self.maxMemory >= 1
assert self.memoryPerThread >= 1
self.inputQueue = Queue()
self.outputQueue = Queue()
self.workerFn = workerFn
Expand All @@ -82,19 +84,16 @@ def __init__(self, config, workerFn=worker):
def issueJob(self, command, memory, cpu):
"""Runs the jobs right away.
"""
assert memory != None
assert cpu != None
if cpu > self.maxCpus:
raise RuntimeError("Requesting more cpus than available. Requested: %s, Available: %s" % (cpu, self.maxCpus))
assert(cpu <= self.maxCpus)
self.checkResourceRequest(memory, cpu)
logger.debug("Issuing the command: %s with memory: %i, cpu: %i" % (command, memory, cpu))
self.jobs[self.jobIndex] = command
i = self.jobIndex
#Deal with the max cpus calculation
k = 0
while cpu > self.cpusPerThread:
while cpu > self.cpusPerThread or memory > self.memoryPerThread:
self.inputQueue.put(None)
cpu -= self.cpusPerThread
memory -= self.memoryPerThread
k += 1
assert k < self.maxThreads
self.inputQueue.put((command, self.jobIndex, k))
Expand All @@ -119,26 +118,24 @@ def getRunningJobIDs(self):
def getUpdatedJob(self, maxWait):
"""Returns a map of the run jobs and the return value of their processes.
"""
i = None
try:
jobID, exitValue, threadsToStart = self.outputQueue.get(timeout=maxWait)
i = (jobID, exitValue)
self.jobs.pop(jobID)
logger.debug("Ran jobID: %s with exit value: %i" % (jobID, exitValue))
for j in xrange(threadsToStart):
worker = Process(target=self.workerFn, args=(self.inputQueue, self.outputQueue))
worker.daemon = True
worker.start()
self.outputQueue.task_done()
except Empty:
pass
return i
i = self.getFromQueueSafely(self.outputQueue, maxWait)
if i == None:
return None
jobID, exitValue, threadsToStart = i
self.jobs.pop(jobID)
logger.debug("Ran jobID: %s with exit value: %i" % (jobID, exitValue))
for j in xrange(threadsToStart):
worker = Process(target=self.workerFn, args=(self.inputQueue, self.outputQueue))
worker.daemon = True
worker.start()
self.outputQueue.task_done()
return (jobID, exitValue)

def getRescueJobFrequency(self):
    """This should not really occur, without an error. To exercise the
    system we allow it every 90 minutes.
    """
    #90 minutes, written out so the value agrees with the docstring.
    return 90 * 60

def badWorker(inputQueue, outputQueue):
"""This is used to test what happens if we fail and restart jobs
Expand Down
32 changes: 0 additions & 32 deletions convertImports.py

This file was deleted.

Loading

0 comments on commit 76a4328

Please sign in to comment.