-
Notifications
You must be signed in to change notification settings - Fork 51
/
supervisorproxy.go
184 lines (151 loc) · 5.15 KB
/
supervisorproxy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
// Package supervisorproxy implements the supervisor interface and forwards the requests on this interface
// to a remote supervisor over an rpc call.
package supervisorproxy
import (
"fmt"
"sync"
"github.com/aporeto-inc/trireme/cache"
"github.com/aporeto-inc/trireme/collector"
"github.com/aporeto-inc/trireme/enforcer"
"github.com/aporeto-inc/trireme/enforcer/utils/fqconfig"
"github.com/aporeto-inc/trireme/enforcer/utils/rpcwrapper"
"github.com/aporeto-inc/trireme/policy"
"github.com/aporeto-inc/trireme/processmon"
)
// ProxyInfo is a struct used to store state for the remote launcher.
// It mirrors what is stored by the supervisor and also holds the information
// needed to talk with the remote enforcer.
type ProxyInfo struct {
// versionTracker is created with cache.NewCache() by NewProxySupervisor.
versionTracker cache.DataStore
// collector is the event collector handed to NewProxySupervisor.
collector collector.EventCollector
// filterQueue is taken from the enforcer's GetFilterQueue() at construction.
filterQueue *fqconfig.FilterQueue
// ExcludedIPs holds the list most recently passed to AddExcludedIPs.
ExcludedIPs []string
// prochdl is used by Unsupervise to kill the process of a context.
prochdl processmon.ProcessManager
// rpchdl carries every remote call made to the remote supervisor.
rpchdl rpcwrapper.RPCClient
// initDone is keyed by contextID; an entry is set to true once the remote
// InitSupervisor call for that context has succeeded.
initDone map[string]bool
// The embedded mutex is taken around accesses to initDone.
sync.Mutex
}
// Supervise forwards the policy of the given context to the remote supervisor.
//
// If contextID has no entry in initDone yet, the remote supervisor is first
// initialized through InitRemoteSupervisor and any error from that step is
// returned unchanged. When the remote "Server.Supervise" call fails, the
// context is removed from initDone, so the next Supervise call initializes
// the remote side again, and a descriptive error is returned.
func (s *ProxyInfo) Supervise(contextID string, puInfo *policy.PUInfo) error {
	// Only the map lookup happens under the lock; the remote calls do not.
	s.Lock()
	_, initialized := s.initDone[contextID]
	s.Unlock()

	if !initialized {
		if err := s.InitRemoteSupervisor(contextID, puInfo); err != nil {
			return err
		}
	}

	payload := &rpcwrapper.SuperviseRequestPayload{
		ContextID:        contextID,
		ManagementID:     puInfo.Policy.ManagementID(),
		TriremeAction:    puInfo.Policy.TriremeAction(),
		ApplicationACLs:  puInfo.Policy.ApplicationACLs(),
		NetworkACLs:      puInfo.Policy.NetworkACLs(),
		PolicyIPs:        puInfo.Policy.IPAddresses(),
		Annotations:      puInfo.Policy.Annotations(),
		Identity:         puInfo.Policy.Identity(),
		ReceiverRules:    puInfo.Policy.ReceiverRules(),
		TransmitterRules: puInfo.Policy.TransmitterRules(),
		ExcludedNetworks: puInfo.Policy.ExcludedNetworks(),
		TriremeNetworks:  puInfo.Policy.TriremeNetworks(),
	}
	request := &rpcwrapper.Request{Payload: payload}

	err := s.rpchdl.RemoteCall(contextID, "Server.Supervise", request, &rpcwrapper.Response{})
	if err == nil {
		return nil
	}

	// Forget the context so that a retry goes through initialization again.
	s.Lock()
	delete(s.initDone, contextID)
	s.Unlock()

	return fmt.Errorf("Failed to send supervise command: context=%s error=%s", contextID, err)
}
// Unsupervise stops supervising the given context: it drops the context from
// initDone and asks the process manager to kill the process registered for
// contextID. It always returns nil.
func (s *ProxyInfo) Unsupervise(contextID string) error {
	// Keep the critical section limited to the map update; the process is
	// killed after the lock has been released.
	func() {
		s.Lock()
		defer s.Unlock()
		delete(s.initDone, contextID)
	}()

	s.prochdl.KillProcess(contextID)

	return nil
}
// SetTargetNetworks pushes an updated list of target networks to the remote
// supervisors. It re-sends "Server.InitSupervisor" with the new networks to
// every context whose initDone entry is true. The lock is held for the whole
// iteration, and the first failing remote call stops the update and is
// returned to the caller.
func (s *ProxyInfo) SetTargetNetworks(networks []string) error {
	s.Lock()
	defer s.Unlock()

	for contextID, initialized := range s.initDone {
		if !initialized {
			continue
		}

		payload := &rpcwrapper.InitSupervisorPayload{
			TriremeNetworks: networks,
			CaptureMethod:   rpcwrapper.IPTables,
		}
		err := s.rpchdl.RemoteCall(contextID, "Server.InitSupervisor", &rpcwrapper.Request{Payload: payload}, &rpcwrapper.Response{})
		if err != nil {
			return fmt.Errorf("Failed to initialize remote supervisor: context=%s error=%s", contextID, err)
		}
	}

	return nil
}
// Start does nothing and is implemented for completeness; it always returns nil.
// The actual work is done by the InitRemoteSupervisor method, which triggers
// initialization in the remote enforcer.
func (s *ProxyInfo) Start() error {
return nil
}
// Stop unsupervises every context currently tracked in initDone.
//
// The context IDs are copied while holding the lock, because initDone is
// shared with Supervise, Unsupervise and SetTargetNetworks and must not be
// read unguarded. The lock is released before calling Unsupervise, which
// takes the same lock itself. Every context is attempted even if an earlier
// one fails; the first error encountered, if any, is returned.
func (s *ProxyInfo) Stop() error {
	s.Lock()
	contextIDs := make([]string, 0, len(s.initDone))
	for contextID := range s.initDone {
		contextIDs = append(contextIDs, contextID)
	}
	s.Unlock()

	var firstErr error
	for _, contextID := range contextIDs {
		if err := s.Unsupervise(contextID); err != nil && firstErr == nil {
			firstErr = err
		}
	}

	return firstErr
}
// NewProxySupervisor builds a ProxyInfo that forwards supervisor requests over
// the given RPC client. It returns an error when either the collector or the
// enforcer is nil. The filter queue is read from the enforcer, the process
// manager handle is obtained from processmon, and the set of initialized
// contexts starts out empty.
func NewProxySupervisor(collector collector.EventCollector, enforcer enforcer.PolicyEnforcer, rpchdl rpcwrapper.RPCClient) (*ProxyInfo, error) {
	switch {
	case collector == nil:
		return nil, fmt.Errorf("Collector cannot be nil")
	case enforcer == nil:
		return nil, fmt.Errorf("Enforcer cannot be nil")
	}

	return &ProxyInfo{
		versionTracker: cache.NewCache(),
		collector:      collector,
		filterQueue:    enforcer.GetFilterQueue(),
		prochdl:        processmon.GetProcessManagerHdl(),
		rpchdl:         rpchdl,
		initDone:       make(map[string]bool),
		ExcludedIPs:    []string{},
	}, nil
}
// InitRemoteSupervisor sends "Server.InitSupervisor" to the remote side for the
// given context, using the Trireme networks from the PU policy and the
// IPTables capture method. On success the context is marked as initialized in
// initDone; on failure a descriptive error is returned and initDone is left
// untouched.
func (s *ProxyInfo) InitRemoteSupervisor(contextID string, puInfo *policy.PUInfo) error {
	payload := &rpcwrapper.InitSupervisorPayload{
		TriremeNetworks: puInfo.Policy.TriremeNetworks(),
		CaptureMethod:   rpcwrapper.IPTables,
	}

	err := s.rpchdl.RemoteCall(contextID, "Server.InitSupervisor", &rpcwrapper.Request{Payload: payload}, &rpcwrapper.Response{})
	if err != nil {
		return fmt.Errorf("Failed to initialize remote supervisor: context=%s error=%s", contextID, err)
	}

	// Record the successful initialization under the lock.
	s.Lock()
	defer s.Unlock()
	s.initDone[contextID] = true

	return nil
}
// AddExcludedIPs stores the given list in ExcludedIPs and sends it with
// "Server.AddExcludedIP" to every context reported by the RPC client's
// ContextList. The same request object is reused for all contexts. The first
// failing remote call stops the loop and is returned as an error.
func (s *ProxyInfo) AddExcludedIPs(ips []string) error {
	s.ExcludedIPs = ips

	payload := &rpcwrapper.ExcludeIPRequestPayload{IPs: ips}
	request := &rpcwrapper.Request{Payload: payload}

	contexts := s.rpchdl.ContextList()
	for _, contextID := range contexts {
		err := s.rpchdl.RemoteCall(contextID, "Server.AddExcludedIP", request, &rpcwrapper.Response{})
		if err != nil {
			return fmt.Errorf("Failed to add excluded IP list: context=%s error=%s", contextID, err)
		}
	}

	return nil
}