Logs   
- [2023/03/08]   
  Restart this notebook if you change the scratch library

In [33]:
from typing import List, Iterator, Tuple, Iterable, Callable, Any, NamedTuple
from collections import Counter, defaultdict

Imagine we have a collection of items we'd like to process somehow.     
For instance, the items might be website logs, the texts of various books,    
image files, or anything else. 

A basic version of the MapReduce algorithm consists of the following steps:
1. Use a `mapper` function to turn each item into zero or more key/value pairs.   
2. Collect together all the pairs with identical keys.
3. Use a `reducer` function on each collection of grouped values to produce     
   output values for the corresponding key.

The following examples count how many times each word occurs in a set of    
texts. Step 1 is the function `wc_mapper`, which tokenizes a document and    
emits a tuple `(word, 1)` for each word.    
Step 2 is embedded in the `word_count` function as a nested loop that appends     
each emitted count to a per-word list, so every word ends up with a list of ones.    
Step 3 is `wc_reducer`, which sums that list of ones (equivalent to taking   
its length).

## Example: Word Count

In [2]:
def tokenize(document: str) -> List[str]:
  """Just split on whitespace""" 
  return document.split()


def word_count_old(documents: List[str]):
  """Word count not using MapReduce""" 
  return Counter(word for document in documents
                  for word in tokenize(document))

In [4]:
def wc_mapper(document: str) -> Iterator[Tuple[str, int]]:
  """For each word in the document, emit (word, 1)""" 
  for word in tokenize(document):
    yield (word, 1)

In [6]:
def wc_reducer(word: str, counts: Iterable[int]) -> Iterator[Tuple[str, int]]:
  """Sum up the counts for a word""" 
  yield (word, sum(counts))

In [14]:
def word_count(documents: List[str], verbose: bool = False) -> List[Tuple[str, int]]:
  """Count the words in the input documents using MapReduce""" 
  collector = defaultdict(list)       # To store grouped values

  for document in documents:
    for word, count in wc_mapper(document):
      collector[word].append(count)

  if verbose:
    display(collector)  

  return [output for word, counts in collector.items()
            for output in wc_reducer(word, counts)]

In [15]:
# Test
sample_documents = ["data science", "big data", "science fiction"]

list(wc_mapper(sample_documents[0]))

[('data', 1), ('science', 1)]

In [18]:
wc_count_out = word_count(sample_documents, verbose=True)

defaultdict(list,
            {'data': [1, 1], 'science': [1, 1], 'big': [1], 'fiction': [1]})

In [19]:
wc_count_out

[('data', 2), ('science', 2), ('big', 1), ('fiction', 1)]
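
As a quick sanity check, the MapReduce result can be compared with a plain `Counter` over the same documents. The sketch below repeats the definitions from the cells above (without the `verbose` flag) so that it runs on its own; the names `baseline` and `mapreduce_counts` are introduced here only for illustration.

```python
from collections import Counter, defaultdict
from typing import Iterable, Iterator, List, Tuple


def tokenize(document: str) -> List[str]:
  return document.split()


def wc_mapper(document: str) -> Iterator[Tuple[str, int]]:
  for word in tokenize(document):
    yield (word, 1)


def wc_reducer(word: str, counts: Iterable[int]) -> Iterator[Tuple[str, int]]:
  yield (word, sum(counts))


def word_count(documents: List[str]) -> List[Tuple[str, int]]:
  collector = defaultdict(list)
  for document in documents:
    for word, count in wc_mapper(document):
      collector[word].append(count)
  return [output for word, counts in collector.items()
          for output in wc_reducer(word, counts)]


sample_documents = ["data science", "big data", "science fiction"]

# Baseline: count words directly, without MapReduce
baseline = Counter(word for document in sample_documents
                   for word in tokenize(document))

# MapReduce result, turned into a dict for comparison
mapreduce_counts = dict(word_count(sample_documents))

assert mapreduce_counts == dict(baseline)
```

Both approaches agree on this small sample; the MapReduce version only starts to pay off once the mapping work can be spread over several machines.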

## Why MapReduce?

Imagine now that our billions of documents are scattered across 100 machines.

The algorithm scales horizontally: if we double the number of
machines, then (ignoring certain fixed costs of running a MapReduce system)    
our computation should run approximately twice as fast.
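
To make the idea concrete, here is a minimal single-process sketch that only simulates the machines: each shard of documents is mapped and grouped independently, and the partial groups are merged before reducing. The helpers `map_shard` and `merge_and_reduce` and the two-shard split are assumptions made for this illustration, not part of a real MapReduce system.

```python
from collections import defaultdict
from typing import Dict, Iterator, List, Tuple


def wc_mapper(document: str) -> Iterator[Tuple[str, int]]:
  for word in document.split():
    yield (word, 1)


def map_shard(documents: List[str]) -> Dict[str, List[int]]:
  """Hypothetical per-machine step: map and group one shard's documents."""
  collector: Dict[str, List[int]] = defaultdict(list)
  for document in documents:
    for word, count in wc_mapper(document):
      collector[word].append(count)
  return collector


def merge_and_reduce(partials: List[Dict[str, List[int]]]) -> Dict[str, int]:
  """Hypothetical final step: merge per-shard groups, then sum each key."""
  merged: Dict[str, List[int]] = defaultdict(list)
  for partial in partials:
    for word, counts in partial.items():
      merged[word].extend(counts)
  return {word: sum(counts) for word, counts in merged.items()}


# Pretend each inner list lives on a different machine
shards = [["data science", "big data"], ["science fiction"]]

# Each call is independent of the others, so they could run in parallel
partials = [map_shard(shard) for shard in shards]

sharded_counts = merge_and_reduce(partials)
```

Because `map_shard` never looks at another shard's documents, adding machines simply means splitting the documents into more shards.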

## MapReduce More Generally

In [22]:
# A key/value pair is just a 2-tuple
KV = Tuple[Any, Any]

# A Mapper is a function that returns an Iterable of key/value pairs
Mapper = Callable[..., Iterable[KV]]

# A Reducer is a function that takes a key and an iterable of values
# and returns a key/value pair
Reducer = Callable[[Any, Iterable], KV]

In [25]:
def map_reduce(inputs: Iterable, mapper: Mapper, reducer: Reducer) -> List[KV]:
  """Run MapReduce on the inputs using mapper and reducer""" 
  collector = defaultdict(list)

  for _input in inputs:
    for key, value in mapper(_input):
      collector[key].append(value)

  return [output for key, values in collector.items()
            for output in reducer(key, values)]

Use the general version of MapReduce with the user-defined 
`wc_mapper` and `wc_reducer`:

In [26]:
word_counts = map_reduce(sample_documents, wc_mapper, wc_reducer)
word_counts

[('data', 2), ('science', 2), ('big', 1), ('fiction', 1)]

In [30]:
def reduce(key, values: Iterable, values_fn: Callable) -> KV:
  return (key, values_fn(values))

def values_reducer(values_fn: Callable) -> Reducer:
  """Return a reducer that just applies values_fn to its values"""
  return lambda key, values: reduce(key, values, values_fn)

In [32]:
sum_reducer = values_reducer(sum)
max_reducer = values_reducer(max)
min_reducer = values_reducer(min)
count_distinct_reducer = values_reducer(lambda values: len(set(values)))

assert sum_reducer("key", [1, 2, 3, 3]) == ("key", 9)
assert min_reducer("key", [1, 2, 3, 3]) == ("key", 1)
assert max_reducer("key", [1, 2, 3, 3]) == ("key", 3)
assert count_distinct_reducer("key", [1, 2, 3, 3]) == ("key", 3)
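
One thing to watch for: `map_reduce` loops over whatever the reducer returns, so a reducer built with `values_reducer` (which returns a single tuple instead of yielding it) gets unpacked into a separate key and value. The sketch below shows the effect and one possible workaround. The adapter name `yielding` is hypothetical, and the other definitions are repeated from the cells above so that the block runs on its own.

```python
from collections import defaultdict
from typing import Any, Callable, Iterable, Iterator, List, Tuple

KV = Tuple[Any, Any]


def map_reduce(inputs: Iterable, mapper: Callable, reducer: Callable) -> List:
  collector = defaultdict(list)
  for _input in inputs:
    for key, value in mapper(_input):
      collector[key].append(value)
  return [output for key, values in collector.items()
          for output in reducer(key, values)]


def values_reducer(values_fn: Callable) -> Callable[[Any, Iterable], KV]:
  return lambda key, values: (key, values_fn(values))


def yielding(reducer: Callable[[Any, Iterable], KV]) -> Callable[[Any, Iterable], Iterator[KV]]:
  """Hypothetical adapter: wrap a reducer that returns one pair so it yields it."""
  def wrapped(key, values):
    yield reducer(key, values)
  return wrapped


def wc_mapper(document: str) -> Iterator[Tuple[str, int]]:
  for word in document.split():
    yield (word, 1)


sample_documents = ["data science", "big data", "science fiction"]
sum_reducer = values_reducer(sum)

# Passing sum_reducer directly: each (key, value) tuple is iterated over,
# so keys and values end up as separate list items
flattened = map_reduce(sample_documents, wc_mapper, sum_reducer)

# Wrapping it first keeps each pair intact
paired = map_reduce(sample_documents, wc_mapper, yielding(sum_reducer))
```

An alternative would be to change `map_reduce` to append the reducer's return value directly, but that would no longer work with generator-style reducers such as `wc_reducer`.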

## Example: Analyzing Status Updates

In the future I will try to compile a dataset to demonstrate this
analysis.

## Example: Matrix Multiplication

Use a *sparse* representation to store the matrices efficiently (only the
nonzero entries are kept), then run the MapReduce algorithm on it.

In [34]:
class Entry(NamedTuple):
  name: str 
  i: int 
  j: int 
  value: float
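
As an illustration of the sparse representation, the following sketch converts a dense list-of-lists matrix into a list of `Entry` tuples, keeping only the nonzero cells. The helper `to_entries` is hypothetical (it does not appear elsewhere in this notebook), and `Entry` is repeated so that the block runs on its own.

```python
from typing import List, NamedTuple


class Entry(NamedTuple):
  name: str
  i: int
  j: int
  value: float


def to_entries(name: str, matrix: List[List[float]]) -> List[Entry]:
  """Hypothetical helper: keep only the nonzero cells of a dense matrix."""
  return [Entry(name, i, j, value)
          for i, row in enumerate(matrix)
          for j, value in enumerate(row)
          if value != 0]


dense = [[3, 2, 0],
         [0, 0, 0]]

# Only two of the six cells are nonzero, so only two entries are stored
sparse = to_entries("A", dense)
```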

In [43]:
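def matrix_mapper(entry: Entry, num_rows_a: int, num_cols_b: int) -> Iterator[KV]:
  """Emit one (key, value) pair for every element of C that this entry contributes to"""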
  if entry.name == "A":
    for y in range(num_cols_b):
      key = (entry.i, y)                # which element of C
      value = (entry.j, entry.value)    # which entry in the sum
      yield (key, value)

  else: 
    for x in range(num_rows_a):
      key = (x, entry.j)                # which element of C
      value = (entry.i, entry.value)    # which entry in the sum
      yield (key, value)

def matrix_multiply_mapper(num_rows_a: int, num_cols_b: int) -> Mapper:
  # C[x][y] = A[x][0] * B[0][y] + ... + A[x][m] * B[m][y]
  #
  # so an element A[i][j] goes into every C[i][y] with coef B[j][y]
  # and an element B[i][j] goes into every C[x][j] with coef A[x][i]
  return lambda entry: matrix_mapper(entry, num_rows_a, num_cols_b)

In [48]:
def matrix_multiply_reducer(key: Tuple[int, int], 
                            indexed_values: Iterable[Tuple[int, float]]):
  results_by_index = defaultdict(list)

  for index, value in indexed_values:
    results_by_index[index].append(value)
  
  # Multiply the values for positions with two values
  # (one from A, and one from B) and sum them up
  sumproduct = sum(values[0] * values[1] 
                    for values in results_by_index.values()
                    if len(values) == 2)

  if sumproduct != 0.0:
    yield (key, sumproduct)

In [49]:
A = [[3, 2, 0],
     [0, 0, 0]]

B = [[4, -1, 0],
     [10, 0, 0],
     [0, 0, 0]]


In [50]:
entries = [Entry("A", 0, 0, 3), Entry("A", 0, 1, 2), Entry("B", 0, 0, 4),
            Entry("B", 0, 1, -1), Entry("B", 1, 0, 10)]

mapper = matrix_multiply_mapper(num_rows_a=2, num_cols_b=3)
reducer = matrix_multiply_reducer

# Product should be [[32, -3, 0], [0, 0, 0]]
assert (set(map_reduce(entries, mapper, reducer)) == {((0, 1), -3), ((0, 0), 32)})
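
To cross-check the sparse result, it can be expanded back into a dense matrix and compared with a naive dense product. This is only a sketch: `dense_multiply` and `to_dense` are hypothetical helpers, and `sparse_product` is the pair list from the assertion above written out by hand so that the block runs on its own.

```python
from typing import List, Tuple


def dense_multiply(a: List[List[float]], b: List[List[float]]) -> List[List[float]]:
  """Naive dense matrix product, used only as a reference."""
  num_cols_b = len(b[0])
  return [[sum(a_ik * b[k][y] for k, a_ik in enumerate(row))
           for y in range(num_cols_b)]
          for row in a]


def to_dense(pairs: List[Tuple[Tuple[int, int], float]],
             num_rows: int, num_cols: int) -> List[List[float]]:
  """Fill a zero matrix with the ((i, j), value) pairs of a sparse result."""
  dense = [[0 for _ in range(num_cols)] for _ in range(num_rows)]
  for (i, j), value in pairs:
    dense[i][j] = value
  return dense


A = [[3, 2, 0],
     [0, 0, 0]]

B = [[4, -1, 0],
     [10, 0, 0],
     [0, 0, 0]]

# The nonzero elements of C produced by the MapReduce version
sparse_product = [((0, 0), 32), ((0, 1), -3)]

dense_product = to_dense(sparse_product, num_rows=2, num_cols=3)
assert dense_product == dense_multiply(A, B)
```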

## An Aside: Combiners