In [None]:
from pathlib import Path
from functools import reduce
import os
from matplotlib import pyplot as plt
import pandas as pd
from typing import Union, Tuple, List, Dict
import csv
import numpy as np

In [None]:
# Font-size tiers shared by every figure drawn in this notebook.
SMALL_SIZE = 14
MEDIUM_SIZE = 16
BIGGER_SIZE = 18

# Push the tiers into matplotlib's global defaults in one update.
plt.rcParams.update({
    'font.size': MEDIUM_SIZE,         # default text size
    'axes.titlesize': MEDIUM_SIZE,    # axes title
    'axes.labelsize': MEDIUM_SIZE,    # x and y axis labels
    'xtick.labelsize': BIGGER_SIZE,   # x tick labels
    'ytick.labelsize': BIGGER_SIZE,   # y tick labels
    'legend.fontsize': MEDIUM_SIZE,   # legend entries
    'figure.titlesize': BIGGER_SIZE,  # figure-level title
})

In [None]:
# Identifier of the analysed system (matches a key of SYSTEM_SERVICES).
SYSTEM = 'sockshop'
# Root folder that the statistics functions walk to find trial folders.
DATA_FOLDER = Path("../AD/anomaly_detection_models")
# NOTE: despite the name this is a file-name suffix, not a folder; statistics are
# written to "<model>_statistics.csv" inside each trial folder.
RESULTS_FOLDER = "statistics.csv"

# File-name prefixes of the anomaly-detection models whose results are evaluated.
AD_MODELS = [
    "birch",
    "iforest",
    "knn",
    "svm",
]
# Keys under which aggregated anomaly-detection metrics are stored.
AD_METRICS = [
    'Precision',
    'Recall',
    'F-Score',
]
RCA_METRICS = [
    'level_1',
    'level_2',
    'level_3',
]

SOCKSHOP_SERVICES = [
    'front-end',
    'orders',
    'carts',
    'shipping',
    'catalogue',
    'payment',
    'user',
]
# Services of each supported system, keyed by system identifier.
SYSTEM_SERVICES = {'sockshop': SOCKSHOP_SERVICES}
DESCRIPTIVE_STATISTICS_FOLDER = Path("Descriptive statistics")

In [None]:
def get_folder_names(directory):
  """Return the names of the immediate sub-directories of `directory`.

  Entries are reported in the order `os.listdir` yields them; anything that is
  not a directory is left out.
  """
  subfolders = []
  for entry in os.listdir(directory):
    if os.path.isdir(os.path.join(directory, entry)):
      subfolders.append(entry)
  return subfolders

In [None]:
def get_energy_anomaly_gt_ranges(gt_file):
  """Split each energy ground-truth column into normal and anomalous ranges.

  Only columns whose name ends with "_energy_Anomaly" are processed. A value of
  0 marks a normal data point and 1 an anomalous one; any other value is ignored
  and therefore does not interrupt the segment that is currently open.

  Args:
    gt_file: ground-truth CSV, passed straight to `pd.read_csv`.

  Returns:
    A dict with the keys "normal_segments" and "anomalous_segments". Each maps a
    column name to a list of `(start, end)` index tuples, both ends inclusive.
  """
  gt_df = pd.read_csv(gt_file)
  gt_energy_metrics = {"normal_segments": {}, "anomalous_segments": {}}

  for col in gt_df.columns:
    if not col.endswith("_energy_Anomaly"):
      continue
    gt_energy_metrics["normal_segments"][col] = []
    gt_energy_metrics["anomalous_segments"][col] = []

    open_kind = None   # bucket of the segment currently being extended
    open_start = None  # index at which that segment began

    for index, value in gt_df[col].items():
      if value == 0:
        kind = "normal_segments"
      elif value == 1:
        kind = "anomalous_segments"
      else:
        continue

      if kind == open_kind:
        continue

      # The label flipped: close the previous segment just before this index.
      if open_kind is not None:
        gt_energy_metrics[open_kind][col].append((open_start, index - 1))
      open_kind, open_start = kind, index

    # Close the trailing segment on the last row. The end is inclusive like every
    # other segment, hence `len - 1` (the previous `len` overshot by one).
    if open_kind is not None:
      gt_energy_metrics[open_kind][col].append((open_start, len(gt_df[col]) - 1))

  return gt_energy_metrics

In [None]:
def generate_model_stats(results_path, gt_ranges, ad_results):
  """Write per-column detection counts of one model to a CSV file.

  Every column of `ad_results` whose name ends with "_Anomaly" is compared with
  the ground-truth segments of "<prefix>_energy_Anomaly", where <prefix> is the
  part of the column name before the first underscore. Counting is done per
  ground-truth segment, not per data point:

  * a normal segment containing at least one flagged (non-zero) value is one
    false positive;
  * an anomalous segment containing at least one flagged value is one true
    positive, otherwise it is one false negative.

  Args:
    results_path: destination CSV; missing parent directories are created.
    gt_ranges: dict with "normal_segments" and "anomalous_segments", each mapping
      a ground-truth column name to a list of inclusive `(start, end)` tuples.
    ad_results: DataFrame holding the model's per-data-point anomaly flags.

  Raises:
    KeyError: if `gt_ranges` has no entry for a required ground-truth column.
  """
  # `dirname` is empty for a bare file name; os.makedirs('') would raise.
  directory = os.path.dirname(results_path)
  if directory:
    os.makedirs(directory, exist_ok=True)
  print(results_path)

  with open(results_path, mode='w', newline='') as csv_file:
    writer = csv.writer(csv_file)
    writer.writerow(["service_name", "true_positives", "false_positives", "false_negatives"])

    for col in ad_results.columns:
      if not col.endswith("_Anomaly"):
        continue

      gt_col = col.split("_")[0] + "_energy_Anomaly"
      predictions = ad_results[col]

      # One flagged value inside a normal segment is enough for a false positive.
      # (The former `not values.all() == 0` only fired when *every* value was flagged.)
      false_positives = sum(
        1
        for start, end in gt_ranges["normal_segments"][gt_col]
        if predictions.iloc[start:end + 1].any()
      )

      true_positives = 0
      false_negatives = 0
      for start, end in gt_ranges["anomalous_segments"][gt_col]:
        if predictions.iloc[start:end + 1].any():
          true_positives += 1
        else:
          false_negatives += 1

      # The first field is the full result column name, not just the service prefix.
      writer.writerow([col, true_positives, false_positives, false_negatives])

In [None]:
def calculate_precision(tp, fp):
  """Return tp / (tp + fp) rounded to 3 decimals, or 0 when tp + fp is 0."""
  predicted_positives = tp + fp
  if predicted_positives == 0:
    return 0
  return round(tp / predicted_positives, 3)


def calculate_recall(tp, fn):
  """Return tp / (tp + fn) rounded to 3 decimals, or 0 when tp + fn is 0."""
  actual_positives = tp + fn
  if actual_positives == 0:
    return 0
  return round(tp / actual_positives, 3)


def calculate_fscore(precision, recall):
  """Return the F-score 2 * p * r / (p + r) rounded to 3 decimals, or 0 when p + r is 0."""
  denominator = precision + recall
  if denominator == 0:
    return 0
  return round(2 * ((precision * recall) / denominator), 3)

In [None]:
def generate_performance_metrics(results_folder):
  """Add precision, recall and f1score columns to each model statistics CSV.

  Every "*.csv" file in `results_folder` whose name contains "statistics" and
  whose prefix (the text before the first underscore) is listed in AD_MODELS is
  read, extended with the columns "precision", "recall" and "f1score" computed
  from "true_positives", "false_positives" and "false_negatives", and written
  back in place.

  Args:
    results_folder: folder holding the statistics files; created if missing.
  """
  os.makedirs(results_folder, exist_ok=True)

  for file_name in os.listdir(results_folder):
    # Filter on the bare file name. Splitting the joined path on '\\' only strips
    # the directory on Windows, so elsewhere the model prefix was never recognised.
    if not file_name.endswith('.csv') or 'statistics' not in file_name:
      continue
    model = file_name.split('_')[0]
    if model not in AD_MODELS:
      continue

    file = os.path.join(results_folder, file_name)
    df = pd.read_csv(file)

    true_pos = df['true_positives'].to_numpy()
    false_pos = df['false_positives'].to_numpy()
    false_neg = df['false_negatives'].to_numpy()

    precisions = [calculate_precision(tp, fp) for tp, fp in zip(true_pos, false_pos)]
    recalls = [calculate_recall(tp, fn) for tp, fn in zip(true_pos, false_neg)]
    fscores = [calculate_fscore(p, r) for p, r in zip(precisions, recalls)]

    # Force float columns so an all-zero metric is still written as "0.0",
    # exactly as the former row-by-row assignment produced.
    df['precision'] = pd.Series(precisions, index=df.index, dtype=float)
    df['recall'] = pd.Series(recalls, index=df.index, dtype=float)
    df['f1score'] = pd.Series(fscores, index=df.index, dtype=float)

    df.to_csv(file, index=False)

In [None]:
def get_stats_ad(trial_folder):
  """Generate the statistics files of every anomaly-detection model for one trial.

  For each model in AD_MODELS, "<model>_results.csv" is compared with the
  segments derived from "ground_truth.csv" and the counts are written to
  "<model>_<RESULTS_FOLDER>". Afterwards precision, recall and F-score are added
  to those files. All files live in `trial_folder`.

  Args:
    trial_folder: folder of a single trial.
  """
  # The ground truth is identical for every model, so segment it only once.
  gt_ranges = get_energy_anomaly_gt_ranges(os.path.join(trial_folder, 'ground_truth.csv'))

  for ad_model in AD_MODELS:
    ad_file = os.path.join(trial_folder, f'{ad_model}_results.csv')
    ad_results = pd.read_csv(ad_file)
    results_file = os.path.join(trial_folder, f'{ad_model}_{RESULTS_FOLDER}')
    generate_model_stats(results_file, gt_ranges, ad_results)

  generate_performance_metrics(trial_folder)

In [None]:
def get_stats():
  """Run `get_stats_ad` on every trial folder below DATA_FOLDER.

  Trial folders sit six directory levels below DATA_FOLDER:
  scenario / stressor / user / scenario / time window / trial.
  """
  trial_depth = 6

  def _descend(folder, levels_left):
    # A folder reached after `trial_depth` steps is a trial folder.
    if levels_left == 0:
      get_stats_ad(folder)
      return
    for name in get_folder_names(folder):
      _descend(os.path.join(folder, name), levels_left - 1)

  _descend(DATA_FOLDER, trial_depth)
                  
                  

In [None]:
def get_overall_AD_and_RCA_data(services, ad_stats, rca_stats):
  """Collect the anomaly-detection metrics of all trials, grouped by time window.

  The folders DATA_FOLDER/<service>/<stressor>/<user>/<scenario>/<time>/<trial>
  are walked for every entry of `services`. In each trial folder, every file
  whose name contains "statistics" and whose prefix (text before the first
  underscore) is in AD_MODELS is read, and its "precision", "recall" and
  "f1score" columns are appended to
  `ad_stats[time][model]["Precision" | "Recall" | "F-Score"]`.

  Args:
    services: names of the service folders directly below DATA_FOLDER.
    ad_stats: dict that is updated in place and also returned.
    rca_stats: dict updated in place; it only receives an empty dict per time
      window here.

  Returns:
    `ad_stats`, mapping time window -> model -> metric name -> numpy array.
  """
  # (key in ad_stats, column in the statistics CSV)
  metric_columns = (("Precision", "precision"), ("Recall", "recall"), ("F-Score", "f1score"))

  for service in services:
    service_folder = os.path.join(DATA_FOLDER, service)

    for stressor in get_folder_names(service_folder):
      stressor_folder = os.path.join(service_folder, stressor)

      for user in get_folder_names(stressor_folder):
        user_folder = os.path.join(stressor_folder, user)

        for scenario in get_folder_names(user_folder):
          scenario_folder = os.path.join(user_folder, scenario)

          for time in get_folder_names(scenario_folder):
            rca_stats.setdefault(time, {})
            time_stats = ad_stats.setdefault(time, {})
            time_folder = os.path.join(scenario_folder, time)

            for trial in get_folder_names(time_folder):
              trial_folder = os.path.join(time_folder, trial)

              for file in os.listdir(trial_folder):
                if 'statistics' not in file:
                  continue
                model = file.split('_')[0]
                if model not in AD_MODELS:
                  continue

                model_df = pd.read_csv(os.path.join(trial_folder, file))
                # Read all columns first so a missing one fails before any update.
                new_values = {key: model_df[column].values for key, column in metric_columns}

                # Create the per-model bucket only once. The former check tested
                # `stressor`, which reset the bucket on every file and discarded
                # everything accumulated so far.
                model_stats = time_stats.setdefault(model, {})
                for key, values in new_values.items():
                  if key in model_stats:
                    model_stats[key] = np.concatenate((model_stats[key], values), axis=None)
                  else:
                    model_stats[key] = values

  return ad_stats