-
Notifications
You must be signed in to change notification settings - Fork 51
/
processor.go
201 lines (159 loc) · 5.48 KB
/
processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
package linuxmonitor
import (
"errors"
"fmt"
"strconv"
"strings"
"go.uber.org/zap"
"github.com/aporeto-inc/trireme/collector"
"github.com/aporeto-inc/trireme/monitor"
"github.com/aporeto-inc/trireme/monitor/contextstore"
"github.com/aporeto-inc/trireme/monitor/linuxmonitor/cgnetcls"
"github.com/aporeto-inc/trireme/monitor/rpcmonitor"
)
// LinuxProcessor captures all the monitor processor information
// It implements the MonitorProcessor interface of the rpc monitor
type LinuxProcessor struct {
	// collector receives the container record emitted when a process starts.
	collector collector.EventCollector
	// puHandler is told about every processing-unit event and is given the
	// runtime information of a starting processing unit.
	puHandler monitor.ProcessingUnitsHandler
	// metadataExtractor turns an incoming event into runtime information.
	metadataExtractor rpcmonitor.RPCMetadataExtractor
	// netcls is the cgroup controller used to create, mark, populate and
	// delete the cgroup of a processing unit.
	netcls cgnetcls.Cgroupnetcls
	// contextStore keeps the event information of started processing units,
	// keyed by contextID, and drops it again on destroy.
	contextStore contextstore.ContextStore
	// storePath is the path the context store was created with.
	storePath string
}
// NewLinuxProcessor builds a LinuxProcessor around the given collector,
// processing-unit handler and metadata extractor. releasePath is forwarded to
// the cgroup net controller; the context store is always created with the
// fixed path /var/run/trireme.
func NewLinuxProcessor(collector collector.EventCollector, puHandler monitor.ProcessingUnitsHandler, metadataExtractor rpcmonitor.RPCMetadataExtractor, releasePath string) *LinuxProcessor {
	const contextDir = "/var/run/trireme"

	p := new(LinuxProcessor)
	p.collector = collector
	p.puHandler = puHandler
	p.metadataExtractor = metadataExtractor
	// The cgroup controller is created before the context store, as before.
	p.netcls = cgnetcls.NewCgroupNetController(releasePath)
	p.contextStore = contextstore.NewContextStore(contextDir)
	p.storePath = contextDir
	return p
}
// Create handles create events: it derives the contextID from eventInfo and
// forwards an EventCreate for it to the processing-unit handler. A failure to
// derive the contextID is returned wrapped in a descriptive message.
func (s *LinuxProcessor) Create(eventInfo *rpcmonitor.EventInfo) error {
	puID, genErr := generateContextID(eventInfo)
	if genErr == nil {
		return s.puHandler.HandlePUEvent(puID, monitor.EventCreate)
	}
	return fmt.Errorf("Couldn't generate a contextID: %s", genErr)
}
// Start handles start events.
//
// It registers the runtime information extracted from eventInfo with the
// processing-unit handler, signals EventStart, then creates a cgroup named
// after eventInfo.PUID, assigns it the mark found under
// cgnetcls.CgroupMarkTag in the runtime options and adds the process
// identified by eventInfo.PID to it. On success a ContainerStart record is
// sent to the collector and the event is saved in the context store.
//
// Any failure after the cgroup has been created removes the cgroup again
// before the error is returned. This includes a mark or PID that is not a
// valid number: such values are rejected instead of being silently treated
// as zero.
func (s *LinuxProcessor) Start(eventInfo *rpcmonitor.EventInfo) error {
	contextID, err := generateContextID(eventInfo)
	if err != nil {
		return err
	}

	runtimeInfo, err := s.metadataExtractor(eventInfo)
	if err != nil {
		return err
	}

	if err = s.puHandler.SetPURuntime(contextID, runtimeInfo); err != nil {
		return err
	}

	// The second return value is intentionally not checked: the collector
	// record is emitted with whatever address was returned.
	defaultIP, _ := runtimeInfo.DefaultIPAddress()

	if perr := s.puHandler.HandlePUEvent(contextID, monitor.EventStart); perr != nil {
		zap.L().Error("Failed to activate process", zap.Error(perr))
		return perr
	}

	// It is okay to launch this so let us create a cgroup for it.
	if err = s.netcls.Creategroup(eventInfo.PUID); err != nil {
		return err
	}

	// removeCgroup rolls back the cgroup created above. A failure to remove
	// it is only logged so that the caller still sees the original error.
	removeCgroup := func() {
		if derr := s.netcls.DeleteCgroup(eventInfo.PUID); derr != nil {
			zap.L().Warn("Failed to clean cgroup", zap.Error(derr))
		}
	}

	markval, ok := runtimeInfo.Options().Get(cgnetcls.CgroupMarkTag)
	if !ok {
		removeCgroup()
		return errors.New("Mark value not found")
	}

	// A malformed mark must not be turned into mark 0.
	mark, err := strconv.ParseUint(markval, 10, 32)
	if err != nil {
		removeCgroup()
		return fmt.Errorf("invalid mark value %q: %s", markval, err)
	}

	if err = s.netcls.AssignMark(eventInfo.PUID, mark); err != nil {
		removeCgroup()
		return err
	}

	// A malformed PID must not be turned into pid 0 and handed to AddProcess.
	pid, err := strconv.Atoi(eventInfo.PID)
	if err != nil {
		removeCgroup()
		return fmt.Errorf("invalid PID %q: %s", eventInfo.PID, err)
	}

	if err = s.netcls.AddProcess(eventInfo.PUID, pid); err != nil {
		removeCgroup()
		return err
	}

	s.collector.CollectContainerEvent(&collector.ContainerRecord{
		ContextID: contextID,
		IPAddress: defaultIP,
		Tags:      runtimeInfo.Tags(),
		Event:     collector.ContainerStart,
	})

	// Store the state in the context store for future access.
	return s.contextStore.StoreContext(contextID, eventInfo)
}
// Stop handles a stop event. Only contextIDs that start with
// cgnetcls.TriremeBasePath and are not equal to it are acted upon; anything
// else is ignored without error. For an accepted contextID, the part from
// its last "/" onwards (slash included) is reported to the processing-unit
// handler as EventStop.
func (s *LinuxProcessor) Stop(eventInfo *rpcmonitor.EventInfo) error {
	cgroupPath, genErr := generateContextID(eventInfo)
	if genErr != nil {
		return fmt.Errorf("Couldn't generate a contextID: %s", genErr)
	}

	underBase := strings.HasPrefix(cgroupPath, cgnetcls.TriremeBasePath)
	if cgroupPath == cgnetcls.TriremeBasePath || !underBase {
		return nil
	}

	// Keep only the final path element, including its leading "/".
	lastSlash := strings.LastIndex(cgroupPath, "/")
	return s.puHandler.HandlePUEvent(cgroupPath[lastSlash:], monitor.EventStop)
}
// Destroy handles a destroy event. Only contextIDs that start with
// cgnetcls.TriremeBasePath and are not equal to it are acted upon; anything
// else is ignored without error. For an accepted contextID, the part from
// its last "/" onwards (slash included) is used to delete the base path,
// send EventDestroy upstream, delete the cgroup and remove the stored
// context, in that order. Failures of the last three steps are logged as
// warnings and never returned.
func (s *LinuxProcessor) Destroy(eventInfo *rpcmonitor.EventInfo) error {
	cgroupPath, genErr := generateContextID(eventInfo)
	if genErr != nil {
		return fmt.Errorf("Couldn't generate a contextID: %s", genErr)
	}

	switch {
	case cgroupPath == cgnetcls.TriremeBasePath:
		return nil
	case !strings.HasPrefix(cgroupPath, cgnetcls.TriremeBasePath):
		return nil
	}

	// Keep only the final path element, including its leading "/".
	puID := cgroupPath[strings.LastIndex(cgroupPath, "/"):]
	idField := zap.String("contextID", puID)

	s.netcls.Deletebasepath(puID)

	// Send the event upstream.
	if eventErr := s.puHandler.HandlePUEvent(puID, monitor.EventDestroy); eventErr != nil {
		zap.L().Warn("Failed to clean trireme ", idField, zap.Error(eventErr))
	}

	// Let us remove the cgroup files now.
	if cgroupErr := s.netcls.DeleteCgroup(puID); cgroupErr != nil {
		zap.L().Warn("Failed to clean netcls group", idField, zap.Error(cgroupErr))
	}

	if storeErr := s.contextStore.RemoveContext(puID); storeErr != nil {
		zap.L().Warn("Failed to clean cache while destroying process", idField, zap.Error(storeErr))
	}

	return nil
}
// Pause handles a pause event: it derives the contextID from eventInfo and
// forwards an EventPause for it to the processing-unit handler. A failure to
// derive the contextID is returned wrapped in a descriptive message.
func (s *LinuxProcessor) Pause(eventInfo *rpcmonitor.EventInfo) error {
	puID, genErr := generateContextID(eventInfo)
	if genErr == nil {
		return s.puHandler.HandlePUEvent(puID, monitor.EventPause)
	}
	return fmt.Errorf("Couldn't generate a contextID: %s", genErr)
}
// generateContextID creates the contextID from the event information. The
// contextID is the event's PUID, returned unchanged.
//
// An error is returned when eventInfo is nil or when its PUID is empty; a nil
// event is reported as an error rather than causing a nil-pointer panic.
func generateContextID(eventInfo *rpcmonitor.EventInfo) (string, error) {
	if eventInfo == nil {
		return "", fmt.Errorf("eventInfo is nil")
	}

	if eventInfo.PUID == "" {
		return "", fmt.Errorf("PUID is empty from eventInfo")
	}

	return eventInfo.PUID, nil
}