<a href="https://colab.research.google.com/github/Daviduche03/Applied-Deep-Learning/blob/main/VLM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install torchinfo
import torchinfo


In [None]:
import torch
import torchvision.transforms as transforms

class ImagePatch(torch.nn.Module):
    """Cut an image batch into non-overlapping square patches and flatten each one.

    Dims 2 and 3 of the input are tiled with ``patch_size`` windows; the result
    has shape (batch, num_patches, channels * patch_size * patch_size).
    """

    def __init__(self, patch_size):
        super().__init__()
        self.patch_size = patch_size

    def forward(self, x):
        side = self.patch_size

        # Window dim 2, then dim 3, with stride equal to the window size:
        # (batch, channels, rows, cols, side, side).
        tiles = x.unfold(2, side, side)
        tiles = tiles.unfold(3, side, side)

        # Put the patch-grid axes ahead of the channel axis:
        # (batch, rows, cols, channels, side, side).
        tiles = tiles.permute(0, 2, 3, 1, 4, 5).contiguous()

        batch, channels = tiles.size(0), tiles.size(3)
        # Merge the grid into a single patch axis and flatten every patch.
        return tiles.view(batch, -1, side * side * channels)

# Smoke test for ImagePatch: one 3-channel 224x224 image cut into 16x16 patches.
image_patch = ImagePatch(patch_size=16)
input_image = torch.randn(1, 3, 224, 224)
output = image_patch(input_image)
# 14 x 14 = 196 patches of 3 * 16 * 16 = 768 values each -> (1, 196, 768)
print(f"Image Patch Output Shape: {output.shape}")


class VIT(torch.nn.Module):
    """Minimal Vision-Transformer encoder over flattened image patches.

    Args:
        in_dim: Length of each flattened patch vector.
        out_dim: Transformer model width (``d_model``).
        nheads: Attention heads per encoder layer.
        num_layers: Number of stacked encoder layers.
        num_class: Output size of the classifier head (not applied in ``forward``).
        num_patches: Maximum number of patches per image; sizes the learned
            positional table.
    """

    def __init__(self, in_dim, out_dim, nheads=8, num_layers=6, num_class=10, num_patches=196):
        super().__init__()
        self.linear_proj = torch.nn.Linear(in_dim, out_dim)
        self.pos_encoding = torch.nn.Parameter(torch.randn(1, num_patches, out_dim))
        # Inputs are (batch, patches, dim). With the default batch_first=False the
        # encoder would read dim 0 as the sequence axis, so patches of one image
        # would never attend to each other.
        encoder_layer = torch.nn.TransformerEncoderLayer(
            d_model=out_dim, nhead=nheads, batch_first=True
        )
        self.transformer_encoder = torch.nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        # Not used by forward(); retained so the module's parameter set is unchanged.
        self.classifier = torch.nn.Linear(out_dim, num_class)

    def forward(self, x):
        """Encode patches: (batch, n, in_dim) -> (batch, n, out_dim)."""
        x = self.linear_proj(x)
        # Slice the positional table to the actual patch count so inputs with
        # fewer than ``num_patches`` patches are supported.
        x = x + self.pos_encoding[:, : x.size(1), :]
        return self.transformer_encoder(x)

# Smoke test for VIT on the patches produced by the ImagePatch example.
mlp = VIT(in_dim=768, out_dim=512)  # 768 = 3 * 16 * 16 values per flattened patch
mlp_output = mlp(output)
# forward() returns per-patch features (no classifier head applied): (1, 196, 512)
print(f"MLP Output Shape: {mlp_output.shape}")

class customDataset(torch.utils.data.Dataset):
    """Pair each item of ``data`` with the label stored at the same index.

    When a truthy ``transform`` callable is supplied it is applied to the item
    (never to the label) each time the item is fetched.
    """

    def __init__(self, data, labels, transform=None):
        self.data, self.labels = data, labels
        self.transform = transform

    def __len__(self):
        # The dataset length follows ``data``.
        return len(self.data)

    def __getitem__(self, idx):
        sample = self.data[idx]
        target = self.labels[idx]
        return (self.transform(sample) if self.transform else sample), target


# Preprocessing pipeline: resize to 224x224, convert to a tensor, then
# normalise each of the three channels with mean 0.5 and std 0.5.
transform = transforms.Compose(
    [
        transforms.Resize(size=(224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
    ]
)



Image Patch Output Shape: torch.Size([1, 196, 768])
MLP Output Shape: torch.Size([1, 196, 512])


In [None]:
import torch
import torchvision.transforms as transforms
from transformers import AutoTokenizer

class ImagePatch(torch.nn.Module):
    """Turn an image batch into a sequence of flattened, non-overlapping patches."""

    def __init__(self, patch_size):
        super().__init__()
        self.patch_size = patch_size

    def forward(self, x):
        size = step = self.patch_size
        # Window dims 2 and 3 with stride equal to the window size:
        # (batch, channels, rows, cols, size, size).
        grid = x.unfold(2, size, step).unfold(3, size, step)
        # Patch-grid axes first, then channels and pixels:
        # (batch, rows, cols, channels, size, size).
        grid = grid.permute(0, 2, 3, 1, 4, 5).contiguous()
        flat_len = size * size * grid.size(3)
        # One row per patch: (batch, num_patches, flat_len).
        return grid.view(grid.size(0), -1, flat_len)

class VIT(torch.nn.Module):
    """Vision-Transformer encoder that maps flattened patches to feature vectors.

    Args:
        in_dim: Length of each flattened patch vector.
        out_dim: Transformer model width (``d_model``).
        nheads: Attention heads per encoder layer.
        num_layers: Number of stacked encoder layers.
        num_patches: Maximum number of patches per image; sizes the learned
            positional table.
    """

    def __init__(self, in_dim, out_dim, nheads=8, num_layers=6, num_patches=196):
        super().__init__()
        self.linear_proj = torch.nn.Linear(in_dim, out_dim)
        self.pos_encoding = torch.nn.Parameter(torch.randn(1, num_patches, out_dim))
        # Inputs are (batch, patches, dim). With the default batch_first=False the
        # encoder would read dim 0 as the sequence axis, so patches of one image
        # would never attend to each other.
        encoder_layer = torch.nn.TransformerEncoderLayer(
            d_model=out_dim, nhead=nheads, batch_first=True
        )
        self.transformer_encoder = torch.nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

    def forward(self, x):
        """Encode patches: (batch, n, in_dim) -> (batch, n, out_dim)."""
        x = self.linear_proj(x)
        # Slice the positional table to the actual patch count so inputs with
        # fewer than ``num_patches`` patches are supported.
        x = x + self.pos_encoding[:, : x.size(1), :]
        return self.transformer_encoder(x)

class MiniTransformer(torch.nn.Module):
    """Small Transformer encoder over token ids with learned positional embeddings.

    Args:
        vocab_size: Number of rows in the token-embedding table.
        embed_dim: Embedding / model width.
        nheads: Attention heads per encoder layer.
        num_layers: Number of stacked encoder layers.
        max_len: Longest supported sequence; sizes the positional table.
        padding_idx: Token id passed to ``torch.nn.Embedding`` as ``padding_idx``.
    """

    def __init__(self, vocab_size, embed_dim, nheads, num_layers, max_len=512, padding_idx=0):
        super().__init__()
        self.token_embedding = torch.nn.Embedding(vocab_size, embed_dim, padding_idx=padding_idx)
        self.pos_encoding = torch.nn.Parameter(torch.randn(1, max_len, embed_dim))
        # Token tensors are (batch, seq). With the default batch_first=False the
        # encoder would attend across the batch axis instead of across tokens.
        encoder_layer = torch.nn.TransformerEncoderLayer(
            d_model=embed_dim, nhead=nheads, batch_first=True
        )
        self.transformer_encoder = torch.nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

    def forward(self, text_tokens):
        """Encode token ids: (batch, seq) -> (batch, seq, embed_dim)."""
        # Ids outside the embedding table are clamped into range rather than
        # raising an index error.
        text_tokens = torch.clamp(text_tokens, min=0, max=self.token_embedding.num_embeddings - 1)
        # Use only as many positional rows as there are tokens.
        pos_encoding_slice = self.pos_encoding[:, : text_tokens.size(1), :]
        x = self.token_embedding(text_tokens) + pos_encoding_slice
        return self.transformer_encoder(x)

class MultiModalTransformer(torch.nn.Module):
    """Fuse image and text features by encoding their concatenation.

    Args:
        embed_dim: Shared feature width of both modalities.
        nheads: Attention heads per encoder layer.
        num_layers: Number of stacked encoder layers.
    """

    def __init__(self, embed_dim, nheads, num_layers):
        super().__init__()
        # Features arrive as (batch, seq, dim). With the default batch_first=False
        # attention would run across the batch axis, so image and text positions
        # would never see each other.
        encoder = torch.nn.TransformerEncoderLayer(
            d_model=embed_dim, nhead=nheads, batch_first=True
        )
        self.transformer_encoder = torch.nn.TransformerEncoder(encoder, num_layers=num_layers)

    def forward(self, image_features, text_features):
        """Return encoded features for the image sequence followed by the text sequence."""
        # A 2-D input is a single un-batched sequence: add the leading batch axis.
        if image_features.dim() == 2:
            image_features = image_features.unsqueeze(0)
        if text_features.dim() == 2:
            text_features = text_features.unsqueeze(0)

        # Shape traces kept from the original implementation.
        print(f"Image2 Features Shape: {image_features.shape}")
        print(f"Text2 Features Shape: {text_features.shape}")

        # Concatenate along the sequence axis: image positions first, text last.
        x = torch.cat((image_features, text_features), dim=1)
        print(f"MultiModalTransformer Input Shape: {x.shape}")
        return self.transformer_encoder(x)

class NextTokenPredictor(torch.nn.Module):
    """Linear head that projects ``embed_dim`` features to ``vocab_size`` logits."""

    def __init__(self, embed_dim, vocab_size):
        super().__init__()
        self.linear = torch.nn.Linear(in_features=embed_dim, out_features=vocab_size)

    def forward(self, x):
        # The projection acts on the last axis; leading axes pass through unchanged.
        logits = self.linear(x)
        return logits

class VisionLanguageModel(torch.nn.Module):
    """Image + text model that emits vocabulary logits for every fused position.

    Args:
        vocab_size: Size of the token vocabulary (embedding rows and logit width).
        image_embed_dim: Length of one flattened 16x16 image patch.
        text_embed_dim: Shared feature width of the image, text and fusion encoders.
        nheads: Attention heads used by every encoder.
        num_layers: Encoder layers used by every encoder.
    """

    def __init__(self, vocab_size, image_embed_dim, text_embed_dim, nheads, num_layers):
        super().__init__()
        self.image_patch = ImagePatch(16)
        # Image features are projected to the text width so both modalities can
        # be concatenated. nheads / num_layers are forwarded so the image encoder
        # honours the caller's configuration instead of its own defaults.
        self.image_encoder = VIT(
            in_dim=image_embed_dim,
            out_dim=text_embed_dim,
            nheads=nheads,
            num_layers=num_layers,
        )
        self.text_encoder = MiniTransformer(vocab_size, text_embed_dim, nheads, num_layers)
        self.fusion_transformer = MultiModalTransformer(text_embed_dim, nheads, num_layers)
        self.next_token_predictor = NextTokenPredictor(text_embed_dim, vocab_size)

    def forward(self, image, text_tokens):
        """Return logits over the vocabulary for each image and text position."""
        image_patches = self.image_patch(image)
        image_features = self.image_encoder(image_patches)
        text_features = self.text_encoder(text_tokens)
        # Shape traces kept from the original implementation.
        print(f"Image Features Shape: {image_features.shape}")
        print(f"Text Features Shape: {text_features.shape}")
        fusion_features = self.fusion_transformer(image_features, text_features)
        print(f"Fusion Features Shape: {fusion_features.shape}")
        next_token_logits = self.next_token_predictor(fusion_features)
        return next_token_logits

# Example usage
def test_model():
    """Build the model with the Llama-2 tokenizer, print a summary and run one forward pass.

    Returns:
        The logits produced for one random image and one padded example sentence.
    """
    import os  # local import: this cell does not import os at module level

    # SECURITY: access tokens must never be hard-coded in source. The token is
    # read from the HF_TOKEN environment variable instead; any token that was
    # previously committed here should be treated as leaked and revoked.
    tokenizer = AutoTokenizer.from_pretrained(
        "meta-llama/Llama-2-7b-hf",
        token=os.environ.get("HF_TOKEN"),
    )
    # The tokenizer needs a pad token for padding="max_length"; reuse EOS.
    tokenizer.pad_token = tokenizer.eos_token
    vocab_size = tokenizer.vocab_size

    # Create model
    model = VisionLanguageModel(
        vocab_size=vocab_size,
        image_embed_dim=768,
        text_embed_dim=512,
        nheads=8,
        num_layers=6
    )

    # Create dummy inputs
    image = torch.randn(1, 3, 224, 224)
    text = "This is an example text."
    text_tokens = tokenizer(
        text,
        return_tensors="pt",
        padding="max_length",
        truncation=True,
        max_length=512
    )["input_ids"]

    print(torchinfo.summary(model=model, input_data=(image, text_tokens)))

    # Forward pass
    output = model(image, text_tokens)
    print(f"Output shape: {output.shape}")
    return output


if __name__ == "__main__":
    test_model()


Image Features Shape: torch.Size([1, 196, 512])
Text Features Shape: torch.Size([1, 512, 512])
Image2 Features Shape: torch.Size([1, 196, 512])
Text2 Features Shape: torch.Size([1, 512, 512])
MultiModalTransformer Input Shape: torch.Size([1, 708, 512])
Fusion Features Shape: torch.Size([1, 708, 512])
Layer (type:depth-idx)                                  Output Shape              Param #
VisionLanguageModel                                     [1, 708, 32000]           --
├─ImagePatch: 1-1                                       [1, 196, 768]             --
├─VIT: 1-2                                              [1, 196, 512]             100,352
│    └─Linear: 2-1                                      [1, 196, 512]             393,728
│    └─TransformerEncoder: 2-2                          [1, 196, 512]             --
│    │    └─ModuleList: 3-1                             --                        18,914,304
├─MiniTransformer: 1-3                                  [1, 512, 512]           

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import torchvision.transforms as transforms
from transformers import AutoTokenizer
import json
import os
from typing import List, Dict, Tuple

class VisionLanguageDataset(Dataset):
    """Image / caption / continuation triples described by a JSON annotation file."""

    def __init__(self,
                 data_dir: str,
                 annotations_file: str,
                 tokenizer,
                 max_length: int = 512,
                 transform=None):
        """
        Args:
            data_dir (str): Directory with all the images
            annotations_file (str): JSON file with format:
                [
                    {
                        "image_file": "image1.jpg",
                        "caption": "a photo of a dog running",
                        "next_tokens": "in the park"
                    },
                    ...
                ]
            tokenizer: HuggingFace tokenizer
            max_length (int): Maximum length of text sequence
            transform: Optional transform to be applied on images
        """
        self.data_dir = data_dir
        # Read through a context manager so the file handle is closed promptly;
        # JSON text is UTF-8, so do not depend on the platform default encoding.
        with open(annotations_file, encoding="utf-8") as fh:
            self.annotations = json.load(fh)
        self.tokenizer = tokenizer
        self.max_length = max_length
        # Default pipeline: resize to 224x224, convert to tensor, normalise per channel.
        self.transform = transform or transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])
        ])

    def __len__(self):
        return len(self.annotations)

    def __getitem__(self, idx):
        """Return a dict with 'image', 'input_ids', 'attention_mask' and 'labels'.

        Raises:
            IndexError: ``idx`` is outside the annotation list.
            KeyError: the annotation entry lacks an expected key.
        """
        try:
            # Load annotation
            ann = self.annotations[idx]

            # Load and transform image
            image_path = os.path.join(self.data_dir, ann['image_file'])
            image = Image.open(image_path).convert('RGB')
            image = self.transform(image)

            # Tokenize input caption
            input_tokens = self.tokenizer(
                ann['caption'],
                padding='max_length',
                truncation=True,
                max_length=self.max_length,
                return_tensors='pt'
            )

            # Tokenize target (next) tokens
            target_tokens = self.tokenizer(
                ann['next_tokens'],
                padding='max_length',
                truncation=True,
                max_length=self.max_length,
                return_tensors='pt'
            )

            # return_tensors='pt' yields a leading batch axis of size 1; drop it
            # so the DataLoader can stack samples itself.
            return {
                'image': image,
                'input_ids': input_tokens['input_ids'].squeeze(0),
                'attention_mask': input_tokens['attention_mask'].squeeze(0),
                'labels': target_tokens['input_ids'].squeeze(0)
            }
        except IndexError as e:
            # Report for easier debugging, then let the caller see the error.
            print(f"IndexError: {e}. Index {idx} is out of bounds.")
            raise
        except KeyError as e:
            print(f"KeyError: {e}. Annotation entry is missing the expected key.")
            raise


def train_one_epoch(model: torch.nn.Module,
                    dataloader: DataLoader,
                    optimizer: torch.optim.Optimizer,
                    criterion: torch.nn.Module,
                    device: str) -> float:
    """
    Train the model for one epoch.

    Args:
        model: Called as ``model(images, input_ids)``; returns logits whose
            trailing positions along dim 1 correspond to the text tokens.
        dataloader: Iterable of dict batches with 'image', 'input_ids' and 'labels'.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss applied to (N, vocab) logits and (N,) labels.
        device: Device the batch tensors are moved to.

    Returns:
        Mean loss over the batches seen, or 0.0 when the dataloader is empty.
    """
    model.train()
    total_loss = 0.0
    num_batches = 0

    for batch in dataloader:
        # Move data to device
        images = batch['image'].to(device)
        input_ids = batch['input_ids'].to(device)
        labels = batch['labels'].to(device)

        # Forward pass
        outputs = model(images, input_ids)

        # Shape trace kept from the original implementation.
        print(outputs.shape, labels.shape)

        # Only the text positions (the trailing ones) carry a training signal.
        # Take their count from the labels instead of hard-coding 512 so any
        # max_length works.
        text_len = labels.size(1)
        vocab = outputs.size(-1)
        outputs_text_only = outputs[:, -text_len:, :]

        # The slice is non-contiguous whenever batch size > 1, where .view()
        # raises; .reshape() copies only when it has to.
        loss = criterion(outputs_text_only.reshape(-1, vocab), labels.reshape(-1))

        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        num_batches += 1

    # Guard against an empty dataloader instead of dividing by zero.
    if num_batches == 0:
        return 0.0
    return total_loss / num_batches

def main():
    """Train VisionLanguageModel for ten epochs on images/ + annotations.json.

    Each annotation entry is expected to look like:
        {"image_file": "dog_running.jpg",
         "caption": "a photo of a dog running",
         "next_tokens": "in the park"}
    """
    # SECURITY: access tokens must never be hard-coded in source. The token is
    # read from the HF_TOKEN environment variable instead; any token that was
    # previously committed here should be treated as leaked and revoked.
    tokenizer = AutoTokenizer.from_pretrained(
        "meta-llama/Llama-2-7b-hf",
        token=os.environ.get("HF_TOKEN"),
    )
    # The tokenizer needs a pad token for padding='max_length'; reuse EOS.
    tokenizer.pad_token = tokenizer.eos_token

    # Create dataset
    dataset = VisionLanguageDataset(
        data_dir="images",
        annotations_file="annotations.json",
        tokenizer=tokenizer
    )

    # Create dataloader
    dataloader = DataLoader(
        dataset,
        batch_size=32,
        shuffle=True,
        num_workers=4
    )

    # Initialize model (assuming VisionLanguageModel from previous code)
    model = VisionLanguageModel(
        vocab_size=tokenizer.vocab_size,
        image_embed_dim=768,
        text_embed_dim=512,
        nheads=8,
        num_layers=6
    )

    # Training setup
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)
    # Padding positions in the labels do not contribute to the loss.
    criterion = torch.nn.CrossEntropyLoss(ignore_index=tokenizer.pad_token_id)

    # Training loop
    num_epochs = 10
    for epoch in range(num_epochs):
        avg_loss = train_one_epoch(
            model=model,
            dataloader=dataloader,
            optimizer=optimizer,
            criterion=criterion,
            device=device
        )
        print(f"Epoch {epoch+1}/{num_epochs}, Average Loss: {avg_loss:.4f}")


if __name__ == "__main__":
    main()

Image Features Shape: torch.Size([1, 196, 512])
Text Features Shape: torch.Size([1, 512, 512])
Image2 Features Shape: torch.Size([1, 196, 512])
Text2 Features Shape: torch.Size([1, 512, 512])
MultiModalTransformer Input Shape: torch.Size([1, 708, 512])
Fusion Features Shape: torch.Size([1, 708, 512])
torch.Size([1, 708, 32000]) torch.Size([1, 512])
Epoch 1/10, Average Loss: 10.6859
Image Features Shape: torch.Size([1, 196, 512])
Text Features Shape: torch.Size([1, 512, 512])
Image2 Features Shape: torch.Size([1, 196, 512])
Text2 Features Shape: torch.Size([1, 512, 512])
MultiModalTransformer Input Shape: torch.Size([1, 708, 512])
Fusion Features Shape: torch.Size([1, 708, 512])
torch.Size([1, 708, 32000]) torch.Size([1, 512])
Epoch 2/10, Average Loss: 7.5742
Image Features Shape: torch.Size([1, 196, 512])
Text Features Shape: torch.Size([1, 512, 512])
Image2 Features Shape: torch.Size([1, 196, 512])
Text2 Features Shape: torch.Size([1, 512, 512])
MultiModalTransformer Input Shape: torc

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import torchvision.transforms as transforms  # Make sure this is torchvision.transforms
from transformers import AutoTokenizer
import json
import os
from typing import List, Dict, Tuple


def inference(model, tokenizer, image_path, text, transforms, max_length=512):
    """Predict one token for an image / text pair.

    Args:
        model: Called as ``model(image, input_ids)``; must expose
            ``text_encoder.token_embedding``.
        tokenizer: Callable tokenizer that also provides ``decode``.
        image_path: Path of the image file to load.
        text: Prompt text.
        transforms: Callable applied to the PIL image; must return a tensor.
            (Inside this function the name shadows any module called ``transforms``.)
        max_length: Length the text is padded / truncated to.

    Returns:
        The decoded token with the highest logit at the last output position.
    """
    tokenized_text = tokenizer(
        text,
        padding='max_length',
        truncation=True,
        max_length=max_length,
        return_tensors='pt'
    )

    # Close the image file as soon as the pixel data has been converted.
    with Image.open(image_path) as img:
        image = img.convert('RGB')
    image = transforms(image).unsqueeze(0)  # add the batch axis

    # Ensure input tokens are within vocab range before feeding to model
    input_ids = torch.clamp(tokenized_text['input_ids'],
                            min=0,
                            max=model.text_encoder.token_embedding.num_embeddings - 1)

    # Run on whatever device the model lives on.
    first_param = next(model.parameters(), None)
    if first_param is not None:
        image = image.to(first_param.device)
        input_ids = input_ids.to(first_param.device)

    # Dropout must be disabled for a deterministic prediction; restore the
    # caller's train/eval mode afterwards.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            output = model(image, input_ids)
    finally:
        model.train(was_training)

    # Softmax is monotonic, so the argmax of the raw logits is the same token.
    # NOTE(review): position -1 is the last *padded* position when the tokenizer
    # pads on the right - confirm the tokenizer's padding side.
    predicted_token_id = torch.argmax(output[0, -1, :]).item()
    return tokenizer.decode(predicted_token_id)

# Preprocessing pipeline, built once at module level and handed to inference():
# resize to 224x224, convert to a tensor, normalise each channel with 0.5 / 0.5.
transform = transforms.Compose(
    [
        transforms.Resize(size=(224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
    ]
)

# NOTE(review): `model` and `tokenizer` must already exist at module scope here;
# in the code visible above they are only created inside functions - confirm.
result = inference(model, tokenizer, "images/dog.jpg", 'dog', transforms=transform)
print(result)

-man
