In [None]:
import warnings
import sys
import os
# Silence every warning for this kernel unless warning options were already
# supplied to the interpreter (sys.warnoptions is non-empty in that case).
if not sys.warnoptions:
    warnings.simplefilter("ignore")
    os.environ["PYTHONWARNINGS"] = "ignore" # Also affect subprocesses

import pandas as pd
import numpy as np
import time
from sklearn.linear_model import ElasticNetCV, LassoCV, LinearRegression
from collections import OrderedDict
from trainer import bayesFS, my_lasso_train, my_enet_train, my_ard_train, fsMTS_train
import matplotlib.pyplot as plt
%matplotlib inline
import pickle

## Get ground truth first

In [None]:
# Root directory of the local Exathlon checkout. Set the EXATHLON_PATH
# environment variable to point somewhere else; the historical location is kept
# as the default. os.path.join(..., "") guarantees a trailing separator because
# the rest of the notebook builds file names with `path_to_data + "data/..."`.
path_to_data = os.path.join(
    os.environ.get("EXATHLON_PATH", "/Users/chency/PythonProjects/exathlon/"),
    "",
)

# One row per labelled anomaly; the root-cause bounds are read as integers and
# empty fields are kept as empty strings instead of NaN (na_filter=False).
truth = pd.read_csv(
    path_to_data + "data/raw/ground_truth.csv",
    dtype={"root_cause_start": int, "root_cause_end": int},
    na_filter=False,
)
truth

## Load cases
Each case contains a ground-truth label and the multivariate time series from its corresponding trace.

In [None]:
if os.path.exists('cases.pickle'):
    # Load the cached cases.
    # NOTE(security): pickle.load can execute arbitrary code; only load a
    # cases.pickle that was produced by this notebook.
    with open('cases.pickle', 'rb') as f:
        cases = pickle.load(f)
else:
    # No cache yet: build the cases from the raw Exathlon data.
    cases = []
    for i, case in truth.iterrows():
        start = case['root_cause_start']
        end = case['root_cause_end']
        trace_name = case['trace_name']
        # The application id is the part of the trace name before the first '_'.
        application = trace_name.split('_')[0]

        # Each case is the ground-truth row plus minute-floored bounds and metrics.
        tmp = case.to_dict()
        tmp['start_min'] = pd.to_datetime(start, unit='s').floor('min')
        tmp['end_min'] = pd.to_datetime(end, unit='s').floor('min')

        metrics = pd.read_csv(path_to_data + "data/raw/app{}/{}.csv".format(application, trace_name))
        # Keep the 10800 seconds (3 hours) that end at the root-cause end.
        # .copy() makes the filtered frame independent, so adding the 'time'
        # column below is not a chained assignment on a slice.
        metrics = metrics[(metrics.t <= end) & (metrics.t > end - 10800)].copy()
        metrics['time'] = pd.to_datetime(metrics['t'], unit='s')
        # Resample to one row per minute by averaging.
        metrics = metrics.groupby(pd.Grouper(key='time', freq='1min')).mean()
        tmp['metrics'] = metrics
        cases.append(tmp)

    # Cache the result so later runs skip the raw-data pass.
    with open('cases.pickle', 'wb') as f:
        pickle.dump(cases, f)


In [None]:
def preprocessing(case: dict, diff_columns, pre='30min', after='1min'):
    """Clip a case's metrics around the anomaly start and standardize them.

    Parameters
    ----------
    case : dict
        Must provide 'start_min' and 'end_min' (timestamps) and 'metrics'
        (a DataFrame with a datetime index and a 't' column). The frame stored
        in the case is not modified.
    diff_columns : iterable of str
        Columns whose name ends with one of these suffixes are replaced by
        their first difference (the leading NaN is back-filled).
    pre, after : str
        Offsets accepted by ``pd.Timedelta``; the kept window is
        ``[start_min - pre, start_min + after]``.

    Returns
    -------
    (norm_metrics, start_min, end_min)
        The z-scored window and the minute-floored case bounds.
    """
    start_min = case['start_min'].floor('min')
    end_min = case['end_min'].floor('min')
    print("start minute: {}, end minute: {}".format(start_min, end_min))

    # Clip the [start_min - pre, start_min + after] window, without the raw 't' column.
    window_start = start_min - pd.Timedelta(pre)
    window_end = start_min + pd.Timedelta(after)
    metrics = case['metrics'].drop('t', axis=1)[window_start:window_end]

    # Drop columns that are entirely NaN inside the window. The explicit copy
    # makes the column assignments below write to an independent frame rather
    # than to a slice of another one.
    metrics = metrics.loc[:, metrics.isna().sum() != len(metrics)].copy()

    # Difference the columns selected by suffix.
    diff_suffixes = tuple(diff_columns)
    for col in metrics.columns:
        if col.endswith(diff_suffixes):
            metrics[col] = metrics[col].diff().bfill()

    # Standardize; constant columns (std == 0) are divided by 1 instead of 0.
    m = metrics.mean(axis=0)
    std = metrics.std()
    std[std == 0] = 1
    norm_metrics = (metrics - m) / std

    # Alternatives that were tried and left disabled:
    #   - mean/std computed only on the part of the window before start_min
    #   - min-max scaling instead of z-scoring

    return norm_metrics, start_min, end_min



In [None]:
def detrending(metrics: pd.DataFrame, start_min):
    """Remove a linear trend that is fitted on the rows before ``start_min``.

    A linear regression on the row position is fitted, per column, to the rows
    up to ``start_min - 1min``; its prediction over every row of ``metrics`` is
    then subtracted from ``metrics``.
    """
    baseline = metrics[:start_min - pd.Timedelta('1min')]
    baseline_steps = np.arange(len(baseline)).reshape(-1, 1)
    all_steps = np.arange(len(metrics)).reshape(-1, 1)

    fitted = LinearRegression(n_jobs=4).fit(baseline_steps, baseline)
    return metrics - fitted.predict(all_steps)



In [None]:
def z_score_anomlay_detection(detrend_metrics: pd.DataFrame, start_min, end_min, y_pool, ignored_x, thd=3, percent=0.5):
    """Select the anomalous columns and choose the target (y) column.

    A column is anomalous when at least ``percent`` of its rows inside
    ``[start_min, end_min]`` have an absolute value >= ``thd``.

    Parameters
    ----------
    detrend_metrics : pd.DataFrame
        Input frame; it is not modified.
    start_min, end_min
        Bounds of the row slice used to test for anomalies.
    y_pool : sequence of str
        Column-name suffixes of the candidate targets, in priority order.
        The first suffix matching an anomalous column selects y; if none
        matches, the column of ``detrend_metrics`` ending with ``y_pool[0]``
        is used.
    ignored_x : iterable of str
        Columns whose name contains one of these substrings are removed.
    thd, percent
        Threshold on the absolute value and minimal share of rows over it.

    Returns
    -------
    (selected, y_col)
        ``selected`` holds the remaining anomalous columns plus ``y_col``.

    Raises
    ------
    KeyError
        If no column can be chosen as y.
    """
    def endswith_in(suffix, columns):
        # First column whose name ends with `suffix`, or None.
        for c in columns:
            if c.endswith(suffix):
                return c
        return None

    window = detrend_metrics[start_min: end_min]
    exceed_ratio = (window.abs() >= thd).sum(axis=0) / len(window)
    anomalous = detrend_metrics.loc[:, exceed_ratio >= percent]

    # Default target: the first pool entry, looked up in the full frame.
    y_col = endswith_in(y_pool[0], detrend_metrics.columns)
    flag = -1
    for i, suffix in enumerate(y_pool):
        match = endswith_in(suffix, anomalous.columns)
        if match:
            flag = i
            y_col = match
            print("{} is selected as y".format(y_col))
            break
    if not y_col:
        raise KeyError("no column ends with any suffix of y_pool: {}".format(list(y_pool)))

    # Work on a list of names instead of dropping in place from a derived frame.
    kept = list(anomalous.columns)

    # Remove the lower-priority target candidates (every pool entry when none
    # of them was anomalous); they must not be used as explanatory columns.
    for suffix in y_pool[flag + 1:]:
        match = endswith_in(suffix, kept)
        if match:
            kept = [c for c in kept if c != match]

    # Remove the columns matching an ignored substring.
    kept = [c for c in kept if not any(pattern in c for pattern in ignored_x)]

    selected = anomalous.loc[:, kept].copy()
    # Make sure the target is present; this (re)adds it when it was not
    # anomalous or was removed by one of the filters above.
    selected[y_col] = detrend_metrics[y_col]

    return selected, y_col

In [None]:
# Suffixes of the candidate target (y) metrics, in priority order: the first
# one found among the anomalous columns is used as y.
y_column_pool = [
    'StreamingMetrics_streaming_lastCompletedBatch_processingDelay_value',
    'StreamingMetrics_streaming_lastCompletedBatch_schedulingDelay_value',
    'StreamingMetrics_streaming_lastCompletedBatch_totalDelay_value',
]

# Columns whose name ends with one of these suffixes are first-differenced
# during preprocessing.
diff_columns_contains = [
    "StreamingMetrics_streaming_totalProcessedRecords_value",
    "StreamingMetrics_streaming_totalReceivedRecords_value",
    'driver_BlockManager_memory_memUsed_MB_value',
]

# Columns whose name contains one of these substrings are never used as
# explanatory variables.
ignored_contains = [
    'Idle%',
    'Sys%',
    'Wait%',
    'driver_LiveListenerBus',
    'driver_DAGScheduler_messageProcessingTime',
    "CodeGenerator",
    "driver_DAGScheduler",
    'StreamingMetrics_streaming_runningBatches_value',
    'StreamingMetrics_streaming_unprocessedBatches_value',
    'StreamingMetrics_streaming_waitingBatches_value',
    'StreamingMetrics_streaming_retainedCompletedBatches_value',
    'StreamingMetrics_streaming_totalCompletedBatches_value',
    'StreamingMetrics_streaming_lastReceivedBatch_processingEndTime_value',
    'StreamingMetrics_streaming_lastReceivedBatch_processingStartTime_value',
    # 'driver_DAGScheduler_stage_waitingStages_value',
    # 'driver_DAGScheduler_stage_runningStages_value',
    'driver_BlockManager_memory_remainingMem_MB_value',
    'driver_BlockManager_memory_remainingOnHeapMem_MB_value',
    # 'driver_BlockManager_memory_memUsed_MB_value',
    'driver_BlockManager_memory_onHeapMemUsed_MB_value',
]

In [None]:
def evaluate_cases(cases, trace_type='all', method='BMFS', topK=5, save_xy=False):
    """Rank candidate root-cause metrics for every eligible case and score the diagnosis.

    For each case that passes the filters the metrics are preprocessed,
    detrended and reduced to the anomalous columns; a sparse regression of the
    target column on the others is fitted with ``method``; the ``topK`` columns
    with the largest absolute attribution are matched against simple rules to
    produce a conclusion, which counts as a hit when it is contained in the
    case's ``anomaly_type``.

    Parameters
    ----------
    cases : iterable of dict
        Each case needs 'trace_type', 'anomaly_type', 'anomaly_details',
        'trace_name' and whatever ``preprocessing`` reads.
    trace_type : str
        'all', or a substring that a case's 'trace_type' must contain.
    method : str
        One of 'BMFS', 'ARD', 'E-Net', 'Lasso', 'fsMTS'.
    topK : int
        Number of top-ranked columns handed to the rule matching.
    save_xy : bool
        When True, write X and y of every case to CSV files under
        ``detrend_normalized_anomaly_data/``.

    Returns
    -------
    tuple
        ``(hits, misses, accuracy, seconds_per_case, recommended_columns_per_case)``.
        When no case passes the filters the three averages are NaN instead of
        raising ``ZeroDivisionError``.

    Raises
    ------
    ValueError
        If ``method`` is not supported (checked before any case is processed).
    """
    supported_methods = ('BMFS', 'ARD', 'E-Net', 'Lasso', 'fsMTS')
    if method not in supported_methods:
        raise ValueError("method should be one of: {}".format(", ".join(supported_methods)))

    available_num = 0
    hits = 0
    rec_num = 0
    t0 = time.time()

    for i, case in enumerate(cases):

        # Skip the cases that are excluded from the evaluation.
        if case['trace_type'] == 'process_failure' or case['anomaly_type'] == 'unknown' or case['anomaly_details'] == 'no_application_impact':
            continue
        if trace_type != 'all' and trace_type not in case['trace_type']:
            continue
        print("case index", i)
        print("trace_name: {}, anomaly_type: {}, anomaly_details: {}".format(case['trace_name'], case['anomaly_type'], case['anomaly_details']))
        available_num += 1

        # preprocessing
        norm_m, start_min, end_min = preprocessing(case, diff_columns_contains, pre='60min', after='5min')
        detrend_m = detrending(norm_m, start_min=start_min)
        detrend_m_zscore, y_col = z_score_anomlay_detection(detrend_m, start_min, end_min, y_column_pool, ignored_contains, thd=3, percent=0.5)

        # X, y
        print(y_col)
        X: pd.DataFrame = detrend_m_zscore.drop(y_col, axis=1)
        y: pd.Series = detrend_m_zscore[y_col]
        print("X shape:", X.shape)
        print("y shape:", y.shape)

        # save X,y data
        if save_xy:
            X.to_csv("detrend_normalized_anomaly_data/case{}_X.csv".format(i), index=True)
            detrend_m_zscore[[y_col]].to_csv("detrend_normalized_anomaly_data/case{}_Y.csv".format(i), index=True)

        # Fit the selected model; only its coefficients are used.
        if method == 'BMFS':
            beta_est = bayesFS(X, y, positive=False, tol=1e-2, tol_ll=1e-3).coef_
        elif method == 'ARD':
            beta_est = my_ard_train(X, y, positive=False).coef_
        elif method == 'E-Net':
            beta_est = my_enet_train(X, y, pos=False).coef_
        elif method == 'Lasso':
            beta_est = my_lasso_train(X, y, pos=False).coef_
        elif method == 'fsMTS':
            beta_est = fsMTS_train(X, y).coef_

        # Attribution of each column with a non-zero coefficient:
        #   coef * (last value of x - mean of x up to start_min)
        #        / (last value of y - mean of y)
        mask = np.abs(beta_est) > 0
        scale = 1 / (y.values[-1] - np.mean(y.values))
        deviation = (X.loc[:, mask].iloc[-1] - X.loc[:start_min, mask].mean()).values
        attributions = scale * beta_est[mask] * deviation
        result = sorted(
            [(rc, coef, attribution)
             for rc, coef, attribution in zip(X.columns[mask], beta_est[mask], attributions)
             if abs(attribution) > 0],
            key=lambda x: abs(x[2]), reverse=True)

        result_cols = [rc for rc, _, _ in result[:topK]]
        print([(rc, att) for rc, _, att in result[:topK]])
        rec_num += len(result_cols)

        # root cause matching
        cpu_user_num = {}
        task_num = 0
        flow_num = 0
        flow_direction = 1

        for rc in result_cols:
            if 'CPU' in rc:
                # Count the CPU metrics per node (the name prefix before the first '_').
                node = rc.split('_')[0]
                cpu_user_num[node] = cpu_user_num.get(node, 0) + 1
            elif 'StreamingMetrics_streaming_lastReceivedBatch_records_value' in rc or 'StreamingMetrics_streaming_totalProcessedRecords_value' in rc or\
                    "StreamingMetrics_streaming_totalReceivedRecords_value" in rc or 'driver_BlockManager_memory_memUsed_MB_value' in rc:
                flow_num += 1
                # The sign of the last observation decides between a burst and a stall.
                if X.iloc[-1][rc] < 0:
                    flow_direction = -1
                else:
                    flow_direction = 1
            elif "executor_threadpool_activeTasks_value" in rc:
                task_num += 1

        if flow_num > 0:
            if flow_direction == 1:
                conclusion = 'bursty_input'
                print("Conclusion: bursty_input from input records")
            else:
                conclusion = 'stalled_input'
                print("Conclusion: stalled_input from input records")
        elif len(cpu_user_num) == 1:
            # All CPU metrics in the top-K belong to a single node.
            node = next(iter(cpu_user_num))
            conclusion = 'cpu_contention'
            print("Conclusion: cpu contention from {}".format(node))
        elif task_num > 0:
            conclusion = 'bursty_input'
            print("Conclusion: bursty_input from active tasks and no cpu concentration")
        else:
            conclusion = 'unknown'
            print("Conclusion: unknown")

        if conclusion in case['anomaly_type']:
            hits += 1
            print("hits: {}".format(conclusion))

        print()

    if available_num == 0:
        # Nothing matched the filters: the averages are undefined.
        nan = float('nan')
        return 0, 0, nan, nan, nan

    return hits, available_num - hits, round(hits / available_num, 4), round((time.time() - t0) / available_num, 2), round(rec_num / available_num, 1)

In [None]:
def results2table(results):
    """Print the results as LaTeX-style table rows.

    ``results`` maps a trace type to a mapping from method name to a sequence
    holding at least five performance figures. For each trace type its name is
    printed, then one row per method with the cells separated by a tab
    followed by '&', then an empty line.
    """
    cell_separator = "\t &"
    for trace_type, per_method in results.items():
        print(trace_type)
        for method, perfs in per_method.items():
            cells = [format(method)] + [format(perfs[k]) for k in range(5)]
            print(cell_separator.join(cells))
        print()

In [None]:
# Evaluate BMFS on every eligible case ('all' trace types) and print the table.
# `results` maps trace type -> {method: tuple returned by evaluate_cases}.
results = {}
# for trace_type in ['bursty_input', 'stalled_input', 'cpu_contention']:
for trace_type in ['all']:
    result = {}
    for method in ['BMFS']:
        res = evaluate_cases(cases, trace_type=trace_type, method=method)
        result[method] = res
    results[trace_type] = result

results2table(results)

In [None]:
# Print the table again for whatever `results` currently holds.
results2table(results)

In [None]:
# Evaluate fsMTS per trace type and on all traces; this rebinds `results`,
# so the output of the previous evaluation cell is discarded.
results = {}
for trace_type in ['bursty_input', 'stalled_input', 'cpu_contention', 'all']:
# for trace_type in ['bursty_input']:
    result = {}
    for method in ['fsMTS']:
        res = evaluate_cases(cases, trace_type=trace_type, method=method)
        result[method] = res
    results[trace_type] = result

results2table(results)

In [None]:
# Evaluate E-Net per trace type and on all traces; this rebinds `results`,
# so the output of the previous evaluation cell is discarded.
results = {}
for trace_type in ['bursty_input', 'stalled_input', 'cpu_contention', 'all']:

    result = {}
    for method in ['E-Net']:
        res = evaluate_cases(cases, trace_type=trace_type, method=method)
        result[method] = res
    results[trace_type] = result
results2table(results)

In [None]:
# Print the table again for whatever `results` currently holds.
results2table(results)

In [None]:
# Show the raw nested `results` dict (trace type -> method -> result tuple).
import pprint
pprint.pprint(results)

In [None]:
# Display `results` as a DataFrame built directly from the nested dict.
pd.DataFrame(results)

In [None]:
# Load a JSON results file from the working directory and display it as a
# DataFrame. The file must already exist; nothing visible in this notebook writes it.
import json
with open('res_top5_bmfs_ranking_fsmts', 'r') as f:
    df = pd.DataFrame(json.load(f))
df