Skip to content

Commit

Permalink
Merge pull request #9 from giannisdoukas/dev
Browse files Browse the repository at this point in the history
Enable linking data between steps
  • Loading branch information
giannisdoukas committed Jul 1, 2020
2 parents 2092f66 + dba30a6 commit 11f5ef7
Show file tree
Hide file tree
Showing 25 changed files with 719 additions and 173 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
.idea/

/tmp.*
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
Expand Down
60 changes: 60 additions & 0 deletions cwlkernel/AutoCompleteEngine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from typing import Dict, Iterable, Optional

from pygtrie import CharTrie


class AutoCompleteEngine:
"""
AutoCompleteEngine generates suggestions given a users input.
"""

def __init__(self, magic_commands: Optional[Iterable]):
self._commands_trie = CharTrie()
if magic_commands is not None:
for magic in magic_commands:
self.add_magic_command(magic)

def suggest(self, code: str, cursor_pos: int) -> Dict:
"""
@param code: string contains the current state of the user's input. It could be a CWL file
or magic commands.
@param cursor_pos: current position of cursor
@return: {'matches': ['MATCH1', 'MATCH1'],
'cursor_end': ,
'cursor_start': , }
"""
cursor_end, cursor_start, token = self._parse(code, cursor_pos)
if token == '%':
token = ''
try:
matches = list(set(self._commands_trie.values(prefix=token)))
matches.sort(key=len)
except KeyError:
matches = []
cursor_end = cursor_pos
cursor_start = cursor_pos
return {
'matches': matches,
'cursor_end': cursor_end,
'cursor_start': cursor_start
}

@classmethod
def _parse(cls, code, cursor_pos):
code_length = len(code)
token_ends_at = code.find(" ", cursor_pos)
cursor_end = min(token_ends_at + 1, code_length - 1)
if token_ends_at == -1:
token_ends_at = code_length - 1
cursor_end = code_length
token_starts_at = code.rfind(" ", 0, cursor_pos)
cursor_start = token_starts_at + 1
if token_starts_at == -1:
token_starts_at = 0
cursor_start = cursor_pos
token = code[token_starts_at:token_ends_at + 1].strip().upper()
return cursor_end, cursor_start, token

def add_magic_command(self, magic_command: str):
for i in range(1, len(magic_command) + 1):
self._commands_trie[magic_command[-i:].upper()] = magic_command
4 changes: 3 additions & 1 deletion cwlkernel/CWLExecuteConfigurator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
class CWLExecuteConfigurator:
CWLKERNEL_MODE: str
CWLKERNEL_BOOT_DIRECTORY: str
CWLKERNEL_MAGIC_COMMANDS_DIRECTORY: str

# property "Name of the property": ("default", validator)
properties: Dict[str, Tuple[str, Callable]] = {
'CWLKERNEL_MODE': ('SIMPLE', lambda value: value.upper() in {'SIMPLE'}), # no case sensitive
'CWLKERNEL_BOOT_DIRECTORY': ('/tmp/CWLKERNEL_DATA', lambda value: True)
'CWLKERNEL_BOOT_DIRECTORY': ('/tmp/CWLKERNEL_DATA', lambda value: True),
'CWLKERNEL_MAGIC_COMMANDS_DIRECTORY': (None, lambda value: value is None or os.path.isdir(value))
}

def __init__(self):
Expand Down
82 changes: 62 additions & 20 deletions cwlkernel/CWLKernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,34 @@
import os
import re
import traceback
from io import StringIO
from pathlib import Path
from typing import List, Dict, Optional, Tuple, Union, Callable, NoReturn

from ipykernel.kernelbase import Kernel
from ruamel import yaml
from ruamel.yaml import YAML

from .AutoCompleteEngine import AutoCompleteEngine
from .CWLBuilder import CWLSnippetBuilder
from .CWLExecuteConfigurator import CWLExecuteConfigurator
from .CWLLogger import CWLLogger
from .CoreExecutor import CoreExecutor
from .IOManager import IOFileManager
from .IOManager import IOFileManager, ResultsManager
from .cwlrepository.CWLComponent import WorkflowComponentFactory, CWLWorkflow
from .cwlrepository.cwlrepository import WorkflowRepository
from .git.CWLGitResolver import CWLGitResolver

logger = logging.Logger('CWLKernel')
version = "0.0.2"

CONF = CWLExecuteConfigurator()


class CWLKernel(Kernel):
"""Jupyter Notebook kernel for CWL."""
implementation = 'CWLKernel'
implementation_version = '0.1'
language_version = '1.0'
implementation_version = version
language_version = '1.1'
language_info = {
'name': 'yaml',
'mimetype': 'text/x-cwl',
Expand All @@ -35,28 +39,31 @@ class CWLKernel(Kernel):
banner = "Common Workflow Language"

_magic_commands: Dict = {}
_auto_complete_engine = AutoCompleteEngine(_magic_commands)

def __init__(self, **kwargs):
super().__init__(**kwargs)
conf = CWLExecuteConfigurator()
self._yaml_input_data: Optional[str] = None
self._results_manager = IOFileManager(os.sep.join([conf.CWLKERNEL_BOOT_DIRECTORY, self.ident, 'results']))
runtime_file_manager = IOFileManager(os.sep.join([conf.CWLKERNEL_BOOT_DIRECTORY, self.ident, 'runtime_data']))
self._results_manager = ResultsManager(os.sep.join([CONF.CWLKERNEL_BOOT_DIRECTORY, self.ident, 'results']))
runtime_file_manager = IOFileManager(os.sep.join([CONF.CWLKERNEL_BOOT_DIRECTORY, self.ident, 'runtime_data']))
self._cwl_executor = CoreExecutor(runtime_file_manager)
self._pid = (os.getpid(), os.getppid())
self._cwl_logger = CWLLogger(os.path.join(conf.CWLKERNEL_BOOT_DIRECTORY, self.ident, 'logs'))
self._cwl_logger = CWLLogger(os.path.join(CONF.CWLKERNEL_BOOT_DIRECTORY, self.ident, 'logs'))
self._set_process_ids()
self._cwl_logger.save()
self._workflow_repository = WorkflowRepository(
Path(os.sep.join([conf.CWLKERNEL_BOOT_DIRECTORY, self.ident, 'repo'])))
Path(os.sep.join([CONF.CWLKERNEL_BOOT_DIRECTORY, self.ident, 'repo'])))
self._snippet_builder = CWLSnippetBuilder()
self._workflow_composer: Optional[CWLWorkflow] = None
self._github_resolver: CWLGitResolver = CWLGitResolver(
Path(os.sep.join([conf.CWLKERNEL_BOOT_DIRECTORY, self.ident, 'git'])))

@staticmethod
def register_magic(magic: Callable):
CWLKernel._magic_commands[magic.__name__] = magic
Path(os.sep.join([CONF.CWLKERNEL_BOOT_DIRECTORY, self.ident, 'git'])))
if self.log is None: # pylint: disable=access-member-before-definition
self.log = logging.getLogger()

@classmethod
def register_magic(cls, magic: Callable):
cls._magic_commands[magic.__name__] = magic
cls._auto_complete_engine.add_magic_command(magic.__name__)
return magic

def _set_process_ids(self):
Expand Down Expand Up @@ -149,21 +156,47 @@ def _send_json_response(self, json_data: Union[Dict, List]):

def _set_data(self, code: str) -> NoReturn:
if len(code.split()) > 0:
cwl = self._cwl_executor.file_manager.get_files_uri().path
self._cwl_executor.validate_input_files(yaml.load(code, Loader=yaml.Loader), cwl)
self._yaml_input_data = code
cwd = Path(self._cwl_executor.file_manager.get_files_uri().path)
data = self._preprocess_data(yaml.load(code, Loader=yaml.Loader))
self._cwl_executor.validate_input_files(data, cwd)
code_stream = StringIO()
yaml.safe_dump(data, code_stream)
self._yaml_input_data = code_stream.getvalue()
self.send_response(self.iopub_socket, 'stream', {'name': 'stdout', 'text': 'Add data in memory'})

def _preprocess_data(self, data: Dict) -> Dict:
"""
On the execution the user can reference the data id of a file instead of the actual path. That function
apply that logic
@param data: the actual data
@return the data after the transformation
"""

has_change = False
for key_id in data:
if isinstance(data[key_id], dict) and \
'class' in data[key_id] and \
data[key_id]['class'] == 'File' and \
'$data' in data[key_id]:
has_change = True
data[key_id]['location'] = self._results_manager.get_last_result_by_id(data[key_id]["$data"])
data[key_id].pop('$data')
if has_change is True:
self.send_text_to_stdout('set data to:\n')
self._send_json_response(data)
return data

def _clear_data(self):
self._yaml_input_data = None

def _execute_workflow(self, code_path: Path) -> Optional[Exception]:
input_data = [self._yaml_input_data] if self._yaml_input_data is not None else []
self._cwl_executor.set_data(input_data)
self._cwl_executor.set_workflow_path(str(code_path))
logger.debug('starting executing workflow ...')
self.log.debug('starting executing workflow ...')
run_id, results, exception = self._cwl_executor.execute()
logger.debug(f'\texecution results: {run_id}, {results}, {exception}')
self.log.debug(f'\texecution results: {run_id}, {results}, {exception}')
output_directory_for_that_run = str(run_id)
for output in results:
if isinstance(results[output], list):
Expand All @@ -185,7 +218,7 @@ def _execute_workflow(self, code_path: Path) -> Optional[Exception]:
)
self._send_json_response(results)
if exception is not None:
logger.debug(f'execution error: {exception}')
self.log.debug(f'execution error: {exception}')
self.send_response(self.iopub_socket, 'stream', {'name': 'stderr', 'text': str(exception)})
return exception

Expand All @@ -199,6 +232,15 @@ def get_pid(self) -> Tuple[int, int]:
""":return: The process id and his parents id."""
return self._pid

def do_complete(self, code: str, cursor_pos: int):
self.log.debug(f"code: {code}\ncursor_pos: {cursor_pos}\ncode[{cursor_pos}]=XXX")
suggestions = self._auto_complete_engine.suggest(code, cursor_pos)
self.log.debug(f'suggestions: {suggestions["matches"]}')
return {**suggestions, 'status': 'ok'}

def send_text_to_stdout(self, text: str):
self.send_response(self.iopub_socket, 'stream', {'name': 'stdout', 'text': text})


if __name__ == '__main__':
from ipykernel.kernelapp import IPKernelApp
Expand Down
1 change: 0 additions & 1 deletion cwlkernel/CoreExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ def execute(self) -> Tuple[UUID, Dict, Optional[Exception]]:
def validate_input_files(cls, yaml_input: Dict, cwd: Path) -> NoReturn:
for arg in yaml_input:
if isinstance(yaml_input[arg], dict) and 'class' in yaml_input[arg] and yaml_input[arg]['class'] == 'File':
# TODO: check about path vs location
selector = 'location' if 'location' in yaml_input[arg] else 'path'
file_path = Path(yaml_input[arg][selector])
if not file_path.is_absolute():
Expand Down
17 changes: 17 additions & 0 deletions cwlkernel/IOManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,20 @@ def clear(self):
for f in os.listdir(self.ROOT_DIRECTORY):
if os.path.isfile(f):
os.remove(f)


class ResultsManager(IOFileManager):

def get_last_result_by_id(self, result_id: str) -> Optional[str]:
"""
The results manager may have multiple results with the same id, from multiple executions. That function will
return the path of the last result
@return: the path of last result with the requested id or None
"""
results = sorted(
filter(lambda item: item[1]['id'] == result_id, self.get_files_registry().items()),
key=lambda item: item[1]['result_counter']
)
if len(results) == 0:
return None
return results[-1][0]
3 changes: 2 additions & 1 deletion cwlkernel/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# noinspection PyUnresolvedReferences
from . import kernel_magics # NOQA
from .CWLKernel import version

__version__ = "0.0.2"
__version__ = version
Loading

0 comments on commit 11f5ef7

Please sign in to comment.