Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Export CWL-abstract workflow representation #9407

Draft
wants to merge 10 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
148 changes: 144 additions & 4 deletions lib/galaxy/managers/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
visit_input_values
)
from galaxy.tools.parameters.basic import (
ConnectedValue,
DataCollectionToolParameter,
DataToolParameter,
RuntimeValue,
Expand Down Expand Up @@ -481,14 +482,17 @@ def to_format_2(wf_dict, **kwds):
elif style == "format2_wrapped_yaml":
wf_dict = self._workflow_to_dict_export(trans, stored, workflow=workflow)
wf_dict = to_format_2(wf_dict, json_wrapper=True)
elif style == "cwl_abstract":
wf_dict = self._workflow_to_dict_cwl_abstract(trans, stored, workflow=workflow)
elif style == "ga":
wf_dict = self._workflow_to_dict_export(trans, stored, workflow=workflow)
else:
raise exceptions.RequestParameterInvalidException('Unknown workflow style [%s]' % style)
if version is not None:
wf_dict['version'] = version
else:
wf_dict['version'] = len(stored.workflows) - 1
if style != "cwl_abstract":
if version is not None:
wf_dict['version'] = version
else:
wf_dict['version'] = len(stored.workflows) - 1
return wf_dict

def _sync_stored_workflow(self, trans, stored_workflow):
Expand Down Expand Up @@ -978,6 +982,142 @@ def callback(input, prefixed_name, **kwargs):
data['steps'][step.order_index] = step_dict
return data

def _workflow_to_dict_cwl_abstract(self, trans, stored=None, workflow=None):
    """Return a CWL-abstract description of ``workflow`` as a plain dictionary.

    The dictionary is intended to be dumped to YAML by the caller. Every
    non-input step becomes an entry in ``steps`` whose ``run`` section is an
    ``Operation``; data inputs and non-connected step parameters with a
    stored value become workflow-level ``inputs``; workflow outputs of
    non-input steps become workflow-level ``outputs``.

    :param trans: transaction object; ``trans.sa_session`` and ``trans.user``
        are used to look up the annotation of ``stored``.
    :param stored: stored workflow whose annotation is appended to ``doc``;
        may be ``None``, in which case ``doc`` is just the workflow name.
    :param workflow: workflow to export. Although it defaults to ``None`` it
        is required: ``workflow.name`` and ``workflow.steps`` are read
        unconditionally.
    :returns: dict with the keys ``class``, ``cwlVersion``, ``doc``,
        ``steps``, ``outputs`` and ``inputs``.
    :raises exceptions.MessageException: if no module can be created for a step.
    """
    input_step_types = ['data_input', 'data_collection_input']

    annotation_str = ""
    if stored is not None:
        annotation_str = self.get_item_annotation_str(trans.sa_session, trans.user, stored) or ''
    # TODO: tags (stored.make_tag_string_list()) are not exported yet; it is
    # still open how they should be represented in a cwl-abstract document.

    # Pack workflow data into a dictionary and return
    data = {}
    data['class'] = 'Workflow'
    # Released spec version, as requested in review (was "v1.2.0-dev1").
    data['cwlVersion'] = "v1.2"
    if annotation_str != '':
        data['doc'] = workflow.name + ': ' + annotation_str
    else:
        data['doc'] = workflow.name
    data['steps'] = {}
    # Workflow-level inputs and outputs, filled in while walking the steps.
    global_input_dicts = {}
    workflow_outputs_dicts = {}

    for step in workflow.steps:
        # Load from database representation
        module = module_factory.from_workflow_step(trans, step)
        if not module:
            raise exceptions.MessageException('Unrecognized step type: %s' % step.type)
        content_id = module.get_content_id()
        is_input_step = module.type in input_step_types

        # Step info
        input_dicts = {}
        step_dict = {'out': []}
        step_run_dict = {
            'class': 'Operation',
            'doc': '',
            'id': content_id,
        }
        if module.type == 'tool':
            if module.tool and module.tool.tool_shed:
                step_run_dict["tool_shed_repository"] = {
                    'name': module.tool.repository_name,
                    'owner': module.tool.repository_owner,
                    'changeset_revision': module.tool.changeset_revision,
                    'tool_shed': module.tool.tool_shed,
                }
            dynamic_tool = step.dynamic_tool
            if dynamic_tool:
                step_dict['tool_representation'] = dynamic_tool.value
                # The previous code tested step_dict['content_id'], a key that
                # is never set on this dict, and therefore raised KeyError.
                # Test the id obtained from the module instead.
                if util.is_uuid(content_id):
                    step_dict['content_id'] = None
                    step_dict['tool_id'] = None

        # TODO: how to handle subworkflows?
        step_state = module.state.inputs or {}
        if not is_input_step:
            step_run_dict['doc'] = module.get_name()

        # Inputs that are wired to the output of another step.
        step_inputs = {}
        for conn in step.input_connections:
            # The combination of output_step.uuid and output_name should be unique.
            source_id = str(conn.output_step.uuid) + '/' + conn.output_name
            label = conn.input_name
            # The input is not connected to a previous tool
            # (data_input is not considered a step).
            if conn.output_step.type in input_step_types:
                format_description = 'data' if conn.output_step.type == 'data_input' else 'collection'
                label = "Input file(s) for tool " + module.get_name()
                input_dict = {
                    'format': format_description,
                    'type': 'File',
                    'name': label,
                }
                # The data_input id is used to identify the file.
                source_id = str(conn.output_step.uuid) + '_' + conn.output_name
                if source_id not in global_input_dicts:
                    global_input_dicts[source_id] = input_dict
            # NOTE(review): assumed to run for every connection, not only for
            # connections coming from input steps - confirm against the
            # original indentation.
            step_inputs[conn.input_name] = source_id
            input_dicts[conn.input_name] = {
                'name': label,
                'doc': 'Input value for tool %s' % module.get_name(),
                'type': 'Any',
            }

        # Remaining state entries: runtime parameters and stored values.
        for name, val in step_state.items():
            value_type = type(val)
            if value_type is ConnectedValue or module.type == 'data_input':
                continue
            if name in step_inputs:
                # Already handled as a connection above.
                continue
            source_id = str(step.uuid) + '_' + str(name)  # global input id
            if value_type is RuntimeValue:
                input_dicts[name] = {
                    'name': name,
                    'type': 'Any',
                    'doc': 'runtime parameter for tool %s' % module.get_name(),
                }
            else:
                step_inputs_dict = {
                    'name': name,
                    'default': val,
                    'type': 'Any',
                    'doc': 'runtime parameter for tool %s' % module.get_name(),
                }
                # The same dict object is stored both as the operation input
                # and as the workflow-level input.
                input_dicts[name] = step_inputs_dict
                global_input_dicts[source_id] = step_inputs_dict
                # Connect the operation input name with its source.
                # NOTE(review): assumed to apply only when a workflow-level
                # input was registered just above - confirm.
                step_inputs[name] = source_id

        step_dict['in'] = step_inputs
        step_run_dict['inputs'] = input_dicts

        # Global wf outputs and step wf output list; the user's inputs are
        # not listed as wf outputs.
        if not is_input_step:
            for workflow_output in step.unique_workflow_outputs:
                workflow_outputs_dicts[str(workflow_output.uuid)] = {
                    'label': workflow_output.label,
                    'name': workflow_output.output_name,
                    'outputSource': str(step.uuid) + '/' + workflow_output.output_name,
                    'type': 'Any',
                }
                step_dict['out'].append(workflow_output.output_name)

        # All step outputs
        step_outputs_dict = {}
        if type(module) is ToolModule:
            for output in module.get_data_outputs():
                # TODO: more detail may be available from output['extensions'].
                step_outputs_dict[output['name']] = {'type': 'File'}
        step_run_dict['outputs'] = step_outputs_dict

        if not is_input_step:
            step_dict['run'] = step_run_dict
        data['steps'][str(step.uuid)] = step_dict

    # add the global input and outputs to the return value
    data['outputs'] = workflow_outputs_dicts
    data['inputs'] = global_input_dicts
    return data

def _workflow_to_dict_instance(self, stored, workflow, legacy=True):
encode = self.app.security.encode_id
sa_session = self.app.model.context
Expand Down
5 changes: 5 additions & 0 deletions lib/galaxy/webapps/galaxy/api/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import logging
import os

import yaml
import requests
from gxformat2._yaml import ordered_dump
from markupsafe import escape
Expand Down Expand Up @@ -479,6 +480,10 @@ def workflow_dict(self, trans, workflow_id, **kwd):

if style == "format2" and download_format != 'json-download':
return ordered_dump(ret_dict)
elif style == "cwl_abstract":
noalias_dumper = yaml.dumper.SafeDumper
noalias_dumper.ignore_aliases = lambda self, data: True
return yaml.dump(ret_dict, default_flow_style=False, Dumper=noalias_dumper)
else:
return format_return_as_json(ret_dict, pretty=True)

Expand Down
5 changes: 5 additions & 0 deletions lib/galaxy_test/api/test_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,11 @@ def test_export_format2(self):
downloaded_workflow = self._download_workflow(uploaded_workflow_id, style="format2")
assert downloaded_workflow["class"] == "GalaxyWorkflow"

def test_export_cwl_abstract(self):
    """Export a simple workflow in the ``cwl_abstract`` style and check its top-level class."""
    # TODO: consider using a dedicated test workflow here.
    workflow_id = self.workflow_populator.simple_workflow("test_for_export_format2")
    exported = self._download_workflow(workflow_id, style="cwl_abstract")
    assert exported["class"] == "Workflow"

def test_export_editor(self):
uploaded_workflow_id = self.workflow_populator.simple_workflow("test_for_export")
downloaded_workflow = self._download_workflow(uploaded_workflow_id, style="editor")
Expand Down