From f872366f3b40fc346266e3ae328bdc25eb2082ec Mon Sep 17 00:00:00 2001 From: Alex Collins Date: Wed, 20 Jan 2021 16:37:24 -0800 Subject: [PATCH] fix(controller): Report reconciliation errors better (#4877) Signed-off-by: Alex Collins Co-authored-by: Simon Behar --- util/diff/diff.go | 18 ++++++++++++++++++ workflow/controller/controller.go | 3 ++- workflow/controller/dag.go | 7 ++++--- workflow/controller/operator.go | 27 ++++++++++++++++----------- workflow/controller/operator_test.go | 2 +- workflow/controller/steps.go | 4 +--- workflow/controller/workflowpod.go | 3 ++- 7 files changed, 44 insertions(+), 20 deletions(-) create mode 100644 util/diff/diff.go diff --git a/util/diff/diff.go b/util/diff/diff.go new file mode 100644 index 000000000000..e70bbc3c4810 --- /dev/null +++ b/util/diff/diff.go @@ -0,0 +1,18 @@ +package diff + +import ( + "encoding/json" + + jsonpatch "github.com/evanphx/json-patch" + log "github.com/sirupsen/logrus" +) + +func LogChanges(old, new interface{}) { + if !log.IsLevelEnabled(log.DebugLevel) { + return + } + a, _ := json.Marshal(old) + b, _ := json.Marshal(new) + patch, _ := jsonpatch.CreateMergePatch(a, b) + log.Debugf("Log changes patch: %s", string(patch)) +} diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 67fa259236e7..6c3aff049003 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -40,6 +40,7 @@ import ( wfclientset "github.com/argoproj/argo/pkg/client/clientset/versioned" wfextvv1alpha1 "github.com/argoproj/argo/pkg/client/informers/externalversions/workflow/v1alpha1" authutil "github.com/argoproj/argo/util/auth" + "github.com/argoproj/argo/util/diff" errorsutil "github.com/argoproj/argo/util/errors" "github.com/argoproj/argo/workflow/artifactrepositories" "github.com/argoproj/argo/workflow/common" @@ -894,7 +895,7 @@ func (wfc *WorkflowController) newPodInformer(ctx context.Context) cache.SharedI } if !pod.SignificantPodChange(oldPod, newPod) { log.WithField("key", key).Info("insignificant pod change") - pod.LogChanges(oldPod, newPod) + diff.LogChanges(oldPod, newPod) return } wfc.podQueue.Add(key) diff --git a/workflow/controller/dag.go b/workflow/controller/dag.go index 308ca79c428f..3eed1cc461a2 100644 --- a/workflow/controller/dag.go +++ b/workflow/controller/dag.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "sort" "strings" "time" @@ -461,7 +462,7 @@ func (woc *wfOperationCtx) executeDAGTask(ctx context.Context, dagCtx *dagContex } // Finally execute the template - node, err = woc.executeTemplate(ctx, taskNodeName, &t, dagCtx.tmplCtx, t.Arguments, &executeTemplateOpts{boundaryID: dagCtx.boundaryID, onExitTemplate: dagCtx.onExitTemplate}) + _, err = woc.executeTemplate(ctx, taskNodeName, &t, dagCtx.tmplCtx, t.Arguments, &executeTemplateOpts{boundaryID: dagCtx.boundaryID, onExitTemplate: dagCtx.onExitTemplate}) if err != nil { switch err { case ErrDeadlineExceeded: @@ -471,8 +472,7 @@ func (woc *wfOperationCtx) executeDAGTask(ctx context.Context, dagCtx *dagContex _ = woc.markNodePhase(taskNodeName, wfv1.NodeFailed, err.Error()) return default: - woc.log.Infof("DAG %s deemed errored due to task %s error: %s", node.ID, taskNodeName, err.Error()) - _ = woc.markNodePhase(taskNodeName, wfv1.NodeError, fmt.Sprintf("task '%s' errored", taskNodeName)) + _ = woc.markNodeError(taskNodeName, fmt.Errorf("task '%s' errored: %v", taskNodeName, err)) return } } @@ -627,6 +627,7 @@ func (d *dagContext) findLeafTaskNames(tasks []wfv1.DAGTask) []string { leafTaskNames = append(leafTaskNames, taskName) } } + sort.Strings(leafTaskNames) // execute tasks in a predictable order return leafTaskNames } diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index e0923c9bde59..b4047276e9f5 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -38,6 +38,7 @@ import ( wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1" "github.com/argoproj/argo/util" + "github.com/argoproj/argo/util/diff" envutil "github.com/argoproj/argo/util/env" errorsutil "github.com/argoproj/argo/util/errors" "github.com/argoproj/argo/util/intstr" @@ -344,13 +345,17 @@ func (woc *wfOperationCtx) operate(ctx context.Context) { node, err := woc.executeTemplate(ctx, woc.wf.ObjectMeta.Name, &wfv1.WorkflowStep{Template: woc.execWf.Spec.Entrypoint}, tmplCtx, woc.execWf.Spec.Arguments, &executeTemplateOpts{}) if err != nil { - // the error are handled in the callee so just log it. - msg := "error in entry template execution" - woc.log.WithError(err).Error(msg) - msg = fmt.Sprintf("%s %s: %+v", woc.wf.Name, msg, err) + woc.log.WithError(err).Error("error in entry template execution") + // we wrap this error up to report a clear message + x := fmt.Errorf("error in entry template execution: %w", err) switch err { case ErrDeadlineExceeded: - woc.eventRecorder.Event(woc.wf, apiv1.EventTypeWarning, "WorkflowTimedOut", msg) + woc.eventRecorder.Event(woc.wf, apiv1.EventTypeWarning, "WorkflowTimedOut", x.Error()) + case ErrParallelismReached: + default: + if !errorsutil.IsTransientErr(err) && !woc.wf.Status.Phase.Completed() && os.Getenv("BUBBLE_ENTRY_TEMPLATE_ERR") != "false" { + woc.markWorkflowError(ctx, x) + } } return } @@ -504,6 +509,9 @@ func (woc *wfOperationCtx) persistUpdates(ctx context.Context) { if !woc.updated { return } + + diff.LogChanges(woc.orig, woc.wf) + resource.UpdateResourceDurations(woc.wf) progress.UpdateProgress(woc.wf) // You MUST not call `persistUpdates` twice. @@ -881,10 +889,8 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) error { } node := woc.wf.Status.Nodes[pod.ObjectMeta.Name] if node.Fulfilled() && !node.IsDaemoned() { - if tmpVal, tmpOk := pod.Labels[common.LabelKeyCompleted]; tmpOk { - if tmpVal == "true" { - return - } + if pod.GetLabels()[common.LabelKeyCompleted] == "true" { + return } woc.completedPods[pod.ObjectMeta.Name] = true if woc.shouldPrintPodSpec(node) { @@ -2125,8 +2131,7 @@ func (woc *wfOperationCtx) markNodeError(nodeName string, err error) *wfv1.NodeS // markNodePending is a convenience method to mark a node and set the message from the error func (woc *wfOperationCtx) markNodePending(nodeName string, err error) *wfv1.NodeStatus { woc.log.Infof("Mark node %s as Pending, due to: %+v", nodeName, err) - node := woc.wf.GetNodeByName(nodeName) - return woc.markNodePhase(nodeName, wfv1.NodePending, fmt.Sprintf("Pending %s", time.Since(node.StartedAt.Time))) + return woc.markNodePhase(nodeName, wfv1.NodePending, err.Error()) // this error message will not change often } // markNodeWaitingForLock is a convenience method to mark that a node is waiting for a lock diff --git a/workflow/controller/operator_test.go b/workflow/controller/operator_test.go index 9ad693926c06..f867d6f94f29 100644 --- a/workflow/controller/operator_test.go +++ b/workflow/controller/operator_test.go @@ -4164,7 +4164,7 @@ func TestValidReferenceMode(t *testing.T) { woc.wf.Status.StoredWorkflowSpec.Entrypoint = "different" woc = newWorkflowOperationCtx(woc.wf, controller) woc.operate(ctx) - assert.Equal(t, wfv1.NodeRunning, woc.wf.Status.Phase) + assert.Equal(t, wfv1.NodeError, woc.wf.Status.Phase) } var workflowStatusMetric = ` diff --git a/workflow/controller/steps.go b/workflow/controller/steps.go index 69c0d29bca49..632201372f08 100644 --- a/workflow/controller/steps.go +++ b/workflow/controller/steps.go @@ -247,10 +247,8 @@ func (woc *wfOperationCtx) executeStepGroup(ctx context.Context, stepGroup []wfv case ErrTimeout: return woc.markNodePhase(node.Name, wfv1.NodeFailed, fmt.Sprintf("child '%s' timedout", childNodeName)) default: - errMsg := fmt.Sprintf("child '%s' errored", childNodeName) - woc.log.Infof("Step group node %s deemed errored due to child %s error: %s", node.ID, childNodeName, err.Error()) woc.addChildNode(sgNodeName, childNodeName) - return woc.markNodePhase(node.Name, wfv1.NodeError, errMsg) + return woc.markNodeError(node.Name, fmt.Errorf("step group deemed errored due to child %s error: %w", childNodeName, err)) } } if childNode != nil { diff --git a/workflow/controller/workflowpod.go b/workflow/controller/workflowpod.go index bb6a680c332b..9bbef8012e34 100644 --- a/workflow/controller/workflowpod.go +++ b/workflow/controller/workflowpod.go @@ -130,7 +130,6 @@ type createWorkflowPodOpts struct { func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName string, mainCtr apiv1.Container, tmpl *wfv1.Template, opts *createWorkflowPodOpts) (*apiv1.Pod, error) { nodeID := woc.wf.NodeID(nodeName) - woc.log.Debugf("Creating Pod: %s (%s)", nodeName, nodeID) // we must check to see if the pod exists rather than just optimistically creating the pod and see if we get // an `AlreadyExists` error because we won't get that error if there is not enough resources. @@ -372,6 +371,8 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin pod.Spec.ActiveDeadlineSeconds = &newActiveDeadlineSeconds } + woc.log.Debugf("Creating Pod: %s (%s)", nodeName, nodeID) + created, err := woc.controller.kubeclientset.CoreV1().Pods(woc.wf.ObjectMeta.Namespace).Create(ctx, pod, metav1.CreateOptions{}) if err != nil { if apierr.IsAlreadyExists(err) {