Skip to content

Commit

Permalink
experimental test workflows
Browse files Browse the repository at this point in the history
  • Loading branch information
fmigneault committed Dec 15, 2021
1 parent 310b1fd commit 0a0e3b7
Show file tree
Hide file tree
Showing 13 changed files with 210 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"processDescription": {
"process": {
"id": "DockerNetCDF2Text",
"description": "Mock conversion of a NetCDF file to text by copying it with a '.txt' extension."
}
},
"executionUnit": [
{
"test": "DockerNetCDF2Text.cwl"
}
],
"deploymentProfileName": "http://www.opengis.net/profiles/eoc/dockerizedApplication"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
processDescription:
process:
id: WPS1JsonArray2NetCDF
version: 0.0.1
description: Process that calls Weaver's own builtin process using its WPS-1 interface.
inputs:
- id: input_json
formats:
- mediaType: application/json
outputs:
- id: output_files
formats:
- mediaType: application/x-netcdf
executionUnit:
- unit:
cwlVersion: v1.0
class: CommandLineTool
hints: # cannot be in requirements since not official CWL
WPS1Requirement:
# note: this must match the WebTest TestApp endpoint and settings
provider: "https://localhost/ows/wps"
process: jsonarray2netcdf
inputs:
input_json:
type: File
outputs:
output_files:
type: File[]
outputBinding:
glob: "*.nc"
deploymentProfileName: http://www.opengis.net/profiles/eoc/wpsApplication
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"processDescription": {
"process": {
"id": "WorkflowWPS1CopyNetCDF",
"description": "Mock WPS-1 operation followed by conversion of the resulting NetCDF files to text."
}
},
"executionUnit": [
{
"test": "WorkflowWPS1CopyNetCDF.cwl"
}
],
"deploymentProfileName": "http://www.opengis.net/profiles/eoc/workflow"
}
28 changes: 28 additions & 0 deletions tests/functional/application-packages/DockerNetCDF2Text.cwl
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Mock "conversion" of a NetCDF file to text: the input file is copied into the
# working (output) directory under the same base name with a '.txt' extension.
cwlVersion: v1.0
class: CommandLineTool
baseCommand:
  - bash
  - script.sh
requirements:
  DockerRequirement:
    dockerPull: debian:stretch-slim
  InitialWorkDirRequirement:
    listing:
      # note:
      #   - '\${' escapes shell variables so that they are not evaluated as CWL expressions
      #   - only one positional argument (the input file) is passed to the script,
      #     therefore the output name must be derived from it
      #   - the output is written in the current directory (collected by the 'glob' below) using
      #     a copy, since the staged input location is usually read-only
      - entryname: script.sh
        entry: |
          set -x
          in="\${1}"
          name="\${in##*/}"
          out="\${name%.*}.txt"
          echo "Input:  \${in}"
          echo "Output: \${out}"
          cp "\${in}" "\${out}"
inputs:
  input_nc:
    type: File
    inputBinding:
      position: 1
outputs:
  output_txt:
    type: File
    outputBinding:
      glob: "*.txt"
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"inputs": [
{
"id": "input_files",
"href": "https://mocked-file-server/test-file.txt"
"href": "https://mocked-file-server.com/test-file.txt"
}
],
"outputs": [
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
mode: async
response: document
inputs:
- id: input_nc
href: https://mocked-file-server.com/file.nc
outputs:
- id: output_txt
transmissionMode: reference
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
mode: async
response: document
inputs:
- id: input_json
href: https://mocked-file-server.com/netcdf-array.json
outputs:
- id: output_files
transmissionMode: reference
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
mode: async
response: document
inputs:
- id: input_json
href: https://mocked-file-server.com/netcdf-array.json
outputs:
- id: output_files
transmissionMode: reference
27 changes: 27 additions & 0 deletions tests/functional/application-packages/WorkflowWPS1CopyNetCDF.cwl
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
cwlVersion: v1.0
class: Workflow
requirements:
ScatterFeatureRequirement: {}
inputs:
input_json: File
outputs:
output:
type:
type: array
items: File
outputSource: convert/output_txt
steps:
parse:
# note: This cannot exist as CWL by itself. It uses Weaver WPS1Requirement.
run: WPS1JsonArray2NetCDF.cwl
in:
input_json: input_json
out:
- output_files
convert:
run: DockerNetCDF2Text.cwl
scatter: input_nc
in:
input_nc: parse/output_files
out:
- output_txt
49 changes: 42 additions & 7 deletions tests/functional/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ class WorkflowProcesses(enum.Enum):
APP_DOCKER_STAGE_IMAGES = "DockerStageImages"
APP_DOCKER_COPY_IMAGES = "DockerCopyImages"
APP_DOCKER_COPY_NESTED_OUTDIR = "DockerCopyNestedOutDir"
APP_DOCKER_NETCDF_2_TEXT = "DockerNetCDF2Text"
APP_WPS1_JSON_ARRAY_2_NETCDF = "WPS1JsonArray2NetCDF"
WORKFLOW_STACKER_SFS = "Workflow"
WORKFLOW_SC = "WorkflowSimpleChain"
WORKFLOW_S2P = "WorkflowS2ProbaV"
Expand All @@ -83,6 +85,7 @@ class WorkflowProcesses(enum.Enum):
WORKFLOW_SUBSET_NASA_ESGF_SUBSET_CRIM = "WorkflowSubsetNASAESGF_SubsetCRIM"
WORKFLOW_FILE_TO_SUBSET_CRIM = "WorkflowFile_To_SubsetCRIM"
WORKFLOW_STAGE_COPY_IMAGES = "WorkflowStageCopyImages"
WORKFLOW_WPS1_COPY_NETCDF = "WorkflowWPS1CopyNetCDF"


class ProcessInfo(object):
Expand Down Expand Up @@ -621,7 +624,8 @@ def request(cls, method, url, ignore_errors=False, force_requests=False, log_ena
if with_requests:
kw.update({"verify": False, "timeout": cls.WEAVER_TEST_REQUEST_TIMEOUT})
# retry request if the error was caused by some connection error
resp = request_extra(method, url, json=json_body, data=data_body, retries=3, settings=cls.settings, **kw)
settings = cls.settings.fget(cls)
resp = request_extra(method, url, json=json_body, data=data_body, retries=3, settings=settings, **kw)

# add some properties similar to `webtest.TestApp`
resp_body = getattr(resp, "body", None) # if error is pyramid HTTPException, body is byte only
Expand Down Expand Up @@ -704,8 +708,8 @@ def workflow_runner(self,
Identifier of the :term:`Workflow` to test.
Must be a member amongst preloaded :attr:`WEAVER_TEST_WORKFLOW_SET` definitions.
:param test_application_ids:
Identifiers of all intermediate :term:`Process` steps expected by the :term:`Workflow` to test.
Must be members amongst preloaded :attr:`WEAVER_TEST_APPLICATION_SET` definitions.
Identifiers of all intermediate :term:`Process` steps to be deployed prior to the tested :term:`Workflow`
expecting them to exist. Must be members amongst preloaded :attr:`WEAVER_TEST_APPLICATION_SET` definitions.
:param log_full_trace:
Flag to provide extensive trace logs of all request and response details for each operation.
:param requests_mock_callback:
Expand Down Expand Up @@ -851,18 +855,49 @@ class WorkflowTestCase(WorkflowTestRunnerBase):
WEAVER_TEST_SERVER_BASE_PATH = ""

WEAVER_TEST_APPLICATION_SET = {
WorkflowProcesses.APP_DOCKER_STAGE_IMAGES,
WorkflowProcesses.APP_DOCKER_COPY_IMAGES,
WorkflowProcesses.APP_DOCKER_COPY_NESTED_OUTDIR,
WorkflowProcesses.APP_DOCKER_NETCDF_2_TEXT,
WorkflowProcesses.APP_DOCKER_STAGE_IMAGES,
WorkflowProcesses.APP_WPS1_JSON_ARRAY_2_NETCDF,
}
WEAVER_TEST_WORKFLOW_SET = {
WorkflowProcesses.WORKFLOW_CHAIN_COPY,
WorkflowProcesses.WORKFLOW_STAGE_COPY_IMAGES,
WorkflowProcesses.WORKFLOW_WPS1_COPY_NETCDF,
}

@pytest.mark.xfail(reason="Workflow not working anymore. IO to be repaired.")
def test_workflow_wps1_requirements(self):
raise NotImplementedError()
# FIXME: implement + re-enable 'CWL_REQUIREMENT_SCATTER'
@pytest.mark.xfail(reason="WIP")
def test_workflow_mixed_wps1_docker_scatter_requirements(self):
    """
    Test the use of multiple applications of different :term:`Process` type in a :term:`Workflow`.

    Steps:
        1. Convert JSON array of NetCDF references to corresponding NetCDF files
           (using WPS-1 interface rather than WPS-REST).
        2. Convert NetCDF file to raw text data dumps (using scattered applications per-file).
    """
    # must match the host of the 'input_json' reference provided in the execution body
    tmp_host = "https://mocked-file-server.com"

    with tempfile.TemporaryDirectory() as tmp_dir:
        nc_refs = []
        for i in range(3):
            nc_name = f"test-file-{i}.nc"
            # URL segments are always separated by '/', regardless of the OS path separator
            nc_refs.append(f"{tmp_host}/{nc_name}")
            with open(os.path.join(tmp_dir, nc_name), mode="w", encoding="utf-8") as tmp_file:
                tmp_file.write(f"DUMMY NETCDF DATA #{i}")
        # file name must match the 'input_json' reference provided in the execution body
        with open(os.path.join(tmp_dir, "netcdf-array.json"), mode="w", encoding="utf-8") as tmp_file:
            json.dump(nc_refs, tmp_file)

        def mock_tmp_input(requests_mock):
            # serve the temporary directory contents under the mocked host for the runner's requests
            mocked_file_server(tmp_dir, tmp_host, self.settings, requests_mock=requests_mock)

        # must run while the temporary directory still exists
        self.workflow_runner(WorkflowProcesses.WORKFLOW_WPS1_COPY_NETCDF,
                             [WorkflowProcesses.APP_WPS1_JSON_ARRAY_2_NETCDF,
                              WorkflowProcesses.APP_DOCKER_NETCDF_2_TEXT],
                             log_full_trace=True, requests_mock_callback=mock_tmp_input)

def test_workflow_docker_applications(self):
self.workflow_runner(WorkflowProcesses.WORKFLOW_STAGE_COPY_IMAGES,
Expand Down
19 changes: 19 additions & 0 deletions weaver/processes/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,25 @@

CWL_REQUIREMENT_ENV_VAR = "EnvVarRequirement"
CWL_REQUIREMENT_INIT_WORKDIR = "InitialWorkDirRequirement"
CWL_REQUIREMENT_SCATTER = "ScatterFeatureRequirement"

CWL_REQUIREMENT_FEATURES = frozenset([
CWL_REQUIREMENT_ENV_VAR,
CWL_REQUIREMENT_INIT_WORKDIR,
# CWL_REQUIREMENT_SCATTER, # FIXME: see workflow test + fix https://github.com/crim-ca/weaver/issues/105
])
"""
Set of :term:`CWL` requirements that correspond to extra functionalities which do not completely define
an :term:`Application Package` by themselves.
"""

CWL_REQUIREMENTS_SUPPORTED = frozenset(
list(CWL_REQUIREMENT_APP_TYPES) +
list(CWL_REQUIREMENT_FEATURES)
)
"""
Set of all :term:`CWL` requirements or hints that are supported for deployment of valid :term:`Application Package`.
"""

# CWL package types and extensions
PACKAGE_SIMPLE_TYPES = frozenset(["string", "boolean", "float", "int", "integer", "long", "double"])
Expand Down
1 change: 1 addition & 0 deletions weaver/processes/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ def _check_deploy(payload):
})
return results
except colander.Invalid as exc:
LOGGER.debug("Failed deploy body schema validation:\n%s", exc)
raise HTTPBadRequest(json={
"description": message,
"cause": "Invalid schema: [{!s}]".format(exc.msg),
Expand Down
14 changes: 9 additions & 5 deletions weaver/processes/wps_package.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
CWL_REQUIREMENT_APP_TYPES,
CWL_REQUIREMENT_APP_WPS1,
CWL_REQUIREMENT_ENV_VAR,
CWL_REQUIREMENT_INIT_WORKDIR,
CWL_REQUIREMENTS_SUPPORTED,
WPS_INPUT,
WPS_OUTPUT
)
Expand Down Expand Up @@ -601,7 +601,7 @@ def get_application_requirement(package):
"only one permitted amongst {}.".format(list(app_hints), list(CWL_REQUIREMENT_APP_TYPES)))
requirement = app_hints[0] if app_hints else {"class": ""}

cwl_supported_reqs = [item for item in CWL_REQUIREMENT_APP_TYPES] + [CWL_REQUIREMENT_INIT_WORKDIR]
cwl_supported_reqs = list(CWL_REQUIREMENTS_SUPPORTED)
if not all(item.get("class") in cwl_supported_reqs for item in all_hints):
raise PackageTypeError("Invalid requirement, the requirements supported are {0}".format(cwl_supported_reqs))

Expand Down Expand Up @@ -1654,9 +1654,13 @@ def _get_wps1_params(_requirement):

requirement = get_application_requirement(self.package)
req_class = requirement["class"]
req_source = "requirement/hint"
if self.package_type == PROCESS_WORKFLOW:
req_class = PROCESS_WORKFLOW
req_source = "tool class"

if req_class.endswith(CWL_REQUIREMENT_APP_WPS1):
self.logger.info("WPS-1 Package resolved from requirement/hint: %s", req_class)
self.logger.info("WPS-1 Package resolved from %s: %s", req_source, req_class)
from weaver.processes.wps1_process import Wps1Process
params = _get_wps1_params(requirement)
return Wps1Process(
Expand All @@ -1666,7 +1670,7 @@ def _get_wps1_params(_requirement):
update_status=_update_status_dispatch,
)
elif req_class.endswith(CWL_REQUIREMENT_APP_ESGF_CWT):
self.logger.info("ESGF-CWT Package resolved from requirement/hint: %s", req_class)
self.logger.info("ESGF-CWT Package resolved from %s: %s", req_source, req_class)
from weaver.processes.esgf_process import ESGFProcess
params = _get_wps1_params(requirement)
return ESGFProcess(
Expand All @@ -1677,7 +1681,7 @@ def _get_wps1_params(_requirement):
)
else:
# implements both `PROCESS_APPLICATION` with `CWL_REQUIREMENT_APP_DOCKER` and `PROCESS_WORKFLOW`
self.logger.info("WPS-3 Package resolved from requirement/hint: %s", req_class)
self.logger.info("WPS-3 Package resolved from %s: %s", req_source, req_class)
from weaver.processes.wps3_process import Wps3Process
return Wps3Process(step_payload=step_payload,
joborder=joborder,
Expand Down

0 comments on commit 0a0e3b7

Please sign in to comment.