/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package server

import (
	"sync"
	"time"

	eventtypes "github.com/containerd/containerd/api/events"
	containerdio "github.com/containerd/containerd/cio"
	"github.com/containerd/containerd/errdefs"
	"github.com/containerd/containerd/events"
	"github.com/containerd/typeurl"
	gogotypes "github.com/gogo/protobuf/types"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	"golang.org/x/net/context"
	"k8s.io/apimachinery/pkg/util/clock"

	ctrdutil "github.com/containerd/cri/pkg/containerd/util"
	"github.com/containerd/cri/pkg/store"
	containerstore "github.com/containerd/cri/pkg/store/container"
	sandboxstore "github.com/containerd/cri/pkg/store/sandbox"
)

const (
	backOffInitDuration        = 1 * time.Second
	backOffMaxDuration         = 5 * time.Minute
	backOffExpireCheckDuration = 1 * time.Second
)

// eventMonitor monitors containerd events and updates the internal state
// accordingly.
// TODO(random-liu): [P1] Figure out whether it is possible to drop events
// while containerd is running. If it is, we should periodically list
// containers to sync state with containerd.
type eventMonitor struct {
containerStore *containerstore.Store
sandboxStore *sandboxstore.Store
ch <-chan *events.Envelope
errCh <-chan error
ctx context.Context
cancel context.CancelFunc
backOff *backOff
}
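
// backOff holds, per container ID, a queue of events whose handling failed
// and the time at which handling should be retried.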
type backOff struct {
queuePool map[string]*backOffQueue
	// tickerMu is a mutex used to protect the ticker.
tickerMu sync.Mutex
ticker *time.Ticker
minDuration time.Duration
maxDuration time.Duration
checkDuration time.Duration
clock clock.Clock
}
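
// backOffQueue is a FIFO of deferred events for a single container, together
// with the current backoff duration and its expiry time.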
type backOffQueue struct {
events []interface{}
expireTime time.Time
duration time.Duration
clock clock.Clock
}

// newEventMonitor creates a new event monitor. The new event monitor will
// start subscribing to containerd events; all events that happen after that
// should be monitored.
func newEventMonitor(c *containerstore.Store, s *sandboxstore.Store) *eventMonitor {
	// Event subscription doesn't need a namespace.
ctx, cancel := context.WithCancel(context.Background())
return &eventMonitor{
containerStore: c,
sandboxStore: s,
ctx: ctx,
cancel: cancel,
backOff: newBackOff(),
}
}
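
// A sketch of the intended call order, with client standing in for the
// service's *containerd.Client and the two stores standing in for those
// created by the cri service:
//
//	em := newEventMonitor(containerStore, sandboxStore)
//	em.subscribe(client.EventService())
//	errCh := em.start()
//	// ... serve ...
//	em.stop()
//	<-errCh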

// subscribe starts subscribing to containerd task events.
func (em *eventMonitor) subscribe(subscriber events.Subscriber) {
filters := []string{
`topic=="/tasks/exit"`,
`topic=="/tasks/oom"`,
}
em.ch, em.errCh = subscriber.Subscribe(em.ctx, filters...)
}
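
// convertEvent converts a *gogotypes.Any into a typed containerd event and
// returns the ID of the container the event refers to.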
func convertEvent(e *gogotypes.Any) (string, interface{}, error) {
	evt, err := typeurl.UnmarshalAny(e)
	if err != nil {
		return "", nil, errors.Wrap(err, "failed to unmarshal event")
	}
	switch v := evt.(type) {
	case *eventtypes.TaskExit:
		return v.ContainerID, evt, nil
	case *eventtypes.TaskOOM:
		return v.ContainerID, evt, nil
	default:
		return "", nil, errors.New("unsupported event")
	}
}

// start starts the event monitor, which monitors and handles all container
// events. It returns an error channel that the caller can use to wait for
// stop errors from the event monitor.
// start must be called after subscribe.
func (em *eventMonitor) start() <-chan error {
errCh := make(chan error)
if em.ch == nil || em.errCh == nil {
panic("event channel is nil")
}
backOffCheckCh := em.backOff.start()
go func() {
defer close(errCh)
for {
select {
case e := <-em.ch:
logrus.Debugf("Received containerd event timestamp - %v, namespace - %q, topic - %q", e.Timestamp, e.Namespace, e.Topic)
cID, evt, err := convertEvent(e.Event)
if err != nil {
logrus.WithError(err).Errorf("Failed to convert event %+v", e)
break
}
if em.backOff.isInBackOff(cID) {
logrus.Infof("Events for container %q is in backoff, enqueue event %+v", cID, evt)
em.backOff.enBackOff(cID, evt)
break
}
if err := em.handleEvent(evt); err != nil {
logrus.WithError(err).Errorf("Failed to handle event %+v for container %s", evt, cID)
em.backOff.enBackOff(cID, evt)
}
case err := <-em.errCh:
				// If there is no error, just return; the deferred close will close errCh.
if err != nil {
logrus.WithError(err).Errorf("Failed to handle event stream")
errCh <- err
}
return
case <-backOffCheckCh:
cIDs := em.backOff.getExpiredContainers()
for _, cID := range cIDs {
queue := em.backOff.deBackOff(cID)
for i, any := range queue.events {
if err := em.handleEvent(any); err != nil {
logrus.WithError(err).Errorf("Failed to handle backOff event %+v for container %s", any, cID)
em.backOff.reBackOff(cID, queue.events[i:], queue.duration)
break
}
}
}
}
}
}()
return errCh
}

// stop stops the event monitor. It will close the event channel. Once the
// event monitor is stopped, it can't be started again.
func (em *eventMonitor) stop() {
em.backOff.stop()
em.cancel()
}

// handleEvent handles a containerd event.
func (em *eventMonitor) handleEvent(any interface{}) error {
	ctx := ctrdutil.NamespacedContext()
	switch e := any.(type) {
	// If containerd-shim exits unexpectedly, there will be no corresponding event.
	// However, containerd cannot retrieve the container state in that case, so
	// it's fine to leave that case out for now.
	// TODO(random-liu): [P2] Handle containerd-shim exit.
	case *eventtypes.TaskExit:
		logrus.Infof("TaskExit event %+v", e)
cntr, err := em.containerStore.Get(e.ContainerID)
if err == nil {
if err := handleContainerExit(ctx, e, cntr); err != nil {
return errors.Wrap(err, "failed to handle container TaskExit event")
}
return nil
} else if err != store.ErrNotExist {
return errors.Wrap(err, "can't find container for TaskExit event")
}
// Use GetAll to include sandbox in unknown state.
sb, err := em.sandboxStore.GetAll(e.ContainerID)
if err == nil {
if err := handleSandboxExit(ctx, e, sb); err != nil {
return errors.Wrap(err, "failed to handle sandbox TaskExit event")
}
return nil
} else if err != store.ErrNotExist {
return errors.Wrap(err, "can't find sandbox for TaskExit event")
}
return nil
	case *eventtypes.TaskOOM:
logrus.Infof("TaskOOM event %+v", e)
cntr, err := em.containerStore.Get(e.ContainerID)
if err != nil {
if err != store.ErrNotExist {
return errors.Wrap(err, "can't find container for TaskOOM event")
}
if _, err = em.sandboxStore.Get(e.ContainerID); err != nil {
if err != store.ErrNotExist {
return errors.Wrap(err, "can't find sandbox for TaskOOM event")
}
return nil
}
return nil
}
err = cntr.Status.UpdateSync(func(status containerstore.Status) (containerstore.Status, error) {
status.Reason = oomExitReason
return status, nil
})
if err != nil {
return errors.Wrap(err, "failed to update container status for TaskOOM event")
}
}
return nil
}

// handleContainerExit handles the TaskExit event for a container.
func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr containerstore.Container) error {
if e.Pid != cntr.Status.Get().Pid {
// Non-init process died, ignore the event.
return nil
}
	// Attach container IO so that `Delete` can clean up the stream properly.
task, err := cntr.Container.Task(ctx,
func(*containerdio.FIFOSet) (containerdio.IO, error) {
return cntr.IO, nil
},
)
if err != nil {
if !errdefs.IsNotFound(err) {
			return errors.Wrap(err, "failed to load task for container")
}
} else {
// TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker
if _, err = task.Delete(ctx); err != nil {
if !errdefs.IsNotFound(err) {
return errors.Wrap(err, "failed to stop container")
}
// Move on to make sure container status is updated.
}
}
err = cntr.Status.UpdateSync(func(status containerstore.Status) (containerstore.Status, error) {
// If FinishedAt has been set (e.g. with start failure), keep as
// it is.
if status.FinishedAt != 0 {
return status, nil
}
status.Pid = 0
status.FinishedAt = e.ExitedAt.UnixNano()
status.ExitCode = int32(e.ExitStatus)
return status, nil
})
if err != nil {
return errors.Wrap(err, "failed to update container state")
}
	// Use a channel to propagate the container stop information.
cntr.Stop()
return nil
}

// handleSandboxExit handles the TaskExit event for a sandbox.
func handleSandboxExit(ctx context.Context, e *eventtypes.TaskExit, sb sandboxstore.Sandbox) error {
if e.Pid != sb.Status.Get().Pid {
// Non-init process died, ignore the event.
return nil
}
// No stream attached to sandbox container.
task, err := sb.Container.Task(ctx, nil)
if err != nil {
if !errdefs.IsNotFound(err) {
return errors.Wrap(err, "failed to load task for sandbox")
}
} else {
// TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker
if _, err = task.Delete(ctx); err != nil {
if !errdefs.IsNotFound(err) {
return errors.Wrap(err, "failed to stop sandbox")
}
// Move on to make sure container status is updated.
}
}
err = sb.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) {
		// NOTE(random-liu): We SHOULD NOT change the UNKNOWN state here.
		// If the sandbox state is UNKNOWN when the event monitor receives a TaskExit
		// event, it means that the sandbox start has failed. In that case,
		// `RunPodSandbox` will clean up everything immediately.
		// Once the sandbox state goes out of UNKNOWN, it becomes visible to the
		// user, which is not what we want.
if status.State != sandboxstore.StateUnknown {
status.State = sandboxstore.StateNotReady
}
status.Pid = 0
return status, nil
})
if err != nil {
return errors.Wrap(err, "failed to update sandbox state")
}
	// Use a channel to propagate the sandbox stop information.
sb.Stop()
return nil
}
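
// newBackOff returns a backOff with the default parameters: the backoff
// duration starts at backOffInitDuration, doubles on every retry that fails
// again, and is capped at backOffMaxDuration.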
func newBackOff() *backOff {
return &backOff{
queuePool: map[string]*backOffQueue{},
minDuration: backOffInitDuration,
maxDuration: backOffMaxDuration,
checkDuration: backOffExpireCheckDuration,
clock: clock.RealClock{},
}
}
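
// getExpiredContainers returns the IDs of all containers whose backoff
// queues have expired and whose events should be retried.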
func (b *backOff) getExpiredContainers() []string {
var containers []string
for c, q := range b.queuePool {
if q.isExpire() {
containers = append(containers, c)
}
}
return containers
}

// isInBackOff reports whether events for the container are currently held
// in a backoff queue.
func (b *backOff) isInBackOff(key string) bool {
	_, ok := b.queuePool[key]
	return ok
}

// enBackOff starts a backoff for the container and appends the event to the
// tail of its queue.
func (b *backOff) enBackOff(key string, evt interface{}) {
if queue, ok := b.queuePool[key]; ok {
queue.events = append(queue.events, evt)
return
}
b.queuePool[key] = newBackOffQueue([]interface{}{evt}, b.minDuration, b.clock)
}

// deBackOff removes and returns the container's whole backoff queue.
func (b *backOff) deBackOff(key string) *backOffQueue {
queue := b.queuePool[key]
delete(b.queuePool, key)
return queue
}

// reBackOff starts the backoff again with a doubled duration (capped at
// maxDuration) and re-enqueues the events.
func (b *backOff) reBackOff(key string, events []interface{}, oldDuration time.Duration) {
duration := 2 * oldDuration
if duration > b.maxDuration {
duration = b.maxDuration
}
b.queuePool[key] = newBackOffQueue(events, duration, b.clock)
}
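
// With the default parameters, a container that keeps failing is retried
// after 1s, 2s, 4s, 8s, ... with the delay capped at 5 minutes.

// start begins the periodic expiry check and returns the ticker channel
// that the event monitor selects on.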
func (b *backOff) start() <-chan time.Time {
b.tickerMu.Lock()
defer b.tickerMu.Unlock()
b.ticker = time.NewTicker(b.checkDuration)
return b.ticker.C
}
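
// stop stops the expiry-check ticker. It is safe to call even if start was
// never called.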
func (b *backOff) stop() {
b.tickerMu.Lock()
defer b.tickerMu.Unlock()
if b.ticker != nil {
b.ticker.Stop()
}
}
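
// newBackOffQueue builds a queue holding the given events; it expires after
// the initial duration from now.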
func newBackOffQueue(events []interface{}, init time.Duration, c clock.Clock) *backOffQueue {
return &backOffQueue{
events: events,
duration: init,
expireTime: c.Now().Add(init),
clock: c,
}
}
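
// isExpire reports whether the queue's backoff deadline has passed.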
func (q *backOffQueue) isExpire() bool {
// return time.Now >= expireTime
return !q.clock.Now().Before(q.expireTime)
}