Fix double enqueuing of upload jobs
The canonical place to call enqueue is in
https://github.com/galaxyproject/galaxy/blob/a5bb3e92667b96ef802b378f17788b253a5c0bc6/lib/galaxy/tools/execute.py#L127.
During enqueue, the db-skip-locked assignment method sets the handler
name to the default tag. If enqueue is called more than once, the job
might be picked up by more than one compatible handler.

Fixes #11335
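
In short, tool-action code stops calling enqueue itself; every call site that dispatches an action or an upload job directly now unpacks the job from the action's return value and enqueues it exactly once. A minimal caller-side sketch of that pattern, using only calls that appear in the diff below (tool, trans and params are assumed to come from the usual Galaxy controller context):

# Sketch, not verbatim Galaxy code: the action builds and flushes the Job,
# and the caller performs the one and only enqueue for it.
job, *_ = tool.tool_action.execute(tool, trans, incoming=params)
trans.app.job_manager.enqueue(job, tool=tool)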
mvdbeek committed Oct 22, 2021
1 parent 6fcdc5c commit 2a335c9
Showing 12 changed files with 21 additions and 40 deletions.
3 changes: 1 addition & 2 deletions lib/galaxy/actions/library.py
@@ -126,8 +126,7 @@ def _upload_dataset(self, trans, library_id, folder_id, replace_dataset=None, **
job_params['link_data_only'] = json.dumps(kwd.get('link_data_only', 'copy_files'))
job_params['uuid'] = json.dumps(kwd.get('uuid', None))
job, output = upload_common.create_job(trans, tool_params, tool, json_file_path, data_list, folder=library_bunch.folder, job_params=job_params)
trans.sa_session.add(job)
trans.sa_session.flush()
trans.app.job_manager.enqueue(job, tool=tool)
return output

def _get_server_dir_uploaded_datasets(self, trans, params, full_dir, import_dir_desc, library_bunch, response_code, message):
7 changes: 4 additions & 3 deletions lib/galaxy/datatypes/data.py
@@ -637,14 +637,15 @@ def convert_dataset(self, trans, original_dataset, target_type, return_output=Fa
params[input_name] = original_dataset

# Run converter, job is dispatched through Queue
converted_dataset = converter.execute(trans, incoming=params, set_output_hid=visible, history=history)[1]
job, converted_datasets, *_ = converter.execute(trans, incoming=params, set_output_hid=visible, history=history)
trans.app.job_manager.enqueue(job, tool=converter)
if len(params) > 0:
trans.log_event("Converter params: %s" % (str(params)), tool_id=converter.id)
if not visible:
for value in converted_dataset.values():
for value in converted_datasets.values():
value.visible = False
if return_output:
return converted_dataset
return converted_datasets
return f"The file conversion of {converter.name} on data {original_dataset.hid} has been added to the Queue."

# We need to clear associated files before we set metadata
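The conversion path above also shows the return-value handling change: converter.execute(...) used to be indexed with [1] to grab only the output datasets, while the new code unpacks the tuple so the job is kept as well, regardless of how many trailing items the tuple carries. A plain-Python illustration of that starred unpacking (stand-in values, not Galaxy objects):

# Stand-in tuple mimicking the shape of converter.execute(...)'s return value.
result = ("job", {"output1": "dataset"}, "extra", "items")
job, converted_datasets, *_ = result   # keep the first two items, ignore the rest
assert job == "job"
assert converted_datasets == {"output1": "dataset"}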
6 changes: 4 additions & 2 deletions lib/galaxy/managers/datasets.py
@@ -374,9 +374,10 @@ def set_metadata(self, trans, dataset_assoc, overwrite=False, validate=True):
if spec.get('default'):
setattr(data.metadata, name, spec.unwrap(spec.get('default')))

self.app.datatypes_registry.set_external_metadata_tool.tool_action.execute(
job, *_ = self.app.datatypes_registry.set_external_metadata_tool.tool_action.execute(
self.app.datatypes_registry.set_external_metadata_tool, trans, incoming={'input1': data, 'validate': validate},
overwrite=overwrite)
self.app.job_manager.enqueue(job, tool=self.app.datatypes_registry.set_external_metadata_tool)

def update_permissions(self, trans, dataset_assoc, **kwd):
action = kwd.get('action', 'set_permissions')
@@ -691,7 +692,8 @@ def deserialize_datatype(self, item, key, val, **context):
sa_session = self.app.model.context
sa_session.flush()
trans = context.get("trans")
self.app.datatypes_registry.set_external_metadata_tool.tool_action.execute(self.app.datatypes_registry.set_external_metadata_tool, trans, incoming={'input1': item}, overwrite=False) # overwrite is False as per existing behavior
job, *_ = self.app.datatypes_registry.set_external_metadata_tool.tool_action.execute(self.app.datatypes_registry.set_external_metadata_tool, trans, incoming={'input1': item}, overwrite=False) # overwrite is False as per existing behavior
trans.app.job_manager.enqueue(job, tool=trans.app.datatypes_registry.set_external_metadata_tool)
return item.datatype


3 changes: 2 additions & 1 deletion lib/galaxy/tools/__init__.py
@@ -2606,10 +2606,11 @@ class SetMetadataTool(Tool):

def regenerate_imported_metadata_if_needed(self, hda, history, job):
if len(hda.metadata_file_types) > 0:
self.tool_action.execute_via_app(
job, *_ = self.tool_action.execute_via_app(
self, self.app, job.session_id,
history.id, job.user, incoming={'input1': hda}, overwrite=False
)
self.app.job_manager.enqueue(job=job, tool=self)

def exec_after_process(self, app, inp_data, out_data, param_dict, job=None, **kwds):
working_directory = app.object_store.get_filename(
3 changes: 0 additions & 3 deletions lib/galaxy/tools/actions/__init__.py
@@ -600,9 +600,6 @@ def handle_output(name, output, hidden=None):
trans.sa_session.flush()
log.info(f"Flushed transaction for job {job.log_str()} {job_flush_timer}")

# Dispatch to a job handler. enqueue() is responsible for flushing the job
app.job_manager.enqueue(job, tool=tool)
trans.log_event("Added job to the job queue, id: %s" % str(job.id), tool_id=job.tool_id)
return job, out_data, history

def _remap_job_on_rerun(self, trans, galaxy_session, rerun_remap_job_id, current_job, out_data):
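With the enqueue removed here, DefaultToolAction.execute only builds, wires up and flushes the job and hands it back; the single remaining enqueue lives with the caller in lib/galaxy/tools/execute.py, the canonical spot named in the commit message. A rough sketch of that division of labour — the wrapping function name and signature below are illustrative assumptions, not taken from execute.py:

# Illustrative sketch only; execute_single_job and its signature are assumed.
def execute_single_job(trans, tool, params, history):
    # The tool action creates the Job, attaches inputs/outputs and flushes it,
    # but no longer enqueues it (see the lines removed above).
    job, out_data, history = tool.tool_action.execute(tool, trans, incoming=params, history=history)
    # The one canonical enqueue for the job; enqueue() is responsible for flushing it.
    trans.app.job_manager.enqueue(job, tool=tool)
    return job, out_data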
5 changes: 0 additions & 5 deletions lib/galaxy/tools/actions/history_imp_exp.py
@@ -67,11 +67,6 @@ def execute(self, tool, trans, incoming=None, set_output_hid=False, overwrite=Tr
job.add_parameter(name, value)

job.state = start_job_state # job inputs have been configured, restore initial job state

# Queue the job for execution
trans.app.job_manager.enqueue(job, tool=tool)
trans.log_event("Added import history job to the job queue, id: %s" % str(job.id), tool_id=job.tool_id)

return job, {}


5 changes: 0 additions & 5 deletions lib/galaxy/tools/actions/metadata.py
@@ -120,11 +120,6 @@ def execute_via_app(self, tool, app, session_id, history_id, user=None,
job.state = start_job_state # job inputs have been configured, restore initial job state
sa_session.flush()

# Queue the job for execution
app.job_manager.enqueue(job, tool=tool)
# FIXME: need to add event logging to app and log events there rather than trans.
# trans.log_event( "Added set external metadata job to the job queue, id: %s" % str(job.id), tool_id=job.tool_id )

# clear e.g. converted files
dataset.datatype.before_setting_metadata(dataset)

14 changes: 1 addition & 13 deletions lib/galaxy/tools/actions/upload_common.py
@@ -395,6 +395,7 @@ def create_job(trans, params, tool, json_file_path, outputs, folder=None, histor
Create the upload job.
"""
job = trans.app.model.Job()
trans.sa_session.add(job)
job.galaxy_version = trans.app.config.version_major
galaxy_session = trans.get_galaxy_session()
if type(galaxy_session) == trans.model.GalaxySession:
@@ -410,16 +411,10 @@ def create_job(trans, params, tool, json_file_path, outputs, folder=None, histor
job.tool_id = tool.id
job.tool_version = tool.version
job.dynamic_tool = tool.dynamic_tool
job.set_state(job.states.UPLOAD)
trans.sa_session.add(job)
trans.sa_session.flush()
log.info('tool %s created job id %d' % (tool.id, job.id))
trans.log_event('created job id %d' % job.id, tool_id=tool.id)

for name, value in tool.params_to_strings(params, trans.app).items():
job.add_parameter(name, value)
job.add_parameter('paramfile', dumps(json_file_path))
object_store_id = None
for i, output_object in enumerate(outputs):
output_name = "output%i" % i
if hasattr(output_object, "collection"):
@@ -432,18 +427,11 @@ def create_job(trans, params, tool, json_file_path, outputs, folder=None, histor
else:
job.add_output_dataset(output_name, dataset)

trans.sa_session.add(output_object)

job.object_store_id = object_store_id
job.set_state(job.states.NEW)
if job_params:
for name, value in job_params.items():
job.add_parameter(name, value)
trans.sa_session.add(job)

# Queue the job for execution
trans.app.job_manager.enqueue(job, tool=tool)
trans.log_event("Added job to the job queue, id: %s" % str(job.id), tool_id=job.tool_id)
output = {}
for i, v in enumerate(outputs):
if not hasattr(output_object, "collection_type"):
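After this change create_job only constructs the Job (adding it to the session up front) and returns it; the library upload call sites in lib/galaxy/actions/library.py and lib/galaxy/webapps/galaxy/api/library_datasets.py are the ones that enqueue. A sketch of that call site, using only calls visible in this diff (tool_params, json_file_path, data_list and folder are assumed to be prepared as in those controllers):

# Sketch of the upload call sites shown in this diff.
job, output = upload_common.create_job(
    trans, tool_params, tool, json_file_path, data_list,
    folder=folder, job_params=job_params)
# create_job no longer enqueues or flushes; the controller enqueues exactly once,
# and enqueue() takes care of flushing the job.
trans.app.job_manager.enqueue(job, tool=tool)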
6 changes: 4 additions & 2 deletions lib/galaxy/webapps/base/controller.py
@@ -436,7 +436,8 @@ def queue_history_export(self, trans, history, gzip=True, include_hidden=False,

# Run job to do export.
history_exp_tool = trans.app.toolbox.get_tool(export_tool_id)
job, _ = history_exp_tool.execute(trans, incoming=params, history=history, set_output_hid=True)
job, *_ = history_exp_tool.execute(trans, incoming=params, history=history, set_output_hid=True)
trans.app.job_manager.enqueue(job, tool=history_exp_tool)
return job


@@ -446,7 +447,8 @@ def queue_history_import(self, trans, archive_type, archive_source):
# Run job to do import.
history_imp_tool = trans.app.toolbox.get_tool('__IMPORT_HISTORY__')
incoming = {'__ARCHIVE_SOURCE__': archive_source, '__ARCHIVE_TYPE__': archive_type}
job, _ = history_imp_tool.execute(trans, incoming=incoming)
job, *_ = history_imp_tool.execute(trans, incoming=incoming)
trans.app.job_manager.enqueue(job, tool=history_imp_tool)
return job


3 changes: 1 addition & 2 deletions lib/galaxy/webapps/galaxy/api/library_datasets.py
@@ -519,8 +519,7 @@ def load(self, trans, payload=None, **kwd):
job_params['link_data_only'] = dumps(kwd.get('link_data_only', 'copy_files'))
job_params['uuid'] = dumps(kwd.get('uuid', None))
job, output = upload_common.create_job(trans, tool_params, tool, json_file_path, data_list, folder=folder, job_params=job_params)
trans.sa_session.add(job)
trans.sa_session.flush()
trans.app.job_manager.enqueue(job, tool=tool)
job_dict = job.to_dict()
job_dict['id'] = trans.security.encode_id(job_dict['id'])
return job_dict
3 changes: 2 additions & 1 deletion lib/galaxy/webapps/galaxy/controllers/async.py
@@ -89,7 +89,8 @@ def index(self, trans, tool_id=None, data_secret=None, **kwd):
raise Exception("Error: ToolOutput object not found")

original_history = trans.sa_session.query(trans.app.model.History).get(data.history_id)
tool.execute(trans, incoming=params, history=original_history)
job, *_ = tool.execute(trans, incoming=params, history=original_history)
trans.app.job_manager.enqueue(job, tool=tool)
else:
log.debug('async error -> %s' % STATUS)
trans.log_event('Async error -> %s' % STATUS)
3 changes: 2 additions & 1 deletion lib/galaxy/webapps/galaxy/controllers/dataset.py
@@ -377,9 +377,10 @@ def set_edit(self, trans, payload=None, **kwd):
datatype = sniff.guess_ext(path, trans.app.datatypes_registry.sniff_order, is_binary=is_binary)
trans.app.datatypes_registry.change_datatype(data, datatype)
trans.sa_session.flush()
trans.app.datatypes_registry.set_external_metadata_tool.tool_action.execute(
job, *_ = trans.app.datatypes_registry.set_external_metadata_tool.tool_action.execute(
trans.app.datatypes_registry.set_external_metadata_tool, trans, incoming={'input1': data},
overwrite=False) # overwrite is False as per existing behavior
trans.app.job_manager.enqueue(job, tool=trans.app.datatypes_registry.set_external_metadata_tool)
message = 'Detection was finished and changed the datatype to %s.' % datatype
else:
return self.message_exception(trans, 'Changing datatype "%s" is not allowed.' % (data.extension))
