In [79]:
import torch
import torch.optim as optim
import torch.nn as nn
import torchvision.transforms as transforms
import torchvision.models as models
from PIL import Image
import matplotlib.pyplot as plt

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [80]:
import string
import numpy as np
import PIL.Image

from os import listdir
from pickle import dump, load

from numpy import array
from numpy import argmax

In [81]:
image_dir = '/kaggle/input/flicker8k-dataset/Flickr8k_Dataset/Flicker8k_Dataset'
text_dir = '/kaggle/input/flicker8k-dataset/Flickr8k_text/Flickr8k.token.txt'

In [82]:
def load_description(text):
    mapping = dict()
    for line in text.split("\n"): 
        token = line.split("\t")
        if len(line) < 2:   # remove short descriptions 
            continue
        img_id = token[0].split('.')[0] # name of the image 
        img_des = token[1]              # description of the image 
        if img_id not in mapping: 
            mapping[img_id] = list() 
        mapping[img_id].append(img_des) 
    return mapping 

In [83]:
token_path = '/kaggle/input/flicker8k-dataset/Flickr8k_text/Flickr8k.token.txt'
text = open(token_path, 'r', encoding = 'utf-8').read() 
descriptions = load_description(text) 
print(descriptions['1000268201_693b08cb0e'])

['A child in a pink dress is climbing up a set of stairs in an entry way .', 'A girl going into a wooden building .', 'A little girl climbing into a wooden playhouse .', 'A little girl climbing the stairs to her playhouse .', 'A little girl in a pink dress going into a wooden cabin .']


In [84]:
def clean_description(desc):
    for key, des_list in desc.items(): 
        for i in range(len(des_list)): 
            caption = des_list[i] 
            caption = [ch for ch in caption if ch not in string.punctuation]  # remove punctuations
            caption = ''.join(caption) #back to string
            caption = caption.split(' ') #remove spaces
            caption = [word.lower() for word in caption if len(word)>1 and word.isalpha()]  #convert to lowercase
            caption = ' '.join(caption)
            des_list[i] = caption 
            

In [85]:
clean_description(descriptions) 
descriptions['1000268201_693b08cb0e']

['child in pink dress is climbing up set of stairs in an entry way',
 'girl going into wooden building',
 'little girl climbing into wooden playhouse',
 'little girl climbing the stairs to her playhouse',
 'little girl in pink dress going into wooden cabin']

In [86]:
def to_vocab(desc): 
    words = set() 
    for key in desc.keys(): 
        for line in desc[key]: 
            words.update(line.split()) 
    return words 

In [87]:
vocab = to_vocab(descriptions)
print(len(vocab))

8763


In [88]:
#time to split train and test
import glob 
images = '/kaggle/input/flicker8k-dataset/Flickr8k_Dataset/Flicker8k_Dataset/'
# Create a list of all image names in the directory 
img = glob.glob(images + '*.jpg')

train_path = '/kaggle/input/flicker8k-dataset/Flickr8k_text/Flickr_8k.trainImages.txt'
train_images = open(train_path, 'r', encoding = 'utf-8').read().split("\n") 
train_img = []  # list of all images in training set 
for im in img: 
    if(im[len(images):] in train_images): 
        train_img.append(im) 
        


In [89]:
# load descriptions of training set in a dictionary. Name of the image will act as ey 
def load_clean_descriptions(des, dataset): 
    dataset_des = dict() 
    for key, des_list in des.items(): 
        if key+'.jpg' in dataset: 
            if key not in dataset_des: 
                dataset_des[key] = list() 
            for line in des_list: 
                desc = 'startseq ' + line + ' endseq'
                dataset_des[key].append(desc) 
    return dataset_des 

In [90]:
train_descriptions = load_clean_descriptions(descriptions, train_images) 
print(train_descriptions['1000268201_693b08cb0e'])

['startseq child in pink dress is climbing up set of stairs in an entry way endseq', 'startseq girl going into wooden building endseq', 'startseq little girl climbing into wooden playhouse endseq', 'startseq little girl climbing the stairs to her playhouse endseq', 'startseq little girl in pink dress going into wooden cabin endseq']


In [91]:
# list of all training captions 
all_train_captions = [] 
for key, val in train_descriptions.items(): 
    for caption in val: 
        all_train_captions.append(caption) 
        
# consider only words which occur atleast 10 times 
vocabulary = vocab 
threshold = 10 # you can change this value according to your need 
word_counts = {} 
for cap in all_train_captions: 
    for word in cap.split(' '): 
        word_counts[word] = word_counts.get(word, 0) + 1
  
vocab = [word for word in word_counts if word_counts[word] >= threshold] 
vocab_size = len(vocab) + 1

In [92]:
print(vocab_size)

1652


In [93]:
# word mapping to integers 
ixtoword = {} 
wordtoix = {} 
  
ix = 1
for word in vocab: 
    wordtoix[word] = ix 
    ixtoword[ix] = word 
    ix += 1
      
# find the maximum length of a description in a dataset 
max_length = max(len(des.split()) for des in all_train_captions) 
max_length

34

In [94]:
#extract image features
def load_image(image_path, max_size = 224):
    image = Image.open(image_path).convert('RGB')
    
    size = max_size if max(image.size) > max_size else max(image.size)
    
    transformations = transforms.Compose([
        transforms.Resize(size),
        transforms.ToTensor(),
        transforms.Normalize((0.485, 0.456, 0.406),
                             (0.229, 0.224, 0.225)) 
    ])
    
    image = transformations(image)[:3, :, :].unsqueeze(0)
    return image.to(device)

def extract_features(directory = image_dir):
    model =  models.vgg19(pretrained=True).to(device)
    model.classifier = nn.Sequential(*list(model.classifier.children())[:-1])
    model.eval()
    features = dict()
    with torch.no_grad():
        for name in listdir(directory):
            filename = directory + '/' + name
            image = load_image(filename, 224)
            image.to(device)
            feature = model(image)
            image_id = name.split('.')[0]

            features[image_id] = feature
            torch.cuda.empty_cache()
    return features

In [95]:
features = extract_features()

In [96]:
def get_train_features():
    train = {}
    for key, feat in features.items():
        key = key + ".jpg"
        if key in train_images:
            train[key] = feat
    return train

In [97]:
train_features = get_train_features()

In [98]:
def tto_categorical(y, num_classes):
    """ 1-hot encodes a tensor """
    return np.eye(num_classes, dtype='uint8')[y]

In [99]:
import torch

def pad_sequences(sequences, maxlen=None, padding_value=0):
    # Check the maximum length if not provided
    if maxlen is None:
        maxlen = max(len(seq) for seq in sequences)

    # Create a tensor filled with the padding value
    padded_sequences = torch.full((len(sequences), maxlen), padding_value)

    # Pad each sequence
    for i, seq in enumerate(sequences):
        length = len(seq)
        # Copy the sequence into the padded tensor
        padded_sequences[i, :length] = torch.tensor(seq)

    return padded_sequences

In [100]:
X1, X2, y = list(), list(), list() 
for key, des_list in train_descriptions.items(): 
    pic = train_features[key + '.jpg'] 
    if pic.is_cuda:  # Check if it's a GPU tensor
        pic = pic.cpu()
        
    for cap in des_list: 
        seq = [wordtoix[word] for word in cap.split(' ') if word in wordtoix] 
        for i in range(1, len(seq)): 
            in_seq, out_seq = seq[:i], seq[i] 
            
            in_seq = pad_sequences([in_seq], maxlen = max_length)[0] 
            out_seq = tto_categorical([out_seq], num_classes = vocab_size)[0] 
            # store 
            X1.append(pic) 
            X2.append(in_seq) 
            y.append(out_seq) 
  
X2 = np.array(X2) 
X1 = np.array([pic.cpu().numpy() if torch.is_tensor(pic) else pic for pic in X1])  # Ensure all elements are NumPy arrays 
y = np.array(y) 

In [101]:
import torch
from torch.utils.data import TensorDataset, DataLoader

# Convert NumPy arrays to PyTorch tensors
X1 = torch.from_numpy(X1).float()  # Ensure floating-point tensor for image features
X2 = torch.from_numpy(X2).float()   # Integer type for captions (word indices)
y = torch.from_numpy(y).float()     # Integer type for targets (next word index)


In [102]:
X1 = X1.view(-1, 4096)  
print(X1.shape)  # Should be (num_samples, 2048)
print(X2.shape)  # Should be (num_samples, max_length)
print(y.shape)

torch.Size([292328, 4096])
torch.Size([292328, 34])
torch.Size([292328, 1652])


In [103]:
# load glove vectors for embedding layer 
embeddings_index = {} 
golve_path ='/kaggle/input/glove-img/glove.6B.200d.txt'
glove = open(golve_path, 'r', encoding = 'utf-8').read() 
for line in glove.split("\n"): 
    values = line.split(" ") 
    word = values[0] 
    indices = np.asarray(values[1: ], dtype = 'float32') 
    embeddings_index[word] = indices 
  
emb_dim = 200
emb_matrix = np.zeros((vocab_size, emb_dim)) 
for word, i in wordtoix.items(): 
    emb_vec = embeddings_index.get(word) 
    if emb_vec is not None: 
        emb_matrix[i] = emb_vec 
emb_matrix.shape

(1652, 200)

In [104]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

class ImageCaptioningModel(nn.Module):
    def __init__(self, vocab_size, emb_dim, max_length, emb_matrix):
        super(ImageCaptioningModel, self).__init__()
        
        # Feature vector (Image) pathway
        self.dropout1 = nn.Dropout(0.2)
        self.fc1 = nn.Linear(4096, 256)  # Dense(256)

        # Sequence (Caption) pathway
        self.embedding = nn.Embedding(vocab_size, emb_dim, padding_idx=0)  # Embedding layer
        self.embedding.weight = nn.Parameter(torch.tensor(emb_matrix, dtype=torch.float32))  # Set embedding weights
        self.embedding.weight.requires_grad = False  # Freeze embedding layer
        
        self.dropout2 = nn.Dropout(0.2)
        self.lstm = nn.LSTM(emb_dim, 256, batch_first=True)  # LSTM layer
        
        # Decoder layers
        self.fc2 = nn.Linear(256, 256)  # Decoder Dense(256)
        self.fc3 = nn.Linear(256, vocab_size)  # Output layer Dense(vocab_size)

    def forward(self, img_feat, caption):
        caption = caption.long()
        # Image feature pathway
        x1 = self.dropout1(img_feat)
        x1 = torch.relu(self.fc1(x1))  # Shape: (batch_size, 256)

        # Caption pathway
        x2 = self.embedding(caption)  # Shape: (batch_size, max_length, emb_dim)
        x2 = self.dropout2(x2)
        x2, _ = self.lstm(x2)  # Shape: (batch_size, max_length, 256)
        x2 = x2[:, -1, :]  # Get the last time step (batch_size, 256)

        # Combine features
        combined = x1 + x2  # Add the image and caption vectors

        # Decoder layers
        x = torch.relu(self.fc2(combined))  # Dense(256)
        output = torch.softmax(self.fc3(x), dim=1)  # Dense(vocab_size) with softmax
        
        return output


In [105]:
# Loss and optimizer
model = ImageCaptioningModel(vocab_size, emb_dim, max_length, emb_matrix)
criterion = nn.CrossEntropyLoss()  # Cross-entropy loss (for classification)
optimizer = optim.Adam(model.parameters(), lr=0.001)  # Adam optimizer

# Dataset and DataLoader for batching
dataset = TensorDataset(X1, X2, y)
data_loader = DataLoader(dataset, batch_size=256, shuffle=True)

In [106]:
def to_one_hot(y, num_classes):
    predicted_classes = torch.argmax(y, dim=1)  # Shape will be (256,)
    num_classes = y.size(1)  # Get the number of classes (1652 in this case)
    one_hot = torch.zeros_like(y)  # Create a tensor of zeros with the same shape as logits
    one_hot[torch.arange(y.size(0)), predicted_classes] = 1  # Set the appropriate indices to 1

    return one_hot

In [None]:
import torch
import torch.nn.functional as F
from tqdm import tqdm
# Training loop
epochs = 10
for epoch in range(epochs):
    model.train()
    total_loss = 0
    
    for img_feat, caption, target in tqdm(data_loader):
        optimizer.zero_grad()  # Clear gradients
        #print(img_feat)
        output = model(img_feat, caption)  # Forward pass
        target = target.float()  # If your target is one-hot encoded
        
        # Calculate loss using Categorical Cross Entropy
        loss = F.binary_cross_entropy_with_logits(output, target)
        
        loss.backward()  # Backpropagation
        optimizer.step()  # Update weights
        
        total_loss += loss.item()

    print(f'Epoch {epoch + 1}/{epochs}, Loss: {total_loss/len(data_loader):.4f}')

100%|██████████| 1142/1142 [03:51<00:00,  4.93it/s]


Epoch 1/10, Loss: 0.6934


100%|██████████| 1142/1142 [03:55<00:00,  4.84it/s]


Epoch 2/10, Loss: 0.6934


 82%|████████▏ | 940/1142 [03:12<00:39,  5.08it/s]

In [None]:
def greedy_search(pic): 
    start = 'startseq'
    pic = extract_features(pic).items()
    
    # Extract features and assume the first value is the tensor
    for key, val in pic:
        pic = val
    
    # Move the pic tensor to the same device as the model
    pic = pic.to(next(model.parameters()).device)  # This gets the device of the model parameters
    
    for i in range(max_length): 
        seq = [wordtoix[word] for word in start.split() if word in wordtoix] 
        seq = pad_sequences([seq], maxlen=max_length) 
        
        # Convert seq to tensor and move it to the same device as the model
        seq = torch.tensor(seq, device=next(model.parameters()).device)  # Ensure seq is on the same device
        
        yhat = model(pic, seq)

        # Detach yhat from the computation graph and convert to NumPy
        yhat = yhat.detach().cpu().numpy()  # Ensure to detach and move to CPU for numpy operation
        
        print("Model Output:", yhat)  # Debugging line

        yhat = np.argmax(yhat)  # Now you can safely use np.argmax
        print("Predicted Index:", yhat)  # Debugging line
        
        word = ixtoword[yhat] 
        print("Predicted Word:", word)  # Debugging line

        start += ' ' + word 
        
        if word == 'endseq': 
            print("Early termination with word:", word)  # Debugging line
            break
    
    final = start.split() 
    print("Generated Sequence:", final)  # Debugging line
    final = final[1:-1] 
    final = ' '.join(final) 
    return final

# Usage
output = greedy_search('/kaggle/input/sample-image')
print("Length of Output:", len(output))


In [None]:
output = greedy_search('/kaggle/input/randomimg/')
print(len(output))

In [None]:
print(output)