Skip to content

Commit

Permalink
Merge branch 'master' of github.com:Yelp/mrjob
Browse files Browse the repository at this point in the history
  • Loading branch information
Dave Marin committed Oct 7, 2017
2 parents 977c0df + 8b7d4dc commit afc0d51
Show file tree
Hide file tree
Showing 9 changed files with 26 additions and 72 deletions.
1 change: 0 additions & 1 deletion mrjob/bin.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,6 @@ def _spark_script_args(self, step_num):
step = self._get_step(step_num)

if step['type'] == 'spark':
# TODO v0.6.0: add in passthrough options
args = (
[
'--step-num=%d' % step_num,
Expand Down
5 changes: 4 additions & 1 deletion mrjob/emr.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,10 +392,13 @@ def __init__(self, **kwargs):
self._output_dir = self._cloud_tmp_dir + 'output/'

# check AMI version
# TODO v0.6.0: warn about AMIs that only have Python 2.6
if self._opts['image_version'].startswith('1.'):
log.warning('1.x AMIs will probably not work because they use'
' Python 2.5. Use a later AMI version or mrjob v0.4.2')
elif not version_gte(self._opts['image_version'], '2.4.3'):
log.warning("AMIs prior to 2.4.3 probably will not work because"
" they don't support Python 2.7. Use a later AMI"
" version or mrjob v0.5.11")

if self._opts['emr_api_params'] is not None:
log.warning('emr_api_params is deprecated and does nothing.'
Expand Down
40 changes: 7 additions & 33 deletions mrjob/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def __init__(self, args=None):

@classmethod
def _usage(cls):
return "usage: %(prog)s [options]"
return "usage: %(prog)s [options] [input files]"

### Defining one-step streaming jobs ###

Expand Down Expand Up @@ -1097,40 +1097,14 @@ def jobconf(self):
return mrjob.conf.combine_dicts(orig_jobconf, custom_jobconf)
"""

# deal with various forms of bad behavior by users
# combine job and runner jobconf
unfiltered_jobconf = combine_dicts(self.JOBCONF, self.options.jobconf)
filtered_jobconf = {}

def format_hadoop_version(v_float):
if v_float >= 1.0:
# e.g. 1.0
return '%.1f' % v_float
else:
# e.g. 0.20
return '%.2f' % v_float

for key in unfiltered_jobconf:
unfiltered_val = unfiltered_jobconf[key]
filtered_val = unfiltered_val

# boolean values need to be lowercased
if isinstance(unfiltered_val, bool):
if unfiltered_val:
filtered_val = 'true'
else:
filtered_val = 'false'

# TODO v0.6.0: why would a jobconf variable be named
# 'hadoop_version'? hadoop_version should be a string
elif (key == 'hadoop_version' and
isinstance(unfiltered_val, float)):
log.warning('hadoop_version should be a string, not %s' %
unfiltered_val)
filtered_val = format_hadoop_version(unfiltered_val)
filtered_jobconf[key] = filtered_val

return filtered_jobconf
# turn booleans into the Java equivalent ("false", not "False")
return {
k: json.dumps(v) if not isinstance(v, string_types) else v
for k, v in unfiltered_jobconf.items() if v is not None
}

### Secondary Sort ###

Expand Down
8 changes: 4 additions & 4 deletions mrjob/launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def error(msg):
def _usage(cls):
"""Command line usage string for this class"""
return ("usage: mrjob run [script path|executable path|--help]"
" [options]")
" [options] [input files]")

def _print_help(self, options):
"""Print help for this job. This will either print runner
Expand Down Expand Up @@ -365,9 +365,9 @@ def configure_args(self):
def pass_arg_through(self, opt_str):
"""Pass the given argument through to the job."""

# _get_optional_actions() is hidden but the interface appears
# to be stable, and theres no non-hidden interface
for action in self.arg_parser._get_optional_actions():
# _actions is hidden but the interface appears to be stable,
# and there's no non-hidden interface we can use
for action in self.arg_parser._actions:
if opt_str in action.option_strings or opt_str == action.dest:
self._passthru_arg_dests.add(action.dest)
break
Expand Down
14 changes: 6 additions & 8 deletions mrjob/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -1321,7 +1321,7 @@ def _print_basic_help(option_parser, usage, include_deprecated=False):
"""
help_parser = ArgumentParser(usage=usage, add_help=False)

for action in option_parser._get_optional_actions():
for action in option_parser._actions:
if action.dest in _RUNNER_OPTS:
continue

Expand All @@ -1332,10 +1332,10 @@ def _print_basic_help(option_parser, usage, include_deprecated=False):
not include_deprecated):
continue

help_parser.add_argument(
*(action.option_strings),
dest=action.dest,
help=action.help)
if not action.option_strings:
continue

help_parser._add_action(action)

help_parser.print_help()

Expand Down Expand Up @@ -1370,9 +1370,7 @@ def error(msg):
raw_parser = ArgumentParser(add_help=False)
raw_parser.error = error

actions = parser._get_optional_actions() + parser._get_positional_actions()

for action in actions:
for action in parser._actions:
# single args become single item lists
nargs = 1 if action.nargs is None else action.nargs

Expand Down
1 change: 1 addition & 0 deletions mrjob/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,7 @@ def __exit__(self, type, value, traceback):

def get_opts(self):
"""Get options set for this runner, as a dict."""
log.warning('get_opts() is deprecated and will be removed in v0.7.0')
return copy.deepcopy(self._opts)

def get_job_key(self):
Expand Down
4 changes: 2 additions & 2 deletions tests/test_bin.py
Original file line number Diff line number Diff line change
Expand Up @@ -911,7 +911,7 @@ def test_job_can_override_jobconf(self):
runner._jobconf_for_step(0), {
'mapreduce.partition.keycomparator.options': '-k1 -k2nr',
'mapreduce.partition.keypartitioner.options': '-k1,1',
'stream.num.map.output.key.fields': 3,
'stream.num.map.output.key.fields': '3',
}
)

Expand All @@ -927,7 +927,7 @@ def test_steps_can_override_jobconf(self):
'org.apache.hadoop.mapred.lib.KeyFieldBasedComparator',
'mapreduce.partition.keycomparator.options': '-k1 -k2nr',
'mapreduce.partition.keypartitioner.options': '-k1,1',
'stream.num.map.output.key.fields': 3,
'stream.num.map.output.key.fields': '3',
}
)

Expand Down
23 changes: 1 addition & 22 deletions tests/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -613,12 +613,6 @@ class MRBoolJobConfJob(MRJob):
JOBCONF = {'true_value': True,
'false_value': False}

class MRHadoopVersionJobConfJob1(MRJob):
JOBCONF = {'hadoop_version': 1.0}

class MRHadoopVersionJobConfJob2(MRJob):
JOBCONF = {'hadoop_version': 0.20}

def test_empty(self):
mr_job = MRJob()

Expand All @@ -640,21 +634,6 @@ def test_bool_options(self):
self.assertEqual(mr_job.jobconf()['true_value'], 'true')
self.assertEqual(mr_job.jobconf()['false_value'], 'false')

def assert_hadoop_version(self, JobClass, version_string):
mr_job = JobClass()
mock_log = StringIO()
with no_handlers_for_logger('mrjob.job'):
log_to_stream('mrjob.job', mock_log)
self.assertEqual(mr_job.jobconf()['hadoop_version'],
version_string)
self.assertIn('should be a string', mock_log.getvalue())

def test_float_options(self):
self.assert_hadoop_version(self.MRHadoopVersionJobConfJob1, '1.0')

def test_float_options_2(self):
self.assert_hadoop_version(self.MRHadoopVersionJobConfJob2, '0.20')

def test_jobconf_method(self):
mr_job = self.MRJobConfJob()

Expand Down Expand Up @@ -1250,7 +1229,7 @@ def test_basic_help_deprecated(self):
# currently there are no deprecated options to test against
#self.assertIn('--partitioner', output)
self.assertNotIn('add --deprecated', output)
self.assertIn('--deprecated DEPRECATED', output)
self.assertIn('--deprecated', output)

def test_runner_help(self):
MRJob(['--help', '-r', 'emr'])
Expand Down
2 changes: 1 addition & 1 deletion tests/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ def test_can_turn_off_bootstrap_mrjob(self):

with mr_job.make_runner() as runner:
# sanity check
self.assertEqual(runner.get_opts()['bootstrap_mrjob'], False)
self.assertEqual(runner._opts['bootstrap_mrjob'], False)
local_tmp_dir = os.path.realpath(runner._get_local_tmp_dir())
try:
with no_handlers_for_logger():
Expand Down

0 comments on commit afc0d51

Please sign in to comment.