## Imports

In [1]:
import time
import os
import pickle
import numpy as np
from bisect import bisect_left
import torch
from transformers import BertTokenizer
from pathlib import Path
from torch.utils.data import TensorDataset, Dataset, DataLoader
import itertools
import time
import os
import numpy as np
from bisect import bisect_left
import torch
import torch
import time
import sys
import pandas as pd
from torch.utils.flop_counter import FlopCounterMode
from IPython.display import clear_output

In [2]:
if True:
    class VocabFactory():

        def __init__(self, tokenizer) -> None:
            self.tokens_dict = {}
            self.tokenizer = tokenizer
            self.vocab_id = {}
            self.id_vocab = {}

        def create_vocab(self, texts, vocab_count=0):
            i = 0
            self.tokens_dict['❿'] = sys.maxsize

            for text in texts:
                if i %100000 == 0:
                    print(i)
                i+=1

                tokens = list(self.tokenizer(text))
                for token in tokens:
                    # token = self.tokenizer(token)
                    if token not in self.tokens_dict:
                        self.tokens_dict[token] = 0
                    self.tokens_dict[token] += 1
            self.finalize(vocab_count)

        def finalize(self, vocab_count=0):
            all_tokens = pd.DataFrame({'tokens': list(self.tokens_dict.keys()), 'counts': list(self.tokens_dict.values())})
            all_tokens = all_tokens.sort_values(by=['counts'], ascending=False)
            tokens = all_tokens['tokens'].values
            if vocab_count != 0:
                tokens = tokens[:vocab_count]
            self.vocab_id = {t: i for i, t in enumerate(tokens)}
            self.id_vocab = {i: t for i, t in enumerate(tokens)}

    def read_data(file_dir):
        with open(file_dir) as file:
            urls = []
            labels = []
            for line in file.readlines():
                items = line.split('\t')
                label = int(items[0])
                if label == 1:
                    labels.append(1)
                else:
                    labels.append(0)
                url = items[1][:-1]
                urls.append(url)
        return urls, labels

    def split_url(line, part):
        if line.startswith("http://"):
            line=line[7:]
        if line.startswith("https://"):
            line=line[8:]
        if line.startswith("ftp://"):
            line=line[6:]
        if line.startswith("www."):
            line = line[4:]
        slash_pos = line.find('/')
        if slash_pos > 0 and slash_pos < len(line)-1: # line = "fsdfsdf/sdfsdfsd"
            primarydomain = line[:slash_pos]
            path_argument = line[slash_pos+1:]
            path_argument_tokens = path_argument.split('/')
            pathtoken = "/".join(path_argument_tokens[:-1])
            last_pathtoken = path_argument_tokens[-1]
            if len(path_argument_tokens) > 2 and last_pathtoken == '':
                pathtoken = "/".join(path_argument_tokens[:-2])
                last_pathtoken = path_argument_tokens[-2]
            question_pos = last_pathtoken.find('?')
            if question_pos != -1:
                argument = last_pathtoken[question_pos+1:]
                pathtoken = pathtoken + "/" + last_pathtoken[:question_pos]
            else:
                argument = ""
                pathtoken = pathtoken + "/" + last_pathtoken
            last_slash_pos = pathtoken.rfind('/')
            sub_dir = pathtoken[:last_slash_pos]
            filename = pathtoken[last_slash_pos+1:]
            file_last_dot_pos = filename.rfind('.')
            if file_last_dot_pos != -1:
                file_extension = filename[file_last_dot_pos+1:]
                filename = filename[:file_last_dot_pos]
            else:
                file_extension = ""
        elif slash_pos == 0:    # line = "/fsdfsdfsdfsdfsd"
            primarydomain = line[1:]
            pathtoken = ""
            argument = ""
            sub_dir = ""
            filename = ""
            file_extension = ""
        elif slash_pos == len(line)-1:   # line = "fsdfsdfsdfsdfsd/"
            primarydomain = line[:-1]
            pathtoken = ""
            argument = ""
            sub_dir = ""
            filename = ""
            file_extension = ""
        else:      # line = "fsdfsdfsdfsdfsd"
            primarydomain = line
            pathtoken = ""
            argument = ""
            sub_dir = ""
            filename = ""
            file_extension = ""
        if part == 'pd':
            return primarydomain
        elif part == 'path':
            return pathtoken
        elif part == 'argument':
            return argument
        elif part == 'sub_dir':
            return sub_dir
        elif part == 'filename':
            return filename
        elif part == 'fe':
            return file_extension
        elif part == 'others':
            if len(argument) > 0:
                return pathtoken + '?' +  argument
            else:
                return pathtoken
        else:
            return primarydomain, pathtoken, argument, sub_dir, filename, file_extension

    def get_word_vocab(urls, max_length_words, tokenizer, min_word_freq=0):
        vocab_factory = VocabFactory(tokenizer)
        vocab_factory.create_vocab(urls)
        vocab_id = vocab_factory.vocab_id
        start = time.time()
        tokenized_texts = [tokenizer(text) for text in urls]
        x = ([torch.tensor([vocab_id[token] for token in t_text]) for t_text in tokenized_texts])
        x = torch.nn.utils.rnn.pad_sequence(x, batch_first=True, padding_value=0)[:,:max_length_words]
        print("Finished build vocabulary and mapping to x in {}".format(time.time() - start))
        vocab_dict = vocab_factory.vocab_id
        reverse_dict = vocab_factory.id_vocab
        print("Size of word vocabulary: {}".format(len(reverse_dict)))
        return x, reverse_dict

    def get_words(x, reverse_dict, delimit_mode, urls=None):
        processed_x = []
        if delimit_mode == 0:
            for url in x:
                words = []
                for word_id in url:
                    if word_id.item() != 0:
                        words.append(reverse_dict[word_id.item()])
                    else:
                        break
                processed_x.append(words)
        elif delimit_mode == 1:
            for i in range(x.shape[0]):
                word_url = x[i]
                raw_url = urls[i]
                words = []
                for w in range(len(word_url)):
                    word_id = word_url[w]
                    if word_id.item() == 0:
                        words.extend(list(raw_url))
                        break
                    else:
                        word = reverse_dict[word_id.item()]
                        idx = raw_url.index(word)
                        special_chars = list(raw_url[0:idx])
                        words.extend(special_chars)
                        words.append(word)
                        raw_url = raw_url[idx+len(word):]
                        if w == len(word_url) - 1:
                            words.extend(list(raw_url))
                processed_x.append(words)
        return processed_x

    def get_char_ngrams(ngram_len, word):
        word = f"<{word}>"
        return [word[i:i + ngram_len] for i in range(len(word) - ngram_len + 1)]

    def char_id_x(urls, char_dict, max_len_chars):
        chidx = [min(len(url), max_len_chars) for url in urls]
        chared_id_x = [torch.from_numpy(np.array([char_dict[url[i]] if url[i] in char_dict else 0 for i in range(chidx[l])])) for l, url in enumerate(urls)]
        chared_id_x = torch.nn.utils.rnn.pad_sequence(chared_id_x, batch_first=True, padding_value=char_dict['❿'])
        return chared_id_x

    def ngram_id_x_dicts(word_x, max_len_subwords, high_freq_words=None):
        char_ngram_len = 1
        all_ngrams = set()
        ngramed_x = []
        all_words = set()
        worded_x = []
        counter = 0
        for counter, url in enumerate(word_x):
            if counter % 100000 == 0:
                print("Processing #url {}".format(counter))
            url_in_ngrams = []
            url_in_words = []
            for word in url:
                ngrams = get_char_ngrams(char_ngram_len, word)
                if (len(ngrams) > max_len_subwords) or \
                    (high_freq_words is not None and len(word)>1 and not is_in(high_freq_words, word)):
                    ngram_slice = ngrams[:max_len_subwords]
                    all_ngrams.update(ngram_slice)
                    url_in_ngrams.append(ngram_slice)
                    all_words.add("<UNKNOWN>")
                    url_in_words.append("<UNKNOWN>")
                else:
                    all_ngrams.update(ngrams)
                    url_in_ngrams.append(ngrams)
                    all_words.add(word)
                    url_in_words.append(word)
            ngramed_x.append(url_in_ngrams)
            worded_x.append(url_in_words)
        print("worded_x completed")
        all_ngrams = ['❿'] + list(all_ngrams)
        ngrams_dict = {ngram: i + 1 for i, ngram in enumerate(all_ngrams)}

        print("ngrams_dict completed")
        all_words = list(all_words)
        words_dict = {word: i + 1 for i, word in enumerate(all_words)}
        
        return ngrams_dict, words_dict, ngramed_x, worded_x

    def ngram_id_x(ngrams_dict, words_dict, ngramed_x, worded_x, max_len_subwords):
        print("words_dict completed")

        fill_char = ngrams_dict['❿']
        ngramed_id_x = [
            [
                [
                    ngrams_dict[ngram] for ngram in ngramed_word[:max_len_subwords]
                ] + [fill_char] * max(0, max_len_subwords - len(ngramed_word))
                for ngramed_word in ngramed_url
            ]
            for ngramed_url in ngramed_x
        ]
        ngramed_id_x = torch.from_numpy(np.array(ngramed_id_x))
        print("ngramed_id_x completed")
        worded_id_x = [
            [words_dict[word] for word in worded_url]
            for worded_url in worded_x
        ]
        worded_id_x = torch.from_numpy(np.array(worded_id_x))

        print("worded_id_x completed")
        return ngramed_id_x, worded_id_x

    def ngram_id_x_from_dict(word_x, max_len_subwords, ngram_dict, word_dict = None):
        char_ngram_len = 1
        print("Index of <UNKNOWN> word: {}".format(word_dict["<UNKNOWN>"]))
        ngramed_id_x = []
        worded_id_x = []
        counter = 0
        if word_dict:
            word_vocab = sorted(list(word_dict.keys()))
        for url in word_x:
            if counter % 100000 == 0:
                print("Processing url #{}".format(counter))
            counter += 1
            url_in_ngrams = []
            url_in_words = []
            words = url
            for word in words:
                ngrams = get_char_ngrams(char_ngram_len, word)
                if len(ngrams) > max_len_subwords:
                    word = "<UNKNOWN>"
                ngrams_id = []
                for ngram in ngrams:
                    if ngram in ngram_dict:
                        ngrams_id.append(ngram_dict[ngram])
                    else:
                        ngrams_id.append(0)
                url_in_ngrams.append(ngrams_id)
                if is_in(word_vocab, word):
                    word_id = word_dict[word]
                else:
                    word_id = word_dict["<UNKNOWN>"]
                url_in_words.append(word_id)
            ngramed_id_x.append(url_in_ngrams)
            worded_id_x.append(url_in_words)

        return ngramed_id_x, worded_id_x

    def bisect_search(a,x):
        i = bisect_left(a,x)
        if i != len(a) and a[i] == x:
            return i
        raise ValueError

    def is_in(a,x):
        i = bisect_left(a,x)
        if i != len(a) and a[i] == x:
            return True
        else:
            return False

    def prep_train_test(pos_x, neg_x, dev_pct):
        np.random.seed(10)
        shuffle_indices=np.random.permutation(np.arange(len(pos_x)))
        pos_x_shuffled = pos_x[shuffle_indices]
        dev_idx = -1 * int(dev_pct * float(len(pos_x)))
        pos_train = pos_x_shuffled[:dev_idx]
        pos_test = pos_x_shuffled[dev_idx:]

        np.random.seed(10)
        shuffle_indices=np.random.permutation(np.arange(len(neg_x)))
        neg_x_shuffled = neg_x[shuffle_indices]
        dev_idx = -1 * int(dev_pct * float(len(neg_x)))
        neg_train = neg_x_shuffled[:dev_idx]
        neg_test = neg_x_shuffled[dev_idx:]

        x_train = torch.from_numpy(np.array(list(pos_train) + list(neg_train)))
        y_train = len(pos_train)*[1] + len(neg_train)*[0]
        x_test = torch.from_numpy(np.array(list(pos_test) + list(neg_test)))
        y_test = len(pos_test)*[1] + len(neg_test)*[0]

        y_train = torch.nn.functional.one_hot(torch.tensor(y_train), num_classes=2).float()
        y_test = torch.nn.functional.one_hot(torch.tensor(y_test), num_classes=2).float()

        np.random.seed(10)
        shuffle_indices = np.random.permutation(np.arange(len(x_train)))
        x_train = x_train[shuffle_indices]
        y_train = y_train[shuffle_indices]

        np.random.seed(10)
        shuffle_indices = np.random.permutation(np.arange(len(x_test)))
        x_test = x_test[shuffle_indices]
        y_test = y_test[shuffle_indices]

        print("Train Mal/Ben split: {}/{}".format(len(pos_train), len(neg_train)))
        print("Test Mal/Ben split: {}/{}".format(len(pos_test), len(neg_test)))
        print("Train/Test split: {}/{}".format(len(y_train), len(y_test)))
        print("Train/Test split: {}/{}".format(len(x_train), len(x_test)))

        return x_train, y_train, x_test, y_test


    def prep_train_test2(grouped_ids, dev_pct):

        np.random.seed(10)
        train_ids = []
        test_ids = []

        for ids in grouped_ids:
            shuffle_indices=np.random.permutation(np.arange(len(ids)))
            ids_shuffled = ids[shuffle_indices]
            dev_idx = -1 * int(dev_pct * float(len(ids)))
            ids_train = ids_shuffled[:dev_idx]
            ids_test = ids_shuffled[dev_idx:]
            train_ids.append(ids_train)
            test_ids.append(ids_test)
        y_train = [len(train_ids[i]) * [i] for i in range(len(train_ids))]
        y_test = [len(test_ids[i]) * [i] for i in range(len(test_ids))]
        y_train = list(itertools.chain.from_iterable(y_train))
        y_test = list(itertools.chain.from_iterable(y_test))
        x_train = torch.tensor(list(itertools.chain.from_iterable(train_ids)))
        x_test = torch.tensor(list(itertools.chain.from_iterable(test_ids)))

        y_train = torch.nn.functional.one_hot(torch.tensor(y_train), num_classes=len(grouped_ids)).float()
        y_test = torch.nn.functional.one_hot(torch.tensor(y_test), num_classes=len(grouped_ids)).float()

        np.random.seed(10)
        shuffle_indices = np.random.permutation(np.arange(len(x_train)))
        x_train = x_train[shuffle_indices]
        y_train = y_train[shuffle_indices]

        np.random.seed(10)
        shuffle_indices = np.random.permutation(np.arange(len(x_test)))
        x_test = x_test[shuffle_indices]
        y_test = y_test[shuffle_indices]
        return x_train, y_train, x_test, y_test


    def get_ngramed_id_x(x_idxs, ngramed_id_x):
        output_ngramed_id_x = []
        for idx in x_idxs:
            output_ngramed_id_x.append(ngramed_id_x[idx])
        return torch.stack(output_ngramed_id_x)

    def pad_seq(urls, max_d1=0, max_d2=0, embedding_size=128):
        if max_d1 == 0 and max_d2 == 0:
            for url in urls:
                if len(url) > max_d1:
                    max_d1 = len(url)
                for word in url:
                    if len(word) > max_d2:
                        max_d2 = len(word)
        pad_idx = np.zeros((len(urls), max_d1, max_d2, embedding_size))
        pad_urls = np.zeros((len(urls), max_d1, max_d2))
        pad_vec = [1 for i in range(embedding_size)]
        for d0 in range(len(urls)):
            url = urls[d0]
            for d1 in range(len(url)):
                if d1 < max_d1:
                    word = url[d1]
                    for d2 in range(len(word)):
                        if d2 < max_d2:
                            pad_urls[d0,d1,d2] = word[d2]
                            pad_idx[d0,d1,d2] = pad_vec
        return pad_urls, pad_idx

    def pad_seq_in_word(urls, max_d1=0, embedding_size=128):
        if max_d1 == 0:
            url_lens = [len(url) for url in urls]
            max_d1 = max(url_lens)
        pad_urls = np.zeros((len(urls), max_d1))
        #pad_idx = np.zeros((len(urls), max_d1, embedding_size))
        #pad_vec = [1 for i in range(embedding_size)]
        for d0 in range(len(urls)):
            url = urls[d0]
            for d1 in range(len(url)):
                if d1 < max_d1:
                    pad_urls[d0,d1] = url[d1]
                    #pad_idx[d0,d1] = pad_vec
        return pad_urls

    def softmax(x):
        e_x = np.exp(x - np.max(x))
        return e_x / e_x.sum()

    def batch_iter(data, batch_size, num_epochs, shuffle=True):
        data = np.array(data)
        data_size = len(data)
        num_batches_per_epoch = int((len(data)-1)/batch_size) + 1
        for epoch in range(num_epochs):
            if shuffle:
                shuffle_indices = np.random.permutation(np.arange(data_size))
                shuffled_data = data[shuffle_indices]
            else:
                shuffled_data = data
            for batch_num in range(num_batches_per_epoch):
                start_idx = batch_num * batch_size
                end_idx = min((batch_num+1) * batch_size, data_size)
                yield shuffled_data[start_idx:end_idx]

    def save_test_result(labels, all_predictions, all_scores, output_dir):
        output_labels = []
        for i in labels:
            if i == 1:
                output_labels.append(i)
            else:
                output_labels.append(-1)
        output_preds = []
        for i in all_predictions:
            if i == 1:
                output_preds.append(i)
            else:
                output_preds.append(-1)
        softmax_scores = [softmax(i) for i in all_scores]
        with open(output_dir, "w") as file:
            output = "label\tpredict\tscore\n"
            file.write(output)
            for i in range(len(output_labels)):
                output = str(int(output_labels[i])) + '\t' + str(int(output_preds[i])) + '\t' + str(softmax_scores[i][1]) + '\n'
                file.write(output)


In [3]:
def prep_train_test2(grouped_ids, dev_pct):

    np.random.seed(10)
    train_ids = []
    test_ids = []

    for ids in grouped_ids:
        shuffle_indices=np.random.permutation(np.arange(len(ids)))
        ids_shuffled = ids[shuffle_indices]
        dev_idx = -1 * int(dev_pct * float(len(ids)))
        ids_train = ids_shuffled[:dev_idx]
        ids_test = ids_shuffled[dev_idx:]
        train_ids.append(ids_train)
        test_ids.append(ids_test)
    y_train = [len(train_ids[i]) * [i] for i in range(len(train_ids))]
    y_test = [len(test_ids[i]) * [i] for i in range(len(test_ids))]
    y_train = list(itertools.chain.from_iterable(y_train))
    y_test = list(itertools.chain.from_iterable(y_test))
    x_train = torch.tensor(list(itertools.chain.from_iterable(train_ids)))
    x_test = torch.tensor(list(itertools.chain.from_iterable(test_ids)))

    y_train = torch.nn.functional.one_hot(torch.tensor(y_train), num_classes=len(grouped_ids)).float()
    y_test = torch.nn.functional.one_hot(torch.tensor(y_test), num_classes=len(grouped_ids)).float()

    np.random.seed(10)
    shuffle_indices = np.random.permutation(np.arange(len(x_train)))
    x_train = x_train[shuffle_indices]
    y_train = y_train[shuffle_indices]

    np.random.seed(10)
    shuffle_indices = np.random.permutation(np.arange(len(x_test)))
    x_test = x_test[shuffle_indices]
    y_test = y_test[shuffle_indices]

    # print("Train Mal/Ben split: {}/{}".format(len(pos_train), len(neg_train)))
    # print("Test Mal/Ben split: {}/{}".format(len(pos_test), len(neg_test)))
    # print("Train/Test split: {}/{}".format(len(y_train), len(y_test)))
    # print("Train/Test split: {}/{}".format(len(x_train), len(x_test)))

    return x_train, y_train, x_test, y_test

In [4]:
torch.cuda.is_available()

True

## Prepare Data

In [16]:
df = pd.read_csv(r"datasets\PhishStorm\urlset.csv", on_bad_lines='warn', encoding = 'ISO-8859-1', low_memory=False)[['domain','label']]
df.dropna(inplace=True)
df.columns = ['url', 'Topic']
class_list = ['benign', 'phishing']
class_id = {'benign':0, 'phishing': 1}
id_class = {0: 'benign', 1:'phishing'}
df['label'] = df.Topic#.apply(lambda t: class_id[t])
urls = df["url"].values
labels = df["label"].values.astype(np.longlong)

Skipping line 18273: expected 14 fields, saw 15

  df = pd.read_csv(r"datasets\PhishStorm\urlset.csv", on_bad_lines='warn', encoding = 'ISO-8859-1', low_memory=False)[['domain','label']]


In [6]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cuda')

In [7]:
if True:
    default_max_len_words = 200
    max_len_words = default_max_len_words
    default_max_len_chars = 200
    max_len_chars = default_max_len_chars
    default_max_len_subwords = 20
    max_len_subwords = default_max_len_subwords
    default_min_word_freq = 1
    min_word_freq = default_min_word_freq

    default_delimit_mode = 1
    delimit_mode = default_delimit_mode
    default_max_len_words = 200
    max_len_words = default_max_len_words
    default_max_len_chars = 200
    max_len_chars = default_max_len_chars
    default_max_len_subwords = 20
    max_len_subwords = default_max_len_subwords
    default_min_word_freq = 1
    min_word_freq = default_min_word_freq
    default_dev_pct = 0.001
    dev_pct=0.1 # default_dev_pct

    # model args
    default_emb_dim = 32
    emb_dim = default_emb_dim
    default_filter_sizes = "3,4,5,6"
    filter_sizes = default_filter_sizes
    default_emb_mode = 5
    emb_mode = default_emb_mode

    # train args
    default_nb_epochs = 5
    nb_epochs = default_nb_epochs
    default_batch_size = 1024
    batch_size = default_batch_size
    l2_reg_lambda = 0.0
    default_lr = 0.001
    lr = default_lr

    # log args
    output_dir = r"scripts\URLNet\outputs"
    print_every = 50
    eval_every = 500
    checkpoint_every = 500

In [8]:
import re
class SepTokenizer:

    def __init__(self):
        self.splitter = re.compile(r'[/?.=-_]+')

    def __call__(self, text):
        return self.splitter.split(text)

tokenizer = SepTokenizer()
tokenizer('uk.linkedin.com/pub/steve-rubenstein/8/718/755')

['uk', 'linkedin', 'com', 'pub', 'steve-rubenstein', '8', '718', '755']

In [9]:
import os

high_freq_words = None
curpath = os.path.abspath(os.curdir)
abs_path = os.path.join(curpath, r"TestsOnPhishStorm\tempURLNET\ProcessedData\word_x.pkl")

In [10]:
if Path(r"TestsOnPhishStorm\tempURLNET\ProcessedData\word_x.pkl").is_file():
  with open(fr'{abs_path}', 'rb') as f:
    word_x2 = pickle.load(f)
else:
  x, word_reverse_dict = get_word_vocab(urls[:], max_len_words, tokenizer)
  word_x2 = get_words(x, word_reverse_dict, delimit_mode, urls)
  for i in range(len(word_x2)):
    if i%100000==0:
      print(i)
    if len(word_x2[i])> max_len_words:
        word_x2[i] = word_x2[i][:max_len_words]
    while len(word_x2[i]) < max_len_words:
        word_x2[i].append('❿')
  clear_output()
  with open(fr'{abs_path}', 'wb') as f:
    pickle.dump(word_x2, f)

if Path(r"TestsOnPhishStorm\tempURLNET\ProcessedData\ngrams_dict.pkl").is_file():
  with open(os.path.join(curpath, r"TestsOnPhishStorm\tempURLNET\ProcessedData\ngrams_dict.pkl"), 'rb') as f:
    ngrams_dict = pickle.load(f)
  with open(os.path.join(curpath, r"TestsOnPhishStorm\tempURLNET\ProcessedData\words_dict.pkl"), 'rb') as f:
    words_dict = pickle.load(f)
else:
  ngrams_dict, words_dict, ngramed_x, worded_x = ngram_id_x_dicts(word_x2, max_len_subwords, high_freq_words)
  with open(os.path.join(curpath, r"TestsOnPhishStorm\tempURLNET\ProcessedData\ngrams_dict.pkl"), 'wb') as f:
    pickle.dump(ngrams_dict, f)
  with open(os.path.join(curpath, r"TestsOnPhishStorm\tempURLNET\ProcessedData\words_dict.pkl"), 'wb') as f:
    pickle.dump(words_dict, f)
    
chars_dict = ngrams_dict

In [11]:
from pathlib import Path

if Path(rf"TestsOnPhishStorm\tempURLNET\ProcessedData\chared_id_x.pt").is_file():
    chared_id_x = torch.load(rf'TestsOnPhishStorm\tempURLNET\ProcessedData\chared_id_x.pt')
else:
    chared_id_x = char_id_x(urls, chars_dict, max_len_chars)
    torch.save(chared_id_x, rf'TestsOnPhishStorm\tempURLNET\ProcessedData\chared_id_x.pt')

In [12]:
word_x3 = word_x2[:]

In [13]:
ngramed_id_x = torch.zeros((len(word_x2), len(word_x2[0]), max_len_subwords), dtype=torch.int32)
worded_id_x = torch.zeros((len(word_x2), len(word_x2[0])), dtype=torch.int32)

In [14]:
intervals = 50000
previous = 0
for i in range(intervals, len(word_x3)+intervals, intervals):
    i = min(i, len(word_x3))
    if Path(rf"TestsOnPhishStorm\tempURLNET\ProcessedData\ngramed_id_x_{previous}_{i}.pt").is_file():
        print("True")
        ngramed_id_x[previous:i] = torch.load(rf'TestsOnPhishStorm\tempURLNET\ProcessedData\ngramed_id_x_{previous}_{i}.pt')
        worded_id_x[previous:i] = torch.load(rf'TestsOnPhishStorm\tempURLNET\ProcessedData\worded_id_x_{previous}_{i}.pt')
    else:
        print("False")
        word_x = word_x3[previous:i]
        _, _, ngramed_x, worded_x = ngram_id_x_dicts(word_x, max_len_subwords, high_freq_words)
        ngramed_id_x[previous:i], worded_id_x[previous:i] = ngram_id_x(ngrams_dict, words_dict, ngramed_x, worded_x, max_len_subwords)
        torch.save(ngramed_id_x[previous:i], rf'TestsOnPhishStorm\tempURLNET\ProcessedData\ngramed_id_x_{previous}_{i}.pt')
        torch.save(worded_id_x[previous:i], rf'TestsOnPhishStorm\tempURLNET\ProcessedData\worded_id_x_{previous}_{i}.pt')
    
    print(previous, i)
    previous = i

True
0 50000
True
50000 95913


In [23]:
y_splits = [[] for i in range(len(class_list))]

for i in range(len(labels)):
    y_splits[labels[i]].append(i)

y_splits = [np.array(splt) for splt in y_splits]


print(f"Overall {class_list[0]}/{class_list[1]} split: {len(y_splits[0])}/{len(y_splits[1])}")

x_train, y, x_test, y_test = prep_train_test2(y_splits, dev_pct)

x_char = get_ngramed_id_x(x_train, ngramed_id_x)
x_test_char = get_ngramed_id_x(x_test, ngramed_id_x)

x_word = get_ngramed_id_x(x_train, worded_id_x)
x_test_word = get_ngramed_id_x(x_test, worded_id_x)

x_char_seq = get_ngramed_id_x(x_train, chared_id_x)
x_test_char_seq = get_ngramed_id_x(x_test, chared_id_x)

Overall benign/phishing split: 48009/47904


In [24]:
print(np.sum(~(labels[x_test] == np.argmax(y_test.numpy(), axis=1))))
print(np.sum(~(labels[x_train] == np.argmax(y.numpy(), axis=1))))

0
0


In [25]:
class URLNetDataset(Dataset):

    def __init__(self, emb_mode, x_char_seq, x_word, x_char, y) -> None:
        super().__init__()

        if emb_mode == 1:
            self.dataset = TensorDataset(x_char_seq, y)
        elif emb_mode == 2:
            self.dataset = TensorDataset(x_word, y)
        elif emb_mode == 3:
            self.dataset = TensorDataset(x_char_seq, x_word, y)
        elif emb_mode == 4:
            self.dataset = TensorDataset(x_word, x_char, y) #, char_pad_idx
        elif emb_mode == 5:
            self.dataset = TensorDataset(x_char_seq, x_word, x_char, y)

    def __getitem__(self, index):
        return self.dataset[index]

    def __len__(self):
        return len(self.dataset)

In [26]:
train_dataset = URLNetDataset(emb_mode, x_char_seq, x_word, x_char, y)
test_dataset = URLNetDataset(emb_mode, x_test_char_seq, x_test_word, x_test_char, y_test)

train_dataloader = DataLoader(train_dataset, batch_size, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size, shuffle=False)

In [27]:
_ = next(iter(train_dataloader))

In [28]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class TextCNN(nn.Module):
    def __init__(self, char_ngram_vocab_size, word_ngram_vocab_size, char_vocab_size,
                 word_seq_len, char_seq_len, embedding_size, l2_reg_lambda=0,
                 filter_sizes=[3, 4, 5, 6], mode=0, num_classes=2):
        super(TextCNN, self).__init__()
        self.mode = mode
        self.word_seq_len = word_seq_len
        self.char_seq_len = char_seq_len
        self.embedding_size = embedding_size
        self.filter_sizes = filter_sizes

        if mode in [4, 5]:
            self.char_embedding = nn.Embedding(char_ngram_vocab_size+1, embedding_size)
            torch.nn.init.uniform_(self.char_embedding.weight)
        if mode in [2, 3, 4, 5]:
            self.word_embedding = nn.Embedding(word_ngram_vocab_size+1, embedding_size)
            torch.nn.init.uniform_(self.word_embedding.weight)
        if mode in [1, 3, 5]:
            self.char_seq_embedding = nn.Embedding(char_vocab_size+1, embedding_size)
            torch.nn.init.uniform_(self.char_seq_embedding.weight)

        self.dropout_keep_prob = nn.Dropout(0.5)
        self.num_filters_total = 256 * len(filter_sizes)
        self.conv_layers = nn.ModuleList()
        for filter_size in filter_sizes:
            conv = nn.Conv1d(embedding_size, 256, filter_size, padding="valid")
            nn.init.trunc_normal_(conv.weight, std=0.1)
            nn.init.constant_(conv.bias, 0.1)
            self.conv_layers.append(conv)

        if mode in [3, 5]:
            self.fc_word = nn.Linear(len(filter_sizes) * 256, 512)
            torch.nn.init.xavier_normal_(self.fc_word.weight)
            torch.nn.init.constant_(self.fc_word.bias, 0.1)
            self.fc_char = nn.Linear(len(filter_sizes) * 256, 512)
            torch.nn.init.xavier_normal_(self.fc_char.weight)
            torch.nn.init.constant_(self.fc_char.bias, 0.1)
            # self.fc_concat = nn.Linear(1024, 512)
        # elif mode in [2, 4]:
        #     self.fc = nn.Linear(len(filter_sizes) * 256, 512)
        # elif mode == 1:
        #     self.fc = nn.Linear(len(filter_sizes) * 256, 512)

        self.fc1 = nn.Linear(1024, 512)
        torch.nn.init.xavier_normal_(self.fc1.weight)
        torch.nn.init.constant_(self.fc1.bias, 0.1)
        self.fc2 = nn.Linear(512, 256)
        torch.nn.init.xavier_normal_(self.fc2.weight)
        torch.nn.init.constant_(self.fc2.bias, 0.1)
        self.fc3 = nn.Linear(256, 128)
        torch.nn.init.xavier_normal_(self.fc3.weight)
        torch.nn.init.constant_(self.fc3.bias, 0.1)
        self.fc4 = nn.Linear(128, num_classes)
        torch.nn.init.xavier_normal_(self.fc2.weight)
        torch.nn.init.constant_(self.fc2.bias, 0.1)

    def forward(self, x_word=None, x_char=None, x_char_seq=None, x_char_pad_idx=None):
        pooled_x = []

        if self.mode in [4, 5]:
            x_char = self.char_embedding(x_char)
            # x_char = x_char * x_char_pad_idx

        if self.mode in [2, 3, 4, 5]:
            x_word = self.word_embedding(x_word)

        if self.mode in [1, 3, 5]:
            x_char_seq = self.char_seq_embedding(x_char_seq)

        if self.mode in [4, 5]:
            x_char = torch.sum(x_char, dim=2)
            x_combined = x_char + x_word
            x_combined = torch.permute(x_combined, (0, 2, 1))
            # x_combined = x_combined.unsqueeze(1)
        if self.mode in [2, 3]:
            x_combined = torch.permute(x_word, (0, 2, 1))
            # x_combined = x_word.unsqueeze(1)
        if self.mode in [1, 3, 5]:
            char_x_expanded = torch.permute(x_char_seq, (0, 2, 1))
            # char_x_expanded = x_char_seq.unsqueeze(1)

        if self.mode == 2 or self.mode == 3 or self.mode == 4 or self.mode == 5:

            for i, conv in enumerate(self.conv_layers):
                h = F.relu(conv(x_combined))
                pooled = F.max_pool2d(h, (1, self.word_seq_len - self.filter_sizes[i] + 1))
                pooled_x.append(pooled)

            h_pooled = torch.cat(pooled_x, 1) #?
            h_pooled = h_pooled.squeeze(2) #?

            x_flat = torch.reshape(h_pooled, [h_pooled.shape[0], -1])
            h_drop = self.dropout_keep_prob(x_flat)


        if self.mode == 1 or self.mode == 3 or self.mode == 5:
            pooled_char_x = []
            for i, conv in enumerate(self.conv_layers):
                h = F.relu(conv(char_x_expanded))
                pooled = F.max_pool2d(h, (1, self.char_seq_len - self.filter_sizes[i] + 1))
                pooled_char_x.append(pooled)

            h_char_pool = torch.cat(pooled_char_x, 1) #?
            h_char_pool = h_char_pool.squeeze(2) #?
            # h_char_pool = h_char_pool.squeeze(2) #?

            char_x_flat = torch.reshape(h_char_pool, [h_char_pool.shape[0], -1])
            char_h_drop = self.dropout_keep_prob(char_x_flat)

        # if self.mode in [3, 5]:
        #     word_output = F.relu(self.fc_word(self.dropout(h_pooled)))
        #     char_output = F.relu(self.fc_char(self.dropout(h_pooled)))
        #     conv_output = torch.cat([word_output, char_output], 1)
        if self.mode in [3, 5]:
            word_output = self.fc_word(h_drop)
            char_output = self.fc_char(char_h_drop)
            conv_output = torch.cat([word_output, char_output], 1)
        elif self.mode in [2, 4]:
            conv_output = h_drop
        elif self.mode == 1:
            conv_output = char_h_drop
        # else:
        #     conv_output = F.relu(self.fc(self.dropout(h_pooled)))

        output0 = F.relu(self.fc1(conv_output))
        output1 = F.relu(self.fc2(output0))
        output2 = F.relu(self.fc3(output1))
        scores = self.fc4(output2)

        return scores

In [29]:
# cnn = TextCNN(
#                 char_ngram_vocab_size = len(ngrams_dict)+1,
#                 word_ngram_vocab_size = len(words_dict)+1,
#                 char_vocab_size = len(chars_dict)+1,
#                 embedding_size=emb_dim,
#                 word_seq_len=max_len_words,
#                 char_seq_len=max_len_chars,
#                 l2_reg_lambda=l2_reg_lambda,
#                 mode=emb_mode,
#                 filter_sizes=list(map(int, filter_sizes.split(","))),
#                 num_classes=4)

# optimizer = torch.optim.Adam(cnn.parameters(), lr=lr, weight_decay=l2_reg_lambda)
# loss_func = torch.nn.BCEWithLogitsLoss()

In [30]:
# if emb_mode == 1:
#     print(cnn(x_char_seq=_[0]).shape)
# elif emb_mode == 2:
#     print(cnn(x_word=_[0]).shape)
# elif emb_mode == 3:
#     print(cnn(x_char_seq=_[0], x_word=_[1]).shape)
# elif emb_mode == 4:
#     print(cnn(x_word=_[0], x_char=_[1]).shape)
# elif emb_mode == 5:
#     print(cnn(x_char_seq=_[0], x_word=_[1], x_char=_[2]).shape)

In [31]:
# from torch_scatter import scatter_max, scatter_mean, scatter_sum, scatter_std
import torchmetrics
from pytorch_lightning.loggers import CSVLogger
import pytorch_lightning as L
from pytorch_lightning.callbacks import EarlyStopping, ModelCheckpoint

In [32]:

class ClassifierLightningModel(L.LightningModule):
    def __init__(
        self,
        model,
        num_classes,
        optimizer=None,
        loss_func=None,
        learning_rate=0.01,
        batch_size=64,
        lr_scheduler=None,
        user_lr_scheduler=False,
        min_lr=0.0,
    ):
        super(ClassifierLightningModel, self).__init__()
        self.batch_size = batch_size
        self.learning_rate = learning_rate
        self.model = model
        self.min_lr = min_lr
        # self.save_hyperparameters(ignore=["model"])
        self.save_hyperparameters("model", logger=False)
        self.optimizer = self._get_optimizer(optimizer)
        self.lr_scheduler = (
            self._get_lr_scheduler(lr_scheduler) if user_lr_scheduler else None
        )
        self.loss_func = loss_func
        self.train_losses = []
        self.val_losses = []
        self.train_acc = torchmetrics.Accuracy(task="multiclass", num_classes=num_classes)
        self.val_acc = torchmetrics.Accuracy(task="multiclass", num_classes=num_classes)
        self.test_acc = torchmetrics.Accuracy(task="multiclass", num_classes=num_classes)

    def forward(self, x_word=None, x_char=None, x_char_seq=None, x_char_pad_idx=None, *args, **kwargs):
        return self.model(x_word, x_char, x_char_seq, x_char_pad_idx)

    # def on_train_epoch_start(self) -> None:
    #     param_groups = next(iter(self.optimizer.param_groups))
    #     if "lr" in param_groups and param_groups["lr"] is not None:
    #         current_learning_rate = float(param_groups["lr"])
    #         self.log(
    #             "lr",
    #             current_learning_rate,
    #             batch_size=self.batch_size,
    #             on_epoch=True,
    #             on_step=False,
    #         )

    def run_model(self, batch):
        if emb_mode == 1:
            return self(x_char_seq=batch[0])
        elif emb_mode == 2:
            return self(x_word=batch[0])
        elif emb_mode == 3:
            return self(x_char_seq=batch[0], x_word=batch[1])
        elif emb_mode == 4:
            return self(x_word=batch[0], x_char=batch[1])
        elif emb_mode == 5:
            return self(x_char_seq=batch[0], x_word=batch[1], x_char=batch[2])

    def training_step(self, batch, *args, **kwargs):
        for i in range(len(batch)):
            batch[i] = batch[i].to(self.device)

        self.model.train()
        y_out = self.run_model(batch)

        loss = self.loss_func(y_out.view(batch[-1].shape), batch[-1])
        self.train_losses.append(loss.detach().item())
        self.log(
            "train_loss",
            loss,
            prog_bar=True,
            batch_size=self.batch_size,
            on_epoch=True,
            on_step=True,
        )

        self.train_acc(torch.argmax(y_out, dim=1), torch.argmax(batch[-1], dim=1))
        self.log('train_acc', self.train_acc, prog_bar=True, on_epoch=True, on_step=True, batch_size=self.batch_size)

        return loss

    def validation_step(self, batch, *args, **kwargs):
        for i in range(len(batch)):
            batch[i] = batch[i].to(self.device)

        self.model.eval()
        y_out = self.run_model(batch)
        loss = self.loss_func(y_out.view(batch[-1].shape), batch[-1])
        self.val_losses.append(loss.detach().item())

        self.log(
            "val_loss",
            loss,
            prog_bar=True,
            batch_size=self.batch_size,
            on_epoch=True
        )

        self.val_acc(torch.argmax(y_out, dim=1), torch.argmax(batch[-1], dim=1))
        self.log('val_acc', self.val_acc, prog_bar=True, on_epoch=True, batch_size=self.batch_size)

    def configure_optimizers(self):
        if self.lr_scheduler is None:
            return self.optimizer

        return {
            "optimizer": self.optimizer,
            "lr_scheduler": {
                "scheduler": self.lr_scheduler,
                "monitor": "train_loss",
                "interval": "epoch",
                "frequency": 1,
            },
        }

    def update_learning_rate(self, learning_rate: float):
        self.learning_rate = learning_rate
        for g in self.optimizer.param_groups:
            g["lr"] = learning_rate

    def _get_optimizer(self, optimizer):
        return (
            optimizer
            if optimizer is not None
            else torch.optim.Adam(self.model.parameters(), lr=self.learning_rate)
        )

    def _get_lr_scheduler(self, lr_scheduler):
        return (
            lr_scheduler
            if lr_scheduler is not None
            else torch.optim.lr_scheduler.ReduceLROnPlateau(
                self.optimizer, patience=5, factor=0.5, mode="min", min_lr=self.min_lr
            )
        )


In [33]:
seed = 911
# for i in range(5):
callbacks = [
    EarlyStopping(monitor='val_loss',mode='min',patience=25),
    # CustomModelCheckpoint(dirpath=r'models\malicious_urls_model', filename=f'malicious_urls_model_', every_n_epochs=3, mode='min', monitor='val_loss_epoch', save_on_train_epoch_end=True),
    ModelCheckpoint(save_top_k=5, mode='min', monitor='val_loss', save_last=True)
    ]
classifier_torch_model = TextCNN(
                char_ngram_vocab_size = len(ngrams_dict)+1,
                word_ngram_vocab_size = len(words_dict)+1,
                char_vocab_size = len(chars_dict)+1,
                embedding_size=emb_dim,
                word_seq_len=max_len_words,
                char_seq_len=max_len_chars,
                l2_reg_lambda=l2_reg_lambda,
                mode=emb_mode,
                filter_sizes=list(map(int, filter_sizes.split(","))),
                num_classes=len(class_id)).to(device)

# classifier_torch_model = CNN_for_Text(num_embedding=len(vocab_dict), batch_size=batch_size, hidden_dim=hidden_dim, embedding_dim=embedding_dim, max_char_count=256, dropout=0.15, num_out_features=len(class_id), kernel_size=[5, 5, 5, 3], seed=seed)
optimizer = torch.optim.Adam(classifier_torch_model.parameters(), lr=lr, weight_decay=0.00011)
lr_scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, milestones=[10, 20, 30, 40, 50, 60, 70],gamma=0.5)
loss_func = torch.nn.BCEWithLogitsLoss()
classfier_lightning_model = ClassifierLightningModel(classifier_torch_model,
                                                    num_classes=len(class_id),
                                            learning_rate=lr,
                                            batch_size=batch_size,
                                            optimizer=optimizer,
                                            loss_func=loss_func,
                                            lr_scheduler=lr_scheduler,
                                            user_lr_scheduler=True
                                            ).to(device)



trainer = L.Trainer(
            callbacks=callbacks,
            max_epochs=80,
            accelerator= 'gpu' if torch.cuda.is_available() else 'cpu',
            logger=CSVLogger(save_dir='logs/', name='log2')
        )

  rank_zero_warn(
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs


In [34]:
_ = next(iter(test_dataloader))
flopt_counter = FlopCounterMode(classfier_lightning_model.model)
for i in range(len(_)):
    _[i] = _[i].to(device)
with flopt_counter:
    classfier_lightning_model.model(x_char_seq=_[0], x_word=_[1], x_char=_[2])

Module                   FLOP    % Total
-------------------  --------  ---------
TextCNN              122.072B    100.00%
 - aten.convolution  118.514B     97.09%
 - aten.addmm          3.557B      2.91%
 TextCNN.fc_word       1.074B      0.88%
  - aten.addmm         1.074B      0.88%
 TextCNN.fc_char       1.074B      0.88%
  - aten.addmm         1.074B      0.88%
 TextCNN.fc1           1.074B      0.88%
  - aten.addmm         1.074B      0.88%
 TextCNN.fc2           0.268B      0.22%
  - aten.addmm         0.268B      0.22%
 TextCNN.fc3           0.067B      0.05%
  - aten.addmm         0.067B      0.05%
 TextCNN.fc4           0.001B      0.00%
  - aten.addmm         0.001B      0.00%


In [35]:
trainer.fit(classfier_lightning_model, train_dataloaders=train_dataloader, val_dataloaders=test_dataloader)

You are using a CUDA device ('NVIDIA GeForce RTX 3080') that has Tensor Cores. To properly utilize them, you should set `torch.set_float32_matmul_precision('medium' | 'high')` which will trade-off precision for performance. For more details, read https://pytorch.org/docs/stable/generated/torch.set_float32_matmul_precision.html#torch.set_float32_matmul_precision
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name      | Type               | Params
-------------------------------------------------
0 | model     | TextCNN            | 6.1 M 
1 | loss_func | BCEWithLogitsLoss  | 0     
2 | train_acc | MulticlassAccuracy | 0     
3 | val_acc   | MulticlassAccuracy | 0     
4 | test_acc  | MulticlassAccuracy | 0     
-------------------------------------------------
6.1 M     Trainable params
0         Non-trainable params
6.1 M     Total params
24.347    Total estimated model params size (MB)


Sanity Checking: 0it [00:00, ?it/s]

  rank_zero_warn(
  rank_zero_warn(


Training: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

In [38]:
from tqdm import tqdm
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report, confusion_matrix
from torchmetrics.classification import ConfusionMatrix
def calculate_metrics(cl_model):
    cm = ConfusionMatrix(task="multiclass", num_classes=len(class_id))

    y_pred = []
    y_true = []

    cl_model = cl_model.eval()
    cl_model.to(device)
    for _ in tqdm(test_dataloader):
        for i in range(len(_)):
            _[i] = _[i].to(device)
        with torch.no_grad():
            y_p = cl_model.run_model(_)
            y_p = y_p.cpu()
        y_pred.append(y_p)
        y_true.append(_[-1].cpu())
    y_pred = torch.cat(y_pred, dim=0)
    y_true = torch.cat(y_true, dim=0)
    y_pred2 = torch.argmax(y_pred, dim=1)
    y_true2 = torch.argmax(y_true, dim=1)
    print(f'classification report: \n {classification_report(y_true2, y_pred2, digits=4)}')
    print(f'confusion matrix:\n {cm(y_pred2, y_true2)}')
    print('================================')


In [39]:
classfier_lightning_model.model = classfier_lightning_model.model.eval()
classfier_lightning_model = classfier_lightning_model.eval()
calculate_metrics(classfier_lightning_model)

100%|██████████| 10/10 [00:01<00:00,  7.57it/s]


classification report: 
               precision    recall  f1-score   support

           0     0.9836    0.9850    0.9843      4800
           1     0.9849    0.9835    0.9842      4790

    accuracy                         0.9843      9590
   macro avg     0.9843    0.9843    0.9843      9590
weighted avg     0.9843    0.9843    0.9843      9590

confusion matrix:
 tensor([[4728,   72],
        [  79, 4711]])


In [None]:
p = 0.9871
r = 0.9773
(2*p*r)/(p+r)

0.9821755548768072