In [None]:
import pandas as pd
import os
from sklearn.preprocessing import LabelEncoder
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
from transformers import BertTokenizer, BertModel, AdamW
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix
import torchvision.models as models
from torch import nn
import time 
import seaborn as sns
import matplotlib.pyplot as plt
from tqdm import tqdm

# Dataset layout: one annotation CSV and one image folder per split.
# Both mappings share the keys 'train', 'test' and 'validation'.
csv_paths = dict(
    train='/kaggle/input/fake-news/Train.csv',
    test='/kaggle/input/fake-news/Test.csv',
    validation='/kaggle/input/fake-news/Val.csv',
)

image_dirs = dict(
    train='/kaggle/input/fake-news/train',
    test='/kaggle/input/fake-news/test',
    validation='/kaggle/input/fake-news/validation',
)

# Directory that receives the per-split "<split>_matches.csv" files
output_dir = '/kaggle/working/'

# Function to match each row's image_id to a file in image_dir, and add image paths
def check_matches(csv_path, image_dir):
    """Load a split's CSV and attach the path of the image belonging to each row.

    Args:
        csv_path: Path of a CSV file that contains an ``image_id`` column.
        image_dir: Directory holding the image files of the same split.

    Returns:
        The DataFrame read from ``csv_path`` with an extra ``Image_Path``
        column. A row gets the full path of the file whose name without
        extension (or whose full file name) equals its ``image_id``; rows
        with no matching file get ``None``.
    """
    # Read the CSV file into a DataFrame
    df = pd.read_csv(csv_path)

    # Index the regular files of the directory twice: by name without extension
    # and by full file name. Sub-directories are skipped, and the listing is
    # sorted so that duplicate stems (a.jpg / a.png) resolve deterministically.
    by_stem = {}
    by_filename = {}
    for image_file in sorted(os.listdir(image_dir)):
        full_path = os.path.join(image_dir, image_file)
        if not os.path.isfile(full_path):
            continue
        by_stem[os.path.splitext(image_file)[0]] = full_path
        by_filename[image_file] = full_path

    def _find_image(image_id):
        # Missing ids can never match a file
        if pd.isna(image_id):
            return None
        # pandas parses purely numeric ids as integers while file names are
        # always strings, so compare on the string form.
        key = str(image_id).strip()
        if key in by_stem:
            return by_stem[key]
        # Also accept ids that were stored together with their extension
        return by_filename.get(key)

    # Add an Image_Path column to the DataFrame
    df['Image_Path'] = df['image_id'].apply(_find_image)

    return df

# Function to turn the values of the 'label' column into integer class ids
def encode_labels(df):
    """Replace ``df['label']`` with integer codes from a freshly fitted LabelEncoder.

    The column is overwritten on the DataFrame that was passed in.

    Returns:
        A ``(df, classes)`` tuple: the same DataFrame object and the encoder's
        ``classes_`` array, i.e. the original label values.
    """
    encoder = LabelEncoder()
    encoded_column = encoder.fit_transform(df['label'])
    df['label'] = encoded_column
    original_classes = encoder.classes_
    return df, original_classes

# For every split: attach image paths to its CSV rows and write the result
# to "<split>_matches.csv" inside output_dir.
for key, csv_path in csv_paths.items():
    matched_df = check_matches(csv_path, image_dirs[key])

    # Save the processed dataframe to CSV (row index is not written)
    matches_output_path = os.path.join(output_dir, f'{key}_matches.csv')
    matched_df.to_csv(matches_output_path, index=False)

    # Two-line status report per split
    print(
        f"{key} set:",
        f"Matched image_ids with image paths and labels saved to {matches_output_path}",
        sep="\n",
    )

In [None]:
# Reload the training split written by the matching step and preview its first 10 rows.
train_df = pd.read_csv('/kaggle/working/train_matches.csv')
train_df.head(10)

In [None]:
# Reload the test split written by the matching step and preview its first 10 rows.
test_df = pd.read_csv('/kaggle/working/test_matches.csv')
test_df.head(10)

In [None]:
# Reload the validation split written by the matching step and preview its first 10 rows.
validation_df = pd.read_csv('/kaggle/working/validation_matches.csv')
validation_df.head(10)

In [None]:
# Image preprocessing pipeline; the steps run in list order.
_preprocess_steps = [
    transforms.Resize(224),
    transforms.CenterCrop(224),  # Crop the center to 224x224
    transforms.ToTensor(),
    # Per-channel normalisation with fixed mean / std values
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
]
transform = transforms.Compose(_preprocess_steps)

class MyMultimodalDataset(Dataset):
    """Dataset yielding ``(image, text, label)`` triples for image + text classification.

    Args:
        image_paths: Sequence (list, array, pandas Series, ...) of image file paths.
        description: Sequence of texts, aligned with ``image_paths``.
        label: Sequence of labels, aligned with ``image_paths``.
        transform: Optional callable applied to the loaded RGB PIL image.

    Raises:
        ValueError: If the three sequences do not have the same length.
    """

    def __init__(self, image_paths, description, label, transform=None):
        # Materialise the inputs as plain lists so that __getitem__ always
        # indexes by position. Indexing a pandas Series with an integer is
        # label-based and breaks as soon as the index is not 0..n-1
        # (for example after filtering rows).
        self.image_paths = list(image_paths)
        self.description = list(description)
        self.label = list(label)
        self.transform = transform

        if not (len(self.image_paths) == len(self.description) == len(self.label)):
            raise ValueError(
                "image_paths, description and label must have the same length, got "
                f"{len(self.image_paths)}, {len(self.description)} and {len(self.label)}"
            )

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return the ``(image, text, label)`` triple stored at position ``idx``.

        Raises:
            ValueError: If the stored image path is not a string (e.g. a row
                for which no image file was found).
        """
        img_path = self.image_paths[idx]
        text = self.description[idx]
        label = self.label[idx]

        # Ensure img_path is a string
        if not isinstance(img_path, str):
            raise ValueError(f"Invalid image path at index {idx}: {img_path}")

        # A missing description (None / NaN) becomes an empty string and any
        # other non-string value is stringified, so every batch holds only str.
        if text is None or (isinstance(text, float) and text != text):
            text = ''
        elif not isinstance(text, str):
            text = str(text)

        # Load and preprocess image; the context manager closes the file
        # handle once the pixel data has been converted to RGB.
        with Image.open(img_path) as raw_image:
            image = raw_image.convert('RGB')
        if self.transform:
            image = self.transform(image)

        return image, text, label

In [None]:
# One dataset per split, built from the Image_Path / description / label
# columns of the corresponding frame; all splits share the same transform.
train_dataset = MyMultimodalDataset(
    image_paths=train_df['Image_Path'],
    description=train_df['description'],
    label=train_df['label'],
    transform=transform,
)
val_dataset = MyMultimodalDataset(
    image_paths=validation_df['Image_Path'],
    description=validation_df['description'],
    label=validation_df['label'],
    transform=transform,
)
test_dataset = MyMultimodalDataset(
    image_paths=test_df['Image_Path'],
    description=test_df['description'],
    label=test_df['label'],
    transform=transform,
)

# Batches of 10; only the training loader shuffles its samples.
train_loader = DataLoader(dataset=train_dataset, batch_size=10, shuffle=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=10, shuffle=False)
test_loader = DataLoader(dataset=test_dataset, batch_size=10, shuffle=False)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import BertModel, BertTokenizer,AdamW
from tqdm import tqdm
import torchvision.models as models
import time
from torchvision.models import resnet152

In [None]:
# Prefer the GPU when CUDA is usable; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print('Using device:', device)

In [None]:
# ResNet-152 with IMAGENET1K_V1 weights, used as an image feature extractor.
# NOTE: the name `resnet152` is also imported as a constructor from
# torchvision.models in the imports cell; this assignment rebinds it to the model.
_pretrained_resnet = models.resnet152(weights='IMAGENET1K_V1', progress=True)
# Keep every child module except the last one (the classification layer)
_feature_layers = list(_pretrained_resnet.children())[:-1]
resnet152 = torch.nn.Sequential(*_feature_layers)

In [None]:
# Multilingual BERT: tokenizer and encoder are loaded from the same checkpoint
_bert_checkpoint = 'bert-base-multilingual-cased'
bert_tokenizer = BertTokenizer.from_pretrained(_bert_checkpoint)
bert_model = BertModel.from_pretrained(_bert_checkpoint)

# Alternative text encoder, kept for reference (disabled):
# from transformers import AutoTokenizer, XLMRobertaModel, AdamW
# bert_tokenizer = AutoTokenizer.from_pretrained("xlm-roberta-base")
# bert_model = XLMRobertaModel.from_pretrained("xlm-roberta-base")

In [None]:
# Move the image feature extractor to the selected device; as the cell's last
# expression, the returned module is also displayed.
resnet152.to(device)

In [None]:
# Move the text encoder to the selected device; as the cell's last expression,
# the returned module is also displayed.
bert_model.to(device)

In [None]:
import torch
import time
from torch.optim import AdamW
from torchvision import transforms
from PIL import Image
from tqdm import tqdm
import torch.nn as nn

In [None]:
# Optimizer over the parameters of both encoders, plus the classification loss.
# NOTE: `optimizer` and `criterion` are assigned again at the top of the
# training cell (there including the classifier head) before either is used.
_encoder_params = [*resnet152.parameters(), *bert_model.parameters()]
optimizer = AdamW(_encoder_params, lr=2e-5)
criterion = torch.nn.CrossEntropyLoss()

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm

class CombinedClassifier(nn.Module):
    """MLP head that classifies the concatenation of image and text features.

    Args:
        img_feature_dim: Size of the last dimension of the image features.
        text_feature_dim: Size of the last dimension of the text features.
        num_classes: Number of output logits.
    """

    def __init__(self, img_feature_dim, text_feature_dim, num_classes):
        super().__init__()
        fused_dim = img_feature_dim + text_feature_dim
        # fused -> 1024 -> 512 -> num_classes, ReLU + dropout after each hidden layer
        layers = [
            nn.Linear(fused_dim, 1024),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, num_classes),
        ]
        self.shared_fc = nn.Sequential(*layers)

    def forward(self, img_features, text_features):
        """Concatenate both inputs along the last dimension and return the logits."""
        fused = torch.cat([img_features, text_features], dim=-1)
        return self.shared_fc(fused)


# Fusion head on top of the image features (2048-d after flattening) and the
# text features (768-d), predicting 2 classes
combined_classifier = CombinedClassifier(img_feature_dim=2048, text_feature_dim=768, num_classes=2).to(device)

# Define optimizer and criterion.
# The image backbone is only ever run under torch.no_grad(), so it never
# receives gradients; only BERT and the fusion head are optimised. (Parameters
# without gradients are skipped by the optimizer anyway, so leaving the
# backbone out does not change the updates.)
optimizer = optim.AdamW(list(bert_model.parameters()) + list(combined_classifier.parameters()), lr=1e-5)
criterion = nn.CrossEntropyLoss()

# Set number of epochs and other parameters
num_epochs = 35
max_seq_length = 80  # texts are padded / truncated to this many tokens

# Per-epoch history
train_losses = []
train_accuracies = []
val_losses = []
val_accuracies = []


def _forward_batch(images, texts, labels):
    """Run one batch through both encoders and the fusion head.

    Args:
        images: Image tensor batch from the DataLoader.
        texts: Sequence of strings of the same batch.
        labels: Label tensor batch from the DataLoader.

    Returns:
        ``(logits, labels)`` where ``labels`` has been moved to ``device`` and
        flattened to match the batch dimension of ``logits``.
    """
    images = images.to(device)
    labels = labels.to(device).view(-1)

    # Frozen image branch: no gradients are tracked for the backbone
    with torch.no_grad():
        img_feats = resnet152(images)
    img_feats = img_feats.view(img_feats.size(0), -1)

    # Tokenize the whole batch in one call, padded to a fixed sequence length
    encoded = bert_tokenizer(list(texts), padding='max_length', truncation=True,
                             max_length=max_seq_length, return_tensors='pt')
    input_ids = encoded['input_ids'].to(device)
    attention_mask = encoded['attention_mask'].to(device)

    outputs = bert_model(input_ids, attention_mask=attention_mask)
    # Hidden state of the first token is used as the text representation
    text_feats = outputs.last_hidden_state[:, 0, :]

    # Early fusion: concatenate raw image and text features
    logits = combined_classifier(img_feats, text_feats)
    return logits, labels


start_time = time.time()

# Training loop
for epoch in range(num_epochs):
    # The backbone is frozen, so it stays in eval mode: in train mode its
    # BatchNorm layers would normalise with per-batch statistics and keep
    # rewriting their running statistics, making training-time features differ
    # from the ones seen during validation and testing.
    resnet152.eval()
    bert_model.train()
    combined_classifier.train()

    running_train_loss = 0.0
    correct_train = 0
    total_train = 0

    for images, texts, labels in tqdm(train_loader, desc=f'Epoch {epoch + 1}/{num_epochs}', leave=False):
        optimizer.zero_grad()

        combined_logits, labels = _forward_batch(images, texts, labels)
        loss = criterion(combined_logits, labels)

        loss.backward()
        optimizer.step()

        # Weight the batch-mean loss by the batch size so that a smaller final
        # batch does not skew the epoch average
        n_samples = labels.size(0)
        running_train_loss += loss.item() * n_samples
        _, predicted = combined_logits.max(1)
        total_train += n_samples
        correct_train += predicted.eq(labels).sum().item()

    # max(..., 1) avoids a ZeroDivisionError when the loader is empty
    epoch_train_loss = running_train_loss / max(total_train, 1)
    epoch_train_accuracy = correct_train / max(total_train, 1)

    train_losses.append(epoch_train_loss)
    train_accuracies.append(epoch_train_accuracy)

    # Validation loop
    resnet152.eval()
    bert_model.eval()
    combined_classifier.eval()

    running_val_loss = 0.0
    correct_val = 0
    total_val = 0

    with torch.no_grad():
        for val_images, val_texts, val_labels in val_loader:
            val_combined_logits, val_labels = _forward_batch(val_images, val_texts, val_labels)
            val_loss = criterion(val_combined_logits, val_labels)

            n_val_samples = val_labels.size(0)
            running_val_loss += val_loss.item() * n_val_samples
            _, val_predicted = val_combined_logits.max(1)
            total_val += n_val_samples
            correct_val += val_predicted.eq(val_labels).sum().item()

    epoch_val_loss = running_val_loss / max(total_val, 1)
    epoch_val_accuracy = correct_val / max(total_val, 1)

    val_losses.append(epoch_val_loss)
    val_accuracies.append(epoch_val_accuracy)

    print(f"Epoch [{epoch + 1}/{num_epochs}] - "
          f"Train Loss: {epoch_train_loss:.4f}, Train Acc: {epoch_train_accuracy:.4f}, "
          f"Val Loss: {epoch_val_loss:.4f}, Val Acc: {epoch_val_accuracy:.4f}")

end_time = time.time()
execution_time = end_time - start_time
print(f"Total execution time: {execution_time:.2f} seconds")


In [None]:
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
predicted_labels = []
true_labels = []

# Set the models to evaluation mode
resnet152.eval()
bert_model.eval()
combined_classifier.eval()

with torch.no_grad():
    for test_images, test_texts, test_labels in tqdm(test_loader, desc='Testing', leave=False):
        # Only the images need to be on the device; the labels are just collected
        test_images = test_images.to(device)

        # Extract image features using resnet152
        test_img_feats = resnet152(test_images)
        test_img_feats = test_img_feats.view(test_img_feats.size(0), -1)

        # Tokenize the whole batch in one call, padded to a fixed sequence length
        test_encoded = bert_tokenizer(list(test_texts), padding='max_length', truncation=True,
                                      max_length=max_seq_length, return_tensors='pt')
        test_input_ids = test_encoded['input_ids'].to(device)
        test_attention_mask = test_encoded['attention_mask'].to(device)

        test_outputs = bert_model(test_input_ids, attention_mask=test_attention_mask)
        # Hidden state of the first token is used as the text representation
        test_text_feats = test_outputs.last_hidden_state[:, 0, :]

        # Early fusion: concatenate raw image and text features
        test_combined_logits = combined_classifier(test_img_feats, test_text_feats)

        # Get the predicted labels
        _, test_predicted = test_combined_logits.max(1)

        # Store predicted and true labels (true labels flattened to one per sample)
        predicted_labels.extend(test_predicted.cpu().numpy())
        true_labels.extend(test_labels.view(-1).cpu().numpy())

# Convert lists to numpy arrays for further analysis
predicted_labels = np.array(predicted_labels)
true_labels = np.array(true_labels)

# Class ids 0..num_classes-1, read from the width of the classifier's last layer.
# Pinning them keeps the confusion matrix num_classes x num_classes even when
# a class is absent from both the test labels and the predictions.
class_ids = list(range(combined_classifier.shared_fc[-1].out_features))

# Calculate metrics. zero_division=0 scores a class without predictions as 0
# instead of emitting an undefined-metric warning.
accuracy = accuracy_score(true_labels, predicted_labels)
precision = precision_score(true_labels, predicted_labels, average='weighted', zero_division=0)
recall = recall_score(true_labels, predicted_labels, average='weighted', zero_division=0)
f1 = f1_score(true_labels, predicted_labels, average='weighted', zero_division=0)
conf_matrix = confusion_matrix(true_labels, predicted_labels, labels=class_ids)

# Print metrics
print(f"Test Accuracy: {accuracy:.4f}")
print(f"Test Precision: {precision:.4f}")
print(f"Test Recall: {recall:.4f}")
print(f"Test F1 Score: {f1:.4f}")


In [None]:
# Display the raw confusion-matrix counts computed in the evaluation cell.
conf_matrix

In [None]:
# Plot confusion matrix as an annotated heatmap
# Tick labels, in the order of the matrix rows / columns.
# NOTE(review): this assumes class id 0 means 'Real' and 1 means 'Fake';
# nothing in this notebook establishes that mapping - confirm against the dataset.
class_names = ['Real', 'Fake']

fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(
    conf_matrix,
    annot=True,
    cmap='Blues',
    fmt='d',
    xticklabels=class_names,
    yticklabels=class_names,
    ax=ax,
)
ax.set_xlabel('Predicted labels')
ax.set_ylabel('True labels')
ax.set_title('Confusion Matrix')
plt.show()