# Aggregation

## 场景一

### (1) 描述

场景一中，所有实验按固定的模式运行，包含干扰循环和负载循环
1. 初始化 client 和 server(vm)
2. 启动 background 干扰，当前支持的干扰有: cpu/cache/mem/io/net
3. 启动 workload， 当前支持的workload有: redis/nginx/mysql
4. workload 结束后，记录一份 `workload_info`, 包含时间戳, 负载、干扰强度等metadata
5. 回到步骤3，执行下一个workload，当所有workload都执行完毕后，记录一份 `info_per_epoch`, 包含所有已经执行的 `workload_info`, 以及此次循环的干扰强度信息
6. 回到步骤2，直到所有的强度的干扰都执行完毕
7. 结束实验，记录实验的启动、结束时间，总共消耗的时间，以及干扰循环的总次数

以上信息保存在 `exp.json` 或 `date.json` 中，另外通过 Prometheus client 请求得到整个实验周期中的metric数据，保存在 `merged.csv` 中


### (2) 数据处理


#### 1. Quick Start


通过 `read_from_dir(dir)` 读取 metadata 和csv 数据创建 `ExpData` 实例

In [None]:
from aggregation import *
pd.set_option('display.max_rows', 10) 

exp_root = "/home/ict/appProfile/data/redis_1/no_stress/redis_no_20231102130306"
exp_data = read_from_dir(exp_root)

In [None]:
# shift time
# 32 for opt, 10 for warmup, 1 for container start
shift = exp_data._time_shift(opt_interval=32, delay=11)
exp_data.shift_time(shift).exp["info_per_epoch"]

调用 `agg_epoch()` 方法之后，`exp_data` 会按每次干扰循环聚合数据, 这也是推荐的使用方法

In [None]:
df_epoch = exp_data.agg_epoch()
df_epoch

因为返回的仍然是一个 DataFrame, 因此后续可以根据数据处理的需要自行设置方法，如下展示一种从 `df_epoch` 数据中获取某个 workload 数据的流程

In [None]:
df_epoch_group = df_epoch.groupby(df_epoch.index)
keys = list(df_epoch_group.groups.keys())
keys

使用 `groupby` 函数处理 Dataframe, 并从 `groups.keys()` 选择一个获取此 workload 的数据

In [None]:
df_workload = df_epoch_group.get_group(keys[0])
df_workload

#### 2. Custom Process

`exp_data` 将读入的数据保存在 `exp` 与 `df` 两个字段中，可以通过 `exp_data.exp` 与 `exp_data.df` 来直接访问读入的数据， 如获取某个 workload info, 可以采取如下方式

In [None]:
workload_info = exp_data.exp["info_per_epoch"][0]["workloads"][keys[0]]
workload_info

`workloads_of` 可以通过 name 获取所有的 workloads

In [None]:
workload_infos = exp_data.workloads_of(keys[0])
workload_infos

随后，可以使用” `workload_df` 来获取此 workload 对应的 dataframe

In [None]:
exp_data.workload_df(workload_info)

默认情况下不会提取 "stress" 数据，可以指定参数 `with_stress` 来开启

In [None]:
exp_data.workload_df(workload_info, with_stress=True)

或者通过 `agg_one_workload` 获取此 workload 对应时间序列下的均值

In [None]:
exp_data.agg_one_workload(workload_info)

`exp_data` 也允许只获取某个 干扰循环 的数据，通过下标指示

In [None]:
exp_data.agg_epoch(0)

### 3. Advanced Usage

默认情况下 `exp_data` 对每个 workload 采用如下预处理手段，按顺序依次为:
1. `filter_column_startswith(col_prefix=("vm", "app"))`: 只选用 `vm`, `app` 前缀的指标
2. `filter_column_useless(std_min=1e-10)`: 过滤掉平均值为0, 或方差小于 `1e-10` 的指标
3. `filter_row_noise(col_prefix=("app"))`: 过滤行中 `app` 为前缀指标中的离群值

同时，对于每个 workload 采用如下聚合手段，按顺序依次为:
1. `lambda x : x.mean().to_frame().T`: 将一个workload时序数据按均值压缩为一行

`exp_data` 允许对上述处理进行自定义, 需要注意的是, 自定义的方法设置完毕之后，将会一直生效，包括在 `agg_epoch` 时
- `set_workload_preprocess_funcs(df_funcs):`: 自定义预处理手段
- `set_workload_agg_funcs(df_funcs):`: 自定义聚合手段

其中 `df_funcs` 为一组函数，每个函数都满足如下签名:
- `df_func(df: DataFrame) -> DataFrame`

In [None]:
# defualt_workload_preprocess_funcs = [
#     filter_column_startswith(col_prefix=("stress", "host", "vm", "app")),
#     filter_column_useless(excol_prefix=("stress")),
#     filter_row_noise(col_prefix=("app")),
# ]

# defualt_workload_agg_funcs = [
#     lambda x : x.mean().to_frame().T,
# ]

exp_data.set_workload_preprocess_funcs([
    filter_column_startswith(col_prefix=("stress", "host", "vm", "app")),
    filter_column_useless(excol_prefix=("stress")),
])

exp_data.workload_df(workload_info, with_stress=True)

In [None]:
# 获取最大值而不是平均值
exp_data.set_workload_agg_funcs([
    lambda x : x.max().to_frame().T,
])

exp_data.agg_one_workload(workload_info)

### exp 运算

`exp.json` 中以 workload 为粒度进行编排组织
- `info_per_epoch` 以 epcoh 的维度组织 workload, 每个 epoch 在全局设置，如干扰上存在不同
- `info_per_workload` 以 workload 的维度组织，每个 workload 中包含了同一个 workload 不同epoch下的数据

In [None]:
assert exp_data.exp["info_per_workload"] == ipe_to_ipw(exp_data.exp["info_per_epoch"])
assert exp_data.exp["info_per_epoch"] == ipw_to_ipe(exp_data.exp["info_per_workload"])

In [None]:
exp = exp_data.exp
keys = list(exp["info_per_workload"].keys())
sub_workload_1 = {k: exp["info_per_workload"][k] for k in keys[:len(keys) // 2]}
sub_workload_2 = {k: exp["info_per_workload"][k] for k in keys[len(keys) // 2:]}

In [None]:
def workload_to_exp(workload):
    info_per_workload = workload
    info_per_epoch = ipw_to_ipe(workload)
    
    workload_keys = list(info_per_workload.keys())
    start_time = info_per_epoch[0]["workloads"][workload_keys[0]]["start_time"]
    end_time = info_per_epoch[-1]["workloads"][workload_keys[-1]]["end_time"]
    total_time = end_time - start_time
    n_epoch = len(info_per_epoch)
    date_format = 'timestamp'
    
    
    return {
        "start_time": start_time,
        "end_time": end_time,
        "total_time": total_time,
        "n_epoch": n_epoch,
        "date_format": date_format,
        "info_per_workload": info_per_workload,
        "info_per_epoch": info_per_epoch,
    }
sub_exp_1 = workload_to_exp(sub_workload_1)
sub_exp_2 = workload_to_exp(sub_workload_2)

sub_data_1 = filter_row_timerange(
    format_to_13_timestamp(sub_exp_1["start_time"]),
    format_to_13_timestamp(sub_exp_1["end_time"]),
)(exp_data.df)

sub_data_2 = filter_row_timerange(
    format_to_13_timestamp(sub_exp_2["start_time"]),
    format_to_13_timestamp(sub_exp_2["end_time"]),
)(exp_data.df)

sub_exp_data_1 = ExpData(sub_data_1, sub_exp_1)
sub_exp_data_2 = ExpData(sub_data_2, sub_exp_2)

In [None]:
total_exp_data = concat([sub_exp_data_1, sub_exp_data_2])