forked from kubernetes-sigs/aws-iam-authenticator
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ec2provider.go
285 lines (258 loc) · 9.82 KB
/
ec2provider.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
package ec2provider
import (
"errors"
"fmt"
"sync"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
"github.com/aws/aws-sdk-go/aws/ec2metadata"
"github.com/aws/aws-sdk-go/aws/endpoints"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/aws/aws-sdk-go/service/sts"
"github.com/sirupsen/logrus"
"sigs.k8s.io/aws-iam-authenticator/pkg"
"sigs.k8s.io/aws-iam-authenticator/pkg/httputil"
"sigs.k8s.io/aws-iam-authenticator/pkg/metrics"
)
const (
	// maxChannelSize is the buffer size of the channel that feeds the batch
	// processor; it is sized for the maximum number of Kubernetes nodes supported.
	maxChannelSize = 8000

	// maxAllowedInflightRequest is the number of in-flight instance lookups above
	// which new lookups are handed to the batch processor instead of issuing
	// their own single-instance ec2:DescribeInstances call.
	maxAllowedInflightRequest = 5

	// defaultWaitInterval is how long a caller sleeps between cache checks while
	// another request for the same instance ID is already in flight.
	defaultWaitInterval = 50 * time.Millisecond

	// totalIterationForWaitInterval caps those cache checks, so a single caller
	// waits at most 100 * 50ms = 5 seconds.
	totalIterationForWaitInterval = 100

	// maxInstancesBatchSize is the intended maximum number of instance IDs sent
	// in one batched ec2:DescribeInstances call.
	maxInstancesBatchSize = 100

	// maxWaitIntervalForBatch is the longest time, in milliseconds, a partially
	// filled batch waits before it is sent anyway. A full batch is sent without
	// waiting for this interval. Kept untyped so it compares directly against
	// the int64 returned by time.Duration.Milliseconds.
	maxWaitIntervalForBatch = 200
)
// EC2Provider resolves EC2 instance IDs to the instances' private DNS names,
// which serve as node names.
type EC2Provider interface {
	// GetPrivateDNSName returns the private DNS name for instanceID.
	GetPrivateDNSName(instanceID string) (string, error)

	// StartEc2DescribeBatchProcessing processes instance IDs queued for
	// batched ec2:DescribeInstances lookups.
	StartEc2DescribeBatchProcessing()
}
// ec2PrivateDNSCache maps EC2 instance IDs to their private DNS names.
// lock guards every access to cache.
type ec2PrivateDNSCache struct {
	lock  sync.RWMutex
	cache map[string]string
}
// ec2Requests is the set of instance IDs that currently have a
// DescribeInstances lookup in flight. lock guards every access to set.
type ec2Requests struct {
	lock sync.RWMutex
	set  map[string]bool
}
// ec2ProviderImpl is the EC2Provider implementation backed by the EC2 API.
type ec2ProviderImpl struct {
	// ec2 is the client used for ec2:DescribeInstances calls.
	ec2 ec2iface.EC2API

	// instanceIdsChannel queues instance IDs for the batch processor.
	instanceIdsChannel chan string

	// privateDNSCache holds names already resolved from EC2.
	privateDNSCache ec2PrivateDNSCache

	// ec2Requests tracks instance IDs whose lookup is currently in flight.
	ec2Requests ec2Requests
}
// New returns an EC2Provider whose EC2 client uses a session built by
// newSession(roleARN, qps, burst), with an empty DNS cache, an empty
// in-flight set, and a batch channel buffered to maxChannelSize.
func New(roleARN string, qps int, burst int) EC2Provider {
	provider := &ec2ProviderImpl{
		privateDNSCache:    ec2PrivateDNSCache{cache: make(map[string]string)},
		ec2Requests:        ec2Requests{set: make(map[string]bool)},
		instanceIdsChannel: make(chan string, maxChannelSize),
	}
	provider.ec2 = ec2.New(newSession(roleARN, qps, burst))
	return provider
}
// newSession builds the AWS session used for EC2 API calls.
//
// Initial credentials are loaded from the SDK's default credential chain, such
// as the environment, shared credentials (~/.aws/credentials), or the EC2
// instance role. Every request carries an "aws-iam-authenticator/<version>"
// user-agent entry. If no region is configured, it is read from EC2 instance
// metadata, and the process exits (logrus Fatal) if that also fails.
//
// When roleARN is non-empty, credentials come from assuming that role through
// the regional STS endpoint. The STS client is rate limited to qps/burst when
// the rate-limited HTTP client can be created.
func newSession(roleARN string, qps int, burst int) *session.Session {
	sess := session.Must(session.NewSession())
	sess.Handlers.Build.PushFrontNamed(request.NamedHandler{
		Name: "authenticatorUserAgent",
		Fn: request.MakeAddToUserAgentHandler(
			"aws-iam-authenticator", pkg.Version),
	})

	if aws.StringValue(sess.Config.Region) == "" {
		// Named metadataClient so the imported ec2metadata package is not shadowed.
		metadataClient := ec2metadata.New(sess)
		regionFound, err := metadataClient.Region()
		if err != nil {
			logrus.WithError(err).Fatal("Region not found in shared credentials, environment variable, or instance metadata.")
		}
		sess.Config.Region = aws.String(regionFound)
	}

	if roleARN != "" {
		logrus.WithFields(logrus.Fields{
			"roleARN": roleARN,
		}).Info("Using assumed role for EC2 API")

		stsConfig := aws.NewConfig().WithSTSRegionalEndpoint(endpoints.RegionalSTSEndpoint)
		rateLimitedClient, err := httputil.NewRateLimitedClient(qps, burst)
		if err != nil {
			// Best effort: do not attach the (meaningless on error) client, so the
			// STS client keeps the session's HTTP client without client-side rate
			// limiting instead of being handed a nil client.
			logrus.Errorf("Getting error = %s while creating rate limited client ", err)
		} else {
			stsConfig = stsConfig.WithHTTPClient(rateLimitedClient)
		}

		ap := &stscreds.AssumeRoleProvider{
			Client:   sts.New(sess, stsConfig),
			RoleARN:  roleARN,
			Duration: 60 * time.Minute,
		}
		sess.Config.Credentials = credentials.NewCredentials(ap)
	}
	return sess
}
// setPrivateDNSNameCache stores privateDNSName for instance id, overwriting any
// existing entry, under the cache's write lock.
func (p *ec2ProviderImpl) setPrivateDNSNameCache(id string, privateDNSName string) {
	dnsCache := &p.privateDNSCache
	dnsCache.lock.Lock()
	defer dnsCache.lock.Unlock()
	dnsCache.cache[id] = privateDNSName
}
// setRequestInFlightForInstanceId marks instance id as having a lookup in
// flight.
func (p *ec2ProviderImpl) setRequestInFlightForInstanceId(id string) {
	requests := &p.ec2Requests
	requests.lock.Lock()
	defer requests.lock.Unlock()
	requests.set[id] = true
}
// unsetRequestInFlightForInstanceId removes instance id from the in-flight
// set. Removing an id that is not present is a no-op.
func (p *ec2ProviderImpl) unsetRequestInFlightForInstanceId(id string) {
	requests := &p.ec2Requests
	requests.lock.Lock()
	defer requests.lock.Unlock()
	delete(requests.set, id)
}
// getRequestInFlightForInstanceId reports whether instance id is present in
// the in-flight set.
func (p *ec2ProviderImpl) getRequestInFlightForInstanceId(id string) bool {
	requests := &p.ec2Requests
	requests.lock.RLock()
	defer requests.lock.RUnlock()
	_, inFlight := requests.set[id]
	return inFlight
}
// getRequestInFlightSize returns how many instance IDs are currently in the
// in-flight set.
func (p *ec2ProviderImpl) getRequestInFlightSize() int {
	requests := &p.ec2Requests
	requests.lock.RLock()
	defer requests.lock.RUnlock()
	return len(requests.set)
}
// getPrivateDNSNameCache returns the cached private DNS name for instance id.
// It does not call the EC2 API; it returns an error when id has no cache entry.
func (p *ec2ProviderImpl) getPrivateDNSNameCache(id string) (string, error) {
	dnsCache := &p.privateDNSCache
	dnsCache.lock.RLock()
	defer dnsCache.lock.RUnlock()
	name, found := dnsCache.cache[id]
	if !found {
		return "", errors.New("instance id not found")
	}
	return name, nil
}
// GetPrivateDNSName returns the private DNS name of the EC2 instance with the
// given instance ID. The EC2 API is only called on a cache miss.
//
// Resolution order:
//  1. Return the cached name if there is one.
//  2. If a lookup for id is already in flight, poll the cache every
//     defaultWaitInterval, up to totalIterationForWaitInterval times (about 5s),
//     then give up with an error.
//  3. Otherwise mark id as in flight. If more than maxAllowedInflightRequest IDs
//     are now in flight, queue id for the batch processor and wait as in step 2.
//     If not, call ec2:DescribeInstances for id directly.
//
// NOTE(review): an empty PrivateDnsName returned by EC2 is still cached, so a
// later call returns ("", nil) from the cache even though this call reported an
// error. Confirm whether that is intended.
func (p *ec2ProviderImpl) GetPrivateDNSName(id string) (string, error) {
	if privateDNSName, err := p.getPrivateDNSNameCache(id); err == nil {
		return privateDNSName, nil
	}
	logrus.Debugf("Missed the cache for the InstanceId = %s Verifying if its already in requestQueue ", id)

	// Another caller (or the batch processor) is already resolving id: wait for
	// it to populate the cache instead of issuing a duplicate API call.
	if p.getRequestInFlightForInstanceId(id) {
		logrus.Debugf("Found the InstanceId:= %s request In Queue waiting in 5 seconds loop ", id)
		for i := 0; i < totalIterationForWaitInterval; i++ {
			time.Sleep(defaultWaitInterval)
			if privateDNSName, err := p.getPrivateDNSNameCache(id); err == nil {
				return privateDNSName, nil
			}
		}
		return "", fmt.Errorf("failed to find node %s in PrivateDNSNameCache returning from loop", id)
	}

	logrus.Debugf("Missed the requestQueue cache for the InstanceId = %s", id)
	p.setRequestInFlightForInstanceId(id)

	// Too many lookups in flight: hand id to the batch processor. id is now
	// marked in flight, so the recursive call takes the waiting branch above;
	// the batch processor is responsible for clearing the in-flight marker.
	if p.getRequestInFlightSize() > maxAllowedInflightRequest {
		logrus.Debugf("Writing to buffered channel for instance Id %s ", id)
		p.instanceIdsChannel <- id
		return p.GetPrivateDNSName(id)
	}

	// This call owns the in-flight marker for id. Clear it on every exit path,
	// including a successful response that does not contain id; otherwise the
	// marker would leak and every later lookup for id would wait ~5s and fail.
	// The deferred call runs after any cache write below.
	defer p.unsetRequestInFlightForInstanceId(id)

	logrus.Infof("Calling ec2:DescribeInstances for the InstanceId = %s ", id)
	metrics.Get().EC2DescribeInstanceCallCount.Inc()
	output, err := p.ec2.DescribeInstances(&ec2.DescribeInstancesInput{
		InstanceIds: aws.StringSlice([]string{id}),
	})
	if err != nil {
		return "", fmt.Errorf("failed querying private DNS from EC2 API for node %s: %w ", id, err)
	}

	var privateDNSName string
	for _, reservation := range output.Reservations {
		for _, instance := range reservation.Instances {
			if aws.StringValue(instance.InstanceId) == id {
				privateDNSName = aws.StringValue(instance.PrivateDnsName)
				p.setPrivateDNSNameCache(id, privateDNSName)
			}
		}
	}
	if privateDNSName == "" {
		return "", fmt.Errorf("failed to find node %s ", id)
	}
	return privateDNSName, nil
}
// StartEc2DescribeBatchProcessing loops forever collecting instance IDs from
// instanceIdsChannel and resolving them with batched ec2:DescribeInstances
// calls. It never returns, so callers must run it in its own goroutine.
//
// The current batch is dispatched when either:
//   - it has reached maxInstancesBatchSize IDs, or
//   - it is non-empty and more than maxWaitIntervalForBatch milliseconds have
//     passed since the previous dispatch.
//
// Each dispatched batch is resolved in a new goroutine by
// getPrivateDnsAndPublishToCache. The EC2 client is itself rate limited, so
// these calls are rate limited too.
func (p *ec2ProviderImpl) StartEc2DescribeBatchProcessing() {
	// Wake up periodically even when no IDs arrive, so a partial batch is
	// flushed once maxWaitIntervalForBatch has elapsed. Unlike sleeping after a
	// non-blocking receive, this also reacts to new IDs immediately.
	ticker := time.NewTicker(20 * time.Millisecond)
	defer ticker.Stop()

	startTime := time.Now()
	var instanceIdList []string
	for {
		select {
		case instanceId := <-p.instanceIdsChannel:
			logrus.Debugf("Received the Instance Id := %s from buffered Channel for batch processing ", instanceId)
			instanceIdList = append(instanceIdList, instanceId)
		case <-ticker.C:
		}

		// ">=" (not ">") keeps each batch at most maxInstancesBatchSize IDs,
		// since IDs are appended one at a time.
		batchFull := len(instanceIdList) >= maxInstancesBatchSize
		waitedLongEnough := len(instanceIdList) > 0 &&
			time.Since(startTime).Milliseconds() > maxWaitIntervalForBatch
		if batchFull || waitedLongEnough {
			startTime = time.Now()
			batch := instanceIdList
			// Resetting to nil makes the next append allocate a new array, so the
			// dispatched goroutine has exclusive use of batch without a copy.
			instanceIdList = nil
			go p.getPrivateDnsAndPublishToCache(batch)
		}
	}
}
// getPrivateDnsAndPublishToCache resolves a batch of instance IDs with one
// ec2:DescribeInstances call and caches every returned private DNS name. On
// every exit path (success, API error, or panic) it then removes each ID in
// instanceIdList from the in-flight set, including IDs EC2 did not return.
// Callers waiting in GetPrivateDNSName therefore either find a cache entry or
// time out instead of waiting on a marker that is never cleared.
func (p *ec2ProviderImpl) getPrivateDnsAndPublishToCache(instanceIdList []string) {
	// Deferred so it runs after the cache has been populated.
	defer func() {
		logrus.Debugf("Removing instances from request Queue after getting response from Ec2")
		for _, id := range instanceIdList {
			p.unsetRequestInFlightForInstanceId(id)
		}
	}()

	logrus.Infof("Making Batch Query to DescribeInstances for %d instances ", len(instanceIdList))
	metrics.Get().EC2DescribeInstanceCallCount.Inc()
	output, err := p.ec2.DescribeInstances(&ec2.DescribeInstancesInput{
		InstanceIds: aws.StringSlice(instanceIdList),
	})
	if err != nil {
		logrus.Errorf("Batch call failed querying private DNS from EC2 API for nodes %v: with error = %s ", instanceIdList, err)
		return
	}

	// NOTE(review): NextToken is only logged; additional pages, if EC2 ever
	// returned any for an InstanceIds query, are not fetched.
	if output.NextToken != nil {
		logrus.Debugf("Successfully got the batch result , output.NextToken = %s ", *output.NextToken)
	} else {
		logrus.Debugf("Successfully got the batch result , output.NextToken is nil ")
	}

	for _, reservation := range output.Reservations {
		for _, instance := range reservation.Instances {
			p.setPrivateDNSNameCache(aws.StringValue(instance.InstanceId), aws.StringValue(instance.PrivateDnsName))
		}
	}
}