# Big Data Platforms
## Assignment 2: MapReduce

**The goal of this assignment is to:**
- Understand and practice the details of MapReduceEngine

**Prerequisites**

In [1]:
# example
!pip install --quiet zipfile36

**Imports**

In [2]:
# general
import os
import time
import csv
import random
import warnings
import sqlite3
import glob
import concurrent.futures
from threading import Thread
from queue import Queue
from time import sleep
import pathlib
import re  # the standard-library re module is sufficient for the patterns used here

# ml
import numpy as np
import scipy as sp
import pandas as pd

# visual
import seaborn as sns
import matplotlib.pyplot as plt

# notebook
from IPython.display import display

**Hide Warnings**

In [3]:
warnings.filterwarnings('ignore')

**Disable Autoscrolling**

In [4]:
%%javascript
IPython.OutputArea.prototype._should_scroll = function(lines) {
    return false;
}


**Set Random Seed**

In [5]:
random.seed(123)

# Initial Steps

Write Python code to create 20 different CSV files named in the format `myCSV[Number].csv`, where each file contains 10 records. 

The schema is `('firstname', 'secondname', 'city')`  

Values should be randomly chosen from the lists: 
- `firstname` : `[John, Dana, Scott, Marc, Steven, Michael, Albert, Johanna]`  
- `city` : `[New York, Haifa, München, London, Palo Alto, Tel Aviv, Kiel, Hamburg]`  
- `secondname`: any value  

In [6]:
# declared lists that values should be taken from
firstname = ['John', 'Dana', 'Scott', 'Marc', 'Steven', 'Michael', 'Albert', 'Johanna']
secondname = ['Wang', 'Vuitton', 'Gabanna', 'Kors', 'Jacobs', 'Stileto', 'Armani', 'Boss']
city = ['NewYork', 'Haifa', 'Munchen', 'London', 'PaloAlto', 'TelAviv', 'Kiel', 'Hamburg']

# generate 20 files
for file_number in range(1, 21):

    # assemble file name according to format
    file_name = "myCSV[%s].csv" % file_number

    # open the file (newline='' prevents blank rows on Windows), write a header and 10 records
    with open(file_name, 'w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(["firstname", "secondname", "city"])

        for _ in range(10):
            row = [random.choice(firstname), random.choice(secondname), random.choice(city)]
            writer.writerow(row)


Create the mapreducetemp and mapreducefinal folders on your machine.

In [7]:
# create directories (exist_ok=True lets the cell be re-run without an error)
os.makedirs('mapreducetemp', exist_ok=True)
os.makedirs('mapreducefinal', exist_ok=True)

# Task 1: Map Reduce Engine

1 - Write Python code to create an SQLite database with the following table:

`TableName: temp_results`   
`schema: (key:TEXT,value:TEXT)`

In [8]:
# initialize a connection object
conn = None

# open (or create) the database and create the table if it does not exist yet
try:
    conn = sqlite3.connect("mydb.db")
    cursor = conn.cursor()
    create_table_statement = '''CREATE TABLE IF NOT EXISTS temp_results(key TEXT, value TEXT)'''
    cursor.execute(create_table_statement)
    conn.commit()

except sqlite3.Error as e:
    print(e)

# always release the connection, even if an error occurred
finally:
    if conn:
        conn.close()

2 - Create a Python class `MapReduceEngine` with a method `def execute(input_data, map_function, reduce_function)`, where:
- `input_data` is an array of elements,
- `map_function` is a pointer to the Python function that returns a list where each entry is of the form (key, value),
- `reduce_function` is a pointer to the Python function that returns a list where each entry is of the form (key, value).
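Before adding threads, temporary files and SQLite, it can help to see the contract of `execute` in its simplest form. The sketch below is a single-threaded, in-memory reference, assuming both functions return lists of `(key, value)` pairs as described above. Unlike the notebook's engine, it hands the reducer a Python list of values rather than a comma-joined string. `run_mapreduce_sequential`, `word_count_map` and `word_count_reduce` are hypothetical names used only for illustration.

```python
from collections import defaultdict


def run_mapreduce_sequential(input_data, map_function, reduce_function):
    """Sequential reference: map every element, group by key, reduce each group."""
    # map phase: each call returns a list of (key, value) pairs
    intermediate = []
    for element in input_data:
        intermediate.extend(map_function(element))

    # shuffle phase: collect all values that share a key
    groups = defaultdict(list)
    for key, value in intermediate:
        groups[key].append(value)

    # reduce phase: one call per key, in sorted key order
    results = []
    for key in sorted(groups):
        results.extend(reduce_function(key, groups[key]))
    return results


# hypothetical word-count functions, for illustration only
def word_count_map(line):
    return [(word, 1) for word in line.split()]


def word_count_reduce(key, values):
    return [(key, sum(values))]


print(run_mapreduce_sequential(["a b a", "b c"], word_count_map, word_count_reduce))
# → [('a', 2), ('b', 2), ('c', 1)]
```

The threaded engine below keeps the same three phases; it only distributes the map and reduce calls across threads and uses CSV files plus SQLite as the shuffle medium.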

**Implement** the following functionality in the `execute(...)` function:

1. For each key from the input_data, start a new Python thread that executes map_function(key) 
<br>
2. Each thread will store results of the map_function into mapreducetemp/part-tmp-X.csv where X is a unique number per each thread.
<br>
3. Keep the list of all threads and check whether they are completed.
<br>
4. Once all threads have completed, load the content of all CSV files into the temp_results table in SQLite.

  Remark: The easiest way is to loop over all CSV files, load each one into pandas first, and then load it into SQLite:  
    `data = pd.read_csv(path_to_csv)`  
    `data.to_sql('temp_results', sql_conn, if_exists='append', index=False)`
<br>

5. Write an SQL statement that generates a list, sorted by key, of the form `(key, value)`, where value is the concatenation of ALL values in the value column that match a specific key. For example, if the table has the records
<table>
    <tbody>
            <tr>
                <td style="text-align:center">John</td>
                <td style="text-align:center">myCSV1.csv</td>
            </tr>
            <tr>
                <td style="text-align:center">Dana</td>
                <td style="text-align:center">myCSV5.csv</td>
            </tr>
            <tr>
                <td style="text-align:center">John</td>
                <td style="text-align:center">myCSV7.csv</td>
            </tr>
    </tbody>
</table>

    Then the SQL statement will return `('John', 'myCSV1.csv, myCSV7.csv')`  
    Remark: use GROUP_CONCAT together with GROUP BY and ORDER BY
<br>
6. Start a new thread for each value from the generated list in the previous step, to execute `reduce_function(key,value)` 
<br>    
7. Each thread will store results of reduce_function into `mapreducefinal/part-X-final.csv` file  
<br>
8. Keep a list of all threads and check whether they have completed  
<br>
9. Once all threads have completed, print `MapReduce Completed` on the screen.
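Step 5 can be tried in isolation on the three example rows. The sketch below uses an in-memory SQLite database (an assumption made so that nothing is written to disk) and passes `', '` as the separator to `GROUP_CONCAT` so the output matches the `'myCSV1.csv, myCSV7.csv'` format shown above.

```python
import sqlite3

# in-memory database holding the example rows from step 5
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE temp_results(key TEXT, value TEXT)")
conn.executemany(
    "INSERT INTO temp_results(key, value) VALUES (?, ?)",
    [("John", "myCSV1.csv"), ("Dana", "myCSV5.csv"), ("John", "myCSV7.csv")],
)

# one row per key, sorted by key; all values of a key joined with ", "
rows = conn.execute(
    "SELECT key, GROUP_CONCAT(value, ', ') AS value "
    "FROM temp_results GROUP BY key ORDER BY key"
).fetchall()
conn.close()

for key, value in rows:
    print(key, "->", value)
```

`ORDER BY key` sorts the groups, but SQLite documents the order of the elements concatenated *inside* each group as arbitrary, so a reducer should not rely on `myCSV1.csv` appearing before `myCSV7.csv`.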

In [9]:
class MapReduceEngine():

    '''
    input_data: array of elements ['myCSV1.csv', ..., 'myCSV20.csv']
    map_function: a pointer to the function that returns a list where each entry is of the form (key, value)
    reduce_function: a pointer to the function that returns a list where each entry is of the form (key, value)
    params: parameters for the map_function of the form params={key: value}
    '''
    @staticmethod
    def execute(input_data, map_function, reduce_function, params):

      threads_map = []
      queue_map = Queue()

      # for each key from the input_data
      for document_name in input_data:

        # create a new Python thread that executes map_function(key)
        thread = Thread(target=map_function, args=(document_name, params['column'], queue_map))

        # keep list of all threads
        threads_map.append(thread)

        # start the thread
        thread.start()

      # join all threads to block until they finish
      for thr in threads_map:
        thr.join()

      # store results of map_function into mapreducetemp/part-tmp-X.csv,
      # where X is a unique number per map task
      for unique_id in range(len(input_data)):

        # results arrive in completion order, not input order,
        # so X is a running counter rather than a number parsed from the file name
        map_results = queue_map.get()

        # create and open a temp file, write results in
        file_name = "mapreducetemp/part-tmp-%s.csv" % unique_id
        with open(file_name, 'w', newline='') as file:
          writer = csv.writer(file)
          writer.writerow(["key", "value"])
          # write every (key, value) tuple as one row
          for entry in map_results:
            writer.writerow(entry)

      # check whether they are completed (after join(), none should still be alive)
      for thr in threads_map:
        if thr.is_alive():
          print('Something went wrong: a map thread is still running')

      # once all threads are completed, load content of all CSV files into temp_results table
      conn = sqlite3.connect("mydb.db")
      path = "mapreducetemp"
      for filename in os.listdir(path):
        data = pd.read_csv(os.path.join(path, filename))
        data.to_sql('temp_results', conn, if_exists='append', index=False)
      conn.commit()

      # generate a list sorted by key of the form (key, value),
      # where value concatenates ALL values that match the key
      db_df = pd.read_sql_query(
          "SELECT key, GROUP_CONCAT(value) AS value FROM temp_results GROUP BY key ORDER BY key",
          conn)
      conn.close()

      threads_reduce = []
      queue_reduce = Queue()

      # start a new thread for each key from generated list 
      for index, row in db_df.iterrows():
        key = row['key']
        value = row['value']

        # create a new Python thread that executes reduce_function(key, value)
        thread = Thread(target=reduce_function, args=(key, value, queue_reduce))

        # keep list of all threads
        threads_reduce.append(thread)

        thread.start()

      # join all threads to block until they finish
      for thr in threads_reduce:
        thr.join()

      # store results of reduce_function into mapreducefinal/part-X-final.csv
      for part_id in range(len(threads_reduce)):
        reduce_results = queue_reduce.get()

        file_name = "mapreducefinal/part-%s-final.csv" % part_id
        with open(file_name, 'w', newline='') as file:
          writer = csv.writer(file)
          writer.writerow(["key", "value"])

          # reduce_results is a single (key, documents) tuple, written as one row
          writer.writerow(reduce_results)

      # check whether the reduce threads are completed
      for thr in threads_reduce:
        if thr.is_alive():
          print('Something went wrong: a reduce thread is still running')
      
      return 'MapReduce Completed'
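The engine above collects map results through a shared queue and writes the temporary files from the main thread. Steps 2 and 3 can also be read as "each thread writes its own file". The sketch below shows that variant for the map phase, assuming a `map_function` that simply *returns* its list of pairs (unlike the notebook's `inverted_map`, which also takes a queue). `map_worker` and `run_map_phase` are hypothetical names, and a temporary directory stands in for `mapreducetemp`.

```python
import csv
import os
import tempfile
from threading import Thread


def map_worker(part_id, element, map_function, out_dir):
    """Run map_function on one element and write its (key, value) pairs to its own file."""
    file_name = os.path.join(out_dir, "part-tmp-%d.csv" % part_id)
    with open(file_name, "w", newline="") as file:
        writer = csv.writer(file)
        writer.writerow(["key", "value"])
        writer.writerows(map_function(element))


def run_map_phase(input_data, map_function, out_dir):
    """Start one thread per element, wait for all, and report whether all finished."""
    os.makedirs(out_dir, exist_ok=True)
    threads = []
    # enumerate gives every thread a unique X, independent of the input file names
    for part_id, element in enumerate(input_data):
        thread = Thread(target=map_worker, args=(part_id, element, map_function, out_dir))
        threads.append(thread)
        thread.start()
    for thread in threads:
        thread.join()
    # after join(), no thread should still be alive
    return all(not thread.is_alive() for thread in threads)


# usage with a hypothetical map function and a throwaway output folder
out_dir = tempfile.mkdtemp()
ok = run_map_phase(["a b", "b c"], lambda line: [(w, line) for w in line.split()], out_dir)
print(ok, sorted(os.listdir(out_dir)))
```

Because each thread owns its output file, no locking or queue is needed, and the file number is tied to the task that produced it.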

# Task 2: Implement the MapReduce inverted index of the CSV documents

1 - Implement a function `inverted_map(document_name, column_index)` which reads the CSV document from the local disk and returns a list that contains entries of the form (key_value, document_name) for the specific column_index provided.

For example, if column_index = 1 and the myCSV11.csv document has values like:
<table>
    <tbody>
            <tr>
                <td style="text-align:center">firstname</td>
                <td style="text-align:center">secondname</td>
                <td style="text-align:center">city</td>
            </tr>
            <tr>
                <td style="text-align:center">Michael</td>
                <td style="text-align:center">Vernik</td>
                <td style="text-align:center">Tel Aviv</td>
            </tr>
            <tr>
                <td style="text-align:center">Johanna</td>
                <td style="text-align:center">Vernik</td>
                <td style="text-align:center">Hamburg</td>
            </tr>
            <tr>
                <td style="text-align:center">Steven</td>
                <td style="text-align:center">Friedman</td>
                <td style="text-align:center">Palo Alto</td>
            </tr>
                        <tr>
                <td style="text-align:center">...</td>
                <td style="text-align:center">...</td>
                <td style="text-align:center">...</td>
            </tr>
    </tbody>
</table>

Then `inverted_map('myCSV11.csv', column_index=1)` will return a list of the form:  

<table>
    <tbody>
            <tr>
                <td style="text-align:center">key</td>
                <td style="text-align:center">value</td>
            </tr>
            <tr>
                <td style="text-align:center">Michael</td>
                <td style="text-align:center">/Users/gilv/Dev/DataStore/csv/myCSV11.csv</td>
            </tr>
            <tr>
                <td style="text-align:center">Johanna</td>
                <td style="text-align:center">/Users/gilv/Dev/DataStore/csv/myCSV11.csv</td>
            </tr>
            <tr>
                <td style="text-align:center">Steven</td>
                <td style="text-align:center">/Users/gilv/Dev/DataStore/csv/myCSV11.csv</td>
            </tr>
                        <tr>
                <td style="text-align:center">...</td>
                <td style="text-align:center">...</td>
            </tr>
    </tbody>
</table>

In [10]:
# Read CSV file, return a list that contains entries: (key_value, document_name)
# for the specific column index provided
def inverted_map(document_name, column_index, queue_map):

  # set column mapping
  column_mapping = {1: 'firstname', 2: 'secondname', 3: 'city'}
  selected_column = column_mapping[column_index]

  # read the CSV file and load it into a DataFrame
  csv_data = pd.read_csv(document_name)

  # list that contains entries of the form (key_value, document_name)
  results = []

  # for every value in selected column, assemble results
  for index, value in csv_data[selected_column].items():
    entry = (value, document_name)
    results.append(entry)

  # [('John', '/content/myCSV[1].csv'), ('Albert', '/content/myCSV[1].csv'), ...]
  queue_map.put(results)
  return results
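The specification asks for `inverted_map(document_name, column_index)` with two arguments, while the implementation above adds a `queue_map` parameter so the thread can hand its result back. One alternative is to keep the map function pure and adapt it with a small wrapper. The sketch below assumes a 1-based `column_index` (matching the mapping used above) and reads the file with the `csv` module; `inverted_map_pure` and `into_queue` are hypothetical names.

```python
import csv
from queue import Queue


def inverted_map_pure(document_name, column_index):
    """Spec signature: return [(value, document_name), ...] for a 1-based column_index."""
    with open(document_name, newline="") as file:
        reader = csv.reader(file)
        next(reader)  # skip the header row
        return [(row[column_index - 1], document_name) for row in reader]


def into_queue(function, result_queue):
    """Wrap a function so that its return value is also put on result_queue."""
    def wrapper(*args):
        result = function(*args)
        result_queue.put(result)
        return result
    return wrapper
```

A thread would then be started as `Thread(target=into_queue(inverted_map_pure, queue_map), args=(document_name, 1))`, which keeps the map function testable on its own without any threading.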

2 - Write a reduce function `inverted_reduce(key, documents)`, where the field “documents” contains a list of all CSV documents per given key.   
This list might have duplicates.   
The reduce function will return a new list without duplicates.
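De-duplicating with a `set` discards the original order. If the first-seen order should be kept, a dictionary can do the job, because dictionary keys are unique and (since Python 3.7) keep insertion order. The sketch below assumes, as in this notebook, that the documents may arrive as one comma-separated string produced by `GROUP_CONCAT`; `inverted_reduce_ordered` is a hypothetical name.

```python
def inverted_reduce_ordered(key, documents):
    """Remove duplicate document names while keeping first-seen order."""
    # GROUP_CONCAT hands the value over as one comma-separated string
    if isinstance(documents, str):
        documents = documents.split(",")
    # dict keys are unique and preserve insertion order (Python 3.7+)
    return (key, list(dict.fromkeys(documents)))


print(inverted_reduce_ordered("John", "myCSV1.csv,myCSV7.csv,myCSV1.csv"))
# → ('John', ['myCSV1.csv', 'myCSV7.csv'])
```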

In [11]:
# documents is a list of CSV documents per given key (might have duplicates)
# Return a new list without duplicates
def inverted_reduce(key, documents, queue_reduce):

  # GROUP_CONCAT passes the documents as one comma-separated string, so split it first
  documents = documents.split(",")
  # remove duplicates with a set, then sort for a deterministic order
  documents = sorted(set(documents))

  queue_reduce.put((key, documents))
  return (key, documents)

# Task 3: Submit your first MapReduce

1 - Create a Python list *input_data*: `['myCSV1.csv', ..., 'myCSV19.csv']`
2 - Submit MapReduce as follows:

  `mapreduce = MapReduceEngine()`<br>
  `status = mapreduce.execute(input_data, inverted_map, inverted_reduce, params={'column':1})`<br>
  `print(status)`<br>

3 - 'MapReduce Completed' should be printed and **mapreducefinal** folder should contain the result files.

4 - Delete all temporary data from the **mapreducetemp** folder and delete the SQLite database.

In [12]:
# Create Python list in input_data
input_data = glob.glob("/content/*.csv")
input_data

['/content/myCSV[11].csv',
 '/content/myCSV[3].csv',
 '/content/myCSV[17].csv',
 '/content/myCSV[9].csv',
 '/content/myCSV[14].csv',
 '/content/myCSV[16].csv',
 '/content/myCSV[1].csv',
 '/content/myCSV[8].csv',
 '/content/myCSV[12].csv',
 '/content/myCSV[15].csv',
 '/content/myCSV[20].csv',
 '/content/myCSV[6].csv',
 '/content/myCSV[10].csv',
 '/content/myCSV[19].csv',
 '/content/myCSV[13].csv',
 '/content/myCSV[4].csv',
 '/content/myCSV[18].csv',
 '/content/myCSV[7].csv',
 '/content/myCSV[2].csv',
 '/content/myCSV[5].csv']

In [13]:
mapreduce = MapReduceEngine()
status = mapreduce.execute(input_data, inverted_map, inverted_reduce, params={'column':1})
print(status)

       key                                              value
0   Albert  /content/myCSV[10].csv,/content/myCSV[10].csv,...
1     Dana  /content/myCSV[10].csv,/content/myCSV[1].csv,/...
2  Johanna  /content/myCSV[10].csv,/content/myCSV[12].csv,...
3     John  /content/myCSV[10].csv,/content/myCSV[12].csv,...
4     Marc  /content/myCSV[10].csv,/content/myCSV[10].csv,...
5  Michael  /content/myCSV[12].csv,/content/myCSV[12].csv,...
6    Scott  /content/myCSV[12].csv,/content/myCSV[12].csv,...
7   Steven  /content/myCSV[10].csv,/content/myCSV[17].csv,...
MapReduce Completed


In [14]:
# Delete all temporary data from the mapreducetemp folder and delete the SQLite database
import shutil

# remove the folder together with all part-tmp-X.csv files inside it
if os.path.isdir('mapreducetemp'):
  shutil.rmtree('mapreducetemp')

if os.path.isfile('mydb.db'):
  os.remove('mydb.db')

# Task 4:

The phase where MapReduceEngine reads all temporary files generated by the mappers and sorts them so that each reducer receives a specific key is called the shuffle step. What would be the main problem of MapReduce when processing Big Data if there were no shuffle step at all (meaning reducers would directly read the responses from the mappers)?

### **Answer:**

The shuffle and sort step moves all intermediate (key, value) records with the same key to the same reducer. This ensures that each reducer receives every record associated with a given key, so that it can apply the reduce function to the complete group. When processing big data, the reducers rely on receiving a key together with the complete set of values for that key in order to return the correct (key, value) pair for that key.
Because each key is owned by exactly one reducer, the reduce functions can also run on different machines in parallel, which makes processing much faster.
If there were no shuffle step, we could not guarantee that all records with the same key arrive at the same reducer: they would be scattered across reducers and/or machines, depending on which mapper produced them. The reduce function would then run on partial groups and would not give the correct result, since it relies on receiving all matching records together.
We would end up with incomplete or duplicated (key, value) pairs (for example, several partial counts for the same word) that do not represent the true desired values.
Moreover, the work could no longer be split safely across reducers: either a single reducer would have to read the output of every mapper, which becomes a bottleneck and removes the parallelism of MapReduce, or an extra aggregation pass would be needed to merge the partial results.
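To make the answer concrete, the sketch below shows one common way a shuffle routes keys: hash partitioning, where `hash(key) mod R` picks one of `R` reducers. This is an illustrative assumption, not a description of this notebook's engine (which gathers all pairs into a single SQLite table). `zlib.crc32` is used instead of Python's built-in `hash()`, because string hashes are randomized per process and different machines must agree on the routing. `partition` and `shuffle` are hypothetical names.

```python
import zlib
from collections import defaultdict


def partition(key, num_reducers):
    """Route a key to a reducer index; crc32 is stable across processes, unlike str hash()."""
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def shuffle(mapper_outputs, num_reducers):
    """Group every mapper's (key, value) pairs so that each key lands on exactly one reducer."""
    reducers = [defaultdict(list) for _ in range(num_reducers)]
    for pairs in mapper_outputs:
        for key, value in pairs:
            reducers[partition(key, num_reducers)][key].append(value)
    return reducers


# two hypothetical mappers that both emitted the key "John"
mapper_outputs = [
    [("John", "myCSV1.csv"), ("Dana", "myCSV5.csv")],
    [("John", "myCSV7.csv")],
]
reducers = shuffle(mapper_outputs, num_reducers=3)
owners = [i for i, groups in enumerate(reducers) if "John" in groups]
print(owners, reducers[owners[0]]["John"])
```

Both `John` records end up in a single reducer's group, which is exactly the guarantee that would be lost if reducers read mapper outputs directly.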