# Software Vulnerability Detection using Deep Learning (Experiment Replication)

In [1]:
!apt install unrar

In [2]:
# !pip install transformers

In [3]:
# Fetch the AVD repository; later cells change into it and extract DataSet1000.rar there.
!git clone https://github.com/danzz006/AVD.git

In [4]:
import transformers
from transformers import BertModel, BertTokenizer, AdamW, get_linear_schedule_with_warmup
# from torch.optim import AdamW as AdamW
import torch
import numpy as np
import pandas as pd
import seaborn as sns
from pylab import rcParams
import matplotlib.pyplot as plt
from matplotlib import rc
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report
from collections import defaultdict
import torch.nn.functional as F

import tensorflow as tf
# import mlflow


from torch import nn, optim
from torch.utils.data import Dataset, DataLoader
%matplotlib inline
%config InlineBackend.figure_format='retina'
sns.set(style='whitegrid', palette='muted', font_scale=1.2)
HAPPY_COLORS_PALETTE = ["#01BEFE", "#FFDD00", "#FF7D00", "#FF006D", "#ADFF02", "#8F00FF"]
sns.set_palette(sns.color_palette(HAPPY_COLORS_PALETTE))
rcParams['figure.figsize'] = 12, 8
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

print("Available device: ", device)

In [5]:
# Make the cloned repository the working directory so the relative paths used
# by the following cells (DataSet1000.rar / DataSet1000.csv) resolve inside it.
%cd AVD

In [6]:
!unrar e DataSet1000.rar

In [7]:
# Load the extracted CSV into a DataFrame.
DATASET_CSV = "DataSet1000.csv"
data = pd.read_csv(DATASET_CSV)

In [8]:
# Preview the first rows of the freshly loaded dataset.
data.head()

In [9]:
# Encode the label columns (positions 2-13) as integers.
# Values that are not keys of LABEL_ENCODING become NaN (Series.map behaviour).
# NOTE(review): 'NV' maps to 1 while 'warning' and 'high' map to 0 — confirm this
# is the intended labelling.
LABEL_ENCODING = {
    'False': 0, 'True': 1, False: 0, True: 1,
    '1': 1, '0': 0, '1.0': 1, '0.0': 0,
    'NV': 1, 'warning': 0, 'high': 0,
}

# Slicing `data.columns` (instead of indexing positions 2..13 one by one) does
# not raise when the frame has fewer than 14 columns.
for label_column in data.columns[2:14]:
    # Assign by column name so the mapped Series *replaces* the column and it
    # gets a numeric dtype. Writing through `.iloc[:, col] = ...` sets values in
    # place on recent pandas and can leave the column as `object`, which later
    # cannot be converted to a tensor.
    data[label_column] = data[label_column].map(LABEL_ENCODING)


In [10]:
# Preview the frame after the label columns were mapped to integers.
data.head()

In [11]:
# Minimum number of positive (== 1) rows a label column needs in order to be kept.
MIN_POSITIVE_SAMPLES = 1000

# Drop the free-text category column; `errors='ignore'` keeps the cell re-runnable.
data = data.drop(columns=['Category'], errors='ignore')

# Collect every non-"Name" column with too few positives first, then drop them in
# one go. Counting with `(col == 1).sum()` yields 0 for a column without any
# positives, whereas `value_counts()[1]` raises KeyError in that case.
sparse_label_columns = [
    column
    for column in data.columns
    if column != "Name" and (data[column] == 1).sum() < MIN_POSITIVE_SAMPLES
]
data = data.drop(columns=sparse_label_columns)
     

In [12]:
# Preview the frame after dropping 'Category' and the label columns with too few positives.
data.head()

In [13]:
# Shuffle all rows and renumber the index. The shuffle is seeded explicitly so the
# row order (and therefore the train/val/test membership derived from it) stays
# the same when this cell is re-run, instead of depending on how much of the
# global NumPy random state has been consumed so far.
shuffled = data.sample(frac=1, random_state=RANDOM_SEED).reset_index(drop=True)

In [14]:
# Preview the shuffled frame.
shuffled.head()

In [15]:
# Show the four columns at positions 1-4; the same selection is used as the targets in the split below.
shuffled[shuffled.columns[1:5]]

In [16]:
from sklearn.model_selection import train_test_split

In [17]:
# Hold out 20% of the shuffled rows as the test set. Inputs are the "Name" column,
# targets are the four columns at positions 1-4. The shared RANDOM_SEED constant
# replaces the duplicated literal so every seeded step follows one setting.
target_columns = shuffled.columns[1:5]
x_tmp, x_test, y_tmp, y_test = train_test_split(
    shuffled["Name"],
    shuffled[target_columns],
    test_size=0.2,
    random_state=RANDOM_SEED,
)

In [18]:
# Carve 10% of the remaining (non-test) rows out as the validation set, seeded
# with the shared RANDOM_SEED constant rather than a duplicated literal.
x_train, x_val, y_train, y_val = train_test_split(
    x_tmp,
    y_tmp,
    test_size=0.1,
    random_state=RANDOM_SEED,
)

In [19]:
# Count how often each combination of label values occurs in the validation split.
y_val.value_counts()

In [20]:
# Checkpoint name passed to both BertTokenizer.from_pretrained and BertModel.from_pretrained.
PRE_TRAINED_MODEL_NAME = 'bert-base-cased'

In [21]:
# Tokenizer loaded from the same checkpoint name as the BERT model.
tokenizer = BertTokenizer.from_pretrained(PRE_TRAINED_MODEL_NAME)

In [22]:
# sample_txt = x_train[164212]

In [23]:
# sample_txt

In [24]:
# sample_txt = sample_txt.replace("\n", " ")
# sample_txt = sample_txt.replace(";", "")

In [25]:
# tokens = tokenizer.tokenize(sample_txt)
# token_ids = tokenizer.convert_tokens_to_ids(tokens)
# print(f' Sentence: {sample_txt}')
# print(f'   Tokens: {tokens}')
# print(f'Token IDs: {token_ids}')

In [26]:
# encoding = tokenizer.encode_plus(
#   sample_txt,
#   truncation=True,
#   max_length=32,
#   add_special_tokens=True, # Add '[CLS]' and '[SEP]'
#   return_token_type_ids=False,
#   padding='max_length',
#   return_attention_mask=True,
#   return_tensors='pt',  # Return PyTorch tensors
# )
# encoding.keys()

In [27]:
# print(len(encoding['input_ids'][0]))
# encoding['input_ids'][0]

In [28]:
# print(len(encoding['attention_mask'][0]))
# encoding['attention_mask']

In [29]:
# tokenizer.convert_ids_to_tokens(encoding['input_ids'][0])

In [31]:
# token_lens = []
# for txt in data.Name:
#   tokens = tokenizer.encode(txt, truncation=True, max_length=512)
#   token_lens.append(len(tokens))

In [None]:
# sns.histplot(token_lens)
# plt.xlim([0, 256]);
# plt.xlabel('Token count');

In [32]:
# Token budget per sample: SWVulnerabilityDataset truncates/pads every input to
# exactly this many tokens.
# NOTE(review): bert-base checkpoints accept at most 512 positions — keep this <= 512.
MAX_LEN = 500

In [33]:
class SWVulnerabilityDataset(Dataset):
  """Map-style dataset pairing a code snippet with its label vector.

  Args:
    code: Indexable collection of code snippets; each item is passed through
      ``str()`` before tokenization.
    targets: Indexable collection of labels aligned with ``code``; each item
      must be convertible to 64-bit integers.
    tokenizer: Callable Hugging Face-style tokenizer.
    max_len: Every sample is truncated/padded to exactly this many tokens.
  """

  def __init__(self, code, targets, tokenizer, max_len):
    self.code = code
    self.targets = targets
    self.tokenizer = tokenizer
    self.max_len = max_len

  def __len__(self):
    return len(self.code)

  def __getitem__(self, item):
    """Return the raw text, token ids, attention mask and labels of one sample."""
    code = str(self.code[item])
    # Call the tokenizer directly; `encode_plus` is the legacy entry point for
    # the same operation.
    encoding = self.tokenizer(
      code,
      truncation=True,
      add_special_tokens=True,
      max_length=self.max_len,
      return_token_type_ids=False,
      padding='max_length',
      return_attention_mask=True,
      return_tensors='pt',
    )
    # Go through an explicit int64 array first: `torch.tensor` raises TypeError on
    # object-dtype rows, which is what `DataFrame.to_numpy()` returns when the
    # label columns are stored as `object`.
    target = np.asarray(self.targets[item], dtype=np.int64)
    return {
      'code_text': code,
      # The tokenizer returns (1, max_len) tensors; flatten drops the batch axis.
      'input_ids': encoding['input_ids'].flatten(),
      'attention_mask': encoding['attention_mask'].flatten(),
      'targets': torch.tensor(target, dtype=torch.long)
    }

In [34]:
def create_data_loader(code, targets, tokenizer, max_len, batch_size, shuffle=False, num_workers=4):
  """Wrap code snippets and labels in a SWVulnerabilityDataset and return a DataLoader.

  Args:
    code: Indexable collection of code snippets.
    targets: Indexable collection of labels aligned with ``code``.
    tokenizer: Tokenizer forwarded to the dataset.
    max_len: Token length every sample is truncated/padded to.
    batch_size: Number of samples per batch.
    shuffle: Reshuffle the samples at every epoch. Defaults to ``False``, which
      keeps the previous fixed iteration order.
    num_workers: Number of loader worker processes. Defaults to 4, the value
      that used to be hard-coded.

  Returns:
    A ``DataLoader`` over the constructed dataset.
  """
  ds = SWVulnerabilityDataset(
    code=code,
    targets=targets,
    tokenizer=tokenizer,
    max_len=max_len
  )
  return DataLoader(
    ds,
    batch_size=batch_size,
    shuffle=shuffle,
    num_workers=num_workers
  )

In [35]:
BATCH_SIZE = 16

# Build the train / validation / test loaders in one pass; every split goes
# through the same tokenizer, MAX_LEN and batch size.
train_data_loader, val_data_loader, test_data_loader = (
    create_data_loader(texts.to_numpy(), labels.to_numpy(), tokenizer, MAX_LEN, BATCH_SIZE)
    for texts, labels in ((x_train, y_train), (x_val, y_val), (x_test, y_test))
)

In [36]:
# data = next(iter(train_data_loader))
# data.keys()

In [37]:
# print(data['input_ids'].shape)
# print(data['attention_mask'].shape)
# print(data['targets'].shape)

In [38]:
# Stand-alone BERT encoder instance.
# NOTE(review): in the visible part of this notebook it is only referenced by the
# commented-out exploration cell that follows; SWVulnerabilityClassifier loads its
# own copy, so this load looks redundant — confirm before removing.
bert_model = BertModel.from_pretrained(PRE_TRAINED_MODEL_NAME)

In [39]:
# last_hidden_state, pooled_output = bert_model(
#   input_ids=encoding['input_ids'],
#   attention_mask=encoding['attention_mask']
# )[0:]

In [40]:
# last_hidden_state.shape, pooled_output.shape

In [64]:
class SWVulnerabilityClassifier(nn.Module):
  """BERT encoder followed by dropout and a linear classification head.

  Args:
    n_classes: Size of the output layer (one unit per label column).
  """

  def __init__(self, n_classes):
    super().__init__()
    self.bert = BertModel.from_pretrained(PRE_TRAINED_MODEL_NAME)
    self.drop = nn.Dropout(p=0.3)
    self.out = nn.Linear(self.bert.config.hidden_size, n_classes)

  def forward(self, input_ids, attention_mask):
    """Return raw class scores (logits), one row per input sequence.

    No softmax is applied here: ``nn.CrossEntropyLoss`` expects unnormalised
    logits and applies log-softmax itself, so normalising in the model would
    squash the gradients. ``argmax`` over the logits yields the same predicted
    class as over the probabilities; apply ``F.softmax(logits, dim=1)`` at the
    call site when probabilities are needed.
    """
    bert_outputs = self.bert(
      input_ids=input_ids,
      attention_mask=attention_mask
    )
    # Index 1 is the pooled output; indexing (rather than unpacking the whole
    # result into two names) keeps working if the encoder returns extra entries.
    pooled_output = bert_outputs[1]
    return self.out(self.drop(pooled_output))

In [65]:
# One output unit per label column of the training targets; the model is moved
# to the selected device right after construction.
n_label_columns = len(y_train.columns)
model = SWVulnerabilityClassifier(n_label_columns).to(device)

In [43]:
# input_ids = data['input_ids'].to(device)
# attention_mask = data['attention_mask'].to(device)
# print(input_ids.shape) # batch size x seq length
# print(attention_mask.shape) # batch size x seq length

In [66]:
EPOCHS = 50

# The schedule is sized for one optimizer step per training batch per epoch.
steps_per_epoch = len(train_data_loader)
total_steps = steps_per_epoch * EPOCHS

optimizer = AdamW(model.parameters(), lr=2e-5, correct_bias=False)
# Linear learning-rate schedule over `total_steps` with no warm-up steps.
scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=0,
    num_training_steps=total_steps,
)

loss_fn = nn.CrossEntropyLoss()
loss_fn = loss_fn.to(device)

In [67]:
def _run_epoch(model, data_loader, loss_fn, device, optimizer=None, scheduler=None):
    """Run one full pass over ``data_loader`` and return ``(accuracy, mean_loss)``.

    The pass is a training pass when ``optimizer`` is given (gradients enabled,
    weights updated, scheduler stepped once per batch) and an evaluation pass
    otherwise (``model.eval()``, gradients disabled).

    Accuracy is the fraction of *samples* whose arg-max prediction equals the
    arg-max of the target row.

    Raises:
        ValueError: If ``data_loader`` yields no samples.
    """
    training = optimizer is not None
    if training:
        model.train()
    else:
        model.eval()

    batch_losses = []
    n_correct = 0
    n_seen = 0

    with torch.set_grad_enabled(training):
        for batch in data_loader:
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            # Float targets make CrossEntropyLoss treat each row as class weights.
            targets = batch["targets"].float().to(device)

            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask
            )
            loss = loss_fn(outputs, targets)

            # NOTE(review): the label rows are reduced to a single class index via
            # arg-max (first maximum wins), i.e. they are treated as one-hot. Rows
            # with several positives or none are therefore scored against one
            # class only — confirm this matches the labelling scheme.
            preds = outputs.argmax(dim=1)
            target_classes = targets.argmax(dim=1)

            n_correct += (preds == target_classes).sum().item()
            n_seen += targets.size(0)
            batch_losses.append(loss.item())

            if training:
                loss.backward()
                nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
                optimizer.step()
                if scheduler is not None:
                    scheduler.step()
                optimizer.zero_grad()

    if n_seen == 0:
        raise ValueError("data_loader yielded no samples")
    return n_correct / n_seen, float(np.mean(batch_losses))


# Per-epoch metrics as plain Python floats, keyed by
# 'train_acc' / 'train_loss' / 'val_acc' / 'val_loss'.
history = defaultdict(list)

# Each epoch iterates over *every* batch of the loader (not just the first one),
# and its metrics are computed from that epoch alone. This also matches the
# scheduler, which is sized for len(train_data_loader) * EPOCHS steps.
for epoch in range(EPOCHS):
    train_acc, train_loss = _run_epoch(
        model, train_data_loader, loss_fn, device, optimizer, scheduler
    )
    print("*"*40)
    print(f"Epoch: {epoch}")
    print("Training accuracy: ", train_acc)
    print("Training loss: ", train_loss)

    history['train_acc'].append(train_acc)
    history['train_loss'].append(train_loss)

    print("Evaluating model..")
    val_acc, val_loss = _run_epoch(model, val_data_loader, loss_fn, device)

    print("Eval accuracy: ", val_acc)
    print("Eval loss: ", val_loss)
    print("*"*40)

    history['val_acc'].append(val_acc)
    history['val_loss'].append(val_loss)
    
    

In [68]:
# Training vs. validation accuracy per epoch, drawn through the explicit Axes API.
fig, ax = plt.subplots()
ax.plot(history['train_acc'], label='train accuracy')
ax.plot(history['val_acc'], label='validation accuracy')
ax.set_title('Training history')
ax.set_ylabel('Accuracy')
ax.set_xlabel('Epoch')
ax.legend()
ax.set_ylim([0, 1]);