/
instances.go
216 lines (185 loc) · 7.28 KB
/
instances.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
// Copyright 2019 Authors of Cilium
// Copyright 2017 Lyft, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package eni
import (
"context"
"time"
eniTypes "github.com/cilium/cilium/pkg/aws/eni/types"
"github.com/cilium/cilium/pkg/aws/types"
"github.com/cilium/cilium/pkg/ipam"
ipamTypes "github.com/cilium/cilium/pkg/ipam/types"
v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
"github.com/cilium/cilium/pkg/lock"
"github.com/sirupsen/logrus"
)
// EC2API is the API surface used of the EC2 API
type EC2API interface {
GetInstances(ctx context.Context, vpcs ipamTypes.VirtualNetworkMap, subnets ipamTypes.SubnetMap) (*ipamTypes.InstanceMap, error)
GetSubnets(ctx context.Context) (ipamTypes.SubnetMap, error)
GetVpcs(ctx context.Context) (ipamTypes.VirtualNetworkMap, error)
GetSecurityGroups(ctx context.Context) (types.SecurityGroupMap, error)
CreateNetworkInterface(ctx context.Context, toAllocate int32, subnetID, desc string, groups []string) (string, *eniTypes.ENI, error)
AttachNetworkInterface(ctx context.Context, index int32, instanceID, eniID string) (string, error)
DeleteNetworkInterface(ctx context.Context, eniID string) error
ModifyNetworkInterface(ctx context.Context, eniID, attachmentID string, deleteOnTermination bool) error
AssignPrivateIpAddresses(ctx context.Context, eniID string, addresses int32) error
UnassignPrivateIpAddresses(ctx context.Context, eniID string, addresses []string) error
}
// InstancesManager maintains the list of instances. It must be kept up to date
// by calling resync() regularly.
type InstancesManager struct {
mutex lock.RWMutex
instances *ipamTypes.InstanceMap
subnets ipamTypes.SubnetMap
vpcs ipamTypes.VirtualNetworkMap
securityGroups types.SecurityGroupMap
api EC2API
}
// NewInstancesManager returns a new instances manager
func NewInstancesManager(api EC2API) *InstancesManager {
return &InstancesManager{
instances: ipamTypes.NewInstanceMap(),
api: api,
}
}
// CreateNode is called on discovery of a new node and returns the ENI node
// allocation implementation for the new node
func (m *InstancesManager) CreateNode(obj *v2.CiliumNode, n *ipam.Node) ipam.NodeOperations {
return NewNode(n, obj, m)
}
// GetPoolQuota returns the number of available IPs in all IP pools
func (m *InstancesManager) GetPoolQuota() ipamTypes.PoolQuotaMap {
pool := ipamTypes.PoolQuotaMap{}
for subnetID, subnet := range m.GetSubnets(context.TODO()) {
pool[ipamTypes.PoolID(subnetID)] = ipamTypes.PoolQuota{
AvailabilityZone: subnet.AvailabilityZone,
AvailableIPs: subnet.AvailableAddresses,
}
}
return pool
}
// GetSubnet returns the subnet by subnet ID
//
// The returned subnet is immutable so it can be safely accessed
func (m *InstancesManager) GetSubnet(subnetID string) *ipamTypes.Subnet {
m.mutex.RLock()
defer m.mutex.RUnlock()
return m.subnets[subnetID]
}
// GetSubnets returns all the tracked subnets
//
// The returned subnetMap is immutable so it can be safely accessed
func (m *InstancesManager) GetSubnets(ctx context.Context) ipamTypes.SubnetMap {
m.mutex.RLock()
defer m.mutex.RUnlock()
subnetsCopy := make(ipamTypes.SubnetMap)
for k, v := range m.subnets {
subnetsCopy[k] = v
}
return subnetsCopy
}
// FindSubnetByTags returns the subnet with the most addresses matching VPC ID,
// availability zone and all required tags
//
// The returned subnet is immutable so it can be safely accessed
func (m *InstancesManager) FindSubnetByTags(vpcID, availabilityZone string, required ipamTypes.Tags) (bestSubnet *ipamTypes.Subnet) {
m.mutex.RLock()
defer m.mutex.RUnlock()
for _, s := range m.subnets {
if s.VirtualNetworkID == vpcID && s.AvailabilityZone == availabilityZone && s.Tags.Match(required) {
if bestSubnet == nil || bestSubnet.AvailableAddresses < s.AvailableAddresses {
bestSubnet = s
}
}
}
return
}
// FindSecurityGroupByTags returns the security groups matching VPC ID and all required tags
//
// The returned security groups slice is immutable so it can be safely accessed
func (m *InstancesManager) FindSecurityGroupByTags(vpcID string, required ipamTypes.Tags) []*types.SecurityGroup {
m.mutex.RLock()
defer m.mutex.RUnlock()
securityGroups := []*types.SecurityGroup{}
for _, securityGroup := range m.securityGroups {
if securityGroup.VpcID == vpcID && securityGroup.Tags.Match(required) {
securityGroups = append(securityGroups, securityGroup)
}
}
return securityGroups
}
// Resync fetches the list of EC2 instances and subnets and updates the local
// cache in the instanceManager. It returns the time when the resync has
// started or time.Time{} if it did not complete.
func (m *InstancesManager) Resync(ctx context.Context) time.Time {
resyncStart := time.Now()
vpcs, err := m.api.GetVpcs(ctx)
if err != nil {
log.WithError(err).Warning("Unable to synchronize EC2 VPC list")
return time.Time{}
}
subnets, err := m.api.GetSubnets(ctx)
if err != nil {
log.WithError(err).Warning("Unable to retrieve EC2 subnets list")
return time.Time{}
}
securityGroups, err := m.api.GetSecurityGroups(ctx)
if err != nil {
log.WithError(err).Warning("Unable to retrieve EC2 security group list")
return time.Time{}
}
instances, err := m.api.GetInstances(ctx, vpcs, subnets)
if err != nil {
log.WithError(err).Warning("Unable to synchronize EC2 interface list")
return time.Time{}
}
log.WithFields(logrus.Fields{
"numInstances": instances.NumInstances(),
"numVPCs": len(vpcs),
"numSubnets": len(subnets),
"numSecurityGroups": len(securityGroups),
}).Info("Synchronized ENI information")
m.mutex.Lock()
m.instances = instances
m.subnets = subnets
m.vpcs = vpcs
m.securityGroups = securityGroups
m.mutex.Unlock()
return resyncStart
}
// UpdateENI updates the ENI definition of an ENI for a particular instance. If
// the ENI is already known, the definition is updated, otherwise the ENI is
// added to the instance.
func (m *InstancesManager) UpdateENI(instanceID string, eni *eniTypes.ENI) {
m.mutex.Lock()
eniRevision := ipamTypes.InterfaceRevision{Resource: eni}
m.instances.Update(instanceID, eniRevision)
m.mutex.Unlock()
}
// ForeachInstance will iterate over each instance inside `instances`, and call
// `fn`. This function is read-locked for the entire execution.
func (m *InstancesManager) ForeachInstance(instanceID string, fn ipamTypes.InterfaceIterator) {
// This is a safety net in case the InstanceID is not known for some
// reason. If we don't know the instanceID, we also can't derive the
// list of ENIs attached to this instance. Without this,
// ForeachInstance() would return the ENIs of all instances.
if instanceID == "" {
log.Error("BUG: Inconsistent CiliumNode state. The InstanceID is not known")
return
}
m.mutex.RLock()
defer m.mutex.RUnlock()
m.instances.ForeachInterface(instanceID, fn)
}