Skip to content

Commit

Permalink
fix(executor): Delegate PNS wait to K8SAPI executor. (#5307)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed Mar 6, 2021
1 parent a5d1acc commit b048875
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 18 deletions.
18 changes: 13 additions & 5 deletions workflow/executor/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,21 @@ func isTerminated(ctx context.Context, c KubernetesClientInterface, containerNam
if err != nil {
return false, err
}
for _, s := range containerStatus {
log.Debugf("%q %v", s.Name, s.State.Terminated)
if s.State.Terminated == nil && slice.ContainsString(containerNames, s.Name) {
return false, nil
return AllTerminated(containerStatus, containerNames), nil
}

// AllTerminated reports whether every container named in containerNames has a
// terminated state recorded in containerStatuses.
//
// A name with no entry in containerStatuses counts as NOT terminated: a
// container's status can be missing just after a pod has started, and that
// absence must not be mistaken for completion.
func AllTerminated(containerStatuses []v1.ContainerStatus, containerNames []string) bool {
	// Index the reported statuses by container name.
	terminated := make(map[string]bool, len(containerStatuses))
	for _, c := range containerStatuses {
		terminated[c.Name] = c.State.Terminated != nil
	}
	for _, n := range containerNames {
		// A missing key reads as false, so a container without a status
		// entry fails the check just like one that is still running.
		if !terminated[n] {
			return false
		}
	}
	return true
}

// TerminatePodWithContainerID invokes the given signal against PID 1 of the container.
Expand Down
9 changes: 2 additions & 7 deletions workflow/executor/k8sapi/k8sapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
restclient "k8s.io/client-go/rest"

"github.com/argoproj/argo-workflows/v3/errors"
"github.com/argoproj/argo-workflows/v3/util/slice"
"github.com/argoproj/argo-workflows/v3/workflow/executor/common"
)

type K8sAPIExecutor struct {
Expand Down Expand Up @@ -58,12 +58,7 @@ func (k *K8sAPIExecutor) GetExitCode(ctx context.Context, containerName string)
// Wait returns the result of k.Until, called with a predicate that reports
// whether every container in containerNames has terminated according to the
// pod's container statuses.
//
// The predicate delegates to common.AllTerminated, which treats a container
// with no status entry as not yet terminated, so a pod whose statuses have not
// been populated is not reported as complete.
//
// sidecarNames is not consulted by this implementation.
func (k *K8sAPIExecutor) Wait(ctx context.Context, containerNames, sidecarNames []string) error {
	return k.Until(ctx, func(pod *corev1.Pod) bool {
		return common.AllTerminated(pod.Status.ContainerStatuses, containerNames)
	})
}

Expand Down
12 changes: 6 additions & 6 deletions workflow/executor/pns/pns.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,14 +158,13 @@ func (p *PNSExecutor) Wait(ctx context.Context, containerNames, sidecarNames []s
time.Sleep(1 * time.Second)
}

if !p.haveContainerPIDs(containerNames) {
log.Info("container PIDs still unknown (maybe short running container, or late starting)")
return p.K8sAPIExecutor.Wait(ctx, containerNames, sidecarNames)
}

OUTER:
for _, containerName := range containerNames {
pid := p.getContainerPID(containerName)
if pid == 0 {
log.Infof("container %q pid unknown - maybe short running, or late starting container", containerName)
continue
}
log.Infof("Waiting for %q pid %d to complete", containerName, pid)
for {
select {
Expand All @@ -184,7 +183,8 @@ OUTER:
}
}
}
return nil

return p.K8sAPIExecutor.Wait(ctx, containerNames, sidecarNames)
}

// pollRootProcesses will poll /proc for root pids (pids without parents) in a tight loop, for the
Expand Down

0 comments on commit b048875

Please sign in to comment.