Skip to content
This repository was archived by the owner on Mar 11, 2020. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion api/grpc/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContaine
return nil, errors.New("empty bundle path")
}
e := &supervisor.StartTask{}
e.WithContext(ctx)
e.ID = c.Id
e.BundlePath = c.BundlePath
e.Stdin = c.Stdin
Expand All @@ -56,7 +57,6 @@ func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContaine
e.Runtime = c.Runtime
e.RuntimeArgs = c.RuntimeArgs
e.StartResponse = make(chan supervisor.StartResponse, 1)
e.Ctx = ctx
if c.Checkpoint != "" {
e.CheckpointDir = c.CheckpointDir
e.Checkpoint = &runtime.Checkpoint{
Expand All @@ -79,6 +79,7 @@ func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContaine

func (s *apiServer) CreateCheckpoint(ctx context.Context, r *types.CreateCheckpointRequest) (*types.CreateCheckpointResponse, error) {
e := &supervisor.CreateCheckpointTask{}
e.WithContext(ctx)
e.ID = r.Id
e.CheckpointDir = r.CheckpointDir
e.Checkpoint = &runtime.Checkpoint{
Expand All @@ -102,6 +103,7 @@ func (s *apiServer) DeleteCheckpoint(ctx context.Context, r *types.DeleteCheckpo
return nil, errors.New("checkpoint name cannot be empty")
}
e := &supervisor.DeleteCheckpointTask{}
e.WithContext(ctx)
e.ID = r.Id
e.CheckpointDir = r.CheckpointDir
e.Checkpoint = &runtime.Checkpoint{
Expand All @@ -116,6 +118,7 @@ func (s *apiServer) DeleteCheckpoint(ctx context.Context, r *types.DeleteCheckpo

func (s *apiServer) ListCheckpoint(ctx context.Context, r *types.ListCheckpointRequest) (*types.ListCheckpointResponse, error) {
e := &supervisor.GetContainersTask{}
e.WithContext(ctx)
s.sv.SendTask(e)
if err := <-e.ErrorCh(); err != nil {
return nil, err
Expand Down Expand Up @@ -150,6 +153,7 @@ func (s *apiServer) ListCheckpoint(ctx context.Context, r *types.ListCheckpointR

func (s *apiServer) Signal(ctx context.Context, r *types.SignalRequest) (*types.SignalResponse, error) {
e := &supervisor.SignalTask{}
e.WithContext(ctx)
e.ID = r.Id
e.PID = r.Pid
e.Signal = syscall.Signal(int(r.Signal))
Expand All @@ -167,6 +171,7 @@ func (s *apiServer) State(ctx context.Context, r *types.StateRequest) (*types.St
}

e := &supervisor.GetContainersTask{}
e.WithContext(ctx)
e.ID = r.Id
e.GetState = getState
s.sv.SendTask(e)
Expand Down Expand Up @@ -253,6 +258,7 @@ func toUint32(its []int) []uint32 {

func (s *apiServer) UpdateContainer(ctx context.Context, r *types.UpdateContainerRequest) (*types.UpdateContainerResponse, error) {
e := &supervisor.UpdateTask{}
e.WithContext(ctx)
e.ID = r.Id
e.State = runtime.State(r.Status)
if r.Resources != nil {
Expand Down Expand Up @@ -304,6 +310,7 @@ func (s *apiServer) UpdateContainer(ctx context.Context, r *types.UpdateContaine

func (s *apiServer) UpdateProcess(ctx context.Context, r *types.UpdateProcessRequest) (*types.UpdateProcessResponse, error) {
e := &supervisor.UpdateProcessTask{}
e.WithContext(ctx)
e.ID = r.Id
e.PID = r.Pid
e.Height = int(r.Height)
Expand Down Expand Up @@ -482,6 +489,7 @@ func getSystemCPUUsage() (uint64, error) {

func (s *apiServer) Stats(ctx context.Context, r *types.StatsRequest) (*types.StatsResponse, error) {
e := &supervisor.StatsTask{}
e.WithContext(ctx)
e.ID = r.Id
e.Stat = make(chan *runtime.Stat, 1)
s.sv.SendTask(e)
Expand Down
2 changes: 1 addition & 1 deletion api/grpc/server/server_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,14 @@ func (s *apiServer) AddProcess(ctx context.Context, r *types.AddProcessRequest)
return nil, fmt.Errorf("process id cannot be empty")
}
e := &supervisor.AddProcessTask{}
e.WithContext(ctx)
e.ID = r.Id
e.PID = r.Pid
e.ProcessSpec = process
e.Stdin = r.Stdin
e.Stdout = r.Stdout
e.Stderr = r.Stderr
e.StartResponse = make(chan supervisor.StartResponse, 1)
e.Ctx = ctx
s.sv.SendTask(e)
if err := <-e.ErrorCh(); err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions api/grpc/server/server_solaris.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func (s *apiServer) AddProcess(ctx context.Context, r *types.AddProcessRequest)
return nil, fmt.Errorf("process id cannot be empty")
}
e := &supervisor.AddProcessTask{}
e.WithContext(ctx)
e.ID = r.Id
e.PID = r.Pid
e.ProcessSpec = process
Expand Down
6 changes: 3 additions & 3 deletions containerd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ import (
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/Sirupsen/logrus"
"github.com/urfave/cli"
"github.com/cyberdelia/go-metrics-graphite"
"github.com/containerd/containerd"
grpcserver "github.com/containerd/containerd/api/grpc/server"
"github.com/containerd/containerd/api/grpc/types"
"github.com/containerd/containerd/api/http/pprof"
"github.com/containerd/containerd/supervisor"
"github.com/cyberdelia/go-metrics-graphite"
"github.com/docker/docker/pkg/listeners"
"github.com/rcrowley/go-metrics"
"github.com/urfave/cli"
)

const (
Expand Down Expand Up @@ -174,7 +174,7 @@ func daemon(context *cli.Context) error {
return err
}
s := make(chan os.Signal, 2048)
signal.Notify(s, syscall.SIGTERM, syscall.SIGINT)
signal.Notify(s, syscall.SIGTERM, syscall.SIGINT, syscall.SIGPIPE)
// Split the listen string of the form proto://addr
listenSpec := context.String("listen")
listenParts := strings.SplitN(listenSpec, "://", 2)
Expand Down
53 changes: 53 additions & 0 deletions integration-test/start_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,3 +564,56 @@ func (cs *ContainerdSuite) TestSigkillShimReuseName(t *check.C) {
t.Fatal(err)
}
}

func (cs *ContainerdSuite) TestSigkillShimWhileContainerIsPaused(t *check.C) {
bundleName := "busybox-top"
if err := CreateBusyboxBundle(bundleName, []string{"top"}); err != nil {
t.Fatal(err)
}
containerID := "top"
c, err := cs.StartContainer(containerID, bundleName)
if err != nil {
t.Fatal(err)
}

// pause the container
err = cs.PauseContainer(containerID)
if err != nil {
t.Fatal(err)
}

// Sigkill the shim
exec.Command("pkill", "-9", "containerd-shim").Run()

for _, evt := range []types.Event{
{
Type: "start-container",
Id: containerID,
Status: 0,
Pid: "",
},
{
Type: "pause",
Id: containerID,
Status: 0,
Pid: "",
},
{
Type: "exit",
Id: containerID,
Status: 128 + 9,
Pid: "init",
},
} {
ch := c.GetEventsChannel()
select {
case e := <-ch:
evt.Timestamp = e.Timestamp

t.Assert(*e, checker.Equals, evt)
case <-time.After(2 * time.Second):
t.Fatal("Container took more than 2 seconds to terminate")
}
}

}
28 changes: 28 additions & 0 deletions runtime/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,27 @@ func (p *process) handleSigkilledShim(rst uint32, rerr error) (uint32, error) {
}
if ppid == "1" {
logrus.Warnf("containerd: %s:%s shim died, killing associated process", p.container.id, p.id)
// Before sending SIGKILL to container, we need to make sure
// the container is not in Paused state. If the container is
// Paused, the container will not response to any signal
// we should Resume it after sending SIGKILL
var (
s State
err1 error
)
if p.container != nil {
s, err1 = p.container.Status()
}

unix.Kill(p.pid, syscall.SIGKILL)
if err != nil && err != syscall.ESRCH {
return UnknownStatus, fmt.Errorf("containerd: unable to SIGKILL %s:%s (pid %v): %v", p.container.id, p.id, p.pid, err)
}
if p.container != nil {
if err1 == nil && s == Paused {
p.container.Resume()
}
}

// wait for the process to die
for {
Expand All @@ -289,6 +306,17 @@ func (p *process) handleSigkilledShim(rst uint32, rerr error) (uint32, error) {
return rst, rerr
}

// The shim was SIGKILLED
// We should get the container state first
// to make sure the container is not in
// Pause state, if it's Paused, we should resume it
// and it will exit immediately because shim will send sigkill to
// container when died.
s, err1 := p.container.Status()
if err1 == nil && s == Paused {
p.container.Resume()
}

// Ensure we got the shim ProcessState
<-p.cmdDoneCh

Expand Down
4 changes: 1 addition & 3 deletions supervisor/add_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (

"github.com/containerd/containerd/runtime"
"github.com/containerd/containerd/specs"
"golang.org/x/net/context"
)

// AddProcessTask holds everything necessary to add a process to a
Expand All @@ -20,7 +19,6 @@ type AddProcessTask struct {
Stdin string
ProcessSpec *specs.ProcessSpec
StartResponse chan StartResponse
Ctx context.Context
}

func (s *Supervisor) addProcess(t *AddProcessTask) error {
Expand All @@ -29,7 +27,7 @@ func (s *Supervisor) addProcess(t *AddProcessTask) error {
if !ok {
return ErrContainerNotFound
}
process, err := ci.container.Exec(t.Ctx, t.PID, *t.ProcessSpec, runtime.NewStdio(t.Stdin, t.Stdout, t.Stderr))
process, err := ci.container.Exec(t.Ctx(), t.PID, *t.ProcessSpec, runtime.NewStdio(t.Stdin, t.Stdout, t.Stderr))
if err != nil {
return err
}
Expand Down
4 changes: 1 addition & 3 deletions supervisor/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"time"

"github.com/containerd/containerd/runtime"
"golang.org/x/net/context"
)

// StartTask holds needed parameters to create a new container
Expand All @@ -23,7 +22,6 @@ type StartTask struct {
CheckpointDir string
Runtime string
RuntimeArgs []string
Ctx context.Context
}

func (s *Supervisor) start(t *StartTask) error {
Expand Down Expand Up @@ -59,7 +57,7 @@ func (s *Supervisor) start(t *StartTask) error {
Stdin: t.Stdin,
Stdout: t.Stdout,
Stderr: t.Stderr,
Ctx: t.Ctx,
Ctx: t.Ctx(),
}
if t.Checkpoint != nil {
task.CheckpointPath = filepath.Join(t.CheckpointDir, t.Checkpoint.Name)
Expand Down
2 changes: 2 additions & 0 deletions supervisor/exit.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func (s *Supervisor) exit(t *ExitTask) error {
Status: status,
Process: proc,
}
ne.WithContext(t.Ctx())
s.execExit(ne)
return nil
}
Expand All @@ -51,6 +52,7 @@ func (s *Supervisor) exit(t *ExitTask) error {
PID: proc.ID(),
Process: proc,
}
ne.WithContext(t.Ctx())
s.delete(ne)

ExitProcessTimer.UpdateSince(start)
Expand Down
13 changes: 11 additions & 2 deletions supervisor/supervisor.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package supervisor

import (
"context"
"encoding/json"
"io"
"io/ioutil"
Expand Down Expand Up @@ -290,15 +291,21 @@ func (s *Supervisor) Machine() Machine {

// SendTask sends the provided event the the supervisors main event loop
func (s *Supervisor) SendTask(evt Task) {
TasksCounter.Inc(1)
s.tasks <- evt
select {
case <-evt.Ctx().Done():
evt.ErrorCh() <- evt.Ctx().Err()
close(evt.ErrorCh())
case s.tasks <- evt:
TasksCounter.Inc(1)
}
}

func (s *Supervisor) exitHandler() {
for p := range s.monitor.Exits() {
e := &ExitTask{
Process: p,
}
e.WithContext(context.Background())
s.SendTask(e)
}
}
Expand All @@ -308,6 +315,7 @@ func (s *Supervisor) oomHandler() {
e := &OOMTask{
ID: id,
}
e.WithContext(context.Background())
s.SendTask(e)
}
}
Expand Down Expand Up @@ -371,6 +379,7 @@ func (s *Supervisor) restore() error {
e := &ExitTask{
Process: p,
}
e.WithContext(context.Background())
s.SendTask(e)
}
}
Expand Down
16 changes: 16 additions & 0 deletions supervisor/task.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package supervisor

import (
"context"
"sync"

"github.com/containerd/containerd/runtime"
Expand All @@ -17,13 +18,28 @@ type StartResponse struct {
type Task interface {
// ErrorCh returns a channel used to report and error from an async task
ErrorCh() chan error
// Ctx carries the context of a task
Ctx() context.Context
}

type baseTask struct {
errCh chan error
ctx context.Context
mu sync.Mutex
}

func (t *baseTask) WithContext(ctx context.Context) {
t.mu.Lock()
defer t.mu.Unlock()
t.ctx = ctx
}

func (t *baseTask) Ctx() context.Context {
t.mu.Lock()
defer t.mu.Unlock()
return t.ctx
}

func (t *baseTask) ErrorCh() chan error {
t.mu.Lock()
defer t.mu.Unlock()
Expand Down
Loading