### Импорт

In [None]:
# Install extra packages with pip. torchmetrics provides the F1Score metric
# imported below.
# NOTE(review): pymorphy2 is not referenced in the visible part of this
# notebook - confirm it is still needed.
!pip install pymorphy2
!pip install torchmetrics

In [None]:
import json
import numpy as np
import pandas as pd
from zipfile import ZipFile
from nltk.probability import FreqDist
from sklearn.model_selection import train_test_split

import torch
import torch.nn as nn
from torch.nn.utils.rnn import pad_sequence
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
from torchmetrics import F1Score
from torch.autograd import Variable

import nltk
from nltk.tokenize import word_tokenize
# Tokenizer data required by word_tokenize. Recent NLTK releases load the
# "punkt_tab" resource instead of the pickled "punkt" models, so fetch both to
# keep the notebook working across NLTK versions.
nltk.download("punkt")
nltk.download("punkt_tab")


# Seed passed to the randomized steps of the notebook (the train/test split).
random_state = 9

In [None]:
!mkdir ~/.kaggle
!touch ~/.kaggle/kaggle.json

In [None]:
from pathlib import Path

# Read the Kaggle API key from the config file next to the project.
with open('../config.json', 'r') as config_file:
    config = json.load(config_file)
TOKEN = config['TOKEN']

api_token = {"username":"w1nston","key":TOKEN}

# Write the credentials to the same ~/.kaggle/kaggle.json that the shell cells
# create and chmod, instead of a hard-coded /root path that only matches when
# the notebook runs as root.
kaggle_config_path = Path.home() / '.kaggle' / 'kaggle.json'
with open(kaggle_config_path, 'w') as file:
    json.dump(api_token, file)

In [None]:
# Restrict the credentials file to owner read/write only.
!chmod 600 ~/.kaggle/kaggle.json

In [None]:
# Fetch the fake-and-real-news dataset with the Kaggle CLI; the next cell
# expects the archive as fake-and-real-news-dataset.zip in the working directory.
!kaggle datasets download -d clmentbisaillon/fake-and-real-news-dataset

In [None]:
# Unpack the downloaded archive into the current working directory.
archive = ZipFile('fake-and-real-news-dataset.zip', 'r')
with archive:
    archive.extractall()

In [244]:
# The archive is extracted into the current working directory, so read the CSVs
# relative to it rather than from a hard-coded /content path (which only works
# when the working directory happens to be /content).
df_true = pd.read_csv('True.csv')
df_fake = pd.read_csv('Fake.csv')

# Class label: 0 = real news, 1 = fake news.
df_true['type'] = 0
df_fake['type'] = 1

# Single frame with both classes and a fresh 0..n-1 index.
df = pd.concat([df_true, df_fake], ignore_index=True)

### Токенизация и векторизация текстов

In [245]:
# Hold out part of the titles for evaluation; the fixed seed keeps the
# shuffled split reproducible.
x_train_red, x_test_red, y_train, y_test = train_test_split(
    df['title'],
    df['type'],
    random_state=random_state,
    shuffle=True,
)

In [251]:
# Corpus used to build the vocabulary and to inspect title lengths. Only the
# training titles are used so that no information from the held-out test split
# leaks into feature construction.
texts = list(x_train_red)

In [252]:
# Every token of the corpus, in order: each text is split into words and the
# words are collected into one flat list.
tokens = [word for text in texts for word in word_tokenize(text)]
# Keep purely alphanumeric tokens only (drops punctuation and the like).
tokens_filtered = [word for word in tokens if word.isalnum()]

In [253]:
# Vocabulary size including id 0, which is reserved for padding.
max_words = 1500

# Count words case-insensitively. text_to_sequence lower-cases its input, so
# counting "Trump" and "trump" separately (and lower-casing only afterwards)
# would split one word's frequency across two slots and yield duplicate
# entries that collapse into a single vocabulary key.
dist = FreqDist(word.lower() for word in tokens_filtered)  # word -> frequency
# The max_words - 1 most frequent words, most frequent first; counts are dropped.
tokens_filtered_top = [word for word, _ in dist.most_common(max_words - 1)]

In [254]:
# Map each kept word to its 1-based position in the frequency ranking; id 0 is
# left unused here because text_to_sequence pads with zeros.
vocabulary = {word: rank for rank, word in enumerate(tokens_filtered_top, start=1)}

In [255]:
def text_to_sequence(text, maxlen):
    """Encode ``text`` as a list of exactly ``maxlen`` vocabulary ids.

    The text is lower-cased and tokenized; non-alphanumeric tokens and words
    missing from the module-level ``vocabulary`` are dropped. If more than
    ``maxlen`` ids remain, only the last ``maxlen`` are kept; shorter
    sequences are left-padded with zeros.

    Args:
        text: The raw string to encode.
        maxlen: Length of the returned list.

    Returns:
        A list of ``maxlen`` ints, or an empty list when ``maxlen <= 0``.
    """
    # Guard explicitly: with maxlen == 0 the slice ids[-0:] would return the
    # whole list instead of an empty one.
    if maxlen <= 0:
        return []

    words = word_tokenize(text.lower())
    ids = [vocabulary[word] for word in words
           if word.isalnum() and word in vocabulary]

    ids = ids[-maxlen:]  # keep the tail of over-long texts
    return [0] * (maxlen - len(ids)) + ids

In [None]:
# Quick check used to choose the vector length: mean and maximum number of
# tokens per text.
len_list = [len(word_tokenize(title)) for title in texts]
print(np.mean(len_list), max(len_list))

In [257]:
max_len = 50

# Labels as plain numpy arrays.
y_train = np.array(y_train)
y_test = np.array(y_test)

# Features: every title becomes one row of max_len vocabulary ids.
x_test = np.array(
    [text_to_sequence(title, max_len) for title in x_test_red],
    dtype=np.int32,
)
x_train = np.array(
    [text_to_sequence(title, max_len) for title in x_train_red],
    dtype=np.int32,
)

### Обучение CNN

In [265]:
class NewsData(Dataset):
    """Dataset pairing encoded texts with their class labels.

    Both inputs are numpy arrays. They are converted to int64 tensors once in
    the constructor, so item access is a plain tensor lookup.
    """

    def __init__(self, data, labels):
        self.data, self.labels = (
            torch.from_numpy(array).long() for array in (data, labels)
        )

    def __len__(self):
        # Number of samples = length of the first dimension of the features.
        return len(self.data)

    def __getitem__(self, index):
        # (features, label) for the requested sample.
        return self.data[index], self.labels[index]

In [266]:
class NewsClassifier(nn.Module):
    """1-D convolutional text classifier with global max pooling.

    Pipeline: embedding -> Conv1d over the sequence -> ReLU -> max over time
    -> dropout -> linear layer producing one logit per class.

    Args:
        vocab_size: Number of rows in the embedding table; must be larger
            than the greatest token id fed to the model.
        embedding_dim: Size of each token embedding.
        out_channel: Number of convolution filters (and pooled features).
        num_classes: Number of output logits.
        kernel_size: Width of the convolution window in tokens. Input
            sequences must be at least this long.
        dropout: Dropout probability applied to the pooled features.
    """

    def __init__(self, vocab_size=1500, embedding_dim=50, out_channel=128,
                 num_classes=2, kernel_size=2, dropout=0.1):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.conv = nn.Conv1d(embedding_dim, out_channel, kernel_size=kernel_size)
        self.dropout = nn.Dropout(dropout)
        self.relu = nn.ReLU()
        self.linear = nn.Linear(out_channel, num_classes)

    def forward(self, x):
        """Return class logits of shape (batch, num_classes).

        Args:
            x: Integer tensor of token ids with shape (batch, seq_len).
        """
        output = self.embedding(x)            # (batch, seq_len, emb_dim)
        output = output.permute(0, 2, 1)      # (batch, emb_dim, seq_len) for Conv1d
        output = self.relu(self.conv(output))
        # Global max pooling over the sequence axis: strongest response of
        # every filter, wherever it occurs in the text.
        output = output.max(dim=2).values     # (batch, out_channel)
        output = self.dropout(output)
        return self.linear(output)

In [None]:
batch_size = 128
epochs = 7

# Size the embedding table from the vocabulary setting instead of relying on
# the class default: token ids range over 0..max_words-1 (0 is padding).
model = NewsClassifier(vocab_size=max_words)
print(model)
print("Parameters:", sum(param.nelement() for param in model.parameters()))

model.train()
f1 = F1Score(task="binary")

# NOTE(review): 10e-3 equals 0.01, not 0.001 - confirm this is the intended rate.
optimizer = torch.optim.Adam(model.parameters(), lr=10e-3)
criterion = nn.CrossEntropyLoss()

train_dataset = NewsData(x_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Mean training loss of every finished epoch.
loss_history = []

for epoch in range(1, epochs + 1):
    print(f"Train epoch {epoch}/{epochs}")
    temp_loss = []
    # F1 is not an average of per-batch F1 values, so accumulate the metric
    # state over the whole epoch and compute it once at the end.
    f1.reset()
    for data, target in train_loader:
        optimizer.zero_grad()

        output = model(data)
        loss = criterion(output, target)

        loss.backward()
        optimizer.step()

        temp_loss.append(loss.item())
        f1.update(output.argmax(1), target)

    epoch_loss = np.array(temp_loss).mean()
    epoch_f1 = f1.compute().item()
    loss_history.append(epoch_loss)
    print(f'Loss: {epoch_loss}, f1 score: {epoch_f1}')

### Обучение LSTM

In [268]:
class NewsClassifierLSTM(nn.Module):
    """LSTM text classifier: embedding -> stacked LSTM -> linear on last step.

    Args:
        vocab_size: Number of rows in the embedding table; must be larger
            than the greatest token id fed to the model.
        embedding_dim: Size of each token embedding.
        out_channel: Hidden size of the LSTM and input size of the linear head.
        n_layers: Number of stacked LSTM layers.
        hidden_dim: Kept for backward compatibility and stored on the
            instance; the initial state is sized from the LSTM itself, so a
            value different from ``out_channel`` no longer breaks ``forward``.
        num_classes: Number of output logits.
        drop_prob: Dropout between LSTM layers (only meaningful when
            ``n_layers > 1``).
    """

    def __init__(self, vocab_size=1500, embedding_dim=50, out_channel=128, n_layers=2, hidden_dim = 128, num_classes=2, drop_prob=0.5):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        # nn.LSTM applies dropout between layers only; with a single layer it
        # has no effect and triggers a warning, so pass 0 in that case.
        lstm_dropout = drop_prob if n_layers > 1 else 0.0
        self.lstm = nn.LSTM(embedding_dim, out_channel, n_layers, dropout=lstm_dropout, batch_first=True)
        self.linear = nn.Linear(out_channel, num_classes)
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers

    def forward(self, x):
        """Return class logits of shape (batch, num_classes).

        Args:
            x: Integer tensor of token ids with shape (batch, seq_len).
        """
        out = self.embedding(x)                     # (batch, seq_len, emb_dim)

        hidden = self.init_hidden(x.size(0))
        lstm_out, _ = self.lstm(out, hidden)        # (batch, seq_len, out_channel)

        # Classify from the output at the last time step.
        return self.linear(lstm_out[:, -1])

    def init_hidden(self, batch_size):
        """Return zero-filled ``(h_0, c_0)`` for a batch of ``batch_size``.

        Both tensors have shape (num_layers, batch_size, lstm_hidden_size) and
        share dtype and device with the model parameters.
        """
        reference = next(self.parameters())
        # Take the shape from the LSTM module itself so the state always
        # matches what nn.LSTM expects.
        shape = (self.lstm.num_layers, batch_size, self.lstm.hidden_size)
        return reference.new_zeros(shape), reference.new_zeros(shape)

In [None]:
batch_size = 128
epochs = 7

# Size the embedding table from the vocabulary setting instead of relying on
# the class default: token ids range over 0..max_words-1 (0 is padding).
model_2 = NewsClassifierLSTM(vocab_size=max_words)
print(model_2)
print("Parameters:", sum(param.nelement() for param in model_2.parameters()))

model_2.train()
f1 = F1Score(task="binary")

# NOTE(review): 10e-3 equals 0.01, not 0.001 - confirm this is the intended rate.
optimizer = torch.optim.Adam(model_2.parameters(), lr=10e-3)
criterion = nn.CrossEntropyLoss()

train_dataset = NewsData(x_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Mean training loss of every finished epoch.
loss_history = []

for epoch in range(1, epochs + 1):
    # No hidden state is created here: the model builds a fresh zero state
    # inside forward(). The previous per-epoch init_hidden(data.size(0)) call
    # read `data` before this loop had assigned it and never used the result.
    print(f"Train epoch {epoch}/{epochs}")
    temp_loss = []
    # F1 is not an average of per-batch F1 values, so accumulate the metric
    # state over the whole epoch and compute it once at the end.
    f1.reset()
    for data, target in train_loader:
        optimizer.zero_grad()

        output = model_2(data)
        loss = criterion(output, target)

        loss.backward()
        optimizer.step()

        temp_loss.append(loss.item())
        f1.update(output.argmax(1), target)

    epoch_loss = np.array(temp_loss).mean()
    epoch_f1 = f1.compute().item()
    loss_history.append(epoch_loss)
    print(f'Loss: {epoch_loss}, f1 score: {epoch_f1}')

### Сравнение CNN/LSTM

In [280]:
def comp(model, test_loader):
    """Evaluate ``model`` on ``test_loader``; print and return loss and F1.

    Uses the module-level ``criterion`` and ``f1`` metric. The model is put
    into evaluation mode (dropout disabled) and gradients are not tracked
    while scoring; the previous train/eval mode is restored afterwards.

    Args:
        model: Module mapping a batch of inputs to class logits.
        test_loader: Iterable of ``(data, target)`` batches.

    Returns:
        ``(mean_batch_loss, f1_score)`` as floats. The loss is the mean of
        the per-batch losses (NaN when the loader is empty); F1 is computed
        over all evaluated samples together.
    """
    was_training = model.training
    model.eval()
    # Drop whatever the shared metric accumulated earlier (e.g. in training).
    f1.reset()

    temp_loss = []
    try:
        with torch.no_grad():
            for data, target in test_loader:
                output = model(data)
                temp_loss.append(criterion(output, target).item())
                # Accumulate state; F1 of the whole set is not the mean of
                # per-batch F1 values.
                f1.update(output.argmax(1), target)
    finally:
        model.train(was_training)

    epoch_loss = float(np.mean(temp_loss)) if temp_loss else float('nan')
    epoch_f1 = f1.compute().item()
    print(f'{epoch_loss} // {epoch_f1}')
    return epoch_loss, epoch_f1

In [None]:
# Held-out split wrapped for batched evaluation (no shuffling needed).
val_dataset = NewsData(x_test, y_test)
val_loader = DataLoader(val_dataset, batch_size=64)

# comp() prints its own "loss // f1" line, so call it directly instead of
# printing its return value. First line: CNN, second line: LSTM.
comp(model, val_loader)
comp(model_2, val_loader)