In [None]:
!wget https://storage.googleapis.com/4995-dlcv-project-data/Flickr8k.zip

--2023-12-04 17:45:00--  https://storage.googleapis.com/4995-dlcv-project-data/Flickr8k.zip
Resolving storage.googleapis.com (storage.googleapis.com)... 142.250.141.207, 142.251.2.207, 2607:f8b0:4023:c0d::cf
Connecting to storage.googleapis.com (storage.googleapis.com)|142.250.141.207|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1112850179 (1.0G) [application/zip]
Saving to: ‘Flickr8k.zip.1’


2023-12-04 17:45:10 (104 MB/s) - ‘Flickr8k.zip.1’ saved [1112850179/1112850179]



In [None]:
!mkdir -p "/content/flickr8k"

In [None]:
!unzip -q "/content/Flickr8k.zip" -d "/content/flickr8k"

replace /content/flickr8k/captions.txt? [y]es, [n]o, [A]ll, [N]one, [r]ename: 

In [None]:
import os
from collections import defaultdict
import glob
import random
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import numpy as np

In [None]:
# Count data
flickr8k_data = glob.glob('/content/flickr8k/Images/*.jpg')
print(f"count of Flickr8k images :  {len(flickr8k_data)}")

count of Flickr8k images :  8091


In [None]:
# Use GPU
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Use {DEVICE} device")

Use cuda device


### Caption Processing

In [None]:
# Create a dictionary that has image name as key and all 5 captions as value
def read_image_captions(filename):
    image_descriptions = defaultdict(list)
    with open(filename,'r') as file_list:
        next(file_list)
        for line in file_list:
            line = line.strip()
            img_caption_list = line.split(".jpg,")
            img_name, captions = img_caption_list[0] + ".jpg", img_caption_list[1]
            caption_list = ["<START>"] + captions.lower().split(" ") + ["<END>"]
            image_descriptions[img_name].append(caption_list)
    return image_descriptions

In [None]:
descriptions = read_image_captions("/content/flickr8k/captions.txt")

In [None]:
print(descriptions["1001773457_577c3a7d70.jpg"])

[['<START>', 'a', 'black', 'dog', 'and', 'a', 'spotted', 'dog', 'are', 'fighting', '<END>'], ['<START>', 'a', 'black', 'dog', 'and', 'a', 'tri-colored', 'dog', 'playing', 'with', 'each', 'other', 'on', 'the', 'road', '.', '<END>'], ['<START>', 'a', 'black', 'dog', 'and', 'a', 'white', 'dog', 'with', 'brown', 'spots', 'are', 'staring', 'at', 'each', 'other', 'in', 'the', 'street', '.', '<END>'], ['<START>', 'two', 'dogs', 'of', 'different', 'breeds', 'looking', 'at', 'each', 'other', 'on', 'the', 'road', '.', '<END>'], ['<START>', 'two', 'dogs', 'on', 'pavement', 'moving', 'toward', 'each', 'other', '.', '<END>']]


In [None]:
# Load the features and IDs
loaded_features_list = np.load('/content/features.npy', allow_pickle=True)
loaded_ids_list = np.load('/content/ids.npy', allow_pickle=True)
# Recreate the dictionary
loaded_features_dict = dict(zip(loaded_ids_list, loaded_features_list))

In [None]:
# Split the dataset so that train : validation : test is 70 : 15 : 15
image_names = list(descriptions.keys())
random.shuffle(image_names)
total_images = len(image_names)

train_end = int(0.7 * total_images)
validation_end = train_end + int(0.15 * total_images)

train_names = image_names[: train_end]
val_names = image_names[train_end : validation_end]
test_names = image_names[validation_end :]

In [None]:
# Create a list of image names in the order
image_names = list(loaded_features_dict.keys())

# Use the image name subsets to create training, validation and test sets
train_features = {name: loaded_features_dict[name] for name in train_names}
val_features = {name: loaded_features_dict[name] for name in val_names}
test_features = {name: loaded_features_dict[name] for name in test_names}

In [74]:
train_names[10]

'501650847_b0beba926c.jpg'

In [77]:
train_features["501650847_b0beba926c.jpg"].shape

(2048,)

In [82]:
import numpy as np

# Reshape all features in the dictionary
for name in train_features.keys():
    feature = train_features[name]
    reshaped_feature = np.reshape(feature, (1, 2048))
    train_features[name] = reshaped_feature  # Replace original feature with reshaped feature

# Now print the shape of a feature to confirm
print(train_features["501650847_b0beba926c.jpg"].shape)  # Should output: (1, 2048)

(1, 2048)


In [83]:
print(train_features["501650847_b0beba926c.jpg"].shape)
print(train_features["501650847_b0beba926c.jpg"])

(1, 2048)
[[0.35474449 0.32226744 0.46525913 ... 0.24886391 0.00793392 0.16913247]]


In [None]:
# Create mapping for unique words in training data
train_tokens = set()
for name in train_names:
    captions = descriptions[name]
    for caption in captions:
        for token in caption:
            train_tokens.add(token)
train_tokens_sorted = sorted(list(train_tokens))

id_to_word = {}
word_to_id = {}
for i, token in enumerate(train_tokens_sorted):
    id_to_word[i] = token
    word_to_id[token] = i

In [None]:
print(word_to_id["dog"], id_to_word[2093])

2093 dog


### Decoder Model (LSTM)

In [None]:
max_length = max(len(description) for name in train_names for description in descriptions[name])
print("Maximum length of a sequence: ", max_length)

Maximum length of a sequence:  37


In [None]:
class TextDataset(Dataset):
    def __init__(self, train_list, descriptions, word_to_id, max_len, vocab_size):
        self.data = []
        for img_name in train_list:
            captions = descriptions[img_name]
            for caption in captions:
                for i in range(1, len(caption)):
                    encoded_input = [word_to_id[w] for w in caption[:i]]
                    # If input sequence is shorter than max_len, pad remaining entries with 0
                    if len(encoded_input) < max_len:
                        encoded_input += [0] * (max_len - len(encoded_input))
                    encoded_output = word_to_id[caption[i]]
                    self.data.append((encoded_input, encoded_output))

        self.vocab_size = vocab_size

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        input_seq, output_word = self.data[idx]
        return torch.tensor(input_seq, dtype=torch.long), torch.tensor(output_word, dtype=torch.long)

In [None]:
class TextDataset(Dataset):
    def __init__(self, train_list, descriptions, word_to_id, max_len, vocab_size, image_features):
        self.data = []
        self.image_features = image_features
        for img_name in train_list:
            captions = descriptions[img_name]
            for caption in captions:
                for i in range(1, len(caption)):
                    encoded_input = [word_to_id[w] for w in caption[:i]]
                    if len(encoded_input) < max_len:
                        encoded_input += [0] * (max_len - len(encoded_input))
                    encoded_output = word_to_id[caption[i]]

                    # Get the corresponding image feature
                    img_feature = self.image_features[img_name]

                    # Append a tuple of the encoded_input, encoded_output and the image_feature
                    self.data.append((encoded_input, encoded_output, img_feature))

        self.vocab_size = vocab_size

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        input_seq, output_word, img_feature = self.data[idx]
        return torch.tensor(input_seq, dtype=torch.long), torch.tensor(output_word, dtype=torch.long), torch.tensor(img_feature, dtype=torch.float)

In [None]:
# Create the training dataset and data loader
MAX_LEN = max_length
vocab_size = len(word_to_id)

train_dataset = TextDataset(train_names, descriptions, word_to_id, MAX_LEN, vocab_size)
train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)

In [None]:
MAX_LEN = max_length
vocab_size = len(word_to_id)

train_dataset = TextDataset(train_names, descriptions, word_to_id, MAX_LEN, vocab_size, train_features)
train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)
#val_dataset = TextDataset(val_names, descriptions, word_to_id, MAX_LEN, vocab_size, val_features)
#val_loader = DataLoader(val_dataset, batch_size=128, shuffle=False)

In [None]:
class TextModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim):
        super(TextModel, self).__init__()
        # Embedding layer: Maps each word to an embedding_dim vector
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        # Bidirectional LSTM
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True, bidirectional=True)
        # Dense layer that outputs the probability distribution over the vocabulary
        self.fc = nn.Linear(hidden_dim * 2, vocab_size)

    def forward(self, x):
        # x shape: (batch_size, max_len)
        # Embedding layer output: (batch_size, max_len, embedding_dim)
        x = self.embedding(x)
        # LSTM layer output: (batch_size, max_len, hidden_dim * 2)
        x, _ = self.lstm(x)
        # We only use the output of the last time step
        x = x[:, -1, :]
        # Fully connected layer output: (batch_size, vocab_size)
        x = self.fc(x)

        return x

In [None]:
# feature in initial state for LSTM version
class ImageCaptioningModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, image_feature_dim):
        super(ImageCaptioningModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True, bidirectional=True)
        self.fc = nn.Linear(hidden_dim * 2, vocab_size)
        # Add a layer to transform the image features into a suitable initial state for the LSTM
        self.image_fc = nn.Linear(image_feature_dim, hidden_dim)

    def forward(self, x, image_features):
        x = self.embedding(x)
        # Use the transformed image features as the initial hidden state of the LSTM
        h0 = self.image_fc(image_features).unsqueeze(0).repeat(2, 1, 1)
        c0 = torch.zeros_like(h0)
        x, _ = self.lstm(x, (h0, c0))
        x = x[:, -1, :]
        x = self.fc(x)
        return x

In [None]:
# Define the model
EMBEDDING_DIM = 300
HIDDEN_DIM = 512

model = TextModel(vocab_size, EMBEDDING_DIM, HIDDEN_DIM).to(DEVICE)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.RMSprop(model.parameters(), lr=0.005)
softmax = nn.Softmax(dim=1)

In [None]:
model

TextModel(
  (embedding): Embedding(7763, 300, padding_idx=0)
  (lstm): LSTM(300, 512, batch_first=True, bidirectional=True)
  (fc): Linear(in_features=1024, out_features=7763, bias=True)
)

In [None]:
# Training loop
EPOCHS = 2
best_train_loss = float('inf')
degrade_times = 0
threshold = 2
for epoch in range(EPOCHS):
    train_loss = 0.0
    train_corrects = 0
    train_count = 0
    model.train()
    for inputs, targets in train_loader:
        inputs, targets = inputs.to(DEVICE), targets.to(DEVICE)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()
        train_loss += loss.item() * len(targets)
        pred = softmax(outputs)
        train_corrects += (torch.argmax(pred, dim=1) == targets).float().sum()
        train_count += targets.size(0)
    train_loss = train_loss / len(train_loader.dataset)
    train_acc = train_corrects / train_count
    print(f"Epoch {epoch} Train Loss {train_loss:.4f} Train Accuracy {train_acc:.4f}")

    # Check for early stopping
    if train_loss < best_train_loss:
        best_train_loss = train_loss
        degrade_times = 0
        torch.save(model.state_dict(), './best_lstm_model.pth')
    else:
        degrade_times += 1
        if degrade_times > threshold:
            print(f'Early stopping at epoch {epoch}')
            break

In [None]:
image_feature_dim = 2048  # set this to the size of your image features
embedding_dim = 200
hidden_dim = 300

In [None]:
import torch.nn.functional as F

model = ImageCaptioningModel(vocab_size, embedding_dim, hidden_dim, image_feature_dim)

# Use the GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

num_epochs = 5

# Training loop
for epoch in range(num_epochs):
    model.train()  # Set model to training mode
    running_loss = 0.0
    total = 0
    correct = 0
    for input_seq, output_word, img_feature in train_loader:
        # If using GPU, move data to GPU
        input_seq = input_seq.to(device)
        output_word = output_word.to(device)
        img_feature = img_feature.to(device)

        # Forward pass
        outputs = model(input_seq, img_feature)

        # Compute loss
        loss = criterion(outputs, output_word)

        # Backward pass and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * input_seq.size(0)

        # Compute accuracy
        _, predicted = torch.max(outputs.data, 1)
        total += output_word.size(0)
        correct += (predicted == output_word).sum().item()

    epoch_loss = running_loss / len(train_dataset)
    epoch_acc = correct / total
    print(f'Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss}, Accuracy: {epoch_acc}')

Epoch 1/5, Loss: 4.661154145037747, Accuracy: 0.25505121621248517
Epoch 2/5, Loss: 4.054450480040539, Accuracy: 0.3020707363538474
Epoch 3/5, Loss: 3.735434069585167, Accuracy: 0.33147795356028603
Epoch 4/5, Loss: 3.5170771101098457, Accuracy: 0.35294458709517107
Epoch 5/5, Loss: 3.363012059390069, Accuracy: 0.36784560589745713


In [None]:
# Load the best model
model.load_state_dict(torch.load('./best_lstm_model.pth'))

<All keys matched successfully>

In [None]:
# Sample the next word from the distribution returned by the model
def sample_decoder():
    seq = ["<START>"]
    while len(seq) < MAX_LEN and seq[-1] != "<END>":
        encoded_input = [word_to_id[w] for w in seq]
        if len(encoded_input) < MAX_LEN:
            encoded_input += [0] * (MAX_LEN - len(encoded_input))
        encoded_input = torch.tensor(([encoded_input])).to(DEVICE)
        outputs = model(encoded_input)
        probs = softmax(outputs).cpu()
        probs = probs.detach().numpy().astype('float64')
        probs = probs[0]
        normalized_probs = probs / np.sum(probs)
        sampling = np.random.multinomial(1, normalized_probs)
        pred_word = id_to_word[np.argmax(sampling)]
        seq.append(pred_word)
    return seq

In [None]:
def sample_decoder(img_feature, start_seq):
    seq = [id_to_word[id.item()] for id in start_seq if id != 0]  # 0 is usually the padding value
    while len(seq) < MAX_LEN and seq[-1] != "<END>":
        encoded_input = [word_to_id[w] for w in seq]
        encoded_input = torch.tensor([encoded_input]).to(device)
        img_feature = img_feature.to(device)

        # Forward pass through the model
        with torch.no_grad():
            outputs = model(encoded_input, img_feature)

        # The output is a distribution over the vocabulary.
        # Use the softmax function to convert it to probabilities
        probs = F.softmax(outputs, dim=-1)

        # Sample a word from the distribution
        sampled_word = torch.multinomial(probs[0], 1)

        pred_word = id_to_word[sampled_word.item()]
        seq.append(pred_word)
    return seq

# Get an input sequence and the associated image feature from the validation dataset
input_seq, _, img_feature = train_dataset[0]

# Generate a caption for the image
caption = sample_decoder(img_feature, input_seq)

# Print the caption
print(' '.join(caption))

<START> eats the person people rodeo guardrail <END>
