# Chapter 24 - MapReduce

MapReduce is a programming model for performing parallel processing on large datasets. It is a powerful technique, but the basics are simple.

In [1]:
import math, random, re, datetime
from collections import defaultdict, Counter
from functools import partial

def tokenize(message):
    """Return the set of distinct words found in message.

    The message is lowercased first; a word is any run of lowercase
    letters, digits, or apostrophes. Because a set is returned, a word
    that occurs several times in the message appears only once.
    """
    lowered = message.lower()
    return set(re.findall("[a-z0-9']+", lowered))

In [4]:
# First example has to involve counting words, as per the rules.

# Need a function that turns a document into a sequence of key-value pairs.

def wc_wrapper(document):
    """Word-count mapper: emit a (word, 1) pair for each token of document.

    Tokens come from tokenize(), which returns a set, so each distinct
    word in a document is emitted exactly once.
    """
    yield from ((token, 1) for token in tokenize(document))
        
# now we need a reducer function

def wc_reducer(word, counts):
    """Word-count reducer: emit a single (word, total) pair.

    total is the sum of all the counts collected for word.
    """
    total = sum(counts)
    yield word, total
    
def word_count(documents):
    """Count the words in the input documents using MapReduce.

    Each document is passed through the word-count mapper (wc_wrapper),
    the emitted counts are grouped by word, and each group is passed to
    wc_reducer.

    Returns a list of (word, total) pairs, one per distinct word, in the
    order the words were first seen. An empty input yields an empty list.
    """
    # Grouping stage: word -> list of every count emitted for it.
    collector = defaultdict(list)

    for document in documents:
        # wc_wrapper is this file's word-count mapper; no function named
        # wc_mapper is defined, so calling that name raised NameError.
        for word, count in wc_wrapper(document):
            collector[word].append(count)

    # Reduce stage: flatten whatever each reducer call yields.
    return [output
            for word, counts in collector.items()
            for output in wc_reducer(word, counts)]

In [10]:
# generalized for a single machine
def map_reduce(inputs, mapper, reducer):
    """Run a single-machine MapReduce over inputs.

    mapper(item) must yield (key, value) pairs; all values sharing a key
    are grouped into a list. reducer(key, values) is then called once per
    key and may yield any number of outputs.

    Returns the list of all reducer outputs, with keys processed in the
    order they were first emitted by the mapper.
    """
    grouped = defaultdict(list)

    # Map + shuffle: bucket every emitted value under its key.
    for item in inputs:
        for key, value in mapper(item):
            grouped[key].append(value)

    # Reduce: concatenate the outputs produced for each key.
    outputs = []
    for key, values in grouped.items():
        outputs.extend(reducer(key, values))
    return outputs
            
def reduce_with(aggregation_fn, key, values):
    """Reduce one key's values to a single (key, aggregate) pair.

    aggregation_fn is called once with values; its result is paired with
    key and yielded.
    """
    aggregated = aggregation_fn(values)
    yield key, aggregated

def values_reducer(aggregation_fn):
    """Build a reducer from a plain (values -> output) function.

    The returned callable takes (key, values) and behaves like
    reduce_with with aggregation_fn already supplied.
    """
    reducer = partial(reduce_with, aggregation_fn)
    return reducer

# Ready-made reducers for common aggregations over a key's values.
sum_reducer = values_reducer(sum)
max_reducer = values_reducer(max)
min_reducer = values_reducer(min)
# Counts how many different values were collected for a key.
count_distinct_reducer = values_reducer(lambda collected: len(set(collected)))

In [13]:
# Analyzing status updates
status_updates = [
    {
        "id": 1,
        "username": "joelgrus",
        "text": "Is anyone interested in a data science book?",
        "created_at": datetime.datetime(2013, 12, 21, 11, 47, 0),
        "liked_by": ["data_guy", "data_gal", "bill"],
    },
]

# what day of the week do people talk about it the most?
def data_science_day_mapper(status_update):
    """Emit (day_of_week, 1) when the update's text mentions "data science".

    The match is case-insensitive. day_of_week is whatever
    status_update["created_at"].weekday() returns (for datetime objects,
    Monday is 0). Updates that do not match emit nothing.
    """
    text = status_update["text"].lower()
    if "data science" not in text:
        return
    yield status_update["created_at"].weekday(), 1

# Tally, per weekday, how many updates mention "data science".
data_science_days = map_reduce(
    status_updates, data_science_day_mapper, sum_reducer
)
print(data_science_days)

[(5, 1)]


In [14]:
# What words are used the most?
def words_per_user_mapper(status_update):
    """Emit (username, (word, 1)) for each token of the update's text.

    Tokens come from tokenize(), which returns a set, so a word repeated
    within one update is emitted only once for that update.
    """
    author = status_update["username"]
    yield from ((author, (token, 1))
                for token in tokenize(status_update["text"]))

def most_popular_word_reducer(user, words_and_counts):
    """Emit (user, (word, total)) for the word with the highest total count.

    words_and_counts is a sequence of (word, count) pairs; counts for the
    same word are added together before the top word is picked with
    Counter.most_common.
    """
    totals = Counter()
    for token, n in words_and_counts:
        totals[token] += n

    # most_common(1) is a one-element list holding the top (word, total).
    top_word, top_total = totals.most_common(1)[0]
    yield user, (top_word, top_total)

# Find each user's most frequently used word.
user_words = map_reduce(
    status_updates, words_per_user_mapper, most_popular_word_reducer
)

print(user_words)

[('joelgrus', ('is', 1))]


In [15]:
# number of distinct status-likers for each user?

def liker_mapper(status_update):
    """Emit (username, liker) for every entry in the update's "liked_by"."""
    author = status_update["username"]
    yield from ((author, fan) for fan in status_update["liked_by"])

# Count the distinct likers each user has across all their updates.
distinct_likers_per_user = map_reduce(
    status_updates, liker_mapper, count_distinct_reducer
)
print(distinct_likers_per_user)

[('joelgrus', 3)]


In [16]:
# generalizing to matrix multiplication
def matrix_multiply_mapper(m, element):
    """Fan one matrix entry out to the product cells it contributes to.

    element is a tuple (matrix_name, i, j, value). An entry whose name is
    "A" is emitted under keys (i, k) for k in range(m), tagged with its
    column index j. Any other entry is treated as belonging to B and is
    emitted under keys (k, j) for k in range(m), tagged with its row
    index i. The tag is the position in the sum where the A and B values
    are later paired.

    NOTE(review): m was originally described as "the common dimension
    (columns of A, rows of B)", but as used here it bounds the row and
    column indices of the output cells. Confirm which is intended.
    """
    name, i, j, value = element
    from_a = name == "A"

    for k in range(m):
        if from_a:
            # A_ij is the jth term of the sum for every C_ik.
            yield ((i, k), (j, value))
        else:
            # B_ij is the ith term of the sum for every C_kj.
            yield ((k, j), (i, value))
            
def matrix_multiply_reducer(m, key, indexed_values):
    """Compute one cell of the product from its tagged contributions.

    indexed_values is a sequence of (index, value) pairs. Values are
    grouped by index; each group with exactly two values contributes
    their product to the sum, and every other group is ignored. Yields
    (key, sum) only when the sum is non-zero. m is accepted but not used.
    """
    values_by_index = defaultdict(list)
    for index, value in indexed_values:
        values_by_index[index].append(value)

    # Only indices that received exactly two values count toward the sum.
    total = 0
    for pair in values_by_index.values():
        if len(pair) == 2:
            total = total + pair[0] * pair[1]

    # Zero cells are omitted from the output.
    if total != 0.0:
        yield (key, total)

In [18]:
# Matrix entries as (matrix_name, row, column, value) tuples.
entries = [
    ("A", 0, 0, 3), ("A", 0, 1, 2),
    ("B", 0, 0, 4), ("B", 0, 1, -1), ("B", 1, 0, 10),
]

# Fix the first argument (m = 3) of both stages.
mapper = partial(matrix_multiply_mapper, 3)
reducer = partial(matrix_multiply_reducer, 3)

print("map-reduce matrix multiplication")
print("entries:", entries)
print("result:", map_reduce(entries, mapper, reducer))


map-reduce matrix multiplication
entries: [('A', 0, 0, 3), ('A', 0, 1, 2), ('B', 0, 0, 4), ('B', 0, 1, -1), ('B', 1, 0, 10)]
result: [((0, 0), 32), ((0, 1), -3)]


## This concludes chapter 24

The most widely used MapReduce system is Hadoop, which has books written about it. You have to set up a cluster, which I hear is difficult, and jobs are commonly written in Java, although there is Hadoop Streaming, which lets you write them in Python.

Amazon has Elastic MapReduce, which is basically MapReduce as a cloud service.

mrjob is a Python package for interfacing with Hadoop or Elastic MapReduce.

Hadoop has high latency, so there are alternative frameworks for real time analytics such as Spark and Storm.

And as these things move quickly the newest framework is probably newer than the book. So it goes. 