Commit 171c285

Make sure we're not changing datatypes on skipped outputs
mvdbeek committed Oct 26, 2022
1 parent 5f2cbee commit 171c285
Showing 4 changed files with 73 additions and 61 deletions.
3 changes: 3 additions & 0 deletions lib/galaxy/job_execution/actions/post.py

@@ -109,6 +109,9 @@ class ChangeDatatypeAction(DefaultJobAction):
 
     @classmethod
     def execute(cls, app, sa_session, action, job, replacement_dict, final_job_state=None):
+        if job.state == job.states.SKIPPED:
+            # Don't change datatype, must remain expression.json
+            return
         for dataset_assoc in job.output_datasets:
             if action.output_name == "" or dataset_assoc.name == action.output_name:
                 app.datatypes_registry.change_datatype(dataset_assoc.dataset, action.action_arguments["newtype"])
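
The guard returns before any output is touched, so a skipped job's outputs are never re-typed. A minimal, self-contained sketch of the same control flow, using hypothetical stand-ins (JobStates, StubJob, a plain dict registry) rather than Galaxy's real job model and datatypes registry:

    from enum import Enum

    class JobStates(str, Enum):  # stand-in for galaxy.model.Job.states
        OK = "ok"
        SKIPPED = "skipped"

    class StubJob:  # hypothetical stub, not Galaxy's Job model
        states = JobStates

        def __init__(self, state, output_names):
            self.state = state
            self.output_names = output_names

    def change_datatype_action(job, newtype, registry):
        # Same shape as ChangeDatatypeAction.execute: bail out first for skipped jobs
        if job.state == job.states.SKIPPED:
            return  # output must stay expression.json
        for name in job.output_names:
            registry[name] = newtype

    registry = {}
    change_datatype_action(StubJob(JobStates.SKIPPED, ["out_file1"]), "txt", registry)
    assert registry == {}  # skipped job: datatype untouched
    change_datatype_action(StubJob(JobStates.OK, ["out_file1"]), "txt", registry)
    assert registry == {"out_file1": "txt"}  # normal job: output re-typed as before
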
1 change: 1 addition & 0 deletions lib/galaxy/model/__init__.py

@@ -1062,6 +1062,7 @@ class states(str, Enum):
         DELETED_NEW = "deleted_new"  # now DELETING, remove after 21.0
         STOPPING = "stop"
         STOPPED = "stopped"
+        SKIPPED = "skipped"
 
     terminal_states = [states.OK, states.ERROR, states.DELETED]
     #: job states where the job hasn't finished and the model may still change
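
Because `states` mixes `str` into `Enum`, the new member compares equal to its raw string value, so code and API payloads that store or match the literal "skipped" (as the test further down does) keep working unchanged. A quick standalone sketch of that property, assuming only the standard library:

    from enum import Enum

    class states(str, Enum):  # same str-Enum pattern as galaxy.model.Job.states
        OK = "ok"
        SKIPPED = "skipped"

    assert states.SKIPPED == "skipped"          # str mixin: equal to the raw value
    assert "skipped" == states.SKIPPED          # and symmetrically
    assert states("skipped") is states.SKIPPED  # round-trips from a stored value
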
117 changes: 57 additions & 60 deletions lib/galaxy/tools/actions/__init__.py

@@ -640,76 +640,73 @@ def handle_output(name, output, hidden=None):
                 parent_dataset.children.append(child_dataset)
 
         log.info(add_datasets_timer)
-        job = trans.app.model.Job()
-        job.state = "ok"
+        job_setup_timer = ExecutionTimer()
+        # Create the job object
+        job, galaxy_session = self._new_job_for_session(trans, tool, history)
         if skip:
-            job.state = "skipped"
+            job.state = job.states.SKIPPED
             object_store_populator = ObjectStorePopulator(trans.app, trans.user)
             for data in out_data.values():
                 object_store_populator.set_object_store_id(data)
                 data.extension = "expression.json"
                 data.state = "ok"
                 with open(data.dataset.file_name, "w") as out:
                     out.write(json.dumps(None))
-        else:
-            job_setup_timer = ExecutionTimer()
-            # Create the job object
-            job, galaxy_session = self._new_job_for_session(trans, tool, history)
-            self._record_inputs(trans, tool, job, incoming, inp_data, inp_dataset_collections)
-            self._record_outputs(job, out_data, output_collections)
-            # execute immediate post job actions and associate post job actions that are to be executed after the job is complete
-            if job_callback:
-                job_callback(job)
-            if job_params:
-                job.params = dumps(job_params)
-            if completed_job:
-                job.set_copied_from_job_id(completed_job.id)
-            trans.sa_session.add(job)
-            # Remap any outputs if this is a rerun and the user chose to continue dependent jobs
-            # This functionality requires tracking jobs in the database.
-            if app.config.track_jobs_in_database and rerun_remap_job_id is not None:
-                # Need to flush here so that referencing outputs by id works
-                session = trans.sa_session()
-                try:
-                    session.expire_on_commit = False
-                    session.flush()
-                finally:
-                    session.expire_on_commit = True
-                self._remap_job_on_rerun(
-                    trans=trans,
-                    galaxy_session=galaxy_session,
-                    rerun_remap_job_id=rerun_remap_job_id,
-                    current_job=job,
-                    out_data=out_data,
-                )
-            log.info(f"Setup for job {job.log_str()} complete, ready to be enqueued {job_setup_timer}")
-
-            # Some tools are not really executable, but jobs are still created for them ( for record keeping ).
-            # Examples include tools that redirect to other applications ( epigraph ). These special tools must
-            # include something that can be retrieved from the params ( e.g., REDIRECT_URL ) to keep the job
-            # from being queued.
-            if "REDIRECT_URL" in incoming:
-                # Get the dataset - there should only be 1
-                for name in inp_data.keys():
-                    dataset = inp_data[name]
-                redirect_url = tool.parse_redirect_url(dataset, incoming)
-                # GALAXY_URL should be include in the tool params to enable the external application
-                # to send back to the current Galaxy instance
-                GALAXY_URL = incoming.get("GALAXY_URL", None)
-                assert GALAXY_URL is not None, "GALAXY_URL parameter missing in tool config."
-                redirect_url += f"&GALAXY_URL={GALAXY_URL}"
-                # Job should not be queued, so set state to ok
-                job.set_state(app.model.Job.states.OK)
-                job.info = f"Redirected to: {redirect_url}"
-                trans.sa_session.add(job)
-                trans.sa_session.flush()
-                trans.response.send_redirect(redirect_url)
-            else:
-                if flush_job:
-                    # Set HID and add to history.
-                    job_flush_timer = ExecutionTimer()
-                    trans.sa_session.flush()
-                    log.info(f"Flushed transaction for job {job.log_str()} {job_flush_timer}")
+        self._record_inputs(trans, tool, job, incoming, inp_data, inp_dataset_collections)
+        self._record_outputs(job, out_data, output_collections)
+        # execute immediate post job actions and associate post job actions that are to be executed after the job is complete
+        if job_callback:
+            job_callback(job)
+        if job_params:
+            job.params = dumps(job_params)
+        if completed_job:
+            job.set_copied_from_job_id(completed_job.id)
+        trans.sa_session.add(job)
+        # Remap any outputs if this is a rerun and the user chose to continue dependent jobs
+        # This functionality requires tracking jobs in the database.
+        if app.config.track_jobs_in_database and rerun_remap_job_id is not None:
+            # Need to flush here so that referencing outputs by id works
+            session = trans.sa_session()
+            try:
+                session.expire_on_commit = False
+                session.flush()
+            finally:
+                session.expire_on_commit = True
+            self._remap_job_on_rerun(
+                trans=trans,
+                galaxy_session=galaxy_session,
+                rerun_remap_job_id=rerun_remap_job_id,
+                current_job=job,
+                out_data=out_data,
+            )
+        log.info(f"Setup for job {job.log_str()} complete, ready to be enqueued {job_setup_timer}")
+
+        # Some tools are not really executable, but jobs are still created for them ( for record keeping ).
+        # Examples include tools that redirect to other applications ( epigraph ). These special tools must
+        # include something that can be retrieved from the params ( e.g., REDIRECT_URL ) to keep the job
+        # from being queued.
+        if "REDIRECT_URL" in incoming:
+            # Get the dataset - there should only be 1
+            for name in inp_data.keys():
+                dataset = inp_data[name]
+            redirect_url = tool.parse_redirect_url(dataset, incoming)
+            # GALAXY_URL should be include in the tool params to enable the external application
+            # to send back to the current Galaxy instance
+            GALAXY_URL = incoming.get("GALAXY_URL", None)
+            assert GALAXY_URL is not None, "GALAXY_URL parameter missing in tool config."
+            redirect_url += f"&GALAXY_URL={GALAXY_URL}"
+            # Job should not be queued, so set state to ok
+            job.set_state(app.model.Job.states.OK)
+            job.info = f"Redirected to: {redirect_url}"
+            trans.sa_session.add(job)
+            trans.sa_session.flush()
+            trans.response.send_redirect(redirect_url)
+        else:
+            if flush_job:
+                # Set HID and add to history.
+                job_flush_timer = ExecutionTimer()
+                trans.sa_session.flush()
+                log.info(f"Flushed transaction for job {job.log_str()} {job_flush_timer}")
 
         return job, out_data, history
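
With the job now created up front via `_new_job_for_session` for both paths, the skip branch only flips the state and writes a JSON null output; recording inputs and outputs, post-job actions, and the rerun-remap logic run identically for skipped and executed jobs. A rough sketch of what a skipped output holds on disk — the path here is hypothetical, Galaxy resolves the real one through the object store:

    import json

    path = "/tmp/skipped_output.dat"  # hypothetical path; assigned via ObjectStorePopulator in Galaxy
    with open(path, "w") as out:
        out.write(json.dumps(None))   # the file literally contains the four characters: null

    with open(path) as f:
        assert json.load(f) is None   # downstream consumers see a JSON null expression
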
13 changes: 12 additions & 1 deletion lib/galaxy_test/api/test_workflows.py

@@ -1945,6 +1945,9 @@ def test_run_workflow_conditional_step_map_over_expression_tool(self):
     tool_id: cat1
     in:
       input1: boolean_input_files
+    out:
+      out_file1:
+        change_datatype: txt
     when:
       source: param_out/boolean_param
 test_data:
@@ -1961,7 +1964,15 @@
         invocation_details = self.workflow_populator.get_invocation(summary.invocation_id, step_details=True)
         for step in invocation_details["steps"]:
             if step["workflow_step_label"] == "consume_expression_parameter":
-                assert sum(1 for j in step["jobs"] if j["state"] == "skipped") == 1
+                skipped_jobs = [j for j in step["jobs"] if j["state"] == "skipped"]
+                assert len(skipped_jobs) == 1
+                # also assert that change_datatype was ignored for null output
+                job_details = self.dataset_populator.get_job_details(skipped_jobs[0]["id"], full=True).json()
+                skipped_hda_id = job_details["outputs"]["out_file1"]["id"]
+                dataset_details = self.dataset_populator.get_history_dataset_details(
+                    history_id=history_id, content_id=skipped_hda_id
+                )
+                assert dataset_details["file_ext"] == "expression.json", dataset_details
 
     def test_run_workflow_conditional_step_map_over_expression_tool_pick_value(self):
         with self.dataset_populator.test_history() as history_id:
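
The strengthened assertion no longer just counts skipped jobs; it follows the one skipped job to its output dataset and checks the extension. A condensed sketch of that verification over plain dicts shaped like the API payloads above (the payload values here are hypothetical):

    # Hypothetical payloads mirroring the invocation-step and dataset responses used above.
    step = {
        "workflow_step_label": "consume_expression_parameter",
        "jobs": [{"id": "job1", "state": "skipped"}, {"id": "job2", "state": "ok"}],
    }
    job_outputs = {"job1": {"out_file1": {"id": "hda1"}}}   # per-job output map
    datasets = {"hda1": {"file_ext": "expression.json"}}    # dataset details by id

    skipped_jobs = [j for j in step["jobs"] if j["state"] == "skipped"]
    assert len(skipped_jobs) == 1
    hda_id = job_outputs[skipped_jobs[0]["id"]]["out_file1"]["id"]
    assert datasets[hda_id]["file_ext"] == "expression.json"  # change_datatype was ignored
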
