# Global settings

In [1]:
# References:
# This source code file refers to:
# https://github.com/ICL-ml4csec/VulBERTa
# https://towardsdatascience.com/text-classification-with-bert-in-pytorch-887965e5820f
# https://huggingface.co/docs/transformers/model_doc/roberta


In [2]:
import os
os.environ['CUDA_LAUNCH_BLOCKING'] = "1"
import random
import torch
import numpy as np
import shutil

def write_to_file(text, path, mode='a'): # 'a': append; 'w': overwrite
    with open(path, mode) as f:
        f.write(text)

def mkdir_if_not_exist(directory):
    if not directory: return
    if not os.path.exists(directory):
        os.mkdir(directory)

def remove_file_if_exist(path):
    if not path: return
    if os.path.exists(path):
        try:
            os.remove(path)
        except:
            shutil.rmtree(path)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print('using', device)

# The following randomization refers to: https://github.com/ICL-ml4csec/VulBERTa/blob/main/Finetuning_VulBERTa-MLP.ipynb
seed = 42
os.environ['PYTHONHASHSEED'] = str(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
np.random.seed(seed)
random.seed(seed)
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True
os.environ['WANDB_DISABLED'] = 'true'
os.environ['WANDB_MODE'] = 'dryrun'

# -------------------------------------- start

DATASET_NAME = 'combined'
# DATASET_MASKING = 'masked_'
DATASET_MASKING = ''

codeTF_check_point = 'vulberta_ 0.6865_ep2.pt'
msgTF_check_point = 'roberta_base_fc_0.9767_ep9(combined).pt'

# -------------------------------------- end

pretrained_model_path = '/root/autodl-tmp/VulBERTa/'

root_directory = '/root/autodl-tmp'
dataset_directory = f'{root_directory}/output_dataset_1/{DATASET_MASKING}{DATASET_NAME}'
init_train_path = f'{dataset_directory}/train.json'
init_val_path = f'{dataset_directory}/val.json'
init_test_path = f'{dataset_directory}/test.json'
intermediate_directory = f'{root_directory}/intermediate/{DATASET_MASKING}{DATASET_NAME}'
mkdir_if_not_exist(f'{root_directory}/intermediate')
mkdir_if_not_exist(intermediate_directory)

finetuned_ct_model_path = f'{root_directory}/codeTF_check_point/{DATASET_MASKING}{DATASET_NAME}/{codeTF_check_point}'
intermediate_ct_train_path = f'{intermediate_directory}/ct_train.txt'
intermediate_ct_val_path = f'{intermediate_directory}/ct_val.txt'
intermediate_ct_test_path = f'{intermediate_directory}/ct_test.txt'

finetuned_mt_model_path = f'{root_directory}/msgTF_check_point/{DATASET_MASKING}{DATASET_NAME}/{msgTF_check_point}'
intermediate_mt_train_path = f'{intermediate_directory}/mt_train.txt'
intermediate_mt_val_path = f'{intermediate_directory}/mt_val.txt'
intermediate_mt_test_path = f'{intermediate_directory}/mt_test.txt'


using cuda


# CodeTransformer

In [2]:
from tqdm import tqdm
import sys
import pandas as pd
import numpy as np
import csv
import pickle
import re
import torch
import sklearn
import random
import clang
from clang import *
from clang import cindex
from pathlib import Path
from tokenizers import ByteLevelBPETokenizer
from tokenizers.implementations import ByteLevelBPETokenizer
from tokenizers.processors import BertProcessing
from torch.utils.data import Dataset, DataLoader, IterableDataset
from transformers import RobertaConfig
from transformers import RobertaForMaskedLM, RobertaForSequenceClassification
from transformers import RobertaTokenizerFast
from transformers import DataCollatorForLanguageModeling
from transformers import Trainer, TrainingArguments
from transformers import LineByLineTextDataset
from transformers.modeling_outputs import SequenceClassifierOutput
from tokenizers.pre_tokenizers import PreTokenizer
from tokenizers.pre_tokenizers import Whitespace
from tokenizers import NormalizedString,PreTokenizedString
from typing import List
from tokenizers import Tokenizer
from tokenizers import normalizers,decoders
from tokenizers.normalizers import StripAccents, unicode_normalizer_from_str, Replace
from tokenizers.processors import TemplateProcessing
from tokenizers import processors,pre_tokenizers
from tokenizers.models import BPE
from sklearn import metrics

# definitions
class MyTokenizer:
    cidx = cindex.Index.create()

    def clang_split(self, i: int, normalized_string: NormalizedString) -> List[NormalizedString]:
        ## Tokkenize using clang
        tok = []
        tu = self.cidx.parse('tmp.c',
                       args=[''],  
                       unsaved_files=[('tmp.c', str(normalized_string.original))],  
                       options=0)
        for t in tu.get_tokens(extent=tu.cursor.extent):
            spelling = t.spelling.strip()
            if spelling == '': continue
            ## Keyword no need
            ## Punctuations no need
            ## Literal all to BPE
            #spelling = spelling.replace(' ', '')
            tok.append(NormalizedString(spelling))
        return(tok)

    def pre_tokenize(self, pretok: PreTokenizedString):
        pretok.split(self.clang_split)

def process_encodings(encodings):
    input_ids=[]
    attention_mask=[]
    for enc in encodings:
        input_ids.append(enc.ids)
        attention_mask.append(enc.attention_mask)
    return {'input_ids':input_ids, 'attention_mask':attention_mask}

# ------------------------------------------------------------------------------
# tokenize and load dataset
print('Tokenizing dataset...')
vocab, merges = BPE.read_file(vocab="./tokenizer/drapgh-vocab.json", merges="./tokenizer/drapgh-merges.txt")
my_tokenizer = Tokenizer(BPE(vocab, merges, unk_token="<unk>"))

my_tokenizer.normalizer = normalizers.Sequence([StripAccents(), Replace(" ", "Ä")])
my_tokenizer.pre_tokenizer = PreTokenizer.custom(MyTokenizer())
my_tokenizer.post_processor = processors.ByteLevel(trim_offsets=False)
my_tokenizer.post_processor = TemplateProcessing(
    single="<s> $A </s>",
    special_tokens=[
    ("<s>",0),
    ("<pad>",1),
    ("</s>",2),
    ("<unk>",3),
    ("<mask>",4)
    ]
)

my_tokenizer.enable_truncation(max_length=1024)
my_tokenizer.enable_padding(direction='right', pad_id=1, pad_type_id=0, pad_token='<pad>', length=None, pad_to_multiple_of=None)

m1 = pd.read_json(init_train_path)
m2 = pd.read_json(init_val_path)

train_encodings = my_tokenizer.encode_batch(m1.commit_patch)
train_encodings = process_encodings(train_encodings)

val_encodings = my_tokenizer.encode_batch(m2.commit_patch)
val_encodings = process_encodings(val_encodings)

print('Done')


Tokenizing dataset...
Done


In [3]:
class MyCustomDataset(Dataset):
    def __init__(self, ids, encodings, labels):
        self.ids = ids
        self.encodings = encodings
        self.labels = labels
        assert len(self.encodings['input_ids']) == len(self.encodings['attention_mask']) == len(self.labels) == len(self.ids)

    def __getitem__(self, idx):
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
        item['labels'] = torch.tensor(self.labels[idx])
        item['ids'] = self.ids[idx]
        return item

    def __len__(self):
        return len(self.labels)

train_dataset = MyCustomDataset(m1.id.tolist(), train_encodings, m1.label.tolist())
val_dataset = MyCustomDataset(m2.id.tolist(), val_encodings, m2.label.tolist())


In [4]:
# ------------------------------------------------------------------------------
# generate intermediate data by CodeTransformer
from sklearn import metrics

print('Generating intermediate data...')
model = RobertaForSequenceClassification.from_pretrained(pretrained_model_path)
model.to(device)
model.load_state_dict(torch.load(finetuned_ct_model_path))

def generate_ct_intermediate_dataset(input_data, intermediate_data_path, evaluate=True):
    data_loader = DataLoader(input_data, batch_size=128)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    
    total_acc = 0
    predict_all = np.array([], dtype=int)
    labels_all = np.array([], dtype=int)

    model.eval()
    with torch.no_grad():
        for batch in tqdm(data_loader):
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)
            ids = batch['ids']
            outputs = model(input_ids, attention_mask=attention_mask)
            # outputs['logits'] is equal to outputs[0]
            outputs = outputs['logits']

            probs = torch.nn.functional.softmax(outputs, dim=1).tolist()
            assert(len(probs) == len(labels))
            pred_list = outputs.argmax(dim=1).data.cpu().numpy().tolist()
            for i in range(len(probs)):
                id_ = ids[i]
                prob = probs[i]
                label = int(labels[i])
                pred_ = int(pred_list[i])
                content = '\t'.join([str(i) for i in [id_] + prob + [pred_] + [label]]) + '\n'
                write_to_file(content, intermediate_data_path)

            if evaluate:
                acc = (outputs.argmax(dim=1) == labels).sum().item()
                total_acc += acc

                labels = labels.data.cpu().numpy()
                predic = outputs.argmax(dim=1).data.cpu().numpy()
                labels_all = np.append(labels_all, labels)
                predict_all = np.append(predict_all, predic)

    if evaluate:
        report = metrics.classification_report(labels_all, predict_all, target_names=['benign', 'vulnerable'], digits=4)
        confusion = metrics.confusion_matrix(labels_all, predict_all)
        print(f'Test Accuracy: {total_acc / len(input_data): .4f}')
        print(report)
        print(confusion)

remove_file_if_exist(intermediate_ct_train_path)
remove_file_if_exist(intermediate_ct_val_path)

generate_ct_intermediate_dataset(train_dataset, intermediate_ct_train_path, False)
generate_ct_intermediate_dataset(val_dataset, intermediate_ct_val_path)



Generating intermediate data...


Some weights of the model checkpoint at /root/autodl-tmp/VulBERTa/ were not used when initializing RobertaForSequenceClassification: ['lm_head.layer_norm.weight', 'lm_head.dense.bias', 'lm_head.layer_norm.bias', 'lm_head.decoder.bias', 'lm_head.decoder.weight', 'lm_head.bias', 'lm_head.dense.weight']
- This IS expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at /root/autodl-tmp/VulBERTa/ and are newly initialized: ['classifier.out_proj.bias', 'classifier.dense

Test Accuracy:  0.6865
              precision    recall  f1-score   support

      benign     0.6913    0.6898    0.6906      2389
  vulnerable     0.6816    0.6830    0.6823      2322

    accuracy                         0.6865      4711
   macro avg     0.6864    0.6864    0.6864      4711
weighted avg     0.6865    0.6865    0.6865      4711

[[1648  741]
 [ 736 1586]]





# MsgTransformer

In [2]:
import pandas as pd
import numpy as np
import torch
from transformers import BertTokenizer
from torch import nn
from transformers import BertModel
from transformers import RobertaModel, RobertaTokenizerFast
from torch.optim import Adam
from tqdm import tqdm
from sklearn import metrics
from torch.nn.parallel import DistributedDataParallel
import os
import random

# definitions
seed = 42
os.environ['PYTHONHASHSEED'] = str(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
np.random.seed(seed)
random.seed(seed)
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

BERT_CONFIG = 'roberta-base'
labels = {0:0, 1:1}
BATCH_SIZE = 128
tokenizer = RobertaTokenizerFast.from_pretrained(BERT_CONFIG)

class Dataset(torch.utils.data.Dataset):
    def __init__(self, df):
        self.labels = [labels[label] for label in df['label']]
        self.texts = [tokenizer(text, padding='max_length', max_length=512, truncation=True,
                                return_tensors="pt") for text in df['commit_message']]
        self.ids = [id_ for id_ in df['id']]

    def classes(self):
        return self.labels

    def __len__(self):
        return len(self.labels)

    def get_batch_labels(self, idx):
        # Fetch a batch of labels
        return np.array(self.labels[idx])

    def get_batch_texts(self, idx):
        # Fetch a batch of inputs
        return self.texts[idx]

    def __getitem__(self, idx):
        batch_ids = self.ids[idx]
        batch_texts = self.get_batch_texts(idx)
        batch_y = self.get_batch_labels(idx)
        return batch_ids, batch_texts, batch_y

class BertClassifier(nn.Module):
    def __init__(self, dropout=0.5):
        super(BertClassifier, self).__init__()

        self.bert = RobertaModel.from_pretrained(BERT_CONFIG)
        self.dropout = nn.Dropout(dropout)
        if BERT_CONFIG == 'roberta-large':
            self.linear = nn.Linear(1024, len(labels))
        else:
            self.linear = nn.Linear(768, len(labels))
        self.relu = nn.ReLU()

    def forward(self, input_id, mask):
        _, pooled_output = self.bert(input_ids=input_id, attention_mask=mask, return_dict=False)
        dropout_output = self.dropout(pooled_output)
        linear_output = self.linear(dropout_output)
        # final_layer = self.relu(linear_output) # IMPO CHANGE
        return linear_output

    def check_parameters(self):
        print('The number of Bert parameters:', self.bert.num_parameters())

# ------------------------------------------------------------------------------
# generate intermediate data by MsgTransformer and evaluation
model = BertClassifier()
model.to(device)
model.load_state_dict(torch.load(finetuned_mt_model_path))

def generate_mt_intermediate_dataset(input_data, intermediate_data_path, evaluate=True):
    data_loader = torch.utils.data.DataLoader(Dataset(input_data), batch_size=BATCH_SIZE)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    
    total_acc = 0
    predict_all = np.array([], dtype=int)
    labels_all = np.array([], dtype=int)

    model.eval()
    with torch.no_grad():
        for ids, texts, labels in tqdm(data_loader):
            labels = labels.to(device)
            masks = texts['attention_mask'].to(device)
            input_ids = texts['input_ids'].squeeze(1).to(device)
            outputs = model(input_ids, masks)

            probs = torch.nn.functional.softmax(outputs, dim=1).tolist()
            assert(len(probs) == len(labels))
            pred_list = outputs.argmax(dim=1).data.cpu().numpy().tolist()
            assert(len(probs) == len(pred_list))
            assert(len(probs) == len(ids))
            for i in range(len(probs)):
                id_ = ids[i]
                prob = probs[i]
                label = int(labels[i])
                pred_ = int(pred_list[i])
                content = '\t'.join([str(i) for i in [id_] + prob + [pred_] + [label]]) + '\n'
                write_to_file(content, intermediate_data_path)

            if evaluate:
                acc = (outputs.argmax(dim=1) == labels).sum().item()
                total_acc += acc

                labels = labels.data.cpu().numpy()
                predic = outputs.argmax(dim=1).data.cpu().numpy()
                labels_all = np.append(labels_all, labels)
                predict_all = np.append(predict_all, predic)

    if evaluate:
        report = metrics.classification_report(labels_all, predict_all, target_names=['benign', 'vulnerable'], digits=4)
        confusion = metrics.confusion_matrix(labels_all, predict_all)
        print(f'Test Accuracy: {total_acc / len(input_data): .4f}')
        print(report)
        print(confusion)

df_train = pd.read_json(init_train_path)
df_val = pd.read_json(init_val_path)

remove_file_if_exist(intermediate_mt_train_path)
remove_file_if_exist(intermediate_mt_val_path)

generate_mt_intermediate_dataset(df_train, intermediate_mt_train_path, False)
generate_mt_intermediate_dataset(df_val, intermediate_mt_val_path)



Some weights of the model checkpoint at roberta-base were not used when initializing RobertaModel: ['lm_head.dense.bias', 'lm_head.bias', 'lm_head.layer_norm.weight', 'lm_head.decoder.weight', 'lm_head.layer_norm.bias', 'lm_head.dense.weight']
- This IS expected if you are initializing RobertaModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing RobertaModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
100%|██████████| 148/148 [01:43<00:00,  1.43it/s]
100%|██████████| 37/37 [00:26<00:00,  1.42it/s]

Test Accuracy:  0.9767
              precision    recall  f1-score   support

      benign     0.9746    0.9795    0.9770      2389
  vulnerable     0.9788    0.9737    0.9763      2322

    accuracy                         0.9767      4711
   macro avg     0.9767    0.9766    0.9766      4711
weighted avg     0.9767    0.9767    0.9766      4711

[[2340   49]
 [  61 2261]]





# Combine everything into intermediate dataset

In [3]:
import json

intermediate_train_path = f'{intermediate_directory}/train.txt'
intermediate_val_path = f'{intermediate_directory}/val.txt'
intermediate_test_path = f'{intermediate_directory}/test.txt'

def generate_intermediate_dataset(intermediate_mt_data_path, intermediate_ct_data_path, intermediate_data_path, init_data_path):
    with open(intermediate_mt_data_path) as f:
        mt_data_list = f.read().split('\n')
    
    with open(intermediate_ct_data_path) as f:
        ct_data_list = f.read().split('\n')
    
    mt_data_list = mt_data_list[:-1] if not mt_data_list[-1] else mt_data_list
    ct_data_list = ct_data_list[:-1] if not ct_data_list[-1] else ct_data_list

    assert(len(mt_data_list) == len(ct_data_list))

    for i in range(len(mt_data_list)):
        mt_data = mt_data_list[i].split('\t')
        ct_data = ct_data_list[i].split('\t')
        assert(mt_data[0] == ct_data[0])
        assert(mt_data[-1] == ct_data[-1])
        data_id = mt_data[0]
        label = mt_data[-1]
        mt_pred = mt_data[-2]
        ct_pred = ct_data[-2]
        content = '\t'.join([data_id] + mt_data[1:3] + ct_data[1:3] + [str(mt_pred)] + [str(ct_pred)] + [label])
        content = content + '\n' if i < len(mt_data_list) - 1 else content
        write_to_file(content, intermediate_data_path)

remove_file_if_exist(intermediate_train_path)
remove_file_if_exist(intermediate_val_path)

generate_intermediate_dataset(intermediate_mt_train_path, intermediate_ct_train_path, intermediate_train_path, init_train_path)
generate_intermediate_dataset(intermediate_mt_val_path, intermediate_ct_val_path, intermediate_val_path, init_val_path)


# Ensemble learning

In [17]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
from sklearn import metrics

EPOCHS = 80
LR = 1e-6
BATCH_SIZE = 4
intermediate_train_path = f'{intermediate_directory}/train.txt'
intermediate_val_path = f'{intermediate_directory}/val.txt'
MODEL_SAVE_PATH = f'{root_directory}/ensemble_model/{DATASET_MASKING}{DATASET_NAME}'
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

seed = 42
os.environ['PYTHONHASHSEED'] = str(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
np.random.seed(seed)
random.seed(seed)
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

class MyDataset(Dataset):
    def __init__(self, path):
        with open(path) as f:
            data_list = f.read().split('\n')
        self.labels = [ int(data.split('\t')[-1]) for data in data_list ]
        self.inputs = []
        for data in data_list:
            tmp_list = data.split('\t')[:-1]
            for i in range(1, 5):
                tmp_list[i] = float(tmp_list[i])
            for i in range(5, len(tmp_list)):
                tmp_list[i] = int(tmp_list[i])
            self.inputs.append(tmp_list)
        
        assert(len(self.labels) == len(self.inputs))

    def classes(self):
        return self.labels

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        x = self.inputs[idx]
        y = self.labels[idx]
        return x[0], x[1], x[2], x[3], x[4], x[5], x[6], y

class MLP(nn.Module):
    def __init__(self, input_dim, output_dim):
        super().__init__()

        self.dropout = nn.Dropout(0.1)
        self.fc1 = nn.Linear(input_dim, 20)
        self.out = nn.Linear(20, output_dim)

    def forward(self, x):
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout(x)
        x = self.out(x)
        return x

def train(model, train_dataset, val_dataset):
    train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE)
    val_dataloader = DataLoader(val_dataset, batch_size=BATCH_SIZE)

    model = model.to(device)
    optimizer = Adam(model.parameters(), lr=LR)
    criterion = nn.CrossEntropyLoss()
    criterion = criterion.to(device)

    for epoch_num in range(EPOCHS):
        model.train()
        total_acc_train = 0
        total_loss_train = 0
        for x0, x1, x2, x3, x4, x5, x6, y in tqdm(train_dataloader):
            x = torch.transpose(torch.stack([x1, x2, x3, x4]), 0, 1).float().to(device)
            y = y.to(device)
            y_pred = model(x)

            loss = criterion(y_pred, y)
            total_loss_train += loss.item()

            acc = (y_pred.argmax(dim=1) == y).sum().item()
            total_acc_train += acc

            model.zero_grad()
            loss.backward()
            optimizer.step()

        total_acc_val = 0
        total_loss_val = 0
        model.eval()
        with torch.no_grad():
            for x0, x1, x2, x3, x4, x5, x6, y in val_dataloader:
                x = torch.transpose(torch.stack([x1, x2, x3, x4]), 0, 1).float().to(device)
                y = y.to(device)
                y_pred = model(x)

                loss = criterion(y_pred, y)
                total_loss_val += loss.item()

                acc = (y_pred.argmax(dim=1) == y).sum().item()
                total_acc_val += acc

        print(
            f'Epochs: {epoch_num + 1} | Train Loss: {total_loss_train / len(train_dataset): .4f} \
            | Train Accuracy: {total_acc_train / len(train_dataset): .4f} \
            | Val Loss: {total_loss_val / len(val_dataset): .4f} \
            | Val Accuracy: {total_acc_val / len(val_dataset): .4f}')

        val_acc = f'{total_acc_val / len(val_dataset):.4f}'
        torch.save(model.state_dict(), f'{MODEL_SAVE_PATH}/ensemble2_{val_acc}_epoch{epoch_num + 1}.pt')

def evaluate(model, test_dataset):
    test_dataloader = DataLoader(test_dataset, batch_size=1)
    model = model.to(device)
    
    real_sample_count = 0
    total_acc_test = 0
    predict_all = np.array([], dtype=int)
    labels_all = np.array([], dtype=int)
    model.eval()
    with torch.no_grad():
        for x0, x1, x2, x3, x4, x5, x6, y in test_dataloader:
            data_id = x0[0]
            # for only-bigvul testing starts ---- 
            if not data_id.startswith('bigvul_'): continue
            real_sample_count += 1
            # for only-bigvul testing ends ---- 
            
            x = torch.transpose(torch.stack([x1, x2, x3, x4]), 0, 1).float().to(device)
            y = y.to(device)
            y_pred = model(x)
            
            acc = (y_pred.argmax(dim=1) == y).sum().item()
            total_acc_test += acc
            
            y = y.data.cpu().numpy()
            predic = y_pred.argmax(dim=1).data.cpu().numpy()
            predic_n = predic[0]
            mt_pred = x5
            ct_pred = x6
                        
            if predic_n == 0:
                print('predic_n == 0 data_id:', data_id)
            
            labels_all = np.append(labels_all, y)
            predict_all = np.append(predict_all, predic)
    
    # fake neg sample for only-bigvul testing starts ---- 
    predict_all = np.append(predict_all, 0)
    labels_all = np.append(labels_all, 0)
    # fake neg sample for only-bigvul testing ends ---- 

    report = metrics.classification_report(labels_all, predict_all, target_names=['benign', 'vulnerable'], digits=4)
    confusion = metrics.confusion_matrix(labels_all, predict_all)
    print(f'Test Accuracy: {total_acc_test / len(test_dataset): .4f}')
    print(report)
    print(confusion)
    print('real_sample_count:', real_sample_count)

train_dataset = MyDataset(intermediate_train_path)
val_dataset = MyDataset(intermediate_val_path)
print('intermediate_val_path:', intermediate_val_path)



intermediate_val_path: /root/autodl-tmp/intermediate/combined/val.txt


# Test the model

In [18]:
mkdir_if_not_exist(MODEL_SAVE_PATH)
model = MLP(4, 2)
saved_model_name = 'ensemble2_0.9769_epoch80.pt'
model.load_state_dict(torch.load(f'{MODEL_SAVE_PATH}/{saved_model_name}'))
print('model path:', f'{MODEL_SAVE_PATH}/{saved_model_name}')

evaluate(model, val_dataset)

model path: /root/autodl-tmp/ensemble_model/combined/ensemble2_0.9769_epoch80.pt
predic_n == 0 data_id: bigvul_f0d1bec9d58d4c038d0ac958c9af82be6eb18045
Test Accuracy:  0.1231
              precision    recall  f1-score   support

      benign     0.5000    1.0000    0.6667         1
  vulnerable     1.0000    0.9983    0.9991       581

    accuracy                         0.9983       582
   macro avg     0.7500    0.9991    0.8329       582
weighted avg     0.9991    0.9983    0.9986       582

[[  1   0]
 [  1 580]]
real_sample_count: 581
