In [13]:
import pandas as pd

dataset = pd.read_csv('/kaggle/input/flickr8k/captions.txt')
dataset.head()

Unnamed: 0,image,caption
0,1000268201_693b08cb0e.jpg,A child in a pink dress is climbing up a set o...
1,1000268201_693b08cb0e.jpg,A girl going into a wooden building .
2,1000268201_693b08cb0e.jpg,A little girl climbing into a wooden playhouse .
3,1000268201_693b08cb0e.jpg,A little girl climbing the stairs to her playh...
4,1000268201_693b08cb0e.jpg,A little girl in a pink dress going into a woo...


In [14]:
import torch, torchvision 
from torchvision import transforms
from PIL import Image

image_transforms = transforms.Compose([
    # Resize the image to 256x256 pixels
    transforms.Resize((256, 256)),
    
    # Optionally, you could add a random crop for data augmentation:
    # transforms.RandomCrop(224),
    
    # Alternatively, center crop to 224x224
    transforms.CenterCrop(224),
    
    # Convert image to tensor (scales pixel values to [0,1])
    transforms.ToTensor(),
    
    # Normalize the image using ImageNet's mean and standard deviation
    transforms.Normalize(mean=[0.485, 0.456, 0.406], 
                         std=[0.229, 0.224, 0.225])
])

train_data = []
epoch = len(dataset)

cnt = 0

for i in range(0, epoch, 1):
    if i % 5 == 0:
        cur_path = '/kaggle/input/flickr8k/Images/'
        cur_img_path = dataset['image'][i]
    
        img_path = cur_path + cur_img_path
    
        image = Image.open(img_path).convert("RGB")  # Ensure image is in RGB
    
        preprocessed_image = image_transforms(image)

    train_data.append([preprocessed_image, dataset['caption'][i]])
    
len(train_data)

40455

In [15]:
vocab = {}
num_words = 4

vocab['<sos>'] = 0
vocab['<eos>'] = 1
vocab['<unk>'] = 2
vocab['<pad>'] = 3

for img, sentence in train_data:
    for word in sentence.split():
        word = word.lower()
        if word not in vocab:
            vocab[word] = num_words
            num_words += 1

len(vocab)

8922

In [16]:
text_encodings = []
img_data = []

def encode(sentence, max_length):
    output = []
    output.append(vocab['<sos>'])
    
    for word in sentence.split():
        output.append(vocab[word.lower()])
        
    output.append(vocab['<eos>'])
    
    for i in range(len(output), max_length):
        output.append(vocab['<pad>'])

    return output


for img, sentence in train_data:
    img_data.append(img)
    text_encodings.append(encode(sentence, 50))

In [17]:
import torch
from torch.utils.data import Dataset, DataLoader

class ImageCaptioningDataset(Dataset):
    def __init__(self, captions, inp):
        self.inp = inp
        self.captions = captions

    def __len__(self):
        return len(self.captions)

    def __getitem__(self, idx):
        return torch.tensor(self.inp[idx]), torch.tensor(self.captions[idx])

train_idx = int(0.8 * len(train_data))
val_idx = int(0.9 * len(train_data))

train_dataset = ImageCaptioningDataset(text_encodings[:train_idx], img_data[:train_idx])
val_dataset = ImageCaptioningDataset(text_encodings[train_idx : val_idx], img_data[train_idx : val_idx])
test_dataset = ImageCaptioningDataset(text_encodings[val_idx :], img_data[val_idx :])

type(train_dataset), train_dataset

(__main__.ImageCaptioningDataset,
 <__main__.ImageCaptioningDataset at 0x7b18eb775ed0>)

In [18]:
BATCH_SIZE = 32 * 4

train_dataloader = DataLoader(train_dataset, batch_size= BATCH_SIZE, shuffle= True)
val_dataloader = DataLoader(val_dataset, batch_size= BATCH_SIZE, shuffle= True)
test_dataloader = DataLoader(test_dataset, batch_size= BATCH_SIZE, shuffle= True)

In [19]:
from torch import nn
from torchvision import models
import torch.nn.init as init

class EncoderCNN(nn.Module):
    def __init__(self, embed_size):
        super().__init__()
        resnet = models.resnet50(pretrained=True)
        for param in resnet.parameters():
            param.requires_grad_(False)

        modules = list(resnet.children())[:-1]
        self.resnet = nn.Sequential(*modules)
        self.embed = nn.Linear(resnet.fc.in_features, embed_size)
        self.batch = nn.BatchNorm1d(embed_size, momentum=0.01)
        
        # Use nn.init functions to initialize parameters safely.
        init.normal_(self.embed.weight, mean=0.0, std=0.02)
        init.constant_(self.embed.bias, 0)

    def forward(self, images):
        features = self.resnet(images)
        features = features.view(features.size(0), -1)
        features = self.batch(self.embed(features))
        return features


In [20]:
embed_size = 256
vocab_size = len(vocab)

device = 'cuda' if torch.cuda.is_available() else 'cpu'

dummy_inp = torch.randn([5, 3, 224, 224])
encoder = EncoderCNN(embed_size)
encoder.eval()
output = encoder(dummy_inp)
output.shape



torch.Size([5, 256])

In [21]:
class DecoderRNN(nn.Module):
  def __init__(self, embed_size, hidden_size, vocab_size):
    super(DecoderRNN, self).__init__()
    self.embed = nn.Embedding(vocab_size, embed_size)
    self.lstm = nn.LSTM(embed_size, hidden_size)
    self.linear = nn.Linear(hidden_size, vocab_size)
    self.dropout = nn.Dropout(0.5)
    
  def forward(self, features, captions):
    embeddings = self.dropout(self.embed(captions))
    embeddings = embeddings.transpose(0, 1)
    embeddings = torch.cat((features.unsqueeze(0), embeddings), dim=0)
    
    hiddens, _ = self.lstm(embeddings)
    outputs = self.linear(hiddens)
      
    return outputs

# Instantiate the decoder
decoder = DecoderRNN(embed_size, 256, vocab_size)

# Correctly create dummy_inp2 using torch.randint:
dummy_inp2 = torch.randint(0, vocab_size, (5, 50))

t_output = decoder(output, dummy_inp2)

t_output.shape

torch.Size([51, 5, 8922])

In [22]:
class Encoder_Decoder(nn.Module):
    def __init__(self, embed_size, hidden_size, vocab_size):
        super().__init__()
        self.CNN = EncoderCNN(embed_size)
        self.RNN = DecoderRNN(embed_size, hidden_size, vocab_size)

    def forward(self, images, captions):
        features = self.CNN(images)
        output = self.RNN(features, captions)

        return output

    def sample(self, images, max_len=50):
        features = self.CNN(images)  # shape: (batch, embed_size)
        batch_size = features.size(0)
        sampled_ids = []  # to store predicted word indices

        inputs = features.unsqueeze(0)  # shape: (1, batch, embed_size)
        states = None

        for t in range(max_len):
            hiddens, states = self.RNN.lstm(inputs, states)  # hiddens: (1, batch, hidden_size)
            
            logits = self.RNN.linear(hiddens.squeeze(0))      # shape: (batch, vocab_size)
            
            predicted = logits.argmax(dim=1)                  # shape: (batch,)
            sampled_ids.append(predicted)

            inputs = self.RNN.embed(predicted)                # shape: (batch, embed_size)
            inputs = inputs.unsqueeze(0)                      # shape: (1, batch, embed_size)

        sampled_ids = torch.stack(sampled_ids, dim=1)  # shape: (batch, max_len)
        return sampled_ids

model = Encoder_Decoder(embed_size, 256, vocab_size).to(device)


In [23]:
from torch import optim
import torch.nn as nn
import torch

def evaluate_loss(model, val_dataloader, criterion, device):
    model.eval()
    total_loss = 0.0
    with torch.no_grad():
        for images, captions in val_dataloader:
            images = images.to(device)
            captions = captions.to(device)
            
            # Prepare input and target captions
            input_captions = captions[:, :-1]
            target_captions = captions[:, 1:]
            
            outputs = model(images, input_captions)
            outputs = outputs[1:].transpose(0, 1)  # shape: (batch, seq_len, vocab_size)
            
            loss = criterion(outputs.reshape(-1, outputs.size(-1)),
                             target_captions.reshape(-1))
            total_loss += loss.item()
    
    avg_loss = total_loss / len(val_dataloader)
    model.train()  # back to training mode if needed
    return avg_loss
    
def train(model, train_dataloader, epochs, lr, device):
    criterion = nn.CrossEntropyLoss(ignore_index=3)
    
    params = list(model.RNN.parameters()) + list(model.CNN.embed.parameters()) + list(model.CNN.batch.parameters())
    
    optimizer = optim.Adam(params, lr=lr)
    
    model.train() 
    for epoch in range(epochs):
        total_loss = 0.0
        for batch_idx, (images, captions) in enumerate(train_dataloader):
            images = images.to(device)         # shape: (batch, 3, 224, 224)
            captions = captions.to(device)       # shape: (batch, seq_len)
            
            input_captions = captions[:, :-1]    # shape: (batch, seq_len-1)
            target_captions = captions[:, 1:]      # shape: (batch, seq_len-1)
            
            optimizer.zero_grad()
            
            outputs = model(images, input_captions)
            #print(outputs.shape)
            outputs = outputs[1:]  # shape now: (seq_len, batch, vocab_size)
            
            outputs = outputs.transpose(0, 1)
            
            loss = criterion(outputs.reshape(-1, outputs.size(-1)),
                             target_captions.reshape(-1))
            
            loss.backward()
            optimizer.step()
            
            total_loss += loss.item()
            if (batch_idx + 1) % 10 == 0:
                print(f"Epoch [{epoch+1}/{epochs}], Step [{batch_idx+1}/{len(train_dataloader)}], Loss: {loss.item():.4f}")

        val_loss = evaluate_loss(model, val_dataloader, nn.CrossEntropyLoss(ignore_index=vocab['<pad>']), device)
        print(f"Validation Loss: {val_loss:.4f}")
        
        avg_loss = total_loss / len(train_dataloader)
        print(f"Epoch [{epoch+1}/{epochs}] Average Loss: {avg_loss:.4f}")

In [24]:
train(model, train_dataloader, 15, 1e-3, device)

  return torch.tensor(self.inp[idx]), torch.tensor(self.captions[idx])


Epoch [1/15], Step [10/253], Loss: 8.2417
Epoch [1/15], Step [20/253], Loss: 5.4260
Epoch [1/15], Step [30/253], Loss: 5.1128
Epoch [1/15], Step [40/253], Loss: 4.8478
Epoch [1/15], Step [50/253], Loss: 4.8278
Epoch [1/15], Step [60/253], Loss: 4.6696
Epoch [1/15], Step [70/253], Loss: 4.4515
Epoch [1/15], Step [80/253], Loss: 4.3581
Epoch [1/15], Step [90/253], Loss: 4.4524
Epoch [1/15], Step [100/253], Loss: 4.2672
Epoch [1/15], Step [110/253], Loss: 4.1993
Epoch [1/15], Step [120/253], Loss: 4.1561
Epoch [1/15], Step [130/253], Loss: 4.1819
Epoch [1/15], Step [140/253], Loss: 4.1793
Epoch [1/15], Step [150/253], Loss: 3.9672
Epoch [1/15], Step [160/253], Loss: 3.9160
Epoch [1/15], Step [170/253], Loss: 3.8892
Epoch [1/15], Step [180/253], Loss: 3.7159
Epoch [1/15], Step [190/253], Loss: 4.0411
Epoch [1/15], Step [200/253], Loss: 3.9622
Epoch [1/15], Step [210/253], Loss: 3.8126
Epoch [1/15], Step [220/253], Loss: 3.7644
Epoch [1/15], Step [230/253], Loss: 3.8020
Epoch [1/15], Step [

In [25]:
model.eval()
img = img_data[0]
img = img.to(device)
img = img.unsqueeze(0)  # Now shape becomes [1, 3, 224, 224]
img.shape
img_output = model.sample(img)

In [26]:
img_output

tensor([[  4,  19,   6,   4,   7,   8,   9, 146,   6,  60,  13,   4,  22, 500,
          18,   1,   1,  18,   1,   1,   1,  18,   1,   1,  18,   1,   1,  18,
           1,   1,  18,   1,   1,  18,   1,   1,  18,   1,   1,  18,   1,   1,
          18,   1,   1,  18,   1,   1,  18,   1]], device='cuda:0')

In [30]:
sentence = []
img_output = img_output.squeeze()

idx_to_words = {}
for key, val in vocab.items():
    idx_to_words[val] = key
    

In [37]:
for element in img_output.tolist():
    sentence.append(idx_to_words[element])

sentence

['a',
 'girl',
 'in',
 'a',
 'pink',
 'dress',
 'is',
 'standing',
 'in',
 'front',
 'of',
 'a',
 'wooden',
 'door',
 '.',
 '<eos>',
 '<eos>',
 '.',
 '<eos>',
 '<eos>',
 '<eos>',
 '.',
 '<eos>',
 '<eos>',
 '.',
 '<eos>',
 '<eos>',
 '.',
 '<eos>',
 '<eos>',
 '.',
 '<eos>',
 '<eos>',
 '.',
 '<eos>',
 '<eos>',
 '.',
 '<eos>',
 '<eos>',
 '.',
 '<eos>',
 '<eos>',
 '.',
 '<eos>',
 '<eos>',
 '.',
 '<eos>',
 '<eos>',
 '.',
 '<eos>']