/
subnet.go
254 lines (233 loc) · 9.29 KB
/
subnet.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package subnet
import (
"context"
"fmt"
"net/http"
"sort"
"sync"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/mitchellh/hashstructure/v2"
"github.com/patrickmn/go-cache"
"github.com/samber/lo"
"knative.dev/pkg/logging"
"github.com/aws/karpenter/pkg/apis/v1alpha1"
"github.com/aws/karpenter-core/pkg/cloudprovider"
"github.com/aws/karpenter-core/pkg/utils/functional"
"github.com/aws/karpenter-core/pkg/utils/pretty"
)
type Provider struct {
sync.RWMutex
ec2api ec2iface.EC2API
cache *cache.Cache
cm *pretty.ChangeMonitor
inflightIPs map[string]int64
}
func NewProvider(ec2api ec2iface.EC2API, cache *cache.Cache) *Provider {
return &Provider{
ec2api: ec2api,
cm: pretty.NewChangeMonitor(),
// TODO: Remove cache for v1beta1, utilize resolved subnet from the AWSNodeTemplate.status
// Subnets are sorted on AvailableIpAddressCount, descending order
cache: cache,
// inflightIPs is used to track IPs from known launched instances
inflightIPs: map[string]int64{},
}
}
func (p *Provider) List(ctx context.Context, nodeTemplate *v1alpha1.AWSNodeTemplate) ([]*ec2.Subnet, error) {
p.Lock()
defer p.Unlock()
filters := getFilters(nodeTemplate)
if len(filters) == 0 {
return []*ec2.Subnet{}, nil
}
hash, err := hashstructure.Hash(filters, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true})
if err != nil {
return nil, err
}
if subnets, ok := p.cache.Get(fmt.Sprint(hash)); ok {
return subnets.([]*ec2.Subnet), nil
}
output, err := p.ec2api.DescribeSubnetsWithContext(ctx, &ec2.DescribeSubnetsInput{Filters: filters})
if err != nil {
return nil, fmt.Errorf("describing subnets %s, %w", pretty.Concise(filters), err)
}
p.cache.SetDefault(fmt.Sprint(hash), output.Subnets)
// remove any previously tracked IP addresses since we just refreshed from EC2
for _, subnet := range output.Subnets {
delete(p.inflightIPs, *subnet.SubnetId)
}
if p.cm.HasChanged(fmt.Sprintf("subnets/%s", nodeTemplate.Name), output.Subnets) {
logging.FromContext(ctx).
With("subnets", lo.Map(output.Subnets, func(s *ec2.Subnet, _ int) string {
return fmt.Sprintf("%s (%s)", aws.StringValue(s.SubnetId), aws.StringValue(s.AvailabilityZone))
})).
Debugf("discovered subnets")
}
return output.Subnets, nil
}
// CheckAnyPublicIPAssociations returns a bool indicating whether all referenced subnets assign public IPv4 addresses to EC2 instances created therein
func (p *Provider) CheckAnyPublicIPAssociations(ctx context.Context, nodeTemplate *v1alpha1.AWSNodeTemplate) (bool, error) {
subnets, err := p.List(ctx, nodeTemplate)
if err != nil {
return false, err
}
_, ok := lo.Find(subnets, func(s *ec2.Subnet) bool {
return aws.BoolValue(s.MapPublicIpOnLaunch)
})
return ok, nil
}
// ZonalSubnetsForLaunch returns a mapping of zone to the subnet with the most available IP addresses and deducts the passed ips from the available count
func (p *Provider) ZonalSubnetsForLaunch(ctx context.Context, nodeTemplate *v1alpha1.AWSNodeTemplate, instanceTypes []*cloudprovider.InstanceType, capacityType string) (map[string]*ec2.Subnet, error) {
subnets, err := p.List(ctx, nodeTemplate)
if err != nil {
return nil, err
}
if len(subnets) == 0 {
return nil, fmt.Errorf("no subnets matched selector %v", nodeTemplate.Spec.SubnetSelector)
}
p.Lock()
defer p.Unlock()
// sort subnets in ascending order of available IP addresses and populate map with most available subnet per AZ
zonalSubnets := map[string]*ec2.Subnet{}
sort.Slice(subnets, func(i, j int) bool {
iIPs := aws.Int64Value(subnets[i].AvailableIpAddressCount)
jIPs := aws.Int64Value(subnets[j].AvailableIpAddressCount)
// override ip count from ec2.Subnet if we've tracked launches
if ips, ok := p.inflightIPs[*subnets[i].SubnetId]; ok {
iIPs = ips
}
if ips, ok := p.inflightIPs[*subnets[j].SubnetId]; ok {
jIPs = ips
}
return iIPs < jIPs
})
for _, subnet := range subnets {
zonalSubnets[*subnet.AvailabilityZone] = subnet
}
for _, subnet := range zonalSubnets {
predictedIPsUsed := p.minPods(instanceTypes, *subnet.AvailabilityZone, capacityType)
prevIPs := *subnet.AvailableIpAddressCount
if trackedIPs, ok := p.inflightIPs[*subnet.SubnetId]; ok {
prevIPs = trackedIPs
}
p.inflightIPs[*subnet.SubnetId] = prevIPs - predictedIPsUsed
}
return zonalSubnets, nil
}
// UpdateInflightIPs is used to refresh the in-memory IP usage by adding back unused IPs after a CreateFleet response is returned
func (p *Provider) UpdateInflightIPs(createFleetInput *ec2.CreateFleetInput, createFleetOutput *ec2.CreateFleetOutput, instanceTypes []*cloudprovider.InstanceType,
subnets []*ec2.Subnet, capacityType string) {
p.Lock()
defer p.Unlock()
// Process the CreateFleetInput to pull out all the requested subnetIDs
fleetInputSubnets := lo.Compact(lo.Uniq(lo.FlatMap(createFleetInput.LaunchTemplateConfigs, func(req *ec2.FleetLaunchTemplateConfigRequest, _ int) []string {
return lo.Map(req.Overrides, func(override *ec2.FleetLaunchTemplateOverridesRequest, _ int) string {
if override == nil {
return ""
}
return lo.FromPtr(override.SubnetId)
})
})))
// Process the CreateFleetOutput to pull out all the fulfilled subnetIDs
var fleetOutputSubnets []string
if createFleetOutput != nil {
fleetOutputSubnets = lo.Compact(lo.Uniq(lo.Map(createFleetOutput.Instances, func(fleetInstance *ec2.CreateFleetInstance, _ int) string {
if fleetInstance == nil || fleetInstance.LaunchTemplateAndOverrides == nil || fleetInstance.LaunchTemplateAndOverrides.Overrides == nil {
return ""
}
return lo.FromPtr(fleetInstance.LaunchTemplateAndOverrides.Overrides.SubnetId)
})))
}
// Find the subnets that were included in the input but not chosen by Fleet, so we need to add the inflight IPs back to them
subnetIDsToAddBackIPs, _ := lo.Difference(fleetInputSubnets, fleetOutputSubnets)
// Aggregate all the cached subnets
cachedSubnets := lo.UniqBy(lo.Flatten(lo.MapToSlice(p.cache.Items(), func(_ string, item cache.Item) []*ec2.Subnet {
return item.Object.([]*ec2.Subnet)
})), func(subnet *ec2.Subnet) string { return *subnet.SubnetId })
// Update the inflight IP tracking of subnets stored in the cache that have not be synchronized since the initial
// deduction of IP addresses before the instance launch
for _, cachedSubnet := range cachedSubnets {
if !lo.Contains(subnetIDsToAddBackIPs, *cachedSubnet.SubnetId) {
continue
}
originalSubnet, ok := lo.Find(subnets, func(subnet *ec2.Subnet) bool {
return *subnet.SubnetId == *cachedSubnet.SubnetId
})
if !ok {
continue
}
// If the cached subnet IP address count hasn't changed from the original subnet used to
// launch the instance, then we need to update the tracked IPs
if *originalSubnet.AvailableIpAddressCount == *cachedSubnet.AvailableIpAddressCount {
// other IPs deducted were opportunistic and need to be readded since Fleet didn't pick those subnets to launch into
if ips, ok := p.inflightIPs[*originalSubnet.SubnetId]; ok {
minPods := p.minPods(instanceTypes, *originalSubnet.AvailabilityZone, capacityType)
p.inflightIPs[*originalSubnet.SubnetId] = ips + minPods
}
}
}
}
func (p *Provider) LivenessProbe(_ *http.Request) error {
p.Lock()
//nolint: staticcheck
p.Unlock()
return nil
}
func (p *Provider) minPods(instanceTypes []*cloudprovider.InstanceType, zone string, capacityType string) int64 {
// filter for instance types available in the zone and capacity type being requested
filteredInstanceTypes := lo.Filter(instanceTypes, func(it *cloudprovider.InstanceType, _ int) bool {
offering, ok := it.Offerings.Get(capacityType, zone)
if !ok {
return false
}
return offering.Available
})
if len(filteredInstanceTypes) == 0 {
return 0
}
// Get minimum pods to use when selecting a subnet and deducting what will be launched
pods, _ := lo.MinBy(filteredInstanceTypes, func(i *cloudprovider.InstanceType, j *cloudprovider.InstanceType) bool {
return i.Capacity.Pods().Cmp(*j.Capacity.Pods()) < 0
}).Capacity.Pods().AsInt64()
return pods
}
func getFilters(nodeTemplate *v1alpha1.AWSNodeTemplate) []*ec2.Filter {
var filters []*ec2.Filter
// Filter by subnet
for key, value := range nodeTemplate.Spec.SubnetSelector {
switch key {
case "aws-ids", "aws::ids":
filters = append(filters, &ec2.Filter{
Name: aws.String("subnet-id"),
Values: aws.StringSlice(functional.SplitCommaSeparatedString(value)),
})
default:
switch value {
case "*":
filters = append(filters, &ec2.Filter{
Name: aws.String("tag-key"),
Values: []*string{aws.String(key)},
})
default:
filters = append(filters, &ec2.Filter{
Name: aws.String(fmt.Sprintf("tag:%s", key)),
Values: aws.StringSlice(functional.SplitCommaSeparatedString(value)),
})
}
}
}
return filters
}