# Emulated Distributed File System

## Objectives:
1. To create an emulated distributed file system
2. To use the EDFS to store and retrieve files
3. To study and analyze the relationship between the health expenditure of various countries and the life expectancy of the people in those countries.

## Code to Emulate EDFS

### Importing Libraries

In [None]:
import ast
import json
import os
import shlex
import warnings

from IPython.display import clear_output
import ipywidgets as widgets
import matplotlib.pyplot as plt
import mysql.connector
import numpy as np
import pandas as pd

In [None]:
# Silence every warning in the notebook output.
# NOTE(review): this also hides pandas/numpy deprecation warnings, so API
# removals only become visible as hard errors after a library upgrade.
warnings.filterwarnings('ignore')

### Initializing Helper Functions and Variables

In [None]:
#Global variables
# Connection settings may be overridden with environment variables so the
# password need not live in the notebook; the defaults reproduce the original
# local setup exactly.
db = mysql.connector.connect(
    host=os.environ.get('EDFS_DB_HOST', 'localhost'),
    user=os.environ.get('EDFS_DB_USER', 'root'),
    passwd=os.environ.get('EDFS_DB_PASSWORD', 'dsci551'),
    database=os.environ.get('EDFS_DB_NAME', 'project'),
)   #Connecting to database

In [None]:
#Function to check if the ID belongs to a directory
def isDirectory(id):
    """Return True if the metadata row with this ID is a directory.

    Directories are inserted with ``File = 0`` (create_directory) and files
    with ``File = 1`` (upload_file). The parameter keeps the name ``id`` for
    backward compatibility even though it shadows the builtin.

    Raises:
        IndexError: if no metadata row has the given ID.
    """
    cursor = db.cursor()
    try:
        # Bound parameter: the driver quotes the value, nothing is spliced into SQL.
        cursor.execute('SELECT File FROM metadata WHERE id = %s', (id,))
        rows = cursor.fetchall()
    finally:
        cursor.close()
    return rows[0][0] == 0

In [None]:
#Function to check if a path exists
def exists(path, get_id = False):
    """Resolve an EDFS path by walking the metadata tree from the root.

    Empty path components are skipped, so a leading/trailing slash is
    optional: '/a/b', 'a/b' and 'a//b/' all resolve the same way, and ''
    or '/' denotes the root (whose ID is None).

    Args:
        path: slash-separated EDFS path.
        get_id: when True, also return the ID of the last component.

    Returns:
        ``bool`` when get_id is False, otherwise ``(found, id)`` where id is
        None for the root or when the path does not exist.
    """
    node_id = None
    cursor = db.cursor()
    try:
        for name in path.split('/'):
            if name == '':
                continue
            # Bound parameters keep names containing quotes from breaking
            # (or injecting into) the SQL statement.
            if node_id is None:
                cursor.execute('SELECT id FROM metadata WHERE Name = %s AND Parent IS NULL', (name,))
            else:
                cursor.execute('SELECT id FROM metadata WHERE Name = %s AND Parent = %s', (name, node_id))
            rows = cursor.fetchall()
            if not rows:
                return (False, None) if get_id else False
            node_id = rows[0][0]
    finally:
        cursor.close()
    return (True, node_id) if get_id else True

### Functions to Perform EDFS Operations
1. create_directory (mkdir): To create a directory
2. list_files (ls): To list the contents of a directory
3. getPartitionLocations: To get the locations of partitions
4. readPartition: To read the contents of a partition
5. read_file (cat): To output the contents of a file
6. delete_file (rm): To remove a file
7. upload_file (put): To upload a file to the EDFS

In [None]:
#Function to create a directory (mkdir)
def create_directory(dir_name):
    """Create a directory and any missing parent directories (like mkdir -p).

    Existing leading components are reused; every component from the first
    missing one onward is inserted into ``metadata`` with ``File = 0``.
    All inserts are committed together, or rolled back on error.

    Returns:
        A status message: '<dir> already exists', '<dir> created', or
        '<prefix> is a file' when an existing component is a file.
    """
    #Check if directory already exists
    if exists(dir_name):
        return f'{dir_name} already exists'

    cursor = db.cursor()
    try:
        #Get start of new IDs
        cursor.execute('SELECT MAX(id) FROM metadata')
        max_id = cursor.fetchall()
        new_id = max_id[0][0] + 1 if max_id and max_id[0] and max_id[0][0] is not None else 1

        #Create directory (or directory path)
        parent = None
        curr_path = '/'
        creating = False
        for name in dir_name.split('/'):
            if name == '':
                continue
            if not creating:
                curr_path += name + '/'
                found, node_id = exists(curr_path, True)
                if found:
                    # Descending through an existing prefix: refuse to nest a
                    # directory underneath a file.
                    if not isDirectory(node_id):
                        return f'{curr_path.rstrip("/")} is a file'
                    parent = node_id
                    continue
                creating = True
            # None is sent as SQL NULL, i.e. a top-level entry.
            cursor.execute(
                'INSERT INTO metadata (ID, Name, Parent, File) VALUES (%s, %s, %s, 0)',
                (new_id, name, parent),
            )
            parent = new_id
            new_id += 1
        db.commit()
    except Exception:
        db.rollback()
        raise
    finally:
        cursor.close()
    return f'{dir_name} created'

In [None]:
#Function to list files (ls)
def list_files(dir_name):
    """List the names of the entries directly inside a directory (ls).

    '' or '/' lists the root.

    Returns:
        The entry names joined by newlines, or a message when the path does
        not exist or refers to a file.
    """
    #Check if directory exists
    found, dir_id = exists(dir_name, True)
    if not found:
        return f'{dir_name} does not exist'

    #Check if path is a file
    if dir_id is not None and not isDirectory(dir_id):
        return f'{dir_name} is a file'

    #List files in directory
    cursor = db.cursor()
    try:
        if dir_id is None:
            cursor.execute('SELECT Name FROM metadata WHERE Parent IS NULL')
        else:
            cursor.execute('SELECT Name FROM metadata WHERE Parent = %s', (dir_id,))
        rows = cursor.fetchall()
    finally:
        cursor.close()
    return '\n'.join(row[0] for row in rows)

In [None]:
#Function to get partition locations
def getPartitionLocations(id):
    """Return the names of the partition tables that hold a file's data.

    upload_file stores the list as a JSON array in ``metadata.Partitions``.
    create_directory does not set that column, so if it holds NULL the
    result is an empty list instead of a ``json.loads(None)`` TypeError.

    Raises:
        IndexError: if no metadata row has the given ID.
    """
    cursor = db.cursor()
    try:
        cursor.execute('SELECT Partitions FROM metadata WHERE id = %s', (id,))
        rows = cursor.fetchall()
    finally:
        cursor.close()
    partitions = rows[0][0]
    return json.loads(partitions) if partitions is not None else []

In [None]:
def _parse_partition_rows(content):
    """Turn a stored partition (the str() of a list of row strings) back into that list."""
    try:
        # literal_eval evaluates Python literals only (no code execution) and,
        # unlike the manual split below, copes with rows containing ', ',
        # brackets or quote characters.
        rows = ast.literal_eval(content)
        if isinstance(rows, list) and all(isinstance(row, str) for row in rows):
            return rows
    except (ValueError, SyntaxError):
        pass  # not a Python list literal: use the legacy parser below
    return [item.strip("'") for item in content.strip('][').split(', ')]


#Function to read a file from a partition
def readPartition(id, partition):
    """Read one partition of a file into a DataFrame of strings.

    upload_file stores a partition as the ``str()`` of a list of
    comma-joined row strings, header row first, e.g. ``"['a,b', '1,2']"``.
    Every row is split on ',' into columns; all values come back as ``str``.

    Args:
        id: metadata ID of the file.
        partition: partition table name (from getPartitionLocations).

    Raises:
        IndexError: if the partition has no row for this ID.
    """
    cursor = db.cursor()
    try:
        # Identifiers cannot be bound as parameters, so the table name is
        # back-quoted; the ID is bound normally.
        cursor.execute(f'SELECT Content FROM `{partition}` WHERE id = %s', (id,))
        content = cursor.fetchall()[0][0]
    finally:
        cursor.close()

    rows = _parse_partition_rows(content)
    header = rows[0].split(',')
    records = [row.split(',') for row in rows[1:]]
    return pd.DataFrame(records, columns=header)

In [None]:
#Function to read a file (cat)
def read_file(file_name, df=False):
    """Reassemble a file from all of its partitions (cat).

    Args:
        file_name: EDFS path of the file.
        df: when True return the pandas DataFrame; otherwise return it
            rendered as text without the index.

    Returns:
        The file contents, or a message when the path does not exist or
        is a directory.
    """
    #Check if file exists
    found, file_id = exists(file_name, True)
    if not found:
        return f'{file_name} does not exist'

    #Check if path is a directory (the root has no ID)
    if file_id is None or isDirectory(file_id):
        return f'{file_name} is a directory'

    #Read file from different partitions. pd.concat replaces the chained
    #DataFrame.append, which was removed in pandas 2.0.
    frames = [readPartition(file_id, name) for name in getPartitionLocations(file_id)]
    content = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()

    if df:
        return content
    return content.to_string(index = False)

In [None]:
#Function to delete a file (rm)
def delete_file(file_name):
    """Delete a file's partition rows and its metadata entry (rm).

    All deletes run in one transaction: either everything is removed or,
    on error, nothing is, so metadata never points at half-deleted data.
    Directories are refused.

    Returns:
        A status message string.
    """
    #Check if file exists
    found, file_id = exists(file_name, True)
    if not found:
        return f'{file_name} does not exist'

    #Check if path is a directory
    if file_id is None or isDirectory(file_id):
        return f'{file_name} is a directory'

    #Find partitions
    partitions = getPartitionLocations(file_id)

    cursor = db.cursor()
    try:
        #Partition rows first: their ID is a foreign key to metadata.ID
        for partition in partitions:
            cursor.execute(f'DELETE FROM `{partition}` WHERE id = %s', (file_id,))
        #Delete file's metadata
        cursor.execute('DELETE FROM metadata WHERE id = %s', (file_id,))
        db.commit()
    except Exception:
        db.rollback()
        raise
    finally:
        cursor.close()
    return f'{file_name} deleted'

In [None]:
def _rows_to_strings(frame, headers):
    """Serialize a DataFrame as comma-joined row strings, header row first."""
    rows = [headers] + frame.values.tolist()
    return [','.join(map(str, row)) for row in rows]


def _partition_table_name(value):
    """Return the partition table name for one value of the partition column."""
    # str() so numeric or NaN values do not crash on .lower()
    return f'partition_{str(value).lower().replace(" ", "_")}'


#Function to create a file (put)
def upload_file(file_name, dir_name, partition_col=None, num_partitions=3):
    """Upload a local CSV file into an EDFS directory (put).

    The CSV is split into partitions. A random split uses tables
    ``partition_1``..``partition_<num_partitions>``. A column split uses one
    ``partition_<value>`` table per distinct value of ``partition_col``.
    Each partition is stored as one row of its table, and the list of tables
    is recorded in the file's metadata row.

    Args:
        file_name: local path of the CSV; the EDFS entry uses its base name.
        dir_name: existing EDFS directory to place the file in ('/' for root).
        partition_col: column to partition on; None for a random split.
        num_partitions: number of random partitions when partition_col is None.

    Returns:
        A status or error message string.
    """
    # Only the base name is stored: exists() splits paths on '/', so a name
    # such as 'data/x.csv' could never be found again.
    edfs_name = os.path.basename(file_name)

    #Check if file already exists in EDFS
    path = '/'.join(dir_name.strip('/').split('/') + [edfs_name])
    if exists(path):
        return f'{path} already exists'

    #The target directory must exist (otherwise the file would silently end
    #up at the root) and must not be a file
    dir_found, parent = exists(dir_name, True)
    if not dir_found:
        return f'{dir_name} does not exist'
    if parent is not None and not isDirectory(parent):
        return f'{dir_name} is a file'

    #Check if file exists in local file system
    if not os.path.isfile(file_name):
        return f'{file_name} does not exist'

    data = pd.read_csv(file_name)
    headers = data.columns.tolist()
    if partition_col is not None and partition_col not in data.columns:
        return f'{partition_col} is not a column of {file_name}'

    new_partitions = {}
    if partition_col is None:
        #Shuffle row positions, then cut them into nearly equal chunks
        shuffled = np.random.permutation(len(data))
        for i, positions in enumerate(np.array_split(shuffled, num_partitions)):
            new_partitions[f'partition_{i+1}'] = _rows_to_strings(data.iloc[positions], headers)
    else:
        #One partition per distinct value, in order of first appearance
        for value, group in data.groupby(partition_col, sort=False, dropna=False):
            table = _partition_table_name(value)
            rows = _rows_to_strings(group, headers)
            if table in new_partitions:
                # Distinct values can normalize to the same table name (e.g.
                # 'Asia' and 'asia'); merge them under a single header.
                new_partitions[table].extend(rows[1:])
            else:
                new_partitions[table] = rows
    partition_tables = list(new_partitions)

    cursor = db.cursor()
    try:
        #Create partition tables if they don't exist. MySQL commits implicitly
        #on CREATE TABLE, so DDL runs before any INSERT of this upload.
        for table in partition_tables:
            cursor.execute(f'CREATE TABLE IF NOT EXISTS `{table}` (`ID` int NOT NULL, `Content` text(30000) NOT NULL, PRIMARY KEY (`ID`), CONSTRAINT `{table}_ibfk_1` FOREIGN KEY (`ID`) REFERENCES `metadata` (`ID`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;')

        #Allocate the next metadata ID
        cursor.execute('SELECT MAX(id) FROM metadata')
        max_id = cursor.fetchall()
        new_id = max_id[0][0] + 1 if max_id and max_id[0] and max_id[0][0] is not None else 1

        #Add file to metadata table; bound parameters store names and JSON
        #verbatim, even when they contain quotes
        cursor.execute(
            'INSERT INTO metadata (ID, Name, Parent, File, Partitions) VALUES (%s, %s, %s, 1, %s)',
            (new_id, edfs_name, parent, json.dumps(partition_tables)),
        )

        #Add partitions to respective tables, as the str() of the row list
        for table in partition_tables:
            cursor.execute(
                f'INSERT INTO `{table}` (ID, Content) VALUES (%s, %s)',
                (new_id, str(new_partitions[table])),
            )
        db.commit()
    except Exception:
        db.rollback()
        raise
    finally:
        cursor.close()
    return f'{path} uploaded'

## An interactive UI to perform EDFS operations

In [None]:
class GUI:
    """A small command console: a text input, an Execute button and an output box.

    The widgets live on the ``command``, ``button`` and ``output`` attributes.
    The getters return the widgets themselves (not their values), so callers
    read and write ``.value`` directly.
    """

    def __init__(self):
        # Single-line box where the user types a command
        self.command = widgets.Text(
            description='Command:',
            placeholder='Enter Command',
            value='',
            disabled=False,
        )

        # Triggers whatever handler is registered through setButtonClick
        self.button = widgets.Button(
            description='Execute',
            tooltip='Execute Command',
            icon='check',
            button_style='',
            disabled=False,
        )

        # Full-width multi-line area for the command's result
        full_width = widgets.Layout(width='100%')
        self.output = widgets.Textarea(
            description='Output:',
            placeholder='Output',
            value='',
            disabled=False,
            layout=full_width,
        )

    def setButtonClick(self, func):
        """Register ``func`` as the Execute button's click callback."""
        self.button.on_click(func)

    def display(self):
        """Render the input, button and output widgets in the notebook."""
        # `display` below is the notebook-global IPython function, not this method.
        display(self.command, self.button, self.output)

    def getOutput(self):
        """Return the output Textarea widget."""
        return self.output

    def getCommand(self):
        """Return the command Text widget."""
        return self.command

In [None]:
# Console instance for the EDFS file-system commands (handled by on_button_clicked_edfs)
EDFS_GUI = GUI()

In [None]:
#Function to execute the command
def on_button_clicked_edfs(b):
    """Execute the EDFS console's command and show the result (button callback).

    Supported commands:
        ls [path]                                   (defaults to the root)
        mkdir <path> | cat <path> | rm <path>
        put <file_name> <dir_name> [-p <partition_col>]

    Args:
        b: the clicked Button widget (unused; required by on_click).
    """
    output = EDFS_GUI.getOutput()
    command = EDFS_GUI.getCommand()
    output.value = ''

    try:
        args = shlex.split(command.value)
    except ValueError:
        # e.g. an unmatched quote
        output.value = 'Invalid Command'
        return
    if not args:
        output.value = 'Invalid Command'
        return

    op, operands = args[0], args[1:]
    single_path_ops = {'mkdir': create_directory, 'cat': read_file, 'rm': delete_file}

    try:
        if op == 'ls':
            if len(operands) > 1:
                output.value = 'Usage: ls [path]'
            else:
                output.value = list_files(operands[0] if operands else '/')
        elif op in single_path_ops:
            if len(operands) != 1:
                output.value = f'Usage: {op} <path>'
            else:
                output.value = single_path_ops[op](operands[0])
        elif op == 'put':
            if len(operands) == 2:
                output.value = upload_file(operands[0], operands[1])
            elif len(operands) == 4 and operands[2] == '-p':
                output.value = upload_file(operands[0], operands[1], operands[3])
            else:
                output.value = 'Usage: put <file_name> <dir_name> [-p <partition_col>]'
        else:
            output.value = 'Invalid Command'
    except Exception as e:
        # UI boundary: show any failure (e.g. a database error) instead of raising
        output.value = str(e)

In [None]:
# Attach the EDFS command handler to the Execute button and render the console
EDFS_GUI.setButtonClick(on_button_clicked_edfs)
EDFS_GUI.display()

## Code to Analyze the Relation between Health Expenditure and Life Expectancy

### Setting up Query Related Functions

In [None]:
def calc(df, value_header, continent, task):
    """Aggregate one partition's value column for a single continent (map step).

    The caller's DataFrame is not modified.

    Args:
        df: partition DataFrame with a 'Continent' column; values may be strings.
        value_header: name of the column to aggregate (converted to float).
        continent: continent to filter on.
        task: 'sum', 'max' or 'min' (a pandas aggregation name).

    Returns:
        None if the partition has no rows for the continent. For 'sum', a
        ``(sum, count)`` tuple so the reducer can build a global average;
        otherwise the aggregated scalar.
    """
    # Convert only the selected rows instead of overwriting the caller's column.
    values = df.loc[df['Continent'] == continent, value_header].astype(float)
    if values.empty:
        return None
    if task == 'sum':
        # count() ignores NaN exactly like sum() does; len() would count
        # missing values and bias the global average downwards.
        return (values.sum(), values.count())
    return values.agg(task)

In [None]:
def Map(path, query, continent, task):
    """Run ``calc`` on every partition of the file at ``path`` (map step).

    Prints a per-partition trace. The value to aggregate is taken
    positionally from the file's 5th column.

    Args:
        path: EDFS path of the file.
        query: accepted for symmetry with Reduce; not used here.
        continent: continent to filter on.
        task: aggregation name passed to calc ('sum', 'max' or 'min').

    Returns:
        A list with one entry per partition (None where the partition has no
        rows for the continent). A message string is returned instead if the
        path is missing, is a directory, or has no partitions.
    """
    #Check if file exists
    found, file_id = exists(path, True)
    if not found:
        return f'{path} does not exist'

    #Check if path is a directory (the root has no ID). The original tested the
    #builtin `id` here, which is never None.
    if file_id is None or isDirectory(file_id):
        return f'{path} is a directory'

    partition_locations = getPartitionLocations(file_id)
    if not partition_locations:
        return f'{path} has no partitions'
    partition_frames = [readPartition(file_id, name) for name in partition_locations]

    # NOTE(review): assumes the value column is always the 5th column of the file
    value_header = partition_frames[0].columns[4]

    print('-----------')
    print('Explanation')
    print('-----------')
    print()

    list_calc = []
    for name, frame in zip(partition_locations, partition_frames):
        value = calc(frame, value_header, continent, task)
        print(f'Output from {name}: {value}')
        list_calc.append(value)

    print(f'Output from mapper function: {list_calc}')
    print()
    return list_calc

In [None]:
def Reduce(query, list_calc):
    """Combine the mapper's partial results into the final answer (reduce step).

    Args:
        query: 'Average', 'Max' or 'Min'.
        list_calc: per-partition results from Map. None entries (partitions
            without matching rows) are skipped. For 'Average' every entry is
            a ``(sum, count)`` tuple.

    Returns:
        The result as a string, or a message if no partition had any data.

    Raises:
        ValueError: for an unsupported query.
    """
    no_data = 'No data found for the given continent'
    partials = [x for x in list_calc if x is not None]

    if query == 'Average':
        total_count = sum(count for _, count in partials)
        if total_count == 0:
            return no_data
        return str(sum(total for total, _ in partials) / total_count)
    if query not in ('Max', 'Min'):
        raise ValueError(f'Unsupported query: {query}')
    if not partials:
        return no_data
    return str(max(partials) if query == 'Max' else min(partials))

In [None]:
def mapPartition(path, query, continent, GUI):
    """Answer an aggregate query over one EDFS file by running Map then Reduce.

    The cell output is cleared and the console redrawn before the mapper
    prints its trace.

    Args:
        path: EDFS path of the file.
        query: 'Average', 'Max' or 'Min'.
        continent: continent to filter on.
        GUI: the console instance to redraw (shadows the class name; kept for
            compatibility).

    Returns:
        The result string, or an error/usage message.
    """
    tasks = {'Average': 'sum', 'Max': 'max', 'Min': 'min'}
    if query not in tasks:
        return f'Unsupported query: {query} (use Average, Max or Min)'

    clear_output()
    GUI.display()

    list_calc = Map(path, query, continent, tasks[query])
    if isinstance(list_calc, str):
        # Map returned an error message instead of partial results
        return list_calc
    return Reduce(query, list_calc)

In [None]:
def visualize(path1, path2, input_country):
    """Plot life expectancy and health expenditure over the years for one country.

    The two EDFS files are inner-joined on 'Country' and 'Year'. Between them
    they must supply the 'Health Expenditure' and 'Life Expectancy' columns.
    Life expectancy is drawn as a line on the left axis. Health expenditure
    is drawn as labelled bars on a twin right axis.

    Returns:
        A status or error message string; the figure itself is shown via
        plt.show().
    """
    #Both paths must exist and be files
    for path in (path1, path2):
        found, file_id = exists(path, True)
        if not found:
            return 'One of the paths does not exist'
        if file_id is None or isDirectory(file_id):
            return f'{path} is a directory'

    final_df1 = read_file(path1, True)
    final_df2 = read_file(path2, True)
    final_df = pd.merge(final_df1, final_df2, how='inner', on=['Country', 'Year'])

    #Sorting the dataset to get consistent data. sort_values returns a new
    #frame, so nothing is written into a slice of final_df.
    df_visual = final_df[['Country', 'Year', 'Health Expenditure', 'Life Expectancy']].sort_values(
        by=['Year', 'Country'], ascending=[True, True])

    #.copy() so the type conversions below modify an independent frame
    data_plot = df_visual[df_visual['Country'] == input_country].copy()
    if len(data_plot) == 0:
        return 'Input Country is Wrong or Not in the Database'

    data_plot['Health Expenditure'] = data_plot['Health Expenditure'].astype(float)
    data_plot['Life Expectancy'] = data_plot['Life Expectancy'].astype(float)

    #Plotting the Graphs
    fig, ax1 = plt.subplots(figsize=(10, 4))

    ax1.plot(data_plot['Year'], data_plot['Life Expectancy'], color='red')
    ax1.set_ylabel('Life Expectancy', fontweight='bold')
    ax1.legend(['Life Expectancy'], loc="center left")

    #Set up the 2nd axis
    ax2 = ax1.twinx()
    ax2.bar(data_plot['Year'], data_plot['Health Expenditure'], width=0.5, alpha=0.5, color='orange')
    #Label each bar. Year values come back from read_file as strings, so
    #matplotlib places them at categorical positions 0, 1, 2, ...
    for index, value in enumerate(data_plot['Health Expenditure']):
        ax2.text(x=index, y=value + 1, s=f"{round(value, 1)}", fontdict=dict(fontsize=8))
    ax2.set_ylabel('Health Expenditure in US$', fontweight='bold')
    ax2.legend(['Health Expenditure'], loc="center right")

    #Title shows the actual year range of the plotted rows (sorted by Year)
    first_year, last_year = data_plot['Year'].iloc[0], data_plot['Year'].iloc[-1]
    ax1.title.set_text(f'Health Expenditure and Life Expectancy rate for {input_country} '
                       f'from Year {first_year}-{last_year}')
    ax1.set_xlabel('Year', fontweight='bold')
    plt.show()

    return 'Visualization is Created Please Check Below:'

## An interactive UI to perform search and analytics operations

In [None]:
# Console instance for the analytics queries (handled by on_button_clicked_search)
search_GUI = GUI()

In [None]:
#Function to execute the command
def on_button_clicked_search(b):
    """Execute the analytics console's command and show the result (button callback).

    Supported commands:
        1 <path> <Average|Max|Min> <continent>   aggregate statistics
        2 <path1> <path2> <country>              expenditure vs. life expectancy plot

    Args:
        b: the clicked Button widget (unused; required by on_click).
    """
    output = search_GUI.getOutput()
    command = search_GUI.getCommand()
    output.value = ''

    try:
        args = shlex.split(command.value)
    except ValueError:
        # e.g. an unmatched quote
        output.value = 'Invalid Command'
        return

    usages = {
        '1': 'Usage: 1 <path> <Average|Max|Min> <continent>',
        '2': 'Usage: 2 <path1> <path2> <country>',
    }
    if not args or args[0] not in usages:
        output.value = 'Invalid Command'
        return
    if len(args) != 4:
        output.value = usages[args[0]]
        return

    try:
        if args[0] == '1':
            output.value = mapPartition(args[1], args[2], args[3], search_GUI)
        else:
            clear_output()
            search_GUI.display()
            output.value = visualize(args[1], args[2], args[3])
    except Exception as e:
        # UI boundary: show any failure instead of raising inside the callback
        output.value = str(e)

## Instructions

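This GUI is used to perform the following operations:
1. Find aggregate statistics of a dataset for a given continent  
Usage: 1 (path) (Average|Max|Min) (continent)

2. Visualize the relationship between health expenditure and life expectancy of a country  
Usage: 2 (path1) (path2) (country)

Paths are EDFS paths. Arguments are split shell-style, so names containing spaces must be quoted, e.g. `1 /data/health.csv Average "North America"`.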

In [None]:
# Attach the analytics handler to the Execute button and render the console
search_GUI.setButtonClick(on_button_clicked_search)
search_GUI.display()