In [1]:
# 1. Imports
import pandas as pd
import numpy as np
import re
from collections import Counter

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

from sklearn.model_selection import train_test_split

In [2]:
# 2. Load the Datasets
# Load the Kaggle dataset
kaggle_df = pd.read_csv('Data/Data.csv')

# Load the ISOT datasets
true_df = pd.read_csv('Data/True.csv')
fake_df = pd.read_csv('Data/Fake.csv')

# For ISOT, assign labels: assume true news = 1 and fake news = 0
true_df['label'] = 1
fake_df['label'] = 0

In [3]:
# 3. Combine the Datasets
# Combine the two ISOT datasets
isot_df = pd.concat([true_df, fake_df], ignore_index=True)

# Optionally, inspect shapes
print("Kaggle dataset shape:", kaggle_df.shape)
print("ISOT dataset shape:", isot_df.shape)

# Combine Kaggle and ISOT data into one DataFrame
df = pd.concat([kaggle_df, isot_df], ignore_index=True)

# Shuffle the combined dataset
df = df.sample(frac=1, random_state=42).reset_index(drop=True)
print("Combined dataset shape:", df.shape)

Kaggle dataset shape: (20800, 5)
ISOT dataset shape: (44898, 5)
Combined dataset shape: (65698, 7)


In [4]:
# 4. Handle Missing Values
# Drop rows missing 'text' (critical for classification)
df = df.dropna(subset=['text'])

# Fill missing 'title' values with a placeholder
df['title'] = df['title'].fillna("No Title Provided")

# If there is an 'author' column, fill missing values with "Unknown"
if 'author' in df.columns:
    df['author'] = df['author'].fillna("Unknown")

In [5]:
# 5. Create the 'content' Column
# Combine 'title' and 'text' into a single text field
if 'content' not in df.columns:
    df['content'] = df['title'] + " " + df['text']

In [6]:
# 6. Split into Training and Testing Sets
X = df['content']
y = df['label']

# Split the data (80% training, 20% testing)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [7]:
# 7. Preprocess Text for the LSTM

def tokenize(text):
    # Simple whitespace and word-boundary tokenization (lowercase)
    text = text.lower()
    tokens = re.findall(r'\b\w+\b', text)
    return tokens

# Build vocabulary from training texts
all_tokens = []
for text in X_train:
    all_tokens.extend(tokenize(text))

freq = Counter(all_tokens)
# Create vocabulary: reserve index 0 for padding, 1 for unknown tokens
vocab = {word: i+2 for i, (word, count) in enumerate(freq.items())}
vocab_size = len(vocab) + 2

def text_to_sequence(text, vocab):
    tokens = tokenize(text)
    return [vocab.get(token, 1) for token in tokens]  # 1 for unknown

# Convert texts to sequences
X_train_seq = [text_to_sequence(text, vocab) for text in X_train]
X_test_seq = [text_to_sequence(text, vocab) for text in X_test]

# Pad sequences to a fixed length
max_len = 500  # You can adjust this as needed
def pad_sequence(seq, max_len):
    if len(seq) < max_len:
        return seq + [0]*(max_len - len(seq))
    else:
        return seq[:max_len]

X_train_pad = [pad_sequence(seq, max_len) for seq in X_train_seq]
X_test_pad = [pad_sequence(seq, max_len) for seq in X_test_seq]

# Convert lists to torch tensors
X_train_tensor = torch.tensor(X_train_pad, dtype=torch.long)
X_test_tensor = torch.tensor(X_test_pad, dtype=torch.long)
y_train_tensor = torch.tensor(y_train.values, dtype=torch.long)
y_test_tensor = torch.tensor(y_test.values, dtype=torch.long)

In [8]:
# 8. Create a PyTorch Dataset and DataLoader
class NewsDataset(Dataset):
    def __init__(self, texts, labels):
        self.texts = texts
        self.labels = labels
        
    def __len__(self):
        return len(self.labels)
    
    def __getitem__(self, idx):
        return self.texts[idx], self.labels[idx]

batch_size = 64
train_dataset = NewsDataset(X_train_tensor, y_train_tensor)
test_dataset = NewsDataset(X_test_tensor, y_test_tensor)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

In [9]:
# 9. Define the LSTM Model
class LSTMClassifier(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, num_layers=1):
        super(LSTMClassifier, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)
        
    def forward(self, x):
        # x: [batch_size, seq_length]
        embedded = self.embedding(x)                # [batch_size, seq_length, embedding_dim]
        lstm_out, _ = self.lstm(embedded)             # [batch_size, seq_length, hidden_dim]
        last_hidden = lstm_out[:, -1, :]              # Use the last hidden state
        output = self.fc(last_hidden)                 # [batch_size, output_dim]
        return output

embedding_dim = 100
hidden_dim = 128
output_dim = 2  # Binary classification (0 or 1)

model = LSTMClassifier(vocab_size, embedding_dim, hidden_dim, output_dim)

In [10]:
# 10. Train the LSTM Model on GPU
device = torch.device("cuda:2" if torch.cuda.is_available() else "cpu")
model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 5
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    
    for texts, labels in train_loader:
        texts = texts.to(device)
        labels = labels.to(device)
        
        optimizer.zero_grad()
        outputs = model(texts)  # [batch_size, output_dim]
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item() * texts.size(0)
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
    
    epoch_loss = running_loss / total
    epoch_acc = correct / total
    print(f"Epoch {epoch+1}/{num_epochs} - Loss: {epoch_loss:.4f} - Accuracy: {epoch_acc:.4f}")

Epoch 1/5 - Loss: 0.6905 - Accuracy: 0.5252
Epoch 2/5 - Loss: 0.6163 - Accuracy: 0.6426
Epoch 3/5 - Loss: 0.2870 - Accuracy: 0.8902
Epoch 4/5 - Loss: 0.1177 - Accuracy: 0.9575
Epoch 5/5 - Loss: 0.0600 - Accuracy: 0.9804


In [11]:
# 11. Evaluate the Model on the Test Set
model.eval()
correct = 0
total = 0

with torch.no_grad():
    for texts, labels in test_loader:
        texts = texts.to(device)
        labels = labels.to(device)
        outputs = model(texts)
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

test_accuracy = correct / total
print("Test Accuracy:", test_accuracy)

Test Accuracy: 0.9656564118184587


In [14]:
# Define an improved LSTM model with stacking, bidirectionality, and dropout
class ImprovedLSTMClassifier(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, num_layers=2, bidirectional=True, dropout=0.5):
        super(ImprovedLSTMClassifier, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        # Using a 2-layer bidirectional LSTM with dropout applied between layers
        self.lstm = nn.LSTM(
            embedding_dim, 
            hidden_dim, 
            num_layers=num_layers, 
            batch_first=True, 
            bidirectional=bidirectional,
            dropout=dropout if num_layers > 1 else 0
        )
        # If bidirectional, output dimension doubles
        lstm_out_dim = hidden_dim * 2 if bidirectional else hidden_dim
        # Extra fully-connected layer before the final output layer
        self.fc1 = nn.Linear(lstm_out_dim, hidden_dim)
        self.dropout = nn.Dropout(dropout)
        self.fc2 = nn.Linear(hidden_dim, output_dim)
        
    def forward(self, x):
        # x shape: [batch_size, seq_length]
        embedded = self.embedding(x)  # [batch_size, seq_length, embedding_dim]
        lstm_out, _ = self.lstm(embedded)  # [batch_size, seq_length, hidden_dim*(2 if bidirectional else 1)]
        
        # For bidirectional LSTM, concatenate the last hidden state from the forward and the first from the backward pass.
        if self.lstm.bidirectional:
            # Forward LSTM: take the last time step; Backward LSTM: take the first time step
            forward_hidden = lstm_out[:, -1, :self.lstm.hidden_size]
            backward_hidden = lstm_out[:, 0, self.lstm.hidden_size:]
            last_hidden = torch.cat((forward_hidden, backward_hidden), dim=1)
        else:
            last_hidden = lstm_out[:, -1, :]
            
        x = self.fc1(last_hidden)
        x = self.dropout(x)
        output = self.fc2(x)
        return output

# Set device to use GPU index 2 (for example)
device = torch.device("cuda:2" if torch.cuda.is_available() else "cpu")

# Initialize the improved model and move it to the specified device
improved_model = ImprovedLSTMClassifier(vocab_size, embedding_dim, hidden_dim, output_dim, num_layers=2, bidirectional=True, dropout=0.5)
improved_model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(improved_model.parameters(), lr=0.001)

num_epochs = 5
for epoch in range(num_epochs):
    improved_model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    
    for texts, labels in train_loader:
        texts = texts.to(device)
        labels = labels.to(device)
        
        optimizer.zero_grad()
        outputs = improved_model(texts)  # [batch_size, output_dim]
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item() * texts.size(0)
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
    
    epoch_loss = running_loss / total
    epoch_acc = correct / total
    print(f"Improved Model - Epoch {epoch+1}/{num_epochs} - Loss: {epoch_loss:.4f} - Accuracy: {epoch_acc:.4f}")

# Evaluate the Improved LSTM Model on the Test Set
improved_model.eval()  # Set model to evaluation mode
correct = 0
total = 0

with torch.no_grad():
    for texts, labels in test_loader:
        texts = texts.to(device)
        labels = labels.to(device)
        outputs = improved_model(texts)
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

test_accuracy = correct / total
print("Improved Model - Test Accuracy:", test_accuracy)

Improved Model - Epoch 1/5 - Loss: 0.2423 - Accuracy: 0.8995
Improved Model - Epoch 2/5 - Loss: 0.1073 - Accuracy: 0.9617
Improved Model - Epoch 3/5 - Loss: 0.0673 - Accuracy: 0.9784
Improved Model - Epoch 4/5 - Loss: 0.0448 - Accuracy: 0.9862
Improved Model - Epoch 5/5 - Loss: 0.0317 - Accuracy: 0.9904
Improved Model - Test Accuracy: 0.9687785561985989
