### Lab5: MapReduce

#### Introduction

[MapReduce](https://en.wikipedia.org/wiki/MapReduce) is a programming model and an associated implementation for processing and generating big data sets with a parallel, distributed algorithm on a cluster.

A MapReduce program is composed of a Map() procedure (method) that performs filtering and sorting (such as sorting students by first name into queues, one queue for each name) and a Reduce() method that performs a summary operation (such as counting the number of students in each queue, yielding name frequencies). 
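The map, shuffle, and reduce phases in the student-name example can be sketched in plain Python on a single machine. This is only an illustration under stated assumptions. Each student record is assumed to be a "First Last" string. The helper names `map_phase`, `shuffle_phase`, and `reduce_phase` are hypothetical and are not part of `mapreduce.py` or Spark. A real framework would run these phases in parallel across many workers.

```python
from collections import defaultdict

def map_phase(students):
    # Map (hypothetical helper): emit one (first_name, 1) pair per student record
    return [(name.split()[0], 1) for name in students]

def shuffle_phase(pairs):
    # Shuffle (hypothetical helper): group all values sharing a key into one "queue"
    queues = defaultdict(list)
    for key, value in pairs:
        queues[key].append(value)
    return queues

def reduce_phase(queues):
    # Reduce (hypothetical helper): summarize each queue, here by summing its values
    return {key: sum(values) for key, values in queues.items()}

students = ["Ada Lovelace", "Alan Turing", "Ada Yonath", "Grace Hopper"]
name_counts = reduce_phase(shuffle_phase(map_phase(students)))
print(name_counts)  # {'Ada': 2, 'Alan': 1, 'Grace': 1}
```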

High performance implementations of "MapReduce Systems" (also called "infrastructures" or "frameworks") orchestrate the processing by marshaling distributed servers, running the various tasks in parallel, managing all communications and data transfers between the various parts of the system, and providing for redundancy and fault tolerance. The model is a specialization of the split-apply-combine strategy.

In this lab, you will use MapReduce to 1) create an inverted index and 2) do some matrix algebra. The module `mapreduce.py` included with this lab simulates [Apache Spark](https://spark.apache.org/), a high-performance framework for MapReduce-style computation that is commonly used in big data applications.

#### Example
Below is a simple word counting example. The `mapper1` function returns a list of (key, value) pairs of the form (word, 1), one for each word in its input string, and the `reducer` function adds together the values of all pairs that share a key (in this case, a word) to calculate a count for each word.

The expression beginning with `sc.parallelize` invokes the Spark-style MapReduce framework. See the [Spark documentation](https://spark.apache.org/docs/latest/api/python/pyspark.html) for more details on the methods and parameters used to control the computation.
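One detail worth keeping in mind: `reduceByKey` does not hand `reducer` the full list of values for a key. It folds them two at a time, possibly combining values within each partition first and then merging the partial results, in an order the program does not control. Spark's documentation therefore asks for an associative and commutative reduce function. The sketch below imitates that folding with `functools.reduce`; the split of values into two partitions is a hypothetical choice made for illustration.

```python
from functools import reduce

def reducer(a, b):
    'Returns the sum of word count results a and b'
    return a + b

# Hypothetical partitioning of the values emitted for the key 'rose'
partition_1 = [1, 1]
partition_2 = [1]

# Each partition is folded locally, then the partial results are merged
partial_1 = reduce(reducer, partition_1)   # 2
partial_2 = reduce(reducer, partition_2)   # 1
total = reducer(partial_1, partial_2)      # 3

# Subtraction is not associative, so the grouping changes the answer
def bad_reducer(a, b):
    return a - b

left = reduce(bad_reducer, [5, 3, 1])                  # (5 - 3) - 1 = 1
split = bad_reducer(5, reduce(bad_reducer, [3, 1]))    # 5 - (3 - 1) = 3
print(total, left, split)  # 3 1 3
```

Addition gives the same total however the values are grouped, while subtraction does not, which is why it would be an unsafe reducer.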

In [None]:
from mapreduce import mapreduce
from pprint import pprint
import string

def mapper1(s):
    '''
    Returns a list of tuples of the form (word,1) 
    for every word in string s 
    '''
    words = s.split()
    result = [(word, 1) for word in words]
    ## print('mapper1: %s -> %s'%(s,result))
    return result
    
def reducer(a, b):
    'Returns the sum of word count results a and b'
    return a + b

def word_count(text):
    '''
    Returns a list of (word,count) tuples
    '''
    if isinstance(text,str):
        text = [text]
    # Remove punctuation and newlines, then convert to lower case
    translator = str.maketrans('', '', string.punctuation+'\n')
    text = [line.translate(translator).lower() for line in text] 
    
    # Count words using MapReduce
    ## print('MapReduce Input:')
    ## pprint(text)
    sc = mapreduce()
    word_count_result = sc.parallelize(text, 4)\
        .flatMap(mapper1)\
        .reduceByKey(reducer)\
        .sortByKey(True)\
        .collect()
    sc.stop()
    ## print('MapReduce Output:')
    ## pprint(word_count_result)
    return word_count_result

In [None]:
word_count('A rose is a rose is a rose.')

In [None]:
with open('data/hadoop.txt', 'r') as infile:
    r = word_count(infile)
r[0:10]  # Display the first 10 (word, count) pairs

#### Task 0
Re-run the above example after you uncomment the `## print` statements. While looking at the annotated output, try to get a clear understanding of what `mapper1` does and how its output is consolidated by `.reduceByKey(reducer)`.

#### Task 1
Given a set of documents, an inverted index is a dictionary where each word is associated with a list of the names of the documents in which that word appears. Fill in the `mapper1`, `reducer`, and `mapper2` functions below to create an inverted index, first for a small two-document example and then for the books in `data/books.json`. Refer to the docstrings of these functions to see the expected input and output formats.
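When debugging, it can help to compare against a non-MapReduce reference. The sketch below builds the same kind of index with an ordinary dictionary of sets, using the same preprocessing as `inverted_index`. The name `inverted_index_local` is hypothetical and is not part of the lab. On the two-document example in this task it should produce the same list of (word, titles) pairs as a correct MapReduce implementation.

```python
import string

def inverted_index_local(texts):
    '''
    Hypothetical single-machine reference: returns a sorted list of
    (word, sorted titles) pairs from a list of [title, text] pairs.
    '''
    # Same preprocessing as inverted_index: drop punctuation and digits
    translator = str.maketrans('', '', string.punctuation + '0123456789')
    index = {}
    for title, text in texts:
        for word in text.translate(translator).lower().split():
            # A set keeps each title at most once per word
            index.setdefault(word, set()).add(title)
    # Sort words and titles so the result is deterministic
    return [(word, sorted(titles)) for word, titles in sorted(index.items())]

docs = [['Mavis-Typing 101', 'The quick brown fox jumped over the lazy dog!'],
        ['Milne-Winne the Pooh', 'Pooh and the brown fox best of friends.']]
result = inverted_index_local(docs)
print(result[:3])
```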

In [None]:
import json
def mapper1(document):
    """
    Given a document consisting of a (title, text) pair, returns 
    a list of (word,[title]) pairs for every word in the text.
    """
    # YOUR CODE HERE
    
def reducer(a, b):
    """
    Returns the concatenation of two lists of document titles, a and b.
    (Each list starts out as a single [title] produced by mapper1.)
    """
    # YOUR CODE HERE
    
def mapper2(record):
    """
    Assumes record is in (word, [title1, title2, ...]) format.
    Returns a version of record (word, [title1*, title2*, ...]) where
    [title1*, title2*, ...] is a sorted list of unique titles.
    """
    # YOUR CODE HERE
    
def inverted_index(texts):
    '''
    Returns an inverted word index for a list of texts given
    in [title, text] format.
    '''
    # Preprocess texts to remove punctuation and digits,
    # and convert them to lower case
    translator = str.maketrans('', '', string.punctuation+'0123456789')
    texts = [[title,text.translate(translator).lower()] for [title,text] in texts]
    
    # Compute inverted index using map reduce
    sc = mapreduce()
    inv_index = sc.parallelize(texts, 128) \
        .flatMap(mapper1) \
        .reduceByKey(reducer) \
        .sortByKey(True) \
        .map(mapper2) \
        .collect()
    sc.stop()
    return inv_index

Below is the expected output:
~~~~
>>> inverted_index([['Mavis-Typing 101','The quick brown fox jumped over the lazy dog!'], 
                    ['Milne-Winne the Pooh', 'Pooh and the brown fox best of friends.']])
                
[('and', ['Milne-Winne the Pooh']),
 ('best', ['Milne-Winne the Pooh']),
 ('brown', ['Mavis-Typing 101', 'Milne-Winne the Pooh']),
 ('dog', ['Mavis-Typing 101']),
 ('fox', ['Mavis-Typing 101', 'Milne-Winne the Pooh']),
 ('friends', ['Milne-Winne the Pooh']),
 ('jumped', ['Mavis-Typing 101']),
 ('lazy', ['Mavis-Typing 101']),
 ('of', ['Milne-Winne the Pooh']),
 ('over', ['Mavis-Typing 101']),
 ('pooh', ['Milne-Winne the Pooh']),
 ('quick', ['Mavis-Typing 101']),
 ('the', ['Mavis-Typing 101', 'Milne-Winne the Pooh'])]
~~~~

In [None]:
# Keep updating your code until you can reproduce the above result 
# by running the command below
inverted_index([['Mavis-Typing 101','The quick brown fox jumped over the lazy dog!'], 
              ['Milne-Winne the Pooh', 'Pooh and the brown fox best of friends.']])

In [None]:
# Once you have your code working, try this more complex 
# example on a JSON file full of books
with open('data/books.json', 'r') as infile:
    texts = [json.loads(line) for line in infile]
inv_ind = inverted_index(texts)
inv_ind[0:10]

#### Task 2
1. What happens if you eliminate the use of `mapper2` (i.e., remove the `.map(mapper2)` call from the `sc.parallelize` chain)?
2. Why didn't we need to do a reduce step after `mapper2` was applied?

**Fill in your answer below:**



#### Task 3
Can you see an easy way to add a count of the total number of times 
a word appears in all texts?  Implement your solution below.  Here's some example output:
~~~~
>>> inv_ind = inverted_index([['Mavis-Typing 101','The quick brown fox jumped over the lazy dog!'], 
              ['Milne-Winne the Pooh', 'Pooh and the brown fox best of friends.']])

[('and', 1, ['Milne-Winne the Pooh']),
 ('best', 1, ['Milne-Winne the Pooh']),
 ('brown', 2, ['Mavis-Typing 101', 'Milne-Winne the Pooh']),
 ('dog', 1, ['Mavis-Typing 101']),
 ('fox', 2, ['Mavis-Typing 101', 'Milne-Winne the Pooh']),
 ('friends', 1, ['Milne-Winne the Pooh']),
 ('jumped', 1, ['Mavis-Typing 101']),
 ('lazy', 1, ['Mavis-Typing 101']),
 ('of', 1, ['Milne-Winne the Pooh']),
 ('over', 1, ['Mavis-Typing 101']),
 ('pooh', 1, ['Milne-Winne the Pooh']),
 ('quick', 1, ['Mavis-Typing 101']),
 ('the', 3, ['Mavis-Typing 101', 'Milne-Winne the Pooh'])]
~~~~

In [None]:
# YOUR CODE HERE

# Test your solution
inv_ind = inverted_index([['Mavis-Typing 101','The quick brown fox jumped over the lazy dog!'], 
              ['Milne-Winne the Pooh', 'Pooh and the brown fox best of friends.']])

pprint(inv_ind)

with open('data/books.json', 'r') as infile:
    texts = [json.loads(line) for line in infile]
inv_ind = inverted_index(texts)
inv_ind[0:10]

### Matrix Algebra
There are a variety of ways to represent matrices inside a computer. The order in which elements are stored (e.g., column-wise versus row-wise) makes some calculations faster and others slower. Which elements are stored is also important. Some matrices have a regular structure (e.g., diagonal or symmetric matrices), so storing every element is not required. Others contain mostly zero elements, in which case it may make sense to store only the non-zero elements. Such matrices are known as sparse matrices, and there are 7 different [SciPy sparse matrix types](https://docs.scipy.org/doc/scipy/reference/sparse.html#module-scipy.sparse) available for storing them.
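As a small illustration of exploiting regular structure, a diagonal matrix can be stored as just its diagonal entries, and a matrix-vector product then touches only those entries. The helper names below are hypothetical, and the example uses plain Python lists rather than NumPy or SciPy.

```python
def diag_matvec(diag, x):
    '''
    Hypothetical helper: multiplies an n-by-n diagonal matrix, stored only
    as its n diagonal entries, by a length-n vector x.
    '''
    assert len(diag) == len(x)
    # Off-diagonal entries are zero, so each output is one product
    return [d * xi for d, xi in zip(diag, x)]

def dense_nonzeros(rows):
    'Hypothetical helper: counts non-zero entries in a dense list-of-lists.'
    return sum(1 for row in rows for v in row if v != 0)

# A 4-by-4 diagonal matrix needs 16 dense slots but only 4 stored values
D_dense = [[2, 0, 0, 0],
           [0, 3, 0, 0],
           [0, 0, 5, 0],
           [0, 0, 0, 7]]
D_diag = [2, 3, 5, 7]
print(dense_nonzeros(D_dense), len(D_diag))   # 4 4
print(diag_matvec(D_diag, [1, 1, 1, 1]))      # [2, 3, 5, 7]
```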

In this part of the lab we are going to use a simple sparse matrix representation, `SpMatrix`, that is similar to SciPy's [scipy.sparse.coo_matrix](https://docs.scipy.org/doc/scipy/reference/generated/scipy.sparse.coo_matrix.html#scipy.sparse.coo_matrix) class. The idea is to store only the non-zero elements of a matrix, each as a ((i, j), v) tuple, where (i, j) is the element's position and v is its value.
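The ((i, j), v) layout can be produced from, and turned back into, a dense list-of-lists with a few lines of plain Python. The helpers `to_ijv` and `from_ijv` are hypothetical names used only for this sketch; the matrix is the same one used in the `SpMatrix` example that follows.

```python
def to_ijv(rows):
    'Hypothetical helper: returns the non-zero entries as ((i, j), v) tuples.'
    return [((i, j), v)
            for i, row in enumerate(rows)
            for j, v in enumerate(row)
            if v != 0]

def from_ijv(m, n, ijv):
    'Hypothetical helper: rebuilds an m-by-n dense list-of-lists.'
    # Build each row separately so rows do not alias one another
    rows = [[0] * n for _ in range(m)]
    for (i, j), v in ijv:
        rows[i][j] = v
    return rows

A_dense = [[10, 0, 30],
           [40, 50, -30]]
A_ijv = to_ijv(A_dense)
print(A_ijv)
# [((0, 0), 10), ((0, 2), 30), ((1, 0), 40), ((1, 1), 50), ((1, 2), -30)]
```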

This approach makes it possible to write MapReduce algorithms that work on truly massive matrices. Below is the class definition for `SpMatrix` and an example.

In [None]:
import numpy as np
# Sparse Matrix class
class SpMatrix:
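    def __init__(self, m=1, n=1, ijv=None):
        '''
        m: number of rows
        n: number of columns
        ijv: list of ((i, j), value) tuples for the non-zero elements
        '''
        self.m = m
        self.n = n
        # Avoid a shared mutable default: each matrix gets its own list
        self.ijv = [] if ijv is None else ijv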
    def np(self):
        '''Returns self as a NumPy Array'''
        r = np.zeros((self.m,self.n))
        for (ij,v) in self.ijv:
            r[ij[0],ij[1]]=v
        return r
    def __str__(self):
        return self.np().__str__()

# Create the matrix
# A = [10   0   30;
#      40  50  -30]
A =  SpMatrix(2,3, [((0,0),10), ((0,2),30),((1,0),40),((1,1),50),((1,2),-30)])
print("A's internal storage = \n", A.ijv)
print("Matrix A represents = \n", A)

#### Task 4
Write a function `SpEye(n)` that returns the n-by-n identity matrix as an `SpMatrix`.

In [None]:
# YOUR CODE HERE
print(SpEye(3))


#### Task 5 
The `col_sum` function below takes an `SpMatrix` as input and returns the sum of each of its columns as another `SpMatrix` (a 1-by-n row vector). Test the code, and then uncomment the `## print` statements to get a better feel for how it works.
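Before running `col_sum`, it may help to trace its two steps by hand. The sketch below uses the same elements as the test matrix A in this task and replaces the `mapreduce` framework with a plain `defaultdict`, so it shows the logic of the map and reduce steps, not how the framework distributes them.

```python
from collections import defaultdict

# Same ijv entries as the 2-by-3 test matrix A in this task
A_ijv = [((0, 0), 20), ((0, 1), 40), ((1, 1), 50), ((1, 2), 30)]

# Map step: each element ((i, j), v) is re-keyed to row 0 of a 1-by-n result
mapped = [((0, j), v) for (_, j), v in A_ijv]

# Reduce step: values that share a key (the same column) are added together
sums = defaultdict(int)
for key, v in mapped:
    sums[key] += v

print(sorted(sums.items()))   # [((0, 0), 20), ((0, 1), 90), ((0, 2), 30)]
```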

In [None]:
def mapper1(ijv):
    '''
    Returns [((0, j), val)] given an ijv tuple ((i, j), val).
    '''
    result = [((0,ijv[0][1]), ijv[1])]
    ## print('mapper1: %s -> %s'%(ijv, result))
    return result

def reducer(a, b):
    return a + b
 
def col_sum(A):
    sc = mapreduce()
    ## print('MapReduce Input:')
    ## pprint(A.ijv)
    sums = sc.parallelize(A.ijv, 4) \
        .flatMap(mapper1) \
        .reduceByKey(reducer) \
        .sortByKey(True) \
        .collect()
    sc.stop()
    ## print('MapReduce Output:')
    ## pprint(sums)
    return SpMatrix(1, A.n, sums)

In [None]:
A = SpMatrix(2, 3, [((0, 0), 20), ((0, 1), 40), ((1, 1), 50), ((1, 2), 30)])
print('A = \n%s'%(A))
print()
cs = col_sum(A)
print()
print('col_sum(A) = \n', cs)
assert np.all(cs.np()==np.sum(A.np(),0)) # Test

#### Task 6
Complete the code block below to compute the **element-wise sum** of two SpMatrices. Include a simple test using an `assert` to check that your solution is working.

Below is some example output.

Notice that the input created for MapReduce (see `MapReduce Input:` below) is a single list containing the elements of both matrices, each tagged with `'a'` or `'b'` to record which matrix it came from. This is the general setup for handling matrix algebra with MapReduce.
~~~~
A = SpMatrix(2, 3, [((0, 0), 1), ((1, 1), 1), ((1,2),1)])
B = SpMatrix(2, 3, [((0, 0), 2), ((0, 1), 1), ((1, 0), 3), ((1,2), -1)])
print('A = \n%s'%(A))
print('B = \n%s'%(B))
print()
result = sp_sum(A, B)
print()
print('sp_sum(A,B) = \n%s'%(result))
---
A = 
[[ 1.  0.  0.]
 [ 0.  1.  1.]]
B = 
[[ 2.  1.  0.]
 [ 3.  0. -1.]]

MapReduce Input: 
[(('a', 0, 0), 1),
 (('a', 1, 1), 1),
 (('a', 1, 2), 1),
 (('b', 0, 0), 2),
 (('b', 0, 1), 1),
 (('b', 1, 0), 3),
 (('b', 1, 2), -1)]
mapper1: (('a', 0, 0), 1) -> [((0, 0), 1)]
mapper1: (('a', 1, 1), 1) -> [((1, 1), 1)]
mapper1: (('a', 1, 2), 1) -> [((1, 2), 1)]
mapper1: (('b', 0, 0), 2) -> [((0, 0), 2)]
mapper1: (('b', 0, 1), 1) -> [((0, 1), 1)]
mapper1: (('b', 1, 0), 3) -> [((1, 0), 3)]
mapper1: (('b', 1, 2), -1) -> [((1, 2), -1)]
MapReduce Output:
[((0, 0), 3), ((0, 1), 1), ((1, 0), 3), ((1, 1), 1), ((1, 2), 0)]

sp_sum(A,B) = 
[[ 3.  1.  0.]
 [ 3.  1.  0.]]
~~~~
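To see what the tags buy you, it can help to look at which values meet at each position once the tagged records are grouped by (i, j) alone. The sketch below does that grouping with a plain dictionary instead of the `mapreduce` module. It shows only the shuffle, not the reduce, and reuses the A and B matrices from the example above.

```python
from collections import defaultdict

A_ijv = [((0, 0), 1), ((1, 1), 1), ((1, 2), 1)]
B_ijv = [((0, 0), 2), ((0, 1), 1), ((1, 0), 3), ((1, 2), -1)]

# Tag each element with its source matrix, as sp_sum does
sijv = [(('a', i, j), v) for (i, j), v in A_ijv]
sijv += [(('b', i, j), v) for (i, j), v in B_ijv]

# Group the tagged values by position only, keeping the tag for inspection
by_position = defaultdict(list)
for (s, i, j), v in sijv:
    by_position[(i, j)].append((s, v))

for pos in sorted(by_position):
    print(pos, by_position[pos])
```

Positions that appear in only one matrix end up with a single value. In Spark's `reduceByKey`, a key with a single value is passed through without calling the reduce function at all, and the lab's simulation presumably behaves the same way.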

In [None]:
def mapper1(sijv):
    '''
    Given a record tuple ((s, i, j), value),
    returns [((i, j), value)].
    '''
    # YOUR CODE HERE
    
def reducer(a, b):
    '''
    Combines the values a and b of two elements that share the same (i, j) position.
    '''
    # YOUR CODE HERE

def sp_sum(A,B):
    assert A.m==B.m and A.n==B.n
    sijv = [(('a',ijv[0][0], ijv[0][1]), ijv[1]) for ijv in A.ijv]
    sijv += [(('b',ijv[0][0], ijv[0][1]), ijv[1]) for ijv in B.ijv]
    sc = mapreduce()
    print('MapReduce Input: ')
    pprint(sijv)
    # YOUR CODE HERE

# Software Test(s)
###
print(sp_sum(SpEye(2),SpEye(2)))
assert(np.all(sp_sum(SpEye(2),SpEye(2)).np()==SpMatrix(2,2,[((0,0),2),((1,1),2)]).np()))
###

In [None]:
A = SpMatrix(2, 3, [((0, 0), 1), ((1, 1), 1), ((1,2),1)])
B = SpMatrix(2, 3, [((0, 0), 2), ((0, 1), 1), ((1, 0), 3), ((1,2), -1)])
print('A = \n%s'%(A))
print('B = \n%s'%(B))
print()
result = sp_sum(A, B)
print()
print('sp_sum(A,B) = \n%s'%(result))