Skip to content
Browse files

cool syntax updates from Valery V. Vorotyntsev: more idiomatic bash, uses super-clever bash globbing, much shorter
  • Loading branch information...
1 parent 16d14c8 commit 71619384b2ab07ff61443a4ca54591b03c44dce0 @erikfrey committed
Showing with 52 additions and 86 deletions.
  1. +52 −86 br
View
138 br
@@ -2,25 +2,26 @@
# bashreduce: mapreduce in bash
# erik@fawx.com
-function usage() {
- printf "Usage: %s: [-m host1 [host2...]] [-c column] [-r reduce] [-i input] [-o output]\n" `basename $1`
- printf " %s -h for help.\n" `basename $1`
- exit 2
+usage() {
+ local prog="`basename $1`"
+ echo "Usage: $prog [-m host1 [host2...]] [-c column] [-r reduce] [-i input] [-o output]"
+ echo " $prog -h for help."
+ exit 2
}
-function showhelp() {
- printf "Usage: %s: [-m host1 [host2...]] [-c column] [-r reduce] [-i input] [-o output]\n" `basename $1`
- echo "bashreduce. Map an input file to many hosts, sort/reduce, merge"
- echo " -m: hosts to use, can repeat hosts for multiple cores"
- echo " default hosts from /etc/br.hosts"
- echo " -c: column to partition, default = 1 (1-based)"
- echo " -r: reduce function, default = identity"
- echo " -i: input file, default = stdin"
- echo " -o: output file, default = stdout"
- echo " -t: tmp dir to use, default = /tmp"
- echo " -S: memory to use for sort, default = 256M"
- echo " -h: this help message"
- exit 2
+showhelp() {
+ echo "Usage: `basename $1`: [-m host1 [host2...]] [-c column] [-r reduce] [-i input] [-o output]"
+ echo "bashreduce. Map an input file to many hosts, sort/reduce, merge"
+ echo " -m: hosts to use, can repeat hosts for multiple cores"
+ echo " default hosts from /etc/br.hosts"
+ echo " -c: column to partition, default = 1 (1-based)"
+ echo " -r: reduce function, default = identity"
+ echo " -i: input file, default = stdin"
+ echo " -o: output file, default = stdout"
+ echo " -t: tmp dir to use, default = /tmp"
+ echo " -S: memory to use for sort, default = 256M"
+ echo " -h: this help message"
+ exit 2
}
hosts=
@@ -31,8 +32,7 @@ output=
tmp_dir=/tmp
sort_mem=256M
-while getopts "m:c:r:i:o:t:S:h" name
-do
+while getopts "m:c:r:i:o:t:S:h" name; do
case $name in
m) hosts=$OPTARG;;
c) mapcolumn=$OPTARG;;
@@ -46,113 +46,79 @@ do
esac
done
-if [[ -z $hosts ]]
-then
- if [[ -e /etc/br.hosts ]]
- then
+# no hosts given with -m: fall back to /etc/br.hosts, or print usage and exit.
+# "$hosts" must be quoted inside [ ]: it usually holds several space-separated
+# hosts, and unquoted they reach [ as extra arguments and trigger an error
+if [ -z "$hosts" ]; then
+ if [ -e /etc/br.hosts ]; then
hosts=`cat /etc/br.hosts`
else
- printf "%s: must specify hosts with -m or provide /etc/br.hosts\n" `basename $0`
+ echo "`basename $0`: must specify hosts with -m or provide /etc/br.hosts"
usage $0
fi
fi
-if [[ ! -z $reduce ]]
-then
- reduce="| "$reduce
-fi
+# if we have a reduce, add the pipe explicitly
+[ -n "$reduce" ] && reduce="| $reduce 2>/dev/null"
# okay let's get started! first we need a name for our job
-jobid=`uuidgen`
+jobid="`uuidgen`"
jobpath="$tmp_dir/br_job_$jobid"
nodepath="$tmp_dir/br_node_$jobid"
-mkdir $jobpath
-mkdir $jobpath/in
-mkdir $jobpath/out
+mkdir -p $jobpath/{in,out}
# now, for each host, set up in and out fifos (and a netcat for each), and ssh to each host to set up workers listening on netcat
port_in=8192
-port_out=`expr $port_in + 1`
+port_out=$(($port_in + 1))
host_idx=0
out_files=
-for host in $hosts
-do
+for host in $hosts; do
# our named pipes
- mkfifo $jobpath/in/$host_idx
- mkfifo $jobpath/out/$host_idx
+ mkfifo $jobpath/{in,out}/$host_idx
# lets get the pid of our listener
ssh -n $host "mkdir -p $nodepath"
- pid=`ssh -n $host "nc -l -p $port_out > $nodepath/in_$host_idx 2> /dev/null < /dev/null & jobs -l" | awk {'print $2'}`
- ssh $host -n "tail -s0.1 -f --pid=$pid $nodepath/in_$host_idx 2> /dev/null < /dev/null | LC_ALL='$LC_ALL' sort -S$sort_mem -T$tmp_dir -k$mapcolumn,$mapcolumn $reduce 2>/dev/null | nc -q0 -l -p $port_in >& /dev/null &"
+ pid=$(ssh -n $host "nc -l -p $port_out >$nodepath/in_$host_idx 2>/dev/null </dev/null & jobs -l" | awk {'print $2'})
+ ssh $host -n "tail -s0.1 -f --pid=$pid $nodepath/in_$host_idx 2>/dev/null </dev/null | LC_ALL='$LC_ALL' sort -S$sort_mem -T$tmp_dir -k$mapcolumn,$mapcolumn 2>/dev/null $reduce | nc -q0 -l -p $port_in >&/dev/null &"
# our local forwarders
- nc $host $port_in > $jobpath/in/$host_idx &
- nc -q0 $host $port_out < $jobpath/out/$host_idx &
+ nc $host $port_in >$jobpath/in/$host_idx &
+ nc -q0 $host $port_out <$jobpath/out/$host_idx &
# our vars
out_files="$out_files $jobpath/out/$host_idx"
- port_in=`expr $port_in + 2`
- port_out=`expr $port_in + 1`
- host_idx=`expr $host_idx + 1`
+ port_in=$(($port_in + 2))
+ port_out=$(($port_in + 1))
+ host_idx=$(($host_idx + 1))
done
# okay, time to map
-if [[ -z `which brp` ]]
-then
+if which brp >/dev/null; then
+ eval "${input:+pv $input |} brp - $(($mapcolumn - 1)) $out_files"
+else
# use awk if we don't have brp
# we're taking advantage of a special property that awk leaves its file handles open until its done
# i think this is universal
# we're also sending a zero length string to all the handles at the end, in case some pipe got no love
- mapfunction="{
- srand(\$$mapcolumn);
- print \$0 >> \"$jobpath/out/\"int(rand()*$host_idx);
- }
- END {
- for (i = 0; i != $host_idx; ++i)
- {
- printf \"\" >> \"$jobpath/out/\"i
- }
- }"
- if [[ -z $input ]]
- then
- awk "$mapfunction"
- else
- pv $input | awk "$mapfunction"
- fi
-else
- if [[ -z $input ]]
- then
- brp - `expr $mapcolumn - 1` $out_files
- else
- pv $input | brp - `expr $mapcolumn - 1` $out_files
- fi
+ eval "${input:+pv $input |} awk '{
+ srand(\$$mapcolumn);
+ print \$0 >>\"$jobpath/out/\"int(rand() * $host_idx);
+ }
+ END {
+ for (i = 0; i != $host_idx; ++i)
+ printf \"\" >>\"$jobpath/out/\"i;
+ }'"
fi
# save it somewhere
-if [[ -z `which brm` ]]
-then
+if which brm >/dev/null; then
+ eval "brm - $(($mapcolumn - 1)) `find $jobpath/in/ -type p | xargs` ${output:+| pv >$output}"
+else
# use sort -m if we don't have brm
# sort -m creates tmp files if too many input files are specified
# brm doesn't do this
- if [[ -z $output ]]
- then
- sort -k$mapcolumn,$mapcolumn -m $jobpath/in/*
- else
- sort -k$mapcolumn,$mapcolumn -m $jobpath/in/* | pv > $output
- fi
-else
- if [[ -z $output ]]
- then
- brm - `expr $mapcolumn - 1` `find $jobpath/in/ -type p | xargs`
- else
- brm - `expr $mapcolumn - 1` `find $jobpath/in/ -type p | xargs` | pv > $output
- fi
+ eval "sort -k$mapcolumn,$mapcolumn -m $jobpath/in/* ${output:+| pv >$output}"
fi
# finally, clean up after ourselves
rm -rf $jobpath
-for host in $hosts
-do
+for host in $hosts; do
ssh $host "rm -rf $nodepath"
done

0 comments on commit 7161938

Please sign in to comment.
Something went wrong with that request. Please try again.