### Intro to MapReduce algorithm
* Use a `mapper` function to turn each item into zero or more key-value pairs 
* Collect together all the pairs with identical keys
* Use a `reducer` function on each collection of grouped values to produce output values for the corresponding key

In [None]:
from nltk.tokenize import word_tokenize
from collections import defaultdict
from functools import partial
from collections import Counter

In [2]:
# example: count words in document

def wc_mapper(document):
    """Emit a ``(token, 1)`` pair for every token nltk finds in *document*."""
    tokens = word_tokenize(document)
    yield from ((token, 1) for token in tokens)

def wc_reducer(word, counts):
    """Reduce all partial counts for *word* to a single ``(word, total)`` pair.

    This is a generator (a reducer may emit zero or more outputs), so callers
    iterate over its result rather than using it as a value.
    """
    yield (word, sum(counts))

In [4]:
# Collect the pairs emitted by wc_mapper, group them by word, and feed each group to wc_reducer.

def word_count(documents):
    """Count token occurrences across *documents* with wc_mapper/wc_reducer.

    Every pair emitted by ``wc_mapper`` is grouped by its word, each group is
    passed to ``wc_reducer``, and all reducer outputs are flattened into one
    list, ordered by the first appearance of each word.
    """
    grouped = defaultdict(list)
    for doc in documents:
        for token, one in wc_mapper(doc):
            grouped[token].append(one)

    results = []
    for token, ones in grouped.items():
        results.extend(wc_reducer(token, ones))
    return results

#### Generic MapReduce function

In [29]:
# def map_reduce(inputs, mapper, reducer):
#     collector = defaultdict(list)
    
#     for input in inputs:
#         for key, value in mapper(input):
#             collector[key].append(value)
    
#     return [output for key, value in collector.items()
#             for output in reducer(key, value)]

def map_reduce(inputs, mapper, reducer):
    """Run MapReduce over *inputs* in memory.

    Args:
        inputs: iterable of items, each passed on its own to *mapper*.
        mapper: callable(item) -> iterable of ``(key, value)`` pairs.
        reducer: callable(key, values) -> iterable of outputs, where
            *values* is the list of every value emitted under *key*.

    Returns:
        A flat list of all reducer outputs, grouped by key in the order each
        key was first emitted.
    """
    values_by_key = defaultdict(list)
    for item in inputs:
        for key, value in mapper(item):
            values_by_key[key].append(value)

    outputs = []
    for key, grouped_values in values_by_key.items():
        outputs.extend(reducer(key, grouped_values))
    return outputs

#word_counts = map_reduce(documents, wc_mapper, wc_reducer)

In [10]:
# generic reducer function
def reduce_value_using(aggregation_fn, key, value):
    """Yield one ``(key, aggregation_fn(value))`` pair.

    *value* is the collection of values grouped under *key*; it is handed to
    *aggregation_fn* unchanged.
    """
    aggregated = aggregation_fn(value)
    yield key, aggregated


def value_reducer(aggregation_fn):
    """Adapt ``aggregation_fn(values) -> output`` into a MapReduce reducer.

    The returned callable takes ``(key, values)`` and yields
    ``(key, aggregation_fn(values))``.
    """
    reducer = partial(reduce_value_using, aggregation_fn)
    return reducer

In [11]:
# Ready-made reducers, each applying one aggregation to a key's grouped values.
sum_reducer = value_reducer(sum)
max_reducer = value_reducer(max)


def _count_distinct(values):
    """Return how many distinct items appear in *values*."""
    return len(set(values))


count_distinct_reducer = value_reducer(_count_distinct)

#### Find the most common word in each user's status updates

In [13]:
def words_per_user_mapper(status_update):
    """Emit ``(username, (word, 1))`` for each token in a status update's text.

    *status_update* is read as a mapping with "username" and "text" keys.
    """
    user = status_update["username"]
    tokens = word_tokenize(status_update["text"])
    yield from ((user, (token, 1)) for token in tokens)

def most_popular_word_reducer(user, words_and_counts):
    """Yield ``(user, (word, count))`` for the user's most frequent word.

    *words_and_counts* is an iterable of ``(word, count)`` pairs; counts for
    the same word are summed. Ties go to the word counted first, following
    ``Counter.most_common`` ordering.
    """
    tally = Counter()
    for token, amount in words_and_counts:
        tally[token] += amount
    best_word, best_count = tally.most_common(1)[0]
    yield user, (best_word, best_count)

In [14]:
#user_words = map_reduce(status_updates, words_per_user_mapper, most_popular_word_reducer)

#### Find the number of distinct status-likers for each user

In [15]:
def liker_mapper(status_update):
    """Emit ``(username, liker)`` for every entry in the update's "liked_by"."""
    poster = status_update["username"]
    yield from ((poster, fan) for fan in status_update["liked_by"])

In [16]:
#distinct_liker_per_user = map_reduce(status_updates, liker_mapper, count_distinct_reducer)

#### Sparse Matrix Multiplication
A more efficient way to store a large matrix (especially a sparse one) is as a list of tuples, one per non-zero entry, instead of a list of lists.

Each tuple has the form `(name, i, j, value)`, where `name` identifies the matrix and `i`, `j`, `value` give the row, column, and value of a non-zero entry.

In [23]:
def matrix_multiply_mapper(m, element, *, num_a_rows=None, num_b_cols=None):
    """Route one sparse matrix entry to every cell of C = A @ B it feeds.

    element is a tuple (matrix_name, i, j, value). Entries named "A" belong
    to A; every other name is treated as B.

    An entry A_ij is a term of C_ik for every column k of B, and an entry
    B_ij is a term of C_kj for every row k of A. Those loop bounds are
    ``num_b_cols`` and ``num_a_rows``; each defaults to ``m``. So ``m`` is
    NOT merely the shared inner dimension: when used alone it must be at
    least the number of rows of A and the number of columns of B, otherwise
    some output cells are never produced. Overshooting only creates extra
    keys whose shared indices have no partner entry.

    Yields ``((row, col) of C, (shared_index, value))`` pairs.
    """
    name, i, j, value = element
    a_rows = m if num_a_rows is None else num_a_rows
    b_cols = m if num_b_cols is None else num_b_cols
    if name == "A":
        # A_ij is the j-th term in the sum for each C_ik
        for k in range(b_cols):
            yield (i, k), (j, value)
    else:
        # B_ij is the i-th term in the sum for each C_kj
        for k in range(a_rows):
            yield (k, j), (i, value)

In [19]:
def matrix_multiply_reducer(m, key, indexed_values):
    """Combine the partial products routed to one output cell of C = A @ B.

    m: unused here; kept so the signature mirrors ``matrix_multiply_mapper``
        and both can be bound the same way with ``functools.partial``.
    key: the (row, col) of C being computed.
    indexed_values: ``(shared_index, value)`` pairs emitted for this cell.

    Yields ``(key, sum_product)`` only when the sum is non-zero, so the
    output stays in the same sparse form as the input entries. (Previously
    the sum was computed but never emitted, so the function returned None.)
    """
    results_by_index = defaultdict(list)
    for index, value in indexed_values:
        results_by_index[index].append(value)
    # An index contributes a product only when both an A value and a B value
    # arrived for it; a lone value means its partner entry is zero (absent).
    # Assumes each matrix has at most one entry per (row, col).
    sum_product = sum(results[0] * results[1]
                      for results in results_by_index.values()
                      if len(results) == 2)
    if sum_product != 0:
        yield (key, sum_product)

In [24]:
A = [[3, 2, 0],
     [0, 0, 0]]
B = [[4, -1, 0],
     [10, 0, 0],
     [0, 0, 0]]


def _sparse_entries(name, matrix):
    """Return ``(name, i, j, value)`` for each non-zero cell, row by row."""
    return [(name, i, j, value)
            for i, row in enumerate(matrix)
            for j, value in enumerate(row)
            if value != 0]


# The same matrices rewritten in sparse form.
entries = _sparse_entries("A", A) + _sparse_entries("B", B)

In [31]:
# mapper = partial(matrix_multiply_mapper, 3) 
# reducer = partial(matrix_multiply_reducer, 3)

# map_reduce(entries, mapper, reducer)