Skip to content

Commit

Permalink
Abandoning unprogrammed mules (for now), fixes for programmed mules
Browse files Browse the repository at this point in the history
  • Loading branch information
natefoo committed Aug 22, 2017
1 parent a06d6cf commit ece3f24
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 25 deletions.
12 changes: 7 additions & 5 deletions lib/galaxy/app.py
Expand Up @@ -51,19 +51,18 @@ def __init__(self, **kwargs):
# configured. The handler added here gets dumped and replaced with
# an appropriately configured logger in configure_logging below.
logging.basicConfig(level=logging.DEBUG)
log.debug("python path is: %s", ", ".join(sys.path))
self.name = 'galaxy'
self.startup_timer = ExecutionTimer()
self.new_installation = False
self.application_stack = application_stack_instance(app=self)
# A lot of postfork initialization depends on the server name, ensure it is set immediately after forking before other postfork functions
self.application_stack.register_postfork_function(self.application_stack.set_postfork_server_name, self)
self.application_stack.register_postfork_function(self.application_stack.start)
# Read config file and check for errors
self.config = config.Configuration(**kwargs)
self.config.check()
config.configure_logging(self.config)
log.debug("python path is: %s", ", ".join( sys.path ))
self.configure_fluent_log()
# A lot of postfork initialization depends on the server name, ensure it is set immediately after forking before other postfork functions
self.application_stack = application_stack_instance(app=self)
self.application_stack.register_postfork_function(self.application_stack.set_postfork_server_name, self)
self.config.reload_sanitize_whitelist(explicit='sanitize_whitelist_file' in kwargs)
self.amqp_internal_connection_obj = galaxy.queues.connection_from_config(self.config)
# control_worker *can* be initialized with a queue, but here we don't
Expand Down Expand Up @@ -211,6 +210,9 @@ def postfork_sentry_client():
handlers[signal.SIGUSR1] = self.heartbeat.dump_signal_handler
self._configure_signal_handlers(handlers)

# Start web stack message handling
self.application_stack.register_postfork_function(self.application_stack.start)

self.model.engine.dispose()
self.server_starttime = int(time.time()) # used for cachebusting
# When running the application without a web stack, this signals the application loop to break and call the shutdown method
Expand Down
5 changes: 2 additions & 3 deletions lib/galaxy/jobs/manager.py
Expand Up @@ -23,6 +23,8 @@ class JobManager(object):

def __init__(self, app):
self.app = app
self.job_handler = NoopHandler()
self.job_stop_queue = NoopQueue()
if app.application_stack.setup_jobs_with_msg:
# defer setup to postfork
log.debug('######### registering manager init function')
Expand All @@ -40,9 +42,6 @@ def init(self):
# not a handler, but notification is via the application stack
self.job_handler = MessageJobHandler( self.app )
self.job_stop_queue = NoopQueue()
else:
self.job_handler = NoopHandler()
self.job_stop_queue = NoopQueue
self.job_queue = self.job_handler.job_queue
self.job_lock = False

Expand Down
31 changes: 16 additions & 15 deletions lib/galaxy/web/stack/__init__.py
Expand Up @@ -51,7 +51,7 @@ def register_func(self, func, name=None):

def deregister_func(self, func=None, name=None):
name = self.__func_name(func, name)
del self.__func[name]
del self.__funcs[name]

@property
def handler_count(self):
Expand Down Expand Up @@ -150,7 +150,7 @@ def register_message_handler(self, func, name=None):

def deregister_message_handler(self, func=None, name=None):
self.dispatcher.deregister_func(func, name)
self.transport.shutdown_if_needed()
self.transport.stop_if_unneeded()

def send_message(self, dest, msg=None, target=None, params=None, **kwargs):
assert msg is not None or target is not None, "Either 'msg' or 'target' parameters must be set"
Expand Down Expand Up @@ -214,13 +214,13 @@ def _handle_signal(self, signum, frame):
elif signum == signal.SIGINT:
log.info('######## Mule %s received SIGINT, shutting down immediately', uwsgi.mule_id())
self.shutdown()
## This terminates the application loop in the handler entrypoint
#self.app.exit = True
# This terminates the application loop in the handler entrypoint
self.app.exit = True
elif signum == signal.SIGHUP:
log.debug('######## Mule %s received SIGHUP, restarting', uwsgi.mule_id())
#self.shutdown()
## uWSGI master will restart us
#self.app.exit = True
self.shutdown()
# uWSGI master will restart us
self.app.exit = True

# FIXME: these are copied into UWSGIFarmMessageTransport
@property
Expand Down Expand Up @@ -265,11 +265,11 @@ def shutdown(self):
log.debug('######## STACK SHUTDOWN CALLED')
super(UWSGIApplicationStack, self).shutdown()
# FIXME: blech
#if not self._is_mule:
# for farm in self._farms:
# for mule in self._mules:
# # This will possibly generate more than we need, but that's ok
# self.transport.send_message(self.shutdown_msg, farm)
if not self._is_mule:
for farm in self._farms:
for mule in self._mules:
# This will possibly generate more than we need, but that's ok
self.transport.send_message(self.shutdown_msg, farm)


class PasteApplicationStack(ApplicationStack):
Expand Down Expand Up @@ -317,8 +317,9 @@ def process_in_pool(pool_name):
def _do_uwsgi_postfork():
import os
log.debug('######## postfork called, pid %s mule %s functions are: %s' % (os.getpid(), uwsgi.mule_id(), UWSGIApplicationStack.postfork_functions))
#if uwsgi.mule_id() > 0:
# # mules will inherit the postfork function list and call them immediately upon fork, but should not do that
# UWSGIApplicationStack.postfork_functions = []
if uwsgi.mule_id() > 0:
# mules will inherit the postfork function list and call them immediately upon fork, but should not do that
UWSGIApplicationStack.postfork_functions = []
log.debug('######## postfork called, pid %s mule %s NOW functions are: %s' % (os.getpid(), uwsgi.mule_id(), UWSGIApplicationStack.postfork_functions))
for f, args, kwargs in [t for t in UWSGIApplicationStack.postfork_functions]:
f(*args, **kwargs)
2 changes: 1 addition & 1 deletion lib/galaxy/web/stack/transport.py
Expand Up @@ -68,7 +68,7 @@ def shutdown(self):
self.running = False
if self.dispatcher_thread.is_alive():
# FIXME
self.send_message(self.shutdown_msg, 'job-handlers')
#self.send_message(self.shutdown_msg, 'job-handlers')
self.dispatcher_thread.join()
log.debug('######## Joined dispatcher thread')

Expand Down
4 changes: 4 additions & 0 deletions scripts/galaxy-main
Expand Up @@ -219,6 +219,7 @@ class GalaxyConfigBuilder(object):
# Galaxy will attempt to setup logging if loggers is not present in
# ini config file - this handles that loggers block however if present
# (the way paste normally would)
from galaxy.web.stack import application_stack_log_filter
if not self.ini_path:
return
raw_config = configparser.ConfigParser()
Expand All @@ -229,6 +230,9 @@ class GalaxyConfigBuilder(object):
config_file,
dict(__file__=config_file, here=os.path.dirname(config_file))
)
root = logging.getLogger()
for h in root.handlers:
h.addFilter(application_stack_log_filter()())


def main():
Expand Down
2 changes: 1 addition & 1 deletion scripts/get_uwsgi_args.py
Expand Up @@ -15,7 +15,7 @@


DESCRIPTION = "Script to determine uWSGI command line arguments."
COMMAND_TEMPLATE = '{virtualenv}--ini-paste {galaxy_ini}{mule}{farm}'
COMMAND_TEMPLATE = '{virtualenv}--ini-paste {galaxy_ini} --paste-logger --die-on-term --enable-threads{mule}{farm}'


def _get_uwsgi_args(args, kwargs):
Expand Down

0 comments on commit ece3f24

Please sign in to comment.