## Startup

In [None]:
!pip install transformers

In [None]:
import torch
from transformers import AutoTokenizer, AutoModel, RobertaTokenizer, RobertaModel, RobertaConfig

# Alternative checkpoint (CodeBERT). To use it, enable these three lines and
# disable the GraphCodeBERT lines further down.
# config = RobertaConfig.from_pretrained("microsoft/codebert-base", output_hidden_states=True)
# tokenizer = RobertaTokenizer.from_pretrained("microsoft/codebert-base")
# model = RobertaModel.from_pretrained("microsoft/codebert-base", config=config)

# Choose the compute device: CUDA when torch reports it as available, CPU otherwise.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Active checkpoint: GraphCodeBERT. The config asks the model to return the hidden
# states of its layers, which code2vec reads via `out.hidden_states`.
config = RobertaConfig.from_pretrained(
    "microsoft/graphcodebert-base",
    output_hidden_states=True,
)
tokenizer = AutoTokenizer.from_pretrained("microsoft/graphcodebert-base")
model = AutoModel.from_pretrained(
    "microsoft/graphcodebert-base",
    config=config,
)

# Move the weights to the chosen device.
model.to(device)

# Inference mode (disables dropout etc.); kept as the final expression of the cell.
model.eval()

In [None]:
# Bind `device` and report which hardware will be used.
if not torch.cuda.is_available():
    # No CUDA device: fall back to the CPU.
    print('No GPU available, using the CPU instead.')
    device = torch.device("cpu")
else:
    # CUDA is usable: select it, then print the GPU count and the name of GPU 0.
    device = torch.device("cuda")
    print('There are %d GPU(s) available.' % torch.cuda.device_count())
    print('We will use the GPU:', torch.cuda.get_device_name(0))

## Helper Functions

In [None]:
!pip install beautifulsoup4

from pygments import highlight
from pygments.lexers import PhpLexer, Python3Lexer, RubyLexer, GoLexer, JavaLexer, JavascriptLexer
from pygments.formatters import HtmlFormatter
from bs4 import BeautifulSoup

In [None]:
def code2vec(code):
    """Embed a piece of source code with the notebook's global tokenizer and model.

    The text is tokenized with special tokens added and limited to 512 token
    ids, run through the model without gradient tracking, and the hidden states
    of the last four layers are summed element-wise.

    Args:
        code: Source code to embed, as one string.

    Returns:
        The element-wise sum of the last four entries of `out.hidden_states`
        for a batch containing only `code`
        (presumably shaped (1, sequence_length, hidden_size) - TODO confirm).
    """
    token_ids = tokenizer.encode(
        code,
        add_special_tokens=True,  # wrap the sequence in the model's special tokens
        max_length=512,
        # Request truncation explicitly. With only `max_length` set, transformers
        # truncates through a backward-compatibility fallback and logs a warning.
        truncation=True,
    )
    # print(token_ids)
    # print(tokenizer.tokenize(code))

    # Batch of one sequence, created directly on the device the model lives on.
    input_ids = torch.tensor([token_ids], dtype=torch.long, device=device)

    with torch.no_grad():
        out = model(input_ids=input_ids)

    # Stack the last four hidden-state tensors and sum over the stacking axis.
    vector = torch.stack(out.hidden_states[-4:]).sum(0)

    return vector

In [None]:
# def array2vec(snippet, sourcesinkonly = False):
#     """Calc vec for each line of code in snippet list and average the vecs.
#     """
#     vecs = []
#     for loc in snippet:
#         # vec representation exclude [CLS] and [SEP] token
#         vec = code2vec(loc['code'])[:,1:-1,:]
#         # mean loc vecs and reshape [1, 768] to [768]
#         vec = torch.mean(vec, 1).squeeze(0)
#         vecs.append(vec)

#     if len(vecs) == 1:
#         return vecs[0]
#     # apply weights on source to sink depending on locs
#     elif sourcesinkonly:
#         return 0.75 * vecs[0] + 0.25 * vecs[-1]
#         # return torch.mean(torch.stack([vecs[0], vecs[-1]]), 0)
#     else:
#         return torch.mean(torch.stack(vecs), 0)

In [None]:
def array2vec(snippet, sinkfirst=False):
    """Embed a snippet as one vector.

    `snippet` is a list of entries that each carry a "code" string. The strings
    are joined with newlines into a single text (in reversed order unless
    `sinkfirst` is set), embedded with code2vec, and averaged over the token
    positions after dropping the first and the last position.

    Args:
        snippet: List of entries with a "code" field.
        sinkfirst: When equal to False (the default) the entry order is reversed
            before joining.

    Returns:
        The mean token vector with the leading batch dimension squeezed away.
    """
    lines = [entry["code"] for entry in snippet]
    # Equality test kept on purpose so that non-bool arguments behave as before.
    if sinkfirst == False:
        # default ("source first"): flip the stored order
        lines = list(reversed(lines))
    token_vecs = code2vec("\n".join(lines))
    # drop the first and last position (the special tokens added by the tokenizer)
    inner = token_vecs[:, 1:-1, :]
    return inner.mean(dim=1).squeeze(0)

In [None]:
def get_iss_by_id(id, dicts):
    """Return the first entry of `dicts` whose "id" field equals `id`.

    Args:
        id: Identifier to look for.
        dicts: Iterable of entries that are indexed with the key "id".

    Raises:
        StopIteration: If no entry matches.
    """
    for entry in dicts:
        if entry["id"] == id:
            return entry
    # Same signal an exhausted `next(...)` gives when there is no match.
    raise StopIteration

In [None]:
def tensor_to_fix_length(input, length):
    """Truncate or zero-pad a tensor along dimension 0 to exactly `length` rows.

    Args:
        input: Tensor with at least one dimension.
        length: Desired size of dimension 0.

    Returns:
        The first `length` rows when `input` has more rows than that; otherwise
        `input` followed by as many all-zero rows as needed to reach `length`.
        The padding has the same trailing shape, dtype and device as `input`.
    """
    rows = input.shape[0]
    if rows > length:
        return torch.narrow(input, 0, 0, length)

    # Derive the padding from `input` itself. A fixed (pad, 768) CPU float32 block
    # makes torch.cat fail for CUDA tensors and for any other trailing shape.
    zeros = input.new_zeros((length - rows, *input.shape[1:]))
    return torch.cat((input, zeros), 0)

## Calculating Embeddings

In [None]:
import json

# Read the cleaned taint issues from disk.
with open('../output/java_taints_cleaned.json') as json_file:
    feats = json.load(json_file)

# Attach an embedding to every issue that has a non-empty "cleared" snippet.
# Issues without one are skipped and therefore get no 'embedding' entry.
for iss in feats:
    if "cleared" not in iss or len(iss["cleared"]) <= 0:
        continue
    vec_cleared = array2vec(iss["cleared"], False)
    iss['embedding'] = vec_cleared

## Evaluation 4
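Compare each query issue a0 against all other issues. Among the |A| − 1 most similar results (|A| being the size of a0's category A), find the category that occurs most often and assign it to a0 as its label.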

In [None]:
from torch import nn
import operator
import copy

# Leave-one-out evaluation: every issue is used once as the query and is labelled by
# a majority vote among its most similar neighbours.
def evaluate_categories_3(categories, feats, category_check=True):
    """Label each issue by a majority vote of its nearest neighbours; count tp/fp/fn.

    For every issue id in `categories`, the cosine similarity between its
    embedding and the embedding of every other listed issue is computed. The
    `len(category) - 1` most similar neighbours form the voting window, and the
    category that occurs most often inside the window is the predicted label.
    On a tie, the tied category that appears first in the ranking wins.

    Args:
        categories: List of categories; each category is a list of issue ids.
        feats: Issue records, resolved through get_iss_by_id. Every record that
            `categories` refers to must carry an 'embedding' entry.
        category_check: Unused; kept so existing callers keep working.

    Returns:
        One dict per category with the counters "tp", "fp" and "fn". A correct
        prediction adds a "tp" to the query's category. A wrong one adds an "fn"
        to the query's category and an "fp" to the predicted category. A query
        with an empty voting window (its category has a single member) adds an
        "fn" only.
    """
    cos = nn.CosineSimilarity(dim=0)

    # measures per category
    measures = [{"tp": 0, "fp": 0, "fn": 0} for _ in categories]

    for i_cat, category in enumerate(categories):
        for i_query, query_id in enumerate(category):
            query = get_iss_by_id(query_id, feats)

            # Similarity of the query to every other issue. The query's own
            # position is skipped by index, which avoids deep-copying the whole
            # category list for each query just to delete one element.
            results = []
            for i_check, check_cat in enumerate(categories):
                for i_other, check_iss in enumerate(check_cat):
                    if i_check == i_cat and i_other == i_query:
                        continue
                    check = get_iss_by_id(check_iss, feats)
                    sim = cos(check['embedding'], query['embedding'])
                    results.append({
                        "bucket": i_check,
                        # plain float: cheap, device-independent sort key
                        "similarity": float(sim),
                    })

            # most similar first; only the window of size |category| - 1 votes
            ranked = sorted(results, key=lambda r: r['similarity'], reverse=True)
            window = ranked[:len(category) - 1]

            if not window:
                # Single-member category: nothing to vote on, and max() over an
                # empty tally would raise ValueError. The query cannot receive
                # its own label, so it is counted as a miss.
                measures[i_cat]['fn'] += 1
                continue

            # tally the categories inside the window and pick the most frequent
            counts = {}
            for hit in window:
                counts[hit['bucket']] = counts.get(hit['bucket'], 0) + 1
            majority = max(counts.items(), key=operator.itemgetter(1))[0]

            if majority == i_cat:
                measures[i_cat]['tp'] += 1
            else:
                measures[i_cat]['fn'] += 1
                measures[majority]['fp'] += 1

    return measures

In [None]:
# Load the labelled categories that serve as ground truth.
with open('../data/label_java_xss.json') as json_file:
    categories = json.load(json_file)

measures = evaluate_categories_3(categories, feats)
for measure in measures:
    # precision = tp / (tp + fp); 0 when the denominator is 0
    if measure['tp'] + measure['fp']:
        precision = measure['tp'] / (measure['tp'] + measure['fp'])
    else:
        precision = 0

    # recall = tp / (tp + fn); 0 when the denominator is 0
    if measure['tp'] + measure['fn']:
        recall = measure['tp'] / (measure['tp'] + measure['fn'])
    else:
        recall = 0

    # F1 and F-beta (beta = 2); both are 0 when precision + recall is 0.
    # Note: f_beta is computed but only F1 is printed.
    beta = 2
    if precision + recall:
        f1 = 2 * ((precision * recall) / (precision + recall))
        f_beta = (1 + beta**2) * ((precision * recall) / (beta**2 * precision + recall))
    else:
        f1 = 0
        f_beta = 0

    print(f"F1: {f1}")
    print("-----------")
    print("-----------")

## Evaluation 5
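Compare each query issue a0 against all other issues. Look at the top-ranked results (rank 1 by default, or the top k for a threshold of k) and count a0 as a hit if at least one of them belongs to a0's category.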

In [None]:
from torch import nn
import operator
import copy

# Leave-one-out evaluation: a query is a hit when one of its `threshold` most similar
# neighbours comes from the query's own category.
def evaluate_categories_4(categories, feats, threshold = 1):
    """Count, per category, how many queries find a same-category issue in the top ranks.

    For every issue id in `categories`, the cosine similarity between its
    embedding and the embedding of every other listed issue is computed and the
    results are ranked from most to least similar. The query is a hit if any of
    the first `threshold` results belongs to the query's own category.

    Args:
        categories: List of categories; each category is a list of issue ids.
        feats: Issue records, resolved through get_iss_by_id. Every record that
            `categories` refers to must carry an 'embedding' entry.
        threshold: Number of top-ranked results to inspect. If fewer results
            exist, all of them are inspected.

    Returns:
        One dict per category with the counters "true" (hits) and "false"
        (misses).
    """
    cos = nn.CosineSimilarity(dim=0)

    # measures per category
    measures = [{"true": 0, "false": 0} for _ in categories]

    for i_cat, category in enumerate(categories):
        for i_query, query_id in enumerate(category):
            query = get_iss_by_id(query_id, feats)

            # Similarity of the query to every other issue. The query's own
            # position is skipped by index, which avoids deep-copying the whole
            # category list for each query just to delete one element.
            results = []
            for i_check_cat, check_cat in enumerate(categories):
                for i_other, check_iss in enumerate(check_cat):
                    if i_check_cat == i_cat and i_other == i_query:
                        continue
                    check = get_iss_by_id(check_iss, feats)
                    sim = cos(check['embedding'], query['embedding'])
                    results.append({
                        # True when the neighbour shares the query's category
                        "bucket": i_check_cat == i_cat,
                        # plain float: cheap, device-independent sort key
                        "similarity": float(sim),
                    })

            # most similar first
            ranked = sorted(results, key=lambda r: r['similarity'], reverse=True)

            # Slicing (instead of indexing 0..threshold-1) tolerates a threshold
            # larger than the number of neighbours; max() keeps a negative
            # threshold meaning "inspect nothing".
            top = ranked[:max(threshold, 0)]
            if any(hit["bucket"] for hit in top):
                measures[i_cat]["true"] += 1
            else:
                measures[i_cat]["false"] += 1

    return measures

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt

# Load the labelled categories that serve as ground truth.
with open('../data/label_java_xss.json') as json_file:
    categories = json.load(json_file)

# For each threshold, print per category the share of queries counted as "true"
# by evaluate_categories_4.
for threshold in [1, 3, 5]:
    print(f"Threshold: {threshold}")
    measures = evaluate_categories_4(categories, feats, threshold)
    for i_cat, cat in enumerate(measures):
        print(f"Kategorie {i_cat + 1}")
        queries = cat['true'] + cat['false']
        # A category without queries would divide by zero; print 0 instead, the
        # same fallback the precision/recall computation uses for empty
        # denominators.
        print(cat['true'] / queries if queries else 0)
    print("---------------")