In [1]:
import pandas as pd
import numpy as np
import os
import pickle as pkl
import h5py
from tqdm import tqdm
import sys
from scipy.stats import ttest_1samp

  from ._conv import register_converters as _register_converters


In [2]:
data_dir = "./data/clusterdata-2011-2/"

task_events_header = ['timestamp','missing_info', 'job_id','task_index','machine_id','event_type',
                      'user_name','sched_class','priority','cpu_req','ram_req','space_req','diff_machine']

machine_events_header = ['timestamp','machine_id','event_type','platform_id','cpu_cap','mem_cap']

task_usage_header = ['start_time','end_time','job_id','task_index','machine_id', 'mean_cpu_usage',
                     'canon_memory_usage', 'assign_memory_usage', 'unmapped_cache', 'total_cache', 
                     'max_mem', 'mean_io', 'mean_space', 'max_cpu', 'max_io', 'cpi', 'mai', 'sample',
                     'agg_type', 'sample_cpu_usage']

# Machines that are removed from the cluster their load is 1.5 times their mean resource usage.

### Create list of machines that crashed

In [20]:
df = pd.read_csv(data_dir + "machine_events/part-00000-of-00001.csv.gz",header=None)
df.columns = machine_events_header
crash_machines = df[df['event_type'] == 1]['machine_id']


crash_machines = np.unique(crash_machines)
hf = h5py.File('./data/machines_that_crash.h5','w')
hf.create_dataset('ids', data= crash_machines.astype(np.int64))
hf.close()

In [3]:
hf = h5py.File('./data/machines_that_crash.h5','r')
crash_machines = np.array(hf['ids'])
hf.close()

# hf = h5py.File('./data/time_stamps.h5','r')
# ts = np.array(hf.get('time'))
# hf.close()

# start_time = ts[0]
# end_time = ts[1]

# cols = range(start_time,end_time)

import gc
gc.collect()

0

In [4]:
crash_machines

array([         5,         10,         13, ..., 6289704471, 6437385645,
       6453653899], dtype=int64)

## Calculate mean CPU usage for machines throughout the trace

In [5]:
cols = ['total_usage','measure_count']
mean_machine_usage = pd.DataFrame(index=crash_machines,columns=cols)
mean_machine_usage.fillna(0,inplace=True)

In [94]:
for i in tqdm(range(500)):
    df = pd.read_csv(data_dir + "task_usage/part-%05d-of-00500.csv.gz" % i,header=None)
    df.columns = task_usage_header
    df = df[['start_time','machine_id','mean_cpu_usage']]
    df = df[df['machine_id'].isin(crash_machines)].groupby(['machine_id', 'start_time']).agg('sum')
    df.reset_index(level='start_time',inplace=True)
    df = df.groupby('machine_id').agg(['sum','count'])['mean_cpu_usage']
    
    mean_machine_usage['total_usage'] += df['sum']
    mean_machine_usage['measure_count'] += df['count']

In [214]:
mean_machine_usage.dropna(axis=0,how='all',inplace=True,thresh=2)
mean_machine_usage['mean'] = mean_machine_usage['total_usage']/mean_machine_usage['measure_count']
mean_machine_usage.to_csv('./data/mean_machine_usage.csv')

## Calculate CPU usage of machines in time periods when they crashed

In [159]:
crash_usage = pd.read_csv(data_dir + "machine_events/part-00000-of-00001.csv.gz",header=None)
crash_usage.columns = machine_events_header
crash_usage['mean_cpu_usage'] = 0
crash_usage = crash_usage[crash_usage['event_type'] ==1]

In [160]:
f = 0
data = pd.read_csv(data_dir + "task_usage/part-%05d-of-00500.csv.gz" % f,header=None)
data.columns = task_usage_header
f+=1
for ind in tqdm(crash_usage.index):
    row = crash_usage.loc[ind]
    while f < 500 and data[(data['end_time'] >= row['timestamp']) & (data['start_time'] <= row['timestamp'])].empty:
        
        data = pd.read_csv(data_dir + "task_usage/part-%05d-of-00500.csv.gz" % f,header=None)
        data.columns = task_usage_header
        f+=1
    data
    temp = data[(data['end_time'] >= row['timestamp']) & (data['start_time'] <= row['timestamp'])]
    crash_usage.loc[ind,'mean_cpu_usage'] = temp[temp['machine_id'] == row['machine_id']].sum()['mean_cpu_usage']


100%|████████████████████████████████████████████████████████████████████████████| 8957/8957 [1:14:30<00:00,  2.00it/s]


In [189]:
crash_usage = crash_usage[crash_usage['mean_cpu_usage'] != 0]
crash_usage.reset_index(drop=True,inplace=True)
crash_usage = crash_usage[['timestamp','machine_id','mean_cpu_usage']]
crash_usage.to_csv('./data/crash_stats.csv')

## After Data generation and preprocessing

In [3]:
mean_machine_usage = pd.read_csv('./data/mean_machine_usage.csv',index_col=0)
mean_machine_usage.drop(['total_usage','measure_count'],axis=1,inplace=True)

crash_usage = pd.read_csv('./data/crash_stats.csv',index_col=0)
crash_usage.drop(['timestamp'],axis=1,inplace=True)

mean_machine_usage.rename(columns={'mean':'mean_normal_usage'},inplace=True)
crash_usage.rename(columns={'mean_cpu_usage':'usage_when_crashed'},inplace=True)

In [4]:
mean_machine_usage.head()

Unnamed: 0,machine_id,mean_normal_usage
0,5,0.078498
1,10,0.123686
2,13,0.063506
3,23,0.08479
5,28,0.119474


In [5]:
crash_usage.head()

Unnamed: 0,machine_id,usage_when_crashed
0,317808289,0.134664
1,1338945,0.037087
2,317486724,0.094489
3,4820183646,0.057566
4,155314177,0.038722


In [6]:
final = pd.merge(crash_usage,mean_machine_usage,on='machine_id')
final.head()

Unnamed: 0,machine_id,usage_when_crashed,mean_normal_usage
0,317808289,0.134664,0.091113
1,317486724,0.094489,0.110986
2,1273853,0.097454,0.070854
3,2596362793,0.000885,0.096335
4,1436297839,0.005465,0.123303


In [8]:
usage_mean = final['mean_normal_usage'].mean()

## T-test
**df** = 1000

**alpha** = 0.05

**t_crit** = 1.962

In [14]:
ttest_1samp(final.loc[:1000,'usage_when_crashed'],usage_mean*1.5)

Ttest_1sampResult(statistic=1.4535882237988955, pvalue=0.14637425012095806)

For 1000 samples we get T = 1.45 <br>
T <= 1.962 <br>
$\therefore$ We accept the hypothesis