In [2]:
import os
import cv2
import json
import string
import csv
import pickle
import timm
import numpy as np
from collections import Counter
from tqdm import tqdm
from matplotlib import pyplot as plt
from PIL import Image
import torch
import torch.nn as nn
from torch import Tensor
from torch.optim import Adam
import torchvision.models as models
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
from torchvision.transforms.functional import to_tensor
from torchvision.models.detection.image_list import ImageList

data_dir = os.path.join('dataset')
working_dir = os.path.join('working')
images_dir = os.path.join(data_dir,'Images')
captions_dir = os.path.join(data_dir,'captions.txt')

In [3]:
if torch.cuda.is_available():
    for i in range(torch.cuda.device_count()):
        print(f"Device {i}: {torch.cuda.get_device_name(i)}")
else:
    print("No GPU devices available.")
# *********************
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Device 0: NVIDIA GeForce GTX 1660 SUPER
Using device: cuda


In [4]:
import gensim

pretrained_embeddings_path = "GoogleNews-vectors-negative300.bin"
word2vec = gensim.models.KeyedVectors.load_word2vec_format(pretrained_embeddings_path, 
binary=True)

In [5]:
from gensim.utils import simple_preprocess
from gensim.models import Word2Vec


# # Example text data
# documents = ["This is the first sentence.", "Here is another one.", "And a third sentence."]

# # Preprocess text
# processed_docs = [simple_preprocess(doc) for doc in documents]



In [8]:
# processed_docs

In [9]:
# for flickr8k
def load_captions(filepath):
    captions = {}
    with open(filepath, 'r') as file:
        reader = csv.reader(file) 
        for row in reader:
            if len(row) != 2:
                print(f"Skipping malformed line: {row[:50]}...")  
                continue
            image_id, caption = row
            image_id = image_id.split('.')[0]  
            if image_id not in captions:
                captions[image_id] = []
            captions[image_id].append(caption)
    
    return captions
captions = load_captions(captions_dir)

In [10]:
# # for flickr30k
# def load_captions(filepath):
#     captions = {}
#     # Adding encoding parameter to handle potential encoding issues
#     with open(filepath, 'r', encoding='utf-8', errors='replace') as file:
#         reader = csv.reader(file) 
#         for row in reader:
#             if len(row) != 2:
#                 print(f"Skipping malformed line: {row[:50]}...")  
#                 continue
#             image_id, caption = row
#             image_id = image_id.split('.')[0]  
#             if image_id not in captions:
#                 captions[image_id] = []
#             captions[image_id].append(caption)
    
#     return captions

# captions = load_captions(captions_dir)

In [11]:
# def clean_and_tokenize(caption):
#     tokens = simple_preprocess(caption)
#     return tokens

In [12]:
def clean_and_tokenize(caption):
    tokens = caption.lower().translate(str.maketrans('', '', string.punctuation)).split()
    return tokens
    
# Collect all captions
all_captions = []
for cap_list in captions.values():
    all_captions.extend(cap_list)
    
# Count word frequencies
word_counts = Counter()
for caption in all_captions:
    word_counts.update(clean_and_tokenize(caption))

# Create a vocabulary with words that exist in both Word2Vec and your dataset
vocab = [word for word, count in word_counts.items() if count >= 2]

# Map words to indices for the special tokens
word_to_ix = {word: ix for ix, word in enumerate(vocab, start=4)}  # start=4 to leave 0 for <PAD>, 1 for <START>, 2 for <END>, 3 for <UNK>
word_to_ix['<PAD>'] = 0
word_to_ix['<START>'] = 1
word_to_ix['<END>'] = 2
word_to_ix['<UNK>'] = 3

# Reverse lookup for decoding
ix_to_word = {ix: word for word, ix in word_to_ix.items()}

# Update vocab size
vocab_size = len(word_to_ix)

In [13]:
vocab_size

5224

In [24]:
vocab_file_path = 'vocab.json'
with open('vocab.json', 'w') as vocab_file:
    json.dump(word_to_ix, vocab_file)

In [25]:
def encode_caption(caption, word_to_ix, max_length):
    tokens = clean_and_tokenize(caption)
    tokens = ['<START>'] + tokens + ['<END>']
    caption_ids = [word_to_ix.get(token, word_to_ix['<UNK>']) for token in tokens]
    if len(caption_ids) < max_length:
        caption_ids += [word_to_ix['<PAD>']] * (max_length - len(caption_ids))
    else:
        caption_ids = caption_ids[:max_length]
    return np.array(caption_ids)

max_length = max(len(clean_and_tokenize(caption)) + 2 for caption in all_captions)  # +2 for <START> and <END>
encoded_captions = {img_id: [encode_caption(caption, word_to_ix, max_length) for caption in cap_list]
                    for img_id, cap_list in captions.items()}


In [26]:
def extract_features(model, image_path, transform):
    # Preprocess the image
    image = Image.open(image_path).convert("RGB") 
    image_tensor = transform(image).unsqueeze(0)  # Add batch dimension
    image_tensor = image_tensor.to(device)
    # Extract features
    with torch.no_grad():
        features = model(image_tensor)
    
    # Swin transformer returns [batch_size, num_patches, embedding_dim]
    # You can reshape or pool as needed for your LSTM input
    return features.view(features.size(0), -1)  # Flatten into [batch_size, feature_dim]

In [27]:
# # Load a pretrained Swin Transformer model
# swin_model = timm.create_model('swin_large_patch4_window7_224', pretrained=True, num_classes=0)  # num_classes=0 removes the classification head
# swin_model.to(device)
# swin_model.train()  # Set to training mode

# # Optionally, you might still want to freeze some layers if you don't want to train the entire model
# # Example: Freezing earlier layers but training later layers
# for name, param in swin_model.named_parameters():
#     if 'stage4' in name:  # Just an example, adjust according to your needs
#         param.requires_grad = True
#     else:
#         param.requires_grad = False


In [28]:
# Load a pretrained Swin Transformer model
swin_model = timm.create_model('swin_large_patch4_window7_224', pretrained=True, num_classes=0)  # num_classes=0 removes the classification head
swin_model.to(device)
swin_model.eval()

# # Freeze the Swin model parameters
# for param in swin_model.parameters():
#     param.requires_grad = False

SwinTransformer(
  (patch_embed): PatchEmbed(
    (proj): Conv2d(3, 192, kernel_size=(4, 4), stride=(4, 4))
    (norm): LayerNorm((192,), eps=1e-05, elementwise_affine=True)
  )
  (layers): Sequential(
    (0): SwinTransformerStage(
      (downsample): Identity()
      (blocks): Sequential(
        (0): SwinTransformerBlock(
          (norm1): LayerNorm((192,), eps=1e-05, elementwise_affine=True)
          (attn): WindowAttention(
            (qkv): Linear(in_features=192, out_features=576, bias=True)
            (attn_drop): Dropout(p=0.0, inplace=False)
            (proj): Linear(in_features=192, out_features=192, bias=True)
            (proj_drop): Dropout(p=0.0, inplace=False)
            (softmax): Softmax(dim=-1)
          )
          (drop_path1): Identity()
          (norm2): LayerNorm((192,), eps=1e-05, elementwise_affine=True)
          (mlp): Mlp(
            (fc1): Linear(in_features=192, out_features=768, bias=True)
            (act): GELU(approximate='none')
            (

In [29]:
# List of image IDs (filenames without extension)
image_ids = [img_name.split('.')[0] for img_name in os.listdir(images_dir) if img_name.endswith('.jpg')]

# Set up transforms for training and validation/test
train_transform = transforms.Compose([
    transforms.RandomResizedCrop((224, 224)),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.RandomRotation(degrees=10),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

test_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
# Define the transform used during feature extraction (should be fixed)
feature_extraction_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

In [30]:
# # Extract features for all images and save them in a dictionary
# features_dict = {}
# for image_id in tqdm(image_ids):
#     image_path = os.path.join(images_dir, image_id + '.jpg')
#     if os.path.exists(image_path):
#         features = extract_features(swin_model,image_path,feature_extraction_transform)
#         features_dict[image_id] = features  # Convert to list for JSON serialization

In [31]:
# # Save features to a .pkl file
# with open('features.pkl', 'wb') as f:  # 'wb' for write-binary
#     pickle.dump(features_dict, f)

In [32]:
# Load features from a .pkl file
with open('features.pkl', 'rb') as f:  # 'rb' for read-binary
    features_dict = pickle.load(f)

In [33]:
# # Save 30k features to a .pkl file
# with open('features30k.pkl', 'wb') as f:  # 'wb' for write-binary
#     pickle.dump(features_dict, f)

In [22]:
# # Load 30kfeatures from a .pkl file
# with open('features30k.pkl', 'rb') as f:  # 'rb' for read-binary
#     features_dict = pickle.load(f)

In [10]:
class Flickr8kDataset(Dataset):
    def __init__(self, features_dict , captions, encoded_captions, image_ids, transform):
        self.features_dict  = features_dict 
        self.captions = captions
        self.encoded_captions = encoded_captions
        self.image_ids = image_ids
        self.transform = transform
    
    def __len__(self):
        return len(self.image_ids)
    
    def __getitem__(self, idx):
        image_id = self.image_ids[idx]
        
        features = self.features_dict[image_id].clone().detach().float()
        # Randomly choose one of the captions for this image
        # captions_for_image = self.encoded_captions[image_id]
        # caption_idx = np.random.randint(0, len(captions_for_image))  # Choose a random caption
        
        captions_for_image = np.array(self.encoded_captions[image_id])# caption = torch.tensor(captions_for_image[caption_idx], dtype=torch.long)
        caption = torch.tensor(captions_for_image, dtype=torch.long)
        
        return features, caption

NameError: name 'Dataset' is not defined

In [24]:
def custom_collate_fn(batch):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    
    # Filter out None values
    batch = list(filter(lambda x: x is not None, batch))
    if len(batch) == 0:
        return torch.tensor([]).to(device), torch.tensor([]).to(device)  # Return empty tensors if the batch is empty
    
    features, captions = zip(*batch)
    
    # Determine the maximum feature size
    max_feature_size = max(feature.size(1) for feature in features)
    
    # Pad feature tensors to the maximum size
    features = torch.stack([
        torch.cat((feature.to(device), torch.zeros((feature.size(0), max_feature_size - feature.size(1)), device=device)), dim=1)
        for feature in features
    ])
    
    captions = torch.stack(captions).to(device)
    return features, captions
    
print(type(captions))  # This should print <class 'dict'>

<class 'dict'>


In [25]:
import random # Shuffle and split data
image_ids = list(captions.keys())
random.seed(42)
random.shuffle(image_ids)

# Calculate indices for splits
total_images = len(image_ids)
train_end = int(0.7 * total_images)
val_end = int(0.9 * total_images)

train_ids = image_ids[:train_end]
val_ids = image_ids[train_end:val_end]
test_ids = image_ids[val_end:]

In [26]:
train_dataset = Flickr8kDataset(features_dict, captions, encoded_captions, train_ids, train_transform)
val_dataset = Flickr8kDataset(features_dict, captions, encoded_captions, val_ids, test_transform)
test_dataset = Flickr8kDataset(features_dict, captions, encoded_captions, test_ids, test_transform)

# Create DataLoaders for each split
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=custom_collate_fn)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, collate_fn=custom_collate_fn)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, collate_fn=custom_collate_fn)

In [27]:
sample_image_path = "dataset/Images/133905560_9d012b47f3.jpg"  # Replace with a valid image path
sample_features = extract_features(swin_model, sample_image_path,feature_extraction_transform)
feature_size = sample_features.size(1)
print("Feature Size:", feature_size)

Feature Size: 1536


In [28]:
class ImageCaptioningModel(nn.Module):
    def __init__(self, feature_size, hidden_size, vocab_size, embed_size, dropout):
        super(ImageCaptioningModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size, padding_idx=0)
        self.dropout = nn.Dropout(dropout)
        self.lstm = nn.LSTM(embed_size, hidden_size, batch_first=True)
        self.lstm_dropout = nn.Dropout(dropout)  # Dropout after LSTM
        self.linear = nn.Linear(hidden_size, vocab_size)
        self.feature_fc = nn.Linear(feature_size, embed_size)  # Adapting Swin output to LSTM
    
    def forward(self, features, captions):
        features = features.to(device)
        captions = captions.to(device)
        # Project features to embedding size
        batch_size = features.size(0)
        features = features.view(batch_size, -1)
        features = self.feature_fc(features).unsqueeze(1)  # [batch_size, 1, embed_size]
        
        # Prepare LSTM inputs
        embeddings = self.embedding(captions)
        embeddings = self.dropout(embeddings)  # Dropout after embedding layer
        inputs = torch.cat((features, embeddings[:, :-1, :]), dim=1)  # Concatenate features with captions
        
        # Pass through LSTM
        hiddens, _ = self.lstm(inputs)
        hiddens = self.lstm_dropout(hiddens)  # Dropout after LSTM hidden states
        outputs = self.linear(hiddens)
        return outputs




hidden_size = 1024
embed_size = 256
dropout = 0.5  
vocab_size
captioning_model = ImageCaptioningModel(feature_size, hidden_size, vocab_size, embed_size, dropout)
captioning_model = captioning_model.to(device)

In [29]:
class EarlyStopping:
    def __init__(self, patience=5, delta=0.01):
        self.patience = patience
        self.delta = delta
        self.best_loss = float('inf')
        self.counter = 0
        self.early_stop = False

    def __call__(self, val_loss):
        if val_loss < self.best_loss - self.delta:
            self.best_loss = val_loss
            self.counter = 0
        else:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True

In [30]:
# Training loop
criterion = nn.CrossEntropyLoss(ignore_index=word_to_ix['<PAD>'])
optimizer = Adam(captioning_model.parameters(), lr=0.0018, weight_decay=0.0001)
num_epochs = 40
early_stopping = EarlyStopping(patience=4, delta=0.01)  # Adjust patience and delta as needed
train_loss_values = []
val_loss_values = []

# Create a tqdm instance to show global progress for training and validation
total_steps = num_epochs * len(train_loader) + num_epochs * len(val_loader)
with tqdm(total=total_steps, desc='Training Progress') as pbar:
    for epoch in range(num_epochs):
        captioning_model.train()
        epoch_train_loss = 0
        for i, (features, captions) in enumerate(train_loader):
            if features.shape[0] == 0:  # Skip if batch is empty
                continue
                
            # Move inputs and targets to the selected device (GPU or CPU)
            features = features.to(device)
            captions = captions.to(device)
            
            optimizer.zero_grad()

            # Get the size of the captions
            batch_size, num_captions, seq_len = captions.size()

            # Squeeze out the extra dimensions in `features` to make it [batch_size, feature_size]
            features = features.squeeze(1).squeeze(1)  # Removing the unnecessary singleton dimensions

            # Now expand the features to repeat them across the captions dimension
            features = features.unsqueeze(1).expand(-1, num_captions, -1)  # [batch_size, num_captions, feature_size]

            # Flatten features and captions for input to the model
            features = features.contiguous().view(batch_size * num_captions, -1)
            captions = captions.view(batch_size * num_captions, seq_len)

            # Forward pass
            outputs = captioning_model(features, captions)
            loss = criterion(outputs[:, :captions.size(1), :].view(-1, vocab_size), captions.view(-1))

            # Backward pass and optimization
            loss.backward()
            optimizer.step()

            epoch_train_loss += loss.item()

            pbar.set_description(f'Epoch [{epoch+1}/{num_epochs}]')
            pbar.set_postfix({'Step': i, 'Loss': loss.item()})
            pbar.update(1)
        
        epoch_train_loss /= len(train_loader)
        train_loss_values.append(epoch_train_loss)

        # Validation loop
        captioning_model.eval()
        epoch_val_loss = 0
        with torch.no_grad():
            for features, captions in val_loader:
                if features.shape[0] == 0:  # Skip empty batches
                    continue
                # Move inputs and targets to the selected device (GPU or CPU)
                features = features.to(device)
                captions = captions.to(device)
                
                # Get the size of the captions
                batch_size, num_captions, seq_len = captions.size()

                # Squeeze and expand features just like in the training loop
                features = features.squeeze(1).squeeze(1)  # Remove unnecessary singleton dimensions
                features = features.unsqueeze(1).expand(-1, num_captions, -1)  # Expand to match num_captions
                features = features.contiguous().view(batch_size * num_captions, -1)

                captions = captions.view(batch_size * num_captions, seq_len)

                # Forward pass
                outputs = captioning_model(features, captions)
                loss = criterion(outputs.view(-1, vocab_size), captions.view(-1))
                epoch_val_loss += loss.item()
                pbar.update(1)  # Update progress bar for validation batches

        epoch_val_loss /= len(val_loader)
        val_loss_values.append(epoch_val_loss)
        print(f'Epoch [{epoch+1}/{num_epochs}], Validation Loss: {epoch_val_loss}')

        # Update tqdm progress bar to reflect the completion of an epoch
        pbar.set_description(f'Epoch [{epoch+1}/{num_epochs}]')
        pbar.set_postfix({'Validation Loss': epoch_val_loss})

        # Early stopping
        early_stopping(epoch_val_loss)
        if early_stopping.early_stop:
            print("Early stopping")
            break


Epoch [2/40]:   2%|▎         | 228/9120 [00:26<07:00, 21.12it/s, Step=0, Loss=3.45]   

Epoch [1/40], Validation Loss: 3.4184787086412016


Epoch [3/40]:   5%|▌         | 457/9120 [00:53<08:45, 16.47it/s, Step=0, Loss=3.2]    

Epoch [2/40], Validation Loss: 3.0827246834250057


Epoch [4/40]:   8%|▊         | 685/9120 [01:21<08:29, 16.57it/s, Step=0, Loss=2.64]   

Epoch [3/40], Validation Loss: 2.929031732035618


Epoch [5/40]:  10%|█         | 913/9120 [01:48<08:18, 16.47it/s, Step=0, Loss=2.77]   

Epoch [4/40], Validation Loss: 2.838222625208836


Epoch [6/40]:  13%|█▎        | 1141/9120 [02:15<08:10, 16.28it/s, Step=0, Loss=2.77]   

Epoch [5/40], Validation Loss: 2.7835500100079704


Epoch [7/40]:  15%|█▌        | 1369/9120 [02:43<08:01, 16.10it/s, Step=0, Loss=2.64]   

Epoch [6/40], Validation Loss: 2.7404459691515157


Epoch [8/40]:  18%|█▊        | 1597/9120 [03:11<07:57, 15.75it/s, Step=0, Loss=2.59]   

Epoch [7/40], Validation Loss: 2.7155344299241606


Epoch [9/40]:  20%|██        | 1825/9120 [03:39<07:43, 15.75it/s, Step=0, Loss=2.63]   

Epoch [8/40], Validation Loss: 2.694678348653457


Epoch [10/40]:  23%|██▎       | 2053/9120 [04:08<07:27, 15.79it/s, Step=0, Loss=2.42]   

Epoch [9/40], Validation Loss: 2.669717784021415


Epoch [11/40]:  25%|██▌       | 2281/9120 [04:37<07:20, 15.52it/s, Step=0, Loss=2.31]   

Epoch [10/40], Validation Loss: 2.661142157573326


Epoch [12/40]:  28%|██▊       | 2509/9120 [05:05<07:01, 15.68it/s, Step=0, Loss=2.2]    

Epoch [11/40], Validation Loss: 2.646244960672715


Epoch [13/40]:  30%|███       | 2737/9120 [05:34<06:49, 15.59it/s, Step=0, Loss=2.5]    

Epoch [12/40], Validation Loss: 2.626915286569034


Epoch [14/40]:  33%|███▎      | 2965/9120 [06:03<06:29, 15.79it/s, Step=0, Loss=2.27]   

Epoch [13/40], Validation Loss: 2.6193768463882745


Epoch [15/40]:  35%|███▌      | 3193/9120 [06:31<06:23, 15.46it/s, Step=0, Loss=2.33]   

Epoch [14/40], Validation Loss: 2.6261795642329195


Epoch [16/40]:  38%|███▊      | 3421/9120 [07:00<06:12, 15.28it/s, Step=0, Loss=2.32]   

Epoch [15/40], Validation Loss: 2.6153986641004976


Epoch [17/40]:  40%|████      | 3649/9120 [07:29<05:55, 15.39it/s, Step=0, Loss=2.43]   

Epoch [16/40], Validation Loss: 2.6060943229525697


Epoch [18/40]:  43%|████▎     | 3877/9120 [07:58<05:44, 15.23it/s, Step=0, Loss=2.2]    

Epoch [17/40], Validation Loss: 2.61017323475258


Epoch [19/40]:  45%|████▌     | 4105/9120 [08:27<05:26, 15.35it/s, Step=0, Loss=2.25]  

Epoch [18/40], Validation Loss: 2.6035492981181427


Epoch [20/40]:  48%|████▊     | 4333/9120 [08:56<05:11, 15.36it/s, Step=0, Loss=2.21]  

Epoch [19/40], Validation Loss: 2.5968259689854643


Epoch [21/40]:  50%|█████     | 4561/9120 [09:26<05:03, 15.01it/s, Step=0, Loss=2.08]  

Epoch [20/40], Validation Loss: 2.603459961274091


Epoch [22/40]:  53%|█████▎    | 4789/9120 [09:55<04:43, 15.29it/s, Step=0, Loss=2.25]   

Epoch [21/40], Validation Loss: 2.607053686590756


Epoch [22/40]:  55%|█████▌    | 5016/9120 [10:24<08:30,  8.03it/s, Validation Loss=2.59]

Epoch [22/40], Validation Loss: 2.5937503973642984
Early stopping





In [31]:
model_save_path = 'captioning_model.pth'

# Save the model's state dictionary
torch.save(captioning_model.state_dict(), model_save_path)
print(f"Model saved to {model_save_path}")

Model saved to captioning_model.pth


In [32]:
# Define the path where the model is saved
model_save_path = 'captioning_model.pth'

# Load the state dictionary into the model
captioning_model.load_state_dict(torch.load(model_save_path))
# Set the model to evaluation mode
captioning_model.eval()

print(f"Model loaded from {model_save_path}")

Model loaded from captioning_model.pth


  captioning_model.load_state_dict(torch.load(model_save_path))


In [33]:
# Save loss values to a file
with open('train_loss_values.json', 'w') as f:
    json.dump(train_loss_values, f)
# Save loss values to a file
with open('val_loss_values.json', 'w') as f:
    json.dump(val_loss_values, f)

In [34]:
def load_json(filename):
    try:
        with open(filename, 'r') as f:
            data = json.load(f)
        if not data:
            print(f"Warning: {filename} is empty.")
        return data
    except FileNotFoundError:
        print(f"Error: {filename} not found.")
        return None
    except json.JSONDecodeError:
        print(f"Error: Could not decode JSON from {filename}.")
        return None

# Load the loss values
train_loss_values = load_json('train_loss_values.json')
val_loss_values = load_json('val_loss_values.json')

# show thier values
train_loss_values ,val_loss_values

# Simplified plot without specifying figsize
plt.plot(train_loss_values, label='Training Loss')
plt.plot(val_loss_values, label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.show()

In [37]:
# Testing phase
captioning_model.eval()
test_loss = 0
with torch.no_grad():
    for features, captions in test_loader:
        if features.shape[0] == 0:  # Skip empty batches
            continue
        # Move inputs and targets to the selected device (GPU or CPU)
        features = features.to(device)
        captions = captions.to(device)
                
        # Get the size of the captions
        batch_size, num_captions, seq_len = captions.size()

        # Squeeze and expand features like in training and validation loops
        features = features.squeeze(1).squeeze(1)  # Remove unnecessary singleton dimensions
        features = features.unsqueeze(1).expand(-1, num_captions, -1)  # Expand to match num_captions
        features = features.contiguous().view(batch_size * num_captions, -1)

        captions = captions.view(batch_size * num_captions, seq_len)

        # Forward pass
        outputs = captioning_model(features, captions)
        loss = criterion(outputs.view(-1, vocab_size), captions.view(-1))
        test_loss += loss.item()

# Compute average test loss
test_loss /= len(test_loader)
print(f'Test Loss: {test_loss}')


Test Loss: 2.620670887140127


In [39]:
def preprocess_image(image_path):
    image = Image.open(image_path).convert('RGB')
    image = feature_extraction_transform(image).unsqueeze(0)  # Add batch dimension
    return image
    
def extract_image_features(image_path, model):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    image_tensor = preprocess_image(image_path).to(device)
    with torch.no_grad():
        features = model(image_tensor).view(1, -1)
    return features

In [40]:
def remove_repetitive_words(caption):
    words = caption.split()
    filtered_words = [words[0]]  # Initialize with the first word
    
    for i in range(1, len(words)):
        if words[i] != words[i - 1]:
            filtered_words.append(words[i])
    
    return ' '.join(filtered_words)

def generate_caption(captioning_model, swin_model, image_path, word_to_ix, ix_to_word, max_length, feature_size, beam_size=5):
    """
    Generate a caption for an image using the trained captioning model and Swin Transformer for feature extraction.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    
    # Preprocess the image to extract features
    image = Image.open(image_path).convert("RGB")
    image = feature_extraction_transform(image).unsqueeze(0).to(device)  # Add batch dimension and move to device

    with torch.no_grad():
        features = extract_image_features(image_path, swin_model).to(device)  # Ensure features are on the same device
        features = features.view(1, feature_size)

    # Initialize beam search
    beam = [([word_to_ix['<START>']], 0)]  # (sequence, score)
    for _ in range(max_length):
        new_beam = []
        for seq, score in beam:
            caption_tensor = torch.tensor(seq, dtype=torch.long).unsqueeze(0).to(device)  # Move to device
            with torch.no_grad():
                outputs = captioning_model(features, caption_tensor)
            
            # Get top beam_size predictions
            top_k_scores, top_k_ids = torch.topk(outputs[0, -1], beam_size)
            
            for i in range(beam_size):
                new_seq = seq + [top_k_ids[i].item()]
                new_score = score + top_k_scores[i].item()
                new_beam.append((new_seq, new_score))

        # Keep only the top beam_size sequences
        beam = sorted(new_beam, key=lambda x: x[1], reverse=True)[:beam_size]
        
        # Check for end token
        if any(seq[-1] == word_to_ix['<END>'] for seq, _ in beam):
            break

    # Choose the best sequence
    best_seq = max(beam, key=lambda x: x[1])[0]

    # Skip <START> and <END> tokens
    caption = ' '.join([ix_to_word[ix] for ix in best_seq if ix not in [word_to_ix['<START>'], word_to_ix['<END>']]])

    # Remove repetitive words from the caption
    caption = remove_repetitive_words(caption)
    
    return caption



In [2]:
import gc
# Path to the image you want to caption
image_path = 'dataset/Images/10815824_2997e03d76.jpg'
# Generate caption
feature_size = 1536
max_length = 30  # Maximum caption length

try:
    # Ensure synchronization before running the caption generation
    torch.cuda.synchronize()
    
    # Generate caption using preloaded models
    caption = generate_caption(captioning_model, swin_model, image_path, word_to_ix, ix_to_word, max_length, feature_size, beam_size=3)
    
    # Load the image
    image = Image.open(image_path)
    
    # Convert image to numpy array
    image_array = np.array(image)
    
    # Create a new figure
    fig, ax = plt.subplots()
    
    # Plot the image and add the caption
    ax.imshow(image_array)
    ax.axis('off')  # Hide axes
    ax.set_title(caption)
    
    # Save the image with caption before showing it
    fig.savefig('image_with_caption.png', bbox_inches='tight', pad_inches=0)  # Save the plot as a PNG file
    
    # Display the image and the caption
    plt.show()  # Ensure the image is displayed with the caption
    
    # Free resources by clearing the figure
    plt.close(fig)  # Close the current figure after saving the image
    
    # Manually clear GPU cache and force garbage collection
    torch.cuda.empty_cache()  # Clear the GPU cache
    gc.collect()  # Force garbage collection to free RAM

except Exception as e:
    print(f"Error during caption generation: {e}")

Error during caption generation: name 'torch' is not defined


In [42]:
from tqdm import tqdm
from nltk.translate.bleu_score import corpus_bleu, SmoothingFunction
import fractions

# Custom fraction class to avoid the unexpected keyword argument '_normalize' issue
class CustomFraction(fractions.Fraction):
    def __new__(cls, numerator=0, denominator=None, _normalize=True):
        return super().__new__(cls, numerator, denominator)

# Override the modified_precision function in nltk
import nltk.translate.bleu_score as bleu_score
bleu_score.Fraction = CustomFraction

# Testing phase with BLEU score calculation and tqdm progress bar
captioning_model.eval()
test_loss = 0
bleu_scores = []
smoothing_function = SmoothingFunction().method4

# Lists to collect actual and predicted captions for BLEU score calculation
actual_captions = []
predicted_captions_list = []

# Use tqdm to track the progress of the testing phase
with torch.no_grad():
    for features, captions in tqdm(test_loader, desc="Testing"):
        if features.shape[0] == 0:
            continue
        features = features.to(device)
        captions = captions.to(device)
        # Get the size of the captions
        batch_size, num_captions, seq_len = captions.size()

        # Squeeze and expand features like in training and validation loops
        features = features.squeeze(1).squeeze(1)  # Remove unnecessary singleton dimensions
        features = features.unsqueeze(1).expand(-1, num_captions, -1)  # Expand to match num_captions
        features = features.contiguous().view(batch_size * num_captions, -1)

        captions = captions.view(batch_size * num_captions, seq_len)

        # Forward pass
        outputs = captioning_model(features, captions)
        loss = criterion(outputs.view(-1, vocab_size), captions.view(-1))
        test_loss += loss.item()

        # Decode the outputs and collect captions
        predicted_captions = outputs.argmax(2).cpu().numpy()
        for pred, actual in zip(predicted_captions, captions.cpu().numpy()):
            pred_caption = [ix_to_word[ix] for ix in pred if ix not in {word_to_ix['<PAD>'], word_to_ix['<START>'], word_to_ix['<END>'], word_to_ix['<UNK>']}]
            actual_caption = [ix_to_word[ix] for ix in actual if ix not in {word_to_ix['<PAD>'], word_to_ix['<START>'], word_to_ix['<END>'], word_to_ix['<UNK>']}]
            actual_captions.append(actual_caption)
            predicted_captions_list.append(pred_caption)

# Calculate BLEU scores for all captions
bleu1 = corpus_bleu([[actual] for actual in actual_captions], predicted_captions_list, weights=(1, 0, 0, 0), smoothing_function=smoothing_function)
bleu2 = corpus_bleu([[actual] for actual in actual_captions], predicted_captions_list, weights=(0.5, 0.5, 0, 0), smoothing_function=smoothing_function)
bleu3 = corpus_bleu([[actual] for actual in actual_captions], predicted_captions_list, weights=(0.33, 0.33, 0.33, 0), smoothing_function=smoothing_function)
bleu4 = corpus_bleu([[actual] for actual in actual_captions], predicted_captions_list, weights=(0.25, 0.25, 0.25, 0.25), smoothing_function=smoothing_function)

# Calculate average loss
test_loss /= len(test_loader)
print(f'Test Loss: {test_loss}')
print(f'BLEU-1 Score: {bleu1}')
print(f'BLEU-2 Score: {bleu2}')
print(f'BLEU-3 Score: {bleu3}')
print(f'BLEU-4 Score: {bleu4}')


Testing: 100%|██████████| 26/26 [00:01<00:00, 17.14it/s]


Test Loss: 2.620670887140127
BLEU-1 Score: 0.4200664208557216
BLEU-2 Score: 0.2794557133311556
BLEU-3 Score: 0.215315307599643
BLEU-4 Score: 0.16172399759665515


In [87]:
from tqdm import tqdm
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
import fractions

# Custom fraction class to avoid the unexpected keyword argument '_normalize' issue
class CustomFraction(fractions.Fraction):
    def __new__(cls, numerator=0, denominator=None, _normalize=True):
        return super().__new__(cls, numerator, denominator)

# Override the modified_precision function in nltk
import nltk.translate.bleu_score as bleu_score
bleu_score.Fraction = CustomFraction

# Testing phase with BLEU score calculation and tqdm progress bar
captioning_model.eval()
test_loss = 0
bleu_scores = []
smoothing_function = SmoothingFunction().method4

# Use tqdm to track the progress of the testing phase
with torch.no_grad():
    for features, captions in tqdm(test_loader, desc="Testing"):
        if features.shape[0] == 0:
            continue
        # Move inputs and targets to the selected device (GPU or CPU)
        features = features.to(device)
        captions = captions.to(device)
                
        # Get the size of the captions
        batch_size, num_captions, seq_len = captions.size()

        # Squeeze and expand features like in training and validation loops
        features = features.squeeze(1).squeeze(1)  # Remove unnecessary singleton dimensions
        features = features.unsqueeze(1).expand(-1, num_captions, -1)  # Expand to match num_captions
        features = features.contiguous().view(batch_size * num_captions, -1)

        captions = captions.view(batch_size * num_captions, seq_len)

        # Forward pass
        outputs = captioning_model(features, captions)
        loss = criterion(outputs.view(-1, vocab_size), captions.view(-1))
        test_loss += loss.item()

        # Decode the outputs and calculate BLEU scores
        predicted_captions = outputs.argmax(2).cpu().numpy()
        for pred, actual in zip(predicted_captions, captions.cpu().numpy()):
            pred_caption = [ix_to_word[ix] for ix in pred if ix not in {word_to_ix['<PAD>'], word_to_ix['<START>'], word_to_ix['<END>'], word_to_ix['<UNK>']}]
            actual_caption = [ix_to_word[ix] for ix in actual if ix not in {word_to_ix['<PAD>'], word_to_ix['<START>'], word_to_ix['<END>'], word_to_ix['<UNK>']}]
            bleu_score_value = sentence_bleu([actual_caption], pred_caption, smoothing_function=smoothing_function)
            bleu_scores.append(bleu_score_value)

# Calculate average loss and BLEU score
test_loss /= len(test_loader)
average_bleu_score = sum(bleu_scores) / len(bleu_scores)
print(f'Test Loss: {test_loss}')
print(f'Average BLEU Score: {average_bleu_score}')


Testing: 100%|██████████| 26/26 [00:02<00:00, 12.88it/s]

Test Loss: 2.620670887140127
Average BLEU Score: 0.19049614634848383



