In [115]:
import pandas as pd
import json
import os
import csv
from PIL import Image
import torch
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import random
from sklearn.model_selection import train_test_split
from torch.utils.data import Subset
from PIL import Image

image_folder = r'../lab3/archive/Images'
caption_file = r'../lab3/archive/captions.txt'


In [116]:
# Build vocabulary from captions
# Load and organize captions
def load_captions(filename):
    captions_dict = {}
    with open(filename, 'r', encoding='utf-8') as file:
        reader = csv.reader(file)
        next(reader)
        for row in reader:
            if len(row) < 2:
                continue
            image_id, caption = row[0], row[1]
            if image_id in captions_dict:
                captions_dict[image_id].append(caption)
            else:
                captions_dict[image_id] = [caption]
    return captions_dict

captions = load_captions(caption_file)

def build_vocabulary(captions):
    word_counts = {}
    for image_id, image_captions in captions.items():
        for caption in image_captions:
            tokens = caption.split()  # Tokenize the caption
            for token in tokens:
                if token not in word_counts:
                    word_counts[token] = 1
                else:
                    word_counts[token] += 1
    
    # Sort words by frequency
    sorted_words = sorted(word_counts.items(), key=lambda x: x[1], reverse=True)
    
    # Create vocabulary dictionary
    vocabulary = {'<PAD>': 0, '<START>': 1, '<END>': 2}
    for i, (word, _) in enumerate(sorted_words):
        vocabulary[word] = i + 3  # Start from index 3, since 0, 1, and 2 are reserved for special tokens
    
    return vocabulary

# Build vocabulary from captions
vocabulary = build_vocabulary(captions)

# Example vocabulary
print(vocabulary)




In [125]:
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

import random
class Flickr8kDataset(Dataset):
    def __init__(self, image_folder, captions_file, max_caption_length, vocabulary, transform=None):
        """
        Args:
            image_folder (string): Directory with all the images.
            captions (dictionary): Dictionary containing image IDs as keys and lists of captions as values.
            max_caption_length (int): Maximum length of captions after padding.
            vocabulary (dictionary): Vocabulary dictionary mapping tokens to indices.
            transform (callable, optional): Optional transform to be applied on a sample.
        """
        self.image_folder = image_folder
        self.captions = captions_file
        self.max_caption_length = max_caption_length
        self.vocabulary = vocabulary
        self.transform = transform
        self.image_filenames = [f for f in os.listdir(image_folder) if f.endswith(('.jpg', '.jpeg'))]

    def __len__(self):
        return len(self.image_filenames)

    def __getitem__(self, idx):
        img_name = os.path.join(self.image_folder, self.image_filenames[idx])
        try:
            image = Image.open(img_name).convert('RGB')
            if self.transform:
                image = self.transform(image)
        except IOError:
            print(f'Error opening image {img_name}, skipping')
            return None

        # Get a random caption for the image
        captions_for_image = self.captions[self.image_filenames[idx]]
        caption = random.choice(captions_for_image)

        # Tokenize and pad caption
        caption_tokens = caption.split()
        padded_caption = [self.vocabulary[token] for token in caption_tokens]
        padded_caption += [self.vocabulary['<PAD>']] * (self.max_caption_length - len(padded_caption))
        caption_tensor = torch.tensor(padded_caption, dtype=torch.long)

        return image, caption_tensor


    
flickr_dataset = Flickr8kDataset(image_folder=image_folder, captions_file=captions,max_caption_length=50, vocabulary=vocabulary, transform=transform)
# Splitting the dataset into train and test
train_indices, test_indices = train_test_split(range(len(flickr_dataset)), test_size=0.2)

# Splitting the training set into trrain&validation
train_indices, val_indices = train_test_split(train_indices, test_size=0.2)

# Subsets creation
train_dataset_Img = Subset(flickr_dataset, train_indices)
test_dataset_Img = Subset(flickr_dataset, test_indices)
val_dataset_Img = Subset(flickr_dataset, val_indices)

train_loader_Img = DataLoader(train_dataset_Img, batch_size=32, shuffle=True)
test_loader_Img = DataLoader(test_dataset_Img, batch_size=32, shuffle=False)
val_loader_Img = DataLoader(val_dataset_Img, batch_size=32, shuffle=False)

print(len(train_dataset_Img))
    

5177


In [126]:
import torchvision.models as models


class CNN(nn.Module):
    def __init__(self,embed_size, train_CNN= False):
        super(CNN,self).__init__()
        self.train_CNN = train_CNN
        self.inception = models.resnet50(pretrained = True)
        self.inception.fc = nn.Linear(self.inception.fc.in_features, embed_size)
        self.relu = nn.ReLU() 
        self.dropout = nn.Dropout(0.3)

    def forward(self,images):
        features = self.inception(images)
        
        for name, param in self.inception.named_parameters():

            if "fc.weights" in name in "fc.bias" in name :
                param.requires_grad = True
            else:
                param.requires_grad = self.train_CNN
        
        return self.dropout(self.relu(features))



In [127]:
import torch
import torch.nn as nn

# Define the sequence model
class LSTM(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_size,num_layers):
        super(LSTM, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_size, num_layers, batch_first=True)
        self.linear = nn.Linear(hidden_size, vocab_size)
        self.dropout = nn.Dropout(0.2)

    def forward(self, features, captions):
        # Embedding the captions
        captions_embedded = self.dropout(self.embedding(captions))
        
        # Concatenate image features with embedded captions
        features = features.unsqueeze(0)  # Add sequence length dimension
        inputs = torch.cat((features, captions_embedded), dim=1)
    
        # LSTM
        lstm_out, _ = self.lstm(inputs)
        
        # Linear layer
        outputs = self.linear(lstm_out)
        
        +
        return outputs



# Initialize the caption generator model
#caption_model = LSTM(vocab_size, embedding_dim, hidden_size, num_layers)

In [128]:
class CNNtoLSTM(nn.Module):
    def __init__(self,embed_size,hidden_size,vocab_size,num_layers):
        super(CNNtoLSTM,self).__init__()
        self.encoderCNN = CNN(embed_size)
        self.decoderRNN = LSTM(embed_size, hidden_size,vocab_size,num_layers)

    def forward(self, images,captions):
        features = self.encoderCNN(images)
        outputs = self.decoderRNN(features,captions)
        return outputs
        
    def caption_image(self,image,vocabulary,max_length=50):
        result_caption = []

        with torch.no_grad():
            x = self.encoderCNN(image).unsqueeze(0)
            states = None
            for _ in range(max_length):
                hiddens, states = self.LSTM.lstm(x,states)
                output = self.decoderRNN.linear(hiddens.unsqueeze(0))
                predicted = output.argmax(1)

                result_caption.append(predicted.item())
                x = self.decoderRNN.embedding(predicted).unsqueeze(0)

                if vocabulary.itos[predicted.item()] == "<EOS>":
                    break

        return [vocabulary.itos[idx] for idx in result_caption]


# Example usage
vocab_size = len(vocabulary)
embedding_dim = 256
hidden_size = 256
num_layers = 3
learning_rate = 1e-3
num_epochs = 100

model = CNNtoLSTM(embedding_dim,hidden_size, vocab_size,num_layers)
criterion = nn.CrossEntropyLoss()




In [130]:

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=1e-4)

for epoch in range(num_epochs):
    train_loss = 0.0

    for batch_nr, (img,cap) in enumerate(train_loader_Img):
        img = img.to(device)
        cap = cap.to(device)

        outputs = model(img,cap[:-1])
        loss = criterion(outputs.reshape(-1,outputs.shape[2]), cap.reshape(-1))

        optimizer.zero_grad()
        loss.backward(loss)
        optimizer.step()

        train_loss += loss.item()
    
    train_loss /= len(train_loader_Img.dataset)

    print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.6f}')
    




IndexError: index out of range in self