diff --git a/Makefile b/Makefile
index ee66b19e9d62..256bfb18ca21 100644
--- a/Makefile
+++ b/Makefile
@@ -20,6 +20,9 @@ GREP_LOGS := ""
 IMAGE_NAMESPACE ?= quay.io/argoproj
 DEV_IMAGE ?= $(shell [ `uname -s` = Darwin ] && echo true || echo false)
 
+# declares which cluster to import images into in case it's not the default name
+K3D_CLUSTER_NAME ?= k3s-default
+
 # The name of the namespace where Kubernetes resources/RBAC will be installed
 KUBE_NAMESPACE ?= argo
 MANAGED_NAMESPACE ?= $(KUBE_NAMESPACE)
@@ -222,7 +225,7 @@ argoexec-image:
 	--output=type=docker .
 	[ ! -e $* ] || mv $* dist/
 	docker run --rm -t $(IMAGE_NAMESPACE)/$*:$(VERSION) version
-	if [ $(K3D) = true ]; then k3d image import $(IMAGE_NAMESPACE)/$*:$(VERSION); fi
+	if [ $(K3D) = true ]; then k3d image import -c $(K3D_CLUSTER_NAME) $(IMAGE_NAMESPACE)/$*:$(VERSION); fi
 	if [ $(DOCKER_PUSH) = true ] && [ $(IMAGE_NAMESPACE) != argoproj ] ; then docker push $(IMAGE_NAMESPACE)/$*:$(VERSION) ; fi
 
 scan-images: scan-workflow-controller scan-argoexec scan-argocli
@@ -399,7 +402,7 @@ endif
 argosay:
 	cd test/e2e/images/argosay/v2 && docker build . -t argoproj/argosay:v2
 ifeq ($(K3D),true)
-	k3d image import argoproj/argosay:v2
+	k3d image import -c $(K3D_CLUSTER_NAME) argoproj/argosay:v2
 endif
 ifeq ($(DOCKER_PUSH),true)
 	docker push argoproj/argosay:v2
diff --git a/cmd/argoexec/commands/root.go b/cmd/argoexec/commands/root.go
index b6a553a322c3..f33867afab14 100644
--- a/cmd/argoexec/commands/root.go
+++ b/cmd/argoexec/commands/root.go
@@ -105,6 +105,10 @@ func initExecutor() *executor.WorkflowExecutor {
 	deadline, err := time.Parse(time.RFC3339, os.Getenv(common.EnvVarDeadline))
 	checkErr(err)
 
+	// errors are ignored because the values are set by the controller and checked there.
+	annotationPatchTickDuration, _ := time.ParseDuration(os.Getenv(common.EnvVarProgressPatchTickDuration))
+	progressFileTickDuration, _ := time.ParseDuration(os.Getenv(common.EnvVarProgressFileTickDuration))
+
 	var cre executor.ContainerRuntimeExecutor
 	log.Infof("Creating a %s executor", executorType)
 	switch executorType {
@@ -121,7 +125,7 @@ func initExecutor() *executor.WorkflowExecutor {
 	}
 	checkErr(err)
 
-	wfExecutor := executor.NewExecutor(clientset, restClient, podName, namespace, cre, *tmpl, includeScriptOutput, deadline)
+	wfExecutor := executor.NewExecutor(clientset, restClient, podName, namespace, cre, *tmpl, includeScriptOutput, deadline, annotationPatchTickDuration, progressFileTickDuration)
 
 	log.
 		WithField("version", version.String()).
diff --git a/docs/environment-variables.md b/docs/environment-variables.md
index d2fa61f42848..bcbbcfc93b39 100644
--- a/docs/environment-variables.md
+++ b/docs/environment-variables.md
@@ -38,6 +38,8 @@ Note that these environment variables may be removed at any time.
 | `BUBBLE_ENTRY_TEMPLATE_ERR` | `bool` | `true` | Whether to bubble up template errors to workflow. |
 | `INFORMER_WRITE_BACK` | `bool` | `true` | Whether to write back to informer instead of catching up. |
 | `GRPC_MESSAGE_SIZE` | `string` | Use different GRPC Max message size for Argo server deployment (supporting huge workflows). |
+| `ARGO_PROGRESS_PATCH_TICK_DURATION` | `time.Duration` | `1m` | How often self-reported progress is patched into the pod annotations, i.e. how long it takes until the controller picks up a progress change. Set to `0` to disable progress self-reporting. |
+| `ARGO_PROGRESS_FILE_TICK_DURATION` | `time.Duration` | `3s` | How often the progress file is read by the executor. Set to `0` to disable progress self-reporting. |
 CLI parameters of the `argo-server` and `workflow-controller` can be specified as environment variables with the `ARGO_` prefix. For example:
diff --git a/docs/progress.md b/docs/progress.md
index 58873aadc233..653fb6f1dab3 100644
--- a/docs/progress.md
+++ b/docs/progress.md
@@ -24,3 +24,46 @@ For a whole workflow's, progress is the sum of all its leaf nodes.
 
 !!! Warning
     `M` will increase during workflow run each time a node is added to the graph.
+
+## Self reporting progress
+
+> v3.3 and after
+
+Pods in a workflow can report their own progress during runtime. This self-reported progress overrides the
+auto-generated progress.
+
+Reporting progress works as follows:
+- create and write the progress to the file indicated by the env variable `ARGO_PROGRESS_FILE`
+- the format of the progress must be `N/M`
+
+The executor will read this file every 3s (configurable via `ARGO_PROGRESS_FILE_TICK_DURATION`) and, if it has changed,
+patch the pod annotations with `workflows.argoproj.io/progress: N/M`.
+The controller picks this up and writes the progress to the appropriate Status properties.
+
+Initially, the progress of a workflow's pod is always `0/1`. If you want to influence this, make sure to set an initial
+progress annotation on the pod:
+
+```yaml
+apiVersion: argoproj.io/v1alpha1
+kind: Workflow
+metadata:
+  generateName: progress-
+spec:
+  entrypoint: main
+  templates:
+    - name: main
+      dag:
+        tasks:
+          - name: progress
+            template: progress
+    - name: progress
+      metadata:
+        annotations:
+          workflows.argoproj.io/progress: 0/100
+      container:
+        image: alpine:3.14
+        command: [ "/bin/sh", "-c" ]
+        args:
+          - |
+            for i in `seq 1 10`; do sleep 10; echo "$(($i*10))"'/100' > $ARGO_PROGRESS_FILE; done
+```
diff --git a/pkg/apis/workflow/v1alpha1/progress.go b/pkg/apis/workflow/v1alpha1/progress.go
index 82ea054078da..f80f209d2a02 100644
--- a/pkg/apis/workflow/v1alpha1/progress.go
+++ b/pkg/apis/workflow/v1alpha1/progress.go
@@ -34,6 +34,10 @@ func (in Progress) Add(x Progress) Progress {
 	return Progress(fmt.Sprintf("%v/%v", in.N()+x.N(), in.M()+x.M()))
 }
 
+func (in Progress) Complete() Progress {
+	return Progress(fmt.Sprintf("%v/%v", in.M(), in.M()))
+}
+
 func (in Progress) IsValid() bool {
 	return in != "" && in.N() >= 0 && in.N() <= in.M() && in.M() > 0
 }
diff --git a/pkg/apis/workflow/v1alpha1/progress_test.go b/pkg/apis/workflow/v1alpha1/progress_test.go
index fe55b4fa765a..75c791be50f8 100644
--- a/pkg/apis/workflow/v1alpha1/progress_test.go
+++ b/pkg/apis/workflow/v1alpha1/progress_test.go
@@ -25,4 +25,7 @@ func TestProgress(t *testing.T) {
 	t.Run("Add", func(t *testing.T) {
 		assert.Equal(t, Progress("1/2"), Progress("0/0").Add("1/2"))
 	})
+	t.Run("Complete", func(t *testing.T) {
+		assert.Equal(t, Progress("100/100"), Progress("0/100").Complete())
+	})
 }
diff --git a/test/e2e/progress_test.go b/test/e2e/progress_test.go
index 55002f69c463..a567da4ad1d2 100644
--- a/test/e2e/progress_test.go
+++ b/test/e2e/progress_test.go
@@ -5,6 +5,7 @@ package e2e
 
 import (
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/suite"
@@ -31,6 +32,41 @@ func (s *ProgressSuite) TestDefaultProgress() {
 	})
 }
 
+func (s *ProgressSuite) TestLoggedProgress() {
+	assertProgress := func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus, expectedPhase wfv1.WorkflowPhase, expectedProgress wfv1.Progress) {
+		assert.Equal(t, expectedPhase, status.Phase)
+		assert.Equal(t, expectedProgress, status.Progress)
+		// DAG
+		assert.Equal(t, expectedProgress, status.Nodes[metadata.Name].Progress)
+		// Pod
+		podNode :=
status.Nodes.FindByDisplayName("progress") + assert.Equal(t, expectedProgress, podNode.Progress) + } + + s.Given(). + Workflow("@testdata/progress-workflow.yaml"). + When(). + SubmitWorkflow(). + WaitForWorkflow(fixtures.ToBeRunning). + Wait(5 * time.Second). + Then(). + ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) { + assertProgress(t, metadata, status, wfv1.WorkflowRunning, "0/100") + }). + When(). + Wait(65 * time.Second). + Then(). + ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) { + assertProgress(t, metadata, status, wfv1.WorkflowRunning, "50/100") + }). + When(). + WaitForWorkflow(10 * time.Second). + Then(). + ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) { + assertProgress(t, metadata, status, wfv1.WorkflowSucceeded, "100/100") + }) +} + func TestProgressSuite(t *testing.T) { suite.Run(t, new(ProgressSuite)) } diff --git a/test/e2e/testdata/progress-workflow.yaml b/test/e2e/testdata/progress-workflow.yaml new file mode 100644 index 000000000000..3aedfc0f0847 --- /dev/null +++ b/test/e2e/testdata/progress-workflow.yaml @@ -0,0 +1,23 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: progress- + labels: + argo-e2e: "true" +spec: + entrypoint: main + templates: + - name: main + dag: + tasks: + - name: progress + template: progress + - name: progress + metadata: + annotations: + workflows.argoproj.io/progress: 0/100 + container: + image: argoproj/argosay:v2 + command: ["/bin/sh", "-c"] + args: + - /argosay echo 50/100 $ARGO_PROGRESS_FILE && /argosay sleep 70s diff --git a/workflow/common/common.go b/workflow/common/common.go index f95a05913509..94fb73f2fd9f 100644 --- a/workflow/common/common.go +++ b/workflow/common/common.go @@ -42,6 +42,9 @@ const ( // AnnotationKeyPodNameVersion stores the pod naming convention version AnnotationKeyPodNameVersion = workflow.WorkflowFullName + "/pod-name-format" + // AnnotationKeyProgress is N/M progress for the node + AnnotationKeyProgress = workflow.WorkflowFullName + "/progress" + // LabelKeyControllerInstanceID is the label the controller will carry forward to workflows/pod labels // for the purposes of workflow segregation LabelKeyControllerInstanceID = workflow.WorkflowFullName + "/controller-instanceid" @@ -117,6 +120,15 @@ const ( // EnvVarArgoTrace is used enable tracing statements in Argo components EnvVarArgoTrace = "ARGO_TRACE" + // EnvVarProgressPatchTickDuration sets the tick duration for patching pod annotations upon progress changes. + // Setting this or EnvVarProgressFileTickDuration to 0 will disable monitoring progress. + EnvVarProgressPatchTickDuration = "ARGO_PROGRESS_PATCH_TICK_DURATION" + // EnvVarProgressFileTickDuration sets the tick duration for reading & parsing the progress file. + // Setting this or EnvVarProgressPatchTickDuration to 0 will disable monitoring progress. 
+ EnvVarProgressFileTickDuration = "ARGO_PROGRESS_FILE_TICK_DURATION" + // EnvVarProgressFile is the file watched for reporting progress + EnvVarProgressFile = "ARGO_PROGRESS_FILE" + // ContainerRuntimeExecutorDocker to use docker as container runtime executor ContainerRuntimeExecutorDocker = "docker" @@ -182,6 +194,9 @@ const ( ServiceAccountTokenMountPath = "/var/run/secrets/kubernetes.io/serviceaccount" //nolint:gosec ServiceAccountTokenVolumeName = "exec-sa-token" //nolint:gosec SecretVolMountPath = "/argo/secret" + + // ArgoProgressPath defines the path to a file used for self reporting progress + ArgoProgressPath = "/var/run/argo/progress" ) // AnnotationKeyKillCmd specifies the command to use to kill to container, useful for injected sidecars diff --git a/workflow/controller/container_set_template_test.go b/workflow/controller/container_set_template_test.go index 46042a1bc4df..0015bbc431b3 100644 --- a/workflow/controller/container_set_template_test.go +++ b/workflow/controller/container_set_template_test.go @@ -53,10 +53,13 @@ spec: for _, c := range pod.Spec.Containers { switch c.Name { case common.WaitContainerName: - assert.ElementsMatch(t, []corev1.VolumeMount{}, c.VolumeMounts) + assert.ElementsMatch(t, []corev1.VolumeMount{ + {Name: "var-run-argo", MountPath: "/var/run/argo"}, + }, c.VolumeMounts) case "ctr-0": assert.ElementsMatch(t, []corev1.VolumeMount{ {Name: "workspace", MountPath: "/workspace"}, + {Name: "var-run-argo", MountPath: "/var/run/argo"}, }, c.VolumeMounts) default: t.Fatalf(c.Name) @@ -116,6 +119,7 @@ spec: assert.ElementsMatch(t, []corev1.VolumeMount{ {Name: "input-artifacts", MountPath: "/argo/inputs/artifacts"}, {Name: "workspace", MountPath: "/mainctrfs/workspace"}, + {Name: "var-run-argo", MountPath: "/var/run/argo"}, }, c.VolumeMounts) } @@ -126,11 +130,13 @@ spec: assert.ElementsMatch(t, []corev1.VolumeMount{ {Name: "workspace", MountPath: "/mainctrfs/workspace"}, {Name: "input-artifacts", MountPath: "/mainctrfs/in/in-0", SubPath: "in-0"}, + {Name: "var-run-argo", MountPath: "/var/run/argo"}, }, c.VolumeMounts) case "main": assert.ElementsMatch(t, []corev1.VolumeMount{ {Name: "workspace", MountPath: "/workspace"}, {Name: "input-artifacts", MountPath: "/in/in-0", SubPath: "in-0"}, + {Name: "var-run-argo", MountPath: "/var/run/argo"}, }, c.VolumeMounts) default: t.Fatalf(c.Name) @@ -192,10 +198,12 @@ spec: case common.WaitContainerName: assert.ElementsMatch(t, []corev1.VolumeMount{ {Name: "workspace", MountPath: "/mainctrfs/workspace"}, + {Name: "var-run-argo", MountPath: "/var/run/argo"}, }, c.VolumeMounts) case "main": assert.ElementsMatch(t, []corev1.VolumeMount{ {Name: "workspace", MountPath: "/workspace"}, + {Name: "var-run-argo", MountPath: "/var/run/argo"}, }, c.VolumeMounts) default: t.Fatalf(c.Name) diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 971e41d8b016..34148841556c 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -112,6 +112,13 @@ type WorkflowController struct { archiveLabelSelector labels.Selector cacheFactory controllercache.Factory wfTaskSetInformer wfextvv1alpha1.WorkflowTaskSetInformer + + // progressPatchTickDuration defines how often the executor will patch pod annotations if an updated progress is found. + // Default is 1m and can be configured using the env var ARGO_PROGRESS_PATCH_TICK_DURATION. + progressPatchTickDuration time.Duration + // progressFileTickDuration defines how often the progress file is read. 
+ // Default is 3s and can be configured using the env var ARGO_PROGRESS_FILE_TICK_DURATION + progressFileTickDuration time.Duration } const ( @@ -144,6 +151,8 @@ func NewWorkflowController(ctx context.Context, restConfig *rest.Config, kubecli workflowKeyLock: syncpkg.NewKeyLock(), cacheFactory: controllercache.NewCacheFactory(kubeclientset, namespace), eventRecorderManager: events.NewEventRecorderManager(kubeclientset), + progressPatchTickDuration: env.LookupEnvDurationOr(common.EnvVarProgressPatchTickDuration, 1*time.Minute), + progressFileTickDuration: env.LookupEnvDurationOr(common.EnvVarProgressFileTickDuration, 3*time.Second), } wfc.UpdateConfig(ctx) diff --git a/workflow/controller/controller_test.go b/workflow/controller/controller_test.go index 1af79abee115..78bc95dfe059 100644 --- a/workflow/controller/controller_test.go +++ b/workflow/controller/controller_test.go @@ -27,6 +27,7 @@ import ( fakewfclientset "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/fake" "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/scheme" wfextv "github.com/argoproj/argo-workflows/v3/pkg/client/informers/externalversions" + envutil "github.com/argoproj/argo-workflows/v3/util/env" armocks "github.com/argoproj/argo-workflows/v3/workflow/artifactrepositories/mocks" "github.com/argoproj/argo-workflows/v3/workflow/common" controllercache "github.com/argoproj/argo-workflows/v3/workflow/controller/cache" @@ -151,16 +152,18 @@ func newController(options ...interface{}) (context.CancelFunc, *WorkflowControl S3Bucket: wfv1.S3Bucket{Endpoint: "my-endpoint", Bucket: "my-bucket"}, }, }), - kubeclientset: kube, - dynamicInterface: dynamicClient, - wfclientset: wfclientset, - workflowKeyLock: sync.NewKeyLock(), - wfArchive: sqldb.NullWorkflowArchive, - hydrator: hydratorfake.Noop, - estimatorFactory: estimation.DummyEstimatorFactory, - eventRecorderManager: &testEventRecorderManager{eventRecorder: record.NewFakeRecorder(64)}, - archiveLabelSelector: labels.Everything(), - cacheFactory: controllercache.NewCacheFactory(kube, "default"), + kubeclientset: kube, + dynamicInterface: dynamicClient, + wfclientset: wfclientset, + workflowKeyLock: sync.NewKeyLock(), + wfArchive: sqldb.NullWorkflowArchive, + hydrator: hydratorfake.Noop, + estimatorFactory: estimation.DummyEstimatorFactory, + eventRecorderManager: &testEventRecorderManager{eventRecorder: record.NewFakeRecorder(64)}, + archiveLabelSelector: labels.Everything(), + cacheFactory: controllercache.NewCacheFactory(kube, "default"), + progressPatchTickDuration: envutil.LookupEnvDurationOr(common.EnvVarProgressPatchTickDuration, 1*time.Minute), + progressFileTickDuration: envutil.LookupEnvDurationOr(common.EnvVarProgressFileTickDuration, 3*time.Second), } for _, opt := range options { @@ -261,7 +264,8 @@ func listPods(woc *wfOperationCtx) (*apiv1.PodList, error) { type with func(pod *apiv1.Pod) -func withOutputs(v string) with { return withAnnotation(common.AnnotationKeyOutputs, v) } +func withOutputs(v string) with { return withAnnotation(common.AnnotationKeyOutputs, v) } +func withProgress(v string) with { return withAnnotation(common.AnnotationKeyProgress, v) } func withExitCode(v int32) with { return func(pod *apiv1.Pod) { diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index bb1892d80aa3..d1c76449d64c 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -69,7 +69,7 @@ type wfOperationCtx struct { // updated indicates whether or not the workflow object itself 
was updated // and needs to be persisted back to kubernetes updated bool - // log is an logrus logging context to corralate logs with a workflow + // log is an logrus logging context to correlate logs with a workflow log *log.Entry // controller reference to workflow controller controller *WorkflowController @@ -996,6 +996,13 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) error { woc.wf.Status.Nodes[nodeID] = node woc.updated = true } + podProgress := progress.PodProgress(pod, &node) + if podProgress.IsValid() && node.Progress != podProgress { + woc.log.WithField("progress", podProgress).Info("pod progress") + node.Progress = podProgress + woc.wf.Status.Nodes[nodeID] = node + woc.updated = true + } } if node.Fulfilled() && !node.IsDaemoned() { if pod.GetLabels()[common.LabelKeyCompleted] == "true" { diff --git a/workflow/controller/operator_test.go b/workflow/controller/operator_test.go index fb5799112e7a..d41320ba4df0 100644 --- a/workflow/controller/operator_test.go +++ b/workflow/controller/operator_test.go @@ -261,6 +261,51 @@ spec: assert.Equal(t, wfv1.Progress("1/1"), woc.wf.Status.Nodes.FindByDisplayName("pod").Progress) } +func TestLoggedProgress(t *testing.T) { + wf := wfv1.MustUnmarshalWorkflow(` +metadata: + name: my-wf + namespace: my-ns +spec: + entrypoint: main + templates: + - name: main + dag: + tasks: + - name: pod + template: pod + - name: pod + container: + image: my-image +`) + cancel, controller := newController(wf) + defer cancel() + + ctx := context.Background() + woc := newWorkflowOperationCtx(wf, controller) + woc.operate(ctx) + + makePodsPhase(ctx, woc, apiv1.PodRunning, withProgress("50/100")) + woc = newWorkflowOperationCtx(woc.wf, controller) + woc.operate(ctx) + + assert.Equal(t, wfv1.WorkflowRunning, woc.wf.Status.Phase) + assert.Equal(t, wfv1.Progress("50/100"), woc.wf.Status.Progress) + assert.Equal(t, wfv1.Progress("50/100"), woc.wf.Status.Nodes[woc.wf.Name].Progress) + pod := woc.wf.Status.Nodes.FindByDisplayName("pod") + assert.Equal(t, wfv1.Progress("50/100"), pod.Progress) + + makePodsPhase(ctx, woc, apiv1.PodSucceeded, withProgress("100/100")) + woc = newWorkflowOperationCtx(woc.wf, controller) + woc.operate(ctx) + + assert.Equal(t, wfv1.WorkflowSucceeded, woc.wf.Status.Phase) + assert.Equal(t, wfv1.Progress("100/100"), woc.wf.Status.Progress) + assert.Equal(t, wfv1.Progress("100/100"), woc.wf.Status.Nodes[woc.wf.Name].Progress) + pod = woc.wf.Status.Nodes.FindByDisplayName("pod") + assert.Equal(t, wfv1.Progress("100/100"), pod.Progress) +} + var sidecarWithVol = ` # Verifies sidecars can reference volumeClaimTemplates apiVersion: argoproj.io/v1alpha1 @@ -6004,7 +6049,7 @@ func TestWFWithRetryAndWithParam(t *testing.T) { ctrs := pods.Items[0].Spec.Containers assert.Len(t, ctrs, 2) envs := ctrs[1].Env - assert.Len(t, envs, 4) + assert.Len(t, envs, 7) assert.Equal(t, apiv1.EnvVar{Name: "ARGO_INCLUDE_SCRIPT_OUTPUT", Value: "true"}, envs[2]) } }) diff --git a/workflow/controller/workflowpod.go b/workflow/controller/workflowpod.go index 08b6eedeff11..0c43b2a93ed4 100644 --- a/workflow/controller/workflowpod.go +++ b/workflow/controller/workflowpod.go @@ -289,29 +289,27 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin addSidecars(pod, tmpl) addOutputArtifactsVolumes(pod, tmpl) - if woc.getContainerRuntimeExecutor() == common.ContainerRuntimeExecutorEmissary { - for i, c := range pod.Spec.InitContainers { - c.VolumeMounts = append(c.VolumeMounts, volumeMountVarArgo) - pod.Spec.InitContainers[i] = c - } 
- for i, c := range pod.Spec.Containers { - if c.Name != common.WaitContainerName { - // https://kubernetes.io/docs/tasks/inject-data-application/define-command-argument-container/#notes - if len(c.Command) == 0 { - x := woc.getImage(c.Image) - c.Command = x.Command - if c.Args == nil { // check nil rather than length, as zero-length is valid args - c.Args = x.Args - } - } - if len(c.Command) == 0 { - return nil, fmt.Errorf("when using the emissary executor you must either explicitly specify the command, or list the image's command in the index: https://argoproj.github.io/argo-workflows/workflow-executors/#emissary-emissary") + for i, c := range pod.Spec.InitContainers { + c.VolumeMounts = append(c.VolumeMounts, volumeMountVarArgo) + pod.Spec.InitContainers[i] = c + } + for i, c := range pod.Spec.Containers { + if woc.getContainerRuntimeExecutor() == common.ContainerRuntimeExecutorEmissary && c.Name != common.WaitContainerName { + // https://kubernetes.io/docs/tasks/inject-data-application/define-command-argument-container/#notes + if len(c.Command) == 0 { + x := woc.getImage(c.Image) + c.Command = x.Command + if c.Args == nil { // check nil rather than length, as zero-length is valid args + c.Args = x.Args } - c.Command = append([]string{"/var/run/argo/argoexec", "emissary", "--"}, c.Command...) } - c.VolumeMounts = append(c.VolumeMounts, volumeMountVarArgo) - pod.Spec.Containers[i] = c + if len(c.Command) == 0 { + return nil, fmt.Errorf("when using the emissary executor you must either explicitly specify the command, or list the image's command in the index: https://argoproj.github.io/argo-workflows/workflow-executors/#emissary-emissary") + } + c.Command = append([]string{"/var/run/argo/argoexec", "emissary", "--"}, c.Command...) } + c.VolumeMounts = append(c.VolumeMounts, volumeMountVarArgo) + pod.Spec.Containers[i] = c } // Add standard environment variables, making pod spec larger @@ -319,6 +317,22 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin {Name: common.EnvVarTemplate, Value: wfv1.MustMarshallJSON(tmpl)}, {Name: common.EnvVarIncludeScriptOutput, Value: strconv.FormatBool(opts.includeScriptOutput)}, {Name: common.EnvVarDeadline, Value: woc.getDeadline(opts).Format(time.RFC3339)}, + {Name: common.EnvVarProgressFile, Value: common.ArgoProgressPath}, + } + + // only set tick durations if progress is enabled. The EnvVarProgressFile is always set (user convenience) but the + // progress is only monitored if the tick durations are >0. + if woc.controller.progressPatchTickDuration != 0 && woc.controller.progressFileTickDuration != 0 { + envVars = append(envVars, + apiv1.EnvVar{ + Name: common.EnvVarProgressPatchTickDuration, + Value: woc.controller.progressPatchTickDuration.String(), + }, + apiv1.EnvVar{ + Name: common.EnvVarProgressFileTickDuration, + Value: woc.controller.progressFileTickDuration.String(), + }, + ) } for i, c := range pod.Spec.InitContainers { @@ -636,12 +650,12 @@ func (woc *wfOperationCtx) createVolumes(tmpl *wfv1.Template) []apiv1.Volume { }, }) } + + volumes = append(volumes, volumeVarArgo) + switch woc.getContainerRuntimeExecutor() { - case common.ContainerRuntimeExecutorKubelet, common.ContainerRuntimeExecutorK8sAPI, common.ContainerRuntimeExecutorPNS: case common.ContainerRuntimeExecutorDocker: volumes = append(volumes, woc.getVolumeDockerSock(tmpl)) - default: - volumes = append(volumes, volumeVarArgo) } volumes = append(volumes, tmpl.Volumes...) 
return volumes diff --git a/workflow/controller/workflowpod_test.go b/workflow/controller/workflowpod_test.go index c6a0c05b1e8a..8629fa046277 100644 --- a/workflow/controller/workflowpod_test.go +++ b/workflow/controller/workflowpod_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -715,11 +716,13 @@ func TestVolumeAndVolumeMounts(t *testing.T) { assert.NoError(t, err) assert.Len(t, pods.Items, 1) pod := pods.Items[0] - assert.Equal(t, 2, len(pod.Spec.Volumes)) - assert.Equal(t, "docker-sock", pod.Spec.Volumes[0].Name) - assert.Equal(t, "volume-name", pod.Spec.Volumes[1].Name) - assert.Equal(t, 1, len(pod.Spec.Containers[1].VolumeMounts)) + assert.Equal(t, 3, len(pod.Spec.Volumes)) + assert.Equal(t, "var-run-argo", pod.Spec.Volumes[0].Name) + assert.Equal(t, "docker-sock", pod.Spec.Volumes[1].Name) + assert.Equal(t, "volume-name", pod.Spec.Volumes[2].Name) + assert.Equal(t, 2, len(pod.Spec.Containers[1].VolumeMounts)) assert.Equal(t, "volume-name", pod.Spec.Containers[1].VolumeMounts[0].Name) + assert.Equal(t, "var-run-argo", pod.Spec.Containers[1].VolumeMounts[1].Name) }) // For Kubelet executor @@ -738,10 +741,12 @@ func TestVolumeAndVolumeMounts(t *testing.T) { assert.NoError(t, err) assert.Len(t, pods.Items, 1) pod := pods.Items[0] - assert.Equal(t, 1, len(pod.Spec.Volumes)) - assert.Equal(t, "volume-name", pod.Spec.Volumes[0].Name) - assert.Equal(t, 1, len(pod.Spec.Containers[1].VolumeMounts)) + assert.Equal(t, 2, len(pod.Spec.Volumes)) + assert.Equal(t, "var-run-argo", pod.Spec.Volumes[0].Name) + assert.Equal(t, "volume-name", pod.Spec.Volumes[1].Name) + assert.Equal(t, 2, len(pod.Spec.Containers[1].VolumeMounts)) assert.Equal(t, "volume-name", pod.Spec.Containers[1].VolumeMounts[0].Name) + assert.Equal(t, "var-run-argo", pod.Spec.Containers[1].VolumeMounts[1].Name) }) // For K8sAPI executor @@ -760,10 +765,12 @@ func TestVolumeAndVolumeMounts(t *testing.T) { assert.NoError(t, err) assert.Len(t, pods.Items, 1) pod := pods.Items[0] - assert.Equal(t, 1, len(pod.Spec.Volumes)) - assert.Equal(t, "volume-name", pod.Spec.Volumes[0].Name) - assert.Equal(t, 1, len(pod.Spec.Containers[1].VolumeMounts)) + assert.Equal(t, 2, len(pod.Spec.Volumes)) + assert.Equal(t, "var-run-argo", pod.Spec.Volumes[0].Name) + assert.Equal(t, "volume-name", pod.Spec.Volumes[1].Name) + assert.Equal(t, 2, len(pod.Spec.Containers[1].VolumeMounts)) assert.Equal(t, "volume-name", pod.Spec.Containers[1].VolumeMounts[0].Name) + assert.Equal(t, "var-run-argo", pod.Spec.Containers[1].VolumeMounts[1].Name) }) // For emissary executor @@ -848,10 +855,10 @@ func TestVolumesPodSubstitution(t *testing.T) { assert.NoError(t, err) assert.Len(t, pods.Items, 1) pod := pods.Items[0] - assert.Equal(t, 2, len(pod.Spec.Volumes)) - assert.Equal(t, "volume-name", pod.Spec.Volumes[1].Name) - assert.Equal(t, "test-name", pod.Spec.Volumes[1].PersistentVolumeClaim.ClaimName) - assert.Equal(t, 1, len(pod.Spec.Containers[1].VolumeMounts)) + assert.Equal(t, 3, len(pod.Spec.Volumes)) + assert.Equal(t, "volume-name", pod.Spec.Volumes[2].Name) + assert.Equal(t, "test-name", pod.Spec.Volumes[2].PersistentVolumeClaim.ClaimName) + assert.Equal(t, 2, len(pod.Spec.Containers[1].VolumeMounts)) assert.Equal(t, "volume-name", pod.Spec.Containers[1].VolumeMounts[0].Name) } @@ -1013,9 +1020,10 @@ func TestInitContainers(t *testing.T) { for _, v := range volumes { 
assert.Contains(t, pod.Spec.Volumes, v) } - assert.Equal(t, 2, len(pod.Spec.InitContainers[0].VolumeMounts)) + assert.Equal(t, 3, len(pod.Spec.InitContainers[0].VolumeMounts)) assert.Equal(t, "init-volume-name", pod.Spec.InitContainers[0].VolumeMounts[0].Name) assert.Equal(t, "volume-name", pod.Spec.InitContainers[0].VolumeMounts[1].Name) + assert.Equal(t, "var-run-argo", pod.Spec.InitContainers[0].VolumeMounts[2].Name) } // TestSidecars verifies the ability to set up sidecars @@ -1077,7 +1085,7 @@ func TestSidecars(t *testing.T) { for _, v := range volumes { assert.Contains(t, pod.Spec.Volumes, v) } - assert.Equal(t, 2, len(pod.Spec.Containers[2].VolumeMounts)) + assert.Equal(t, 3, len(pod.Spec.Containers[2].VolumeMounts)) assert.Equal(t, "sidecar-volume-name", pod.Spec.Containers[2].VolumeMounts[0].Name) assert.Equal(t, "volume-name", pod.Spec.Containers[2].VolumeMounts[1].Name) } @@ -1533,7 +1541,7 @@ func TestHybridWfVolumesWindows(t *testing.T) { pod, _ := woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{}) assert.Equal(t, "\\\\.\\pipe\\docker_engine", pod.Spec.Containers[0].VolumeMounts[0].MountPath) assert.Equal(t, false, pod.Spec.Containers[0].VolumeMounts[0].ReadOnly) - assert.Equal(t, (*apiv1.HostPathType)(nil), pod.Spec.Volumes[0].HostPath.Type) + assert.Equal(t, (*apiv1.HostPathType)(nil), pod.Spec.Volumes[1].HostPath.Type) } func TestWindowsUNCPathsAreRemoved(t *testing.T) { @@ -1594,7 +1602,7 @@ func TestHybridWfVolumesLinux(t *testing.T) { pod, _ := woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{}) assert.Equal(t, "/var/run/docker.sock", pod.Spec.Containers[0].VolumeMounts[0].MountPath) assert.Equal(t, true, pod.Spec.Containers[0].VolumeMounts[0].ReadOnly) - assert.Equal(t, &hostPathSocket, pod.Spec.Volumes[0].HostPath.Type) + assert.Equal(t, &hostPathSocket, pod.Spec.Volumes[1].HostPath.Type) } var propagateMaxDuration = ` @@ -1792,3 +1800,99 @@ func TestPodExists(t *testing.T) { assert.True(t, doesExist) assert.EqualValues(t, pod, existingPod) } + +func TestProgressEnvVars(t *testing.T) { + setup := func(t *testing.T, options ...interface{}) (context.CancelFunc, *apiv1.Pod) { + cancel, controller := newController(options...) 
+ + wf := wfv1.MustUnmarshalWorkflow(helloWorldWf) + ctx := context.Background() + woc := newWorkflowOperationCtx(wf, controller) + err := woc.setExecWorkflow(ctx) + require.NoError(t, err) + mainCtr := woc.execWf.Spec.Templates[0].Container + pod, err := woc.createWorkflowPod(ctx, wf.Name, []apiv1.Container{*mainCtr}, &wf.Spec.Templates[0], &createWorkflowPodOpts{}) + require.NoError(t, err) + assert.NotNil(t, pod) + return cancel, pod + } + + t.Run("default settings use self reporting progress with defaults", func(t *testing.T) { + cancel, pod := setup(t) + defer cancel() + + assert.Contains(t, pod.Spec.Containers[0].Env, apiv1.EnvVar{ + Name: common.EnvVarProgressFile, + Value: common.ArgoProgressPath, + }) + assert.Contains(t, pod.Spec.Containers[0].Env, apiv1.EnvVar{ + Name: common.EnvVarProgressPatchTickDuration, + Value: "1m0s", + }) + assert.Contains(t, pod.Spec.Containers[0].Env, apiv1.EnvVar{ + Name: common.EnvVarProgressFileTickDuration, + Value: "3s", + }) + }) + + t.Run("setting patch tick duration to 0 disables self reporting progress but still exposes the ARGO_PROGRESS_FILE env var as a convenience.", func(t *testing.T) { + cancel, pod := setup(t, func(workflowController *WorkflowController) { + workflowController.progressPatchTickDuration = 0 + }) + defer cancel() + + assert.Contains(t, pod.Spec.Containers[0].Env, apiv1.EnvVar{ + Name: common.EnvVarProgressFile, + Value: common.ArgoProgressPath, + }) + assert.NotContains(t, pod.Spec.Containers[0].Env, apiv1.EnvVar{ + Name: common.EnvVarProgressPatchTickDuration, + Value: "1m0s", + }) + assert.NotContains(t, pod.Spec.Containers[0].Env, apiv1.EnvVar{ + Name: common.EnvVarProgressFileTickDuration, + Value: "3s", + }) + }) + + t.Run("setting read file tick duration to 0 disables self reporting progress but still exposes the ARGO_PROGRESS_FILE env var as a convenience.", func(t *testing.T) { + cancel, pod := setup(t, func(workflowController *WorkflowController) { + workflowController.progressFileTickDuration = 0 + }) + defer cancel() + + assert.Contains(t, pod.Spec.Containers[0].Env, apiv1.EnvVar{ + Name: common.EnvVarProgressFile, + Value: common.ArgoProgressPath, + }) + assert.NotContains(t, pod.Spec.Containers[0].Env, apiv1.EnvVar{ + Name: common.EnvVarProgressPatchTickDuration, + Value: "1m0s", + }) + assert.NotContains(t, pod.Spec.Containers[0].Env, apiv1.EnvVar{ + Name: common.EnvVarProgressFileTickDuration, + Value: "3s", + }) + }) + + t.Run("tick durations are configurable", func(t *testing.T) { + cancel, pod := setup(t, func(workflowController *WorkflowController) { + workflowController.progressPatchTickDuration = 30 * time.Second + workflowController.progressFileTickDuration = 1 * time.Second + }) + defer cancel() + + assert.Contains(t, pod.Spec.Containers[0].Env, apiv1.EnvVar{ + Name: common.EnvVarProgressFile, + Value: common.ArgoProgressPath, + }) + assert.Contains(t, pod.Spec.Containers[0].Env, apiv1.EnvVar{ + Name: common.EnvVarProgressPatchTickDuration, + Value: "30s", + }) + assert.Contains(t, pod.Spec.Containers[0].Env, apiv1.EnvVar{ + Name: common.EnvVarProgressFileTickDuration, + Value: "1s", + }) + }) +} diff --git a/workflow/executor/executor.go b/workflow/executor/executor.go index fda941bb7d0a..a0fe077e8f97 100644 --- a/workflow/executor/executor.go +++ b/workflow/executor/executor.go @@ -7,8 +7,10 @@ import ( "compress/gzip" "context" "encoding/json" + "errors" "fmt" "io" + "io/fs" "io/ioutil" "os" "os/signal" @@ -27,7 +29,7 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" - 
"github.com/argoproj/argo-workflows/v3/errors" + argoerrs "github.com/argoproj/argo-workflows/v3/errors" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo-workflows/v3/util" "github.com/argoproj/argo-workflows/v3/util/archive" @@ -77,6 +79,12 @@ type WorkflowExecutor struct { // list of errors that occurred during execution. // the first of these is used as the overall message of the node errors []error + + // current progress which is synced every `annotationPatchTickDuration` to the pods annotations. + progress string + + annotationPatchTickDuration time.Duration + readProgressFileTickDuration time.Duration } type Initializer interface { @@ -108,19 +116,21 @@ type ContainerRuntimeExecutor interface { } // NewExecutor instantiates a new workflow executor -func NewExecutor(clientset kubernetes.Interface, restClient rest.Interface, podName, namespace string, cre ContainerRuntimeExecutor, template wfv1.Template, includeScriptOutput bool, deadline time.Time) WorkflowExecutor { +func NewExecutor(clientset kubernetes.Interface, restClient rest.Interface, podName, namespace string, cre ContainerRuntimeExecutor, template wfv1.Template, includeScriptOutput bool, deadline time.Time, annotationPatchTickDuration, readProgressFileTickDuration time.Duration) WorkflowExecutor { return WorkflowExecutor{ - PodName: podName, - ClientSet: clientset, - RESTClient: restClient, - Namespace: namespace, - RuntimeExecutor: cre, - Template: template, - IncludeScriptOutput: includeScriptOutput, - Deadline: deadline, - memoizedConfigMaps: map[string]string{}, - memoizedSecrets: map[string][]byte{}, - errors: []error{}, + PodName: podName, + ClientSet: clientset, + RESTClient: restClient, + Namespace: namespace, + RuntimeExecutor: cre, + Template: template, + IncludeScriptOutput: includeScriptOutput, + Deadline: deadline, + memoizedConfigMaps: map[string]string{}, + memoizedSecrets: map[string][]byte{}, + errors: []error{}, + annotationPatchTickDuration: annotationPatchTickDuration, + readProgressFileTickDuration: readProgressFileTickDuration, } } @@ -148,7 +158,7 @@ func (we *WorkflowExecutor) LoadArtifacts(ctx context.Context) error { log.Warnf("Ignoring optional artifact '%s' which was not supplied", art.Name) continue } else { - return errors.Errorf(errors.CodeNotFound, "required artifact '%s' not supplied", art.Name) + return argoerrs.Errorf(argoerrs.CodeNotFound, "required artifact '%s' not supplied", art.Name) } } driverArt, err := we.newDriverArt(&art) @@ -161,7 +171,7 @@ func (we *WorkflowExecutor) LoadArtifacts(ctx context.Context) error { } // Determine the file path of where to load the artifact if art.Path == "" { - return errors.InternalErrorf("Artifact %s did not specify a path", art.Name) + return argoerrs.InternalErrorf("Artifact %s did not specify a path", art.Name) } var artPath string mnt := common.FindOverlappingVolume(&we.Template, art.Path) @@ -183,7 +193,7 @@ func (we *WorkflowExecutor) LoadArtifacts(ctx context.Context) error { tempArtPath := artPath + ".tmp" err = artDriver.Load(driverArt, tempArtPath) if err != nil { - if art.Optional && errors.IsCode(errors.CodeNotFound, err) { + if art.Optional && argoerrs.IsCode(argoerrs.CodeNotFound, err) { log.Infof("Skipping optional input artifact that was not found: %s", art.Name) continue } @@ -253,7 +263,7 @@ func (we *WorkflowExecutor) StageFiles() error { } err := ioutil.WriteFile(filePath, body, 0o644) if err != nil { - return errors.InternalWrapError(err) + return argoerrs.InternalWrapError(err) 
} return nil } @@ -267,7 +277,7 @@ func (we *WorkflowExecutor) SaveArtifacts(ctx context.Context) error { log.Infof("Saving output artifacts") err := os.MkdirAll(tempOutArtDir, os.ModePerm) if err != nil { - return errors.InternalWrapError(err) + return argoerrs.InternalWrapError(err) } for i, art := range we.Template.Outputs.Artifacts { @@ -283,11 +293,11 @@ func (we *WorkflowExecutor) SaveArtifacts(ctx context.Context) error { func (we *WorkflowExecutor) saveArtifact(ctx context.Context, containerName string, art *wfv1.Artifact) error { // Determine the file path of where to find the artifact if art.Path == "" { - return errors.InternalErrorf("Artifact %s did not specify a path", art.Name) + return argoerrs.InternalErrorf("Artifact %s did not specify a path", art.Name) } fileName, localArtPath, err := we.stageArchiveFile(containerName, art) if err != nil { - if art.Optional && errors.IsCode(errors.CodeNotFound, err) { + if art.Optional && argoerrs.IsCode(argoerrs.CodeNotFound, err) { log.Warnf("Ignoring optional artifact '%s' which does not exist in path '%s': %v", art.Name, art.Path, err) return nil } @@ -374,7 +384,7 @@ func (we *WorkflowExecutor) stageArchiveFile(containerName string, art *wfv1.Art fileName := filepath.Base(art.Path) log.Infof("No compression strategy needed. Staging skipped") if !argofile.Exists(mountedArtPath) { - return "", "", errors.Errorf(errors.CodeNotFound, "%s no such file or directory", art.Path) + return "", "", argoerrs.Errorf(argoerrs.CodeNotFound, "%s no such file or directory", art.Path) } return fileName, mountedArtPath, nil } @@ -382,7 +392,7 @@ func (we *WorkflowExecutor) stageArchiveFile(containerName string, art *wfv1.Art localArtPath := filepath.Join(tempOutArtDir, fileName) f, err := os.Create(localArtPath) if err != nil { - return "", "", errors.InternalWrapError(err) + return "", "", argoerrs.InternalWrapError(err) } w := bufio.NewWriter(f) err = archive.TarGzToWriter(mountedArtPath, compressionLevel, w) @@ -415,11 +425,11 @@ func (we *WorkflowExecutor) stageArchiveFile(containerName string, art *wfv1.Art // Delete the tarball err = os.Remove(localArtPath) if err != nil { - return "", "", errors.InternalWrapError(err) + return "", "", argoerrs.InternalWrapError(err) } isDir, err := argofile.IsDirectory(unarchivedArtPath) if err != nil { - return "", "", errors.InternalWrapError(err) + return "", "", argoerrs.InternalWrapError(err) } fileName = filepath.Base(art.Path) if isDir { @@ -431,7 +441,7 @@ func (we *WorkflowExecutor) stageArchiveFile(containerName string, art *wfv1.Art localArtPath = path.Join(tempOutArtDir, fileName) err = os.Rename(unarchivedArtPath, localArtPath) if err != nil { - return "", "", errors.InternalWrapError(err) + return "", "", argoerrs.InternalWrapError(err) } } // In the future, if we were to support other compression formats (e.g. 
bzip2) or options @@ -531,7 +541,7 @@ func (we *WorkflowExecutor) SaveLogs(ctx context.Context) (*wfv1.Artifact, error tempLogsDir := "/tmp/argo/outputs/logs" err := os.MkdirAll(tempLogsDir, os.ModePerm) if err != nil { - return nil, errors.InternalWrapError(err) + return nil, argoerrs.InternalWrapError(err) } fileName := "main.log" mainLog := path.Join(tempLogsDir, fileName) @@ -560,7 +570,7 @@ func (we *WorkflowExecutor) GetSecret(ctx context.Context, accessKeyName string, func (we *WorkflowExecutor) saveLogToFile(ctx context.Context, containerName, path string) error { outFile, err := os.Create(path) if err != nil { - return errors.InternalWrapError(err) + return argoerrs.InternalWrapError(err) } defer func() { _ = outFile.Close() }() reader, err := we.RuntimeExecutor.GetOutputStream(ctx, containerName, true) @@ -570,7 +580,7 @@ func (we *WorkflowExecutor) saveLogToFile(ctx context.Context, containerName, pa defer func() { _ = reader.Close() }() _, err = io.Copy(outFile, reader) if err != nil { - return errors.InternalWrapError(err) + return argoerrs.InternalWrapError(err) } return nil } @@ -585,7 +595,7 @@ func (we *WorkflowExecutor) newDriverArt(art *wfv1.Artifact) (*wfv1.Artifact, er func (we *WorkflowExecutor) InitDriver(ctx context.Context, art *wfv1.Artifact) (artifactcommon.ArtifactDriver, error) { driver, err := artifact.NewDriver(ctx, art, we) if err == artifact.ErrUnsupportedDriver { - return nil, errors.Errorf(errors.CodeBadRequest, "Unsupported artifact driver for %s", art.Name) + return nil, argoerrs.Errorf(argoerrs.CodeBadRequest, "Unsupported artifact driver for %s", art.Name) } return driver, err } @@ -600,7 +610,7 @@ func (we *WorkflowExecutor) getPod(ctx context.Context) (*apiv1.Pod, error) { return !errorsutil.IsTransientErr(err), err }) if err != nil { - return nil, errors.InternalWrapError(err) + return nil, argoerrs.InternalWrapError(err) } return pod, nil } @@ -620,7 +630,7 @@ func (we *WorkflowExecutor) GetConfigMapKey(ctx context.Context, name, key strin return !errorsutil.IsTransientErr(err), err }) if err != nil { - return "", errors.InternalWrapError(err) + return "", argoerrs.InternalWrapError(err) } // memoize all keys in the configmap since it's highly likely we will need to get a // subsequent key in the configmap (e.g. username + password) and we can save an API call @@ -629,7 +639,7 @@ func (we *WorkflowExecutor) GetConfigMapKey(ctx context.Context, name, key strin } val, ok := we.memoizedConfigMaps[cachedKey] if !ok { - return "", errors.Errorf(errors.CodeBadRequest, "configmap '%s' does not have the key '%s'", name, key) + return "", argoerrs.Errorf(argoerrs.CodeBadRequest, "configmap '%s' does not have the key '%s'", name, key) } return val, nil } @@ -648,7 +658,7 @@ func (we *WorkflowExecutor) GetSecrets(ctx context.Context, namespace, name, key return !errorsutil.IsTransientErr(err), err }) if err != nil { - return []byte{}, errors.InternalWrapError(err) + return []byte{}, argoerrs.InternalWrapError(err) } // memoize all keys in the secret since it's highly likely we will need to get a // subsequent key in the secret (e.g. 
username + password) and we can save an API call @@ -657,7 +667,7 @@ func (we *WorkflowExecutor) GetSecrets(ctx context.Context, namespace, name, key } val, ok := we.memoizedSecrets[cachedKey] if !ok { - return []byte{}, errors.Errorf(errors.CodeBadRequest, "secret '%s' does not have the key '%s'", name, key) + return []byte{}, argoerrs.Errorf(argoerrs.CodeBadRequest, "secret '%s' does not have the key '%s'", name, key) } return val, nil } @@ -690,7 +700,7 @@ func (we *WorkflowExecutor) CaptureScriptResult(ctx context.Context) error { defer func() { _ = reader.Close() }() bytes, err := ioutil.ReadAll(reader) if err != nil { - return errors.InternalWrapError(err) + return argoerrs.InternalWrapError(err) } out := string(bytes) // Trims off a single newline for user convenience @@ -723,7 +733,7 @@ func (we *WorkflowExecutor) AnnotateOutputs(ctx context.Context, logArt *wfv1.Ar log.Infof("Annotating pod with output") outputBytes, err := json.Marshal(outputs) if err != nil { - return errors.InternalWrapError(err) + return argoerrs.InternalWrapError(err) } return we.AddAnnotation(ctx, common.AnnotationKeyOutputs, string(outputBytes)) } @@ -860,7 +870,7 @@ func unpack(srcPath string, destPath string, decompressor func(string, string) e tmpDir := destPath + ".tmpdir" err := os.MkdirAll(tmpDir, os.ModePerm) if err != nil { - return errors.InternalWrapError(err) + return argoerrs.InternalWrapError(err) } if decompressor != nil { if err = decompressor(srcPath, tmpDir); err != nil { @@ -871,7 +881,7 @@ func unpack(srcPath string, destPath string, decompressor func(string, string) e // to the destination path. files, err := ioutil.ReadDir(tmpDir) if err != nil { - return errors.InternalWrapError(err) + return argoerrs.InternalWrapError(err) } if len(files) == 1 { // if the tar is comprised of single file or directory, @@ -879,18 +889,18 @@ func unpack(srcPath string, destPath string, decompressor func(string, string) e filePath := path.Join(tmpDir, files[0].Name()) err = os.Rename(filePath, destPath) if err != nil { - return errors.InternalWrapError(err) + return argoerrs.InternalWrapError(err) } err = os.Remove(tmpDir) if err != nil { - return errors.InternalWrapError(err) + return argoerrs.InternalWrapError(err) } } else { // the tar extracted into multiple files. In this case, // just rename the temp directory to the dest path err = os.Rename(tmpDir, destPath) if err != nil { - return errors.InternalWrapError(err) + return argoerrs.InternalWrapError(err) } } return nil @@ -899,7 +909,7 @@ func unpack(srcPath string, destPath string, decompressor func(string, string) e func chmod(artPath string, mode int32, recurse bool) error { err := os.Chmod(artPath, os.FileMode(mode)) if err != nil { - return errors.InternalWrapError(err) + return argoerrs.InternalWrapError(err) } if recurse { @@ -907,7 +917,7 @@ func chmod(artPath string, mode int32, recurse bool) error { return os.Chmod(path, os.FileMode(mode)) }) if err != nil { - return errors.InternalWrapError(err) + return argoerrs.InternalWrapError(err) } } @@ -919,6 +929,13 @@ func chmod(artPath string, mode int32, recurse bool) error { // Upon completion, kills any sidecars after it finishes. 
func (we *WorkflowExecutor) Wait(ctx context.Context) error { containerNames := we.Template.GetMainContainerNames() + // only monitor progress if both tick durations are >0 + if we.annotationPatchTickDuration != 0 && we.readProgressFileTickDuration != 0 { + go we.monitorProgress(ctx, os.Getenv(common.EnvVarProgressFile)) + } else { + log.WithField("annotationPatchTickDuration", we.annotationPatchTickDuration).WithField("readProgressFileTickDuration", we.readProgressFileTickDuration).Info("monitoring progress disabled") + } + go we.monitorDeadline(ctx, containerNames) err := waitutil.Backoff(ExecutorRetry, func() (bool, error) { err := we.RuntimeExecutor.Wait(ctx, containerNames) @@ -931,6 +948,57 @@ func (we *WorkflowExecutor) Wait(ctx context.Context) error { return nil } +// monitorProgress monitors for self-reported progress in the progressFile and patches the pod annotations with the parsed progress. +// +// The function reads the last line of the `progressFile` every `readFileTickDuration`. +// If the line matches `N/M`, will set the progress annotation to the parsed progress value. +// Every `annotationPatchTickDuration` the pod is patched with the updated annotations. This way the controller +// gets notified of new self reported progress. +func (we *WorkflowExecutor) monitorProgress(ctx context.Context, progressFile string) { + annotationPatchTicker := time.NewTicker(we.annotationPatchTickDuration) + defer annotationPatchTicker.Stop() + fileTicker := time.NewTicker(we.readProgressFileTickDuration) + defer fileTicker.Stop() + + lastLine := "" + progressFile = filepath.Clean(progressFile) + + for { + select { + case <-ctx.Done(): + log.WithError(ctx.Err()).Info("stopping progress monitor (context done)") + return + case <-annotationPatchTicker.C: + log.WithField("progress", we.progress).Infof("patching pod progress annotation") + if err := we.AddAnnotation(ctx, common.AnnotationKeyProgress, we.progress); err != nil { + log.WithField("progress", we.progress).WithError(err).Warn("failed to patch progress annotation") + } + case <-fileTicker.C: + data, err := ioutil.ReadFile(progressFile) + if err != nil { + if !errors.Is(err, fs.ErrNotExist) { + log.WithError(err).WithField("file", progressFile).Info("unable to watch file") + } + continue + } + lines := strings.Split(strings.TrimSpace(string(data)), "\n") + mostRecent := strings.TrimSpace(lines[len(lines)-1]) + + if mostRecent == "" || mostRecent == lastLine { + continue + } + lastLine = mostRecent + + if progress, ok := wfv1.ParseProgress(lastLine); ok { + log.WithField("progress", progress).Info() + we.progress = string(progress) + } else { + log.WithField("line", lastLine).Info("unable to parse progress") + } + } + } +} + // monitorDeadline checks to see if we exceeded the deadline for the step and // terminates the main container if we did func (we *WorkflowExecutor) monitorDeadline(ctx context.Context, containerNames []string) { diff --git a/workflow/executor/executor_test.go b/workflow/executor/executor_test.go index eed0194d701c..e0211f57af8e 100644 --- a/workflow/executor/executor_test.go +++ b/workflow/executor/executor_test.go @@ -6,12 +6,16 @@ import ( "io/ioutil" "os" "testing" + "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/fake" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" + "github.com/argoproj/argo-workflows/v3/workflow/common" 
"github.com/argoproj/argo-workflows/v3/workflow/executor/mocks" ) @@ -365,3 +369,90 @@ func TestSaveArtifacts(t *testing.T) { err = we.SaveArtifacts(ctx) assert.Error(t, err) } + +func TestMonitorProgress(t *testing.T) { + deadline, ok := t.Deadline() + if !ok { + deadline = time.Now().Add(30 * time.Second) + } + ctx, cancel := context.WithDeadline(context.Background(), deadline) + defer cancel() + + fakeClientset := fake.NewSimpleClientset(&corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fakePodName, + Namespace: fakeNamespace, + }, + Spec: corev1.PodSpec{}, + Status: corev1.PodStatus{ + ContainerStatuses: []corev1.ContainerStatus{ + { + Name: "main", + State: corev1.ContainerState{ + Running: &corev1.ContainerStateRunning{ + StartedAt: metav1.Now(), + }, + }, + }, + }, + }, + }) + + f, err := os.CreateTemp("", "") + require.NoError(t, err) + defer func() { + name := f.Name() + err := f.Close() + assert.NoError(t, err) + err = os.Remove(name) + assert.NoError(t, err) + }() + annotationPackTickDuration := 5 * time.Second + readProgressFileTickDuration := 1 * time.Second + progressFile := f.Name() + + mockRuntimeExecutor := mocks.ContainerRuntimeExecutor{} + we := NewExecutor(fakeClientset, nil, fakePodName, fakeNamespace, &mockRuntimeExecutor, wfv1.Template{}, false, deadline, annotationPackTickDuration, readProgressFileTickDuration) + + go we.monitorProgress(ctx, progressFile) + + go func(ctx context.Context) { + progress := 0 + maxProgress := 10 + tickDuration := 1 * time.Second + ticker := time.After(tickDuration) + for { + select { + case <-ctx.Done(): + return + case <-ticker: + t.Logf("tick progress=%d", progress) + _, err := fmt.Fprintf(f, "%d/100\n", progress*10) + assert.NoError(t, err) + if progress >= maxProgress { + return + } + progress += 1 + ticker = time.After(tickDuration) + } + } + }(ctx) + + ticker := time.After(annotationPackTickDuration) + for { + select { + case <-ctx.Done(): + t.Error(ctx.Err()) + return + case <-ticker: + pod, err := we.getPod(ctx) + assert.NoError(t, err) + progress, ok := pod.Annotations[common.AnnotationKeyProgress] + if ok && progress == "100/100" { + t.Log("success reaching 100/100 progress") + return + } + ticker = time.After(annotationPackTickDuration) + } + } +} diff --git a/workflow/progress/updater.go b/workflow/progress/updater.go index fda8b4a1f757..a5ed071332f7 100644 --- a/workflow/progress/updater.go +++ b/workflow/progress/updater.go @@ -1,22 +1,42 @@ package progress import ( + apiv1 "k8s.io/api/core/v1" + wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" + "github.com/argoproj/argo-workflows/v3/workflow/common" ) +// PodProgress reads the progress annotation of a pod and ensures it's valid and synced +// with the node status. +func PodProgress(pod *apiv1.Pod, node *wfv1.NodeStatus) wfv1.Progress { + progress := wfv1.Progress("0/1") + if node.Progress.IsValid() { + progress = node.Progress + } + + if annotation, ok := pod.Annotations[common.AnnotationKeyProgress]; ok { + v, ok := wfv1.ParseProgress(annotation) + if ok { + progress = v + } + } + if node.Fulfilled() { + progress = progress.Complete() + } + return progress +} + +// UpdateProgress ensures the workflow's progress is updated with the individual node progress. 
func UpdateProgress(wf *wfv1.Workflow) { wf.Status.Progress = "0/0" - for nodeID, node := range wf.Status.Nodes { + for _, node := range wf.Status.Nodes { if node.Type != wfv1.NodeTypePod { continue } - progress := wfv1.Progress("0/1") - if node.Fulfilled() { - progress = "1/1" + if node.Progress.IsValid() { + wf.Status.Progress = wf.Status.Progress.Add(node.Progress) } - node.Progress = progress - wf.Status.Nodes[nodeID] = node - wf.Status.Progress = wf.Status.Progress.Add(progress) } for nodeID, node := range wf.Status.Nodes { if node.Type == wfv1.NodeTypePod { diff --git a/workflow/progress/updater_test.go b/workflow/progress/updater_test.go index 9cb2e1ef55b3..8273302c34bb 100644 --- a/workflow/progress/updater_test.go +++ b/workflow/progress/updater_test.go @@ -9,20 +9,25 @@ import ( wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" ) -func TestUpdator(t *testing.T) { +func TestUpdater(t *testing.T) { + ns := "my-ns" wf := &wfv1.Workflow{ - ObjectMeta: metav1.ObjectMeta{Namespace: "my-ns"}, + ObjectMeta: metav1.ObjectMeta{Namespace: ns, Name: "wf"}, Status: wfv1.WorkflowStatus{ Nodes: wfv1.Nodes{ - "pod-1": wfv1.NodeStatus{Phase: wfv1.NodeSucceeded, Type: wfv1.NodeTypePod}, - "pod-2": wfv1.NodeStatus{Type: wfv1.NodeTypePod}, - "wf": wfv1.NodeStatus{Children: []string{"pod-1", "pod-2"}}, + "pod-1": wfv1.NodeStatus{Phase: wfv1.NodeSucceeded, Type: wfv1.NodeTypePod, Progress: wfv1.Progress("50/50")}, + "pod-2": wfv1.NodeStatus{Type: wfv1.NodeTypePod, Progress: wfv1.Progress("50/150")}, + "pod-3": wfv1.NodeStatus{Type: wfv1.NodeTypePod, Progress: wfv1.Progress("50/100")}, + "wf": wfv1.NodeStatus{Children: []string{"pod-1", "pod-2", "pod-3"}}, }, }, } + UpdateProgress(wf) - assert.Equal(t, wfv1.Progress("1/1"), wf.Status.Nodes["pod-1"].Progress) - assert.Equal(t, wfv1.Progress("0/1"), wf.Status.Nodes["pod-2"].Progress) - assert.Equal(t, wfv1.Progress("1/2"), wf.Status.Nodes["wf"].Progress) - assert.Equal(t, wfv1.Progress("1/2"), wf.Status.Progress) + + assert.Equal(t, wfv1.Progress("50/50"), wf.Status.Nodes["pod-1"].Progress) + assert.Equal(t, wfv1.Progress("50/150"), wf.Status.Nodes["pod-2"].Progress) + assert.Equal(t, wfv1.Progress("50/100"), wf.Status.Nodes["pod-3"].Progress) + assert.Equal(t, wfv1.Progress("150/300"), wf.Status.Nodes["wf"].Progress) + assert.Equal(t, wfv1.Progress("150/300"), wf.Status.Progress) }
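A minimal sketch of the workload side of the feature introduced above, for illustration only (it is not part of the change set): a main container writes `N/M` lines to the file named by `ARGO_PROGRESS_FILE`, which is the only contract the executor's `monitorProgress` loop relies on. The total of `100` and the 10-second pacing are placeholder values.

```go
package main

import (
	"fmt"
	"os"
	"time"
)

func main() {
	// ARGO_PROGRESS_FILE is injected by the controller (see EnvVarProgressFile above).
	// If it is unset, there is nowhere to report progress to, so just do the work.
	path := os.Getenv("ARGO_PROGRESS_FILE")

	const total = 100
	for done := 0; done <= total; done += 10 {
		if path != "" {
			// Each write replaces the file with a single `N/M` line. The executor polls the
			// last line every ARGO_PROGRESS_FILE_TICK_DURATION and patches the
			// workflows.argoproj.io/progress annotation every ARGO_PROGRESS_PATCH_TICK_DURATION.
			line := fmt.Sprintf("%d/%d\n", done, total)
			if err := os.WriteFile(path, []byte(line), 0o644); err != nil {
				fmt.Fprintln(os.Stderr, "failed to write progress:", err)
			}
		}
		// Stand-in for one unit of real work.
		time.Sleep(10 * time.Second)
	}
}
```

Malformed or partial lines are harmless: `monitorProgress` logs that it is unable to parse the progress and keeps the last valid value, and the controller only copies a pod's progress annotation into the node status when it passes `Progress.IsValid`.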