Project 2: Model Engineering
===

___

Submitted by:

* <u>*Arthur Humblot*</u>
* <u>*Bekhzod Anvarov*</u>
* <u>*Ghita El Belghiti*</u>


University: **Politecnico di Torino**

Academic Year: **2025 - 2026**

In [31]:
#imports here
import pandas as pd
from sklearn.model_selection import train_test_split
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence
from torch.utils.data import Dataset, DataLoader
import torch
from torch import nn
import time

## 1. Task 1: Frequency-based baseline

In Machine Learning problems, it is always good practice to compare against baseline solutions. A baseline is typically a simple approach that shows whether basic choices and assumptions can already address the problem, before progressing to potentially more complex architectures like RNNs or GNNs.

In this context, a suitable baseline is a **frequency-based** approach.

Specifically:

In [2]:
# read dataset
df_train = pd.read_json("../data/train.json")
df_test = pd.read_json("../data/test.json")

# instruction check
print(df_train.head())
print(df_test.head())

                                   api_call_sequence  is_malware
0  [LdrGetDllHandle, LdrGetProcedureAddress, LdrL...           1
1  [NtAllocateVirtualMemory, LdrLoadDll, LdrGetPr...           1
2  [FindResourceExW, LoadResource, FindResourceEx...           1
3  [FindResourceExW, LoadResource, FindResourceEx...           1
4  [LdrGetProcedureAddress, SetErrorMode, LdrLoad...           1
                                   api_call_sequence  is_malware
0  [NtQueryValueKey, NtClose, NtOpenKey, NtQueryV...           1
1  [LdrGetProcedureAddress, NtClose, NtOpenKey, N...           1
2  [NtOpenKey, NtQueryValueKey, NtClose, NtOpenKe...           1
3  [NtAllocateVirtualMemory, LdrLoadDll, LdrGetPr...           1
4  [NtOpenKey, NtQueryValueKey, NtClose, LdrGetPr...           1


* Extract the vocabulary from your input dataset - that is, the **set of all the API calls** appearing in it

In [3]:
# extract sequences(api) and labels
train_seqs = df_train['api_call_sequence'].tolist()
test_seqs = df_test['api_call_sequence'].tolist()

train_labels = df_train['is_malware'].tolist()
test_labels = df_test['is_malware'].tolist()

# instruction check
print(train_seqs[0][:5])
print(f"Type of sequence: {type(train_seqs[0]).__name__}")

['LdrGetDllHandle', 'LdrGetProcedureAddress', 'LdrLoadDll', 'LdrGetProcedureAddress', 'LdrGetDllHandle']
Type of sequence: list


* **Q:** How many unique API calls does the training set contain?

In [4]:
# create train vocabulary unique api
train_vocab = set()

for train_seq in train_seqs:
    for api_call in train_seq:
        train_vocab.add(api_call)

print(f"Number of unique API calls in the training set: {len(train_vocab)}")

Number of unique API calls in the training set: 258


And how many does the test set contain?

In [5]:
# create test vocabulary of unique api calls
test_vocab = set()

for test_seq in test_seqs:
    for api_call in test_seq:
        test_vocab.add(api_call)

print(f"Number of unique API calls in the test set: {len(test_vocab)}")

Number of unique API calls in the test set: 232


* **Q:** Are there any API calls that appear only in the test set (but not in the training set)? If yes, how many? And which ones are they?

In [6]:
# features, which appear only in test set, and not in train set
only_in_test = test_vocab - train_vocab
print(f"Number of API calls that appear only in the test set (not in the training set): {len(only_in_test)}")
print(f"API calls that appear only in the test set:\n{only_in_test}")

Number of API calls that appear only in the test set (not in the training set): 3
API calls that appear only in the test set:
{'WSASocketA', 'ControlService', 'NtDeleteKey'}


In [7]:
# sorted vocabulary
train_vocab_sorted = sorted(train_vocab)
test_vocab_sorted = sorted(test_vocab)

# instruction check
print(train_vocab_sorted[:5])
print(test_vocab_sorted[:5])

['CertOpenStore', 'CertOpenSystemStoreW', 'CoCreateInstance', 'CoCreateInstanceEx', 'CoGetClassObject']
['CoCreateInstance', 'CoCreateInstanceEx', 'CoGetClassObject', 'CoInitializeEx', 'CoInitializeSecurity']


* **Q:** Can you use the test vocabulary to build the new test dataframe? If not, how do you handle API calls in the test set that do not exist in the training vocabulary?

In [8]:
feature_names = train_vocab_sorted + ['<UNK>']

No: the features are built from the training vocabulary only, so we add an **`<UNK>`** feature that counts API calls appearing in the test set but not in the training set.
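
As a minimal sketch of this handling, the toy example below counts API calls against a small, made-up training vocabulary and routes every out-of-vocabulary call to the last `<UNK>` slot. The vocabulary, the sequence, and the helper name `to_frequency_vector` are illustrative assumptions, not values taken from the dataset.

```python
from collections import Counter

# Hypothetical toy vocabulary (the real one has 258 training API calls).
toy_vocab = sorted({"LdrLoadDll", "NtClose", "NtOpenKey"})
toy_features = toy_vocab + ["<UNK>"]
toy_index = {name: i for i, name in enumerate(toy_features)}


def to_frequency_vector(seq, index):
    """Count each known API call; unseen calls are added to the <UNK> slot."""
    unk = index["<UNK>"]
    counts = Counter(index.get(call, unk) for call in seq)
    return [counts.get(i, 0) for i in range(len(index))]


# 'WSASocketA' is not in the toy vocabulary, so it is counted as <UNK>.
toy_seq = ["NtOpenKey", "NtClose", "WSASocketA", "NtClose"]
vector = to_frequency_vector(toy_seq, toy_index)
print(dict(zip(toy_features, vector)))
```

Keeping `<UNK>` as the last column means the training and test matrices share the same width, whatever new API calls show up at test time.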

* Use this vocabulary as the **feature set**: for each row in the input dataset, count the **number of times** (frequency) each vocabulary term occurs

In [9]:
# map api with their positions
api_to_idx = dict()

for i in range(len(feature_names)):
    api_to_idx[feature_names[i]] = i

# creating features for train
X_train = list()

for seq in train_seqs:
    freq = [0 for _ in range(len(feature_names))]   # frequency vector for train features
    for api_call in seq:
        if api_call in api_to_idx:
            freq[api_to_idx[api_call]] += 1
    X_train.append(freq)

# creating features for test
X_test = list()

for seq in test_seqs:
    freq = [0 for _ in range(len(feature_names))]   # frequency vector for test features
    for api_call in seq:
        if api_call in api_to_idx:
            freq[api_to_idx[api_call]] += 1
        else:
            freq[-1] += 1     # <UNK>: API calls that appear only in the test set
    X_test.append(freq)

* **Q:** One issue of this frequency-based approach is that it creates sparse vectors (i.e., vectors with many zeros per row):
    * How many non-zero elements per row do you have on average in the training set?
    * How many in the test set?
    * What is the ratio with respect to the number of elements per row?

In [10]:
# sparsity for the train set (non-zero per row)
nnz_train_per_row = list()  # num of non zeros

for freq in X_train:
    nnz_train_per_row.append(sum([1 if i > 0 else 0 for i in freq]))

avg_non_zero_train = sum(nnz_train_per_row) / len(X_train)
print(f"Average non-zero elements per row in training set: {avg_non_zero_train:.2f}")
ratio_train = avg_non_zero_train / len(feature_names)
print(f"Ratio with respect to the number of elements per row in training set: {ratio_train:.2f}\n")


# sparsity for the test set (non-zero per row)
nnz_test_per_row = list()  # num of non zeros

for freq in X_test:
    nnz_test_per_row.append(sum([1 if i > 0 else 0 for i in freq]))

avg_non_zero_test = sum(nnz_test_per_row) / len(X_test)
print(f"Average non-zero elements per row in test set: {avg_non_zero_test:.2f}")
ratio_test = avg_non_zero_test / len(feature_names)
print(f"Ratio with respect to the number of elements per row in test set: {ratio_test:.2f}")

Average non-zero elements per row in training set: 21.95
Ratio with respect to the number of elements per row in training set: 0.08

Average non-zero elements per row in test set: 24.28
Ratio with respect to the number of elements per row in test set: 0.09
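
As a quick sanity check of these ratios, each row has 259 elements: the 258 training API calls plus the `<UNK>` column. The sketch below only re-derives the printed ratios from the averages reported above; the variable names are illustrative.

```python
# Values copied from the outputs above.
n_features = 258 + 1          # training vocabulary + <UNK>
avg_nnz_train = 21.95
avg_nnz_test = 24.28

ratio_train = avg_nnz_train / n_features
ratio_test = avg_nnz_test / n_features

# Share of entries that are zero, on average, in each row.
sparsity_train = 1 - ratio_train
sparsity_test = 1 - ratio_test

print(f"train: {ratio_train:.3f} non-zero, {sparsity_train:.1%} zeros")
print(f"test:  {ratio_test:.3f} non-zero, {sparsity_test:.1%} zeros")
```

In other words, more than 90% of the entries in each row are zero on average, in both sets.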


## 2. Task 2: Feed Forward Neural Network (FFNN)

## 3. Task 3: Recurrent Neural Network (RNN)

### 3.1 Preprocessing of Data

In [27]:
# splitting to train and validation sets
train_seqs, val_seqs, train_labels, val_labels  = train_test_split(train_seqs, train_labels, test_size=0.2, random_state=42)

In [29]:
# adding pad entry to api_to_idx (setdefault keeps the index stable if this cell is re-run)
api_to_idx.setdefault('<PAD>', len(api_to_idx))

# unknown and padding indexes
unk_index = api_to_idx['<UNK>']
pad_index = api_to_idx['<PAD>']

# convert each api call string to its numeric index; unseen calls map to <UNK>
def encode_sequences(seqs):
    return [[api_to_idx.get(api_call, unk_index) for api_call in seq] for seq in seqs]

train_ids = encode_sequences(train_seqs)
val_ids = encode_sequences(val_seqs)
test_ids = encode_sequences(test_seqs)

In [12]:
# Custom Dataset
class APICallDataset(Dataset):
    def __init__(self, ids: list[list[int]], labels: list[int]):
        self.ids = ids
        self.labels = labels

    def __len__(self):
        return len(self.ids)

    def __getitem__(self, idx: int):
        sequence = torch.tensor(self.ids[idx], dtype=torch.long)
        label = torch.tensor(self.labels[idx], dtype=torch.long)
        return sequence, label

In [13]:
# collate function to add padding per each batch
def collate_fn(batch):
    ids, labels = zip(*batch)
    lengths = torch.tensor([len(s) for s in ids])

    sorted_indices = lengths.argsort(descending=True)
    ids = [ids[i] for i in sorted_indices]
    labels = torch.stack([labels[i] for i in sorted_indices])
    lengths = lengths[sorted_indices]

    padded_ids = pad_sequence(ids, batch_first=True, padding_value=pad_index)

    return padded_ids, lengths, labels
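
To make the effect of the collate function concrete, here is a plain-Python sketch of the same two steps (sort by length in descending order, then right-pad to the longest sequence) on a toy batch. It does not use PyTorch; `toy_pad`, the token ids, and the helper name `pad_batch` are hypothetical values chosen for illustration.

```python
def pad_batch(batch, pad_value):
    """Sort (sequence, label) pairs by length, longest first, then right-pad."""
    ordered = sorted(batch, key=lambda pair: len(pair[0]), reverse=True)
    lengths = [len(seq) for seq, _ in ordered]
    max_len = lengths[0]
    padded = [seq + [pad_value] * (max_len - len(seq)) for seq, _ in ordered]
    labels = [label for _, label in ordered]
    return padded, lengths, labels


toy_pad = 9  # hypothetical padding id, outside the toy token range
toy_batch = [([1, 2], 0), ([3, 4, 5, 6], 1), ([7], 1)]
padded, lengths, labels = pad_batch(toy_batch, toy_pad)
print(padded)
print(lengths)
print(labels)
```

Sorting by descending length is what allows `pack_padded_sequence` to be called with `enforce_sorted=True`, and the returned lengths tell it how many steps of each row are real tokens rather than padding.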

In [30]:
train_dataset = APICallDataset(train_ids, train_labels)
val_dataset = APICallDataset(val_ids, val_labels)
test_dataset = APICallDataset(test_ids, test_labels)

train_loader = DataLoader(train_dataset, batch_size=32, collate_fn=collate_fn, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, collate_fn=collate_fn, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, collate_fn=collate_fn, shuffle=False)

### 3.2 Recurrent Neural Network Models

Monodirectional RNN model

In [18]:
class MonoRNN(nn.Module):
    def __init__(self,
                 vocab_size: int,
                 embedding_dim: int,
                 hidden_size: int,
                 output_size: int,
                 num_layers: int,
                 pad_idx: int):
        super(MonoRNN, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)
        self.rnn = nn.RNN(embedding_dim, hidden_size, num_layers, batch_first=True)
        self.linear = nn.Linear(hidden_size, output_size)

    def forward(self, x, lengths):
        embedded = self.embedding(x)
        packed_embedded = pack_padded_sequence(embedded, lengths, batch_first=True, enforce_sorted=True)

        out, h = self.rnn(packed_embedded)
        return self.linear(h[-1])

Bidirectional RNN model

In [32]:
class BiRNN(nn.Module):
    def __init__(self,
                 vocab_size: int,
                 embedding_dim: int,
                 hidden_size: int,
                 output_size: int,
                 num_layers: int,
                 pad_idx: int):
        super(BiRNN, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)
        self.rnn = nn.RNN(embedding_dim, hidden_size, num_layers, batch_first=True, bidirectional=True)
        self.linear = nn.Linear(hidden_size * 2, output_size)

    def forward(self, x, lengths):
        embedded = self.embedding(x)
        packed_embedded = pack_padded_sequence(embedded, lengths, batch_first=True, enforce_sorted=True)

        # h holds the final hidden state of every layer and direction
        _, h = self.rnn(packed_embedded)

        # last layer: h[-2] is the forward direction, h[-1] the backward direction
        hidden_cat = torch.cat((h[-2], h[-1]), dim=1)
        return self.linear(hidden_cat)

Monodirectional LSTM RNN model

In [20]:
class MonoLSTMRNN(nn.Module):
    def __init__(self,
                 vocab_size: int,
                 embedding_dim: int,
                 hidden_size: int,
                 output_size: int,
                 num_layers: int,
                 pad_idx: int):
        super(MonoLSTMRNN, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)
        self.lstm = nn.LSTM(embedding_dim, hidden_size, num_layers, batch_first=True)
        self.linear = nn.Linear(hidden_size, output_size)

    def forward(self, x, lengths):
        embedded = self.embedding(x)
        packed_embedded = pack_padded_sequence(embedded, lengths, batch_first=True, enforce_sorted=True)

        out, (h, cell) = self.lstm(packed_embedded)
        return self.linear(h[-1])

Bidirectional LSTM RNN model

In [33]:
class BiLSTMRNN(nn.Module):
    def __init__(self,
                 vocab_size: int,
                 embedding_dim: int,
                 hidden_size: int,
                 output_size: int,
                 num_layers: int,
                 pad_idx: int):
        super(BiLSTMRNN, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)
        self.lstm = nn.LSTM(embedding_dim, hidden_size, num_layers, batch_first=True, bidirectional=True)
        self.linear = nn.Linear(hidden_size * 2, output_size)

    def forward(self, x, lengths):
        embedded = self.embedding(x)
        packed_embedded = pack_padded_sequence(embedded, lengths, batch_first=True, enforce_sorted=True)

        # h holds the final hidden state of every layer and direction
        _, (h, _) = self.lstm(packed_embedded)

        # last layer: h[-2] is the forward direction, h[-1] the backward direction
        hidden_cat = torch.cat((h[-2], h[-1]), dim=1)

        return self.linear(hidden_cat)
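
The bidirectional models above concatenate `h[-2]` and `h[-1]`. As a small sketch of why, the snippet below runs a two-layer bidirectional `nn.RNN` on a random toy batch and inspects the shape of the final hidden state. All sizes (input size 4, hidden size 3, batch of 2) are arbitrary assumptions for illustration, not the hyperparameters used in this project.

```python
import torch
from torch import nn
from torch.nn.utils.rnn import pack_padded_sequence

torch.manual_seed(0)

# Toy sizes, chosen only for illustration.
rnn = nn.RNN(input_size=4, hidden_size=3, num_layers=2,
             batch_first=True, bidirectional=True)

x = torch.randn(2, 5, 4)          # batch of 2 sequences, padded to length 5
lengths = torch.tensor([5, 3])    # true lengths, sorted in descending order

packed = pack_padded_sequence(x, lengths, batch_first=True, enforce_sorted=True)
_, h = rnn(packed)

# h is laid out as (num_layers * num_directions, batch, hidden_size),
# so the last two entries belong to the last layer: forward, then backward.
h_shape = tuple(h.shape)
summary = torch.cat((h[-2], h[-1]), dim=1)
summary_shape = tuple(summary.shape)

print(h_shape)
print(summary_shape)
```

The concatenated vector has `hidden_size * 2` features per sequence, which matches the input size of the final `nn.Linear` layer in the bidirectional classes.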

## 4. Task 4: Graph Neural Network (GNN)