Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Error pending nodes w/o Pods unless resubmitPendingPods is set #2721

Merged
merged 4 commits on Apr 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
22 changes: 20 additions & 2 deletions workflow/controller/operator.go
Expand Up @@ -223,7 +223,6 @@ func (woc *wfOperationCtx) operate() {
if err != nil {
woc.log.Errorf("%s error: %+v", woc.wf.ObjectMeta.Name, err)
woc.auditLogger.LogWorkflowEvent(woc.wf, argo.EventInfo{Type: apiv1.EventTypeWarning, Reason: argo.EventReasonWorkflowTimedOut}, "Workflow timed out")

// TODO: we need to re-add to the workqueue, but should happen in caller
return
}
Expand Down Expand Up @@ -797,11 +796,30 @@ func (woc *wfOperationCtx) podReconciliation() error {
// It is now impossible to infer pod status. The only thing we can do at this point is to mark
// the node with Error.
for nodeID, node := range woc.wf.Status.Nodes {
if node.Type != wfv1.NodeTypePod || node.Completed() || node.StartedAt.IsZero() || node.Pending() {
if node.Type != wfv1.NodeTypePod || node.Completed() || node.StartedAt.IsZero() {
// node is not a pod, it is already complete, or it can be re-run.
continue
}
if _, ok := seenPods[nodeID]; !ok {

// If the node is pending and the pod does not exist, it could be the case that we want to try to submit it
// again instead of marking it as an error. Check if that's the case.
if node.Pending() {
tmplCtx, err := woc.createTemplateContext(node.GetTemplateScope())
if err != nil {
return err
}
_, tmpl, _, err := tmplCtx.ResolveTemplate(&node)
if err != nil {
return err
}

if isResubmitAllowed(tmpl) {
// We want to resubmit. Continue and do not mark as error.
continue
}
}

node.Message = "pod deleted"
node.Phase = wfv1.NodeError
woc.wf.Status.Nodes[nodeID] = node
Expand Down
14 changes: 7 additions & 7 deletions workflow/controller/operator_test.go
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/argoproj/argo/persist/sqldb"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/test"
"github.com/argoproj/argo/util/argo"
"github.com/argoproj/argo/workflow/common"
"github.com/argoproj/argo/workflow/util"
)
Expand Down Expand Up @@ -2411,9 +2412,9 @@ func TestEventInvalidSpec(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 2, len(events.Items))
runningEvent := events.Items[0]
assert.Equal(t, "WorkflowRunning", runningEvent.Reason)
assert.Equal(t, argo.EventReasonWorkflowRunning, runningEvent.Reason)
invalidSpecEvent := events.Items[1]
assert.Equal(t, "WorkflowFailed", invalidSpecEvent.Reason)
assert.Equal(t, argo.EventReasonWorkflowFailed, invalidSpecEvent.Reason)
assert.Equal(t, "invalid spec: template name '123' undefined", invalidSpecEvent.Message)
}

Expand Down Expand Up @@ -2452,10 +2453,9 @@ func TestEventTimeout(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 2, len(events.Items))
runningEvent := events.Items[0]
assert.Equal(t, "WorkflowRunning", runningEvent.Reason)
assert.Equal(t, argo.EventReasonWorkflowRunning, runningEvent.Reason)
timeoutEvent := events.Items[1]
assert.Equal(t, "WorkflowTimedOut", timeoutEvent.Reason)
assert.True(t, strings.HasPrefix(timeoutEvent.Message, "timeout-template error in entry template execution: Deadline exceeded"))
assert.Equal(t, argo.EventReasonWorkflowFailed, timeoutEvent.Reason)
}

var failLoadArtifactRepoCm = `
Expand Down Expand Up @@ -2494,9 +2494,9 @@ func TestEventFailArtifactRepoCm(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 2, len(events.Items))
runningEvent := events.Items[0]
assert.Equal(t, "WorkflowRunning", runningEvent.Reason)
assert.Equal(t, argo.EventReasonWorkflowRunning, runningEvent.Reason)
failEvent := events.Items[1]
assert.Equal(t, "WorkflowFailed", failEvent.Reason)
assert.Equal(t, argo.EventReasonWorkflowFailed, failEvent.Reason)
assert.Equal(t, "Failed to load artifact repository configMap: configmaps \"artifact-repository\" not found", failEvent.Message)
}

Expand Down