Added sphinx docs for PyPi. Some renames and hiding of methods so that the docs would make more sense.
commit bd46f0a0210d753dacd3576150f6be2deb72d52e 1 parent ea048f9
David Marin authored
Showing with 2,290 additions and 899 deletions.
  1. +71 −0 README.md
  2. +0 −70 README.txt
  3. +3 −0  docs/.gitignore
  4. +130 −0 docs/Makefile
  5. +216 −0 docs/conf.py
  6. +25 −0 docs/configs-conf.rst
  7. +88 −0 docs/configs-reference.rst
  8. +32 −0 docs/configs-runners.rst
  9. +8 −0 docs/configs.rst
  10. +22 −0 docs/index.rst
  11. +42 −0 docs/job-advanced.rst
  12. +33 −0 docs/job-basic.rst
  13. +8 −0 docs/job-testing.rst
  14. +9 −0 docs/job.rst
  15. +13 −0 docs/library.rst
  16. +155 −0 docs/make.bat
  17. +17 −0 docs/protocols.rst
  18. +22 −0 docs/runners-emr.rst
  19. +12 −0 docs/runners-hadoop.rst
  20. +7 −0 docs/runners-local.rst
  21. +38 −0 docs/runners-runner.rst
  22. +10 −0 docs/runners.rst
  23. +15 −0 docs/tools.rst
  24. +5 −0 docs/utils-parse.rst
  25. +5 −0 docs/utils-retry.rst
  26. +5 −0 docs/utils-util.rst
  27. +8 −0 docs/utils.rst
  28. +4 −0 docs/writing-and-running.rst
  29. +1 −0  examples
  30. +1 −1  mrjob.conf.example
  31. +3 −2 mrjob/botoemr/step.py
  32. +173 −81 mrjob/conf.py
  33. +129 −124 mrjob/emr.py
  34. +42 −38 mrjob/hadoop.py
  35. +614 −327 mrjob/job.py
  36. +15 −14 mrjob/local.py
  37. +31 −32 mrjob/parse.py
  38. +71 −31 mrjob/protocol.py
  39. +8 −10 mrjob/retry.py
  40. +125 −128 mrjob/runner.py
  41. +9 −2 mrjob/tools/emr/create_job_flow.py
  42. +3 −1 mrjob/tools/emr/terminate_idle_job_flows.py
  43. +3 −1 mrjob/tools/emr/terminate_job_flow.py
  44. +31 −26 mrjob/util.py
  45. +8 −8 setup.py
  46. +1 −0  tests/conf_test.py
  47. +2 −2 tests/emr_test.py
  48. +1 −1  tests/hadoop_test.py
  49. +16 −0 tests/job_test.py
71 README.md
@@ -0,0 +1,71 @@
+mrjob
+=====
+
+mrjob is a Python package that helps you write and run Hadoop Streaming jobs.
+
+mrjob fully supports Amazon's Elastic MapReduce (EMR) service, which allows you to buy time on a Hadoop cluster on an hourly basis. It also works with your own Hadoop cluster.
+
+Some important features:
+
+ * Run jobs on EMR, your own Hadoop cluster, or locally (for testing).
+ * Write multi-step jobs (one map-reduce step feeds into the next)
+ * Duplicate your production environment inside Hadoop
+ * Upload your source tree and put it in your job's `$PYTHONPATH`
+ * Run make and other setup scripts
+ * Set environment variables (e.g. `$TZ`)
+ * Easily install python packages from tarballs (EMR only)
+ * Setup handled transparently by `mrjob.conf` config file
+ * Automatically interpret error logs from EMR
+ * SSH tunnel to the Hadoop job tracker on EMR
+ * Zero setup on Hadoop (no need to install mrjob on your Hadoop cluster)
+
+Installation
+============
+`python setup.py install`
+
+Works out of the box with your Hadoop cluster (just set `$HADOOP_HOME`)
+
+Minimal EMR setup:
+
+ * create an Amazon Web Services account: <http://aws.amazon.com/>
+ * sign up for Elastic MapReduce: <http://aws.amazon.com/elasticmapreduce/>
+ * Get your access and secret keys (go to <http://aws.amazon.com/account/> and
+ click on "Security Credentials") and set the environment variables
+ `$AWS_ACCESS_KEY_ID` and `$AWS_SECRET_ACCESS_KEY` accordingly
+ * create at least one S3 bucket in the "US Standard" region to use for logs
+ and scratch space: <https://console.aws.amazon.com/s3/home>
+
+mrjob will work in other AWS regions (e.g. Asia), but you'll have to set up
+`mrjob.conf`. See below.
+
+
+Try it out!
+===========
+ # locally
+ python mrjob/examples/mr_word_freq_count.py README.txt > counts
+ # on EMR
+ python mrjob/examples/mr_word_freq_count.py README.txt -r emr > counts
+ # on your Hadoop cluster
+ python mrjob/examples/mr_word_freq_count.py README.txt -r hadoop > counts
+
+
+Advanced Configuration
+======================
+To run in other AWS regions, upload your source tree, run `make`, and use
+other advanced mrjob features, you'll need to set up `mrjob.conf`. mrjob looks
+for its conf file in:
+
+ * `~/.mrjob`
+ * `mrjob.conf` anywhere in your `$PYTHONPATH`
+ * `/etc/mrjob.conf`
+
+See `mrjob.conf.example` for more information.
+
+
+Links
+=====
+
+ * source: <http://github.com/Yelp/mrjob>
+ * documentation: <http://packages.python.org/mrjob/>
+ * Hadoop MapReduce: <http://hadoop.apache.org/mapreduce/>
+ * Elastic MapReduce: <http://aws.amazon.com/documentation/elasticmapreduce/>
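
The README points at the bundled `mrjob/examples/mr_word_freq_count.py` script without showing what a job looks like. A minimal job along the same lines (a sketch written against the 0.1.0 API documented elsewhere in this commit, not code taken from the diff) would be:

    from mrjob.job import MRJob


    class MRWordFreqCount(MRJob):

        def mapper(self, key, line):
            # with the default input protocol, key is None and line is raw text
            for word in line.split():
                yield word.lower(), 1

        def reducer(self, word, counts):
            yield word, sum(counts)


    if __name__ == '__main__':
        MRWordFreqCount.run()

Saved as e.g. `mr_word_freq_count.py`, it runs locally, on EMR, or on your Hadoop cluster exactly as shown in the "Try it out!" section above.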
70 README.txt
@@ -1,70 +0,0 @@
-=====
-mrjob
-=====
-
-mrjob is a Python package that helps you write and run Hadoop Streaming jobs.
-
-mrjob fully supports Amazon's Elastic MapReduce (EMR) service, which allows you to buy time on a Hadoop cluster on an hourly basis. It also works with your own Hadoop cluster.
-
-Some important features:
-* Run jobs on EMR, your own Hadoop cluster, or locally (for testing).
-* Write multi-step jobs (one map-reduce step feeds into the next)
-* Duplicate your production environment inside Hadoop
- * Upload your source tree and put it in your job's PYTHONPATH
- * Run make and other setup scripts
- * Set environment variables (e.g. $TZ)
- * Easily install python packages from tarballs (EMR only)
- * Setup handled transparently by mrjob.conf config file
-* Automatically interpret error logs from EMR
-* SSH tunnel to hadoop job tracker on EMR
-* Zero setup on Hadoop (no need to install mrjob on your Hadoop cluster)
-
-
-Installation
-============
-* python setup.py install
-
-Works out-of-the box with your hadoop cluster (just set $HADOOP_HOME)
-
-Minimal EMR setup:
-* create an Amazon Web Services account (http://aws.amazon.com/)
-* sign up for Elastic MapReduce (http://aws.amazon.com/elasticmapreduce/)
-* Get your access and secret keys (go to http://aws.amazon.com/account/ and
- click on "Security Credentials") and set the environment variables
- AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY accordingly
-* create at least one S3 bucket in the "US Standard" region to use for logs
- and scratch space (https://console.aws.amazon.com/s3/home)
-
-mrjob will work in other AWS regions (e.g. Asia), but you'll have to set up mrjob.conf. See below.
-
-
-Try it out!
-===========
-Locally:
-python mrjob/examples/mr_word_freq_count.py README.txt > counts
-
-On EMR:
-python mrjob/examples/mr_word_freq_count.py README.txt --runner emr > counts
-
-On your Hadoop cluster:
-python mrjob/examples/mr_word_freq_count.py README.txt --runner hadoop > counts
-
-
-Advanced Configuration
-======================
-To run in other AWS regions, upload your source tree, run make, and use other advanced mrjob features, you'll need to set up mrjob.conf. mrjob looks for its
-conf file in:
-* ~/.mrjob
-* mrjob.conf anywhere in your $PYTHONPATH
-* /etc/mrjob.conf
-
-See mrjob.conf.example for more information.
-
-
-Links
-=====
-source: http://github.com/Yelp/mrjob
-documentation: http://packages.python.org/mrjob/
-Hadoop MapReduce: http://hadoop.apache.org/mapreduce/
-Elastic MapReduce: http://aws.amazon.com/documentation/elasticmapreduce/
-
3  docs/.gitignore
@@ -0,0 +1,3 @@
+_build
+_static
+_templates
130 docs/Makefile
@@ -0,0 +1,130 @@
+# Makefile for Sphinx documentation
+#
+
+# You can set these variables from the command line.
+SPHINXOPTS =
+SPHINXBUILD = sphinx-build
+PAPER =
+BUILDDIR = _build
+
+# Internal variables.
+PAPEROPT_a4 = -D latex_paper_size=a4
+PAPEROPT_letter = -D latex_paper_size=letter
+ALLSPHINXOPTS = -d $(BUILDDIR)/doctrees $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) .
+
+.PHONY: help clean html dirhtml singlehtml pickle json htmlhelp qthelp devhelp epub latex latexpdf text man changes linkcheck doctest
+
+help:
+ @echo "Please use \`make <target>' where <target> is one of"
+ @echo " html to make standalone HTML files"
+ @echo " dirhtml to make HTML files named index.html in directories"
+ @echo " singlehtml to make a single large HTML file"
+ @echo " pickle to make pickle files"
+ @echo " json to make JSON files"
+ @echo " htmlhelp to make HTML files and a HTML help project"
+ @echo " qthelp to make HTML files and a qthelp project"
+ @echo " devhelp to make HTML files and a Devhelp project"
+ @echo " epub to make an epub"
+ @echo " latex to make LaTeX files, you can set PAPER=a4 or PAPER=letter"
+ @echo " latexpdf to make LaTeX files and run them through pdflatex"
+ @echo " text to make text files"
+ @echo " man to make manual pages"
+ @echo " changes to make an overview of all changed/added/deprecated items"
+ @echo " linkcheck to check all external links for integrity"
+ @echo " doctest to run all doctests embedded in the documentation (if enabled)"
+
+clean:
+ -rm -rf $(BUILDDIR)/*
+
+html:
+ $(SPHINXBUILD) -b html $(ALLSPHINXOPTS) $(BUILDDIR)/html
+ @echo
+ @echo "Build finished. The HTML pages are in $(BUILDDIR)/html."
+
+dirhtml:
+ $(SPHINXBUILD) -b dirhtml $(ALLSPHINXOPTS) $(BUILDDIR)/dirhtml
+ @echo
+ @echo "Build finished. The HTML pages are in $(BUILDDIR)/dirhtml."
+
+singlehtml:
+ $(SPHINXBUILD) -b singlehtml $(ALLSPHINXOPTS) $(BUILDDIR)/singlehtml
+ @echo
+ @echo "Build finished. The HTML page is in $(BUILDDIR)/singlehtml."
+
+pickle:
+ $(SPHINXBUILD) -b pickle $(ALLSPHINXOPTS) $(BUILDDIR)/pickle
+ @echo
+ @echo "Build finished; now you can process the pickle files."
+
+json:
+ $(SPHINXBUILD) -b json $(ALLSPHINXOPTS) $(BUILDDIR)/json
+ @echo
+ @echo "Build finished; now you can process the JSON files."
+
+htmlhelp:
+ $(SPHINXBUILD) -b htmlhelp $(ALLSPHINXOPTS) $(BUILDDIR)/htmlhelp
+ @echo
+ @echo "Build finished; now you can run HTML Help Workshop with the" \
+ ".hhp project file in $(BUILDDIR)/htmlhelp."
+
+qthelp:
+ $(SPHINXBUILD) -b qthelp $(ALLSPHINXOPTS) $(BUILDDIR)/qthelp
+ @echo
+ @echo "Build finished; now you can run "qcollectiongenerator" with the" \
+ ".qhcp project file in $(BUILDDIR)/qthelp, like this:"
+ @echo "# qcollectiongenerator $(BUILDDIR)/qthelp/mrjob.qhcp"
+ @echo "To view the help file:"
+ @echo "# assistant -collectionFile $(BUILDDIR)/qthelp/mrjob.qhc"
+
+devhelp:
+ $(SPHINXBUILD) -b devhelp $(ALLSPHINXOPTS) $(BUILDDIR)/devhelp
+ @echo
+ @echo "Build finished."
+ @echo "To view the help file:"
+ @echo "# mkdir -p $$HOME/.local/share/devhelp/mrjob"
+ @echo "# ln -s $(BUILDDIR)/devhelp $$HOME/.local/share/devhelp/mrjob"
+ @echo "# devhelp"
+
+epub:
+ $(SPHINXBUILD) -b epub $(ALLSPHINXOPTS) $(BUILDDIR)/epub
+ @echo
+ @echo "Build finished. The epub file is in $(BUILDDIR)/epub."
+
+latex:
+ $(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex
+ @echo
+ @echo "Build finished; the LaTeX files are in $(BUILDDIR)/latex."
+ @echo "Run \`make' in that directory to run these through (pdf)latex" \
+ "(use \`make latexpdf' here to do that automatically)."
+
+latexpdf:
+ $(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex
+ @echo "Running LaTeX files through pdflatex..."
+ make -C $(BUILDDIR)/latex all-pdf
+ @echo "pdflatex finished; the PDF files are in $(BUILDDIR)/latex."
+
+text:
+ $(SPHINXBUILD) -b text $(ALLSPHINXOPTS) $(BUILDDIR)/text
+ @echo
+ @echo "Build finished. The text files are in $(BUILDDIR)/text."
+
+man:
+ $(SPHINXBUILD) -b man $(ALLSPHINXOPTS) $(BUILDDIR)/man
+ @echo
+ @echo "Build finished. The manual pages are in $(BUILDDIR)/man."
+
+changes:
+ $(SPHINXBUILD) -b changes $(ALLSPHINXOPTS) $(BUILDDIR)/changes
+ @echo
+ @echo "The overview file is in $(BUILDDIR)/changes."
+
+linkcheck:
+ $(SPHINXBUILD) -b linkcheck $(ALLSPHINXOPTS) $(BUILDDIR)/linkcheck
+ @echo
+ @echo "Link check complete; look for any errors in the above output " \
+ "or in $(BUILDDIR)/linkcheck/output.txt."
+
+doctest:
+ $(SPHINXBUILD) -b doctest $(ALLSPHINXOPTS) $(BUILDDIR)/doctest
+ @echo "Testing of doctests in the sources finished, look at the " \
+ "results in $(BUILDDIR)/doctest/output.txt."
216 docs/conf.py
@@ -0,0 +1,216 @@
+# -*- coding: utf-8 -*-
+#
+# mrjob documentation build configuration file, created by
+# sphinx-quickstart on Thu Oct 14 10:13:34 2010.
+#
+# This file is execfile()d with the current directory set to its containing dir.
+#
+# Note that not all possible configuration values are present in this
+# autogenerated file.
+#
+# All configuration values have a default; values that are commented out
+# serve to show the default.
+
+import sys, os
+
+# If extensions (or modules to document with autodoc) are in another directory,
+# add these directories to sys.path here. If the directory is relative to the
+# documentation root, use os.path.abspath to make it absolute, like shown here.
+sys.path.insert(0, os.path.abspath('..'))
+
+# -- General configuration -----------------------------------------------------
+
+# If your documentation needs a minimal Sphinx version, state it here.
+#needs_sphinx = '1.0'
+
+# Add any Sphinx extension module names here, as strings. They can be extensions
+# coming with Sphinx (named 'sphinx.ext.*') or your custom ones.
+extensions = ['sphinx.ext.autodoc', 'sphinx.ext.coverage']
+
+# Add any paths that contain templates here, relative to this directory.
+templates_path = ['_templates']
+
+# The suffix of source filenames.
+source_suffix = '.rst'
+
+# The encoding of source files.
+#source_encoding = 'utf-8-sig'
+
+# The master toctree document.
+master_doc = 'index'
+
+# General information about the project.
+project = u'mrjob'
+copyright = u'2010, Yelp'
+
+# The version info for the project you're documenting, acts as replacement for
+# |version| and |release|, also used in various other places throughout the
+# built documents.
+#
+# The short X.Y version.
+version = '0.1.0'
+# The full version, including alpha/beta/rc tags.
+release = '0.1.0-pre1'
+
+# The language for content autogenerated by Sphinx. Refer to documentation
+# for a list of supported languages.
+#language = None
+
+# There are two options for replacing |today|: either, you set today to some
+# non-false value, then it is used:
+#today = ''
+# Else, today_fmt is used as the format for a strftime call.
+#today_fmt = '%B %d, %Y'
+
+# List of patterns, relative to source directory, that match files and
+# directories to ignore when looking for source files.
+exclude_patterns = ['_build']
+
+# The reST default role (used for this markup: `text`) to use for all documents.
+#default_role = None
+
+# If true, '()' will be appended to :func: etc. cross-reference text.
+#add_function_parentheses = True
+
+# If true, the current module name will be prepended to all description
+# unit titles (such as .. function::).
+#add_module_names = True
+
+# If true, sectionauthor and moduleauthor directives will be shown in the
+# output. They are ignored by default.
+#show_authors = False
+
+# The name of the Pygments (syntax highlighting) style to use.
+pygments_style = 'sphinx'
+
+# A list of ignored prefixes for module index sorting.
+#modindex_common_prefix = []
+
+
+# -- Options for HTML output ---------------------------------------------------
+
+# The theme to use for HTML and HTML Help pages. See the documentation for
+# a list of builtin themes.
+html_theme = 'default'
+
+# Theme options are theme-specific and customize the look and feel of a theme
+# further. For a list of options available for each theme, see the
+# documentation.
+#html_theme_options = {}
+
+# Add any paths that contain custom themes here, relative to this directory.
+#html_theme_path = []
+
+# The name for this set of Sphinx documents. If None, it defaults to
+# "<project> v<release> documentation".
+#html_title = None
+
+# A shorter title for the navigation bar. Default is the same as html_title.
+#html_short_title = None
+
+# The name of an image file (relative to this directory) to place at the top
+# of the sidebar.
+#html_logo = None
+
+# The name of an image file (within the static path) to use as favicon of the
+# docs. This file should be a Windows icon file (.ico) being 16x16 or 32x32
+# pixels large.
+#html_favicon = None
+
+# Add any paths that contain custom static files (such as style sheets) here,
+# relative to this directory. They are copied after the builtin static files,
+# so a file named "default.css" will overwrite the builtin "default.css".
+html_static_path = ['_static']
+
+# If not '', a 'Last updated on:' timestamp is inserted at every page bottom,
+# using the given strftime format.
+#html_last_updated_fmt = '%b %d, %Y'
+
+# If true, SmartyPants will be used to convert quotes and dashes to
+# typographically correct entities.
+#html_use_smartypants = True
+
+# Custom sidebar templates, maps document names to template names.
+#html_sidebars = {}
+
+# Additional templates that should be rendered to pages, maps page names to
+# template names.
+#html_additional_pages = {}
+
+# If false, no module index is generated.
+#html_domain_indices = True
+
+# If false, no index is generated.
+#html_use_index = True
+
+# If true, the index is split into individual pages for each letter.
+#html_split_index = False
+
+# If true, links to the reST sources are added to the pages.
+html_show_sourcelink = True
+
+# If true, "Created using Sphinx" is shown in the HTML footer. Default is True.
+#html_show_sphinx = True
+
+# If true, "(C) Copyright ..." is shown in the HTML footer. Default is True.
+#html_show_copyright = True
+
+# If true, an OpenSearch description file will be output, and all pages will
+# contain a <link> tag referring to it. The value of this option must be the
+# base URL from which the finished HTML is served.
+#html_use_opensearch = ''
+
+# This is the file name suffix for HTML files (e.g. ".xhtml").
+#html_file_suffix = None
+
+# Output file base name for HTML help builder.
+htmlhelp_basename = 'mrjobdoc'
+
+
+# -- Options for LaTeX output --------------------------------------------------
+
+# The paper size ('letter' or 'a4').
+#latex_paper_size = 'letter'
+
+# The font size ('10pt', '11pt' or '12pt').
+#latex_font_size = '10pt'
+
+# Grouping the document tree into LaTeX files. List of tuples
+# (source start file, target name, title, author, documentclass [howto/manual]).
+latex_documents = [
+ ('index', 'mrjob.tex', u'mrjob Documentation',
+ u'David Marin', 'manual'),
+]
+
+# The name of an image file (relative to this directory) to place at the top of
+# the title page.
+#latex_logo = None
+
+# For "manual" documents, if this is true, then toplevel headings are parts,
+# not chapters.
+#latex_use_parts = False
+
+# If true, show page references after internal links.
+#latex_show_pagerefs = False
+
+# If true, show URL addresses after external links.
+#latex_show_urls = False
+
+# Additional stuff for the LaTeX preamble.
+#latex_preamble = ''
+
+# Documents to append as an appendix to all manuals.
+#latex_appendices = []
+
+# If false, no module index is generated.
+#latex_domain_indices = True
+
+
+# -- Options for manual page output --------------------------------------------
+
+# One entry per manual page. List of tuples
+# (source start file, name, description, authors, manual section).
+man_pages = [
+ ('index', 'mrjob', u'mrjob Documentation',
+ [u'David Marin'], 1)
+]
25 docs/configs-conf.rst
@@ -0,0 +1,25 @@
+mrjob.conf - parse and write config files
+=========================================
+
+.. automodule:: mrjob.conf
+
+Reading and writing mrjob.conf
+------------------------------
+
+.. autofunction:: find_mrjob_conf
+.. autofunction:: load_mrjob_conf
+.. autofunction:: load_opts_from_mrjob_conf
+
+Combining options
+-----------------
+
+Combiner functions take a list of values to combine, with later values taking precedence over earlier ones. ``None`` values are always ignored.
+
+.. autofunction:: combine_values
+.. autofunction:: combine_lists
+.. autofunction:: combine_dicts
+.. autofunction:: combine_envs
+.. autofunction:: combine_paths
+.. autofunction:: combine_path_lists
+
+
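
As a rough illustration of how these combiners behave (a hypothetical snippet based on the docstrings in `mrjob/conf.py` later in this diff; later values win, and `*PATH`-style environment variables are prepended rather than replaced):

    from mrjob.conf import combine_values, combine_dicts, combine_envs

    combine_values(1, None, 3)                 # -> 3 (None values are ignored)
    combine_dicts({'a': 1}, {'a': 2, 'b': 3})  # -> {'a': 2, 'b': 3}
    combine_envs({'PATH': '/usr/local/bin', 'TZ': 'UTC'},
                 {'PATH': '/usr/sbin', 'TZ': 'America/Los_Angeles'})
    # -> {'PATH': '/usr/sbin:/usr/local/bin', 'TZ': 'America/Los_Angeles'}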
88 docs/configs-reference.rst
@@ -0,0 +1,88 @@
+Quick Reference
+===============
+
+Command-line only options for all runners
+-----------------------------------------
+
+=================== ======================================================= ========================================================
+Option Default Switches
+=================== ======================================================= ========================================================
+*conf_path* (automatic; see :py:func:`~mrjob.conf.find_mrjob_conf`) :option:`-c`, :option:`--conf-path`, :option:`--no-conf`
+*extra_args* ``[]`` (see :py:meth:`~mrjob.job.MRJob.add_passthrough_option`)
+*file_upload_args* ``[]`` (see :py:meth:`~mrjob.job.MRJob.add_file_option`)
+*output_dir* (automatic) :option:`-o`, :option:`--output-dir`
+=================== ======================================================= ========================================================
+
+See :py:meth:`mrjob.runner.MRJobRunner.__init__` for details.
+
+Options for all runners
+-----------------------
+
+=================== ============================== ========================================= ===========================
+Option Default Combined by Switches
+=================== ============================== ========================================= ===========================
+*base_tmp_dir* :envvar:`TMPDIR` or ``'/tmp'`` :py:func:`~mrjob.conf.combine_paths`
+*bootstrap_mrjob* ``True`` :py:func:`~mrjob.conf.combine_values`
+*cleanup* ``'IF_SUCCESSFUL'`` :py:func:`~mrjob.conf.combine_values` :option:`--cleanup`
+*cmdenv* ``{}`` :py:func:`~mrjob.conf.combine_envs`
+*hadoop_extra_args* ``[]`` :py:func:`~mrjob.conf.combine_lists` :option:`--hadoop-arg`
+*job_name_prefix* (automatic) :py:func:`~mrjob.conf.combine_values` :option:`--job-name-prefix`
+*jobconf* ``{}`` :py:func:`~mrjob.conf.combine_dicts` :option:`--jobconf`
+*python_archives* ``[]`` :py:func:`~mrjob.conf.combine_path_lists`
+*setup_cmds* ``[]`` :py:func:`~mrjob.conf.combine_lists`
+*setup_scripts* ``[]`` :py:func:`~mrjob.conf.combine_path_lists`
+*upload_archives* ``[]`` :py:func:`~mrjob.conf.combine_path_lists` :option:`--archive`
+*upload_files* ``[]`` :py:func:`~mrjob.conf.combine_path_lists` :option:`--file`
+=================== ============================== ========================================= ===========================
+
+See :py:meth:`mrjob.runner.MRJobRunner.__init__` for details.
+
+:py:class:`~mrjob.local.LocalMRJobRunner` takes no additional options.
+
+Additional options for :py:class:`~mrjob.emr.EMRJobRunner`
+----------------------------------------------------------
+
+=========================== ============================== ========================================= =====================================
+Option Default Combined by Switches
+=========================== ============================== ========================================= =====================================
+*aws_access_key_id* (automatic) :py:func:`~mrjob.conf.combine_values`
+*aws_secret_access_key* (automatic) :py:func:`~mrjob.conf.combine_values`
+*aws_region* (automatic) :py:func:`~mrjob.conf.combine_values`
+*bootstrap_cmds* ``[]`` :py:func:`~mrjob.conf.combine_lists`
+*bootstrap_files* ``[]`` :py:func:`~mrjob.conf.combine_path_lists`
+*bootstrap_python_packages* ``[]`` :py:func:`~mrjob.conf.combine_path_lists`
+*bootstrap_scripts* ``[]`` :py:func:`~mrjob.conf.combine_values`
+*check_emr_status_every* ``30`` :py:func:`~mrjob.conf.combine_values` :option:`--check-emr-status-every`
+*ec2_instance_type* ``'m1.small'`` :py:func:`~mrjob.conf.combine_values` :option:`--ec2-instance-type`
+*ec2_key_pair* ``None`` :py:func:`~mrjob.conf.combine_values`
+*ec2_key_pair_file* ``None`` :py:func:`~mrjob.conf.combine_paths`
+*ec2_master_instance_type* (same as *ec2_instance_type*) :py:func:`~mrjob.conf.combine_values` :option:`--ec2-master-instance-type`
+*ec2_slave_instance_type* (same as *ec2_instance_type*) :py:func:`~mrjob.conf.combine_values` :option:`--ec2-slave-instance-type`
+*emr_endpoint* (automatic) :py:func:`~mrjob.conf.combine_values`
+*emr_job_flow_id* ``None`` :py:func:`~mrjob.conf.combine_values`
+*num_ec2_instances* ``1`` :py:func:`~mrjob.conf.combine_values` :option:`--num-ec2-instances`
+*s3_endpoint* (automatic) :py:func:`~mrjob.conf.combine_paths`
+*s3_log_uri* (automatic) :py:func:`~mrjob.conf.combine_paths` :option:`--s3-log-uri`
+*s3_scratch_uri* (automatic) :py:func:`~mrjob.conf.combine_values` :option:`--s3-scratch-uri`
+*s3_sync_wait_time* ``5.0`` :py:func:`~mrjob.conf.combine_values`
+*ssh_bin* ``'ssh'`` :py:func:`~mrjob.conf.combine_paths`
+*ssh_bind_ports* ``range(40001, 40841)`` :py:func:`~mrjob.conf.combine_values`
+*ssh_tunnel_to_job_tracker* ``False`` :py:func:`~mrjob.conf.combine_values` :option:`--ssh-tunnel-to-job-tracker`
+*ssh_tunnel_is_open* ``False`` :py:func:`~mrjob.conf.combine_values` :option:`--ssh-tunnel-is-open`
+=========================== ============================== ========================================= =====================================
+
+See :py:meth:`mrjob.emr.EMRJobRunner.__init__` for details.
+
+Additional options for :py:class:`~mrjob.hadoop.HadoopJobRunner`
+----------------------------------------------------------------
+
+====================== =========================== ===================================== ================================
+Option Default Combined by Switches
+====================== =========================== ===================================== ================================
+*hadoop_bin* (automatic) :py:func:`~mrjob.conf.combine_paths` :option:`--hadoop-bin`
+*hadoop_home* :envvar:HADOOP_HOME :py:func:`~mrjob.conf.combine_values`
+*hdfs_scratch_dir* ``tmp/mrjob`` (in HDFS) :py:func:`~mrjob.conf.combine_paths` :option:`--hdfs-scratch-dir`
+*hadoop_streaming_jar* (automatic) :py:func:`~mrjob.conf.combine_paths` :option:`--hadoop-streaming-jar`
+====================== =========================== ===================================== ================================
+
+See :py:meth:`mrjob.hadoop.HadoopJobRunner.__init__` for details.
32 docs/configs-runners.rst
@@ -0,0 +1,32 @@
+Configuration options
+=====================
+
+Runners are configured through keyword arguments to their init methods.
+
+These can be set:
+
+- from :py:mod:`mrjob.conf`
+- from the command line
+- by re-defining :py:meth:`~mrjob.job.MRJob.job_runner_kwargs` etc. in your :py:class:`~mrjob.job.MRJob` (see :ref:`job-configuration`)
+- by instantiating the runner directly
+
+All runners
+-----------
+
+.. automethod:: mrjob.runner.MRJobRunner.__init__
+
+Locally
+-------
+
+.. automethod:: mrjob.local.LocalMRJobRunner.__init__
+
+On EMR
+------
+
+.. automethod:: mrjob.emr.EMRJobRunner.__init__
+
+On your Hadoop cluster
+----------------------
+
+.. automethod:: mrjob.hadoop.HadoopJobRunner.__init__
+
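
For instance, the same option can be set per-job by overriding one of the ``*_job_runner_kwargs()`` hooks, or by building a runner by hand. A hypothetical sketch using options documented in this commit (`MRMyJob` is an invented name):

    from mrjob.job import MRJob
    from mrjob.emr import EMRJobRunner


    class MRMyJob(MRJob):

        def emr_job_runner_kwargs(self):
            # tweak the keyword args this job passes to EMRJobRunner
            kwargs = super(MRMyJob, self).emr_job_runner_kwargs()
            kwargs['ec2_instance_type'] = 'c1.xlarge'
            return kwargs


    # or instantiate a runner directly
    runner = EMRJobRunner(cmdenv={'TZ': 'America/Los_Angeles'},
                          num_ec2_instances=2)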
8 docs/configs.rst
@@ -0,0 +1,8 @@
+Runner configuration
+====================
+.. toctree::
+
+ configs-conf.rst
+ configs-runners.rst
+ configs-reference.rst
+
22 docs/index.rst
@@ -0,0 +1,22 @@
+.. mrjob documentation master file, created by
+ sphinx-quickstart on Thu Oct 14 10:13:34 2010.
+ You can adapt this file completely to your liking, but it should at least
+ contain the root `toctree` directive.
+
+mrjob 0.1.0
+=================================
+
+mrjob - Run Hadoop Streaming jobs on Amazon Elastic MapReduce or your
+own Hadoop cluster
+
+.. toctree::
+ :maxdepth: 3
+ :numbered:
+
+ writing-and-running.rst
+ library.rst
+
+Indices and tables
+==================
+
+* :ref:`search`
42 docs/job-advanced.rst
@@ -0,0 +1,42 @@
+Advanced
+========
+
+.. currentmodule:: mrjob.job
+
+.. _job-protocols:
+
+Protocols
+---------
+.. autoattribute:: MRJob.DEFAULT_INPUT_PROTOCOL
+.. autoattribute:: MRJob.DEFAULT_PROTOCOL
+.. autoattribute:: MRJob.DEFAULT_OUTPUT_PROTOCOL
+.. automethod:: MRJob.protocols
+.. automethod:: MRJob.pick_protocols
+
+Custom command-line options
+---------------------------
+.. automethod:: MRJob.configure_options
+.. automethod:: MRJob.add_passthrough_option
+.. automethod:: MRJob.add_file_option
+.. automethod:: MRJob.load_options
+.. automethod:: MRJob.is_mapper_or_reducer
+
+.. _job-configuration:
+
+Job runner configuration
+------------------------
+.. automethod:: MRJob.job_runner_kwargs
+.. automethod:: MRJob.local_job_runner_kwargs
+.. automethod:: MRJob.emr_job_runner_kwargs
+.. automethod:: MRJob.hadoop_job_runner_kwargs
+.. automethod:: MRJob.generate_passthrough_arguments
+.. automethod:: MRJob.generate_file_upload_args
+.. automethod:: MRJob.mr_job_script
+
+How jobs are run
+----------------
+.. automethod:: MRJob.run_job
+.. automethod:: MRJob.run_mapper
+.. automethod:: MRJob.run_reducer
+.. automethod:: MRJob.show_steps
+
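
A hypothetical sketch of the custom-option hooks listed above (the job, option, and file names are invented; `add_passthrough_option()` and `add_file_option()` accept the same arguments as `optparse`'s `add_option()`):

    from mrjob.job import MRJob


    class MRGrepJob(MRJob):

        def configure_options(self):
            super(MRGrepJob, self).configure_options()
            # passed through to the job wherever it runs
            self.add_passthrough_option('--pattern', default='error')
            # a local file to upload alongside the job
            self.add_file_option('--blacklist-file')

        def mapper(self, key, line):
            if self.options.pattern in line:
                yield None, line


    if __name__ == '__main__':
        MRGrepJob.run()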
33 docs/job-basic.rst
@@ -0,0 +1,33 @@
+Basic
+=====
+
+.. currentmodule:: mrjob.job
+
+.. autoclass:: MRJob
+
+One-step jobs
+----------------
+.. automethod:: MRJob.mapper
+.. automethod:: MRJob.reducer
+.. automethod:: MRJob.mapper_final
+
+Running the job
+---------------
+.. automethod:: MRJob.run
+.. automethod:: MRJob.__init__
+.. automethod:: MRJob.make_runner
+
+Parsing the output
+------------------
+.. automethod:: MRJob.parse_output_line
+
+Multi-step jobs
+---------------
+.. automethod:: MRJob.steps
+.. automethod:: MRJob.mr
+
+Counters and status messages
+----------------------------
+.. automethod:: MRJob.increment_counter
+.. automethod:: MRJob.set_status
+
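
The multi-step hooks above chain map-reduce steps together. A sketch of a two-step job that finds the most frequent word (hypothetical, written against the `steps()`/`mr()` interface documented here):

    from mrjob.job import MRJob


    class MRMostUsedWord(MRJob):

        def steps(self):
            # the output of step 1 becomes the input of step 2
            return [self.mr(self.get_words, self.count_words),
                    self.mr(self.discard_keys, self.find_max_word)]

        def get_words(self, key, line):
            for word in line.split():
                yield word.lower(), 1

        def count_words(self, word, counts):
            yield word, sum(counts)

        def discard_keys(self, word, count):
            # funnel every (count, word) pair to a single reducer key
            yield None, (count, word)

        def find_max_word(self, key, count_word_pairs):
            count, word = max(count_word_pairs)
            yield word, count


    if __name__ == '__main__':
        MRMostUsedWord.run()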
8 docs/job-testing.rst
@@ -0,0 +1,8 @@
+Hooks for testing
+=================
+
+.. currentmodule:: mrjob.job
+
+.. automethod:: MRJob.sandbox
+.. automethod:: MRJob.parse_output
+.. automethod:: MRJob.parse_counters
9 docs/job.rst
@@ -0,0 +1,9 @@
+mrjob.job.MRJob - base class for all jobs
+=========================================
+
+.. toctree::
+ :maxdepth: 3
+
+ job-basic.rst
+ job-advanced.rst
+ job-testing.rst
13 docs/library.rst
@@ -0,0 +1,13 @@
+mrjob Library documentation
+===========================
+.. toctree::
+ :maxdepth: 2
+
+ job.rst
+ protocols.rst
+ runners.rst
+ configs.rst
+ utils.rst
+ tools.rst
+
+
155 docs/make.bat
@@ -0,0 +1,155 @@
+@ECHO OFF
+
+REM Command file for Sphinx documentation
+
+if "%SPHINXBUILD%" == "" (
+ set SPHINXBUILD=sphinx-build
+)
+set BUILDDIR=_build
+set ALLSPHINXOPTS=-d %BUILDDIR%/doctrees %SPHINXOPTS% .
+if NOT "%PAPER%" == "" (
+ set ALLSPHINXOPTS=-D latex_paper_size=%PAPER% %ALLSPHINXOPTS%
+)
+
+if "%1" == "" goto help
+
+if "%1" == "help" (
+ :help
+ echo.Please use `make ^<target^>` where ^<target^> is one of
+ echo. html to make standalone HTML files
+ echo. dirhtml to make HTML files named index.html in directories
+ echo. singlehtml to make a single large HTML file
+ echo. pickle to make pickle files
+ echo. json to make JSON files
+ echo. htmlhelp to make HTML files and a HTML help project
+ echo. qthelp to make HTML files and a qthelp project
+ echo. devhelp to make HTML files and a Devhelp project
+ echo. epub to make an epub
+ echo. latex to make LaTeX files, you can set PAPER=a4 or PAPER=letter
+ echo. text to make text files
+ echo. man to make manual pages
+ echo. changes to make an overview over all changed/added/deprecated items
+ echo. linkcheck to check all external links for integrity
+ echo. doctest to run all doctests embedded in the documentation if enabled
+ goto end
+)
+
+if "%1" == "clean" (
+ for /d %%i in (%BUILDDIR%\*) do rmdir /q /s %%i
+ del /q /s %BUILDDIR%\*
+ goto end
+)
+
+if "%1" == "html" (
+ %SPHINXBUILD% -b html %ALLSPHINXOPTS% %BUILDDIR%/html
+ echo.
+ echo.Build finished. The HTML pages are in %BUILDDIR%/html.
+ goto end
+)
+
+if "%1" == "dirhtml" (
+ %SPHINXBUILD% -b dirhtml %ALLSPHINXOPTS% %BUILDDIR%/dirhtml
+ echo.
+ echo.Build finished. The HTML pages are in %BUILDDIR%/dirhtml.
+ goto end
+)
+
+if "%1" == "singlehtml" (
+ %SPHINXBUILD% -b singlehtml %ALLSPHINXOPTS% %BUILDDIR%/singlehtml
+ echo.
+ echo.Build finished. The HTML pages are in %BUILDDIR%/singlehtml.
+ goto end
+)
+
+if "%1" == "pickle" (
+ %SPHINXBUILD% -b pickle %ALLSPHINXOPTS% %BUILDDIR%/pickle
+ echo.
+ echo.Build finished; now you can process the pickle files.
+ goto end
+)
+
+if "%1" == "json" (
+ %SPHINXBUILD% -b json %ALLSPHINXOPTS% %BUILDDIR%/json
+ echo.
+ echo.Build finished; now you can process the JSON files.
+ goto end
+)
+
+if "%1" == "htmlhelp" (
+ %SPHINXBUILD% -b htmlhelp %ALLSPHINXOPTS% %BUILDDIR%/htmlhelp
+ echo.
+ echo.Build finished; now you can run HTML Help Workshop with the ^
+.hhp project file in %BUILDDIR%/htmlhelp.
+ goto end
+)
+
+if "%1" == "qthelp" (
+ %SPHINXBUILD% -b qthelp %ALLSPHINXOPTS% %BUILDDIR%/qthelp
+ echo.
+ echo.Build finished; now you can run "qcollectiongenerator" with the ^
+.qhcp project file in %BUILDDIR%/qthelp, like this:
+ echo.^> qcollectiongenerator %BUILDDIR%\qthelp\mrjob.qhcp
+ echo.To view the help file:
+ echo.^> assistant -collectionFile %BUILDDIR%\qthelp\mrjob.qhc
+ goto end
+)
+
+if "%1" == "devhelp" (
+ %SPHINXBUILD% -b devhelp %ALLSPHINXOPTS% %BUILDDIR%/devhelp
+ echo.
+ echo.Build finished.
+ goto end
+)
+
+if "%1" == "epub" (
+ %SPHINXBUILD% -b epub %ALLSPHINXOPTS% %BUILDDIR%/epub
+ echo.
+ echo.Build finished. The epub file is in %BUILDDIR%/epub.
+ goto end
+)
+
+if "%1" == "latex" (
+ %SPHINXBUILD% -b latex %ALLSPHINXOPTS% %BUILDDIR%/latex
+ echo.
+ echo.Build finished; the LaTeX files are in %BUILDDIR%/latex.
+ goto end
+)
+
+if "%1" == "text" (
+ %SPHINXBUILD% -b text %ALLSPHINXOPTS% %BUILDDIR%/text
+ echo.
+ echo.Build finished. The text files are in %BUILDDIR%/text.
+ goto end
+)
+
+if "%1" == "man" (
+ %SPHINXBUILD% -b man %ALLSPHINXOPTS% %BUILDDIR%/man
+ echo.
+ echo.Build finished. The manual pages are in %BUILDDIR%/man.
+ goto end
+)
+
+if "%1" == "changes" (
+ %SPHINXBUILD% -b changes %ALLSPHINXOPTS% %BUILDDIR%/changes
+ echo.
+ echo.The overview file is in %BUILDDIR%/changes.
+ goto end
+)
+
+if "%1" == "linkcheck" (
+ %SPHINXBUILD% -b linkcheck %ALLSPHINXOPTS% %BUILDDIR%/linkcheck
+ echo.
+ echo.Link check complete; look for any errors in the above output ^
+or in %BUILDDIR%/linkcheck/output.txt.
+ goto end
+)
+
+if "%1" == "doctest" (
+ %SPHINXBUILD% -b doctest %ALLSPHINXOPTS% %BUILDDIR%/doctest
+ echo.
+ echo.Testing of doctests in the sources finished, look at the ^
+results in %BUILDDIR%/doctest/output.txt.
+ goto end
+)
+
+:end
17 docs/protocols.rst
@@ -0,0 +1,17 @@
+mrjob.protocol - input and output
+=================================
+
+.. automodule:: mrjob.protocol
+
+.. autodata:: DEFAULT_PROTOCOL
+.. autodata:: PROTOCOL_DICT
+
+.. autoclass:: HadoopStreamingProtocol
+ :members:
+.. autoclass:: JSONProtocol
+.. autoclass:: JSONValueProtocol
+.. autoclass:: PickleProtocol
+.. autoclass:: PickleValueProtocol
+.. autoclass:: RawValueProtocol
+.. autoclass:: ReprProtocol
+.. autoclass:: ReprValueProtocol
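
A job picks protocols by name via the class attributes documented in the advanced-job section. A hypothetical sketch (the string names ``'json_value'`` and ``'json'`` are assumed to be keys of ``PROTOCOL_DICT``, not verified against this commit):

    from mrjob.job import MRJob


    class MRUserCount(MRJob):

        # assumed protocol names; see PROTOCOL_DICT in mrjob/protocol.py
        DEFAULT_INPUT_PROTOCOL = 'json_value'  # each input line is one JSON value
        DEFAULT_PROTOCOL = 'json'              # keys/values between steps

        def mapper(self, key, record):
            # record is already a decoded JSON object
            yield record['user_id'], 1

        def reducer(self, user_id, counts):
            yield user_id, sum(counts)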
22 docs/runners-emr.rst
@@ -0,0 +1,22 @@
+mrjob.emr - run on EMR
+======================
+
+.. currentmodule:: mrjob.emr
+
+.. autoclass:: EMRJobRunner
+
+S3 utilities
+------------
+
+.. automethod:: EMRJobRunner.make_s3_conn
+.. autofunction:: parse_s3_uri
+.. autofunction:: s3_key_to_uri
+.. automethod:: EMRJobRunner.get_s3_key
+.. automethod:: EMRJobRunner.get_s3_keys
+.. automethod:: EMRJobRunner.get_s3_folder_keys
+.. automethod:: EMRJobRunner.make_s3_key
+
+EMR utilities
+-------------
+
+.. automethod:: EMRJobRunner.make_emr_conn
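
These utilities can be used without a job script, mirroring the example in the `mrjob/emr.py` docstring further down in this diff:

    from mrjob.emr import EMRJobRunner, parse_s3_uri

    # talk to EMR directly through boto
    emr_conn = EMRJobRunner().make_emr_conn()
    job_flows = emr_conn.describe_jobflows()

    # S3 URI helpers
    parse_s3_uri('s3://walrus/tmp/')  # -> ('walrus', 'tmp/')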
12 docs/runners-hadoop.rst
@@ -0,0 +1,12 @@
+mrjob.hadoop - run on your Hadoop cluster
+=========================================
+
+.. currentmodule:: mrjob.hadoop
+.. autoclass:: HadoopJobRunner
+
+Utilities
+---------
+
+.. autofunction:: find_hadoop_streaming_jar
+.. autofunction:: fully_qualify_hdfs_path
+
7 docs/runners-local.rst
@@ -0,0 +1,7 @@
+mrjob.local - run locally for testing
+======================================
+
+.. currentmodule:: mrjob.local
+
+.. autoclass:: LocalMRJobRunner
+
38 docs/runners-runner.rst
@@ -0,0 +1,38 @@
+mrjob.runner - base class for all runners
+=========================================
+
+.. currentmodule:: mrjob.runner
+
+.. autoclass:: MRJobRunner
+
+Runners' constructors take a bewildering array of keyword arguments; we'll
+get to that in :doc:`configs-runners`.
+
+Running your job
+----------------
+
+.. automethod:: MRJobRunner.run
+.. automethod:: MRJobRunner.stream_output
+.. automethod:: MRJobRunner.cleanup
+.. autodata:: mrjob.runner.CLEANUP_CHOICES
+.. autodata:: mrjob.runner.CLEANUP_DEFAULT
+
+File management
+---------------
+
+Some simple filesystem operations that work on both the local filesystem and
+HDFS (when running :py:class:`~mrjob.hadoop.HadoopJobRunner`) or
+S3 (when running :py:class:`~mrjob.emr.EMRJobRunner`).
+
+Use ``hdfs://`` and ``s3://`` URIs to refer to remote files.
+
+We don't currently support ``mv()`` and ``cp()``; S3 doesn't really have
+directories, so the semantics would get a little weird.
+
+.. automethod:: MRJobRunner.get_output_dir
+.. automethod:: MRJobRunner.du
+.. automethod:: MRJobRunner.ls
+.. automethod:: MRJobRunner.mkdir
+.. automethod:: MRJobRunner.path_join
+.. automethod:: MRJobRunner.rm
+.. automethod:: MRJobRunner.touchz
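
Putting the runner interface together (a hypothetical sketch; `MRWordFreqCount` stands in for any `MRJob` subclass, and the exact constructor arguments are assumptions):

    mr_job = MRWordFreqCount(args=['-r', 'emr', 'input.txt'])

    output = {}
    runner = mr_job.make_runner()
    try:
        runner.run()
        for line in runner.stream_output():
            key, value = mr_job.parse_output_line(line)
            output[key] = value

        # the runner also exposes simple filesystem ops on local/HDFS/S3 paths
        output_bytes = runner.du(runner.get_output_dir())
    finally:
        runner.cleanup()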
10 docs/runners.rst
@@ -0,0 +1,10 @@
+Runners - launching your job
+============================
+
+.. toctree::
+ :maxdepth: 3
+
+ runners-runner.rst
+ runners-local.rst
+ runners-emr.rst
+ runners-hadoop.rst
15 docs/tools.rst
@@ -0,0 +1,15 @@
+mrjob.tools - Support scripts
+=============================
+
+
+:title:`mrjob.tools.emr.create_job_flow`
+
+.. automodule:: mrjob.tools.emr.create_job_flow
+
+:title:`mrjob.tools.emr.terminate_job_flow`
+
+.. automodule:: mrjob.tools.emr.terminate_job_flow
+
+:title:`mrjob.tools.emr.terminate_idle_job_flows`
+
+.. automodule:: mrjob.tools.emr.terminate_idle_job_flows
5 docs/utils-parse.rst
@@ -0,0 +1,5 @@
+mrjob.parse - log parsing
+=========================
+
+.. automodule:: mrjob.parse
+ :members:
5 docs/utils-retry.rst
@@ -0,0 +1,5 @@
+mrjob.retry - retry on transient errors
+=======================================
+
+.. autoclass:: mrjob.retry.RetryWrapper
+.. automethod:: mrjob.retry.RetryWrapper.__init__
5 docs/utils-util.rst
@@ -0,0 +1,5 @@
+mrjob.util - utility functions
+==============================
+
+.. automodule:: mrjob.util
+ :members:
8 docs/utils.rst
@@ -0,0 +1,8 @@
+Utilities
+=========
+.. toctree::
+ :maxdepth: 2
+
+ utils-retry.rst
+ utils-parse.rst
+ utils-util.rst
4 docs/writing-and-running.rst
@@ -0,0 +1,4 @@
+Writing and running a job
+=========================
+.. automodule:: mrjob.job
+
1  examples
2  mrjob.conf.example
@@ -20,7 +20,7 @@ runners:
ec2_instance_type: c1.xlarge
# use our local time zone (this is important for deciding when
# days start and end, for instance)
- hadoop_env:
+ cmdenv:
TZ: America/Los_Angeles
num_ec2_instances: 1
# we create the src-tree.tar.gz tarball with a Makefile. It only contains
5 mrjob/botoemr/step.py
@@ -145,8 +145,6 @@ def args(self):
if self.reducer:
args.extend(['-reducer', self.reducer])
- else:
- args.extend(['-jobconf', 'mapred.reduce.tasks=0'])
if self.input:
if isinstance(self.input, list):
@@ -168,6 +166,9 @@ def args(self):
if self.step_args:
args.extend(self.step_args)
+ if not self.reducer:
+ args.extend(['-jobconf', 'mapred.reduce.tasks=0'])
+
return args
def __repr__(self):
254 mrjob/conf.py
@@ -11,14 +11,83 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-"""Logic for reading and writing mrjob.conf config files, and automatically
-setting up configs (e.g. finding the Hadoop streaming JAR).
+""""mrjob.conf" is the name of both this module, and the global config file
+for :py:mod:`mrjob`.
-Configs contain keyword options to pass to the __init__() methods of the
-various job runners (referred to as "opts" throughout our code). This module
-provides a way of programmatically combining options from various sources
-(e.g. mrjob.conf, the command line, defaults) through combine_*() methods.
+We look for :file:`mrjob.conf` in these locations:
+
+- :file:`~/.mrjob`
+- :file:`mrjob.conf` anywhere in your :envvar:`$PYTHONPATH`
+- :file:`/etc/mrjob.conf`
+
+The point of :file:`mrjob.conf` is to let you set up things you want every
+job to have access to so that you don't have to think about it. For example:
+
+- libraries and source code you want to be available for your jobs
+- where temp directories and logs should go
+- security credentials
+
+:file:`mrjob.conf` is just a `YAML <http://www.yaml.org>`_-encoded dictionary
+containing default values to pass in to the constructors of the various runner
+classes. Here's a minimal :file:`mrjob.conf`::
+
+ runners:
+ emr:
+ cmdenv:
+ TZ: America/Los_Angeles
+
+Now whenever you run ``mr_your_script.py -r emr``,
+:py:class:`~mrjob.emr.EMRJobRunner` will automatically set :envvar:`$TZ` to
+``America/Los_Angeles`` in your job's environment when it runs on EMR.
+
+Options specified on the command-line take precedence over
+:file:`mrjob.conf`. Usually this means simply overriding the option in
+:file:`mrjob.conf`. However, we know that *cmdenv* contains environment
+variables, so we do the right thing. For example, if your :file:`mrjob.conf`
+contained::
+
+ runners:
+ emr:
+ cmdenv:
+ PATH: /usr/local/bin
+ TZ: America/Los_Angeles
+
+and you ran your job as::
+
+ mr_your_script.py -r emr --cmdenv TZ=Europe/Paris --cmdenv PATH=/usr/sbin
+
+We'd automatically handle the :envvar:`$PATH`
+variables and your job's environment would be::
+
+ {'TZ': 'Europe/Paris', 'PATH': '/usr/sbin:/usr/local/bin'}
+
+What's going on here is that *cmdenv* is associated with
+:py:func:`combine_envs`. Each option is associated with a combiner function
+that combines its values in an appropriate way.
+
+Combiners can also do useful things like expanding environment variables and
+globs in paths. For example, you could set::
+
+ runners:
+ local:
+ upload_files: &upload_files
+ - $DATA_DIR/*.db
+ hadoop:
+ upload_files: *upload_files
+ emr:
+ upload_files: *upload_files
+
+and every time you ran a job, every ``.db`` file in ``$DATA_DIR`` would
+automatically be loaded into your job's current working directory.
+
+Also, if you specified additional files to upload with :option:`--file`, those
+files would be uploaded in addition to the ``.db`` files, rather than instead
+of them.
+
+See :doc:`configs-runners` for the entire dizzying array of configurable
+options.
"""
+
from __future__ import with_statement
import glob
@@ -41,13 +110,13 @@
### READING AND WRITING mrjob.conf ###
def find_mrjob_conf():
- """Look for the mrjob.conf file, and return its path, or return
- None if we can't find it.
+ """Look for :file:`mrjob.conf`, and return its path. Places we look:
+
+ - :file:`~/.mrjob`
+ - :file:`mrjob.conf` in any directory in :envvar:`$PYTHONPATH`
+ - :file:`/etc/mrjob.conf`
- We look for:
- - ~/.mrjob
- - mrjob.conf in any directory in $PYTHONPATH
- - /etc/mrjob.conf
+ Return ``None`` if we can't find it.
"""
def candidates():
if 'HOME' in os.environ:
@@ -69,13 +138,19 @@ def candidates():
return None
def load_mrjob_conf(conf_path=None):
- """Load the entire data structure in mrjob.conf. Returns None
- if we can't find it.
-
- Args:
- conf_path -- an alternate place to look for mrjob.conf.
+ """Load the entire data structure in :file:`mrjob.conf`, which should
+ look something like this::
+
+ {'runners':
+ 'local': {'OPTION': VALUE, ...}
+ 'emr': {'OPTION': VALUE, ...}
+        'hadoop': {'OPTION': VALUE, ...}
+ }
+
+ Returns ``None`` if we can't find :file:`mrjob.conf`.
- If conf_path is False, we'll always return None.
+ :type conf_path: str
+ :param conf_path: an alternate place to look for mrjob.conf. If this is ``False``, we'll always return ``None``.
"""
if conf_path is False:
return None
@@ -90,26 +165,39 @@ def load_mrjob_conf(conf_path=None):
else:
return json.loads(f)
-def load_opts_from_mrjob_conf(runner_type, conf_path=None):
- """Load the options to initialize a runner from mrjob.conf, or return {}
- if we can't find them.
+def load_opts_from_mrjob_conf(runner_alias, conf_path=None):
+ """Load the options to initialize a runner from mrjob.conf, or return
+ ``{}`` if we can't find them.
- Args:
- conf_path -- an alternate place to look for mrjob.conf
+ :type conf_path: str
+ :param conf_path: an alternate place to look for mrjob.conf. If this is ``False``, we'll always return ``{}``.
"""
conf = load_mrjob_conf(conf_path=conf_path)
if conf is None:
return {}
try:
- return conf['runners'][runner_type] or {}
+ return conf['runners'][runner_alias] or {}
except (KeyError, TypeError, ValueError):
log.warning('no configs for runner type %r; returning {}' %
- runner_type)
+ runner_alias)
return {}
def dump_mrjob_conf(conf, f):
- """Write a configuration out to the given file object."""
+ """Write out configuration options to a file.
+
+ Useful if you don't want to bother to figure out YAML.
+
+    *conf* should look something like this::
+
+ {'runners':
+ 'local': {'OPTION': VALUE, ...}
+ 'emr': {'OPTION': VALUE, ...}
+        'hadoop': {'OPTION': VALUE, ...}
+ }
+
+ :param f: a file object to write to (e.g. ``open('mrjob.conf', 'w')``)
+ """
if yaml:
yaml.safe_dump(conf, f, default_flow_style=False)
else:
@@ -121,20 +209,36 @@ def dump_mrjob_conf(conf, f):
# combiners generally consider earlier values to be defaults, and later
# options to override or add on to them.
-def expand_path(path):
- """Resolve ~ (home dir) and environment variables in paths. If path is None,
- return None.
+def combine_values(*values):
+ """Return the last value in *values* that is not ``None``.
+
+ The default combiner; useful for simple values (booleans, strings, numbers).
"""
- if path is None:
- return None
+ for v in reversed(values):
+ if v is not None:
+ return v
else:
- return os.path.expanduser(os.path.expandvars(path))
+ return None
+
+def combine_lists(*seqs):
+ """Concatenate the given sequences into a list. Ignore ``None`` values.
+
+ Generally this is used for a list of commands we want to run; the
+ "default" commands get run before any commands specific to your job.
+ """
+ result = []
+
+ for seq in seqs:
+ if seq:
+ result.extend(seq)
+
+ return result
def combine_dicts(*dicts):
"""Combine zero or more dictionaries. Values from dicts later in the list
take precedence over values earlier in the list.
- If you pass in None in place of a dictionary, it will be ignored.
+ If you pass in ``None`` in place of a dictionary, it will be ignored.
"""
result = {}
@@ -148,10 +252,10 @@ def combine_envs(*envs):
"""Combine zero or more dictionaries containing environment variables.
Environment variables later from dictionaries later in the list take
- priority over those earlier in the list. For variables ending with 'PATH',
- we prepend (and add a colon) rather than overwriting.
+ priority over those earlier in the list. For variables ending with
+ ``PATH``, we prepend (and add a colon) rather than overwriting.
- If you pass in None in place of a dictionary, it will be ignored.
+ If you pass in ``None`` in place of a dictionary, it will be ignored.
"""
result = {}
for env in envs:
@@ -164,37 +268,45 @@ def combine_envs(*envs):
return result
-def combine_lists(*seqs):
- """Concatenate the given sequences into a list. Ignore None values."""
- result = []
+def combine_paths(*paths):
+ """Returns the last value in *paths* that is not ``None``.
+ Resolve ``~`` (home dir) and environment variables."""
+ return expand_path(combine_values(*paths))
- for seq in seqs:
- if seq:
- result.extend(seq)
+def combine_path_lists(*path_seqs):
+ """Concatenate the given sequences into a list. Ignore None values.
+ Resolve ``~`` (home dir) and environment variables, and expand globs
+ that refer to the local filesystem."""
+ results = []
- return result
+ for path in combine_lists(*path_seqs):
+ expanded = expand_path(path)
+ # if we can't expand a glob, leave as-is (maybe it refers to
+ # S3 or HDFS)
+ paths = sorted(glob.glob(expanded)) or [expanded]
+
+ results.extend(paths)
+
+ return results
def combine_opts(combiners, *opts_list):
- """Utility function to combine options used to initialize
- a job runner, e.g. to combine default opts with opts specified on
- the command line. opts later in the list take precedence.
-
- Args:
- combiners -- a dictionary option name to a combine_*() function to
- use to combine options by that name. By default, we combine options
- using combine_values()
- opts_list -- one or more dictionaries (None is not allowed)
+ """The master combiner, used to combine dictionaries of options with
+ appropriate sub-combiners.
+
+ :param combiners: a map from option name to a combine_*() function to combine options by that name. By default, we combine options using :py:func:`combine_values`.
+ :param opts_list: one or more dictionaries to combine
"""
final_opts = {}
keys = set()
for opts in opts_list:
- keys.update(opts)
+ if opts:
+ keys.update(opts)
for key in keys:
values = []
for opts in opts_list:
- if key in opts:
+ if opts and key in opts:
values.append(opts[key])
combine_func = combiners.get(key) or combine_values
@@ -202,32 +314,12 @@ def combine_opts(combiners, *opts_list):
return final_opts
-def combine_paths(*paths):
- """Returns the last value in *paths that is not None.
- Resolve ~ (home dir) and environment variables."""
- return expand_path(combine_values(*paths))
-
-def combine_path_lists(*path_seqs):
- """Concatenate the given sequences into a list. Ignore None values.
- Resolve ~ (home dir) and environment variables, and expand globs
- that refer to the local filesystem."""
- results = []
-
- for path in combine_lists(*path_seqs):
- expanded = expand_path(path)
- # if we can't expand a glob, leave as-is (maybe it refers to
- # S3 or HDFS)
- paths = sorted(glob.glob(expanded)) or [expanded]
-
- results.extend(paths)
-
- return results
+def expand_path(path):
+ """Resolve ``~`` (home dir) and environment variables in *path*.
-def combine_values(*values):
- """Return the last value in values that is not None"""
- for v in reversed(values):
- if v is not None:
- return v
- else:
+ If *path* is ``None``, return ``None``.
+ """
+ if path is None:
return None
-
+ else:
+ return os.path.expanduser(os.path.expandvars(path))
253 mrjob/emr.py
@@ -75,11 +75,10 @@
def parse_s3_uri(uri):
"""Parse an S3 URI into (bucket, key)
- For example
+ >>> parse_s3_uri('s3://walrus/tmp/')
+ ('walrus', 'tmp/')
- s3://emr-land/tmp/ -> 'emr-land', 'tmp/'
-
- If it's not an s3 uri, it'll raise a ValueError
+ If ``uri`` is not an S3 URI, raise a ValueError
"""
match = S3_URI_RE.match(uri)
if match:
@@ -88,107 +87,95 @@ def parse_s3_uri(uri):
raise ValueError('Invalid S3 URI: %s' % uri)
def s3_key_to_uri(s3_key):
- """Convert a boto Key object into an s3:// URI"""
+ """Convert a boto Key object into an ``s3://`` URI"""
return 's3://%s/%s' % (s3_key.bucket.name, s3_key.name)
class EMRJobRunner(MRJobRunner):
- """Class to run a single MRJob once on Amazon Elastic MapReduce.
+ """Runs an :py:class:`~mrjob.job.MRJob` on Amazon Elastic MapReduce.
+
+ :py:class:`EMRJobRunner` runs your job in an EMR job flow, which is
+ basically a temporary Hadoop cluster. Normally, it creates a job flow
+ just for your job; it's also possible to run your job in an existing
+ job flow by setting *emr_job_flow_id* (or :option:`--emr-job-flow-id`).
+
+ Input and support files can be either local or on S3; use ``s3://...``
+ URLs to refer to files on S3.
+
+ This class has some useful utilities for talking directly to S3 and EMR,
+ so you may find it useful to instantiate it without a script::
+
+ from mrjob.emr import EMRJobRunner
- MRJobRunners are typically instantiated through MRJob's make_runner()
- method; see the docstring of mrjob.job.MRJob for more info.
+ emr_conn = EMRJobRunner().make_emr_conn()
+ job_flows = emr_conn.describe_jobflows()
+ ...
+
+ See also: :py:meth:`EMRJobRunner.__init__`.
"""
+ alias = 'emr'
+
def __init__(self, **kwargs):
- """EMRJobRunner takes the same arguments as MRJobRunner
- except for some additional (keyword) opts.
+ """:py:class:`EMRJobRunner` takes the same arguments as
+ :py:class:`~mrjob.runner.MRJobRunner`, plus some additional options
+ which can be defaulted in :py:mod:`mrjob.conf`.
- s3_scratch_uri is required in principle, though if you don't set it,
- we'll look at the list of buckets you own, and try to use tmp/
- inside that bucket as our scratch directory (we'll yell at you for
- it though)
-
- aws_access_key_id and aws_secret_access_key are required if you
+ *aws_access_key_id* and *aws_secret_access_key* are required if you
haven't set them up already for boto (e.g. by setting the environment
- variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY)
-
- opts:
- aws_access_key_id -- "username" for Amazon web services.
- aws_secret_access_key -- your "password" on AWS
- aws_region -- region to connect to S3 and EMR on (e.g. 'us-west-1').
- If you want to use separate regions for S3 and EMR, set the
- endpoints we connect to explicitly using emr_endpoint and
- s3_endpoint.
- bootstrap_cmds -- a list of commands to run on the master node to set
- up libraries, etc. Like setup_cmds, these can be strings, which
- will be run in the shell, or lists of args, which will be run
- directly. We'll automatically prepend 'sudo' to each command
- bootstrap_files -- files to upload to the master node before
- running bootstrap_cmds (for example, python libraries). These
- will be made public on S3 (due to a limitation of the bootstrap
- feature)
- bootstrap_mrjob -- This is actually an option in the base MRJobRunner
- class. If this is True (the default), we'll tar up mrjob from
- the local filesystem, and install it on the master node (when
- other runners bootstrap mrjob, it's part of the wrapper script
- that runs mappers/reducers).
- bootstrap_python_packages -- paths of python modules to install on EMR
- These should be standard python module tarballs. If a module
- is named foo.tar.gz, we expect to be able to run:
-
- tar xfz foo.tar.gz
- cd foo
- sudo python setup.py install
-
- For anything that varies from this, use bootstrap_files and
- bootstrap_cmds/bootstrap_scripts
-
- modules will be installed in the order you specify
-
- bootstrap_scripts -- scripts to upload, and run (a combination of
- bootstrap_cmds and bootstrap_files). These are run after
- the command from bootstrap_cmds. We'll automatically run these
- through sudo.
- check_emr_status_every -- how often to check on the status of EMR jobs.
- Default is 30 seconds (too often and AWS will throttle you).
- Even very simple jobs typically take about 5 minutes to complete
- because of the overhead of setting up instances.
- ec2_instance_type -- what sort of EC2 instance to use. Current choices
- seem to be m1.small, m1.large, m1.xlarge, c1.medium, c1.xlarge,
- m2.2xlarge, and m2.4xlarge. Default is m1.small
- ec2_key_pair -- name of the SSH key you set up for EMR.
- ec2_key_pair_file -- path to file containing the SSH key for EMR
- ec2_master_instance_type -- same as instance_type, but only for master
- Hadoop nodes
- ec2_slave_instance_type -- same as instance_type, but only for slave
- nodes
- emr_endpoint -- optional host to connect to when communicating with S3
- (e.g. 'us-west-1.elasticmapreduce.amazonaws.com'). We can also
- guess this from region.
- emr_job_flow_id -- the ID of a persistent EMR job flow to run jobs in
- (normally we launch our own job). You can create such a job by
- making a separate EMRJobRunner, and calling
- make_persistent_job_flow() on it. It's fine for other jobs to
- be using the job flow; we give our job's steps a unique ID.
- num_ec2_instances -- number of instances to start up. Default is 1.
- s3_endpoint -- optional host to connect to when communicating with S3
- (e.g. 's3-us-west-1.amazonaws.com'). We can also infer this from
- region.
- s3_log_uri -- where on S3 to put logs, for example
- 's3://yourbucket/logs/' (logs will go into a subdirectory,
- in this example s3://yourbucket/logs/j-YOURJOBID/). Default
- is to append 'logs/' to scratch_uri.
- s3_scratch_uri -- S3 directory to use as scratch space, for example
- 's3://yourbucket/tmp/'.
- ssh_bin -- path to the ssh binary. Defaults to 'ssh'
- ssh_bind_ports -- a set/list/tuple of ports that are safe to listen on
- ssh_tunnel_to_job_tracker -- if True, create an ssh tunnel to the job
- tracker and listen on a randomly chosen port. This requires
- you to have a key-pair file (e.g. EMR.pem); which you can set
- through aws_key_pair_file.
- ssh_tunnel_is_open -- if True, any host can connect to the job
- tracker through the tunnel you open (by default, you can only
- connect to the tunnel via localhost).
+ variables :envvar:`$AWS_ACCESS_KEY_ID` and
+ :envvar:`$AWS_SECRET_ACCESS_KEY`)
+
+ Additional options:
+
+ :type aws_access_key_id: str
+ :param aws_access_key_id: "username" for Amazon web services.
+ :type aws_secret_access_key: str
+ :param aws_secret_access_key: your "password" on AWS
+ :type aws_region: str
+    :param aws_region: AWS region to connect to for S3 and EMR (e.g. ``us-west-1``). If you want to use separate regions for S3 and EMR, set *emr_endpoint* and *s3_endpoint*.
+ :type bootstrap_cmds: list
+ :param bootstrap_cmds: a list of commands to run on the master node to set up libraries, etc. Like *setup_cmds*, these can be strings, which will be run in the shell, or lists of args, which will be run directly. We'll automatically prepend ``sudo`` to each command
+ :type bootstrap_files: list of str
+    :param bootstrap_files: files to upload to the master node before running *bootstrap_cmds* (for example, Debian packages). These will be made public on S3 due to a limitation of the bootstrap feature.
+ :type bootstrap_mrjob: boolean
+ :param bootstrap_mrjob: This is actually an option in the base MRJobRunner class. If this is ``True`` (the default), we'll tar up :mod:`mrjob` from the local filesystem, and install it on the master node.
+ :type bootstrap_python_packages: list of str
+ :param bootstrap_python_packages: paths of python modules to install on EMR. These should be standard python module tarballs. If a module is named ``foo.tar.gz``, we expect to be able to run ``tar xfz foo.tar.gz; cd foo; sudo python setup.py install``.
+ :type bootstrap_scripts: list of str
+    :param bootstrap_scripts: scripts to upload and then run on the master node (a combination of *bootstrap_cmds* and *bootstrap_files*). These are run after the commands from *bootstrap_cmds*. We'll automatically run these through ``sudo``.
+ :type check_emr_status_every: float
+    :param check_emr_status_every: how often to check on the status of EMR jobs, in seconds. Default is 30 (too often and AWS will throttle you).
+ :type ec2_instance_type: str
+ :param ec2_instance_type: what sort of EC2 instance(s) to use (see http://aws.amazon.com/ec2/instance-types/). Default is ``m1.small``
+ :type ec2_key_pair: str
+ :param ec2_key_pair: name of the SSH key you set up for EMR.
+ :type ec2_key_pair_file: str
+ :param ec2_key_pair_file: path to file containing the SSH key for EMR
+ :type ec2_master_instance_type: str
+ :param ec2_master_instance_type: same as *ec2_instance_type*, but only for the master Hadoop node
+ :type ec2_slave_instance_type: str
+ :param ec2_slave_instance_type: same as *ec2_instance_type*, but only for the slave Hadoop nodes
+ :type emr_endpoint: str
+    :param emr_endpoint: optional host to connect to when communicating with EMR (e.g. ``us-west-1.elasticmapreduce.amazonaws.com``). Default is to infer this from *aws_region*.
+ :type emr_job_flow_id: str
+ :param emr_job_flow_id: the ID of a persistent EMR job flow to run jobs in (normally we launch our own job). It's fine for other jobs to be using the job flow; we give our job's steps a unique ID.
+ :type num_ec2_instances: int
+ :param num_ec2_instances: number of instances to start up. Default is ``1``.
+ :type s3_endpoint: str
+    :param s3_endpoint: optional host to connect to when communicating with S3 (e.g. ``s3-us-west-1.amazonaws.com``). Default is to infer this from *aws_region*.
+ :type s3_log_uri: str
+    :param s3_log_uri: where on S3 to put logs, for example ``s3://yourbucket/logs/``. Logs for your job flow will go into a subdirectory, e.g. ``s3://yourbucket/logs/j-JOBFLOWID/``. Default is to append ``logs/`` to *s3_scratch_uri*.
+ :type s3_scratch_uri: str
+ :param s3_scratch_uri: S3 directory (URI ending in ``/``) to use as scratch space, e.g. ``s3://yourbucket/tmp/``. Default is ``tmp/mrjob/`` in the first bucket belonging to you.
+ :type ssh_bin: str
+ :param ssh_bin: path to the ssh binary. Defaults to ``ssh``
+ :type ssh_bind_ports: list of int
+    :param ssh_bind_ports: a list of ports that are safe to listen on. Defaults to ports ``40001`` through ``40840``.
+ :type ssh_tunnel_to_job_tracker: bool
+ :param ssh_tunnel_to_job_tracker: If True, create an ssh tunnel to the job tracker and listen on a randomly chosen port. This requires you to set *ec2_key_pair* and *ec2_key_pair_file*.
+ :type ssh_tunnel_is_open: bool
+ :param ssh_tunnel_is_open: if True, any host can connect to the job tracker through the SSH tunnel you open. Mostly useful if your browser is running on a different machine from your job.
"""
- super(EMRJobRunner, self).__init__('emr', **kwargs)
+ super(EMRJobRunner, self).__init__(**kwargs)
self._fix_s3_scratch_and_log_uri_opts()
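A rough sketch tying a few of these options together: the persistent job flow workflow that the replaced text mentions for *emr_job_flow_id* (create a flow with one runner, reuse it from others). The bucket name and instance settings are made up, and it's assumed here that ``make_persistent_job_flow()`` returns the new flow's ID:

    from mrjob.emr import EMRJobRunner

    # start a long-lived job flow to share between jobs (values are examples)
    setup_runner = EMRJobRunner(s3_scratch_uri='s3://yourbucket/tmp/',
                                num_ec2_instances=1,
                                ec2_instance_type='m1.small')
    job_flow_id = setup_runner.make_persistent_job_flow()

    # later jobs can attach to the same flow by passing emr_job_flow_id
    job_runner = EMRJobRunner(emr_job_flow_id=job_flow_id,
                              s3_scratch_uri='s3://yourbucket/tmp/')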
@@ -247,9 +234,9 @@ def __init__(self, **kwargs):
self._uri_of_downloaded_log_file = None
@classmethod
- def allowed_opts(cls):
+ def _allowed_opts(cls):
"""A list of which keyword args we can pass to __init__()"""
- return super(EMRJobRunner, cls).allowed_opts() + [
+ return super(EMRJobRunner, cls)._allowed_opts() + [
'aws_access_key_id', 'aws_secret_access_key', 'aws_region',
'bootstrap_cmds', 'bootstrap_files', 'bootstrap_python_packages',
'bootstrap_scripts', 'check_emr_status_every', 'ec2_instance_type',
@@ -261,9 +248,9 @@ def allowed_opts(cls):
'ssh_tunnel_to_job_tracker']
@classmethod
- def default_opts(cls):
+ def _default_opts(cls):
"""A dictionary giving the default value of options."""
- return combine_dicts(super(EMRJobRunner, cls).default_opts(), {
+ return combine_dicts(super(EMRJobRunner, cls)._default_opts(), {
'check_emr_status_every': 30,
'ec2_instance_type': 'm1.small',
'num_ec2_instances': 1,
@@ -271,14 +258,15 @@ def default_opts(cls):
'ssh_bin': 'ssh',
'ssh_bind_ports': range(40001, 40841),
'ssh_tunnel_to_job_tracker': False,
+ 'ssh_tunnel_is_open': False,
})
@classmethod
- def opts_combiners(cls):
+ def _opts_combiners(cls):
"""Map from option name to a combine_*() function used to combine
values for that option. This allows us to specify that some options
are lists, or contain environment variables, or whatever."""
- return combine_dicts(super(EMRJobRunner, cls).opts_combiners(), {
+ return combine_dicts(super(EMRJobRunner, cls)._opts_combiners(), {
'bootstrap_cmds': combine_lists,
'bootstrap_files': combine_path_lists,
'bootstrap_python_packages': combine_path_lists,
@@ -286,6 +274,7 @@ def opts_combiners(cls):
'ec2_key_pair_file': combine_paths,
's3_log_uri': combine_paths,
's3_scratch_uri': combine_paths,
+ 'ssh_bin': combine_paths,
})
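For a sense of what these combiners do when an option is set both in ``mrjob.conf`` and as a keyword argument, here is a small illustration. The values are made up, and it's assumed that ``combine_lists`` concatenates its arguments while ``combine_paths`` keeps the last non-blank path:

    from mrjob.conf import combine_lists, combine_paths

    combine_lists(['make'], ['sudo apt-get install python-dateutil'])
    # -> ['make', 'sudo apt-get install python-dateutil']

    combine_paths('s3://yourbucket/tmp/', 's3://yourbucket/scratch/')
    # -> 's3://yourbucket/scratch/'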
def _fix_s3_scratch_and_log_uri_opts(self):
@@ -659,11 +648,11 @@ def _build_steps(self):
# other arguments passed directly to hadoop streaming
step_args = []
- for key, value in self._hadoop_env.iteritems():
+ for key, value in sorted(self._cmdenv.iteritems()):
step_args.extend(['-cmdenv', '%s=%s' % (key, value)])
- for jobconf_arg in self._opts['jobconf_args']:
- step_args.extend(['-jobconf', jobconf_arg])
+ for key, value in sorted(self._opts['jobconf'].iteritems()):
+ step_args.extend(['-jobconf', '%s=%s' % (key, value)])
step_args.extend(self._opts['hadoop_extra_args'])
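So, with ``self._cmdenv = {'TZ': 'UTC'}`` and ``jobconf = {'mapred.map.tasks': '2'}`` (hypothetical values), the two loops above would contribute step arguments like:

    ['-cmdenv', 'TZ=UTC', '-jobconf', 'mapred.map.tasks=2']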
@@ -1336,9 +1325,10 @@ def retry_if(ex):
max_tries=EMR_MAX_TRIES)
def make_emr_conn(self):
- """Get our connection to EMR, creating it if necessary. This
- is a boto EmrConnection object wrapped in an
- EmrConnectionRetryWrapper"""
+ """Create a connection to EMR.
+
+ :return: a :py:class:`mrjob.botoemr.connection.EmrConnection`, wrapped in a :py:class:`mrjob.retry.RetryWrapper`
+ """
log.debug('creating EMR connection')
region = self._get_region_info_for_emr_conn()
raw_emr_conn = botoemr.EmrConnection(
@@ -1348,7 +1338,8 @@ def make_emr_conn(self):
return self._wrap_aws_conn(raw_emr_conn)
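The retry wrapper is meant to be transparent, so calls go through as if it were a plain boto connection. A hypothetical use, given an :py:class:`EMRJobRunner` instance ``runner`` (``describe_jobflows()`` and the ``jobflowid``/``state`` attributes come from boto's EMR support, not from this diff):

    emr_conn = runner.make_emr_conn()
    for flow in emr_conn.describe_jobflows():
        print flow.jobflowid, flow.state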
def _get_region_info_for_emr_conn(self):
- """Get a RegionInfo object to initialize EMR connections with.
+ """Get a :py:class:`boto.ec2.regioninfo.RegionInfo` object to
+ initialize EMR connections with.
This is kind of silly because all EmrConnection ever does with
this object is extract the hostname, but that's how boto rolls.
@@ -1371,8 +1362,10 @@ def _get_region_info_for_emr_conn(self):
# need to do something S3-specific (e.g. setting file permissions)
def make_s3_conn(self):
- """Get our connection to S3, creating if it necessary. This
- is a boto Connection object."""
+ """Create a connection to S3.
+
+ :return: a :py:class:`boto.s3.connection.S3Connection`, wrapped in a :py:class:`mrjob.retry.RetryWrapper`
+ """
log.debug('creating S3 connection')
raw_s3_conn = boto.connect_s3(
aws_access_key_id=self._opts['aws_access_key_id'],
@@ -1392,9 +1385,9 @@ def get_s3_key(self, uri, s3_conn=None):
"""Get the boto Key object matching the given S3 uri, or
return None if that key doesn't exist.
- uri is an S3 URI (like s3://foo/bar)
+ uri is an S3 URI: ``s3://foo/bar``
- You may optionally pass in an existing s3 connection through s3_conn
+ You may optionally pass in an existing s3 connection through ``s3_conn``
"""
if not s3_conn:
s3_conn = self.make_s3_conn()
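Hypothetical usage, given a runner instance ``runner``: read back one output file from a finished job. The URI is made up, and ``get_contents_as_string()`` is boto's Key method rather than anything defined here:

    key = runner.get_s3_key('s3://yourbucket/tmp/output/part-00000')
    if key is not None:
        data = key.get_contents_as_string()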
@@ -1406,9 +1399,9 @@ def make_s3_key(self, uri, s3_conn=None):
"""Create the given S3 key, and return the corresponding
boto Key object.
- uri is an S3 URI (like s3://foo/bar)
+ uri is an S3 URI: ``s3://foo/bar``
- You may optionally pass in an existing s3 connection through s3_conn
+ You may optionally pass in an existing S3 connection through ``s3_conn``
"""
if not s3_conn:
s3_conn = self.make_s3_conn()
@@ -1418,11 +1411,11 @@ def make_s3_key(self, uri, s3_conn=None):
def get_s3_keys(self, uri, s3_conn=None):
"""Get a stream of boto Key objects for each key inside
- the given dir on s3. Save any key objects we see in our internal cache
+ the given dir on S3.
- uri is an S3 URI (like s3://foo/bar)
+ uri is an S3 URI: ``s3://foo/bar``
- You may optionally pass in an existing s3 connection through s3_conn
+        You may optionally pass in an existing S3 connection through ``s3_conn``
"""
if not s3_conn:
s3_conn = self.make_s3_conn()
@@ -1433,15 +1426,27 @@ def get_s3_keys(self, uri, s3_conn=None):
yield key
def get_s3_folder_keys(self, uri, s3_conn=None):
- """Get all the *_$folder$ keys created by EMR that are associated with
- the given URI, as boto Key objects. This is useful if you want to
- grant read access to another user to a given URI; you'll have to
- give read access to all associated *_$folder$ files as well or EMR
- will fail.
+ """Background: S3 is even less of a filesystem than HDFS in that it
+ doesn't have directories. EMR fakes directories by creating special
+ ``*_$folder$`` keys in S3.
+
+ For example if your job outputs ``s3://walrus/tmp/output/part-00000``,
+ EMR will also create these keys:
+
+ - ``s3://walrus/tmp_$folder$``
+ - ``s3://walrus/tmp/output_$folder$``
+
+ If you want to grant another Amazon user access to your files so they
+ can use them in S3, you must grant read access on the actual keys,
+ plus any ``*_$folder$`` keys that "contain" your keys; otherwise
+        EMR will fail with a permissions error.
+
+ This gets all the ``*_$folder$`` keys associated with the given URI,
+ as boto Key objects.
This does not support globbing.
- You may optionally pass in an existing s3 connection through s3_conn
+ You may optionally pass in an existing S3 connection through ``s3_conn``
"""
if not s3_conn:
s3_conn = self.make_s3_conn()
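A sketch of the access-granting scenario the docstring describes, given a runner instance ``runner``: grant another AWS user read access to your output, including the ``*_$folder$`` keys. The bucket, prefix, and e-mail address are made up, and ``add_email_grant()`` is boto's Key method, not part of this diff:

    uri = 's3://walrus/tmp/output/'
    keys = list(runner.get_s3_keys(uri)) + list(runner.get_s3_folder_keys(uri))
    for key in keys:
        key.add_email_grant('READ', 'colleague@example.com')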
View
80 mrjob/hadoop.py
@@ -56,7 +56,7 @@ def find_hadoop_streaming_jar(path):
return None
def fully_qualify_hdfs_path(path):
- """If path isn't an hdfs:// URL, turn it into one."""
+ """If path isn't an ``hdfs://`` URL, turn it into one."""
if path.startswith('hdfs://'):
return path
elif path.startswith('/'):
@@ -65,33 +65,37 @@ def fully_qualify_hdfs_path(path):
return 'hdfs:///user/%s/%s' % (os.environ['USER'], path)
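From the two branches visible in this hunk (the middle branch is cut off by the diff), and assuming ``$USER`` is ``dave``:

    fully_qualify_hdfs_path('hdfs://namenode/data')  # -> 'hdfs://namenode/data'
    fully_qualify_hdfs_path('tmp/output')            # -> 'hdfs:///user/dave/tmp/output'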
class HadoopJobRunner(MRJobRunner):
- """Class to run a single MRJob once on a Hadoop cluster.
+ """Runs an :py:class:`~mrjob.job.MRJob` on your Hadoop cluster.
- MRJobRunners are typically instantiated through MRJob's make_runner()
- method; see the docstring of mrjob.job.MRJob for more info.
+ Input and support files can be either local or on HDFS; use ``hdfs://...``
+ URLs to refer to files on HDFS.
+
+ It's rare to need to instantiate this class directly (see
+ :py:meth:`~HadoopJobRunner.__init__` for details).
"""
+ alias = 'hadoop'
+
def __init__(self, **kwargs):
+ """:py:class:`HadoopJobRunner` takes the same arguments as
+ :py:class:`~mrjob.runner.MRJobRunner`, plus some additional options
+ which can be defaulted in :py:mod:`mrjob.conf`.
+
+ *output_dir* and *hdfs_scratch_dir* need not be fully qualified
+        ``hdfs://`` URIs because it's understood that they have to be on
+ HDFS (e.g. ``tmp/mrjob/`` would be okay)
+
+ Additional options:
+
+ :type hadoop_bin: str
+ :param hadoop_bin: name/path of your hadoop program. Defaults to *hadoop_home* plus ``bin/hadoop``
+ :type hadoop_home: str
+    :param hadoop_home: alternative to setting the :envvar:`$HADOOP_HOME` environment variable.
+ :type hdfs_scratch_dir: str
+ :param hdfs_scratch_dir: temp directory on HDFS. Default is ``tmp/mrjob``
+ :type hadoop_streaming_jar: str
+ :param hadoop_streaming_jar: path to your hadoop streaming jar. If not set, we'll search for it inside :envvar:`$HADOOP_HOME`
"""
- All arguments that take files can take a path or an hdfs:// URI.
- paths are assumed to refer to local files, except for scratch_dir
- and output_dir (which always have to be on HDFS)
-
- mr_job_script and other keyword args (**kwargs) are handled by
- mrjob.runner.MRJobRunner.__init__().
-
- Additional opts taken by HadoopJobRunner (as keyword args):
- hadoop_bin -- name/path of your hadoop program (default is 'hadoop')
- hadoop_home -- alternative to setting the HADOOP_HOME environment
- variable. We only use this to set/find the hadoop binary
- and the streaming jar. (we default to tmp/mrjob, which goes
- in your HDFS home dir)
- hdfs_scratch_dir -- tmp directory on HDFS. Default is a temp dir inside
- your home directory. Need not be an hdfs:// URI because it's
- assumed to be in HDFS
- hadoop_streaming_jar -- path to your hadoop streaming jar. If not set,
- we'll search for it inside hadoop_home
- """
- super(HadoopJobRunner, self).__init__('hadoop', **kwargs)
+ super(HadoopJobRunner, self).__init__(**kwargs)
# fix hadoop_home
if not self._opts['hadoop_home']:
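As the new docstring says, it's rare to construct this class directly. A sketch of the usual route, mirroring the ``make_runner()`` example that appears in :py:mod:`mrjob.job` later in this commit; the job class and input path are hypothetical, and per the note above, inputs may be ``hdfs://`` URLs:

    mr_job = MRWordCounter(args=['-r', 'hadoop',
                                 'hdfs:///user/dave/logs/part-00000'])
    with mr_job.make_runner() as runner:
        runner.run()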
@@ -133,26 +137,26 @@ def __init__(self, **kwargs):
self._hdfs_input_dir = None
@classmethod
- def allowed_opts(cls):
+ def _allowed_opts(cls):
"""A list of which keyword args we can pass to __init__()"""
- return super(HadoopJobRunner, cls).allowed_opts() + [
+ return super(HadoopJobRunner, cls)._allowed_opts() + [
'hadoop_bin', 'hadoop_home', 'hdfs_scratch_dir',
'hadoop_streaming_jar']
@classmethod
- def default_opts(cls):
+ def _default_opts(cls):
"""A dictionary giving the default value of options."""
- return combine_dicts(super(HadoopJobRunner, cls).default_opts(), {
+ return combine_dicts(super(HadoopJobRunner, cls)._default_opts(), {
'hadoop_home': os.environ.get('HADOOP_HOME'),
'hdfs_scratch_dir': 'tmp/mrjob',
})
@classmethod
- def opts_combiners(cls):
+ def _opts_combiners(cls):
"""Map from option name to a combine_*() function used to combine
values for that option. This allows us to specify that some options
are lists, or contain environment variables, or whatever."""
- return combine_dicts(super(HadoopJobRunner, cls).opts_combiners(), {
+ return combine_dicts(super(HadoopJobRunner, cls)._opts_combiners(), {
'hadoop_bin': combine_paths,
'hadoop_home': combine_paths,
'hdfs_scratch_dir': combine_paths,
@@ -268,7 +272,7 @@ def _run_job_in_hadoop(self):
streaming_args = [self._opts['hadoop_bin'], 'jar', self._opts['hadoop_streaming_jar']]
# add environment variables
- for key, value in self._hadoop_env.iteritems():
+ for key, value in sorted(self._cmdenv.iteritems()):
streaming_args.append('-cmdenv')
streaming_args.append('%s=%s' % (key, value))
@@ -283,6 +287,13 @@ def _run_job_in_hadoop(self):
# set up uploading from HDFS to the working dir
streaming_args.extend(self._upload_args())
+ # add jobconf args
+ for key, value in sorted(self._opts['jobconf'].iteritems()):
+ streaming_args.extend(['-jobconf', '%s=%s' % (key, value)])
+
+ # add extra hadoop args
+ streaming_args.extend(self._opts['hadoop_extra_args'])
+
# set up mapper and reducer
streaming_args.append('-mapper')
streaming_args.append(cmd_line(self._mapper_args(step_num)))
@@ -292,13 +303,6 @@ def _run_job_in_hadoop(self):
else:
streaming_args.extend(['-jobconf', 'mapred.reduce.tasks=0'])
- # add jobconf args
- for jobconf_arg in self._opts['jobconf_args']:
- streaming_args.extend(['-jobconf', jobconf_arg])
-
- # add extra hadoop args
- streaming_args.extend(self._opts['hadoop_extra_args'])
-
log.debug('> %s' % cmd_line(streaming_args))
step_proc = Popen(streaming_args, stdout=PIPE, stderr=PIPE)
View
941 mrjob/job.py
@@ -11,50 +11,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+"""To create your own map reduce job, subclass :py:class:`MRJob`, create a
+series of mappers and reducers, and override :py:meth:`~mrjob.job.MRJob.steps`. For example, a word counter::
-"""Python implementation of a Hadoop streaming job that encapsulates
-one or more mappers and reducers, to run in sequence.
+ from mrjob.job import MRJob
-See MRJob's docstring for typical usage.
-"""
-# don't add imports here that aren't part of the standard Python library,
-# since MRJobs need to run in Amazon's generic EMR environment
-from __future__ import with_statement
-
-import inspect
-import itertools
-import logging
-from optparse import OptionParser, OptionGroup, OptionError
-import sys
-import time
-
-try:
- from cStringIO import StringIO
-except ImportError:
- from StringIO import StringIO
-
-# don't use relative imports, to allow this script to be invoked as __main__
-from mrjob.conf import combine_dicts
-from mrjob.parse import parse_mr_job_stderr
-from mrjob.protocol import DEFAULT_PROTOCOL, PROTOCOL_DICT
-from mrjob.runner import CLEANUP_CHOICES, CLEANUP_DEFAULT
-from mrjob.util import log_to_stream, read_input
-
-# used by mr() below, to fake no mapper
-def _IDENTITY_MAPPER(key, value):
- yield key, value
-
-# sentinel value; used when running MRJob as a script
-_READ_ARGS_FROM_SYS_ARGV = '_READ_ARGS_FROM_SYS_ARGV'
-
-class MRJob(object):
- """The base class for any map reduce job. Handles argument parsing,
- spawning of sub processes and/or hadoop jobs, and creating of temporary files.
-
- To create your own map reduce job, create a series of mappers and reducers,
- override the steps() method. For example, a word counter:
-
- class WordCounter(MRJob):
+ class MRWordCounter(MRJob):
def get_words(self, key, line):
for word in line.split():
yield word, 1
@@ -66,17 +28,19 @@ def steps(self):
return [self.mr(self.get_words, self.sum_words),]
if __name__ == '__main__':
- WordCounter.run()
+ MRWordCounter.run()
- The two lines at the bottom are mandatory; this is what allows your
- class to be run by hadoop streaming.
+The two lines at the bottom are mandatory; this is what allows your class
+to be run by Hadoop streaming.
- This will take in a file with lines of whitespace separated words, and
- output a file where each line is "(word)\t(count)"
+This will take in a file with lines of whitespace separated words, and
+output a file with tab-separated lines like: ``"stars"\t5``.
- For single-step jobs, you can also just redefine mapper() and reducer():
+For one-step jobs, you can also just redefine :py:meth:`~mrjob.job.MRJob.mapper` and :py:meth:`~mrjob.job.MRJob.reducer`::
- class WordCounter(MRJob):
+ from mrjob.job import MRJob
+
+ class MRWordCounter(MRJob):
def mapper(self, key, line):
for word in line.split():
yield word, 1
@@ -85,47 +49,115 @@ def reducer(self, word, occurrences):
yield word, sum(occurrences)
if __name__ == '__main__':
- WordCounter.run()
-
- You can override configure_options() to add command-line arguments to your
- script. Any arguments that needs to get passed to your mappers / reducers
- should be set in arguments.
+ MRWordCounter.run()
- To test the job locally, just run:
+To test the job locally, just run:
- python your_mr_job_sub_class.py < log_file_or_whatever > output
+``python your_mr_job_sub_class.py < log_file_or_whatever > output``
- The script will automatically invoke itself to run the various steps,
- using LocalMRJobRunner.
+The script will automatically invoke itself to run the various steps,
+using :py:class:`~mrjob.local.LocalMRJobRunner`.
- You can also test individual steps:
+You can also run individual steps::
# test 1st step mapper:
python your_mr_job_sub_class.py --mapper
# test 2nd step reducer (--step-num=1 because step numbers are 0-indexed):
python your_mr_job_sub_class.py --reducer --step-num=1
- By default, we read from stdin, but you can also specify one or more input
-files. It automatically decompresses .gz and .bz2 files:
+By default, we read from stdin, but you can also specify one or more
+input files. It automatically decompresses .gz and .bz2 files::
python your_mr_job_sub_class.py log_01.gz log_02.bz2 log_03
- You can run on Amazon Elastic MapReduce by specifying --runner emr or on your
- own Hadoop cluster by specifying --runner hadoop.
+You can run on Amazon Elastic MapReduce by specifying ``-r emr`` or
+on your own Hadoop cluster by specifying ``-r hadoop``:
+
+``python your_mr_job_sub_class.py -r emr``
- To run an MRJob from within another python process:
+Use :py:meth:`~mrjob.job.MRJob.make_runner` to run an
+:py:class:`~mrjob.job.MRJob` from another script::
- mr_job = YourMrJob(args) # specify cmd line args, as a list
+ from __future__ import with_statement # only needed on Python 2.5
+
+ mr_job = MRWordCounter(args=['-r', 'emr'])
with mr_job.make_runner() as runner:
runner.run()
for line in runner.stream_output():
key, value = mr_job.parse_output_line(line)
... # do something with the parsed output
- (in Python 2.5, use from __future__ import with_statement)
- """
+See :py:mod:`mrjob.examples` for more examples.
+"""
+# don't add imports here that aren't part of the standard Python library,
+# since MRJobs need to run in Amazon's generic EMR environment
+from __future__ import with_statement
+
+import inspect
+import itertools
+import logging
+from optparse import Option, OptionParser, OptionGroup, OptionError, OptionValueError
+import sys
+import time
+