In [None]:
import pathlib
import os
import glob
import json 

import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments
from torch.utils.data import Dataset
from sklearn.preprocessing import LabelEncoder
import numpy as np

In [None]:
# Load the training data.
# Every immediate sub-directory of `training_dir` is treated as one class:
# `positive_example` names the class that is labelled 1 further down, and all
# remaining sub-directory names are collected in `anomalies` (labelled 0).
training_dir = pathlib.Path("../training_data/")
positive_example = "ls"

# os.walk() ignores scandir errors by default, so a missing or unreadable
# directory yields nothing and a bare next() would surface as an unexplained
# StopIteration. Fail with a message that names the path instead.
_top_level = next(os.walk(training_dir), None)
if _top_level is None:
    raise FileNotFoundError(
        f"training data directory not found or unreadable: {training_dir}"
    )
anomalies = set(_top_level[1]) - {positive_example}
subdir = positive_example
def load_dir(subdir, base_dir=None):
    """Load every JSON document found directly inside one training sub-directory.

    Args:
        subdir: Name of the sub-directory to read.
        base_dir: Directory that contains `subdir`. When omitted, the
            notebook-level `training_dir` is used.

    Returns:
        A list with one parsed JSON value per file matching ``*json``, ordered
        by file path so repeated runs see the data in the same order.
    """
    root = training_dir if base_dir is None else pathlib.Path(base_dir)
    pattern = str(root / subdir / "*json")
    print(pattern)
    sentences = []
    # glob.glob() returns matches in arbitrary order; sort for reproducibility.
    for fname in sorted(glob.glob(pattern)):
        # JSON text is UTF-8; do not depend on the platform's locale encoding.
        with open(fname, "r", encoding="utf-8") as fp:
            sentences.append(json.load(fp))
    return sentences

# Assemble the full corpus: the positive class first, then every anomaly class.
positives = load_dir(positive_example)
# Work on a copy so that growing the corpus never mutates `positives`.
syscalls = list(positives)
# `anomalies` is a set of strings, so its iteration order can differ between
# kernel sessions (string hashing is randomised). Sort so the corpus order,
# and therefore the order of training examples, is reproducible.
for sd in sorted(anomalies):
    syscalls.extend(load_dir(sd))
# Labels are positional: 1 for the leading positives, 0 for everything after.
labels = [1] * len(positives) + [0] * (len(syscalls) - len(positives))
# Largest value appearing in any sequence. Empty sequences are skipped (max()
# of an empty sequence raises), and 0 is used when there is nothing to scan.
num_syscalls = max((max(s) for s in syscalls if s), default=0)

In [None]:
# train albert on the anomaly detection task
class SysCallDataset(Dataset):
    """Map-style dataset pairing per-example encodings with one label each.

    Args:
        encodings: Mapping from field name to an indexable collection holding
            one row per example (a tensor or a nested list).
        labels: Indexable collection holding one label per example.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    @staticmethod
    def _as_new_tensor(value):
        """Return `value` as a tensor that does not share storage with it."""
        if isinstance(value, torch.Tensor):
            # torch.tensor(<tensor>) emits a copy-construct UserWarning on
            # every call; detach().clone() is the recommended equivalent.
            return value.detach().clone()
        return torch.tensor(value)

    def __getitem__(self, idx):
        """Return a dict with every encoding field at `idx` plus its 'labels' entry."""
        item = {key: self._as_new_tensor(val[idx]) for key, val in self.encodings.items()}
        item['labels'] = self._as_new_tensor(self.labels[idx])
        return item

    def __len__(self):
        """Number of examples, taken from the label collection."""
        return len(self.labels)

model_name = "albert-base-v2"
output_dir = "../models/albert"
SEED = 42

# Seed torch before the model is built so that any weights not restored from
# the checkpoint (presumably the sequence-classification head — confirm against
# the load-time warning) are initialised identically on every run.
torch.manual_seed(SEED)

tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(model_name)

# `syscalls` is a list of sequences (one inner list per example) and `labels`
# is the parallel list built in the loading cell: 1 marks an example from the
# `positive_example` directory, 0 marks an example from any anomaly directory.
sequences = syscalls

# Convert sequences to strings, as the tokenizer requires string inputs
sequences_str = [' '.join(map(str, seq)) for seq in sequences]

# Tokenize sequences; anything beyond 512 tokens is truncated
inputs = tokenizer(sequences_str, return_tensors="pt", padding=True, truncation=True, max_length=512)

# Prepare labels
le = LabelEncoder()
labels_encoded = le.fit_transform(labels)
labels_tensor = torch.tensor(labels_encoded)

# Convert to Dataset
dataset = SysCallDataset(inputs, labels_tensor)

# Setup training (checkpoints go to ./results; the final export uses output_dir)
training_args = TrainingArguments(
    output_dir='./results',
    num_train_epochs=3,
    per_device_train_batch_size=16,
    logging_steps=500,
    seed=SEED,
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=dataset,
    compute_metrics=None,
)

# Train
trainer.train()


In [None]:
# Export the trained model to ONNX and save the tokenizer next to it.
dynamic_axes = {
    'input_ids': {0: 'batch_size', 1: 'sequence_length'},
    'attention_mask': {0: 'batch_size', 1: 'sequence_length'},
    'logits': {0: 'batch_size'}
}

# The export target directory is never created elsewhere in this notebook, and
# torch.onnx.export() does not create missing parent directories.
os.makedirs(output_dir, exist_ok=True)

# Trace in inference mode (dropout disabled).
model.eval()

# The example inputs only drive the trace; batch and sequence axes are declared
# dynamic above. A couple of rows is enough, and avoids pushing the whole
# tokenised dataset through the model in one forward pass. Two rows rather than
# one keeps the exporter from specialising on a single-sample batch.
# Training may have left the model on an accelerator, so place the example
# inputs on the same device as the model's parameters.
export_device = next(model.parameters()).device
example_input_ids = inputs['input_ids'][:2].to(export_device)
example_attention_mask = inputs['attention_mask'][:2].to(export_device)

# After training, export model to ONNX
torch.onnx.export(
    model,
    (example_input_ids, example_attention_mask),
    os.path.join(output_dir, "model.onnx"),
    input_names=["input_ids", "attention_mask"],
    output_names=["logits"],
    dynamic_axes=dynamic_axes,
    opset_version=13,
)

# Save tokenizer
tokenizer.save_pretrained(output_dir)