Skip to content

Commit

Permalink
fix(controller): Report reconciliation errors better (#4877)
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Collins <alex_collins@intuit.com>
Co-authored-by: Simon Behar <simbeh7@gmail.com>
  • Loading branch information
alexec and simster7 committed Jan 21, 2021
1 parent c8215f9 commit f872366
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 20 deletions.
18 changes: 18 additions & 0 deletions util/diff/diff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package diff

import (
"encoding/json"

jsonpatch "github.com/evanphx/json-patch"
log "github.com/sirupsen/logrus"
)

func LogChanges(old, new interface{}) {
if !log.IsLevelEnabled(log.DebugLevel) {
return
}
a, _ := json.Marshal(old)
b, _ := json.Marshal(new)
patch, _ := jsonpatch.CreateMergePatch(a, b)
log.Debugf("Log changes patch: %s", string(patch))
}
3 changes: 2 additions & 1 deletion workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
wfclientset "github.com/argoproj/argo/pkg/client/clientset/versioned"
wfextvv1alpha1 "github.com/argoproj/argo/pkg/client/informers/externalversions/workflow/v1alpha1"
authutil "github.com/argoproj/argo/util/auth"
"github.com/argoproj/argo/util/diff"
errorsutil "github.com/argoproj/argo/util/errors"
"github.com/argoproj/argo/workflow/artifactrepositories"
"github.com/argoproj/argo/workflow/common"
Expand Down Expand Up @@ -894,7 +895,7 @@ func (wfc *WorkflowController) newPodInformer(ctx context.Context) cache.SharedI
}
if !pod.SignificantPodChange(oldPod, newPod) {
log.WithField("key", key).Info("insignificant pod change")
pod.LogChanges(oldPod, newPod)
diff.LogChanges(oldPod, newPod)
return
}
wfc.podQueue.Add(key)
Expand Down
7 changes: 4 additions & 3 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -461,7 +462,7 @@ func (woc *wfOperationCtx) executeDAGTask(ctx context.Context, dagCtx *dagContex
}

// Finally execute the template
node, err = woc.executeTemplate(ctx, taskNodeName, &t, dagCtx.tmplCtx, t.Arguments, &executeTemplateOpts{boundaryID: dagCtx.boundaryID, onExitTemplate: dagCtx.onExitTemplate})
_, err = woc.executeTemplate(ctx, taskNodeName, &t, dagCtx.tmplCtx, t.Arguments, &executeTemplateOpts{boundaryID: dagCtx.boundaryID, onExitTemplate: dagCtx.onExitTemplate})
if err != nil {
switch err {
case ErrDeadlineExceeded:
Expand All @@ -471,8 +472,7 @@ func (woc *wfOperationCtx) executeDAGTask(ctx context.Context, dagCtx *dagContex
_ = woc.markNodePhase(taskNodeName, wfv1.NodeFailed, err.Error())
return
default:
woc.log.Infof("DAG %s deemed errored due to task %s error: %s", node.ID, taskNodeName, err.Error())
_ = woc.markNodePhase(taskNodeName, wfv1.NodeError, fmt.Sprintf("task '%s' errored", taskNodeName))
_ = woc.markNodeError(taskNodeName, fmt.Errorf("task '%s' errored: %v", taskNodeName, err))
return
}
}
Expand Down Expand Up @@ -627,6 +627,7 @@ func (d *dagContext) findLeafTaskNames(tasks []wfv1.DAGTask) []string {
leafTaskNames = append(leafTaskNames, taskName)
}
}
sort.Strings(leafTaskNames) // execute tasks in a predictable order
return leafTaskNames
}

Expand Down
27 changes: 16 additions & 11 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
"github.com/argoproj/argo/util"
"github.com/argoproj/argo/util/diff"
envutil "github.com/argoproj/argo/util/env"
errorsutil "github.com/argoproj/argo/util/errors"
"github.com/argoproj/argo/util/intstr"
Expand Down Expand Up @@ -344,13 +345,17 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {

node, err := woc.executeTemplate(ctx, woc.wf.ObjectMeta.Name, &wfv1.WorkflowStep{Template: woc.execWf.Spec.Entrypoint}, tmplCtx, woc.execWf.Spec.Arguments, &executeTemplateOpts{})
if err != nil {
// the error are handled in the callee so just log it.
msg := "error in entry template execution"
woc.log.WithError(err).Error(msg)
msg = fmt.Sprintf("%s %s: %+v", woc.wf.Name, msg, err)
woc.log.WithError(err).Error("error in entry template execution")
// we wrap this error up to report a clear message
x := fmt.Errorf("error in entry template execution: %w", err)
switch err {
case ErrDeadlineExceeded:
woc.eventRecorder.Event(woc.wf, apiv1.EventTypeWarning, "WorkflowTimedOut", msg)
woc.eventRecorder.Event(woc.wf, apiv1.EventTypeWarning, "WorkflowTimedOut", x.Error())
case ErrParallelismReached:
default:
if !errorsutil.IsTransientErr(err) && !woc.wf.Status.Phase.Completed() && os.Getenv("BUBBLE_ENTRY_TEMPLATE_ERR") != "false" {
woc.markWorkflowError(ctx, x)
}
}
return
}
Expand Down Expand Up @@ -504,6 +509,9 @@ func (woc *wfOperationCtx) persistUpdates(ctx context.Context) {
if !woc.updated {
return
}

diff.LogChanges(woc.orig, woc.wf)

resource.UpdateResourceDurations(woc.wf)
progress.UpdateProgress(woc.wf)
// You MUST not call `persistUpdates` twice.
Expand Down Expand Up @@ -881,10 +889,8 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) error {
}
node := woc.wf.Status.Nodes[pod.ObjectMeta.Name]
if node.Fulfilled() && !node.IsDaemoned() {
if tmpVal, tmpOk := pod.Labels[common.LabelKeyCompleted]; tmpOk {
if tmpVal == "true" {
return
}
if pod.GetLabels()[common.LabelKeyCompleted] == "true" {
return
}
woc.completedPods[pod.ObjectMeta.Name] = true
if woc.shouldPrintPodSpec(node) {
Expand Down Expand Up @@ -2125,8 +2131,7 @@ func (woc *wfOperationCtx) markNodeError(nodeName string, err error) *wfv1.NodeS
// markNodePending is a convenience method to mark a node and set the message from the error
func (woc *wfOperationCtx) markNodePending(nodeName string, err error) *wfv1.NodeStatus {
woc.log.Infof("Mark node %s as Pending, due to: %+v", nodeName, err)
node := woc.wf.GetNodeByName(nodeName)
return woc.markNodePhase(nodeName, wfv1.NodePending, fmt.Sprintf("Pending %s", time.Since(node.StartedAt.Time)))
return woc.markNodePhase(nodeName, wfv1.NodePending, err.Error()) // this error message will not change often
}

// markNodeWaitingForLock is a convenience method to mark that a node is waiting for a lock
Expand Down
2 changes: 1 addition & 1 deletion workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4164,7 +4164,7 @@ func TestValidReferenceMode(t *testing.T) {
woc.wf.Status.StoredWorkflowSpec.Entrypoint = "different"
woc = newWorkflowOperationCtx(woc.wf, controller)
woc.operate(ctx)
assert.Equal(t, wfv1.NodeRunning, woc.wf.Status.Phase)
assert.Equal(t, wfv1.NodeError, woc.wf.Status.Phase)
}

var workflowStatusMetric = `
Expand Down
4 changes: 1 addition & 3 deletions workflow/controller/steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,10 +247,8 @@ func (woc *wfOperationCtx) executeStepGroup(ctx context.Context, stepGroup []wfv
case ErrTimeout:
return woc.markNodePhase(node.Name, wfv1.NodeFailed, fmt.Sprintf("child '%s' timedout", childNodeName))
default:
errMsg := fmt.Sprintf("child '%s' errored", childNodeName)
woc.log.Infof("Step group node %s deemed errored due to child %s error: %s", node.ID, childNodeName, err.Error())
woc.addChildNode(sgNodeName, childNodeName)
return woc.markNodePhase(node.Name, wfv1.NodeError, errMsg)
return woc.markNodeError(node.Name, fmt.Errorf("step group deemed errored due to child %s error: %w", childNodeName, err))
}
}
if childNode != nil {
Expand Down
3 changes: 2 additions & 1 deletion workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ type createWorkflowPodOpts struct {

func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName string, mainCtr apiv1.Container, tmpl *wfv1.Template, opts *createWorkflowPodOpts) (*apiv1.Pod, error) {
nodeID := woc.wf.NodeID(nodeName)
woc.log.Debugf("Creating Pod: %s (%s)", nodeName, nodeID)

// we must check to see if the pod exists rather than just optimistically creating the pod and see if we get
// an `AlreadyExists` error because we won't get that error if there is not enough resources.
Expand Down Expand Up @@ -372,6 +371,8 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin
pod.Spec.ActiveDeadlineSeconds = &newActiveDeadlineSeconds
}

woc.log.Debugf("Creating Pod: %s (%s)", nodeName, nodeID)

created, err := woc.controller.kubeclientset.CoreV1().Pods(woc.wf.ObjectMeta.Namespace).Create(ctx, pod, metav1.CreateOptions{})
if err != nil {
if apierr.IsAlreadyExists(err) {
Expand Down

0 comments on commit f872366

Please sign in to comment.