# Map Reduce

In 2003, Ghemawat and Dean wrote a program to greatly simplify the use of parallel and distributed systems, which they described in their [paper](http://static.googleusercontent.com/media/research.google.com/en/us/archive/mapreduce-osdi04.pdf). Their program takes care of partitioning the data across many computers, scheduling which tasks run when and on which machine, managing the necessary communication between machines, and recovering when a machine fails. As a result, everyday programmers can use vast clusters of commodity machines without any particular experience with parallel or distributed systems.

They accomplished this by restricting what the user is allowed to do: the user supplies only two functions, a mapper and a reducer.

Even with this restriction, many real-world tasks can be expressed using just map and reduce. To get a feel for how this works, let us look at map and reduce separately.

### Map

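To follow along with this tutorial, you should use Python 2.7. If you are running Python 3 on your home computer, you can SSH into almost any Linux server, where Python 2.7 is likely to be available. (The code in this tutorial will not run unchanged under Python 3: there, `print` is a function, `map` returns an iterator instead of a list, and `reduce` has to be imported from `functools`.)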

Map is a pretty straightforward concept, but it is slightly misnamed. A better name would have been *apply*, because what we will be doing is applying a function to every element of a list. Let's see how this works.

Suppose you are given a list of numbers like 

```
1 2 3 4 5 6 7 8 9 10 11
```
and you want to square each of the numbers.

What you need to do, then, is apply a function that squares each number, so that the final output is

```
1 4 9 16 25 36 49 64 81 100 121
```
And, believe it or not, that is essentially all that a map function does. Let's program that up in Python: 


```python
# First, we start off with the list of numbers:
x = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]

# Now, we simply define the function that we want to apply to our list:
def square(x):
    return x*x

# Finally, we apply our function to each member of the list:
map(square, x)
```

```
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121]
```

As you can tell, map is really straightforward. We create a function, and then we apply that function to every element in the data set that we are processing. Let us move on to the reduce function.
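To make "apply a function to every element" concrete, here is a minimal sketch of what `map` does for a single list, written as an explicit loop. The name `my_map` is hypothetical and chosen only for illustration; the sketch is written so it runs under both Python 2.7 and Python 3.

```python
def my_map(function, values):
    """Apply `function` to each element of `values`, keeping the results in order."""
    results = []
    for value in values:
        results.append(function(value))
    return results


def square(x):
    return x * x


print(my_map(square, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]))
# prints [1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121]
```

The built-in `map` can also take several lists at once and pass one element from each to the function; this sketch only handles the single-list case used in this tutorial.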

### Reduce

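Reduce is only slightly more complicated than map. The way I think of it is as a cumulative adder: it combines the first two elements using the supplied function, then combines that result with the next element, and so on until the list is used up.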

For example, if we want to add all the numbers in a list:
```
1 2 3 4 5 6
```
Then the reducer scans through the numbers and calculates the output in the following manner:
```
1 + 2 = 3
3 + 3 = 6
6 + 4 = 10
10 + 5 = 15
15 + 6 = 21
```
Let's try it in Python to see how it works:

```python
# First, we set up the input:
x = [1, 2, 3, 4, 5, 6]

# Next, we write the function that we will reduce with:
def add(x, y):
    return x + y

# Finally, we simply call reduce:
reduce(add, x)
```

```
21
```

Reducing is not much harder than mapping, but complicated reducers can take time to wrap your head around. Basically, a reducer takes a list of values and combines them one by one using the function supplied to it.
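The step-by-step trace from earlier (1 + 2 = 3, 3 + 3 = 6, and so on) can be reproduced with a minimal hand-written version of reduce. The name `my_reduce` is hypothetical, and this is a sketch of the idea rather than the built-in's actual implementation; it prints each intermediate step so the running result is visible.

```python
def my_reduce(function, values):
    """Combine `values` from left to right using a two-argument `function`.

    Each intermediate step is printed so the running result is visible.
    """
    iterator = iter(values)
    try:
        accumulated = next(iterator)  # start from the first element
    except StopIteration:
        # The built-in reduce also raises TypeError for an empty list with no initial value
        raise TypeError("my_reduce() of empty sequence")
    for value in iterator:
        result = function(accumulated, value)
        print("f(%s, %s) = %s" % (accumulated, value, result))
        accumulated = result
    return accumulated


def add(x, y):
    return x + y


total = my_reduce(add, [1, 2, 3, 4, 5, 6])
print(total)
```

This prints `f(1, 2) = 3`, `f(3, 3) = 6`, `f(6, 4) = 10`, `f(10, 5) = 15`, `f(15, 6) = 21` and finally `21`, matching the trace above. Nothing in `my_reduce` is specific to addition: passing a multiplication function instead would compute a running product.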

Let's take a look at an easy example to get some practice with what we are doing. 

### Using Map Reduce to Multiply Vectors

Suppose we are given two vectors, x and y, and we need to calculate their dot product. For example, suppose

$$x = \left(\begin{matrix}2\\ 6\\ 3 \end{matrix}\right) ~~~~~ y = \left(\begin{matrix}-4\\ 2\\ 1
\end{matrix}\right)$$

Then the dot product is simply

$$ x^T y = \left(\begin{matrix}2 & 6 & 3 \end{matrix}\right) \left(\begin{matrix}-4\\ 2\\ 1 \end{matrix}\right) = -8 + 12 + 3 = 7$$



To break this problem down into a map-reduce job, we will have the mapper multiply corresponding elements together and have the reducer add up the results:

```python
# First, I put the input into a form that is easy for the mapper to digest:
# a list of (x_i, y_i) pairs
x = [(2, -4), (6, 2), (3, 1)]

# Next, I create my mapper,
# keeping in mind that each input will be a tuple of the form (2, -4):
def multiply(x):
    return x[0]*x[1]

# Then I simply run the mapper:
out = map(multiply, x)

# Now we simply add up the numbers from the mapper,
# using the add function defined earlier:
reduce(add, out)
```

```
7
```
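In the example above, the pairs were typed in by hand. Assuming instead that the two vectors start out as separate lists, the built-in `zip` can build the pairs for the mapper. The helper name `dot_product` is hypothetical; this is a sketch that runs under Python 2.7 and Python 3.

```python
from functools import reduce  # a built-in on Python 2; this import also works on 2.6+ and 3


def multiply(pair):
    # Mapper: multiply the two entries of one (x_i, y_i) pair
    return pair[0] * pair[1]


def add(a, b):
    # Reducer: add two partial sums
    return a + b


def dot_product(x, y):
    """Dot product of two equal-length vectors, computed with map and reduce."""
    if len(x) != len(y):
        raise ValueError("vectors must have the same length")
    pairs = list(zip(x, y))  # [(2, -4), (6, 2), (3, 1)] for the example above
    products = list(map(multiply, pairs))
    return reduce(add, products, 0)  # start from 0 so that empty vectors give 0


print(dot_product([2, 6, 3], [-4, 2, 1]))  # prints 7
```

Passing `0` as the third argument to `reduce` gives it a starting value, so the function returns 0 for empty vectors instead of raising an error.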

Hopefully that last example seemed straightforward. 

The next example is a little more complex, but is probably the most famous map reduce example.

### Word Count

To make sure that we fully understand Map-Reduce, let's take a look at the famous word count problem.

Our task is simple: we are given a document, and we want to find out how many times each word appears in it. Suppose we are given the following document:

```python
s = "This is a sentence about cats. This is a sentence about dogs. This is not about anything."
```

For this task, our mapper is essentially trivial: it simply passes every word in the document through unchanged.

```python
# First, we turn the string into a list of words for our mapper to work upon:
x = s.split()

# Next, we make the function we will apply:
def doNothing(x):
    return x

# Finally, we apply map to the list and save the result:
output = map(doNothing, x)

# Let's print the first few words of the output to make sure it is doing what it is supposed to do:
output[:6]
```

```
['This', 'is', 'a', 'sentence', 'about', 'cats.']
```

Between the map step and the reduce step, it is often advantageous to sort the output from the mapper. In fact, Hadoop does this automatically: the mapper output is sorted by key before it reaches the reducer.

```python
output = sorted(output)
```
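Why does sorting help? After sorting, all copies of a word sit next to each other, so the reducer only has to compare each word with the one before it. A small sketch using the standard-library `itertools.groupby`, which groups runs of consecutive equal elements, makes this visible:

```python
from itertools import groupby

s = "This is a sentence about cats. This is a sentence about dogs. This is not about anything."

unsorted_words = s.split()
sorted_words = sorted(unsorted_words)

# groupby yields one (key, group) pair per run of consecutive equal elements
unsorted_runs = [key for key, _ in groupby(unsorted_words)]
sorted_runs = [key for key, _ in groupby(sorted_words)]

print(unsorted_runs.count("This"))  # 3 separate runs: the copies are not adjacent
print(sorted_runs.count("This"))    # 1 run: sorting made all copies adjacent
```

Without sorting, a reducer that only looks at neighbouring words would report "This" three times with a count of 1 each.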

Now the reduce step takes this sorted output, adds up the occurrences of each word, and prints each word together with its count.

We will take advantage of the fact that the input is sorted.

```python
# First we set up some variables to keep track of which word we are currently processing
# and how many times we have already seen it
current_word = None
count = 0

# We go through each element output by the mapper
for word in output:

    # If this is the first word of the list, then current_word is still None,
    # so we set it to the first word in the list
    if current_word is None:
        current_word = word

    # Is this the word we are currently processing?
    # If so, then we increment the count
    if current_word == word:
        count += 1

    # If not, then we have finished processing current_word,
    # so we print current_word and its count
    # and start counting the new word
    else:
        print current_word, count
        current_word = word
        count = 1

# Finally, the last word and its count must be printed
# (unless the mapper produced no words at all)
if current_word is not None:
    print current_word, count
```

```
This 3
a 2
about 3
anything. 1
cats. 1
dogs. 1
is 3
not 1
sentence 2
```


As I mentioned earlier, it sometimes takes some effort to wrap your head around what a reducer is doing. However, I highly recommend working out how this reducer works, as it is the basis for many of the reducers you will see in the other tutorials.
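The mapper above passes words through unchanged. In the original MapReduce paper, the word-count mapper instead emits a (word, 1) pair for every word, and the reducer sums the values it receives for each key. Below is a minimal single-machine sketch of that variant. The function names are hypothetical, and `sorted` plus `itertools.groupby` stand in for Hadoop's sort-and-group step.

```python
from itertools import groupby
from operator import itemgetter


def mapper(document):
    # Emit a (key, value) pair of (word, 1) for every word in the document
    return [(word, 1) for word in document.split()]


def reducer(word, counts):
    # Receive every value emitted for one key and add them up
    return (word, sum(counts))


def word_count(document):
    pairs = sorted(mapper(document))  # stands in for the shuffle/sort step
    results = []
    for word, group in groupby(pairs, key=itemgetter(0)):
        results.append(reducer(word, [count for _, count in group]))
    return results


s = "This is a sentence about cats. This is a sentence about dogs. This is not about anything."
for word, count in word_count(s):
    print("%s %d" % (word, count))
```

This prints the same nine lines as the reducer above. Because the reducer simply sums whatever values arrive for a key, it would keep working if the mapper emitted counts other than 1.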

Finally, let's take this map-reduce algorithm and run it on Hadoop.

### Hadoop and Map-Reduce

Now we are ready to try using Map and Reduce in conjunction with Hadoop. 

I am assuming that you are on a server running Hadoop. To check, type `jps` into the terminal; you should see something like the following:
```
$ jps
6068 NodeManager
5568 DataNode
5940 ResourceManager
5444 NameNode
6123 Jps
5785 SecondaryNameNode
```

If you do not, get your system administrator to figure out what is wrong.

Now we need some files to work with. Try the following:

```
$ echo "This is one sentence." > text1.txt
$ echo "This is another sentence." > text2.txt
```

and continue doing that until you have several text files.
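If you would rather not type the `echo` commands repeatedly, a short Python sketch can write the files for you. The function `write_sample_files` is hypothetical, and the sentences you pass in are whatever sample text you like.

```python
import os


def write_sample_files(sentences, directory=".", prefix="text"):
    """Write each sentence to its own file: text1.txt, text2.txt, ...

    Returns the list of paths that were written.
    """
    paths = []
    for index, sentence in enumerate(sentences, 1):
        path = os.path.join(directory, "%s%d.txt" % (prefix, index))
        with open(path, "w") as handle:
            handle.write(sentence + "\n")  # echo also ends the file with a newline
        paths.append(path)
    return paths
```

For example, `write_sample_files(["This is one sentence.", "This is another sentence."])` creates `text1.txt` and `text2.txt` in the current directory, with the same contents as the `echo` commands above.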

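Now we need to move these text files into the Hadoop Distributed File System (HDFS). (On Hadoop 2.x, `hadoop dfs` still works but prints a deprecation warning; `hdfs dfs` is the recommended equivalent.) First, we should make a folder: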

```
$ hadoop dfs -mkdir text
```
Then we need to copy the files into that folder:

```
$ hadoop dfs -put text*.txt text/
```

Now you should be able to check that they are all there by typing

```
$ hadoop dfs -ls text/
```

Now we need to make our mapper. Type

```
$ emacs word_count_mapper.py
```

and input the following lines:

```python
import sys

for line in sys.stdin:
    words = line.split()
    for word in words:
        print word
```

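In case you are wondering, Hadoop works with Python through Hadoop Streaming: Hadoop sends the input to your script on "standard input", one line at a time (if you don't know what that means, don't worry about it), Python reads it via `sys.stdin`, and whatever the script prints becomes its output. For the reducer, Hadoop first sorts the mapper output, so identical words arrive next to each other, which is exactly what the reducer from the previous section relies on.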

After you have finished typing that in, press Ctrl+X then Ctrl+S to save, and Ctrl+X then Ctrl+C to exit Emacs.

Now we need to make our reducer. Type

```
$ emacs word_count_reducer.py
```
and input the following lines:

```python
import sys

current_word = None
count = 0

for word in sys.stdin:
    word = word.strip()  # Remove the trailing newline and any surrounding whitespace

    if current_word is None:
        current_word = word

    if current_word == word:
        count += 1

    else:
        print current_word, count
        current_word = word
        count = 1

# Print the last word, unless the input was empty
if current_word is not None:
    print current_word, count
```
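Before submitting the job, it can help to check the mapper and reducer logic on your own machine. The sketch below is a stand-in for Hadoop Streaming, not how Hadoop itself runs the job: it passes lines through functions that mirror `word_count_mapper.py` and `word_count_reducer.py`, sorts the mapper output in between as Hadoop does, and collects the reducer's output lines. The function names are hypothetical.

```python
def run_mapper(lines):
    # Same logic as word_count_mapper.py: one output line per word
    output = []
    for line in lines:
        for word in line.split():
            output.append(word)
    return output


def run_reducer(lines):
    # Same logic as word_count_reducer.py, collecting lines instead of printing them
    output = []
    current_word = None
    count = 0
    for word in lines:
        word = word.strip()
        if current_word is None:
            current_word = word
        if current_word == word:
            count += 1
        else:
            output.append("%s %d" % (current_word, count))
            current_word = word
            count = 1
    if current_word is not None:
        output.append("%s %d" % (current_word, count))
    return output


def simulate_streaming_job(lines):
    # Hadoop sorts the mapper output by key before it reaches the reducer
    return run_reducer(sorted(run_mapper(lines)))


for line in simulate_streaming_job(["This is one sentence.", "This is another sentence."]):
    print(line)
```

For the two sample files this prints `This 2`, `another 1`, `is 2`, `one 1` and `sentence. 2`. The same check is often done directly in the shell with `cat text*.txt | python word_count_mapper.py | sort | python word_count_reducer.py`.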

Once that file is saved, we are ready to run the map-reduce job. Type the following into the terminal (the path and version number of the streaming jar depend on your installation, so adjust them if necessary):


```
$ hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar -file word_count_mapper.py -mapper "python word_count_mapper.py" -file word_count_reducer.py -reducer "python word_count_reducer.py" -input text/* -output text_out
```

If everything worked correctly, the last line of output should look something like

```
15/05/18 16:16:26 INFO streaming.StreamJob: Output directory: text_out
```
If not, you will have to comb through the output and try to figure out what went wrong.

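Note: Hadoop will not overwrite an existing output directory, so a common mistake is rerunning the job when a folder called text_out already exists in HDFS. In that case, either change the output folder to text_out2 or something similar, or delete the old folder with `hadoop dfs -rm -r text_out`.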

Finally, if it all ran correctly, we can check the output with the following command:

```
$ hadoop dfs -cat text_out/*
```

Hopefully, it printed what you expected.