In [1]:
# Locate the repository on disk and make it importable before loading project modules.
import os
# abspath(".") is the current working directory and dirname() takes its parent;
# the join with ".." below climbs one more level, so REPO_ROOT points two
# directories above the working directory (the path is not normalized).
basepath = os.path.dirname(os.path.abspath("."))
REPO_ROOT = os.path.join(basepath, "..")

import sys
# Appended before the `nebula` imports below; presumably REPO_ROOT is the
# directory that contains the `nebula` package -- TODO confirm.
sys.path.append(REPO_ROOT)

import json
from nebula.preprocessing import JSONTokenizerBPE, JSONTokenizerWhiteSpace
from nebula.constants import JSON_CLEANUP_SYMBOLS
from pandas import read_csv

# Maximum number of report files to load in this notebook.
LIMIT = 30

# Build dataset paths component-by-component so they resolve on every OS
# (a raw string with backslashes is only a valid path on Windows).
DATASET_DIR = os.path.join(
    REPO_ROOT, "data", "data_raw", "Avast", "Public_Avast_CTU_CAPEv2_Dataset_Small"
)
folder = os.path.join(DATASET_DIR, "public_small_reports")
label_file = os.path.join(DATASET_DIR, "public_labels.csv")

# os.listdir() returns entries in arbitrary order; sort first so the same
# LIMIT files are selected on every run and on every machine.
examples = [os.path.join(folder, x) for x in sorted(os.listdir(folder))[:LIMIT]]

# Column of the label file that is used as the classification target.
field = 'classification_family'

label_df = read_csv(label_file)

# Map each distinct label value to a stable integer id (alphabetical order).
# Missing labels are dropped explicitly: unique() keeps NaN while nunique()
# ignores it, so the previous zip of the two could silently misalign or fail
# inside sorted() when a label was missing.
families = sorted(label_df[field].dropna().unique())
label_map = {name: index for index, name in enumerate(families)}

In [2]:
# Fields taken from each report's behavior summary, with the per-item
# normalization applied to every entry of that field (lower-casing here).
capa_normalizer = {
    "resolved_apis": lambda x: x.lower(),
    "mutexes": lambda x: x.lower()
}

# X_raw: one dict of normalized fields per report; y: the matching integer label.
X_raw = []
y = []
for example in examples:
    # The file name without its extension is matched against the 'sha256'
    # column; splitext() strips only the trailing extension, whereas
    # str.replace(".json", "") would remove the substring anywhere in the name.
    hhash = os.path.splitext(os.path.basename(example))[0]
    family = label_df[label_df['sha256'] == hhash][field].iloc[0]

    # Read explicitly as UTF-8 so parsing does not depend on the platform's
    # default encoding (e.g. cp1252 on Windows).
    with open(example, encoding="utf-8") as f:
        sample = json.load(f)
    sample = sample["behavior"]['summary']
    # Loop variables are named `key`/`normalize` so they no longer shadow the
    # module-level `field` that selects the label column above.
    normalized_sample = {
        key: [normalize(value) for value in sample[key]]
        for key, normalize in capa_normalizer.items()
    }
    X_raw.append(normalized_sample)
    y.append(label_map[family])

In [3]:
# Settings for the whitespace tokenizer, collected in one place.
whitespace_tokenizer_kwargs = dict(
    vocab_size=300,
    seq_len=512,
    cleanup_symbols=JSON_CLEANUP_SYMBOLS,
    stopwords=[],
)
tokenizer = JSONTokenizerWhiteSpace(**whitespace_tokenizer_kwargs)

# Train on the raw samples, then encode those same samples and report the
# shape of the result.
tokenizer.train(X_raw)
encoded = tokenizer.encode(X_raw)
print(encoded.shape)

100%|██████████| 30/30 [00:00<00:00, 4285.30it/s]


(30, 512)


In [4]:
# BPE tokenizer configured with the same settings as the whitespace tokenizer.
tokenizer_bpe = JSONTokenizerBPE(
    vocab_size=300,
    seq_len=512,
    cleanup_symbols=JSON_CLEANUP_SYMBOLS,
    stopwords=[]
)
tokenizer_bpe.train(X_raw)

# Encode with the BPE tokenizer that was just trained. The previous version
# called `tokenizer.encode` (the whitespace tokenizer), so the BPE model was
# never exercised. The result gets its own name so the whitespace encoding
# held in `encoded` is not overwritten.
encoded_bpe = tokenizer_bpe.encode(X_raw)
print(encoded_bpe.shape)

	You need to train tokenizer with .train() or specify 'model_path=' during initialization!


(30, 512)
