Skip to content
Permalink
Browse files

Merge pull request #198 from dannon/kombuaaaaarrgh

[15.05] Split initialization of galaxy queue worker.
  • Loading branch information...
dannon committed May 5, 2015
2 parents 77f9b3d + d96035d commit 69c59523b398fa5ee624af012f5b16595a12f369
Showing with 68 additions and 57 deletions.
  1. +6 −8 lib/galaxy/app.py
  2. +60 −46 lib/galaxy/queue_worker.py
  3. +2 −3 lib/galaxy/webapps/galaxy/buildapp.py
@@ -38,7 +38,12 @@ def __init__( self, **kwargs ):
self.config.check()
config.configure_logging( self.config )
self.configure_fluent_log()
self._amqp_internal_connection_obj = galaxy.queues.connection_from_config(self.config)

self.amqp_internal_connection_obj = galaxy.queues.connection_from_config(self.config)
# control_worker *can* be initialized with a queue, but here we don't
# want to and we'll allow postfork to bind and start it.
self.control_worker = GalaxyQueueWorker(self)

self._configure_tool_shed_registry()
self._configure_object_store( fsmon=True )
# Setup the database engine and ORM
@@ -152,13 +157,6 @@ def __init__( self, **kwargs ):
self.model.engine.dispose()
self.server_starttime = int(time.time()) # used for cachebusting

def setup_control_queue(self):
self.control_worker = GalaxyQueueWorker(self, galaxy.queues.control_queue_from_config(self.config),
galaxy.queue_worker.control_message_to_task,
self._amqp_internal_connection_obj)
self.control_worker.daemon = True
self.control_worker.start()

def shutdown( self ):
self.workflow_scheduling_manager.shutdown()
self.job_manager.shutdown()
@@ -24,52 +24,6 @@
log = logging.getLogger(__name__)


class GalaxyQueueWorker(ConsumerMixin, threading.Thread):
"""
This is a flexible worker for galaxy's queues. Each process, web or
handler, will have one of these used for dispatching so called 'control'
tasks.
"""
def __init__(self, app, queue, task_mapping, connection=None):
super(GalaxyQueueWorker, self).__init__()
log.info("Initalizing Galaxy Queue Worker on %s", util.mask_password_from_url(app.config.amqp_internal_connection))
if connection:
self.connection = connection
else:
self.connection = Connection(app.config.amqp_internal_connection)
self.app = app
# Eventually we may want different workers w/ their own queues and task
# mappings. Right now, there's only the one.
self.control_queue = queue
self.task_mapping = task_mapping
self.declare_queues = galaxy.queues.all_control_queues_for_declare(app.config)
# TODO we may want to purge the queue at the start to avoid executing
# stale 'reload_tool', etc messages. This can happen if, say, a web
# process goes down and messages get sent before it comes back up.
# Those messages will no longer be useful (in any current case)

def get_consumers(self, Consumer, channel):
return [Consumer(queues=self.control_queue,
callbacks=[self.process_task])]

def process_task(self, body, message):
if body['task'] in self.task_mapping:
if body.get('noop', None) != self.app.config.server_name:
try:
f = self.task_mapping[body['task']]
log.info("Instance recieved '%s' task, executing now." % body['task'])
f(self.app, **body['kwargs'])
except Exception:
# this shouldn't ever throw an exception, but...
log.exception("Error running control task type: %s" % body['task'])
else:
log.warning("Recieved a malformed task message:\n%s" % body)
message.ack()

def shutdown(self):
self.should_stop = True


def send_control_task(trans, task, noop_self=False, kwargs={}):
log.info("Sending %s control task." % task)
payload = {'task': task,
@@ -127,3 +81,63 @@ def admin_job_lock(app, **kwargs):
'reload_display_application': reload_display_application,
'reload_tool_data_tables': reload_tool_data_tables,
'admin_job_lock': admin_job_lock}


class GalaxyQueueWorker(ConsumerMixin, threading.Thread):
"""
This is a flexible worker for galaxy's queues. Each process, web or
handler, will have one of these used for dispatching so called 'control'
tasks.
"""
def __init__(self, app, queue=None, task_mapping=control_message_to_task, connection=None):
super(GalaxyQueueWorker, self).__init__()
log.info("Initalizing %s Galaxy Queue Worker on %s", app.config.server_name, util.mask_password_from_url(app.config.amqp_internal_connection))
self.daemon = True
if connection:
self.connection = connection
else:
self.connection = app.amqp_internal_connection_obj
# explicitly force connection instead of lazy-connecting the first
# time it is required.
self.connection.connect()
self.app = app
# Eventually we may want different workers w/ their own queues and task
# mappings. Right now, there's only the one.
if queue:
# Allows assignment of a particular queue for this worker.
self.control_queue = queue
else:
# Default to figuring out which control queue to use based on the app config.
queue = galaxy.queues.control_queue_from_config(app.config)
self.task_mapping = task_mapping
self.declare_queues = galaxy.queues.all_control_queues_for_declare(app.config)
# TODO we may want to purge the queue at the start to avoid executing
# stale 'reload_tool', etc messages. This can happen if, say, a web
# process goes down and messages get sent before it comes back up.
# Those messages will no longer be useful (in any current case)

def bind_and_start(self):
log.info("Binding and starting galaxy control worker for %s", self.app.config.server_name)
self.control_queue = galaxy.queues.control_queue_from_config(self.app.config)
self.start()

def get_consumers(self, Consumer, channel):
return [Consumer(queues=self.control_queue,
callbacks=[self.process_task])]

def process_task(self, body, message):
if body['task'] in self.task_mapping:
if body.get('noop', None) != self.app.config.server_name:
try:
f = self.task_mapping[body['task']]
log.info("Instance '%s' recieved '%s' task, executing now.", self.app.config.server_name, body['task'])
f(self.app, **body['kwargs'])
except Exception:
# this shouldn't ever throw an exception, but...
log.exception("Error running control task type: %s" % body['task'])
else:
log.warning("Recieved a malformed task message:\n%s" % body)
message.ack()

def shutdown(self):
self.should_stop = True
@@ -3,8 +3,6 @@
"""

import atexit
import os
import os.path
import sys

from paste import httpexceptions
@@ -126,7 +124,7 @@ def postfork_setup():
if app.config.is_uwsgi:
import uwsgi
app.config.server_name += ".%s" % uwsgi.worker_id()
app.setup_control_queue()
app.control_worker.bind_and_start()


def populate_api_routes( webapp, app ):
@@ -535,6 +533,7 @@ def populate_api_routes( webapp, app ):
webapp.mapper.connect( "create", "/api/metrics", controller="metrics",
action="create", conditions=dict( method=["POST"] ) )


def _add_item_tags_controller( webapp, name_prefix, path_prefix, **kwd ):
# Not just using map.resources because actions should be based on name not id
controller = "%stags" % name_prefix

0 comments on commit 69c5952

Please sign in to comment.
You can’t perform that action at this time.