Merge branch 'master' of github.com:Yelp/mrjob
David Marin committed Mar 14, 2015
2 parents 1bbc6f4 + a382db1 commit 4f8d31e
Showing 17 changed files with 91 additions and 61 deletions.
2 changes: 1 addition & 1 deletion mrjob/compat.py
@@ -21,7 +21,7 @@

 # lists alternative names for jobconf variables
 # full listing thanks to translation table in
-# http://hadoop.apache.org/common/docs/current/hadoop-project-dist/hadoop-common/DeprecatedProperties.html
+# http://hadoop.apache.org/common/docs/current/hadoop-project-dist/hadoop-common/DeprecatedProperties.html  # noqa

 log = logging.getLogger(__name__)

30 changes: 20 additions & 10 deletions mrjob/emr.py
@@ -591,7 +591,9 @@ def __init__(self, **kwargs):
         })

         if self._opts['bootstrap_files']:
-            log.warning("bootstrap_files is deprecated since v0.4.2 and will be removed in v0.6.0. Consider using bootstrap instead.")
+            log.warning(
+                "bootstrap_files is deprecated since v0.4.2 and will be"
+                " removed in v0.6.0. Consider using bootstrap instead.")
             for path in self._opts['bootstrap_files']:
                 self._bootstrap_dir_mgr.add(**parse_legacy_hash_path(
                     'file', path, must_name='bootstrap_files'))
@@ -1277,9 +1279,11 @@ def _job_flow_args(self, persistent=False, steps=None):
         if self._opts['additional_emr_info']:
             args['additional_info'] = self._opts['additional_emr_info']

-        if self._opts['visible_to_all_users'] and not 'VisibleToAllUsers' in self._opts['emr_api_params']:
-            self._opts['emr_api_params']['VisibleToAllUsers'] = \
-                'true' if self._opts['visible_to_all_users'] else 'false'
+        if (self._opts['visible_to_all_users'] and
+                not 'VisibleToAllUsers' in self._opts['emr_api_params']):  # noqa
+
+            self._opts['emr_api_params']['VisibleToAllUsers'] = (
+                'true' if self._opts['visible_to_all_users'] else 'false')

         if self._opts['emr_api_params']:
             args['api_params'] = self._opts['emr_api_params']
@@ -1966,10 +1970,11 @@ def _parse_legacy_bootstrap(self):
             # can't determine which AMI `latest` is at
             # job flow creation time so we call both
             bootstrap.append(['sudo apt-get install -y python-pip || '
-                'sudo yum install -y python-pip'])
-            # Print a warning
-            log.warning("bootstrap_python_packages is deprecated since v0.4.2 and will be removed in v0.6.0. Consider using bootstrap instead.")
+                              'sudo yum install -y python-pip'])
+            log.warning(
+                "bootstrap_python_packages is deprecated since v0.4.2 and will"
+                " be removed in v0.6.0. Consider using bootstrap instead.")

             for path in self._opts['bootstrap_python_packages']:
                 path_dict = parse_legacy_hash_path('file', path)
@@ -1979,15 +1984,19 @@

         # setup_cmds
         if self._opts['bootstrap_cmds']:
-            log.warning("bootstrap_cmds is deprecated since v0.4.2 and will be removed in v0.6.0. Consider using bootstrap instead.")
+            log.warning(
+                "bootstrap_cmds is deprecated since v0.4.2 and will be"
+                " removed in v0.6.0. Consider using bootstrap instead.")
             for cmd in self._opts['bootstrap_cmds']:
                 if not isinstance(cmd, basestring):
                     cmd = cmd_line(cmd)
                 bootstrap.append([cmd])

         # bootstrap_scripts
         if self._opts['bootstrap_scripts']:
-            log.warning("bootstrap_scripts is deprecated since v0.4.2 and will be removed in v0.6.0. Consider using bootstrap instead.")
+            log.warning(
+                "bootstrap_scripts is deprecated since v0.4.2 and will be"
+                " removed in v0.6.0. Consider using bootstrap instead.")

             for path in self._opts['bootstrap_scripts']:
                 path_dict = parse_legacy_hash_path('file', path)
@@ -2169,7 +2178,8 @@ def add_if_match(job_flow):
                 # extracted from the existing job flow should always
                 # be a full major.minor.patch, so checking matching
                 # prefixes should be sufficient.
-                if not job_flow_ami_version.startswith(self._opts['ami_version']):
+                if not job_flow_ami_version.startswith(
+                        self._opts['ami_version']):
                     return
             else:
                 if not warned_about_ami_version_latest:
8 changes: 6 additions & 2 deletions mrjob/fs/hadoop.py
@@ -152,8 +152,12 @@ def ls(self, path_glob):

             # Try to figure out which part of the line is the path
             # Expected lines:
-            # -rw-r--r-- 3 dave users 3276 2010-01-13 14:00 /foo/bar # HDFS
-            # -rwxrwxrwx 1 3276 010-01-13 14:00 /foo/bar # S3
+            #
+            # HDFS:
+            # -rw-r--r--   3 dave users       3276 2010-01-13 14:00 /foo/bar
+            #
+            # S3:
+            # -rwxrwxrwx   1          3276 010-01-13 14:00 /foo/bar
             path_index = None
             for index, field in enumerate(fields):
                 if len(field) == 5 and field[2] == ':':
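The loop above locates the HH:MM time field (five characters with ':' in the middle), and the path is then taken from the fields that follow it. A rough, self-contained sketch of that heuristic (the helper name is made up here, and the real ls() handles more edge cases):

    def path_from_ls_line(line):
        # locate the HH:MM time field and assume the path follows it
        fields = line.split()
        for index, field in enumerate(fields):
            if len(field) == 5 and field[2] == ':':
                return fields[index + 1]
        return None  # no time field found

    # both the HDFS- and S3-style lines resolve to the path:
    print path_from_ls_line(
        '-rw-r--r--   3 dave users       3276 2010-01-13 14:00 /foo/bar')
    print path_from_ls_line(
        '-rwxrwxrwx   1          3276 2010-01-13 14:00 /foo/bar')
    # both print: /foo/bar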
2 changes: 1 addition & 1 deletion mrjob/fs/s3.py
@@ -80,7 +80,7 @@ def __init__(self, aws_access_key_id, aws_secret_access_key, s3_endpoint):
"""
:param aws_access_key_id: Your AWS access key ID
:param aws_secret_access_key: Your AWS secret access key
:param s3_endpoint: S3 endpoint to access, e.g.
:param s3_endpoint: S3 endpoint to access, e.g.
``s3-us-west-2.amazonaws.com``
"""
super(S3Filesystem, self).__init__()
17 changes: 11 additions & 6 deletions mrjob/hadoop.py
@@ -268,17 +268,22 @@ def _upload_local_files_to_hdfs(self):

     def _mkdir_on_hdfs(self, path):
         log.debug('Making directory %s on HDFS' % path)
+
         hadoop_version = self.get_hadoop_version()
-        # from version 0.23 / 2.x, -mkdir needs a -p option to create parent directories
-        # test if version == 0.23
-        if (mrjob.compat.version_gte(hadoop_version, "0.23") and not mrjob.compat.version_gte(hadoop_version, "0.24")):
+        # from version 0.23 / 2.x on, -mkdir needs a -p option to create
+        # parent directories
+
+        # version == 0.23
+        if ((mrjob.compat.version_gte(hadoop_version, "0.23") and
+                not mrjob.compat.version_gte(hadoop_version, "0.24"))):
             self.invoke_hadoop(['fs', '-mkdir', '-p', path])
-        # test if version >= 2.0
+
+        # version >= 2.0
         elif mrjob.compat.version_gte(hadoop_version, "2.0"):
             self.invoke_hadoop(['fs', '-mkdir', '-p', path])
+
+        # for version 0.20, 1.x
         else:
-            # for version 0.20, 1.x
             self.invoke_hadoop(['fs', '-mkdir', path])

     def _upload_to_hdfs(self, path, target):
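The version gate above reduces to a single predicate: -p is needed on 0.23.x and on 2.x or later, while 0.20/1.x create parent directories by default. A quick sketch of how it resolves for sample versions, assuming (as the code above does) that mrjob.compat.version_gte(version, other) is True when version >= other:

    from mrjob.compat import version_gte

    for v in ('0.20.205', '0.23.1', '1.0.3', '2.4.0'):
        # mirror the branch logic from _mkdir_on_hdfs
        needs_p = ((version_gte(v, '0.23') and not version_gte(v, '0.24'))
                   or version_gte(v, '2.0'))
        print '%-8s -> %s' % (v, 'fs -mkdir -p' if needs_p else 'fs -mkdir')
    # 0.20.205 -> fs -mkdir
    # 0.23.1   -> fs -mkdir -p
    # 1.0.3    -> fs -mkdir
    # 2.4.0    -> fs -mkdir -p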
2 changes: 1 addition & 1 deletion mrjob/job.py
@@ -406,7 +406,7 @@ def increment_counter(self, group, counter, amount=1):
         # cast to str() because sometimes people pass in exceptions or whatever
         #
         # The relevant Hadoop code is incrCounter(), here:
-        # http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?view=markup
+        # http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?view=markup  # noqa
         if isinstance(group, unicode) or isinstance(counter, unicode):
             group = unicode(group).replace(',', ';')
             counter = unicode(counter).replace(',', ';')
11 changes: 7 additions & 4 deletions mrjob/launch.py
@@ -447,10 +447,13 @@ def load_options(self, args):
             self.option_parser.error)

         # emr_api_params
-        emr_api_err = 'emr-api-params argument "%s" is not of the form KEY=VALUE'
-        self.options.emr_api_params = parse_key_value_list(self.options.emr_api_params,
-                                                           emr_api_err,
-                                                           self.option_parser.error)
+        emr_api_err = (
+            'emr-api-params argument "%s" is not of the form KEY=VALUE')
+
+        self.options.emr_api_params = parse_key_value_list(
+            self.options.emr_api_params,
+            emr_api_err,
+            self.option_parser.error)

         # no_emr_api_params just exists to modify emr_api_params
         for param in self.options.no_emr_api_params:
13 changes: 7 additions & 6 deletions mrjob/options.py
@@ -115,9 +115,10 @@ def add_runner_opts(opt_group, default_runner='local'):
     opt_group.add_option(
         '--export-job-name', dest='export_job_name', action='store_true',
         default=None,
-        help=("Export the internal job name that uniquely identifies each "
-              "task to the environment variable MRJOB_JOB_NAME as if using "
-              "--cmdenv, which can be useful for, e.g., setup scripts.")),
+        help="Export the internal job name that uniquely identifies each"
+             " task to the environment variable MRJOB_JOB_NAME as if using"
+             " --cmdenv, which can be useful for, e.g., setup scripts."
+    ),

     opt_group.add_option(
         '--file', dest='upload_files', action='append',
@@ -565,9 +566,9 @@ def add_emr_opts(opt_group):
     opt_group.add_option(
         '--emr-api-param', dest='emr_api_params',
         default=[], action='append',
-        help='Additional parameters to pass directly to the EMR API; should'
-             ' take the form KEY=VALUE. You can use --emr-api-param multiple'
-             ' times.'
+        help='Additional parameters to pass directly to the EMR API; '
+             'should take the form KEY=VALUE. You can use --emr-api-param'
+             ' multiple times.'
     ),

     opt_group.add_option(
7 changes: 4 additions & 3 deletions mrjob/parse.py
@@ -37,7 +37,8 @@
     boto = None

 # match the filename of a hadoop streaming jar
-HADOOP_STREAMING_JAR_RE = re.compile(r'^hadoop.*streaming.*(?<!-sources)\.jar$')
+HADOOP_STREAMING_JAR_RE = re.compile(
+    r'^hadoop.*streaming.*(?<!-sources)\.jar$')

 # match an mrjob job name (these are used to name EMR job flows)
 JOB_NAME_RE = re.compile(r'^(.*)\.(.*)\.(\d+)\.(\d+)\.(\d+)$')
@@ -56,7 +57,7 @@
 # urlparse()
 NETLOC_RE = re.compile(r'//(.*?)((/.*?)?)$')

-# Used to check if the candidate candidate uri is actually a local windows path.
+# Used to check if the candidate uri is actually a local windows path.
 WINPATH_RE = re.compile(r"^[aA-zZ]:\\")


@@ -417,7 +418,7 @@ def parse_mr_job_stderr(stderr, counters=None):
     - *other*: lines that aren't either counters or status messages
     """
     # For the corresponding code in Hadoop Streaming, see ``incrCounter()`` in
-    # http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?view=markup
+    # http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?view=markup  # noqa
     if isinstance(stderr, str):
         stderr = StringIO(stderr)

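As a quick illustrative check of the regex being rewrapped above (not part of the commit), the negative lookbehind is what keeps the pattern from matching the -sources variant of the jar:

    import re

    HADOOP_STREAMING_JAR_RE = re.compile(
        r'^hadoop.*streaming.*(?<!-sources)\.jar$')

    print bool(HADOOP_STREAMING_JAR_RE.match('hadoop-streaming-1.0.3.jar'))
    # True
    print bool(HADOOP_STREAMING_JAR_RE.match(
        'hadoop-streaming-1.0.3-sources.jar'))
    # False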
1 change: 1 addition & 0 deletions mrjob/pool.py
@@ -19,6 +19,7 @@

 from mrjob.parse import iso8601_to_datetime

+
 def est_time_to_hour(job_flow, now=None):
     """How long before job reaches the end of the next full hour since it
     began. This is important for billing purposes.
12 changes: 9 additions & 3 deletions mrjob/runner.py
@@ -481,7 +481,9 @@ def stream_output(self):
             raise AssertionError('Run the job before streaming output')

         if self._closed is True:
-            log.warn('WARNING! Trying to stream output from a closed runner, output will probably be empty.')
+            log.warn(
+                'WARNING! Trying to stream output from a closed runner, output'
+                ' will probably be empty.')

         log.info('Streaming final output from %s' % output_dir)

@@ -958,7 +960,9 @@ def _parse_setup(self):

         # setup_cmds
         if self._opts['setup_cmds']:
-            log.warning("setup_cmds is deprecated since v0.4.2 and will be removed in v0.6.0. Consider using setup instead.")
+            log.warning(
+                "setup_cmds is deprecated since v0.4.2 and will be removed"
+                " in v0.6.0. Consider using setup instead.")

             for cmd in self._opts['setup_cmds']:
                 if not isinstance(cmd, basestring):
@@ -967,7 +971,9 @@

         # setup_scripts
         if self._opts['setup_scripts']:
-            log.warning("setup_scripts is deprecated since v0.4.2 and will be removed in v0.6.0. Consider using setup instead.")
+            log.warning(
+                "setup_scripts is deprecated since v0.4.2 and will be removed"
+                " in v0.6.0. Consider using setup instead.")

             for path in self._opts['setup_scripts']:
                 path_dict = parse_legacy_hash_path('file', path)
4 changes: 2 additions & 2 deletions mrjob/setup.py
@@ -52,8 +52,8 @@
r"((?P<single_quoted>'[^']*')|"
r'(?P<double_quoted>"([^"\\]|\\.)*")|'
r'(?P<hash_path>'
r'(?P<path>([A-Za-z][A-Za-z0-9\.-]*://([^\'"\s\\]|\\.)+)|'
r'([^\'":=\s\\]|\\.)+)'
r'(?P<path>([A-Za-z][A-Za-z0-9\.-]*://([^\'"\s\\]|\\.)+)|' # noqa
r'([^\'":=\s\\]|\\.)+)' # noqa
r'#(?P<name>([^\'":;><|=/#\s\\]|\\.)*)'
r'(?P<name_slash>/)?)|'
r'(?P<unquoted>([^\'":=\s\\]|\\.)+)|'
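The pattern getting the # noqa annotations parses mrjob's path#name syntax (a local or schemed path such as s3://..., followed by the name the file should get in the job's working directory). A deliberately simplified stand-in showing just the two capture groups; the real pattern above also handles quoting and escape sequences:

    import re

    # simplified illustration only; not the regex from mrjob/setup.py
    SIMPLE_HASH_PATH_RE = re.compile(r'^(?P<path>[^#]+)#(?P<name>[^#]*)$')

    m = SIMPLE_HASH_PATH_RE.match('s3://bucket/script.sh#run.sh')
    print m.group('path')  # s3://bucket/script.sh
    print m.group('name')  # run.sh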
9 changes: 6 additions & 3 deletions mrjob/step.py
@@ -96,13 +96,16 @@ def __init__(self, **kwargs):
             raise ValueError("Step has no mappers and no reducers")

         self.has_explicit_mapper = any(
-            value for name, value in kwargs.iteritems() if name in _MAPPER_FUNCS)
+            value for name, value in kwargs.iteritems()
+            if name in _MAPPER_FUNCS)

         self.has_explicit_combiner = any(
-            value for name, value in kwargs.iteritems() if name in _COMBINER_FUNCS)
+            value for name, value in kwargs.iteritems()
+            if name in _COMBINER_FUNCS)

         self.has_explicit_reducer = any(
-            value for name, value in kwargs.iteritems() if name in _REDUCER_FUNCS)
+            value for name, value in kwargs.iteritems()
+            if name in _REDUCER_FUNCS)

         steps = dict((f, None) for f in _JOB_STEP_PARAMS)

23 changes: 12 additions & 11 deletions mrjob/tools/emr/collect_emr_stats.py
@@ -11,10 +11,10 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-""" Collect EMR stats from active jobflows. 
+""" Collect EMR stats from active jobflows.
 Active jobflows are those in states of:
 BOOTSTRAPPING, RUNNING, STARTING, and WAITING.
-Collected stats include total number of active jobflows and total 
+Collected stats include total number of active jobflows and total
 number of Amazon EC2 instances used to execute these jobflows.
 The instance counts are not separated by instance type.
@@ -49,9 +49,9 @@
 from mrjob.job import MRJob
 from mrjob.options import add_basic_opts

-
 log = getLogger(__name__)

+
 def main(args):
     # parser command-line args
     usage = '%prog [options]'
@@ -63,7 +63,8 @@ def main(args):
     description += "these jobflows. The instance counts are not separated by"
     description += "instance type."
     option_parser = OptionParser(usage=usage, description=description)
-    option_parser.add_option("-p", "--pretty-print",
+    option_parser.add_option(
+        "-p", "--pretty-print",
         action="store_true", dest="pretty_print", default=False,
         help=('Pretty print the collected stats'))
     add_basic_opts(option_parser)
@@ -95,15 +96,15 @@ def pretty_print(stats):
     print 'Number of instance counts: %s' % s['total_instance_count']
     print '* The active jobflows are those in states of BOOTSTRAPPING,'
     print '  STARTING, RUNNING, and WAITING.'


 def collect_active_job_flows(conf_paths):
     """Collect active job flow information from EMR.

-    :param str conf_path: Alternate path to read :py:mod:`mrjob.conf` from, 
+    :param str conf_path: Alternate path to read :py:mod:`mrjob.conf` from,
         or ``False`` to ignore all config files

-    Return a list of job flows 
+    Return a list of job flows
     """
     emr_conn = EMRJobRunner(conf_paths=conf_paths).make_emr_conn()
     active_states = ['STARTING', 'BOOTSTRAPPING', 'WAITING', 'RUNNING']
@@ -112,14 +113,14 @@ def collect_active_job_flows(conf_paths):


 def job_flows_to_stats(job_flows):
-    """ Compute total number of jobflows and instance count given a list of 
+    """ Compute total number of jobflows and instance count given a list of
     jobflows.

     :param job_flows: A list of :py:class:`boto.emr.EmrObject`

     Returns a dictionary with many keys, including:

-    * *timestamp*: The time when stats are collected (current UTC time in 
+    * *timestamp*: The time when stats are collected (current UTC time in
       POSIX timestamp, float format).
     * *num_jobflows*: Total number of jobflows.
     * *total_instance_count*: Total number of instance counts from jobflows.
@@ -132,7 +133,7 @@ def job_flows_to_stats(job_flows):
     now = datetime.utcnow()

     stats = {}
-    stats['timestamp'] = mktime(now.timetuple())  # convert into POSIX timestamp
+    stats['timestamp'] = mktime(now.timetuple())  # convert to POSIX timestamp
     stats['num_jobflows'] = len(job_flow_ids)
     stats['total_instance_count'] = total_instance_count

3 changes: 2 additions & 1 deletion mrjob/util.py
@@ -92,7 +92,8 @@ def buffer_iterator_to_line_iterator(iterator):
                 # this will happen eventually
                 buf = buf[start:]

-                # set search offset so we do not need to scan this part of the buffer again
+                # set search offset so we do not need to scan this part of
+                # the buffer again
                 search_offset = len(buf)
                 break

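For context, buffer_iterator_to_line_iterator yields complete lines from an iterator of raw chunks, and search_offset is what keeps it from rescanning an incomplete trailing line every time a new chunk arrives. A minimal sketch of the same idea (not mrjob's actual implementation):

    def buffers_to_lines(buffers):
        # yield complete newline-terminated lines from an iterator of chunks
        buf = ''
        search_offset = 0
        for chunk in buffers:
            buf += chunk
            while True:
                newline_pos = buf.find('\n', search_offset)
                if newline_pos == -1:
                    # no complete line yet; remember how far we scanned so
                    # the next chunk does not make us rescan this part
                    search_offset = len(buf)
                    break
                yield buf[:newline_pos + 1]
                buf = buf[newline_pos + 1:]
                search_offset = 0
        if buf:
            yield buf  # trailing data with no final newline

    print list(buffers_to_lines(['foo\nba', 'r\nbaz']))
    # ['foo\n', 'bar\n', 'baz']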
1 change: 0 additions & 1 deletion tests/test_local.py
@@ -41,7 +41,6 @@
 from tests.mr_two_step_job import MRTwoStepJob
 from tests.mr_verbose_job import MRVerboseJob
 from tests.mr_word_count import MRWordCount
-from tests.quiet import logger_disabled
 from tests.quiet import no_handlers_for_logger
 from tests.sandbox import mrjob_conf_patcher
 from tests.sandbox import EmptyMrjobConfTestCase
