This repository has been archived by the owner on Sep 16, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 12
/
manager.py
361 lines (309 loc) · 13.7 KB
/
manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
# coding: utf-8
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import datetime
import logging
import ssl
import sys
import time
logger = logging.getLogger('laniakea')
try:
import boto.ec2
import boto.exception
except ImportError as msg:
logger.error(msg)
sys.exit(-1)
class EC2Manager(object):
    """
    Amazon Elastic Cloud Computing manager class.

    Wraps a boto EC2 connection together with an ``images`` mapping of launch
    configurations (one section per instance type, e.g. from images.json) and
    offers helpers to create, find, stop and terminate on-demand and spot
    instances.
    """

    # Seconds to wait between polls while waiting on instances / spot requests.
    # Kept as a class attribute so it can be overridden per manager instance.
    _POLL_INTERVAL = 5.0

    # Image-definition keys that configure the root device locally; they are
    # consumed by _get_default_name_size and must not be forwarded to boto.
    _ROOT_DEVICE_KEYS = ('root_size', 'root_device')

    def __init__(self, images):
        """
        :param images: Mapping of section name -> keyword arguments for the
            boto launch calls (may also contain 'image_name', 'root_size' and
            'root_device', which are handled by this class).
        :type images: dict
        """
        self.ec2 = None
        self.images = images

    def retry_on_ec2_error(self, func, *args, **kwargs):
        """
        Call the given method with the given arguments, retrying if the call
        failed due to an EC2ResponseError or ssl.SSLError. The call is attempted
        at most 6 times with a 5 second pause between attempts (at most 25
        seconds of waiting). If the last attempt still fails, its error is
        propagated.

        :param func: Function to call
        :type func: function
        :return: Whatever ``func`` returns.
        """
        attempts_left = 6
        while True:
            try:
                return func(*args, **kwargs)
            except (boto.exception.EC2ResponseError, ssl.SSLError):
                attempts_left -= 1
                if attempts_left <= 0:
                    # Bare raise keeps the original exception and traceback.
                    raise
                time.sleep(5)

    def connect(self, region, **kw_params):
        """Connect to EC2 in the given region and resolve configured AMI names.

        Every image definition that has an 'image_name' but no 'image_id' gets
        the name replaced by the ID of the matching AMI (searching images owned
        by 'self', 'amazon' and 'aws-marketplace').

        :param region: The name of the region to connect to.
        :type region: str
        :param kw_params: Extra arguments for boto.ec2.connect_to_region.
        :type kw_params: dict
        :raises Exception: If the connection fails, or an AMI name matches no
            image or more than one image.
        """
        self.ec2 = boto.ec2.connect_to_region(region, **kw_params)
        if not self.ec2:
            raise Exception('Unable to connect to region "%s"' % region)

        if not self.images:
            return

        # Resolve AMI names in our configuration to their IDs
        logger.info('Retrieving available AMIs...')
        remote_images = self.retry_on_ec2_error(self.ec2.get_all_images,
                                                owners=['self', 'amazon', 'aws-marketplace'])
        for definition in self.images.values():
            if 'image_name' not in definition or 'image_id' in definition:
                continue
            image_name = definition['image_name']
            matches = [ri.id for ri in remote_images if ri.name == image_name]
            # Validate before mutating so a failure leaves the definition intact.
            if len(matches) > 1:
                raise Exception('Ambiguous AMI name "%s" resolves to multiple IDs' % image_name)
            if not matches:
                raise Exception('Failed to resolve AMI name "%s" to an AMI ID' % image_name)
            definition['image_id'] = matches[0]
            del definition['image_name']

    def _launch_params(self, instance_type, root_device_type, size, vol_type, delete_on_termination):
        """Build the boto launch keyword arguments for an image definition.

        Works on a copy, so the configured definition in ``self.images`` is
        never modified and the same section can be launched repeatedly with
        identical settings.

        :param instance_type: A section name in images.json
        :type instance_type: str
        :param root_device_type: 'ebs' to attach a configured root EBS volume.
        :type root_device_type: str
        :param size: Root volume size, or 'default'.
        :param vol_type: EBS volume type.
        :type vol_type: str
        :param delete_on_termination: Delete the root volume on termination.
        :type delete_on_termination: bool
        :return: Keyword arguments for run_instances / request_spot_instances.
        :rtype: dict
        """
        name, size = self._get_default_name_size(instance_type, size)
        params = {key: value for key, value in self.images[instance_type].items()
                  if key not in self._ROOT_DEVICE_KEYS}
        if root_device_type == 'ebs':
            params['block_device_map'] = \
                self._configure_ebs_volume(vol_type, name, size, delete_on_termination)
        return params

    def create_on_demand(self,
                         instance_type='default',
                         tags=None,
                         root_device_type='ebs',
                         size='default',
                         vol_type='gp2',
                         delete_on_termination=False):
        """Create one or more EC2 on-demand instances and wait until they run.

        Instances that end up 'shutting-down' or 'terminated' while waiting are
        logged and left out of the result instead of being waited on forever.

        :param instance_type: A section name in images.json
        :type instance_type: str
        :param tags: Tags applied to every created instance (skipped if empty).
        :type tags: dict
        :param root_device_type: 'ebs' to configure the root EBS volume.
        :type root_device_type: str
        :param size: Root volume size, or 'default'.
        :param vol_type: EBS volume type.
        :type vol_type: str
        :param delete_on_termination: Delete the root volume on termination.
        :type delete_on_termination: bool
        :return: List of running instances created
        :rtype: list
        """
        params = self._launch_params(instance_type, root_device_type, size, vol_type, delete_on_termination)
        reservation = self.ec2.run_instances(**params)

        if tags:
            logger.info('Creating requested tags...')
            # One batched call tags every instance of the reservation.
            self.retry_on_ec2_error(self.ec2.create_tags,
                                    [i.id for i in reservation.instances], tags)

        instances = []
        pending = list(reservation.instances)
        logger.info('Waiting for instances to become ready...')
        while pending:
            still_pending = []
            for instance in pending:
                if instance.state == 'running':
                    instances.append(instance)
                    logger.info('%s is %s at %s (%s)',
                                instance.id,
                                instance.state,
                                instance.public_dns_name,
                                instance.ip_address)
                elif instance.state in ('shutting-down', 'terminated'):
                    logger.warning('%s is %s and will never become ready.', instance.id, instance.state)
                else:
                    still_pending.append(instance)
            pending = still_pending
            if pending:
                # Pace the polling instead of hammering the API with update().
                time.sleep(self._POLL_INTERVAL)
                for instance in pending:
                    self.retry_on_ec2_error(instance.update)
        return instances

    @staticmethod
    def _spot_valid_until(timeout):
        """Return the UTC expiry timestamp for a spot request, or None.

        EC2 expects ValidUntil in UTC (YYYY-MM-DDTHH:MM:SSZ); formatting local
        time would shift the deadline by the host's UTC offset.

        :param timeout: Seconds from now, or None for no expiry.
        :type timeout: int
        :rtype: str or None
        """
        if timeout is None:
            return None
        return time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(time.time() + timeout))

    def create_spot_requests(self,
                             price,
                             instance_type='default',
                             root_device_type='ebs',
                             size='default',
                             vol_type='gp2',
                             delete_on_termination=False,
                             timeout=None):
        """Request creation of one or more EC2 spot instances.

        :param price: Max price to pay for spot instance per hour.
        :type price: float
        :param instance_type: A section name in images.json
        :type instance_type: str
        :param timeout: Seconds to keep the request open (cancelled if not fulfilled).
        :type timeout: int
        :return: List of spot instance request IDs created
        :rtype: list
        """
        params = self._launch_params(instance_type, root_device_type, size, vol_type, delete_on_termination)
        valid_until = self._spot_valid_until(timeout)
        requests = self.ec2.request_spot_instances(price, valid_until=valid_until, **params)
        return [r.id for r in requests]

    def check_spot_requests(self, requests, tags=None):
        """Check status of one or more EC2 spot instance requests.

        :param requests: List of EC2 spot instance request IDs.
        :type requests: list
        :param tags: Tags applied to each newly found instance (skipped if empty).
        :type tags: dict
        :return: List of boto.ec2.instance.Instance's created, order corresponding to requests param (None if request
            still open, the spot request object if request is no longer open)
        :rtype: list
        :raises Exception: If a fulfilled request's instance cannot be found.
        """
        if not requests:
            # boto only filters by ID when the list is non-empty; an empty list
            # would otherwise match every spot request in the region.
            return []
        instances = [None] * len(requests)
        ec2_requests = self.retry_on_ec2_error(self.ec2.get_all_spot_instance_requests, request_ids=requests)
        for req in ec2_requests:
            if req.instance_id:
                found = self.retry_on_ec2_error(self.ec2.get_only_instances, [req.instance_id])
                if not found:
                    raise Exception('Failed to get instance with id %s for %s request %s'
                                    % (req.instance_id, req.status.code, req.id))
                instance = found[0]
                instances[requests.index(req.id)] = instance
                if tags:
                    self.retry_on_ec2_error(self.ec2.create_tags, [instance.id], tags)
                logger.info('Request %s is %s and %s.',
                            req.id,
                            req.status.code,
                            req.state)
                logger.info('%s is %s at %s (%s)',
                            instance.id,
                            instance.state,
                            instance.public_dns_name,
                            instance.ip_address)
            elif req.state != "open":
                # return the request so we don't try again
                instances[requests.index(req.id)] = req
        return instances

    def cancel_spot_requests(self, requests):
        """Cancel one or more EC2 spot instance requests.

        An empty list is a no-op (it must not reach boto, where it would select
        every spot request in the region).

        :param requests: List of EC2 spot instance request IDs.
        :type requests: list
        """
        if not requests:
            return
        ec2_requests = self.retry_on_ec2_error(self.ec2.get_all_spot_instance_requests, request_ids=requests)
        for req in ec2_requests:
            self.retry_on_ec2_error(req.cancel)

    def _collect_instances(self, results):
        """Return only the instance objects from a check_spot_requests result."""
        return [r for r in results if isinstance(r, boto.ec2.instance.Instance)]

    def create_spot(self,
                    price,
                    instance_type='default',
                    tags=None,
                    root_device_type='ebs',
                    size='default',
                    vol_type='gp2',
                    delete_on_termination=False,
                    timeout=None):
        """Create one or more EC2 spot instances and wait for fulfillment.

        :param price: Max price to pay for spot instance per hour.
        :type price: float
        :param instance_type: A section name in images.json
        :type instance_type: str
        :param tags: Tags applied to every created instance.
        :type tags: dict
        :param timeout: Seconds to wait before cancelling unfulfilled requests
            (None waits indefinitely).
        :type timeout: int
        :return: List of instances created
        :rtype: list
        """
        request_ids = self.create_spot_requests(price,
                                                instance_type=instance_type,
                                                root_device_type=root_device_type,
                                                size=size,
                                                vol_type=vol_type,
                                                delete_on_termination=delete_on_termination)
        instances = []
        logger.info('Waiting on fulfillment of requested spot instances.')
        # Wall-clock deadline, so time spent in API calls also counts.
        deadline = None if timeout is None else time.time() + timeout
        while request_ids:
            time.sleep(self._POLL_INTERVAL)
            results = self.check_spot_requests(request_ids, tags=tags)
            instances.extend(self._collect_instances(results))
            # Keep waiting only on requests that are still open.
            request_ids = [rid for rid, result in zip(request_ids, results) if result is None]
            if request_ids and deadline is not None and time.time() >= deadline:
                self.cancel_spot_requests(request_ids)
                # A request may be fulfilled between the last check and the
                # cancellation; cancelling does not terminate its instance, so
                # collect it instead of leaking a running instance.
                instances.extend(self._collect_instances(
                    self.check_spot_requests(request_ids, tags=tags)))
                break
        return instances

    def _scale_down(self, instances, count):
        """Return a list of |count| last created instances by launch time.

        :param instances: A list of instances.
        :type instances: list
        :param count: Number of instances to scale down.
        :type count: integer
        :return: List of instances to be scaled down (newest first).
        :rtype: list
        """
        newest_first = sorted(instances, key=lambda inst: inst.launch_time, reverse=True)
        if not newest_first:
            return []
        running = len(newest_first)
        logger.info('%d instance/s are running.', running)
        logger.info('Scaling down %d instances of those.', count)
        if count > running:
            logger.info('Scale-down value is > than running instance/s - using maximum of %d!', running)
            count = running
        return newest_first[:count]

    def _get_default_name_size(self, instance_type, size):
        """Checks if root device name/size were specified in the image definition.

        The definition is only read, never modified, so repeated launches of the
        same section keep using the configured root device settings.

        :param instance_type: A section name in images.json.
        :type instance_type: str
        :param size: Fallback size when the definition has no 'root_size'.
        :type size: int
        :return: Root device name (default '/dev/sda1') and size
        :rtype: tuple(str, int)
        """
        definition = self.images[instance_type]
        size = definition.get('root_size', size)
        name = definition.get('root_device', '/dev/sda1')
        return (name, size)

    def _configure_ebs_volume(self, vol_type, name, size, delete_on_termination):
        """Sets the desired root EBS size, otherwise the default EC2 value is used.

        :param vol_type: Type of EBS storage - gp2 (SSD), io1 or standard (magnetic)
        :type vol_type: str
        :param name: Root device name the volume is mapped to.
        :type name: str
        :param size: Desired root EBS size, or 'default' to keep the EC2 default.
        :type size: int
        :param delete_on_termination: Toggle this flag to delete EBS volume on termination.
        :type delete_on_termination: bool
        :return: A BlockDeviceMapping object.
        :rtype: object
        """
        # From GitHub boto docs: http://git.io/veyDv
        root_dev = boto.ec2.blockdevicemapping.BlockDeviceType()
        root_dev.delete_on_termination = delete_on_termination
        root_dev.volume_type = vol_type
        if size != 'default':
            root_dev.size = size  # change root volume to desired size
        bdm = boto.ec2.blockdevicemapping.BlockDeviceMapping()
        bdm[name] = root_dev
        return bdm

    def stop(self, instances, count=0):
        """Stop each provided running instance.

        :param instances: A list of instances.
        :type instances: list
        :param count: If > 0, only stop this many of the newest instances.
        :type count: int
        """
        if not instances:
            return
        if count > 0:
            instances = self._scale_down(instances, count)
        self.retry_on_ec2_error(self.ec2.stop_instances, [i.id for i in instances])

    def terminate(self, instances, count=0):
        """Terminate each provided running or stopped instance.

        :param instances: A list of instances.
        :type instances: list
        :param count: If > 0, only terminate this many of the newest instances.
        :type count: int
        """
        if not instances:
            return
        if count > 0:
            instances = self._scale_down(instances, count)
        self.retry_on_ec2_error(self.ec2.terminate_instances, [i.id for i in instances])

    def find(self, instance_ids=None, filters=None):
        """Flatten list of reservations to a list of instances.

        :param instance_ids: A list of instance ids to filter by
        :type instance_ids: list
        :param filters: A dict of |Filter.N| values defined in http://goo.gl/jYNej9
        :type filters: dict
        :return: A flattened list of filtered instances.
        :rtype: list
        """
        reservations = self.retry_on_ec2_error(self.ec2.get_all_instances, instance_ids=instance_ids, filters=filters)
        return [instance for reservation in reservations for instance in reservation.instances]