

Download the two companion files, which contain some useful helper functions and the main model architecture code.

In [None]:
!wget https://raw.githubusercontent.com/autosoft-dev/ml-on-code/main/assets/model.py
!wget https://raw.githubusercontent.com/autosoft-dev/ml-on-code/main/assets/utils.py

--2022-11-11 06:26:05--  https://raw.githubusercontent.com/autosoft-dev/ml-on-code/main/assets/model.py
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 9935 (9.7K) [text/plain]
Saving to: ‘model.py.1’


2022-11-11 06:26:05 (74.8 MB/s) - ‘model.py.1’ saved [9935/9935]

--2022-11-11 06:26:05--  https://raw.githubusercontent.com/autosoft-dev/ml-on-code/main/assets/utils.py
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.110.133, 185.199.108.133, 185.199.109.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.110.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2602 (2.5K) [text/plain]
Saving to: ‘utils.py.1’


2022-11-11 06:26:05 (33.5 MB/s) - ‘utils.py.1’ saved [2602/2602]
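
Note that the log above saves the files as `model.py.1` and `utils.py.1`. This is what `wget` does when a file with the same name already exists, so a later `from utils import ...` still picks up the older `utils.py`. A minimal sketch of a download step that skips files already present is shown below; `fetch_if_missing` is a hypothetical helper name and is not part of the notebook or of any library.

```python
import urllib.request
from pathlib import Path


def fetch_if_missing(url: str, dest: str) -> bool:
    """Download `url` to `dest` unless `dest` already exists.

    Returns True if a download happened, False if the file was already there.
    Hypothetical helper for illustration; not part of the original notebook.
    """
    path = Path(dest)
    if path.exists():
        return False
    urllib.request.urlretrieve(url, str(path))
    return True


# Usage (commented out because it needs network access):
# base = "https://raw.githubusercontent.com/autosoft-dev/ml-on-code/main/assets/"
# for name in ("model.py", "utils.py"):
#     fetch_if_missing(base + name, name)
```

With this approach a re-run of the cell leaves the existing files untouched instead of creating numbered copies.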

### Installing tree-hugger and transformers

In [None]:
!pip install transformers
!pip install -U tree-hugger PyYAML

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


### Command to build the necessary processing library (tree-hugger related)

In [None]:
!create_libs -c python

2022-11-11 07:47:49,509 INFO:Cloneing python repo from tree-sitter collections
2022-11-11 07:48:15,375 INFO:Creating the library my-languages.so at /content
2022-11-11 07:48:16,403 INFO:Finished creating library!


**Importing all the necessary modules and defining the Seq2Seq model architecture**

In [None]:
import os
import json
import torch
import torch.nn as nn
from utils import Example, convert_examples_to_features
from transformers import RobertaConfig, RobertaModel, RobertaTokenizer
from torch.utils.data import TensorDataset, DataLoader, SequentialSampler

class Seq2Seq(nn.Module):
    """
    Build a Sequence-to-Sequence model.

    Parameters:

    * `encoder`- encoder of seq2seq model. e.g. roberta
    * `decoder`- decoder of seq2seq model. e.g. transformer
    * `config`- configuration of encoder model.
    * `beam_size`- beam size for beam search.
    * `max_length`- max length of target for beam search.
    * `sos_id`- start-of-sequence symbol id in target for beam search.
    * `eos_id`- end-of-sequence symbol id in target for beam search.
    """

    def __init__(
        self,
        encoder,
        decoder,
        config,
        beam_size=None,
        max_length=None,
        sos_id=None,
        eos_id=None,
    ):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.config = config
        self.register_buffer("bias", torch.tril(torch.ones(2048, 2048)))
        self.dense = nn.Linear(config.hidden_size, config.hidden_size)
        self.lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False)
        self.lsm = nn.LogSoftmax(dim=-1)
        self.tie_weights()

        self.beam_size = beam_size
        self.max_length = max_length
        self.sos_id = sos_id
        self.eos_id = eos_id

    def _tie_or_clone_weights(self, first_module, second_module):
        """Tie or clone module weights depending on whether we are using TorchScript or not"""
        if self.config.torchscript:
            first_module.weight = nn.Parameter(second_module.weight.clone())
        else:
            first_module.weight = second_module.weight

    def tie_weights(self):
        """Make sure we are sharing the input and output embeddings.
        Export to TorchScript can't handle parameter sharing so we are cloning them instead.
        """
        self._tie_or_clone_weights(
            self.lm_head, self.encoder.embeddings.word_embeddings
        )

    def forward(
        self,
        source_ids=None,
        source_mask=None,
        target_ids=None,
        target_mask=None,
        args=None,
    ):
        outputs = self.encoder(source_ids, attention_mask=source_mask)
        encoder_output = outputs[0].permute([1, 0, 2]).contiguous()
        if target_ids is not None:
            attn_mask = -1e4 * (
                1 - self.bias[: target_ids.shape[1], : target_ids.shape[1]]
            )
            tgt_embeddings = (
                self.encoder.embeddings(target_ids).permute([1, 0, 2]).contiguous()
            )
            out = self.decoder(
                tgt_embeddings,
                encoder_output,
                tgt_mask=attn_mask,
                memory_key_padding_mask=(1 - source_mask).bool(),
            )
            hidden_states = torch.tanh(self.dense(out)).permute([1, 0, 2]).contiguous()
            lm_logits = self.lm_head(hidden_states)
            # Shift so that tokens < n predict n
            active_loss = target_mask[..., 1:].ne(0).view(-1) == 1
            shift_logits = lm_logits[..., :-1, :].contiguous()
            shift_labels = target_ids[..., 1:].contiguous()
            # Flatten the tokens
            loss_fct = nn.CrossEntropyLoss(ignore_index=-1)
            loss = loss_fct(
                shift_logits.view(-1, shift_logits.size(-1))[active_loss],
                shift_labels.view(-1)[active_loss],
            )

            outputs = loss, loss * active_loss.sum(), active_loss.sum()
            return outputs
        else:
            # Predict
            preds = []
            # Padding value, created on the same device as the inputs so that
            # `zero` is always defined, whatever the device type is.
            zero = torch.zeros(1, dtype=torch.long, device=source_ids.device)
            for i in range(source_ids.shape[0]):
                context = encoder_output[:, i : i + 1]
                context_mask = source_mask[i : i + 1, :]
                beam = Beam(
                    self.beam_size,
                    self.sos_id,
                    self.eos_id,
                    device=source_ids.device.type,
                )
                input_ids = beam.getCurrentState()
                context = context.repeat(1, self.beam_size, 1)
                context_mask = context_mask.repeat(self.beam_size, 1)
                for _ in range(self.max_length):
                    if beam.done():
                        break
                    attn_mask = -1e4 * (
                        1 - self.bias[: input_ids.shape[1], : input_ids.shape[1]]
                    )
                    tgt_embeddings = (
                        self.encoder.embeddings(input_ids)
                        .permute([1, 0, 2])
                        .contiguous()
                    )
                    out = self.decoder(
                        tgt_embeddings,
                        context,
                        tgt_mask=attn_mask,
                        memory_key_padding_mask=(1 - context_mask).bool(),
                    )
                    out = torch.tanh(self.dense(out))
                    hidden_states = out.permute([1, 0, 2]).contiguous()[:, -1, :]
                    out = self.lsm(self.lm_head(hidden_states)).data
                    beam.advance(out)
                    input_ids.data.copy_(
                        input_ids.data.index_select(0, beam.getCurrentOrigin())
                    )
                    input_ids = torch.cat((input_ids, beam.getCurrentState()), -1)
                hyp = beam.getHyp(beam.getFinal())
                pred = beam.buildTargetTokens(hyp)[: self.beam_size]
                pred = [
                    torch.cat(
                        [x.view(-1) for x in p] + [zero] * (self.max_length - len(p))
                    ).view(1, -1)
                    for p in pred
                ]
                preds.append(torch.cat(pred, 0).unsqueeze(0))

            preds = torch.cat(preds, 0)
            return preds


class Beam(object):
    def __init__(self, size, sos, eos, device):
        self.size = size
        if device == "cuda":
            self.tt = torch.cuda
        elif device == "cpu":
            self.tt = torch
        # The score for each translation on the beam.
        self.scores = self.tt.FloatTensor(size).zero_()
        # The backpointers at each time-step.
        self.prevKs = []
        # The outputs at each time-step.
        self.nextYs = [self.tt.LongTensor(size).fill_(0)]
        self.nextYs[0][0] = sos
        # Has EOS topped the beam yet.
        self._eos = eos
        self.eosTop = False
        # Time and k pair for finished.
        self.finished = []

    def getCurrentState(self):
        "Get the outputs for the current timestep."
        batch = self.tt.LongTensor(self.nextYs[-1]).view(-1, 1)
        return batch

    def getCurrentOrigin(self):
        "Get the backpointers for the current timestep."
        return self.prevKs[-1]

    def advance(self, wordLk):
        """
        Advance the beam by one step, given the log-probabilities `wordLk`
        of the next word for every hypothesis currently on the beam.

        Parameters:

        * `wordLk`- log-probs of advancing from the last step (K x words)

        Sets `self.eosTop` when EOS reaches the top of the beam; returns nothing.
        """
        numWords = wordLk.size(1)

        # Sum the previous scores.
        if len(self.prevKs) > 0:
            beamLk = wordLk + self.scores.unsqueeze(1).expand_as(wordLk)

            # Don't let EOS have children.
            for i in range(self.nextYs[-1].size(0)):
                if self.nextYs[-1][i] == self._eos:
                    beamLk[i] = -1e20
        else:
            beamLk = wordLk[0]
        flatBeamLk = beamLk.view(-1)
        bestScores, bestScoresId = flatBeamLk.topk(self.size, 0, True, True)

        self.scores = bestScores

        # bestScoresId is flattened beam x word array, so calculate which
        # word and beam each score came from
        prevK = bestScoresId // numWords
        self.prevKs.append(prevK)
        self.nextYs.append((bestScoresId - prevK * numWords))

        for i in range(self.nextYs[-1].size(0)):
            if self.nextYs[-1][i] == self._eos:
                s = self.scores[i]
                self.finished.append((s, len(self.nextYs) - 1, i))

        # End condition is when top-of-beam is EOS and no global score.
        if self.nextYs[-1][0] == self._eos:
            self.eosTop = True

    def done(self):
        return self.eosTop and len(self.finished) >= self.size

    def getFinal(self):
        if len(self.finished) == 0:
            self.finished.append((self.scores[0], len(self.nextYs) - 1, 0))
        self.finished.sort(key=lambda a: -a[0])
        if len(self.finished) != self.size:
            unfinished = []
            for i in range(self.nextYs[-1].size(0)):
                if self.nextYs[-1][i] != self._eos:
                    s = self.scores[i]
                    unfinished.append((s, len(self.nextYs) - 1, i))
            unfinished.sort(key=lambda a: -a[0])
            self.finished += unfinished[: self.size - len(self.finished)]
        return self.finished[: self.size]

    def getHyp(self, beam_res):
        """
        Walk back to construct the full hypothesis.
        """
        hyps = []
        for _, timestep, k in beam_res:
            hyp = []
            for j in range(len(self.prevKs[:timestep]) - 1, -1, -1):
                hyp.append(self.nextYs[j + 1][k])
                k = self.prevKs[j][k]
            hyps.append(hyp[::-1])
        return hyps

    def buildTargetTokens(self, preds):
        sentence = []
        for pred in preds:
            tokens = []
            for tok in pred:
                if tok == self._eos:
                    break
                tokens.append(tok)
            sentence.append(tokens)
        return sentence
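
`Beam.advance` flattens the beam-by-vocabulary score matrix, takes the top `size` entries, and then recovers which hypothesis and which token each entry came from using integer division and a remainder. The sketch below restates that index arithmetic in plain Python so it can be followed by hand. `top_k_flat` is a hypothetical name and the scores are made-up numbers, not model output; it uses `sorted` where the notebook uses `torch.topk`.

```python
def top_k_flat(scores, k):
    """Return the k best (score, beam_index, token_id) triples, best first.

    `scores` is a list of rows, one row per hypothesis on the beam and one
    column per vocabulary entry. Illustration only; not the notebook's code.
    """
    num_words = len(scores[0])
    flat = [s for row in scores for s in row]  # same layout as beamLk.view(-1)
    best = sorted(range(len(flat)), key=lambda i: flat[i], reverse=True)[:k]
    result = []
    for flat_id in best:
        prev_k = flat_id // num_words            # which hypothesis it extends
        token_id = flat_id - prev_k * num_words  # which token was chosen
        result.append((flat[flat_id], prev_k, token_id))
    return result


# Two hypotheses, vocabulary of three tokens (made-up log-probabilities).
scores = [
    [-0.1, -2.0, -3.0],
    [-1.5, -0.2, -4.0],
]
result = top_k_flat(scores, 2)
print(result)
```

The `prev_k` values correspond to the backpointers stored in `prevKs`, and the `token_id` values to the entries appended to `nextYs`.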

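In `Seq2Seq.forward`, the `bias` buffer is a lower-triangular matrix of ones, and `-1e4 * (1 - bias[:n, :n])` turns it into an additive attention mask: 0 where a position may attend and -1e4 on future positions, which is added to the decoder's attention scores through `tgt_mask`. A small sketch of that construction follows; `MAX_LEN` and `causal_mask` are names chosen for this illustration, and the size is reduced from the notebook's 2048 to keep it cheap.

```python
import torch

# The notebook registers a 2048 x 2048 buffer; a small size is enough here.
MAX_LEN = 8
bias = torch.tril(torch.ones(MAX_LEN, MAX_LEN))


def causal_mask(n: int) -> torch.Tensor:
    """Additive mask: 0 where attention is allowed, -1e4 on future positions."""
    return -1e4 * (1 - bias[:n, :n])


mask = causal_mask(3)
print(mask)
```

Row `i` of the mask has the large negative value in every column `j > i`, so token `i` cannot attend to tokens that come after it.
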
The simple model did not score well, so we decided to use a fine-tuned model that was trained on a similar kind of dataset. We download its weights here.

In [None]:
!wget https://code-summary.s3.amazonaws.com/pytorch_model.bin

--2022-11-11 07:48:52--  https://code-summary.s3.amazonaws.com/pytorch_model.bin
Resolving code-summary.s3.amazonaws.com (code-summary.s3.amazonaws.com)... 52.216.97.3
Connecting to code-summary.s3.amazonaws.com (code-summary.s3.amazonaws.com)|52.216.97.3|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 706871064 (674M) [application/macbinary]
Saving to: ‘pytorch_model.bin.2’


2022-11-11 07:49:06 (51.9 MB/s) - ‘pytorch_model.bin.2’ saved [706871064/706871064]



In [None]:
# We define all the helper functions we need here.
def inference(data, model, tokenizer):
    # Run the model over the whole dataset in a single batch and decode the predictions
    eval_sampler = SequentialSampler(data)
    eval_dataloader = DataLoader(data, sampler=eval_sampler, batch_size=len(data))

    model.eval()
    p = []
    for batch in eval_dataloader:
        batch = tuple(t.to('cpu') for t in batch)
        source_ids, source_mask = batch
        with torch.no_grad():
            preds = model(source_ids=source_ids, source_mask=source_mask)
            for pred in preds:
                t = pred[0].cpu().numpy()
                t = list(t)
                if 0 in t:
                    t = t[: t.index(0)]
                text = tokenizer.decode(t, clean_up_tokenization_spaces=False)
                p.append(text)
    return (p, source_ids.shape[-1])


def get_features(examples, tokenizer):
    features = convert_examples_to_features(
        examples, tokenizer, stage="test"
    )
    all_source_ids = torch.tensor(
        [f.source_ids[: 256] for f in features], dtype=torch.long
    )
    all_source_mask = torch.tensor(
        [f.source_mask[: 256] for f in features], dtype=torch.long
    )
    return TensorDataset(all_source_ids, all_source_mask)


def build_model(model_class, config, tokenizer):
    encoder = model_class(config=config)
    decoder_layer = nn.TransformerDecoderLayer(
        d_model=config.hidden_size, nhead=config.num_attention_heads
    )
    decoder = nn.TransformerDecoder(decoder_layer, num_layers=6)
    model = Seq2Seq(
        encoder=encoder,
        decoder=decoder,
        config=config,
        beam_size=10,
        max_length=128,
        sos_id=tokenizer.cls_token_id,
        eos_id=tokenizer.sep_token_id,
    )

    model.load_state_dict(
        torch.load(
            "pytorch_model.bin",
            map_location=torch.device("cpu"),
        ),
        strict=False,
    )
    return model
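
Because `build_model` calls `load_state_dict` with `strict=False`, a checkpoint whose parameter names do not match the model loads without raising an error, and the unmatched layers keep their random initialization. In PyTorch, `load_state_dict` returns an object with `missing_keys` and `unexpected_keys`, which can be inspected to check the load. The sketch below shows this on two toy modules rather than on the notebook's model; the module shapes are arbitrary.

```python
import torch.nn as nn

# Toy modules: `target` has one more layer than the "checkpoint" provides.
source = nn.Sequential(nn.Linear(4, 4))
target = nn.Sequential(nn.Linear(4, 4), nn.Linear(4, 2))

# strict=False tolerates the mismatch and reports it in the return value.
result = target.load_state_dict(source.state_dict(), strict=False)
print("missing:", result.missing_keys)
print("unexpected:", result.unexpected_keys)
```

Applying the same check to the value returned inside `build_model` would show whether the fine-tuned weights actually cover every layer of the `Seq2Seq` model.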

Now that we have all the needed functions, we load the baseline CodeBERT config and tokenizer from the Hugging Face model hub and build the model with the fine-tuned weights.

In [None]:
config = RobertaConfig.from_pretrained("microsoft/codebert-base")
tokenizer = RobertaTokenizer.from_pretrained(
    "microsoft/codebert-base", do_lower_case=False
)

model = build_model(
    model_class=RobertaModel, config=config, tokenizer=tokenizer
).to('cpu')

Predictions

In [None]:
example = [Example(source="def fn_tensors(t, t1) -> Any:\n    return t + t1", target=None)]
message, length = inference(get_features(example, tokenizer), model, tokenizer)
print(message)



['Compute the difference between two tensors .']
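
Before decoding, `inference` cuts each predicted id sequence at the first 0, because `Seq2Seq.forward` pads every hypothesis with zeros up to `max_length`. The sketch below isolates that trimming step; `trim_at_padding` is a hypothetical helper name and the ids are made-up values, not real tokenizer ids.

```python
def trim_at_padding(token_ids, pad_value=0):
    """Drop everything from the first `pad_value` onwards.

    Mirrors the `if 0 in t: t = t[: t.index(0)]` step in `inference`.
    """
    ids = list(token_ids)
    if pad_value in ids:
        ids = ids[: ids.index(pad_value)]
    return ids


trimmed = trim_at_padding([5, 7, 9, 0, 0])
print(trimmed)
```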



We need to be able to run the model on a bunch of files, extract the functions from them, and then predict their docstrings. How shall we do it?

Codist [tree-hugger](https://github.com/autosoft-dev/tree-hugger) to the rescue!

We are going to declare a small function that will help us walk over every file in a nested directory tree (like the one we cloned above), one file at a time.

In [None]:
from pathlib import Path

def check_out_path(target_path: Path):
    """
    This function recursively yields all contents of a pathlib.Path object
    """
    yield target_path
    for file in target_path.iterdir():
        if file.is_dir():
            yield from check_out_path(file)
        else:
            yield file.absolute()


def is_python_file(file_path: Path):
    """
    This little function will help us to filter the result and keep only the Python files
    """
    return file_path.is_file() and file_path.suffix == ".py"

In [None]:
for file_path in check_out_path(Path("py-files")):
  if is_python_file(file_path):
    print(file_path)

/content/py-files/api.py
/content/py-files/mlapi.py
/content/py-files/main.py
/content/py-files/mainapp.py
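
The same listing can be approximated with the standard library's `Path.rglob`, which walks the tree recursively and filters by pattern in one call. This is an alternative sketch, not the notebook's method; `iter_python_files` is a name chosen for this illustration.

```python
from pathlib import Path


def iter_python_files(root: Path):
    """Yield every regular `.py` file under `root`, in sorted order."""
    for path in sorted(root.rglob("*.py")):
        if path.is_file():  # skip directories that happen to end in ".py"
            yield path.absolute()


# Usage, assuming a `py-files` directory exists as in the cell above:
# for file_path in iter_python_files(Path("py-files")):
#     print(file_path)
```

Sorting makes the order reproducible between runs, which `iterdir` does not guarantee.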


Let's use tree-hugger to parse all the needed files.

In [None]:
# We first import the PythonParser class
from tree_hugger.core import PythonParser

In [None]:
pp = PythonParser(library_loc="/content/my-languages.so")

In [None]:
# Let's use the function we defined before to go over all the files.
for file_path in check_out_path(Path("py-files")):
  if is_python_file(file_path):
    # We use a one-line, super convenient tree-hugger API call to get the needed data
    if pp.parse_file(str(file_path)):
      # The following call returns a dict where each key is the name of a function
      # and each value is a tuple, (function_body, function_docstring)
      func_and_docstr = pp.get_all_function_bodies(strip_docstr=True)
      for func_name, (body, docstr) in func_and_docstr.items():
        example = [Example(source=body, target=None)]
        message, length = inference(get_features(example, tokenizer), model, tokenizer)
        # Print the predicted summary for this function
        print("Function name:",func_name, ", \t","Function definition/Work :"," ".join(message))



Function name: scoring_endpoint , 	 Function definition/Work : estimate scoring endpoint
Function name: engine_talk , 	 Function definition/Work : send text to engine
Function name: user_commands , 	 Function definition/Work : Extract user commands .
Function name: run_blanco , 	 Function definition/Work : Run the bot .
Function name: index , 	 Function definition/Work : Return the index .
Function name: stomach_disesase_pred , 	 Function definition/Work : Evaluate gadgets .
Function name: hepatitis_disease_pred , 	 Function definition/Work : Evaluate hepatitis disease prediction .
Function name: skin_disease_pred , 	 Function definition/Work : Evaluate the skin disease prediction .
Function name: medical_insurance_price_pred , 	 Function definition/Work : Compute medicalurance price prediction .
Function name: breast_cancer_pred , 	 Function definition/Work : This function is used to make sure that there is no cancer cases .
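
The loop above only prints each prediction. If the summaries need to be kept, one option is to collect them per file and serialize them with the `json` module, which the notebook imports but does not otherwise use. The sketch below assumes a hypothetical helper `summaries_to_json`; the rows passed to it are made-up placeholders, not results from the model.

```python
import json


def summaries_to_json(results):
    """Group (file_path, function_name, summary) triples by file and dump to JSON.

    Hypothetical helper for illustration; not part of the original notebook.
    """
    grouped = {}
    for file_path, func_name, summary in results:
        grouped.setdefault(str(file_path), {})[func_name] = summary
    return json.dumps(grouped, indent=2, sort_keys=True)


# Placeholder rows for illustration only.
rows = [
    ("example.py", "add", "Add two numbers ."),
    ("example.py", "sub", "Subtract two numbers ."),
    ("other.py", "main", "Run the program ."),
]
report = summaries_to_json(rows)
print(report)
```

Inside the loop, each `(file_path, func_name, " ".join(message))` triple could be appended to a list and passed to this helper once all files have been processed.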
