# Map-reduce

Chapter 24 of [Data Science from Scratch](http://shop.oreilly.com/product/0636920033400.do). Joel's code: [mapreduce.py](https://github.com/joelgrus/data-science-from-scratch/blob/master/code-python3/mapreduce.py)

In [10]:
import datetime, io, math, random, re
from collections import defaultdict, Counter
from functools import partial

In [11]:
def tokenize(message):
    message = message.lower()
    all_words = re.findall("[a-z0-9']+", message)
    return set(all_words)

In [12]:
def map_reduce(inputs, mapper, reducer):
    """runs MapReduce on the inputs using mapper and reducer"""
    collector = defaultdict(list)

    for input in inputs:
        for key, value in mapper(input):
            collector[key].append(value)

    return [output
            for key, values in collector.items()
                for output in reducer(key,values)]

In [13]:
def wc_mapper(document):
    """for each word in the document, emit (word,1)"""
    for word in tokenize(document):
        yield (word, 1)

In [14]:
def wc_reducer(word, counts):
    """sum up the counts for a word"""
    yield (word, sum(counts))

### More generally...

We can abstract the general pattern of wc_reducer with these functions:

In [150]:
def reduce_values_using(aggregation_fn, key, values):
    """reduces a key-values pair by applying aggregation_fn to the values"""
    yield (key, aggregation_fn(values))

In [151]:
def values_reducer(aggregation_fn):
    """turns a function (values -> output) into a reducer that
    maps (key, values) -> (key, output)"""
    return partial(reduce_values_using, aggregation_fn)

## Shakespeare word-count

A little googling led me to the fine folks at MIT who've provided the [complete works of Shakespeare](https://ocw.mit.edu/ans7870/6/6.006/s08/lecturenotes/files/t8.shakespeare.txt) as a 5.2MB text file. The Sonnets make a nice set of short documents to play around with. Here's a bit of text-hackery to extract them as a 154 element list.

In [154]:
begin_sonnets = 'THE SONNETS'
end_sonnets = 'THE END'

In [155]:
reading_sonnets = False
buffer = io.StringIO()
with open('../data/t8.shakespeare.txt') as f:
    for line in f:
        if reading_sonnets:
            if line.strip() == end_sonnets:
                break
            buffer.write(line)
        if line.strip() == begin_sonnets:
            ## throw away a few lines
            for i in range(6): next(f)
            reading_sonnets = True

In [156]:
sonnets = re.split('\n\n\s*\d+\s*\n', buffer.getvalue().rstrip())

In [157]:
len(sonnets)

154

Well over 40 winters have besieged my brow, so I kinda like this one.

In [160]:
print(sonnets[1])

  When forty winters shall besiege thy brow,
  And dig deep trenches in thy beauty's field,
  Thy youth's proud livery so gazed on now,
  Will be a tattered weed of small worth held:  
  Then being asked, where all thy beauty lies,
  Where all the treasure of thy lusty days;
  To say within thine own deep sunken eyes,
  Were an all-eating shame, and thriftless praise.
  How much more praise deserved thy beauty's use,
  If thou couldst answer 'This fair child of mine
  Shall sum my count, and make my old excuse'
  Proving his beauty by succession thine.
    This were to be new made when thou art old,
    And see thy blood warm when thou feel'st it cold.


In [161]:
wc = map_reduce(sonnets, wc_mapper, values_reducer(sum))

In [162]:
sorted(wc, key= lambda pair: pair[1], reverse=True)[:20]

[('and', 148),
 ('to', 146),
 ('in', 140),
 ('the', 134),
 ('of', 134),
 ('that', 131),
 ('my', 117),
 ('i', 112),
 ('but', 110),
 ('with', 98),
 ('for', 98),
 ('a', 97),
 ('not', 97),
 ('is', 95),
 ('so', 86),
 ('love', 86),
 ('thy', 82),
 ('thou', 82),
 ('me', 82),
 ('be', 79)]

It's nice that the first non-stop-word is 'love'. Remember to pursue distributed computing with love.

## Sparse matrix multiplication

In [173]:
def matrix_multiply_mapper(m, element):
    """m is the common dimension (columns of A, rows of B)
    element is a tuple (matrix_name, i, j, value)"""
    matrix, i, j, value = element

    if matrix == "A":
        for column in range(m):
            # A_ij is the jth entry in the sum for each C_i_column
            yield((i, column), (j, value))
    else:
        for row in range(m):
            # B_ij is the ith entry in the sum for each C_row_j
            yield((row, j), (i, value))

In [174]:
def matrix_multiply_reducer(m, key, indexed_values):
    results_by_index = defaultdict(list)
    for index, value in indexed_values:
        results_by_index[index].append(value)

    # sum up all the products of the positions with two results
    sum_product = sum(results[0] * results[1]
                      for results in results_by_index.values()
                      if len(results) == 2)

    if sum_product != 0.0:
        yield (key, sum_product)

In [175]:
entries = [("A", 0, 0, 3), ("A", 0, 1,  2),
       ("B", 0, 0, 4), ("B", 0, 1, -1), ("B", 1, 0, 10)]

In [176]:
mapper = partial(matrix_multiply_mapper, 3)
reducer = partial(matrix_multiply_reducer, 3)

In [177]:
print("map-reduce matrix multiplication")
print("entries:", entries)
print("result:", map_reduce(entries, mapper, reducer))

map-reduce matrix multiplication
entries: [('A', 0, 0, 3), ('A', 0, 1, 2), ('B', 0, 0, 4), ('B', 0, 1, -1), ('B', 1, 0, 10)]
result: [((0, 0), 32), ((0, 1), -3)]
