Skip to content

Commit

Permalink
Modify job runner work thread shutdown behavior from "wait at least this long per thread" to "wait at least (but pretty close to) this long for all threads"
Browse files Browse the repository at this point in the history
  • Loading branch information
natefoo committed Oct 25, 2018
1 parent f7fa5a9 commit 93b93a7
Showing 1 changed file with 37 additions and 10 deletions.
47 changes: 37 additions & 10 deletions lib/galaxy/jobs/runners/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
import os
import string
import subprocess
import sys
import threading
import time
import traceback

from six.moves.queue import (
Empty,
Expand Down Expand Up @@ -85,10 +87,21 @@ def _init_worker_threads(self):
log.debug('Starting %s %s workers' % (self.nworkers, self.runner_name))
for i in range(self.nworkers):
worker = threading.Thread(name="%s.work_thread-%d" % (self.runner_name, i), target=self.run_next)
worker.setDaemon(True)
worker.daemon = True
self.app.application_stack.register_postfork_function(worker.start)
self.work_threads.append(worker)

def _alive_worker_threads(self, cycle=False):
    """Generate the threads in ``self.work_threads`` that are currently alive.

    When ``cycle`` is false a single pass over ``self.work_threads`` is made.
    When ``cycle`` is true, passes are repeated for as long as the previous
    pass yielded at least one alive thread, so the generator only finishes
    once a complete pass finds no alive thread.
    """
    while True:
        found_alive = False
        for worker in self.work_threads:
            if not worker.is_alive():
                continue
            found_alive = True
            yield worker
        # Only go around again when cycling was requested and the pass that
        # just ended saw at least one live thread.
        if not (cycle and found_alive):
            return

def run_next(self):
"""Run the next item in the work queue (a job waiting to run)
"""
Expand Down Expand Up @@ -129,22 +142,36 @@ def mark_as_queued(self, job_wrapper):
def shutdown(self):
    """Attempts to gracefully shut down the worker threads.

    One ``STOP_SIGNAL`` item is put on ``self.work_queue`` for every thread in
    ``self.work_threads``. If ``self.app.config.monitor_thread_join_timeout``
    is greater than zero, the worker threads that are still alive are then
    joined in short slices, one after another, until either all of them have
    terminated or the timeout has elapsed. The timeout is a single budget
    shared by all threads, not a per-thread allowance.

    Threads that are still alive once the budget is used up are reported with
    a warning that includes their current stack. Exceptions raised while
    joining are logged and not re-raised.
    """
    log.info("%s: Sending stop signal to %s job worker threads", self.runner_name, len(self.work_threads))
    for _ in self.work_threads:
        self.work_queue.put((STOP_SIGNAL, None))

    join_timeout = self.app.config.monitor_thread_join_timeout
    if join_timeout <= 0:
        # Waiting is disabled; the stop signals have been sent and that is all.
        return

    log.info("Waiting up to %d seconds for job worker threads to shutdown...", join_timeout)
    deadline = time.time() + join_timeout
    # NOTE: threads that have already joined by now are not going to be logged
    for thread in self._alive_worker_threads(cycle=True):
        remaining = deadline - time.time()
        if remaining <= 0:
            break
        try:
            # Join in slices of at most 2 seconds so that every worker gets
            # polled in turn, but never block past the shared deadline.
            thread.join(min(2, remaining))
        except Exception:
            log.exception("Caught exception attempting to shutdown job worker thread %s:", thread.name)
        if not thread.is_alive():
            log.debug("Job worker thread terminated: %s", thread.name)
    else:
        # The generator ran dry without hitting the deadline: nothing is alive.
        log.info("All job worker threads shutdown cleanly")
        return

    for thread in self._alive_worker_threads():
        try:
            frame = sys._current_frames()[thread.ident]
        except KeyError:
            # thread is now stopped
            continue
        log.warning("Timed out waiting for job worker thread %s to terminate, shutdown will be unclean! Thread "
                    "stack is:\n%s", thread.name, ''.join(traceback.format_stack(frame)))

# Most runners should override the legacy URL handler methods and destination param method
def url_to_destination(self, url):
Expand Down

0 comments on commit 93b93a7

Please sign in to comment.