Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions internal/pkg/docker/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,6 @@ func (a *runTaskAction) Do(o *Orchestrator) error {
if err := o.stopTask(ctx, o.curTask); err != nil {
return fmt.Errorf("stop existing task: %w", err)
}

// ensure that containers are fully stopped after o.stopTask finishes blocking
// TODO(Aiden): Implement a container ID system or use `docker ps` to ensure containers are stopped
time.Sleep(1 * time.Second)
}

for name, ctr := range a.task.Containers {
Expand Down Expand Up @@ -399,8 +395,29 @@ func (o *Orchestrator) stopTask(ctx context.Context, task Task) error {
errCh <- fmt.Errorf("stop %q: %w", name, err)
return
}
fmt.Printf("Stopped %q\n", name)
errCh <- nil

// ensure that container is fully stopped before stopTask finishes blocking
for {
running, err := o.docker.IsContainerRunning(ctx, o.containerID(name))
if err != nil {
errCh <- fmt.Errorf("polling container %q for removal: %w", name, err)
return
}

if running {
select {
case <-time.After(1 * time.Second):
continue
case <-ctx.Done():
errCh <- fmt.Errorf("check container %q stopped: %w", name, ctx.Err())
return
}
}

fmt.Printf("Stopped %q\n", name)
errCh <- nil
return
}
}()
}

Expand Down
59 changes: 53 additions & 6 deletions internal/pkg/docker/orchestrator/orchestrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,10 @@ func TestOrchestrator(t *testing.T) {
test: func(t *testing.T) (test, *dockerenginetest.Double) {
de := &dockerenginetest.Double{
IsContainerRunningFn: func(ctx context.Context, name string) (bool, error) {
return true, nil
if name == "prefix-pause" {
return true, nil
}
return false, nil
},
StopFn: func(ctx context.Context, name string) error {
if name == "prefix-success" {
Expand All @@ -111,13 +114,45 @@ func TestOrchestrator(t *testing.T) {
`stop "bar": some error`,
},
},
"error polling tasks removed": {
logOptions: noLogs,
runUntilStopped: true,
test: func(t *testing.T) (test, *dockerenginetest.Double) {
de := &dockerenginetest.Double{
IsContainerRunningFn: func(ctx context.Context, name string) (bool, error) {
if name == "prefix-pause" {
return true, nil
}
return false, errors.New("some error")
},
StopFn: func(ctx context.Context, name string) error {
return nil
},
}
return func(t *testing.T, o *Orchestrator) {
o.RunTask(Task{
Containers: map[string]ContainerDefinition{
"foo": {},
"bar": {},
},
})
}, de
},
errs: []string{
`polling container "foo" for removal: some error`,
`polling container "bar" for removal: some error`,
},
},
"error restarting new task due to pause changes": {
logOptions: noLogs,
runUntilStopped: true,
test: func(t *testing.T) (test, *dockerenginetest.Double) {
de := &dockerenginetest.Double{
IsContainerRunningFn: func(ctx context.Context, name string) (bool, error) {
return true, nil
if name == "prefix-pause" {
return true, nil
}
return false, nil
},
}
return func(t *testing.T, o *Orchestrator) {
Expand Down Expand Up @@ -151,7 +186,10 @@ func TestOrchestrator(t *testing.T) {
test: func(t *testing.T) (test, *dockerenginetest.Double) {
de := &dockerenginetest.Double{
IsContainerRunningFn: func(ctx context.Context, name string) (bool, error) {
return true, nil
if name == "prefix-pause" {
return true, nil
}
return false, nil
},
RunFn: func(ctx context.Context, opts *dockerengine.RunOptions) error {
// validate pause container has correct ports and secrets
Expand Down Expand Up @@ -197,7 +235,10 @@ func TestOrchestrator(t *testing.T) {
stopPause := make(chan struct{})
de := &dockerenginetest.Double{
IsContainerRunningFn: func(ctx context.Context, name string) (bool, error) {
return true, nil
if name == "prefix-pause" {
return true, nil
}
return false, nil
},
RunFn: func(ctx context.Context, opts *dockerengine.RunOptions) error {
if opts.ContainerName == "prefix-foo" {
Expand Down Expand Up @@ -232,7 +273,10 @@ func TestOrchestrator(t *testing.T) {
stopPause := make(chan struct{})
de := &dockerenginetest.Double{
IsContainerRunningFn: func(ctx context.Context, name string) (bool, error) {
return true, nil
if name == "prefix-pause" {
return true, nil
}
return false, nil
},
RunFn: func(ctx context.Context, opts *dockerengine.RunOptions) error {
if opts.ContainerName == "prefix-foo" {
Expand Down Expand Up @@ -377,7 +421,10 @@ func TestOrchestrator(t *testing.T) {
test: func(t *testing.T) (test, *dockerenginetest.Double) {
de := &dockerenginetest.Double{
IsContainerRunningFn: func(ctx context.Context, name string) (bool, error) {
return true, nil
if name == "prefix-pause" {
return true, nil
}
return false, nil
},
ExecFn: func(ctx context.Context, ctr string, w io.Writer, cmd string, args ...string) error {
if cmd == "aws" {
Expand Down