Skip to content

Commit

Permalink
fix(executor): Fix emissary bugs (#5187)
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Collins <alex_collins@intuit.com>
  • Loading branch information
alexec committed Feb 24, 2021
1 parent f5dcd1b commit 23415b2
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 6 deletions.
10 changes: 6 additions & 4 deletions cmd/argoexec/commands/emissary.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/util/archive"
"github.com/argoproj/argo-workflows/v3/workflow/common"
osspecific "github.com/argoproj/argo-workflows/v3/workflow/executor/os-specific"
"github.com/argoproj/argo-workflows/v3/workflow/util/path"
)

Expand Down Expand Up @@ -58,8 +59,8 @@ func NewEmissaryCommand() *cobra.Command {
defer signal.Reset()
go func() {
for s := range signals {
if s != syscall.SIGCHLD {
_ = syscall.Kill(-os.Getpid(), s.(syscall.Signal))
if !osspecific.IsSIGCHLD(s) {
_ = osspecific.Kill(-os.Getpid(), s.(syscall.Signal))
}
}
}()
Expand All @@ -80,7 +81,8 @@ func NewEmissaryCommand() *cobra.Command {

command := exec.Command(name, args...)
command.Env = os.Environ()
command.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
command.SysProcAttr = &syscall.SysProcAttr{}
osspecific.Setpgid(command.SysProcAttr)
command.Stdout = os.Stdout
command.Stderr = os.Stderr

Expand Down Expand Up @@ -112,7 +114,7 @@ func NewEmissaryCommand() *cobra.Command {
_ = os.Remove(varRunArgo + "/ctr/" + containerName + "/signal")
s, _ := strconv.Atoi(string(data))
if s > 0 {
_ = syscall.Kill(command.Process.Pid, syscall.Signal(s))
_ = osspecific.Kill(command.Process.Pid, syscall.Signal(s))
}
time.Sleep(2 * time.Second)
}
Expand Down
10 changes: 9 additions & 1 deletion workflow/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,15 @@ func newController(options ...interface{}) (context.CancelFunc, *WorkflowControl
ctx, cancel := context.WithCancel(context.Background())
kube := fake.NewSimpleClientset()
wfc := &WorkflowController{
Config: config.Config{ExecutorImage: "executor:latest"},
Config: config.Config{
ExecutorImage: "executor:latest",
Images: map[string]config.Image{
"my-image": {
Command: []string{"my-cmd"},
Args: []string{"my-args"},
},
},
},
artifactRepositories: armocks.DummyArtifactRepositories(&config.ArtifactRepository{
S3: &config.S3ArtifactRepository{
S3Bucket: wfv1.S3Bucket{Endpoint: "my-endpoint", Bucket: "my-bucket"},
Expand Down
4 changes: 3 additions & 1 deletion workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,9 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin
if len(c.Command) == 0 {
x := woc.getImage(c.Image)
c.Command = x.Command
c.Args = x.Args
if c.Args == nil { // check nil rather than length, as zero-length is valid args
c.Args = x.Args
}
}
if len(c.Command) == 0 {
return nil, fmt.Errorf("when using the emissary executor you must either explicitly specify the command, or list the image's command in the index: https://argoproj.github.io/argo-workflows/workflow-executors/#emissary")
Expand Down
34 changes: 34 additions & 0 deletions workflow/controller/workflowpod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,40 @@ func TestConditionalArchiveLocation(t *testing.T) {
assert.Nil(t, tmpl.ArchiveLocation)
}

func Test_createWorkflowPod_emissary(t *testing.T) {
t.Run("NoCommand", func(t *testing.T) {
woc := newWoc()
woc.controller.containerRuntimeExecutor = common.ContainerRuntimeExecutorEmissary
_, err := woc.createWorkflowPod(context.Background(), "", apiv1.Container{}, &wfv1.Template{}, &createWorkflowPodOpts{})
assert.Error(t, err)
})
t.Run("CommandNoArgs", func(t *testing.T) {
woc := newWoc()
woc.controller.containerRuntimeExecutor = common.ContainerRuntimeExecutorEmissary
pod, err := woc.createWorkflowPod(context.Background(), "", apiv1.Container{Command: []string{"foo"}}, &wfv1.Template{}, &createWorkflowPodOpts{})
assert.NoError(t, err)
assert.Equal(t, []string{"/var/run/argo/argoexec", "emissary", "--", "foo"}, pod.Spec.Containers[1].Command)
})
t.Run("NoCommandWithImageIndex", func(t *testing.T) {
woc := newWoc()
woc.controller.containerRuntimeExecutor = common.ContainerRuntimeExecutorEmissary
pod, err := woc.createWorkflowPod(context.Background(), "", apiv1.Container{Image: "my-image"}, &wfv1.Template{}, &createWorkflowPodOpts{})
if assert.NoError(t, err) {
assert.Equal(t, []string{"/var/run/argo/argoexec", "emissary", "--", "my-cmd"}, pod.Spec.Containers[1].Command)
assert.Equal(t, []string{"my-args"}, pod.Spec.Containers[1].Args)
}
})
t.Run("NoCommandWithArgsWithImageIndex", func(t *testing.T) {
woc := newWoc()
woc.controller.containerRuntimeExecutor = common.ContainerRuntimeExecutorEmissary
pod, err := woc.createWorkflowPod(context.Background(), "", apiv1.Container{Image: "my-image", Args: []string{"foo"}}, &wfv1.Template{}, &createWorkflowPodOpts{})
if assert.NoError(t, err) {
assert.Equal(t, []string{"/var/run/argo/argoexec", "emissary", "--", "my-cmd"}, pod.Spec.Containers[1].Command)
assert.Equal(t, []string{"foo"}, pod.Spec.Containers[1].Args)
}
})
}

// TestVolumeAndVolumeMounts verifies the ability to carry forward volumes and volumeMounts from workflow.spec
func TestVolumeAndVolumeMounts(t *testing.T) {
volumes := []apiv1.Volume{
Expand Down
10 changes: 10 additions & 0 deletions workflow/executor/os-specific/signal_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,13 @@ import (
func GetOsSignal() os.Signal {
return syscall.SIGUSR2
}

func IsSIGCHLD(s os.Signal) bool { return s == syscall.SIGCHLD }

func Kill(pid int, s syscall.Signal) error {
return syscall.Kill(pid, s)
}

func Setpgid(a *syscall.SysProcAttr) {
a.Setpgid = true
}
10 changes: 10 additions & 0 deletions workflow/executor/os-specific/signal_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,13 @@ import (
func GetOsSignal() os.Signal {
return syscall.SIGUSR2
}

func IsSIGCHLD(s os.Signal) bool { return s == syscall.SIGCHLD }

func Kill(pid int, s syscall.Signal) error {
return syscall.Kill(pid, s)
}

func Setpgid(a *syscall.SysProcAttr) {
a.Setpgid = true
}
19 changes: 19 additions & 0 deletions workflow/executor/os-specific/signal_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,22 @@ import (
func GetOsSignal() os.Signal {
return syscall.SIGINT
}

func IsSIGCHLD(s os.Signal) bool {
return false // this does not exist on windows
}

func Kill(pid int, s syscall.Signal) error {
if pid < 0 {
pid = -pid // // we cannot kill a negative process on windows
}
p, err := os.FindProcess(pid)
if err != nil {
return err
}
return p.Signal(s)
}

func Setpgid(a *syscall.SysProcAttr) {
// this does not exist on windows
}

0 comments on commit 23415b2

Please sign in to comment.