# Evaluator Module

In [1]:
#| default_exp evaluator

In [2]:
#| export
import CodeCheckList
import pandas as pd

import CodeCheckList.utils as utils
from CodeCheckList.tokenizer import CodeTokenizer
from CodeCheckList.masker import Masker
from CodeCheckList.predictor import Predictor

import statistics
import textdistance

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
#| export
class Evaluator:
    """Evaluate masked-code predictions per AST node type.

    For every node type exposed by the tokenizer (``tokenizer.node_types``),
    the evaluator masks a snippet with the masker, asks the predictor for
    several candidate completions and compares the node-type sequence of each
    candidate with the node-type sequence of the original snippet using
    Jaccard, Sorensen-Dice and Levenshtein normalized similarities.
    """

    def __init__(self, checkpoint: str, language, gpu_available=False):
        """Build the tokenizer, masker and predictor for ``checkpoint``.

        Args:
            checkpoint: Model checkpoint passed to ``CodeTokenizer`` and
                ``Predictor``.
            language: Language passed to ``CodeTokenizer.from_pretrained``.
            gpu_available: Forwarded to ``Predictor.from_pretrained``.
        """
        self.tokenizer = CodeTokenizer.from_pretrained(checkpoint, language)
        self.masker = Masker(self.tokenizer)
        self.predictor = Predictor.from_pretrained(checkpoint, self.tokenizer, gpu_available)

    def __call__(self, test_set, number_of_predictions: int, masking_rate: float):
        """Evaluate ``test_set`` and return the results as a DataFrame.

        Args:
            test_set: Iterable of samples; each sample is indexed with
                ``'whole_func_string'`` to obtain its source code.
            number_of_predictions: Number of predictions evaluated per
                snippet and node type.
            masking_rate: Forwarded unchanged to the masker.

        Returns:
            A ``pandas.DataFrame`` with one row per tokenizer node type and
            the columns ``ast_element``, ``occurences``, ``jaccard``,
            ``sorensen_dice``, ``levenshtein``, ``jaccard_avg``,
            ``sorensen_dice_avg`` and ``levenshtein_avg``. Score columns hold
            tuples with one entry per prediction index.
        """
        results_dict = self.evaluate_test_set(test_set, number_of_predictions, masking_rate)
        results_dataframe = pd.DataFrame([], columns=[
            'ast_element', 'occurences', 'jaccard', 'sorensen_dice', 'levenshtein',
            'jaccard_avg', 'sorensen_dice_avg', 'levenshtein_avg'])
        for node_type, result in zip(self.tokenizer.node_types, results_dict):
            occurrences, jaccard, sorensen_dice, levenshtein, jaccard_avg, sorensen_dice_avg, levenshtein_avg = result
            # Lists are frozen into tuples before being stored as cell values.
            results_dataframe.loc[len(results_dataframe.index)] = [
                node_type,
                occurrences,
                tuple(tuple(scores) for scores in jaccard),
                tuple(tuple(scores) for scores in sorensen_dice),
                tuple(tuple(scores) for scores in levenshtein),
                tuple(jaccard_avg),
                tuple(sorensen_dice_avg),
                tuple(levenshtein_avg),
            ]
        return results_dataframe

    def evaluate_test_set(self, test_set, number_of_predictions: int, masking_rate: float):
        """Accumulate similarity scores for every node type over ``test_set``.

        Args:
            test_set: Iterable of samples; each sample is indexed with
                ``'whole_func_string'`` to obtain its source code.
            number_of_predictions: Number of predictions evaluated per
                snippet and node type.
            masking_rate: Forwarded unchanged to the masker.

        Returns:
            A list with one entry per tokenizer node type (same order as
            ``tokenizer.node_types``). Each entry is a list
            ``[occurrences, jaccard, sorensen_dice, levenshtein,
            jaccard_avg, sorensen_dice_avg, levenshtein_avg]`` where the
            three score items are lists (one per prediction index) of the
            per-sample scores and the three ``*_avg`` items hold, per
            prediction index, the mean of those scores rounded to three
            decimals (``0`` when no score was collected).
        """
        node_type_count = len(self.tokenizer.node_types)
        results_dict = [
            [
                0,                                            # occurrences
                [[] for _ in range(number_of_predictions)],   # jaccard per prediction
                [[] for _ in range(number_of_predictions)],   # sorensen_dice per prediction
                [[] for _ in range(number_of_predictions)],   # levenshtein per prediction
                [0 for _ in range(number_of_predictions)],    # avg jaccard per prediction
                [0 for _ in range(number_of_predictions)],    # avg sorensen_dice per prediction
                [0 for _ in range(number_of_predictions)],    # avg levenshtein per prediction
            ]
            for _ in range(node_type_count)
        ]
        # Positions of the three score lists inside an entry; the matching
        # average lives three slots further.
        metric_slots = (1, 2, 3)

        for sample_index, sample in enumerate(test_set):
            print('--------evaluating sample:'+str(sample_index)+' --------')
            for node_type_idx in range(node_type_count):
                node_type_results = self.evaluate_node_type_on_snippet(sample['whole_func_string'], node_type_idx, number_of_predictions, masking_rate)
                if not node_type_results:
                    # The node type does not occur in this sample.
                    continue
                node_type_entry = results_dict[node_type_idx]
                # Every prediction row carries the same occurrence count.
                node_type_entry[0] += node_type_results[0][0]
                for prediction_idx in range(number_of_predictions):
                    prediction_scores = node_type_results[prediction_idx]
                    for metric_slot in metric_slots:
                        score = prediction_scores[metric_slot]
                        if score is not None:
                            node_type_entry[metric_slot][prediction_idx].append(score)

        # Averages are computed once at the end instead of after every sample;
        # the final values are the same but the score lists are scanned once.
        for node_type_entry in results_dict:
            for metric_slot in metric_slots:
                for prediction_idx, scores in enumerate(node_type_entry[metric_slot]):
                    if scores:
                        node_type_entry[metric_slot + 3][prediction_idx] = round(statistics.mean(scores), 3)
        return results_dict

    def evaluate_node_type_on_snippet(self, source_code: str, target_node_type_idx: int, number_of_predictions: int, masking_rate: float):
        """Score the predictions obtained after masking one node type in a snippet.

        Args:
            source_code: Source code of the snippet to evaluate.
            target_node_type_idx: Index into ``tokenizer.node_types`` of the
                node type to mask.
            number_of_predictions: Number of predictions requested from the
                predictor and scored.
            masking_rate: Forwarded unchanged to the masker.

        Returns:
            An empty list when the snippet contains no node of the target
            type. Otherwise a list with one
            ``[occurrences, jaccard, sorensen_dice, levenshtein]`` row per
            prediction, where ``occurrences`` is the number of target nodes
            found in ``source_code``. The three similarities stay ``0`` for
            predictions rejected by ``utils.is_balanced_snippet``.
        """
        results = []

        source_code_tree = self.tokenizer.parser.parse(bytes(source_code, "utf8"))
        source_code_nodes = []
        utils.find_nodes(source_code_tree.root_node, self.tokenizer.node_types[target_node_type_idx], source_code_nodes)
        if not source_code_nodes:
            return results

        masked_code_encoding = self.masker(source_code, self.tokenizer(source_code), target_node_type_idx, masking_rate)
        predictions = self.predictor(masked_code_encoding, self.tokenizer.tokenizer(source_code, return_tensors='pt'), number_of_predictions)

        occurrences = len(source_code_nodes)
        # Node-type list of the original snippet; it does not depend on the
        # prediction, so it is built at most once (on first use).
        source_code_types = None

        for prediction_number in range(number_of_predictions):
            predicted_code = predictions[prediction_number]
            jaccard_similarity = 0        # the closer to 1, the better
            sorensen_dice_similarity = 0  # the closer to 1, the better
            levenshtein_similarity = 0    # the closer to 1, the better
            # NOTE(review): the meaning of the second argument (1) is defined
            # by utils.is_balanced_snippet -- confirm against that helper.
            if utils.is_balanced_snippet(predicted_code, 1):
                print('~~~~~~~~~~~~~~~~~~~~~~')
                print(predicted_code)
                predicted_code_tree = self.tokenizer.parser.parse(bytes(predicted_code, "utf8"))
                predicted_code_types = utils.get_node_type_list(predicted_code_tree.root_node)
                if source_code_types is None:
                    source_code_types = utils.get_node_type_list(source_code_tree.root_node)
                jaccard_similarity = textdistance.jaccard.normalized_similarity(predicted_code_types, source_code_types)
                sorensen_dice_similarity = textdistance.sorensen_dice.normalized_similarity(predicted_code_types, source_code_types)
                levenshtein_similarity = textdistance.levenshtein.normalized_similarity(predicted_code_types, source_code_types)
            results.append([occurrences, jaccard_similarity, sorensen_dice_similarity, levenshtein_similarity])
        return results


## Full Pipeline

### Download Grammar

In [4]:
from CodeCheckList import loader

# Language used by the rest of this notebook.
python_language = "python"

# Download the grammars for every language in the list.
languages = [python_language]
loader.download_grammars(languages)

/home/svelascodimate/miniconda3/envs/code-check-list/lib/python3.10/site-packages/CodeCheckList/grammars


### Load Model

In [5]:

# Model checkpoint used to build the tokenizer and the predictor.
checkpoint = "huggingface/CodeBERTa-small-v1"

### Create Modules

In [6]:
from CodeCheckList.tokenizer import CodeTokenizer
from CodeCheckList.masker import Masker

# Build the code tokenizer for the chosen checkpoint and language.
bert_tokenizer = CodeTokenizer.from_pretrained(checkpoint, python_language)

# Build the masker on top of that tokenizer.
code_masker = Masker(bert_tokenizer)

### Node Types

In [7]:
# Show every node type the tokenizer knows about.
print(bert_tokenizer.node_types)

['=', '/', '/=', '%=', 'exec', 'continue_statement', 'binary_operator', 'dictionary_splat', 'relative_import', 'nonlocal_statement', 'concatenated_string', 'else_clause', 'except_clause', 'finally_clause', 'ellipsis', '<=', '>', 'block', 'assert_statement', 'while', 'delete_statement', '-', 'set', '^', 'pattern', 'parameters', 'with_item', '_compound_statement', 'list_pattern', 'raise_statement', 'argument_list', 'async', 'escape_sequence', 'break_statement', 'def', 'try', ';', 'return', ':=', 'break', ')', 'assignment', 'comparison_operator', 'case_pattern', '!=', 'elif', 'ERROR', 'for_statement', '<>', 'not', 'tuple_pattern', 'or', ':', 'true', 'yield', 'type_conversion', 'parenthesized_expression', 'lambda', 'nonlocal', 'match_statement', 'try_statement', '==', '(', 'dictionary_splat_pattern', 'typed_parameter', 'dictionary_comprehension', 'class_definition', '**', 'dotted_name', 'parameter', '*=', ',', 'aliased_import', 'unary_operator', '**=', '~', 'with_statement', 'while_stateme

### Encodings

In [8]:
# Example snippet that is encoded, masked and predicted in the cells below.
code = "def multiply_numbers(a,b):\n    return a*b"
#code = "def scale(self, center=True, scale=True):\n        \"\"\"\nthe the\n\n\n                                                                                                                                                          _\n                     ____________=_=_===========________===______________________________==_____________________\n_______\n____\n\n___\n\n\n\n\n\n\n\n\n        return return)"
#code = "def hello_world(a,b):\n    print('hello world')"
#code = "def __ordered_values_by_indexes(self, data, inds): \"\"\" Return values (intensities) by indexes. Used for multiscale graph cut. data = [[0 1 1], [0 2 2], [0 2 2]] inds = [[0 1 2], [3 4 4], [5 4 4]] return: [0, 1, 1, 0, 2, 0] If the data are not consistent, it will take the maximal value \"\"\" # get unique labels and their first indexes # lab, linds = np.unique(inds, return_index=True) # compute values by indexes # values = data.reshape(-1)[linds] # alternative slow implementation # if there are different data on same index, it will take # maximal value # lab = np.unique(inds) # values = [0]*len(lab) # for label in lab: # values[label] = np.max(data[inds == label]) # # values = np.asarray(values) # yet another implementation values = [None] * (np.max(inds) + 1) linear_inds = inds.ravel() linear_data = data.ravel() for i in range(0, len(linear_inds)): # going over all data pixels if values[linear_inds[i]] is None: # this index is found for first values[linear_inds[i]] = linear_data[i] elif values[linear_inds[i]] < linear_data[i]: # here can be changed maximal or minimal value values[linear_inds[i]] = linear_data[i] values = np.asarray(values) return values"
#code = "def __ordered_values_by_indexes(self, data, inds):  # get unique labels and their first indexes # lab, linds = np.unique(inds, return_index=True) # compute values by indexes # values = data.reshape(-1)[linds] # alternative slow implementation # if there are different data on same index, it will take # maximal value # lab = np.unique(inds) # values = [0]*len(lab) # for label in lab: # values[label] = np.max(data[inds == label]) # # values = np.asarray(values) # yet another implementation values = [None] * (np.max(inds) + 1) linear_inds = inds.ravel() linear_data = data.ravel() for i in range(0, len(linear_inds)): # going over all data pixels if values[linear_inds[i]] is None: # this index is found for first values[linear_inds[i]] = linear_data[i] elif values[linear_inds[i]] < linear_data[i]: # here can be changed maximal or minimal value values[linear_inds[i]] = linear_data[i] values = np.asarray(values) return values"
# Node type whose occurrences are masked in this example.
target_node_type = "identifier"

# Encoding of the unmasked snippet.
source_code_encoding = bert_tokenizer(code)

# Encoding with the target node type masked (masking rate 1).
masked_code_encoding = code_masker(
    code,
    bert_tokenizer(code),
    bert_tokenizer.node_types.index(target_node_type),
    1,
)

# Both encodings must contain the same number of token ids.
assert len(source_code_encoding['input_ids']) == len(masked_code_encoding['input_ids'])

# Decode the masked ids back to text, leaving out the BOS/EOS token ids.
special_token_ids = (bert_tokenizer.tokenizer.bos_token_id, bert_tokenizer.tokenizer.eos_token_id)
masked_code = bert_tokenizer.tokenizer.decode(
    [token_id for token_id in masked_code_encoding['input_ids'] if token_id not in special_token_ids]
)

print(masked_code)

def<mask><mask><mask>(<mask>,<mask>):
    return<mask>*<mask>


### Code Prediction

In [9]:
from CodeCheckList.predictor import Predictor

# Load the predictor for the same checkpoint, sharing the tokenizer created above.
predictor = Predictor.from_pretrained(checkpoint, bert_tokenizer)
# Request 5 predictions for the masked encoding; the second argument is the
# underlying tokenizer's encoding of the unmasked code (return_tensors='pt').
predictions = predictor(masked_code_encoding, bert_tokenizer.tokenizer(code, return_tensors='pt'), 5)

### Evaluation

In [10]:
import CodeCheckList.utils as utils

# Index of the prediction that is inspected below.
prediction_number = 0

print('------------- CODE -------------', code, sep='\n')
print('\n---------- MASKED -------------', masked_code, sep='\n')
print('\n--------- PREDICTED -----------')
predicted_code = predictions[prediction_number]
print(predicted_code)
print('\n--------- AST COMPARE -----------')

# Collect the nodes of the target type from the original and the predicted code.
filtered_nodes = []
filtered_nodes_predict = []
for snippet, collected_nodes in ((code, filtered_nodes), (predicted_code, filtered_nodes_predict)):
    utils.find_nodes(
        bert_tokenizer.parser.parse(bytes(snippet, "utf8")).root_node,
        bert_tokenizer.node_types[bert_tokenizer.node_types.index(target_node_type)],
        collected_nodes,
    )

# The comparison here is only on how many nodes each side contains.
print(len(filtered_nodes), len(filtered_nodes_predict), sep='\n')

------------- CODE -------------
def multiply_numbers(a,b):
    return a*b

---------- MASKED -------------
def<mask><mask><mask>(<mask>,<mask>):
    return<mask>*<mask>

--------- PREDICTED -----------
def __function(name, value):
    return f*args

--------- AST COMPARE -----------
5
5


## Testing

In [11]:
from datasets import load_dataset
import CodeCheckList.utils as utils

evaluator = Evaluator(checkpoint, python_language)

# Token limit reported by the underlying tokenizer for a single sequence.
max_token_number = bert_tokenizer.tokenizer.max_len_single_sentence
print(max_token_number)

# Load the test split, then filter it with the project helper.
raw_test_set = load_dataset("code_search_net", split='test')
test_set = utils.get_test_sets(raw_test_set, python_language, max_token_number, bert_tokenizer)

510


No config specified, defaulting to: code_search_net/all
Found cached dataset code_search_net (/home/svelascodimate/.cache/huggingface/datasets/code_search_net/all/1.0.0/80a244ab541c6b2125350b764dc5c2b715f65f00de7a56107a28915fac173a27)
  0%|          | 0/101 [00:00<?, ?ba/s]Token indices sequence length is longer than the specified maximum sequence length for this model (517 > 512). Running this sequence through the model will result in indexing errors
100%|██████████| 101/101 [00:19<00:00,  5.31ba/s]


In [12]:
# Show the source code of the first sample in the filtered test set.
print(test_set[0]['whole_func_string'])

def sina_xml_to_url_list(xml_data):
    """str->list
    Convert XML to URL List.
    From Biligrab.
    """
    rawurl = []
    dom = parseString(xml_data)
    for node in dom.getElementsByTagName('durl'):
        url = node.getElementsByTagName('url')[0]
        rawurl.append(url.childNodes[0].data)
    return rawurl


In [13]:
print(f'TOTAL PYTHON FILTERED SAMPLES: {len(test_set)}')

# Rebuild the filtered test set with the evaluator's tokenizer, then draw a
# random subset via the project helper (second argument: 5).
filtered_test_set = utils.get_test_sets(
    load_dataset("code_search_net", split='test'),
    "python",
    evaluator.tokenizer.tokenizer.max_len_single_sentence,
    evaluator.tokenizer,
)
test_set = utils.get_random_sub_set_test_set(filtered_test_set, 5)

print(f'TOTAL SAMPLES TO EVALUATE: {len(test_set)}')
len(test_set)

TOTAL PYTHON FILTERED SAMPLES: 17808


No config specified, defaulting to: code_search_net/all
Found cached dataset code_search_net (/home/svelascodimate/.cache/huggingface/datasets/code_search_net/all/1.0.0/80a244ab541c6b2125350b764dc5c2b715f65f00de7a56107a28915fac173a27)
  0%|          | 0/101 [00:00<?, ?ba/s]Token indices sequence length is longer than the specified maximum sequence length for this model (517 > 512). Running this sequence through the model will result in indexing errors
100%|██████████| 101/101 [00:18<00:00,  5.32ba/s]


TOTAL SAMPLES TO EVALUATE: 5


5

In [14]:
# Model and language under evaluation.
checkpoint = "huggingface/CodeBERTa-small-v1"
python_language = "python"

# Evaluation settings passed to the evaluator call below.
number_of_predictions = 3
masking_rate = 1

evaluator = Evaluator(checkpoint, python_language, gpu_available=False)

results_dataframe = evaluator(test_set, number_of_predictions, masking_rate)

# Display the node types with the most occurrences first.
results_dataframe.sort_values(by=['occurences'], ascending=False)


--------evaluating sample:0 --------
~~~~~~~~~~~~~~~~~~~~~~
def create_field(field_info):
    """
    Create a field by field info dict.
    """
    field_type = field_info.get('type')
    if field_type not in FIELDS_NAME_MAP:
        raise ValueError(_('not support this field: {}').format(field_type))
    field_class = FIELDS_NAME_MAP.get(field_type)
    params = dict(field_info)
    params.pop('type')
    return field_class.from_dict(params)
~~~~~~~~~~~~~~~~~~~~~~
def create_field(field_info):
    """
    Create a field by field info dict.
    """
    field_type= field_info.get('type')
    if field_type not in FIELDS_NAME_MAP:
        raise ValueError(_('not support this field: {}').format(field_type))
    field_class, FIELDS_NAME_MAP.get(field_type)
    params += dict(field_info)
    params.pop('type')
    return field_class.from_dict(params)
~~~~~~~~~~~~~~~~~~~~~~
def create_field(field_info):
    """
    Create a field by field info dict.
    """
    field_type, field_info.get('ty

Unnamed: 0,ast_element,occurences,jaccard,sorensen_dice,levenshtein,jaccard_avg,sorensen_dice_avg,levenshtein_avg
107,identifier,112,"((0.8916666666666667, 1.0, 0.9504950495049505,...","((0.9427312775330396, 1.0, 0.9746192893401016,...","((0.8916666666666666, 1.0, 0.9504950495049505,...","(0.916, 0.852, 0.86)","(0.953, 0.912, 0.92)","(0.918, 0.855, 0.833)"
62,(,27,"((1.0, 1.0, 1.0, 1.0, 1.0), (0.914529914529914...","((1.0, 1.0, 1.0, 1.0, 1.0), (0.955357142857142...","((1.0, 1.0, 1.0, 1.0, 1.0), (0.914529914529914...","(1.0, 0.93, 0.932)","(1.0, 0.963, 0.965)","(1.0, 0.933, 0.937)"
40,),27,"((1.0, 1.0, 1.0, 1.0, 1.0), (0.930434782608695...","((1.0, 1.0, 1.0, 1.0, 1.0), (0.963963963963964...","((1.0, 1.0, 1.0, 1.0, 1.0), (0.930434782608695...","(1.0, 0.938, 0.919)","(1.0, 0.967, 0.957)","(1.0, 0.93, 0.915)"
184,"""",26,"((1.0, 0.875, 1.0, 1.0, 1.0), (1.0, 0.85567010...","((1.0, 0.9333333333333333, 1.0, 1.0, 1.0), (1....","((1.0, 0.8924731182795699, 1.0, 1.0, 1.0), (1....","(0.975, 0.954, 0.759)","(0.987, 0.976, 0.853)","(0.978, 0.95, 0.76)"
130,.,25,"((1.0, 0.9310344827586207, 1.0, 1.0, 1.0), (0....","((1.0, 0.9642857142857143, 1.0, 1.0, 1.0), (0....","((1.0, 0.9310344827586207, 1.0, 1.0, 1.0), (0....","(0.986, 0.964, 0.929)","(0.993, 0.981, 0.962)","(0.986, 0.967, 0.928)"
...,...,...,...,...,...,...,...,...
77,while_statement,0,"((), (), ())","((), (), ())","((), (), ())","(0, 0, 0)","(0, 0, 0)","(0, 0, 0)"
78,false,0,"((), (), ())","((), (), ())","((), (), ())","(0, 0, 0)","(0, 0, 0)","(0, 0, 0)"
79,wildcard_import,0,"((), (), ())","((), (), ())","((), (), ())","(0, 0, 0)","(0, 0, 0)","(0, 0, 0)"
81,elif_clause,0,"((), (), ())","((), (), ())","((), (), ())","(0, 0, 0)","(0, 0, 0)","(0, 0, 0)"
