In [None]:
import pandas as pd
import numpy as np
import os
import sys
from pathlib import Path
import pickle

import matplotlib.pyplot as plt
from cycler import cycler

# Parse results from logs. Retrieve power consumption, fragmentation, and task scheduling information.

In [None]:
# Dictionaries where the results will be stored:
#   <dict>[<experiment-set dir name>][<policy dir name>] -> list of per-seed DataFrames,
# each one resampled onto the common cumulative-workload grid `new_index` defined below.
df_pwr_dict = {}
df_frag_dict = {}
df_sched_pod_dict = {}

DATADIR = "./2024_0606"
data = Path(DATADIR)

# Regular grid of cumulative GPU workload (arrived GPU millis / cluster GPU capacity) onto which
# every per-seed series is resampled, so that seeds can later be averaged point-wise.
# It does not depend on the seed, so it is built once here instead of inside the loop.
new_index = np.arange(0, 1.005, 0.005)


def _read_cluster_gpu_capacity(grep_path):
    """Return the total cluster GPU capacity (in millis) parsed from `grep_path`.

    Finds the first occurrence of 'MilliGpu: ' and returns, as an int, the text between the
    first '/' and the following ')' on that line.

    Raises:
        Exception: if the file contains no 'MilliGpu: ' entry.
        OSError / IndexError / ValueError: if the file cannot be read or the line has an
            unexpected format.
    """
    with open(grep_path, 'r') as file:
        content = file.read()  # Read the entire file into a string
    start_idx = content.find('MilliGpu: ')
    if start_idx == -1:
        raise Exception("MilliGpu cluster info not found. Error!\n")
    end_idx = content.find('\n', start_idx)
    # If the entry is on the last line with no trailing newline, take the rest of the file:
    # slicing up to -1 would silently drop the final character.
    selection = content[start_idx:] if end_idx == -1 else content[start_idx:end_idx]
    return int(selection.split("/")[1].split(")")[0])


def _read_csv_short_columns(path):
    """Read a CSV file, keeping only the text after the last '-' of each column name."""
    df = pd.read_csv(path)
    return df.rename(columns=lambda x: x.split('-')[-1])


def _resample_on_workload_grid(df, workload, grid):
    """Index `df` by cumulative workload and resample it onto the regular `grid`.

    `workload` is added as column 'cumulative_workload' (aligned on `df`'s row index) and made
    the index. Rows whose workload is NaN are dropped. For duplicated workload values only the
    first row is kept. The grid points are then merged into the index. Missing values are
    interpolated against the workload values, and remaining leading/trailing gaps are filled
    from the nearest row. Only the rows at `grid` are returned.

    Raises:
        Exception: if NaNs remain after filling (e.g. a column with no values at all).
    """
    # Work on a copy: callers may pass a column subset of another frame.
    df = df.copy()
    df["cumulative_workload"] = workload
    df = df.set_index("cumulative_workload")
    # Rows without a known workload cannot be placed on the grid (and method='index' rejects
    # NaN index values), so drop them before resampling.
    df = df[df.index.notna()]
    df = df[~df.index.duplicated(keep='first')]
    # Interpolate against the (non-uniform) workload values. method='linear' would ignore the
    # index and treat consecutive rows as equally spaced, distorting the resampled curve.
    df = df.reindex(df.index.union(grid)).interpolate(method='index').ffill().bfill()
    df = df.loc[grid]
    if df.isna().any().any():
        raise Exception("dataframe contains NaNs!\n")
    return df


def _build_sched_frame(df_cdol, df_allo):
    """Turn a per-pod scheduling log into cumulative pod counters plus GPU allocation columns.

    Row i of `df_cdol` is counted as the (i+1)-th issued pod, assuming the default RangeIndex
    from `pd.read_csv`. Rows with `event == 'failed'` count as pods that failed to schedule.
    The GPU columns are copied from `df_allo`, aligned on the row index.
    """
    df_sched = df_cdol[['event']].copy()
    df_sched["issued_pods"] = df_sched.index + 1
    df_sched['event'] = (1 * (df_sched['event'] == 'failed')).cumsum()
    df_sched = df_sched.rename(columns={'event': 'failed_pods_cumsum'})
    df_sched['arrived_gpu_milli'] = df_allo['arrived_gpu_milli']
    df_sched['used_gpu_milli'] = df_allo['used_gpu_milli']
    df_sched['successful_pods'] = df_sched["issued_pods"] - df_sched["failed_pods_cumsum"]
    return df_sched


# Directory layout: DATADIR / <experiment set> / <policy> / <tuning> / <seed> / analysis_*.
fileDirs = sorted([x for x in data.iterdir() if x.is_dir()])
for fdir in fileDirs:
    df_pwr_dict[fdir.name] = {}
    df_frag_dict[fdir.name] = {}
    df_sched_pod_dict[fdir.name] = {}

    policyDirs = sorted([x for x in fdir.iterdir() if x.is_dir()])
    for pdir in policyDirs:
        print(f"Processing data for set of experiments: {fdir.name}.{pdir.name}")

        tuneDirs = sorted([x for x in pdir.iterdir() if x.is_dir()])
        for tdir in tuneDirs:
            seedDirs = sorted([x for x in tdir.iterdir() if x.is_dir()])
            for sdir in seedDirs:
                pwrfile = sdir / 'analysis_pwr.csv'
                grep = sdir / 'analysis_grep.out'
                alloc = sdir / 'analysis_allo.csv'
                schedfile = sdir / 'analysis_cdol.csv'
                fragfile = sdir / 'analysis_frag.csv'

                # Retrieve the total GPU cluster capacity (in millis).
                try:
                    allocation_value = _read_cluster_gpu_capacity(grep)
                except Exception as e:
                    print("ERROR grep analysis: %s\n" % (e))
                    # Every metric below is normalized by this seed's capacity. Continuing would
                    # silently reuse a previous seed's value (or hit a NameError), so skip the seed.
                    continue

                ### Collect telemetry about the GPU workload (in millis) that the cluster has received. ###
                try:
                    df_allo = _read_csv_short_columns(alloc)
                    cum_gpu_allo = df_allo.loc[:, 'arrived_gpu_milli'] / allocation_value
                except Exception as e:
                    print("ERROR alloc analysis: %s\n" % (e))
                    # Same reason as above: without this seed's workload series, stale data would be used.
                    continue

                ### Collect telemetry about power consumption. ###
                try:
                    df_pwr = _resample_on_workload_grid(_read_csv_short_columns(pwrfile), cum_gpu_allo, new_index)
                    df_pwr_dict[fdir.name].setdefault(pdir.name, list()).append(df_pwr)
                except Exception as e:
                    print("ERROR power analysis: %s\n" % (e))

                ### Collect telemetry about GPU fragmentation. ###
                try:
                    df_frag = _read_csv_short_columns(fragfile)[["origin_ratio"]]
                    df_frag = _resample_on_workload_grid(df_frag, cum_gpu_allo, new_index)
                    df_frag_dict[fdir.name].setdefault(pdir.name, list()).append(df_frag)
                except Exception as e:
                    print("ERROR fragmentation analysis: %s\n" % (e))

                ### Collect telemetry about pods that the cluster failed to schedule. ###
                try:
                    df_sched_pod = _build_sched_frame(_read_csv_short_columns(schedfile), df_allo)
                    df_sched_pod = _resample_on_workload_grid(df_sched_pod, cum_gpu_allo, new_index)
                    df_sched_pod_dict[fdir.name].setdefault(pdir.name, list()).append(df_sched_pod)
                except Exception as e:
                    print("ERROR scheduling analysis: %s\n" % (e))

# display(df_pwr_dict.keys())
# display(df_pwr_dict)
# display(df_sched_pod_dict.keys())
# display(df_sched_pod_dict)

### Compute the average power consumption, fragmentation, and number of failed pods across each score plugin's set of runs.

In [None]:
# Set of competitors to exclude to avoid cluttering the plots too much.
set_remove_competitors = {'01-Random',
                          #'02-DotProd',
                          #'03-GpuClustering',
                          #'04-GpuPacking',
                          #'05-BestFit',
                          '08-PWR_500_FGD_500', 
                          '13-PWR_25_FGD_975', 
                          '09-PWR_300_FGD_700', 
                          '10-PWR_200_FGD_800',}


def _average_runs(df_dict, excluded, metric_label):
    """Average the per-seed DataFrames of every (level, policy) pair in `df_dict`.

    Args:
        df_dict: nested dict level -> policy -> list of per-seed DataFrames. The frames in one
            list are expected to share index and columns (they are resampled onto the same grid
            by the parsing cell), otherwise the sum introduces NaNs.
        excluded: policy names to skip.
        metric_label: text inserted in the progress message.

    Returns:
        dict level -> policy -> element-wise mean DataFrame of that policy's runs. Policies with
        no runs are skipped.
    """
    averaged = {}
    for level, per_policy in df_dict.items():
        averaged[level] = {}
        for policy, runs in per_policy.items():
            if policy in excluded or not runs:
                continue
            print(f"Computing {metric_label} for level ({level},{policy}) ({len(runs)} repetitions found)")
            averaged[level][policy] = sum(runs) / len(runs)
    return averaged


# Average power consumption, fragmentation, and number of failed pods for each score plugin.
dict_pwr_final_res = _average_runs(df_pwr_dict, set_remove_competitors, "cluster power consumption mean")
dict_frag_final_res = _average_runs(df_frag_dict, set_remove_competitors, "fragmentation mean")
dict_sched_final_res = _average_runs(df_sched_pod_dict, set_remove_competitors, "mean # of pods that have failed to schedule")

In [None]:
# display(dict_pwr_final_res.keys())
# display(dict_pwr_final_res['openb_pod_list_default']['01-Random'])
# display(dict_sched_final_res.keys())
# display(dict_sched_final_res['openb_pod_list_default']['01-Random'])

In [None]:
# Beautify the names of the policies for the plots.
for k_level in dict_pwr_final_res.keys() :
    dict_pwr_final_res[k_level] = {key.split('-')[1]: value for key, value in dict_pwr_final_res[k_level].items()}
    dict_frag_final_res[k_level] = {key.split('-')[1]: value for key, value in dict_frag_final_res[k_level].items()}
    dict_sched_final_res[k_level] = {key.split('-')[1]: value for key, value in dict_sched_final_res[k_level].items()}

# Measuring GPU efficiency

In [None]:
# Usage efficiency for every (level, policy) pair: the ratio between successfully allocated GPU
# resources and requested GPU resources at each point of the cumulative-workload grid.
# Only usage efficiency is computed here.
dict_efficiency = {}
for level, pwr_by_policy in dict_pwr_final_res.items():
    dict_efficiency[level] = {}
    for policy, df_pwr_mean in pwr_by_policy.items():
        df_sched_mean = dict_sched_final_res[level][policy]
        usage_efficiency = df_sched_mean['used_gpu_milli'] / df_sched_mean['arrived_gpu_milli']
        # Expressed on the power frame's workload index (both frames share the same grid).
        dict_efficiency[level][policy] = usage_efficiency.reindex(df_pwr_mean.index).to_frame('usage_efficiency')

# Save all the results

In [None]:
# Persist each aggregated result dictionary to its own pickle file in the current working
# directory, overwriting any existing file with the same name.
results_to_save = (
    ('dict_pwr_final_res.pkl', dict_pwr_final_res),
    ('dict_frag_final_res.pkl', dict_frag_final_res),
    ('dict_sched_final_res.pkl', dict_sched_final_res),
    ('dict_efficiency.pkl', dict_efficiency),
)
for pkl_path, results in results_to_save:
    with open(pkl_path, 'wb') as handle:
        pickle.dump(results, handle)