Skip to content

Commit

Permalink
Merge branch 'master' of git@github.com:klbostee/dumbo
Browse files Browse the repository at this point in the history
  • Loading branch information
klbostee committed May 18, 2009
2 parents 050a6ac + fec8493 commit cde6182
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 6 deletions.
8 changes: 5 additions & 3 deletions dumbo/core.py
Expand Up @@ -277,7 +277,7 @@ def run(self):
' -m dumbo encodepipe -file ' + ' -file '.join(inputs)
if addedopts['inputformat'] and addedopts['inputformat'][0] == 'code':
encodepipe += ' -alreadycoded yes'
if addedopts['addpath'] and addedopts['addpath'][0] == 'yes':
if addedopts['addpath'] and addedopts['addpath'][0] != 'no':
encodepipe += ' -addpath yes'
if addedopts['numreducetasks'] and addedopts['numreducetasks'][0] == '0':
retval = execute("%s | %s %s %s %s > '%s'" % (encodepipe,
Expand Down Expand Up @@ -409,7 +409,7 @@ def run(self):
if outputformat_shortcuts.has_key(outputformat.lower()):
outputformat = outputformat_shortcuts[outputformat.lower()]
self.opts.append(('outputformat', outputformat))
if addedopts['addpath'] and addedopts['addpath'][0] == 'yes':
if addedopts['addpath'] and addedopts['addpath'][0] != 'no':
self.opts.append(('cmdenv', 'dumbo_addpath=true'))
pyenv = envdef('PYTHONPATH',
addedopts['libegg'],
Expand Down Expand Up @@ -603,12 +603,14 @@ def run(mapper,
print >> sys.stderr, 'ERROR: no output path given'
sys.exit(1)
preoutputsopt = getopt(opts, 'preoutputs')
addpathopt = getopt(opts, 'addpath', delete=False)
if iter != 0:
newopts['input'] = outputopt[0] + "_pre" + str(iter)
if not (preoutputsopt and preoutputsopt[0] == 'yes'):
newopts['delinputs'] = 'yes'
newopts['inputformat'] = 'code'
newopts['addpath'] = 'no'
if addpathopt and addpathopt[0] == 'yes': # not when == 'iter'
newopts['addpath'] = 'no'
if iter < itercnt - 1:
newopts['output'] = outputopt[0] + "_pre" + str(iter + 1)
newopts['outputformat'] = 'code'
Expand Down
18 changes: 16 additions & 2 deletions dumbo/lib.py
Expand Up @@ -99,7 +99,7 @@ def __new__(cls):

def __init__(self):
self._mappers = []
self.opts = [("addpath", "yes")]
self.opts = [("addpath", "iter")]

def itermappers(self):
for pattern, mapper in self._mappers:
Expand Down Expand Up @@ -143,9 +143,23 @@ class JoinReducer(object):

def __call__(self, key, values):
if key.isprimary:
self.primary(key.body, values)
output = self.primary(key.body, values)
if output:
for k, v in output:
jk = copy(key)
jk.body = k
yield jk, v
else:
for k, v in self.secondary(key.body, values):
jk = copy(key)
jk.body = k
yield jk, v

def primary(self, key, values):
for value in values:
yield key, value

def secondary(self, key, values):
for value in values:
yield key, value

2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -2,7 +2,7 @@

setup(
name = 'dumbo',
version = '0.21.13',
version = '0.21.14',
author = 'Klaas Bosteels',
author_email = 'klaas@last.fm',
license = 'Apache Software License (ASF)',
Expand Down

0 comments on commit cde6182

Please sign in to comment.