Skip to content

Commit

Permalink
feat: self reporting workflow progress
Browse files Browse the repository at this point in the history
original code from: https://github.com/argoproj/argo-workflows/pull/4015/files
closes argoproj#1658, argoproj#4245

Signed-off-by: Michael Weibel <michael@helio.exchange>
  • Loading branch information
mweibel committed Nov 3, 2021
1 parent 02165aa commit 98b8d56
Show file tree
Hide file tree
Showing 20 changed files with 625 additions and 117 deletions.
7 changes: 5 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ GREP_LOGS := ""
IMAGE_NAMESPACE ?= quay.io/argoproj
DEV_IMAGE ?= $(shell [ `uname -s` = Darwin ] && echo true || echo false)

# declares which cluster to import to in case it's not the default name
K3D_CLUSTER_NAME ?= k3s-default

# The name of the namespace where Kubernetes resources/RBAC will be installed
KUBE_NAMESPACE ?= argo
MANAGED_NAMESPACE ?= $(KUBE_NAMESPACE)
Expand Down Expand Up @@ -222,7 +225,7 @@ argoexec-image:
--output=type=docker .
[ ! -e $* ] || mv $* dist/
docker run --rm -t $(IMAGE_NAMESPACE)/$*:$(VERSION) version
if [ $(K3D) = true ]; then k3d image import $(IMAGE_NAMESPACE)/$*:$(VERSION); fi
if [ $(K3D) = true ]; then k3d image import -c $(K3D_CLUSTER_NAME) $(IMAGE_NAMESPACE)/$*:$(VERSION); fi
if [ $(DOCKER_PUSH) = true ] && [ $(IMAGE_NAMESPACE) != argoproj ] ; then docker push $(IMAGE_NAMESPACE)/$*:$(VERSION) ; fi

scan-images: scan-workflow-controller scan-argoexec scan-argocli
Expand Down Expand Up @@ -399,7 +402,7 @@ endif
argosay:
cd test/e2e/images/argosay/v2 && docker build . -t argoproj/argosay:v2
ifeq ($(K3D),true)
k3d image import argoproj/argosay:v2
k3d image import -c $(K3D_CLUSTER_NAME) argoproj/argosay:v2
endif
ifeq ($(DOCKER_PUSH),true)
docker push argoproj/argosay:v2
Expand Down
6 changes: 5 additions & 1 deletion cmd/argoexec/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ func initExecutor() *executor.WorkflowExecutor {
deadline, err := time.Parse(time.RFC3339, os.Getenv(common.EnvVarDeadline))
checkErr(err)

// errors ignored because values are set by the controller and checked there.
annotationPatchTickDuration, _ := time.ParseDuration(os.Getenv(common.EnvVarProgressPatchTickDuration))
progressFileTickDuration, _ := time.ParseDuration(os.Getenv(common.EnvVarProgressFileTickDuration))

var cre executor.ContainerRuntimeExecutor
log.Infof("Creating a %s executor", executorType)
switch executorType {
Expand All @@ -121,7 +125,7 @@ func initExecutor() *executor.WorkflowExecutor {
}
checkErr(err)

wfExecutor := executor.NewExecutor(clientset, restClient, podName, namespace, cre, *tmpl, includeScriptOutput, deadline)
wfExecutor := executor.NewExecutor(clientset, restClient, podName, namespace, cre, *tmpl, includeScriptOutput, deadline, annotationPatchTickDuration, progressFileTickDuration)

log.
WithField("version", version.String()).
Expand Down
2 changes: 2 additions & 0 deletions docs/environment-variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ Note that these environment variables may be removed at any time.
| `BUBBLE_ENTRY_TEMPLATE_ERR` | `bool` | `true` | Whether to bubble up template errors to workflow. |
| `INFORMER_WRITE_BACK` | `bool` | `true` | Whether to write back to informer instead of catching up. |
| `GRPC_MESSAGE_SIZE` | `string` | Use different GRPC Max message size for Argo server deployment (supporting huge workflows). |
| `ARGO_PROGRESS_PATCH_TICK_DURATION` | `time.Duration` | `1m` | How often self-reported progress is patched into the pod annotations, which determines how quickly the controller picks up progress changes. Set to 0 to disable self-reporting progress. |
| `ARGO_PROGRESS_FILE_TICK_DURATION` | `time.Duration` | `3s` | How often the progress file is read by the executor. Set to 0 to disable self reporting progress. |

CLI parameters of the `argo-server` and `workflow-controller` can be specified as environment variables with the `ARGO_` prefix. For example:

Expand Down
43 changes: 43 additions & 0 deletions docs/progress.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,46 @@ For a whole workflow's, progress is the sum of all its leaf nodes.

!!! Warning
`M` will increase during workflow run each time a node is added to the graph.

## Self-reporting progress

> v3.3 and after

Pods in a workflow can report their own progress during their runtime. This self-reported progress overrides the
auto-generated progress.

Reporting progress works as follows:

- create and write the progress to a file indicated by the env variable `ARGO_PROGRESS_FILE`
- format of the progress must be `N/M`

The executor reads this file every 3 seconds and, if the value changed,
patches the pod annotations with `workflows.argoproj.io/progress: N/M`.
The controller picks this up and writes the progress to the appropriate status properties.

Initially, the progress of a workflow's pod is always `0/1`. To influence this, set an initial
progress annotation on the pod:

```yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: progress-
spec:
entrypoint: main
templates:
- name: main
dag:
tasks:
- name: progress
template: progress
- name: progress
metadata:
annotations:
workflows.argoproj.io/progress: 0/100
container:
image: alpine:3.14
command: [ "/bin/sh", "-c" ]
args:
- |
for i in `seq 1 10`; do sleep 10; echo "$(($i*10))"'/100' > $ARGO_PROGRESS_FILE; done
```
4 changes: 4 additions & 0 deletions pkg/apis/workflow/v1alpha1/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ func (in Progress) Add(x Progress) Progress {
return Progress(fmt.Sprintf("%v/%v", in.N()+x.N(), in.M()+x.M()))
}

// Complete returns the progress with the numerator set equal to the
// denominator, i.e. the fully-done form "M/M" of this progress value.
func (in Progress) Complete() Progress {
	total := in.M()
	return Progress(fmt.Sprintf("%v/%v", total, total))
}

// IsValid reports whether the progress is well-formed: it must be non-empty,
// have a non-negative numerator no greater than the denominator, and a
// strictly positive denominator.
func (in Progress) IsValid() bool {
	if in == "" {
		return false
	}
	// Evaluate in the same short-circuit order as before: N bounds first,
	// then the positivity of M.
	return in.N() >= 0 && in.N() <= in.M() && in.M() > 0
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/workflow/v1alpha1/progress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,7 @@ func TestProgress(t *testing.T) {
t.Run("Add", func(t *testing.T) {
assert.Equal(t, Progress("1/2"), Progress("0/0").Add("1/2"))
})
t.Run("Complete", func(t *testing.T) {
assert.Equal(t, Progress("100/100"), Progress("0/100").Complete())
})
}
36 changes: 36 additions & 0 deletions test/e2e/progress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package e2e

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
Expand All @@ -31,6 +32,41 @@ func (s *ProgressSuite) TestDefaultProgress() {
})
}

// TestLoggedProgress verifies that progress self-reported by a pod (written to
// $ARGO_PROGRESS_FILE and patched into the pod annotations by the executor) is
// propagated by the controller to the pod node, its parent DAG node, and the
// overall workflow status.
func (s *ProgressSuite) TestLoggedProgress() {
	// assertProgress checks phase and progress at every level: the workflow
	// status, the DAG node (named after the workflow), and the pod node
	// with display name "progress".
	assertProgress := func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus, expectedPhase wfv1.WorkflowPhase, expectedProgress wfv1.Progress) {
		assert.Equal(t, expectedPhase, status.Phase)
		assert.Equal(t, expectedProgress, status.Progress)
		// DAG
		assert.Equal(t, expectedProgress, status.Nodes[metadata.Name].Progress)
		// Pod
		podNode := status.Nodes.FindByDisplayName("progress")
		assert.Equal(t, expectedProgress, podNode.Progress)
	}

	s.Given().
		Workflow("@testdata/progress-workflow.yaml").
		When().
		SubmitWorkflow().
		WaitForWorkflow(fixtures.ToBeRunning).
		// give the executor a moment to start before checking the initial progress
		Wait(5 * time.Second).
		Then().
		ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
			// initial progress comes from the workflows.argoproj.io/progress
			// annotation set in the testdata workflow
			assertProgress(t, metadata, status, wfv1.WorkflowRunning, "0/100")
		}).
		When().
		// the pod writes 50/100 to the progress file; the default annotation
		// patch tick is 1m, so wait slightly longer than that for the patch
		// to land and for the controller to pick it up
		Wait(65 * time.Second).
		Then().
		ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
			assertProgress(t, metadata, status, wfv1.WorkflowRunning, "50/100")
		}).
		When().
		WaitForWorkflow(10 * time.Second).
		Then().
		ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
			// on success the progress is marked complete (N == M)
			assertProgress(t, metadata, status, wfv1.WorkflowSucceeded, "100/100")
		})
}

func TestProgressSuite(t *testing.T) {
suite.Run(t, new(ProgressSuite))
}
23 changes: 23 additions & 0 deletions test/e2e/testdata/progress-workflow.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# E2E fixture for TestLoggedProgress: a single-task DAG whose pod self-reports
# progress by writing "50/100" to $ARGO_PROGRESS_FILE, then sleeps long enough
# for the executor to patch the progress into the pod annotations.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: progress-
  labels:
    argo-e2e: "true"
spec:
  entrypoint: main
  templates:
    - name: main
      dag:
        tasks:
          - name: progress
            template: progress
    - name: progress
      metadata:
        annotations:
          # initial self-reported progress for this pod; without it the
          # progress starts at 0/1
          workflows.argoproj.io/progress: 0/100
      container:
        image: argoproj/argosay:v2
        command: ["/bin/sh", "-c"]
        args:
          # report 50/100 via the progress file, then sleep so the executor's
          # patch tick has time to fire before the pod completes
          - /argosay echo 50/100 $ARGO_PROGRESS_FILE && /argosay sleep 70s
15 changes: 15 additions & 0 deletions workflow/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ const (
// AnnotationKeyPodNameVersion stores the pod naming convention version
AnnotationKeyPodNameVersion = workflow.WorkflowFullName + "/pod-name-format"

// AnnotationKeyProgress is N/M progress for the node
AnnotationKeyProgress = workflow.WorkflowFullName + "/progress"

// LabelKeyControllerInstanceID is the label the controller will carry forward to workflows/pod labels
// for the purposes of workflow segregation
LabelKeyControllerInstanceID = workflow.WorkflowFullName + "/controller-instanceid"
Expand Down Expand Up @@ -117,6 +120,15 @@ const (
// EnvVarArgoTrace is used enable tracing statements in Argo components
EnvVarArgoTrace = "ARGO_TRACE"

// EnvVarProgressPatchTickDuration sets the tick duration for patching pod annotations upon progress changes.
// Setting this or EnvVarProgressFileTickDuration to 0 will disable monitoring progress.
EnvVarProgressPatchTickDuration = "ARGO_PROGRESS_PATCH_TICK_DURATION"
// EnvVarProgressFileTickDuration sets the tick duration for reading & parsing the progress file.
// Setting this or EnvVarProgressPatchTickDuration to 0 will disable monitoring progress.
EnvVarProgressFileTickDuration = "ARGO_PROGRESS_FILE_TICK_DURATION"
// EnvVarProgressFile is the file watched for reporting progress
EnvVarProgressFile = "ARGO_PROGRESS_FILE"

// ContainerRuntimeExecutorDocker to use docker as container runtime executor
ContainerRuntimeExecutorDocker = "docker"

Expand Down Expand Up @@ -182,6 +194,9 @@ const (
ServiceAccountTokenMountPath = "/var/run/secrets/kubernetes.io/serviceaccount" //nolint:gosec
ServiceAccountTokenVolumeName = "exec-sa-token" //nolint:gosec
SecretVolMountPath = "/argo/secret"

// ArgoProgressPath defines the path to a file used for self reporting progress
ArgoProgressPath = "/var/run/argo/progress"
)

// AnnotationKeyKillCmd specifies the command to use to kill to container, useful for injected sidecars
Expand Down
10 changes: 9 additions & 1 deletion workflow/controller/container_set_template_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,13 @@ spec:
for _, c := range pod.Spec.Containers {
switch c.Name {
case common.WaitContainerName:
assert.ElementsMatch(t, []corev1.VolumeMount{}, c.VolumeMounts)
assert.ElementsMatch(t, []corev1.VolumeMount{
{Name: "var-run-argo", MountPath: "/var/run/argo"},
}, c.VolumeMounts)
case "ctr-0":
assert.ElementsMatch(t, []corev1.VolumeMount{
{Name: "workspace", MountPath: "/workspace"},
{Name: "var-run-argo", MountPath: "/var/run/argo"},
}, c.VolumeMounts)
default:
t.Fatalf(c.Name)
Expand Down Expand Up @@ -116,6 +119,7 @@ spec:
assert.ElementsMatch(t, []corev1.VolumeMount{
{Name: "input-artifacts", MountPath: "/argo/inputs/artifacts"},
{Name: "workspace", MountPath: "/mainctrfs/workspace"},
{Name: "var-run-argo", MountPath: "/var/run/argo"},
}, c.VolumeMounts)
}

Expand All @@ -126,11 +130,13 @@ spec:
assert.ElementsMatch(t, []corev1.VolumeMount{
{Name: "workspace", MountPath: "/mainctrfs/workspace"},
{Name: "input-artifacts", MountPath: "/mainctrfs/in/in-0", SubPath: "in-0"},
{Name: "var-run-argo", MountPath: "/var/run/argo"},
}, c.VolumeMounts)
case "main":
assert.ElementsMatch(t, []corev1.VolumeMount{
{Name: "workspace", MountPath: "/workspace"},
{Name: "input-artifacts", MountPath: "/in/in-0", SubPath: "in-0"},
{Name: "var-run-argo", MountPath: "/var/run/argo"},
}, c.VolumeMounts)
default:
t.Fatalf(c.Name)
Expand Down Expand Up @@ -192,10 +198,12 @@ spec:
case common.WaitContainerName:
assert.ElementsMatch(t, []corev1.VolumeMount{
{Name: "workspace", MountPath: "/mainctrfs/workspace"},
{Name: "var-run-argo", MountPath: "/var/run/argo"},
}, c.VolumeMounts)
case "main":
assert.ElementsMatch(t, []corev1.VolumeMount{
{Name: "workspace", MountPath: "/workspace"},
{Name: "var-run-argo", MountPath: "/var/run/argo"},
}, c.VolumeMounts)
default:
t.Fatalf(c.Name)
Expand Down
9 changes: 9 additions & 0 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ type WorkflowController struct {
archiveLabelSelector labels.Selector
cacheFactory controllercache.Factory
wfTaskSetInformer wfextvv1alpha1.WorkflowTaskSetInformer

// progressPatchTickDuration defines how often the executor will patch pod annotations if an updated progress is found.
// Default is 1m and can be configured using the env var ARGO_PROGRESS_PATCH_TICK_DURATION.
progressPatchTickDuration time.Duration
// progressFileTickDuration defines how often the progress file is read.
// Default is 3s and can be configured using the env var ARGO_PROGRESS_FILE_TICK_DURATION
progressFileTickDuration time.Duration
}

const (
Expand Down Expand Up @@ -144,6 +151,8 @@ func NewWorkflowController(ctx context.Context, restConfig *rest.Config, kubecli
workflowKeyLock: syncpkg.NewKeyLock(),
cacheFactory: controllercache.NewCacheFactory(kubeclientset, namespace),
eventRecorderManager: events.NewEventRecorderManager(kubeclientset),
progressPatchTickDuration: env.LookupEnvDurationOr(common.EnvVarProgressPatchTickDuration, 1*time.Minute),
progressFileTickDuration: env.LookupEnvDurationOr(common.EnvVarProgressFileTickDuration, 3*time.Second),
}

wfc.UpdateConfig(ctx)
Expand Down
26 changes: 15 additions & 11 deletions workflow/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
fakewfclientset "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/fake"
"github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/scheme"
wfextv "github.com/argoproj/argo-workflows/v3/pkg/client/informers/externalversions"
envutil "github.com/argoproj/argo-workflows/v3/util/env"
armocks "github.com/argoproj/argo-workflows/v3/workflow/artifactrepositories/mocks"
"github.com/argoproj/argo-workflows/v3/workflow/common"
controllercache "github.com/argoproj/argo-workflows/v3/workflow/controller/cache"
Expand Down Expand Up @@ -151,16 +152,18 @@ func newController(options ...interface{}) (context.CancelFunc, *WorkflowControl
S3Bucket: wfv1.S3Bucket{Endpoint: "my-endpoint", Bucket: "my-bucket"},
},
}),
kubeclientset: kube,
dynamicInterface: dynamicClient,
wfclientset: wfclientset,
workflowKeyLock: sync.NewKeyLock(),
wfArchive: sqldb.NullWorkflowArchive,
hydrator: hydratorfake.Noop,
estimatorFactory: estimation.DummyEstimatorFactory,
eventRecorderManager: &testEventRecorderManager{eventRecorder: record.NewFakeRecorder(64)},
archiveLabelSelector: labels.Everything(),
cacheFactory: controllercache.NewCacheFactory(kube, "default"),
kubeclientset: kube,
dynamicInterface: dynamicClient,
wfclientset: wfclientset,
workflowKeyLock: sync.NewKeyLock(),
wfArchive: sqldb.NullWorkflowArchive,
hydrator: hydratorfake.Noop,
estimatorFactory: estimation.DummyEstimatorFactory,
eventRecorderManager: &testEventRecorderManager{eventRecorder: record.NewFakeRecorder(64)},
archiveLabelSelector: labels.Everything(),
cacheFactory: controllercache.NewCacheFactory(kube, "default"),
progressPatchTickDuration: envutil.LookupEnvDurationOr(common.EnvVarProgressPatchTickDuration, 1*time.Minute),
progressFileTickDuration: envutil.LookupEnvDurationOr(common.EnvVarProgressFileTickDuration, 3*time.Second),
}

for _, opt := range options {
Expand Down Expand Up @@ -261,7 +264,8 @@ func listPods(woc *wfOperationCtx) (*apiv1.PodList, error) {

type with func(pod *apiv1.Pod)

func withOutputs(v string) with { return withAnnotation(common.AnnotationKeyOutputs, v) }
// withOutputs returns a pod option that sets the outputs annotation to v.
func withOutputs(v string) with {
	return withAnnotation(common.AnnotationKeyOutputs, v)
}

// withProgress returns a pod option that sets the progress annotation to v.
func withProgress(v string) with {
	return withAnnotation(common.AnnotationKeyProgress, v)
}

func withExitCode(v int32) with {
return func(pod *apiv1.Pod) {
Expand Down
9 changes: 8 additions & 1 deletion workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type wfOperationCtx struct {
// updated indicates whether or not the workflow object itself was updated
// and needs to be persisted back to kubernetes
updated bool
// log is an logrus logging context to corralate logs with a workflow
// log is an logrus logging context to correlate logs with a workflow
log *log.Entry
// controller reference to workflow controller
controller *WorkflowController
Expand Down Expand Up @@ -996,6 +996,13 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) error {
woc.wf.Status.Nodes[nodeID] = node
woc.updated = true
}
podProgress := progress.PodProgress(pod, &node)
if podProgress.IsValid() && node.Progress != podProgress {
woc.log.WithField("progress", podProgress).Info("pod progress")
node.Progress = podProgress
woc.wf.Status.Nodes[nodeID] = node
woc.updated = true
}
}
if node.Fulfilled() && !node.IsDaemoned() {
if pod.GetLabels()[common.LabelKeyCompleted] == "true" {
Expand Down

0 comments on commit 98b8d56

Please sign in to comment.