In [1]:
# # Code to download file into Colaboratory:
# !pip install -U -q PyDrive > /dev/null
# !pip install torchmetrics > /dev/null
# from pydrive.auth import GoogleAuth
# import torch
# dev = "cuda" if torch.cuda.is_available() else "cpu"
# from pydrive.drive import GoogleDrive
# from google.colab import auth
# from oauth2client.client import GoogleCredentials
# # Authenticate and create the PyDrive client.
# auth.authenticate_user()
# gauth = GoogleAuth()
# gauth.credentials = GoogleCredentials.get_application_default()
# drive = GoogleDrive(gauth)

# #get the data from the drive
# def get_feature(model = "ViT-L/14@336px"):
#   id = '11yXddz5j-IDcH77wkMF7dW2JctRv7YN6'
#   size = "B"

#   if model == 'ViT-L/14@336px':
#     id = '1P6CgrgiIACjnhtHsUwdY8qjZ-E3Vx_0x'
#     size = "L336"
#   elif model == 'ViT-L/14':
#     id = '1DG4J-YF57ZsfXTzwg5EFfkzUmUFfr82h'
#     size = "L"
#   elif model == 'ViT-B/32':
#     id = '11yXddz5j-IDcH77wkMF7dW2JctRv7YN6'
#     size = "B"

#   downloaded = drive.CreateFile({'id':id}) 
#   downloaded.GetContentFile('clip_features.zip')

#   #get the data from the drive
#   id = '1b-ujWaLM_jOzlRMbXVb9T-3oEibKQW1r'
#   downloaded = drive.CreateFile({'id':id}) 
#   downloaded.GetContentFile('label_onehot_tensor.pt')

#   !unzip clip_features.zip > /dev/null

#   test_image_features = torch.load(f"test_image_features_vit{size}.pt")
#   test_text_feature = torch.load(f"test_text_feature_vit{size}.pt")
#   all_image_features = torch.load(f"all_image_features_vit{size}.pt")
#   all_text_feature = torch.load(f"all_text_feature_vit{size}.pt")
#   label_onehot_tensor = torch.load(f"label_onehot_tensor.pt")

#   return all_image_features, all_text_feature, test_image_features, test_text_feature, label_onehot_tensor

In [2]:
# all_image_features, all_text_feature, test_image_features, test_text_feature, label_onehot_tensor = get_feature()

In [3]:
# import torch
# dev = "cuda" if torch.cuda.is_available() else "cpu"
# test_image_features = torch.load("test_image_features_vitL.pt", map_location = torch.device(dev))
# test_text_feature = torch.load("test_text_feature_vitL.pt", map_location = torch.device(dev))
# all_image_features = torch.load("all_image_features_vitL.pt", map_location = torch.device(dev))
# all_text_feature = torch.load("all_text_feature_vitL.pt", map_location = torch.device(dev))
# label_onehot_tensor = torch.load("label_onehot_tensor.pt", map_location = torch.device(dev))

In [4]:
import torch
dev = "cuda" if torch.cuda.is_available() else "cpu"
test_image_features = torch.load("../features/test_image_features_vitL.pt", map_location = torch.device(dev))
test_text_feature = torch.load("../features/test_text_feature_vitL.pt", map_location = torch.device(dev))
all_image_features = torch.load("../features/all_image_features_vitL.pt", map_location = torch.device(dev))
all_text_feature = torch.load("../features/all_text_feature_vitL.pt", map_location = torch.device(dev))
label_onehot_tensor = torch.load("../features/label_onehot_tensor.pt", map_location = torch.device(dev))

In [5]:
test_image_features.shape

torch.Size([10000, 768])

In [6]:
def add_weight_decay(model, weight_decay=1e-4, skip_list=()):
    decay = []
    no_decay = []
    for name, param in model.named_parameters():
        if not param.requires_grad:
            continue  # frozen weights
        if len(param.shape) == 1 or name.endswith(".bias") or name in skip_list:
            no_decay.append(param)
        else:
            decay.append(param)
    return [
        {'params': no_decay, 'weight_decay': 0.},
        {'params': decay, 'weight_decay': weight_decay}]

In [7]:
import torch
import torch.nn as nn


class AsymmetricLoss(nn.Module):
    def __init__(self, gamma_neg=4, gamma_pos=1, clip=0.05, eps=1e-8, disable_torch_grad_focal_loss=True):
        super(AsymmetricLoss, self).__init__()

        self.gamma_neg = gamma_neg
        self.gamma_pos = gamma_pos
        self.clip = clip
        self.disable_torch_grad_focal_loss = disable_torch_grad_focal_loss
        self.eps = eps

    def forward(self, x, y):
        """"
        Parameters
        ----------
        x: input logits
        y: targets (multi-label binarized vector)
        """

        # Calculating Probabilities
        
        xs_pos = x
        xs_neg = 1 - x

        # Asymmetric Clipping
        if self.clip is not None and self.clip > 0:
            xs_neg = (xs_neg + self.clip).clamp(max=1)

        # Basic CE calculation
        los_pos = y * torch.log(xs_pos.clamp(min=self.eps))
        los_neg = (1 - y) * torch.log(xs_neg.clamp(min=self.eps))
        loss = los_pos + los_neg

        # Asymmetric Focusing
        if self.gamma_neg > 0 or self.gamma_pos > 0:
            if self.disable_torch_grad_focal_loss:
                torch.set_grad_enabled(False)
            pt0 = xs_pos * y
            pt1 = xs_neg * (1 - y)  # pt = p if t > 0 else 1-p
            pt = pt0 + pt1
            one_sided_gamma = self.gamma_pos * y + self.gamma_neg * (1 - y)
            one_sided_w = torch.pow(1 - pt, one_sided_gamma)
            if self.disable_torch_grad_focal_loss:
                torch.set_grad_enabled(True)
            loss *= one_sided_w

        return -loss.sum()


class AsymmetricLossOptimized(nn.Module):
    ''' Notice - optimized version, minimizes memory allocation and gpu uploading,
    favors inplace operations'''

    def __init__(self, gamma_neg=4, gamma_pos=1, clip=0.05, eps=1e-8, disable_torch_grad_focal_loss=False):
        super(AsymmetricLossOptimized, self).__init__()

        self.gamma_neg = gamma_neg
        self.gamma_pos = gamma_pos
        self.clip = clip
        self.disable_torch_grad_focal_loss = disable_torch_grad_focal_loss
        self.eps = eps
        # prevent memory allocation and gpu uploading every iteration, and encourages inplace operations
        self.targets = self.anti_targets = self.xs_pos = self.xs_neg = self.asymmetric_w = self.loss = None

    def forward(self, x, y):
        """"
        Parameters
        ----------
        x: input logits
        y: targets (multi-label binarized vector)
        """

        self.targets = y
        self.anti_targets = 1 - y

        # Calculating Probabilities
        self.xs_pos = torch.sigmoid(x)
        self.xs_neg = 1.0 - self.xs_pos

        # Asymmetric Clipping
        if self.clip is not None and self.clip > 0:
            self.xs_neg.add_(self.clip).clamp_(max=1)

        # Basic CE calculation
        self.loss = self.targets * torch.log(self.xs_pos.clamp(min=self.eps))
        self.loss.add_(self.anti_targets * torch.log(self.xs_neg.clamp(min=self.eps)))
        
        # Asymmetric Focusing
        if self.gamma_neg > 0 or self.gamma_pos > 0:
            if self.disable_torch_grad_focal_loss:
                torch.set_grad_enabled(False)
            self.xs_pos = self.xs_pos * self.targets
            self.xs_neg = self.xs_neg * self.anti_targets
            self.asymmetric_w = torch.pow(1 - self.xs_pos - self.xs_neg,
                                          self.gamma_pos * self.targets + self.gamma_neg * self.anti_targets)
            if self.disable_torch_grad_focal_loss:
                torch.set_grad_enabled(True)
            self.loss *= self.asymmetric_w

        return -self.loss.sum()

In [8]:
import numpy as np
from tqdm import tqdm
import torch.nn.functional as F
from torchmetrics import F1Score
from torch import optim
from torch.cuda.amp import GradScaler, autocast

from torch.utils.data import TensorDataset
from torch.utils.data import DataLoader
def Trainer(model, Data, epochs, epoch_step_1, epoch_step_2, lr = 1e-3):
    torch.manual_seed(5329)
    train_data = DataLoader(TensorDataset(Data[:25000], label_onehot_tensor[:25000]), batch_size=25000, shuffle = True)
    val_data = DataLoader(TensorDataset(Data[25000:], label_onehot_tensor[25000:].to(torch.int32)), batch_size=5000, shuffle = False)
    
    # Change here to switch to the best setting
    # train_data = DataLoader(TensorDataset(Data, label_onehot_tensor), batch_size=30000, shuffle = True)
    
    model = model.to(dev)
 
    weight_decay = 2e-4
    criterion = AsymmetricLoss(gamma_neg=0, gamma_pos=0, clip=0, disable_torch_grad_focal_loss=True)
    parameters = add_weight_decay(model, weight_decay)
    opti = optim.Adam(params=parameters, lr=lr, weight_decay=0)
    scheduler = torch.optim.lr_scheduler.MultiStepLR(opti, milestones=[epoch_step_1,epoch_step_2], gamma = 0.1)
    f1 = F1Score(task="multilabel", num_labels = 18).to(dev)

    epoch = epochs
    train_loss = []
    val_loss = []
    f1_list = []
    scaler = GradScaler()
    
    for epoch in tqdm(range(epoch), colour = 'GREEN'):
        for data, label in train_data:   
            data, label = data.to(dev), label.to(dev)

            with autocast():  # mixed precision
                output = model(data).float() 

            loss = criterion(output, label)
            model.zero_grad()
            
            scaler.scale(loss).backward()
            scaler.step(opti)
            scaler.update()
            
        train_loss.append(loss.item())
        
        with torch.autograd.no_grad():
            for data_val, label_val in val_data:
                data_val, label_val = data_val.to(dev), label_val.to(dev)
                predict = model(data_val)
                f1_score = f1(predict, label_val)
                v_loss = criterion(predict, label_val)
            val_loss.append(v_loss.item())
            f1_list.append(f1_score.item())
        
        # Comment the code below if you want to switch to the best settings (i.e., no validation data)
        if epoch % 10 == 0:
            print('Validation F1 in epoch{} : {:.4f}'.format(epoch, f1_score.item()))
            print('Validation loss in epoch{} : {:.4f}'.format(epoch, v_loss.item()))
    
    return model, train_loss, val_loss, f1_list

## Experiments on Activation Functions

In [9]:
import torch.nn as nn
import torch.nn.functional as F
class FEATURE_EXTRACTOR(nn.Module):
    def __init__(self, act:str='gelu'):
        super().__init__()
        self.act = act
        self.fc1 = nn.Linear(768, 2048)
        self.fc2 = nn.Linear(2048, 512)
        self.fc3 = nn.Linear(512, 18)
        self.dropout = nn.Dropout(p = 0.5)

    def forward(self, inputs):
        if self.act == 'gelu':
            tensor = F.gelu(self.fc1(inputs))
            tensor = self.dropout(tensor)
            tensor = F.gelu(self.fc2(tensor))
        elif self.act == 'relu':
            tensor = F.relu(self.fc1(inputs))
            tensor = self.dropout(tensor)
            tensor = F.relu(self.fc2(tensor))
        elif self.act == 'leaky_relu':
            tensor = F.leaky_relu(self.fc1(inputs))
            tensor = self.dropout(tensor)
            tensor = F.leaky_relu(self.fc2(tensor))
        tensor = self.dropout(tensor)
        tensor = torch.sigmoid(self.fc3(tensor))
        return tensor

class DECISION_MODEL(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(18, 1024)
        self.fc2 = nn.Linear(1024, 512)
        self.fc3 = nn.Linear(512, 18)

    def forward(self, inputs):
        tensor = F.gelu(self.fc1(inputs))
        tensor = F.gelu(self.fc2(tensor))
        tensor = torch.sigmoid(self.fc3(tensor))
        return tensor

In [None]:
import os
import numpy as np
activation_function = ['gelu','relu','leaky_relu']
mean_list = []
std_list = []
val_loss = []
train_loss = []
for i in range(len(activation_function)):
    f1_list = []
    for j in range(3):
        Net, train_image_loss, val_image_loss, f1_image_list = Trainer(FEATURE_EXTRACTOR(act=activation_function[i]), 
                                                                       all_image_features, 300, 200, 250)
        Net.eval()
        with torch.autograd.no_grad():
            img_train = Net(all_image_features.to(dev))
            img_test = Net(test_image_features.to(dev))

        Net, train_text_loss, val_text_loss, f1_text_list = Trainer(FEATURE_EXTRACTOR(act=activation_function[i]), 
                                                                    all_text_feature, 300, 200, 250)
        Net.eval()
        with torch.autograd.no_grad():
            txt_train = Net(all_text_feature.to(dev))
            txt_test = Net(test_text_feature.to(dev))

        sum_train = img_train + txt_train
        sum_test = img_test + txt_test
        Net, train_sum_loss, val_sum_loss, f1_sum_list = Trainer(DECISION_MODEL(), sum_train, 300, 200, 250)
        Net.eval()
        
        f1_list.append(f1_sum_list[-1])
    
    mean_list.append(np.mean(f1_list))
    std_list.append(np.std(f1_list))
    
    train_loss.append(train_sum_loss)
    val_loss.append(val_sum_loss)
    
print("Mean: ", mean_list)
print('Std: ', std_list)

  0%|[32m▎                                                                                 [0m| 1/300 [00:00<04:51,  1.03it/s][0m

Validation F1 in epoch0 : 0.1097
Validation loss in epoch0 : 62664.8281


  4%|[32m██▉                                                                              [0m| 11/300 [00:03<01:19,  3.65it/s][0m

Validation F1 in epoch10 : 0.1108
Validation loss in epoch10 : 62688.5742


  5%|[32m███▊                                                                             [0m| 14/300 [00:04<01:18,  3.66it/s][0m

In [None]:
import matplotlib.pyplot as plt
plt.plot(val_loss[0], label='GeLU')
plt.plot(val_loss[1], label='ReLU')
plt.plot(val_loss[2], label='Leaky ReLU')
plt.xlabel('Training Epoch')
plt.ylabel('Validation Loss')
plt.legend()
#plt.savefig('mlploss.png')
plt.show()