In [26]:
import pandas as pd
import numpy as np
from transformers import BertTokenizer, BertModel

import torch
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transform
import torch.nn as nn
from tqdm import tqdm
from torch.optim import Adam
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report, ConfusionMatrixDisplay

pd.set_option("display.max_rows", None)
pd.set_option("display.max_columns", None)

In [14]:
def build_one_hot_vocab(input_text):
    vocab = set()
    input_text = input_text.str.lower()
    for word in input_text:
        vocab.add(word)
    vocab.add("<UNK>")
    return {token: i for i, token in enumerate(vocab)}


def one_hot_encode(input_text, vocab):
    vectorized_text = np.zeros(len(vocab))
    for word in input_text:
        if word in vocab:
            vectorized_text[vocab[word]] += 1
        else:
            vectorized_text[vocab["<UNK>"]] += 1
    return vectorized_text

def build_specialized_vocab(input_text):
    vocab = set()
    vocab.add("<UNK>")
    input_text = input_text.str.lower().astype(str)

    # Build vocabulary
    for text in input_text:
        for word in text.split(";"):
            word = word.strip()  # Remove extra spaces
            if word:
                vocab.add(word)

    return {token: i for i, token in enumerate(vocab)}


def vectorize_text(input_text, vocab):
    # Ensure the input is a string
    vectorized_text = np.zeros(len(vocab))
    for word in input_text.split(";"):
        if word in vocab:
            vectorized_text[vocab[word]] += 1
        else:
            vectorized_text[vocab["<UNK>"]] += 1
    return vectorized_text

### Converting csv to a PyTorch Dataset Object

In [32]:
def process_data(df):
    print("processing data")
    tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
    
    # drop useless data
    dropped_columns = ["id", "date"]
    df = df.drop(dropped_columns, axis=1)
    
    textual_columns = ["statement", "justification", "speaker_description", "subject", "state_info", "speaker", "context"]
    
    for col in textual_columns:
        df[col] = df[col].fillna("None").astype(str)
        df[col] = df[col].apply(lambda x: tokenizer.encode_plus(
                x,
                padding='max_length',
                truncation=True,
                max_length=256,
                return_tensors="pt",
                return_attention_mask=True
            ))

    return df
    

In [16]:
class SentimentDataset(Dataset):
    def __init__(self, path):
        """
        Initializes the dataset.

        Args:
            path (str): Path to the dataset CSV.
            vocabs (dict): Pre-computed vocabularies for non-text columns.
            transform (callable, optional): Transformation to apply to non-textual data.
        """
        self.sentiment = pd.read_csv(path)
        self.sentiment = process_data(self.sentiment)

    def __len__(self):
        return len(self.sentiment)

    def __getitem__(self, idx):
        data = self.sentiment.iloc[idx]
        label = data["label"]
        data = data.drop("label")

        text_columns = ["statement", "justification", "speaker_description", "subject", "state_info", "speaker", "context"]
        non_text_data = []
        tokenized_text = {}

        for col in data.index:
            value = data[col]

            if col in text_columns:
                tokenized_text[col] = torch.tensor(value, dtype=torch.long)
            else:
                if isinstance(value, (np.ndarray, list)):
                    non_text_data.append(np.array(value))
                else:
                    non_text_data.append(np.array([value], dtype=np.float32))

        non_text_features = np.concatenate(non_text_data) if non_text_data else np.empty(0)

        return tokenized_text, torch.tensor(non_text_features, dtype=torch.float32), torch.tensor(label, dtype=torch.float32)


In [8]:
t = transform.Compose([transform.ToTensor()])
vocabs = create_vocabs()
train_dataset = SentimentDataset(path="data/train.csv", vocabs=vocabs)
dataloader = DataLoader(train_dataset, batch_size=1, shuffle=True)
iterator = iter(dataloader)
_, label, data = next(iterator)

print(data.shape, label.shape)

processing data


ValueError: not enough values to unpack (expected 4, got 2)

### Training Neural Network

In [28]:
class FakeNewsClassifier(nn.Module):
    def __init__(self, non_text_feature_size=6, num_classes=6):
        super(FakeNewsClassifier, self).__init__()
        self.bert = BertModel.from_pretrained('bert-base-uncased')
        for param in self.bert.parameters():
            param.requires_grad = False
            
        self.non_text_fc = nn.Linear(non_text_feature_size, 128)
        self.final_fc = nn.Linear(128 + self.bert.config.hidden_size, num_classes)

    def forward(self, tokenized_text, non_text_features):
        # Process tokenized text through BERT
        text_features = []
        for key, tokens in tokenized_text.items():
            attention_mask = (tokens > 0).int()  # Create attention mask
            outputs = self.bert(input_ids=tokens, attention_mask=attention_mask)
            # Use [CLS] token embedding (or pooler output)
            text_features.append(outputs.last_hidden_state[:, 0, :])  # CLS embedding

        # Concatenate all textual features
        text_features = torch.cat(text_features, dim=-1)

        # Process non-textual data
        non_text_out = self.non_text_fc(non_text_features)

        # Combine and classify
        combined_features = torch.cat((text_features, non_text_out), dim=-1)
        return self.final_fc(combined_features)


In [21]:
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

vocabs = create_vocabs()
train_dataset = SentimentDataset(path="data/train.csv")
test_dataset = SentimentDataset(path="data/test.csv")
val_dataset = SentimentDataset(path="data/valid.csv")

train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)
# test_loader = DataLoader(test_dataset, batch_size=128, shuffle=False)
val_loader = DataLoader(val_dataset, batch_size=128, shuffle=True)

processing data
processing data
processing data


In [29]:
model = FakeNewsClassifier().to(device)
optimizer = Adam(model.parameters(), lr=.0001, weight_decay=1e-5)
criterion = nn.CrossEntropyLoss()

epochs = 10
training_losses = []
val_losses = []
for epoch in range(1, epochs + 1):
    train_loss = 0
    val_loss = 0
    train_correct_predictions = 0
    train_total_samples = 0
    val_correct_predictions = 0
    val_total_samples = 0
    
    model.train()
    for features, labels in tqdm(train_loader, desc="Training", unit="its"):
        features = features.to(device).float()
        labels = labels.to(device).long()
        
        optimizer.zero_grad()
        outputs = model(features)
        t_loss = criterion(outputs, labels)
        t_loss.backward()
        optimizer.step()
        
        train_loss += t_loss.item()
        _, predicted = torch.max(outputs, 1)  # Get the class with the highest score
        train_correct_predictions += (predicted == labels).sum().item()
        train_total_samples += labels.size(0)
        
    model.eval()
    for features, labels in tqdm(val_loader, desc="Validating", unit="its"):
        features = features.to(device).float()
        labels = labels.to(device).long()
        with torch.no_grad():
            outputs = model(features)
            v_loss = criterion(outputs, labels)
        
        val_loss += v_loss.item()
        _, predicted = torch.max(outputs, 1)
        val_correct_predictions += (predicted == labels).sum().item()
        val_total_samples += labels.size(0)
        
    train_loss /= len(train_loader)
    val_loss /= len(val_loader)
    training_losses.append(train_loss)
    val_losses.append(val_loss)
    train_accuracy = train_correct_predictions / train_total_samples * 100
    val_accuracy = val_correct_predictions / val_total_samples * 100
    print(f"Epoch {epoch}/{epochs}: Train Loss: {train_loss: .4f}, Val Loss: {val_loss}, Train Accuracy: {train_accuracy: .2f}, Val Accuracy: {val_accuracy: .2f}")
    
torch.save(model.state_dict(), "trained_model.pth")

plt.plot(epochs, training_losses, label="Train")
plt.plot(epochs, val_losses, label="Validation")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.title("Evolution of loss during training")
plt.legend()
plt.tight_layout()
plt.show()

Training:   0%|          | 0/144 [00:00<?, ?its/s]


ValueError: could not determine the shape of object type 'BatchEncoding'

In [None]:
test_model = FakeNewsClassifier().to(device)
test_model.load_state_dict(torch.load("trained_model.pth"))
test_model.eval()

all_predictions = []
all_labels = []

with torch.no_grad():
    for features, labels in tqdm(test_loader, desc="Testing", unit="batch"):
        features = features.to(device).float()
        labels = labels.to(device).long()
        
        outputs = model(features)
        _, predicted = torch.max(outputs, 1)

        all_predictions.extend(predicted.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

# Calculate Metrics
accuracy = accuracy_score(all_labels, all_predictions)
precision = precision_score(all_labels, all_predictions, average="weighted")
recall = recall_score(all_labels, all_predictions, average="weighted")
f1 = f1_score(all_labels, all_predictions, average="weighted")

print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")

print("\nClassification Report:")
print(classification_report(all_labels, all_predictions))

corresponding_labels = ["Pants on Fire", "False", "Barely True", "Half True", "Mostly True", "True"]
ConfusionMatrixDisplay.from_predictions(all_labels, all_predictions, display_labels=corresponding_labels)
plt.title("Confusion Matrix")
plt.save("results/confusion_matrix.png")
plt.show()