In [None]:
import torch

# Environment check: report CUDA visibility, the first GPU's name, and the torch build.
cuda_ok = torch.cuda.is_available()
gpu_name = torch.cuda.get_device_name(0) if cuda_ok else "No GPU found"
print("CUDA available:", cuda_ok)
print("GPU:", gpu_name)
print("Torch version:", torch.__version__)

In [None]:
#1
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import pandas as pd
from transformers import (
    BertTokenizer, BertModel,
    RobertaTokenizer, RobertaModel,
    DistilBertTokenizer, DistilBertModel,
    XLNetTokenizer, XLNetModel
)
from sklearn.model_selection import train_test_split

# Run configuration: token length cap, batch size, epoch count, compute device
MAX_LEN = 80
BATCH_SIZE = 4
EPOCHS = 1
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the annotations and derive a binary target:
# 0 = 'not_funny' AND 'not_sarcastic', 1 = every other combination.
# Only the text and the target are kept; rows with a missing value are dropped.
df = (
    pd.read_csv("labels.csv")
    .assign(label=lambda frame: ((frame['humour'] != 'not_funny') | (frame['sarcasm'] != 'not_sarcastic')).astype(int))
    .loc[:, ['text_corrected', 'label']]
    .dropna()
)

# One tokenizer per encoder family, keyed by the short name used throughout the notebook
tokenizers = dict(
    bert=BertTokenizer.from_pretrained('bert-base-uncased'),
    roberta=RobertaTokenizer.from_pretrained('roberta-base'),
    distilbert=DistilBertTokenizer.from_pretrained('distilbert-base-uncased'),
    xlnet=XLNetTokenizer.from_pretrained('xlnet-base-cased'),
)

# Tokenization helper
def tokenize_text(text, tokenizer):
    """Encode ``text`` by calling ``tokenizer`` with the notebook's fixed options.

    The call pads to and truncates at ``MAX_LEN`` tokens and requests
    ``return_tensors="pt"``; whatever the tokenizer returns is passed through.
    """
    encode_options = {
        "padding": "max_length",
        "truncation": True,
        "max_length": MAX_LEN,
        "return_tensors": "pt",
    }
    return tokenizer(text, **encode_options)

# Custom Dataset
class SarcasmHumorDataset(Dataset):
    """Per-sample token ids and attention masks for all four encoders, plus the label.

    ``texts`` is read positionally through ``.iloc`` and ``labels`` through
    ``.values``, so both are expected to be pandas objects. ``tokenizers`` maps
    an encoder key ('bert', 'roberta', 'distilbert', 'xlnet') to its tokenizer.
    """

    def __init__(self, texts, labels, tokenizers):
        self.tokenizers = tokenizers
        self.labels = labels.values
        self.texts = texts

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        sample_text = self.texts.iloc[idx]
        sample_label = self.labels[idx]

        # Encode the same text once with every registered tokenizer.
        encoded = {}
        for name, tok in self.tokenizers.items():
            encoded[name] = tokenize_text(sample_text, tok)

        item = {}
        for name in ('bert', 'roberta', 'distilbert', 'xlnet'):
            for field in ('input_ids', 'attention_mask'):
                # squeeze(0) removes the leading dimension of the tokenizer output
                item[f'{name}_{field}'] = encoded[name][field].squeeze(0)
        item['label'] = torch.tensor(sample_label, dtype=torch.long)
        return item

# Model class
class BaseClassifier(nn.Module):
    """Single-encoder binary classifier: pretrained transformer, dropout, linear head.

    Args:
        model_name: one of 'bert', 'roberta', 'distilbert', 'xlnet'.

    Raises:
        ValueError: if ``model_name`` is not one of the supported keys.
            (Previously an unknown name built an object without an encoder
            that only failed later, inside ``forward``.)
    """

    # Supported keys and the checkpoint each one loads.
    _CHECKPOINTS = {
        'bert': "bert-base-uncased",
        'roberta': "roberta-base",
        'distilbert': "distilbert-base-uncased",
        'xlnet': "xlnet-base-cased",
    }

    def __init__(self, model_name):
        super(BaseClassifier, self).__init__()
        if model_name not in self._CHECKPOINTS:
            raise ValueError(
                f"Unsupported model_name {model_name!r}; expected one of {sorted(self._CHECKPOINTS)}"
            )
        self.model_name = model_name
        checkpoint = self._CHECKPOINTS[model_name]

        if model_name == 'bert':
            self.encoder = BertModel.from_pretrained(checkpoint)
        elif model_name == 'roberta':
            self.encoder = RobertaModel.from_pretrained(checkpoint)
        elif model_name == 'distilbert':
            self.encoder = DistilBertModel.from_pretrained(checkpoint)
        else:  # 'xlnet' (membership was validated above)
            self.encoder = XLNetModel.from_pretrained(checkpoint)

        self.dropout = nn.Dropout(0.3)
        # 768 is hard-coded to match the hidden width of the four base checkpoints above
        self.classifier = nn.Linear(768, 2)

    def forward(self, input_ids, attention_mask):
        """Return 2-way logits computed from one pooled token of the encoder output."""
        outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
        if self.model_name == 'xlnet':
            # XLNet is pooled at the last position; presumably its tokenizer
            # left-pads so the summary token ends the sequence - TODO confirm
            cls_output = outputs.last_hidden_state[:, -1, :]
        else:
            # the other encoders are pooled at the first position
            cls_output = outputs.last_hidden_state[:, 0, :]
        return self.classifier(self.dropout(cls_output))

# Evaluation function
def evaluate_model(model, data_loader, tokenizer_key, eval_device=None):
    """Compute, print and return classification accuracy over ``data_loader``.

    Args:
        model: module called as ``model(input_ids=..., attention_mask=...)``
            and returning per-class scores of shape (batch, classes).
        data_loader: iterable of batch dicts holding
            ``"<tokenizer_key>_input_ids"``, ``"<tokenizer_key>_attention_mask"``
            and ``"label"`` tensors.
        tokenizer_key: prefix that selects which encoder's inputs to read.
        eval_device: device to evaluate on; defaults to the notebook-level
            ``device`` when omitted.

    Returns:
        Accuracy as a float in [0, 1]; 0.0 when the loader yields no samples
        (previously this case raised ZeroDivisionError).
    """
    target_device = device if eval_device is None else eval_device
    model.eval()
    model.to(target_device)
    correct = 0
    total = 0
    with torch.no_grad():
        for batch in data_loader:
            input_ids = batch[f"{tokenizer_key}_input_ids"].to(target_device)
            attention_mask = batch[f"{tokenizer_key}_attention_mask"].to(target_device)
            labels = batch["label"].to(target_device)

            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            preds = outputs.argmax(1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)

    if total == 0:
        print(f"{tokenizer_key.upper()} Accuracy: n/a (no samples)")
        return 0.0

    acc = correct / total
    print(f"{tokenizer_key.upper()} Accuracy: {acc:.4f}")
    return acc

# Configure transformers logging BEFORE any model is instantiated; placed after the
# training loop (as it was) it cannot silence the checkpoint-loading notices.
from transformers import logging
logging.set_verbosity_error()

# Split data (80/20, fixed seed for a reproducible split)
train_texts, val_texts, train_labels, val_labels = train_test_split(
    df["text_corrected"], df["label"], test_size=0.2, random_state=42
)

# Create loaders
train_dataset = SarcasmHumorDataset(train_texts, train_labels, tokenizers)
val_dataset = SarcasmHumorDataset(val_texts, val_labels, tokenizers)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE)

# Run individual training & evaluation: each encoder is fine-tuned from its
# pretrained checkpoint and then scored on the validation split.
for model_name in ['bert', 'roberta', 'distilbert', 'xlnet']:
    print(f"\n🔍 Evaluating {model_name.upper()}...")

    model = BaseClassifier(model_name).to(device)
    optimizer = optim.AdamW(model.parameters(), lr=2e-5)
    criterion = nn.CrossEntropyLoss()

    # Honour the EPOCHS constant (it was defined but never used; with EPOCHS = 1
    # this is still a single pass over the training data).
    for epoch in range(EPOCHS):
        model.train()
        for batch in train_loader:
            optimizer.zero_grad()
            input_ids = batch[f"{model_name}_input_ids"].to(device)
            attention_mask = batch[f"{model_name}_attention_mask"].to(device)
            labels = batch["label"].to(device)

            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

    # Evaluate
    evaluate_model(model, val_loader, model_name)


In [None]:
#2
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import pandas as pd
from transformers import (
    BertTokenizer, BertModel,
    RobertaTokenizer, RobertaModel,
    DistilBertTokenizer, DistilBertModel,
    XLNetTokenizer, XLNetModel
)
from sklearn.model_selection import train_test_split

MAX_LEN = 80  # Reduced input size

# Load the annotations and encode the target:
# 0 = not funny + not sarcastic, 1 = all other combinations.
# Unused columns are dropped, then rows with a missing value.
df = (
    pd.read_csv("labels.csv")
    .assign(label=lambda frame: ((frame['humour'] != 'not_funny') | (frame['sarcasm'] != 'not_sarcastic')).astype(int))
    .loc[:, ['text_corrected', 'label']]
    .dropna()
)

# One tokenizer per encoder in the ensemble
tokenizers = dict(
    bert=BertTokenizer.from_pretrained('bert-base-uncased'),
    roberta=RobertaTokenizer.from_pretrained('roberta-base'),
    distilbert=DistilBertTokenizer.from_pretrained('distilbert-base-uncased'),
    xlnet=XLNetTokenizer.from_pretrained('xlnet-base-cased'),
)

# Tokenize function
def tokenize_text(text, tokenizer):
    """Encode ``text`` by calling ``tokenizer`` with the notebook's fixed options.

    The call pads to and truncates at ``MAX_LEN`` tokens and requests
    ``return_tensors="pt"``; whatever the tokenizer returns is passed through.
    """
    encode_options = {
        "padding": "max_length",
        "truncation": True,
        "max_length": MAX_LEN,
        "return_tensors": "pt",
    }
    return tokenizer(text, **encode_options)

# Split dataset: hold out 20% of the rows for validation (fixed seed)
train_texts, val_texts, train_labels, val_labels = train_test_split(
    df.loc[:, "text_corrected"],
    df.loc[:, "label"],
    random_state=42,
    test_size=0.2,
)

#  Custom Dataset class
class SarcasmHumorDataset(Dataset):
    """Per-sample token ids and attention masks for all four encoders, plus the label.

    ``texts`` is read positionally through ``.iloc`` and ``labels`` through
    ``.values``, so both are expected to be pandas objects. ``tokenizers`` maps
    an encoder key ('bert', 'roberta', 'distilbert', 'xlnet') to its tokenizer.
    """

    def __init__(self, texts, labels, tokenizers):
        self.tokenizers = tokenizers
        self.labels = labels.values
        self.texts = texts

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        sample_text = self.texts.iloc[idx]
        sample_label = self.labels[idx]

        # Encode the same text once with every registered tokenizer.
        encoded = {}
        for name, tok in self.tokenizers.items():
            encoded[name] = tokenize_text(sample_text, tok)

        item = {}
        for name in ('bert', 'roberta', 'distilbert', 'xlnet'):
            for field in ('input_ids', 'attention_mask'):
                # squeeze(0) removes the leading dimension of the tokenizer output
                item[f'{name}_{field}'] = encoded[name][field].squeeze(0)
        item['label'] = torch.tensor(sample_label, dtype=torch.long)
        return item

# Wrap both splits in datasets; only the training loader shuffles.
train_dataset = SarcasmHumorDataset(texts=train_texts, labels=train_labels, tokenizers=tokenizers)
val_dataset = SarcasmHumorDataset(texts=val_texts, labels=val_labels, tokenizers=tokenizers)

# Reduced batch size (4) for both loaders
train_loader = DataLoader(dataset=train_dataset, shuffle=True, batch_size=4)
val_loader = DataLoader(dataset=val_dataset, batch_size=4)

#  Define Optimized Ensemble Model
class OptimizedEnsembleClassifier(nn.Module):
    """Four-encoder text ensemble: pooled BERT, RoBERTa, DistilBERT and XLNet
    vectors are concatenated and classified by a small MLP head.

    All encoder weights are frozen except the last transformer block of each
    encoder; the MLP head is always trainable.
    """

    def __init__(self):
        super(OptimizedEnsembleClassifier, self).__init__()

        self.bert = BertModel.from_pretrained("bert-base-uncased")
        self.roberta = RobertaModel.from_pretrained("roberta-base")
        self.distilbert = DistilBertModel.from_pretrained("distilbert-base-uncased")
        self.xlnet = XLNetModel.from_pretrained("xlnet-base-cased")

        # Freeze all model layers
        for encoder in (self.bert, self.roberta, self.distilbert, self.xlnet):
            for param in encoder.parameters():
                param.requires_grad = False

        # Unfreeze the last transformer block of each encoder. The previous code
        # unfroze only the final parameter tensor (`list(parameters())[-1:]`),
        # which for BERT/RoBERTa is the pooler bias - a weight that never
        # influences `last_hidden_state` and therefore never received a gradient.
        for last_block in (
            self.bert.encoder.layer[-1],
            self.roberta.encoder.layer[-1],
            self.distilbert.transformer.layer[-1],
            self.xlnet.layer[-1],
        ):
            for param in last_block.parameters():
                param.requires_grad = True

        # Fully connected head over the concatenated 4 x 768 pooled vectors
        self.fc = nn.Sequential(
            nn.Dropout(0.3),
            nn.Linear(4 * 768, 256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, 2)
        )

    def forward(self, bert_inputs, roberta_inputs, distilbert_inputs, xlnet_inputs):
        """Return 2-way logits.

        Each ``*_inputs`` argument is a dict of keyword arguments for the
        matching encoder (the notebook passes ``input_ids`` and ``attention_mask``).
        """
        bert_cls = self.bert(**bert_inputs).last_hidden_state[:, 0, :]
        roberta_cls = self.roberta(**roberta_inputs).last_hidden_state[:, 0, :]
        distilbert_cls = self.distilbert(**distilbert_inputs).last_hidden_state[:, 0, :]
        # XLNet is pooled at the LAST position, consistent with BaseClassifier and
        # extract_embeddings in this notebook; position 0 was used here before,
        # which presumably lands on padding for short texts - TODO confirm
        xlnet_cls = self.xlnet(**xlnet_inputs).last_hidden_state[:, -1, :]

        combined_features = torch.cat((bert_cls, roberta_cls, distilbert_cls, xlnet_cls), dim=1)
        return self.fc(combined_features)

#  Training Setup
EPOCHS = 1
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = OptimizedEnsembleClassifier().to(device)

criterion = nn.CrossEntropyLoss()
# Only parameters left trainable by the model are handed to the optimizer
optimizer = optim.AdamW(
    [param for param in model.parameters() if param.requires_grad],
    lr=2e-5,
    weight_decay=0.01,
)


def _ensemble_inputs(batch):
    """Move one batch to ``device`` and regroup it into the per-encoder kwargs of the model."""
    return {
        f"{name}_inputs": {
            "input_ids": batch[f"{name}_input_ids"].to(device),
            "attention_mask": batch[f"{name}_attention_mask"].to(device),
        }
        for name in ("bert", "roberta", "distilbert", "xlnet")
    }


#  Training Loop (1 epoch)
model.train()
total_loss = 0
correct = 0

for batch in train_loader:
    optimizer.zero_grad()

    inputs = _ensemble_inputs(batch)
    labels = batch["label"].to(device)

    outputs = model(**inputs)
    loss = criterion(outputs, labels)
    loss.backward()
    optimizer.step()

    # Running totals for the end-of-epoch report
    total_loss += loss.item()
    correct += (outputs.argmax(1) == labels).sum().item()

print(f"Training Loss: {total_loss/len(train_loader):.4f} - Training Accuracy: {correct/len(train_dataset):.4f}")

#  Save Model
torch.save(model.state_dict(), "optimized_ensemble_model.pth")
print("Model saved successfully!")

# Evaluate on Validation Set
model.eval()
correct = 0

with torch.no_grad():
    for batch in val_loader:
        inputs = _ensemble_inputs(batch)
        labels = batch["label"].to(device)
        outputs = model(**inputs)
        correct += (outputs.argmax(1) == labels).sum().item()

print(f"Validation Accuracy: {correct/len(val_dataset):.4f}")


In [4]:
from PIL import ImageFile
# Let Pillow load image files that are truncated instead of failing on them,
# so a damaged file does not abort the image cells below.
ImageFile.LOAD_TRUNCATED_IMAGES = True


In [None]:
#3
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, WeightedRandomSampler
from torchvision import transforms, models
from PIL import Image
import os
import pandas as pd
from sklearn.model_selection import train_test_split

#  1. Load Dataset
# Image folder and annotation file used by this cell
DATA_DIR, LABELS_FILE = 'images', 'labels.csv'

# Read the annotation table
df = pd.read_csv(filepath_or_buffer=LABELS_FILE)

# Fix labeling issue
def get_label(row):
    """Map one annotation row to a class id.

    Returns 1 (humor) when ``humour`` is anything other than 'not_funny';
    otherwise 0 (sarcasm) when ``sarcasm`` is anything other than
    'not_sarcastic'; otherwise -1, which marks the row to be ignored.
    """
    if row['humour'] != 'not_funny':
        return 1  # Humor takes precedence over sarcasm
    return 0 if row['sarcasm'] != 'not_sarcastic' else -1

# Label every row, then drop the rows tagged -1 (neither humorous nor sarcastic)
df['label'] = df.apply(get_label, axis=1)
df = df.loc[df['label'].ne(-1)]

# Split data: stratified 80/20 with a fixed seed
train_data, val_data = train_test_split(
    df,
    stratify=df['label'],
    random_state=42,
    test_size=0.2,
)

#  2. Define Transforms
# Resize to 224x224, convert to a tensor, then normalise per channel
transform = transforms.Compose([
    transforms.Resize(size=(224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ),
])

#  3. Create Custom Dataset
class ImageDataset(Dataset):
    """Yield ``(image, label)`` pairs from an annotation table.

    ``data`` is read positionally through ``.iloc`` and must provide the
    'image_name' and 'label' fields; image files are looked up under
    ``data_dir``. ``transform``, when given, is applied to the RGB image.
    """

    def __init__(self, data, data_dir, transform=None):
        self.transform = transform
        self.data_dir = data_dir
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        record = self.data.iloc[idx]
        img_path = os.path.join(self.data_dir, record['image_name'])
        target = record['label']

        # Force three channels regardless of the file's own colour mode
        picture = Image.open(img_path).convert('RGB')
        if self.transform:
            picture = self.transform(picture)

        return picture, torch.tensor(target, dtype=torch.long)

#  4. Create Dataloaders with Class Balancing
train_dataset = ImageDataset(train_data, DATA_DIR, transform)
val_dataset = ImageDataset(val_data, DATA_DIR, transform)

# Handle class imbalance using WeightedRandomSampler.
# value_counts() orders its result by frequency, not by label, so the counts are
# kept as a Series indexed by label. Taking `.values` and indexing by label (as
# before) gave each class the OTHER class's weight whenever label 1 was the
# majority, which amplified the imbalance instead of correcting it.
class_counts = train_data['label'].value_counts()
class_weights = 1. / class_counts  # Series: label -> inverse class frequency
samples_weights = train_data['label'].map(class_weights).tolist()
sampler = WeightedRandomSampler(samples_weights, num_samples=len(samples_weights), replacement=True)

# The sampler replaces shuffling for the training loader
train_loader = DataLoader(train_dataset, batch_size=16, sampler=sampler)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False)

#  5. Define Ensemble Model
class ImageEnsembleModel(nn.Module):
    """Frozen ResNet-50 + EfficientNet-B0 + DenseNet-121 feature extractors
    feeding a trainable two-class MLP head (0 = sarcasm, 1 = humor).
    """

    def __init__(self):
        super(ImageEnsembleModel, self).__init__()

        # Load pretrained models
        # NOTE(review): recent torchvision releases prefer `weights=` over
        # `pretrained=` - confirm against the installed version before changing.
        self.resnet = models.resnet50(pretrained=True)
        self.efficientnet = models.efficientnet_b0(pretrained=True)
        self.densenet = models.densenet121(pretrained=True)

        # Freeze base models
        for backbone in (self.resnet, self.efficientnet, self.densenet):
            for param in backbone.parameters():
                param.requires_grad = False

        # Remove the classification layer of each backbone so it emits features
        self.resnet.fc = nn.Identity()
        self.efficientnet.classifier[1] = nn.Identity()
        self.densenet.classifier = nn.Identity()

        # Combined output size = 2048 (ResNet) + 1280 (EfficientNet) + 1024 (DenseNet)
        self.fc = nn.Sequential(
            nn.Linear(2048 + 1280 + 1024, 512),
            nn.ReLU(),
            nn.Dropout(0.5),  # Increased dropout to reduce overfitting
            nn.Linear(512, 2)  # 2 classes: sarcasm, humor
        )

    def train(self, mode=True):
        """Switch the head between train/eval while keeping the backbones in eval mode.

        ``requires_grad = False`` alone does not freeze a backbone: in training
        mode its BatchNorm layers keep updating their running statistics and the
        dropout left inside EfficientNet's classifier stays active. Pinning the
        backbones to eval mode keeps the extracted features deterministic.
        """
        super(ImageEnsembleModel, self).train(mode)
        for backbone in (self.resnet, self.efficientnet, self.densenet):
            backbone.eval()
        return self

    def forward(self, x):
        """Return 2-way logits for a batch of images ``x``."""
        # Backbones are frozen, so no graph is recorded for them
        with torch.no_grad():
            resnet_out = self.resnet(x)
            efficientnet_out = self.efficientnet(x)
            densenet_out = self.densenet(x)

        combined = torch.cat((resnet_out, efficientnet_out, densenet_out), dim=1)
        return self.fc(combined)

#  6. Initialize Model and Setup
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = ImageEnsembleModel().to(device)

# Cross-entropy with label smoothing; the LR is halved when the epoch loss plateaus
criterion = nn.CrossEntropyLoss(label_smoothing=0.1)
optimizer = optim.AdamW(params=model.parameters(), lr=2e-5)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode="min", patience=2, factor=0.5)

#  7. Train Model
EPOCHS = 2
for epoch in range(EPOCHS):
    model.train()
    total_loss = 0
    correct = 0

    for images, labels in train_loader:
        images = images.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        correct += (outputs.argmax(1) == labels).sum().item()

    # The mean per-batch training loss drives the scheduler and the report
    mean_loss = total_loss / len(train_loader)
    scheduler.step(mean_loss)
    accuracy = correct / len(train_dataset)
    print(f"Epoch {epoch+1}/{EPOCHS} - Loss: {mean_loss:.4f} - Accuracy: {accuracy:.4f}")

#  8. Save Model
torch.save(model.state_dict(), "ensemble_image_model.pth")
print("Model saved successfully!")

#  9. Evaluate on Validation Set
model.eval()
correct = 0
with torch.no_grad():
    for images, labels in val_loader:
        images = images.to(device)
        labels = labels.to(device)
        outputs = model(images)
        correct += outputs.argmax(1).eq(labels).sum().item()

val_accuracy = correct / len(val_dataset)
print(f"Validation Accuracy: {val_accuracy:.4f}")

# 10. Prediction Function
def predict_image(image_path, model):
    """Classify one image file as "Humor" or "Sarcasm".

    The image is opened as RGB, resized to 224x224, normalised, and passed
    through ``model`` in eval mode on the notebook-level ``device``.
    "Humor" is returned only when the softmax probability of class 1
    exceeds 0.6; every other case returns "Sarcasm".
    """
    preprocess = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

    rgb_image = Image.open(image_path).convert('RGB')
    batch = preprocess(rgb_image).unsqueeze(0).to(device)  # add a batch dimension of 1

    model.eval()
    with torch.no_grad():
        humor_prob = torch.softmax(model(batch), dim=1)[0][1]

        # The 0.6 threshold (instead of 0.5) is meant to counter a bias toward humor
        return "Humor" if humor_prob > 0.6 else "Sarcasm"

# 11. Test Prediction: classify a single sample image with the trained ensemble
test_image_path = "test_image.jpg"
result = predict_image(image_path=test_image_path, model=model)
print(f"Prediction: {result}")


In [None]:
#4
import torch
from transformers import (
    BertTokenizer, BertModel,
    RobertaTokenizer, RobertaModel,
    DistilBertTokenizer, DistilBertModel,
    XLNetTokenizer, XLNetModel
)
import pandas as pd
from tqdm import tqdm
import os

# Load data: only the image id and its text are needed; rows missing either are dropped
df = pd.read_csv("labels.csv").loc[:, ['image_name', 'text_corrected']].dropna()

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Create output folder
os.makedirs("text_features", exist_ok=True)

# Load models & tokenizers: name -> (tokenizer, encoder), both from the same checkpoint
models = {
    name: (tokenizer_cls.from_pretrained(checkpoint), model_cls.from_pretrained(checkpoint))
    for name, tokenizer_cls, model_cls, checkpoint in (
        ('bert', BertTokenizer, BertModel, 'bert-base-uncased'),
        ('roberta', RobertaTokenizer, RobertaModel, 'roberta-base'),
        ('distilbert', DistilBertTokenizer, DistilBertModel, 'distilbert-base-uncased'),
        ('xlnet', XLNetTokenizer, XLNetModel, 'xlnet-base-cased'),
    )
}

# Move models to device and set to eval (feature extraction only)
for _, model in models.values():
    model.to(device)
    model.eval()

MAX_LEN = 80

@torch.no_grad()
def extract_embeddings(text):
    """Return ``{encoder_name: pooled CPU vector}`` for ``text``, one entry per model in ``models``.

    The text is tokenized to ``MAX_LEN`` tokens per encoder. XLNet is pooled at
    the last sequence position, every other encoder at the first.
    """
    embeddings = {}
    for name, (tokenizer, model) in models.items():
        encoded = tokenizer(text, return_tensors='pt', padding='max_length', truncation=True, max_length=MAX_LEN)
        on_device = {key: value.to(device) for key, value in encoded.items()}
        hidden = model(**on_device).last_hidden_state

        # XLNet uses the last token; the others use the [CLS] token at position 0
        position = -1 if name == 'xlnet' else 0
        embeddings[name] = hidden[:, position, :].squeeze(0).cpu()
    return embeddings

# Process all entries: one .pt file per row, named after the row's image
for i, row in tqdm(df.iterrows(), total=len(df)):
    features = extract_embeddings(row['text_corrected'])
    torch.save(features, os.path.join("text_features", f"{row['image_name']}.pt"))


Some weights of RobertaModel were not initialized from the model checkpoint at roberta-base and are newly initialized: ['pooler.dense.bias', 'pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
100%|██████████| 6987/6987 [1:06:56<00:00,  1.74it/s]


In [None]:
#5
import torch
import torch.nn as nn
from torchvision import models, transforms
from PIL import Image
import os
from tqdm import tqdm
import pandas as pd

# Load dataframe with image names and make sure the output folder exists
df = pd.read_csv("labels.csv")
image_folder = "images"
os.makedirs("image_features", exist_ok=True)

# Device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Image transform: resize, tensor conversion, per-channel normalisation
transform = transforms.Compose([
    transforms.Resize(size=(224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ),
])

# Load the three pretrained backbones
resnet = models.resnet50(pretrained=True)
efficientnet = models.efficientnet_b0(pretrained=True)
densenet = models.densenet121(pretrained=True)

# Replace each final classification layer with Identity so the model emits features
resnet.fc = nn.Identity()
efficientnet.classifier[1] = nn.Identity()
densenet.classifier = nn.Identity()

# Move to device and eval mode
for model in (resnet, efficientnet, densenet):
    model.to(device)
    model.eval()

@torch.no_grad()
def extract_image_features(image_path):
    """Return ``{'resnet', 'efficientnet', 'densenet'}`` feature vectors (on CPU) for one image file.

    The image is opened as RGB, run through the notebook-level ``transform``,
    and passed through each headless backbone as a batch of one.
    """
    rgb_image = Image.open(image_path).convert("RGB")
    image_tensor = transform(rgb_image).unsqueeze(0).to(device)

    # Same input tensor for every backbone; squeeze(0) drops the batch dimension
    backbones = (('resnet', resnet), ('efficientnet', efficientnet), ('densenet', densenet))
    return {name: net(image_tensor).squeeze(0).cpu() for name, net in backbones}

# Process images and save features; a failure on one image is reported and skipped
for image_name in tqdm(df['image_name'].unique()):
    image_path = os.path.join(image_folder, image_name)
    try:
        # The three backbone vectors are stored together in one .pt file per image
        torch.save(
            extract_image_features(image_path),
            os.path.join("image_features", f"{image_name}.pt"),
        )
    except Exception as e:
        print(f"Failed to process {image_name}: {e}")


100%|██████████| 6992/6992 [07:06<00:00, 16.39it/s]


In [None]:
#6
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import os
from sklearn.model_selection import train_test_split

# Dataset paths
TEXT_FEAT_DIR = "text_features"
IMAGE_FEAT_DIR = "image_features"
df = pd.read_csv("labels.csv")

# Filter valid rows: keep only ids for which BOTH feature files exist.
# .copy() makes the filtered frame independent, so adding the label column below
# does not assign into a slice of the original frame (SettingWithCopyWarning).
df = df[df['image_name'].apply(lambda x: os.path.exists(os.path.join(TEXT_FEAT_DIR, f"{x}.pt")) and
                                             os.path.exists(os.path.join(IMAGE_FEAT_DIR, f"{x}.pt")))].copy()
# Binary target: 0 = not funny AND not sarcastic, 1 = anything else
df['label'] = ((df['humour'] != 'not_funny') | (df['sarcasm'] != 'not_sarcastic')).astype(int)

# Split (stratified 80/20, fixed seed)
train_df, val_df = train_test_split(df, test_size=0.2, random_state=42, stratify=df['label'])

# Fusion Dataset
class FusionDataset(Dataset):
    """Yield ``(fused_features, label)`` built from the cached per-image feature files.

    For each row, the text file under ``TEXT_FEAT_DIR`` and the image file
    under ``IMAGE_FEAT_DIR`` (both named ``<image_name>.pt``) are loaded and
    their vectors concatenated: bert, roberta, distilbert, xlnet, then
    resnet, efficientnet, densenet.
    """

    def __init__(self, dataframe):
        self.df = dataframe

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        record = self.df.iloc[idx]
        image_id, label = record['image_name'], record['label']

        # Text features come first (4 x 768 values with the notebook's encoders)
        text_feat = torch.load(os.path.join(TEXT_FEAT_DIR, f"{image_id}.pt"))
        pieces = [text_feat[key] for key in ('bert', 'roberta', 'distilbert', 'xlnet')]

        # Image features follow (2048 + 1280 + 1024 values with the notebook's backbones)
        img_feat = torch.load(os.path.join(IMAGE_FEAT_DIR, f"{image_id}.pt"))
        pieces.extend(img_feat[key] for key in ('resnet', 'efficientnet', 'densenet'))

        return torch.cat(pieces, dim=0), torch.tensor(label, dtype=torch.long)

# Dataloaders: one dataset per split; only the training loader shuffles
train_dataset = FusionDataset(dataframe=train_df)
val_dataset = FusionDataset(dataframe=val_df)

train_loader = DataLoader(dataset=train_dataset, shuffle=True, batch_size=32)
val_loader = DataLoader(dataset=val_dataset, batch_size=32)

# Late Fusion MLP
class LateFusionClassifier(nn.Module):
    """MLP over the fused text+image vector: input_dim -> 512 -> 128 -> 2.

    Each hidden layer is followed by ReLU and Dropout(0.4). The default
    ``input_dim`` of 7424 equals 4*768 text values plus 2048+1280+1024 image values.
    """

    def __init__(self, input_dim=7424):
        super().__init__()
        widths = (input_dim, 512, 128)
        layers = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            layers += [nn.Linear(fan_in, fan_out), nn.ReLU(), nn.Dropout(0.4)]
        layers.append(nn.Linear(128, 2))  # two output classes
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Return 2-way logits for a batch of fused feature vectors."""
        return self.net(x)

# Train setup
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = LateFusionClassifier().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(params=model.parameters(), lr=1e-4)

# Training loop
EPOCHS = 5
for epoch in range(EPOCHS):
    model.train()
    total_loss = 0
    correct = 0

    for feats, labels in train_loader:
        feats = feats.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = model(feats)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        correct += outputs.argmax(1).eq(labels).sum().item()

    acc = correct / len(train_dataset)
    # Note: the reported loss is the SUM of the per-batch losses for the epoch, not a mean
    print(f"Epoch {epoch+1}/{EPOCHS} - Loss: {total_loss:.4f} - Train Accuracy: {acc:.4f}")

# Evaluation on the held-out split
model.eval()
correct = 0
with torch.no_grad():
    for feats, labels in val_loader:
        feats = feats.to(device)
        labels = labels.to(device)
        preds = model(feats)
        correct += preds.argmax(1).eq(labels).sum().item()

print(f"Validation Accuracy: {correct / len(val_dataset):.4f}")

# Save model
torch.save(model.state_dict(), "late_fusion_model.pth")
print("Fusion model saved.")


Epoch 1/5 - Loss: 56.7415 - Train Accuracy: 0.9077
Epoch 2/5 - Loss: 54.1012 - Train Accuracy: 0.9095
Epoch 3/5 - Loss: 53.4660 - Train Accuracy: 0.9095
Epoch 4/5 - Loss: 52.9431 - Train Accuracy: 0.9095
Epoch 5/5 - Loss: 51.4623 - Train Accuracy: 0.9095
Validation Accuracy: 0.9099
Fusion model saved.


In [None]:
#7
import torch
import torch.nn as nn
import torch.nn.functional as F
import pandas as pd
import os
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader

# Dataset paths
TEXT_FEAT_DIR = "text_features"
IMAGE_FEAT_DIR = "image_features"
df = pd.read_csv("labels.csv")

# Filter valid rows: keep only ids for which BOTH feature files exist.
# .copy() makes the filtered frame independent, so adding the label column below
# does not assign into a slice of the original frame (SettingWithCopyWarning).
df = df[df['image_name'].apply(lambda x: os.path.exists(os.path.join(TEXT_FEAT_DIR, f"{x}.pt")) and
                                             os.path.exists(os.path.join(IMAGE_FEAT_DIR, f"{x}.pt")))].copy()
# Binary target: 0 = not funny AND not sarcastic, 1 = anything else
df['label'] = ((df['humour'] != 'not_funny') | (df['sarcasm'] != 'not_sarcastic')).astype(int)

# Split (stratified 80/20, fixed seed)
train_df, val_df = train_test_split(df, test_size=0.2, random_state=42, stratify=df['label'])

# Fusion Dataset
class FusionDataset(Dataset):
    """Yield ``(fused_features, label)`` built from the cached per-image feature files.

    For each row, the text file under ``TEXT_FEAT_DIR`` and the image file
    under ``IMAGE_FEAT_DIR`` (both named ``<image_name>.pt``) are loaded and
    their vectors concatenated: bert, roberta, distilbert, xlnet, then
    resnet, efficientnet, densenet.
    """

    def __init__(self, dataframe):
        self.df = dataframe

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        record = self.df.iloc[idx]
        image_id, label = record['image_name'], record['label']

        # Text features come first (4 x 768 values with the notebook's encoders)
        text_feat = torch.load(os.path.join(TEXT_FEAT_DIR, f"{image_id}.pt"))
        pieces = [text_feat[key] for key in ('bert', 'roberta', 'distilbert', 'xlnet')]

        # Image features follow (2048 + 1280 + 1024 values with the notebook's backbones)
        img_feat = torch.load(os.path.join(IMAGE_FEAT_DIR, f"{image_id}.pt"))
        pieces.extend(img_feat[key] for key in ('resnet', 'efficientnet', 'densenet'))

        return torch.cat(pieces, dim=0), torch.tensor(label, dtype=torch.long)

# Dataloaders: one dataset per split; only the training loader shuffles
train_dataset = FusionDataset(dataframe=train_df)
val_dataset = FusionDataset(dataframe=val_df)

train_loader = DataLoader(dataset=train_dataset, shuffle=True, batch_size=32)
val_loader = DataLoader(dataset=val_dataset, batch_size=32)

# Late Fusion MLP
class LateFusionClassifier(nn.Module):
    """MLP over the fused text+image vector: input_dim -> 512 -> 128 -> 2.

    Each hidden layer is followed by ReLU and Dropout(0.4). The default
    ``input_dim`` of 7424 equals 4*768 text values plus 2048+1280+1024 image values.
    """

    def __init__(self, input_dim=7424):
        super().__init__()
        widths = (input_dim, 512, 128)
        layers = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            layers += [nn.Linear(fan_in, fan_out), nn.ReLU(), nn.Dropout(0.4)]
        layers.append(nn.Linear(128, 2))  # two output classes
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Return 2-way logits for a batch of fused feature vectors."""
        return self.net(x)

# Train setup
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = LateFusionClassifier().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(params=model.parameters(), lr=1e-4)

# Training loop
EPOCHS = 5
for epoch in range(EPOCHS):
    model.train()
    total_loss = 0
    correct = 0

    for feats, labels in train_loader:
        feats = feats.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = model(feats)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        correct += outputs.argmax(1).eq(labels).sum().item()

    acc = correct / len(train_dataset)
    # Note: the reported loss is the SUM of the per-batch losses for the epoch, not a mean
    print(f"Epoch {epoch+1}/{EPOCHS} - Loss: {total_loss:.4f} - Train Accuracy: {acc:.4f}")

# Evaluation on the held-out split
model.eval()
correct = 0
with torch.no_grad():
    for feats, labels in val_loader:
        feats = feats.to(device)
        labels = labels.to(device)
        preds = model(feats)
        correct += preds.argmax(1).eq(labels).sum().item()

print(f"Validation Accuracy: {correct / len(val_dataset):.4f}")

# Save model
torch.save(model.state_dict(), "late_fusion_model.pth")
print("Fusion model saved.")

# Function to perform late fusion prediction
def late_fusion_predict(image_input=None, text_input=None):
    """Predict with the late-fusion model from an image, cached text features, or both.

    Args:
        image_input: passed to ``extract_image_features``; may be omitted.
        text_input: id of a cached feature file, loaded from
            ``text_features/<text_input>.pt`` (not raw text); may be omitted.

    Returns:
        ``(predicted_class, probs)`` where ``probs`` has shape (1, 2), or
        ``(None, None)`` when no usable input is available.

    A missing modality is replaced by a zero vector of the matching size.
    When both inputs are supplied, both are used (the text used to be ignored
    whenever an image was given). The notebook-level ``model`` is switched to
    eval mode for the prediction and restored to its previous mode afterwards.
    """
    image_features = None
    text_features = None

    if image_input is not None:
        extracted = extract_image_features(image_input)
        if isinstance(extracted, dict):
            # Accept the per-backbone dict layout as well; concatenate in the
            # order FusionDataset uses.
            extracted = torch.cat([
                extracted['resnet'],
                extracted['efficientnet'],
                extracted['densenet']
            ], dim=0)
        image_features = extracted

    if text_input:
        text_feat = torch.load(os.path.join("text_features", f"{text_input}.pt"))
        text_features = torch.cat([
            text_feat['bert'],
            text_feat['roberta'],
            text_feat['distilbert'],
            text_feat['xlnet']
        ], dim=0)

    if image_features is None and text_features is None:
        return None, None  # Neither image nor text features are available

    # Zero-fill whichever modality is missing
    if text_features is None:
        text_features = torch.zeros(3072)  # 4 * 768
    if image_features is None:
        image_features = torch.zeros(4352)  # 2048 + 1280 + 1024

    # Same layout as FusionDataset: text first, then image; both moved to one device
    fused = torch.cat([text_features.to(device), image_features.to(device)], dim=0).unsqueeze(0)

    # Ensure the size is correct for the model input
    assert fused.shape[1] == 7424, f"Expected feature size of 7424, got {fused.shape[1]}"

    # Dropout must be inactive for a deterministic prediction
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            outputs = model(fused)
            probs = F.softmax(outputs, dim=1)
            predicted_class = torch.argmax(probs, dim=1).item()
    finally:
        model.train(was_training)
    return predicted_class, probs

# Image feature extraction used by late_fusion_predict
def extract_image_features(image_input):
    """Return the concatenated (4352,) image feature vector for one image.

    Relies on the ``transform`` pipeline and the headless ``resnet``,
    ``efficientnet`` and ``densenet`` backbones created in the image
    feature-extraction cell (#5) of this notebook, so that cell must have run.

    Args:
        image_input: a path to an image file, or an already opened PIL image.

    Returns:
        1-D CPU tensor holding the ResNet (2048), EfficientNet (1280) and
        DenseNet (1024) features, in the order FusionDataset concatenates them.
    """
    from PIL import Image  # local import: this cell does not import PIL itself

    image = image_input if isinstance(image_input, Image.Image) else Image.open(image_input)
    image_tensor = transform(image.convert("RGB")).unsqueeze(0).to(device)

    parts = []
    with torch.no_grad():
        for backbone in (resnet, efficientnet, densenet):
            backbone.eval()  # feature extraction only
            parts.append(backbone(image_tensor).squeeze(0).cpu())
    return torch.cat(parts, dim=0)


Epoch 1/5 - Loss: 58.1020 - Train Accuracy: 0.9045
Epoch 2/5 - Loss: 54.6912 - Train Accuracy: 0.9095
Epoch 3/5 - Loss: 54.2749 - Train Accuracy: 0.9095
Epoch 4/5 - Loss: 53.5282 - Train Accuracy: 0.9095
Epoch 5/5 - Loss: 51.9506 - Train Accuracy: 0.9095
Validation Accuracy: 0.9099
Fusion model saved.
