# Big Data Platform
## Assignment 2: MapReduce

**By:**  

Alexander Gorelik

<br><br>

**The goal of this assignment is to:**
- Understand and practice the details of MapReduceEngine

**Instructions:**
- Students will form teams of two people each, and submit a single homework for each team.
- The same score for the homework will be given to each member of your team.
- Your solution is in the form of a Jupyter notebook file (with extension ipynb).
- Images/Graphs/Tables should be submitted inside the notebook.
- The notebook should be runnable and properly documented. 
- Please answer all the questions and include all your code.
- You are expected to submit clear, Pythonic code.
- You may change function signatures/definitions.

**Submission:**
- Submission of the homework will be done via Moodle by uploading a Jupyter notebook.
- The homework needs to be entirely in English.
- The deadline for submission is on Moodle.
- Late submission won't be allowed.
  
  
- In case of identical code submissions, both groups will get a zero.
- Some groups might be selected randomly to present their code.

**Requirements:**  
- Python 3.6 should be used.  
- You should implement the algorithms by yourself using only basic Python libraries (such as numpy, pandas, etc.)

<br><br><br><br>

**Grading:**
- Q1 - 5 points - Initial Steps
- Q2 - 50 points - MapReduceEngine
- Q3 - 30 points - Implement the MapReduce Inverted index of the JSON documents
- Q4 - 5 points - Testing Your MapReduce
- Q5 - 10 points - Final Thoughts 

`Total: 100`

**Prerequisites**

In [16]:
# example
!pip install --quiet zipfile36

**Imports**

In [2]:
# general
import os
import time
import random
import warnings
import glob
import threading # you can use easier threading packages
import concurrent.futures
import csv
import sqlite3
from sqlite3 import Error

# ml
import numpy as np
import scipy as sp
import pandas as pd

# visual
import seaborn as sns
import matplotlib.pyplot as plt

# notebook
from IPython.display import display

**Hide Warnings**

In [18]:
warnings.filterwarnings('ignore')

**Disable Autoscrolling**

In [19]:
%%javascript
IPython.OutputArea.prototype._should_scroll = function(lines) {
    return false;
}


**Set Random Seed**

In [20]:
random.seed(123)

<br><br><br><br>
# Question 1
# Initial Steps

Write Python code to create 20 different CSV files in this format: `myCSV[Number].csv`, where each file contains 10 records. 

The schema is `('firstname','secondname','city')`  

Values should be randomly chosen from the lists: 
- `firstname` : `[John, Dana, Scott, Marc, Steven, Michael, Albert, Johanna]`  
- `city` : `[New York, Haifa, München, London, Palo Alto, Tel Aviv, Kiel, Hamburg]`  
- `secondname`: any value  

In [21]:
firstname = ['John', 'Dana', 'Scott', 'Marc', 'Steven', 'Michael', 'Albert', 'Johanna']
# city names spelled exactly as in the specification (spaces and umlaut included)
city = ['New York', 'Haifa', 'München', 'London', 'Palo Alto', 'Tel Aviv', 'Kiel', 'Hamburg']
secondname = ['Stroul', 'Gorelik', 'Snow', 'White', 'Ridley', 'Wahlberg', 'Hawking', 'Jordan']

In [22]:
for csv_index in range(1, 21):
    # each file gets 10 random (firstname, secondname, city) records
    rows = [[random.choice(firstname), random.choice(secondname), random.choice(city)]
            for _ in range(10)]
    df = pd.DataFrame(rows, columns=['firstname', 'secondname', 'city'])
    df.to_csv(f'myCSV{csv_index}.csv', index=False)

Use Python to create the `mapreducetemp` and `mapreducefinal` folders.

In [23]:
mapreducetmp_dir = './mapreducetemp'
mapreducefinal_dir = './mapreducefinal'

In [24]:
for folder in [mapreducetmp_dir, mapreducefinal_dir]:
    # exist_ok=True makes the cell safe to re-run
    os.makedirs(folder, exist_ok=True)

<br><br><br>
# Question 2
## MapReduceEngine

Write Python code to create an SQLite database with the following table:

`TableName: temp_results`   
`schema: (key:TEXT,value:TEXT)`

In [25]:
database = 'tmp_database.db'

CREATE_SCHEMA_STR = '''CREATE TABLE IF NOT EXISTS temp_results (
                           key TEXT,
                           value TEXT
                       );'''

conn = None
try:
    conn = sqlite3.connect(database)
    c = conn.cursor()
    c.execute(CREATE_SCHEMA_STR)
    conn.commit()
except Error as e:
    print(e)
finally:
    # close the connection so the database file can be reopened (and deleted) later
    if conn:
        conn.close()

1. **Create a Python class** `MapReduceEngine` with a method `def execute(input_data, map_function, reduce_function)`, such that:
    - `input_data`: is an array of elements
    - `map_function`: is a reference to a Python function that returns a list in which each entry has the form (key, value)
    - `reduce_function`: is a reference to a Python function that returns a list in which each entry has the form (key, value)
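Before adding threads, temporary files and SQLite, it may help to see the contract of `execute` in its simplest form. The following is a minimal single-threaded, in-memory sketch; the name `run_mapreduce_sequential` and the word-count functions are hypothetical, and, unlike the notebook's reducer (which receives a comma-joined string), the reducer here is assumed to receive `(key, list_of_values)`. It maps every input element, groups the emitted pairs by key (the shuffle), and calls the reducer once per key in sorted key order.

```python
from collections import defaultdict


def run_mapreduce_sequential(input_data, map_function, reduce_function):
    """Hypothetical single-threaded reference for execute(): map -> shuffle -> reduce."""
    # map phase: every input element yields a list of (key, value) pairs
    pairs = []
    for element in input_data:
        pairs.extend(map_function(element))

    # shuffle phase: gather all values that share the same key
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)

    # reduce phase: one call per key, in sorted key order
    results = []
    for key in sorted(grouped):
        results.extend(reduce_function(key, grouped[key]))
    return results


def word_map(doc):
    # emit (word, 1) for every word in the document
    return [(word, 1) for word in doc.split()]


def word_reduce(key, values):
    # sum the counts collected for one word
    return [(key, sum(values))]


docs = ['apple orange', 'orange plum', 'apple apple']
print(run_mapreduce_sequential(docs, word_map, word_reduce))
# [('apple', 3), ('orange', 2), ('plum', 1)]
```

The threaded engine below performs the same three phases, but writes the map output to CSV files and delegates the grouping to SQL.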

<br><br>

**Implement** the following functionality in the `execute(...)` function:

<br>

1. For each key from the `input_data`, start a new Python thread that executes `map_function(key)`.
<br><br>
2. Each thread will store the results of `map_function` in `mapreducetemp/part-tmp-X.csv`, where X is a unique number per thread.
<br><br>
3. Keep a list of all threads and check whether they have completed.
<br><br>
4. Once all threads have completed, load the contents of all CSV files into the `temp_results` table in SQLite.

    Remark: the easiest way is to loop over all CSV files, load each one into pandas first, and then load it into SQLite:  
    `data = pd.read_csv(path_to_csv)`  
    `data.to_sql('temp_results', sql_conn, if_exists='append', index=False)`
<br><br>

5. **Write an SQL statement** that generates a list, sorted by key, of the form `(key, value)`, where value is the concatenation of ALL values in the value column that match a specific key. For example, if the table has the records
<table>
    <tbody>
            <tr>
                <td style="text-align:center">John</td>
                <td style="text-align:center">myCSV1.csv</td>
            </tr>
            <tr>
                <td style="text-align:center">Dana</td>
                <td style="text-align:center">myCSV5.csv</td>
            </tr>
            <tr>
                <td style="text-align:center">John</td>
                <td style="text-align:center">myCSV7.csv</td>
            </tr>
    </tbody>
</table>

    Then the SQL statement will return `('John','myCSV1.csv, myCSV7.csv')`  
    Remark: use `GROUP_CONCAT` together with `GROUP BY` and `ORDER BY`.
<br><br><br>
6. **Start a new thread** for each entry of the list generated in the previous step, to execute `reduce_function(key,value)` 
<br>    
7. Each thread will store the results of `reduce_function` in a `mapreducefinal/part-X-final.csv` file  
<br>
8. Keep a list of all threads and check whether they have completed  
<br>
9. Once all threads have completed, print `MapReduce Completed` on the screen; otherwise, print `MapReduce Failed`. 
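Steps 4 and 5 can be tried in isolation on the three example records from step 5. A minimal sketch, assuming an in-memory SQLite database instead of the notebook's `tmp_database.db` file: load a pandas DataFrame with `to_sql`, then group and concatenate with `GROUP_CONCAT`. The two-argument form `GROUP_CONCAT(value, ', ')` sets the separator (the default is a plain comma). SQLite does not guarantee the order of the concatenated values inside a group, so only the key order is fixed by `ORDER BY`.

```python
import sqlite3

import pandas as pd

# the three example records from step 5 (key = first name, value = document name)
data = pd.DataFrame({'key': ['John', 'Dana', 'John'],
                     'value': ['myCSV1.csv', 'myCSV5.csv', 'myCSV7.csv']})

# an in-memory database keeps the sketch self-contained (no file is created)
conn = sqlite3.connect(':memory:')
data.to_sql('temp_results', conn, if_exists='append', index=False)

# step 5: one row per key, all values concatenated, rows sorted by key
query = ("SELECT key, GROUP_CONCAT(value, ', ') AS value "
         "FROM temp_results "
         "GROUP BY key "
         "ORDER BY key")
rows = conn.execute(query).fetchall()
conn.close()

print(rows)
```

Each tuple in `rows` is one `(key, value)` pair that a reduce thread would receive in step 6.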



In [26]:
# implement all of the class here

class MapReduceEngine():

    def execute(self, input_data, map_function, reduce_function, params):

        # map threads: one worker per input document
        # using ThreadPoolExecutor as a context manager;
        # leaving the with block waits (join) until every submitted task has finished
        with concurrent.futures.ThreadPoolExecutor(max_workers=max(1, len(input_data))) as executor:
            map_futures = [executor.submit(self.thread_map_helper, i, key, map_function, params)
                           for i, key in enumerate(input_data)]

        # at this point all map threads have finished; an exception raised inside a
        # thread is stored in its future, so every future must be checked explicitly
        if any(f.exception() is not None for f in map_futures):
            return 'MapReduce Failed'

        # concatenating all csv files to a single df
        tmp_dfs = [pd.read_csv(tmp_file)
                   for tmp_file in glob.glob(os.path.join(mapreducetmp_dir, '*.csv'))]
        if not tmp_dfs:
            return 'MapReduce Failed'
        tmp_df = pd.concat(tmp_dfs, ignore_index=True)

        # creating a connection, loading the map output and running the shuffle/sort query
        sql_conn = None
        try:
            sql_conn = sqlite3.connect(database)
            tmp_df.to_sql('temp_results', sql_conn, if_exists='append', index=False)

            cur = sql_conn.cursor()
            sort_and_shuffle_query = "SELECT key, GROUP_CONCAT(value) AS value " \
                                     "FROM temp_results " \
                                     "GROUP BY key " \
                                     "ORDER BY key"
            cur.execute(sort_and_shuffle_query)
            sort_and_shuffle_res = cur.fetchall()
        except Error as e:
            print(e)
            return 'MapReduce Failed'
        finally:
            if sql_conn:
                sql_conn.close()

        # reduce threads: one worker per (key, concatenated values) pair
        with concurrent.futures.ThreadPoolExecutor(max_workers=max(1, len(sort_and_shuffle_res))) as executor:
            reduce_futures = [executor.submit(self.thread_reduce_helper, i, res, reduce_function)
                              for i, res in enumerate(sort_and_shuffle_res)]

        # any failed reduce thread means the whole job failed
        if any(f.exception() is not None for f in reduce_futures):
            return 'MapReduce Failed'

        return 'MapReduce Completed'

    def thread_map_helper(self, i, key, map_function, params):
        # every thread runs map_function and saves its (key, value) pairs to its own csv file
        result = map_function(key, params['column'])
        df = pd.DataFrame(result, columns=['key', 'value'])
        df.to_csv(os.path.join(mapreducetmp_dir, f'part-tmp-{i}.csv'), index=False)

    def thread_reduce_helper(self, i, res, reduce_function):
        # every thread runs reduce_function and saves the single (key, value) result to its own csv file
        result = reduce_function(res)
        df = pd.DataFrame({'key': [result[0]], 'value': [result[1]]})
        df.to_csv(os.path.join(mapreducefinal_dir, f'part-{i}-final.csv'), index=False)
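With `concurrent.futures.ThreadPoolExecutor`, wrapping `executor.submit(...)` in `try/except` only catches errors raised while submitting. An exception raised inside the worker function is captured in the returned `Future` and never reaches the caller unless `exception()` or `result()` is called on it. This small sketch (the `task` function is hypothetical) shows how inspecting every future lets an engine choose between `MapReduce Completed` and `MapReduce Failed`, as step 9 requires.

```python
import concurrent.futures


def task(n):
    # hypothetical worker that fails for one of its inputs
    if n == 2:
        raise ValueError(f'bad input {n}')
    return n * n


with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    # submit() does not raise here; a worker's exception is stored in its Future
    futures = [executor.submit(task, n) for n in range(4)]

# leaving the with block waited for every task, so each Future is done
failed = [f for f in futures if f.exception() is not None]
status = 'MapReduce Failed' if failed else 'MapReduce Completed'
print(status)  # MapReduce Failed
print([f.result() for f in futures if f.exception() is None])  # [0, 1, 9]
```

Calling `f.result()` on a failed future would re-raise the stored `ValueError`, which is why the successful results are filtered first.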
    
            

<br><br><br><br>

# Question 3
## Implement the MapReduce Inverted index of the JSON documents

Implement a function `inverted_map(document_name)` which reads the CSV document from the local disk and returns a list that contains entries of the form (key_value, document name).

For example, if the myCSV4.csv document has values like:  
`{'firstname':'John','secondname':'Rambo','city':'Palo Alto'}`

then the `inverted_map('myCSV4.csv')` function will return the list:  
`[('firstname_John','myCSV4.csv'),('secondname_Rambo','myCSV4.csv'),('city_Palo Alto','myCSV4.csv')]`

In [1]:
def inverted_map(document_name, column_index=0):

    csv_df = pd.read_csv(document_name)
    # extracting the values of the chosen column (the first column by default) to a list
    keys = csv_df.iloc[:, column_index].tolist()
    # returning a list of tuples (value, document_name); the values are not prefixed with the column name
    return [(key, document_name) for key in keys]

In [4]:
inverted_map('myCSV4.csv')

[('John', 'myCSV4.csv'),
 ('Michael', 'myCSV4.csv'),
 ('Albert', 'myCSV4.csv'),
 ('Dana', 'myCSV4.csv'),
 ('Michael', 'myCSV4.csv'),
 ('John', 'myCSV4.csv'),
 ('Michael', 'myCSV4.csv'),
 ('Albert', 'myCSV4.csv'),
 ('Michael', 'myCSV4.csv'),
 ('John', 'myCSV4.csv')]
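The implementation above emits only the raw values of a single column (`column_index`), so its keys are `John`, `Michael`, ... rather than the `firstname_John`-style keys in the specification. A minimal sketch of a variant that follows the specification more literally, assuming every column should be indexed and the document name should be the file's base name. `inverted_map_all_columns` is a hypothetical name; plugging it into `MapReduceEngine.execute` would also require dropping the `params['column']` argument passed in `thread_map_helper`.

```python
import os
import tempfile

import pandas as pd


def inverted_map_all_columns(document_name):
    """Hypothetical variant: emit (column_value, document) for every cell of the CSV."""
    df = pd.read_csv(document_name)
    doc = os.path.basename(document_name)
    # walk record by record, column by column, prefixing each value with its column name
    return [(f'{column}_{value}', doc)
            for record in df.to_dict(orient='records')
            for column, value in record.items()]


# usage with a one-record CSV that mirrors the example from the question
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'myCSV4.csv')
    pd.DataFrame([['John', 'Rambo', 'Palo Alto']],
                 columns=['firstname', 'secondname', 'city']).to_csv(path, index=False)
    pairs = inverted_map_all_columns(path)

print(pairs)
# [('firstname_John', 'myCSV4.csv'), ('secondname_Rambo', 'myCSV4.csv'), ('city_Palo Alto', 'myCSV4.csv')]
```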

Write a reduce function `inverted_reduce(value, documents)`, where the field `documents` contains a list of all CSV documents per given value.   
This list might have duplicates.   
The reduce function will return a new list without duplicates.

For example,  
calling the function `inverted_reduce('firstname_Albert','myCSV2.csv, myCSV5.csv,myCSV2.csv')`   
will return the list `['firstname_Albert','myCSV2.csv,myCSV5.csv']`

In [28]:
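def inverted_reduce(documents):

    # documents is a (key, value) pair from the shuffle step: value is a
    # comma-separated string of document names that may contain duplicates
    key, value = documents[0], documents[1]

    # split value by ',' to get the document names, strip stray spaces, and remove
    # duplicates with dict.fromkeys, which keeps first-seen order (a set has no defined order)
    docs = dict.fromkeys(doc.strip() for doc in value.split(','))
    # creating the new "value" string with no duplicates
    docs_str = ','.join(docs)

    # returning [key, deduplicated value]
    return [key, docs_str]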

<br><br><br><br>
# Question 4
## Testing Your MapReduce

**Create Python list** `input_data` : `['myCSV1.csv', .., 'myCSV20.csv']`

In [5]:
input_data = glob.glob('myCSV*.csv')

In [6]:
input_data

['myCSV1.csv',
 'myCSV2.csv',
 'myCSV3.csv',
 'myCSV7.csv',
 'myCSV6.csv',
 'myCSV4.csv',
 'myCSV5.csv',
 'myCSV19.csv',
 'myCSV18.csv',
 'myCSV20.csv',
 'myCSV10.csv',
 'myCSV11.csv',
 'myCSV13.csv',
 'myCSV12.csv',
 'myCSV16.csv',
 'myCSV17.csv',
 'myCSV15.csv',
 'myCSV14.csv',
 'myCSV8.csv',
 'myCSV9.csv']
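`glob.glob` returns file names in an arbitrary, file-system-dependent order (as the output above shows), and a plain `sorted` would put `myCSV10.csv` before `myCSV2.csv`. If the list is wanted in numeric order, one option, assuming the file names always follow the `myCSV<number>.csv` pattern, is to sort on the extracted number (`csv_number` is a hypothetical helper):

```python
import re


def csv_number(path):
    # extract the integer from names like 'myCSV12.csv' (a directory prefix is allowed)
    match = re.search(r'myCSV(\d+)\.csv$', path)
    return int(match.group(1))


names = ['myCSV10.csv', 'myCSV2.csv', 'myCSV1.csv', 'myCSV20.csv']
print(sorted(names))                  # ['myCSV1.csv', 'myCSV10.csv', 'myCSV2.csv', 'myCSV20.csv']
print(sorted(names, key=csv_number))  # ['myCSV1.csv', 'myCSV2.csv', 'myCSV10.csv', 'myCSV20.csv']
```

In the notebook this would become `sorted(glob.glob('myCSV*.csv'), key=csv_number)`. The order only affects the `X` numbering of the temporary files, not the MapReduce result.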

**Submit MapReduce as follows:**

In [30]:
mapreduce = MapReduceEngine()
status = mapreduce.execute(input_data, inverted_map, inverted_reduce, params={'column':0})
print(status)

MapReduce Completed


Make sure that `MapReduce Completed` is printed and that the `mapreducefinal` folder contains the result files.

**Use Python to delete all temporary data from the mapreducetemp folder and delete the SQLite database:**

In [31]:
for file in glob.glob(os.path.join(mapreducetmp_dir, '*.csv')):
    os.remove(file)

os.remove(database)

<br><br><br><br>

# Question 5
# Final Thoughts

The phase in which `MapReduceEngine` reads all temporary files generated by the mappers and sorts them so that each reducer receives a specific key is called the **shuffle step**.

Please explain **clearly** what the main problem of MapReduce would be when processing Big Data if there were no shuffle step at all, meaning that reducers would read the responses directly from the mappers.

Shuffling is the process of transferring data over the network from the mappers to the reducers. The output of the shuffle serves as the input to the reduce tasks; without it, the reducers would have no input or, as the question states, would have to read the mappers' responses directly.
In that case, a reducer would not receive all the values for a given key from all the mappers. For example, in the word-count example from class, the outputs of the 4 mappers were:

Apple 1 <br> 
Orange 1 <br>
Mango 1 <br>

----

Orange 1 <br>
Grapes 1 <br>
Plum 1 <br>

----

Apple 1 <br>
Plum 1 <br>
Mango 1 <br>

----

Apple 1 <br>
Apple 1 <br>
Plum 1 <br>

---

With a shuffle phase, all key/value pairs that share the same key are transferred to the same reducer, so each key is reduced by exactly one reducer. After the shuffle we have:

Apple 1 <br>
Apple 1 <br>
Apple 1 <br>
Apple 1 <br>

---

Grapes 1 <br>

---

Mango 1 <br>
Mango 1 <br>

---

Orange 1 <br>
Orange 1 <br>

---

Plum 1 <br>
Plum 1 <br>
Plum 1 <br>

so the reduce function runs only 5 times, once per unique key.

<br>
However, if the reducers read directly from the mappers, a reducer has to run per key on each mapper's output. The same key is then reduced several times on different machines, so the reduce function runs more often: 11 times in our example, because:

Output of 1st mapper has 3 unique keys

Output of 2nd mapper has 3 unique keys

Output of 3rd mapper has 3 unique keys

Output of 4th mapper has 2 unique keys

---

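With Big Data, where there are many mappers and keys repeat across them, the number of reduce calls grows with the number of mappers instead of with the number of unique keys. In addition, each reducer only produces a partial result for its key (for example, `Apple` would end up as 1, 1 and 2 on three different machines instead of 4), and these partial results would still have to be combined. The result is much longer, unacceptable processing time, which I think is the main problem.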
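The reduce-call counts above can be checked with a short simulation. A sketch that models each mapper's output as a list of words (assuming, as in the example, that every word carries the value 1) and counts how many reduce calls each design needs, plus the partial per-machine totals that remain for `Apple` when there is no shuffle:

```python
from collections import Counter

# mapper outputs from the word-count example above
mapper_outputs = [
    ['Apple', 'Orange', 'Mango'],
    ['Orange', 'Grapes', 'Plum'],
    ['Apple', 'Plum', 'Mango'],
    ['Apple', 'Apple', 'Plum'],
]

# with a shuffle: every key is sent to exactly one reducer
shuffled = Counter(word for output in mapper_outputs for word in output)
print(len(shuffled), dict(sorted(shuffled.items())))
# 5 {'Apple': 4, 'Grapes': 1, 'Mango': 2, 'Orange': 2, 'Plum': 3}

# without a shuffle: each reducer only sees one mapper's output
per_mapper = [Counter(output) for output in mapper_outputs]
print(sum(len(counts) for counts in per_mapper))  # 11
print([counts['Apple'] for counts in per_mapper if 'Apple' in counts])  # [1, 1, 2]
```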

<br><br><br><br>
Good Luck :)