# Introduction to MapReduce

<a rel="license" href="http://creativecommons.org/licenses/by-nc-sa/4.0/"><img alt="Creative Commons License" align="left" src="https://i.creativecommons.org/l/by-nc-sa/4.0/80x15.png" /></a>&nbsp;| Dennis G. Wilson | <a href="https://supaerodatascience.github.io/DE/">https://supaerodatascience.github.io/DE/</a>

In [None]:
using IJulia, Distributed, DataFrames, RDatasets, Statistics

The MapReduce programming model is the basis of many distributed frameworks, notably Apache Hadoop. It is inspired by the `map` and `reduce` functions found in many programming languages, such as Julia, which is the focus of this notebook. In this model, processing is split into two steps: map, which applies a function independently to each part of the dataset, and reduce, which aggregates the results of map.

The MapReduce library described by Google, and later frameworks such as Hadoop and Spark, formalize this programming model by specifying the inputs and outputs of the two functions and by automatically handling the distribution of data and computation. Specifically, the computation takes a set of input key/value pairs and produces a set of output key/value pairs. This common interface simplifies distribution and provides a shared set of operators.

In the MapReduce library, `map` takes an input pair and produces a set of intermediate key/value pairs. The library groups together all intermediate values associated with the same intermediate key `I` and passes them to the reduce function. The `reduce` function accepts an intermediate key `I` and the set of values for that key, and merges these values to form a possibly smaller set of values; typically, each reduce invocation produces zero or one output value. The intermediate values are supplied to the user's reduce function via an iterator, which makes it possible to handle lists of values that are too large to fit in memory.
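
To make the key/value model concrete, here is a minimal single-process sketch in plain Julia, using word counting as the example. The names `kv_mapper`, `kv_reducer`, and `run_mapreduce` are hypothetical; a real MapReduce library would also distribute the map and reduce calls and perform the grouping ("shuffle") step across machines.

```julia
# Hypothetical, single-process sketch of the key/value MapReduce model.

# map: one input record -> a list of intermediate (key, value) pairs
kv_mapper(line::AbstractString) = [(lowercase(word), 1) for word in split(line)]

# reduce: one intermediate key and all of its values -> one output value
kv_reducer(key, values) = sum(values)

function run_mapreduce(mapper, reducer, inputs)
    # group all intermediate values by key (the "shuffle" step)
    groups = Dict{Any, Vector{Any}}()
    for input in inputs, (key, value) in mapper(input)
        push!(get!(groups, key, Any[]), value)
    end
    # call the reducer once per intermediate key
    return Dict(key => reducer(key, values) for (key, values) in groups)
end

counts = run_mapreduce(kv_mapper, kv_reducer, ["the cat", "The dog saw the cat"])
# counts["the"] == 3, counts["cat"] == 2
```

Because each reducer call only sees the values for a single key, different keys could be reduced on different machines.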

In this notebook we'll focus on the basic MapReduce programming model, outside of its formal Spark or Google definition, to better understand this way of breaking up problems. The first example is the computation of $\sum_{i=1}^n i^3$. This simple example doesn't require map to return (key, value) pairs; single values from map are sufficient for the reduce function here.

We first define a map function which we will apply independently to each element of the input array.

In [None]:
function mapper(x)
    x^3
end

The reduce function in this case is addition since we're computing $\sum_{i=1}^n i^3$.

In [None]:
function reducer(x, y)
    x + y
end

In [None]:
n = 1:5

In [None]:
m = map(mapper, n)

A similar operator to `map` is `broadcast`, which Julia also exposes through dot syntax: `f.(x)` is equivalent to `broadcast(f, x)`.

In [None]:
mapper.(n)

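Be careful with the difference between these two operators, though: they behave differently when their arguments have different shapes. `broadcast` expands dimensions of length 1 (and missing trailing dimensions) so that all arguments match, whereas `map` applies the function to corresponding elements without any such expansion.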

In [None]:
map(+, [1,1], [1 2; 3 4])

In [None]:
broadcast(+, [1,1], [1 2; 3 4])
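
A quick way to see the difference: with arguments of the same shape, `map` and `broadcast` agree element by element, whereas with mismatched shapes `broadcast` expands the length-2 vector across each column of the matrix. The variable names below are only for illustration.

```julia
# Same shape: both operators pair up corresponding elements.
a = [1, 2]
b = [10, 20]
same_shape = map(+, a, b) == broadcast(+, a, b)   # true: both are [11, 22]

# Different shapes: broadcast repeats v along the second dimension of M.
v = [1, 1]
M = [1 2; 3 4]
expanded = broadcast(+, v, M)   # same as v .+ M; the result is [2 3; 4 5]
```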

Now that we have mapped `mapper` to each input, we can reduce the results.

In [None]:
reduce(reducer, m)

Julia provides a shortcut that combines both steps, `mapreduce`, which applies the mapper and reduces the results without building the intermediate array.

In [None]:
mapreduce(mapper, reducer, n)

We also could have used an anonymous function for the mapper and the standard definition of `+` for the reducer, given their simplicity.

In [None]:
mapreduce(x->x^3, +, n)
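
As a sanity check, the result can be compared with the closed form $\sum_{i=1}^n i^3 = \left(\frac{n(n+1)}{2}\right)^2$, which gives $15^2 = 225$ for $n = 5$. The `init` keyword of `mapreduce` supplies a neutral element for the reducer, which keeps the result well defined when the input is empty.

```julia
nmax = 5
total_cubes = mapreduce(x -> x^3, +, 1:nmax)
closed_form = (nmax * (nmax + 1) ÷ 2)^2   # (n(n+1)/2)^2
total_cubes == closed_form                 # true, both are 225

# 0 is the neutral element of +, so it is a safe `init` for an empty range
mapreduce(x -> x^3, +, 1:0; init = 0)      # 0
```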

## Case Study 1: Approximating π

Monte Carlo methods are easy to parallelize because they perform independent computations on random values. In this exercise, we'll calculate an approximation of π using a Monte Carlo method. The principle is simple: given a point $(x, y)$ with $x$ and $y$ in $[0, 1]$, we determine whether the point lies within the quadrant of the unit circle, i.e. whether $x^2 + y^2 \le 1$. By generating many such points uniformly in $[0, 1]^2$, the fraction of points that fall inside the quadrant approximates its area. The total area of the square is 1 and the area of the quadrant is $\frac{π}{4}$, so the number of points inside the quadrant, $q$, divided by the total number of points, $N$, approximates $\frac{π}{4}$. In other words, $π \approx 4\frac{q}{N}$.

<img src="Pi_30K.gif">
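
Before splitting the work into map and reduce steps, it may help to see the estimator itself as a plain sequential loop. This is a sketch rather than the exercise solution; `estimate_pi` is a hypothetical helper, and the fixed seed only makes the run reproducible.

```julia
using Random

# Sequential estimate of π ≈ 4q/N (not yet in MapReduce form).
# `estimate_pi` is a hypothetical helper name used only for illustration.
function estimate_pi(N; rng = Random.default_rng())
    q = 0                                  # points inside the quadrant
    for _ in 1:N
        x, y = rand(rng), rand(rng)        # uniform point in [0, 1]^2
        q += (x^2 + y^2 <= 1)              # a Bool adds as 0 or 1
    end
    return 4q / N
end

estimate_pi(100_000; rng = MersenneTwister(42))   # close to π
```

Each loop iteration is independent of the others, which is exactly what makes the computation easy to split into a map step and a reduce step.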

Define a map function and a reduce function. What is the purpose of each? You could take in a list of points either as a 2D array:

In [None]:
points = rand(2, 1000)

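Or as a list of coordinates, which is the form the cells below assume (iterating over a 2D array visits individual numbers rather than points):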

In [None]:
points = [rand(2) for i in 1:1000]

In [None]:
function mapper(x)
    # your code here
end

function reducer(x, y)
    # your code here
end

In [None]:
#IJulia.load("solutions/mapreduce_1.jl")

Once you've defined `mapper` and `reducer`, the following should approximate π.

In [None]:
mapreduce(mapper, reducer, points) / length(points) * 4

This works, but it requires us to know the number of points at the end and assumes that the number of points processed equals the length of the original input. One of the advantages of MapReduce is its flexibility with respect to changing workloads and data sources, which it gains by putting the necessary information directly into the map and reduce functions.

Rewrite your map and reduce functions to include the point count, so that the `mapreduce` call approximates π regardless of the length of `points`. Your final result should be a tuple $(q, N)$.
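
The same idea, carrying everything the reducer needs inside each mapped value, can be sketched on a different statistic so as not to spoil the exercise. Here each value is mapped to a `(min, max)` tuple; because the reducer is associative, chunks of any size can be reduced independently and their partial results merged with the same reducer. The names `mm_mapper` and `mm_reducer` are hypothetical.

```julia
# Hypothetical tuple-valued map/reduce tracking (minimum, maximum).
mm_mapper(x) = (x, x)
mm_reducer(a, b) = (min(a[1], b[1]), max(a[2], b[2]))

data = [3.0, -1.5, 7.25, 0.0, 2.0]
whole = mapreduce(mm_mapper, mm_reducer, data)   # (-1.5, 7.25)

# Reduce two uneven chunks independently, then merge with the same reducer.
part1 = mapreduce(mm_mapper, mm_reducer, data[1:2])
part2 = mapreduce(mm_mapper, mm_reducer, data[3:end])
merged = mm_reducer(part1, part2)                # equal to `whole`
```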

In [None]:
#IJulia.load("solutions/mapreduce_2.jl")

In [None]:
q, N = mapreduce(mapper, reducer, points)

In [None]:
q / N * 4

This representation makes it easy to combine results from different simulations, so workers with different loads can all contribute to the computation. To demonstrate this, we'll use Julia's parallel processing library `Distributed`. We can add two worker processes to the notebook's worker pool.

In [None]:
using Distributed
addprocs(2);

Using the `@distributed` macro, we can launch parallel computations. However, to do this, we need to define our mapper and reducer functions on all processes using the `@everywhere` macro.

In [None]:
@everywhere function mapper(x)
    # your code here
end

@everywhere function reducer(x, y)
    # your code here
end

`@distributed` accepts an optional reduction operator, which combines the values produced by the loop iterations: each worker reduces the results of its share of the iterations, and those partial results are then reduced on the calling process. Here we reuse our reducer to combine the results of independent `mapreduce` calls. This idea of chaining map or reduce functions is a common design in MapReduce.
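
A minimal example of a reducing `@distributed` loop, independent of the π exercise: the value of the loop body in each iteration is combined with `+`. It also runs if no workers were added, in which case the master process does all the work.

```julia
using Distributed

# Each iteration's value i^3 is combined with `+`: per worker first,
# then across workers on the calling process.
dist_total = @distributed (+) for i in 1:10
    i^3
end
# dist_total == 3025 == (10 * 11 ÷ 2)^2
```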

In [None]:
q, N = @distributed reducer for i in 1:10
    points = [rand(2) for j in 1:rand(100:5000)]
    mapreduce(mapper, reducer, points)
end

In [None]:
q / N * 4
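
Another option in `Distributed` is `pmap`, which maps a function over a collection using the worker pool and returns the results to the calling process, where they can be reduced. The sketch below maps over arbitrary, uneven chunks of `1:10`; it shows an alternative to `@distributed`, not a solution to the exercise.

```julia
using Distributed

chunks = [1:3, 4:7, 8:10]                          # uneven chunks
partials = pmap(c -> sum(x -> x^3, c), chunks)     # map step, on the workers
pmap_total = reduce(+, partials)                   # reduce step, locally
```

`pmap` suits cases where each chunk is expensive to process, whereas a reducing `@distributed` loop suits many cheap iterations.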

## Case Study 2: Average of a List

Now suppose we have a long list of floating-point values and want to compute their average. To speed this up, we can split the list and distribute the computation over several workers. Write map and reduce functions for this average calculation, making sure that they still work when the list isn't split evenly between workers.
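
To see why uneven splits matter, consider two chunks of different sizes. Averaging the per-chunk means gives each chunk the same weight regardless of how many samples it contains, so the result differs from the overall mean. The numbers are made up for illustration.

```julia
using Statistics

chunk_a = [1.0, 2.0, 3.0]   # 3 samples
chunk_b = [10.0]            # 1 sample

overall = mean(vcat(chunk_a, chunk_b))                # 16 / 4 = 4.0
mean_of_means = (mean(chunk_a) + mean(chunk_b)) / 2   # (2 + 10) / 2 = 6.0
```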

In [None]:
μ = 0.345
list = randn(100000) .+ μ
nchunks = 5   # not named `nworkers`, which is a function exported by Distributed
# random chunk boundaries: chunk i covers list[s[i]+1:s[i+1]]
s = [0]
append!(s, sort!(rand(1:length(list), nchunks-1)))
push!(s, length(list))

Define your map function `av_mapper` and reduce function `av_reducer` (with `@everywhere`, so that the worker processes can use them). How can you combine the results from different parts of the list?

In [None]:
#IJulia.load("solutions/mapreduce_3.jl")

In [None]:
final = @distributed av_reducer for i in 1:nchunks
    chunk = list[s[i]+1:s[i+1]]
    println("processing ", length(chunk), " samples")
    mapreduce(av_mapper, av_reducer, chunk)
end

You can double-check your results using the standard `mean` function.

In [None]:
mean(list)

## Case Study 3: Grep

In this example, we'll implement a command similar to the Unix `grep`, which searches a text for a specific pattern. This could be used to search through data logs, text documents, or full datasets. Unlike `grep`, which prints the matching lines, we'll only output how many times each search pattern appears in the document.

The text we'll use is Alice's Adventures in Wonderland, which you can download as a plain-text file.

In [None]:
run(`wget "http://www.umich.edu/~umfandsf/other/ebooks/alice30.txt"`)

In [None]:
lines = readlines("alice30.txt")

To match regular expressions in each line of text, we'll use Julia's `match` or `eachmatch` functions.

In [None]:
matches = eachmatch(r".l.ce", lines[40])

In [None]:
[(r.match, 1) for r in collect(matches)]
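
`match` returns only the first match, as a `RegexMatch` object, or `nothing` if the pattern does not occur, while `eachmatch` iterates over all non-overlapping matches. The sample sentence below is made up.

```julia
line = "Alice saw the Rabbit, and the Rabbit saw Alice."

hit = match(r"Alice", line)           # first match only
hit.match                             # "Alice"
hit.offset                            # 1, the index where the match starts
match(r"Queen", line) === nothing     # true: no match

# eachmatch finds every non-overlapping match
n_alice = length(collect(eachmatch(r"Alice", line)))   # 2
```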

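Start by searching for the three regular expressions below. Note that in `r".abbit"` the `.` matches any single character, so the pattern matches "rabbit" and "Rabbit", but also any other character followed by "abbit", including the first six letters of "rabbits".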
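
A short check of what `r".abbit"` actually matches, on a made-up string. Requiring word boundaries (`\b`) and a word character (`\w`) before "abbit" is one possible way to match whole words only; whether you want that depends on what you are counting.

```julia
text = "The Rabbit and the rabbits; a habbit?"

# `.` is any character, so the pattern also matches inside "rabbits"
loose = [m.match for m in eachmatch(r".abbit", text)]
# ["Rabbit", "rabbit", "habbit"]

# \b requires a word boundary on both sides, so "rabbits" no longer matches
strict = [m.match for m in eachmatch(r"\b\wabbit\b", text)]
# ["Rabbit", "habbit"]
```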

In [None]:
reg = [r"Alice", r".abbit", r"Queen"]

Write map and reduce functions to parse each line of the text and count the instances of the above regular expressions. You may find the [Hadoop version](https://cwiki.apache.org/confluence/display/HADOOP2/Grep) of grep helpful. Try getting it to work on a single thread before considering how to distribute the computation.

<div class="alert alert-info">
<b>Bonus exercise</b>
    
Adapt your MapReduce version of grep to find the distribution of word lengths in Alice's Adventures in Wonderland and plot it as a histogram.
</div>

## Split-Apply-Combine

The MapReduce programming model is an instance of a design that is very common in data processing tasks, whether or not they benefit from distributed computation. This design is known as ["split-apply-combine"](http://www.jstatsoft.org/v40/i01) and consists of splitting a dataset into groups, applying some functions to each group, and then combining the results. We'll see an example of this on the `iris` dataset.
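
Before turning to DataFrames, the three steps can be sketched by hand in plain Julia on a few made-up values: group the values by a key (split), compute a statistic for each group (apply), and collect the per-group results into one table (combine). The variable names are illustrative.

```julia
using Statistics

species = ["setosa", "virginica", "setosa", "virginica", "setosa"]
petal   = [1.4, 5.5, 1.3, 6.0, 1.5]   # made-up petal lengths

# split: gather the values belonging to each key
petal_groups = Dict{String, Vector{Float64}}()
for (sp, x) in zip(species, petal)
    push!(get!(petal_groups, sp, Float64[]), x)
end

# apply + combine: one mean per group, collected into a Dict
group_means = Dict(sp => mean(xs) for (sp, xs) in petal_groups)
# group_means["virginica"] == 5.75; group_means["setosa"] ≈ 1.4
```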

In [None]:
iris = dataset("datasets", "iris");

In [None]:
first(iris, 6)   # `first(df, n)` replaces the older `head(df)`

The first part of this paradigm is to split the data, often based on the task at hand: if you're studying a specific feature, for example, you might split the data on that feature. However, as in most MapReduce applications, splitting can also serve to distribute the data and improve performance. Here, we'll split the dataset by `Species` to compute statistics for each species.

In [None]:
gdf = groupby(iris, :Species)

In [None]:
combine(gdf, :PetalLength => mean)

In [None]:
combine(gdf) do df
    (m = mean(df.PetalLength), s² = var(df.PetalLength), min = minimum(df.PetalLength), max = maximum(df.PetalLength))
end

<div class="alert alert-info">
    <b>Discussion</b>
    
Consider the above examples. First, identify how the dataset was split, which functions were applied, and how the results were combined. Given this separation, what is the equivalent of the `map` function? What is the equivalent of `reduce`?
</div>

For further reading, this [blog post](https://towardsdatascience.com/how-to-use-the-split-apply-combine-strategy-in-pandas-groupby-29e0eb44b62e) gives a good overview of applying this framework in pandas through `groupby` and `apply`.