Skip to content

Commit

Permalink
Add pydantic IO docs (#939)
Browse files Browse the repository at this point in the history
Add docs to script user guide and experimental features section

---------

Signed-off-by: Elliot Gunton <egunton@bloomberg.net>
Co-authored-by: Sambhav Kothari <skothari44@bloomberg.net>
  • Loading branch information
elliotgunton and samj1912 committed Feb 12, 2024
1 parent b146956 commit 5c5b665
Show file tree
Hide file tree
Showing 5 changed files with 238 additions and 5 deletions.
106 changes: 106 additions & 0 deletions docs/user-guides/scripts.md
Original file line number Diff line number Diff line change
Expand Up @@ -421,3 +421,109 @@ The parent outputs directory, `/hera/outputs` by default, can be set by the user
```python
global_config.set_class_defaults(RunnerScriptConstructor, outputs_directory="user/chosen/outputs")
```

## Script Pydantic IO

Hera provides the `RunnerInput` and `RunnerOutput` classes which can be used to more succinctly write your script
function inputs and outputs, and requires use of the Hera Runner. Use of these classes also requires the
`"script_pydantic_io"` experimental feature flag to be enabled:

```py
global_config.experimental_features["script_pydantic_io"] = True
```

### Pydantic V1 or V2?

You can import `RunnerInput` and `RunnerOutput` from the `hera.workflows.io` submodule to import the version of Pydantic
that matches your V1 or V2 installation.

If you need to use V1 models when you have V2 installed, you should import
`RunnerInput` and `RunnerOutput` from the `hera.workflows.io.v1` or `hera.workflows.io.v2` module explicitly. The V2
models will not be available if you have installed `pydantic<2`, but the V1 models are usable for either version,
allowing you to migrate at your own pace.

### Script inputs using `RunnerInput`

For your script inputs, you can create a derived class of `RunnerInput`, and declare all your input parameters (and
artifacts) as fields of the class. If you want to use `Annotated` to declare `Artifacts` add metadata to your
`Parameters`, you will also need to enable the `"script_annotations"` experimental feature flag.

```py
from typing import Annotated
from pydantic import BaseModel

from hera.workflows import Artifact, ArtifactLoader, Parameter, script
from hera.workflows.io import RunnerInput


class MyObject(BaseModel):
a_dict: dict = {}
a_str: str = "a default string"


class MyInput(RunnerInput):
param_int: Annotated[int, Parameter(name="param-input")] = 42
an_object: Annotated[MyObject, Parameter(name="obj-input")] = MyObject(
a_dict={"my-key": "a-value"}, a_str="hello world!"
)
artifact_int: Annotated[int, Artifact(name="artifact-input", loader=ArtifactLoader.json)]


@script(constructor="runner")
def pydantic_io(
my_input: MyInput,
) -> ...:
...
```

This will create a script template named `pydantic_io`, with input parameters `"param-input"` and `"obj-input"`, but
_not_ `"my_input"` (hence inline script templates will not work, as references to `my_input` will not resolve); the
template will also have the `"artifact-input"` artifact. The yaml generated from the Python will look something like the following:

```yaml
templates:
- name: pydantic-io
inputs:
parameters:
- name: param-input
default: '42'
- name: obj-input
default: '{"a_dict": {"my-key": "a-value"}, "a_str": "hello world!"}'
artifacts:
- name: artifact-input
path: /tmp/hera-inputs/artifacts/artifact-input
script:
...
```

### Script outputs using `RunnerOutput`

The `RunnerOutput` class comes with two special variables, `exit_code` and `result`. The `exit_code` is used to exit the
container when running on Argo with the specific exit code - it is set to `0` by default. The `result` is used to print
any serializable object to stdout, which means you can now use `.result` on tasks or steps that use a "runner
constructor" script - you should be mindful of printing/logging anything else to stdout, which will stop the `result`
functionality working as intended. If you want an output parameters/artifacts with the name `exit_code` or `result`, you
can declare another field with an annotation of that name, e.g.
`my_exit_code: Annotated[int, Parameter(name="exit_code")]`.

Aside from the `exit_code` and `result`, the `RunnerOutput` behaves exactly like the `RunnerInput`:

```py
from typing import Annotated

from hera.workflows import Artifact, Parameter, script
from hera.workflows.io import RunnerOutput


class MyOutput(RunnerOutput):
param_int: Annotated[int, Parameter(name="param-output")]
artifact_int: Annotated[int, Artifact(name="artifact-output")]


@script(constructor="runner")
def pydantic_io() -> MyOutput:
return MyOutput(exit_code=1, result="Test!", param_int=42, artifact_int=my_input.param_int)

```

See the full Pydantic IO example [here](../examples/workflows/experimental/script_pydantic_io.md)!
17 changes: 17 additions & 0 deletions docs/walk-through/advanced-hera-features.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,23 @@ global_config.experimental_features["script_annotations"] = True

Read the full guide on script annotations in [the script user guide](../user-guides/scripts.md#script-annotations).

### Script IO Models

Hera provides Pydantic models for you to create subclasses from, which allow you to more easily declare script template
inputs. Any fields that you declare in your subclass of `RunnerInput` will become input parameters or artifacts, while
`RunnerOutput` fields will become output parameters artifacts. The fields that you declare can be `Annotated` as a
`Parameter` or `Artifact`, as any fields with a basic type will become `Parameters` - you will also need the
`script_annotations` experimental feature enabled.

To enable Hera input/output models, you must set the `experimental_feature` flag `script_pydantic_io`

```py
global_config.experimental_features["script_pydantic_io"] = True
```

Read the full guide on script pydantic IO in [the script user guide](../user-guides/scripts.md#script-pydantic-io).


## Graduated features

Once an experimental feature is robust and reliable, we "graduate" them to allow their use without setting the
Expand Down
48 changes: 47 additions & 1 deletion src/hera/workflows/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,48 @@ def _save_annotated_return_outputs(
return None


def _save_dummy_outputs(
output_annotations: List[
Union[Tuple[type, Union[Parameter, Artifact]], Union[Type[RunnerOutputV1], Type[RunnerOutputV2]]]
],
) -> None:
"""Save dummy values into the outputs specified.
This function is used at runtime by the Hera Runner to create files in the container so that Argo
does not log confusing error messages that obfuscate the real error, which look like:
```
msg="cannot save parameter /tmp/hera-outputs/parameters/my-parameter"
argo=true
error="open /tmp/hera-outputs/parameters/my-parameter: no such file or directory"`
```
The output annotations are used to write files using the schema:
<parent_directory>/artifacts/<name>
<parent_directory>/parameters/<name>
If the artifact path or parameter value_from.path is specified, that is used instead.
<parent_directory> can be provided by the user or is set to /tmp/hera-outputs by default
"""
for dest in output_annotations:
if isinstance(dest, (RunnerOutputV1, RunnerOutputV2)):
if os.environ.get("hera__script_pydantic_io", None) is None:
raise ValueError("hera__script_pydantic_io environment variable is not set")

for field, _ in dest.__fields__:
if field in {"exit_code", "result"}:
continue

annotation = dest._get_output(field)
path = _get_outputs_path(annotation)
_write_to_path(path, "")
else:
assert isinstance(dest, tuple)
if not dest[1].name:
raise ValueError("The name was not provided for one of the outputs.")

path = _get_outputs_path(dest[1])
_write_to_path(path, "")


def _get_outputs_path(destination: Union[Parameter, Artifact]) -> Path:
"""Get the path from the destination annotation using the defined outputs directory."""
path = Path(os.environ.get("hera__outputs_directory", "/tmp/hera-outputs"))
Expand Down Expand Up @@ -403,7 +445,11 @@ def _runner(entrypoint: str, kwargs_list: List) -> Any:
if output_annotations:
# This will save outputs returned from the function only. Any function parameters/artifacts marked as
# outputs should be written to within the function itself.
output = _save_annotated_return_outputs(function(**kwargs), output_annotations)
try:
output = _save_annotated_return_outputs(function(**kwargs), output_annotations)
except Exception as e:
_save_dummy_outputs(output_annotations)
raise e
return output or None

return function(**kwargs)
Expand Down
14 changes: 14 additions & 0 deletions tests/script_runner/annotated_outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,20 @@ def script_param_no_name(a_number) -> Annotated[int, Parameter()]:
return a_number + 1


@script()
def script_param_output_raises_index_error() -> Annotated[int, Parameter(name="param-output")]:
"""Raise an IndexError."""
a_list = []
return a_list[0]


@script()
def script_artifact_output_raises_index_error() -> Annotated[int, Artifact(name="artifact-output")]:
"""Raise an IndexError."""
a_list = []
return a_list[0]


@script()
def script_outputs_in_function_signature(
a_number: Annotated[int, Parameter(name="a_number")],
Expand Down
58 changes: 54 additions & 4 deletions tests/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,58 @@ def test_script_annotations_outputs(
assert Path(tmp_path / file["subpath"]).read_text() == file["value"]


@pytest.mark.parametrize(
"function_name,expected_error,expected_files",
[
(
"script_param_output_raises_index_error",
IndexError,
[{"subpath": "tmp/hera-outputs/parameters/param-output", "value": ""}],
),
(
"script_artifact_output_raises_index_error",
IndexError,
[{"subpath": "tmp/hera-outputs/artifacts/artifact-output", "value": ""}],
),
],
)
def test_script_raising_error_still_outputs(
function_name,
expected_error: type,
expected_files: List[Dict[str, str]],
global_config_fixture: GlobalConfig,
environ_annotations_fixture: None,
tmp_path: Path,
monkeypatch,
):
"""Test that the output annotations are parsed correctly and save outputs to correct destinations."""
for file in expected_files:
assert not Path(tmp_path / file["subpath"]).is_file()
# GIVEN
global_config_fixture.experimental_features["script_annotations"] = True

outputs_directory = str(tmp_path / "tmp/hera-outputs")
global_config_fixture.set_class_defaults(RunnerScriptConstructor, outputs_directory=outputs_directory)

monkeypatch.setattr(test_module, "ARTIFACT_PATH", str(tmp_path))
os.environ["hera__outputs_directory"] = outputs_directory

# Force a reload of the test module, as the runner performs "importlib.import_module", which
# may fetch a cached version
import tests.script_runner.annotated_outputs as output_tests_module

importlib.reload(output_tests_module)

# WHEN
with pytest.raises(expected_error):
_runner(f"{output_tests_module.__name__}:{function_name}", [])

# THEN
for file in expected_files:
assert Path(tmp_path / file["subpath"]).is_file()
assert Path(tmp_path / file["subpath"]).read_text() == file["value"]


@pytest.mark.parametrize(
"function_name,kwargs_list,exception",
[
Expand Down Expand Up @@ -671,11 +723,10 @@ def test_runner_pydantic_inputs_params(


@pytest.mark.parametrize(
"entrypoint,kwargs_list,expected_files,pydantic_mode",
"entrypoint,expected_files,pydantic_mode",
[
pytest.param(
"tests.script_runner.pydantic_io_v1:pydantic_output_parameters",
[],
[
{"subpath": "tmp/hera-outputs/parameters/my_output_str", "value": "a string!"},
{"subpath": "tmp/hera-outputs/parameters/second-output", "value": "my-val"},
Expand All @@ -687,7 +738,6 @@ def test_runner_pydantic_inputs_params(
)
def test_runner_pydantic_output_params(
entrypoint,
kwargs_list,
expected_files,
pydantic_mode,
global_config_fixture: GlobalConfig,
Expand All @@ -708,7 +758,7 @@ def test_runner_pydantic_output_params(
os.environ["hera__outputs_directory"] = outputs_directory

# WHEN
output = _runner(entrypoint, kwargs_list)
output = _runner(entrypoint, [])

# THEN
assert isinstance(output, RunnerOutput)
Expand Down

0 comments on commit 5c5b665

Please sign in to comment.