<a href="https://colab.research.google.com/github/AImSecure/Laboratory2/blob/main/lab/notebooks/Lab2_FFNN_RNN_GNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Laboratory 2 — Model Engineering

text

## Setup

In [9]:
# --- Check Python and pip versions ---
!python --version
!pip install --upgrade pip

Python 3.12.12


In [10]:
# --- Install required libraries ---
!pip install torch
!pip install numpy pandas scikit-learn matplotlib seaborn
!pip install tqdm



In [38]:
# --- Import libraries ---
import os
import time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import json

from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler, LabelEncoder
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, f1_score
from sklearn.decomposition import PCA
from sklearn.impute import SimpleImputer

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, TensorDataset

from tqdm import tqdm

### Colab Pro

In [12]:
# --- Check GPU availability ---
gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
if gpu_info.find('failed') >= 0:
  print('Not connected to a GPU')
else:
  print(gpu_info)

/bin/bash: line 1: nvidia-smi: command not found


In [13]:
# --- Check RAM availability ---
from psutil import virtual_memory
ram_gb = virtual_memory().total / 1e9
print('Your runtime has {:.1f} gigabytes of available RAM\n'.format(ram_gb))

if ram_gb < 20:
  print('Not using a high-RAM runtime')
else:
  print('You are using a high-RAM runtime!')

Your runtime has 13.6 gigabytes of available RAM

Not using a high-RAM runtime


### Paths setup


In [14]:
# --- Mount Google Drive (for Google Colab users) ---
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [19]:
# --- Define Paths ---
group = 'AImSecure'
laboratory = 'Laboratory2'

base_path = '/content/drive/MyDrive/'
project_path = base_path + f'Projects/{group}/{laboratory}/'
data_path = project_path + 'data/'
results_path = project_path + 'results/'

# Ensure directories exist
os.makedirs(project_path, exist_ok=True)
os.makedirs(data_path, exist_ok=True)
os.makedirs(results_path, exist_ok=True)

print(f"Project path: {project_path}")
print(f"Data path: {data_path}")
print(f"Results path: {results_path}")

Project path: /content/drive/MyDrive/Projects/AImSecure/Laboratory2/
Data path: /content/drive/MyDrive/Projects/AImSecure/Laboratory2/data/
Results path: /content/drive/MyDrive/Projects/AImSecure/Laboratory2/results/


In [20]:
# --- Set visual style ---
sns.set(style="whitegrid", palette="muted", font_scale=1.1)

def save_plot(fig: plt.Figure, filename: str, path: str = "./plots/", fmt: str = "png", dpi: int = 300, close_fig: bool = False) -> None:
    """
    Save a Matplotlib figure in a specific to a specified directory.

    Args:
        fig (plt.Figure): Matplotlib figure object to save.
        filename (str): Name of the file to save (e.g., 'plot.png').
        path (str, optional): Directory path to save the figure. Defaults to './plots/'.
        fmt (str, optional): File format for the saved figure. Defaults to 'png'.
        dpi (int, optional): Dots per inch for the saved figure. Defaults to 300.

    Returns:
        None
    """
    # Ensure the directory exists
    os.makedirs(path, exist_ok=True)
    save_path = os.path.join(path, f"{filename}.{fmt}")

    # Save the figure
    fig.savefig(save_path, bbox_inches='tight', pad_inches=0.1, dpi=dpi, format=fmt)
    # plt.close(fig) # Removed to display plots in notebook

    if close_fig:
        plt.close(fig)

    print(f"Saved plot: {save_path}")

## Task 1 — Frequency-based baseline

We implement a simple frequency-based baseline.  
- Transform sequences into feature vectors counting API call occurrences.  
- Helps evaluate whether simple approaches already perform well before using complex models.


In [21]:
# Create directory for plots
save_dir = results_path + 'images/' + 'task1_plots/'
os.makedirs(save_dir, exist_ok=True)

### Frequency-Based Approach

- Extract vocabulary from train and test datasets.
- Use vocabulary to create feature vectors (frequency counts per API call).
- Output dataframe: one row per sequence, one column per API call.

In [26]:
file_path_train = data_path + 'train.json'
df_train = pd.read_json(file_path_train)
file_path_test = data_path + 'test.json'
df_test = pd.read_json(file_path_test)

# Basic info
print("Info training dataset:")
print("Shape (raw):", df_train.shape)
print("Columns:", list(df_train.columns))

# Basic info
print("\nInfo test dataset:")
print("Shape (raw):", df_test.shape)
print("Columns:", list(df_test.columns))

Info training dataset:
Shape (raw): (16325, 2)
Columns: ['api_call_sequence', 'is_malware']

Info test dataset:
Shape (raw): (6505, 2)
Columns: ['api_call_sequence', 'is_malware']


In [24]:
df_train

Unnamed: 0,api_call_sequence,is_malware
0,"[LdrGetDllHandle, LdrGetProcedureAddress, LdrL...",1
1,"[NtAllocateVirtualMemory, LdrLoadDll, LdrGetPr...",1
2,"[FindResourceExW, LoadResource, FindResourceEx...",1
3,"[FindResourceExW, LoadResource, FindResourceEx...",1
4,"[LdrGetProcedureAddress, SetErrorMode, LdrLoad...",1
...,...,...
16320,"[LdrGetProcedureAddress, LdrLoadDll, LdrGetPro...",1
16321,"[NtClose, LdrGetProcedureAddress, CryptCreateH...",1
16322,"[LdrGetProcedureAddress, LdrGetDllHandle, LdrG...",1
16323,"[LdrGetProcedureAddress, LdrGetDllHandle, LdrG...",1


In [27]:
df_test

Unnamed: 0,api_call_sequence,is_malware
0,"[NtQueryValueKey, NtClose, NtOpenKey, NtQueryV...",1
1,"[LdrGetProcedureAddress, NtClose, NtOpenKey, N...",1
2,"[NtOpenKey, NtQueryValueKey, NtClose, NtOpenKe...",1
3,"[NtAllocateVirtualMemory, LdrLoadDll, LdrGetPr...",1
4,"[NtOpenKey, NtQueryValueKey, NtClose, LdrGetPr...",1
...,...,...
6500,"[SetErrorMode, NtOpenFile, NtClose, SHGetFolde...",0
6501,"[NtProtectVirtualMemory, RegOpenKeyExW, RegQue...",1
6502,"[RegOpenKeyExW, RegQueryValueExW, RegCloseKey,...",1
6503,"[NtQueryValueKey, NtClose, RegOpenKeyExA, RegQ...",1


### Extract the vocabularies

In [30]:
# --- Extract vocabulary from training set ---
train_vocab = set(api_call for seq in df_train['api_call_sequence'] for api_call in seq)

# --- Count unique API calls ---
print(f"Train: {len(train_vocab)}")

Train: 258


In [31]:
# --- Extract vocabulary from test set ---
test_vocab = set(api_call for seq in df_test['api_call_sequence'] for api_call in seq)

# --- Count unique API calls ---
print(f"Test: {len(test_vocab)}")

Test: 232


#### Q: How many unique API calls does the training set contain? How many in the test set?

*   Train set: 258
*   Test set: 232

In [36]:
# --- Find API calls only in the test set ---
only_in_test = test_vocab - train_vocab

# --- Count and print them ---
print(f"Number of API calls only in test set: {len(only_in_test)}")
print("API calls only in test set:", only_in_test)

Number of API calls only in test set: 3
API calls only in test set: {'WSASocketA', 'ControlService', 'NtDeleteKey'}


#### Q: Are there any API calls that appear only in the test set (but not in the training set)? If yes, how many? Which ones?

Number of API calls only in test set: 3

API calls only in test set: {'WSASocketA', 'ControlService', 'NtDeleteKey'}


### New frequency-based dataframes

- Create dataframe using training vocabulary as features.
- Count occurrences of each API call per sequence.

In [57]:
# --- Create frequency-based dataframe for training data ---
train_df_freq = pd.DataFrame([{api: seq.count(api) for api in train_vocab} for seq in df_train['api_call_sequence']])
display(train_df_freq.head())

Unnamed: 0,GetSystemDirectoryW,GetFileAttributesW,DrawTextExW,UuidCreate,connect,NtOpenKeyEx,RegCloseKey,NtReadFile,RegQueryInfoKeyW,CryptAcquireContextA,...,RemoveDirectoryA,SHGetSpecialFolderLocation,LoadResource,NtSetValueKey,GetUserNameExW,DeleteUrlCacheEntryA,NtOpenDirectoryObject,WSARecv,GetSystemTimeAsFileTime,WriteProcessMemory
0,0,0,2,0,0,0,1,0,0,0,...,0,0,8,0,0,0,0,0,0,0
1,2,0,0,0,0,5,0,0,0,0,...,0,0,0,0,0,0,1,0,2,0
2,1,0,2,0,0,0,0,0,0,0,...,0,0,21,0,0,0,0,0,0,0
3,0,0,2,0,0,0,0,0,0,0,...,0,0,18,0,0,0,0,0,0,0
4,0,0,1,0,0,0,0,0,0,0,...,0,0,16,0,0,0,0,0,0,0


In [33]:
print("Info training dataset:")
print("Shape (raw):", train_df_freq.shape)
print("Columns:", list(train_df_freq.columns))

Info training dataset:
Shape (raw): (16325, 258)
Columns: ['GetSystemDirectoryW', 'GetFileAttributesW', 'DrawTextExW', 'UuidCreate', 'connect', 'NtOpenKeyEx', 'RegCloseKey', 'NtReadFile', 'RegQueryInfoKeyW', 'CryptAcquireContextA', 'GetUserNameExA', 'NtAllocateVirtualMemory', 'RegSetValueExA', 'GetFileVersionInfoSizeW', 'FindResourceA', 'GetFileType', 'CryptEncrypt', 'NtSetInformationFile', 'InternetOpenUrlW', 'GetBestInterfaceEx', 'GetKeyState', 'NtEnumerateValueKey', 'CreateProcessInternalW', 'gethostbyname', 'CoCreateInstanceEx', 'CryptProtectData', 'NtOpenProcess', 'StartServiceA', 'HttpSendRequestA', 'InternetCloseHandle', 'LdrLoadDll', 'CoInitializeSecurity', 'InternetSetStatusCallback', 'SHGetFolderPathW', 'SetErrorMode', 'IsDebuggerPresent', 'CopyFileW', 'CopyFileExW', 'GetFileInformationByHandleEx', 'SetStdHandle', 'NtTerminateThread', 'RegEnumKeyExW', 'InternetOpenW', 'GetAdaptersAddresses', 'GetFileVersionInfoW', 'NtUnmapViewOfSection', 'GetComputerNameW', 'WriteConsoleW', '

In [34]:
# --- Create frequency-based dataframe for training data ---
test_df_freq = pd.DataFrame([{api: seq.count(api) for api in test_vocab} for seq in df_test['api_call_sequence']])
display(test_df_freq.head())

Unnamed: 0,GetSystemDirectoryW,GetFileAttributesW,DrawTextExW,UuidCreate,connect,NtOpenKeyEx,RegCloseKey,NtReadFile,RegQueryInfoKeyW,CryptAcquireContextA,...,RemoveDirectoryW,RemoveDirectoryA,SHGetSpecialFolderLocation,LoadResource,NtSetValueKey,DeleteUrlCacheEntryA,NtOpenDirectoryObject,WSARecv,GetSystemTimeAsFileTime,WriteProcessMemory
0,0,0,2,0,0,0,0,0,0,0,...,0,0,0,23,0,0,0,0,0,0
1,1,0,1,0,0,0,3,13,0,0,...,0,0,0,0,0,0,0,0,0,0
2,0,0,2,0,0,0,1,0,0,0,...,0,0,0,8,0,0,0,0,0,0
3,1,0,4,0,0,0,3,0,0,0,...,0,0,0,0,0,0,0,0,2,0
4,0,0,0,1,0,0,8,1,2,0,...,0,0,0,1,0,0,0,0,3,0


In [35]:
print("Info training dataset:")
print("Shape (raw):", test_df_freq.shape)
print("Columns:", list(test_df_freq.columns))

Info training dataset:
Shape (raw): (6505, 232)
Columns: ['GetSystemDirectoryW', 'GetFileAttributesW', 'DrawTextExW', 'UuidCreate', 'connect', 'NtOpenKeyEx', 'RegCloseKey', 'NtReadFile', 'RegQueryInfoKeyW', 'CryptAcquireContextA', 'NtAllocateVirtualMemory', 'RegSetValueExA', 'GetFileVersionInfoSizeW', 'FindResourceA', 'GetFileType', 'CryptEncrypt', 'NtSetInformationFile', 'GetBestInterfaceEx', 'GetKeyState', 'NtEnumerateValueKey', 'CreateProcessInternalW', 'gethostbyname', 'StartServiceA', 'CoCreateInstanceEx', 'NtOpenProcess', 'InternetCloseHandle', 'CryptProtectData', 'HttpSendRequestA', 'LdrLoadDll', 'CoInitializeSecurity', 'SHGetFolderPathW', 'SetErrorMode', 'IsDebuggerPresent', 'CopyFileW', 'CopyFileExW', 'SetStdHandle', 'NtTerminateThread', 'RegEnumKeyExW', 'InternetOpenW', 'GetAdaptersAddresses', 'GetFileVersionInfoW', 'NtUnmapViewOfSection', 'GetComputerNameW', 'WriteConsoleW', 'HttpOpenRequestW', 'RegQueryInfoKeyA', 'InternetQueryOptionA', 'LoadStringW', 'Module32FirstW', 'Reg

#### Q: Can you use the test vocabulary to build the new test dataframe? If not, how do you handle API calls in the test set that do not exist in the training vocabulary?

just ignore them

In [37]:
# --- Calculate average non-zero elements per row for training data ---
avg_nonzero_train = (train_df_freq != 0).sum(axis=1).mean()
print(f"Average non-zero elements per row in train_df_freq: {avg_nonzero_train:.2f}")

# --- Calculate ratio of non-zero elements for training data ---
ratio_nonzero_train = avg_nonzero_train / train_df_freq.shape[1]
print(f"Ratio of non-zero elements per row in train_df_freq: {ratio_nonzero_train:.4f}")

# --- Calculate average non-zero elements per row for test data ---
avg_nonzero_test = (test_df_freq != 0).sum(axis=1).mean()
print(f"Average non-zero elements per row in test_df_freq: {avg_nonzero_test:.2f}")

# --- Calculate ratio of non-zero elements for test data ---
ratio_nonzero_test = avg_nonzero_test / test_df_freq.shape[1]
print(f"Ratio of non-zero elements per row in test_df_freq: {ratio_nonzero_test:.4f}")

Average non-zero elements per row in train_df_freq: 21.95
Ratio of non-zero elements per row in train_df_freq: 0.0851
Average non-zero elements per row in test_df_freq: 24.28
Ratio of non-zero elements per row in test_df_freq: 0.1046


#### Q: How many non-zero elements per row do you have on average in the training set? How many in the test set? What is the ratio with respect to the number of elements per row?

#### Q: The original API sequences were ordered. Is it still the case now in the frequency-based dataframe? Why?

### Feed the frequency-based datasets to a classifier

- Any classifier can be used (shallow/deep neural or non-neural).
- Goal: evaluate baseline performance on sparse vectors without sequence information.

In [60]:
# --- Split data into features (X) and target (y) ---
X_train = train_df_freq
y_train = df_train['is_malware']
print(X_train.shape)
# ---
X_test = pd.DataFrame([{api: seq.count(api) for api in train_vocab} for seq in df_test['api_call_sequence']])
print(X_test.shape)
y_test = df_test['is_malware']

# Using default hyperparameters for a simple baseline
clf = LogisticRegression(random_state=42, solver='liblinear') # 'liblinear' solver is good for small datasets and sparse data
clf.fit(X_train, y_train)
y_pred = clf.predict(X_test)
print(f"Test Accuracy: {accuracy_score(y_test, y_pred):.4f}")

(16325, 258)
(6505, 258)
Test Accuracy: 0.9694


In [56]:
print("\nClassification Report:\n", classification_report(y_pred, y_test))


Classification Report:
               precision    recall  f1-score   support

           0       0.32      0.70      0.44       112
           1       0.99      0.97      0.98      6393

    accuracy                           0.97      6505
   macro avg       0.66      0.84      0.71      6505
weighted avg       0.98      0.97      0.97      6505



#### Q: Report how you chose the hyperparameters of your classifier, and the final performance on the test set.

#### Q: Is the final performance good, even ignoring the order of API calls and handling very sparse vectors?

## Task 2 - Feed Forward Neural Network (FFNN)

text

In [None]:
# Create directory for plots
save_dir = results_path + 'images/' + 'task2_plots/'
os.makedirs(save_dir, exist_ok=True)

### API Call Statistics

#### Q: Do you have the same number of API calls per sequence? If not, is the distribution of API calls per sequence the same for training and test sets?

#### Q: Can a FFNN handle a variable number of elements? If not, why?

### Fixed-Size Sequences

#### Q: How to estimate a fixed-size candidate? Which partition do you use to estimate it?

#### Q: Given the estimate, what technique could you use to obtain the same number of API calls per sequence?

#### Q: If at test time you have more API calls than the fixed-size, what do you do with the exceeding API calls?

### Handling Categorical Features

#### Q: Use a FFNN in both cases. Report how you selected the hyperparameters of your final model, and justify your choices.

#### Q: Can you obtain the same results for sequential identifiers and learnable embeddings? If not, why?

## Task 3 - Recursive Neural Network (RNN)

text

In [None]:
# Create directory for plots
save_dir = results_path + 'images/' + 'task3_plots/'
os.makedirs(save_dir, exist_ok=True)

### Sequence Modeling with RNNs

#### Q: With RNNs, do you still have to pad your data? If yes, how?

#### Q: Do you have to truncate the testing sequences? Justify your answer.

#### Q: Is the RNN padding more memory efficient compared to the FFNN’s one? Why?

#### Q: Start with a simple one-directional RNN. Is your network as fast as the FFNN? If not, why?

### Network Variations

#### Q: Is the RNN training as stable as the FFNN's one?

#### Q: How does your model's performance compare to the simple frequency baseline?

## Task 4 - Graph Neural Network (GNN)

text

In [None]:
# Create directory for plots
save_dir = results_path + 'images/' + 'task4_plots/'
os.makedirs(save_dir, exist_ok=True)

### Modeling API Sequences as Graphs

#### Q: Do you still have to pad your data? If yes, how?

#### Q: Do you have to truncate the testing sequences? Justify your answer.

#### Q: What is the advantage of modeling your problem with a GNN compared to an RNN? What do you lose?

### GNN Variations

#### Q: How does each model perform compared to the previous architectures? Can you beat the baseline?