Skip to content

Commit

Permalink
Merge pull request #1004 from davidmarin/role-emergency
Browse files Browse the repository at this point in the history
work with new IAM requirements. Fixes #999, fixes #1001, fixes #1005
  • Loading branch information
David Marin committed Apr 21, 2015
2 parents 325d3c6 + 1f83fe6 commit deb1866
Show file tree
Hide file tree
Showing 9 changed files with 913 additions and 35 deletions.
3 changes: 1 addition & 2 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ Setting up EMR on Amazon
------------------------

* create an `Amazon Web Services account <http://aws.amazon.com/>`_
* sign up for `Elastic MapReduce <http://aws.amazon.com/elasticmapreduce/>`__
* Get your access and secret keys (click "Security Credentials" on
`your account page <http://aws.amazon.com/account/>`_)
* Set the environment variables ``$AWS_ACCESS_KEY_ID`` and
Expand Down Expand Up @@ -125,7 +124,7 @@ Reference
---------

* `Hadoop Streaming <http://hadoop.apache.org/docs/stable1/streaming.html>`_
* `Elastic MapReduce <http://aws.amazon.com/documentation/elasticmapreduce/>`__
* `Elastic MapReduce <http://aws.amazon.com/documentation/elasticmapreduce/>`_

More Information
----------------
Expand Down
7 changes: 3 additions & 4 deletions docs/guides/emr-opts.rst
Original file line number Diff line number Diff line change
Expand Up @@ -162,16 +162,15 @@ Job flow creation and configuration
necessary to set this by hand.

.. mrjob-opt::
:config: iam_job_flow_role
:switch: --iam-job-flow-role
:config: iam_instance_profile
:switch: --iam-instance-profile
:type: :ref:`string <data-type-string>`
:set: emr
:default: ``None``

IAM job flow role to use on the EMR cluster. See
IAM instance profile to use on the EMR cluster. See
http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/emr-iam-roles.html
for more details on using IAM roles with EMR.
Needs AMI version 2.3.0 or later to work.

.. versionadded:: 0.4.3

Expand Down
6 changes: 6 additions & 0 deletions mrjob/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"""General information about Amazon Web Services, such as region-to-endpoint
mappings.
"""
import random

### EC2 Instances ###

Expand Down Expand Up @@ -213,3 +214,8 @@ def s3_location_constraint_for_region(region):
return ''
else:
return region


def random_identifier():
"""Used to randomly name new buckets and roles."""
return '%016x' % random.randint(0, 2 ** 64 - 1)
60 changes: 51 additions & 9 deletions mrjob/emr.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
from mrjob.aws import MAX_STEPS_PER_JOB_FLOW
from mrjob.aws import emr_endpoint_for_region
from mrjob.aws import emr_ssl_host_for_region
from mrjob.aws import random_identifier
from mrjob.aws import s3_endpoint_for_region
from mrjob.aws import s3_location_constraint_for_region
from mrjob.compat import supports_new_distributed_cache_options
Expand All @@ -91,6 +92,8 @@
from mrjob.fs.s3 import _get_bucket
from mrjob.fs.s3 import wrap_aws_conn
from mrjob.fs.ssh import SSHFilesystem
from mrjob.iam import get_or_create_mrjob_instance_profile
from mrjob.iam import get_or_create_mrjob_service_role
from mrjob.logparsers import EMR_JOB_LOG_URI_RE
from mrjob.logparsers import NODE_LOG_URI_RE
from mrjob.logparsers import STEP_LOG_URI_RE
Expand Down Expand Up @@ -341,22 +344,24 @@ class EMRRunnerOptionStore(RunnerOptionStore):
'ec2_slave_instance_type',
'ec2_task_instance_bid_price',
'ec2_task_instance_type',
'emr_action_on_failure',
'emr_api_params',
'emr_endpoint',
'emr_job_flow_id',
'emr_job_flow_pool_name',
'emr_action_on_failure',
'enable_emr_debugging',
'hadoop_streaming_jar_on_emr',
'hadoop_version',
'iam_instance_profile',
'iam_job_flow_role',
'iam_service_role',
'max_hours_idle',
'mins_to_end_of_hour',
'num_ec2_core_instances',
'pool_wait_minutes',
'num_ec2_instances',
'num_ec2_task_instances',
'pool_emr_job_flows',
'pool_wait_minutes',
's3_endpoint',
's3_log_uri',
's3_scratch_uri',
Expand Down Expand Up @@ -386,6 +391,19 @@ class EMRRunnerOptionStore(RunnerOptionStore):
def __init__(self, alias, opts, conf_path):
super(EMRRunnerOptionStore, self).__init__(alias, opts, conf_path)
self._fix_ec2_instance_opts()
self._fix_deprecated_opts()

def _fix_deprecated_opts(self):
# generalize this for other options

if self['iam_job_flow_role'] is not None:
log.warning('iam_job_flow_role is deprecated and wil be removed'
' in v0.5; use iam_instance_profile instead')

if self['iam_instance_profile'] is None:
self['iam_instance_profile'] = self['iam_job_flow_role']

self['iam_job_flow_role'] = None

def default_options(self):
super_opts = super(EMRRunnerOptionStore, self).default_options()
Expand All @@ -396,10 +414,9 @@ def default_options(self):
'ec2_core_instance_type': 'm1.small',
'ec2_master_instance_type': 'm1.small',
'emr_job_flow_pool_name': 'default',
'hadoop_version': None,
'hadoop_streaming_jar_on_emr': (
'/home/hadoop/contrib/streaming/hadoop-streaming.jar'),
'iam_job_flow_role': None,
'hadoop_version': None, # override runner default
'mins_to_end_of_hour': 5.0,
'num_ec2_core_instances': 0,
'num_ec2_instances': 1,
Expand Down Expand Up @@ -732,7 +749,7 @@ def _set_s3_scratch_uri(self, s3_conn):
return

# That may have all failed. If so, pick a name.
scratch_bucket_name = 'mrjob-%016x' % random.randint(0, 2 ** 64 - 1)
scratch_bucket_name = 'mrjob-' + random_identifier()
self._s3_temp_bucket_to_create = scratch_bucket_name
log.info("creating new scratch bucket %s" % scratch_bucket_name)
self._opts['s3_scratch_uri'] = 's3://%s/tmp/' % scratch_bucket_name
Expand Down Expand Up @@ -1353,10 +1370,18 @@ def _job_flow_args(self, persistent=False, steps=None):
if self._opts['emr_api_params']:
args['api_params'] = self._opts['emr_api_params']

if self._opts['iam_job_flow_role']:
if 'api_params' not in args:
args.setdefault('api_params', {})
args['api_params']['JobFlowRole'] = self._opts['iam_job_flow_role']
# instance profile and service role are required for accounts
# created after April 6, 2015. Doing this for old accounts as well;
# guessing that eventually all accounts will need this stuff anyway.
args.setdefault('api_params', {})

instance_profile = (self._opts['iam_instance_profile'] or
get_or_create_mrjob_instance_profile(self.make_iam_conn()))
args['api_params']['JobFlowRole'] = instance_profile

service_role = (self._opts['iam_service_role'] or
get_or_create_mrjob_service_role(self.make_iam_conn()))
args['api_params']['ServiceRole'] = service_role

if steps:
args['steps'] = steps
Expand Down Expand Up @@ -2545,3 +2570,20 @@ def _addresses_of_slaves(self):
self._address_of_master(),
self._opts['ec2_key_pair_file'])
return self._ssh_slave_addrs

def make_iam_conn(self):
"""Create a connection to S3.
:return: a :py:class:`boto.iam.connection.IAMConnection`, wrapped in a
:py:class:`mrjob.retry.RetryWrapper`
"""
# give a non-cryptic error message if boto isn't installed
if boto is None:
raise ImportError('You must install boto to connect to IAM')

log.debug('creating IAM connection')

raw_iam_conn = boto.connect_iam(
aws_access_key_id=self._aws_access_key_id,
aws_secret_access_key=self._aws_secret_access_key)
return wrap_aws_conn(raw_iam_conn)

0 comments on commit deb1866

Please sign in to comment.