Skip to content

Commit

Permalink
update emr's run_jobflow() so that it can take a list of InstanceGroups
Browse files Browse the repository at this point in the history
(as an alternative to num_instances + master/slave instance type)
  • Loading branch information
Hunter Blanks committed Sep 5, 2011
1 parent ed7fc93 commit 210b7ce
Showing 1 changed file with 67 additions and 19 deletions.
86 changes: 67 additions & 19 deletions boto/emr/connection.py
Expand Up @@ -159,9 +159,7 @@ def add_instance_groups(self, jobflow_id, instance_groups):
instance_groups = [instance_groups]
params = {}
params['JobFlowId'] = jobflow_id

instance_group_args = [self._build_instance_group_args(ig) for ig in instance_groups]
params.update(self._build_instance_group_list(instance_group_args))
params.update(self._build_instance_group_list_args(instance_groups))

return self.get_object('AddInstanceGroups', params, AddInstanceGroupsResponse, verb='POST')

Expand Down Expand Up @@ -197,7 +195,8 @@ def run_jobflow(self, name, log_uri, ec2_keyname=None, availability_zone=None,
enable_debugging=False,
hadoop_version='0.20',
steps=[],
bootstrap_actions=[]):
bootstrap_actions=[],
instance_groups=None):
"""
Runs a job flow
Expand All @@ -223,7 +222,10 @@ def run_jobflow(self, name, log_uri, ec2_keyname=None, availability_zone=None,
:param enable_debugging: Denotes whether AWS console debugging should be enabled.
:type steps: list(boto.emr.Step)
:param steps: List of steps to add with the job
:type instance_groups: list(boto.emr.InstanceGroup)
:param instance_groups: Optional list of instance groups to use when creating
    this job. NB: When provided, this argument supersedes
    num_instances and master/slave_instance_type.
:rtype: str
:return: The jobflow id
"""
Expand All @@ -233,11 +235,31 @@ def run_jobflow(self, name, log_uri, ec2_keyname=None, availability_zone=None,
params['Name'] = name
params['LogUri'] = log_uri

# Instance args
instance_params = self._build_instance_args(ec2_keyname, availability_zone,
master_instance_type, slave_instance_type,
num_instances, keep_alive, hadoop_version)
params.update(instance_params)
# Common instance args
common_params = self._build_instance_common_args(ec2_keyname,
availability_zone,
keep_alive, hadoop_version)
params.update(common_params)

# NB: according to the AWS API's error message, we must
# "configure instances either using instance count, master and
# slave instance type or instance groups but not both."
#
# Thus we switch here on the truthiness of instance_groups.
if not instance_groups:
# Instance args (the common case)
instance_params = self._build_instance_count_and_type_args(
master_instance_type,
slave_instance_type,
num_instances)
params.update(instance_params)
else:
# Instance group args (for spot instances or a heterogeneous cluster)
list_args = self._build_instance_group_list_args(instance_groups)
instance_params = dict(
('Instances.%s' % k, v) for k, v in list_args.iteritems()
)
params.update(instance_params)

# Debugging step from EMR API docs
if enable_debugging:
Expand All @@ -261,7 +283,6 @@ def run_jobflow(self, name, log_uri, ec2_keyname=None, availability_zone=None,
'RunJobFlow', params, RunJobFlowResponse, verb='POST')
return response.jobflowid


def set_termination_protection(self, jobflow_id, termination_protection_status):
"""
Set termination protection on specified Elastic MapReduce job flows
Expand Down Expand Up @@ -331,12 +352,14 @@ def _build_step_list(self, steps):
params['Steps.member.%s.%s' % (i+1, key)] = value
return params

def _build_instance_args(self, ec2_keyname, availability_zone, master_instance_type,
slave_instance_type, num_instances, keep_alive, hadoop_version):
def _build_instance_common_args(self, ec2_keyname, availability_zone,
keep_alive, hadoop_version):
"""
Takes a number of parameters used when starting a jobflow (as
specified in run_jobflow() above). Returns a comparable dict for
use in making a RunJobFlow request.
"""
params = {
'Instances.MasterInstanceType' : master_instance_type,
'Instances.SlaveInstanceType' : slave_instance_type,
'Instances.InstanceCount' : num_instances,
'Instances.KeepJobFlowAliveWhenNoSteps' : str(keep_alive).lower(),
'Instances.HadoopVersion' : hadoop_version
}
Expand All @@ -348,7 +371,26 @@ def _build_instance_args(self, ec2_keyname, availability_zone, master_instance_t

return params

def _build_instance_count_and_type_args(self, master_instance_type,
slave_instance_type, num_instances):
"""
Takes a master instance type (string), a slave instance type
(string), and a number of instances. Returns a comparable dict
for use in making a RunJobFlow request.
"""
params = {
'Instances.MasterInstanceType' : master_instance_type,
'Instances.SlaveInstanceType' : slave_instance_type,
'Instances.InstanceCount' : num_instances,
}
return params

def _build_instance_group_args(self, instance_group):
"""
Takes an InstanceGroup; returns a dict that, when its keys are
properly prefixed, can be used for describing InstanceGroups in
RunJobFlow or AddInstanceGroups requests.
"""
params = {
'InstanceCount' : instance_group.num_instances,
'InstanceRole' : instance_group.role,
Expand All @@ -360,12 +402,18 @@ def _build_instance_group_args(self, instance_group):
params['BidPrice'] = instance_group.bidprice
return params

def _build_instance_group_list(self, instance_groups):
def _build_instance_group_list_args(self, instance_groups):
    """
    Takes a list of InstanceGroups, or a single InstanceGroup. Returns
    a comparable dict for use in making a RunJobFlow or AddInstanceGroups
    request.

    :type instance_groups: list(boto.emr.InstanceGroup) or
        boto.emr.InstanceGroup
    :param instance_groups: The instance group(s) to serialize.

    :rtype: dict
    :return: Parameters keyed as
        ``InstanceGroups.member.<i>.<field>`` (1-based member index).
    """
    # Accept a bare InstanceGroup as a convenience.
    if not isinstance(instance_groups, list):
        instance_groups = [instance_groups]

    params = {}
    for i, instance_group in enumerate(instance_groups):
        # Serialize each group once, then prefix every field with its
        # 1-based member index as the EMR API requires.
        ig_dict = self._build_instance_group_args(instance_group)
        for key, value in ig_dict.iteritems():
            params['InstanceGroups.member.%d.%s' % (i + 1, key)] = value
    return params

0 comments on commit 210b7ce

Please sign in to comment.