In [1]:
# If needed:
# !pip install torch torchvision transformers scikit-learn pandas pillow seaborn matplotlib

import os
import json
import random
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

import numpy as np
import pandas as pd
from PIL import Image

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms

from transformers import AutoTokenizer, AutoModel
from sklearn.metrics import classification_report, confusion_matrix, f1_score, precision_score, recall_score, accuracy_score

import seaborn as sns
import matplotlib.pyplot as plt

# Keep the machine responsive: give PyTorch roughly half of the logical cores.
# os.cpu_count() may return None when the core count cannot be determined, so
# fall back to 1 instead of failing on `None // 2`.
torch.set_num_threads(max(1, (os.cpu_count() or 1) // 2))

In [2]:
@dataclass
class Config:
    """Central configuration for the embedding-extraction + fusion-head notebook.

    Groups input locations, encoder settings, embedding widths, training
    hyper-parameters and output names. A single module-level instance (`cfg`)
    is created right after the class and read by the rest of the notebook.
    """
    # Paths (aligned with your preprocessing outputs)
    # NOTE(review): absolute, machine-specific path; the working directory is
    # later built as base_dir/processed_dir_name, i.e. a nested
    # "processed_data/processed_data" folder — confirm this is intended.
    base_dir: str = r"C:\Users\G ABHINAV REDDY\Downloads\processed_data"
    processed_dir_name: str = "processed_data"

    # Files (all resolved relative to the processed directory)
    memotion_csv: str = "memotion_7k_multimodal.csv"
    text_train_csv: str = os.path.join("splits", "train", "text_train.csv")
    text_val_csv: str = os.path.join("splits", "val", "text_val.csv")
    text_test_csv: str = os.path.join("splits", "test", "text_test.csv")

    # Encoders (lightweight for CPU)
    text_model_name: str = "distilbert-base-uncased"
    max_len: int = 128  # tokenizer max_length (inputs are truncated/padded to this)

    image_size: int = 224  # images are resized to image_size x image_size
    batch_extract: int = 32  # batch size used during embedding extraction (CPU-friendly)

    # Embedding dims
    text_dim: int = 768
    image_dim: int = 512
    meta_dim: int = 2  # sarcasm + humour

    # Training on embeddings
    epochs: int = 8
    batch_train: int = 128
    lr: float = 3e-3
    weight_decay: float = 1e-4
    early_stop_patience: int = 3  # epochs without validation improvement before stopping
    dropout: float = 0.2
    hidden_dim: int = 512  # small MLP

    # Toggle meta flags from Memotion
    use_meta_flags: bool = True

    # Reports/checkpoints (names are joined onto the processed directory)
    reports_dirname: str = "reports_lightweight"
    embeddings_dirname: str = "embeddings_lightweight"
    head_ckpt: str = "fusion_heads.pt"

    # Repro
    seed: int = 42

# Single shared configuration instance used throughout the notebook.
cfg = Config()

def set_seed(seed:int=42):
    """Seed the Python, NumPy and PyTorch global random number generators.

    Args:
        seed: Value passed to each library's seeding function, in the order
            `random`, `numpy.random`, `torch`.
    """
    for seeder in (random.seed, np.random.seed, torch.manual_seed):
        seeder(seed)

# Seed every RNG up front so later cells are repeatable.
set_seed(cfg.seed)

# Resolve the location of each required input CSV.
processed_dir = os.path.join(cfg.base_dir, cfg.processed_dir_name)
paths = {
    "memotion": os.path.join(processed_dir, cfg.memotion_csv),
    "text_train": os.path.join(processed_dir, cfg.text_train_csv),
    "text_val": os.path.join(processed_dir, cfg.text_val_csv),
    "text_test": os.path.join(processed_dir, cfg.text_test_csv),
}

# Validate inputs with a real exception instead of `assert`: assertions are
# stripped under `python -O`, and listing every missing file at once avoids
# fixing them one rerun at a time.
missing = [f"Missing: {k} -> {p}" for k, p in paths.items() if not os.path.exists(p)]
if missing:
    raise FileNotFoundError("\n".join(missing))
print("‚úÖ All required CSVs found.")

‚úÖ All required CSVs found.


In [3]:
def map_memotion_labels(off_cat: str):
    """Map a Memotion offensiveness category to `(hs3, ab2)` labels.

    The category is compared case-insensitively after stripping whitespace.

    HS3 (3-way): 0 = Hate, 1 = Offensive, 2 = Neither.
    AB2 (binary): 1 = abusive (offensive / very_offensive / hateful_offensive),
    0 = non-abusive (slight / not_offensive).

    Args:
        off_cat: Raw category value; anything is accepted and stringified.

    Returns:
        Tuple `(hs3, ab2)`. Unrecognised or empty categories yield
        `(-100, -100)` so that both heads ignore the sample. Previously such
        rows got `ab2 = 0`, silently training them as "non-abusive" while HS3
        already treated them as unlabeled.
    """
    known = {
        "hateful_offensive": (0, 1),
        "offensive": (1, 1),
        "very_offensive": (1, 1),
        "slight": (2, 0),
        "not_offensive": (2, 0),
    }
    s = str(off_cat).strip().lower()
    return known.get(s, (-100, -100))

def map_text_labels(row: pd.Series):
    """Derive `(hs3, ab2)` labels for a text-only row.

    HS3 comes from `original_class` when it parses to 0, 1 or 2 (e.g. the
    Davidson dataset); AB2 comes from the binary `label` column (1 -> 1, any
    other parsed integer -> 0). A missing, blank or unparseable field yields
    -100 for that head.

    Args:
        row: Mapping-like record (a `pd.Series` or a dict) that may contain
            the keys `original_class` and `label`.

    Returns:
        Tuple `(hs3, ab2)`.
    """
    def _field_as_int(key):
        # Returns None when the field is absent, blank or not numeric.
        if key not in row or str(row[key]).strip() == "":
            return None
        try:
            return int(float(row[key]))
        except (ValueError, TypeError, OverflowError):
            # Only conversion failures (text, NaN, inf, wrong type) are
            # expected here; a bare `except:` would also hide
            # KeyboardInterrupt/SystemExit.
            return None

    oc = _field_as_int("original_class")
    hs3 = oc if oc in (0, 1, 2) else -100

    v = _field_as_int("label")
    ab2 = -100 if v is None else (1 if v == 1 else 0)
    return hs3, ab2

def parse_sarcasm(val) -> int:
    """Convert a raw sarcasm annotation into a binary flag (1 = sarcastic).

    Negative labels are checked first. The earlier substring test
    `"sarcas" in s` also matched `not_sarcastic`, which flagged exactly the
    non-sarcastic rows as sarcastic.

    Args:
        val: Raw cell value; it is stringified, stripped and lower-cased.

    Returns:
        1 for a positive sarcasm label, 0 otherwise (including blank/unknown).
    """
    s = str(val).strip().lower()

    negatives = {"", "none", "nan", "no", "false", "0", "not_sarcastic", "not sarcastic"}
    if s in negatives or s.startswith(("not_", "not ", "non_", "non-", "non ", "no_")):
        return 0

    # "general", "twisted_meaning" and "very_twisted" are presumably the
    # positive sarcasm levels used by Memotion 7k — confirm against the CSV.
    positives = {
        "sarcasm", "sarcastic", "yes", "true", "1",
        "general", "twisted_meaning", "very_twisted",
    }
    if s in positives or "sarcas" in s:
        return 1
    return 0

def parse_humour(val) -> int:
    """Convert a raw humour annotation into a binary flag (1 = humorous).

    The value is stringified, stripped and lower-cased. Known "no humour"
    labels return 0 immediately; otherwise the label counts as humorous when
    it contains one of the humour stems.
    """
    label = str(val).strip().lower()

    non_humorous = ("", "none", "not_funny", "not funny", "no_humour", "no_humor")
    if label in non_humorous:
        return 0

    # "very_funny" needs no stem of its own: it already contains "funny".
    for stem in ("funny", "hilar", "humor", "humour"):
        if stem in label:
            return 1
    return 0

In [4]:
# --- Text branch: DistilBERT tokenizer + encoder, used as a frozen feature extractor ---
tokenizer = AutoTokenizer.from_pretrained(cfg.text_model_name, use_fast=True)
text_encoder = AutoModel.from_pretrained(cfg.text_model_name)
text_encoder.requires_grad_(False)  # freeze every parameter
text_encoder.eval()

# --- Image branch: ImageNet ResNet18, frozen, with the final FC layer removed ---
resnet = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)
resnet.requires_grad_(False)
backbone_layers = list(resnet.children())[:-1]  # everything except the classifier
image_backbone = nn.Sequential(*backbone_layers)
image_backbone.eval()

# Everything runs on CPU in this notebook.
device = torch.device("cpu")
text_encoder.to(device)
image_backbone.to(device)

# Preprocessing: resize to a square, convert to tensor, apply the mean/std normalisation.
img_tfm = transforms.Compose([
    transforms.Resize((cfg.image_size, cfg.image_size)),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485,0.456,0.406],
        std=[0.229,0.224,0.225],
    ),
])

def mean_pool(last_hidden_state, attention_mask):
    """Average token vectors over dimension 1, counting only unmasked positions.

    `attention_mask` is broadcast over the last dimension of
    `last_hidden_state`; a small epsilon keeps the division finite when a row
    has no unmasked tokens.
    """
    weights = attention_mask.unsqueeze(-1).float()
    token_sum = (last_hidden_state * weights).sum(1)
    token_count = weights.sum(1)
    return token_sum / (token_count + 1e-8)

tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development


config.json:   0%|          | 0.00/483 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


model.safetensors:   0%|          | 0.00/268M [00:00<?, ?B/s]

Downloading: "https://download.pytorch.org/models/resnet18-f37072fd.pth" to C:\Users\G ABHINAV REDDY/.cache\torch\hub\checkpoints\resnet18-f37072fd.pth


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 44.7M/44.7M [00:03<00:00, 13.6MB/s]


In [5]:
class CSVDatasetForEmbeddings(Dataset):
    """Row-wise dataset over a CSV, producing the inputs for embedding extraction.

    Each item holds the tokenised `text` column, an optional image tensor
    (Memotion rows only), a two-element meta vector `[sarcasm, humour]` and
    the `hs3` / `ab2` labels.

    Args:
        csv_path: CSV file to read; missing values are replaced by "".
        is_memotion: When True, rows are treated as Memotion records (image,
            offensive category, sarcasm and humour columns are used).
        split: For Memotion only, one of "train" / "val" / "test" selects a
            fixed 80/10/10 partition produced with a constant shuffle seed.
            Any other value keeps all rows.
    """

    def __init__(self, csv_path, is_memotion=False, split=None):
        self.is_memotion = is_memotion
        df = pd.read_csv(csv_path).fillna("")
        if is_memotion and split in {"train","val","test"}:
            # Constant seed: every call sees the same permutation, so the
            # three partitions never overlap across separate instantiations.
            rng = np.random.RandomState(123)
            idx = np.arange(len(df))
            rng.shuffle(idx)
            n = len(idx); n_train = int(0.8*n); n_val = int(0.1*n)
            splits = {
                "train": idx[:n_train],
                "val": idx[n_train:n_train+n_val],
                "test": idx[n_train+n_val:]
            }
            df = df.iloc[splits[split]].reset_index(drop=True)
        self.df = df

    def __len__(self):
        return len(self.df)

    def _load_image(self, row):
        """Return the transformed image tensor for `row`, or None if unavailable."""
        p = str(row.get("image_path","")).strip()
        if p == "" or not os.path.exists(p):
            return None
        try:
            # The context manager closes the underlying file handle promptly;
            # `convert` returns an independent, fully loaded copy.
            with Image.open(p) as img:
                return img_tfm(img.convert("RGB"))
        except Exception:
            # Best effort: an unreadable or corrupt image is treated as "no
            # image". Unlike a bare `except:`, this lets
            # KeyboardInterrupt/SystemExit propagate.
            return None

    def __getitem__(self, i):
        row = self.df.iloc[i]
        text = str(row.get("text",""))
        enc = tokenizer(text, max_length=cfg.max_len, truncation=True, padding="max_length", return_tensors="pt")
        input_ids = enc["input_ids"].squeeze(0)
        attention_mask = enc["attention_mask"].squeeze(0)

        if self.is_memotion:
            img_tensor = self._load_image(row)
            hs3, ab2 = map_memotion_labels(row.get("offensive_category",""))
            sarcasm = parse_sarcasm(row.get("sarcasm","")) if "sarcasm" in row else 0
            humour = parse_humour(row.get("humour","")) if "humour" in row else 0
        else:
            # Text-only rows carry no image and no meta flags.
            img_tensor = None
            hs3, ab2 = map_text_labels(row)
            sarcasm, humour = 0, 0

        meta = torch.tensor([sarcasm, humour], dtype=torch.float32)
        return {
            "input_ids": input_ids,
            "attention_mask": attention_mask,
            "image": img_tensor,
            "meta": meta,
            "hs3": hs3,
            "ab2": ab2
        }

def collate_extract(batch):
    """Collate dataset items into one batch dict for embedding extraction.

    Token tensors and meta vectors are stacked and labels become long
    tensors. Only samples whose "image" entry is a tensor contribute to
    "images"; their batch positions are recorded in "image_indices". Both
    entries are None when the batch has no image.
    """
    def stacked(key):
        return torch.stack([sample[key] for sample in batch])

    def as_long(key):
        return torch.tensor([sample[key] for sample in batch], dtype=torch.long)

    # Collect positions and tensors of the samples that actually have an image.
    positions, tensors = [], []
    for pos, sample in enumerate(batch):
        if isinstance(sample["image"], torch.Tensor):
            positions.append(pos)
            tensors.append(sample["image"])
    has_images = len(positions) > 0

    return {
        "input_ids": stacked("input_ids"),
        "attention_mask": stacked("attention_mask"),
        "images": torch.stack(tensors) if has_images else None,
        "image_indices": torch.tensor(positions, dtype=torch.long) if has_images else None,
        "meta": stacked("meta"),
        "hs3": as_long("hs3"),
        "ab2": as_long("ab2"),
    }

In [6]:
# Directory that receives the extracted feature archives.
emb_dir = os.path.join(processed_dir, cfg.embeddings_dirname)
os.makedirs(emb_dir, exist_ok=True)

@torch.no_grad()
def extract_embeddings(csv_path, is_memotion=False, split=None, tag=""):
    """Encode every row of a CSV and store the features as a compressed .npz.

    Text is encoded with the frozen text encoder followed by `mean_pool`.
    Rows that have an image get the flattened image-backbone features; all
    other rows keep an all-zero image vector. Meta flags and both label
    arrays are stored alongside.

    Args:
        csv_path: Input CSV.
        is_memotion: Forwarded to `CSVDatasetForEmbeddings`.
        split: Forwarded to `CSVDatasetForEmbeddings`.
        tag: Suffix of the output file name (`features_{tag}.npz`).

    Returns:
        Path of the written archive inside `emb_dir`.
    """
    loader = DataLoader(
        CSVDatasetForEmbeddings(csv_path, is_memotion=is_memotion, split=split),
        batch_size=cfg.batch_extract,
        shuffle=False,
        collate_fn=collate_extract,
    )

    chunks = {"X_text": [], "X_img": [], "X_meta": [], "y_hs3": [], "y_ab2": []}

    for batch in loader:
        ids = batch["input_ids"].to(device)
        mask = batch["attention_mask"].to(device)

        # Text features: masked mean over the encoder's last hidden state.
        hidden = text_encoder(input_ids=ids, attention_mask=mask).last_hidden_state
        chunks["X_text"].append(mean_pool(hidden, mask).cpu().numpy())

        # Image features: start from zeros, fill only rows that have an image.
        img_block = np.zeros((ids.size(0), cfg.image_dim), dtype=np.float32)
        positions = batch["image_indices"]
        if batch["images"] is not None and positions is not None and positions.numel()>0:
            feats = image_backbone(batch["images"].to(device)).flatten(1)
            img_block[positions.numpy()] = feats.cpu().numpy()
        chunks["X_img"].append(img_block)

        chunks["X_meta"].append(batch["meta"].numpy().astype(np.float32))
        chunks["y_hs3"].append(batch["hs3"].numpy())
        chunks["y_ab2"].append(batch["ab2"].numpy())

    X_text = np.vstack(chunks["X_text"])
    X_img  = np.vstack(chunks["X_img"])
    X_meta = np.vstack(chunks["X_meta"])
    y_hs3  = np.concatenate(chunks["y_hs3"])
    y_ab2  = np.concatenate(chunks["y_ab2"])

    out_path = os.path.join(emb_dir, f"features_{tag}.npz")
    np.savez_compressed(out_path, X_text=X_text, X_img=X_img, X_meta=X_meta, y_hs3=y_hs3, y_ab2=y_ab2)
    print(f"üíæ Saved embeddings: {out_path} | shapes: text {X_text.shape}, img {X_img.shape}, meta {X_meta.shape}")
    return out_path

In [None]:
# Text-only corpora: one pre-made CSV per partition, processed train -> val -> test.
p_train, p_val, p_test = [
    extract_embeddings(paths[f"text_{part}"], is_memotion=False, split=None, tag=f"text_{part}")
    for part in ("train", "val", "test")
]

# Memotion: a single CSV, partitioned 80/10/10 deterministically inside the dataset class.
pm_train, pm_val, pm_test = [
    extract_embeddings(paths["memotion"], is_memotion=True, split=part, tag=f"memotion_{part}")
    for part in ("train", "val", "test")
]

In [None]:
def load_npz(path):
    """Load every array of an .npz archive into a plain dict.

    The archive is opened as a context manager so its file handle is closed
    as soon as the arrays have been read, rather than whenever the `NpzFile`
    object happens to be garbage collected.

    Args:
        path: Location of the .npz file.

    Returns:
        Dict mapping each stored array name to its `np.ndarray`.
    """
    with np.load(path) as archive:
        return {name: archive[name] for name in archive.files}

# Read the saved feature archives back: text corpora first, then Memotion,
# each in train -> val -> test order.
train_text, val_text, test_text = [
    load_npz(os.path.join(emb_dir, f"features_text_{part}.npz"))
    for part in ("train", "val", "test")
]

train_memo, val_memo, test_memo = [
    load_npz(os.path.join(emb_dir, f"features_memotion_{part}.npz"))
    for part in ("train", "val", "test")
]

def concat(a, b):
    """Stack `a` on top of `b` row-wise with `np.vstack`."""
    return np.vstack((a, b))

def merge_sets(A, B):
    """Append feature set `B` to feature set `A`.

    Both arguments are mappings with the keys `X_text`, `X_img`, `X_meta`,
    `y_hs3` and `y_ab2`. Feature matrices are stacked row-wise and label
    vectors are concatenated.

    Returns:
        Tuple `(X_text, X_img, X_meta, y_hs3, y_ab2)`.
    """
    features = [concat(A[key], B[key]) for key in ("X_text", "X_img", "X_meta")]
    targets = [np.concatenate([A[key], B[key]]) for key in ("y_hs3", "y_ab2")]
    return tuple(features + targets)

# Combine the text-only and Memotion features per partition (text rows first).
Xtr_t, Xtr_i, Xtr_m, ytr_hs3, ytr_ab2 = merge_sets(train_text, train_memo)
Xva_t, Xva_i, Xva_m, yva_hs3, yva_ab2 = merge_sets(val_text, val_memo)
Xte_t, Xte_i, Xte_m, yte_hs3, yte_ab2 = merge_sets(test_text, test_memo)

# Quick sanity check of the merged training arrays.
print("Train shapes:", *(arr.shape for arr in (Xtr_t, Xtr_i, Xtr_m, ytr_hs3, ytr_ab2)))

In [None]:
class EmbeddingsDataset(Dataset):
    """Dataset over pre-computed embeddings, yielding one fused vector per sample.

    Each item is `(x, hs3, ab2)` where `x` is the concatenation of the text
    and image embeddings, followed by the meta flags when they are enabled.

    Args:
        X_t: Text embeddings, one row per sample.
        X_i: Image embeddings, one row per sample.
        X_m: Meta flags, one row per sample.
        y_hs3: HS3 labels (-100 marks "no label").
        y_ab2: AB2 labels (-100 marks "no label").
        use_meta: Whether to append the meta flags. `None` (the default)
            defers to `cfg.use_meta_flags`, read on each item access.
    """

    def __init__(self, X_t, X_i, X_m, y_hs3, y_ab2, use_meta=None):
        self.X_t = X_t.astype(np.float32)
        self.X_i = X_i.astype(np.float32)
        self.X_m = X_m.astype(np.float32)
        self.y_hs3 = y_hs3.astype(np.int64)
        self.y_ab2 = y_ab2.astype(np.int64)
        self.use_meta = use_meta

    def __len__(self): return len(self.X_t)

    def _meta_enabled(self):
        """Resolve the meta toggle, falling back to the notebook configuration."""
        return cfg.use_meta_flags if self.use_meta is None else bool(self.use_meta)

    def __getitem__(self, i):
        parts = [self.X_t[i], self.X_i[i]]
        # With meta flags disabled the block is omitted entirely. It used to be
        # replaced by zeros, which made the vector `meta_dim` wider than the
        # model input size computed as text_dim + image_dim and crashed the
        # first forward pass.
        if self._meta_enabled():
            parts.append(self.X_m[i])
        x = np.concatenate(parts).astype(np.float32)
        return torch.from_numpy(x), torch.tensor(self.y_hs3[i]), torch.tensor(self.y_ab2[i])

# Wrap each merged partition in a dataset ...
train_ds, val_ds, test_ds = (
    EmbeddingsDataset(Xtr_t, Xtr_i, Xtr_m, ytr_hs3, ytr_ab2),
    EmbeddingsDataset(Xva_t, Xva_i, Xva_m, yva_hs3, yva_ab2),
    EmbeddingsDataset(Xte_t, Xte_i, Xte_m, yte_hs3, yte_ab2),
)

# ... and a loader; only the training loader shuffles.
train_dl, val_dl, test_dl = (
    DataLoader(dataset, batch_size=cfg.batch_train, shuffle=do_shuffle)
    for dataset, do_shuffle in ((train_ds, True), (val_ds, False), (test_ds, False))
)

# Compute class weights (ignore -100)
def compute_weights(y, num_classes):
    """Inverse-frequency class weights, normalised to mean 1 over observed classes.

    Labels equal to -100 are ignored. Classes that never occur get a neutral
    weight of 1.0. Previously an absent class received a weight of about 1e8
    before normalisation, which pushed the weights of every present class to
    roughly zero and effectively switched the loss off.

    Args:
        y: Iterable / array of integer labels, possibly containing -100.
        num_classes: Minimum length of the returned weight vector.

    Returns:
        Float32 tensor of per-class weights, or None when no label remains
        after dropping -100.
    """
    labels = np.asarray(y).astype(np.int64).ravel()
    labels = labels[labels != -100]
    if labels.size == 0:
        return None

    counts = np.bincount(labels, minlength=num_classes).astype(np.float32)
    present = counts > 0

    # Inverse frequency, computed only where it is well defined.
    inv_freq = 1.0 / (counts[present] / counts.sum())

    w = np.ones_like(counts)
    w[present] = inv_freq / (inv_freq.mean() + 1e-8)
    return torch.tensor(w, dtype=torch.float32)

# Per-head class weights derived from the training labels (3 HS3 classes, 2 AB2 classes).
w_hs3 = compute_weights(ytr_hs3, 3)
w_ab2 = compute_weights(ytr_ab2, 2)
for head_name, head_weights in (("HS3", w_hs3), ("AB2", w_ab2)):
    print(f"Class weights {head_name}:", None if head_weights is None else head_weights.tolist())

In [None]:
# Width of the fused input vector: text + image, plus the meta flags when enabled.
input_dim = cfg.text_dim + cfg.image_dim
if cfg.use_meta_flags:
    input_dim += cfg.meta_dim

class FusionHeads(nn.Module):
    """Shared two-layer MLP trunk feeding two linear classification heads.

    The trunk maps `in_dim -> hidden -> hidden // 2`, applying ReLU and
    dropout after each linear layer. `hate3_head` emits 3 logits and
    `abuse2_head` emits 2 logits from the shared representation.
    """

    def __init__(self, in_dim, hidden=512, dropout=0.2):
        super().__init__()
        half = hidden//2

        # Linear -> ReLU -> Dropout, twice; module order (and therefore the
        # state_dict keys) is backbone.0 ... backbone.5.
        stages = []
        for fan_in, fan_out in ((in_dim, hidden), (hidden, half)):
            stages.extend([
                nn.Linear(fan_in, fan_out),
                nn.ReLU(inplace=True),
                nn.Dropout(dropout),
            ])
        self.backbone = nn.Sequential(*stages)

        self.hate3_head = nn.Linear(half, 3)
        self.abuse2_head = nn.Linear(half, 2)

    def forward(self, x):
        """Return `(hate3_logits, abuse2_logits)` for input batch `x`."""
        shared = self.backbone(x)
        hate_logits = self.hate3_head(shared)
        abuse_logits = self.abuse2_head(shared)
        return hate_logits, abuse_logits

# Model, optimiser and one weighted loss per head; label -100 marks "no label" and is ignored.
model = FusionHeads(input_dim, hidden=cfg.hidden_dim, dropout=cfg.dropout)
opt = torch.optim.AdamW(model.parameters(), lr=cfg.lr, weight_decay=cfg.weight_decay)
ce_hs3 = nn.CrossEntropyLoss(ignore_index=-100, weight=w_hs3)
ce_ab2 = nn.CrossEntropyLoss(ignore_index=-100, weight=w_ab2)

# Early-stopping state. The score is a mean macro-F1 (>= 0) or, as a fallback,
# the negative validation loss, which is usually below -1. Starting from -inf
# guarantees the first epoch always registers as an improvement and writes a
# checkpoint, whichever score is in use.
best_val = float("-inf")
patience = 0
ckpt_path = os.path.join(processed_dir, cfg.head_ckpt)

def evaluate(loader, split="val"):
    """Evaluate the global `model` on `loader` and print per-head metrics.

    Args:
        loader: Iterable yielding `(X, y_hs3, y_ab2)` batches; label -100
            marks a sample without a label for that head.
        split: Name used as a prefix in the printed lines.

    Returns:
        Dict with the keys:
          - "loss": mean batch loss over batches that had at least one label;
          - "hs3" / "ab2": None when the head saw no labels, otherwise a dict
            with `acc`, `f1_macro`, `precision_macro`, `recall_macro` and the
            raw arrays `y` (targets) and `p` (predictions).
    """
    model.eval()
    all_hs3_y, all_hs3_p = [], []
    all_ab2_y, all_ab2_p = [], []
    loss_sum, n_batches = 0.0, 0
    with torch.no_grad():
        for X, y_hs3, y_ab2 in loader:
            hs3_logits, ab2_logits = model(X)

            m_hs3 = y_hs3 != -100
            m_ab2 = y_ab2 != -100

            # A mean-reduced cross entropy over a batch whose targets are all
            # ignored evaluates to NaN. Loaders here are not shuffled, so whole
            # batches may lack labels for one head; only include a head's loss
            # when it has at least one labeled sample.
            terms = []
            if m_hs3.any():
                terms.append(ce_hs3(hs3_logits, y_hs3))
                all_hs3_y.append(y_hs3[m_hs3].numpy())
                all_hs3_p.append(hs3_logits[m_hs3].argmax(-1).numpy())
            if m_ab2.any():
                terms.append(ce_ab2(ab2_logits, y_ab2))
                all_ab2_y.append(y_ab2[m_ab2].numpy())
                all_ab2_p.append(ab2_logits[m_ab2].argmax(-1).numpy())
            if terms:
                loss_sum += float(torch.stack(terms).sum().item()); n_batches += 1

    def agg(Y, P):
        # Collapse the per-batch arrays of one head into its metric dict.
        if len(Y)==0: return None
        y = np.concatenate(Y); p = np.concatenate(P)
        return dict(
            acc=accuracy_score(y,p),
            # zero_division=0 matches precision/recall below and avoids a
            # warning when a class is never predicted.
            f1_macro=f1_score(y,p,average="macro",zero_division=0),
            precision_macro=precision_score(y,p,average="macro",zero_division=0),
            recall_macro=recall_score(y,p,average="macro",zero_division=0),
            y=y, p=p
        )
    metrics = {
        "loss": loss_sum/max(n_batches,1),
        "hs3": agg(all_hs3_y, all_hs3_p),
        "ab2": agg(all_ab2_y, all_ab2_p)
    }
    def p(name, m):
        # One summary line per head.
        if m is None:
            print(f"[{split}] {name}: no labels")
        else:
            print(f"[{split}] {name}: acc={m['acc']:.4f} f1={m['f1_macro']:.4f} prec={m['precision_macro']:.4f} rec={m['recall_macro']:.4f}")
    p("HS3", metrics["hs3"]); p("AB2", metrics["ab2"])
    return metrics

print("üèãÔ∏è Training (CPU-friendly heads)...")
for epoch in range(1, cfg.epochs+1):
    model.train()
    running = 0.0; n=0
    for X, y_hs3, y_ab2 in train_dl:
        hs3_logits, ab2_logits = model(X)

        # A mean-reduced cross entropy over a batch whose targets are all
        # ignored (-100) evaluates to NaN; back-propagating it would corrupt
        # every weight. Use only the heads that have labels in this batch and
        # skip the step when neither does.
        terms = []
        if (y_hs3 != -100).any():
            terms.append(ce_hs3(hs3_logits, y_hs3))
        if (y_ab2 != -100).any():
            terms.append(ce_ab2(ab2_logits, y_ab2))
        if not terms:
            continue
        loss = torch.stack(terms).sum()

        opt.zero_grad(); loss.backward(); opt.step()
        running += float(loss.item()); n+=1
    print(f"Epoch {epoch}: Train loss={running/max(n,1):.4f}")
    val_m = evaluate(val_dl, split="val")

    # early stopping on mean of available F1s (or negative loss)
    f1s = []
    if val_m["hs3"] is not None: f1s.append(val_m["hs3"]["f1_macro"])
    if val_m["ab2"] is not None: f1s.append(val_m["ab2"]["f1_macro"])
    score = float(np.mean(f1s)) if f1s else -val_m["loss"]

    if score > best_val:
        # New best: reset patience and persist the weights with the config used.
        best_val = score
        patience = 0
        torch.save({"model": model.state_dict(), "cfg": cfg.__dict__}, ckpt_path)
        print(f"‚úì Saved best heads to: {ckpt_path}")
    else:
        patience += 1
        print(f"No improvement. Patience {patience}/{cfg.early_stop_patience}")
        if patience >= cfg.early_stop_patience:
            print("Early stopping.")
            break

# Report test metrics for the best checkpoint, not the last epoch.
print("\nüß™ Final Test Evaluation (best checkpoint)")
state = torch.load(ckpt_path, map_location="cpu")
model.load_state_dict(state["model"])
test_metrics = evaluate(test_dl, split="test")

In [None]:
# Directory that receives all report artefacts.
reports_dir = os.path.join(processed_dir, cfg.reports_dirname)
os.makedirs(reports_dir, exist_ok=True)

def save_confusion(y, p, labels, title, out_png, out_csv):
    """Write a confusion matrix as a heatmap image and as a CSV table.

    Args:
        y: True class ids.
        p: Predicted class ids.
        labels: Class names; class id `k` corresponds to `labels[k]`.
        title: Figure title.
        out_png: Destination of the heatmap image.
        out_csv: Destination of the matrix table.
    """
    class_ids = list(range(len(labels)))
    df_cm = pd.DataFrame(
        confusion_matrix(y, p, labels=class_ids),
        index=labels,
        columns=labels,
    )

    fig, ax = plt.subplots(figsize=(5,4))
    sns.heatmap(df_cm, annot=True, fmt="d", cmap="Blues", ax=ax)
    ax.set_title(title)
    ax.set_ylabel("True")
    ax.set_xlabel("Pred")
    fig.tight_layout()
    fig.savefig(out_png, dpi=150)
    plt.close(fig)  # release the figure so repeated calls do not accumulate

    df_cm.to_csv(out_csv, index=True)

def dump_reports(split_name, metrics, prefix):
    """Persist per-head classification reports, confusion matrices and a JSON summary.

    For every head that has metrics this writes `{prefix}_{head}_report.csv`,
    `{prefix}_{head}_cm.png` and `{prefix}_{head}_cm.csv` into `reports_dir`;
    a `{prefix}_summary.json` is written in all cases.

    Args:
        split_name: Split name, upper-cased for the figure titles.
        metrics: Result of `evaluate` ("loss", "hs3", "ab2").
        prefix: File-name prefix for all outputs.
    """
    heads = (
        ("hs3", "HS3", ["Hate","Offensive","Neither"]),
        ("ab2", "AB2", ["Non-abusive","Abusive"]),
    )
    score_keys = ["acc","f1_macro","precision_macro","recall_macro"]

    summary = {"loss": metrics["loss"]}
    for key, title, labels in heads:
        head = metrics[key]
        if head is None:
            summary[key] = None
            continue

        y = head["y"]; p = head["p"]
        # Pass `labels` explicitly: without it the report infers the classes
        # from y/p and raises when one class is absent from both, because the
        # number of `target_names` no longer matches.
        rep = classification_report(
            y, p,
            labels=list(range(len(labels))),
            target_names=labels,
            output_dict=True,
            zero_division=0,
        )
        pd.DataFrame(rep).transpose().to_csv(os.path.join(reports_dir, f"{prefix}_{key}_report.csv"))
        save_confusion(y, p, labels, f"{split_name.upper()} {title}",
                       os.path.join(reports_dir, f"{prefix}_{key}_cm.png"),
                       os.path.join(reports_dir, f"{prefix}_{key}_cm.csv"))
        summary[key] = {k: float(head[k]) for k in score_keys}

    with open(os.path.join(reports_dir, f"{prefix}_summary.json"), "w") as f:
        json.dump(summary, f, indent=2)

dump_reports("test", test_metrics, "test")
print(f"üìÅ Reports saved to: {reports_dir}")