In [None]:
import os
import numpy as np
import random
import json
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from nltk.tokenize import word_tokenize
from PIL import Image
from tqdm import tqdm
from collections import Counter
import matplotlib.pyplot as plt
import torch
import torchvision.transforms as transforms
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torch.utils.data.sampler import SubsetRandomSampler
import torch.optim as optim
from torch.optim import Adam
import torch.nn as nn
import pandas as pd

from  customDatasetFromCSV import CustomDatasetFromCSV

CREATE DATASET AND GET CUDA DEVICE

In [None]:
# Input locations: the image folder plus the CSV files holding captions and vocabulary.
data_dir = "./imagesTrainVal/train2017/"
captions_csv = './filesCSV/captions.csv'
vocab_csv = './filesCSV/vocab.csv'

# Every image is resized to 224x224 and converted to a tensor; no further
# normalisation is applied here.
transform = transforms.Compose([transforms.Resize((224, 224)), transforms.ToTensor()])

# NOTE(review): `percentage=100` presumably selects the full dataset - confirm
# against CustomDatasetFromCSV.
dataset = CustomDatasetFromCSV(
    data_dir,
    captions_csv,
    vocab_csv,
    transform=transform,
    percentage=100,
)

# Cell output: the vocabulary size.
len(dataset.vocab)

In [None]:


# Report CUDA availability once, then choose the device every later cell uses.
cuda_available = torch.cuda.is_available()
print("CUDA disponible:", cuda_available)

if cuda_available:
    print("Nombre de la GPU:", torch.cuda.get_device_name(0))
    print("Capacidad de la GPU:", torch.cuda.get_device_capability(0))
    device = torch.device("cuda:0")
    print("Using GPU:", torch.cuda.get_device_name(device))
else:
    device = torch.device("cpu")
    print("No GPU available. Using CPU.")

MODEL ARCHITECTURE

In [None]:
class EncoderCNN(nn.Module):
    """Image encoder: a frozen, pretrained ResNet-50 followed by a trainable linear projection.

    The ResNet's final classification layer is dropped; its pooled output is
    flattened and projected to ``embed_size`` by ``self.embed``, which is the only
    trainable part of the encoder.

    Args:
        embed_size: Size of the feature vector returned for each image.
        device: Device the sub-modules are created on.
    """

    def __init__(self, embed_size, device):
        super(EncoderCNN, self).__init__()
        from torchvision.models.resnet import resnet50
        # NOTE(review): recent torchvision releases deprecate `pretrained=` in favour
        # of `weights=`; kept as-is for compatibility with the installed version.
        resnet = resnet50(pretrained=True)
        self.device = device
        # Freeze the backbone weights.
        for param in resnet.parameters():
            param.requires_grad_(False)

        # Keep every child module except the last one (the `fc` classifier).
        modules = list(resnet.children())[:-1]
        self.resnet = nn.Sequential(*modules).to(self.device)
        # A frozen backbone must also stay in eval mode, otherwise its BatchNorm
        # layers keep using batch statistics and updating their running averages.
        self.resnet.eval()
        self.embed = nn.Linear(resnet.fc.in_features, embed_size).to(self.device)

    def train(self, mode=True):
        """Switch train/eval mode for the projection while pinning the backbone to eval mode."""
        super(EncoderCNN, self).train(mode)
        self.resnet.eval()
        return self

    def forward(self, images):
        """Return one ``embed_size`` feature vector per image in the batch."""
        features = self.resnet(images)
        # Collapse everything after the batch dimension into a single axis.
        features = features.view(features.size(0), -1)
        return self.embed(features)

# --------- Decoder ----------

class DecoderRNN(nn.Module):
    """GRU caption decoder that sees the image feature vector at every time step.

    At each step the embedding of the previous token is concatenated with the
    image features and fed to the GRU; a linear layer maps the GRU output to
    vocabulary scores. Decoding always runs for exactly ``caption_length`` steps.

    Args:
        embed_size: Size of the token embeddings and of the incoming image features.
        hidden_size: GRU hidden state size.
        vocab_size: Number of tokens in the vocabulary.
        caption_length: Number of decoding steps (fixed output length).
        num_layers: Number of stacked GRU layers.
        device: Device the layers are created on.
    """

    # Token ids hard-wired by this notebook: id 0 is used both as the start token
    # and as padding, id 2 marks the end of a caption.
    START_TOKEN_ID = 0
    PAD_TOKEN_ID = 0
    END_TOKEN_ID = 2

    def __init__(self, embed_size, hidden_size, vocab_size, caption_length, num_layers, device):
        super(DecoderRNN, self).__init__()
        self.device = device
        self.num_layers = num_layers
        self.caption_length = caption_length
        self.hidden_dim = hidden_size
        self.embed_size = embed_size
        # Embedding layer to convert token IDs to embeddings
        self.embed = nn.Embedding(vocab_size, embed_size).to(self.device)
        # GRU input is [image features ; token embedding], hence embed_size twice
        self.gru = nn.GRU(embed_size + embed_size, hidden_size, num_layers, batch_first=True).to(self.device)
        # Linear layer to produce the vocabulary distribution
        self.linear = nn.Linear(hidden_size, vocab_size).to(self.device)

    def init_hidden(self, batch_size):
        """Return a zero hidden state of shape (num_layers, batch_size, hidden_size).

        The state is created with the device and dtype of the module's current
        parameters, so it stays valid after ``.to(...)``, ``.double()`` and similar.
        """
        reference = self.linear.weight
        return torch.zeros(
            self.num_layers, batch_size, self.hidden_dim,
            device=reference.device, dtype=reference.dtype,
        )

    def _start_tokens(self, batch_size, device):
        """Return a (batch_size, 1) tensor filled with the start token id."""
        return torch.full((batch_size, 1), self.START_TOKEN_ID, dtype=torch.long, device=device)

    def _step(self, features, input_token, hidden):
        """Run one decoding step; returns (scores of shape (batch, 1, vocab), new hidden state)."""
        embeddings = self.embed(input_token)
        # Condition the step on the image by concatenating along the feature axis.
        inputs = torch.cat((features.unsqueeze(1), embeddings), dim=2)
        gru_out, hidden = self.gru(inputs, hidden)
        return self.linear(gru_out), hidden

    def forward(self, features, captions):
        """Teacher-forced decoding.

        Step 0 is fed the start token; step ``t > 0`` is fed ``captions[:, t - 1]``.

        Args:
            features: Image features of shape (batch, embed_size).
            captions: Ground-truth token ids of shape (batch, seq).

        Returns:
            Vocabulary scores of shape (batch, caption_length, vocab_size).
        """
        batch_size = features.size(0)
        captions = captions.to(features.device)
        input_token = self._start_tokens(batch_size, features.device)
        hidden = self.init_hidden(batch_size)
        outputs = []

        for t in range(self.caption_length):
            output, hidden = self._step(features, input_token, hidden)
            outputs.append(output)
            # The token fed after the last step would never be used.
            if t + 1 < self.caption_length:
                input_token = captions[:, t].unsqueeze(1)

        # Concatenate the per-step scores along the sequence dimension
        return torch.cat(outputs, dim=1)

    def generate(self, features):
        """Greedy decoding from image features only.

        Each step feeds back the arg-max token of the previous step. A predicted
        end token is replaced by the padding id before being fed back. Decoding
        does not stop early, so callers that want the caption only up to the end
        token must truncate it themselves.

        Args:
            features: Image features of shape (batch, embed_size).

        Returns:
            Vocabulary scores (before softmax) of shape (batch, caption_length, vocab_size).
        """
        batch_size = features.size(0)
        input_token = self._start_tokens(batch_size, features.device)
        hidden = self.init_hidden(batch_size)
        outputs = []

        for _ in range(self.caption_length):  # Fixed caption length
            output, hidden = self._step(features, input_token, hidden)
            outputs.append(output)
            # Greedy choice of the next input token, shape (batch, 1)
            input_token = output.argmax(dim=2)
            input_token = input_token.masked_fill(input_token == self.END_TOKEN_ID, self.PAD_TOKEN_ID)

        return torch.cat(outputs, dim=1)


class Model(nn.Module):
    """Captioning network: an `EncoderCNN` whose features drive a `DecoderRNN`."""

    def __init__(self, embed_size, hidden_size, vocab_size, caption_length, num_layers, device):
        super().__init__()
        encoder = EncoderCNN(embed_size, device)
        decoder = DecoderRNN(embed_size, hidden_size, vocab_size, caption_length, num_layers, device)
        # `.to` returns the module itself, so these register the moved sub-modules.
        self.encoder = encoder.to(device)
        self.decoder = decoder.to(device)

    def forward(self, images, captions):
        """Encode `images` and decode with teacher forcing on `captions`."""
        return self.decoder(self.encoder(images), captions)

    def generate(self, image):
        """Encode `image` and let the decoder generate scores without ground-truth captions."""
        return self.decoder.generate(self.encoder(image))


HYPERPARAMETERS

In [None]:
# Values derived from the dataset
vocab = dataset.vocab
vocab_size = len(vocab)
max_caption_length = dataset.maxCaptionLength

# Tunable hyperparameters
embed_size, hidden_size, num_layers = 256, 512, 1
learning_rate = 0.001
batch_size = 64

# Model, loss and optimizer
model = Model(embed_size, hidden_size, vocab_size, max_caption_length, num_layers, device)
model.to(device)
criterion = nn.CrossEntropyLoss(ignore_index=0)  # targets with id 0 do not contribute to the loss
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# The dataset is split by index range: [train | val | rest = test].
n_train = len(dataset.train_data)
n_val = len(dataset.val_data)
train_loader = DataLoader(
    dataset,
    batch_size=batch_size,
    sampler=SubsetRandomSampler(range(n_train)),
)
val_loader = DataLoader(
    dataset,
    batch_size=batch_size,
    sampler=SubsetRandomSampler(range(n_train, n_train + n_val)),
)
test_loader = DataLoader(
    dataset,
    batch_size=batch_size * 6,
    sampler=SubsetRandomSampler(range(n_train + n_val, len(dataset))),
)

# Cell output: number of training batches.
len(train_loader)


NUMBER OF TRAINABLE PARAMETERS

In [None]:
def count_parameters(model):
    """Return how many scalar values belong to parameters of `model` that require gradients."""
    trainable_total = 0
    for param in model.parameters():
        if param.requires_grad:
            trainable_total += param.numel()
    return trainable_total
model.to(device)
total_params = count_parameters(model)
# count_parameters only counts parameters with requires_grad=True, so the label says so.
print(f'Total number of trainable parameters: {total_params}')

TRAINING LOOP

In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, SubsetRandomSampler
from tqdm import tqdm
import numpy as np
import matplotlib.pyplot as plt

def _find_best_checkpoint(save_dir):
    """Return the path of the file in `save_dir` whose name ends with the lowest loss, or None."""
    best_path = None
    lowest_loss = float('inf')
    for file_name in os.listdir(save_dir):
        # File names end in "..._<loss>.pth"; anything that does not parse is skipped.
        loss_str = file_name.split('_')[-1].split('.pt')[0]
        try:
            file_loss = float(loss_str)
        except ValueError:
            continue
        if file_loss < lowest_loss:
            lowest_loss = file_loss
            best_path = os.path.join(save_dir, file_name)
    return best_path


def _epoch_from_checkpoint_name(checkpoint_path):
    """Return the number that follows '_epoch_' in the checkpoint file name, or 0 if absent."""
    file_name = os.path.basename(checkpoint_path)
    try:
        return int(file_name.split('_epoch_')[1].split('_')[0])
    except (IndexError, ValueError):
        return 0


def _ids_to_caption(token_ids, id_to_token, unknown_word):
    """Join the words for `token_ids`, skipping id 0 and using `unknown_word` for unmapped ids."""
    return " ".join(id_to_token.get(int(token_id), unknown_word) for token_id in token_ids if token_id != 0)


def _show_training_preview(model, images, captions, outputs, id_to_token, progress_line):
    """Plot the first image of the batch and print its teacher-forced and free-running captions."""
    real_caption_str = _ids_to_caption(captions[0].tolist(), id_to_token, '<UNK>')
    with torch.no_grad():
        image = np.transpose(images[0].cpu().numpy(), (1, 2, 0))  # CHW -> HWC for imshow
        plt.imshow(image)
        plt.axis('off')
        plt.show()

        print("################### FORWARD METHOD ################### ")
        print(progress_line)
        print("SHAPE OF OUTPUTS:" + str(outputs.shape))
        forward_ids = outputs.argmax(dim=2)[0].tolist()
        print(f"Generated Output: {_ids_to_caption(forward_ids, id_to_token, 'UNK')}")
        print(f"Real Caption: {real_caption_str}")

        print("################### GENERATE METHOD ################### ")
        # Generate in eval mode so the preview does not touch train-mode state,
        # then restore training mode.
        model.eval()
        generated_ids = model.generate(images).argmax(dim=2)[0].tolist()
        model.train()
        print(f"Generated Output (Generated): {_ids_to_caption(generated_ids, id_to_token, 'UNK')}")
        print(f"Real Caption: {real_caption_str}")


def train_model(search_for_existing_model, save_dir, embed_size, hidden_size, num_layers, vocab_size, learning_rate, max_caption_length, dataset, device, num_epochs=5, patience=3, batch_size=64, preview_every=500):
    """Train a fresh `Model` on `dataset` with early stopping, saving a checkpoint after every epoch.

    The function is self-contained: it builds its own model, optimizer and data
    loaders rather than relying on notebook globals.

    Args:
        search_for_existing_model: If True, resume from the checkpoint in `save_dir`
            whose file name carries the lowest loss.
        save_dir: Directory checkpoints are written to (created if missing).
        embed_size, hidden_size, num_layers, vocab_size, max_caption_length:
            Forwarded to `Model`.
        learning_rate: Adam learning rate.
        dataset: Dataset exposing `train_data`, `val_data` and `id_to_token`; indices
            `[0, len(train_data))` are used for training and the following
            `len(val_data)` indices for validation.
        device: Device to train on.
        num_epochs: Last epoch number to run (resumed runs start after the loaded epoch).
        patience: Stop after this many consecutive epochs without validation improvement.
        batch_size: Batch size for both loaders.
        preview_every: Show a sample caption every this many batches; 0 disables previews.

    Raises:
        ValueError: If the training or validation split is empty.
    """
    model = Model(embed_size, hidden_size, vocab_size, max_caption_length, num_layers, device).to(device)

    # The directory has to exist before it is listed or written to.
    os.makedirs(save_dir, exist_ok=True)

    start_epoch = 0
    if search_for_existing_model:
        print("Looking for model...")
        best_model_path = _find_best_checkpoint(save_dir)
        if best_model_path is not None:
            # map_location lets a checkpoint saved on GPU be resumed on CPU and vice versa.
            model.load_state_dict(torch.load(best_model_path, map_location=device))
            start_epoch = _epoch_from_checkpoint_name(best_model_path)
            print(f"Loaded the best model from: {best_model_path}, start_epoch: {start_epoch}")
        else:
            print("No best model found in the directory.")

    # The optimizer must be built from THIS model's parameters; an optimizer bound
    # to another model instance would leave this one untrained.
    # Only the weights are checkpointed, so a resumed run restarts Adam's state.
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    n_train = len(dataset.train_data)
    n_val = len(dataset.val_data)
    train_loader = DataLoader(dataset, batch_size=batch_size, sampler=SubsetRandomSampler(range(n_train)))
    val_loader = DataLoader(dataset, batch_size=batch_size, sampler=SubsetRandomSampler(range(n_train, n_train + n_val)))
    if len(train_loader) == 0 or len(val_loader) == 0:
        raise ValueError("train_model needs at least one training and one validation sample.")

    # Training loop with early stopping
    best_val_loss = float('inf')
    epochs_no_improve = 0
    total_batches = len(train_loader)

    for epoch in range(start_epoch, num_epochs):
        model.train()
        train_losses = []

        for batch_counter, (images, captions) in enumerate(tqdm(train_loader, desc=f'Epoch {epoch + 1}'), start=1):
            images = images.to(device)
            captions = captions.to(device)

            optimizer.zero_grad()
            outputs = model(images, captions)
            # Flatten (batch, seq, vocab) against (batch, seq); targets with id 0 are ignored.
            loss = F.cross_entropy(outputs.reshape(-1, vocab_size), captions.reshape(-1), ignore_index=0)
            loss.backward()
            optimizer.step()
            train_losses.append(loss.item())

            if preview_every and batch_counter % preview_every == 0:
                progress_line = f"Epoch [{epoch + 1}/{num_epochs}] - Batch [{batch_counter}/{total_batches}] - Loss: {loss.item():.4f}"
                _show_training_preview(model, images, captions, outputs, dataset.id_to_token, progress_line)

        avg_train_loss = sum(train_losses) / len(train_losses)
        # The loss embedded in the file name is the average TRAINING loss of the epoch.
        model_path = os.path.join(save_dir, f'Model3_epoch_{epoch + 1}_2024_06_09_{avg_train_loss:.4f}.pth')
        torch.save(model.state_dict(), model_path)

        model.eval()
        val_losses = []
        with torch.no_grad():
            for images, captions in val_loader:
                images = images.to(device)
                captions = captions.to(device)
                outputs = model(images, captions)
                val_loss = F.cross_entropy(outputs.reshape(-1, vocab_size), captions.reshape(-1), ignore_index=0)
                val_losses.append(val_loss.item())

        avg_val_loss = sum(val_losses) / len(val_losses)
        print(f"Epoch [{epoch + 1}/{num_epochs}] - Training Loss: {avg_train_loss:.4f} - Validation Loss: {avg_val_loss:.4f}")

        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            epochs_no_improve = 0
        else:
            epochs_no_improve += 1

        if epochs_no_improve >= patience:
            print(f"Early stopping triggered after {epoch + 1} epochs.")
            break

    print("Training completed.")

# Usage: gather the run configuration in one place, then start training.
training_options = dict(
    search_for_existing_model=False,
    save_dir='./SAVED_MODELS/model3/',
    embed_size=256,
    hidden_size=512,
    num_layers=1,
    vocab_size=len(dataset.vocab),
    learning_rate=0.001,
    max_caption_length=dataset.maxCaptionLength,
    dataset=dataset,
    device=device,
    num_epochs=5,
    patience=3,
)
train_model(**training_options)

CHOOSING MODEL TO TEST

In [None]:
# Rebuild the architecture with the hyperparameters used for training.
embed_size = 256
hidden_size = 512
num_layers = 1
vocab = dataset.vocab
vocab_size = len(vocab)
learning_rate = 0.001
max_caption_length = dataset.maxCaptionLength
model = Model(embed_size, hidden_size, vocab_size, max_caption_length, num_layers, device)
model.to(device)

model_path= "./SAVED_MODELS_2024/Model3/Model3_epoch_2_2024_06_09_9.2786.pth"
# map_location makes a checkpoint saved on GPU loadable on a CPU-only machine.
# strict=False tolerates key mismatches; the cell output lists any missing or
# unexpected keys, so check that it is empty.
model.load_state_dict(torch.load(model_path, map_location=device), strict=False)

TEST

In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader
from tqdm import tqdm
import csv




# CSV file that stores one (real caption, generated caption) pair per test sample
csv_file = './Results2024/Model3/Model3_epoch_2_2024_06_09_9.2786.csv'  # Change the filename as needed
os.makedirs(os.path.dirname(csv_file), exist_ok=True)


def _decode_caption(token_ids, unknown_word):
    """Join the words for `token_ids`, skipping id 0 and using `unknown_word` for unmapped ids."""
    return " ".join(dataset.id_to_token.get(int(token_id), unknown_word) for token_id in token_ids if token_id != 0)


# Set the model to evaluation mode
model.eval()

# The file is opened once for the whole run instead of once per row.
with open(csv_file, mode='w', newline='', encoding='utf-8') as file, torch.no_grad():
    writer = csv.writer(file)
    writer.writerow(['Real Caption', 'Generated Caption'])

    for batch_counter, (images, captions) in enumerate(tqdm(test_loader, desc='Testing')):
        # Move data to the appropriate device
        images = images.to(device)
        captions = captions.to(device)

        # model.generate returns vocabulary scores; take the best token per step
        outputs = model.generate(images).argmax(dim=2)

        for i in range(len(images)):
            generated_caption_str = _decode_caption(outputs[i].tolist(), "UNK")
            real_caption_str = _decode_caption(captions[i].tolist(), '<UNK>')

            # Print the first sample of every 20th batch as a progress check
            if i == 0 and batch_counter % 20 == 0:
                print(f"Generated Output: {generated_caption_str}")
                print(f"Real Caption: {real_caption_str}")

            writer.writerow([real_caption_str, generated_caption_str])

print("Testing completed. Real and generated captions saved.")

In [None]:
import pandas as pd
import nltk
from nltk.tokenize import word_tokenize
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from rouge_score import rouge_scorer

# Download NLTK data
nltk.download('punkt')

# Load the CSV file. keep_default_na=False keeps an empty generated caption as ''
# (and a literal word such as "nan" as text) instead of turning it into a float NaN.
csv_file = './Results2024/Model3/Model3_epoch_2_2024_06_09_9.2786.csv'
df = pd.read_csv(csv_file, dtype=str, keep_default_na=False)

# Lists to store metric scores
bleu_scores = []
rouge_l_scores = []

# Smoothing function for BLEU score
smoother = SmoothingFunction().method1
scorer = rouge_scorer.RougeScorer(['rougeL'], use_stemmer=True)

for real_text, generated_text in zip(df['Real Caption'], df['Generated Caption']):
    real_caption_tokenized = word_tokenize(real_text)
    generated_caption_tokenized = word_tokenize(generated_text)

    # BLEU works on token lists; passing plain strings would score characters.
    bleu = sentence_bleu([real_caption_tokenized], generated_caption_tokenized, smoothing_function=smoother)
    bleu_scores.append(bleu)

    # ROUGE-L takes strings and does its own tokenization
    real_caption = " ".join(real_caption_tokenized)
    generated_caption = " ".join(generated_caption_tokenized)
    rouge_scores = scorer.score(real_caption, generated_caption)
    rouge_l_scores.append(rouge_scores['rougeL'].fmeasure)

# Add metric columns to the DataFrame
df['BLEU Score'] = bleu_scores
df['ROUGE-L Score'] = rouge_l_scores

# Mean scores (NaN rather than a ZeroDivisionError when the CSV has no rows)
mean_bleu_score = df['BLEU Score'].mean()
mean_rouge_l_score = df['ROUGE-L Score'].mean()

# Print the DataFrame with scores and mean scores
print(df)
print("\nMean Metrics:")
print(f"Mean BLEU Score: {mean_bleu_score:.4f}")
print(f"Mean ROUGE-L Score: {mean_rouge_l_score:.4f}")


In [None]:
from PIL import Image
from torchvision import transforms
import torch
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms
from tqdm import tqdm
from torch.utils.data import DataLoader
from torch.utils.data.sampler import SubsetRandomSampler
import torch.nn.functional as F
import cv2

# NOTE(review): this path points at a "model2" checkpoint; confirm it was saved
# from the same architecture as the `Model` class defined in this notebook.
model_path= "./SAVED_MODELS/model2/ModelEncoder2_epoch_4_Loss_2.2639.pth"


# Define the hyperparameters
embed_size = 256
hidden_size = 512
num_layers = 1
vocab = dataset.vocab
vocab_size = len(vocab)
learning_rate = 0.001
max_caption_length = dataset.maxCaptionLength

# Initialize the model. Model takes exactly
# (embed_size, hidden_size, vocab_size, caption_length, num_layers, device).
model = Model(embed_size, hidden_size, vocab_size, max_caption_length, num_layers, device)
model.to(device)

# map_location makes a checkpoint saved on GPU loadable on a CPU-only machine.
model.load_state_dict(torch.load(model_path, map_location=device))



def load_and_preprocess_image_cv2(image_path):
    """Load an image with OpenCV and return it as a (1, 3, 224, 224) float32 tensor in [0, 1].

    Raises:
        FileNotFoundError: If OpenCV cannot read `image_path`.
    """
    image = cv2.imread(image_path)
    # cv2.imread signals failure by returning None instead of raising.
    if image is None:
        raise FileNotFoundError(f"Could not read image: {image_path}")
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  # Convert BGR to RGB
    image = cv2.resize(image, (224, 224))  # Resize to (224, 224)
    image = image / 255.0  # Scale pixel values to the range [0, 1]
    # HWC -> CHW, then add the batch dimension
    image = torch.tensor(image, dtype=torch.float32).permute(2, 0, 1).unsqueeze(0)
    return image
# Function to load and preprocess the image
def load_and_preprocess_image(image_path, normalize=False):
    """Load an image with PIL and return it as a (1, 3, 224, 224) tensor.

    By default the preprocessing matches the transform the training dataset is
    built with in this notebook (resize to 224x224 + ToTensor, no normalisation),
    so the model sees inputs in the same range as during training.

    Args:
        image_path: Path of the image file.
        normalize: If True, additionally apply ImageNet mean/std normalisation.
            Only enable this for a model that was trained on normalised inputs.
    """
    image = Image.open(image_path).convert('RGB')
    steps = [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
    ]
    if normalize:
        steps.append(transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)))
    image = transforms.Compose(steps)(image)
    image = image.unsqueeze(0)  # Add batch dimension
    return image
def generate_caption(model, image_path, id_to_token, real_description=None):
    """Show the image at `image_path`, print the caption the model generates for it, and return it.

    Args:
        model: Captioning model exposing `generate(image)`.
        image_path: Path of the image file.
        id_to_token: Mapping from token id to word.
        real_description: Optional human-written caption printed for comparison.

    Returns:
        The generated caption as a single string.
    """
    image = load_and_preprocess_image(image_path)

    # Run on whatever device the model already lives on.
    device = next(model.parameters()).device
    image = image.to(device)
    model.eval()

    # Display the original image.
    plt.imshow(Image.open(image_path).convert('RGB'))
    plt.axis('off')
    plt.show()

    with torch.no_grad():
        result = model.generate(image)

    # Some model variants return (tokens, extra); only the first element is needed.
    if isinstance(result, (tuple, list)):
        result = result[0]
    # A 3-D result holds per-step vocabulary scores; reduce it to token ids.
    token_ids = result.argmax(dim=-1) if result.dim() == 3 else result

    # Convert token IDs to words, skipping id 0
    generated_words = [id_to_token.get(int(token_id), "UNK") for token_id in token_ids[0].tolist() if token_id != 0]
    generated_caption_str = " ".join(generated_words)

    print("Generated description by the model:", "###  ", generated_caption_str, "  ###")
    if real_description:
        print(f"Real description made by hand by the tester: ###  {real_description}  ###")
    return generated_caption_str







# Caption every entry found in the test-image folder (no extension filter is applied).
folder_path = './ImagenesPrueba/'
for filename in filter(None, os.listdir(folder_path)):
    image_path = os.path.join(folder_path, filename)

    # Announce which file is being captioned, then show image and caption.
    print(f"Image: {filename}")
    generate_caption(model, image_path, dataset.id_to_token)
    print("\n")