Merge branch 'docs' into cmd
Steve Johnson committed Aug 27, 2012
2 parents 5230ad1 + 3dc53d9 commit 457cd82
Showing 8 changed files with 93 additions and 45 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
@@ -27,6 +27,7 @@ v0.4, 2012-08-?? -- Slouching toward nirvana
* PROTOCOL_DICT
* IF_SUCCESSFUL
* DEFAULT_CLEANUP
* S3Filesystem.get_s3_folder_keys()

v0.3.5, 2012-08-21 -- The Last Ride of v0.3.x[?]
* EMR:
66 changes: 49 additions & 17 deletions docs/guides/configs-all-runners.rst
@@ -20,16 +20,26 @@ data files, etc. This section covers options available to all runners that
mrjob uses to upload files to your job's execution environments.


**bootstrap_mrjob** (:option:`--bootstrap-mrjob`, :option:`--bootstrap-mrjob`)
**bootstrap_mrjob** (:option:`--bootstrap-mrjob`, :option:`--no-bootstrap-mrjob`)
Should we automatically tar up the mrjob library and install it when we run
your job? Set this to ``False`` if you've already installed ``mrjob`` on your
Hadoop cluster or plan to install it by some other method.
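
For example, if ``mrjob`` is already on your cluster (a sketch following the
same pattern as the other options here; the lowercase ``false`` assumes
standard YAML booleans), in the config file::

bootstrap_mrjob: false

On the command line::

--no-bootstrap-mrjob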

**upload_files** (:option:`--file`)
A list of files to copy to the local directory of the mr_job script when it
runs. You can set the local name of the dir we unpack into by appending
Files to copy to the local directory of the mr_job script when it runs. You
can set the local name of the copy by appending ``#localname`` to the path;
otherwise we just use the name of the file.

In the config file::

upload_files:
- file_1.txt
- file_2.sqlite

On the command line::

--file file_1.txt --file file_2.sqlite
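
To rename the copy in the job's working directory, append ``#localname`` as
described above (a sketch; the file names are hypothetical)::

--file map_data.sqlite#data.db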

**upload_archives** (:option:`--archive`)

A list of archives (e.g. tarballs) to unpack in the local directory of the
@@ -54,12 +64,26 @@ Temp files and cleanup
List of which kinds of cleanup to perform when a job succeeds. Valid
choices are:

* ``ALL``: delete local scratch, remote scratch, and logs
* ``LOCAL_SCRATCH``: delete local scratch only
* ``LOGS``: delete logs only
* ``NONE``: delete nothing
* ``REMOTE_SCRATCH``: delete remote scratch only
* ``SCRATCH``: delete local and remote scratch, but not logs
* ``'ALL'``: delete local scratch, remote scratch, and logs; also stop the
  job flow if on EMR and the job is not done when cleanup runs.
* ``'LOCAL_SCRATCH'``: delete local scratch only
* ``'LOGS'``: delete logs only
* ``'NONE'``: delete nothing
* ``'REMOTE_SCRATCH'``: delete remote scratch only
* ``'SCRATCH'``: delete local and remote scratch, but not logs
* ``'JOB'``: stop the job if on EMR and the job is not done when cleanup runs
* ``'JOB_FLOW'``: terminate the job flow if on EMR and the job is not done
  when cleanup runs
* ``'IF_SUCCESSFUL'`` (deprecated): same as ``ALL``. Not supported for
``cleanup_on_failure``.

In the config file::

cleanup: [LOGS, JOB]

On the command line::

--cleanup=LOGS,JOB

**cleanup_on_failure** (:option:`--cleanup-on-failure`)
Which kinds of directories to clean up when a job fails. Valid choices are
@@ -82,7 +106,18 @@ Job execution context
---------------------

**cmdenv** (:option:`--cmdenv`)
Environment variables to pass to the job inside Hadoop streaming
Dictionary of environment variables to pass to the job inside Hadoop
streaming.

In the config file::

cmdenv:
PYTHONPATH: $HOME/stuff
TZ: America/Los_Angeles

On the command line::

--cmdenv PYTHONPATH=$HOME/stuff,TZ=America/Los_Angeles

**interpreter** (:option:`--interpreter`)
Interpreter to launch your script with. Defaults to the value of
@@ -108,6 +143,10 @@ Job execution context
locking to keep multiple mappers/reducers on the same node from running
*setup_scripts* simultaneously.

**steps_python_bin** (:option:`--steps-python-bin`)
Name/path of alternate python binary to use to query the job about its
steps (e.g. for use with :py:mod:`virtualenv`). Rarely needed. Defaults
to ``sys.executable`` (the current Python interpreter).
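
For example, to run the steps query under a virtualenv's interpreter (a
sketch; the path is hypothetical), in the config file::

steps_python_bin: /path/to/venv/bin/python

On the command line::

--steps-python-bin /path/to/venv/bin/python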

Other
-----
@@ -138,13 +177,6 @@ Other

python my_job.py -c left.conf --no-conf -c right.conf



**steps_python_bin** (:option:`--steps-python-bin`)
Name/path of alternate python binary to use to query the job about its
steps (e.g. for use with :py:mod:`virtualenv`). Rarely needed. Defaults
to ``sys.executable`` (the current Python interpreter).

Options ignored by the inline runner
------------------------------------

2 changes: 1 addition & 1 deletion docs/whats-new.rst
@@ -9,7 +9,7 @@ For a complete list of changes, see `CHANGES.txt

The *pool_wait_minutes* (:opt:`--pool-wait-minutes`) option lets your job wait
for a pooled job flow to become available instead of starting its own right
away. Reference:
:doc:`configs-reference`
:doc:`guides/configs-reference`
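
For example (a sketch; the five-minute wait is arbitrary, and
``--pool-emr-job-flows`` is the switch that enables pooling)::

python my_job.py -r emr --pool-emr-job-flows --pool-wait-minutes 5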

The ``JOB`` and ``JOB_FLOW`` cleanup options tell mrjob to clean up the job
and/or the job flow on failure (including Ctrl+C). See
6 changes: 5 additions & 1 deletion mrjob/emr.py
@@ -185,7 +185,10 @@
# The reason we don't just create a job flow and then query its Hadoop version
# is that for most jobs, we create the steps and the job flow at the same time.
AMI_VERSION_TO_HADOOP_VERSION = {
None: '0.18', # ami_version not specified means version 1.0
None: '0.20.205', # The default is 'latest' now, but you can still pass
# None in the runner kwargs, and that should default us
# to 'latest' in boto. But we really can't know, so
# caveat programmor.
'1.0': '0.18',
'2.0': '0.20.205',
'latest': '0.20.205',
@@ -412,6 +415,7 @@ def __init__(self, alias, opts, conf_path):
def default_options(self):
super_opts = super(EMRRunnerOptionStore, self).default_options()
return combine_dicts(super_opts, {
'ami_version': 'latest',
'check_emr_status_every': 30,
'ec2_core_instance_type': 'm1.small',
'ec2_master_instance_type': 'm1.small',
8 changes: 5 additions & 3 deletions mrjob/fs/s3.py
@@ -261,9 +261,11 @@ def get_s3_keys(self, uri, s3_conn=None):
yield key

def get_s3_folder_keys(self, uri, s3_conn=None):
"""Background: S3 is even less of a filesystem than HDFS in that it
doesn't have directories. EMR fakes directories by creating special
``*_$folder$`` keys in S3.
""".. deprecated:: 0.4.0
Background: EMR used to fake directories on S3 by creating special
``*_$folder$`` keys in S3. That is no longer true, so this method is
deprecated.
For example if your job outputs ``s3://walrus/tmp/output/part-00000``,
EMR will also create these keys:
3 changes: 2 additions & 1 deletion mrjob/options.py
@@ -252,7 +252,8 @@ def add_emr_opts(opt_group):
opt_group.add_option(
'--ami-version', dest='ami_version', default=None,
help=(
'AMI Version to use (currently 1.0, 2.0, or latest).')),
'AMI Version to use (currently 1.0, 2.0, or latest, default'
' latest).')),

opt_group.add_option(
'--aws-availability-zone', dest='aws_availability_zone',
19 changes: 11 additions & 8 deletions mrjob/runner.py
@@ -20,6 +20,7 @@
import getpass
import logging
import os
import pprint
import random
import re
import shutil
@@ -82,11 +83,6 @@
CLEANUP_CHOICES = ['ALL', 'LOCAL_SCRATCH', 'LOGS', 'NONE', 'REMOTE_SCRATCH',
'SCRATCH', 'JOB', 'IF_SUCCESSFUL', 'JOB_FLOW']
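# Note: 'JOB' stops the job and 'JOB_FLOW' terminates the job flow on EMR if
# the job isn't done when cleanup runs; 'IF_SUCCESSFUL' is deprecated ('ALL').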

#: .. deprecated:: 0.3.0
#:
#: the default cleanup-on-success option: ``'IF_SUCCESSFUL'``
CLEANUP_DEFAULT = 'IF_SUCCESSFUL'

_STEP_RE = re.compile(r'^M?C?R?$')

# buffer for piping files into sort on Windows
@@ -171,6 +167,9 @@ def __init__(self, alias, opts, conf_paths):

self._fix_interp_options()

log.debug('Active configuration:')
log.debug(pprint.pformat(self))

def default_options(self):
super_opts = super(RunnerOptionStore, self).default_options()

@@ -186,8 +185,6 @@ def default_options(self):
'cleanup_on_failure': ['NONE'],
'hadoop_version': '0.20',
'owner': owner,
'python_bin': ['python'],
'steps_python_bin': [sys.executable or 'python'],
})

def _validate_cleanup(self):
Expand Down Expand Up @@ -217,7 +214,13 @@ def validate_cleanup(error_str, opt_list):

def _fix_interp_options(self):
if not self['steps_python_bin']:
self['steps_python_bin'] = self['python_bin']
self['steps_python_bin'] = (
self['python_bin'] or
sys.executable or
['python'])

if not self['python_bin']:
self['python_bin'] = ['python']

if not self['steps_interpreter']:
if self['interpreter']:
33 changes: 19 additions & 14 deletions tests/test_emr.py
@@ -412,7 +412,7 @@ def test_args_version_018(self):
stdin = StringIO('foo\nbar\n')

mr_job = MRTwoStepJob(['-r', 'emr', '-v',
'--hadoop-version=0.18'])
'--hadoop-version=0.18', '--ami-version=1.0'])
mr_job.sandbox(stdin=stdin)

with mr_job.make_runner() as runner:
@@ -593,18 +593,20 @@ def run_and_get_job_flow(self, *args):
return emr_conn.describe_jobflow(runner.get_emr_job_flow_id())

def test_defaults(self):
job_flow = self.run_and_get_job_flow()
self.assertFalse(hasattr(job_flow, 'amiversion'))
self.assertEqual(job_flow.hadoopversion, '0.20')
job_flow = self.run_and_get_job_flow('--ami-version=1.0')
self.assertEqual(job_flow.amiversion, '1.0')
self.assertEqual(job_flow.hadoopversion, '0.18')

def test_hadoop_version_0_18(self):
job_flow = self.run_and_get_job_flow('--hadoop-version', '0.18')
self.assertFalse(hasattr(job_flow, 'amiversion'))
job_flow = self.run_and_get_job_flow(
'--hadoop-version=0.18', '--ami-version=1.0')
self.assertEqual(job_flow.amiversion, '1.0')
self.assertEqual(job_flow.hadoopversion, '0.18')

def test_hadoop_version_0_20(self):
job_flow = self.run_and_get_job_flow('--hadoop-version', '0.20')
self.assertFalse(hasattr(job_flow, 'amiversion'))
job_flow = self.run_and_get_job_flow(
'--hadoop-version=0.20', '--ami-version=1.0')
self.assertEqual(job_flow.amiversion, '1.0')
self.assertEqual(job_flow.hadoopversion, '0.20')

def test_bad_hadoop_version(self):
@@ -2040,26 +2042,29 @@ def test_join_named_pool(self):
'--pool-name', 'pool1'])

def test_pooling_with_hadoop_version(self):
_, job_flow_id = self.make_pooled_job_flow(hadoop_version='0.18')
_, job_flow_id = self.make_pooled_job_flow(
ami_version='1.0', hadoop_version='0.18')

self.assertJoins(job_flow_id, [
'-r', 'emr', '-v', '--pool-emr-job-flows',
'--hadoop-version', '0.18'])
'--hadoop-version', '0.18', '--ami-version', '1.0'])

def test_dont_join_pool_with_wrong_hadoop_version(self):
_, job_flow_id = self.make_pooled_job_flow(hadoop_version='0.18')
_, job_flow_id = self.make_pooled_job_flow(
ami_version='1.0', hadoop_version='0.18')

self.assertDoesNotJoin(job_flow_id, [
'-r', 'emr', '-v', '--pool-emr-job-flows',
'--hadoop-version', '0.20'])
'--hadoop-version', '0.20', '--ami-version', '1.0'])

def test_join_anyway_if_i_say_so(self):
_, job_flow_id = self.make_pooled_job_flow(hadoop_version='0.18')
_, job_flow_id = self.make_pooled_job_flow(
ami_version='1.0', hadoop_version='0.18')

self.assertJoins(job_flow_id, [
'-r', 'emr', '-v', '--pool-emr-job-flows',
'--emr-job-flow-id', job_flow_id,
'--hadoop-version', '0.20'])
'--hadoop-version', '0.20', '--ami-version', '1.0'])

def test_pooling_with_ami_version(self):
_, job_flow_id = self.make_pooled_job_flow(ami_version='2.0')
