# Mapping Function
> Labels a given token following a tailored taxonomy

In [53]:
import pandas as pd
from transformers import AutoConfig, AutoModelForCausalLM, AutoTokenizer
import json
import random
import numpy as np

In [54]:
from code_rationales.loader import download_grammars
from tree_sitter import Language, Parser
import code_rationales

In [55]:
import nltk

## Setup

In [56]:
# Download the NLTK resources this notebook relies on. Later cells call
# nltk.word_tokenize / nltk.pos_tag and load 'help/tagsets/upenn_tagset.pickle';
# presumably these three packages back those calls (verify against NLTK docs).
nltk.download('punkt')
nltk.download('averaged_perceptron_tagger')
nltk.download('tagsets')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /root/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!
[nltk_data] Downloading package tagsets to /root/nltk_data...
[nltk_data]   Package tagsets is already up-to-date!


True

In [57]:
def param_default():
    """Return a fresh dict with the default settings used by this notebook.

    The dict names the dataset, the directories for rationale inputs and
    aggregated outputs, and the sample / experiment / bootstrap sizes.
    """
    # Other datasets that have been used with this notebook:
    #   'code_completion_random_cut_5k_30_512_tokens'
    #   'code_completion_docstring_signature_3.8k_30_150_tokens'
    #   'code_completion_docstring_5k_30_150_tokens'
    defaults = {}
    defaults['dataset'] = 'code_completion_docstring_random_cut_3.8k_30_150_tokens'

    # Input rationales and output locations.
    defaults['rational_results'] = '/workspaces/code-rationales/data/rationales/gpt'
    defaults['global_ast_results'] = '/workspaces/code-rationales/data/global_ast_results/gpt'
    defaults['global_taxonomy_results'] = '/workspaces/code-rationales/data/global_taxonomy_results/gpt'

    # Experiment sizing.
    defaults['num_samples'] = 100
    defaults['size_samples'] = 146
    defaults['num_experiments'] = 30
    defaults['bootstrapping'] = 500
    return defaults


# Module-level configuration read by the cells below.
params = param_default()

## Python Taxonomy

In [58]:
#Programming Language Taxonomy
def pl_taxonomy_python() -> dict:
    """Return the programming-language taxonomy: category name -> list of token / AST node types.

    A new dict is built on every call. Category order and the order of the
    entries inside each category are significant for first-match lookups, so
    both are kept exactly (including entries that appear more than once).
    """
    taxonomy = {}

    # --- lexical categories -------------------------------------------------
    taxonomy["punctuation"] = ['{', '}', '[', ']', '(', ')', '\"', ',', '.', '...', ';', ':']
    taxonomy["exceptions"] = ['raise_statement', 'catch', 'try', 'finally', 'throw', 'throws', 'except']
    taxonomy["oop"] = [
        'def', 'class', 'instanceof', 'interface', 'private', 'protected', 'public', 'abstract',
        'extends', 'package', 'this', 'implements', 'import', 'new', 'super',
    ]
    taxonomy["asserts"] = ['assert']
    taxonomy["types"] = [
        'tuple', 'set', 'list', 'pair', 'subscript', 'type', 'none', 'dictionary', 'integer',
        'native', 'static', 'synchronized', 'transient', 'volatile', 'void', 'final', 'enum',
        'byte', 'char', 'float', 'boolean', 'double', 'int', 'long', 'short', 'strictfp',
    ]
    taxonomy["conditionals"] = ['else', 'if', 'switch', 'case', 'default']
    taxonomy["loops"] = ['break', 'do', 'for', 'while', 'continue']
    taxonomy["operators"] = [
        'as', 'yield', 'is', '@', 'in', 'and', 'or', 'not', '**', 'slice', '%', '+', '<', '>',
        '=', '+', '-', '*', '/', '%', '++', '--', '!', '==', '!=', '>=', '<=', '&&', '||', '?',
        ':', '~', '<<', '>>', '>>>', '&', '^', '|', '//',
    ]
    taxonomy["indentation"] = ['\n', '\t']
    taxonomy["bool"] = ['true', 'false']
    taxonomy["functional"] = ['lambda', 'lambda_parameters']
    taxonomy["with"] = ['with', 'with_item', 'with_statement', 'with_clause']
    taxonomy["return"] = ['return']

    # --- syntactic (AST node) categories ------------------------------------
    taxonomy["structural"] = [
        'attribute', 'module', 'argument_list', 'parenthesized_expression', 'pattern_list',
        'class_definition', 'function_definition', 'block',
    ]
    taxonomy["statements"] = [
        'return_statement', 'break_statement', 'assignment', 'while_statement',
        'expression_statement', 'assert_statement',
    ]
    taxonomy["expression"] = [
        'call', 'exec', 'async', 'ellipsis', 'unary_operator', 'binary_operator',
        'as_pattern_target', 'boolean_operator', 'as_pattern', 'comparison_operator',
        'conditional_expression', 'named_expression', 'not_operator', 'primary_expression',
        'as_pattern',
    ]
    taxonomy["errors"] = ["ERROR"]
    taxonomy["identifier"] = ["identifier"]
    taxonomy["comment"] = ["comment"]
    taxonomy["string"] = [
        'string', 'interpolation', 'string_content', 'string_end', 'string_start', 'escape_sequence',
    ]

    # Left empty: there are no members to match in this category.
    taxonomy["unknown"] = []
    return taxonomy

## NL POS Taxonomy

In [59]:
def nl_pos_taxonomy() -> dict:
    """Return the natural-language taxonomy: category name -> list of POS tags.

    A new dict is built on every call. The category keys are runtime
    identifiers and are kept exactly as spelled.
    """
    word_classes = {
        "nl_verb": ['VBN', 'VBG', 'VBZ', 'VBP', 'VBD', 'VB'],
        "nl_noun": ['NN', 'NNPS', 'NNS', 'NNP'],
        "nl_pronoun": ['WP', 'PRP', 'PRP$', 'WP', 'WP$'],
        "nl_adverb": ['RBS', 'RBR', 'RB', 'WRB'],
        "nl_adjetive": ['JJR', 'JJS', 'JJ'],
        "nl_determier": ['DT', 'WDT', 'PDT'],
        "nl_preposition": ['IN', 'TO'],
    }
    single_tag_classes = {
        "nl_particle": ['RP'],
        "nl_modal": ['MD'],
        "nl_conjunction": ['CC'],
        "nl_cardinal": ['CD'],
        "nl_list": ['LS'],
    }
    # Everything else, including the punctuation tags.
    remainder = {
        "nl_other": ['FW', 'EX', 'SYM', 'UH', 'POS', "''", '--', ':', '(', ')', '.', ',', '``', '$'],
    }
    return {**word_classes, **single_tag_classes, **remainder}

## AST Mapping

### Calculate Spans

In [60]:
#df_rationals = pd.read_csv('/workspaces/code-rationales/data/rationales/gpt/testing/[t_100]_[max_tgt_44]_[exp:0]_.csv',index_col=0)

In [61]:
#df_rationals = df_rationals[df_rationals['from_seq_id'] == 0]

In [62]:
### Retrieve the generated output
#initial_token = eval(df_rationals['typesets_tgt'][0])[0][0]
#code = initial_token + ''.join(df_rationals['goal_token'])
#code

In [63]:
#### Add Span column
#calculate_left_span = lambda index : len(initial_token + ''.join(df_rationals['goal_token'][:index]))
#calculate_right_span = lambda left_span, token : len(left_span) + len(token)
#span_col = list(map(lambda tuple: (tuple[0],tuple[0]+len(tuple[1])),[(calculate_left_span(index),token) for index, token in df_rationals['goal_token'].items()]))
#df_rationals.insert(loc=df_rationals.columns.get_loc('goal_token')+1, column='span', value=span_col)

In [64]:
#df_rationals

### Map Tokens with Nodes

In [65]:
# Languages handed to download_grammars (imported from code_rationales.loader).
# Only the 'python' parser is created later in this notebook.
languages=['python', 'java']
# NOTE(review): presumably fetches/builds the tree-sitter grammars under the
# package's `grammars` directory (the path echoed in the cell output) - confirm.
download_grammars(languages)

/usr/local/lib/python3.8/dist-packages/code_rationales/grammars


In [66]:
def unroll_node_types(
    nested_node_types: dict  # node_types from tree-sitter
) -> list: # list of node types
    """Collect every node type named anywhere in a tree-sitter node-types structure.

    Walks nested dicts and lists and records each string stored under a
    ``'type'`` key. ``'ERROR'`` is always added as well.

    Args:
        nested_node_types: parsed ``node-types.json`` content (a list of dicts;
            a single dict is accepted too).

    Returns:
        Sorted list of the unique node type names, so the result is the same
        on every run instead of following set iteration order.
    """
    all_node_types = {'ERROR'}

    def collect(item) -> None:
        if isinstance(item, dict):
            for key, value in item.items():
                if key == 'type' and isinstance(value, str):
                    all_node_types.add(value)
                else:
                    collect(value)
        elif isinstance(item, list):
            for element in item:
                collect(element)
        # Scalars (str, bool, numbers, None) carry no node types: ignore them
        # instead of failing when a list holds something other than dicts.

    collect(nested_node_types)
    return sorted(all_node_types)

In [67]:
def create_parser(lang: str):
    """Create a tree-sitter parser for `lang` together with the grammar's node types.

    Args:
        lang: language name used both to select the grammar inside the shared
            library and to locate ``tree-sitter-<lang>/src/node-types.json``.

    Returns:
        ``(parser, node_types)``: a ``Parser`` configured for the language and
        the list produced by ``unroll_node_types`` for its node-types file.
    """
    grammars_dir = f"{code_rationales.__path__[0]}/grammars"
    # Grab the node types from the tree-sitter language
    language = Language(f"{grammars_dir}/tree-sitter-languages.so", lang)
    node_path = f"{grammars_dir}/tree-sitter-{lang}/src/node-types.json"
    # Decode explicitly as UTF-8 so reading does not depend on the locale's
    # default encoding.
    with open(node_path, encoding="utf-8") as f:
        node_types = unroll_node_types(json.load(f))
    # Create a parser for the language
    parser = Parser()
    parser.set_language(language)
    return parser, node_types

In [68]:
def traverse(
    node,       # tree-sitter node
) -> list:
    """Return the leaves of the tree rooted at `node`, in left-to-right order.

    A node of type ``'string'`` is treated as a leaf: it is returned itself and
    its children are not visited. Any other node is returned only when it has
    no children.

    The walk uses an explicit stack, so deeply nested trees do not hit
    Python's recursion limit.
    """
    results = []
    pending = [node]
    while pending:
        current = pending.pop()
        children = current.children
        if current.type == 'string' or not children:
            results.append(current)
            continue
        # Push in reverse so the leftmost child is processed first.
        pending.extend(reversed(children))
    return results

In [69]:
def convert_to_offset(
    point,              #point to convert
    lines: list         #list of lines in the source code
    ):
    """Convert a tree-sitter (row, column) point to a character offset.

    Args:
        point: ``(row, column)`` pair as reported by tree-sitter. The column
            counts bytes of the UTF-8 encoded line, not characters.
        lines: the source code split on ``"\n"`` (``str`` lines).

    Returns:
        Offset, in characters, of the point inside ``"\n".join(lines)``.
    """
    row, column = point
    # Every previous row contributes its characters plus one newline.
    chars_in_rows = sum(map(len, lines[:row])) + row
    line = lines[row]
    if line.isascii():
        # One byte per character: the byte column is the character column.
        chars_in_columns = len(line[:column])
    else:
        # Translate the byte column into a character count so the offset is
        # comparable with spans measured with len() on str tokens. A column
        # that falls inside a multi-byte character is rounded down.
        prefix = line.encode('utf8')[:column]
        chars_in_columns = len(prefix.decode('utf8', errors='ignore'))
    return chars_in_rows + chars_in_columns

In [70]:
def get_node_span(node, lines):
    """Return the ``(start, end)`` offsets of `node` within the code.

    Both ends are obtained by passing the node's ``start_point`` and
    ``end_point`` through ``convert_to_offset`` with the given `lines`.
    """
    boundaries = (node.start_point, node.end_point)
    start_span, end_span = (convert_to_offset(point, lines) for point in boundaries)
    return start_span, end_span
    

In [71]:
def is_token_span_in_node_span(tok_span, node_span):
    """Tell whether the token span starts or ends inside the node span.

    True when the token start lies in ``[node_start, node_end)`` or the token
    end lies in ``(node_start, node_end]``. A token that strictly encloses the
    node on both sides is therefore not reported.
    """
    tok_start, tok_end = tok_span[0], tok_span[1]
    node_start, node_end = node_span[0], node_span[1]
    if node_start <= tok_start < node_end:
        return True
    if node_start < tok_end <= node_end:
        return True
    return False

In [72]:
def get_token_type(
    tok_span: tuple, # (start, end) position of a token in tokenizer
    nodes: list,     # list of tree-sitter nodes
    lines: list,     # list of lines in the code
) -> tuple: # (parent_type, token_type) of the token
    """Return ``(parent type, node type)`` of the first node overlapping the token.

    The spans of all `nodes` are computed first; the nodes are then checked in
    order with ``is_token_span_in_node_span``. Returns ``None`` when no node
    matches.
    """
    node_spans = list(map(lambda candidate: get_node_span(candidate, lines), nodes))
    for candidate, candidate_span in zip(nodes, node_spans):
        if not is_token_span_in_node_span(tok_span, candidate_span):
            continue
        return candidate.parent.type, candidate.type
    return None

In [73]:
def get_token_nodes(
    tok_span: tuple, # (start, end) position of a token in tokenizer
    node,            # tree-sitter node
    lines: list,     # list of lines in the code
) -> list: 
    """Collect every node under `node` (itself included) that overlaps the token span.

    The whole tree is visited in pre-order; children are explored even when
    their parent does not overlap the token. Matching nodes are returned in
    visiting order.
    """
    matches = []
    pending = [node]
    while pending:
        current = pending.pop()
        if is_token_span_in_node_span(tok_span, get_node_span(current, lines)):
            matches.append(current)
        # Reverse so the leftmost child is popped (visited) first.
        pending.extend(reversed(current.children))
    return matches

In [74]:
def get_nodes_by_type(
    node, 
    node_types: list
) -> list :
    """Find the outermost nodes under `node` whose type is listed in `node_types`.

    The tree is searched in pre-order. Once a node matches it is collected and
    its own subtree is not searched any further.
    """
    found = []
    pending = [node]
    while pending:
        current = pending.pop()
        if current.type in node_types:
            found.append(current)
        else:
            # Reverse so the leftmost child is popped (visited) first.
            pending.extend(reversed(current.children))
    return found

In [75]:
#parser, node_types = create_parser('python')

In [76]:
#nodes = traverse(parser.parse(bytes(code, 'utf8')).root_node)

In [77]:
#print(get_token_type(df_rationals['span'][40], nodes, code.split("\n")))

In [78]:
#print(get_token_nodes(df_rationals['span'][42], parser.parse(bytes(code, 'utf8')).root_node, code.split("\n")))

In [79]:
#print(eval(df_rationals['rationale_pos_tgt'][2]))
#print(eval(df_rationals['rationale_prob_tgt'][2]))

In [80]:
#print(df_rationals['goal_token'][eval(df_rationals['rationale_pos_tgt'][2])[0]-1])
#print(df_rationals['span'][eval(df_rationals['rationale_pos_tgt'][2])[0]-1])


## Taxonomy Mapping

In [81]:
def clean_results(global_results):
    """Drop empty entries from a two-level results mapping.

    Every inner mapping of `global_results` is replaced in place by a copy
    without its falsy values. The return value is a further copy of the outer
    mapping from which the inner mappings that ended up empty are removed;
    those keys remain (mapped to an empty dict) in `global_results` itself.
    """
    def without_empty_values(mapping):
        pruned = mapping.copy()
        empty_keys = [key for key, value in mapping.items() if not value]
        for key in empty_keys:
            del pruned[key]
        return pruned

    for outer_key in list(global_results.keys()):
        global_results[outer_key] = without_empty_values(global_results[outer_key])
    return without_empty_values(global_results)

In [82]:
def search_category_by_token(taxonomy_dict: dict, token_type: str):
    """Return the first category whose member collection contains `token_type`.

    Categories are checked in the dict's iteration order; ``'unknown'`` is
    returned when none of them contains the token type.
    """
    matching_categories = (
        category
        for category, members in taxonomy_dict.items()
        if token_type in members
    )
    return next(matching_categories, 'unknown')

In [83]:
def map_to_taxonomy(taxonomy_dict: dict, result_dict: dict):
    """Re-aggregate type-level results into taxonomy categories.

    `result_dict` maps target type -> source type -> list of values. Both
    types are translated with ``search_category_by_token`` and the values are
    concatenated into the matching (target category, source category) cell.
    The category grid is then passed through ``clean_results`` and returned.
    """
    categories = list(taxonomy_dict.keys())
    mappings = {
        target_category: {source_category: [] for source_category in categories}
        for target_category in categories
    }
    # Iterate over a shallow copy of the input, as the values are only read.
    for target_token, sources in result_dict.copy().items():
        for source_token, probs in sources.items():
            target_category = search_category_by_token(taxonomy_dict, target_token)
            source_category = search_category_by_token(taxonomy_dict, source_token)
            mappings[target_category][source_category] += probs
    return clean_results(mappings)

## Rationale Global Aggregates

In [84]:
def get_experiment_path(samples, size, exp):
    """Return the CSV path of one experiment.

    The path is ``<rational_results>/<dataset>/[t_<samples>]_[max_tgt_<size>]_[exp:<exp>]_.csv``,
    with both directories read from the module-level ``params`` dict.
    """
    return f"{params['rational_results']}/{params['dataset']}/[t_{samples}]_[max_tgt_{size}]_[exp:{exp}]_.csv"


def calculate_left_span(index, initial_token, df_rationals):
    """Return the character offset at which the token at position `index` starts.

    The offset is the length of `initial_token` plus the lengths of the
    ``goal_token`` entries that precede `index`, each converted with ``str``.
    This matches how the cells below assemble the generated code (initial token
    followed by the stringified goal tokens).

    The earlier expression applied ``str`` to the whole slice, which measured
    the printed representation of the pandas Series rather than the tokens.
    """
    preceding_tokens = df_rationals['goal_token'][:index]
    return len(str(initial_token)) + sum(len(str(token)) for token in preceding_tokens)


def calculate_right_span(left_span, token):
    """Return the offset just past `token`, given where it starts.

    `left_span` is normally the integer produced by ``calculate_left_span``.
    A sized object (e.g. the prefix string) is still accepted and contributes
    its length, which is what the earlier expression computed.
    """
    start = left_span if isinstance(left_span, int) else len(left_span)
    return start + len(token)

In [85]:
import traceback

def aggregate_rationals(experiment_paths: list, parser, node_types: list, pos_types: list, nl_ast_types: list):
    """Aggregate rationale probabilities per (target type, rationale type) pair.

    NOTE(review): the next notebook cell (In [86]) defines `aggregate_rationals`
    again; when the cells run in order, that later definition is the one in effect.

    For every experiment CSV and every sample id in range(params['num_samples']):
    rebuilds the generated code from 'typesets_tgt' and 'goal_token', parses it,
    tags tokens lying inside `nl_ast_types` nodes with an NLTK POS tag, and then
    appends each rationale probability to global_results[target][rationale] for
    all combinations of AST node types and POS tags of the two tokens.

    Returns the nested dict: type -> type -> list of probabilities, with one
    entry for every value in node_types + pos_types.
    """

    global_results = {node_type : {node_type : [] for node_type in node_types + pos_types} for node_type in node_types + pos_types}

    for exp_idx, experiment_path in enumerate(experiment_paths):
        df_experiment = pd.read_csv(experiment_path, index_col=0)
        # One frame per sample id; 'from_seq_id' is matched both as int and as str.
        experiment_rational_results = [df_experiment[(df_experiment['from_seq_id'] == sample_idx) | (df_experiment['from_seq_id'] == str(sample_idx))].reset_index() for sample_idx in range(params['num_samples'])]
        print('*'*10 +'Aggregating rationales for exp: ' +str(exp_idx) + '*'*10)
        
        for experiment_rational_result in experiment_rational_results:
            # NOTE(review): eval() is applied to text read from the CSV - only safe for trusted files.
            initial_token = eval(experiment_rational_result['typesets_tgt'][0])[0][0]
            # Add a 'span' column: (start, end) offsets of each goal token in the generated code.
            experiment_rational_result.insert(loc=experiment_rational_result.columns.get_loc('goal_token')+1, column='span', value=list(map(lambda tuple: (tuple[0],tuple[0]+len(tuple[1])),[(calculate_left_span(index, initial_token, experiment_rational_result), str(token)) for index, token in experiment_rational_result['goal_token'].items()])))
            # Generated code = initial token followed by all goal tokens.
            target_code = eval(experiment_rational_result['typesets_tgt'][0])[0][0] + ''.join(experiment_rational_result['goal_token'].map(lambda value: str(value)))
            target_ast = parser.parse(bytes(target_code, 'utf8')).root_node
            nl_target_nodes = get_nodes_by_type(target_ast, nl_ast_types)
            nl_pos_column = [None for i in range(len(experiment_rational_result))]

            ###### NL SAMPLE POS TAGGING
            # A token gets a tag when it overlaps an NL node whose text contains it; the tag is
            # taken from the first tagged word that contains, or is contained in, the token.
            # Later matching nodes overwrite the tag found for earlier ones.
            for target_token_idx in range(len(experiment_rational_result['span'])):
                for nl_target_node in nl_target_nodes:
                    if is_token_span_in_node_span(experiment_rational_result['span'][target_token_idx], get_node_span(nl_target_node, target_code.split("\n"))) and \
                            str(experiment_rational_result['goal_token'][target_token_idx]) in nl_target_node.text.decode('utf-8'):
                            tagged_token_list = list(filter(lambda tagged_token: str(experiment_rational_result['goal_token'][target_token_idx]) in tagged_token[0] or \
                                                       tagged_token[0] in str(experiment_rational_result['goal_token'][target_token_idx]), \
                                                        nltk.pos_tag( nltk.word_tokenize(nl_target_node.text.decode('utf-8')))))
                            if len(tagged_token_list)>0 and tagged_token_list[0][1] in pos_types: nl_pos_column[target_token_idx] = tagged_token_list[0][1]
            experiment_rational_result['nl_pos'] = nl_pos_column      
            
            ###### GLOBAL MAPPING
            for target_token_idx in range(len(experiment_rational_result['span'])):
                target_nodes = get_token_nodes(experiment_rational_result['span'][target_token_idx], target_ast, target_code.split("\n"))
                target_node_types = list(map(lambda node: node.type, target_nodes))
                for rational_idx, rational_pos in enumerate(eval(experiment_rational_result['rationale_pos_tgt'][target_token_idx])):
                    # Positions <= 0 are skipped; a position p refers to the token in row p-1.
                    if eval(experiment_rational_result['rationale_pos_tgt'][target_token_idx])[rational_idx] > 0: #rational 1 position.
                        try:
                            #########################AST rationales binding: AST -> AST
                            rational_prob = eval(experiment_rational_result['rationale_prob_tgt'][target_token_idx])[rational_idx]
                            rational_nodes = get_token_nodes(experiment_rational_result['span'][eval(experiment_rational_result['rationale_pos_tgt'][target_token_idx])[rational_idx]-1], target_ast, target_code.split("\n"))
                            rational_node_types = list(map(lambda node: node.type, rational_nodes))
                            [global_results[target_node_type][rational_node_type].append(rational_prob) for target_node_type in target_node_types for rational_node_type in rational_node_types]

                            #######################NL bindings
                            target_nl_type = experiment_rational_result['nl_pos'][target_token_idx]
                            rational_nl_type = experiment_rational_result['nl_pos'][eval(experiment_rational_result['rationale_pos_tgt'][target_token_idx])[rational_idx]-1]
                            #NL rationales binding: AST -> NL
                            for target_node_type in target_node_types:
                                global_results[target_node_type][rational_nl_type].append(rational_prob) if rational_nl_type else None
                            #NL rationales binding: NL -> AST
                            if target_nl_type:
                                [global_results[target_nl_type][rational_node_type].append(rational_prob) for rational_node_type in rational_node_types]
                                #NL rationales binding: NL -> NL
                                global_results[target_nl_type][rational_nl_type].append(rational_prob) if rational_nl_type else None
                        # Best effort: a failing rationale is reported and skipped.
                        except Exception as e:
                            print("An error occurred:", e)
    return global_results

In [86]:
import traceback

def _token_node_types(token_idx, sample, target_ast, code_lines, cache):
    """Return the AST node types overlapping token `token_idx`, memoised in `cache`."""
    if token_idx not in cache:
        token_span = sample['span'][token_idx]
        cache[token_idx] = [node.type for node in get_token_nodes(token_span, target_ast, code_lines)]
    return cache[token_idx]


def aggregate_rationals(experiment_paths: list, parser, node_types: list, pos_types: list, nl_ast_types: list):
    """Aggregate rationale probabilities per (target type, rationale type) pair.

    For every experiment CSV and every sample id in ``range(params['num_samples'])``
    the generated code is rebuilt from the ``typesets_tgt`` / ``goal_token``
    columns and parsed. Tokens lying inside `nl_ast_types` nodes receive an
    NLTK POS tag (column ``nl_pos``). Each rationale probability is then
    appended to ``global_results[target][rationale]`` for every combination of
    the AST node types and POS tags of the target token and the rationale token.

    Args:
        experiment_paths: CSV files to read; the columns used are
            ``from_seq_id``, ``typesets_tgt``, ``goal_token``,
            ``rationale_pos_tgt`` and ``rationale_prob_tgt``.
        parser: tree-sitter parser used on the generated code.
        node_types: AST node types to create result cells for.
        pos_types: POS tags to create result cells for (and to accept as tags).
        nl_ast_types: AST node types whose text is treated as natural language.

    Returns:
        Nested dict: type -> type -> list of probabilities, with one entry for
        every value in ``node_types + pos_types``.

    Notes:
        Results match the previous implementation; work that does not change
        inside the loops (parsing cell contents, splitting the code into lines,
        POS tagging a node, resolving a token's node types) is now done once
        instead of once per iteration.
    """
    all_types = node_types + pos_types
    global_results = {target_type: {source_type: [] for source_type in all_types} for target_type in all_types}
    known_pos_types = set(pos_types)

    for exp_idx, experiment_path in enumerate(experiment_paths):
        df_experiment = pd.read_csv(experiment_path, index_col=0)
        seq_ids = df_experiment['from_seq_id']
        # One frame per sample id; the id is matched both as int and as str.
        experiment_rational_results = [
            df_experiment[(seq_ids == sample_idx) | (seq_ids == str(sample_idx))].reset_index()
            for sample_idx in range(params['num_samples'])
        ]
        print('*'*10 +'Aggregating rationales for exp: ' +str(exp_idx) + '*'*10)

        for experiment_rational_result in experiment_rational_results:
            # NOTE(review): eval() runs text read from the CSV. Only use with trusted
            # files; ast.literal_eval would be safer if the cells are plain literals.
            initial_token = eval(experiment_rational_result['typesets_tgt'][0])[0][0]
            goal_tokens = [str(token) for token in experiment_rational_result['goal_token']]

            # (start, end) offsets of each goal token in the generated code.
            spans = []
            for index, token in enumerate(goal_tokens):
                left_span = calculate_left_span(index, initial_token, experiment_rational_result)
                spans.append((left_span, left_span + len(token)))
            experiment_rational_result.insert(loc=experiment_rational_result.columns.get_loc('goal_token')+1, column='span', value=spans)

            # Generated code = initial token followed by all goal tokens.
            target_code = initial_token + ''.join(goal_tokens)
            code_lines = target_code.split("\n")
            target_ast = parser.parse(bytes(target_code, 'utf8')).root_node
            nl_target_nodes = get_nodes_by_type(target_ast, nl_ast_types)
            nl_node_spans = [get_node_span(nl_target_node, code_lines) for nl_target_node in nl_target_nodes]
            nl_pos_column = [None] * len(experiment_rational_result)

            ###### NL SAMPLE POS TAGGING
            # Text and tags of a node are computed on first use and then reused.
            nl_node_texts = {}
            nl_node_tags = {}
            for target_token_idx, token_span in enumerate(spans):
                goal_token = goal_tokens[target_token_idx]
                for node_idx, nl_target_node in enumerate(nl_target_nodes):
                    if not is_token_span_in_node_span(token_span, nl_node_spans[node_idx]):
                        continue
                    if node_idx not in nl_node_texts:
                        nl_node_texts[node_idx] = nl_target_node.text.decode('utf-8')
                    node_text = nl_node_texts[node_idx]
                    if goal_token not in node_text:
                        continue
                    if node_idx not in nl_node_tags:
                        nl_node_tags[node_idx] = nltk.pos_tag(nltk.word_tokenize(node_text))
                    # Tag of the first word that contains, or is contained in, the token.
                    first_tag = next(
                        (tag for word, tag in nl_node_tags[node_idx] if goal_token in word or word in goal_token),
                        None,
                    )
                    # No break: a later matching node overrides an earlier one.
                    if first_tag is not None and first_tag in known_pos_types:
                        nl_pos_column[target_token_idx] = first_tag
            experiment_rational_result['nl_pos'] = nl_pos_column

            ###### GLOBAL MAPPING
            node_type_cache = {}
            for target_token_idx in range(len(spans)):
                target_node_types = _token_node_types(target_token_idx, experiment_rational_result, target_ast, code_lines, node_type_cache)
                rational_positions = eval(experiment_rational_result['rationale_pos_tgt'][target_token_idx])
                rational_probs = None  # parsed lazily, inside the try, as before
                for rational_idx, rational_pos in enumerate(rational_positions):
                    if not rational_pos > 0: #rational 1 position.
                        continue
                    try:
                        if rational_probs is None:
                            rational_probs = eval(experiment_rational_result['rationale_prob_tgt'][target_token_idx])
                        rational_prob = rational_probs[rational_idx]
                        # A position p refers to the token stored in row p-1.
                        rational_token_idx = rational_pos - 1

                        #########################AST rationales binding: AST -> AST
                        rational_node_types = _token_node_types(rational_token_idx, experiment_rational_result, target_ast, code_lines, node_type_cache)
                        for target_node_type in target_node_types:
                            for rational_node_type in rational_node_types:
                                global_results[target_node_type][rational_node_type].append(rational_prob)

                        #######################NL bindings
                        target_nl_type = experiment_rational_result['nl_pos'][target_token_idx]
                        rational_nl_type = experiment_rational_result['nl_pos'][rational_token_idx]
                        #NL rationales binding: AST -> NL
                        if rational_nl_type:
                            for target_node_type in target_node_types:
                                global_results[target_node_type][rational_nl_type].append(rational_prob)
                        #NL rationales binding: NL -> AST
                        if target_nl_type:
                            for rational_node_type in rational_node_types:
                                global_results[target_nl_type][rational_node_type].append(rational_prob)
                            #NL rationales binding: NL -> NL
                            if rational_nl_type:
                                global_results[target_nl_type][rational_nl_type].append(rational_prob)
                    except Exception as e:
                        # Best effort: report the failing rationale and keep going.
                        print("An error occurred:", e)
    return global_results

In [87]:
def bootstrapping( np_data, np_func, size ):
    """Build `size` bootstrap replicates of a statistic.

    Each replicate is `np_func` (for instance np.mean or np.median) applied to
    a resample of `np_data` drawn with replacement via np.random.choice. Every
    resample has as many observations as the original data, so any observation
    may be picked more than once.

    Returns the replicates as a numpy array.
    """
    n_observations = len(np_data)
    replicates = []
    for _ in range( size ):
        resample = np.random.choice( np_data, size=n_observations )
        replicates.append( np_func( resample ) )
    return np.array( replicates )

In [88]:
def bootstrap_samples_global_results(global_results: dict, size: int):
    """Replace every value list in `global_results` by bootstrapped means, in place.

    `global_results` maps target type -> source type -> list of values. Each
    list is swapped for ``bootstrapping(values, np.mean, size)`` converted to a
    plain list. Nothing is returned.
    """
    for source_map in global_results.values():
        for source_type in list(source_map):
            replicates = bootstrapping(source_map[source_type], np.mean, size)
            source_map[source_type] = replicates.tolist()

## Running Experiment

In [89]:
### Retrieve experiments
# NOTE(review): the trailing [:1] keeps only the first of the
# params['num_experiments'] paths - confirm whether this restriction is intended.
experiment_paths = [get_experiment_path(params['num_samples'], params['size_samples'], exp) for exp in range(params['num_experiments'])][:1]
### Define parser
parser, node_types = create_parser('python')
### Define POS tags
# list() over the loaded tagset object; presumably this yields the Penn Treebank tag names - verify.
pos_types = list(nltk.data.load('help/tagsets/upenn_tagset.pickle'))

In [90]:
### AGGREGATES BY AST
# AST node types whose text is treated as natural language and POS-tagged.
nl_ast_types = ['comment','identifier','string']
# Aggregate the rationale probabilities, then drop the empty cells.
global_ast_results = clean_results(aggregate_rationals(experiment_paths, parser, node_types, pos_types, nl_ast_types))

**********Aggregating rationales for exp: 0**********


In [91]:
### AGGREGATES BY TAXONOMY
# Merge the programming-language and natural-language taxonomies into one lookup.
taxonomy = {**pl_taxonomy_python(), **nl_pos_taxonomy()}
# Regroup the type-level results into taxonomy categories (a shallow copy is passed in).
global_taxonomy_results = map_to_taxonomy(taxonomy, global_ast_results.copy())

In [None]:
### WARNING: TAKES TIME
# Replaces every value list in global_taxonomy_results, in place, with
# params['bootstrapping'] bootstrapped means.
bootstrap_samples_global_results(global_taxonomy_results, params['bootstrapping'])

## Storing Results

In [None]:
#with open(params['global_ast_results'] + '/' + params['dataset'] + '.txt', 'w') as file:
#    file.write(json.dumps(global_ast_results))

In [None]:
#with open(params['global_taxonomy_results'] + '/' + params['dataset'] + '.txt', 'w') as file:
#    file.write(json.dumps(global_taxonomy_results))