Skip to content

Commit

Permalink
Dep holds updating database.
Browse files Browse the repository at this point in the history
Statefile tests and multi-chain tests pending.
  • Loading branch information
pmrich committed Mar 23, 2015
1 parent c1a42d8 commit bcc8dfd
Showing 1 changed file with 21 additions and 15 deletions.
36 changes: 21 additions & 15 deletions src/lib/Components/cqm.py
Expand Up @@ -562,9 +562,6 @@ class Job (StateMachine):
__rc_xmlrpc = "xmlrpc"
__rc_unknown = "unknown"

all_dependencies = []
satisfied_dependencies = []

def __init__(self, spec):
self.initializing = True
seas = get_job_sm_seas(self)
Expand Down Expand Up @@ -625,10 +622,19 @@ def __init__(self, spec):
self.path = spec.get("path")
self.mode = spec.get("mode", "co")
self.envs = spec.get("envs", {})
self.force_kill_delay = spec.get("force_kill_delay",
self.force_kill_delay = spec.get("force_kill_delay",
get_cqm_config('force_kill_delay', DEFAULT_FORCE_KILL_DELAY))
self.attrs = spec.get("attrs", {})

self.all_dependencies = spec.get("all_dependencies")
if self.all_dependencies:
self.all_dependencies = str(self.all_dependencies).split(":")
logger.info("Job %s/%s: dependencies set to %s", self.jobid, self.user, ":".join(self.all_dependencies))
else:
self.all_dependencies = []
self.satisfied_dependencies = []


self.preemptable = spec.get("preemptable", False)
self.__preempts = 0
if self.preemptable:
Expand Down Expand Up @@ -697,23 +703,20 @@ def __init__(self, spec):
dbwriter.log_to_db(self.user, "user_hold", "job_prog", JobProgMsg(self))

# read passed dependencies and set dep_hold if necessary
passed_dependencies = spec.get("all_dependencies")
if passed_dependencies:
self.all_dependencies = str(passed_dependencies).split(":")
logger.info("Job %s/%s: dependencies set to %s", self.jobid, self.user, ":".join(self.all_dependencies))
else:
self.all_dependencies = []
# set dep_hold as needed:
self.update_dep_state()

self.initializing = False

# end def __init__()

def __setattr__(self, name, value):
super(Job, self).__setattr__(name, value)
if name in ('all_dependencies', 'satisfied_dependencies'):
self.update_dep_state()
#def __setattr__(self, name, value):
#super(Job, self).__setattr__(name, value)
#if name in ('all_dependencies', 'satisfied_dependencies'):
#self.update_dep_state()

def no_holds_left(self):
'''Check for whether any holds are set on a job'''
return not (self.admin_hold or
self.user_hold or
self.dep_hold or
Expand Down Expand Up @@ -2796,6 +2799,7 @@ def __set_dep_hold(self, hold_flag):
'''
if not hold_flag:
self.all_dependencies = []
self.update_dep_state()

dep_hold = property(__get_dep_hold, __set_dep_hold)

Expand Down Expand Up @@ -3673,6 +3677,7 @@ def _job_terminal_action(self, args):
for waiting_job in self.Queues.get_jobs([{'state':"dep_hold"}]):
if str(job.jobid) in waiting_job.all_dependencies:
waiting_job.satisfied_dependencies = waiting_job.satisfied_dependencies[:] + [str(job.jobid)]
waiting_job.update_dep_state()
if not waiting_job.dep_hold:
logger.info("Job %s/%s: dependencies satisfied", waiting_job.jobid, waiting_job.user)
dbwriter.log_to_db(None, "dep_hold_release", "job_prog", JobProgMsg(waiting_job))
Expand Down Expand Up @@ -3863,7 +3868,8 @@ def set_jobs(self, specs, updates, user_name=None):
message = ":".join(job.all_dependencies)
else:
message = "[]"
logger.info("Job %s/%s: dependencies set to %s", job.jobid, job.user, message)
logger.info("Job %s/%s: dependencies set to %s", job.jobid, job.user, message)
job.update_dep_state()
self.check_dep_fail()

# only do this if the new queue can accept this job
Expand Down

0 comments on commit bcc8dfd

Please sign in to comment.