In [3]:
from tqdm.notebook import tqdm
import multiprocessing
from typing import *
import pandas as pd
import os
import json
from pathlib import Path
from collections import defaultdict
import pickle
from concurrent.futures import ProcessPoolExecutor

# Dataset identifier; interpolated into the input paths and the output directory below.
DATASET = 'D'
# Fraction of each anomaly type's cases that is moved to the test split.
TEST_RATE = .4
# Directory that receives the generated run tables and anomaly files.
SAVE_ROOT = f'../data/{DATASET}'
os.makedirs(SAVE_ROOT, exist_ok=True)

### Run Table

In [5]:
# Load the fault records of the selected dataset.
faults = pd.read_csv(f"/SSF/data/{DATASET}/faults.csv")

# The instance is the first whitespace-separated token of the root-cause string.
# NOTE(review): a multi-target value such as "a CPU;b Memory" keeps only "a", whereas
# "a;b" (no whitespace) is kept whole — confirm this asymmetry is intended.
faults['instance'] = faults['root_cause_node'].apply(lambda x: x.split()[0])
def node_type_map(rc):
    """Map a root-cause string to a coarse anomaly type label.

    The component is 'Node' when the substring 'node' occurs in ``rc`` and
    'Pod' otherwise. If ``rc`` mentions 'CPU' (checked first) or 'Memory',
    that resource is appended to the component; otherwise the bare
    component name is returned.
    """
    component = 'Pod'
    if 'node' in rc:
        component = 'Node'
    # 'CPU' takes precedence over 'Memory' when both appear.
    for resource in ('CPU', 'Memory'):
        if resource in rc:
            return f'{component} {resource}'
    return component
# Label every fault with its coarse anomaly type derived from the root-cause string.
faults['anomaly_type'] = faults['root_cause_node'].apply(node_type_map)
# The fault start time is the record's timestamp column, copied under the run-table name.
faults['st_time'] = faults['timestamp']
faults

Unnamed: 0,timestamp,duration,root_cause_node,experiment_type,datetime,drift_type,instance,anomaly_type,st_time
0,1709139600,5m,node4 CPU,node-cpu-stress-xll-k8s-5,2024-02-29 01:00:00+08:00,raw,node4,Node CPU,1709139600
1,1709143200,5m,node3 CPU,node-cpu-stress-xll-k8s-3,2024-02-29 02:00:00+08:00,raw,node3,Node CPU,1709143200
2,1709146800,5m,node3 CPU,node-cpu-stress-xll-k8s-3,2024-02-29 03:00:00+08:00,raw,node3,Node CPU,1709146800
3,1709161200,5m,ts-verification-code-service-0 CPU;ts-ticketin...,pod-cpu-stress,2024-02-29 07:00:00+08:00,raw,ts-verification-code-service-0,Pod CPU,1709161200
4,1709164800,5m,ts-assurance-service-1 CPU,pod-cpu-stress,2024-02-29 08:00:00+08:00,raw,ts-assurance-service-1,Pod CPU,1709164800
...,...,...,...,...,...,...,...,...,...
155,1712698200,5m,ts-seat-service-0,pod-network-corrupt,2024-04-10 05:30:00+08:00,travel svc cpu 0410,ts-seat-service-0,Pod,1712698200
156,1712703600,5m,ts-inside-payment-service-1,pod-network-delay,2024-04-10 07:00:00+08:00,travel svc cpu 0410,ts-inside-payment-service-1,Pod,1712703600
157,1712714400,5m,ts-execute-service-0,pod-failure,2024-04-10 10:00:00+08:00,travel svc cpu 0410,ts-execute-service-0,Pod,1712714400
158,1712856600,5m,ts-station-service-1 Memory,pod-memory-stress,2024-04-12 01:30:00+08:00,node2 node memory 0412,ts-station-service-1,Pod Memory,1712856600


In [11]:
# Build the run table from the "raw" (non-drift) fault cases.
# .copy() detaches the selection from `faults`, so the column assignments below
# write to an independent frame instead of a slice (avoids SettingWithCopyWarning).
run_table = faults.loc[faults['drift_type'] == 'raw', ['instance', 'st_time', 'anomaly_type']].copy()
run_table["anomaly_type"] = run_table["anomaly_type"].apply(lambda x: "[" + x + "]")
run_table['service'] = run_table['instance']
# Every case gets a fixed 300-unit duration added to st_time to obtain ed_time.
run_table["duration"] = 300
run_table["ed_time"] = run_table["st_time"] + run_table["duration"]
# Everything starts as "train"; the split below relabels the test cases.
run_table["data_type"] = "train"
run_table = run_table[
    ['st_time', 'service', 'instance', 'anomaly_type', 'duration', 'ed_time', 'data_type']
].reset_index(drop=True)

# Per anomaly type, the LAST int(count * TEST_RATE) cases (in row order) become test cases.
anomaly_cnt = run_table.groupby(by="anomaly_type")["instance"].count()
anomaly_cnt_dict = anomaly_cnt.to_dict()
test_index = []
for anomaly, group in run_table.groupby(by="anomaly_type"):
    sample_cnt = int(anomaly_cnt_dict[anomaly] * TEST_RATE)
    # Guard the zero case: a slice of [-0:] is the WHOLE sequence, which would
    # turn every case of a small group into a test case.
    if sample_cnt > 0:
        test_index.extend(group.index[-sample_cnt:])
run_table.loc[test_index, "data_type"] = "test"

anomaly_types = sorted(run_table.anomaly_type.unique())
print(' '.join(anomaly_types))

print('train: ', len(run_table[run_table['data_type']=='train']), 'test: ', len(run_table[run_table['data_type']=='test']))
run_table.to_csv(os.path.join(SAVE_ROOT, 'run_table.csv'))
run_table

[Node CPU] [Node Memory] [Pod CPU] [Pod Memory] [Pod]
train:  69 test:  41


Unnamed: 0,st_time,service,instance,anomaly_type,duration,ed_time,data_type
0,1709139600,node4,node4,[Node CPU],300,1709139900,train
1,1709143200,node3,node3,[Node CPU],300,1709143500,train
2,1709146800,node3,node3,[Node CPU],300,1709147100,train
3,1709161200,ts-verification-code-service-0,ts-verification-code-service-0,[Pod CPU],300,1709161500,train
4,1709164800,ts-assurance-service-1,ts-assurance-service-1,[Pod CPU],300,1709165100,train
...,...,...,...,...,...,...,...
105,1709433000,ts-security-service-0,ts-security-service-0,[Pod],300,1709433300,test
106,1709434800,ts-order-service-0;ts-auth-service-0,ts-order-service-0;ts-auth-service-0,[Pod],300,1709435100,test
107,1709436600,ts-admin-order-service-1,ts-admin-order-service-1,[Pod],300,1709436900,test
108,1709438400,ts-order-service-1,ts-order-service-1,[Pod],300,1709438700,test


In [16]:
# Drift run table: training cases come from the base run table, test cases are
# all faults whose drift_type is not 'raw'.
run_table_train = run_table[run_table['data_type']=='train']
# .copy() detaches the selection from `faults`, so the column assignments below
# write to an independent frame instead of a slice (avoids SettingWithCopyWarning).
run_table_test = faults.loc[faults['drift_type'] != 'raw', ['instance', 'st_time', 'anomaly_type']].copy()
run_table_test["anomaly_type"] = run_table_test["anomaly_type"].apply(lambda x: "[" + x + "]")
run_table_test['service'] = run_table_test['instance']
# Same fixed 300-unit duration as the base run table.
run_table_test["duration"] = 300
run_table_test["ed_time"] = run_table_test["st_time"] + run_table_test["duration"]
run_table_test["data_type"] = "test"
run_table_test = run_table_test[
    ['st_time', 'service', 'instance', 'anomaly_type', 'duration', 'ed_time', 'data_type']
].reset_index(drop=True)
# ignore_index renumbers the combined rows 0..n-1 (train rows first, then drift test rows).
run_table_new = pd.concat([run_table_train, run_table_test], ignore_index=True)
print('train: ', len(run_table_new[run_table_new['data_type']=='train']), 'test: ', len(run_table_new[run_table_new['data_type']=='test']))
run_table_new.to_csv(os.path.join(SAVE_ROOT, 'run_table_drift.csv'))
run_table_new

train:  69 test:  50


Unnamed: 0,st_time,service,instance,anomaly_type,duration,ed_time,data_type
0,1709139600,node4,node4,[Node CPU],300,1709139900,train
1,1709143200,node3,node3,[Node CPU],300,1709143500,train
2,1709146800,node3,node3,[Node CPU],300,1709147100,train
3,1709161200,ts-verification-code-service-0,ts-verification-code-service-0,[Pod CPU],300,1709161500,train
4,1709164800,ts-assurance-service-1,ts-assurance-service-1,[Pod CPU],300,1709165100,train
...,...,...,...,...,...,...,...
114,1712698200,ts-seat-service-0,ts-seat-service-0,[Pod],300,1712698500,test
115,1712703600,ts-inside-payment-service-1,ts-inside-payment-service-1,[Pod],300,1712703900,test
116,1712714400,ts-execute-service-0,ts-execute-service-0,[Pod],300,1712714700,test
117,1712856600,ts-station-service-1,ts-station-service-1,[Pod Memory],300,1712856900,test


### Anomalies

In [17]:
def load_pkl(filepath):
    """Open ``filepath`` in binary mode and return the object unpickled from it.

    NOTE(review): unpickling can execute arbitrary code — only use this on
    files from a trusted source.
    """
    with open(filepath, mode='rb') as handle:
        return pickle.load(handle)

# Load the pickled metrics table of the selected dataset (the displayed frame has
# ~51M rows, so this cell is slow and memory-heavy).
metrics_df = load_pkl(f'/SSF/data/{DATASET}/metrics.pkl')
metrics_df

Unnamed: 0,timestamp,value,name,pod,metric_kind,clientServiceName,serviceName,operationName,kind,method,url,count,cost,proc,succ,client_cost,url_prefix,metric_type
0,1709136000,24969216.0,ts-admin-basic-info-service-0##jvm_code_cache_...,,,,,,,,,,,,,,,jvm_code_cache_memory_bytes_committed
1,1709136120,9240576.0,ts-admin-basic-info-service-0##jvm_code_cache_...,,,,,,,,,,,,,,,jvm_code_cache_memory_bytes_committed
2,1709136180,23003136.0,ts-admin-basic-info-service-0##jvm_code_cache_...,,,,,,,,,,,,,,,jvm_code_cache_memory_bytes_committed
3,1709136240,23003136.0,ts-admin-basic-info-service-0##jvm_code_cache_...,,,,,,,,,,,,,,,jvm_code_cache_memory_bytes_committed
4,1709136300,23003136.0,ts-admin-basic-info-service-0##jvm_code_cache_...,,,,,,,,,,,,,,,jvm_code_cache_memory_bytes_committed
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
51074089,1712860560,0.0,ts-voucher-service-1##fs_write_bytes,,,,,,,,,,,,,,,fs_write_bytes
51074090,1712860620,0.0,ts-voucher-service-1##fs_write_bytes,,,,,,,,,,,,,,,fs_write_bytes
51074091,1712860680,0.0,ts-voucher-service-1##fs_write_bytes,,,,,,,,,,,,,,,fs_write_bytes
51074092,1712860740,0.0,ts-voucher-service-1##fs_write_bytes,,,,,,,,,,,,,,,fs_write_bytes


In [18]:
# Entities whose metrics are dropped from the analysis.
nodes_excluded = [
    'jaeger-query',
    'loadgenerator',
    'loadgenerator-5bcbc46d56-25r6l',
    'loadgenerator-5bcbc46d56-48kv8',
    'loadgenerator-5bcbc46d56-8xvxw',
    'loadgenerator-5bcbc46d56-brf2g',
    'loadgenerator-5bcbc46d56-hbmfv',
    'loadgenerator-5bcbc46d56-lw2xk',
    'loadgenerator-5ddf956d5b-sn2bk',
    'loadgenerator-5ddf956d5b-wzd2x',
    'loadgenerator-5bcbc46d56-8ng58',
    'loadgenerator-5bcbc46d56-9glvx',
    'loadgenerator-5bcbc46d56-f4b5b',
    'loadgenerator-5bcbc46d56-khhb4',
    'loadgenerator-5bcbc46d56-m72xc',
    'loadgenerator-5bcbc46d56-sgnlw',
    'loadgenerator-5bcbc46d56-tqbjj',
    'loadgenerator-5ddf956d5b-9dfcq',
    'loadgenerator-8549ff4cd6-ls7qp',
    'loadgenerator-f8bd9f56b-xtx24',
    'loadgenerator-f8bd9f56b-6tgz4',
    'unknown',
]

# The entity id is the part of the metric name in front of the '##' separator.
metrics_df['cmdb'] = metrics_df['name'].map(lambda metric_name: metric_name.split('##')[0])
# Keep the five working columns, drop excluded entities and exact duplicate rows,
# then switch to the cmdb_id / kpi_name column names used by the later cells.
metrics_df = (
    metrics_df[['timestamp', 'name', 'cmdb', 'metric_type', 'value']]
    .loc[lambda frame: ~frame['cmdb'].isin(nodes_excluded)]
    .drop_duplicates()
    .rename(columns={"metric_type": "kpi_name", "cmdb": "cmdb_id"})
)
metrics_df

Unnamed: 0,timestamp,name,cmdb_id,kpi_name,value
0,1709136000,ts-admin-basic-info-service-0##jvm_code_cache_...,ts-admin-basic-info-service-0,jvm_code_cache_memory_bytes_committed,24969216.0
1,1709136120,ts-admin-basic-info-service-0##jvm_code_cache_...,ts-admin-basic-info-service-0,jvm_code_cache_memory_bytes_committed,9240576.0
2,1709136180,ts-admin-basic-info-service-0##jvm_code_cache_...,ts-admin-basic-info-service-0,jvm_code_cache_memory_bytes_committed,23003136.0
3,1709136240,ts-admin-basic-info-service-0##jvm_code_cache_...,ts-admin-basic-info-service-0,jvm_code_cache_memory_bytes_committed,23003136.0
4,1709136300,ts-admin-basic-info-service-0##jvm_code_cache_...,ts-admin-basic-info-service-0,jvm_code_cache_memory_bytes_committed,23003136.0
...,...,...,...,...,...
51074089,1712860560,ts-voucher-service-1##fs_write_bytes,ts-voucher-service-1,fs_write_bytes,0.0
51074090,1712860620,ts-voucher-service-1##fs_write_bytes,ts-voucher-service-1,fs_write_bytes,0.0
51074091,1712860680,ts-voucher-service-1##fs_write_bytes,ts-voucher-service-1,fs_write_bytes,0.0
51074092,1712860740,ts-voucher-service-1##fs_write_bytes,ts-voucher-service-1,fs_write_bytes,0.0


In [20]:
# Sorted entity ids found in the metrics; each entity's integer id is its
# position in that sorted order.
nodes = sorted(metrics_df.cmdb_id.unique())
print(f"{len(nodes)} nodes: \n{' '.join(nodes)}")
node_hash = {node: index for index, node in enumerate(nodes)}

# The graph parser lives in the DejaVu-Omni checkout, which must be on sys.path
# before the import below.
import sys
sys.path.insert(0, '/SSF/DejaVu-Omni')
from failure_dependency_graph.parse_yaml_graph_config import parse_yaml_graph_config

# The graph path follows DATASET like every other input path in this notebook,
# so switching datasets cannot silently pair new metrics with dataset D's graph.
# NOTE(review): the snapshot name graph_1711657800.yml is still fixed — confirm
# it exists for every dataset this notebook is run on.
graph = parse_yaml_graph_config(f'/SSF/data/{DATASET}/graphs/graph_1711657800.yml', Path('tmp'))
src, tgt = [], []
for i, j in graph.edges:
    # Keep only edges whose two endpoints both have metrics. Membership is tested
    # against the dict (hash lookup) rather than the list.
    if i in node_hash and j in node_hash:
        # Each graph edge (i, j) is stored reversed: j as source, i as target.
        # NOTE(review): presumably intentional (dependency vs. propagation direction) — confirm.
        src.append(node_hash[j])
        tgt.append(node_hash[i])
print('src: ', src)
print('tgt: ', tgt)

src:  [66, 69, 66, 69, 93, 107, 111, 90, 69, 42, 17, 117, 93, 107, 126, 53, 97, 101, 69, 101, 117, 101, 49, 66, 69, 117, 32, 73, 66, 69, 101, 111, 90, 82, 26, 101, 45, 49, 126, 90, 97, 117, 78, 69, 82, 17, 53, 21, 93, 57, 26, 42, 13, 101, 32, 73, 107, 111, 66, 124, 3, 4, 2, 1, 3, 1, 2, 3, 2, 1, 2, 1, 4, 3, 4, 1, 4, 1, 3, 4, 2, 3, 2, 1, 4, 2, 4, 1, 3, 2, 1, 4, 1, 2, 4, 1, 4, 3, 4, 1, 1, 3, 3, 2, 1, 3, 1, 2, 3, 1, 1, 4, 1, 2, 1, 3, 4, 3, 2, 3, 1, 4, 2, 1, 2, 1, 4, 4, 3, 1, 2, 4, 1, 3, 4, 1, 2, 1, 1, 2, 3, 1, 3, 1, 4, 3, 1, 2, 4, 1, 2, 1, 4, 3, 3, 4, 2, 3, 3, 4, 3, 1, 4, 1, 3]
tgt:  [45, 45, 97, 97, 117, 117, 117, 117, 117, 78, 78, 78, 78, 78, 78, 78, 78, 78, 78, 69, 53, 53, 53, 93, 93, 93, 93, 57, 57, 57, 26, 26, 26, 26, 107, 66, 123, 123, 123, 123, 123, 123, 123, 123, 123, 123, 123, 123, 123, 123, 123, 123, 123, 123, 123, 123, 123, 123, 123, 123, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 16, 18, 19, 20, 22, 23, 24, 25, 27, 28, 29, 30, 31, 33, 34, 35, 36, 37, 38, 39, 40, 41, 43, 44, 46, 47, 48,

In [21]:
import sys
sys.path.insert(0, '/SSF/DejaVu-Omni/DiagFusion/')
from detector.k_sigma import Ksigma

def process_task(df, case_id, st_time, ed_time):
    """Run the k-sigma detector on every (cmdb_id, kpi_name) series of one case.

    Args:
        df: metric rows for the case window; grouped here by the ``cmdb_id``
            and ``kpi_name`` columns, with the ``"value"`` column name handed
            to the detector.
        case_id: identifier of the case, used only in the progress-bar label.
        st_time: window start, forwarded unchanged to ``Ksigma.detection``.
        ed_time: window end, forwarded unchanged to ``Ksigma.detection``.

    Returns:
        A list of ``[int(res[1]), instance, kpi, res[2]]`` entries, one per
        series for which the detector's first result element is ``True``.
        NOTE(review): res[1] is presumably the anomaly timestamp and res[2] a
        score/direction — confirm against ``detector.k_sigma.Ksigma``.
    """
    detector = Ksigma()
    rt = []
    # The context manager closes the progress bar even if detection raises;
    # previously the bar was created and never closed.
    with tqdm(total=len(df), desc=f"case:{case_id}, detecting") as scheduler:
        for instance, ins_group in df.groupby(by="cmdb_id"):
            for kpi, kpi_group in ins_group.groupby(by="kpi_name"):
                res = detector.detection(kpi_group, "value", st_time, ed_time)
                # Identity check kept as in the original: only the bool True counts.
                # NOTE(review): a NumPy bool would not pass `is True` — confirm the
                # detector returns a plain Python bool.
                if res[0] is True:
                    rt.append([int(res[1]), instance, kpi, res[2]])
            # Progress is counted in rows, one update per entity.
            scheduler.update(len(ins_group))
    return rt

In [22]:
# Detect metric anomalies for every case of the base run table, in parallel.
# Window: 60 points before the fault start, 0 points after the fault end,
# where one point is sample_interval time units.
sample_interval = 60
tasks = []
# The context manager terminates the workers on exit, so they are not leaked when
# building or submitting a task raises. On the normal path close() + join() let
# every queued task finish before the pool is torn down.
with multiprocessing.Pool(processes=16) as pool:
    for case_id, case in run_table.iterrows():
        st_time = case["st_time"] - (sample_interval * 60)
        ed_time = case["ed_time"] + (sample_interval * 0)
        task = pool.apply_async(
            process_task,
            (
                # Only the rows inside the case window are shipped to the worker.
                metrics_df.query(f"timestamp >= {st_time} & timestamp < {ed_time}"),
                case_id,
                st_time,
                ed_time,
            ),
        )
        tasks.append((case_id, task))
    pool.close()
    pool.join()
# .get() re-raises any exception that occurred inside a worker.
metric_dict = {case_id: task.get() for case_id, task in tasks}

In [23]:
# Persist the detected anomalies of the base run table as JSON.
anomaly_path = os.path.join(SAVE_ROOT, 'anomalies')
os.makedirs(anomaly_path, exist_ok=True)
demo_metric_file = os.path.join(anomaly_path, 'demo_metric.json')
with open(demo_metric_file, "w") as fout:
    json.dump(metric_dict, fout)

In [24]:
# Detect metric anomalies for every case of the drift run table, in parallel.
# Window: 60 points before the fault start, 0 points after the fault end,
# where one point is sample_interval time units.
sample_interval = 60
tasks = []
# The context manager terminates the workers on exit, so they are not leaked when
# building or submitting a task raises. On the normal path close() + join() let
# every queued task finish before the pool is torn down.
with multiprocessing.Pool(processes=16) as pool:
    for case_id, case in run_table_new.iterrows():
        st_time = case["st_time"] - (sample_interval * 60)
        ed_time = case["ed_time"] + (sample_interval * 0)
        task = pool.apply_async(
            process_task,
            (
                # Only the rows inside the case window are shipped to the worker.
                metrics_df.query(f"timestamp >= {st_time} & timestamp < {ed_time}"),
                case_id,
                st_time,
                ed_time,
            ),
        )
        tasks.append((case_id, task))
    pool.close()
    pool.join()
# .get() re-raises any exception that occurred inside a worker.
metric_dict = {case_id: task.get() for case_id, task in tasks}

# Persist the detected anomalies of the drift run table as JSON.
anomaly_path = os.path.join(SAVE_ROOT, 'anomalies')
os.makedirs(anomaly_path, exist_ok=True)
with open(os.path.join(anomaly_path, 'demo_metric_drift.json'), "w") as w:
    json.dump(metric_dict, w)