# Big Data Platform
## Assignment 2: MapReduce

**By:**  

Roni Ben Dom, 207576463
Yotam Lev, 315870964

<br><br>

**Prerequisites**

In [None]:
# install the third-party packages used below (`names` generates random last names)
!pip install --quiet names ipython

**Imports**

In [1]:
# general
import os
import time
import random
import warnings
import threading # you can use easier threading packages
import sqlite3

# ml
import numpy as np
import scipy as sp
import pandas as pd

# visual
import seaborn as sns
import matplotlib.pyplot as plt

# notebook
from IPython.display import display

# additional
import names
import glob
import timeit

**Hide Warnings**

In [2]:
warnings.filterwarnings('ignore')

**Disable Autoscrolling**

In [3]:
%%javascript
IPython.OutputArea.prototype._should_scroll = function(lines) {
    return false;
}


**Set Random Seed**

In [4]:
random.seed(123)

<br><br><br><br>
# Question 1
# Initial Steps

Write Python code to create 20 different CSV files named `myCSV[Number].csv`, where each file contains 10 records. 

The schema is `('firstname', 'secondname', 'city')`  

Values should be randomly chosen from the lists: 
- `firstname` : `[John, Dana, Scott, Marc, Steven, Michael, Albert, Johanna]`  
- `city` : `[New York, Haifa, München, London, Palo Alto, Tel Aviv, Kiel, Hamburg]`  
- `secondname`: any value  

In [5]:
import csv

firstname = ['John', 'Dana', 'Scott', 'Marc', 'Steven', 'Michael', 'Albert', 'Johanna']
city = ['New York', 'Haifa', 'München', 'London', 'Palo Alto', 'Tel Aviv', 'Kiel', 'Hamburg']
secondname = [names.get_last_name() for i in range(10)]  # "any value": a pool of 10 random last names
NUM_RECORDS = 10  # each file must contain 10 records

os.makedirs('csvs', exist_ok=True)  # the output folder must exist before writing
for i in range(20):
    with open(f'csvs/myCSV[{i}].csv', 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)  # no stray spaces after the commas, proper quoting if needed
        writer.writerow(['firstname', 'secondname', 'city'])
        for j in range(NUM_RECORDS):
            writer.writerow([random.choice(firstname), random.choice(secondname), random.choice(city)])

Use Python to create the `mapreducetemp` and `mapreducefinal` folders

In [6]:
os.makedirs('mapreducetemp', exist_ok=True)
os.makedirs('mapreducefinal', exist_ok=True)

<br><br><br>
# Question 2
## MapReduceEngine

Write Python code to create an SQLite database with the following table

`TableName: temp_results`   
`schema: (key:TEXT,value:TEXT)`

In [7]:
db_file = 'HW_2.db'

# Connect to SQLite (the database file is created if it does not exist)
conn = sqlite3.connect(db_file)

# Create a cursor object using the cursor() method
cursor = conn.cursor()

# Drop the temp_results table if it already exists
cursor.execute("DROP TABLE IF EXISTS temp_results")

# Create the table as required
sql ='''CREATE TABLE IF NOT EXISTS temp_results(
   key TEXT,
   value TEXT
)'''
cursor.execute(sql)
print("Table created successfully")

# Commit the changes to the database (reuse this connection; reconnecting would orphan `cursor`)
conn.commit()

Table created successfully


1. **Create a Python class** `MapReduceEngine` with method `def execute(input_data, map_function, reduce_function)`, such that:
    - `input_data`: is an array of elements
    - `map_function`: is a reference to the Python function that returns a list in which each entry is of the form (key, value)
    - `reduce_function`: is a reference to the Python function that returns a list in which each entry is of the form (key, value)
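
A minimal sketch of map and reduce functions that follow this contract, assuming a word-count job rather than the inverted index used later. `word_count_map`, `word_count_reduce` and the sequential `run_locally` helper are hypothetical names; `run_locally` only illustrates the data flow that `execute` parallelises with threads, CSV files and SQLite. Unlike the engine, it hands the reducer a Python list of values instead of a `GROUP_CONCAT` string.

```python
from collections import defaultdict


def word_count_map(text):
    """Map: emit a (word, 1) pair for every word in one input element."""
    return [(word, 1) for word in text.split()]


def word_count_reduce(key, values):
    """Reduce: receive one key with all of its values and emit (key, total)."""
    return [(key, sum(values))]


def run_locally(input_data, map_function, reduce_function):
    """Sequential stand-in for execute(): map, group values by key, reduce in key order."""
    grouped = defaultdict(list)
    for element in input_data:
        for key, value in map_function(element):
            grouped[key].append(value)
    results = []
    for key in sorted(grouped):
        results.extend(reduce_function(key, grouped[key]))
    return results


print(run_locally(['a b a', 'b c'], word_count_map, word_count_reduce))
# [('a', 2), ('b', 2), ('c', 1)]
```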

<br><br>

**Implement** the following functionality in the `execute(...)` function:

<br>

1. For each key from `input_data`, start a new Python thread that executes `map_function(key)`.
<br><br>
2. Each thread stores the results of `map_function` in `mapreducetemp/part-tmp-X.csv`, where X is a unique number per thread.
<br><br>
3. Keep a list of all threads and check whether they have completed.
<br><br>
4. Once all threads have completed, load the content of all CSV files into the `temp_results` table in SQLite.

    Remark: the easiest way is to loop over all CSV files, load each one into Pandas first, and then into SQLite:  
    `data = pd.read_csv(path_to_csv)`  
    `data.to_sql('temp_results', sql_conn, if_exists='append', index=False)`
<br><br>

5. **Write an SQL statement** that generates a list of the form `(key, value)`, sorted by key, where value is the concatenation of ALL values in the value column that match a specific key. For example, if the table has the records
<table>
    <tbody>
            <tr>
                <td style="text-align:center">John</td>
                <td style="text-align:center">myCSV1.csv</td>
            </tr>
            <tr>
                <td style="text-align:center">Dana</td>
                <td style="text-align:center">myCSV5.csv</td>
            </tr>
            <tr>
                <td style="text-align:center">John</td>
                <td style="text-align:center">myCSV7.csv</td>
            </tr>
    </tbody>
</table>

    Then the SQL statement will return `('John', 'myCSV1.csv, myCSV7.csv')`  
    Remark: use GROUP_CONCAT together with GROUP BY and ORDER BY
<br><br><br>
6. **Start a new thread** for each `(key, value)` pair in the list generated in the previous step, to execute `reduce_function(key, value)`.
<br>    
7. Each thread stores the results of `reduce_function` in a `mapreducefinal/part-X-final.csv` file.  
<br>
8. Keep a list of all threads and check whether they have completed.  
<br>
9. Once all threads have completed, print `MapReduce Completed` on the screen; otherwise print `MapReduce Failed`.
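
Steps 4 and 5 can be sketched in isolation as follows, assuming two tiny hypothetical map outputs and an in-memory database (`demo_conn`, `parts` and the sample rows are illustrative, not the notebook's data). Note that SQLite does not guarantee the order in which `GROUP_CONCAT` joins the values inside one group, so `John` may also come back as `myCSV7.csv, myCSV1.csv`.

```python
import os
import sqlite3
import tempfile

import pandas as pd

# Two hypothetical map outputs standing in for mapreducetemp/part-tmp-X.csv
parts = [
    [('John', 'myCSV1.csv'), ('Dana', 'myCSV5.csv')],
    [('John', 'myCSV7.csv')],
]

demo_conn = sqlite3.connect(':memory:')
demo_conn.execute('CREATE TABLE temp_results (key TEXT, value TEXT)')

with tempfile.TemporaryDirectory() as tmp:
    for i, pairs in enumerate(parts):
        path = os.path.join(tmp, f'part-tmp-{i}.csv')
        pd.DataFrame(pairs, columns=['key', 'value']).to_csv(path, index=False)
        # step 4: load each CSV through pandas into the SQLite table
        data = pd.read_csv(path, dtype=str, keep_default_na=False)
        data.to_sql('temp_results', demo_conn, if_exists='append', index=False)

# step 5: one row per key with all matching values concatenated, sorted by key
query = '''SELECT key, GROUP_CONCAT(value, ', ')
           FROM temp_results
           GROUP BY key
           ORDER BY key'''
rows = demo_conn.execute(query).fetchall()
print(rows)
```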

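
Steps 6 to 9 hinge on detecting failed threads: `Thread.join()` waits for a thread to finish but does not re-raise an exception raised inside it. A minimal sketch, assuming each reducer returns a list of `(key, value)` tuples; `run_reduce_threads` and `dedup_reduce` are hypothetical helpers, not part of the assignment's engine.

```python
import csv
import os
import tempfile
import threading


def run_reduce_threads(rows, reduce_function, out_dir):
    """Start one thread per (key, value) row, write each result to
    out_dir/part-X-final.csv and report whether every thread succeeded."""
    errors = []  # exceptions raised inside worker threads

    def worker(index, key, value):
        try:
            result = reduce_function(key, value)
            path = os.path.join(out_dir, f'part-{index}-final.csv')
            with open(path, 'w', newline='') as f:
                writer = csv.writer(f)
                writer.writerow(['key', 'value'])
                writer.writerows(result)
        except Exception as exc:
            errors.append(exc)  # join() alone would never surface this

    threads = [threading.Thread(target=worker, args=(i, key, value))
               for i, (key, value) in enumerate(rows)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()  # block until every reducer has finished

    completed = not errors and not any(thread.is_alive() for thread in threads)
    status = 'MapReduce Completed' if completed else 'MapReduce Failed'
    print(status)
    return status


def dedup_reduce(key, value):
    """Hypothetical reducer: drop duplicate documents from a ', '-joined string."""
    return [(key, ','.join(sorted(set(value.split(', ')))))]


with tempfile.TemporaryDirectory() as tmp:
    status = run_reduce_threads([('John', 'myCSV1.csv, myCSV7.csv, myCSV1.csv')], dedup_reduce, tmp)
    files = sorted(os.listdir(tmp))
```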


In [15]:
# implement all of the class here

class MapReduceEngine_smallFiles():
    """Variant of MapReduceEngine that packs files smaller than one block into
    combined map tasks, instead of starting one thread per tiny file."""

    def __init__(self, connection, cursor, block_size=128):
        self.conn = connection
        self.cursor = cursor
        self.block_size = block_size  # block size in MB
        self.errors = []  # exceptions raised inside worker threads

    def auxiliary_writing_function(self, function, path, calls):
        """Call function(*args) for every args tuple in `calls` and write all
        returned 'key,value' lines into a single CSV file at `path`."""
        try:
            with open(path, 'w') as f:
                f.write('key,value\n')
                for args in calls:
                    for val in function(*args):
                        f.write(f'{val}\n')
        except Exception as e:
            self.errors.append(e)  # join() does not re-raise thread exceptions

    def pack_small_files(self, small_files):
        """Group (path, size) pairs into bins of at most one block: each bin starts
        with the largest remaining file, takes further large files while they fit,
        then fills the remaining space with the smallest files."""
        limit = self.block_size * 2 ** 20
        files = sorted(small_files, key=lambda item: item[1])  # ascending by size
        lo, hi = 0, len(files) - 1
        groups = []
        while lo <= hi:
            # the largest remaining file always opens a new bin, so the loop always progresses
            group, total = [files[hi][0]], files[hi][1]
            hi -= 1
            while lo <= hi and total + files[hi][1] <= limit:
                group.append(files[hi][0])
                total += files[hi][1]
                hi -= 1
            while lo <= hi and total + files[lo][1] <= limit:
                group.append(files[lo][0])
                total += files[lo][1]
                lo += 1
            groups.append(group)
        return groups

    def execute(self, input_data, map_function, reduce_function, params):
        self.errors = []
        # clear intermediate results left over from previous runs
        self.conn.execute('DELETE FROM temp_results')
        self.conn.commit()

        # one map task per file of at least one block, plus one per bin of small files;
        # each task maps the original files, so the inverted index still points at them
        limit = self.block_size * 2 ** 20
        sizes = {path: os.path.getsize(path) for path in input_data}
        tasks = [[path] for path in input_data if sizes[path] >= limit]
        tasks += self.pack_small_files([(path, size) for path, size in sizes.items() if size < limit])

        threads = []
        for i, paths in enumerate(tasks):
            calls = [(path, params[path]) for path in paths]
            x = threading.Thread(target=self.auxiliary_writing_function,
                                 args=(map_function, f'mapreducetemp/part-tmp-{i}.csv', calls))
            threads.append(x)
            x.start()
        for thread in threads:
            thread.join()
        if self.errors:
            return 'MapReduce Failed'

        # read as text so values such as 'NA' are not turned into NaN
        for i in range(len(tasks)):
            data = pd.read_csv(f'mapreducetemp/part-tmp-{i}.csv', dtype=str, keep_default_na=False)
            data.to_sql(name='temp_results', con=self.conn, if_exists='append', index=False)

        select_all = '''SELECT key, GROUP_CONCAT(value)
                        FROM temp_results
                        GROUP BY key
                        ORDER BY key'''
        rows = self.cursor.execute(select_all).fetchall()

        threads = []
        for index, (key, documents) in enumerate(rows):
            x = threading.Thread(target=self.auxiliary_writing_function,
                                 args=(reduce_function, f'mapreducefinal/part-{index}-final.csv', [(key, documents)]))
            threads.append(x)
            x.start()
        for thread in threads:
            thread.join()

        return 'MapReduce Failed' if self.errors else 'MapReduce Completed'

In [9]:
# implement all of the class here

class MapReduceEngine():
    def __init__(self, connection, cursor):
        self.conn = connection
        self.cursor = cursor
        self.errors = []  # exceptions raised inside worker threads

    def auxiliary_writing_function(self, function, path, *args):
        """Run function(*args) and write each returned 'key,value' line to the CSV at `path`.
        Exceptions are recorded instead of lost, since join() does not re-raise them."""
        try:
            return_val = function(*args)
            with open(path, 'w') as f:
                f.write('key,value\n')
                for val in return_val:
                    f.write(f'{val}\n')
        except Exception as e:
            self.errors.append(e)

    def execute(self, input_data, map_function, reduce_function, params):
        self.errors = []
        # clear intermediate results left over from previous runs
        self.conn.execute('DELETE FROM temp_results')
        self.conn.commit()

        # map phase: one thread per input file, each writing mapreducetemp/part-tmp-X.csv
        threads = []
        for index, key in enumerate(input_data):
            x = threading.Thread(target=self.auxiliary_writing_function,
                                 args=(map_function, f'mapreducetemp/part-tmp-{index}.csv', key, params[key]))
            threads.append(x)
            x.start()
        for thread in threads:
            thread.join()
        if self.errors:
            return 'MapReduce Failed'

        # load all map outputs into temp_results; read as text so values such as 'NA' stay strings
        for index in range(len(input_data)):
            data = pd.read_csv(f'mapreducetemp/part-tmp-{index}.csv', dtype=str, keep_default_na=False)
            data.to_sql(name='temp_results', con=self.conn, if_exists='append', index=False)

        # shuffle: concatenate all values of each key, sorted by key
        select_all = '''SELECT key, GROUP_CONCAT(value)
                        FROM temp_results
                        GROUP BY key
                        ORDER BY key'''
        rows = self.cursor.execute(select_all).fetchall()

        # reduce phase: one thread per (key, documents) row, each writing mapreducefinal/part-X-final.csv
        threads = []
        for index, (key, documents) in enumerate(rows):
            x = threading.Thread(target=self.auxiliary_writing_function,
                                 args=(reduce_function, f'mapreducefinal/part-{index}-final.csv', key, documents))
            threads.append(x)
            x.start()
        for thread in threads:
            thread.join()

        return 'MapReduce Failed' if self.errors else 'MapReduce Completed'

<br><br><br><br>

# Question 3
## Implement the MapReduce inverted index of the CSV documents

Implement a function `inverted_map(document_name)` that reads the CSV document from the local disk and returns a list of entries of the form `(key_value, document_name)`.

For example, if the document myCSV4.csv has values like:  
`{'firstname': 'John', 'secondname': 'Rambo', 'city': 'Palo Alto'}`

Then `inverted_map('myCSV4.csv')` will return the list:  
`[('firstname_John', 'myCSV4.csv'), ('secondname_Rambo', 'myCSV4.csv'), ('city_Palo Alto', 'myCSV4.csv')]`

In [10]:
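def inverted_map(document_name, column_index):
    """Emit one 'column_value,document_path' line per value in the 1-based column `column_index`."""
    df = pd.read_csv(document_name, dtype=str, keep_default_na=False, skipinitialspace=True)
    column = df.columns[column_index - 1].strip()
    file_abs_path = os.path.abspath(document_name)
    # no space after the comma: the engine writes each entry as one raw 'key,value' CSV line
    return [f'{column}_{value.strip()},{file_abs_path}' for value in df.iloc[:, column_index - 1]]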
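
The function above indexes one column and returns pre-formatted CSV lines for the engine. For comparison, a minimal sketch closer to the question's wording, assuming every column should be indexed and the result should be `(key, document_name)` tuples; `inverted_map_tuples` is a hypothetical name, and plugging it into the engine would require serialising the tuples (for example with `csv.writer`) first.

```python
import csv
import os
import tempfile


def inverted_map_tuples(document_name):
    """Return (column_value, document_name) tuples for every cell of a CSV file."""
    with open(document_name, newline='') as f:
        reader = csv.DictReader(f, skipinitialspace=True)  # header row supplies the column names
        return [(f'{column.strip()}_{(value or "").strip()}', os.path.basename(document_name))
                for row in reader for column, value in row.items()]


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, 'myCSV4.csv')
    with open(path, 'w', newline='') as f:
        f.write('firstname,secondname,city\nJohn,Rambo,Palo Alto\n')
    result = inverted_map_tuples(path)
    print(result)
    # [('firstname_John', 'myCSV4.csv'), ('secondname_Rambo', 'myCSV4.csv'), ('city_Palo Alto', 'myCSV4.csv')]
```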

Write a reduce function `inverted_reduce(value, documents)`, where the argument `documents` contains a comma-separated list of all CSV documents for the given value.   
This list might have duplicates.   
The reduce function returns a new list without duplicates.

For example,  
calling `inverted_reduce('firstname_Albert', 'myCSV2.csv, myCSV5.csv,myCSV2.csv')`   
returns the list `['firstname_Albert', 'myCSV2.csv,myCSV5.csv']`

In [11]:
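def inverted_reduce(key, documents):
    """Drop duplicate documents for `key`; strip stray spaces so 'a.csv' and ' a.csv' count once."""
    unique_documents = sorted({doc.strip() for doc in documents.split(',')})
    # quote the joined list so the output stays a valid two-column 'key,value' CSV row
    joined = ','.join(unique_documents)
    return [f'{key},"{joined}"']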
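
A minimal sketch of the list-returning form from the question, assuming the first occurrence of each document should be kept in its original order (`inverted_reduce_tuple` is a hypothetical name). `dict.fromkeys` removes duplicates while preserving insertion order, unlike a `set`.

```python
def inverted_reduce_tuple(value, documents):
    """Return [value, documents] with duplicate document names removed,
    keeping the first occurrence of each name in its original order."""
    docs = [doc.strip() for doc in documents.split(',') if doc.strip()]
    unique = list(dict.fromkeys(docs))  # dicts preserve insertion order
    return [value, ','.join(unique)]


print(inverted_reduce_tuple('firstname_Albert', 'myCSV2.csv, myCSV5.csv,myCSV2.csv'))
# ['firstname_Albert', 'myCSV2.csv,myCSV5.csv']
```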

<br><br><br><br>
# Question 4
## Testing Your MapReduce

**Create the Python list** `input_data`: `['myCSV1.csv', .. , 'myCSV20.csv']`

In [12]:
input_data = [f'csvs/myCSV[{i}].csv' for i in range(20)]
params = {val: 1 for val in input_data}  # 1-based column index passed to inverted_map for each file

**Submit MapReduce as follows:**

In [13]:
mapreduce = MapReduceEngine(conn, cursor)
t_start = timeit.default_timer()
status = mapreduce.execute(input_data, inverted_map, inverted_reduce, params)
t_stop = timeit.default_timer()
print(status, t_stop - t_start) 

MapReduce Completed 0.07785329100000027


In [16]:
mapreduce = MapReduceEngine_smallFiles(conn, cursor, block_size=1)
t_start = timeit.default_timer()
status = mapreduce.execute(input_data, inverted_map, inverted_reduce, params)
t_stop = timeit.default_timer()
print(status, t_stop - t_start) 

MapReduce Completed 0.05071649999999295


Make sure that `MapReduce Completed` is printed and that the `mapreducefinal` folder contains the result files.

**Use Python to delete all temporary data from the `mapreducetemp` folder and delete the SQLite database:**

In [None]:
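for path in glob.glob('mapreducetemp/part-tmp-*.csv'):  # all map outputs, however many tasks ran
    os.remove(path)
conn.close()  # close the connection before deleting the database file
os.remove(db_file)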

<br><br><br><br>

# Question 5
# Final Thoughts

The phase in which `MapReduceEngine` reads all temporary files generated by the mappers and sorts them so that each reducer receives a specific key is called the **shuffle step**.

Please explain **clearly** what the main problem of MapReduce would be when processing Big Data if there were no shuffle step at all, meaning that reducers read the responses directly from the mappers.

            If you say "I don't know" you will get 2 points :)

As I understand it, without a shuffle step there is no guarantee that a reduce task runs on a machine close to its data (the output of the map function). The data would therefore have to be transferred over the network, which is exactly the problem the platform (e.g. Hadoop) tries to solve.
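
To make the question concrete, here is a minimal sketch of what a shuffle does, assuming keys are hash-partitioned across a fixed number of reducers (the `shuffle` function and the `zlib.crc32` partitioner are illustrative choices, not part of the engine above). Whichever reducer `firstname_John` hashes to receives both `myCSV1.csv` and `myCSV7.csv`; if reducers instead read mapper responses directly, the two values could reach different reducers, each producing a partial entry for the same key.

```python
import zlib
from collections import defaultdict


def shuffle(mapper_outputs, num_reducers):
    """Partition (key, value) pairs by a hash of the key and sort each partition,
    so that every value of a given key ends up at the same reducer."""
    partitions = [defaultdict(list) for _ in range(num_reducers)]
    for output in mapper_outputs:  # one list of (key, value) pairs per mapper
        for key, value in output:
            reducer = zlib.crc32(key.encode('utf-8')) % num_reducers  # deterministic partitioner
            partitions[reducer][key].append(value)
    # each reducer receives its keys in sorted order, with all values grouped
    return [sorted(partition.items()) for partition in partitions]


mapper_outputs = [
    [('firstname_John', 'myCSV1.csv'), ('city_Kiel', 'myCSV1.csv')],  # mapper 0
    [('firstname_John', 'myCSV7.csv')],                               # mapper 1
]
for reducer_id, groups in enumerate(shuffle(mapper_outputs, num_reducers=2)):
    print(reducer_id, groups)
```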

<br><br><br><br>
Good Luck :)