This notebook documents the procedure for training the single-label models during the 2023 TRAM effort.

The `bootstrap-training-data` file contains the annotations that existed prior, as well as the annotations that were produced during the 2023 effort.

In [3]:
import pandas as pd
import json

with open('../data/training/bootstrap-training-data.json') as f:
    data_json = json.loads(f.read())

data = pd.DataFrame(
    [
        {'text': row['text'], 'label': row['mappings'][0]['attack_id']}
        for row in data_json['sentences']
        if len(row['mappings']) > 0
    ]
)

data

Unnamed: 0,text,label
0,network traffic communicates over a raw socket.,T1095
1,has the ability to set file attributes to hidden.,T1564.001
2,searches for files that are 60mb and less and ...,T1083
3,Attackers can use legitimate domains that are ...,T1090
4,has registered two registry keys for shim data...,T1112
...,...,...
11125,usually the encrypted payload,T1027
11126,includes a capability to modify the Beacon pay...,T1027
11127,has collected the username of the victim system.,T1033
11128,has given malware the same name as an existing...,T1036.005


We then load the model and move it to the GPU.

In [None]:
import transformers
import torch

mode: 'bert or gpt' = 'bert'
cuda = torch.device('cuda')

if mode == 'bert':
    model = transformers.BertForSequenceClassification.from_pretrained(
        "allenai/scibert_scivocab_uncased",
        num_labels=data['label'].nunique(),
        output_attentions=False,
        output_hidden_states=False,
    )
    tokenizer = transformers.BertTokenizer.from_pretrained("allenai/scibert_scivocab_uncased", max_length=512)
elif mode == 'gpt':
    model = transformers.GPT2ForSequenceClassification.from_pretrained(
        "gpt2",
        num_labels=data['label'].nunique(),
        output_attentions=False,
        output_hidden_states=False,
    )
    tokenizer = transformers.GPT2Tokenizer.from_pretrained("gpt2", max_length=512)
    tokenizer.pad_token = tokenizer.eos_token
    model.config.pad_token_id = tokenizer.pad_token_id
else:
    raise ValueError(f"mode must be one of bert or gpt, but is {mode = !r}")

model.train().to(cuda)


We will represent the labels using one hot encoding.

The `apply_attention_mask` function returns an attention mask (which is a tensor) where the element for every non-padding token is `1`.

In [5]:
from sklearn.preprocessing import OneHotEncoder as OHE

encoder = OHE(sparse_output=False)
encoder.fit(data[['label']])

def tokenize(samples: 'list[str]'):
    return tokenizer(samples, return_tensors='pt', padding='max_length', truncation=True, max_length=512).input_ids

def load_data(x, y, batch_size=10):
    x_len, y_len = x.shape[0], y.shape[0]
    assert x_len == y_len
    for i in range(0, x_len, batch_size):
        slc = slice(i, i + batch_size)
        yield x[slc].to(cuda), y[slc].to(cuda)

def apply_attention_mask(x):
    return x.ne(tokenizer.pad_token_id).to(int)


In [6]:
from sklearn.model_selection import train_test_split

train, test = train_test_split(data, test_size=.2, stratify=data['label'])

x_train = tokenize(train['text'].tolist())
x_train

tensor([[  102,   434,   501,  ...,     0,     0,     0],
        [  102,  4199, 14562,  ...,     0,     0,     0],
        [  102,   147,  3901,  ...,     0,     0,     0],
        ...,
        [  102, 10740,   111,  ...,     0,     0,     0],
        [  102,   434, 13037,  ...,     0,     0,     0],
        [  102,   121,   993,  ...,     0,     0,     0]])

In [7]:
y_train = torch.Tensor(encoder.transform(train[['label']]))
y_train

tensor([[0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        ...,
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.]])

The hyperparameters shown here are those that we used, including the number of epochs and batch size.

In [None]:
from torch.optim import AdamW
from tqdm import tqdm
from statistics import mean

optim = AdamW(model.parameters(), lr=2e-5, eps=1e-8)

for epoch in range(6):
    epoch_losses = []
    for x, y in tqdm(load_data(x_train, y_train, batch_size=10)):
        model.zero_grad()
        out = model(x, attention_mask=apply_attention_mask(x), labels=y)
        epoch_losses.append(out.loss.item())
        out.loss.backward()
        optim.step()
    print(f"epoch {epoch + 1} loss: {mean(epoch_losses)}")


In [None]:
import torch.nn.functional as F

model.eval()

preds = []
batch_size = 20

x_test = tokenize(test['text'].tolist())

with torch.no_grad():
    for i in range(0, x_test.shape[0], batch_size):
        x = x_test[i : i + batch_size].to(cuda)
        out = model(x, attention_mask=apply_attention_mask(x))
        preds.extend(out.logits.to('cpu'))

predicted_labels = (
    encoder.inverse_transform(
        F.one_hot(
            torch.vstack(preds).softmax(-1).argmax(-1),
            num_classes=50
        )
        .numpy()
    )
    .reshape(-1)
)

predicted_labels

In [None]:
from sklearn.metrics import precision_recall_fscore_support as calculate_score

predicted = list(predicted_labels)
actual = test['label'].tolist()

labels = sorted(data['label'].unique())

scores = calculate_score(actual, predicted, labels=labels)

scores_df = pd.DataFrame(scores).T
scores_df.columns = ['P', 'R', 'F1', '#']
scores_df.index = labels
scores_df.loc['(micro)'] = calculate_score(actual, predicted, average='micro', labels=labels)
scores_df.loc['(macro)'] = calculate_score(actual, predicted, average='macro', labels=labels)

scores_df