Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add execute with provenance #14

Merged
merged 10 commits into from
Jul 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 35 additions & 7 deletions cwlkernel/CWLKernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import os
import re
import shutil
import traceback
from io import StringIO
from pathlib import Path
Expand All @@ -22,7 +23,7 @@
from .git.CWLGitResolver import CWLGitResolver

version = "0.0.2"

BOOT_DIRECTORY = Path(os.getcwd()).absolute()
CONF = CWLExecuteConfigurator()


Expand All @@ -43,10 +44,12 @@ class CWLKernel(Kernel):

def __init__(self, **kwargs):
super().__init__(**kwargs)
self._session_dir = os.path.join(CONF.CWLKERNEL_BOOT_DIRECTORY, self.ident)
self._boot_directory: Path = BOOT_DIRECTORY
self._yaml_input_data: Optional[str] = None
self._results_manager = ResultsManager(os.sep.join([CONF.CWLKERNEL_BOOT_DIRECTORY, self.ident, 'results']))
runtime_file_manager = IOFileManager(os.sep.join([CONF.CWLKERNEL_BOOT_DIRECTORY, self.ident, 'runtime_data']))
self._cwl_executor = CoreExecutor(runtime_file_manager)
self._results_manager = ResultsManager(os.path.join(self._session_dir, 'results'))
runtime_file_manager = IOFileManager(os.path.join(self._session_dir, 'runtime_data'))
self._cwl_executor = CoreExecutor(runtime_file_manager, self._boot_directory)
self._pid = (os.getpid(), os.getppid())
self._cwl_logger = CWLLogger(os.path.join(CONF.CWLKERNEL_BOOT_DIRECTORY, self.ident, 'logs'))
self._set_process_ids()
Expand All @@ -60,6 +63,10 @@ def __init__(self, **kwargs):
if self.log is None: # pylint: disable=access-member-before-definition
self.log = logging.getLogger()

@property
def runtime_directory(self) -> Path:
return Path(self._cwl_executor.file_manager.ROOT_DIRECTORY).absolute()

@property
def workflow_repository(self) -> WorkflowRepository:
return self._workflow_repository
Expand Down Expand Up @@ -232,12 +239,12 @@ def _preprocess_data(self, data: Dict) -> Dict:
def _clear_data(self):
self._yaml_input_data = None

def _execute_workflow(self, code_path: Path) -> Optional[Exception]:
def _execute_workflow(self, code_path: Path, provenance: bool = False) -> Optional[Exception]:
input_data = [self._yaml_input_data] if self._yaml_input_data is not None else []
self._cwl_executor.set_data(input_data)
self._cwl_executor.set_workflow_path(str(code_path))
self.log.debug('starting executing workflow ...')
run_id, results, exception = self._cwl_executor.execute()
run_id, results, exception, research_object = self._cwl_executor.execute(provenance)
self.log.debug(f'\texecution results: {run_id}, {results}, {exception}')
output_directory_for_that_run = str(run_id)
for output in results:
Expand All @@ -259,10 +266,27 @@ def _execute_workflow(self, code_path: Path) -> Optional[Exception]:
metadata=results[output]
)
self.send_json_response(results)
if research_object is not None:
self.send_text_to_stdout(f'\nProvenance stored in directory {research_object.folder}')
for path, _, files in os.walk(research_object.folder):
for name in files:
file = os.path.relpath(os.path.join(path, name), self._boot_directory.as_posix())
self.send_response(
self.iopub_socket,
'display_data',
{
'data': {
"text/html": f'<a href="/files/{file}">{file}</a>',
"text/plain": f"{file}"
},
'metadata': {},
},
)
if exception is not None:
self.log.debug(f'execution error: {exception}')
self.send_response(self.iopub_socket, 'stream', {'name': 'stderr', 'text': str(exception)})
return exception
os.chdir(self._boot_directory.as_posix())
return exception

def get_past_results(self) -> List[str]:
return self._results_manager.get_files()
Expand All @@ -283,6 +307,10 @@ def do_complete(self, code: str, cursor_pos: int):
def send_text_to_stdout(self, text: str):
self.send_response(self.iopub_socket, 'stream', {'name': 'stdout', 'text': text})

def __del__(self):
shutil.rmtree(self._session_dir, ignore_errors=True)
os.chdir(self._boot_directory.as_posix())


if __name__ == '__main__':
from ipykernel.kernelapp import IPKernelApp
Expand Down
137 changes: 122 additions & 15 deletions cwlkernel/CoreExecutor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import functools
import logging
import os
import sys
import tempfile
import traceback
from pathlib import Path
from subprocess import DEVNULL
Expand All @@ -8,22 +11,80 @@
List,
Optional,
Tuple,
NoReturn)
NoReturn, cast, IO, MutableMapping, MutableSequence)
from uuid import uuid4, UUID

from cwltool.context import RuntimeContext
from cwltool.context import RuntimeContext, LoadingContext
from cwltool.executors import JobExecutor
from cwltool.factory import Factory
from cwltool.load_tool import fetch_document
from cwltool.loghandler import _logger
from cwltool.main import ProvLogFormatter, prov_deps
from cwltool.process import add_sizes
from cwltool.provenance import ResearchObject
from cwltool.stdfsaccess import StdFsAccess
from cwltool.utils import CWLObjectType, visit_class
from ruamel import yaml

from .IOManager import IOFileManager


class JupyterFactory(Factory):

def __init__(self, root_directory: str,
executor: Optional[JobExecutor] = None,
loading_context: Optional[LoadingContext] = None,
runtime_context: Optional[RuntimeContext] = None, ):
super().__init__(
executor=executor,
loading_context=loading_context,
runtime_context=runtime_context,
)
self.runtime_context.outdir = root_directory
self.runtime_context.basedir = root_directory
self.runtime_context.default_stdin = DEVNULL
self.runtime_context.default_stdout = DEVNULL
self.runtime_context.default_stderr = DEVNULL


class ProvenanceFactory(JupyterFactory):

def __init__(self,
workflow_uri_path: str,
root_directory: str,
stove_provenance_directory: str,
executor: Optional[JobExecutor] = None,
loading_context: Optional[LoadingContext] = None,
runtime_context: Optional[RuntimeContext] = None) -> None:
"""Easy way to load a CWL document for execution."""
super().__init__(
root_directory=root_directory,
executor=executor,
loading_context=loading_context,
runtime_context=runtime_context
)
self.store_provenance_directory = stove_provenance_directory
self.loading_context, self.workflow_object, self.uri = fetch_document(workflow_uri_path, self.loading_context)
make_fs_access = self.runtime_context.make_fs_access \
if self.runtime_context.make_fs_access is not None else StdFsAccess
ro = ResearchObject(
make_fs_access(""),
)
self.runtime_context.research_obj = ro
log_file_io = ro.open_log_file_for_activity(ro.engine_uuid)
prov_log_handler = logging.StreamHandler(cast(IO[str], log_file_io))
prov_log_handler.setFormatter(ProvLogFormatter())
_logger.addHandler(prov_log_handler)
_logger.debug("[provenance] Logging to %s", log_file_io)


class CoreExecutor:

def __init__(self, file_manager: IOFileManager):
def __init__(self, file_manager: IOFileManager, provenance_directory: Optional[Path]):
self.file_manager = file_manager
self._workflow_path = None
self._data_paths = []
self.provenance_directory = provenance_directory if provenance_directory is not None else tempfile.mkdtemp()

def set_data(self, data: List[str]) -> List[str]:
self._data_paths = [self.file_manager.write(f'{str(uuid4())}.yml', d.encode()) for d in data]
Expand All @@ -37,20 +98,23 @@ def set_workflow_path(self, workflow_str: str) -> str:
self._workflow_path = workflow_str
return self._workflow_path

def execute(self) -> Tuple[UUID, Dict, Optional[Exception]]:
def execute(self, provenance=False) -> Tuple[UUID, Dict, Optional[Exception], Optional[ResearchObject]]:
"""
:return: Run ID, dict with new files, exception if there is any
:param provenance: Execute with provenance enabled/disabled.
:return: Run ID, dict with new files, exception if there is any.
"""
run_id = uuid4()

runtime_context = RuntimeContext()
runtime_context.outdir = self.file_manager.ROOT_DIRECTORY
runtime_context.basedir = self.file_manager.ROOT_DIRECTORY
runtime_context.default_stdin = DEVNULL
runtime_context.default_stdout = DEVNULL
runtime_context.default_stderr = DEVNULL
factory: JupyterFactory
if not provenance:
factory = JupyterFactory(self.file_manager.ROOT_DIRECTORY)
else:
provenance_dir = os.path.join(self.provenance_directory.as_posix(), 'provenance')
factory = ProvenanceFactory(
Path(self._workflow_path).as_uri(),
self.file_manager.ROOT_DIRECTORY,
provenance_dir
)
os.chdir(self.file_manager.ROOT_DIRECTORY)
factory = Factory(runtime_context=runtime_context)
executable = factory.make(self._workflow_path)
data = {}
for data_file in self._data_paths:
Expand All @@ -59,10 +123,53 @@ def execute(self) -> Tuple[UUID, Dict, Optional[Exception]]:
data = {**new_data, **data}
try:
result: Dict = executable(**data)
return run_id, result, None
e = None
except Exception as e:
traceback.print_exc(file=sys.stderr)
return run_id, {}, e
result = {}

if provenance:
self._store_provenance(cast(ProvenanceFactory, factory), result)
return run_id, result, e, factory.runtime_context.research_obj

@classmethod
def _store_provenance(cls, factory: ProvenanceFactory, out) -> None:
"""Proxy method to cwltool's logic"""
runtime_context = factory.runtime_context
loading_context = factory.loading_context
workflow_object = factory.workflow_object
uri = factory.uri

if runtime_context.research_obj is not None:
runtime_context.research_obj.create_job(out, True)

def remove_at_id(doc: CWLObjectType) -> None:
for key in list(doc.keys()):
if key == "@id":
del doc[key]
else:
value = doc[key]
if isinstance(value, MutableMapping):
remove_at_id(value)
elif isinstance(value, MutableSequence):
for entry in value:
if isinstance(entry, MutableMapping):
remove_at_id(entry)

remove_at_id(out)
visit_class(
out,
("File",),
functools.partial(add_sizes, runtime_context.make_fs_access("")),
)

research_obj = runtime_context.research_obj
if loading_context.loader is not None:
research_obj.generate_snapshot(
prov_deps(workflow_object, loading_context.loader, uri)
)

research_obj.close(factory.store_provenance_directory)

@classmethod
def validate_input_files(cls, yaml_input: Dict, cwd: Path) -> NoReturn:
Expand Down
46 changes: 42 additions & 4 deletions cwlkernel/kernel_magics.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
import os
import random
import subprocess
from io import StringIO
from pathlib import Path
from typing import List
Expand Down Expand Up @@ -101,15 +102,30 @@ def snippet(kernel: CWLKernel, command: str):
class ExecutionMagics:

@staticmethod
@CWLKernel.register_magic()
def execute(kernel: CWLKernel, execute_argument_string: str):
def _parse_args(execute_argument_string: str):
execute_argument_string = execute_argument_string.splitlines()
cwl_id = execute_argument_string[0].strip()
yaml_str_data = '\n'.join(execute_argument_string[1:])
return cwl_id, yaml_str_data

@staticmethod
def _execute(kernel: CWLKernel, execute_argument_string: str, provenance: bool = False):
cwl_id, yaml_str_data = ExecutionMagics._parse_args(execute_argument_string)
cwl_component_path: Path = kernel.workflow_repository.get_tools_path_by_id(cwl_id)
kernel._set_data('\n'.join(execute_argument_string[1:]))
kernel._execute_workflow(cwl_component_path)
kernel._set_data(yaml_str_data)
kernel._execute_workflow(cwl_component_path, provenance=provenance)
kernel._clear_data()

@staticmethod
@CWLKernel.register_magic()
def execute(kernel: CWLKernel, execute_argument_string: str):
ExecutionMagics._execute(kernel, execute_argument_string, provenance=False)

@staticmethod
@CWLKernel.register_magic('executeWithProvenance')
def execute_with_provenance(kernel: CWLKernel, execute_argument_string: str):
ExecutionMagics._execute(kernel, execute_argument_string, provenance=True)

@staticmethod
@CWLKernel.register_magics_suggester('execute')
def suggest_execution_id(query_token: str, *args, **kwargs) -> List[str]:
Expand Down Expand Up @@ -372,6 +388,28 @@ def visualize_graph(kernel: CWLKernel, tool_id: str):
)


@CWLKernel.register_magic()
def system(kernel: CWLKernel, commands: str):
"""
Execute bash commands in the Runtime Directory of the session.

@param kernel:
@param commands:
@return:
"""
result = subprocess.run(
commands,
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
shell=True, cwd=kernel.runtime_directory.as_posix()
)
stdout = result.stdout.decode()
stderr = result.stderr.decode()
if len(stdout) > 0:
kernel.send_text_to_stdout(stdout)
if len(stderr) > 0:
kernel.send_error_response(stderr)


# import user's magic commands

if CWLKernel_CONF.CWLKERNEL_MAGIC_COMMANDS_DIRECTORY is not None:
Expand Down
Loading