
Merge pull request #1735 from jmchilton/more_wf_loading_dup
Refactor workflow loading.
mvdbeek committed Feb 29, 2016
2 parents 14f5826 + 92d978d commit 1019f7b
Showing 4 changed files with 105 additions and 79 deletions.
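In short: the step-construction logic that build_workflow_from_dict and update_workflow_from_dict previously duplicated moves into a shared _workflow_from_dict helper, which returns the new model.Workflow plus a list of (tool_id, tool_name, tool_version, step_id) tuples for tools missing from the local instance. Create keeps tolerating missing tools; update now rejects them with a MissingToolsException, which becomes a MessageException subclass. A minimal sketch of the resulting call shape (_build_or_update and allow_missing_tools are illustrative names, not part of the diff):

from galaxy.managers import workflows

def _build_or_update(manager, trans, data, name, allow_missing_tools):
    # Illustrative wrapper showing the shared call shape after the refactor.
    workflow, missing_tool_tups = manager._workflow_from_dict(trans, data, name=name)
    if missing_tool_tups and not allow_missing_tools:
        # Tuple layout per the diff: (tool_id, tool_name, tool_version, step_id).
        errors = ["Step %s requires tool '%s'." % (tup[3], tup[0])
                  for tup in missing_tool_tups]
        raise workflows.MissingToolsException(workflow, errors)
    return workflow, missing_tool_tups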
134 changes: 65 additions & 69 deletions lib/galaxy/managers/workflows.py
@@ -1,5 +1,7 @@
from __future__ import absolute_import

from six import string_types

from collections import namedtuple
import logging
import json
@@ -191,50 +193,18 @@ def build_workflow_from_dict(
):
# Put parameters in workflow mode
trans.workflow_building_mode = True
# Create new workflow from incoming dict
workflow = model.Workflow()
# If there's a source, put it in the workflow name.
if source:
name = "%s (imported from %s)" % ( data['name'], source )
else:
name = data['name']
workflow.name = name
workflow, missing_tool_tups = self._workflow_from_dict(
trans,
data,
name=name,
)
if 'uuid' in data:
workflow.uuid = data['uuid']
# Assume no errors until we find a step that has some
workflow.has_errors = False
# Create each step
steps = []
# The editor will provide ids for each step that we don't need to save,
# but do need to use to make connections
steps_by_external_id = {}
# Keep track of tools required by the workflow that are not available in
# the local Galaxy instance. Each tuple in the list of missing_tool_tups
# will be ( tool_id, tool_name, tool_version ).
missing_tool_tups = []
for step_dict in self.__walk_step_dicts( data ):
module, step = self.__track_module_from_dict( trans, steps, steps_by_external_id, step_dict, secure=False )
if module.type == 'tool' and module.tool is None:
# A required tool is not available in the local Galaxy instance.
if 'content_id' in step_dict:
tool_id = step_dict[ 'content_id' ]
else:
# Support legacy workflows... (created pre 16.01)
tool_id = step_dict[ 'tool_id' ]
missing_tool_tup = ( tool_id, step_dict[ 'name' ], step_dict[ 'tool_version' ])
if missing_tool_tup not in missing_tool_tups:
missing_tool_tups.append( missing_tool_tup )
# Save the entire step_dict in the unused config field, to be parsed later
# when we do have the tool
step.config = json.dumps(step_dict)
if step.tool_errors:
workflow.has_errors = True

# Second pass to deal with connections between steps
self.__connect_workflow_steps( steps, steps_by_external_id )

# Order the steps if possible
attach_ordered_steps( workflow, steps )

if create_stored_workflow:
# Connect up
@@ -271,58 +241,84 @@ def build_workflow_from_dict(
missing_tools=missing_tool_tups
)

def update_workflow_from_dict(self, trans, stored_workflow, workflow_data, from_editor=False):
def update_workflow_from_dict(self, trans, stored_workflow, workflow_data):
# Put parameters in workflow mode
trans.workflow_building_mode = True
# Convert incoming workflow data from json if coming from editor
data = json.loads(workflow_data) if from_editor else workflow_data
# Create new workflow from incoming data

workflow, missing_tool_tups = self._workflow_from_dict(
trans,
workflow_data,
name=stored_workflow.name,
)

if missing_tool_tups:
errors = []
for missing_tool_tup in missing_tool_tups:
errors.append("Step %s requires tool '%s'." % (missing_tool_tup[3], missing_tool_tup[0]))
raise MissingToolsException(workflow, errors)

# Connect up
workflow.stored_workflow = stored_workflow
stored_workflow.latest_workflow = workflow
# Persist
trans.sa_session.flush()
# Return something informative
errors = []
if workflow.has_errors:
errors.append( "Some steps in this workflow have validation errors" )
if workflow.has_cycles:
errors.append( "This workflow contains cycles" )
return workflow, errors

def _workflow_from_dict(self, trans, data, name):
if isinstance(data, string_types):
# If coming from the editor...
data = json.loads(data)

# Create new workflow from source data
workflow = model.Workflow()
# Just keep the last name (user can rename later)
workflow.name = stored_workflow.name

workflow.name = name

# Assume no errors until we find a step that has some
workflow.has_errors = False
# Create each step
steps = []
# The editor will provide ids for each step that we don't need to save,
# but do need to use to make connections
steps_by_external_id = {}
errors = []
for key, step_dict in data['steps'].iteritems():
is_tool = is_tool_module_type( step_dict[ 'type' ] )
tool_id = step_dict.get('content_id', step_dict.get('tool_id', None))
if is_tool and tool_id is None:
raise exceptions.RequestParameterInvalidException("No tool_id could be located for step [%s]" % step_dict)
if is_tool and not trans.app.toolbox.has_tool( tool_id, exact=True ):
errors.append("Step %s requires tool '%s'." % (step_dict['id'], step_dict['tool_id']))
if errors:
raise MissingToolsException(workflow, errors)

# First pass to build step objects and populate basic values
# Keep track of tools required by the workflow that are not available in
# the local Galaxy instance. Each tuple in the list of missing_tool_tups
# will be ( tool_id, tool_name, tool_version ).
missing_tool_tups = []

for step_dict in self.__walk_step_dicts( data ):
module, step = self.__track_module_from_dict( trans, steps, steps_by_external_id, step_dict, secure=from_editor )
module, step = self.__track_module_from_dict( trans, steps, steps_by_external_id, step_dict, secure=False )
is_tool = is_tool_module_type( module.type )
if is_tool and module.tool is None:
# A required tool is not available in the local Galaxy instance.
tool_id = step_dict.get('content_id', step_dict.get('tool_id', None))
assert tool_id is not None # Threw an exception elsewhere if not

missing_tool_tup = ( tool_id, step_dict[ 'name' ], step_dict[ 'tool_version' ], step_dict[ 'id'] )
if missing_tool_tup not in missing_tool_tups:
missing_tool_tups.append( missing_tool_tup )

# Save the entire step_dict in the unused config field, to be parsed later
# when we do have the tool
step.config = json.dumps(step_dict)

if step.tool_errors:
# DBTODO Check for conditional inputs here.
workflow.has_errors = True

# Second pass to deal with connections between steps
self.__connect_workflow_steps( steps, steps_by_external_id )

# Order the steps if possible
attach_ordered_steps( workflow, steps )
# Connect up
workflow.stored_workflow = stored_workflow
stored_workflow.latest_workflow = workflow
# Persist
trans.sa_session.flush()
# Return something informative
errors = []
if workflow.has_errors:
errors.append( "Some steps in this workflow have validation errors" )
if workflow.has_cycles:
errors.append( "This workflow contains cycles" )
return workflow, errors

return workflow, missing_tool_tups

def workflow_to_dict( self, trans, stored, style="export" ):
""" Export the workflow contents to a dictionary ready for JSON-ification and to be
@@ -874,7 +870,7 @@ def __connect_workflow_steps( self, steps, steps_by_external_id ):
del step.temp_input_connections


class MissingToolsException(object):
class MissingToolsException(exceptions.MessageException):

def __init__(self, workflow, errors):
self.workflow = workflow
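Rebasing MissingToolsException onto exceptions.MessageException is what lets the update path fail cleanly: Galaxy's API layer knows how to turn MessageException subclasses into client errors, consistent with the new tests below asserting a 400 status. A hedged sketch of a caller (save_updated_workflow is an illustrative name; only the workflow/errors attributes come from the diff):

from galaxy.managers import workflows

def save_updated_workflow(manager, trans, stored_workflow, workflow_data):
    try:
        return manager.update_workflow_from_dict(trans, stored_workflow, workflow_data)
    except workflows.MissingToolsException as e:
        # e.workflow is the partially built model.Workflow; e.errors holds
        # the per-step "Step X requires tool 'Y'." messages collected above.
        return dict(name=e.workflow.name, messages=e.errors)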
1 change: 0 additions & 1 deletion lib/galaxy/webapps/galaxy/controllers/workflow.py
@@ -693,7 +693,6 @@ def save_workflow( self, trans, id, workflow_data ):
trans,
stored,
workflow_data,
from_editor=True,
)
except workflows.MissingToolsException as e:
return dict(
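Dropping from_editor=True works because _workflow_from_dict now detects string payloads itself via the isinstance(data, string_types) check at the top of the helper. That coercion in isolation (_coerce_workflow_data is an illustrative name):

import json
from six import string_types

def _coerce_workflow_data(data):
    # The editor submits the workflow as a JSON string, while API callers
    # pass an already-parsed dict; accept both, as the new helper does.
    if isinstance(data, string_types):
        data = json.loads(data)
    return data

A visible consequence in the manager diff is that __track_module_from_dict is now always called with secure=False rather than secure=from_editor.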
2 changes: 2 additions & 0 deletions lib/galaxy/workflow/modules.py
@@ -819,6 +819,8 @@ def from_dict( Class, trans, d, secure=True ):
tool_id = d.get( 'content_id', None )
if tool_id is None:
tool_id = d.get( 'tool_id', None ) # Older workflows will have exported this as tool_id.
if tool_id is None:
raise exceptions.RequestParameterInvalidException("No content id could be located for step [%s]" % d)
tool_version = str( d.get( 'tool_version', None ) )
module = Class( trans, tool_id, tool_version=tool_version )
module.state = galaxy.tools.DefaultToolState()
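The new guard in from_dict makes the content_id/tool_id fallback explicit. Read in isolation it amounts to the following (_resolve_tool_id is an illustrative name; the real code raises exceptions.RequestParameterInvalidException rather than ValueError):

def _resolve_tool_id(d):
    # Prefer the modern 'content_id' key; workflows exported before
    # release 16.01 carry 'tool_id' instead; with neither, fail loudly.
    tool_id = d.get('content_id')
    if tool_id is None:
        tool_id = d.get('tool_id')
    if tool_id is None:
        raise ValueError("No content id could be located for step [%s]" % d)
    return tool_id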
47 changes: 38 additions & 9 deletions test/api/test_workflows.py
@@ -331,7 +331,7 @@ def test_upload( self ):
def test_upload_deprecated( self ):
self.__test_upload( use_deprecated_route=True )

def __test_upload( self, use_deprecated_route=False, name="test_import", workflow=None ):
def __test_upload( self, use_deprecated_route=False, name="test_import", workflow=None, assert_ok=True ):
if workflow is None:
workflow = self.workflow_populator.load_workflow( name=name )
data = dict(
@@ -342,8 +342,9 @@ def __test_upload( self, use_deprecated_route=False, name="test_import", workflo
else:
route = "workflows"
upload_response = self._post( route, data=data )
self._assert_status_code_is( upload_response, 200 )
self._assert_user_has_workflow_with_name( "%s (imported from API)" % name )
if assert_ok:
self._assert_status_code_is( upload_response, 200 )
self._assert_user_has_workflow_with_name( "%s (imported from API)" % name )
return upload_response

def test_update( self ):
@@ -370,12 +371,7 @@ def check_label_and_uuid(order_index, step_dict):
workflow_id = upload_response.json()["id"]

def update(workflow_object):
data = dict(
workflow=workflow_object
)
raw_url = 'workflows/%s' % workflow_id
url = self._api_url( raw_url, use_key=True )
put_response = put( url, data=dumps(data) )
put_response = self._update_workflow(workflow_id, workflow_object)
self._assert_status_code_is( put_response, 200 )
return put_response

@@ -410,6 +406,23 @@ def check_step(step):
# Make sure the positions have been updated.
map(tweak_step, updated_workflow_content['steps'].iteritems())

def test_update_no_tool_id( self ):
workflow_object = self.workflow_populator.load_workflow( name="test_import" )
upload_response = self.__test_upload( workflow=workflow_object )
workflow_id = upload_response.json()["id"]
del workflow_object["steps"]["2"]["tool_id"]
put_response = self._update_workflow(workflow_id, workflow_object)
self._assert_status_code_is( put_response, 400 )

def test_update_missing_tool( self ):
# Create allows missing tools, update doesn't currently...
workflow_object = self.workflow_populator.load_workflow( name="test_import" )
upload_response = self.__test_upload( workflow=workflow_object )
workflow_id = upload_response.json()["id"]
workflow_object["steps"]["2"]["tool_id"] = "cat-not-found"
put_response = self._update_workflow(workflow_id, workflow_object)
self._assert_status_code_is( put_response, 400 )

def test_require_unique_step_uuids( self ):
workflow_dup_uuids = self.workflow_populator.load_workflow( name="test_import" )
uuid0 = str(uuid4())
@@ -534,6 +547,13 @@ def test_import_missing_tool( self ):
missing_tool_steps = filter(lambda v: v['tool_id'] == 'cat_missing_tool', steps.values())
assert len(missing_tool_steps) == 1

def test_import_no_tool_id( self ):
# Import works with missing tools, but not with absent content/tool id.
workflow = self.workflow_populator.load_workflow_from_resource( name="test_workflow_missing_tool" )
del workflow["steps"]["2"]["tool_id"]
create_response = self.__test_upload(workflow=workflow, assert_ok=False)
self._assert_status_code_is( create_response, 400 )

def test_import_export_with_runtime_inputs( self ):
workflow = self.workflow_populator.load_workflow_from_resource( name="test_workflow_with_runtime_input" )
workflow_id = self.workflow_populator.create_workflow( workflow )
@@ -1427,6 +1447,15 @@ def test_invocation_usage( self ):
self._assert_status_code_is( step_response, 200 )
self._assert_has_keys( step_response.json(), "id", "order_index" )

def _update_workflow(self, workflow_id, workflow_object):
data = dict(
workflow=workflow_object
)
raw_url = 'workflows/%s' % workflow_id
url = self._api_url( raw_url, use_key=True )
put_response = put( url, data=dumps(data) )
return put_response

def _invocation_step_details( self, workflow_id, invocation_id, step_id ):
invocation_step_response = self._get( "workflows/%s/usage/%s/steps/%s" % ( workflow_id, invocation_id, step_id ) )
self._assert_status_code_is( invocation_step_response, 200 )
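For reference, the request the new _update_workflow helper assembles reduces to a keyed PUT; galaxy_url and api_key below are stand-ins for what the framework's _api_url( ..., use_key=True ) resolves:

from json import dumps
from requests import put

def update_workflow(galaxy_url, api_key, workflow_id, workflow_object):
    # Mirrors _update_workflow above, spelled out without the test framework.
    url = "%s/api/workflows/%s?key=%s" % (galaxy_url, workflow_id, api_key)
    return put(url, data=dumps(dict(workflow=workflow_object)))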
