In [2]:
%%html
<style>
div.container#notebook-container {width: 1300px}
</style>

# Import stuff

In [1230]:
%load_ext autoreload
%autoreload 2

import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import matplotlib

matplotlib.use("Cairo")  # alternatives: nbAgg
%matplotlib inline
# %matplotlib notebook
from matplotlib import rcParams

rcParams["font.family"] = "sans-serif"
rcParams['font.sans-serif'] = ['DejaVu Sans Mono']
rcParams["pdf.fonttype"] = 42
rcParams["ps.fonttype"] = 42
# plt.style.use('default')

import seaborn
from tqdm.auto import tqdm
from multiprocessing import Pool, RLock, freeze_support

import time
import datetime
import pandas as pd
from pandas.plotting import register_matplotlib_converters

register_matplotlib_converters()
import os
import re
import numpy as np
import random
import networkx as nx
import pickle

freeze_support()  # for Windows support
tqdm.set_lock(RLock())  # for managing output contention

# Import data load and process functions
from main_dycause_mp_new import dycause_causal_discover
import utility_funcs.data_load_funcs as data_funcs

# Import data and graph plot functions
import utility_funcs.graph_draw as gd

# Import modified interval process functions
import dycause_lib.method_improves as meth_imp

# Import root cause analysis functions
from dycause_lib.rca import analyze_root, normalize_by_column, search_path, bfs, case_rca_backtrace
from utility_funcs.evaluation_function import pr_stat, print_prk_acc, my_acc
import utility_funcs.exp_result_analyze as exp_ana

# For PCMCI functions
import tigramite.data_processing as pp

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


## Simple Helper Funcs

In [892]:
from tabulate import tabulate


def cross_validation(all_case_result, train_keys=['case1', 'case2', 'case3', 'case5', 'case6_host204', 'case6_host28'], verbose=False):
    training_dict = {}
    testing_dict = {}
    for k in all_case_result:
        if k in train_keys:
            training_dict[k] = all_case_result[k]
        else:
            testing_dict[k] = all_case_result[k]
    idx = exp_ana.print_case_avg_max_perf(training_dict, verbose=verbose)
    data = []
    for k in testing_dict:
        try:
            data.append([k, *testing_dict[k][idx]['prks'], np.mean(testing_dict[k][idx]['prks']), 
                         testing_dict[k][idx]['acc'], sum(testing_dict[k][idx]['time_info'].values())])
        except Exception:
            from IPython.core.debugger import set_trace
            set_trace() # 断点位置
    data.append(['case avg', *[np.mean([data[i][j+1] for i in range(len(data))]) for j in range(13)]])
    if verbose:
        print(tabulate(data, headers=['Case']+['PR@{}'.format(i+1) for i in range(10)]+['PR@Avg', 'Acc', 'Time'], floatfmt="#06.4f"))
    return idx, data


def enum_fold_cross_val(ours_all_case_result, train_size=5):
    each_fold_data = []
    keys_list = sorted(list(ours_all_case_result.keys()))  # sorted in order to keep the same with different dict
    rnd = len(ours_all_case_result) // train_size
    for i in range(rnd):
        _, data = cross_validation(ours_all_case_result, train_keys=keys_list[i*train_size:(i+1)*train_size], verbose=False)
        each_fold_data.append(data)
    final_data = []
    for i in range(4):
        final_data.append(each_fold_data[i][-1][1:])
    final_avg = ['Avg'] + np.mean(final_data, axis=0).tolist()
    final_std = ['Std'] + np.std(final_data, axis=0).tolist()
    print(tabulate([final_avg, final_std], headers=['Metric'] + ['PR@{}'.format(i+1) for i in range(10)]+['PR@Avg', 'Acc', 'Time'], floatfmt="#06.4f"))
    for i in range(1, len(final_avg)):
        print('{:.3g}±{:.3g}'.format(final_avg[i], final_std[i]), end=' ')
    return final_avg, final_std

## Domain Knowledge

In [4]:
#------------- Domain Knowledge --------------------------------------
only_source_cols = ['inBytes', 'inMulticast', 'inPackets',
                    'tcpPkgInsegs', 'udpInDatagrams']
only_target_cols = ['outBytes', 'outPackets',
                    'tcpPkgOutsegs', 'tcpPkgRetranssegs',
                    'udpOutDatagrams']

def get_only_source_target_idx(only_source_cols, only_target_cols, col_list):
    def find_match_ind(simple_column):
        for i, c in enumerate(col_list):
            if gd.remove_parenthesis(c) == simple_column:
                return i
#         assert False, f'Column {simple_column} not found in col_list!'
        return -1
    source_ids = [find_match_ind(i) for i in only_source_cols]
    source_ids = [i for i in source_ids if i!=-1]
    target_ids = [find_match_ind(i) for i in only_target_cols]
    target_ids = [i for i in target_ids if i != -1]
    return source_ids, target_ids

## Graph Drawing Column Sets

In [1]:
#--------- Case 1,2,5,6 Net & Runtime Metrics ----------
all_columns_d_1 = {
    'eth0': ['(iface=eth0)inPackets', '(iface=eth0)inPercent', '(iface=eth0)inMulticast', 
             '(iface=eth0)outPercent', '(iface=eth0)outPackets',
             '(iface=eth0)totalBytes', '(iface=eth0)totalPackets'],
    'cpu': ['busy', 'iowait', 'softirq', 'nice', 'system', 'user', 'switches'], 
    'kernel': ['kernelFilesAllocated'], 
    'load': ['load1', 'load5'],
    'memory': ['memBuffers', 'memCached', 'memShmem', 'memUsedPercent', 'memAvailablePercent'], 
    'tcp': ['retrans', 'tcpAbortOnTimeout', 'tcpDelayedACKLocked', 'tcpLostRetransmit', 'tcpPkgInsegs', 'tcpPkgOutsegs',
    'tcpPkgRetranssegs', 'tcpTW'],
    'socket': ['ssClosed', 'ssEstab', 'ssOrphaned', 'ssTimeWait'],
    'udp': ['udpInDatagrams', 'udpNoPorts', 'udpOutDatagrams', 'udpIgnoreMulti_Diff']
}
# --------- Case 7 all_columns
all_columns_d_3 = {
    "eth0": [
        "(iface=eth0)inPercent",
        "(iface=eth0)inPackets",
        "(iface=eth0)outPercent",
        "(iface=eth0)outPackets",
        "(iface=eth0)totalBytes",
        "(iface=eth0)totalPackets",
    ],
    "cpu": ["busy", "iowait", "softirq", "system", "user", "switches"],
    "kernel": ["kernelFilesAllocated"],
    "load": ["load1", "load5"],
    "memory": ["memBuffers", "memCached", "memShmem", "memUsedPercent", "memAvailablePercent"],
    "tcp": [
        "retrans",
        "tcpAbortOnTimeout",
        "tcpDelayedACKLocked",
        "tcpPkgInsegs",
        "tcpPkgOutsegs",
        "tcpTW",
    ],
    "socket": ["ssClosed", "ssEstab", "ssOrphaned", "ssTimeWait"],
    "udp": ["udpInDatagrams", "udpNoPorts", "udpOutDatagrams", "udpIgnoreMulti_Diff"],
    "disk": [
        "(mount=/data00)df.statistics.used.percent",
        "(mount=/data00)df.statistics.used",
        "(mount=/data00)df.statistics.total",
        "(mount=/data00)df.inodes.free.percent",
        "(mount=/data00)df.bytes.free.percent",
        "(mount=/)df.statistics.used.percent",
        "(mount=/)df.statistics.used",
        "(mount=/)df.statistics.total",
        "(mount=/)df.bytes.free.percent",
    ],
    "diskio": [
        "(device=sda)disk.io.write",
        "(device=sda)disk.io.w_wait",
        "(device=sda)disk.io.util",
        "(device=sda)disk.io.read",
        "(device=sda)disk.io.await",
        "(device=sda)disk.io.read_bytes_Diff",
        "(device=sda)disk.io.write_bytes_Diff",
        "(device=sdb)disk.io.write",
        "(device=sdb)disk.io.w_wait",
        "(device=sdb)disk.io.util",
        "(device=sdb)disk.io.read",
        "(device=sdb)disk.io.await",
        "(device=sdb)disk.io.read_bytes_Diff",
        "(device=sdb)disk.io.write_bytes_Diff",
    ],
}


columns_by_case = {}
color_info_by_case = {}

def extract_cols(d):
    all_columns = []
    for k, v in d.items():
        all_columns += sorted(v)
    return all_columns
columns_by_case['case1'] = extract_cols(all_columns_d_1)
color_info_by_case['case1'] = gd.get_node_colors(all_columns_d_1, columns_by_case['case1'])

columns_by_case['case7'] = extract_cols(all_columns_d_3)
color_info_by_case['case7'] = gd.get_node_colors(all_columns_d_3, columns_by_case['case7'])

NameError: name 'gd' is not defined

## Our Method Parameter Search
Example on one case.

In [1188]:
plot_out_dir = "plot_out_dir"

In [None]:
save_names = []
dycause_input = pd.read_csv(os.path.join('sample_data', 'anomaly_host_metrics.csv'), index_col=0)
dycause_input.index = pd.to_datetime(dycause_input.index)

print("Data shape:", dycause_input.shape)

df = data_funcs.normalize_df(dycause_input)

max_segment_len = None
for sign in [0.01, 0.05, 0.1]:
    for step in [30]:
        for lag in [6, 7, 8]:
            local_results_dy, dcc_dy, mat_dy, time_stat_dict_dy = dycause_causal_discover(
                # Data params
                dycause_input.to_numpy()[:, :],
                # Granger interval based graph construction params
                step=step,
                significant_thres=sign,
                lag=lag,  # must satisfy: step > 3 * lag + 1
                adaptive_threshold=0.7,
                use_multiprocess=True,
                max_workers=10,
                max_segment_len=max_segment_len,
                # Debug_params
                verbose=2,
                runtime_debug=True,
            )

            print(time_stat_dict_dy['Construct-Impact-Graph-Phase'])

            # Use the timezone in my location.
            local_tz = datetime.timezone(datetime.timedelta(hours=8))
            time_str = datetime.datetime.now(local_tz).strftime("%Y%m%d_%H%M%S")
            fname = os.path.join("temp_results", casename, f"exp_rets_{time_str}.pkl")
            save_names.append(fname)
            # print("Saving results to:", fname)
            os.makedirs(os.path.dirname(fname), exist_ok=True)
            with open(fname, "wb") as f:
                pickle.dump({
                    "local_results": local_results_dy,
                    "dcc": dcc_dy,
                    "mat": mat_dy,
                    "time_stat_dict": time_stat_dict_dy,
                    "step": step,
                    "lag": lag,
                    "sign": sign
                }, f)
print(save_names)

### Notes
* `all_case_input` is the dict for the host metrics data, diagnosis entry and the root causes.
* `save_names_1_7_nine` is the exp_rets results generated using previous cell.

In [None]:
all_case_rca_exp_res = {}
for case_num in tqdm(['case3']):
    df_path = df_path_dict[case_num]
    dycause_input = pd.read_csv(df_path, index_col=0)
    dycause_input.index = pd.to_datetime(dycause_input.index)
    hint_thres = 30 # for case 1-6: 250, for case 7-25: 30
    ntop=80
    all_case_rca_exp_res[case_num] = []
    # ---- Fill the following dict with save exp_rets in dycause_lib/ours_exp_logs.py
    for fname in save_names_1_7_nine[case_num][-1:]:
        with open(os.path.join("temp_results", exp_dir_dict[case_num], fname), "rb") as f:
            exp_ret = pickle.load(f)
        time_info = {'DyCause': exp_ret["time_stat_dict"]["Construct-Impact-Graph-Phase"]}
        exp_ret['time_info'] = time_info
        meth_imp.build_intervals_special(exp_ret['local_results'], exp_ret['sign'], dycause_input.shape[0], 
                                         exp_ret['step'], 
                                         dycause_input.shape[1])
        exp_ret['dcc_special'] = meth_imp.generate_DCC(exp_ret['local_results'], dycause_input.shape[0], 
                                                       dycause_input.shape[1], interval_key="intervals_special")
    

        columns = columns_by_case[case_num]
        N = len(columns)
        tic = time.time()
        data_idx_to_graph_idx, graph_idx_to_data_idx = gd.construct_d2g_map_dict(
            list(dycause_input.columns), columns, print_info=False
        )
        exp_ret["filtered_dcc"] = gd.filter_dcc(
            exp_ret["dcc"], N, graph_idx_to_data_idx
        )
        exp_ret["filtered_dcc_special"] = gd.filter_dcc(
            exp_ret["dcc_special"], N, graph_idx_to_data_idx
        )
        filtered_df = gd.filter_df(dycause_input, graph_idx_to_data_idx, N)
        # hints from anomaly dcc
        hints = meth_imp.get_dir_hints_from_dcc(exp_ret["filtered_dcc"], N, hint_thres)
#         hints = None
#         exp_ret['hint_thres'] = hint_thres
        # merge hints from normal profile
        # hints = hints + normal_profile_hints_filtered
        # hints = meth_imp.remove_double_direction_hints(hints)

        o_s_ids, o_t_ids = get_only_source_target_idx(
            only_source_cols, only_target_cols, columns
        )
#         o_s_ids, o_t_ids = None, None

        exp_ret['ntop'] = ntop
#     # for ntop in [80]:
        mat, candidates = meth_imp.global_thresholding(
            exp_ret["filtered_dcc_special"],
            N,
            normal_axis="none",
            ntop=ntop,
            only_source_ids=o_s_ids,
            only_target_ids=o_t_ids,
            hints=hints,
            return_candidates=True
        )
        toc = time.time() - tic
        exp_ret['mat'] = mat
        exp_ret['candidates'] = candidates
        exp_ret['time_info']['Graph'] = toc

        tic = time.time()
        # corr_res_cond_shuffled, corr_res_cond = calculate_CMIknn_shuffled(mat, filtered_df)
        corr_res_cond_shuffled, corr_res_cond = meth_imp.calculate_CMIknn_shuffled_mp(exp_ret['mat'], 
                                                                                      filtered_df.iloc[-120:, :], 
                                                                                      max_workers=10, 
                                                                                      verbose=True)
        # corr_res_cond_shuffled = meth_imp.calculate_parcorr(mat, filtered_df)
        toc = time.time() - tic
        exp_ret['corr_res_cond_shuffled'] = corr_res_cond_shuffled
#         exp_ret['corr_res_cond_shuffled'] = None
        exp_ret['time_info']['Cond'] = toc
        
        k = case_num
        entry = all_case_input[k]['entry']
        root_causes = all_case_input[k]['root_causes']

        exps = meth_imp.search_rca_backtrace_params(exp_ret['mat'], 
                                                    exp_ret['candidates'], 
                                                    exp_ret['corr_res_cond_shuffled'], 
                                                    entry, root_causes, filtered_df, 
                                                    exp_ret['time_info'],
                                                    verbose=False, comp_func=lambda x,p:x>p)
        for d in exps:
            d['step'] = exp_ret['step']
            d['lag'] = exp_ret['lag']
            d['sign'] = exp_ret['sign']
            d['hint_thres'] = hint_thres
            d['ntop'] = ntop
        all_case_rca_exp_res[case_num].extend(exps)

In [None]:
# Save parameter search results
with open(f'temp_results/all_case_rca_exp_results.pkl', 'wb') as f:
    pickle.dump(all_case_rca_exp_res, f)

## Perf Evaluation

In [None]:
# Load the parameter search results for each case
with open(f'temp_results/all_case_rca_exp_results.pkl', 'rb') as f:
    all_case_rca_exp_res = pickle.load(f)

In [896]:
cross_val_avg_std = {}
cross_val_avg_std['ours'] = enum_fold_cross_val(all_case_rca_exp_res, train_size=1)

Metric      PR@1    PR@2    PR@3    PR@4    PR@5    PR@6    PR@7    PR@8    PR@9    PR@10    PR@Avg     Acc     Time
--------  ------  ------  ------  ------  ------  ------  ------  ------  ------  -------  --------  ------  -------
Avg       0.0658  0.0921  0.1272  0.1645  0.2342  0.2829  0.3276  0.3855  0.4263   0.4789    0.2585  0.7702  74.9261
Std       0.0436  0.0228  0.0375  0.0218  0.0142  0.0151  0.0269  0.0180  0.0337   0.0411    0.0037  0.0117   5.8263
0.0658±0.0436 0.0921±0.0228 0.127±0.0375 0.164±0.0218 0.234±0.0142 0.283±0.0151 0.328±0.0269 0.386±0.018 0.426±0.0337 0.479±0.0411 0.259±0.00373 0.77±0.0117 74.9±5.83 

## DyCause
Notes:
* `save_names` is the exp_rets list as before.

In [16]:
dycause_all_case_result = {}

In [None]:
def dycause_rca_search(exp_ret, columns, entry, root_causes, filtered_df):
    
    dycause_results = []
    for thres in tqdm(np.arange(0.1, 1.0, 0.1)):
        tic = time.time()
        mat = adaptive_thresholding(
            exp_ret["filtered_dcc"], thres, len(columns)
        )
        toc = time.time() - tic
        dcc_sum = {}
        for k in exp_ret["filtered_dcc"]:
            dcc_sum[k] = float(np.sum(exp_ret["filtered_dcc"][k]))
        
        exp_rets = backtrace_param_search(mat, entry, root_causes, filtered_df)
        
        for d in exp_rets:
            d['thres'] = thres
            d['mat'] = mat
            d['dcc_sum'] = dcc_sum
            d['time_info']['Adaptive'] = toc
            d['time_info']['DyCause'] = time_info['DyCause']
        dycause_results.extend(exp_rets)
    return dycause_results

for case_num in tqdm([1]):
    case_str = f'case{case_num}'
    print('{:-^80}'.format(f' {case_str} '))
    dycause_input = pd.read_csv(os.path.join(data_out_dir, f'dycause_input_{case_num}.csv'),
                            index_col=0)
    dycause_input.index = pd.to_datetime(dycause_input.index)
    fname = "temp_results/" + save_names[case_num]
    with open(fname, "rb") as f:
        exp_ret = pickle.load(f)
    time_info = {'DyCause': exp_ret["time_stat_dict"]["Construct-Impact-Graph-Phase"]}
#     print('DyCause Time:', f'{exp_ret["time_stat_dict"]["Construct-Impact-Graph-Phase"]:.4f}')

    meth_imp.build_intervals_special(exp_ret['local_results'], exp_ret['sign'], dycause_input.shape[0], 
                                     exp_ret['step'], dycause_input.shape[1])
    exp_ret['dcc_special'] = meth_imp.generate_DCC(exp_ret['local_results'], dycause_input.shape[0], 
                                                   dycause_input.shape[1], interval_key="intervals_special")
    data_idx_to_graph_idx, graph_idx_to_data_idx  = gd.construct_d2g_map_dict(list(dycause_input.columns), 
                                                                      columns_by_case[case_str])
    exp_ret["filtered_dcc"] = gd.filter_dcc(exp_ret["dcc"], len(columns_by_case[case_str]), graph_idx_to_data_idx)
    filtered_df = gd.filter_df(dycause_input, graph_idx_to_data_idx, len(columns_by_case[case_str]))
    entry = get_case_entry(case_num)
    root_causes = get_case_root_causes(case_num)
    dycause_results = dycause_rca_search(exp_ret, columns_by_case[case_str], entry, root_causes, filtered_df)
#     dycause_all_case_result[case_num] = dycause_results

In [899]:
cross_val_avg_std['dycause'] = enum_fold_cross_val(dycause_all_case_result, train_size=1)

Metric      PR@1    PR@2    PR@3    PR@4    PR@5    PR@6    PR@7    PR@8    PR@9    PR@10    PR@Avg     Acc     Time
--------  ------  ------  ------  ------  ------  ------  ------  ------  ------  -------  --------  ------  -------
Avg       0.1579  0.1447  0.1557  0.1711  0.1868  0.2171  0.2618  0.2789  0.2895   0.3197    0.2183  0.4166  36.2762
Std       0.0744  0.0543  0.0293  0.0360  0.0362  0.0317  0.0549  0.0426  0.0448   0.0606    0.0418  0.1352   1.0182
0.158±0.0744 0.145±0.0543 0.156±0.0293 0.171±0.036 0.187±0.0362 0.217±0.0317 0.262±0.0549 0.279±0.0426 0.289±0.0448 0.32±0.0606 0.218±0.0418 0.417±0.135 36.3±1.02 

## CloudRanger

In [9]:
from cloud_ranger_lib.get_link_matrix import build_graph_pc
import cloud_ranger_lib.cloud_ranger as cloud_ranger
from utility_funcs.pearson import calc_pearson


def main_cloudranger(df, frontend, true_root_cause, pc_aggregate=5,
    pc_alpha=0.1, beta=0.3, 
    rho=0.2, testrun_round=5, verbose=False):
    '''
    Params:
        df: input dataframe, shape [T, N]
    '''
    tic = time.time()
    data = df.to_numpy()
    T, N = data.shape
    data = np.array([cloud_ranger.aggregate(row, pc_aggregate) for row in data.T]) # shape: [N, T]
    rela = calc_pearson(data, method="numpy", zero_diag=False)
    access = build_graph_pc(data, alpha=pc_alpha)
    pc_toc = time.time() - tic
    
    prks = []
    acc = []
    tic = time.time()
    for i in range(testrun_round):
        rank, P, M = cloud_ranger.relaToRank(
            rela, access, 10, frontend, beta=beta, rho=rho, print_trace=False
        )
        if verbose:
            for j in range(10):
                print(rank[j], end=", ")
            print("")
        prks.append(pr_stat(rank, true_root_cause, 10))
        acc.append(my_acc(rank, true_root_cause, n=N))
    toc = time.time() - tic
    toc = toc / testrun_round
    prks = np.mean(prks, axis=0)
    acc = np.mean(acc)
    if verbose:
        print_prk_acc(prks, acc)
    return prks, acc, {'random_walk_time': toc, 'pc_time': pc_toc}, {'access': access, 'rank': rank}

In [129]:
cr_all_case_result = {}

### CloudRanger on One Case

In [None]:
import multiprocessing as mp
pool = mp.Pool(10)
results = []
for pc_agg in [5, 10, 20]:
    for pc_a in [0.05, 0.1, 0.15]:
        for beta in [0.2, 0.4, 0.6]:
            for rho in [0.2, 0.4, 0.6]:
                results.append(pool.apply_async(main_cloudranger, (filtered_df, entry+1, [i+1 for i in root_causes],
                                                                   pc_agg, pc_a, beta, rho)))
pbar = tqdm(total=len(results))
for res in results:
    res.wait() # wait until it completes
    pbar.update(1)
pbar.close()
pool.close()
pool.terminate()

In [111]:
cr_exp_rets = [{'prks': r.get()[0], 'acc': r.get()[1], 'time_info': r.get()[2], 'others': r.get()[3]} for r in results]
idx = 0
for pc_agg in [5, 10, 20]:
    for pc_a in [0.05, 0.1, 0.15]:
        for beta in [0.2, 0.4, 0.6]:
            for rho in [0.2, 0.4, 0.6]:
                cr_exp_rets[idx].update({
                    'pc_agg': pc_agg,
                    'pc_a': pc_a,
                    'beta': beta,
                    'rho': rho
                })
                idx += 1

### Notes
* `cr_all_case_result` is the dict storing the CloudRanger search results of all cases.

In [901]:
cross_val_avg_std['cloudranger'] = enum_fold_cross_val(cr_all_case_result, train_size=1)

Metric      PR@1    PR@2    PR@3    PR@4    PR@5    PR@6    PR@7    PR@8    PR@9    PR@10    PR@Avg     Acc     Time
--------  ------  ------  ------  ------  ------  ------  ------  ------  ------  -------  --------  ------  -------
Avg       0.0105  0.0211  0.0307  0.0770  0.0776  0.1239  0.1287  0.1392  0.1492   0.1558    0.0914  0.6536  60.9526
Std       0.0182  0.0207  0.0228  0.0387  0.0340  0.0179  0.0191  0.0249  0.0266   0.0195    0.0147  0.0126  11.1873
0.0105±0.0182 0.0211±0.0207 0.0307±0.0228 0.077±0.0387 0.0776±0.034 0.124±0.0179 0.129±0.0191 0.139±0.0249 0.149±0.0266 0.156±0.0195 0.0914±0.0147 0.654±0.0126 61±11.2 

In [14]:
with open('temp_results/cloudranger/case1_cloudranger_exp_results.pkl', 'rb') as f:
    cr_case1_result = pickle.load(f)

## PCMCI RCA
* PCMCI with Backtrace RCA
* PCMCI with CloudRanger second-order random walk

In [5]:
from baselines.main_pcmci import run_pcmci, pcmci_build_g

In [6]:
pcmci_all_case_result = {}

In [None]:
for case in ['case1']:
    entry = all_case_input[case]['entry']
    root_causes = all_case_input[case]['root_causes']
    filtered_df = all_case_input[case]['filtered_df']

    # ---- Multiprocess run PCMCI Step 1 Conduct Tests ----
    import multiprocessing as mp
    pool = mp.Pool(10)
    pcmci_exp_rets = []
    results = []
    for pc_alpha in [0.01, 0.05, 0.1]:
        for lag in [6, 7, 8]:
            pcmci_exp_rets.append({
                'pc_alpha': pc_alpha,
                'lag': lag
            })
            results.append(pool.apply_async(run_pcmci, (filtered_df.to_numpy()[:, :], lag, pc_alpha, 'cmiknn')))
    pbar = tqdm(total=len(pcmci_exp_rets))
    for res in results:
        res.wait() # wait until it completes
        pbar.update(1)
    pbar.close()
    pool.close()
    pool.terminate()

    for i, d in enumerate(pcmci_exp_rets):
        try:
            pcmci, pcmci_res, toc = results[i].get()
            d['pcmci'] = pcmci
            d['pcmci_res'] = pcmci_res
            d['pcmci_time'] = toc
        except ValueError as e:
            d['pcmci'] = None
            d['pcmci_res'] = None
            d['pcmci_time'] = None


    # ---- single process PCMCI ----
    # pcmci_exp_rets = []
    # idx = 0
    # pbar = tqdm(total=9)
    # for pc_alpha in [0.01, 0.05, 0.1]:
    #     for lag in [5, 8, 10]:
    #         tic = time.time()
    #         try:
    #             pcmci_res = run_pcmci(filtered_df.to_numpy()[:, :], pc_alpha=pc_alpha, tau_max=lag, verbosity=0)
    #         except ValueError as e:
    #             pcmci_res = None
    #         toc = time.time()
    #         pcmci_exp_rets.append({
    #             "pc_alpha": pc_alpha, 
    #             'tau_max': lag,
    #             "pcmci_res": pcmci_res,
    #             "pcmci_time": toc-tic
    #         })
    #         pbar.update(1)
    # pbar.close()

    print('If valid results:', [d['pcmci_res'] is not None for d in pcmci_exp_rets])

    # ---- PCMCI Step 2 Build Graph ----
    pcmci_adj_res = []
    for i in range(len(pcmci_exp_rets)):
        tic = time.time()
        if pcmci_exp_rets[i]['pcmci'] != None:
            g = pcmci_build_g(pcmci_exp_rets[i]['pcmci'], pcmci_exp_rets[i]['pcmci_res'], pcmci_exp_rets[i]['pc_alpha'])
            adj = nx.to_numpy_matrix(g)
        else:
            adj = None
    #         print(adj)
        toc = time.time()
        pcmci_adj_res.append({
            'adj': adj,
            'pcmci_time_2': toc-tic
        })
        pcmci_adj_res[-1].update(pcmci_exp_rets[i])


    # ---- single process version ----
    # pcmci_rca_exp_results = []
    # for d in tqdm(pcmci_adj_res):
    #     if d['adj'] is not None:
    #         exp_rets = backtrace_param_search(d['adj'], entry, root_causes, filtered_df)

    #         for _d in exp_rets:
    #             _d['time_info']['pcmci_time'] = d['pcmci_time']
    #             _d['time_info']['pcmci_time_2'] = d['pcmci_time_2']
    #             _d['adj'] = d['adj']
    #             _d['pc_alpha'] = d['pc_alpha']
    #             _d['lag'] = d['lag']
    #         pcmci_rca_exp_results.extend(exp_rets)
    #     else:
    #         pcmci_rca_exp_results.extend([{'prks': None, 'acc': None} for _ in range(135)])

    # ---- multiple processes version ----
    pcmci_rca_exp_results = []
    import multiprocessing as mp
    pool = mp.Pool(10)
    pool_results = []
    for d in pcmci_adj_res:
        if d['adj'] is not None:
            # ---- backtrace RCA ----
            pool_results.append(pool.apply_async(
                meth_imp.backtrace_param_search, (d['adj'], entry, root_causes, filtered_df)))
            # ---- Random Walk RCA ----
#             pool_results.append(pool.apply_async(
#                 pcmci_randwalk_params_search, (filtered_df.to_numpy().T, d['adj'].tolist(), entry+1, [i+1 for i in root_causes])))
        else:
            pool_results.append(None)
    pool_futures = [i for i in pool_results if i is not None]
    pbar = tqdm(total=len(pool_futures))
    for fut in pool_futures:
        fut.wait() # wait until it completes
        pbar.update(1)
    pbar.close()
    pool.close()
    pool.terminate()
    for i, d in enumerate(pcmci_adj_res):
        if d['adj'] is not None:
            try:
                exp_rets = pool_results[i].get()
                for _d in exp_rets:
                    _d['time_info']['pcmci_time'] = d['pcmci_time']
                    _d['time_info']['pcmci_time_2'] = d['pcmci_time_2']
                    _d['adj'] = d['adj']
                    _d['pc_alpha'] = d['pc_alpha']
                    _d['lag'] = d['lag']
                pcmci_rca_exp_results.extend(exp_rets)
            except ValueError as e:
                pcmci_rca_exp_results.extend([{'prks': None, 'acc': None} for _ in range(45)])
        else:
            pcmci_rca_exp_results.extend([{'prks': None, 'acc': None} for _ in range(45)])

    pcmci_all_case_result[case] = pcmci_rca_exp_results
#     with open(f'temp_results/pcmci/{case}_pcmci2_exp_results_new.pkl', 'wb') as f:
#         pickle.dump(pcmci_rca_exp_results, f)

In [903]:
cross_val_avg_std['pcmci bt'] = enum_fold_cross_val(pcmci_all_case_result, train_size=1)

Metric      PR@1    PR@2    PR@3    PR@4    PR@5    PR@6    PR@7    PR@8    PR@9    PR@10    PR@Avg     Acc     Time
--------  ------  ------  ------  ------  ------  ------  ------  ------  ------  -------  --------  ------  -------
Avg       0.0625  0.1016  0.1094  0.1797  0.1875  0.2250  0.2687  0.3031  0.3281   0.3625    0.2128  0.7015  29.9527
Std       0.0000  0.0341  0.0308  0.0524  0.0364  0.0293  0.0187  0.0223  0.0185   0.0198    0.0240  0.0444   4.8446
0.0625±0 0.102±0.0341 0.109±0.0308 0.18±0.0524 0.188±0.0364 0.225±0.0293 0.269±0.0187 0.303±0.0223 0.328±0.0185 0.363±0.0198 0.213±0.024 0.701±0.0444 30±4.84 

In [905]:
cross_val_avg_std['pcmci rw'] = enum_fold_cross_val(pcmci_all_case_result, train_size=1)

Metric      PR@1    PR@2    PR@3    PR@4    PR@5    PR@6    PR@7    PR@8    PR@9    PR@10    PR@Avg     Acc     Time
--------  ------  ------  ------  ------  ------  ------  ------  ------  ------  -------  --------  ------  -------
Avg       0.0156  0.0781  0.0859  0.1289  0.1328  0.1797  0.2406  0.2594  0.3250   0.3656    0.1812  0.6677  32.2557
Std       0.0271  0.0271  0.0348  0.0231  0.0173  0.0317  0.0185  0.0285  0.0515   0.0526    0.0117  0.0335   3.6727
0.0156±0.0271 0.0781±0.0271 0.0859±0.0348 0.129±0.0231 0.133±0.0173 0.18±0.0317 0.241±0.0185 0.259±0.0285 0.325±0.0515 0.366±0.0526 0.181±0.0117 0.668±0.0335 32.3±3.67 