From 0cb3ccc2184223819bdde3e87fa37d775e83c0de Mon Sep 17 00:00:00 2001 From: Klaas Bosteels Date: Fri, 28 Nov 2008 09:05:18 +0100 Subject: [PATCH] beginnings of 0.20, needs more work --- examples/itertwice.py | 2 +- src/python/dumbo.py | 156 ++++++++++++++++++++++++------------------ src/python/setup.py | 2 +- 3 files changed, 92 insertions(+), 68 deletions(-) diff --git a/examples/itertwice.py b/examples/itertwice.py index 5816144..38e3615 100644 --- a/examples/itertwice.py +++ b/examples/itertwice.py @@ -15,7 +15,7 @@ def mapper1(key,value): for word in value.split(): yield word,1 def mapper2(key,value): - for letter in value.split()[0]: yield letter,1 + for letter in key: yield letter,1 def reducer1(key,values): count = sum(values) diff --git a/src/python/dumbo.py b/src/python/dumbo.py index a4a3d7d..6edac11 100644 --- a/src/python/dumbo.py +++ b/src/python/dumbo.py @@ -2,6 +2,15 @@ from itertools import groupby from operator import itemgetter +def identitymapper(key,value): + yield key,value + +def identityreducer(key,values): + for value in values: yield key,value + +def sumreducer(key,values): + yield key,sum(values) + def itermap(data,mapfunc): for key,value in data: for output in mapfunc(key,value): yield output @@ -29,11 +38,14 @@ def dumptext(outputs): del newoutput[:] def loadtext(inputs): - for input in inputs: yield (None,input) + offset = 0 + for input in inputs: + yield (offset,input) + offset += len(input) def run(mapper,reducer=None,combiner=None, mapconf=None,redconf=None,mapclose=None,redclose=None, - code_in=False,code_out=False,iter=0,newopts={}): + iter=0,newopts={}): if len(sys.argv) > 1 and not sys.argv[1][0] == "-": try: regex = re.compile(".*\.egg") @@ -52,28 +64,29 @@ def run(mapper,reducer=None,combiner=None, iterarg = 0 # default value if len(sys.argv) > 2: iterarg = int(sys.argv[2]) if iterarg == iter: + inputs = loadcode(line[:-1] for line in sys.stdin) if sys.argv[1].startswith("map"): if mapconf: mapconf() - if (hasattr(mapper,"coded") and mapper.coded) or code_in: - inputs = loadcode(line[:-1] for line in sys.stdin) - else: inputs = loadtext(line[:-1] for line in sys.stdin) outputs = itermap(inputs,mapper) if combiner: outputs = iterreduce(sorted(outputs),combiner) - if reducer or code_out: outputs = dumpcode(outputs) + if reducer: outputs = dumpcode(outputs) else: outputs = dumptext(outputs) if mapclose: mapclose() elif reducer: if redconf: redconf() - inputs = loadcode(line[:-1] for line in sys.stdin) - outputs = iterreduce(inputs,reducer) - if (hasattr(reducer,"coded") and reducer.coded) or code_out: - outputs = dumpcode(outputs) - else: outputs = dumptext(outputs) + outputs = dumpcode(iterreduce(inputs,reducer)) if redclose: redclose() - else: outputs = dumptext((line[:-1],) for line in sys.stdin) + else: outputs = dumptext(inputs) for output in outputs: print "\t".join(output) else: opts = parseargs(sys.argv[1:]) + [("iteration","%i" % iter)] + if hasattr(mapper,'coded'): + print >>sys.stderr,"WARNING: the '.coded' attribute is deprecated" + if mapper.coded: newopts["inputformat"] = "coded" + if hasattr(reducer,'coded'): + print >>sys.stderr,"WARNING: the '.coded' attribute is deprecated" + if reducer.coded: newopts["outputformat"] = "coded" + if not reducer: newopts["numreducetasks"] = "0" key,delindexes = None,[] for index,(key,value) in enumerate(opts): if newopts.has_key(key): delindexes.append(index) @@ -90,22 +103,15 @@ def additer(self,*args,**kwargs): self.iters.append((args,kwargs)) def run(self): scratch = "dumbo-tmp-%i" % random.randint(0,sys.maxint) for index,(args,kwargs) in enumerate(self.iters): - newopts = {} + newopts = {"name": "%s (%s/%s)" % (sys.argv[0].split("/")[-1], + index+1,len(self.iters))} if index != 0: newopts["input"] = "%s-%i" % (scratch,index-1) newopts["delinputs"] = "yes" - mapper,code_in = args[0],False - if kwargs.has_key("code_in"): code_in = kwargs["code_in"] - if (hasattr(mapper,"coded") and mapper.coded) or code_in: - newopts["inputformat"] = "binaryascode" - else: newopts["inputformat"] = "binary" + newopts["inputformat"] = "coded" if index != len(self.iters)-1: newopts["output"] = "%s-%i" % (scratch,index) - reducer,code_out = args[1],False - if kwargs.has_key("code_out"): code_out = kwargs["code_out"] - if (hasattr(reducer,"coded") and reducer.coded) or code_out: - newopts["outputformat"] = "binaryfromcode" - else: newopts["outputformat"] = "binary" + newopts["outputformat"] = "coded" kwargs["iter"],kwargs["newopts"] = index,newopts run(*args,**kwargs) @@ -213,13 +219,14 @@ def dummysystem(*args,**kwargs): return 0 else: progincmd = prog.split("/")[-1] opts.append(("mapper","%s %s map %i" % (python,progincmd,iter))) opts.append(("reducer","%s %s red %i" % (python,progincmd,iter))) - if not addedopts["hadoop"]: return startonunix(prog,opts) + if not addedopts["hadoop"]: return startonunix(prog,opts,python) else: return startonstreaming(prog,opts,addedopts["hadoop"][0]) -def startonunix(prog,opts): +def startonunix(prog,opts,python): opts += configopts("unix",prog,opts) addedopts = getopts(opts,["input","output","mapper","reducer","libegg", - "delinputs","cmdenv","pv"]) + "delinputs","cmdenv","pv","addfilename", + "inputformat","outputformat","numreducetasks"]) mapper,reducer = addedopts["mapper"][0],addedopts["reducer"][0] if (not addedopts["input"]) or (not addedopts["output"]): print >>sys.stderr,"ERROR: input or output not specified" @@ -234,8 +241,23 @@ def startonunix(prog,opts): cat = "pv -cN source" mpv,spv,rpv = "| pv -cN map ","| pv -cN sort ","| pv -cN reduce " else: cat,mpv,spv,rpv = "cat","","","" - retval = execute("%s %s | %s %s %s %s| LC_ALL=C sort %s| %s %s %s %s> '%s'" % \ - (cat,inputs,pyenv,cmdenv,mapper,mpv,spv,pyenv,cmdenv,reducer,rpv,output)) + encodepipe = "| %s -m dumbo encodepipe " % python + if addedopts["addfilename"] and addedopts["addfilename"][0] == 'yes': + print >>sys.stderr,"WARNING: the added filenames might be incorrect" + encodepipe += "-addfilename %s " % addedopts["input"][0] + decodepipe = "| %s -m dumbo decodepipe " % python + if addedopts["inputformat"] and addedopts["inputformat"][0] == "coded": + encodepipe = "" + if addedopts["outputformat"] and addedopts["outputformat"][0] == "coded": + decodepipe = "" + if addedopts["numreducetasks"] and addedopts["numreducetasks"][0] == "0": + retval = execute("%s %s %s| %s %s %s %s > '%s'" % \ + (cat,inputs,encodepipe,pyenv,cmdenv,mapper,mpv,output)) + else: + retval = execute("%s %s %s| %s %s %s %s| LC_ALL=C " \ + "sort %s| %s %s %s %s%s> '%s'" % \ + (cat,inputs,encodepipe,pyenv,cmdenv,mapper,mpv, + spv,pyenv,cmdenv,reducer,decodepipe,rpv,output)) if addedopts["delinputs"] and addedopts["delinputs"][0] == "yes": for file in addedopts["input"]: execute("rm " + file) return retval @@ -244,8 +266,7 @@ def startonstreaming(prog,opts,hadoop): opts += configopts("streaming",prog,opts) addedopts = getopts(opts,["name","delinputs","libegg","libjar", "inputformat","outputformat","nummaptasks","numreducetasks", - "priority","cachefile","cachearchive","codewritable", - "inputascode","outputfromcode","namedcode"]) + "priority","cachefile","cachearchive","codewritable","addfilename"]) opts.append(("file",prog)) opts.append(("file",sys.argv[0])) if not addedopts["name"]: @@ -253,8 +274,10 @@ def startonstreaming(prog,opts,hadoop): else: opts.append(("jobconf","mapred.job.name=%s" % addedopts["name"][0])) if addedopts["nummaptasks"]: opts.append(("jobconf", "mapred.map.tasks=%s" % addedopts["nummaptasks"][0])) - if addedopts["numreducetasks"]: opts.append(("numReduceTasks", - addedopts["numreducetasks"][0])) + if addedopts["numreducetasks"]: + numreducetasks = int(addedopts["numreducetasks"][0]) + opts.append(("numReduceTasks",str(numreducetasks))) + if numreducetasks == 0: addedopts["codewritable"] = ['no'] if addedopts["priority"]: opts.append(("jobconf", "mapred.job.priority=%s" % addedopts["priority"][0])) if addedopts["cachefile"]: opts.append(("cacheFile", @@ -265,48 +288,38 @@ def startonstreaming(prog,opts,hadoop): if not streamingjar: print >>sys.stderr,"ERROR: Streaming jar not found" return 1 - dumbopkg,dumbojar_needed = "org.apache.hadoop.dumbo",False + if not dumbojar: + print >>sys.stderr,"ERROR: Dumbo jar not found" + return 1 + addedopts["libjar"].append(dumbojar) + if not addedopts["inputformat"]: addedopts["inputformat"].append("text") + if not addedopts["outputformat"]: addedopts["outputformat"].append("text") + dumbopkg = "org.apache.hadoop.dumbo" inputformat_shortcuts = { - "text": "org.apache.hadoop.mapred.TextInputFormat", + "text": "org.apache.hadoop.mapred.TextInputFormat", "sequencefile": "org.apache.hadoop.mapred.SequenceFileInputFormat", - "binary": "org.apache.hadoop.mapred.SequenceFileInputFormat", - "textascode": dumbopkg + ".TextAsCodeInputFormat", - "sequencefileascode": dumbopkg + ".SequenceFileAsCodeInputFormat", - "binaryascode": dumbopkg + ".SequenceFileAsCodeInputFormat"} + "coded": "org.apache.hadoop.mapred.SequenceFileInputFormat"} inputformat_shortcuts.update(configopts("inputformats",prog)) if addedopts["inputformat"] and addedopts["inputformat"][0]: inputformat = addedopts["inputformat"][0] if inputformat_shortcuts.has_key(inputformat.lower()): inputformat = inputformat_shortcuts[inputformat.lower()] - dumbojar_needed = True - opts.append(("inputformat",inputformat)) + opts.append(("jobconf", + "dumbo.as.code.input.format.class=" + inputformat)) + opts.append(("inputformat",dumbopkg + ".AsCodeInputFormat")) outputformat_shortcuts = { "text": "org.apache.hadoop.mapred.TextOutputFormat", "sequencefile": "org.apache.hadoop.mapred.SequenceFileOutputFormat", - "binary": "org.apache.hadoop.mapred.SequenceFileOutputFormat", - "textfromcode": dumbopkg + ".TextFromCodeOutputFormat", - "sequencefilefromcode": dumbopkg + ".SequenceFileFromCodeOutputFormat", - "binaryfromcode": dumbopkg + ".SequenceFileFromCodeOutputFormat"} + "coded": "org.apache.hadoop.mapred.SequenceFileOutputFormat"} outputformat_shortcuts.update(configopts("outputformats",prog)) if addedopts["outputformat"] and addedopts["outputformat"][0]: outputformat = addedopts["outputformat"][0] if outputformat_shortcuts.has_key(outputformat.lower()): outputformat = outputformat_shortcuts[outputformat.lower()] - dumbojar_needed = True - opts.append(("outputformat",outputformat)) - if addedopts["inputascode"] and addedopts["inputascode"][0] == 'yes': - opt = getopts(opts,["inputformat"])["inputformat"] - if opt: opts.append(("jobconf", - "dumbo.as.code.input.format.class=" + opt[0])) - opts.append(("inputformat",dumbopkg + ".AsCodeInputFormat")) - dumbojar_needed = True - if addedopts["outputfromcode"] and addedopts["outputfromcode"][0] == 'yes': - opt = getopts(opts,["outputformat"])["outputformat"] - if opt: opts.append(("jobconf", - "dumbo.as.code.output.format.class=" + opt[0])) + opts.append(("jobconf", + "dumbo.from.code.output.format.class=" + outputformat)) opts.append(("outputformat",dumbopkg + ".FromCodeOutputFormat")) - dumbojar_needed = True - if addedopts["codewritable"] and addedopts["codewritable"][0] == 'yes': + if not (addedopts["codewritable"] and addedopts["codewritable"][0] == 'no'): opts.append(("jobconf", "mapred.mapoutput.key.class=%s.CodeWritable" % dumbopkg)) opts.append(("jobconf", @@ -317,14 +330,8 @@ def startonstreaming(prog,opts,hadoop): opts.append(("mapper",dumbopkg + ".CodeWritableMapper")) opts.append(("jobconf","dumbo.code.writable.map.class=" \ "org.apache.hadoop.streaming.PipeMapper")) - dumbojar_needed = True - if addedopts["namedcode"] and addedopts["namedcode"][0] == 'yes': + if addedopts["addfilename"] and addedopts["addfilename"][0] == 'yes': opts.append(("jobconf", "dumbo.as.named.code=true")) - if dumbojar_needed: - if not dumbojar: - print >>sys.stderr,"ERROR: Dumbo jar not found" - return 1 - addedopts["libjar"].append(dumbojar) envdef("PYTHONPATH",addedopts["libegg"],"file",opts, shortcuts=dict(configopts("eggs",prog))) hadenv = envdef("HADOOP_CLASSPATH",addedopts["libjar"],"file",opts, @@ -362,16 +369,33 @@ def cat(path,opts): except IOError: pass # ignore return 0 +def encodepipe(opts): + addedopts,filename = getopts(opts,["addfilename"]),None + if addedopts["addfilename"]: filename = addedopts["addfilename"][0] + outputs = loadtext(line[:-1] for line in sys.stdin) + if filename: outputs = (((filename,key),value) for key,value in outputs) + for output in dumpcode(outputs): print "\t".join(output) + return 0 + +def decodepipe(opts): + outputs = loadcode(line[:-1] for line in sys.stdin) + for output in dumptext(outputs): print "\t".join(output) + return 0 + if __name__ == "__main__": - if len(sys.argv) < 3: + if len(sys.argv) < 2: print "Usages:" print " python -m dumbo submit []" print " python -m dumbo start []" print " python -m dumbo cat []" + print " python -m dumbo encodepipe []" + print " python -m dumbo decodepipe []" sys.exit(1) if sys.argv[1] == "submit": retval = submit(sys.argv[2],parseargs(sys.argv[2:])) elif sys.argv[1] == "start": retval = start(sys.argv[2],parseargs(sys.argv[2:])) elif sys.argv[1] == "cat": retval = cat(sys.argv[2],parseargs(sys.argv[2:])) + elif sys.argv[1] == "encodepipe": retval = encodepipe(parseargs(sys.argv[2:])) + elif sys.argv[1] == "decodepipe": retval = decodepipe(parseargs(sys.argv[2:])) else: print >>sys.stderr,"WARNING: the command 'python -m dumbo ' is " \ "deprecated, use 'python ' instead" diff --git a/src/python/setup.py b/src/python/setup.py index 94ff713..8a799ee 100644 --- a/src/python/setup.py +++ b/src/python/setup.py @@ -1,7 +1,7 @@ from distutils.core import setup setup(name='dumbo', - version='0.19.4', + version='0.20.1', py_modules=['dumbo'], author='Klaas Bosteels', author_email='klaas@last.fm',