In [1]:
from cusum import *

from os.path import (
    abspath,
    dirname,
)
import numpy as np
import pandas as pd
import os
import pickle as pkl
from scipy import stats
from tensorflow import keras
import tensorflow as tf
import matplotlib.pyplot as plt
import networkx as nx
from pygraphviz import AGraph
import warnings
from sklearn.exceptions import DataConversionWarning
# Keep notebook output readable: silence sklearn dtype-conversion chatter and
# library FutureWarnings.
# NOTE(review): ignoring FutureWarning also hides pandas deprecation notices
# (e.g. about indexing semantics) that could change results in newer versions.
for _ignored_category in (DataConversionWarning, FutureWarning):
    warnings.filterwarnings(action='ignore', category=_ignored_category)

# Quiet TensorFlow and run on CPU only.
# NOTE(review): TensorFlow reads TF_CPP_MIN_LOG_LEVEL when its native runtime
# loads, so setting it after `import tensorflow` may not suppress import-time
# logs; consider moving this assignment above the imports.
os.environ.update({'TF_CPP_MIN_LOG_LEVEL': '3'})
tf.get_logger().setLevel('ERROR')
tf.config.set_visible_devices([], device_type='GPU')


In [26]:
# global variables
# NOTE(review): absolute, user-specific paths; consider a configurable base dir.
model_file = "/home/zjiae/Results/exp_data/mysql_plus"
graph_file = "/home/zjiae/Project/TiDB_exp/inference/mysql/manual.txt"
# graph_file = "/home/zjiae/Project/TiDB_exp/inference/mysql/blip_withoutCE.txt"
train_file = "/home/zjiae/Project/TiDB_exp/inference/mysql/train_1m_plus.csv"
# normal_data_file = "/home/zjiae/Project/TiDB_exp/inference/test_normal.csv"
# current_data_file = "/home/zjiae/Project/TiDB_exp/inference/test_anomaly.csv"
test_file = "/home/zjiae/Project/TiDB_exp/inference/mysql/test1_1m.csv"

train_df = pd.read_csv(train_file)
test_df = pd.read_csv(test_file)

nodes_list = train_df.columns.to_list()
with open(graph_file, 'r') as fin:
    origin_edges = fin.read().splitlines()

# DML models are stored as "<edge>.pkl" (calc_ate opens edge + ".pkl").
# Strip only the final extension so edge names containing '.' survive, and
# ignore any non-.pkl files that could not be loaded as DML models anyway.
dml_model_list = [
    os.path.splitext(name)[0]
    for name in os.listdir(os.path.join(model_file, "dml"))
    if name.endswith(".pkl")
]
# DeepIV entries are named exactly "<edge>" and passed to keras.models.load_model.
deepiv_model_list = os.listdir(os.path.join(model_file, "deepiv"))


In [27]:
def find_parents(node_name, edge_list):
    """Return the direct parents of `node_name` in a textual edge list.

    Each entry is expected to be a DOT-style edge statement such as
    ``"A -> B;"``. Surrounding whitespace and ``;`` are ignored, and a chain
    ``"A -> B -> C"`` is read as the two edges A->B and B->C. Entries without
    a ``" -> "`` separator (blank lines, ``digraph {`` headers, ``}``) are
    skipped instead of raising.

    Args:
        node_name: Node whose parents are requested.
        edge_list: Iterable of edge strings (e.g. lines of the graph file).

    Returns:
        List of parent names in the order their edges appear (duplicates kept).
    """
    parents = []
    for raw in edge_list:
        # Whitespace and ';' can appear in either order at the line end.
        statement = raw.strip().strip(';').strip()
        parts = [part.strip() for part in statement.split(' -> ')]
        if len(parts) < 2:
            continue  # not an edge statement
        for treatment, outcome in zip(parts, parts[1:]):
            if outcome == node_name:
                parents.append(treatment)
    return parents


def deepiv_ate(deepiv_model, X=None, T0=0, T1=1):
    """Per-row treatment contrast of a DeepIV response model.

    Evaluates ``deepiv_model.predict([T, X])`` at treatment ``T1`` and at
    treatment ``T0`` and returns their difference as a column vector.

    Args:
        deepiv_model: Object exposing ``predict([treatment, X])`` (a Keras
            model in this notebook).
        X: Covariate rows passed straight to ``predict``, or None. Only its
            first dimension is used here, to size scalar treatments.
        T0: Baseline treatment. A scalar is repeated once per row of X (or
            once if X is None); anything with ndim > 0 is used as given.
        T1: Target treatment, handled like T0.

    Returns:
        ``predict([T1, X]) - predict([T0, X])`` reshaped to ``(-1, 1)``.
    """
    def _per_row(treatment):
        # Arrays pass through untouched; scalars become one value per row.
        if np.ndim(treatment) != 0:
            return treatment
        n_rows = 1 if X is None else np.shape(X)[0]
        return np.repeat(treatment, n_rows)

    baseline = _per_row(T0)
    target = _per_row(T1)
    delta = deepiv_model.predict([target, X]) - deepiv_model.predict([baseline, X])
    return delta.reshape(-1, 1)


def for_estimation(node_name, value):
    """Standardise `value` against the training column `node_name`.

    Computes ``(value - mean) / std`` using ``train_df[node_name]``. Works for
    scalars and arrays alike.

    Args:
        node_name: Column of the global ``train_df`` supplying mean and std.
        value: Scalar or array-like in the column's original units.

    Returns:
        The z-scored value. When the training column has zero std, returns
        ``value - value`` (zeros of the same shape/type) instead of dividing
        by zero.
    """
    column = train_df[node_name]
    sigma = column.std()
    if sigma == 0:
        return value - value
    return (value - column.mean()) / sigma


def one_tailed_pValue(distribution, value):
    """One-tailed p-value of `value` under `distribution`.

    Returns the tail mass on the side of the median where `value` falls:
    ``1 - cdf`` above the median, ``cdf`` at or below it.

    Args:
        distribution: Object with a ``cdf`` method (e.g. a frozen
            ``scipy.stats`` distribution), or None.
        value: Point to evaluate.

    Returns:
        The tail probability. With no distribution, an exact 0 scores 0.5
        and anything else scores 0.
    """
    if distribution is None:
        return 0.5 if value == 0 else 0
    cdf = distribution.cdf(value)
    return 1 - cdf if cdf > 0.5 else cdf


# Deserialised models keyed by full path. Loading a Keras model or pickle on
# every calc_ate call dominated counter_factual's runtime. Keying by path means
# changing `model_file` never returns a model from the old directory. Re-run
# this cell to drop the cache after retraining models in place.
_ATE_MODEL_CACHE = {}


def _load_dml_config(path):
    """Load (and memoise) the pickled DML config dict stored at `path`."""
    if path not in _ATE_MODEL_CACHE:
        # NOTE: pickle.load can execute arbitrary code; only load trusted files.
        with open(path, 'rb') as fin:
            _ATE_MODEL_CACHE[path] = pkl.load(fin)
    return _ATE_MODEL_CACHE[path]


def _load_deepiv_model(path):
    """Load (and memoise) the Keras DeepIV model saved at `path`."""
    if path not in _ATE_MODEL_CACHE:
        _ATE_MODEL_CACHE[path] = keras.models.load_model(path)
    return _ATE_MODEL_CACHE[path]


def calc_ate(edge, t0, t1):
    """Average effect on the child of `edge` of moving its parent from t0 to t1.

    DML edges (listed in ``dml_model_list``) use the pickled ``est`` with the
    config's ``confounder`` columns from ``current_log`` as X. Each confounder
    is standardised with ``for_estimation``, like the treatments, so constant
    training columns yield 0 instead of a nan/inf from dividing by a zero std.
    DeepIV edges (``deepiv_model_list``) use the saved Keras model with a dummy
    ``X = zeros((1, 1))``.

    Args:
        edge: Edge name ``"<parent>-<child>"`` used to locate the model.
        t0: Standardised baseline treatment value.
        t1: Standardised treatment value to compare against.

    Returns:
        Mean of the estimated effect.

    Raises:
        KeyError: If no DML or DeepIV model exists for `edge`. Previously this
            surfaced as an UnboundLocalError on `ate`.
    """
    if edge in dml_model_list:
        dml_model_config = _load_dml_config(
            os.path.join(model_file, "dml", edge + ".pkl"))
        X_now = [for_estimation(c, current_log.get(c))
                 for c in dml_model_config['confounder']]
        X_now = np.array(X_now).reshape(1, -1)
        return dml_model_config['est'].ate(X=X_now, T0=t0, T1=t1).mean()

    if edge in deepiv_model_list:
        deepiv_model = _load_deepiv_model(
            os.path.join(model_file, "deepiv", edge))
        return deepiv_ate(deepiv_model, X=np.zeros((1, 1)), T0=t0,
                          T1=t1).mean()

    raise KeyError(
        f"No DML or DeepIV model found for edge '{edge}' under {model_file}")


# def final_effect(trace_list, treat_node, t0, t1):
#     alt_effects = []
#     for trace in trace_list:
#         if trace[0] == treat_node:
#             if trace[1] == start_node:
#                 return calc_ate(trace[0]+'-'+trace[1], t0, t1)
#             else:
#                 effect = calc_ate(trace[0]+'-'+trace[1], t0, t1)
#                 next_node = trace[1]
#                 if current_log.get(next_node) is None:
#                     alt_effects.append(0)
#                 else:
#                     t1_next = for_estimation(
#                         next_node, current_log.get(next_node))
#                     t0_next = t1_next-effect
#                     alt_effects.append(final_effect(
#                         trace_list, next_node, t0_next, t1_next))
#     # print(treat_node,alt_effects)
#     return sum(alt_effects)


def counter_factual(DG, sorted_nodes, treat_node_idx):
    """Effect on the last node of `sorted_nodes` of intervening on one node.

    The node ``sorted_nodes[treat_node_idx]`` is moved from a baseline value
    to its value in ``current_log``. The baseline is its mean over
    ``normal_df`` if it is a column there, else 0; both ends are standardised
    with ``for_estimation``. The per-edge effects (``calc_ate``) are then
    pushed through the nodes that follow it in ``sorted_nodes``, excluding the
    last one:
      * a node missing from ``current_log`` passes 0 to all its out-edges;
      * an observed node uses treatment range ``[t1 - incoming, t1]``, where
        ``incoming`` sums the effects already stored on its in-edges.

    Args:
        DG: networkx DiGraph; edges map to models as ``"<parent>-<child>"``.
        sorted_nodes: Nodes of DG, expected in topological order.
        treat_node_idx: Index of the intervened node in ``sorted_nodes``.

    Returns:
        Sum of the effects stored on the in-edges of ``sorted_nodes[-1]``;
        in-edges without a stored effect contribute 0.
    """
    def edge_name(src, dst):
        return src + '-' + dst

    treat_node = sorted_nodes[treat_node_idx]
    effects = {}

    # Standardised baseline (t0) and observed (t1) values of the intervened node.
    baseline = normal_df[treat_node].mean() if treat_node in normal_df.columns else 0
    t0 = for_estimation(treat_node, baseline)
    t1 = for_estimation(treat_node, current_log[treat_node])
    for child in DG.successors(treat_node):
        edge = edge_name(treat_node, child)
        effects[edge] = calc_ate(edge, t0, t1)

    for position in range(treat_node_idx + 1, len(sorted_nodes) - 1):
        node = sorted_nodes[position]
        observed = current_log.get(node)
        if observed is None:
            # Unobserved node: it transmits no effect downstream.
            for child in DG.successors(node):
                effects[edge_name(node, child)] = 0
            continue

        node_t1 = for_estimation(node, observed)
        incoming = 0
        for parent in DG.predecessors(node):
            upstream = effects.get(edge_name(parent, node))
            if upstream is not None:
                incoming += upstream
        node_t0 = node_t1 - incoming

        for child in DG.successors(node):
            edge = edge_name(node, child)
            effects[edge] = calc_ate(edge, node_t0, node_t1)

    sink = sorted_nodes[-1]
    total = 0
    for parent in DG.predecessors(sink):
        contribution = effects.get(edge_name(parent, sink))
        total += 0 if contribution is None else contribution
    return total


def _is_cusum_anomaly(node):
    """Return True if CUSUM flags a change in `node`'s series near `observed_time`.

    The ``test_df`` series of `node` is cut to rows with timestamp in
    [observed_time - 180, observed_time + 120] and min-max normalised.
    Thresholds 0.3, 0.4 and 0.5 are tried in that order. The first one at
    which samples [0:60] raise no CUSUM alarm is used, and the node is
    anomalous iff samples [120:180] raise an alarm at that threshold. If no
    threshold keeps [0:60] quiet, or the window is empty or constant, the node
    is not anomalous.
    """
    in_window = ((test_df["timestamp"] >= observed_time - 180) &
                 (test_df["timestamp"] <= observed_time + 120))
    # Previously this read test_df[start_node] for every node, so all nodes
    # got the start node's verdict.
    series = test_df[node][in_window]
    span = series.max() - series.min()
    if series.empty or not span > 0:
        # Nothing to normalise by (and nothing changed).
        return False
    # Slice a numpy array: Series[i:j] on an integer index is positional vs
    # label-ambiguous and deprecated in pandas.
    values = ((series - series.min()) / span).to_numpy()
    for step in range(3, 6):
        threshold = step / 10
        ref_alarms, _, _, _ = detect_cusum(values[0:60], threshold, 0, True, False)
        if ref_alarms.shape[0] == 0:
            test_alarms, _, _, _ = detect_cusum(values[120:180], threshold, 0, True, False)
            return test_alarms.shape[0] != 0
    return False


def dfs_cause(DG, node, visited):
    """
    This function reproduces CauseInfer.

    Depth-first search up the causal graph: `node` is marked visited, and if
    it is in ``current_log`` and its series is anomalous (CUSUM check, see
    ``_is_cusum_anomaly``), its unvisited predecessors are explored
    recursively.

    Args:
        DG: networkx DiGraph of the causal structure.
        node: Node to test and possibly expand.
        visited: Set of visited nodes; mutated in place.

    Returns:
        The same `visited` set, now including every node reached.
    """
    visited.add(node)
    if node in current_log and _is_cusum_anomaly(node):
        for predecessor in DG.predecessors(node):
            if predecessor not in visited:
                dfs_cause(DG, predecessor, visited)
    return visited


In [28]:
start_node = "MySQL_Query_Duration__None"

observed_time = 1653859785

# normal_start=1653679800
# normal_stop=1653681600
normal_start = 1653858600
normal_stop = 1653859500

# Reference ("normal") period: rows with normal_start <= timestamp < normal_stop.
normal_df = test_df.loc[(test_df['timestamp'] >= normal_start) & (test_df['timestamp'] < normal_stop)]
if normal_df.empty:
    # counter_factual uses normal_df means as baselines; an empty slice makes them NaN.
    raise ValueError(f"No rows in test_df with {normal_start} <= timestamp < {normal_stop}")

# Metric values at the observed timestamp as {column: value}; if several rows
# share that timestamp, the first one is used.
observed_rows = test_df[test_df["timestamp"] == observed_time]
if observed_rows.empty:
    raise ValueError(f"No row in test_df with timestamp == {observed_time}")
current_log = {k: v[0] for k, v in observed_rows.to_dict('list').items()}

In [29]:
if start_node not in nodes_list:
    # Everything below indexes by start_node: fail fast with a clear message
    # instead of printing and then hitting a KeyError.
    raise KeyError(f"Node not found: {start_node}")

print(f"Average value of normal: {normal_df[start_node].mean()}")
print(f"Anomaly value: {current_log[start_node]}")
# KDE of the standardised training distribution of start_node.
start_data = for_estimation(start_node, train_df[start_node].to_numpy())
# start_data = (start_data-start_data.min())/(start_data.max()-start_data.min())
# params = stats.norm.fit(start_data)
# node_distribution = stats.norm(*params)
node_distribution = stats.gaussian_kde(start_data)
# Density of the standardised observed value under that KDE.
fact_start = for_estimation(start_node, current_log[start_node])
fact_PDF = node_distribution.pdf(fact_start)
print("fact_PDF:", fact_PDF)

Average value of normal: 0.6645449529196077
Anomaly value: 0.4402866924732841
fact_PDF: [1.26705563]


In [30]:
# Breadth-first walk up the causal graph from start_node. visit_list holds the
# ancestors in discovery order; trace_list holds every traversed
# (parent, child) edge.
visit_list = [start_node]
seen_nodes = {start_node}  # O(1) membership instead of scanning visit_list
trace_list = []
causes = {}

# visit_list grows while we iterate, so it doubles as the BFS queue.
for current in visit_list:
    for parent in find_parents(current, origin_edges):
        if parent not in seen_nodes:
            seen_nodes.add(parent)
            visit_list.append(parent)
        trace_list.append((parent, current))

DG = nx.DiGraph()
DG.add_edges_from(trace_list)
# Make sure start_node exists even when it has no parents, so later
# DG.predecessors(start_node) calls do not raise. It is added after the edges,
# so it is a no-op (node order unchanged) in the usual case.
DG.add_node(start_node)
sorted_nodes = list(nx.topological_sort(DG))

In [31]:
# CauseInfer-style DFS from the anomalous start node.
cause_infer = dfs_cause(DG, start_node, set())
# Print which of the first six nodes in topological order were visited.
for node in sorted_nodes[:6]:
    if node in cause_infer:
        print(node)

In [32]:
# Counterfactual score for every node except the last in topological order
# (the node whose anomaly is being explained).
total_nodes = len(sorted_nodes)
for idx, candidate in enumerate(sorted_nodes[:-1]):
    effect = counter_factual(DG, sorted_nodes, idx)
    counter_start = fact_start - effect
    counter_PDF = node_distribution.pdf(counter_start)
    # (PDF gain from undoing this node's effect, propagated effect)
    causes[candidate] = (counter_PDF - fact_PDF, effect)
    if idx % 10 == 0:
        print(f"{idx}/{total_nodes} has been done")


0/18 has been done
10/18 has been done


In [33]:
# Rank candidates by PDF gain (counter_PDF - fact_PDF), largest first.
sorted_causes = dict(
    sorted(causes.items(), key=lambda entry: entry[1][0], reverse=True)
)
for cause_name, (pdf_gain, total_effect) in sorted_causes.items():
    print((pdf_gain, total_effect), cause_name)

(array([0.08213282]), -0.006238382309675217) System_Memory_Distribution__Free__None
(array([0.]), 0.0) MySQL_Query_Cache_Memory__Free_Memory__None
(array([0.]), 0.0) System_Swap_Activity__Swap_In_Reads__None
(array([0.]), 0.0) MySQL_Connections__Max_Used_Connections__None
(array([-0.13422876]), 0.01066964864730835) MySQL_Network_Usage_Hourly__Sent__None
(array([-0.14258019]), 0.011356402188539505) System_IO_Activity__Page_In__None
(array([-0.21049012]), 0.017057016491889954) MySQL_Network_Usage_Hourly__Received__None
(array([-0.71579642]), 0.07052215933799744) MySQL_Handlers__write
(array([-1.26636662]), -1.687682867050171) System_IO_Activity__Page_Out__None
(array([-1.26705563]), -54.988468170166016) System_CPU_Usage_Load__user
(array([-1.26705563]), 3.1834304332733154) MySQL_Transaction_Handlers__commit
(array([-1.26705563]), 3.0300304889678955) MySQL_Handlers__external_lock
(array([-1.26705563]), 2.9995107650756836) Current_QPS__None
(array([-1.26705563]), 2.9692225456237793) MySQL_

In [None]:
# Rank metrics by the KDE density of their current (standardised) value under
# their training data: the lowest density is the most anomalous.
TOP_K = 10
nodes_pdf = dict()
for node in nodes_list:
    if node == "timestamp" or node not in current_log:
        continue
    ref_data = for_estimation(node, train_df[node].to_numpy())
    if ref_data.std() == 0:
        # A constant training column has no spread for a KDE. Scoring it 0
        # used to rank every constant metric as most anomalous. Instead, flag
        # it only if the current value differs from the training constant
        # (up to floating-point tolerance).
        constant = train_df[node].iloc[0]
        nodes_pdf[node] = np.inf if np.isclose(current_log[node], constant) else 0
    else:
        temp_distribution = stats.gaussian_kde(ref_data)
        temp_value = for_estimation(node, current_log[node])
        nodes_pdf[node] = temp_distribution.pdf(temp_value)[0]

sorted_anomaly_nodes = sorted(nodes_pdf.items(), key=lambda x: x[1])
# Slice so that fewer than TOP_K metrics does not raise IndexError.
for node_name, _ in sorted_anomaly_nodes[:TOP_K]:
    print(node_name)

In [None]:
# Edges traversed while walking up from start_node, one "parent -> child" per line.
for parent, child in trace_list:
    print(f"{parent} -> {child}")

In [None]:
# Standardised training distribution of start_node. The histogram, the KDE
# used for scoring (node_distribution) and the observed value share one axes.
# Previously the histogram went to an implicit figure, and `ax` was then
# replaced, so the KDE was drawn on a separate figure.
fig, ax = plt.subplots(1, 1)
ax.hist(start_data, density=True, bins=50, alpha=0.5, label="training data")
npts_sample = int(1e4)
x = np.linspace(start_data.min(), start_data.max(), npts_sample)
kde_pdf = node_distribution.evaluate(x)
ax.plot(x, kde_pdf, label="KDE")
ax.axvline(fact_start, color="red", linestyle="--", label="observed")
ax.set(title=f"{start_node} (standardised)", xlabel="standardised value",
       ylabel="density")
ax.legend()
plt.show()
