In [None]:
#importing libraries
import warnings
warnings.filterwarnings("ignore")
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import AdamW
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.utils.data import DataLoader, Dataset

from transformers import (
    AutoTokenizer,
    AutoModel,
    AutoModelForSequenceClassification,
    AutoModelForImageClassification,
    ViTModel, 
    BertModel, 
    ViTFeatureExtractor, 
    BertTokenizer
)
from dataset import MemeDataset
from sklearn.model_selection import train_test_split
from torchvision import transforms
from PIL import Image, UnidentifiedImageError
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import f1_score, mean_squared_error
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
import os
import torch.nn.functional as F
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.metrics import accuracy_score, f1_score, mean_squared_error
from tqdm import tqdm  

In [None]:
#dataset loading
# NOTE(review): these absolute paths are machine-specific; adjust them to the
# local copy of the dataset before running.
df=pd.read_csv('/Users/aryamannsrivastava/Desktop/CS/Sentiment Analysis/Assignment-3/Multimodal_dataset_assignment3/labels.csv')
image_folder='/Users/aryamannsrivastava/Desktop/CS/Sentiment Analysis/Assignment-3/Multimodal_dataset_assignment3/images'
# Prefix every file name with the image folder *including* a path separator.
# Plain `image_folder + name` produces ".../imagesfoo.jpg", a path that does
# not exist. os.path.join(folder, "") returns the folder with a trailing
# separator, so the vectorized string concatenation below yields valid paths.
# MemeDataset later calls os.path.join(img_dir, image_name); since these
# values are absolute, that join returns them unchanged.
df["image_name"] = os.path.join(image_folder, "") + df["image_name"]

In [None]:
#defining each mapping
# NOTE: this cell overwrites the raw string label columns with integers, so it
# must run exactly once after the CSV is loaded. Running it a second time maps
# the (already integer) values to NaN and fillna(0) silently turns every label
# into 0. Labels that are missing or not listed in a mapping also become 0.

# Three-way sentiment: 0 = negative, 1 = neutral, 2 = positive.
sentiment_mapping = {
    "very_positive": 2, "positive": 2,
    "very_negative": 0, "negative": 0,
    "neutral":1
}

# Binary "is it humorous?" flag. Every label except "not_funny" counts as
# humorous, matching how S and O treat their graded labels.
H = {
    "hilarious": 1,
    "very_funny": 1,
    "funny": 1,
    "not_funny": 0
}
# Binary "is it sarcastic?" flag.
S = {
    "very_twisted": 1,
    "twisted_meaning": 1,
    "general": 1,
    "not_sarcastic": 0
}
# Binary "is it offensive?" flag.
O = {
    "hateful_offensive": 1,
    "very_offensive": 1,
    "slight": 1,
    "not_offensive": 0
}

# Graded intensity levels (0-3) for the same three attributes.
offensive_mapping = {
    "hateful_offensive": 3,
    "very_offensive": 2,
    "slight": 1,
    "not_offensive": 0
}
sarcasm_mapping = {
    "very_twisted": 3,
    "twisted_meaning": 2,
    "general": 1,
    "not_sarcastic": 0
}
humour_mapping = {
    "hilarious": 3,
    "very_funny": 2,
    "funny": 1,
    "not_funny": 0
}

# The binary flags must be derived first, while the source columns still hold
# the raw string labels.
df["H"] = df["humour"].map(H).fillna(0).astype(int)
df["S"] = df["sarcasm"].map(S).fillna(0).astype(int)
df["O"] = df["offensive"].map(O).fillna(0).astype(int)

# Now replace the string columns with their integer encodings.
df["overall_sentiment"] = df["overall_sentiment"].map(sentiment_mapping).fillna(0).astype(int)
df["offensive"] = df["offensive"].map(offensive_mapping).fillna(0).astype(int)
df["sarcasm"] = df["sarcasm"].map(sarcasm_mapping).fillna(0).astype(int)
df["humour"] = df["humour"].map(humour_mapping).fillna(0).astype(int)

df = df.reset_index(drop=True)

In [None]:
#CPU or GPU
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(device)

In [None]:
class MemeDataset(Dataset):
    """Dataset yielding one meme (image, tokenized caption, labels) per index.

    Args:
        df: DataFrame providing the columns read in ``__getitem__``:
            ``image_name``, ``text_corrected``, ``overall_sentiment``, ``H``,
            ``S``, ``O``, ``humour``, ``sarcasm`` and ``offensive``.
        img_dir: Directory joined with ``image_name`` to locate each image.
        transform: Optional callable applied to the loaded RGB PIL image.
        tokenizer: Callable that encodes the caption. It is invoked with
            ``padding='max_length'``, ``truncation=True``, ``max_length=128``
            and ``return_tensors='pt'``.
    """

    def __init__(self, df, img_dir, transform=None, tokenizer=None):
        self.data = df
        self.img_dir = img_dir
        self.transform = transform
        self.tokenizer = tokenizer

    def __len__(self):
        """Return the number of rows in the underlying DataFrame."""
        return len(self.data)

    def __getitem__(self, idx):
        """Build the sample at positional index ``idx``.

        Returns:
            dict with keys ``image``, ``text`` (the tokenizer output),
            ``sentiment`` (long scalar), ``humor`` / ``sarcasm`` / ``offense``
            (float one-hot vectors of length 2) and ``funny_level`` /
            ``sarcastic_level`` / ``offensive_level`` (float scalars).
        """
        # Fetch the row once instead of re-indexing the frame for every field.
        row = self.data.iloc[idx]

        img_path = os.path.join(self.img_dir, row['image_name'])
        try:
            image = Image.open(img_path).convert('RGB')
            if self.transform:
                image = self.transform(image)
        except Exception as e:
            # Deliberate best-effort: an unreadable image must not abort the
            # whole epoch, so it is reported and replaced by a blank tensor.
            print(f"Error loading image {img_path}: {e}")
            image = torch.zeros((3, 224, 224))  # Placeholder for failed images

        # A missing caption would otherwise be stringified to "nan"/"None" and
        # tokenized as if it were real text; encode an empty string instead.
        raw_text = row['text_corrected']
        text = "" if pd.isna(raw_text) else str(raw_text)
        text_encoded = self.tokenizer(text, padding='max_length', truncation=True, max_length=128, return_tensors='pt')

        # One-hot encoding for binary classification tasks. int() guarantees
        # the integer tensor that F.one_hot requires.
        humor_onehot = F.one_hot(torch.tensor(int(row['H'])), num_classes=2).float()
        sarcasm_onehot = F.one_hot(torch.tensor(int(row['S'])), num_classes=2).float()
        offense_onehot = F.one_hot(torch.tensor(int(row['O'])), num_classes=2).float()

        return {
            'image': image,
            'text': text_encoded,
            'sentiment': torch.tensor(row['overall_sentiment'], dtype=torch.long),
            'humor': humor_onehot,
            'sarcasm': sarcasm_onehot,
            'offense': offense_onehot,
            'funny_level': torch.tensor(row['humour'], dtype=torch.float),
            'sarcastic_level': torch.tensor(row['sarcasm'], dtype=torch.float),
            'offensive_level': torch.tensor(row['offensive'], dtype=torch.float)
        }


# The ViT image processor is loaded for its normalization statistics; the
# tokenizer turns meme captions into BERT inputs.
vit_feature_extractor = ViTFeatureExtractor.from_pretrained('google/vit-base-patch16-224')
bert_tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# Normalize with the mean/std shipped with the ViT checkpoint rather than
# hard-coded ImageNet constants, so inputs match what the pretrained backbone
# was trained on.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=vit_feature_extractor.image_mean, std=vit_feature_extractor.image_std),
])

img_dir=image_folder


# 80/20 train/test split with a fixed seed for reproducibility.
train_df, test_df = train_test_split(df, test_size=0.2, random_state=42, shuffle=True)

train_dataset = MemeDataset(train_df, img_dir, transform=transform, tokenizer=bert_tokenizer)
test_dataset = MemeDataset(test_df, img_dir, transform=transform, tokenizer=bert_tokenizer)

# Only the training loader shuffles; evaluation order stays deterministic.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=4)

In [None]:
class MemeAnalysisModel(nn.Module):
    """Multi-task meme model on top of concatenated ViT and BERT features.

    The first-token embedding of each encoder is concatenated and fed to seven
    independent linear heads: one sentiment classifier, three binary
    classifiers (humor, sarcasm, offense) and three scalar regressors
    (funny, sarcastic, offensive level).

    Args:
        num_classes_sentiment: Output width of the sentiment head.
        num_classes_binary: Output width of each binary head.
    """

    def __init__(self, num_classes_sentiment=3, num_classes_binary=2):
        super().__init__()
        self.vit = ViTModel.from_pretrained('google/vit-base-patch16-224')
        self.bert = BertModel.from_pretrained('bert-base-uncased')

        combined_dim = self.vit.config.hidden_size + self.bert.config.hidden_size

        self.sentiment_classifier = nn.Linear(combined_dim, num_classes_sentiment)
        self.humor_classifier = nn.Linear(combined_dim, num_classes_binary)
        self.sarcasm_classifier = nn.Linear(combined_dim, num_classes_binary)
        self.offense_classifier = nn.Linear(combined_dim, num_classes_binary)

        self.funny_regressor = nn.Linear(combined_dim, 1)
        self.sarcastic_regressor = nn.Linear(combined_dim, 1)
        self.offensive_regressor = nn.Linear(combined_dim, 1)

    def forward(self, image_input, text_input):
        """Run both encoders and every task head.

        Args:
            image_input: Tensor passed positionally to the ViT encoder.
            text_input: Mapping unpacked as keyword arguments into the BERT
                encoder.

        Returns:
            Tuple ``(sentiment, humor, sarcasm, offense, funny_level,
            sarcastic_level, offensive_level)``. The four classification
            entries are raw logits; the three level entries have a trailing
            dimension of size 1.
        """
        vit_output = self.vit(image_input).last_hidden_state[:, 0, :]  # [CLS] token
        bert_output = self.bert(**text_input).last_hidden_state[:, 0, :]  # [CLS] token

        combined_features = torch.cat((vit_output, bert_output), dim=1)

        sentiment_output = self.sentiment_classifier(combined_features)
        # Return raw logits for the binary heads. Converting them to one-hot
        # vectors via argmax is non-differentiable: no gradient would reach
        # these heads, and a logits-based loss would be fed hard 0/1 values.
        # Callers that need a class index can take argmax(dim=1) themselves.
        humor_output = self.humor_classifier(combined_features)
        sarcasm_output = self.sarcasm_classifier(combined_features)
        offense_output = self.offense_classifier(combined_features)

        funny_level = self.funny_regressor(combined_features)
        sarcastic_level = self.sarcastic_regressor(combined_features)
        offensive_level = self.offensive_regressor(combined_features)

        return sentiment_output, humor_output, sarcasm_output, offense_output, funny_level, sarcastic_level, offensive_level

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = MemeAnalysisModel().to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-4)

#Add ReduceLROnPlateau scheduler
# The `verbose` flag is deprecated in recent PyTorch releases, so the current
# learning rate is printed explicitly at the end of every epoch instead.
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=3)

#Loss functions
ce_loss = nn.CrossEntropyLoss()
bce_loss = nn.BCEWithLogitsLoss()
mse_loss = nn.MSELoss()

num_epochs = 20

for epoch in range(num_epochs):
    model.train()
    total_loss = 0.0  # running sum of the per-batch losses for this epoch
    progress_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}")
    for batch in progress_bar:
        optimizer.zero_grad()

        images = batch['image'].to(device)
        # The tokenizer output carries an extra dimension of size 1 per sample
        # (return_tensors='pt'); drop it so tensors are (batch, seq_len).
        text_input = {k: v.squeeze(1).to(device) for k, v in batch['text'].items()}

        sentiment_output, humor_output, sarcasm_output, offense_output, funny_level, sarcastic_level, offensive_level = model(images, text_input)

        loss_sentiment = ce_loss(sentiment_output, batch['sentiment'].to(device))
        loss_humor = bce_loss(humor_output, batch['humor'].to(device))
        loss_sarcasm = bce_loss(sarcasm_output, batch['sarcasm'].to(device))
        loss_offense = bce_loss(offense_output, batch['offense'].to(device))

        # squeeze(-1) removes only the trailing size-1 dimension, so a final
        # batch holding a single sample keeps shape (1,) and matches its target.
        loss_funny = mse_loss(funny_level.squeeze(-1), batch['funny_level'].to(device))
        loss_sarcastic = mse_loss(sarcastic_level.squeeze(-1), batch['sarcastic_level'].to(device))
        loss_offensive = mse_loss(offensive_level.squeeze(-1), batch['offensive_level'].to(device))

        # Unweighted sum of all seven task losses.
        loss = loss_sentiment + loss_humor + loss_sarcasm + loss_offense + loss_funny + loss_sarcastic + loss_offensive

        loss.backward()
        optimizer.step()

        batch_loss = loss.item()
        total_loss += batch_loss
        progress_bar.set_postfix({'loss': batch_loss})

    # Report and schedule on the mean loss over the epoch; the loss of the last
    # batch alone is too noisy to drive ReduceLROnPlateau.
    epoch_loss = total_loss / max(len(train_loader), 1)
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss}")

    #Step the scheduler
    scheduler.step(epoch_loss)
    print(f"Learning rate: {optimizer.param_groups[0]['lr']}")

In [None]:
#Ensure model is in evaluation mode
model.eval()

#Define task categories
# The order of `tasks` must match the order of the tuple returned by the model.
tasks = ['sentiment', 'humor', 'sarcasm', 'offense', 'funny_level', 'sarcastic_level', 'offensive_level']
classification_tasks = ['sentiment', 'humor', 'sarcasm', 'offense']
regression_tasks = ['funny_level', 'sarcastic_level', 'offensive_level']

#Initialize dictionaries for predictions and labels
all_preds = {task: [] for task in tasks}
all_labels = {task: [] for task in tasks}

with torch.no_grad():
    progress_bar = tqdm(test_loader, desc="Evaluating")

    # Iterate over the tqdm wrapper (not the bare loader) so the bar advances.
    for batch in progress_bar:
        images = batch['image'].to(device)
        text_input = {k: v.squeeze(1).to(device) for k, v in batch['text'].items()}

        #Forward pass
        outputs = model(images, text_input)

        #Process each task
        for task, output in zip(tasks, outputs):
            preds = output.cpu().numpy()
            labels = batch[task].cpu().numpy()

            if task in classification_tasks:
                # Reduce per-class scores to a class index. argmax gives the
                # same answer for raw logits and for one-hot vectors.
                preds = np.argmax(preds, axis=1)
                if labels.ndim == 2:  # one-hot encoded binary labels
                    labels = np.argmax(labels, axis=1)
            else:  # Regression tasks
                # reshape(-1) keeps a 1-D array even when the batch holds a
                # single sample; squeeze() would give a 0-d array that cannot
                # be passed to list.extend.
                preds = preds.reshape(-1).astype(float)

            all_preds[task].extend(preds)
            all_labels[task].extend(labels)

#Convert lists to numpy arrays
for task in tasks:
    all_preds[task] = np.array(all_preds[task])
    all_labels[task] = np.array(all_labels[task])

#Compute Macro F1 scores for classification tasks
for task in classification_tasks:
    macro_f1 = f1_score(all_labels[task], all_preds[task], average='macro')
    print(f"Macro F1 score for {task}: {macro_f1:.4f}")

#Compute Macro F1 scores for regression tasks (Rounded)
for task in regression_tasks:
    y_true = all_labels[task].astype(int)
    # Round to the nearest level and keep it inside the observed label range.
    y_pred = np.clip(np.round(all_preds[task]), y_true.min(), y_true.max()).astype(int)

    macro_f1 = f1_score(y_true, y_pred, average='macro')
    print(f"Rounded Macro F1 score for {task}: {macro_f1:.4f}")

#Compute MSE for regression tasks
for task in regression_tasks:
    mse = mean_squared_error(all_labels[task], all_preds[task])
    print(f"MSE for {task}: {mse:.4f}")