In [1]:
import kubernetes
import argo_workflows
import yaml
import requests
import random
import string
import time
import string

from argo_workflows.apis import WorkflowServiceApi
from argo_workflows.model.io_argoproj_workflow_v1alpha1_workflow_create_request import (
    IoArgoprojWorkflowV1alpha1WorkflowCreateRequest,
)

In [2]:
def random_wf_name(prefix, N):
    """Build a workflow name of the form ``<prefix>-<random suffix>``.

    Args:
        prefix: Leading part of the generated name.
        N: Number of random characters to append after the hyphen.

    Returns:
        ``prefix`` and an ``N``-character suffix, drawn from lowercase ASCII
        letters and digits, joined by a single hyphen.
    """
    alphabet = string.ascii_lowercase + string.digits
    suffix_chars = random.choices(alphabet, k=N)
    return "-".join((prefix, "".join(suffix_chars)))

In [3]:
def get_workflows_by_status(api_instance, namespace, status):
    """Return the workflows in ``namespace`` whose phase equals ``status``.

    Args:
        api_instance: Object exposing ``list_workflows(namespace, ...)``
            (a ``WorkflowServiceApi`` in this notebook).
        namespace: Namespace whose workflows are listed.
        status: Phase to match against ``workflow["status"]["phase"]``,
            e.g. ``"Pending"``.

    Returns:
        A list of the matching workflow entries, in the order the API
        returned them. Workflows that carry no ``status``/``phase`` entry
        are skipped instead of raising ``KeyError``.
    """
    response = api_instance.list_workflows(namespace,
                                           _check_return_type=False)
    # Treat a None item list as "no workflows" so the loop below cannot fail.
    workflows = response["items"] or []

    matching_wfs = []
    for workflow in workflows:
        status_block = workflow.get("status") or {}
        # Only compare when a phase is actually present; a workflow without
        # one never matches, even if ``status`` is None.
        if "phase" in status_block and status_block["phase"] == status:
            matching_wfs.append(workflow)
    return matching_wfs

In [4]:
def get_workflows_by_name(api_instance, namespace, name):
    """Return the workflows in ``namespace`` whose name contains ``name``.

    Args:
        api_instance: Object exposing ``list_workflows(namespace, ...)``
            (a ``WorkflowServiceApi`` in this notebook).
        namespace: Namespace whose workflows are listed.
        name: Substring to look for in ``workflow["metadata"]["name"]``.

    Returns:
        A list of the matching workflow entries, in the order the API
        returned them; empty when nothing matches or the listing has no
        items.
    """
    response = api_instance.list_workflows(namespace,
                                           _check_return_type=False)
    # Treat a None item list as "no workflows" instead of failing to iterate.
    workflows = response["items"] or []
    return [
        workflow for workflow in workflows
        if name in workflow["metadata"]["name"]
    ]


In [5]:
def get_workflows_by_priority(api_instance, namespace, operator, priority):
    """Return the workflows whose ``spec.priority`` compares true to ``priority``.

    Args:
        api_instance: Object exposing ``list_workflows(namespace, ...)``
            (a ``WorkflowServiceApi`` in this notebook).
        namespace: Namespace whose workflows are listed.
        operator: Comparison to apply, as a string: one of ``"<"``, ``">"``,
            ``"="``, ``"=="``, ``"<="`` or ``">="``. ``"=="`` is accepted as
            an alias of ``"="``.
        priority: Value each workflow's priority is compared against
            (``workflow_priority <operator> priority``).

    Returns:
        A list of the matching workflow entries, in the order the API
        returned them. Workflows with no explicit ``spec.priority`` are
        skipped rather than raising ``KeyError``.

    Raises:
        ValueError: If ``operator`` is not one of the supported strings.
    """
    # Imported under an alias because the ``operator`` parameter shadows the
    # standard-library module name inside this function.
    import operator as _op

    comparisons = {
        "<": _op.lt,
        ">": _op.gt,
        "=": _op.eq,
        "==": _op.eq,
        "<=": _op.le,
        ">=": _op.ge,
    }
    # Reject unknown operators up front, before spending an API call.
    if operator not in comparisons:
        raise ValueError(
            "Unsupported operator {!r}; expected one of {}".format(
                operator, ", ".join(sorted(comparisons))))
    compare = comparisons[operator]

    response = api_instance.list_workflows(namespace,
                                           _check_return_type=False)
    # Treat a None item list as "no workflows" so the loop below cannot fail.
    workflows = response["items"] or []

    matching_wfs = []
    for workflow in workflows:
        wf_priority = (workflow.get("spec") or {}).get("priority")
        if wf_priority is None:
            # No explicit priority on this workflow: nothing to compare.
            continue
        if compare(wf_priority, priority):
            matching_wfs.append(workflow)
    return matching_wfs


In [6]:
def change_workflow_priority(api_instance, namespace, workflow_name, new_priority):
    """Re-create a workflow with a different ``spec.priority``.

    The workflow is fetched, its priority is changed on the fetched copy,
    the original is deleted, and the altered copy is submitted as a new
    workflow under the same name.

    Args:
        api_instance: Object exposing ``get_workflow``, ``delete_workflow``
            and ``create_workflow`` (a ``WorkflowServiceApi`` in this
            notebook).
        namespace: Namespace the workflow lives in.
        workflow_name: Name of the workflow to alter.
        new_priority: Value written to ``spec.priority`` of the re-created
            workflow.

    Returns:
        None.
    """
    # get a copy of the desired workflow
    new_pr_wf = api_instance.get_workflow(
        namespace, workflow_name, _check_return_type=False)

    # alter priority of the workflow copy (use the caller's value, not a constant)
    new_pr_wf["spec"]["priority"] = new_priority

    # clear the resource version or create request will throw an error
    metadata = new_pr_wf["metadata"]
    if "resourceVersion" in metadata:
        del metadata["resourceVersion"]

    # delete old workflow
    # NOTE(review): delete-then-create is not atomic -- if the create call
    # below fails, the original workflow is already gone. Confirm that is
    # acceptable for callers.
    api_instance.delete_workflow(
        namespace, workflow_name, _check_return_type=False)

    # create a new instance of the altered workflow copy
    create_request = IoArgoprojWorkflowV1alpha1WorkflowCreateRequest(
        workflow=new_pr_wf, _check_type=False)
    api_instance.create_workflow(
        namespace=namespace, body=create_request, _check_return_type=False)

In [None]:
namespace = "argo-workflows"
workflow_prefix = "placeholder-wf"

## Getting a workflow
# Download the upstream hello-world example manifest. The timeout keeps the
# cell from hanging on a stalled connection, and raise_for_status() stops an
# HTTP error page from being handed to the YAML parser.
wf_spec = requests.get(
    'https://raw.githubusercontent.com/argoproj/argo-workflows/master/examples/hello-world.yaml',
    timeout=30)
wf_spec.raise_for_status()
manifest = yaml.safe_load(wf_spec.text)

# Unset generateName, will be providing our own so we can get it easily later
manifest["metadata"].pop("generateName", None)
manifest["spec"]["priority"] = 10

## Workflow API setup
config = argo_workflows.Configuration(
    host="http://argo-workflows.pd.k8s.altamiracorp.com")
# NOTE(review): TLS verification and client-side validation are switched off.
# The host above is plain http, so verify_ssl presumably has no effect here --
# confirm, and re-enable it if the host is ever moved to https.
config.verify_ssl = False
config.client_side_validation = False

api_client = argo_workflows.ApiClient(config)
api_instance = WorkflowServiceApi(api_client)


## Submitting a lot of workflows
# The same manifest dict is reused for every submission; only its name changes.
for _ in range(100):
    new_random_name = random_wf_name(workflow_prefix, 5)
    manifest["metadata"]["name"] = new_random_name
    api_instance.create_workflow(namespace=namespace,
                                 body=IoArgoprojWorkflowV1alpha1WorkflowCreateRequest(workflow=manifest,
                                                                                      _check_type=False),
                                 _check_return_type=False)

# Submitting a low priority workflow
# Its name is kept in `low_priority_name` so later cells can refer to it.
low_priority_name = random_wf_name("low-priority", 5)
manifest["metadata"]["name"] = low_priority_name
manifest["spec"]["priority"] = 1
api_instance.create_workflow(namespace=namespace,
                             body=IoArgoprojWorkflowV1alpha1WorkflowCreateRequest(workflow=manifest,
                                                                                  _check_type=False),
                             _check_return_type=False)


In [17]:
# Change the priority of the low-priority workflow submitted earlier. Its name
# was saved in `low_priority_name` when it was created.
change_workflow_priority(api_instance, namespace,
                         low_priority_name, 11)


In [None]:
# Display the workflows in the namespace whose spec priority is exactly 11
get_workflows_by_priority(api_instance, namespace, "=", 11)

In [None]:
# List the workflows whose name contains the substring "low"
get_workflows_by_name(api_instance, namespace, "low")

In [None]:
# List the workflows whose status phase is "Pending"
get_workflows_by_status(api_instance, namespace, "Pending")