# **Introdução**

O objetivo deste trabalho é implementar a "Automated Bias Discover Task" descrita em _Discover the Unknown Biased Attribute of an Image Classifier_, de Zhiheng Li e Chenliang Xu.

O workflow descrito no paper pode ser bem resumido pela seguinte imagem:

# ![title](images/workflow.png)

Perceba que, primeiramente, precisamos obter um hiperplano do atributo enviesado e, depois disso, a tarefa de encontrar o viés do classificador continua.
Nesse sentido, o escopo deste trabalho é, primeiramente, obter esse hiperplano otimizado.

Para fazer isso, precisamos de duas coisas:
1. Um classificador enviesado ✅
2. Um modelo gerativo capaz de reproduzir a distribuição de probabilidades do dataset desse classificador ✅

Como já possuímos esses dois pré-requisitos, podemos implementar esse workflow. É importante perceber que faremos uma abordagem 1 vs all para cada classe do problema, podendo assim encontrar diversos vieses presentes nesse dataset, um para cada classe.

# **Implementação**

Para implementar esse workflow, precisamos saber a dimensão $d$ do espaço latente do modelo gerativo treinado. Essa será a dimensionalidade do "hiperplano do atributo enviesado" que será encontrado.

In [3]:
import torch
from torch import nn
from torch import linalg as LA
from torchvision import models
from StyleGAN.modules.stylegan import Generator
from torch import optim
import numpy as np
import os
import matplotlib.pyplot as plt

In [4]:
class Hyperplane:
    """Plain holder for the two quantities that describe a hyperplane.

    The values are stored exactly as received; nothing is copied,
    converted or validated.
    """

    def __init__(self, weights, bias):
        # `w` keeps the given weights, `b` keeps the given bias.
        self.w, self.b = weights, bias

In [5]:
class BiasDiscoverer(nn.Module):
    """Learnable hyperplane in latent space, scored by a fixed classifier.

    Latent points are projected onto the hyperplane ``w . z + b = 0`` and
    then shifted along its unit normal by a fixed set of offsets (alphas).
    The shifted codes are fed to the generative model and the resulting
    images to the classifier.

    Only ``w`` and ``b`` are meant to be trained. They are registered on this
    module so that ``parameters()``, ``state_dict()`` and ``.to(device)`` see
    them; ``hyperplane`` is a plain container that references the same two
    Parameter objects.
    """

    def __init__(
        self,
        z_dim,
        generative_model,
        classifier,
        num_latent_codes=6,
        starting_alpha=-3,
        terminating_alpha=3,
    ):
        """Create a random hyperplane and attach the two fixed models.

        Args:
            z_dim: Dimensionality of the latent space (length of ``w``).
            generative_model: Callable mapping latent codes to images.
            classifier: Callable mapping images to predictions.
            num_latent_codes: Number of offsets along the normal.
            starting_alpha: First offset.
            terminating_alpha: Last offset (inclusive).

        Note:
            If the two models are ``nn.Module`` instances their parameters
            are frozen in place (``requires_grad_(False)``), so an optimizer
            built from ``self.parameters()`` only updates the hyperplane.
        """
        super().__init__()
        # Assigning Parameters directly on the module registers them; keeping
        # them only inside a non-Module object would hide them from
        # parameters() and therefore from the optimizer.
        self.w = nn.Parameter(torch.randn(1, z_dim))
        self.b = nn.Parameter(torch.randn(1, 1))
        self.hyperplane = Hyperplane(self.w, self.b)

        self.generative_model = generative_model
        self.biased_classifier = classifier
        for fixed_model in (generative_model, classifier):
            if isinstance(fixed_model, nn.Module):
                fixed_model.requires_grad_(False)

        # Buffer: follows .to(device) with the module; non-persistent so the
        # state_dict is not extended with a constant.
        self.register_buffer(
            "alphas",
            self.get_alphas(num_latent_codes, starting_alpha, terminating_alpha),
            persistent=False,
        )

    def get_alphas(self, num_alphas, starting_alpha, terminating_alpha):
        """Return ``num_alphas`` evenly spaced offsets, shape (num_alphas, 1, 1).

        Both end points are included. ``linspace`` is used instead of
        ``arange`` with a float step, which leaves out the end point and may
        yield one element more or less than requested because of rounding.
        """
        alphas = torch.linspace(starting_alpha, terminating_alpha, num_alphas)
        return alphas.unsqueeze(1).unsqueeze(2)

    def generate_latent_codes(self, z_points):
        """Project ``z_points`` onto the hyperplane and shift along its normal.

        Args:
            z_points: Tensor of shape (batch, z_dim).

        Returns:
            Tensor of shape (num_alphas, batch, z_dim); entry ``[i]`` holds the
            projected points moved by ``alphas[i]`` along the unit normal.
        """
        w = self.w
        b = self.b
        w_norm = LA.vector_norm(w)
        # (batch, z_dim) @ (z_dim, 1) -> (batch, 1): w . z + b for every point,
        # divided by ||w||^2 so that multiplying by w gives the offset vector.
        scaled_offset = (z_points @ w.T + b) / (w_norm ** 2)
        z_proj = z_points - scaled_offset @ w
        # (num_alphas, 1, 1) * (1, z_dim) broadcasts against (batch, z_dim).
        latent_codes = z_proj + (self.alphas * (w / w_norm))
        return latent_codes

    def forward(self, z_points):
        """Run latent codes through the generator and then the classifier.

        NOTE(review): the codes carry the alpha axis first
        (num_alphas, batch, z_dim); confirm that the generator and the loss
        applied to the returned predictions expect this layout.
        """
        latent_codes = self.generate_latent_codes(z_points)
        traversal_images = self.generative_model(latent_codes)
        probs = self.biased_classifier(traversal_images)
        return probs

In [None]:
def TotalVARLoss(probs):
    """Negative log of the mean absolute change between neighbouring predictions.

    Differences are taken between consecutive entries along dim 1 of
    ``probs`` (presumably the traversal steps -- TODO confirm against the
    caller's layout). The sign is negative so that *minimising* this loss
    *maximises* the variation of the predictions, which is what exposes an
    attribute the classifier is sensitive to.

    Args:
        probs: Tensor with at least two dimensions.

    Returns:
        Scalar tensor.
    """
    neighbour_change = torch.abs(probs[:, 1:] - probs[:, :-1])
    # 1e-10 keeps the log finite when the predictions do not change at all.
    return -torch.log(1e-10 + neighbour_change.mean())

In [None]:
def optimize_hyperplane(bias_discoverer, optimizer, EPOCHS, BATCH_SIZE, Z_DIM, DEVICE):
    """Run the optimisation loop and return the loss recorded at each step.

    Args:
        bias_discoverer: Module called with a (BATCH_SIZE, Z_DIM) tensor.
        optimizer: Optimizer that updates the trainable parameters.
        EPOCHS: Number of optimisation steps (one random batch per step).
        BATCH_SIZE: Number of latent points sampled per step.
        Z_DIM: Dimensionality of each latent point.
        DEVICE: Device on which the latent points are created.

    Returns:
        List of Python floats, one loss value per step.
    """
    losses = []
    print("Starting Training Loop...")
    for _ in range(EPOCHS):
        # Sample directly on DEVICE so the batch lives where the models do.
        # NOTE(review): torch.rand draws from U[0, 1); confirm this matches
        # the latent distribution the generator was trained with.
        z_data_points = torch.rand(BATCH_SIZE, Z_DIM, device=DEVICE)
        # Call the module itself (not .forward) so registered hooks run.
        probs_predictions = bias_discoverer(z_data_points)
        loss = TotalVARLoss(probs_predictions)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # Store a plain float: keeping the tensor would retain its autograd
        # graph for every step and break later numpy/plot conversion.
        losses.append(loss.item())

    return losses
        

In [None]:
def load_gen_model(PATH, DEVICE, LOG_RESOLUTION=8, W_DIM=256):
    """Build a ``Generator``, load its weights from ``PATH`` and set eval mode.

    Args:
        PATH: Location of the saved state dict.
        DEVICE: Device the generator is moved to.
        LOG_RESOLUTION: First constructor argument of ``Generator``.
        W_DIM: Second constructor argument of ``Generator``.

    Returns:
        The generator, on ``DEVICE`` and in evaluation mode.
    """
    gen = Generator(LOG_RESOLUTION, W_DIM)
    # map_location lets a checkpoint saved on another device (e.g. a GPU)
    # be loaded when that device is not available here.
    gen.load_state_dict(torch.load(PATH, map_location=DEVICE))
    gen.to(DEVICE)
    gen.eval()
    return gen


def load_classifier(PATH, DEVICE):
    """Build a ``vgg16_bn`` model, load its weights from ``PATH`` and set eval mode.

    Args:
        PATH: Location of the saved state dict.
        DEVICE: Device the classifier is moved to.

    Returns:
        The classifier, on ``DEVICE`` and in evaluation mode.
    """
    classifier = models.vgg16_bn()
    # map_location lets a checkpoint saved on another device (e.g. a GPU)
    # be loaded when that device is not available here.
    classifier.load_state_dict(torch.load(PATH, map_location=DEVICE))
    classifier.to(DEVICE)
    classifier.eval()
    return classifier

In [None]:
def make_path(path):
    """Create the parent directory of ``path`` if it does not exist yet.

    Nothing happens when ``path`` has no directory component. The file
    itself is never created.

    Args:
        path: File path whose containing directory should exist.
    """
    parent = os.path.dirname(path)
    if parent:
        # exist_ok avoids the check-then-create race of a separate
        # os.path.exists() test.
        os.makedirs(parent, exist_ok=True)

def save_statistics(losses, current_dir):
    """Write the loss values to ``<current_dir>/statistics/losses.csv``.

    Args:
        losses: Iterable of scalar losses; Python numbers and one-element
            tensors are both accepted.
        current_dir: Base directory under which ``statistics/`` is created.
    """
    # float() extracts the scalar even from a tensor that still requires
    # grad, which np.array cannot convert on its own.
    losses_np = np.array([float(loss) for loss in losses], dtype=float)
    losses_path = os.path.join(current_dir, "statistics/losses.csv")
    make_path(losses_path)
    np.savetxt(losses_path, losses_np, delimiter=",")

def save_graphics(losses, current_dir):
    """Plot the loss curve and save it under ``<current_dir>/statistics/``.

    The figure is written to ``statistics/training_loss_plot.png`` and then
    closed.

    Args:
        losses: Iterable of scalar losses; Python numbers and one-element
            tensors are both accepted.
        current_dir: Base directory under which ``statistics/`` is created.
    """
    # Convert up front: matplotlib cannot turn tensors that require grad
    # into an array.
    loss_values = [float(loss) for loss in losses]
    plt.figure(figsize=(10,5))
    plt.title("Total Variation Loss During Training")
    plt.plot(loss_values)
    plt.xlabel("Epochs")
    plt.ylabel("Loss")
    plot_path = os.path.join(current_dir, "statistics/training_loss_plot.png")
    make_path(plot_path)
    plt.savefig(plot_path)
    plt.close()

In [None]:
def preview_saving(model):
    """Print every entry of ``model.state_dict()`` with its size.

    One header line is printed first, then one line per entry in the form
    ``<name> \\t <size>``.
    """
    print("Model's state_dict:")
    for entry_name, entry_value in model.state_dict().items():
        print(entry_name, "\t", entry_value.size())

In [None]:
def main(args):
    """Assemble the bias-discovery pipeline from the given settings.

    Loads the generator and the classifier, builds the ``BiasDiscoverer``,
    prints its state dict and creates the Adam optimizer. The optimisation
    run itself is currently disabled (see the commented calls at the end).

    Args:
        args: Object exposing the attributes ``Z_DIM``, ``LEARNING_RATE``,
            ``EPOCHS``, ``BATCH_SIZE``, ``DEVICE``, ``GENERATOR_PATH`` and
            ``CLASSIFIER_PATH``.
    """
    Z_DIM = args.Z_DIM
    LEARNING_RATE = args.LEARNING_RATE
    EPOCHS = args.EPOCHS
    BATCH_SIZE = args.BATCH_SIZE
    DEVICE = args.DEVICE
    GENERATOR_PATH = args.GENERATOR_PATH
    CLASSIFIER_PATH = args.CLASSIFIER_PATH

    gen_model = load_gen_model(GENERATOR_PATH, DEVICE)

    biased_classifier = load_classifier(CLASSIFIER_PATH, DEVICE)

    bias_discoverer = BiasDiscoverer(Z_DIM, gen_model, biased_classifier)
    # Both models are loaded onto DEVICE; move the discoverer module there as
    # well, before the optimizer is built from its parameters.
    bias_discoverer.to(DEVICE)

    preview_saving(bias_discoverer)

    optimizer = optim.Adam(bias_discoverer.parameters(), lr=LEARNING_RATE)

    # Training is disabled for now.
    # losses = optimize_hyperplane(bias_discoverer, optimizer, EPOCHS, BATCH_SIZE, Z_DIM, DEVICE)

    # TODO: save_statistics also requires the target directory argument.
    # save_statistics(losses)
    