-
Notifications
You must be signed in to change notification settings - Fork 262
/
sqs-monitor.go
420 lines (361 loc) · 14.9 KB
/
sqs-monitor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.
package sqsevent
import (
"encoding/json"
"errors"
"fmt"
"github.com/aws/aws-node-termination-handler/pkg/logging"
"github.com/aws/aws-node-termination-handler/pkg/monitor"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/autoscaling"
"github.com/aws/aws-sdk-go/service/autoscaling/autoscalingiface"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
"github.com/rs/zerolog/log"
"go.uber.org/multierr"
)
const (
// SQSMonitorKind is the kind identifier of this monitor. It is returned by
// SQSMonitor.Kind and compared against InterruptionEvent.Monitor before an
// event is forwarded to the interruption channel.
SQSMonitorKind = "SQS_MONITOR"
// ASGTagName is the name of the instance tag whose value is the AutoScaling group name
ASGTagName = "aws:autoscaling:groupName"
// ASGTerminatingLifecycleTransition is the LifecycleTransition value of an
// ASG lifecycle event that is converted into a termination interruption event.
ASGTerminatingLifecycleTransition = "autoscaling:EC2_INSTANCE_TERMINATING"
// ASGLaunchingLifecycleTransition is the LifecycleTransition value of an
// ASG lifecycle event that is converted into an instance launch event.
ASGLaunchingLifecycleTransition = "autoscaling:EC2_INSTANCE_LAUNCHING"
)
// SQSMonitor polls an SQS queue for messages (EventBridge events, or ASG
// lifecycle events delivered directly to the queue) and turns them into
// interruption events.
type SQSMonitor struct {
// InterruptionChan receives every interruption event that was successfully
// derived from an SQS message.
InterruptionChan chan<- monitor.InterruptionEvent
// CancelChan is not written to by the code visible in this file.
CancelChan chan<- monitor.InterruptionEvent
// QueueURL is the URL of the SQS queue that is polled and deleted from.
QueueURL string
// SQS is the client used for ReceiveMessage and DeleteMessage calls.
SQS sqsiface.SQSAPI
// ASG is the client used for CompleteLifecycleAction calls.
ASG autoscalingiface.AutoScalingAPI
// EC2 is the client used for DescribeInstances calls.
EC2 ec2iface.EC2API
// CheckIfManaged enables filtering: when true, events for instances that
// do not carry ManagedTag are dropped instead of forwarded.
CheckIfManaged bool
// ManagedTag is the instance tag key whose presence marks a node as managed.
ManagedTag string
// BeforeCompleteLifecycleAction is an optional hook that is invoked right
// before the lifecycle action is completed; it may be nil.
BeforeCompleteLifecycleAction func()
}
// InterruptionEventWrapper is a convenience wrapper for associating an interruption event with its error, if any.
//
// The field order matters: this file builds values with unkeyed composite
// literals of the form InterruptionEventWrapper{event, err}.
type InterruptionEventWrapper struct {
// InterruptionEvent is the derived event; it may be nil when Err is set or
// when the message was not actionable.
InterruptionEvent *monitor.InterruptionEvent
// Err is the error encountered while deriving the event, or nil.
Err error
}
// skip wraps an error that occurred during a termination event and that
// should be acknowledged (logged) without being counted as a processing
// failure. Callers detect it with errors.As.
type skip struct {
	err error
}

// Error returns the message of the wrapped error. A skip without an
// underlying error yields a fixed placeholder instead of panicking on the
// nil dereference.
func (s skip) Error() string {
	if s.err == nil {
		return "skip: no underlying error"
	}
	return s.err.Error()
}

// Unwrap exposes the wrapped error so that errors.Is and errors.As can
// inspect the cause. It returns nil when there is no underlying error.
func (s skip) Unwrap() error {
	return s.err
}
// Kind returns the kind identifier of this monitor, which is always
// SQSMonitorKind.
func (m SQSMonitor) Kind() string {
return SQSMonitorKind
}
// Monitor performs one polling pass over the SQS queue: it receives the
// waiting messages, converts each one into interruption events and forwards
// those events for handling.
//
// A message whose processing fails with a skip error is only logged; any
// other failure is counted. An error is returned when receiving fails, or
// when at least one message was received and every one of them failed.
func (m SQSMonitor) Monitor() error {
	log.Debug().Msg("Checking for queue messages")
	messages, err := m.receiveQueueMessages(m.QueueURL)
	if err != nil {
		return err
	}

	failures := 0
	for _, msg := range messages {
		event, procErr := m.processSQSMessage(msg)

		var skipped skip
		switch {
		case procErr == nil:
			wrappers := m.processEventBridgeEvent(event, msg)
			if procErr = m.processInterruptionEvents(wrappers, msg); procErr != nil {
				log.Err(procErr).Msg("error processing interruption events")
				failures++
			}
		case errors.As(procErr, &skipped):
			// Skipped messages are acknowledged but not counted as failures.
			log.Warn().Err(skipped).Msg("skip processing SQS message")
		default:
			log.Err(procErr).Msg("error processing SQS message")
			failures++
		}
	}

	if len(messages) == 0 || failures != len(messages) {
		return nil
	}
	return fmt.Errorf("none of the waiting queue events could be processed")
}
// processSQSMessage interprets an SQS message and returns an EventBridge event.
//
// The message body is first decoded as an EventBridge event. When the decoded
// event carries no detail type, the body is re-interpreted as an ASG
// lifecycle event that was sent to the queue directly.
//
// A nil message or a message without a body yields an error instead of a
// nil-pointer panic. The returned pointer is never nil, but its content is
// only meaningful when the error is nil.
func (m SQSMonitor) processSQSMessage(message *sqs.Message) (*EventBridgeEvent, error) {
	event := EventBridgeEvent{}
	if message == nil || message.Body == nil {
		return &event, fmt.Errorf("SQS message or its body is nil")
	}

	err := json.Unmarshal([]byte(*message.Body), &event)
	if err != nil {
		return &event, err
	}

	if len(event.DetailType) == 0 {
		event, err = m.processLifecycleEventFromASG(message)
	}
	return &event, err
}
// parseLifecycleEvent decodes an ASG lifecycle event from a raw SQS message
// body.
//
// The body is decoded once as an envelope. If the envelope has a non-nil
// Message field, that field (rendered with %v) is decoded as the lifecycle
// event; otherwise the body itself is decoded as the lifecycle event.
func parseLifecycleEvent(message string) (LifecycleDetail, error) {
	var (
		envelope LifecycleDetailMessage
		detail   LifecycleDetail
	)
	if err := json.Unmarshal([]byte(message), &envelope); err != nil {
		return detail, fmt.Errorf("unmarshalling SQS message: %w", err)
	}

	// Pick the JSON document that actually holds the lifecycle event.
	payload, fromEnvelope := message, false
	if envelope.Message != nil {
		payload, fromEnvelope = fmt.Sprintf("%v", envelope.Message), true
	}

	err := json.Unmarshal([]byte(payload), &detail)
	switch {
	case err == nil:
		return detail, nil
	case fromEnvelope:
		return detail, fmt.Errorf("unmarshalling message body from '.Message': %w", err)
	default:
		return detail, fmt.Errorf("unmarshalling message body: %w", err)
	}
}
// processLifecycleEventFromASG checks for a Lifecycle event from ASG to SQS, and wraps it in an EventBridgeEvent.
//
// Outcomes:
//   - a nil message, a message without a body, or an unparsable body yields an error;
//   - a test notification is deleted from the queue and reported as a skip
//     error, so callers can acknowledge it without counting a failure;
//   - a lifecycle transition other than terminating or launching yields an error;
//   - otherwise the lifecycle event is re-encoded as the Detail of an
//     EventBridgeEvent whose source is "aws.autoscaling".
func (m SQSMonitor) processLifecycleEventFromASG(message *sqs.Message) (EventBridgeEvent, error) {
	log.Debug().Interface("message", message).Msg("processing lifecycle event from ASG")
	eventBridgeEvent := EventBridgeEvent{}

	if message == nil {
		return eventBridgeEvent, fmt.Errorf("ASG event message is nil")
	}
	// Guard the dereference below; a body-less message cannot be parsed.
	if message.Body == nil {
		return eventBridgeEvent, fmt.Errorf("ASG event message body is nil")
	}

	lifecycleEvent, err := parseLifecycleEvent(*message.Body)

	switch {
	case err != nil:
		return eventBridgeEvent, fmt.Errorf("parsing lifecycle event messsage from ASG: %w", err)

	case lifecycleEvent.Event == TEST_NOTIFICATION || lifecycleEvent.LifecycleTransition == TEST_NOTIFICATION:
		err := fmt.Errorf("message is a test notification")
		// Test notifications carry no work; remove them and surface any
		// deletion failure together with the skip reason.
		if errs := m.deleteMessages([]*sqs.Message{message}); len(errs) > 0 {
			err = multierr.Append(err, errs[0])
		}
		return eventBridgeEvent, skip{err}

	case lifecycleEvent.LifecycleTransition != ASGTerminatingLifecycleTransition &&
		lifecycleEvent.LifecycleTransition != ASGLaunchingLifecycleTransition:
		return eventBridgeEvent, fmt.Errorf("lifecycle transition must be %s or %s. Got %s", ASGTerminatingLifecycleTransition, ASGLaunchingLifecycleTransition, lifecycleEvent.LifecycleTransition)
	}

	eventBridgeEvent.Source = "aws.autoscaling"
	eventBridgeEvent.Time = lifecycleEvent.Time
	eventBridgeEvent.ID = lifecycleEvent.RequestID
	eventBridgeEvent.Detail, err = json.Marshal(lifecycleEvent)

	return eventBridgeEvent, err
}
// processEventBridgeEvent converts an EventBridge event into interruption
// event wrappers, dispatching on the event source and detail type.
//
// A nil event or nil message, as well as a source/detail-type combination
// that is not handled, produces a single wrapper carrying an error. An
// "aws.autoscaling" event whose lifecycle transition is neither launching nor
// terminating produces an empty (non-nil) slice.
func (m SQSMonitor) processEventBridgeEvent(eventBridgeEvent *EventBridgeEvent, message *sqs.Message) []InterruptionEventWrapper {
	// single builds the one-element result used by most branches.
	single := func(event *monitor.InterruptionEvent, err error) []InterruptionEventWrapper {
		return []InterruptionEventWrapper{{event, err}}
	}

	switch {
	case eventBridgeEvent == nil:
		return single(nil, fmt.Errorf("eventBridgeEvent is nil"))
	case message == nil:
		return single(nil, fmt.Errorf("message is nil"))
	}

	switch eventBridgeEvent.Source {
	case "aws.autoscaling":
		wrappers := []InterruptionEventWrapper{}

		var detail LifecycleDetail
		if err := json.Unmarshal([]byte(eventBridgeEvent.Detail), &detail); err != nil {
			wrapped := fmt.Errorf("unmarshaling message, %s, from ASG lifecycle event: %w", *message.MessageId, err)
			wrappers = append(wrappers, InterruptionEventWrapper{nil, wrapped})
		}

		// The transition is inspected even after a decode error, exactly
		// like the decode-then-check sequence this replaces.
		switch detail.LifecycleTransition {
		case ASGLaunchingLifecycleTransition:
			event, err := m.createAsgInstanceLaunchEvent(eventBridgeEvent, message)
			wrappers = append(wrappers, InterruptionEventWrapper{event, err})
		case ASGTerminatingLifecycleTransition:
			event, err := m.asgTerminationToInterruptionEvent(eventBridgeEvent, message)
			wrappers = append(wrappers, InterruptionEventWrapper{event, err})
		}
		return wrappers

	case "aws.ec2":
		// An unrecognized detail type leaves an empty, non-nil event with
		// a nil error in the result.
		event := &monitor.InterruptionEvent{}
		var err error
		switch eventBridgeEvent.DetailType {
		case "EC2 Instance State-change Notification":
			event, err = m.ec2StateChangeToInterruptionEvent(eventBridgeEvent, message)
		case "EC2 Spot Instance Interruption Warning":
			event, err = m.spotITNTerminationToInterruptionEvent(eventBridgeEvent, message)
		case "EC2 Instance Rebalance Recommendation":
			event, err = m.rebalanceRecommendationToInterruptionEvent(eventBridgeEvent, message)
		}
		return single(event, err)

	case "aws.health":
		if eventBridgeEvent.DetailType == "AWS Health Event" {
			return m.scheduledEventToInterruptionEvents(eventBridgeEvent, message)
		}
	}

	return single(nil, fmt.Errorf("event source (%s) is not supported", eventBridgeEvent.Source))
}
// processInterruptionEvents walks the interruption event wrappers derived
// from one SQS message and forwards every actionable event to the
// interruption channel.
//
// Each wrapper is classified in order: skip error (dropped), other error
// (failed, message kept so it can be retried), nil event (dropped), unmanaged
// node while CheckIfManaged is set (dropped), event produced by this monitor
// (sent), anything else (dropped). When every wrapper was dropped, including
// the case of no wrappers at all, the message is deleted from the queue; a
// failed deletion counts as a failure. An error is returned if any failure
// was recorded.
func (m SQSMonitor) processInterruptionEvents(interruptionEventWrappers []InterruptionEventWrapper, message *sqs.Message) error {
	var dropped, failed int

	for _, wrapper := range interruptionEventWrappers {
		event, err := wrapper.InterruptionEvent, wrapper.Err

		var skipped skip
		if errors.As(err, &skipped) {
			log.Warn().Err(skipped).Msg("dropping event")
			dropped++
			continue
		}
		if err != nil {
			// Log errors and record as failed events. Don't delete the message in order to allow retries
			log.Err(err).Msg("ignoring interruption event due to error")
			failed++
			continue
		}
		if event == nil {
			log.Debug().Msg("dropping non-actionable interruption event")
			dropped++
			continue
		}
		if m.CheckIfManaged && !event.IsManaged {
			// The instance is not managed by this process.
			log.Debug().Str("instance-id", event.InstanceID).Msg("dropping interruption event for unmanaged node")
			dropped++
			continue
		}
		if event.Monitor != SQSMonitorKind {
			eventJSON, _ := json.MarshalIndent(event, " ", " ")
			log.Warn().Msgf("dropping interruption event of an unrecognized kind: %s", eventJSON)
			dropped++
			continue
		}

		// The SQS message was turned into an interruption event of kind event.Kind.
		logging.VersionedMsgs.SendingInterruptionEventToChannel(event.Kind)
		m.InterruptionChan <- *event
	}

	if dropped == len(interruptionEventWrappers) {
		// Nothing was actionable, so the message is removed. A failed deletion is counted as an error.
		if errs := m.deleteMessages([]*sqs.Message{message}); len(errs) > 0 {
			log.Err(errs[0]).Msg("Error deleting message from SQS")
			failed++
		}
	}

	if failed == 0 {
		return nil
	}
	return fmt.Errorf("some interruption events for message Id %s could not be processed", *message.MessageId)
}
// receiveQueueMessages fetches the next batch of messages from the SQS queue
// at qURL.
//
// The request asks for up to 10 messages, long-polls for 20 seconds and hides
// received messages from other consumers for 20 seconds. It returns the
// received messages, or the error reported by the SQS client.
func (m SQSMonitor) receiveQueueMessages(qURL string) ([]*sqs.Message, error) {
	input := &sqs.ReceiveMessageInput{
		QueueUrl:              aws.String(qURL),
		AttributeNames:        []*string{aws.String(sqs.MessageSystemAttributeNameSentTimestamp)},
		MessageAttributeNames: []*string{aws.String(sqs.QueueAttributeNameAll)},
		MaxNumberOfMessages:   aws.Int64(10),
		VisibilityTimeout:     aws.Int64(20), // 20 seconds
		WaitTimeSeconds:       aws.Int64(20), // Max long polling
	}

	output, err := m.SQS.ReceiveMessage(input)
	if err != nil {
		return nil, err
	}
	return output.Messages, nil
}
// deleteMessages deletes messages from the configured SQS queue.
//
// Every message is attempted independently; a failure does not stop the
// remaining deletions. The returned slice holds one error per message that
// could not be deleted and is nil when all deletions succeeded. A nil entry
// in messages is reported as an error rather than dereferenced.
func (m SQSMonitor) deleteMessages(messages []*sqs.Message) []error {
	var errs []error
	for _, message := range messages {
		if message == nil {
			errs = append(errs, fmt.Errorf("cannot delete a nil SQS message"))
			continue
		}

		_, err := m.SQS.DeleteMessage(&sqs.DeleteMessageInput{
			ReceiptHandle: message.ReceiptHandle,
			QueueUrl:      &m.QueueURL,
		})
		if err != nil {
			errs = append(errs, err)
			// Do not claim success in the log for a failed deletion.
			continue
		}
		log.Debug().Msgf("SQS Deleted Message: %s", message)
	}
	return errs
}
// completeLifecycleAction runs the optional BeforeCompleteLifecycleAction
// hook and then asks the AutoScaling client to complete the lifecycle action
// described by input, returning the client's result unchanged.
func (m SQSMonitor) completeLifecycleAction(input *autoscaling.CompleteLifecycleActionInput) (*autoscaling.CompleteLifecycleActionOutput, error) {
	if hook := m.BeforeCompleteLifecycleAction; hook != nil {
		hook()
	}
	return m.ASG.CompleteLifecycleAction(input)
}
// NodeInfo is relevant information about a single node.
//
// The field descriptions below reflect how getNodeInfo populates the record.
type NodeInfo struct {
// AsgName is the value of the instance tag named ASGTagName, or empty when
// the instance has no such tag.
AsgName string
// InstanceID is the EC2 instance ID the record was looked up with.
InstanceID string
// ProviderID has the form "aws:///<availability-zone>/<instance-id>", or is
// empty when the availability zone is empty.
ProviderID string
// IsManaged is false only when managed-tag checking is enabled and the
// instance does not carry the configured managed tag.
IsManaged bool
// Name is the private DNS name of the instance.
Name string
// Tags holds the instance tags keyed by tag name.
Tags map[string]string
}
// getNodeInfo returns the NodeInfo record for the given instanceID.
//
// The data is retrieved from the EC2 API with a DescribeInstances call.
//
// A skip error is returned when the instance does not exist, when the
// response contains no instance, or when the instance has no private DNS
// name and is not in the running state. A running instance without a private
// DNS name, or any other EC2 failure, is returned as a regular error.
//
// Optional fields of the EC2 response (private DNS name, state name,
// placement, tag keys and values) are read nil-safely, so a sparse response
// results in empty values instead of a nil-pointer panic.
func (m SQSMonitor) getNodeInfo(instanceID string) (*NodeInfo, error) {
	result, err := m.EC2.DescribeInstances(&ec2.DescribeInstancesInput{
		InstanceIds: []*string{
			aws.String(instanceID),
		},
	})
	if err != nil {
		// errors.As also matches an AWS error that was wrapped on the way up.
		var aerr awserr.Error
		if errors.As(err, &aerr) && aerr.Code() == "InvalidInstanceID.NotFound" {
			msg := fmt.Sprintf("No instance found with instance-id %s", instanceID)
			log.Warn().Msg(msg)
			// errors.New keeps msg verbatim; it must not be used as a format string.
			return nil, skip{errors.New(msg)}
		}
		return nil, err
	}
	if len(result.Reservations) == 0 || len(result.Reservations[0].Instances) == 0 {
		msg := fmt.Sprintf("No reservation with instance-id %s", instanceID)
		log.Warn().Msg(msg)
		return nil, skip{errors.New(msg)}
	}

	instance := result.Reservations[0].Instances[0]
	// The marshalling error is ignored on purpose: the JSON is only used for debug logging.
	instanceJSON, _ := json.MarshalIndent(*instance, " ", " ")
	log.Debug().Msgf("Got instance data from ec2 describe call: %s", instanceJSON)

	privateDNSName := aws.StringValue(instance.PrivateDnsName)
	if privateDNSName == "" {
		state := "unknown"
		// instance.State and its Name are both optional in the response.
		if instance.State != nil && instance.State.Name != nil {
			state = *instance.State.Name
		}
		// anything except running might not contain PrivateDnsName
		if state != ec2.InstanceStateNameRunning {
			return nil, skip{fmt.Errorf("node: '%s' in state '%s'", instanceID, state)}
		}
		return nil, fmt.Errorf("unable to retrieve PrivateDnsName name for '%s' in state '%s'", instanceID, state)
	}

	providerID := ""
	if instance.Placement != nil {
		if az := aws.StringValue(instance.Placement.AvailabilityZone); az != "" {
			providerID = fmt.Sprintf("aws:///%s/%s", az, instanceID)
		}
	}

	nodeInfo := &NodeInfo{
		Name:       privateDNSName,
		InstanceID: instanceID,
		ProviderID: providerID,
		Tags:       make(map[string]string, len(instance.Tags)),
		IsManaged:  true,
	}
	for _, t := range instance.Tags {
		// A tag without a key cannot be stored; skip it instead of panicking.
		if t == nil || t.Key == nil {
			continue
		}
		key, value := *t.Key, aws.StringValue(t.Value)
		nodeInfo.Tags[key] = value
		if key == ASGTagName {
			nodeInfo.AsgName = value
		}
	}

	if m.CheckIfManaged {
		if _, ok := nodeInfo.Tags[m.ManagedTag]; !ok {
			nodeInfo.IsManaged = false
		}
	}

	// The marshalling error is ignored on purpose: the JSON is only used for debug logging.
	infoJSON, _ := json.MarshalIndent(nodeInfo, " ", " ")
	log.Debug().Msgf("Got node info from AWS: %s", infoJSON)

	return nodeInfo, nil
}