In [1]:


import sys
import json
import pickle
from transformers import (RobertaConfig, RobertaForSequenceClassification, RobertaTokenizer)

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Make the project's own modules importable from this notebook.
# NOTE(review): this is a machine-specific absolute path; point PROJECT_ROOT at
# your local checkout (or derive it from a config/env value) before running.
PROJECT_ROOT = '/Users/mahbubcseju/Desktop/projects/vul_detect'
# Guard so that re-running the cell does not keep appending duplicate entries.
if PROJECT_ROOT not in sys.path:
    sys.path.append(PROJECT_ROOT)

In [3]:
# Read the JSONL file: one JSON object per line.
# `data` is reset before opening the file so a failed open cannot leave a
# stale list from a previous run of this cell in the kernel.
data = []
with open("../../data/devign/test.jsonl", 'r') as f:
    for line_no, line in enumerate(f, start=1):
        try:
            data.append(json.loads(line.strip()))
        except json.JSONDecodeError as e:
            # Best-effort load: report which line could not be parsed and move on.
            print(f"line {line_no}: {e}")


In [4]:
# NOTE(review): unpickling can execute arbitrary code; only load a file that
# was produced by a trusted run of this project.
with open('explanation_devign.pkl', 'rb') as f:
    # The context manager closes the handle, which the bare open() call did not.
    explanations = pickle.load(f)

In [5]:
# Aliases for the RoBERTa config / model / tokenizer classes used below.
config_class = RobertaConfig
model_class = RobertaForSequenceClassification
tokenizer_class = RobertaTokenizer

# Load the tokenizer for the "microsoft/codebert-base" checkpoint.
tokenizer = tokenizer_class.from_pretrained("microsoft/codebert-base")

In [6]:
def convert_examples_to_features(js,tokenizer,args):
    """Tokenize one sample's source code into a fixed-length id sequence.

    Args:
        js: mapping with a 'func' entry holding the source code string.
        tokenizer: object providing ``tokenize``, ``convert_tokens_to_ids``,
            ``cls_token``, ``sep_token`` and ``pad_token_id``.
        args: object with an integer ``block_size`` attribute (target length).

    Returns:
        A ``(source_tokens, source_ids)`` tuple: the unpadded token list
        (CLS + code tokens + SEP) and the id list padded with
        ``pad_token_id`` up to ``block_size``.
    """
    # Collapse every run of whitespace (including newlines) to a single space.
    code = ' '.join(js['func'].split())
    # Reserve two positions for the CLS and SEP tokens; clamp at zero so a tiny
    # block_size does not turn into a negative (end-relative) slice bound.
    max_code_tokens = max(args.block_size - 2, 0)
    code_tokens = tokenizer.tokenize(code)[:max_code_tokens]
    source_tokens = [tokenizer.cls_token] + code_tokens + [tokenizer.sep_token]
    source_ids = tokenizer.convert_tokens_to_ids(source_tokens)
    padding_length = args.block_size - len(source_ids)
    source_ids += [tokenizer.pad_token_id] * padding_length
    # The original ended with a bare `return`, discarding everything computed.
    return source_tokens, source_ids

In [7]:
def get_max_match(original, start_index,  tokens):
    """Find the first occurrence of ``tokens`` inside ``original``.

    The search starts at ``start_index``. A position matches when the two
    tokens are equal, or when the token in ``original`` carries the leading
    'Ġ' marker and equals the searched token once that marker is removed.

    Args:
        original: sequence of token strings to search in.
        start_index: first index of ``original`` considered as a match start.
        tokens: sequence of token strings to look for.

    Returns:
        The list of consecutive indices in ``original`` covered by the first
        match, or ``[]`` when there is no match (or ``tokens`` is empty).
    """
    n_tokens = len(tokens)
    last_start = len(original) - n_tokens
    for start in range(start_index, last_start + 1):
        # all() stops at the first mismatch instead of scanning the whole window.
        matched = all(
            original[start + offset] == wanted
            or (
                # Only strip the 'Ġ' marker, not an arbitrary first character
                # (previously 'xa' would have matched 'a').
                original[start + offset].startswith('Ġ')
                and original[start + offset][1:] == wanted
            )
            for offset, wanted in enumerate(tokens)
        )
        if matched:
            return list(range(start, start + n_tokens))
    return []

In [8]:
def calculate_score(scores, ids):
    """Return the mean of ``scores`` at the positions listed in ``ids``.

    Args:
        scores: indexable container of numeric scores.
        ids: sized iterable of keys/indices into ``scores``.

    Returns:
        The arithmetic mean of the selected scores, or ``0`` when ``ids`` is
        empty.
    """
    if len(ids) == 0:
        return 0
    # Locals no longer shadow the builtins `sum` and `id`.
    total = sum(scores[index] for index in ids)
    return total / len(ids)

In [9]:
def convert_to_token_lines(tokens, code):
    """Group a flat token sequence into one token list per source line.

    Each line of ``code`` is whitespace-normalised and tokenized with the
    notebook-level ``tokenizer``. The resulting tokens are then located in
    ``tokens`` with ``get_max_match``, searching from where the previous
    line's match ended.

    Args:
        tokens: flat sequence of token strings for the whole snippet.
        code: the source code string, with lines separated by "\\n".

    Returns:
        A list with one entry per line of ``code``: the matching slice of
        ``tokens``, or ``[]`` when the line could not be located.
    """
    token_lines = []
    start_index = 0
    # Split into lines BEFORE collapsing whitespace: normalising the whole
    # string first removes every "\n", leaving a single "line".
    for line in code.split("\n"):
        normalized_line = ' '.join(line.split())
        # Tokenize the current line (the original tokenized the whole `code`).
        line_tokens = tokenizer.tokenize(normalized_line) if normalized_line else []
        ids = get_max_match(tokens, start_index, line_tokens)
        if ids:
            token_lines.append(tokens[ids[0]: ids[-1] + 1])
            # Continue the next search right after this line's tokens.
            start_index = ids[-1] + 1
        else:
            token_lines.append([])
    return token_lines

In [10]:
# For every sample, spread each explanation edge's score onto the source lines
# that textually contain the edge's source or destination token, then sum the
# scores collected per line.
results = []
for sample, explanation in zip(data, explanations):
    # The two inputs must be aligned sample-by-sample.
    assert sample['idx'] == int(explanation['id'][0])
    assert len(explanation['src_nodes']) == len(explanation['des_nodes']) == len(explanation['edge_mask']), sample['idx']
    tokens = [tok[0] for tok in explanation['tokens']]
    vocabs = list(set(tokens))
    lines = sample['func'].split("\n")
    # token -> indices of the lines whose text contains it (substring match).
    vocabs_line_dict = {}
    for word_ in vocabs:
        # Drop the leading 'Ġ' marker before searching the raw line text;
        # startswith() also copes with an empty token, unlike word_[0].
        word = word_[1:] if word_.startswith('Ġ') else word_
        vocabs_line_dict[word_] = [i for i, line in enumerate(lines) if word in line]
    # Pin the special tokens to the first / last line; setdefault avoids a
    # KeyError when a special token is absent from the explanation's tokens.
    vocabs_line_dict.setdefault('<s>', []).append(0)
    vocabs_line_dict.setdefault('</s>', []).append(len(lines) - 1)
    lines_scores = [[] for _ in lines]
    # Convert to plain Python floats: NumPy scalars such as float32 are not
    # JSON-serializable, and `results` is dumped with json afterwards.
    edge_scores = explanation['edge_mask'].cpu().numpy().tolist()
    for src, des, score in zip(explanation['src_nodes'], explanation['des_nodes'], edge_scores):
        if src == '<pad>' or des == '<pad>':
            continue
        # A line containing both endpoints receives the score twice.
        for line_no in vocabs_line_dict[src] + vocabs_line_dict[des]:
            lines_scores[line_no].append(score)

    results.append({
        'idx': sample['idx'],
        'scores': [sum(score) for score in lines_scores]
    })

In [11]:
# Write the per-line scores; the context manager flushes and closes the file,
# which the bare open() call left to the garbage collector.
with open("line_scores.json", 'w') as f:
    json.dump(results, f)