## Molly Dignan (24929263) - MobileNetV3 & LSTM Model

In [10]:
!pip install torch torchvision nltk tqdm kaggle
import os
import numpy as np
import pandas as pd
from PIL import Image
from tqdm import tqdm
import torch
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset
import nltk
from collections import Counter
from sklearn.model_selection import train_test_split
from torchvision import models
import torch.nn as nn
import torch.optim as optim

# Download NLTK punkt tokenizer models
nltk.download('punkt')



[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

In [11]:
!kaggle datasets download -d 'adityajn105/flickr8k'
import zipfile
with zipfile.ZipFile('flickr8k.zip', 'r') as zip_ref:
    zip_ref.extractall('flickr8k')

Dataset URL: https://www.kaggle.com/datasets/adityajn105/flickr8k
License(s): CC0-1.0
flickr8k.zip: Skipping, found more recently modified local copy (use --force to force download)


In [12]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')


Using device: cpu


In [13]:
captions_file = 'flickr8k/captions.txt'

# Load and inspect captions file
captions = pd.read_csv(captions_file, delimiter=',', header=None, names=['image', 'caption'])
print(captions.head())

                       image  \
0                      image   
1  1000268201_693b08cb0e.jpg   
2  1000268201_693b08cb0e.jpg   
3  1000268201_693b08cb0e.jpg   
4  1000268201_693b08cb0e.jpg   

                                             caption  
0                                            caption  
1  A child in a pink dress is climbing up a set o...  
2              A girl going into a wooden building .  
3   A little girl climbing into a wooden playhouse .  
4  A little girl climbing the stairs to her playh...  


In [14]:
class Vocabulary:
    def __init__(self, freq_threshold):
        self.freq_threshold = freq_threshold
        self.itos = {0: "<PAD>", 1: "<SOS>", 2: "<EOS>", 3: "<UNK>"}
        self.stoi = {v: k for k, v in self.itos.items()}

    def __len__(self):
        return len(self.itos)

    @staticmethod
    def tokenizer(text):
        return nltk.tokenize.word_tokenize(text.lower())

    def build_vocabulary(self, sentence_list):
        frequencies = Counter()
        idx = 4

        for sentence in sentence_list:
            for word in self.tokenizer(sentence):
                frequencies[word] += 1
                if frequencies[word] == self.freq_threshold:
                    self.stoi[word] = idx
                    self.itos[idx] = word
                    idx += 1

    def numericalize(self, text):
        tokenized_text = self.tokenizer(text)
        return [self.stoi.get(token, self.stoi["<UNK>"]) for token in tokenized_text]

# Load and preprocess captions
captions = pd.read_csv(captions_file, delimiter=',', header=None, names=['image', 'caption'])
captions['caption'] = captions['caption'].fillna("").astype(str)

# Build the vocabulary
vocab = Vocabulary(freq_threshold=5)
vocab.build_vocabulary(captions['caption'].tolist())
print(f"Vocabulary size: {len(vocab)}")

Vocabulary size: 3005


In [15]:
class Flickr8kDataset(Dataset):
    def __init__(self, img_dir, captions_df, transform=None, vocab=None, max_caption_length=20, train=True, test_split=0.2):
        self.img_dir = img_dir
        self.captions_df = captions_df.copy()
        self.transform = transform
        self.vocab = vocab
        self.max_caption_length = max_caption_length
        self.train = train
        self.test_split = test_split

        # Split dataset into training and testing sets
        self.split_data()

    def split_data(self):
        if self.train:
            train_size = int(len(self.captions_df) * (1 - self.test_split))
            self.data = self.captions_df.iloc[:train_size].copy().reset_index(drop=True)
        else:
            test_size = int(len(self.captions_df) * self.test_split)
            self.data = self.captions_df.iloc[-test_size:].copy().reset_index(drop=True)

        # Pad captions to maximum length
        self.pad_caption()

    def pad_caption(self):
        pad_idx = self.vocab.stoi['<PAD>']
        for idx in range(len(self.data)):
            numerical_caption = self.vocab.numericalize(self.data.at[idx, 'caption'])
            while len(numerical_caption) < self.max_caption_length:
                numerical_caption.append(pad_idx)
            self.data.at[idx, 'caption'] = numerical_caption[:self.max_caption_length]

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img_name = os.path.join(self.img_dir, self.data.iloc[idx, 0])

        try:
            image = Image.open(img_name).convert("RGB")
        except FileNotFoundError:
            print(f"File not found: {img_name}")
            return None, None

        caption = self.data.iloc[idx, 1]

        if self.transform:
            image = self.transform(image)

        return image, torch.tensor(caption)

# Paths
img_dir = 'flickr8k/Images'
captions_file = 'flickr8k/captions.txt'

# Load captions
captions = pd.read_csv(captions_file, delimiter=',', header=None, names=['image', 'caption'])
captions['caption'] = captions['caption'].fillna("").astype(str)

# Define vocabulary
vocab = Vocabulary(freq_threshold=5)
vocab.build_vocabulary(captions['caption'].values)

# Data transformations
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

# Define datasets for training and testing
train_dataset = Flickr8kDataset(img_dir=img_dir, captions_df=captions, transform=transform, vocab=vocab, train=True)
test_dataset = Flickr8kDataset(img_dir=img_dir, captions_df=captions, transform=transform, vocab=vocab, train=False)

# Collate function to handle variable length sequences
def collate_fn(batch):
    batch = list(filter(lambda x: x[0] is not None, batch))
    images, captions = zip(*batch)
    images = torch.stack(images, 0)
    captions = nn.utils.rnn.pad_sequence(captions, batch_first=True, padding_value=vocab.stoi["<PAD>"])
    return images, captions

# Define data loaders for training and testing
train_data_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, drop_last=True, collate_fn=collate_fn)
test_data_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, drop_last=False, collate_fn=collate_fn)


In [16]:
class MobileNetV3Extractor(nn.Module):
    def __init__(self, embedding_dim=256):
        super(MobileNetV3Extractor, self).__init__()
        mobilenet_v3 = models.mobilenet_v3_large(pretrained=True)
        self.features = mobilenet_v3.features
        self.pool = nn.AdaptiveAvgPool2d((1, 1))
        self.flatten = nn.Flatten()
        self.fc = nn.Linear(960, embedding_dim)  # Update the input size to match the actual size of the flattened tensor

    def forward(self, images):
        x = self.features(images)
        x = self.pool(x)
        x = self.flatten(x)
        x = self.fc(x)
        return x

In [17]:
class Decoder(nn.Module):
    def __init__(self, embedding_dim, hidden_dim, vocab_size, num_layers=1):
        super(Decoder, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, vocab_size)

    def forward(self, features, captions):
        embeddings = self.embedding(captions)
        embeddings = torch.cat((features.unsqueeze(1), embeddings), dim=1)
        lstm_out, _ = self.lstm(embeddings)
        outputs = self.fc(lstm_out)
        return outputs

In [18]:
embedding_dim = 256
hidden_dim = 512
vocab_size = len(vocab)
num_epochs = 5
learning_rate = 0.001
print_every = 100

# Model, Loss, Optimizer
feature_extractor = MobileNetV3Extractor(embedding_dim=embedding_dim).to(device)
decoder = Decoder(embedding_dim, hidden_dim, vocab_size).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(list(feature_extractor.parameters()) + list(decoder.parameters()), lr=learning_rate)

total_steps = len(train_data_loader)
for epoch in range(num_epochs):
    for i, (images, captions) in enumerate(train_data_loader):
        images = images.to(device)
        captions = captions.to(device)

        # Pass image features through the decoder
        features = feature_extractor(images)
        outputs = decoder(features, captions[:, :-1])

        # Calculate the loss
        outputs = outputs[:, :captions.size(1) - 1, :]
        loss = criterion(outputs.contiguous().view(-1, vocab_size), captions[:, 1:].contiguous().view(-1))

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Print loss statistics
        if i % print_every == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}], Step [{i}/{total_steps}], Loss: {loss.item():.4f}')




Epoch [1/5], Step [0/1011], Loss: 8.0106
Epoch [1/5], Step [100/1011], Loss: 2.5735
Epoch [1/5], Step [200/1011], Loss: 2.6069
Epoch [1/5], Step [300/1011], Loss: 2.7895
Epoch [1/5], Step [400/1011], Loss: 2.6052
Epoch [1/5], Step [500/1011], Loss: 2.6383
Epoch [1/5], Step [600/1011], Loss: 2.5590
Epoch [1/5], Step [700/1011], Loss: 2.4189
File not found: flickr8k/Images/image
Epoch [1/5], Step [800/1011], Loss: 2.3914
Epoch [1/5], Step [900/1011], Loss: 2.4751
Epoch [1/5], Step [1000/1011], Loss: 2.2339
Epoch [2/5], Step [0/1011], Loss: 2.0035
Epoch [2/5], Step [100/1011], Loss: 2.5033
Epoch [2/5], Step [200/1011], Loss: 2.1039
Epoch [2/5], Step [300/1011], Loss: 2.1454
Epoch [2/5], Step [400/1011], Loss: 2.2350
Epoch [2/5], Step [500/1011], Loss: 2.3710
Epoch [2/5], Step [600/1011], Loss: 2.1364
Epoch [2/5], Step [700/1011], Loss: 2.3653
Epoch [2/5], Step [800/1011], Loss: 2.2136
File not found: flickr8k/Images/image
Epoch [2/5], Step [900/1011], Loss: 2.2941
Epoch [2/5], Step [1000/

In [None]:
import torch
from nltk.translate.bleu_score import corpus_bleu
from tqdm import tqdm

def evaluate_model(feature_extractor, decoder, test_data_loader, vocab, device):
    # Set the model to evaluation mode
    feature_extractor.eval()
    decoder.eval()

    references = []
    hypotheses = []

    with torch.no_grad():
        for images, captions in tqdm(test_data_loader):
            images = images.to(device)
            captions = captions.to(device)

            # Pass image features through the decoder
            features = feature_extractor(images)
            outputs = decoder(features, captions[:, :-1])

            # Get predicted captions
            _, predicted = torch.max(outputs, dim=2)
            predicted = predicted.tolist()
            captions = captions[:, 1:].tolist()

            # Convert numericalized captions to words
            for p, c in zip(predicted, captions):
                ref_sentence = [vocab.itos[word] for word in c if word != vocab.stoi['<PAD>']]
                hyp_sentence = [vocab.itos[word] for word in p if word != vocab.stoi['<PAD>']]
                references.append([ref_sentence])  # Note the double brackets
                hypotheses.append(hyp_sentence)

    # Calculate BLEU scores
    bleu1 = corpus_bleu(references, hypotheses, weights=(1, 0, 0, 0))
    bleu2 = corpus_bleu(references, hypotheses, weights=(0.5, 0.5, 0, 0))
    bleu3 = corpus_bleu(references, hypotheses, weights=(0.33, 0.33, 0.33, 0))
    bleu4 = corpus_bleu(references, hypotheses)

    print(f'BLEU-1: {bleu1:.4f}')
    print(f'BLEU-2: {bleu2:.4f}')
    print(f'BLEU-3: {bleu3:.4f}')
    print(f'BLEU-4: {bleu4:.4f}')

# Run Evaluation on the Test Set
evaluate_model(feature_extractor, decoder, test_data_loader, vocab, device)

In [None]:
# Inference
def generate_caption(image_path):
    # Load and preprocess the image
    image = Image.open(image_path).convert("RGB")
    image_tensor = transform(image).unsqueeze(0).to(device)

    # Set the model to evaluation mode
    feature_extractor.eval()
    decoder.eval()

    # Extract features
    with torch.no_grad():
        features = feature_extractor(image_tensor)

    # Initialize the caption with the start token
    caption = [vocab.stoi['<SOS>']]

    while True:
        # Convert caption to tensor
        caption_tensor = torch.LongTensor(caption).unsqueeze(0).to(device)

        # Pass features and current caption through the decoder
        with torch.no_grad():
            outputs = decoder(features, caption_tensor)

        # Get the predicted word
        predicted_word = outputs.argmax(2)[:,-1].item()

        # Add the predicted word to the caption
        caption.append(predicted_word)

        # If the end token is predicted, stop generating
        if predicted_word == vocab.stoi['<EOS>']:
            break

        # Stop if caption exceeds max length
        if len(caption) >= 20:
            break

    # Convert the numericalized caption back to words
    generated_caption = [vocab.itos[idx] for idx in caption]

    # Return the generated caption
    return ' '.join(generated_caption[1:-1])  # Exclude the start and end tokens

# Example usage of generate_caption function
image_path = 'flickr8k/Images/1000268201_693b08cb0e.jpg'
caption = generate_caption(image_path)
print("Generated Caption:", caption)