In [1]:
import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, TensorDataset, Subset

import math
import numpy as np
import pandas as pd
from scipy.linalg import eigh
import matplotlib.pyplot as plt
from scipy.spatial import distance

  Referenced from: <CC4BC91F-8B6A-3F9A-B9EB-A2B9D578E202> /Users/DELL/opt/anaconda3/envs/data-depth/lib/python3.9/site-packages/torchvision/image.so
  warn(


In [2]:
from depth.multivariate import *
from numpy.random import RandomState
from mpl_toolkits.mplot3d import Axes3D
from scipy.spatial import ConvexHull, Delaunay
from mpl_toolkits.mplot3d.art3d import Poly3DCollection

In [None]:
# VAE Model Definition
class VAE(nn.Module):
    """Fully connected variational autoencoder with a diagonal-Gaussian latent.

    The encoder maps an input of width ``x_dim`` to ``2 * z_dim`` values that
    ``forward`` splits into a mean and a log-variance. The decoder maps a
    latent vector of width ``z_dim`` back to ``x_dim`` values passed through a
    sigmoid.

    Args:
        x_dim: Width of the (flattened) input.
        hidden_dim: Width of the single hidden layer in encoder and decoder.
        z_dim: Width of the latent vector.
    """

    def __init__(self, x_dim, hidden_dim, z_dim=10):
        super().__init__()
        # Layers are created encoder-first so parameter initialisation draws
        # from the RNG in a fixed order.
        encoder_layers = [
            nn.Linear(x_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 2 * z_dim),  # mean and log-variance side by side
        ]
        decoder_layers = [
            nn.Linear(z_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, x_dim),
            nn.Sigmoid(),
        ]
        self.encoder = nn.Sequential(*encoder_layers)
        self.decoder = nn.Sequential(*decoder_layers)

    def reparameterize(self, mu, logvar):
        """Draw ``mu + noise * sigma`` with ``sigma = exp(logvar / 2)``.

        The noise is standard normal, so gradients can flow through ``mu``
        and ``logvar``.
        """
        sigma = (logvar * 0.5).exp()
        noise = torch.randn_like(sigma)
        return mu + noise * sigma

    def forward(self, x):
        """Encode ``x``, sample a latent vector, and decode it.

        Returns:
            Tuple ``(reconstruction, z, mu, logvar)``.
        """
        # The first half of the encoder output along dim 1 is the mean, the
        # second half the log-variance.
        mu, logvar = self.encoder(x).chunk(2, dim=1)
        z = self.reparameterize(mu, logvar)
        reconstruction = self.decoder(z)
        return reconstruction, z, mu, logvar

In [None]:
# Loss function
def loss_function(recon_x, x, mu, logvar):
    """Summed VAE loss: reconstruction cross-entropy plus KL divergence.

    Args:
        recon_x: Reconstruction with values in [0, 1], same shape as ``x``.
        x: Target values in [0, 1].
        mu: Latent means.
        logvar: Latent log-variances.

    Returns:
        Scalar tensor. Both terms are summed, not averaged, over all elements.
    """
    reconstruction = F.binary_cross_entropy(recon_x, x, reduction='sum')
    # Closed-form KL(N(mu, sigma^2) || N(0, 1)), summed over all latent entries.
    kl_terms = 1 + logvar - mu.pow(2) - logvar.exp()
    kl_divergence = -0.5 * kl_terms.sum()
    return reconstruction + kl_divergence

# Train VAE
def train_model(model, dataloader, learning_rate, num_epochs, max_batches=None, loss_fn=None):
    """Train ``model`` in place with Adam and return it.

    Args:
        model: Module whose forward pass returns ``(recon_x, z, mu, logvar)``.
        dataloader: Iterable yielding ``(x, label)`` batches; labels are ignored.
        learning_rate: Adam learning rate.
        num_epochs: Number of passes over ``dataloader``.
        max_batches: If not ``None``, process at most this many batches per epoch.
        loss_fn: Callable ``loss_fn(recon_x, x, mu, logvar)`` returning a scalar
            tensor. Defaults to the module-level ``loss_function``.

    Returns:
        The same ``model`` object, after training.
    """
    if loss_fn is None:
        loss_fn = loss_function
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    model.train()
    for epoch in range(num_epochs):
        train_loss = 0.0
        samples_seen = 0
        for bno, (x, _) in enumerate(dataloader):
            # `bno` is zero-based, so `>=` stops after exactly `max_batches` batches.
            if max_batches is not None and bno >= max_batches:
                break
            x = x.view(x.size(0), -1)  # flatten each sample to a vector
            optimizer.zero_grad()
            recon_x, _, mu, logvar = model(x)
            loss = loss_fn(recon_x, x, mu, logvar)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
            samples_seen += x.size(0)
        # Normalise by the samples actually processed so the reported value stays
        # a per-sample loss when `max_batches` truncates the epoch.
        mean_loss = train_loss / samples_seen if samples_seen else float('nan')
        print(f'Epoch {epoch + 1}, Loss: {mean_loss}')
    return model

In [None]:
# Load Fashion-MNIST dataset
def load_data(normal_class=0, root='./data', download=False):
    """Load Fashion-MNIST and keep only one class for training.

    Args:
        normal_class: Label of the class treated as "normal"; only training
            samples with this label are kept.
        root: Directory holding the dataset files.
        download: Whether torchvision may download the dataset if it is missing.

    Returns:
        Tuple ``(normal_train, fashion_mnist_test)``: a ``Subset`` of the
        training split restricted to ``normal_class``, and the full test split.
        Every image is converted to a tensor and flattened to one dimension.

    Raises:
        ValueError: If no training sample has label ``normal_class``.
    """
    transform = transforms.Compose([transforms.ToTensor(), transforms.Lambda(lambda x: x.view(-1))])
    fashion_mnist_train = datasets.FashionMNIST(root=root, train=True, transform=transform, download=download)
    fashion_mnist_test = datasets.FashionMNIST(root=root, train=False, transform=transform, download=download)

    # Read the labels from the dataset's `targets` attribute instead of iterating
    # the dataset, which would decode and transform every image just to see its label.
    targets = torch.as_tensor(fashion_mnist_train.targets)
    train_indices = torch.nonzero(targets == normal_class, as_tuple=False).flatten().tolist()
    if not train_indices:
        raise ValueError(f"no training samples with label {normal_class!r}")
    normal_train = Subset(fashion_mnist_train, train_indices)

    return normal_train, fashion_mnist_test

In [None]:
# Compute Mahalanobis depth
def compute_mahalanobis_depth(latent_space):
    """Mahalanobis depth of every row of ``latent_space`` w.r.t. the whole sample.

    Depth is ``1 / (1 + d^2)``, where ``d`` is the Mahalanobis distance of the
    row from the sample mean under the sample covariance. Values lie in
    (0, 1]; central points score high and outlying points score low.

    Args:
        latent_space: Array-like of shape ``(n_samples, n_features)`` with at
            least two rows.

    Returns:
        1-D ``numpy.ndarray`` of length ``n_samples``.

    Raises:
        ValueError: If the input is not 2-D or has fewer than two rows.
    """
    points = np.asarray(latent_space, dtype=float)
    if points.ndim != 2:
        raise ValueError("latent_space must be 2-D (n_samples, n_features)")
    if points.shape[0] < 2:
        raise ValueError("at least two samples are required to estimate a covariance")

    mean = points.mean(axis=0)
    # atleast_2d: np.cov returns a 0-d array when there is a single feature.
    cov = np.atleast_2d(np.cov(points, rowvar=False))
    # The pseudo-inverse stays defined when the covariance is singular, e.g.
    # with fewer samples than features or perfectly correlated features.
    inv_cov = np.linalg.pinv(cov)

    centered = points - mean
    # Row-wise quadratic form centered_i^T @ inv_cov @ centered_i.
    sq_dist = np.einsum('ij,jk,ik->i', centered, inv_cov, centered)
    sq_dist = np.maximum(sq_dist, 0.0)  # guard against tiny negative round-off
    return 1.0 / (1.0 + sq_dist)

In [None]:
# Experiment setup: load the data, build the VAE, and train it on the normal class.

# Seed before the model is created so weight initialisation and the
# reparameterisation noise drawn during training are reproducible.
SEED = 0
torch.manual_seed(SEED)
np.random.seed(SEED)

normal_train, fashion_mnist_test = load_data(normal_class=0)  # Class 0: T-shirt/top

TRAIN_BATCH_SIZE = 128
dataloader = DataLoader(normal_train, batch_size=TRAIN_BATCH_SIZE, shuffle=False)

# Define VAE model
x_dim = 28 * 28
hidden_dim = 256
latent_dim = 50
model = VAE(x_dim=x_dim, hidden_dim=hidden_dim, z_dim=latent_dim)

# Train VAE
LEARNING_RATE = 1e-4
NUM_EPOCHS = 10
# MAX_STEPS is passed to train_model as the per-epoch batch cap and is reused
# by the evaluation cell as the cap on the number of test samples.
MAX_STEPS = 300
# BATCH_SIZE is the depth-scoring chunk size used by the evaluation cell; it is
# not the DataLoader batch size (that is TRAIN_BATCH_SIZE).
BATCH_SIZE = 150
trained_model = train_model(
    model,
    dataloader,
    learning_rate=LEARNING_RATE,
    num_epochs=NUM_EPOCHS,
    max_batches=MAX_STEPS,
)

In [None]:
# Obtain latent representations of the first MAX_STEPS test samples, then score
# them with halfspace depth in chunks of BATCH_SIZE samples.
trained_model.eval()

n_samples = min(MAX_STEPS, len(fashion_mnist_test))
latent_chunks = []
label_list = []
with torch.no_grad():
    for idx in range(n_samples):
        x, y = fashion_mnist_test[idx]
        x = x.view(1, -1)  # one flattened sample as a batch of size 1
        # NOTE(review): z is a random draw from the posterior, so it differs
        # between runs unless the RNG is seeded; mu would be deterministic.
        _, z, _, _ = trained_model(x)
        latent_chunks.append(z.cpu().numpy())
        label_list.append(int(y))

# Each chunk has shape (1, latent_dim); concatenating gives one 2-D matrix with
# one row per sample.
latent_space = (
    np.concatenate(latent_chunks, axis=0) if latent_chunks else np.empty((0, 0))
)
labels = np.asarray(label_list)

depth_chunks = []
for bno, start in enumerate(range(0, n_samples, BATCH_SIZE)):
    stop = min(start + BATCH_SIZE, n_samples)
    print("Batch number", bno, " slicing from", start, "to", stop)
    batch_latent_space = latent_space[start:stop]
    # Per the data-depth documentation, halfspace(x, data) scores the rows of x
    # with respect to data, so the latent points are passed as both arguments,
    # not the class labels. Confirm against the installed version.
    batch_depths = halfspace(batch_latent_space, batch_latent_space)
    depth_chunks.append(np.asarray(batch_depths, dtype=float).ravel())
depths = np.concatenate(depth_chunks) if depth_chunks else np.array([])

# A trailing partial chunk is scored too, so there is one depth per latent
# vector; later cells use boolean masks that need this alignment.
assert len(depths) == len(latent_space), "expected one depth value per latent vector"

print(latent_space.shape)
print(labels.shape)
print(len(depths))

In [None]:
# Flag the shallowest points as anomalies: a point is anomalous when its depth
# is at or below the ANOMALY_PERCENTILE-th percentile of all depths. Because the
# comparison is `<=`, tied depth values can flag more than that share of points.
ANOMALY_PERCENTILE = 5
assert len(depths) == len(latent_space), "depths and latent_space must be aligned row by row"
print(len(depths))
threshold = np.percentile(depths, ANOMALY_PERCENTILE)
print(threshold)
anomalies = depths <= threshold

# Visualize the first two latent dimensions, with the two groups drawn separately.
fig, ax = plt.subplots()
ax.scatter(latent_space[~anomalies, 0], latent_space[~anomalies, 1], c='blue', label='Normal')
ax.scatter(latent_space[anomalies, 0], latent_space[anomalies, 1], c='red', label='Anomalies')
ax.set_title('Latent Space')
ax.set_xlabel('Latent Dimension 1')
ax.set_ylabel('Latent Dimension 2')
ax.legend()
plt.show()

In [None]:
# Depth values of the flagged points only.
anom_depths = depths[np.flatnonzero(anomalies)]
anom_depths

In [None]:
# Range of the depth values, then their standard deviation.
print(depths.min(), depths.max())
print(depths.std())

In [None]:
# Visualize original and reconstructed anomalies, capped at MAX_ANOMALIES_SHOWN
# so a large anomaly set does not produce an unbounded number of figures.
MAX_ANOMALIES_SHOWN = 10
anomaly_indices = np.where(anomalies)[0]
for i in anomaly_indices[:MAX_ANOMALIES_SHOWN]:
    # Positions in `anomalies` are used directly as test-set indices.
    org = fashion_mnist_test[int(i)][0]  # fetch the sample once
    with torch.no_grad():
        reconstructed, _, _, _ = trained_model(org.view(1, -1))
    original = org.view(28, 28).numpy()
    reconstructed = reconstructed.view(28, 28).numpy()

    fig, axes = plt.subplots(1, 2)
    axes[0].imshow(original, cmap='gray')
    axes[0].set_title('Original')
    axes[1].imshow(reconstructed, cmap='gray')
    axes[1].set_title('Reconstructed')
    plt.show()
    plt.close(fig)  # release the figure so figures do not accumulate in memory