In [1]:
import torch
from transformers import RobertaConfig, RobertaForMaskedLM, RobertaTokenizer
import argparse
import json
import os
import pandas as pd
import numpy as np 
from tqdm import tqdm
from scipy import stats
import javalang
from captum.attr import LayerIntegratedGradients, TokenReferenceBase, visualization
from captum.attr import IntegratedGradients, LayerConductance, LayerIntegratedGradients, LayerActivation
from captum.attr import configure_interpretable_embedding_layer, remove_interpretable_embedding_layer

if torch.__version__ >= '1.7.0':
    norm_fn = torch.linalg.norm
else:
    norm_fn = torch.norm

In [2]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)
MODEL_CLASSES = {'roberta': (RobertaConfig, RobertaForMaskedLM, RobertaTokenizer)}

config_class, model_class, tokenizer_class = MODEL_CLASSES['roberta']
config = config_class.from_pretrained('roberta-base')
tokenizer = tokenizer_class.from_pretrained('roberta-base')

model = RobertaForMaskedLM.from_pretrained('microsoft/codebert-base-mlm', 
                                           output_attentions=True, output_hidden_states=True)
model.to(device)

cuda:0


RobertaForMaskedLM(
  (roberta): RobertaModel(
    (embeddings): RobertaEmbeddings(
      (word_embeddings): Embedding(50265, 768, padding_idx=1)
      (position_embeddings): Embedding(514, 768, padding_idx=1)
      (token_type_embeddings): Embedding(1, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): RobertaEncoder(
      (layer): ModuleList(
        (0): RobertaLayer(
          (attention): RobertaAttention(
            (self): RobertaSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): RobertaSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): LayerNor

In [3]:
ref_token_id = tokenizer.pad_token_id # A token used for generating token reference
sep_token_id = tokenizer.sep_token_id # A token used as a separator between question and text and it is also added to the end of the text.
cls_token_id = tokenizer.cls_token_id # A token used for prepending to the concatenated question-text word sequence

token_reference = TokenReferenceBase(reference_token_idx=ref_token_id)

In [25]:
tokenizer.pad_token_id, tokenizer.sep_token_id, tokenizer.cls_token_id, tokenizer.mask_token_id

(1, 2, 0, 50264)

In [4]:
def get_cloze_words(filename, tokenizer):
    with open(filename, 'r', encoding='utf-8') as fp:
        words = fp.read().split('\n')
    idx2word = {tokenizer.encoder[w]: w for w in words}
    return idx2word

In [5]:
cloze_results = []
cloze_words_file = 'data/cloze-all/cloze_test_words.txt'
file_path = 'data/cloze-all/java/clozeTest.json'

idx2word = get_cloze_words(cloze_words_file, tokenizer)
lines = json.load(open(file_path))
len(lines)

40492

In [6]:
def read_answers(filename):
    answers = {}
    with open(filename, 'r', encoding='utf-8') as f:
        for line in f.readlines():
            line = line.strip()
            answers[line.split('<CODESPLIT>')[0]] = line.split('<CODESPLIT>')[1]
    return answers

answer_file = 'evaluator/answers/java/answers.txt'
answers = read_answers(answer_file)
answer_list = list(answers.values())
print(len(answer_list))

40492


In [7]:
bestSampleWithMaxPairLength = []
bestSampleWithMaxPairLength_LEN =[]

number_of_samples = 10
for i in range(len(lines[:number_of_samples])):
    code = ' '.join(lines[i]['pl_tokens'])
    bestStr = "<s> " + code + " </s>"
    bestLen = len(bestStr.split(" "))
    bestSampleWithMaxPairLength.append(bestStr)
    bestSampleWithMaxPairLength_LEN.append(bestLen)

In [8]:
lengths=[]
codes=[]
selected_answers = []

for index, code in enumerate(bestSampleWithMaxPairLength):
  l = len(tokenizer.tokenize(code))
  if l<=256:
    lengths.append(l)
    codes.append(code)
    selected_answers.append(answer_list[index])

len(codes), len(selected_answers)

(6, 6)

### Extract attribution score

In [9]:
def summarize_attributions(attributions):
    attributions = attributions.sum(dim=-1).squeeze(0)
    attributions = attributions / norm_fn(attributions)
    return attributions

In [10]:
def construct_whole_bert_embeddings(input_ids, ref_input_ids):
    input_embeddings = interpretable_embedding.indices_to_embeddings(input_ids)
    ref_input_embeddings = interpretable_embedding.indices_to_embeddings(ref_input_ids)
    
    return input_embeddings, ref_input_embeddings

In [11]:
def predict_forward_func(input_embeddings, tokenized_text):
    output = model(inputs_embeds=input_embeddings)
    index = tokenized_text.index(tokenizer.mask_token_id)
    if index > output.logits.shape[1]:
        print("Length of outpu is {} and index is {}".format(output.logits.shape[1], index))
    output_list = output.logits[0][index]
    output_list = output_list.unsqueeze(0)
    
    return output_list.max(1).values
    

In [12]:
interpretable_embedding = configure_interpretable_embedding_layer(model, 'roberta.embeddings.word_embeddings')



### Calculate Average attribution on CLS 

In [13]:
cls_data = np.zeros((12,12))

with torch.no_grad():
    for code in tqdm(codes):
        tokenized_text = tokenizer.convert_tokens_to_ids(tokenizer.tokenize(code))
        input_ids = torch.tensor([tokenized_text]).to(device)
        reference_indices = token_reference.generate_reference(input_ids.shape[1], device=device).unsqueeze(0)

        layer_attrs = []
        layer_attn_mat = []
        input_embeddings, ref_input_embeddings = construct_whole_bert_embeddings(input_ids, reference_indices)

        for i in range(model.config.num_hidden_layers):
            lc = LayerConductance(predict_forward_func, 
                                model.roberta.encoder.layer[i])
            layer_attributions = lc.attribute(inputs=input_embeddings, 
                                                    baselines=ref_input_embeddings, 
                                                    additional_forward_args=(tokenized_text))
            layer_attrs.append(summarize_attributions(layer_attributions[0]))
            layer_attn_mat.append(layer_attributions[1])
        # layer x seq_len
        layer_attrs = torch.stack(layer_attrs)
        # layer x batch x head x seq_len x seq_len
        layer_attn_mat = torch.stack(layer_attn_mat)
        for layer in range(12):
            for head in range(12):
                cls_data[layer][head] += layer_attn_mat[layer][0][head][:, 0:1].mean().cpu().detach().numpy()
            
CLS_atten = cls_data/len(codes)

100%|██████████| 6/6 [00:08<00:00,  1.45s/it]


In [14]:
CLS_atten

array([[ 5.29757121e-08,  1.63888776e-07, -2.77009934e-06,
         2.26214744e-06, -5.52203538e-07,  1.38852113e-07,
        -4.90683995e-07,  9.46428062e-09,  1.74776503e-07,
         4.10140203e-09,  6.29861288e-06, -2.12366336e-07],
       [ 3.60767359e-07, -2.17151443e-06,  7.05822950e-08,
        -1.21162116e-07, -5.03540059e-06,  9.24548914e-07,
         5.71309341e-08, -2.54528085e-07,  8.93198973e-08,
         2.13705974e-05, -4.75635428e-07,  2.11378501e-07],
       [ 1.67830868e-06, -1.50289176e-06, -1.02237394e-06,
         2.52465275e-06,  1.18492803e-06,  1.43955161e-06,
        -1.50837164e-06,  3.54706079e-07,  1.09389004e-05,
         1.39103494e-06,  1.15306639e-06,  1.19133960e-07],
       [-2.22937385e-07, -5.61024658e-07, -1.54342132e-06,
         9.32231796e-06, -5.96278724e-07,  4.47601837e-07,
        -3.19015990e-08,  1.30412347e-06, -3.99118062e-06,
        -5.20843457e-08,  7.00009177e-09,  2.76793514e-06],
       [-4.97239374e-06, -2.28115541e-06,  1.7606810

In [25]:
lst = [ [[] for col in range(12)] for row in range(12)]

In [59]:
cls_data = [ [list() for col in range(12)] for row in range(12)]

with torch.no_grad():
    for code in tqdm(codes):
        tokenized_text = tokenizer.convert_tokens_to_ids(tokenizer.tokenize(code))
        input_ids = torch.tensor([tokenized_text]).to(device)
        reference_indices = token_reference.generate_reference(input_ids.shape[1], device=device).unsqueeze(0)

        layer_attrs = []
        layer_attn_mat = []
        input_embeddings, ref_input_embeddings = construct_whole_bert_embeddings(input_ids, reference_indices)

        for i in range(model.config.num_hidden_layers):
            lc = LayerConductance(predict_forward_func, 
                                model.roberta.encoder.layer[i])
            layer_attributions = lc.attribute(inputs=input_embeddings, 
                                                    baselines=ref_input_embeddings, 
                                                    additional_forward_args=())
            layer_attrs.append(summarize_attributions(layer_attributions[0]))
            layer_attn_mat.append(layer_attributions[1])
        # layer x seq_len
        layer_attrs = torch.stack(layer_attrs)
        # layer x batch x head x seq_len x seq_len
        layer_attn_mat = torch.stack(layer_attn_mat)
        for layer in range(12):
            for head in range(12):
                cls_data[layer][head].append(layer_attn_mat[layer][0][head][:, 0:1].mean().cpu().detach().numpy())
            
# CLS_atten = cls_data/len(codes)

100%|██████████| 6/6 [00:07<00:00,  1.20s/it]


In [31]:
a = np.array(cls_data)

### Calculating Average attribution put on SEP token

In [17]:
sep_data = np.zeros((12,12))

with torch.no_grad():
    for code in tqdm(codes):
        tokenized_text = tokenizer.convert_tokens_to_ids(tokenizer.tokenize(code))
        input_ids = torch.tensor([tokenized_text]).to(device)
        reference_indices = token_reference.generate_reference(input_ids.shape[1], device=device).unsqueeze(0)

        layer_attrs = []
        layer_attn_mat = []
        input_embeddings, ref_input_embeddings = construct_whole_bert_embeddings(input_ids, reference_indices)

        for i in range(model.config.num_hidden_layers):
            lc = LayerConductance(predict_forward_func, 
                                model.roberta.encoder.layer[i])
            layer_attributions = lc.attribute(inputs=input_embeddings, 
                                                    baselines=ref_input_embeddings, 
                                                    additional_forward_args=())
            layer_attrs.append(summarize_attributions(layer_attributions[0]))
            layer_attn_mat.append(layer_attributions[1])
        # layer x seq_len
        layer_attrs = torch.stack(layer_attrs)
        # layer x batch x head x seq_len x seq_len
        layer_attn_mat = torch.stack(layer_attn_mat)
        for layer in range(12):
          for head in range(12):
            for each_sep_index in torch.where(input_ids[0]==2)[0].cpu().detach().numpy():
              sep_data[layer][head] += layer_attn_mat[layer][0][head][:, each_sep_index].mean().cpu().detach().numpy() / len(torch.where(input_ids[0]==2)[0].cpu().detach().numpy())

            
SEP_atten = sep_data/len(codes)

100%|██████████| 6/6 [00:07<00:00,  1.24s/it]


In [18]:
print(SEP_atten.shape)
SEP_atten_sum = np.sum(SEP_atten, axis=1)
print(SEP_atten_sum)

(12, 12)
[ 1.22738629e-06 -1.32907765e-07 -4.75614517e-06 -2.41541149e-06
 -9.75793593e-07  3.98978181e-07 -7.98725491e-07 -1.47497113e-07
  4.01814727e-07 -1.09785690e-06 -3.58600239e-08 -2.69610463e-06]


### Average attention on Syntactic Types

In [17]:
def get_syntax_types_for_code(code_snippet):
  types = ["[CLS]"]
  code = ["<s>"]
  tree = list(javalang.tokenizer.tokenize(code_snippet))
  
  for i in tree:
    j = str(i)
    j = j.split(" ")
    if j[1] == '"MASK"':
      types.append('[MASK]')
      code.append('<mask>')
    else:
      types.append(j[0].lower())
      code.append(j[1][1:-1])
    
  types.append("[SEP]")
  code.append("</s>")
  return np.array(types), ' '.join(code)

In [18]:
def get_start_end_of_token_when_tokenized(code, types, tokenizer):
  reindexed_types = []
  start = 0
  end = 0
  for index, each_token in enumerate(code.split(" ")):
    tokenized_list = tokenizer.tokenize(each_token)
    for i in range(len(tokenized_list)):
      end += 1
    reindexed_types.append((start, end-1))
    start = end
  return reindexed_types

In [21]:
def getSyntaxAttributionScore(codes, tokenizer, syntaxType):
  Instannce = []

  with torch.no_grad():
    
    number = 0 
    failed_calculate = 0
    for eachCode in tqdm(codes, desc=syntaxType):
      identifier = [[[] for col in range(12)] for row in range(12)]
      try: 
        cleancode = eachCode.replace("<s> ", "").replace(" </s>", "").replace('<mask>', 'MASK')
        types, rewrote_code = get_syntax_types_for_code(cleancode)
        # send input to model
        tokenized_text = tokenizer.convert_tokens_to_ids(tokenizer.tokenize(rewrote_code))
        input_ids = torch.tensor([tokenized_text]).to(device)
        # get reference indices
        reference_indices = token_reference.generate_reference(input_ids.shape[1], device=device).unsqueeze(0)

        layer_attrs = []
        layer_attn_mat = []
        input_embeddings, ref_input_embeddings = construct_whole_bert_embeddings(input_ids, reference_indices)
        # get layer attribution
        for i in range(model.config.num_hidden_layers):
          lc = LayerConductance(predict_forward_func, 
                              model.roberta.encoder.layer[i])
          layer_attributions = lc.attribute(inputs=input_embeddings, 
                                                  baselines=ref_input_embeddings, 
                                                  additional_forward_args=())
          layer_attrs.append(summarize_attributions(layer_attributions[0]))
          layer_attn_mat.append(layer_attributions[1])
          
        # layer x seq_len
        layer_attrs = torch.stack(layer_attrs)
        # layer x batch x head x seq_len x seq_len
        layer_attn_mat = torch.stack(layer_attn_mat)
        # get start and end index of each token
        start_end = get_start_end_of_token_when_tokenized(rewrote_code, types, tokenizer)
        if syntaxType in types:
          number += 1
        for layer in range(12):
          for head in range(12):
            for each_sep_index in np.where(types==syntaxType)[0]:
              start_index, end_index = start_end[each_sep_index]
              interim_value = layer_attn_mat[layer][0][head][:, start_index:end_index+1].mean().cpu().detach().numpy()
              if np.isnan(interim_value):
                  pass
              else: 
                  identifier[layer][head].append(interim_value) 
      except:
        failed_calculate += 1
        
      
    print("failed calculate: ", failed_calculate)
    identifier = identifier/number
    
  return identifier, number

In [25]:
syntax_list = ['annotation', 'basictype', 'boolean', 
          'decimalinteger', 'identifier', 'keyword',
          'modifier', 'operator', 'separator', 'null',
          'string', 'decimalfloatingpoint']

In [26]:
avg_attns = {}
avg_attens_sum = {}
syntax_frequenct = {}

for syntax in syntax_list:
    avg_attns[syntax] = np.zeros((12, 12))
    avg_attns[syntax], syntax_frequenct[syntax] = getSyntaxAttributionScore(codes, tokenizer, syntax)
    avg_attens_sum[syntax] = np.sum(avg_attns[syntax], axis=1)

annotation: 100%|██████████| 6/6 [00:07<00:00,  1.20s/it]


failed calculate:  0


basictype: 100%|██████████| 6/6 [00:07<00:00,  1.22s/it]


failed calculate:  0


boolean: 100%|██████████| 6/6 [00:07<00:00,  1.21s/it]


failed calculate:  0


decimalinteger: 100%|██████████| 6/6 [00:07<00:00,  1.22s/it]


failed calculate:  0


identifier: 100%|██████████| 6/6 [00:08<00:00,  1.39s/it]


failed calculate:  0


keyword: 100%|██████████| 6/6 [00:07<00:00,  1.25s/it]


failed calculate:  0


modifier: 100%|██████████| 6/6 [00:07<00:00,  1.24s/it]


failed calculate:  0


operator: 100%|██████████| 6/6 [00:07<00:00,  1.26s/it]


failed calculate:  0


separator: 100%|██████████| 6/6 [00:08<00:00,  1.42s/it]


failed calculate:  0


null: 100%|██████████| 6/6 [00:07<00:00,  1.21s/it]


failed calculate:  0


string: 100%|██████████| 6/6 [00:07<00:00,  1.22s/it]
  identifier = identifier/number


failed calculate:  0


decimalfloatingpoint: 100%|██████████| 6/6 [00:07<00:00,  1.22s/it]

failed calculate:  0





### Instance based 

In [19]:
def getInstanceSyntaxAttributionScore(codes, tokenizer, syntaxType):
  instance = []
  number = 0

  with torch.no_grad():
    for eachCode in tqdm(codes, desc=syntaxType):
      identifier = [[[] for col in range(12)] for row in range(12)]
      cleancode = eachCode.replace("<s> ", "").replace(" </s>", "").replace('<mask>', 'MASK')
      types, rewrote_code = get_syntax_types_for_code(cleancode)
      # send input to model
      tokenized_text = tokenizer.convert_tokens_to_ids(tokenizer.tokenize(rewrote_code))
      input_ids = torch.tensor([tokenized_text]).to(device)
      # get reference indices
      reference_indices = token_reference.generate_reference(input_ids.shape[1], device=device).unsqueeze(0)

      layer_attrs = []
      layer_attn_mat = []
      input_embeddings, ref_input_embeddings = construct_whole_bert_embeddings(input_ids, reference_indices)
      # get layer attribution
      for i in range(model.config.num_hidden_layers):
        lc = LayerConductance(predict_forward_func, 
                            model.roberta.encoder.layer[i])
        layer_attributions = lc.attribute(inputs=input_embeddings, 
                                                baselines=ref_input_embeddings, 
                                                additional_forward_args=(tokenized_text))
        layer_attrs.append(summarize_attributions(layer_attributions[0]))
        layer_attn_mat.append(layer_attributions[1])
        
      # layer x seq_len
      layer_attrs = torch.stack(layer_attrs)
      # layer x batch x head x seq_len x seq_len
      layer_attn_mat = torch.stack(layer_attn_mat)
      # get start and end index of each token
      start_end = get_start_end_of_token_when_tokenized(rewrote_code, types, tokenizer)
      if syntaxType in types:
        number += 1
      for layer in range(12):
        for head in range(12):
          for each_sep_index in np.where(types==syntaxType)[0]:
            start_index, end_index = start_end[each_sep_index]
            interim_value = layer_attn_mat[layer][0][head][:, start_index:end_index+1].mean().cpu().detach().numpy()
            if np.isnan(interim_value):
                pass
            else: 
                identifier[layer][head].append(interim_value)
                
      if np.array(identifier).shape[2] != 0:
        instance.append(identifier)

    
  return instance

In [20]:
syntax_list = ['annotation','modifier', 'operator']
avg_attns = {}
for syntax in syntax_list:
    avg_attns[syntax] = getInstanceSyntaxAttributionScore(codes, 
                                                            tokenizer, 
                                                            syntax)

annotation: 100%|██████████| 6/6 [00:07<00:00,  1.19s/it]
modifier: 100%|██████████| 6/6 [00:07<00:00,  1.24s/it]
operator: 100%|██████████| 6/6 [00:07<00:00,  1.25s/it]


In [52]:
codes[0]

'<s> @ Override public int peekBit ( ) throws AACException { int ret ; if ( bitsCached > 0 ) { ret = ( cache >> ( bitsCached - 1 ) ) & 1 ; } else { final int word = readCache ( true ) ; ret = ( <mask> >> WORD_BITS - 1 ) & 1 ; } return ret ; } </s>'

In [23]:
len(avg_attns['annotation']), len(avg_attns['modifier']), len(avg_attns['operator'])

(1, 6, 5)

In [21]:
a = avg_attns['annotation']

In [22]:
len(a)

1

In [70]:
b = a[0]

In [71]:
b = np.array(b)

In [72]:
b.shape

(12, 12, 1)

In [89]:
len(avg_attns['operator'])

5

In [96]:
a = avg_attns['operator'][2]

In [97]:
a = np.array(a)

In [98]:
a.shape

(12, 12, 2)