-
Notifications
You must be signed in to change notification settings - Fork 228
/
policymanager.go
228 lines (190 loc) · 7.57 KB
/
policymanager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
package policies
import (
"fmt"
"sync"
"github.com/Azure/azure-container-networking/common"
"github.com/Azure/azure-container-networking/npm/metrics"
"github.com/Azure/azure-container-networking/npm/util"
npmerrors "github.com/Azure/azure-container-networking/npm/util/errors"
"k8s.io/klog"
)
// PolicyManagerMode is used on Windows to decide whether SetPolicies
// should be used or not.
type PolicyManagerMode string

// Supported PolicyManagerMode values.
const (
	// IPSetPolicyMode makes policies reference IPSets.
	IPSetPolicyMode PolicyManagerMode = "IPSet"
	// IPPolicyMode replaces ipset names in policies with the IPs they hold.
	// NOTE: this is currently unimplemented.
	IPPolicyMode PolicyManagerMode = "IP"
)

// numLinuxBaseACLRules is the number of rules unrelated to policies.
// The value is based on the implementation in chain-management_linux.go.
// It is technically 3 off when there are no policies, since the AZURE-NPM
// chain is flushed in that case.
const numLinuxBaseACLRules = 11
// PolicyManagerCfg holds the configuration of a PolicyManager.
// It is embedded by pointer in PolicyManager, so its fields are reachable
// directly on the manager.
type PolicyManagerCfg struct {
// PolicyMode only affects Windows
PolicyMode PolicyManagerMode
// PlaceAzureChainFirst only affects Linux
PlaceAzureChainFirst bool
}
// PolicyMap is the policy cache of a PolicyManager.
// The embedded RWMutex is the lock the exported PolicyManager methods take
// before reading from or writing to cache.
type PolicyMap struct {
sync.RWMutex
// cache maps a policy key (NPMNetworkPolicy.PolicyKey when written by AddPolicy) to its policy
cache map[string]*NPMNetworkPolicy
}
// reconcileManager bundles a mutex with a signalling channel.
// Per the PolicyManager comment, its lock is used on Linux to avoid iptables
// contention between adding/removing policies and background cleanup of stale chains.
type reconcileManager struct {
sync.Mutex
// releaseLockSignal is created with a buffer of 1 in NewPolicyManager.
// NOTE(review): presumably used to ask the reconcile loop to give up the lock;
// its senders/receivers are outside this file - confirm there.
releaseLockSignal chan struct{}
}
// PolicyManager caches network policies and applies/removes them on the dataplane.
//
// PolicyManager has two locks.
// The PolicyMap lock is used only in Windows to prevent concurrent write access to the PolicyMap
// from both the NetPol Controller thread and the PodController thread, accessed respectively from
// dataplane.AddPolicy()/dataplane.RemovePolicy(), and dataplane.ApplyDataplane() --> dataplane.updatePod().
// In Linux, the reconcileManager's lock is used to avoid iptables contention for adding/removing policies versus
// background cleanup of stale, ineffective chains.
type PolicyManager struct {
// policyMap is the lock-guarded cache of policies, keyed by policy key
policyMap *PolicyMap
// ioShim is stored exactly as passed to NewPolicyManager; it is not used by the code visible in this file
ioShim *common.IOShim
// staleChains is initialised via newStaleChains() in NewPolicyManager
// NOTE(review): presumably tracks the stale chains mentioned above - confirm against its definition
staleChains *staleChains
// reconcileManager holds the second lock described above
reconcileManager *reconcileManager
// the embedded config exposes PolicyMode and PlaceAzureChainFirst directly on the manager
*PolicyManagerCfg
}
// NewPolicyManager returns a PolicyManager with an empty policy cache, a fresh
// set of stale chains and a reconcile manager whose release signal channel has
// a buffer of one. ioShim and cfg are stored as given (cfg is embedded, not copied).
func NewPolicyManager(ioShim *common.IOShim, cfg *PolicyManagerCfg) *PolicyManager {
	policyCache := &PolicyMap{cache: make(map[string]*NPMNetworkPolicy)}
	stale := newStaleChains()
	reconciler := &reconcileManager{releaseLockSignal: make(chan struct{}, 1)}

	pMgr := &PolicyManager{
		PolicyManagerCfg: cfg,
		ioShim:           ioShim,
		policyMap:        policyCache,
		staleChains:      stale,
		reconcileManager: reconciler,
	}
	return pMgr
}
// Bootup resets the ACL rule metric and then runs bootup with the given endpoint IDs.
// If bootup fails, the failure is reported through metrics.SendErrorLogAndMetric and
// returned wrapped as a BootupPolicyMgr error.
// On success with a non-Windows dataplane, the ACL rule metric is set up with the
// number of base rules that are unrelated to policies.
func (pMgr *PolicyManager) Bootup(epIDs []string) error {
	metrics.ResetNumACLRules()

	err := pMgr.bootup(epIDs)
	if err != nil {
		// NOTE: in Linux, Prometheus metrics may be off at this point since some ACL rules may have been applied successfully
		metrics.SendErrorLogAndMetric(util.IptmID, "error: failed to bootup policy manager: %s", err.Error())
		return npmerrors.ErrorWrapper(npmerrors.BootupPolicyMgr, false, "failed to bootup policy manager", err)
	}

	if util.IsWindowsDP() {
		// no base rules are counted for a Windows dataplane
		return nil
	}

	// bootup succeeded: account for the base rules in the Prometheus metric
	metrics.IncNumACLRulesBy(numLinuxBaseACLRules)
	return nil
}
// Reconcile is the exported entry point for pMgr.reconcile, which is defined outside this file.
// NOTE(review): per the PolicyManager comment this is presumably the background cleanup of
// stale chains on Linux - confirm against the reconcile implementations.
func (pMgr *PolicyManager) Reconcile() {
pMgr.reconcile()
}
// PolicyExists reports whether a policy is cached under policyKey.
// The lookup is done under the policy map's read lock (taken by GetPolicy).
func (pMgr *PolicyManager) PolicyExists(policyKey string) bool {
	_, found := pMgr.GetPolicy(policyKey)
	return found
}
// GetPolicy looks up policyKey in the policy cache while holding the read lock.
// It returns the cached policy and true when the key is present; otherwise it
// returns nil and false. The returned pointer is the cached object, not a copy.
func (pMgr *PolicyManager) GetPolicy(policyKey string) (*NPMNetworkPolicy, bool) {
	policies := pMgr.policyMap
	policies.RLock()
	defer policies.RUnlock()

	cached, found := policies.cache[policyKey]
	return cached, found
}
// AddPolicy normalizes and validates the policy, applies it through addPolicy while
// holding the policy map's write lock, and stores it in the cache on success.
//
// A policy without ACLs is only logged: nothing is applied or cached and nil is returned.
// endpointList is handed to addPolicy unchanged; on a Windows dataplane its length
// also scales the ACL rule metric.
// Validation and dataplane failures are reported through metrics.SendErrorLogAndMetric
// and returned as AddPolicy errors; the cache is left untouched in both cases.
func (pMgr *PolicyManager) AddPolicy(policy *NPMNetworkPolicy, endpointList map[string]string) error {
	if len(policy.ACLs) == 0 {
		klog.Infof("[DataPlane] No ACLs in policy %s to apply", policy.PolicyKey)
		return nil
	}

	NormalizePolicy(policy)
	validationErr := ValidatePolicy(policy)
	if validationErr != nil {
		errMsg := fmt.Sprintf("failed to validate policy: %s", validationErr.Error())
		metrics.SendErrorLogAndMetric(util.IptmID, "error: %s", errMsg)
		return npmerrors.Errorf(npmerrors.AddPolicy, false, errMsg)
	}

	pMgr.policyMap.Lock()
	defer pMgr.policyMap.Unlock()

	// time the actual dataplane call; the duration is recorded even when the call fails
	execTimer := metrics.StartNewTimer()
	applyErr := pMgr.addPolicy(policy, endpointList)
	metrics.RecordACLRuleExecTime(execTimer)
	if applyErr != nil {
		// NOTE: in Linux, Prometheus metrics may be off at this point since some ACL rules may have been applied successfully
		// In Windows, Prometheus metrics may be off at this point since we don't know how many endpoints had rules applied successfully.
		errMsg := fmt.Sprintf("failed to add policy: %s", applyErr.Error())
		metrics.SendErrorLogAndMetric(util.IptmID, "error: %s", errMsg)
		return npmerrors.Errorf(npmerrors.AddPolicy, false, errMsg)
	}

	// success: count the rules once, or once per endpoint on a Windows dataplane
	endpointCount := 1
	if util.IsWindowsDP() {
		endpointCount = len(endpointList)
	}
	metrics.IncNumACLRulesBy(policy.numACLRulesProducedInKernel() * endpointCount)

	pMgr.policyMap.cache[policy.PolicyKey] = policy
	return nil
}
// isFirstPolicy reports whether the policy cache is currently empty.
// It reads the cache without taking the policy map lock itself.
func (pMgr *PolicyManager) isFirstPolicy() bool {
	numCached := len(pMgr.policyMap.cache)
	return numCached == 0
}
// RemovePolicy removes the policy cached under policyKey from the dataplane
// (through removePolicy with a nil endpoint list) and then from the cache.
//
// It returns nil without doing anything when the key is not cached, or when the
// cached policy has no ACLs (the cache entry is left in place in that case).
// A failure from removePolicy is reported through metrics.SendErrorLogAndMetric
// and returned as a RemovePolicy error; the policy then stays in the cache.
//
// The policy map's write lock is taken before the lookup and held until return,
// so the lookup, the dataplane call, the metric update and the cache delete are
// one critical section.
func (pMgr *PolicyManager) RemovePolicy(policyKey string) error {
	// Lock before looking the policy up, and read the cache directly rather than via
	// GetPolicy (which takes the read lock and must not be called while the write lock is held).
	// Looking up first and locking afterwards would let two concurrent removals of the same
	// key both pass the existence check, run removePolicy twice and decrement the metric twice.
	pMgr.policyMap.Lock()
	defer pMgr.policyMap.Unlock()

	policy, ok := pMgr.policyMap.cache[policyKey]
	if !ok {
		return nil
	}

	if len(policy.ACLs) == 0 {
		klog.Infof("[DataPlane] No ACLs in policy %s to remove", policyKey)
		return nil
	}

	// used for Prometheus metrics later
	numEndpointsBefore := len(policy.PodEndpoints)

	// Call actual dataplane function to apply changes
	err := pMgr.removePolicy(policy, nil)
	// currently we only have acl rule exec time for "adding" rules, so we skip recording here
	if err != nil {
		// NOTE: in Linux, Prometheus metrics may be off at this point since some ACL rules may have been removed successfully.
		// In Windows, Prometheus metrics may be off at this point since we don't know how many endpoints had rules removed successfully.
		msg := fmt.Sprintf("failed to remove policy: %s", err.Error())
		metrics.SendErrorLogAndMetric(util.IptmID, "error: %s", msg)
		return npmerrors.Errorf(npmerrors.RemovePolicy, false, msg)
	}

	// update Prometheus metrics on success:
	// the rules are counted once, except on a Windows dataplane where they are counted
	// once per endpoint that dropped out of policy.PodEndpoints during removePolicy
	numEndpointsRemoved := 1
	if util.IsWindowsDP() {
		numEndpointsRemoved = numEndpointsBefore - len(policy.PodEndpoints)
	}
	metrics.DecNumACLRulesBy(policy.numACLRulesProducedInKernel() * numEndpointsRemoved)

	// remove policy from cache
	delete(pMgr.policyMap.cache, policyKey)
	return nil
}
// RemovePolicyForEndpoints removes the policy cached under policyKey from the given
// endpoints only (through removePolicy with endpointList) and keeps the policy in the cache.
// This function is intended for Windows only.
//
// It returns nil without doing anything when the key is not cached or the policy has no ACLs.
// A failure from removePolicy is reported through metrics.SendErrorLogAndMetric and
// returned as a RemovePolicy error.
//
// NOTE(review): unlike RemovePolicy, removePolicy is called here without holding the policy
// map's write lock; only the initial lookup is done under the read lock. Confirm that callers
// serialize access.
func (pMgr *PolicyManager) RemovePolicyForEndpoints(policyKey string, endpointList map[string]string) error {
	policy, found := pMgr.GetPolicy(policyKey)
	if !found {
		return nil
	}

	if len(policy.ACLs) == 0 {
		klog.Infof("[DataPlane] No ACLs in policy %s to remove for endpoints", policyKey)
		return nil
	}

	// acl rule exec time is currently only recorded when adding rules, so nothing is timed here
	if err := pMgr.removePolicy(policy, endpointList); err != nil {
		// NOTE: Prometheus metrics may be off at this point since we don't know how many endpoints had rules applied successfully.
		errMsg := fmt.Sprintf("failed to remove policy. endpoints: [%+v]. err: [%s]", endpointList, err.Error())
		metrics.SendErrorLogAndMetric(util.IptmID, "error: %s", errMsg)
		return npmerrors.Errorf(npmerrors.RemovePolicy, false, errMsg)
	}

	// success: the policy's rules are gone from every endpoint in the list
	numRulesRemoved := policy.numACLRulesProducedInKernel() * len(endpointList)
	metrics.DecNumACLRulesBy(numRulesRemoved)
	return nil
}
// isLastPolicy reports whether the cache holds exactly as many policies as are
// about to be deleted, i.e. exactly one.
// It reads the cache without taking the policy map lock itself.
func (pMgr *PolicyManager) isLastPolicy() bool {
	// only one policy is deleted at a time today; if that ever changes, this
	// count can become an argument instead
	const numPoliciesToDelete = 1
	return numPoliciesToDelete == len(pMgr.policyMap.cache)
}