forked from containerd/containerd
-
Notifications
You must be signed in to change notification settings - Fork 0
/
monitor_linux.go
147 lines (136 loc) · 3.4 KB
/
monitor_linux.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package supervisor
import (
"sync"
"syscall"
"github.com/Sirupsen/logrus"
"github.com/docker/containerd/archutils"
"github.com/docker/containerd/runtime"
)
// NewMonitor creates an epoll instance, wraps it in a Monitor and launches
// the goroutine that waits for events on it. If the epoll instance cannot be
// created the error is returned and no goroutine is started.
func NewMonitor() (*Monitor, error) {
	epfd, err := archutils.EpollCreate1(0)
	if err != nil {
		return nil, err
	}

	mon := &Monitor{
		receivers: make(map[int]interface{}),
		exits:     make(chan runtime.Process, 1024),
		ooms:      make(chan string, 1024),
		epollFd:   epfd,
	}

	// The event loop runs for as long as EpollWait keeps succeeding.
	go mon.start()

	return mon, nil
}
// Monitor represents a runtime.Process monitor. It registers file descriptors
// with a single epoll instance and reports what happens on them through the
// exits and ooms channels.
type Monitor struct {
// m is held by Monitor, MonitorOOM and processEvent while they touch
// receivers and change the epoll registration.
m sync.Mutex
// receivers maps a registered file descriptor to the value stored for it:
// the runtime.Process passed to Monitor, or the value returned by
// Container.OOM() in MonitorOOM.
receivers map[int]interface{}
// exits receives a process once its exit fd has reported EPOLLHUP.
exits chan runtime.Process
// ooms receives the container ID when a registered OOM fd fires and the
// notifier does not report itself as removed.
ooms chan string
// epollFd is the epoll instance created in NewMonitor.
epollFd int
}
// Exits returns the channel used to notify of a process exit.
// The same channel is returned on every call; NewMonitor creates it with a
// buffer of 1024 entries and processEvent sends to it.
func (m *Monitor) Exits() chan runtime.Process {
return m.exits
}
// OOMs returns the channel used to notify of a container exit due to OOM.
// Each value sent is the ContainerID() of the OOM notifier whose fd fired.
// The same channel is returned on every call; NewMonitor creates it with a
// buffer of 1024 entries.
func (m *Monitor) OOMs() chan string {
return m.ooms
}
// Monitor registers the exit file descriptor of p with the epoll instance so
// that a hang-up on it is later reported on the Exits channel. The process is
// only recorded as a receiver once the registration has succeeded; on failure
// the EpollCtl error is returned unchanged.
func (m *Monitor) Monitor(p runtime.Process) error {
	m.m.Lock()
	defer m.m.Unlock()

	exitFD := p.ExitFD()
	// Only EPOLLHUP is requested for process exit descriptors.
	ev := &syscall.EpollEvent{
		Events: syscall.EPOLLHUP,
		Fd:     int32(exitFD),
	}

	err := archutils.EpollCtl(m.epollFd, syscall.EPOLL_CTL_ADD, exitFD, ev)
	if err != nil {
		return err
	}

	EpollFdCounter.Inc(1)
	m.receivers[exitFD] = p
	return nil
}
// MonitorOOM adds a container to the list of the ones monitored for OOM.
//
// It obtains the container's OOM notifier via c.OOM(), registers the
// notifier's file descriptor with the epoll instance for EPOLLHUP and EPOLLIN,
// and records the notifier as the receiver for that descriptor.
//
// An error from c.OOM() or from the epoll registration is returned unchanged.
// When the registration fails the notifier is closed before returning, since
// it was never stored and nothing else would release it.
func (m *Monitor) MonitorOOM(c runtime.Container) error {
	m.m.Lock()
	defer m.m.Unlock()

	o, err := c.OOM()
	if err != nil {
		return err
	}

	fd := o.FD()
	event := syscall.EpollEvent{
		Fd:     int32(fd),
		Events: syscall.EPOLLHUP | syscall.EPOLLIN,
	}
	if err := archutils.EpollCtl(m.epollFd, syscall.EPOLL_CTL_ADD, fd, &event); err != nil {
		// The notifier is not in m.receivers, so processEvent will never see
		// it; close it here to avoid leaking it. The registration error is
		// the one worth reporting, so the result of Close is not inspected.
		o.Close()
		return err
	}

	EpollFdCounter.Inc(1)
	m.receivers[fd] = o
	return nil
}
// Close cleans up resources allocated by NewMonitor().
// It closes the epoll file descriptor and returns the result of that close.
// The mutex is not taken and the receivers map and channels are left as they
// are.
// NOTE(review): the goroutine running start is not signalled here; its next
// EpollWait on the closed descriptor presumably fails with a non-EINTR error,
// which start logs with Fatal — confirm this is the intended shutdown path.
func (m *Monitor) Close() error {
return syscall.Close(m.epollFd)
}
// processEvent handles a single epoll event reported for fd.
//
// The receiver registered for fd decides what happens:
//
//   - runtime.Process: when the event mask contains EPOLLHUP the process is
//     dropped from the receivers, its fd is removed from the epoll set, the
//     process is closed and it is then sent on the exits channel.
//   - runtime.OOM: the notifier is always flushed. If it reports Removed() it
//     is dropped and closed; otherwise its container ID is sent on the ooms
//     channel.
//
// An fd with no registered receiver is ignored. Channel sends happen only
// after the mutex has been released, so a full channel cannot block callers
// of Monitor or MonitorOOM.
func (m *Monitor) processEvent(fd int, event uint32) {
	// notify, when set, performs the channel send once the lock is released.
	var notify func()

	m.m.Lock()
	switch t := m.receivers[fd].(type) {
	case runtime.Process:
		// event is a bitmask: the kernel may report EPOLLERR together with
		// EPOLLHUP, so test the bit instead of comparing the whole value.
		// Missing the hang-up would leave the fd registered and the exit
		// unreported.
		if event&syscall.EPOLLHUP != 0 {
			delete(m.receivers, fd)
			// Use the archutils wrapper like every other epoll call in this
			// file rather than calling the syscall package directly.
			if err := archutils.EpollCtl(m.epollFd, syscall.EPOLL_CTL_DEL, fd, &syscall.EpollEvent{
				Events: syscall.EPOLLHUP,
				Fd:     int32(fd),
			}); err != nil {
				logrus.WithField("error", err).Error("containerd: epoll remove fd")
			}
			if err := t.Close(); err != nil {
				logrus.WithField("error", err).Error("containerd: close process IO")
			}
			EpollFdCounter.Dec(1)
			notify = func() {
				m.exits <- t
			}
		}
	case runtime.OOM:
		// always flush the event fd
		t.Flush()
		if t.Removed() {
			delete(m.receivers, fd)
			// epoll will remove the fd from its set after it has been closed
			t.Close()
			EpollFdCounter.Dec(1)
		} else {
			notify = func() {
				m.ooms <- t.ContainerID()
			}
		}
	}
	// The lock must be released before sending: holding it while blocked on
	// a full channel would deadlock anyone trying to register a new fd.
	m.m.Unlock()

	if notify != nil {
		notify()
	}
}
// start is the monitor's event loop. It blocks in EpollWait with no timeout,
// retries when the wait is interrupted (EINTR), logs any other error with
// Fatal, and hands each returned event to processEvent in order. Up to 128
// events are collected per wait.
func (m *Monitor) start() {
	var ready [128]syscall.EpollEvent

	for {
		count, err := archutils.EpollWait(m.epollFd, ready[:], -1)
		switch {
		case err == nil:
			// fall out of the switch and dispatch the events
		case err == syscall.EINTR:
			continue
		default:
			logrus.WithField("error", err).Fatal("containerd: epoll wait")
		}

		// dispatch every event returned by this wait
		for idx := 0; idx < count; idx++ {
			ev := ready[idx]
			m.processEvent(int(ev.Fd), ev.Events)
		}
	}
}