Skip to content

Commit

Permalink
Implements watch_item and replaces the watched_items, to use the pare…
Browse files Browse the repository at this point in the history
…nt logic.
  • Loading branch information
pcm32 committed Apr 18, 2016
1 parent 820913c commit ef70f0e
Showing 1 changed file with 34 additions and 40 deletions.
74 changes: 34 additions & 40 deletions lib/galaxy/jobs/runners/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,47 +203,41 @@ def __get_k8s_container_name(self, job_wrapper):
return job_wrapper.job_destination.id


def check_watched_items( self ):
"""
Called by the monitor thread to look at each watched job and deal
with state changes.
"""
new_watched = []

job_states = self.__get_job_states()
def check_watched_item(self, job_state):
"""Checks the state of a job already submitted on k8s"""
jobs = Job.objects(self._pykube_api).filter(selector="app="+job_state.job_id)
if len(jobs.response['items']) == 1:
job = Job(self._pykube_api, jobs.response['items'][0])
succeeded = 0
active = 0
failed = 0
if 'succeeded' in job.obj['status']:
succeeded = job.obj['status']['succeeded']
if 'active' in job.obj['status']:
active = job.obj['status']['active']
if 'failed' in job.obj['status']:
failed = job.obj['status']['failed']

# This assumes jobs dependent on a single pod.
if succeeded > 0:
self.mark_as_finished(job_state)
elif active > 0 or succeeded + active + failed == 0:
self.mark_as_queued(job_state)
elif failed > job_state.job_destination.params['max_pod_retrials']:
self.mark_as_failed(job_state)
job.scale(replicas=0)

elif len(jobs.response['items']) == 0:
# there is no job responding to this job_id, it is either lost or something happened.
self.mark_as_failed(job_state)
return job_state
else:
# TODO: possibly some warning or message should be provided here to stderr
# TODO: of the job
# there is more than one job associated to the expected unique job id used as selector.
self.mark_as_failed(job_state)
return job_state

for ajs in self.watched:
external_job_id = ajs.job_id
id_tag = ajs.job_wrapper.get_id_tag()
old_state = ajs.old_state
state = job_states.get(external_job_id, None)
if state is None:
if ajs.job_wrapper.get_state() == model.Job.states.DELETED:
continue
log.debug("(%s/%s) job not found in batch state check" % ( id_tag, external_job_id ) )
shell_params, job_params = self.parse_destination_params(ajs.job_destination.params)
shell, job_interface = self.get_cli_plugins(shell_params, job_params)
cmd_out = shell.execute(job_interface.get_single_status(external_job_id))
state = job_interface.parse_single_status(cmd_out.stdout, external_job_id)
if state == model.Job.states.OK:
log.debug('(%s/%s) job execution finished, running job wrapper finish method' % ( id_tag, external_job_id ) )
self.work_queue.put( ( self.finish_job, ajs ) )
continue
else:
log.warning('(%s/%s) job not found in batch state check, but found in individual state check' % ( id_tag, external_job_id ) )
if state != old_state:
ajs.job_wrapper.change_state( state )
else:
if state != old_state:
log.debug("(%s/%s) state change: %s" % ( id_tag, external_job_id, state ) )
ajs.job_wrapper.change_state( state )
if state == model.Job.states.RUNNING and not ajs.running:
ajs.running = True
ajs.job_wrapper.change_state( model.Job.states.RUNNING )
ajs.old_state = state
new_watched.append( ajs )
# Replace the watch list with the updated version
self.watched = new_watched

def __get_job_states(self):
job_destinations = {}
Expand Down

0 comments on commit ef70f0e

Please sign in to comment.