# Data preparation notebook

In [1]:
import pandas as pd
import glob
import os
import shutil
import re
from datetime import datetime
# Pipeline stage directories (used as paths relative to the notebook's working
# directory): raw monitoring dumps -> per-timestamp sums -> selected worker files.
entry_directory, prepared_directory, organised_directory = (
    "Raw",
    "Prepared",
    "Organised",
)

## Sum CPU and RAM usage over all processes for each timestamp

In [None]:
def calculate_sums(file_path):
    """Aggregate per-process CPU and RAM samples into per-timestamp totals.

    Reads a raw monitoring CSV with columns ``timestamp``, ``PID``, ``CPU`` and
    ``RAM``. Each timestamp is converted to seconds elapsed since the earliest
    timestamp in the file. CPU and RAM are then summed over all rows that share
    the same instant.

    Parameters
    ----------
    file_path : str, path-like or file-like
        Anything ``pd.read_csv`` accepts.

    Returns
    -------
    pd.DataFrame
        Indexed by ``epoch`` (float seconds since the first sample, ascending),
        with summed ``CPU`` and ``RAM`` columns.
    """
    node_data_df = pd.read_csv(
        file_path,
        dtype={'timestamp': 'string', 'PID': 'int', 'CPU': 'float64', 'RAM': 'float64'},
    )
    raw_timestamps = node_data_df['timestamp']
    # Some samples carry no fractional seconds; pad them so that a single
    # format string parses every row.
    has_fraction = raw_timestamps.str.contains(".", regex=False, na=False)
    padded = raw_timestamps.where(has_fraction, raw_timestamps + ".000000")
    parsed = pd.to_datetime(padded, format="%Y-%m-%d %H:%M:%S.%f")
    # Subtract as timedeltas *before* converting to float seconds. Turning
    # absolute Unix times (~1.7e9 s) into float64 first leaves only ~2.4e-7 s of
    # resolution, so an offset of 0.1 s used to come out as 0.100000143...
    node_data_df['epoch'] = (parsed - parsed.min()).dt.total_seconds()
    return (
        node_data_df
        .drop(columns=['PID', 'timestamp'])
        .groupby('epoch')
        .sum()
    )


# Walk the raw tree and write one per-timestamp summary CSV for every monitoring
# file, at the mirrored path under the prepared directory. Files that already
# exist in the prepared tree are skipped, so re-running only handles new data.
raw_root = os.path.join(".", entry_directory)
prepared_root = os.path.join(".", prepared_directory)
# Raw monitoring files end in six digit groups, e.g. 01_02_2023_10_11_12.csv.
raw_file_pattern = re.compile(r"\d\d_\d\d_\d\d\d\d_\d\d_\d\d_\d\d\.csv$")

for root, _, files in os.walk(raw_root):
    # Mirror the layout by relative path. str.replace would also rewrite any
    # other occurrence of the directory name inside the path.
    target_dir = os.path.normpath(os.path.join(prepared_root, os.path.relpath(root, raw_root)))
    for file in files:
        if not raw_file_pattern.search(file):
            continue
        target_path = os.path.join(target_dir, file)
        if os.path.exists(target_path):
            continue
        calculated_dataframe_to_save = calculate_sums(os.path.join(root, file))
        os.makedirs(target_dir, exist_ok=True)
        calculated_dataframe_to_save.to_csv(target_path)

## Find worker nodes and copy them to the Organised directory

In [None]:
# Copy every prepared CSV that passes the load thresholds below into
# Organised/<type>/<function>/<nodes>/source-data/<i>/, where <i> is the file's
# 1-based position (in sorted filename order) within its node directory.
# The thresholds are compared with the summed CPU/RAM columns written by the
# preparation cell above.
MIN_MEAN_RAM = 2          # mean RAM over the whole file must be >= this
MIN_STARTUP_CPU = 3       # mean CPU over the first STARTUP_ROWS rows must be > this
STARTUP_ROWS = 15

prepared_root = os.path.join(".", prepared_directory)
organised_root = os.path.join(".", organised_directory)


def _subdirectories(path):
    """Return the names of the directories directly inside ``path``, sorted."""
    return sorted(
        name for name in os.listdir(path) if os.path.isdir(os.path.join(path, name))
    )


for functionTypeDirectory in _subdirectories(prepared_root):
    type_path = os.path.join(prepared_root, functionTypeDirectory)
    for functionDirectory in _subdirectories(type_path):
        function_path = os.path.join(type_path, functionDirectory)
        for numberOfNodesDirectory in _subdirectories(function_path):
            # Same relative location under both the prepared and organised roots.
            source_data = os.path.join(
                functionTypeDirectory, functionDirectory, numberOfNodesDirectory, 'source-data'
            )
            for nodeDirectory in _subdirectories(os.path.join(prepared_root, source_data)):
                # Sorting makes <i> deterministic across platforms; glob order is
                # filesystem-dependent and <i> decides which files end up grouped.
                node_files = sorted(
                    glob.glob(os.path.join(prepared_root, source_data, nodeDirectory, "*.csv"))
                )
                for i, filename in enumerate(node_files, start=1):
                    df = pd.read_csv(filename)
                    if (
                        df['RAM'].mean() >= MIN_MEAN_RAM
                        and df['CPU'].head(STARTUP_ROWS).mean() > MIN_STARTUP_CPU
                    ):
                        target_dir = os.path.join(organised_root, source_data, str(i))
                        os.makedirs(target_dir, exist_ok=True)
                        shutil.copyfile(filename, os.path.join(target_dir, os.path.basename(filename)))

## Join the files of each category into one CSV

In [13]:
def join_files(entry_dir, category):
    """Concatenate every per-snapshot CSV of one category into a single file.

    For each UDF sub-directory of ``<entry_dir>/<category>``, every file found
    below it is read and its rows are tagged with:

    * ``snapshot``: running 0-based number of the file within the category,
    * ``label``: the category name,
    * ``udf``: the UDF sub-directory the file came from,
    * ``size``: first character of the 4th ``-``-separated field of the
      directory four levels above the file. That is presumably the
      number-of-nodes directory from the selection cell; TODO confirm the
      naming scheme.

    Writes ``<entry_dir>/<category>/joined_<category>.csv`` with columns
    snapshot, label, udf, epoch, CPU, RAM, size (header only if no files exist).
    """
    directory = os.path.join(entry_dir, category)
    columns = ["snapshot", "label", "udf", "epoch", "CPU", "RAM", "size"]
    # Only real sub-directories are UDFs. This also skips the joined/normalized/
    # smoothed CSVs that this notebook writes into the same directory.
    udf_dirs = sorted(
        name for name in os.listdir(directory)
        if os.path.isdir(os.path.join(directory, name))
    )
    frames = []
    snapshot = 0
    for udf in udf_dirs:
        for root, dirs, files in os.walk(os.path.join(directory, udf)):
            # Sort in place so os.walk descends (and snapshots are numbered) in
            # the same order on every platform.
            dirs.sort()
            for file in sorted(files):
                full_path = os.path.join(root, file)
                node_data_df = pd.read_csv(full_path)
                node_data_df["snapshot"] = snapshot
                snapshot += 1
                node_data_df["label"] = category
                node_data_df["udf"] = udf
                # Split on the native separator so this works on every OS.
                path_parts = os.path.normpath(full_path).split(os.sep)
                node_data_df["size"] = path_parts[-4].split("-")[3][0]
                frames.append(node_data_df)
    # Concatenate once instead of growing a frame inside the loop (quadratic).
    result_df = pd.concat(frames) if frames else pd.DataFrame(columns=columns)
    result_df[columns].to_csv(os.path.join(directory, f"joined_{category}.csv"), index=False)

# Experiment categories; each one gets its own joined_<category>.csv.
labels = [
    'aggregation',
    'filtration',
    'filtration-aggregation',
    'filtration-aggregation-join',
    'filtration-join',
]

for category_name in labels:
    join_files(entry_dir=organised_directory, category=category_name)

## Normalize joined files across all UDF types and save one file per type

In [None]:
# Min-max scale CPU and RAM to [0, 1] using the minimum and maximum over *all*
# UDF types together (presumably so values stay comparable across types), then
# write one normalized file per type.
udf_types = ['aggregation', 'filtration', 'filtration-aggregation', 'filtration-aggregation-join', 'filtration-join']


def _min_max_scale(values):
    """Scale a numeric Series to [0, 1]; a constant Series maps to all zeros."""
    value_range = values.max() - values.min()
    if value_range == 0:
        # Avoid 0/0 -> NaN when every sample has the same value.
        return values * 0.0
    return (values - values.min()) / value_range


# Read all joined files and concatenate once (no quadratic growth in a loop).
full_df = pd.concat(
    [
        pd.read_csv(os.path.join(organised_directory, udf_type, f"joined_{udf_type}.csv"))
        for udf_type in udf_types
    ]
)
full_df["CPU"] = _min_max_scale(full_df["CPU"])
full_df["RAM"] = _min_max_scale(full_df["RAM"])
for udf_type in udf_types:
    output_dir = os.path.join(organised_directory, udf_type)
    os.makedirs(output_dir, exist_ok=True)
    full_df[full_df["label"] == udf_type].to_csv(
        os.path.join(output_dir, f"normalized_{udf_type}.csv"), index=False
    )

## Smooth out values in joined and normalized files

In [4]:
def smooth_values(entry_dir, category, window_value, file_type):
    """Apply a per-snapshot trailing rolling mean to CPU and RAM.

    Reads ``<entry_dir>/<category>/<file_type>_<category>.csv``. CPU and RAM
    are replaced with the mean of the last ``window_value`` rows of the same
    snapshot (fewer at the start of a snapshot, since ``min_periods=1``). CPU
    and RAM are rounded to 2 decimals and epoch to 3, remaining NaNs are filled
    with 0, and the result is written to
    ``<entry_dir>/<category>/<window_value>_<file_type>_smooth_<category>.csv``.
    """
    category_dir = os.path.join(entry_dir, category)
    df = pd.read_csv(os.path.join(category_dir, f"{file_type}_{category}.csv"))
    smoothed = (
        df.groupby('snapshot')[['CPU', 'RAM']]
        .rolling(window=window_value, min_periods=1)
        .mean()
        # Drop the snapshot level so the result aligns with df by original row
        # label. Positional assignment would misplace values whenever a
        # snapshot's rows are not contiguous and sorted by snapshot.
        .reset_index(level=0, drop=True)
    )
    df['CPU'] = smoothed['CPU'].round(2)
    df['RAM'] = smoothed['RAM'].round(2)
    df['epoch'] = df['epoch'].round(3)
    df.fillna(0).to_csv(
        os.path.join(category_dir, f"{window_value}_{file_type}_smooth_{category}.csv"),
        index=False,
    )

# For every window size and category, write smoothed copies of the joined file
# first and then of the normalized file.
labels = [
    'aggregation',
    'filtration',
    'filtration-aggregation',
    'filtration-aggregation-join',
    'filtration-join',
]
window_sizes = [6, 12, 18]
for window in window_sizes:
    for category_name in labels:
        for source_kind in ("joined", "normalized"):
            smooth_values(organised_directory, category_name, window, source_kind)