From 1b8b7543d2883d0f820901bbff8ebd5705d80fb0 Mon Sep 17 00:00:00 2001 From: Danny Randall <10566468+dannyrandall@users.noreply.github.com> Date: Wed, 8 Nov 2023 12:36:43 -0800 Subject: [PATCH 1/6] remove iptables-save --- .../pkg/docker/orchestrator/orchestrator.go | 5 ---- .../docker/orchestrator/orchestrator_test.go | 29 ------------------- 2 files changed, 34 deletions(-) diff --git a/internal/pkg/docker/orchestrator/orchestrator.go b/internal/pkg/docker/orchestrator/orchestrator.go index 155bc57cce6..de88f211cc8 100644 --- a/internal/pkg/docker/orchestrator/orchestrator.go +++ b/internal/pkg/docker/orchestrator/orchestrator.go @@ -283,11 +283,6 @@ func (o *Orchestrator) setupProxyConnections(ctx context.Context, pauseContainer return fmt.Errorf("modify iptables: %w", err) } - err = o.docker.Exec(ctx, pauseContainer, io.Discard, "iptables-save") - if err != nil { - return fmt.Errorf("save iptables: %w", err) - } - err = o.docker.Exec(ctx, pauseContainer, io.Discard, "/bin/bash", "-c", fmt.Sprintf(`echo %s %s >> /etc/hosts`, ip.String(), host.Name)) if err != nil { diff --git a/internal/pkg/docker/orchestrator/orchestrator_test.go b/internal/pkg/docker/orchestrator/orchestrator_test.go index 6f3a82f717e..4e117a8801a 100644 --- a/internal/pkg/docker/orchestrator/orchestrator_test.go +++ b/internal/pkg/docker/orchestrator/orchestrator_test.go @@ -264,35 +264,6 @@ func TestOrchestrator(t *testing.T) { stopAfterNErrs: 1, errs: []string{`setup proxy connections: modify iptables: some error`}, }, - "proxy setup, ip tables save error": { - logOptions: noLogs, - test: func(t *testing.T) (test, DockerEngine) { - de := &dockerenginetest.Double{ - IsContainerRunningFn: func(ctx context.Context, name string) (bool, error) { - return true, nil - }, - ExecFn: func(ctx context.Context, ctr string, w io.Writer, cmd string, args ...string) error { - if cmd == "aws" { - fmt.Fprintf(w, "Port 61972 opened for sessionId mySessionId\n") - } else if cmd == "iptables-save" { - return 
errors.New("some error") - } - return nil - }, - } - return func(t *testing.T, o *Orchestrator) { - _, ipNet, err := net.ParseCIDR("172.20.0.0/16") - require.NoError(t, err) - - o.RunTask(Task{}, RunTaskWithProxy("ecs:cluster_task_ctr", *ipNet, Host{ - Name: "remote-foo", - Port: "80", - })) - }, de - }, - stopAfterNErrs: 1, - errs: []string{`setup proxy connections: save iptables: some error`}, - }, "proxy setup, /etc/hosts error": { logOptions: noLogs, test: func(t *testing.T) (test, DockerEngine) { From f1614f7b37b528363a1b7502daad7728d67b5b1d Mon Sep 17 00:00:00 2001 From: Danny Randall <10566468+dannyrandall@users.noreply.github.com> Date: Wed, 8 Nov 2023 12:54:51 -0800 Subject: [PATCH 2/6] support arm arch session manager plugin --- internal/pkg/docker/orchestrator/Pause-Dockerfile | 5 ++++- internal/pkg/docker/orchestrator/orchestrator.go | 10 ++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/internal/pkg/docker/orchestrator/Pause-Dockerfile b/internal/pkg/docker/orchestrator/Pause-Dockerfile index 0d64cd66ae2..83c31c9ed32 100644 --- a/internal/pkg/docker/orchestrator/Pause-Dockerfile +++ b/internal/pkg/docker/orchestrator/Pause-Dockerfile @@ -1,4 +1,7 @@ FROM public.ecr.aws/amazonlinux/amazonlinux:2023 RUN dnf install -y aws-cli iptables -RUN dnf install -y https://s3.amazonaws.com/session-manager-downloads/plugin/latest/linux_64bit/session-manager-plugin.rpm \ No newline at end of file + +ARG ARCH=64bit +# url from https://docs.aws.amazon.com/systems-manager/latest/userguide/install-plugin-linux.html +RUN dnf install -y https://s3.amazonaws.com/session-manager-downloads/plugin/latest/linux_${ARCH}/session-manager-plugin.rpm \ No newline at end of file diff --git a/internal/pkg/docker/orchestrator/orchestrator.go b/internal/pkg/docker/orchestrator/orchestrator.go index de88f211cc8..3bc708583cb 100644 --- a/internal/pkg/docker/orchestrator/orchestrator.go +++ b/internal/pkg/docker/orchestrator/orchestrator.go @@ -12,7 +12,9 @@ import ( 
"maps" "net" "os" + "runtime" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -333,10 +335,18 @@ func ipv4Increment(ip net.IP, network *net.IPNet) (net.IP, error) { } func (o *Orchestrator) buildPauseContainer(ctx context.Context) error { + arch := "64bit" + if strings.Contains(runtime.GOARCH, "arm") { + arch = "arm64" + } + return o.docker.Build(ctx, &dockerengine.BuildArguments{ URI: pauseCtrURI, Tags: []string{pauseCtrTag}, DockerfileContent: pauseDockerfile, + Args: map[string]string{ + "ARCH": arch, + }, }, os.Stderr) } From be24765e6eb31e56758968629c8624c2ef5be5a6 Mon Sep 17 00:00:00 2001 From: Danny Randall <10566468+dannyrandall@users.noreply.github.com> Date: Wed, 8 Nov 2023 13:29:38 -0800 Subject: [PATCH 3/6] print stopping messages --- internal/pkg/docker/orchestrator/orchestrator.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/internal/pkg/docker/orchestrator/orchestrator.go b/internal/pkg/docker/orchestrator/orchestrator.go index 3bc708583cb..389ded5deb9 100644 --- a/internal/pkg/docker/orchestrator/orchestrator.go +++ b/internal/pkg/docker/orchestrator/orchestrator.go @@ -356,6 +356,7 @@ func (o *Orchestrator) buildPauseContainer(ctx context.Context) error { func (o *Orchestrator) Stop() { o.stopOnce.Do(func() { close(o.stopped) + fmt.Printf("\nStopping task...\n") o.actions <- &stopAction{} }) } @@ -373,9 +374,11 @@ func (a *stopAction) Do(o *Orchestrator) error { } // stop pause container + fmt.Printf("Stopping %q\n", "pause") if err := o.docker.Stop(context.Background(), o.containerID("pause")); err != nil { errs = append(errs, fmt.Errorf("stop %q: %w", "pause", err)) } + fmt.Printf("Stopped %q\n", "pause") return errors.Join(errs...) 
} @@ -391,10 +394,12 @@ func (o *Orchestrator) stopTask(ctx context.Context, task Task) error { for name := range task.Containers { name := name go func() { + fmt.Printf("Stopping %q\n", name) if err := o.docker.Stop(ctx, o.containerID(name)); err != nil { errCh <- fmt.Errorf("stop %q: %w", name, err) return } + fmt.Printf("Stopped %q\n", name) errCh <- nil }() } From 5226a809fcdfe62865668e41c03e083832e07fc3 Mon Sep 17 00:00:00 2001 From: Danny Randall <10566468+dannyrandall@users.noreply.github.com> Date: Wed, 8 Nov 2023 15:30:34 -0800 Subject: [PATCH 4/6] report error if a container stops unexpectedly --- .../pkg/docker/orchestrator/orchestrator.go | 22 ++- .../docker/orchestrator/orchestrator_test.go | 183 ++++++++++++++---- 2 files changed, 157 insertions(+), 48 deletions(-) diff --git a/internal/pkg/docker/orchestrator/orchestrator.go b/internal/pkg/docker/orchestrator/orchestrator.go index 389ded5deb9..ba9acfb9a52 100644 --- a/internal/pkg/docker/orchestrator/orchestrator.go +++ b/internal/pkg/docker/orchestrator/orchestrator.go @@ -496,18 +496,22 @@ func (o *Orchestrator) run(taskID int32, opts dockerengine.RunOptions) { o.wg.Add(1) go func() { defer o.wg.Done() + err := o.docker.Run(context.Background(), &opts) - if err := o.docker.Run(context.Background(), &opts); err != nil { - curTaskID := o.curTaskID.Load() - if curTaskID == orchestratorStoppedTaskID { - return - } + // if the orchestrator has already stopped, + // we don't want to report the error + curTaskID := o.curTaskID.Load() + if curTaskID == orchestratorStoppedTaskID { + return + } - // the error is from the pause container - // or from the currently running task - if taskID == pauseCtrTaskID || taskID == curTaskID { - o.runErrs <- fmt.Errorf("run %q: %w", opts.ContainerName, err) + // the error is from the pause container + // or from the currently running task + if taskID == pauseCtrTaskID || taskID == curTaskID { + if err == nil { + err = errors.New("container stopped unexpectedly") } + 
o.runErrs <- fmt.Errorf("run %q: %w", opts.ContainerName, err) } }() } diff --git a/internal/pkg/docker/orchestrator/orchestrator_test.go b/internal/pkg/docker/orchestrator/orchestrator_test.go index 4e117a8801a..521518a09da 100644 --- a/internal/pkg/docker/orchestrator/orchestrator_test.go +++ b/internal/pkg/docker/orchestrator/orchestrator_test.go @@ -37,19 +37,20 @@ func TestOrchestrator(t *testing.T) { type test func(*testing.T, *Orchestrator) tests := map[string]struct { - logOptions logOptionsFunc - test func(t *testing.T) (test, DockerEngine) - stopAfterNErrs int + logOptions logOptionsFunc + test func(t *testing.T) (test, *dockerenginetest.Double) + runUntilStopped bool - errs []string + stopAfterNErrs int + errs []string }{ "stop and start": { - test: func(t *testing.T) (test, DockerEngine) { + test: func(t *testing.T) (test, *dockerenginetest.Double) { return func(t *testing.T, o *Orchestrator) {}, &dockerenginetest.Double{} }, }, "error if fail to build pause container": { - test: func(t *testing.T) (test, DockerEngine) { + test: func(t *testing.T) (test, *dockerenginetest.Double) { de := &dockerenginetest.Double{ BuildFn: func(ctx context.Context, ba *dockerengine.BuildArguments, w io.Writer) error { return errors.New("some error") @@ -64,7 +65,8 @@ func TestOrchestrator(t *testing.T) { }, }, "error if unable to check if pause container is running": { - test: func(t *testing.T) (test, DockerEngine) { + runUntilStopped: true, + test: func(t *testing.T) (test, *dockerenginetest.Double) { de := &dockerenginetest.Double{ IsContainerRunningFn: func(ctx context.Context, name string) (bool, error) { return false, errors.New("some error") @@ -79,8 +81,9 @@ func TestOrchestrator(t *testing.T) { }, }, "error stopping task": { - logOptions: noLogs, - test: func(t *testing.T) (test, DockerEngine) { + logOptions: noLogs, + runUntilStopped: true, + test: func(t *testing.T) (test, *dockerenginetest.Double) { de := &dockerenginetest.Double{ IsContainerRunningFn: 
func(ctx context.Context, name string) (bool, error) { return true, nil @@ -109,8 +112,9 @@ func TestOrchestrator(t *testing.T) { }, }, "error restarting new task due to pause changes": { - logOptions: noLogs, - test: func(t *testing.T) (test, DockerEngine) { + logOptions: noLogs, + runUntilStopped: true, + test: func(t *testing.T) (test, *dockerenginetest.Double) { de := &dockerenginetest.Double{ IsContainerRunningFn: func(ctx context.Context, name string) (bool, error) { return true, nil @@ -142,8 +146,9 @@ func TestOrchestrator(t *testing.T) { }, }, "success with a task": { - logOptions: noLogs, - test: func(t *testing.T) (test, DockerEngine) { + logOptions: noLogs, + runUntilStopped: true, + test: func(t *testing.T) (test, *dockerenginetest.Double) { de := &dockerenginetest.Double{ IsContainerRunningFn: func(ctx context.Context, name string) (bool, error) { return true, nil @@ -161,6 +166,9 @@ func TestOrchestrator(t *testing.T) { } return nil }, + StopFn: func(ctx context.Context, name string) error { + return nil + }, } return func(t *testing.T, o *Orchestrator) { o.RunTask(Task{ @@ -182,11 +190,81 @@ func TestOrchestrator(t *testing.T) { }) }, de }, - errs: []string{}, }, - "proxy setup, connection returns error": { + "container run stops early with error": { + logOptions: noLogs, + test: func(t *testing.T) (test, *dockerenginetest.Double) { + stopPause := make(chan struct{}) + de := &dockerenginetest.Double{ + IsContainerRunningFn: func(ctx context.Context, name string) (bool, error) { + return true, nil + }, + RunFn: func(ctx context.Context, opts *dockerengine.RunOptions) error { + if opts.ContainerName == "prefix-foo" { + return errors.New("some error") + } else { + // block pause container until Stop(pause) + <-stopPause + } + return nil + }, + StopFn: func(ctx context.Context, s string) error { + if s == "prefix-pause" { + stopPause <- struct{}{} + } + return nil + }, + } + return func(t *testing.T, o *Orchestrator) { + o.RunTask(Task{ + Containers: 
map[string]ContainerDefinition{ + "foo": {}, + }, + }) + }, de + }, + stopAfterNErrs: 1, + errs: []string{`run "prefix-foo": some error`}, + }, + "container run stops early with nil error": { logOptions: noLogs, - test: func(t *testing.T) (test, DockerEngine) { + test: func(t *testing.T) (test, *dockerenginetest.Double) { + stopPause := make(chan struct{}) + de := &dockerenginetest.Double{ + IsContainerRunningFn: func(ctx context.Context, name string) (bool, error) { + return true, nil + }, + RunFn: func(ctx context.Context, opts *dockerengine.RunOptions) error { + if opts.ContainerName == "prefix-foo" { + return nil + } else { + // block pause container until Stop(pause) + <-stopPause + } + return nil + }, + StopFn: func(ctx context.Context, s string) error { + if s == "prefix-pause" { + stopPause <- struct{}{} + } + return nil + }, + } + return func(t *testing.T, o *Orchestrator) { + o.RunTask(Task{ + Containers: map[string]ContainerDefinition{ + "foo": {}, + }, + }) + }, de + }, + stopAfterNErrs: 1, + errs: []string{`run "prefix-foo": container stopped unexpectedly`}, + }, + "proxy setup, connection returns error": { + logOptions: noLogs, + runUntilStopped: true, + test: func(t *testing.T) (test, *dockerenginetest.Double) { de := &dockerenginetest.Double{ IsContainerRunningFn: func(ctx context.Context, name string) (bool, error) { return true, nil @@ -212,8 +290,9 @@ func TestOrchestrator(t *testing.T) { errs: []string{`proxy to remote-foo:80: some error`}, }, "proxy setup, ip increment error": { - logOptions: noLogs, - test: func(t *testing.T) (test, DockerEngine) { + logOptions: noLogs, + runUntilStopped: true, + test: func(t *testing.T) (test, *dockerenginetest.Double) { de := &dockerenginetest.Double{ IsContainerRunningFn: func(ctx context.Context, name string) (bool, error) { return true, nil @@ -232,12 +311,12 @@ func TestOrchestrator(t *testing.T) { o.RunTask(Task{}, RunTaskWithProxy("ecs:cluster_task_ctr", *ipNet, generateHosts(3)...)) }, de }, - 
stopAfterNErrs: 1, - errs: []string{`setup proxy connections: increment ip: max ipv4 address`}, + errs: []string{`setup proxy connections: increment ip: max ipv4 address`}, }, "proxy setup, ip tables error": { - logOptions: noLogs, - test: func(t *testing.T) (test, DockerEngine) { + logOptions: noLogs, + runUntilStopped: true, + test: func(t *testing.T) (test, *dockerenginetest.Double) { de := &dockerenginetest.Double{ IsContainerRunningFn: func(ctx context.Context, name string) (bool, error) { return true, nil @@ -261,12 +340,12 @@ func TestOrchestrator(t *testing.T) { })) }, de }, - stopAfterNErrs: 1, - errs: []string{`setup proxy connections: modify iptables: some error`}, + errs: []string{`setup proxy connections: modify iptables: some error`}, }, "proxy setup, /etc/hosts error": { - logOptions: noLogs, - test: func(t *testing.T) (test, DockerEngine) { + logOptions: noLogs, + runUntilStopped: true, + test: func(t *testing.T) (test, *dockerenginetest.Double) { de := &dockerenginetest.Double{ IsContainerRunningFn: func(ctx context.Context, name string) (bool, error) { return true, nil @@ -290,13 +369,12 @@ func TestOrchestrator(t *testing.T) { })) }, de }, - stopAfterNErrs: 1, - errs: []string{`setup proxy connections: update /etc/hosts: some error`}, + errs: []string{`setup proxy connections: update /etc/hosts: some error`}, }, "proxy success": { - logOptions: noLogs, - test: func(t *testing.T) (test, DockerEngine) { - waitUntilRun := make(chan struct{}) + logOptions: noLogs, + runUntilStopped: true, + test: func(t *testing.T) (test, *dockerenginetest.Double) { de := &dockerenginetest.Double{ IsContainerRunningFn: func(ctx context.Context, name string) (bool, error) { return true, nil @@ -307,12 +385,6 @@ func TestOrchestrator(t *testing.T) { } return nil }, - RunFn: func(ctx context.Context, opts *dockerengine.RunOptions) error { - if opts.ContainerName == "prefix-foo" { - close(waitUntilRun) - } - return nil - }, } return func(t *testing.T, o *Orchestrator) { 
_, ipNet, err := net.ParseCIDR("172.20.0.0/16") @@ -326,8 +398,6 @@ func TestOrchestrator(t *testing.T) { Name: "remote-foo", Port: "80", })) - - <-waitUntilRun }, de }, }, @@ -336,6 +406,41 @@ func TestOrchestrator(t *testing.T) { for name, tc := range tests { t.Run(name, func(t *testing.T) { test, dockerEngine := tc.test(t) + + if tc.runUntilStopped { + // make Run(x) not return until Stop(x) is called + stopChs := make(map[string]chan struct{}) + mu := &sync.Mutex{} + getCh := func(name string) chan struct{} { + mu.Lock() + defer mu.Unlock() + + _, ok := stopChs[name] + if !ok { + stopChs[name] = make(chan struct{}) + } + return stopChs[name] + } + + ogRun := dockerEngine.RunFn + dockerEngine.RunFn = func(ctx context.Context, ro *dockerengine.RunOptions) error { + <-getCh(ro.ContainerName) + if ogRun != nil { + return ogRun(ctx, ro) + } + return nil + } + + ogStop := dockerEngine.StopFn + dockerEngine.StopFn = func(ctx context.Context, name string) error { + getCh(name) <- struct{}{} + if ogStop != nil { + return ogStop(ctx, name) + } + return nil + } + } + o := New(dockerEngine, "prefix-", tc.logOptions) wg := &sync.WaitGroup{} From 085b461cdd9858f7b047c310817f6e6fa35643ad Mon Sep 17 00:00:00 2001 From: Danny Randall <10566468+dannyrandall@users.noreply.github.com> Date: Wed, 8 Nov 2023 16:03:39 -0800 Subject: [PATCH 5/6] make pause container get sigint signal --- internal/pkg/docker/dockerengine/dockerengine.go | 5 +++++ internal/pkg/docker/orchestrator/orchestrator.go | 1 + 2 files changed, 6 insertions(+) diff --git a/internal/pkg/docker/dockerengine/dockerengine.go b/internal/pkg/docker/dockerengine/dockerengine.go index 62d4805316c..537679b2bc7 100644 --- a/internal/pkg/docker/dockerengine/dockerengine.go +++ b/internal/pkg/docker/dockerengine/dockerengine.go @@ -87,6 +87,7 @@ type RunOptions struct { ContainerNetwork string // Optional. Network mode for the container. LogOptions RunLogOptions // Optional. 
Configure logging for output from the container AddLinuxCapabilities []string // Optional. Adds linux capabilities to the container. + Init bool } // RunLogOptions holds the logging configuration for Run(). @@ -274,6 +275,10 @@ func (in *RunOptions) generateRunArguments() []string { args = append(args, "--cap-add", cap) } + if in.Init { + args = append(args, "--init") + } + args = append(args, in.ImageURI) if in.Command != nil && len(in.Command) > 0 { diff --git a/internal/pkg/docker/orchestrator/orchestrator.go b/internal/pkg/docker/orchestrator/orchestrator.go index ba9acfb9a52..d1a948a17ca 100644 --- a/internal/pkg/docker/orchestrator/orchestrator.go +++ b/internal/pkg/docker/orchestrator/orchestrator.go @@ -466,6 +466,7 @@ func (o *Orchestrator) pauseRunOptions(t Task) dockerengine.RunOptions { ContainerPorts: make(map[string]string), Secrets: t.PauseSecrets, AddLinuxCapabilities: []string{"NET_ADMIN"}, + Init: true, } for _, ctr := range t.Containers { From 36e72cfdf6ec9c90963df2deba400adbfdb0a229 Mon Sep 17 00:00:00 2001 From: Danny Randall <10566468+dannyrandall@users.noreply.github.com> Date: Wed, 8 Nov 2023 16:21:54 -0800 Subject: [PATCH 6/6] comment --- internal/pkg/docker/dockerengine/dockerengine.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/pkg/docker/dockerengine/dockerengine.go b/internal/pkg/docker/dockerengine/dockerengine.go index 537679b2bc7..4e5f0268e91 100644 --- a/internal/pkg/docker/dockerengine/dockerengine.go +++ b/internal/pkg/docker/dockerengine/dockerengine.go @@ -87,7 +87,7 @@ type RunOptions struct { ContainerNetwork string // Optional. Network mode for the container. LogOptions RunLogOptions // Optional. Configure logging for output from the container AddLinuxCapabilities []string // Optional. Adds linux capabilities to the container. - Init bool + Init bool // Optional. Adds an init process as an entrypoint. } // RunLogOptions holds the logging configuration for Run().