In [1]:
# Import necessary libraries
import json
import os
import pickle
from collections import Counter
from pathlib import Path

import nltk
import numpy as np
import torch
import torch.nn as nn
import torchvision.models as models
from PIL import Image
from pycocotools.coco import COCO
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from tqdm import tqdm

# Download the NLTK "punkt" tokenizer data; the cells below tokenize captions
# with nltk.tokenize.word_tokenize. This is a network/filesystem side effect.
nltk.download('punkt')

[nltk_data] Downloading package punkt to /usr/share/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

In [2]:
import os.path


class Vocabulary(object):
    """Bidirectional mapping between caption tokens and integer ids.

    The mapping is either loaded from a pickle file or built from the captions
    in a COCO annotation file. Ids are assigned sequentially in insertion
    order: the start, end and unknown tokens first, then every caption token
    whose count reaches ``vocab_threshold``.
    """

    def __init__(
        self,
        vocab_threshold,
        vocab_file="/kaggle/input/vocab/pytorch/default/1/vocab.pkl",  
        start_word="<start>",
        end_word="<end>",
        unk_word="<unk>",
        annotations_file="/kaggle/input/coco-image-caption/annotations_trainval2014/annotations/captions_train2014.json",  # COCO 2014 training annotations
        vocab_from_file=False,
    ):
        """Initialize the vocabulary.
        Args:
          vocab_threshold: Minimum word count threshold.
          vocab_file: File containing the vocabulary.
          start_word: Special word denoting sentence start.
          end_word: Special word denoting sentence end.
          unk_word: Special word denoting unknown words.
          annotations_file: Path for train annotation file.
          vocab_from_file: If False, create vocab from scratch and override any existing vocab_file
                           If True, load vocab from existing vocab_file, if it exists
        """
        self.vocab_threshold = vocab_threshold
        self.vocab_file = vocab_file
        self.start_word = start_word
        self.end_word = end_word
        self.unk_word = unk_word
        self.annotations_file = annotations_file
        self.vocab_from_file = vocab_from_file
        self.get_vocab()

    def get_vocab(self):
        """Load the vocabulary from file OR build the vocabulary from scratch.

        When ``vocab_from_file`` is True and ``vocab_file`` exists, the two
        dictionaries are read from the pickle. Otherwise the vocabulary is
        rebuilt from the annotations and this object is pickled to
        ``vocab_file`` (so that path must be writable).
        """
        if self.vocab_from_file and os.path.exists(self.vocab_file):
            # NOTE(security): pickle.load can execute arbitrary code. Only
            # point vocab_file at pickles you created yourself.
            with open(self.vocab_file, "rb") as f:
                vocab = pickle.load(f)
            self.word2idx = vocab.word2idx
            self.idx2word = vocab.idx2word
            # Restore the next-free-id counter. Without it, add_word() raises
            # AttributeError on a vocabulary that was loaded from file.
            self.idx = getattr(vocab, "idx", len(self.word2idx))
            print("Vocabulary successfully loaded from vocab.pkl file!")

        # create a new vocab file
        else:
            self.build_vocab()
            with open(self.vocab_file, "wb") as f:
                pickle.dump(self, f)

    def build_vocab(self):
        """Populate the dictionaries for converting tokens to integers (and vice-versa)."""
        self.init_vocab()
        # Special tokens go first so they receive ids 0, 1 and 2.
        self.add_word(self.start_word)
        self.add_word(self.end_word)
        self.add_word(self.unk_word)
        self.add_captions()

    def init_vocab(self):
        """Initialize the dictionaries for converting tokens to integers (and vice-versa)."""
        self.word2idx = {}
        self.idx2word = {}
        self.idx = 0

    def add_word(self, word):
        """Add a token to the vocabulary (no-op if it is already present)."""
        if word not in self.word2idx:
            self.word2idx[word] = self.idx
            self.idx2word[self.idx] = word
            self.idx += 1

    def add_captions(self):
        """Loop over training captions and add all tokens to the vocabulary that meet or exceed the threshold."""
        coco = COCO(self.annotations_file)
        counter = Counter()
        ids = coco.anns.keys()
        for i, idx in enumerate(ids):
            caption = str(coco.anns[idx]["caption"])
            tokens = nltk.tokenize.word_tokenize(caption.lower())
            counter.update(tokens)

            if i % 100000 == 0:
                print("[%d/%d] Tokenizing captions..." % (i, len(ids)))

        # Keep only words that occur at least `vocab_threshold` times.
        for word, cnt in counter.items():
            if cnt >= self.vocab_threshold:
                self.add_word(word)

    def __call__(self, word):
        """Return the id of ``word``, or the id of the unknown token if absent."""
        if word not in self.word2idx:
            return self.word2idx[self.unk_word]
        return self.word2idx[word]

    def __len__(self):
        """Number of distinct tokens, special tokens included."""
        return len(self.word2idx)

In [3]:
from nltk.translate.bleu_score import corpus_bleu


def clean_sentence(output, idx2word, start_idx=0, end_idx=1, no_space_idx=18):
    """Turn a sequence of predicted token ids into a caption string.

    Args:
        output: Iterable of integer token ids.
        idx2word: Mapping from token id to token string.
        start_idx: Id that is skipped wherever it appears (default 0, the id
            the Vocabulary class assigns to the start token).
        end_idx: Id that terminates the sentence; it and everything after it
            is dropped (default 1, the id assigned to the end token).
        no_space_idx: Id whose token is appended without a preceding space
            (default 18 — presumably the "." token in the vocabulary this
            notebook was written against; TODO confirm for other vocabularies).

    Returns:
        The caption string. Every token except ``no_space_idx`` is preceded by
        a space, so a non-empty result normally begins with a space.

    Raises:
        KeyError: If a non-special id is missing from ``idx2word``.
    """
    pieces = []
    for i in output:
        if i == start_idx:
            continue
        if i == end_idx:
            break
        # Look the word up only once we know the id is a real token.
        word = idx2word[i]
        pieces.append(word if i == no_space_idx else " " + word)
    return "".join(pieces)


def bleu_score(true_sentences, predicted_sentences):
    """Compute corpus-level BLEU over the image ids present in both inputs.

    Args:
        true_sentences: Dict mapping image id -> list of reference caption strings.
        predicted_sentences: Dict mapping image id -> list of predicted caption
            strings; only the first prediction of each image is scored.

    Returns:
        The value returned by nltk's ``corpus_bleu`` for whitespace-tokenized
        references and hypotheses.
    """
    shared_ids = set(true_sentences.keys()).intersection(
        set(predicted_sentences.keys())
    )
    # Both lists iterate the same set object, so entries stay aligned.
    references = [
        [reference.split() for reference in true_sentences[img_id]]
        for img_id in shared_ids
    ]
    hypotheses = [
        predicted_sentences[img_id][0].strip().split() for img_id in shared_ids
    ]
    return corpus_bleu(references, hypotheses)


In [4]:
class CoCoDataset(Dataset):
    """COCO captioning dataset yielding (image, caption) or (original, image) pairs.

    NOTE: a second ``CoCoDataset`` class is defined in a later cell of this
    notebook and replaces this one once that cell has run.
    """

    def __init__(
        self,
        transform,
        mode,
        batch_size,
        vocab_threshold,
        vocab_file,
        start_word="<start>",
        end_word="<end>",
        unk_word="<unk>",
        annotations_file=None,
        vocab_from_file=False,
        img_folder=None,
    ):
        """
        Initialize the COCO dataset.

        Args:
            transform: Image transformations (e.g., resizing, normalization).
            mode: "train" selects caption training; any other value is handled
                as image-only (test) mode.
            batch_size: Batch size for training.
            vocab_threshold: Minimum word frequency threshold for vocabulary.
            vocab_file: Path to save/load the vocabulary.
            start_word: Special token for the start of a sentence.
            end_word: Special token for the end of a sentence.
            unk_word: Special token for unknown words.
            annotations_file: Path to the COCO annotations file.
            vocab_from_file: If True, load vocabulary from file; else, build from scratch.
            img_folder: Path to the folder containing images.
        """
        self.transform = transform
        self.mode = mode
        self.batch_size = batch_size
        self.img_folder = img_folder

        # Initialize vocabulary
        self.vocab = Vocabulary(
            vocab_threshold,
            vocab_file,
            start_word,
            end_word,
            unk_word,
            annotations_file,
            vocab_from_file,
        )

        if self.mode == "train":
            # Load COCO annotations for training
            self.coco = COCO(annotations_file)
            self.ids = list(self.coco.anns.keys())
            print("Obtaining caption lengths...")

            # Only the token count of each caption is needed, so the token
            # lists themselves are not kept. Stored as an ndarray so that
            # get_train_indices can compare all lengths in one vectorized op.
            self.caption_lengths = np.array(
                [
                    len(
                        nltk.tokenize.word_tokenize(
                            str(self.coco.anns[ann_id]["caption"]).lower()
                        )
                    )
                    for ann_id in tqdm(self.ids)
                ]
            )
        else:
            # Load the image list for testing; `with` closes the file handle.
            with open(annotations_file) as f:
                test_info = json.load(f)
            self.paths = [item["file_name"] for item in test_info["images"]]

    def __getitem__(self, index):
        """
        Get an item from the dataset.

        Args:
            index: Index of the item to retrieve.

        Returns:
            In training mode: (image, caption), where caption is a tensor of word indices.
            In testing mode: (original_image, transformed_image).
        """
        if self.mode == "train":
            # Get caption and image ID
            ann_id = self.ids[index]
            caption = self.coco.anns[ann_id]["caption"]
            img_id = self.coco.anns[ann_id]["image_id"]
            path = self.coco.loadImgs(img_id)[0]["file_name"]

            # Load and preprocess the image
            image = Image.open(os.path.join(self.img_folder, path)).convert("RGB")
            image = self.transform(image)

            # Convert caption to tensor of word indices: <start> tokens... <end>
            tokens = nltk.tokenize.word_tokenize(str(caption).lower())
            caption = [self.vocab(self.vocab.start_word)]
            caption.extend([self.vocab(token) for token in tokens])
            caption.append(self.vocab(self.vocab.end_word))
            caption = torch.Tensor(caption).long()

            return image, caption

        else:
            # Get image path
            path = self.paths[index]

            # Load and preprocess the image
            pil_image = Image.open(os.path.join(self.img_folder, path)).convert("RGB")
            orig_image = np.array(pil_image)  # Keep original image for visualization
            image = self.transform(pil_image)  # Apply transformations

            return orig_image, image

    def get_train_indices(self):
        """
        Get a batch of indices for training, ensuring all captions have the same length.

        A caption length is drawn with probability proportional to how many
        captions have it, then ``batch_size`` indices of that length are drawn
        with replacement.

        Returns:
            List of indices.
        """
        lengths = np.asarray(self.caption_lengths)
        sel_length = np.random.choice(lengths)  # Randomly select a caption length
        # Vectorized comparison instead of a Python loop over every caption.
        all_indices = np.flatnonzero(lengths == sel_length)
        indices = list(np.random.choice(all_indices, size=self.batch_size))  # Select batch_size indices
        return indices

    def __len__(self):
        """
        Get the number of samples in the dataset.

        Returns:
            Number of annotations in training mode, number of images otherwise.
        """
        if self.mode == "train":
            return len(self.ids)
        else:
            return len(self.paths)

In [5]:
def get_loader(
    transform,
    mode="train",
    batch_size=32,
    vocab_threshold=None,
    vocab_file="/kaggle/input/vocab/pytorch/default/1/vocab.pkl",
    start_word="<start>",
    end_word="<end>",
    unk_word="<unk>",
    vocab_from_file=True,
    num_workers=4,
    img_folder=None,
    annotations_file=None,
):
    """Build a DataLoader over ``CoCoDataset``.

    Args:
        transform: Image transformations applied to every image.
        mode: One of "train", "val" or "test".
        batch_size: Number of samples per batch.
        vocab_threshold: Minimum word count threshold for the vocabulary.
        vocab_file: Path to save/load the vocabulary.
        start_word: Special token for the start of a sentence.
        end_word: Special token for the end of a sentence.
        unk_word: Special token for unknown words.
        vocab_from_file: If True, load vocabulary from file; else, build from scratch.
        num_workers: Number of loader subprocesses; 0 loads in the main process.
        img_folder: Path to the image folder (required in "train" mode).
        annotations_file: Path to the annotations file (required in "train" mode).

    Returns:
        A DataLoader. In "train" mode it is driven by a BatchSampler over one
        initial same-length batch of indices; the training loop is expected to
        swap in a fresh sampler before each step.

    Raises:
        AssertionError: If ``mode`` is not an accepted value.
        ValueError: If "train" mode is requested without both paths.
    """

    assert mode in ["train", "val", "test"], "mode must be one of 'train', 'val', or 'test'."

    if mode == "train" and (not img_folder or not annotations_file):
        raise ValueError("img_folder and annotations_file must be provided in 'train' mode.")

    # Initialize COCO dataset
    dataset = CoCoDataset(
        transform=transform,
        mode=mode,
        batch_size=batch_size,
        vocab_threshold=vocab_threshold,
        vocab_file=vocab_file,
        start_word=start_word,
        end_word=end_word,
        unk_word=unk_word,
        annotations_file=annotations_file,
        vocab_from_file=vocab_from_file,
        img_folder=img_folder,
    )

    loader_kwargs = dict(dataset=dataset, num_workers=num_workers, pin_memory=True)
    # prefetch_factor / persistent_workers are only legal with worker
    # processes; DataLoader raises ValueError if they are set with num_workers=0.
    if num_workers > 0:
        loader_kwargs.update(prefetch_factor=4, persistent_workers=True)

    # Configure DataLoader
    if mode == "train":
        indices = dataset.get_train_indices()
        sampler = torch.utils.data.SubsetRandomSampler(indices)

        # batch_sampler for training
        batch_sampler = torch.utils.data.BatchSampler(
            sampler=sampler, batch_size=batch_size, drop_last=False
        )
        data_loader = DataLoader(batch_sampler=batch_sampler, **loader_kwargs)
    else:
        data_loader = DataLoader(batch_size=batch_size, **loader_kwargs)

    return data_loader


In [6]:

import torch.utils.data as data
from PIL import Image

def val_get_loader(
    transform,
    mode="valid",
    batch_size=1,
    vocab_threshold=None,
    vocab_file="/kaggle/input/vocab/pytorch/default/1/vocab.pkl",
    start_word="<start>",
    end_word="<end>",
    unk_word="<unk>",
    vocab_from_file=True,
    num_workers=4,
    img_folder=None, 
    annotations_file=None,  
):
    """
    Returns the data loader for the COCO dataset.

    Args:
        transform: Image transformations (e.g., resizing, normalization).
        mode: One of 'train', 'valid', or 'test'.
        batch_size: Batch size (if in testing mode, must have batch_size=1).
        vocab_threshold: Minimum word count threshold for vocabulary.
        vocab_file: Path to save/load the vocabulary.
        start_word: Special token for the start of a sentence.
        end_word: Special token for the end of a sentence.
        unk_word: Special token for unknown words.
        vocab_from_file: If True, load vocabulary from file; else, build from scratch.
        num_workers: Number of subprocesses to use for data loading
            (0 loads in the main process).
        img_folder: Path to the image folder.
        annotations_file: Path to the annotations file.

    Returns:
        DataLoader: PyTorch DataLoader for the COCO dataset (shuffled).

    Raises:
        AssertionError: If ``mode`` is invalid, or a vocabulary rebuild is
            requested outside 'train' mode.
        ValueError: If 'valid' mode is requested without both paths.
    """
    assert mode in ["train", "valid", "test"], "mode must be one of 'train', 'valid', or 'test'."

    if not vocab_from_file:
        assert mode == "train", "To generate vocab from captions file, must be in training mode (mode='train')."

    if mode == "valid" and (not img_folder or not annotations_file):
        raise ValueError("img_folder and annotations_file must be provided in 'valid' mode.")

    # Initialize COCO dataset
    dataset = CoCoDataset(
        transform=transform,
        mode=mode,
        batch_size=batch_size,
        vocab_threshold=vocab_threshold,
        vocab_file=vocab_file,
        start_word=start_word,
        end_word=end_word,
        unk_word=unk_word,
        annotations_file=annotations_file,
        vocab_from_file=vocab_from_file,
        img_folder=img_folder,
    )

    loader_kwargs = dict(
        dataset=dataset,
        batch_size=batch_size,
        num_workers=num_workers,  # honour the caller's value (was hard-coded to 4)
        pin_memory=True,
        shuffle=True,
    )
    # prefetch_factor is only legal with worker processes; DataLoader raises
    # ValueError if it is set together with num_workers=0.
    if num_workers > 0:
        loader_kwargs["prefetch_factor"] = 4

    # DataLoader for validation or testing mode
    data_loader = data.DataLoader(**loader_kwargs)

    return data_loader

class CoCoDataset(data.Dataset):
    """COCO captioning dataset with 'train', 'valid' and test behaviour.

    * 'train': items are (image tensor, caption id tensor).
    * 'valid': items are (image id, image tensor).
    * anything else: items are (original image array, image tensor).
    """

    def __init__(
        self,
        transform,
        mode,
        batch_size,
        vocab_threshold,
        vocab_file,
        start_word,
        end_word,
        unk_word,
        annotations_file,
        vocab_from_file,
        img_folder,
    ):
        """Build the vocabulary and index the annotations for ``mode``.

        Args:
            transform: Callable applied to each PIL image.
            mode: 'train', 'valid' or another value (treated as test).
            batch_size: Number of indices returned by get_train_indices.
            vocab_threshold: Minimum word count for the vocabulary.
            vocab_file: Path to save/load the vocabulary pickle.
            start_word: Sentence-start token.
            end_word: Sentence-end token.
            unk_word: Unknown-word token.
            annotations_file: Path to the COCO annotation JSON.
            vocab_from_file: Load the vocabulary instead of rebuilding it.
            img_folder: Directory containing the images.
        """
        self.transform = transform
        self.mode = mode
        self.batch_size = batch_size
        self.vocab = Vocabulary(
            vocab_threshold,
            vocab_file,
            start_word,
            end_word,
            unk_word,
            annotations_file,
            vocab_from_file,
        )
        self.img_folder = img_folder
        if self.mode == "train":
            self.coco = COCO(annotations_file)
            self.ids = list(self.coco.anns.keys())
            print("Obtaining caption lengths...")
            # Only the token counts are needed; keep them as an ndarray so
            # get_train_indices can compare all lengths in one vectorized op.
            self.caption_lengths = np.array(
                [
                    len(
                        nltk.tokenize.word_tokenize(
                            str(self.coco.anns[ann_id]["caption"]).lower()
                        )
                    )
                    for ann_id in tqdm(self.ids)
                ]
            )
        else:
            # `with` closes the annotation file handle after parsing.
            with open(annotations_file) as f:
                test_info = json.load(f)
            self.paths = [item["file_name"] for item in test_info["images"]]

    @staticmethod
    def _image_id_from_path(path):
        """Parse the numeric id from a name like 'COCO_val2014_000000391895.jpg'.

        The id is read from the file name (last path component), so a
        directory prefix in ``path`` does not break parsing.
        """
        file_name = path.rsplit("/", 1)[-1]
        return int(file_name.split(".")[0].split("_")[-1])

    def __getitem__(self, index):
        """Return the sample at ``index``; its layout depends on ``self.mode``."""
        # obtain image and caption if in training mode
        if self.mode == "train":
            ann_id = self.ids[index]
            caption = self.coco.anns[ann_id]["caption"]
            img_id = self.coco.anns[ann_id]["image_id"]
            path = self.coco.loadImgs(img_id)[0]["file_name"]

            # Convert image to tensor and pre-process using transform
            image = Image.open(os.path.join(self.img_folder, path)).convert("RGB")
            image = self.transform(image)

            # Convert caption to tensor of word ids: <start> tokens... <end>
            tokens = nltk.tokenize.word_tokenize(str(caption).lower())
            caption = []
            caption.append(self.vocab(self.vocab.start_word))
            caption.extend([self.vocab(token) for token in tokens])
            caption.append(self.vocab(self.vocab.end_word))
            caption = torch.Tensor(caption).long()

            # return pre-processed image and caption tensors
            return image, caption

        elif self.mode == "valid":
            path = self.paths[index]
            image_id = self._image_id_from_path(path)
            pil_image = Image.open(os.path.join(self.img_folder, path)).convert("RGB")
            image = self.transform(pil_image)

            # return image_id and pre-processed image tensor
            return image_id, image

        # obtain image if in test mode
        else:
            path = self.paths[index]

            # Convert image to tensor and pre-process using transform
            pil_image = Image.open(os.path.join(self.img_folder, path)).convert("RGB")
            orig_image = np.array(pil_image)
            image = self.transform(pil_image)

            # return original image and pre-processed image tensor
            return orig_image, image

    def get_train_indices(self):
        """Draw ``batch_size`` indices whose captions share one token length.

        A length is drawn with probability proportional to how many captions
        have it; indices of that length are then drawn with replacement.
        """
        lengths = np.asarray(self.caption_lengths)
        sel_length = np.random.choice(lengths)
        # Vectorized comparison instead of a Python loop over every caption.
        all_indices = np.flatnonzero(lengths == sel_length)
        indices = list(np.random.choice(all_indices, size=self.batch_size))
        return indices

    def __len__(self):
        """Number of annotations in 'train' mode, number of images otherwise."""
        if self.mode == "train":
            return len(self.ids)
        else:
            return len(self.paths)

In [7]:
# ----------- Encoder ------------
class EncoderCNN(nn.Module):
    """Frozen ResNet-34 feature extractor followed by a trainable linear projection."""

    def __init__(self, embed_size):
        """
        Args:
            embed_size: output width of the final linear projection
        """
        super().__init__()
        backbone = models.resnet34(pretrained=True)
        # Freeze every backbone weight; only `self.embed` stays trainable.
        backbone.requires_grad_(False)

        # Keep every child except the last one (the classification layer).
        self.resnet = nn.Sequential(*list(backbone.children())[:-1])
        self.embed = nn.Linear(backbone.fc.in_features, embed_size)

    def forward(self, images):
        """Return one ``embed_size`` vector per input image."""
        pooled = self.resnet(images)
        flat = pooled.view(pooled.size(0), -1)
        return self.embed(flat)


# --------- Decoder ----------
class DecoderRNN(nn.Module):
    """LSTM caption decoder: word embedding -> LSTM -> linear scores over the vocabulary."""

    def __init__(self, embed_size, hidden_size, vocab_size, num_layers=1):
        """
        Args:
            embed_size: final embedding size of the CNN encoder
            hidden_size: hidden size of the LSTM
            vocab_size: size of the vocabulary
            num_layers: number of layers of the LSTM
        """
        super(DecoderRNN, self).__init__()

        # Assigning hidden dimension
        self.hidden_dim = hidden_size
        # Map each word index to a dense word embedding tensor of embed_size
        self.embed = nn.Embedding(vocab_size, embed_size)
        # Creating LSTM layer
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)
        # Linear layer applied to every LSTM output to score the vocabulary
        self.linear = nn.Linear(hidden_size, vocab_size)
        # Placeholder for the most recent (hidden, cell) state. It is
        # overwritten by forward() and never fed back into the LSTM.
        self.hidden = (torch.zeros(1, 1, hidden_size), torch.zeros(1, 1, hidden_size))

    def forward(self, features, captions):
        """
        Args:
            features: features tensor. shape is (bs, embed_size)
            captions: captions tensor. shape is (bs, cap_length)
        Returns:
            outputs: scores of the linear layer, shape (bs, cap_length, vocab_size)

        """
        # Drop the last token of each caption and embed the rest:
        # (bs, cap_length) -> (bs, cap_length-1, embed_size)
        cap_embedding = self.embed(captions[:, :-1])

        # Prepend the image features as the input at t=0:
        # [bs, 1, embed_size] concat [bs, cap_length-1, embed_size]
        # => [bs, cap_length, embed_size]
        embeddings = torch.cat((features.unsqueeze(dim=1), cap_embedding), dim=1)

        # lstm_out holds the outputs for every time step; self.hidden keeps
        # the final (hidden, cell) state. The LSTM starts from a zero state.
        lstm_out, self.hidden = self.lstm(embeddings)
        outputs = self.linear(lstm_out)  # (bs, cap_length, vocab_size)

        return outputs

    def sample(self, inputs, states=None, max_len=50, end_idx=1):
        """
        Greedily decode a caption from a pre-processed image embedding.

        Args:
            inputs: shape is (1, 1, embed_size)
            states: initial hidden state of the LSTM
            max_len: maximum length of the predicted sentence
            end_idx: token id that stops decoding (default 1, the id the
                Vocabulary class assigns to the end token)

        Returns:
            res: list of predicted word indices, at most ``max_len`` long; the
                end token is included when it is predicted.
        """
        res = []

        # Inference only: skip autograd bookkeeping while decoding.
        with torch.no_grad():
            for _ in range(max_len):
                lstm_out, states = self.lstm(inputs, states)  # (1, 1, hidden_size)
                outputs = self.linear(lstm_out.squeeze(dim=1))  # (1, vocab_size)
                _, predicted_idx = outputs.max(dim=1)  # (1,)
                token = predicted_idx.item()
                res.append(token)
                # Stop as soon as the end token is produced.
                if token == end_idx:
                    break
                # Feed the predicted word back in as the next input: (1, 1, embed_size)
                inputs = self.embed(predicted_idx).unsqueeze(1)

        return res


In [8]:

import math
from tqdm.notebook import tqdm
import torch.utils.data as data
from collections import defaultdict

%load_ext autoreload
%autoreload 2

In [9]:


# Root directories of the two Kaggle input datasets (images and annotations).
base_path_dataset = Path("/kaggle/input/mscoco")
base_path_annotation = Path("/kaggle/input/coco-image-caption/annotations_trainval2014")

# Caption annotation JSON files for the train and validation splits.
train_annot_file = base_path_annotation.joinpath("annotations", "captions_train2014.json")
val_annot_file = base_path_annotation.joinpath("annotations", "captions_val2014.json")

# Image folders: each split directory contains a sub-directory of the same name.
train_images_folder, val_images_folder, test_images_folder = (
    base_path_dataset / split / split for split in ("train2014", "val2014", "test2014")
)


In [10]:
# Hyperparameters and run configuration shared by the cells below.
batch_size = 128  # number of captions per training batch
vocab_threshold = 5  # minimum word count threshold
# NOTE(review): the get_loader(...) call below passes vocab_from_file=False
# literally, so this variable is not what controls that call.
vocab_from_file = True  # if True, load existing vocab file
embed_size = 128  # dimensionality of image and word embeddings
hidden_size = 256  # number of features in hidden state of the RNN decoder
num_epochs = 5  # number of training epochs
save_every = 1  # determines frequency of saving model weights
# NOTE(review): print_every and log_file are not referenced by the training
# loop visible in this notebook.
print_every = 20  # determines window for printing average loss
log_file = "/kaggle/working/training_log.txt"  # path to save training log


In [11]:
# Per-channel mean / std used to normalize inputs for the pre-trained model.
norm_mean = (0.485, 0.456, 0.406)
norm_std = (0.229, 0.224, 0.225)

# Training-time image pipeline (random crop and flip act as augmentation).
transform_train = transforms.Compose(
    [
        transforms.Resize(256),  # smaller edge of the image resized to 256
        transforms.RandomCrop(224),  # 224x224 crop from a random location
        transforms.RandomHorizontalFlip(),  # horizontal flip with probability 0.5
        transforms.ToTensor(),  # PIL image -> tensor
        transforms.Normalize(norm_mean, norm_std),
    ]
)

In [12]:
# Build the training DataLoader. With vocab_from_file=False the vocabulary is
# rebuilt from the training captions and pickled to vocab_file, which here
# points at the writable /kaggle/working directory. This cell tokenizes every
# caption (see the progress output below), so it is slow to re-run.
data_loader = get_loader(
    transform=transform_train,
    mode="train",
    batch_size=batch_size,
    vocab_threshold=vocab_threshold,
    vocab_file="/kaggle/working/vocab.pkl",
    start_word="<start>",
    end_word="<end>",
    unk_word="<unk>",
    vocab_from_file=False,
    num_workers=4,
    img_folder=str(train_images_folder),
    annotations_file=str(train_annot_file),
)


loading annotations into memory...
Done (t=1.30s)
creating index...
index created!
[0/414113] Tokenizing captions...
[100000/414113] Tokenizing captions...
[200000/414113] Tokenizing captions...
[300000/414113] Tokenizing captions...
[400000/414113] Tokenizing captions...
loading annotations into memory...
Done (t=0.68s)
creating index...
index created!
Obtaining caption lengths...


  0%|          | 0/414113 [00:00<?, ?it/s]

In [13]:
# ----------- Encoder ------------
class EncoderCNN(nn.Module):
    """Frozen ResNet-34 feature extractor followed by a trainable linear projection.

    NOTE: this repeats the EncoderCNN class defined in an earlier cell.
    """

    def __init__(self, embed_size):
        """
        Args:
            embed_size: output width of the final linear projection
        """
        super().__init__()
        backbone = models.resnet34(pretrained=True)
        # Freeze every backbone weight; only `self.embed` stays trainable.
        backbone.requires_grad_(False)

        # Keep every child except the last one (the classification layer).
        self.resnet = nn.Sequential(*list(backbone.children())[:-1])
        self.embed = nn.Linear(backbone.fc.in_features, embed_size)

    def forward(self, images):
        """Return one ``embed_size`` vector per input image."""
        pooled = self.resnet(images)
        flat = pooled.view(pooled.size(0), -1)
        return self.embed(flat)


# --------- Decoder ----------
class DecoderRNN(nn.Module):
    """LSTM caption decoder: word embedding -> LSTM -> linear scores over the vocabulary.

    NOTE: this repeats the DecoderRNN class defined in an earlier cell; being
    later in the notebook, it is the definition in effect for training.
    """

    def __init__(self, embed_size, hidden_size, vocab_size, num_layers=1):
        """
        Args:
            embed_size: final embedding size of the CNN encoder
            hidden_size: hidden size of the LSTM
            vocab_size: size of the vocabulary
            num_layers: number of layers of the LSTM
        """
        super(DecoderRNN, self).__init__()

        # Assigning hidden dimension
        self.hidden_dim = hidden_size
        # Map each word index to a dense word embedding tensor of embed_size
        self.embed = nn.Embedding(vocab_size, embed_size)
        # Creating LSTM layer
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)
        # Linear layer applied to every LSTM output to score the vocabulary
        self.linear = nn.Linear(hidden_size, vocab_size)
        # Placeholder for the most recent (hidden, cell) state. It is
        # overwritten by forward() and never fed back into the LSTM.
        self.hidden = (torch.zeros(1, 1, hidden_size), torch.zeros(1, 1, hidden_size))

    def forward(self, features, captions):
        """
        Args:
            features: features tensor. shape is (bs, embed_size)
            captions: captions tensor. shape is (bs, cap_length)
        Returns:
            outputs: scores of the linear layer, shape (bs, cap_length, vocab_size)

        """
        # Drop the last token of each caption and embed the rest:
        # (bs, cap_length) -> (bs, cap_length-1, embed_size)
        cap_embedding = self.embed(captions[:, :-1])

        # Prepend the image features as the input at t=0.
        embeddings = torch.cat((features.unsqueeze(dim=1), cap_embedding), dim=1)

        # lstm_out holds the outputs for every time step; self.hidden keeps
        # the final (hidden, cell) state. The LSTM starts from a zero state.
        lstm_out, self.hidden = self.lstm(embeddings)
        outputs = self.linear(lstm_out)  # (bs, cap_length, vocab_size)

        return outputs

    def sample(self, inputs, states=None, max_len=50, end_idx=1):
        """
        Greedily decode a caption from a pre-processed image embedding.

        Args:
            inputs: shape is (1, 1, embed_size)
            states: initial hidden state of the LSTM
            max_len: maximum length of the predicted sentence
            end_idx: token id that stops decoding (default 1, the id the
                Vocabulary class assigns to the end token)

        Returns:
            res: list of predicted word indices, at most ``max_len`` long; the
                end token is included when it is predicted.
        """
        res = []

        # Inference only: skip autograd bookkeeping while decoding.
        with torch.no_grad():
            # Feed each prediction and the LSTM state back in to get the next word.
            for _ in range(max_len):
                lstm_out, states = self.lstm(inputs, states)  # (1, 1, hidden_size)
                outputs = self.linear(lstm_out.squeeze(dim=1))  # (1, vocab_size)
                _, predicted_idx = outputs.max(dim=1)  # (1,)
                token = predicted_idx.item()
                res.append(token)
                # Stop as soon as the end token is produced.
                if token == end_idx:
                    break
                # Next input: (1, embed_size) -> (1, 1, embed_size)
                inputs = self.embed(predicted_idx).unsqueeze(1)

        return res


In [14]:
# Vocabulary size sets the width of the decoder's embedding table and output layer.
vocab_size = len(data_loader.dataset.vocab)
print("vocab size is : ",vocab_size)

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Build the encoder and decoder and move them to the chosen device.
encoder = EncoderCNN(embed_size).to(device)
decoder = DecoderRNN(embed_size, hidden_size, vocab_size).to(device)

# Loss function, placed on the same device as the models.
criterion = nn.CrossEntropyLoss().to(device)

# Learnable parameters of the model: the whole decoder plus the encoder's
# final projection layer.
params = [*decoder.parameters(), *encoder.embed.parameters()]

# Optimizer over those parameters.
optimizer = torch.optim.Adam(params, lr=0.001)

# Total number of training steps per epoch: one pass worth of batches.
num_captions = len(data_loader.dataset)
total_step = math.ceil(num_captions / data_loader.batch_sampler.batch_size)

vocab size is :  8855


Downloading: "https://download.pytorch.org/models/resnet34-b627a593.pth" to /root/.cache/torch/hub/checkpoints/resnet34-b627a593.pth
100%|██████████| 83.3M/83.3M [00:00<00:00, 204MB/s]


In [15]:
# Show how many optimizer steps make up one epoch.
print(total_step)

3236


In [16]:
from torch.cuda.amp import GradScaler, autocast
from tqdm import tqdm

# Directory that receives every checkpoint written below.
model_dir = "/kaggle/working/models"
os.makedirs(model_dir, exist_ok=True)

# Mixed precision only applies on CUDA. Gating it explicitly keeps CPU runs
# in plain fp32 without relying on the library to disable itself.
use_amp = device.type == "cuda"
scaler = GradScaler(enabled=use_amp)

best_loss = float("inf")
best_epoch = 0
save_every = 1  # Save model every epochs

for epoch in range(1, num_epochs + 1):
    epoch_loss = 0
    print(f"Epoch {epoch}/{num_epochs}")

    with tqdm(total=total_step, desc=f"Epoch {epoch}", unit="step") as pbar:
        for i_step in range(1, total_step + 1):

            # Randomly sample a caption length and indices with that length,
            # then point the loader's batch sampler at exactly those indices.
            indices = data_loader.dataset.get_train_indices()
            new_sampler = data.sampler.SubsetRandomSampler(indices=indices)
            data_loader.batch_sampler.sampler = new_sampler

            # Obtain batch data (a fresh iterator yields the single batch above)
            images, captions = next(iter(data_loader))
            images = images.to(device)
            captions = captions.to(device)

            # Zero the gradients
            optimizer.zero_grad()

            # Forward pass under autocast (a no-op when use_amp is False)
            with autocast(enabled=use_amp):
                features = encoder(images)
                outputs = decoder(features, captions)
                loss = criterion(outputs.view(-1, vocab_size), captions.view(-1))

            # Scale loss and backpropagate
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

            # Read the scalar once per step; each .item() call forces a
            # device-to-host synchronization.
            loss_value = loss.item()
            epoch_loss += loss_value

            # Update progress bar
            pbar.set_postfix({"Loss": loss_value, "Perplexity": np.exp(loss_value)})
            pbar.update(1)

    # Average training loss for the epoch
    epoch_loss /= total_step

    # Save the best model if the current epoch's (training) loss is lower.
    # NOTE: these two files pickle the whole modules, unlike the per-epoch
    # state_dict checkpoints below; loading them needs the class definitions.
    if epoch_loss < best_loss:
        best_loss = epoch_loss
        best_epoch = epoch
        torch.save(encoder, os.path.join(model_dir, "best_encoder.pkl"))
        torch.save(decoder, os.path.join(model_dir, "best_decoder.pkl"))

    # Save model weights every 'save_every' epochs
    if epoch % save_every == 0:
        torch.save(encoder.state_dict(), os.path.join(model_dir, f"encoder-{epoch}.pkl"))
        torch.save(decoder.state_dict(), os.path.join(model_dir, f"decoder-{epoch}.pkl"))

print(f"Training complete. Best model saved from epoch {best_epoch} with loss {best_loss:.4f}.")




Epoch 1/5


  with autocast():
Epoch 1: 100%|██████████| 3236/3236 [1:26:28<00:00,  1.60s/step, Loss=2.33, Perplexity=10.3]


Epoch 2/5


Epoch 2: 100%|██████████| 3236/3236 [1:16:34<00:00,  1.42s/step, Loss=2.01, Perplexity=7.48]


Epoch 3/5


Epoch 3: 100%|██████████| 3236/3236 [1:16:49<00:00,  1.42s/step, Loss=2.21, Perplexity=9.15]


Epoch 4/5


Epoch 4: 100%|██████████| 3236/3236 [1:16:11<00:00,  1.41s/step, Loss=2.16, Perplexity=8.71]


Epoch 5/5


Epoch 5: 100%|██████████| 3236/3236 [1:18:43<00:00,  1.46s/step, Loss=2.21, Perplexity=9.11]


Training complete. Best model saved from epoch 5 with loss 2.0569.
