-
Notifications
You must be signed in to change notification settings - Fork 2.8k
/
cidr.go
238 lines (214 loc) · 8.24 KB
/
cidr.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium
package ipcache
import (
"context"
"net"
"net/netip"
"strings"
"github.com/sirupsen/logrus"
"github.com/cilium/cilium/pkg/identity"
"github.com/cilium/cilium/pkg/ip"
"github.com/cilium/cilium/pkg/labels"
"github.com/cilium/cilium/pkg/labels/cidr"
"github.com/cilium/cilium/pkg/logging/logfields"
"github.com/cilium/cilium/pkg/metrics"
"github.com/cilium/cilium/pkg/option"
"github.com/cilium/cilium/pkg/source"
)
// AllocateCIDRs resolves an identity for every entry of 'prefixes'. The
// operation is all-or-nothing with respect to identity references: if resolving
// any prefix fails, every identity obtained earlier in this call is released
// again through the identity allocator and the error is returned.
//
// Freshly allocated identities are handled in one of two ways:
//   - 'newlyAllocatedIdentities' is nil: they are upserted into the ipcache
//     by this function via UpsertGeneratedIdentities().
//   - 'newlyAllocatedIdentities' is non-nil: they are stored in that map and
//     the caller is responsible for upserting them by calling
//     UpsertGeneratedIdentities(). Note that entries written to the caller's
//     map before a failure are not removed again on the error path.
//
// 'oldNIDs' optionally carries the numeric identities previously used for the
// prefixes, matched by index; pass a nil slice when there are none. Prefixes
// without a corresponding entry are resolved with identity.InvalidIdentity.
// A previously used NID is allocated if still available; non-availability is
// not an error.
//
// The returned slice holds one identity per distinct prefix and is built by
// iterating a map, so its order is unspecified.
//
// Upon success, the caller must also arrange for the resulting identities to
// be released via a subsequent call to ReleaseCIDRIdentitiesByCIDR().
func (ipc *IPCache) AllocateCIDRs(
	prefixes []netip.Prefix, oldNIDs []identity.NumericIdentity, newlyAllocatedIdentities map[netip.Prefix]*identity.Identity,
) ([]*identity.Identity, error) {
	// Without a caller-provided map this function owns the ipcache upsert and
	// gathers the fresh identities in a private map.
	upsert := newlyAllocatedIdentities == nil
	if upsert {
		newlyAllocatedIdentities = make(map[netip.Prefix]*identity.Identity)
	}

	allocateCtx, cancel := context.WithTimeout(context.Background(), option.Config.IPAllocationTimeout)
	defer cancel()

	// Every identity obtained so far, so they can be released if a later
	// prefix fails.
	acquired := make([]*identity.Identity, 0, len(prefixes))
	// Result set keyed by prefix; duplicate prefixes collapse to one entry.
	byPrefix := make(map[netip.Prefix]*identity.Identity, len(prefixes))

	// Lock order: metadata read lock first, then the ipcache lock. They are
	// dropped in the reverse order on every exit path below.
	ipc.metadata.RLock()
	ipc.Lock()
	for idx := range prefixes {
		pfx := prefixes[idx]
		info := ipc.metadata.getLocked(pfx)

		// len() of a nil slice is zero, so this also covers oldNIDs == nil.
		previous := identity.InvalidIdentity
		if idx < len(oldNIDs) {
			previous = oldNIDs[idx]
		}

		id, fresh, err := ipc.resolveIdentity(allocateCtx, pfx, info, previous)
		if err != nil {
			// Roll back with a fresh context: allocateCtx may be the very
			// thing that expired.
			ipc.IdentityAllocator.ReleaseSlice(context.Background(), acquired)
			ipc.Unlock()
			ipc.metadata.RUnlock()
			return nil, err
		}

		acquired = append(acquired, id)
		byPrefix[pfx] = id
		if fresh {
			newlyAllocatedIdentities[pfx] = id
		}
	}
	ipc.Unlock()
	ipc.metadata.RUnlock()

	// The locks are released before upserting. Only identities that were
	// newly allocated are upserted, and only when the caller did not ask to
	// do this itself.
	if upsert {
		ipc.UpsertGeneratedIdentities(newlyAllocatedIdentities, nil)
	}

	result := make([]*identity.Identity, 0, len(byPrefix))
	for _, id := range byPrefix {
		result = append(result, id)
	}
	return result, nil
}
// AllocateCIDRsForIPs is the IP-address counterpart of AllocateCIDRs: the
// given addresses are converted to prefixes with ip.IPsToNetPrefixes() and
// handed to AllocateCIDRs without any previously used numeric identities.
// 'newlyAllocatedIdentities' is passed through unchanged and has the same
// meaning as for AllocateCIDRs.
//
// Upon success, the caller must also arrange for the resulting identities to
// be released via a subsequent call to ReleaseCIDRIdentitiesByID().
func (ipc *IPCache) AllocateCIDRsForIPs(
	prefixes []net.IP, newlyAllocatedIdentities map[netip.Prefix]*identity.Identity,
) ([]*identity.Identity, error) {
	// Despite its name, 'prefixes' holds plain IP addresses at this point.
	converted := ip.IPsToNetPrefixes(prefixes)
	return ipc.AllocateCIDRs(converted, nil, newlyAllocatedIdentities)
}
// cidrLabelToPrefix recovers the prefix encoded in the CIDR label of 'id'.
//
// The string form of id.CIDRLabel must start with labels.LabelSourceCIDR;
// whatever follows the "<source>:" prefix is parsed with netip.ParsePrefix.
// If either step fails, a "BUG:" warning is logged and ok is false; callers
// must not use the returned prefix in that case.
func cidrLabelToPrefix(id *identity.Identity) (prefix netip.Prefix, ok bool) {
	label := id.CIDRLabel.String()

	if !strings.HasPrefix(label, labels.LabelSourceCIDR) {
		log.WithFields(logrus.Fields{
			logfields.Identity: id.ID,
		}).Warning("BUG: Attempting to upsert non-CIDR identity")
		return netip.Prefix{}, false
	}

	// Strip the "<source>:" part; the remainder is expected to be a CIDR.
	cidrText := strings.TrimPrefix(label, labels.LabelSourceCIDR+":")
	parsed, err := netip.ParsePrefix(cidrText)
	if err != nil {
		log.WithFields(logrus.Fields{
			logfields.Identity: id.ID,
			logfields.Labels:   label,
		}).Warning("BUG: Attempting to upsert identity with bad CIDR label")
		return parsed, false
	}

	return parsed, true
}
// UpsertGeneratedIdentities writes CIDR identities into the ipcache with
// source.Generated, in two steps:
//
//  1. Every entry of 'newlyAllocatedIdentities' is upserted unconditionally.
//  2. For each identity in 'usedIdentities' whose CIDR label yields a valid
//     prefix, the ipcache is consulted; identities whose prefix has no entry
//     are upserted as well.
//
// Each upsert performed in step 2 also increments
// metrics.IPCacheErrorsTotal, because an identity that is in use but missing
// from the ipcache may be an indication of another logic error elsewhere in
// the codebase that is causing premature ipcache deletions.
func (ipc *IPCache) UpsertGeneratedIdentities(newlyAllocatedIdentities map[netip.Prefix]*identity.Identity, usedIdentities []*identity.Identity) {
	for pfx, fresh := range newlyAllocatedIdentities {
		entry := Identity{
			ID:     fresh.ID,
			Source: source.Generated,
		}
		ipc.Upsert(pfx.String(), nil, 0, nil, entry)
	}

	if len(usedIdentities) == 0 {
		return
	}

	// Work out, under the read lock, which in-use identities have no ipcache
	// entry. The upserts themselves happen after the lock is released.
	missing := make(map[netip.Prefix]*identity.Identity)
	ipc.mutex.RLock()
	for _, used := range usedIdentities {
		pfx, valid := cidrLabelToPrefix(used)
		if !valid {
			continue
		}
		if _, present := ipc.LookupByIPRLocked(pfx.String()); !present {
			missing[pfx] = used
		}
	}
	ipc.mutex.RUnlock()

	for pfx, used := range missing {
		// Count the recovery before restoring the entry.
		metrics.IPCacheErrorsTotal.WithLabelValues(
			metricTypeRecover, metricErrorUnexpected,
		).Inc()
		entry := Identity{
			ID:     used.ID,
			Source: source.Generated,
		}
		ipc.Upsert(pfx.String(), nil, 0, nil, entry)
	}
}
// releaseCIDRIdentities releases the identity of each prefix in 'prefixes'
// through the identity allocator and deletes the source.Generated ipcache
// entry of every prefix for which the allocator reported the identity as
// released. Prefixes with no known identity are logged and skipped; release
// errors are logged and otherwise ignored.
func (ipc *IPCache) releaseCIDRIdentities(ctx context.Context, prefixes []netip.Prefix) {
	// Identity release and ipcache removal must form one critical section.
	// Without it, the following interleaving is possible:
	//
	// Goroutine 1                 | Goroutine 2
	// releaseCIDRIdentities()     | AllocateCIDRs()
	// -> Release(..., id, ...)    |
	//                             | -> allocate(...)
	//                             | -> ipc.UpsertGeneratedIdentities(...)
	// -> ipc.deleteLocked(...)    |
	//
	// Goroutine 2 expects to end up with an allocated identity that is also
	// present in the ipcache, but would instead be left with an allocated
	// identity whose ipcache entry has just been deleted.
	ipc.Lock()
	defer ipc.Unlock()

	// First pass: release identities and remember which prefixes to delete.
	released := make([]netip.Prefix, 0, len(prefixes))
	for _, pfx := range prefixes {
		id := ipc.IdentityAllocator.LookupIdentity(ctx, cidr.GetCIDRLabels(pfx))
		if id == nil {
			log.Errorf("Unable to find identity of previously used CIDR %s", pfx.String())
			continue
		}

		done, err := ipc.IdentityAllocator.Release(ctx, id, false)
		if err != nil {
			log.WithFields(logrus.Fields{
				logfields.Identity: id,
				logfields.CIDR:     pfx,
			}).WithError(err).Warning("Unable to release CIDR identity. Ignoring error. Identity may be leaked")
		}
		if !done {
			continue
		}
		released = append(released, pfx)
	}

	// Second pass: drop the ipcache entries, still holding the lock.
	for _, pfx := range released {
		ipc.deleteLocked(pfx.String(), source.Generated)
	}
}
// ReleaseCIDRIdentitiesByCIDR releases the identities of the given CIDRs by
// handing 'prefixes' to the ipcache's deferred prefix release queue.
// When the last use of an identity is released, its ipcache entry is deleted.
func (ipc *IPCache) ReleaseCIDRIdentitiesByCIDR(prefixes []netip.Prefix) {
	// Tag passed along with the prefixes when they are enqueued.
	const reason = "cidr-prefix-release"
	ipc.deferredPrefixRelease.enqueue(prefixes, reason)
}
// ReleaseCIDRIdentitiesByID releases the identities named by the given
// numeric identities. Each one is looked up in the identity allocator and
// mapped back to its prefix through its CIDR label; the resulting prefixes
// are handed to the ipcache's deferred prefix release queue.
// When the last use of an identity is released, its ipcache entry is deleted.
//
// Numeric identities that are no longer allocated, or whose identity has no
// usable CIDR label, are logged and skipped.
func (ipc *IPCache) ReleaseCIDRIdentitiesByID(ctx context.Context, identities []identity.NumericIdentity) {
	prefixes := make([]netip.Prefix, 0, len(identities))

	for _, nid := range identities {
		id := ipc.IdentityAllocator.LookupIdentityByID(ctx, nid)
		if id == nil {
			log.WithFields(logrus.Fields{
				logfields.Identity: nid,
			}).Warn("Unexpected release of numeric identity that is no longer allocated")
			continue
		}

		pfx, ok := cidrLabelToPrefix(id)
		if !ok {
			// Without a prefix there is nothing to enqueue for this identity.
			log.WithFields(logrus.Fields{
				logfields.Identity: nid,
				logfields.Labels:   id.Labels,
			}).Warn("Unexpected release of non-CIDR identity, will leak this identity. Please report this issue to the developers.")
			continue
		}

		prefixes = append(prefixes, pfx)
	}

	ipc.deferredPrefixRelease.enqueue(prefixes, "selector-prefix-release")
}