<a href="https://colab.research.google.com/github/Eshan133/Hate-Speech-Detection/blob/main/Meme_Clip_Model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive; the CSV paths used later in this
# notebook live under /content/drive/MyDrive.
drive.mount('/content/drive')

In [None]:
# Step 1: Clone the original OpenAI CLIP repository into the current working directory.
# NOTE(review): re-running this cell in the same session will clone again relative to
# wherever the %cd below left the working directory — confirm it is run once per runtime.
!git clone https://github.com/openai/CLIP.git

# Step 2: Install the cloned repo as an editable package.
# %cd changes the working directory for every later cell, so relative paths used
# afterwards (for example the saved checkpoint and submission files) resolve inside CLIP/.
%cd CLIP
!pip install -e .


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from clip import clip  # ensure the clip repo or package is available

In [None]:
from torch.utils.data import Dataset
import pandas as pd
from PIL import Image

class MemeDataset(Dataset):
    """Image/text meme dataset backed by a CSV file.

    Each CSV row must provide a ``name`` column (used as the image path) and a
    ``text`` column. Unless ``is_test`` is True, a ``label`` column is also
    required.

    Args:
        csv_file: Path of the CSV file, read with ``pandas.read_csv``.
        processor: Stored as ``self.processor``; this class never calls it.
        is_test: When True, ``__getitem__`` returns samples without a label.
    """

    def __init__(self, csv_file, processor, is_test=False):
        self.data = pd.read_csv(csv_file)
        self.processor = processor
        self.is_test = is_test

    def __len__(self):
        """Return the number of rows in the CSV."""
        return len(self.data)

    @staticmethod
    def _clean_text(value):
        """Return ``value`` as a string, mapping missing cells (None/NaN) to ""."""
        # Empty CSV cells come back from pandas as float NaN; NaN is the only
        # float that is not equal to itself.
        if value is None or (isinstance(value, float) and value != value):
            return ""
        return str(value)

    def __getitem__(self, idx):
        """Load one sample.

        Returns:
            ``{"image": PIL RGB image, "text": str}`` when ``is_test`` is True,
            otherwise the tuple ``(that dict, int label)``.
        """
        row = self.data.iloc[idx]

        # The context manager closes the underlying file as soon as the pixel
        # data has been copied by convert(), instead of leaking one open file
        # handle per sample until garbage collection.
        with Image.open(row['name']) as raw_image:
            image = raw_image.convert('RGB')

        item = {
            "image": image,
            "text": self._clean_text(row['text'])
        }

        if self.is_test:
            return item
        return item, int(row['label'])


In [None]:
import torch
from transformers import CLIPProcessor

# Shared Hugging Face CLIP processor (tokenizer + image preprocessing) loaded from the
# "openai/clip-vit-base-patch32" checkpoint; collate_fn below calls it on every batch.
processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

# OpenAI CLIP's text encoder adds a positional embedding of fixed length, so
# every token sequence handed to `encode_text` must be exactly this long.
CLIP_CONTEXT_LENGTH = 77


def collate_fn(batch):
    """Turn a list of dataset samples into model-ready tensors.

    Args:
        batch: Either a list of ``{"image", "text"}`` dicts (test mode) or a
            list of ``(dict, label)`` tuples (train/val mode).

    Returns:
        The processor output (``input_ids``, ``attention_mask``,
        ``pixel_values``) in test mode, or ``(processor output, labels)`` in
        train/val mode, where ``labels`` is a 1-D tensor.
    """
    # Test-mode samples are bare dicts; train/val samples are (dict, label).
    is_test = isinstance(batch[0], dict)
    samples = batch if is_test else [sample for sample, _ in batch]

    encoded_inputs = processor(
        text=[sample["text"] for sample in samples],
        images=[sample["image"] for sample in samples],
        return_tensors="pt",
        # Pad/truncate to the fixed CLIP context length rather than to the
        # longest text in the batch: a shorter sequence cannot be added to
        # CLIP's (77, width) positional embedding and makes encode_text fail.
        padding="max_length",
        max_length=CLIP_CONTEXT_LENGTH,
        truncation=True,
    )

    if is_test:
        return encoded_inputs  # no labels in test

    labels = torch.tensor([label for _, label in batch])
    return encoded_inputs, labels



In [None]:
# CSV locations on the mounted Google Drive, one file per split.
# These are passed to MemeDataset, which reads `name`, `text` and (for the
# non-test splits) `label` columns from them.
train_csv = '/content/drive/MyDrive/Hate Speech Competition/Task_A/train_data.csv'
test_csv = '/content/drive/MyDrive/Hate Speech Competition/Task_A/test_data.csv'
val_csv = '/content/drive/MyDrive/Hate Speech Competition/Task_A/val_data.csv'

In [None]:
from torch.utils.data import DataLoader

In [None]:
def _make_loader(dataset, shuffle):
    """Wrap ``dataset`` in a DataLoader using the notebook's batch size and collate_fn."""
    return DataLoader(dataset, batch_size=32, shuffle=shuffle, collate_fn=collate_fn)


# Labelled splits share the same construction; the test split has no labels.
train_dataset, val_dataset = (
    MemeDataset(csv_path, processor, is_test=False) for csv_path in (train_csv, val_csv)
)
test_dataset = MemeDataset(test_csv, processor, is_test=True)

# Only the training split is shuffled; validation and test keep CSV row order.
train_loader = _make_loader(train_dataset, shuffle=True)
val_loader = _make_loader(val_dataset, shuffle=False)
test_loader = _make_loader(test_dataset, shuffle=False)


In [None]:
class CosineClassifier(nn.Module):
    """Classifier whose logits are scaled cosine similarities to class vectors.

    Args:
        feat_dim: Size of the input feature vectors.
        num_classes: Number of class weight vectors (output logits).
        scale: Multiplier applied to the cosine similarities. Raw cosines lie
            in [-1, 1]; fed directly into a softmax/cross-entropy loss they
            cap the achievable class probability and keep the loss bounded
            away from zero, so a temperature-style scale is applied. Pass
            ``scale=1.0`` to get plain cosine similarities.
    """

    def __init__(self, feat_dim, num_classes, scale=30.0):
        super().__init__()
        self.weight = nn.Parameter(torch.randn(num_classes, feat_dim))
        # Plain float, not a parameter/buffer, so state_dict keys are unchanged.
        self.scale = scale

    def forward(self, x):
        """Return logits of shape ``(..., num_classes)`` for features ``x``."""
        x_norm = F.normalize(x, dim=-1)
        w_norm = F.normalize(self.weight, dim=-1)
        return self.scale * (x_norm @ w_norm.t())

In [None]:
class Adapter(nn.Module):
    """Residual bottleneck MLP: ``x + up(relu(down(x)))``.

    Args:
        dim: Input and output feature size.
        reduction: Divisor giving the hidden size ``dim // reduction``.
    """

    def __init__(self, dim, reduction=4):
        super().__init__()
        hidden_dim = dim // reduction
        down_proj = nn.Linear(dim, hidden_dim)
        up_proj = nn.Linear(hidden_dim, dim)
        self.net = nn.Sequential(down_proj, nn.ReLU(), up_proj)

    def forward(self, x):
        """Apply the bottleneck and add the input back as a skip connection."""
        adapted = self.net(x)
        return adapted + x

In [None]:
class LinearProjection(nn.Module):
    """Stack of ``Linear -> ReLU -> Dropout`` layers mapping features to ``output_dim``.

    Args:
        input_dim: Feature size expected by the first linear layer.
        output_dim: Feature size produced by every layer.
        num_layers: Number of ``Linear -> ReLU -> Dropout`` groups.
        drop_probs: Dropout probability per layer; when there are more layers
            than entries, the last entry is reused. The default is an
            immutable tuple so it cannot be shared and mutated across calls.
    """

    def __init__(self, input_dim, output_dim, num_layers=1, drop_probs=(0.1, 0.1)):
        super().__init__()
        last_prob_index = len(drop_probs) - 1
        layers = []
        for i in range(num_layers):
            # Only the first layer sees the raw input width.
            in_dim = input_dim if i == 0 else output_dim
            layers.extend([
                nn.Linear(in_dim, output_dim),
                nn.ReLU(),
                nn.Dropout(drop_probs[min(i, last_prob_index)]),
            ])
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Project ``x`` through the layer stack."""
        return self.net(x)

In [None]:
class MemeCLIP(nn.Module):
    """Frozen OpenAI CLIP backbone with trainable projection, adapter and classifier heads.

    Image and text embeddings from CLIP are each projected to ``map_dim``,
    passed through a residual adapter, L2-normalised, multiplied element-wise
    and classified with a cosine classifier.

    Args:
        clip_variant: Model name passed to ``clip.load``.
        num_classes: Number of output classes.
        map_dim: Width of the space both modalities are projected into.
        num_mapping_layers: Number of layers in each ``LinearProjection``.
        drop_probs: Dropout probabilities forwarded to ``LinearProjection``.
    """

    def __init__(self, clip_variant="ViT-B/32", num_classes=2, map_dim=512, num_mapping_layers=1, drop_probs=(0.1, 0.1)):
        super().__init__()
        self.clip_model, _ = clip.load(clip_variant, device="cpu", jit=False)
        self.clip_model.float()

        # Freeze CLIP parameters if desired (partial unfreezing can be done later)
        for p in self.clip_model.parameters():
            p.requires_grad = False

        # Both encoders return vectors in CLIP's joint embedding space.
        # encode_text multiplies by `text_projection` (transformer width ->
        # embedding dim), so the text head must be sized from the projection's
        # output dimension; `transformer.width` only happens to match it for
        # some variants.
        image_dim = self.clip_model.visual.output_dim
        text_dim = self.clip_model.text_projection.shape[1]

        # Projection heads for image and text embeddings
        self.image_map = LinearProjection(image_dim, map_dim, num_mapping_layers, drop_probs)
        self.text_map = LinearProjection(text_dim, map_dim, num_mapping_layers, drop_probs)

        # Adapters for image and text
        self.img_adapter = Adapter(map_dim)
        self.text_adapter = Adapter(map_dim)

        # Cosine similarity classifier
        self.classifier = CosineClassifier(map_dim, num_classes)

    def _fit_to_context(self, input_ids):
        """Right-pad ``input_ids`` with zeros to CLIP's fixed context length.

        Raises:
            ValueError: If the sequences are longer than the context length.
        """
        context_length = self.clip_model.context_length
        seq_len = input_ids.shape[-1]
        if seq_len > context_length:
            raise ValueError(
                f"input_ids has {seq_len} tokens per sample but CLIP supports at most "
                f"{context_length}; truncate the text when tokenizing."
            )
        if seq_len < context_length:
            # encode_text adds a (context_length, width) positional embedding,
            # so shorter batches must be padded. Zero is the padding id used
            # by OpenAI's own tokenizer, and it cannot be mistaken for the
            # end-of-text token that encode_text locates via argmax.
            input_ids = F.pad(input_ids, (0, context_length - seq_len), value=0)
        return input_ids

    def forward(self, input_ids, attention_mask, pixel_values):
        """Compute class logits for a batch.

        Args:
            input_ids: Integer token ids, one row per sample.
            attention_mask: Accepted for interface compatibility with the
                processor output but not used; ``encode_text`` takes only
                the token ids.
            pixel_values: Preprocessed image tensor passed to ``encode_image``.

        Returns:
            Logits with one column per class.
        """
        # Get CLIP embeddings
        image_embeds = self.clip_model.encode_image(pixel_values)
        text_embeds = self.clip_model.encode_text(self._fit_to_context(input_ids))

        # Project, then adapt each modality
        image_feat = self.img_adapter(self.image_map(image_embeds))
        text_feat = self.text_adapter(self.text_map(text_embeds))

        # Normalize features
        image_feat = F.normalize(image_feat, dim=-1)
        text_feat = F.normalize(text_feat, dim=-1)

        # Combine features element-wise (Hadamard product)
        combined = image_feat * text_feat

        # Classify with cosine similarity
        return self.classifier(combined)

In [None]:
import torch
import torch.nn as nn
from sklearn.metrics import accuracy_score, roc_auc_score, f1_score
from tqdm import tqdm
from torch.optim.lr_scheduler import ReduceLROnPlateau

# Use the GPU when the runtime has one, otherwise fall back to CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = MemeCLIP(num_classes=2).to(device)  # from previous step
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=2e-5)

# Halve the learning rate when the monitored metric (validation AUC, hence
# mode='max') has not improved for `patience` epochs.
# The `verbose` argument is intentionally not passed: it is deprecated in
# PyTorch and not accepted by newer releases. Read the current learning rate
# from `optimizer.param_groups[0]["lr"]` when it needs to be logged.
scheduler = ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=2)


In [None]:
def train_one_epoch(model, dataloader, optimizer, criterion):
    """Run one optimisation pass over ``dataloader``.

    Each batch is moved to the module-level ``device``, pushed through the
    model, and used for a single optimizer step.

    Returns:
        The sum of per-batch losses divided by the number of batches.
    """
    model.train()
    batch_losses = []

    for encoded, targets in tqdm(dataloader, desc="Training"):
        encoded = {name: tensor.to(device) for name, tensor in encoded.items()}
        targets = targets.to(device)

        # Standard step: clear old gradients, forward, backward, update.
        optimizer.zero_grad()
        logits = model(
            encoded['input_ids'],
            encoded['attention_mask'],
            encoded['pixel_values'],
        )
        loss = criterion(logits, targets)
        loss.backward()
        optimizer.step()

        batch_losses.append(loss.item())

    return sum(batch_losses) / len(dataloader)


In [None]:
def evaluate(model, dataloader):
    """Score ``model`` on a labelled dataloader.

    Batches are moved to the module-level ``device`` and evaluated without
    gradient tracking. The probability of class index 1 is used as the score
    for ROC AUC, so this assumes a binary task.

    Returns:
        Tuple ``(accuracy, f1, auc)``. ``auc`` falls back to ``0.0`` when
        ``roc_auc_score`` rejects the inputs with a ``ValueError``.
    """
    model.eval()
    all_preds = []
    all_labels = []
    all_probs = []

    with torch.no_grad():
        for batch in tqdm(dataloader, desc="Evaluating"):
            inputs, labels = batch
            inputs = {k: v.to(device) for k, v in inputs.items()}
            labels = labels.to(device)

            logits = model(inputs['input_ids'], inputs['attention_mask'], inputs['pixel_values'])
            probs = torch.softmax(logits, dim=1)
            preds = torch.argmax(probs, dim=1)

            all_preds.extend(preds.cpu().tolist())
            all_labels.extend(labels.cpu().tolist())
            all_probs.extend(probs[:, 1].cpu().tolist())  # Prob for class 1

    acc = accuracy_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds)
    try:
        auc = roc_auc_score(all_labels, all_probs)
    except ValueError:
        # AUC is undefined for degenerate label sets (e.g. a single class).
        # Only that case is tolerated; any other error (including a keyboard
        # interrupt) now propagates instead of being reported as AUC 0.0.
        auc = 0.0

    return acc, f1, auc


In [None]:
import os
import torch

# Early-stopping / checkpoint state read and updated by the training loop.
best_val_auc = 0.0  # best validation AUC seen so far; a checkpoint is saved only when this is beaten
patience = 3  # number of non-improving epochs tolerated before training stops
patience_counter = 0  # consecutive epochs without an AUC improvement
save_path = "best_memeclip_model.pt"  # relative path: resolved against the current working directory


In [None]:
EPOCHS = 20  # or however many you want

# Per-epoch history, consumed by the plotting cell.
train_losses, val_accuracies, val_f1s, val_aucs = [], [], [], []

for epoch in range(EPOCHS):
    print(f"\nEpoch {epoch+1}/{EPOCHS}")

    train_loss = train_one_epoch(model, train_loader, optimizer, criterion)
    acc, f1, auc = evaluate(model, val_loader)

    print(f"Train Loss: {train_loss:.4f}")
    print(f"Val Acc: {acc:.4f} | F1: {f1:.4f} | AUC: {auc:.4f}")

    # Record this epoch's metrics.
    for history, value in (
        (train_losses, train_loss),
        (val_accuracies, acc),
        (val_f1s, f1),
        (val_aucs, auc),
    ):
        history.append(value)

    # The scheduler monitors validation AUC.
    scheduler.step(auc)

    # Checkpoint on improvement, otherwise count towards early stopping.
    improved = auc > best_val_auc
    if improved:
        best_val_auc = auc
        patience_counter = 0
        torch.save(model.state_dict(), save_path)
        print("✅ Best model saved!")
    else:
        patience_counter += 1
        print(f"⚠️ Patience {patience_counter}/{patience}")

    # Stop once the patience budget is used up.
    if patience_counter >= patience:
        print("⛔ Early stopping triggered.")
        break


In [None]:
import matplotlib.pyplot as plt

epochs = list(range(1, len(train_losses) + 1))

plt.figure(figsize=(16, 5))

# One panel per tracked metric: (title, y-axis label, values, extra plot kwargs).
# The first panel passes no colour so it keeps matplotlib's default.
panels = [
    ("Training Loss", "Loss", train_losses, {}),
    ("Validation Accuracy", "Accuracy", val_accuracies, {"color": 'green'}),
    ("Validation F1 Score", "F1 Score", val_f1s, {"color": 'orange'}),
    ("Validation AUC", "AUC", val_aucs, {"color": 'red'}),
]

for position, (title, ylabel, series, plot_kwargs) in enumerate(panels, start=1):
    plt.subplot(1, 4, position)
    plt.plot(epochs, series, marker='o', **plot_kwargs)
    plt.title(title)
    plt.xlabel("Epoch")
    plt.ylabel(ylabel)

plt.tight_layout()
plt.show()


In [None]:
# Reload the best checkpoint written by the training loop.
# `map_location=device` lets a checkpoint saved from a GPU run be loaded on a
# CPU-only runtime (and vice versa) instead of failing while deserialising
# CUDA tensors. `save_path` is reused so the saved and loaded file cannot drift apart.
model.load_state_dict(torch.load(save_path, map_location=device))
model.to(device)
model.eval()


In [None]:
import json

model.eval()
submission = []

with torch.no_grad():
    for i, batch in enumerate(tqdm(test_loader, desc="Inference")):
        inputs = {name: tensor.to(device) for name, tensor in batch.items()}

        logits = model(inputs['input_ids'], inputs['attention_mask'], inputs['pixel_values'])
        preds = torch.argmax(torch.softmax(logits, dim=1), dim=1).cpu().tolist()

        # The test loader is built without shuffling, so batch i covers this
        # contiguous slice of CSV rows; their `index` column identifies each prediction.
        start = i * test_loader.batch_size
        stop = start + test_loader.batch_size
        batch_indices = test_dataset.data.iloc[start:stop]['index'].tolist()

        submission.extend(
            {"index": idx, "label": int(label)}
            for idx, label in zip(batch_indices, preds)
        )


In [None]:
# Order the predictions by their dataset index.
submission.sort(key=lambda entry: entry["index"])

# Write the predictions out as a JSON array.
with open("submission.json", "w") as json_file:
    json_file.write(json.dumps(submission))

# Package the JSON file as ref.zip.
import zipfile

with zipfile.ZipFile("ref.zip", "w") as archive:
    archive.write("submission.json")
