In [None]:
import math
import os
import random
import string
import sys
import time

import matplotlib.pyplot as plt
import torch
from IPython.display import clear_output
from tqdm import tqdm

In [None]:
# Dataset locations: one plain-text file per class.
PATH_NORMAL = "dataset/normal.txt"
PATH_ATTACK = "dataset/attack.txt"

# Class labels; a label's position in this list is its integer class id.
all_categories = ["normal", "attack"]
n_categories = len(all_categories)

# Category name -> list of lines; starts out empty.
category_lines = dict()

def readLines(filename: str) -> list[str]:
    """Read a UTF-8 text file and return its lines.

    Whitespace at the very start and very end of the file is stripped
    before the content is split on newline characters, so trailing blank
    lines never produce empty entries.

    Args:
        filename: Path of the file to read.

    Returns:
        The lines of the file without newline terminators. An empty or
        whitespace-only file yields an empty list.
    """
    # The context manager closes the handle deterministically instead of
    # leaving that to the garbage collector.
    with open(filename, encoding="utf-8") as f:
        text = f.read().strip()
    # "".split("\n") would be [""], i.e. one bogus empty sample.
    return text.split("\n") if text else []

def removeBlankLines(file_path: str) -> None:
    """Delete empty and whitespace-only lines from a text file, in place.

    The file is read and written as UTF-8, the same encoding readLines()
    uses, so the result does not depend on the platform default encoding.

    Args:
        file_path: Path of the file to clean. It is only rewritten when at
            least one blank line has to be removed.
    """
    with open(file_path, 'r', encoding="utf-8") as f:
        original = f.readlines()

    kept = [line for line in original if line.strip()]

    # Skip the destructive rewrite when the file is already clean.
    if len(kept) != len(original):
        with open(file_path, 'w', encoding="utf-8") as f:
            f.writelines(kept)

# Clean both dataset files on disk first, then load them.
removeBlankLines(PATH_NORMAL)
removeBlankLines(PATH_ATTACK)

lines_normal = readLines(PATH_NORMAL)
lines_attack = readLines(PATH_ATTACK)

# Register the loaded lines under their category names.
category_lines.update({"normal": lines_normal, "attack": lines_attack})

In [None]:
# Alphabet used for one-hot encoding: every character in string.printable.
all_char = string.printable
# Number of alphabet characters; used as the width of the one-hot tensors.
n_all_char = len(all_char)


def char2index(char: str):
    """Return the position of ``char`` in ``all_char``, or -1 if it is absent."""
    try:
        return all_char.index(char)
    except ValueError:
        # Same "not found" sentinel that str.find() reports.
        return -1


def char2tensor(char: str):
    """One-hot encode a single character as a ``(1, n_all_char)`` tensor.

    Args:
        char: The character to encode.

    Returns:
        A tensor of zeros with a single 1 at the character's alphabet
        index. A character outside the alphabet yields an all-zero row.
    """
    tensor = torch.zeros(1, n_all_char)
    index = char2index(char)
    # char2index reports "not found" as -1. Writing to index -1 would
    # silently set the last column and alias every unknown character with
    # the last alphabet entry, so unknown characters stay all zeros.
    if index >= 0:
        tensor[0][index] = 1
    return tensor


def line2tensor(line: str):
    """One-hot encode a string as a ``(len(line), n_all_char)`` tensor.

    Args:
        line: The text to encode, one row per character.

    Returns:
        A tensor of zeros in which row ``i`` has a single 1 at the alphabet
        index of ``line[i]``. Rows for characters outside the alphabet are
        left all zero.
    """
    tensor = torch.zeros(len(line), n_all_char)
    for i, char in enumerate(line):
        index = char2index(char)
        # -1 means "not in the alphabet"; indexing with it would set the
        # last column and mis-encode the character as the last alphabet entry.
        if index >= 0:
            tensor[i][index] = 1
    return tensor

In [None]:
def categoryFromOutput(output):
    """Translate a model output into its highest-scoring category.

    Args:
        output: Tensor of scores; the top-1 index along the last dimension
            is taken and its first entry is used.

    Returns:
        A ``(category name, category index)`` pair.
    """
    best_index = output.topk(1).indices[0].item()
    return all_categories[best_index], best_index


def randomChoice(l):
    """Return one randomly selected element of the sequence ``l``.

    Raises:
        ValueError: If ``l`` is empty.
    """
    position = random.randrange(len(l))
    return l[position]


def randomTrainingExample():
    """Draw one random (category, line) pair and encode it for training.

    A category is chosen first, then a line from that category.

    Returns:
        A 4-tuple ``(category, line, category_tensor, line_tensor)``:
        the category name, the raw line, a one-element ``torch.long``
        tensor holding the category's index in ``all_categories``, and the
        one-hot encoding of the line.
    """
    category = randomChoice(all_categories)
    line = randomChoice(category_lines[category])
    label = all_categories.index(category)
    return (
        category,
        line,
        torch.tensor([label], dtype=torch.long),
        line2tensor(line),
    )

In [None]:
class RNN(torch.nn.Module):
    """Sequence classifier: stacked ``torch.nn.RNN`` plus a linear head.

    The whole sequence is run through the recurrent layers and the
    top-layer output at the final time step is projected to class scores.
    """

    def __init__(self, input_size, hidden_size, output_size, num_layers):
        """Build the recurrent stack and the output projection.

        Args:
            input_size: Number of features per time step.
            hidden_size: Width of each recurrent layer.
            output_size: Number of scores produced per sequence.
            num_layers: Number of stacked recurrent layers.
        """
        super().__init__()

        self.num_layers = num_layers
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.rnn = torch.nn.RNN(
            input_size=self.input_size,
            hidden_size=self.hidden_size,
            num_layers=num_layers,
        )
        self.h2o = torch.nn.Linear(hidden_size, output_size)

    def forward(self, input):
        """Score one unbatched sequence.

        Args:
            input: Tensor of shape ``(seq_len, input_size)``; the 2-D
                initial hidden state below is the unbatched form.

        Returns:
            Tensor of shape ``(1, output_size)`` holding the raw outputs of
            the linear head for the last time step.
        """
        # Take device and dtype from the input so the module keeps working
        # after .to(device) / .double(); torch.zeros() would always use the
        # default device and dtype.
        hidden = input.new_zeros(self.num_layers, self.hidden_size)
        rnn_out, _ = self.rnn(input, hidden)
        # Only the final time step feeds the classifier head.
        output = self.h2o(rnn_out[-1, :])
        return output.unsqueeze(0)


# Model and optimiser hyper-parameters.
input_size = n_all_char  # one input feature per alphabet character
# Derived from the category list instead of a hard-coded 2, so the output
# layer cannot drift out of sync with all_categories.
output_size = n_categories
hidden_size = 128
num_layers = 2
learning_rate = 0.00008
rnn = RNN(input_size, hidden_size, output_size, num_layers)

criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(rnn.parameters(), lr=learning_rate)

In [None]:
# Training schedule; every value is a number of iterations.
n_iters = 20000
# Integer division keeps the intervals exact ints: with true division a
# non-divisible n_iters gives a fractional interval that `step % interval`
# may never hit. max() prevents a zero interval (modulo by zero) when
# n_iters is small.
print_every = max(1, n_iters // 200)
plot_every = 100
save_every = max(1, n_iters // 5)


def train(category_tensor, line_tensor):
    """Run one optimisation step on a single example.

    Args:
        category_tensor: Target class index tensor passed to ``criterion``.
        line_tensor: Encoded input line passed to ``rnn``.

    Returns:
        A ``(output, loss)`` pair: the model output for this example and
        the loss as a Python number.
    """
    prediction = rnn(line_tensor)
    loss_value = criterion(prediction, category_tensor)

    # Clear stale gradients, back-propagate, then update the weights.
    optimizer.zero_grad()
    loss_value.backward()
    optimizer.step()

    return prediction, loss_value.item()


# Checkpoints are written into this folder. Create it up front so that
# torch.save() cannot fail on a missing directory after save_every
# iterations of training have already been spent.
os.makedirs("pt", exist_ok=True)

plt.figure()
current_loss = 0  # loss summed since the last recorded plot point
all_losses = []  # one averaged loss value per plot_every iterations

# Named `step` rather than `iter` so the builtin iter() is not shadowed for
# the rest of the session.
for step in tqdm(range(1, n_iters + 1), file=sys.stdout):
    category, line, category_tensor, line_tensor = randomTrainingExample()
    output, loss = train(category_tensor, line_tensor)
    current_loss += loss

    # Progress line: iteration, percent done, loss, guess, verdict, input.
    if step % print_every == 0:
        guess, guess_i = categoryFromOutput(output)
        correct = "YES" if guess == category else "NO (%s)" % category
        tqdm.write(
            f"{step} {step / n_iters * 100:.1f}% {loss:.4f} {guess} {correct} {line}"
        )

    # Record the mean loss over the last plot_every iterations.
    if step % plot_every == 0:
        all_losses.append(current_loss / plot_every)
        current_loss = 0

    # Redraw the loss curve after every 10 recorded points.
    if step % (plot_every * 10) == 0:
        clear_output(wait=True)
        plt.plot(all_losses)
        plt.show()

    # Checkpoint the model weights together with the iteration count.
    if step % save_every == 0:
        state = {"state": rnn.state_dict(), "epoch": step}
        filename = "pt/" + str(step) + "epo_rnnad.pt"
        torch.save(state, filename)

In [None]:
def train_env(line_tensor):
    """Run the global ``rnn`` on ``line_tensor`` and return its output unchanged."""
    return rnn(line_tensor)


def evaluate(line):
    """Return the predicted category name for a raw text line.

    The line is one-hot encoded, passed through the model, and the
    highest-scoring category's name is returned.
    """
    predicted_name, _index = categoryFromOutput(train_env(line2tensor(line)))
    return predicted_name


# Accuracy estimate over randomly drawn examples.
# NOTE: examples come from randomTrainingExample(), i.e. from the same
# category_lines the model was trained on, so this is NOT a held-out test
# score. The printed label says so explicitly.
correct = 0
total = 10000
# Inference only: no gradients are needed.
with torch.no_grad():
    for sample_idx in tqdm(range(total)):
        category, line, category_tensor, line_tensor = randomTrainingExample()
        predicted = evaluate(line)
        if predicted == category:
            correct += 1
print("accuracy on random training samples: %d %% " % (100 * correct / total))