<a href="https://colab.research.google.com/github/deepeshhada/AGSN/blob/master/AGSNv2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
import os

import math
import statistics 

import numpy as np
import pandas as pd
import scipy.io as io

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [0]:
# choose a dataset from: CUB, SUN, AWA, AWA2, APY
_dataset = "CUB"
data_root = "./drive/My Drive/Colab Data/Deep Learning/datasets/ZSL Datasets/" + _dataset + "/"
model_path = "./drive/My Drive/Colab Data/Deep Learning/saved models/AGSN/" + _dataset + "/"
pretrained = False

In [0]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
batch_size = 64
latent_dim = 1024
embed_dim = 312

In [0]:
class Dataset(torch.utils.data.Dataset):
    def __init__(self, features, labels, class_embeddings):
        self.features = features
        self.labels = labels
        self.class_embeddings = class_embeddings
    
    def __len__(self):
        return len(self.labels)

    def __getitem__(self, index):
        label = self.labels[index]
        return (self.features[index], label, class_embeddings[label])

In [0]:
# load mat files
res101 = io.loadmat(data_root + "res101.mat")
att_splits = io.loadmat(data_root + "att_splits.mat")

resnet_features = res101['features'].T
class_labels = res101['labels']
class_embeddings = att_splits['att'].T

# print((att_splits['trainval_loc']).reshape(-1))

In [0]:
def generate_splits(loc, shuffle=False):
    indices = att_splits[loc].reshape(-1) - 1
    features = resnet_features[indices]
    labels = class_labels[indices].reshape(-1) - 1

    split = Dataset(
        features=features,
        labels=labels,
        class_embeddings=class_embeddings
    )

    dataloader = torch.utils.data.DataLoader(
        dataset=split,
        batch_size=batch_size,
        shuffle=shuffle
    )

    return split, dataloader


train_set, trainloader = generate_splits(loc='trainval_loc', shuffle=True)
seen_test_set, seen_testloader = generate_splits(loc='test_seen_loc', shuffle=False)
unseen_test_set, unseen_testloader = generate_splits(loc='test_unseen_loc', shuffle=False)

In [0]:
unseen_labels = np.unique(unseen_test_set.labels)
seen_labels = np.unique(train_set.labels)

seen_y = torch.tensor(seen_labels, device=device).long()
seen_cy = torch.tensor(class_embeddings[seen_labels], device=device).float()
unseen_y = torch.tensor(unseen_labels, device=device).long()
unseen_cy = torch.tensor(class_embeddings[unseen_labels], device=device).float()

In [0]:
def normal_initialize(module):
    if isinstance(module, nn.Linear):
        module.weight.data.normal_(0.0, 0.02)
        module.bias.data.normal_(0.0, 0.02)

In [0]:
# the out_features of both the classifier and regressor are hardcoded for now.
# TODO: make the out_features generic.

class Classifier(nn.Module):
    def __init__(self):
        super(Classifier, self).__init__()
        self.model = nn.Sequential(
            nn.Linear(in_features=2048, out_features=200, bias=True), # Earlier out_features set to 200 but changed to 150
            nn.Softmax(dim=1)
        )

    def weights_init(self):
        for m in self._modules:
            normal_initialize(self._modules[m])

    def loss(self, true, pred):
        return F.cross_entropy(pred, true)

    def forward(self, input):
        return self.model(input)


class Regressor(nn.Module):
    def __init__(self):
        super(Regressor, self).__init__()
        self.model = nn.Linear(in_features=2048, out_features=312, bias=True)
        self.softmax = nn.Softmax(dim=1)

    def weights_init(self):
        for m in self._modules:
            normal_initialize(self._modules[m])

    def loss(self, true, pred):
        pred = F.normalize(input=pred, p=2, dim=1)
        similarity_scores = torch.mm(pred, seen_cy.T) # batch * 150
        loss = F.cross_entropy(similarity_scores, true)

        true_embeddings = torch.Tensor(class_embeddings[torch.Tensor.cpu(true)]).to(device)
        true_similarity = torch.bmm(pred.view(similarity_scores.size(0), 1, -1), true_embeddings.view(similarity_scores.size(0), -1, 1))

        numerator = torch.exp(true_similarity.view(-1))
        denominator = torch.sum(torch.exp(similarity_scores), dim=1)
        loss_temp = -(numerator/denominator).log().mean()

        return loss_temp

    def forward(self, x):
        x = self.model(x)
        return x

In [0]:
class Generator(nn.Module):
    def __init__(self):
        super(Generator, self).__init__()
        self.model = nn.Sequential(
            nn.Linear(in_features=624, out_features=2048, bias=True),
            nn.BatchNorm1d(num_features=2048),
            nn.LeakyReLU(negative_slope=0.01, inplace=False),
            nn.Linear(in_features=2048, out_features=2048),
            nn.ReLU(inplace=False)
        )

    def weights_init(self):
        for m in self._modules:
            normal_initialize(self._modules[m])

    def forward(self, x):
        output = self.model(x)
        return output

In [0]:
class Discriminator(nn.Module):
    def __init__(self):
        super(Discriminator, self).__init__()
        self.model = nn.Sequential(
            nn.Linear(in_features=2360, out_features=4096, bias=True),
            nn.BatchNorm1d(num_features=4096),
            nn.LeakyReLU(negative_slope=0.01, inplace=False),
            nn.Linear(in_features=4096, out_features=1),
        )

    def weights_init(self):
        for m in self._modules:
            normal_initialize(self._modules[m])

    def forward(self, x):
        output = self.model(x)
        return output

In [0]:
G = Generator().to(device)
D = Discriminator().to(device)
fc = Classifier().to(device)
fr = Regressor().to(device)

G.weights_init()
D.weights_init()
fc.weights_init()
fr.weights_init()

lr = 0.001
gamma = 0.01
beeta = 0.1
lamda = 10
max_epochs = 2000

G_params = list(G.parameters()) + list(fc.parameters()) + list(fr.parameters())
G_optimizer = optim.Adam(G_params, lr=lr, betas=(0.5, 0.999))
D_optimizer = optim.Adam(D.parameters(), lr = lr, betas=(0.5, 0.999))

In [0]:
for epoch in range(max_epochs):
    for i, data in enumerate(trainloader, 0):
        f, l, e = data
        features, labels, embeddings = f.to(device).float(), l.to(device).long(), e.to(device).float()
        b_size = embeddings.size(0)

        for k in range(2):
            D_optimizer.zero_grad()
            for parameter in D.parameters(): #Weight Clip
                parameter.data.clamp_(-0.01, 0.01)

            # train with real features
            disc_input_real = torch.cat((features, embeddings), dim=1)
            disc_score_real = D(disc_input_real)
            disc_loss_real = torch.mean(disc_score_real)

            noise = torch.randn(b_size, 312, device=device)
            gen_input = torch.cat((noise, embeddings), dim=1)
            fake_features = G(gen_input)
            disc_input_fake = torch.cat((fake_features, embeddings), dim=1)
            disc_score_fake = D(disc_input_fake)
            disc_loss_fake = torch.mean(disc_score_fake)
            
            disc_loss = torch.mean(disc_score_fake - disc_score_real)  # Still have to implement grad penalty
            disc_loss.backward(retain_graph=True)
            D_optimizer.step()

        # Wasserstein Generator loss
        with torch.autograd.set_detect_anomaly(True):
            G_optimizer.zero_grad()
            noise = torch.randn(b_size, embed_dim, device=device)
            gen_input = torch.cat((noise, embeddings), dim=1)
            fake_features = G(gen_input)

            align_cls = fc(fake_features)
            cls_loss = fc.loss(labels, align_cls) # Computing classifier loss

            align_reg = fr(fake_features)
            reg_loss = fr.loss(labels, align_reg) # Computing Regressor loss

            disc_input_fake = torch.cat((fake_features, embeddings), dim=1)
            disc_score_fake = D(disc_input_fake)
            disc_loss_fake = torch.mean(disc_score_fake)
            gen_loss = beeta*(cls_loss + (gamma * reg_loss)) - disc_loss_fake
            gen_loss.backward()
            G_optimizer.step()

    
    if epoch == 0 or (epoch+1) % 200 == 0:
        print("Epochs: %d/%d | Discriminator Loss = %f | Generator Loss: %f" % 
            (epoch+1, max_epochs, disc_loss.item(), gen_loss.item()))
        if os.path.exists(model_path + "Generator"):
            os.remove(model_path + "Generator")
        torch.save(G.state_dict(), model_path + "Generator")

Epochs: 1/2000 | Discriminator Loss = -0.028687 | Generator Loss: 0.500063


In [0]:
# generating features from the unseen classes using trained generator

x_train = torch.tensor(train_set.features, device=device).float()
y_train = torch.tensor(train_set.labels, device=device).long()

k = -1 # current index
for c_y in unseen_cy:
    k += 1
    embed = c_y.repeat(1, 100).view(100, -1)  # 100 X 312
    lab = (unseen_y[k]).repeat(1, 100).view(100) # 100 labels
    rand_noise = torch.randn(100, 312, device=device)  # generate 100 features
    gen_inp = torch.cat((rand_noise, embed), dim=1)
    generated = G(gen_inp)  # 100 X 2048 : 100 features generated 
    x_train = torch.cat((x_train, generated), dim=0)
    y_train = torch.cat((y_train, lab), dim=0)

print(x_train.shape)
print(y_train.shape)
# print(torch.unique(y_train))

In [0]:
class Final_Classifier(nn.Module):
    def __init__(self):
        super(Final_Classifier, self).__init__()
        self.model = nn.Sequential(
            nn.Linear(2048, 2048),
            nn.LeakyReLU(negative_slope=0.01),
            nn.Linear(2048, 200),
            nn.Softmax(dim=1)
        )
    
    def weights_init(self):
        for m in self._modules:
            normal_initialize(self._modules[m])

    # computes 200*200 confusion matrix for relevant classes
    
    def compute_confusion_matrix(self, inputs, classes):
        per_class_acc = 0.0
        nb_classes = 200
        confusion_matrix = torch.zeros(nb_classes, nb_classes)
        with torch.no_grad():
            outputs = self.model(inputs)
            _, preds = torch.max(outputs, 1)
            for t, p in zip(classes.view(-1), preds.view(-1)):
                confusion_matrix[t.long(), p.long()] += 1
        return(confusion_matrix)

    def compute_per_class_acc(self, x_test, test_label, nclass):
        # acc_per_class = np.zeros(nclass)
        # for i in range(0, nclass):
        #     # idx = (test_label == i)
        #     idx = (test_label == nclass[i])
        #     acc_per_class[i] = np.sum(test_label[idx]==predicted_label[idx]) / np.sum(idx)
        # return np.mean(acc_per_class)

        with torch.no_grad():
            predicted_label = self.model(x_test)

        acc_per_class = np.zeros(nclass.shape)
        for i in range(0, nclass.shape):
            idx = (test_label == nclass[i])
            acc_per_class[i] = np.sum(test_label[idx]==predicted_label[idx]) / np.sum(idx)
        return np.mean(acc_per_class)

    def forward(self, x):
        output = self.model(x)
        return output

In [0]:
softmax_cls = Final_Classifier()
softmax_cls = softmax_cls.to(device)
softmax_cls.weights_init()

num_iters = 100
lr = 0.0009
cls_optimizer = optim.Adam(softmax_cls.parameters(), lr=lr, betas=(0.5,0.999))

for ep in range(num_iters):
    cls_optimizer.zero_grad()
    final_preds = softmax_cls(x_train)
    final_loss = F.cross_entropy(final_preds, y_train)
    final_loss.backward(retain_graph=True)
    cls_optimizer.step()
    
    print("loss after ", ep + 1, "iters ", final_loss.item())

    if ep % 20 == 0:
        if os.path.exists(model_path + "Final_cls"):
            os.remove(model_path + "Final_cls")
        torch.save(G.state_dict(), model_path + "Final_cls")

In [0]:
# Final Test and accuracy computation
# compute confusion matrix for seen and unseen classes separately

x_unseen = torch.tensor(unseen_test_set.features, device=device).float()
y_unseen = torch.tensor(unseen_test_set.labels, device=device).long()
x_seen = torch.tensor(seen_test_set.features, device=device).float()
y_seen = torch.tensor(seen_test_set.labels, device=device).long()

cm_unseen = softmax_cls.compute_confusion_matrix(x_unseen, y_unseen)
cm_seen = softmax_cls.compute_confusion_matrix(x_seen, y_seen)

# compute per class accuracy matrix

acc_mat_us = ((cm_unseen.diag()/cm_unseen.sum(1)))
acc_mat_s = ((cm_seen.diag()/cm_seen.sum(1)))

unseen_acc = []
seen_acc = []

# Remove Nan's from irrelevant classes

for acc in acc_mat_us:
    if not math.isnan(acc):
        unseen_acc.append(acc)

for acc in acc_mat_s:
    if not math.isnan(acc):
        seen_acc.append(acc)

# Compute per class accuracy

per_class_seen = np.mean(seen_acc)
per_class_unseen = np.mean(unseen_acc)
l = [per_class_seen, per_class_unseen]

print("Unseen Class Accuracy:", per_class_unseen*100)
print("Seen Class Accuracy iS:", per_class_seen*100)
# print("Harmonic Mean Accuracy for GZSL:", statistics.harmonic_mean(l)*100)

In [0]:
def compute_per_class_acc(x_test, test_label, nclass):
    x = torch.tensor(x_test, device=device).float()
    with torch.no_grad():
        outputs = softmax_cls(x)
        _, predicted_label = torch.max(outputs, 1)

    print(torch.unique(predicted_label).shape)
    # print(predicted_label.shape)

    acc_per_class = np.zeros(nclass.shape)
    print(test_label.shape)
    # for i, data in enumerate(dataloader, 0):
    #     f, l, e = data
    #     features, labels = f.to(device).float(), l.to(device).long()
    #     pred = softmax_cls(f)

    for i in range(0, nclass.shape[0]):
        idx = (test_label == nclass[i])
        acc_per_class[i] = np.sum(test_label[idx]==predicted_label[idx]) / np.sum(idx)
    return np.mean(acc_per_class)


cm_u = compute_per_class_acc(unseen_test_set.features, unseen_test_set.labels, unseen_labels)
cm_s = compute_per_class_acc(seen_test_set.features, seen_test_set.labels, seen_labels)