Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow files to be named pipes #1469

Merged
11 commits merged on Jul 26, 2021
1 change: 1 addition & 0 deletions cwltool/context.py
Expand Up @@ -107,6 +107,7 @@ def __init__(self, kwargs: Optional[Dict[str, Any]] = None) -> None:
self.pull_image = True # type: bool
self.rm_container = True # type: bool
self.move_outputs = "move" # type: str
self.streaming_allowed: bool = False

self.singularity = False # type: bool
self.debug = False # type: bool
Expand Down
20 changes: 16 additions & 4 deletions cwltool/job.py
Expand Up @@ -3,6 +3,7 @@
import itertools
import logging
import os
import stat
import re
import shutil
import subprocess # nosec
Expand Down Expand Up @@ -175,13 +176,24 @@ def _setup(self, runtimeContext: RuntimeContext) -> None:
if not os.path.exists(self.outdir):
os.makedirs(self.outdir)

def is_streamable(file: str) -> bool:
    """Report whether the job input located at *file* may be streamed.

    The answer is always ``False`` unless ``runtimeContext.streaming_allowed``
    is set.  Otherwise the first dict in ``self.joborder`` whose
    ``"location"`` equals *file* decides: its ``"streamable"`` entry is
    returned (``False`` when that key is absent).  When no entry matches,
    the result is ``False``.
    """
    if runtimeContext.streaming_allowed:
        # Only the first entry with a matching location is consulted.
        for candidate in self.joborder.values():
            if not isinstance(candidate, dict):
                continue
            if candidate.get("location", None) == file:
                return candidate.get("streamable", False)
    return False
mr-c marked this conversation as resolved.
Show resolved Hide resolved

for knownfile in self.pathmapper.files():
p = self.pathmapper.mapper(knownfile)
if p.type == "File" and not os.path.isfile(p[0]) and p.staged:
raise WorkflowException(
"Input file %s (at %s) not found or is not a regular "
"file." % (knownfile, self.pathmapper.mapper(knownfile)[0])
)
if not (
is_streamable(knownfile) and stat.S_ISFIFO(os.stat(p[0]).st_mode)
):
raise WorkflowException(
"Input file %s (at %s) not found or is not a regular "
"file." % (knownfile, self.pathmapper.mapper(knownfile)[0])
)

if "listing" in self.generatefiles:
runtimeContext = runtimeContext.copy()
Expand Down
108 changes: 108 additions & 0 deletions tests/test_streaming.py
@@ -0,0 +1,108 @@
"""Test that files marked as 'streamable' when 'streaming_allowed' can be named pipes."""
import os

import pytest
from pathlib import Path
from typing import cast

from ruamel.yaml.comments import CommentedMap
from schema_salad.sourceline import cmap

from cwltool.command_line_tool import CommandLineTool
from cwltool.context import LoadingContext, RuntimeContext
from cwltool.errors import WorkflowException
from cwltool.job import JobBase
from cwltool.update import INTERNAL_VERSION
from cwltool.utils import CWLObjectType
from .util import get_data

# The tool's only input: a File explicitly flagged as streamable.
_streamable_file_input = {
    "type": "File",
    "id": "inp",
    "streamable": True,
}

# Minimal CommandLineTool document shared by every test in this module.
toolpath_object = cast(
    CommentedMap,
    cmap(
        {
            "cwlVersion": INTERNAL_VERSION,
            "class": "CommandLineTool",
            "inputs": [_streamable_file_input],
            "outputs": [],
            "requirements": [],
        }
    ),
)

# Both version fields are pinned to INTERNAL_VERSION, matching the
# "cwlVersion" declared in the tool document.
_loading_metadata = {
    "cwlVersion": INTERNAL_VERSION,
    "http://commonwl.org/cwltool#original_cwlVersion": INTERNAL_VERSION,
}

# Loading context shared by every test in this module.
loading_context = LoadingContext({"metadata": _loading_metadata})


def test_regular_file() -> None:
    """Check that job._setup accepts an ordinary file input without raising."""
    tool = CommandLineTool(toolpath_object, loading_context)
    context = RuntimeContext()

    regular_input: CWLObjectType = {
        "inp": {"class": "File", "location": get_data("tests/wf/whale.txt")}
    }

    first_job = next(tool.job(regular_input, None, context))
    assert isinstance(first_job, JobBase)

    # The test passes as long as no exception escapes from _setup.
    first_job._setup(context)


# Rows of (streamable, streaming_allowed, raise_exception).  An exception is
# expected in every case except when the input is streamable AND streaming
# is allowed.
streaming = [
    (is_streamable_input, is_allowed, not (is_streamable_input and is_allowed))
    for is_streamable_input in (True, False)
    for is_allowed in (True, False)
]


@pytest.mark.parametrize("streamable,streaming_allowed,raise_exception", streaming)
def test_input_can_be_named_pipe(
    tmp_path: Path, streamable: bool, streaming_allowed: bool, raise_exception: bool
) -> None:
    """Check job._setup against a named-pipe input for each flag combination."""
    tool = CommandLineTool(toolpath_object, loading_context)

    context = RuntimeContext()
    context.streaming_allowed = streaming_allowed

    # Create a FIFO in the per-test temporary directory to act as the input.
    fifo = tmp_path / "tmp"
    os.mkfifo(fifo)

    pipe_input: CWLObjectType = {
        "inp": {
            "class": "File",
            "location": str(fifo),
            "streamable": streamable,
        }
    }

    first_job = next(tool.job(pipe_input, None, context))
    assert isinstance(first_job, JobBase)

    if not raise_exception:
        # Accepted combination: _setup must complete without raising.
        first_job._setup(context)
        return

    with pytest.raises(WorkflowException):
        first_job._setup(context)