Skip to content

Commit

Permalink
add jupyter factory
Browse files Browse the repository at this point in the history
  • Loading branch information
giannisdoukas committed Jul 13, 2020
1 parent 0502dac commit b1b3905
Show file tree
Hide file tree
Showing 6 changed files with 485 additions and 38 deletions.
23 changes: 20 additions & 3 deletions cwlkernel/CWLKernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@ class CWLKernel(Kernel):

def __init__(self, **kwargs):
super().__init__(**kwargs)
self._boot_directory: Path = Path(os.getcwd()).absolute()
self._yaml_input_data: Optional[str] = None
self._results_manager = ResultsManager(os.sep.join([CONF.CWLKERNEL_BOOT_DIRECTORY, self.ident, 'results']))
runtime_file_manager = IOFileManager(os.sep.join([CONF.CWLKERNEL_BOOT_DIRECTORY, self.ident, 'runtime_data']))
self._cwl_executor = CoreExecutor(runtime_file_manager)
self._cwl_executor = CoreExecutor(runtime_file_manager, self._boot_directory)
self._pid = (os.getpid(), os.getppid())
self._cwl_logger = CWLLogger(os.path.join(CONF.CWLKERNEL_BOOT_DIRECTORY, self.ident, 'logs'))
self._set_process_ids()
Expand Down Expand Up @@ -236,12 +237,12 @@ def _preprocess_data(self, data: Dict) -> Dict:
def _clear_data(self):
self._yaml_input_data = None

def _execute_workflow(self, code_path: Path) -> Optional[Exception]:
def _execute_workflow(self, code_path: Path, provenance: bool = False) -> Optional[Exception]:
input_data = [self._yaml_input_data] if self._yaml_input_data is not None else []
self._cwl_executor.set_data(input_data)
self._cwl_executor.set_workflow_path(str(code_path))
self.log.debug('starting executing workflow ...')
run_id, results, exception = self._cwl_executor.execute()
run_id, results, exception, research_object = self._cwl_executor.execute(provenance)
self.log.debug(f'\texecution results: {run_id}, {results}, {exception}')
output_directory_for_that_run = str(run_id)
for output in results:
Expand All @@ -263,6 +264,22 @@ def _execute_workflow(self, code_path: Path) -> Optional[Exception]:
metadata=results[output]
)
self.send_json_response(results)
if research_object is not None:
self.send_text_to_stdout(f'\nProvenance stored in directory {research_object.folder}')
for path, _, files in os.walk(research_object.folder):
for name in files:
file = os.path.relpath(os.path.join(path, name), self._boot_directory.as_posix())
self.send_response(
self.iopub_socket,
'display_data',
{
'data': {
"text/html": f'<a href="/files/{file}">{file}</a>',
"text/plain": f"full path...."
},
'metadata': {},
},
)
if exception is not None:
self.log.debug(f'execution error: {exception}')
self.send_response(self.iopub_socket, 'stream', {'name': 'stderr', 'text': str(exception)})
Expand Down
142 changes: 117 additions & 25 deletions cwlkernel/CoreExecutor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import functools
import logging
import os
import sys
import tempfile
import traceback
from pathlib import Path
from subprocess import DEVNULL
Expand All @@ -8,23 +11,80 @@
List,
Optional,
Tuple,
NoReturn)
NoReturn, cast, IO, MutableMapping, MutableSequence)
from uuid import uuid4, UUID

from cwltool.context import RuntimeContext, LoadingContext
from cwltool.executors import JobExecutor
from cwltool.factory import Factory
from cwltool.load_tool import fetch_document
from cwltool.loghandler import _logger
from cwltool.main import ProvLogFormatter, prov_deps
from cwltool.process import add_sizes
from cwltool.provenance import ResearchObject
from cwltool.stdfsaccess import StdFsAccess
from cwltool.utils import CWLObjectType, visit_class
from ruamel import yaml

from .IOManager import IOFileManager


class JupyterFactory(Factory):

def __init__(self, root_directory: str,
executor: Optional[JobExecutor] = None,
loading_context: Optional[LoadingContext] = None,
runtime_context: Optional[RuntimeContext] = None, ):
super().__init__(
executor=executor,
loading_context=loading_context,
runtime_context=runtime_context,
)
self.runtime_context.outdir = root_directory
self.runtime_context.basedir = root_directory
self.runtime_context.default_stdin = DEVNULL
self.runtime_context.default_stdout = DEVNULL
self.runtime_context.default_stderr = DEVNULL


class ProvenanceFactory(JupyterFactory):

def __init__(self,
workflow_uri_path: str,
root_directory: str,
stove_provenance_directory: str,
executor: Optional[JobExecutor] = None,
loading_context: Optional[LoadingContext] = None,
runtime_context: Optional[RuntimeContext] = None) -> None:
"""Easy way to load a CWL document for execution."""
super().__init__(
root_directory=root_directory,
executor=executor,
loading_context=loading_context,
runtime_context=runtime_context
)
self.store_provenance_directory = stove_provenance_directory
self.loading_context, self.workflow_object, self.uri = fetch_document(workflow_uri_path, self.loading_context)
make_fs_access = self.runtime_context.make_fs_access \
if self.runtime_context.make_fs_access is not None else StdFsAccess
ro = ResearchObject(
make_fs_access(""),
)
self.runtime_context.research_obj = ro
log_file_io = ro.open_log_file_for_activity(ro.engine_uuid)
prov_log_handler = logging.StreamHandler(cast(IO[str], log_file_io))
prov_log_handler.setFormatter(ProvLogFormatter())
_logger.addHandler(prov_log_handler)
_logger.debug("[provenance] Logging to %s", log_file_io)


class CoreExecutor:

def __init__(self, file_manager: IOFileManager):
def __init__(self, file_manager: IOFileManager, provenance_directory: Optional[Path]):
self.file_manager = file_manager
self._workflow_path = None
self._data_paths = []
self.provenance_directory = provenance_directory if provenance_directory is not None else tempfile.mkdtemp()

def set_data(self, data: List[str]) -> List[str]:
self._data_paths = [self.file_manager.write(f'{str(uuid4())}.yml', d.encode()) for d in data]
Expand All @@ -38,14 +98,23 @@ def set_workflow_path(self, workflow_str: str) -> str:
self._workflow_path = workflow_str
return self._workflow_path

def execute(self, provenance=False) -> Tuple[UUID, Dict, Optional[Exception]]:
def execute(self, provenance=False) -> Tuple[UUID, Dict, Optional[Exception], Optional[ResearchObject]]:
"""
:param provenance: Execute with provenance enabled/disabled.
:return: Run ID, dict with new files, exception if there is any.
"""
run_id = uuid4()
factory: JupyterFactory
if not provenance:

factory = JupyterFactory(self.file_manager.ROOT_DIRECTORY)
factory = JupyterFactory(self.file_manager.ROOT_DIRECTORY)
else:
provenance_dir = os.path.join(self.provenance_directory.as_posix(), 'provenance')
factory = ProvenanceFactory(
Path(self._workflow_path).as_uri(),
self.file_manager.ROOT_DIRECTORY,
provenance_dir
)
os.chdir(self.file_manager.ROOT_DIRECTORY)
executable = factory.make(self._workflow_path)
data = {}
Expand All @@ -55,10 +124,52 @@ def execute(self, provenance=False) -> Tuple[UUID, Dict, Optional[Exception]]:
data = {**new_data, **data}
try:
result: Dict = executable(**data)
return run_id, result, None
e = None
except Exception as e:
traceback.print_exc(file=sys.stderr)
return run_id, {}, e
result = {}

if provenance:
self._store_provenance(factory, result)
return run_id, result, e, factory.runtime_context.research_obj

def _store_provenance(self, factory: ProvenanceFactory, out) -> Path:
"""Proxy method to cwltool's logic"""
runtime_context = factory.runtime_context
loading_context = factory.loading_context
workflow_object = factory.workflow_object
uri = factory.uri

if runtime_context.research_obj is not None:
runtime_context.research_obj.create_job(out, True)

def remove_at_id(doc: CWLObjectType) -> None:
for key in list(doc.keys()):
if key == "@id":
del doc[key]
else:
value = doc[key]
if isinstance(value, MutableMapping):
remove_at_id(value)
elif isinstance(value, MutableSequence):
for entry in value:
if isinstance(entry, MutableMapping):
remove_at_id(entry)

remove_at_id(out)
visit_class(
out,
("File",),
functools.partial(add_sizes, runtime_context.make_fs_access("")),
)

research_obj = runtime_context.research_obj
if loading_context.loader is not None:
research_obj.generate_snapshot(
prov_deps(workflow_object, loading_context.loader, uri)
)

research_obj.close(factory.store_provenance_directory)

@classmethod
def validate_input_files(cls, yaml_input: Dict, cwd: Path) -> NoReturn:
Expand All @@ -70,22 +181,3 @@ def validate_input_files(cls, yaml_input: Dict, cwd: Path) -> NoReturn:
file_path = cwd / file_path
if not file_path.exists():
raise FileNotFoundError(file_path)


class JupyterFactory(Factory):

def __init__(self, root_directory: str,
executor: Optional[JobExecutor] = None,
loading_context: Optional[LoadingContext] = None,
runtime_context: Optional[RuntimeContext] = None, ):
runtime_context = runtime_context if runtime_context is not None else RuntimeContext()
runtime_context.outdir = root_directory
runtime_context.basedir = root_directory
runtime_context.default_stdin = DEVNULL
runtime_context.default_stdout = DEVNULL
runtime_context.default_stderr = DEVNULL
super().__init__(
executor=executor,
loading_context=loading_context,
runtime_context=runtime_context,
)
23 changes: 19 additions & 4 deletions cwlkernel/kernel_magics.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,30 @@ def snippet(kernel: CWLKernel, command: str):
class ExecutionMagics:

@staticmethod
@CWLKernel.register_magic()
def execute(kernel: CWLKernel, execute_argument_string: str):
def _parse_args(execute_argument_string: str):
execute_argument_string = execute_argument_string.splitlines()
cwl_id = execute_argument_string[0].strip()
yaml_str_data = '\n'.join(execute_argument_string[1:])
return cwl_id, yaml_str_data

@staticmethod
def _execute(kernel: CWLKernel, execute_argument_string: str, provenance: bool = False):
cwl_id, yaml_str_data = ExecutionMagics._parse_args(execute_argument_string)
cwl_component_path: Path = kernel.workflow_repository.get_tools_path_by_id(cwl_id)
kernel._set_data('\n'.join(execute_argument_string[1:]))
kernel._execute_workflow(cwl_component_path)
kernel._set_data(yaml_str_data)
kernel._execute_workflow(cwl_component_path, provenance=provenance)
kernel._clear_data()

@staticmethod
@CWLKernel.register_magic()
def execute(kernel: CWLKernel, execute_argument_string: str):
ExecutionMagics._execute(kernel, execute_argument_string, provenance=False)

@staticmethod
@CWLKernel.register_magic('executeWithProvenance')
def execute_with_provenance(kernel: CWLKernel, execute_argument_string: str):
ExecutionMagics._execute(kernel, execute_argument_string, provenance=True)

@staticmethod
@CWLKernel.register_magics_suggester('execute')
def suggest_execution_id(query_token: str, *args, **kwargs) -> List[str]:
Expand Down
Loading

0 comments on commit b1b3905

Please sign in to comment.