# Big Data Platform
## Final Project: Small files and Map Reduce

**By:**  

Aviv Ples, 318357233  
Itay Shapira, 034250043

**Imports**

In [1]:
# general
import os
import time
import random
import warnings
import threading
import shutil

# ml
import numpy as np
import scipy as sp
import pandas as pd

# visual
import seaborn as sns
import matplotlib.pyplot as plt

# notebook
from IPython.display import display
from tqdm.notebook import tqdm

**Hide Warnings**

In [2]:
warnings.filterwarnings('ignore')

**Set Random Seed**

In [3]:
# Seed both generators: the CSV rows are drawn with numpy's global RNG
# (np.random.randint / np.random.choice), which random.seed does not affect.
random.seed(123)
np.random.seed(123)

# Creation of Random Small Files

We work locally as this is a prototype.

We create 500 different CSV files in this format:  `myCSV[Number].csv`, where each file contains a random number of records, $10 - 10,000$ (due to limited computing resources)

The schema is `(‘firstname’,’secondname’,’city’)`  

Values randomly chosen from the lists: 
- `firstname` : `[John, Dana, Scott, Marc, Steven, Michael, Albert, Johanna]`  
- `city` : `[New York, Haifa, München, London, Palo Alto,  Tel Aviv, Kiel, Hamburg]`  
- `secondname`: any value  

In [4]:
def generate_random_name():
    """
    Generate a random name: the first character is an uppercase ASCII letter
    and the rest is a series of 1 - 12 lowercase ASCII letters, giving an
    output string of length 2 - 13.

    Returns:
    str: Randomly generated name
    """
    first_letter = random.choice(string.ascii_uppercase)
    # randint is inclusive on both ends, so the tail really spans 1 - 12
    # characters (range(1, 12) stopped at 11)
    tail_length = random.randint(1, 12)
    tail = ''.join(random.choices(string.ascii_lowercase, k=tail_length))
    return first_letter + tail

In [5]:
import string

# Pools of values that generate_csvs samples from, one list per CSV column
firstname = ['John', 'Dana', 'Scott', 'Marc', 'Steven', 'Michael', 'Albert', 'Johanna']
city = ['NewYork', 'Haifa', 'Munchen', 'London', 'PaloAlto',  'TelAviv', 'Kiel', 'Hamburg']
# Use list comprehension in order to create a list of 8 randomly generated names
secondname = [generate_random_name() for _ in range(8)] # one call to generate_random_name per entry

# Show the generated second names
print(secondname)

['Bckcxa', 'Riweiigalc', 'Tbilxcdu', 'Axogvt', 'Kufpnvikun', 'Vqrxmrjate', 'Creikbloi', 'Gcqajfnsxqz']


In [6]:
def generate_csvs(number_of_csvs):
    """
    Write number_of_csvs files named myCSV1.csv ... myCSV<number_of_csvs>.csv
    into the working directory. Every file gets a random number of rows
    (10 - 10,000) and each cell is sampled from the list of possible values
    of its column.

    Parameters:
    number_of_csvs (int): number of csv files to generate
    """
    column_order = ['firstname', 'secondname', 'city']
    for file_number in tqdm(range(1, number_of_csvs + 1)):
        # Upper bound of randint is exclusive, hence 10001
        row_count = np.random.randint(10, 10001)
        frame = pd.DataFrame(
            {
                'firstname': np.random.choice(firstname, row_count),
                'secondname': np.random.choice(secondname, row_count),
                'city': np.random.choice(city, row_count),
            },
            columns=column_order,
        )
        frame.to_csv(f'myCSV{file_number}.csv', index=False)

In [7]:
generate_csvs(500)

  0%|          | 0/500 [00:00<?, ?it/s]

Use python to Create `mapreducetemp` and `mapreducefinal` folders

In [8]:
# Folders for the intermediate (map) and final (reduce) outputs
for folder_name in ("mapreducetemp", "mapreducefinal"):
    os.makedirs(folder_name, exist_ok=True)

# Merge Small Files

Considering this is a prototype we do the merging as follows:

1. We segregate the large files from the small files
2. Sort the small files into the merging queue
3. Small files are put into the queue with the maximum merge limit
4. Files are placed into the merging queue until the queue size becomes equal to the merge criteria
5. Finally we merge the files in each queue into one

<br>

This method only covers a prototype solution for CSV files

In [9]:
# Merge limit in bytes (210 KB): files at or above it are treated as large,
# smaller files are packed into merge queues of at most this size.
# 210 KB was chosen since the max file size is 230 KB in our case
merge_limit = 210 * 1024

In [10]:
# Create the folder that receives both the copied large files and the merged files

os.makedirs("large_files", exist_ok=True)

In [11]:
# Measure every CSV of the working directory once, then split by merge_limit
csv_sizes = [(name, os.path.getsize(name))
             for name in os.listdir('.') if name.endswith('.csv')]

large_files = [name for name, size in csv_sizes if size >= merge_limit]

# The large files are copied (not moved) so that MapReduce can later be timed
# on the original small files as well as on the large and merged files
for large_file in large_files:
    shutil.copyfile(large_file, f"large_files/{large_file}")


small_files = [name for name, size in csv_sizes if size < merge_limit]

In [12]:
# Lets use the Worst fit strategy to sort the small files
# Hence, the maximum number of queues needed is the number of small files

def worst_fit(files, merge_limit):
    """
    Distribute files over merge queues with the worst-fit strategy: a file is
    placed in the queue that has the most free space left among the queues
    that can still hold it. When no queue can hold it, a new queue with
    capacity merge_limit is opened for it.

    Parameters:
    files (lst): paths of the files to distribute
    merge_limit (int): capacity of a single queue in bytes

    Returns:
    queues_file (lst): list of queues, each a list of file paths
    queues_size (lst): free space left in each queue, in bytes
    """
    sizes = [os.path.getsize(path) for path in files]
    queues_file = []
    queues_size = []

    for path, size in tqdm(list(zip(files, sizes))):
        # Look for the fitting queue that would keep the most space afterwards;
        # on ties the earliest queue wins
        chosen, most_left = None, -1
        for slot, free in enumerate(queues_size):
            left_after = free - size
            if free >= size and left_after > most_left:
                chosen, most_left = slot, left_after

        if chosen is None:
            # No existing queue can accommodate the file: open a new one
            queues_size.append(merge_limit - size)
            queues_file.append([path])
        else:
            queues_size[chosen] -= size
            queues_file[chosen].append(path)

    return queues_file, queues_size

In [13]:
merged_files, merged_files_size = worst_fit(small_files, merge_limit)

  0%|          | 0/458 [00:00<?, ?it/s]

In [14]:
# Concatenate the files of every queue and write one merged CSV per queue
for queue_number, queue_files in tqdm(enumerate(merged_files), total=len(merged_files)):
    frames = [pd.read_csv(path) for path in queue_files]
    merged_frame = pd.concat(frames, ignore_index=True)
    merged_frame.to_csv(f'large_files/myMergedCSV{queue_number}.csv', index=False)

  0%|          | 0/277 [00:00<?, ?it/s]

# MapReduceEngine

Write Python code to create an SQLite database with the following table

`TableName: temp_results`   
`schema: (key:TEXT,value:TEXT)`

In [15]:
import sqlite3 as sql

In [16]:
def create_connection(db_file):
    """
    Open an sql connection to the database stored in the given file

    Parameters:
    db_file (str): Path of the db file to connect to

    Returns:
    sql.Connection: Connection to db
    """
    return sql.connect(db_file)

def create_table(conn, create_table_sql):
    """
    Create a table in the db with the given sql execution code and commit it

    Parameters:
    conn (sql.Connection): connection to db
    create_table_sql (str): sql execution code for table creation

    Raises:
    sql.Error: if the statement cannot be executed; the cursor is closed
               before the error propagates
    """
    c = conn.cursor()
    try:
        c.execute(create_table_sql)
        conn.commit()
    finally:
        # Release the cursor even when the statement fails
        c.close()

In [17]:
sql_create_table = """ CREATE TABLE IF NOT EXISTS temp_results (
                           key TEXT,
						   value TEXT
                           ); """

# Open the module-level connection to mydb.db; MapReduceEngine.execute reads
# and writes the temp_results table through sql_conn
sql_conn = create_connection("mydb.db")

# Create the temp_results table (the statement is a no-op if it already exists)
create_table(sql_conn, sql_create_table)

**Python class** `MapReduceEngine` with method:

`def execute(input_data, map_function, reduce_function)`:

- `input_data`: is an array of elements
- `map_function`: is a pointer to the Python function that returns a list where each entry of the form (key,value) 
- `reduce_function`: is pointer to the Python function that returns a list where each entry of the form (key,value)

<br>

`execute(...)` function:

<br>

1. For each key  from the  input_data, start a new Python thread that executes map_function(key) 
<br>
2. Each thread will store results of the map_function into mapreducetemp/part-tmp-X.csv where X is a unique number per each thread.
<br>
3. Keep the list of all threads and check whether they are completed.
<br>
4. Once all threads completed, load content of all CSV files into the temp_results table in SQLite.

    Remark: Easiest way to loop over all CSV files and load them into Pandas first, then load into SQLite  
    `data = pd.read_csv(path to csv)`  
    `data.to_sql(‘temp_results’,sql_conn, if_exists=’append’,index=False)`
<br><br>

5. **SQL statement** that generates a sorted list by key of the form `(key, value)` where value is concatenation of ALL values in the value column that match specific key. For example, if table has records
<table>
    <tbody>
            <tr>
                <td style="text-align:center">John</td>
                <td style="text-align:center">myCSV1.csv</td>
            </tr>
            <tr>
                <td style="text-align:center">Dana</td>
                <td style="text-align:center">myCSV5.csv</td>
            </tr>
            <tr>
                <td style="text-align:center">John</td>
                <td style="text-align:center">myCSV7.csv</td>
            </tr>
    </tbody>
</table>

    Then SQL statement will return `(‘John’,’myCSV1.csv, myCSV7.csv’)`
<br>
6. **Start a new thread** for each value from the generated list in the previous step, to execute `reduce_function(key,value)` 
<br>    
7. Each thread will store results of reduce_function into `mapreducefinal/part-X-final.csv` file  
<br>
8. Keep list of all threads and check whether they are completed  
<br>
9. Once all threads completed, print on the screen `MapReduce Completed` otherwise print `MapReduce Failed`

In [18]:
from concurrent.futures import ProcessPoolExecutor, as_completed
from threading import current_thread
import logging

In [19]:
# Setup logger file: records of level INFO and above go to mapreduce.log.
# Every record carries the thread name, which thread_wrapper sets to
# "<function name> thread <id>" before it logs.
logging.basicConfig(filename='mapreduce.log', filemode='w+',
                    format='%(asctime)s | %(threadName)s | %(levelname)s | %(message)s',
                    level=logging.INFO)

In [20]:
def thread_wrapper(function, save_path, value):
    """
    Run one map or reduce task with logging and store its result as a csv.

    A tuple payload is treated as a reduce task: the function is called with
    the unpacked tuple and the result is written to
    <save_path>/part-<thread_id>-final.csv. Any other payload is treated as a
    map task: the function is called with the payload and the result is
    written to <save_path>/part-tmp-<thread_id>.csv with the columns
    key and value.

    Parameters:
    function: function to run by the worker
    save_path (str): folder to save the csv in
    value (tuple): pair of (payload to run the function on, thread_id)

    Raises:
    Exception: whatever the function or the csv writing raises; the error is
               logged with its traceback and then re-raised unchanged
    """
    data, thread_id = value
    current_thread().name = f"{function.__name__} thread {thread_id}"
    logging.info('Started')
    try:
        # If tuple then it means it's the reduce function
        if isinstance(data, tuple):
            res = function(*data)
            df = pd.DataFrame(res)
            file_name = f'part-{thread_id}-final.csv'
        # The map function
        else:
            res = function(data)
            df = pd.DataFrame(res, columns=['key', 'value'])
            file_name = f'part-tmp-{thread_id}.csv'
        df.to_csv(os.path.join(save_path, file_name), index=False)
    except Exception as e:
        # logging.exception keeps the traceback in the log file; the bare
        # raise hands the original exception to the caller
        logging.exception('%s', e)
        raise

    logging.info('Ended')

def pool_executor_wrapper(function, data, pbar, num_of_threads, save_path):
    """
    Run thread_wrapper once per element of data in a pool of workers and
    report whether every task finished.

    NOTE: the pool is a ProcessPoolExecutor, so the "threads" are worker
    processes; function and the elements of data must be picklable.

    Parameters:
    function: function to let the workers run
    data: elements to run the function on, one task per element
    pbar (tqdm): the progress bar we want to update as tasks complete
    num_of_threads (int): the max number of workers we want to run
    save_path (str): path where we want to save our results

    Returns:
    str: "MapReduce Failed" if a task fails, otherwise None
    """
    # Give every element a 1-based task id. enumerate covers all of data,
    # whereas zipping with range(1, num_of_threads + 1) silently dropped
    # elements beyond num_of_threads.
    jobs = [(item, task_id) for task_id, item in enumerate(data, start=1)]

    # Nothing to run: a pool with max_workers=0 would raise ValueError
    if not jobs:
        return None

    # Define the pool executor and start the workers
    with ProcessPoolExecutor(max_workers=num_of_threads) as executor:
        futures = [executor.submit(thread_wrapper, function, save_path, job)
                   for job in jobs]

        # Go over the tasks as they finish and check the completion status of each
        for future in as_completed(futures):
            try:
                future.result()
            except Exception:
                # thread_wrapper already logged the error; report the failure
                return "MapReduce Failed"

            pbar.update(1)

    return None

In [21]:
class MapReduceEngine:
    """
    Local MapReduce driver: parallel map into mapreducetemp, shuffle and sort
    through the temp_results table of sql_conn, parallel reduce into
    mapreducefinal.
    """

    def execute(self, input_data, map_function, reduce_function):
        """
        Execute the mapreduce steps: map, shuffle and sort, reduce
        Parallelize the map and reduce steps by running the map and reduce
        function in parallel workers on the partitioned data

        Parameters:
        input_data (lst): elements to apply map_function on, one worker per
                          element
        map_function: Map function to apply on each element of input_data;
                      returns a list of (key, value) entries
        reduce_function: Reduce function called as reduce_function(key, values)
                         on the shuffled and sorted output of map_function
        
        Returns:
        str: Completion status message, "MapReduce Completed" or
             "MapReduce Failed"
        """

        # Start a worker for each element in input_data and run map_function on it
        num_of_threads = len(input_data)
        mapreducetemp_path = "mapreducetemp"

        # Define a progress bar to visualize progress of the workers
        with tqdm(total=num_of_threads, desc = 'Map Progress Bar') as pbar:
            map_status = pool_executor_wrapper(map_function, input_data, pbar,
                                               num_of_threads, mapreducetemp_path)

        if map_status is not None:
            return map_status

        # Start from an empty temp_results table so that rows loaded by an
        # earlier execute() call do not leak into this run's result
        sql_conn.execute("CREATE TABLE IF NOT EXISTS temp_results (key TEXT, value TEXT)")
        sql_conn.execute("DELETE FROM temp_results")
        sql_conn.commit()

        # Load the map output of this run into temp_results. The files are
        # addressed by worker id instead of listing the folder, so stale
        # part files left by an earlier, larger run are ignored.
        for thread_id in range(1, num_of_threads + 1):
            part_file = os.path.join(mapreducetemp_path, f'part-tmp-{thread_id}.csv')
            data = pd.read_csv(part_file)
            data.to_sql('temp_results', sql_conn, if_exists='append', index=False)

        # SQL statement to generate a sorted list by key
        sql_generate_sorted_list = """
                                   SELECT key, GROUP_CONCAT(value)
                                   FROM temp_results
                                   GROUP BY key
                                   ORDER BY key;
                                   """
        sorted_list = sql_conn.execute(sql_generate_sorted_list).fetchall()

        # Start a worker for each (key, values) row from the generated list
        num_of_threads = len(sorted_list)

        # Define a progress bar to visualize progress of the workers
        with tqdm(total=num_of_threads, desc = 'Reduce Progress Bar') as pbar:
            # pool_executor_wrapper takes no is_map argument (passing one
            # raised TypeError); reduce tasks are recognised by their tuple
            # payload
            reduce_status = pool_executor_wrapper(reduce_function, sorted_list, pbar,
                                                  num_of_threads, 'mapreducefinal')
            
        if reduce_status is not None:
            return reduce_status

        return "MapReduce Completed"

# Implement the MapReduce Inverted index of the CSV documents

`inverted_map(document_name)`:

Reads the CSV document from the local disc and return a list that contains entries of the form (key_value, document name).

For example, if myCSV4.csv document has values like:  
`{‘firstname’:’John’,‘secondname’:’Rambo’,‘city’:’Palo Alto’}`

Then `inverted_map(‘myCSV4.csv’)` function will return a list:  
`[(‘firstname_John’,’ myCSV4.csv’),(‘secondname_Rambo’,’ myCSV4.csv’), (‘city_Palo Alto’,’ myCSV4.csv’)]`

In [22]:
def inverted_map(document_name):
    """
    Reads the csv document from the local disk and return a list that contains
    entries of the form (key_value, document name), one entry per cell, in
    row-major order. Duplicates are kept.

    Every cell is read as text, so numeric cells and cells such as "None" or
    "NA" (which pandas would otherwise turn into NaN) are indexed by their
    literal text instead of raising a TypeError while building the key.

    Parameters:
    document_name (str): path of the csv document to run the map function on

    Returns:
    entries (lst): list of tuples of the form ('<column>_<value>', document_name)
    """
    df = pd.read_csv(document_name, dtype=str, keep_default_na=False)
    columns = list(df.columns)

    entries = []
    # itertuples yields plain tuples and is much cheaper than iterrows
    for row in df.itertuples(index=False, name=None):
        for col, cell in zip(columns, row):
            entries.append((f"{col}_{cell}", document_name))
    return entries

Reduce function:

`inverted_reduce(value, documents)`, where the field “documents” contains a list of all CSV documents per given value.   
This list might have duplicates.   
Reduce function will return new list without duplicates.

For example,  
calling the function `inverted_reduce(‘firstname_Albert’,’myCSV2.csv, myCSV5.csv,myCSV2.csv’)`   
will return a list `[‘firstname_Albert’,’myCSV2.csv, myCSV5.csv’]`

In [23]:
def inverted_reduce(value, documents):
    """
    Put value and the documents that contain it together in one list, with
    the duplicates in documents removed.

    The documents keep the order of their first occurrence, so the result is
    the same on every run (a set would order them by randomized string
    hashes). Whitespace around a document name is ignored and empty names
    are skipped.

    Parameters:
    value (str): column name and value in the form of columnname_value
    documents (str): comma separated names of all csv documents per given value

    Returns:
    lst: value followed by the csv document names with no duplicates
    """
    names = (name.strip() for name in documents.split(","))
    # dict.fromkeys de-duplicates while preserving first-seen order
    unique_documents = list(dict.fromkeys(name for name in names if name))
    return [value] + unique_documents

# Testing The MapReduce

In [24]:
from time import time

**Run MapReduce on the merged files and large files**

In [25]:
mapreduce = MapReduceEngine()

In [26]:
# Time one full MapReduce run over every CSV in the large_files folder
large_files_start = time()
large_files_data = []
for entry in os.listdir('large_files'):
    if entry.endswith('.csv'):
        large_files_data.append(f"large_files/{entry}")
large_files_status = mapreduce.execute(large_files_data, inverted_map, inverted_reduce)
large_files_elapsed_time = np.round(time() - large_files_start, 3)
print(large_files_status)
print(f"The MapReduce took {large_files_elapsed_time} seconds on the merged and large files")

Map Progress Bar:   0%|          | 0/319 [00:00<?, ?it/s]

Reduce Progress Bar:   0%|          | 0/24 [00:00<?, ?it/s]

MapReduce Completed
The MapReduce took 261.533 seconds on the merged and large files


**Run MapReduce on the original small files**

In [27]:
# Time one full MapReduce run over every CSV in the working directory
small_files_start = time()
small_files_data = []
for entry in os.listdir('.'):
    if entry.endswith('.csv'):
        small_files_data.append(entry)
small_files_status = mapreduce.execute(small_files_data, inverted_map, inverted_reduce)
small_files_elapsed_time = np.round(time() - small_files_start, 3)
print(small_files_status)
print(f"The MapReduce took {small_files_elapsed_time} seconds on the small files")

Map Progress Bar:   0%|          | 0/500 [00:00<?, ?it/s]

Reduce Progress Bar:   0%|          | 0/24 [00:00<?, ?it/s]

MapReduce Completed
The MapReduce took 301.918 seconds on the small files


We can clearly see that merging the small files **reduced** the runtime of the MapReduce

**Use python to delete all temporary data from mapreducetemp folder and delete SQLite database:**

In [28]:
# Delete every temporary map output file
mapreducetemp_path = "mapreducetemp"
for file in os.listdir(mapreducetemp_path):
    os.remove(os.path.join(mapreducetemp_path, file))

# Close the connection before deleting the database file: removing a file
# that is still open fails on Windows and leaves the handle dangling elsewhere
sql_conn.close()
os.remove("mydb.db")