In [12]:
import torch
import torch.nn as nn
import torch.optim as optim
from transformers import BertTokenizer, BertModel
from torchvision import models, transforms
from PIL import Image
import pandas as pd
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
import numpy as np

# Load and Split Dataset
df = pd.read_csv('politifact_cleaned_filtered.csv')
# Drop rows with missing values
df = df.dropna(subset=['Claim', 'Image Filename', 'Rating'])

train_df, temp_df = train_test_split(df, test_size=0.3, random_state=42, stratify=df['Rating'])
val_df, test_df = train_test_split(temp_df, test_size=0.5, random_state=42, stratify=temp_df['Rating'])

train_df.to_csv('train_data.csv', index=False)
val_df.to_csv('val_data.csv', index=False)
test_df.to_csv('test_data.csv', index=False)

# Load BERT Tokenizer
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# Define Text + Image Dataset
class FakeNewsDataset(Dataset):
    def __init__(self, csv_file, image_dir, tokenizer, max_length=128):
        self.data = pd.read_csv(csv_file)
        self.image_dir = image_dir
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        text = self.data.iloc[idx]['Claim']
        image_filename = self.data.iloc[idx]['Image Filename']
        label = 1 if self.data.iloc[idx]['Rating'] == 'True' else 0
        
        encoding = self.tokenizer(text, padding='max_length', truncation=True, max_length=self.max_length, return_tensors='pt')
        input_ids = encoding['input_ids'].squeeze(0)
        attention_mask = encoding['attention_mask'].squeeze(0)
        
        image_path = f"{self.image_dir}/{image_filename}"
        try:
            image = Image.open(image_path).convert('RGB')
            image = self.transform(image)
        except:
            image = torch.zeros(3, 224, 224)  # Handle missing images
        
        return input_ids, attention_mask, image, torch.tensor(label, dtype=torch.long)

# Define the Multimodal Model
class MultimodalFakeNewsModel(nn.Module):
    def __init__(self):
        super(MultimodalFakeNewsModel, self).__init__()
        
        # BERT for text processing
        self.bert = BertModel.from_pretrained('bert-base-uncased')
        self.text_fc = nn.Linear(768, 256)
        
        # ResNet for image processing
        self.resnet = models.resnet50(pretrained=True)
        self.resnet.fc = nn.Identity()  # Remove final classification layer
        self.image_fc = nn.Linear(2048, 256)
        
        # Fusion and classification
        self.fc1 = nn.Linear(512, 128)
        self.fc2 = nn.Linear(128, 1)
        self.dropout = nn.Dropout(0.3)
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()
    
    def forward(self, input_ids, attention_mask, images):
        # Text embedding
        text_out = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        text_feat = self.text_fc(text_out.pooler_output)
        text_feat = self.relu(text_feat)
        
        # Image embedding
        image_feat = self.resnet(images)
        image_feat = self.image_fc(image_feat)
        image_feat = self.relu(image_feat)
        
        # Concatenate text and image features
        combined = torch.cat((text_feat, image_feat), dim=1)
        combined = self.relu(self.fc1(combined))
        combined = self.dropout(combined)
        output = self.sigmoid(self.fc2(combined))
        
        return output.squeeze(1)

# Training Setup
def train(model, train_dataloader, val_dataloader, criterion, optimizer, device, epochs=5):
    model.to(device)
    model.train()
    
    for epoch in range(epochs):
        total_loss = 0
        for input_ids, attention_mask, images, labels in train_dataloader:
            input_ids, attention_mask, images, labels = input_ids.to(device), attention_mask.to(device), images.to(device), labels.to(device, dtype=torch.float)
            
            optimizer.zero_grad()
            outputs = model(input_ids, attention_mask, images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        
        print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss/len(train_dataloader)}")
        
        # Evaluate on validation set
def evaluate(model, dataloader, device):
    model.to(device)
    model.eval()
    
    all_preds = []
    all_labels = []
    
    with torch.no_grad():
        for input_ids, attention_mask, images, labels in dataloader:
            input_ids, attention_mask, images, labels = (
                input_ids.to(device), 
                attention_mask.to(device), 
                images.to(device), 
                labels.to(device, dtype=torch.float)
            )

            outputs = model(input_ids, attention_mask, images)
            preds = (outputs > 0.5).cpu().numpy()
            all_preds.extend(preds)
            all_labels.extend(labels.cpu().numpy())

    return accuracy_score(all_labels, all_preds)

# Load Data and Train
if __name__ == "__main__":
    train_dataset = FakeNewsDataset('train_data.csv', 'path/to/images', tokenizer)
    train_dataloader = DataLoader(train_dataset, batch_size=16, shuffle=True)
    
    val_dataset = FakeNewsDataset('val_data.csv', 'path/to/images', tokenizer)
    val_dataloader = DataLoader(val_dataset, batch_size=16, shuffle=False)
    
    test_dataset = FakeNewsDataset('test_data.csv', 'path/to/images', tokenizer)
    test_dataloader = DataLoader(test_dataset, batch_size=16, shuffle=False)
    
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = MultimodalFakeNewsModel()
    
    criterion = nn.BCELoss()
    optimizer = optim.Adam(model.parameters(), lr=2e-5)
    
    train(model, train_dataloader, val_dataloader, criterion, optimizer, device, epochs=5)
    
    test_accuracy = evaluate(model, test_dataloader, device)
    print(f"Test Accuracy: {test_accuracy}")




Epoch 1/5, Loss: 0.5094402459534731
Epoch 2/5, Loss: 0.3402996063232422
Epoch 3/5, Loss: 0.2313016178933057
Epoch 4/5, Loss: 0.1529625044627623
Epoch 5/5, Loss: 0.10971758785572919
Test Accuracy: 1.0


In [13]:
# Save the trained model after training
torch.save(model.state_dict(), "multimodal_fake_news.pth")
print("Model saved successfully.")


Model saved successfully.


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from transformers import BertTokenizer, BertModel
from torchvision import models, transforms
from PIL import Image
import pandas as pd
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
import numpy as np

# Load and Split Dataset
df = pd.read_csv('politifact_cleaned_filtered.csv')
df = df.dropna(subset=['Claim', 'Image Filename', 'Rating'])  # Handle NaN values
train_df, temp_df = train_test_split(df, test_size=0.3, random_state=42, stratify=df['Rating'])
val_df, test_df = train_test_split(temp_df, test_size=0.5, random_state=42, stratify=temp_df['Rating'])

train_df.to_csv('train_data.csv', index=False)
val_df.to_csv('val_data.csv', index=False)
test_df.to_csv('test_data.csv', index=False)

# Load BERT Tokenizer
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# Define Text + Image Dataset
class FakeNewsDataset(Dataset):
    def __init__(self, csv_file, image_dir, tokenizer, max_length=128):
        self.data = pd.read_csv(csv_file)
        self.image_dir = image_dir
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        text = self.data.iloc[idx]['Claim']
        image_filename = self.data.iloc[idx]['Image Filename']
        label = 1 if self.data.iloc[idx]['Rating'] == 'True' else 0
        
        encoding = self.tokenizer(text, padding='max_length', truncation=True, max_length=self.max_length, return_tensors='pt')
        input_ids = encoding['input_ids'].squeeze(0)
        attention_mask = encoding['attention_mask'].squeeze(0)
        
        image_path = f"{self.image_dir}/{image_filename}"
        try:
            image = Image.open(image_path).convert('RGB')
            image = self.transform(image)
        except:
            image = torch.zeros(3, 224, 224)  # Handle missing images
        
        return input_ids, attention_mask, image, torch.tensor(label, dtype=torch.long)

# Define the Multimodal Model
class MultimodalFakeNewsModel(nn.Module):
    def __init__(self):
        super(MultimodalFakeNewsModel, self).__init__()
        
        # BERT for text processing
        self.bert = BertModel.from_pretrained('bert-base-uncased')
        self.text_fc = nn.Linear(768, 256)
        
        # ResNet for image processing
        self.resnet = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)
        self.resnet.fc = nn.Identity()  # Remove final classification layer
        self.image_fc = nn.Linear(2048, 256)
        
        # Fusion and classification
        self.fc1 = nn.Linear(512, 128)
        self.fc2 = nn.Linear(128, 1)
        self.dropout = nn.Dropout(0.5)
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()
    
    def forward(self, input_ids, attention_mask, images):
        # Text embedding
        text_out = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        text_feat = self.text_fc(text_out.pooler_output)
        text_feat = self.relu(text_feat)
        
        # Image embedding
        image_feat = self.resnet(images)
        image_feat = self.image_fc(image_feat)
        image_feat = self.relu(image_feat)
        
        # Concatenate text and image features
        combined = torch.cat((text_feat, image_feat), dim=1)
        combined = self.relu(self.fc1(combined))
        combined = self.dropout(combined)
        output = self.sigmoid(self.fc2(combined))
        
        return output.squeeze(1)

# Evaluation Function with Class Accuracy
def evaluate(model, dataloader, device):
    model.to(device)
    model.eval()
    
    all_preds = []
    all_labels = []
    
    with torch.no_grad():
        for input_ids, attention_mask, images, labels in dataloader:
            input_ids, attention_mask, images, labels = (
                input_ids.to(device), 
                attention_mask.to(device), 
                images.to(device), 
                labels.to(device, dtype=torch.float)
            )

            outputs = model(input_ids, attention_mask, images)
            preds = (outputs > 0.5).cpu().numpy()
            all_preds.extend(preds)
            all_labels.extend(labels.cpu().numpy())

    all_preds = np.array(all_preds)
    all_labels = np.array(all_labels)

    accuracy = accuracy_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds)
    recall = recall_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds)
    conf_matrix = confusion_matrix(all_labels, all_preds)
    class_accuracy = conf_matrix.diagonal() / conf_matrix.sum(axis=1)

    return {
        "Accuracy": accuracy,
        "Precision": precision,
        "Recall": recall,
        "F1 Score": f1,
        "Confusion Matrix": conf_matrix.tolist(),
        "Class Accuracy": class_accuracy.tolist()
    }

# Load Data and Evaluate
if __name__ == "__main__":
    test_dataset = FakeNewsDataset('test_data.csv', 'path/to/images', tokenizer)
    test_dataloader = DataLoader(test_dataset, batch_size=16, shuffle=False)
    
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = MultimodalFakeNewsModel()
    model.load_state_dict(torch.load("multimodal_fake_news.pth"))  # Load trained weights
    
    evaluation_results = evaluate(model, test_dataloader, device)
    print(evaluation_results)


{'Accuracy': 1.0, 'Precision': 0.0, 'Recall': 0.0, 'F1 Score': 0.0, 'Confusion Matrix': [[37]], 'Class Accuracy': [1.0]}


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


: 