# Big Data Platform
## Small files and MapReduce

**By:**  

Eyal Michaeli, 207380528<br>
Tzach Larboni, 302673355

<br><br>

#### Abstract: <br>
In the following notebook we implement the Consolidated Batch Files solution to the small files problem of MapReduce. We implement a slight modification of the word-count example of MapReduce, in which we count the number of appearances of each first name in the original files. <br>
First, we generate multiple small files and run MapReduce on them while timing the run. Then, we implement a consolidation function and run the same MapReduce code on the consolidated files, timing it as well. <br>
Our results show that, for this workload, the proposed solution makes MapReduce noticeably more efficient and thus mitigates the small files problem. For further discussion of the solution, please see the relevant section in the attached paper.

**Imports**

In [1]:
import os
import random
import warnings
import concurrent.futures  # `import concurrent` alone does not load the futures submodule
import sqlite3
import traceback
import shutil
import time
from pathlib import Path
from typing import List

# data handling
import numpy as np
import pandas as pd

**Hide Warnings**

In [2]:

warnings.filterwarnings('ignore')

**Disable Autoscrolling**

In [3]:
%%javascript
IPython.OutputArea.prototype._should_scroll = function(lines) {
    return false;
}


**Set Random Seed**

In [53]:
random.seed(123)
np.random.seed(123)  # create_csvs uses np.random.choice, which random.seed does not affect

## Implementation

In [54]:
# insert your path here:
my_path = "/Users/eyalmichaeli/Desktop/School/Master's/IDC_masters/BigDataPlatforms/Final Project - MapReduce/output" 
# "/Users/mymac/IDC_masters/big_data_platforms_ex2"

path = Path(my_path)

In [55]:
firstname = ['John', 'Dana', 'Scott', 'Marc', 'Steven', 'Michael', 'Albert', 'Johanna']
city = ['NewYork', 'Haifa', 'Munchen', 'London', 'PaloAlto', 'TelAviv', 'Kiel', 'Hamburg']
secondname = ['Lennon', 'McCartney', 'Starr', 'Harrison', 'Ono', 'Sutcliffe', 'Epstein', 'Preston']

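def create_csvs(number_of_csvs: int, rows_per_csv: int) -> List[str]:
    """
    Creates <number_of_csvs> CSV files, each with <rows_per_csv> random rows drawn from the
    firstname, secondname and city lists defined above. Returns the paths of the created files.
    """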
    csvs_path = path / "csvs"
    csvs_path.mkdir(parents=True, exist_ok=True)
    csv_paths = list()
    for i in range(0, number_of_csvs):
        temp_df = pd.DataFrame({"firstname": np.random.choice(firstname, rows_per_csv),
                                "secondname": np.random.choice(secondname, rows_per_csv),
                                "city": np.random.choice(city, rows_per_csv),
                                })

        csv_path = str(csvs_path / f"myCSV{i+1}.csv")   
        csv_paths.append(csv_path)                  
        temp_df.to_csv(csv_path, index=False)

    print(f"Created {number_of_csvs} CSV files")

    return csv_paths

Use Python to create the `mapreducetemp` and `mapreducefinal` folders:

In [56]:
mapreducetemp_folder = path / "mapreducetemp"
mapreducetemp_folder.mkdir(parents=True, exist_ok=True)

mapreducefinal_folder = path / "mapreducefinal"
mapreducefinal_folder.mkdir(parents=True, exist_ok=True)

print("Created folders")

Created folders


<br><br><br>
## MapReduceEngine

The following Python code creates an SQLite database with the following table:

`TableName: temp_results`   
`schema: (key:TEXT, value:TEXT)`

In [57]:
# Creates the database "temp_results.db" with the temp_results table, then closes it.
def create_db(db_path):
    conn = None
    try:
        conn = sqlite3.connect(db_path)
        conn.execute("CREATE TABLE IF NOT EXISTS temp_results (key TEXT, value TEXT);")
        conn.commit()
        print(f"Created a Data Base, in path: {db_path}")

    except Exception:
        traceback.print_exc()

    finally:
        # conn stays None if connect() itself failed
        if conn is not None:
            conn.close()


create_db(str(path / "temp_results.db"))


Created a Data Base, in path: /Users/eyalmichaeli/Desktop/School/Master's/IDC_masters/BigDataPlatforms/Final Project - MapReduce/output/temp_results.db


**Explanation of the following Python class and its method:** The class is `MapReduceEngine` and the method is `execute(input_data, map_function, reduce_function, params, print_count_of_occurances)`, such that:
- `input_data`: is a list of elements (here, paths of CSV files)
- `map_function`: is a reference to a Python function that returns a list where each entry is of the form (key, value)
- `reduce_function`: is a reference to a Python function that returns a list of the form [key, value]
- `params`: is a dict of extra arguments for the map function (here, `column_index`)
- `print_count_of_occurances`: sets whether the reduce function prints its result

In more detail, the `execute` method implements the following steps:
<br>
1. For each element of `input_data`, start a new Python thread that executes `map_function(element)`. <br>
2. Each thread stores the results of `map_function` in `mapreducetemp/part-tmp-X.csv`, where X is a unique number per thread. <br>
3. Keep a list of all threads and check whether they have completed.<br>
4. Once all threads have completed, load the content of all CSV files into the `temp_results` table in SQLite.

    Remark: The easiest way is to loop over all CSV files, load each one into a Pandas DataFrame first, and then load it into SQLite:
    `data = pd.read_csv(path_to_csv)`
    `data.to_sql('temp_results', sql_conn, if_exists='append', index=False)`
5. Write an SQL statement that generates a list sorted by key, of the form `(key, value)`, where `value` is a concatenation of ALL values in the value column that match the specific key.<br>
6. Start a new thread for each entry of the list generated in the previous step and execute `reduce_function(key, value)`. <br>
7. Each thread stores the results of `reduce_function` in a `mapreducefinal/part-X-final.csv` file. <br>
8. Keep a list of all threads and check whether they have completed. <br>
9. Once all threads have completed, print `MapReduce Completed`. Otherwise, print `MapReduce Failed`.
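To make the steps above concrete, here is a minimal, self-contained sketch of the same flow on toy data. It is an illustration under simplifying assumptions, not the engine below: the `documents` dict and the functions `toy_map`, `toy_reduce` and `toy_mapreduce` are hypothetical names, intermediate results are kept in memory instead of `part-tmp-X.csv` files, and the SQLite table lives in `:memory:`.

```python
import concurrent.futures
import sqlite3
from typing import List, Tuple

# Hypothetical toy input: each "document" is a list of first names (stands in for a CSV file).
documents = {
    "doc1": ["John", "Dana", "John"],
    "doc2": ["Dana", "Scott"],
}

def toy_map(doc_name: str) -> List[Tuple[str, str]]:
    # hypothetical map: emit one (key, document name) pair per row, like inverted_map
    return [(name, doc_name) for name in documents[doc_name]]

def toy_reduce(key: str, value: str) -> Tuple[str, int]:
    # hypothetical reduce: value is the comma-separated GROUP_CONCAT string, one entry per occurrence
    return key, len(value.split(","))

def toy_mapreduce() -> List[Tuple[str, int]]:
    # steps 1-3: start one task per input element, keep the futures, wait for all of them
    with concurrent.futures.ThreadPoolExecutor() as pool:
        map_futures = [pool.submit(toy_map, doc) for doc in documents]
    pairs = [pair for future in map_futures for pair in future.result()]

    # step 4: load all intermediate (key, value) pairs into an in-memory SQLite table
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE temp_results (key TEXT, value TEXT)")
    conn.executemany("INSERT INTO temp_results VALUES (?, ?)", pairs)

    # step 5: one row per key, sorted by key, with all of the key's values concatenated
    grouped = conn.execute(
        "SELECT key, GROUP_CONCAT(value) FROM temp_results GROUP BY key ORDER BY key"
    ).fetchall()
    conn.close()

    # steps 6-8: one reduce task per key; wait for all of them before returning
    with concurrent.futures.ThreadPoolExecutor() as pool:
        reduce_futures = [pool.submit(toy_reduce, key, value) for key, value in grouped]
    return [future.result() for future in reduce_futures]

print(toy_mapreduce())  # [('Dana', 2), ('John', 2), ('Scott', 1)]
```

Note that all tasks are submitted before any `result()` is requested; calling `result()` right after each `submit` would wait for every task in turn and effectively run the phase sequentially.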

In [58]:

class MapReduceEngine:
    """
    A class that implements MapReduce. Receives an SQLite connection in its __init__.
    Its execute method runs the given map and reduce functions (here: inverted_map and
    inverted_reduce) in threads, which together constitute the MapReduce engine.
    """

    def __init__(self, conn):
        self.conn = conn

    @staticmethod
    def _map_and_store(map_function, csv_key, column_index, csv_path):
        # runs inside a map thread: map one input file and store its (key, value) pairs as a CSV
        pairs = map_function(csv_key, column_index)
        pd.DataFrame(pairs, columns=['key', 'value']).to_csv(csv_path, index=False)
        return csv_path

    @staticmethod
    def _reduce_and_store(reduce_function, key, value, print_count_of_occurances, csv_path):
        # runs inside a reduce thread: reduce one key and store [key, result] as a one-row CSV
        t_results = reduce_function(key, value, print_count_of_occurances)
        pd.DataFrame({'key': [t_results[0]], 'value': [t_results[1]]}).to_csv(csv_path, index=False)
        return csv_path

    def execute(self, input_data: List[str], map_function, reduce_function, params: dict, print_count_of_occurances=False):
        try:
            # Map phase: submit ALL tasks before waiting on any of them, so the threads run concurrently.
            # X in part-tmp-X.csv is the index of the input file.
            with concurrent.futures.ThreadPoolExecutor() as exec_map:
                map_futures = [exec_map.submit(self._map_and_store, map_function, csv_key, params['column_index'],
                                               f'{mapreducetemp_folder}/part-tmp-{csv_index}.csv')
                               for csv_index, csv_key in enumerate(input_data)]
            # leaving the with-block waits until all threads are completed;
            # result() re-raises any exception that was raised inside a thread
            csvs_paths_map = [future.result() for future in map_futures]

            # Once all threads completed, load the content of all CSV files into the temp_results table in SQLite
            for path_to_csv in csvs_paths_map:
                data = pd.read_csv(path_to_csv)
                data.to_sql(name='temp_results', con=self.conn, if_exists='append', index=False)

            # one row per key, sorted by key; value is a comma-separated concatenation of all of its values
            results_df = pd.read_sql_query("SELECT key, GROUP_CONCAT(value) AS value "
                                           "FROM temp_results "
                                           "GROUP BY key "
                                           "ORDER BY key",
                                           self.conn)

            # Reduce phase: one thread per key (prints from concurrent reducers may interleave)
            with concurrent.futures.ThreadPoolExecutor() as exec_reduce:
                reduce_futures = [exec_reduce.submit(self._reduce_and_store, reduce_function, key, value,
                                                     print_count_of_occurances,
                                                     f'{mapreducefinal_folder}/part-{res_i}-final.csv')
                                  for res_i, (key, value) in enumerate(zip(results_df['key'], results_df['value']))]
            for future in reduce_futures:
                future.result()

        except Exception:
            traceback.print_exc()
            return 'MapReduce Failed'

        finally:
            # close the connection to the db, whether the job succeeded or failed
            self.conn.close()

        return 'MapReduce Completed'


The following code implements the `inverted_map(document_name, column_index)` function, which reads the CSV document from the local disk and returns a list containing entries of the form (key_value, document_name), where the keys are taken from the column at `column_index` (starting from 1).

In [59]:
def inverted_map(document_name: str, column_index: int) -> List[tuple]:
    """
    Reads the CSV document from the local disk and returns a list that contains entries
    of the form (key_value, document name) for the provided column_index.
    :param document_name: CSV file name under <path>/csvs, or an absolute path (pathlib then ignores the prefix)
    :param column_index: column index in the CSV file (Note: starting from 1)
    :return: List[tuple] where each tuple contains 2 strings
    """
    csv_path = str(path / 'csvs' / document_name)
    df = pd.read_csv(csv_path)
    col_series = df[df.columns[column_index - 1]]
    csv_path_list = [csv_path] * len(df)
    return list(zip(col_series.values, csv_path_list))
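As a quick illustration of the output format, the sketch below applies the same column-selection logic to CSV text held in memory. `inverted_map_from_text` is a hypothetical helper written only for this example; the notebook's `inverted_map` reads from disk and uses the full file path as the document name.

```python
import io
from typing import List, Tuple

import pandas as pd

def inverted_map_from_text(csv_text: str, document_name: str, column_index: int) -> List[Tuple[str, str]]:
    # hypothetical variant of inverted_map that reads CSV text instead of a file on disk
    df = pd.read_csv(io.StringIO(csv_text))
    col_series = df[df.columns[column_index - 1]]  # column_index is 1-based, as in inverted_map
    return [(str(value), document_name) for value in col_series]

sample = "firstname,secondname,city\nJohn,Lennon,Kiel\nDana,Ono,Haifa\n"
print(inverted_map_from_text(sample, "myCSV1.csv", column_index=1))
# [('John', 'myCSV1.csv'), ('Dana', 'myCSV1.csv')]
```

Every row yields one pair, so a name that appears twice in the same file produces two identical pairs; the reduce step relies on this to count appearances.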

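The following code implements the `inverted_reduce(key, documents, print_number_of_occurances)` function, where `documents` is a comma-separated string (the `GROUP_CONCAT` output) of all the CSV documents for the given key. <br>
This string may contain duplicates, one entry per appearance of the key.<br>
The reduce function returns a list of the form [key, count_of_occurrences]. It also records the count in the global `keys_count_dict`, which we later use to compare the two runs.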
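The counting rule can be checked on a toy table. The sketch below (toy rows, in-memory SQLite) builds the comma-separated string that `inverted_reduce` receives, counts its entries, and compares the result with a plain `COUNT(*)` query. The `COUNT(*)` variant is only an alternative shown for comparison, not what the notebook does; its advantage is that it does not depend on the separator, so it would not break if a document path contained a comma.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE temp_results (key TEXT, value TEXT)")
# toy rows: John appears twice in a.csv and once in b.csv
conn.executemany(
    "INSERT INTO temp_results VALUES (?, ?)",
    [("John", "a.csv"), ("John", "a.csv"), ("John", "b.csv"), ("Dana", "b.csv")],
)

# what inverted_reduce receives: one comma-separated string per key (duplicates included)
concat_rows = conn.execute(
    "SELECT key, GROUP_CONCAT(value) FROM temp_results GROUP BY key ORDER BY key"
).fetchall()
split_counts = {key: len(docs.split(",")) for key, docs in concat_rows}

# the same counts computed directly in SQL, independent of the separator
sql_counts = dict(conn.execute(
    "SELECT key, COUNT(*) FROM temp_results GROUP BY key ORDER BY key"
).fetchall())
conn.close()

print(split_counts)  # {'Dana': 1, 'John': 3}
print(sql_counts)    # {'Dana': 1, 'John': 3}
```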

In [60]:
keys_count_dict = {}

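def inverted_reduce(key: str, documents: str, print_number_of_occurances: bool) -> list:
    """
    reduce function
    :param key: key value (for example: if the column is 'firstname', it could be 'Albert').
    :param documents: a comma-separated string of all CSV documents per given key (with duplicates).
    :param print_number_of_occurances: whether to print the count for this key.
    :return: List: [key, count_of_occurances]
    """
    # each row of the key contributes one entry, so the number of entries is the count
    # (note: this assumes that the document paths themselves do not contain commas)
    count_of_occurances = len(documents.split(","))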
    global keys_count_dict
    keys_count_dict[key] = count_of_occurances

    if print_number_of_occurances:
        print(f"Key: {key}, count of occurances: {count_of_occurances}\n")
        
    return [key, count_of_occurances]


<br><br><br><br>

## Testing Our MapReduce


In order to compare the default MapReduce run to the consolidated run, we will run two executions:
1. Regular MapReduce on all the CSVs, as is.
2. Regular MapReduce, after we have consolidated the CSVs into fewer, bigger files (in terms of rows and size).

First, we will create 1000 small CSV files (with 5 rows each) as the input data for the MapReduce job (which counts the number of appearances of each first name in the files):

In [61]:
NUMBER_OF_CSVS = 1000
ROWS_PER_CSV = 5
small_csvs_paths = create_csvs(number_of_csvs=NUMBER_OF_CSVS, rows_per_csv=ROWS_PER_CSV)

Created 1000 CSV files


**We will submit our MapReduce job on the data as-is and time the operation:**

In [62]:
# create an SQL DB connection
conn = sqlite3.connect(str(path / "temp_results.db"))

# create MapReduceEngine instance
mapreduce = MapReduceEngine(conn=conn)

start_time = time.time()

# execute MapReduce on input_data, on first_name (same as HW2)
status = mapreduce.execute(small_csvs_paths,
                           inverted_map,
                           inverted_reduce,
                           params={'column_index': 1},
                           print_count_of_occurances=True)  # set to True to make the reduce function print the total number of occurrences of each key (also helps to debug)

end_time = time.time()


print(status)
print(f"\nit took {end_time-start_time:.3f} seconds to run on the data as-is")

Key: Albert, count of occurances: 661

Key: Dana, count of occurances: 623

Key: Johanna, count of occurances: 616

Key: John, count of occurances: 612

Key: Marc, count of occurances: 582

Key: Michael, count of occurances: 654

Key: Scott, count of occurances: 615

Key: Steven, count of occurances: 637

MapReduce Completed

it took 19.841 seconds to run on the data as-is


Next, we will submit our MapReduce job on the data after consolidating the small files and time the operation.
First, we will consolidate the small files:

In [63]:
def merge_small_files(csv_files_paths: List[str], min_file_size: int, merged_csvs_folder: str, print_each_file_size=False) -> List[str]:
    """
    Reads the CSV files in order. If a CSV file is smaller than <min_file_size> (in bytes), appends the next
    CSV files to it until the accumulated size reaches <min_file_size>.
    Then, writes the new (merged) CSV files to a new folder: <merged_csvs_folder>, and returns their paths.
    """
    merged_csv_folder = Path(merged_csvs_folder)
    list_of_merged_dfs = list()
    append = False

    # go over the csvs
    for index, csv_file in enumerate(csv_files_paths):
        file_size = Path(csv_file).stat().st_size
        temp_df = pd.read_csv(csv_file)

        if append:
            # take the temp_df from the last iteration and append the current one
            # (pd.concat replaces DataFrame.append, which was removed in pandas 2.0)
            temp_df = pd.concat([last_temp_df, temp_df], ignore_index=True)
            file_size += last_file_size
            if print_each_file_size:
                print(f"{file_size:.3f}")

        if file_size < min_file_size:
            append = True
            if index == len(csv_files_paths) - 1:  # in the last file, keep it even if it is smaller than min_file_size
                list_of_merged_dfs.append(temp_df)
                break  # we are at the end, and don't want to append the current temp_df again

        else:
            list_of_merged_dfs.append(temp_df)
            append = False

        # keep track of this iteration's temp_df & file_size for the append in the next iteration
        last_file_size = file_size
        last_temp_df = temp_df

    merged_csvs_paths = list()
    # write the new merged csv files
    for i, merged_df in enumerate(list_of_merged_dfs, start=1):
        merged_csv_path = str(merged_csv_folder / f"merged_csv_{i}.csv")
        merged_csvs_paths.append(merged_csv_path)
        merged_df.to_csv(merged_csv_path, index=False)

    return merged_csvs_paths
    


In [64]:
merged_csvs_folder = path / "merged_csvs"
Path(merged_csvs_folder).mkdir(parents=True, exist_ok=True)

start_time = time.time()
merged_csvs_paths = merge_small_files(
                    csv_files_paths=small_csvs_paths, 
                    min_file_size=2000, 
                    merged_csvs_folder=merged_csvs_folder)

end_time = time.time()

print(f"it took {end_time-start_time:.3f} seconds to do the merging")

it took 5.451 seconds to do the merging
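The greedy rule inside `merge_small_files` is easier to see without the file I/O. The sketch below uses a hypothetical helper, `plan_batches`, that works on a list of file sizes in bytes and returns which file indices end up in each merged file, following the same rule as the function above.

```python
from typing import List

def plan_batches(file_sizes: List[int], min_file_size: int) -> List[List[int]]:
    """
    Hypothetical helper mirroring the greedy rule of merge_small_files:
    keep adding consecutive files to the current batch until the summed size
    reaches min_file_size; the last batch is kept even if it is smaller.
    Returns the indices of the files in each batch.
    """
    batches, current, current_size = [], [], 0
    for index, size in enumerate(file_sizes):
        current.append(index)
        current_size += size
        if current_size >= min_file_size:  # batch is big enough: close it
            batches.append(current)
            current, current_size = [], 0
    if current:  # leftover files at the end form a final, possibly undersized batch
        batches.append(current)
    return batches

print(plan_batches([800, 900, 700, 2500, 100], min_file_size=2000))
# [[0, 1, 2], [3], [4]]
```

A file that is already at least `min_file_size` bytes stays on its own, and only the trailing batch can end up below the threshold.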


Next, we will run MapReduce on the merged files. First, let's check how many files the merge produced:

In [65]:
print(f"We can see that from {NUMBER_OF_CSVS} CSV files, after merging some of them, we have: {len(merged_csvs_paths)} CSV files (which are bigger in size)")

We can see that from 1000 CSV files, after merging some of them, we have: 65 CSV files (which are bigger in size)
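A back-of-the-envelope estimate of how many merged files to expect. It assumes that every name is drawn uniformly (as `np.random.choice` does without a `p` argument) and that the CSVs use `\n` line endings (as on macOS/Linux); both are assumptions of this estimate, and individual file sizes vary around the mean.

```python
import math

# name lists copied from the create_csvs cell above
firstname = ['John', 'Dana', 'Scott', 'Marc', 'Steven', 'Michael', 'Albert', 'Johanna']
city = ['NewYork', 'Haifa', 'Munchen', 'London', 'PaloAlto', 'TelAviv', 'Kiel', 'Hamburg']
secondname = ['Lennon', 'McCartney', 'Starr', 'Harrison', 'Ono', 'Sutcliffe', 'Epstein', 'Preston']

def mean_len(words):
    # average length in characters (= bytes for these ASCII names)
    return sum(len(w) for w in words) / len(words)

# assumption: "\n" line endings
header_bytes = len("firstname,secondname,city\n")  # 26 bytes
row_bytes = mean_len(firstname) + mean_len(secondname) + mean_len(city) + 3  # + 2 commas + newline
file_bytes = header_bytes + 5 * row_bytes  # ROWS_PER_CSV = 5

files_per_batch = math.ceil(2000 / file_bytes)  # min_file_size = 2000
expected_batches = 1000 / files_per_batch       # NUMBER_OF_CSVS = 1000
print(round(file_bytes, 1), files_per_batch, round(expected_batches, 1))
# 133.5 15 66.7
```

With about 15 small files per merged file, we expect roughly 67 merged files, in line with the 65 files produced in our run.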


In [66]:
# delete the mapreduce folders created by the last operation
try:
    shutil.rmtree(str(mapreducetemp_folder))
    shutil.rmtree(str(mapreducefinal_folder))
    print("deleted the folders")

except OSError as e:  # OSError (unlike a generic Exception) has a strerror attribute
    print(f'Error: {e.strerror}')

# delete the SQLite database
try:
    os.remove(str(path / 'temp_results.db'))
    print("deleted the db")
except OSError as e:
    print(f'Error: {str(path / "temp_results.db")}, {e.strerror}')


deleted the folders
deleted the db


In [67]:
# create new empty folders
try:
    mapreducetemp_folder.mkdir(parents=True, exist_ok=True)
    mapreducefinal_folder.mkdir(parents=True, exist_ok=True)
    print("Created empty folders")

except OSError as e:
    print(f'Error: {e.strerror}')
    
former_keys_count_dict = keys_count_dict.copy()  # copy the last dict, to compare later to the new dict
keys_count_dict = {}  # set it back to empty

# create the db again
create_db(str(path / "temp_results.db"))

Created empty folders
Created a Data Base, in path: /Users/eyalmichaeli/Desktop/School/Master's/IDC_masters/BigDataPlatforms/Final Project - MapReduce/output/temp_results.db


In [68]:
def test_dicts(keys_count_dict, new_keys_count_dict):
    """
    Makes sure that the 2 dicts created from the reducers' output are the same for the 2 MapReduce jobs.
    """
    passed = True
    if len(new_keys_count_dict) != len(keys_count_dict):
        passed = False
        print(len(new_keys_count_dict), len(keys_count_dict))
        print("the 2 dicts' lengths are not equal (not the same number of keys)")

    for key in keys_count_dict:
        # .get() avoids a KeyError when a key is missing from the new dict
        if new_keys_count_dict.get(key) != keys_count_dict[key]:
            passed = False
            print(new_keys_count_dict.get(key), keys_count_dict[key])
            print(f"For key: {key}, the values are not equal!")

    if passed:
        print("All Tests Passed Successfully!")
        

In [69]:

# create an SQL DB connection
conn = sqlite3.connect(str(path / "temp_results.db"))

# create MapReduceEngine instance
mapreduce = MapReduceEngine(conn=conn)

start_time = time.time()

# execute MapReduce on input_data, on first_name (same as HW2)
status = mapreduce.execute(merged_csvs_paths,
                           inverted_map,
                           inverted_reduce,
                           params={'column_index': 1},
                           print_count_of_occurances=True)  # set to True to make the reduce function print the total number of occurrences of each key (also helps to debug)
                           
end_time = time.time()

# unit test: make sure that the dicts are the same
test_dicts(former_keys_count_dict, keys_count_dict)

print(status)

print(f"\nIt took {end_time-start_time:.3f} seconds to run on the merged data")

Key: Albert, count of occurances: 661

Key: Dana, count of occurances: 623

Key: Johanna, count of occurances: 616

Key: John, count of occurances: 612

Key: Marc, count of occurances: 582

Key: Michael, count of occurances: 654

Key: Scott, count of occurances: 615

Key: Steven, count of occurances: 637

All Tests Passed Successfully!
MapReduce Completed

It took 1.839 seconds to run on the merged data


**As we can see, on the data as-is the job takes 15-20 seconds, as opposed to 1-2 seconds on the consolidated data.** This difference is meaningful and it showcases the solution's impact. <br>
We should also note that we had to preprocess the data first, which took about 6-10 seconds in our runs (5.451 seconds in the run above). However, the whole process is still quicker than the 15-20 seconds of the original case: consolidating all of the small files AND running MapReduce took less than 12 seconds in total.<br>
We would like to emphasize that even if the entire process were not faster than running MapReduce on the original files, the consolidation could be scheduled to run during off-hours. Hence, in our opinion, it would still be a better solution.
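The totals in the discussion above follow from the single timings printed in this notebook (one run each, so they are indicative rather than averaged):

```python
# timings recorded in the runs above (seconds)
as_is = 19.841      # MapReduce on the 1000 small files
merging = 5.451     # consolidating them into 65 files
on_merged = 1.839   # MapReduce on the merged files

end_to_end = merging + on_merged
print(f"merge + MapReduce: {end_to_end:.3f} s")            # 7.290 s
print(f"MapReduce-only speedup: {as_is / on_merged:.1f}x")  # 10.8x
print(f"end-to-end speedup: {as_is / end_to_end:.1f}x")     # 2.7x
```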

Finally, we delete all temporary data from the `mapreducetemp` folder and delete the SQLite database:

In [70]:
# delete all temp data from mapreducetemp
try:
    shutil.rmtree(str(mapreducetemp_folder))

except OSError as e:  # OSError (unlike a generic Exception) has a strerror attribute
    print(f'Error: {str(mapreducetemp_folder)}, {e.strerror}')


# delete the SQLite database
try:
    os.remove(str(path / 'temp_results.db'))

except OSError as e:
    print(f'Error: {str(path / "temp_results.db")}, {e.strerror}')
