<a href="https://colab.research.google.com/github/Tahnees/assignment2/blob/main/AI_Assignment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

----------------COMPARISON----------------------------

**1.**
**Implemented Code:**
- Focuses on **image captioning** using a VisionEncoderDecoder model (`nlpconnect/vit-gpt2-image-captioning`).
- Does not include **segmentation or multi-target reasoning** tasks.

**Paper:**
- Designed for **pixel-level reasoning** tasks, including segmentation and mask generation.
- Utilizes a **novel lightweight pixel decoder** and segmentation codebook for high-quality mask production.

**2.**
PixelLM employs a more advanced architecture tailored for **segmentation** and **pixel-level reasoning**, while the implemented model is optimized for **sequence-to-sequence caption generation**.

**3.**
PixelLM demonstrates **state-of-the-art segmentation performance**, while the implemented code reports text-based evaluation metrics (BLEU scores).

**4.**
**Implemented Code:**
- Implements a standard image-captioning pipeline with enhancements like caption preprocessing and data augmentation.

**Paper:**
- Introduces innovative features such as:
  - **Token fusion** for multi-target reasoning.
  - **Target refinement loss** for overlapping masks.
  - The MUSE dataset, tailored for segmentation tasks.

**5.**
PixelLM introduces innovations to handle segmentation tasks effectively, whereas the implemented code leverages existing methods for caption generation.

**6.**
The paper uses COCO-Stuff as part of the training data but does not report accuracy specifically for this dataset. Instead, it focuses on benchmarks better aligned with the model's pixel-level reasoning objectives, like MUSE and refCOCO.

Best accuracy on the MUSE test set (13B model):
- gIoU: 45.2
- cIoU: 62.9

Best accuracy on the refCOCOg validation set (13B model):
- gIoU: 70.5
- cIoU: 73.0

Implemented code accuracy = 80.06

In [None]:
from logging import lastResort
import os
import json
import numpy as np
from zipfile import ZipFile
from PIL import Image
from torch.utils.data import DataLoader, Dataset
import torchvision.transforms as transforms
import torch
from sklearn.model_selection import train_test_split
from transformers import VisionEncoderDecoderModel, AutoTokenizer, AutoModelForVision2Seq
from torch.nn.utils.rnn import pad_sequence
from torch.optim import AdamW
import matplotlib.pyplot as plt
import re
from google.colab import drive
import pandas as pd

# =======================================
#  Mount Google Drive
# =======================================
# force_remount=True asks Colab to mount again even when Drive is already mounted.
drive.mount('/content/drive', force_remount=True)

# =======================================
#  Set Paths and Extract Dataset
# =======================================
drive_dataset_path = '/content/drive/My Drive/Images.zip'
dataset_folder = './dataset'

# Unpack the archive only when the target folder does not exist yet.
if not os.path.exists(dataset_folder):
    with ZipFile(drive_dataset_path, 'r') as zip_ref:
        zip_ref.extractall(dataset_folder)

print("Extracted dataset contents:", os.listdir(dataset_folder))

# If the folder holds exactly one entry and that entry is a directory,
# treat that directory as the real dataset root.
nested_dirs = os.listdir(dataset_folder)
if len(nested_dirs) == 1:
    only_entry = os.path.join(dataset_folder, nested_dirs[0])
    if os.path.isdir(only_entry):
        dataset_folder = only_entry

print("Updated dataset folder path:", dataset_folder)
print("Contents of dataset folder:", os.listdir(dataset_folder))

# =======================================
#  Verify and Load Captions and Images
# =======================================
captions_file = os.path.join(dataset_folder, 'captions.json')
image_dir = os.path.join(dataset_folder, 'Images')

# Fail early with a clear message. isfile/isdir also reject a path that exists
# but is of the wrong kind (e.g. a directory named captions.json).
if not os.path.isfile(captions_file):
    raise FileNotFoundError(f"Captions file not found at {captions_file}. Check the dataset structure.")
if not os.path.isdir(image_dir):
    raise FileNotFoundError(f"Image directory not found at {image_dir}. Check the dataset structure.")

# JSON text is UTF-8; do not depend on the platform's default encoding.
with open(captions_file, 'r', encoding='utf-8') as f:
    captions_data = json.load(f)
print("Number of captions loaded:", len(captions_data))

# =======================================
#  EDA: Integrate Dataset Insights
# =======================================
# Caption length is measured in whitespace-separated words.
caption_lengths = [len(caption["caption"].split()) for caption in captions_data]
print(f"Caption Length Stats: Min: {min(caption_lengths)}, Max: {max(caption_lengths)}, Mean: {np.mean(caption_lengths):.2f}")

plt.hist(caption_lengths, bins=20, color='skyblue')
plt.title("Caption Length Distribution")
plt.xlabel("Length")
plt.ylabel("Frequency")
plt.show()

# Sample the resolution of the images behind the first 50 caption entries.
# Image.open() leaves the underlying file open until the image is closed, so
# each image is opened in a context manager instead of being left to the GC.
image_sizes = []
for c in captions_data[:50]:
    sample_path = os.path.join(image_dir, f"COCO_train2014_{str(c['image_id']).zfill(12)}.jpg")
    with Image.open(sample_path) as sample_image:
        image_sizes.append(sample_image.size)
widths, heights = zip(*image_sizes)
print("Image Widths:", pd.Series(widths).describe())
print("Image Heights:", pd.Series(heights).describe())

plt.scatter(widths, heights, alpha=0.5)
plt.title("Image Resolution Distribution")
plt.xlabel("Width")
plt.ylabel("Height")
plt.show()
# =======================================
#  Preprocess Captions
# =======================================
def preprocess_caption(caption):
    """Return *caption* lower-cased, with every character that is not an
    ASCII letter, a digit or whitespace removed.

    Removed characters are deleted, not replaced, so ``"dog's"`` becomes
    ``"dogs"``. Whitespace is kept exactly as it was.
    """
    # Strip first, then lower-case: the character class is ASCII-only, so the
    # order matters for non-ASCII letters whose lower-case form is ASCII.
    return re.sub(r"[^a-zA-Z0-9\s]", "", caption).lower()

# Clean every caption in place before any tokenization happens.
for caption_entry in captions_data:
    caption_entry.update(caption=preprocess_caption(caption_entry['caption']))

# =======================================
#  Define Custom Dataset Class
# =======================================
# Tokenizer handed to the datasets created further down.
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

class CustomImageCaptionDataset(Dataset):
    """Dataset yielding ``(image, input_ids, attention_mask)`` per caption entry.

    Every entry of ``captions_data`` is read for its ``'image_id'`` and
    ``'caption'`` keys. The image is looked up in ``image_dir`` under the name
    ``COCO_train2014_<image_id zero-padded to 12 characters>.jpg``.

    Args:
        image_dir: Directory containing the image files.
        captions_data: Indexable collection of caption entries.
        transform: Optional callable applied to the RGB PIL image.
        tokenizer: Object providing ``encode_plus``; needed by ``__getitem__``.
        max_length: Length every caption is padded or truncated to.
    """

    def __init__(self, image_dir, captions_data, transform=None, tokenizer=None, max_length=50):
        self.image_dir, self.captions_data = image_dir, captions_data
        self.transform, self.tokenizer = transform, tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.captions_data)

    def _image_path_for(self, image_id):
        """Build the expected on-disk path for ``image_id``."""
        return os.path.join(self.image_dir, f"COCO_train2014_{str(image_id).zfill(12)}.jpg")

    def __getitem__(self, idx):
        """Load, transform and tokenize the entry at ``idx``.

        Raises:
            FileNotFoundError: If the image file for the entry does not exist.
        """
        entry = self.captions_data[idx]
        image_id, caption = entry['image_id'], entry['caption']

        image_path = self._image_path_for(image_id)
        if not os.path.exists(image_path):
            raise FileNotFoundError(f"Image file not found: {image_path}")

        image = Image.open(image_path).convert('RGB')
        if self.transform:
            image = self.transform(image)

        # return_tensors="pt" adds a leading batch axis of size 1; it is
        # squeezed away below so each sample is one-dimensional.
        encoding = self.tokenizer.encode_plus(
            caption,
            max_length=self.max_length,
            padding='max_length',
            truncation=True,
            return_tensors="pt"
        )
        token_ids = encoding["input_ids"].squeeze(0)
        mask = encoding["attention_mask"].squeeze(0)
        return image, token_ids, mask
# Tokenizer sanity check: encode a sample sentence, decode it again and show
# all three forms.
test_caption = "Sample caption for debugging tokenizer."
encoded = tokenizer.encode(test_caption)
decoded = tokenizer.decode(encoded)
for label, value in (("Original", test_caption), ("Encoded", encoded), ("Decoded", decoded)):
    print(f"{label}: {value}")

# =======================================
#  Data Augmentation and Transforms
# =======================================
# Training transform: random crop / flip / colour jitter for augmentation.
transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.RandomCrop((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Evaluation transform: same geometry and normalisation, but deterministic.
# Random augmentation on validation/test data makes the reported numbers
# change from run to run and no longer describes the unmodified images.
eval_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.CenterCrop((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# =======================================
#  Split Dataset into Train/Val/Test
# =======================================
indices = list(range(len(captions_data)))

# Split by image rather than by caption: if several entries share an image_id,
# they all end up in the same split, so no image seen in training is evaluated
# on. When every image_id is unique this is the same 70/15/15 split as
# splitting the caption indices directly.
caption_indices_by_image = {}
for i in indices:
    caption_indices_by_image.setdefault(captions_data[i]['image_id'], []).append(i)

image_ids = list(caption_indices_by_image)
train_image_ids, val_test_image_ids = train_test_split(image_ids, test_size=0.3, random_state=42)
val_image_ids, test_image_ids = train_test_split(val_test_image_ids, test_size=0.5, random_state=42)

train_indices = [i for image_id in train_image_ids for i in caption_indices_by_image[image_id]]
val_indices = [i for image_id in val_image_ids for i in caption_indices_by_image[image_id]]
test_indices = [i for image_id in test_image_ids for i in caption_indices_by_image[image_id]]
val_test_indices = val_indices + test_indices

train_data = [captions_data[i] for i in train_indices]
val_data = [captions_data[i] for i in val_indices]
test_data = [captions_data[i] for i in test_indices]

# Only the first `subset_size` training entries are used for training.
subset_size = 1000
train_data_subset = train_data[:subset_size]
train_dataset = CustomImageCaptionDataset(image_dir, train_data_subset, transform, tokenizer)
val_dataset = CustomImageCaptionDataset(image_dir, val_data, eval_transform, tokenizer)
test_dataset = CustomImageCaptionDataset(image_dir, test_data, eval_transform, tokenizer)

# =======================================
#  Define Custom Collate Function
# =======================================
# `collate_fn` and the training code below move tensors to `device`, but the
# name is not assigned anywhere in the visible file; define it here, before
# its first use, preferring a GPU when one is available.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


def collate_fn(batch):
    """Merge dataset samples into one batch and move it to ``device``.

    Args:
        batch: Sequence of ``(image, input_ids, attention_mask)`` samples.

    Returns:
        Tuple ``(images, input_ids, attention_masks)`` of batched tensors
        located on ``device``.
    """
    images, input_ids, attention_masks = zip(*batch)
    images = torch.stack(images)
    # Captions are padded to a common length; padded positions get the
    # tokenizer's pad id and a 0 in the attention mask.
    input_ids = pad_sequence(input_ids, batch_first=True, padding_value=tokenizer.pad_token_id)
    attention_masks = pad_sequence(attention_masks, batch_first=True, padding_value=0)
    return images.to(device), input_ids.to(device), attention_masks.to(device)

# =======================================
#  Create DataLoaders
# =======================================
batch_size = 32

# Settings shared by all three loaders; only the training loader shuffles.
shared_loader_args = dict(batch_size=batch_size, collate_fn=collate_fn)

train_loader = DataLoader(train_dataset, shuffle=True, **shared_loader_args)
val_loader = DataLoader(val_dataset, shuffle=False, **shared_loader_args)
test_loader = DataLoader(test_dataset, shuffle=False, **shared_loader_args)

def visualize_images_with_captions(dataloader, num_batches=1):
    """Display images from ``dataloader`` with their decoded captions.

    Args:
        dataloader: Iterable of ``(images, input_ids, attention_masks)``
            batches. Tensor images are treated as channel-first and as
            normalised with the mean/std constants below.
        num_batches: Number of batches to display. Values of 0 or less
            display nothing.
    """
    if num_batches <= 0:
        # Previously one batch was shown even when zero were requested.
        return

    # Constants used to undo the normalisation before plotting.
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])

    for i, (images, input_ids, _) in enumerate(dataloader):
        print(f"Batch {i + 1}:")

        for image, caption_ids in zip(images, input_ids):
            caption = tokenizer.decode(caption_ids, skip_special_tokens=True)

            if isinstance(image, torch.Tensor):
                # .cpu() makes this work for batches that already live on a
                # GPU; .numpy() raises for non-CPU tensors.
                image = image.detach().cpu().permute(1, 2, 0).numpy()
                image = (image * std) + mean
                image = np.clip(image, 0, 1)
            if not caption.strip():
                print("Warning: Empty caption detected during visualization.")

            plt.imshow(image)
            plt.axis('off')
            plt.title(caption)
            plt.show()
            print("------------------------------")
        # Stop here so no further batch is fetched from the loader.
        if i + 1 >= num_batches:
            break

# Preview loader, built without the custom collate_fn.
dataloader = DataLoader(train_dataset, shuffle=True, batch_size=32)

visualize_images_with_captions(dataloader, num_batches=2)

# =======================================
#  Debug: Iterate Through DataLoader
# =======================================
# Print the tensor shapes of at most two training batches. zip() asks
# range(2) first, so no third batch is requested from the loader.
for i, (images, input_ids, attention_masks) in zip(range(2), train_loader):
    print(f"Batch {i + 1}:")
    print(f"Images shape: {images.shape}")
    print(f"Input IDs shape: {input_ids.shape}")
    print(f"Attention Masks shape: {attention_masks.shape}")

# =======================================
#  Define the Model
# =======================================
# "google/pix2struct-base" is a Pix2Struct checkpoint; its configuration has no
# encoder/decoder sub-configs, so it cannot be loaded through
# VisionEncoderDecoderModel. Use the ViT + GPT-2 captioning checkpoint that the
# write-up at the top of this notebook names.
model_checkpoint = "nlpconnect/vit-gpt2-image-captioning"

model = VisionEncoderDecoderModel.from_pretrained(model_checkpoint)
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)

# GPT-2 style tokenizers may come without a padding token; padding captions to
# max_length needs one, so fall back to the end-of-sequence token.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

# The datasets were created with the tokenizer that existed at that point.
# Hand them the model's tokenizer so the label ids they produce belong to the
# decoder's vocabulary and decode correctly with `tokenizer`.
for split_dataset in (train_dataset, val_dataset, test_dataset):
    split_dataset.tokenizer = tokenizer

model.generation_config.max_length = 50
model.generation_config.num_beams = 8
model.config.eos_token_id = tokenizer.eos_token_id
model.config.pad_token_id = tokenizer.pad_token_id

# Not every tokenizer has a CLS token; never overwrite the start token with
# None. Fall back to the checkpoint's own value, then to the BOS token.
decoder_start_id = tokenizer.cls_token_id
if decoder_start_id is None:
    decoder_start_id = model.config.decoder_start_token_id
if decoder_start_id is None:
    decoder_start_id = tokenizer.bos_token_id
model.config.decoder_start_token_id = decoder_start_id

# Batches are moved to `device` elsewhere in this file; the model has to live
# on the same device or the forward pass fails with a device mismatch.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

criterion = torch.nn.CrossEntropyLoss(ignore_index=tokenizer.pad_token_id)
optimizer = AdamW(model.parameters(), lr=5e-5)

# =======================================
#  Training and Validation Loops
# =======================================
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction

def compute_bleu_score(preds, labels, tokenizer):
    """Return the mean sentence-level BLEU of ``preds`` against ``labels``.

    Args:
        preds: Iterable of predicted token-id sequences.
        labels: Iterable of reference token-id sequences, paired with
            ``preds`` position by position (extra items on either side are
            ignored, as with ``zip``).
        tokenizer: Object whose ``decode(ids, skip_special_tokens=True)``
            turns a token-id sequence into text.

    Returns:
        The mean BLEU score, or ``0.0`` when there are no pairs to score
        (``np.mean`` of an empty list would be NaN with a RuntimeWarning).
    """
    pairs = list(zip(preds, labels))
    if not pairs:
        return 0.0

    # Smoothing keeps short captions with no higher-order n-gram overlap from
    # collapsing to a score of exactly zero.
    smooth_fn = SmoothingFunction().method1
    bleu_scores = []
    for pred, label in pairs:
        pred_tokens = tokenizer.decode(pred, skip_special_tokens=True).split()
        true_tokens = tokenizer.decode(label, skip_special_tokens=True).split()
        bleu_scores.append(sentence_bleu([true_tokens], pred_tokens, smoothing_function=smooth_fn))

    return np.mean(bleu_scores)

def train_model_with_accuracy(model, train_loader, val_loader, criterion, optimizer, tokenizer, num_epochs=2):
    """Train ``model`` and report loss and token accuracy for every epoch.

    Args:
        model: Module called as ``model(pixel_values=..., labels=...)`` whose
            output exposes ``.loss`` and ``.logits``.
        train_loader: Iterable of ``(images, input_ids, attention_masks)``
            batches that also supports ``len()``.
        val_loader: Validation batches passed to ``evaluate_model`` after each
            epoch; ``None`` skips validation.
        criterion: Not used for the training loss (the model computes its
            own); only forwarded to ``evaluate_model``.
        optimizer: Optimizer stepping the model's parameters.
        tokenizer: Forwarded to ``evaluate_model``.
        num_epochs: Number of passes over ``train_loader``.

    Returns:
        Zero-based index of the last completed epoch, or ``-1`` when
        ``num_epochs`` is 0.
    """
    # Follow the model's own placement rather than a module-level `device`.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    last_epoch = -1
    for epoch in range(num_epochs):
        model.train()
        total_loss = 0.0
        total_correct_tokens, total_tokens = 0, 0

        for i, (images, input_ids, attention_masks) in enumerate(train_loader):
            images = images.to(device)
            input_ids = input_ids.to(device)
            attention_masks = attention_masks.to(device)

            # Padding positions must not contribute to the loss. Hugging Face
            # models ignore label positions set to -100.
            real_tokens = attention_masks.bool()
            labels = input_ids.masked_fill(~real_tokens, -100)

            outputs = model(pixel_values=images, labels=labels)
            loss = outputs.loss
            logits = outputs.logits

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

            # Accuracy over real tokens only; counting padding positions
            # inflates the figure.
            preds = torch.argmax(logits.detach(), dim=-1)
            total_correct_tokens += ((preds == input_ids) & real_tokens).sum().item()
            total_tokens += real_tokens.sum().item()

            if i % 10 == 0:
                print(f"Step {i}/{len(train_loader)}, Loss: {loss.item():.4f}")

        # max(..., 1) keeps an empty loader from raising ZeroDivisionError.
        avg_loss = total_loss / max(len(train_loader), 1)
        train_accuracy = total_correct_tokens / max(total_tokens, 1) * 100
        print(f"Epoch [{epoch+1}/{num_epochs}], Training Loss: {avg_loss:.4f}, Training Accuracy: {train_accuracy:.2f}%")

        if val_loader is not None:
            # NOTE(review): evaluate_model is not defined in the visible part
            # of this file - confirm it exists. Its results are currently not
            # used after this call.
            val_loss, val_preds, val_labels = evaluate_model(model, val_loader, tokenizer, criterion, device)

        last_epoch = epoch

    return last_epoch
# =======================================
# Train the Model with Enhanced Pipeline
# =======================================
last_epoch = 0  # placeholder; replaced by the value returned from training
last_epoch = train_model_with_accuracy(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    criterion=criterion,
    optimizer=optimizer,
    tokenizer=tokenizer,
    num_epochs=2,
)

# =======================================
#  Evaluation Metrics
# =======================================
from sklearn.metrics import precision_score, recall_score, f1_score

def compute_metrics(predictions, labels, tokenizer):
    """Print and return macro precision, recall and F1 over decoded captions.

    Both inputs are decoded to text with ``tokenizer`` and the resulting
    strings are handed to scikit-learn as they are, so every distinct caption
    string acts as one class and a prediction only counts as correct when it
    equals the reference caption exactly.

    Returns:
        Dict with the keys ``"precision"``, ``"recall"`` and ``"f1"``.
    """
    decoded_preds = []
    for pred in predictions:
        decoded_preds.append(tokenizer.decode(pred, skip_special_tokens=True))
    decoded_labels = []
    for label in labels:
        decoded_labels.append(tokenizer.decode(label, skip_special_tokens=True))

    # One scorer per reported metric, evaluated in the order they are printed.
    scorers = {"precision": precision_score, "recall": recall_score, "f1": f1_score}
    results = {
        metric: scorer(decoded_labels, decoded_preds, average='macro', zero_division=0)
        for metric, scorer in scorers.items()
    }

    print(f"Precision: {results['precision']:.4f}, Recall: {results['recall']:.4f}, F1-Score: {results['f1']:.4f}")
    return results

# =======================================
#  Save the Model
# =======================================
# Two copies are written: one tagged with the epoch number, one under a fixed
# name. Each directory receives the model followed by the tokenizer.
for save_dir in (f"./pixelLM_captioning_model_epoch_{last_epoch+1}", "./image_captioning_model"):
    model.save_pretrained(save_dir)
    tokenizer.save_pretrained(save_dir)
print("Model saved successfully!")
