In [30]:
from collections import Counter, defaultdict
from functools import partial
import datetime
import re

In [16]:
# From naive_bayes chapter.
def tokenize(message):
    message = message.lower()                       # convert to lowercase
    all_words = re.findall("[a-z0-9']+", message)   # extract the words
    return set(all_words)                           # remove duplicates

In [17]:
def word_count_old(documents):
    """Word count not using MapReduce."""
    return Counter(word
                   for document in documents
                   for word in tokenize(documents))

In [18]:
def wc_mapper(document):
    """For each word in document, emit (word, 1)."""
    for word in tokenize(document):
        yield (word, 1)

In [7]:
def wc_reducer(word, counts):
    """Sum up the counts for a word."""
    yield (word, sum(counts))

In [8]:
def word_count(documents):
    """Count the words in the input documents using MapReduce."""
    
    # Place to store grouped values.
    collector = defaultdict(list)
    
    for document in documents:
        for word, count in wc_mapper(document):
            collector[word].append(count)
    
    return [output
            for word, counts in collector.items()
            for output in wc_reducer(word, counts)]

In [9]:
documents = ["data science", "big data", "science fiction"]
word_count(documents)

[('science', 2), ('data', 2), ('big', 1), ('fiction', 1)]

In [10]:
def map_reduce(inputs, mapper, reducer):
    """Runs MapReduce on the inputs using mapper and reducer."""
    collector = defaultdict(list)
    
    for input in inputs:
        for key, value in mapper(input):
            collector[key].append(value)
    
    return [output
            for key, values in collector.items()
            for output in reducer(key, values)]

In [11]:
word_counts = map_reduce(documents, wc_mapper, wc_reducer)
word_counts

[('science', 2), ('data', 2), ('big', 1), ('fiction', 1)]

In [20]:
def reduce_values_using(aggregation_fn, key, values):
    """Reduces a key-values pair by applying aggregation_fn to values."""
    yield (key, aggregation_fn(values))

In [23]:
def values_reducer(aggregration_fn):
    """Turns a function (values -> output) into a reducer
    that maps (key, values) -> (key, output)."""
    return partial(reduce_values_using, aggregration_fn)

In [28]:
sum_reducer = values_reducer(sum)
max_reducer = values_reducer(max)
min_reducer = values_reducer(min)
count_distinct_reducer = values_reducer(lambda values: len(set(values)))
map_reduce(documents, wc_mapper, sum_reducer)

[('science', 2), ('data', 2), ('big', 1), ('fiction', 1)]

In [31]:
status_updates = [
    {"id": 1,
     "username" : "joelgrus",
     "text" : "Is anyone interested in a data science book?",
     "created_at" : datetime.datetime(2013, 12, 21, 11, 47, 0),
     "liked_by" : ["data_guy", "data_gal", "bill"] },
    # add your own
]

In [32]:
def data_science_day_mapper(status_update):
    """Yields (day_of_week, 1) if status_update contains 'data science'."""
    if 'data science' in status_update['text'].lower():
        day_of_week = status_update['created_at'].weekday()
        yield (day_of_week, 1)

In [34]:
data_science_days = map_reduce(status_updates,
                               data_science_day_mapper,
                               sum_reducer)
data_science_days

[(5, 1)]

In [35]:
def words_per_user_mapper(status_update):
    user = status_update['username']
    for word in tokenize(status_update['text']):
        yield (user, (word, 1))

In [38]:
def most_popular_word_reducer(user, words_and_counts):
    """Given a sequence of (word, count) pairs,
    return the word with the highest total count."""
    
    word_counts = Counter()
    for word, count in words_and_counts:
        word_counts[word] += count
    
    word, count = word_counts.most_common(1)[0]
    yield (user, (word, count))

In [39]:
user_words = map_reduce(status_updates,
                        words_per_user_mapper,
                        most_popular_word_reducer)
user_words

[('joelgrus', ('science', 1))]

In [40]:
def liker_mapper(status_update):
    user = status_update['username']
    for liker in status_update['liked_by']:
        yield (user, liker)

In [42]:
distinct_likers_per_user = map_reduce(status_updates, 
                                      liker_mapper,
                                      count_distinct_reducer)
distinct_likers_per_user

[('joelgrus', 3)]

In [43]:
def matrix_multiply_mapper(m, element):
    """m is the common dimension (columns of A, rows of B)
    element is a tuple (matrix_name, i, j, value)."""
    name, i, j, value = element
    
    if name == 'A':
        # A_ij is the jth entry in the sum for each C_ik, k=1..m.
        for k in range(m):
            # Group other entry in the sum for each C_ik.
            yield ((i, k), (k, value))
    else:
        # B_ij is the i-th entry in the sum for each C_kj.
        for k in range(m):
            yield ((k, j), (i, value))

In [44]:
def matrix_multiply_reducer(m, key, indexed_values):
    results_by_index = defaultdict(list)
    for index, value in indexed_values:
        results_by_index[index].append(value)
    
    # Sum up all the products of the positions with two results.
    sum_product = sum(results[0] * results[1]
                      for results in results_by_index.values()
                      if len(results) == 2)

    if sum_product != 0.0:
        yield (key, sum_product)

In [45]:
A = [[3, 2, 0],
     [0, 0, 0]]
B = [[4, -1, 0],
     [10, 0, 0],
     [0, 0, 0]]

In [49]:
entries = [("A", 0, 0, 3), ("A", 0, 1,  2),
           ("B", 0, 0, 4), ("B", 0, 1, -1), ("B", 1, 0, 10)]

In [50]:
mapper = partial(matrix_multiply_mapper, 3)
reducer = partial(matrix_multiply_reducer, 3)
map_reduce(entries, mapper, reducer)

[((0, 1), 6), ((0, 2), 6)]