# Fine Tuning DistilBERT for MultiLabel Vulnerability Classification Using Source Code

## Prepare

In [None]:
!pip install transformers

In [None]:
!pip install evaluate

In [None]:
# Importing stock ml libraries
import warnings
warnings.simplefilter('ignore')
import numpy as np
# import pandas as pd
from tqdm import tqdm
from sklearn import metrics
import transformers
import torch
from torch.utils.data import Dataset, DataLoader, RandomSampler, SequentialSampler
from transformers import DistilBertTokenizer, DistilBertModel
import logging
logging.basicConfig(level=logging.ERROR)
# Importing the libraries needed
import pandas as pd
import torch
import transformers
from torch.utils.data import Dataset, DataLoader
from transformers import DistilBertModel, DistilBertTokenizer
import re
import evaluate


In [None]:
# Pick the compute device: the GPU when CUDA is available, otherwise the CPU.

from torch import cuda
if cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

In [None]:
# Load the tokenizer from a local directory (absolute, machine-specific path).
# NOTE(review): the directory is named "tokenizer_opcode" while this notebook's
# title refers to source code — confirm this is the intended tokenizer.
tokenizer = DistilBertTokenizer.from_pretrained('/home/bombbom/Documents/NLP_in_Detection_System/save_model/tokenizer_opcode/')

In [None]:
tokenizer

In [None]:
# Load the pre-trained DistilBERT backbone from a local checkpoint, then turn
# off gradient tracking on every backbone parameter so its weights stay fixed.
distil_model = DistilBertModel.from_pretrained("/home/bombbom/Documents/NLP_in_Detection_System/save_model/pre_train_opcode/checkpoint-50000/")
for param in distil_model.parameters():
    param.requires_grad_(False)

In [None]:
distil_model

In [None]:
# Load the labeled dataset from a local pickle file (absolute, machine-specific path).
# NOTE(review): unpickling can execute arbitrary code — only load pickle files
# that come from a trusted source.
data = pd.read_pickle("/home/bombbom/Documents/NLP_in_Detection_System/dataset_example/labeled_SBW_datasets.pkl")
data

In [None]:
# Group 1 captures a quoted string literal (double or single quotes, with
# backslash escapes honoured so an escaped quote does not end the literal).
# Group 2 captures a comment: /* block */ or // up to the end of the line.
# No `$` anchor is used for line comments: `[^\r\n]*` already stops at the line
# break, and `$` would fail to match in front of a "\r\n" line ending.
_STRING_OR_COMMENT_RE = re.compile(
    r'("(?:\\.|[^"\\])*"' r"|'(?:\\.|[^'\\])*')"
    r"|(/\*.*?\*/|//[^\r\n]*)",
    re.DOTALL,  # lets /* ... */ span several lines
)


def remove_comments(string):
    """Return *string* with ``//`` line comments and ``/* */`` block comments removed.

    Quoted string literals are matched first and copied through unchanged, so
    comment markers that appear inside a literal (for example ``"http://x"``)
    are preserved. Comments are replaced by the empty string; the line break
    that follows a ``//`` comment is kept.

    Args:
        string: Source text to clean.

    Returns:
        The text with all comments stripped.
    """
    def _replacer(match):
        # Group 2 set -> a real comment was matched: drop it.
        if match.group(2) is not None:
            return ""
        # Otherwise a string literal was matched: keep it verbatim.
        return match.group(1)

    return _STRING_OR_COMMENT_RE.sub(_replacer, string)

In [None]:
def mapping(input):
    """Convert a stringified category list into an 8-element multi-hot vector.

    ``input`` is text such as ``"['reentrancy', 'Other']"``: the first and last
    characters are dropped, single quotes and spaces are removed, and the rest
    is split on commas. Position ``i`` of the result is 1 when the i-th known
    category name is among the parsed names, otherwise 0.
    """
    found = input[1:-1].replace("'", "").replace(" ", "").split(",")
    # Order defines the index of each category in the returned vector.
    known_categories = (
        'access_control',
        'arithmetic',
        "denial_service",
        "front_running",
        "reentrancy",
        "time_manipulation",
        "unchecked_low_calls",
        "Other",
    )
    return [1 if category in found else 0 for category in known_categories]

In [None]:
# Replace each row's category string with its 8-element multi-hot label vector.
# Not idempotent: mapping() expects the original string form, so run this cell once.
data.categories = data.categories.apply(mapping)

In [None]:
data

In [None]:
# Strip // and /* */ comments from the source_code column, row by row.
data.source_code = data.source_code.apply(remove_comments)

In [None]:
data

In [None]:
# Persist the preprocessed frame (relative path, resolved against the current working directory).
data.to_pickle("dataset_example/address_source_vul_lines_SBW.pkl")

In [None]:
# Reload the dataset used for fine-tuning (relative path).
# NOTE(review): this reads "address_opcode_vul_lines_SBW.pkl", whereas the cell
# before it writes "address_source_vul_lines_SBW.pkl" — confirm which file is
# intended; as written, the frame saved above is not the one loaded here.
data = pd.read_pickle("dataset_example/address_opcode_vul_lines_SBW.pkl")

In [None]:
# Rename the columns to the names MultiLabelDataset reads (`text` and `labels`).
data = data.rename(columns = {"source_code": "text", "categories": "labels"})

In [None]:
# Re-wrap the loaded object as a DataFrame.
# NOTE(review): presumably already a DataFrame after read_pickle/rename, in
# which case this line has no effect — confirm.
data = pd.DataFrame(data)

In [None]:
data

In [None]:
# Training configuration

# Hyperparameters used by the dataset, data loaders, optimizer and training loop.
MAX_LEN = 512  # every sample is padded/truncated to this many tokens
TRAIN_BATCH_SIZE = 32
VALID_BATCH_SIZE = 32
EPOCHS = 10  # number of passes over the training loader
LEARNING_RATE = 1e-05  # learning rate handed to the Adam optimizer

In [None]:
class MultiLabelDataset(Dataset):
    """Torch dataset that tokenizes text rows and pairs them with label vectors.

    Args:
        dataframe: Frame with a ``text`` column (anything ``str()`` accepts)
            and a ``labels`` column holding one numeric vector per row.
        tokenizer: Object exposing ``encode_plus`` that returns a mapping with
            ``input_ids``, ``attention_mask`` and ``token_type_ids``.
        max_len: Length every encoded sample is padded/truncated to.
    """

    def __init__(self, dataframe, tokenizer, max_len):
        self.tokenizer = tokenizer
        self.data = dataframe
        self.text = dataframe.text
        self.targets = self.data.labels
        self.max_len = max_len

    def __len__(self):
        """Return the number of rows."""
        return len(self.text)

    def __getitem__(self, index):
        """Return the encoded sample at *position* ``index``.

        Returns:
            Dict with ``ids``, ``mask`` and ``token_type_ids`` (long tensors)
            and ``targets`` (float tensor).
        """
        # Use positional access: samplers produce positions 0..len-1, which a
        # label-based lookup only satisfies when the frame's index was reset.
        text = str(self.text.iloc[index])
        # Collapse every run of whitespace (incl. newlines) into one space.
        text = " ".join(text.split())

        inputs = self.tokenizer.encode_plus(
            text,
            None,
            add_special_tokens=True,
            max_length=self.max_len,
            padding='max_length',
            truncation=True,
            return_token_type_ids=True
        )

        return {
            'ids': torch.tensor(inputs['input_ids'], dtype=torch.long),
            'mask': torch.tensor(inputs['attention_mask'], dtype=torch.long),
            'token_type_ids': torch.tensor(inputs["token_type_ids"], dtype=torch.long),
            'targets': torch.tensor(self.targets.iloc[index], dtype=torch.float)
        }

In [None]:
# Split the rows 80/20: sample 80% for training with a fixed seed, use the
# remaining rows as the test set, then reset both indexes to 0..len-1.
train_size = 0.8
train_data=data.sample(frac=train_size,random_state=20)
test_data=data.drop(train_data.index).reset_index(drop=True)
train_data = train_data.reset_index(drop=True)

In [None]:
# Report the (rows, columns) shape of the full frame and of each split.
print(f"FULL Dataset: {data.shape}")
print(f"TRAIN Dataset: {train_data.shape}")
print(f"TEST Dataset: {test_data.shape}")

# Wrap both splits in tokenizing datasets that share one tokenizer and length.
training_set = MultiLabelDataset(train_data, tokenizer, MAX_LEN)
testing_set = MultiLabelDataset(test_data, tokenizer, MAX_LEN)

In [None]:
# Loader settings for training: shuffled batches, data loaded in the main process.
train_params = {'batch_size': TRAIN_BATCH_SIZE,
                'shuffle': True,
                'num_workers': 0
                }

# Loader settings for evaluation.
# NOTE(review): shuffling is not needed for evaluation; outputs and targets are
# collected batch by batch so they stay aligned, but the sample order differs
# between runs — consider 'shuffle': False.
test_params = {'batch_size': VALID_BATCH_SIZE,
                'shuffle': True,
                'num_workers': 0
                }

training_loader = DataLoader(training_set, **train_params)
testing_loader = DataLoader(testing_set, **test_params)

# Fine Tune with MLP

In [None]:
# Name of the head variant defined in this section.
choose_model = "MLP"

In [None]:
# Custom model: the shared DistilBERT encoder followed by a dense layer, tanh,
# dropout and an 8-way output layer.

class DistilBERTClass(torch.nn.Module):
    """MLP head on top of the module-level ``distil_model`` encoder.

    The encoder output at sequence position 0 goes through
    Linear(768, 768) -> tanh -> Dropout(0.1) -> Linear(768, 8); the
    unnormalized scores of the last layer are returned.
    """

    def __init__(self):
        super().__init__()
        self.l1 = distil_model
        self.pre_classifier = torch.nn.Linear(768, 768)
        self.dropout = torch.nn.Dropout(0.1)
        self.classifier = torch.nn.Linear(768, 8)

    def forward(self, input_ids, attention_mask, token_type_ids):
        # token_type_ids is accepted but not forwarded to the encoder.
        encoder_out = self.l1(input_ids=input_ids, attention_mask=attention_mask)
        # First element of the encoder output, sliced at sequence position 0.
        first_token = encoder_out[0][:, 0]
        activated = torch.tanh(self.pre_classifier(first_token))
        return self.classifier(self.dropout(activated))

# Instantiate the model defined above and move it to the selected device.
model = DistilBERTClass()
model.to(device)

# Fine Tune with LSTM

In [None]:
# Name of the head variant defined in this section.
choose_model = "LSTM"

In [None]:
# Custom model: a 2-layer unidirectional LSTM run over the DistilBERT token
# states, followed by a dense layer, tanh, dropout and an 8-way output layer.

class DistilBERTClass(torch.nn.Module):
    """LSTM head on top of the module-level ``distil_model`` encoder.

    The encoder's per-token states (768 features) are fed to a 2-layer LSTM
    with 256 hidden units. The final hidden states of both LSTM layers are
    concatenated (512 features) and passed through
    Linear(512, 256) -> tanh -> Dropout(0.1) -> Linear(256, 8).
    ``forward`` returns the unnormalized scores, one row of 8 per sample.
    """

    def __init__(self):
        super(DistilBERTClass, self).__init__()
        self.l1 = distil_model
        self.lstm1 = torch.nn.LSTM(input_size=768,
                            hidden_size=256,
                            num_layers=2,
                            batch_first=True,
                            bidirectional=False)
        self.pre_classifier = torch.nn.Linear(256*2, 256)
        self.dropout = torch.nn.Dropout(0.1)
        # Input width must equal pre_classifier's 256 outputs; the previous
        # value of 768 made every forward pass fail with a shape mismatch.
        self.classifier = torch.nn.Linear(256, 8)

    def forward(self, input_ids, attention_mask, token_type_ids):
        # token_type_ids is accepted but not forwarded to the encoder.
        output_1 = self.l1(input_ids=input_ids, attention_mask=attention_mask)
        hidden_state = output_1[0]
        self.lstm1.flatten_parameters()
        # NOTE(review): attention_mask is only given to the encoder, so padded
        # positions are also run through the LSTM — confirm this is intended.
        output, (hidden, cell) = self.lstm1(hidden_state)
        # With bidirectional=False and num_layers=2, hidden[-2] is the final
        # state of the first layer and hidden[-1] that of the second layer.
        # NOTE(review): confirm concatenating both layers (rather than using
        # only hidden[-1]) is intended for the unidirectional variant.
        x = torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim=1)

        pooler = self.pre_classifier(x)
        pooler = torch.tanh(pooler)
        pooler = self.dropout(pooler)
        output = self.classifier(pooler)
        return output

# Instantiate the model defined above and move it to the selected device.
model = DistilBERTClass()
model.to(device)

# Fine Tune with BiLSTM

In [None]:
# Name of the head variant defined in this section.
choose_model = "BiLSTM"

In [None]:
# Custom model: a 2-layer bidirectional LSTM run over the DistilBERT token
# states, followed by a dense layer, tanh, dropout and an 8-way output layer.

class DistilBERTClass(torch.nn.Module):
    """BiLSTM head on top of the module-level ``distil_model`` encoder.

    The encoder's per-token states (768 features) are fed to a 2-layer
    bidirectional LSTM with 256 hidden units per direction. The last layer's
    final forward and backward states are concatenated (512 features) and
    passed through Linear(512, 256) -> tanh -> Dropout(0.1) -> Linear(256, 8).
    ``forward`` returns the unnormalized scores, one row of 8 per sample.
    """

    def __init__(self):
        super(DistilBERTClass, self).__init__()
        self.l1 = distil_model
        self.lstm1 = torch.nn.LSTM(input_size=768,
                            hidden_size=256,
                            num_layers=2,
                            batch_first=True,
                            bidirectional=True)
        self.pre_classifier = torch.nn.Linear(256*2, 256)
        self.dropout = torch.nn.Dropout(0.1)
        # Input width must equal pre_classifier's 256 outputs; the previous
        # value of 768 made every forward pass fail with a shape mismatch.
        self.classifier = torch.nn.Linear(256, 8)

    def forward(self, input_ids, attention_mask, token_type_ids):
        # token_type_ids is accepted but not forwarded to the encoder.
        output_1 = self.l1(input_ids=input_ids, attention_mask=attention_mask)
        # First element of the encoder output: one 768-wide state per token.
        hidden_state = output_1[0]
        self.lstm1.flatten_parameters()
        # NOTE(review): attention_mask is only given to the encoder, so padded
        # positions are also run through the LSTM — confirm this is intended.
        output, (hidden, cell) = self.lstm1(hidden_state)
        # For a bidirectional LSTM the last two entries of `hidden` are the
        # final forward (hidden[-2]) and backward (hidden[-1]) states of the
        # top layer.
        x = torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim=1)

        pooler = self.pre_classifier(x)
        pooler = torch.tanh(pooler)
        pooler = self.dropout(pooler)
        output = self.classifier(pooler)
        return output

# Instantiate the model defined above and move it to the selected device.
model = DistilBERTClass()
model.to(device)

## Training


In [None]:
def loss_fn(outputs, targets):
    """Return the binary cross-entropy-with-logits loss of ``outputs`` vs ``targets``.

    ``outputs`` are unnormalized scores (the sigmoid is applied inside the
    loss); the default reduction of ``BCEWithLogitsLoss`` is used.
    """
    criterion = torch.nn.BCEWithLogitsLoss()
    return criterion(outputs, targets)

In [None]:
# Adam over the parameters of the `model` instance that exists when this cell
# runs; re-run this cell after re-creating `model`, otherwise the optimizer
# keeps updating the previous instance's parameters.
optimizer = torch.optim.Adam(params =  model.parameters(), lr=LEARNING_RATE)

In [None]:
def train(epoch):
    """Run one training epoch over ``training_loader`` and save a checkpoint.

    Uses the module-level ``model``, ``optimizer``, ``device`` and
    ``choose_model``. After the last batch, the model's state dict is written
    to ``Model_source_code_<choose_model><epoch>.pth`` in the current working
    directory.

    Args:
        epoch: Epoch number, used in the progress message and the file name.
    """
    model.train()
    # Wrap the loader itself (not the enumerate object) so tqdm knows the
    # number of batches and can show a total.
    for step, batch in enumerate(tqdm(training_loader)):
        ids = batch['ids'].to(device, dtype=torch.long)
        mask = batch['mask'].to(device, dtype=torch.long)
        token_type_ids = batch['token_type_ids'].to(device, dtype=torch.long)
        targets = batch['targets'].to(device, dtype=torch.float)

        outputs = model(ids, mask, token_type_ids)

        optimizer.zero_grad()
        loss = loss_fn(outputs, targets)
        # Report the loss on the first batch and then every 5000 batches.
        if step % 5000 == 0:
            print(f'Epoch: {epoch}, Loss:  {loss.item()}')

        loss.backward()
        optimizer.step()
    # f-string so the selected head name is interpolated; previously the
    # literal text "{choose_model}" ended up in the file name.
    path_name = f"Model_source_code_{choose_model}{epoch}.pth"
    torch.save(obj=model.state_dict(), f=path_name)

In [None]:
# Train for EPOCHS epochs; train() writes one checkpoint file per epoch.
for epoch in range(EPOCHS):
    train(epoch)

## Testing

In [None]:
def validation(testing_loader):
    """Run the module-level ``model`` over ``testing_loader`` without gradients.

    Args:
        testing_loader: Iterable of batches with ``ids``, ``mask``,
            ``token_type_ids`` and ``targets`` tensors.

    Returns:
        ``(fin_outputs, fin_targets)``: two lists with one inner list per
        sample, in the order the loader yielded them — the sigmoid of the
        model outputs and the matching targets.
    """
    model.eval()
    probabilities, references = [], []
    with torch.no_grad():
        for _, batch in tqdm(enumerate(testing_loader, 0)):
            input_ids = batch['ids'].to(device, dtype=torch.long)
            attention_mask = batch['mask'].to(device, dtype=torch.long)
            segment_ids = batch['token_type_ids'].to(device, dtype=torch.long)
            labels = batch['targets'].to(device, dtype=torch.float)

            logits = model(input_ids, attention_mask, segment_ids)

            # Move to host memory and convert to plain Python lists.
            references += labels.cpu().detach().numpy().tolist()
            probabilities += torch.sigmoid(logits).cpu().detach().numpy().tolist()
    return probabilities, references

In [None]:
# Collect sigmoid outputs and targets for every sample served by the test loader.
outputs, targets = validation(testing_loader)

# Threshold each per-label probability at 0.5 to get a boolean prediction matrix.
final_outputs = np.array(outputs) >=0.5

In [None]:
# Bundle four metrics so they can be computed with a single call.
clf_metrics = evaluate.combine(["accuracy", "f1", "precision", "recall"])

In [None]:
# NOTE(review): `targets` and `final_outputs` hold one 8-element vector per
# sample (float targets, boolean predictions). The default configurations of
# these metrics presumably expect one integer label per sample — confirm this
# call works on multi-label input, or load the metrics' multilabel variants.
clf_metrics.compute(references=targets, predictions=final_outputs)