In [None]:
# Switch the kernel's working directory to the project folder. The dataset cell
# uses the relative path './data/CIFAR10/', so it resolves against this folder.
# NOTE(review): the path looks like a mounted Google Drive in Colab -- confirm
# and adjust when running elsewhere.
%cd /content/drive/MyDrive/M.Tech CS_2022-23/Project/One Class Learning

In [None]:
import os
import pickle
import numpy as np
import torch
from tqdm import tqdm
from collections import defaultdict
import torch.autograd as autograd
import torch.nn as nn
import torch.nn.functional as F
import torchvision
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
import scipy.stats
from sklearn.metrics import roc_auc_score
from sklearn.metrics import precision_recall_fscore_support
import pandas as pd

In [None]:
# CIFAR-10 is stored under (and, if missing, downloaded into) this folder,
# which is resolved relative to the current working directory.
data_path = './data/CIFAR10/'

# No transform is attached here: images stay in their raw form and are
# converted later by the per-class dataset wrapper.
train_dataset = torchvision.datasets.CIFAR10(root=data_path, train=True, download=True)
test_dataset = torchvision.datasets.CIFAR10(root=data_path, train=False, download=True)

In [None]:
# Train and test images go through the same conversion; each split gets its
# own ToTensor instance.
train_transform, test_transform = transforms.ToTensor(), transforms.ToTensor()

In [None]:
def get_class_c(x, y, c):
  """Pick the samples of `x` whose label in `y` equals `c`.

  Args:
    x: indexable collection of samples, aligned with `y`.
    y: sequence of labels (converted to a NumPy array for the comparison).
    c: label value to select.

  Returns:
    A tuple `(samples, labels)`: the selected samples as a list, in their
    original order, and a list holding `c` once per selected sample.
  """
  # Indices along the first axis at which the label matches.
  matching_rows = np.nonzero(np.array(y) == c)[0]
  selected = [x[row] for row in matching_rows]
  return selected, [c] * len(selected)

In [None]:
class DatasetMaker(Dataset):
  """Map-style dataset that exposes labels as a one-vs-rest flag.

  Indexing returns `(sample, flag)` where `flag` is the result of comparing
  the stored target with `true_label`. The sample is passed through
  `transform_func` first when one is given.
  """

  def __init__(self, true_label, data, targets, transform_func=None):
    super().__init__()
    self.true_label = true_label
    self.data, self.targets = data, targets
    self.transform_func = transform_func

  def __len__(self):
    # The dataset length follows the label list.
    return len(self.targets)

  def __getitem__(self, idx):
    sample = self.data[idx]
    label = self.targets[idx]
    if self.transform_func:
      sample = self.transform_func(sample)
    return sample, label == self.true_label


def prepare_oc_dataset(dataset, class_label):
  """Extract the single-class subset of `dataset`.

  Reads `dataset.data` and `dataset.targets` and returns the
  `(samples, labels)` pair produced by `get_class_c` for `class_label`.
  """
  return get_class_c(dataset.data, dataset.targets, class_label)

In [None]:
# Mini-batch size shared by the train and test DataLoaders built in the
# experiment cell.
batch_size = 256

In [None]:
# Use the GPU whenever PyTorch can see one, otherwise stay on the CPU.
cuda = torch.cuda.is_available()
device = 'cuda' if cuda else 'cpu'
if cuda:
  # Make GPU 0 the current CUDA device.
  torch.cuda.set_device(0)

In [None]:
class MLP(nn.Module):
  """Bias-free multilayer perceptron with lazily sized linear layers.

  The input is flattened and passed through one `LazyLinear -> LeakyReLU`
  stage per entry of `num_hiddens`; a final bias-free linear layer maps the
  last hidden activation to `num_outputs` values.

  Args:
    num_outputs: width of the final linear layer.
    num_hiddens: iterable with the width of each hidden stage, in order.
  """

  def __init__(self, num_outputs, num_hiddens):
    super().__init__()
    hidden_stages = [
        nn.Sequential(nn.LazyLinear(width, bias=False), nn.LeakyReLU())
        for width in num_hiddens
    ]
    self.net = nn.Sequential(nn.Flatten(), *hidden_stages)
    self.fc = nn.LazyLinear(num_outputs, bias=False)

  def forward(self, X):
    """Return `(features, output)`: the last hidden activation and `fc` applied to it."""
    features = self.net(X)
    return features, self.fc(features)

In [None]:
# Parameters of the triangular distribution that feature norms are trained to
# follow: train_epoch passes them to scipy.stats.triang as (shape, loc, scale),
# and test() accepts a sample when its feature norm lies in [l, l + s].
c = 0.5  # shape: relative position of the peak within the support
l = 6.5  # loc: lower end of the support
s = 0.5  # scale: width of the support

In [None]:
def train_epoch(model, dataloader, optimizer):
  """Run one optimisation pass over `dataloader`.

  For every batch, the L2 norms of the features returned by `model` are
  regressed (mean squared error) onto fresh draws from the triangular
  distribution described by the notebook-level `c`, `l` and `s`
  (shape, loc, scale).

  Args:
    model: module whose forward returns `(features, output)`; only the
      features are used here.
    dataloader: iterable yielding `(inputs, labels)`; the labels are ignored.
    optimizer: optimizer holding the parameters of `model`.

  Returns:
    A list with one detached CPU scalar loss tensor per batch.
  """
  model.train()
  train_loss = []

  for inputs, _ in dataloader:
    # Inputs are deliberately NOT marked as requiring grad: only the model
    # parameters are optimised, so input gradients would be wasted work (and
    # on CPU the flag would be set on the caller's own tensor).
    inputs = inputs.to(device)
    features, _output = model(inputs)

    # The targets are constants for autograd. Build them in the features'
    # dtype and device so the float64 SciPy samples do not promote the loss
    # to double precision.
    samples = scipy.stats.triang.rvs(c, loc=l, scale=s, size=features.shape[0])
    triangular_samples = torch.as_tensor(
        samples, dtype=features.dtype, device=features.device)

    loss = ((torch.linalg.norm(features, dim=1) - triangular_samples) ** 2).mean()

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    train_loss.append(loss.detach().cpu())

  return train_loss

In [None]:
def test(model, dataloader):
  """Score every batch of `dataloader` with `model` in evaluation mode.

  A sample is predicted positive when the L2 norm of its feature vector
  lies in the closed interval `[l, l + s]` (notebook-level constants).

  Args:
    model: module whose forward returns `(features, output)`; only the
      features are used.
    dataloader: iterable yielding `(inputs, labels)` batches.

  Returns:
    A tuple `(outputs, labels, norms)` of concatenated tensors: boolean
    predictions, the labels as yielded by the loader, and the feature norms
    (on CPU).
  """
  model.eval()
  outputs, labels, norms = [], [], []
  # Inference only: disabling autograd avoids building a graph per batch.
  with torch.no_grad():
    for input_batch, label_batch in dataloader:
      features = model(input_batch.to(device))[0]
      norm = torch.linalg.norm(features, dim=1).cpu()
      outputs.append(torch.logical_and(norm >= l, norm <= l + s))
      labels.append(label_batch)
      norms.append(norm)
  return torch.cat(outputs), torch.cat(labels), torch.cat(norms)


def compute_scores(labels, outputs):
  """Return `(precision, recall, f-score)` for predictions `outputs` against `labels`.

  Thin wrapper around sklearn's `precision_recall_fscore_support` that drops
  the trailing support entry of its result.
  """
  prec, rec, fscore, _support = precision_recall_fscore_support(labels, outputs)
  return prec, rec, fscore

In [None]:
# Number of passes over the single-class training set for every model.
num_epochs = 15

# Result stores: class label -> one entry per repeated experiment.
# acc_scores = defaultdict(list)
precision = defaultdict(list)
recall = defaultdict(list)
f1_scores = defaultdict(list)

# One-class protocol: each of the 10 labels takes a turn as the "normal" class.
for true_label in range(10):

  # Training set: only the samples labelled `true_label`.
  # Test set: the complete test split; DatasetMaker turns every label into
  # the boolean `target == true_label`.
  train_data, train_targets = prepare_oc_dataset(train_dataset, true_label)
  train_oc_dataset = DatasetMaker(true_label, train_data, train_targets, train_transform)
  test_oc_dataset = DatasetMaker(true_label, test_dataset.data, test_dataset.targets, test_transform)
  train_loader = DataLoader(train_oc_dataset, batch_size=batch_size, shuffle=True)
  test_loader = DataLoader(test_oc_dataset, batch_size=batch_size, shuffle=False)

  # Repeat the experiment five times, each with a freshly initialised model.
  for experiment in range(5):

    # Fresh model: two hidden stages (900 and 300 units) and a single output.
    model = MLP(num_outputs=1, num_hiddens=[900, 300]).to(device)
    # Adam with its default hyper-parameters.
    optimizer = torch.optim.Adam(model.parameters())

    # Train the model.
    for epoch in range(num_epochs):
      batchwise_losses = train_epoch(model, train_loader, optimizer)
      # Mean batch loss of the epoch; not used further in this cell.
      train_epoch_loss = np.mean(batchwise_losses)

    # Evaluate on the full test split.
    model_outputs, labels, norms = test(model, test_loader)

    # NOTE(review): compute_scores calls sklearn without an `average` argument,
    # so each value is presumably a per-class array (negative, positive) --
    # confirm before interpreting the means printed below.
    prec, rec, f1 = compute_scores(labels, model_outputs)

    # Store the results of this run under the current class.
    precision[true_label].append(prec)
    recall[true_label].append(rec)
    f1_scores[true_label].append(f1)

In [None]:
# Mean precision per one-class experiment, averaged over everything stored for
# that class. The label previously read "Mean Accuracy", which misreported the metric.
for label, scores in precision.items():
  print(f'Class: {label}\tMean Precision: {np.mean(scores):.4f}')

In [None]:
# Mean recall per one-class experiment, averaged over everything stored for
# that class. The label previously read "Mean Accuracy", which misreported the metric.
for label, scores in recall.items():
  print(f'Class: {label}\tMean Recall: {np.mean(scores):.4f}')

In [None]:
# Mean F1 score per one-class experiment, averaged over everything stored for
# that class. The label previously read "Mean Accuracy", which misreported the metric.
for label, scores in f1_scores.items():
  print(f'Class: {label}\tMean F1: {np.mean(scores):.4f}')