Formalize workflow invocation and invocation step outputs.

Workflow Invocations
--------------------

The workflow invocation outputs half of this is relatively straightforward. It is modelled somewhat on job outputs: output datasets and output dataset collections are now tracked for each workflow invocation and exposed via the workflow invocation API. This required adding new tables (linked to WorkflowInvocations and WorkflowOutputs) that track these output associations.

Previously one could imagine backtracking this information for simple tool steps via the WorkflowInvocationStep -> Job table, but for steps that have many jobs (e.g. mapping over a collection) or for non-tool steps this information was much harder to recover (and simply couldn't be recovered from the API at all, or even internally without significant knowledge of the underlying workflow).
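
As a rough sketch of how the new tables get populated, ``add_output`` (the helper added to ``WorkflowInvocation`` in this commit) is called as scheduling produces outputs; here ``invocation``, ``workflow_output``, ``step``, and ``hda`` are assumed to be already-loaded model objects::

    # Hedged sketch: attach an invocation-level output. `invocation` is a
    # WorkflowInvocation, `workflow_output` a WorkflowOutput, `step` a
    # WorkflowStep, and `hda` a HistoryDatasetAssociation.
    invocation.add_output(workflow_output, step, hda)
    # Internally this creates a WorkflowInvocationOutputDatasetAssociation
    # (or a WorkflowInvocationOutputDatasetCollectionAssociation for an
    # HDCA) linking the invocation, step, WorkflowOutput, and dataset.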

Workflow Invocation Steps
-------------------------

Tracking the outputs of WorkflowInvocationSteps was not previously done at all; one would have to follow the Job table as well. A significant downside to this is that one cannot map over empty collections in a workflow, since no such job would exist. Tracking job outputs for WorkflowInvocationSteps is not a simple matter of just attaching outputs to an existing table, because we had no single tracked record per workflow step: there could be many WorkflowInvocationSteps corresponding to the same combination of WorkflowInvocation and WorkflowStep. That should feel wrong, and that is because it is. When collections were added, the possibility of having many jobs for the same combination of WorkflowInvocation and WorkflowStep was introduced. I should have split WorkflowInvocationSteps into WorkflowInvocationSteps and WorkflowInvocationStepJobAssociations at that time but didn't. This commit now does it, effectively normalizing the ``workflow_invocation_step`` table by introducing the new ``workflow_invocation_step_job_association`` table.

Splitting up the WorkflowInvocationStep table this way allows recovering the mapped-over output (e.g. the implicitly created collection from all the jobs) as well as the outputs from the individual jobs (by walking WorkflowInvocationStep -> WorkflowInvocationStepJobAssociation -> Job -> JobToOutput*Association).
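
A minimal sketch of that walk, assuming an ``invocation_step`` already loaded from the session and the relations established below (the ``jobs`` backref and ``job`` relation)::

    # Hedged sketch: recover per-job outputs for a (possibly mapped-over)
    # invocation step via the new association table.
    for job_assoc in invocation_step.jobs:  # WorkflowInvocationStepJobAssociation
        job = job_assoc.job
        for job_output in job.output_datasets:  # JobToOutputDatasetAssociation
            print(job_output.name, job_output.dataset.id)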

This split involves fairly substantial changes to the workflow module interface. Any place a list of WorkflowInvocationSteps was assumed, I reworked it to expect a single WorkflowInvocationStep. I vastly simplified recover_mapping to just use the persisted outputs (this was needed in order to also implement empty collection mapping in workflows). This also fixes a bug (or implements a missing feature) where subworkflow modules had no recover_mapping method, so for instance if a tool that produces dynamic collections appeared anywhere in a workflow after a subworkflow step, that workflow would not complete scheduling properly.

Now that we have a way to reference the set of jobs corresponding to a workflow step within an invocation, we can start to track partial scheduling of such steps. This is outlined in #3883, and refactoring toward this goal is included here: adding a state to WorkflowInvocationStep so Galaxy can determine whether it has started scheduling the step, recording an index when scheduling jobs so it can tell how far scheduling has progressed, and augmenting the tool executor to take a maximum number of jobs to execute and to allow recovery of existing jobs for collection building purposes.
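
Roughly, the resumable scheduling this is building toward might look as follows; ``execute_step_jobs``, ``all_step_jobs_scheduled``, and ``max_jobs_per_iteration`` are hypothetical names for illustration, while the states and the ``order_index`` column do come from this commit::

    # Hedged sketch of partial, resumable scheduling of one step.
    if invocation_step.state == WorkflowInvocationStep.states.NEW:
        invocation_step.state = WorkflowInvocationStep.states.READY
    # order_index on each WorkflowInvocationStepJobAssociation records how far
    # a previous scheduling iteration got, so count existing associations.
    already_scheduled = len(invocation_step.jobs)
    # Hypothetical executor hook honoring a maximum number of jobs.
    execute_step_jobs(invocation_step, start=already_scheduled,
                      limit=max_jobs_per_iteration)
    if all_step_jobs_scheduled(invocation_step):  # hypothetical check
        invocation_step.state = WorkflowInvocationStep.states.SCHEDULED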

*Applications*

These changes will enable:

- A simple, consistent API for finding workflow outputs that can be consumed by Planemo for testing workflows (see the sketch after this list).
- Mapping over empty collections in workflows.
- Re-scheduling workflow invocations that include subworkflow steps.
- Partial scheduling within steps requiring a large number of jobs when scheduling workflow invocations.
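
For example, a test harness could locate a labeled output roughly like this; the endpoint path and identifiers are assumptions for illustration, while the ``outputs``/``output_collections`` keys follow the ``to_dict`` changes below::

    import requests

    galaxy_url = "https://usegalaxy.example"  # hypothetical server
    workflow_id, invocation_id = "<workflow id>", "<invocation id>"
    invocation = requests.get(
        "%s/api/workflows/%s/invocations/%s" % (galaxy_url, workflow_id, invocation_id),
        params={"key": "<api key>"},
    ).json()
    # New label-keyed dictionaries added by this commit:
    hda_ref = invocation["outputs"]["my_output_label"]  # {'src': 'hda', 'id': ...}
    hdca_refs = invocation["output_collections"]  # label -> {'src': 'hdca', 'id': ...}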
jmchilton committed Nov 30, 2017
1 parent 55aa7bc commit d5f98f4
Showing 10 changed files with 739 additions and 157 deletions.
12 changes: 6 additions & 6 deletions lib/galaxy/jobs/actions/post.py
@@ -269,10 +269,10 @@ def execute(cls, app, sa_session, action, job, replacement_dict):
# concurrently, sometimes non-terminal steps won't be cleaned up
# because of the lag in job state updates.
sa_session.flush()
- if not job.workflow_invocation_step:
+ if not job.workflow_invocation_step_assoc.workflow_invocation_step:
log.debug("This job is not part of a workflow invocation, delete intermediates aborted.")
return
- wfi = job.workflow_invocation_step.workflow_invocation
+ wfi = job.workflow_invocation_step_assoc.workflow_invocation_step.workflow_invocation
sa_session.refresh(wfi)
if wfi.active:
log.debug("Workflow still scheduling so new jobs may appear, skipping deletion of intermediate files.")
@@ -285,9 +285,9 @@ def execute(cls, app, sa_session, action, job, replacement_dict):
jobs_to_check = []
for wfi_step in wfi_steps:
sa_session.refresh(wfi_step)
- wfi_step_job = wfi_step.job
- if wfi_step_job:
- jobs_to_check.append(wfi_step_job)
+ wfi_step_job_assocs = wfi_step.jobs
+ if wfi_step_job_assocs:
+ jobs_to_check.extend(map(lambda j: j.job, wfi_step_job_assocs))
else:
log.debug("No job found yet for wfi_step %s, (step %s)" % (wfi_step, wfi_step.workflow_step))
for j2c in jobs_to_check:
@@ -302,7 +302,7 @@ def execute(cls, app, sa_session, action, job, replacement_dict):
for (input_dataset, creating_job) in creating_jobs:
sa_session.refresh(creating_job)
sa_session.refresh(input_dataset)
- for input_dataset in [x.dataset for (x, creating_job) in creating_jobs if creating_job.workflow_invocation_step and creating_job.workflow_invocation_step.workflow_invocation == wfi]:
+ for input_dataset in [x.dataset for (x, creating_job) in creating_jobs if creating_job.workflow_invocation_step_assoc and creating_job.workflow_invocation_step_assoc.workflow_invocation_step.workflow_invocation == wfi]:
# note that the above input_dataset is a reference to a
# job.input_dataset.dataset at this point
safe_to_delete = True
172 changes: 135 additions & 37 deletions lib/galaxy/model/__init__.py
@@ -803,8 +803,9 @@ def to_dict(self, view='collection', system_details=False):

def set_final_state(self, final_state):
self.set_state(final_state)
- if self.workflow_invocation_step:
- self.workflow_invocation_step.update()
+ workflow_invocation_step_assoc = self.workflow_invocation_step_assoc
+ if workflow_invocation_step_assoc:
+ workflow_invocation_step_assoc.workflow_invocation_step.update()

def get_destination_configuration(self, config, key, default=None):
""" Get a destination parameter that can be defaulted back
@@ -4036,17 +4037,16 @@ def step_invocations_by_step_id(self):
step_invocations = {}
for invocation_step in self.steps:
step_id = invocation_step.workflow_step_id
- if step_id not in step_invocations:
- step_invocations[step_id] = []
- step_invocations[step_id].append(invocation_step)
+ assert step_id not in step_invocations
+ step_invocations[step_id] = invocation_step
return step_invocations

- def step_invocations_for_step_id(self, step_id):
- step_invocations = []
+ def step_invocation_for_step_id(self, step_id):
+ target_invocation_step = None
for invocation_step in self.steps:
if step_id == invocation_step.workflow_step_id:
- step_invocations.append(invocation_step)
- return step_invocations
+ target_invocation_step = invocation_step
+ return target_invocation_step

@staticmethod
def poll_active_workflow_ids(
@@ -4072,6 +4072,24 @@ def poll_active_workflow_ids(
# is relatively intuitive.
return [wid for wid in query.all()]

def add_output(self, workflow_output, step, output_object):
if output_object.history_content_type == "dataset":
output_assoc = WorkflowInvocationOutputDatasetAssociation()
output_assoc.workflow_invocation = self
output_assoc.workflow_output = workflow_output
output_assoc.workflow_step = step
output_assoc.dataset = output_object
self.output_datasets.append(output_assoc)
elif output_object.history_content_type == "dataset_collection":
output_assoc = WorkflowInvocationOutputDatasetCollectionAssociation()
output_assoc.workflow_invocation = self
output_assoc.workflow_output = workflow_output
output_assoc.workflow_step = step
output_assoc.dataset_collection = output_object
self.output_dataset_collections.append(output_assoc)
else:
raise Exception("Uknown output type encountered")

def to_dict(self, view='collection', value_mapper=None, step_details=False):
rval = super(WorkflowInvocation, self).to_dict(view=view, value_mapper=value_mapper)
if view == 'element':
@@ -4087,17 +4105,43 @@ def to_dict(self, view='collection', value_mapper=None, step_details=False):
inputs = {}
for step in self.steps:
if step.workflow_step.type == 'tool':
- for step_input in step.workflow_step.input_connections:
- output_step_type = step_input.output_step.type
- if output_step_type in ['data_input', 'data_collection_input']:
- src = "hda" if output_step_type == 'data_input' else 'hdca'
- for job_input in step.job.input_datasets:
- if job_input.name == step_input.input_name:
- inputs[str(step_input.output_step.order_index)] = {
- "id": job_input.dataset_id, "src": src,
- "uuid" : str(job_input.dataset.dataset.uuid) if job_input.dataset.dataset.uuid is not None else None
- }
+ for step_job_assoc in step.jobs:
+ for step_input in step.workflow_step.input_connections:
+ output_step_type = step_input.output_step.type
+ if output_step_type in ['data_input', 'data_collection_input']:
+ src = "hda" if output_step_type == 'data_input' else 'hdca'
+ for job_input in step_job_assoc.job.input_datasets:
+ if job_input.name == step_input.input_name:
+ inputs[str(step_input.output_step.order_index)] = {
+ "id": job_input.dataset_id, "src": src,
+ "uuid" : str(job_input.dataset.dataset.uuid) if job_input.dataset.dataset.uuid is not None else None
+ }
rval['inputs'] = inputs

outputs = {}
for output_assoc in self.output_datasets:
label = output_assoc.workflow_output.label
if not label:
continue

outputs[label] = {
'src': 'hda',
'id': output_assoc.dataset_id,
}

output_collections = {}
for output_assoc in self.output_dataset_collections:
label = output_assoc.workflow_output.label
if not label:
continue

output_collections[label] = {
'src': 'hdca',
'id': output_assoc.dataset_collection_id,
}

rval['outputs'] = outputs
rval['output_collections'] = output_collections
return rval

def update(self):
@@ -4137,36 +4181,70 @@ class WorkflowInvocationToSubworkflowInvocationAssociation(object, Dictifiable):


class WorkflowInvocationStep(object, Dictifiable):
- dict_collection_visible_keys = ['id', 'update_time', 'job_id', 'workflow_step_id', 'action']
- dict_element_visible_keys = ['id', 'update_time', 'job_id', 'workflow_step_id', 'action']
+ dict_collection_visible_keys = ['id', 'update_time', 'job_id', 'workflow_step_id', 'state', 'action']
+ dict_element_visible_keys = ['id', 'update_time', 'job_id', 'workflow_step_id', 'state', 'action']
+ states = Bunch(
+ NEW='new',  # Brand new workflow invocation step
+ READY='ready',  # Workflow invocation step ready for another iteration of scheduling.
+ SCHEDULED='scheduled',  # Workflow invocation step has been scheduled.
+ # CANCELLED='cancelled', TODO: implement and expose
+ # FAILED='failed', TODO: implement and expose
+ )

def update(self):
self.workflow_invocation.update()

def add_output(self, output_name, output_object):
if output_object.history_content_type == "dataset":
output_assoc = WorkflowInvocationStepOutputDatasetAssociation()
output_assoc.workflow_invocation_step = self
output_assoc.dataset = output_object
output_assoc.output_name = output_name
self.output_datasets.append(output_assoc)
elif output_object.history_content_type == "dataset_collection":
output_assoc = WorkflowInvocationStepOutputDatasetCollectionAssociation()
output_assoc.workflow_invocation_step = self
output_assoc.dataset_collection = output_object
output_assoc.output_name = output_name
self.output_dataset_collections.append(output_assoc)
else:
raise Exception("Uknown output type encountered")

def to_dict(self, view='collection', value_mapper=None):
rval = super(WorkflowInvocationStep, self).to_dict(view=view, value_mapper=value_mapper)
rval['order_index'] = self.workflow_step.order_index
rval['workflow_step_label'] = self.workflow_step.label
rval['workflow_step_uuid'] = str(self.workflow_step.uuid)
- rval['state'] = self.job.state if self.job is not None else None
- if self.job is not None and view == 'element':
- output_dict = {}
- for i in self.job.output_datasets:
- if i.dataset is not None:
- output_dict[i.name] = {
- "id" : i.dataset.id, "src" : "hda",
- "uuid" : str(i.dataset.dataset.uuid) if i.dataset.dataset.uuid is not None else None
- }
- for i in self.job.output_library_datasets:
- if i.dataset is not None:
- output_dict[i.name] = {
- "id" : i.dataset.id, "src" : "ldda",
- "uuid" : str(i.dataset.dataset.uuid) if i.dataset.dataset.uuid is not None else None
- }
- rval['outputs'] = output_dict
+ # Following no longer makes sense...
+ # rval['state'] = self.job.state if self.job is not None else None
+ if view == 'element':
+ outputs = {}
+ for output_assoc in self.output_datasets:
+ name = output_assoc.output_name
+ outputs[name] = {
+ 'src': 'hda',
+ 'id': output_assoc.dataset.id,
+ 'uuid': str(output_assoc.dataset.dataset.uuid) if output_assoc.dataset.dataset.uuid is not None else None
+ }
+
+ output_collections = {}
+ for output_assoc in self.output_dataset_collections:
+ name = output_assoc.output_name
+ output_collections[name] = {
+ 'src': 'hdca',
+ 'id': output_assoc.dataset_collection.id,
+ }
+
+ rval['outputs'] = outputs
+ rval['output_collections'] = output_collections
return rval


class WorkflowInvocationStepJobAssociation(object, Dictifiable):
dict_collection_visible_keys = ('id', 'job_id', 'workflow_invocation_step_id')
dict_element_visible_keys = ('id', 'job_id', 'workflow_invocation_step_id')


class WorkflowRequest(object, Dictifiable):
dict_collection_visible_keys = ['id', 'name', 'type', 'state', 'history_id', 'workflow_id']
dict_element_visible_keys = ['id', 'name', 'type', 'state', 'history_id', 'workflow_id']
Expand Down Expand Up @@ -4221,6 +4299,26 @@ class WorkflowRequestInputStepParmeter(object, Dictifiable):
dict_collection_visible_keys = ['id', 'workflow_invocation_id', 'workflow_step_id', 'parameter_value']


class WorkflowInvocationOutputDatasetAssociation(object, Dictifiable):
"""Represents links to output datasets for the workflow."""
dict_collection_visible_keys = ['id', 'workflow_invocation_id', 'workflow_step_id', 'dataset_id', 'name']


class WorkflowInvocationOutputDatasetCollectionAssociation(object, Dictifiable):
"""Represents links to output dataset collections for the workflow."""
dict_collection_visible_keys = ['id', 'workflow_invocation_id', 'workflow_step_id', 'dataset_collection_id', 'name']


class WorkflowInvocationStepOutputDatasetAssociation(object, Dictifiable):
"""Represents links to output datasets for the workflow."""
dict_collection_visible_keys = ['id', 'workflow_invocation_step_id', 'dataset_id', 'output_name']


class WorkflowInvocationStepOutputDatasetCollectionAssociation(object, Dictifiable):
"""Represents links to output dataset collections for the workflow."""
dict_collection_visible_keys = ['id', 'workflow_invocation_step_id', 'dataset_collection_id', 'output_name']


class MetadataFile(StorableObject):

def __init__(self, dataset=None, name=None):
Expand Down
97 changes: 90 additions & 7 deletions lib/galaxy/model/mapping.py
@@ -901,9 +901,54 @@
Column("update_time", DateTime, default=now, onupdate=now),
Column("workflow_invocation_id", Integer, ForeignKey("workflow_invocation.id"), index=True, nullable=False),
Column("workflow_step_id", Integer, ForeignKey("workflow_step.id"), index=True, nullable=False),
Column("job_id", Integer, ForeignKey("job.id"), index=True, nullable=True),
Column("state", TrimmedString(64), index=True),
Column("action", JSONType, nullable=True))


model.WorkflowInvocationStepJobAssociation.table = Table(
"workflow_invocation_step_job_association", metadata,
Column("id", Integer, primary_key=True),
Column("workflow_invocation_step_id", Integer, ForeignKey("workflow_invocation_step.id"), index=True, nullable=False),
Column("order_index", Integer, nullable=True), # recovering partially complete WorkflowInvocationSteps requires knowing which jobs have been scheduled
Column("job_id", Integer, ForeignKey("job.id"), index=True, nullable=False),
)


model.WorkflowInvocationOutputDatasetAssociation.table = Table(
"workflow_invocation_output_dataset_association", metadata,
Column("id", Integer, primary_key=True),
Column("workflow_invocation_id", Integer, ForeignKey("workflow_invocation.id"), index=True),
Column("workflow_step_id", Integer, ForeignKey("workflow_step.id"), index=True),
Column("dataset_id", Integer, ForeignKey("history_dataset_association.id"), index=True),
Column("workflow_output_id", Integer, ForeignKey("workflow_output.id"), index=True),
)

model.WorkflowInvocationOutputDatasetCollectionAssociation.table = Table(
"workflow_invocation_output_dataset_collection_association", metadata,
Column("id", Integer, primary_key=True),
Column("workflow_invocation_id", Integer, ForeignKey("workflow_invocation.id"), index=True),
Column("workflow_step_id", Integer, ForeignKey("workflow_step.id"), index=True),
Column("dataset_collection_id", Integer, ForeignKey("history_dataset_collection_association.id"), index=True),
Column("workflow_output_id", Integer, ForeignKey("workflow_output.id"), index=True),
)

model.WorkflowInvocationStepOutputDatasetAssociation.table = Table(
"workflow_invocation_step_output_dataset_association", metadata,
Column("id", Integer, primary_key=True),
Column("workflow_invocation_step_id", Integer, ForeignKey("workflow_invocation_step.id"), index=True),
Column("dataset_id", Integer, ForeignKey("history_dataset_association.id"), index=True),
Column("output_name", String(255), nullable=True),
)

model.WorkflowInvocationStepOutputDatasetCollectionAssociation.table = Table(
"workflow_invocation_step_output_dataset_collection_association", metadata,
Column("id", Integer, primary_key=True),
Column("workflow_invocation_step_id", Integer, ForeignKey("workflow_invocation_step.id"), index=True),
Column("workflow_step_id", Integer, ForeignKey("workflow_step.id"), index=True),
Column("dataset_collection_id", Integer, ForeignKey("history_dataset_collection_association.id"), index=True),
Column("output_name", String(255), nullable=True),
)

model.WorkflowInvocationToSubworkflowInvocationAssociation.table = Table(
"workflow_invocation_to_subworkflow_invocation_association", metadata,
Column("id", Integer, primary_key=True),
@@ -2310,7 +2355,7 @@ def simple_mapping(model, **kwds):
uselist=True,
),
steps=relation(model.WorkflowInvocationStep,
- backref='workflow_invocation'),
+ backref="workflow_invocation"),
workflow=relation(model.Workflow)
))

@@ -2323,12 +2368,17 @@ def simple_mapping(model, **kwds):
workflow_step=relation(model.WorkflowStep),
))

- mapper(model.WorkflowInvocationStep, model.WorkflowInvocationStep.table, properties=dict(
- workflow_step=relation(model.WorkflowStep),

+ simple_mapping(model.WorkflowInvocationStepJobAssociation,
+ workflow_invocation_step=relation(model.WorkflowInvocationStep, backref="jobs"),
job=relation(model.Job,
- backref=backref('workflow_invocation_step',
- uselist=False))
- ))
+ backref=backref('workflow_invocation_step_assoc',
+ uselist=False)))


+ simple_mapping(model.WorkflowInvocationStep,
+ workflow_step=relation(model.WorkflowStep))


simple_mapping(model.WorkflowRequestInputParameter,
workflow_invocation=relation(model.WorkflowInvocation))
@@ -2358,6 +2408,39 @@ def simple_mapping(model, **kwds):
library_dataset=relation(model.LibraryDatasetDatasetAssociation)
))


simple_mapping(
model.WorkflowInvocationOutputDatasetAssociation,
workflow_invocation=relation(model.WorkflowInvocation, backref="output_datasets"),
workflow_step=relation(model.WorkflowStep),
dataset=relation(model.HistoryDatasetAssociation),
workflow_output=relation(model.WorkflowOutput),
)


simple_mapping(
model.WorkflowInvocationOutputDatasetCollectionAssociation,
workflow_invocation=relation(model.WorkflowInvocation, backref="output_dataset_collections"),
workflow_step=relation(model.WorkflowStep),
dataset_collection=relation(model.HistoryDatasetCollectionAssociation),
workflow_output=relation(model.WorkflowOutput),
)


simple_mapping(
model.WorkflowInvocationStepOutputDatasetAssociation,
workflow_invocation_step=relation(model.WorkflowInvocationStep, backref="output_datasets"),
dataset=relation(model.HistoryDatasetAssociation),
)


simple_mapping(
model.WorkflowInvocationStepOutputDatasetCollectionAssociation,
workflow_invocation_step=relation(model.WorkflowInvocationStep, backref="output_dataset_collections"),
dataset_collection=relation(model.HistoryDatasetCollectionAssociation),
)


mapper(model.PageRevision, model.PageRevision.table)

mapper(model.Page, model.Page.table, properties=dict(
