/
instances.go
194 lines (167 loc) · 6.37 KB
/
instances.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium
package eni
import (
"context"
"time"
"github.com/sirupsen/logrus"
eniTypes "github.com/cilium/cilium/pkg/alibabacloud/eni/types"
"github.com/cilium/cilium/pkg/alibabacloud/types"
"github.com/cilium/cilium/pkg/ipam"
ipamTypes "github.com/cilium/cilium/pkg/ipam/types"
v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
"github.com/cilium/cilium/pkg/lock"
)
// AlibabaCloudAPI is the API surface used of the ECS API
type AlibabaCloudAPI interface {
GetInstances(ctx context.Context, vpcs ipamTypes.VirtualNetworkMap, subnets ipamTypes.SubnetMap) (*ipamTypes.InstanceMap, error)
GetVSwitches(ctx context.Context) (ipamTypes.SubnetMap, error)
GetVPC(ctx context.Context, vpcID string) (*ipamTypes.VirtualNetwork, error)
GetVPCs(ctx context.Context) (ipamTypes.VirtualNetworkMap, error)
GetSecurityGroups(ctx context.Context) (types.SecurityGroupMap, error)
CreateNetworkInterface(ctx context.Context, secondaryPrivateIPCount int, vSwitchID string, groups []string, tags map[string]string) (string, *eniTypes.ENI, error)
AttachNetworkInterface(ctx context.Context, instanceID, eniID string) error
WaitENIAttached(ctx context.Context, eniID string) (string, error)
DeleteNetworkInterface(ctx context.Context, eniID string) error
AssignPrivateIPAddresses(ctx context.Context, eniID string, toAllocate int) ([]string, error)
UnassignPrivateIPAddresses(ctx context.Context, eniID string, addresses []string) error
}
// InstancesManager maintains the list of instances. It must be kept up to date
// by calling resync() regularly.
type InstancesManager struct {
mutex lock.RWMutex
instances *ipamTypes.InstanceMap
vSwitches ipamTypes.SubnetMap
vpcs ipamTypes.VirtualNetworkMap
securityGroups types.SecurityGroupMap
api AlibabaCloudAPI
}
// NewInstancesManager returns a new instances manager
func NewInstancesManager(api AlibabaCloudAPI) *InstancesManager {
return &InstancesManager{
instances: ipamTypes.NewInstanceMap(),
api: api,
}
}
// CreateNode
func (m *InstancesManager) CreateNode(obj *v2.CiliumNode, node *ipam.Node) ipam.NodeOperations {
return &Node{k8sObj: obj, manager: m, node: node, instanceID: node.InstanceID()}
}
// GetPoolQuota returns the number of available IPs in all IP pools
func (m *InstancesManager) GetPoolQuota() ipamTypes.PoolQuotaMap {
pool := ipamTypes.PoolQuotaMap{}
for subnetID, subnet := range m.GetVSwitches() {
pool[ipamTypes.PoolID(subnetID)] = ipamTypes.PoolQuota{
AvailabilityZone: subnet.AvailabilityZone,
AvailableIPs: subnet.AvailableAddresses,
}
}
return pool
}
// Resync fetches the list of ECS instances and vSwitches and updates the local
// cache in the instanceManager. It returns the time when the resync has
// started or time.Time{} if it did not complete.
func (m *InstancesManager) Resync(ctx context.Context) time.Time {
resyncStart := time.Now()
vpcs, err := m.api.GetVPCs(ctx)
if err != nil {
log.WithError(err).Warning("Unable to synchronize VPC list")
return time.Time{}
}
vSwitches, err := m.api.GetVSwitches(ctx)
if err != nil {
log.WithError(err).Warning("Unable to retrieve VPC vSwitches list")
return time.Time{}
}
securityGroups, err := m.api.GetSecurityGroups(ctx)
if err != nil {
log.WithError(err).Warning("Unable to retrieve ECS security group list")
return time.Time{}
}
instances, err := m.api.GetInstances(ctx, vpcs, vSwitches)
if err != nil {
log.WithError(err).Warning("Unable to synchronize ECS interface list")
return time.Time{}
}
log.WithFields(logrus.Fields{
"numInstances": instances.NumInstances(),
"numVPCs": len(vpcs),
"numVSwitches": len(vSwitches),
"numSecurityGroups": len(securityGroups),
}).Info("Synchronized ENI information")
m.mutex.Lock()
m.instances = instances
m.vSwitches = vSwitches
m.vpcs = vpcs
m.securityGroups = securityGroups
m.mutex.Unlock()
return resyncStart
}
// GetVSwitches returns all the tracked vSwitches
// The returned subnetMap is immutable so it can be safely accessed
func (m *InstancesManager) GetVSwitches() ipamTypes.SubnetMap {
m.mutex.RLock()
defer m.mutex.RUnlock()
subnetsCopy := make(ipamTypes.SubnetMap)
for k, v := range m.vSwitches {
subnetsCopy[k] = v
}
return subnetsCopy
}
// GetVSwitch return vSwitch by id
func (m *InstancesManager) GetVSwitch(id string) *ipamTypes.Subnet {
m.mutex.RLock()
defer m.mutex.RUnlock()
return m.vSwitches[id]
}
// ForeachInstance will iterate over each instance inside `instances`, and call
// `fn`. This function is read-locked for the entire execution.
func (m *InstancesManager) ForeachInstance(instanceID string, fn ipamTypes.InterfaceIterator) {
m.mutex.RLock()
defer m.mutex.RUnlock()
m.instances.ForeachInterface(instanceID, fn)
}
// UpdateENI updates the ENI definition of an ENI for a particular instance. If
// the ENI is already known, the definition is updated, otherwise the ENI is
// added to the instance.
func (m *InstancesManager) UpdateENI(instanceID string, eni *eniTypes.ENI) {
m.mutex.Lock()
defer m.mutex.Unlock()
eniRevision := ipamTypes.InterfaceRevision{Resource: eni}
m.instances.Update(instanceID, eniRevision)
}
// FindOneVSwitch returns the vSwitch with the fewest available addresses, matching vpc, az and tags
func (m *InstancesManager) FindOneVSwitch(vpc, az string, toAllocate int, required ipamTypes.Tags) *ipamTypes.Subnet {
var bestSubnet *ipamTypes.Subnet
for _, vSwitch := range m.GetVSwitches() {
if vSwitch.VirtualNetworkID != vpc {
continue
}
if vSwitch.AvailabilityZone != az {
continue
}
if vSwitch.AvailableAddresses < toAllocate {
continue
}
if !vSwitch.Tags.Match(required) {
continue
}
if bestSubnet == nil || bestSubnet.AvailableAddresses > vSwitch.AvailableAddresses {
bestSubnet = vSwitch
}
}
return bestSubnet
}
// FindSecurityGroupByTags returns the security groups matching VPC ID and all required tags
// The returned security groups slice is immutable so it can be safely accessed
func (m *InstancesManager) FindSecurityGroupByTags(vpcID string, required ipamTypes.Tags) []*types.SecurityGroup {
m.mutex.RLock()
defer m.mutex.RUnlock()
securityGroups := []*types.SecurityGroup{}
for _, securityGroup := range m.securityGroups {
if securityGroup.VPCID == vpcID && securityGroup.Tags.Match(required) {
securityGroups = append(securityGroups, securityGroup)
}
}
return securityGroups
}