In [None]:
# Vision Transformer for Facial Expression Recognition

# Final Project for Deep Learning Course
# Students: Gabriel Moreira Marques - 108207
# Victor Afonso Teixeira Santos - 108212

import matplotlib.pyplot as plt
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import math
import torch.nn.functional as F
import os
from tqdm import tqdm

# Kaggle input path of the Tiny ImageNet-200 dataset; the data-preparation
# cell copies it into the working directory before building the loaders.
datasetDir = '/kaggle/input/meu-tinyimgnet/tiny-imagenet-200'

In [None]:
import shutil

# Working copy of the dataset, plus the per-class folder layout that
# ImageFolder needs for the validation split.
outputDir = '/kaggle/working/tiny-imagenet-200'
val_images_dir = os.path.join(datasetDir, 'val', 'images')
val_annotations_path = os.path.join(datasetDir, 'val', 'val_annotations.txt')
val_output_dir = os.path.join(outputDir, 'val', 'organized')

# Resize/crop to 64x64, convert to a tensor and normalise each channel
# (the mean/std values match the commonly used ImageNet statistics).
transform = transforms.Compose([
    transforms.Resize(64),
    transforms.CenterCrop(64),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

if not os.path.exists(outputDir):
    print("Copiando o dataset para o diretório de trabalho...")
    shutil.copytree(datasetDir, outputDir)

# Organise the validation images into one sub-folder per class, using the
# tab-separated annotation file (column 0: image name, column 1: class id).
if not os.path.exists(val_output_dir):
    os.makedirs(val_output_dir, exist_ok=True)
    with open(val_annotations_path, 'r') as f:
        # Iterate the file lazily instead of loading every line at once.
        for line in f:
            parts = line.strip().split('\t')
            if len(parts) < 2:
                # Blank or malformed line: nothing to copy.
                continue
            image_name, class_id = parts[0], parts[1]

            class_dir = os.path.join(val_output_dir, class_id)
            os.makedirs(class_dir, exist_ok=True)

            src = os.path.join(val_images_dir, image_name)
            dst = os.path.join(class_dir, image_name)
            shutil.copy(src, dst)
else:
    print("O conjunto de validação já está organizado.")


trainSet = datasets.ImageFolder(root=os.path.join(outputDir, 'train'), transform=transform)
valSet = datasets.ImageFolder(root=val_output_dir, transform=transform)

# ImageFolder derives label indices from the folder names of each root
# independently; if the two mappings differ, validation labels would not line
# up with the training labels and the reported accuracy would be meaningless.
if trainSet.class_to_idx != valSet.class_to_idx:
    raise ValueError(
        "Train and validation class-to-index mappings differ; "
        f"check the contents of {val_output_dir}"
    )

batchSize = 32
train_loader = DataLoader(trainSet, batch_size=batchSize, shuffle=True, num_workers=2)
val_loader = DataLoader(valSet, batch_size=batchSize, shuffle=False, num_workers=2)

print(f"Quantidade de imagens de treino: {len(trainSet)}")
print(f"Quantidade de imagens de validação: {len(valSet)}")

In [None]:
# Hyperparameters

# --- Runtime ---------------------------------------------------------------
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = 'cpu'
if torch.cuda.is_available():
    device = 'cuda'
print(device)

# --- Data / patching -------------------------------------------------------
image_size = 64
patch_size = 8
input_channels = 3  # Tiny ImageNet and the fine-tuning datasets use RGB images, hence 3 channels
n_outputs = 200  # 200 classes when training on Tiny ImageNet; use 7 for the facial-expression task
# Number of values in one flattened patch. Note: the model-building cell
# passes hidden_dim (not this value) as the transformer embedding size.
embed_dim = input_channels * (patch_size ** 2)

# --- Model -----------------------------------------------------------------
hidden_dim = 768
n_heads = 4
# A very shallow stack (e.g. 2 encoders) is weak; ViT models usually have more
# layers (such as 6, 12 or more) to learn deeper representations.
n_encoders = 8
dropout = 0.1

# --- Optimisation ----------------------------------------------------------
# 1e-3 can be too high for ViTs, especially early in training. Smaller rates
# such as 1e-4 or 3e-4 are usually more stable. The paper suggests linear
# warmup followed by (cosine) decay.
learning_rate = 1e-4
adam_weight_decay = 0.1
betas = (0.9, 0.999)
num_epochs = 14

In [None]:
class PatchEmbedding(nn.Module):
    """Split an image into non-overlapping square patches and embed each one.

    Args:
        image_size: Side length, in pixels, used to compute the patch count.
        patch_size: Side length, in pixels, of each square patch.
        in_channels: Number of channels of the input image.
        embed_dim: Size of the embedding vector produced for every patch.
    """

    def __init__(self, image_size, patch_size, in_channels, embed_dim):
        super().__init__()

        self.patch_size = patch_size
        self.num_patches = (image_size // patch_size) ** 2
        self.embed_dim = embed_dim

        # Kernel size and stride both equal the patch size, so the convolution
        # sees every patch exactly once and outputs one embed_dim vector for it.
        self.projection = nn.Conv2d(
            in_channels, embed_dim, kernel_size=patch_size, stride=patch_size
        )

    def forward(self, x):
        """Embed a batch of images.

        Args:
            x: Tensor of shape [batch_size, channels, height, width].

        Returns:
            Tensor of shape [batch_size, num_patches, embed_dim].

        Raises:
            ValueError: If ``x`` is not a 4-D tensor.
        """
        if x.dim() != 4:
            raise ValueError(
                f"expected a 4-D [batch, channels, height, width] tensor, got {x.dim()}-D"
            )

        x = self.projection(x)  # [batch_size, embed_dim, height/patch, width/patch]

        # Collapse the patch grid into a sequence and put the embedding last.
        x = x.flatten(2).transpose(1, 2)  # -> [batch_size, num_patches, embed_dim]
        return x


class PositionalEmbedding(nn.Module):
    """Learnable additive position table with one vector per patch.

    The table is a parameter of shape [1, num_patches, embed_dim] that starts
    at zero and is added to the incoming patch embeddings.
    """

    def __init__(self, num_patches, embed_dim):
        super().__init__()

        table_shape = (1, num_patches, embed_dim)
        self.position_embeddings = nn.Parameter(torch.zeros(*table_shape))

    def forward(self, x):
        """Return ``x`` with the position table added (broadcast over the batch)."""
        position_table = self.position_embeddings
        return x + position_table

In [None]:
class VisionTransformerInput(nn.Module):
    """Input stage of the ViT: patch embedding followed by positional embedding."""

    def __init__(self, image_size, patch_size, in_channels, embed_dim):
        super().__init__()
        self.patch_embed = PatchEmbedding(
            image_size=image_size,
            patch_size=patch_size,
            in_channels=in_channels,
            embed_dim=embed_dim,
        )
        # One positional vector per patch produced by the embedding layer.
        n_positions = self.patch_embed.num_patches
        self.pos_embed = PositionalEmbedding(n_positions, embed_dim)

    def forward(self, x):
        """Turn a batch of images into position-aware patch embeddings."""
        patch_tokens = self.patch_embed(x)  # patch embeddings
        return self.pos_embed(patch_tokens)  # plus positional embeddings

In [None]:
class MultiHeadSelfAttention(nn.Module):
    """Multi-head scaled dot-product self-attention over a token sequence.

    Args:
        embed_dim: Size of each input and output token vector.
        num_heads: Number of attention heads; must divide ``embed_dim``.
        dropout: Dropout probability applied to the attention weights.

    Raises:
        ValueError: If ``embed_dim`` is not divisible by ``num_heads``.
    """

    def __init__(self, embed_dim, num_heads, dropout=0.1):

        super().__init__()

        # Fail at construction time: otherwise the mismatch only surfaces as an
        # opaque reshape error on the first forward pass.
        if embed_dim % num_heads != 0:
            raise ValueError(
                f"embed_dim ({embed_dim}) must be divisible by num_heads ({num_heads})"
            )

        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads
        self.scale = math.sqrt(self.head_dim)

        # A single linear layer produces queries, keys and values together.
        self.qkv = nn.Linear(embed_dim, embed_dim * 3)
        self.attention_dropout = nn.Dropout(dropout)
        self.proj = nn.Linear(embed_dim, embed_dim)

    def forward(self, x):
        """Apply self-attention.

        Args:
            x: Tensor of shape [batch_size, num_patches, embed_dim].

        Returns:
            Tensor of the same shape as ``x``.
        """
        batch_size, num_patches, embed_dim = x.size()

        qkv = self.qkv(x)  # [batch_size, num_patches, 3 * embed_dim]
        qkv = qkv.reshape(batch_size, num_patches, 3, self.num_heads, self.head_dim)
        # -> [3, batch_size, num_heads, num_patches, head_dim], split into q, k, v
        q, k, v = qkv.permute(2, 0, 3, 1, 4)

        attn_weights = (q @ k.transpose(-2, -1)) / self.scale  # [batch_size, num_heads, num_patches, num_patches]
        attn_weights = attn_weights.softmax(dim=-1)
        attn_weights = self.attention_dropout(attn_weights)

        # Merge the heads back into a single embed_dim-sized vector per token.
        attn_output = (attn_weights @ v).transpose(1, 2).reshape(batch_size, num_patches, embed_dim)
        return self.proj(attn_output)

In [None]:
class TransformerEncoderBlock(nn.Module):
    """Pre-norm encoder block: self-attention, then an MLP, each with a residual.

    Args:
        embed_dim: Size of each token vector.
        num_heads: Number of attention heads.
        mlp_dim: Width of the hidden layer of the feed-forward network.
        dropout: Dropout probability used in the attention and the MLP.
    """

    def __init__(self, embed_dim, num_heads, mlp_dim, dropout=0.1):

        super().__init__()
        self.norm1 = nn.LayerNorm(embed_dim)
        self.attn = MultiHeadSelfAttention(embed_dim, num_heads, dropout=dropout)
        self.norm2 = nn.LayerNorm(embed_dim)

        # Expand to mlp_dim, apply GELU, then project back to embed_dim.
        feed_forward_layers = [
            nn.Linear(embed_dim, mlp_dim),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(mlp_dim, embed_dim),
            nn.Dropout(dropout),
        ]
        self.mlp = nn.Sequential(*feed_forward_layers)

    def forward(self, x):
        """Run one encoder block over ``x`` and return a tensor of the same shape."""
        # Attention sub-layer with residual connection.
        attended = self.attn(self.norm1(x))
        x = x + attended
        # Feed-forward sub-layer with residual connection.
        transformed = self.mlp(self.norm2(x))
        return x + transformed

In [None]:
class VisionTransformer(nn.Module):
    """Vision Transformer classifier.

    Images are embedded into patch tokens, a learnable class token is
    prepended, the sequence goes through ``num_layers`` encoder blocks and the
    final class-token vector is mapped to ``num_classes`` logits.

    Args:
        image_size: Side length of the input images in pixels.
        patch_size: Side length of each square patch in pixels.
        in_channels: Number of image channels.
        embed_dim: Size of each token vector.
        num_heads: Number of attention heads per encoder block.
        mlp_dim: Hidden width of the feed-forward network in each block.
        num_layers: Number of encoder blocks.
        num_classes: Number of output logits.
        dropout: Dropout probability passed to every encoder block.
    """

    def __init__(self, image_size, patch_size, in_channels, embed_dim, num_heads, mlp_dim, num_layers, num_classes, dropout=0.1):

        super().__init__()
        self.input_layer = VisionTransformerInput(image_size, patch_size, in_channels, embed_dim)

        encoder_blocks = []
        for _ in range(num_layers):
            encoder_blocks.append(
                TransformerEncoderBlock(embed_dim, num_heads, mlp_dim, dropout)
            )
        self.encoder = nn.ModuleList(encoder_blocks)

        # Learnable token (zero-initialised) whose final state feeds the head.
        self.class_token = nn.Parameter(torch.zeros(1, 1, embed_dim))

        head_norm = nn.LayerNorm(embed_dim)
        head_linear = nn.Linear(embed_dim, num_classes)
        self.mlp_head = nn.Sequential(head_norm, head_linear)

    def forward(self, x):
        """Return class logits of shape [batch_size, num_classes] for images ``x``."""
        n_images = x.size(0)
        tokens = self.input_layer(x)

        # The class token is prepended after the input layer has already added
        # the positional embeddings, so it carries no positional term itself.
        cls_tokens = self.class_token.expand(n_images, -1, -1)
        tokens = torch.cat([cls_tokens, tokens], dim=1)

        for block in self.encoder:
            tokens = block(tokens)

        # Classify from the class token only (sequence position 0).
        cls_output = tokens[:, 0]
        return self.mlp_head(cls_output)

In [None]:
def train_one_epoch_with_progress(model, dataloader, criterion, optimizer, device):
    """Train ``model`` for one pass over ``dataloader`` with a progress bar.

    Args:
        model: Network to optimise; switched to training mode.
        dataloader: Iterable yielding ``(images, labels)`` batches.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepped once per batch.
        device: Device the batches are moved to.

    Returns:
        Tuple ``(avg_loss, accuracy)`` averaged over all samples seen.
    """
    model.train()
    loss_sum = 0
    n_correct = 0
    n_seen = 0
    progress = tqdm(dataloader, desc="Treinando", leave=False)

    for images, labels in progress:
        images = images.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # Read the scalar once and reuse it for the totals and the bar.
        batch_loss = loss.item()
        batch_len = images.size(0)

        # Weight by batch size so a smaller final batch is averaged correctly.
        loss_sum += batch_loss * batch_len
        predicted = outputs.max(1)[1]
        n_correct += (predicted == labels).sum().item()
        n_seen += batch_len

        progress.set_postfix(loss=batch_loss, acc=(n_correct / n_seen))

    return loss_sum / n_seen, n_correct / n_seen


In [None]:
def validate_one_epoch_with_progress(model, dataloader, criterion, device):
    """Evaluate ``model`` on ``dataloader`` without gradient tracking.

    Args:
        model: Network to evaluate; switched to evaluation mode.
        dataloader: Iterable yielding ``(images, labels)`` batches.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        device: Device the batches are moved to.

    Returns:
        Tuple ``(avg_loss, accuracy)`` averaged over all samples seen.
    """
    model.eval()
    loss_sum = 0
    n_correct = 0
    n_seen = 0

    with torch.no_grad():
        progress = tqdm(dataloader, desc="Validando", leave=False)

        for images, labels in progress:
            images = images.to(device)
            labels = labels.to(device)

            outputs = model(images)
            batch_loss = criterion(outputs, labels).item()
            batch_len = images.size(0)

            # Weight by batch size so a smaller final batch is averaged correctly.
            loss_sum += batch_loss * batch_len
            predicted = outputs.max(1)[1]
            n_correct += (predicted == labels).sum().item()
            n_seen += batch_len

            progress.set_postfix(loss=batch_loss, acc=(n_correct / n_seen))

    return loss_sum / n_seen, n_correct / n_seen


def _plot_training_history(history, save_dir):
    """Plot all recorded curves on one figure, save it in ``save_dir`` and show it."""
    plt.figure(figsize=(10, 5))
    plt.plot(history['train_loss'], label='Train Loss')
    plt.plot(history['val_loss'], label='Validation Loss')
    plt.plot(history['train_acc'], label='Train Accuracy')
    plt.plot(history['val_acc'], label='Validation Accuracy')
    plt.xlabel('Epochs')
    plt.ylabel('Metrics')
    plt.title('Training and Validation Loss/Accuracy')
    plt.legend()
    plt.savefig(os.path.join(save_dir, 'training_results.png'))
    plt.show()


def train_and_save(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs, device, save_dir):
    """Train and validate ``model`` for ``num_epochs``, checkpointing every epoch.

    After each epoch the scheduler is stepped, the metrics are printed and a
    checkpoint (model, optimizer and scheduler state plus the validation loss)
    is written to ``save_dir``. When training finishes, the metric curves are
    plotted and saved as ``training_results.png`` in the same directory.

    Args:
        model: Network to train.
        train_loader: Loader for the training split.
        val_loader: Loader for the validation split.
        criterion: Loss function.
        optimizer: Optimizer used for the parameter updates.
        scheduler: Learning-rate scheduler, stepped once per epoch.
        num_epochs: Number of epochs to run.
        device: Device the batches are moved to.
        save_dir: Directory for checkpoints and the plot; created if missing.

    Returns:
        Dict with the per-epoch lists ``train_loss``, ``train_acc``,
        ``val_loss`` and ``val_acc``, so callers can inspect or compare runs.
    """
    history = {'train_loss': [], 'train_acc': [], 'val_loss': [], 'val_acc': []}

    # exist_ok avoids the check-then-create race and handles nested paths.
    os.makedirs(save_dir, exist_ok=True)

    for epoch in range(num_epochs):
        print(f"Epoch {epoch + 1}/{num_epochs}")
        print("-" * 40)

        train_loss, train_acc = train_one_epoch_with_progress(model, train_loader, criterion, optimizer, device)
        val_loss, val_acc = validate_one_epoch_with_progress(model, val_loader, criterion, device)

        scheduler.step()

        history['train_loss'].append(train_loss)
        history['train_acc'].append(train_acc)
        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_acc)

        print(f"Train Loss: {train_loss:.4f}, Train Accuracy: {train_acc:.4f}")
        print(f"Val Loss: {val_loss:.4f}, Val Accuracy: {val_acc:.4f}")

        # Save a checkpoint holding everything needed to resume from this epoch.
        checkpoint_path = os.path.join(save_dir, f'checkpoint_epoch_{epoch + 1}.pt')
        torch.save({'epoch': epoch + 1,
                    'model_state_dict': model.state_dict(),
                    'optimizer_state_dict': optimizer.state_dict(),
                    'scheduler_state_dict': scheduler.state_dict(),
                    'loss': val_loss},
                   checkpoint_path)
        print(f"Checkpoint salvo: {checkpoint_path}\n")

    _plot_training_history(history, save_dir)
    print("Treinamento concluído. Gráfico salvo.")

    return history


In [None]:
from torch.optim.lr_scheduler import CosineAnnealingLR
import torch.optim as optim

def execute_training_with_saving(train_loader, val_loader, save_dir):
    """Build the ViT from the notebook-level hyperparameters and train it.

    Args:
        train_loader: Loader for the training split.
        val_loader: Loader for the validation split.
        save_dir: Directory handed to ``train_and_save`` for its outputs.
    """
    vit_config = dict(
        image_size=image_size,
        patch_size=patch_size,
        in_channels=input_channels,
        embed_dim=hidden_dim,
        num_heads=n_heads,
        mlp_dim=hidden_dim * 4,  # feed-forward width is 4x the embedding size
        num_layers=n_encoders,
        num_classes=n_outputs,
        dropout=dropout,
    )
    model = VisionTransformer(**vit_config).to(device)

    # Loss function and optimizer.
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.AdamW(
        model.parameters(),
        lr=learning_rate,
        betas=betas,
        weight_decay=adam_weight_decay,
    )
    # Cosine decay of the learning rate over num_epochs scheduler steps.
    scheduler = CosineAnnealingLR(optimizer, T_max=num_epochs)

    train_and_save(
        model,
        train_loader,
        val_loader,
        criterion,
        optimizer,
        scheduler,
        num_epochs,
        device,
        save_dir,
    )

# Directory that receives the per-epoch checkpoints and the training plot.
save_directory = "/kaggle/working/"
execute_training_with_saving(
    train_loader=train_loader,
    val_loader=val_loader,
    save_dir=save_directory,
)