# Image-to-Text

O objetivo deste notebook é construir e treinar um modelo de inteligência artificial capaz de gerar descrições textuais (legendas) para imagens de forma automática. Para isso, utilizaremos o dataset **COCO (Common Objects in Context)** disponível em https://cocodataset.org/#download.

### Instalação de dependências

In [None]:
!pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121
!pip install pycocotools

!pip install spacy
!python -m spacy download en_core_web_sm

print("Dependências instaladas com sucesso!")

Looking in indexes: https://download.pytorch.org/whl/cu121
INFO: pip is looking at multiple versions of torch to determine which version is compatible with other requirements. This could take a while.
Collecting torch
  Downloading https://download.pytorch.org/whl/cu121/torch-2.5.1%2Bcu121-cp311-cp311-linux_x86_64.whl (780.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m780.5/780.5 MB[0m [31m1.3 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting nvidia-cuda-nvrtc-cu12==12.1.105 (from torch)
  Downloading https://download.pytorch.org/whl/cu121/nvidia_cuda_nvrtc_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (23.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m23.7/23.7 MB[0m [31m89.2 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting nvidia-cuda-runtime-cu12==12.1.105 (from torch)
  Downloading https://download.pytorch.org/whl/cu121/nvidia_cuda_runtime_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (823 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

### Importações

In [None]:
# Importações de bibliotecas padrão e de machine learning.
import os
import spacy
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
import torchvision.models as models
import torch.nn.functional as F

# Ferramenta específica para o dataset COCO.
from pycocotools.coco import COCO

### Configurações

In [None]:
# Configurações de Caminhos e Dados
DATA_ROOT = './COCO'
IMAGE_DIR = os.path.join(DATA_ROOT, 'images')
TRAIN_IMAGE_DIR = os.path.join(IMAGE_DIR, 'train2014')
VAL_IMAGE_DIR = os.path.join(IMAGE_DIR, 'val2014')
TEST_IMAGE_DIR = os.path.join(IMAGE_DIR, 'test2014')
TRAIN_ANNOTATION_FILE = os.path.join(DATA_ROOT, 'annotations/captions_train2014.json')
VAL_ANNOTATION_FILE = os.path.join(DATA_ROOT, 'annotations/captions_val2014.json')
CHECKPOINT_PATH = 'best_model_checkpoint_attention.pth'

# Flag para limitar o dataset para testes rápidos.
# Defina como None para usar o dataset completo.
MAX_IMAGES = 50000



### Preparação dos Dados (Classes e Funções)

Esses quatro tokens especiais são adicionados manualmente no início, com índices fixos:

- PAD: para preenchimento (padding)
- SOS: início da sequência (Start of Sequence)
- EOS: fim da sequência (End of Sequence)
- UNK: token desconhecido (para palavras fora do vocabulário)

In [None]:
# Carrega o tokenizador do Spacy para o idioma inglês.
spacy_eng = spacy.load("en_core_web_sm")

In [None]:
class Vocabulary:
    """Cria o vocabulário para mapear palavras para índices numéricos."""
    def __init__(self, freq_threshold):
        self.itos = {0: "<PAD>", 1: "<SOS>", 2: "<EOS>", 3: "<UNK>"}
        self.stoi = {"<PAD>": 0, "<SOS>": 1, "<EOS>": 2, "<UNK>": 3}
        self.freq_threshold = freq_threshold

    def __len__(self):
        return len(self.itos)

    @staticmethod
    def tokenizer(text):
        return [tok.text.lower() for tok in spacy_eng.tokenizer(text)]

    def build_vocabulary(self, sentence_list):
        frequencies = {}
        idx = 4 # Começa depois dos tokens especiais

        for sentence in sentence_list:
            for word in self.tokenizer(sentence):
                if word not in frequencies:
                    frequencies[word] = 1
                else:
                    frequencies[word] += 1

                if frequencies[word] == self.freq_threshold:
                    self.stoi[word] = idx
                    self.itos[idx] = word
                    idx += 1

    def numericalize(self, text):
        tokenized_text = self.tokenizer(text)
        return [self.stoi[token] if token in self.stoi else self.stoi["<UNK>"] for token in tokenized_text]

class CocoDataset(Dataset):
    """Classe para carregar os dados do COCO, adaptada para treino, validação e teste."""
    def __init__(self, root_dir, transform, annotation_file=None, vocab=None):
        self.root_dir = root_dir
        self.transform = transform
        self.is_test = annotation_file is None

        if self.is_test:
            # Modo de teste: lê os nomes dos arquivos de imagem diretamente.
            self.image_files = [f for f in os.listdir(root_dir) if f.endswith('.jpg')]
            # Extrai os IDs
            self.ids = [int(f.split('_')[-1].split('.')[0]) for f in self.image_files]
            self.vocab = vocab # Usa o vocabulário já treinado
        else:
            # Modo de treino/validação: usa o arquivo de anotações.
            self.coco = COCO(annotation_file)
            self.ids = list(sorted(self.coco.imgs.keys()))
            if MAX_IMAGES is not None:
                self.ids = self.ids[:MAX_IMAGES]

            # Constrói o vocabulário apenas no dataset de treino
            if vocab is None:
                self.vocab = Vocabulary(freq_threshold=5)
                all_captions = [ann['caption'] for ann in self.coco.anns.values()]
                self.vocab.build_vocabulary(all_captions)
            else:
                self.vocab = vocab

    def __len__(self):
        return len(self.ids)

    def __getitem__(self, index):
        if self.is_test:
            img_id = self.ids[index]
            # Encontra o nome do arquivo correspondente ao ID
            img_filename = f"COCO_test2014_{str(img_id).zfill(12)}.jpg"
            img_path = os.path.join(self.root_dir, img_filename)
            image = Image.open(img_path).convert("RGB")
            if self.transform is not None:
                image = self.transform(image)
            return image, img_id
        else:
            img_id = self.ids[index]
            # Pega na primeira legenda disponível para a imagem
            caption = self.coco.loadAnns(self.coco.getAnnIds(imgIds=img_id))[0]['caption']
            img_path = self.coco.loadImgs(img_id)[0]['file_name']

            image = Image.open(os.path.join(self.root_dir, img_path)).convert("RGB")
            if self.transform is not None:
                image = self.transform(image)

            numericalized_caption = [self.vocab.stoi["<SOS>"]]
            numericalized_caption += self.vocab.numericalize(caption)
            numericalized_caption.append(self.vocab.stoi["<EOS>"])

            return image, torch.tensor(numericalized_caption)

class MyCollate:
    """Junta uma lista de amostras para formar um lote (batch), com preenchimento (padding)."""
    def __init__(self, pad_idx):
        self.pad_idx = pad_idx

    def __call__(self, batch):
        imgs = [item[0].unsqueeze(0) for item in batch]
        imgs = torch.cat(imgs, dim=0)

        targets = [item[1] for item in batch]
        lengths = [len(cap) for cap in targets]

        padded_targets = torch.full((len(targets), max(lengths)), self.pad_idx, dtype=torch.long)
        for i, cap in enumerate(targets):
            end = lengths[i]
            padded_targets[i, :end] = cap[:end]

        return imgs, padded_targets

def get_loader(root_dir, transform, batch_size, shuffle, num_workers, annotation_file=None, vocab=None):
    """Cria e retorna um DataLoader para o dataset COCO."""
    dataset = CocoDataset(
        root_dir=root_dir,
        annotation_file=annotation_file,
        transform=transform,
        vocab=vocab
    )

    # Obtém o índice de preenchimento do vocabulário do dataset.
    pad_idx = dataset.vocab.stoi["<PAD>"]

    loader = DataLoader(
        dataset=dataset,
        batch_size=batch_size,
        shuffle=shuffle,
        num_workers=num_workers,
        collate_fn=MyCollate(pad_idx=pad_idx)
    )

    return loader, dataset

# Define as transformações a serem aplicadas nas imagens.
# As médias e desvios padrão são do ImageNet
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
])


### Arquitetura do Modelo

#### Hiperparâmetros

In [None]:
# Hiperparâmetros de Treino
BATCH_SIZE = 128          # Tamanho do lote
NUM_WORKERS = 8           # Número de processos para carregar dados
LEARNING_RATE = 3e-4      # Taxa de aprendizagem inicial para o otimizador
NUM_EPOCHS = 50           # Número máximo de épocas de treino
PATIENCE_EARLY_STOP = 10  # Paciência para o Early Stopping
PATIENCE_LR_SCHEDULER = 3 # Paciência para o redutor de LR

# Hiperparâmetros do Modelo
EMBED_SIZE = 256        # Dimensão dos vetores de embedding de palavras
ATTENTION_DIM = 256     # Dimensão da camada de atenção
ENCODER_DIM = 2048      # Dimensão de saída do encoder
DECODER_DIM = 512       # Dimensão da camada oculta da LSTM do decoder

#### Encoder

In [None]:
class EncoderCNN(nn.Module):
    def __init__(self, encoded_image_size=7, train_backbone=False):
        super().__init__()
        from torchvision.models import resnet50, ResNet50_Weights
        backbone = resnet50(weights=ResNet50_Weights.IMAGENET1K_V1)
        self.cnn = nn.Sequential(*list(backbone.children())[:-2])   # até camada conv5_x
        self.pool = nn.AdaptiveAvgPool2d((encoded_image_size, encoded_image_size))
        self.encoder_dim = 2048
        if not train_backbone:
            for p in self.cnn.parameters():
                p.requires_grad = False

    def forward(self, x):                             # x: (B, 3, 224, 224)
        x = self.pool(self.cnn(x))                    # (B, 2048, 7, 7)
        x = x.permute(0, 2, 3, 1).flatten(1, 2)       # (B, 49, 2048)
        return x

#### Decoder

In [None]:
class Attention(nn.Module):
    def __init__(self, encoder_dim, decoder_dim, attention_dim):
        super().__init__()
        self.attn_feat = nn.Linear(encoder_dim, attention_dim)
        self.attn_hidden = nn.Linear(decoder_dim, attention_dim)
        self.attn_score = nn.Linear(attention_dim, 1)

    def forward(self, feats, h_t):               # feats: (B, 49, enc_dim)
        e = torch.tanh(self.attn_feat(feats) + self.attn_hidden(h_t).unsqueeze(1))
        e = self.attn_score(e).squeeze(2)        # (B, 49)
        alpha = torch.softmax(e, dim=1)          # (B, 49)
        context = (feats * alpha.unsqueeze(2)).sum(dim=1)  # (B, enc_dim)
        return context, alpha

# %%
class DecoderRNN(nn.Module):
    def __init__(self, embed_size, encoder_dim, decoder_dim,
                 vocab_size, attention_dim=256, dropout=0.5):
        super().__init__()
        self.attention = Attention(encoder_dim, decoder_dim, attention_dim)
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.lstm      = nn.LSTMCell(embed_size + encoder_dim, decoder_dim)
        self.fc        = nn.Linear(decoder_dim, vocab_size)
        self.dropout   = nn.Dropout(dropout)
        self.decoder_dim = decoder_dim

    def init_state(self, B, device):
        return (torch.zeros(B, self.decoder_dim, device=device),
                torch.zeros(B, self.decoder_dim, device=device))

    def forward(self, feats, caps):              # feats: (B, 49, enc_dim)
        B, seq_len = caps.size()
        embeddings = self.embedding(caps)        # (B, seq_len, embed)
        h, c = self.init_state(B, caps.device)
        outputs = torch.zeros(B, seq_len-1, self.fc.out_features, device=caps.device)

        for t in range(seq_len-1):
            context, _ = self.attention(feats, h)
            lstm_in = torch.cat([embeddings[:, t, :], context], dim=1)
            h, c = self.lstm(lstm_in, (h, c))
            outputs[:, t, :] = self.fc(self.dropout(h))
        return outputs

    def sample(
        self,
        features,
        max_len: int = 20,
        sos_idx: int = 1,
        eos_idx: int = 2,
        beam_size: int = 5
    ):
        """
        Gera uma legenda usando Beam Search para encontrar a frase com a maior
        probabilidade total.
        """
        device = features.device
        batch_size = features.size(0)
        k = beam_size

        # O feixe  armazena as k sequências mais prováveis.
        # Cada item é uma tupla: (sequência_de_IDs, log_prob_total, estado_h, estado_c)
        h, c = self.init_state(batch_size, device)

        initial_input = torch.full((batch_size,), sos_idx, device=device, dtype=torch.long)

        sequences = [([sos_idx], 0.0, h, c)]

        # Itera até o comprimento máximo da legenda
        for _ in range(max_len):
            all_candidates = []

            # Expansão do feixe: para cada sequência candidata, encontra as k melhores próximas palavras.
            for seq, score, h_prev, c_prev in sequences:

                if seq[-1] == eos_idx:
                    all_candidates.append((seq, score, h_prev, c_prev))
                    continue

                # A entrada para a LSTM é a última palavra da sequência atual.
                inputs = torch.tensor([seq[-1]], device=device, dtype=torch.long)
                embed = self.embedding(inputs)

                context, _ = self.attention(features, h_prev)

                lstm_in = torch.cat([embed, context], dim=1)
                h_new, c_new = self.lstm(lstm_in, (h_prev, c_prev))

                logits = self.fc(h_new)

                log_probs = F.log_softmax(logits, dim=1)

                # Obtém as k palavras mais prováveis e as suas log-probabilidades
                top_log_probs, top_indices = log_probs.topk(k, dim=1)

                # Cria k novos candidatos a partir da sequência atual
                for i in range(k):
                    next_word_idx = top_indices[0, i].item()
                    log_p = top_log_probs[0, i].item()

                    new_seq = seq + [next_word_idx]
                    new_score = score + log_p
                    all_candidates.append((new_seq, new_score, h_new, c_new))

            ordered_candidates = sorted(all_candidates, key=lambda x: x[1], reverse=True)

            # O novo feixe são as k melhores sequências da lista de candidatos.
            sequences = ordered_candidates[:k]

            if sequences[0][0][-1] == eos_idx:
                break

        # A melhor sequência final é a primeira da lista (a que tem a maior log-probabilidade)
        best_sequence = sequences[0][0]

        return best_sequence[1:]


#### Completo

In [None]:
class Seq2Seq(nn.Module):
    def __init__(self, embed_size, encoder_dim, decoder_dim, vocab_size, attention_dim=256):
        super().__init__()
        self.encoder = EncoderCNN()
        self.decoder = DecoderRNN(embed_size, encoder_dim, decoder_dim,
                                  vocab_size, attention_dim)

    def forward(self, images, captions):
        feats = self.encoder(images)             # (B, 49, 2048)
        return self.decoder(feats, captions)

### Treinamento

In [None]:
# Definindo GPU como dispositivo a ser utilizado, se disponível
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Usando dispositivo: {device}")

#### Carregamento dos Dados

In [None]:
train_loader, train_dataset = get_loader(
    root_dir=TRAIN_IMAGE_DIR,
    annotation_file=TRAIN_ANNOTATION_FILE,
    transform=transform,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=NUM_WORKERS
)

print(f"Dataset de treino carregado com {len(train_dataset)} imagens.")
print(f"Tamanho do vocabulário construído: {len(train_dataset.vocab)} palavras.")

val_loader, _ = get_loader(
    root_dir=VAL_IMAGE_DIR,
    annotation_file=VAL_ANNOTATION_FILE,
    transform=transform,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=NUM_WORKERS,
    vocab=train_dataset.vocab # Usa o mesmo vocabulário do treino
)


#### Funções auxiliares

In [None]:
class EarlyStopping:
    """Para o treino se a perda de validação não melhorar após um determinado número de épocas."""
    def __init__(self, patience=7, verbose=False, delta=0, path='checkpoint.pth'):
        """
        Args:
            patience (int): Quantas épocas esperar após a última melhoria da perda.
            verbose (bool): Se True, imprime uma mensagem para cada melhoria da perda.
            delta (float): Mudança mínima para se qualificar como uma melhoria.
            path (str): Caminho para guardar o checkpoint do melhor modelo.
        """
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.val_loss_min = np.inf
        self.delta = delta
        self.path = path

    def __call__(self, val_loss, model):

        score = -val_loss

        if self.best_score is None:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
        elif score < self.best_score + self.delta:
            self.counter += 1
            print(f'EarlyStopping counter: {self.counter} de {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
            self.counter = 0

    def save_checkpoint(self, val_loss, model):
        """Guarda o modelo quando a perda de validação diminui."""
        if self.verbose:
            print(f'Perda de validação diminuiu ({self.val_loss_min:.6f} --> {val_loss:.6f}). Guardando modelo ...')
        torch.save(model.state_dict(), self.path)
        self.val_loss_min = val_loss

In [None]:
def check_accuracy(outputs, targets):
    """Calcula a acurácia por palavra, ignorando o padding."""

    # Obtém a palavra prevista com a maior probabilidade
    _, predicted = outputs.max(2) # outputs.shape: (N, seq_len, vocab_size) -> predicted.shape: (N, seq_len)

    # Compara as previsões com os alvos
    correct = (predicted == targets)

    # Cria uma máscara para ignorar os tokens <PAD> (índice 0)
    pad_mask = (targets != 0)

    # Aplica a máscara e calcula a acurácia
    correct_masked = correct[pad_mask]
    num_correct = correct_masked.sum().item()
    total_words = pad_mask.sum().item()

    accuracy = (num_correct / total_words) * 100 if total_words > 0 else 0
    return accuracy


#### Instanciação do Modelo

In [None]:
model = Seq2Seq(
    embed_size=EMBED_SIZE,
    decoder_dim=DECODER_DIM,
    encoder_dim=ENCODER_DIM,
    vocab_size=len(train_dataset.vocab),
    attention_dim=ATTENTION_DIM,
).to(device)

criterion = nn.CrossEntropyLoss(
    ignore_index=train_dataset.vocab.stoi["<PAD>"],
    label_smoothing=0.1
)
optimizer = optim.AdamW(model.parameters(), lr=3e-4, weight_decay=1e-4)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=PATIENCE_LR_SCHEDULER)

early_stopping = EarlyStopping(
    patience=PATIENCE_EARLY_STOP,
    verbose=True,
    path=CHECKPOINT_PATH
)

#### Loop de Treino

In [None]:
# Listas para guardar o histórico de treino
train_loss_history = []
val_loss_history = []
train_acc_history = []
val_acc_history = []

In [None]:
# Ciclo Principal de Treino
for epoch in range(NUM_EPOCHS):
    print(f"\n--- Época {epoch+1}/{NUM_EPOCHS} ---")

    # Fase de Treino
    model.train()
    train_losses, train_accuracies = [], []
    for idx, (imgs, captions) in enumerate(train_loader):
        imgs, captions = imgs.to(device), captions.to(device)

        targets = captions[:, 1:]

        outputs = model(imgs, captions)

        loss = criterion(outputs.reshape(-1, outputs.shape[2]),
                 targets.reshape(-1))

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        train_losses.append(loss.item())
        accuracy = check_accuracy(outputs, targets)

        train_accuracies.append(accuracy)

        if (idx + 1) % 100 == 0:
            print(f"  [Treino] Lote {idx+1}/{len(train_loader)}, Perda: {loss.item():.4f}, Acurácia: {accuracy:.2f}%")

    avg_train_loss = sum(train_losses) / len(train_losses)
    avg_train_acc = sum(train_accuracies) / len(train_accuracies)
    train_loss_history.append(avg_train_loss)
    train_acc_history.append(avg_train_acc)

    # --- Fase de Validação ---
    model.eval()
    val_losses, val_accuracies = [], []
    with torch.no_grad():
        for imgs, captions in val_loader:
            imgs, captions = imgs.to(device), captions.to(device)
            targets = captions[:, 1:]

            outputs  = model(imgs, captions)

            loss = criterion(outputs.reshape(-1, outputs.shape[2]),
                         targets.reshape(-1))
            accuracy = check_accuracy(outputs, targets)
            val_losses.append(loss.item())
            val_accuracies.append(accuracy)

    avg_val_loss = sum(val_losses) / len(val_losses)
    avg_val_acc = sum(val_accuracies) / len(val_accuracies)
    val_loss_history.append(avg_val_loss)
    val_acc_history.append(avg_val_acc)

    print(f"\nResumo da Época {epoch+1}:")
    print(f"  Perda de Treino: {avg_train_loss:.4f} | Acurácia de Treino: {avg_train_acc:.2f}%")
    print(f"  Perda de Validação: {avg_val_loss:.4f} | Acurácia de Validação: {avg_val_acc:.2f}%\n")

    scheduler.step(avg_val_loss)
    early_stopping(avg_val_loss, model)
    if early_stopping.early_stop:
        print("Paragem antecipada ativada!")
        break

print("\nTreino Concluído")

#### Histórico de Treino

In [None]:
def plot_metrics(train_loss, val_loss, train_acc, val_acc):
    """Plota os gráficos de perda e acurácia do treino e validação."""

    # Cria a figura e os eixos para os dois gráficos
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))

    # Gráfico da Perda (Loss)
    ax1.plot(train_loss, label='Perda de Treino')
    ax1.plot(val_loss, label='Perda de Validação')
    ax1.set_title('Gráfico de Perda (Loss)', fontsize=16)
    ax1.set_xlabel('Épocas', fontsize=12)
    ax1.set_ylabel('Perda', fontsize=12)
    ax1.legend()
    ax1.grid(True)

    # Gráfico da Acurácia por Palavra
    ax2.plot(train_acc, label='Acurácia de Treino')
    ax2.plot(val_acc, label='Acurácia de Validação')
    ax2.set_title('Gráfico de Acurácia por Palavra', fontsize=16)
    ax2.set_xlabel('Épocas', fontsize=12)
    ax2.set_ylabel('Acurácia (%)', fontsize=12)
    ax2.legend()
    ax2.grid(True)

    # Ajusta o layout e exibe os gráficos
    plt.tight_layout()
    plt.savefig('history.png')


In [None]:
# Chama a função para plotar os resultados
plot_metrics(train_loss_history, val_loss_history, train_acc_history, val_acc_history)

## Testes e Exemplos

In [None]:
def generate_caption_for_image(model, device, image_path, vocab, transform):
    """Carrega uma imagem, gera e exibe uma legenda."""

    # Carrega o melhor modelo
    try:
        model.load_state_dict(torch.load('best_model_checkpoint_attention.pth', map_location=device))
        model.to(device)
        model.eval()
    except FileNotFoundError:
        print("Erro: Ficheiro 'best_model_checkpoint_attention.pth' não encontrado. Treine o modelo primeiro.")
        return

    # Prepara a imagem
    image = Image.open(image_path).convert("RGB")
    transformed_image = transform(image).unsqueeze(0).to(device)

    # Gera a legenda
    with torch.no_grad():
        features = model.encoder(transformed_image)
        sampled_ids = model.decoder.sample(features)

    # Converte os IDs de volta para texto
    caption_text = []
    for word_id in sampled_ids:
        word = vocab.itos[word_id]
        if word == "<EOS>":
            break
        if word not in ["<SOS>", "<PAD>", "<UNK>"]:
            caption_text.append(word)

    # Exibe a imagem e a legenda
    plt.imshow(image)
    plt.title("Legenda Gerada: " + " ".join(caption_text))
    plt.axis("off")
    plt.show()

# Cria uma pasta para guardar as imagens geradas
output_dir = "generated_captions"
os.makedirs(output_dir, exist_ok=True)


def denormalize(tensor):
    """Reverte a normalização de um tensor de imagem para exibição."""
    tensor = tensor.clone()
    mean = torch.tensor([0.485, 0.456, 0.406], device=tensor.device)
    std = torch.tensor([0.229, 0.224, 0.225], device=tensor.device)
    for t, m, s in zip(tensor, mean, std):
        t.mul_(s).add_(m)
    return tensor

def show_and_generate_captions(loader, model, vocab, device, num_examples, title, dataset_name):
    """Itera sobre um loader, gera legendas e guarda as imagens com as legendas."""
    print(f"\n--- {title} ---")

    try:
        # Carrega o modelo salvo
        model.load_state_dict(torch.load('best_model_checkpoint_attention.pth', weights_only=True, map_location=device))
        model.to(device)
        model.eval()
    except FileNotFoundError:
        print("Erro: Ficheiro 'best_model_checkpoint_attention.pth' não encontrado. Treine o modelo primeiro.")
        return

    count = 0
    with torch.no_grad():
        for item in loader:
            if count >= num_examples:
                break

            # O loader de teste retorna (img, id), o de validação retorna (img, caption)
            img_tensor, original_id = item[0], item[1]
            img_tensor = img_tensor.to(device)

            features = model.encoder(img_tensor)
            sampled_ids = model.decoder.sample(features)

            caption_text = []
            for word_id in sampled_ids:
                word = vocab.itos[word_id]
                if word == "<EOS>": break
                if word not in ["<SOS>", "<PAD>", "<UNK>"]: caption_text.append(word)

            final_caption = " ".join(caption_text)
            print(f"  Imagem {count+1}: {final_caption}")

            img_display = denormalize(img_tensor.cpu().squeeze(0))
            img_display = img_display.permute(1, 2, 0).numpy()
            img_display = np.clip(img_display, 0, 1)

            # Salva a imagem com a legenda
            plt.figure(figsize=(7, 7))
            plt.imshow(img_display)
            plt.title(f"Legenda: {final_caption}", fontsize=12, wrap=True)
            plt.axis("off")
            filename = f"{dataset_name}_image_{count+1}.png"
            plt.savefig(os.path.join(output_dir, filename))
            plt.close()

            count += 1
    print(f"\nImagens geradas foram guardadas na pasta '{output_dir}'")

# --- Preparação dos Loaders para Geração ---
val_loader_gen, _ = get_loader(
    root_dir=VAL_IMAGE_DIR,
    annotation_file=VAL_ANNOTATION_FILE,
    transform=transform,
    batch_size=1,
    shuffle=False,
    num_workers=NUM_WORKERS,
    vocab=train_dataset.vocab
)

TEST_IMAGE_DIR = os.path.join(IMAGE_DIR, 'test2014')
if os.path.exists(TEST_IMAGE_DIR):
    # CORREÇÃO 3: Removido o collate_fn para o loader de teste.
    test_dataset_gen = CocoDataset(root_dir=TEST_IMAGE_DIR, transform=transform, vocab=train_dataset.vocab)
    test_loader_gen = DataLoader(
        dataset=test_dataset_gen,
        batch_size=1,
        shuffle=False,
        num_workers=NUM_WORKERS
    )
else:
    test_loader_gen = None
    print(f"\nAviso: Diretório de teste não encontrado. A geração para o conjunto de teste será ignorada.")


# --- Execução da Geração ---
show_and_generate_captions(
    loader=val_loader_gen, model=model, vocab=train_dataset.vocab, device=device,
    num_examples=50, title="Gerando Legendas para as 50 Primeiras Imagens de VALIDAÇÃO",
    dataset_name="validation"
)

if test_loader_gen:
    show_and_generate_captions(
        loader=test_loader_gen, model=model, vocab=train_dataset.vocab, device=device,
        num_examples=100, title="Gerando Legendas para as 100 Primeiras Imagens de TESTE (Não Vistas pelo Modelo)",
        dataset_name="test"
    )