diff --git a/README b/README index 2b99852..24fa212 100644 --- a/README +++ b/README @@ -31,7 +31,7 @@ USAGE python examples/wordcount.py -hadoop /path/to/hadoop \ -file excludes.txt -input brian.txt -output brian-wc -/usr/local/hadoop/bin/hadoop dfs -getmerge brian-wc brian-wc.txt +python -m dumbo cat brian-wc > brian-wc.txt MORE INFO diff --git a/examples/itertwice.py b/examples/itertwice.py index 38e3615..5c5993c 100644 --- a/examples/itertwice.py +++ b/examples/itertwice.py @@ -2,11 +2,11 @@ Example of two iterations in one Dumbo program: >>> import dumbo ->>> opts = [('input','brian.txt'),('output','counts.txt')] +>>> opts = [('input','brian.txt'),('output','counts.txt'),('inputformat','text')] >>> logfile = open('log.txt','a') >>> dumbo.submit('itertwice.py',opts,stdout=logfile,stderr=logfile) 0 ->>> output = dict(line[:-1].split('\\t') for line in open('counts.txt')) +>>> output = dict(dumbo.loadcode(open('counts.txt'))) >>> int(output['e']) 14 """ diff --git a/examples/wordcount.py b/examples/wordcount.py index 4c20184..7fa9365 100644 --- a/examples/wordcount.py +++ b/examples/wordcount.py @@ -2,11 +2,11 @@ Counts how many times each non-excluded word occurs: >>> import dumbo ->>> opts = [('input','brian.txt'),('output','counts.txt')] +>>> opts = [('input','brian.txt'),('output','counts.txt'),('inputformat','text')] >>> logfile = open('log.txt','a') >>> dumbo.submit('wordcount.py',opts,stdout=logfile,stderr=logfile) 0 ->>> output = dict(line[:-1].split('\\t') for line in open('counts.txt')) +>>> output = dict(dumbo.loadcode(open('counts.txt'))) >>> int(output['Brian']) 6 """ diff --git a/src/python/dumbo.py b/src/python/dumbo.py index 6edac11..5422be1 100644 --- a/src/python/dumbo.py +++ b/src/python/dumbo.py @@ -69,23 +69,15 @@ def run(mapper,reducer=None,combiner=None, if mapconf: mapconf() outputs = itermap(inputs,mapper) if combiner: outputs = iterreduce(sorted(outputs),combiner) - if reducer: outputs = dumpcode(outputs) - else: outputs = dumptext(outputs) if mapclose: mapclose() elif reducer: if redconf: redconf() - outputs = dumpcode(iterreduce(inputs,reducer)) + outputs = iterreduce(inputs,reducer) if redclose: redclose() - else: outputs = dumptext(inputs) - for output in outputs: print "\t".join(output) + else: outputs = inputs + for output in dumpcode(outputs): print "\t".join(output) else: opts = parseargs(sys.argv[1:]) + [("iteration","%i" % iter)] - if hasattr(mapper,'coded'): - print >>sys.stderr,"WARNING: the '.coded' attribute is deprecated" - if mapper.coded: newopts["inputformat"] = "coded" - if hasattr(reducer,'coded'): - print >>sys.stderr,"WARNING: the '.coded' attribute is deprecated" - if reducer.coded: newopts["outputformat"] = "coded" if not reducer: newopts["numreducetasks"] = "0" key,delindexes = None,[] for index,(key,value) in enumerate(opts): @@ -108,10 +100,10 @@ def run(self): if index != 0: newopts["input"] = "%s-%i" % (scratch,index-1) newopts["delinputs"] = "yes" - newopts["inputformat"] = "coded" + newopts["inputformat"] = "sequencefile" if index != len(self.iters)-1: newopts["output"] = "%s-%i" % (scratch,index) - newopts["outputformat"] = "coded" + newopts["outputformat"] = "sequencefile" kwargs["iter"],kwargs["newopts"] = index,newopts run(*args,**kwargs) @@ -241,34 +233,40 @@ def startonunix(prog,opts,python): cat = "pv -cN source" mpv,spv,rpv = "| pv -cN map ","| pv -cN sort ","| pv -cN reduce " else: cat,mpv,spv,rpv = "cat","","","" - encodepipe = "| %s -m dumbo encodepipe " % python - if addedopts["addfilename"] and addedopts["addfilename"][0] == 'yes': - print >>sys.stderr,"WARNING: the added filenames might be incorrect" - encodepipe += "-addfilename %s " % addedopts["input"][0] - decodepipe = "| %s -m dumbo decodepipe " % python - if addedopts["inputformat"] and addedopts["inputformat"][0] == "coded": - encodepipe = "" - if addedopts["outputformat"] and addedopts["outputformat"][0] == "coded": - decodepipe = "" + encodepipe = "" + if addedopts["inputformat"] and addedopts["inputformat"][0] == "text": + encodepipe = "| %s -m dumbo encodepipe " % python + if addedopts["addfilename"] and addedopts["addfilename"][0] == 'yes': + print >>sys.stderr,"WARNING: the added filenames might be incorrect" + encodepipe += "-addfilename %s " % addedopts["input"][0] if addedopts["numreducetasks"] and addedopts["numreducetasks"][0] == "0": retval = execute("%s %s %s| %s %s %s %s > '%s'" % \ (cat,inputs,encodepipe,pyenv,cmdenv,mapper,mpv,output)) else: retval = execute("%s %s %s| %s %s %s %s| LC_ALL=C " \ - "sort %s| %s %s %s %s%s> '%s'" % \ + "sort %s| %s %s %s %s> '%s'" % \ (cat,inputs,encodepipe,pyenv,cmdenv,mapper,mpv, - spv,pyenv,cmdenv,reducer,decodepipe,rpv,output)) + spv,pyenv,cmdenv,reducer,rpv,output)) if addedopts["delinputs"] and addedopts["delinputs"][0] == "yes": for file in addedopts["input"]: execute("rm " + file) return retval def startonstreaming(prog,opts,hadoop): opts += configopts("streaming",prog,opts) + opts.append(("file",prog)) + opts.append(("file",sys.argv[0])) addedopts = getopts(opts,["name","delinputs","libegg","libjar", "inputformat","outputformat","nummaptasks","numreducetasks", "priority","cachefile","cachearchive","codewritable","addfilename"]) - opts.append(("file",prog)) - opts.append(("file",sys.argv[0])) + streamingjar,dumbojar = findjar(hadoop,"streaming"),findjar(hadoop,"dumbo") + if not streamingjar: + print >>sys.stderr,"ERROR: Streaming jar not found" + return 1 + if not dumbojar: + print >>sys.stderr,"ERROR: Dumbo jar not found" + return 1 + addedopts["libjar"].append(dumbojar) + dumbopkg = "org.apache.hadoop.dumbo" if not addedopts["name"]: opts.append(("jobconf","mapred.job.name=" + prog.split("/")[-1])) else: opts.append(("jobconf","mapred.job.name=%s" % addedopts["name"][0])) @@ -277,48 +275,37 @@ def startonstreaming(prog,opts,hadoop): if addedopts["numreducetasks"]: numreducetasks = int(addedopts["numreducetasks"][0]) opts.append(("numReduceTasks",str(numreducetasks))) - if numreducetasks == 0: addedopts["codewritable"] = ['no'] + if numreducetasks == 0: + opts.append(("jobconf", + "mapred.mapoutput.key.class=%s.CodeWritable" % dumbopkg)) + opts.append(("jobconf", + "mapred.mapoutput.value.class=%s.CodeWritable" % dumbopkg)) + addedopts["codewritable"] = ['no'] if addedopts["priority"]: opts.append(("jobconf", "mapred.job.priority=%s" % addedopts["priority"][0])) if addedopts["cachefile"]: opts.append(("cacheFile", addedopts["cachefile"][0])) if addedopts["cachearchive"]: opts.append(("cacheArchive", addedopts["cachearchive"][0])) - streamingjar,dumbojar = findjar(hadoop,"streaming"),findjar(hadoop,"dumbo") - if not streamingjar: - print >>sys.stderr,"ERROR: Streaming jar not found" - return 1 - if not dumbojar: - print >>sys.stderr,"ERROR: Dumbo jar not found" - return 1 - addedopts["libjar"].append(dumbojar) - if not addedopts["inputformat"]: addedopts["inputformat"].append("text") - if not addedopts["outputformat"]: addedopts["outputformat"].append("text") - dumbopkg = "org.apache.hadoop.dumbo" + if not addedopts["inputformat"]: addedopts["inputformat"] = ["sequencefile"] inputformat_shortcuts = { "text": "org.apache.hadoop.mapred.TextInputFormat", - "sequencefile": "org.apache.hadoop.mapred.SequenceFileInputFormat", - "coded": "org.apache.hadoop.mapred.SequenceFileInputFormat"} + "sequencefile": "org.apache.hadoop.mapred.SequenceFileInputFormat"} inputformat_shortcuts.update(configopts("inputformats",prog)) - if addedopts["inputformat"] and addedopts["inputformat"][0]: - inputformat = addedopts["inputformat"][0] - if inputformat_shortcuts.has_key(inputformat.lower()): - inputformat = inputformat_shortcuts[inputformat.lower()] - opts.append(("jobconf", - "dumbo.as.code.input.format.class=" + inputformat)) - opts.append(("inputformat",dumbopkg + ".AsCodeInputFormat")) + inputformat = addedopts["inputformat"][0] + if inputformat_shortcuts.has_key(inputformat.lower()): + inputformat = inputformat_shortcuts[inputformat.lower()] + opts.append(("jobconf","dumbo.as.code.input.format.class=" + inputformat)) + opts.append(("inputformat",dumbopkg + ".AsCodeInputFormat")) + if not addedopts["outputformat"]: addedopts["outputformat"] = ["sequencefile"] outputformat_shortcuts = { - "text": "org.apache.hadoop.mapred.TextOutputFormat", - "sequencefile": "org.apache.hadoop.mapred.SequenceFileOutputFormat", - "coded": "org.apache.hadoop.mapred.SequenceFileOutputFormat"} + "sequencefile": "org.apache.hadoop.mapred.SequenceFileOutputFormat"} outputformat_shortcuts.update(configopts("outputformats",prog)) - if addedopts["outputformat"] and addedopts["outputformat"][0]: - outputformat = addedopts["outputformat"][0] - if outputformat_shortcuts.has_key(outputformat.lower()): - outputformat = outputformat_shortcuts[outputformat.lower()] - opts.append(("jobconf", - "dumbo.from.code.output.format.class=" + outputformat)) - opts.append(("outputformat",dumbopkg + ".FromCodeOutputFormat")) + outputformat = addedopts["outputformat"][0] + if outputformat_shortcuts.has_key(outputformat.lower()): + outputformat = outputformat_shortcuts[outputformat.lower()] + opts.append(("jobconf","dumbo.from.code.output.format.class=" + outputformat)) + opts.append(("outputformat",dumbopkg + ".FromCodeOutputFormat")) if not (addedopts["codewritable"] and addedopts["codewritable"][0] == 'no'): opts.append(("jobconf", "mapred.mapoutput.key.class=%s.CodeWritable" % dumbopkg)) @@ -346,15 +333,13 @@ def startonstreaming(prog,opts,hadoop): def cat(path,opts): addedopts = getopts(opts,["hadoop","type","libjar"]) - if not addedopts["hadoop"]: - print >>sys.stderr,"ERROR: Hadoop dir not specified" - return 1 + if not addedopts["hadoop"]: return decodepipe(opts + [("path",path)]) hadoop = addedopts["hadoop"][0] dumbojar = findjar(hadoop,"dumbo") if not dumbojar: print >>sys.stderr,"ERROR: Dumbo jar not found" return 1 - if not addedopts["type"]: type = "text" + if not addedopts["type"]: type = "sequencefile" else: type = addedopts["type"][0] hadenv = envdef("HADOOP_CLASSPATH",addedopts["libjar"]) try: @@ -369,17 +354,24 @@ def cat(path,opts): except IOError: pass # ignore return 0 -def encodepipe(opts): - addedopts,filename = getopts(opts,["addfilename"]),None +def encodepipe(opts=[]): + addedopts,filename = getopts(opts,["addfilename","path"]),None if addedopts["addfilename"]: filename = addedopts["addfilename"][0] - outputs = loadtext(line[:-1] for line in sys.stdin) + if addedopts["path"]: file = open(addedopts["path"][0]) + else: file = sys.stdin + outputs = loadtext(line[:-1] for line in file) if filename: outputs = (((filename,key),value) for key,value in outputs) for output in dumpcode(outputs): print "\t".join(output) + file.close() return 0 -def decodepipe(opts): - outputs = loadcode(line[:-1] for line in sys.stdin) +def decodepipe(opts=[]): + addedopts = getopts(opts,["path"]) + if addedopts["path"]: file = open(addedopts["path"][0]) + else: file = sys.stdin + outputs = loadcode(line[:-1] for line in file) for output in dumptext(outputs): print "\t".join(output) + file.close() return 0 if __name__ == "__main__": @@ -391,11 +383,16 @@ def decodepipe(opts): print " python -m dumbo encodepipe []" print " python -m dumbo decodepipe []" sys.exit(1) - if sys.argv[1] == "submit": retval = submit(sys.argv[2],parseargs(sys.argv[2:])) - elif sys.argv[1] == "start": retval = start(sys.argv[2],parseargs(sys.argv[2:])) - elif sys.argv[1] == "cat": retval = cat(sys.argv[2],parseargs(sys.argv[2:])) - elif sys.argv[1] == "encodepipe": retval = encodepipe(parseargs(sys.argv[2:])) - elif sys.argv[1] == "decodepipe": retval = decodepipe(parseargs(sys.argv[2:])) + if sys.argv[1] == "submit": + retval = submit(sys.argv[2],parseargs(sys.argv[2:])) + elif sys.argv[1] == "start": + retval = start(sys.argv[2],parseargs(sys.argv[2:])) + elif sys.argv[1] == "cat": + retval = cat(sys.argv[2],parseargs(sys.argv[2:])) + elif sys.argv[1] == "encodepipe": + retval = encodepipe(parseargs(sys.argv[2:])) + elif sys.argv[1] == "decodepipe": + retval = decodepipe(parseargs(sys.argv[2:])) else: print >>sys.stderr,"WARNING: the command 'python -m dumbo ' is " \ "deprecated, use 'python ' instead"