Overall Steps

In [None]:
# Get hugging face dataset - Done
# Resize all images to 448 by 448 - Done
# For each image, split into grid of 16 x 16 pixels - sequences of each patch representing the image - Done
# Convert each patch to 1 dimensional vector - this will be the pre embedding of the patch - Done
# Pass pre embedding to embedding layer to get the embedding of the patch 
# This will input to the encoder 

# Tokenise each caption in the dataset - Done
# Create two copies of each tokenised caption, 
#  - one with start of sentence token at the start - input to decoder
#  - one with end of sentence token at the end - label for the loss function 

In [19]:
from PIL import Image
import numpy as np
import torch

from tqdm import tqdm
import pandas as pd
import sentencepiece as spm
import ast  # For converting string representation of list to list
import torch.nn as nn
import math



Split up image into grid of 16 x 16 and convert to list of 1D arrays

In [28]:

# Path to your image
image_path = 'flickr30k/flickr30k-images/testImage.jpg'


def convertImageTo1DPatchesVectorList(image_path):
    
    # Load the image
    image = Image.open(image_path)

    # Resize the image to 448 x 448 pixels
    resized_image = image.resize((448, 448))

    # Convert the image to a numpy array for easier manipulation
    image_np = np.array(resized_image)

    # Initialize a list to hold the 1D vectors for each 16x16 patch
    patch_vectors = []

    # Iterate over the image in 16x16 blocks
    for y in range(0, image_np.shape[0], 16):
        for x in range(0, image_np.shape[1], 16):
            # Extract the patch
            patch = image_np[y:y+16, x:x+16]

            # Flatten the patch to a 1D vector (16*16*number_of_channels)
            # This line assumes the image is in color (RGB), so the patch size becomes 16*16*3
            patch_vector = patch.flatten()
            
            # Add the patch vector to our list
            patch_vectors.append(patch_vector)

        # Convert the list of patch vectors to a single numpy array
    patch_vectors_np = np.array(patch_vectors)

     # Convert the numpy array of patch vectors to a PyTorch tensor
    patch_vectors_tensor = torch.tensor(patch_vectors_np, dtype=torch.float32)
    
    return patch_vectors_tensor

patch_vectors_tensor = convertImageTo1DPatchesVectorList(image_path)

print(f"Total patches extracted: {len(patch_vectors_tensor)}")
# Verify the shape
print("Shape of patch vectors tensor:", patch_vectors_tensor.shape)



Total patches extracted: 784
Shape of patch vectors tensor: torch.Size([784, 768])


Sentencepiece sub word encoding

In [7]:
# Tokenise the captions 
# Create the copies (one for the decoder, the second for the loss function label)


def encode_sentences(csv_path, sp_model_path):
    # Load the SentencePiece model
    sp = spm.SentencePieceProcessor(model_file=sp_model_path)
    
    # Read the CSV file
    df = pd.read_csv(csv_path)
    
    # Prepare a dictionary to hold encoded sentences
    encoded_sentences = {'img_id': [], 'encoded_sentences': []}
    
    for _, row in df.iterrows():
        # Convert string representation of list to actual list
        sentences = ast.literal_eval(row['raw'])
        
        # Encode each sentence in the list
        encoded = [sp.encode(sentence, out_type=int) for sentence in sentences]
        
        # Append results
        encoded_sentences['img_id'].append(row['img_id'])
        encoded_sentences['encoded_sentences'].append(encoded)
    
    # Convert dictionary to DataFrame for easy handling/viewing
    encoded_df = pd.DataFrame(encoded_sentences)
    return encoded_df

# Usage example
csv_path = 'flickr_annotations_30k.csv'
sp_model_path = 'spm.model'
encoded_captions = encode_sentences(csv_path, sp_model_path) #Dimensions - batch size x num_images x num_captions x max caption length (after padding)

31014


In [38]:
encoded_captions.encoded_sentences[0] #encoding for 5 captions 

[[19, 29, 373, 14, 2280, 124, 195, 20, 74, 177, 30, 360, 83, 7, 8, 538, 5],
 [19, 29, 15, 1113, 792, 17, 62, 95, 367, 1554, 5],
 [19, 40, 7, 55, 281, 17, 37, 7, 4, 538, 5],
 [6, 12, 7, 4, 32, 25, 37, 7, 4, 805, 5],
 [19, 555, 573, 653, 3793, 1927, 157, 5]]

Positional Encoding

In [43]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_length):
        super().__init__()
        self.pos_encoding = self.create_pos_encoding(d_model, max_length)

    def create_pos_encoding(self, d_model, max_length):
        position = torch.arange(max_length).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))
        pe = torch.zeros(max_length, d_model)
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        return nn.Parameter(pe, requires_grad=False)
    
    def forward(self, embeddings):
        # Use embeddings to determine sequence length and to get device information
        seq_len = embeddings.size(1)
        pos_encoding = self.pos_encoding[:, :seq_len, :].to(embeddings.device)
        print(pos_encoding.shape)
        print(embeddings.shape)
        return embeddings + pos_encoding

Image Encoder


In [34]:
class ModifiedImageEncoder(nn.Module):
    def __init__(self, num_patches, embed_dim=768, num_layers=6, num_heads=12, d_ff=2048, dropout=0.1):
        super().__init__()
        self.num_patches = num_patches
        self.embed_dim = embed_dim
        self.positional_encoding = PositionalEncoding(embed_dim, num_patches)
        
        encoder_layer = nn.TransformerEncoderLayer(d_model=embed_dim, nhead=num_heads, dim_feedforward=d_ff, dropout=dropout, batch_first=True)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

    def forward(self, flat_patches):
        # Assume flat_patches is of shape [B, num_patches, embed_dim]
        # where each patch is already flattened into a vector of embed_dim
        x = flat_patches
        
        # You might need to adjust positional encodings if the sequence length
        # could vary or if it's dynamically determined
        x = self.positional_encoding(x)
        x = self.transformer_encoder(x)  # Pass through transformer encoder
        return x


Setting up device

In [29]:
# Check if CUDA (NVIDIA GPU) is available, then use it; otherwise, fall back to CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# For PyTorch 1.12 or newer, to include support for Apple Silicon (MPS)
if torch.backends.mps.is_available():
    device = torch.device('mps')  # Use Apple's Metal Performance Shaders (MPS)
elif torch.cuda.is_available():
    device = torch.device('cuda')  # Use NVIDIA GPU with CUDA
else:
    device = torch.device('cpu')  # Use CPU

Testing Image Encoder

In [36]:
# Parameters
num_patches = (448 // 16) * (448 // 16)  # Assuming the image is resized to 448x448 and patches are 16x16
embed_dim = 768  # Assuming each patch is projected to an embedding dimension of 768

# Initialize the encoder
image_encoder = ModifiedImageEncoder(
    num_patches=num_patches,
    embed_dim=embed_dim,
    num_layers=6,
    num_heads=12,
    d_ff=2048,
    dropout=0.1
)

# Assuming the device setup from the previous discussion
image_encoder = image_encoder.to(device)
patch_vectors_tensor = patch_vectors_tensor.to(device)

# Encode the preprocessed patches
with torch.no_grad():
    encoded_patches = image_encoder(patch_vectors_tensor.unsqueeze(0))  # Add a batch dimension

print("Encoded patches shape:", encoded_patches.shape)


Encoded patches shape: torch.Size([1, 784, 768])


Decoder

In [45]:
class ImageCaptioningDecoder(nn.Module):
    def __init__(self, vocab_size, embed_dim, num_heads, num_layers, d_ff, max_length, dropout=0.1):
        super(ImageCaptioningDecoder, self).__init__()
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim
        self.positional_encoding = PositionalEncoding(embed_dim, max_length)
        
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        decoder_layer = nn.TransformerDecoderLayer(d_model=embed_dim, nhead=num_heads, dim_feedforward=d_ff, dropout=dropout, batch_first=True)
        self.transformer_decoder = nn.TransformerDecoder(decoder_layer, num_layers=num_layers)
        self.fc_out = nn.Linear(embed_dim, vocab_size)

    def forward(self, input_tokens, encoder_features, tgt_mask=None):
        # tgt_mask is used to prevent the decoder from peeking at future tokens
          # Convert token IDs to embeddings
        input_embeddings = self.embedding(input_tokens)  # This should fix the dimension issue
        
        # Add positional encoding
        input_embeddings = input_embeddings + self.positional_encoding(input_embeddings)
    
        # tgt_embeddings = self.embedding(input_tokens) + self.positional_encoding(input_tokens)
        tgt_embeddings = input_embeddings #Renaming as part of debugging - nothing important happening here 
        
        decoder_output = self.transformer_decoder(tgt=tgt_embeddings, memory=encoder_features, tgt_mask=tgt_mask)
        output = self.fc_out(decoder_output)
        return output
    
    def subsequent_mask(size):
        "Mask out subsequent positions."
        attn_shape = (1, size, size)
        subsequent_mask = torch.triu(torch.ones(attn_shape), diagonal=1).type(torch.bool)
        return subsequent_mask == 0

Testing the Decoder

In [44]:
# Mock data for demonstration purposes
batch_size = 4  # Number of samples in your batch
seq_length = 10  # Length of tokenized captions
encoded_dim = 768  # Dimension of encoder output features
num_patches = 784  # Assuming this from the encoder
vocab_size = 32000  # Vocabulary size from the SentencePiece model

# Mock tensor representing encoded image features from the encoder
# Shape: [batch_size, num_patches, encoded_dim]
encoded_image_features = torch.rand(batch_size, num_patches, encoded_dim)

# Mock tensor representing tokenized captions
# Shape: [batch_size, seq_length]
tokenized_captions = torch.randint(0, vocab_size, (batch_size, seq_length))

decoder = ImageCaptioningDecoder(
    vocab_size=vocab_size,
    embed_dim=encoded_dim,  # Matching the dimension of the encoder's output
    num_heads=8,
    num_layers=6,
    d_ff=2048,
    max_length=seq_length,
    dropout=0.1
)

def generate_subsequent_mask(seq_length):
    mask = torch.triu(torch.ones((seq_length, seq_length)) * float('-inf'), diagonal=1)
    return mask

tgt_mask = generate_subsequent_mask(seq_length)

# Ensure everything is on the same device, for example, CPU for simplicity
decoder.to("cpu")
encoded_image_features = encoded_image_features.to("cpu")
tokenized_captions = tokenized_captions.to("cpu")
tgt_mask = tgt_mask.to("cpu")

# Run the decoder
# Note: Omit tgt_mask if testing for inference
output = decoder(tokenized_captions, encoded_image_features, tgt_mask=tgt_mask)

print("Output shape:", output.shape)  # Expected shape: [batch_size, seq_length, vocab_size]


torch.Size([1, 10, 768])
torch.Size([4, 10])


RuntimeError: The size of tensor a (10) must match the size of tensor b (768) at non-singleton dimension 2