# CSCE 491 - Research Code

## Title: "Automated Bug Detecting In Source Code Using Machine Learning"

## Description
This project assesses the viability of, and presents a proof of concept for, using machine learning to detect bugs in source code.

---

## General Notes
- DATASET : NIST Juliet C/C++ v1.3
    - Filters: No cpp, No Win32, No multi-file code
- ML Libraries : Sklearn, Pytorch

---

## TYPE Classification
- TYPE_BUFFER_BUGS
    - Only buffer overflow, underflow, underread, etc
- TYPE_NUMERICAL_BUGS
    - Integer overflow, integer underflow, signed VS unsigned, divide by zero, etc
- TYPE_LOGIC_BUGS
    - Infinite Loop, Assign VS Compare, Recursion, Unchecked loop condition, etc
- TYPE_MEMORY_BUGS
    - NULLPTR deref, Double free, use after free, Uncontrolled memory alloc, etc

---

## Personal Notes
1. Precision = Correct Classification / All Classified (In a category) (TP / (TP + FP))
1. Recall = Correct Classification / All True (In a category) (TP / (TP + FN))
1. F-1 Score = Harmonic Mean of Precision & Recall (2 * P * R / (P + R)) (Both need to be good for high F-1)

In [None]:
### IMPORTS ###
import json
import re
import torch
import os
import subprocess
import pickle
import scipy
import torchtext
import h5py

import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import torch.nn as nn
import torch.nn.functional as F
import torch.cuda as cuda

# Misc Items
from random import seed
from random import shuffle
from random import randint
from time import time
from math import sqrt

# More Matplot
from matplotlib import colors

# Sklearn Items
from sklearn.model_selection import *
from sklearn.metrics import *
from sklearn.ensemble import *
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.preprocessing import label_binarize

# Pytorch and Parsing stuff
from torch.utils.data import Dataset, DataLoader
from pprint import pprint

# JUST USING CLANG
import clang.cindex

In [None]:
## Seed Python's `random` module from the wall clock (whole seconds) so the
## shuffle() calls later in the notebook differ between runs.
## NOTE - this only seeds `random`; it does not seed NumPy or PyTorch.
seed(int(time()))

In [None]:
### SELECTION CONSTANTS ###
## These constants select how the rest of the code will run and work

## TOKEN TYPE
# Selects which token representation the notebook uses:
#   0 - plain clang token kinds            (tokenizeWithoutValue)
#   1 - (kind index, spelling) pairs       (tokenizeString)
#       NOTE - 1 is handled by the tokenization cells only; createTensor()
#       and the ROC plot titles have no branch for it.
#   2 - linearised clang AST tokens        (clangFileASTList)
#   3 - extended token kinds               (tokenizeExtended)
USING_TOKEN_TYPE = 2

## MULTICLASS
# 0 selects multiclass training; any other value (intended: 1) selects binary
USING_CLASSES = 0

In [None]:
### CONSTANTS ###

## Only use when needed to have a "constant" random seed
# RANDOM_SEED = 69

# Run PyTorch on the GPU when CUDA is available, otherwise on the CPU.
# The chosen device name is printed once for confirmation.
DEVICE = 'cuda' if cuda.is_available() else 'cpu'
print(DEVICE)

### Relevant for RFC
# Length of the token n-grams (shared by every n-gram helper in the program)
NGRAMS = 4
# Number of trees (estimators) for the random forest classifier
ESTIMATORS = 64

# Fraction of the data to hold out for testing
## NOTE - I AM NOT SURE THIS IS USED ANYMORE
TESTTRAINRATIO = 0.1

### Relevant for the CLANG parsing [DO NOT TOUCH]
# INDENT: extra spaces added per nesting level when vit() prints the AST
INDENT = 4
# K: short alias for clang's CursorKind, used by is_std_ns()
K = clang.cindex.CursorKind
###

# Basic token kinds and the lookup tables between kind name and integer code
# (name -> index in TOKENDICT, index -> name in INVTOKENDICT).
TOKENLIST = ["KEYWORD", "PUNCTUATION", "IDENTIFIER", "LITERAL", "COMMENT"]
TOKENDICT = dict(zip(TOKENLIST, range(len(TOKENLIST))))
INVTOKENDICT = dict(enumerate(TOKENLIST))

# Extended token kinds: the five basic kinds followed by the hand-picked,
# more specific names produced by tokenizeExtended().
EXTENDEDLIST = TOKENLIST + [
    "LOOPFOR", "LOOPWHILE",
    "CMPEQUALS", "CMPNEQUALS", "CMPLT", "CMPGT", "CMPLTE", "CMPGTE",
    "IFSTMT", "ELSESTMT",
    "TYPEINT", "TYPECHAR", "TYPEVOID", "TYPEDOUBLE", "TYPEFLOAT",
]
EXTENDEDTOKENS = dict(zip(EXTENDEDLIST, range(len(EXTENDEDLIST))))
INVEXTENDEDTOKENS = dict(enumerate(EXTENDEDLIST))

# Token vocabulary for the linearised clang AST (USING_TOKEN_TYPE == 2).
# It starts with the same 20 names as EXTENDEDLIST, adds the punctuation-level
# names that parseNode() emits, and is extended below with CursorKind names.
clangTokenList = [
    "KEYWORD",
    "PUNCTUATION",
    "IDENTIFIER",
    "LITERAL",
    "COMMENT",
    "LOOPFOR",
    "LOOPWHILE",
    "CMPEQUALS",
    "CMPNEQUALS",
    "CMPLT",
    "CMPGT",
    "CMPLTE",
    "CMPGTE",
    "IFSTMT",
    "ELSESTMT",
    "TYPEINT",
    "TYPECHAR",
    "TYPEVOID",
    "TYPEDOUBLE",
    "TYPEFLOAT",
    "PARENL",
    "PARENR",
    "BRACESL",
    "BRACESR",
    "ASSIGN",
    "PTR",
    "REF"
]

## Relevant for removing the unnecessary (i.e. C++-specific) cursor kinds
## NOTE - The "__class__" check MUST stay: dir() returns names in sorted order,
##        so the upper-case kind names come before "__class__" and the loop
##        stops there, before the dunder/lower-case attributes are reached.
## NOTE - The "CXX" filter should be removed if the code is extended to classes
## NOTE - The "OMP" kinds are only used for multithreading (I DO NOT THINK THEY WILL BE USEFUL)
for x in dir(clang.cindex.CursorKind):
    if(x == "__class__"):
        break
    # Skip C++-only cursor kinds (names starting with "CXX" or "CLASS")
    if(x[:3] == "CXX" or x[:5] == "CLASS"):
        continue
    # TODO : MAYBE THESE ARE NECESSARY TOKENS IDK (MIGHT REMOVE LATER)
    if(x[:3] == "OMP"):
        continue
    clangTokenList.append(x)

# Name -> index lookup for the clang vocabulary (index = position in clangTokenList).
# NOTE(review): the resulting size depends on the installed clang bindings,
# because the set of CursorKind names is read at run time.
CLANGTOKENS = dict()
for i,x in enumerate(clangTokenList):
    CLANGTOKENS[x] = i


### CLASS DICTIONARIES
# Maps a class index to its label: five bug categories in multiclass mode
# (USING_CLASSES == 0), otherwise the two binary labels "0" and "1".
if(USING_CLASSES == 0):
    CLASSDICT = dict(enumerate(["BUFFER", "CLEAN", "LOGIC", "MEMORY", "NUMERICAL"]))
else:
    CLASSDICT = dict(enumerate(["0", "1"]))

N_CLASSES = len(CLASSDICT)

# JUST FOR VERIFICATION (5, 20, 176)
vocabularySizes = (len(TOKENDICT), len(EXTENDEDTOKENS), len(CLANGTOKENS))
print(*vocabularySizes)

In [None]:
### CLANG FUNCTIONS ###     
# Create tokens of the form "(TOKEN_TYPE_INT, TOKEN_STRING)"
def tokenizeString(cString):
    """Tokenize C source text into (token-kind index, spelling) pairs.

    The text is parsed by clang as an in-memory file named 'tmp.c'. Each
    token's kind name is converted to its integer code through TOKENDICT;
    a kind name missing from TOKENDICT raises KeyError.
    """
    index = clang.cindex.Index.create()
    unit = index.parse('tmp.c', args=[''], unsaved_files=[('tmp.c', cString)], options=0)
    return [
        (TOKENDICT[str(token.kind.name)], token.spelling)
        for token in unit.get_tokens(extent=unit.cursor.extent)
    ]

# Create tokens of the form "(TOKEN_TYPE_STR)"
def tokenizeWithoutValue(cString):
    """Tokenize C source text and return only the token kind names.

    The text is parsed by clang as an in-memory file named 'tmp.c'; the
    result is a list with one kind-name string per token.
    """
    index = clang.cindex.Index.create()
    unit = index.parse('tmp.c', args=[''], unsaved_files=[('tmp.c', cString)], options=0)
    return [str(token.kind.name) for token in unit.get_tokens(extent=unit.cursor.extent)]

# Create tokens of the form "(TOKEN_TYPE_STR)" but more specified about the kinds
# NOTE - This is a manual process and could be changed for more or less in the future
def tokenizeExtended(cString):
    """Tokenize C source text into extended token names.

    Tokens whose spelling appears in the table below are reported under the
    more specific name; every other token is reported by its clang kind name.
    The table is maintained by hand and can grow or shrink as needed.
    """
    specificNames = {
        "for": "LOOPFOR",
        "while": "LOOPWHILE",
        "==": "CMPEQUALS",
        "!=": "CMPNEQUALS",
        "<": "CMPLT",
        ">": "CMPGT",
        "<=": "CMPLTE",
        ">=": "CMPGTE",
        "if": "IFSTMT",
        "else": "ELSESTMT",
        "int": "TYPEINT",
        "char": "TYPECHAR",
        "void": "TYPEVOID",
        "double": "TYPEDOUBLE",
        "float": "TYPEFLOAT",
    }

    index = clang.cindex.Index.create()
    unit = index.parse('tmp.c', args=[''], unsaved_files=[('tmp.c', cString)], options=0)

    names = []
    for token in unit.get_tokens(extent=unit.cursor.extent):
        kindName = str(token.kind.name)
        names.append(specificNames.get(token.spelling, kindName))
    return names

# DO NOT TOUCH
def is_std_ns(node):
    """Return True when `node` is a NAMESPACE cursor whose spelling is 'std'."""
    return node.kind == K.NAMESPACE and node.spelling == 'std'

# DO NOT TOUCH
def vit(node: clang.cindex.Cursor, indent: int, saw):
    """Recursively print the AST rooted at `node`, one cursor per line.

    node   -- cursor to print
    indent -- number of leading spaces for this level (0 for the root call)
    saw    -- set of cursor hashes on the current recursion path; used so a
              referenced cursor that is already being visited is not re-entered
    """
    pre = ' ' * indent
    k = node.kind  # type: clang.cindex.CursorKind
    # skip printing UNEXPOSED_*
    if not k.is_unexposed():
        print(pre, end='')
        print(k.name, end=' ')
        if node.spelling:
            print('s:', node.spelling, end=' ')
            # the type is only printed for cursors that also have a spelling
            if node.type.spelling:
                print('t:', node.type.spelling, end=' ')
            # FIXME: print opcode or literal
        print()
    saw.add(node.hash)
    # Follow the referenced cursor unless it is already on the current path
    if node.referenced is not None and node.referenced.hash not in saw:
        vit(node.referenced, indent + INDENT, saw)
    # FIXME: skip auto generated decls
    # At the root (indent == 0) count the `std` namespace children; children
    # are skipped until that many `std` namespaces have been passed.
    skip = len([c for c in node.get_children()
                if indent == 0 and is_std_ns(c)])
    for c in node.get_children():
        if not skip:
            vit(c, indent + INDENT, saw)
        if indent == 0 and is_std_ns(c):
            skip -= 1
    # Leaving this node: it is no longer on the recursion path
    saw.remove(node.hash)
    
# NOTE - This gets the ORIGINAL form of the clang AST (I am using a modified version for results)
# NOTE - This is not used anywhere but should remain for legacy purposes
def clangOldAST(filename):
    """Parse `filename` with clang and print its AST through vit().

    Kept for legacy purposes; nothing is returned.
    """
    translationUnit = clang.cindex.Index.create().parse(filename)
    vit(translationUnit.cursor, 0, set())

# Combine Nodes & Tokens from AST into linear sequence (similar to ASM in a way)
def parseNode(node):
    """Flatten the AST under `node` into newline-terminated token names.

    The output starts with the node's own kind name. If the node has fewer
    than two children and is not a TRANSLATION_UNIT, FUNCTION_DECL or
    COMPOUND_STMT, the names of its tokens follow (specific names from the
    table below where the spelling matches, the clang kind name otherwise).
    The flattened form of every child is appended afterwards, in order.
    """
    specificNames = {
        "for": "LOOPFOR",
        "while": "LOOPWHILE",
        "==": "CMPEQUALS",
        "!=": "CMPNEQUALS",
        "<": "CMPLT",
        ">": "CMPGT",
        "<=": "CMPLTE",
        ">=": "CMPGTE",
        "if": "IFSTMT",
        "else": "ELSESTMT",
        "int": "TYPEINT",
        "char": "TYPECHAR",
        "void": "TYPEVOID",
        "double": "TYPEDOUBLE",
        "float": "TYPEFLOAT",
        "(": "PARENL",
        ")": "PARENR",
        "{": "BRACESL",
        "}": "BRACESR",
        "=": "ASSIGN",
        "*": "PTR",
        "&": "REF",
    }
    # Container kinds never contribute their own tokens
    containerKinds = ("TRANSLATION_UNIT", "FUNCTION_DECL", "COMPOUND_STMT")

    kindName = str(node.kind.name)
    pieces = [kindName + "\n"]

    childCount = sum(1 for _ in node.get_children())
    if childCount < 2 and kindName not in containerKinds:
        for token in node.get_tokens():
            name = specificNames.get(token.spelling, str(token.kind.name))
            pieces.append(str(name) + "\n")

    pieces.extend(parseNode(child) for child in node.get_children())
    return "".join(pieces)
 
# Call this to get the string form of the "new" CLANG AST
def clangFileAST(filename):
    """Parse `filename` with clang and return its flattened AST as one string.

    The string is produced by parseNode() on the translation unit's root cursor.
    """
    translationUnit = clang.cindex.Index.create().parse(filename)
    rootCursor = translationUnit.cursor
    return parseNode(rootCursor)

# Returns a list of the tokens in the "new" CLANG AST
def clangFileASTList(filename):
    """Return the flattened clang AST of `filename` as a list of token names.

    The flattened string is split on newlines, so whatever follows its final
    newline (an empty string when the text ends with one) is the last element.
    """
    flattened = clangFileAST(filename)
    return flattened.split("\n")

In [None]:
### PICKLE FUNCTIONS ###
# Filename constants - Makes life easier
# Each path is the on-disk cache for one intermediate result of the notebook.
pickleFileFunctions = "./Functions.pkl"            # parsed Juliet functions (functionCore)
pickleFilePairTokens = "./PairTokens.pkl"          # tokens for USING_TOKEN_TYPE == 1
pickleFileSingleTokens = "./SingleTokens.pkl"      # tokens for USING_TOKEN_TYPE == 0
pickleFileClangTokens = "./ClangTokens.pkl"        # tokens for USING_TOKEN_TYPE == 2
pickleFileUnknownFiles = "./UnknownFiles.pkl"      # parsed files from ./TestCode (unknownFiles)
pickleFileExtendedTokens = "./ExtendedTokens.pkl"  # tokens for USING_TOKEN_TYPE == 3
pickleFileHPFiles = "./HPFiles.pkl"                # tokenized VDISC functions (hpList)
pickleFileCustomDataset = "./CustomDataset.pkl"    # not referenced in this part of the notebook

def pickleIntoFile(filename, data):
    """Serialize `data` with pickle and write it to `filename` (overwriting)."""
    with open(filename, "wb") as sink:
        sink.write(pickle.dumps(data))

def pickleOutFromFile(filename):
    """Load and return the object pickled in `filename`.

    NOTE - unpickling executes arbitrary code; only use on files this
    notebook wrote itself.
    """
    with open(filename, "rb") as source:
        restored = pickle.load(source)
    return restored

In [None]:
### PARSING FUNCTIONS ###

# NOTE - This is the most cursed code section
# It's a mixture of file reading, compiling, preprocessing, and pain
# For now this works - If you want to re-do it feel free

def reverseBraces(inputStr):
    leftcount = 0
    rightcount = 0
    sentinel = 0
    newarr = []
    
    for line in inputStr[::-1]:
        newarr.append(line)
        if(sentinel == 1):
            break

        for c in line:
            if(c == "{"):
                leftcount += 1
            elif(c == "}"):
                rightcount += 1

        if(leftcount == rightcount and leftcount > 0):
            sentinel = 1
    
    return "\n".join(newarr[::-1])

def reverseUntilHashtag(inputStr, fileName):
    """Return the trailing lines of preprocessed output that belong to `fileName`.

    Walks the list of lines from the end. Empty lines are dropped, lines
    starting with "#" that mention `fileName` are dropped, and the first "#"
    line that does not mention it ends the walk. The kept lines are joined
    with newlines in their original order.
    """
    kept = []
    for text in reversed(inputStr):
        if not text:
            continue
        if text[0] != "#":
            kept.append(text)
        elif fileName not in text:
            break

    kept.reverse()
    return "\n".join(kept)

def parseUnknownFiles():
    """Preprocess every file under ./TestCode and return the extracted code.

    Each file is run through ./unifdef, written to ./tempFile.c, expanded
    with `gcc -E`, and reduced with reverseUntilHashtag() to the lines that
    belong to the temp file itself.

    Returns a list of (code, typeOfCode, filename) tuples, where typeOfCode
    is the part of the file name before the first "." and after the last "-".
    At most MAX_COUNT files are processed in total.
    """
    functionList = []

    ### THIS IS JUST FOR TESTING, FOR ACTUAL RUNS PUT MAX_COUNT AT LIKE 1000000
    counter = 0
    MAX_COUNT = 10000

    for root, dirs, files in os.walk("./TestCode"):
        for filename in files:
            # Enforce the cap per file, not only between directories
            if(counter >= MAX_COUNT):
                return functionList
            counter += 1
            rFileName = root + "/" + filename

            # Stage 1: resolve conditionals with unifdef
            p1 = subprocess.run(
                ["./unifdef", rFileName],
                capture_output=True,
                encoding="utf-8")
            stage1 = p1.stdout
            # Write with the same encoding the subprocess output was decoded with
            with open("./tempFile.c", "w", encoding="utf-8") as writeTemp:
                writeTemp.write(stage1)

            # Stage 2: run the C preprocessor on the result
            p2 = subprocess.run(
                ["gcc", "-E", "./tempFile.c"],
                capture_output=True,
                encoding="utf-8")
            stage2 = p2.stdout

            code = reverseUntilHashtag(stage2.split("\n"), "./tempFile.c")
            typeOfCode = filename.split(".")[0]
            typeOfCode = typeOfCode.split("-")[-1]
            functionList.append((code, typeOfCode, filename))

    return functionList

def parseJulietFiles(USENUMS):
    """Extract labelled function snippets from the Juliet test cases.

    Walks ./Data/Juliet-C/testcases and, for every accepted .c file, runs
    unifdef + `gcc -E` twice: once with OMITGOOD defined (to obtain the "bad"
    code) and once with OMITBAD defined (to obtain the "good" code). The
    preprocessed text is cut into brace-balanced chunks with reverseBraces().

    USENUMS -- when True, bad chunks are labelled with the second
               "_"-separated field of the directory name; otherwise "1".

    Returns a list of (code, label) tuples; good chunks are labelled "CLEAN".
    """
    functionList = []
    
    # Read in files from folder given filters and parse good & bad functions / sections
    ### THIS IS JUST FOR TESTING, FOR ACTUAL RUNS PUT MAX_COUNT AT LIKE 1000000
    counter = 0
    MAX_COUNT = 1000000

    for root, dirs, files in os.walk("./Data/Juliet-C/testcases"):
        # BREAK CASE
        # NOTE - only checked once per directory, so the cap can be overshot
        if(counter >= MAX_COUNT):
            break

        for filename in files:
            # FILTERS
            # Only directories whose path contains "TYPE_" are used
            if("TYPE_" not in root):
                continue

            # Only .c files; skip names whose stem ends in a lower-case letter
            # (presumably the parts of multi-file test cases - TODO confirm)
            # and anything mentioning "w32"
            if(filename.split(".")[-1] != "c"):
                continue
            if(filename.split(".")[0][-1] in "abcdefghijklmnopqrstuvwxyz"):
                continue
            if("w32" in filename):
                continue
            
            # PASSING CASE
            counter += 1
            rFileName = root + "/" + filename
            
            # Label for the bad code: "1" by default, otherwise taken from the
            # directory name (e.g. a directory named TYPE_BUFFER_BUGS gives "BUFFER").
            # NOTE(review): this uses the LAST path component; a file nested in
            # a sub-directory without "_" in its name would raise IndexError here.
            number = "1"
            if(USENUMS == True):
                u = root.split("/")[-1]
                number = u.split("_")[1]
                    
            ###
            # BAD pass: define OMITGOOD, undefine INCLUDEMAIN, then preprocess

            p1 = subprocess.run(
                ["Data/unifdef-2.12/unifdef", "-D", "OMITGOOD", "-U", "INCLUDEMAIN", rFileName], 
                capture_output=True,
                encoding="utf-8")
            stage1 = p1.stdout
            with open("Data/tempFile.c", "w") as writeTemp:
                writeTemp.write(stage1)

            p2 = subprocess.run(
                ["gcc", "-E", "Data/tempFile.c"], 
                capture_output=True,
                encoding="utf-8")
            stage2 = p2.stdout

            # Keep only the trailing lines that belong to the temp file itself
            badFunctions = reverseUntilHashtag(stage2.split("\n"), "Data/tempFile.c")
            
            ##### ADDED CODE - A
            
            # Split into list of functions and "top stuff"
            # Repeatedly peel the last brace-balanced chunk off the end of the text
            badFunctionList = []
            while(len(badFunctions) > 0):
                badCode = reverseBraces(badFunctions.split("\n"))
                badFunctionList.append(badCode)
                badFunctions = badFunctions[:len(badFunctions) - len(badCode)]
            
            # Keep only the chunks that mention "CWE"
            newBadFunctionList = []
            for x in badFunctionList:
                if("CWE" in x):
                    newBadFunctionList.append(x)
                    
            ##### ADDED CODE - A
            
            ##### REMOVED CODE - B
#             functionList.append((badFunctions, number))
            ##### REMOVED CODE - B
    
            functionList += [(x, number) for x in newBadFunctionList]

            ###
            # GOOD pass: define OMITBAD, undefine INCLUDEMAIN, then preprocess
            # (the same temp file is reused)

            p3 = subprocess.run(
                ["Data/unifdef-2.12/unifdef", "-D", "OMITBAD", "-U", "INCLUDEMAIN", rFileName], 
                capture_output=True,
                encoding="utf-8")
            stage3 = p3.stdout
            with open("Data/tempFile.c", "w") as writeTemp:
                writeTemp.write(stage3)

            p4 = subprocess.run(
                ["gcc", "-E", "Data/tempFile.c"], 
                capture_output=True,
                encoding="utf-8")
            stage4 = p4.stdout
            
            goodFunctions = reverseUntilHashtag(stage4.split("\n"), "Data/tempFile.c")
            
            # Extra 2 lines to remove "good handler-function"
            # (drops the last brace-balanced chunk plus the newline before it)
            bottomGood = reverseBraces(goodFunctions.split("\n"))
            goodFunctions = goodFunctions[:len(goodFunctions) - len(bottomGood) - 1]
            
            # Split into list of functions and "top stuff"
            goodFunctionList = []
            while(len(goodFunctions) > 0):
                goodCode = reverseBraces(goodFunctions.split("\n"))
                goodFunctionList.append(goodCode)
                goodFunctions = goodFunctions[:len(goodFunctions) - len(goodCode)]
            
            # Now a list where *PERHAPS* the last N are general and need to be at the front
            # Chunks were collected bottom-up; restore source order first
            newGoodFunctionList = []
            goodFunctionList = goodFunctionList[::-1]
            
            # Keep only the chunks that contain "static void"
            for x in goodFunctionList:
                if("static void" in x):
                    newGoodFunctionList.append(x)
                
                ##### REMOVED CODE - C
#             if(len(goodFunctionList) != 0):
#                 if("static void" not in goodFunctionList[0]):
#                     for x in range(len(goodFunctionList)-1):
#                         newGoodFunctionList.append(goodFunctionList[0] + goodFunctionList[x+1])
#                 else:
#                     newGoodFunctionList = goodFunctionList    
#             else:
#                 newGoodFunctionList = goodFunctionList
                ##### REMOVED CODE - C
            
            # Add Tuples of Strings of Function[s] to list
            functionList += [(x, "CLEAN") for x in newGoodFunctionList]
            
    # Return
    return functionList

# Function to remove function name and replace with generic "function" (focus more on code not fn name)
def removeFuncName(reg, replace, inputStr):
    """Apply re.sub(reg, replace, ...) to every line of `inputStr`.

    The text is split on newlines, each line is substituted independently
    (so a match can never span lines), and the lines are joined back.
    """
    return "\n".join(re.sub(reg, replace, row) for row in inputStr.split("\n"))

## PICKLE Section

- This just runs the above commands for reading, parsing, and tokenizing the files but pickles them at the end to save time
- It checks for those files in the current directory and if they exist, then it just loads them into memory
    - This saves a lot of time for re-running the program and not re-reading all the files every time

In [None]:
# functionCore: list of (code, label) tuples for the Juliet functions, with
# function names replaced by the generic "function".
functionCore = []

# Preproc and Pickle
if(os.path.isfile(pickleFileFunctions)):
    ## Already Pickled so just load in
    functionCore = pickleOutFromFile(pickleFileFunctions)
else:
    # Parse the files
    functionPairs = parseJulietFiles(True)

    # Some preprocessing and minor validation (drop empty code strings)
    functionPreproc = [x for x in functionPairs if x[0] != ""]

    ## Remove function names
    # Raw string: "\(" must reach the regex engine untouched and is not a
    # valid escape sequence in an ordinary string literal.
    functionCore = [(removeFuncName(r'(good|CWE)(.*)\(', 'function(', x[0]), x[1]) for x in functionPreproc]

    ## Pickling into file the lists of functions as single strings
    pickleIntoFile(pickleFileFunctions, functionCore)

In [None]:
# unknownFiles: list of (code, typeOfCode) tuples for the files in ./TestCode,
# with function names replaced by the generic "function".
unknownFiles = []

# Preproc and Pickle
if(os.path.isfile(pickleFileUnknownFiles)):
    ## Already Pickled so just load in
    unknownFiles = pickleOutFromFile(pickleFileUnknownFiles)
else:
    # Parse the files
    functionPairs = parseUnknownFiles()

    # Some preprocessing and minor validation (drop empty code strings)
    functionPreproc = [x for x in functionPairs if x[0] != ""]

    ## Remove function names
    # Raw string: "\(" must reach the regex engine untouched and is not a
    # valid escape sequence in an ordinary string literal.
    # NOTE - only the first two fields of each parsed tuple are kept.
    unknownFiles = [(removeFuncName(r'(good|CWE)(.*)\(', 'function(', x[0]), x[1]) for x in functionPreproc]

    ## Pickling into file the lists of functions as single strings
    pickleIntoFile(pickleFileUnknownFiles, unknownFiles)

In [None]:
# tokenizedFuncs: list of (tokens, label) tuples built from functionCore using
# the representation selected by USING_TOKEN_TYPE. Each representation has its
# own pickle cache; when the cache file exists it is loaded instead of
# re-tokenizing. Any other USING_TOKEN_TYPE value leaves the list empty.
tokenizedFuncs = []

if(USING_TOKEN_TYPE == 0):
    # Tokenize and Pickle
    if(os.path.isfile(pickleFileSingleTokens)):
        ## Already Tokenized so load
        tokenizedFuncs = pickleOutFromFile(pickleFileSingleTokens)
    else:
        ## Tokenizing Functions
        tokenizedFuncs = [(tokenizeWithoutValue(x[0]), x[1]) for x in functionCore]
        ## Pickling into file the lists of functions as single strings
        pickleIntoFile(pickleFileSingleTokens, tokenizedFuncs)

elif(USING_TOKEN_TYPE == 1):
    if(os.path.isfile(pickleFilePairTokens)):
        tokenizedFuncs = pickleOutFromFile(pickleFilePairTokens)
    else:
        tokenizedFuncs = [(tokenizeString(x[0]), x[1]) for x in functionCore]
        pickleIntoFile(pickleFilePairTokens, tokenizedFuncs)

elif(USING_TOKEN_TYPE == 2):
    if(os.path.isfile(pickleFileClangTokens)):
        tokenizedFuncs = pickleOutFromFile(pickleFileClangTokens)
    else:
        tokenizedFuncs = []
        for x in functionCore:
            # clang parses from disk here, so the file must be fully written
            # and closed before clangFileASTList() reads it.
            with open("tempWriteFile.c", "w") as tempFile:
                tempFile.write(x[0])
            tokenizedFuncs.append((clangFileASTList("tempWriteFile.c"), x[1]))
        pickleIntoFile(pickleFileClangTokens, tokenizedFuncs)

elif(USING_TOKEN_TYPE == 3):
    if(os.path.isfile(pickleFileExtendedTokens)):
        tokenizedFuncs = pickleOutFromFile(pickleFileExtendedTokens)
    else:
        tokenizedFuncs = [(tokenizeExtended(x[0]), x[1]) for x in functionCore]
        pickleIntoFile(pickleFileExtendedTokens, tokenizedFuncs)

In [None]:
# hpList: list of (tokens, label) tuples built from the functions stored in
# VDISC_test.hdf5, tokenized with the representation selected by
# USING_TOKEN_TYPE. Loaded from the pickle cache when it exists.
# NOTE - a cache written by an earlier run is reused as-is; delete
#        HPFiles.pkl to force the data to be rebuilt.
hpList = []

if(os.path.isfile(pickleFileHPFiles)):
    hpList = pickleOutFromFile(pickleFileHPFiles)
else:
    # Everything needed is copied into memory, so the file can be closed
    # as soon as the arrays have been read.
    with h5py.File("VDISC_test.hdf5", "r") as hpFile:
        print(list(hpFile.keys()))

        funcs = np.array(hpFile.get("functionSource"))
        funcs = [x.decode('utf-8') for x in funcs]
        cwe119 = np.array(hpFile.get("CWE-119"))  # BUFFER
        cwe120 = np.array(hpFile.get("CWE-120"))  # BUFFER
        cwe469 = np.array(hpFile.get("CWE-469"))  # MEMORY
        cwe476 = np.array(hpFile.get("CWE-476"))  # MEMORY
        cweOther = np.array(hpFile.get("CWE-other"))  # NUMERICAL (CHEAT)

    def _hpLabel(i):
        # Label of function i; the first matching CWE flag group wins
        if(cwe119[i] or cwe120[i]):
            return "BUFFER"
        if(cwe469[i] or cwe476[i]):
            return "MEMORY"
        if(cweOther[i]):
            # THIS IS A CHEAT FOR RIGHT NOW
            return "NUMERICAL"
        return "CLEAN"

    def _hpTokenize(source):
        # Tokenize one function's source text with the selected representation
        if(USING_TOKEN_TYPE == 0):
            return tokenizeWithoutValue(source)
        if(USING_TOKEN_TYPE == 1):
            return tokenizeString(source)
        if(USING_TOKEN_TYPE == 2):
            # Write the WHOLE source and close the file before clang reads it
            with open("tempWriteFile.c", "w") as tempFile:
                tempFile.write(source)
            return clangFileASTList("tempWriteFile.c")
        return tokenizeExtended(source)

    # Any other USING_TOKEN_TYPE leaves hpList empty
    if(USING_TOKEN_TYPE in (0, 1, 2, 3)):
        hpList = [(_hpTokenize(x), _hpLabel(i)) for i, x in enumerate(funcs)]

    pickleIntoFile(pickleFileHPFiles, hpList)

---

# Machine Learning Setup Section
- Turn (a list of a) list of tokens into ngrams of a given length
- Output confusion matrix, accuracy and precision scores, and TP/FP/TN/FN results
- Print ROC Curves for different classifiers
- Prediction functions for the RFC
- Prediction functions for the LSTM and CNN

---

In [None]:
### GENERAL USE FUNCTIONS ###
def create_ngrams(listOfTokens, n):
    """Turn each token sequence into the list of its contiguous n-grams.

    listOfTokens -- list of token sequences
    n            -- n-gram length

    Returns one list per input sequence, each holding tuples of `n`
    consecutive tokens. A sequence of length L yields L - n + 1 n-grams
    (including the final one); a sequence shorter than `n` yields none.
    """
    return [
        [tuple(k[i:i+n]) for i in range(len(k) - n + 1)]
        for k in listOfTokens
    ]

def outputResults(yT, yP, fName, TF):
    """Print and plot the evaluation of predictions `yP` against truth `yT`.

    yT    -- true labels
    yP    -- predicted labels
    fName -- title for the report; also used as "<fName>.png" for the saved
             heatmap. Pass None to skip both the title and the file.
    TF    -- when true and the confusion matrix is 2x2, also print the
             true/false positive and negative counts and their ratios.
    """
    # Print Title (File Name)
    if(fName is not None):
        half = "-" * ((64 - len(fName))//2)
        print(half + fName + half)

    # Confusion Matrix Construction
    C = confusion_matrix(yT, yP)

    # Labels: the matrix covers every label present in yT OR yP, so the tick
    # labels must come from the union or they would not line up with it.
    labs = sorted(set(yT) | set(yP))

    # Figure Plot
    fig, ax = plt.subplots()
    ax = sns.heatmap(C, square=True, annot=True, cbar=False,
                    fmt='g', cmap='viridis', norm=colors.LogNorm(),
                    xticklabels=labs, yticklabels=labs)
    plt.xlabel("Predicted Value")
    plt.ylabel("Actual Value")
    if(fName is not None):
        plt.savefig(f"{fName}.png")
    plt.show()

    # Print Out Classification Report
    print(classification_report(yT, yP))
    print("="*64)

    # True / False Ratios (Only make sense for binary classifications)
    if(TF and len(C) == 2):
        tp = C[1][1]
        fp = C[0][1]
        tn = C[0][0]
        fn = C[1][0]

        # Avoid dividing by zero when a column of the matrix is empty
        positiveRatio = tp / (tp + fp) if (tp + fp) else float("nan")
        negativeRatio = tn / (tn + fn) if (tn + fn) else float("nan")

        print("="*32)
        print("True Positive: " + str(tp))
        print("False Positive: " + str(fp))
        print("Ratio: " + str(positiveRatio))
        print("="*32)
        print("True Negative: " + str(tn))
        print("False Negative: " + str(fn))
        print("Ratio: " + str(negativeRatio))
        print("="*32)

In [None]:
### (TFIDF + RFC) RELATED CODE ####
# ROC Curve (MIGHT MOVE LATER)
def printRFCROCCurve(MODEL, TFIDF, xList, yList):
    """Plot ROC curves for a fitted TF-IDF + classifier pair.

    MODEL -- fitted classifier exposing predict_proba()
    TFIDF -- fitted vectorizer, applied through getTransformedData()
    xList -- list of token sequences
    yList -- true labels for xList

    When the binarized labels have a single column, one "GOOD vs BAD" curve
    is drawn; otherwise one "<label> vs REST" curve is drawn per label.
    """
    fpr = dict()
    tpr = dict()
    threshold = dict()
    roc_auc = dict()

    # Label/Classes
    labs = sorted(set(yList))

    # Binarize Classes
    yBinarized = label_binarize(yList, classes=labs)

    if(len(yBinarized[0]) == 1):
        yTrue = [int(x) for x in yList]
        probabilities = MODEL.predict_proba(getTransformedData(TFIDF, xList, NGRAMS))
        if(len(CLASSDICT) != 2):
            # Multiclass model scored as binary: use the largest probability
            # over every column except column 1.
            # NOTE(review): this assumes column 1 is the clean class - confirm
            # against MODEL.classes_.
            predCols = [max(x[0], max(x[2:])) for x in probabilities]
        else:
            predCols = [x[1] for x in probabilities]

        fpr, tpr, threshold = roc_curve(yTrue, predCols)
        roc_auc = auc(fpr, tpr)

        # Plots
        plt.plot(fpr, tpr, linestyle="--", label=f"GOOD vs BAD | AUC = {str(roc_auc)[:5]}")

    else:
        # The probabilities do not depend on the class, so compute them once
        probabilities = MODEL.predict_proba(getTransformedData(TFIDF, xList, NGRAMS))

        # NOTE(review): column i of the probabilities is paired with labs[i];
        # this assumes the model's class order matches the sorted labels.
        for i in range(len(labs)):
            binCols = [x[i] for x in yBinarized]
            predCols = [x[i] for x in probabilities]

            fpr[i], tpr[i], threshold[i] = roc_curve(binCols, predCols)
            roc_auc[i] = auc(fpr[i], tpr[i])

            # Plots
            plt.plot(fpr[i], tpr[i], linestyle="--", label=f"{labs[i]} vs REST | AUC = {str(roc_auc[i])[:5]}")

    # Diagonal reference line and axes limits
    plt.plot([0,1], [0,1], "--", color="black")
    plt.xlim([0,1])
    plt.ylim([0,1])

    if(USING_TOKEN_TYPE == 0):
        plt.title("ROC Curve for RFC - SIMPLE TOKENS")
    elif(USING_TOKEN_TYPE == 2):
        plt.title("ROC Curve for RFC - CLANG TOKENS")
    elif(USING_TOKEN_TYPE == 3):
        plt.title("ROC Curve for RFC - EXTENDED TOKENS")

    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")
    plt.grid(True)
    plt.legend(loc="lower right")
    plt.show()


def printCNNROCCurve(MODEL, xList, yList, pad, tokens):
    """Plot one-vs-rest ROC curves for a PyTorch model.

    MODEL  -- callable model; MODEL(x)[i] is used as the score of class i
              (assumes the output can be indexed by class - TODO confirm)
    xList  -- list of input tensors
    yList  -- true labels for xList
    pad    -- unused; kept for interface compatibility
    tokens -- unused; kept for interface compatibility
    """
    fpr = dict()
    tpr = dict()
    threshold = dict()
    roc_auc = dict()

    # Label/Classes
    labs = sorted(set(yList))

    # Binarize Classes
    yBinarized = label_binarize(yList, classes=labs)

    # Run the model ONCE per input (the output does not depend on the class),
    # without tracking gradients.
    outputs = []
    with torch.no_grad():
        for u in xList:
            f = u.type(torch.FloatTensor)
            newF = f.to(DEVICE)
            outputs.append(MODEL(newF))
            del newF

    for i in range(len(labs)):
        binCols = [x[i] for x in yBinarized]
        # Plain Python floats, so sklearn never receives GPU or
        # gradient-tracking tensors.
        predCols = [float(out[i]) for out in outputs]

        # Generate Results
        fpr[i], tpr[i], threshold[i] = roc_curve(binCols, predCols)
        roc_auc[i] = auc(fpr[i], tpr[i])

        # Plots
        plt.plot(fpr[i], tpr[i], linestyle="--", label=f"{labs[i]} vs REST | AUC = {str(roc_auc[i])[:5]}")

    # Diagonal reference line and axes limits
    plt.plot([0,1], [0,1], "--", color="black")
    plt.xlim([0,1])
    plt.ylim([0,1])
    if(USING_TOKEN_TYPE == 0):
        plt.title("ROC Curve for CNN - SIMPLE TOKENS")
    elif(USING_TOKEN_TYPE == 2):
        plt.title("ROC Curve for CNN - CLANG TOKENS")
    elif(USING_TOKEN_TYPE == 3):
        plt.title("ROC Curve for CNN - EXTENDED TOKENS")
    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")
    plt.grid(True)
    plt.legend(loc="lower right")
    plt.show()


def predictCode(TFIDF, MODEL, CODE, FN, N_NGRAMS):
    """Predict the class of a single piece of code.

    CODE is tokenized with FN, turned into n-grams of length N_NGRAMS,
    vectorized with TFIDF, and passed to MODEL.predict().
    """
    ngrams = create_ngrams([FN(CODE)], N_NGRAMS)
    return MODEL.predict(TFIDF.transform(ngrams))

def getTransformedData(TFIDF, TOKENLIST, N_NGRAMS):
    """Return the TFIDF transform of the n-grams built from TOKENLIST.

    TOKENLIST here is the list of token sequences to transform.
    """
    return TFIDF.transform(create_ngrams(TOKENLIST, N_NGRAMS))
    
def predictTokens(TFIDF, MODEL, TOKENLIST, N_NGRAMS):
    """Predict the class of every token sequence in TOKENLIST.

    The sequences are turned into n-grams of length N_NGRAMS, vectorized
    with TFIDF, and passed to MODEL.predict().
    """
    ngrams = create_ngrams(TOKENLIST, N_NGRAMS)
    features = TFIDF.transform(ngrams)
    return MODEL.predict(features)

def trainRFC(N_NGRAMS, N_ESTIMATORS, SPLITSIZE, LIST, TF, FNAME):
    """Train a TF-IDF + random forest classifier and report its results.

    N_NGRAMS     -- n-gram length used for the features
    N_ESTIMATORS -- number of trees in the forest
    SPLITSIZE    -- test_size passed to train_test_split
    LIST         -- list of (tokens, label) tuples; it is not modified
    TF           -- forwarded to outputResults() (print TP/FP/TN/FN ratios)
    FNAME        -- forwarded to outputResults() (report title / image name)

    Returns (tfidf, model): the fitted vectorizer and the trained classifier.
    """
    # Final Preprocessing: shuffle a COPY so the caller's list is left untouched
    LOCALLIST = list(LIST)
    shuffle(LOCALLIST)

    # Create X (n-gram lists) and Y (labels) for the data
    funcsX = create_ngrams([x[0] for x in LOCALLIST], N_NGRAMS)
    funcsY = [x[1] for x in LOCALLIST]

    # Split into Train and Test Data
    xTrain, xTest, yTrain, yTest = train_test_split(funcsX, funcsY, test_size=SPLITSIZE)

    # TFIDF Vectorization: the inputs are already lists of n-gram tuples, so
    # the tokenizer and preprocessor are identity functions. The vectorizer is
    # fitted on the training data only, so no test data leaks into it.
    tfidf = TfidfVectorizer(tokenizer=lambda x: x, preprocessor=lambda y: y)
    trainMatrix = tfidf.fit_transform(xTrain)
    testMatrix = tfidf.transform(xTest)

    # CREATE & TRAIN RFC
    model = RandomForestClassifier(n_estimators=N_ESTIMATORS)
    model.fit(trainMatrix, yTrain)
    yPred = model.predict(testMatrix)

    # Output Results
    outputResults(yTest, yPred, FNAME, TF)

    # Return Trained RFC Model
    return tfidf, model

In [None]:
def createTensor(seq, pad):
    """One-hot encode a token sequence into a (pad, vocabulary size) tensor.

    The vocabulary is selected by the global USING_TOKEN_TYPE:
        0 -> TOKENDICT, 2 -> CLANGTOKENS, 3 -> EXTENDEDTOKENS / INVEXTENDEDTOKENS

    Only the first `pad` tokens of `seq` are encoded; rows past the end of a
    shorter sequence stay all-zero.

    For token types 0 and 2 a token missing from the vocabulary is encoded at
    index 0. For token type 3 a missing token raises IndexError.

    Raises:
        ValueError -- USING_TOKEN_TYPE is not 0, 2 or 3
        IndexError -- token type 3 and a token is not in INVEXTENDEDTOKENS
    """
    if USING_TOKEN_TYPE == 0:
        width = len(TOKENDICT)
        lookup = TOKENDICT
        strict = False
    elif USING_TOKEN_TYPE == 2:
        width = len(CLANGTOKENS)
        lookup = CLANGTOKENS
        strict = False
    elif USING_TOKEN_TYPE == 3:
        width = len(EXTENDEDTOKENS)
        # INVEXTENDEDTOKENS is read as index -> token. Invert it once per call
        # (keeping the first index of a duplicated token) instead of scanning
        # it again for every token of the sequence.
        lookup = {}
        for idx in INVEXTENDEDTOKENS:
            lookup.setdefault(INVEXTENDEDTOKENS[idx], idx)
        strict = True
    else:
        raise ValueError(f"createTensor: unsupported USING_TOKEN_TYPE {USING_TOKEN_TYPE!r}")

    # torch.zeros already leaves the padding rows zeroed
    tensor = torch.zeros(pad, width)
    for pos in range(min(pad, len(seq))):
        token = seq[pos]
        if strict:
            if token not in lookup:
                raise IndexError(f"createTensor: token {token!r} is not in INVEXTENDEDTOKENS")
            index = lookup[token]
        else:
            index = lookup.get(token, 0)
        tensor[pos][index] = 1

    return tensor

def getCategoryFromOutputTensor(out):
    """Return the CLASSDICT entry for the position of the largest value in `out`."""
    winner = torch.argmax(out)
    return CLASSDICT[winner.item()]

def predict(MODEL, inputFunc, isLast):
    """Run MODEL on inputFunc without gradient tracking and return the predicted category.

    When isLast is true only the last element of the model output is
    classified; otherwise the whole output tensor is.
    """
    with torch.no_grad():
        scores = MODEL(inputFunc)
        chosen = scores[-1] if isLast else scores
        return getCategoryFromOutputTensor(chosen)
    
def predictFunctions(MODEL, funcList, isLast):
    """Predict a category for every tensor in funcList.

    Each tensor is moved to DEVICE just before it is passed to predict, so
    only one input at a time is held on the device.
    """
    return [predict(MODEL, item.to(DEVICE), isLast) for item in funcList]

In [None]:
### RNN RELATED CODE ###
class LSTM(nn.Module):
    """LSTM followed by two linear layers that produce per-position class logits.

    Constructor arguments:
        inSize      -- feature size of each input position
        lstmLayers  -- number of stacked LSTM layers
        hiddenSize1 -- LSTM hidden size
        hiddenSize2 -- width of the intermediate linear layer
        numClasses  -- number of output classes
        direction   -- True for a bidirectional LSTM
    """

    def __init__(self, inSize, lstmLayers, hiddenSize1, hiddenSize2, numClasses, direction):
        super(LSTM, self).__init__()
        self.lstm = nn.LSTM(inSize, hiddenSize1, num_layers=lstmLayers, bidirectional=direction)

        # A bidirectional LSTM concatenates both directions, doubling the feature width
        lstmOutSize = hiddenSize1 * 2 if direction else hiddenSize1
        self.lin1 = nn.Linear(lstmOutSize, hiddenSize2)
        self.lin2 = nn.Linear(hiddenSize2, numClasses)
        self.drop = nn.Dropout(0.25)
        # Not used in forward(): nn.CrossEntropyLoss applies log-softmax itself.
        # Kept so code that accesses `model.soft` keeps working.
        self.soft = nn.Softmax(dim=1)

    def forward(self, x):
        """Return raw (unnormalized) class logits for every position of x."""
        outA, _ = self.lstm(x)
        outB = F.relu(self.drop(self.lin1(outA)))
        # No dropout / ReLU on the final layer: the loss expects unrestricted
        # logits, and clamping or randomly zeroing class scores corrupts them.
        return self.lin2(outB)

In [None]:
### CNN RELATED CODE ###
class CNN(nn.Module):
    """Two 1-D convolutions followed by two linear layers.

    Constructor arguments:
        inSize     -- number of input channels
        convSizes  -- output channels of the first and second convolution
        linSizes   -- linSizes[0] is the width of the hidden linear layer
        numClasses -- number of output classes

    The first linear layer is sized from the global PADDING_LEN, and
    forward() flattens every dimension of the convolution output, so the
    network handles one PADDING_LEN-long sample per call.
    """

    def __init__(self, inSize, convSizes, linSizes, numClasses):
        super().__init__()
        self.kernels = [16, 8]
        firstKernel, secondKernel = self.kernels[0], self.kernels[1]
        firstChannels, secondChannels = convSizes[0], convSizes[1]
        hiddenWidth = linSizes[0]

        self.conv1 = nn.Conv1d(inSize, firstChannels, firstKernel)
        self.conv2 = nn.Conv1d(firstChannels, secondChannels, secondKernel)

        # Each convolution shortens the sequence by (kernel - 1)
        convOutLen = PADDING_LEN - sum(self.kernels) + len(self.kernels)
        self.lin1 = nn.Linear(secondChannels * convOutLen, hiddenWidth)
        self.lin2 = nn.Linear(hiddenWidth, numClasses)

    def forward(self, x):
        """Return a 1-D tensor of class logits for x."""
        features = F.relu(self.conv1(x))
        features = F.relu(self.conv2(features))
        flat = torch.flatten(features)
        hidden = F.relu(self.lin1(flat))
        return self.lin2(hidden)

---

# Functionality Section
### Final Before ML Stuff
1. Separate the tokenizedFuncs list into "good" and "bad" lists so they can be balanced later
2. Shuffle these lists so that every function has a "fair" chance of being selected
3. Balance the lists (same number of good and bad) and split them into Training (both train and test) and Validation lists

---

In [None]:
# Split each corpus into buggy ("bad": any label other than "CLEAN") and clean ("good") functions
badJulietList = [x for x in tokenizedFuncs if x[1] != "CLEAN"]
goodJulietList = [x for x in tokenizedFuncs if x[1] == "CLEAN"]

badHPList = [x for x in hpList if x[1] != "CLEAN"]
goodHPList = [x for x in hpList if x[1] == "CLEAN"]

# Shuffle so the slices taken below are random samples
shuffle(goodHPList)
shuffle(badHPList)
shuffle(goodJulietList)
shuffle(badJulietList)

# Number of buggy functions from each corpus that go into training
JNUM = int(0.33 * len(badJulietList))
HNUM = int(0.5 * len(badHPList))

RAT = 1.0

# Whole-number count of clean functions taken per buggy function in each corpus
JRAT = int(RAT * len(goodJulietList) / len(badJulietList))
HRAT = int(RAT * len(goodHPList) / len(badHPList))

badList = badJulietList[:JNUM] + badHPList[:HNUM]
badValidateList = badJulietList[JNUM:] # + badHPList[HNUM:]

goodList = goodJulietList[:JRAT * JNUM] + goodHPList[:HRAT * HNUM]
goodValidateList = goodJulietList[JRAT * JNUM:] # + goodHPList[HRAT * HNUM: (HRAT*HNUM + len(goodJulietList))]

### Binary Training Use
# Collapse every label to "0" (clean) / "1" (buggy)
if(USING_CLASSES == 1):
    goodList = [(x[0], "0") for x in goodList]
    badList = [(x[0], "1") for x in badList]

    goodValidateList = [(x[0], "0") for x in goodValidateList]
    badValidateList = [(x[0], "1") for x in badValidateList]

trainList = badList + goodList
validateList = badValidateList + goodValidateList

shuffle(trainList)
shuffle(validateList)

# Extract X / Y only AFTER shuffling. Taken before the shuffle they stay in
# "all buggy, then all clean" order, so any code that reads a leading slice
# of them would see a single class.
validateListX = [x[0] for x in validateList]
validateListY = [x[1] for x in validateList]

# NOTE(review): in the Test rows "Good Label" reports the Juliet lists, so the
# Train rows are matched the same way: Good Label = Juliet, Bad Label = HP.
# The Clean/Juliet and Buggy/HP counts were previously printed under each other's label.
print("="*40)
print(f"Good Label\t| Buggy\t| Train = {JNUM}")
print(f"Good Label\t| Clean\t| Train = {JNUM * JRAT}")
print(f"Bad Label\t| Buggy\t| Train = {HNUM}")
print(f"Bad Label\t| Clean\t| Train = {HNUM * HRAT}")
print("="*40)
print(f"Good Label\t| Buggy\t| Test = {len(badJulietList) - JNUM}")
print(f"Good Label\t| Clean\t| Test = {len(goodJulietList) - (JNUM*JRAT)}")
print("="*40)
print(f"Len of Train = {len(trainList)}")
print(f"Len of Test = {len(validateList)}")
print(f"Ratio Train/Test = {len(trainList) / len(validateList)}")
print("="*40)

In [None]:
# IGNORE THIS CODE
# This was an attempt at using "unknown" files and testing them but it did not yield good results

# unknownList = []

# if(USING_TOKEN_TYPE == 0):
#     unknownList = [(tokenizeWithoutValue(x[0]), x[1]) for x in unknownFiles]
# elif(USING_TOKEN_TYPE == 1):
#     unknownList = [(tokenizeString(x[0]), x[1]) for x in unknownFiles]
# elif(USING_TOKEN_TYPE == 2):
#     for x in unknownFiles:
#         f = open("tempWriteFile.c", "w")
#         f.write(x[0])
#         unknownList.append((clangFileASTList("tempWriteFile.c"), x[1]))
#         f.close()
# elif(USING_TOKEN_TYPE == 3):
#     unknownList = [(tokenizeExtended(x[0]), x[1]) for x in unknownFiles]

In [None]:
def oneHotEncode(inputList, tokenList, encType, padLen):
    """Encode the token sequence of every sample in inputList as a tensor.

    Parameters:
        inputList -- sequence of samples; sample[0] is the token sequence
        tokenList -- dict mapping token -> integer index
        encType   -- 0: one-hot matrix per sample, shape (padLen, NUM_TOKENS)
                     1: index vector per sample, shape (padLen,), padded with -1
        padLen    -- sequences are truncated / padded to this length

    A token that is not a key of tokenList is encoded as index 0.

    Returns:
        A list with one tensor per sample, or an empty list for any other encType.
    """
    if encType == 0:
        # [functionX BY lengthOfFuncX BY numOfTokens] where token type = 1
        ohe = []
        for sample in inputList:
            tokens = sample[0]
            # torch.zeros already leaves the padding rows zeroed
            matrix = torch.zeros(padLen, NUM_TOKENS)
            for pos in range(min(len(tokens), padLen)):
                matrix[pos][tokenList.get(tokens[pos], 0)] = 1
            ohe.append(matrix)
        return ohe

    if encType == 1:
        # [functionX BY lengthOfFuncX] where VALUE = token type
        ohe = []
        for sample in inputList:
            tokens = sample[0]
            used = min(len(tokens), padLen)
            row = torch.zeros(padLen)
            for pos in range(used):
                row[pos] = tokenList.get(tokens[pos], 0)
            # Positions past the end of the sequence are marked with -1
            row[used:] = -1
            ohe.append(row)
        return ohe

    return []

## RFC Running Code

In [None]:
# Fit the vectorizer + random forest on the training list (trainRFC reports its own held-out split)
TFIDF, RFC = trainRFC(
    N_NGRAMS=NGRAMS,
    N_ESTIMATORS=ESTIMATORS,
    SPLITSIZE=TESTTRAINRATIO,
    LIST=trainList,
    TF=True,
    FNAME=f"CWE-{NGRAMS}-{ESTIMATORS}-Training",
)

# Score the validation set with the fitted pair and report the results
predicted = predictTokens(TFIDF=TFIDF, MODEL=RFC, TOKENLIST=validateListX, N_NGRAMS=NGRAMS)
outputResults(validateListY, predicted, f"CWE-{NGRAMS}-{ESTIMATORS}-Testing", True)

# Alternative report that collapses every non-"CLEAN" label into "1":
# outputResults(["0" if x == "CLEAN" else "1" for x in validateListY], ["0" if x == "CLEAN" else "1" for x in predicted], f"CWE-{NGRAMS}-{ESTIMATORS}-Testing", True)

# The predictions are not needed after reporting
del predicted

In [None]:
# Presumably plots the ROC curve of the trained forest on the validation data
# (printRFCROCCurve is defined elsewhere -- TODO confirm)
printRFCROCCurve(RFC, TFIDF, validateListX, validateListY)

In [None]:
# printRFCROCCurve(RFC, TFIDF, validateListX, ["0" if x == "CLEAN" else "1" for x in validateListY])

In [None]:
# unknownPredict = predictTokens(TFIDF, RFC, [x[0] for x in unknownList], NGRAMS)

# outputResults([x[1] for x in unknownList], unknownPredict, None, True)

# outputResults(["0" if x[1] == "CLEAN" else "1" for x in unknownList], ["0" if x == "CLEAN" else "1" for x in unknownPredict], None, True)

# del unknownPredict

# probs = RFC.predict_proba(getTransformedData(TFIDF, [x[0] for x in unknownList], NGRAMS))

## Pre-processing for LSTM and CNN (NEED TO RUN)

In [None]:
# Token vocabulary (token -> index) used by the LSTM / CNN cells
L = None

# 0 -> Pad all token sequences to PADDING_LEN
# 1 -> Do not pad the sequences (error when using CNN)
ENCODING_TYPE = 0

PADDING_LEN = -1

if(USING_TOKEN_TYPE == 0):
    L = TOKENDICT
    PADDING_LEN = 1536
elif(USING_TOKEN_TYPE == 2):
    L = CLANGTOKENS
    PADDING_LEN = 1536
elif(USING_TOKEN_TYPE == 3):
    L = EXTENDEDTOKENS
    PADDING_LEN = 1536
else:
    # Without this, L stays None and the failure shows up as an opaque
    # "object of type 'NoneType' has no len()" on the next statement.
    raise ValueError(f"No LSTM/CNN vocabulary is defined for USING_TOKEN_TYPE {USING_TOKEN_TYPE!r}")

NUM_TOKENS = len(L)
print(f"#TOKENS = {NUM_TOKENS}")

# if(USING_TOKEN_TYPE == 0):
#     oneHotEncodingFunctions = oneHotEncode(trainList, L, ENCODING_TYPE, PADDING_LEN)

# One (1 x N_CLASSES) one-hot tensor per training sample, in trainList order.
# The training cells pair these with trainList by position, so trainList must
# not be reordered after this cell has run.
oneHotEncodingCategories = [torch.zeros(1, N_CLASSES) for _ in trainList]

for u, sample in enumerate(trainList):
    # CLASSDICT maps class index -> label; find the index of this sample's label
    index = [i for i in CLASSDICT if CLASSDICT[i] == sample[1]][0]
    oneHotEncodingCategories[u][0][index] = 1
# [classificationX BY 1 BY numOfClasses] where classification type = 1

In [None]:
# print(CLASSDICT[1])

In [None]:
# Length of the longest token sequence in the validation set (0 when the set is empty)
maxLen = max((len(seq) for seq in validateListX), default=0)
print(maxLen)

In [None]:
# Size of ONE category tensor (each entry is created as torch.zeros(1, N_CLASSES))
print(oneHotEncodingCategories[0].size())
# The whole list is [#funcs BY 1 BY numOfClasses]

# Disabled: oneHotEncodingFunctions is only built by the commented-out oneHotEncode call
# print(oneHotEncodingFunctions[0].size())
# [#funcs BY padding BY NumTokens]
# print(oneHotEncodingFunctions[0])

## LSTM Code

In [None]:
# Network shape
N_LSTM_LAYERS = 3
HIDDEN_DIM_1 = 128
HIDDEN_DIM_2 = 64
BIDIRECTIONAL = False

# Training schedule
N_EPOCHS = 2

# Learning rate per token type; any token type not listed uses 0.15
LEARNING_RATE = {0: 0.033, 3: 0.1}.get(USING_TOKEN_TYPE, 0.15)

rnn = LSTM(
    len(L),
    N_LSTM_LAYERS,
    HIDDEN_DIM_1,
    HIDDEN_DIM_2,
    N_CLASSES,
    BIDIRECTIONAL,
).to(DEVICE)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(rnn.parameters(), lr=LEARNING_RATE, momentum=0.5)

In [None]:
# Train the LSTM one sample at a time, then score part of the validation set after every epoch.
for E in range(N_EPOCHS):
    avgLoss = 0
    accScore = 0
    mod = 7500  # print progress every `mod` samples

    # Dropout must be active while training; it is switched off again for the evaluation below
    rnn.train()

    # Trying to reduce memory usage:
    # keep only the raw token lists and build each one-hot matrix right before it is used.
    newList = list(zip([x[0] for x in trainList], oneHotEncodingCategories))
    shuffle(newList)

    for i, (tokens, cats) in enumerate(newList):
        # One-hot encode the first PADDING_LEN tokens; rows past the end stay zero.
        # Tokens missing from L are encoded at index 0.
        temp = torch.zeros(PADDING_LEN, NUM_TOKENS)
        for v in range(min(len(tokens), PADDING_LEN)):
            temp[v][L.get(tokens[v], 0)] = 1

        funcs = temp.type(torch.FloatTensor)
        funcs = funcs.to(DEVICE)

        cats = cats.type(torch.FloatTensor)
        cats = cats.to(DEVICE)

        optimizer.zero_grad()
        outs = rnn(funcs)
        # Only the output of the last sequence position is compared with the target
        loss = criterion(outs[-1], cats[0])
        loss.backward()
        optimizer.step()

        avgLoss += loss.item()
        if torch.argmax(outs[-1]).item() == torch.argmax(cats[0]).item():
            accScore += 1

        # i is zero-based, so i + 1 samples have been processed at this point
        done = i + 1
        if done % mod == 0:
            print(f"DONE {done} ITERATIONS" + f" || {str(100 * done / len(newList))[:5]}% COMPLETE" + f" || AVG LOSS {avgLoss / done}" + f" || ACCURACY = {accScore / done}")
            print("="*96)

    print(f"---- EPOCH {E+1} COMPLETE ----\n")
    ###
    # Evaluate with dropout disabled so the predictions are deterministic
    rnn.eval()

    # Score the first fifth of validateListX (in list order)
    randFuncs = []
    randCategories = []
    for ri in range(len(validateListX)//5):
        # ri = randint(0, len(validateListX)-1)
        tokens = validateListX[ri]

        temp = torch.zeros(PADDING_LEN, NUM_TOKENS)
        for v in range(min(len(tokens), PADDING_LEN)):
            temp[v][L.get(tokens[v], 0)] = 1

        randFuncs.append(temp.type(torch.FloatTensor))
        randCategories.append(validateListY[ri])
    predicateT = predictFunctions(rnn, randFuncs, True)
    outputResults(randCategories, predicateT, None, True)

    print("~"*64)
    # predV = [createTensor(u) for u in validateListX]
    # predicateV = predictFunctions(rnn, predV, True)
    # outputResults(validateListY, predicateV, None, True)
    ###

In [None]:
###
# Run the LSTM over the whole validation set and plot a CLEAN-vs-BUGGY ROC curve.
randOuts = []   # raw model output (last sequence position) per validation sample
predVals = []
# Only needed as dicts by the commented-out per-class variant below
fpr = dict()
tpr = dict()
threshold = dict()
roc_auc = dict()

# Label/Classes
labs = sorted(set(validateListY))

# Binarize Classes
yBinarized = label_binarize(validateListY, classes=labs)
ll = len(validateListX)

# Inference must run with dropout disabled
rnn.eval()

for newF in validateListX:
    # One-hot encode the first PADDING_LEN tokens; rows past the end stay zero.
    # Tokens missing from L are encoded at index 0.
    temp = torch.zeros(PADDING_LEN, NUM_TOKENS)
    for v in range(min(len(newF), PADDING_LEN)):
        temp[v][L.get(newF[v], 0)] = 1
    temp = temp.type(torch.FloatTensor)

    with torch.no_grad():
        randOuts.append(rnn(temp.to(DEVICE)).cpu()[-1])

    del temp

######

# for i in range(len(labs)):
#     # Generate Results
#     fpr[i], tpr[i], threshold[i] = roc_curve([x[i] for x in yBinarized[:ll]], [e[i] for e in randOuts])
#     roc_auc[i] = auc(fpr[i], tpr[i])
#     # Plots
#     plt.plot(fpr[i], tpr[i], linestyle="--", label=f"{labs[i]} vs REST | AUC = {str(roc_auc[i])[:5]}")
#     print(f"DONE {i+1} CLASSES")

######
# Score = probability of class index 1 (Use for Binary).
# The raw class-1 logit alone is not a consistent ranking score: the model's
# decision depends on it relative to the other logits, which softmax accounts for.
# Plain floats (not tensors) are handed to sklearn.
# Multiclass alternative on the raw outputs: x = max(r[0], max(r[2:]))
predCols = [torch.softmax(r, dim=-1)[1].item() for r in randOuts]

# fpr, tpr, threshold = roc_curve([0 if x == "CLEAN" else 1 for x in validateListY], predCols) # Use for multiclass
fpr, tpr, threshold = roc_curve([0 if x == "0" else 1 for x in validateListY], predCols) # Use for binary
roc_auc = auc(fpr, tpr)


# Plots
plt.plot(fpr, tpr, linestyle="--", label=f"CLEAN vs BUGGY | AUC = {str(roc_auc)[:5]}")

######


###
# Diagonal = chance level
plt.plot([0,1], [0,1], "--", color="black")
plt.xlim([0,1])
plt.ylim([0,1])
if(USING_TOKEN_TYPE == 0):
    plt.title("ROC Curve for LSTM - SIMPLE TOKENS")
elif(USING_TOKEN_TYPE == 2):
    plt.title("ROC Curve for LSTM - CLANG TOKENS")
elif(USING_TOKEN_TYPE == 3):
    plt.title("ROC Curve for LSTM - EXTENDED TOKENS")
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.grid(True)
plt.legend(loc="lower right")
plt.show()
###

In [None]:
# Show one (true label, raw model output) pair as a sanity check
print(validateListY[0], randOuts[0])
# Report the predicted category of every collected output against the true labels
outputResults(validateListY, list(map(getCategoryFromOutputTensor, randOuts)), None, False)

## CNN CODE

In [None]:
# Using Original Token Types padded to 1536 (Very good for known-files / UNK for kernel files)
# Using CLANG Token Types padded to 3072 ([SLOW] UNK for known-files / UNK for kernel files)

N_EPOCHS = 2
cnn = CNN(len(L), [256, 128], [64], N_CLASSES).to(DEVICE)
criterion = nn.CrossEntropyLoss()

# (learning rate, momentum) per token type; any token type not listed uses (0.01, 0.33)
LEARNING_RATE, MOMENTUM = {
    2: (0.01, 0.66),
    3: (0.002, 0.25),
}.get(USING_TOKEN_TYPE, (0.01, 0.33))
optimizer = torch.optim.SGD(cnn.parameters(), lr=LEARNING_RATE, momentum=MOMENTUM)

In [None]:
# Train the CNN one sample at a time; loss / accuracy are reported per window of `mod` samples.
for E in range(N_EPOCHS):
    avgLoss = 0
    accScore = 0
    mod = 7500  # window size for the progress report

    # Trying to reduce memory usage:
    # keep only the raw token lists and build each one-hot matrix right before it is used.
    newList = list(zip([x[0] for x in trainList], oneHotEncodingCategories))
    shuffle(newList)

    for i, (tokens, cats) in enumerate(newList):
        # One-hot encode the first PADDING_LEN tokens; rows past the end stay zero.
        # Tokens missing from L are encoded at index 0.
        temp = torch.zeros(PADDING_LEN, NUM_TOKENS)
        for v in range(min(len(tokens), PADDING_LEN)):
            temp[v][L.get(tokens[v], 0)] = 1

        funcs = temp.type(torch.FloatTensor)

        # (PADDING_LEN, NUM_TOKENS) -> (1, NUM_TOKENS, PADDING_LEN): tokens become Conv1d channels
        funcs = torch.unsqueeze(funcs, 0)
        funcs = torch.permute(funcs, (0, 2, 1))
        funcs = funcs.to(DEVICE)

        cats = cats.type(torch.FloatTensor)
        cats = cats.to(DEVICE)

        optimizer.zero_grad()
        outs = cnn(funcs)
        loss = criterion(outs, cats[0])
        loss.backward()
        optimizer.step()

        # Free the large encoded input before the next sample is built
        del temp
        del funcs

        avgLoss += loss.item()
        if torch.argmax(outs).item() == torch.argmax(cats[0]).item():
            accScore += 1

        # i is zero-based, so i + 1 samples have been processed at this point
        done = i + 1
        if done % mod == 0:
            print(f"DONE {done} ITERATIONS" + f" || {str(100 * done / len(newList))[:5]}% COMPLETE" + f" || SET AVG LOSS {avgLoss / mod}" + f" || SET ACCURACY = {accScore / mod}")
            print("="*96)
            # Start a fresh window
            accScore = 0
            avgLoss = 0

    print(f"---- EPOCH {E+1} COMPLETE ----\n")

In [None]:
###
# Run the CNN over the whole validation set and plot a CLEAN-vs-BUGGY ROC curve.
randOuts = []   # raw model output per validation sample
predVals = []
# Only needed as dicts by the commented-out per-class variant below
fpr = dict()
tpr = dict()
threshold = dict()
roc_auc = dict()

# Label/Classes
labs = sorted(set(validateListY))

# Binarize Classes
yBinarized = label_binarize(validateListY, classes=labs)
ll = len(validateListX)

# Put the network in inference mode
cnn.eval()

for newF in validateListX:
    # One-hot encode the first PADDING_LEN tokens; rows past the end stay zero.
    # Tokens missing from L are encoded at index 0.
    temp = torch.zeros(PADDING_LEN, NUM_TOKENS)
    for v in range(min(len(newF), PADDING_LEN)):
        temp[v][L.get(newF[v], 0)] = 1
    temp = temp.type(torch.FloatTensor)

    # (PADDING_LEN, NUM_TOKENS) -> (1, NUM_TOKENS, PADDING_LEN): tokens become Conv1d channels
    k = torch.unsqueeze(temp, 0)
    rf = torch.permute(k, (0, 2, 1))

    with torch.no_grad():
        randOuts.append(cnn(rf.to(DEVICE)).cpu())

######

# for i in range(len(labs)):
#     # Generate Results
#     fpr[i], tpr[i], threshold[i] = roc_curve([x[i] for x in yBinarized[:ll]], [e[i] for e in randOuts])
#     roc_auc[i] = auc(fpr[i], tpr[i])
#     # Plots
#     plt.plot(fpr[i], tpr[i], linestyle="--", label=f"{labs[i]} vs REST | AUC = {str(roc_auc[i])[:5]}")
#     print(f"DONE {i+1} CLASSES")

######
# Score = probability of class index 1 (Use for binary).
# The raw class-1 logit alone is not a consistent ranking score: the model's
# decision depends on it relative to the other logits, which softmax accounts for.
# Plain floats (not tensors) are handed to sklearn.
# Multiclass alternative on the raw outputs: x = max(r[0], max(r[2:]))
predCols = [torch.softmax(r, dim=-1)[1].item() for r in randOuts]

# fpr, tpr, threshold = roc_curve([0 if x == "CLEAN" else 1 for x in validateListY], predCols) # Use for multiclass
fpr, tpr, threshold = roc_curve([0 if x == "0" else 1 for x in validateListY], predCols) # Use for binary
roc_auc = auc(fpr, tpr)


# Plots
plt.plot(fpr, tpr, linestyle="--", label=f"CLEAN vs BUGGY | AUC = {str(roc_auc)[:5]}")

######


###
# Diagonal = chance level
plt.plot([0,1], [0,1], "--", color="black")
plt.xlim([0,1])
plt.ylim([0,1])
if(USING_TOKEN_TYPE == 0):
    plt.title("ROC Curve for CNN - SIMPLE TOKENS")
elif(USING_TOKEN_TYPE == 2):
    plt.title("ROC Curve for CNN - CLANG TOKENS")
elif(USING_TOKEN_TYPE == 3):
    plt.title("ROC Curve for CNN - EXTENDED TOKENS")
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.grid(True)
plt.legend(loc="lower right")
plt.show()
###

In [None]:
# Show one (true label, raw model output) pair as a sanity check
print(validateListY[0], randOuts[0])
# Report the predicted category of every collected output against the true labels
outputResults(validateListY, list(map(getCategoryFromOutputTensor, randOuts)), None, False)

In [None]:
# unkFunc = []
# unkCat = []
# for i in range(len(unknownList)):
#     newFunc = unknownList[i][0]
#     newCat = unknownList[i][1]

#     temp = torch.zeros(PADDING_LEN, NUM_TOKENS)
#     minLen = min(len(newFunc), PADDING_LEN)

#     for v in range(minLen):
#         index = 0
#         for i in L.keys():
#             if(newFunc[v] == i):
#                 index = L[i]
#                 break
#         temp[v][index] = 1
#     for i in range(minLen, PADDING_LEN):
#         temp[i] =  torch.zeros(NUM_TOKENS)

#     temp = temp.type(torch.FloatTensor)
#     k = torch.unsqueeze(temp, 0)
#     unkFunc.append(torch.permute(k, (0, 2, 1)))
#     unkCat.append(newCat)

In [None]:
# predicateT = predictFunctions(cnn, unkFunc, False)
# outputResults(unkCat, predicateT, None, True)

# binaryPred = ["0" if k == "CLEAN" else "1" for k in predicateT]
# binaryCats = ["0" if l == "CLEAN" else "1" for l in unkCat]

# outputResults(binaryCats, binaryPred, None, True)