# Big Data Platform
## Assignment 2: MapReduce

**By:**  


Omer Gralnik, 20637768<br>Yam Daniel, 311515381

<br><br>

**The goal of this assignment is to:**
- Understand and practice the details of MapReduceEngine

**Instructions:**
- Students will form teams of two people each, and submit a single homework for each team.
- The same score for the homework will be given to each member of your team.
- Your solution is in the form of a Jupyter notebook file (with extension ipynb).
- Images/Graphs/Tables should be submitted inside the notebook.
- The notebook should be runnable and properly documented. 
- Please answer all the questions and include all your code.
- You are expected to submit clear, pythonic code.
- You can change function signatures/definitions.

**Submission:**
- Submission of the homework will be done via Moodle by uploading a Jupyter notebook.
- The homework needs to be entirely in English.
- The deadline for submission is on Moodle.
- Late submission won't be allowed.
  
  
- In case of identical code submissions, both groups will get a zero. 
- Some groups might be selected randomly to present their code.

**Requirements:**  
- Python 3.6 should be used.  
- You should implement the algorithms yourself, using only basic Python libraries (such as numpy, pandas, etc.)

<br><br><br><br>

**Grading:**
- Q1 - 5 points - Initial Steps
- Q2 - 50 points - MapReduceEngine
- Q3 - 30 points - Implement the MapReduce Inverted index of the JSON documents
- Q4 - 5 points - Testing Your MapReduce
- Q5 - 10 points - Final Thoughts 

`Total: 100`

**Prerequisites**

In [None]:
!pip install names
# shutil (used below) ships with the Python standard library, so it needs no install

**Imports**

In [None]:
# general
import os
import warnings
import sqlite3
import names  # we chose this package to generate random second names
import shutil
import threading
from queue import Queue

# data handling
import pandas as pd

**Hide Warnings**

In [None]:
warnings.filterwarnings('ignore')

**Disable Autoscrolling**

In [None]:
%%javascript
IPython.OutputArea.prototype._should_scroll = function(lines) {
    return false;
}

<br><br><br><br>
# Question 1
## Initial Steps

Write Python code to create 20 different CSV files in this format:  `myCSV[Number].csv`, where each file contains 10 records. 

The schema is `('firstname', 'secondname', 'city')`  

Values should be randomly chosen from the lists: 
- `firstname` : `[John, Dana, Scott, Marc, Steven, Michael, Albert, Johanna]`  
- `city` : `[New York, Haifa, München, London, Palo Alto, Tel Aviv, Kiel, Hamburg]`  
- `secondname`: any value  

In [None]:
# define the folder in which the data will be saved (adjust this path to your own environment)
folder_path = '/Users/omergralnik/Machine Learning _ Data science/First Year/Semester A/Big Data Platforms/Datasets/HW2'
os.makedirs(folder_path, exist_ok=True)  # make sure the folder exists

# shared queue through which the functions run by the threads return their results
queue = Queue()

In [None]:
import random  # used to sample first names and cities

firstname = ['John', 'Dana', 'Scott', 'Marc', 'Steven', 'Michael', 'Albert', 'Johanna']
city = ['New York', 'Haifa', 'München', 'London', 'Palo Alto', 'Tel Aviv', 'Kiel', 'Hamburg']

num_files = 20
records_per_file = 10

# create the 20 CSVs, each with 10 records whose values are chosen at random
for i in range(1, num_files + 1):
    df = pd.DataFrame({
        'firstname': [random.choice(firstname) for _ in range(records_per_file)],
        # the 'names' library generates a random second name for each record
        'secondname': [names.get_last_name() for _ in range(records_per_file)],
        'city': [random.choice(city) for _ in range(records_per_file)],
    }, columns=['firstname', 'secondname', 'city'])
    df.to_csv(f'{folder_path}/myCSV{i}.csv', index=False)


Use Python to create the `mapreducetemp` and `mapreducefinal` folders.

In [None]:
# if a folder already exists, remove it, then create a fresh, empty one
for folder in ('mapreducetemp', 'mapreducefinal'):
    if os.path.exists(f'{folder_path}/{folder}'):
        shutil.rmtree(f'{folder_path}/{folder}')
    os.mkdir(f'{folder_path}/{folder}')

<br><br><br>
# Question 2
## MapReduceEngine

Write Python code to create an SQLite database with the following table

`TableName: temp_results`   
`schema: (key:TEXT,value:TEXT)`

In [None]:
connection = sqlite3.connect(f'{folder_path}/hw2.db')
cursor = connection.cursor()

cursor.execute('''CREATE TABLE IF NOT EXISTS temp_results(
                    key text,
                    value text)''')

1. **Create a Python class** `MapReduceEngine` with a method `def execute(input_data, map_function, reduce_function)`, such that:
    - `input_data`: is an array of elements
    - `map_function`: is a pointer to the Python function that returns a list where each entry is of the form (key, value)
    - `reduce_function`: is a pointer to the Python function that returns a list where each entry is of the form (key, value)
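
To make the contract above concrete, here is a minimal single-threaded sketch of the same map, shuffle and reduce dataflow. It is not the threaded, SQLite-backed engine requested below: grouping is done with a plain dictionary, and the reducer receives a Python list of values rather than a comma-joined string. The names `run_mapreduce_sequentially`, `word_map` and `word_reduce` are hypothetical and only serve as an illustration.

```python
from collections import defaultdict


def run_mapreduce_sequentially(input_data, map_function, reduce_function):
    """Hypothetical single-threaded sketch of map -> shuffle -> reduce (no threads, no SQLite)."""
    # map phase: every input element yields a list of (key, value) pairs
    mapped = []
    for element in input_data:
        mapped.extend(map_function(element))

    # shuffle phase: gather all values that share a key
    grouped = defaultdict(list)
    for key, value in mapped:
        grouped[key].append(value)

    # reduce phase: one reduce_function call per distinct key, in key order
    return [reduce_function(key, values) for key, values in sorted(grouped.items())]


# usage example with a toy word count (hypothetical map/reduce functions)
def word_map(line):
    return [(word, 1) for word in line.split()]


def word_reduce(word, counts):
    return (word, sum(counts))


result = run_mapreduce_sequentially(['a b a', 'b c'], word_map, word_reduce)
print(result)  # [('a', 2), ('b', 2), ('c', 1)]
```

The threaded engine keeps the same three phases; it only moves the map and reduce calls into threads and the grouping into SQLite.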

<br><br>

**Implement** the following functionality in the `execute(...)` function:

<br>

1. For each key from the input_data, start a new Python thread that executes map_function(key)
<br><br>
2. Each thread will store the results of the map_function into mapreducetemp/part-tmp-X.csv, where X is a unique number per thread.
<br><br>
3. Keep the list of all threads and check whether they have completed.
<br><br>
4. Once all threads have completed, load the content of all CSV files into the temp_results table in SQLite.

    Remark: the easiest way is to loop over all CSV files, load each one into Pandas first, and then load it into SQLite:  
    `data = pd.read_csv(path to csv)`  
    `data.to_sql('temp_results', sql_conn, if_exists='append', index=False)`
<br><br>

5. **Write an SQL statement** that generates a list, sorted by key, of the form `(key, value)`, where value is the concatenation of ALL values in the value column that match a specific key. For example, if the table has the records
<table>
    <tbody>
            <tr>
                <td style="text-align:center">John</td>
                <td style="text-align:center">myCSV1.csv</td>
            </tr>
            <tr>
                <td style="text-align:center">Dana</td>
                <td style="text-align:center">myCSV5.csv</td>
            </tr>
            <tr>
                <td style="text-align:center">John</td>
                <td style="text-align:center">myCSV7.csv</td>
            </tr>
    </tbody>
</table>

    Then the SQL statement will return `('John', 'myCSV1.csv, myCSV7.csv')`  
    Remark: use GROUP_CONCAT together with GROUP BY and ORDER BY
<br><br><br>
6. **Start a new thread** for each value from the generated list in the previous step, to execute `reduce_function(key,value)` 
<br>    
7. Each thread will store results of reduce_function into `mapreducefinal/part-X-final.csv` file  
<br>
8. Keep a list of all threads and check whether they have completed  
<br>
9. Once all threads have completed, print `MapReduce Completed` on the screen; otherwise print `MapReduce Failed` 
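
Step 5 can be tried out in isolation on the example table above. This is only an illustrative sketch: it uses an in-memory SQLite database instead of the notebook's `hw2.db`, with the three example rows inserted by hand.

```python
import sqlite3

# in-memory database, so nothing is written to disk
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE temp_results (key TEXT, value TEXT)')
conn.executemany(
    'INSERT INTO temp_results (key, value) VALUES (?, ?)',
    [('John', 'myCSV1.csv'), ('Dana', 'myCSV5.csv'), ('John', 'myCSV7.csv')],
)

# GROUP BY collapses the rows per key, GROUP_CONCAT joins their values,
# and ORDER BY sorts the resulting groups by key
rows = conn.execute(
    'SELECT key, GROUP_CONCAT(value) FROM temp_results GROUP BY key ORDER BY key'
).fetchall()
print(rows)
conn.close()
```

Note that `GROUP_CONCAT` uses a comma without a space as its default separator, so John's value comes back as `'myCSV1.csv,myCSV7.csv'`, and that SQLite does not guarantee the order of the concatenated values by default; `ORDER BY key` only sorts the groups.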



In [None]:
class MapReduceEngine():

    def execute(self, input_data, map_function, reduce_function):

        # For each key from the input_data, start a new Python thread that executes map_function(key)
        map_threads = []
        for key in input_data:
            t = threading.Thread(target=map_function, args=(key,))
            t.start()
            map_threads.append(t)

        # Keep the list of all threads and wait until they have all completed
        for t in map_threads:
            t.join()

        # each (decorated) map function puts exactly one result on the shared queue,
        # so a missing result means that its thread failed, e.g. because it raised an exception
        map_results = [queue.get() for _ in range(queue.qsize())]
        if len(map_results) != len(map_threads):
            print('Map threads failed!')
            return '\nMapReduce Failed'
        print('Map threads completed successfully')

        # Store the results of each map thread into mapreducetemp/part-tmp-X.csv,
        # where X is a unique number per thread
        for idx, results in enumerate(map_results, start=1):
            df = pd.DataFrame(results, columns=['key', 'value'])
            df.to_csv(f'{folder_path}/mapreducetemp/part-tmp-{idx}.csv', index=False)

        # Once all threads have completed, load the content of all CSV files into temp_results;
        # the table is emptied first so that re-running the engine does not duplicate rows
        cursor.execute('DELETE FROM temp_results')
        connection.commit()
        for file in os.listdir(f'{folder_path}/mapreducetemp'):
            if file.endswith('.csv'):
                df = pd.read_csv(f'{folder_path}/mapreducetemp/{file}')
                df.to_sql('temp_results', connection, if_exists='append', index=False)

        # SQL statement that generates a list, sorted by key, of the form (key, value),
        # where value is the concatenation of ALL values that match the key
        cursor.execute('SELECT key, GROUP_CONCAT(value) FROM temp_results GROUP BY key ORDER BY key')
        key_value_list = cursor.fetchall()

        # Start a new thread for each entry of that list, to execute reduce_function(key, value)
        reduce_threads = []
        for key, value in key_value_list:
            t = threading.Thread(target=reduce_function, args=(key, value))
            t.start()
            reduce_threads.append(t)

        # Keep the list of all threads and wait until they have all completed
        for t in reduce_threads:
            t.join()
        reduce_results = [queue.get() for _ in range(queue.qsize())]

        # Store the results of each reduce thread into mapreducefinal/part-X-final.csv;
        # a result is [key, doc1, doc2, ...], saved as one column whose header is the key
        for idx, results in enumerate(reduce_results, start=1):
            df = pd.DataFrame(results[1:], columns=[results[0]])
            df.to_csv(f'{folder_path}/mapreducefinal/part-{idx}-final.csv', index=False)

        # Once all threads have completed, report MapReduce Completed, otherwise MapReduce Failed
        if len(reduce_results) == len(reduce_threads):
            print('Reduce threads completed successfully')
            return '\nMapReduce Completed'
        print('Reduce threads failed!')
        return '\nMapReduce Failed'

<br><br><br><br>

# Question 3
## Implement the MapReduce Inverted index of the JSON documents

Implement a function `inverted_map(document_name)` which reads the CSV document from the local disk and returns a list that contains entries of the form (key_value, document name).

For example, if the myCSV4.csv document has values like:  
`{'firstname': 'John', 'secondname': 'Rambo', 'city': 'Palo Alto'}`

Then the `inverted_map('myCSV4.csv')` function will return a list:  
`[('firstname_John', 'myCSV4.csv'), ('secondname_Rambo', 'myCSV4.csv'), ('city_Palo Alto', 'myCSV4.csv')]`

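$\Rightarrow$ We use a decorator whose goal is to return the values of the functions executed by the threads: `threading.Thread` ignores the return value of its target, so the decorator puts that value on the shared `queue`, from which the engine collects it.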
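
The reason the decorator is needed can be shown in isolation: `threading.Thread` runs its `target` but discards whatever it returns. Below is a self-contained sketch of the same pattern; `results_queue`, `results_to_queue` and `square` are hypothetical names, not part of the notebook.

```python
import threading
from queue import Queue

results_queue = Queue()  # hypothetical stand-in for the notebook's global `queue`


def results_to_queue(func):
    """Wrap func so that its return value is put on results_queue instead of being lost."""
    def wrapper(*args):
        results_queue.put(func(*args))
    return wrapper


def square(x):
    return x * x


threads = [threading.Thread(target=results_to_queue(square), args=(n,)) for n in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# results arrive in completion order, which need not match start order, so sort them
collected = sorted(results_queue.get() for _ in range(len(threads)))
print(collected)  # [0, 1, 4, 9, 16]
```

Because values arrive in completion order, the i-th item taken from the queue does not necessarily belong to the i-th thread. In this notebook that is harmless, since every map and reduce result carries its own key or document name.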

In [None]:
import functools

# creating the decorator: threading.Thread discards the target's return value,
# so the wrapper puts it on the shared queue instead
def ResultsQueue(func):
    @functools.wraps(func)
    def wrapper(*args):
        queue.put(func(*args))
    return wrapper


@ResultsQueue
def inverted_map(document_name: str):
    """
    Read the CSV document from the local disk and return a list
    that contains entries of the form (key_value, document_name).
    """
    # read the csv and create an empty list
    data = pd.read_csv(f'{folder_path}/{document_name}')
    inverted_list = []

    # convert each row to a dictionary, e.g. {'firstname': 'John', 'secondname': 'Bon', 'city': 'Tel Aviv'},
    # and add one (key_value, document_name) entry per column, e.g. ('firstname_John', 'myCSV4.csv')
    for row_dict in data.to_dict('records'):
        for key, value in row_dict.items():
            inverted_list.append((f'{key}_{value}', document_name))
    return inverted_list

Write a reduce function `inverted_reduce(value, documents)`, where the field "documents" contains a list of all CSV documents per given value.   
This list might have duplicates.   
The reduce function will return a new list without duplicates.

For example,  
calling the function `inverted_reduce('firstname_Albert', 'myCSV2.csv, myCSV5.csv,myCSV2.csv')`   
will return the list `['firstname_Albert', 'myCSV2.csv', 'myCSV5.csv']`

In [None]:
@ResultsQueue
def inverted_reduce(value, documents):
    """
    The field "documents" is a comma-separated string of all CSV documents for the given value.
    This list might have duplicates.
    The function returns a new list, [value, doc1, doc2, ...], without duplicates.
    """

    # convert the comma-separated string of csvs to a list, stripping stray spaces
    documents = [doc.strip() for doc in documents.split(',')]

    # dict.fromkeys drops duplicates while keeping the first-seen order
    return [value] + list(dict.fromkeys(documents))
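
A quick check of the de-duplication logic on the example from the assignment. `dedupe_documents` is a hypothetical, undecorated copy of the body of `inverted_reduce`, so it returns its result directly instead of putting it on the shared queue.

```python
def dedupe_documents(value, documents):
    """Hypothetical undecorated copy of the inverted_reduce logic, for a quick check."""
    # split the comma-separated string and drop stray spaces around each name
    docs = [doc.strip() for doc in documents.split(',')]
    # dict keys are unique and keep insertion order, so the first occurrence wins
    return [value] + list(dict.fromkeys(docs))


print(dedupe_documents('firstname_Albert', 'myCSV2.csv, myCSV5.csv,myCSV2.csv'))
# ['firstname_Albert', 'myCSV2.csv', 'myCSV5.csv']
```

`dict.fromkeys` is used instead of `set` because a set would also drop the duplicates but would not keep the documents in their original order (dictionaries preserve insertion order as of Python 3.7, and in CPython 3.6 as an implementation detail).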

<br><br><br><br>
# Question 4
## Testing Your MapReduce

**Create Python list** `input_data` : `['myCSV1.csv', ..., 'myCSV20.csv']`

In [None]:
# build the file names explicitly, so that exactly the 20 generated files are used, in order
input_data = [f'myCSV{i}.csv' for i in range(1, 21)]

**Submit MapReduce as follows:**

In [None]:
mapreduce = MapReduceEngine()
status = mapreduce.execute(input_data, inverted_map, inverted_reduce)
print(status)

Make sure that `MapReduce Completed` is printed and that the `mapreducefinal` folder contains the result files.
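
One way to eyeball the output is to read the result files back into a single dictionary. `load_inverted_index` is a hypothetical helper, and it assumes the file layout written by the engine: one `part-X-final.csv` per key, with the key as the column header and one document per row. The demo runs on a throwaway folder with two hand-made files; in the notebook you would call `load_inverted_index(f'{folder_path}/mapreducefinal')` instead.

```python
import glob
import os
import tempfile

import pandas as pd


def load_inverted_index(final_folder):
    """Hypothetical helper: read every part-X-final.csv back into a {key: [documents]} dict."""
    index = {}
    for path in sorted(glob.glob(os.path.join(final_folder, 'part-*-final.csv'))):
        part = pd.read_csv(path)
        # each file holds one column whose header is the key and whose rows are documents
        key = part.columns[0]
        index[key] = part[key].tolist()
    return index


# usage example on a throwaway folder with two hand-made result files
with tempfile.TemporaryDirectory() as demo_folder:
    pd.DataFrame(['myCSV2.csv', 'myCSV5.csv'], columns=['firstname_Albert']).to_csv(
        os.path.join(demo_folder, 'part-1-final.csv'), index=False)
    pd.DataFrame(['myCSV1.csv'], columns=['city_Kiel']).to_csv(
        os.path.join(demo_folder, 'part-2-final.csv'), index=False)
    demo_index = load_inverted_index(demo_folder)
print(demo_index)
```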

**Use Python to delete all temporary data from the mapreducetemp folder and delete the SQLite database:**

In [None]:
# if the temporary folder exists, delete it recursively
if os.path.exists(f'{folder_path}/mapreducetemp'):
    shutil.rmtree(f'{folder_path}/mapreducetemp')

# try to close the cursor and the connection
try:
    cursor.close()
    connection.close()
# sqlite3 raises ProgrammingError when the connection has already been closed
except sqlite3.ProgrammingError:
    print('Connection is already closed!')

# if the database exists, remove it
if os.path.exists(f'{folder_path}/hw2.db'):
    os.remove(f'{folder_path}/hw2.db')

<br><br><br><br>

# Question 5
## Final Thoughts

The phase where `MapReduceEngine` reads all temporary files generated by the mappers and sorts them to provide each reducer with a specific key is called the **shuffle step**.

Please explain **clearly** what would be the main problem of MapReduce when processing Big Data, if there is no shuffle step at all, meaning reducers will directly read responses from the mappers.

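The map and reduce phases are independent of each other, and the shuffle step is what connects them: it collects the output of every mapper, groups the (key, value) pairs by key and delivers each key, together with all of its values, to exactly one reducer. Each mapper emits pairs for many different keys, so the values of a single key are scattered across all mappers. Without a shuffle step, a reducer reading directly from the mappers would not receive the complete input for its keys, because it would see only part of the map output instead of the relevant output of every mapper.<br>The absence of the shuffle step could also cause two map outputs that share the same key, and are therefore supposed to reach the same reducer, to reach different reducers. Each reducer would then produce a partial result for that key (for example, an inverted-index entry listing only some of the documents), so the final result would be wrong. With Big Data, this cannot be fixed cheaply by letting every reducer read and filter the output of every mapper, since each reducer would then have to scan the entire map output.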
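
The routing that the shuffle step provides can be sketched with hash partitioning, which is similar in spirit to how Hadoop assigns keys to reducers by default: every key is mapped to one reducer number, so the values of a key end up together no matter which mapper emitted them. This is a simplified, in-memory illustration; `partition_for` and `shuffle` are hypothetical names, and `zlib.crc32` is used because Python's built-in `hash()` of a string can change between interpreter runs.

```python
import zlib
from collections import defaultdict


def partition_for(key, num_reducers):
    """Return the reducer index for a key; crc32 is stable across runs, unlike hash()."""
    return zlib.crc32(key.encode('utf-8')) % num_reducers


def shuffle(mapper_outputs, num_reducers):
    """Send every (key, value) pair of every mapper to exactly one reducer partition."""
    partitions = [defaultdict(list) for _ in range(num_reducers)]
    for pairs in mapper_outputs:
        for key, value in pairs:
            partitions[partition_for(key, num_reducers)][key].append(value)
    # each reducer gets its keys sorted, with all values of a key gathered together
    return [sorted(partition.items()) for partition in partitions]


# hand-made outputs of two mappers; 'firstname_John' is emitted by both of them
mapper_outputs = [
    [('firstname_John', 'myCSV1.csv'), ('city_Kiel', 'myCSV1.csv')],
    [('firstname_John', 'myCSV7.csv'), ('firstname_Dana', 'myCSV7.csv')],
]
for reducer_id, keys in enumerate(shuffle(mapper_outputs, num_reducers=2)):
    print(reducer_id, keys)
```

Whichever partition `firstname_John` lands in, it arrives there with both `myCSV1.csv` and `myCSV7.csv`, which is exactly the guarantee that is lost when reducers read directly from individual mappers.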

<br><br><br><br>
Good Luck :)