Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/v0.4.x'
Browse files Browse the repository at this point in the history
  • Loading branch information
David Marin committed May 27, 2015
2 parents 9e212d0 + 69fe50f commit 6f55b7a
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 43 deletions.
98 changes: 65 additions & 33 deletions mrjob/emr.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@
from mrjob.fs.s3 import _get_bucket
from mrjob.fs.s3 import wrap_aws_conn
from mrjob.fs.ssh import SSHFilesystem
from mrjob.iam import FALLBACK_INSTANCE_PROFILE
from mrjob.iam import FALLBACK_SERVICE_ROLE
from mrjob.iam import get_or_create_mrjob_instance_profile
from mrjob.iam import get_or_create_mrjob_service_role
from mrjob.logparsers import EMR_JOB_LOG_URI_RE
Expand Down Expand Up @@ -1369,25 +1371,41 @@ def _job_flow_args(self, persistent=False, steps=None):
args['api_params'] = self._opts['emr_api_params']

# instance profile and service role are required for accounts
# created after April 6, 2015. Doing this for old accounts as well;
# guessing that eventually all accounts will need this stuff anyway.
args.setdefault('api_params', {})

instance_profile = (
self._opts['iam_instance_profile'] or
get_or_create_mrjob_instance_profile(self.make_iam_conn()))
args['api_params']['JobFlowRole'] = instance_profile

service_role = (
self._opts['iam_service_role'] or
get_or_create_mrjob_service_role(self.make_iam_conn()))
args['api_params']['ServiceRole'] = service_role
# created after April 6, 2015, and will eventually be required
# for all accounts
api_params = args.setdefault('api_params', {})
api_params['JobFlowRole'] = self._instance_profile()
api_params['ServiceRole'] = self._service_role()

if steps:
args['steps'] = steps

return args

def _instance_profile(self):
    """Return the IAM instance profile name to use for the job flow.

    The ``iam_instance_profile`` option wins when it is set. Otherwise
    get_or_create_mrjob_instance_profile() is called with the connection
    from self.make_iam_conn(). If that raises a BotoServerError with
    status 403, a warning is logged and FALLBACK_INSTANCE_PROFILE is
    returned; any other BotoServerError is re-raised.
    """
    configured = self._opts['iam_instance_profile']
    if configured:
        return configured

    try:
        return get_or_create_mrjob_instance_profile(self.make_iam_conn())
    except boto.exception.BotoServerError as err:
        if err.status == 403:
            # can't reach the IAM API; fall back to the default profile
            log.warning(
                "Can't access IAM API, trying default instance profile: %s" %
                FALLBACK_INSTANCE_PROFILE)
            return FALLBACK_INSTANCE_PROFILE
        raise

def _service_role(self):
    """Return the IAM service role name to use for the job flow.

    The ``iam_service_role`` option wins when it is set. Otherwise
    get_or_create_mrjob_service_role() is called with the connection
    from self.make_iam_conn(). If that raises a BotoServerError with
    status 403, a warning is logged and FALLBACK_SERVICE_ROLE is
    returned; any other BotoServerError is re-raised.
    """
    configured = self._opts['iam_service_role']
    if configured:
        return configured

    try:
        return get_or_create_mrjob_service_role(self.make_iam_conn())
    except boto.exception.BotoServerError as err:
        if err.status == 403:
            # can't reach the IAM API; fall back to the default role
            log.warning(
                "Can't access IAM API, trying default service role: %s" %
                FALLBACK_SERVICE_ROLE)
            return FALLBACK_SERVICE_ROLE
        raise

@property
def _action_on_failure(self):
# don't terminate other people's job flows
Expand Down Expand Up @@ -1547,6 +1565,7 @@ def _launch_emr_job(self):
# keep track of when we launched our job
self._emr_job_start = time.time()

# TODO: break this method up; it's too big to write tests for
def _wait_for_job_to_complete(self):
"""Wait for the job to complete, and raise an exception if
the job failed.
Expand Down Expand Up @@ -1663,26 +1682,39 @@ def _wait_for_job_to_complete(self):
msg = 'Job on job flow %s failed with status %s: %s' % (
job_flow.jobflowid, job_state, reason)
log.error(msg)
if self._s3_job_log_uri:
log.info('Logs are in %s' % self._s3_job_log_uri)
# look for a Python traceback
cause = self._find_probable_cause_of_failure(
step_nums, sorted(lg_step_num_mapping.values()))
if cause:
# log cause, and put it in exception
cause_msg = [] # lines to log and put in exception
cause_msg.append('Probable cause of failure (from %s):' %
cause['log_file_uri'])
cause_msg.extend(line.strip('\n') for line in cause['lines'])
if cause['input_uri']:
cause_msg.append('(while reading from %s)' %
cause['input_uri'])

for line in cause_msg:
log.error(line)

# add cause_msg to exception message
msg += '\n' + '\n'.join(cause_msg) + '\n'
# check for invalid fallback IAM roles
if any(reason.rstrip().endswith('/%s is invalid' % role)
for role in (FALLBACK_INSTANCE_PROFILE,
FALLBACK_SERVICE_ROLE)):
msg += (
'\n\n'
'Ask your admin to create the default EMR roles'
' by following:\n\n'
' http://docs.aws.amazon.com/ElasticMapReduce/latest'
'/DeveloperGuide/emr-iam-roles-creatingroles.html\n')
else:
if self._s3_job_log_uri:
log.info('Logs are in %s' % self._s3_job_log_uri)
# look for a Python traceback
cause = self._find_probable_cause_of_failure(
step_nums, sorted(lg_step_num_mapping.values()))
if cause:
# log cause, and put it in exception
cause_msg = [] # lines to log and put in exception
cause_msg.append('Probable cause of failure (from %s):' %
cause['log_file_uri'])
cause_msg.extend(
line.strip('\n') for line in cause['lines'])

if cause['input_uri']:
cause_msg.append('(while reading from %s)' %
cause['input_uri'])

for line in cause_msg:
log.error(line)

# add cause_msg to exception message
msg += '\n' + '\n'.join(cause_msg) + '\n'

raise Exception(msg)

Expand Down
10 changes: 10 additions & 0 deletions mrjob/iam.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,16 @@
}]
}

# if we can't create or find our own service role, use the one
# created by the AWS console and CLI
FALLBACK_SERVICE_ROLE = 'EMR_DefaultRole'

# if we can't create or find our own instance profile, use the one
# created by the AWS console and CLI
FALLBACK_INSTANCE_PROFILE = 'EMR_EC2_DefaultRole'



log = getLogger(__name__)


Expand Down
33 changes: 23 additions & 10 deletions tests/test_emr.py
Original file line number Diff line number Diff line change
Expand Up @@ -665,37 +665,50 @@ def test_role_auto_creation(self):

def test_iam_instance_profile_option(self):
job_flow = self.run_and_get_job_flow(
'--iam-instance-profile', 'EMR_DefaultRole')
'--iam-instance-profile', 'EMR_EC2_DefaultRole')
self.assertTrue(boto.connect_iam.called)

self.assertEqual(job_flow.jobflowrole, 'EMR_DefaultRole')
self.assertEqual(job_flow.jobflowrole, 'EMR_EC2_DefaultRole')

def test_deprecated_job_flow_role_option(self):
with logger_disabled('mrjob.emr'):
job_flow = self.run_and_get_job_flow(
'--iam-job-flow-role', 'EMR_DefaultRole')
'--iam-job-flow-role', 'EMR_EC2_DefaultRole')
self.assertTrue(boto.connect_iam.called)

self.assertEqual(job_flow.jobflowrole, 'EMR_DefaultRole')
self.assertEqual(job_flow.jobflowrole, 'EMR_EC2_DefaultRole')

def test_iam_service_role_option(self):
job_flow = self.run_and_get_job_flow(
'--iam-service-role', 'EMR_EC2_DefaultRole')
'--iam-service-role', 'EMR_DefaultRole')
self.assertTrue(boto.connect_iam.called)

self.assertEqual(job_flow.servicerole, 'EMR_EC2_DefaultRole')
self.assertEqual(job_flow.servicerole, 'EMR_DefaultRole')

def test_both_iam_options(self):
job_flow = self.run_and_get_job_flow(
'--iam-instance-profile', 'EMR_DefaultRole',
'--iam-service-role', 'EMR_EC2_DefaultRole')
'--iam-instance-profile', 'EMR_EC2_DefaultRole',
'--iam-service-role', 'EMR_DefaultRole')

# users with limited access may not be able to connect to the IAM API.
# This gives them a plan B
self.assertFalse(boto.connect_iam.called)

self.assertEqual(job_flow.jobflowrole, 'EMR_DefaultRole')
self.assertEqual(job_flow.servicerole, 'EMR_EC2_DefaultRole')
self.assertEqual(job_flow.jobflowrole, 'EMR_EC2_DefaultRole')
self.assertEqual(job_flow.servicerole, 'EMR_DefaultRole')

def test_no_iam_access(self):
    """A 403 from the IAM connection makes the job flow use the
    EMR_EC2_DefaultRole / EMR_DefaultRole names."""
    # boto.connect_iam must already be patched before we rig it to fail
    self.assertIsInstance(boto.connect_iam, Mock)
    boto.connect_iam.side_effect = boto.exception.BotoServerError(
        403, 'Forbidden')

    # silence mrjob.emr's logger while the job flow is launched
    with logger_disabled('mrjob.emr'):
        launched = self.run_and_get_job_flow()

    self.assertTrue(boto.connect_iam.called)

    self.assertEqual(launched.jobflowrole, 'EMR_EC2_DefaultRole')
    self.assertEqual(launched.servicerole, 'EMR_DefaultRole')


class EMRAPIParamsTestCase(MockEMRAndS3TestCase):
Expand Down

0 comments on commit 6f55b7a

Please sign in to comment.