Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
tree: 736e0e19d4
Fetching contributors…

Cannot retrieve contributors at this time

executable file 160 lines (146 sloc) 4.449 kb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
#!/bin/bash
# bashreduce: mapreduce in bash
# erik@fawx.com

function usage() {
        # Print a one-line usage summary to stderr and exit 2.
        # $1: path to this script (normally $0); only its basename is shown.
        # Fixes: second "-m" was a typo for "-c" (cf. showhelp), basename
        # quoted and computed once, diagnostics routed to stderr.
        local prog
        prog=$(basename "$1")
        printf "Usage: %s: [-m host1 [host2...]] [-c column] [-r reduce] [-i input] [-o output]\n" "$prog" >&2
        printf " %s -h for help.\n" "$prog" >&2
        exit 2
}

function showhelp() {
        # Print the full help text and exit 2.
        # $1: path to this script (normally $0); only its basename is shown.
        # Fix: quote "$1" so paths containing spaces don't word-split.
        printf "Usage: %s: [-m host1 [host2...]] [-c column] [-r reduce] [-i input] [-o output]\n" "$(basename "$1")"
        echo "bashreduce. Map an input file to many hosts, sort/reduce, merge"
        echo " -m: hosts to use, can repeat hosts for multiple cores"
        echo " default hosts from /etc/br.hosts"
        echo " -c: column to partition, default = 1 (1-based)"
        echo " -r: reduce function, default = identity"
        echo " -i: input file, default = stdin"
        echo " -o: output file, default = stdout"
        echo " -t: tmp dir to use, default = /tmp"
        echo " -S: memory to use for sort, default = 256M"
        echo " -h: this help message"
        exit 2
}

# Tunables and their defaults; each may be overridden by a flag below
# (see showhelp for what every flag means).
hosts=
mapcolumn=1
reduce=
input=
output=
tmp_dir=/tmp
sort_mem=256M

# Parse command-line flags.
while getopts "m:c:r:i:o:t:S:h" opt
do
  case "$opt" in
    m) hosts=$OPTARG ;;
    c) mapcolumn=$OPTARG ;;
    r) reduce=$OPTARG ;;
    i) input=$OPTARG ;;
    o) output=$OPTARG ;;
    t) tmp_dir=$OPTARG ;;
    S) sort_mem=$OPTARG ;;
    h) showhelp "$0" ;;
    *) usage "$0" ;;
  esac
done

# No -m given: fall back to the system-wide host list.
if [[ -z $hosts ]]
then
  if [[ -e /etc/br.hosts ]]
  then
    hosts=$(< /etc/br.hosts)
  else
    # Error goes to stderr so it can't pollute the (stdout) result stream.
    printf "%s: must specify hosts with -m or provide /etc/br.hosts\n" "$(basename "$0")" >&2
    usage "$0"
  fi
fi

# A reduce command becomes an extra pipeline stage appended after the
# per-worker sort ("| <reduce>"); empty means identity (no extra stage).
if [[ -n $reduce ]]
then
  reduce="| $reduce"
fi

# okay let's get started! first we need a name for our job
jobid=$(uuidgen)
# Local scratch: in/ holds per-host result fifos, out/ per-host input fifos.
# nodepath is the matching scratch dir created on each remote host.
# ${tmp_dir:-/tmp} guards against an empty tmp_dir ever producing paths in /.
jobpath="${tmp_dir:-/tmp}/br_job_$jobid"
nodepath="${tmp_dir:-/tmp}/br_node_$jobid"
# -p creates the whole tree in one call and tolerates leftovers from a
# previous run instead of aborting.
mkdir -p "$jobpath/in" "$jobpath/out"

# now, for each host, set up in and out fifos (and a netcat for each), and ssh to each host to set up workers listening on netcat

# Each host slot gets a pair of TCP ports: port_out carries raw mapped
# lines out to the worker, port_in carries the sorted/reduced result back.
# Pairs advance by 2 per slot (8192/8193, 8194/8195, ...).
port_in=8192
port_out=`expr $port_in + 1`
host_idx=0
out_files=

# NOTE(review): a host repeated in $hosts gets one slot (and port pair)
# per occurrence — that is how "repeat hosts for multiple cores" works.
for host in $hosts
do
  # our named pipes
  mkfifo $jobpath/in/$host_idx
  mkfifo $jobpath/out/$host_idx
  # lets get the pid of our listener
  ssh -n $host "mkdir -p $nodepath"
  # Start a remote nc that dumps everything it receives into in_<idx>,
  # backgrounded; `jobs -l` prints "[1]+ <pid> ..." so awk field 2 is its pid.
  pid=`ssh -n $host "nc -l -p $port_out > $nodepath/in_$host_idx 2> /dev/null < /dev/null & jobs -l" | awk {'print $2'}`
  # Remote worker pipeline: tail streams in_<idx> as it grows and exits when
  # the nc receiver ($pid) dies (--pid), sort orders by the partition column
  # (optionally piped through $reduce — note $reduce already starts with "|"),
  # and a second nc serves the result on port_in.
  ssh $host -n "tail -s0.1 -f --pid=$pid $nodepath/in_$host_idx 2> /dev/null < /dev/null | LC_ALL='$LC_ALL' sort -S$sort_mem -T$tmp_dir -k$mapcolumn,$mapcolumn $reduce 2>/dev/null | nc -q0 -l -p $port_in >& /dev/null &"
  # our local forwarders
  # in/<idx>: pull results from the worker; out/<idx>: push mapped lines to
  # it (-q0: nc quits as soon as it sees EOF on the fifo).
  nc $host $port_in > $jobpath/in/$host_idx &
  nc -q0 $host $port_out < $jobpath/out/$host_idx &
  # our vars
  out_files="$out_files $jobpath/out/$host_idx"
  port_in=`expr $port_in + 2`
  port_out=`expr $port_in + 1`
  host_idx=`expr $host_idx + 1`
done

# okay, time to map
# Partition stdin (or $input) across the out/<idx> fifos, keyed on the
# partition column. Prefer the brp helper; fall back to awk.
if ! command -v brp > /dev/null
then
  # use awk if we don't have brp
  # we're taking advantage of a special property that awk leaves its file handles open until its done
  # i think this is universal
  # we're also sending a zero length string to all the handles at the end, in case some pipe got no love
  # srand(key) re-seeds the RNG with the key column, so rand() below is a
  # deterministic function of the key: equal keys always route to the same
  # worker slot, which the per-worker sort/reduce relies on.
  mapfunction="{
srand(\$$mapcolumn);
print \$0 >> \"$jobpath/out/\"int(rand()*$host_idx);
}
END {
for (i = 0; i != $host_idx; ++i)
{
printf \"\" >> \"$jobpath/out/\"i
}
}"
  if [[ -z $input ]]
  then
    awk "$mapfunction"
  else
    pv "$input" | awk "$mapfunction"
  fi
else
  # brp takes a 0-based column index; $out_files is deliberately unquoted so
  # it word-splits into one argument per fifo (paths contain no spaces:
  # $tmp_dir/br_job_<uuid>/out/<idx>).
  if [[ -z $input ]]
  then
    brp - $((mapcolumn - 1)) $out_files
  else
    pv "$input" | brp - $((mapcolumn - 1)) $out_files
  fi
fi

# save it somewhere
# Merge the already-sorted per-host result fifos into the final output.
# Prefer the brm helper; fall back to sort -m.
if ! command -v brm > /dev/null
then
  # use sort -m if we don't have brm
  # sort -m creates tmp files if too many input files are specified
  # brm doesn't do this
  if [[ -z $output ]]
  then
    sort -k"$mapcolumn,$mapcolumn" -m "$jobpath"/in/*
  else
    sort -k"$mapcolumn,$mapcolumn" -m "$jobpath"/in/* | pv > "$output"
  fi
else
  # brm takes a 0-based column index. $(find ...) is deliberately unquoted
  # so each fifo path becomes its own argument (the old `| xargs` ran the
  # default xargs command, echo, which accomplished the same joining).
  if [[ -z $output ]]
  then
    brm - $((mapcolumn - 1)) $(find "$jobpath/in/" -type p)
  else
    brm - $((mapcolumn - 1)) $(find "$jobpath/in/" -type p) | pv > "$output"
  fi
fi

# finally, clean up after ourselves
# Guard + quote so an unset/empty jobpath can never turn this into a bare
# (or wildly wrong) "rm -rf".
[[ -n $jobpath ]] && rm -rf "$jobpath"
for host in $hosts
do
  ssh "$host" "rm -rf $nodepath"
done

# TODO: is there a safe way to kill subprocesses upon fail?
# this seems to work: /bin/kill -- -$$
Something went wrong with that request. Please try again.