In [1]:
from hera.shared import global_config
from hera.auth import ArgoCLITokenGenerator

global_config.host = "https://localhost:2746"
global_config.token = ArgoCLITokenGenerator
global_config.verify_ssl = False

## WorkflowTemplate <-> Pipeline & Workflow <-> Flow

WorkFlowTemplates crucially allow the definition of parameters as input variables to the entire workflow *without specifying a value*. You can also specify the default value, but it is possible to omit in cases where a sensible value just doesnt apply.

Submitting these WorkFlow templates doesnt execute anything on the ArgoWorkflow engine, but rather adds a template that future `Workflow`s can be derived from, together with configurable workflow input parameters.

This allows for the following entity mapping between bettmensch.ai pipelines and ArgoWorkflow resources:

| bettmensch.ai | ArgoWorkflow      |
| --------------|-------------------|
| `Flow`        | `WorkflowTemplate`|
| `Run`         | `Workflow`        |

## Define script templates

In [2]:
from hera.workflows import script, Parameter, models as m

@script(outputs=[Parameter(name="sum",value_from=m.ValueFrom(path="./sum.txt"))])
def add(a: float, b: float):
    with open('./sum.txt','w') as output:
        output.write(str(a + b))
        
@script(outputs=[Parameter(name="difference",value_from=m.ValueFrom(path="./difference.txt"))])
def subtract(a: float, b: float):
    with open('./difference.txt','w') as output:
        output.write(str(a - b))
        
@script(outputs=[Parameter(name="product",value_from=m.ValueFrom(path="./product.txt"))])
def multiply(a: float, b: float):
    with open('./product.txt','w') as output:
        output.write(str(a * b))

type(add), type(subtract), type(multiply)

About to instantiate Script instance
Successfully instantiated Script instance
About to instantiate Script instance
Successfully instantiated Script instance
About to instantiate Script instance
Successfully instantiated Script instance


(function, function, function)

## Define Workflow and DAG

In [9]:
from hera.workflows import WorkflowTemplate, DAG, Parameter, S3Artifact

dag_name = "2ab-minus-bcd"

with WorkflowTemplate(
    generate_name="2ab-minus-bcd-", 
    entrypoint=dag_name,
    namespace="argo",
    arguments=[Parameter(name="a"),Parameter(name="b"),Parameter(name="c"),Parameter(name="d")],
    # the Workflow referencing this template inherits the `spec` level `workflow_metadata` field from this WorkflowTemplate, so
    # when submitting via .create(), the workflow metadata does get merged into the Workflow CRD manifest before being applied to the K8s cluster
    workflow_metadata={
        'annotations':{'annotation_key_a':'annotation_value_a'},
        'labels':{'label_key_a':'label_value_a'}
    },
    # the Workflow referencing this template inherits the `spec` level `pod_metadata` field from this WorkflowTemplate, so
    # when submitting via .create(), the workflow metadata does get merged into the Pod manifest(s) before being applied to the K8s cluster.
    # for whatever reason, the (apparently?) pod level manifests shown on the ArgoWorkflow server dashboard do not show this
    pod_metadata={ 
        'annotations':{'annotation_key_b':'annotation_value_b'},
        'labels':{'label_key_b':'label_value_b'}
    }) as wt:
    
    wt_a, wt_b, wt_c, wt_d = wt.get_parameter('a').value,wt.get_parameter('b').value,wt.get_parameter('c').value,wt.get_parameter('d').value 
    
    with DAG(name=dag_name) as dag:
        double_a = multiply(
            name="multiply-2-and-a",arguments=[
                Parameter(name="a",value=2), 
                Parameter(name="b",value=wt_a)
            ]
        )
        bc = multiply(
            name="multiply-b-and-c",arguments=[
                Parameter(name="a",value=wt_b),
                Parameter(name="b",value=wt_c)
            ]
        )
        double_ab = multiply(
            name="multiply-2a-and-b",arguments=[
                Parameter(name="a",value=double_a.get_parameter('product').value),
                Parameter(name="b",value=wt_b)
            ]
        )
        bcd = multiply(
            name="multiply-bc-and-d",arguments=[
                Parameter(name="a",value=bc.get_parameter('product').value),
                Parameter(name="b",value=wt_d)
            ]
        )
        double_ab_minus_bcd = subtract(
            name="subtract-2ab-and-bcd",arguments=[
                Parameter(name="a",value=double_ab.get_parameter('product').value),
                Parameter(name="b",value=bcd.get_parameter('product').value)
            ],    
        )
        
        print(wt_b)
        print(bc.get_parameter('product').value)
        
        double_a >> double_ab
        bc >> bcd
        [double_ab, bcd] >> double_ab_minus_bcd       

Added node (<class 'hera.workflows.dag.DAG'>, '2ab-minus-bcd') to inner most context element (<class 'hera.workflows.workflow_template.WorkflowTemplate'>, None)!
Added task node (<class 'hera.workflows.task.Task'>, 'multiply-2-and-a') to DAG (<class 'hera.workflows.dag.DAG'>, '2ab-minus-bcd')!
Added node (<class 'hera.workflows.task.Task'>, 'multiply-2-and-a') to inner most context element (<class 'hera.workflows.dag.DAG'>, '2ab-minus-bcd')!
Added template from (<class 'hera.workflows.task.Task'>, 'multiply-2-and-a') to outer most context element (<class 'hera.workflows.workflow_template.WorkflowTemplate'>, None)!
Added task node (<class 'hera.workflows.task.Task'>, 'multiply-b-and-c') to DAG (<class 'hera.workflows.dag.DAG'>, '2ab-minus-bcd')!
Added node (<class 'hera.workflows.task.Task'>, 'multiply-b-and-c') to inner most context element (<class 'hera.workflows.dag.DAG'>, '2ab-minus-bcd')!
Added task node (<class 'hera.workflows.task.Task'>, 'multiply-2a-and-b') to DAG (<class 'hera

In [5]:
dag.tasks

[Task(with_items=None, with_param=None, arguments=[Parameter(default=None, description=None, enum=None, global_name=None, name='a', value='2', value_from=None, output=False), Parameter(default=None, description=None, enum=None, global_name=None, name='b', value='{{workflow.parameters.a}}', value_from=None, output=False)], name='multiply-2-and-a', continue_on=None, hooks=None, on_exit=None, template=Script(volumes=None, volume_devices=None, volume_mounts=None, resources=None, metrics=None, active_deadline_seconds=None, affinity=None, archive_location=None, automount_service_account_token=None, daemon=None, executor=None, fail_fast=None, host_aliases=None, init_containers=None, memoize=None, annotations=None, labels=None, name='multiply', node_selector=None, parallelism=None, http=None, plugin=None, pod_spec_patch=None, priority=None, priority_class_name=None, retry_strategy=None, scheduler_name=None, pod_security_context=None, service_account_name=None, sidecars=None, synchronization=No

In [6]:
wt.templates

[DAG(metrics=None, active_deadline_seconds=None, affinity=None, archive_location=None, automount_service_account_token=None, daemon=None, executor=None, fail_fast=None, host_aliases=None, init_containers=None, memoize=None, annotations=None, labels=None, name='2ab-minus-bcd', node_selector=None, parallelism=None, http=None, plugin=None, pod_spec_patch=None, priority=None, priority_class_name=None, retry_strategy=None, scheduler_name=None, pod_security_context=None, service_account_name=None, sidecars=None, synchronization=None, timeout=None, tolerations=None, inputs=None, outputs=None, arguments=None, target=None, tasks=[Task(with_items=None, with_param=None, arguments=[Parameter(default=None, description=None, enum=None, global_name=None, name='a', value='2', value_from=None, output=False), Parameter(default=None, description=None, enum=None, global_name=None, name='b', value='{{workflow.parameters.a}}', value_from=None, output=False)], name='multiply-2-and-a', continue_on=None, hooks

In [7]:
multiply.__dict__

{'__wrapped__': <function __main__.multiply(a: float, b: float)>,
 'wrapped_function': <function __main__.multiply(a: float, b: float)>}

In [8]:
from typing import Tuple, Literal, NamedTuple, List, Generic, Any
from pydantic.generics import Generic, GenericModel



class OutputParam(GenericModel, Generic[str,type]):
    name: str
    type: type
    
class Output(GenericModel, Generic[List[OutputParam]]):
    

def CustomOutput(*args: List[Tuple[str,type]]):
    
    return NamedTuple('CustomOutput', args)

#test_func_output = NamedTuple('TestFuncOutput',[('a', int),('b', str)])
def test_func(a: int, b: str = '') -> CustomOutput(('a', int),('b', str)):
    return a, b

IndentationError: expected an indented block after class definition on line 10 (4291690277.py, line 13)

In [67]:
dir(test_func)

['__annotations__',
 '__builtins__',
 '__call__',
 '__class__',
 '__closure__',
 '__code__',
 '__defaults__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__get__',
 '__getattribute__',
 '__getstate__',
 '__globals__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__kwdefaults__',
 '__le__',
 '__lt__',
 '__module__',
 '__name__',
 '__ne__',
 '__new__',
 '__qualname__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__']

In [68]:
test_func.__annotations__, test_func.__class__, test_func.__closure__, test_func.__code__, test_func.__defaults__

({'a': int, 'b': str, 'return': __main__.CustomOutput},
 function,
 None,
 <code object test_func at 0x00000085F297ECE0, file "C:\Users\bettmensch\AppData\Local\Temp\ipykernel_5904\2398073047.py", line 8>,
 ('',))

In [69]:
test_func.__delattr__, test_func.__dict__, test_func.__dir__, test_func.__doc__, test_func.__eq__, test_func.__format__

(<method-wrapper '__delattr__' of function object at 0x00000085F25D1A80>,
 {},
 <function function.__dir__()>,
 None,
 <method-wrapper '__eq__' of function object at 0x00000085F25D1A80>,
 <function function.__format__(format_spec, /)>)

In [70]:
test_func.__ge__, test_func.__get__, test_func.__getattribute__, test_func.__getstate__, #test_func.__globals__

(<method-wrapper '__ge__' of function object at 0x00000085F25D1A80>,
 <method-wrapper '__get__' of function object at 0x00000085F25D1A80>,
 <method-wrapper '__getattribute__' of function object at 0x00000085F25D1A80>,
 <function function.__getstate__()>)

In [71]:
test_func.__gt__, test_func.__hash__, test_func.__hash__, test_func.__init__, test_func.__init_subclass__, test_func.__kwdefaults__, test_func.__le__, test_func.__lt__, test_func.__module__, test_func.__name__

(<method-wrapper '__gt__' of function object at 0x00000085F25D1A80>,
 <method-wrapper '__hash__' of function object at 0x00000085F25D1A80>,
 <method-wrapper '__hash__' of function object at 0x00000085F25D1A80>,
 <method-wrapper '__init__' of function object at 0x00000085F25D1A80>,
 <function function.__init_subclass__>,
 None,
 <method-wrapper '__le__' of function object at 0x00000085F25D1A80>,
 <method-wrapper '__lt__' of function object at 0x00000085F25D1A80>,
 '__main__',
 'test_func')

In [72]:
test_func.__ne__, test_func.__new__,test_func.__qualname__,test_func.__reduce__,test_func.__reduce_ex__,test_func.__repr__,test_func.__setattr__,test_func.__sizeof__,test_func.__str__,test_func.__subclasshook__

(<method-wrapper '__ne__' of function object at 0x00000085F25D1A80>,
 <function function.__new__(*args, **kwargs)>,
 'test_func',
 <function function.__reduce__()>,
 <function function.__reduce_ex__(protocol, /)>,
 <method-wrapper '__repr__' of function object at 0x00000085F25D1A80>,
 <method-wrapper '__setattr__' of function object at 0x00000085F25D1A80>,
 <function function.__sizeof__()>,
 <method-wrapper '__str__' of function object at 0x00000085F25D1A80>,
 <function function.__subclasshook__>)

In [73]:
import inspect

test_sig = inspect.signature(test_func)

In [74]:
dir(test_sig)

['__class__',
 '__delattr__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__getstate__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__setstate__',
 '__sizeof__',
 '__slots__',
 '__str__',
 '__subclasshook__',
 '_bind',
 '_bound_arguments_cls',
 '_hash_basis',
 '_parameter_cls',
 '_parameters',
 '_return_annotation',
 'bind',
 'bind_partial',
 'empty',
 'from_callable',
 'parameters',
 'replace',
 'return_annotation']

In [75]:
return_annot = test_sig.return_annotation

In [76]:
return_annot.__dict__

mappingproxy({'__doc__': 'CustomOutput(a, b)',
              '__slots__': (),
              '_fields': ('a', 'b'),
              '_field_defaults': {},
              '__new__': <staticmethod(<function CustomOutput.__new__ at 0x00000085F25D13A0>)>,
              '_make': <classmethod(<function CustomOutput._make at 0x00000085F25D0F40>)>,
              '_replace': <function collections.CustomOutput._replace(self, /, **kwds)>,
              '__repr__': <function collections.CustomOutput.__repr__(self)>,
              '_asdict': <function collections.CustomOutput._asdict(self)>,
              '__getnewargs__': <function collections.CustomOutput.__getnewargs__(self)>,
              '__match_args__': ('a', 'b'),
              'a': _tuplegetter(0, 'Alias for field number 0'),
              'b': _tuplegetter(1, 'Alias for field number 1'),
              '__module__': '__main__',
              '__annotations__': {'a': int, 'b': str}})

In [77]:
[item for item in test_sig.parameters.items()]

[('a', <Parameter "a: int">), ('b', <Parameter "b: str = ''">)]

In [57]:
param_a = test_sig.parameters['a']

In [59]:
dir(param_a)

['KEYWORD_ONLY',
 'POSITIONAL_ONLY',
 'POSITIONAL_OR_KEYWORD',
 'VAR_KEYWORD',
 'VAR_POSITIONAL',
 '__class__',
 '__delattr__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__getstate__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__setstate__',
 '__sizeof__',
 '__slots__',
 '__str__',
 '__subclasshook__',
 '_annotation',
 '_default',
 '_kind',
 '_name',
 'annotation',
 'default',
 'empty',
 'kind',
 'name',
 'replace']

In [62]:
param_a.KEYWORD_ONLY, param_a.POSITIONAL_ONLY, param_a.POSITIONAL_OR_KEYWORD, param_a.VAR_KEYWORD, param_a.VAR_POSITIONAL

(<_ParameterKind.KEYWORD_ONLY: 3>,
 <_ParameterKind.POSITIONAL_ONLY: 0>,
 <_ParameterKind.POSITIONAL_OR_KEYWORD: 1>,
 <_ParameterKind.VAR_KEYWORD: 4>,
 <_ParameterKind.VAR_POSITIONAL: 2>)

In [63]:
param_a.VAR_POSITIONAL.__dict__

{'_value_': 2,
 'description': 'variadic positional',
 '_name_': 'VAR_POSITIONAL',
 '__objclass__': <enum '_ParameterKind'>,
 '_sort_order_': 2}

In [64]:
param_b = test_sig.parameters['b']

In [67]:
param_b.KEYWORD_ONLY.__dict__

{'_value_': 3,
 'description': 'keyword-only',
 '_name_': 'KEYWORD_ONLY',
 '__objclass__': <enum '_ParameterKind'>,
 '_sort_order_': 3}

In [68]:
param_b.annotation,param_b.default,param_b.empty,param_b.kind,param_b.name,param_b.replace

(str,
 '',
 inspect._empty,
 <_ParameterKind.POSITIONAL_OR_KEYWORD: 1>,
 'b',
 <bound method Parameter.replace of <Parameter "b: str = ''">>)

In [72]:
param_a.annotation,param_a.default,param_a.empty,param_a.kind,param_a.name,param_a.replace

(int,
 inspect._empty,
 inspect._empty,
 <_ParameterKind.POSITIONAL_OR_KEYWORD: 1>,
 'a',
 <bound method Parameter.replace of <Parameter "a: int">>)

In [75]:
param_a.VAR_POSITIONAL

<_ParameterKind.VAR_POSITIONAL: 2>

In [41]:
from typing import Tuple

def test_func(a: int, b: int = 1) -> Tuple[int,int]:
    return a,b

In [43]:
from hera.workflows._unparse import roundtrip
import textwrap
test_content = roundtrip(textwrap.dedent(inspect.getsource(test_func))).splitlines()

print(f"Function content: {test_content}")

for i, line in enumerate(test_content):
    print(f"Line {i}: {line}")

Function content: ['def test_func(a: int, b: int=1) -> Tuple[int, int]:', '    return (a, b)']
Line 0: def test_func(a: int, b: int=1) -> Tuple[int, int]:
Line 1:     return (a, b)


In [44]:
test_params = inspect.signature(test_func).parameters
test_params

mappingproxy({'a': <Parameter "a: int">, 'b': <Parameter "b: int = 1">})

In [45]:
test_params['a'].default, test_params['b'].default

(inspect._empty, 1)