Skip to content

Commit

Permalink
fix(executor): Enhance PNS executor. Resolves #5251
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Collins <alex_collins@intuit.com>
  • Loading branch information
alexec committed Mar 4, 2021
1 parent 288ab92 commit b0b888f
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 159 deletions.
156 changes: 41 additions & 115 deletions workflow/executor/pns/pns.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,11 @@ import (
executil "github.com/argoproj/pkg/exec"
gops "github.com/mitchellh/go-ps"
log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"

"github.com/argoproj/argo-workflows/v3/errors"
"github.com/argoproj/argo-workflows/v3/util/archive"
"github.com/argoproj/argo-workflows/v3/workflow/common"
execcommon "github.com/argoproj/argo-workflows/v3/workflow/executor/common"
"github.com/argoproj/argo-workflows/v3/workflow/executor/k8sapi"
osspecific "github.com/argoproj/argo-workflows/v3/workflow/executor/os-specific"
)
Expand All @@ -32,12 +30,10 @@ type PNSExecutor struct {
podName string
namespace string

// mu for `containerNameToPID`, and `pidFileHandles`
mu sync.RWMutex

containers map[string]string // container name -> container ID

// ctrIDToPid maps a containerID to a process ID
ctrIDToPid map[string]int
containerNameToPID map[string]int

// pidFileHandles holds file handles to all root containers
pidFileHandles map[int]*os.File
Expand All @@ -56,14 +52,13 @@ func NewPNSExecutor(clientset *kubernetes.Clientset, podName, namespace string)
}
delegate := k8sapi.NewK8sAPIExecutor(clientset, nil, podName, namespace)
return &PNSExecutor{
K8sAPIExecutor: delegate,
podName: podName,
namespace: namespace,
mu: sync.RWMutex{},
containers: make(map[string]string),
ctrIDToPid: make(map[string]int),
pidFileHandles: make(map[int]*os.File),
thisPID: thisPID,
K8sAPIExecutor: delegate,
podName: podName,
namespace: namespace,
mu: sync.RWMutex{},
containerNameToPID: make(map[string]int),
pidFileHandles: make(map[int]*os.File),
thisPID: thisPID,
}, nil
}

Expand All @@ -82,9 +77,9 @@ func (p *PNSExecutor) GetFileContents(containerName string, sourcePath string) (

// enterChroot enters chroot of the main container
func (p *PNSExecutor) enterChroot(containerName string) error {
pid, err := p.getContainerPID(containerName)
if err != nil {
return fmt.Errorf("failed to get container PID: %w", err)
pid := p.getContainerPID(containerName)
if pid == 0 {
return fmt.Errorf("cannot enter chroot for container named %q: no PID known - maybe short running container", containerName)
}
if err := p.pidFileHandles[pid].Chdir(); err != nil {
return errors.InternalWrapErrorf(err, "failed to chdir to main filesystem: %v", err)
Expand Down Expand Up @@ -146,31 +141,27 @@ func (p *PNSExecutor) Wait(ctx context.Context, containerNames, sidecarNames []s
}
p.rootFS = rootFS

if !p.haveContainers(allContainerNames) { // allow some additional time for polling to get this data
time.Sleep(3 * time.Second)
}
/*
What is a "short running container"?:
if !p.haveContainers(containerNames) {
log.Info("container PID still unknown (maybe due to short running main container)")
err := p.K8sAPIExecutor.Until(ctx, func(pod *corev1.Pod) bool {
for _, c := range pod.Status.ContainerStatuses {
p.mu.Lock()
containerID := execcommon.GetContainerID(c.ContainerID)
p.containers[c.Name] = containerID
p.mu.Unlock()
log.Infof("mapped container name %q to container ID %q", c.Name, containerID)
}
return p.haveContainers(allContainerNames)
})
if err != nil {
return err
}
Short answer: any container that exits in <5s
Long answer:
Some containers are short running and we cannot determine their PIDs because they exit too quickly.
This loop allows 5s for `pollRootProcesses` find PIDs, so we define any container that exits <5s as short running
We assume any container that does not appeared within 5s has completed.
*/
for i := 0; !p.haveContainerPIDs(allContainerNames) && i < 5; i++ {
time.Sleep(1 * time.Second)
}

for _, containerName := range containerNames {
pid, err := p.getContainerPID(containerName)
if err != nil {
return err
pid := p.getContainerPID(containerName)
if pid == 0 {
log.Infof("no container PID found for %q - assuming it was short running", containerName)
continue
}
log.Infof("Waiting for %q pid %d to complete", containerName, pid)
for {
Expand Down Expand Up @@ -209,19 +200,19 @@ func (p *PNSExecutor) pollRootProcesses(ctx context.Context, containerNames []st
if err := p.secureRootFiles(); err != nil {
log.WithError(err).Warn("failed to secure root files")
}
if p.haveContainers(containerNames) {
if p.haveContainerPIDs(containerNames) {
return
}
time.Sleep(50 * time.Millisecond)
}
}
}

func (p *PNSExecutor) haveContainers(containerNames []string) bool {
func (p *PNSExecutor) haveContainerPIDs(containerNames []string) bool {
p.mu.RLock()
defer p.mu.RUnlock()
for _, n := range containerNames {
if p.ctrIDToPid[p.containers[n]] == 0 {
if p.containerNameToPID[n] == 0 {
return false
}
}
Expand All @@ -247,16 +238,16 @@ func (p *PNSExecutor) Kill(ctx context.Context, containerNames []string, termina
}

func (p *PNSExecutor) killContainer(containerName string, terminationGracePeriodDuration time.Duration) error {
pid, err := p.getContainerPID(containerName)
if err != nil {
log.Warnf("Ignoring kill container failure of %q: %v. Process assumed to have completed", containerName, err)
pid := p.getContainerPID(containerName)
if pid == 0 {
log.Warnf("No PID for container named %q. Process assumed to have completed", containerName)
return nil
}
// On Unix systems, FindProcess always succeeds and returns a Process
// for the given pid, regardless of whether the process exists.
proc, _ := os.FindProcess(pid)
log.Infof("Sending SIGTERM to pid %d", pid)
err = proc.Signal(syscall.SIGTERM)
err := proc.Signal(syscall.SIGTERM)
if err != nil {
log.Warnf("Failed to SIGTERM pid %d: %v", pid, err)
}
Expand All @@ -277,20 +268,12 @@ func (p *PNSExecutor) killContainer(containerName string, terminationGracePeriod
return err
}

// getContainerPID returns the pid associated with the container id. Returns error if it was unable
// returns the entries associated with the container id. Returns zero if it was unable
// to be determined because no running root processes exist with that container ID
func (p *PNSExecutor) getContainerPID(containerName string) (int, error) {
func (p *PNSExecutor) getContainerPID(containerName string) int {
p.mu.RLock()
defer p.mu.RUnlock()
containerID, ok := p.containers[containerName]
if !ok {
return 0, fmt.Errorf("container ID not found for container name %q", containerName)
}
pid := p.ctrIDToPid[containerID]
if pid == 0 {
return 0, fmt.Errorf("pid not found for container ID %q", containerID)
}
return pid, nil
return p.containerNameToPID[containerName]
}

func containerNameForPID(pid int) (string, error) {
Expand Down Expand Up @@ -331,21 +314,14 @@ func (p *PNSExecutor) secureRootFiles() error {
}
p.pidFileHandles[pid] = fs
log.Infof("secured root for pid %d root: %s", pid, proc.Executable())

containerID, err := parseContainerID(pid)
if err != nil {
return err
}
p.mu.Lock()
defer p.mu.Unlock()
p.ctrIDToPid[containerID] = pid
log.Infof("mapped pid %d to container ID %q", pid, containerID)
containerName, err := containerNameForPID(pid)
if err != nil {
return err
}
p.containers[containerName] = containerID
log.Infof("mapped container name %q to container ID %q and pid %d", containerName, containerID, pid)
p.containerNameToPID[containerName] = pid
log.Infof("mapped container name %q to pid %d", containerName, pid)
return nil
}()
if err != nil {
Expand All @@ -354,53 +330,3 @@ func (p *PNSExecutor) secureRootFiles() error {
}
return nil
}

// parseContainerID parses the containerID of a pid
func parseContainerID(pid int) (string, error) {
cgroupPath := fmt.Sprintf("/proc/%d/cgroup", pid)
cgroupFile, err := os.OpenFile(cgroupPath, os.O_RDONLY, os.ModePerm)
if err != nil {
return "", errors.InternalWrapError(err)
}
defer func() { _ = cgroupFile.Close() }()
sc := bufio.NewScanner(cgroupFile)
for sc.Scan() {
line := sc.Text()
log.Debugf("pid %d: %s", pid, line)
containerID := parseContainerIDFromCgroupLine(line)
if containerID != "" {
return containerID, nil
}
}
return "", errors.InternalErrorf("Failed to parse container ID from %s", cgroupPath)
}

func parseContainerIDFromCgroupLine(line string) string {
// See https://www.systutorials.com/docs/linux/man/5-proc/ for /proc/XX/cgroup format. e.g.:
// 5:cpuacct,cpu,cpuset:/daemons
parts := strings.Split(line, "/")
if len(parts) > 1 {
if containerID := parts[len(parts)-1]; containerID != "" {
// need to check for empty string because the line may look like: 5:rdma:/

// remove possible ".scope" suffix
containerID := strings.TrimSuffix(containerID, ".scope")

// for compatibility with cri-containerd record format when using systemd cgroup path
// example record in /proc/{pid}/cgroup:
// 9:cpuset:/kubepods-besteffort-pod30556cce_0f92_11eb_b36d_02623cf324c8.slice:cri-containerd:c688c856b21cfb29c1dbf6c14793435e44a1299dfc12add33283239bffed2620
if strings.Contains(containerID, "cri-containerd") {
strList := strings.Split(containerID, ":")
containerID = strList[len(strList)-1]
}

// remove possible "*-" prefix
// e.g. crio-7a92a067289f6197148912be1c15f20f0330c7f3c541473d3b9c4043ca137b42.scope
parts := strings.Split(containerID, "-")
containerID = parts[len(parts)-1]

return containerID
}
}
return ""
}
44 changes: 0 additions & 44 deletions workflow/executor/pns/pns_test.go

This file was deleted.

0 comments on commit b0b888f

Please sign in to comment.