# MySQL

In [62]:
import mysql.connector as ccnx
import csv
import re
import sys
import pandas as pd


#python connector setup
mydb = ccnx.connect(
    host="localhost",
    user="root",
    password="root",
    database="edfs"
)
mycursor = mydb.cursor(buffered=True)

####################
# Helper Functions #
####################

def Sort_Tuple(tup: list, idx: int) -> list:
    '''
    Return the sorted list of tuples by the element of the tuple
    Args:
        tup (list): unsorted list of tuples
        idx (int): the element of the tuple to sort by
    Returns:
        (list) Sorted list of tuples
    '''
    return(sorted(tup, key = lambda x: x[idx]))

def key_idx(str_list):
    '''
    Return the index of 'Country Name' if it exists in the dataset, and 0 if
    the column name is not present
    Args:
        str_list (list): the list of column names
    Returns:
        (int) the index returned
    '''
    try:
        return str_list.index('Country Name')
    except:
        return 0

def key_cleaning(row):
    #key cleaning
    clean_key =  re.sub(r'[^A-Za-z0-9 ]+', '', row[0]).replace(" ", "_")
    key = clean_key if clean_key != "" else "invalid_key"
    if key.isnumeric() and key.length() > 0:
        key = f"t{key}"
    if len(key) >= 64:
        key = key[0:40]
    return key

####################
# API Functions    #
####################

def read_dataset(path):
    # (partition_name, csv_index, comma-separated-string)
    list_of_tuples = getPartitionData(path)
    list_of_lists = [tuple[2].split(",") for tuple in list_of_tuples]
    df = pd.DataFrame(list_of_lists)

    new_header = df.iloc[0] #grab the first row for the header
    df = df[1:] #take the data less the header row
    df.columns = new_header #set the header row as the df header

    df = df.drop(["Country Code", "Series Code"], "columns")

    df_melted = df.melt(id_vars=["Country Name", "Series Name"],
        var_name="Year",
        value_name="Value")

    df_melted["Year"] = df_melted["Year"].str[0:4]

    df_melted = df_melted.loc[df_melted.Value.str.isnumeric()].copy()

    # change columns names
    new_columns = list()
    columns = df_melted.columns
    for c in columns:
            new_columns.append(c.replace(" ","_"))

    # change column names in dataframe
    df_melted.columns = new_columns

    return df_melted.astype({'Year':'int', 'Value': 'float'})

def seek(path):
    '''
    Returns the filestructure that matches the specified path
    Args:
        str_list (list): the list of column names
    Returns:
        (int) the index returned
    '''
    seek_statement = "SELECT * FROM df WHERE path = %s"
    mycursor.execute(seek_statement, (path,))
    myresult = mycursor.fetchall()
    return myresult

def mkdir(path, name):
    '''
    Create the directory at the specfied path in the filesystem
    Args:
        path (str): the path to the directory's home
        name (str): the name of the directory
    Returns:
        output (str): success or failure of the operation
    '''
    result = seek(path)
    if result:
        if result[0][1] == "DIRECTORY":
            pathname = f"{path}/{name}"
            dup_result = seek(pathname)
            if not dup_result:
                insert_statement = "INSERT INTO df VALUES (%s, 'DIRECTORY')"
                mycursor.execute(insert_statement, (pathname,))
                mydb.commit()
                output = f"directory {name} created"
            else:
                output = "directory already exists"
    else:
        output = f"Invalid path: {path}"
    return output

def rm(path, name):
    '''
    Removes the directory at the specfied path in the filesystem
    Args:
        path (str): the path to the directory's home
        name (str): the name of the directory
    Returns:
        output (str): success or failure of the operation
    '''
    result = seek(path)
    filepath =  f"{path}/{name}"
    if result:
        select_statement = "SELECT * FROM df WHERE path LIKE %s"
        mycursor.execute(select_statement, (filepath + "%",))
        result = mycursor.fetchall()
        if len(result) != 1:
            output = "invalid deletion"
        else:
            delete_statement = "DELETE FROM df WHERE path LIKE %s"
            mycursor.execute(delete_statement, (filepath,)) #TODO: adding % here will add -r functionality
            mydb.commit()
            output = f"{filepath} deleted"
    else:
        output = f"Invalid path: {filepath}"
    return output

def ls(path):
    '''
    Returns the contents of the directory at the specfied path in the filesystem
    Args:
        path (str): the path to the directory's home
        name (str): the name of the directory
    Returns:
        output (str): success or failure of the operation
    '''
    result = seek(path)
    if result:
        if result[0][1] == "FILE":
            output = "Cannot run 'ls' on files"
        elif result[0][1] == "DIRECTORY":
            ls_statement = "SELECT * FROM df WHERE path REGEXP %s"
            mycursor.execute(ls_statement, (f"^{path}\/[^\/]+$",))
            output = mycursor.fetchall()
    else:
        output = f"Invalid path: {path}"
    return output

def getPartitionLocations(path):
    '''
    Returns the blockLocations that match the file at the specified
    Args:
        path (str): the path to the file
    Returns:
        output (obj): the blockLocations in a list of tuples
    '''
    cat_statement = "SELECT * FROM blockLocations WHERE path = %s"
    mycursor.execute(cat_statement, (path,))
    result = mycursor.fetchall()
    return result

def readPartition(path, partition_name):
    '''
    Returns the contents of a specified partition_name
    Args:
        path (str): the path to the file
        partition_name: the name of the partition
    Returns:
        [list of (tuple)]: the data contents of the partition
    '''
    mycursor.execute(f"SELECT * FROM {partition_name} WHERE path = %s", (path,))
    result = mycursor.fetchall()
    partition_data = []
    for line in result:
        partition_data.append((partition_name, line[1], line[2]))
    return partition_data

def cat(path):
    '''
    Returns the contents the file at the specified path
    Args:
        path (str): the path to the file
    Returns:
        output (obj): the text of the file
    '''
    output = ""
    sorted_data_list = getPartitionData(path)
    for s in sorted_data_list:
        output += s[2] +"\n"
    return output

def getPartitionData(path):
    '''
    Returns the contents the file at the specified path
    Args:
        path (str): the path to the file
    Returns:
        output (obj): the text of the file
    '''
    result = seek(path)
    output = ""
    if result:
        if result[0][1] == "DIRECTORY":
            output = "Cannot run 'cat' on directories"
        elif result[0][1] == "FILE":
            myresult = getPartitionLocations(path)
            data_list = []
            for partition in myresult:
                data_list = data_list + readPartition(path, partition[1])
            sorted_data_list = Sort_Tuple(data_list, 1)
            return sorted_data_list
    else:
        output = f"Invalid path: {path}"
    return output

def put(path, name, csv):
    '''
    places the file from a local directory into the EDFS
    Args:
        path (str): the path to the file
        name (str): the name of the file to be created
        csv (str): the path to the csv file to be placed
    Returns:
        output (str): the success or failure of the operation
    '''
    result = seek(path)
    if result:
        if result[0][1] == "DIRECTORY":
            dup_result = seek(f"{path}/{name}")
            if not dup_result:
                hash_lists = hash(path, name, csv)
                output = f"file {name} created"
            else:
                output = "file already exists"
        else:
            output = "cannot place a file in a file"
    else:
        output = f"Invalid path: {path}"
    return output



def hash(path, name, csv_file):
    '''
    Alters the metadata to allocate new datanotes if needed and places the file
    data into the nodes
    Args:
        path (str): the path to the file
        name (str): the name of the file to be created
        csv (str): the path to the csv file to be placed
    Returns:
        key_list (list): the list of keys to datanodes that have been
        allocated to
    '''

    #execute metadata alter
    meta_statement = "INSERT INTO df VALUES (%s, 'FILE');"
    mycursor.execute(meta_statement, (f"{path}/{name}",))
    mydb.commit()
    with open(csv_file) as f:

        key_list, csv_counter = {}, 0
        reader = csv.reader(f, delimiter=',')
        header = next(reader)
        key_index = key_idx(header)
        key = key_cleaning(header)

        #TODO: modularize this create code JFC
        try:
            create_statement = f"""
                CREATE TABLE IF NOT EXISTS {key}(
                    path varchar(255),
                    data_index int,
                    data text,
                    FOREIGN KEY(path) REFERENCES df(path) ON DELETE CASCADE
                )"""
            insert_hash_statement = f"INSERT INTO {key} VALUES (%s, %s, %s);"
            mycursor.execute(create_statement)
            mydb.commit()
            mycursor.execute(insert_hash_statement, (path + "/" + name, csv_counter, ','.join(header)))
            mydb.commit()
            key_list[key] = None
            csv_counter += 1
        except:
            output = f"ERROR: {mycursor.statement}"

        for row in reader:
            key = key_cleaning(row)
            #try insert data into datanode
            try:
                create_statement = f"""
                    CREATE TABLE IF NOT EXISTS {key}(
                        path varchar(255),
                        data_index int,
                        data text,
                        FOREIGN KEY(path) REFERENCES df(path) ON DELETE CASCADE
                    )"""
                insert_hash_statement = f"INSERT INTO {key} VALUES (%s, %s, %s);"
                mycursor.execute(create_statement)
                mydb.commit()
                mycursor.execute(insert_hash_statement, (path + "/" + name, csv_counter, ','.join(row)))
                mydb.commit()
                key_list[key] = None
                csv_counter += 1
            except:
                output = f"ERROR: {mycursor.statement}"
                rm(path, name)
                # return output

        #write data into datanodes
        for key in key_list.keys():
             block_statement = "INSERT INTO blockLocations VALUES(%s, %s);"
             mycursor.execute(block_statement, (f"{path}/{name}", key))
        mydb.commit()
        return key_list

######################
# Database Functions #
######################

def delete(list):
    '''
    Drops all tables in the list from the edfs
    Args:
        list (list): the list of table names to drop
    Returns:
        (str): the success or failure of the operation
    '''
    try:
        for item in list:
            drop_table = f"DROP TABLE {key}"
            mycursor.execute(drop_table)
            mydb.commit()
        return "Dropped tables"
    except:
        return "Database drop error"

def new_env(edfs):
    '''
    Executes EDFS SQL setup queries
    Args:
        edfs (str): the name of the EDFS filesystel to setup
    Returns:
        (str): the success or failure of the operation
    '''
    env_statements = [
            f"CREATE DATABASE {edfs}",
            f"USE {edfs}",
            """
                CREATE TABLE df (
                    path varchar(255),
                    type varchar(255),
                    PRIMARY KEY(path)
                )""",
            "INSERT INTO df VALUES ('/root', 'DIRECTORY')",
            """
                CREATE TABLE blockLocations (
                    path varchar(255),
                    partition_name varchar(255),
                    CONSTRAINT FOREIGN KEY (path) REFERENCES df(path) ON DELETE CASCADE
                )"""
    ]
    try:
        for s in env_statements:
            mycursor.execute(s)
        mydb.commit()
        return f"{edfs} created"
    except:
        return "Database error"

def delete_env(edfs):
    '''
    Drops the EDFS database entirely
    Args:
        edfs (str): the name of the EDFS filesystel to drop
    Returns:
        (str): the success or failure of the operation
    '''
    try:
        drop_database = f"DROP DATABASE {edfs};"
        mycursor.execute(drop_database)
        mydb.commit()
    except:
        return "Database error"
    return  f"{edfs} deleted"

def start_env(edfs):
    '''
    Uses EDFS database
    Args:
        edfs (str): the name of the EDFS filesystem to run
    Returns:
        (str): the success or failure of the operation
    '''
    try:
        use_database = f"USE {edfs}"
        mycursor.execute(use_database)
    except:
        return "Database error"
    return  f"{edfs} started"

# def test_edfs(argv):

#     edfs = "edfs"

#     #Testing
#     if "--delete" in argv:
#         print(delete_env(edfs))
#     elif "--new" in argv:
#         print(new_env(edfs))
#     elif "--restart" in argv:
#         print(delete_env(edfs))
#         print(new_env(edfs))
#     else:
#         print(start_env(edfs))
#         print(mkdir("/root", "foo"))
#         print(mkdir("/root/foo", "bar"))
#         #todo: put check to make sure that the file source exists
#         print(put("/root/foo", "data", "../datasets/sql-edfs/data.csv"))
#         print(cat("/root/foo/data"))
#         print(ls("/root/foo"))
#         print(rm("/root", "data"))
#         print(ls("/tree"))


#     # test_edfs(sys.argv)


## Restart

In [3]:
edfs="edfs"
delete_env(edfs)
# new_env(edfs)

'edfs deleted'

In [4]:
new_env(edfs)

'edfs created'

In [5]:
start_env(edfs)

'edfs started'

# Init commands

In [28]:
ls("/root")

[('/root/user', 'DIRECTORY')]

In [None]:
# put(mycursor, "/root/foo", "data", "datasets/sql-edfs/data.csv")

# MongoDB

In [95]:
import csv
import re
import sys
import pandas as pd
from pymongo import MongoClient
# import simplejson as sp
import json
import os

#mongodb client setup
client = MongoClient("mongodb://root:root@localhost:27017/")
db = client['edfs']
#collection = db['name_of_collection']

#python connector setup

####################
# API Functions    #
####################

def read_dataset(path):
    df=pd.read_csv(path)
    df=df.reset_index(drop=True)
    #df_melted = df.melt(df.set_index('Country Name'))
    df.head()
    df = df.drop(["Country Code"], axis=1)
    new_columns = list()
    columns = df.columns.str.strip()
    for c in columns:
            new_columns.append(c.replace(" ","_"))

    # change column names in dataframe
    df.columns = new_columns
    df.columns = df.columns.astype(str)
    df_melted = df.melt(id_vars=["Country_Name", "Series_Name"],
        var_name="Year",
        value_name="Value")
    #print("readDataset", df_melted.astype({'Year':'int', 'Value': 'float'}))
    return  df_melted.astype({'Year':'int', 'Value': 'float'})

def seek(path):
    #print("seek", db.blockLocations.find_one({"path": path}))
    return db.blockLocations.find_one({"path": path})

def mkdir(path, name):
    result = seek(path)
    output = ""
    #print("mkdir", result)
    if result:
        pathn = path + "/" + name
        db.blockLocations.insert_one({"path": pathn, "type" : 'DIRECTORY'})
        output = f"Directory created"
    else:
        output = f"Invalid path: {path}"
    #print("mkdir",output)
    return output

def rm(path, name):
    result = seek(path)
    path =  f"{path}/{name}"
    output = ""
    if result:
        db.blockLocations.insert_one({"Path": path, "type" : 'DIRECTORY'})
        output = f"{path} deleted"
    else:
        output = "invalid deletion"
    #print(output)
    return output


def ls(path):
    result = seek(path)
    print("ls",result)
    if result:
        if result["type"] == "FILE":
            output = "Cannot run 'ls' on files"
        elif result["type"] == "DIRECTORY":
            #output1=db.df.find()
            query = {"path": {"$regex": f"{path}","$options" :'i'}}
            output1=db.blockLocations.find(query)
            final=[]
            for o in output1:
                final.append(o)
    else:
        final = f"Invalid path: {path}"
    
    if(len(final)==0):
        return final
    else:
        return final

def readPartition(inp,file,path):
    path= '/'+path+'/'+file+'/'+inp
    X = db.df.find_one({"location":path}, {'_id':False})
    if(X):
        return X
    else:
        return ("FILE DOES NOT EXIST")

def cat(filelist):
    x=db.df.find()
    list=[]
    for i in x:
        list.append(i)
   
    return (list)

#db.df.find({"$and":[{"location":{'$regex':"Benin"}},{"location":{'$regex':"Belgium"}}]},{'_id':0})

def put(path, csvf):
    header = [ "\ufeffCountry Name",	"Country Code",	"Series Name",
    	"2003",	"2004",	"2005",	"2006",	"2007",	"2008",	"2009",	
        "2010",	"2011",	"2012",	"2013",	"2014",	"2015",	"2016",
        "2017",	"2018",	"2019",	"2020",	"2021"]
    new_headers_list=[ "Country Name",	"Country Code",	"Series Name",
    	"2003",	"2004",	"2005",	"2006",	"2007",	"2008",	"2009",	
        "2010",	"2011",	"2012",	"2013",	"2014",	"2015",	"2016",
        "2017",	"2018",	"2019",	"2020",	"2021"]
    csvfile = open( csvf, 'r+')
    reader = csv.DictReader( csvfile )
    reader.fieldnames = new_headers_list
    for each in reader:
        #print(each)
        row={}
        blockLoc = {}
        for field in new_headers_list:
            row[field]=each[field]
            blockLoc["path"] = path+ "/"+ each["Country Name"]
            row["location"] = path+ "/"+ each["Country Name"]
            blockLoc["type"] = "FILE"
        db.df.insert_one(row)
        db.blockLocations.insert_one(blockLoc)
    return("Inserted Data")

#session = db.getMongo().startSession( { readPreference: { mode: "primary" } } )
def getPartitionLocations(path):
    #path1= path+'/'+filepartition
    X = db.blockLocations.find({"path":{"$regex":path}},{"path":True,"_id":False})
    loc=[]
    if(X):
        for i in X:
            loc.append(i)
        return loc[1:]
    else:
        return ("FILE DOES NOT EXIST")
######################
# Database Functions #
######################

def delete(list):
    try:
        for item in list:
            col=db[f"item"]
            col.drop()
        return "Dropped tables"
    except:
        return "Database drop error"

def new_env(edfs):
    try:
        db = client['edfs']
        db.blockLocations.insert_one({"path":'/root',"type":'DIRECTORY'})
        return f"{edfs} created"
    except:
        return "Database error"

def delete_env(edfs):
    try:
        client.drop_database('edfs')
    except:
        return "Database error"
    return  f"{edfs} deleted"

def start_env(edfs):
    try:
        client = MongoClient('localhost', 27017)
        db = client['edfs']
    except:
        return "Database error"
    return  f"{edfs} started"

def test_edfs(argv):

    edfs = "edfs"

    #Testing
    if "--delete" in argv:
        print(delete_env(edfs))
    elif "--new" in argv:
        print(new_env(edfs))
    elif "--restart" in argv:
        print(delete_env(edfs))
        print(new_env(edfs))
    else:
        print(start_env(edfs))
        print(mkdir("/root", "foo"))
        print(mkdir("/root/foo", "bar"))

# test_edfs(sys.argv[1])
#test_edfs("--new")
#print(mkdir('/root', "user"))
#mkdir("/root/foo", "bar")
#rm("/root/foo", "bar")
#put("/root/foo", "data", "/Users/digvijaydesai/Downloads/ashita_code/Data.csv")
#seek("/root/foo/data")
#seek("/root/foo/bar")
#rm("/root", "bar")
#print(ls('/root'))
#print(cat("/Users/digvijaydesai/Downloads/ashita_code/Data.csv"))
#print(cat("/Users/digvijaydesai/Downloads/DSCI 552 ML/hw0/Salaries.csv"))
#print(read_dataset("/Users/digvijaydesai/Downloads/ashita_code/Data.csv"))
#print(readPartition("XY","foo","root"))
#print(getPartitionLocations("/root/foo"))

In [123]:
edfs = "edfs"
delete_env(edfs)

'edfs deleted'

In [124]:
new_env(edfs)

'edfs created'

In [125]:
mkdir("/root", "GDP_Growth")

'Directory created'

In [126]:
ls("/root")

ls {'_id': ObjectId('638436bcc61a654c2f9c05d2'), 'path': '/root', 'type': 'DIRECTORY'}


[{'_id': ObjectId('638436bcc61a654c2f9c05d2'),
  'path': '/root',
  'type': 'DIRECTORY'},
 {'_id': ObjectId('638436bdc61a654c2f9c05d3'),
  'path': '/root/GDP_Growth',
  'type': 'DIRECTORY'}]

In [127]:
put("/root/GDP_Growth", "datasets/GDP_Growth.csv")

'Inserted Data'

In [92]:
def read_dataset(path):
    df=pd.read_csv(path)
    df=df.reset_index(drop=True)
    #df_melted = df.melt(df.set_index('Country Name'))
    df.head()
    df = df.drop(["Country Code"], axis=1)
    new_columns = list()
    columns = df.columns.str.strip()
    for c in columns:
            new_columns.append(c.replace(" ","_"))

    # change column names in dataframe
    df.columns = new_columns
    df.columns = df.columns.astype(str)
    df_melted = df.melt(id_vars=["Country_Name", "Series_Name"],
        var_name="Year",
        value_name="Value")
    #print("readDataset", df_melted.astype({'Year':'int', 'Value': 'float'}))
    return  df_melted.astype({'Year':'int', 'Value': 'float'})

In [93]:
df = read_dataset("datasets/GDP_Growth.csv")

In [129]:
cat("/root/GDP_Growth")

[{'_id': ObjectId('638436c0c61a654c2f9c05d4'),
  'Country Name': '\ufeffCountry Name',
  'location': '/root/GDP_Growth/\ufeffCountry Name',
  'Country Code': 'Country Code',
  'Series Name': 'Series Name',
  '2003': '2003',
  '2004': '2004',
  '2005': '2005',
  '2006': '2006',
  '2007': '2007',
  '2008': '2008',
  '2009': '2009',
  '2010': '2010',
  '2011': '2011',
  '2012': '2012',
  '2013': '2013',
  '2014': '2014',
  '2015': '2015',
  '2016': '2016',
  '2017': '2017',
  '2018': '2018',
  '2019': '2019',
  '2020': '2020',
  '2021': '2021'},
 {'_id': ObjectId('638436c1c61a654c2f9c05d6'),
  'Country Name': 'Aruba',
  'location': '/root/GDP_Growth/Aruba',
  'Country Code': 'ABW',
  'Series Name': 'GDP growth (annual %)',
  '2003': '1.120879121',
  '2004': '7.281025864',
  '2005': '-0.384927066',
  '2006': '1.138905837',
  '2007': '3.0967223',
  '2008': '1.833430856',
  '2009': '-11.68358552',
  '2010': '-2.732595966',
  '2011': '3.366778149',
  '2012': '-1.035375324',
  '2013': '6.42981

In [43]:
# getPartitionLocations("/root/GDP_Growth")

In [45]:
# readPartition("Algeria", "GDP_Growth", "root")

In [23]:
readPartition("Algeria", "foo", "root")

'FILE EXISTS'

In [47]:
rm("/root", "GDP_Growth")

'/root/GDP_Growth deleted'

In [59]:
# ls("/root")

In [20]:
cat("/root/foo")

In [29]:
# getPartitionLocations("/root/foo")

In [17]:
ls("/root/user")

ls {'_id': ObjectId('6383d95bc61a654c2f9bfbad'), 'path': '/root/user', 'type': 'DIRECTORY'}


[{'_id': ObjectId('6383d95bc61a654c2f9bfbad'),
  'path': '/root/user',
  'type': 'DIRECTORY'}]

In [8]:
ls("/root")

ls {'_id': ObjectId('6383d95ac61a654c2f9bfbac'), 'path': '/root', 'type': 'DIRECTORY'}


[{'_id': ObjectId('6383d95ac61a654c2f9bfbac'),
  'path': '/root',
  'type': 'DIRECTORY'},
 {'_id': ObjectId('6383d95bc61a654c2f9bfbad'),
  'path': '/root/user',
  'type': 'DIRECTORY'}]

In [7]:
ls("/root/user")

ls {'_id': ObjectId('6383d95bc61a654c2f9bfbad'), 'path': '/root/user', 'type': 'DIRECTORY'}


[{'_id': ObjectId('6383d95bc61a654c2f9bfbad'),
  'path': '/root/user',
  'type': 'DIRECTORY'}]

In [9]:
# put(mycursor, "/root/foo", "data", "datasets/sql-edfs/data.csv")
put("/root/foo", None, "datasets/sql-edfs/data.csv")

'Inserted Data'

In [78]:
# getPartitionLocations("/root/user")

In [79]:
# df = read_dataset("datasets/Data.csv")

In [10]:
getBlockLocation("/root/user", "Aruba")

NameError: name 'getBlockLocation' is not defined

In [None]:
cat

In [72]:
ls("/root/user")

In [73]:
put("/root/user", None, "datasets/Data.csv")

'Inserted Data'

In [100]:
# cat("datasets/Data.csv")

In [97]:
# ls('/root/user')

In [118]:
mango_tree = mongo.cat("/root/GDP_Growth")

In [122]:
# with open('mango_tree.json', 'w') as fp:
#     json.dump(mango_tree, fp)

# PMR

In [136]:
import edfs.mysql as sql
import edfs.firebase as firebase
import edfs.mongodb as mongo
import sys
import mysql.connector as ccnx
from enum import Enum
import pandas as pd



#python connector setup
mydb = ccnx.connect(
    host="localhost",
    user="root",
    password="root",
    database="edfs"
)
mycursor = mydb.cursor(buffered=True)

# # class syntax
# class EDFS(Enum):
#     MYSQL = 1
#     FIREBASE = 2
#     MONGODB = 3

# class FUNC(Enum):
#     SUM = 1
#     MAX = 2
#     MIN = 3
#     AVG = 4


#####################
#  Helper Function  #
#####################

def convert_str(item:str):
    try:
        return float(item)
    except:
        return None

def intersection(lst1, lst2):
    return list(set(lst1) & set(lst2))

#####################
#  Mapper Function  #
#####################
def mongodb_map(targets, file):
    col_blacklist = ["location", "_id"]
    mango_tree = mongo.cat(file)
    # print(mango_tree)
    # with open('mango_tree.json', 'w') as fp:
    #     json.dump(mango_tree, fp)
    mangos = []
    for mango_fruit in mango_tree:
        # print(mango_fruit)
        mango_pudding = ','.join([value for key, value in mango_fruit.items() if key not in col_blacklist])
        if "location" in mango_fruit:
            mangos.append((mango_fruit["location"], None, mango_pudding))
        header_list = list(mango_fruit.keys())

    # targets = intersection(targets, header_list)
    # print(targets)
    col_dict = {header_list.index(i):i for i in targets}
    return mapPartition(mangos, col_dict)

def firebase_map(targets, file):
    data = firebase.cat(file)
    data = [row.split(",") for row in data] #list of list
    header_list = data[0]
    data = [(row[0], None, ','.join(row)) for row in data]
    targets = intersection(targets, header_list)
    col_dict = {header_list.index(i):i for i in targets}
    return mapPartition(data, col_dict)

def mapPartition(data, col_dict):
    """
        data: The data from the partitions
        col_dict: the names of the columns
    """
    data_mapped = {}
    for d in data:
        partition_key, data_block = d[0], d[2].split(",")
        data_block = [(col_dict[x], data_block[x]) for x in col_dict.keys()]
        if partition_key not in data_mapped:
            data_mapped[partition_key] = []
        data_mapped[partition_key] += data_block
    return data_mapped

def sql_map(targets: [], file:str):
    sql.start_env("edfs")
    data = sql.getPartitionData(file)
    # then I get all the partition locations and the indices and it goes zoooom

    header_list = (data[0][2].split(","))
    targets = intersection(targets, header_list)
    col_dict = {header_list.index(i):i for i in targets}
    return mapPartition(data, col_dict)

def edfs_shuffle(data_mapped:dict):
    data_shuffled = {}
    for key in data_mapped.keys():
        for item in data_mapped[key]:
            col_key, value = item[0], item[1]
            if col_key not in data_shuffled:
                data_shuffled[col_key] = []
            data_shuffled[col_key].append(convert_str(value))
    return data_shuffled

def edfs_reduce(data_shuffled:dict, function:int):
    data_reduced = {}
    if function == "SUM":
        for col in data_shuffled.keys():
            data_reduced[col] = sum(list(filter(lambda item: item is not None, data_shuffled[col])))
    elif function == "MAX":
        for col in data_shuffled.keys():
            data_reduced[col] = max(list(filter(lambda item: item is not None, data_shuffled[col])))
    elif function == "MIN":
        for col in data_shuffled.keys():
            data_reduced[col] = min(list(filter(lambda item: item is not None, data_shuffled[col])))
    elif function == "AVG":
        for col in data_shuffled.keys():
            shuffled_data = list(filter(lambda item: item is not None, data_shuffled[col]))
            data_reduced[col] = sum(shuffled_data)/len(shuffled_data)
    return data_reduced

def execute(implementation:int, function:int, targets:[]=None, file:str=None, DEBUG=False):
    if implementation == "MYSQL":
        data_mapped = sql_map(targets, file)
    if implementation == "FIREBASE":
        data_mapped = firebase_map(targets, file)
    if implementation == "MONGODB":
        data_mapped = mongodb_map(targets, file)
    data_shuffled = edfs_shuffle(data_mapped)
    data_reduced = edfs_reduce(data_shuffled, function)

    for item in targets:
        if item not in data_reduced:
            data_reduced[item] = None

    if DEBUG:
        print(f"Data Mapped:\n {data_mapped}\n")
        print(f"Data Shuffled:\n {data_shuffled}\n")
        print(f"Data Reduced:\n {data_reduced}\n")

    data_reduced = dict((key[:4], value) for (key, value) in data_reduced.items())
    df = pd.DataFrame({'Year': list(data_reduced.keys()), 'Value': list(data_reduced.values())})

    return df.sort_values(by='Year')

In [131]:
data_agg

Unnamed: 0,Year,Value
0,2018,2020.0


In [135]:
data_agg = execute("MONGODB", "AVG", targets=["2018", "2019"], file=f"datasets/Data.csv", DEBUG=True)

Data Mapped:
 {'/root/GDP_Growth/\ufeffCountry Name': [('2018', '2020'), ('2019', '2021')], '/root/GDP_Growth/Aruba': [('2018', '-22.31894795'), ('2019', '1.238927')], '/root/GDP_Growth/Africa Eastern and Southern': [('2018', '-2.886855648'), ('2019', '4.303667021')], '/root/GDP_Growth/Afghanistan': [('2018', '-2.351100673'), ('2019', '4.237913')], '/root/GDP_Growth/Africa Western and Central': [('2018', '-0.897014041'), ('2019', '3.914798957')], '/root/GDP_Growth/Angola': [('2018', '-5.500000003'), ('2019', '0.700000005')], '/root/GDP_Growth/Albania': [('2018', '-3.481630373'), ('2019', '8.544083251')], '/root/GDP_Growth/Andorra': [('2018', '-11.18393994'), ('2019', '8.949417981')], '/root/GDP_Growth/Arab World': [('2018', '-5.051382598'), ('2019', '3.530294057')], '/root/GDP_Growth/United Arab Emirates': [('2018', '-6.134500803'), ('2019', '5.38917498')], '/root/GDP_Growth/Argentina': [('2018', '-9.895268964'), ('2019', '10.26376093')], '/root/GDP_Growth/Armenia': [('2018', '-7.4'), 

# Spark Implementation

In [7]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
import pandas as pd

In [3]:
import os
os.environ["JAVA_HOME"] = "/Library/Internet\ Plug-Ins/JavaAppletPlugin.plugin/Contents/Home"

In [5]:
# spark= SparkSession \
#     .builder \
#     .getOrCreate()

In [52]:
df = pd.read_csv("datasets/Human_Capital_Index.csv")

In [53]:
df.head()

Unnamed: 0,Country Name,Country Code,Series Name,Series Code,2010 [YR2010],2011 [YR2011],2012 [YR2012],2013 [YR2013],2014 [YR2014],2015 [YR2015],2016 [YR2016],2017 [YR2017],2018 [YR2018],2019 [YR2019],2020 [YR2020]
0,Afghanistan,AFG,Expected Years of School,HD.HCI.EYRS,..,..,..,..,..,..,..,8.58,8.720339775,..,8.901890755
1,Albania,ALB,Expected Years of School,HD.HCI.EYRS,11.61565018,..,..,..,..,..,..,12.99,12.88550758,..,12.88938141
2,Algeria,DZA,Expected Years of School,HD.HCI.EYRS,11.32690144,..,..,..,..,..,..,11.42,11.77531815,..,11.84803486
3,American Samoa,ASM,Expected Years of School,HD.HCI.EYRS,..,..,..,..,..,..,..,..,..,..,..
4,Andorra,AND,Expected Years of School,HD.HCI.EYRS,..,..,..,..,..,..,..,..,..,..,..


In [54]:
def is_year (c):
    return any(char.isdigit() for char in c)  

In [58]:
df.dropna(inplace=True)

In [61]:
# new_columns

In [59]:
# change columns names
new_columns = list()
columns = df.columns
for c in columns:
    if is_year(c):
        new_columns.append(c[:4])
    else:
        new_columns.append(c.replace(" ","_"))

# change column names in dataframe
df.columns = new_columns
df['Year'] = df['Year'].astype(int)
df = df.loc[df['Value'] != '..'].copy()
df['Value'] = df['Value'].astype(float)

KeyError: 'Year'

In [40]:
df_melted_2.dropna(inplace=True)

In [51]:
df_melted.isnull().sum()

Country Name      0
Indicator Name    0
Year              0
Value             0
dtype: int64

In [47]:
df_melted.loc[df_melted.Value.str.isnumeric()]

ValueError: Cannot mask with non-boolean array containing NA / NaN values

In [25]:
df_melted = df_melted.loc[df_melted.Value.str.isnumeric()].copy()

# change columns names
new_columns = list()
columns = df_melted.columns
for c in columns:
        new_columns.append(c.replace(" ","_"))

# change column names in dataframe
df_melted.columns = new_columns
df_melted.astype({'Year':'int', 'Value': 'float'})

ValueError: Cannot mask with non-boolean array containing NA / NaN values

# Datasets

In [7]:
df = pd.read_csv("datasets/sql-edfs/data.csv")

In [11]:
df2 = df.sample(50, random_state=42).copy()

In [12]:
df2.head()

Unnamed: 0,Country Name,Country Code,Series Name,Series Code,1960 [YR1960],1961 [YR1961],1962 [YR1962],1963 [YR1963],1964 [YR1964],1965 [YR1965],...,2012 [YR2012],2013 [YR2013],2014 [YR2014],2015 [YR2015],2016 [YR2016],2017 [YR2017],2018 [YR2018],2019 [YR2019],2020 [YR2020],2021 [YR2021]
208,Central Europe and the Baltics,CEB,Access to electricity (% of population),EG.ELC.ACCS.ZS,..,..,..,..,..,..,...,100.0,100.0,100.0,100.0,99.8278344126722,100.0,100.0,100.0,99.9796400333302,..
6,Antigua and Barbuda,ATG,Access to electricity (% of population),EG.ELC.ACCS.ZS,..,..,..,..,..,..,...,100.0,100.0,100.0,100.0,100.0,100.0,100.0,100.0,100.0,..
79,Haiti,HTI,Access to electricity (% of population),EG.ELC.ACCS.ZS,..,..,..,..,..,..,...,37.9000015258789,38.6978759765625,39.496410369873,40.8362846374512,40.4000015258789,43.7492713928223,44.962345123291,45.990234375,46.9255332946777,..
204,Africa Eastern and Southern,AFE,Access to electricity (% of population),EG.ELC.ACCS.ZS,..,..,..,..,..,..,...,31.8443844038242,31.794159992766,32.001026781227,33.8719104646129,38.8801732244203,40.261357595893,43.061876950924,44.2708604789338,45.8034852293869,..
117,Mexico,MEX,Access to electricity (% of population),EG.ELC.ACCS.ZS,..,..,..,..,..,..,...,99.1116333007813,99.1476974487305,99.1729278564453,99.0,99.5,100.0,99.5,99.5999984741211,99.4000015258789,..


In [13]:
df2.to_csv('datasets/Access_Electricity.csv', index=False)

In [15]:
df = pd.read_csv("datasets/sql-edfs/CookingData.csv")

In [17]:
df3 = df.sample(50, random_state=42).copy()

In [19]:
df3.to_csv('datasets/Access_Fuels.csv', index=False)

In [20]:
df = pd.read_csv("datasets/Human_Capital_Index.csv")

In [24]:
df4 = df.sample(50, random_state=42)

In [26]:
df4.to_csv("datasets/Human_Capital_Index_Sample.csv", index=False)