# 1. ResNet and BERT

In [1]:
# 1. Required Imports
import os, re, json
import torch
import pandas as pd
import torch.nn as nn
from PIL import Image
from tqdm import tqdm
from sklearn.metrics import accuracy_score, f1_score
from sklearn.utils.class_weight import compute_class_weight
from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms
from transformers import BertTokenizer, BertModel, get_cosine_schedule_with_warmup
from torch.optim import AdamW

2025-07-12 03:57:50.836447: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1752292671.056544      13 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1752292671.120100      13 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


In [2]:
import os
import random
# Diagnostic: show any entries in the working directory whose name contains "torchvision".
print([entry for entry in os.listdir() if 'torchvision' in entry])

[]


In [3]:
# 2. Label mapping, dataset root, and RNG seeding
LABEL_MAP = dict(Neutral=0, Support=1, Oppose=2)
# Reverse lookup: class id -> label name.
INVERSE_LABEL_MAP = {class_id: name for name, class_id in LABEL_MAP.items()}
root = "/kaggle/input/subtask3-comp2025-multimodel/"
# Seed PyTorch's global generator and Python's `random` module.
torch.manual_seed(42)
random.seed(42)

# 3. Clean text function
def clean_text(text):
    """Normalise a piece of text before tokenisation.

    The text is lower-cased; URLs, @mentions, #hashtags and every character
    that is neither a word character nor whitespace are deleted; finally runs
    of whitespace collapse to a single space and both ends are trimmed.
    """
    noise_pattern = re.compile(r"http\S+|@\w+|#\w+|[^\w\s]")
    without_noise = noise_pattern.sub("", text.lower())
    collapsed = re.sub(r"\s+", " ", without_noise)
    return collapsed.strip()

# 4. Load DataFrames
def getIndexAndPath(folder, extensions=(".png",)):
    """List the image files that sit directly inside ``folder``.

    Args:
        folder: Directory to scan (sub-directories are not descended into).
        extensions: File-name suffix, or iterable of suffixes, to keep. Matching
            is case-insensitive. Defaults to PNG only.

    Returns:
        A list of ``{"index": <file name>, "image_path": <folder/file name>}``
        dicts ordered by file name.

    Raises:
        OSError: propagated from ``os.listdir`` (e.g. ``FileNotFoundError``
            when ``folder`` does not exist).
    """
    if isinstance(extensions, str):
        extensions = (extensions,)
    suffixes = tuple(ext.lower() for ext in extensions)
    # os.listdir returns entries in arbitrary order; sorting keeps the row
    # order (and therefore seeded shuffles built on it) stable between runs.
    return [
        {"index": filename, "image_path": os.path.join(folder, filename)}
        for filename in sorted(os.listdir(folder))
        if filename.lower().endswith(suffixes)
    ]

# Train DF: images live in one sub-folder per class under the train image root.
train_image_root = os.path.join(root, "train/Subtask C Train")
records = []
# Sorted so the row order of df_train does not depend on directory listing order.
for label_name in sorted(os.listdir(train_image_root)):
    label_folder = os.path.join(train_image_root, label_name)
    if not os.path.isdir(label_folder):
        continue
    # Fail fast on a class folder that LABEL_MAP does not know about.
    if label_name not in LABEL_MAP:
        raise KeyError(label_name)
    records += getIndexAndPath(label_folder)
# Explicit columns keep the merge key present even when no image was found.
df_images = pd.DataFrame(records, columns=["index", "image_path"])
df_ocr = pd.read_csv(os.path.join(root, "train/STask_C_train.csv"))
# NOTE(review): the folder name is only validated, not stored; later cells read
# df_train['label'], so the train CSV is presumably expected to provide that column — confirm.
df_train = pd.merge(df_images, df_ocr, on="index", how="left")
# Test DF
df_images = pd.DataFrame(getIndexAndPath(os.path.join(root, "test/STask_C_test_img")))
df_ocr = pd.read_csv(os.path.join(root, "test/STask-C(index,text)test.csv"))
df_test = pd.merge(df_images, df_ocr, on="index", how="left")
# Eval DF: text and labels come from two separate CSVs, both keyed by "index".
df_images = pd.DataFrame(getIndexAndPath(os.path.join(root, "eval/STask_C_val_img")))
df_ocr = pd.read_csv(os.path.join(root, "eval/STask-C(index,text)val.csv"))
df_labels = pd.read_csv(os.path.join(root, "eval/STask-C(index,label)val.csv"))
df_eval = pd.merge(df_images, df_ocr, on="index", how="left")
df_val = pd.merge(df_eval, df_labels, on="index", how="left")

FileNotFoundError: [Errno 2] No such file or directory: '/kaggle/input/subtask3-comp2025-multimodel/train/Subtask C Train'

In [None]:
# 5. Clean Text
# Rows without OCR text get a placeholder before cleaning so every row is a string.
for df in [df_train, df_val, df_test]:
    df["text"] = df["text"].fillna("[NO TEXT]").apply(clean_text)

# 6. Class Weights
# Recent scikit-learn releases validate `classes` as a NumPy array and reject a
# plain list, so the class ids are converted first. They are sorted so that
# class_weights[i] is the weight of class id i.
class_ids = pd.Series(sorted(LABEL_MAP.values())).to_numpy()
# NOTE(review): assumes df_train['label'] holds the integer ids from LABEL_MAP — confirm against the CSV.
class_weights = compute_class_weight('balanced', classes=class_ids, y=df_train['label'])
class_weights = torch.tensor(class_weights, dtype=torch.float)

# 7. Tokenizer and Transform
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
img_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

In [None]:
# 8. Dataset
class MemeCLIPDataset(Dataset):
    """Pairs each image with its tokenised text for the ResNet + BERT model.

    Args:
        df: Frame with ``image_path`` and ``text`` columns, plus ``label`` when
            ``is_train`` is true or ``index`` when it is false.
        tokenizer: Callable invoked as ``tokenizer(text, padding=..., ...)`` whose
            result exposes ``"input_ids"`` and ``"attention_mask"`` tensors with
            a leading batch dimension of one.
        transform: Callable applied to the RGB PIL image.
        is_train: When true each sample carries ``"label"``; otherwise it
            carries the row's ``"index"`` value instead.
    """

    def __init__(self, df, tokenizer, transform, is_train=True):
        self.df = df
        self.tokenizer = tokenizer
        self.transform = transform
        self.is_train = is_train

    def __len__(self):
        """Return the number of rows in the backing frame."""
        return len(self.df)

    def __getitem__(self, idx):
        """Build the sample dict for the row at position ``idx``."""
        row = self.df.iloc[idx]
        # The context manager guarantees the file handle is released once the
        # pixels are decoded, including when decoding raises.
        with Image.open(row.image_path) as raw_image:
            image = raw_image.convert("RGB")
        ocr_text = row.text
        text_encoding = self.tokenizer(ocr_text, padding="max_length", truncation=True, max_length=128, return_tensors="pt")
        image_tensor = self.transform(image)

        sample = {
            # squeeze(0) drops the batch dimension added by return_tensors="pt".
            "input_ids": text_encoding["input_ids"].squeeze(0),
            "attention_mask": text_encoding["attention_mask"].squeeze(0),
            "pixel_values": image_tensor
        }

        if self.is_train:
            sample["label"] = int(row.label)
        else:
            # Bracket access: `row.index` would be the Series' own index, not the column.
            sample["index"] = row["index"]
        return sample

In [None]:
# 9. Collate Function
def collate_fn(batch):
    """Merge a list of per-sample dicts into a single batch dict.

    The three tensor fields are stacked along a new leading dimension. If the
    first sample has a "label" the batch gets a "labels" tensor; otherwise it
    gets an "index" list with each sample's identifier.
    """
    collated = {
        field: torch.stack([sample[field] for sample in batch])
        for field in ("input_ids", "attention_mask", "pixel_values")
    }
    if "label" in batch[0]:
        collated["labels"] = torch.tensor([sample["label"] for sample in batch])
    else:
        collated["index"] = [sample["index"] for sample in batch]
    return collated

In [None]:
# 10. MemeCLIP Model
class MemeCLIP(nn.Module):
    """Concatenation-fusion classifier over a text encoder and an image encoder.

    The first-token hidden state of the text encoder (768 features) is joined
    with a linear projection of the image encoder output (2048 -> 768) and fed
    through a small MLP head that yields ``num_classes`` logits.
    """

    def __init__(self, text_model, image_model, num_classes=3):
        super().__init__()
        text_dim, image_dim, hidden_dim = 768, 2048, 512
        self.text_encoder = text_model
        self.image_encoder = image_model
        self.image_proj = nn.Linear(image_dim, text_dim)
        self.dropout = nn.Dropout(p=0.5)
        head_layers = [
            nn.Linear(text_dim * 2, hidden_dim),
            nn.ReLU(),
            nn.Dropout(p=0.3),
            nn.Linear(hidden_dim, num_classes),
        ]
        self.classifier = nn.Sequential(*head_layers)

    def forward(self, input_ids, attention_mask, pixel_values, labels=None):
        """Return ``logits``, or ``(loss, logits)`` when ``labels`` is given.

        The loss is class-weighted cross-entropy using the module-level
        ``class_weights`` tensor, moved to the device of the logits.
        """
        encoded_text = self.text_encoder(input_ids=input_ids, attention_mask=attention_mask)
        first_token = encoded_text.last_hidden_state[:, 0, :]
        image_vector = self.image_proj(self.image_encoder(pixel_values))
        joint = torch.cat([first_token, image_vector], dim=1)
        logits = self.classifier(self.dropout(joint))
        if labels is None:
            return logits
        criterion = nn.CrossEntropyLoss(weight=class_weights.to(logits.device))
        return criterion(logits, labels), logits

In [None]:
# 11. Prepare DataLoaders
# Train and validation samples carry labels; the test split carries row ids instead.
train_dataset = MemeCLIPDataset(df=df_train, tokenizer=tokenizer, transform=img_transform)
val_dataset = MemeCLIPDataset(df=df_val, tokenizer=tokenizer, transform=img_transform)
test_dataset = MemeCLIPDataset(df=df_test, tokenizer=tokenizer, transform=img_transform, is_train=False)

# Only the training loader shuffles.
train_loader = DataLoader(dataset=train_dataset, batch_size=16, shuffle=True, collate_fn=collate_fn)
val_loader = DataLoader(dataset=val_dataset, batch_size=16, collate_fn=collate_fn)
test_loader = DataLoader(dataset=test_dataset, batch_size=16, collate_fn=collate_fn)

# 12. Load Models and Init
bert_model = BertModel.from_pretrained("bert-base-uncased")
# NOTE(review): recent torchvision releases deprecate `pretrained=` in favour of
# `weights=`; confirm the installed version before switching.
resnet_model = models.resnet50(pretrained=True)
# Swap the classification layer for a pass-through so the encoder output feeds
# MemeCLIP.image_proj, which expects 2048 input features.
resnet_model.fc = nn.Identity()
model = MemeCLIP(text_model=bert_model, image_model=resnet_model)
model = model.to(torch.device("cuda" if torch.cuda.is_available() else "cpu"))
optimizer = AdamW(params=model.parameters(), lr=2e-5)
# One scheduler step per batch over 10 epochs, with the first 10% as warm-up.
total_steps = 10 * len(train_loader)
scheduler = get_cosine_schedule_with_warmup(
    optimizer=optimizer,
    num_warmup_steps=int(0.1 * total_steps),
    num_training_steps=total_steps,
)



In [None]:
# 13. Train & Eval Functions
def train_one_epoch(model, loader, optimizer, scheduler, device):
    """Run one optimisation pass over ``loader``.

    Each batch is moved to ``device``, the model is called with the batch as
    keyword arguments and must return ``(loss, logits)``. The optimizer and the
    scheduler both step once per batch.

    Returns:
        ``(mean batch loss, accuracy, macro F1)`` over the epoch.
    """
    model.train()
    running_loss = 0
    y_pred, y_true = [], []
    for host_batch in tqdm(loader, desc="Train"):
        # Non-tensor entries are passed through untouched.
        batch = {
            name: (value.to(device) if isinstance(value, torch.Tensor) else value)
            for name, value in host_batch.items()
        }
        loss, logits = model(**batch)
        loss.backward()
        optimizer.step()
        scheduler.step()
        optimizer.zero_grad()
        running_loss += loss.item()
        y_pred.extend(logits.argmax(1).cpu().tolist())
        y_true.extend(batch["labels"].cpu().tolist())
    acc = accuracy_score(y_true, y_pred)
    f1 = f1_score(y_true, y_pred, average="macro")
    return running_loss / len(loader), acc, f1

def evaluate(model, loader, device):
    """Score ``model`` on a labelled ``loader`` without computing gradients.

    Returns:
        ``(mean batch loss, accuracy, macro F1)`` over all batches.
    """
    model.eval()
    running_loss = 0
    y_pred, y_true = [], []
    with torch.no_grad():
        for host_batch in tqdm(loader, desc="Eval"):
            # Non-tensor entries are passed through untouched.
            batch = {
                name: (value.to(device) if isinstance(value, torch.Tensor) else value)
                for name, value in host_batch.items()
            }
            loss, logits = model(**batch)
            running_loss += loss.item()
            y_pred.extend(logits.argmax(1).cpu().tolist())
            y_true.extend(batch["labels"].cpu().tolist())
    acc = accuracy_score(y_true, y_pred)
    f1 = f1_score(y_true, y_pred, average="macro")
    return running_loss / len(loader), acc, f1

In [None]:
# 14. Training Loop with Early Stopping
# Start below any reachable F1 so the first epoch always writes a checkpoint.
# With an initial value of 0 and a strict ">" test, a run whose validation F1
# stays at 0 would never save "best_model.pt" and the later load would fail.
best_f1 = float("-inf")
patience = 3
counter = 0
# Resolve the device once instead of twice per epoch.
run_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
for epoch in range(10):
    print(f"\nEpoch {epoch+1}")
    tr_loss, tr_acc, tr_f1 = train_one_epoch(model, train_loader, optimizer, scheduler, run_device)
    val_loss, val_acc, val_f1 = evaluate(model, val_loader, run_device)
    print(f"Train Loss: {tr_loss:.4f} | Acc: {tr_acc:.4f} | F1: {tr_f1:.4f}")
    print(f"Val   Loss: {val_loss:.4f} | Acc: {val_acc:.4f} | F1: {val_f1:.4f}")
    if val_f1 > best_f1:
        best_f1 = val_f1
        counter = 0
        torch.save(model.state_dict(), "best_model.pt")
        print("✅ Saved best model")
    else:
        # Stop after `patience` consecutive epochs without a new best validation F1.
        counter += 1
        if counter >= patience:
            print("⏹️ Early stopping")
            break

In [None]:
# # 15. Inference and Export to JSON
# def predict_and_export(model, loader, output_file="submission.json"):
#     model.eval()
#     predictions = []
#     with torch.no_grad():
#         for batch in tqdm(loader, desc="Predicting"):
#             indices = batch.pop("index")
#             batch = {k: v.to(torch.device("cuda" if torch.cuda.is_available() else "cpu")) if isinstance(v, torch.Tensor) else v for k, v in batch.items()}
#             logits = model(**batch)
#             preds = torch.argmax(logits, dim=1).cpu().tolist()
#             for idx, label in zip(indices, preds):
#                 predictions.append({"index": idx, "prediction": INVERSE_LABEL_MAP[label]})
#     with open(output_file, "w") as f:
#         json.dump(predictions, f, indent=2)
#     print(f"✅ Predictions saved to {output_file}")

# # Load Best Model and Predict
# model.load_state_dict(torch.load("best_model.pt"))
# predict_and_export(model, test_loader)

In [None]:
def predict_and_export(model, loader, output_file="submission.json"):
    """Predict a class id for every sample in ``loader`` and write JSON lines.

    Each batch must carry an "index" entry; the remaining entries are passed to
    the model as keyword arguments on the model's own device. Records of the
    form ``{"index": <str>, "prediction": <int class id>}`` are sorted by their
    string index and written one JSON object per line to ``output_file``.
    """
    model.eval()
    device = next(model.parameters()).device
    predictions = []

    with torch.no_grad():
        for batch in tqdm(loader, desc="Predicting"):
            # Indices are kept as strings such as '20568.png'.
            sample_ids = [str(raw_id) for raw_id in batch.pop("index")]
            inputs = {
                name: (value.to(device) if isinstance(value, torch.Tensor) else value)
                for name, value in batch.items()
            }
            output = model(**inputs)
            # A (loss, logits) tuple is tolerated; only the logits are used.
            logits = output[1] if isinstance(output, tuple) else output
            class_ids = torch.argmax(logits, dim=1).cpu().tolist()
            predictions.extend(
                {"index": sample_id, "prediction": class_id}
                for sample_id, class_id in zip(sample_ids, class_ids)
            )

    predictions.sort(key=lambda record: record["index"])
    # One JSON object per line.
    with open(output_file, "w") as f:
        f.writelines(json.dumps(record) + "\n" for record in predictions)

    print(f"✅ Predictions saved to {output_file}")



In [None]:
# Load best weights
# map_location places the checkpoint tensors on the model's current device, so the
# file still loads when it was saved from a different device (e.g. GPU -> CPU).
model.load_state_dict(torch.load("best_model.pt", map_location=next(model.parameters()).device))


In [None]:
# Write the test-split predictions to submission.json
predict_and_export(model=model, loader=test_loader, output_file="submission.json")

# 2. CLIP Model

In [None]:
# =====================
# Upgraded MemeCLIP Stance Classification Model
# =====================

import os, re, json, random
import torch
import pandas as pd
import torch.nn as nn
from PIL import Image
from tqdm import tqdm
from sklearn.metrics import accuracy_score, f1_score, classification_report
from sklearn.utils.class_weight import compute_class_weight
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
from torchvision.transforms import Compose, RandomResizedCrop, RandomHorizontalFlip, ColorJitter, ToTensor, Normalize
from transformers import CLIPProcessor, CLIPModel, get_cosine_schedule_with_warmup
from torch.optim import AdamW

In [None]:
# =====================
# Constants and Paths
# =====================
LABEL_MAP = dict(Neutral=0, Support=1, Oppose=2)
# Reverse lookup: class id -> label name.
INVERSE_LABEL_MAP = {class_id: name for name, class_id in LABEL_MAP.items()}
root = "/kaggle/input/subtask3-comp2025-multimodel/"
# Seed PyTorch's global generator and Python's `random` module.
torch.manual_seed(42)
random.seed(42)
# =====================
# Text Cleaning
# =====================
def clean_text(text):
    """Normalise a piece of text before tokenisation.

    The text is lower-cased; URLs, @mentions, #hashtags and every character
    that is neither a word character nor whitespace are deleted; finally runs
    of whitespace collapse to a single space and both ends are trimmed.
    """
    noise_pattern = re.compile(r"http\S+|@\w+|#\w+|[^\w\s]")
    without_noise = noise_pattern.sub("", text.lower())
    collapsed = re.sub(r"\s+", " ", without_noise)
    return collapsed.strip()

# =====================
# Data Loading
# =====================
def getIndexAndPath(folder, extensions=(".png",)):
    """List the image files that sit directly inside ``folder``.

    Args:
        folder: Directory to scan (sub-directories are not descended into).
        extensions: File-name suffix, or iterable of suffixes, to keep. Matching
            is case-insensitive. Defaults to PNG only.

    Returns:
        A list of ``{"index": <file name>, "image_path": <folder/file name>}``
        dicts ordered by file name.

    Raises:
        OSError: propagated from ``os.listdir`` (e.g. ``FileNotFoundError``
            when ``folder`` does not exist).
    """
    if isinstance(extensions, str):
        extensions = (extensions,)
    suffixes = tuple(ext.lower() for ext in extensions)
    # os.listdir returns entries in arbitrary order; sorting keeps the row
    # order (and therefore seeded sampling built on it) stable between runs.
    return [
        {"index": filename, "image_path": os.path.join(folder, filename)}
        for filename in sorted(os.listdir(folder))
        if filename.lower().endswith(suffixes)
    ]

# Load DataFrames
# Train DF: images live in one sub-folder per class under the train image root.
train_image_root = os.path.join(root, "train/Subtask C Train")
records = []
# Sorted so the row order of df_train does not depend on directory listing order.
for label_name in sorted(os.listdir(train_image_root)):
    label_folder = os.path.join(train_image_root, label_name)
    if not os.path.isdir(label_folder):
        continue
    # Fail fast on a class folder that LABEL_MAP does not know about.
    if label_name not in LABEL_MAP:
        raise KeyError(label_name)
    records += getIndexAndPath(label_folder)
# Explicit columns keep the merge key present even when no image was found.
df_images = pd.DataFrame(records, columns=["index", "image_path"])
df_ocr = pd.read_csv(os.path.join(root, "train/STask_C_train.csv"))
# NOTE(review): the folder name is only validated, not stored; later cells read
# df_train['label'], so the train CSV is presumably expected to provide that column — confirm.
df_train = pd.merge(df_images, df_ocr, on="index", how="left")
# Test DF
df_images = pd.DataFrame(getIndexAndPath(os.path.join(root, "test/STask_C_test_img")))
df_ocr = pd.read_csv(os.path.join(root, "test/STask-C(index,text)test.csv"))
df_test = pd.merge(df_images, df_ocr, on="index", how="left")
# Eval DF: text and labels come from two separate CSVs, both keyed by "index".
df_images = pd.DataFrame(getIndexAndPath(os.path.join(root, "eval/STask_C_val_img")))
df_ocr = pd.read_csv(os.path.join(root, "eval/STask-C(index,text)val.csv"))
df_labels = pd.read_csv(os.path.join(root, "eval/STask-C(index,label)val.csv"))
df_eval = pd.merge(df_images, df_ocr, on="index", how="left")
df_val = pd.merge(df_eval, df_labels, on="index", how="left")
# Clean text: rows without OCR text get a placeholder first so every row is a string.
for df in [df_train, df_val, df_test]:
    df["text"] = df["text"].fillna("[NO TEXT]").apply(clean_text)


In [None]:
# Show the merged validation frame as the cell output for a quick visual check.
df_val

In [None]:
# =====================
# Compute Class Weights
# =====================
# Recent scikit-learn releases validate `classes` as a NumPy array and reject a
# plain list, so the class ids are converted first. They are sorted so that
# class_weights[i] is the weight of class id i.
class_ids = pd.Series(sorted(LABEL_MAP.values())).to_numpy()
# NOTE(review): assumes df_train['label'] holds the integer ids from LABEL_MAP — confirm against the CSV.
class_weights = compute_class_weight('balanced', classes=class_ids, y=df_train['label'])
class_weights = torch.tensor(class_weights, dtype=torch.float)

# =====================
# CLIP Model, Processor and Device
# =====================
clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# =====================
# Transforms
# =====================
# Augmentations operate on the PIL image only. Tensor conversion, rescaling and
# mean/std normalisation are left to `clip_processor`, which MemeDataset calls on
# the transformed image. Applying ToTensor/Normalize here as well would hand the
# processor already-normalised data, which it would then rescale and normalise
# a second time.
train_transform = Compose([
    RandomResizedCrop(224, scale=(0.8, 1.0)),
    RandomHorizontalFlip(),
    ColorJitter(0.2, 0.2, 0.2, 0.1),
])

# Validation/test images are not augmented; an empty Compose returns its input unchanged.
val_transform = Compose([])

In [None]:
# =====================
# Dataset and Collate
# =====================
class MemeDataset(Dataset):
    """Pairs each image with its text and encodes both through ``clip_processor``.

    Args:
        df: Frame with ``image_path`` and ``text`` columns, plus ``label`` when
            ``is_train`` is true or ``index`` when it is false.
        transform: Optional callable applied to the RGB PIL image before it is
            handed to the module-level ``clip_processor``.
        is_train: When true each sample carries ``"label"``; otherwise it
            carries the row's ``"index"`` value instead.
    """

    # CLIP's text encoder has 77 position embeddings. Padding every caption to a
    # longer length (previously 128) makes the text tower fail on the first batch.
    MAX_TEXT_TOKENS = 77

    def __init__(self, df, transform=None, is_train=True):
        self.df = df
        self.is_train = is_train
        self.transform = transform

    def __len__(self):
        """Return the number of rows in the backing frame."""
        return len(self.df)

    def __getitem__(self, idx):
        """Build the processor-encoded sample dict for the row at position ``idx``."""
        row = self.df.iloc[idx]
        # The context manager guarantees the file handle is released once the
        # pixels are decoded, including when decoding raises.
        with Image.open(row.image_path) as raw_image:
            image = raw_image.convert("RGB")
        if self.transform:
            image = self.transform(image)
        text = row.text
        sample = clip_processor(
            text=[text], images=image,
            return_tensors="pt", padding="max_length", truncation=True,
            max_length=self.MAX_TEXT_TOKENS
        )
        # Drop the batch dimension of one that return_tensors="pt" adds.
        sample = {k: v.squeeze(0) for k, v in sample.items()}
        if self.is_train:
            sample["label"] = int(row.label)
        else:
            # Bracket access: `row.index` would be the Series' own index, not the column.
            sample["index"] = row["index"]
        return sample

def collate_fn(batch):
    """Merge a list of per-sample dicts into a single batch dict.

    The three tensor fields are stacked along a new leading dimension. If the
    first sample has a "label" the batch gets a "labels" tensor; otherwise it
    gets an "index" list with each sample's identifier.
    """
    collated = {
        field: torch.stack([sample[field] for sample in batch])
        for field in ("input_ids", "attention_mask", "pixel_values")
    }
    if "label" in batch[0]:
        collated["labels"] = torch.tensor([sample["label"] for sample in batch])
    else:
        collated["index"] = [sample["index"] for sample in batch]
    return collated


In [None]:
# =====================
# Focal Loss
# =====================
class FocalLoss(nn.Module):
    """Multi-class focal loss with optional per-class weights.

    For each sample the loss is ``w[y] * (1 - p_y) ** gamma * (-log p_y)``,
    where ``p_y`` is the softmax probability of the true class; the batch
    result is the plain mean over samples.

    Args:
        gamma: Focusing exponent; 0 reduces to (weighted) cross-entropy.
        weight: Optional 1-D tensor of per-class weights.
    """

    def __init__(self, gamma=2, weight=None):
        super().__init__()
        self.gamma = gamma
        # reduction="none" keeps one value per sample. With the default mean
        # reduction the focal factor was computed from the batch-average loss,
        # so easy and hard examples were all scaled by the same number.
        self.ce = nn.CrossEntropyLoss(weight=weight, reduction="none")
        # Unweighted copy used only to recover p_y; class weights must not
        # distort the probability estimate.
        self.plain_ce = nn.CrossEntropyLoss(reduction="none")

    def forward(self, input, target):
        """Return the scalar focal loss for logits ``input`` and class ids ``target``."""
        nll = self.plain_ce(input, target)
        p_true = torch.exp(-nll)
        weighted_nll = nll if self.ce.weight is None else self.ce(input, target)
        return ((1 - p_true) ** self.gamma * weighted_nll).mean()

In [None]:
# =====================
# Model Wrapper
# =====================
class CLIPClassifier(nn.Module):
    """Classification head on top of a CLIP model's text and image embeddings.

    The two 512-feature embeddings returned by the wrapped model are
    concatenated and passed through a small MLP producing ``num_classes``
    logits. The loss is a ``FocalLoss`` built from the module-level
    ``class_weights`` moved to the module-level ``device``.
    """

    def __init__(self, clip_model, num_classes=3):
        super().__init__()
        embed_dim = 512
        self.clip = clip_model
        head_layers = [
            nn.Linear(2 * embed_dim, 512),
            nn.ReLU(),
            nn.Dropout(p=0.5),
            nn.Linear(512, num_classes),
        ]
        self.classifier = nn.Sequential(*head_layers)
        self.loss_fn = FocalLoss(weight=class_weights.to(device))

    def forward(self, input_ids, attention_mask, pixel_values, labels=None):
        """Return ``logits``, or ``(loss, logits)`` when ``labels`` is given."""
        clip_out = self.clip(
            input_ids=input_ids,
            attention_mask=attention_mask,
            pixel_values=pixel_values,
        )
        joint = torch.cat([clip_out.text_embeds, clip_out.image_embeds], dim=1)
        logits = self.classifier(joint)
        if labels is None:
            return logits
        return self.loss_fn(logits, labels), logits

In [None]:
# =====================
# Load Data
# =====================
# The training split uses the augmenting transform. Without this assignment
# `train_dataset` is never defined in this section, so the loader below would
# either fail on a fresh kernel or silently reuse the section-1 dataset.
train_dataset = MemeDataset(df_train, transform=train_transform)
val_dataset = MemeDataset(df_val, transform=val_transform)
test_dataset = MemeDataset(df_test, transform=val_transform, is_train=False)

# Weighted Sampling: each row is drawn with probability inversely proportional
# to the size of its class.
# NOTE(review): indexing by label assumes every class id 0..N-1 occurs in df_train — confirm.
class_counts = df_train['label'].value_counts().sort_index().values
sampling_weights = 1. / class_counts
sample_weights = df_train['label'].apply(lambda x: sampling_weights[x])
sampler = WeightedRandomSampler(sample_weights.tolist(), len(sample_weights), replacement=True)

train_loader = DataLoader(train_dataset, batch_size=16, sampler=sampler, collate_fn=collate_fn)
val_loader = DataLoader(val_dataset, batch_size=16, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=16, collate_fn=collate_fn)

model = CLIPClassifier(clip_model).to(device)
optimizer = AdamW(model.parameters(), lr=2e-5, weight_decay=0.01)
# One scheduler step per batch over 10 epochs, with the first 10% as warm-up.
total_steps = len(train_loader) * 10
scheduler = get_cosine_schedule_with_warmup(optimizer, int(0.1 * total_steps), total_steps)

# Gradient scaling only applies on CUDA; disable it explicitly on CPU.
scaler = torch.cuda.amp.GradScaler(enabled=(device.type == "cuda"))

In [None]:
# =====================
# Training and Eval
# =====================
def train_one_epoch(model, loader):
    """Run one mixed-precision optimisation pass over ``loader``.

    Uses the module-level ``device``, ``scaler``, ``optimizer`` and
    ``scheduler``. The model is called with each batch as keyword arguments
    and must return ``(loss, logits)``.

    Returns:
        ``(mean batch loss, accuracy, macro F1)`` over the epoch.
    """
    model.train()
    running_loss = 0
    y_pred, y_true = [], []
    for host_batch in tqdm(loader, desc="Training"):
        batch = {name: tensor.to(device) for name, tensor in host_batch.items()}
        with torch.cuda.amp.autocast():
            loss, logits = model(**batch)
        # Scale the loss before backward; the scaler then drives the optimizer step.
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
        optimizer.zero_grad()
        scheduler.step()
        running_loss += loss.item()
        y_pred.extend(logits.argmax(1).detach().cpu().tolist())
        y_true.extend(batch["labels"].cpu().tolist())
    acc = accuracy_score(y_true, y_pred)
    f1 = f1_score(y_true, y_pred, average="macro")
    return running_loss / len(loader), acc, f1

def evaluate(model, loader):
    """Score ``model`` on a labelled ``loader`` and print a per-class report.

    Uses the module-level ``device``. The model is called with each batch as
    keyword arguments and must return ``(loss, logits)``.

    Returns:
        ``(mean batch loss, accuracy, macro F1)`` over all batches.
    """
    model.eval()
    total_loss, y_pred, y_true = 0, [], []
    with torch.no_grad():
        for batch in tqdm(loader, desc="Evaluating"):
            batch = {k: v.to(device) for k, v in batch.items()}
            loss, logits = model(**batch)
            total_loss += loss.item()
            y_pred += logits.argmax(1).cpu().tolist()
            y_true += batch["labels"].cpu().tolist()
    # Passing `labels` pins the report to every known class id, so the three
    # target names always line up. Without it the report raises as soon as a
    # class is missing from both the labels and the predictions.
    print(classification_report(
        y_true, y_pred,
        labels=list(LABEL_MAP.values()),
        target_names=list(LABEL_MAP.keys()),
    ))
    acc = accuracy_score(y_true, y_pred)
    f1 = f1_score(y_true, y_pred, average="macro")
    return total_loss / len(loader), acc, f1

In [None]:
# =====================
# Training Loop
# =====================
# Start below any reachable F1 so the first epoch always writes a checkpoint.
# With an initial value of 0 and a strict ">" test, a run whose validation F1
# stays at 0 would never save "best_clip_model.pt" and the later load would fail.
best_f1 = float("-inf")
patience = 3
counter = 0
for epoch in range(10):
    print(f"\nEpoch {epoch + 1}")
    train_loss, train_acc, train_f1 = train_one_epoch(model, train_loader)
    val_loss, val_acc, val_f1 = evaluate(model, val_loader)
    print(f"Train Loss: {train_loss:.4f}, Acc: {train_acc:.4f}, F1: {train_f1:.4f}")
    print(f"Val   Loss: {val_loss:.4f}, Acc: {val_acc:.4f}, F1: {val_f1:.4f}")
    if val_f1 > best_f1:
        best_f1 = val_f1
        counter = 0
        torch.save(model.state_dict(), "best_clip_model.pt")
        print("✅ Saved best model")
    else:
        # Stop after `patience` consecutive epochs without a new best validation F1.
        counter += 1
        if counter >= patience:
            print("⏹️ Early stopping")
            break

In [None]:
# =====================
# Inference
# =====================
def predict_and_export(model, loader, output_file="submission.json"):
    """Predict a label name for every sample in ``loader`` and dump them as JSON.

    Each batch must carry an "index" entry; the remaining tensors are moved to
    the module-level ``device`` and passed to the model as keyword arguments.
    The output file holds a single indented JSON array of
    ``{"index": ..., "prediction": <label name>}`` records in loader order.
    """
    model.eval()
    predictions = []
    with torch.no_grad():
        for batch in tqdm(loader, desc="Predicting"):
            sample_ids = batch.pop("index")
            inputs = {name: tensor.to(device) for name, tensor in batch.items()}
            output = model(**inputs)
            # A (loss, logits) tuple is tolerated; only the logits are used.
            logits = output[1] if isinstance(output, tuple) else output
            class_ids = torch.argmax(logits, dim=1).cpu().tolist()
            predictions.extend(
                {"index": sample_id, "prediction": INVERSE_LABEL_MAP[class_id]}
                for sample_id, class_id in zip(sample_ids, class_ids)
            )
    with open(output_file, "w") as f:
        json.dump(predictions, f, indent=2)
    print(f"✅ Predictions saved to {output_file}")

# Restore the best checkpoint before predicting on the test split.
# map_location places the checkpoint tensors on the active device, so the file
# still loads when it was saved from a different device (e.g. GPU -> CPU).
model.load_state_dict(torch.load("best_clip_model.pt", map_location=device))
predict_and_export(model, test_loader)
