Skip to content

Commit

Permalink
Merge pull request #1392 from cpuguy83/wait_async
Browse files Browse the repository at this point in the history
Make Wait() async
  • Loading branch information
crosbymichael committed Aug 22, 2017
2 parents 0648ec3 + 6ab99ed commit 1046064
Show file tree
Hide file tree
Showing 12 changed files with 379 additions and 358 deletions.
35 changes: 14 additions & 21 deletions cmd/containerd-stress/main.go
Expand Up @@ -208,30 +208,23 @@ func (w *worker) runContainer(ctx context.Context, id string) error {
return err
}
defer task.Delete(ctx, containerd.WithProcessKill)
var (
start sync.WaitGroup
status = make(chan uint32, 1)
)
start.Add(1)
go func() {
start.Done()
s, err := task.Wait(w.waitContext)
if err != nil {
if err == context.DeadlineExceeded ||
err == context.Canceled {
close(status)
return
}
w.failures++
logrus.WithError(err).Errorf("wait task %s", id)
}
status <- s
}()
start.Wait()

statusC, err := task.Wait(ctx)
if err != nil {
return err
}

if err := task.Start(ctx); err != nil {
return err
}
<-status
status := <-statusC
_, _, err = status.Result()
if err != nil {
if err == context.DeadlineExceeded || err == context.Canceled {
return nil
}
w.failures++
}
return nil
}

Expand Down
14 changes: 11 additions & 3 deletions cmd/ctr/attach.go
Expand Up @@ -44,6 +44,12 @@ var taskAttachCommand = cli.Command{
return err
}
defer task.Delete(ctx)

statusC, err := task.Wait(ctx)
if err != nil {
return err
}

if tty {
if err := handleConsoleResize(ctx, task, con); err != nil {
logrus.WithError(err).Error("console resize")
Expand All @@ -52,12 +58,14 @@ var taskAttachCommand = cli.Command{
sigc := forwardAllSignals(ctx, task)
defer stopCatch(sigc)
}
status, err := task.Wait(ctx)

ec := <-statusC
code, _, err := ec.Result()
if err != nil {
return err
}
if status != 0 {
return cli.NewExitError("", int(status))
if code != 0 {
return cli.NewExitError("", int(code))
}
return nil
},
Expand Down
21 changes: 11 additions & 10 deletions cmd/ctr/exec.go
Expand Up @@ -70,14 +70,11 @@ var taskExecCommand = cli.Command{
}
defer process.Delete(ctx)

statusC := make(chan uint32, 1)
go func() {
status, err := process.Wait(ctx)
if err != nil {
logrus.WithError(err).Error("wait process")
}
statusC <- status
}()
statusC, err := task.Wait(ctx)
if err != nil {
return err
}

var con console.Console
if tty {
con = console.Current()
Expand All @@ -98,8 +95,12 @@ var taskExecCommand = cli.Command{
defer stopCatch(sigc)
}
status := <-statusC
if status != 0 {
return cli.NewExitError("", int(status))
code, _, err := status.Result()
if err != nil {
return err
}
if code != 0 {
return cli.NewExitError("", int(code))
}
return nil
},
Expand Down
22 changes: 12 additions & 10 deletions cmd/ctr/run.go
Expand Up @@ -129,14 +129,11 @@ var runCommand = cli.Command{
}
defer task.Delete(ctx)

statusC := make(chan uint32, 1)
go func() {
status, err := task.Wait(ctx)
if err != nil {
logrus.WithError(err).Error("wait process")
}
statusC <- status
}()
statusC, err := task.Wait(ctx)
if err != nil {
return err
}

var con console.Console
if tty {
con = console.Current()
Expand All @@ -158,11 +155,16 @@ var runCommand = cli.Command{
}

status := <-statusC
code, _, err := status.Result()
if err != nil {
return err
}

if _, err := task.Delete(ctx); err != nil {
return err
}
if status != 0 {
return cli.NewExitError("", int(status))
if code != 0 {
return cli.NewExitError("", int(code))
}
return nil
},
Expand Down
21 changes: 11 additions & 10 deletions cmd/ctr/start.go
Expand Up @@ -47,14 +47,11 @@ var taskStartCommand = cli.Command{
}
defer task.Delete(ctx)

statusC := make(chan uint32, 1)
go func() {
status, err := task.Wait(ctx)
if err != nil {
logrus.WithError(err).Error("wait process")
}
statusC <- status
}()
statusC, err := task.Wait(ctx)
if err != nil {
return err
}

var con console.Console
if tty {
con = console.Current()
Expand All @@ -76,11 +73,15 @@ var taskStartCommand = cli.Command{
}

status := <-statusC
code, _, err := status.Result()
if err != nil {
return err
}
if _, err := task.Delete(ctx); err != nil {
return err
}
if status != 0 {
return cli.NewExitError("", int(status))
if code != 0 {
return cli.NewExitError("", int(code))
}
return nil
},
Expand Down
63 changes: 25 additions & 38 deletions container_checkpoint_test.go
Expand Up @@ -47,14 +47,11 @@ func TestCheckpointRestore(t *testing.T) {
}
defer task.Delete(ctx)

statusC := make(chan uint32, 1)
go func() {
status, err := task.Wait(ctx)
if err != nil {
t.Error(err)
}
statusC <- status
}()
statusC, err := task.Wait(ctx)
if err != nil {
t.Error(err)
return
}

if err := task.Start(ctx); err != nil {
t.Error(err)
Expand All @@ -79,13 +76,11 @@ func TestCheckpointRestore(t *testing.T) {
}
defer task.Delete(ctx)

go func() {
status, err := task.Wait(ctx)
if err != nil {
t.Error(err)
}
statusC <- status
}()
statusC, err = task.Wait(ctx)
if err != nil {
t.Error(err)
return
}

if err := task.Start(ctx); err != nil {
t.Error(err)
Expand Down Expand Up @@ -137,14 +132,11 @@ func TestCheckpointRestoreNewContainer(t *testing.T) {
}
defer task.Delete(ctx)

statusC := make(chan uint32, 1)
go func() {
status, err := task.Wait(ctx)
if err != nil {
t.Error(err)
}
statusC <- status
}()
statusC, err := task.Wait(ctx)
if err != nil {
t.Error(err)
return
}

if err := task.Start(ctx); err != nil {
t.Error(err)
Expand Down Expand Up @@ -177,13 +169,11 @@ func TestCheckpointRestoreNewContainer(t *testing.T) {
}
defer task.Delete(ctx)

go func() {
status, err := task.Wait(ctx)
if err != nil {
t.Error(err)
}
statusC <- status
}()
statusC, err = task.Wait(ctx)
if err != nil {
t.Error(err)
return
}

if err := task.Start(ctx); err != nil {
t.Error(err)
Expand Down Expand Up @@ -240,14 +230,11 @@ func TestCheckpointLeaveRunning(t *testing.T) {
}
defer task.Delete(ctx)

statusC := make(chan uint32, 1)
go func() {
status, err := task.Wait(ctx)
if err != nil {
t.Error(err)
}
statusC <- status
}()
statusC, err := task.Wait(ctx)
if err != nil {
t.Error(err)
return
}

if err := task.Start(ctx); err != nil {
t.Error(err)
Expand Down
63 changes: 26 additions & 37 deletions container_linux_test.go
Expand Up @@ -57,14 +57,11 @@ func TestContainerUpdate(t *testing.T) {
}
defer task.Delete(ctx)

statusC := make(chan uint32, 1)
go func() {
status, err := task.Wait(ctx)
if err != nil {
t.Error(err)
}
statusC <- status
}()
statusC, err := task.Wait(ctx)
if err != nil {
t.Error(err)
return
}

// check that the task has a limit of 32mb
cgroup, err := cgroups.Load(cgroups.V1, cgroups.PidPath(int(task.Pid())))
Expand Down Expand Up @@ -157,14 +154,12 @@ func TestShimInCgroup(t *testing.T) {
}
defer task.Delete(ctx)

statusC := make(chan uint32, 1)
go func() {
status, err := task.Wait(ctx)
if err != nil {
t.Error(err)
}
statusC <- status
}()
statusC, err := task.Wait(ctx)
if err != nil {
t.Error(err)
return
}

// check to see if the shim is inside the cgroup
processes, err := cg.Processes(cgroups.Devices, false)
if err != nil {
Expand Down Expand Up @@ -221,17 +216,11 @@ func TestDaemonRestart(t *testing.T) {
}
defer task.Delete(ctx)

synC := make(chan struct{})
statusC := make(chan uint32, 1)
go func() {
synC <- struct{}{}
status, err := task.Wait(ctx)
if err == nil {
t.Errorf(`first task.Wait() should have failed with "transport is closing"`)
}
statusC <- status
}()
<-synC
statusC, err := task.Wait(ctx)
if err != nil {
t.Error(err)
return
}

if err := task.Start(ctx); err != nil {
t.Error(err)
Expand All @@ -242,7 +231,11 @@ func TestDaemonRestart(t *testing.T) {
t.Fatal(err)
}

<-statusC
status := <-statusC
_, _, err = status.Result()
if err == nil {
t.Errorf(`first task.Wait() should have failed with "transport is closing"`)
}

waitCtx, waitCancel := context.WithTimeout(ctx, 2*time.Second)
serving, err := client.IsServing(waitCtx)
Expand All @@ -251,15 +244,11 @@ func TestDaemonRestart(t *testing.T) {
t.Fatalf("containerd did not start within 2s: %v", err)
}

go func() {
synC <- struct{}{}
status, err := task.Wait(ctx)
if err != nil {
t.Error(err)
}
statusC <- status
}()
<-synC
statusC, err = task.Wait(ctx)
if err != nil {
t.Error(err)
return
}

if err := task.Kill(ctx, syscall.SIGKILL); err != nil {
t.Fatal(err)
Expand Down

0 comments on commit 1046064

Please sign in to comment.