# Chapter 24. MapReduce

Note that MapReduce is designed to be used on very, very large datasets, so the examples here don't really show the best use cases or its true potential.

In [443]:
from __future__ import division
import math, random, re, datetime
from collections import defaultdict, Counter
from functools import partial
from naive_bayes import tokenize

[MapReduce](https://en.wikipedia.org/wiki/MapReduce) is a programming model for performing parallel processing on large data sets.  
Imagine that we have a (very large) collection that we would like to process in some way.  
For example, the items might be website logs, the texts of various books, image files, or anything else.  
A basic version of the MapReduce algorithm consists of the following steps:
1. Use a `mapper` function to turn each item into zero or more key-value pairs. This is often called the `map()` function, but Python already has a function called `map()`, and we don't want to confuse the two.
2. Collect together all of the pairs with identical keys.
3. Use a `reducer()` function on each collection of grouped values to produce output values for the corresponding key.  

Let's examine this process more concretely using an example that involves counting words.

## Example: Word Count

DataSciencester has grown to millions of users!  
This is great for your job security, but it makes routine analyses slightly more difficult.  
For example, your VP of Content wants to know what sorts of things people are talking about in their status updates.  
As a first attempt, you decide to count the words that appear, so that you can prepare a report on the most frequent ones.  
When you only had a few hundred users this was simple to do:

In [444]:
def word_count_old(documents):
    """ word count not using MapReduce """
    return Counter(word
                   for document in documents
                   for word in tokenize(document))

In [445]:
documents = ["data science", "big data", "science fiction"]
word_count_old(documents)

Counter({'big': 1, 'data': 2, 'fiction': 1, 'science': 2})

With millions of users the set of `documents` (status updates) is now too big to fit on your computer.  
If you can just fit this into the MapReduce model, you can use some "big data" infrastructure that your engineers have implemented.  
First, we need a function that turns a document into a sequence of key-value pairs.  
We'll want our output to be grouped by word, which means that the keys should be words.  
For each word, we'll use the value 1 to indicate that this pair corresponds to one occurrence of the word:

In [446]:
def wc_mapper(document):
    """ for each word in the document, return (word,1) """
    for word in tokenize(document):
        yield (word, 1)
# https://stackoverflow.com/questions/231767/what-does-the-yield-keyword-do-in-python
wc_mapper_results = [result
                     for document in documents
                     for result in wc_mapper(document)]

wc_mapper_results

[('science', 1),
 ('data', 1),
 ('big', 1),
 ('data', 1),
 ('science', 1),
 ('fiction', 1)]

Skipping the "plumbing" step 2 (collect together all the pairs with identical keys) for the moment, imagine that for some word we've collected a list of the corresponding counts we have emitted.  
Then to produce the overall count for that word we can use:

In [447]:
def wc_reducer(word, counts):
    """ sum up the counts for a word """
    yield (word, sum(counts))

Now returning to step 2, we need to collect the results from `wc_mapper()` and feed them to `wc_reducer()`.  
Let's think about how we can do this on just one computer:

In [448]:
def word_count(documents):
    """ count the words in the input documents using MapReduce """
    # create a place to store the grouped values
    collector = defaultdict(list)
    
    for document in documents:
        for word, count in wc_mapper(document):
            collector[word].append(count)
       
    return [output
            for word, counts in collector.items()
            for output in wc_reducer(word, counts)]

Now let's take our three documents:

In [449]:
documents = ["data science", "big data", "science fiction"]

Then `wc_mapper` applied to the first document yields the two pairs `("data", 1)` and `("science", 1)`.  
After we've gone through all three documents, the `collector` contains `{"data": [1, 1], "science": [1, 1], "big": [1], "fiction": [1]}` (in no particular key order).

Then `wc_reducer()` produces the count for each word:

In [450]:
word_count(documents)

[('science', 2), ('fiction', 1), ('data', 2), ('big', 1)]

## Why MapReduce?

As mentioned earlier, the primary benefit of MapReduce is that it allows us to distribute computations by moving the processing to the data.  
Imagine that we want to word-count across billions of documents.  
Our original (non-MapReduce) approach requires the machine doing the processing to have access to every document.  
This means that the documents all need to either live on that machine or else be transferred to it during processing.  
More important, it means that the machine can only process one document at a time.  
Granted, you can write code to use multiple processors, multiple cores on the processor, and so on, but the documents still have to get to that machine.  
Now, imagine that our billions of documents are scattered across 100 machines.  
With the right infrastructure (and glossing over some of the details), we can do the following:  
- Have each machine run the mapper on its documents, producing lots of (key, value) pairs.
- Distribute those (key, value) pairs to a number of "reducing" machines, making sure that the pairs corresponding to any given key all end up on the same machine.
- Have each reducing machine group the pairs by key and then run the reducer on each set of values.
- Return each (key, output) pair. 

What is amazing about this is that it scales horizontally.  
If we double the number of machines, then (ignoring certain fixed costs of running a MapReduce system) our calculations will be executed approximately twice as fast.  
Each mapper machine will only need to do half as much work, and (assuming that there are enough distinct keys to further distribute the reducer work) the same is true for the reducer machines.
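
The four steps above can be simulated in a single process. This is only a rough sketch, not how real MapReduce infrastructure works: `simulate_cluster`, `split_mapper`, and `sum_counts_reducer` are hypothetical names, each shard stands in for the documents on one mapper machine, and hashing the key is just one common way to make sure all pairs for a key reach the same reducer.

```python
from collections import defaultdict

def split_mapper(document):
    """Hypothetical stand-in for wc_mapper: splits on whitespace instead of calling tokenize."""
    for word in document.lower().split():
        yield (word, 1)

def sum_counts_reducer(word, counts):
    yield (word, sum(counts))

def simulate_cluster(shards, mapper, reducer, num_reducers=2):
    """Simulates mapper and reducer machines in one process.
    Each shard stands in for the documents stored on one mapper machine."""
    # one inbox per reducing machine, mapping key -> list of values
    inboxes = [defaultdict(list) for _ in range(num_reducers)]

    for shard in shards:
        for item in shard:
            for key, value in mapper(item):
                # hashing the key sends every pair with that key to the same reducer
                inboxes[hash(key) % num_reducers][key].append(value)

    # each reducing machine reduces only the keys it received
    return [output
            for inbox in inboxes
            for key, values in inbox.items()
            for output in reducer(key, values)]

shards = [["data science", "big data"], ["science fiction"]]
cluster_counts = sorted(simulate_cluster(shards, split_mapper, sum_counts_reducer))
print(cluster_counts)
```

Sorting the output makes it independent of which reducer happened to receive which key.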

## MapReduce More Generally

If you think about it for a minute, all of the word-count-specific code in the previous example is contained in the `wc_mapper()` and `wc_reducer()` functions.  
This means that with a couple of changes we have a much more general framework (that still runs on a single machine):

In [451]:
def map_reduce(inputs, mapper, reducer):
    """ runs MapReduce on the inputs using mapper and reducer """
    
    collector = defaultdict(list)
    
    for input in inputs:
        for key, value in mapper(input):
            collector[key].append(value)
            
    return [output
            for key, values in collector.items()
            for output in reducer(key, values)]

Now we can count words simply by using:

In [452]:
word_counts = map_reduce(documents, wc_mapper, wc_reducer)
word_counts

[('science', 2), ('fiction', 1), ('data', 2), ('big', 1)]

This gives us the flexibility to solve a wide variety of problems.

Before we proceed, observe that `wc_reducer()` is just summing the values corresponding to each key.  
This kind of aggregation is common enough that it's worth abstracting it out:

In [453]:
def reduce_values_using(aggregation_fn, key, values):
    """ reduces a key-values pair by applying aggregation_fn to the values """
    yield (key, aggregation_fn(values))
    
def values_reducer(aggregation_fn):
    """ turns a function (values -> output) into a reducer that maps (key, values) -> (key, output) """
    return partial(reduce_values_using, aggregation_fn)

Now we can create:

In [454]:
# the following variables each create <functools.partial object at 0xxxxxxxxxx>
sum_reducer = values_reducer(sum)
max_reducer = values_reducer(max)
min_reducer = values_reducer(min)
count_distinct_reducer = values_reducer(lambda values: len(set(values)))

and so on.
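
To see what these reducers do, here is a small self-contained sketch that repeats the two helper definitions and calls the resulting reducers directly. The key `"data"` and the value lists are made up for illustration.

```python
from functools import partial

def reduce_values_using(aggregation_fn, key, values):
    yield (key, aggregation_fn(values))

def values_reducer(aggregation_fn):
    return partial(reduce_values_using, aggregation_fn)

sum_reducer = values_reducer(sum)
max_reducer = values_reducer(max)
count_distinct_reducer = values_reducer(lambda values: len(set(values)))

# each reducer is a generator function, so wrap the call in list() to see its output
sum_result = list(sum_reducer("data", [1, 1, 1]))
max_result = list(max_reducer("data", [3, 7, 5]))
distinct_result = list(count_distinct_reducer("data", ["a", "b", "a"]))
print(sum_result, max_result, distinct_result)
```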

## Example: Analyzing Status Updates

The content VP was impressed with the word counts and asks what else you can learn from people's status updates.  
You manage to extract a data set of status updates that look like:

In [455]:
status_updates = [
    { "id": 1,
      "username": "bgroveben",
      "text": "Yo, imma be geekin my brainz out, howboudah?",
      "created_at": datetime.datetime(2017, 2, 15, 3, 30, 0),
      "liked_by": ["some_guy", "some_gal", "cousin_tavo"] },
    { "id": 2,
      "username": "cousin_tavo",
      "text": "Yo, user bgroveben has very little respect for data science or grammar!",
      "created_at": datetime.datetime(2017, 2, 16, 4, 30, 0),
      "liked_by": ["some_guy", "some_gal", "bgroveben"] },
    { "id": 3,
      "username": "some_gal",
      "text": "Yo, stop worrying about petty things and focus on the data science stuff, people.",
      "created_at": datetime.datetime(2017, 2, 17, 5, 15, 0),
      "liked_by": ["some_guy", "bgroveben", "cousin_tavo"] },
    { "id": 4,
      "username": "some_guy",
      "text": "Big data, big data, data science, data science, data, data.",
      "created_at": datetime.datetime(2017, 2, 17, 10, 45, 0),
      "liked_by": ["some_gal", "bgroveben", "cousin_tavo"] }
]

Let's say that we need to figure out which day of the week people talk the most about data science.  
In order to find this, we'll just count how many data science updates there are on each day of the week.  
This means that we'll need to group our data by day of the week, so that's our key.  
If we emit a value of 1 for each update that contains the phrase "data science", we can simply get the total number using `sum`:

In [456]:
def data_science_day_mapper(status_update):
    """ yields (day_of_week, 1) if status_update contains 'data science' """
    if "data science" in status_update["text"].lower():
        day_of_week = status_update["created_at"].weekday()
        yield (day_of_week, 1)  # day_of_week is a number
        
data_science_days = map_reduce(status_updates,
                               data_science_day_mapper,
                               sum_reducer)
data_science_days

[(3, 1), (4, 2)]
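
The keys in this output are weekday numbers, which `datetime.weekday()` counts from Monday = 0 through Sunday = 6. As a small sketch, a hypothetical helper `with_day_names` (not part of the original notebook) can turn them into names:

```python
import datetime

# datetime.weekday() numbers the days from Monday = 0 through Sunday = 6
DAY_NAMES = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]

def with_day_names(day_counts):
    """Hypothetical helper: replaces each weekday number with its name."""
    return [(DAY_NAMES[day], count) for day, count in day_counts]

named_days = with_day_names([(3, 1), (4, 2)])
print(named_days)

# the status update created on 2017-02-16 falls on weekday number 3
check_day = DAY_NAMES[datetime.datetime(2017, 2, 16, 4, 30, 0).weekday()]
print(check_day)
```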

As a slightly more complicated example, imagine that we need to find out for each user the most common word that she uses in her status updates.  
There are three possible approaches that spring to mind for the `mapper`:
- Put the username in the key; put the words and counts in the values. 
- Put the word in the key; put the usernames and counts in the values.
- Put the username and word in the key; put the counts in the values.

If you think about it a bit more, we definitely want to group by `username`, because we want to consider each person's words separately.  
We don't want to group by `word`, since our reducer will need to see all of the words for each person to find out which one is the most popular.  
This means that we're going to go with the first option:

In [457]:
def words_per_user_mapper(status_update):
    user = status_update["username"]
    for word in tokenize(status_update["text"]):
        yield (user, (word, 1))
        
def most_popular_word_reducer(user, words_and_counts):
    """ given a sequence of (word, count) pairs, return the word with the highest total count """
    word_counts = Counter()
    for word, count in words_and_counts:
        word_counts[word] += count

    word, count = word_counts.most_common(1)[0]

    yield (user, (word, count))


user_words = map_reduce(status_updates,
                        words_per_user_mapper,
                        most_popular_word_reducer)
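# user_words
# NOTE: the counts in user_words come out lower than expected. Emitting (word, 1) per token is fine,
# since the reducer sums them; the likely cause is that tokenize() returns a set of distinct words,
# so a word repeated within one status update is only emitted once.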
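
A minimal sketch of one way to get per-user counts that include repeats, assuming the unexpected results come from `tokenize` returning only the distinct words of a text. The tokenizer `tokenize_keep_duplicates` is hypothetical, and `map_reduce` is repeated here (using `.items()`) so that the block runs on its own:

```python
import re
from collections import Counter, defaultdict

def tokenize_keep_duplicates(text):
    """Hypothetical tokenizer: returns every word in order, repeats included."""
    return re.findall("[a-z0-9']+", text.lower())

def words_per_user_mapper(status_update):
    user = status_update["username"]
    for word in tokenize_keep_duplicates(status_update["text"]):
        yield (user, (word, 1))

def most_popular_word_reducer(user, words_and_counts):
    word_counts = Counter()
    for word, count in words_and_counts:
        word_counts[word] += count
    yield (user, word_counts.most_common(1)[0])

def map_reduce(inputs, mapper, reducer):
    collector = defaultdict(list)
    for item in inputs:
        for key, value in mapper(item):
            collector[key].append(value)
    return [output
            for key, values in collector.items()
            for output in reducer(key, values)]

updates = [{"username": "some_guy",
            "text": "Big data, big data, data science, data science, data, data."}]
top_words = map_reduce(updates, words_per_user_mapper, most_popular_word_reducer)
print(top_words)
```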

Or we could find out the number of distinct status-likers for each user:

In [458]:
def liker_mapper(status_update):
    user = status_update["username"]
    for liker in status_update["liked_by"]:
        yield (user, liker)
        
distinct_likers_per_user = map_reduce(status_updates,
                                      liker_mapper,
                                      count_distinct_reducer)
distinct_likers_per_user

[('some_guy', 3), ('some_gal', 3), ('bgroveben', 3), ('cousin_tavo', 3)]

## Example: Matrix Multiplication

Recall from [Matrix Multiplication](http://localhost:8888/notebooks/21_Chapter_Network_Analysis.ipynb#Matrix-Multiplication) that given an $m \times n$ matrix $A$ and an $n \times k$ matrix $B$, we can multiply them to form an $m \times k$ matrix $C$, where the element of $C$ in row $i$ and column $j$ is given by:  

$\Large C_{ij} = A_{i1}B_{1j} + A_{i2}B_{2j} + \ldots + A_{in}B_{nj}$

As we've seen, a "natural" way to represent an $m \times n$ matrix is with a `list` of `lists`, where the element $A_{ij}$ is the $j$th element of the $i$th list.  
However, large matrices are sometimes [sparse](https://en.wikipedia.org/wiki/Sparse_matrix), which means that most of their elements are zero.  
For large sparse matrices, a list of lists can be a very wasteful representation.  
A more compact representation is a list of tuples (`name`, `i`, `j`, `value`) where `name` identifies the matrix and `i`, `j`, and `value` indicate a location with a nonzero value.  
For example, a billion x billion matrix has a [quintillion](https://en.wiktionary.org/wiki/quintillion) entries, which is not easy to store on a computer.  
If there are only a few nonzero entries on each row, however, this alternative representation is many orders of magnitude smaller.  
Given this sort of tuple representation, it turns out that we can use MapReduce to perform matrix multiplication in a distributed manner.
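
As a small sketch of the tuple representation, a hypothetical helper `to_sparse_entries` (not part of the original notebook) can list the nonzero entries of a list-of-lists matrix:

```python
def to_sparse_entries(name, matrix):
    """Hypothetical helper: lists the nonzero entries of a list-of-lists matrix
    as (name, i, j, value) tuples."""
    return [(name, i, j, value)
            for i, row in enumerate(matrix)
            for j, value in enumerate(row)
            if value != 0]

dense = [[3, 2, 0],
         [0, 0, 0]]
sparse_entries = to_sparse_entries("A", dense)
print(sparse_entries)
```

Here six stored values shrink to two tuples; the savings only become meaningful when the matrix is large and mostly zeros.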

To motivate our algorithm, notice that each element $A_{ij}$ is only used to calculate the elements of $C$ in row $i$, and each element $B_{ij}$ is only used to compute the elements of $C$ in column $j$.  
Our goal will be for each output of our `reducer` to be a single entry of $C$, which means that we'll need our mapper to emit keys identifying a single entry of $C$.
This suggests the following:

In [459]:
def matrix_multiply_mapper(m, element):
    """ m is the loop bound for the rows and columns of C that each element is sent to,
        so it must be at least max(rows of A, columns of B);
        element is a tuple (matrix_name, i, j, value) """
    matrix, i, j, value = element

    if matrix == "A":
        for column in range(m):
            # A_ij is the j-th entry in the sum for each C_i_column
            yield((i, column), (j, value))
    else:
        for row in range(m):
            # B_ij is the i-th entry in the sum for each C_row_j
            yield((row, j), (i, value))


def matrix_multiply_reducer(m, key, indexed_values):
    results_by_index = defaultdict(list)
    for index, value in indexed_values:
        results_by_index[index].append(value)
    # sum up all of the products of the positions with two results
    sum_product = sum(results[0] * results[1]
                      for results in results_by_index.values()
                      if len(results) == 2)
    if sum_product != 0.0:
        yield (key, sum_product)

For example, if you had the two matrices:

In [460]:
A = [[3, 2, 0],
     [0, 0, 0]]

B = [[4, -1, 0],
     [10, 0, 0],
     [0,  0, 0]]

you could rewrite them as tuples:

In [461]:
entries = [("A", 0, 0, 3), ("A", 0, 1,  2),
           ("B", 0, 0, 4), ("B", 0, 1, -1), ("B", 1, 0, 10)]

mapper = partial(matrix_multiply_mapper, 3)
reducer = partial(matrix_multiply_reducer, 3)

map_reduce(entries, mapper, reducer)

[((0, 1), -3), ((0, 0), 32)]
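
As a check on this output, the formula for $C_{ij}$ can be applied by hand to the first row of $A$, using the zero-based indices of the tuples rather than the one-based indices of the formula. The second row of $A$ is all zeros, so it contributes no nonzero entries:

$\Large C_{00} = 3 \cdot 4 + 2 \cdot 10 + 0 \cdot 0 = 32$

$\Large C_{01} = 3 \cdot (-1) + 2 \cdot 0 + 0 \cdot 0 = -3$

$\Large C_{02} = 3 \cdot 0 + 2 \cdot 0 + 0 \cdot 0 = 0$

The reducer only yields nonzero sums, which is why $C_{02}$ does not appear in the output.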

This isn't terribly interesting on such small matrices, but if you are dealing with millions (or more) of rows and columns, it could help you a lot.

## An Aside: Combiners

One thing that you have probably noticed is that many of our mappers seem to include a bunch of extra information.  
For example, when counting words, rather than emitting `(word, 1)` and summing over the values, we could have emitted `(word, None)` and just taken the length.  
One reason we didn't do this is that, in the distributed setting, we sometimes want to use [combiners](https://www.tutorialspoint.com/map_reduce/map_reduce_combiners.htm) to reduce the amount of data that has to be transferred around from machine to machine.  
If one of our mapper machines sees the word "data" 500 times, we can tell it to combine the 500 instances of `("data", 1)` into a single `("data", 500)` before handing the data off to a reducing machine.  
This results in a lot less data getting moved around, which can make our algorithm substantially faster still.  
Because of the way we wrote our reducer, it would handle this combined data correctly.  
Had we written it using `len()`, it would not have.
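
The difference can be sketched on a single machine. The combiner `combine_counts` and the two reducers below are hypothetical illustrations, not part of any particular MapReduce framework:

```python
from collections import Counter

def combine_counts(pairs):
    """Hypothetical combiner: merges the (word, count) pairs produced on one
    mapper machine into one pair per word."""
    totals = Counter()
    for word, count in pairs:
        totals[word] += count
    return list(totals.items())

def sum_based_reducer(word, counts):
    yield (word, sum(counts))

def len_based_reducer(word, counts):
    yield (word, len(counts))

# one mapper machine saw "data" 500 times; another saw it 3 times
machine_1 = combine_counts([("data", 1)] * 500)   # [("data", 500)]
machine_2 = combine_counts([("data", 1)] * 3)     # [("data", 3)]
combined_values = [count for _, count in machine_1 + machine_2]  # [500, 3]

sum_output = list(sum_based_reducer("data", combined_values))
len_output = list(len_based_reducer("data", combined_values))
print(sum_output, len_output)
```

The sum-based reducer still reports 503 occurrences, while the length-based one sees only two values and reports 2.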

## For Further Exploration