In [33]:
from transformers import RobertaConfig, RobertaTokenizer, RobertaForMaskedLM, pipeline
import torch

model = RobertaForMaskedLM.from_pretrained("microsoft/codebert-base-mlm")
tokenizer = RobertaTokenizer.from_pretrained("microsoft/codebert-base-mlm")

CODE = "def returnInt() -> <mask>:"
fill_mask = pipeline('fill-mask', model=model, tokenizer=tokenizer)

outputs = fill_mask(CODE)
outputs

[{'score': 0.2260781228542328,
  'token': 6979,
  'token_str': ' int',
  'sequence': 'def returnInt() -> int:'},
 {'score': 0.15783804655075073,
  'token': 5053,
  'token_str': ' Any',
  'sequence': 'def returnInt() -> Any:'},
 {'score': 0.1569615751504898,
  'token': 9291,
  'token_str': ' None',
  'sequence': 'def returnInt() -> None:'},
 {'score': 0.12274446338415146,
  'token': 1907,
  'token_str': ' type',
  'sequence': 'def returnInt() -> type:'},
 {'score': 0.06580942869186401,
  'token': 1666,
  'token_str': '...',
  'sequence': 'def returnInt() ->...:'}]

In [34]:
class CustomModel(torch.nn.Module):
    def __init__(self, model, d, vocabulary_size = 50265): 
        super(CustomModel, self).__init__() 
        self.d = d
        self.model = model
        self.config = model.config
        self.layer = torch.nn.Linear(vocabulary_size, d)
    
    def forward(self, input_ids=None, attention_mask=None):
        model_output = self.model.forward(input_ids=input_ids, attention_mask=attention_mask)        
        final_output_tensor = self.layer.forward(model_output[0])
        model_output.logits = final_output_tensor
        return model_output

In [35]:
nl_tokens=tokenizer.tokenize("")
code_tokens=tokenizer.tokenize("def returnInt() -> <mask>")
tokens=[tokenizer.cls_token]+nl_tokens+[tokenizer.sep_token]+code_tokens+[tokenizer.sep_token]
tokens_ids=tokenizer.convert_tokens_to_ids(tokens)
output = model(torch.tensor(tokens_ids)[None,:])[0]
# tokens[7]
output[0][7][torch.argmax(output[0][7])]

tensor(13.4799, grad_fn=<SelectBackward0>)

In [36]:
custom_model = CustomModel(model, 8)
custom_model.forward(torch.tensor(tokens_ids)[None,:])



MaskedLMOutput(loss=None, logits=tensor([[[-9.5757e-01,  4.0185e-01,  1.6400e-01, -1.6877e+00,  7.0705e-01,
           2.5099e+00,  1.4349e+00,  1.1156e+00],
         [-1.5685e+00,  1.8493e+00, -1.0181e+00, -1.1429e+00,  1.4097e+00,
           2.0580e+00, -6.7345e-01,  3.6960e-01],
         [-1.1213e+00,  3.8167e+00, -8.0053e-01, -2.3354e-01, -8.6537e-01,
           3.7011e+00,  2.5471e-01,  6.3558e-01],
         [-4.0530e+00, -1.6602e+00, -9.5008e-01, -1.0846e-01,  3.1014e+00,
           3.9293e+00,  1.4203e-03,  2.1470e+00],
         [ 3.1931e+00, -8.0419e-01, -6.1936e-02, -1.7952e+00,  6.3725e-01,
           1.3534e+00,  1.6296e+00, -6.4972e-01],
         [ 1.2609e+00,  4.6863e+00, -2.5809e+00, -2.5047e+00, -1.1010e+00,
           1.1595e+00,  3.3692e+00, -2.8049e-01],
         [-2.7581e+00, -1.8081e-01, -2.2230e+00, -4.9168e-01,  3.1385e+00,
           3.4737e+00,  1.3601e+00,  3.3062e+00],
         [-2.2129e+00, -1.5894e+00, -1.1046e+00,  1.0554e+00,  4.7831e-01,
           2.5344

In [37]:
custom_model = CustomModel(model, 8)
fill_mask = pipeline('fill-mask', model=custom_model, tokenizer=tokenizer)

outputs = fill_mask(CODE)
outputs

[{'score': 0.6114524602890015,
  'token': 1,
  'token_str': '<pad>',
  'sequence': 'def returnInt() ->:'},
 {'score': 0.2869931161403656,
  'token': 3,
  'token_str': '<unk>',
  'sequence': 'def returnInt() ->:'},
 {'score': 0.03550773859024048,
  'token': 6,
  'token_str': ',',
  'sequence': 'def returnInt() ->,:'},
 {'score': 0.029577510431408882,
  'token': 0,
  'token_str': '<s>',
  'sequence': 'def returnInt() ->:'},
 {'score': 0.01907293312251568,
  'token': 4,
  'token_str': '.',
  'sequence': 'def returnInt() ->.:'}]

In [38]:
input_list = [
    "def returnInt() -> <mask>: \n\t x: int = 42 \n\t return x",
    "def setInt(self, x: int) -> <mask>: \n\t self.x = x",
    "def getInt(self) -> <mask>: \n\t return self.x",
    "def concatString(self, s1: str, s2: str) -> <mask>: \n\t return s1 + s2",
    "def setStr(self, s: str) -> <mask>: \n\t self.s = s",
    "def getStr(self) -> <mask>: \n\t return self.s",
    "def isInt(self, x) -> <mask>: \n\t return x % 1 == 0",
    "def doSomething() -> <mask>: \n\t pass",
    "def isString(self, s) -> <mask>: \n\t return type(s, str)"
]

labels = [
    "int",
    "None",
    "int",
    "str",
    "None",
    "str",
    "bool",
    "None",
    "bool"
]

In [39]:
class TripletLoss(torch.nn.Module):
    def __init__(self, margin=1.0):
        super(TripletLoss, self).__init__()
        self.margin = margin
        
    def calc_euclidean(self, x1, x2):
        return (x1 - x2).pow(2).sum(1)
    
    def forward(self, anchor: torch.Tensor, positive: torch.Tensor, negative: torch.Tensor) -> torch.Tensor:
        distance_positive = self.calc_euclidean(anchor, positive)
        distance_negative = self.calc_euclidean(anchor, negative)
        losses = torch.relu(distance_positive - distance_negative + self.margin)

        return losses.mean()

In [40]:
from torch.utils.data import TensorDataset
from typing import Tuple

class TripletDataset(torch.utils.data.Dataset):

    def __init__(self, *in_sequences: torch.Tensor, labels: torch.Tensor, dataset_name: str,
                 train_mode: bool=True):
        self.data = TensorDataset(*in_sequences)
        self.labels = labels
        self.dataset_name = dataset_name
        self.train_mode = train_mode

        self.get_item_func = self.get_item_train if self.train_mode else self.get_item_test

    def get_item_train(self, index: int) -> Tuple[Tuple[torch.Tensor, torch.Tensor],
                                         Tuple[torch.Tensor, torch.Tensor], Tuple[torch.Tensor, torch.Tensor]]:
        """
        It returns three tuples. Each one is a (data, label)
         - The first tuple is (data, label) at the given index
         - The second tuple is similar (data, label) to the given index
         - The third tuple is different (data, label) from the given index 
        """

         # Find a similar datapoint randomly
        mask = self.labels == self.labels[index]
        mask[index] = False # Making sure that the similar pair is NOT the same as the given index
        mask = mask.nonzero()
        a = mask[torch.randint(high=len(mask), size=(1,))][0]

        # Find a different datapoint randomly
        mask = self.labels != self.labels[index]
        mask = mask.nonzero()
        b = mask[torch.randint(high=len(mask), size=(1,))][0]
        
        return (self.data[index], self.labels[index]), (self.data[a.item()], self.labels[a.item()]), \
               (self.data[b.item()], self.labels[b.item()])

    def get_item_test(self, index: int) -> Tuple[Tuple[torch.Tensor, torch.Tensor], list, list]:
        return (self.data[index], self.labels[index]), [], []
    
    def __getitem__(self, index: int) -> Tuple[Tuple[torch.Tensor, torch.Tensor],
                                         Tuple[torch.Tensor, torch.Tensor], Tuple[torch.Tensor, torch.Tensor]]:
         return self.get_item_func(index)

    def __len__(self) -> int:
        return len(self.data)

In [41]:
import random

def make_triplet(input, labels, index):
    pos_labels = []
    neg_labels = []
    for i, l in enumerate(labels):
        if not i == index:
            if l == labels[index]:
                pos_labels.append(i)
            else:
                neg_labels.append(i)

    return input[index], input[pos_labels[random.randint(0, len(pos_labels)-1)]], input[neg_labels[random.randint(0, len(neg_labels)-1)]]

def make_data(input, labels):
    data = []
    for i in range(len(input)):
        data.append(make_triplet(input, labels, i))
    return data

In [42]:
from regex import P
from tqdm.notebook import tqdm
from torch.nn.utils.rnn import pad_sequence, unpad_sequence

def tokenize_code(code):
    nl_tokens = tokenizer.tokenize("")
    code_tokens = tokenizer.tokenize(code)
    tokens = [tokenizer.cls_token]+nl_tokens+[tokenizer.sep_token]+code_tokens+[tokenizer.sep_token]
    tokens_ids = tokenizer.convert_tokens_to_ids(tokens)
    return torch.tensor(tokens_ids)[None,:]

# def tokenize_input(input):
#     token_ids_list = torch.Tensor()
#     for code in input:
#         nl_tokens = tokenizer.tokenize("")
#         code_tokens = tokenizer.tokenize(code)
#         tokens = [tokenizer.cls_token]+nl_tokens+[tokenizer.sep_token]+code_tokens+[tokenizer.sep_token]
#         tokens_ids = tokenizer.convert_tokens_to_ids(tokens)
#         token_ids_list = torch.cat((token_ids_list, torch.tensor(tokens_ids)), 0)
#     # print(token_ids_list)
#     # data = pad_sequence(token_ids_list, batch_first=True)
#     # print(data)
#     return token_ids_list

epochs = 1
# data = [("def returnInt() -> <mask>:", "def calcInt() -> <mask>:","def returnFloat() -> <mask>:", "int")]
# tokenized_input = tokenize_input(input_list)
# tokenized_labels = tokenize_input(labels)
# print(tokenized_input)
# print(tokenized_labels[0])
# data = TripletDataset(tokenized_input, labels=tokenized_labels, dataset_name="test")
# print(data.get_item_train(0))
data = make_data(input_list, labels)
# print(data)
optimizer = torch.optim.Adam(custom_model.parameters(), lr=0.001)
criterion = torch.jit.script(TripletLoss())

for epoch in tqdm(range(epochs), desc="Epochs"):
    custom_model.train()
    running_loss = []
    for step, (t_a, t_p, t_n) in enumerate(data):
        
        #  = data.get_item_train(step)
        # print(tokenize_code(t_a))
        # print(tokenize_code(t_p))
        # print(tokenize_code(t_n))

        l = [tokenize_code(t_a)[0], tokenize_code(t_p)[0], tokenize_code(t_n)[0]]
        l = pad_sequence(l, batch_first=True)
        print(l[0])
        
        optimizer.zero_grad()
        anchor_out = custom_model(l[0][None,:])
        positive_out = custom_model(l[1][None,:])
        negative_out = custom_model(l[2][None,:])
        
        loss = criterion(anchor_out[0], positive_out[0], negative_out[0])
        loss.backward()
        optimizer.step()

Epochs:   0%|          | 0/1 [00:00<?, ?it/s]

tensor([    0,     2,  9232,   671, 22886, 43048, 43839, 50264,    35,  1437,
        50118, 50117,  3023,    35,  6979,  5457,  3330,  1437, 50118, 50117,
          671,  3023,     2])
tensor([    0,     2,  9232,   278, 22886,  1640, 13367,     6,  3023,    35,
         6979,    43, 43839, 50264,    35,  1437, 50118, 50117,  1403,     4,
         1178,  5457,  3023,     2])
tensor([    0,     2,  9232,   120, 22886,  1640, 13367,    43, 43839, 50264,
           35,  1437, 50118, 50117,   671,  1403,     4,  1178,     2,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0])
tensor([    0,     2,  9232, 10146,   415, 34222,  1640, 13367,     6,   579,
          134,    35,  7031,     6,   579,   176,    35,  7031,    43, 43839,
        50264,    35,  1437, 50118, 50117,   671,   579,   134,  2055,   579,
          176,     2])
tensor([    0,     2,  9232,   278, 29116,  1640, 13367,     6,   579,    35,
         7031,    43, 43839, 

In [43]:
fill_mask = pipeline('fill-mask', model=custom_model, tokenizer=tokenizer)

outputs = fill_mask(CODE)
outputs

[{'score': 1.0,
  'token': 0,
  'token_str': '<s>',
  'sequence': 'def returnInt() ->:'},
 {'score': 4.295258014329576e-23,
  'token': 7,
  'token_str': ' to',
  'sequence': 'def returnInt() -> to:'},
 {'score': 0.0,
  'token': 6,
  'token_str': ',',
  'sequence': 'def returnInt() ->,:'},
 {'score': 0.0,
  'token': 5,
  'token_str': ' the',
  'sequence': 'def returnInt() -> the:'},
 {'score': 0.0,
  'token': 4,
  'token_str': '.',
  'sequence': 'def returnInt() ->.:'}]

In [44]:
import numpy as np
from annoy import AnnoyIndex
from torch.utils.data import DataLoader
import random
from collections import defaultdict
import time

DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
KNN_TREE_SIZE = 20
DISTANCE_METRIC = 'euclidean'


def create_type_space(inputs=input_list[:4], labels=labels[:4]):
    """
    Creates the type space based on the inputs and their corresponding labels
    """
    
    # Make sure imputs are labeled
    assert len(inputs) == len(labels)
    
    # Cache the type space mappings
    computed_mapped_batches_train = []
    with torch.no_grad():
        
        # Iterate through the data set
        for inp, label in zip(inputs, labels):
            
            # Tokenize the code
            nl_tokens = tokenizer.tokenize("")
            code_tokens = tokenizer.tokenize(inp)
            tokens=[tokenizer.cls_token]+nl_tokens+[tokenizer.sep_token]+code_tokens+[tokenizer.sep_token]
            tokens_ids=tokenizer.convert_tokens_to_ids(tokens)
            
            # Get the type space mapping from the model
            output = custom_model.forward(torch.tensor(tokens_ids)[None,:])
            
            # Select masked tokens
            masked_tokens = [c for c, token in enumerate(code_tokens) if token == "<mask>"]
            
            print(masked_tokens)
            
            # For this version, assume only one mask
            assert len(masked_tokens) == 1
            
            # Selected only the masked tokens from the output
            vals = output.logits.cpu().numpy()
            predicted_masks = [vals[0][i] for i in masked_tokens]
            
            # Cache the mapping of the masked token only
            computed_mapped_batches_train.append(predicted_masks)
        
        # Create the type space
        annoy_index = create_knn_index(computed_mapped_batches_train, None, computed_mapped_batches_train[0][0].size)
    return annoy_index

def create_knn_index(train_types_embed: np.array, valid_types_embed: np.array, type_embed_dim:int) -> AnnoyIndex:
    """
    Creates KNNs index for given type embedding vectors, taken from Type4Py
    """
    
    annoy_idx = AnnoyIndex(type_embed_dim, DISTANCE_METRIC)

    for i, v in enumerate(tqdm(train_types_embed, total=len(train_types_embed), desc="KNN index")):
        print(v[0])
        annoy_idx.add_item(i, v[0])

    annoy_idx.build(KNN_TREE_SIZE)
    return annoy_idx

annoy_idx = create_type_space()
print(annoy_idx)

[5]
[11]
[7]
[18]


KNN index:   0%|          | 0/4 [00:00<?, ?it/s]

[145.42784  -29.278706  45.338547 -30.66457   18.99699   39.224346
  40.825054 103.400734]
[156.02185  -32.93112   38.885925 -41.234108  30.782158  37.38632
  49.57484  104.4173  ]
[148.68427  -32.9436    39.215916 -36.328114  27.086197  36.987328
  41.257534 103.47224 ]
[161.6432   -34.169556  38.239708 -40.985474  29.513176  38.65199
  48.715927 109.535225]
<annoy.Annoy object at 0x7f88678083f0>


In [45]:
def map_type(inputs=input_list[:4]):
    """
    Maps an input to the type space
    """
    with torch.no_grad():
        computed_embed_batches_test = []
        computed_embed_labels_test = []
        
        for inp in inputs:

            # Tokenize the code
            nl_tokens = tokenizer.tokenize("")
            code_tokens = tokenizer.tokenize(inp)
            tokens=[tokenizer.cls_token]+nl_tokens+[tokenizer.sep_token]+code_tokens+[tokenizer.sep_token]
            tokens_ids=tokenizer.convert_tokens_to_ids(tokens)
            
            # Get the type space mapping from the model
            output = custom_model.forward(torch.tensor(tokens_ids)[None,:])
            
            # Select masked tokens
            masked_tokens = [c for c, token in enumerate(code_tokens) if token == "<mask>"]
            
            # For this version, assume only one mask
            assert len(masked_tokens) == 1
            
            # Selected only the masked tokens from the output
            vals = output.logits.cpu().numpy()
            predicted_masks = [vals[0][i] for i in masked_tokens]

            # Cache the mapping of the masked token only
            computed_embed_batches_test.append(predicted_masks)
        
        return computed_embed_batches_test

def predict_type(types_embed_array: np.array, types_embed_labels: np.array, indexed_knn: AnnoyIndex, k: int):
    """
    Predict type of given type embedding vectors
    """

    pred_types_embed = []
    pred_types_score = []
    for i, embed_vec in enumerate(tqdm(types_embed_array, total=len(types_embed_array), desc="Finding KNNs & Prediction")):
        
        # Get the distances to the KNN
        idx, dist = indexed_knn.get_nns_by_vector(embed_vec[0], k, include_distances=True)
        
        # Compute the scores according to the formula
        pred_idx_scores = compute_types_score(dist, idx, types_embed_labels)
        
        # Cache the scores and the labels
        pred_types_embed.append([i for (i, s) in pred_idx_scores])
        pred_types_score.append(pred_idx_scores)
    
    return pred_types_embed, pred_types_score

def compute_types_score(types_dist: list, types_idx: list, types_embed_labels: np.array):
        types_dist = 1 / (np.array(types_dist) + 1e-10) ** 2
        types_dist /= np.sum(types_dist)
        types_score = defaultdict(int)
        for n, d in zip(types_idx, types_dist):
            types_score[types_embed_labels[n]] += d
        
        return sorted({t: s for t, s in types_score.items()}.items(), key=lambda kv: kv[1], reverse=True)
    
types_embed_array = map_type()
knn_K = 2
pred_type_embed, pred_type_score = predict_type(types_embed_array, labels[:4], annoy_idx, knn_K,)
print(input_list[:4])
print(labels[:4])
print(pred_type_score)

Finding KNNs & Prediction:   0%|          | 0/4 [00:00<?, ?it/s]

['def returnInt() -> <mask>: \n\t x: int = 42 \n\t return x', 'def setInt(self, x: int) -> <mask>: \n\t self.x = x', 'def getInt(self) -> <mask>: \n\t return self.x', 'def concatString(self, s1: str, s2: str) -> <mask>: \n\t return s1 + s2']
['int', 'None', 'int', 'str']
[[('int', 0.5271375653386224), ('None', 0.4728624346613775)], [('int', 1.0)], [('int', 1.0)], [('int', 0.5739985440616903), ('None', 0.4260014559383097)]]
