# MapReduce

MapReduce is both a programming model and a framework for processing and generating large data sets with a parallel, distributed algorithm on a cluster.

In the MapReduce paradigm, the programmer defines the program logic as two functions:

- the **map function**, which transforms the input into key-value pairs to process
    - *(k1,v1) --> list[(k2,v2)]*
- the **reduce function**, which aggregates the list of values for each key
    - *(k2, list[v2]) --> list[(k3,v3)]*

A complex program can be decomposed as a succession of Map and Reduce tasks. 
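To make the two signatures concrete, here is a minimal single-process sketch of one map, shuffle and reduce round. The driver `run_mapreduce` and the maximum-temperature example are illustrative assumptions, not part of any real framework; a real cluster distributes each phase over many machines.

```python
from collections import defaultdict


def run_mapreduce(records, map_fn, reduce_fn):
    """Run one map -> shuffle -> reduce round in a single process (toy driver)."""
    # Map phase: each (k1, v1) yields zero or more (k2, v2) pairs.
    groups = defaultdict(list)
    for k1, v1 in records:
        for k2, v2 in map_fn(k1, v1):
            groups[k2].append(v2)  # shuffle: group the values by key
    # Reduce phase: each (k2, [v2, ...]) yields zero or more (k3, v3) pairs.
    output = []
    for k2 in sorted(groups):
        output.extend(reduce_fn(k2, groups[k2]))
    return output


def max_map(line_no, line):
    # (line number, "city,temperature") -> (city, temperature)
    city, temp = line.split(",")
    yield city, int(temp)


def max_reduce(city, temps):
    # (city, [temperatures]) -> (city, maximum temperature)
    yield city, max(temps)


readings = [(0, "rome,31"), (1, "oslo,18"), (2, "rome,35")]
result = run_mapreduce(readings, max_map, max_reduce)
print(result)  # [('oslo', 18), ('rome', 35)]
```

Because the output is again a list of key-value pairs, it can be fed to another call of `run_mapreduce`, which is how a succession of Map and Reduce tasks would be chained in this toy setting.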

![mapreduce_view.png](../images/mapreduce_view.png)

There are two more components in the MapReduce environment. The first are the **partitioners**, which assign intermediate keys to the reducers according to a *partition function*, generally a *hash*. The partition function should distribute the data as uniformly as possible for load-balancing purposes. The second are the **combiners**, which are essentially mini-reducers: they perform local aggregation on the mapper side before the shuffle and sort phase, which reduces network traffic.
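A hash partitioner can be sketched in a few lines. The function name `partition` and the choice of `zlib.crc32` are assumptions made for illustration; CRC32 is used here because Python's built-in `hash()` for strings is salted per process and would not give the same assignment on every machine.

```python
import zlib
from collections import Counter


def partition(key: str, num_reducers: int) -> int:
    """Pick the reducer index for a key using a stable hash."""
    return zlib.crc32(key.encode("utf-8")) % num_reducers


keys = ["apple", "banana", "cherry", "apple", "banana", "apple"]

# Every occurrence of the same key lands on the same reducer.
assignment = {key: partition(key, 3) for key in keys}

# Number of records each reducer receives; a good partition function
# keeps these numbers close to each other.
load = Counter(partition(key, 3) for key in keys)
print(assignment, dict(load))
```

Note that a uniform hash balances the number of *keys* per reducer, not necessarily the number of *records*: a very frequent key still ends up entirely on one reducer.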

### Example: WordCount problem

A typical example application is the **Word count problem**. Given some input data in the form of one or more documents, the objective is to count how many times each word appears. A possible solution is the following pseudocode:

![wordcount_pseudo1.png](../images/wordcount_pseudo1.png)

This code passes the pair (*word*, 1) to the reducer function for every word occurrence; after the shuffle phase, the reducer aggregates the values of each word by summing them.
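The same idea can be written as a small Python sketch. The names `wc_map` and `wc_reduce`, the whitespace tokenization and the in-memory shuffle are assumptions for illustration only.

```python
from collections import defaultdict


def wc_map(doc_id, text):
    # Emit (word, 1) for every word occurrence in the document.
    for word in text.lower().split():
        yield word, 1


def wc_reduce(word, counts):
    # Sum all the 1s received for this word.
    yield word, sum(counts)


docs = {"d1": "a rose is a rose", "d2": "a daisy"}

# Shuffle: group the emitted values by word.
shuffled = defaultdict(list)
emitted = 0
for doc_id, text in docs.items():
    for word, one in wc_map(doc_id, text):
        shuffled[word].append(one)
        emitted += 1

counts = dict(
    pair for word, ones in shuffled.items() for pair in wc_reduce(word, ones)
)
print(counts)   # {'a': 3, 'rose': 2, 'is': 1, 'daisy': 1}
print(emitted)  # 7 pairs go through the shuffle
```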

There is room for improvement in this code: an intermediate reduce at the document level is possible by using an associative array (HashMap or similar). The *emit* function, responsible for passing the data to the reducer, is called only after the associative array has been fully updated, which means that less data is shuffled through the network. The following pseudocode represents this improved version.

![wordcount_pseudo2](../images/wordcount_pseudo2.png)
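A minimal Python sketch of this per-document aggregation, assuming whitespace tokenization and using `collections.Counter` as the associative array (the name `wc_map_per_doc` is hypothetical):

```python
from collections import Counter


def wc_map_per_doc(doc_id, text):
    # Aggregate inside the document first ...
    counts = Counter(text.lower().split())
    # ... and emit one (word, count) pair per distinct word.
    for word, count in counts.items():
        yield word, count


pairs = list(wc_map_per_doc("d1", "a rose is a rose"))
print(pairs)  # [('a', 2), ('rose', 2), ('is', 1)]
```

For this document the mapper emits 3 pairs instead of 5; the reducer that sums the values per word does not need to change.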

If an associative array works within a single document, the same idea can also be applied *across* documents. The *map* method is called on one document at a time, so an *initialize* method and a *close* method can be used to keep the state (basically, the counts) accumulated by *map* across calls and to emit it once at the end. This is called **in-mapper combining**.

![wordcount_pseudo3](../images/wordcount_pseudo3.png)
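In Python, in-mapper combining could look like the sketch below. The class name and the explicit calls to `initialize`, `map` and `close` are assumptions that mimic what a framework would do for one mapper.

```python
from collections import Counter


class InMapperWordCount:
    def initialize(self):
        # State that survives across calls to map().
        self.counts = Counter()

    def map(self, doc_id, text):
        # Only update the state; nothing is emitted here.
        self.counts.update(text.lower().split())

    def close(self):
        # Emit once, after the last document has been processed.
        yield from self.counts.items()


mapper = InMapperWordCount()
mapper.initialize()
for doc_id, text in [("d1", "a rose is a rose"), ("d2", "a daisy")]:
    mapper.map(doc_id, text)
emitted = dict(mapper.close())
print(emitted)  # {'a': 3, 'rose': 2, 'is': 1, 'daisy': 1}
```

The price of this pattern is memory: the associative array has to hold every distinct key the mapper sees until `close` is called.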

If a single mapper processes all the documents, there is no need for a real *reduce* function, as every count has already been computed in the mapper+combiner; an identity reducer suffices. This does not hold when the documents are spread over more than one mapper or node: a reduce function is then needed to sum the partial results, and the one written previously works unchanged.
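The following sketch illustrates why the summing reducer is still needed with several nodes. The helper `mapper_output` and the two-node split are made up for the example.

```python
from collections import Counter, defaultdict


def mapper_output(documents):
    # What one node emits after aggregating all of its own documents.
    counts = Counter()
    for text in documents:
        counts.update(text.lower().split())
    return list(counts.items())


node_a = mapper_output(["a rose is a rose"])
node_b = mapper_output(["a daisy", "a rose"])

# Shuffle: the same word may arrive from both nodes.
shuffled = defaultdict(list)
for word, partial in node_a + node_b:
    shuffled[word].append(partial)

# Reduce: sum the partial counts of each word.
totals = {word: sum(partials) for word, partials in shuffled.items()}
print(totals)  # {'a': 4, 'rose': 3, 'is': 1, 'daisy': 1}
```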

### Example: Averaging

Averaging a series of values by key is an interesting problem, because the mean is not associative: the average of partial averages is, in general, not the overall average. This means that a mapper or a combiner cannot emit a local average, and the division has to be done in the *reduce* phase. However, the combiner can compute the local sum and the local count of the values, leaving only the final division to the reducer.

![averaging_pseudo1](../images/averaging_pseudo1.png)
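A Python sketch of this scheme follows; all function names and the two input splits are assumptions. The mapper emits `(value, 1)` so that the combiner's input and output have the same shape, which lets the reducer work whether or not the combiner ran.

```python
from collections import defaultdict


def group(pairs):
    # Toy shuffle: collect the values of each key in a list.
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return grouped


def avg_map(key, value):
    yield key, (value, 1)  # (partial sum, partial count)


def avg_combine(key, partials):
    partials = list(partials)
    yield key, (sum(s for s, _ in partials), sum(c for _, c in partials))


def avg_reduce(key, partials):
    partials = list(partials)
    total = sum(s for s, _ in partials)
    count = sum(c for _, c in partials)
    yield key, total / count  # the only division in the whole job


splits = [[("x", 1), ("x", 2), ("x", 3)], [("x", 10)]]

combined = []
for split in splits:  # one mapper + combiner per split
    mapped = [pair for k, v in split for pair in avg_map(k, v)]
    for key, partials in group(mapped).items():
        combined.extend(avg_combine(key, partials))

averages = dict(
    pair for key, partials in group(combined).items()
    for pair in avg_reduce(key, partials)
)
print(combined)  # [('x', (6, 3)), ('x', (10, 1))]
print(averages)  # {'x': 4.0}
```

Averaging the two local averages instead (2.0 and 10.0) would give 6.0, not the correct 4.0.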

Of course, the combiner can also be written in the *in-mapper* style, leaving the reducer unchanged:

![averaging_pseudo2](../images/averaging_pseudo2.png)
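A possible Python rendering of the in-mapper variant, with hypothetical names; the reducer is the usual one that sums the partial sums and counts and then divides.

```python
from collections import defaultdict


class InMapperAverage:
    def initialize(self):
        self.sums = defaultdict(float)
        self.counts = defaultdict(int)

    def map(self, key, value):
        # Accumulate locally instead of emitting.
        self.sums[key] += value
        self.counts[key] += 1

    def close(self):
        # Emit one (sum, count) pair per key at the end.
        for key in self.sums:
            yield key, (self.sums[key], self.counts[key])


def avg_reduce(key, partials):
    partials = list(partials)
    total = sum(s for s, _ in partials)
    count = sum(c for _, c in partials)
    yield key, total / count


mapper = InMapperAverage()
mapper.initialize()
for key, value in [("x", 1), ("y", 4), ("x", 3)]:
    mapper.map(key, value)
partials = dict(mapper.close())

averages = dict(
    pair for key, partial in partials.items()
    for pair in avg_reduce(key, [partial])
)
print(partials)  # {'x': (4.0, 2), 'y': (4.0, 1)}
print(averages)  # {'x': 2.0, 'y': 4.0}
```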

### Example: word co-occurrence matrices

Consider a document. We want to build a co-occurrence matrix, which is an *n × n* matrix (where *n* is the number of unique words, or vocabulary size) whose cells contain the number of times a word *w<sub>i</sub>* co-occurs with word *w<sub>j</sub>* within a specific context, such as a sentence, a paragraph, a document or a user-defined window of words. Two approaches are possible.

The **pairs** approach is just like the word count problem, but uses the pair (*w<sub>i</sub>*,*w<sub>j</sub>*) as the key.

![word_cooc_pairs.png](../images/word_cooc_pairs.png)
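As a sketch, the pairs approach might be written as follows in Python. The window-based context (`window=1`, meaning the immediate neighbours), the tokenization and the function names are assumptions chosen to keep the example small.

```python
from collections import defaultdict


def pairs_map(doc_id, text, window=1):
    words = text.lower().split()
    for i, word in enumerate(words):
        lo = max(0, i - window)
        hi = min(len(words), i + window + 1)
        for j in range(lo, hi):
            if j != i:
                # Key is the ordered pair (w_i, w_j), value is 1.
                yield (word, words[j]), 1


def pairs_reduce(pair, counts):
    yield pair, sum(counts)


docs = [("d1", "a b a"), ("d2", "b a")]

shuffled = defaultdict(list)
for doc_id, text in docs:
    for pair, one in pairs_map(doc_id, text):
        shuffled[pair].append(one)

cooc = dict(
    result for pair, ones in shuffled.items()
    for result in pairs_reduce(pair, ones)
)
print(cooc)  # {('a', 'b'): 3, ('b', 'a'): 3}
```

Each entry of `cooc` is one cell of the co-occurrence matrix.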

The **stripes** approach generates, for every word, an associative array that maps each word it co-occurs with to the corresponding count.

![word_cooc_stripes.png](../images/word_cooc_stripes.png)
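A comparable Python sketch of the stripes approach, under the same assumptions (context is a window of immediate neighbours, hypothetical function names). The reducer merges stripes by element-wise sum.

```python
from collections import Counter, defaultdict


def stripes_map(doc_id, text, window=1):
    words = text.lower().split()
    stripes = defaultdict(Counter)
    for i, word in enumerate(words):
        lo = max(0, i - window)
        hi = min(len(words), i + window + 1)
        for j in range(lo, hi):
            if j != i:
                stripes[word][words[j]] += 1
    # One (word, stripe) pair per distinct word in the document.
    yield from stripes.items()


def stripes_reduce(word, stripes):
    total = Counter()
    for stripe in stripes:
        total.update(stripe)  # Counter.update adds the counts
    yield word, total


docs = [("d1", "a b a"), ("d2", "b a")]

shuffled = defaultdict(list)
for doc_id, text in docs:
    for word, stripe in stripes_map(doc_id, text):
        shuffled[word].append(stripe)

matrix = dict(
    result for word, stripes in shuffled.items()
    for result in stripes_reduce(word, stripes)
)
print(matrix["a"]["b"], matrix["b"]["a"])  # 3 3
```

Each reduced stripe is one row of the co-occurrence matrix. Compared with pairs, stripes emit fewer but larger key-value pairs, and each stripe must fit in memory.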


## MapReduce Framework

![mr_framework.png](../images/mr_framework.png)

The MapReduce environment (or architecture) is responsible for distributing data and orchestrating processes. Among its responsibilities:

- scheduling: assigning workers to map and reduce tasks
- data distribution: moving processes to the data
- synchronization: gathering, sorting and shuffling intermediate data
- error and fault handling: detecting worker failures and restarting their tasks

Everything happens on top of a distributed filesystem.

The **Master node** (the jobtracker) is responsible for fault tolerance. It periodically checks the availability and reachability of the tasktrackers (heartbeats) and whether map or reduce tasks are making progress:

- if a mapper fails, its task is reassigned to another tasktracker
- if a reducer fails, its task is reassigned to another tasktracker; this may require re-running map tasks as well (to reproduce the intermediate groups) when their output is no longer available
- if the jobtracker fails, the whole job must be restarted
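The heartbeat mechanism can be illustrated with a toy model. The `Master` class, its method names and the timeout policy below are purely hypothetical and far simpler than a real jobtracker; time is passed in explicitly so the example is deterministic.

```python
class Master:
    """Toy master: tracks heartbeats and reassigns tasks of silent workers."""

    def __init__(self, timeout):
        self.timeout = timeout
        self.last_seen = {}   # worker -> time of its last heartbeat
        self.assignment = {}  # task -> worker currently running it

    def heartbeat(self, worker, now):
        self.last_seen[worker] = now

    def assign(self, task, worker):
        self.assignment[task] = worker

    def reassign_failed(self, now):
        # A worker is considered alive if it reported recently enough.
        alive = [w for w, t in self.last_seen.items() if now - t <= self.timeout]
        moved = []
        for task, worker in list(self.assignment.items()):
            if worker not in alive and alive:
                self.assignment[task] = alive[0]  # naive choice of new worker
                moved.append(task)
        return moved


master = Master(timeout=10)
master.heartbeat("w1", now=0)
master.heartbeat("w2", now=0)
master.assign("map-0", "w1")
master.assign("reduce-0", "w2")

master.heartbeat("w2", now=12)          # w1 stays silent
moved = master.reassign_failed(now=15)  # w1 has exceeded the timeout
print(moved, master.assignment)
# ['map-0'] {'map-0': 'w2', 'reduce-0': 'w2'}
```

The model has no answer for a failure of the master itself, which matches the last point of the list: the state kept in `Master` is lost and the job has to start again.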