From fdc47cd1de128b4435a149f48a3a701eb71c126c Mon Sep 17 00:00:00 2001 From: Antonio Murdaca Date: Mon, 5 Jun 2017 17:57:55 +0200 Subject: [PATCH 1/3] supervisor: cancel SendTask when channel is full This patch handles case c) in https://github.com/moby/moby/issues/31487 (c) A request cannot be added to 's.tasks' because the queue is full. case b) isn't fixable at this point because there's no way to cancel stuff from a channel since channels aren't context aware (yet?). Parallelizing tasks handling isn't easily doable either. Signed-off-by: Antonio Murdaca (cherry picked from commit b812857db221660921e124086bcf3c8687b18dcb) Signed-off-by: Kenfe-Mickael Laventure --- api/grpc/server/server.go | 10 +++++++++- api/grpc/server/server_linux.go | 2 +- api/grpc/server/server_solaris.go | 1 + supervisor/add_process.go | 4 +--- supervisor/create.go | 4 +--- supervisor/exit.go | 2 ++ supervisor/supervisor.go | 13 +++++++++++-- supervisor/task.go | 16 ++++++++++++++++ supervisor/worker.go | 3 +++ 9 files changed, 45 insertions(+), 10 deletions(-) diff --git a/api/grpc/server/server.go b/api/grpc/server/server.go index 89ba87c781cc9..ceed0b6ab5b2b 100644 --- a/api/grpc/server/server.go +++ b/api/grpc/server/server.go @@ -46,6 +46,7 @@ func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContaine return nil, errors.New("empty bundle path") } e := &supervisor.StartTask{} + e.WithContext(ctx) e.ID = c.Id e.BundlePath = c.BundlePath e.Stdin = c.Stdin @@ -56,7 +57,6 @@ func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContaine e.Runtime = c.Runtime e.RuntimeArgs = c.RuntimeArgs e.StartResponse = make(chan supervisor.StartResponse, 1) - e.Ctx = ctx if c.Checkpoint != "" { e.CheckpointDir = c.CheckpointDir e.Checkpoint = &runtime.Checkpoint{ @@ -79,6 +79,7 @@ func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContaine func (s *apiServer) CreateCheckpoint(ctx context.Context, r *types.CreateCheckpointRequest)
(*types.CreateCheckpointResponse, error) { e := &supervisor.CreateCheckpointTask{} + e.WithContext(ctx) e.ID = r.Id e.CheckpointDir = r.CheckpointDir e.Checkpoint = &runtime.Checkpoint{ @@ -102,6 +103,7 @@ func (s *apiServer) DeleteCheckpoint(ctx context.Context, r *types.DeleteCheckpo return nil, errors.New("checkpoint name cannot be empty") } e := &supervisor.DeleteCheckpointTask{} + e.WithContext(ctx) e.ID = r.Id e.CheckpointDir = r.CheckpointDir e.Checkpoint = &runtime.Checkpoint{ @@ -116,6 +118,7 @@ func (s *apiServer) DeleteCheckpoint(ctx context.Context, r *types.DeleteCheckpo func (s *apiServer) ListCheckpoint(ctx context.Context, r *types.ListCheckpointRequest) (*types.ListCheckpointResponse, error) { e := &supervisor.GetContainersTask{} + e.WithContext(ctx) s.sv.SendTask(e) if err := <-e.ErrorCh(); err != nil { return nil, err @@ -150,6 +153,7 @@ func (s *apiServer) ListCheckpoint(ctx context.Context, r *types.ListCheckpointR func (s *apiServer) Signal(ctx context.Context, r *types.SignalRequest) (*types.SignalResponse, error) { e := &supervisor.SignalTask{} + e.WithContext(ctx) e.ID = r.Id e.PID = r.Pid e.Signal = syscall.Signal(int(r.Signal)) @@ -167,6 +171,7 @@ func (s *apiServer) State(ctx context.Context, r *types.StateRequest) (*types.St } e := &supervisor.GetContainersTask{} + e.WithContext(ctx) e.ID = r.Id e.GetState = getState s.sv.SendTask(e) @@ -253,6 +258,7 @@ func toUint32(its []int) []uint32 { func (s *apiServer) UpdateContainer(ctx context.Context, r *types.UpdateContainerRequest) (*types.UpdateContainerResponse, error) { e := &supervisor.UpdateTask{} + e.WithContext(ctx) e.ID = r.Id e.State = runtime.State(r.Status) if r.Resources != nil { @@ -304,6 +310,7 @@ func (s *apiServer) UpdateContainer(ctx context.Context, r *types.UpdateContaine func (s *apiServer) UpdateProcess(ctx context.Context, r *types.UpdateProcessRequest) (*types.UpdateProcessResponse, error) { e := &supervisor.UpdateProcessTask{} + e.WithContext(ctx) e.ID = r.Id e.PID = 
r.Pid e.Height = int(r.Height) @@ -482,6 +489,7 @@ func getSystemCPUUsage() (uint64, error) { func (s *apiServer) Stats(ctx context.Context, r *types.StatsRequest) (*types.StatsResponse, error) { e := &supervisor.StatsTask{} + e.WithContext(ctx) e.ID = r.Id e.Stat = make(chan *runtime.Stat, 1) s.sv.SendTask(e) diff --git a/api/grpc/server/server_linux.go b/api/grpc/server/server_linux.go index 92baf3963c861..08c559a89a816 100644 --- a/api/grpc/server/server_linux.go +++ b/api/grpc/server/server_linux.go @@ -49,6 +49,7 @@ func (s *apiServer) AddProcess(ctx context.Context, r *types.AddProcessRequest) return nil, fmt.Errorf("process id cannot be empty") } e := &supervisor.AddProcessTask{} + e.WithContext(ctx) e.ID = r.Id e.PID = r.Pid e.ProcessSpec = process @@ -56,7 +57,6 @@ func (s *apiServer) AddProcess(ctx context.Context, r *types.AddProcessRequest) e.Stdout = r.Stdout e.Stderr = r.Stderr e.StartResponse = make(chan supervisor.StartResponse, 1) - e.Ctx = ctx s.sv.SendTask(e) if err := <-e.ErrorCh(); err != nil { return nil, err diff --git a/api/grpc/server/server_solaris.go b/api/grpc/server/server_solaris.go index 5ad273438e577..1219191a766db 100644 --- a/api/grpc/server/server_solaris.go +++ b/api/grpc/server/server_solaris.go @@ -25,6 +25,7 @@ func (s *apiServer) AddProcess(ctx context.Context, r *types.AddProcessRequest) return nil, fmt.Errorf("process id cannot be empty") } e := &supervisor.AddProcessTask{} + e.WithContext(ctx) e.ID = r.Id e.PID = r.Pid e.ProcessSpec = process diff --git a/supervisor/add_process.go b/supervisor/add_process.go index 6c56b82e9eefe..b5fc40cb87958 100644 --- a/supervisor/add_process.go +++ b/supervisor/add_process.go @@ -6,7 +6,6 @@ import ( "github.com/containerd/containerd/runtime" "github.com/containerd/containerd/specs" - "golang.org/x/net/context" ) // AddProcessTask holds everything necessary to add a process to a @@ -20,7 +19,6 @@ type AddProcessTask struct { Stdin string ProcessSpec *specs.ProcessSpec StartResponse chan 
StartResponse - Ctx context.Context } func (s *Supervisor) addProcess(t *AddProcessTask) error { @@ -29,7 +27,7 @@ func (s *Supervisor) addProcess(t *AddProcessTask) error { if !ok { return ErrContainerNotFound } - process, err := ci.container.Exec(t.Ctx, t.PID, *t.ProcessSpec, runtime.NewStdio(t.Stdin, t.Stdout, t.Stderr)) + process, err := ci.container.Exec(t.Ctx(), t.PID, *t.ProcessSpec, runtime.NewStdio(t.Stdin, t.Stdout, t.Stderr)) if err != nil { return err } diff --git a/supervisor/create.go b/supervisor/create.go index 3e782c625f7ef..e6ff89c95d573 100644 --- a/supervisor/create.go +++ b/supervisor/create.go @@ -5,7 +5,6 @@ import ( "time" "github.com/containerd/containerd/runtime" - "golang.org/x/net/context" ) // StartTask holds needed parameters to create a new container @@ -23,7 +22,6 @@ type StartTask struct { CheckpointDir string Runtime string RuntimeArgs []string - Ctx context.Context } func (s *Supervisor) start(t *StartTask) error { @@ -59,7 +57,7 @@ func (s *Supervisor) start(t *StartTask) error { Stdin: t.Stdin, Stdout: t.Stdout, Stderr: t.Stderr, - Ctx: t.Ctx, + Ctx: t.Ctx(), } if t.Checkpoint != nil { task.CheckpointPath = filepath.Join(t.CheckpointDir, t.Checkpoint.Name) diff --git a/supervisor/exit.go b/supervisor/exit.go index 9977c1868ea3e..83674bf0b0914 100644 --- a/supervisor/exit.go +++ b/supervisor/exit.go @@ -41,6 +41,7 @@ func (s *Supervisor) exit(t *ExitTask) error { Status: status, Process: proc, } + ne.WithContext(t.Ctx()) s.execExit(ne) return nil } @@ -51,6 +52,7 @@ func (s *Supervisor) exit(t *ExitTask) error { PID: proc.ID(), Process: proc, } + ne.WithContext(t.Ctx()) s.delete(ne) ExitProcessTimer.UpdateSince(start) diff --git a/supervisor/supervisor.go b/supervisor/supervisor.go index 41539b1c8fa4b..eed68dffc019f 100644 --- a/supervisor/supervisor.go +++ b/supervisor/supervisor.go @@ -1,6 +1,7 @@ package supervisor import ( + "context" "encoding/json" "io" "io/ioutil" @@ -290,8 +291,13 @@ func (s *Supervisor) Machine() Machine 
{ // SendTask sends the provided event the the supervisors main event loop func (s *Supervisor) SendTask(evt Task) { - TasksCounter.Inc(1) - s.tasks <- evt + select { + case <-evt.Ctx().Done(): + evt.ErrorCh() <- evt.Ctx().Err() + close(evt.ErrorCh()) + case s.tasks <- evt: + TasksCounter.Inc(1) + } } func (s *Supervisor) exitHandler() { @@ -299,6 +305,7 @@ func (s *Supervisor) exitHandler() { e := &ExitTask{ Process: p, } + e.WithContext(context.Background()) s.SendTask(e) } } @@ -308,6 +315,7 @@ func (s *Supervisor) oomHandler() { e := &OOMTask{ ID: id, } + e.WithContext(context.Background()) s.SendTask(e) } } @@ -371,6 +379,7 @@ func (s *Supervisor) restore() error { e := &ExitTask{ Process: p, } + e.WithContext(context.Background()) s.SendTask(e) } } diff --git a/supervisor/task.go b/supervisor/task.go index 4980a3570b080..287379db28182 100644 --- a/supervisor/task.go +++ b/supervisor/task.go @@ -1,6 +1,7 @@ package supervisor import ( + "context" "sync" "github.com/containerd/containerd/runtime" @@ -17,13 +18,28 @@ type StartResponse struct { type Task interface { // ErrorCh returns a channel used to report and error from an async task ErrorCh() chan error + // Ctx carries the context of a task + Ctx() context.Context } type baseTask struct { errCh chan error + ctx context.Context mu sync.Mutex } +func (t *baseTask) WithContext(ctx context.Context) { + t.mu.Lock() + defer t.mu.Unlock() + t.ctx = ctx +} + +func (t *baseTask) Ctx() context.Context { + t.mu.Lock() + defer t.mu.Unlock() + return t.ctx +} + func (t *baseTask) ErrorCh() chan error { t.mu.Lock() defer t.mu.Unlock() diff --git a/supervisor/worker.go b/supervisor/worker.go index 652cd62ce97e9..da33c3b857e44 100644 --- a/supervisor/worker.go +++ b/supervisor/worker.go @@ -55,6 +55,7 @@ func (w *worker) Start() { NoEvent: true, Process: process, } + evt.WithContext(t.Ctx) w.s.SendTask(evt) continue } @@ -71,6 +72,7 @@ func (w *worker) Start() { NoEvent: true, Process: process, } + evt.WithContext(t.Ctx) 
w.s.SendTask(evt) continue } @@ -85,6 +87,7 @@ func (w *worker) Start() { NoEvent: true, Process: process, } + evt.WithContext(t.Ctx) w.s.SendTask(evt) continue } From 7e349f8b036eb95693a794d0a9214de73ab92a50 Mon Sep 17 00:00:00 2001 From: Antonio Murdaca Date: Tue, 27 Jun 2017 12:43:21 +0200 Subject: [PATCH 2/3] containerd: main: notify on SIGPIPE This PR https://github.com/containerd/containerd/pull/930 ignored SIGPIPE signals but we aren't actually subscribed to receive them (and ignore them). This patch adds the missing piece by subscribing to receive SIGPIPE(s). Signed-off-by: Antonio Murdaca (cherry picked from commit 6a7b7d2712ba3f1ab0e29ef4f9ed9bd74b8d43ef) Signed-off-by: Kenfe-Mickael Laventure --- containerd/main.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/containerd/main.go b/containerd/main.go index 6b33074893777..5ca129ab5ba6c 100644 --- a/containerd/main.go +++ b/containerd/main.go @@ -17,15 +17,15 @@ import ( "google.golang.org/grpc/health/grpc_health_v1" "github.com/Sirupsen/logrus" - "github.com/urfave/cli" - "github.com/cyberdelia/go-metrics-graphite" "github.com/containerd/containerd" grpcserver "github.com/containerd/containerd/api/grpc/server" "github.com/containerd/containerd/api/grpc/types" "github.com/containerd/containerd/api/http/pprof" "github.com/containerd/containerd/supervisor" + "github.com/cyberdelia/go-metrics-graphite" "github.com/docker/docker/pkg/listeners" "github.com/rcrowley/go-metrics" + "github.com/urfave/cli" ) const ( @@ -174,7 +174,7 @@ func daemon(context *cli.Context) error { return err } s := make(chan os.Signal, 2048) - signal.Notify(s, syscall.SIGTERM, syscall.SIGINT) + signal.Notify(s, syscall.SIGTERM, syscall.SIGINT, syscall.SIGPIPE) // Split the listen string of the form proto://addr listenSpec := context.String("listen") listenParts := strings.SplitN(listenSpec, "://", 2) From 21afab02a0f6518a03aae3c6f4f2fa7a5729d219 Mon Sep 17 00:00:00 2001 From: Lei Jitang Date: Wed, 28 Jun 2017 
05:08:55 -0400 Subject: [PATCH 3/3] Handle kill shim while container is paused Killing the shim sends SIGKILL to its children, but if the container is paused, the container doesn't respond to the signal; this causes containerd to block and fail to start next time. Signed-off-by: Lei Jitang (cherry picked from commit b2a80cf7d04b27ffa4482b6fa40614e9246bab49) Signed-off-by: Kenfe-Mickael Laventure --- integration-test/start_linux_test.go | 53 ++++++++++++++++++++++++++++ runtime/process.go | 28 +++++++++++++++ 2 files changed, 81 insertions(+) diff --git a/integration-test/start_linux_test.go b/integration-test/start_linux_test.go index 61950141bd225..277907f795810 100644 --- a/integration-test/start_linux_test.go +++ b/integration-test/start_linux_test.go @@ -564,3 +564,56 @@ func (cs *ContainerdSuite) TestSigkillShimReuseName(t *check.C) { t.Fatal(err) } } + +func (cs *ContainerdSuite) TestSigkillShimWhileContainerIsPaused(t *check.C) { + bundleName := "busybox-top" + if err := CreateBusyboxBundle(bundleName, []string{"top"}); err != nil { + t.Fatal(err) + } + containerID := "top" + c, err := cs.StartContainer(containerID, bundleName) + if err != nil { + t.Fatal(err) + } + + // pause the container + err = cs.PauseContainer(containerID) + if err != nil { + t.Fatal(err) + } + + // Sigkill the shim + exec.Command("pkill", "-9", "containerd-shim").Run() + + for _, evt := range []types.Event{ + { + Type: "start-container", + Id: containerID, + Status: 0, + Pid: "", + }, + { + Type: "pause", + Id: containerID, + Status: 0, + Pid: "", + }, + { + Type: "exit", + Id: containerID, + Status: 128 + 9, + Pid: "init", + }, + } { + ch := c.GetEventsChannel() + select { + case e := <-ch: + evt.Timestamp = e.Timestamp + + t.Assert(*e, checker.Equals, evt) + case <-time.After(2 * time.Second): + t.Fatal("Container took more than 2 seconds to terminate") + } + } + +} diff --git a/runtime/process.go b/runtime/process.go index 2df67d95a9b26..f5dd3ee155e97 100644 --- a/runtime/process.go
+++ b/runtime/process.go @@ -262,10 +262,27 @@ func (p *process) handleSigkilledShim(rst uint32, rerr error) (uint32, error) { } if ppid == "1" { logrus.Warnf("containerd: %s:%s shim died, killing associated process", p.container.id, p.id) + // Before sending SIGKILL to container, we need to make sure + // the container is not in Paused state. If the container is + // Paused, the container will not response to any signal + // we should Resume it after sending SIGKILL + var ( + s State + err1 error + ) + if p.container != nil { + s, err1 = p.container.Status() + } + unix.Kill(p.pid, syscall.SIGKILL) if err != nil && err != syscall.ESRCH { return UnknownStatus, fmt.Errorf("containerd: unable to SIGKILL %s:%s (pid %v): %v", p.container.id, p.id, p.pid, err) } + if p.container != nil { + if err1 == nil && s == Paused { + p.container.Resume() + } + } // wait for the process to die for { @@ -289,6 +306,17 @@ func (p *process) handleSigkilledShim(rst uint32, rerr error) (uint32, error) { return rst, rerr } + // The shim was SIGKILLED + // We should get the container state first + // to make sure the container is not in + // Pause state, if it's Paused, we should resume it + // and it will exit immediately because shim will send sigkill to + // container when died. + s, err1 := p.container.Status() + if err1 == nil && s == Paused { + p.container.Resume() + } + // Ensure we got the shim ProcessState <-p.cmdDoneCh