In [55]:
%matplotlib inline
import seaborn as sns
import matplotlib.pyplot as plt
from pprint import pprint
from itertools import product
from difflib import SequenceMatcher
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler, MinMaxScaler


import re
import json
from pathlib import Path

import pandas as pd
import numpy as np

In [56]:
# Input directories, relative to the notebook: the real write workload (geodata),
# the real read workload (apj) and the pseudo TPC-H / TPC-C benchmark dumps.
geodata_dir, apj_dir, tpch_dir, tpcc_dir = map(Path, (
    "../data/geodata-stage/",
    "../data/apj-rest-ctlg/",
    "../data/tpch/",
    "../data/tpcc/",
))

### インスタンスが独立に動作していると考え，インスタンスごと(=hostごと)にDataFrameを作成
- apjのdfには"read", geodataのdfには"write"ラベルを貼る
- tpch/tpccの疑似データは後から追加する（tpchには"read"，tpccには"write"ラベルを貼る）．ただしtpchはquery/sが非常に小さいため，現在はtpccのみを使用している．

In [57]:
# Metrics that appear only in the pseudo-workload dumps and have no counterpart in the
# real-workload data; they are dropped before pseudo column names are matched.
#
# For TPC-H (swap this list in to process the TPC-H dumps instead):
# drop_pseudo_cols = [
#     'cpu-usage_guest_nice',
#     'cpu-usage_irq',
#     'cpu-usage_guest',
#     'handler-mrr_init',
#     'handler-discover',
#     'handler-savepoint',
#     'handler-savepoint_rollback',
#     'innodb_buffer_pool',
#     'innodb_io_capacity',
#     'query-commit',
#     'query-drop_index',
#     'query-alter_table',
#     'query-drop_table',
#     'query-create_table',
#     'tmp_tables',
#     'avg_lock_wait_time',
#     'avg_sort-merge_passes',
#     'buffer_hit_ratio%',
#     'disk_used_(gb)-used',
#     'disk_used_(gb)-total',
#     'innodb_io-mysql_global_status_innodb_data_read',
#     'innodb_io-mysql_global_status_innodb_data_written'
# ]

# For TPC-C (active):
drop_pseudo_cols = [
    # CPU
    "cpu-usage_irq",
    "cpu-usage_guest_nice",
    "cpu-usage_guest",
    # handler counters
    "handler-discover",
    "handler-savepoint_rollback",
    "handler-mrr_init",
    "handler-savepoint",
    # DDL / commit query counters
    "query-alter_table",
    "query-create_table",
    "query-drop_index",
    "query-drop_table",
    "query-commit",
    # miscellaneous MySQL status metrics
    "tmp_tables",
    "avg_lock_wait_time",
    "avg_sort-merge_passes",
    "buffer_hit_ratio%",
    "innodb_io-mysql_global_status_innodb_data_read",
    "innodb_io-mysql_global_status_innodb_data_written",
    "query_response_time",
]

instance_num = 3  # number of DB instances (hosts) in each real-workload dump


def _read_metric_series(path, ignored_tag):
    """Yield ``(key_name, data)`` for every non-summarizer series in one metric JSON dump.

    ``key_name`` is built from the file's metric name (the file name minus its last
    24 characters). It gets the last dotted component of ``data["metric"]`` appended,
    but only when the file holds more than one distinct ``metric`` value. It also gets
    the values of all tags except ``ignored_tag`` appended, in sorted tag-key order.
    All parts are joined with "-".
    ``data`` is the raw series dict taken from ``results[*]["data"]``.
    """
    # NOTE(review): assumes every dump's file name ends in a fixed 24-character suffix
    # (presumably a timestamp plus ".json") -- confirm against the export tool.
    metric = path.name[:-24]
    with open(path, "r") as f:
        results = json.load(f)["results"]
    # Distinct `metric` values are counted over all sources, summarizers included.
    metric_names = {data["metric"] for src_data in results for data in src_data["data"]}
    for src_data in results:
        if src_data["source"][:10] == "summarizer":
            continue
        for data in src_data["data"]:
            # JSON object key order is not guaranteed, so sort tag keys for a stable name.
            tags = [data["tags"][key] for key in sorted(data["tags"]) if key != ignored_tag]
            key_name = metric
            if len(metric_names) > 1:
                # Shorten the redundant dotted metric name to its last component.
                key_name += "-" + re.findall(r"^.*\.(.*)$", data["metric"])[0]
            if tags:
                key_name += "-" + "-".join(tags)
            yield key_name, data


def _trim_idle_edges(df, min_queries=500):
    """Return ``df`` without its first and last rows, rows whose total ``query-*`` rate is
    below ``min_queries``, and the rows immediately before/after such low-load rows.

    This removes samples taken right after a ramp-up from near-zero load or right before
    a drop to it. Neighbours are taken by position rather than by index label, so
    gaps left by ``dropna`` neither raise ``KeyError`` nor shift the trimmed rows.
    """
    n_rows = len(df)
    query_total = df.filter(like="query-").sum(axis=1).to_numpy()
    drop_pos = {0, n_rows - 1}
    for pos in np.flatnonzero(query_total < min_queries).tolist():
        drop_pos.update((pos - 1, pos, pos + 1))
    keep = [pos for pos in range(n_rows) if pos not in drop_pos]
    return df.iloc[keep]


def _rename_to_base(df, base_cols, threshold=0.4):
    """Rename each column of ``df`` to its most similar name in ``base_cols``.

    Similarity is the larger of the two ``SequenceMatcher`` ratios (the ratio is not
    symmetric). A column is renamed only when its best score exceeds ``threshold``.
    Ties keep the earliest candidate in ``base_cols`` order, and unmatched columns keep
    their own name. Raises ``ValueError`` if two columns would end up with the same
    name, because duplicate column names make the later column alignment ambiguous.
    """
    mapping = {}
    for col in df.columns:
        best_score = threshold
        for base_col in base_cols:
            score = max(SequenceMatcher(None, col, base_col).ratio(),
                        SequenceMatcher(None, base_col, col).ratio())
            if score > best_score:
                mapping[col], best_score = base_col, score
    renamed = df.rename(columns=mapping)
    duplicated = renamed.columns[renamed.columns.duplicated()]
    if len(duplicated) > 0:
        raise ValueError(
            f"several pseudo-workload columns map to the same name: {sorted(set(duplicated), key=str)}"
        )
    return renamed


df_lst = []

# Real workloads: each instance (= host) is treated as an independent source of samples.
# apj frames are labelled "read", geodata frames "write".
for data_dir in [apj_dir, geodata_dir]:
    label = "read" if data_dir == apj_dir else "write"
    instance_dict = {j: {} for j in range(1, instance_num + 1)}
    for path in data_dir.glob("*.json"):
        for key_name, data in _read_metric_series(path, ignored_tag="host"):
            # The first digit of the host name is the instance id; it must lie in
            # 1..instance_num, otherwise the lookup below raises KeyError.
            instance_id = int(re.search(r"\d", data["tags"]["host"]).group())
            instance_dict[instance_id][key_name] = data["NumericType"]
    for instance in instance_dict.values():
        df = pd.DataFrame(instance, dtype="float32").dropna(axis=0).assign(label=label)
        df_lst.append(df)

# Pseudo workloads (TPC-H / TPC-C). TPC-H issues very few queries/s, so only TPC-C is
# used for now. Column names are matched against the first real-workload frame.
base_metric_cols = [col for col in df_lst[0].columns if col != "label"]
for data_dir in [tpcc_dir]:
    data_dict = {}
    for path in data_dir.glob("*.json"):
        for key_name, data in _read_metric_series(path, ignored_tag="origin"):
            data_dict[key_name] = data["NumericType"]
    label = "read" if data_dir == tpch_dir else "write"
    df = (
        pd.DataFrame(data_dict, dtype="float32")
        .dropna()
        # Drop warm-up / cool-down samples around near-idle periods.
        .pipe(_trim_idle_edges, min_queries=500)
        # Metrics that exist only in the pseudo-workload dumps.
        .drop(columns=drop_pseudo_cols)
        # Align pseudo-workload column names with the real-workload ones.
        .pipe(_rename_to_base, base_metric_cols, threshold=0.4)
        .assign(label=label)
    )
    df_lst.append(df)

In [58]:
# Sanity check: one (rows, columns) pair per loaded frame, in load order.
for frame in df_lst:
    n_rows, n_cols = frame.shape
    print((n_rows, n_cols))

(94, 57)
(94, 57)
(94, 57)
(94, 57)
(94, 57)
(94, 57)
(24, 40)


- TODO: query数に基づく行の削除で，データを消しすぎていないか確認する（tpccは削除後24行しか残っていない）
- TODO: ワークロードごとに正規化するか検討する
    - TPC-Hは1クエリ当たりの処理が重くquery数/sが非常に小さいため，実データと比較すると，ほとんどクエリを処理していない状態に見えてしまう

In [59]:
# Metrics present in every workload frame can be compared across workloads; anything
# present in only some of the frames is dropped everywhere.
duplicated_cols = set.intersection(*(set(frame.columns) for frame in df_lst))
non_duplicated_cols = set().union(*(set(frame.columns) - duplicated_cols for frame in df_lst))

print("non duplicated cols:")
# Sorted for a stable, readable listing (key=str tolerates non-string column names).
print(sorted(non_duplicated_cols, key=str))

# Delete the metrics that are NOT shared by all frames. errors="ignore" is needed
# because each frame contains only a subset of them.
if non_duplicated_cols:
    cols_to_drop = sorted(non_duplicated_cols, key=str)
    df_lst = [frame.drop(columns=cols_to_drop, errors="ignore") for frame in df_lst]

non duplicated cols:
{'avg_slow_query', 'disk_usage_(percent)-vda2', 'avg_handler-savepoint', 'disk_usage_(data_volume)___binlog_file_total_size-used-vdb', 'disk_usage_(percent)-vdb', 'avg_disk_iops-writes-vda2', 'avg_handler-discover', 'avg_handler-savepoint_rollback', 'avg_threads_connected-mysql_global_status_max_used_connections', 'avg_handler-mrr_init', 'avg_disk_busy-vda2', 'avg_disk_iops-reads-vda2', 'avg_cpu-usage_idle', 'avg_cpu-usage_guest', 'avg_cpu-usage_guest_nice', 'avg_cpu-usage_irq', 'disk_usage_(data_volume)___binlog_file_total_size-total-vdb'}


### 相関の閾値を徐々に下げながらメトリックを削除していく
これにより，相関が非常に高いメトリックから優先的に削除できる（削除数の合計は`del_lim`まで）．

- 特徴量を選択する際は，write/read intensiveなデータを全てまとめて相関をみる

In [60]:
from pprint import pprint

sns.set(font_scale=2)
# Feature selection pools every workload (read- and write-intensive, real and pseudo)
# so that correlations are measured over all of them together.
df = pd.concat(df_lst)
# Columns with fewer than two distinct values carry no information (and would only
# produce NaN correlations).
df = df.loc[:, df.nunique() > 1]
df_selected = df.drop(columns=["label"])

labels = df["label"]

del_lim = 5  # upper bound on the number of metrics removed over *all* thresholds
del_num = 0
# Thresholds are lowered gradually so that the most strongly correlated metrics go first.
ths = [0.95, 0.9, 0.85, 0.8, 0.75, 0.7]
del_metrics_all = []

for th in ths:
    df_corr = df_selected.corr()
    cols = df_corr.columns

    # For each metric, collect the metrics it is correlated with at |r| >= th.
    high_corrs_dict = {col: set() for col in cols}
    for i, j in zip(*np.where(np.abs(df_corr.to_numpy()) >= th)):
        if i >= j:
            continue
        # Query metrics describe the workload best, so they are never deletion
        # candidates: their own partner set is left empty.
        if not cols[i].startswith("avg_query"):
            high_corrs_dict[cols[i]].add(cols[j])
        if not cols[j].startswith("avg_query"):
            high_corrs_dict[cols[j]].add(cols[i])

    # Greedily delete the metric with the most high-correlation partners.
    # (The sum of correlation coefficients could be used instead of the count.)
    del_metrics = []
    while del_num < del_lim and high_corrs_dict:
        del_metric, partners = max(high_corrs_dict.items(), key=lambda item: len(item[1]))
        if not partners:
            break
        del high_corrs_dict[del_metric]
        for partner_set in high_corrs_dict.values():
            partner_set.discard(del_metric)
        del_metrics.append(del_metric)
        del_num += 1
    del_metrics_all += del_metrics
    df_selected = df_selected.drop(columns=del_metrics)
pprint(del_metrics_all)
print(f"the number of deleted metrics: {del_num}")

# plt.figure(figsize=(25, 25))
# sns.heatmap(df_selected.corr(), vmax=1, vmin=-1, center=0, annot=True, square=True, cmap="bwr", annot_kws={"size": 16}, fmt=".2f", cbar_kws={"shrink": 0.85})
# plt.show()

['avg_network_send_received-bytes_sent-eth0',
 'avg_handler-read_first',
 'avg_handler-rollback',
 'avg_handler-read_rnd_next',
 'avg_handler-external_lock']
the number of deleted metrics: 5


In [65]:
# Two clusters, intended to separate read- from write-intensive samples.
# `fit` returns the estimator itself, so `model` and `clusters` are the same object.
# NOTE(review): the features are not scaled, so KMeans distances are dominated by the
# largest-magnitude metrics. StandardScaler / MinMaxScaler are imported at the top as
# alternatives, e.g. fit on `StandardScaler().fit_transform(df_selected)` -- consider it.
model = clusters = KMeans(n_clusters=2, random_state=100).fit(df_selected)


In [66]:
# Predicted cluster ids, a blank line, then the true labels (read=1, write=0) in the
# same row order; each value is followed by ", ".
print("".join(f"{cluster_id}, " for cluster_id in clusters.labels_), end="\n\n")
labels_int = labels.map({"read": 1, "write": 0}).to_numpy()
print("".join(f"{truth}, " for truth in labels_int), end="")

1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0