<a href="https://colab.research.google.com/github/Busola181/Deep-Learning-CNN-Projects/blob/main/Image_captioning_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

MOUNTING DRIVE

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


EXTRACTING DATASET

In [None]:
API_KEY_PATH="/content/drive/MyDrive/kaggle.json"

!mkdir -p ~/.kaggle
!cp $API_KEY_PATH ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json
!kaggle datasets download -d kunalgupta2616/flickr-8k-images-with-captions

RESOURCES_PATH="/content/flickr-image-dataset.zip"
!cp $RESOURCES_PATH.
!unzip /content/flickr-8k-images-with-captions.zip -d /content/new_folder

IMPORT LIBRARIES

In [None]:
!pip install --upgrade sympy
import pandas as pd
import os
import torch
import torch.nn as nn
from torch.utils.data import random_split, DataLoader, Dataset
import zipfile
import re
import nltk
import torch.optim as optim
from torchvision import models
from PIL import Image
from torchvision.models import resnet50, ResNet50_Weights
from torchsummary import summary
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
from torch.nn.utils.rnn import pad_sequence
from torchvision.io import read_image
import torchvision.transforms as transforms
import torch.nn.functional as F

nltk.data.path.append('/usr/local/share/nltk_data')
nltk.download('wordnet', download_dir='/usr/local/share/nltk_data')
nltk.download('stopwords')
nltk.download('punkt')
nltk.download('punkt_tab')

captions_path = "/content/new_folder/captions.txt"

In [None]:
data = {'image_name': [], 'caption': []}
with open(captions_path, 'r') as f:
    for idx, line in enumerate(f):
        if idx == 0:
            continue
        parts = [line.strip()[:line.find(',')], line.strip()[line.find(',')+1:].replace('"', '')]
        if len(parts) == 2:
            image_name, caption = parts
            data['image_name'].append(image_name)
            data['caption'].append(caption)
# print(data)

df = pd.DataFrame(data)

# texts = " ".join(df['caption'].astype(str).tolist())

texts = ""
for caption in data["caption"]:
    texts += caption
cleaned_texts = re.sub(r'[^a-zA-Z\s]', ' ', texts).lower()
words = word_tokenize(cleaned_texts)

lemmatizer = WordNetLemmatizer()
lemmatized_words = [lemmatizer.lemmatize(word) for word in words]

vocab = sorted(set(lemmatized_words))
vocab.insert(0, '<PAD>')
vocab.insert(1, '<BOS>')
vocab.insert(2, '<EOS>')
vocab.insert(3, '<UNK>')

word_to_idx = {word: idx for idx, word in enumerate(vocab)}
idx_to_word = {idx: word for idx, word in enumerate(vocab)}

print("Vocabulary Size:", len(vocab))
print("Sample Vocabulary:", vocab[:20])

Vocabulary Size: 9094
Sample Vocabulary: ['<PAD>', '<BOS>', '<EOS>', '<UNK>', 'a', 'aa', 'abandon', 'abandoned', 'abarrotes', 'abdomen', 'ability', 'aboard', 'aboriginal', 'about', 'above', 'abseiling', 'abspedestrians', 'ac', 'accelerates', 'accends']


HYPER PARAMETERS

In [None]:
embed_size = 1024
hidden_size = 1024
vocab_size = len(vocab)
num_layers = 3
learning_rate = 1e-4
epochs = 100
num_workers = 2
batch_size = 64
max_caption_length = 50
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

cuda


IMAGE DATA PREPARATION

In [None]:
class ICDataset(Dataset):
    img_base_folder = "/content/new_folder/Images"
    def __init__(self, captions_file, transforms=None):
        super(ICDataset, self).__init__()
        self.dataset_metadata = self.load_captions(captions_file)
        self.transforms = transforms
        self.max_caption_length = 50

    def load_captions(self, captions_file):
         metadata = []
         with open(captions_file, 'r') as f:
            for idx, line in enumerate(f):
                if idx == 0:
                    continue
                parts = [line.strip()[:line.find(',')], line.strip()[line.find(',')+1:].replace('"', '')]
                if len(parts) == 2:
                    filename, caption = parts
                    filename = filename.split("#")[0]
                    metadata.append([filename, caption])
                else:
                    print(f"Skipping malformed line: {line.strip()}")
         return metadata

    def __getitem__(self, idx):
        file_name, caption = self.dataset_metadata[idx]
        image_path = os.path.join(self.img_base_folder, file_name)
        image = Image.open(image_path).convert("RGB")
        if self.transforms:
            image = self.transforms(image)
        target = self.parse_caption(caption)
        return image, target

    def parse_caption(self, caption=""):
        cleaned_text = re.sub(r'[^a-zA-Z\s]', ' ', caption).lower()
        words = word_tokenize(cleaned_text)
        lemmatized_words = [lemmatizer.lemmatize(word) for word in words]
        input_indices = [word_to_idx.get(word, word_to_idx['<UNK>']) for word in lemmatized_words]
        input_vector = torch.tensor(input_indices)
        if len(input_vector) > self.max_caption_length:
            input_vector = input_vector[:self.max_caption_length]
        else:
            padding = [word_to_idx['<PAD>']] * (self.max_caption_length - len(input_vector))
            input_vector = torch.cat((input_vector, torch.tensor(padding)))
        return input_vector

    def __len__(self):
        return len(self.dataset_metadata)

transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])
dataset = ICDataset(captions_path, transforms=transform)
print(dataset[0])
print(len(dataset))
train_ds, valid_ds, test_ds = random_split(dataset, [28318, 6069, 6068])

train_dl = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
val_dl = DataLoader(valid_ds, batch_size=batch_size, shuffle=False)
test_dl = DataLoader(test_ds, batch_size=batch_size, shuffle=False)

print("DataLoader setup complete.")



(tensor([[[0.3216, 0.4353, 0.4549,  ..., 0.0157, 0.0235, 0.0235],
         [0.3098, 0.4431, 0.4667,  ..., 0.0314, 0.0275, 0.0471],
         [0.3020, 0.4588, 0.4745,  ..., 0.0314, 0.0275, 0.0392],
         ...,
         [0.7294, 0.5882, 0.6706,  ..., 0.8314, 0.6471, 0.6471],
         [0.6902, 0.6941, 0.8627,  ..., 0.8235, 0.6588, 0.6588],
         [0.8118, 0.8196, 0.7333,  ..., 0.8039, 0.6549, 0.6627]],

        [[0.3412, 0.5020, 0.5255,  ..., 0.0118, 0.0235, 0.0314],
         [0.3294, 0.5059, 0.5412,  ..., 0.0353, 0.0392, 0.0824],
         [0.3098, 0.5176, 0.5529,  ..., 0.0353, 0.0510, 0.0863],
         ...,
         [0.4235, 0.3137, 0.4784,  ..., 0.8667, 0.7255, 0.7216],
         [0.3765, 0.5059, 0.6627,  ..., 0.8549, 0.7216, 0.7216],
         [0.4941, 0.5804, 0.4784,  ..., 0.8392, 0.7216, 0.7216]],

        [[0.3804, 0.4902, 0.4980,  ..., 0.0118, 0.0157, 0.0196],
         [0.3608, 0.5059, 0.5176,  ..., 0.0275, 0.0235, 0.0235],
         [0.3647, 0.5255, 0.5333,  ..., 0.0196, 0.0235, 0

In [None]:
img_base_folder = "/content/new_folder/Images"

TRANSFER LEARNING

In [None]:
class EncoderCNN(nn.Module):
    def __init__(self, embed_size):
        super(EncoderCNN, self).__init__()
        resnet = models.resnet50(pretrained=True)
        for param in resnet.parameters():
            param.requires_grad_(False)

        modules = list(resnet.children())[:-1]
        self.resnet = nn.Sequential(*modules)
        self.embed = nn.Linear(resnet.fc.in_features, embed_size)

    def forward(self, images):
        features = self.resnet(images)
        features = features.view(features.size(0), -1)
        features = self.embed(features)
        return features


class DecoderRNN(nn.Module):
    def __init__(self, embed_size, hidden_size, vocab_size, num_layers=1):
        super(DecoderRNN, self).__init__()

        self.hidden_dim = hidden_size
        self.embed = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)
        self.linear = nn.Linear(hidden_size, vocab_size)
        self.hidden = (torch.zeros(1, 1, hidden_size), torch.zeros(1, 1, hidden_size))

    def forward(self, features, captions):
        cap_embedding = self.embed(
            captions[:, :-1]
        )
        embeddings = torch.cat((features.unsqueeze(dim=1), cap_embedding), dim=1)
        lstm_out, self.hidden = self.lstm(
            embeddings
        )
        outputs = self.linear(lstm_out)

        return outputs

    def sample(self, inputs, states=None, max_len=20):
        res = []

        for i in range(max_len):
            lstm_out, states = self.lstm(
                inputs, states
            )
            outputs = self.linear(lstm_out.squeeze(dim=1))
            _, predicted_idx = outputs.max(dim=1)
            res.append(predicted_idx.item())
            if predicted_idx == 1:
                break
            inputs = self.embed(predicted_idx)
            inputs = inputs.unsqueeze(1)

        return res

DATASETS LENGTH

In [None]:
print(len(train_ds))
print(len(valid_ds))
print(len(test_ds))

28318
6069
6068


TRANSFER LEARNING {LOADING THE PRETRAINED MODEL, FEATURE VECTOR EXTRACTION}

In [None]:
class CNNtoRNN_attention(nn.Module):
    def __init__(self, embed_size, hidden_size, vocab_size, num_layers=3):
        super(CNNtoRNN_attention, self).__init__()
        self.num_layers = num_layers
        self.hidden_size = hidden_size

        self.lstm = nn.LSTM(embed_size + hidden_size, hidden_size, num_layers, batch_first=True)
        self.attention = nn.Linear(hidden_size + embed_size, 1)
        self.fc = nn.Linear(hidden_size, vocab_size)
        self.embed = nn.Embedding(vocab_size, embed_size)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.2)

    def calculate_attention(self, features, hidden):
        if features.dim() == 2:
            features = features.unsqueeze(1)

        hidden = hidden.unsqueeze(1).expand(-1, features.size(1), -1)

        combined = torch.cat((features, hidden), dim=2)
        attention_scores = self.attention(combined)
        attention_weights = F.softmax(attention_scores, dim=1)
        return attention_weights

    def forward(self, features, captions):
        embeddings = self.dropout(self.embed(captions))

        hidden_state = torch.zeros((self.num_layers, features.size(0), self.hidden_size)).to(features.device)
        cell_state = torch.zeros((self.num_layers, features.size(0), self.hidden_size)).to(features.device)

        outputs = []
        for t in range(captions.size(1)):
            attention_weights = self.calculate_attention(features, hidden_state[-1])
            context_vector = torch.sum(attention_weights * features, dim=1)

            lstm_input = torch.cat((context_vector.unsqueeze(1), embeddings[:, t].unsqueeze(1)), dim=2)

            output, (hidden_state, cell_state) = self.lstm(lstm_input, (hidden_state, cell_state))
            output = self.fc(output.squeeze(1))
            outputs.append(output)

        outputs = torch.stack(outputs, dim=1)
        return outputs

    def sample(self, features, max_len=20):
        """Generates a caption for an input feature vector."""
        sampled_indices = []
        hidden_state = torch.zeros((self.num_layers, 1, self.hidden_size)).to(features.device)
        cell_state = torch.zeros((self.num_layers, 1, self.hidden_size)).to(features.device)

        input_word = torch.tensor([word_to_idx['<BOS>']]).to(features.device)  # Start with the beginning-of-sequence token
        input_word = self.embed(input_word).unsqueeze(1)  # Embedding and add batch dimension

        for _ in range(max_len):
            attention_weights = self.calculate_attention(features, hidden_state[-1])
            context_vector = torch.sum(attention_weights * features, dim=1).unsqueeze(1)
            lstm_input = torch.cat((context_vector, input_word), dim=2)

            output, (hidden_state, cell_state) = self.lstm(lstm_input, (hidden_state, cell_state))
            output = self.fc(output.squeeze(1))
            predicted_idx = output.argmax(dim=1).item()

            sampled_indices.append(predicted_idx)

            if predicted_idx == word_to_idx['<EOS>']:
                break

            input_word = self.embed(torch.tensor([predicted_idx]).to(features.device)).unsqueeze(1)

        return sampled_indices




ATTENTION MECHANISM

In [None]:
encoder = EncoderCNN(embed_size).to(device)
decoder = CNNtoRNN_attention(embed_size, hidden_size, vocab_size, num_layers).to(device)
model = CNNtoRNN_attention(embed_size,hidden_size,vocab_size,num_layers).to(device)
criterion = nn.CrossEntropyLoss(ignore_index= word_to_idx['<PAD>'])
params = list(encoder.parameters()) + list(decoder.parameters())
optimizer = optim.Adam(params, lr= learning_rate)


Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to /root/.cache/torch/hub/checkpoints/resnet50-0676ba61.pth
100%|██████████| 97.8M/97.8M [00:00<00:00, 137MB/s]


TRAINING THE DATASET

In [None]:
checkpoint_dir = '/content/drive/MyDrive/checkpoints'
os.makedirs(checkpoint_dir, exist_ok=True)


# Define the checkpoint
checkpoint_path = os.path.join(checkpoint_dir, 'model_checkpoint.pth')

In [None]:
checkpoint_path = '/content/drive/MyDrive/checkpoints/model_checkpoint.pth'

In [None]:
def save_checkpoint(encoder, decoder, epoch, optimizer, train_loss, val_loss, checkpoint_path):
    checkpoint = {
                'epoch': epoch + 1,
                'encoder_state_dict': encoder.state_dict(),
                'decoder_state_dict': decoder.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'train_loss': train_loss,
                'val_loss': val_loss
            }
    torch.save(checkpoint, checkpoint_path)
    print(f'checkpoint saved at {epoch + 1}')

def load_checkpoint(encoder, decoder, optimizer, checkpoint_path, device):
    if os.path.exists(checkpoint_path):
        checkpoint = torch.load(checkpoint_path, map_location=device)
        encoder.load_state_dict(checkpoint['encoder_state_dict'])
        decoder.load_state_dict(checkpoint['decoder_state_dict'])
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        epoch = checkpoint['epoch']
        train_loss = checkpoint['train_loss']
        val_loss = checkpoint['val_loss']
        print(f"Resuming from epoch {epoch + 1} with train loss {train_loss:.4f} and val loss {val_loss:.4f}")
        return epoch + 1
    else:
        print("No checkpoint starting from scratch")
        return 0

def train_captioning(encoder, decoder, train_dl, val_dl, epochs, criterion, optimizer, device, vocab_size, checkpoint_path):
    encoder.to(device)
    decoder.to(device)

    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)

    start_epoch = load_checkpoint(encoder, decoder, optimizer, checkpoint_path, device)


    for epoch in range(start_epoch, epochs):
        encoder.train()
        decoder.train()
        epoch_train_loss = 0
        correct_train = 0
        total_train = 0

        for images, captions in train_dl:
            images, captions = images.to(device), captions.to(device)

            optimizer.zero_grad()
            features = encoder(images)
            outputs = decoder(features, captions[:, :-1])


            loss = criterion(outputs.view(-1, vocab_size), captions[:, 1:].reshape(-1))
            loss.backward()
            optimizer.step()
            epoch_train_loss += loss.item()

            _, predicted = torch.max(outputs, 2)
            correct_train += (predicted == captions[:, 1:]).sum().item()
            total_train += captions[:, 1:].numel()

        avg_train_loss = epoch_train_loss / len(train_dl)
        train_accuracy = 100 * correct_train / total_train

        encoder.eval()
        decoder.eval()
        epoch_val_loss = 0
        correct_val = 0
        total_val = 0

        with torch.no_grad():
            for images, captions in val_dl:
                images, captions = images.to(device), captions.to(device)
                features = encoder(images)
                outputs = decoder(features, captions[:, :-1])

                loss = criterion(outputs.view(-1, vocab_size), captions[:, 1:].reshape(-1))
                epoch_val_loss += loss.item()

                _, predicted = torch.max(outputs, 2)
                correct_val += (predicted == captions[:, 1:]).sum().item()
                total_val += captions[:, 1:].numel()

        avg_val_loss = epoch_val_loss / len(val_dl)
        val_accuracy = 100 * correct_val / total_val

        if (epoch + 1) % 1 == 0:
            print(f'Epoch {epoch + 1}/{epochs}, Train Loss: {avg_train_loss:.4f}, Train Acc: {train_accuracy:.2f}%, '
                  f'Val Loss: {avg_val_loss:.4f}, Val Acc: {val_accuracy:.2f}%')
            save_checkpoint(encoder, decoder,epoch, optimizer, avg_train_loss, avg_val_loss, checkpoint_path)

            scheduler.step()


In [None]:
train_captioning(encoder, decoder, train_dl, val_dl, epochs, criterion, optimizer, device, vocab_size, checkpoint_path)

  checkpoint = torch.load(checkpoint_path, map_location=device)


Resuming from epoch 59 with train loss 5.2965 and val loss 5.2983
Epoch 60/100, Train Loss: 5.2994, Train Acc: 2.39%, Val Loss: 5.2843, Val Acc: 2.39%
checkpoint saved at 60
Epoch 61/100, Train Loss: 5.2990, Train Acc: 2.39%, Val Loss: 5.2844, Val Acc: 2.39%
checkpoint saved at 61
Epoch 62/100, Train Loss: 5.2982, Train Acc: 2.39%, Val Loss: 5.2844, Val Acc: 2.39%
checkpoint saved at 62
Epoch 63/100, Train Loss: 5.2983, Train Acc: 2.39%, Val Loss: 5.2844, Val Acc: 2.39%
checkpoint saved at 63
Epoch 64/100, Train Loss: 5.2985, Train Acc: 2.39%, Val Loss: 5.2845, Val Acc: 2.39%
checkpoint saved at 64
Epoch 65/100, Train Loss: 5.2986, Train Acc: 2.39%, Val Loss: 5.2845, Val Acc: 2.39%
checkpoint saved at 65
Epoch 66/100, Train Loss: 5.2988, Train Acc: 2.39%, Val Loss: 5.2846, Val Acc: 2.39%
checkpoint saved at 66
Epoch 67/100, Train Loss: 5.2987, Train Acc: 2.39%, Val Loss: 5.2846, Val Acc: 2.39%
checkpoint saved at 67
Epoch 68/100, Train Loss: 5.2985, Train Acc: 2.39%, Val Loss: 5.2847, 

In [None]:
image_path = '/content/new_folder/Images/1001773457_577c3a7d70.jpg'

INFERENCE FUNCTION

In [None]:
from nltk.translate.bleu_score import sentence_bleu
import matplotlib.pyplot as plt
from PIL import Image


transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor()])

def evaluate_model(encoder, decoder, test_dl, criterion, vocab_size, max_caption_length, device, idx_to_word, word_to_idx, image_path=image_path, transforms=transform):


    encoder.eval()
    decoder.eval()
    test_loss = 0
    correct_test = 0
    total_test = 0
    generated_captions = []
    ground_truth_captions = []

    # If an image_path is provided, generate a caption for it
    if image_path:
        image = Image.open(image_path).convert("RGB")
        if transforms:
            image = transforms(image)
        image = image.unsqueeze(0).to(device)

        with torch.no_grad():
            features = encoder(image)
            sampled_caption = decoder.sample(features, max_len=max_caption_length)

        caption_text = ' '.join([idx_to_word[idx] for idx in sampled_caption if idx not in [word_to_idx['<PAD>'], word_to_idx['<BOS>'], word_to_idx['<EOS>']]])

        plt.imshow(Image.open(image_path))
        #plt.figure(figsize=(8, 8))
        plt.title("Generated Caption: " + caption_text)
        plt.axis('off')
        plt.show()
        return

    # Otherwise, evaluate on the entire test dataset
    with torch.no_grad():
        for images, captions in test_dl:
            images, captions = images.to(device), captions.to(device)
            features = encoder(images)
            outputs = decoder(features, captions[:, :-1])

            # Calculate loss and accuracy
            loss = criterion(outputs.view(-1, vocab_size), captions[:, 1:].reshape(-1))
            test_loss += loss.item()

            _, predicted = torch.max(outputs, 2)
            correct_test += (predicted == captions[:, 1:]).sum().item()
            total_test += captions[:, 1:].numel()

            # Store generated and ground truth captions for BLEU score calculation
            for i in range(images.size(0)):
                generated_caption = decoder.sample(features[i].unsqueeze(0), max_len=max_caption_length)
                generated_text = ' '.join([idx_to_word[idx] for idx in generated_caption])
                ground_truth_text = ' '.join([idx_to_word[idx] for idx in captions[i].cpu().numpy() if idx not in [word_to_idx['<PAD>'], word_to_idx['<BOS>'], word_to_idx['<EOS>']]])

                generated_captions.append(generated_text.split())
                ground_truth_captions.append([ground_truth_text.split()])

    # Average test loss and accuracy
    avg_test_loss = test_loss / len(test_dl)
    test_accuracy = 100 * correct_test / total_test
    print(f"Test Loss: {avg_test_loss:.4f}, Test Accuracy: {test_accuracy:.2f}%")

    bleu_score = sum([sentence_bleu(gt, pred) for gt, pred in zip(ground_truth_captions, generated_captions)]) / len(generated_captions)
    print(f"BLEU Score: {bleu_score:.4f}")

    for i in range(5):
        img, caption = next(iter(test_dl))
        img, caption = img.to(device), caption.to(device)

        features = encoder(img)
        sampled_caption = decoder.sample(features[0].unsqueeze(0), max_len=max_caption_length)
        caption_text = ' '.join([idx_to_word[idx] for idx in sampled_caption])

        plt.imshow(img[0].cpu().permute(1, 2, 0))
        plt.title("Generated Caption: " + caption_text)
        plt.axis('off')
        plt.show()


In [None]:
evaluate_model(encoder, decoder, test_dl, criterion, vocab_size, max_caption_length, device, idx_to_word, word_to_idx, image_path="/content/new_folder/Images/1001773457_577c3a7d70.jpg", transforms=transform)
