In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.data as data
import torch.optim as optim
import numpy as np

### **Constants** 

In [1]:
input_dim = 1024
output_dim = 4
learning_rate = 0.001
word_vector_size = 100

In [3]:
model_path = '..\model'

## **Load Data**

In [4]:
train_data_loader = data.DataLoader() #TODO
test_data_loader = data.DataLoader() #TODO

TypeError: DataLoader.__init__() missing 1 required positional argument: 'dataset'

In [5]:
#dummy data
dummy_labels = {
        'Dynamic API Resolution': 'Adversaries may obfuscate then dynamically resolve API functions called by their malware in order to conceal malicious functionalities and impair defensive analysis. Malware commonly uses various Native API functions provided by the OS to perform various tasks such as those involving processes, files, and other system artifacts.',
        'HTML Smuggling':'Adversaries may smuggle data and files past content filters by hiding malicious payloads inside of seemingly benign HTML files. HTML documents can store large binary objects known as JavaScript Blobs (immutable data that represents raw bytes) that can later be constructed into file-like objects. Data may also be stored in Data URLs, which enable embedding media type or MIME files inline of HTML documents. HTML5 also introduced a download attribute that may be used to initiate file downloads.',
        'Network Denial of Service':'Adversaries may perform Network Denial of Service (DoS) attacks to degrade or block the availability of targeted resources to users. Network DoS can be performed by exhausting the network bandwidth services rely on. Example resources include specific websites, email services, DNS, and web-based applications. Adversaries have been observed conducting network DoS attacks for political purposes and to support other malicious activities, including distraction, hacktivism, and extortion.',
        'Network Sniffing':'Adversaries may sniff network traffic to capture information about an environment, including authentication material passed over the network. Network sniffing refers to using the network interface on a system to monitor or capture information sent over a wired or wireless connection. An adversary may place a network interface into promiscuous mode to passively access data in transit over the network, or use span ports to capture a larger amount of data.'
}   

## **Word2Vec**

In [6]:
import string
import gensim.downloader as api
from gensim.models.word2vec import Word2Vec

In [7]:
corpus = [description.split() for label, description in dummy_labels.items()]
corpus = []
for label, description in dummy_labels.items():
    words = [word.translate(str.maketrans('', '', string.punctuation)) for word in (f"{label} {description}").split()]
    corpus.append(words)

In [8]:
#corpus = api.load('text8')
gensim_model = Word2Vec(corpus, vector_size=word_vector_size, min_count=1)

In [9]:
class LabelsEmbeddings():
    def __init__(self, gensim_model: Word2Vec):
        self.model = gensim_model

    def fix_vectors_sizes(self, vectors: list) -> list:
        fixed_vectors = []
        max_size = max([len(v) for v in vectors])
        for vector in vectors:
            size_diff = max_size - len(vector)
            vector.extend([[0] * word_vector_size] * size_diff)
            fixed_vectors.append(vector)
        return fixed_vectors

    def generate_vectors(self, labels: dict):
        vectors = []
        for label, _ in labels.items():
            description_vector = []
            words = [word.translate(str.maketrans('', '', string.punctuation)) for word in label.split()]
            for word in words:
                if word in self.model.wv.index_to_key:
                    description_vector.append(self.model.wv[word])
                else:
                    description_vector.append(word_vector_size * [0])
            vectors.append(description_vector)
        return self.fix_vectors_sizes(vectors)

In [10]:
labels_embeddings = LabelsEmbeddings(gensim_model=gensim_model)
labels_vectors = labels_embeddings.generate_vectors(labels=dummy_labels)
#TODO need to pad?

In [11]:
[len(v) for v in labels_vectors]

[4, 4, 4, 4]

In [13]:
for vector in labels_vectors:
    for word in vector:
        assert len(word) == word_vector_size


### **Map layer**
Last layer of the model should be a map between incident embeddings and labels embeddings - we want to map given input data to the most corresponding Word2Vec label vector. To do this we have to initialize weights of this layer and freeze them.

In [19]:
def map_layer_init(shape: int, w2c_vectors: list) -> torch.Tensor:
    vectors = np.asarray(w2c_vectors, dtype=float)
    return torch.from_numpy(vectors)

## **Neural Network for Network Data**

In [20]:
class NetNet(nn.Module):
    def __init__(self, input_dim: int, words_embeddings_dim: int, output_dim: int, labels_vectors: torch.Tensor):
        super().__init__()
        self.linear1 = nn.Linear(input_dim, 512)
        self.linear2 = nn.Linear(512, 256)
        self.linear3 = nn.Linear(256, words_embeddings_dim)
        self.linear4 = nn.Linear(words_embeddings_dim, output_dim)
        self.ReLU = nn.ReLU()
        self.softmax = nn.Softmax()

        # weight initialize
        self.linear4.weight.data = map_layer_init(shape=words_embeddings_dim, w2c_vectors=labels_vectors)
        # freeze layer weights
        self.linear4.weight.requires_grad = False


    def forward(self, x):
        x = self.ReLU(self.linear1(x))
        x = self.ReLU(self.linear2(x))
        x = self.ReLU(self.linear3(x))
        x = self.softmax(self.linear4(x))
        return x

In [21]:
model = NetNet(
    input_dim=input_dim, 
    words_embeddings_dim=word_vector_size, 
    output_dim=output_dim, 
    labels_vectors=labels_vectors
)

### **Training** 

In [None]:
def train_model(model: NetNet, epochs: int, data_loader: data.DataLoader, loss_fn: nn.MSELoss):
        model.train()

        for epoch in range(epochs):
            for inputs, labels in data_loader:
                outputs = model(inputs)
                loss = loss_fn(outputs, labels)
                loss.backward()
                model.optim.step()
                model.optim.zero_grad()

            if epoch % 10 == 1:
                print(f"Epoch: {epoch}, loss: {loss.item():.3}")

In [None]:
loss_fn = nn.MSELoss()
model.optim = optim.Adam(model.parameters(), lr=learning_rate)
train_model(model=model, epochs=150, data_loader=train_data_loader, loss_fn=loss_fn)

In [22]:
print(model)

NetNet(
  (linear1): Linear(in_features=64, out_features=512, bias=True)
  (linear2): Linear(in_features=512, out_features=256, bias=True)
  (linear3): Linear(in_features=256, out_features=100, bias=True)
  (linear4): Linear(in_features=100, out_features=1, bias=True)
  (ReLU): ReLU()
  (softmax): Softmax(dim=None)
)


### **Removing last layer**

In [32]:
zsl_model = nn.Sequential(*(list(model.children())[:3] + list(model.children())[4:-1]))

In [33]:
print(zsl_model)

Sequential(
  (0): Linear(in_features=64, out_features=512, bias=True)
  (1): Linear(in_features=512, out_features=256, bias=True)
  (2): Linear(in_features=256, out_features=100, bias=True)
  (3): ReLU()
)


### **Evaluation** 
For this step we will calculate euclidean distance and find the vector which is the closest.

In [None]:
def find_closest_vector(vector: torch.Tensor, classes: torch.Tensor) -> tuple(str, torch.Tensor):
    min_dist = float('inf')
    min_dist_label = ''
    for label, class_vector in classes:
        dist = torch.cdist(class_vector, vector, p=2)
        dist = dist.squeeze(dim=0)
        if dist < min_dist:
            min_dist = dist
            min_dist_label = label
    return min_dist_label, min_dist

In [None]:
def evaluate_model(model: NetNet, data_loader: data.DataLoader, dataset: data.Dataset):
        model.eval()
        true_predictions, predicitons_amount = 0., 0.
        labels = dataset.targets
             
        with torch.no_grad():
            for inputs, label in data_loader:
                pred_input = model(inputs)
                pred_label, dist = find_closest_vector(vector=pred_input, classes=labels)
                predictions = predictions.squeeze(dim=1)
                true_predictions += int(pred_label == label)
                predicitons_amount += 1

            accuracy = 100.0 * true_predictions / predicitons_amount
        
        print(f"Accuracy of the model: {accuracy:4.2f}%")