In [32]:
import json
import torch
from torch.nn.utils.rnn import pad_sequence
from torchtext.vocab import build_vocab_from_iterator
from typing import Iterable, List

In [83]:
# choose the first 1000 image for training 
import json

def select_first_mscoco_images(annotations_path, num_samples=20000):
    with open(annotations_path, 'r') as file:
        data = json.load(file)

    img_ids = set()
    captions = {}

    for ann in data['annotations']:
        if len(img_ids) < num_samples:
            image_id = ann['image_id']
            formatted_image_id = str(image_id).zfill(12)  # Pad the ID with zeros
            if formatted_image_id not in img_ids:
                img_ids.add(formatted_image_id)
                captions[formatted_image_id] = ann['caption']
        else:
            break

    return img_ids, captions

# Example usage
annotations_path = 'annotations/captions_train2014.json'  # Replace with your path
img_ids, captions = select_first_mscoco_images(annotations_path)

# Optionally, save the results to a file for later use
output_path = 'annotations/first_20000_captions.json'  # Replace with your desired output path
with open(output_path, 'w') as outfile:
    json.dump({'image_ids': list(img_ids), 'captions': captions}, outfile)

print(f"Extracted captions for {len(img_ids)} images.")

print(img_ids)

Extracted captions for 20000 images.
{'000000003682', '000000257666', '000000111223', '000000325569', '000000251660', '000000557771', '000000309403', '000000406490', '000000109561', '000000378415', '000000068867', '000000259113', '000000344236', '000000365314', '000000268313', '000000371174', '000000196021', '000000049346', '000000338169', '000000412034', '000000160559', '000000238232', '000000084427', '000000258890', '000000361669', '000000436406', '000000025470', '000000156506', '000000110665', '000000130973', '000000389320', '000000109246', '000000010925', '000000428304', '000000450270', '000000380039', '000000426712', '000000271167', '000000513944', '000000296781', '000000064339', '000000085610', '000000330054', '000000466719', '000000081406', '000000167644', '000000169945', '000000272299', '000000306433', '000000272312', '000000288905', '000000204509', '000000465874', '000000471182', '000000426663', '000000567084', '000000214087', '000000444340', '000000420681', '000000525660', '0

In [84]:
from torchvision import transforms
from PIL import Image
import os

# Define the image transformations
transform = transforms.Compose([
    transforms.Resize(256),            # Resize the image to 256x256 pixels
    transforms.CenterCrop(224),        # Crop the image to 224x224 pixels around the center
    transforms.ToTensor(),             # Convert the image to a PyTorch Tensor
    transforms.Normalize(              # Normalize the image
        mean=[0.485, 0.456, 0.406],    # These are the ImageNet mean and std
        std=[0.229, 0.224, 0.225]
    )
])

# Function to load and preprocess an image
def load_preprocess_image(image_path, transform=transform):
    if not os.path.exists(image_path):
        raise FileNotFoundError(f"The image {image_path} does not exist.")
    
    image = Image.open(image_path).convert("RGB")
    image = transform(image)
    return image

# Example usage:
img_path = 'Train2014/train2014/COCO_train2014_000000000036.jpg'  # Replace with the path to an image from the selected 1000 images
img_tensor = load_preprocess_image(img_path)
print(img_tensor.shape)  # Should be torch.Size([3, 224, 224])


torch.Size([3, 224, 224])


In [85]:
from collections import Counter
from torchtext.vocab import vocab
from torch.nn.utils.rnn import pad_sequence
import torch

# Load captions
with open('annotations/first_20000_captions.json', 'r') as f:
    data = json.load(f)
captions = data['captions']

# Tokenize captions and build a vocabulary
tokenized_captions = [caption.split() for caption in captions.values()]
counter = Counter([token for caption in tokenized_captions for token in caption])
vocab = vocab(counter, specials=['<unk>', '<pad>', '<start>', '<end>'])
# Numericalize captions
numericalized_captions = [[vocab['<start>']] + [vocab[token] for token in caption] + [vocab['<end>']] for caption in tokenized_captions]

# Pad sequences
padded_captions = pad_sequence([torch.tensor(caption) for caption in numericalized_captions], padding_value=vocab['<pad>'], batch_first=True)

In [86]:
from torch.utils.data import Dataset
import os
import torch
from PIL import Image

class ImageCaptionDataset(Dataset):
    def __init__(self, img_dir, img_ids, captions, transform=None, vocab=None):
        """
        Args:
            img_dir (string): Directory with all the images.
            img_ids (list): List of image ids.
            captions (dict): Dictionary mapping image ids to captions.
            transform (callable, optional): Optional transform to be applied on an image.
            vocab (Vocab): Vocabulary object for token to index mapping.
        """
        self.img_dir = img_dir
        self.img_ids = list(img_ids)
        self.captions = captions
        self.transform = transform
        self.vocab = vocab

    def __len__(self):
        return len(self.img_ids)

    def __getitem__(self, idx):
        img_id = self.img_ids[idx]
        img_path = os.path.join(self.img_dir, f'COCO_train2014_{img_id}.jpg')  # Adjust as per MSCOCO file naming
        image = load_preprocess_image(img_path, self.transform)

        # Process caption
        caption = self.captions[img_id]
        numericalized_caption = [self.vocab['<start>']] + [self.vocab[token] for token in caption.split()] + [self.vocab['<end>']]
        caption_tensor = torch.tensor(numericalized_caption, dtype=torch.long)

        return image, caption_tensor

# Assuming `vocab` is already created as per your previous code
img_dir = 'Train2014/train2014'
dataset = ImageCaptionDataset(img_dir, img_ids, captions, transform=transform, vocab=vocab)


In [87]:
from torch.utils.data import DataLoader
def collate_fn(batch):
    images, captions = zip(*batch)
    images = torch.stack(images, 0)
    captions = pad_sequence(captions, batch_first=True, padding_value=vocab['<pad>'])
    return images, captions

data_loader = DataLoader(dataset=dataset, batch_size=32, shuffle=True, collate_fn=collate_fn, drop_last=True)


In [88]:
import torch
import torch.nn as nn
from torchvision.models import resnet50

# ResNet Encoder
class EncoderCNN(nn.Module):
    def __init__(self, embed_size):
        super(EncoderCNN, self).__init__()
        resnet = resnet50(pretrained=True)
        for param in resnet.parameters():
            param.requires_grad_(False)
        
        modules = list(resnet.children())[:-2]  # Removing the last fully connected layer
        self.resnet = nn.Sequential(*modules)
        self.adaptive_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(resnet.fc.in_features, embed_size)

    def forward(self, images):
        features = self.resnet(images)
        features = self.adaptive_pool(features)
        features = features.reshape(features.size(0), -1)
        features = self.fc(features)
        return features

# RNN (LSTM) Decoder
class DecoderRNN(nn.Module):
    def __init__(self, embed_size, hidden_size, vocab_size, num_layers=1):
        super(DecoderRNN, self).__init__()
        self.embed = nn.Embedding(vocab_size, embed_size)
        self.linear = nn.Linear(256, hidden_size)  # Linear layer to transform features
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)
        self.linear_out = nn.Linear(hidden_size, vocab_size)

    def forward(self, features, captions):
        embeddings = self.embed(captions)

        # Transform features to match hidden_size
        features = self.linear(features)  # [batch_size, hidden_size]
        features = features.unsqueeze(0)  # [1, batch_size, hidden_size]

        hiddens, _ = self.lstm(embeddings, (features, torch.zeros_like(features)))
        outputs = self.linear_out(hiddens)
        return outputs


# Set the size of the embedding and the hidden layer
embed_size = 256
hidden_size = 512
vocab_size = len(vocab)  # This should be the size of your vocabulary

# Initialize the models
encoder = EncoderCNN(embed_size)
decoder = DecoderRNN(embed_size, hidden_size, vocab_size)




In [89]:
criterion = nn.CrossEntropyLoss()
params = list(decoder.parameters()) + list(encoder.fc.parameters())  # Train only the final layer of the encoder
optimizer = torch.optim.Adam(params, lr=0.001)


In [90]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [91]:
num_epochs = 20
total_step = len(data_loader)  # Assuming you have a DataLoader for your dataset

for epoch in range(num_epochs):
    for i, (images, captions) in enumerate(data_loader):
        # Move images and captions to the desired device (e.g., GPU)
        images = images.to(device)
        captions = captions.to(device)

        # Forward pass
        features = encoder(images)
        outputs = decoder(features, captions)  
        loss = criterion(outputs.view(-1, vocab_size), captions.reshape(-1))

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if i % 100 == 0:
            print(f"Epoch [{epoch+1}/{num_epochs}], Step [{i}/{total_step}], Loss: {loss.item():.4f}")


Epoch [1/20], Step [0/625], Loss: 9.0615
Epoch [1/20], Step [100/625], Loss: 1.2226
Epoch [1/20], Step [200/625], Loss: 0.6072
Epoch [1/20], Step [300/625], Loss: 0.4546
Epoch [1/20], Step [400/625], Loss: 0.3287
Epoch [1/20], Step [500/625], Loss: 0.6127
Epoch [1/20], Step [600/625], Loss: 0.3283
Epoch [2/20], Step [0/625], Loss: 0.1061
Epoch [2/20], Step [100/625], Loss: 0.0904
Epoch [2/20], Step [200/625], Loss: 0.0934
Epoch [2/20], Step [300/625], Loss: 0.1439
Epoch [2/20], Step [400/625], Loss: 0.0919
Epoch [2/20], Step [500/625], Loss: 0.0351
Epoch [2/20], Step [600/625], Loss: 0.1577
Epoch [3/20], Step [0/625], Loss: 0.0214
Epoch [3/20], Step [100/625], Loss: 0.0225
Epoch [3/20], Step [200/625], Loss: 0.0201
Epoch [3/20], Step [300/625], Loss: 0.0311
Epoch [3/20], Step [400/625], Loss: 0.0114
Epoch [3/20], Step [500/625], Loss: 0.0335
Epoch [3/20], Step [600/625], Loss: 0.0344
Epoch [4/20], Step [0/625], Loss: 0.0010
Epoch [4/20], Step [100/625], Loss: 0.0018
Epoch [4/20], Step 

In [42]:
print(loss.item())
print(dir(vocab))

0.06605000793933868
['T_destination', '__annotations__', '__call__', '__class__', '__contains__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattr__', '__getattribute__', '__getitem__', '__getstate__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__jit_unused_properties__', '__le__', '__len__', '__lt__', '__module__', '__ne__', '__new__', '__prepare_scriptable__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__setstate__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_apply', '_backward_hooks', '_backward_pre_hooks', '_buffers', '_call_impl', '_compiled_call_impl', '_forward_hooks', '_forward_hooks_always_called', '_forward_hooks_with_kwargs', '_forward_pre_hooks', '_forward_pre_hooks_with_kwargs', '_get_backward_hooks', '_get_backward_pre_hooks', '_get_name', '_is_full_backward_hook', '_load_from_state_dict', '_load_state_dict_post_hooks', '_load_state_dict_pre_hooks', '_maybe_warn_non_full_backward_

In [115]:
import json

model_state = {
    'epoch': epoch,
    'encoder_state_dict': encoder.state_dict(),
    'decoder_state_dict': decoder.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'loss': loss 
    # Any other metadata you might find relevant
}

torch.save(model_state, 'model_checkpoint.pth')
# Save the trained encoder and decoder models
torch.save(encoder.state_dict(), 'encoder_model.pth')
torch.save(decoder.state_dict(), 'decoder_model.pth')

# Save the vocabulary as a JSON file

def save_vocab(vocab, file_path):
    """
    Save the vocabulary to a JSON file.

    Args:
    vocab (torchtext.vocab.Vocab): The vocabulary object.
    file_path (str): The path to the file where the vocabulary will be saved.
    """
    stoi = vocab.get_stoi()
    itos = vocab.get_itos()

    # Retrieve special token indices
    unk_index = stoi['<unk>'] if '<unk>' in stoi else None
    pad_index = stoi['<pad>'] if '<pad>' in stoi else None

    vocab_dict = {
        'stoi': stoi,
        'itos': itos,
        'unk_index': unk_index,
        'pad_index': pad_index
    }

    with open(file_path, 'w') as f:
        json.dump(vocab_dict, f)

# Example usage
vocab_file_path = 'vocab.json'
save_vocab(vocab, vocab_file_path)




In [116]:
# Assuming you have saved the model and optimizer states
checkpoint = torch.load('model_checkpoint.pth')
encoder.load_state_dict(checkpoint['encoder_state_dict'])
decoder.load_state_dict(checkpoint['decoder_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

# Make sure the models are in training mode
encoder.train()
decoder.train()




DecoderRNN(
  (embed): Embedding(8853, 256)
  (linear): Linear(in_features=256, out_features=512, bias=True)
  (lstm): LSTM(256, 512, batch_first=True)
  (linear_out): Linear(in_features=512, out_features=8853, bias=True)
)

In [120]:

def load_preprocess_image(image_path, transform):
    image = Image.open(image_path).convert("RGB")
    image = transform(image)
    image = image.unsqueeze(0)  # Add a batch dimension
    return image

transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])




# Set the model to evaluation mode
encoder.eval()
decoder.eval()


DecoderRNN(
  (embed): Embedding(8853, 256)
  (linear): Linear(in_features=256, out_features=512, bias=True)
  (lstm): LSTM(256, 512, batch_first=True)
  (linear_out): Linear(in_features=512, out_features=8853, bias=True)
)

In [136]:
def generate_caption(image_path, encoder, decoder, vocab, transform, max_length=10):
    stoi = vocab.get_stoi()
    itos = vocab.get_itos()
    # Load and preprocess the image
    image_tensor = load_preprocess_image(image_path, transform).to(device)

    # Generate caption
    encoder.eval()
    decoder.eval()
    with torch.no_grad():
        features = encoder(image_tensor)
        sampled_ids = []
        inputs = torch.tensor([stoi['<start>']]).unsqueeze(0).to(device)
        for i in range(max_length):
            outputs = decoder(features, inputs)
            predicted = outputs.argmax(2)
            predicted_id = predicted.item()
            sampled_ids.append(predicted_id)
            if predicted_id == stoi['<end>']:
                break
            inputs = torch.tensor([predicted_id]).unsqueeze(0).to(device)
    print(inputs)
    print(sampled_ids)
    # Convert word indices to words
    sampled_caption = []
    for word_id in sampled_ids:
        word = itos[word_id]
        if word == '<end>':
            break
        sampled_caption.append(word)

    return ' '.join(sampled_caption)

# Example usage
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
image_path = "Train2014/train2014/COCO_train2014_000000000510.jpg"
caption = generate_caption(image_path, encoder, decoder, vocab, transform)
print("Generated Caption:", caption)



tensor([[1135]])
[160, 2483, 1491, 1798, 5609, 1823, 7933, 2071, 73, 1135]
Generated Caption: as spoons doorway bar. Tigers cleaning pus theme. child beautifully
