From b1b3905473087e4e021419e13794c9a9eeefc220 Mon Sep 17 00:00:00 2001 From: Giannis Doukas Date: Mon, 13 Jul 2020 13:35:16 +0100 Subject: [PATCH] add jupyter factory --- cwlkernel/CWLKernel.py | 23 ++- cwlkernel/CoreExecutor.py | 142 ++++++++++++++---- cwlkernel/kernel_magics.py | 23 ++- examples/introduction.ipynb | 291 +++++++++++++++++++++++++++++++++++- tests/test_CWLKernel.py | 39 +++++ tests/test_CoreExecutor.py | 5 +- 6 files changed, 485 insertions(+), 38 deletions(-) diff --git a/cwlkernel/CWLKernel.py b/cwlkernel/CWLKernel.py index adb297c..0583167 100644 --- a/cwlkernel/CWLKernel.py +++ b/cwlkernel/CWLKernel.py @@ -43,10 +43,11 @@ class CWLKernel(Kernel): def __init__(self, **kwargs): super().__init__(**kwargs) + self._boot_directory: Path = Path(os.getcwd()).absolute() self._yaml_input_data: Optional[str] = None self._results_manager = ResultsManager(os.sep.join([CONF.CWLKERNEL_BOOT_DIRECTORY, self.ident, 'results'])) runtime_file_manager = IOFileManager(os.sep.join([CONF.CWLKERNEL_BOOT_DIRECTORY, self.ident, 'runtime_data'])) - self._cwl_executor = CoreExecutor(runtime_file_manager) + self._cwl_executor = CoreExecutor(runtime_file_manager, self._boot_directory) self._pid = (os.getpid(), os.getppid()) self._cwl_logger = CWLLogger(os.path.join(CONF.CWLKERNEL_BOOT_DIRECTORY, self.ident, 'logs')) self._set_process_ids() @@ -236,12 +237,12 @@ def _preprocess_data(self, data: Dict) -> Dict: def _clear_data(self): self._yaml_input_data = None - def _execute_workflow(self, code_path: Path) -> Optional[Exception]: + def _execute_workflow(self, code_path: Path, provenance: bool = False) -> Optional[Exception]: input_data = [self._yaml_input_data] if self._yaml_input_data is not None else [] self._cwl_executor.set_data(input_data) self._cwl_executor.set_workflow_path(str(code_path)) self.log.debug('starting executing workflow ...') - run_id, results, exception = self._cwl_executor.execute() + run_id, results, exception, research_object = self._cwl_executor.execute(provenance) self.log.debug(f'\texecution results: {run_id}, {results}, {exception}') output_directory_for_that_run = str(run_id) for output in results: @@ -263,6 +264,22 @@ def _execute_workflow(self, code_path: Path) -> Optional[Exception]: metadata=results[output] ) self.send_json_response(results) + if research_object is not None: + self.send_text_to_stdout(f'\nProvenance stored in directory {research_object.folder}') + for path, _, files in os.walk(research_object.folder): + for name in files: + file = os.path.relpath(os.path.join(path, name), self._boot_directory.as_posix()) + self.send_response( + self.iopub_socket, + 'display_data', + { + 'data': { + "text/html": f'{file}', + "text/plain": f"full path...." + }, + 'metadata': {}, + }, + ) if exception is not None: self.log.debug(f'execution error: {exception}') self.send_response(self.iopub_socket, 'stream', {'name': 'stderr', 'text': str(exception)}) diff --git a/cwlkernel/CoreExecutor.py b/cwlkernel/CoreExecutor.py index d6603d6..01414cb 100644 --- a/cwlkernel/CoreExecutor.py +++ b/cwlkernel/CoreExecutor.py @@ -1,5 +1,8 @@ +import functools +import logging import os import sys +import tempfile import traceback from pathlib import Path from subprocess import DEVNULL @@ -8,23 +11,80 @@ List, Optional, Tuple, - NoReturn) + NoReturn, cast, IO, MutableMapping, MutableSequence) from uuid import uuid4, UUID from cwltool.context import RuntimeContext, LoadingContext from cwltool.executors import JobExecutor from cwltool.factory import Factory +from cwltool.load_tool import fetch_document +from cwltool.loghandler import _logger +from cwltool.main import ProvLogFormatter, prov_deps +from cwltool.process import add_sizes +from cwltool.provenance import ResearchObject +from cwltool.stdfsaccess import StdFsAccess +from cwltool.utils import CWLObjectType, visit_class from ruamel import yaml from .IOManager import IOFileManager +class JupyterFactory(Factory): + + def __init__(self, root_directory: str, + executor: Optional[JobExecutor] = None, + loading_context: Optional[LoadingContext] = None, + runtime_context: Optional[RuntimeContext] = None, ): + super().__init__( + executor=executor, + loading_context=loading_context, + runtime_context=runtime_context, + ) + self.runtime_context.outdir = root_directory + self.runtime_context.basedir = root_directory + self.runtime_context.default_stdin = DEVNULL + self.runtime_context.default_stdout = DEVNULL + self.runtime_context.default_stderr = DEVNULL + + +class ProvenanceFactory(JupyterFactory): + + def __init__(self, + workflow_uri_path: str, + root_directory: str, + stove_provenance_directory: str, + executor: Optional[JobExecutor] = None, + loading_context: Optional[LoadingContext] = None, + runtime_context: Optional[RuntimeContext] = None) -> None: + """Easy way to load a CWL document for execution.""" + super().__init__( + root_directory=root_directory, + executor=executor, + loading_context=loading_context, + runtime_context=runtime_context + ) + self.store_provenance_directory = stove_provenance_directory + self.loading_context, self.workflow_object, self.uri = fetch_document(workflow_uri_path, self.loading_context) + make_fs_access = self.runtime_context.make_fs_access \ + if self.runtime_context.make_fs_access is not None else StdFsAccess + ro = ResearchObject( + make_fs_access(""), + ) + self.runtime_context.research_obj = ro + log_file_io = ro.open_log_file_for_activity(ro.engine_uuid) + prov_log_handler = logging.StreamHandler(cast(IO[str], log_file_io)) + prov_log_handler.setFormatter(ProvLogFormatter()) + _logger.addHandler(prov_log_handler) + _logger.debug("[provenance] Logging to %s", log_file_io) + + class CoreExecutor: - def __init__(self, file_manager: IOFileManager): + def __init__(self, file_manager: IOFileManager, provenance_directory: Optional[Path]): self.file_manager = file_manager self._workflow_path = None self._data_paths = [] + self.provenance_directory = provenance_directory if provenance_directory is not None else tempfile.mkdtemp() def set_data(self, data: List[str]) -> List[str]: self._data_paths = [self.file_manager.write(f'{str(uuid4())}.yml', d.encode()) for d in data] @@ -38,14 +98,23 @@ def set_workflow_path(self, workflow_str: str) -> str: self._workflow_path = workflow_str return self._workflow_path - def execute(self, provenance=False) -> Tuple[UUID, Dict, Optional[Exception]]: + def execute(self, provenance=False) -> Tuple[UUID, Dict, Optional[Exception], Optional[ResearchObject]]: """ :param provenance: Execute with provenance enabled/disabled. :return: Run ID, dict with new files, exception if there is any. """ run_id = uuid4() + factory: JupyterFactory + if not provenance: - factory = JupyterFactory(self.file_manager.ROOT_DIRECTORY) + factory = JupyterFactory(self.file_manager.ROOT_DIRECTORY) + else: + provenance_dir = os.path.join(self.provenance_directory.as_posix(), 'provenance') + factory = ProvenanceFactory( + Path(self._workflow_path).as_uri(), + self.file_manager.ROOT_DIRECTORY, + provenance_dir + ) os.chdir(self.file_manager.ROOT_DIRECTORY) executable = factory.make(self._workflow_path) data = {} @@ -55,10 +124,52 @@ def execute(self, provenance=False) -> Tuple[UUID, Dict, Optional[Exception]]: data = {**new_data, **data} try: result: Dict = executable(**data) - return run_id, result, None + e = None except Exception as e: traceback.print_exc(file=sys.stderr) - return run_id, {}, e + result = {} + + if provenance: + self._store_provenance(factory, result) + return run_id, result, e, factory.runtime_context.research_obj + + def _store_provenance(self, factory: ProvenanceFactory, out) -> Path: + """Proxy method to cwltool's logic""" + runtime_context = factory.runtime_context + loading_context = factory.loading_context + workflow_object = factory.workflow_object + uri = factory.uri + + if runtime_context.research_obj is not None: + runtime_context.research_obj.create_job(out, True) + + def remove_at_id(doc: CWLObjectType) -> None: + for key in list(doc.keys()): + if key == "@id": + del doc[key] + else: + value = doc[key] + if isinstance(value, MutableMapping): + remove_at_id(value) + elif isinstance(value, MutableSequence): + for entry in value: + if isinstance(entry, MutableMapping): + remove_at_id(entry) + + remove_at_id(out) + visit_class( + out, + ("File",), + functools.partial(add_sizes, runtime_context.make_fs_access("")), + ) + + research_obj = runtime_context.research_obj + if loading_context.loader is not None: + research_obj.generate_snapshot( + prov_deps(workflow_object, loading_context.loader, uri) + ) + + research_obj.close(factory.store_provenance_directory) @classmethod def validate_input_files(cls, yaml_input: Dict, cwd: Path) -> NoReturn: @@ -70,22 +181,3 @@ def validate_input_files(cls, yaml_input: Dict, cwd: Path) -> NoReturn: file_path = cwd / file_path if not file_path.exists(): raise FileNotFoundError(file_path) - - -class JupyterFactory(Factory): - - def __init__(self, root_directory: str, - executor: Optional[JobExecutor] = None, - loading_context: Optional[LoadingContext] = None, - runtime_context: Optional[RuntimeContext] = None, ): - runtime_context = runtime_context if runtime_context is not None else RuntimeContext() - runtime_context.outdir = root_directory - runtime_context.basedir = root_directory - runtime_context.default_stdin = DEVNULL - runtime_context.default_stdout = DEVNULL - runtime_context.default_stderr = DEVNULL - super().__init__( - executor=executor, - loading_context=loading_context, - runtime_context=runtime_context, - ) diff --git a/cwlkernel/kernel_magics.py b/cwlkernel/kernel_magics.py index f6c199a..1bf1592 100644 --- a/cwlkernel/kernel_magics.py +++ b/cwlkernel/kernel_magics.py @@ -102,15 +102,30 @@ def snippet(kernel: CWLKernel, command: str): class ExecutionMagics: @staticmethod - @CWLKernel.register_magic() - def execute(kernel: CWLKernel, execute_argument_string: str): + def _parse_args(execute_argument_string: str): execute_argument_string = execute_argument_string.splitlines() cwl_id = execute_argument_string[0].strip() + yaml_str_data = '\n'.join(execute_argument_string[1:]) + return cwl_id, yaml_str_data + + @staticmethod + def _execute(kernel: CWLKernel, execute_argument_string: str, provenance: bool = False): + cwl_id, yaml_str_data = ExecutionMagics._parse_args(execute_argument_string) cwl_component_path: Path = kernel.workflow_repository.get_tools_path_by_id(cwl_id) - kernel._set_data('\n'.join(execute_argument_string[1:])) - kernel._execute_workflow(cwl_component_path) + kernel._set_data(yaml_str_data) + kernel._execute_workflow(cwl_component_path, provenance=provenance) kernel._clear_data() + @staticmethod + @CWLKernel.register_magic() + def execute(kernel: CWLKernel, execute_argument_string: str): + ExecutionMagics._execute(kernel, execute_argument_string, provenance=False) + + @staticmethod + @CWLKernel.register_magic('executeWithProvenance') + def execute_with_provenance(kernel: CWLKernel, execute_argument_string: str): + ExecutionMagics._execute(kernel, execute_argument_string, provenance=True) + @staticmethod @CWLKernel.register_magics_suggester('execute') def suggest_execution_id(query_token: str, *args, **kwargs) -> List[str]: diff --git a/examples/introduction.ipynb b/examples/introduction.ipynb index 43e1819..ade298f 100644 --- a/examples/introduction.ipynb +++ b/examples/introduction.ipynb @@ -89,20 +89,20 @@ "data": { "application/json": { "example_output": { - "basename": "962e7e02e9c8b4ebb94b0fff3f46694014d0cc79", + "basename": "dea13d39e960dfdad075c55ea61ed207dcada01d", "checksum": "sha1$47a013e660d408619d894b20806b1d5086aab03b", "class": "File", "http://commonwl.org/cwltool#generation": 0, "id": "example_output", - "location": "file:///private/tmp/CWLKERNEL_DATA/6ba2c2e0-1b1d-470a-88a2-a9737aefa60c/runtime_data/962e7e02e9c8b4ebb94b0fff3f46694014d0cc79", + "location": "file:///private/tmp/CWLKERNEL_DATA/2bd19feb-6520-446d-be42-54c64fde5cd9/runtime_data/dea13d39e960dfdad075c55ea61ed207dcada01d", "nameext": "", - "nameroot": "962e7e02e9c8b4ebb94b0fff3f46694014d0cc79", + "nameroot": "dea13d39e960dfdad075c55ea61ed207dcada01d", "result_counter": 0, "size": 13 } }, "text/plain": [ - "{\"example_output\": {\"location\": \"file:///private/tmp/CWLKERNEL_DATA/6ba2c2e0-1b1d-470a-88a2-a9737aefa60c/runtime_data/962e7e02e9c8b4ebb94b0fff3f46694014d0cc79\", \"basename\": \"962e7e02e9c8b4ebb94b0fff3f46694014d0cc79\", \"nameroot\": \"962e7e02e9c8b4ebb94b0fff3f46694014d0cc79\", \"nameext\": \"\", \"class\": \"File\", \"checksum\": \"sha1$47a013e660d408619d894b20806b1d5086aab03b\", \"size\": 13, \"http://commonwl.org/cwltool#generation\": 0, \"id\": \"example_output\", \"result_counter\": 0}}" + "{\"example_output\": {\"location\": \"file:///private/tmp/CWLKERNEL_DATA/2bd19feb-6520-446d-be42-54c64fde5cd9/runtime_data/dea13d39e960dfdad075c55ea61ed207dcada01d\", \"basename\": \"dea13d39e960dfdad075c55ea61ed207dcada01d\", \"nameroot\": \"dea13d39e960dfdad075c55ea61ed207dcada01d\", \"nameext\": \"\", \"class\": \"File\", \"checksum\": \"sha1$47a013e660d408619d894b20806b1d5086aab03b\", \"size\": 13, \"http://commonwl.org/cwltool#generation\": 0, \"id\": \"example_output\", \"result_counter\": 0}}" ] }, "metadata": { @@ -119,6 +119,289 @@ "message: Hello world!" ] }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Add data in memory" + ] + }, + { + "data": { + "application/json": { + "example_output": { + "basename": "07d090896bde77c6d2de664ba9d9e32dd23c4133", + "checksum": "sha1$47a013e660d408619d894b20806b1d5086aab03b", + "class": "File", + "http://commonwl.org/cwltool#generation": 0, + "id": "example_output", + "location": "file:///private/tmp/CWLKERNEL_DATA/3991bf77-e2c9-41a9-ad5a-028113e1d2a3/runtime_data/07d090896bde77c6d2de664ba9d9e32dd23c4133", + "nameext": "", + "nameroot": "07d090896bde77c6d2de664ba9d9e32dd23c4133", + "result_counter": 0, + "size": 13 + } + }, + "text/plain": [ + "{\"example_output\": {\"location\": \"file:///private/tmp/CWLKERNEL_DATA/3991bf77-e2c9-41a9-ad5a-028113e1d2a3/runtime_data/07d090896bde77c6d2de664ba9d9e32dd23c4133\", \"basename\": \"07d090896bde77c6d2de664ba9d9e32dd23c4133\", \"nameroot\": \"07d090896bde77c6d2de664ba9d9e32dd23c4133\", \"nameext\": \"\", \"class\": \"File\", \"checksum\": \"sha1$47a013e660d408619d894b20806b1d5086aab03b\", \"size\": 13, \"http://commonwl.org/cwltool#generation\": 0, \"id\": \"example_output\", \"result_counter\": 0}}" + ] + }, + "metadata": { + "application/json": { + "expanded": false, + "root": "root" + } + }, + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "Provenance Stores in directory /Users/dks/Workspaces/CWLKernel/examples/provenance\n", + ": " + ] + }, + { + "data": { + "text/html": [ + "provenance/bagit.txt" + ], + "text/plain": [ + "full path...." + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "provenance/bag-info.txt" + ], + "text/plain": [ + "full path...." + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "provenance/manifest-sha1.txt" + ], + "text/plain": [ + "full path...." + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "provenance/tagmanifest-sha256.txt" + ], + "text/plain": [ + "full path...." + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "provenance/tagmanifest-sha1.txt" + ], + "text/plain": [ + "full path...." + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "provenance/tagmanifest-sha512.txt" + ], + "text/plain": [ + "full path...." + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "provenance/snapshot/echo.cwl" + ], + "text/plain": [ + "full path...." + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "provenance/workflow/primary-job.json" + ], + "text/plain": [ + "full path...." + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "provenance/workflow/primary-output.json" + ], + "text/plain": [ + "full path...." + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "provenance/data/d3/d3486ae9136e7856bc42212385ea797094475802" + ], + "text/plain": [ + "full path...." + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "provenance/data/47/47a013e660d408619d894b20806b1d5086aab03b" + ], + "text/plain": [ + "full path...." + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "provenance/metadata/manifest.json" + ], + "text/plain": [ + "full path...." + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "provenance/metadata/logs/engine.6b9569de-e707-489e-afa7-518292f93e87.txt" + ], + "text/plain": [ + "full path...." + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "provenance/metadata/provenance/primary.cwlprov.jsonld" + ], + "text/plain": [ + "full path...." + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "provenance/metadata/provenance/primary.cwlprov.json" + ], + "text/plain": [ + "full path...." + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "provenance/metadata/provenance/primary.cwlprov.xml" + ], + "text/plain": [ + "full path...." + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "provenance/metadata/provenance/primary.cwlprov.nt" + ], + "text/plain": [ + "full path...." + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "provenance/metadata/provenance/primary.cwlprov.ttl" + ], + "text/plain": [ + "full path...." + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "provenance/metadata/provenance/primary.cwlprov.provn" + ], + "text/plain": [ + "full path...." + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "% executeWithProvenance echo\n", + "message: Hello world!" + ] + }, { "cell_type": "code", "execution_count": 3, diff --git a/tests/test_CWLKernel.py b/tests/test_CWLKernel.py index 7aea44a..3dda24a 100644 --- a/tests/test_CWLKernel.py +++ b/tests/test_CWLKernel.py @@ -1,6 +1,7 @@ import json import logging import os +import shutil import tarfile import tempfile import unittest @@ -957,6 +958,44 @@ def test_system_magic_command(self): responses[-1][0][2]['name'], ) + def test_execute_with_provenance(self): + kernel = CWLKernel() + # cancel send_response + responses = [] + kernel.send_response = lambda *args, **kwargs: responses.append((args, kwargs)) + + yaml = YAML(typ='safe') + + with open(os.sep.join([self.cwl_directory, 'echo.cwl'])) as f: + workflow_str = f.read() + self.assertDictEqual( + {'status': 'ok', 'execution_count': 0, 'payload': [], 'user_expressions': {}}, + kernel.do_execute(workflow_str, False) + ) + self.assertIsNotNone(kernel._workflow_repository.get_by_id(yaml.load(workflow_str)['id'])) + + with open(os.sep.join([self.data_directory, 'echo-job.yml'])) as f: + data = '\n'.join(["% executeWithProvenance echo", f.read()]) + + self.assertDictEqual( + {'status': 'ok', 'execution_count': 0, 'payload': [], 'user_expressions': {}}, + kernel.do_execute(data) + ) + + provenance_directory = list(filter( + lambda r: 'text' in r[0][2] and "Provenance stored in directory" in r[0][2]['text'], + responses + ))[0][0][2]['text'].split()[-1] + print(provenance_directory) + self.assertTrue(os.path.isdir(provenance_directory)) + self.assertTrue(os.path.isdir(os.path.join(provenance_directory, 'data'))) + self.assertTrue(os.path.isdir(os.path.join(provenance_directory, 'metadata'))) + self.assertTrue(os.path.isdir(os.path.join(provenance_directory, 'snapshot'))) + self.assertTrue(os.path.isfile(os.path.join(provenance_directory, 'snapshot', 'echo.cwl'))) + self.assertTrue(os.path.isdir(os.path.join(provenance_directory, 'workflow'))) + shutil.rmtree(provenance_directory) + + if __name__ == '__main__': unittest.main() diff --git a/tests/test_CoreExecutor.py b/tests/test_CoreExecutor.py index 97ee450..d77c2a5 100644 --- a/tests/test_CoreExecutor.py +++ b/tests/test_CoreExecutor.py @@ -22,17 +22,18 @@ def setUpClass(cls) -> None: def test_executor_execute(self): # That tests fails only when is called through PyCharm file_manager = IOFileManager(self.kernel_root_directory) - executor = CoreExecutor(file_manager) + executor = CoreExecutor(file_manager, None) workflow_path = os.sep.join([self.cwl_directory, 'essential_input.cwl']) executor.set_workflow_path(workflow_path) with open(os.sep.join([self.data_directory, 'essential_input_data1.yml'])) as f: data_str = f.read() executor.set_data([data_str]) try: - execution_id, new_files, exception = executor.execute() + execution_id, new_files, exception, research_object = executor.execute() self.assertIsNotNone(execution_id) self.assertDictEqual(new_files, {}) self.assertIsNone(exception, 'An exception occurred while executing workflow') + self.assertIsNone(research_object, 'Research object must be none when provenance is disabled') except Exception: self.fail("execution failed")