forked from hashicorp/packer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
state.go
241 lines (210 loc) · 6.67 KB
/
state.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
package common
import (
"errors"
"fmt"
"log"
"net"
"os"
"strconv"
"strings"
"time"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/mitchellh/multistep"
)
// StateRefreshFunc is a function type used for StateChangeConf that is
// responsible for refreshing the item being watched for a state change.
//
// It returns three results:
//
// `result` is any object that will be returned as the final object after
// waiting for the state change. This allows you to return the final updated
// object, for example an EC2 instance after refreshing it. WaitForState
// treats a nil `result` as "the resource was not found on this poll".
//
// `state` is the latest state of that object; WaitForState compares it
// against the configured target and pending states.
//
// `err` is any error that may have happened while refreshing the state; a
// non-nil `err` makes WaitForState stop and return it.
type StateRefreshFunc func() (result interface{}, state string, err error)
// StateChangeConf is the configuration struct used for `WaitForState`.
type StateChangeConf struct {
// Pending lists the states the object may be in while waiting. If Refresh
// reports a state that is neither Target nor one of these, WaitForState
// fails with an "unexpected state" error.
Pending []string
// Refresh is called once per polling iteration to fetch the current object
// and its state.
Refresh StateRefreshFunc
// StepState is optional. When non-nil, WaitForState looks up
// multistep.StateCancelled in it and aborts with "interrupted" if present.
StepState multistep.StateBag
// Target is the state that ends the wait successfully.
Target string
}
// AMIStateRefreshFunc builds a StateRefreshFunc that polls a single AMI,
// identified by imageId, through DescribeImages.
//
// The returned function yields (image, state, nil) when the AMI is visible.
// It yields (nil, "", nil) when the AMI cannot be seen: the API answered
// "InvalidAMIID.NotFound", the call failed with a transient network error,
// or the response contained no images. Any other error is logged and
// returned to the caller.
func AMIStateRefreshFunc(conn *ec2.EC2, imageId string) StateRefreshFunc {
	return func() (interface{}, string, error) {
		out, err := conn.DescribeImages(&ec2.DescribeImagesInput{
			ImageIds: []*string{&imageId},
		})
		if err != nil {
			awsErr, isAWSErr := err.(awserr.Error)
			switch {
			case isAWSErr && awsErr.Code() == "InvalidAMIID.NotFound":
				// The AMI does not exist (yet): behave as if nothing came back.
				out = nil
			case isTransientNetworkError(err):
				// Temporary network trouble: behave as if nothing came back.
				out = nil
			default:
				log.Printf("Error on AMIStateRefresh: %s", err)
				return nil, "", err
			}
		}

		// AWS is eventually consistent, so a freshly created AMI may be
		// missing from the listing; report an empty state in that case.
		if out == nil || len(out.Images) == 0 {
			return nil, "", nil
		}

		image := out.Images[0]
		return image, *image.State, nil
	}
}
// InstanceStateRefreshFunc builds a StateRefreshFunc that polls a single EC2
// instance, identified by instanceId, through DescribeInstances.
//
// The returned function yields (instance, state name, nil) when the instance
// is visible. It yields (nil, "", nil) when the instance cannot be seen: the
// API answered "InvalidInstanceID.NotFound", the call failed with a transient
// network error, or the response held no reservations or instances. Any other
// error is logged and returned to the caller.
func InstanceStateRefreshFunc(conn *ec2.EC2, instanceId string) StateRefreshFunc {
	return func() (interface{}, string, error) {
		out, err := conn.DescribeInstances(&ec2.DescribeInstancesInput{
			InstanceIds: []*string{&instanceId},
		})
		if err != nil {
			awsErr, isAWSErr := err.(awserr.Error)
			switch {
			case isAWSErr && awsErr.Code() == "InvalidInstanceID.NotFound":
				// The instance does not exist (yet): behave as if nothing came back.
				out = nil
			case isTransientNetworkError(err):
				// Temporary network trouble: behave as if nothing came back.
				out = nil
			default:
				log.Printf("Error on InstanceStateRefresh: %s", err)
				return nil, "", err
			}
		}

		// AWS is eventually consistent, so the instance may not be listed
		// yet; report an empty state in that case.
		if out == nil || len(out.Reservations) == 0 {
			return nil, "", nil
		}
		instances := out.Reservations[0].Instances
		if len(instances) == 0 {
			return nil, "", nil
		}

		instance := instances[0]
		return instance, *instance.State.Name, nil
	}
}
// SpotRequestStateRefreshFunc builds a StateRefreshFunc that polls a single
// spot instance request, identified by spotRequestId, through
// DescribeSpotInstanceRequests.
//
// The returned function yields (request, state, nil) when the request is
// visible. It yields (nil, "", nil) when the request cannot be seen: the API
// answered "InvalidSpotInstanceRequestID.NotFound", the call failed with a
// transient network error, or the response was empty. Any other error is
// logged and returned to the caller.
func SpotRequestStateRefreshFunc(conn *ec2.EC2, spotRequestId string) StateRefreshFunc {
	return func() (interface{}, string, error) {
		out, err := conn.DescribeSpotInstanceRequests(&ec2.DescribeSpotInstanceRequestsInput{
			SpotInstanceRequestIds: []*string{&spotRequestId},
		})
		if err != nil {
			awsErr, isAWSErr := err.(awserr.Error)
			switch {
			case isAWSErr && awsErr.Code() == "InvalidSpotInstanceRequestID.NotFound":
				// The request does not exist (yet): behave as if nothing came back.
				out = nil
			case isTransientNetworkError(err):
				// Temporary network trouble: behave as if nothing came back.
				out = nil
			default:
				log.Printf("Error on SpotRequestStateRefresh: %s", err)
				return nil, "", err
			}
		}

		// AWS is eventually consistent, so the spot request may not be
		// listed yet; report an empty state in that case.
		if out == nil || len(out.SpotInstanceRequests) == 0 {
			return nil, "", nil
		}

		request := out.SpotInstanceRequests[0]
		return request, *request.State, nil
	}
}
// ImportImageRefreshFunc builds a StateRefreshFunc that polls a single import
// image task, identified by importTaskId, through DescribeImportImageTasks.
//
// The returned function yields (task, status, nil) when the task is visible.
// It yields (nil, "", nil) when the task cannot be seen: the API answered
// with an error code starting with "InvalidConversionTaskId", the call failed
// with a transient network error, or the response was empty. Any other error
// is logged and returned to the caller.
func ImportImageRefreshFunc(conn *ec2.EC2, importTaskId string) StateRefreshFunc {
	return func() (interface{}, string, error) {
		out, err := conn.DescribeImportImageTasks(&ec2.DescribeImportImageTasksInput{
			ImportTaskIds: []*string{&importTaskId},
		})
		if err != nil {
			awsErr, isAWSErr := err.(awserr.Error)
			switch {
			case isAWSErr && strings.HasPrefix(awsErr.Code(), "InvalidConversionTaskId"):
				// The task is unknown (yet): behave as if nothing came back.
				out = nil
			case isTransientNetworkError(err):
				// Temporary network trouble: behave as if nothing came back.
				out = nil
			default:
				log.Printf("Error on ImportImageRefresh: %s", err)
				return nil, "", err
			}
		}

		// Nothing listed for this task id: report an empty state.
		if out == nil || len(out.ImportImageTasks) == 0 {
			return nil, "", nil
		}

		task := out.ImportImageTasks[0]
		return task, *task.Status, nil
	}
}
// WaitForState watches an object and waits for it to achieve a certain
// state.
//
// It calls conf.Refresh every two seconds and returns:
//
//   - the refreshed object and a nil error once the reported state equals
//     conf.Target;
//   - whatever Refresh returned, together with its error, as soon as Refresh
//     fails;
//   - a nil object and an "interrupted" error when conf.StepState is set and
//     contains multistep.StateCancelled;
//   - a nil object and a "couldn't find resource" error when Refresh reports
//     a nil object for more than TimeoutSeconds()/2 + 1 consecutive polls;
//   - a nil object and an "unexpected state" error when the reported state is
//     neither conf.Target nor listed in conf.Pending.
//
// The timeout only bounds consecutive "not found" polls; an object that stays
// in a pending state is polled until it changes, fails or the step is
// cancelled.
func WaitForState(conf *StateChangeConf) (i interface{}, err error) {
	log.Printf("Waiting for state to become: %s", conf.Target)

	const sleepSeconds = 2
	maxTicks := TimeoutSeconds()/sleepSeconds + 1
	notfoundTick := 0

	for {
		var currentState string
		i, currentState, err = conf.Refresh()
		if err != nil {
			return i, err
		}

		// Success takes precedence over cancellation: if the target has
		// already been reached there is nothing left to interrupt.
		if i != nil && currentState == conf.Target {
			return i, nil
		}

		// Honour cancellation on every poll, including while the resource is
		// not visible yet, so that an interrupted build does not keep waiting
		// for the whole not-found timeout.
		if conf.StepState != nil {
			if _, ok := conf.StepState.GetOk(multistep.StateCancelled); ok {
				return nil, errors.New("interrupted")
			}
		}

		if i == nil {
			// If we didn't find the resource, check if we have been
			// not finding it for awhile, and if so, report an error.
			notfoundTick++
			if notfoundTick > maxTicks {
				return nil, errors.New("couldn't find resource")
			}
		} else {
			// Reset the counter for when a resource isn't found
			notfoundTick = 0

			found := false
			for _, allowed := range conf.Pending {
				if currentState == allowed {
					found = true
					break
				}
			}
			if !found {
				return nil, fmt.Errorf("unexpected state '%s', wanted target '%s'", currentState, conf.Target)
			}
		}

		time.Sleep(sleepSeconds * time.Second)
	}
}
// isTransientNetworkError reports whether err is itself a net.Error whose
// Temporary method returns true. A nil error, or an error of any other type,
// yields false.
//
// NOTE(review): only a direct type assertion is performed, so an error that
// merely wraps a net.Error is not recognised — confirm whether the AWS SDK
// hands network failures back wrapped.
func isTransientNetworkError(err error) bool {
	netErr, isNetErr := err.(net.Error)
	return isNetErr && netErr.Temporary()
}
// TimeoutSeconds returns the number of seconds to allow an AWS operation to
// complete, and logs the value it chose.
//
// It returns 300 seconds (5 minutes) by default. Some AWS operations, like
// copying an AMI to a distant region, take a very long time, so the user may
// override the value with the AWS_TIMEOUT_SECONDS environment variable.
//
// An override that is not a valid integer, or that is negative, is logged as
// invalid and ignored in favour of the default: a negative timeout would make
// WaitForState give up on the very first "not found" poll.
func TimeoutSeconds() (seconds int) {
	seconds = 300

	if override := os.Getenv("AWS_TIMEOUT_SECONDS"); override != "" {
		n, err := strconv.Atoi(override)
		if err != nil || n < 0 {
			log.Printf("Invalid timeout seconds '%s', using default", override)
		} else {
			seconds = n
		}
	}

	log.Printf("Allowing %ds to complete (change with AWS_TIMEOUT_SECONDS)", seconds)
	return seconds
}