# Introduction to MapReduce and PySpark  

#### A few words about MapReduce
MapReduce is a programming model, with an associated implementation, for processing and generating big data sets with a parallel, distributed algorithm on a cluster. A MapReduce program is composed of a map procedure, which filters and transforms the input into key/value pairs, and a reduce procedure, which performs a summary operation over all values that share a key. It has been implemented in several systems, including Google's internal implementation and the popular open-source Apache Hadoop, which stores its data in the Hadoop Distributed File System (HDFS). It is well suited to processing huge datasets in a distributed fashion: the programmer only supplies the two functions, *Map* and *Reduce*, while the system takes care of parallelizing and distributing the work. For more details, read Chapter 2 of the book (pages 23-35).
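The division of work described above can be sketched in plain Python. The helper `run_map_reduce` below is a hypothetical single-process illustration, not part of Hadoop or Spark: it applies the map function to every input record, groups the emitted (key, value) pairs by key (the "shuffle" that a real system performs across machines) and then calls the reduce function once per key.

```python
from collections import defaultdict

def run_map_reduce(inputs, map_fn, reduce_fn):
    """Hypothetical single-process MapReduce driver, for illustration only."""
    groups = defaultdict(list)
    for record in inputs:
        # Map phase: each input record produces zero or more (key, value) pairs
        for key, value in map_fn(record):
            # Shuffle phase: collect all values that share the same key
            groups[key].append(value)
    # Reduce phase: one call per distinct key
    return {key: reduce_fn(key, values) for key, values in groups.items()}

# Word count expressed with the two user-supplied functions
lines = ["Hello Hello my name is George", "George George"]
counts = run_map_reduce(
    lines,
    map_fn=lambda line: [(word, 1) for word in line.split()],
    reduce_fn=lambda word, ones: sum(ones),
)
print(counts)  # {'Hello': 2, 'my': 1, 'name': 1, 'is': 1, 'George': 3}
```

In a real cluster the map calls, the grouping and the reduce calls run on different machines; the user code stays the same two small functions.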

In this tutorial we will see some examples that are easy to solve with MapReduce logic, and we will walk through the *Map* and *Reduce* functions step by step to better understand the algorithms. Then we will move to __[Spark](https://spark.apache.org/docs/0.9.0/index.html)__, a framework that supports the MapReduce style of programming. Apache Spark is a general-purpose cluster-computing system. It provides APIs for several programming languages, but we will only use its Python API, called PySpark.

First we have to understand the algorithms behind Map and Reduce, so we will solve the word-counting problem using only these two functions. The task is to implement a mapper and a reducer so that the user feeds the system a text and the output lists each unique word of the text together with its frequency.

##### Example
System-Input  :  "Hello Hello my name is George George George"<br>
System-Output : {"Hello" : 2 , "my" : 1 , "name" : 1 , "is" : 1 , "George" : 3}

#### Define the function "mapper", which splits the line into words and emits a key-value pair for each word.
#### def mapper: 
 - Input : text
 - Body  : split the line into words and generate tuples of the form (word, 1)
 - Output: return the list of generated tuples, where the word is the key and 1 is its count

In [None]:
def mapper(line):
    words = line.split()
    # emit a (word, 1) pair for every occurrence of a word
    tups = [(w, 1) for w in words]
    return tups

#### Define the function "reducer", which takes the mapper's output as input, groups it by key (word) and sums up the counts of each word.

#### def reducer:
 - Input  : output of mapper
 - Body   : group by key and sum up the counts
 - Output : dictionary with the word as key and its count as value

In [None]:
from collections import defaultdict

def reducer(tups):
    d = defaultdict(int)
    for w, c in tups:
        d[w] += c
    return d

#### Test mapper's and reducer's functionality using a trivial string as input.

In [None]:
# Input: line
line = 'Advances in Data Mining Mining Data in Advances Data Mining in Advances in in Advances George'
#
# Call the mapper using the above string as input
tups = mapper(line)
#
# Call reducer using the result of the mapper
red_tups = reducer(tups)
#
# Print your results
print("Mapper Output:")
for t in tups:
    print(t)

print("\nReducer Output:")
print(red_tups)

<a id='Map'></a>
#### Now, instead of a single string, we have a list of strings as input. <br>
We could call 'mapper' repeatedly inside a for loop that iterates over the strings of the list. Alternatively, in a functional style, we can use Python's built-in function `map` and apply 'mapper' directly to our data: `map` applies our function to each element of the input (here, each string) and yields the results one by one.
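One detail worth knowing: in Python 3, `map` does not build its result immediately. It returns a lazy iterator that calls the function only when elements are requested, which is why the code that follows wraps `map(mapper, doc)` in `list(...)`. In this small sketch the `calls` list and the `shout` function are made up purely to make the evaluation order visible.

```python
calls = []

def shout(word):
    calls.append(word)   # record each call to see when map actually runs
    return word.upper()

lazy = map(shout, ["spark", "hadoop"])
print(calls)             # [] : nothing has been computed yet
result = list(lazy)      # consuming the iterator triggers the calls
print(result)            # ['SPARK', 'HADOOP']
print(list(lazy))        # [] : a map iterator can only be consumed once
```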

In [None]:
# Input : List
doc = ['Advances in Data Mining Mining Data in Advances Data Mining in Advances in in Advances',
      'Science Fiction Science Movies Star Wars Star Wars',
      'Advances Data Star Science Wars Advances Star Trek']
# Call the 'mapper' function in a loop
for i in doc:
    print(mapper(i))

In [None]:
# Use function map
# Basic syntax: map(function, data)
l = list(map(mapper, doc))
l

By using functional tools we can avoid explicit for loops. Moreover, this style fits distributed computing well: when documents live in a distributed file system, we want to apply the same function to each chunk of data, wherever that chunk is stored.
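To make the "apply a function to each chunk" idea concrete, here is a sketch that splits the input into chunks, counts words in each chunk independently with `collections.Counter`, and merges the partial counts. It uses `concurrent.futures.ThreadPoolExecutor.map` only to show the structure; the chunk size of 2 and the helper `count_chunk` are assumptions for illustration, and for CPU-bound pure-Python work, threads give little real speed-up because of the GIL.

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor

def count_chunk(chunk):
    """Count words in one chunk (a list of lines) independently of the others."""
    counter = Counter()
    for line in chunk:
        counter.update(line.split())
    return counter

docs = ['Advances in Data Mining Mining Data in Advances Data Mining in Advances in in Advances',
        'Science Fiction Science Movies Star Wars Star Wars',
        'Advances Data Star Science Wars Advances Star Trek']

# Split the input into chunks of at most 2 lines each
chunks = [docs[i:i + 2] for i in range(0, len(docs), 2)]

# Apply the same function to every chunk
with ThreadPoolExecutor(max_workers=2) as pool:
    partial_counts = list(pool.map(count_chunk, chunks))

# Merge the per-chunk results (Counter objects support +)
total = sum(partial_counts, Counter())
print(total)
```

Merging works here because adding counts is associative, so the order in which chunks are processed does not matter.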

As we can see, `map` produced a list of lists, one for each string. With the function below we can flatten them into a single list and then call the reducer.

In [None]:
flatten = lambda l: [item for sublist in l for item in sublist]
reducer(flatten(l))
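The standard library offers the same flattening through `itertools.chain.from_iterable`, which avoids the nested comprehension and works lazily on any iterable of iterables. A minimal sketch with a small hand-written list of lists (illustrative data shaped like the output of `map(mapper, doc)`):

```python
from itertools import chain

nested = [[('Star', 1), ('Wars', 1)], [('Star', 1), ('Trek', 1)]]
flat = list(chain.from_iterable(nested))
print(flat)  # [('Star', 1), ('Wars', 1), ('Star', 1), ('Trek', 1)]
```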

## Other MapReduce Examples 
### Matrix Multiplication using MapReduce


 - #### For this example you need to define the function 'mapper_matmul', which takes two matrices in sparse format as input and emits their indices and values.

##### Let us assume that we have two 5×5 matrices in the following sparse format:

A,0,0,63 <br>
A,0,1,45<br>
A,0,2,93<br>
A,0,3,32<br>
A,0,4,49<br>
    ... <br>
    ...  
B,0,0,63<br>
B,0,1,18<br>
B,0,2,89<br>
B,0,3,28<br>
B,0,4,39<br>
...<br>
...
************
##### This is our input file which contains 4 columns. 
1. The first column denotes the name of the matrix
2. The second indicates the row
3. The third indicates the column
4. The fourth contains the value of that entry of the matrix

##### Note that the sparse representations of A and B list only the indices of non-zero values.
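For intuition about where such a file could come from, here is a sketch that writes a small dense matrix (a list of lists) into the same `name,row,col,value` line format, skipping zero entries. The helper name `to_sparse_lines` and the example matrix are hypothetical; the original `input.txt` may have been produced differently.

```python
def to_sparse_lines(name, matrix):
    """Emit 'name,row,col,value' strings for the non-zero entries of a dense matrix."""
    lines = []
    for i, row in enumerate(matrix):
        for j, value in enumerate(row):
            if value != 0:                      # zeros are simply not stored
                lines.append("%s,%d,%d,%d" % (name, i, j, value))
    return lines

A = [[63, 0],
     [0, 7]]
print(to_sparse_lines("A", A))  # ['A,0,0,63', 'A,1,1,7']
```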


#### Load the <font color=red>*input.txt*</font> file using the script below

In [None]:
matrices = []
with open('input.txt') as inputfile:
    for line in inputfile:
        line_ = line.strip()
        if line_:  # skip empty lines, e.g. a trailing newline at the end of the file
            matrices.append(line_)

matrices

#### Define the first function <font color=red>*mapper_matmul*</font>  
#### def mapper_matmul: 
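 - Input : the lines of the two sparse matrices A & B (input.txt), the number of rows of A and the number of columns of B
 - Body  : for each entry A[i,j], emit one record for every column k of B; for each entry B[j,k], emit one record for every row i of A
 - Output: return the records as tab-separated strings `i,k  j  value`, where `i,k` is the output cell (the key) and `j` is the shared inner index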

In [None]:
def mapper_matmul(x, num_rows_A, num_cols_B):
    tups = []
    for line in x:
        matrix_index, row, col, value = line.rstrip().split(",")
        if matrix_index == "A":
            # A[i,j] contributes to every output cell (i,k), k = 0..num_cols_B-1
            for k in range(num_cols_B):
                record = "%s,%s\t%s\t%s" % (row, k, col, value)
                tups.append(record)
                print(record)
        else:
            # B[j,k] contributes to every output cell (i,k), i = 0..num_rows_A-1
            for i in range(num_rows_A):
                record = "%s,%s\t%s\t%s" % (i, col, row, value)
                tups.append(record)
                print(record)
    return tups

#### Test the functionality of <font color=red>*mapper_matmul*</font>  using the object <font color=red>*matrices*</font> and see the output

In [None]:
map_out = mapper_matmul(matrices,5,5)

#### Now define the function <font color=red>*reducer_matmul*</font>, which takes the <font color=red>*mapper_matmul*</font> output as input and computes the dot product of the values for each key.
#### def reducer_matmul: 
 - Input : Mapper Output (index, row/col, value)
 - Body  : Dot product of the values
 - Output: return the index and the result of the product

In [None]:
from operator import itemgetter

def dot_product(value_list):
    # Sort (j, value) pairs by j: an A value and a B value sharing j become adjacent
    value_list = sorted(value_list, key=itemgetter(0))
    result = 0
    i = 0
    while i < len(value_list) - 1:
        if value_list[i][0] == value_list[i + 1][0]:
            result += value_list[i][1] * value_list[i + 1][1]
            i += 2
        else:
            # lone value: its partner was zero and is not stored in the sparse input
            i += 1
    return result

def reducer_matmul(mapper_out):
    prev_index = None
    value_list = []
    tups = []
    # Sorting brings all records with the same key "i,k" next to each other,
    # as the shuffle phase would in a real MapReduce system
    for line in sorted(mapper_out):
        curr_index, index, value = line.strip().split("\t")
        index, value = int(index), int(value)
        if curr_index == prev_index:
            value_list.append((index, value))
        else:
            # A new key starts: emit the result for the previous key, if any
            if prev_index is not None:
                result = dot_product(value_list)
                print("%s,%s" % (prev_index, result))
                tups.append("%s,%s" % (prev_index, result))
            prev_index = curr_index
            value_list = [(index, value)]

    # Emit the result for the last key (nothing to do if the input was empty)
    if prev_index is not None:
        result = dot_product(value_list)
        print("%s,%s" % (prev_index, result))
        tups.append("%s,%s" % (prev_index, result))
    return tups

#### Let's test the reducer's result 

In [None]:
matMul = reducer_matmul(map_out)
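To check the two-phase logic end to end without an input file, the sketch below replays a compact in-memory variant of the same scheme: the map step replicates each A entry to every output column and each B entry to every output row, keyed by the output cell (i, k) and tagged with the shared index j; a dictionary plays the role of the shuffle; and the reduce step multiplies the A and B values that share a j. The small matrices and the helper name `matmul_map_reduce` are made up for illustration, and the result is compared with a plain triple loop.

```python
from collections import defaultdict

def matmul_map_reduce(A, B):
    """Multiply dense lists-of-lists A (L x M) and B (M x N) with map/shuffle/reduce steps."""
    L, M, N = len(A), len(B), len(B[0])
    groups = defaultdict(list)
    # Map: A[i][j] is needed by every output cell (i, k); B[j][k] by every cell (i, k)
    for i in range(L):
        for j in range(M):
            if A[i][j] != 0:
                for k in range(N):
                    groups[(i, k)].append(("A", j, A[i][j]))
    for j in range(M):
        for k in range(N):
            if B[j][k] != 0:
                for i in range(L):
                    groups[(i, k)].append(("B", j, B[j][k]))
    # Reduce: for each output cell, multiply the A and B values that share the same j
    P = [[0] * N for _ in range(L)]
    for (i, k), values in groups.items():
        a_vals = {j: v for tag, j, v in values if tag == "A"}
        b_vals = {j: v for tag, j, v in values if tag == "B"}
        P[i][k] = sum(a_vals[j] * b_vals[j] for j in a_vals if j in b_vals)
    return P

A = [[1, 2, 0], [0, 3, 4]]
B = [[5, 6], [7, 0], [0, 8]]
expected = [[sum(A[i][j] * B[j][k] for j in range(3)) for k in range(2)] for i in range(2)]
print(matmul_map_reduce(A, B), expected)  # [[19, 6], [21, 32]] [[19, 6], [21, 32]]
```

Tagging each value with its matrix name is a design choice of this sketch: it pairs A and B values explicitly instead of relying on their adjacency after sorting, which keeps the reducer correct when one of the two partners is missing because it was zero.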

<a id='SettingSpark'></a>
# Moving to PySpark
0. #### Install PySpark with `conda install -c conda-forge pyspark` (or `pip install pyspark`). Depending on your setup, you may also have to configure the PySpark environment in your .bashrc or .zshrc file.
1. #### Load the essential modules from pyspark.
2. #### Set the application name to AiDM.
3. #### Set the master to "local".
4. #### Create the SparkContext with this configuration.

In [None]:
from pyspark import SparkContext, SparkConf  # Step 1
conf = SparkConf().setAppName('AiDM').setMaster("local")  # Steps 2 & 3
# Step 4: getOrCreate reuses the existing context if this cell is run twice
sc = SparkContext.getOrCreate(conf=conf)

#### Check PySpark's configuration; the output below includes a link to the Spark UI

In [None]:
sc

### Load a large text dataset and implement word counting with the MapReduce framework in <font color=red>__[PySpark](https://spark.apache.org/docs/0.9.0/python-programming-guide.html)__</font>

In [None]:
results = []
with open('example_text.txt') as inputfile:
    for line in inputfile:
        # keep only the text before the first comma on each line
        line_ = line.strip().split(',')
        if line_ != ['']:  # skip empty lines
            results.append(line_[0])

results

#### Define a function for testing the format of results in order to use PySpark

In [None]:
def CHECK(results):
    # results must be a non-empty list whose elements are all strings
    if isinstance(results, list) and results and all(isinstance(r, str) for r in results):
        print(">_ Bingo! you can proceed  >_")
        return 1
    else:
        print("Check the structure of your data and try again...")
        return 0

#### Check if object **'results'** is in a good format
We need a list that contains strings,
so each line of the text has to be a string in the list.
Use the function <font color=red>CHECK</font> for this job.

In [None]:
CHECK(results)

#### Now you can parallelize your data with the PySpark function <font color=red>*parallelize*</font>, called on the SparkContext <font color=red>*sc*</font> from [Section: Moving to PySpark](#SettingSpark)

Function <font color=red>*sc.parallelize*</font> takes a collection that is already in the driver's memory (here, a Python list) and splits it into partitions that can be processed in parallel. It returns a __[ParallelCollectionRDD](http://spark.apache.org/docs/2.1.0/api/python/pyspark.html#pyspark.RDD)__ object, which the PySpark framework can manage. Data stored in a distributed file system such as HDFS is not loaded this way; it is read directly into an RDD with functions such as `sc.textFile`.
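Conceptually, `parallelize` cuts a local collection into partitions (slices) that can be processed independently; the number of slices can be passed as the optional `numSlices` argument, and in PySpark the actual partitions can be inspected with `rdd.glom().collect()`. The plain-Python sketch below only illustrates splitting a list into contiguous, roughly equal slices; the helper `slice_collection` is hypothetical and is not Spark's internal code.

```python
def slice_collection(data, num_slices):
    """Split a list into num_slices contiguous slices whose sizes differ by at most one."""
    n = len(data)
    return [data[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]

print(slice_collection(["line 1", "line 2", "line 3", "line 4", "line 5"], 2))
# [['line 1', 'line 2'], ['line 3', 'line 4', 'line 5']]
```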

In [None]:
docs_rdd = sc.parallelize(results)
docs_rdd

As in the earlier [map()](#Map) example, apply the function <font color=red>*mapper*</font> to docs_rdd and retrieve the output with the Spark function <font color=red>*collect()*</font>. Here we use <font color=red>*flatMap*</font>, which applies the function to each element and flattens the returned lists into a single RDD of (word, 1) elements.

In [None]:
tups_rdd = docs_rdd.flatMap(mapper)
tups_rdd.collect()

Now use the function <font color=red>*reduceByKey()*</font>, which merges the values of each key (here, the word) with the given function, in this case adding up the word frequencies.
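A rough plain-Python picture of what `reduceByKey` does: each partition first combines the values of its own keys locally (a map-side combine), and only the partial results are then merged per key across partitions. Because the function may be applied in any grouping, it should be associative and commutative, as addition is. The function `reduce_by_key` below is a hypothetical single-process sketch, not Spark's implementation.

```python
def reduce_by_key(partitions, func):
    """Combine values per key inside each partition, then merge the partial results."""
    partials = []
    for partition in partitions:
        local = {}
        for key, value in partition:             # map-side combine within one partition
            local[key] = func(local[key], value) if key in local else value
        partials.append(local)
    merged = {}
    for local in partials:                       # merge partial results across partitions
        for key, value in local.items():
            merged[key] = func(merged[key], value) if key in merged else value
    return sorted(merged.items())

partitions = [[("Star", 1), ("Wars", 1), ("Star", 1)],
              [("Star", 1), ("Trek", 1)]]
print(reduce_by_key(partitions, lambda a, b: a + b))
# [('Star', 3), ('Trek', 1), ('Wars', 1)]
```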

In [None]:
tups_rdd.reduceByKey(lambda a,b: a+b).collect()

## Word Counting in Functional-MapReduce fashion
#### All the above procedure can be written in only three lines using PySpark's API and MapReduce logic

In [None]:
# First Parallelize input data 
text_file = sc.parallelize(results)
#
# 1:   Split each line on whitespace and flatten the result into one RDD of words (.flatMap)
# 2:   Then emit each word together with the number one (.map)
# 3:   Then sum up, per key (word), the ones that the mapper emits (.reduceByKey)
#
counts = text_file.flatMap(lambda line: line.split()) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)
counts.collect()

#### TF-IDF with PySpark
Term Frequency–Inverse Document Frequency is a numerical statistic that reflects how important a word is to a document within a corpus of documents. It is mainly used in information retrieval to rank documents when answering queries.

\begin{equation*}
TF(t) = \frac{\text{number of times term } t \text{ appears in a document}}{\text{total number of terms in the document}}
\end{equation*}
\begin{equation*}
IDF(t) = \log \frac{\text{total number of documents}}{\text{number of documents with term } t \text{ in it}}
\end{equation*}
\begin{equation*}
TFIDF = TF \times IDF
\end{equation*}
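A small pure-Python sketch of these textbook formulas, using the natural logarithm (the base is a convention) and the four documents that also appear in the PySpark example. It follows the definitions above, not Spark's implementation, so its numbers will not match Spark's output exactly; the function name `tf_idf` is hypothetical.

```python
import math

def tf_idf(documents):
    """Textbook TF-IDF for a list of whitespace-tokenised documents."""
    tokenised = [doc.split() for doc in documents]
    n_docs = len(tokenised)
    # Document frequency: in how many documents each term occurs at least once
    df = {}
    for tokens in tokenised:
        for term in set(tokens):
            df[term] = df.get(term, 0) + 1
    scores = []
    for tokens in tokenised:
        doc_scores = {}
        for term in set(tokens):
            tf = tokens.count(term) / len(tokens)    # relative term frequency
            idf = math.log(n_docs / df[term])         # 0 when the term is in every document
            doc_scores[term] = tf * idf
        scores.append(doc_scores)
    return scores

doc_1 = ['Advances Advances Advances Computer Computer Science Science George',
         'Advances Science Science Science George',
         'Advances George Science Science',
         'Science Science George']
scores = tf_idf(doc_1)
print(sorted(scores[0].items()))
```

"Science" and "George" occur in every document, so their IDF, and therefore their TF-IDF, is 0, while "Computer", which occurs in only one document, gets the highest score in the first document.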

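In this example, TF-IDF is computed with the hashing trick: each term is mapped to a vector index by applying a hash function and taking the result modulo the number of features (2^20 by default in `pyspark.mllib`'s `HashingTF`), so no vocabulary has to be stored. Terms do not receive unique ids: distinct terms can collide on the same index, although collisions are rare when the number of features is large. Note also that `HashingTF` produces raw term counts rather than relative frequencies, and spark.mllib's `IDF` uses the smoothed variant log((|D| + 1) / (DF(t) + 1)), where |D| is the number of documents and DF(t) the number of documents containing t.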
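The idea of the hashing trick can be sketched in a few lines: a term's vector index is its hash value modulo the number of features, and the term's count is added at that index. This sketch uses `zlib.crc32` as a deterministic stand-in hash (Python's built-in `hash` of strings is randomised per process) and a deliberately tiny `num_features` to make collisions likely; Spark's own hash function differs, and the helper `hashing_tf` is hypothetical.

```python
import zlib
from collections import defaultdict

def hashing_tf(tokens, num_features):
    """Map each term to index crc32(term) % num_features and count occurrences there."""
    vector = defaultdict(int)            # sparse vector: index -> count
    for term in tokens:
        index = zlib.crc32(term.encode("utf-8")) % num_features
        vector[index] += 1               # colliding terms share (and add to) one slot
    return dict(vector)

tokens = "Advances Advances Advances Computer Computer Science Science George".split()
print(hashing_tf(tokens, num_features=16))
print(hashing_tf(tokens, num_features=1 << 20))
```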

In [None]:
from pyspark.mllib.feature import HashingTF, IDF

doc_1 = ['Advances Advances Advances Computer Computer Science Science George',
         'Advances Science Science Science George',
         'Advances George Science Science',
         'Science Science George']

# Reuse the SparkContext sc created above
doc = sc.parallelize(doc_1)
# Load documents (one per line) and split each one into a list of terms
documents = doc.map(lambda line: line.split(" "))

hashingTF = HashingTF()
tf = hashingTF.transform(documents)

# While applying HashingTF only needs a single pass over the data, applying IDF needs two passes:
# first to compute the IDF vector and second to scale the term frequencies by IDF.
tf.cache()
idf = IDF().fit(tf)
tfidf = idf.transform(tf)

# spark.mllib's IDF implementation provides an option for ignoring terms
# which occur in less than a minimum number of documents.
# In such cases, the IDF for these terms is set to 0.
# This feature can be used by passing the minDocFreq value to the IDF constructor.
idfIgnore = IDF(minDocFreq=2).fit(tf)
tfidfIgnore = idfIgnore.transform(tf)

In [None]:
tfidf.collect()

# Matrix Multiplication in Spark (pyspark)

\begin{equation*}
\mathbf{A} \times \mathbf{B} = \mathbf{P}
\end{equation*}

\begin{equation*}
\begin{bmatrix}
a_{00} & a_{01} & a_{02} \\
a_{10} & a_{11} & a_{12}
\end{bmatrix}
\times
\begin{bmatrix}
b_{00} & b_{01} \\
b_{10} & b_{11} \\
b_{20} & b_{21}
\end{bmatrix}
=
\begin{bmatrix}
a_{00} b_{00} + a_{01} b_{10} + a_{02} b_{20} & a_{00} b_{01} + a_{01} b_{11} + a_{02} b_{21} \\
a_{10} b_{00} + a_{11} b_{10} + a_{12} b_{20} & a_{10} b_{01} + a_{11} b_{11} + a_{12} b_{21}
\end{bmatrix}
\end{equation*}

#### Generate random matrices <font color=red>A</font> and <font color=red>B</font>

In [None]:
import numpy as np
L=2
M=3
N=2
a_shape = (L,M)
b_shape = (M,N)
p_shape = (a_shape[0], b_shape[1])


a = np.random.randint(100, size = a_shape)
b = np.random.randint(100, size = b_shape)

#### Convert them to sparse format

In [None]:
def sparsify(a):
    rv = []
    for i, line in enumerate(a):
        for (j, elem) in enumerate(line):
            if elem!=0: rv.append(((i,j),elem))
    return rv
        
sparse_a = sparsify(a)
sparse_b = sparsify(b)

In [None]:
def desparsify(sparse, shape):
    a = np.zeros(shape)
    for (i, j), e in sparse:
        a[i][j] = e
    return a

#### Check if the transformation is correct

In [None]:
np.array_equal(a, desparsify(sparse_a, a_shape)) and \
np.array_equal(b, desparsify(sparse_b, b_shape))

#### Define two mappers, one for each matrix.<br>
The mappers have to produce:<br>
For each element (i,j) of A, emit((i,k), (j, A[i,j])) for k = 0, ..., N-1, where N is the number of columns of matrix B<br>
For each element (j,k) of B, emit((i,k), (j, B[j,k])) for i = 0, ..., L-1, where L is the number of rows of matrix A

In [None]:
# First construct an RDD object
a_rdd = sc.parallelize(sparse_a)

def a_mapper(elem, N):
    (i, j), Aij = elem
    acc = []
    for k in range(N):
        acc.append(((i,k),(j, Aij)))
    return acc
# Map a_mapper to a_rdd object
a_interm = a_rdd.flatMap(lambda e: a_mapper(e, b_shape[1]))
a_interm.collect()

In [None]:
b_rdd = sc.parallelize(sparse_b)

def b_mapper(elem, L):
    (j, k), Bjk = elem
    acc = []
    for i in range(L):
        acc.append(((i,k),(j, Bjk)))
    return acc
# L = number of rows of A (a_shape[0]), as required by the emit rule for B
b_interm = b_rdd.flatMap(lambda e: b_mapper(e, a_shape[0]))
b_interm.collect()

#### Concatenate the two outputs in one by taking the union

In [None]:
all_interm = a_interm.union(b_interm)
all_interm.collect()

#### Define the function reducer, which computes the dot product of the values for each key

In [None]:
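def reducer(tup):
    idx, vals = tup
    # Group the values by the shared index j: A[i,j] and B[j,k] both arrive tagged with j
    by_j = {}
    for j, v in vals:
        by_j.setdefault(j, []).append(v)
    # Only indices j where both A[i,j] and B[j,k] are present contribute;
    # a lone value means its partner was zero and was dropped by sparsify
    return (idx, sum(v[0] * v[1] for v in by_j.values() if len(v) == 2))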

#### Use PySpark's functional API and plug the functions into data
1. Group the 'all_interm' RDD object by key with groupByKey
2. Then map the reducer onto the grouped data
3. Collect the outcome

In [None]:
lines = all_interm \
    .groupByKey() \
    .map(reducer) \
    .collect()

lines

#### Check that the matrix multiplication is correct by comparing with NumPy's result

In [None]:
np.dot(a,b)

### Useful links 
__[Map-Reduce Examples](http://www.science.smith.edu/dftwiki/index.php/Map-Reduce_Examples)__ <br>
__[Hadoop-MapReduce](https://www.tutorialspoint.com/hadoop/hadoop_mapreduce.htm)__ <br>
__[Python Functional tools](http://book.pythontips.com/en/latest/map_filter.html)__ <br>
__[Python Examples](https://medium.com/@nidhog/how-to-quickly-write-a-mapreduce-framework-in-python-821a79fda554)__ <br>
__[Lambda](https://www.python-course.eu/lambda.php)__

