Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 0 additions & 44 deletions tests/test_workflow_validator.py

This file was deleted.

104 changes: 104 additions & 0 deletions tests/test_workflow_validator_for_create_level.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import os
from typing import Any

import pytest
import yaml

pytestmark = pytest.mark.unit

from tests.test_decoder import _MINIMAL_WORKFLOW
from workflow.workflow_validator import ValidationLevel, WorkflowValidator


def test_validate_minimal():
# Arrange

# Act
error = WorkflowValidator.validate(
level=ValidationLevel.CREATE,
workflow_definition=_MINIMAL_WORKFLOW,
)

# Assert
assert error.error_num == 0
assert error.error_msg is None


def test_validate_example_nop_file():
# Arrange
workflow_file: str = os.path.join(
os.path.dirname(__file__), "workflow-definitions", "example-nop-fail.yaml"
)
with open(workflow_file, "r", encoding="utf8") as workflow_file:
workflow: dict[str, Any] = yaml.load(workflow_file, Loader=yaml.FullLoader)
assert workflow

# Act
error = WorkflowValidator.validate(
level=ValidationLevel.CREATE,
workflow_definition=workflow,
)

# Assert
assert error.error_num == 0
assert error.error_msg is None


def test_validate_example_smiles_to_file():
# Arrange
workflow_file: str = os.path.join(
os.path.dirname(__file__), "workflow-definitions", "example-smiles-to-file.yaml"
)
with open(workflow_file, "r", encoding="utf8") as workflow_file:
workflow: dict[str, Any] = yaml.load(workflow_file, Loader=yaml.FullLoader)
assert workflow

# Act
error = WorkflowValidator.validate(
level=ValidationLevel.CREATE,
workflow_definition=workflow,
)

# Assert
assert error.error_num == 0
assert error.error_msg is None


def test_validate_example_tow_step_nop():
# Arrange
workflow_file: str = os.path.join(
os.path.dirname(__file__), "workflow-definitions", "example-two-step-nop.yaml"
)
with open(workflow_file, "r", encoding="utf8") as workflow_file:
workflow: dict[str, Any] = yaml.load(workflow_file, Loader=yaml.FullLoader)
assert workflow

# Act
error = WorkflowValidator.validate(
level=ValidationLevel.CREATE,
workflow_definition=workflow,
)

# Assert
assert error.error_num == 0
assert error.error_msg is None


def test_validate_shortcut_example_1():
# Arrange
workflow_file: str = os.path.join(
os.path.dirname(__file__), "workflow-definitions", "shortcut-example-1.yaml"
)
with open(workflow_file, "r", encoding="utf8") as workflow_file:
workflow: dict[str, Any] = yaml.load(workflow_file, Loader=yaml.FullLoader)
assert workflow

# Act
error = WorkflowValidator.validate(
level=ValidationLevel.CREATE,
workflow_definition=workflow,
)

# Assert
assert error.error_num == 0
assert error.error_msg is None
89 changes: 89 additions & 0 deletions tests/test_workflow_validator_for_run_level.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import os
from typing import Any

import pytest
import yaml

pytestmark = pytest.mark.unit

from workflow.workflow_validator import ValidationLevel, WorkflowValidator


def test_validate_example_nop_file():
# Arrange
workflow_file: str = os.path.join(
os.path.dirname(__file__), "workflow-definitions", "example-nop-fail.yaml"
)
with open(workflow_file, "r", encoding="utf8") as workflow_file:
workflow: dict[str, Any] = yaml.load(workflow_file, Loader=yaml.FullLoader)
assert workflow

# Act
error = WorkflowValidator.validate(
level=ValidationLevel.RUN,
workflow_definition=workflow,
)

# Assert
assert error.error_num == 0
assert error.error_msg is None


def test_validate_example_smiles_to_file():
# Arrange
workflow_file: str = os.path.join(
os.path.dirname(__file__), "workflow-definitions", "example-smiles-to-file.yaml"
)
with open(workflow_file, "r", encoding="utf8") as workflow_file:
workflow: dict[str, Any] = yaml.load(workflow_file, Loader=yaml.FullLoader)
assert workflow

# Act
error = WorkflowValidator.validate(
level=ValidationLevel.RUN,
workflow_definition=workflow,
)

# Assert
assert error.error_num == 0
assert error.error_msg is None


def test_validate_example_tow_step_nop():
# Arrange
workflow_file: str = os.path.join(
os.path.dirname(__file__), "workflow-definitions", "example-two-step-nop.yaml"
)
with open(workflow_file, "r", encoding="utf8") as workflow_file:
workflow: dict[str, Any] = yaml.load(workflow_file, Loader=yaml.FullLoader)
assert workflow

# Act
error = WorkflowValidator.validate(
level=ValidationLevel.RUN,
workflow_definition=workflow,
)

# Assert
assert error.error_num == 0
assert error.error_msg is None


def test_validate_shortcut_example_1():
# Arrange
workflow_file: str = os.path.join(
os.path.dirname(__file__), "workflow-definitions", "shortcut-example-1.yaml"
)
with open(workflow_file, "r", encoding="utf8") as workflow_file:
workflow: dict[str, Any] = yaml.load(workflow_file, Loader=yaml.FullLoader)
assert workflow

# Act
error = WorkflowValidator.validate(
level=ValidationLevel.RUN,
workflow_definition=workflow,
)

# Assert
assert error.error_num == 0
assert error.error_msg is None
16 changes: 8 additions & 8 deletions workflow/workflow_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,22 +304,22 @@ def _validate_step_command(
# and can be launched. If it is False then the returned str contains an
# error message.
#
# Remember that variables can exist in (ascending order of priority): -
# Remember that variables can exist in the specification too.
# So, the full set of step variables (in ascending order of priority)
# is...
#
# 1. The specification
# 2. The workflow
# 2. The RunningWorkflow
# 3. The RunningWorkflow

all_variables: dict[str, Any] = {}
if "variables" in step_spec:
all_variables = step_spec.pop("variables")
all_variables = step_spec.pop("variables") if "variables" in step_spec else {}
if workflow_variables:
all_variables = all_variables | workflow_variables
all_variables |= workflow_variables
if running_workflow_variables:
all_variables = all_variables | running_workflow_variables
all_variables |= running_workflow_variables
message, success = decode(
job["command"], all_variables, "command", TextEncoding.JINJA2_3_0
)

return all_variables if success else message

def _launch(
Expand Down
57 changes: 29 additions & 28 deletions workflow/workflow_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,35 +72,36 @@ def _validate_run_level(
assert workflow_definition
del workflow_inputs

# RUN level requires that the specification is a valid JSON string.
# RUN level requires that each step specification is a valid JSON string.
# and contains properties for 'collection', 'job', and 'version'.
try:
specification = json.loads(workflow_definition["specification"])
except json.decoder.JSONDecodeError as e:
return ValidationResult(
error_num=1,
error_msg=[
f"Error decoding specification, which is not valid JSON: {e}"
],
)
except TypeError as e:
return ValidationResult(
error_num=2,
error_msg=[
f"Error decoding specification, which is not valid JSON: {e}"
],
)
expected_keys: set[str] = {"collection", "job", "version"}
missing_keys: list[str] = []
missing_keys.extend(
expected_key
for expected_key in expected_keys
if expected_key not in specification
)
if missing_keys:
return ValidationResult(
error_num=2,
error_msg=[f"Specification is missing: {', '.join(missing_keys)}"],
for step in workflow_definition["steps"]:
try:
specification = json.loads(step["specification"])
except json.decoder.JSONDecodeError as e:
return ValidationResult(
error_num=2,
error_msg=[
f"Error decoding specification, which is not valid JSON: {e}"
],
)
except TypeError as e:
return ValidationResult(
error_num=3,
error_msg=[
f"Error decoding specification, which is not valid JSON: {e}"
],
)
expected_keys: set[str] = {"collection", "job", "version"}
missing_keys: list[str] = []
missing_keys.extend(
expected_key
for expected_key in expected_keys
if expected_key not in specification
)
if missing_keys:
return ValidationResult(
error_num=2,
error_msg=[f"Specification is missing: {', '.join(missing_keys)}"],
)

return _VALIDATION_SUCCESS