### Training a two headed model (using a pretrained backbone). 

In [1]:
import torch
import torch.nn as nn
import torchvision.models as models
import torchvision.datasets as datasets
import torchvision.transforms as transforms
import random
random.seed(4)
# PyTorch TensorBoard support
from torch.utils.tensorboard import SummaryWriter
from datetime import datetime
import os
from torch import optim, nn, utils, Tensor
from torchvision.transforms import ToTensor
import lightning as L

In [2]:
def get_all_celeba_attributes():
    return ['5_o_Clock_Shadow', 'Arched_Eyebrows', 'Attractive', 'Bags_Under_Eyes', 'Bald', 'Bangs', 'Big_Lips',
            'Big_Nose', 'Black_Hair', 'Blond_Hair', 'Blurry', 'Brown_Hair', 'Bushy_Eyebrows', 'Chubby',
            'Double_Chin', 'Eyeglasses', 'Goatee', 'Gray_Hair', 'Heavy_Makeup', 'High_Cheekbones', 'Male',
            'Mouth_Slightly_Open', 'Mustache', 'Narrow_Eyes', 'No_Beard', 'Oval_Face', 'Pale_Skin',
            'Pointy_Nose', 'Receding_Hairline', 'Rosy_Cheeks', 'Sideburns', 'Smiling', 'Straight_Hair',
            'Wavy_Hair', 'Wearing_Earrings', 'Wearing_Hat', 'Wearing_Lipstick', 'Wearing_Necklace',
            'Wearing_Necktie', 'Young']

In [3]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [4]:
torch.cuda.is_available()

True

In [5]:
model=models.mobilenet_v3_small(weights='MobileNet_V3_Small_Weights.IMAGENET1K_V1') #this can be changed to resnet50 etc
model.classifier[3]=torch.nn.Linear(1024,2) # Modify the model to have two heads i.e. predict two classes. One head is for the target attribute (in your case this would be for hate speech detected). 
#The other head we put a squared loss on (this is for the protected attribute --- in the text data this would be gender/race etc). 

In [6]:
from torchvision.models import mobilenet_v3_small, MobileNet_V3_Small_Weights

weights = MobileNet_V3_Small_Weights.IMAGENET1K_V1

In [7]:
# Root directory for the dataset
data_root = './data/'
# Spatial size of training images, images are resized to this size.
image_size = 224


celeba_train = datasets.CelebA(data_root,split='train',
                              download=False,
                              transform=transforms.Compose([
                                  transforms.Resize(256),
                                  transforms.RandomCrop(224),
                                  transforms.ToTensor(),
                                  transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                                       std=[0.229, 0.224, 0.225])
                              ]))
celeba_val = datasets.CelebA(data_root, split='valid',
                              download=False,
                              transform=transforms.Compose([
                                  transforms.Resize(256),
                                  transforms.CenterCrop(224),
                                  transforms.ToTensor(),
                                  transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                                       std=[0.229, 0.224, 0.225])
                              ]))

celeba_test = datasets.CelebA(data_root, split='test',
                              download=False,
                              transform=transforms.Compose([
                                  transforms.Resize(256),
                                  transforms.CenterCrop(224),
                                  transforms.ToTensor(),
                                  transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                                       std=[0.229, 0.224, 0.225])
                              ]))


In [8]:
get_all_celeba_attributes().index("Smiling")

31

In [9]:
target_loss = torch.nn.BCEWithLogitsLoss()
attribute_loss = torch.nn.MSELoss()
target_class=9
protected_class=20


scaling_factor=0.5 # factor of 0.5 used by lohaus (this is the weight on the second head --- protected attribute)
 
def total_loss(y,pred):
    "Two headed training loss"
    if y.dim() == 1:
        y = y.unsqueeze(0)
    y = y.type(torch.float32)    
    tl = target_loss(pred[:, 0],y[:, target_class])
    al = attribute_loss(pred[:,1],y[:, protected_class])
    return tl + al*scaling_factor

In [10]:
from typing import Any


from lightning.pytorch.utilities.types import STEP_OUTPUT


class LitTwoHead(L.LightningModule):
    def __init__(self, model):
        super().__init__()
        self.model = model

    def my_logging(self,loss,y,pred):
        # Logging to TensorBoard (if installed) by default
        self.log("train_loss", loss)
        self.log('head 1 loss',target_loss(pred[:, 0],y[:, target_class].type(torch.float32)))
        self.log('head 2 loss',attribute_loss(pred[:, 1],y[:, protected_class].type(torch.float32)))
        self.log('accuracy (head 1)',((pred[:,0]<=0)==(y[:,target_class]<=0)).type(torch.float32).mean())
        self.log('accuracy (head 2)',((pred[:,1]<=0.5)==(y[:,protected_class]<=0)).type(torch.float32).mean())


    def training_step(self, batch, batch_idx):
        # training_step defines the train loop.
        # it is independent of forward
        x, y = batch
        pred = self.model(x)
        loss = total_loss(y, pred)
        self.my_logging(loss, y, pred)
        return loss

    def configure_optimizers(self):
        optimizer = optim.Adam(self.parameters(), lr=0.0001)
        return optimizer

    def test_step(self, batch, batch_idx):
        # this is the test loop
        x, y = batch
        pred = self.model(x)
        loss = total_loss(y, pred)
        self.my_logging(loss, y, pred)
        
    def validation_step(self, batch, batch_idx):
        #Same statistics as test
        self.test_step(batch, batch_idx)

        


In [11]:
model_trainer=LitTwoHead(model)

In [12]:
train_loader = utils.data.DataLoader(celeba_train, batch_size = 32, num_workers=9,persistent_workers=True)
val_loader = utils.data.DataLoader(celeba_val, batch_size = 32, num_workers=9,persistent_workers=True)
test_loader = utils.data.DataLoader(celeba_test, batch_size = 32,num_workers=9,persistent_workers=True)

In [13]:
from lightning.pytorch.callbacks import ModelCheckpoint
from pytorch_lightning.loggers import TensorBoardLogger

In [14]:
trainer = L.Trainer(limit_train_batches=500,limit_val_batches=False, max_epochs=20) #I've changed these numbers to make training much faster   
trainer.fit(model=model_trainer, train_dataloaders=train_loader ,val_dataloaders=val_loader)

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
You are using a CUDA device ('NVIDIA RTX 3500 Ada Generation Laptop GPU') that has Tensor Cores. To properly utilize them, you should set `torch.set_float32_matmul_precision('medium' | 'high')` which will trade-off precision for performance. For more details, read https://pytorch.org/docs/stable/generated/torch.set_float32_matmul_precision.html#torch.set_float32_matmul_precision
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name  | Type        | Params
--------------------------------------
0 | model | MobileNetV3 | 1.5 M 
--------------------------------------
1.5 M     Trainable params
0         Non-trainable params
1.5 M     Total params
6.080     Total estimated model params size (MB)


Training: |                                                                                                   …

`Trainer.fit` stopped: `max_epochs=20` reached.


In [15]:
trainer = L.Trainer() 


GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs


In [16]:
# Test the model
trainer.test(model_trainer, dataloaders=test_loader)

LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


Testing: |                                                                                                    …

─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
       Test metric             DataLoader 0
─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
    accuracy (head 1)       0.9501051902770996
    accuracy (head 2)       0.9630297422409058
       head 1 loss           0.292962908744812
       head 2 loss          0.03179924562573433
       train_loss           0.3088625967502594
─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────


[{'train_loss': 0.3088625967502594,
  'head 1 loss': 0.292962908744812,
  'head 2 loss': 0.03179924562573433,
  'accuracy (head 1)': 0.9501051902770996,
  'accuracy (head 2)': 0.9630297422409058}]

### Saving the model outputs and the true and predicted attributes for using in OxonFair

In [34]:
model.eval()
import numpy as np
outputs_val =np.zeros((len(celeba_val),2))
for i,data in enumerate(celeba_val):
     outputs_val[i]=model(data[0].unsqueeze(0)).detach()
np.save('prototyping_outputs_val.npy', outputs_val)

In [35]:
outputs_test =np.zeros((len(celeba_test),2))
for i,data in enumerate(celeba_test):
     outputs_test[i]=model(data[0].unsqueeze(0)).detach()
np.save('prototyping_outputs_test.npy', outputs_test)

In [36]:
celeba_val.attr[:, target_class].numpy()

array([0, 0, 0, ..., 0, 1, 0], dtype=int64)

In [37]:
celeba_val.attr[:,20].numpy()

array([0, 1, 0, ..., 0, 0, 1], dtype=int64)

In [38]:
celeba_test.attr[:, target_class].numpy()

array([0, 0, 0, ..., 0, 0, 1], dtype=int64)

In [39]:
celeba_test.attr[:, 20].numpy()

array([0, 0, 0, ..., 1, 0, 0], dtype=int64)

In [40]:
target_class

9

In [41]:
protected_class

20

In [42]:
np.save('prototyping_target_label_val.npy', celeba_val.attr[:, target_class].numpy())
np.save('prototyping_protected_label_val.npy', celeba_val.attr[:, protected_class].numpy())

In [43]:
np.save('prototyping_target_label_test.npy', celeba_test.attr[:, target_class].numpy())
np.save('prototyping_protected_label_test.npy', celeba_test.attr[:, protected_class].numpy())

In [44]:
outputs_test.shape[0] == celeba_test.attr[:, target_class].numpy().shape[0] == celeba_test.attr[:, target_class].numpy().shape[0] #checking same length

True

In [45]:
outputs_val.shape[0] == celeba_val.attr[:, target_class].numpy().shape[0] == celeba_val.attr[:, target_class].numpy().shape[0]

True

In [46]:
len(celeba_train)

162770