# VsoshRnnProject / RNN model

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import gensim
import nltk
import gc

import torch.nn.functional as F
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
from tqdm.notebook import tqdm
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pack_padded_sequence, pad_sequence
%matplotlib inline

from ipywidgets import FloatProgress

In [None]:
# Only needed on the first run: fetches the NLTK 'punkt_tab' resource
# (presumably what nltk.tokenize.word_tokenize relies on — TODO confirm)
nltk.download('punkt_tab')

# Training RNN Model

Device

In [None]:
# Run computations on the GPU when CUDA is available, otherwise on the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Cuda available: {torch.cuda.is_available()} \n")

# Reminder: tensors and modules are placed on this device with .to(device)

Load gensim model (Word2Vec)

In [None]:
# Load the saved gensim Word2Vec model; its word vectors are read through w2v.wv
w2v = gensim.models.Word2Vec.load("word2vec_weights.model")

Create data

In [None]:
def text_to_vec(text):
    """Convert a text into a list of Word2Vec embeddings, one per token.

    The text is lower-cased and tokenized with nltk's ``word_tokenize``.
    A token found in ``w2v.wv`` is replaced by its own vector; any other
    token is replaced by the vector stored under the key 'notfound'.

    Returns:
        A list with one embedding per token (each converted with
        ``.tolist()``), in token order.
    """
    embeddings = []
    for word in nltk.tokenize.word_tokenize(text.lower()):
        # Unknown words share the dedicated 'notfound' entry of the vocabulary
        key = word if word in w2v.wv else 'notfound'
        embeddings.append(w2v.wv[key].tolist())
    return embeddings

def modif_ans(ans):
    """Map a class label to a softened training target.

    ``ans`` is converted with ``int()``: a result of 0 yields 0.01,
    any other result yields 0.99.
    """
    return 0.01 if int(ans) == 0 else 0.99

In [None]:
# Read the dataset and discard rows where 'text' or 'label' is missing
df = pd.read_csv('all.csv').dropna(subset=['text', 'label'])

# Vectorize every text, with a tqdm progress bar
tqdm.pandas(desc="text_to_vec")
df['vec'] = df['text'].progress_apply(text_to_vec)

# Convert every label into its training target, with a tqdm progress bar
tqdm.pandas(desc="modif_ans")
df['ans'] = df['label'].progress_apply(modif_ans)

Create Dataset and Dataloader

In [None]:
class ProcessedTextDataset(Dataset):
    """Dataset of pre-vectorized texts paired with their targets.

    Args:
        vectors: indexable collection; ``vectors[i]`` is the sequence of
            embeddings of the i-th text.
        labels: indexable collection of numeric targets, same length as
            ``vectors``.

    Raises:
        ValueError: if ``vectors`` and ``labels`` differ in length.
    """

    def __init__(self, vectors, labels):
        if len(vectors) != len(labels):
            raise ValueError(
                f"vectors and labels must have the same length, "
                f"got {len(vectors)} and {len(labels)}"
            )
        self.vectors = vectors
        self.labels = labels

    def __len__(self):
        """Return the number of samples."""
        return len(self.vectors)

    def __getitem__(self, idx):
        """Return ``(vector_tensor, label_tensor)`` for sample ``idx`` as float32 tensors.

        The tensors are left on the CPU. collate_fn moves the assembled batch
        to the target device, so a per-sample transfer here would only add
        overhead and tie the dataset to a notebook-level global.
        """
        vector_tensor = torch.tensor(self.vectors[idx], dtype=torch.float32)
        label_tensor = torch.tensor(self.labels[idx], dtype=torch.float32)
        return vector_tensor, label_tensor

def collate_fn(batch):
    """Assemble a list of ``(sequence, label)`` samples into one packed batch.

    Args:
        batch: list of ``(sequence, label)`` pairs, where ``sequence`` is a
            tensor whose first dimension is the sequence length.

    Returns:
        ``(packed_sequences, labels)``, both moved to ``device``. Samples are
        ordered by decreasing sequence length and ``labels`` follows the same
        order.
    """
    # Split the batch into inputs and targets
    sequences = [sample[0] for sample in batch]
    labels = [sample[1] for sample in batch]

    # Sequence lengths; kept as a CPU int64 tensor for pack_padded_sequence
    lengths = torch.tensor([len(seq) for seq in sequences], dtype=torch.int64)

    # Order the samples by length, longest first (needed for enforce_sorted=True)
    sorted_indices = torch.argsort(lengths, descending=True)
    order = sorted_indices.tolist()
    sequences = [sequences[i] for i in order]
    lengths = lengths[sorted_indices]

    # Stack the labels instead of rebuilding a tensor from a list of 0-d
    # tensors, which would read every element back one at a time
    labels = torch.stack(
        [torch.as_tensor(labels[i], dtype=torch.float32) for i in order]
    )

    # Pad to the longest sequence, then pack so the RNN skips the padding
    padded_sequences = pad_sequence(sequences, batch_first=True)
    packed_sequences = pack_padded_sequence(
        padded_sequences, lengths, batch_first=True, enforce_sorted=True
    )
    return packed_sequences.to(device), labels.to(device)

In [None]:
# Split the data; the fixed random_state makes the split, and therefore the
# reported test metrics, reproducible across notebook runs
X_train, X_test, Y_train, Y_test = train_test_split(
    df['vec'], df['ans'], test_size=0.3, shuffle=True, random_state=42
)

# Build the datasets
train_dataset = ProcessedTextDataset(X_train.tolist(), Y_train.tolist())
test_dataset = ProcessedTextDataset(X_test.tolist(), Y_test.tolist())

# Build the DataLoaders
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, collate_fn=collate_fn)

# # Проверка работы DataLoader
# for batch_idx, (packed_batch, labels_batch) in enumerate(train_loader):
#     print(f"Batch {batch_idx + 1}")
#     print("Packed batch:", packed_batch)  # Упакованные последовательности
#     print("Labels shape:", labels_batch.shape)  # Метки


Rnn initialization

In [None]:
class RNNModel(nn.Module):
    """LSTM encoder followed by a linear layer with a sigmoid activation.

    Args:
        input_size: size of each input vector.
        hidden_size: size of the LSTM hidden state.
        num_layers: number of stacked LSTM layers.
        output_size: number of outputs of the linear layer.
        dropout: dropout value passed to ``nn.LSTM``.
    """

    def __init__(self, input_size, hidden_size, num_layers, output_size, dropout=0.3):
        super().__init__()
        self.rnn = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout,
        )
        head_layers = [nn.Linear(hidden_size, output_size), nn.Sigmoid()]
        self.fc = nn.Sequential(*head_layers)

    def forward(self, packed_input):
        """Run the LSTM and apply the head to the last layer's final hidden state."""
        _, (final_hidden, _) = self.rnn(packed_input)
        top_layer_state = final_hidden[-1]
        return self.fc(top_layer_state)

In [None]:
# Build the model and move it to the selected device.
# input_size=100 is presumably the Word2Vec embedding dimension — TODO confirm against the w2v model
rnn = RNNModel(input_size=100, hidden_size=45, num_layers=2, output_size=1).to(device)

Training

In [None]:
def traning_loop_tqdm(model, num_epochs=1, learning_rate=0.01, loader=None):
    """Train ``model`` with MSE loss and the Adam optimizer, showing a tqdm bar.

    Args:
        model: module called as ``model(packed_sequences)``; the last
            dimension of its output is dropped before it is compared with
            the batch labels.
        num_epochs: number of passes over the data.
        learning_rate: learning rate passed to Adam.
        loader: iterable of ``(packed_sequences, labels)`` batches. When
            omitted, the notebook-level ``train_loader`` is used.

    The progress bar shows the mean batch loss of the current epoch.
    """
    if loader is None:
        loader = train_loader

    model.train()
    criterion = torch.nn.MSELoss()  # loss function
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=1e-5)  # optimizer

    # Training loop
    for epoch in range(num_epochs):
        running_loss = 0.0
        with tqdm(loader, desc=f"Epoch {epoch+1}/{num_epochs}", unit="batch", colour="green") as t:
            for batch_idx, (packed_sequences, labels) in enumerate(t, start=1):
                optimizer.zero_grad()
                y_pred = model(packed_sequences)

                # squeeze(-1) removes only the output dimension, so a final
                # batch holding a single sample keeps its batch dimension
                loss = criterion(y_pred.squeeze(-1), labels)

                # Backpropagation
                loss.backward()

                # Weight update
                optimizer.step()

                running_loss += loss.item()
                t.set_postfix(loss=running_loss / batch_idx)

In [None]:
# Train the model with the default arguments of traning_loop_tqdm
print("Let's start training!")
traning_loop_tqdm(rnn)

# Score

Accuracy

In [None]:
# Percentage of correct predictions on the test set, computed over test_loader
rnn.eval()
correct = 0
total = 0
with torch.no_grad():
    for texts, labels in test_loader:
        outputs = rnn(texts)

        # squeeze(-1) drops only the output dimension, so a batch holding a
        # single sample still has a batch dimension
        outputs = torch.squeeze(outputs, -1)
        outputs = torch.round(outputs)
        labels = torch.round(labels)
        correct_vec = torch.eq(outputs, labels)
        # .item() keeps the counter a plain int instead of a (possibly CUDA) tensor
        correct += torch.sum(correct_vec).item()
        total += labels.size(0)

# Guard against an empty test loader
accuracy = 100 * correct / total if total else 0.0
print(f'Correct: {correct} total: {total}')
print(f'Accuracy: {accuracy:.4f}%')

Recall, Precision, F1 score, Accuracy

In [None]:
def evaluate_classification(model, dataloader, device):
    """Compute accuracy, precision, recall and F1 of ``model`` on ``dataloader``.

    Model outputs and labels are rounded to the nearest integer before the
    metrics are computed. Precision, recall and F1 use ``average="weighted"``.

    Args:
        model: module called as ``model(inputs)``.
        dataloader: iterable of ``(inputs, labels)`` batches.
        device: device the inputs and labels are moved to before inference.

    Returns:
        dict with the keys "accuracy", "precision", "recall" and "f1_score".
    """
    model.eval()
    all_preds, all_labels = [], []

    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)

            # squeeze(-1) drops only the output dimension. A plain squeeze()
            # turns a batch of one sample into a 0-d tensor, and extending a
            # list with a 0-d array raises TypeError.
            preds = torch.squeeze(outputs, -1)
            preds = torch.round(preds)
            labels = torch.round(labels)

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    # Metrics
    accuracy = accuracy_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds, average="weighted")
    recall = recall_score(all_labels, all_preds, average="weighted")
    f1 = f1_score(all_labels, all_preds, average="weighted")

    return {
        "accuracy": accuracy,
        "precision": precision,
        "recall": recall,
        "f1_score": f1
    }

In [None]:
# Evaluate the trained model on the test loader and print the metrics dict
results = evaluate_classification(rnn, test_loader, device)
print("Классификация:", results)

Saving

In [None]:
# Save only the model parameters (state_dict), not the whole module object
torch.save(rnn.state_dict(), "rnn_model_weights.pth")

Clearing the memory

In [None]:
# Drop the references to the training objects, then run the garbage collector
# and release PyTorch's cached CUDA memory
del rnn, X_test, Y_test, X_train, Y_train, test_loader, train_loader, test_dataset, train_dataset
gc.collect()
torch.cuda.empty_cache()

# Using and testing

Rnn initialization + function

In [None]:
class RNNModel(nn.Module):
    """LSTM encoder followed by a linear layer with a sigmoid activation.

    The submodule names (``rnn``, ``fc``) determine the state_dict keys, so
    they must stay as they are for saved weights to load into this class.

    Args:
        input_size: size of each input vector.
        hidden_size: size of the LSTM hidden state.
        num_layers: number of stacked LSTM layers.
        output_size: number of outputs of the linear layer.
        dropout: dropout value passed to ``nn.LSTM``; defaults to 0.0.
    """

    def __init__(self, input_size, hidden_size, num_layers, output_size, dropout=0.0):
        super().__init__()
        self.rnn = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, dropout=dropout)
        # Honour output_size instead of a hard-coded single output
        self.fc = nn.Sequential(nn.Linear(hidden_size, output_size), nn.Sigmoid())

    def forward(self, packed_input):
        """Run the LSTM and apply the head to the last layer's final hidden state."""
        packed_output, (hidden, _) = self.rnn(packed_input)
        output = self.fc(hidden[-1])
        return output

# The architecture must match the model whose weights were saved to
# "rnn_model_weights.pth" (hidden_size=45, num_layers=2); with any other
# sizes load_state_dict fails on missing keys / shape mismatches
model = RNNModel(input_size=100, hidden_size=45, num_layers=2, output_size=1)

In [None]:
def text_to_vec(text):
    """Convert a text into a list of Word2Vec embeddings, one per token.

    The text is lower-cased and tokenized with nltk's ``word_tokenize``.
    A token found in ``w2v.wv`` is replaced by its own vector; any other
    token is replaced by the vector stored under the key 'notfound'.

    Returns:
        A list with one embedding per token (each converted with
        ``.tolist()``), in token order.
    """
    embeddings = []
    for word in nltk.tokenize.word_tokenize(text.lower()):
        # Unknown words share the dedicated 'notfound' entry of the vocabulary
        key = word if word in w2v.wv else 'notfound'
        embeddings.append(w2v.wv[key].tolist())
    return embeddings

Load RNN and gensim models

In [None]:
# `model` lives on the CPU, so map the stored tensors to the CPU explicitly;
# otherwise weights saved from a CUDA model fail to load on a machine without CUDA
model.load_state_dict(torch.load("rnn_model_weights.pth", map_location=torch.device("cpu")))
# Switch to inference mode
model.eval()

w2v = gensim.models.Word2Vec.load("word2vec_weights.model")

Using

In [None]:
# User input read interactively
input_for_model = input()

In [None]:
# Hard-coded example input; when this cell is run it replaces the value of input_for_model
input_for_model = "' OR 1 = 1 #"

In [None]:
# Get the prediction for the input text

input_vector = text_to_vec(input_for_model)
if not input_vector:
    # No tokens means no sequence to feed to the LSTM
    raise ValueError("input text produced no tokens; nothing to classify")

# Add an explicit batch dimension: the LSTM is built with batch_first=True,
# so the input becomes (1, seq_len, embedding_dim)
input_vector = torch.tensor(input_vector, dtype=torch.float32).unsqueeze(0)

# Inference only: no gradients needed
with torch.no_grad():
    predict = model(input_vector)
print(predict)
ans = float(predict.item())
print(f'Ans: {ans:.8f}')