Skip to content

Commit

Permalink
more 0.20 work
Browse files Browse the repository at this point in the history
  • Loading branch information
Klaas Bosteels authored and Klaas Bosteels committed Nov 28, 2008
1 parent 0cb3ccc commit 06a1572
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 76 deletions.
2 changes: 1 addition & 1 deletion README
Expand Up @@ -31,7 +31,7 @@ USAGE
python examples/wordcount.py -hadoop /path/to/hadoop \
-file excludes.txt -input brian.txt -output brian-wc

/usr/local/hadoop/bin/hadoop dfs -getmerge brian-wc brian-wc.txt
python -m dumbo cat brian-wc > brian-wc.txt


MORE INFO
Expand Down
4 changes: 2 additions & 2 deletions examples/itertwice.py
Expand Up @@ -2,11 +2,11 @@
Example of two iterations in one Dumbo program:
>>> import dumbo
>>> opts = [('input','brian.txt'),('output','counts.txt')]
>>> opts = [('input','brian.txt'),('output','counts.txt'),('inputformat','text')]
>>> logfile = open('log.txt','a')
>>> dumbo.submit('itertwice.py',opts,stdout=logfile,stderr=logfile)
0
>>> output = dict(line[:-1].split('\\t') for line in open('counts.txt'))
>>> output = dict(dumbo.loadcode(open('counts.txt')))
>>> int(output['e'])
14
"""
Expand Down
4 changes: 2 additions & 2 deletions examples/wordcount.py
Expand Up @@ -2,11 +2,11 @@
Counts how many times each non-excluded word occurs:
>>> import dumbo
>>> opts = [('input','brian.txt'),('output','counts.txt')]
>>> opts = [('input','brian.txt'),('output','counts.txt'),('inputformat','text')]
>>> logfile = open('log.txt','a')
>>> dumbo.submit('wordcount.py',opts,stdout=logfile,stderr=logfile)
0
>>> output = dict(line[:-1].split('\\t') for line in open('counts.txt'))
>>> output = dict(dumbo.loadcode(open('counts.txt')))
>>> int(output['Brian'])
6
"""
Expand Down
139 changes: 68 additions & 71 deletions src/python/dumbo.py
Expand Up @@ -69,23 +69,15 @@ def run(mapper,reducer=None,combiner=None,
if mapconf: mapconf()
outputs = itermap(inputs,mapper)
if combiner: outputs = iterreduce(sorted(outputs),combiner)
if reducer: outputs = dumpcode(outputs)
else: outputs = dumptext(outputs)
if mapclose: mapclose()
elif reducer:
if redconf: redconf()
outputs = dumpcode(iterreduce(inputs,reducer))
outputs = iterreduce(inputs,reducer)
if redclose: redclose()
else: outputs = dumptext(inputs)
for output in outputs: print "\t".join(output)
else: outputs = inputs
for output in dumpcode(outputs): print "\t".join(output)
else:
opts = parseargs(sys.argv[1:]) + [("iteration","%i" % iter)]
if hasattr(mapper,'coded'):
print >>sys.stderr,"WARNING: the '.coded' attribute is deprecated"
if mapper.coded: newopts["inputformat"] = "coded"
if hasattr(reducer,'coded'):
print >>sys.stderr,"WARNING: the '.coded' attribute is deprecated"
if reducer.coded: newopts["outputformat"] = "coded"
if not reducer: newopts["numreducetasks"] = "0"
key,delindexes = None,[]
for index,(key,value) in enumerate(opts):
Expand All @@ -108,10 +100,10 @@ def run(self):
if index != 0:
newopts["input"] = "%s-%i" % (scratch,index-1)
newopts["delinputs"] = "yes"
newopts["inputformat"] = "coded"
newopts["inputformat"] = "sequencefile"
if index != len(self.iters)-1:
newopts["output"] = "%s-%i" % (scratch,index)
newopts["outputformat"] = "coded"
newopts["outputformat"] = "sequencefile"
kwargs["iter"],kwargs["newopts"] = index,newopts
run(*args,**kwargs)

Expand Down Expand Up @@ -241,34 +233,40 @@ def startonunix(prog,opts,python):
cat = "pv -cN source"
mpv,spv,rpv = "| pv -cN map ","| pv -cN sort ","| pv -cN reduce "
else: cat,mpv,spv,rpv = "cat","","",""
encodepipe = "| %s -m dumbo encodepipe " % python
if addedopts["addfilename"] and addedopts["addfilename"][0] == 'yes':
print >>sys.stderr,"WARNING: the added filenames might be incorrect"
encodepipe += "-addfilename %s " % addedopts["input"][0]
decodepipe = "| %s -m dumbo decodepipe " % python
if addedopts["inputformat"] and addedopts["inputformat"][0] == "coded":
encodepipe = ""
if addedopts["outputformat"] and addedopts["outputformat"][0] == "coded":
decodepipe = ""
encodepipe = ""
if addedopts["inputformat"] and addedopts["inputformat"][0] == "text":
encodepipe = "| %s -m dumbo encodepipe " % python
if addedopts["addfilename"] and addedopts["addfilename"][0] == 'yes':
print >>sys.stderr,"WARNING: the added filenames might be incorrect"
encodepipe += "-addfilename %s " % addedopts["input"][0]
if addedopts["numreducetasks"] and addedopts["numreducetasks"][0] == "0":
retval = execute("%s %s %s| %s %s %s %s > '%s'" % \
(cat,inputs,encodepipe,pyenv,cmdenv,mapper,mpv,output))
else:
retval = execute("%s %s %s| %s %s %s %s| LC_ALL=C " \
"sort %s| %s %s %s %s%s> '%s'" % \
"sort %s| %s %s %s %s> '%s'" % \
(cat,inputs,encodepipe,pyenv,cmdenv,mapper,mpv,
spv,pyenv,cmdenv,reducer,decodepipe,rpv,output))
spv,pyenv,cmdenv,reducer,rpv,output))
if addedopts["delinputs"] and addedopts["delinputs"][0] == "yes":
for file in addedopts["input"]: execute("rm " + file)
return retval

def startonstreaming(prog,opts,hadoop):
opts += configopts("streaming",prog,opts)
opts.append(("file",prog))
opts.append(("file",sys.argv[0]))
addedopts = getopts(opts,["name","delinputs","libegg","libjar",
"inputformat","outputformat","nummaptasks","numreducetasks",
"priority","cachefile","cachearchive","codewritable","addfilename"])
opts.append(("file",prog))
opts.append(("file",sys.argv[0]))
streamingjar,dumbojar = findjar(hadoop,"streaming"),findjar(hadoop,"dumbo")
if not streamingjar:
print >>sys.stderr,"ERROR: Streaming jar not found"
return 1
if not dumbojar:
print >>sys.stderr,"ERROR: Dumbo jar not found"
return 1
addedopts["libjar"].append(dumbojar)
dumbopkg = "org.apache.hadoop.dumbo"
if not addedopts["name"]:
opts.append(("jobconf","mapred.job.name=" + prog.split("/")[-1]))
else: opts.append(("jobconf","mapred.job.name=%s" % addedopts["name"][0]))
Expand All @@ -277,48 +275,37 @@ def startonstreaming(prog,opts,hadoop):
if addedopts["numreducetasks"]:
numreducetasks = int(addedopts["numreducetasks"][0])
opts.append(("numReduceTasks",str(numreducetasks)))
if numreducetasks == 0: addedopts["codewritable"] = ['no']
if numreducetasks == 0:
opts.append(("jobconf",
"mapred.mapoutput.key.class=%s.CodeWritable" % dumbopkg))
opts.append(("jobconf",
"mapred.mapoutput.value.class=%s.CodeWritable" % dumbopkg))
addedopts["codewritable"] = ['no']
if addedopts["priority"]: opts.append(("jobconf",
"mapred.job.priority=%s" % addedopts["priority"][0]))
if addedopts["cachefile"]: opts.append(("cacheFile",
addedopts["cachefile"][0]))
if addedopts["cachearchive"]: opts.append(("cacheArchive",
addedopts["cachearchive"][0]))
streamingjar,dumbojar = findjar(hadoop,"streaming"),findjar(hadoop,"dumbo")
if not streamingjar:
print >>sys.stderr,"ERROR: Streaming jar not found"
return 1
if not dumbojar:
print >>sys.stderr,"ERROR: Dumbo jar not found"
return 1
addedopts["libjar"].append(dumbojar)
if not addedopts["inputformat"]: addedopts["inputformat"].append("text")
if not addedopts["outputformat"]: addedopts["outputformat"].append("text")
dumbopkg = "org.apache.hadoop.dumbo"
if not addedopts["inputformat"]: addedopts["inputformat"] = ["sequencefile"]
inputformat_shortcuts = {
"text": "org.apache.hadoop.mapred.TextInputFormat",
"sequencefile": "org.apache.hadoop.mapred.SequenceFileInputFormat",
"coded": "org.apache.hadoop.mapred.SequenceFileInputFormat"}
"sequencefile": "org.apache.hadoop.mapred.SequenceFileInputFormat"}
inputformat_shortcuts.update(configopts("inputformats",prog))
if addedopts["inputformat"] and addedopts["inputformat"][0]:
inputformat = addedopts["inputformat"][0]
if inputformat_shortcuts.has_key(inputformat.lower()):
inputformat = inputformat_shortcuts[inputformat.lower()]
opts.append(("jobconf",
"dumbo.as.code.input.format.class=" + inputformat))
opts.append(("inputformat",dumbopkg + ".AsCodeInputFormat"))
inputformat = addedopts["inputformat"][0]
if inputformat_shortcuts.has_key(inputformat.lower()):
inputformat = inputformat_shortcuts[inputformat.lower()]
opts.append(("jobconf","dumbo.as.code.input.format.class=" + inputformat))
opts.append(("inputformat",dumbopkg + ".AsCodeInputFormat"))
if not addedopts["outputformat"]: addedopts["outputformat"] = ["sequencefile"]
outputformat_shortcuts = {
"text": "org.apache.hadoop.mapred.TextOutputFormat",
"sequencefile": "org.apache.hadoop.mapred.SequenceFileOutputFormat",
"coded": "org.apache.hadoop.mapred.SequenceFileOutputFormat"}
"sequencefile": "org.apache.hadoop.mapred.SequenceFileOutputFormat"}
outputformat_shortcuts.update(configopts("outputformats",prog))
if addedopts["outputformat"] and addedopts["outputformat"][0]:
outputformat = addedopts["outputformat"][0]
if outputformat_shortcuts.has_key(outputformat.lower()):
outputformat = outputformat_shortcuts[outputformat.lower()]
opts.append(("jobconf",
"dumbo.from.code.output.format.class=" + outputformat))
opts.append(("outputformat",dumbopkg + ".FromCodeOutputFormat"))
outputformat = addedopts["outputformat"][0]
if outputformat_shortcuts.has_key(outputformat.lower()):
outputformat = outputformat_shortcuts[outputformat.lower()]
opts.append(("jobconf","dumbo.from.code.output.format.class=" + outputformat))
opts.append(("outputformat",dumbopkg + ".FromCodeOutputFormat"))
if not (addedopts["codewritable"] and addedopts["codewritable"][0] == 'no'):
opts.append(("jobconf",
"mapred.mapoutput.key.class=%s.CodeWritable" % dumbopkg))
Expand Down Expand Up @@ -346,15 +333,13 @@ def startonstreaming(prog,opts,hadoop):

def cat(path,opts):
addedopts = getopts(opts,["hadoop","type","libjar"])
if not addedopts["hadoop"]:
print >>sys.stderr,"ERROR: Hadoop dir not specified"
return 1
if not addedopts["hadoop"]: return decodepipe(opts + [("path",path)])
hadoop = addedopts["hadoop"][0]
dumbojar = findjar(hadoop,"dumbo")
if not dumbojar:
print >>sys.stderr,"ERROR: Dumbo jar not found"
return 1
if not addedopts["type"]: type = "text"
if not addedopts["type"]: type = "sequencefile"
else: type = addedopts["type"][0]
hadenv = envdef("HADOOP_CLASSPATH",addedopts["libjar"])
try:
Expand All @@ -369,17 +354,24 @@ def cat(path,opts):
except IOError: pass # ignore
return 0

def encodepipe(opts):
addedopts,filename = getopts(opts,["addfilename"]),None
def encodepipe(opts=[]):
addedopts,filename = getopts(opts,["addfilename","path"]),None
if addedopts["addfilename"]: filename = addedopts["addfilename"][0]
outputs = loadtext(line[:-1] for line in sys.stdin)
if addedopts["path"]: file = open(addedopts["path"][0])
else: file = sys.stdin
outputs = loadtext(line[:-1] for line in file)
if filename: outputs = (((filename,key),value) for key,value in outputs)
for output in dumpcode(outputs): print "\t".join(output)
file.close()
return 0

def decodepipe(opts):
outputs = loadcode(line[:-1] for line in sys.stdin)
def decodepipe(opts=[]):
addedopts = getopts(opts,["path"])
if addedopts["path"]: file = open(addedopts["path"][0])
else: file = sys.stdin
outputs = loadcode(line[:-1] for line in file)
for output in dumptext(outputs): print "\t".join(output)
file.close()
return 0

if __name__ == "__main__":
Expand All @@ -391,11 +383,16 @@ def decodepipe(opts):
print " python -m dumbo encodepipe [<options>]"
print " python -m dumbo decodepipe [<options>]"
sys.exit(1)
if sys.argv[1] == "submit": retval = submit(sys.argv[2],parseargs(sys.argv[2:]))
elif sys.argv[1] == "start": retval = start(sys.argv[2],parseargs(sys.argv[2:]))
elif sys.argv[1] == "cat": retval = cat(sys.argv[2],parseargs(sys.argv[2:]))
elif sys.argv[1] == "encodepipe": retval = encodepipe(parseargs(sys.argv[2:]))
elif sys.argv[1] == "decodepipe": retval = decodepipe(parseargs(sys.argv[2:]))
if sys.argv[1] == "submit":
retval = submit(sys.argv[2],parseargs(sys.argv[2:]))
elif sys.argv[1] == "start":
retval = start(sys.argv[2],parseargs(sys.argv[2:]))
elif sys.argv[1] == "cat":
retval = cat(sys.argv[2],parseargs(sys.argv[2:]))
elif sys.argv[1] == "encodepipe":
retval = encodepipe(parseargs(sys.argv[2:]))
elif sys.argv[1] == "decodepipe":
retval = decodepipe(parseargs(sys.argv[2:]))
else:
print >>sys.stderr,"WARNING: the command 'python -m dumbo <prog>' is " \
"deprecated, use 'python <prog>' instead"
Expand Down

0 comments on commit 06a1572

Please sign in to comment.