<a href="https://colab.research.google.com/github/hishamp3/codeXGLUE/blob/main/Defect%20Detection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install transformers
!pip install sentencepiece

In [None]:
import pandas as pd
import re
import string

In [None]:
import json
# Read the raw records. The encoding is pinned because JSON is UTF-8 by convention and
# the default used by open() otherwise depends on the platform locale.
with open('./sample_data/function.json', encoding='utf-8') as f:
    d = json.load(f)

In [None]:
# Flatten the loaded JSON records into a tabular frame.
df1 = pd.json_normalize(data=d)

In [None]:
# Keep only the source text and its label, then show the class balance.
df = df1.loc[:, ['func', 'target']]
print(df['target'].value_counts())

0    14858
1    12460
Name: target, dtype: int64


In [None]:
# Peek at the first few rows to eyeball the function text and labels.
print(df.head(n=5))

                                                func  target
0  static av_cold int vdadec_init(AVCodecContext ...       0
1  static int transcode(AVFormatContext **output_...       0
2  static void v4l2_free_buffer(void *opaque, uin...       0
3  int ff_get_wav_header(AVFormatContext *s, AVIO...       0
4  int av_opencl_buffer_write(cl_mem dst_cl_buf, ...       0


In [None]:
# Sanity check: list the columns the working frame exposes.
feature_columns = df.columns
print(feature_columns)

Index(['func', 'target'], dtype='object')


In [None]:
# Checkpoint used for both the tokenizer and the encoder.
# Alternatives that were tried (swap the active assignment to switch):
#   'xlm-roberta-base', 'bert-base-cased', 'microsoft/codebert-base'
model_name = 'roberta-base'

In [None]:
from transformers import BertTokenizer
from transformers import RobertaTokenizer, TFRobertaModel
from transformers import XLNetTokenizer, XLNetModel
from transformers import AutoTokenizer,XLMRobertaForMaskedLM

# Tokenizer smoke test: encode one short sentence padded/truncated to 10 tokens
# and print the resulting ids alongside the attention mask.
tokenizer = AutoTokenizer.from_pretrained(model_name)
example_text = 'The Weather is good tonight'
bert_input = tokenizer(
    example_text,
    padding='max_length',
    max_length=10,
    truncation=True,
    return_tensors="pt",
)

for field in ('input_ids', 'attention_mask'):
    print(bert_input[field])

config.json:   0%|          | 0.00/481 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

tensor([[   0,  133, 5842,   16,  205, 3422,    2,    1,    1,    1]])
tensor([[1, 1, 1, 1, 1, 1, 1, 0, 0, 0]])


In [None]:
# Map the ids back to text to show the special and padding tokens that were added.
example_text = tokenizer.decode(bert_input['input_ids'][0])
print(example_text)

<s>The Weather is good tonight</s><pad><pad><pad>


In [None]:
import torch
import numpy as np
from transformers import BertTokenizer

# Module-level tokenizer and label lookup; both are read by the Dataset class below.
tokenizer = AutoTokenizer.from_pretrained(model_name)
# Identity mapping from the raw `target` value to the class index.
labels = {0: 0, 1: 1}

class Dataset(torch.utils.data.Dataset):
    """Map-style dataset pairing tokenized function source with its class label.

    All rows are tokenized eagerly in ``__init__`` with the module-level
    ``tokenizer`` (padded/truncated to 512 tokens, PyTorch tensors), and the
    ``target`` column is translated through the module-level ``labels`` dict.
    """

    def __init__(self, df):
        """Build the label list and the tokenized inputs from ``df``.

        Args:
            df: frame exposing a ``func`` column (text) and a ``target`` column
                whose values are keys of ``labels``.
        """
        self.labels = list(map(labels.__getitem__, df['target']))

        self.texts = []
        for source in df['func']:
            encoded = tokenizer(
                source,
                padding='max_length',
                max_length=512,
                truncation=True,
                return_tensors="pt",
            )
            self.texts.append(encoded)

    def classes(self):
        """Return the list of class indices, one per sample."""
        return self.labels

    def __len__(self):
        """Number of samples held by the dataset."""
        return len(self.labels)

    def get_batch_labels(self, idx):
        """Return the label(s) selected by ``idx`` wrapped in a NumPy array."""
        selected = self.labels[idx]
        return np.array(selected)

    def get_batch_texts(self, idx):
        """Return the tokenized input(s) selected by ``idx``."""
        return self.texts[idx]

    def __getitem__(self, idx):
        """Return the ``(tokenized_input, label)`` pair for ``idx``."""
        return self.get_batch_texts(idx), self.get_batch_labels(idx)

In [None]:
# Shuffle once with a fixed seed, then cut 80% / 10% / 10% into train / val / test.
# Plain positional slices are used instead of np.split: np.split on a DataFrame goes
# through DataFrame.swapaxes, which pandas has deprecated.
np.random.seed(112)
shuffled = df.sample(frac=1, random_state=42)
train_end = int(.8 * len(df))
val_end = int(.9 * len(df))

df_train = shuffled.iloc[:train_end]
df_val = shuffled.iloc[train_end:val_end]
df_test = shuffled.iloc[val_end:]

print(len(df_train),len(df_val), len(df_test))

21854 2732 2732


In [None]:
from torch import nn
from transformers import BertModel
from transformers import RobertaTokenizer, RobertaModel
from transformers import XLMRobertaForCausalLM, AutoConfig
from transformers import AutoModelForMaskedLM
from transformers import AutoModel

class Classifier(nn.Module):
    """Pretrained encoder followed by a small two-layer classification head.

    The encoder is loaded from the module-level ``model_name`` checkpoint. Its
    pooled output goes through dropout, a 64-unit hidden layer with ReLU, and a
    final linear layer producing one raw logit per class.
    """

    def __init__(self, dropout=0.1, num_classes=2):
        """
        Args:
            dropout: dropout probability applied to the pooled encoder output.
            num_classes: number of output logits. Defaults to 2 because the
                ``target`` column only takes the values 0 and 1.
        """
        super(Classifier, self).__init__()
        self.model = AutoModel.from_pretrained(model_name)
        self.dropout = nn.Dropout(dropout)
        # Read the encoder width from its config rather than assuming 768.
        self.linear1 = nn.Linear(self.model.config.hidden_size, 64)
        self.relu = nn.ReLU()
        self.linear2 = nn.Linear(64, num_classes)

    def forward(self, input_id, mask):
        """Return raw class logits of shape (batch, num_classes).

        Args:
            input_id: token ids passed to the encoder as ``input_ids``.
            mask: attention mask passed to the encoder as ``attention_mask``.
        """
        # With return_dict=False the encoder returns a tuple; the second item is
        # the pooled representation consumed by the head.
        _, pooled_output = self.model(input_ids=input_id, attention_mask=mask, return_dict=False)
        dropout_output = self.dropout(pooled_output)

        # First Layer
        hidden = self.relu(self.linear1(dropout_output))

        # Second Layer: no activation here. CrossEntropyLoss applies log-softmax
        # itself, and a ReLU would clamp every negative logit to zero, which
        # blocks the gradient and collapses predictions onto a single class.
        return self.linear2(hidden)

In [None]:
from torch.optim import Adam
from tqdm import tqdm
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix
from sklearn.metrics import f1_score

def train(model, train_data,learning_rate, epochs, valid_data, batch_size=8):
    """Fine-tune ``model`` on ``train_data`` and print validation metrics after each epoch.

    Args:
        model: module called as ``model(input_ids, attention_mask)`` that returns
            per-class scores of shape (batch, num_classes).
        train_data: frame with ``func`` and ``target`` columns; wrapped in ``Dataset``.
        learning_rate: learning rate handed to ``Adam``.
        epochs: number of full passes over ``train_data``.
        valid_data: frame with the same columns, scored after every epoch.
        batch_size: mini-batch size used by both data loaders.

    Returns:
        None. Metrics are printed and ``model`` is updated in place.
    """
    train_set = Dataset(train_data)
    valid_set = Dataset(valid_data)

    train_dataloader = torch.utils.data.DataLoader(train_set, batch_size=batch_size, shuffle=True)
    # Order is irrelevant for scoring, so the validation loader is not shuffled.
    valid_dataloader = torch.utils.data.DataLoader(valid_set, batch_size=batch_size)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    criterion = nn.CrossEntropyLoss().to(device)
    optimizer = Adam(model.parameters(), lr=learning_rate)

    for epoch_num in range(epochs):

        # Re-enable dropout; the validation pass below switches it off.
        model.train()
        total_acc_train = 0
        total_loss_train = 0.0

        for train_input, train_label in tqdm(train_dataloader):

            train_label = train_label.to(device).long()
            # Each sample was tokenized with return_tensors="pt", so after batching
            # both tensors carry an extra size-1 dimension at position 1.
            mask = train_input['attention_mask'].squeeze(1).to(device)
            input_id = train_input['input_ids'].squeeze(1).to(device)

            optimizer.zero_grad()
            output = model(input_id, mask)
            batch_loss = criterion(output, train_label)
            batch_loss.backward()
            optimizer.step()

            # criterion returns the batch mean; weight it by the batch size so the
            # epoch figure is a true per-sample average.
            total_loss_train += batch_loss.item() * train_label.size(0)
            total_acc_train += (output.argmax(dim=1) == train_label).sum().item()

        n_train = max(len(train_data), 1)
        print(
            f'Epochs: {epoch_num + 1} | Train Loss: {total_loss_train / n_train: .3f} '
            f'| Train Accuracy: {total_acc_train / n_train: .3f}'
        )

        y_pred = []
        y_true = []
        total_loss_val = 0.0

        # Validation: dropout off and no autograd graph.
        model.eval()
        with torch.no_grad():
            for valid_input, valid_label in tqdm(valid_dataloader):

                valid_label = valid_label.to(device).long()
                mask = valid_input['attention_mask'].squeeze(1).to(device)
                input_id = valid_input['input_ids'].squeeze(1).to(device)

                output = model(input_id, mask)
                total_loss_val += criterion(output, valid_label).item() * valid_label.size(0)

                y_pred.extend(output.argmax(dim=1).cpu().numpy())
                y_true.extend(valid_label.cpu().numpy())

        # Score once per epoch on the full set of predictions instead of on every batch.
        if y_true:
            accuracy = accuracy_score(y_true, y_pred)
            score_f1 = f1_score(y_true, y_pred)
            print(f'Val Loss: {total_loss_val / len(y_true): .3f}')
            print(f'Accuracy: {accuracy}')
            print(f'F1 score: {score_f1}')


In [None]:
import gc
# Order matters: collect unreachable Python objects first so any tensors they still
# reference can be released, then ask PyTorch to release its unused cached CUDA memory.
gc.collect()
torch.cuda.empty_cache()

In [None]:
# Training
# Hyper-parameters for the fine-tuning run.
LR = 2e-5
EPOCHS = 2

model = Classifier()

train(model, train_data=df_train, learning_rate=LR, epochs=EPOCHS, valid_data=df_val)

In [None]:
from sklearn.metrics import confusion_matrix
from sklearn.metrics import f1_score

def evaluate(model, test_data, batch_size=8):
    """Score ``model`` on ``test_data`` and print accuracy, loss, confusion matrix and F1.

    Args:
        model: module called as ``model(input_ids, attention_mask)`` that returns
            per-class scores of shape (batch, num_classes).
        test_data: frame with ``func`` and ``target`` columns; wrapped in ``Dataset``.
        batch_size: mini-batch size used by the data loader.

    Returns:
        None. All metrics are printed.
    """
    test = Dataset(test_data)

    test_dataloader = torch.utils.data.DataLoader(test, batch_size=batch_size)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    criterion = nn.CrossEntropyLoss().to(device)

    total_loss_test = 0.0
    total_acc_test = 0
    y_pred = []
    y_true = []

    # Dropout must be off at test time, otherwise predictions are stochastic.
    model.eval()
    with torch.no_grad():

        for test_input, test_label in test_dataloader:

            test_label = test_label.to(device).long()
            # Each sample was tokenized with return_tensors="pt", so after batching
            # both tensors carry an extra size-1 dimension at position 1.
            mask = test_input['attention_mask'].squeeze(1).to(device)
            input_id = test_input['input_ids'].squeeze(1).to(device)

            output = model(input_id, mask)

            # criterion returns the batch mean; weight by batch size so the final
            # figure is a per-sample average.
            total_loss_test += criterion(output, test_label).item() * test_label.size(0)

            predicted = output.argmax(dim=1)
            total_acc_test += (predicted == test_label).sum().item()

            y_pred.extend(predicted.cpu().numpy())
            y_true.extend(test_label.cpu().numpy())

    if not y_true:
        print('No test samples to evaluate.')
        return

    # Computed once on the complete prediction list rather than after every batch.
    cf_matrix = confusion_matrix(y_true, y_pred)
    score_f1 = f1_score(y_true, y_pred)

    n_test = len(y_true)
    print(f'Test Accuracy: {total_acc_test / n_test: .3f}|Test Loss: {total_loss_test / n_test: .3f}')
    print(f'Confusion Matrix: {cf_matrix}')
    print(f'F1 score: {score_f1}')

In [None]:
# Evaluation
# Score the fine-tuned model on the held-out test split.
evaluate(model, test_data=df_test)

Test Accuracy:  0.531|Val Loss:  0.167
Confusion Matrix: [[1451    0]
 [1281    0]]
F1 score: 0.0


In [None]:
#save model
#torch.save(model, './model')

In [None]:
# cuda check
use_cuda = torch.cuda.is_available()
device = torch.device("cuda") if use_cuda else torch.device("cpu")

#sample code
#example_code = "char *pos = memchr(array, '@', 42);"
example_code = "char str[20]; gets(str);"

# Tokenize with the same padding/truncation settings the Dataset class uses,
# then move the encoded tensors to the selected device.
code = tokenizer(
    example_code,
    padding='max_length',
    max_length=512,
    truncation=True,
    return_tensors="pt",
)
code = code.to(device)

In [None]:
# Single-sample inference: dropout off and no autograd graph.
model.eval()
with torch.no_grad():
    output = model(code['input_ids'],code['attention_mask'])
prediction = output.argmax(dim=1).cpu().numpy()

# In this dataset target 1 marks the insecure function and 0 the secure one,
# so class 1 is the "vulnerability" branch.
if prediction[0] == 1:
    print("vulnerability")
else:
    print("Not vulnerability")

vulnerability
