In [36]:
import sklearn
from sklearn.metrics import classification_report
from sklearn.metrics import recall_score
from sklearn.metrics import accuracy_score
import re
import sys
import numpy as np
from collections import Counter
import torch
from torch import nn, optim
from tqdm.auto import tqdm, trange

def parse_file(name):
    """Read the source file at ``name`` and return its tokens.

    Args:
        name: Path of the text file to read.

    Returns:
        The list of tokens produced by ``get_tokens`` for the file's contents,
        after every character that cannot be represented in Latin-1 has been
        dropped.
    """
    # The context manager closes the handle even if read() raises.  Bytes that
    # are undecodable in the platform's default encoding are dropped, matching
    # the "ignore what we cannot represent" policy applied just below.
    with open(name, 'r', errors='ignore') as fp:
        data = fp.read()

    # Strip everything outside Latin-1.  The bytes must be decoded with the
    # same codec they were encoded with: decoding them as UTF-8 (the default)
    # raised UnicodeDecodeError for any character in U+0080..U+00FF.
    data = data.encode('latin-1', 'ignore').decode('latin-1')
    return get_tokens(data)

def get_tokens(data):
    """Split source text into a flat list of normalised tokens.

    The text is lower-cased, carriage returns are removed and tabs become
    spaces.  It is then processed one line at a time:

    * If the assignment pattern matches anywhere on the line, a running
      counter is incremented and every ``name =`` occurrence on that line is
      rewritten to ``varN = `` (N is the counter, so it advances per line,
      not per distinct identifier).
    * If the line contains a call-like ``name(...)`` and none of the
      substrings ``if`` / ``for`` / ``while``, a second counter is incremented
      and every occurrence of the matched name on the line becomes ``funcN``.
    * Punctuation and operator characters are padded with spaces and the line
      is split on single spaces; empty pieces are discarded.

    Args:
        data: Source text to tokenise.

    Returns:
        list of str: the tokens of all lines, in order.
    """
    text = data.replace('\r', '').replace('\t', ' ').lower()

    call_pattern = re.compile(r'([_.->\w]+)\s*\([^)]*\)')
    assign_pattern = re.compile(r'\s*([_\d\w]*)(\s*)=(\s*)[^;]*')
    assign_lhs_pattern = re.compile(r'([_\d\w]+\s*=\s*)')
    statement_keywords = ('if', 'for', 'while')
    separators = ('(', ')', '{', '}', '*', '/', '+', '-', '=', ';', ',')

    assign_count = 0
    call_count = 0
    tokens = []

    for current in text.split('\n'):
        # Anonymise assignment targets.
        if assign_pattern.search(current):
            assign_count += 1
            current = assign_lhs_pattern.sub('var%d = ' % assign_count, current)

        # Anonymise called names, unless the line mentions a statement
        # keyword (plain substring test, so e.g. "format(" is also skipped).
        call_match = call_pattern.search(current)
        if call_match and not any(kw in current for kw in statement_keywords):
            call_count += 1
            current = current.replace(call_match.group(1), 'func%d' % call_count)

        # Surround each separator with spaces so it becomes its own token.
        for sep in separators:
            current = current.replace(sep, ' %s ' % sep)

        stripped_pieces = (piece.strip() for piece in current.split(' '))
        tokens.extend(piece for piece in stripped_pieces if piece)

    return tokens


In [37]:
import re
import sys
import numpy as np
from collections import Counter
import torch
from torch import nn, optim
from tqdm.auto import tqdm, trange
import pickle

import os
import wandb

from gensim.models import Word2Vec

import glob
import random

sys.path.append('.')
import preprocess as pr

In [122]:
class CPP_Dataset(torch.utils.data.Dataset):
    """Two-class dataset of code snippets read from a pickle file.

    The pickle is expected to hold a mapping with the keys ``'vuln'`` and
    ``'notvuln'``, each an indexable collection of source strings.  Indices
    ``[0, maxlen)`` yield ``ds['vuln'][idx]`` with label 1; indices at or above
    ``maxlen`` wrap (``idx % maxlen``) into ``ds['notvuln']`` with label 0.
    Because ``__len__`` reports ``2 * len(ds['vuln'])``, ``ds['notvuln']`` must
    contain at least as many entries as ``ds['vuln']``.

    Each sample is tokenised, cut or padded (with ``';'``) to ``MAX_TOKENS``
    tokens, and every token found in ``wv`` contributes its embedding to one
    flat float32 vector of exactly ``FEATURE_LEN`` values.
    """

    MAX_TOKENS = 100     # tokens kept per sample
    FEATURE_LEN = 3000   # flat feature size; equals the 30 * 100 input width of MLP_Model

    def __init__(self, dataset, wv):
        """Load the pickled samples from ``dataset`` and keep the embedding lookup ``wv``."""
        # NOTE(review): pickle.load can execute arbitrary code; only open
        # dataset files that come from a trusted source.
        with open(dataset, 'rb') as f:
            self.ds = pickle.load(f)

        self.maxlen = len(self.ds['vuln'])
        self.wv = wv

    def __len__(self):
        """Number of samples: one 'vuln' and one 'notvuln' slot per 'vuln' entry."""
        return len(self.ds['vuln']) * 2

    def get_lines(self, data):
        """Split ``data`` on newline characters."""
        return data.split('\n')

    def __getitem__(self, idx):
        """Return ``(features, label)`` for sample ``idx``.

        Returns:
            tuple: a 1-D float32 tensor of length ``FEATURE_LEN`` and the
            integer label (1 for 'vuln', 0 for 'notvuln').
        """
        vuln = 1
        ds = self.ds['vuln']
        if idx >= self.maxlen:
            idx = idx % self.maxlen
            vuln = 0
            ds = self.ds['notvuln']

        toks = list(pr.get_tokens(ds[idx])[:self.MAX_TOKENS])
        toks.extend([';'] * (self.MAX_TOKENS - len(toks)))

        # Pre-filled with zeros so the result always has FEATURE_LEN entries.
        # Previously an out-of-vocabulary token simply shortened the vector,
        # which made batches impossible to stack and no longer matched the
        # model's input width.
        features = np.zeros(self.FEATURE_LEN, dtype=np.float32)
        filled = 0
        for tok in toks:
            if filled >= self.FEATURE_LEN:
                break
            try:
                vec = self.wv[tok]
            except KeyError:
                # Token has no embedding: skip it (same policy as before, but
                # without hiding unrelated errors behind a bare except).
                continue
            if vec is None:
                continue
            vec = np.asarray(vec, dtype=np.float32).ravel()
            take = min(vec.size, self.FEATURE_LEN - filled)
            features[filled:filled + take] = vec[:take]
            filled += take

        return torch.from_numpy(features), vuln


class MLP_Model(nn.Module):
    """Small feed-forward classifier: 3000 inputs -> 100 hidden units -> 2 outputs.

    The hidden layer is followed by a LeakyReLU; the two output values are
    returned as-is (no softmax is applied inside the model).
    """

    def __init__(self):
        super().__init__()

        input_width = 30 * 100
        hidden_width = 100
        class_count = 2

        self.fc1 = nn.Linear(input_width, hidden_width)
        self.fc2 = nn.Linear(hidden_width, class_count)
        self.relu = nn.LeakyReLU()

    def forward(self, x):
        """Apply fc1, the LeakyReLU activation and fc2 to ``x`` and return the result."""
        hidden = self.relu(self.fc1(x))
        return self.fc2(hidden)

def _evaluate(model, dataloader, criterion, device):
    """Run one pass over ``dataloader`` without gradients.

    Returns:
        tuple: ``(mean_loss, accuracy, recall)`` computed over every sample of
        the pass (NaN for all three if the loader yields no samples).
    """
    model.eval()
    loss_sum = 0.0
    all_preds = []
    all_targets = []
    with torch.no_grad():
        for inputs, targets in tqdm(dataloader):
            inputs = inputs.to(device)
            targets = targets.to(device)
            outputs = model(inputs)
            # CrossEntropyLoss returns the batch mean; weight it by the batch
            # size so a smaller final batch does not skew the epoch average.
            loss_sum += criterion(outputs, targets).item() * targets.size(0)
            all_preds.extend(outputs.argmax(dim=1).cpu().tolist())
            all_targets.extend(targets.cpu().tolist())

    sample_count = len(all_targets)
    if sample_count == 0:
        return float('nan'), float('nan'), float('nan')

    # Metrics are computed once over the whole set.  Averaging per-batch
    # recall is biased (a batch without positives contributes 0).
    return (
        loss_sum / sample_count,
        accuracy_score(all_targets, all_preds),
        recall_score(all_targets, all_preds),
    )


def train_mlp(num_epochs=100, use_wandb=True, lr=0.00003, batch_size=64):
    """Train ``MLP_Model`` on the pickled datasets and report validation metrics.

    Loads ``word2vec_gensim.model``, ``dataset_train.bin`` and
    ``dataset_val.bin`` from the working directory, trains with Adam and
    cross-entropy loss, and after every epoch prints (and optionally logs to
    Weights & Biases) the validation loss, accuracy and recall.

    Args:
        num_epochs: Number of passes over the training set.
        use_wandb: Log metrics to Weights & Biases when true.
        lr: Adam learning rate.
        batch_size: Batch size for both data loaders.
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"

    os.environ['WANDB_CONSOLE']='off'

    w2v_model = Word2Vec.load("word2vec_gensim.model")

    train_dataset = CPP_Dataset('dataset_train.bin', w2v_model.wv)
    test_dataset = CPP_Dataset('dataset_val.bin', w2v_model.wv)

    train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    # Validation metrics do not depend on sample order, so no shuffling here.
    test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    # The device was previously only printed; the model and every batch must
    # actually be moved to it for the GPU to be used.
    model = MLP_Model().to(device)
    if use_wandb:
        # Take the API key from the environment instead of a literal in the
        # notebook; with no key set, wandb uses its own configured credentials.
        wandb.login(key=os.environ.get('WANDB_API_KEY') or None, verify=True)
        wandb.init(project='fitzel1')
        wandb.watch(model)

    criterion = nn.CrossEntropyLoss()
    opt = optim.Adam(model.parameters(), lr=lr)

    print('Started training loop, using device %s' % device)

    try:
        for epoch in range(num_epochs):
            model.train()
            for inputs, targets in tqdm(train_dataloader,desc='Epoch'):
                inputs = inputs.to(device)
                targets = targets.to(device)
                opt.zero_grad()
                outputs = model(inputs)
                loss = criterion(outputs, targets)
                loss.backward()
                opt.step()

            val_loss, val_accuracy, val_recall = _evaluate(model, test_dataloader, criterion, device)

            if use_wandb:
                wandb.log({'loss': val_loss, 'accuracy': val_accuracy, 'recall': val_recall})

            print(f"Epoch: {epoch}, loss: {val_loss}, accuracy: {val_accuracy} ")
    finally:
        # Close the wandb run even when training is interrupted or fails.
        if use_wandb:
            wandb.finish(quiet=True)



In [None]:
# Run the training/validation loop defined above; it loads the Word2Vec model
# and both dataset files via paths relative to the current working directory.
train_mlp()