## **Multi-label SecBERT**

### Import

In [8]:
!pip install -q transformers

In [9]:
import pandas as pd
import numpy as np
import pickle
import matplotlib.pyplot as plt
import seaborn as sns
import re
import copy
from tqdm.notebook import tqdm
import gc

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
from torch.utils.data import Dataset, DataLoader

from sklearn.metrics import (
    accuracy_score,
    recall_score,
    precision_score,
    multilabel_confusion_matrix,
    f1_score,
    classification_report
)

from transformers import (
    AutoTokenizer,
    AutoModel,
    get_linear_schedule_with_warmup
)

from transformers import BertTokenizer


### Load the dataset

In [10]:
X_train = pd.read_csv('X_train.csv')
y_train = pd.read_csv('y_train.csv')
X_test = pd.read_csv('X_test.csv')
y_test = pd.read_csv('y_test.csv')
y_train = y_train.astype(int)
y_test = y_test.astype(int)

our_X_test = pd.read_csv('our_X_test.csv')

### Auxiliary Functions

In [11]:
def text_processing(text):
  doc = nlp(text)
  # Extract tokens for the given doc
  tokens = [token.lemma_ for token in doc if not (token.is_punct or token.is_space or token.is_stop) ]
  return tokens

def text_processing_keep_nouns_verbs(text):
  doc = nlp(text)
  # Extract tokens for the given doc
  tokens = [token.lemma_ for token in doc if not (token.is_punct or token.is_space or token.is_stop) and (token.pos_=='NOUN' or token.pos_=='VERB')]
  return tokens

In [12]:
def compute_metrics(predicted_y, true_y, metric_function, columns, limit):
  results = pd.DataFrame(columns = columns)
  results.loc[len(results)] = metric_function(true_y, predicted_y, average=None)
  sorted_results = results.sort_values(by=0, axis=1, ascending=False)
  return sorted_results.iloc[:, :limit]

In [13]:
def print_confusion_matrix(cf_matrix, name):
  ax = sns.heatmap(cf_matrix, annot=True, cmap='Blues')

  ax.set_title(name + ' Confusion Matrix \n\n');
  ax.set_xlabel('\nPredicted Values')
  ax.set_ylabel('Actual Values ');

  ## Ticket labels - List must be in alphabetical order
  ax.xaxis.set_ticklabels(['False','True'])
  ax.yaxis.set_ticklabels(['False','True'])

  ## Display the visualization of the Confusion Matrix.
  plt.show()

In [14]:
def print_F1_based_on_distribution(y_true, y_pred, Y, columns):
  fig,ax = plt.subplots()

  results = pd.DataFrame(columns = columns)
  results.loc[len(results)] = f1_score(y_true, y_pred, average=None)

  Y_count = Y.apply(np.sum, axis=0)
  Y_count_sorted = Y_count.sort_values(ascending=False)

  ax.bar(Y_count_sorted.index, Y_count_sorted.values)
  ax.set_xlabel("Techniques")
  ax.set_ylabel("Number of CVEs")
  plt.xticks(rotation=90)

  ax2=ax.twinx()
  ax2.plot(Y_count_sorted.index, results[Y_count_sorted.index].iloc[0], color='red')
  ax2.set_ylabel("F1 Score")

  ax = plt.gca()
  plt.show()

### Model Configuration

In [15]:
class Config:
  def __init__(self):
    super(Config, self).__init__()

    self.SEED = 42
    self.MODEL_PATH = 'jackaduma/SecBERT'
    self.NUM_LABELS = 31

    # data
    self.TOKENIZER = AutoTokenizer.from_pretrained(self.MODEL_PATH)
    self.MAX_LENGTH = 320
    self.BATCH_SIZE = 16
    self.VALIDATION_SPLIT = 0.25

     # model
    self.DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    self.FULL_FINETUNING = True
    self.LR = 3e-5
    self.OPTIMIZER = 'AdamW'
    self.CRITERION = 'BCEWithLogitsLoss'
    self.N_VALIDATE_DUR_TRAIN = 3
    self.N_WARMUP = 0
    self.SAVE_BEST_ONLY = True
    self.EPOCHS = 50

config = Config()

Downloading (…)lve/main/config.json:   0%|          | 0.00/467 [00:00<?, ?B/s]

Downloading (…)solve/main/vocab.txt:   0%|          | 0.00/378k [00:00<?, ?B/s]

In [16]:
def clean_abstract(text):
    text = text.split()
    text = [x.strip() for x in text]
    text = [x.replace('\n', ' ').replace('\t', ' ') for x in text]
    text = ' '.join(text)
    text = re.sub('([.,!?()])', r' \1 ', text)
    return text

def get_texts(df):
    texts = df.apply(lambda x: clean_abstract(x))
    texts = texts.values.tolist()
    return texts

class TransformerDataset(Dataset):
  def __init__(self, df, labels=None, set_type=None):
    super(TransformerDataset, self).__init__()

    self.texts = get_texts(df)

    self.set_type = set_type
    if self.set_type != 'test':
      self.labels = labels


    self.tokenizer = config.TOKENIZER
    self.max_length = config.MAX_LENGTH

  def __len__(self):
      return len(self.texts)

  def __getitem__(self, index):
    tokenized = self.tokenizer.encode_plus(
        self.texts[index],
        max_length=self.max_length,
        pad_to_max_length=True,
        truncation=True,
        return_attention_mask=True,
        return_token_type_ids=False,
        return_tensors='pt'
    )
    input_ids = tokenized['input_ids'].squeeze()
    attention_mask = tokenized['attention_mask'].squeeze()

    if self.set_type != 'test':
      return {
          'input_ids': input_ids.long(),
          'attention_mask': attention_mask.long(),
          'labels': torch.Tensor(self.labels[index]).float(),
      }

    return {
        'input_ids': input_ids.long(),
        'attention_mask': attention_mask.long(),
    }

In [17]:
train_data = TransformerDataset(X_train['Text'], y_train.values)
val_data = TransformerDataset(X_test['Text'], y_test.values)

train_dataloader = DataLoader(train_data, batch_size=config.BATCH_SIZE)
val_dataloader = DataLoader(val_data, batch_size=config.BATCH_SIZE)

b = next(iter(train_dataloader))
for k, v in b.items():
    print(f'{k} shape: {v.shape}')

input_ids shape: torch.Size([16, 320])
attention_mask shape: torch.Size([16, 320])
labels shape: torch.Size([16, 31])




In [18]:
class Model(nn.Module):
  def __init__(self):
    super(Model, self).__init__()

    self.transformer_model = AutoModel.from_pretrained(
        config.MODEL_PATH
    )
    self.dropout = nn.Dropout(0.5)
    self.output = nn.Linear(768, config.NUM_LABELS)

  def forward(self, input_ids, attention_mask=None, token_type_ids=None):
    _, o2 = self.transformer_model(
        input_ids=input_ids,
        attention_mask=attention_mask,
        token_type_ids=token_type_ids,
        return_dict=False
    )
    x = self.dropout(o2)
    x = self.output(x)

    return x

In [19]:
device = config.DEVICE
device

device(type='cpu')

In [20]:
def val(model, val_dataloader, criterion):

    val_loss = 0
    true, pred = [], []

    # set model.eval() every time during evaluation
    model.eval()

    for step, batch in enumerate(val_dataloader):
        # unpack the batch contents and push them to the device (cuda or cpu).
        b_input_ids = batch['input_ids'].to(device)
        b_attention_mask = batch['attention_mask'].to(device)
        b_labels = batch['labels'].to(device)

        # using torch.no_grad() during validation/inference is faster -
        # - since it does not update gradients.
        with torch.no_grad():
            # forward pass
            logits = model(input_ids=b_input_ids, attention_mask=b_attention_mask)

            # calculate loss
            loss = criterion(logits, b_labels)
            val_loss += loss.item()
            # since we're using BCEWithLogitsLoss, to get the predictions -
            # - sigmoid has to be applied on the logits first
            logits = torch.sigmoid(logits)

            logits = np.round(logits.cpu().numpy())

            labels = b_labels.cpu().numpy()

            # the tensors are detached from the gpu and put back on -
            # - the cpu, and then converted to numpy in order to -
            # - use sklearn's metrics.

            pred.extend(logits)
            true.extend(labels)

    avg_val_loss = val_loss / len(val_dataloader)
    print('Eval Val loss:', avg_val_loss)
    print('Eval Val accuracy:', accuracy_score(true, pred))


    val_micro_f1_score = f1_score(true, pred, average='micro')
    print('Eval Val micro f1 score:', val_micro_f1_score)
    return val_micro_f1_score

def train(model, train_dataloader, val_dataloader, criterion, optimizer, scheduler, epoch):

    # we validate config.N_VALIDATE_DUR_TRAIN times during the training loop
    nv = config.N_VALIDATE_DUR_TRAIN
    temp = len(train_dataloader) // nv

    if temp > 100:
      temp = temp - (temp % 100)
    validate_at_steps = [temp * x for x in range(1, nv + 1)]

    train_loss = 0
    for step, batch in enumerate(tqdm(train_dataloader,
                                      desc='Epoch ' + str(epoch))):
        # set model.eval() every time during training
        model.train()

        # unpack the batch contents and push them to the device (cuda or cpu).
        b_input_ids = batch['input_ids'].to(device)
        b_attention_mask = batch['attention_mask'].to(device)
        b_labels = batch['labels'].to(device)

        # clear accumulated gradients
        optimizer.zero_grad()

        # forward pass
        logits = model(input_ids=b_input_ids, attention_mask=b_attention_mask)

        # calculate loss
        loss = criterion(logits, b_labels)
        train_loss += loss.item()

        # backward pass
        loss.backward()

        # update weights
        optimizer.step()

        # update scheduler
        scheduler.step()

        if step in validate_at_steps:
            print(f'-- Step: {step}')
            _ = val(model, val_dataloader, criterion)

    avg_train_loss = train_loss / len(train_dataloader)
    print('Training loss:', avg_train_loss)

In [21]:
def run():
    # setting a seed ensures reproducible results.
    # seed may affect the performance too.
    torch.manual_seed(config.SEED)

    criterion = nn.BCEWithLogitsLoss()

    # define the parameters to be optmized -
    # - and add regularization
    if config.FULL_FINETUNING:
        param_optimizer = list(model.named_parameters())
        no_decay = ["bias", "LayerNorm.bias", "LayerNorm.weight"]
        optimizer_parameters = [
            {
                "params": [
                    p for n, p in param_optimizer if not any(nd in n for nd in no_decay)
                ],
                "weight_decay": 0.001,
            },
            {
                "params": [
                    p for n, p in param_optimizer if any(nd in n for nd in no_decay)
                ],
                "weight_decay": 0.0,
            },
        ]
        optimizer = optim.AdamW(optimizer_parameters, lr=config.LR)

    num_training_steps = len(train_dataloader) * config.EPOCHS
    scheduler = get_linear_schedule_with_warmup(
        optimizer,
        num_warmup_steps=0,
        num_training_steps=num_training_steps
    )

    max_val_micro_f1_score = float('-inf')
    for epoch in range(config.EPOCHS):
        train(model, train_dataloader, val_dataloader, criterion, optimizer, scheduler, epoch)
        val_micro_f1_score = val(model, val_dataloader, criterion)
        print("Epoch " + str(epoch) + "/" + str(config.EPOCHS) + ": F1 Score " + str(val_micro_f1_score))
        if config.SAVE_BEST_ONLY:
            if val_micro_f1_score > max_val_micro_f1_score:
                best_model = copy.deepcopy(model)
                best_val_micro_f1_score = val_micro_f1_score

                model_name = 'scibertfft_best_model'
                torch.save(best_model.state_dict(), model_name + '.pt')

                print(f'--- Best Model. Val loss: {max_val_micro_f1_score} -> {val_micro_f1_score}')
                max_val_micro_f1_score = val_micro_f1_score

    return best_model, best_val_micro_f1_score

### Train

In [22]:
model = Model()
model.to(device);

Downloading model.safetensors:   0%|          | 0.00/336M [00:00<?, ?B/s]

In [None]:
best_model, best_val_micro_f1_score = run()

Epoch 0:   0%|          | 0/85 [00:00<?, ?it/s]



-- Step: 28
Eval Val loss: 0.2769408904016018
Eval Val accuracy: 0.0
Eval Val micro f1 score: 0.0




-- Step: 56
Eval Val loss: 0.2346184089779854
Eval Val accuracy: 0.0
Eval Val micro f1 score: 0.0




-- Step: 84
Eval Val loss: 0.21935744360089302
Eval Val accuracy: 0.0
Eval Val micro f1 score: 0.0
Training loss: 0.3052684550776201




Eval Val loss: 0.21935744360089302
Eval Val accuracy: 0.0
Eval Val micro f1 score: 0.0
Epoch 0/50: F1 Score 0.0
--- Best Model. Val loss: -inf -> 0.0


Epoch 1:   0%|          | 0/85 [00:00<?, ?it/s]



-- Step: 28
Eval Val loss: 0.2122371420264244
Eval Val accuracy: 0.0
Eval Val micro f1 score: 0.0




-- Step: 56
Eval Val loss: 0.209991854429245
Eval Val accuracy: 0.0
Eval Val micro f1 score: 0.0




-- Step: 84
Eval Val loss: 0.20836231634020805
Eval Val accuracy: 0.0
Eval Val micro f1 score: 0.0
Training loss: 0.21243441928835477




Eval Val loss: 0.20836231634020805
Eval Val accuracy: 0.0
Eval Val micro f1 score: 0.0
Epoch 1/50: F1 Score 0.0


Epoch 2:   0%|          | 0/85 [00:00<?, ?it/s]



-- Step: 28
Eval Val loss: 0.2063296116888523
Eval Val accuracy: 0.0
Eval Val micro f1 score: 0.0




-- Step: 56
Eval Val loss: 0.20368406772613526
Eval Val accuracy: 0.0
Eval Val micro f1 score: 0.0




-- Step: 84
Eval Val loss: 0.2000017873942852
Eval Val accuracy: 0.0031545741324921135
Eval Val micro f1 score: 0.02903225806451613
Training loss: 0.20325303042636197




Eval Val loss: 0.2000017873942852
Eval Val accuracy: 0.0031545741324921135
Eval Val micro f1 score: 0.02903225806451613
Epoch 2/50: F1 Score 0.02903225806451613
--- Best Model. Val loss: 0.0 -> 0.02903225806451613


Epoch 3:   0%|          | 0/85 [00:00<?, ?it/s]



-- Step: 28
Eval Val loss: 0.19623705893754959
Eval Val accuracy: 0.006309148264984227
Eval Val micro f1 score: 0.03225806451612903




### Test

In [None]:
test_data = TransformerDataset(our_X_test['Text'], y_test.values, set_type='test')
test_dataloader = DataLoader(test_data, batch_size=config.BATCH_SIZE)

In [None]:
def predict(model):
    val_loss = 0
    test_pred = []
    model.eval()
    for step, batch in enumerate(test_dataloader):
        b_input_ids = batch['input_ids'].to(device)
        b_attention_mask = batch['attention_mask'].to(device)

        with torch.no_grad():
            logits = model(input_ids=b_input_ids, attention_mask=b_attention_mask)
            logits = torch.sigmoid(logits)
            logits = np.round(logits.cpu().numpy())
            test_pred.extend(logits)

    test_pred = np.array(test_pred)
    return test_pred

In [None]:
test_pred = predict(best_model)

In [None]:
final_df = pd.DataFrame(test_pred, column = ['Process Injection',
                               'Access Token Manipulation',
                               'Hijack Execution Flow',
                               'Data from Local System',
                               'External Remote Services',
                               'Data Manipulation',
                               'Network Sniffing',
                               'Exploitation for Privilege Escalation',
                               'Command and Scripting Interpreter',
                               'Phishing',
                               'Server Software Component',
                               'Archive Collected Data',
                               'Data Destruction',
                               'Browser Session Hijacking',
                               'Exploitation for Credential Access',
                               'Abuse Elevation Control Mechanism',
                               'Adversary-in-the-Middle',
                               'User Execution',
                               'Unsecured Credentials',
                               'Brute Force',
                               'File and Directory Discovery',
                               'Valid Accounts',
                               'Exploitation for Defense Evasion',
                               'Create Account',
                               'Endpoint Denial of Service',
                               'Drive-by Compromise	',
                               'Exploitation for Client Execution',
                               'Exploitation of Remote Services',
                               'Stage Capabilities',
                               'Exploit Public-Facing Application',
                               'Forge Web Credentials',],index = None))

print(final_df)
final_df.to_csv('our_y_test.csv',encoding = 'utf-8',index = None)