diff --git a/workflow/executor/pns/pns.go b/workflow/executor/pns/pns.go index 6fc6ddd6b660..63e3f3450b0e 100644 --- a/workflow/executor/pns/pns.go +++ b/workflow/executor/pns/pns.go @@ -14,13 +14,11 @@ import ( executil "github.com/argoproj/pkg/exec" gops "github.com/mitchellh/go-ps" log "github.com/sirupsen/logrus" - corev1 "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes" "github.com/argoproj/argo-workflows/v3/errors" "github.com/argoproj/argo-workflows/v3/util/archive" "github.com/argoproj/argo-workflows/v3/workflow/common" - execcommon "github.com/argoproj/argo-workflows/v3/workflow/executor/common" "github.com/argoproj/argo-workflows/v3/workflow/executor/k8sapi" osspecific "github.com/argoproj/argo-workflows/v3/workflow/executor/os-specific" ) @@ -32,12 +30,10 @@ type PNSExecutor struct { podName string namespace string + // mu for `containerNameToPID`, and `pidFileHandles` mu sync.RWMutex - containers map[string]string // container name -> container ID - - // ctrIDToPid maps a containerID to a process ID - ctrIDToPid map[string]int + containerNameToPID map[string]int // pidFileHandles holds file handles to all root containers pidFileHandles map[int]*os.File @@ -56,14 +52,13 @@ func NewPNSExecutor(clientset *kubernetes.Clientset, podName, namespace string) } delegate := k8sapi.NewK8sAPIExecutor(clientset, nil, podName, namespace) return &PNSExecutor{ - K8sAPIExecutor: delegate, - podName: podName, - namespace: namespace, - mu: sync.RWMutex{}, - containers: make(map[string]string), - ctrIDToPid: make(map[string]int), - pidFileHandles: make(map[int]*os.File), - thisPID: thisPID, + K8sAPIExecutor: delegate, + podName: podName, + namespace: namespace, + mu: sync.RWMutex{}, + containerNameToPID: make(map[string]int), + pidFileHandles: make(map[int]*os.File), + thisPID: thisPID, }, nil } @@ -82,9 +77,9 @@ func (p *PNSExecutor) GetFileContents(containerName string, sourcePath string) ( // enterChroot enters chroot of the main container func (p *PNSExecutor) 
enterChroot(containerName string) error { - pid, err := p.getContainerPID(containerName) - if err != nil { - return fmt.Errorf("failed to get container PID: %w", err) + pid := p.getContainerPID(containerName) + if pid == 0 { + return fmt.Errorf("cannot enter chroot for container named %q: no PID known - maybe short running container", containerName) } if err := p.pidFileHandles[pid].Chdir(); err != nil { return errors.InternalWrapErrorf(err, "failed to chdir to main filesystem: %v", err) } @@ -146,31 +141,27 @@ func (p *PNSExecutor) Wait(ctx context.Context, containerNames, sidecarNames []s } p.rootFS = rootFS - if !p.haveContainers(allContainerNames) { // allow some additional time for polling to get this data - time.Sleep(3 * time.Second) - } + /* + What is a "short running container"?: - if !p.haveContainers(containerNames) { - log.Info("container PID still unknown (maybe due to short running main container)") - err := p.K8sAPIExecutor.Until(ctx, func(pod *corev1.Pod) bool { - for _, c := range pod.Status.ContainerStatuses { - p.mu.Lock() - containerID := execcommon.GetContainerID(c.ContainerID) - p.containers[c.Name] = containerID - p.mu.Unlock() - log.Infof("mapped container name %q to container ID %q", c.Name, containerID) - } - return p.haveContainers(allContainerNames) - }) - if err != nil { - return err - } + Short answer: any container that exits in <5s + + Long answer: + + Some containers are short running and we cannot determine their PIDs because they exit too quickly. + This loop allows 5s for `pollRootProcesses` to find PIDs, so we define any container that exits in <5s as short running. + + We assume any container that has not appeared within 5s has completed. 
+ */ + for i := 0; !p.haveContainerPIDs(allContainerNames) && i < 5; i++ { + time.Sleep(1 * time.Second) } for _, containerName := range containerNames { - pid, err := p.getContainerPID(containerName) - if err != nil { - return err + pid := p.getContainerPID(containerName) + if pid == 0 { + log.Infof("no container PID found for %q - assuming it was short running", containerName) + continue } log.Infof("Waiting for %q pid %d to complete", containerName, pid) for { @@ -209,7 +200,7 @@ func (p *PNSExecutor) pollRootProcesses(ctx context.Context, containerNames []st if err := p.secureRootFiles(); err != nil { log.WithError(err).Warn("failed to secure root files") } - if p.haveContainers(containerNames) { + if p.haveContainerPIDs(containerNames) { return } time.Sleep(50 * time.Millisecond) @@ -217,11 +208,11 @@ func (p *PNSExecutor) pollRootProcesses(ctx context.Context, containerNames []st } } -func (p *PNSExecutor) haveContainers(containerNames []string) bool { +func (p *PNSExecutor) haveContainerPIDs(containerNames []string) bool { p.mu.RLock() defer p.mu.RUnlock() for _, n := range containerNames { - if p.ctrIDToPid[p.containers[n]] == 0 { + if p.containerNameToPID[n] == 0 { return false } } @@ -247,16 +238,16 @@ func (p *PNSExecutor) Kill(ctx context.Context, containerNames []string, termina } func (p *PNSExecutor) killContainer(containerName string, terminationGracePeriodDuration time.Duration) error { - pid, err := p.getContainerPID(containerName) - if err != nil { - log.Warnf("Ignoring kill container failure of %q: %v. Process assumed to have completed", containerName, err) + pid := p.getContainerPID(containerName) + if pid == 0 { + log.Warnf("No PID for container named %q. Process assumed to have completed", containerName) return nil } // On Unix systems, FindProcess always succeeds and returns a Process // for the given pid, regardless of whether the process exists. 
proc, _ := os.FindProcess(pid) log.Infof("Sending SIGTERM to pid %d", pid) - err = proc.Signal(syscall.SIGTERM) + err := proc.Signal(syscall.SIGTERM) if err != nil { log.Warnf("Failed to SIGTERM pid %d: %v", pid, err) } @@ -277,20 +268,12 @@ func (p *PNSExecutor) killContainer(containerName string, terminationGracePeriod return err } -// getContainerPID returns the pid associated with the container id. Returns error if it was unable +// getContainerPID returns the PID associated with the container name. Returns zero if it was unable // to be determined because no running root processes exist with that container ID -func (p *PNSExecutor) getContainerPID(containerName string) (int, error) { +func (p *PNSExecutor) getContainerPID(containerName string) int { p.mu.RLock() defer p.mu.RUnlock() - containerID, ok := p.containers[containerName] - if !ok { - return 0, fmt.Errorf("container ID not found for container name %q", containerName) - } - pid := p.ctrIDToPid[containerID] - if pid == 0 { - return 0, fmt.Errorf("pid not found for container ID %q", containerID) - } - return pid, nil + return p.containerNameToPID[containerName] } func containerNameForPID(pid int) (string, error) { @@ -331,21 +314,14 @@ func (p *PNSExecutor) secureRootFiles() error { } p.pidFileHandles[pid] = fs log.Infof("secured root for pid %d root: %s", pid, proc.Executable()) - - containerID, err := parseContainerID(pid) - if err != nil { - return err - } p.mu.Lock() defer p.mu.Unlock() - p.ctrIDToPid[containerID] = pid - log.Infof("mapped pid %d to container ID %q", pid, containerID) containerName, err := containerNameForPID(pid) if err != nil { return err } - p.containers[containerName] = containerID - log.Infof("mapped container name %q to container ID %q and pid %d", containerName, containerID, pid) + p.containerNameToPID[containerName] = pid + log.Infof("mapped container name %q to pid %d", containerName, pid) return nil }() if err != nil { @@ -354,53 +330,3 @@ func (p *PNSExecutor) secureRootFiles() error { 
} return nil } - -// parseContainerID parses the containerID of a pid -func parseContainerID(pid int) (string, error) { - cgroupPath := fmt.Sprintf("/proc/%d/cgroup", pid) - cgroupFile, err := os.OpenFile(cgroupPath, os.O_RDONLY, os.ModePerm) - if err != nil { - return "", errors.InternalWrapError(err) - } - defer func() { _ = cgroupFile.Close() }() - sc := bufio.NewScanner(cgroupFile) - for sc.Scan() { - line := sc.Text() - log.Debugf("pid %d: %s", pid, line) - containerID := parseContainerIDFromCgroupLine(line) - if containerID != "" { - return containerID, nil - } - } - return "", errors.InternalErrorf("Failed to parse container ID from %s", cgroupPath) -} - -func parseContainerIDFromCgroupLine(line string) string { - // See https://www.systutorials.com/docs/linux/man/5-proc/ for /proc/XX/cgroup format. e.g.: - // 5:cpuacct,cpu,cpuset:/daemons - parts := strings.Split(line, "/") - if len(parts) > 1 { - if containerID := parts[len(parts)-1]; containerID != "" { - // need to check for empty string because the line may look like: 5:rdma:/ - - // remove possible ".scope" suffix - containerID := strings.TrimSuffix(containerID, ".scope") - - // for compatibility with cri-containerd record format when using systemd cgroup path - // example record in /proc/{pid}/cgroup: - // 9:cpuset:/kubepods-besteffort-pod30556cce_0f92_11eb_b36d_02623cf324c8.slice:cri-containerd:c688c856b21cfb29c1dbf6c14793435e44a1299dfc12add33283239bffed2620 - if strings.Contains(containerID, "cri-containerd") { - strList := strings.Split(containerID, ":") - containerID = strList[len(strList)-1] - } - - // remove possible "*-" prefix - // e.g. 
crio-7a92a067289f6197148912be1c15f20f0330c7f3c541473d3b9c4043ca137b42.scope - parts := strings.Split(containerID, "-") - containerID = parts[len(parts)-1] - - return containerID - } - } - return "" -} diff --git a/workflow/executor/pns/pns_test.go b/workflow/executor/pns/pns_test.go deleted file mode 100644 index b04e1540c2b1..000000000000 --- a/workflow/executor/pns/pns_test.go +++ /dev/null @@ -1,44 +0,0 @@ -package pns - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestPNSExecutor_parseContainerIDFromCgroupLine(t *testing.T) { - testCases := []struct { - line string - expected string - }{ - { - line: "", - expected: "", - }, - { - line: "5:rdma:/", - expected: "", - }, - { - line: "8:cpu,cpuacct:/kubepods/besteffort/pod2fad8aad-dcd0-4fef-b45a-151630b9a4b5/b844ef90162af07ad3fb2961ffb2f528f8bf7c9edb8c45465fd8651fab8a78c1", - expected: "b844ef90162af07ad3fb2961ffb2f528f8bf7c9edb8c45465fd8651fab8a78c1", - }, - { - line: "2:cpu,cpuacct:/system.slice/containerd.service/kubepods-burstable-podf61fae9b_d7a7_11ea_bb0c_12a065621d2b.slice:cri-containerd:b6b894a028b7ec8e0f98c57a5f7b9735ad792276c3ce52bc795fcd367d829de9", - expected: "b6b894a028b7ec8e0f98c57a5f7b9735ad792276c3ce52bc795fcd367d829de9", - }, - { - line: "8:cpu,cpuacct:/kubepods/besteffort/pod2fad8aad-dcd0-4fef-b45a-151630b9a4b5/crio-7a92a067289f6197148912be1c15f20f0330c7f3c541473d3b9c4043ca137b42.scope", - expected: "7a92a067289f6197148912be1c15f20f0330c7f3c541473d3b9c4043ca137b42", - }, - { - line: "2:cpuacct,cpu:/kubepods.slice/kubepods-burstable.slice/kubepods-burstable-pod1cd87fe8_8ea0_11ea_8d51_566f300c000a.slice/docker-6b40fc7f75fe3210621a287412ac056e43554b1026a01625b48ba7d136d8a125.scope", - expected: "6b40fc7f75fe3210621a287412ac056e43554b1026a01625b48ba7d136d8a125", - }, - } - - for _, testCase := range testCases { - containerID := parseContainerIDFromCgroupLine(testCase.line) - assert.Equal(t, testCase.expected, containerID) - } -}