Skip to content

Commit

Permalink
Copy workflow objects when importing them.
Browse files Browse the repository at this point in the history
If not anywhere workflow.stored_workflow is checked will be broken.
  • Loading branch information
jmchilton committed Jan 11, 2016
1 parent 24cb97c commit 66c9412
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 55 deletions.
73 changes: 72 additions & 1 deletion lib/galaxy/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3561,6 +3561,28 @@ def top_level_stored_workflow( self ):
"""
return self.top_level_workflow.stored_workflow

def copy(self):
""" Copy a workflow (without user information) for a new
StoredWorkflow object.
"""
copied_workflow = Workflow()
copied_workflow.name = self.name
copied_workflow.has_cycles = self.has_cycles
copied_workflow.has_errors = self.has_errors

# Map old step ids to new steps
step_mapping = {}
copied_steps = []
for step in self.steps:
copied_step = WorkflowStep()
copied_steps.append(copied_step)
step_mapping[step.id] = copied_step

for old_step, new_step in zip(self.steps, copied_steps):
old_step.copy_to(new_step, step_mapping)
copied_workflow.steps = copied_steps
return copied_workflow

def log_str(self):
extra = ""
if self.stored_workflow:
Expand All @@ -3579,8 +3601,9 @@ def __init__( self ):
self.position = None
self.input_connections = []
self.config = None
self.label = None
self.uuid = uuid4()
self.worklfow_outputs = []
self.workflow_outputs = []
self._input_connections_by_name = None

@property
Expand Down Expand Up @@ -3630,6 +3653,34 @@ def workflow_output_for(self, output_name):
break
return target_output

def copy_to(self, copied_step, step_mapping):
copied_step.order_index = self.order_index
copied_step.type = self.type
copied_step.tool_id = self.tool_id
copied_step.tool_inputs = self.tool_inputs
copied_step.tool_errors = self.tool_errors
copied_step.position = self.position
copied_step.config = self.config
copied_step.label = self.label
copied_step.input_connections = copy_list(self.input_connections)

subworkflow_step_mapping = {}
subworkflow = self.subworkflow
if subworkflow:
copied_subworkflow = subworkflow.copy()
copied_step.subworkflow = copied_subworkflow
for subworkflow_step, copied_subworkflow_step in zip(subworkflow.steps, copied_subworkflow.steps):
subworkflow_step_mapping[subworkflow_step.id] = copied_subworkflow_step

for old_conn, new_conn in zip(self.input_connections, copied_step.input_connections):
# new_conn.input_step = new_
new_conn.input_step = step_mapping[old_conn.input_step_id]
new_conn.output_step = step_mapping[old_conn.output_step_id]
if old_conn.input_subworkflow_step_id:
new_conn.input_subworkflow_step = subworkflow_step_mapping[old_conn.input_subworkflow_step_id]

copied_step.workflow_outputs = copy_list(self.workflow_outputs, copied_step)

def log_str(self):
return "WorkflowStep[index=%d,type=%s]" % (self.order_index, self.type)

Expand Down Expand Up @@ -3657,6 +3708,13 @@ def non_data_connection(self):
return (self.output_name == WorkflowStepConnection.NON_DATA_CONNECTION and
self.input_name == WorkflowStepConnection.NON_DATA_CONNECTION)

def copy(self):
# TODO: handle subworkflow ids...
copied_connection = WorkflowStepConnection()
copied_connection.output_name = self.output_name
copied_connection.input_name = self.input_name
return copied_connection


class WorkflowOutput(object):

Expand All @@ -3669,6 +3727,12 @@ def __init__( self, workflow_step, output_name=None, label=None, uuid=None):
else:
self.uuid = UUID(str(uuid))

def copy(self, copied_step):
copied_output = WorkflowOutput(copied_step)
copied_output.output_name = self.output_name
copied_output.label = self.label
return copied_output


class StoredWorkflowUserShareAssociation( object ):

Expand Down Expand Up @@ -5021,3 +5085,10 @@ def __init__( self, id=None, user_id=None, key=None):
self.id = id
self.user_id = user_id
self.key = key


def copy_list(lst, *args, **kwds):
if lst is None:
return lst
else:
return list(map(lambda el: el.copy(*args, **kwds), lst))
4 changes: 3 additions & 1 deletion lib/galaxy/web/base/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -1147,7 +1147,9 @@ def _import_shared_workflow( self, trans, stored):
# Copy workflow.
imported_stored = model.StoredWorkflow()
imported_stored.name = "imported: " + stored.name
imported_stored.latest_workflow = stored.latest_workflow
workflow = stored.latest_workflow.copy()
workflow.stored_workflow = imported_stored
imported_stored.latest_workflow = workflow
imported_stored.user = trans.user
# Save new workflow.
session = trans.sa_session
Expand Down
117 changes: 68 additions & 49 deletions test/api/test_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,52 @@
from requests import delete
from requests import put

SIMPLE_NESTED_WORKFLOW_YAML = """
class: GalaxyWorkflow
inputs:
- id: outer_input
steps:
- tool_id: cat1
label: first_cat
state:
input1:
$link: outer_input
- run:
class: GalaxyWorkflow
inputs:
- id: inner_input
outputs:
- id: workflow_output
source: random_lines#out_file1
steps:
- tool_id: random_lines1
label: random_lines
state:
num_lines: 1
input:
$link: inner_input
seed_source:
seed_source_selector: set_seed
seed: asdf
__current_case__: 1
label: nested_workflow
connect:
inner_input: first_cat#out_file1
- tool_id: cat1
label: second_cat
state:
input1:
$link: nested_workflow#workflow_output
queries:
- input2:
$link: nested_workflow#workflow_output
test_data:
outer_input:
value: 1.bed
type: File
"""


class BaseWorkflowsApiTestCase( api.ApiTestCase, ImporterGalaxyInterface ):
# TODO: Find a new file for this class.
Expand All @@ -40,17 +86,18 @@ def _workflow_names( self ):
return names

# Import importer interface...
def import_workflow(self, workflow):
def import_workflow(self, workflow, **kwds):
workflow_str = dumps(workflow, indent=4)
data = {
'workflow': workflow_str
'workflow': workflow_str,
}
data.update(**kwds)
upload_response = self._post( "workflows", data=data )
self._assert_status_code_is( upload_response, 200 )
return upload_response.json()

def _upload_yaml_workflow(self, has_yaml, source_type=None):
workflow = convert_and_import_workflow(has_yaml, galaxy_interface=self, source_type=source_type)
def _upload_yaml_workflow(self, has_yaml, **kwds):
workflow = convert_and_import_workflow(has_yaml, galaxy_interface=self, **kwds)
return workflow[ "id" ]

def _setup_workflow_run( self, workflow, inputs_by='step_id', history_id=None ):
Expand Down Expand Up @@ -391,6 +438,22 @@ def test_import_annotations( self ):
step_annotations = set(map(lambda step: step["annotation"], imported_workflow["steps"].values()))
assert "input1 description" in step_annotations

def test_import_subworkflows( self ):
def get_subworkflow_content_id(workflow_id):
workflow_contents = self._download_workflow(workflow_id, style="editor")
steps = workflow_contents['steps']
subworkflow_step = filter(lambda s: s["type"] == "subworkflow", steps.values())[0]
return subworkflow_step['content_id']

workflow_id = self._upload_yaml_workflow(SIMPLE_NESTED_WORKFLOW_YAML, publish=True)
subworkflow_content_id = get_subworkflow_content_id(workflow_id)
with self._different_user():
other_import_response = self.__import_workflow( workflow_id )
self._assert_status_code_is( other_import_response, 200 )
imported_workflow_id = other_import_response.json()["id"]
imported_subworkflow_content_id = get_subworkflow_content_id(imported_workflow_id)
assert subworkflow_content_id != imported_subworkflow_content_id

def test_not_importable_prevents_import( self ):
workflow_id = self.workflow_populator.simple_workflow( "test_not_importportable" )
with self._different_user():
Expand Down Expand Up @@ -680,51 +743,7 @@ def test_workflow_run_dynamic_output_collections_2( self ):

def test_run_subworkflow_simple( self ):
history_id = self.dataset_populator.new_history()
self._run_jobs("""
class: GalaxyWorkflow
inputs:
- id: outer_input
steps:
- tool_id: cat1
label: first_cat
state:
input1:
$link: outer_input
- run:
class: GalaxyWorkflow
inputs:
- id: inner_input
outputs:
- id: workflow_output
source: random_lines#out_file1
steps:
- tool_id: random_lines1
label: random_lines
state:
num_lines: 1
input:
$link: inner_input
seed_source:
seed_source_selector: set_seed
seed: asdf
__current_case__: 1
label: nested_workflow
connect:
inner_input: first_cat#out_file1
- tool_id: cat1
label: second_cat
state:
input1:
$link: nested_workflow#workflow_output
queries:
- input2:
$link: nested_workflow#workflow_output
test_data:
outer_input:
value: 1.bed
type: File
""", history_id=history_id)
self._run_jobs(SIMPLE_NESTED_WORKFLOW_YAML, history_id=history_id)

content = self.dataset_populator.get_history_dataset_content( history_id )
self.assertEquals("chr5\t131424298\t131424460\tCCDS4149.1_cds_0_0_chr5_131424299_f\t0\t+\nchr5\t131424298\t131424460\tCCDS4149.1_cds_0_0_chr5_131424299_f\t0\t+\n", content)
Expand Down
7 changes: 4 additions & 3 deletions test/api/workflows_format_2/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class ImporterGalaxyInterface(object):
"""

@abc.abstractmethod
def import_workflow(self, workflow):
def import_workflow(self, workflow, **kwds):
""" Import a workflow via POST /api/workflows or
comparable interface into Galaxy.
"""
Expand Down Expand Up @@ -59,10 +59,11 @@ def __init__(self, **kwds):
self._admin_gi = admin_gi
self._user_gi = user_gi

def import_workflow(self, workflow):
def import_workflow(self, workflow, **kwds):
workflow_str = json.dumps(workflow, indent=4)
return self._user_gi.workflows.import_workflow_json(
workflow_str
workflow_str,
**kwds
)

def import_tool(self, tool_representation):
Expand Down
6 changes: 5 additions & 1 deletion test/api/workflows_format_2/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@ def convert_and_import_workflow(has_workflow, **kwds):
else:
workflow = yaml_to_workflow(has_workflow, galaxy_interface, workflow_directory)

return galaxy_interface.import_workflow(workflow)
publish = kwds.get("publish", False)
import_kwds = {}
if publish:
import_kwds["publish"] = True
return galaxy_interface.import_workflow(workflow, **import_kwds)

__all__ = [
'convert_and_import_workflow',
Expand Down

0 comments on commit 66c9412

Please sign in to comment.