# Evaluation of Crispy

In [2]:
import collections
import warnings

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from sklearn.linear_model import LinearRegression as LR

warnings.filterwarnings('ignore')

In [3]:
df_cluster = pd.read_csv('arrow_cluster_jobs.csv')
df_local = pd.read_csv('crispy_local_jobs.csv')

jobs = list(sorted(set(df_cluster['job'])))

f"{len(jobs) = }"

'len(jobs) = 16'

In [4]:
df_cluster.head(5)

Unnamed: 0,duration,input_size,cost,cost_norm,scaleout,mtype,cores,total_cores,memory,total_memory,...,agg_disk.%util,agg_network.rxpck/s,agg_network.txpck/s,agg_network.rxkB/s,agg_network.txkB/s,agg_network.rxcmp/s,agg_network.txcmp/s,agg_network.rxmcst/s,agg_network.%ifutil,bread
0,1722.228,100000000000,0.746299,1.878088,12,r4.large,2,24,15.25,183.0,...,265556.91,5948741.33,3588376.43,35888301.26,35940356.62,0.0,0.0,0.0,0.0,31440860.0
1,7040.141403,401636367057,1.955595,5.194055,10,m4.large,2,20,8.0,80.0,...,1396.29,158224.97,161207.31,845914.27,576545.7,0.0,0.0,0.0,93.24,214324.3
2,367.16,93803644984,0.244773,2.146511,24,m4.large,2,48,8.0,192.0,...,7563.72,465252.08,430435.82,2296895.3,1447771.15,0.0,0.0,0.0,259.95,15045490.0
3,560.571,2993586581,0.498285,1.733598,16,c4.xlarge,4,64,7.5,120.0,...,5021.82,944174.28,948309.69,4887539.75,4872704.93,0.0,0.0,0.0,539.47,70589.23
4,1624.248,240005901008,1.443776,5.501142,8,m4.2xlarge,8,64,32.0,256.0,...,6134.12,413545.07,377574.86,2542939.24,1373500.34,0.0,0.0,0.0,275.63,25324990.0


In [5]:
def get_train_data(job):
    df_train = df_local[df_local['job'] == job]
    X_train = df_train['dataset_size'].to_numpy().reshape(-1,1)
    y_train = df_train['max_memory_used'].to_numpy()
    
    df_test = df_cluster[df_cluster['job'] == job]
    X_test = [[df_test.iloc[0]['input_size']]]
    
    return X_train, y_train, X_test

In [6]:
mem_overhead = {'hadoop': 2e9, 'spark': 2.5e9}  # Ubuntu + framework

def possible_configs(job, mem_req=0):
    filter1 = df_cluster['job'] == job 
    total_mem_req =   mem_req + df_cluster['scaleout'] \
                    * df_cluster['framework'].map(lambda x: mem_overhead[x])
    filter2 = df_cluster['total_memory']*1e9 > total_mem_req
    df_configs = df_cluster[filter1 & filter2][['mtype', 'scaleout']]
    return set(df_configs.itertuples(index=False, name=None))
    

def best_config_for_all_other_jobs(job, mem_req=0):
    algorithm, framework, dataset = job.split('_')
    config_candidates = possible_configs(job, mem_req)
    if not config_candidates: return None
    
    same_framework      = df_cluster['framework'] == framework
    different_algorithm = df_cluster['algorithm'] != algorithm
    all_other_jobs = df_cluster[ same_framework & different_algorithm ]
    
    configs = collections.defaultdict(list)
    for i, row in all_other_jobs.iterrows():
        config = (row['mtype'], row['scaleout'])
        if config in config_candidates: 
            configs[config] += [row['cost_norm']]
    
    configs = [(sum(v),)+k for k, v in configs.items()]  # (cum_cost, mtype, scaleout)
    return sorted(configs)[0][1:]

bfa = best_config_for_all_other_jobs


def crispy(job):
    X_train, y_train, X_test = get_train_data(job)
    model = LR()
    model.fit(X_train,y_train)
    mem_req = model.predict(X_test)[0] if model.score(X_train, y_train) > .99 else 0
    return bfa(job, mem_req) or bfa(job, 0)

In [7]:
print(f"{'Job':25s} Crispy Selected Configuration\n{'-'*55}")
for job in jobs:
    print(f"{job:28s} {crispy(job)}")

Job                       Crispy Selected Configuration
-------------------------------------------------------
bayes_spark_bigdata          ('c4.large', 4)
bayes_spark_huge             ('r4.2xlarge', 8)
join_spark_bigdata           ('c4.large', 4)
join_spark_huge              ('c4.large', 4)
kmeans_spark_bigdata         ('r4.2xlarge', 10)
kmeans_spark_huge            ('r4.xlarge', 10)
linear_spark_bigdata         ('m4.xlarge', 8)
linear_spark_huge            ('c4.large', 4)
lr_spark_bigdata             ('c4.large', 6)
lr_spark_huge                ('c4.large', 4)
pagerank_hadoop_bigdata      ('c4.large', 4)
pagerank_hadoop_huge         ('c4.large', 4)
pagerank_spark_bigdata       ('r4.xlarge', 8)
pagerank_spark_huge          ('r4.large', 4)
terasort_hadoop_bigdata      ('c4.large', 6)
terasort_hadoop_huge         ('c4.large', 4)


In [8]:
def single_eval(job, predictors):
    
    job_df =  df_cluster[df_cluster['job'] == job]
    
    results = {}
    
    results['Random'] = float(job_df['cost_norm'].mean())
    results['Medium'] = float(job_df[  (job_df['mtype']=='m4.xlarge') 
                                     & (job_df['scaleout']==12) ]
                              ['cost_norm'].mean())
    
    for predictor_name, predictor in predictors:
        mtype, scaleout = predictor(job)
        predicted = float(job_df[  (job_df['mtype']==mtype) 
                                 & (job_df['scaleout'] == scaleout)]['cost_norm'])
        results[predictor_name] = predicted
        
    return results

def full_eval():
    
    predictors = [best_config_for_all_other_jobs, crispy]
    predictor_names = ['BFA', 'Crispy']
    
    df = pd.DataFrame(columns=['job']+predictor_names)
    
    for job in jobs:
        eval_results = single_eval(job, zip(predictor_names, predictors))
        df = df.append({**{'job':job}, **eval_results}, ignore_index=True)
    return df

In [19]:
df_results = full_eval()[['job', 'Random', 'Medium', 'BFA', 'Crispy']]
print(f"{'Selector':8s} | Cost (norm)")
df_results.median()
df_results.mean()

Selector | Cost (norm)


Random    2.348822
Medium    2.009813
BFA       1.769217
Crispy    1.337519
dtype: float64

In [11]:
df_results  # In detail, for each job:

Unnamed: 0,job,Random,Medium,BFA,Crispy
0,bayes_spark_bigdata,1.283399,1.173069,1.095362,1.095362
1,bayes_spark_huge,1.408278,1.354763,1.203942,1.000503
2,join_spark_bigdata,1.848328,1.567345,1.050709,1.050709
3,join_spark_huge,2.548079,,1.0,1.0
4,kmeans_spark_bigdata,3.476274,2.787279,3.991071,1.156992
5,kmeans_spark_huge,3.339827,3.152283,4.777844,1.175872
6,linear_spark_bigdata,1.353124,1.210515,1.133372,1.133372
7,linear_spark_huge,3.196385,3.718082,3.121199,3.121199
8,lr_spark_bigdata,3.547521,2.502524,1.731832,1.731832
9,lr_spark_huge,5.210249,4.104707,2.487412,2.487412


In [12]:
profiling_times = []

print(f"{'Job':23s} {'Profiling time'}\n{'-'*38}")
for job in jobs:
    job_df = df_local[df_local['job'] == job]
    profiling_time = job_df['runtime'].sum()
    print(f"{job:25s} {profiling_time:4d} seconds")
    profiling_times.append(profiling_time)

Job                     Profiling time
--------------------------------------
bayes_spark_bigdata        373 seconds
bayes_spark_huge           369 seconds
join_spark_bigdata         136 seconds
join_spark_huge            110 seconds
kmeans_spark_bigdata       470 seconds
kmeans_spark_huge          470 seconds
linear_spark_bigdata       372 seconds
linear_spark_huge          198 seconds
lr_spark_bigdata           675 seconds
lr_spark_huge              562 seconds
pagerank_hadoop_bigdata    812 seconds
pagerank_hadoop_huge       812 seconds
pagerank_spark_bigdata    1292 seconds
pagerank_spark_huge       1292 seconds
terasort_hadoop_bigdata    547 seconds
terasort_hadoop_huge       547 seconds


In [15]:
profiling_times = np.array(profiling_times)
print(f"{profiling_times.min()  / 60  =  :.2f} minutes")
print(f"{profiling_times.max()  / 60  = :.2f} minutes")
print(f"{profiling_times.mean() / 60  =  :.2f} minutes")
print(f"{np.median(profiling_times) / 60  =  :.2f} minutes")

profiling_times.min()  / 60  =  1.83 minutes
profiling_times.max()  / 60  = 21.53 minutes
profiling_times.mean() / 60  =  9.41 minutes
np.median(profiling_times) / 60  =  8.47 minutes
