<a href="https://colab.research.google.com/github/Tahnees/PRAssignment/blob/main/PRSniffer1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import auth, drive

# Authenticate the Colab user for Google services used by this notebook.
auth.authenticate_user()

# The dataset archive is read from /content/drive/MyDrive, which only exists
# once Google Drive is mounted. Mounting here keeps "Restart & Run All"
# working on a fresh runtime; the call is a no-op if Drive is already mounted.
drive.mount('/content/drive')


In [None]:
import zipfile
import os

zip_path = "/content/drive/MyDrive/test_dataset.zip"  # update path

# The archive is unpacked into /content; the files are then listed from
# /content/test_dataset, which is where the extracted dataset ends up.
extract_path = '/content'
dataset_dir = os.path.join(extract_path, 'test_dataset')

# Fail early with an actionable message instead of a bare traceback from zipfile.
if not os.path.isfile(zip_path):
    raise FileNotFoundError(
        f"Dataset archive not found at {zip_path!r}. "
        "Is Google Drive mounted and is the path correct?"
    )

# Unzip
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(extract_path)

# Verify
print("Unzipped content:", os.listdir(dataset_dir))
print("Extraction complete.")


Unzipped content: ['visual_news_test.json', 'visual_news_test']
Extraction complete.


In [None]:
!pip install transformers torchvision scikit-learn matplotlib


Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch==2.6.0->torchvision)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch==2.6.0->torchvision)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch==2.6.0->torchvision)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch==2.6.0->torchvision)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch==2.6.0->torchvision)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cufft-cu12==11.2.1.3 (from torch==2.6.0->torchvision)
  Downloading nvidia_cufft_cu12-11.2.1.3-py3-none-manylinux2014_x86

In [None]:
import json
import os
from torch.utils.data import Dataset, DataLoader
from transformers import CLIPProcessor, AutoTokenizer
from PIL import Image
import torch

class SnifferDataset(Dataset):
    """Image/article pairs described by a JSON metadata file.

    The metadata is a JSON object whose values each provide ``image_path`` and
    ``article_path`` (joined onto ``root_dir``) and, optionally, ``label``.
    Entries whose image or article file does not exist are skipped.

    Args:
        json_path: Path of the JSON metadata file.
        root_dir: Directory that the per-sample paths are relative to.
    """

    def __init__(self, json_path, root_dir):
        with open(json_path, 'r', encoding='utf-8') as f:
            self.metadata = json.load(f)
        self.root_dir = root_dir
        self.clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
        self.text_tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
        self.samples = self.build_samples()

    def build_samples(self):
        """Return the list of usable samples found in ``self.metadata``.

        Each sample is a dict with ``image_path``, ``text_path`` and ``label``.
        A missing ``label`` still defaults to 0, but a warning is emitted
        because silently labelling everything 0 yields a one-class dataset.
        A warning is also emitted when entries are dropped for missing files.
        """
        import warnings

        samples = []
        skipped = 0
        unlabeled = 0
        for data in self.metadata.values():
            image_path = os.path.join(self.root_dir, data["image_path"])
            article_path = os.path.join(self.root_dir, data["article_path"])

            if not (os.path.exists(image_path) and os.path.exists(article_path)):
                skipped += 1
                continue

            if "label" not in data:
                unlabeled += 1

            samples.append({
                "image_path": image_path,
                "text_path": article_path,
                "label": data.get("label", 0),
            })

        if skipped:
            warnings.warn(
                f"Skipped {skipped} metadata entries whose image or article file is missing."
            )
        if unlabeled:
            warnings.warn(
                f"{unlabeled} of {len(samples)} samples have no 'label' field and were assigned label 0."
            )
        return samples

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        """Load, preprocess and return sample ``idx`` as a dict of tensors.

        Returns a dict with ``pixel_values``, ``input_ids``, ``attention_mask``
        (each with the leading batch dimension removed) and ``label``.
        """
        sample = self.samples[idx]

        # convert() produces an independent image, so the file handle can be
        # closed right away instead of lingering until garbage collection.
        with Image.open(sample["image_path"]) as raw_image:
            image = raw_image.convert("RGB")
        image_inputs = self.clip_processor(images=image, return_tensors="pt")

        with open(sample["text_path"], 'r', encoding='utf-8') as f:
            text = f.read()

        text_inputs = self.text_tokenizer(text, padding="max_length", truncation=True, max_length=256, return_tensors="pt")

        return {
            "pixel_values": image_inputs["pixel_values"].squeeze(0),
            "input_ids": text_inputs["input_ids"].squeeze(0),
            "attention_mask": text_inputs["attention_mask"].squeeze(0),
            "label": torch.tensor(sample["label"], dtype=torch.long)
        }

# Dataset location on the Colab filesystem and the metadata file inside it.
root_dir = "/content/test_dataset"
json_path = "/content/test_dataset/visual_news_test.json"

# Build the dataset once, then wrap it in a shuffling loader of 8-sample batches.
dataset = SnifferDataset(json_path=json_path, root_dir=root_dir)
train_loader = DataLoader(dataset=dataset, batch_size=8, shuffle=True)


Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.
The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


In [None]:
from torch import nn
from transformers import CLIPModel, AutoModel

class SnifferModel(nn.Module):
    """Two-branch classifier over an image and a tokenized text.

    Image features from CLIP and the pooled output of BERT are concatenated
    and passed through a single linear layer that produces two logits.
    """

    def __init__(self):
        super().__init__()
        self.clip = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
        self.bert = AutoModel.from_pretrained("bert-base-uncased")
        # Width of the concatenated [image | text] feature vector.
        fused_dim = self.clip.config.projection_dim + self.bert.config.hidden_size
        self.classifier = nn.Linear(fused_dim, 2)

    def forward(self, pixel_values, input_ids, attention_mask):
        """Return the classifier logits for a batch of image/text pairs."""
        image_vec = self.clip.get_image_features(pixel_values)
        bert_out = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        fused = torch.cat((image_vec, bert_out.pooler_output), dim=1)
        return self.classifier(fused)


In [None]:
from torch.optim import Adam
import torch

# Seed before the model is built so the classifier initialisation and the
# loader's shuffle order are repeatable across "Restart & Run All".
SEED = 42
torch.manual_seed(SEED)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = SnifferModel().to(device)
optimizer = Adam(model.parameters(), lr=2e-5)
criterion = nn.CrossEntropyLoss()

epochs = 3
for epoch in range(epochs):
    model.train()
    total_loss = 0.0
    num_batches = 0
    for batch in train_loader:
        pixel_values = batch["pixel_values"].to(device)
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        labels = batch["label"].to(device)

        logits = model(pixel_values, input_ids, attention_mask)
        loss = criterion(logits, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        num_batches += 1

    # Report the mean batch loss: a raw sum grows with the number of batches
    # and cannot be compared between datasets of different size.
    mean_loss = total_loss / max(num_batches, 1)
    print(f"Epoch {epoch+1}/{epochs}, Loss: {mean_loss:.4f}")


Epoch 1/3, Loss: 1.7264
Epoch 2/3, Loss: 0.0008
Epoch 3/3, Loss: 0.0000


In [None]:
from sklearn.metrics import accuracy_score, classification_report

def evaluate(model, data_loader):
    """Print accuracy and a per-class report for ``model`` over ``data_loader``.

    Batches are dicts with ``pixel_values``, ``input_ids``, ``attention_mask``
    and ``label``; tensors are moved to the module-level ``device`` before the
    forward pass. Nothing is returned; results are printed.
    """
    model.eval()
    predicted, expected = [], []

    with torch.no_grad():
        for batch in data_loader:
            targets = batch["label"].to(device)
            batch_logits = model(
                batch["pixel_values"].to(device),
                batch["input_ids"].to(device),
                batch["attention_mask"].to(device),
            )
            # The highest logit per row is the predicted class.
            predicted += batch_logits.argmax(dim=1).cpu().tolist()
            expected += targets.cpu().tolist()

    acc = accuracy_score(expected, predicted)
    report = classification_report(expected, predicted, digits=4)
    print(f"\nEvaluation Accuracy: {acc:.4f}")
    print(f"\nClassification Report:\n{report}")

# Call the evaluation function.
# NOTE: this scores the model on train_loader, the same loader the training
# loop iterates over, so the printed metrics are not a held-out estimate.
evaluate(model, train_loader)  # Replace with test_loader when available


In [None]:
import matplotlib.pyplot as plt
import numpy as np
import torch.nn.functional as F

def gradcam_visualize(model, image_path, text=""):
    """Show a Grad-CAM style heat-map for the model's predicted class.

    The image is preprocessed with the CLIP processor and ``text`` is tokenized
    the same way the dataset does it (bert-base-uncased, padded/truncated to
    256 tokens). Activations and gradients are captured at the first layer norm
    of the last CLIP vision encoder layer, combined into a per-patch relevance
    map, and drawn over the preprocessed image the model actually received.

    Args:
        model: Module exposing ``model.clip.vision_model`` and accepting
            ``pixel_values``, ``input_ids`` and ``attention_mask``.
        image_path: Path of the image to explain.
        text: Text fed to the text branch. Defaults to the empty string so
            existing two-argument calls keep working.

    Raises:
        ValueError: If the patch tokens do not form a square grid.
    """
    model.eval()
    model_device = next(model.parameters()).device

    with Image.open(image_path) as raw_image:
        image = raw_image.convert("RGB")

    processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
    tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
    pixel_values = processor(images=image, return_tensors="pt")["pixel_values"].to(model_device)
    text_inputs = tokenizer(
        text, padding="max_length", truncation=True, max_length=256, return_tensors="pt"
    ).to(model_device)

    activations = []
    gradients = []

    def forward_hook(module, inputs, output):
        # Keep the activation and ask autograd to hand back its gradient.
        activations.append(output)
        output.register_hook(gradients.append)

    # Hook the input side of the last encoder layer: only the class token of
    # the final layer output is pooled, so patch tokens at the very end of the
    # layer receive zero gradient and would yield an empty map.
    target_layer = model.clip.vision_model.encoder.layers[-1].layer_norm1
    forward_handle = target_layer.register_forward_hook(forward_hook)
    try:
        model.zero_grad()
        logits = model(
            pixel_values=pixel_values,
            input_ids=text_inputs["input_ids"],
            attention_mask=text_inputs["attention_mask"],
        )
        pred = int(logits.argmax(dim=1).item())
        logits[0, pred].backward()
    finally:
        # Always detach the hook and drop the gradients this pass accumulated.
        forward_handle.remove()
        model.zero_grad()

    # Token 0 is the class token; the remaining tokens are image patches.
    act = activations[0].detach()[0, 1:]
    grad = gradients[0].detach()[0, 1:]

    weights = grad.mean(dim=0)                  # one weight per hidden channel
    cam = F.relu((act * weights).sum(dim=-1))   # one relevance score per patch

    grid = int(round(cam.numel() ** 0.5))
    if grid * grid != cam.numel():
        raise ValueError(f"Cannot arrange {cam.numel()} patch tokens into a square grid.")

    # Upsample the patch grid to the resolution of the model input.
    cam = F.interpolate(
        cam.reshape(1, 1, grid, grid),
        size=tuple(pixel_values.shape[-2:]),
        mode="bilinear",
        align_corners=False,
    )[0, 0]
    cam = cam - cam.min()
    peak = cam.max()
    if peak > 0:  # avoid 0/0 when the map is flat
        cam = cam / peak
    cam = cam.cpu().numpy()

    # Undo the processor's normalisation so the overlay lines up with exactly
    # the resized/cropped pixels the model saw.
    image_processor = processor.image_processor
    mean = torch.tensor(image_processor.image_mean).view(3, 1, 1)
    std = torch.tensor(image_processor.image_std).view(3, 1, 1)
    shown = (pixel_values[0].detach().cpu() * std + mean).clamp(0, 1).permute(1, 2, 0).numpy()

    fig, ax = plt.subplots()
    ax.imshow(shown)
    ax.imshow(cam, cmap='jet', alpha=0.5)
    ax.set_title(f"Predicted Label: {pred}")
    ax.axis("off")
    plt.show()

# Explain the first sample, feeding the model the article that belongs to the image.
sample_image_path = dataset.samples[0]["image_path"]
with open(dataset.samples[0]["text_path"], 'r', encoding='utf-8') as f:
    sample_text = f.read()
gradcam_visualize(model, sample_image_path, text=sample_text)
