Skip to content

Commit

Permalink
Merge c3cf8ab into 66c5d85
Browse files Browse the repository at this point in the history
  • Loading branch information
giannisdoukas committed May 26, 2020
2 parents 66c5d85 + c3cf8ab commit 4ee7b35
Show file tree
Hide file tree
Showing 51 changed files with 2,401 additions and 533 deletions.
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,8 @@ tests/*.txt
*.txt
tmp*.json

!examples/*.ipynb
!examples/*.ipynb

workflow-examples/*
/.python-version
/tmp
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ before_install:
install:
- pip install -r requirements.txt
- pip install -r test-requirements.txt
- python setup.py install
# command to run tests
script:
- pycodestyle --max-line-length=119 $(find cwlkernel -name '*.py')
- coverage run --source cwlkernel --omit cwlkernel/__main__.py -m unittest discover tests
after_script:
- coveralls
Expand Down
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,3 @@ In examples directory there are many examples which illustrate how to use the ke
## How to contribute
If you are a CWL developer and you would like to contribute, feel free to open an issue and ask for new features you
would like to see.

41 changes: 41 additions & 0 deletions cwlkernel/CWLBuilder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from abc import ABC, abstractmethod
import yaml
from cwlkernel.cwlrepository.CWLComponent import WorkflowComponent, CWLTool, CWLWorkflow
from io import StringIO


class CWLBuilder(ABC):
    """Abstract base for objects that know how to produce a workflow component."""

    @abstractmethod
    def build(self) -> WorkflowComponent:
        """Create and return the component described by this builder.

        Concrete subclasses must override this method; the base
        implementation does nothing and returns ``None``.
        """


class CWLSnippetBuilder(CWLBuilder):
    """Collect CWL source text piece by piece and build a component from it.

    Snippets are joined with newlines in the order they were appended. The
    accumulated text is parsed as YAML only when :meth:`build` is called.
    """

    # Text accumulated so far; the empty string means "nothing appended yet".
    _code: str

    def __init__(self):
        self._code = ""

    def append(self, code: str, indent: int = 0) -> None:
        """Add a snippet to the accumulated source.

        :param code: text to add; it is split into lines first, so a trailing
            newline is not preserved.
        :param indent: number of spaces to put in front of every line of
            ``code``.
        """
        prefix = ' ' * indent
        indented = '\n'.join(prefix + line for line in code.splitlines())
        if self._code == "":
            self._code = indented
        else:
            self._code += '\n' + indented

    def get_current_code(self) -> str:
        """Return the text accumulated so far."""
        return self._code

    def build(self) -> WorkflowComponent:
        """Parse the accumulated text and wrap it in the matching component.

        :return: a ``CWLTool`` when the document's ``class`` is
            ``CommandLineTool``, a ``CWLWorkflow`` when it is ``Workflow``.
        :raises ValueError: if the document is not a mapping, has no ``id``,
            or its ``class`` is missing or not one of the two supported values.
        """
        # NOTE(security): yaml.Loader can construct arbitrary Python objects.
        # If snippets can come from an untrusted source, this should be
        # switched to yaml.SafeLoader after confirming that no caller relies
        # on Python-specific YAML tags.
        code = yaml.load(StringIO(self._code), yaml.Loader)
        # An empty snippet parses to None and a bare scalar to str/int; neither
        # can carry an id, so report them the same way as a missing id.
        if not isinstance(code, dict) or 'id' not in code:
            raise ValueError("the workflow must contain an id")
        cwl_class = code.get('class')
        if cwl_class == 'CommandLineTool':
            return CWLTool(code['id'], code)
        if cwl_class == 'Workflow':
            return CWLWorkflow(code['id'], code)
        # Previously this path fell through and returned None.
        raise ValueError(
            f"unsupported CWL class {cwl_class!r}; expected 'CommandLineTool' or 'Workflow'"
        )

    def clear(self):
        """Discard everything appended so far."""
        self._code = ""
6 changes: 2 additions & 4 deletions cwlkernel/CWLExecuteConfigurator.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
from typing import Dict, Tuple, Callable
import os


# TODO: use tempfile for windows compatibility

class CWLExecuteConfigurator:

CWLKERNEL_MODE: str
CWLKERNEL_BOOT_DIRECTORY: str

# property "Name of the property": ("default", validator)
properties: Dict[str, Tuple[str, Callable]] = {
'CWLKERNEL_MODE': ('SIMPLE', lambda value: value.upper() in {'SIMPLE'}), # no case sensitive
'CWLKERNEL_MODE': ('SIMPLE', lambda value: value.upper() in {'SIMPLE'}), # no case sensitive
'CWLKERNEL_BOOT_DIRECTORY': ('/tmp/CWLKERNEL_DATA', lambda value: True)
}



def __init__(self):
for property, (default_value, validator) in self.properties.items():
value = os.environ.get(property, default_value)
Expand Down
200 changes: 153 additions & 47 deletions cwlkernel/CWLKernel.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
from io import StringIO

import logging
import os
from typing import List, Dict, Optional, Tuple, Union

import re
from ipykernel.kernelbase import Kernel
from ruamel import yaml
from ruamel.yaml import YAML

from cwlkernel.CWLBuilder import CWLSnippetBuilder
from cwlkernel.CWLLogger import CWLLogger
from .CWLExecuteConfigurator import CWLExecuteConfigurator
from .CoreExecutor import CoreExecutor
from .IOManager import IOFileManager
from .cwlrepository.cwlrepository import WorkflowRepository
from .cwlrepository.CWLComponent import WorkflowComponentFactory, WorkflowComponent, CWLWorkflow

logger = logging.Logger('CWLKernel')

Expand All @@ -25,27 +30,31 @@ class CWLKernel(Kernel):
}
banner = "Common Workflow Language"

_magic_commands = frozenset(['logs', 'data', 'display_data'])
_magic_commands = frozenset(['execute', 'logs', 'data', 'display_data', 'snippet', 'newWorkflow',
'newWorkflowAddStep', 'newWorkflowAddInput', 'newWorkflowBuild'])

def __init__(self, **kwargs):
super().__init__(**kwargs)
conf = CWLExecuteConfigurator()
self._yaml_input_data: List[str] = []
self._yaml_input_data: Optional[str] = None
self._results_manager = IOFileManager(os.sep.join([conf.CWLKERNEL_BOOT_DIRECTORY, 'results']))
runtime_file_manager = IOFileManager(os.sep.join([conf.CWLKERNEL_BOOT_DIRECTORY, 'runtime_data']))
self._cwl_executor = CoreExecutor(runtime_file_manager)
self._pid = (os.getpid(), os.getppid())
self._cwl_logger = CWLLogger(os.path.join(conf.CWLKERNEL_BOOT_DIRECTORY, 'logs'))
self._set_process_ids()
self._cwl_logger.save()
self._workflow_repository = WorkflowRepository()
self._snippet_builder = CWLSnippetBuilder()
self._workflow_composer: Optional[CWLWorkflow] = None

def _set_process_ids(self):
self._cwl_logger.process_id = {
"process_id": os.getpid(),
"parent_process_id": os.getppid()
}

def _code_is_valid_yaml(self, code) -> Optional[Dict]:
def _code_is_valid_yaml(self, code: str) -> Optional[Dict]:
yaml = YAML(typ='safe')
try:
return yaml.load(code)
Expand All @@ -62,57 +71,155 @@ def _is_magic_command(self, code: str) -> bool:

def do_execute(self, code: str, silent=False, store_history: bool = True,
user_expressions=None, allow_stdin: bool = False) -> Dict:
if self._is_magic_command(code):
self._execute_magic_command(code)
return {
'status': 'ok',
# The base class increments the execution count
'execution_count': self.execution_count,
'payload': [],
'user_expressions': {},
}
else:
dict_code = self._code_is_valid_yaml(code)
if dict_code is None:
try:
if self._is_magic_command(code):
self._do_execute_magic_command(code)
return {
'status': 'error',
'status': 'ok',
# The base class increments the execution count
'execution_count': self.execution_count,
'payload': [],
'user_expressions': {},
}
else:
dict_code = self._code_is_valid_yaml(code)
if dict_code is None:
self.send_response(
self.iopub_socket, 'stream',
{'name': 'stderr', 'text': f'Unknown input'}
)
return {
'status': 'error',
# The base class increments the execution count
'execution_count': self.execution_count,
'payload': [],
'user_expressions': {},
}
else:
status, exception = self._do_execute_yaml(dict_code, code)
return {
'status': status,
# The base class increments the execution count
'execution_count': self.execution_count,
'payload': [],
'user_expressions': {},
}
except Exception as e:
import traceback
traceback.print_exc()
self.send_response(
self.iopub_socket, 'stream',
{'name': 'stderr', 'text': f'{type(e).__name__}: {e}'}
)
return {
'status': 'error',
# The base class increments the execution count
'execution_count': self.execution_count,
'payload': [],
'user_expressions': {},
}

def _do_execute_yaml(self, dict_code, code):
exception = None
if not self._is_cwl(dict_code):
exception = self._accumulate_data(code)
raise NotImplementedError()
else:
exception = self._execute_workflow(code)
self._clear_data()
try:
cwl_component = WorkflowComponentFactory().get_workflow_component(code)
self._workflow_repository.register_tool(cwl_component)
self.send_response(
self.iopub_socket, 'stream',
{'name': 'stdout', 'text': f"tool '{cwl_component.id}' registered"}
)
except Exception as e:
exception = e

status = 'ok' if exception is None else 'error'
if exception is not None:
self.send_response(
self.iopub_socket, 'stream',
{'name': 'stderr', 'text': f'{type(exception).__name__}: {exception}'}
)
return {
'status': status,
# The base class increments the execution count
'execution_count': self.execution_count,
'payload': [],
'user_expressions': {},
}
return status, exception

def _do_execute_magic_command(self, commands: str):
for command in re.compile(r'^%[ ]+', re.MULTILINE).split(commands):
command = command.strip()
if command == '':
continue
command = command.split(" ")
command_name = command[0].strip()
args = " ".join(command[1:])
getattr(self, f'_execute_magic_{command_name}')(args)

def _execute_magic_newWorkflowBuild(self, *args):
self._send_json_response(self._workflow_composer.to_dict())
self._workflow_repository.register_tool(self._workflow_composer)
self._workflow_composer = None

def _execute_magic_command(self, command: str):
command = command.split()[1:]
command_name = command[0]
args = command[1:]
getattr(self, f'_execute_magic_{command_name}')(args)
def _execute_magic_newWorkflowAddInput(self, args: str):
    """Attach a workflow-level input to a step input of the composed workflow.

    The first line of ``args`` must hold exactly two whitespace-separated
    tokens: the step id and the id of that step's input. The remaining lines
    are parsed as YAML and passed on as the input description.
    """
    import yaml as y
    lines = args.splitlines()
    # Unpacking enforces "exactly two tokens"; split() already removes
    # surrounding whitespace from each token.
    step_id, step_in_id = lines[0].split()
    description_text = '\n'.join(lines[1:])
    # NOTE(review): yaml.Loader can build arbitrary Python objects; confirm
    # this text never comes from an untrusted source.
    parsed_description = y.load(StringIO(description_text), y.Loader)
    self._workflow_composer.add_input(
        workflow_input=parsed_description,
        step_id=step_id,
        in_step_id=step_in_id,
    )

def _execute_magic_display_data(self, data_name):
if len(data_name) != 1 or not isinstance(data_name[0], str):
self._send_error_response('ERROR: you must select an output to display. Correct format:\n % display_data [output name]')
def _execute_magic_newWorkflowAddStepIn(self, args: str):
    """Add step input/output connections to the composed workflow.

    The first line of ``args`` is split on whitespace and forwarded as
    trailing positional arguments. The remaining lines are parsed as a YAML
    mapping; for each ``input_id: description`` entry the composer's
    ``add_step_in_out`` is called once.
    """
    lines = args.splitlines()
    positional_tail = lines[0].split()
    mapping_text = '\n'.join(lines[1:])
    import yaml as y
    # NOTE(review): yaml.Loader can build arbitrary Python objects; confirm
    # this text never comes from an untrusted source.
    connections = y.load(StringIO(mapping_text), y.Loader)
    for input_id, description in connections.items():
        self._workflow_composer.add_step_in_out(description, input_id, *positional_tail)

def _execute_magic_newWorkflowAddStep(self, ids: str):
tool_id, step_id = ids.split()
tool = self._workflow_repository.get_by_id(tool_id)
self._workflow_composer.add(tool, step_id)

def _execute_magic_newWorkflow(self, id: str):
    """Start composing a new workflow, replacing any one already in progress.

    ``id`` is passed unchanged to the ``CWLWorkflow`` constructor.
    """
    fresh_workflow = CWLWorkflow(id)
    self._workflow_composer = fresh_workflow

def _execute_magic_snippet(self, command: str):
command = command.splitlines()
command[0] = command[0].strip()
y = YAML(typ='rt')
if command[0] == "add":
snippet = '\n'.join(command[1:])
self._snippet_builder.append(snippet)
current_code = y.load(StringIO(self._snippet_builder.get_current_code()))
elif command[0] == "build":
snippet = '\n'.join(command[1:])
self._snippet_builder.append(snippet)
workflow = self._snippet_builder.build()
self._workflow_repository.register_tool(workflow)
current_code = y.load(StringIO(self._snippet_builder.get_current_code()))
self._snippet_builder.clear()
else:
raise ValueError()
self._send_json_response(current_code)

def _execute_magic_execute(self, execute_argument_string: str):
execute_argument_string = execute_argument_string.splitlines()
cwl_id = execute_argument_string[0].strip()
cwl_component: WorkflowComponent = self._workflow_repository.get_by_id(cwl_id)
self._set_data('\n'.join(execute_argument_string[1:]))
self._execute_workflow(cwl_component.to_yaml(True))
self._clear_data()

def _execute_magic_display_data(self, data_name: str):
if not isinstance(data_name, str) or len(data_name.split()) == 0:
self._send_error_response(
'ERROR: you must select an output to display. Correct format:\n % display_data [output name]'
)
return
results = list(filter(lambda item: item[1]['id'] == data_name[0], self._results_manager.get_files_registry().items()))
results = list(
filter(lambda item: item[1]['id'] == data_name, self._results_manager.get_files_registry().items()))
if len(results) != 1:
self.send_response(self.iopub_socket, 'stream', {'name': 'stderr', 'text': 'Result not found'})
return
Expand All @@ -124,7 +231,7 @@ def _execute_magic_display_data(self, data_name):
def _send_error_response(self, text):
self.send_response(self.iopub_socket, 'stream', {'name': 'stdout', 'text': text})

def _send_json_response(self, json_data: Union[Dict,List]):
def _send_json_response(self, json_data: Union[Dict, List]):
self.send_response(
self.iopub_socket,
'display_data',
Expand Down Expand Up @@ -187,20 +294,19 @@ def _execute_magic_data(self, *args):
}
)

def _accumulate_data(self, code: str) -> Optional[Exception]:
cwl = self._cwl_executor.file_manager.get_files_uri().path
try:
def _set_data(self, code: str) -> Optional[Exception]:
if len(code.split()) > 0:
cwl = self._cwl_executor.file_manager.get_files_uri().path
self._cwl_executor.validate_input_files(yaml.load(code, Loader=yaml.Loader), cwl)
except FileNotFoundError as e:
return e
self._yaml_input_data.append(code)
self.send_response(self.iopub_socket, 'stream', {'name': 'stdout', 'text': 'Add data in memory'})
self._yaml_input_data = code
self.send_response(self.iopub_socket, 'stream', {'name': 'stdout', 'text': 'Add data in memory'})

def _clear_data(self):
self._yaml_input_data = []
self._yaml_input_data = None

def _execute_workflow(self, code) -> Optional[Exception]:
self._cwl_executor.set_data(self._yaml_input_data)
input_data = [self._yaml_input_data] if self._yaml_input_data is not None else []
self._cwl_executor.set_data(input_data)
self._cwl_executor.set_workflow(code)
logger.debug('starting executing workflow ...')
run_id, results, exception = self._cwl_executor.execute()
Expand All @@ -209,7 +315,7 @@ def _execute_workflow(self, code) -> Optional[Exception]:
for output in results:
if isinstance(results[output], list):
for i, output_i in enumerate(results[output]):
results[output][i]['id'] = f'{output}_{i+1}'
results[output][i]['id'] = f'{output}_{i + 1}'
self._results_manager.append_files(
[results[output][i]['location']],
output_directory_for_that_run,
Expand Down
Loading

0 comments on commit 4ee7b35

Please sign in to comment.