#!/bin/bash
# bashreduce: mapreduce in bash
# erik@fawx.com
# usage <program-name> [<verbose>]
# Writes the command synopsis to stderr. When a non-empty second argument is
# given, the per-option help follows. Always terminates the script with
# exit status 2.
usage() {
  {
    printf '%s\n' \
      "Usage: $1 [-h '<host>[ <host>][...]'] [-f]" \
      " [-m <map>] [-r <reduce>] [-M <merge>]" \
      " [-i <input>] [-o <output>] [-c <column>] [-S <sort-mem-MB>]" \
      " [-t <tmp>] [-?]"
    if [[ -n "$2" ]]; then
      printf '%s\n' \
        " -h hosts to use; repeat hosts for multiple cores" \
        " (defaults to contents of /etc/br.hosts)" \
        " -f send filenames, not data, over the network," \
        " implies each host has a mirror of the dataset" \
        " -m map program (effectively defaults to cat)" \
        " -r reduce program" \
        " -M merge program" \
        " -i input file or directory (defaults to stdin)" \
        " -o output file (defaults to stdout)" \
        " -c column used by sort (defaults to 1)" \
        " -S memory to use for sort (defaults to 256M)" \
        " -t tmp directory (defaults to /tmp)" \
        " -? this help message"
    fi
  } >&2
  exit 2
}
# Defaults; each one can be overridden by the matching command-line flag below
hosts=
filenames=false
map=
reduce=
merge=
input=
output=
column=1
sort_mem=256M
tmp=/tmp
# Quote $0 so a script path containing whitespace still yields a single name
program=$(basename -- "$0")
# Parse the command line. Flags followed by ':' in the optstring take an
# argument, which getopts delivers in $OPTARG.
while getopts "h:fm:r:M:i:o:c:S:t:?" name; do
  case "$name" in
    h) hosts=$OPTARG ;;
    f) filenames=true ;;
    m) map=$OPTARG ;;
    r) reduce=$OPTARG ;;
    M) merge=$OPTARG ;;
    i) input=$OPTARG ;;
    o) output=$OPTARG ;;
    c) column=$OPTARG ;;
    S) sort_mem=$OPTARG ;;
    t) tmp=$OPTARG ;;
    # getopts sets name to '?' both for a literal -? and for an invalid
    # option, so either one prints the long help and exits
    \?) usage "$program" MOAR ;;
    *) usage "$program" ;;
  esac
done
# If -h wasn't given, fall back to the contents of /etc/br.hosts
if [[ -z "$hosts" && -e /etc/br.hosts ]]; then
  hosts=$(cat /etc/br.hosts)
fi
# Without at least one host there is nothing to run on. Treat a blank or
# whitespace-only host list the same as a missing one, and report the
# problem on stderr like the rest of the usage text.
if [[ -z "${hosts//[[:space:]]/}" ]]; then
  echo "$program: must specify -h or provide /etc/br.hosts" >&2
  usage "$program"
fi
# Truncate (or create) the stderr log for this run
cp /dev/null $tmp/br_stderr
# Rewrite $map and $reduce as pipeline stages: each gets a leading "|" and has
# its stderr appended to the log. Empty values stay empty.
if [[ -n "$map" ]]; then
  map="| $map 2>>$tmp/br_stderr"
fi
if [[ $filenames == true ]]; then
  # With -f each incoming line is a filename: expand it to the file's
  # contents (zcat first, plain cat as the fallback) ahead of the map stage
  unzip_stage="| xargs -n1 sh -c 'zcat \$0 2>>$tmp/br_stderr || cat \$0 2>>$tmp/br_stderr'"
  map="$unzip_stage $map"
  unset unzip_stage
fi
if [[ -n "$reduce" ]]; then
  reduce="| $reduce 2>>$tmp/br_stderr"
fi
# Per-job identifiers and working directories. $jobpath is created locally
# right here; $nodepath is created on each remote host inside the loop below.
jobid="$(uuidgen)"
jobpath="$tmp/br_job_$jobid"
nodepath="$tmp/br_node_$jobid"
mkdir -p $jobpath/{in,out}
# Every entry in $hosts gets its own pair of consecutive ports, starting at 8192:
#   port_out - the remote side listens here for the work we send it
#   port_in  - the remote side serves its finished results here
port_in=8192
port_out=$(($port_in + 1))
# host_idx numbers the workers; out_files collects the local FIFOs that carry
# work to them (passed to brp by the partitioning step).
host_idx=0
out_files=
for host in $hosts; do
# Local FIFOs for this worker: out/<idx> carries work to it, in/<idx> carries
# its results back
mkfifo $jobpath/{in,out}/$host_idx
# Listen for work (remote): a background nc writes whatever arrives on
# $port_out into $nodepath/<idx>. awk keeps the second field of the `jobs -l`
# line, which is used below as the listener's PID.
ssh -n $host "mkdir -p $nodepath/"
pid=$(ssh -n $host "nc -l -p $port_out >$nodepath/$host_idx \
2>>$tmp/br_stderr </dev/null & jobs -l" \
| awk {'print $2'})
# Do work (remote): tail follows the received file (--pid ties it to the
# listener process started above), the stream is sorted on the key column,
# passed through the optional map/reduce stages, and offered on $port_in.
# $LC_ALL, $map and $reduce are expanded locally, before the command string
# is sent to the remote host.
ssh -n $host "tail -s0.1 -f --pid=$pid $nodepath/$host_idx \
2>>$tmp/br_stderr </dev/null \
| LC_ALL='$LC_ALL' sort -S$sort_mem -T$tmp -k$column,$column \
2>>$tmp/br_stderr \
$map $reduce \
| nc -q0 -l -p $port_in >>$tmp/br_stderr &"
# Receive results (local): connect to the remote result port and write
# whatever it sends into this worker's "in" FIFO
nc $host $port_in >$jobpath/in/$host_idx &
# Send work (local): stream this worker's "out" FIFO to the remote listener
nc -q0 $host $port_out <$jobpath/out/$host_idx &
out_files="$out_files $jobpath/out/$host_idx"
# Advance to the next port pair and worker index
port_in=$(($port_in + 2))
port_out=$(($port_in + 1))
host_idx=$(($host_idx + 1))
done
# Turn $input into a command prefix (ending in "|") that produces the job's
# input; it is left empty when the data should come from stdin.
if [[ -f "$input" ]]; then
  # A single file: try zcat first, fall back to cat
  input="sh -c 'zcat $input 2>>$tmp/br_stderr || cat $input 2>>$tmp/br_stderr' |"
elif [[ -d "$input" ]]; then
  # A directory: list every regular file beneath it
  input="find $input -type f |"
  if [[ $filenames == false ]]; then
    # Without -f the file contents, not the names, are what gets shipped
    input+=" xargs -n1 sh -c 'zcat \$0 2>>$tmp/br_stderr || cat \$0 2>>$tmp/br_stderr' |"
  fi
else
  input=
fi
# Partition local input to the remote workers
# Start from an empty BRP so a value inherited from the environment is never
# handed to eval; prefer a brp on PATH, then a local brutils/brp.
BRP=
if command -v brp >>"$tmp/br_stderr"; then
  BRP=brp
elif [[ -f brutils/brp ]]; then
  BRP=brutils/brp
fi
if [[ -n "$BRP" ]]; then
  # brp is given "-", the zero-based key column, and the list of worker FIFOs
  eval "$input $BRP - $(($column - 1)) $out_files"
else
  # Use awk if we don't have brp.
  # Each line's key column seeds srand(), so the rand() that follows picks the
  # same worker index for equal keys.
  # NOTE(review): srand() takes a numeric seed, so non-numeric keys may all
  # seed alike in some awk implementations - confirm.
  # This relies on awk keeping its output files open until it exits (believed
  # to hold across awk implementations).
  # The END block appends an empty string to every worker's FIFO so that a
  # FIFO which received no records is still opened and closed.
  eval "$input awk '{
    srand(\$$column);
    print \$0 >>\"$jobpath/out/\"int(rand() * $host_idx);
  }
  END {
    for (i = 0; i != $host_idx; ++i)
      printf \"\" >>\"$jobpath/out/\"i;
  }'"
fi
# Merge output from hosts into one stream.
# Use the -M program if one was given; otherwise prefer brm and fall back to
# sort -m. When -o was given the merged stream is piped through pv into
# $output; otherwise it goes to stdout.
# Start from an empty BRM so a value inherited from the environment is never
# handed to eval.
BRM=
if command -v brm >>"$tmp/br_stderr"; then
  BRM=brm
elif [[ -f brutils/brm ]]; then
  BRM=brutils/brm
fi
if [[ -n "$merge" ]]; then
  # User-supplied merge: concatenate every result FIFO into it
  eval "find $jobpath/in -type p | xargs cat \
    | $merge 2>>$tmp/br_stderr ${output:+| pv >$output}"
elif [[ -n "$BRM" ]]; then
  # brm is given "-", the zero-based key column, and the result FIFOs
  eval "$BRM - $(($column - 1)) $(find $jobpath/in/ -type p | xargs) \
    ${output:+| pv >$output}"
else
  # Use sort -m if we don't have brm.
  # sort -m creates tmp files if too many input files are specified;
  # brm doesn't do this.
  eval "sort -k$column,$column -S$sort_mem -m $jobpath/in/* \
    ${output:+| pv >$output}"
fi
# Cleanup: remove the local job directory, then the per-job directory on each
# remote host. The path is quoted so whitespace in $tmp cannot make rm -rf
# operate on unrelated paths.
rm -rf -- "$jobpath"
# $hosts may list the same host several times (once per core); one removal
# per distinct host is enough.
for host in $(printf '%s\n' $hosts | sort -u); do
  # -n keeps ssh away from the script's stdin, like the other ssh calls here
  ssh -n $host "rm -rf $nodepath"
done
# TODO: is there a safe way to kill subprocesses upon failure?
# This seems to work: /bin/kill -- -$$