Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add parameter info to argo workflow template annotations #1549

Merged
merged 6 commits into from
Oct 12, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 19 additions & 5 deletions metaflow/plugins/argo/argo_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
from collections import defaultdict
from hashlib import sha1

from metaflow import current
from metaflow import current, JSONType
from metaflow.includefile import FilePathClass
from metaflow.decorators import flow_decorators
from metaflow.exception import MetaflowException
from metaflow.metaflow_config import (
Expand Down Expand Up @@ -401,6 +402,14 @@ def _process_parameters(self):
)
seen.add(norm)

if param.kwargs.get("type") == JSONType or isinstance(
param.kwargs.get("type"), FilePathClass
Comment on lines +405 to +406
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@savingoyal should I use isinstance() for both comparisons? For some reason I didn't fully understand, only the FilePathClass required the use of it and it failed with the == comparison.

):
# Special-case this to avoid touching core
param_type = str(param.kwargs.get("type").name)
else:
param_type = str(param.kwargs.get("type").__name__)

is_required = param.kwargs.get("required", False)
# Throw an exception if a schedule is set for a flow with required
# parameters with no defaults. We currently don't have any notion
Expand All @@ -412,16 +421,17 @@ def _process_parameters(self):
"Scheduling such parameters via Argo CronWorkflows is not "
"currently supported." % param.name
)
value = deploy_time_eval(param.kwargs.get("default"))
default_value = deploy_time_eval(param.kwargs.get("default"))
# If the value is not required and the value is None, we set the value to
# the JSON equivalent of None to please argo-workflows. Unfortunately it
# has the side effect of casting the parameter value to string null during
# execution - which needs to be fixed imminently.
if not is_required or value is not None:
value = json.dumps(value)
if not is_required or default_value is not None:
default_value = json.dumps(default_value)
parameters[param.name] = dict(
name=param.name,
value=value,
value=default_value,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to rename this in the parameter map too but I didn't want to get too ambitious, especially since it's serialized in the argo workflow template as value again. #ifItAintBroke

type=param_type,
description=param.kwargs.get("help"),
is_required=is_required,
)
Expand Down Expand Up @@ -574,6 +584,10 @@ def _compile_workflow_template(self):
"metaflow/user": "argo-workflows",
"metaflow/flow_name": self.flow.name,
}

if self.parameters:
annotations.update({"metaflow/parameters": json.dumps(self.parameters)})
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Serialize the parameters dictionary into the metaflow/parameters annotation for consumers to deserialize.


if current.get("project_name"):
annotations.update(
{
Expand Down