diff --git a/cwlkernel/CoreExecutor.py b/cwlkernel/CoreExecutor.py
index ac4b27f..63b1f9e 100644
--- a/cwlkernel/CoreExecutor.py
+++ b/cwlkernel/CoreExecutor.py
@@ -1,58 +1,19 @@
-from io import StringIO
-import argparse
-import functools
-import io
-import logging
 import os
 import sys
-from codecs import StreamWriter, getwriter
+import traceback
 from io import StringIO
 from pathlib import Path
 from typing import (
-    IO,
-    Any,
-    Callable,
     Dict,
     List,
-    MutableMapping,
-    MutableSequence,
     Optional,
-    TextIO,
     Tuple,
-    Union,
-    cast,
     NoReturn)
-from urllib.parse import ParseResult
 from uuid import uuid4, UUID
 
-import coloredlogs
-from cwltool import command_line_tool
-from cwltool.argparser import arg_parser, get_default_args
-from cwltool.context import LoadingContext, RuntimeContext, getdefault
-from cwltool.cwlrdf import printdot, printrdf
-from cwltool.errors import WorkflowException, UnsupportedRequirement
-from cwltool.executors import JobExecutor, MultithreadedJobExecutor, SingleJobExecutor
-from cwltool.load_tool import resolve_tool_uri, load_overrides, fetch_document, resolve_and_validate_document, make_tool
-from cwltool.loghandler import _logger, defaultStreamHandler
-from cwltool.main import configure_logging, supported_cwl_versions, setup_schema, \
-    setup_provenance, setup_loadingContext, load_job_order, printdeps, print_pack, make_template, choose_target, \
-    init_job_order, prov_deps, check_working_directories, find_default_container
-from cwltool.mutation import MutationManager
-from cwltool.process import (
-    add_sizes,
-    shortname,
-)
-from cwltool.procgenerator import ProcessGenerator
-from cwltool.resolver import ga4gh_tool_registries
-from cwltool.secrets import SecretStore
-from cwltool.software_requirements import DependenciesConfiguration
-from cwltool.stdfsaccess import StdFsAccess
-from cwltool.utils import versionstring, onWindows, windows_default_container_id, DEFAULT_TMP_PREFIX, visit_class
-from ruamel.yaml.comments import CommentedMap
-from schema_salad import validate
-from schema_salad.ref_resolver import file_uri, uri_file_path
-from schema_salad.sourceline import strip_dup_lineno
-from schema_salad.utils import json_dumps
+from cwltool.context import RuntimeContext
+from cwltool.factory import Factory
+from ruamel import yaml
 
 from .IOManager import IOFileManager
 
@@ -81,436 +42,36 @@ def execute(self) -> Tuple[UUID, List[str], StringIO, StringIO, Optional[Excepti
         :return: Run ID, List of new files, stdout, stderr, exception if there is any
         """
         run_id = uuid4()
-        args = [self._workflow_path, *self._data_paths]
+        # args = [self._workflow_path, *self._data_paths]
         stdout = StringIO()
         stderr = StringIO()
+        runtime_context = RuntimeContext()
+        runtime_context.outdir = self.file_manager.ROOT_DIRECTORY
+        runtime_context.basedir = self.file_manager.ROOT_DIRECTORY
+        os.chdir(self.file_manager.ROOT_DIRECTORY)
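+        # cwltool's Factory API replaces the vendored copy of cwltool's main():
+        # Factory.make() loads and validates the CWL document and returns a
+        # callable that runs it, with the job order passed as keyword arguments.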
+        factory = Factory(runtime_context=runtime_context)
+        executable = factory.make(self._workflow_path)
+        data = {}
+        for data_file in self._data_paths:
+            with open(data_file) as f:
+                new_data = yaml.load(f, Loader=yaml.Loader)
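+                # note: on duplicate keys, earlier job files take precedence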
+                data = {**new_data, **data}
         try:
-            created_files = self._cwltool_main(argsl=args, stderr=stderr, stdout=stdout)
+            result = executable(**data)
+            created_files = [f['location'] for f in result.values()]
             return run_id, created_files, stdout, stderr, None
         except Exception as e:
+            traceback.print_exc(file=sys.stderr)
             return run_id, [], stdout, stderr, e
 
-    @classmethod
-    def _check_workflow_file(cls, args):
-        if not args.workflow:
-            if os.path.isfile("CWLFile"):
-                setattr(args, "workflow", "CWLFile")
-            else:
-                _logger.error("CWL document required, no input file was provided")
-                arg_parser().print_help()
-                raise RuntimeError("CWL document required, no input file was provided")
-
     @classmethod
     def validate_input_files(cls, yaml_input: Dict, cwd: Path) -> NoReturn:
         for arg in yaml_input:
             if isinstance(yaml_input[arg], dict) and 'class' in yaml_input[arg] and yaml_input[arg]['class'] == 'File':
-                file_path = Path(yaml_input[arg]['path'])
+                # TODO: check about path vs location
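+                # (a CWL File names a 'location' URI; 'path' is the local
+                # filesystem path the runner derives from it when staging)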
+                file_path = Path(yaml_input[arg]['location'])
                 if not file_path.is_absolute():
                     file_path = cwd / file_path
                 if not file_path.exists():
                     raise FileNotFoundError(file_path)
-
-    def _cwltool_main(self,
-                      argsl: Optional[List[str]] = None,
-                      stdin: IO[Any] = sys.stdin,
-                      stdout: Optional[Union[TextIO, StreamWriter]] = None,
-                      stderr: IO[Any] = sys.stderr,
-                      logger_handler: Optional[logging.Handler] = None,
-                      loading_context: Optional[LoadingContext] = None,
-                      runtime_context: Optional[RuntimeContext] = None,
-                      input_required: bool = True,
-                      ) -> List[str]:
-        """
-
-        :param argsl:
-        :param stdin:
-        :param stdout:
-        :param stderr:
-        :param logger_handler:
-        :param loading_context:
-        :param runtime_context:
-        :param input_required:
-        :return: A list of the paths of created files by the workflow
-        """
-        stdout = self._force_utf8_to_stream(stdout)
-
-        stderr_handler = self._init_cwl_logger(logger_handler, stderr)
-
-        workflowobj = None
-        prov_log_handler = None  # type: Optional[logging.StreamHandler]
-        try:
-            args, argsl = self._parse_cwl_options(argsl)
-            runtime_context = self._init_runtime_context(argsl, args, runtime_context)
-            configure_logging(args, stderr_handler, runtime_context)
-            _logger.info(versionstring())
-            self._check_workflow_file(args)
-
-            setup_schema(args, None)
-
-            loading_context = setup_loadingContext(loading_context, runtime_context, args)
-
-            uri, tool_file_uri = resolve_tool_uri(
-                args.workflow,
-                resolver=loading_context.resolver,
-                fetcher_constructor=loading_context.fetcher_constructor,
-            )
-
-            try_again_msg = (
-                "" if args.debug else ", try again with --debug for more information"
-            )
-
-            try:
-                job_order_object, input_basedir, jobloader = load_job_order(
-                    args,
-                    stdin,
-                    loading_context.fetcher_constructor,
-                    loading_context.overrides_list,
-                    tool_file_uri,
-                )
-
-                if args.overrides:
-                    loading_context.overrides_list.extend(
-                        load_overrides(
-                            file_uri(os.path.abspath(args.overrides)), tool_file_uri
-                        )
-                    )
-
-                loading_context, workflowobj, uri = fetch_document(uri, loading_context)
-
-                loading_context, uri = resolve_and_validate_document(
-                    loading_context,
-                    workflowobj,
-                    uri,
-                    preprocess_only=(args.print_pre or args.pack),
-                    skip_schemas=args.skip_schemas,
-                )
-
-                if loading_context.loader is None:
-                    raise Exception("Impossible code path.")
-                processobj, metadata = loading_context.loader.resolve_ref(uri)
-                processobj = cast(CommentedMap, processobj)
-
-                if args.provenance and runtime_context.research_obj:
-                    # Can't really be combined with args.pack at same time
-                    runtime_context.research_obj.packed_workflow(
-                        print_pack(loading_context.loader, processobj, uri, metadata)
-                    )
-
-                tool = make_tool(uri, loading_context)
-
-            except (validate.ValidationException) as exc:
-                _logger.error(
-                    "Tool definition failed validation:\n%s", str(exc), exc_info=args.debug
-                )
-                raise exc
-            except (RuntimeError, WorkflowException) as exc:
-                _logger.error(
-                    "Tool definition failed initialization:\n%s",
-                    str(exc),
-                    exc_info=args.debug,
-                )
-                raise exc
-            except Exception as exc:
-                _logger.error(
-                    "I'm sorry, I couldn't load this CWL file%s.\nThe error was: %s",
-                    try_again_msg,
-                    str(exc) if not args.debug else "",
-                    exc_info=args.debug,
-                )
-                raise exc
-
-            if isinstance(tool, int):
-                return tool
-
-            self._set_runtime_tmp_directories(runtime_context)
-
-            if args.cachedir:
-                if args.move_outputs == "move":
-                    runtime_context.move_outputs = "copy"
-                runtime_context.tmp_outdir_prefix = args.cachedir
-
-            runtime_context.secret_store = getdefault(
-                runtime_context.secret_store, SecretStore()
-            )
-            runtime_context.make_fs_access = getdefault(
-                runtime_context.make_fs_access, StdFsAccess
-            )
-
-            real_executor = self._init_job_executor(args, runtime_context)
-
-            try:
-                runtime_context.basedir = input_basedir
-
-                if isinstance(tool, ProcessGenerator):
-                    tfjob_order = {}  # type: MutableMapping[str, Any]
-                    if loading_context.jobdefaults:
-                        tfjob_order.update(loading_context.jobdefaults)
-                    if job_order_object:
-                        tfjob_order.update(job_order_object)
-                    tfout, tfstatus = real_executor(
-                        tool.embedded_tool, tfjob_order, runtime_context
-                    )
-                    if tfstatus != "success":
-                        raise WorkflowException(
-                            "ProcessGenerator failed to generate workflow"
-                        )
-                    tool, job_order_object = tool.result(tfjob_order, tfout, runtime_context)
-                    if not job_order_object:
-                        job_order_object = None
-
-                initialized_job_order_object = self._init_job_order(args, input_basedir, input_required,
-                                                                    job_order_object, jobloader, runtime_context,
-                                                                    stdout,
-                                                                    tool)
-
-                conf_file = getattr(
-                    args, "beta_dependency_resolvers_configuration", None
-                )  # str
-                use_conda_dependencies = getattr(
-                    args, "beta_conda_dependencies", None
-                )  # str
-
-                if conf_file or use_conda_dependencies:
-                    runtime_context.job_script_provider = DependenciesConfiguration(args)
-                else:
-                    runtime_context.find_default_container = functools.partial(
-                        find_default_container,
-                        default_container=runtime_context.default_container,
-                        use_biocontainers=args.beta_use_biocontainers,
-                    )
-
-                (out, status) = real_executor(
-                    tool, initialized_job_order_object, runtime_context, logger=_logger
-                )
-                _logger.info(f'real executor out:\t{out}')
-                if out is not None:
-                    if runtime_context.research_obj is not None:
-                        runtime_context.research_obj.create_job(out, None, True)
-
-                    def remove_at_id(doc: MutableMapping[str, Any]) -> None:
-                        for key in list(doc.keys()):
-                            if key == "@id":
-                                del doc[key]
-                            else:
-                                value = doc[key]
-                                if isinstance(value, MutableMapping):
-                                    remove_at_id(value)
-                                elif isinstance(value, MutableSequence):
-                                    for entry in value:
-                                        if isinstance(entry, MutableMapping):
-                                            remove_at_id(entry)
-
-                    remove_at_id(out)
-                    visit_class(
-                        out,
-                        ("File",),
-                        functools.partial(add_sizes, runtime_context.make_fs_access("")),
-                    )
-
-                    def loc_to_path(obj):  # type: (Dict[str, Any]) -> None
-                        for field in ("path", "nameext", "nameroot", "dirname"):
-                            if field in obj:
-                                del obj[field]
-                        if obj["location"].startswith("file://"):
-                            obj["path"] = uri_file_path(obj["location"])
-
-                    visit_class(out, ("File", "Directory"), loc_to_path)
-
-                    # Unsetting the Generation from final output object
-                    visit_class(out, ("File",), MutationManager().unset_generation)
-
-                    if isinstance(out, str):
-                        stdout.write(out)
-                    else:
-                        stdout.write(json_dumps(out, indent=4, ensure_ascii=False))
-                    stdout.write("\n")
-                    if hasattr(stdout, "flush"):
-                        stdout.flush()
-
-                if status != "success":
-                    _logger.warning("Final process status is %s", status)
-                    raise RuntimeError("Final process status is %s", status)
-
-                _logger.info("Final process status is %s", status)
-                return [output_binding['location'] for output_binding in out.values()]
-
-            except (validate.ValidationException) as exc:
-                _logger.error(
-                    "Input object failed validation:\n%s", str(exc), exc_info=args.debug
-                )
-                raise exc
-            except UnsupportedRequirement as exc:
-                _logger.error(
-                    "Workflow or tool uses unsupported feature:\n%s",
-                    str(exc),
-                    exc_info=args.debug,
-                )
-                raise exc
-            except WorkflowException as exc:
-                _logger.error(
-                    "Workflow error%s:\n%s",
-                    try_again_msg,
-                    strip_dup_lineno(str(exc)),
-                    exc_info=args.debug,
-                )
-                raise exc
-            except Exception as exc:  # pylint: disable=broad-except
-                _logger.error(
-                    "Unhandled error%s:\n %s",
-                    try_again_msg,
-                    str(exc),
-                    exc_info=args.debug,
-                )
-                raise exc
-
-        finally:
-            if (
-                args
-                and runtime_context
-                and runtime_context.research_obj
-                and workflowobj
-                and loading_context
-            ):
-                research_obj = runtime_context.research_obj
-                if loading_context.loader is not None:
-                    research_obj.generate_snapshot(
-                        prov_deps(workflowobj, loading_context.loader, uri)
-                    )
-                else:
-                    _logger.warning(
-                        "Unable to generate provenance snapshot "
-                        " due to missing loadingContext.loader."
-                    )
-                if prov_log_handler is not None:
-                    # Stop logging so we won't half-log adding ourself to RO
-                    _logger.debug(
-                        "[provenance] Closing provenance log file %s", prov_log_handler
-                    )
-                    _logger.removeHandler(prov_log_handler)
-                    # Ensure last log lines are written out
-                    prov_log_handler.flush()
-                    # Underlying WritableBagFile will add the tagfile to the manifest
-                    prov_log_handler.stream.close()
-                    prov_log_handler.close()
-                research_obj.close(args.provenance)
-
-            _logger.removeHandler(stderr_handler)
-            _logger.addHandler(defaultStreamHandler)
-
-    @classmethod
-    def _init_job_order(cls, args, input_basedir, input_required, job_order_object, jobloader, runtime_context, stdout,
-                        tool):
-        try:
-            initialized_job_order_object = init_job_order(
-                job_order_object,
-                args,
-                tool,
-                jobloader,
-                stdout,
-                print_input_deps=args.print_input_deps,
-                relative_deps=args.relative_deps,
-                make_fs_access=runtime_context.make_fs_access,
-                input_basedir=input_basedir,
-                secret_store=runtime_context.secret_store,
-                input_required=input_required,
-            )
-        except SystemExit as err:
-            raise RuntimeError("cannot init job order: " + str(err))
-
-        del args.workflow
-        del args.job_order
-        return initialized_job_order_object
-
-    @classmethod
-    def _set_runtime_tmp_directories(cls, runtimeContext):
-        # If on MacOS platform, TMPDIR must be set to be under one of the
-        # shared volumes in Docker for Mac
-        # More info: https://dockstore.org/docs/faq
-        if sys.platform == "darwin":
-            default_mac_path = "/private/tmp/docker_tmp"
-            if runtimeContext.tmp_outdir_prefix == DEFAULT_TMP_PREFIX:
-                runtimeContext.tmp_outdir_prefix = default_mac_path
-            if runtimeContext.tmpdir_prefix == DEFAULT_TMP_PREFIX:
-                runtimeContext.tmpdir_prefix = default_mac_path
-        if check_working_directories(runtimeContext) is not None:
-            raise RuntimeError("Failed to check working directories for runtime context")
-
-    @classmethod
-    def _init_job_executor(cls, args, runtimeContext):
-        if args.parallel:
-            temp_executor = MultithreadedJobExecutor()
-            runtimeContext.select_resources = temp_executor.select_resources
-            real_executor = temp_executor  # type: JobExecutor
-        else:
-            real_executor = SingleJobExecutor()
-        return real_executor
-
-    @classmethod
-    def _init_runtime_context(cls, argsl, args, runtime_context):
-        runtime_context = RuntimeContext(vars(args)) if runtime_context is None else runtime_context.copy()
-        # If on Windows platform, a default Docker Container is used if not
-        # explicitely provided by user
-        if onWindows() and not runtime_context.default_container:
-            # This docker image is a minimal alpine image with bash installed
-            # (size 6 mb). source: https://github.com/frol/docker-alpine-bash
-            runtime_context.default_container = windows_default_container_id
-
-        if args.provenance:
-            if argsl is None:
-                raise Exception("argsl cannot be None")
-            if setup_provenance(args, argsl, runtime_context) is not None:
-                return 1
-
-        return runtime_context
-
-    def _parse_cwl_options(self, argsl):
-        if argsl is None:
-            argsl = sys.argv[1:]
-        addl = []  # type: List[str]
-        if "CWLTOOL_OPTIONS" in os.environ:
-            addl = os.environ["CWLTOOL_OPTIONS"].split(" ")
-        args = arg_parser().parse_args(addl + ["--outdir", self.file_manager.ROOT_DIRECTORY] + argsl)
-        if args.record_container_id:
-            if not args.cidfile_dir:
-                args.cidfile_dir = os.getcwd()
-            del args.record_container_id
-
-        # If caller parsed its own arguments, it may not include every
-        # cwltool option, so fill in defaults to avoid crashing when
-        # dereferencing them in args.
-        for key, val in get_default_args().items():
-            if not hasattr(args, key):
-                setattr(args, key, val)
-
-        if args.relax_path_checks:
-            command_line_tool.ACCEPTLIST_RE = command_line_tool.ACCEPTLIST_EN_RELAXED_RE
-
-        if args.ga4gh_tool_registries:
-            ga4gh_tool_registries[:] = args.ga4gh_tool_registries
-        if not args.enable_ga4gh_tool_registry:
-            del ga4gh_tool_registries[:]
-
-        return args, argsl
-
-    @classmethod
-    def _init_cwl_logger(cls, logger_handler, stderr):
-        stderr_handler = logger_handler
-        if stderr_handler is not None:
-            _logger.addHandler(stderr_handler)
-        else:
-            coloredlogs.install(logger=_logger, stream=stderr)
-            stderr_handler = _logger.handlers[-1]
-        return stderr_handler
-
-    @classmethod
-    def _force_utf8_to_stream(cls, stdout):
-        if not stdout:  # force UTF-8 even if the console is configured differently
-            if hasattr(sys.stdout, "encoding") and sys.stdout.encoding != "UTF-8":
-                if hasattr(sys.stdout, "detach"):
-                    stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8")
-                else:
-                    stdout = getwriter("utf-8")(sys.stdout)  # type: ignore
-            else:
-                stdout = sys.stdout
-        return stdout
diff --git a/tests/cwl/extract_tar.cwl b/tests/cwl/extract_tar.cwl
index 01c8e26..405e514 100644
--- a/tests/cwl/extract_tar.cwl
+++ b/tests/cwl/extract_tar.cwl
@@ -2,7 +2,7 @@
 cwlVersion: v1.0
 class: CommandLineTool
 
-baseCommand: [tar, --extract]
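+# --verbose makes tar list each extracted entry on stdout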
+baseCommand: [tar, --extract, --verbose]
 inputs:
   tarfile:
     type: File
diff --git a/tests/input_data/input_with_file.yml b/tests/input_data/input_with_file.yml
index 8e353a6..07daf88 100644
--- a/tests/input_data/input_with_file.yml
+++ b/tests/input_data/input_with_file.yml
@@ -3,4 +3,4 @@ example_string: hello
 example_int: 42
 example_file:
   class: File
-  path: foo.txt
\ No newline at end of file
+  location: foo.txt
\ No newline at end of file
diff --git a/tests/input_data/tar_job.yml b/tests/input_data/tar_job.yml
index addb10a..751c58e 100644
--- a/tests/input_data/tar_job.yml
+++ b/tests/input_data/tar_job.yml
@@ -1,4 +1,4 @@
 tarfile:
   class: File
   # set the filename in the script
-  path: tarfile.tar
\ No newline at end of file
+  location: tarfile.tar
\ No newline at end of file
diff --git a/tests/test_CWLKernel.py b/tests/test_CWLKernel.py
index 9c3ceb9..42d136a 100644
--- a/tests/test_CWLKernel.py
+++ b/tests/test_CWLKernel.py
@@ -168,8 +168,8 @@ def test_handle_input_data_files(self):
             data = yaml.load(f, Loader=yaml.Loader)
 
         tmp_dir = tempfile.mkdtemp()
-        data['example_file']['path'] = os.path.join(tmp_dir, 'file.txt')
-        with open(data['example_file']['path'], 'w') as f:
+        data['example_file']['location'] = os.path.join(tmp_dir, 'file.txt')
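+        # create the referenced file on disk so input validation can find it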
+        with open(data['example_file']['location'], 'w') as f:
             f.write('')
         data_stream = StringIO()
         yaml.dump(data, data_stream)
@@ -185,7 +185,7 @@ def test_handle_input_data_files(self):
         )
         import uuid
         input_with_missing_file = StringIO()
-        yaml.dump({"missing_file": {"class": "File", "path": f"/{uuid.uuid4()}"}}, input_with_missing_file)
+        yaml.dump({"missing_file": {"class": "File", "location": f"/{uuid.uuid4()}"}}, input_with_missing_file)
         response = kernel.do_execute(input_with_missing_file.getvalue())
         self.assertDictEqual(
             {'status': 'error', 'execution_count': 0, 'payload': [], 'user_expressions': {}},
diff --git a/tests/test_CoreExecutor.py b/tests/test_CoreExecutor.py
index eee7ccd..3cf90cc 100644
--- a/tests/test_CoreExecutor.py
+++ b/tests/test_CoreExecutor.py
@@ -36,7 +36,7 @@ def test_executor_execute(self):
             self.assertListEqual(new_files, [])
             self.assertIsInstance(stdout, io.IOBase)
             self.assertIsInstance(stderr, io.IOBase)
-            self.assertIsNone(exception, 'An exception occurred while executing workflow', str(exception))
+            self.assertIsNone(exception, 'An exception occurred while executing workflow')
         except Exception:
             self.fail("execution failed")
 
@@ -48,7 +48,7 @@ def test_validate_input_files(self):
             'example_int': 42,
             'example_file': {
                 'class': 'File',
-                'path': f'/NOT_EXISTING_FILENAME-{uuid.uuid4()}.txt'
+                'location': f'/NOT_EXISTING_FILENAME-{uuid.uuid4()}.txt'
             }
         }
         self.assertRaises(
@@ -60,7 +60,7 @@
             'example_int': 42,
             'example_file': {
                 'class': 'File',
-                'path': f'NOT_EXISTING_FILENAME-{uuid.uuid4()}.txt'
+                'location': f'NOT_EXISTING_FILENAME-{uuid.uuid4()}.txt'
             }
         }
         self.assertRaises(