
Commit

Initial version with working typedbytes
dgleich committed Feb 7, 2012
1 parent 3d2c7bf commit def0452
Showing 4 changed files with 77 additions and 19 deletions.
6 changes: 6 additions & 0 deletions mrjob/hadoop.py
@@ -396,6 +396,12 @@ def _run_job_in_hadoop(self):
streaming_args.append(cmd_line(self._reducer_args(step_num)))
else:
streaming_args.extend(['-jobconf', 'mapred.reduce.tasks=0'])

# setup typedbytes communication
streaming_args.extend(['-jobconf', 'stream.map.input=typedbytes'])
streaming_args.extend(['-jobconf', 'stream.reduce.input=typedbytes'])
streaming_args.extend(['-jobconf', 'stream.reduce.output=typedbytes'])
streaming_args.extend(['-jobconf', 'stream.map.output=typedbytes'])

log.debug('> %s' % cmd_line(streaming_args))
step_proc = Popen(streaming_args, stdout=PIPE, stderr=PIPE)
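For context, a minimal sketch (not from the commit) that mirrors the four extend() calls above and prints the resulting flag pairs. These jobconf settings tell Hadoop Streaming to frame each task's stdin and stdout as typedbytes records instead of newline-delimited text:

    # illustrative Python 2 sketch; mirrors the extend() calls above
    args = []
    for conf in ('stream.map.input', 'stream.map.output',
                 'stream.reduce.input', 'stream.reduce.output'):
        args.extend(['-jobconf', '%s=typedbytes' % conf])
    print ' '.join(args)
    # -> -jobconf stream.map.input=typedbytes -jobconf stream.map.output=typedbytes ...
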
34 changes: 33 additions & 1 deletion mrjob/job.py
Expand Up @@ -111,6 +111,8 @@ def reducer(self, word, occurrences):
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO

import typedbytes

# don't use relative imports, to allow this script to be invoked as __main__
from mrjob.conf import combine_dicts
@@ -126,6 +128,7 @@ def reducer(self, word, occurrences):
from mrjob.util import log_to_stream
from mrjob.util import parse_and_save_options
from mrjob.util import read_input
from mrjob.util import input_files


log = logging.getLogger('mrjob.job')
@@ -788,6 +791,30 @@ def mr_job_script(cls):
return inspect.getsourcefile(cls)

### Other useful utilities ###

def _wrap_typedbytes(self):
"""Return read/write functions that exchange typedbytes pairs
over stdin/stdout instead of protocol-encoded lines."""
tbout = typedbytes.Output(self.stdout)

def read_lines():
paths = self.args or ['-']
for path in paths:
for file in input_files(path, stdin=self.stdin):
for key, val in typedbytes.PairedInput(file):
log.info('Read keyval pair, key=%s', key)
yield key, val

def write_line(key, value):
try:
tbout.write(key)
tbout.write(value)
except Exception, e:
if self.options.strict_protocols:
raise
else:
self.increment_counter('Unencodable output',
e.__class__.__name__)

return read_lines, write_line
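
For context, a minimal round-trip sketch (not part of the commit) of the typedbytes calls that read_lines and write_line build on, assuming the typedbytes package that setup.py now requires:

    import typedbytes
    from cStringIO import StringIO

    buf = StringIO()
    out = typedbytes.Output(buf)
    out.write('the')  # a pair is two consecutive writes:
    out.write(14)     # first the key, then the value

    buf.seek(0)
    for key, val in typedbytes.PairedInput(buf):
        print key, val  # -> the 14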

def _read_input(self):
"""Read from stdin, or one more files, or directories.
@@ -802,7 +829,7 @@ def _read_input(self):
for path in paths:
for line in read_input(path, stdin=self.stdin):
yield line

def _wrap_protocols(self, step_num, step_type):
"""Pick the protocol classes to use for reading and writing
for the given step, and wrap them so that bad input and output
@@ -819,6 +846,11 @@ def _wrap_protocols(self, step_num, step_type):
step_num -- which step to run (e.g. 0)
step_type -- 'M' for mapper, 'C' for combiner, 'R' for reducer
"""

# TEMP code to evaluate typedbytes
if True:
return self._wrap_typedbytes()

read, write = self.pick_protocols(step_num, step_type)

def read_lines():
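With the temporary override above, every step exchanges typedbytes pairs, so a mapper receives deserialized Python objects rather than protocol-decoded lines. A hedged sketch of what a job looks like under this scheme (job and class names hypothetical; standard mrjob skeleton):

    from mrjob.job import MRJob

    class MRPassThrough(MRJob):
        def mapper(self, key, value):
            # key and value arrive as Python objects decoded from typedbytes
            yield key, value

    if __name__ == '__main__':
        MRPassThrough.run()
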
55 changes: 37 additions & 18 deletions mrjob/util.py
@@ -296,8 +296,8 @@ def populate_option_groups_with_options(assignments, indexed_options):
key=lambda item: item.get_opt_string())


def read_input(path, stdin=None):
"""Stream input the way Hadoop would.
def input_files(path, stdin=None):
"""An iterable of file objects for Hadoop-like syntax
- Resolve globs (``foo_*.gz``).
- Decompress ``.gz`` and ``.bz2`` files.
@@ -309,41 +309,53 @@
"""
if stdin is None:
stdin = sys.stdin

# handle '-' (special case)

if path == '-':
for line in stdin:
yield line
yield stdin
return

# resolve globs
paths = glob.glob(path)
if not paths:
raise IOError(2, 'No such file or directory: %r' % path)
elif len(paths) > 1:
for path in paths:
for line in read_input(path, stdin=stdin):
yield line
for file in input_files(path, stdin=stdin):
yield file
return
else:
path = paths[0]

# recurse through directories
if os.path.isdir(path):
for dirname, _, filenames in os.walk(path):
for filename in filenames:
for line in read_input(os.path.join(dirname, filename),
for file in input_files(os.path.join(dirname, filename),
stdin=stdin):
yield line
yield file
return

# read from files
for line in read_file(path):
yield line
yield file_object(path)

def read_input(path, stdin=None):
"""Stream input the way Hadoop would.
def read_file(path, fileobj=None):
"""Reads a file.
- Resolve globs (``foo_*.gz``).
- Decompress ``.gz`` and ``.bz2`` files.
- If path is ``'-'``, read from stdin
- If path is a directory, recursively read its contents.
You can redefine *stdin* for ease of testing. *stdin* can actually be
any iterable that yields lines (e.g. a list).
"""

for file in input_files(path, stdin):
for line in file:
yield line

def file_object(path, fileobj=None):
"""Return file.
- Decompress ``.gz`` and ``.bz2`` files.
- If *fileobj* is not ``None``, stream lines from the *fileobj*
@@ -360,9 +372,16 @@ def read_file(path, fileobj=None):
else:
f = fileobj

for line in f:
yield line
return f

def read_file(path, fileobj=None):
"""Reads a file.
- Decompress ``.gz`` and ``.bz2`` files.
- If *fileobj* is not ``None``, stream lines from the *fileobj*
"""
for line in file_object(path, fileobj):
yield line

def bunzip2_stream(fileobj):
"""Return an uncompressed bz2 stream from a file object
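A short usage sketch (not from the commit; the glob is illustrative) of how the refactored helpers compose: file_object() opens a single path, input_files() yields whole file objects (what typedbytes decoding needs), and read_input()/read_file() keep the old line-oriented behavior as thin wrappers:

    from mrjob.util import input_files, read_input

    # line-oriented, unchanged external behavior
    for line in read_input('data/part-*.gz'):
        pass  # handle one decompressed line

    # file-oriented, for whole-stream consumers such as typedbytes
    for f in input_files('data/part-*.gz'):
        pass  # hand the open file object to a decoder
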
1 change: 1 addition & 0 deletions setup.py
@@ -6,6 +6,7 @@
'boto>=2.0',
'PyYAML',
'simplejson>=2.0.9',
'typedbytes',
],
'provides': ['mrjob'],
'test_suite': 'tests.suite.load_tests',
