<center><h1>BUSS6002 - Data Science in Business</h1></center>

#### Pre-Tutorial Checklist

1. Complete all tasks from week 9
2. Read up to exercise 1
3. Install mockr library


# Tutorial 10 - MapReduce


## What is MapReduce? 

MapReduce is a programming model for performing calculations in parallel rather than in series. Instead of needing a supercomputer to iterate through every calculation extremely fast, we can have many ordinary processors each work on part of the problem at the same time. MapReduce is composed of two steps. The Map phase breaks the problem into smaller, independent calculations that can be solved individually and maps their outputs to queues. The Reduce phase combines the outcomes of these smaller computations into a solution to the original task. MapReduce is most often used for processing data; however, it is flexible enough to be used for more complex tasks such as linear regression, which we will demonstrate.

A MapReduce program consists of two primary steps:

**Map**: performs filtering and sorting into queues

**Reduce**: performs a summary operation on a queue

<img src="img/mapreduce.gif">

*Image sourced from https://www.ibm.com/developerworks/cloud/library/cl-openstack-deployhadoop/*

## Why Do We Need MapReduce?

As the volume of data acquired by businesses and individuals continues to grow, it becomes difficult to store and process this data quickly enough to gain useful insights. Most naive algorithms cannot handle such volumes of data, because they require too much computation time or memory.

The "divide and conquer" style of MapReduce can resolve this issue. By carefully decomposing the problem, we can distribute the processing load over many machines; this is known as horizontal scaling. This distributed computing style is more robust because we are no longer reliant on one large computer for our processing: MapReduce implementations can be designed to be fault tolerant, so that machines can be disabled or disconnected without affecting the results. Additionally, the hardware requirements of each machine are reduced.

## MapReduce in Python

There are many MapReduce libraries in Python. We will use "mockr", which is a very simple library.

mockr allows us to write our map and reduce functions in normal Python. You can install it via:

    pip install mockr
    
<div style="margin-bottom: 0px;"><img width=20 style="display: block; float: left;  margin-right: 20px;" src="img/docs.png"> <h3 style="padding-top: 0px;">Documentation - mockr</h3></div>
https://pypi.org/project/mockr/
    

## Example: Counting Words

Let's start with the canonical MapReduce example: counting words in a corpus.

First we need to define the map and reduce functions. Then we can run the MapReduce job to count the words.

In [4]:
import re
from mockr import run_stream_job

# This regular expression matches words: runs of letters, digits,
# underscores and apostrophes
# Test it at https://www.regexpal.com
WORD_RE = re.compile(r"[\w']+")

# Think: if the input has n words, k of them unique, how many
# key/value pairs does each function produce? (See Exercise 1.)

# map function
def map_fn(chunk):
    # Use the regex to find all words in each chunk
    # The chunk is a line of text because we are
    # using run_stream_job

    # findall returns every word, skipping separators such as ' ' or '#!@'
    for word in WORD_RE.findall(chunk):
        # Emit a result using the word as the key and
        # the number of times it occurred. We emit once
        # for each word so this value is 1.
        yield (word.lower(), 1)

# reduce function
def reduce_fn(key, values):
    # Receives all the values for each key (unique word),
    # then sums them together for the total count
    yield (key, sum(values))

# "\n" is the newline character which separates the lines of text
input_str = "Hello!\nThis is a sample string.\nIt is very simple.\nGoodbye!"
print(input_str)

# run_stream_job expects a newline-delimited string, a map function and a
# reduce function, and returns a list of results
# https://mockr.readthedocs.io/en/latest/api.html#mockrmockmr.run_stream_job

# run_stream_job wraps the map and reduce functions into a complete job
results = run_stream_job(input_str, map_fn, reduce_fn)
print('#-----------------------#')
print("result")
print(results)

Hello!
This is a sample string.
It is very simple.
Goodbye!
#-----------------------#
result
[('hello', 1), ('this', 1), ('is', 2), ('a', 1), ('sample', 1), ('string', 1), ('it', 1), ('very', 1), ('simple', 1), ('goodbye', 1)]


In [10]:
# yield is the thing to create the 1-time generator for a loop
# list is also a iterative function, but you can use it infinitely

# generator
def generator():
    data = range(3) # create a list [0,1,2]
    for i in data:
        yield i*i # store i*i into my generator using yeild
        
g = generator()
li = [0,1,2]

for j in range(10):
    print('this is the ' + str(j+1) + ' times loop for yield generator')
    for i in g:
        print(i)

print()
print('####################################')
print()  

# A list, by contrast, can be looped over as many times as you want
for j in range(10):
    print('this is the ' + str(j+1) + 'times loop for list')
    for i in li:
        print(i)

    

this is the 1 times loop for yield generator
0
1
4
this is the 2 times loop for yield generator
this is the 3 times loop for yield generator
this is the 4 times loop for yield generator
this is the 5 times loop for yield generator
this is the 6 times loop for yield generator
this is the 7 times loop for yield generator
this is the 8 times loop for yield generator
this is the 9 times loop for yield generator
this is the 10 times loop for yield generator

####################################

this is the 1times loop for list
0
1
2
this is the 2times loop for list
0
1
2
this is the 3times loop for list
0
1
2
this is the 4times loop for list
0
1
2
this is the 5times loop for list
0
1
2
this is the 6times loop for list
0
1
2
this is the 7times loop for list
0
1
2
this is the 8times loop for list
0
1
2
this is the 9times loop for list
0
1
2
this is the 10times loop for list
0
1
2


In [12]:
for word in WORD_RE.findall('hello $I @#! am !@##$ here'):
    print(word)

hello
I
am
here


## Yield

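The `yield` keyword turns a function into a generator. Calling the function returns a generator object, which produces its values lazily, one at a time, as it is iterated over. Once all values have been consumed, the generator is exhausted and cannot be restarted, which is why the generator loop above prints 0, 1, 4 only once. In MapReduce, `yield` lets mappers and reducers emit key/value pairs one at a time instead of building their whole output in memory.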
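The behaviour described above can be seen step by step with `next()`. This is a small illustrative sketch; `squares` is a hypothetical function, not part of mockr.

```python
def squares(n):
    """Generator that yields 0*0, 1*1, ..., (n-1)*(n-1), one value at a time."""
    for i in range(n):
        yield i * i

gen = squares(3)
print(type(gen).__name__)   # generator
print(next(gen))            # 0 (the body runs only until the first yield)
print(list(gen))            # [1, 4] (the remaining values)
print(list(gen))            # [] (the generator is now exhausted)

# Calling the function again creates a fresh generator
print(list(squares(3)))     # [0, 1, 4]
```

This is why `map_fn(line)` is wrapped in `list(...)` whenever its output needs to be inspected or reused.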

<div style="margin-bottom: 30px;"><img width=48 style="display: block; float: left;  margin-right: 20px;" src="img/question-mark-button.png"> <h3 style="padding-top: 15px;">Exercise 1 - Wordcount MapReduce</h3></div>

Given a line of text that's n words long with k unique words as input:
- How many keys are returned by the `map_fn` function? 

Answer: n (one `(word, 1)` pair per word, so repeated words are counted separately)

- How many keys are returned after the output from `map_fn` has been passed through `reduce_fn`?

Answer: k (one total per unique word)

## Explanation

1. Each line of text is sent to a mapper


2. Each mapper:
    - Splits the line into words
    - For each word, yields a key/value pair:
        - Key: the word
        - Value: the number of times it occurs (since we emit once per word, this is 1)


3. Mapped key/value pairs are sent to the shuffler


4. Key/value pairs are shuffled
    - For each key, collect the corresponding list of values


5. Keys and their values are routed to the respective reducers
    - Each call to the reducer works on a single key and its list of values


6. Reduce
    - Sum the counts for a key
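The six steps above can be condensed into a small single-process sketch. It is an illustration only, not mockr's implementation: the function name `simple_mapreduce` is hypothetical, and the shuffle here sorts the mapped pairs by key and then groups adjacent pairs with `itertools.groupby`, a sort-based approach in the spirit of the "shuffle and sort" stage.

```python
import re
from itertools import groupby
from operator import itemgetter

WORD_RE = re.compile(r"[\w']+")

def map_fn(line):
    # Emit (word, 1) for every word in the line
    for word in WORD_RE.findall(line):
        yield (word.lower(), 1)

def reduce_fn(key, values):
    # Sum all the counts received for one word
    yield (key, sum(values))

def simple_mapreduce(text, map_fn, reduce_fn):
    """Hypothetical single-process MapReduce with a sort-based shuffle."""
    # Steps 1-2: split the input into lines and map each line independently
    mapped = [pair for line in text.split("\n") for pair in map_fn(line)]
    # Steps 3-5: sort by key so that all pairs with the same key are adjacent
    mapped.sort(key=itemgetter(0))
    results = []
    for key, group in groupby(mapped, key=itemgetter(0)):
        values = [value for _, value in group]
        # Step 6: reduce each key together with its list of values
        results.extend(reduce_fn(key, values))
    return results

text = "Hello!\nThis is a sample string.\nIt is very simple.\nGoodbye!"
print(simple_mapreduce(text, map_fn, reduce_fn))
```

Because of the sort, the `(word, count)` pairs come out in alphabetical order rather than in order of first appearance, but the counts match those produced by `run_stream_job`.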

## Simulation of MapReduce Stages

Below is a simulation of the output at each stage.

### Text is broken into lines

In [17]:
# Step 1: split the whole input into lines before passing them to the mapper
input_str = "Hello!\nThis is a sample string.\nIt is very simple.\nGoodbye!"

# Split the string into lines and store them in a list
lines_of_text = input_str.split("\n")

print(lines_of_text)

['Hello!', 'This is a sample string.', 'It is very simple.', 'Goodbye!']


### Mapping

We need to apply `map_fn` to each line of text and collect the results.

In [20]:
# We will store the output of map_fn in here
word_count_lists = []

# For every line of text
for line in lines_of_text:
    # Apply the map function (split and count words)
    # map_fn returns a generator, so convert it to a list before saving it
    word_count_lists.append(list(map_fn(line)))

# Show the result of mapping
for words in word_count_lists:
    print(words)

[('hello', 1)]
[('this', 1), ('is', 1), ('a', 1), ('sample', 1), ('string', 1)]
[('it', 1), ('is', 1), ('very', 1), ('simple', 1)]
[('goodbye', 1)]


This is a list of lists, which we can flatten into a single list using `itertools`:

In [21]:
import itertools

# word_count_lists is a list of lists
# Flatten the list of words to make it simpler by chaining lists together
word_count_list_flat = list(itertools.chain.from_iterable(word_count_lists))

print(word_count_list_flat)

[('hello', 1), ('this', 1), ('is', 1), ('a', 1), ('sample', 1), ('string', 1), ('it', 1), ('is', 1), ('very', 1), ('simple', 1), ('goodbye', 1)]


### Shuffle and Sort

This stage is taken care of by mockr, but it is important to understand!

We need to group all the counts for each word together. We will store this in a dictionary (key/value data type).

In [25]:
# SHUFFLE/SORT STAGE
from collections import defaultdict

# Create a dictionary where the default value is an empty list
word_tuple_dict = defaultdict(list)

for kv_pair in word_count_list_flat:
    # For each unique key, append the (word, count) tuple to that key's list
    word_tuple_dict[kv_pair[0]].append(kv_pair)

# Print it in a nice format:
for k, v in word_tuple_dict.items():
    print(str(k) + ": " + str(v))

hello: [('hello', 1)]
this: [('this', 1)]
is: [('is', 1), ('is', 1)]
a: [('a', 1)]
sample: [('sample', 1)]
string: [('string', 1)]
it: [('it', 1)]
very: [('very', 1)]
simple: [('simple', 1)]
goodbye: [('goodbye', 1)]


### Reduce

Now we need to apply `reduce_fn` to our grouped data: collect all the counts for each word into a list and pass it to `reduce_fn`.

In [26]:
# REDUCE STAGE
results = []

for k, v in word_tuple_dict.items():
    # Get the counts from the list of k/v pairs
    vals_list = [t[1] for t in v]
    
    # Apply the reduce_fn to the word and counts pair
    # reduce_fn will yield a (key, value) tuple
    # inside a generator object which we convert to a list
    results.append(list(reduce_fn(k, vals_list)))
    
print(results)

[[('hello', 1)], [('this', 1)], [('is', 2)], [('a', 1)], [('sample', 1)], [('string', 1)], [('it', 1)], [('very', 1)], [('simple', 1)], [('goodbye', 1)]]


Again, we flatten the list using `itertools`:

In [27]:
# Flatten the results to make them more readable
results_flat = list(itertools.chain.from_iterable(results))

print(results_flat)

[('hello', 1), ('this', 1), ('is', 2), ('a', 1), ('sample', 1), ('string', 1), ('it', 1), ('very', 1), ('simple', 1), ('goodbye', 1)]


## Bigger Data Set

This was a small example. Let's look at a bigger example: counting all the words in Moby Dick.

In [29]:
from mockr import run_stream_job

# Open the Moby Dick text file and read it as a single string
# (the with block closes the file automatically)
with open("MobyDick.txt", 'r') as input_file:
    input_str = input_file.read()

In [31]:
# Run MapReduce using the same map_fn and reduce_fn that we defined before
results = run_stream_job(input_str, map_fn, reduce_fn)

# Show the first 50 results
print(results[:50])

[('the', 14697), ('project', 91), ('gutenberg', 92), ('ebook', 10), ('of', 6742), ('moby', 89), ('dick', 88), ('or', 797), ('whale', 1101), ('by', 1222), ('herman', 4), ('melville', 4), ('this', 1438), ('is', 1746), ('for', 1643), ('use', 49), ('anyone', 6), ('anywhere', 16), ('at', 1335), ('no', 591), ('cost', 4), ('and', 6515), ('with', 1769), ('almost', 197), ('restrictions', 2), ('whatsoever', 7), ('you', 938), ('may', 254), ('copy', 19), ('it', 2431), ('give', 90), ('away', 186), ('re', 7), ('under', 126), ('terms', 33), ('license', 18), ('included', 14), ('online', 4), ('www', 6), ('org', 13), ('title', 8), ('author', 9), ('release', 1), ('date', 4), ('december', 4), ('25', 4), ('2008', 1), ('2701', 4), ('last', 278), ('updated', 2)]


## Scaling 

Imagine we want to perform this same process on all the tweets ever generated, or on all the text on the web. Can we do this on a personal computer in a reasonable time? No! We would need a cluster of computers (workers) running MapReduce.

In the word count example, any line of text can be sent to any map worker in the cluster, because each line is processed independently. We can then have a set of reduce workers, each dedicated to one or more keys.
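In a real cluster, the shuffle has to decide which reduce worker receives each key. A common approach (Hadoop's default partitioner works this way) is to hash the key and take the result modulo the number of reducers, so every pair with the same key lands on the same worker. Below is a minimal sketch assuming three hypothetical reducers; `partition` and `route` are illustrative names, and `zlib.crc32` is used because Python's built-in `hash()` for strings changes between processes and so would route keys inconsistently across machines.

```python
import zlib
from collections import defaultdict

def partition(key, n_reducers):
    """Return the index of the (hypothetical) reducer responsible for `key`."""
    # crc32 gives the same value for the same bytes on every machine and run
    return zlib.crc32(key.encode("utf-8")) % n_reducers

def route(pairs, n_reducers):
    """Group mapped (key, value) pairs by the reducer that should receive them."""
    buckets = defaultdict(list)
    for key, value in pairs:
        buckets[partition(key, n_reducers)].append((key, value))
    return dict(buckets)

mapped = [("hello", 1), ("this", 1), ("is", 1), ("it", 1), ("is", 1)]
routed = route(mapped, n_reducers=3)
for reducer_id, pairs in sorted(routed.items()):
    print(reducer_id, pairs)
```

Both `("is", 1)` pairs always end up in the same bucket, so a single reducer sees every count for "is", while different keys are spread across the workers.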

## Example: Linear Regression

Many machine learning problems can be decomposed into subproblems that can be solved independently, which means we can use MapReduce to solve them.

Recall from week 6 that we can calculate the optimal coefficients for linear regression explicitly using linear algebra. For feature matrix X:

$$
X = \begin{bmatrix}1&x_{11}&x_{12}&\cdots&x_{1p}\\1&x_{21}&x_{22}&\cdots&x_{2p}\\\vdots&\vdots&\vdots&\ddots&\vdots\\1&x_{n1}&x_{n2}&\cdots&x_{np}\end{bmatrix}
$$

And label vector y:

$$
y = \begin{bmatrix}y_1\\y_2\\\vdots\\y_n\end{bmatrix}
$$

We can use the formula

$$ \beta = \left( X^T X\right)^{-1} X^T \mathbf y$$

The trick to using MapReduce for this problem is noticing that $X^TX$ and $X^Ty$ for the whole dataset can be written as the sums $\sum_i X_i^TX_i$ and $\sum_i X_i^Ty_i$ respectively, where $X_i$ and $y_i$ contain the rows of $X$ and the entries of $y$ belonging to the $i$-th subset of observations (a subset can be as small as a single observation).

We can therefore decompose linear regression by dividing the data into subsets and computing these two matrix products on each subset (the map step). The products are then summed, and the summed $X^TX$ is inverted to obtain $\beta$ (the reduce step). See the slides for more details.
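The decomposition can be checked numerically on a small made-up dataset. This sketch uses synthetic data (not the BatonRouge dataset) split into 4 row subsets with `np.array_split`, and it uses `np.linalg.solve` instead of `np.linalg.inv`, which gives the same $\beta$ without forming the inverse explicitly.

```python
import numpy as np

# Hypothetical synthetic data: an intercept column plus three random features
rng = np.random.default_rng(0)
n = 200
X = np.column_stack([np.ones(n), rng.normal(size=(n, 3))])
y = X @ np.array([1.0, 2.0, -0.5, 3.0]) + rng.normal(scale=0.1, size=n)

# "Map": compute X_i^T X_i and X_i^T y_i on each of 4 row subsets
partials = [(Xi.T @ Xi, Xi.T @ yi)
            for Xi, yi in zip(np.array_split(X, 4), np.array_split(y, 4))]

# "Reduce": sum the partial products, then solve (X^T X) beta = X^T y
XtX = sum(part[0] for part in partials)
Xty = sum(part[1] for part in partials)
beta_chunked = np.linalg.solve(XtX, Xty)

# Reference: the same formula applied to the full data in one go
beta_full = np.linalg.solve(X.T @ X, X.T @ y)
print(np.allclose(beta_chunked, beta_full))  # True
```

Only the small $(p+1)\times(p+1)$ matrix and $(p+1)$-vector from each subset need to reach the reducer, regardless of how many rows each subset holds.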

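In distributed MapReduce frameworks such as Hadoop Streaming, data passed between steps is plain text, so a DataFrame would have to be converted to a text format such as JSON between each step. mockr's `run_pandas_job` hands each mapper a DataFrame chunk directly, so the code below does not need this conversion.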
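For a framework that only passes text, the conversion could look like the sketch below. The chunk's columns and values are made up for illustration; `DataFrame.to_json`, `pd.read_json` and the standard `json` module are used as documented, and a NumPy array is first turned into nested Python lists with `tolist()` because `json.dumps` cannot serialise an `ndarray` directly.

```python
import io
import json

import numpy as np
import pandas as pd

# Hypothetical chunk (made-up columns and values, not the BatonRouge data)
chunk = pd.DataFrame({"Price": [100000.0, 150000.0], "Baths": [1, 2]})

# DataFrame -> JSON text, one record per row
chunk_text = chunk.to_json(orient="records")
print(chunk_text)

# JSON text -> DataFrame again on the receiving side
chunk_back = pd.read_json(io.StringIO(chunk_text), orient="records")

# NumPy arrays are not JSON serialisable directly, so convert to nested lists
XtX = np.array([[2.0, 3.0], [3.0, 5.0]])
XtX_text = json.dumps(XtX.tolist())
XtX_back = np.array(json.loads(XtX_text))
print(np.array_equal(XtX, XtX_back))  # True
```

Note that `read_json` infers column types on the way back, so dtypes (for example float versus int) may not match the original exactly.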

The following code outputs the estimated parameters of a Linear Regression on the BatonRouge dataset.

### Implementation

In [34]:
import numpy as np

a = np.array([[1,2,3],[1,2,3]])
a.shape

(2, 3)

In [36]:
import pandas as pd
import numpy as np

# Map: for each chunk i, compute (X_i^T X_i, X_i^T y_i)
def map_linear_fn(chunk):
    # Get the dependent variable (label) y
    y = chunk['Price'].values

    # Get the independent/feature variables,
    # which are everything except the Price and Style columns
    X_vals = chunk[chunk.columns.difference(['Price', 'Style'])].values

    # Get the number of data points
    m = chunk.shape[0]

    # Insert a column of "1"s for the intercept term
    # e.g. a row [2, 2, 3] becomes [1, 2, 2, 3]
    X = np.column_stack((np.ones(m), X_vals))

    # Convert to a matrix so that * means matrix multiplication
    X = np.asmatrix(X)

    # Calculate the required products
    XtX = X.T * X
    Xty = X.T * y.reshape(m, 1)

    # Yield the result; every chunk uses the same key, so one reducer gets them all
    yield ("result", [XtX, Xty])

# Reduce: values contains one [XtX, Xty] pair per chunk
def reduce_linear_fn(key, values):

    # Create lists to accumulate the matrices/vectors in
    XtX_list = []
    Xty_list = []

    for result_list in values:
        XtX_list.append(result_list[0])
        Xty_list.append(result_list[1])

    # Sum up all the XtX matrices
    XtX = np.asmatrix(sum(XtX_list))

    # Sum up all the Xty vectors
    Xty = sum(Xty_list)

    # Solve for the coefficients: inv computes the matrix inverse, i.e. (X^T X)^-1
    betas = np.linalg.inv(XtX) * Xty

    yield (key, betas)

In [38]:
from mockr import run_pandas_job

df = pd.read_excel("BatonRouge.xls")

# Split df into 4 chunks; each chunk is passed to map_linear_fn
results = run_pandas_job(df, map_linear_fn, reduce_linear_fn, n_chunks = 4)
# results is a list of (key, betas) pairs; here there is a single "result" key

In [5]:
print(results)

[('result', matrix([[-4.93453976e+04],
        [-4.30636024e+02],
        [ 4.01560091e+04],
        [-2.69227757e+04],
        [-2.25516565e+01],
        [-2.59406734e+03],
        [ 6.92537050e+03],
        [-2.50175666e+03],
        [ 8.54116998e+01],
        [ 1.63298319e+01],
        [ 5.57871062e+04]]))]


### Comparison with sklearn parameters

Note that even though we separated the data into multiple chunks, the final result is the same as sklearn's fit on the full dataset (to the displayed precision)!

In [39]:
from sklearn.linear_model import LinearRegression

lr_obj = LinearRegression()

features = df[df.columns.difference(['Price', 'Style'])]
target = df['Price']

# Fit the model on the full dataset
lr_obj.fit(features, target)

params_sk = np.append(np.array(lr_obj.intercept_), lr_obj.coef_)

print(params_sk)

[-4.93453976e+04 -4.30636024e+02  4.01560091e+04 -2.69227757e+04
 -2.25516565e+01 -2.59406734e+03  6.92537050e+03 -2.50175666e+03
  8.54116998e+01  1.63298319e+01  5.57871062e+04]


### Result Format

In [40]:
# Simplify the result to make it more readable
params_mr = np.array(results[0][1]).ravel() # flatten the matrix into a 1-D array

print(params_mr)

[-4.93453976e+04 -4.30636024e+02  4.01560091e+04 -2.69227757e+04
 -2.25516565e+01 -2.59406734e+03  6.92537050e+03 -2.50175666e+03
  8.54116998e+01  1.63298319e+01  5.57871062e+04]


<div style="margin-bottom: 30px;"><img width=48 style="display: block; float: left;  margin-right: 20px;" src="img/question-mark-button.png"> <h3 style="padding-top: 15px;">Exercise 2 - Linear Regression and MapReduce</h3></div>

1. In the equation $\beta = \left( X^T X\right)^{-1} X^T \mathbf y$:
    - What are the dimensions of $X^TX$?
    - What about $X^Ty$?

2. Check using NumPy that the product $$X^TX = \begin{bmatrix}1&2&3\\ 4&5&6\end{bmatrix}\begin{bmatrix}1&4\\ 2&5\\ 3&6\end{bmatrix}$$ can be written as the sum 

$$
\begin{bmatrix}3\\ 6\end{bmatrix} \begin{bmatrix}3& 6\end{bmatrix}
+ \begin{bmatrix}2\\ 5\end{bmatrix} \begin{bmatrix}2& 5\end{bmatrix}
+ \begin{bmatrix}1\\ 4\end{bmatrix} \begin{bmatrix}1& 4\end{bmatrix}
$$
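One way to carry out the check in item 2 is sketched below. Here the first matrix in the product plays the role of $X^T$, so $X$ is its transpose, and each term in the sum is the outer product of one row of $X$ with itself (`np.outer`).

```python
import numpy as np

Xt = np.array([[1, 2, 3],
               [4, 5, 6]])   # X^T in the exercise
X = Xt.T                     # so X has rows [1, 4], [2, 5], [3, 6]

full_product = X.T @ X

# Sum of outer products of the rows of X (the columns of X^T)
outer_sum = sum(np.outer(row, row) for row in X)

print(full_product)
print(np.array_equal(full_product, outer_sum))  # True
```

The same identity, applied to blocks of rows instead of single rows, is what lets each mapper compute its own $X_i^TX_i$.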

In [1]:
import pandas as pd

#Import airline dataset
airline = pd.read_csv('airline_small.csv', encoding='ISO-8859-1')

#Create delayed column (1 if ArrDelay > 20, 0 otherwise)
airline['Delayed'] = (airline['ArrDelay'] > 20).astype(int)

#Subset to two columns and drop null values
sub_airline = airline[['Distance', 'Delayed']]
sub_airline = sub_airline.dropna()

sub_airline.head()



   Distance  Delayed
0     308.0        0
1     296.0        0
2     480.0        1
3     296.0        0
4     373.0        0


In [2]:
import numpy as np
from sklearn.linear_model import LogisticRegression

def map_linear_fn(chunk):
    # Note: despite its name, this mapper fits a *logistic* regression
    # Get the dependent variable (label) y
    y = chunk['Delayed'].values

    # Get the independent/feature variable: the Distance column,
    # reshaped into a single-column 2-D array as sklearn expects
    x = chunk['Distance'].values.reshape(-1, 1)

    # Initialise our logistic regression model
    log_reg = LogisticRegression(solver = 'lbfgs')

    # Train on this chunk (subset) of the data only
    log_reg.fit(x, y)

    # Combine the intercept and the coefficient, because together they are
    # all the parameters of our logistic regression model
    parameters = np.append(np.array(log_reg.intercept_), log_reg.coef_)

    # Yield the parameters p_i estimated from this chunk i
    yield ('result', parameters)

def reduce_linear_fn(key, values):

    # The reducer's job is to average the results from the independent jobs.
    # values may be a generator (which can only be iterated over once),
    # so we copy its contents into a list first.
    # Unlike the linear regression example, averaging per-chunk fits only
    # approximates the full-data fit (compare the two outputs below).
    param_list = []
    for param in values:
        param_list.append(np.array(param))

    # Average the parameters element-wise: mean(p_1, ..., p_m) => p
    avg_params = np.mean(param_list, axis=0)

    # Return the final result
    yield key, avg_params

In [3]:
from mockr import run_pandas_job
results = run_pandas_job(sub_airline, map_linear_fn, reduce_linear_fn, n_chunks =100)

print(results)

[('result', array([ -1.88075578e+00,   1.76807716e-04]))]


In [4]:
from sklearn.linear_model import LogisticRegression

y = sub_airline['Delayed']

x = sub_airline['Distance'].values.reshape(-1,1)

log_res = LogisticRegression()

log_res.fit(x, y)

params = np.append(np.array(log_res.intercept_), log_res.coef_)

print(params)

[ -1.80835051e+00   1.39805132e-04]
