Skip to content

Commit

Permalink
fix(controller): Various v2.12 fixes. Fixes argoproj#4798, argoproj#4801
Browse files Browse the repository at this point in the history
, argoproj#4806, argoproj#3551

Signed-off-by: Alex Collins <alex_collins@intuit.com>
  • Loading branch information
alexec committed Dec 29, 2020
1 parent 78b0bff commit 996afc8
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 23 deletions.
9 changes: 2 additions & 7 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,12 +353,7 @@ func (wfc *WorkflowController) notifySemaphoreConfigUpdate(cm *apiv1.ConfigMap)
log.Warnf("received object from indexer %s is not an unstructured", indexes.SemaphoreConfigIndexName)
continue
}
wf, err := util.FromUnstructured(un)
if err != nil {
log.Errorf("failed to convert to workflow from unstructured: %v", err)
continue
}
wfc.wfQueue.Add(fmt.Sprintf("%s/%s", wf.Namespace, wf.Name))
wfc.wfQueue.Add(fmt.Sprintf("%s/%s", un.GetNamespace(), un.GetName()))
}
}

Expand Down Expand Up @@ -992,7 +987,7 @@ func (wfc *WorkflowController) syncWorkflowPhaseMetrics() {
if err != nil {
log.WithError(err).Errorf("failed to list workflows by '%s'", phase)
continue
}
}
wfc.metrics.SetWorkflowPhaseGauge(phase, len(objs))
}
}
18 changes: 9 additions & 9 deletions workflow/controller/estimation/estimator_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (f *estimatorFactory) NewEstimator(wf *wfv1.Workflow) (Estimator, error) {
if err != nil {
return defaultEstimator, fmt.Errorf("failed to list workflows by index: %v", err)
}
var newestWf *wfv1.Workflow
var newestUn *unstructured.Unstructured
for _, obj := range objs {
un, ok := obj.(*unstructured.Unstructured)
if !ok {
Expand All @@ -55,16 +55,16 @@ func (f *estimatorFactory) NewEstimator(wf *wfv1.Workflow) (Estimator, error) {
if un.GetLabels()[common.LabelKeyPhase] != string(wfv1.NodeSucceeded) {
continue
}
candidateWf, err := util.FromUnstructured(un)
// we use `creationTimestamp` because it's fast
if newestUn == nil || un.GetCreationTimestamp().After(newestUn.GetCreationTimestamp().Time) {
newestUn = un
}
}
if newestUn != nil {
newestWf, err := util.FromUnstructured(newestUn)
if err != nil {
return defaultEstimator, fmt.Errorf("failed convert unstructured to workflow: %w", err)
}
// we use `startedAt` because that's same as how the archive sorts
if newestWf == nil || candidateWf.Status.StartedAt.Time.After(newestWf.Status.StartedAt.Time) {
newestWf = candidateWf
}
}
if newestWf != nil {
err = f.hydrator.Hydrate(newestWf)
if err != nil {
return defaultEstimator, fmt.Errorf("failed hydrate last workflow: %w", err)
Expand All @@ -78,7 +78,7 @@ func (f *estimatorFactory) NewEstimator(wf *wfv1.Workflow) (Estimator, error) {
}
workflows, err := f.wfArchive.ListWorkflows(wf.Namespace, time.Time{}, time.Time{}, requirements, 1, 0)
if err != nil {
return nil, fmt.Errorf("failed to list archived workflows: %v", err)
return defaultEstimator, fmt.Errorf("failed to list archived workflows: %v", err)
}
if len(workflows) > 0 {
return &estimator{wf, &workflows[0]}, nil
Expand Down
7 changes: 2 additions & 5 deletions workflow/controller/indexes/workflow_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package indexes
import (
"fmt"

"github.com/prometheus/common/log"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -32,13 +31,11 @@ func WorkflowSemaphoreKeysIndexFunc() cache.IndexFunc {
return func(obj interface{}) ([]string, error) {
un, ok := obj.(*unstructured.Unstructured)
if !ok {
log.Warnf("cannot convert obj into unstructured.Unstructured in Indexer %s", SemaphoreConfigIndexName)
return []string{}, nil
return nil, nil
}
wf, err := util.FromUnstructured(un)
if err != nil {
log.Warnf("failed to convert to workflow from unstructured: %v", err)
return []string{}, nil
return nil, nil
}
return wf.GetSemaphoreKeys(), nil
}
Expand Down
9 changes: 9 additions & 0 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,10 +646,19 @@ func (woc *wfOperationCtx) reapplyUpdate(wfClient v1alpha1.WorkflowInterface, no
if err != nil {
return nil, err
}
if currWf.Status.Fulfilled() {
return nil, fmt.Errorf("must never update completed workflows")
}
err = woc.controller.hydrator.Hydrate(currWf)
if err != nil {
return nil, err
}
for _, x := range woc.wf.Status.Nodes {
y, exists := currWf.Status.Nodes[x.ID]
if exists && y.Fulfilled() && x.Phase != y.Phase {
return nil, fmt.Errorf("must never update completed node %s", x.ID)
}
}
currWfBytes, err := json.Marshal(currWf)
if err != nil {
return nil, err
Expand Down
11 changes: 9 additions & 2 deletions workflow/executor/pns/pns.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/argoproj/argo/errors"
"github.com/argoproj/argo/util/archive"
errorsutil "github.com/argoproj/argo/util/errors"
"github.com/argoproj/argo/workflow/common"
execcommon "github.com/argoproj/argo/workflow/executor/common"
argowait "github.com/argoproj/argo/workflow/executor/common/wait"
Expand Down Expand Up @@ -369,13 +370,19 @@ func (p *PNSExecutor) updateCtrIDMap() {
}
}

// backoffOver30s is the retry schedule handed to wait.ExponentialBackoff when
// looking up a terminated container's status. The delay starts at one second
// and doubles on every step, for at most seven steps (1s, 2s, 4s, ... 64s).
// NOTE(review): the name suggests the retries are meant to span more than 30
// seconds in total — confirm against wait.ExponentialBackoff's semantics.
var backoffOver30s = wait.Backoff{
Duration: 1 * time.Second,
Steps: 7,
Factor: 2,
}

func (p *PNSExecutor) GetTerminatedContainerStatus(containerID string) (*corev1.Pod, *corev1.ContainerStatus, error) {
var pod *corev1.Pod
var containerStatus *corev1.ContainerStatus
err := wait.Poll(1*time.Second, 3*time.Second, func() (bool, error) {
err := wait.ExponentialBackoff(backoffOver30s, func() (bool, error) {
podRes, err := p.clientset.CoreV1().Pods(p.namespace).Get(p.podName, metav1.GetOptions{})
if err != nil {
return false, fmt.Errorf("could not get pod: %s", err)
return !errorsutil.IsTransientErr(err), fmt.Errorf("could not get pod: %w", err)
}
for _, containerStatusRes := range podRes.Status.ContainerStatuses {
if execcommon.GetContainerID(&containerStatusRes) != containerID {
Expand Down
12 changes: 12 additions & 0 deletions workflow/executor/pns/pns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,22 @@ package pns

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

// Test_backoffOver30s pins the delay schedule produced by backoffOver30s:
// seven successive steps, each twice as long as the previous one, starting
// at one second.
func Test_backoffOver30s(t *testing.T) {
	// Work on a copy so that stepping through the schedule does not alter
	// the package-level value.
	backoff := backoffOver30s
	for _, seconds := range []int{1, 2, 4, 8, 16, 32, 64} {
		assert.Equal(t, time.Duration(seconds)*time.Second, backoff.Step())
	}
}

func TestPNSExecutor_parseContainerIDFromCgroupLine(t *testing.T) {
testCases := []struct {
line string
Expand Down

0 comments on commit 996afc8

Please sign in to comment.