Skip to content

Commit

Permalink
Merge pull request #6850 from jmchilton/workflows_track_step_inputs
Browse files Browse the repository at this point in the history
Track workflow step input definitions in our model.
  • Loading branch information
nsoranzo committed Nov 16, 2018
2 parents 7695569 + f2c17e5 commit f968545
Show file tree
Hide file tree
Showing 16 changed files with 463 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ functools32==3.2.3.post2 ; python_version == '2.7'
future==0.16.0
futures==3.2.0 ; python_version == '2.6' or python_version == '2.7'
galaxy-sequence-utils==1.1.3
gxformat2==0.7.1
gxformat2==0.8.0
h5py==2.8.0
idna==2.7
ipaddress==1.0.22 ; python_version < '3.3'
Expand Down
30 changes: 23 additions & 7 deletions lib/galaxy/managers/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,14 @@ def _workflow_to_dict_export(self, trans, stored=None, workflow=None):
for output in module.get_data_outputs():
step_dict['outputs'].append({'name': output['name'], 'type': output['extensions'][0]})

step_in = {}
for step_input in step.inputs:
if step_input.default_value_set:
step_in[step_input.name] = {"default": step_input.default_value}

if step_in:
step_dict["in"] = step_in

# Connections
input_connections = step.input_connections
if step.type is None or step.type == 'tool':
Expand Down Expand Up @@ -998,6 +1006,16 @@ def __module_from_dict(self, trans, steps, steps_by_external_id, step_dict, **kw
label=label,
)
trans.sa_session.add(m)

if "in" in step_dict:
for input_name, input_dict in step_dict["in"].items():
step_input = step.get_or_add_input(input_name)
NO_DEFAULT_DEFINED = object()
default = input_dict.get("default", NO_DEFAULT_DEFINED)
if default is not NO_DEFAULT_DEFINED:
step_input.default_value = default
step_input.default_value_set = True

return module, step

def __load_subworkflow_from_step_dict(self, trans, step_dict, subworkflow_id_map, **kwds):
Expand Down Expand Up @@ -1041,23 +1059,21 @@ def __connect_workflow_steps(self, steps, steps_by_external_id):
continue
if not isinstance(conn_list, list): # Older style singleton connection
conn_list = [conn_list]

for conn_dict in conn_list:
if 'output_name' not in conn_dict or 'id' not in conn_dict:
template = "Invalid connection [%s] - must be dict with output_name and id fields."
message = template % conn_dict
raise exceptions.MessageException(message)
conn = model.WorkflowStepConnection()
conn.input_step = step
conn.input_name = input_name
conn.output_name = conn_dict['output_name']
external_id = conn_dict['id']
if external_id not in steps_by_external_id:
raise KeyError("Failed to find external id %s in %s" % (external_id, steps_by_external_id.keys()))
conn.output_step = steps_by_external_id[external_id]
output_step = steps_by_external_id[external_id]

output_name = conn_dict["output_name"]
input_subworkflow_step_index = conn_dict.get('input_subworkflow_step_id', None)
if input_subworkflow_step_index is not None:
conn.input_subworkflow_step = step.subworkflow.step_by_index(input_subworkflow_step_index)

step.add_connection(input_name, output_name, output_step, input_subworkflow_step_index)

del step.temp_input_connections

Expand Down
80 changes: 73 additions & 7 deletions lib/galaxy/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4071,13 +4071,45 @@ def __init__(self):
self.tool_inputs = None
self.tool_errors = None
self.position = None
self.input_connections = []
self.inputs = []
self.config = None
self.label = None
self.uuid = uuid4()
self.workflow_outputs = []
self._input_connections_by_name = None

def get_input(self, input_name):
for step_input in self.inputs:
if step_input.name == input_name:
return step_input

return None

def get_or_add_input(self, input_name):
step_input = self.get_input(input_name)

if step_input is None:
step_input = WorkflowStepInput(self)
step_input.name = input_name
return step_input

def add_connection(self, input_name, output_name, output_step, input_subworkflow_step_index=None):
step_input = self.get_or_add_input(input_name)

conn = WorkflowStepConnection()
conn.input_step_input = step_input
conn.output_name = output_name
conn.output_step = output_step
if input_subworkflow_step_index is not None:
input_subworkflow_step = self.subworkflow.step_by_index(input_subworkflow_step_index)
conn.input_subworkflow_step = input_subworkflow_step
return conn

@property
def input_connections(self):
connections = [_ for step_input in self.inputs for _ in step_input.connections]
return connections

@property
def unique_workflow_outputs(self):
# Older Galaxy workflows may have multiple WorkflowOutputs
Expand Down Expand Up @@ -4151,7 +4183,7 @@ def copy_to(self, copied_step, step_mapping):
copied_step.position = self.position
copied_step.config = self.config
copied_step.label = self.label
copied_step.input_connections = copy_list(self.input_connections)
copied_step.inputs = copy_list(self.inputs, copied_step)

subworkflow_step_mapping = {}
subworkflow = self.subworkflow
Expand All @@ -4162,8 +4194,7 @@ def copy_to(self, copied_step, step_mapping):
subworkflow_step_mapping[subworkflow_step.id] = copied_subworkflow_step

for old_conn, new_conn in zip(self.input_connections, copied_step.input_connections):
# new_conn.input_step = new_
new_conn.input_step = step_mapping[old_conn.input_step_id]
new_conn.input_step_input = copied_step.get_or_add_input(old_conn.input_name)
new_conn.output_step = step_mapping[old_conn.output_step_id]
if old_conn.input_subworkflow_step_id:
new_conn.input_subworkflow_step = subworkflow_step_mapping[old_conn.input_subworkflow_step_id]
Expand All @@ -4178,6 +4209,30 @@ def log_str(self):
return "WorkflowStep[index=%d,type=%s]" % (self.order_index, self.type)


class WorkflowStepInput(RepresentById):
default_merge_type = None
default_scatter_type = None

def __init__(self, workflow_step):
self.workflow_step = workflow_step
self.name = None
self.default_value = None
self.default_value_set = False
self.merge_type = self.default_merge_type
self.scatter_type = self.default_scatter_type

def copy(self, copied_step):
copied_step_input = WorkflowStepInput(copied_step)
copied_step_input.name = self.name
copied_step_input.default_value = self.default_value
copied_step_input.default_value_set = self.default_value_set
copied_step_input.merge_type = self.merge_type
copied_step_input.scatter_type = self.scatter_type

copied_step_input.connections = copy_list(self.connections)
return copied_step_input


class WorkflowStepConnection(RepresentById):
# Constant used in lieu of output_name and input_name to indicate an
# implicit connection between two steps that is not dependent on a dataset
Expand All @@ -4189,18 +4244,29 @@ class WorkflowStepConnection(RepresentById):
def __init__(self):
self.output_step_id = None
self.output_name = None
self.input_step_id = None
self.input_name = None
self.input_step_input_id = None

@property
def non_data_connection(self):
return (self.output_name == self.input_name == WorkflowStepConnection.NON_DATA_CONNECTION)

@property
def input_name(self):
return self.input_step_input.name

@property
def input_step(self):
return self.input_step_input and self.input_step_input.workflow_step

@property
def input_step_id(self):
input_step = self.input_step
return input_step and input_step.id

def copy(self):
# TODO: handle subworkflow ids...
copied_connection = WorkflowStepConnection()
copied_connection.output_name = self.output_name
copied_connection.input_name = self.input_name
return copied_connection


Expand Down
33 changes: 28 additions & 5 deletions lib/galaxy/model/mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -908,6 +908,23 @@
# Column( "input_connections", JSONType ),
Column("label", Unicode(255)))


model.WorkflowStepInput.table = Table(
"workflow_step_input", metadata,
Column("id", Integer, primary_key=True),
Column("workflow_step_id", Integer, ForeignKey("workflow_step.id"), index=True),
Column("name", TEXT),
Column("merge_type", TEXT),
Column("scatter_type", TEXT),
Column("value_from", JSONType),
Column("value_from_type", TEXT),
Column("default_value", JSONType),
Column("default_value_set", Boolean, default=False),
Column("runtime_value", Boolean, default=False),
UniqueConstraint("workflow_step_id", "name"),
)


model.WorkflowRequestStepState.table = Table(
"workflow_request_step_states", metadata,
Column("id", Integer, primary_key=True),
Expand Down Expand Up @@ -953,9 +970,8 @@
"workflow_step_connection", metadata,
Column("id", Integer, primary_key=True),
Column("output_step_id", Integer, ForeignKey("workflow_step.id"), index=True),
Column("input_step_id", Integer, ForeignKey("workflow_step.id"), index=True),
Column("input_step_input_id", Integer, ForeignKey("workflow_step_input.id"), index=True),
Column("output_name", TEXT),
Column("input_name", TEXT),
Column("input_subworkflow_step_id", Integer, ForeignKey("workflow_step.id"), index=True),
)

Expand Down Expand Up @@ -2218,17 +2234,24 @@ def simple_mapping(model, **kwds):
backref="workflow_steps")
))

mapper(model.WorkflowStepInput, model.WorkflowStepInput.table, properties=dict(
workflow_step=relation(model.WorkflowStep,
backref=backref("inputs", uselist=True),
cascade="all",
primaryjoin=(model.WorkflowStepInput.table.c.workflow_step_id == model.WorkflowStep.table.c.id))
))

mapper(model.WorkflowOutput, model.WorkflowOutput.table, properties=dict(
workflow_step=relation(model.WorkflowStep,
backref='workflow_outputs',
primaryjoin=(model.WorkflowStep.table.c.id == model.WorkflowOutput.table.c.workflow_step_id))
))

mapper(model.WorkflowStepConnection, model.WorkflowStepConnection.table, properties=dict(
input_step=relation(model.WorkflowStep,
backref="input_connections",
input_step_input=relation(model.WorkflowStepInput,
backref="connections",
cascade="all",
primaryjoin=(model.WorkflowStepConnection.table.c.input_step_id == model.WorkflowStep.table.c.id)),
primaryjoin=(model.WorkflowStepConnection.table.c.input_step_input_id == model.WorkflowStepInput.table.c.id)),
input_subworkflow_step=relation(model.WorkflowStep,
backref=backref("parent_workflow_input_connections", uselist=True),
primaryjoin=(model.WorkflowStepConnection.table.c.input_subworkflow_step_id == model.WorkflowStep.table.c.id),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,6 @@ def upgrade(migrate_engine):
for table in tables.values():
__create(table)

def nextval(table, col='id'):
if migrate_engine.name in ['postgres', 'postgresql']:
return "nextval('%s_%s_seq')" % (table, col)
elif migrate_engine.name in ['mysql', 'sqlite']:
return "null"
else:
raise Exception("Unhandled database type")

# Set default for creation to scheduled, actual mapping has new as default.
workflow_invocation_step_state_column = Column("state", TrimmedString(64), default="scheduled")
if migrate_engine.name in ['postgres', 'postgresql']:
Expand Down
Loading

0 comments on commit f968545

Please sign in to comment.