# __Final Project: Adding captions to your photos__

__Install__

```sh
pip install kaggle
```

__Download__

```sh
kaggle datasets download -d adityajn105/flickr8k
```

__Extract__

```sh
tar -xf flickr8k.zip
```
```

In [1]:
# Install necessary Python libraries
# pip install torch torchvision nltk sklearn pandas

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from torchvision import models, transforms
from torch.nn.utils.rnn import pack_padded_sequence
from PIL import Image
import os
import json
import nltk
from collections import Counter, defaultdict
from sklearn.cluster import KMeans

# Download NLTK tokenizer
nltk.download('punkt')

[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\yashh\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

In [2]:
# Set the folder path where your images are located
images_folder = "./Images/"

# Get all the image file paths in the folder
image_paths = [os.path.join(images_folder, filename) for filename in os.listdir(images_folder) if filename.endswith('.jpg')]

# Load ResNet50 pre-trained model for feature extraction
weights = models.ResNet50_Weights.IMAGENET1K_V1
model = models.resnet50(weights=weights)
model = torch.nn.Sequential(*list(model.children())[:-1])  # Remove final classification layer
model.eval()

# Function to extract features from an image for clustering
def extract_features(image_path):
    image = Image.open(image_path).convert('RGB')
    preprocess = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
    input_tensor = preprocess(image).unsqueeze(0)
    with torch.no_grad():
        features = model(input_tensor)
    return features.flatten().numpy()

# Extract features for all images
features_list = [extract_features(image_path) for image_path in image_paths]

# Perform clustering on the images
kmeans = KMeans(n_clusters=10, random_state=42)
clusters = kmeans.fit_predict(features_list)

# Organize images into clusters
cluster_dict = defaultdict(list)
for idx, cluster_id in enumerate(clusters):
    cluster_dict[cluster_id].append(image_paths[idx])

# Split each cluster into train and test sets
from sklearn.model_selection import train_test_split

train_image_paths = []
test_image_paths = []
for cluster_id, images in cluster_dict.items():
    train_images, test_images = train_test_split(images, test_size=0.2, random_state=42)
    train_image_paths.extend(train_images)
    test_image_paths.extend(test_images)

# Save the train and test paths to JSON files
with open("train_image_paths.json", "w") as train_file:
    json.dump(train_image_paths, train_file)

with open("test_image_paths.json", "w") as test_file:
    json.dump(test_image_paths, test_file)

In [3]:
# Load captions from captions.txt or a JSON file
captions_dict = {}
with open("captions.txt", "r") as f:
    for line in f:
        parts = line.strip().split("\t")
        if len(parts) < 2:
            continue
        image_name = parts[0].split("#")[0]
        caption = parts[1]
        if image_name not in captions_dict:
            captions_dict[image_name] = []
        captions_dict[image_name].append(caption)

# Preprocess captions
all_captions = []
for captions in captions_dict.values():
    all_captions.extend(captions)

# Tokenize all captions
tokenized_captions = [nltk.word_tokenize(caption.lower()) for caption in all_captions]

# Build vocabulary
word_counts = Counter()
for caption in tokenized_captions:
    word_counts.update(caption)

vocab = ["<PAD>", "<START>", "<END>", "<UNK>"] + [word for word, count in word_counts.items() if count >= 5]
word_to_idx = {word: idx for idx, word in enumerate(vocab)}
idx_to_word = {idx: word for word, idx in word_to_idx.items()}

# Convert captions to sequences of indices
def caption_to_indices(caption):
    tokens = ["<START>"] + nltk.word_tokenize(caption.lower()) + ["<END>"]
    return [word_to_idx.get(word, word_to_idx["<UNK>"]) for word in tokens]

train_sequences = {path.split("/")[-1]: [caption_to_indices(caption) for caption in captions_dict.get(path.split("/")[-1], [])] for path in train_image_paths}

In [6]:
# Define the EncoderCNN
class EncoderCNN(nn.Module):
    def __init__(self, embed_size):
        super(EncoderCNN, self).__init__()
        resnet = models.resnet50(pretrained=True)
        modules = list(resnet.children())[:-1]  # Remove the last fully connected layer
        self.resnet = nn.Sequential(*modules)
        self.linear = nn.Linear(resnet.fc.in_features, embed_size)
        self.bn = nn.BatchNorm1d(embed_size, momentum=0.01)

    def forward(self, images):
        with torch.no_grad():
            features = self.resnet(images)  # Extract features from ResNet
        features = features.reshape(features.size(0), -1)  # Flatten features

        # Apply BatchNorm only if batch size > 1
        if features.size(0) > 1:
            features = self.bn(self.linear(features))
        else:
            features = self.linear(features)
        
        return features

# Define the DecoderRNN
class DecoderRNN(nn.Module):
    def __init__(self, embed_size, hidden_size, vocab_size, num_layers=1, max_seq_length=20):
        super(DecoderRNN, self).__init__()
        self.embed = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)
        self.linear = nn.Linear(hidden_size, vocab_size)
        self.max_seq_length = max_seq_length

    def forward(self, features, captions, lengths):
        embeddings = self.embed(captions)
        embeddings = torch.cat((features.unsqueeze(1), embeddings), 1)
        packed = pack_padded_sequence(embeddings, lengths, batch_first=True, enforce_sorted=False)
        lstm_output, _ = self.lstm(packed)
        outputs = self.linear(lstm_output[0])
        return outputs

    def predict(self, features, states=None):
        sampled_ids = []
        inputs = features.unsqueeze(1)
        for _ in range(self.max_seq_length):
            hiddens, states = self.lstm(inputs, states)
            outputs = self.linear(hiddens.squeeze(1))
            _, predicted = outputs.max(1)
            sampled_ids.append(predicted)
            inputs = self.embed(predicted).unsqueeze(1)
        sampled_ids = torch.stack(sampled_ids, 1)
        return sampled_ids

In [7]:
# Hyperparameters
embed_size = 256
hidden_size = 512
vocab_size = len(vocab)
learning_rate = 0.001

# Instantiate models
device = 'cuda' if torch.cuda.is_available() else 'cpu'
encoder = EncoderCNN(embed_size).to(device)
decoder = DecoderRNN(embed_size, hidden_size, vocab_size).to(device)

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(list(decoder.parameters()) + list(encoder.linear.parameters()) + list(encoder.bn.parameters()), lr=learning_rate)

# Training loop
num_epochs = 10
for epoch in range(num_epochs):
    encoder.train()
    decoder.train()
    total_loss = 0

    for image_path, caption_seqs in train_sequences.items():
        # Load and preprocess image
        image = Image.open(os.path.join(images_folder, image_path)).convert('RGB')
        image_tensor = transforms.ToTensor()(image).unsqueeze(0).to(device)
        features = encoder(image_tensor)

        # Train on each caption for the image
        for caption_seq in caption_seqs:
            caption_tensor = torch.tensor([word_to_idx["<START>"]] + caption_seq + [word_to_idx["<END>"]]).unsqueeze(0).to(device)
            lengths = [len(caption_seq) + 2]

            # Forward pass
            optimizer.zero_grad()
            outputs = decoder(features, caption_tensor, lengths)
            targets = pack_padded_sequence(caption_tensor[:, 1:], lengths, batch_first=True, enforce_sorted=False)[0]
            
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss:.4f}")

Epoch [1/10], Loss: 0.0000
Epoch [2/10], Loss: 0.0000
Epoch [3/10], Loss: 0.0000
Epoch [4/10], Loss: 0.0000
Epoch [5/10], Loss: 0.0000
Epoch [6/10], Loss: 0.0000
Epoch [7/10], Loss: 0.0000
Epoch [8/10], Loss: 0.0000
Epoch [9/10], Loss: 0.0000
Epoch [10/10], Loss: 0.0000


In [8]:
# Generate captions for a test image
test_image_path = test_image_paths[0]
test_image = Image.open(test_image_path).convert('RGB')
test_image_tensor = transforms.ToTensor()(test_image).unsqueeze(0).to(device)

encoder.eval()
decoder.eval()
with torch.no_grad():
    test_features = encoder(test_image_tensor)
    sampled_ids = decoder.predict(test_features)
    sampled_caption = [idx_to_word[idx.item()] for idx in sampled_ids[0] if idx.item() != word_to_idx["<PAD>"]]
    print("Generated Caption:", " ".join(sampled_caption))

Generated Caption: <UNK> <START> <START> <START> <START> <START> <START> <START> <START> <START> <START> <START> <START> <START> <START> <START> <START> <START> <START>
