In [1]:
import pandas as pd
import json, os

import tensorflow as tf
import numpy as np
import csv
from transformers import AutoTokenizer, TFAutoModel, TFAutoModelForSequenceClassification #, BertModel, BertTokenizer, TFBertForSequenceClassification
import matplotlib.pyplot as plt
import random
from tensorflow.keras.callbacks import CSVLogger
from tensorflow.keras.callbacks import EarlyStopping
import tensorflow.keras.backend as K
from collections import OrderedDict
import time
from sklearn.metrics import accuracy_score, recall_score, f1_score, precision_score, \
roc_auc_score, confusion_matrix, classification_report
from sklearn.model_selection import StratifiedKFold
import random

from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.initializers import TruncatedNormal
from tensorflow.keras.losses import CategoricalCrossentropy
from tensorflow.keras.metrics import CategoricalAccuracy
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.layers import Input, Dense, GlobalMaxPool1D
from tensorflow.keras.layers import Activation, BatchNormalization, Conv1D, Embedding, LSTM
from tensorflow.keras.models import Sequential

from sklearn.metrics import classification_report

Set the random seeds so that stochastic operations are as reproducible as possible

In [2]:
# One shared seed for every random number generator the notebook relies on:
# NumPy, Python's built-in `random`, and TensorFlow (seeded in that order).
seed = 123
for _seed_rng in (np.random.seed, random.seed, tf.random.set_seed):
    _seed_rng(seed)

Read data

In [3]:
# Load the two code representations from disk:
#   bow_data       <- 'bow_data.csv'        (bag-of-words)
#   sequences_data <- 'sequences_data.csv'  (sequences of tokens)
bow_data, sequences_data = (
    pd.read_csv(csv_path) for csv_path in ('bow_data.csv', 'sequences_data.csv')
)

In [4]:
# Preview the first rows of the token-sequence dataset as plain text.
print(sequences_data.head())

                                       Vulnerability       Category  Length
0              f"str$id""str$id""str$id"         ...  sql_injection       9
1      client.listentcp()    proxy = proxy(proxy_...           xsrf       8
2  from django.http import httpresponse, httpresp...  open_redirect       9
3  def write_preset(conn, queryin, descriptin):\t...  sql_injection     175
4                          update_query = self.up...  sql_injection      14


In [29]:
# Count how many samples each vulnerability category has, then report the
# per-category counts together with the total number of rows.
label_frequencies = sequences_data['Category'].value_counts()
print("Label Frequencies:\n", label_frequencies)
print("Total samples ", len(sequences_data))

Label Frequencies:
 sql_injection            1424
xsrf                      976
command_injection         721
path_disclosure           481
open_redirect             442
remote_code_execution     334
xss                       145
Name: Category, dtype: int64
Total samples  4523


Word Embedding

Word2Vec - load pre-trained word2vec embeddings - NL knowledge - static embeddings

In [6]:
import gensim.downloader

# Load the pre-trained Google News word2vec vectors. A later cell passes
# `w2v_vectors` to w2vEncoding, so this line must actually run: leaving it
# commented out makes the notebook fail with NameError on a fresh kernel.
# NOTE: this is a large download on first use — TODO confirm that gensim's
# local cache is available in the target environment.
w2v_vectors = gensim.downloader.load('word2vec-google-news-300')



In [8]:
import nltk
from nltk.tokenize import word_tokenize

# Fetch the Punkt tokenizer models if they are not already downloaded
nltk.download('punkt')

# Split every code snippet of the "Vulnerability" column into word-level tokens
tokenized_list = list(map(word_tokenize, sequences_data["Vulnerability"].tolist()))

[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\Ilias\AppData\Roaming\nltk_data...
[nltk_data]   Unzipping tokenizers\punkt.zip.


In [33]:
def w2vEncoding(model, tokenized_list):
    """Encode tokenized sentences with a token -> vector lookup.

    Parameters
    ----------
    model : mapping-like
        Must support ``token in model`` and ``model[token]``.
    tokenized_list : iterable of iterables of tokens
        One inner iterable per sentence.

    Returns
    -------
    list of list
        One list per sentence holding the vectors of its tokens, in order.
        Tokens that are not in ``model`` are skipped, so a sentence without
        any known token yields an empty list.
    """
    return [
        [model[tok] for tok in tokens if tok in model]
        for tokens in tokenized_list
    ]

In [34]:
# Store, per sample, the list of word2vec vectors of its known tokens in a new "w2v" column.
sequences_data["w2v"] = w2vEncoding(w2v_vectors, tokenized_list)

Use the corpus to train word2vec vectors on Python source code - PL knowledge - static embeddings

In [30]:
# # this should be executed only in the training set during cross-validation
# from gensim.models import Word2Vec

# w2v_model = Word2Vec(sentences=tokenized_list, vector_size=300, window=5, min_count=1, workers=4)  # Word2Vec expects lists of tokens, not raw strings
# w2v_model.save("python_word2vec.model")
# #w2v_model = Word2Vec.load("python_word2vec.model")
# sequences_data["py_w2v"] = w2vEncoding(w2v_model.wv, tokenized_list)

BERT - load pre-trained bert embeddings - NL knowledge - contextual embeddings

In [12]:
# Load the tokenizer and the TensorFlow model for the chosen checkpoint.
# ("roberta-base" is kept in the trailing comment as an alternative checkpoint.)
model_variation = "bert-base-uncased" # "roberta-base"
tokenizer = AutoTokenizer.from_pretrained(model_variation)
bert = TFAutoModel.from_pretrained(model_variation)

Some layers from the model checkpoint at bert-base-uncased were not used when initializing TFBertModel: ['mlm___cls', 'nsp___cls']
- This IS expected if you are initializing TFBertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing TFBertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
All the layers of TFBertModel were initialized from the model checkpoint at bert-base-uncased.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFBertModel for predictions without further training.


In [13]:
# Pull the input-embedding weights out of the BERT model as a NumPy matrix
# and report its size: number of rows (vocabulary entries) and row length.
bert_embeddings = bert.get_input_embeddings()
embedding_matrix = bert_embeddings.weights[0].numpy()
num_words, dim = embedding_matrix.shape
print(num_words)
print(dim)

30522
768


In [14]:
# Tokenize the complete sentences: each snippet is encoded on its own, without
# special tokens, and converted to a NumPy array with a leading batch axis.
sequences = [
    tokenizer.encode(
        sente,
        padding=True,
        truncation=True,
        add_special_tokens=False,
        return_tensors="tf",
    ).numpy()
    for sente in sequences_data["Vulnerability"]
]

# Drop the batch axis so that each entry is the id sequence of one snippet.
lines_pad = [encoded[0] for encoded in sequences]

In [15]:
# Attach the per-sample token-id sequences produced with the BERT tokenizer as a "bert" column.
sequences_data["bert"] = lines_pad

CodeBERT - load pre-trained codebert embeddings - PL knowledge - contextual embeddings

In [17]:
# Load the CodeBERT tokenizer and TensorFlow model.
# `model_variation` and `tokenizer` are rebound here, so from this point on
# `tokenizer` no longer refers to the BERT tokenizer loaded earlier.
# NOTE(review): `do_lower_case=True` is passed through to the tokenizer —
# confirm that this checkpoint's tokenizer actually honours it.
model_variation = "microsoft/codebert-base"
tokenizer = AutoTokenizer.from_pretrained(model_variation, do_lower_case=True)
codebert = TFAutoModel.from_pretrained(model_variation)

All model checkpoint layers were used when initializing TFRobertaModel.

All the layers of TFRobertaModel were initialized from the model checkpoint at microsoft/codebert-base.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFRobertaModel for predictions without further training.


In [18]:
# Pull the input-embedding weights out of the CodeBERT model as a NumPy matrix
# and report its size. This rebinds `embedding_matrix`, `num_words` and `dim`.
codebert_embeddings = codebert.get_input_embeddings()
embedding_matrix = codebert_embeddings.weights[0].numpy()
num_words, dim = embedding_matrix.shape
print(num_words)
print(dim)

50265
768


In [19]:
# Tokenize every snippet separately with the current `tokenizer` (no special tokens);
# each element is one tokenizer output whose 'input_ids' are extracted in a later cell.
sequences = [tokenizer(sente, return_tensors="tf", padding=True, truncation=True, add_special_tokens=False) for sente in sequences_data["Vulnerability"]]


In [20]:
def extractEmbList(sequences):
    """Collect the token-id sequence of every tokenizer output.

    Parameters
    ----------
    sequences : iterable
        Items that support ``item['input_ids'].numpy()`` and whose resulting
        array has a leading axis; only index 0 of that axis is kept.

    Returns
    -------
    list
        One array per item (``item['input_ids'].numpy()[0]``), in input order.
        The arrays are returned as-is: no padding and no conversion to lists.
    """
    # The previous implementation also built a list of `.tolist()` results and
    # threw it away; that dead statement is dropped, the return value is unchanged.
    return [sequence['input_ids'].numpy()[0] for sequence in sequences]

# The per-sample id sequences have different lengths, so the array must be
# created with dtype=object: without it NumPy emits a ragged-sequence
# deprecation warning and, from NumPy 1.24 on, raises ValueError.
lines_pad = np.array(extractEmbList(sequences), dtype=object)

  lines_pad = np.array(extractEmbList(sequences))


In [21]:
# Attach the per-sample token-id sequences produced with the CodeBERT tokenizer as a "codebert" column.
sequences_data["codebert"] = lines_pad

In [22]:
# Inspect the first row to check the columns added so far.
sequences_data.iloc[0,:]

Vulnerability                f"str$id""str$id""str$id"         ...
Category                                             sql_injection
Length                                                           9
w2v              [[-0.21679688, 0.13574219, 0.18652344, 0.11376...
bert             [1042, 1000, 2358, 2099, 1002, 8909, 1000, 100...
codebert         [1437, 1437, 1437, 1437, 1437, 1437, 1437, 143...
Name: 0, dtype: object

RNN model, LSTM specifically

In [None]:
def buildLstm(max_len, top_words, dim, seed, embedding_matrix, multi, num_classes=None):
    """Build a two-layer LSTM classifier on top of a frozen embedding layer.

    Parameters
    ----------
    max_len : any
        Not used by the model (the embedding is built with ``input_length=None``);
        kept so existing calls keep working.
    top_words : int
        ``input_dim`` of the embedding layer.
    dim : int
        ``output_dim`` of the embedding layer.
    seed : any
        Not used by this function; kept so existing calls keep working.
    embedding_matrix : array-like
        Initial weights of the embedding layer; the layer is not trainable.
    multi : bool
        ``False`` builds a binary classifier (one sigmoid unit, binary
        cross-entropy). ``True`` selects the multi-class variant.
    num_classes : int, optional
        Number of output classes for the multi-class variant. When ``multi`` is
        true and this is given, a softmax head is added and the model is
        compiled with categorical cross-entropy (assumes one-hot encoded
        labels — TODO confirm against the training code). When it is omitted,
        the model is returned without an output layer and uncompiled, exactly
        as before this parameter existed.

    Returns
    -------
    The assembled ``Sequential`` model.
    """
    model = Sequential()
    # Index 0 is treated as padding (mask_zero=True); embeddings stay frozen.
    model.add(Embedding(input_dim=top_words, output_dim=dim, input_length=None,
                        weights=[embedding_matrix], mask_zero=True, trainable=False))
    model.add(LSTM(100, dropout=0.2, return_sequences=True, stateful=False))
    model.add(LSTM(50, dropout=0.1, stateful=False))
    model.add(Activation('relu'))
    model.add(BatchNormalization(momentum=0.0))

    if not multi:
        model.add(Dense(1, activation='sigmoid'))
        model.compile(loss='binary_crossentropy', optimizer='adam')
    elif num_classes is not None:
        model.add(Dense(num_classes, activation='softmax'))
        model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    # multi without num_classes: the caller receives the headless, uncompiled
    # model and is responsible for adding an output layer and compiling it.
    return model

CNN model 1-d

In [None]:
def buildCnn(max_len, top_words, dim, seed, embedding_matrix, multi=False, num_classes=None):
    """Build a 1-D convolutional classifier on top of a frozen embedding layer.

    Parameters
    ----------
    max_len : any
        Not used by the model (the embedding is built with ``input_length=None``);
        kept so existing calls keep working.
    top_words : int
        Input dimension (first positional argument) of the embedding layer.
    dim : int
        Output dimension of the embedding layer.
    seed : any
        Not used by this function; kept so existing calls keep working.
    embedding_matrix : array-like
        Initial weights of the embedding layer; the layer is not trainable.
    multi : bool, default False
        ``False`` builds a binary classifier (one sigmoid unit, binary
        cross-entropy, recall metric). Previously the body read an undefined
        name ``multi``; it is now an explicit, optional parameter, mirroring
        ``buildLstm``.
    num_classes : int, optional
        Number of output classes for the multi-class variant. When ``multi`` is
        true and this is given, a softmax head is added and the model is
        compiled with categorical cross-entropy (assumes one-hot encoded
        labels — TODO confirm against the training code). When it is omitted,
        the model is returned without an output layer and uncompiled.

    Returns
    -------
    The assembled ``Sequential`` model.
    """
    cnn_model = Sequential()
    cnn_model.add(Embedding(top_words, dim, input_length=None, weights=[embedding_matrix],
                            mask_zero=True, trainable=False))
    cnn_model.add(Conv1D(filters=128, kernel_size=5, activation='relu'))
    cnn_model.add(GlobalMaxPool1D())

    if not multi:
        cnn_model.add(Dense(units=1, activation='sigmoid'))
        # The built-in Recall metric replaces the undefined `recall_metric`; it is
        # registered under the same name so training logs keep the same key.
        cnn_model.compile(loss="binary_crossentropy", optimizer="adam",
                          metrics=[tf.keras.metrics.Recall(name="recall_metric")])
    elif num_classes is not None:
        cnn_model.add(Dense(units=num_classes, activation='softmax'))
        cnn_model.compile(loss="categorical_crossentropy", optimizer="adam", metrics=['accuracy'])
    # multi without num_classes: the caller receives the headless, uncompiled
    # model and is responsible for adding an output layer and compiling it.
    return cnn_model

Cross-Validation

Binary Classification: Recognition of Injection Vulnerabilities (command_injection and sql_injection merged)

In [26]:
def is_injection(category):
    """Map a vulnerability category to a binary injection label.

    Returns the string '1' for 'sql_injection' and 'command_injection',
    and the string '0' for every other value.
    """
    injection_categories = ('sql_injection', 'command_injection')
    return '1' if category in injection_categories else '0'

# Binary target: '1' for sql/command injection samples, '0' for every other category.
sequences_data['Injection'] = sequences_data['Category'].apply(is_injection)

Multi-class Classification: Categorization of all detected vulnerabilities

Use ML models and BoW code representation

Use DL models and the sequence-of-tokens code representation