In [None]:
import pandas as pd
import os
from sklearn.preprocessing import LabelEncoder
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
from transformers import BertTokenizer, BertModel, AdamW
import torchvision.models as models
from torch import nn
import time
from tqdm import tqdm

# Input locations for every dataset split, keyed by split name.
# NOTE(review): 'test' points at the same CSV and image folder as 'validation' —
# confirm this is intentional (e.g. no labelled test split is available).
csv_paths = dict(
    train='/kaggle/input/intent/Intent/train.csv',
    test='/kaggle/input/intent/Intent/validation.csv',
    validation='/kaggle/input/intent/Intent/validation.csv',
)

# Folder holding the image files of each split (same keys as csv_paths).
image_dirs = dict(
    train='/kaggle/input/intent/Intent/train',
    test='/kaggle/input/intent/Intent/validation',
    validation='/kaggle/input/intent/Intent/validation',
)

# Directory that receives the processed '<split>_matches.csv' files.
output_dir = '/kaggle/working/'

# Function to pair each CSV row with its image file (matched on Image_ID)
def check_matches(csv_path, image_dir):
    """Load a split's CSV and keep only the rows that have an image on disk.

    A row matches when its ``Image_ID`` equals the name of a file in
    ``image_dir`` with the extension stripped.

    Parameters
    ----------
    csv_path : str
        CSV file containing at least an ``Image_ID`` column.
    image_dir : str
        Directory that holds the image files of the split.

    Returns
    -------
    pandas.DataFrame
        Independent copy of the matched rows with an extra ``Image_Path``
        column holding the full path of each image.
    """
    df = pd.read_csv(csv_path)

    # Sorted listing keeps the stem -> path mapping deterministic when two files
    # share a stem (os.listdir order is arbitrary; the last one listed wins).
    image_names = {
        os.path.splitext(image_file)[0]: os.path.join(image_dir, image_file)
        for image_file in sorted(os.listdir(image_dir))
    }

    # File stems are always strings, so compare IDs as strings; otherwise a
    # numeric Image_ID column would never match any file.
    df['Image_Path'] = df['Image_ID'].astype(str).map(image_names)

    # Return a real copy rather than a filtered view so callers can add columns
    # without chained-assignment warnings or accidental writes to `df`.
    return df[df['Image_Path'].notna()].copy()

# Function to encode Intent_Taxonomy classes into integer labels
def encode_labels(df, label_encoder=None):
    """Add an ``Intent_Taxonomy_Labels`` column with integer-encoded classes.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame with an ``Intent_Taxonomy`` column. It is not modified.
    label_encoder : LabelEncoder, optional
        An already fitted encoder to reuse, so several splits can share one
        class -> integer mapping. When omitted, a new encoder is fitted on
        ``df`` (the original behaviour).

    Returns
    -------
    tuple
        ``(encoded_df, classes)`` where ``encoded_df`` is a copy of ``df`` with
        the new column and ``classes`` is the encoder's ``classes_`` array
        (position i is the class encoded as integer i).
    """
    # Work on a copy: `df` is often a filtered slice of another frame, and
    # assigning a column to it in place triggers chained-assignment warnings.
    encoded_df = df.copy()
    if label_encoder is None:
        label_encoder = LabelEncoder().fit(encoded_df['Intent_Taxonomy'])
    encoded_df['Intent_Taxonomy_Labels'] = label_encoder.transform(encoded_df['Intent_Taxonomy'])
    return encoded_df, label_encoder.classes_

# Match images for each set (Train, Test, Validation) up front so that one label
# mapping can be derived before any split is written out.
split_frames = {key: check_matches(csv_paths[key], image_dirs[key]) for key in csv_paths}

# Derive the class -> integer mapping from the training split only. Fitting a
# separate encoder per split would silently shift the integers whenever a split
# is missing a class, making labels incomparable between train/validation/test.
_, classes = encode_labels(split_frames['train'])
class_to_label = {class_name: label for label, class_name in enumerate(classes)}

for key, split_df in split_frames.items():
    matched_df = split_df.copy()

    # Encode Intent_Taxonomy labels with the shared training mapping
    encoded = matched_df['Intent_Taxonomy'].map(class_to_label)
    unseen = matched_df.loc[encoded.isna(), 'Intent_Taxonomy'].unique()
    if len(unseen) > 0:
        # Fail loudly instead of writing NaN labels the model cannot consume.
        raise ValueError(f"{key} set contains Intent_Taxonomy values not present in train: {list(unseen)}")
    matched_df['Intent_Taxonomy_Labels'] = encoded.astype(int)

    matches_output_path = os.path.join(output_dir, f'{key}_matches.csv')

    # Save the processed dataframe to CSV
    matched_df.to_csv(matches_output_path, index=False)

    print(f"{key} set:")
    print(f"Matched Meme_IDs with image paths and labels saved to {matches_output_path}")
    print(f"Classes and their corresponding labels:\n{dict(zip(classes, range(len(classes))))}\n")

In [None]:
# Reload the processed training split from disk and preview its first rows.
train_matches_csv = '/kaggle/working/train_matches.csv'
train_df = pd.read_csv(train_matches_csv)
train_df.head(10)

In [None]:
# Reload the processed test split from disk and preview its first rows.
test_matches_csv = '/kaggle/working/test_matches.csv'
test_df = pd.read_csv(test_matches_csv)
test_df.head(10)

In [None]:
# Reload the processed validation split from disk and preview its first rows.
validation_matches_csv = '/kaggle/working/validation_matches.csv'
validation_df = pd.read_csv(validation_matches_csv)
validation_df.head(10)

In [40]:
# Image preprocessing pipeline shared by every split.
# The normalisation constants are named so their role is visible at a glance.
normalize_mean = [0.485, 0.456, 0.406]
normalize_std = [0.229, 0.224, 0.225]

preprocessing_steps = [
    transforms.Resize(224),      # resize so the shorter side is 224 px
    transforms.CenterCrop(224),  # crop the center to 224x224
    transforms.ToTensor(),
    transforms.Normalize(mean=normalize_mean, std=normalize_std),
]
transform = transforms.Compose(preprocessing_steps)

class MyMultimodalDataset(Dataset):
    """Dataset yielding ``(image, caption, label)`` triples for one split.

    Parameters
    ----------
    image_paths : sequence of str
        Path of the image file for each sample.
    image_captions : sequence
        Caption text for each sample.
    intent_taxonomy_labels : sequence
        Encoded class label for each sample.
    transform : callable, optional
        Applied to the RGB ``PIL.Image`` before it is returned.

    Raises
    ------
    ValueError
        If the three sequences do not have the same length.
    """

    def __init__(self, image_paths, image_captions, intent_taxonomy_labels, transform=None):
        # Materialise the inputs as lists so __getitem__ always indexes by
        # position. A pandas Series indexes by label, which raises or returns
        # the wrong row as soon as its index is not 0..n-1 (e.g. after filtering).
        self.image_paths = list(image_paths)
        self.image_captions = list(image_captions)
        self.intent_taxonomy = list(intent_taxonomy_labels)
        if not (len(self.image_paths) == len(self.image_captions) == len(self.intent_taxonomy)):
            raise ValueError("image_paths, image_captions and intent_taxonomy_labels must have the same length")
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        text = self.image_captions[idx]
        label = self.intent_taxonomy[idx]

        # An empty CSV cell is read back as float NaN; hand the tokenizer an
        # empty string instead of a float it cannot process.
        if text is None or (isinstance(text, float) and text != text):
            text = ''

        # Load and preprocess image; the context manager closes the file handle
        # once the pixel data has been converted.
        with Image.open(img_path) as raw_image:
            image = raw_image.convert('RGB')
        if self.transform:
            image = self.transform(image)

        return image, text, label

In [54]:
def _dataset_from_frame(frame):
    """Wrap one split's path, caption and label columns in a MyMultimodalDataset."""
    return MyMultimodalDataset(
        frame['Image_Path'],
        frame['Image_Caption'],
        frame['Intent_Taxonomy_Labels'],
        transform=transform,
    )


# One dataset per split, all sharing the same image transform.
train_dataset = _dataset_from_frame(train_df)
val_dataset = _dataset_from_frame(validation_df)
test_dataset = _dataset_from_frame(test_df)

# Data loaders: only the training split is shuffled.
train_loader = DataLoader(train_dataset, batch_size=1, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=1, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

In [112]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision.models import densenet121  # Import inception_v3
from transformers import BertModel, BertTokenizer
from tqdm import tqdm
import torchvision.models as models
import time

# Build DenseNet-121 with IMAGENET1K_V1 weights and keep every child module
# except the last one (the classification layer), so it acts as a feature extractor.
# Note: this rebinds the name `densenet121`, shadowing the imported constructor.
densenet121 = torch.nn.Sequential(
    *list(models.densenet121(weights='IMAGENET1K_V1', progress=True).children())[:-1]
)

In [114]:
from transformers import BertTokenizer, BertModel,AdamW
# Load the tokenizer and encoder from the same pretrained checkpoint.
bert_checkpoint = 'bert-base-multilingual-cased'
bert_tokenizer = BertTokenizer.from_pretrained(bert_checkpoint)
bert_model = BertModel.from_pretrained(bert_checkpoint)

In [None]:
# Pick the compute device: CUDA when a GPU is visible, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print('Using device:', device)

In [None]:
# Move the image feature extractor onto the selected device; Module.to returns
# the module itself, so the cell still displays it.
densenet121 = densenet121.to(device)
densenet121

In [None]:
# Move the text encoder onto the selected device; Module.to returns the module
# itself, so the cell still displays it.
bert_model = bert_model.to(device)
bert_model

In [118]:
import torch
import time
from torch.optim import AdamW
from torchvision import transforms
from PIL import Image
from tqdm import tqdm

In [119]:
# One optimizer fine-tunes both backbones together; cross-entropy is the
# classification loss.
backbone_params = [*densenet121.parameters(), *bert_model.parameters()]
optimizer = AdamW(backbone_params, lr=2e-5)
criterion = torch.nn.CrossEntropyLoss()

In [None]:
import torch
import torch.nn as nn
from torch.optim import Adam
from tqdm import tqdm
import time

# Set models to evaluation mode
densenet121.eval()
bert_model.eval()

num_epochs = 1
num_classes = 6
max_seq_length = 100  # Set your desired maximum sequence length

train_losses = []
train_accuracies = []
val_losses = []
val_accuracies = []

start_time = time.time()

# Training loop
for epoch in range(num_epochs):
    running_train_loss = 0.0
    correct_train = 0
    total_train = 0

    for images, texts, labels in tqdm(train_loader, desc=f'Epoch {epoch + 1}/{num_epochs}', leave=False):
        # Move tensors to the device
        images = images.to(device)
        labels = labels.to(device)

        # Convert texts to tensors and pad to a fixed sequence length
        texts = [bert_tokenizer(text, padding='max_length', truncation=True, max_length=max_seq_length, return_tensors='pt') for text in texts]
        input_ids = torch.stack([text['input_ids'].squeeze(0) for text in texts], dim=0).to(device)
        attention_mask = torch.stack([text['attention_mask'].squeeze(0) for text in texts], dim=0).to(device)

        optimizer.zero_grad()

        img_feats = densenet121(images)
        img_feats = img_feats.squeeze()

        outputs = bert_model(input_ids, attention_mask=attention_mask)
        text_feats = outputs.last_hidden_state[:, 0, :]

        img_feats_reshaped = img_feats.view(img_feats.size(0), -1)  # Reshape img_feats

        # Separate classifiers for image and text features
        img_classifier = torch.nn.Sequential(
            torch.nn.Linear(img_feats_reshaped.shape[1], 512).to(device),
            torch.nn.ReLU(),
            torch.nn.Dropout(0.5),
            torch.nn.Linear(512, num_classes).to(device),
        )

        text_classifier = torch.nn.Sequential(
            torch.nn.Linear(text_feats.shape[1], 512).to(device),
            torch.nn.ReLU(),
            torch.nn.Dropout(0.5),
            torch.nn.Linear(512, num_classes).to(device),
        )

        # Get predictions for image and text modalities separately
        img_logits = img_classifier(img_feats_reshaped)
        text_logits = text_classifier(text_feats)

        # Combine predictions using a fusion technique (e.g., simple averaging)
        combined_logits = 0.5 * (img_logits + text_logits)  # Simple averaging

        # Ensure labels have the correct shape and type
        labels = labels.view(-1)  # Flatten labels to match batch size
        labels = labels.to(torch.long)  # Ensure labels are of type torch.long

        # Check if labels are empty
        if labels.numel() == 0:
            print(f"Skipping empty labels batch")
            continue

        # Adjust the shape of combined_logits to match the batch size of labels
        combined_logits = combined_logits.view(labels.size(0), -1)

        # Calculate loss
        loss = criterion(combined_logits, labels)

        # Backpropagation
        loss.backward()
        optimizer.step()

        running_train_loss += loss.item()

        # Calculate accuracy
        _, predicted = combined_logits.max(1)
        total_train += labels.size(0)
        correct_train += predicted.eq(labels).sum().item()

    epoch_train_loss = running_train_loss / len(train_loader)
    epoch_train_accuracy = correct_train / total_train

    train_losses.append(epoch_train_loss)
    train_accuracies.append(epoch_train_accuracy)

    # Validation loop
    running_val_loss = 0.0
    correct_val = 0
    total_val = 0

    with torch.no_grad():
        for val_images, val_texts, val_labels in val_loader:
            val_images = val_images.to(device)
            val_labels = val_labels.to(device)

            val_texts = [bert_tokenizer(text, padding='max_length', truncation=True, max_length=max_seq_length, return_tensors='pt') for text in val_texts]
            val_input_ids = torch.stack([text['input_ids'].squeeze(0) for text in val_texts], dim=0).to(device)
            val_attention_mask = torch.stack([text['attention_mask'].squeeze(0) for text in val_texts], dim=0).to(device)

            val_img_feats = densenet121(val_images)
            val_img_feats = val_img_feats.squeeze()

            val_outputs = bert_model(val_input_ids, attention_mask=val_attention_mask)
            val_text_feats = val_outputs.last_hidden_state[:, 0, :]

            val_img_feats_reshaped = val_img_feats.view(val_img_feats.size(0), -1)  # Reshape val_img_feats

            # Separate classifiers for image and text features
            val_img_classifier = torch.nn.Sequential(
                torch.nn.Linear(val_img_feats_reshaped.shape[1], 512).to(device),
                torch.nn.ReLU(),
                torch.nn.Dropout(0.5),
                torch.nn.Linear(512, num_classes).to(device),
            )

            val_text_classifier = torch.nn.Sequential(
                torch.nn.Linear(val_text_feats.shape[1], 512).to(device),
                torch.nn.ReLU(),
                torch.nn.Dropout(0.5),
                torch.nn.Linear(512, num_classes).to(device),
            )

            # Get predictions for image and text modalities separately
            val_img_logits = val_img_classifier(val_img_feats_reshaped)
            val_text_logits = val_text_classifier(val_text_feats)

            # Combine predictions using a fusion technique (e.g., simple averaging)
            val_combined_logits = 0.5 * (val_img_logits + val_text_logits)  # Simple averaging

            # Ensure val_labels have the correct shape and type
            val_labels = val_labels.view(-1)  # Flatten val_labels to match batch size
            val_labels = val_labels.to(torch.long)  # Ensure val_labels are of type torch.long


            # Check if val_labels are empty
            if val_labels.numel() == 0:
                print(f"Skipping empty validation labels batch")
                continue

            # Adjust the shape of val_combined_logits to match the batch size of val_labels
            val_combined_logits = val_combined_logits.view(val_labels.size(0), -1)

            # Calculate validation loss
            val_loss = criterion(val_combined_logits, val_labels)

            running_val_loss += val_loss.item()

            # Calculate validation accuracy
            _, val_predicted = val_combined_logits.max(1)
            total_val += val_labels.size(0)
            correct_val += val_predicted.eq(val_labels).sum().item()

    epoch_val_loss = running_val_loss / len(val_loader)
    epoch_val_accuracy = correct_val / total_val

    val_losses.append(epoch_val_loss)
    val_accuracies.append(epoch_val_accuracy)

    print(f"Epoch [{epoch + 1}/{num_epochs}] - "
          f"Train Loss: {epoch_train_loss:.4f}, Train Acc: {epoch_train_accuracy:.4f}, "
          f"Val Loss: {epoch_val_loss:.4f}, Val Acc: {epoch_val_accuracy:.4f}")

end_time = time.time()
execution_time = end_time - start_time
print(f"Total execution time: {execution_time:.2f} seconds")


In [None]:
test_losses = []
test_accuracies = []
predicted_labels = []
true_labels = []

# Test loop
with torch.no_grad():
    for test_images, test_texts, test_labels in test_loader:
        test_images = test_images.to(device)
        test_labels = test_labels.to(device)

        test_texts = [bert_tokenizer(text, padding='max_length', truncation=True, max_length=max_seq_length, return_tensors='pt') for text in test_texts]
        test_input_ids = torch.stack([text['input_ids'].squeeze(0) for text in test_texts], dim=0).to(device)
        test_attention_mask = torch.stack([text['attention_mask'].squeeze(0) for text in test_texts], dim=0).to(device)

        test_img_feats = densenet121(test_images)
        test_img_feats = test_img_feats.squeeze()

        test_outputs = bert_model(test_input_ids, attention_mask=test_attention_mask)
        test_text_feats = test_outputs.last_hidden_state[:, 0, :]

        test_img_feats_reshaped = test_img_feats.view(test_img_feats.size(0), -1)  # Reshape test_img_feats

        # Separate classifiers for image and text features
        test_img_classifier = torch.nn.Sequential(
            torch.nn.Linear(test_img_feats_reshaped.shape[1], 512).to(device),
            torch.nn.ReLU(),
            torch.nn.Dropout(0.5),
            torch.nn.Linear(512, num_classes).to(device),
        )

        test_text_classifier = torch.nn.Sequential(
            torch.nn.Linear(test_text_feats.shape[1], 512).to(device),
            torch.nn.ReLU(),
            torch.nn.Dropout(0.5),
            torch.nn.Linear(512, num_classes).to(device),
        )

        # Get predictions for image and text modalities separately
        test_img_logits = test_img_classifier(test_img_feats_reshaped)
        test_text_logits = test_text_classifier(test_text_feats)

        # Combine predictions using a fusion technique (e.g., simple averaging)
        test_combined_logits = 0.5 * (test_img_logits + test_text_logits)  # Simple averaging

        # Ensure test_labels have the correct shape and type
        test_labels = test_labels.view(-1)  # Flatten test_labels to match batch size
        test_labels = test_labels.to(torch.long)  # Ensure test_labels are of type torch.long

        # Check if test_labels are empty
        if test_labels.numel() == 0:
            print(f"Skipping empty test labels batch")
            continue

        # Adjust the shape of test_combined_logits to match the batch size of test_labels
        test_combined_logits = test_combined_logits.view(test_labels.size(0), -1)

        # Calculate test loss
        test_loss = criterion(test_combined_logits, test_labels)
        test_losses.append(test_loss.item())

        # Calculate test accuracy
        _, test_predicted = test_combined_logits.max(1)
        test_accuracy = (test_predicted == test_labels).sum().item() / test_labels.size(0)
        test_accuracies.append(test_accuracy)

        # Store predicted and true labels for further evaluation
        predicted_labels.extend(test_predicted.cpu().numpy())
        true_labels.extend(test_labels.cpu().numpy())

# Calculate evaluation metrics (e.g., precision, recall, F1-score)
from sklearn.metrics import accuracy_score, precision_recall_fscore_support

test_accuracy = accuracy_score(true_labels, predicted_labels)
precision, recall, f1, _ = precision_recall_fscore_support(true_labels, predicted_labels, average='weighted')

print(f"Test Accuracy: {test_accuracy:.4f}")
print(f"Precision: {precision:.4f}, Recall: {recall:.4f}, F1-score: {f1:.4f}")


In [None]:
test_losses = []
test_accuracies = []
predicted_labels = []
true_labels = []

# Initialize a confusion matrix
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np

conf_matrix = np.zeros((num_classes, num_classes), dtype=int)

# Test loop
with torch.no_grad():
    for test_images, test_texts, test_labels in test_loader:
        test_images = test_images.to(device)
        test_labels = test_labels.to(device)

        test_texts = [bert_tokenizer(text, padding='max_length', truncation=True, max_length=max_seq_length, return_tensors='pt') for text in test_texts]
        test_input_ids = torch.stack([text['input_ids'].squeeze(0) for text in test_texts], dim=0).to(device)
        test_attention_mask = torch.stack([text['attention_mask'].squeeze(0) for text in test_texts], dim=0).to(device)

        test_img_feats = densenet121(test_images)
        test_img_feats = test_img_feats.squeeze()

        test_outputs = bert_model(test_input_ids, attention_mask=test_attention_mask)
        test_text_feats = test_outputs.last_hidden_state[:, 0, :]

        test_img_feats_reshaped = test_img_feats.view(test_img_feats.size(0), -1)  # Reshape test_img_feats

        # Separate classifiers for image and text features
        test_img_classifier = torch.nn.Sequential(
            torch.nn.Linear(test_img_feats_reshaped.shape[1], 512).to(device),
            torch.nn.ReLU(),
            torch.nn.Dropout(0.5),
            torch.nn.Linear(512, num_classes).to(device),
        )

        test_text_classifier = torch.nn.Sequential(
            torch.nn.Linear(test_text_feats.shape[1], 512).to(device),
            torch.nn.ReLU(),
            torch.nn.Dropout(0.5),
            torch.nn.Linear(512, num_classes).to(device),
        )

        # Get predictions for image and text modalities separately
        test_img_logits = test_img_classifier(test_img_feats_reshaped)
        test_text_logits = test_text_classifier(test_text_feats)

        # Combine predictions using a fusion technique (e.g., simple averaging)
        test_combined_logits = 0.5 * (test_img_logits + test_text_logits)  # Simple averaging

        # Ensure test_labels have the correct shape and type
        test_labels = test_labels.view(-1)  # Flatten test_labels to match batch size
        test_labels = test_labels.to(torch.long)  # Ensure test_labels are of type torch.long

        # Check if test_labels are empty
        if test_labels.numel() == 0:
            print(f"Skipping empty test labels batch")
            continue

        # Adjust the shape of test_combined_logits to match the batch size of test_labels
        test_combined_logits = test_combined_logits.view(test_labels.size(0), -1)

        # Calculate test loss
        test_loss = criterion(test_combined_logits, test_labels)
        test_losses.append(test_loss.item())

        # Calculate test accuracy
        _, test_predicted = test_combined_logits.max(1)
        test_accuracy = (test_predicted == test_labels).sum().item() / test_labels.size(0)
        test_accuracies.append(test_accuracy)

        # Update confusion matrix
        conf_matrix += confusion_matrix(test_labels.cpu().numpy(), test_predicted.cpu().numpy(), labels=np.arange(num_classes))

        # Store predicted and true labels for further evaluation
        predicted_labels.extend(test_predicted.cpu().numpy())
        true_labels.extend(test_labels.cpu().numpy())

# Calculate evaluation metrics (e.g., precision, recall, F1-score)
from sklearn.metrics import accuracy_score, precision_recall_fscore_support

test_accuracy = accuracy_score(true_labels, predicted_labels)
precision, recall, f1, _ = precision_recall_fscore_support(true_labels, predicted_labels, average='weighted')

print(f"Test Accuracy: {test_accuracy:.4f}")
print(f"Precision: {precision:.4f}, Recall: {recall:.4f}, F1-score: {f1:.4f}")



In [None]:
# Draw the confusion matrix as an annotated heatmap with integer class ids on both axes.
fig, ax = plt.subplots(figsize=(8, 6))
class_ids = np.arange(num_classes)
sns.heatmap(
    conf_matrix,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=class_ids,
    yticklabels=class_ids,
    ax=ax,
)
ax.set_xlabel('Predicted labels')
ax.set_ylabel('True labels')
ax.set_title('Confusion Matrix')
plt.show()

In [None]:
# Same heatmap, but with class names on the axes instead of integer ids.
# The names are listed in the order of the label encoding (position i = label i).
class_names = ['Advocative', 'Controversial', 'ExhIbitionist', 'Expressive', 'Informative', 'Promotive']

fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(
    conf_matrix,
    annot=True,
    cmap='Blues',
    fmt='d',
    xticklabels=class_names,
    yticklabels=class_names,
    ax=ax,
)
ax.set_xlabel('Predicted labels')
ax.set_ylabel('True labels')
ax.set_title('Confusion Matrix')
plt.show()