In [None]:
import scipy.io
import numpy as np

# Load the train.mat file
mat_file_path = "train.mat"
data = scipy.io.loadmat(mat_file_path)

# Display the keys in the loaded data
data.keys()


In [None]:
# Extract the Train_data object
train_data = data['Train_data']

# Check the type and shape
type(train_data), train_data.shape


In [None]:
train_data[0].size

In [None]:
train_data.shape[1]

In [None]:
# Extract the first system (system 0)
system_0 = train_data[0, 0]

# Check what fields or attributes are available in this system's structure
system_0.dtype.names


In [None]:
system_0

In [None]:
# Extract trajectory and label for system 0
trajectory_0 = system_0['trajectory']
label_0 = system_0['Label']

# Check shapes of both arrays
trajectory_0.shape, label_0.shape


In [None]:
trajectory_0[0,0].shape

In [None]:
label_0[0].shape


In [None]:
# Extract trajectory of component 0
component_0_traj = trajectory_0[0, 0]  # 1st row, 1st component

# Check shape and type
type(component_0_traj), component_0_traj.shape


In [None]:
all_normal_data = []

for i in range(train_data.shape[1]):
    system = train_data[0, i]
    trajectories = system['trajectory']  # shape (1, 4)
    labels = system['Label']            # shape (4, 1000)
    
    for j in range(4):  # For each of the 4 trajectories per system
        traj = trajectories[0, j]       # shape (10, 1000)
        label = labels[j]               # shape (1000,)
        
        # Select only columns (time steps) where label is 0
        normal_indices = label == 0
        normal_data = traj[:, normal_indices]  # shape (10, N_normal)
        
        # Transpose to (N_normal, 10)
        normal_data = normal_data.T
        all_normal_data.append(normal_data)

# Stack all normal data
X_train = np.vstack(all_normal_data) 
print("Shape of normal trajectory: " , X_train.shape)

In [None]:
# Collect all anomalous samples
all_anomaly_data = []

for i in range(train_data.shape[1]):
    system = train_data[0, i]
    trajectories = system['trajectory']  # shape (1, 4)
    labels = system['Label']            # shape (4, 1000)
    
    for j in range(4):  # For each of the 4 trajectories per system
        traj = trajectories[0, j]       # shape (10, 1000)
        label = labels[j]               # shape (1000,)
        
        # Select only columns (time steps) where label is 1
        anomaly_indices = label == 1
        anomaly_data = traj[:, anomaly_indices]  # shape (10, N_anomaly)
        
        # Transpose to (N_anomaly, 10)
        anomaly_data = anomaly_data.T
        all_anomaly_data.append(anomaly_data)

# Stack all anomaly data
X_anomaly = np.vstack(all_anomaly_data)  # Final shape: (N_total_anomaly, 10)
print("Anomaly data shape:", X_anomaly.shape)

In [None]:
import pandas as pd
import torch
from torch import nn, optim
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
from tqdm import tqdm

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("device ", device)

In [None]:
# Train/test split
train_df, test_df = train_test_split(
    X_train,  # Ensure we only use the specified features
    test_size=0.2,
    random_state=42
)

In [None]:
# Scale data with standard scaler
#scaler = StandardScaler()
#train_df_scaled = scaler.fit_transform(train_df)
#test_df_scaled = scaler.transform(test_df)
#anomaly_scaled = scaler.transform(X_anomaly)  

# Scale data with min-max scaler
#scaler = MinMaxScaler()
scaler = MinMaxScaler(feature_range=(-1, 1))
train_df_scaled = scaler.fit_transform(train_df)
test_df_scaled = scaler.transform(test_df)
anomaly_scaled = scaler.transform(X_anomaly)  

In [None]:
print("shape of train data ", train_df_scaled.shape)
print("shape of test data ", test_df_scaled.shape)
print("shape of anomaly data ", anomaly_scaled.shape)

In [None]:
# Convert to PyTorch tensors
train_data_tensor = torch.FloatTensor(train_df_scaled).to(device)
test_data_tensor = torch.FloatTensor(test_df_scaled).to(device)
anomaly_data_tensor = torch.FloatTensor(anomaly_scaled).to(device)


# Step 3: GAN Architecture


data_dim = train_data_tensor.shape[1]
data_dim

In [None]:
import torch
latent_dim = 8

class VAE_Encoder(nn.Module):
    def __init__(self, data_dim, latent_dim):
        super().__init__()
        self.shared = nn.Sequential(
            nn.Linear(data_dim, 64),
            nn.ReLU(),
            nn.BatchNorm1d(64),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.BatchNorm1d(32),
            nn.Linear(32, 16),
            nn.ReLU(),
            nn.BatchNorm1d(16),
        )
        self.mu_layer = nn.Linear(16, latent_dim)
        self.logvar_layer = nn.Linear(16, latent_dim)

    def forward(self, x):
        x = self.shared(x)
        mu = self.mu_layer(x)
        logvar = self.logvar_layer(x)
        return mu, logvar

class VAE_Decoder(nn.Module):
    def __init__(self, data_dim, latent_dim):
        super().__init__()
        self.model = nn.Sequential(
            nn.Linear(latent_dim, 16),
            nn.ReLU(),
            nn.BatchNorm1d(16),
            nn.Linear(16, 32),
            nn.ReLU(),
            nn.BatchNorm1d(32),
            nn.Linear(32, 64),
            nn.ReLU(),
            nn.BatchNorm1d(64),
            nn.Linear(64, data_dim),
            nn.Tanh()  # because the data is scaled between -1 and 1
        )

    def forward(self, z):
        return self.model(z)

def reparameterize(mu, logvar):
    std = torch.exp(0.5 * logvar)
    eps = torch.randn_like(std)
    return mu + eps * std


In [None]:
# Step 4: Instantiate models
encoder = VAE_Encoder(data_dim, latent_dim).to(device)
decoder = VAE_Decoder(data_dim, latent_dim).to(device)


loss_function = nn.MSELoss()
beta = 100
def vae_loss(recon_x, x, mu, logvar):
    recon_loss = loss_function(recon_x, x)  # or L1
    # KL Divergence between the posterior and the standard normal prior
    kl_div = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp()) / x.size(0) 
    return recon_loss + kl_div * beta, recon_loss, kl_div



#lr = 0.0001

In [None]:
#opt_g = optim.Adam(generator.parameters(), lr=lr)
#opt_d = optim.Adam(discriminator.parameters(), lr=lr)

opt_enc = torch.optim.Adam(list(encoder.parameters()) + list(decoder.parameters()), lr=1e-3)



# Step 5: Training loop
epochs = 5
batch_size = 512


In [None]:
from torch.utils.data import DataLoader, TensorDataset

dataset = TensorDataset(train_data_tensor)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

In [None]:
import torch
import os
from tqdm import tqdm

# === Setup for checkpointing ===
checkpoint_path = 'checkpoints/'
checkpoint_file = os.path.join(checkpoint_path, 'Pytorch_Variational_Autoencoder.pth')
os.makedirs(checkpoint_path, exist_ok=True)

start_epoch = 0
best_loss_AE = float('inf')

# === Try to load checkpoint if it exists ===
if os.path.exists(checkpoint_file):
    checkpoint = torch.load(checkpoint_file)
    encoder.load_state_dict(checkpoint['encoder_state_dict'])
    decoder.load_state_dict(checkpoint['decoder_state_dict'])
    opt_enc.load_state_dict(checkpoint['opt_enc_state_dict'])


    start_epoch = checkpoint['epoch'] + 1
    best_loss_AE = checkpoint['best_loss_AE']

    print(f"✅ Loaded checkpoint from epoch {start_epoch - 1}")

# === Lists to store losses ===

losses_rec = []
losses_kl = []
losses_total = []

for epoch in tqdm(range(start_epoch, epochs)):

    epoch_total_loss = 0.0
    epoch_recon_loss = 0.0
    epoch_kl_loss = 0.0
    

    num_batches = 0

    for batch_idx, (real_data,) in enumerate(dataloader):
        num_batches += 1
        real_data = real_data.to(device)

        # === Train Autoencoder ===
        mu, logvar = encoder(real_data)
        z = reparameterize(mu, logvar)
        recon_data = decoder(z)
        loss, recon_loss, kl_div = vae_loss(recon_data, real_data, mu, logvar)
        
        opt_enc.zero_grad()
        loss.backward()
        opt_enc.step()

        # === Accumulate losses ===
        epoch_total_loss += loss.item()
        epoch_recon_loss += recon_loss.item()
        epoch_kl_loss += kl_div.item()
        


    # === Average Epoch Losses ===
    avg_total_loss = epoch_total_loss / num_batches
    avg_recon_loss = epoch_recon_loss / num_batches
    avg_kl_loss = epoch_kl_loss / num_batches

    losses_rec.append(avg_total_loss)
    losses_kl.append(avg_recon_loss)
    losses_total.append(avg_total_loss)
    


    # === Save checkpoint every N epochs or on best loss improvement ===
    if epoch % 1 == 0:
        torch.save({
            'epoch': epoch,
            'encoder_state_dict': encoder.state_dict(),
            'decoder_state_dict': decoder.state_dict(),

            'opt_enc_state_dict': opt_enc.state_dict(),

            'best_loss_AE': best_loss_AE,
        }, checkpoint_file)
        print(f"💾 Checkpoint saved at epoch {epoch} with total loss : {avg_total_loss:.4f}")

    # === Early Stopping ===
    if avg_total_loss < 0.0005:
        print(f"⛔ Early stopping at epoch {epoch} as VAE total loss reached {avg_total_loss:.4f}")
        break

    # === Logging ===
    if epoch % 10 == 0:
        print(f"📘 Epoch {epoch}: Total = {avg_total_loss:.4f} | Recon = {avg_recon_loss:.4f} | KL = {avg_kl_loss:.4f}")
        


In [None]:
import plotly.graph_objects as go

fig = go.Figure()

fig.add_trace(go.Scatter(y=losses_total, mode='lines', name='Total loss'))
fig.add_trace(go.Scatter(y=losses_rec, mode='lines', name='Reconstruction Loss'))
fig.add_trace(go.Scatter(y=losses_kl, mode='lines', name='KL Divergence'))


fig.update_layout(
    title='Training Losses Over Epochs',
    xaxis_title='Epoch',
    yaxis_title='Loss',
    legend_title='Loss Type',
    template='plotly_white',

)

fig.show()


In [None]:
# Discriminator Loss
fig_d = go.Figure()
fig_d.add_trace(go.Scatter(y=losses_total, mode='lines', name='Total Loss'))
fig_d.update_layout(title='Total Loss', xaxis_title='Epoch', yaxis_title='Loss', template='plotly_dark')
fig_d.show()

# Generator Loss
fig_g = go.Figure()
fig_g.add_trace(go.Scatter(y=losses_rec, mode='lines', name='Reconstruction Loss'))
fig_g.update_layout(title='Reconstruction Loss', xaxis_title='Epoch', yaxis_title='Loss', template='plotly_dark')
fig_g.show()

# Reconstruction Loss
fig_rec = go.Figure()
fig_rec.add_trace(go.Scatter(y=losses_kl, mode='lines', name='KL Divergence'))
fig_rec.update_layout(title='KL Divergence', xaxis_title='Epoch', yaxis_title='Loss', template='plotly_dark')
fig_rec.show()




In [None]:
checkpoint = torch.load('checkpoints/Pytorch_Variational_Autoencoder.pth')

encoder.load_state_dict(checkpoint['encoder_state_dict'])
decoder.load_state_dict(checkpoint['decoder_state_dict'])


encoder.eval()
decoder.eval()



In [None]:
def compute_vae_outputs(data_tensor):
    with torch.no_grad():
        mu, logvar = encoder(data_tensor)
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        z = mu + eps * std
        
        recon_x = decoder(z)
        
        recon_loss =  torch.mean((data_tensor - recon_x) ** 2, dim=1)  # or L1
        kl_loss = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp(), dim=1)
        total_loss = recon_loss + kl_loss

        return total_loss.cpu().numpy(), recon_loss.cpu().numpy(), kl_loss.cpu().numpy(), mu.cpu().numpy(), logvar.cpu().numpy(), z.cpu().numpy()

In [None]:
total_loss_train, recon_loss_train, kl_loss_train, mu_train, logvar_train, z_train= compute_vae_outputs(train_data_tensor)
total_loss_test, recon_loss_test, kl_loss_test, mu_test, logvar_test, z_test= compute_vae_outputs(test_data_tensor)
total_loss_anomaly, recon_loss_anomaly, kl_loss_anomaly, mu_anomaly, logvar_anomaly, z_anomaly= compute_vae_outputs(anomaly_data_tensor)





In [None]:
print(total_loss_train)

In [None]:
print(kl_loss_train)

In [None]:
print(recon_loss_train)

In [None]:
mu_train[0:5]

In [None]:
import plotly.express as px
import pandas as pd

df = pd.DataFrame({
    'Score': list(total_loss_train) + list(total_loss_test) + list(total_loss_anomaly),
    'Set': (['Train'] * len(total_loss_train)) +
           (['Test'] * len(total_loss_test)) +
           (['Anomaly'] * len(total_loss_anomaly))
})

fig = px.box(df, x='Set', y='Score', title='Total loss', template='plotly_dark')
fig.show()


In [None]:
import plotly.express as px
import pandas as pd

df = pd.DataFrame({
    'Score': list(recon_loss_train) + list(recon_loss_test) + list(recon_loss_anomaly),
    'Set': (['Train'] * len(total_loss_train)) +
           (['Test'] * len(total_loss_test)) +
           (['Anomaly'] * len(total_loss_anomaly))
})

fig = px.box(df, x='Set', y='Score', title='Reconstruction loss', template='plotly_dark')
fig.show()


In [None]:
import plotly.express as px
import pandas as pd

df = pd.DataFrame({
    'Score': list(kl_loss_train) + list(kl_loss_test) + list(kl_loss_anomaly),
    'Set': (['Train'] * len(total_loss_train)) +
           (['Test'] * len(total_loss_test)) +
           (['Anomaly'] * len(total_loss_anomaly))
})

fig = px.box(df, x='Set', y='Score', title='KL Divergence', template='plotly_dark')
fig.show()


In [None]:
def downsample(arr, n=5000):
    idx = np.random.choice(len(arr), size=min(n, len(arr)), replace=False)
    return arr[idx]

# downsampling the data
latent_train = downsample(mu_train)
latent_test = downsample(mu_test)
latent_anomaly = downsample(mu_anomaly)

X = np.vstack([latent_train, latent_test, latent_anomaly])
labels = (['train'] * len(latent_train) +
          ['test'] * len(latent_test) +
          ['anomaly'] * len(latent_anomaly))


In [None]:
from sklearn.manifold import TSNE

tsne = TSNE(n_components=2, perplexity=30, random_state=5, n_jobs=-1)
X_2d = tsne.fit_transform(X)
import plotly.express as px
import pandas as pd

df = pd.DataFrame(X_2d, columns=['x', 'y'])
df['label'] = labels

fig = px.scatter(df, x='x', y='y', color='label',
                 title='t-SNE Visualization of Encoder Latent Space Z - mean')
fig.show()

tsne = TSNE(n_components=3, perplexity=30, random_state=5, n_jobs=-1)
X_3d = tsne.fit_transform(X)

df_3d = pd.DataFrame(X_3d, columns=['x', 'y', 'z'])
df_3d['label'] = labels

fig_3d = px.scatter_3d(df_3d, x='x', y='y', z='z', color='label',
                       title='3D t-SNE Visualization of Encoder Latent Space Z - mean')
fig_3d.update_traces(marker=dict(size=3))
fig_3d.show()


In [None]:
# downsampling the data
latent_train = downsample(logvar_train)
latent_test = downsample(logvar_test)
latent_anomaly = downsample(logvar_anomaly)

X = np.vstack([latent_train, latent_test, latent_anomaly])
labels = (['train'] * len(latent_train) +
          ['test'] * len(latent_test) +
          ['anomaly'] * len(latent_anomaly))

tsne = TSNE(n_components=2, perplexity=30, random_state=5,n_jobs=-1)
X_2d = tsne.fit_transform(X)

df = pd.DataFrame(X_2d, columns=['x', 'y'])
df['label'] = labels

fig = px.scatter(df, x='x', y='y', color='label',
                 title='t-SNE Visualization of Encoder Latent Space Z-log-var')
fig.show()

tsne = TSNE(n_components=3, perplexity=30, random_state=5, n_jobs=-1)
X_3d = tsne.fit_transform(X)

df_3d = pd.DataFrame(X_3d, columns=['x', 'y', 'z'])
df_3d['label'] = labels

fig_3d = px.scatter_3d(df_3d, x='x', y='y', z='z', color='label',
                       title='3D t-SNE Visualization of Encoder Latent Space Z-log-var')
fig_3d.update_traces(marker=dict(size=3))
fig_3d.show()





In [None]:
# downsampling the data
latent_train = downsample(z_train)
latent_test = downsample(z_test)
latent_anomaly = downsample(z_anomaly)

X = np.vstack([latent_train, latent_test, latent_anomaly])
labels = (['train'] * len(latent_train) +
          ['test'] * len(latent_test) +
          ['anomaly'] * len(latent_anomaly))

tsne = TSNE(n_components=2, perplexity=30, random_state=5, n_jobs=-1)
X_2d = tsne.fit_transform(X)

df = pd.DataFrame(X_2d, columns=['x', 'y'])
df['label'] = labels

fig = px.scatter(df, x='x', y='y', color='label',
                 title='t-SNE Visualization of Encoder Latent Space Z')
fig.show()

tsne = TSNE(n_components=3, perplexity=30, random_state=5, n_jobs=-1)
X_3d = tsne.fit_transform(X)

df_3d = pd.DataFrame(X_3d, columns=['x', 'y', 'z'])
df_3d['label'] = labels

fig_3d = px.scatter_3d(df_3d, x='x', y='y', z='z', color='label',
                       title='3D t-SNE Visualization of Encoder Latent Space Z')
fig_3d.update_traces(marker=dict(size=3))
fig_3d.show()





In [None]:
import umap
latent_train = downsample(mu_train)
latent_test = downsample(mu_test)
latent_anomaly = downsample(mu_anomaly)

X = np.vstack([latent_train, latent_test, latent_anomaly])
labels = (['train'] * len(latent_train) +
          ['test'] * len(latent_test) +
          ['anomaly'] * len(latent_anomaly))

umap_2d = umap.UMAP(n_components=2, n_jobs= -1)
X_2d = umap_2d.fit_transform(X)

df_2d = pd.DataFrame(X_2d, columns=['x', 'y'])
df_2d['label'] = labels

# 2D Plot
fig_2d = px.scatter(df_2d, x='x', y='y', color='label',
                    title="UMAP 2D Latent Visualization of Encoder Latent Space Z mean",
                    opacity=0.7)
fig_2d.show()

# Step 3: UMAP to 3D
umap_3d = umap.UMAP(n_components=3,n_jobs= -1)
X_3d = umap_3d.fit_transform(X)

df_3d = pd.DataFrame(X_3d, columns=['x', 'y', 'z'])
df_3d['label'] = labels

# 3D Plot
fig_3d = px.scatter_3d(df_3d, x='x', y='y', z='z', color='label',
                       title="UMAP 3D Latent Visualization of Encoder Latent Space Z mean",
                       opacity=0.7)
fig_3d.update_traces(marker=dict(size=3))
fig_3d.show()

In [None]:
import umap
latent_train = mu_train
latent_test = mu_test
latent_anomaly = mu_anomaly

X = np.vstack([latent_train, latent_test, latent_anomaly])
labels = (['train'] * len(latent_train) +
          ['test'] * len(latent_test) +
          ['anomaly'] * len(latent_anomaly))

umap_2d = umap.UMAP(n_components=2, n_jobs= -1)
X_2d = umap_2d.fit_transform(X)

df_2d = pd.DataFrame(X_2d, columns=['x', 'y'])
df_2d['label'] = labels

# 2D Plot
fig_2d = px.scatter(df_2d, x='x', y='y', color='label',
                    title="UMAP 2D Latent Visualization of Encoder Latent Space Z mean",
                    opacity=0.7)
fig_2d.show()

# Step 3: UMAP to 3D
umap_3d = umap.UMAP(n_components=3,n_jobs= -1)
X_3d = umap_3d.fit_transform(X)

df_3d = pd.DataFrame(X_3d, columns=['x', 'y', 'z'])
df_3d['label'] = labels

# 3D Plot
fig_3d = px.scatter_3d(df_3d, x='x', y='y', z='z', color='label',
                       title="UMAP 3D Latent Visualization of Encoder Latent Space Z mean",
                       opacity=0.7)
fig_3d.update_traces(marker=dict(size=3))
fig_3d.show()

In [None]:
latent_train = mu_train
latent_test = mu_test
latent_anomaly = mu_anomaly

X = np.vstack([latent_train, latent_test, latent_anomaly])
labels = (['train'] * len(latent_train) +
          ['test'] * len(latent_test) +
          ['anomaly'] * len(latent_anomaly))

from sklearn.manifold import TSNE

tsne = TSNE(n_components=2, perplexity=30, random_state=5, n_jobs=-1)
X_2d = tsne.fit_transform(X)
import plotly.express as px
import pandas as pd

df = pd.DataFrame(X_2d, columns=['x', 'y'])
df['label'] = labels

fig = px.scatter(df, x='x', y='y', color='label',
                 title='t-SNE Visualization of Encoder Latent Space Z - mean')
fig.show()

tsne = TSNE(n_components=3, perplexity=30, random_state=5, n_jobs=-1)
X_3d = tsne.fit_transform(X)

df_3d = pd.DataFrame(X_3d, columns=['x', 'y', 'z'])
df_3d['label'] = labels

fig_3d = px.scatter_3d(df_3d, x='x', y='y', z='z', color='label',
                       title='3D t-SNE Visualization of Encoder Latent Space Z - mean')
fig_3d.update_traces(marker=dict(size=3))
fig_3d.show()

In [None]:
import numpy as np
from sklearn.metrics import (
    roc_curve, precision_recall_curve, auc,
    accuracy_score, f1_score, precision_score, recall_score
)
import plotly.graph_objects as go

# Anomaly scores
scores_train_2 = np.array(kl_loss_train)
scores_test_2 = np.array(kl_loss_test)
scores_anomaly_2 = np.array(kl_loss_anomaly)

# Test data is normal (label 0), anomaly is (label 1)
y_true = np.concatenate([np.zeros_like(scores_test_2), np.ones_like(scores_anomaly_2)])
y_scores = np.concatenate([scores_test_2, scores_anomaly_2])

# === ROC Curve ===
fpr, tpr, _ = roc_curve(y_true, y_scores)
roc_auc = auc(fpr, tpr)

roc_fig = go.Figure()
roc_fig.add_trace(go.Scatter(x=fpr, y=tpr, mode='lines', name='ROC Curve'))
roc_fig.add_trace(go.Scatter(x=[0, 1], y=[0, 1], mode='lines', line=dict(dash='dash')))
roc_fig.update_layout(title=f'ROC Curve (AUC = {roc_auc:.4f})',
                      xaxis_title='False Positive Rate',
                      yaxis_title='True Positive Rate')

# === PR Curve ===
precision, recall, _ = precision_recall_curve(y_true, y_scores)
pr_auc = auc(recall, precision)

pr_fig = go.Figure()
pr_fig.add_trace(go.Scatter(x=recall, y=precision, mode='lines', name='PR Curve'))
pr_fig.update_layout(title=f'Precision-Recall Curve (AUC = {pr_auc:.4f})',
                     xaxis_title='Recall',
                     yaxis_title='Precision')

# === Best threshold based on F1 Score ===
thresholds = np.linspace(0, 1, 200)
best_f1 = 0
best_threshold = 0

for t in thresholds:
    y_pred = (y_scores >= t).astype(int)
    f1 = f1_score(y_true, y_pred)
    if f1 > best_f1:
        best_f1 = f1
        best_threshold = t

# Final metrics
final_preds = (y_scores >= best_threshold).astype(int)
final_acc = accuracy_score(y_true, final_preds)
final_prec = precision_score(y_true, final_preds)
final_rec = recall_score(y_true, final_preds)

# Show plots
roc_fig.show()
pr_fig.show()

# Print best metrics
print(f"Best Threshold: {best_threshold:.4f}")
print(f"Accuracy: {final_acc:.4f}")
print(f"Precision: {final_prec:.4f}")
print(f"Recall: {final_rec:.4f}")
print(f"F1 Score: {best_f1:.4f}")


In [None]:
from sklearn.mixture import GaussianMixture
import seaborn as sns
gmm = GaussianMixture(n_components=5, covariance_type='full').fit(mu_train)
train_gmm_scores = gmm.score_samples(mu_train)
test_gmm_scores = gmm.score_samples(mu_test)
ano_gmm_scores = gmm.score_samples(mu_anomaly)

sns.boxplot(data=[train_gmm_scores, test_gmm_scores, ano_gmm_scores])

#Setting labels
plt.xlabel('Class')
plt.title('Gaussian Mixture scores')
plt.xticks([0, 1, 2], ['Train', 'Test', 'Anomaly'])  # Label x-ticks with class names names

# Show the plot
plt.show()

In [None]:
import numpy as np

# Compute the mean vector
mean_vector = np.mean(mu_train, axis=0)
print(mean_vector.shape)

# Compute the covariance matrix
covariance_matrix = np.cov(mu_train, rowvar=False)

epsilon = 1e-10
regularized_cov_matrix = covariance_matrix + epsilon * np.eye(covariance_matrix.shape[0])

from scipy.spatial.distance import mahalanobis
from scipy.linalg import inv

# Inverse of the covariance matrix
inv_cov_matrix = inv(regularized_cov_matrix)

# Function to calculate Mahalanobis distance
def compute_mahalanobis_distance(data_point, mean_vector, inv_cov_matrix):
    diff = data_point - mean_vector
    distance = np.sqrt(diff.T @ inv_cov_matrix @ diff)
    return distance

# Compute Mahalanobis distances for all data points

train_mahalanobis_distances = np.array([compute_mahalanobis_distance(point, mean_vector, inv_cov_matrix) for point in mu_train])
test_mahalanobis_distances = np.array([compute_mahalanobis_distance(point, mean_vector, inv_cov_matrix) for point in mu_test])
ano_mahalanobis_distances = np.array([compute_mahalanobis_distance(point, mean_vector, inv_cov_matrix) for point in mu_anomaly])



import seaborn as sns
plt.figure(figsize=(8, 6))

# Create the box plot
sns.boxplot(data=[train_mahalanobis_distances, test_mahalanobis_distances, ano_mahalanobis_distances])

# Setting labels
#plt.xlabel('Class')
#plt.ylabel('cosine_similarity Scores')
plt.title('Mahalanobis distance to mean latent vector of normal data')
plt.xticks([0, 1, 2], ['Train', 'Test', 'Anomaly'])

# Show the plot
plt.show()







In [None]:
import numpy as np
from sklearn.svm import OneClassSVM

from sklearn.neighbors import LocalOutlierFactor



# One-Class SVM
ocsvm = OneClassSVM()
ocsvm.fit(mu_train)

train_svm_scores = ocsvm.score_samples(mu_train)
test_svm_scores = ocsvm.score_samples(mu_test)
ano_svm_scores = ocsvm.score_samples(mu_anomaly)

plt.figure(figsize=(8, 6))

# Create the box plot
sns.boxplot(data=[train_svm_scores, test_svm_scores, ano_svm_scores])

# Setting labels
#plt.xlabel('Class')
#plt.ylabel('cosine_similarity Scores')
plt.title('One-Class SVM scores')
plt.xticks([0, 1, 2], ['Train', 'Test', 'Anomaly'])




In [None]:
all_mahalanobis = np.concatenate([test_mahalanobis_distances, ano_mahalanobis_distances])
all_gmm = np.concatenate([test_gmm_scores, ano_gmm_scores])
labels = np.concatenate([np.zeros_like(test_mahalanobis_distances), np.ones_like(ano_mahalanobis_distances)])

In [None]:
from sklearn.metrics import f1_score, precision_recall_curve, roc_curve, auc

def best_f1_threshold(scores, labels, greater_is_anomaly=True):
    if not greater_is_anomaly:
        scores = -scores  # 
    precisions, recalls, thresholds = precision_recall_curve(labels, scores)
    f1s = 2 * (precisions * recalls) / (precisions + recalls + 1e-8)
    best_idx = np.argmax(f1s)
    return thresholds[best_idx], f1s[best_idx]

# For Mahalanobis (higher distance = more anomalous)
mahal_thresh, mahal_f1 = best_f1_threshold(all_mahalanobis, labels, greater_is_anomaly=True)

# For GMM (typically, lower score = more anomalous)
gmm_thresh, gmm_f1 = best_f1_threshold(all_gmm, labels, greater_is_anomaly=False)


In [None]:
import matplotlib.pyplot as plt

# ROC Curve
fpr_mahal, tpr_mahal, _ = roc_curve(labels, all_mahalanobis)
roc_auc_mahal = auc(fpr_mahal, tpr_mahal)

fpr_gmm, tpr_gmm, _ = roc_curve(labels, -all_gmm)  # 
roc_auc_gmm = auc(fpr_gmm, tpr_gmm)

plt.figure(figsize=(8,6))
plt.plot(fpr_mahal, tpr_mahal, label=f"Mahalanobis (AUC = {roc_auc_mahal:.4f})")
plt.plot(fpr_gmm, tpr_gmm, label=f"GMM (AUC = {roc_auc_gmm:.4f})")
plt.plot([0,1], [0,1], 'k--')
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("ROC Curve")
plt.legend()
plt.show()

# Precision-Recall Curve
prec_mahal, rec_mahal, _ = precision_recall_curve(labels, all_mahalanobis)
prc_auc_mahal = auc(rec_mahal, prec_mahal)

prec_gmm, rec_gmm, _ = precision_recall_curve(labels, -all_gmm)
prc_auc_gmm = auc(rec_gmm, prec_gmm)

plt.figure(figsize=(8,6))
plt.plot(rec_mahal, prec_mahal, label=f"Mahalanobis (AUC = {prc_auc_mahal:.4f})")
plt.plot(rec_gmm, prec_gmm, label=f"GMM (AUC = {prc_auc_gmm:.4f})")
plt.xlabel("Recall")
plt.ylabel("Precision")
plt.title("Precision-Recall Curve")
plt.legend()
plt.show()


In this code beat is applied on the KL divergence part of the ELBO loss

GMM and mahanolobis distance is trained on the mean latent but the log-variance latent space is also smooth and sperates the class