# Import libs

In [None]:
import numpy as np
import time
import matplotlib.pyplot as plt
import pandas as pd
import glob
from dateutil import parser
import datetime
import os

# Define functions

In [None]:
# Date and time tokens taken from the most recent sparkgen line that had at
# least two space-separated tokens. cleanSparkgenLine updates them and reuses
# them for shorter lines.
last_date = ''
last_time = ''

def readLogFileWithGlob(globString, date, logTimeStampStr):
    """Load the first file matching ``globString`` as a load-average table.

    The file is read as a header-less CSV with five columns:

    * column 0: its first space-separated token is a ``HH:MM:SS`` clock time,
      converted to integer epoch seconds via ``getTimeStamp``;
    * column 1: labelled ``users`` and dropped;
    * column 2: its third space-separated token is parsed as the 1-minute load;
    * columns 3 and 4: parsed as floats (5- and 15-minute load).

    Parameters:
        globString: glob pattern; only the first match returned by
            ``glob.glob`` is read.
        date: accepted for interface compatibility; not used here.
        logTimeStampStr: reference epoch value handed to ``getTimeStamp``.

    Returns:
        DataFrame with columns ``time``, ``load1``, ``load5``, ``load15``.

    Raises:
        FileNotFoundError: if no file matches ``globString`` (previously this
            surfaced as a bare ``IndexError``).
    """
    matches = glob.glob(globString)
    if not matches:
        raise FileNotFoundError(f"No log file matches pattern {globString!r}")

    logfile = pd.read_csv(matches[0], header=None)
    logfile[0] = logfile[0].apply(lambda x: int(getTimeStamp(logTimeStampStr, x.strip().split(' ')[0]).timestamp()))
    # Keep only the numeric token of column 2 and convert it in a single pass.
    logfile[2] = logfile[2].apply(lambda x: float(x.strip().split(' ')[2].strip()))
    logfile[3] = logfile[3].apply(float)
    logfile[4] = logfile[4].apply(float)
    logfile.columns = ['time', 'users', 'load1', 'load5', 'load15']
    return logfile.drop(columns=['users'])

def cleanSparkgenLine(line):
    """Split one sparkgen line into ``[date, time, rest]``.

    The line is split on single spaces. With two or more tokens, the first two
    (stripped) become the date and time and are remembered in the module
    globals ``last_date`` / ``last_time``; the remaining tokens are re-joined
    with spaces. With fewer than two tokens, the remembered date and time are
    reused and the whole line is returned as the third element.
    """
    global last_date
    global last_time
    tokens = line.split(' ')
    if len(tokens) >= 2:
        last_date, last_time = tokens[0].strip(), tokens[1].strip()
        remainder = ' '.join(tokens[2:])
    else:
        # No date/time on this line: keep the last ones seen.
        remainder = line
    return [last_date, last_time, remainder]

def groupLinesByThree(lines):
    """Summarise a group of cleaned log lines as one experiment record.

    ``lines[0]`` and ``lines[2]`` are read as ``[date, time, content]``
    entries. The time fields are parsed with ``dateutil.parser``.

    Returns:
        ``[startTime, endTime, duration_seconds, result, application]`` where
        ``result`` is the content of ``lines[2]`` and ``application`` is
        ``"lenet5"`` if that word occurs in the content of ``lines[0]``,
        otherwise ``"bi-rnn"``.
    """
    opening, closing = lines[0], lines[2]
    startTime = parser.parse(opening[1])
    endTime = parser.parse(closing[1])
    elapsed = (endTime - startTime).total_seconds()
    application = "lenet5" if "lenet5" in opening[2] else "bi-rnn"
    return [startTime, endTime, elapsed, closing[2], application]

def readSparkGen(logTimeStampStr, prefix = '.'):
    """Parse the ``sparkgen`` file under ``prefix`` into an experiment table.

    Every line is normalised with ``cleanSparkgenLine``; only lines whose
    content contains ``'true,'`` are treated as experiment results. For each
    such line the content is split on commas, and fields 4 and 5 are taken as
    numeric timestamps (presumably start and end epoch seconds; confirm
    against the sparkgen format). The duration is their rounded difference.
    If that difference is negative it is recomputed with ``PFTS_alt``.

    The experiment's end is the clock time of the log line; its start is that
    time minus the duration. Both are converted with ``getTimeStamp`` relative
    to ``logTimeStampStr`` and stored as ``.timestamp()`` values.

    Parameters:
        logTimeStampStr: reference epoch value handed to ``getTimeStamp``.
        prefix: directory containing the ``sparkgen`` file.

    Returns:
        ``[experiments, sparkgenDate]`` where ``experiments`` is a DataFrame
        with columns ``start``, ``end``, ``duration``, ``result``,
        ``application`` (index starting at 1) and ``sparkgenDate`` is the date
        token of the first result line, or ``''`` if there is none.
    """
    filename = prefix + '/' + 'sparkgen'
    with open(filename) as f:
        content = [cleanSparkgenLine(x) for x in f]

    # Passing the column names to the constructor also works for an empty
    # file, where assigning .columns afterwards fails with a length mismatch.
    df_sparkgen = pd.DataFrame(content, columns=['date', 'time', 'content'])
    correctLine = df_sparkgen["content"].str.contains('true,').astype(bool)
    correctData = df_sparkgen[correctLine]

    experiments = pd.DataFrame(columns=['start', 'end', 'duration', 'result', 'application'])
    sparkgenDate = ''
    for total, (_, row) in enumerate(correctData.iterrows(), start=1):
        if total == 1:
            sparkgenDate = row['date']
        splitted = row['content'].split(',')
        duration = PFTS_Old(splitted[5]) - PFTS_Old(splitted[4])
        if duration < 0:
            # Fall back to the alternative timestamp conversion.
            duration = PFTS_alt(splitted[5]) - PFTS_alt(splitted[4])
        logged_at = parser.parse(row['date'] + " " + row['time'])
        new_start_time = (logged_at - datetime.timedelta(seconds=duration)).strftime("%H:%M:%S")
        new_end_time = row['time']
        # NOTE(review): the application label is hard-coded to 'lenet5'.
        experiments.loc[total] = [
            getTimeStamp(logTimeStampStr, new_start_time).timestamp(),
            getTimeStamp(logTimeStampStr, new_end_time).timestamp(),
            duration,
            row['content'],
            'lenet5',
        ]
    return [experiments, sparkgenDate]

def filter_log_row(row, exp):
    """Return True if ``row`` lies inside at least one experiment window.

    ``row`` is a single time value; ``exp`` provides ``start`` and ``end``
    columns. Both window bounds are inclusive.
    """
    inside_window = (exp['start'] <= row) & (exp['end'] >= row)
    return bool(inside_window.sum() > 0)

def PFTS_logs(str):
    """Re-derive an epoch value from the UTC time-of-day of ``str``.

    ``str`` is an epoch timestamp (the parameter name shadows the builtin but
    is kept for callers). Its UTC ``HH:MM:SS`` is formatted, re-parsed with
    ``dateutil.parser`` and converted back to integer epoch seconds with
    ``time.mktime``.
    """
    utc_moment = datetime.datetime.fromtimestamp(str, datetime.timezone.utc)
    clock_text = utc_moment.strftime("%H:%M:%S")
    # Only the clock time is kept; the date comes from the parser's default
    # (presumably the current day; confirm against dateutil's documentation).
    reparsed = parser.parse(clock_text)
    return int(time.mktime(reparsed.timetuple()))

def PFTS_Old(str):
    """Convert ``str`` to a float and return it rounded to the nearest int.

    Uses the built-in ``round``. The parameter name shadows the builtin
    ``str`` but is kept for callers.
    """
    as_float = float(str)
    rounded = round(as_float)
    return int(rounded)

def PFTS_alt(str):
    """Like ``PFTS`` but shifted forward by 7200 seconds (two hours).

    The value is rounded with ``PFTS_Old`` and 7200 is added. The UTC
    ``HH:MM:SS`` of the result is then re-parsed with ``dateutil.parser`` and
    converted to integer epoch seconds with ``time.mktime``.
    """
    # NOTE(review): the two-hour shift looks like a timezone offset; confirm.
    shifted_epoch = PFTS_Old(str) + 7200
    utc_moment = datetime.datetime.fromtimestamp(shifted_epoch, datetime.timezone.utc)
    reparsed = parser.parse(utc_moment.strftime("%H:%M:%S"))
    return int(time.mktime(reparsed.timetuple()))

def PFTS(str):
    """Round ``str`` with ``PFTS_Old`` and rebuild an epoch from its UTC clock time.

    The rounded epoch is formatted as UTC ``HH:MM:SS``, re-parsed with
    ``dateutil.parser`` and converted to integer epoch seconds with
    ``time.mktime``.
    """
    rounded_epoch = PFTS_Old(str)
    as_utc = datetime.datetime.fromtimestamp(rounded_epoch, datetime.timezone.utc)
    clock_text = as_utc.strftime("%H:%M:%S")
    clock_as_datetime = parser.parse(clock_text)
    return int(time.mktime(clock_as_datetime.timetuple()))
    
def filterSlaveDataFrame(df, exp):
    """Keep only the rows of ``df`` whose ``time`` falls inside an experiment.

    Each ``time`` value is tested with ``filter_log_row`` against ``exp``.
    """
    def within_any_experiment(stamp):
        return filter_log_row(stamp, exp)

    keep = df['time'].apply(within_any_experiment)
    return df[keep]

def readAllSlaveLogs(date, logTimeStampStr, prefix = '.'):
    """Read the four slave logs ``*_1.log`` .. ``*_4.log`` found under ``prefix``.

    Returns:
        Dict with keys ``log1`` .. ``log4``, each holding the DataFrame
        produced by ``readLogFileWithGlob`` for the matching pattern.
    """
    return {
        'log' + str(slave): readLogFileWithGlob(prefix + '/' + '*_' + str(slave) + '.log', date, logTimeStampStr)
        for slave in (1, 2, 3, 4)
    }

def filterLogs(logs, exp):
    """Apply ``filterSlaveDataFrame`` to each of ``log1`` .. ``log4``.

    Returns:
        A new dict with the same four keys holding the filtered frames.
    """
    return {
        key: filterSlaveDataFrame(logs[key], exp)
        for key in ('log1', 'log2', 'log3', 'log4')
    }

def bestDate(l):
    """Return the first of ``l[0]`` .. ``l[3]`` that is not null.

    Null is decided by ``pd.isnull``. Returns ``None`` when all four
    entries are null.
    """
    for position in range(4):
        candidate = l[position]
        if not pd.isnull(candidate):
            return candidate
    return None
    
def averageWithNaN(values):
    """Return the mean of ``values`` with NaN entries ignored (``np.nanmean``)."""
    nan_ignoring_mean = np.nanmean(values)
    return nan_ignoring_mean

def averageLogSet(logs):
    """Average the load columns of the four slave logs row by row.

    Each of ``logs['log1']`` .. ``logs['log4']`` must have exactly four
    columns in the order time, load1, load5, load15. The frames are placed
    side by side with ``pd.concat(axis=1)``, so rows are matched by index
    label and a slave without a given label contributes NaN there.

    Unlike the earlier implementation, the input frames are not modified:
    their columns are renamed on copies. The result is computed with
    vectorised column operations rather than one ``.loc`` insertion per row.

    Returns:
        DataFrame indexed like the combined frame with columns

        * ``time``: the first non-null time among slaves 1..4 (as float,
          NaN if no slave has a time for that row);
        * ``avg_load1``, ``avg_load5``, ``avg_load15``: mean over the slaves
          that have a value, ignoring NaN.
    """
    slaves = (1, 2, 3, 4)
    renamed = []
    for slave in slaves:
        # Copy first so the caller's column labels stay untouched.
        frame = logs[f'log{slave}'].copy()
        frame.columns = [f'time-{slave}', f'load1-{slave}', f'load5-{slave}', f'load15-{slave}']
        renamed.append(frame)
    combined = pd.concat(renamed, axis=1)

    averaged = pd.DataFrame(index=combined.index)
    time_columns = [f'time-{slave}' for slave in slaves]
    # Back-filling along the row moves the first non-null time into column 0.
    averaged['time'] = combined[time_columns].astype(float).bfill(axis=1).iloc[:, 0]
    for resolution in ('load1', 'load5', 'load15'):
        load_columns = [f'{resolution}-{slave}' for slave in slaves]
        # DataFrame.mean skips NaN by default, matching the NaN-ignoring mean.
        averaged['avg_' + resolution] = combined[load_columns].astype(float).mean(axis=1)
    return averaged

def correctForDD(name, logs):
    """Run ``correctDFForDD`` over all four slave logs.

    The fourth ``_``-separated field of ``name`` is read as the number of fast
    slaves. The first that many of ``log1`` .. ``log4`` are passed factor 0.5;
    the remaining ones are passed factor 0.75.

    Returns:
        A new dict keyed by log name (slow logs are inserted first).
    """
    fast_slaves = int(name.split('_')[3])
    logList = ['log1', 'log2', 'log3', 'log4']
    slow_part = [(logName, 0.75) for logName in logList[fast_slaves:4]]
    fast_part = [(logName, 0.5) for logName in logList[0:fast_slaves]]
    return {
        logName: correctDFForDD(logs[logName], factor)
        for logName, factor in slow_part + fast_part
    }

def correctDFForDD(df, factor):
    """Return a copy of ``df`` with its load columns passed through unchanged.

    NOTE(review): ``factor`` is accepted but never applied. Each of
    ``load1``, ``load5`` and ``load15`` is mapped through an identity
    function, so the result equals the input. Confirm whether the correction
    was disabled on purpose.
    """
    result = df.copy()
    for column in ('load1', 'load5', 'load15'):
        result[column] = result[column].apply(lambda value: value)
    return result

def extractLoadForTimeRange(start, end, loadData, resolution='avg_load1'):
    """Column-wise mean of the rows whose ``time`` is within ``[start, end]``.

    Both bounds are inclusive. The first matching row is left out of the mean.
    ``resolution`` is accepted but not used. This name is defined again
    further down in the file.
    """
    in_window = loadData['time'].between(start, end)
    rows_after_first = loadData[in_window].iloc[1:]
    return rows_after_first.mean(axis = 0)
    
def getLogDataPerRow(row, data):
    """Return the load means for the time window of one experiment row.

    Passes ``row['start']`` and ``row['end']`` to ``extractLoadForTimeRange``.
    """
    return extractLoadForTimeRange(row['start'], row['end'], data)
    
def getIndividualExperiments(sparkgen, data):
    """Attach the mean load of each experiment's time window to its row.

    NOTE: later definitions of ``getIndividualExperiments`` in this file
    rebind the name, so this version is not the one in effect after the whole
    file has run.

    Parameters:
        sparkgen: experiment table whose rows hold, in order, start, end,
            duration, result and application.
        data: averaged load table with ``time``, ``avg_load1``, ``avg_load5``
            and ``avg_load15`` columns.

    Returns:
        DataFrame with the five experiment columns followed by the three
        window means, indexed like ``sparkgen``.
    """
    load_columns = ['avg_load1', 'avg_load5', 'avg_load15']
    combinedSparkgen = pd.DataFrame(columns=['start', 'end', 'duration', 'result', 'application'] + load_columns)
    for index, row in sparkgen.iterrows():
        meanValues = getLogDataPerRow(row, data)
        # The window mean also contains the mean of the 'time' column.
        # Appending it unfiltered produced nine values for eight columns.
        combinedSparkgen.loc[index] = np.append(row.values, meanValues[load_columns].values)
    return combinedSparkgen

def extractLoadForTimeRange(start, end, loadData, resolution='avg_load1'):
    """Average every column of ``loadData`` over the window ``start``..``end``.

    Rows are selected where ``time`` is between the bounds (inclusive). The
    first selected row is discarded before taking the mean. ``resolution`` is
    not used.
    """
    window = loadData[(loadData['time'] >= start) & (loadData['time'] <= end)]
    # Skip the first sample in the window.
    without_first = window.iloc[1:]
    return without_first.mean(axis = 0)
    
def getLogDataPerRow(row, data):
    """Look up the mean load for the window given by ``row``.

    The window bounds are ``row['start']`` and ``row['end']``; the lookup is
    done by ``extractLoadForTimeRange``.
    """
    window = (row['start'], row['end'])
    return extractLoadForTimeRange(*window, data)
    
def getIndividualExperiments(expStruct, dset='filtered_data'):
    """Build one row per experiment with its configuration and mean load.

    NOTE: a later definition of ``getIndividualExperiments`` in this file
    rebinds the name.

    Parameters:
        expStruct: dataset dict providing ``expStruct[dset]['averaged']``
            (averaged load table), ``expStruct['sparkgen']`` (experiment
            table) and ``expStruct['typesList']`` (three configuration
            values).
        dset: key of the load data set to use.

    Returns:
        DataFrame with a fresh 0..n-1 index holding the three configuration
        values, the experiment fields and the window means of ``avg_load1``,
        ``avg_load5`` and ``avg_load15``.
    """
    data = expStruct[dset]['averaged'].copy()
    sparkgen = expStruct['sparkgen']
    tl = expStruct['typesList']
    columns = ['cpu_ratio_of_fast', 'ram_of_all_nodes', 'number_of_fast', 'start','end','duration','result','application','avg_load1', 'avg_load5', 'avg_load15']
    # Collect the rows and build the frame once. DataFrame.append was removed
    # in pandas 2.0 and copied the whole frame on every call.
    records = []
    for _, row in sparkgen.iterrows():
        meanValues = getLogDataPerRow(row, data)
        records.append({
            'cpu_ratio_of_fast': tl[0],
            'ram_of_all_nodes': tl[1],
            'number_of_fast': tl[2],
            'start': row['start'],
            'end': row['end'],
            'duration': row['duration'],
            'result': row['result'],
            'application': row['application'],
            'avg_load1': meanValues['avg_load1'],
            'avg_load5': meanValues['avg_load5'],
            'avg_load15': meanValues['avg_load15'],
        })
    return pd.DataFrame(records, columns=columns)

def extractLoadForTimeRange(start, end, loadData, resolution='avg_load1'):
    """Mean of each ``loadData`` column for samples with ``start <= time <= end``.

    The first sample inside the window is dropped before averaging.
    ``resolution`` is accepted but has no effect.
    """
    not_too_early = loadData['time'] >= start
    not_too_late = loadData['time'] <= end
    selected = loadData[not_too_early & not_too_late]
    # Drop the first row
    return selected.iloc[1:].mean(axis = 0)
    
def getLogDataPerRow(row, data):
    """Mean load values of ``data`` between ``row['start']`` and ``row['end']``.

    Delegates to ``extractLoadForTimeRange``.
    """
    window_start, window_end = row['start'], row['end']
    return extractLoadForTimeRange(window_start, window_end, data)
    
def getIndividualExperiments(sparkgen, data, tl):
    """Build one row per experiment with configuration, mean load and load per second.

    Parameters:
        sparkgen: experiment table with ``start``, ``end``, ``duration``,
            ``result`` and ``application`` columns.
        data: averaged load table (``time``, ``avg_load1``, ``avg_load5``,
            ``avg_load15``).
        tl: three configuration values stored as ``cpu_ratio_of_fast``,
            ``ram_of_all_nodes`` and ``number_of_fast``.

    Returns:
        DataFrame with a fresh 0..n-1 index. ``norml1`` / ``norml5`` /
        ``norml15`` are the window means divided by the experiment duration.
    """
    columns = ['cpu_ratio_of_fast', 'ram_of_all_nodes', 'number_of_fast', 'start','end','duration','result','application','avg_load1', 'avg_load5', 'avg_load15','norml1', 'norml5', 'norml15']
    # Collect the rows and build the frame once. DataFrame.append was removed
    # in pandas 2.0 and copied the whole frame on every call.
    records = []
    for _, row in sparkgen.iterrows():
        # The lookup only reads ``data``, so it needs no copy per row.
        meanValues = getLogDataPerRow(row, data)
        records.append({
            'cpu_ratio_of_fast': tl[0],
            'ram_of_all_nodes': tl[1],
            'number_of_fast': tl[2],
            'start': row['start'],
            'end': row['end'],
            'duration': row['duration'],
            'result': row['result'],
            'application': row['application'],
            'avg_load1': meanValues['avg_load1'],
            'avg_load5': meanValues['avg_load5'],
            'avg_load15': meanValues['avg_load15'],
            'norml1': meanValues['avg_load1'] / row['duration'],
            'norml5': meanValues['avg_load5'] / row['duration'],
            'norml15': meanValues['avg_load15'] / row['duration'],
        })
    return pd.DataFrame(records, columns=columns)

def getTimeStamp(after, string):
    """Place the clock time ``string`` on the first day at or after ``after``.

    Parameters:
        after: reference moment as epoch seconds; interpreted in UTC.
        string: clock time formatted ``HH:MM:SS``.

    Returns:
        A naive ``datetime`` combining the UTC date of ``after`` with the
        clock time. If that would lie before ``after``, one day is added.

    Raises:
        ValueError: if ``string`` does not match ``%H:%M:%S``.
    """
    # Same naive UTC value that datetime.utcfromtimestamp returned, without
    # the call that is deprecated since Python 3.12.
    day = datetime.datetime.fromtimestamp(after, datetime.timezone.utc).replace(tzinfo=None)
    clock = datetime.datetime.strptime(string, "%H:%M:%S").time()
    timestring = datetime.datetime.combine(day.date(), clock)
    if timestring < day:
        # The clock time has already passed on the reference day.
        timestring += datetime.timedelta(days=1)

    return timestring

def ClusterDataSet(name, path):
    """Load, correct, filter and average all data of one experiment directory.

    Parameters:
        name: experiment name. Its first three ``_``-separated fields are
            parsed as integers and stored as ``typesList``.
        path: directory name. The part before its first ``_`` is parsed as a
            float and used as the reference timestamp for all clock times.

    Returns:
        Dict with keys ``name``, ``typesList``, ``raw_data``,
        ``logs_corrected``, ``filtered_data`` (each of those three holding
        ``logs`` and ``averaged``), ``sparkgen`` and ``sepExps``.
    """
    logtimestamp = float(path.split('_')[0])

    sparkgen, sparkgenDate = readSparkGen(logtimestamp, path)

    # The slave logs are read from disk once per variant so that every
    # variant starts from its own fresh frames.
    logs_raw = readAllSlaveLogs(sparkgenDate, logtimestamp, path)
    logs_corrected = correctForDD(name, readAllSlaveLogs(sparkgenDate, logtimestamp, path))
    logs_filtered = correctForDD(name, readAllSlaveLogs(sparkgenDate, logtimestamp, path))
    lf = filterLogs(logs_filtered, sparkgen)

    logs_raw_averaged = averageLogSet(logs_raw)
    logs_corrected_averaged = averageLogSet(logs_corrected)
    lf_averaged = averageLogSet(lf)

    # NOTE(review): field 2 is used as number_of_fast here, while
    # correctForDD reads field 3 as the number of fast slaves. Confirm.
    name_fields = name.split('_')
    tl = [int(name_fields[position]) for position in range(3)]

    sepExps = getIndividualExperiments(sparkgen, lf_averaged, tl)
    return {
        "name": name,
        "typesList": tl,
        "raw_data": {"logs": logs_raw, "averaged": logs_raw_averaged},
        "logs_corrected": {"logs": logs_corrected, "averaged": logs_corrected_averaged},
        "filtered_data": {"logs": lf, "averaged": lf_averaged},
        "sparkgen": sparkgen,
        "sepExps": sepExps,
    }

# Load all experiments
* Parse
* Clean
* Correlate load values with experiments

In [None]:
# Build one dataset per entry of the working directory that is not a regular
# file. Each such entry is treated as an experiment directory.
experiment_data = {}
for filename in glob.iglob('*', recursive=True):
    if os.path.isfile(filename):
        continue
    dotted_parts = filename.split('.')
    # With fewer than three dot-separated parts the name is built from the
    # underscore fields 1..4. Otherwise it is the third dot-separated part.
    name = '_'.join(filename.split('_')[1:5]) if len(dotted_parts) < 3 else dotted_parts[2]
    print(">> Loading " + name)
    dataset = ClusterDataSet(name, filename)
    experiment_data[name] = dataset
print(">> Finished loading all experiments")

## Merge all single experiments

In [None]:
# Stack the per-experiment tables of every loaded dataset and store them as a
# tab-separated file.
allExps = [exp['sepExps'] for exp in experiment_data.values()]
allExperiments = pd.concat(allExps, ignore_index=True)
allExperiments.to_csv('output_lenet5.csv', sep='\t')

## Create experiment summary

In [None]:
# One summary row per dataset: the three configuration values, the mean
# experiment duration and the mean of the filtered, averaged 1-minute load.
summary = pd.DataFrame(columns=['CPU', 'RAM', 'Machine Ratio','Duration', 'Load'])
for idx, exp in enumerate(experiment_data.values()):
    avg_duration = exp['sparkgen']['duration'].mean()
    loadavg = exp['filtered_data']['averaged'].mean()['avg_load1']
    summary.loc[idx] = np.append(exp['typesList'], [avg_duration, loadavg])
# np.append stores the row as floats; turn the configuration columns back
# into integers.
for config_column in ('CPU', 'RAM', 'Machine Ratio'):
    summary[config_column] = summary[config_column].apply(int)
summary.to_csv('summary.csv', sep='\t')
summary

## Analysis

### Import libs

In [None]:
import pandas
import researchpy as rp
import seaborn as sns
import numpy as np

import statsmodels.api as sm
from statsmodels.formula.api import ols
import statsmodels.stats.multicomp

### Load data

In [None]:
# Read back the merged experiment table and give its columns the short names
# used in the model formulas. The first column is labelled 'index' and the
# 1-minute average becomes 'load'.
df = pd.read_csv('output_lenet5.csv', sep='\t')
df.columns = [
    'index',
    'CPU', 'RAM', 'Machine_Ratio',
    'start', 'end', 'duration', 'result', 'application',
    'load', 'avg_load5', 'avg_load15',
    'norml1', 'norml5', 'norml15',
]

### Create models

In [None]:
# Fit one OLS model per response (load and duration) on the full interaction
# of the three factors, each treated as categorical.
factors = 'C(CPU)*C(RAM)*C(Machine_Ratio)'
model_load = ols('load ~ ' + factors, df).fit()
model_duration = ols('duration ~ ' + factors, df).fit()

# Seeing if the overall model is significant
for label, model in (("Load", model_load), ("Duration", model_duration)):
    print(f"{label} Model: Overall model F({model.df_model: .0f},{model.df_resid: .0f}) = {model.fvalue: .3f}, p = {model.f_pvalue: .4f}")

### Load Model Summary

In [None]:
# Show the regression summary of the fitted load model as the cell result.
model_load.summary()

### Duration Model Summary

In [None]:
# Show the regression summary of the fitted duration model as the cell result.
model_duration.summary()

### Create Anova stats

In [None]:
# ANOVA table for the load model, computed with typ=2; the bare name shows it
# as the cell result.
res_load = sm.stats.anova_lm(model_load, typ= 2)
res_load

In [None]:
# ANOVA table for the duration model, computed with typ=2; the bare name shows
# it as the cell result.
res_duration = sm.stats.anova_lm(model_duration, typ= 2)
res_duration

### Write output of models to latex

In [None]:
# Write the ANOVA tables and model summaries as LaTeX files. Each text is
# rendered only after its file has been opened, in the order listed.
latex_outputs = (
    ('anova_load.tex', lambda: res_load.to_latex(index=True)),
    ('model_load.tex', lambda: model_load.summary().as_latex()),
    ('anova_duration.tex', lambda: res_duration.to_latex(index=True)),
    ('model_duration.tex', lambda: model_duration.summary().as_latex()),
)
for target, render in latex_outputs:
    with open(target, 'w') as tf:
        tf.write(render())