In [2]:
import os
import pandas as pd
from sklearn.metrics import confusion_matrix, classification_report
import matplotlib.pyplot as plt
import seaborn as sns

In [10]:
# Path to your dataset folder
dataset_folder = "./dataset/A4Benchmark"

# Functions for anomaly detection and evaluation
def sliding_window_std(df, column_name, window_size, threshold):
    results = []
    problem_vals = []  # List to store values that exceed the threshold

    for i in range(len(df)):
        # Calculate the window boundaries
        start_index = max(0, i - window_size // 2)
        end_index = min(len(df), i + window_size // 2 + 1)
        
        # Get the numbers within the window
        window = df[column_name].iloc[start_index:end_index]
        
        # Calculate the standard deviation of the window
        std_dev = window.std()
        
        # Append the result to the results list
        # result list conations 3 things (number, window,std of the window)
        results.append((i, df.iloc[i][column_name], window.tolist(), std_dev))
        # print(results[i],'\n')

        if len(results) <= 2:
            # print("Skipping iteration#", i)
            # print("##################################################")
            continue
            
        if len(results) >= 3:
            # print("val1:",results[-3][3])
            # print("val2:",results[-2][3])
            # print("current std:",std_dev)
            # print("current iteration:", i)
            # print("##################################################")

            avg_prev_results = (results[-2][3] + results[-3][3]) / 2
            threshold_value = avg_prev_results + threshold

            if std_dev > threshold_value:
                problem_vals.append((i - window_size // 2, window.iloc[-1]))

            # print('Problem list:', problem_vals)
    return results, problem_vals

def actual_anomaly(df):
    anomaly_rows = []

    # Iterate over each row in the DataFrame
    for i, row in df.iterrows():
        if row['anomaly'] == 1:
            # Append a tuple containing the row number and the value from the 'anomaly' column to the result list
            anomaly_rows.append((i, row['value']))
    # print(anomaly_rows)
    return anomaly_rows

def find_normal_values(original_lst, anomaly_lst):
    normal_values = []
    for num in original_lst:
        is_anomaly = False
        for win_num, value in anomaly_lst:
            if num == value:
                is_anomaly = True
                break
        if not is_anomaly:
            normal_values.append(num)
    return normal_values

def finding_row_number(original_lst, anomaly_lst):
    rows = []
    i = 1
    for num in original_lst:
        i +=1
        for win_num, value in anomaly_lst:
            if num == value:
                rows.append((i,num))
    return rows

def categorize_points(original_lst, anomaly_lst):
    normal = []
    seasonal = []
    anomalies = []

    for i, num in enumerate(original_lst):
        if num not in (value for _, value in anomaly_lst):
            normal.append((i, num))
        elif i % 12 == 0 or i % 168 ==0:
            seasonal.append((i, num))
        else:
            anomalies.append((i, num))

    return normal, seasonal, anomalies

def evaluate_anomaly_detection(true_labels, predicted_labels):
    # Generate confusion matrix
    cm = confusion_matrix(true_labels, predicted_labels)

    # Extract true positives (TP) and true negatives (TN)
    TP = cm[1, 1]  # Actual positive (1) and predicted positive (1)
    TN = cm[0, 0]  # Actual negative (0) and predicted negative (0)
    FP = cm[0, 1]  # Actual negative (0) but predicted positive (1)
    FN = cm[1, 0]  # Actual positive (1) but predicted negative (0)

    # # Output true positives (TP), true negatives (TN), false positives (FP), and false negatives (FN)
    # print("True Positives (TP):", TP)
    # print("True Negatives (TN):", TN)
    # print("False Positives (FP):", FP)
    # print("False Negatives (FN):", FN)

    # # Generate classification report
    # report = classification_report(true_labels, predicted_labels)
    # print("\nClassification Report:\n", report)

    # Calculate Accuracy
    accuracy = (TP + TN) / (TP + TN + FP + FN)

    # Calculate Precision
    precision = TP / (TP + FP)

    # Calculate Recall
    recall = TP / (TP + FN)

    # Calculate F1-score
    f1_score = 2 * (precision * recall) / (precision + recall)

    print("Accuracy:", accuracy)
    print("Precision:", precision)
    print("Recall:", recall)
    print("F1-score:", f1_score)

    return accuracy

# Function to apply your model and calculate accuracy
def apply_model_and_calculate_accuracy(df, column_name, window_size, threshold):
    std_devs, anomalies = sliding_window_std(df, column_name, window_size, threshold)
    actual = actual_anomaly(df)
    normal_values = find_normal_values(df[column_name], anomalies)
    
    anomaly_row_list = finding_row_number(df[column_name], anomalies)
    normal_list, seasonal_anomalies, other_anomalies = categorize_points(df['value'], anomaly_row_list)
    
    # Initialize and mark predicted anomalies
    df['predicted_anomalies'] = 0
    for anomaly_index, _ in other_anomalies:
        df.at[anomaly_index, 'predicted_anomalies'] = 1
    
    # Evaluate anomaly detection and return accuracy
    accuracy = evaluate_anomaly_detection(df['anomaly'], df['predicted_anomalies'])
    return accuracy

# Dictionary to store accuracy results
accuracy_results = {}

# Specify the column name containing the data
column_name = 'value'

window_size = 16
threshold = 19  # Adjust threshold as needed

# Iterate through each file in the directory
for filename in sorted(os.listdir(dataset_folder)):
    if filename.endswith(".csv"):
        file_path = os.path.join(dataset_folder, filename)
        # Load CSV file
        df = pd.read_csv(file_path)
        print(f"File: {filename}")
        # Apply your model and calculate accuracy
        accuracy = apply_model_and_calculate_accuracy(df, column_name, window_size, threshold)
        print("#################################################")
        # Store accuracy result
        accuracy_results[filename] = accuracy

# # Print accuracy results
# for filename, accuracy in accuracy_results.items():
#     print(f"File: {filename}, Accuracy: {accuracy}")

File: A4Benchmark-TS1.csv
Accuracy: 0.9940476190476191
Precision: 0.8
Recall: 0.3076923076923077
F1-score: 0.4444444444444444
#################################################
File: A4Benchmark-TS10.csv
Accuracy: 0.8636904761904762
Precision: 0.012987012987012988
Recall: 0.75
F1-score: 0.02553191489361702
#################################################
File: A4Benchmark-TS100.csv
Accuracy: 0.9940476190476191
Precision: 0.2
Recall: 0.14285714285714285
F1-score: 0.16666666666666666
#################################################
File: A4Benchmark-TS11.csv
Accuracy: 0.9761904761904762
Precision: 0.09302325581395349
Recall: 0.8
F1-score: 0.16666666666666666
#################################################
File: A4Benchmark-TS12.csv
Accuracy: 0.9851190476190477
Precision: 0.08333333333333333
Recall: 0.4
F1-score: 0.13793103448275862
#################################################
File: A4Benchmark-TS13.csv
Accuracy: 0.8005952380952381
Precision: 0.03206997084548105
Recall: 0.78571428

  f1_score = 2 * (precision * recall) / (precision + recall)


Accuracy: 0.9160714285714285
Precision: 0.0
Recall: 0.0
F1-score: nan
#################################################
File: A4Benchmark-TS17.csv
Accuracy: 0.9363095238095238
Precision: 0.04672897196261682
Recall: 0.5
F1-score: 0.08547008547008547
#################################################
File: A4Benchmark-TS18.csv


  f1_score = 2 * (precision * recall) / (precision + recall)


Accuracy: 0.9803571428571428
Precision: 0.0
Recall: 0.0
F1-score: nan
#################################################
File: A4Benchmark-TS19.csv
Accuracy: 0.8232142857142857
Precision: 0.023411371237458192
Recall: 0.5833333333333334
F1-score: 0.045016077170418
#################################################
File: A4Benchmark-TS2.csv
Accuracy: 0.9982142857142857
Precision: 1.0
Recall: 0.4
F1-score: 0.5714285714285715
#################################################
File: A4Benchmark-TS20.csv
Accuracy: 0.7738095238095238
Precision: 0.025839793281653745
Recall: 0.7692307692307693
F1-score: 0.049999999999999996
#################################################
File: A4Benchmark-TS21.csv
Accuracy: 0.993452380952381
Precision: 1.0
Recall: 0.15384615384615385
F1-score: 0.2666666666666667
#################################################
File: A4Benchmark-TS22.csv
Accuracy: 0.9029761904761905
Precision: 0.012738853503184714
Recall: 0.2
F1-score: 0.023952095808383235
######################

  f1_score = 2 * (precision * recall) / (precision + recall)


Accuracy: 0.944047619047619
Precision: 0.0
Recall: 0.0
F1-score: nan
#################################################
File: A4Benchmark-TS6.csv
Accuracy: 0.9494047619047619
Precision: 0.03614457831325301
Recall: 0.375
F1-score: 0.06593406593406592
#################################################
File: A4Benchmark-TS60.csv
Accuracy: 0.9738095238095238
Precision: 0.023255813953488372
Recall: 0.3333333333333333
F1-score: 0.04347826086956522
#################################################
File: A4Benchmark-TS61.csv
Accuracy: 0.9345238095238095
Precision: 0.009174311926605505
Recall: 0.3333333333333333
F1-score: 0.01785714285714286
#################################################
File: A4Benchmark-TS62.csv
Accuracy: 0.9
Precision: 0.03048780487804878
Recall: 0.35714285714285715
F1-score: 0.05617977528089888
#################################################
File: A4Benchmark-TS63.csv
Accuracy: 0.9958333333333333
Precision: nan
Recall: 0.0
F1-score: nan
##################################

  precision = TP / (TP + FP)


Accuracy: 0.9976190476190476
Precision: 1.0
Recall: 0.2
F1-score: 0.33333333333333337
#################################################
File: A4Benchmark-TS65.csv
Accuracy: 0.8244047619047619
Precision: 0.030201342281879196
Recall: 0.6
F1-score: 0.05750798722044729
#################################################
File: A4Benchmark-TS66.csv
Accuracy: 0.9994047619047619
Precision: nan
Recall: 0.0
F1-score: nan
#################################################
File: A4Benchmark-TS67.csv


  precision = TP / (TP + FP)


Accuracy: 0.8666666666666667
Precision: 0.043478260869565216
Recall: 0.7142857142857143
F1-score: 0.08196721311475409
#################################################
File: A4Benchmark-TS68.csv
Accuracy: 0.906547619047619
Precision: 0.006329113924050633
Recall: 1.0
F1-score: 0.012578616352201257
#################################################
File: A4Benchmark-TS69.csv
Accuracy: 0.8422619047619048
Precision: 0.0037735849056603774
Recall: 0.5
F1-score: 0.00749063670411985
#################################################
File: A4Benchmark-TS7.csv
Accuracy: 0.9625
Precision: 0.13432835820895522
Recall: 0.6428571428571429
F1-score: 0.2222222222222222
#################################################
File: A4Benchmark-TS70.csv
Accuracy: 0.9369047619047619
Precision: 0.018867924528301886
Recall: 0.5
F1-score: 0.03636363636363636
#################################################
File: A4Benchmark-TS71.csv
Accuracy: 0.9029761904761905
Precision: 0.03067484662576687
Recall: 0.5
F1-score: 0.

In [3]:
# Path to your dataset folder
dataset_folder = "./A4Benchmark/"

# Functions for anomaly detection and evaluation
def sliding_window_std(df, column_name, window_size, threshold):
    results = []
    problem_vals = []  # List to store values that exceed the threshold

    for i in range(len(df)):
        # Calculate the window boundaries
        start_index = max(0, i - window_size // 2)
        end_index = min(len(df), i + window_size // 2 + 1)
        
        # Get the numbers within the window
        window = df[column_name].iloc[start_index:end_index]
        
        # Calculate the standard deviation of the window
        std_dev = window.std()
        
        # Append the result to the results list
        # result list conations 3 things (number, window,std of the window)
        results.append((i, df.iloc[i][column_name], window.tolist(), std_dev))
        # print(results[i],'\n')

        if len(results) <= 2:
            # print("Skipping iteration#", i)
            # print("##################################################")
            continue
            
        if len(results) >= 3:
            # print("val1:",results[-3][3])
            # print("val2:",results[-2][3])
            # print("current std:",std_dev)
            # print("current iteration:", i)
            # print("##################################################")

            avg_prev_results = (results[-2][3] + results[-3][3]) / 2
            threshold_value = avg_prev_results + threshold

            if std_dev > threshold_value:
                problem_vals.append((i - window_size // 2, window.iloc[-1]))

            # print('Problem list:', problem_vals)
    return results, problem_vals

def actual_anomaly(df):
    anomaly_rows = []

    # Iterate over each row in the DataFrame
    for i, row in df.iterrows():
        if row['anomaly'] == 1:
            # Append a tuple containing the row number and the value from the 'anomaly' column to the result list
            anomaly_rows.append((i, row['value']))
    # print(anomaly_rows)
    return anomaly_rows

def find_normal_values(original_lst, anomaly_lst):
    normal_values = []
    for num in original_lst:
        is_anomaly = False
        for win_num, value in anomaly_lst:
            if num == value:
                is_anomaly = True
                break
        if not is_anomaly:
            normal_values.append(num)
    return normal_values

def finding_row_number(original_lst, anomaly_lst):
    rows = []
    i = 1
    for num in original_lst:
        i +=1
        for win_num, value in anomaly_lst:
            if num == value:
                rows.append((i,num))
    return rows

def categorize_points(original_lst, anomaly_lst):
    normal = []
    seasonal = []
    anomalies = []

    for i, num in enumerate(original_lst):
        if num not in (value for _, value in anomaly_lst):
            normal.append((i, num))
        elif i % 12 == 0 or i % 168 ==0:
            seasonal.append((i, num))
        else:
            anomalies.append((i, num))

    return normal, seasonal, anomalies

def evaluate_anomaly_detection(true_labels, predicted_labels):
    # Generate confusion matrix
    cm = confusion_matrix(true_labels, predicted_labels)

    # Extract true positives (TP) and true negatives (TN)
    TP = cm[1, 1]  # Actual positive (1) and predicted positive (1)
    TN = cm[0, 0]  # Actual negative (0) and predicted negative (0)
    FP = cm[0, 1]  # Actual negative (0) but predicted positive (1)
    FN = cm[1, 0]  # Actual positive (1) but predicted negative (0)

    # # Output true positives (TP), true negatives (TN), false positives (FP), and false negatives (FN)
    # print("True Positives (TP):", TP)
    # print("True Negatives (TN):", TN)
    # print("False Positives (FP):", FP)
    # print("False Negatives (FN):", FN)

    # # Generate classification report
    # report = classification_report(true_labels, predicted_labels)
    # print("\nClassification Report:\n", report)

    # Calculate Accuracy
    accuracy = (TP + TN) / (TP + TN + FP + FN)

    # Calculate Precision
    # precision = TP / (TP + FP)

    # # Calculate Recall
    # recall = TP / (TP + FN)

    # # Calculate F1-score
    # f1_score = 2 * (precision * recall) / (precision + recall)

    # print("Accuracy:", accuracy)
    # print("Precision:", precision)
    # print("Recall:", recall)
    # print("F1-score:", f1_score)

    return accuracy

# Function to apply your model and calculate accuracy
def apply_model_and_calculate_accuracy(df, column_name, window_size, threshold):
    std_devs, anomalies = sliding_window_std(df, column_name, window_size, threshold)
    actual = actual_anomaly(df)
    normal_values = find_normal_values(df[column_name], anomalies)
    
    anomaly_row_list = finding_row_number(df[column_name], anomalies)
    normal_list, seasonal_anomalies, other_anomalies = categorize_points(df['value'], anomaly_row_list)
    
    # Initialize and mark predicted anomalies
    df['predicted_anomalies'] = 0
    for anomaly_index, _ in other_anomalies:
        df.at[anomaly_index, 'predicted_anomalies'] = 1
    
    # Evaluate anomaly detection and return accuracy
    accuracy = evaluate_anomaly_detection(df['anomaly'], df['predicted_anomalies'])
    return accuracy

# Dictionary to store accuracy results
accuracy_results = {}

# Specify the column name containing the data
column_name = 'value'

def grid_search(df, column_name):
    best_accuracy = 0
    best_window_size = 0
    best_threshold = 0
    
    # Define the range of values to search over
    window_sizes = range(3, 21)  # Adjust range as needed
    thresholds = range(5, 21)  # Adjust range as needed
    
    # Iterate over all combinations of window sizes and thresholds
    for window_size in window_sizes:
        for threshold in thresholds:
            # Apply your model and calculate accuracy
            accuracy = apply_model_and_calculate_accuracy(df, column_name, window_size, threshold)
            
            # Update best parameters if accuracy is improved
            if accuracy > best_accuracy:
                best_accuracy = accuracy
                best_window_size = window_size
                best_threshold = threshold
    
    return best_window_size, best_threshold, best_accuracy

# Iterate through each file in the directory
for filename in sorted(os.listdir(dataset_folder)):
    if filename.endswith(".csv"):
        file_path = os.path.join(dataset_folder, filename)
        # Load CSV file
        df = pd.read_csv(file_path)
        
        # Perform grid search to find the best window size and threshold
        best_window_size, best_threshold, best_accuracy = grid_search(df, column_name)
        
        # Print the best parameters and accuracy
        print(f"File: {filename}, Best Window Size: {best_window_size}, Best Threshold: {best_threshold}, Accuracy: {best_accuracy}")

# Print accuracy results
for filename, accuracy in accuracy_results.items():
    print(f"File: {filename}, Accuracy: {accuracy}")

File: Test_TS1.csv, Best Window Size: 20, Best Threshold: 20, Accuracy: 0.9665970772442589
File: Test_TS12.csv, Best Window Size: 16, Best Threshold: 20, Accuracy: 0.8559498956158664
File: Test_TS2.csv, Best Window Size: 20, Best Threshold: 20, Accuracy: 0.9102296450939458
File: Test_TS7.csv, Best Window Size: 20, Best Threshold: 20, Accuracy: 0.9853862212943633
File: Test_TS8.csv, Best Window Size: 16, Best Threshold: 20, Accuracy: 0.8058455114822547
File: Train_TS1.csv, Best Window Size: 20, Best Threshold: 20, Accuracy: 0.9633638634471274
File: Train_TS12.csv, Best Window Size: 16, Best Threshold: 20, Accuracy: 0.9342214820982515
File: Train_TS2.csv, Best Window Size: 20, Best Threshold: 20, Accuracy: 0.9034138218151541
File: Train_TS7.csv, Best Window Size: 16, Best Threshold: 20, Accuracy: 0.8376353039134055
File: Train_TS8.csv, Best Window Size: 20, Best Threshold: 20, Accuracy: 0.8726061615320566


# 2 way

In [12]:
# Path to your dataset folder
dataset_folder = "./dataset/A4Benchmark"

# Functions for anomaly detection and evaluation
def sliding_window_std(df, column_name, window_size, threshold):
    results = []
    problem_vals = []  # List to store values that exceed the threshold

    for i in range(len(df)):
        # Calculate the window boundaries
        start_index = max(0, i - window_size // 2)
        end_index = min(len(df), i + window_size // 2 + 1)
        
        # Get the numbers within the window
        window = df[column_name].iloc[start_index:end_index]
        
        # Calculate the standard deviation of the window
        std_dev = window.std()
        
        # Append the result to the results list
        # result list conations 3 things (number, window,std of the window)
        results.append((i, df.iloc[i][column_name], window.tolist(), std_dev))
        # print(results[i],'\n')

        if len(results) <= 2:
            # print("Skipping iteration#", i)
            # print("##################################################")
            continue
            
        if len(results) >= 3:
            # print("val1:",results[-3][3])
            # print("val2:",results[-2][3])
            # print("current std:",std_dev)
            # print("current iteration:", i)
            # print("##################################################")

            avg_prev_results = (results[-2][3] + results[-3][3]) / 2
            threshold_value = avg_prev_results + threshold

            if std_dev > threshold_value:
                problem_vals.append((i - window_size // 2, window.iloc[-1]))

            # print('Problem list:', problem_vals)
    return results, problem_vals

def actual_anomaly(df):
    anomaly_rows = []

    # Iterate over each row in the DataFrame
    for i, row in df.iterrows():
        if row['anomaly'] == 1:
            # Append a tuple containing the row number and the value from the 'anomaly' column to the result list
            anomaly_rows.append((i, row['value']))
    # print(anomaly_rows)
    return anomaly_rows

def find_normal_values(original_lst, anomaly_lst):
    normal_values = []
    for num in original_lst:
        is_anomaly = False
        for win_num, value in anomaly_lst:
            if num == value:
                is_anomaly = True
                break
        if not is_anomaly:
            normal_values.append(num)
    return normal_values

def finding_row_number(original_lst, anomaly_lst):
    rows = []
    i = 1
    for num in original_lst:
        i +=1
        for win_num, value in anomaly_lst:
            if num == value:
                rows.append((i,num))
    return rows

def categorize_points(original_lst, anomaly_lst):
    normal = []
    seasonal = []
    anomalies = []

    for i, num in enumerate(original_lst):
        if num not in (value for _, value in anomaly_lst):
            normal.append((i, num))
        else:
            anomalies.append((i, num))

    return normal, anomalies

def evaluate_anomaly_detection(true_labels, predicted_labels):
    # Generate confusion matrix
    cm = confusion_matrix(true_labels, predicted_labels)

    # Extract true positives (TP) and true negatives (TN)
    TP = cm[1, 1]  # Actual positive (1) and predicted positive (1)
    TN = cm[0, 0]  # Actual negative (0) and predicted negative (0)
    FP = cm[0, 1]  # Actual negative (0) but predicted positive (1)
    FN = cm[1, 0]  # Actual positive (1) but predicted negative (0)

    # Output true positives (TP), true negatives (TN), false positives (FP), and false negatives (FN)
    print("True Positives (TP):", TP)
    print("True Negatives (TN):", TN)
    print("False Positives (FP):", FP)
    print("False Negatives (FN):", FN)

    # # Generate classification report
    # report = classification_report(true_labels, predicted_labels)
    # print("\nClassification Report:\n", report)

    # Calculate Accuracy
    accuracy = (TP + TN) / (TP + TN + FP + FN)

    # Calculate Precision
    precision = TP / (TP + FP)

    # Calculate Recall
    recall = TP / (TP + FN)

    # Calculate F1-score
    f1_score = 2 * (precision * recall) / (precision + recall)

    print("Accuracy:", accuracy)
    print("Precision:", precision)
    print("Recall:", recall)
    print("F1-score:", f1_score)

    return accuracy

# Function to apply your model and calculate accuracy
def apply_model_and_calculate_accuracy(df, column_name, window_size, threshold):
    std_devs, anomalies = sliding_window_std(df, column_name, window_size, threshold)
    actual = actual_anomaly(df)
    normal_values = find_normal_values(df[column_name], anomalies)
    
    anomaly_row_list = finding_row_number(df[column_name], anomalies)
    normal_list, other_anomalies = categorize_points(df['value'], anomaly_row_list)
    
    # Initialize and mark predicted anomalies
    df['predicted_anomalies'] = 0
    for anomaly_index, _ in other_anomalies:
        df.at[anomaly_index, 'predicted_anomalies'] = 1
    
    # Evaluate anomaly detection and return accuracy
    accuracy = evaluate_anomaly_detection(df['anomaly'], df['predicted_anomalies'])
    return accuracy

# Dictionary to store accuracy results
accuracy_results = {}

# Specify the column name containing the data
column_name = 'value'

window_size = 5
threshold = 10  # Adjust threshold as needed

# Iterate through each file in the directory
for filename in sorted(os.listdir(dataset_folder)):
    if filename.endswith(".csv"):
        file_path = os.path.join(dataset_folder, filename)
        # Load CSV file
        df = pd.read_csv(file_path)
        print(f"File: {filename}")
        # Apply your model and calculate accuracy
        accuracy = apply_model_and_calculate_accuracy(df, column_name, window_size, threshold)
        print("#################################################")
        # Store accuracy result
        accuracy_results[filename] = accuracy

# # Print accuracy results
# for filename, accuracy in accuracy_results.items():
#     print(f"File: {filename}, Accuracy: {accuracy}")

File: A4Benchmark-TS1.csv
True Positives (TP): 11
True Negatives (TN): 1289
False Positives (FP): 378
False Negatives (FN): 2
Accuracy: 0.7738095238095238
Precision: 0.028277634961439587
Recall: 0.8461538461538461
F1-score: 0.05472636815920398
#################################################
File: A4Benchmark-TS10.csv
True Positives (TP): 4
True Negatives (TN): 990
False Positives (FP): 686
False Negatives (FN): 0
Accuracy: 0.5916666666666667
Precision: 0.005797101449275362
Recall: 1.0
F1-score: 0.011527377521613834
#################################################
File: A4Benchmark-TS100.csv
True Positives (TP): 6
True Negatives (TN): 1112
False Positives (FP): 561
False Negatives (FN): 1
Accuracy: 0.6654761904761904
Precision: 0.010582010582010581
Recall: 0.8571428571428571
F1-score: 0.020905923344947733
#################################################
File: A4Benchmark-TS11.csv
True Positives (TP): 5
True Negatives (TN): 1069
False Positives (FP): 606
False Negatives (FN): 0
Accur

# Another test with some modifications

In [9]:
# Path to your dataset folder
dataset_folder = "./A4Benchmark"

# Functions for anomaly detection and evaluation
def sliding_window_std(df, column_name, window_size, threshold):
    results = []
    problem_vals = []  # List to store values that exceed the threshold

    for i in range(len(df)):
        # Calculate the window boundaries
        start_index = max(0, i - window_size // 2)
        end_index = min(len(df), i + window_size // 2 + 1)
        
        # Get the numbers within the window
        window = df[column_name].iloc[start_index:end_index]
        
        # Calculate the standard deviation of the window
        std_dev = window.std()
        
        # Append the result to the results list
        # result list conations 3 things (number, window,std of the window)
        results.append((i, df.iloc[i][column_name], window.tolist(), std_dev))
        # print(results[i],'\n')

        if len(results) <= 2:
            # print("Skipping iteration#", i)
            # print("##################################################")
            continue
            
        if len(results) >= 3:
            # print("val1:",results[-3][3])
            # print("val2:",results[-2][3])
            # print("current std:",std_dev)
            # print("current iteration:", i)
            # print("##################################################")

            avg_prev_results = (results[-2][3] + results[-3][3]) / 2
            threshold_value = avg_prev_results + threshold

            if std_dev > threshold_value:
                problem_vals.append((i - window_size // 2, window.iloc[-1]))

            # print('Problem list:', problem_vals)
    return results, problem_vals

def actual_anomaly(df):
    anomaly_rows = []

    # Iterate over each row in the DataFrame
    for i, row in df.iterrows():
        if row['anomaly'] == 1:
            # Append a tuple containing the row number and the value from the 'anomaly' column to the result list
            anomaly_rows.append((i, row['value']))
    # print(anomaly_rows)
    return anomaly_rows

def find_normal_values(original_lst, anomaly_lst):
    normal_values = []
    for num in original_lst:
        is_anomaly = False
        for win_num, value in anomaly_lst:
            if num == value:
                is_anomaly = True
                break
        if not is_anomaly:
            normal_values.append(num)
    return normal_values

def finding_row_number(original_lst, anomaly_lst):
    rows = []
    i = 1
    for num in original_lst:
        i +=1
        for win_num, value in anomaly_lst:
            if num == value:
                rows.append((i,num))
    return rows

def categorize_points(original_lst, anomaly_lst):
    normal = []
    seasonal = []
    anomalies = []

    for i, num in enumerate(original_lst):
        if num not in (value for _, value in anomaly_lst):
            normal.append((i, num))
        elif i % 12 == 0 or i % 168 ==0:
            seasonal.append((i, num))
        else:
            anomalies.append((i, num))

    return normal, seasonal, anomalies

def evaluate_anomaly_detection(true_labels, predicted_labels):
    # Generate confusion matrix
    cm = confusion_matrix(true_labels, predicted_labels)

    # Extract true positives (TP) and true negatives (TN)
    TP = cm[1, 1]  # Actual positive (1) and predicted positive (1)
    TN = cm[0, 0]  # Actual negative (0) and predicted negative (0)
    FP = cm[0, 1]  # Actual negative (0) but predicted positive (1)
    FN = cm[1, 0]  # Actual positive (1) but predicted negative (0)

    # # Output true positives (TP), true negatives (TN), false positives (FP), and false negatives (FN)
    # print("True Positives (TP):", TP)
    # print("True Negatives (TN):", TN)
    # print("False Positives (FP):", FP)
    # print("False Negatives (FN):", FN)

    # # Generate classification report
    # report = classification_report(true_labels, predicted_labels)
    # print("\nClassification Report:\n", report)

    # Calculate Accuracy
    accuracy = (TP + TN) / (TP + TN + FP + FN)

    # Calculate Precision
    precision = TP / (TP + FP)

    # Calculate Recall
    recall = TP / (TP + FN)

    # Calculate F1-score
    f1_score = 2 * (precision * recall) / (precision + recall)

    # print("Accuracy:", accuracy)
    # print("Precision:", precision)
    # print("Recall:", recall)
    # print("F1-score:", f1_score)

    return precision

# Function to apply your model and calculate accuracy
def apply_model_and_calculate_accuracy(df, column_name, window_size, threshold):
    std_devs, anomalies = sliding_window_std(df, column_name, window_size, threshold)
    actual = actual_anomaly(df)
    normal_values = find_normal_values(df[column_name], anomalies)
    
    anomaly_row_list = finding_row_number(df[column_name], anomalies)
    normal_list, seasonal_anomalies, other_anomalies = categorize_points(df['value'], anomaly_row_list)
    
    # Initialize and mark predicted anomalies
    df['predicted_anomalies'] = 0
    for anomaly_index, _ in other_anomalies:
        df.at[anomaly_index, 'predicted_anomalies'] = 1
    
    # Evaluate anomaly detection and return accuracy
    precision = evaluate_anomaly_detection(df['anomaly'], df['predicted_anomalies'])
    return precision

# Dictionary to store accuracy results
accuracy_results = {}

# Specify the column name containing the data
column_name = 'value'

def grid_search(df, column_name):
    best_precision = 0
    best_window_size = 0
    best_threshold = 0
    
    # Define the range of values to search over
    window_sizes = range(3, 21)  # Adjust range as needed
    thresholds = range(5, 21)  # Adjust range as needed
    
    # Iterate over all combinations of window sizes and thresholds
    for window_size in window_sizes:
        for threshold in thresholds:
            # Apply your model and calculate accuracy
            precision = apply_model_and_calculate_accuracy(df, column_name, window_size, threshold)
            
            # Update best parameters if accuracy is improved
            if precision > best_precision:
                best_precision = precision
                best_window_size = window_size
                best_threshold = threshold
    
    return best_window_size, best_threshold, best_precision

# Iterate through each file in the directory
for filename in sorted(os.listdir(dataset_folder)):
    if filename.endswith(".csv"):
        file_path = os.path.join(dataset_folder, filename)
        # Load CSV file
        df = pd.read_csv(file_path)
        
        # Perform grid search to find the best window size and threshold
        best_window_size, best_threshold, best_precision = grid_search(df, column_name)
        
        # Print the best parameters and accuracy
        print(f"File: {filename}, Best Window Size: {best_window_size}, Best Threshold: {best_threshold}, Precision: {best_precision}")

# Print accuracy results
for filename, accuracy in accuracy_results.items():
    print(f"File: {filename}, Precision: {precision}")

File: A4Benchmark-TS1.csv, Best Window Size: 16, Best Threshold: 19, Precision: 0.8
File: A4Benchmark-TS10.csv, Best Window Size: 20, Best Threshold: 20, Precision: 0.017857142857142856
File: A4Benchmark-TS2.csv, Best Window Size: 12, Best Threshold: 17, Precision: 1.0


KeyboardInterrupt: 