This repository has been archived by the owner on Jan 15, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 8
/
lifecycle.go
139 lines (125 loc) · 4.35 KB
/
lifecycle.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package ec2cluster
import (
"encoding/json"
"errors"
"fmt"
"log"
"strings"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/autoscaling"
"github.com/aws/aws-sdk-go/service/sqs"
)
// LifecycleMessage represents the message we receive from the
// autoscaling lifecycle hook when an instance is created or
// terminated.
type LifecycleMessage struct {
AutoScalingGroupName string `json:",omitempty"`
Service string `json:",omitempty"`
Time time.Time `json:",omitempty"`
AccountID string `json:",omitempty"`
LifecycleTransition string `json:",omitempty"`
RequestID string `json:"RequestId"`
LifecycleActionToken string `json:",omitempty"`
EC2InstanceID string `json:"EC2InstanceID"`
LifecycleHookName string `json:",omitempty"`
}
var ErrLifecycleHookNotFound = errors.New("cannot find a suitable lifecycle hook")
// LifecyleEventCallback is a function that is invoked for each
// ASG lifecycle event. If the function returns a non-nil error
// then the message remains in the queue. If `shouldContinue` is
// true then CompleteLifecycleAction() is invoked with `CONINTUE`
// otherwise it is invoked with `ABANDON`.
type LifecyleEventCallback func(m *LifecycleMessage) (shouldContinue bool, err error)
// LifecycleEventQueueURL inspects the current autoscaling group and returns
// the URL of the first suitable lifecycle hook queue.
func (s *Cluster) LifecycleEventQueueURL() (string, error) {
asg, err := s.AutoscalingGroup()
if err != nil {
return "", err
}
autoscalingSvc := autoscaling.New(s.AwsSession)
resp, err := autoscalingSvc.DescribeLifecycleHooks(&autoscaling.DescribeLifecycleHooksInput{
AutoScalingGroupName: asg.AutoScalingGroupName,
})
if err != nil {
return "", err
}
sqsSvc := sqs.New(s.AwsSession)
for _, hook := range resp.LifecycleHooks {
if !strings.HasPrefix(*hook.NotificationTargetARN, "arn:aws:sqs:") {
continue
}
arnParts := strings.Split(*hook.NotificationTargetARN, ":")
queueName := arnParts[len(arnParts)-1]
queueOwnerAWSAccountID := arnParts[len(arnParts)-2]
resp, err := sqsSvc.GetQueueUrl(&sqs.GetQueueUrlInput{
QueueName: &queueName,
QueueOwnerAWSAccountId: &queueOwnerAWSAccountID,
})
if err != nil {
return "", err
}
return *resp.QueueUrl, nil
}
return "", ErrLifecycleHookNotFound
}
// WatchLifecycleEvents monitors a lifecycle event SQS queue and invokes
// cb for each event. If the callback returns an error, then the
// lifecycle action is completed with ABANDON. On success, the event is
// completed with CONTINUE.
func (s *Cluster) WatchLifecycleEvents(queueURL string, cb LifecyleEventCallback) error {
sqsSvc := sqs.New(s.AwsSession)
autoscalingSvc := autoscaling.New(s.AwsSession)
for {
resp, err := sqsSvc.ReceiveMessage(&sqs.ReceiveMessageInput{
QueueUrl: &queueURL,
MaxNumberOfMessages: aws.Int64(1),
WaitTimeSeconds: aws.Int64(20),
})
if err != nil {
return err
}
for _, messageWrapper := range resp.Messages {
m := LifecycleMessage{}
if err := json.Unmarshal([]byte(*messageWrapper.Body), &m); err != nil {
return fmt.Errorf("cannot unmarshal event: %s", err)
}
if m.LifecycleTransition != "autoscaling:EC2_INSTANCE_LAUNCHING" && m.LifecycleTransition != "autoscaling:EC2_INSTANCE_TERMINATING" {
_, err := sqsSvc.DeleteMessage(&sqs.DeleteMessageInput{
QueueUrl: &queueURL,
ReceiptHandle: messageWrapper.ReceiptHandle,
})
if err != nil {
log.Printf("DeleteMessage: %s", err)
}
continue
}
shouldContinue, err := cb(&m)
if err != nil {
continue
}
lifecycleActionResult := "CONTINUE"
if !shouldContinue {
lifecycleActionResult = "ABANDON"
}
_, err = autoscalingSvc.CompleteLifecycleAction(&autoscaling.CompleteLifecycleActionInput{
AutoScalingGroupName: &m.AutoScalingGroupName,
LifecycleActionResult: aws.String(lifecycleActionResult),
LifecycleHookName: &m.LifecycleHookName,
InstanceId: &m.EC2InstanceID,
LifecycleActionToken: &m.LifecycleActionToken,
})
if err != nil {
log.Printf("ERROR: CompleteLifecycleAction: %s", err)
}
_, err = sqsSvc.DeleteMessage(&sqs.DeleteMessageInput{
QueueUrl: &queueURL,
ReceiptHandle: messageWrapper.ReceiptHandle,
})
if err != nil {
return err
}
}
}
}