# Classificação de Fake News usando Transformadores


## Configuração


### Importando pacotes e dados


In [None]:
import os

os.environ["TOKENIZERS_PARALLELISM"] = "false"

In [None]:
# Importando pacotes
from typing import List, Any
from transformers import PreTrainedTokenizerBase

import numpy as np
import pandas as pd
from tqdm.auto import tqdm
from sklearn.model_selection import train_test_split

import torch
import torch.nn as nn
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader, Dataset

from transformers import AutoTokenizer

In [3]:
# Importing data
true_data = pd.read_csv('../datasets/news_dataset/true.csv')
true_data['real'] = True
true_columns = true_data[['text', 'real']]

fake_data = pd.read_csv('../datasets/news_dataset/fake.csv')
fake_data['real'] = False
fake_columns = fake_data[['text', 'real']]


# Creating dataset
data = pd.concat([true_columns, fake_columns]).sample(frac=1)

In [4]:
data['real'].value_counts()

real
False    23481
True     21417
Name: count, dtype: int64

### Criando device


In [6]:
# Usando gpu caso disponível
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'mps' if torch.backends.mps.is_available() else 'cpu'
print(f'Using device: {device}')

Using device: mps


### Classes


In [7]:
# Classe de dataset
class NewsDataset(Dataset):
    def __init__(
        self,
        texts: List[str],
        labels: List[int],
        tokenizer_name: str,
        max_length: int = 512,
    ):
        super(NewsDataset, self).__init__()
        self.texts = texts
        self.labels = labels
        self.max_length = max_length
        self.tokenizer = AutoTokenizer.from_pretrained(tokenizer_name)

        self.tokenizer_name = tokenizer_name

    def __getitem__(self, index):
        encoding = self.tokenizer(
            self.texts[index],
            padding='max_length',
            truncation=True,
            max_length=self.max_length,
            return_tensors='pt',
        )
        item = {key: val.squeeze(0) for key, val in encoding.items()}
        item['labels'] = torch.tensor(self.labels[index], dtype=torch.long)
        return item

    def __len__(self) -> int:
        return len(self.labels)

### Funções


In [8]:
# Função para tokenizar frase
def tokenize_sequence(
    tokenizer: PreTrainedTokenizerBase, sequence: List[str], max_length: int
):
    return tokenizer(
        sequence,
        padding='max_length',
        truncation=True,
        max_length=max_length,
        return_tensors='pt',
    )

In [None]:
# Calculate accuracy (a classification metric)
def accuracy_fn(y_true, y_pred):
    correct = torch.eq(y_true, y_pred).sum().item()
    acc = (correct / len(y_pred)) * 100
    return acc

In [10]:
# Functions for train and test steps
def train_step(
    estimator: nn.Module,
    loss_fn: nn.Module,
    optimizer: torch.optim.Optimizer,
    dataloader: DataLoader,
    device: torch.device,
) -> None:

    train_loss, train_acc = 0, 0

    estimator.to(device)
    estimator.train()

    for batch in dataloader:
        X = batch['input_ids'].to(device)
        y = batch['labels'].to(device)

        estimator.train()

        y_pred = estimator(X)

        loss = loss_fn(y_pred, y)
        train_loss += loss
        train_acc += accuracy_fn(y, y_pred.argmax(dim=1))

        optimizer.zero_grad()

        loss.backward()

        optimizer.step()

    train_loss /= len(dataloader)
    train_acc /= len(dataloader)

    print(f'Train loss: {train_loss:.3f} | Train accuracy: {train_acc:.2f}%')

In [11]:
# Function for training loop
def test_step(
    estimator: nn.Module,
    loss_fn: nn.Module,
    dataloader: DataLoader,
    device: torch.device,
) -> None:

    test_loss, test_acc = 0, 0
    estimator.to(device)
    estimator.eval()
    with torch.inference_mode():

        for batch in dataloader:
            X = batch['input_ids'].to(device)
            y = batch['labels'].to(device)

            test_pred = estimator(X)

            test_loss += loss_fn(test_pred, y)
            test_acc += accuracy_fn(y, test_pred.argmax(dim=1))

        test_loss /= len(dataloader)

        test_acc /= len(dataloader)

    print(f'Test loss:  {test_loss:.3f} | Test accuracy:  {test_acc:.2f}%\n')

## EDA


In [12]:
# Visualizando dataset
data.head()

Unnamed: 0,text,real
3944,The following statements were posted to the ve...,True
16880,Environmental Protection Agency (EPA) enforcer...,False
12923,"Well, what would Friday be without the latest ...",False
2525,"On Tuesday afternoon, legendary journalist Dan...",False
17117,VALLETTA (Reuters) - The son of Malta s best-k...,True


In [13]:
# Tratando valores nulos
data.dropna(inplace=True)

In [None]:
# Verificando tamanho médio dos textos
np.mean([i.count(' ') for _, i in enumerate(data['text'].to_list())])

np.float64(414.7604124905341)

## Criando dataset


In [17]:
# Separando dados em treino e teste
X = data['text']
y = data['real']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1)

# Transformando dados para listas
X_train, X_test = X_train.tolist(), X_test.tolist()
y_train, y_test = y_train.tolist(), y_test.tolist()

In [18]:
# Definindo hiperparâmetros
max_length = 200

# num_classes = len(bias_id)
num_classes = 2

tokenizer_name = 'bert-base-uncased'

In [19]:
# Criando datasets
train_dataset = NewsDataset(
    texts=X_train, labels=y_train, tokenizer_name=tokenizer_name, max_length=max_length
)
test_dataset = NewsDataset(
    texts=X_test, labels=y_test, tokenizer_name=tokenizer_name, max_length=max_length
)

# Criando dataloaders
train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=True)

## Criando modelo


In [None]:
class EncoderClassifier(nn.Module):
    def __init__(
        self,
        vocab_size: int,
        num_classes: int,
        embed_dim: int,
        num_layers: int,
        num_heads: int,
        max_length: int,
        dropout: float,
    ):
        super().__init__()

        self.emb = nn.Embedding(vocab_size, embed_dim)
        self.pos_embed = nn.Parameter(torch.randn(1, max_length, embed_dim))
        self.encoder_layer = nn.TransformerEncoderLayer(
            d_model=embed_dim,
            nhead=num_heads,
            batch_first=True,
        )

        self.encoder = nn.TransformerEncoder(
            encoder_layer=self.encoder_layer,
            num_layers=num_layers,
        )

        self.linear = nn.Linear(embed_dim, num_classes)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.emb(x) + self.pos_embed[:, : x.size(1), :]

        x = self.encoder(x)
        x = self.dropout(x)
        x = x.max(dim=1)[0]
        out = self.linear(x)

        return out

## Treinando modelo


In [None]:
# Instanciando modelo
model = EncoderClassifier(
    vocab_size=train_dataset.tokenizer.vocab_size,
    num_classes=num_classes,
    embed_dim=128,
    num_layers=2,
    num_heads=4,
    max_length=max_length,
    dropout=0.1,
).to(device)

In [29]:
# Definindo perda
loss_fn = nn.CrossEntropyLoss()

# Definindo otimizador
optimizer = torch.optim.Adam(model.parameters(), lr=0.01, weight_decay=1e-2)

In [None]:
# Setting seed
torch.manual_seed(42)

# Setting epochs
epochs = 3

# Main train loop
for epoch in tqdm(range(epochs)):
    print(
        f'Epoch: {epoch}',
        '-' * 90,
        sep='\n',
        end='\n',
    )

    # Train step
    train_step(
        estimator=model,
        dataloader=train_dataloader,
        loss_fn=loss_fn,
        optimizer=optimizer,
        device=device,
    )

    # Test step
    test_step(
        estimator=model,
        loss_fn=loss_fn,
        dataloader=test_dataloader,
        device=device,
    )