<a href="https://colab.research.google.com/github/andreac941/tutorials/blob/main/ProyectoFinal.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## 1. Preprocess the Data
First of all, we'll need to preprocess both the images and the text captions.

We use a pre-trained model such as ResNet for feature extraction, so the images must be preprocessed to match the input format that model expects.

For the captions preprocessing, we need to tokenize them, create a vocabulary and convert the captions to sequences of integers.

In [None]:
# Importing the libraries:
import torch # Import the PyTorch library
import torch.nn as nn # Import the neural network module from PyTorch
import torchvision.transforms as transforms # for data transformations.
from PIL import Image # For image visualizarion
from torch.nn.utils.rnn import pad_sequence # Used to pad a list of variable-length sequences to the same length (for RNN)
from torch.utils.data import DataLoader, Dataset # For data processing
from torchvision.models import resnet50 # For transfer learning
import spacy  # For tokenization
import os # To interact with operating system.
import pandas as pd # To manage data manipulation

class Vocabulary:
    """Bidirectional token <-> index mapping built from caption text.

    Indices 0-3 are reserved for the special tokens ``<PAD>``, ``<SOS>``,
    ``<EOS>`` and ``<UNK>``. Regular words receive consecutive indices after
    them, in the order in which they reach the frequency threshold.
    """

    def __init__(self, freq_threshold):
        """Create a vocabulary that initially holds only the special tokens.

        Args:
        - freq_threshold (int): Number of occurrences a word needs before it
          is added to the vocabulary.
        """
        # Special tokens with fixed indices
        self.itos = {0: "<PAD>", 1: "<SOS>", 2: "<EOS>", 3: "<UNK>"}
        # Reverse mapping: token to index
        self.stoi = {v: k for k, v in self.itos.items()}
        self.freq_threshold = freq_threshold

    def __len__(self):
        """Return the number of entries (special tokens included)."""
        return len(self.itos)

    def tokenizer_eng(self, text):
        """Split ``text`` into lower-cased tokens.

        Relies on the module-level ``spacy_eng`` pipeline, which must be
        loaded before this method is called.
        """
        return [tok.text.lower() for tok in spacy_eng.tokenizer(text)]

    def build_vocabulary(self, sentence_list):
        """Add every word that occurs often enough in ``sentence_list``.

        Word counts are local to one call. New words are appended after the
        entries already present, so calling this method again extends the
        vocabulary instead of overwriting earlier indices.

        Args:
        - sentence_list (Iterable[str]): Sentences to scan.
        """
        frequencies = {}
        # Continue after the existing entries rather than at a fixed index,
        # so a repeated call cannot clobber previously assigned indices.
        next_idx = len(self.itos)

        for sentence in sentence_list:
            for word in self.tokenizer_eng(sentence):
                count = frequencies.get(word, 0) + 1
                frequencies[word] = count

                # ">=" (instead of "==") lets a threshold of 0 or less mean
                # "keep every word"; the membership test guarantees each
                # word is registered exactly once.
                if count >= self.freq_threshold and word not in self.stoi:
                    self.stoi[word] = next_idx
                    self.itos[next_idx] = word
                    next_idx += 1

    def numericalize(self, text):
        """Convert ``text`` to a list of indices.

        Tokens that are not in the vocabulary map to the ``<UNK>`` index.
        """
        unk_idx = self.stoi["<UNK>"]
        return [self.stoi.get(token, unk_idx) for token in self.tokenizer_eng(text)]


class FlickrDataset(Dataset):
    """Image/caption pairs backed by a captions CSV and an image folder.

    The CSV must provide an ``image`` column (file name relative to
    ``root_dir``) and a ``caption`` column. A ``Vocabulary`` is built from
    all captions when the dataset is created.
    """

    def __init__(self, root_dir, captions_file, transform=None, freq_threshold=5):
        """Read the captions file and build the vocabulary.

        Args:
        - root_dir (str): Folder that contains the image files.
        - captions_file (str): Path of the CSV with the captions.
        - transform (callable, optional): Applied to every loaded image.
        - freq_threshold (int): Frequency threshold passed to ``Vocabulary``.
        """
        self.root_dir = root_dir
        self.transform = transform

        frame = pd.read_csv(captions_file)
        # Missing captions become empty strings so tokenization never sees NaN
        frame['caption'] = frame['caption'].fillna('').astype(str)
        self.df = frame

        self.imgs = frame["image"]
        self.captions = frame["caption"]

        vocabulary = Vocabulary(freq_threshold)
        vocabulary.build_vocabulary(self.captions.tolist())
        self.vocab = vocabulary

    def __len__(self):
        """Return the number of rows (one per caption) in the CSV."""
        return len(self.df)

    def __getitem__(self, index):
        """Return ``(image, caption_tensor)`` for row ``index``.

        The caption tensor holds the token indices wrapped in ``<SOS>`` and
        ``<EOS>``.
        """
        caption_text = self.captions[index]
        image_path = os.path.join(self.root_dir, self.imgs[index])

        img = Image.open(image_path).convert("RGB")
        if self.transform is not None:
            img = self.transform(img)

        stoi = self.vocab.stoi
        token_ids = [
            stoi["<SOS>"],
            *self.vocab.numericalize(caption_text),
            stoi["<EOS>"],
        ]
        return img, torch.tensor(token_ids)


class MyCollate:
    """Collate callable that batches images and pads captions to equal length."""

    def __init__(self, pad_idx):
        """Remember the index used to pad short captions."""
        self.pad_idx = pad_idx

    def __call__(self, batch):
        """Turn a list of ``(image, caption)`` samples into one batch.

        Returns:
        - tuple: ``(images, captions)`` where ``images`` has a new leading
          batch dimension and ``captions`` is padded with ``pad_idx`` and
          laid out as ``(longest_caption, batch)``.
        """
        # Stacking adds the batch dimension in front of each image tensor
        image_batch = torch.stack([sample[0] for sample in batch], dim=0)

        caption_batch = pad_sequence(
            [sample[1] for sample in batch],
            batch_first=False,
            padding_value=self.pad_idx,
        )
        return image_batch, caption_batch

# Transformations applied to every image: resize to 224x224, then convert
# the PIL image to a tensor.
# NOTE(review): torchvision documents its ImageNet-pretrained ResNet weights
# as expecting inputs normalized with the ImageNet mean/std; no Normalize
# step is applied here — confirm whether that is intentional. If one is
# added, the inference-time transform defined later in this file must be
# changed in the same way.
transform = transforms.Compose(
    [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
    ]
)

# DataLoader function
def get_loader(
    root_folder,
    annotation_file,
    transform,
    batch_size=32,
    num_workers=8,
    shuffle=True,
    pin_memory=True,
    freq_threshold=5,
):
    """Build a ``DataLoader`` over a ``FlickrDataset`` and return it with its vocabulary.

    Args:
    - root_folder (str): Folder that contains the image files.
    - annotation_file (str): Path of the captions CSV.
    - transform (callable): Transformation applied to every image.
    - batch_size (int): Number of samples per batch.
    - num_workers (int): Worker processes used by the ``DataLoader``.
    - shuffle (bool): Whether to reshuffle the samples every epoch.
    - pin_memory (bool): Forwarded to the ``DataLoader``.
    - freq_threshold (int): Occurrences a word needs to enter the
      vocabulary. Defaults to 5, the dataset's own default, so existing
      callers are unaffected.

    Returns:
    - tuple: ``(loader, vocab)`` — the data loader and the dataset's vocabulary.
    """
    dataset = FlickrDataset(
        root_folder,
        annotation_file,
        transform=transform,
        freq_threshold=freq_threshold,
    )

    # Captions in a batch are padded with the vocabulary's <PAD> index
    pad_idx = dataset.vocab.stoi["<PAD>"]

    loader = DataLoader(
        dataset=dataset,
        batch_size=batch_size,
        num_workers=num_workers,
        shuffle=shuffle,
        pin_memory=pin_memory,
        collate_fn=MyCollate(pad_idx=pad_idx),  # Use the custom collate function
    )

    return loader, dataset.vocab

In [None]:
# Load the English spaCy pipeline. It must exist as the module-level name
# `spacy_eng` before any vocabulary is built, because
# Vocabulary.tokenizer_eng looks it up by that name.
spacy_eng = spacy.load("en_core_web_sm")

# Image folder and captions CSV, relative to the working directory
root_folder='flickr30k_images/flickr30k_images'
annotation_file='flickr30k_images/results.csv'

# Build the data loader; this reads the CSV and builds the vocabulary
data_loader, vocab = get_loader(root_folder, annotation_file, transform)

## 2. Build the Model
We will create a CNN-RNN model. The CNN part will be a pre-trained ResNet model (without the classification head) for feature extraction, and the RNN part will be an LSTM network for generating captions.

In this code:

EncoderCNN uses a pre-trained ResNet50 model for image feature extraction.

DecoderRNN is an LSTM network for generating captions.

CNNtoRNN combines both the encoder and decoder.

In [None]:
class EncoderCNN(nn.Module):
    """Image encoder: a pre-trained ResNet-50 whose head projects to ``embed_size``."""

    def __init__(self, embed_size, train_CNN=False):
        """
        EncoderCNN initializes a CNN-based image encoder.

        Args:
        - embed_size (int): Size of the output embedding.
        - train_CNN (bool): Flag indicating whether to train the pre-trained
          CNN layers. The new projection layer is always trainable.
        """
        super().__init__()
        self.train_CNN = train_CNN

        # Load pre-trained ResNet-50 model from torchvision. The attribute
        # keeps its historical name `inception` so that parameter names in
        # existing checkpoints stay valid.
        self.inception = resnet50(pretrained=True)

        # Replace the final fully connected layer to match the desired embed_size
        self.inception.fc = nn.Linear(self.inception.fc.in_features, embed_size)

        # Additional layers for non-linearity and dropout
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.5)

        # Decide once, before any forward pass, which weights receive
        # gradients. Doing this inside forward() repeated the work on every
        # batch and also froze the freshly initialized projection layer.
        self._set_trainable_parameters()

    def _set_trainable_parameters(self):
        """Freeze the pre-trained backbone unless ``train_CNN`` is set; keep ``fc`` trainable."""
        for name, param in self.inception.named_parameters():
            is_projection = name.startswith("fc.")
            param.requires_grad = is_projection or self.train_CNN

    def forward(self, images):
        """
        Forward pass of the EncoderCNN.

        Args:
        - images (torch.Tensor): Input images.

        Returns:
        - torch.Tensor: Output embedding after ReLU and dropout.
        """
        features = self.inception(images)
        return self.dropout(self.relu(features))

class DecoderRNN(nn.Module):
    """LSTM caption decoder fed with the image embedding followed by word embeddings."""

    def __init__(self, embed_size, hidden_size, vocab_size, num_layers):
        """
        Build the embedding, LSTM, output projection and dropout layers.

        Args:
        - embed_size (int): Size of the word/image embedding fed to the LSTM.
        - hidden_size (int): Size of the LSTM hidden state.
        - vocab_size (int): Number of entries in the vocabulary.
        - num_layers (int): Number of stacked LSTM layers.
        """
        super().__init__()
        self.embed = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embed_size)
        self.lstm = nn.LSTM(input_size=embed_size, hidden_size=hidden_size, num_layers=num_layers)
        self.linear = nn.Linear(in_features=hidden_size, out_features=vocab_size)
        self.dropout = nn.Dropout(p=0.5)

    def forward(self, features, captions):
        """
        Score every vocabulary entry at every time step.

        Args:
        - features (torch.Tensor): Image embeddings; a time dimension is
          added in front so they act as the first LSTM input.
        - captions (torch.Tensor): Caption token indices, time step first.

        Returns:
        - torch.Tensor: Vocabulary scores, one time step longer than
          ``captions`` because of the prepended image step.
        """
        word_vectors = self.dropout(self.embed(captions))

        # The image embedding becomes step 0 of the input sequence
        image_step = features.unsqueeze(0)
        lstm_input = torch.cat((image_step, word_vectors), dim=0)

        hidden_seq, _ = self.lstm(lstm_input)
        return self.linear(hidden_seq)

class CNNtoRNN(nn.Module):
    """Captioning model: an ``EncoderCNN`` feeding a ``DecoderRNN``."""

    def __init__(self, embed_size, hidden_size, vocab_size, num_layers):
        """
        Create the encoder and the decoder.

        Args:
        - embed_size (int): Size of the embedding shared by encoder and decoder.
        - hidden_size (int): Size of the LSTM hidden state.
        - vocab_size (int): Number of entries in the vocabulary.
        - num_layers (int): Number of stacked LSTM layers.
        """
        super().__init__()
        self.encoderCNN = EncoderCNN(embed_size)
        self.decoderRNN = DecoderRNN(embed_size, hidden_size, vocab_size, num_layers)

    def forward(self, images, captions):
        """
        Encode the images and score the captions.

        Args:
        - images (torch.Tensor): Input images.
        - captions (torch.Tensor): Input captions.

        Returns:
        - torch.Tensor: Output scores from the decoder.
        """
        return self.decoderRNN(self.encoderCNN(images), captions)

    def caption_image(self, image, vocabulary, max_length=50):
        """
        Greedily decode a caption for a single image.

        Args:
        - image (torch.Tensor): Input image with a leading batch dimension
          of one (the predicted index is read with ``.item()``).
        - vocabulary (Vocabulary): Vocabulary object used to map indices to words.
        - max_length (int): Maximum number of tokens to generate.

        Returns:
        - List[str]: Generated tokens, including ``<EOS>`` when it was predicted.
        """
        decoder = self.decoderRNN
        token_ids = []

        # Inference only: no gradients are needed
        with torch.no_grad():
            step_input = self.encoderCNN(image).unsqueeze(0)
            lstm_state = None

            for _ in range(max_length):
                hidden, lstm_state = decoder.lstm(step_input, lstm_state)
                scores = decoder.linear(hidden.squeeze(0))
                best = scores.argmax(1)

                token_id = best.item()
                token_ids.append(token_id)

                # The predicted word is the input of the next step
                step_input = decoder.embed(best).unsqueeze(0)

                if vocabulary.itos[token_id] == "<EOS>":
                    break

        return [vocabulary.itos[token_id] for token_id in token_ids]

## 3. Training
Now, let's set up the training loop. In this training function, we calculate the loss for each batch and update the model's weights:

In [None]:
def train(model, data_loader, optimizer, criterion, vocab_size, device):
    """
    Run one pass over ``data_loader`` and update the model after every batch.

    Args:
    - model (nn.Module): Model to train; called as ``model(images, captions[:-1])``.
    - data_loader (DataLoader): Yields ``(images, captions)`` batches.
    - optimizer (torch.optim.Optimizer): Optimizer that updates the model.
    - criterion (nn.Module): Loss applied to the flattened scores and captions.
    - vocab_size (int): Size of the vocabulary (last dimension of the scores).
    - device (torch.device): Device the batches are moved to.

    Returns:
    - float: Sum of the batch losses divided by the number of batches.
    """
    model.train()
    batch_losses = []

    for images, captions in data_loader:
        images = images.to(device)
        captions = captions.to(device)

        optimizer.zero_grad()

        # The last caption token is withheld from the model input
        scores = model(images, captions[:-1])
        batch_loss = criterion(
            scores.reshape(-1, vocab_size),
            captions.reshape(-1),
        )

        batch_loss.backward()
        optimizer.step()

        batch_losses.append(batch_loss.item())

    return sum(batch_losses) / len(data_loader)

## 4. Evaluation with BLEU
For evaluating an image captioning model, you can use metrics like BLEU (Bilingual Evaluation Understudy Score). However, please note that BLEU is not perfect and might not always align with human judgment. It's typically used to get a quantitative estimate of the model's performance.

In [None]:
from nltk.translate.bleu_score import corpus_bleu

def evaluate(model, data_loader, device, vocab=None):
    """
    Compute the corpus-level BLEU-4 score of the model's greedy captions.

    Args:
    - model (nn.Module): Model exposing ``caption_image(image, vocab)``.
    - data_loader (DataLoader): Yields ``(images, captions)`` batches whose
      captions are laid out time step first, as produced by ``MyCollate``
      (``pad_sequence`` with ``batch_first=False``).
    - device (torch.device): Device (CPU or GPU) on which to perform evaluation.
    - vocab (Vocabulary, optional): Vocabulary for converting indices to
      words. When omitted, the module-level ``vocab`` is used, which keeps
      three-argument calls working.

    Returns:
    - float: BLEU-4 score calculated on the evaluation dataset.

    Raises:
    - ValueError: If no vocabulary is passed and no module-level ``vocab`` exists.
    """
    if vocab is None:
        vocab = globals().get("vocab")
        if vocab is None:
            raise ValueError("evaluate() needs a vocabulary: pass `vocab` explicitly")

    # Markers that carry no content and would distort the n-gram counts
    markers = {"<PAD>", "<SOS>", "<EOS>"}

    model.eval()
    references = []
    hypotheses = []

    with torch.no_grad():
        for images, captions in data_loader:
            images = images.to(device)

            # Captions are (time, batch); transpose to get one row per sample
            caption_rows = captions.t().tolist()

            for image, token_ids in zip(images, caption_rows):
                # caption_image decodes one image at a time, so restore the
                # batch dimension of size one that it expects
                predicted_words = model.caption_image(image.unsqueeze(0), vocab)
                hypotheses.append([w for w in predicted_words if w not in markers])

                target_words = [vocab.itos[idx] for idx in token_ids]
                # corpus_bleu expects, per hypothesis, a LIST of reference
                # sentences; each sample here has exactly one reference
                references.append([[w for w in target_words if w not in markers]])

    return corpus_bleu(references, hypotheses, weights=(0.25, 0.25, 0.25, 0.25))

## 5. Train the Model

Finally, we initialize the model, the optimizer, and the loss function, and start the training process.

In [None]:
# Hyperparameters definition
embed_size = 256  # Size of the word embeddings
hidden_size = 512  # Size of the hidden state in the LSTM
num_layers = 1  # Number of layers in the LSTM
learning_rate = 3e-4  # Learning rate for the optimizer
num_epochs = 10  # Number of passes over the training data

# Initialize the data loader and get the vocabulary
data_loader, vocab = get_loader(root_folder, annotation_file, transform)

# Derive the vocabulary size only AFTER the loader is built, so it always
# matches the vocabulary used for training rather than one left over from
# an earlier cell.
vocab_size = len(vocab)

# Initialize model, optimizer, and loss function
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # Choose device (GPU if available, else CPU)
model = CNNtoRNN(embed_size, hidden_size, vocab_size, num_layers).to(device)  # Create the CNN-to-RNN model
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)  # Adam optimizer
criterion = nn.CrossEntropyLoss(ignore_index=vocab.stoi["<PAD>"])  # Cross-entropy loss with padding ignored

# Training and evaluation loop
for epoch in range(num_epochs):
    # One pass over the data; train() switches the model to training mode
    # and returns the average batch loss.
    avg_loss = train(model, data_loader, optimizer, criterion, vocab_size, device)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}")

    # Evaluation: evaluate() uses the module-level `vocab` built above
    bleu4 = evaluate(model, data_loader, device)
    print(f"Epoch [{epoch+1}/{num_epochs}], BLEU-4: {bleu4:.4f}")

Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to C:\Users\AHERNANDEZ/.cache\torch\hub\checkpoints\resnet50-0676ba61.pth
100%|█████████████████████████████████████████████████████████████████████████████| 97.8M/97.8M [00:08<00:00, 12.8MB/s]


## 6. Generate a Prediction
You need a function to transform a new input image into the same format as the training images, using the caption_image method from the CNNtoRNN class. Here's a function that takes an image path and generates a caption:

In this code, generate_caption takes the path of the image you want to caption, uses the trained model directly, and processes the image using the same transform as during training. The model variable is assumed to be the same one you trained earlier in the script.

In [None]:
def load_image(image_path, transform=None):
    """
    Open an image file as RGB and optionally turn it into a one-image batch.

    Args:
    - image_path (str): File path of the image.
    - transform (callable, optional): Transformation to be applied to the image.

    Returns:
    - The transformed image with an extra leading batch dimension when
      ``transform`` is given; otherwise the untransformed RGB PIL image.
    """
    rgb_image = Image.open(image_path).convert("RGB")

    if transform is None:
        return rgb_image

    # unsqueeze(0) adds the batch dimension the model expects
    return transform(rgb_image).unsqueeze(0)


## Make sure to replace 'path_to_your_image.jpg' with the path to your actual image.

In [None]:
# Function to generate captions for an input image
def generate_caption(image_path, model, vocabulary, transform, device):
    """
    Caption the image stored at ``image_path`` with a trained model.

    Args:
    - image_path (str): File path of the input image.
    - model (nn.Module): Trained model exposing ``caption_image``.
    - vocabulary (Vocabulary): Vocabulary object for converting indices to words.
    - transform (callable): Transformation to be applied to the input image.
    - device (torch.device): Device (CPU or GPU) on which to perform inference.

    Returns:
    - str: The generated words joined by single spaces, with the special
      tokens removed.
    """
    special_tokens = {"<SOS>", "<EOS>", "<PAD>", "<UNK>"}

    # Evaluation mode is set before the image is loaded
    model.eval()

    image_batch = load_image(image_path, transform).to(device)

    with torch.no_grad():
        words = model.caption_image(image_batch, vocabulary, max_length=50)

    return ' '.join(word for word in words if word not in special_tokens)

# This cell expects `model` and `vocab` to exist already, i.e. the training
# cell above must have been run first.

# Image transformations for inference: the same Resize + ToTensor steps that
# are used for the training images. Keep the two definitions in sync.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

# Use the GPU when one is available, otherwise the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Placeholder path: replace it with the path of a real image before running
image_path = 'path_to_your_image.jpg'

# Generate and print the caption for the input image
caption = generate_caption(image_path, model, vocab, transform, device)
print("Generated Caption:", caption)