# app.py
from __future__ import absolute_import
import logging
import signal
import sys
import time
import os
from galaxy import config, jobs
import galaxy.model
import galaxy.security
import galaxy.queues
from galaxy.managers.collections import DatasetCollectionManager
import galaxy.quota
from galaxy.managers.tags import GalaxyTagManager
from galaxy.visualization.genomes import Genomes
from galaxy.visualization.data_providers.registry import DataProviderRegistry
from galaxy.visualization.plugins.registry import VisualizationsRegistry
from galaxy.tools.special_tools import load_lib_tools
from galaxy.tours import ToursRegistry
from galaxy.webapps.galaxy.config_watchers import ConfigWatchers
from galaxy.webhooks import WebhooksRegistry
from galaxy.sample_tracking import external_service_types
from galaxy.openid.providers import OpenIDProviders
from galaxy.tools.data_manager.manager import DataManagers
from galaxy.tools.cache import (
ToolCache,
ToolShedRepositoryCache
)
from galaxy.jobs import metrics as job_metrics
from galaxy.tools.error_reports import ErrorReports
from galaxy.web.proxy import ProxyManager
from galaxy.web.stack import application_stack_instance
from galaxy.queue_worker import GalaxyQueueWorker
from galaxy.util import (
ExecutionTimer,
heartbeat
)
from tool_shed.galaxy_install import update_repository_manager
# Module-level logger named after this module.
log = logging.getLogger(__name__)
# Module-level placeholder; it is not reassigned in the visible part of this file.
# NOTE(review): presumably rebound elsewhere to the running application -- confirm.
app = None
# NOTE(review): listing ``object`` before the mixin only works while
# ``config.ConfiguresGalaxyMixin`` is an old-style (Python 2) class; if the
# mixin ever derives from ``object`` this ordering cannot produce a consistent
# MRO. Left unchanged here because the mixin is defined outside this file.
class UniverseApplication(object, config.ConfiguresGalaxyMixin):
    """Encapsulates the state of a Universe application

    Constructing an instance reads and checks the configuration, configures
    logging, and then builds the managers, registries and caches that are
    stored as attributes on the instance. Work that has to happen after the
    application stack forks is not run directly; it is handed to
    ``self.application_stack.register_postfork_function``.
    """

    def __init__(self, **kwargs):
        """Build the application state from configuration keyword arguments.

        ``kwargs`` is passed unchanged to ``config.Configuration``. Two keys
        are also inspected here: ``global_conf`` (a mapping whose ``__file__``
        entry, when present, is logged and forwarded to ``_configure_models``)
        and ``sanitize_whitelist_file`` (its presence marks the sanitize
        whitelist as explicitly configured).
        """
        if not log.handlers:
            # Paste didn't handle it, so we need a temporary basic log
            # configured.  The handler added here gets dumped and replaced with
            # an appropriately configured logger in configure_logging below.
            logging.basicConfig(level=logging.DEBUG)
        self.name = 'galaxy'
        self.startup_timer = ExecutionTimer()
        self.new_installation = False
        # Read config file and check for errors
        self.config = config.Configuration(**kwargs)
        self.config.check()
        config.configure_logging(self.config)
        log.debug("python path is: %s", ", ".join(sys.path))
        self.configure_fluent_log()
        # A lot of postfork initialization depends on the server name, ensure
        # it is set immediately after forking before other postfork functions
        self.application_stack = application_stack_instance(app=self)
        self.application_stack.register_postfork_function(self.application_stack.set_postfork_server_name, self)
        self.config.reload_sanitize_whitelist(explicit='sanitize_whitelist_file' in kwargs)
        self.amqp_internal_connection_obj = galaxy.queues.connection_from_config(self.config)
        # control_worker *can* be initialized with a queue, but here we don't
        # want to and we'll allow postfork to bind and start it.
        self.control_worker = GalaxyQueueWorker(self)

        self._configure_tool_shed_registry()
        self._configure_object_store(fsmon=True)
        # Setup the database engine and ORM
        config_file = kwargs.get('global_conf', {}).get('__file__', None)
        if config_file:
            log.debug('Using "galaxy.ini" config file: %s', config_file)
        check_migrate_tools = self.config.check_migrate_tools
        self._configure_models(check_migrate_databases=True, check_migrate_tools=check_migrate_tools, config_file=config_file)

        # Manage installed tool shed repositories.
        # Imported here rather than at module level, as in the original code.
        from tool_shed.galaxy_install import installed_repository_manager
        self.installed_repository_manager = installed_repository_manager.InstalledRepositoryManager(self)

        self._configure_datatypes_registry(self.installed_repository_manager)
        galaxy.model.set_datatypes_registry(self.datatypes_registry)

        # Security helper
        self._configure_security()
        # Tag handler
        self.tag_handler = GalaxyTagManager(self.model.context)
        # Dataset Collection Plugins
        self.dataset_collections_service = DatasetCollectionManager(self)

        # Tool Data Tables
        self._configure_tool_data_tables(from_shed_config=False)
        # Load dbkey / genome build manager
        self._configure_genome_builds(data_table_name="__dbkeys__", load_old_style=True)

        # Genomes
        self.genomes = Genomes(self)
        # Data providers registry.
        self.data_provider_registry = DataProviderRegistry()

        # Initialize job metrics manager, needs to be in place before
        # config so per-destination modifications can be made.
        self.job_metrics = job_metrics.JobMetrics(self.config.job_metrics_config_file, app=self)

        # Initialize error report plugins.
        self.error_reports = ErrorReports(self.config.error_report_file, app=self)

        # Initialize the job management configuration
        self.job_config = jobs.JobConfiguration(self)

        # Setup a Tool Cache
        self.tool_cache = ToolCache()
        self.tool_shed_repository_cache = ToolShedRepositoryCache(self)
        # Watch various config files for immediate reload
        self.watchers = ConfigWatchers(self)
        self._configure_toolbox()

        # Load Data Manager
        self.data_managers = DataManagers(self)
        # Load the update repository manager.
        self.update_repository_manager = update_repository_manager.UpdateRepositoryManager(self)
        # Load proprietary datatype converters and display applications.
        self.installed_repository_manager.load_proprietary_converters_and_display_applications()
        # Load datatype display applications defined in local datatypes_conf.xml
        self.datatypes_registry.load_display_applications(self)
        # Load datatype converters defined in local datatypes_conf.xml
        self.datatypes_registry.load_datatype_converters(self.toolbox)
        # Load external metadata tool
        self.datatypes_registry.load_external_metadata_tool(self.toolbox)
        # Load history import/export tools.
        load_lib_tools(self.toolbox)
        # visualizations registry: associates resources with visualizations, controls how to render
        self.visualizations_registry = VisualizationsRegistry(
            self,
            directories_setting=self.config.visualization_plugins_directory,
            template_cache_dir=self.config.template_cache)
        # Tours registry
        self.tour_registry = ToursRegistry(self.config.tour_config_dir)
        # Webhooks registry
        self.webhooks_registry = WebhooksRegistry(self.config.webhooks_dirs)
        # Load security policy.
        self.security_agent = self.model.security_agent
        self.host_security_agent = galaxy.security.HostAgent(
            model=self.security_agent.model,
            permitted_actions=self.security_agent.permitted_actions)
        # Load quota management.
        if self.config.enable_quotas:
            self.quota_agent = galaxy.quota.QuotaAgent(self.model)
        else:
            self.quota_agent = galaxy.quota.NoQuotaAgent(self.model)
        # Heartbeat for thread profiling
        self.heartbeat = None
        # Container for OpenID authentication routines.
        # Note that ``self.openid_manager`` is only set when OpenID is enabled.
        if self.config.enable_openid:
            from galaxy.web.framework import openid_manager
            self.openid_manager = openid_manager.OpenIDManager(self.config.openid_consumer_cache_path)
            self.openid_providers = OpenIDProviders.from_file(self.config.openid_config_file)
        else:
            self.openid_providers = OpenIDProviders()
        from galaxy import auth
        self.auth_manager = auth.AuthManager(self)
        # Start the heartbeat process if configured and available (wait until
        # postfork if using uWSGI)
        if self.config.use_heartbeat:
            if heartbeat.Heartbeat:
                self.heartbeat = heartbeat.Heartbeat(
                    self.config,
                    period=self.config.heartbeat_interval,
                    fname=self.config.heartbeat_log
                )
                self.heartbeat.daemon = True
                self.application_stack.register_postfork_function(self.heartbeat.start)
        self.sentry_client = None
        if self.config.sentry_dsn:

            def postfork_sentry_client():
                # raven is imported lazily, only when a Sentry DSN is configured.
                import raven
                self.sentry_client = raven.Client(self.config.sentry_dsn)

            self.application_stack.register_postfork_function(postfork_sentry_client)

        # Transfer manager client.
        # Note that ``self.transfer_manager`` is only set when the option is on.
        if self.config.get_bool('enable_beta_job_managers', False):
            from galaxy.jobs import transfer_manager
            self.transfer_manager = transfer_manager.TransferManager(self)
        # Start the job manager
        from galaxy.jobs import manager
        self.job_manager = manager.JobManager(self)
        self.application_stack.register_postfork_function(self.job_manager.start)
        # FIXME: These are exposed directly for backward compatibility
        # self.job_queue = self.job_manager.job_queue
        # self.job_stop_queue = self.job_manager.job_stop_queue
        self.proxy_manager = ProxyManager(self.config)
        # Initialize the external service types
        self.external_service_types = external_service_types.ExternalServiceTypesCollection(
            self.config.external_service_type_config_file,
            self.config.external_service_type_path, self)

        from galaxy.workflow import scheduling_manager
        # Must be initialized after job_config.
        self.workflow_scheduling_manager = scheduling_manager.WorkflowSchedulingManager(self)

        # Configure handling of signals
        handlers = {}
        if self.heartbeat:
            handlers[signal.SIGUSR1] = self.heartbeat.dump_signal_handler
        self._configure_signal_handlers(handlers)

        # Start web stack message handling
        self.application_stack.register_postfork_function(self.application_stack.start)

        # NOTE(review): presumably releases pooled database connections before
        # the application stack forks -- confirm against the model layer.
        self.model.engine.dispose()
        self.server_starttime = int(time.time())  # used for cachebusting

        # When running the application without a web stack, this signals the
        # application loop to break and call the shutdown method
        self.exit = False

        # Let the logger do the formatting instead of building the string eagerly.
        log.info("Galaxy app startup finished %s", self.startup_timer)

    def shutdown(self):
        """Shut down the components created in ``__init__`` and set ``self.exit``.

        The calls before the two ``try`` blocks are not guarded, so an
        exception from any of them propagates and skips the remaining steps.
        Removing the persisted datatypes file is best-effort.
        """
        self.workflow_scheduling_manager.shutdown()
        self.job_manager.shutdown()
        self.object_store.shutdown()
        if self.heartbeat:
            self.heartbeat.shutdown()
        self.update_repository_manager.shutdown()
        try:
            self.control_worker.shutdown()
        except AttributeError:
            # There is no control_worker
            pass
        try:
            # If the datatypes registry was persisted, attempt to
            # remove the temporary file in which it was written.
            if self.datatypes_registry.integrated_datatypes_configs is not None:
                os.unlink(self.datatypes_registry.integrated_datatypes_configs)
        except Exception:
            # Still best-effort, but no longer swallows KeyboardInterrupt or
            # SystemExit, and the failure is recorded instead of vanishing.
            log.debug("Unable to remove persisted datatypes registry file", exc_info=True)
        self.application_stack.shutdown()
        # This is used to signal the webless application loop to terminate
        self.exit = True

    def configure_fluent_log(self):
        """Set ``self.trace_logger`` from the fluent logging configuration.

        A ``FluentTraceLogger`` is created when ``self.config.fluent_log`` is
        truthy; otherwise ``self.trace_logger`` is ``None``.
        """
        if self.config.fluent_log:
            # Imported lazily, only when fluent logging is enabled.
            from galaxy.util.log.fluent_log import FluentTraceLogger
            self.trace_logger = FluentTraceLogger('galaxy', self.config.fluent_host, self.config.fluent_port)
        else:
            self.trace_logger = None

    def is_job_handler(self):
        """Return a truthy value if this process should handle jobs.

        That is the case when jobs are not tracked in the database, or when
        they are and ``self.job_config.is_handler`` accepts this server name.
        """
        return (
            (self.config.track_jobs_in_database and self.job_config.is_handler(self.config.server_name)) or
            not self.config.track_jobs_in_database
        )