diff --git a/lib/galaxy/app.py b/lib/galaxy/app.py index 65e26c77357f..8b27caf7e5f1 100644 --- a/lib/galaxy/app.py +++ b/lib/galaxy/app.py @@ -51,19 +51,18 @@ def __init__(self, **kwargs): # configured. The handler added here gets dumped and replaced with # an appropriately configured logger in configure_logging below. logging.basicConfig(level=logging.DEBUG) - log.debug("python path is: %s", ", ".join(sys.path)) self.name = 'galaxy' self.startup_timer = ExecutionTimer() self.new_installation = False - self.application_stack = application_stack_instance(app=self) - # A lot of postfork initialization depends on the server name, ensure it is set immediately after forking before other postfork functions - self.application_stack.register_postfork_function(self.application_stack.set_postfork_server_name, self) - self.application_stack.register_postfork_function(self.application_stack.start) # Read config file and check for errors self.config = config.Configuration(**kwargs) self.config.check() config.configure_logging(self.config) + log.debug("python path is: %s", ", ".join( sys.path )) self.configure_fluent_log() + # A lot of postfork initialization depends on the server name, ensure it is set immediately after forking before other postfork functions + self.application_stack = application_stack_instance(app=self) + self.application_stack.register_postfork_function(self.application_stack.set_postfork_server_name, self) self.config.reload_sanitize_whitelist(explicit='sanitize_whitelist_file' in kwargs) self.amqp_internal_connection_obj = galaxy.queues.connection_from_config(self.config) # control_worker *can* be initialized with a queue, but here we don't @@ -211,6 +210,9 @@ def postfork_sentry_client(): handlers[signal.SIGUSR1] = self.heartbeat.dump_signal_handler self._configure_signal_handlers(handlers) + # Start web stack message handling + self.application_stack.register_postfork_function(self.application_stack.start) + self.model.engine.dispose() self.server_starttime = int(time.time()) # used for cachebusting # When running the application without a web stack, this signals the application loop to break and call the shutdown method diff --git a/lib/galaxy/jobs/manager.py b/lib/galaxy/jobs/manager.py index 43a50d27ad48..36117ec49cb1 100644 --- a/lib/galaxy/jobs/manager.py +++ b/lib/galaxy/jobs/manager.py @@ -23,6 +23,8 @@ class JobManager(object): def __init__(self, app): self.app = app + self.job_handler = NoopHandler() + self.job_stop_queue = NoopQueue() if app.application_stack.setup_jobs_with_msg: # defer setup to postfork log.debug('######### registering manager init function') @@ -40,9 +42,6 @@ def init(self): # not a handler, but notification is via the application stack self.job_handler = MessageJobHandler( self.app ) self.job_stop_queue = NoopQueue() - else: - self.job_handler = NoopHandler() - self.job_stop_queue = NoopQueue self.job_queue = self.job_handler.job_queue self.job_lock = False diff --git a/lib/galaxy/web/stack/__init__.py b/lib/galaxy/web/stack/__init__.py index 9dc8f18deda0..12029c9dabab 100644 --- a/lib/galaxy/web/stack/__init__.py +++ b/lib/galaxy/web/stack/__init__.py @@ -51,7 +51,7 @@ def register_func(self, func, name=None): def deregister_func(self, func=None, name=None): name = self.__func_name(func, name) - del self.__func[name] + del self.__funcs[name] @property def handler_count(self): @@ -150,7 +150,7 @@ def register_message_handler(self, func, name=None): def deregister_message_handler(self, func=None, name=None): self.dispatcher.deregister_func(func, name) - self.transport.shutdown_if_needed() + self.transport.stop_if_unneeded() def send_message(self, dest, msg=None, target=None, params=None, **kwargs): assert msg is not None or target is not None, "Either 'msg' or 'target' parameters must be set" @@ -214,13 +214,13 @@ def _handle_signal(self, signum, frame): elif signum == signal.SIGINT: log.info('######## Mule %s received SIGINT, shutting down immediately', uwsgi.mule_id()) self.shutdown() - ## This terminates the application loop in the handler entrypoint - #self.app.exit = True + # This terminates the application loop in the handler entrypoint + self.app.exit = True elif signum == signal.SIGHUP: log.debug('######## Mule %s received SIGHUP, restarting', uwsgi.mule_id()) - #self.shutdown() - ## uWSGI master will restart us - #self.app.exit = True + self.shutdown() + # uWSGI master will restart us + self.app.exit = True # FIXME: these are copied into UWSGIFarmMessageTransport @property @@ -265,11 +265,11 @@ def shutdown(self): log.debug('######## STACK SHUTDOWN CALLED') super(UWSGIApplicationStack, self).shutdown() # FIXME: blech - #if not self._is_mule: - # for farm in self._farms: - # for mule in self._mules: - # # This will possibly generate more than we need, but that's ok - # self.transport.send_message(self.shutdown_msg, farm) + if not self._is_mule: + for farm in self._farms: + for mule in self._mules: + # This will possibly generate more than we need, but that's ok + self.transport.send_message(self.shutdown_msg, farm) class PasteApplicationStack(ApplicationStack): @@ -317,8 +317,9 @@ def process_in_pool(pool_name): def _do_uwsgi_postfork(): import os log.debug('######## postfork called, pid %s mule %s functions are: %s' % (os.getpid(), uwsgi.mule_id(), UWSGIApplicationStack.postfork_functions)) - #if uwsgi.mule_id() > 0: - # # mules will inherit the postfork function list and call them immediately upon fork, but should not do that - # UWSGIApplicationStack.postfork_functions = [] + if uwsgi.mule_id() > 0: + # mules will inherit the postfork function list and call them immediately upon fork, but should not do that + UWSGIApplicationStack.postfork_functions = [] + log.debug('######## postfork called, pid %s mule %s NOW functions are: %s' % (os.getpid(), uwsgi.mule_id(), UWSGIApplicationStack.postfork_functions)) for f, args, kwargs in [t for t in UWSGIApplicationStack.postfork_functions]: f(*args, **kwargs) diff --git a/lib/galaxy/web/stack/transport.py b/lib/galaxy/web/stack/transport.py index b93edfdb8afd..12442f9bd8ae 100644 --- a/lib/galaxy/web/stack/transport.py +++ b/lib/galaxy/web/stack/transport.py @@ -68,7 +68,7 @@ def shutdown(self): self.running = False if self.dispatcher_thread.is_alive(): # FIXME - self.send_message(self.shutdown_msg, 'job-handlers') + #self.send_message(self.shutdown_msg, 'job-handlers') self.dispatcher_thread.join() log.debug('######## Joined dispatcher thread') diff --git a/scripts/galaxy-main b/scripts/galaxy-main index d3b3aece320b..629ed9d489cc 100755 --- a/scripts/galaxy-main +++ b/scripts/galaxy-main @@ -219,6 +219,7 @@ class GalaxyConfigBuilder(object): # Galaxy will attempt to setup logging if loggers is not present in # ini config file - this handles that loggers block however if present # (the way paste normally would) + from galaxy.web.stack import application_stack_log_filter if not self.ini_path: return raw_config = configparser.ConfigParser() @@ -229,6 +230,9 @@ class GalaxyConfigBuilder(object): config_file, dict(__file__=config_file, here=os.path.dirname(config_file)) ) + root = logging.getLogger() + for h in root.handlers: + h.addFilter(application_stack_log_filter()()) def main(): diff --git a/scripts/get_uwsgi_args.py b/scripts/get_uwsgi_args.py index e19e63eae9ae..cf9bdd019f29 100644 --- a/scripts/get_uwsgi_args.py +++ b/scripts/get_uwsgi_args.py @@ -15,7 +15,7 @@ DESCRIPTION = "Script to determine uWSGI command line arguments." -COMMAND_TEMPLATE = '{virtualenv}--ini-paste {galaxy_ini}{mule}{farm}' +COMMAND_TEMPLATE = '{virtualenv}--ini-paste {galaxy_ini} --paste-logger --die-on-term --enable-threads{mule}{farm}' def _get_uwsgi_args(args, kwargs):