Skip to content

Commit

Permalink
feat(executor): Move exit code capture to controller. See #5251
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Collins <alex_collins@intuit.com>
  • Loading branch information
alexec committed Mar 4, 2021
1 parent 0a70ab6 commit cc51ed1
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 24 deletions.
131 changes: 107 additions & 24 deletions workflow/executor/pns/pns.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ import (
executil "github.com/argoproj/pkg/exec"
gops "github.com/mitchellh/go-ps"
log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"

"github.com/argoproj/argo-workflows/v3/errors"
"github.com/argoproj/argo-workflows/v3/util/archive"
"github.com/argoproj/argo-workflows/v3/workflow/common"
execcommon "github.com/argoproj/argo-workflows/v3/workflow/executor/common"
"github.com/argoproj/argo-workflows/v3/workflow/executor/k8sapi"
osspecific "github.com/argoproj/argo-workflows/v3/workflow/executor/os-specific"
)
Expand All @@ -30,10 +32,12 @@ type PNSExecutor struct {
podName string
namespace string

// mu for `containers`, and `pidFileHandles`
mu sync.RWMutex

containers map[string]int // container name -> pid
containers map[string]string // container name -> container ID

// ctrIDToPid maps a containerID to a process ID
ctrIDToPid map[string]int

// pidFileHandles holds file handles to all root containers
pidFileHandles map[int]*os.File
Expand All @@ -56,7 +60,8 @@ func NewPNSExecutor(clientset *kubernetes.Clientset, podName, namespace string)
podName: podName,
namespace: namespace,
mu: sync.RWMutex{},
containers: make(map[string]int),
containers: make(map[string]string),
ctrIDToPid: make(map[string]int),
pidFileHandles: make(map[int]*os.File),
thisPID: thisPID,
}, nil
Expand All @@ -77,9 +82,9 @@ func (p *PNSExecutor) GetFileContents(containerName string, sourcePath string) (

// enterChroot enters chroot of the main container
func (p *PNSExecutor) enterChroot(containerName string) error {
pid := p.getContainer(containerName)
if pid == 0 {
return fmt.Errorf("cannot enter chroot for container named %q: no PID known - maybe short running container", containerName)
pid, err := p.getContainerPID(containerName)
if err != nil {
return fmt.Errorf("failed to get container PID: %w", err)
}
if err := p.pidFileHandles[pid].Chdir(); err != nil {
return errors.InternalWrapErrorf(err, "failed to chdir to main filesystem: %v", err)
Expand Down Expand Up @@ -141,18 +146,31 @@ func (p *PNSExecutor) Wait(ctx context.Context, containerNames, sidecarNames []s
}
p.rootFS = rootFS

for i := 0; !p.haveContainers(allContainerNames) && i < 5; i++ { // allow some additional time for polling to get this data
time.Sleep(1 * time.Second)
if !p.haveContainers(allContainerNames) { // allow some additional time for polling to get this data
time.Sleep(3 * time.Second)
}

// we may have never gotten the PIDs at this point because the containers were short running
// so we now assume that any containers we do not have PIDs for have already completed
if !p.haveContainers(containerNames) {
log.Info("container PID still unknown (maybe due to short running main container)")
err := p.K8sAPIExecutor.Until(ctx, func(pod *corev1.Pod) bool {
for _, c := range pod.Status.ContainerStatuses {
p.mu.Lock()
containerID := execcommon.GetContainerID(c.ContainerID)
p.containers[c.Name] = containerID
p.mu.Unlock()
log.Infof("mapped container name %q to container ID %q", c.Name, containerID)
}
return p.haveContainers(allContainerNames)
})
if err != nil {
return err
}
}

for _, containerName := range containerNames {
pid := p.getContainer(containerName)
if pid == 0 {
log.Infof("no container PID found for %q - assuming it was short running", containerName)
continue
pid, err := p.getContainerPID(containerName)
if err != nil {
return err
}
log.Infof("Waiting for %q pid %d to complete", containerName, pid)
for {
Expand Down Expand Up @@ -203,7 +221,7 @@ func (p *PNSExecutor) haveContainers(containerNames []string) bool {
p.mu.RLock()
defer p.mu.RUnlock()
for _, n := range containerNames {
if p.containers[n] == 0 {
if p.ctrIDToPid[p.containers[n]] == 0 {
return false
}
}
Expand All @@ -229,16 +247,16 @@ func (p *PNSExecutor) Kill(ctx context.Context, containerNames []string, termina
}

func (p *PNSExecutor) killContainer(containerName string, terminationGracePeriodDuration time.Duration) error {
pid := p.getContainer(containerName)
if pid == 0 {
log.Warnf("No PID for container named %q. Process assumed to have completed", containerName)
pid, err := p.getContainerPID(containerName)
if err != nil {
log.Warnf("Ignoring kill container failure of %q: %v. Process assumed to have completed", containerName, err)
return nil
}
// On Unix systems, FindProcess always succeeds and returns a Process
// for the given pid, regardless of whether the process exists.
proc, _ := os.FindProcess(pid)
log.Infof("Sending SIGTERM to pid %d", pid)
err := proc.Signal(syscall.SIGTERM)
err = proc.Signal(syscall.SIGTERM)
if err != nil {
log.Warnf("Failed to SIGTERM pid %d: %v", pid, err)
}
Expand All @@ -259,12 +277,20 @@ func (p *PNSExecutor) killContainer(containerName string, terminationGracePeriod
return err
}

// returns the entries associated with the container id. Returns error if it was unable
// getContainerPID returns the pid associated with the container id. Returns error if it was unable
// to be determined because no running root processes exist with that container ID
func (p *PNSExecutor) getContainer(containerName string) int {
func (p *PNSExecutor) getContainerPID(containerName string) (int, error) {
p.mu.RLock()
defer p.mu.RUnlock()
return p.containers[containerName]
containerID, ok := p.containers[containerName]
if !ok {
return 0, fmt.Errorf("container ID not found for container name %q", containerName)
}
pid := p.ctrIDToPid[containerID]
if pid == 0 {
return 0, fmt.Errorf("pid not found for container ID %q", containerID)
}
return pid, nil
}

func containerNameForPID(pid int) (string, error) {
Expand Down Expand Up @@ -305,14 +331,21 @@ func (p *PNSExecutor) secureRootFiles() error {
}
p.pidFileHandles[pid] = fs
log.Infof("secured root for pid %d root: %s", pid, proc.Executable())

containerID, err := parseContainerID(pid)
if err != nil {
return err
}
p.mu.Lock()
defer p.mu.Unlock()
p.ctrIDToPid[containerID] = pid
log.Infof("mapped pid %d to container ID %q", pid, containerID)
containerName, err := containerNameForPID(pid)
if err != nil {
return err
}
p.containers[containerName] = pid
log.Infof("mapped container name %q to pid %d", containerName, pid)
p.containers[containerName] = containerID
log.Infof("mapped container name %q to container ID %q and pid %d", containerName, containerID, pid)
return nil
}()
if err != nil {
Expand All @@ -321,3 +354,53 @@ func (p *PNSExecutor) secureRootFiles() error {
}
return nil
}

// parseContainerID parses the containerID of a pid
func parseContainerID(pid int) (string, error) {
cgroupPath := fmt.Sprintf("/proc/%d/cgroup", pid)
cgroupFile, err := os.OpenFile(cgroupPath, os.O_RDONLY, os.ModePerm)
if err != nil {
return "", errors.InternalWrapError(err)
}
defer func() { _ = cgroupFile.Close() }()
sc := bufio.NewScanner(cgroupFile)
for sc.Scan() {
line := sc.Text()
log.Debugf("pid %d: %s", pid, line)
containerID := parseContainerIDFromCgroupLine(line)
if containerID != "" {
return containerID, nil
}
}
return "", errors.InternalErrorf("Failed to parse container ID from %s", cgroupPath)
}

func parseContainerIDFromCgroupLine(line string) string {
// See https://www.systutorials.com/docs/linux/man/5-proc/ for /proc/XX/cgroup format. e.g.:
// 5:cpuacct,cpu,cpuset:/daemons
parts := strings.Split(line, "/")
if len(parts) > 1 {
if containerID := parts[len(parts)-1]; containerID != "" {
// need to check for empty string because the line may look like: 5:rdma:/

// remove possible ".scope" suffix
containerID := strings.TrimSuffix(containerID, ".scope")

// for compatibility with cri-containerd record format when using systemd cgroup path
// example record in /proc/{pid}/cgroup:
// 9:cpuset:/kubepods-besteffort-pod30556cce_0f92_11eb_b36d_02623cf324c8.slice:cri-containerd:c688c856b21cfb29c1dbf6c14793435e44a1299dfc12add33283239bffed2620
if strings.Contains(containerID, "cri-containerd") {
strList := strings.Split(containerID, ":")
containerID = strList[len(strList)-1]
}

// remove possible "*-" prefix
// e.g. crio-7a92a067289f6197148912be1c15f20f0330c7f3c541473d3b9c4043ca137b42.scope
parts := strings.Split(containerID, "-")
containerID = parts[len(parts)-1]

return containerID
}
}
return ""
}
44 changes: 44 additions & 0 deletions workflow/executor/pns/pns_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package pns

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestPNSExecutor_parseContainerIDFromCgroupLine(t *testing.T) {
testCases := []struct {
line string
expected string
}{
{
line: "",
expected: "",
},
{
line: "5:rdma:/",
expected: "",
},
{
line: "8:cpu,cpuacct:/kubepods/besteffort/pod2fad8aad-dcd0-4fef-b45a-151630b9a4b5/b844ef90162af07ad3fb2961ffb2f528f8bf7c9edb8c45465fd8651fab8a78c1",
expected: "b844ef90162af07ad3fb2961ffb2f528f8bf7c9edb8c45465fd8651fab8a78c1",
},
{
line: "2:cpu,cpuacct:/system.slice/containerd.service/kubepods-burstable-podf61fae9b_d7a7_11ea_bb0c_12a065621d2b.slice:cri-containerd:b6b894a028b7ec8e0f98c57a5f7b9735ad792276c3ce52bc795fcd367d829de9",
expected: "b6b894a028b7ec8e0f98c57a5f7b9735ad792276c3ce52bc795fcd367d829de9",
},
{
line: "8:cpu,cpuacct:/kubepods/besteffort/pod2fad8aad-dcd0-4fef-b45a-151630b9a4b5/crio-7a92a067289f6197148912be1c15f20f0330c7f3c541473d3b9c4043ca137b42.scope",
expected: "7a92a067289f6197148912be1c15f20f0330c7f3c541473d3b9c4043ca137b42",
},
{
line: "2:cpuacct,cpu:/kubepods.slice/kubepods-burstable.slice/kubepods-burstable-pod1cd87fe8_8ea0_11ea_8d51_566f300c000a.slice/docker-6b40fc7f75fe3210621a287412ac056e43554b1026a01625b48ba7d136d8a125.scope",
expected: "6b40fc7f75fe3210621a287412ac056e43554b1026a01625b48ba7d136d8a125",
},
}

for _, testCase := range testCases {
containerID := parseContainerIDFromCgroupLine(testCase.line)
assert.Equal(t, testCase.expected, containerID)
}
}

0 comments on commit cc51ed1

Please sign in to comment.