Skip to content

Commit

Permalink
Add parameter info to argo workflow template annotations (#1549)
Browse files Browse the repository at this point in the history
* Add parameter-required info to template annotations

* Use a single annotation instead

* Add type information as well and change serialization method

* Avoid touching core by special-casing JSONType

* Formatting

* Handle IncludeFile parameter types
  • Loading branch information
rohanrebello committed Oct 12, 2023
1 parent efd7a85 commit 0c49524
Showing 1 changed file with 19 additions and 5 deletions.
24 changes: 19 additions & 5 deletions metaflow/plugins/argo/argo_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
from collections import defaultdict
from hashlib import sha1

from metaflow import current
from metaflow import current, JSONType
from metaflow.includefile import FilePathClass
from metaflow.decorators import flow_decorators
from metaflow.exception import MetaflowException
from metaflow.metaflow_config import (
Expand Down Expand Up @@ -424,6 +425,14 @@ def _process_parameters(self):
)
seen.add(norm)

if param.kwargs.get("type") == JSONType or isinstance(
param.kwargs.get("type"), FilePathClass
):
# Special-case this to avoid touching core
param_type = str(param.kwargs.get("type").name)
else:
param_type = str(param.kwargs.get("type").__name__)

is_required = param.kwargs.get("required", False)
# Throw an exception if a schedule is set for a flow with required
# parameters with no defaults. We currently don't have any notion
Expand All @@ -435,16 +444,17 @@ def _process_parameters(self):
"Scheduling such parameters via Argo CronWorkflows is not "
"currently supported." % param.name
)
value = deploy_time_eval(param.kwargs.get("default"))
default_value = deploy_time_eval(param.kwargs.get("default"))
# If the value is not required and the value is None, we set the value to
# the JSON equivalent of None to please argo-workflows. Unfortunately it
# has the side effect of casting the parameter value to string null during
# execution - which needs to be fixed imminently.
if not is_required or value is not None:
value = json.dumps(value)
if not is_required or default_value is not None:
default_value = json.dumps(default_value)
parameters[param.name] = dict(
name=param.name,
value=value,
value=default_value,
type=param_type,
description=param.kwargs.get("help"),
is_required=is_required,
)
Expand Down Expand Up @@ -597,6 +607,10 @@ def _compile_workflow_template(self):
"metaflow/user": "argo-workflows",
"metaflow/flow_name": self.flow.name,
}

if self.parameters:
annotations.update({"metaflow/parameters": json.dumps(self.parameters)})

if current.get("project_name"):
annotations.update(
{
Expand Down

0 comments on commit 0c49524

Please sign in to comment.