Skip to content
Browse files

"-typedbytes input|output|all" instead of "-typedbytes yes"

  • Loading branch information...
1 parent 2f23e72, commit 37efb090776cd4a6eb2406cbc10c7ed1a24a952b — Klaas Bosteels committed Jan 28, 2009
Showing with 38 additions and 21 deletions.
  1. +38 −21 src/python/dumbo.py
View
59 src/python/dumbo.py
@@ -270,7 +270,8 @@ def run(self):
self.opts.append(('cacheArchive', cachearchive))
if not addedopts['inputformat']:
addedopts['inputformat'] = ['auto']
- if not (addedopts['typedbytes'] and addedopts['typedbytes'][0] == 'yes'):
+ if not (addedopts['typedbytes'] and \
+ addedopts['typedbytes'][0] in ('input','all')):
inputformat_shortcuts = \
{'code': dumbopkg + '.SequenceFileAsCodeInputFormat',
'text': dumbopkg + '.TextAsCodeInputFormat',
@@ -305,14 +306,15 @@ def run(self):
outputformat = addedopts['outputformat'][0]
if outputformat_shortcuts.has_key(outputformat.lower()):
outputformat = outputformat_shortcuts[outputformat.lower()]
- if not (addedopts['typedbytes'] and addedopts['typedbytes'][0] == 'yes'):
+ if not (addedopts['typedbytes'] and \
+ addedopts['typedbytes'][0] in ('output','all')):
self.opts.append(('jobconf', 'dumbo.from.code.output.format.class='
+ outputformat))
self.opts.append(('outputformat', dumbopkg + '.FromCodeOutputFormat'))
else:
self.opts.append(('outputformat', outputformat))
if not (addedopts['codewritable'] and addedopts['codewritable'][0] == 'no') and \
- not (addedopts['typedbytes'] and addedopts['typedbytes'][0] == 'yes'):
+ not addedopts['typedbytes']:
self.opts.append(('jobconf', 'mapred.mapoutput.key.class=%s.CodeWritable'
% dumbopkg))
self.opts.append(('jobconf', 'mapred.mapoutput.value.class=%s.CodeWritable'
@@ -322,10 +324,10 @@ def run(self):
+ urllib.quote_plus(opt[0])))
self.opts.append(('mapper', dumbopkg + '.CodeWritableMapper'))
self.opts.append(('jobconf', 'dumbo.code.writable.map.class=org.apache.hadoop.streaming.PipeMapper'))
- if addedopts['typedbytes'] and addedopts['typedbytes'][0] == 'yes':
+ if addedopts['typedbytes']:
import typedbytes
self.opts.append(('file', typedbytes.findmodpath()))
- self.opts.append(('typedbytes', 'all'))
+ self.opts.append(('typedbytes', addedopts['typedbytes'][0]))
if addedopts['addpath'] and addedopts['addpath'][0] == 'yes':
self.opts.append(('cmdenv', 'dumbo_add_path=true'))
pyenv = envdef('PYTHONPATH',
@@ -403,14 +405,14 @@ def run(mapper,
combiner = combiner().reduce
else:
combiner = combiner()
- if os.environ.has_key('stream_input_typed_bytes') and \
- os.environ['stream_input_typed_bytes'] == 'true':
- print >> sys.stderr, "INFO: inputting typed bytes"
- import typedbytes
- inputs = typedbytes.PairedInput(sys.stdin).reads()
- else:
- inputs = loadcode(line[:-1] for line in sys.stdin)
if sys.argv[1].startswith('map'):
+ if os.environ.has_key('stream_map_input_typed_bytes') and \
+ os.environ['stream_map_input_typed_bytes'] == 'true':
+ print >> sys.stderr, "INFO: inputting typed bytes"
+ import typedbytes
+ inputs = typedbytes.PairedInput(sys.stdin).reads()
+ else:
+ inputs = loadcode(line[:-1] for line in sys.stdin)
if mapconf:
mapconf()
if os.environ.has_key('dumbo_add_path'):
@@ -424,22 +426,37 @@ def run(mapper,
outputs = iterreduce(sorted(outputs, buffersize), combiner)
if mapclose:
mapclose()
+ if os.environ.has_key('stream_map_output_typed_bytes') and \
+ os.environ['stream_map_output_typed_bytes'] == 'true':
+ print >> sys.stderr, "INFO: outputting typed bytes"
+ import typedbytes
+ typedbytes.PairedOutput(sys.stdout).writes(outputs)
+ else:
+ for output in dumpcode(outputs):
+ print '\t'.join(output)
elif reducer:
+ if os.environ.has_key('stream_reduce_input_typed_bytes') and \
+ os.environ['stream_reduce_input_typed_bytes'] == 'true':
+ print >> sys.stderr, "INFO: inputting typed bytes"
+ import typedbytes
+ inputs = typedbytes.PairedInput(sys.stdin).reads()
+ else:
+ inputs = loadcode(line[:-1] for line in sys.stdin)
if redconf:
redconf()
-
outputs = iterreduce(inputs, reducer)
if redclose:
redclose()
+ if os.environ.has_key('stream_reduce_output_typed_bytes') and \
+ os.environ['stream_reduce_output_typed_bytes'] == 'true':
+ print >> sys.stderr, "INFO: outputting typed bytes"
+ import typedbytes
+ typedbytes.PairedOutput(sys.stdout).writes(outputs)
+ else:
+ for output in dumpcode(outputs):
+ print '\t'.join(output)
else:
- outputs = inputs
- if os.environ.has_key('stream_output_typed_bytes') and \
- os.environ['stream_output_typed_bytes'] == 'true':
- print >> sys.stderr, "INFO: outputting typed bytes"
- import typedbytes
- typedbytes.PairedOutput(sys.stdout).writes(outputs)
- else:
- for output in dumpcode(outputs):
+ for output in dumpcode(inputs):
print '\t'.join(output)
else:
opts = parseargs(sys.argv[1:])

0 comments on commit 37efb09

Please sign in to comment.
Something went wrong with that request. Please try again.