<a href="https://colab.research.google.com/github/MichailLepin/Fake-News-Classifier/blob/main/notebooks/lstm_training.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# LSTM Model Training for Fake News Classification

This notebook contains code for training an LSTM model for fake news classification in Google Colab.

## Notebook Structure

1. **Install Dependencies** - Install required libraries
2. **Imports** - Import all necessary modules
3. **Load and Process Data** - Load the ISOT/Kaggle dataset and preprocess it
4. **Build Vocabulary and Tokenization** - Build vocabulary and convert text to sequences
5. **Load GloVe Embeddings** - Load pre-trained word embeddings
6. **PyTorch Dataset** - Create datasets for training
7. **LSTM Model** - Define bidirectional LSTM architecture
8. **Training Functions** - Functions for training and evaluating the model
9. **Training** - Model training process
10. **Test Set Evaluation** - Final model evaluation



## Install Dependencies

Install all required libraries for PyTorch, data processing, and visualization.


In [1]:
!pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
!pip install scikit-learn pandas numpy matplotlib seaborn tqdm
!pip install kagglehub



Looking in indexes: https://download.pytorch.org/whl/cu118


## Imports

Import all necessary libraries and modules for data processing, models, and metrics.


In [2]:
import os
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, classification_report
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
from collections import Counter
import warnings
warnings.filterwarnings('ignore')

# Check for GPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')
if torch.cuda.is_available():
    print(f'GPU: {torch.cuda.get_device_name(0)}')
    print(f'Memory: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.2f} GB')



Using device: cpu


## Load and Process Data

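Load the ISOT fake and real news dataset from Kaggle and preprocess it:
- Load Fake.csv and True.csv via kagglehub
- Clean the text (lowercase, URL removal, whitespace normalization)
- Create binary labels (fake = 1, real = 0)
- Stratified train/validation/test split (64%/16%/20%)

Note that the column detection in the cell below picks the first matching column, which is `title` for this dataset (see the cell output), so the model is trained on headlines rather than full article bodies.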
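
The 64%/16%/20% proportions come from two successive 80/20 splits: the test set is 20% of all rows, and the validation set is 20% of the remaining 80%. A minimal sketch of that arithmetic, using the row count reported in the cell output (44,889); the exact per-split counts depend on how scikit-learn rounds, so the sizes printed here are approximate.

```python
# Two successive 80/20 splits: test is 20% of everything,
# validation is 20% of the remaining 80%.
test_frac = 0.20
val_frac = (1 - test_frac) * 0.20    # roughly 0.16
train_frac = (1 - test_frac) * 0.80  # roughly 0.64

n_rows = 44_889  # combined dataset size taken from the notebook output
for name, frac in [("train", train_frac), ("val", val_frac), ("test", test_frac)]:
    print(f"{name}: {frac:.0%} of the data, about {round(n_rows * frac):,} rows")
```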


In [3]:
import re
import kagglehub

# Download dataset via kagglehub (no API keys required)
path = kagglehub.dataset_download("clmentbisaillon/fake-and-real-news-dataset")

# Load data
fake_df = pd.read_csv(f"{path}/Fake.csv")
true_df = pd.read_csv(f"{path}/True.csv")

print(f"✓ Fake news loaded: {fake_df.shape}")
print(f"✓ True news loaded: {true_df.shape}")

# Text cleaning function (from analyze_and_integrate.py script)
def clean_text(text):
    """Clean text: lowercase, remove URLs, normalize whitespace"""
    if pd.isna(text):
        return ""
    text = str(text)
    # Convert to lowercase
    text = text.lower()
    # Remove URLs ('http\S+' already covers https; MULTILINE has no effect without ^ or $)
    text = re.sub(r'http\S+|www\S+', '', text)
    # Remove extra whitespace
    text = re.sub(r'\s+', ' ', text)
    text = text.strip()
    return text

# Identify text column
text_col = None
for col in fake_df.columns:
    if fake_df[col].dtype == 'object' and col.lower() in ['text', 'title', 'article']:
        text_col = col
        break
if text_col is None:
    text_col = fake_df.select_dtypes(include=['object']).columns[0]

print(f"\nUsing column: '{text_col}'")

# Add labels
fake_df['label'] = 'fake'
true_df['label'] = 'real'

# Combine data
combined_data = pd.concat([fake_df, true_df], ignore_index=True)

# Clean text
print("\nCleaning text...")
combined_data['text_cleaned'] = combined_data[text_col].apply(clean_text)

# Create binary labels
combined_data['label_binary'] = combined_data['label'].map({'fake': 1, 'real': 0})

# Remove empty texts
combined_data = combined_data[
    combined_data['text_cleaned'].notna() &
    (combined_data['text_cleaned'].str.len() > 0)
]

print(f"\nCombined dataset: {combined_data.shape}")
print(f"Label distribution: {combined_data['label'].value_counts().to_dict()}")

# Split into train/val/test with stratification
X = combined_data['text_cleaned'].values
y = combined_data['label_binary'].values

# First split: train+val (80%) and test (20%)
X_train_val, X_test, y_train_val, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# Second split: train (64%) and val (16%)
X_train, X_val, y_train, y_val = train_test_split(
    X_train_val, y_train_val, test_size=0.2, random_state=42, stratify=y_train_val
)

print(f"\nData split:")
print(f"  Train: {len(X_train):,} ({len(X_train)/len(combined_data)*100:.1f}%)")
print(f"  Validation: {len(X_val):,} ({len(X_val)/len(combined_data)*100:.1f}%)")
print(f"  Test: {len(X_test):,} ({len(X_test)/len(combined_data)*100:.1f}%)")


Downloading from https://www.kaggle.com/api/v1/datasets/download/clmentbisaillon/fake-and-real-news-dataset?dataset_version_number=1...


100%|██████████| 41.0M/41.0M [00:00<00:00, 94.0MB/s]

Extracting files...





✓ Fake news loaded: (23481, 4)
✓ True news loaded: (21417, 4)

Using column: 'title'

Cleaning text...

Combined dataset: (44889, 7)
Label distribution: {'fake': 23472, 'real': 21417}

Data split:
  Train: 28,728 (64.0%)
  Validation: 7,183 (16.0%)
  Test: 8,978 (20.0%)


## Build Vocabulary and Tokenization

Build a vocabulary from the training data and define functions that convert text to fixed-length index sequences.
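
To make the mapping concrete, here is a small self-contained sketch of the same idea on a toy corpus. The helper names (`toy_build_vocab`, `toy_encode`) and the sentences are made up for illustration; the logic mirrors the frequency cutoff, the `<UNK>` fallback, and the right-padding used in this notebook.

```python
from collections import Counter

PAD, UNK = "<PAD>", "<UNK>"

def toy_build_vocab(texts, min_freq=2):
    """Keep only words that occur at least min_freq times."""
    counts = Counter(word for text in texts for word in text.lower().split())
    vocab = {PAD: 0, UNK: 1}
    for word, count in counts.items():
        if count >= min_freq:
            vocab[word] = len(vocab)
    return vocab

def toy_encode(text, vocab, max_len):
    """Map words to indices, truncate to max_len, then right-pad with <PAD>."""
    ids = [vocab.get(w, vocab[UNK]) for w in text.lower().split()[:max_len]]
    return ids + [vocab[PAD]] * (max_len - len(ids))

toy_texts = ["senate passes budget bill", "senate rejects budget plan"]
toy_vocab = toy_build_vocab(toy_texts, min_freq=2)
print(toy_vocab)
# {'<PAD>': 0, '<UNK>': 1, 'senate': 2, 'budget': 3}
print(toy_encode("senate debates budget", toy_vocab, max_len=5))
# [2, 1, 3, 0, 0]
```

Words seen only once ("passes", "bill", and so on) fall below the cutoff, so they are encoded as `<UNK>` just like the unseen word "debates".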


In [4]:
def build_vocab(texts, min_freq=2):
    """Build vocabulary from texts"""
    word_counts = Counter()
    for text in texts:
        words = str(text).lower().split()
        word_counts.update(words)

    vocab = {'<PAD>': 0, '<UNK>': 1}
    idx = 2

    for word, count in word_counts.items():
        if count >= min_freq:
            vocab[word] = idx
            idx += 1

    return vocab

def text_to_sequence(text, vocab, max_len=256):
    """Convert text to sequence of indices"""
    words = str(text).lower().split()
    sequence = [vocab.get(word, vocab['<UNK>']) for word in words[:max_len]]

    if len(sequence) < max_len:
        sequence.extend([vocab['<PAD>']] * (max_len - len(sequence)))

    return sequence[:max_len]

# Build vocabulary
print("\nBuilding vocabulary...")
vocab = build_vocab(X_train, min_freq=2)
vocab_size = len(vocab)
print(f"Vocabulary size: {vocab_size}")

# Model parameters
MAX_LEN = 256
EMBEDDING_DIM = 100




Building vocabulary...
Vocabulary size: 18321


## Load GloVe Embeddings

Load pre-trained GloVe embeddings (GloVe 6B.100d) to initialize the model's embedding layer.
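
GloVe ships as a plain-text file in which each line holds a token followed by its space-separated vector components. The sketch below parses two made-up 3-dimensional lines from an in-memory file and reports vocabulary coverage; the tokens, values, and `toy_vocab` are illustrative assumptions, and the real `glove.6B.100d.txt` has 100 components per line.

```python
import io

# Two made-up lines in GloVe's text layout: a token, then space-separated floats.
fake_glove_file = io.StringIO(
    "news 0.1 -0.2 0.3\n"
    "fake 0.5 0.0 -0.4\n"
)

embeddings_index = {}
for line in fake_glove_file:
    token, *values = line.split()
    embeddings_index[token] = [float(v) for v in values]

# Hypothetical vocabulary: only "news" has a pre-trained vector.
toy_vocab = {"<PAD>": 0, "<UNK>": 1, "news": 2, "hoax": 3}
covered = [word for word in toy_vocab if word in embeddings_index]
print(f"coverage: {len(covered)}/{len(toy_vocab)}")
# coverage: 1/4
```

Words without a pre-trained vector (here the special tokens and "hoax") are the ones that receive a random row in the embedding matrix.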


In [None]:
def load_glove_embeddings(glove_path, vocab, embedding_dim=100):
    """Load pre-trained GloVe embeddings"""
    print(f"Loading GloVe embeddings from {glove_path}...")

    if not os.path.exists(glove_path):
        print("Downloading GloVe 6B.100d...")
        !wget -q http://nlp.stanford.edu/data/glove.6B.zip
        !unzip -q glove.6B.zip

    embeddings_index = {}
    with open(glove_path, 'r', encoding='utf-8') as f:
        for line in tqdm(f, desc="Loading GloVe"):
            values = line.split()
            word = values[0]
            coefs = np.asarray(values[1:], dtype='float32')
            embeddings_index[word] = coefs

    # Size the matrix from the vocab argument rather than the global vocab_size
    n_words = len(vocab)
    embedding_matrix = np.zeros((n_words, embedding_dim))
    found = 0

    for word, idx in vocab.items():
        if word in embeddings_index:
            embedding_matrix[idx] = embeddings_index[word]
            found += 1
        else:
            # Random initialization for words missing from GloVe
            embedding_matrix[idx] = np.random.normal(scale=0.6, size=(embedding_dim,))

    print(f"Found embeddings for {found}/{n_words} words ({found/n_words*100:.2f}%)")
    return embedding_matrix

# Load GloVe embeddings
GLOVE_PATH = 'glove.6B.100d.txt'
try:
    embedding_matrix = load_glove_embeddings(GLOVE_PATH, vocab, EMBEDDING_DIM)
    use_pretrained = True
except Exception as e:
    print(f"⚠ Failed to load GloVe: {e}")
    print("Using random initialization")
    embedding_matrix = None
    use_pretrained = False



Loading GloVe embeddings from glove.6B.100d.txt...
Downloading GloVe 6B.100d...


## PyTorch Dataset

Wrap each split in a PyTorch `Dataset` and create `DataLoader` instances for batched loading during training.


In [None]:
class NewsDataset(Dataset):
    def __init__(self, texts, labels, vocab, max_len=256):
        self.texts = texts
        self.labels = labels
        self.vocab = vocab
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx]
        label = self.labels[idx]
        sequence = text_to_sequence(text, self.vocab, self.max_len)
        return torch.LongTensor(sequence), torch.LongTensor([label])

train_dataset = NewsDataset(X_train, y_train, vocab, MAX_LEN)
val_dataset = NewsDataset(X_val, y_val, vocab, MAX_LEN)
test_dataset = NewsDataset(X_test, y_test, vocab, MAX_LEN)

BATCH_SIZE = 16
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)



## LSTM Model

Define a single-layer bidirectional LSTM classifier: the final forward and backward hidden states are concatenated, passed through dropout, and fed to a linear output layer.
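
As a rough sanity check on model size, the parameter count of this architecture can be worked out by hand. The helper below is a hypothetical sketch (it is not part of the notebook) and assumes a single bidirectional LSTM layer with PyTorch's layout of four gates and two bias vectors per direction; the vocabulary size of 18,321 is the value reported in the vocabulary cell output.

```python
def bilstm_classifier_params(vocab_size, embedding_dim, hidden_dim, num_classes):
    """Count trainable parameters of embedding + 1-layer BiLSTM + linear head."""
    embedding = vocab_size * embedding_dim
    # Per direction: 4 gates, each with input weights, recurrent weights,
    # and two bias vectors (input-hidden and hidden-hidden).
    per_direction = 4 * (
        hidden_dim * embedding_dim + hidden_dim * hidden_dim + 2 * hidden_dim
    )
    lstm = 2 * per_direction
    # The linear head sees the concatenated forward and backward states.
    fc = (2 * hidden_dim) * num_classes + num_classes
    return embedding + lstm + fc

total = bilstm_classifier_params(18_321, 100, 128, 2)
print(f"{total:,}")
# 2,068,134
```

Almost 90% of the parameters sit in the embedding table (1,832,100), which is why initializing it from pre-trained vectors matters.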


In [None]:
class LSTMModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim=128, num_layers=1,
                 dropout=0.3, num_classes=2, embedding_matrix=None):
        super(LSTMModel, self).__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim)

        if embedding_matrix is not None:
            self.embedding.weight.data.copy_(torch.from_numpy(embedding_matrix))
            self.embedding.weight.requires_grad = True

        self.lstm = nn.LSTM(
            embedding_dim,
            hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True,
            dropout=dropout if num_layers > 1 else 0
        )

        self.fc = nn.Linear(hidden_dim * 2, num_classes)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        embedded = self.embedding(x)
        lstm_out, (hidden, cell) = self.lstm(embedded)
        output = torch.cat((hidden[-2], hidden[-1]), dim=1)
        output = self.dropout(output)
        output = self.fc(output)
        return output

lstm_model = LSTMModel(
    vocab_size=vocab_size,
    embedding_dim=EMBEDDING_DIM,
    hidden_dim=128,
    num_layers=1,
    dropout=0.3,
    num_classes=2,
    embedding_matrix=embedding_matrix if use_pretrained else None
).to(device)

print(f"\nLSTM Model Parameters: {sum(p.numel() for p in lstm_model.parameters()):,}")



## Training Functions

Functions for training the model for one epoch and evaluating the model on the validation set.
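
Inside `train_epoch`, gradients are clipped with `torch.nn.utils.clip_grad_norm_(..., max_norm=1.0)` before each optimizer step. The sketch below reproduces the idea in plain Python on a flat list of made-up gradient values; it illustrates global-norm clipping under that simplification and is not PyTorch's implementation.

```python
import math

def clip_by_global_norm(grads, max_norm=1.0, eps=1e-6):
    """Rescale all gradients together so their L2 norm is at most max_norm."""
    total_norm = math.sqrt(sum(g * g for g in grads))
    # Gradients already within the limit get a scale of 1.0 (left unchanged).
    scale = min(1.0, max_norm / (total_norm + eps))
    return [g * scale for g in grads], total_norm

clipped, norm_before = clip_by_global_norm([3.0, 4.0], max_norm=1.0)
norm_after = math.sqrt(sum(g * g for g in clipped))
print(f"norm before: {norm_before:.2f}, norm after: {norm_after:.2f}")
print([round(g, 3) for g in clipped])
```

Because every component is multiplied by the same factor, clipping shrinks the step size while keeping the gradient direction.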


In [None]:
def train_epoch(model, train_loader, criterion, optimizer, device):
    model.train()
    total_loss = 0
    correct = 0
    total = 0

    for sequences, labels in tqdm(train_loader, desc="Training"):
        sequences = sequences.to(device)
        # squeeze(1) keeps the batch dimension even when a batch holds one sample
        labels = labels.squeeze(1).to(device)

        optimizer.zero_grad()
        outputs = model(sequences)
        loss = criterion(outputs, labels)
        loss.backward()

        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()

        total_loss += loss.item()
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    return total_loss / len(train_loader), 100 * correct / total

def evaluate(model, val_loader, criterion, device):
    model.eval()
    total_loss = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for sequences, labels in tqdm(val_loader, desc="Evaluating"):
            sequences = sequences.to(device)
            # squeeze(1) keeps the batch dimension even when a batch holds one sample
            labels = labels.squeeze(1).to(device)

            outputs = model(sequences)
            loss = criterion(outputs, labels)

            total_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)

            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    avg_loss = total_loss / len(val_loader)
    accuracy = accuracy_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds, average='weighted')

    return avg_loss, accuracy, f1, all_preds, all_labels



## Training

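Train the LSTM for up to 10 epochs with Adam and cross-entropy loss. Training stops early when the validation F1-score fails to improve for 3 consecutive epochs, and the best checkpoint is saved to `best_lstm_model.pth`.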
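
The early-stopping rule used in the training cell can be traced on a made-up sequence of validation F1 scores. The function name and the scores below are hypothetical; the logic follows the same pattern of a strict improvement test and a patience counter that resets on every new best.

```python
def early_stopping_epochs(val_f1_per_epoch, patience=3):
    """Return (best_epoch, last_epoch_run) under patience-based early stopping."""
    best_f1, best_epoch, waited = 0.0, None, 0
    for epoch, f1 in enumerate(val_f1_per_epoch, start=1):
        if f1 > best_f1:  # strict improvement: a tie counts as no progress
            best_f1, best_epoch, waited = f1, epoch, 0
        else:
            waited += 1
            if waited >= patience:
                return best_epoch, epoch
    return best_epoch, len(val_f1_per_epoch)

made_up_f1 = [0.80, 0.85, 0.84, 0.85, 0.83, 0.90]
print(early_stopping_epochs(made_up_f1))
# (2, 5)
```

In this trace the run stops after epoch 5 and never reaches the better score at epoch 6, which is the trade-off a small patience value makes.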


In [None]:
print("\n" + "=" * 60)
print("TRAINING LSTM MODEL")
print("=" * 60)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(lstm_model.parameters(), lr=2e-5)

num_epochs = 10
best_f1 = 0
patience = 3
patience_counter = 0

train_losses = []
val_losses = []
train_accs = []
val_accs = []
val_f1s = []

for epoch in range(num_epochs):
    print(f"\nEpoch {epoch+1}/{num_epochs}")

    train_loss, train_acc = train_epoch(lstm_model, train_loader, criterion, optimizer, device)
    val_loss, val_acc, val_f1, _, _ = evaluate(lstm_model, val_loader, criterion, device)

    train_losses.append(train_loss)
    val_losses.append(val_loss)
    train_accs.append(train_acc)
    val_accs.append(val_acc)
    val_f1s.append(val_f1)

    print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%")
    # val_acc is a fraction (from accuracy_score), so scale it to match train_acc
    print(f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc*100:.2f}%, Val F1: {val_f1:.4f}")

    if val_f1 > best_f1:
        best_f1 = val_f1
        patience_counter = 0
        torch.save(lstm_model.state_dict(), 'best_lstm_model.pth')
        print(f"✓ New best F1: {best_f1:.4f}, model saved")
    else:
        patience_counter += 1
        if patience_counter >= patience:
            print(f"Early stopping after {epoch+1} epochs")
            break

print("\n" + "=" * 60)
print(f"Best validation F1: {best_f1:.4f}")
print("=" * 60)



## Test Set Evaluation

Final model evaluation on the test set with metrics (accuracy, F1-score, precision, recall) and confusion matrix.
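
The F1, precision, and recall values in this notebook use `average='weighted'`, meaning each class's score is weighted by its number of true samples. The following plain-Python sketch computes a weighted F1 by hand on made-up labels to show what that average means; it is an illustration, not scikit-learn's implementation.

```python
def weighted_f1(y_true, y_pred):
    """Per-class F1, averaged with weights equal to each class's support."""
    total = 0.0
    for c in sorted(set(y_true)):
        pairs = list(zip(y_true, y_pred))
        tp = sum(t == c and p == c for t, p in pairs)
        fp = sum(t != c and p == c for t, p in pairs)
        fn = sum(t == c and p != c for t, p in pairs)
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        total += f1 * (tp + fn)  # tp + fn is the class support
    return total / len(y_true)

# Made-up labels: 0 = real, 1 = fake
y_true = [0, 0, 0, 1, 1, 1, 1, 1]
y_pred = [0, 0, 1, 1, 1, 1, 1, 0]
print(round(weighted_f1(y_true, y_pred), 4))
# 0.75
```

Here class 0 has F1 = 2/3 with support 3 and class 1 has F1 = 0.8 with support 5, giving (2 + 4) / 8 = 0.75.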


In [None]:
# map_location lets a checkpoint saved on GPU load on a CPU-only runtime
lstm_model.load_state_dict(torch.load('best_lstm_model.pth', map_location=device))

print("\nEvaluating LSTM model on test set:")
test_loss, test_acc, test_f1, test_preds, test_labels = evaluate(
    lstm_model, test_loader, criterion, device
)

print(f"\nTest Results:")
print(f"  Loss: {test_loss:.4f}")
print(f"  Accuracy: {test_acc:.4f}")
print(f"  F1-Score: {test_f1:.4f}")

print("\nClassification Report:")
print(classification_report(test_labels, test_preds, target_names=['Real', 'Fake']))

# Confusion Matrix
cm = confusion_matrix(test_labels, test_preds)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=['Real', 'Fake'], yticklabels=['Real', 'Fake'])
plt.title('LSTM - Confusion Matrix')
plt.ylabel('True Label')
plt.xlabel('Predicted Label')
plt.show()

lstm_results = {
    'test_loss': float(test_loss),
    'test_accuracy': float(test_acc),
    'test_f1': float(test_f1),
    'test_precision': float(precision_score(test_labels, test_preds, average='weighted')),
    'test_recall': float(recall_score(test_labels, test_preds, average='weighted'))
}

print(f"\nLSTM Results: {lstm_results}")

