In [4]:
def gen(x):
    """Yield each element of the iterable `x`, one at a time, in order."""
    for item in x:
        yield item

# Feed a small list (with a repeated value) through the generator and
# materialize it again; the list is the cell's displayed result.
x = [1,2,1,3]
list(gen(x))

[1, 2, 1, 3]

In [73]:
from collections import defaultdict

def tokenize(s):
    """Split `s` on runs of whitespace and return the resulting list of tokens.

    No case folding or punctuation stripping is performed.
    """
    tokens = s.split()
    return tokens

def wc_mapper(document):
    """Map step of word count: yield a (token, 1) pair for every token
    that tokenize() produces for `document`, in order of appearance."""
    tokens = tokenize(document)
    for token in tokens:
        yield (token, 1)

def wc_reducer(word, counts):
    """Reduce step of word count: yield a single (word, total) pair,
    where total is the sum of `counts`."""
    total = sum(counts)
    yield (word, total)
    
def word_count(documents):
    """Count word occurrences across `documents` with wc_mapper / wc_reducer.

    Returns a list of (word, count) pairs, ordered by each word's first
    appearance in the mapped output.
    """
    # Group the emitted counts by word.
    counts_by_word = defaultdict(list)
    for text in documents:
        for token, emitted in wc_mapper(text):
            counts_by_word[token].append(emitted)

    # Collapse each group into the reducer's output pairs.
    totals = []
    for token, emitted_counts in counts_by_word.items():
        totals.extend(wc_reducer(token, emitted_counts))
    return totals

In [21]:
# Toy corpus for the word-count examples: three two-word documents.
docs = [
    "data science",
    "big data",
    "science fiction",
]

In [22]:
# Run the hand-written word count over the toy corpus; the returned
# list of (word, count) pairs is the cell's displayed result.
word_count(docs)

defaultdict(<class 'list'>, {'data': [1, 1], 'science': [1, 1], 'big': [1], 'fiction': [1]})


[('data', 2), ('science', 2), ('big', 1), ('fiction', 1)]

In [74]:
def map_reduce(inputs, mapper, reducer):
    """Run a single in-memory map/reduce pass over `inputs`.

    mapper(item) must produce (key, value) pairs; the values are grouped
    by key in the order they are emitted. reducer(key, values) is then
    called once per key and must produce zero or more outputs. Returns the
    flat list of all reducer outputs, keys in first-seen order.
    """
    # Map + shuffle: bucket every emitted value under its key.
    grouped = defaultdict(list)
    for item in inputs:
        for key, value in mapper(item):
            grouped[key].append(value)

    # Reduce: concatenate whatever each key's reducer call produces.
    results = []
    for key, values in grouped.items():
        results.extend(reducer(key, values))
    return results

# Same word count, this time expressed through the generic map_reduce driver.
map_reduce(docs, wc_mapper, wc_reducer)

[('data', 2), ('science', 2), ('big', 1), ('fiction', 1)]

In [75]:
def matrix_multiply_mapper(m, element):
    """Map one sparse matrix entry to the cells of C = A * B it contributes to.

    m       -- fan-out size: every entry is emitted once for each k in
               range(m). An "A" entry is sent to columns 0..m-1 of C and a
               "B" entry to rows 0..m-1 of C, so m must be at least the
               number of columns of B and the number of rows of A. It only
               coincides with the shared inner dimension (columns of A,
               rows of B) when the matrices are square.
    element -- tuple (matrix_name, i, j, value). Any matrix_name other
               than "A" is treated as an entry of B.

    Yields ((row_of_C, col_of_C), (sum_index, value)) pairs, where sum_index
    is the position of this entry inside the dot product for that cell.
    """
    name, i, j, value = element
    is_a_entry = name == "A"

    for k in range(m):
        if is_a_entry:
            # A[i][j] is the j-th term of the sum for C[i][k], for every column k
            yield ((i, k), (j, value))
        else:
            # B[i][j] is the i-th term of the sum for C[k][j], for every row k
            yield ((k, j), (i, value))


In [88]:
def matrix_multiply_reducer(m, key, indexed_values):
    """Combine the (sum_index, value) pairs collected for one output cell.

    m              -- accepted but not used by this function.
    key            -- passed through unchanged as the first item of the output.
    indexed_values -- iterable of (index, value) pairs.

    Values are grouped by index; every index that received exactly two
    values contributes their product to the total. Yields (key, total) only
    when the total is non-zero, so zero cells are left out of the result.
    """
    factors_at = defaultdict(list)
    for position, factor in indexed_values:
        factors_at[position].append(factor)

    # Only positions with both factors present contribute a product.
    total = 0
    for factors in factors_at.values():
        if len(factors) == 2:
            total += factors[0] * factors[1]

    if total != 0.0:
        yield (key, total)
        

In [89]:
# Sparse matrix entries as (matrix_name, i, j, value) tuples.
entries = [
    ("A", 0, 0, 3),
    ("A", 0, 1, 2),
    ("B", 0, 0, 4),
    ("B", 0, 1, -1),
    ("B", 1, 0, 10),
]

In [90]:
# Expand a single entry (entries[3]) with m=3 to inspect the key/value
# pairs the mapper emits for it.
list(matrix_multiply_mapper(3, entries[3]))

[((0, 1), (0, -1)), ((1, 1), (0, -1)), ((2, 1), (0, -1))]

In [91]:
from functools import partial 
# map_reduce calls mapper(item) and reducer(key, values), so bind the
# leading m argument (3 here) ahead of time with partial.
mapper = partial(matrix_multiply_mapper, 3)
reducer = partial(matrix_multiply_reducer, 3)
# Multiply the sparse matrices described by `entries`; the result is the
# cell's displayed value.
map_reduce(entries, mapper, reducer)

[(0, 3), (1, 2), (0, 4), (1, 10)]
[(0, 3), (1, 2), (0, -1)]
[(0, 3), (1, 2)]
[(0, 4), (1, 10)]
[(0, 4), (1, 10)]
[(0, -1)]
[(0, -1)]


[((0, 0), 32), ((0, 1), -3)]