Skip to content

Commit

Permalink
beginnings of 0.20, needs more work
Browse files Browse the repository at this point in the history
  • Loading branch information
Klaas Bosteels authored and Klaas Bosteels committed Nov 28, 2008
1 parent 8585087 commit 0cb3ccc
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 68 deletions.
2 changes: 1 addition & 1 deletion examples/itertwice.py
Expand Up @@ -15,7 +15,7 @@ def mapper1(key,value):
for word in value.split(): yield word,1

def mapper2(key,value):
for letter in value.split()[0]: yield letter,1
for letter in key: yield letter,1

def reducer1(key,values):
count = sum(values)
Expand Down
156 changes: 90 additions & 66 deletions src/python/dumbo.py
Expand Up @@ -2,6 +2,15 @@
from itertools import groupby
from operator import itemgetter

def identitymapper(key,value):
yield key,value

def identityreducer(key,values):
for value in values: yield key,value

def sumreducer(key,values):
yield key,sum(values)

def itermap(data,mapfunc):
for key,value in data:
for output in mapfunc(key,value): yield output
Expand Down Expand Up @@ -29,11 +38,14 @@ def dumptext(outputs):
del newoutput[:]

def loadtext(inputs):
for input in inputs: yield (None,input)
offset = 0
for input in inputs:
yield (offset,input)
offset += len(input)

def run(mapper,reducer=None,combiner=None,
mapconf=None,redconf=None,mapclose=None,redclose=None,
code_in=False,code_out=False,iter=0,newopts={}):
iter=0,newopts={}):
if len(sys.argv) > 1 and not sys.argv[1][0] == "-":
try:
regex = re.compile(".*\.egg")
Expand All @@ -52,28 +64,29 @@ def run(mapper,reducer=None,combiner=None,
iterarg = 0 # default value
if len(sys.argv) > 2: iterarg = int(sys.argv[2])
if iterarg == iter:
inputs = loadcode(line[:-1] for line in sys.stdin)
if sys.argv[1].startswith("map"):
if mapconf: mapconf()
if (hasattr(mapper,"coded") and mapper.coded) or code_in:
inputs = loadcode(line[:-1] for line in sys.stdin)
else: inputs = loadtext(line[:-1] for line in sys.stdin)
outputs = itermap(inputs,mapper)
if combiner: outputs = iterreduce(sorted(outputs),combiner)
if reducer or code_out: outputs = dumpcode(outputs)
if reducer: outputs = dumpcode(outputs)
else: outputs = dumptext(outputs)
if mapclose: mapclose()
elif reducer:
if redconf: redconf()
inputs = loadcode(line[:-1] for line in sys.stdin)
outputs = iterreduce(inputs,reducer)
if (hasattr(reducer,"coded") and reducer.coded) or code_out:
outputs = dumpcode(outputs)
else: outputs = dumptext(outputs)
outputs = dumpcode(iterreduce(inputs,reducer))
if redclose: redclose()
else: outputs = dumptext((line[:-1],) for line in sys.stdin)
else: outputs = dumptext(inputs)
for output in outputs: print "\t".join(output)
else:
opts = parseargs(sys.argv[1:]) + [("iteration","%i" % iter)]
if hasattr(mapper,'coded'):
print >>sys.stderr,"WARNING: the '.coded' attribute is deprecated"
if mapper.coded: newopts["inputformat"] = "coded"
if hasattr(reducer,'coded'):
print >>sys.stderr,"WARNING: the '.coded' attribute is deprecated"
if reducer.coded: newopts["outputformat"] = "coded"
if not reducer: newopts["numreducetasks"] = "0"
key,delindexes = None,[]
for index,(key,value) in enumerate(opts):
if newopts.has_key(key): delindexes.append(index)
Expand All @@ -90,22 +103,15 @@ def additer(self,*args,**kwargs): self.iters.append((args,kwargs))
def run(self):
scratch = "dumbo-tmp-%i" % random.randint(0,sys.maxint)
for index,(args,kwargs) in enumerate(self.iters):
newopts = {}
newopts = {"name": "%s (%s/%s)" % (sys.argv[0].split("/")[-1],
index+1,len(self.iters))}
if index != 0:
newopts["input"] = "%s-%i" % (scratch,index-1)
newopts["delinputs"] = "yes"
mapper,code_in = args[0],False
if kwargs.has_key("code_in"): code_in = kwargs["code_in"]
if (hasattr(mapper,"coded") and mapper.coded) or code_in:
newopts["inputformat"] = "binaryascode"
else: newopts["inputformat"] = "binary"
newopts["inputformat"] = "coded"
if index != len(self.iters)-1:
newopts["output"] = "%s-%i" % (scratch,index)
reducer,code_out = args[1],False
if kwargs.has_key("code_out"): code_out = kwargs["code_out"]
if (hasattr(reducer,"coded") and reducer.coded) or code_out:
newopts["outputformat"] = "binaryfromcode"
else: newopts["outputformat"] = "binary"
newopts["outputformat"] = "coded"
kwargs["iter"],kwargs["newopts"] = index,newopts
run(*args,**kwargs)

Expand Down Expand Up @@ -213,13 +219,14 @@ def dummysystem(*args,**kwargs): return 0
else: progincmd = prog.split("/")[-1]
opts.append(("mapper","%s %s map %i" % (python,progincmd,iter)))
opts.append(("reducer","%s %s red %i" % (python,progincmd,iter)))
if not addedopts["hadoop"]: return startonunix(prog,opts)
if not addedopts["hadoop"]: return startonunix(prog,opts,python)
else: return startonstreaming(prog,opts,addedopts["hadoop"][0])

def startonunix(prog,opts):
def startonunix(prog,opts,python):
opts += configopts("unix",prog,opts)
addedopts = getopts(opts,["input","output","mapper","reducer","libegg",
"delinputs","cmdenv","pv"])
"delinputs","cmdenv","pv","addfilename",
"inputformat","outputformat","numreducetasks"])
mapper,reducer = addedopts["mapper"][0],addedopts["reducer"][0]
if (not addedopts["input"]) or (not addedopts["output"]):
print >>sys.stderr,"ERROR: input or output not specified"
Expand All @@ -234,8 +241,23 @@ def startonunix(prog,opts):
cat = "pv -cN source"
mpv,spv,rpv = "| pv -cN map ","| pv -cN sort ","| pv -cN reduce "
else: cat,mpv,spv,rpv = "cat","","",""
retval = execute("%s %s | %s %s %s %s| LC_ALL=C sort %s| %s %s %s %s> '%s'" % \
(cat,inputs,pyenv,cmdenv,mapper,mpv,spv,pyenv,cmdenv,reducer,rpv,output))
encodepipe = "| %s -m dumbo encodepipe " % python
if addedopts["addfilename"] and addedopts["addfilename"][0] == 'yes':
print >>sys.stderr,"WARNING: the added filenames might be incorrect"
encodepipe += "-addfilename %s " % addedopts["input"][0]
decodepipe = "| %s -m dumbo decodepipe " % python
if addedopts["inputformat"] and addedopts["inputformat"][0] == "coded":
encodepipe = ""
if addedopts["outputformat"] and addedopts["outputformat"][0] == "coded":
decodepipe = ""
if addedopts["numreducetasks"] and addedopts["numreducetasks"][0] == "0":
retval = execute("%s %s %s| %s %s %s %s > '%s'" % \
(cat,inputs,encodepipe,pyenv,cmdenv,mapper,mpv,output))
else:
retval = execute("%s %s %s| %s %s %s %s| LC_ALL=C " \
"sort %s| %s %s %s %s%s> '%s'" % \
(cat,inputs,encodepipe,pyenv,cmdenv,mapper,mpv,
spv,pyenv,cmdenv,reducer,decodepipe,rpv,output))
if addedopts["delinputs"] and addedopts["delinputs"][0] == "yes":
for file in addedopts["input"]: execute("rm " + file)
return retval
Expand All @@ -244,17 +266,18 @@ def startonstreaming(prog,opts,hadoop):
opts += configopts("streaming",prog,opts)
addedopts = getopts(opts,["name","delinputs","libegg","libjar",
"inputformat","outputformat","nummaptasks","numreducetasks",
"priority","cachefile","cachearchive","codewritable",
"inputascode","outputfromcode","namedcode"])
"priority","cachefile","cachearchive","codewritable","addfilename"])
opts.append(("file",prog))
opts.append(("file",sys.argv[0]))
if not addedopts["name"]:
opts.append(("jobconf","mapred.job.name=" + prog.split("/")[-1]))
else: opts.append(("jobconf","mapred.job.name=%s" % addedopts["name"][0]))
if addedopts["nummaptasks"]: opts.append(("jobconf",
"mapred.map.tasks=%s" % addedopts["nummaptasks"][0]))
if addedopts["numreducetasks"]: opts.append(("numReduceTasks",
addedopts["numreducetasks"][0]))
if addedopts["numreducetasks"]:
numreducetasks = int(addedopts["numreducetasks"][0])
opts.append(("numReduceTasks",str(numreducetasks)))
if numreducetasks == 0: addedopts["codewritable"] = ['no']
if addedopts["priority"]: opts.append(("jobconf",
"mapred.job.priority=%s" % addedopts["priority"][0]))
if addedopts["cachefile"]: opts.append(("cacheFile",
Expand All @@ -265,48 +288,38 @@ def startonstreaming(prog,opts,hadoop):
if not streamingjar:
print >>sys.stderr,"ERROR: Streaming jar not found"
return 1
dumbopkg,dumbojar_needed = "org.apache.hadoop.dumbo",False
if not dumbojar:
print >>sys.stderr,"ERROR: Dumbo jar not found"
return 1
addedopts["libjar"].append(dumbojar)
if not addedopts["inputformat"]: addedopts["inputformat"].append("text")
if not addedopts["outputformat"]: addedopts["outputformat"].append("text")
dumbopkg = "org.apache.hadoop.dumbo"
inputformat_shortcuts = {
"text": "org.apache.hadoop.mapred.TextInputFormat",
"text": "org.apache.hadoop.mapred.TextInputFormat",
"sequencefile": "org.apache.hadoop.mapred.SequenceFileInputFormat",
"binary": "org.apache.hadoop.mapred.SequenceFileInputFormat",
"textascode": dumbopkg + ".TextAsCodeInputFormat",
"sequencefileascode": dumbopkg + ".SequenceFileAsCodeInputFormat",
"binaryascode": dumbopkg + ".SequenceFileAsCodeInputFormat"}
"coded": "org.apache.hadoop.mapred.SequenceFileInputFormat"}
inputformat_shortcuts.update(configopts("inputformats",prog))
if addedopts["inputformat"] and addedopts["inputformat"][0]:
inputformat = addedopts["inputformat"][0]
if inputformat_shortcuts.has_key(inputformat.lower()):
inputformat = inputformat_shortcuts[inputformat.lower()]
dumbojar_needed = True
opts.append(("inputformat",inputformat))
opts.append(("jobconf",
"dumbo.as.code.input.format.class=" + inputformat))
opts.append(("inputformat",dumbopkg + ".AsCodeInputFormat"))
outputformat_shortcuts = {
"text": "org.apache.hadoop.mapred.TextOutputFormat",
"sequencefile": "org.apache.hadoop.mapred.SequenceFileOutputFormat",
"binary": "org.apache.hadoop.mapred.SequenceFileOutputFormat",
"textfromcode": dumbopkg + ".TextFromCodeOutputFormat",
"sequencefilefromcode": dumbopkg + ".SequenceFileFromCodeOutputFormat",
"binaryfromcode": dumbopkg + ".SequenceFileFromCodeOutputFormat"}
"coded": "org.apache.hadoop.mapred.SequenceFileOutputFormat"}
outputformat_shortcuts.update(configopts("outputformats",prog))
if addedopts["outputformat"] and addedopts["outputformat"][0]:
outputformat = addedopts["outputformat"][0]
if outputformat_shortcuts.has_key(outputformat.lower()):
outputformat = outputformat_shortcuts[outputformat.lower()]
dumbojar_needed = True
opts.append(("outputformat",outputformat))
if addedopts["inputascode"] and addedopts["inputascode"][0] == 'yes':
opt = getopts(opts,["inputformat"])["inputformat"]
if opt: opts.append(("jobconf",
"dumbo.as.code.input.format.class=" + opt[0]))
opts.append(("inputformat",dumbopkg + ".AsCodeInputFormat"))
dumbojar_needed = True
if addedopts["outputfromcode"] and addedopts["outputfromcode"][0] == 'yes':
opt = getopts(opts,["outputformat"])["outputformat"]
if opt: opts.append(("jobconf",
"dumbo.as.code.output.format.class=" + opt[0]))
opts.append(("jobconf",
"dumbo.from.code.output.format.class=" + outputformat))
opts.append(("outputformat",dumbopkg + ".FromCodeOutputFormat"))
dumbojar_needed = True
if addedopts["codewritable"] and addedopts["codewritable"][0] == 'yes':
if not (addedopts["codewritable"] and addedopts["codewritable"][0] == 'no'):
opts.append(("jobconf",
"mapred.mapoutput.key.class=%s.CodeWritable" % dumbopkg))
opts.append(("jobconf",
Expand All @@ -317,14 +330,8 @@ def startonstreaming(prog,opts,hadoop):
opts.append(("mapper",dumbopkg + ".CodeWritableMapper"))
opts.append(("jobconf","dumbo.code.writable.map.class=" \
"org.apache.hadoop.streaming.PipeMapper"))
dumbojar_needed = True
if addedopts["namedcode"] and addedopts["namedcode"][0] == 'yes':
if addedopts["addfilename"] and addedopts["addfilename"][0] == 'yes':
opts.append(("jobconf", "dumbo.as.named.code=true"))
if dumbojar_needed:
if not dumbojar:
print >>sys.stderr,"ERROR: Dumbo jar not found"
return 1
addedopts["libjar"].append(dumbojar)
envdef("PYTHONPATH",addedopts["libegg"],"file",opts,
shortcuts=dict(configopts("eggs",prog)))
hadenv = envdef("HADOOP_CLASSPATH",addedopts["libjar"],"file",opts,
Expand Down Expand Up @@ -362,16 +369,33 @@ def cat(path,opts):
except IOError: pass # ignore
return 0

def encodepipe(opts):
addedopts,filename = getopts(opts,["addfilename"]),None
if addedopts["addfilename"]: filename = addedopts["addfilename"][0]
outputs = loadtext(line[:-1] for line in sys.stdin)
if filename: outputs = (((filename,key),value) for key,value in outputs)
for output in dumpcode(outputs): print "\t".join(output)
return 0

def decodepipe(opts):
outputs = loadcode(line[:-1] for line in sys.stdin)
for output in dumptext(outputs): print "\t".join(output)
return 0

if __name__ == "__main__":
if len(sys.argv) < 3:
if len(sys.argv) < 2:
print "Usages:"
print " python -m dumbo submit <python program> [<options>]"
print " python -m dumbo start <python program> [<options>]"
print " python -m dumbo cat <path> [<options>]"
print " python -m dumbo encodepipe [<options>]"
print " python -m dumbo decodepipe [<options>]"
sys.exit(1)
if sys.argv[1] == "submit": retval = submit(sys.argv[2],parseargs(sys.argv[2:]))
elif sys.argv[1] == "start": retval = start(sys.argv[2],parseargs(sys.argv[2:]))
elif sys.argv[1] == "cat": retval = cat(sys.argv[2],parseargs(sys.argv[2:]))
elif sys.argv[1] == "encodepipe": retval = encodepipe(parseargs(sys.argv[2:]))
elif sys.argv[1] == "decodepipe": retval = decodepipe(parseargs(sys.argv[2:]))
else:
print >>sys.stderr,"WARNING: the command 'python -m dumbo <prog>' is " \
"deprecated, use 'python <prog>' instead"
Expand Down
2 changes: 1 addition & 1 deletion src/python/setup.py
@@ -1,7 +1,7 @@
from distutils.core import setup

setup(name='dumbo',
version='0.19.4',
version='0.20.1',
py_modules=['dumbo'],
author='Klaas Bosteels',
author_email='klaas@last.fm',
Expand Down

0 comments on commit 0cb3ccc

Please sign in to comment.