In [None]:
import pandas as pd
import numpy as np
import csv
import pickle
import re
import torch
import sklearn
import os
import random
import custom
import models
import regex_def
import clang
from clang import *
from clang import cindex
from pathlib import Path
from tokenizers import ByteLevelBPETokenizer
from tokenizers.implementations import ByteLevelBPETokenizer
from tokenizers.processors import BertProcessing
from torch.utils.data import Dataset, DataLoader, IterableDataset
from transformers import RobertaConfig
from transformers import RobertaForMaskedLM, RobertaForSequenceClassification
from transformers import RobertaTokenizerFast
from transformers import DataCollatorForLanguageModeling
from transformers import Trainer, TrainingArguments
from transformers import LineByLineTextDataset
from transformers.modeling_outputs import SequenceClassifierOutput
from custom import CustomDataCollatorForLanguageModeling

## Pre-requisites stuff

In [None]:
## Set default device (GPU or CPU)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

In [None]:
## Deterministic/reproducible flags

seedlist = [42, 834, 692, 489, 901, 408, 819, 808, 531, 166]

seed = seedlist[0]
os.environ['PYTHONHASHSEED'] = str(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
np.random.seed(seed)
random.seed(seed)
torch.backends.cudnn.enabled = True
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

In [None]:
## Weights and Biases flags

os.environ['WANDB_DISABLED'] = 'true'
os.environ['WANDB_MODE'] = 'dryrun'
# os.environ["CUDA_VISIBLE_DEVICES"]=""
#os.environ['WANDB_NOTEBOOK_NAME'] = 'Pretrain word-level VulBERTa on Draper'
#os.environ['WANDB_NAME'] = 'linux'
#os.environ['WANDB_PROJECT'] = 'projectName'

## Load/initialise custom tokenizer

In [None]:
## Tokenizer

from tokenizers.pre_tokenizers import PreTokenizer
from tokenizers.pre_tokenizers import Whitespace
from tokenizers import NormalizedString,PreTokenizedString
from typing import List 

class MyTokenizer:
    
    cidx = cindex.Index.create()
        

    def clang_split(self, i: int, normalized_string: NormalizedString) -> List[NormalizedString]:
        ## Tokkenize using clang
        tok = []
        tu = self.cidx.parse('tmp.c',
                       args=[''],  
                       unsaved_files=[('tmp.c', str(normalized_string.original))],  
                       options=0)
        for t in tu.get_tokens(extent=tu.cursor.extent):
            spelling = t.spelling.strip()
            
            if spelling == '':
                continue
                
            ## Keyword no need

            ## Punctuations no need

            ## Literal all to BPE
            
            #spelling = spelling.replace(' ', '')
            tok.append(NormalizedString(spelling))

        return(tok)
    
    def pre_tokenize(self, pretok: PreTokenizedString):
        pretok.split(self.clang_split)
        
## Custom tokenizer

from tokenizers import Tokenizer
from tokenizers import normalizers,decoders
from tokenizers.normalizers import StripAccents, unicode_normalizer_from_str, Replace
from tokenizers.processors import TemplateProcessing
from tokenizers import processors,pre_tokenizers
from tokenizers.models import BPE

## Init new tokenizers
#my_tokenizer = Tokenizer(BPE(unk_token="<unk>"))
#my_tokenizer = Tokenizer(BPE())


## Load pre-trained tokenizers
vocab, merges = BPE.read_file(vocab="./tokenizer/drapgh-vocab.json", merges="./tokenizer/drapgh-merges.txt")
my_tokenizer = Tokenizer(BPE(vocab, merges, unk_token="<unk>"))

my_tokenizer.normalizer = normalizers.Sequence([StripAccents(), Replace(" ", "Ä")])
my_tokenizer.pre_tokenizer = PreTokenizer.custom(MyTokenizer())
my_tokenizer.post_processor = processors.ByteLevel(trim_offsets=False)
my_tokenizer.post_processor = TemplateProcessing(
    single="<s> $A </s>",
    special_tokens=[
    ("<s>",0),
    ("<pad>",1),
    ("</s>",2),
    ("<unk>",3),
    ("<mask>",4)
    ]
)


### Choose and prepare testing dataset

In [None]:
### Choose the dataset ('draper','vuldeepecker','devign','reveal')
mydataset = 'mvd'

In [None]:
my_tokenizer.enable_truncation(max_length=1024)
my_tokenizer.enable_padding(direction='right', pad_id=1, pad_type_id=0, pad_token='<pad>', length=None, pad_to_multiple_of=None)

In [None]:
def process_encodings(encodings):
    input_ids=[]
    attention_mask=[]
    for enc in encodings:
        input_ids.append(enc.ids)
        attention_mask.append(enc.attention_mask)
    return {'input_ids':input_ids, 'attention_mask':attention_mask}

In [None]:
def cleaner(code):
    ## Remove code comments
    pat = re.compile(r'(/\*([^*]|(\*+[^*/]))*\*+/)|(//.*)')
    code = re.sub(pat,'',code)
    code = re.sub('\n','',code)
    code = re.sub('\t','',code)
    return(code)

In [None]:
class MyCustomDataset(Dataset):
    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels
        assert len(self.encodings['input_ids']) == len(self.encodings['attention_mask']) ==  len(self.labels)

    def __getitem__(self, idx):
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
        item['labels'] = torch.tensor(self.labels[idx])
        return item

    def __len__(self):
        return len(self.labels)

In [None]:
if mydataset=='devign':
    test_index=set()

    with open('data/finetune/devign/test.txt') as f:
        for line in f:
            line=line.strip()
            test_index.add(int(line))
    mydata = pd.read_json('data/finetune/devign/Devign.json')
    m3=mydata.iloc[list(test_index)]

    mydata = None
    del(mydata)
    m3.func = m3.func.apply(cleaner)

    test_encodings = my_tokenizer.encode_batch(m3.func)
    test_encodings = process_encodings(test_encodings)
    test_dataset = MyCustomDataset(test_encodings, m3.target.tolist())
else:
    m3 = pd.read_pickle('data/finetune/%s/%s_test.pkl'%(mydataset,mydataset))
    
    
    try:
        m3.functionSource = m3.functionSource.apply(cleaner)
        test_encodings = my_tokenizer.encode_batch(m3.functionSource)
        test_encodings = process_encodings(test_encodings)
        
        if  mydataset =='draper':
            test_dataset = MyCustomDataset(test_encodings, (m3['combine']*1).tolist())
        else:
            test_dataset = MyCustomDataset(test_encodings, m3.label.tolist())
    except:
        m3.func = m3.func.apply(cleaner)
        test_encodings = my_tokenizer.encode_batch(m3.func)
        test_encodings = process_encodings(test_encodings)
        test_dataset = MyCustomDataset(test_encodings, m3.label.tolist())

In [None]:
## D2A ONLY
task = 'function'
m3 = pd.read_csv('data/finetune/%s/%s/d2a_lbv1_%s_val.csv'%(mydataset,task,task))
m3.code = m3.code.apply(cleaner)
test_encodings = my_tokenizer.encode_batch(m3.code)
test_encodings = process_encodings(test_encodings)
test_dataset = MyCustomDataset(test_encodings, m3.label.tolist())
#test_dataset = MyCustomDataset(test_encodings, [0]*len(m3))

## Load fine-tuned VulBERTa-MLP model

In [None]:
mymodel=mydataset

In [None]:
model = RobertaForSequenceClassification.from_pretrained('./models/VB/%s' % mymodel)
print(model.num_parameters())

In [None]:
test_loader = DataLoader(test_dataset, batch_size=128)

In [None]:
def softmax_accuracy(probs,all_labels):
    def getClass(x):
        return(x.index(max(x)))
    
    all_labels = all_labels.tolist()
    probs = pd.Series(probs.tolist())
    all_predicted = probs.apply(getClass)
    all_predicted.reset_index(drop=True, inplace=True)
    vc = pd.value_counts(all_predicted == all_labels)
    try:
        acc = vc[1]/len(all_labels)
    except:
        if(vc.index[0]==False):
            acc = 0
        else:
            acc = 1
    return(acc,all_predicted)


In [None]:
%%capture

multigpu=True
if multigpu:
    model = torch.nn.DataParallel(model)
model.to(device)

### Predict

In [None]:
all_pred=[]
all_labels=[]
all_probs=[]
model.eval()
with torch.no_grad():
    for batch in test_loader:
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)
        outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
        loss = outputs[0]
        acc_val,pred = softmax_accuracy(torch.nn.functional.softmax(outputs[1],dim=1),labels)
        all_pred += pred.tolist()
        all_labels += labels.tolist()
        all_probs += outputs[1].tolist()

### Calculate the evaluation metrics

In [None]:
confusion = sklearn.metrics.confusion_matrix(y_true=all_labels, y_pred=all_pred)
print('Confusion matrix: \n',confusion)

tn, fp, fn, tp = confusion.ravel()
print('\nTP:',tp)
print('FP:',fp)
print('TN:',tn)
print('FN:',fn)

probs2=[]
for x in all_probs:
    probs2.append(x[1])

## Performance measure
print('\nAccuracy: '+ str(sklearn.metrics.accuracy_score(y_true=all_labels, y_pred=all_pred)))
print('Precision: '+ str(sklearn.metrics.precision_score(y_true=all_labels, y_pred=all_pred)))
print('Recall: '+ str(sklearn.metrics.recall_score(y_true=all_labels, y_pred=all_pred)))
print('F-measure: '+ str(sklearn.metrics.f1_score(y_true=all_labels, y_pred=all_pred)))
print('Precision-Recall AUC: '+ str(sklearn.metrics.average_precision_score(y_true=all_labels, y_score=probs2)))
print('AUC: '+ str(sklearn.metrics.roc_auc_score(y_true=all_labels, y_score=probs2)))
print('MCC: '+ str(sklearn.metrics.matthews_corrcoef(y_true=all_labels, y_pred=all_pred)))