In [None]:
from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import torchvision.models as models
from torch.optim.lr_scheduler import ReduceLROnPlateau


from get_loader import  get_loader, get_length_vocab, get_pad_index, get_vocab, show_image, get_vocab_stoi

from utils.utils_2 import weights_matrix
from train_and_val import train, validate, train_and_visualize_caps, evaluate_caps

from test import test_caps, test 

import pandas as pd
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import wandb
import warnings
warnings.filterwarnings("ignore")
import pickle
import bcolz

In [None]:
# MODEL

class EncoderCNN(nn.Module):
    """Image encoder: a frozen pretrained ResNet-50 followed by a trainable linear projection.

    Args:
        embed_size: size of the feature vector produced for each image.
    """

    def __init__(self, embed_size):
        super().__init__()
        backbone = models.resnet50(pretrained=True)
        # Freeze every backbone weight; only the projection layer below is trained.
        backbone.requires_grad_(False)

        # Keep all children except the last one (the `fc` classifier), so the
        # backbone outputs its pooled convolutional features.
        self.resnet = nn.Sequential(*list(backbone.children())[:-1])
        self.embed = nn.Linear(backbone.fc.in_features, embed_size)

    def forward(self, images):
        """Return one `embed_size`-dimensional feature vector per input image."""
        pooled = self.resnet(images)                 # e.g. (batch, 2048, 1, 1)
        flattened = pooled.view(pooled.size(0), -1)  # (batch, 2048)
        return self.embed(flattened)                 # (batch, embed_size)

class DecoderRNN(nn.Module):
    """LSTM caption decoder conditioned on an image feature vector.

    The image feature is fed as the first LSTM input and the embedded caption
    tokens follow, so output step ``t`` holds the logits for ``captions[:, t]``.

    Args:
        embed_size: dimension of the token embeddings (must equal the image feature size).
        hidden_size: LSTM hidden state size.
        vocab_size: number of tokens in the vocabulary.
        num_layers: number of stacked LSTM layers.
        drop_prob: probability used to build the ``drop`` layer.
    """

    def __init__(self, embed_size, hidden_size, vocab_size, num_layers=1, drop_prob=0.3):
        super(DecoderRNN, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers=num_layers, batch_first=True)
        # `batch_norm` and `drop` are registered but not applied in forward();
        # they are kept so the module's attributes and state_dict keys stay unchanged.
        self.batch_norm = nn.BatchNorm1d(hidden_size)
        self.fcn = nn.Linear(hidden_size, vocab_size)
        self.drop = nn.Dropout(drop_prob)

    def forward(self, features, captions, teacher_forcing_prob=0.5):
        """Compute vocabulary logits for every caption position.

        Args:
            features: image features of shape (batch, embed_size).
            captions: token indices of shape (batch, seq_len).
            teacher_forcing_prob: in training mode, the probability (drawn once
                per call) of feeding the ground-truth tokens to the LSTM.
                Otherwise the decoder is fed its own greedy predictions.
                In eval mode the ground-truth tokens are always used.

        Returns:
            Tensor of shape (batch, seq_len, vocab_size). The sequence length
            always equals ``captions.size(1)`` so the logits line up with the
            captions when the loss is computed.
        """
        image_step = features.unsqueeze(1)  # (batch, 1, embed_size)
        seq_len = captions.size(1)

        free_running = False
        if self.training and teacher_forcing_prob < 1.0:
            free_running = torch.rand(1).item() >= teacher_forcing_prob

        if not free_running:
            # Teacher forcing: image feature followed by all tokens except the
            # last one, giving exactly `seq_len` input steps.
            embeds = self.embedding(captions[:, :-1])
            x = torch.cat((image_step, embeds), dim=1)
            x, _ = self.lstm(x)
            return self.fcn(x)

        # Free running: feed the previous greedy prediction back as the next input.
        step_input = image_step
        hidden = None
        step_logits = []
        for step in range(seq_len):
            out, hidden = self.lstm(step_input, hidden)
            logits = self.fcn(out)  # (batch, 1, vocab_size)
            step_logits.append(logits)
            if step + 1 < seq_len:
                step_input = self.embedding(logits.argmax(dim=-1))
        return torch.cat(step_logits, dim=1)

    def generate_caption(self, inputs, hidden=None, max_len=25, vocab=None):
        """Greedily decode a caption for a single image.

        Args:
            inputs: image features of shape (1, 1, embed_size); only a batch of
                one is supported because each prediction is read with ``.item()``.
            hidden: optional initial LSTM state.
            max_len: maximum number of tokens to generate.
            vocab: index-to-word lookup supporting ``vocab[idx]``; required.

        Returns:
            List of generated words, ending with "<EOS>" if it was produced
            within ``max_len`` steps.

        Raises:
            ValueError: if ``vocab`` is not provided.
        """
        if vocab is None:
            raise ValueError("generate_caption requires a vocab to map indices to words")

        batch_size = inputs.size(0)
        captions = []
        for _ in range(max_len):
            output, hidden = self.lstm(inputs, hidden)
            output = self.fcn(output)
            output = output.view(batch_size, -1)

            # Pick the highest-scoring word and remember it.
            predicted_word_idx = output.argmax(dim=1)
            word_idx = predicted_word_idx.item()
            captions.append(word_idx)

            # Stop as soon as the end-of-sentence token is produced.
            if vocab[word_idx] == "<EOS>":
                break

            # The predicted word becomes the input of the next time step.
            inputs = self.embedding(predicted_word_idx.unsqueeze(1))

        # Convert the vocabulary indices to words.
        return [vocab[idx] for idx in captions]
  
class EncoderDecoder2(nn.Module):
    """Full captioning model: CNN image encoder followed by an LSTM caption decoder.

    Args:
        embed_size: size of the image feature / token embedding vectors.
        hidden_size: LSTM hidden state size of the decoder.
        vocab_size: number of tokens in the vocabulary.
        num_layers: number of stacked LSTM layers.
        drop_prob: dropout probability forwarded to the decoder.
        weight_matrix: optional pretrained embedding weights (array-like or
            tensor) of shape (vocab_size, embed_size). When given, they are
            copied into the decoder's embedding layer.

    Raises:
        ValueError: if ``weight_matrix`` does not have the shape of the
            decoder embedding.
    """

    def __init__(self, embed_size, hidden_size, vocab_size, num_layers=1, drop_prob=0.3, weight_matrix=None):
        super(EncoderDecoder2, self).__init__()
        self.encoder = EncoderCNN(embed_size)
        self.decoder = DecoderRNN(embed_size, hidden_size, vocab_size, num_layers, drop_prob)
        if weight_matrix is not None:
            self._load_pretrained_embeddings(weight_matrix)

    def _load_pretrained_embeddings(self, weight_matrix):
        """Copy pretrained embedding weights into the decoder embedding layer."""
        target = self.decoder.embedding.weight
        pretrained = torch.as_tensor(weight_matrix, dtype=target.dtype)
        if pretrained.shape != target.shape:
            # Fail loudly: silently ignoring the matrix would train from random
            # embeddings without the caller noticing.
            raise ValueError(
                "weight_matrix has shape {} but the decoder embedding expects {} "
                "(vocab_size, embed_size)".format(tuple(pretrained.shape), tuple(target.shape))
            )
        with torch.no_grad():
            target.copy_(pretrained)

    def forward(self, images, captions, teacher_forcing_prob=0.5):
        """Encode the images and return the decoder's vocabulary logits for the captions."""
        features = self.encoder(images)
        outputs = self.decoder(features, captions, teacher_forcing_prob)
        # Every training/validation function feeds this result to the loss.
        return outputs

In [None]:
# Dataset locations (machine-specific absolute paths).
main_path_miguel = 'C:/Users/Miguel/OneDrive/Escritorio/dlnn-project_ia-group_2/'
data_dir = f'{main_path_miguel}data/Images/'
captions_file = f'{main_path_miguel}data/captions.txt'

input_size = (224, 224)

# Per-channel normalisation shared by the training and evaluation pipelines.
channel_normalize = transforms.Normalize(
    mean=[0.485, 0.456, 0.406],
    std=[0.229, 0.224, 0.225],
)

# Training pipeline: resize, random horizontal flip (augmentation), tensor, normalise.
transform_train = transforms.Compose([
    transforms.Resize(input_size),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    channel_normalize,
])

# Evaluation pipeline: same as training but without the random flip.
transform_val = transforms.Compose([
    transforms.Resize(input_size),
    transforms.ToTensor(),
    channel_normalize,
])

# Split by unique image so all captions of one image land in the same subset:
# 75% train, and the remaining 25% halved into validation and test.
df_captions = pd.read_csv(captions_file)
unique_images = df_captions['image'].unique()
train_images, testval_images = train_test_split(unique_images, test_size=0.25, random_state=42)
val_images, test_images = train_test_split(testval_images, test_size=0.5, random_state=42)

train_df, val_df, test_df = (
    df_captions[df_captions['image'].isin(image_subset)]
    for image_subset in (train_images, val_images, test_images)
)

# Padding index and vocabularies, obtained from the project helpers.
pad_index = get_pad_index(data_dir=data_dir, dataframe=train_df, transform=transform_train)

vocab_train_df = get_vocab(data_dir=data_dir, dataframe=train_df, transform=transform_train)
vocab_val_df = get_vocab(data_dir=data_dir, dataframe=val_df, transform=transform_val)
vocab_test_df = get_vocab(data_dir=data_dir, dataframe=test_df, transform=transform_val)


# Data loaders: batches of 32 for training, 8 for validation and test.
train_dataloader = get_loader(
    data_dir=data_dir, dataframe=train_df, transform=transform_train,
    batch_size=32, num_workers=1, shuffle=True,
)
val_dataloader = get_loader(
    data_dir=data_dir, dataframe=val_df, transform=transform_val,
    batch_size=8, num_workers=1, shuffle=True,
)
test_dataloader = get_loader(
    data_dir=data_dir, dataframe=test_df, transform=transform_val,
    batch_size=8, num_workers=1,
)

In [None]:
# Hyperparameters
# NOTE(review): the GloVe weight matrix built further down in this notebook is
# created with dimension 100 while embed_size is 300 - confirm how they are reconciled.
embed_size, hidden_size = 300, 512
num_layers = 2
learning_rate = 1e-3
vocab_size_train = len(vocab_train_df)

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(device)

In [None]:
# Keep this path OUTSIDE the repository: the GloVe files are too large to commit.
# It must point to the folder that already contains the generated pickles.
miguel_path_glove = r"C:\Users\Miguel\OneDrive\Escritorio\2n curs\2n Semestre\Neural Networks and Deep Learning\Project\glove_files"

raw_glove_path = miguel_path_glove
processed_glove_path = miguel_path_glove
vectors = bcolz.open(f'{raw_glove_path}/6B.100.dat')[:]

# NOTE: pickle.load can execute arbitrary code - only load files you created yourself.
# The files are opened with `with` so their handles are closed right after loading.
with open(f'{processed_glove_path}/6B.100_words.pkl', 'rb') as words_file:
    words = pickle.load(words_file)
with open(f'{processed_glove_path}/6B.100_idx.pkl', 'rb') as idx_file:
    word2idx = pickle.load(idx_file)

# Map every word to its vector.
glove = {w: vectors[word2idx[w]] for w in words}

In [None]:
# Build the embedding weight matrix for the training vocabulary from the GloVe vectors.
# NOTE(review): 100 is presumably the embedding dimension, matching the 6B.100 GloVe
# files loaded above - confirm against utils_2.weights_matrix and against embed_size.
weights = weights_matrix(vocab_train_df, 100, glove)

In [None]:
# Build the captioning model and move it to the selected device.
model = EncoderDecoder2(
    embed_size,
    hidden_size,
    vocab_size_train,
    num_layers,
    weight_matrix=weights,
)
model = model.to(device)

# Targets equal to the padding index are ignored by the loss.
criterion = nn.CrossEntropyLoss(ignore_index=pad_index)

# Weight decay (L2 regularisation) was tried to reduce overfitting, but it did not work well.
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
# A learning-rate scheduler was also tried:
# scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=3, verbose=True)

In [None]:
# Computes and prints the validation loss only (no captions are generated).
def validate(criterion, model, loader, device):
    """Return the sample-weighted average loss of `model` over `loader`.

    The model is switched to eval mode and gradients are disabled. Each batch
    is expected to be an `(images, captions, extra)` triple; the third item is
    ignored.

    Note carried over from the original comment: the vocab would have to be
    the training vocabulary (train_vocab_df).
    """
    model.eval()
    loss_sum = 0
    samples_seen = 0

    with torch.no_grad():
        for images, captions, _ in loader:
            images, captions = images.to(device), captions.to(device)
            n_in_batch = images.size(0)

            logits = model(images, captions)
            flat_logits = logits.view(-1, logits.size(-1))
            batch_loss = criterion(flat_logits, captions.view(-1))

            # Weight each batch by its size so a smaller last batch does not skew the mean.
            loss_sum += batch_loss.item() * n_in_batch
            samples_seen += n_in_batch

    average_loss = loss_sum / samples_seen
    print("Validation set: AVERAGE VALIDATION LOSS: {:.5f}".format(average_loss))
    return average_loss

# Trains for one epoch and reports the training loss only (no captions are generated).
def train(epoch, criterion, model, optimizer, loader, device):
    """Run one training epoch and return the sample-weighted average loss.

    Args:
        epoch: epoch number, used only in the printed messages.
        criterion: loss taking (flattened logits, flattened targets).
        model: callable as ``model(images, captions)`` returning logits whose
            last dimension is the vocabulary size.
        optimizer: optimizer updating the model parameters.
        loader: iterable of ``(images, captions, extra)`` batches; the third
            item is ignored.
        device: device the batch tensors are moved to.

    Returns:
        Average loss over all samples seen in the epoch.
    """
    total_samples = 0
    total_loss = 0.0
    print_every = 250

    model.train()

    for batch_idx, (images, captions, _) in enumerate(loader):
        images = images.to(device)
        # The captions must live on the same device as the model and the logits,
        # otherwise the embedding lookup and the loss fail on GPU.
        captions = captions.to(device)

        batch_size = images.size(0)
        total_samples += batch_size
        optimizer.zero_grad()

        outputs = model(images, captions)
        loss = criterion(outputs.view(-1, outputs.size(-1)), captions.view(-1))
        loss.backward()
        optimizer.step()

        # Weight by batch size so the epoch average is per sample.
        total_loss += loss.item() * batch_size

        if (batch_idx + 1) % print_every == 0:
            print("Train Epoch: {} Batch [{}/{}]\tLoss: {:.5f}".format(
                epoch, batch_idx + 1, len(loader), loss.item()
            ))

    average_loss = total_loss / total_samples
    print("Train Epoch: {} Average Loss: {:.5f}".format(epoch, average_loss))

    return average_loss
        
# Training and caption visualisation are merged here so a single function can be
# used to train: every `print_every` batches the batch loss is printed and a
# caption is generated for one validation image.
def _show_validation_caption(model, val_dataloader, vocab, val_df, device):
    """Caption the first image of one validation batch and display it with its BLEU score."""
    img, _, img_dir = next(iter(val_dataloader))
    # All reference captions of that image, lower-cased.
    original_captions = [
        caption.lower()
        for caption in val_df.loc[val_df['image'] == img_dir[0], 'caption']
    ]
    features = model.encoder(img[0:1].to(device))
    caps = model.decoder.generate_caption(features.unsqueeze(0), vocab=vocab)
    pred_caption = ' '.join(caps)
    # Drop the first and last tokens (assumed to be the start/end markers).
    pred_caption = ' '.join(pred_caption.split()[1:-1])
    # NOTE(review): best_bleu_cap is not imported in the visible cells (the original
    # comment says it lives in utils.py) - confirm the import exists before running.
    original_caption, bleu_score = best_bleu_cap(original_captions, pred_caption)
    print("Best original caption (1 out of 5):", original_caption)
    print("Predicted caption:", pred_caption)
    print("BLEU score :", bleu_score)
    show_image(img[0], title=pred_caption)


def train_and_visualize_caps(epoch, train_dataloader, val_dataloader, model, optimizer, criterion, vocab, val_df, device, print_every=400):
    """Train for one epoch, periodically showing a generated validation caption.

    Args:
        epoch: epoch number, used only in the printed messages.
        train_dataloader: iterable of ``(images, captions, extra)`` training batches.
        val_dataloader: loader yielding ``(images, captions, image_names)``
            batches, sampled for the caption preview.
        model: model exposing ``encoder`` and ``decoder.generate_caption``.
        optimizer: optimizer updating the model parameters.
        criterion: loss taking (flattened logits, flattened targets).
        vocab: index-to-word lookup passed to ``generate_caption``.
        val_df: dataframe with ``image`` and ``caption`` columns.
        device: device the batch tensors are moved to.
        print_every: positive number of batches between two previews.

    Returns:
        Sample-weighted average training loss of the epoch (NaN if the loader
        yielded no batches), so the caller can record and plot it.
    """
    total_loss = 0
    total_samples = 0
    model.train()
    for batch_idx, (image, captions, _) in enumerate(iter(train_dataloader)):
        images, captions = image.to(device), captions.to(device)
        batch_size = images.size(0)
        total_samples += batch_size
        optimizer.zero_grad()
        outputs = model(images, captions)

        # Calculate the batch loss.
        loss = criterion(outputs.view(-1, outputs.size(-1)), captions.view(-1))
        loss.backward()
        optimizer.step()
        total_loss += loss.item() * batch_size

        if (batch_idx + 1) % print_every == 0:
            print("Train Epoch: {} Batch [{}/{}]\tLoss: {:.5f}".format(
                epoch, batch_idx + 1, len(train_dataloader), loss.item()
            ))
            # Generate a caption in eval mode, then resume training.
            model.eval()
            with torch.no_grad():
                _show_validation_caption(model, val_dataloader, vocab, val_df, device)
            model.train()

    # Returning the epoch average lets the training loop store and plot it.
    average_loss = total_loss / total_samples if total_samples else float("nan")
    print("Train Epoch: {} Average Loss: {:.5f}".format(epoch, average_loss))
    return average_loss

In [None]:
### Train with the merged train+visualisation function, validate after every epoch
### and plot the training and validation loss curves.

losses = dict(train=[], val=[])
for epoch in range(40):

    train_loss = train_and_visualize_caps(
        epoch, train_dataloader, val_dataloader,
        model, optimizer, criterion,
        vocab_train_df, val_df, device,
    )
    val_loss = validate(criterion, model, val_dataloader, device)

    # Keep the per-epoch history so both curves can be redrawn each epoch.
    losses["train"].append(train_loss)
    losses["val"].append(val_loss)

    plt.plot(losses["train"], label="training loss")
    plt.plot(losses["val"], label="validation loss")

    plt.legend()
    plt.pause(0.000001)
    plt.show()



# RUN THE CELL BELOW TO SAVE THE MODEL TO A PKL FILE SO THAT IT CAN BE USED LATER FOR TESTING