In [7]:
import datasets
import torch
import numpy as np
import pandas as pd
from tqdm import tqdm
from torch.utils.data import Dataset
from tokenizers.normalizers import NFKC
from tokenizers.pre_tokenizers import Whitespace
from transformers import PreTrainedTokenizerFast
from tokenizers import Tokenizer, trainers, models
from transformers import Trainer, TrainingArguments
from transformers import DataCollatorForLanguageModeling
from transformers import RobertaForSequenceClassification
from transformers import RobertaForMaskedLM, RobertaConfig
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

import os
from typing import List
from collections import defaultdict, Counter

# Put Weights & Biases into offline mode before any Trainer is created.
os.environ["WANDB_MODE"] = "offline"

# Directory scanned by get_data() for per-sample opcode `.txt` files.
DATA_PATH = "/Volumes/malware-dataset/opcodes/processed-data"
# Length (in tokens) of every chunk produced by the tokenizer; also sizes the
# model's position embeddings.
MAX_LENGTH = 64

In [8]:
def get_data(path: os.PathLike, full_path: bool = True) -> List[str]:
    """List the opcode text files stored directly inside ``path``.

    Only entries ending in ``.txt`` are kept, and names starting with ``._``
    are skipped. The same filter is applied whether or not full paths are
    requested, and results are sorted so repeated calls give the same order
    (``os.listdir`` makes no ordering guarantee).

    Args:
        path: Directory to scan (not recursive).
        full_path: When True, return each name joined onto ``path``;
            otherwise return the bare file names.

    Returns:
        Sorted list of matching file paths or file names.
    """
    names = sorted(
        name
        for name in os.listdir(path)
        if name.endswith(".txt") and not name.startswith("._")
    )

    if full_path:
        return [os.path.join(path, name) for name in names]
    return names


def get_labels(filenames):
    """Return one binary label per filename: 1 if it contains "VirusShare", else 0."""
    labels = []
    for name in filenames:
        # bool -> int keeps the values as plain 0/1 integers.
        labels.append(int("VirusShare" in name))
    return labels


# Full paths of every opcode file under DATA_PATH, and a parallel list of
# 0/1 labels derived from those paths by get_labels().
paths = get_data(DATA_PATH)
labels = get_labels(paths)

In [9]:
class OpcodeDataset(Dataset):
    """Map-style dataset that lazily reads one opcode file per item.

    Each item is a ``(text, label)`` pair where ``text`` is the file's lines,
    stripped of trailing whitespace and joined with single spaces.

    Args:
        paths: Paths of the opcode text files.
        labels: One label per path, in the same order.
    """

    def __init__(self, paths: List[str], labels: List[int]):
        assert len(paths) == len(labels), "Mismatch between number of files and labels"
        self.paths = paths
        self.labels = labels

    def __len__(self):
        return len(self.paths)

    def __getitem__(self, idx):
        """Return ``(text, label)`` for ``idx``.

        Raises:
            IndexError: If ``idx`` is outside ``[0, len(self))``.
        """
        # The class defines no __iter__, so `for x in dataset` uses the
        # __getitem__ protocol and stops only on IndexError. Raise it
        # explicitly instead of relying on an assert (which also let
        # idx == len(self) through and is stripped under `python -O`).
        if not 0 <= idx < len(self):
            raise IndexError("Index out of range")

        with open(self.paths[idx], "r") as file:
            opcodes = [line.rstrip() for line in file]

        return " ".join(opcodes), self.labels[idx]


# Lazy dataset over every discovered opcode file and its label.
opcode_dataset = OpcodeDataset(paths, labels)

In [4]:
from tokenizers.processors import RobertaProcessing

# Train a word-level tokenizer over the opcode files the first time the
# notebook runs; afterwards reuse the copy saved in ../MalBERTa.
if not os.path.exists("../MalBERTa"):
    tokenizer = Tokenizer(models.WordLevel(unk_token="<unk>"))
    tokenizer.normalizer = NFKC()
    tokenizer.pre_tokenizer = Whitespace()

    # Special tokens are listed in this order so they receive ids 0-4.
    # NOTE(review): vocab_size=1293 is a magic number — presumably sized to the
    # opcode vocabulary; confirm and move to the config cell.
    trainer = trainers.WordLevelTrainer(
        vocab_size=1293,
        special_tokens=[
            "<s>",
            "<pad>",
            "</s>",
            "<unk>",
            "<mask>",
        ],
    )
    tokenizer.train(paths, trainer)
    # Wrap every encoded sequence as "<s> ... </s>".
    tokenizer.post_processor = RobertaProcessing(
        cls=("<s>", tokenizer.token_to_id("<s>")),
        sep=("</s>", tokenizer.token_to_id("</s>")),
    )
    # This branch only runs when ../MalBERTa is missing, and Tokenizer.save
    # does not create parent directories, so create it before saving.
    os.makedirs("../MalBERTa", exist_ok=True)
    tokenizer.save("../MalBERTa/tokenizer.json")

    # Re-wrap the raw tokenizer as a transformers tokenizer so the data
    # collator and Trainer know which tokens are special.
    hf_tokenizer = PreTrainedTokenizerFast(
        tokenizer_file="../MalBERTa/tokenizer.json",
        unk_token="<unk>",
        bos_token="<s>",
        eos_token="</s>",
        pad_token="<pad>",
        mask_token="<mask>",
    )
    hf_tokenizer.save_pretrained("../MalBERTa")
    tokenizer = hf_tokenizer
else:
    tokenizer = PreTrainedTokenizerFast.from_pretrained("../MalBERTa")

In [10]:
def dataset_generator():
    """Yield every item of ``opcode_dataset`` as a ``{"text", "label"}`` record."""
    for sample in tqdm(opcode_dataset):
        opcode_text, target = sample
        yield dict(text=opcode_text, label=target)


# Build the raw train/test dataset once and cache it on disk; later runs load
# the cached copy instead of re-reading every opcode file.
if not os.path.exists("../data/raw"):
    dataset = datasets.Dataset.from_generator(dataset_generator)
    # Seed the split so that rebuilding the cache reproduces the same
    # train/test partition.
    dataset = dataset.train_test_split(test_size=0.2, seed=42)
    dataset.save_to_disk("../data/raw")
else:
    dataset = datasets.load_from_disk("../data/raw")
print(f"Original Dataset: {dataset}")
print(
    f"Class Distribution:\n\tTrain: {Counter(dataset['train']['label'])}\n\tTest: {Counter(dataset['test']['label'])}"
)

In [11]:
def new_generator():
    """Yield every obfuscated-benign opcode file as a ``{"text", "label"}`` record.

    All files are labelled 0 (benign). File discovery goes through
    ``get_data`` so the same filter is used as for the main dataset: only
    ``.txt`` files, skipping ``._*`` entries (e.g. macOS sidecar files on this
    volume), which the previous inline filter let through.
    """
    obfuscated_benign_path = (
        "/Volumes/malware-dataset/obfuscated-benign/disassembly/opcode-sequences"
    )
    files = get_data(obfuscated_benign_path)
    labels = [0] * len(files)

    ds = OpcodeDataset(files, labels)

    for text, label in ds:
        yield {
            "text": text,
            "label": label,
        }


obfuscated_benign = datasets.Dataset.from_generator(new_generator)
obfuscated_benign

Dataset({
    features: ['text', 'label'],
    num_rows: 2627
})

In [None]:
# Merge the two datasets
def is_malicious(args):
    """Return True when the example's label is non-zero."""
    return args["label"] != 0


# Fixed seed so the subsample and the split are reproducible across reruns.
MERGE_SEED = 42

# Pool the raw train and test splits, then add the obfuscated benign samples.
merged = datasets.concatenate_datasets([dataset["train"], dataset["test"]])

new_dataset = datasets.concatenate_datasets([merged, obfuscated_benign])
class_dist = Counter(new_dataset["label"])
mal = new_dataset.filter(is_malicious)
ben = new_dataset.filter(lambda s: not is_malicious(s))

# Down-sample BOTH classes to the size of the smaller one. Previously only the
# benign class was capped, which left the data unbalanced whenever benign was
# the minority class.
per_class = min(class_dist.values())
subset_mal = mal.shuffle(seed=MERGE_SEED).select(range(min(per_class, len(mal))))
subset_ben = ben.shuffle(seed=MERGE_SEED).select(range(min(per_class, len(ben))))
new_dataset = datasets.concatenate_datasets([subset_mal, subset_ben])
# A bare expression in the middle of a cell is never displayed; print instead.
print(f"Balanced class counts: {Counter(new_dataset['label'])}")

new_dataset = new_dataset.train_test_split(test_size=0.2, seed=MERGE_SEED)
new_dataset.save_to_disk("../data/partially_obfuscated_benign")
print(f"New dataset: {new_dataset}")
print(
    f"Class Distribution:\n\tTrain: {Counter(new_dataset['train']['label'])}\n\tTest: {Counter(new_dataset['test']['label'])}"
)

Saving the dataset (5/5 shards): 100%|██████████| 5518/5518 [00:06<00:00, 822.84 examples/s]
Saving the dataset (2/2 shards): 100%|██████████| 1380/1380 [00:01<00:00, 847.97 examples/s]


In [None]:
def handle_sample(sample):
    """Tokenize a batch of examples into fixed-length chunks.

    Each text is tokenized on its own with truncation to ``MAX_LENGTH`` and
    overflow enabled, so one text can produce several chunks. Every tokenizer
    output column is flattened across the batch, and each chunk inherits the
    label of the text it came from.

    Args:
        sample: Batched mapping with parallel ``"text"`` and ``"label"`` lists.

    Returns:
        Dict of column name -> list with one entry per chunk, including
        ``"label"``.
    """
    columns = defaultdict(list)

    for document, target in zip(sample["text"], sample["label"]):
        encoded = tokenizer(
            document,
            truncation=True,
            max_length=MAX_LENGTH,
            padding="max_length",
            return_overflowing_tokens=True,
            return_special_tokens_mask=True,
        )

        # Every tokenizer output holds one entry per chunk, so the columns can
        # be extended wholesale instead of chunk by chunk.
        for key, per_chunk_values in encoded.items():
            columns[key].extend(per_chunk_values)
        columns["label"].extend([target] * len(encoded["input_ids"]))

    return dict(columns)


# Tokenize every split of `dataset` with handle_sample, 64 examples per batch
# across 8 worker processes. The original columns are dropped because each
# input row expands into a variable number of chunk rows.
# NOTE(review): this maps `dataset` (the raw split); `new_dataset` built in the
# merge cell is saved to disk but never tokenized here — confirm which is intended.
processed_dataset = dataset.map(
    handle_sample,
    remove_columns=dataset["test"].column_names,
    batch_size=64,
    batched=True,
    num_proc=8,
)


In [None]:
# Small RoBERTa configuration for masked-language-model pretraining on opcodes.
config = RobertaConfig(
    vocab_size=tokenizer.vocab_size,
    # RoBERTa offsets position ids past the padding index, so the embedding
    # table needs two more slots than the longest sequence.
    max_position_embeddings=MAX_LENGTH + 2,
    num_attention_heads=4,
    num_hidden_layers=4,
    type_vocab_size=1,
    hidden_size=128,
    intermediate_size=2048,
)

model = RobertaForMaskedLM(config=config)
# Randomly masks 15% of tokens in each batch and builds the MLM labels.
data_collator = DataCollatorForLanguageModeling(
    tokenizer=tokenizer, mlm=True, mlm_probability=0.15
)

# The class label is not used for MLM; the collator creates its own labels.
train_ds = processed_dataset["train"].remove_columns("label")
test_ds = processed_dataset["test"].remove_columns("label")

train_args = TrainingArguments(
    output_dir="./MalBERTa",
    overwrite_output_dir=True,
    num_train_epochs=1,
    per_device_train_batch_size=64,
    save_steps=10_000,
    save_total_limit=2,
)

trainer = Trainer(
    model=model,
    args=train_args,
    processing_class=tokenizer,
    data_collator=data_collator,
    train_dataset=train_ds,
    eval_dataset=test_ds,
)

trainer.train()
# Checkpoints written during training go to ./MalBERTa/checkpoint-* only.
# Write the final weights to ./MalBERTa itself, because the classification
# stage loads the model from that path with from_pretrained("./MalBERTa").
trainer.save_model()

In [None]:
def predict(token_ids):
    """Mask a batch once, run the MLM on it, and tabulate the result.

    The batch is passed through ``data_collator`` exactly once and the model is
    called directly on that masked batch. The previous version handed the
    already-masked tensor to ``trainer.predict``, whose dataloader applies the
    same masking collator again, so the "Input" column did not match what the
    model actually saw.

    Args:
        token_ids: Tokenized dataset (or mapping) exposing parallel
            ``"input_ids"`` and ``"attention_mask"`` columns.

    Returns:
        DataFrame with the decoded masked input ("Input"), the model's decoded
        argmax prediction ("Predicted"), and the decoded unmasked sequence
        ("Actual"), one row per chunk.
    """
    rows = [
        {"input_ids": ids, "attention_mask": mask}
        for ids, mask in zip(token_ids["input_ids"], token_ids["attention_mask"])
    ]
    X = data_collator(rows)

    mlm_model = trainer.model
    mlm_model.eval()
    # Run on whatever device the Trainer left the model on.
    device = next(mlm_model.parameters()).device
    with torch.no_grad():
        logits = mlm_model(
            input_ids=X["input_ids"].to(device),
            attention_mask=X["attention_mask"].to(device),
        ).logits

    Y_hat = tokenizer.batch_decode(logits.argmax(-1).cpu())
    Y = tokenizer.batch_decode(token_ids["input_ids"])

    df = pd.DataFrame(
        data={
            "Input": tokenizer.batch_decode(X["input_ids"]),
            "Predicted": Y_hat,
            "Actual": Y,
        }
    )

    return df


# Inspect MLM predictions on the first ten test chunks.
data = test_ds.select(range(10))
predict(data)

In [None]:
# Load the weights stored in ./MalBERTa into a sequence-classification model
# and run a single labelled training chunk through it as a smoke test.
model = RobertaForSequenceClassification.from_pretrained("./MalBERTa")
data = processed_dataset["train"][0]

# Wrapping each list adds the leading batch dimension of size 1.
label = torch.tensor(data["label"])
attention_mask = torch.tensor([data["attention_mask"]])
input_ids = torch.tensor([data["input_ids"]])

model(input_ids=input_ids, attention_mask=attention_mask, labels=label)

In [None]:
def compute_metrics(eval_pred):
    """Compute accuracy and weighted precision/recall/F1 from an eval prediction.

    Args:
        eval_pred: Pair of ``(logits, labels)``; the predicted class is the
            argmax of the logits over the last axis.

    Returns:
        Dict with ``"accuracy"``, ``"precision"``, ``"recall"`` and ``"f1"``.
    """
    logits, labels = eval_pred
    predictions = logits.argmax(axis=-1)

    # Shared settings: class-frequency-weighted averaging, and a score of 0
    # instead of a warning when a class has no predictions.
    weighted = {"average": "weighted", "zero_division": 0}

    scores = {"accuracy": accuracy_score(labels, predictions)}
    scores["precision"] = precision_score(labels, predictions, **weighted)
    scores["recall"] = recall_score(labels, predictions, **weighted)
    scores["f1"] = f1_score(labels, predictions, **weighted)
    return scores


# Fine-tune the sequence classifier on the tokenized chunks, evaluating every
# 100 steps during training.
train_args = TrainingArguments(
    output_dir="./MalBERTa-classifier",
    overwrite_output_dir=True,
    num_train_epochs=1,
    per_device_train_batch_size=256,
    per_device_eval_batch_size=256,
    save_steps=10_000,
    save_total_limit=2,
    eval_steps=100,
    eval_strategy="steps",
)

trainer = Trainer(
    model=model,
    args=train_args,
    processing_class=tokenizer,
    train_dataset=processed_dataset["train"],
    eval_dataset=processed_dataset["test"],
    compute_metrics=compute_metrics,
)

# Training was configured above (epochs, eval_steps) but never started, so
# evaluate() was scoring a classification head that had not been fine-tuned.
trainer.train()
# NOTE(review): metrics are computed per chunk, not per file — confirm that is
# the intended granularity.
trainer.evaluate()

In [None]:
# `model.classifier` is only the classification head and expects a tensor of
# encoder hidden states, so calling it on a `datasets.Dataset` raises. Score
# the test split through the Trainer instead, which batches, collates, and
# runs the full model.
test_output = trainer.predict(processed_dataset["test"])
test_output.metrics