In [None]:
from matplotlib import pyplot as plt
import pandas as pd
import numpy as np
import seaborn as  sns
import math
import os

In [None]:
# Experiment location. Input data is read from <exp_path>/<exp_id>/..., while
# fitted models are dumped into a directory named <exp_id> relative to the
# current working directory.
exp_path = '../exp'
exp_id = 'exp-1'

# Create the local output directory. exist_ok=True replaces the separate
# "exists?" check, so there is no window between checking and creating.
os.makedirs(exp_id, exist_ok=True)


In [None]:
# Process log file to obtain external state data
#
# Every file in <exp_path>/<exp_id>/logs whose name contains 'pub' is parsed.
# Lines containing 'avg latency' are split on commas: field 2 carries the
# average latency (suffix ' ms avg latency') and the second space-separated
# token of field 1 carries the throughput value.
# NOTE(review): the line layout looks like Kafka producer perf-test output —
# confirm against the tool that writes these logs.
import os

log_path = '%s/%s/logs' % (exp_path, exp_id)
pub_logs = [x for x in os.listdir(log_path) if 'pub' in x]

# One record per publisher log. Records are collected in a list and turned
# into a DataFrame once, because DataFrame.append was removed in pandas 2.0
# (and was quadratic when called per row).
perf_records = []
for p in pub_logs:
    # The instance name is the file name up to the first '.'
    instance = p.split('.')[0]
    latency = []
    throughput = []
    with open('%s/%s' % (log_path, p)) as f:
        for l in f:
            if 'avg latency' not in l:
                continue
            fields = l.split(',')
            try:
                lat = float(fields[2].replace(' ms avg latency', ''))
                thr = float(fields[1].split(' ')[1])
                # thr = float(fields[1].split('(')[1].split(' MB/sec')[0])
            except (IndexError, ValueError):
                # Malformed line: show it and move on, but let any other
                # (unexpected) error surface instead of swallowing it.
                print(l)
                continue
            latency.append(lat)
            throughput.append(thr)
    if not latency:
        # No usable sample in this log: skip the instance explicitly rather
        # than relying on np.percentile failing on an empty list.
        continue
    perf_records.append({
        'instance': instance,
        'latency': np.percentile(latency, 90),
        'throughput': np.percentile(throughput, 90),
    })

perf = pd.DataFrame(perf_records, columns=['instance', 'latency', 'throughput'])
# The per-instance table is still written to tmp.csv as before; it is no
# longer read back, since the aggregation below uses the in-memory records.
perf.to_csv('tmp.csv')

# Group the per-instance figures by session, i.e. the third '-'-separated
# token of the instance name. Insertion order of the dict keeps sessions in
# order of first appearance.
store = {}
for rec in perf_records:
    sess = rec['instance'].split('-')[2]
    bucket = store.setdefault(sess, {'latency': [], 'throughput': []})
    bucket['latency'].append(rec['latency'])
    bucket['throughput'].append(rec['throughput'])

# Per session: latency is averaged over instances, throughput is summed.
ex_state = pd.DataFrame(
    [
        {
            'session': sess,
            'latency': np.mean(values['latency']),
            'throughput': np.sum(values['throughput']),
        }
        for sess, values in store.items()
    ],
    columns=['session', 'latency', 'throughput'],
)

ex_state.to_csv('%s/%s/external-state.csv' % (exp_path, exp_id), index=False)

In [None]:
# Load Dataset
#
# X: schedule (configuration) table, keyed by 'id'
# Y: internal-state table, keyed by 'instance'
# T: external-state table, keyed by 'session'

X = pd.read_csv('%s/%s/schedule.csv' % (exp_path, exp_id))
# Untouched copy of the full schedule, used below to report dropped rows
all_X = X.copy()

Y = pd.read_csv('%s/%s/internal-state.csv' % (exp_path, exp_id))
T = pd.read_csv('%s/%s/external-state.csv' % (exp_path, exp_id))

features = pd.read_csv('../meta/state_meta.csv')['name'].to_list()
meta_info = pd.read_csv('../meta/config_meta.csv')
dt = meta_info['data_type']

# Encode each categorical configuration column as the position of its value
# in the '/'-separated option list given by the metadata.
categorical_features = meta_info.loc[(meta_info['type'] == 'categorical')]
for i, cf in categorical_features.iterrows():
    options = cf['options'].split('/')
    X[cf['name']] = X[cf['name']].map(options.index)

# remove outliers from internal states: keep rows strictly between the 1st
# and 99th percentile of the broker bytes-in metric
bytes_in_col = 'dms_perf.server_broker_topics_AllTopicsBytesIn'
lower = np.percentile(Y[bytes_in_col], 1)
upper = np.percentile(Y[bytes_in_col], 99)
# .copy() so the column assignment below writes to an independent frame
# instead of a slice of the original (avoids SettingWithCopyWarning)
Y = Y[(Y[bytes_in_col] > lower) & (Y[bytes_in_col] < upper)].copy()

# remove outliers from external states: same rule applied to latency
lower = np.percentile(T['latency'], 1)
upper = np.percentile(T['latency'], 99)
T = T[(T['latency'] > lower) & (T['latency'] < upper)]

# Reduce the instance name to an integer: third '-'-separated token, up to
# the first '_'.
Y['instance'] = Y['instance'].apply(lambda x: int(x.split('-')[2].split('_')[0]))

# Keep only the ids present in all three tables
instances = list(set(X['id']).intersection(T['session']).intersection(Y['instance']))

X = X[X['id'].isin(instances)]
Y = Y[Y['instance'].isin(instances)]
T = T[T['session'].isin(instances)]

# Schedule rows that did not survive the filtering. The membership test must
# be done with isin(): `value in series` checks the Series *index*, not its
# values, so the previous per-row `not in X['id']` test compared ids against
# row labels.
missed_X = all_X[~all_X['id'].isin(X['id'])]
missed_X.to_csv('%s/%s/missed.csv' % (exp_path, exp_id), index=False)

# Sort every table by its key and make that key the index
# NOTE(review): row-wise alignment of X, Y and T afterwards presumes one row
# per id in each table — check the shapes printed below.
X = X.sort_values(by=['id']).set_index("id")
Y = Y.sort_values(by=['instance']).set_index("instance")
T = T.sort_values(by=['session']).set_index("session")

# Restrict internal states to the metrics listed in state_meta.csv
Y = Y[features]

X_cols = np.array(X.columns)
Y_cols = np.array(Y.columns)
T_cols = np.array(T.columns)

print(X.shape, Y.shape, T.shape)

In [None]:
# Data normalization

from sklearn.preprocessing import StandardScaler, MinMaxScaler, Normalizer

# Each table gets its own freshly fitted MinMaxScaler; the results are plain
# arrays in the same row/column order as the input tables.
mms_X, mms_Y, mms_T = (
    MinMaxScaler().fit_transform(table) for table in (X, Y, T)
)

In [None]:
# Feature selection for every external metric using permutation importance
import math
from sklearn.ensemble import RandomForestRegressor
from sklearn.inspection import permutation_importance

plt.rcParams.update({'figure.figsize': (10.0, 8.0)})
plt.rcParams.update({'font.size': 14})

# One stacked panel per external metric. The panel count follows T_cols
# instead of being fixed at two, so an extra column in T does not overrun
# the axes list. squeeze=False keeps a 2-D array even for a single panel.
n_targets = max(len(T_cols), 1)
fig, axes = plt.subplots(nrows=n_targets, ncols=1, figsize=(8, 8 * n_targets), squeeze=False)
# fig.tight_layout(pad=5)
ax = list(axes[:, 0])

for i, col in enumerate(T_cols):
    # Seeded so that the ranking is reproducible between runs
    rf = RandomForestRegressor(n_estimators=100, random_state=42)
    rf.fit(mms_X, mms_T[:, i])

    result = permutation_importance(rf, mms_X, mms_T[:, i], n_repeats=10, random_state=42, n_jobs=2)
    importance = result.importances_mean
    # Ascending order of mean importance (largest ends up at the top of the plot)
    sorted_idx = importance.argsort()
    ax[i].boxplot(result.importances[sorted_idx].T, vert=False)
    # Tick labels are set separately: the boxplot `labels` keyword is
    # deprecated in recent matplotlib releases.
    ax[i].set_yticklabels(X_cols[sorted_idx])
    ax[i].set_xlabel("Random Forest Feature Importance")
    ax[i].set_title(col)
plt.show()

In [None]:
# # Dimension reduction using PCA
# from sklearn.decomposition import PCA 
# import seaborn as sns
# from factor_analyzer.factor_analyzer import calculate_kmo, FactorAnalyzer

# n_components = 3

# # do Adequacy Test
# kmo_all, kmo_model=calculate_kmo(mms_Y)
# print('KMO:', kmo_model)
# fa = FactorAnalyzer(n_components, rotation=None)


In [None]:
# dr_data = fa.fit_transform(mms_Y)
# ev1, v1 = fa.get_eigenvalues()
# plt.title('Scree Plot')
# plt.scatter(range(1, mms_Y.shape[1]+1), ev1)
# plt.plot(range(1, mms_Y.shape[1]+1), ev1)
# plt.plot(range(1, mms_Y.shape[1]+1), [1]*mms_Y.shape[1])
# plt.xlabel('Factors')
# plt.ylabel('Eigenvalue')
# plt.show()

In [None]:
# var = fa.get_factor_variance() # returns the variance contribution of each factor
# print("\nExplained Variance:\n", var)

In [None]:
# df_cm = pd.DataFrame(np.abs(fa.loadings_), index=Y_cols)
# ax = sns.heatmap(df_cm, annot=True, cmap="BuPu")
# ax.yaxis.set_tick_params(labelsize=15)
# plt.title('Factor Analysis')
# plt.show()

In [None]:
# # Use Latent factors as output of prediction model
# latent_Y_cols = np.array(['factor-%d' % i for i in range(n_components)])
# latent_Y = MinMaxScaler().fit_transform(dr_data)

In [None]:
# Construct env prediction models (random forest vs. gradient boosting)

from sklearn.ensemble import RandomForestRegressor
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import RepeatedKFold
from sklearn.metrics import mean_squared_error, median_absolute_error, mean_squared_log_error, mean_absolute_error, explained_variance_score, r2_score
import numpy as np
# sklearn.externals.joblib was removed from scikit-learn; prefer the
# standalone package and fall back to the bundled copy on old installs.
try:
    import joblib
except ImportError:
    from sklearn.externals import joblib

def main(myY, myX, cols):
    """Fit and persist the best regressor for every target column.

    For each column ``i`` of ``myY`` a RandomForestRegressor and a
    GradientBoostingRegressor are compared by mean r2 over a repeated
    5-fold cross-validation (10 repeats). The better one is refitted on
    all of ``myX`` and dumped to ``<exp_id>/<metric>.joblib``.

    Parameters
    ----------
    myY : 2-D array indexed as ``myY[:, i]``; one column per target.
    myX : feature matrix passed unchanged to scikit-learn.
    cols : sequence of target names; ``cols[i]`` names column ``i`` of
        ``myY`` and is used as the output file name.

    Returns
    -------
    None. Scores are printed; models are written to disk. Reads the
    module-level ``exp_id`` for the output directory.
    """
    for i, metric in enumerate(cols):
        print('--------- %s ---------' % metric)
        y = myY[:, i].tolist()

        # evaluate the models and collect the scores
        best_model = None
        best_score = -1e6
        rf = RandomForestRegressor(random_state=1000)
        gb = GradientBoostingRegressor(random_state=1000)
        models = [rf, gb]
        # The evaluation procedure is the same for every candidate; with an
        # integer seed each call to split() yields the same folds.
        cv = RepeatedKFold(n_splits=5, n_repeats=10, random_state=43)
        for model in models:
            n_scores = cross_val_score(model, myX, y, scoring='r2', cv=cv, n_jobs=-1)
            mean_score = np.mean(n_scores)
            if mean_score > best_score:
                best_model = model
                best_score = mean_score
            print('r2: %.3f (%.3f)' % (mean_score, np.std(n_scores)))

        # Fit and persist only the winner, once, after the comparison.
        # best_model stays None when no candidate beat the initial score
        # (e.g. every score is NaN); nothing is written in that case.
        if best_model is not None:
            best_model.fit(X=myX, y=y)
            joblib.dump(best_model, '%s/%s.joblib' % (exp_id, metric))

# main(latent_Y, mms_X, latent_Y_cols)
main(mms_Y, mms_X, Y_cols)
main(mms_T, mms_X, T_cols)

In [None]:
# Validate prediction accuracy
#
# NOTE(review): the stored models were fitted on all of mms_X, so the
# scatter plots below compare predictions against the training data.
# sklearn.externals.joblib was removed from scikit-learn; prefer the
# standalone package and fall back to the bundled copy on old installs.
try:
    import joblib
except ImportError:
    from sklearn.externals import joblib

cols = np.concatenate((Y_cols, T_cols), axis=0)
myY = np.concatenate((mms_Y, mms_T), axis=1)

# Width and height of a single panel
w = 10
h = 6

# Two panels per row. squeeze=False keeps `ax` two-dimensional even when
# there is only one row, so flat iteration below always works.
n_rows = max(math.ceil(len(cols) / 2), 1)
fig, ax = plt.subplots(nrows=n_rows, ncols=2, figsize=(w * 2, h * n_rows), squeeze=False)
fig.tight_layout(pad=6)

# Display names for known metrics
titles = {
    'dms_perf.blkio_io_service_bytes_recursive_total': 'container.blkio.io_service_bytes',
    'dms_perf.cpu.usage_usermode': 'container.cpu.usage_usermode',
    'dms_perf.cpu.usage_kernelmode': 'container.cpu.usage_kernelmode',
    'dms_perf.memory.usage_total': 'container.memory.usage_total',
    'dms_perf.os_process_cpu_time': 'kafka.os.process.cpu.time',
    'dms_perf.server_broker_topics_TotalProduceRequestsPerSec': 'kafka.produce.request.per_sec',
    'dms_perf.network_request_Produce_TotalTimeMs': 'kafka.produce.request.total_time',
    'dms_perf.network_request_Produce_TemporaryMemoryBytes': 'kafka.produce.request.temporary_bytes',
    'latency': 'latency',
    'throughput': 'throughput'
}

# ax.flat walks the grid row by row, so panel k shows column k of myY
for k, col in enumerate(ax.flat):
    if k >= len(cols):
        # Odd number of metrics: hide the unused trailing panel
        col.set_visible(False)
        continue
    metric = cols[k]
    model = joblib.load('%s/%s.joblib' % (exp_id, metric))
    predict_mmy = model.predict(mms_X)
    col.set_xlim(0, 1)
    col.set_ylim(0, 1)
    # Fall back to the raw metric name when no display name is registered,
    # instead of failing with KeyError
    col.set_title(titles.get(metric, metric), fontsize=20)
    col.set_xlabel('Observed Value', fontsize=15)
    col.set_ylabel('Predicted Value', fontsize=15)
    col.grid()
    col.scatter(myY[:, k], predict_mmy)
plt.show()

In [None]:
import pandas as pd
import numpy as np
import matplotlib as mpl
mpl.use('Agg')
import matplotlib.pyplot as plt
import seaborn as sns

# State metrics (internal + external): names and normalized values
my_y_col = np.concatenate([Y_cols, T_cols])
my_y = np.hstack([mms_Y, mms_T])

# Full table: state metrics first, configuration parameters after them
cols = np.concatenate([my_y_col, X_cols])
myY = np.hstack([my_y, mms_X])
data = pd.DataFrame(myY, columns=cols)

# Absolute Spearman correlation; rows = state metrics, columns = configuration
# parameters
spearman = data.corr(method='spearman')
corr = spearman.loc[my_y_col, X_cols].abs()

f, ax = plt.subplots(figsize=(14, 10))
sns.heatmap(corr, annot=True, cmap='YlGn', linewidths=0.05, ax=ax)
ax.set_title('Correlation between features')
f.savefig('corr.png', dpi=100, bbox_inches='tight')

In [None]:
import pandas as pd
import numpy as np
import matplotlib as mpl
mpl.use('Agg')
import matplotlib.pyplot as plt
import seaborn as sns

# Pairwise correlation (pandas default method) over the combined table
data = pd.DataFrame(myY, columns=cols)
corr = data.corr()

# Correlation of every internal-state metric with latency
latency_corr = corr.loc[Y_cols, 'latency']

# Tick labels are the raw internal-state column names
my_cols = Y_cols

fig, ax = plt.subplots()
b = ax.barh(range(len(Y_cols)), latency_corr)
ax.set_yticks(range(len(my_cols)))
ax.set_yticklabels(my_cols)
ax.set_title('latency correlation')

# NOTE(review): the Agg backend selected above is non-interactive, so
# plt.show() may not display anything — confirm this is intended.
plt.show()

In [None]:
import pandas as pd
import numpy as np
import matplotlib as mpl
mpl.use('Agg')
import matplotlib.pyplot as plt
import seaborn as sns

# Pairwise correlation (pandas default method) over the combined table
data = pd.DataFrame(myY, columns=cols)
corr = data.corr()

# Correlation of every internal-state metric with throughput
thr_corr = corr.loc[Y_cols, 'throughput']

fig, ax = plt.subplots()
b = ax.barh(range(len(Y_cols)), thr_corr)

# my_cols is the label list defined by the latency-correlation cell
tick_positions = range(len(my_cols))
ax.set_yticks(tick_positions)
ax.set_yticklabels(my_cols)
ax.set_title('throughput correlation')

# NOTE(review): the Agg backend selected above is non-interactive, so
# plt.show() may not display anything — confirm this is intended.
plt.show()