Skip to content

Commit

Permalink
Merge pull request #1700 from davidmarin/robust-pooling
Browse files Browse the repository at this point in the history
make pool matching robust against bad instance_fleets/instance_groups (fixes #1696)
  • Loading branch information
David Marin committed Nov 1, 2017
2 parents 26027d7 + d862765 commit 36a03b7
Showing 1 changed file with 52 additions and 8 deletions.
60 changes: 52 additions & 8 deletions mrjob/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,19 @@
"""
from collections import defaultdict
from datetime import timedelta
from logging import getLogger

from mrjob.aws import _boto3_now
from mrjob.aws import EC2_INSTANCE_TYPE_TO_COMPUTE_UNITS
from mrjob.aws import EC2_INSTANCE_TYPE_TO_MEMORY
from mrjob.py2 import integer_types

log = getLogger(__name__)

# we check the type and contents of requested fleets/groups because they
# are user-specified and may not have the correct format. Currently, we
# simply return no match, since either boto3 or the EMR AMI will catch
# the error when EMRJobRunner attempts to create a new cluster. See #1696


### identifying pooled clusters ###

Expand Down Expand Up @@ -73,13 +77,20 @@ def _instance_groups_satisfy(actual_igs, requested_igs):
# and the format of *actual_igs* is here:
# http://docs.aws.amazon.com/ElasticMapReduce/latest/API/API_ListInstanceGroups.html # noqa

# verify format of requested_igs
if not (isinstance(requested_igs, (list, tuple)) and
all(isinstance(req_ig, dict) and 'InstanceRole' in req_ig
for req_ig in requested_igs)):
log.debug(' bad instance_groups config')
return None

# a is a map from role to actual instance groups
a = defaultdict(list)
for ig in actual_igs:
a[ig['InstanceGroupType']].append(ig)

# r is a map from role to request (should be only one per role)
r = {req['InstanceRole']: req for req in requested_igs}
r = {req.get('InstanceRole'): req for req in requested_igs}

# updated request to account for extra instance groups
# see #1630 for what we do when roles don't match
Expand Down Expand Up @@ -125,6 +136,8 @@ def _ig_satisfies_bid_price(actual_ig, requested_ig):
"""Does the actual instance group definition satisfy the bid price
(or lack thereof) of the requested instance group?
"""
# _instance_groups_satisfy() already verified *requested_ig* is a dict

# on-demand instances satisfy every bid price
if actual_ig['Market'] == 'ON_DEMAND':
return True
Expand Down Expand Up @@ -153,7 +166,7 @@ def _ig_satisfies_mem(actual_ig, requested_ig):
"""Does the actual instance group satisfy the memory requirements of
the requested instance group?"""
actual_type = actual_ig['InstanceType']
requested_type = requested_ig['InstanceType']
requested_type = requested_ig.get('InstanceType')

# this works even for unknown instance types
if actual_type == requested_type:
Expand All @@ -180,8 +193,11 @@ def _igs_satisfy_cpu(actual_igs, requested_ig):
If the requested instance type is unknown, just return the number
of actual instances of the same type.
"""
requested_type = requested_ig['InstanceType']
num_requested = requested_ig['InstanceCount']
requested_type = requested_ig.get('InstanceType')
num_requested = requested_ig.get('InstanceCount')

if not isinstance(num_requested, integer_types):
return None

# count number of compute units (cu)
if requested_type in EC2_INSTANCE_TYPE_TO_COMPUTE_UNITS:
Expand Down Expand Up @@ -215,6 +231,12 @@ def _instance_fleets_satisfy(actual_fleets, req_fleets):
"""Common code for :py:func:`
:py:func:`_instance_groups_satisfy_fleets` and
:py:func:`_instance_groups_satisfy`."""
# verify format of req_fleets
if not (isinstance(req_fleets, (list, tuple)) and
all(isinstance(req_ft, dict) and 'InstanceFleetType' in req_ft
for req_ft in req_fleets)):
log.debug(' bad instance_fleets config')
return None

# a is a map from role to actual instance fleet
# (unlike with groups, there can never be more than one fleet per role)
Expand Down Expand Up @@ -248,8 +270,11 @@ def _fleet_for_same_role_satisfies(actual_fleet, req_fleet):
# match up instance types
actual_specs = {spec['InstanceType']: spec
for spec in actual_fleet['InstanceTypeSpecifications']}
req_specs = {spec['InstanceType']: spec
for spec in req_fleet['InstanceTypeConfigs']}
try:
req_specs = {spec['InstanceType']: spec
for spec in req_fleet['InstanceTypeConfigs']}
except (TypeError, KeyError):
return

if set(actual_specs) - set(req_specs):
log.debug(' fleet may include wrong instance types')
Expand All @@ -263,13 +288,19 @@ def _fleet_for_same_role_satisfies(actual_fleet, req_fleet):
actual_on_demand = actual_fleet.get('ProvisionedOnDemandCapacity', 0)
req_on_demand = req_fleet.get('TargetOnDemandCapacity', 0)

if not isinstance(req_on_demand, integer_types):
return

if req_on_demand > actual_on_demand:
log.debug(' not enough on-demand capacity')
return

actual_spot = actual_fleet.get('ProvisionedSpotCapacity', 0)
req_spot = req_fleet.get('TargetSpotCapacity', 0)

if not isinstance(req_spot, integer_types):
return

# allow extra on-demand instances to serve as spot instances
if req_spot > actual_spot + (actual_on_demand - req_on_demand):
log.debug(' not enough spot capacity')
Expand Down Expand Up @@ -344,6 +375,9 @@ def _fleet_spec_satsifies(actual_spec, req_spec):

# relative bid price
req_bid_percent = req_spec.get('BidPriceAsPercentageOfOnDemandPrice')
if not isinstance(req_spec, (integer_types, float)):
return

if req_bid_percent:
actual_bid_percent = actual_spec.get(
'BidPriceAsPercentageOfOnDemandPrice')
Expand Down Expand Up @@ -419,6 +453,10 @@ def _ebs_satisfies(actual, request):
if not req_device_configs:
return True

if not (isinstance(req_device_configs, (list, tuple)) and
all(isinstance(rdc, dict) for rdc in req_device_configs)):
return False

req_volumes = []

for req_device_config in req_device_configs:
Expand All @@ -441,6 +479,9 @@ def _ebs_volumes_satisfy(actual_volumes, req_volumes):
bigger/faster; just having the same amount of capacity or iops
isn't enough).
"""
if not isinstance(req_volumes, (list, tuple)):
return False

if len(req_volumes) > len(actual_volumes):
log.debug(' more EBS volumes requested than available')
return False
Expand All @@ -451,6 +492,9 @@ def _ebs_volumes_satisfy(actual_volumes, req_volumes):

def _ebs_volume_satisfies(actual_volume, req_volume):
"""Does the given actual EBS volume satisfy the given request?"""
if not isinstance(req_volume, dict):
return False

if req_volume.get('VolumeType') != actual_volume.get('VolumeType'):
log.debug(' wrong EBS volume type')
return False
Expand Down

0 comments on commit 36a03b7

Please sign in to comment.