#Load data

In [1]:
import os
import pandas as pd
from google.colab import drive

# Mount Google Drive
drive.mount('/content/drive')


Mounted at /content/drive


In [2]:
def load_from_gdrive(data_dir):
  # Initialize an empty list to store the DataFrames
  data_frames = []

  # Loop through each .csv file in the directory
  for dir in data_dir:
    for file_name in os.listdir(dir):
        if file_name.endswith('.csv'):
            file_path = os.path.join(dir, file_name)

            # Load the .csv file into a DataFrame
            df = pd.read_csv(file_path)

            # Remove the first column
            df = df.iloc[:, 4:]

            df  = df.div(360)

            df  = df.clip(upper =1,lower = -1)


            # Append the DataFrame to the list
            data_frames.append(df)

  # Concatenate the DataFrames into a single DataFrame

  return data_frames

In [5]:

# Define the directory path where the .csv files are located in Google Drive

data_dir = ['/content/drive/MyDrive/28 08 mod/Normal']

combined_df =  load_from_gdrive(data_dir)

from sklearn.model_selection import train_test_split
# Split the data into training and testing sets
train_data, test_data = train_test_split(combined_df, test_size=0.4)# 60 40


In [None]:
data_dir = ['/content/drive/MyDrive/28 08 mod/Situations and scenarios/easy']

SaS_easy =  load_from_gdrive(data_dir)

In [None]:
data_dir = ['/content/drive/MyDrive/28 08 mod/Physical Activites/easy']

PH_AC_easy =  load_from_gdrive(data_dir)

In [None]:
data_dir = ['/content/drive/MyDrive/28 08 mod/Interaction with enviroment/easy']

IwE_easy =  load_from_gdrive(data_dir)

data_dir = ['/content/drive/MyDrive/28 08 mod/Interaction with enviroment/hard']

IwE_hard =  load_from_gdrive(data_dir)

In [None]:
data_dir = ['/content/drive/MyDrive/28 08 mod/Locomotion/easy']

Loc_easy =  load_from_gdrive(data_dir)

data_dir = ['/content/drive/MyDrive/28 08 mod/Locomotion/hard']

Loc_hard =  load_from_gdrive(data_dir)

data_dir = ['/content/drive/MyDrive/28 08 mod/Locomotion/v hard']

Loc_Vhard =  load_from_gdrive(data_dir)

In [None]:
data_dir = ['/content/drive/MyDrive/28 08 mod/Human Interaction/easy']

HI_easy =  load_from_gdrive(data_dir)

data_dir = ['/content/drive/MyDrive/28 08 mod/Human Interaction/hard']

HI_hard =  load_from_gdrive(data_dir)

#VAE

In [6]:
train_data_comb = pd.concat(train_data, axis=0, ignore_index=True)

In [7]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import MinMaxScaler
from torch.optim.lr_scheduler import StepLR
from sklearn.model_selection import train_test_split

class VAEAnomalyTabular(nn.Module):
    def __init__(self, input_size, latent_size, dropout_rate=0.4, l1_weight=0.000):
        super(VAEAnomalyTabular, self).__init__()
        self.encoder = self.make_encoder(input_size, latent_size, dropout_rate)
        self.decoder = self.make_decoder(latent_size, input_size, dropout_rate)
        self.l1_weight = l1_weight

    def make_encoder(self, input_size, latent_size, dropout_rate):
        return nn.Sequential(
            nn.Linear(input_size, 512),
            nn.LeakyReLU(),  # LeakyReLU activation
            nn.Dropout(dropout_rate),  # Dropout layer
            nn.Linear(512, 256),
            nn.LeakyReLU(),  # LeakyReLU activation
            nn.Dropout(dropout_rate),  # Dropout layer
            nn.Linear(256, latent_size * 2)
        )

    def make_decoder(self, latent_size, output_size, dropout_rate):
        return nn.Sequential(
            nn.Linear(latent_size, 256),
            nn.LeakyReLU(),  # LeakyReLU activation
            nn.Dropout(dropout_rate),  # Dropout layer
            nn.Linear(256, 512),
            nn.LeakyReLU(),  # LeakyReLU activation
            nn.Dropout(dropout_rate),  # Dropout layer
            nn.Linear(512, output_size)
        )
    def reparameterize(self, mu, logvar):
          std = torch.exp(0.5 * logvar)
          eps = torch.randn_like(std)
          return mu + eps * std

    def forward(self, x):
        latent_params = self.encoder(x)
        latent_params = latent_params.view(-1, latent_size * 2)
        mu = latent_params[:, :latent_size]
        logvar = latent_params[:, latent_size:]
        z = self.reparameterize(mu, logvar)
        reconstruction = self.decoder(z)
        return reconstruction, mu, logvar

    def train_vae(model, train_loader, optimizer, criterion, device):
      model.train()
      train_loss = 0
      for batch in train_loader:
          batch = batch.to(device)
          optimizer.zero_grad()
          reconstruction, mu, logvar = model(batch)
          loss = criterion(reconstruction, batch, mu, logvar)
          loss.backward()
          optimizer.step()
          train_loss += loss.item()
      return train_loss / len(train_loader)
    def vae_loss(reconstruction, x, mu, logvar):
      recon_loss = nn.functional.mse_loss(reconstruction, x, reduction='sum')
      kl_divergence = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
      return recon_loss + kl_divergence

batch_size = 32
input_size= 59
latent_size= 64
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

vae = VAEAnomalyTabular(input_size, latent_size)

In [8]:
# Load pre-trained weights
checkpoint = torch.load('/content/drive/MyDrive/Saved models/vae_model_best1.pth')
vae.load_state_dict(checkpoint, strict=False)


# Set the model to evaluation mode
vae.eval().to(device)

VAEAnomalyTabular(
  (encoder): Sequential(
    (0): Linear(in_features=59, out_features=512, bias=True)
    (1): LeakyReLU(negative_slope=0.01)
    (2): Dropout(p=0.4, inplace=False)
    (3): Linear(in_features=512, out_features=256, bias=True)
    (4): LeakyReLU(negative_slope=0.01)
    (5): Dropout(p=0.4, inplace=False)
    (6): Linear(in_features=256, out_features=128, bias=True)
  )
  (decoder): Sequential(
    (0): Linear(in_features=64, out_features=256, bias=True)
    (1): LeakyReLU(negative_slope=0.01)
    (2): Dropout(p=0.4, inplace=False)
    (3): Linear(in_features=256, out_features=512, bias=True)
    (4): LeakyReLU(negative_slope=0.01)
    (5): Dropout(p=0.4, inplace=False)
    (6): Linear(in_features=512, out_features=59, bias=True)
  )
)

In [9]:
import numpy as np
batch_size = 1# Since we want to look get the error of 1 sample and not from a batch
def calculate_errors(model, data_loader, device):
    model.eval()
    errors = []
    with torch.no_grad():
        for batch in data_loader:
            batch = batch.to(device)
            reconstruction, mu, logvar = model(batch)
            recon_error = nn.functional.mse_loss(reconstruction, batch, reduction='sum')
            errors.append(recon_error.item())
    return errors


test_cat = pd.concat(test_data, axis=0, ignore_index=True)
tensor_data = torch.tensor(train_data_comb.values, dtype=torch.float32)
test_loader = DataLoader(tensor_data, batch_size=batch_size, shuffle=True)

# Calculate errors on the test dataset
test_errors = calculate_errors(vae, test_loader, device)

test_errors = np.array(test_errors)


quantile_threshold_99 = np.percentile(test_errors, 99)
quantile_threshold_95 = np.percentile(test_errors, 95)
quantile_threshold_90 = np.percentile(test_errors, 90)
quantile_threshold_85 = np.percentile(test_errors, 85)

quantile_threshold_50 = np.percentile(test_errors, 50)

print(f"Quantile Threshold: {quantile_threshold_99:.4f} Quantile Threshold: {quantile_threshold_95:.4f} \
Quantile Threshold: {quantile_threshold_90:.4f} Quantile Threshold: {quantile_threshold_85:.4f}")

Quantile Threshold: 2.3933 Quantile Threshold: 1.8938 Quantile Threshold: 0.9103 Quantile Threshold: 0.8233


In [10]:
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
flattened_errors = test_errors
# Sample reconstruction errors
plt.figure(figsize=(12, 8))
sns.histplot(flattened_errors, bins=30, kde=False)


percentiles = np.percentile(flattened_errors, [85, 90, 95, 99])

plt.axvline(np.percentile(flattened_errors,99), color='green', linestyle='--', label=f'{int(99)}th Percentile')
plt.axvline(np.percentile(flattened_errors,95), color='orange', linestyle='--', label=f'{int(95)}th Percentile')
plt.axvline(np.percentile(flattened_errors,90), color='violet', linestyle='--', label=f'{int(90)}th Percentile')
plt.axvline(np.percentile(flattened_errors,85), color='teal', linestyle='--', label=f'{int(85)}th Percentile')


# Add a vertical line for the median
median = np.median(flattened_errors)
plt.axvline(median, color='red', linestyle='dashdot', label=f'Median')


# Add labels and legend
plt.xlabel('Reconstruction Errors', fontsize=12)
plt.ylabel('Frequency', fontsize=12)
plt.legend(fontsize=12)
plt.savefig(f'/content/drive/MyDrive/VAE_HEATMAPS/thresholds.png')
# Show the plot
plt.show()

NameError: ignored

<Figure size 1200x800 with 0 Axes>

In [None]:
def create_err_list_from_dataset__vae(dataset):
  error_list=[]
  for record in dataset:

    tensor_data = torch.tensor(record.values, dtype=torch.float32)#zmieniać
    test_loader = DataLoader(tensor_data, batch_size=batch_size, shuffle=False)
    all_errors = calculate_errors(vae, test_loader, device)
    error_list.append(all_errors)

  return error_list


In [None]:
def check_against_thresh_pred(error_list,threshold,precent=0.05):#
  predictions=[]
  for item in error_list:
    anomaly = 0
    normal=0
    for sample in item:
        #print(sample)
        if sample > threshold:
            anomaly = anomaly+1
        else:
            normal = normal+1
    if anomaly > len(error_list)*precent:
      predictions.append(1)
    else:
      predictions.append(0)
  return predictions

In [None]:
import pickle

err_SaS_easy =create_err_list_from_dataset__vae(SaS_easy)#~12 min
err_PH_AC_easy =create_err_list_from_dataset__vae(PH_AC_easy)#
err_IwE_easy =create_err_list_from_dataset__vae(IwE_easy)
err_IwE_hard =create_err_list_from_dataset__vae(IwE_hard)# 15 min
err_Loc_easy =create_err_list_from_dataset__vae(Loc_easy)
err_Loc_hard =create_err_list_from_dataset__vae(Loc_hard)#16 min
err_Loc_Vhard =create_err_list_from_dataset__vae(Loc_Vhard)#16 30
err_HI_easy =create_err_list_from_dataset__vae(HI_easy)
err_HI_hard =create_err_list_from_dataset__vae(HI_hard)#20 min

error_dict = {
    'err_SaS_easy': err_SaS_easy,
    'err_PH_AC_easy': err_PH_AC_easy,
    'err_IwE_easy': err_IwE_easy,
    'err_IwE_hard': err_IwE_hard,
    'err_Loc_easy': err_Loc_easy,
    'err_Loc_hard': err_Loc_hard,
    'err_Loc_Vhard': err_Loc_Vhard,
    'err_HI_easy': err_HI_easy,
    'err_HI_hard': err_HI_hard
}
# Save the error_dict object to a file
with open('/content/drive/MyDrive/VAE_HEATMAPS/error_dict.pkl', 'wb') as file:
    pickle.dump(error_dict, file)


In [None]:
err_NORMAL= create_err_list_from_dataset__vae(test_data)

# Save the error_dict object to a file
with open('/content/drive/MyDrive/VAE_HEATMAPS/norm_dict.pkl', 'wb') as file:
    pickle.dump(err_NORMAL, file)

In [None]:
import pickle

# Load the error_dict object from the file
with open('/content/drive/MyDrive/VAE_HEATMAPS/error_dict.pkl', 'rb') as file:
    loaded_error_dict = pickle.load(file)

# Access the loaded objects
err_SaS_easy = loaded_error_dict['err_SaS_easy']
err_PH_AC_easy = loaded_error_dict['err_PH_AC_easy']
err_IwE_easy = loaded_error_dict['err_IwE_easy']
err_IwE_hard = loaded_error_dict['err_IwE_hard']
err_Loc_easy = loaded_error_dict['err_Loc_easy']
err_Loc_hard = loaded_error_dict['err_Loc_hard']
err_Loc_Vhard = loaded_error_dict['err_Loc_Vhard']
err_HI_easy = loaded_error_dict['err_HI_easy']
err_HI_hard = loaded_error_dict['err_HI_hard']



with open('/content/drive/MyDrive/VAE_HEATMAPS/norm_dict.pkl', 'rb') as file:
    normal = pickle.load(file)

err_NORMAL = normal

#VAE CNN

In [11]:
train_data_comb = pd.concat(train_data, axis=0, ignore_index=True)

In [12]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import MinMaxScaler
from torch.optim.lr_scheduler import StepLR
from sklearn.model_selection import train_test_split

class VAEAnomalyConv(nn.Module):
    def __init__(self, input_channels, latent_size, sequence_length=59,  dropout_rate=0.0, l1_weight=0.000):
        super(VAEAnomalyConv, self).__init__()
        self.latent_size = latent_size
        self.l1_weight = l1_weight

        # Encoder layers
        self.encoder = nn.Sequential(
            nn.Conv1d(in_channels=1, out_channels=32, kernel_size=3, stride=1, padding=1),
            nn.LeakyReLU(negative_slope=0.01),
            nn.Dropout(p=dropout_rate),
            nn.Flatten(start_dim=1, end_dim=-1),
            nn.Linear(in_features=1888, out_features=latent_size)
        )

        # Decoder layers
        self.decoder = nn.Sequential(
            nn.Linear(in_features=latent_size, out_features=32*sequence_length),
            nn.LeakyReLU(negative_slope=0.01),
            nn.Dropout(p=dropout_rate),
            nn.Unflatten(1, (32, sequence_length)),
            nn.ConvTranspose1d(in_channels=32, out_channels=1, kernel_size=3, stride=1, padding=1),
        )




    def forward(self, x):
      mu, logvar = self.encode(x)
      #print("Shape after Flatten layer in encoder:", mu.shape)
      z = self.reparameterize(mu, logvar)
      reconstruction = self.decode(z)
      return reconstruction, mu, logvar


    def encode(self, x):
      for i, layer in enumerate(self.encoder):
          x = layer(x)
          #print(f"Shape after layer {i} in encoder:", x.shape)
      mu = x
      logvar = x
      return mu, logvar



    def decode(self, z):
        decoded = self.decoder(z)
        return decoded

    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        z = mu + eps * std
        return z

    def l1_regularization(self):
        l1_reg = torch.tensor(0.0)
        for param in self.parameters():
            l1_reg += torch.norm(param, 1)
        return self.l1_weight * l1_reg

# Define the loss function
def vae_loss(reconstruction, x, mu, logvar):
    recon_loss = nn.functional.mse_loss(reconstruction, x, reduction='sum')
    kl_divergence = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    return recon_loss + kl_divergence

batch_size = 32
input_size= 59
latent_size= 64
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

vae = VAEAnomalyConv(input_size, latent_size)

In [13]:
# Load pre-trained weights
checkpoint = torch.load('/content/drive/MyDrive/Saved models/cnn_vae_model_best1.pth')
vae.load_state_dict(checkpoint, strict=False)


# Set the model to evaluation mode
vae.eval().to(device)

VAEAnomalyConv(
  (encoder): Sequential(
    (0): Conv1d(1, 32, kernel_size=(3,), stride=(1,), padding=(1,))
    (1): LeakyReLU(negative_slope=0.01)
    (2): Dropout(p=0.0, inplace=False)
    (3): Flatten(start_dim=1, end_dim=-1)
    (4): Linear(in_features=1888, out_features=64, bias=True)
  )
  (decoder): Sequential(
    (0): Linear(in_features=64, out_features=1888, bias=True)
    (1): LeakyReLU(negative_slope=0.01)
    (2): Dropout(p=0.0, inplace=False)
    (3): Unflatten(dim=1, unflattened_size=(32, 59))
    (4): ConvTranspose1d(32, 1, kernel_size=(3,), stride=(1,), padding=(1,))
  )
)

In [None]:
batch_size = 1

def test_vae(model, test_loader, criterion, device):
    model.eval()
    test_loss = 0
    reconstruction_errors = []
    with torch.no_grad():
        for batch in test_loader:
            batch = batch.to(device)
            batch = batch.view(batch.shape[0], 1, batch.shape[1])
            reconstruction, mu, logvar = model(batch)
            loss = criterion(reconstruction, batch, mu, logvar)
            test_loss += loss.item()
            reconstruction_errors.extend(loss.item() for _ in range(batch.size(0)))
    return test_loss / len(test_loader), reconstruction_errors

test_data_comb = pd.concat(test_data, axis=0, ignore_index=True)
tens_test_data = torch.tensor(test_data_comb.values, dtype=torch.float32)
test_loader = DataLoader(tens_test_data.unsqueeze(2), batch_size=batch_size, shuffle=False)

mean_test_loss, reconstruction_errors = test_vae(vae, test_loader, vae_loss, device)

# Convert errors to a numpy array
test_errors = np.array(reconstruction_errors)

# Define the quantile threshold
quantile_threshold_99 = np.percentile(test_errors, 99)
quantile_threshold_95 = np.percentile(test_errors, 95)
quantile_threshold_90 = np.percentile(test_errors, 90)
quantile_threshold_85 = np.percentile(test_errors, 85)

quantile_threshold_50 = np.percentile(test_errors, 50)

print(f"Quantile Threshold: {quantile_threshold_99:.4f} Quantile Threshold: {quantile_threshold_95:.4f} \
Quantile Threshold: {quantile_threshold_90:.4f} Quantile Threshold: {quantile_threshold_85:.4f}")



In [None]:
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
flattened_errors = test_errors
# Sample reconstruction errors
plt.figure(figsize=(12, 8))
sns.histplot(flattened_errors, bins=30, kde=False)

# Calculate the percentiles
percentiles = np.percentile(flattened_errors, [85, 90, 95, 99])

plt.axvline(np.percentile(flattened_errors,99), color='green', linestyle='--', label=f'{int(99)}th Percentile')
plt.axvline(np.percentile(flattened_errors,95), color='orange', linestyle='--', label=f'{int(95)}th Percentile')
plt.axvline(np.percentile(flattened_errors,90), color='violet', linestyle='--', label=f'{int(90)}th Percentile')
plt.axvline(np.percentile(flattened_errors,85), color='teal', linestyle='--', label=f'{int(85)}th Percentile')


# Calculate and add a vertical line for the median
median = np.median(flattened_errors)
plt.axvline(median, color='red', linestyle='dashdot', label=f'Median')


# Add labels and legend
plt.xlabel('Reconstruction Errors', fontsize=12)
plt.ylabel('Frequency', fontsize=12)
plt.legend(fontsize=12)
plt.savefig(f'/content/drive/MyDrive/CNN_VAE_HEATMAPS/thresholds.png')
# Show the plot
plt.show()

In [None]:
def create_err_list_from_dataset__cnn_vae(dataset):
  error_list=[]
  for record in dataset:

    tensor_data = torch.tensor(record.values, dtype=torch.float32)
    test_loader = DataLoader(tensor_data, batch_size=batch_size, shuffle=False)
    _, all_errors = test_vae(vae, test_loader, vae_loss, device)
    error_list.append(all_errors)

  return error_list

In [None]:
import pickle

err_SaS_easy =create_err_list_from_dataset__cnn_vae(SaS_easy)
err_PH_AC_easy =create_err_list_from_dataset__cnn_vae(PH_AC_easy)#14 min
err_IwE_easy =create_err_list_from_dataset__cnn_vae(IwE_easy)#16 min
err_IwE_hard =create_err_list_from_dataset__cnn_vae(IwE_hard)
err_Loc_easy =create_err_list_from_dataset__cnn_vae(Loc_easy)
err_Loc_hard =create_err_list_from_dataset__cnn_vae(Loc_hard)
err_Loc_Vhard =create_err_list_from_dataset__cnn_vae(Loc_Vhard)#21 min
err_HI_easy =create_err_list_from_dataset__cnn_vae(HI_easy)
err_HI_hard =create_err_list_from_dataset__cnn_vae(HI_hard)# 25 min

error_dict = {
    'err_SaS_easy': err_SaS_easy,
    'err_PH_AC_easy': err_PH_AC_easy,
    'err_IwE_easy': err_IwE_easy,
    'err_IwE_hard': err_IwE_hard,
    'err_Loc_easy': err_Loc_easy,
    'err_Loc_hard': err_Loc_hard,
    'err_Loc_Vhard': err_Loc_Vhard,
    'err_HI_easy': err_HI_easy,
    'err_HI_hard': err_HI_hard
}

# Save the error_dict object to a file
with open('/content/drive/MyDrive/CNN_VAE_HEATMAPS/error_dict.pkl', 'wb') as file:
    pickle.dump(error_dict, file)

In [None]:
err_NORMAL= create_err_list_from_dataset__cnn_vae(test_data)

# Save the error_dict object to a file
with open('/content/drive/MyDrive/CNN_VAE_HEATMAPS/norm_dict.pkl', 'wb') as file:
    pickle.dump(err_NORMAL, file)

In [None]:
import pickle

# Load the error_dict object from the file
with open('/content/drive/MyDrive/CNN_VAE_HEATMAPS/error_dict.pkl', 'rb') as file:
    loaded_error_dict = pickle.load(file)

# Access the loaded objects
err_SaS_easy = loaded_error_dict['err_SaS_easy']
err_PH_AC_easy = loaded_error_dict['err_PH_AC_easy']
err_IwE_easy = loaded_error_dict['err_IwE_easy']
err_IwE_hard = loaded_error_dict['err_IwE_hard']
err_Loc_easy = loaded_error_dict['err_Loc_easy']
err_Loc_hard = loaded_error_dict['err_Loc_hard']
err_Loc_Vhard = loaded_error_dict['err_Loc_Vhard']
err_HI_easy = loaded_error_dict['err_HI_easy']
err_HI_hard = loaded_error_dict['err_HI_hard']


In [None]:

with open('/content/drive/MyDrive/CNN_VAE_HEATMAPS/norm_dict.pkl', 'rb') as file:
    normal = pickle.load(file)

err_NORMAL = normal

#FFT VAE

In [15]:
train_data_comb = pd.concat(train_data, axis=0, ignore_index=True)

In [16]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import MinMaxScaler
from torch.optim.lr_scheduler import StepLR
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
import torch.nn.functional as F

class ComplexVAEFFT(nn.Module):
    def __init__(self, input_size, latent_size, neurons, dropout_rate=0.5, weight_decay=0.01):
        super(ComplexVAEFFT, self).__init__()
        self.latent_size = latent_size



        self.encoder = nn.Sequential(
              nn.Linear(input_size * 2, neurons),
              nn.ReLU(),
              nn.Dropout(dropout_rate),
              nn.Linear(neurons, neurons // 2),
              nn.ReLU(),
              nn.Dropout(dropout_rate),
              nn.Linear(neurons // 2, neurons // 4),
              nn.ReLU(),
              nn.Linear(neurons // 4, self.latent_size * 2)
          )

        self.decoder = nn.Sequential(
              nn.Linear(self.latent_size, neurons // 4),
              nn.ReLU(),
              nn.Dropout(dropout_rate),
              nn.Linear(neurons // 4, neurons // 2),
              nn.ReLU(),
              nn.Dropout(dropout_rate),
              nn.Linear(neurons // 2, neurons),
              nn.ReLU(),
              nn.Linear(neurons, input_size * 2)
          )

        self.weight_decay = weight_decay

    def forward(self, x):
        # Apply FFT to input
        x_fft = torch.fft.fft(x)

        # Separate real and imaginary parts
        x_fft_real = x_fft.real
        x_fft_imag = x_fft.imag
        x_fft_separated = torch.cat((x_fft_real, x_fft_imag), dim=-1)

        latent_params = self.encoder(x_fft_separated)
        mu = latent_params[:, :self.latent_size]  # Use self.latent_size
        logvar = latent_params[:, self.latent_size:]  # Use self.latent_size
        z = self.reparameterize(mu, logvar)
        reconstruction_separated = self.decoder(z)

        # Combine real and imaginary parts
        reconstruction_real = reconstruction_separated[:, :input_size]
        reconstruction_imag = reconstruction_separated[:, input_size:]
        reconstruction = torch.complex(reconstruction_real, reconstruction_imag)

        # Apply inverse FFT to output
        reconstruction_ifft = torch.fft.ifft(reconstruction)

        return reconstruction_ifft, mu, logvar


    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std


def complex_mse_loss(reconstruction, x):
    # Compute MSE for real part
    reconstruction_real = reconstruction.real
    real_loss = nn.functional.mse_loss(reconstruction_real, x, reduction='sum')

    # Compute MSE for imaginary part
    if torch.is_complex(x):
        x_imag = x.imag
        reconstruction_imag = reconstruction.imag
        imag_loss = nn.functional.mse_loss(reconstruction_imag, x_imag, reduction='sum')
    else:
        imag_loss = 0

    return real_loss + imag_loss




def vae_loss(reconstruction, x, mu, logvar):
    recon_loss = complex_mse_loss(reconstruction, x)
    kl_divergence = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())

    # Add weight decay loss
    weight_decay_loss = 0
    for param in vae.parameters():
        weight_decay_loss += torch.sum(torch.square(param))

    return recon_loss + kl_divergence + (weight_decay_loss * vae.weight_decay)



def train_vae(model, train_loader, optimizer, criterion, device):
    model.train()
    train_loss = 0
    for batch in train_loader:
        batch = batch.to(device)
        optimizer.zero_grad()
        reconstruction, mu, logvar = model(batch)
        loss = criterion(reconstruction, batch, mu, logvar)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
    return train_loss / len(train_loader)

def test_vae(model, test_loader, criterion, device):
    model.eval()
    test_loss = 0
    with torch.no_grad():
        for batch in test_loader:
            batch = batch.to(device)
            reconstruction, mu, logvar = model(batch)
            loss = criterion(reconstruction, batch, mu, logvar)
            test_loss += loss.item()
    return test_loss / len(test_loader)


# Early stopping parameters
input_size = 59
epochs = 10

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


#(self, input_size, latent_size, neurons, dropout_rate=0.5, weight_decay=0.01)
vae = ComplexVAEFFT(input_size, 64,1024,0,0).to(device)


In [None]:
# Load pre-trained weights
checkpoint = torch.load('/content/drive/MyDrive/Saved models/fft_vae_model_best1.pt', map_location=torch.device('cpu'))
vae.load_state_dict(checkpoint, strict=False)


# Set the model to evaluation mode
vae.eval().to(device)

In [None]:
def calculate_reconstruction_error_fft_vae(model, data_loader):
    model.eval()
    reconstruction_errors = []
    for batch in data_loader:
        x = batch.to(device)
        reconstruction, mu, logvar = model(x)
        recon_error = vae_loss(reconstruction, x, mu, logvar).item()  # Use vae_loss function
        reconstruction_errors.append(recon_error)
    return reconstruction_errors

In [None]:
def create_err_list_from_dataset__fft_vae(dataset):
    error_list = []
    for record in dataset:
        # Convert the record DataFrame into a tensor

        #train_data_comb = pd.concat(train_data, axis=0, ignore_index=True)
        tensor_train_data = torch.tensor(record.values, dtype=torch.float32)
        train_loader = DataLoader(tensor_train_data, batch_size=1, shuffle=False)
        #record_tensor = torch.Tensor(record.values)

        # Calculate the reconstruction error for the record
        reconstruction_error = calculate_reconstruction_error_fft_vae(vae, train_loader)

        error_list.append(reconstruction_error)

    return error_list

In [None]:
test_errors = create_err_list_from_dataset__fft_vae(test_data)

import numpy as np
flattened_errors = [error for sublist in test_errors for error in sublist]

quantile_threshold_99 = np.percentile(flattened_errors, 99)
quantile_threshold_95 = np.percentile(flattened_errors, 95)
quantile_threshold_90 = np.percentile(flattened_errors, 90)
quantile_threshold_85 = np.percentile(flattened_errors, 85)

quantile_threshold_50 = np.percentile(flattened_errors, 50)

print(f"Quantile Threshold: {quantile_threshold_99:.4f} Quantile Threshold: {quantile_threshold_95:.4f} \
Quantile Threshold: {quantile_threshold_90:.4f} Quantile Threshold: {quantile_threshold_85:.4f}")

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
flattened_errors = test_errors
# Sample reconstruction errors
plt.figure(figsize=(12, 8))
sns.histplot(flattened_errors, bins=30, kde=False)

percentiles = np.percentile(flattened_errors, [85, 90, 95, 99])

plt.axvline(np.percentile(flattened_errors,99), color='green', linestyle='--', label=f'{int(99)}th Percentile')
plt.axvline(np.percentile(flattened_errors,95), color='orange', linestyle='--', label=f'{int(95)}th Percentile')
plt.axvline(np.percentile(flattened_errors,90), color='violet', linestyle='--', label=f'{int(90)}th Percentile')
plt.axvline(np.percentile(flattened_errors,85), color='teal', linestyle='--', label=f'{int(85)}th Percentile')


# Add a vertical line for the median
median = np.median(flattened_errors)
plt.axvline(median, color='red', linestyle='dashdot', label=f'Median')


# Add labels and legend
plt.xlabel('Reconstruction Errors', fontsize=12)
plt.ylabel('Frequency', fontsize=12)
plt.legend(fontsize=12)
plt.savefig(f'/content/drive/MyDrive/CNN_VAE_HEATMAPS/thresholds.png')
# Show the plot
plt.show()

In [None]:
import pickle

err_SaS_easy =create_err_list_from_dataset__fft_vae(SaS_easy)
err_PH_AC_easy =create_err_list_from_dataset__fft_vae(PH_AC_easy)
err_IwE_easy =create_err_list_from_dataset__fft_vae(IwE_easy)
err_IwE_hard =create_err_list_from_dataset__fft_vae(IwE_hard)
err_Loc_easy =create_err_list_from_dataset__fft_vae(Loc_easy)
err_Loc_hard =create_err_list_from_dataset__fft_vae(Loc_hard)
err_Loc_Vhard =create_err_list_from_dataset__fft_vae(Loc_Vhard)
err_HI_easy =create_err_list_from_dataset__fft_vae(HI_easy)
err_HI_hard =create_err_list_from_dataset__fft_vae(HI_hard)#29 min

error_dict = {
    'err_SaS_easy': err_SaS_easy,
    'err_PH_AC_easy': err_PH_AC_easy,
    'err_IwE_easy': err_IwE_easy,
    'err_IwE_hard': err_IwE_hard,
    'err_Loc_easy': err_Loc_easy,
    'err_Loc_hard': err_Loc_hard,
    'err_Loc_Vhard': err_Loc_Vhard,
    'err_HI_easy': err_HI_easy,
    'err_HI_hard': err_HI_hard
}
# Save the error_dict object to a file
with open('/content/drive/MyDrive/FFT_C_VAE_HEATMAPS/error_dict.pkl', 'wb') as file:
    pickle.dump(error_dict, file)



In [None]:
err_NORMAL= create_err_list_from_dataset__fft_vae(test_data)

# Save the error_dict object to a file
with open('/content/drive/MyDrive/FFT_C_VAE_HEATMAPS/norm_dict.pkl', 'wb') as file:
    pickle.dump(err_NORMAL, file)

In [None]:
import pickle

# Load the error_dict object from the file
with open('/content/drive/MyDrive/FFT_C_VAE_HEATMAPS/error_dict.pkl', 'rb') as file:
    loaded_error_dict = pickle.load(file)

# Access the loaded objects
err_SaS_easy = loaded_error_dict['err_SaS_easy']
err_PH_AC_easy = loaded_error_dict['err_PH_AC_easy']
err_IwE_easy = loaded_error_dict['err_IwE_easy']
err_IwE_hard = loaded_error_dict['err_IwE_hard']
err_Loc_easy = loaded_error_dict['err_Loc_easy']
err_Loc_hard = loaded_error_dict['err_Loc_hard']
err_Loc_Vhard = loaded_error_dict['err_Loc_Vhard']
err_HI_easy = loaded_error_dict['err_HI_easy']
err_HI_hard = loaded_error_dict['err_HI_hard']

In [None]:
with open('/content/drive/MyDrive/FFT_C_VAE_HEATMAPS/norm_dict.pkl', 'rb') as file:
    normal = pickle.load(file)

err_NORMAL = normal

#LSTM VAE

In [None]:
from sklearn.model_selection import train_test_split
# Split the data into training and testing sets
train_data, test_data = train_test_split(combined_df, test_size=0.4)# 60 40


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pad_sequence
import pandas as pd



class VAE_LSTM(nn.Module):
    def __init__(self, input_size, hidden_size, latent_size,dropout_rate):
        super(VAE_LSTM, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.latent_size = latent_size
        self.dropout_rate = dropout_rate

        self.encoder = nn.LSTM(input_size, hidden_size, batch_first=True,dropout = dropout_rate)
        self.fc_mu = nn.Linear(hidden_size, latent_size)
        self.fc_log_var = nn.Linear(hidden_size, latent_size)
        self.decoder = nn.LSTM(latent_size, hidden_size, batch_first=True,dropout = dropout_rate)
        self.fc = nn.Linear(hidden_size, input_size)

    def encode(self, x):
        _, (h_n, _) = self.encoder(x)
        h_n = h_n.squeeze()
        mu = self.fc_mu(h_n)
        log_var = self.fc_log_var(h_n)
        return mu, log_var

    def reparameterize(self, mu, log_var):
        std = torch.exp(0.5 * log_var)
        eps = torch.randn_like(std)
        z = mu + eps * std
        return z

    def decode(self, z):
        z = z.unsqueeze(0)
        h, _ = self.decoder(z)
        x_recon = self.fc(h.squeeze(0))
        return x_recon

    def forward(self, x):
        mu, log_var = self.encode(x)
        z = self.reparameterize(mu, log_var)
        x_recon = self.decode(z)
        return x_recon, mu, log_var


input_size = 59  # Number of features in a data frame
hidden_size = 256   # Set hidden size
latent_size = 64  # Set latent size
timestep = 50  # Set timestep size
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = VAE_LSTM(input_size, hidden_size, latent_size,dropout_rate = 0.2)

In [None]:
# Load pre-trained weights
checkpoint = torch.load('/content/drive/MyDrive/Saved models/lstm50_vae_model_best1.pth', map_location=torch.device('cpu'))
model.load_state_dict(checkpoint, strict=False)


# Set the model to evaluation mode
model.eval().to(device)

In [None]:
def calculate_reconstruction_error(model, data_loader):
    model.eval()
    reconstruction_errors = []
    for batch in data_loader:
        x = batch.to(device)
        x_recon, _, _ = model(x)
        recon_error = nn.MSELoss(reduction='none')(x_recon, x).sum(dim=1).mean().item()
        reconstruction_errors.append(recon_error)
    return reconstruction_errors


In [None]:
def create_err_list_from_dataset__lstm_vae(dataset, window_size, window_slide):
    error_list = []
    for record in dataset:
        record_error_list = []
        # Iterate over the record with a sliding window
        for i in range(0, len(record), window_slide):
            end = i + window_size

            # If the end of the window exceeds the length of the record
            if end > len(record):
                # Adjust the end to be the end of the record
                end = len(record)

            # Get the window from the record
            window = record[i:end]

            # Convert the window DataFrame into a tensor
            sequence_tensor = torch.Tensor(window.values)

            # Create a DataLoader for the window
            data_loader = DataLoader([sequence_tensor], batch_size=1)

            # Calculate the reconstruction error for the window
            reconstruction_error = calculate_reconstruction_error(model, data_loader)

            # Extract the number
            if isinstance(reconstruction_error, (list, torch.Tensor)):
                reconstruction_error = reconstruction_error[0]

            record_error_list.append(reconstruction_error)

        error_list.append(record_error_list)

    return error_list


In [None]:
errors = create_err_list_from_dataset__lstm_vae(test_data, window_size=10, window_slide=5)


import numpy as np
flattened_errors = [error for sublist in errors for error in sublist]

quantile_threshold_99 = np.percentile(flattened_errors, 99)
quantile_threshold_95 = np.percentile(flattened_errors, 95)
quantile_threshold_90 = np.percentile(flattened_errors, 90)
quantile_threshold_85 = np.percentile(flattened_errors, 85)

quantile_threshold_50 = np.percentile(flattened_errors, 50)

print(f"Quantile Threshold: {quantile_threshold_99:.4f} Quantile Threshold: {quantile_threshold_95:.4f} \
Quantile Threshold: {quantile_threshold_90:.4f} Quantile Threshold: {quantile_threshold_85:.4f}")

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
# Sample reconstruction errors
plt.figure(figsize=(12, 8))
sns.histplot(flattened_errors, bins=30, kde=False)

# Calculate the percentiles
percentiles = np.percentile(flattened_errors, [85, 90, 95, 99])

plt.axvline(np.percentile(flattened_errors,99), color='green', linestyle='--', label=f'{int(99)}th Percentile')
plt.axvline(np.percentile(flattened_errors,95), color='orange', linestyle='--', label=f'{int(95)}th Percentile')
plt.axvline(np.percentile(flattened_errors,90), color='violet', linestyle='--', label=f'{int(90)}th Percentile')
plt.axvline(np.percentile(flattened_errors,85), color='teal', linestyle='--', label=f'{int(85)}th Percentile')


# Add a vertical line for the median
median = np.median(flattened_errors)
plt.axvline(median, color='red', linestyle='dashdot', label=f'Median')


# Add labels and legend
plt.xlabel('Reconstruction Errors', fontsize=12)
plt.ylabel('Frequency', fontsize=12)
plt.legend(fontsize=12)
plt.savefig(f'/content/drive/MyDrive/LSTM_VAE_HEATMAPS/thresholds.png')
# Show the plot
plt.show()

In [None]:
import pickle
win_s = 50

err_SaS_easy =create_err_list_from_dataset__lstm_vae(SaS_easy, window_size=win_s, window_slide=5)#~12 min
err_PH_AC_easy =create_err_list_from_dataset__lstm_vae(PH_AC_easy, window_size=win_s, window_slide=5)#
err_IwE_easy =create_err_list_from_dataset__lstm_vae(IwE_easy, window_size=win_s, window_slide=5)
err_IwE_hard =create_err_list_from_dataset__lstm_vae(IwE_hard, window_size=win_s, window_slide=5)# 15 min tutaj
err_Loc_easy =create_err_list_from_dataset__lstm_vae(Loc_easy, window_size=win_s, window_slide=5)
err_Loc_hard =create_err_list_from_dataset__lstm_vae(Loc_hard, window_size=win_s, window_slide=5)#16 min
err_Loc_Vhard =create_err_list_from_dataset__lstm_vae(Loc_Vhard, window_size=win_s, window_slide=5)#16 30
err_HI_easy =create_err_list_from_dataset__lstm_vae(HI_easy, window_size=win_s, window_slide=5)
err_HI_hard =create_err_list_from_dataset__lstm_vae(HI_hard, window_size=win_s, window_slide=5)#20 min

error_dict = {
    'err_SaS_easy': err_SaS_easy,
    'err_PH_AC_easy': err_PH_AC_easy,
    'err_IwE_easy': err_IwE_easy,
    'err_IwE_hard': err_IwE_hard,
    'err_Loc_easy': err_Loc_easy,
    'err_Loc_hard': err_Loc_hard,
    'err_Loc_Vhard': err_Loc_Vhard,
    'err_HI_easy': err_HI_easy,
    'err_HI_hard': err_HI_hard
}
# Save the error_dict object to a file
with open('/content/drive/MyDrive/LSTM_50_VAE_HEATMAPS/error_dict.pkl', 'wb') as file:
    pickle.dump(error_dict, file)

err_NORMAL= create_err_list_from_dataset__lstm_vae(test_data, window_size=win_s, window_slide=5)

# Save the error_dict object to a file
with open('/content/drive/MyDrive/LSTM_50_VAE_HEATMAPS/norm_dict.pkl', 'wb') as file:
    pickle.dump(err_NORMAL, file)

In [None]:
import pickle

# Load the error_dict object from the file
with open('/content/drive/MyDrive/LSTM_50_VAE_HEATMAPS/error_dict.pkl', 'rb') as file:
    loaded_error_dict = pickle.load(file)

# Access the loaded objects
err_SaS_easy = loaded_error_dict['err_SaS_easy']
err_PH_AC_easy = loaded_error_dict['err_PH_AC_easy']
err_IwE_easy = loaded_error_dict['err_IwE_easy']
err_IwE_hard = loaded_error_dict['err_IwE_hard']
err_Loc_easy = loaded_error_dict['err_Loc_easy']
err_Loc_hard = loaded_error_dict['err_Loc_hard']
err_Loc_Vhard = loaded_error_dict['err_Loc_Vhard']
err_HI_easy = loaded_error_dict['err_HI_easy']
err_HI_hard = loaded_error_dict['err_HI_hard']



with open('/content/drive/MyDrive/LSTM_50_VAE_HEATMAPS/norm_dict.pkl', 'rb') as file:
    normal = pickle.load(file)

# Final results (Shared)

In [None]:
import numpy as np
import pandas as pd
from imblearn.under_sampling import RandomUnderSampler
from sklearn.metrics import roc_auc_score, f1_score, precision_score, recall_score, average_precision_score

def multiple_random_sampling(err_NORMAL,err_LIST,threshold,limit = 100,precent=0.05):

  avg_auc =[]
  avg_f1 = []
  avg_prec=[]
  avg_rec=[]
  avg_AP=[]

  for N in range(limit):
    normal_errors = err_NORMAL
    anomaly_errors =  err_LIST#err_LIST#err_IwE_easy#err_SaS_easy#err_PH_AC_easy#err_IwE_easy

    normal_errors = np.array(normal_errors,dtype=object)
    anomaly_errors = np.array(anomaly_errors,dtype=object)

    # Create labels
    normal_labels = np.zeros(len(normal_errors)) # normal is '0'
    anomaly_labels = np.ones(len(anomaly_errors)) # anomaly is '1'

    # Combine errors and labels
    errors = np.concatenate((normal_errors, anomaly_errors))
    labels = np.concatenate((normal_labels, anomaly_labels))

    # Create a DataFrame
    df = pd.DataFrame({'errors': errors, 'labels': labels})

    # Apply random undersampling
    undersample = RandomUnderSampler(sampling_strategy='majority') # 'majority' will undersample the majority class
    X_res, y_res = undersample.fit_resample(df[['errors']], df['labels'])

    X_res_list = X_res.values.tolist()
    X_res_flat = [item for sublist in X_res_list for item in sublist]

    predictions = check_against_thresh_pred(X_res_flat, threshold,precent)

    auc_score = roc_auc_score(y_res, predictions)
    f1 = f1_score(y_res, predictions)
    precision = precision_score(y_res, predictions)
    recall = recall_score(y_res, predictions)
    average_precision = average_precision_score(y_res, predictions)

    avg_auc.append(auc_score)
    avg_f1.append(f1)
    avg_prec.append(precision)
    avg_rec.append(recall)
    avg_AP.append(average_precision)


  print(f'AVG AUC {np.mean(avg_auc):.3f}')
  print(f'AVG F1 {np.mean(avg_f1):.3f}')
  print(f'AVG precision {np.mean(avg_prec):.3f}')
  print(f'AVG recall {np.mean(avg_rec):.3f}')



In [None]:
error_dict = {
    'Situations and scenarios - easy': err_SaS_easy,
    'Physical Activities - easy': err_PH_AC_easy,
    'Interaction with environment - easy': err_IwE_easy,
    'Interaction with environment - hard': err_IwE_hard,
    'Locomotion - easy': err_Loc_easy,
    'Locomotion - hard': err_Loc_hard,
    'Locomotion - very hard': err_Loc_Vhard,
    'Human Interaction - easy': err_HI_easy,
    'Human Interaction - hard': err_HI_hard
}


In [None]:


# Dictionary of quantile thresholds
thresholds = {
    'quantile_threshold_99': quantile_threshold_99,
    'quantile_threshold_95': quantile_threshold_95,
    'quantile_threshold_90': quantile_threshold_90,
    'quantile_threshold_85': quantile_threshold_85
}


for error_list in error_dict:
  for threshold in thresholds:
      print(f"List name: {error_list}")
      print(f"Threshold name: {threshold}")
      multiple_random_sampling(err_NORMAL,error_dict[error_list],thresholds[threshold],limit = 100,precent=0.05)
      print()




#Heatmaps

In [None]:
import numpy as np
import pandas as pd
from imblearn.under_sampling import RandomUnderSampler
from sklearn.metrics import roc_auc_score, f1_score, precision_score, recall_score, average_precision_score, matthews_corrcoef

def multiple_random_sampling_heatmap(err_NORMAL, err_LIST, threshold, limit=100, percent=0.05):

    avg_auc = []
    avg_f1 = []
    avg_prec = []
    avg_rec = []
    avg_AP = []
    avg_mcc = []

    for N in range(limit):
        normal_errors = err_NORMAL
        anomaly_errors = err_LIST

        normal_errors = np.array(normal_errors, dtype=object)
        anomaly_errors = np.array(anomaly_errors, dtype=object)

        normal_labels = np.zeros(len(normal_errors))
        anomaly_labels = np.ones(len(anomaly_errors))

        errors = np.concatenate((normal_errors, anomaly_errors))
        labels = np.concatenate((normal_labels, anomaly_labels))

        df = pd.DataFrame({'errors': errors, 'labels': labels})

        undersample = RandomUnderSampler(sampling_strategy='majority')
        X_res, y_res = undersample.fit_resample(df[['errors']], df['labels'])

        X_res_list = X_res.values.tolist()
        X_res_flat = [item for sublist in X_res_list for item in sublist]

        predictions = check_against_thresh_pred(X_res_flat, threshold, percent)

        auc_score = roc_auc_score(y_res, predictions)
        f1 = f1_score(y_res, predictions)
        precision = precision_score(y_res, predictions, zero_division=0)
        recall = recall_score(y_res, predictions)
        average_precision = average_precision_score(y_res, predictions)
        mcc = matthews_corrcoef(y_res, predictions)

        avg_auc.append(auc_score)
        avg_f1.append(f1)
        avg_prec.append(precision)
        avg_rec.append(recall)
        avg_AP.append(average_precision)
        avg_mcc.append(mcc)

    return avg_auc, avg_f1, avg_prec, avg_rec, avg_AP, avg_mcc


In [None]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import roc_auc_score, f1_score, precision_score, recall_score, average_precision_score
from imblearn.under_sampling import RandomUnderSampler


percentages = [0.01]
percentages_range= np.arange(0.05,0.50,0.05)
percentages.extend(percentages_range)
xticklabels = ['{:.2f}'.format(val) for val in percentages]
yticklabels = list(np.arange(50, 100, 5))+[99]
yticklabels = ['{}th'.format(val) for val in yticklabels]

def calc_results(err_NORMAL,err_LIST):
  X_res_flat = [item for sublist in err_NORMAL for item in sublist]

  percentile_range = np.arange(50, 100, 5)
  percentiles = np.append(percentile_range, 99)
  thresholds = np.percentile(X_res_flat, percentiles)

  percentages = [0.01]
  percentages_range= np.arange(0.05,0.50,0.05)
  percentages.extend(percentages_range)

  # Create empty arrays to collect the data
  results_auc = np.zeros((len(thresholds), len(percentages)))
  results_f1 = np.zeros((len(thresholds), len(percentages)))
  results_prec = np.zeros((len(thresholds), len(percentages)))
  results_rec = np.zeros((len(thresholds), len(percentages)))
  results_AP = np.zeros((len(thresholds), len(percentages)))
  results_MCC = np.zeros((len(thresholds), len(percentages)))

  # Loop through thresholds and percentages
  for i, threshold in enumerate(thresholds):
      for j, percent in enumerate(percentages):
          avg_auc, avg_f1, avg_prec, avg_rec, avg_AP,avg_mcc = multiple_random_sampling_heatmap(err_NORMAL, err_LIST, threshold, percent=percent,limit=50)
          results_auc[i, j] = np.mean(avg_auc)
          results_f1[i, j] = np.mean(avg_f1)
          results_prec[i, j] = np.mean(avg_prec)
          results_rec[i, j] = np.mean(avg_rec)
          results_AP[i, j] = np.mean(avg_AP)
          results_MCC[i,j] = np.mean(avg_mcc)
  return results_auc,results_f1,results_prec,results_rec,results_AP,results_MCC


In [None]:
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

def draw_heatmaps(results_auc,results_f1,results_prec,results_rec,results_AP,results_MCC,dataset_name):
  metrics = ['AUC', 'F1', 'Precision', 'Recall', 'Average Precision','Matthews Correlation Coefficient']
  results = {
      'AUC': results_auc,
      'F1': results_f1,
      'Precision': results_prec,
      'Recall': results_rec,
      'Average Precision': results_AP ,
      'Matthews Correlation Coefficient': results_MCC
  }

  metric_pairs = [('AUC', 'F1'), ('Precision', 'Recall'),('Average Precision','Matthews Correlation Coefficient')]


  for metric1, metric2 in metric_pairs:
      fig, ax = plt.subplots(1, 2, figsize=(12, 6))

      # Plot the first heatmap (metric1)
      sns.heatmap(results[metric1], annot=True, fmt='.3f', xticklabels=xticklabels, yticklabels=yticklabels, cbar=False, ax=ax[0], cmap="viridis", linewidths=0.5, linecolor="gray")
      ax[0].set_title(f'{metric1} Heatmap')
      ax[0].set_xlabel('Percentage')
      ax[0].set_ylabel('Threshold')
      ax[0].tick_params(axis='y', rotation=0)


      # Plot the second heatmap (metric2)
      sns.heatmap(results[metric2], annot=True, fmt='.3f', xticklabels=xticklabels, yticklabels=yticklabels, cbar=False, ax=ax[1], cmap="viridis", linewidths=0.5, linecolor="gray")
      ax[1].set_title(f'{metric2} Heatmap')
      ax[1].set_xlabel('Percentage')
      ax[1].set_ylabel('Threshold')
      ax[1].tick_params(axis='y', rotation=0)

      # Add title
      fig.suptitle(f'{dataset_name}', fontsize=16)


      # Adjust spacing between subplots
      plt.tight_layout()

      plt.savefig(f'/content/drive/MyDrive/FFT_C_VAE_HEATMAPS/{metric1}_{metric2}_{dataset_name}_heatmaps.png')
      #plt.show()
      plt.close()

In [None]:

for error_list in error_dict:
  results_auc,results_f1,results_prec,results_rec,results_AP,results_MCC = calc_results(err_NORMAL,error_dict[error_list])
  draw_heatmaps(results_auc,results_f1,results_prec,results_rec,results_AP,results_MCC,error_list)

In [None]:
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt


metrics = ['AUC', 'F1', 'Precision', 'Recall', 'Average Precision']
results = {
    'AUC': results_auc,
    'F1': results_f1,
    'Precision': results_prec,
    'Recall': results_rec,
    'Average Precision': results_AP
}
percentages = [0.01] + list(np.arange(0.05, 0.45, 0.05))

# Define pairs of metrics
metric_pairs = [('AUC', 'F1'), ('Precision', 'Recall')]

# Create and save separate pictures for each pair of heatmaps
for metric1, metric2 in metric_pairs:
    fig, ax = plt.subplots(1, 2, figsize=(12, 6))  # Side-by-side

    # Plot the first heatmap
    sns.heatmap(results[metric1], annot=True, fmt='.3f', xticklabels=xticklabels, yticklabels=yticklabels, cbar=False, ax=ax[0], cmap="viridis", linewidths=0.5, linecolor="gray")
    ax[0].set_title(f'{metric1} Heatmap')
    ax[0].set_xlabel('Percentage')
    ax[0].set_ylabel('Threshold')
    ax[0].tick_params(axis='y', rotation=0)



    # Plot the second heatmap
    sns.heatmap(results[metric2], annot=True, fmt='.3f', xticklabels=xticklabels, yticklabels=yticklabels, cbar=False, ax=ax[1], cmap="viridis", linewidths=0.5, linecolor="gray")
    ax[1].set_title(f'{metric2} Heatmap')
    ax[1].set_xlabel('Percentage')
    ax[1].set_ylabel('Threshold')
    ax[1].tick_params(axis='y', rotation=0)

    # Add title
    fig.suptitle('Comparison of Metric Pairs', fontsize=16)


    # Adjust spacing
    plt.tight_layout()

    # Save the figure
    plt.savefig(f'{metric1}_{metric2}_heatmaps.png')
    plt.show()