Skip to content

Commit

Permalink
Merge pull request #14 from giannisdoukas/provenance
Browse files Browse the repository at this point in the history
add execute with provenance
  • Loading branch information
giannisdoukas committed Jul 13, 2020
2 parents 65769d1 + e3526c3 commit 05ec84c
Show file tree
Hide file tree
Showing 8 changed files with 617 additions and 43 deletions.
42 changes: 35 additions & 7 deletions cwlkernel/CWLKernel.py
Expand Up @@ -2,6 +2,7 @@
import logging
import os
import re
import shutil
import traceback
from io import StringIO
from pathlib import Path
Expand All @@ -22,7 +23,7 @@
from .git.CWLGitResolver import CWLGitResolver

version = "0.0.2"

BOOT_DIRECTORY = Path(os.getcwd()).absolute()
CONF = CWLExecuteConfigurator()


Expand All @@ -43,10 +44,12 @@ class CWLKernel(Kernel):

def __init__(self, **kwargs):
super().__init__(**kwargs)
self._session_dir = os.path.join(CONF.CWLKERNEL_BOOT_DIRECTORY, self.ident)
self._boot_directory: Path = BOOT_DIRECTORY
self._yaml_input_data: Optional[str] = None
self._results_manager = ResultsManager(os.sep.join([CONF.CWLKERNEL_BOOT_DIRECTORY, self.ident, 'results']))
runtime_file_manager = IOFileManager(os.sep.join([CONF.CWLKERNEL_BOOT_DIRECTORY, self.ident, 'runtime_data']))
self._cwl_executor = CoreExecutor(runtime_file_manager)
self._results_manager = ResultsManager(os.path.join(self._session_dir, 'results'))
runtime_file_manager = IOFileManager(os.path.join(self._session_dir, 'runtime_data'))
self._cwl_executor = CoreExecutor(runtime_file_manager, self._boot_directory)
self._pid = (os.getpid(), os.getppid())
self._cwl_logger = CWLLogger(os.path.join(CONF.CWLKERNEL_BOOT_DIRECTORY, self.ident, 'logs'))
self._set_process_ids()
Expand All @@ -60,6 +63,10 @@ def __init__(self, **kwargs):
if self.log is None: # pylint: disable=access-member-before-definition
self.log = logging.getLogger()

@property
def runtime_directory(self) -> Path:
return Path(self._cwl_executor.file_manager.ROOT_DIRECTORY).absolute()

@property
def workflow_repository(self) -> WorkflowRepository:
return self._workflow_repository
Expand Down Expand Up @@ -232,12 +239,12 @@ def _preprocess_data(self, data: Dict) -> Dict:
def _clear_data(self):
self._yaml_input_data = None

def _execute_workflow(self, code_path: Path) -> Optional[Exception]:
def _execute_workflow(self, code_path: Path, provenance: bool = False) -> Optional[Exception]:
input_data = [self._yaml_input_data] if self._yaml_input_data is not None else []
self._cwl_executor.set_data(input_data)
self._cwl_executor.set_workflow_path(str(code_path))
self.log.debug('starting executing workflow ...')
run_id, results, exception = self._cwl_executor.execute()
run_id, results, exception, research_object = self._cwl_executor.execute(provenance)
self.log.debug(f'\texecution results: {run_id}, {results}, {exception}')
output_directory_for_that_run = str(run_id)
for output in results:
Expand All @@ -259,10 +266,27 @@ def _execute_workflow(self, code_path: Path) -> Optional[Exception]:
metadata=results[output]
)
self.send_json_response(results)
if research_object is not None:
self.send_text_to_stdout(f'\nProvenance stored in directory {research_object.folder}')
for path, _, files in os.walk(research_object.folder):
for name in files:
file = os.path.relpath(os.path.join(path, name), self._boot_directory.as_posix())
self.send_response(
self.iopub_socket,
'display_data',
{
'data': {
"text/html": f'<a href="/files/{file}">{file}</a>',
"text/plain": f"{file}"
},
'metadata': {},
},
)
if exception is not None:
self.log.debug(f'execution error: {exception}')
self.send_response(self.iopub_socket, 'stream', {'name': 'stderr', 'text': str(exception)})
return exception
os.chdir(self._boot_directory.as_posix())
return exception

def get_past_results(self) -> List[str]:
return self._results_manager.get_files()
Expand All @@ -283,6 +307,10 @@ def do_complete(self, code: str, cursor_pos: int):
def send_text_to_stdout(self, text: str):
self.send_response(self.iopub_socket, 'stream', {'name': 'stdout', 'text': text})

def __del__(self):
shutil.rmtree(self._session_dir, ignore_errors=True)
os.chdir(self._boot_directory.as_posix())


if __name__ == '__main__':
from ipykernel.kernelapp import IPKernelApp
Expand Down
137 changes: 122 additions & 15 deletions cwlkernel/CoreExecutor.py
@@ -1,5 +1,8 @@
import functools
import logging
import os
import sys
import tempfile
import traceback
from pathlib import Path
from subprocess import DEVNULL
Expand All @@ -8,22 +11,80 @@
List,
Optional,
Tuple,
NoReturn)
NoReturn, cast, IO, MutableMapping, MutableSequence)
from uuid import uuid4, UUID

from cwltool.context import RuntimeContext
from cwltool.context import RuntimeContext, LoadingContext
from cwltool.executors import JobExecutor
from cwltool.factory import Factory
from cwltool.load_tool import fetch_document
from cwltool.loghandler import _logger
from cwltool.main import ProvLogFormatter, prov_deps
from cwltool.process import add_sizes
from cwltool.provenance import ResearchObject
from cwltool.stdfsaccess import StdFsAccess
from cwltool.utils import CWLObjectType, visit_class
from ruamel import yaml

from .IOManager import IOFileManager


class JupyterFactory(Factory):

def __init__(self, root_directory: str,
executor: Optional[JobExecutor] = None,
loading_context: Optional[LoadingContext] = None,
runtime_context: Optional[RuntimeContext] = None, ):
super().__init__(
executor=executor,
loading_context=loading_context,
runtime_context=runtime_context,
)
self.runtime_context.outdir = root_directory
self.runtime_context.basedir = root_directory
self.runtime_context.default_stdin = DEVNULL
self.runtime_context.default_stdout = DEVNULL
self.runtime_context.default_stderr = DEVNULL


class ProvenanceFactory(JupyterFactory):

def __init__(self,
workflow_uri_path: str,
root_directory: str,
stove_provenance_directory: str,
executor: Optional[JobExecutor] = None,
loading_context: Optional[LoadingContext] = None,
runtime_context: Optional[RuntimeContext] = None) -> None:
"""Easy way to load a CWL document for execution."""
super().__init__(
root_directory=root_directory,
executor=executor,
loading_context=loading_context,
runtime_context=runtime_context
)
self.store_provenance_directory = stove_provenance_directory
self.loading_context, self.workflow_object, self.uri = fetch_document(workflow_uri_path, self.loading_context)
make_fs_access = self.runtime_context.make_fs_access \
if self.runtime_context.make_fs_access is not None else StdFsAccess
ro = ResearchObject(
make_fs_access(""),
)
self.runtime_context.research_obj = ro
log_file_io = ro.open_log_file_for_activity(ro.engine_uuid)
prov_log_handler = logging.StreamHandler(cast(IO[str], log_file_io))
prov_log_handler.setFormatter(ProvLogFormatter())
_logger.addHandler(prov_log_handler)
_logger.debug("[provenance] Logging to %s", log_file_io)


class CoreExecutor:

def __init__(self, file_manager: IOFileManager):
def __init__(self, file_manager: IOFileManager, provenance_directory: Optional[Path]):
self.file_manager = file_manager
self._workflow_path = None
self._data_paths = []
self.provenance_directory = provenance_directory if provenance_directory is not None else tempfile.mkdtemp()

def set_data(self, data: List[str]) -> List[str]:
self._data_paths = [self.file_manager.write(f'{str(uuid4())}.yml', d.encode()) for d in data]
Expand All @@ -37,20 +98,23 @@ def set_workflow_path(self, workflow_str: str) -> str:
self._workflow_path = workflow_str
return self._workflow_path

def execute(self) -> Tuple[UUID, Dict, Optional[Exception]]:
def execute(self, provenance=False) -> Tuple[UUID, Dict, Optional[Exception], Optional[ResearchObject]]:
"""
:return: Run ID, dict with new files, exception if there is any
:param provenance: Execute with provenance enabled/disabled.
:return: Run ID, dict with new files, exception if there is any.
"""
run_id = uuid4()

runtime_context = RuntimeContext()
runtime_context.outdir = self.file_manager.ROOT_DIRECTORY
runtime_context.basedir = self.file_manager.ROOT_DIRECTORY
runtime_context.default_stdin = DEVNULL
runtime_context.default_stdout = DEVNULL
runtime_context.default_stderr = DEVNULL
factory: JupyterFactory
if not provenance:
factory = JupyterFactory(self.file_manager.ROOT_DIRECTORY)
else:
provenance_dir = os.path.join(self.provenance_directory.as_posix(), 'provenance')
factory = ProvenanceFactory(
Path(self._workflow_path).as_uri(),
self.file_manager.ROOT_DIRECTORY,
provenance_dir
)
os.chdir(self.file_manager.ROOT_DIRECTORY)
factory = Factory(runtime_context=runtime_context)
executable = factory.make(self._workflow_path)
data = {}
for data_file in self._data_paths:
Expand All @@ -59,10 +123,53 @@ def execute(self) -> Tuple[UUID, Dict, Optional[Exception]]:
data = {**new_data, **data}
try:
result: Dict = executable(**data)
return run_id, result, None
e = None
except Exception as e:
traceback.print_exc(file=sys.stderr)
return run_id, {}, e
result = {}

if provenance:
self._store_provenance(cast(ProvenanceFactory, factory), result)
return run_id, result, e, factory.runtime_context.research_obj

@classmethod
def _store_provenance(cls, factory: ProvenanceFactory, out) -> None:
"""Proxy method to cwltool's logic"""
runtime_context = factory.runtime_context
loading_context = factory.loading_context
workflow_object = factory.workflow_object
uri = factory.uri

if runtime_context.research_obj is not None:
runtime_context.research_obj.create_job(out, True)

def remove_at_id(doc: CWLObjectType) -> None:
for key in list(doc.keys()):
if key == "@id":
del doc[key]
else:
value = doc[key]
if isinstance(value, MutableMapping):
remove_at_id(value)
elif isinstance(value, MutableSequence):
for entry in value:
if isinstance(entry, MutableMapping):
remove_at_id(entry)

remove_at_id(out)
visit_class(
out,
("File",),
functools.partial(add_sizes, runtime_context.make_fs_access("")),
)

research_obj = runtime_context.research_obj
if loading_context.loader is not None:
research_obj.generate_snapshot(
prov_deps(workflow_object, loading_context.loader, uri)
)

research_obj.close(factory.store_provenance_directory)

@classmethod
def validate_input_files(cls, yaml_input: Dict, cwd: Path) -> NoReturn:
Expand Down
46 changes: 42 additions & 4 deletions cwlkernel/kernel_magics.py
Expand Up @@ -2,6 +2,7 @@
import json
import os
import random
import subprocess
from io import StringIO
from pathlib import Path
from typing import List
Expand Down Expand Up @@ -101,15 +102,30 @@ def snippet(kernel: CWLKernel, command: str):
class ExecutionMagics:

@staticmethod
@CWLKernel.register_magic()
def execute(kernel: CWLKernel, execute_argument_string: str):
def _parse_args(execute_argument_string: str):
execute_argument_string = execute_argument_string.splitlines()
cwl_id = execute_argument_string[0].strip()
yaml_str_data = '\n'.join(execute_argument_string[1:])
return cwl_id, yaml_str_data

@staticmethod
def _execute(kernel: CWLKernel, execute_argument_string: str, provenance: bool = False):
cwl_id, yaml_str_data = ExecutionMagics._parse_args(execute_argument_string)
cwl_component_path: Path = kernel.workflow_repository.get_tools_path_by_id(cwl_id)
kernel._set_data('\n'.join(execute_argument_string[1:]))
kernel._execute_workflow(cwl_component_path)
kernel._set_data(yaml_str_data)
kernel._execute_workflow(cwl_component_path, provenance=provenance)
kernel._clear_data()

@staticmethod
@CWLKernel.register_magic()
def execute(kernel: CWLKernel, execute_argument_string: str):
ExecutionMagics._execute(kernel, execute_argument_string, provenance=False)

@staticmethod
@CWLKernel.register_magic('executeWithProvenance')
def execute_with_provenance(kernel: CWLKernel, execute_argument_string: str):
ExecutionMagics._execute(kernel, execute_argument_string, provenance=True)

@staticmethod
@CWLKernel.register_magics_suggester('execute')
def suggest_execution_id(query_token: str, *args, **kwargs) -> List[str]:
Expand Down Expand Up @@ -372,6 +388,28 @@ def visualize_graph(kernel: CWLKernel, tool_id: str):
)


@CWLKernel.register_magic()
def system(kernel: CWLKernel, commands: str):
"""
Execute bash commands in the Runtime Directory of the session.
@param kernel:
@param commands:
@return:
"""
result = subprocess.run(
commands,
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
shell=True, cwd=kernel.runtime_directory.as_posix()
)
stdout = result.stdout.decode()
stderr = result.stderr.decode()
if len(stdout) > 0:
kernel.send_text_to_stdout(stdout)
if len(stderr) > 0:
kernel.send_error_response(stderr)


# import user's magic commands

if CWLKernel_CONF.CWLKERNEL_MAGIC_COMMANDS_DIRECTORY is not None:
Expand Down

0 comments on commit 05ec84c

Please sign in to comment.