In [None]:
from pathlib import Path
import torch.nn as nn
from settings import sets
from truncation.static_truncation import truncate_static
from truncation.behaviour_truncation import truncate_behaviour

# Change settings in settings file!
# Restart kernel after changing settings!

# Maps the short model name used in the settings file to the identifier
# passed to `from_pretrained`.
models = dict(
    bert="bert-base-uncased",
    longformer="allenai/longformer-base-4096",
)

# Run configuration. The model and the maximum sequence length come from the
# settings file; the remaining values are fixed here.
hyperparams = dict(
    model_name=models[sets["model_short_name"]],
    max_length=sets["truncation_size"],
    batch_size=128,  # Set to 32 for Longformer due to GPU load
    lr=5e-4,
    epochs=7,
    checkpoint_frequency=200,
    cross_validation_folds=5,
)

from torch.utils.data import Dataset


class MalwareDataset(Dataset):
    """Map-style dataset over tokenizer encodings with optional labels.

    Args:
        encodings: Mapping from field name (must include ``"input_ids"``) to a
            per-sample sequence. Anything supporting ``.items()`` and
            ``["input_ids"]`` works, e.g. a plain dict.
        labels: Optional per-sample labels, indexable by sample position.
            May be a list, array or tensor.
    """

    def __init__(self, encodings, labels=None):
        self.encodings = encodings
        self.labels = labels

    def __getitem__(self, idx):
        """Return sample ``idx`` as a dict of tensors (plus ``labels`` if set)."""
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
        # Explicit None/length check: plain truthiness raises for multi-element
        # arrays and tensors. Empty labels are still treated as "no labels".
        if self.labels is not None and len(self.labels) > 0:
            item['labels'] = torch.tensor(self.labels[idx])
        return item

    def __len__(self):
        # Key lookup instead of attribute access so plain dicts work as well.
        return len(self.encodings["input_ids"])


def process_data(data, tokenizer):
    """Tokenize ``(text, label)`` pairs and wrap them in a ``MalwareDataset``.

    Args:
        data: Iterable of ``(text, label)`` tuples.
        tokenizer: Callable applied to the list of texts with truncation and
            padding enabled; ``max_length`` is read from the module-level
            ``params``.

    Returns:
        A ``MalwareDataset`` built from the encodings and the labels.
    """
    texts, labels = zip(*data)
    encodings = tokenizer(
        list(texts),
        truncation=True,
        padding=True,
        max_length=params["max_length"],
    )
    # also take sha
    return MalwareDataset(encodings, labels=list(labels))


# Converts the formatted records into (features, label index) tuples plus a parallel list of sample names
def get_data(data, categories):
    """Turn formatted records into labelled samples and their names.

    Args:
        data: Iterable of dicts with ``'family_name'``, ``'features_json'``
            and ``'name'`` entries.
        categories: Mapping from label index to family name.

    Returns:
        ``(data_list, name_list)`` where ``data_list`` holds
        ``(features, label_index)`` tuples and ``name_list`` holds the
        record names in the same order.

    Raises:
        KeyError: If a record's family name is not a value of ``categories``.
    """
    family_to_index = {family: index for index, family in categories.items()}

    # Single pass over the records; each row is (label index, features, name).
    rows = [
        (
            family_to_index[record.get('family_name')],
            record.get('features_json'),
            record.get('name'),
        )
        for record in data
    ]

    data_list = [(features, label) for label, features, _ in rows]
    name_list = [name for _, _, name in rows]
    return data_list, name_list


def format_data(data, features):
    """Normalise raw records and derive a stable label-index mapping.

    Args:
        data: For ``'static'``, a dict mapping sample name to a details dict
            with ``'features_json'`` and ``'family_name'``. For
            ``'behaviour'``, an iterable of dicts with ``'SHA'``,
            ``'Behavior'`` and ``'Family Name'``.
        features: ``'static'`` or ``'behaviour'``. Any other value yields an
            empty result.

    Returns:
        ``(simplified_data, categories)``. ``simplified_data`` is a list of
        dicts with ``name``, ``features_json`` and ``family_name`` keys.
        ``categories`` maps label index to family name.
    """
    simplified_data = []
    distinct_families = set()

    if features == 'static':
        for name, details in data.items():
            family = details.get('family_name')
            distinct_families.add(family)
            simplified_data.append({
                "name": name,
                "features_json": details.get('features_json'),
                "family_name": family
            })

    elif features == 'behaviour':
        for details in data:
            # Register the same stringified value that is stored on the
            # record, so a later family -> index lookup cannot miss.
            family = str(details.get('Family Name'))
            distinct_families.add(family)
            simplified_data.append({
                "name": str(details.get('SHA')),
                "features_json": str(details.get('Behavior')),
                "family_name": family
            })

    # Sort before numbering: set iteration order is not stable across runs,
    # which would silently change the meaning of each label index.
    categories = dict(enumerate(sorted(distinct_families, key=str)))

    return simplified_data, categories


In [None]:
from torch.utils.data import DataLoader

from tqdm import tqdm


# gets the accuracy of the model
def get_accuracy(preds, labels):
    """Return the fraction of ``preds`` equal to ``labels`` as a Python float.

    Args:
        preds: Sequence of predicted class indices.
        labels: Sequence of true class indices, same length as ``preds``.
    """
    matches = torch.eq(torch.tensor(preds), torch.tensor(labels))
    hit_count = matches.float().sum()
    # .item() unwraps the 0-dim tensor into a plain float.
    return (hit_count / len(labels)).item()


def run_test(model, test_set, device):
    """Evaluate ``model`` on ``test_set`` and report accuracy and loss.

    Args:
        model: Model called as ``model(input_ids, attention_mask=..., labels=...)``
            whose output exposes ``.loss`` and ``.logits``.
        test_set: Dataset whose batches contain ``'input_ids'``,
            ``'attention_mask'`` and ``'labels'``.
        device: Device the batch tensors are moved to.

    Returns:
        ``(accuracy, avg_loss)``: accuracy over all test samples and the mean
        of the per-batch losses (``nan`` when the test set is empty).
    """
    # Put the model in evaluation mode
    model.eval()
    print("Testing...")

    # DataLoader for test set
    test_loader = DataLoader(test_set, batch_size=params['batch_size'], shuffle=False)

    all_preds = []
    all_labels = []
    losses = []

    with torch.no_grad():
        for batch in tqdm(test_loader):
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)
            outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
            # Collapse the loss to a scalar. Presumably needed because the
            # model is wrapped in nn.DataParallel, which may return one loss
            # per replica. TODO confirm.
            loss = outputs.loss.sum()

            # The predicted class is the index of the largest logit
            all_preds.extend(torch.argmax(outputs.logits, dim=1).tolist())
            all_labels.extend(labels.tolist())

            losses.append(loss.item())

    # Accuracy is computed once over all samples rather than re-scanning the
    # growing prediction list after every batch.
    accuracy = get_accuracy(all_preds, all_labels)
    avg_loss = sum(losses) / len(losses) if losses else float("nan")
    print(f'Average loss: {avg_loss}')
    print(f'Test Accuracy: {accuracy}')
    return accuracy, avg_loss


def train(model, optimizer, training_set, device, checkpoint_dir):
    """Train ``model`` for ``params["epochs"]`` epochs and save a final checkpoint.

    Args:
        model: Model called as ``model(input_ids, attention_mask=..., labels=...)``
            whose output exposes ``.loss`` and ``.logits``.
        optimizer: Optimizer stepped once per batch.
        training_set: Dataset whose batches contain ``'input_ids'``,
            ``'attention_mask'`` and ``'labels'``.
        device: Device the batch tensors are moved to.
        checkpoint_dir: Directory that receives ``final_checkpoint.pth.tar``.
    """
    print("Training...")
    # Training loop
    model.train()

    train_loader = DataLoader(training_set, batch_size=params["batch_size"], shuffle=True)
    print("Data loaded")

    for epoch in range(params["epochs"]):
        losses = []
        all_preds = []
        all_labels = []

        for batch in tqdm(train_loader, desc=f'Epoch {epoch + 1}/{params["epochs"]}'):
            optimizer.zero_grad()
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)
            outputs = model(input_ids, attention_mask=attention_mask, labels=labels)

            # Collapse the loss to a scalar before backprop. Presumably needed
            # because nn.DataParallel may return one loss per replica.
            # TODO confirm.
            loss = outputs.loss.sum()
            loss.backward()

            all_preds.extend(torch.argmax(outputs.logits, dim=1).tolist())
            all_labels.extend(labels.tolist())
            losses.append(loss.item())

            optimizer.step()

        # Accuracy over every sample seen this epoch, computed once. Averaging
        # running accuracies per batch over-weights early batches and
        # re-scans the whole prediction list on every step.
        avg_loss = sum(losses) / len(losses) if losses else float("nan")
        epoch_accuracy = get_accuracy(all_preds, all_labels)
        print(
            f'Epoch {epoch + 1}/{params["epochs"]}, average loss: {avg_loss}, average accuracy: {epoch_accuracy}')

    print("Training finished")
    save_checkpoint(model, optimizer, checkpoint_dir, filename="final_checkpoint.pth.tar")


# saves the model and optimizer to a checkpoint, can later be used
def save_checkpoint(model, optimizer, checkpoint_dir, filename="checkpoint.pth.tar"):
    """Write the model and optimizer state dicts to ``checkpoint_dir/filename``.

    Args:
        model: Model to save. If it is wrapped in ``DataParallel`` or
            ``DistributedDataParallel``, the inner module's state is saved so
            the keys carry no ``module.`` prefix.
        optimizer: Optimizer whose state dict is stored alongside the model.
        checkpoint_dir: Target directory; created if it does not exist.
        filename: File name inside ``checkpoint_dir``.
    """
    # Ensure directory exists
    if checkpoint_dir:
        os.makedirs(checkpoint_dir, exist_ok=True)
    checkpoint_path = os.path.join(checkpoint_dir, filename)

    # Unwrap parallel wrappers; plain modules are saved as they are.
    parallel_wrappers = (torch.nn.DataParallel, torch.nn.parallel.DistributedDataParallel)
    inner_model = model.module if isinstance(model, parallel_wrappers) else model

    # Save checkpoint
    torch.save({
        'model_state_dict': inner_model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
    }, checkpoint_path)


# loads a checkpoint
def load_checkpoint(checkpoint_path, model, optimizer, device):
    """Restore model and optimizer state from a checkpoint file.

    Args:
        checkpoint_path: Path to a file holding ``'model_state_dict'`` and
            ``'optimizer_state_dict'`` entries.
        model: Model to load into. If it is wrapped in ``DataParallel`` or
            ``DistributedDataParallel``, the state is loaded into the inner
            module.
        optimizer: Optimizer to load into.
        device: Device the checkpoint tensors and optimizer state are mapped to.

    Returns:
        The same ``(model, optimizer)`` objects, updated in place.
    """
    # NOTE: torch.load unpickles the file; only load checkpoints from a
    # trusted source.
    checkpoint = torch.load(checkpoint_path, map_location=device)

    # Load the saved model state into the underlying module
    parallel_wrappers = (torch.nn.DataParallel, torch.nn.parallel.DistributedDataParallel)
    inner_model = model.module if isinstance(model, parallel_wrappers) else model
    inner_model.load_state_dict(checkpoint['model_state_dict'])

    # Load the optimizer state, then move its tensors to the device
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    for state in optimizer.state.values():
        for k, v in state.items():
            if isinstance(v, torch.Tensor):
                state[k] = v.to(device)

    return model, optimizer

In [None]:
import json
import random
import time
import os
from math import sqrt

# Module-level alias: the helper functions in this file read their settings
# via `params[...]`.
params = hyperparams

# external imports
import torch
import numpy as np
from transformers import AutoTokenizer, AutoModelForSequenceClassification

# The device is hard-coded to CUDA; there is no CPU fallback here.
device = torch.device("cuda")
torch.cuda.device_count()

# "behavior_features" selects the behaviour pipeline; every other file name
# is treated as static features.
feature_type = "static" if sets["file_name"] != "behavior_features" else "behaviour"

# JSON input path, built from the feature file name and the truncation size.
input_file = f'data/truncated_{sets["file_name"]}_to_{sets["truncation_size"]}.json'

# Output directory for this run, named after the model and the start time.
checkpoint_dir = f'checkpoints/{sets["model_short_name"]}_{time.strftime("%Y-%m-%d-%H-%M-%S")}'


def summarize_results(accuracies, losses, n_folds, best_acc, fold, output_file):
    """Write a plain-text summary of the cross-validation results.

    For both accuracies and losses the report lists the per-fold values, the
    mean, the standard deviation (``np.std``) and the standard error of the
    mean, all rounded to three decimals.

    Args:
        accuracies: Per-fold accuracies.
        losses: Per-fold losses.
        n_folds: Number of folds, used for the standard error.
        best_acc: Best accuracy across the folds.
        fold: Number of the best fold.
        output_file: Path of the text file to (over)write.
    """

    def describe(values):
        # The standard error is derived from the already-rounded deviation.
        mean = round(np.average(values), 3)
        deviation = round(np.std(values), 3)
        std_error = round(deviation / sqrt(n_folds), 3)
        return mean, deviation, std_error

    avg_acc, sd_acc, sem_acc = describe(accuracies)
    avg_loss, sd_loss, sem_loss = describe(losses)
    best_acc = round(best_acc, 3)

    rounded_accuracies = [round(value, 3) for value in accuracies]
    rounded_losses = [round(value, 3) for value in losses]

    report = [
        f"Best fold: {fold}\n\n",
        f"Accuracies: {rounded_accuracies}\n",
        f"Average: {avg_acc}\n",
        f"Standard deviation: {sd_acc}\n",
        f"Standard error of the mean: {sem_acc}\n",
        f"Best accuracy: {best_acc}\n\n",
        f"Losses: {rounded_losses}\n",
        f"Average: {avg_loss}\n",
        f"Standard deviation: {sd_loss}\n",
        f"Standard error of the mean: {sem_loss}\n",
    ]

    with open(output_file, 'w') as file:
        file.writelines(report)

def get_embeddings(model, data_set, name_list):
    """Compute one mean-pooled embedding per named sample.

    Each sample is run through ``model`` on its own; the last hidden state is
    averaged over dimension 1 and stored with the sample's label.

    Args:
        model: Model called with ``input_ids``, ``attention_mask`` and
            ``output_hidden_states=True`` whose output exposes ``hidden_states``.
        data_set: Indexable collection of samples with ``'input_ids'``,
            ``'attention_mask'`` and ``'labels'`` tensors, in the same order
            as ``name_list``.
        name_list: Sample names, used as keys of the result.

    Returns:
        Dict mapping each name to ``{"embeddings": [...], "family": label}``.
    """
    model.eval()
    embeddings = {}

    # Inference only, so gradient tracking is disabled for the whole pass.
    with torch.no_grad():
        for position, sha in enumerate(tqdm(name_list)):
            sample = data_set[position]

            # unsqueeze(0) adds a leading batch dimension of size one; tensors
            # go to the module-level `device`.
            ids = sample['input_ids'].unsqueeze(0).to(device)
            mask = sample['attention_mask'].unsqueeze(0).to(device)

            result = model(input_ids=ids, attention_mask=mask, output_hidden_states=True)
            pooled = result.hidden_states[-1].mean(dim=1).squeeze().tolist()

            embeddings[sha] = {
                "embeddings": pooled,
                "family": sample["labels"].item(),
            }

    return embeddings


def run_cross_validation():
    """Run k-fold cross-validation and export the best model and embeddings.

    Steps: optionally truncate the raw data, load and shuffle it (fixed seed),
    tokenize it once, then for each fold build a fresh classifier with a
    frozen base model, train it (unless ``sets["test_only"]``) and evaluate it
    on the held-out slice. Afterwards the results summary, the best model's
    checkpoint and the embeddings of all samples are written below the
    module-level ``checkpoint_dir``.

    Returns:
        ``(best_model, best_test)``: the model of the fold with the highest
        test accuracy and that fold's test samples.

    Raises:
        RuntimeError: If no fold produced a usable accuracy.
    """
    print("==== CROSS VALIDATION ====")
    device = torch.device("cuda")
    print("Device:", device)

    print(sets)
    print(params)

    if sets["truncation_needed"]:
        print("Truncating...")
        if feature_type == "behaviour":
            truncate_behaviour()
        else:
            truncate_static()

    with open(input_file, 'r') as file:
        data = json.load(file)

    data, categories = format_data(data, feature_type)

    # Fixed seed so the fold assignment is reproducible.
    random.Random(69).shuffle(data)

    num_labels = len(categories)
    n_folds = params["cross_validation_folds"]

    # Initialize tokenizer
    model_name = params['model_name']
    tokenizer = AutoTokenizer.from_pretrained(model_name)

    data, name_list = get_data(data, categories)

    # Samples beyond n_folds * test_size never land in a test slice.
    test_size = len(data) // n_folds
    accuracies = []
    losses = []

    best_acc = -1
    best_model = None
    best_optimizer = None
    best_test = None
    best_fold = -1

    test_only = sets["test_only"]

    # The summary, best checkpoint and embeddings are written here. In
    # test-only mode no per-fold directory is created, so create it up front.
    Path(checkpoint_dir).mkdir(parents=True, exist_ok=True)

    # Tokenize once and materialise every sample as a dict of tensors, so the
    # folds can be assembled with plain list slicing.
    dataset = process_data(data, tokenizer)
    data = [dataset[idx] for idx in range(len(dataset))]

    for i in range(n_folds):
        print(f"--- Fold {i + 1}/{n_folds} ---")

        # Split the data: slice i is the test set, the rest is training data
        start = test_size * i
        end = start + test_size

        training_set = data[:start] + data[end:]
        test_set = data[start:end]

        model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=num_labels)
        model.to(device)
        model = nn.DataParallel(model)
        optimizer = torch.optim.AdamW(model.parameters(), lr=params["lr"])

        # Freeze all layers except the classifier
        for param in model.module.base_model.parameters():
            param.requires_grad = False

        local_model = sets["local_model"]
        # If a local model should be run instead of the model from the Hugging Face model hub
        if local_model is not None:
            print(f"Loading local model {local_model}")
            model, optimizer = load_checkpoint(local_model, model, optimizer, device)

        # make sure it runs on cuda
        model.to(device)

        print("Model and tokenizer initialized")

        if not test_only:
            checkpoint_dir_fold = checkpoint_dir + f"/fold_{i + 1}"
            Path(checkpoint_dir_fold).mkdir(parents=True, exist_ok=True)
            train(model, optimizer, training_set, device, checkpoint_dir_fold)
        acc, loss = run_test(model, test_set, device)
        accuracies.append(acc)
        losses.append(loss)

        if acc > best_acc:
            best_acc = acc
            best_model = model
            best_optimizer = optimizer
            best_test = test_set
            print(f"New best acc {best_acc}")
            best_fold = i + 1

    summarize_results(accuracies, losses, n_folds, best_acc, best_fold, checkpoint_dir + "/results.txt")

    if best_model is None:
        # Happens when there are no folds or every accuracy was NaN.
        raise RuntimeError("Cross-validation produced no best model")

    checkpoint_dir_best = checkpoint_dir + "/best"
    Path(checkpoint_dir_best).mkdir(parents=True, exist_ok=True)
    save_checkpoint(best_model, best_optimizer, checkpoint_dir_best, filename="best_model.pth.tar")

    print("Extracting embeddings...")
    embeddings = get_embeddings(best_model.module.base_model, data, name_list)
    output_file = checkpoint_dir + "/embeddings.json"

    print("Writing embeddings...")
    with open(output_file, 'w') as file:
        json.dump(embeddings, file, indent=4)

    return best_model, best_test


# Run the whole pipeline when this cell executes and keep the best fold's
# model and test samples as module-level names.
best_model, best_test = run_cross_validation()
print("All done!")