cri: handle sandbox/container exit event separately
The event monitor handles exit events one by one. If something goes
wrong while deleting a task, it slows down terminating Pods. To reduce
the impact, each exit event watcher should handle its exit event
separately. If handling fails, the watcher should put the event into
the backoff queue and retry it.

Signed-off-by: Wei Fu <fuweid89@gmail.com>
fuweid committed Jan 24, 2021
1 parent 643bb9b commit e56de63
Showing 6 changed files with 117 additions and 36 deletions.
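
For orientation before the diffs, here is a minimal Go sketch of the pattern this commit moves to. The names, types, and failing handler below are simplified stand-ins, not the containerd CRI code; the point is only that each exit monitor goroutine handles its own exit event and, on failure, hands the event to a backoff queue that the central event monitor retries later.

// Sketch: per-event exit monitor with a backoff fallback (illustrative only).
package main

import (
	"fmt"
	"time"
)

// exitEvent stands in for the TaskExit event carried by the real monitors.
type exitEvent struct {
	ID       string
	ExitedAt time.Time
}

// backoffQueue stands in for the event monitor's backoff retry queue.
type backoffQueue struct {
	events []exitEvent
}

func (q *backoffQueue) enqueue(e exitEvent) { q.events = append(q.events, e) }

// startExitMonitor handles a single exit event in its own goroutine. Only on
// failure does the event go to the backoff queue for a later retry, so one
// slow or failing task deletion no longer blocks other exiting Pods.
func startExitMonitor(exitCh <-chan exitEvent, handle func(exitEvent) error, bq *backoffQueue) <-chan struct{} {
	stopCh := make(chan struct{})
	go func() {
		defer close(stopCh)
		e := <-exitCh
		if err := handle(e); err != nil {
			fmt.Printf("failed to handle exit event for %q: %v, enqueue for retry\n", e.ID, err)
			bq.enqueue(e)
		}
	}()
	return stopCh
}

func main() {
	bq := &backoffQueue{}
	ch := make(chan exitEvent, 1)
	stop := startExitMonitor(ch, func(e exitEvent) error {
		return fmt.Errorf("delete task for %q: simulated failure", e.ID)
	}, bq)
	ch <- exitEvent{ID: "sandbox-1", ExitedAt: time.Now()}
	<-stop
	fmt.Println("events waiting for backoff retry:", len(bq.events))
}

In the actual change, startContainerExitMonitor and startSandboxExitMonitor play the role of startExitMonitor above, and the handler is the namespaced, time-bounded lookup-and-handle closure shown in events.go below.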
6 changes: 2 additions & 4 deletions pkg/cri/server/container_start.go
@@ -148,10 +148,8 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain
return nil, errors.Wrapf(err, "failed to update container %q state", id)
}

// start the monitor after updating container state, this ensures that
// event monitor receives the TaskExit event and update container state
// after this.
c.eventMonitor.startExitMonitor(context.Background(), id, task.Pid(), exitCh)
// It handles the TaskExit event and updates container state after this.
c.eventMonitor.startContainerExitMonitor(context.Background(), id, task.Pid(), exitCh)

return &runtime.StartContainerResponse{}, nil
}
2 changes: 1 addition & 1 deletion pkg/cri/server/container_stop.go
@@ -88,7 +88,7 @@ func (c *criService) stopContainer(ctx context.Context, container containerstore
}

exitCtx, exitCancel := context.WithCancel(context.Background())
stopCh := c.eventMonitor.startExitMonitor(exitCtx, id, task.Pid(), exitCh)
stopCh := c.eventMonitor.startContainerExitMonitor(exitCtx, id, task.Pid(), exitCh)
defer func() {
exitCancel()
// This ensures that exit monitor is stopped before
137 changes: 110 additions & 27 deletions pkg/cri/server/events.go
@@ -50,24 +50,22 @@ const (
// Add a timeout for each event handling; events that time out will be requeued and
// handled again in the future.
handleEventTimeout = 10 * time.Second

exitChannelSize = 1024
)

// eventMonitor monitors containerd event and updates internal state correspondingly.
// TODO(random-liu): Handle event for each container in a separate goroutine.
type eventMonitor struct {
c *criService
ch <-chan *events.Envelope
// exitCh receives container/sandbox exit events from exit monitors.
exitCh chan *eventtypes.TaskExit
c *criService
ch <-chan *events.Envelope
errCh <-chan error
ctx context.Context
cancel context.CancelFunc
backOff *backOff
}

type backOff struct {
// queuePoolMu is the mutex used to protect the queuePool map
queuePoolMu sync.Mutex

queuePool map[string]*backOffQueue
// tickerMu is the mutex used to protect the ticker.
tickerMu sync.Mutex
@@ -93,7 +91,6 @@ func newEventMonitor(c *criService) *eventMonitor {
c: c,
ctx: ctx,
cancel: cancel,
exitCh: make(chan *eventtypes.TaskExit, exitChannelSize),
backOff: newBackOff(),
}
}
@@ -109,26 +106,102 @@ func (em *eventMonitor) subscribe(subscriber events.Subscriber) {
em.ch, em.errCh = subscriber.Subscribe(em.ctx, filters...)
}

// startExitMonitor starts an exit monitor for a given container/sandbox.
func (em *eventMonitor) startExitMonitor(ctx context.Context, id string, pid uint32, exitCh <-chan containerd.ExitStatus) <-chan struct{} {
// startSandboxExitMonitor starts an exit monitor for a given sandbox.
func (em *eventMonitor) startSandboxExitMonitor(ctx context.Context, id string, pid uint32, exitCh <-chan containerd.ExitStatus) <-chan struct{} {
stopCh := make(chan struct{})
go func() {
defer close(stopCh)
select {
case exitRes := <-exitCh:
exitStatus, exitedAt, err := exitRes.Result()
if err != nil {
logrus.WithError(err).Errorf("Failed to get task exit status for %q", id)
logrus.WithError(err).Errorf("failed to get task exit status for %q", id)
exitStatus = unknownExitCode
exitedAt = time.Now()
}
em.exitCh <- &eventtypes.TaskExit{

e := &eventtypes.TaskExit{
ContainerID: id,
ID: id,
Pid: pid,
ExitStatus: exitStatus,
ExitedAt: exitedAt,
}

logrus.Debugf("received exit event %+v", e)

err = func() error {
dctx := ctrdutil.NamespacedContext()
dctx, dcancel := context.WithTimeout(dctx, handleEventTimeout)
defer dcancel()

sb, err := em.c.sandboxStore.Get(e.ID)
if err == nil {
if err := handleSandboxExit(dctx, e, sb); err != nil {
return err
}
return nil
} else if err != store.ErrNotExist {
return errors.Wrapf(err, "failed to get sandbox %s", e.ID)
}
return nil
}()
if err != nil {
logrus.WithError(err).Errorf("failed to handle sandbox TaskExit event %+v", e)
em.backOff.enBackOff(id, e)
}
return
case <-ctx.Done():
}
}()
return stopCh
}

// startContainerExitMonitor starts an exit monitor for a given container.
func (em *eventMonitor) startContainerExitMonitor(ctx context.Context, id string, pid uint32, exitCh <-chan containerd.ExitStatus) <-chan struct{} {
stopCh := make(chan struct{})
go func() {
defer close(stopCh)
select {
case exitRes := <-exitCh:
exitStatus, exitedAt, err := exitRes.Result()
if err != nil {
logrus.WithError(err).Errorf("failed to get task exit status for %q", id)
exitStatus = unknownExitCode
exitedAt = time.Now()
}

e := &eventtypes.TaskExit{
ContainerID: id,
ID: id,
Pid: pid,
ExitStatus: exitStatus,
ExitedAt: exitedAt,
}

logrus.Debugf("received exit event %+v", e)

err = func() error {
dctx := ctrdutil.NamespacedContext()
dctx, dcancel := context.WithTimeout(dctx, handleEventTimeout)
defer dcancel()

cntr, err := em.c.containerStore.Get(e.ID)
if err == nil {
if err := handleContainerExit(dctx, e, cntr); err != nil {
return err
}
return nil
} else if err != store.ErrNotExist {
return errors.Wrapf(err, "failed to get container %s", e.ID)
}
return nil
}()
if err != nil {
logrus.WithError(err).Errorf("failed to handle container TaskExit event %+v", e)
em.backOff.enBackOff(id, e)
}
return
case <-ctx.Done():
}
}()
@@ -157,9 +230,16 @@ func convertEvent(e *gogotypes.Any) (string, interface{}, error) {
return id, evt, nil
}

// start starts the event monitor which monitors and handles all subscribed events. It returns
// an error channel for the caller to wait for stop errors from the event monitor.
// start must be called after subscribe.
// start starts the event monitor which monitors and handles all subscribed events.
// It returns an error channel for the caller to wait for stop errors from the
// event monitor.
//
// NOTE:
// 1. start must be called after subscribe.
// 2. The task exit event is handled first in its own startSandboxExitMonitor
// or startContainerExitMonitor goroutine. If that goroutine fails to handle
// the event, it puts the event into the backoff retry queue and the event
// monitor will handle it later.
func (em *eventMonitor) start() <-chan error {
errCh := make(chan error)
if em.ch == nil || em.errCh == nil {
@@ -170,18 +250,6 @@ func (em *eventMonitor) start() <-chan error {
defer close(errCh)
for {
select {
case e := <-em.exitCh:
logrus.Debugf("Received exit event %+v", e)
id := e.ID
if em.backOff.isInBackOff(id) {
logrus.Infof("Events for %q is in backoff, enqueue event %+v", id, e)
em.backOff.enBackOff(id, e)
break
}
if err := em.handleEvent(e); err != nil {
logrus.WithError(err).Errorf("Failed to handle exit event %+v for %s", e, id)
em.backOff.enBackOff(id, e)
}
case e := <-em.ch:
logrus.Debugf("Received containerd event timestamp - %v, namespace - %q, topic - %q", e.Timestamp, e.Namespace, e.Topic)
if e.Namespace != constants.K8sContainerdNamespace {
@@ -388,6 +456,9 @@ func newBackOff() *backOff {
}

func (b *backOff) getExpiredIDs() []string {
b.queuePoolMu.Lock()
defer b.queuePoolMu.Unlock()

var ids []string
for id, q := range b.queuePool {
if q.isExpire() {
@@ -398,6 +469,9 @@ func (b *backOff) isInBackOff(key string) bool {
}

func (b *backOff) isInBackOff(key string) bool {
b.queuePoolMu.Lock()
defer b.queuePoolMu.Unlock()

if _, ok := b.queuePool[key]; ok {
return true
}
@@ -406,6 +480,9 @@ func (b *backOff) enBackOff(key string, evt interface{}) {

// enBackOff starts the backoff for the key and puts the event at the tail of its queue
func (b *backOff) enBackOff(key string, evt interface{}) {
b.queuePoolMu.Lock()
defer b.queuePoolMu.Unlock()

if queue, ok := b.queuePool[key]; ok {
queue.events = append(queue.events, evt)
return
@@ -415,13 +492,19 @@ func (b *backOff) enBackOff(key string, evt interface{}) {

// deBackOff takes the whole queue for the key out of the pool
func (b *backOff) deBackOff(key string) *backOffQueue {
b.queuePoolMu.Lock()
defer b.queuePoolMu.Unlock()

queue := b.queuePool[key]
delete(b.queuePool, key)
return queue
}

// reBackOff starts the backoff again with a longer duration and puts the events back into the queue
func (b *backOff) reBackOff(key string, events []interface{}, oldDuration time.Duration) {
b.queuePoolMu.Lock()
defer b.queuePoolMu.Unlock()

duration := 2 * oldDuration
if duration > b.maxDuration {
duration = b.maxDuration
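
To make the locking changes above easier to follow, here is a hedged sketch of a queuePoolMu-protected backoff pool with exponential re-backoff. The method names mirror the diff, but the field types, durations, and main function are simplified assumptions; the real backOff in events.go additionally tracks per-queue expiry (isExpire/getExpiredIDs) and drives retries from a ticker in the event monitor loop.

// Sketch: mutex-protected backoff queue pool (illustrative only).
package main

import (
	"fmt"
	"sync"
	"time"
)

type backOffQueue struct {
	events   []interface{}
	duration time.Duration
	deadline time.Time
}

type backOff struct {
	// queuePoolMu protects queuePool, since enBackOff can now be called from
	// many exit-monitor goroutines as well as from the event monitor loop.
	queuePoolMu sync.Mutex
	queuePool   map[string]*backOffQueue
	minDuration time.Duration
	maxDuration time.Duration
}

func newBackOff() *backOff {
	return &backOff{
		queuePool:   map[string]*backOffQueue{},
		minDuration: 1 * time.Second,
		maxDuration: 5 * time.Minute,
	}
}

// enBackOff appends an event to the queue for key, creating the queue if needed.
func (b *backOff) enBackOff(key string, evt interface{}) {
	b.queuePoolMu.Lock()
	defer b.queuePoolMu.Unlock()
	if q, ok := b.queuePool[key]; ok {
		q.events = append(q.events, evt)
		return
	}
	b.queuePool[key] = &backOffQueue{
		events:   []interface{}{evt},
		duration: b.minDuration,
		deadline: time.Now().Add(b.minDuration),
	}
}

// deBackOff removes and returns the whole queue for key so its events can be retried.
func (b *backOff) deBackOff(key string) *backOffQueue {
	b.queuePoolMu.Lock()
	defer b.queuePoolMu.Unlock()
	q := b.queuePool[key]
	delete(b.queuePool, key)
	return q
}

// reBackOff re-enqueues events that failed again, doubling the wait up to maxDuration.
func (b *backOff) reBackOff(key string, events []interface{}, oldDuration time.Duration) {
	b.queuePoolMu.Lock()
	defer b.queuePoolMu.Unlock()
	d := 2 * oldDuration
	if d > b.maxDuration {
		d = b.maxDuration
	}
	b.queuePool[key] = &backOffQueue{
		events:   events,
		duration: d,
		deadline: time.Now().Add(d),
	}
}

func main() {
	b := newBackOff()
	b.enBackOff("sandbox-1", "TaskExit")
	q := b.deBackOff("sandbox-1") // a retry attempt would happen here
	b.reBackOff("sandbox-1", q.events, q.duration)
	fmt.Println("next retry delay:", b.queuePool["sandbox-1"].duration)
}

The design point of the commit is that enBackOff is now called from many monitor goroutines rather than only the single event-monitor loop, which is why every queuePool access in the diff above takes queuePoolMu.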
4 changes: 2 additions & 2 deletions pkg/cri/server/restart.go
@@ -290,7 +290,7 @@ func (c *criService) loadContainer(ctx context.Context, cntr containerd.Containe
status.Reason = unknownExitReason
} else {
// Start exit monitor.
c.eventMonitor.startExitMonitor(context.Background(), id, status.Pid, exitCh)
c.eventMonitor.startContainerExitMonitor(context.Background(), id, status.Pid, exitCh)
}
case containerd.Stopped:
// Task is stopped. Update status and delete the task.
@@ -389,7 +389,7 @@ func (c *criService) loadSandbox(ctx context.Context, cntr containerd.Container)
// Task is running, set sandbox state as READY.
status.State = sandboxstore.StateReady
status.Pid = t.Pid()
c.eventMonitor.startExitMonitor(context.Background(), meta.ID, status.Pid, exitCh)
c.eventMonitor.startSandboxExitMonitor(context.Background(), meta.ID, status.Pid, exitCh)
}
} else {
// Task is not running. Delete the task and set sandbox state as NOTREADY.
2 changes: 1 addition & 1 deletion pkg/cri/server/sandbox_run.go
@@ -331,7 +331,7 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox
//
// TaskOOM from containerd may come before sandbox is added to store,
// but we don't care about sandbox TaskOOM right now, so it is fine.
c.eventMonitor.startExitMonitor(context.Background(), id, task.Pid(), exitCh)
c.eventMonitor.startSandboxExitMonitor(context.Background(), id, task.Pid(), exitCh)

return &runtime.RunPodSandboxResponse{PodSandboxId: id}, nil
}
2 changes: 1 addition & 1 deletion pkg/cri/server/sandbox_stop.go
@@ -134,7 +134,7 @@ func (c *criService) stopSandboxContainer(ctx context.Context, sandbox sandboxst
}

exitCtx, exitCancel := context.WithCancel(context.Background())
stopCh := c.eventMonitor.startExitMonitor(exitCtx, id, task.Pid(), exitCh)
stopCh := c.eventMonitor.startSandboxExitMonitor(exitCtx, id, task.Pid(), exitCh)
defer func() {
exitCancel()
// This ensures that exit monitor is stopped before
