# Упражнение

`На основе существующих открытых обученных моделей (CodeBERT, InCoder) собрать решение для суммаризации кода.`

Код взят из [репозитория](https://github.com/microsoft/CodeBERT/tree/master/CodeBERT/code2nl) и адаптирован.

In [None]:
!wget https://code-summary.s3.amazonaws.com/pytorch_model.bin

In [None]:
import torch
import torch.nn as nn
from transformers import RobertaConfig, RobertaModel, RobertaTokenizer
from torch.utils.data import TensorDataset, DataLoader, SequentialSampler

## Модель и вспомогательный код

In [2]:
class Beam:
    """
    Beam-search bookkeeping for decoding one source sequence.

    For every decoding step the beam stores the chosen tokens (`nextYs`),
    the backpointers to the parent beam entries (`prevKs`) and the
    accumulated scores of the surviving entries (`scores`).

    Parameters:

    * `size`- number of hypotheses kept on the beam.
    * `sos`- id of the start-of-sequence token.
    * `eos`- id of the end-of-sequence token.
    * `device`- where the beam tensors live; a device string
      (e.g. "cpu", "cuda") or a `torch.device`.
    """

    def __init__(self, size, sos, eos, device):
        self.size = size

        # Accept any device spec torch understands instead of only the exact
        # strings "cuda"/"cpu" (anything else used to leave `tt` undefined).
        self.device = torch.device(device)
        # Kept only so that code reading `beam.tt` keeps working.
        self.tt = torch.cuda if self.device.type == "cuda" else torch

        # The score for each translation on the beam.
        self.scores = torch.zeros(size, dtype=torch.float, device=self.device)
        # The backpointers at each time-step.
        self.prevKs = []
        # The outputs at each time-step; only slot 0 starts with SOS.
        first_step = torch.zeros(size, dtype=torch.long, device=self.device)
        first_step[0] = sos
        self.nextYs = [first_step]
        self._eos = eos
        # Has EOS topped the beam yet.
        self.eosTop = False
        # (score, time-step, beam index) triples for finished hypotheses.
        self.finished = []

    def _eos_indices(self):
        """Return the beam indices whose most recent token is EOS."""
        # One `.tolist()` instead of a tensor comparison per element.
        latest = self.nextYs[-1].tolist()
        return [i for i, tok in enumerate(latest) if tok == self._eos]

    def getCurrentState(self):
        """Get the outputs for the current timestep as a column tensor."""
        # A view (shares storage with `nextYs[-1]`), like the legacy
        # `LongTensor(tensor)` alias it replaces.
        return self.nextYs[-1].view(-1, 1)

    def getCurrentOrigin(self):
        """Get the backpointers for the current timestep."""
        return self.prevKs[-1]

    def advance(self, wordLk):
        """
        Given log-probs over words for every beam entry, update the beam.

        Parameters:

        * `wordLk`- scores of advancing from the last step (K x words)

        Returns nothing; query `done()` to learn whether the search is
        complete.
        """
        numWords = wordLk.size(1)

        # Sum the previous scores.
        if len(self.prevKs) > 0:
            beamLk = wordLk + self.scores.unsqueeze(1).expand_as(wordLk)

            # Don't let EOS have children.
            for i in self._eos_indices():
                beamLk[i] = -1e20
        else:
            # First step: only slot 0 holds SOS, so only its row is used.
            beamLk = wordLk[0]

        flatBeamLk = beamLk.view(-1)
        bestScores, bestScoresId = flatBeamLk.topk(self.size, 0, True, True)
        self.scores = bestScores

        # bestScoresId is flattened beam x word array, so calculate which
        # word and beam each score came from
        prevK = bestScoresId // numWords
        self.prevKs.append(prevK)
        self.nextYs.append(bestScoresId - prevK * numWords)

        # Record every entry that has just produced EOS.
        eos_now = self._eos_indices()
        timestep = len(self.nextYs) - 1
        for i in eos_now:
            self.finished.append((self.scores[i], timestep, i))

        # The flag latches once the best entry (index 0) is EOS.
        if 0 in eos_now:
            self.eosTop = True

    def done(self):
        """Return True once EOS tops the beam and `size` hypotheses finished."""
        return self.eosTop and len(self.finished) >= self.size

    def getFinal(self):
        """
        Return up to `size` (score, time-step, beam index) triples.

        Finished hypotheses come first (best score first); if there are
        fewer than `size`, the best unfinished entries of the last step are
        appended.
        """
        if len(self.finished) == 0:
            # NOTE: beam 0 is added here and, if unfinished, is also picked
            # up by the top-up loop below (behaviour kept from the original).
            self.finished.append((self.scores[0], len(self.nextYs) - 1, 0))

        self.finished.sort(key=lambda a: -a[0])
        # Top up only when short; with more than `size` finished entries the
        # result is just the first `size` of them.
        if len(self.finished) < self.size:
            ended = set(self._eos_indices())
            timestep = len(self.nextYs) - 1
            unfinished = [
                (self.scores[i], timestep, i)
                for i in range(self.nextYs[-1].size(0))
                if i not in ended
            ]
            unfinished.sort(key=lambda a: -a[0])
            self.finished += unfinished[: self.size - len(self.finished)]
        return self.finished[: self.size]

    def getHyp(self, beam_res):
        """
        Walk back to construct the full hypothesis.

        `beam_res` is a list of (score, time-step, beam index) triples as
        returned by `getFinal`; the result holds one token list per triple.
        """
        hyps = []
        for _, timestep, k in beam_res:
            hyp = []
            # Follow the backpointers from the last step to the first.
            for j in reversed(range(len(self.prevKs[:timestep]))):
                hyp.append(self.nextYs[j + 1][k])
                k = self.prevKs[j][k]
            hyps.append(hyp[::-1])
        return hyps

    def buildTargetTokens(self, preds):
        """Cut every hypothesis in `preds` at its first EOS (EOS excluded)."""
        sentence = []
        for pred in preds:
            tokens = []
            for tok in pred:
                if tok == self._eos:
                    break
                tokens.append(tok)
            sentence.append(tokens)
        return sentence

In [None]:
class Seq2Seq(nn.Module):
    """
    Build Sequence-to-Sequence.

    Parameters:

    * `encoder`- encoder of seq2seq model. e.g. roberta
    * `decoder`- decoder of seq2seq model. e.g. transformer
    * `config`- configuration of encoder model.
    * `beam_size`- beam size for beam search.
    * `max_length`- max length of target for beam search.
    * `sos_id`- start of symbol ids in target for beam search.
    * `eos_id`- end of symbol ids in target for beam search.
    """

    def __init__(self, encoder, decoder, config, beam_size=None,
        max_length=None, sos_id=None, eos_id=None):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.config = config
        # Lower-triangular matrix of ones; sliced in `forward` to build the
        # causal mask for the decoder.
        self.register_buffer("bias", torch.tril(torch.ones(2048, 2048)))
        self.dense = nn.Linear(config.hidden_size, config.hidden_size)
        self.lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False)
        self.lsm = nn.LogSoftmax(dim=-1)
        self.tie_weights()

        self.beam_size = beam_size
        self.max_length = max_length
        self.sos_id = sos_id
        self.eos_id = eos_id

    def _tie_or_clone_weights(self, first_module, second_module):
        """Tie or clone module weights depending on whether we are using TorchScript or not"""
        if self.config.torchscript:
            first_module.weight = nn.Parameter(second_module.weight.clone())
        else:
            first_module.weight = second_module.weight

    def tie_weights(self):
        """Make sure we are sharing the input and output embeddings.
        Export to TorchScript can't handle parameter sharing so we are cloning them instead.
        """
        self._tie_or_clone_weights(
            self.lm_head, self.encoder.embeddings.word_embeddings
        )

    def forward(self, source_ids, source_mask):
        """
        Decode every source sequence with beam search.

        Parameters:

        * `source_ids`- token ids of the sources, one row per example.
        * `source_mask`- attention mask for `source_ids` (1 = real token).

        Returns a tensor of predicted token ids stacked as
        (example, hypothesis, position); every hypothesis is right-padded
        with 0 up to `max_length`.
        """
        outputs = self.encoder(source_ids, attention_mask=source_mask)
        # Put the sequence axis first, as the decoder call below expects
        # (assumes outputs[0] is batch-first -- TODO confirm for the encoder).
        encoder_output = outputs[0].permute([1, 0, 2]).contiguous()
        preds = []

        # Padding element for short hypotheses. Created on the input's own
        # device so that any device type/index works (the legacy
        # torch.cuda.LongTensor / torch.LongTensor pair covered only
        # "cuda"/"cpu" and left `zero` undefined otherwise).
        zero = torch.zeros(1, dtype=torch.long, device=source_ids.device)

        for i in range(source_ids.shape[0]):
            beam = Beam(
                self.beam_size,
                self.sos_id,
                self.eos_id,
                device=source_ids.device.type,
            )
            # Replicate this example's encoder states and mask per beam entry.
            context = encoder_output[:, i:i + 1].repeat(1, self.beam_size, 1)
            context_mask = source_mask[i:i + 1, :].repeat(self.beam_size, 1)
            input_ids = beam.getCurrentState()

            for _ in range(self.max_length):
                if beam.done():
                    break

                # Additive causal mask: 0 on/below the diagonal, -1e4 above.
                attn_mask = -1e4 * (
                    1 - self.bias[:input_ids.shape[1], :input_ids.shape[1]]
                )
                tgt_embeddings = (
                    self.encoder.embeddings(input_ids)
                    .permute([1, 0, 2])
                    .contiguous()
                )
                out = self.decoder(
                    tgt_embeddings,
                    context,
                    tgt_mask=attn_mask,
                    memory_key_padding_mask=(1 - context_mask).bool(),
                )
                out = torch.tanh(self.dense(out))
                # Only the last position is needed to predict the next token.
                hidden_states = out.permute([1, 0, 2]).contiguous()[:, -1, :]
                out = self.lsm(self.lm_head(hidden_states)).data
                beam.advance(out)
                # Reorder the prefixes so each row follows its parent entry,
                # then append the newly chosen tokens.
                input_ids.data.copy_(
                    input_ids.data.index_select(0, beam.getCurrentOrigin())
                )
                input_ids = torch.cat((input_ids, beam.getCurrentState()), -1)

            hyp = beam.getHyp(beam.getFinal())
            pred = beam.buildTargetTokens(hyp)[: self.beam_size]
            # Right-pad every hypothesis with zeros up to `max_length`.
            pred = [
                torch.cat(
                    [x.view(-1) for x in p] + [zero] * (self.max_length - len(p))
                ).view(1, -1)
                for p in pred
            ]
            preds.append(torch.cat(pred, 0).unsqueeze(0))

        preds = torch.cat(preds, 0)
        return preds

In [None]:
class InputFeatures:
    """Plain record holding the encoded form of one example.

    It stores the example's index together with the token ids and the
    attention mask of both the source and the target side, exactly as
    passed in.
    """

    def __init__(self, example_id, source_ids,
        target_ids, source_mask, target_mask):
        # Position of the example in the input list.
        self.example_id = example_id
        # Source side: ids and their mask.
        self.source_ids, self.source_mask = source_ids, source_mask
        # Target side: ids and their mask.
        self.target_ids, self.target_mask = target_ids, target_mask


def convert_examples_to_features(examples, tokenizer,
        max_source_length=256, max_target_length=128):
    """Encode raw code snippets into fixed-length `InputFeatures`.

    Parameters:

    * `examples`- iterable of source strings.
    * `tokenizer`- object providing `tokenize`, `convert_tokens_to_ids`,
      `cls_token`, `sep_token` and `pad_token_id`.
    * `max_source_length`- padded length of the source side (default 256).
    * `max_target_length`- padded length of the target side (default 128).

    Returns a list with one `InputFeatures` per example; `example_id` is the
    example's position in `examples`.

    Raises `ValueError` if a maximum length cannot hold the two special
    tokens.
    """
    if max_source_length < 2 or max_target_length < 2:
        raise ValueError("max lengths must be at least 2 (CLS and SEP tokens)")

    # The target side is the constant placeholder "None", so encode it once
    # instead of once per example; every feature receives its own list copy.
    target_tokens = tokenizer.tokenize("None")[: max_target_length - 2]
    target_tokens = [tokenizer.cls_token] + target_tokens + [tokenizer.sep_token]
    target_ids = tokenizer.convert_tokens_to_ids(target_tokens)
    target_mask = [1] * len(target_ids)
    padding_length = max_target_length - len(target_ids)
    target_ids = target_ids + [tokenizer.pad_token_id] * padding_length
    target_mask = target_mask + [0] * padding_length

    features = []
    for example_index, example in enumerate(examples):
        # source: truncate so that CLS + tokens + SEP fit into the limit
        source_tokens = tokenizer.tokenize(example)[: max_source_length - 2]
        source_tokens = [tokenizer.cls_token] + source_tokens + [tokenizer.sep_token]
        source_ids = tokenizer.convert_tokens_to_ids(source_tokens)
        source_mask = [1] * len(source_tokens)
        padding_length = max_source_length - len(source_ids)
        source_ids += [tokenizer.pad_token_id] * padding_length
        source_mask += [0] * padding_length

        features.append(
            InputFeatures(
                example_index,
                source_ids,
                list(target_ids),
                source_mask,
                list(target_mask),
            )
        )

    return features

In [None]:
## We are defining all the needed functions here.
def inference(data, model, tokenizer):
    """Generate one summary per example in `data`.

    Parameters:

    * `data`- dataset yielding `(source_ids, source_mask)` pairs.
    * `model`- model called as `model(source_ids=..., source_mask=...)`;
      only the first hypothesis of each example is decoded.
    * `tokenizer`- object providing `decode`.

    Returns `(summaries, source_length)`: the decoded texts and the size of
    the last axis of the source ids. An empty dataset yields `([], 0)`.
    """
    # A DataLoader cannot be built with batch_size=0, so answer early.
    if len(data) == 0:
        return [], 0

    # Feed the batch to whatever device the model lives on (CPU when the
    # model has no parameters) instead of always forcing 'cpu'.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    # The whole dataset is processed as one batch, in its original order.
    eval_sampler = SequentialSampler(data)
    eval_dataloader = DataLoader(data, sampler=eval_sampler, batch_size=len(data))

    model.eval()
    summaries = []
    source_length = 0
    with torch.no_grad():
        for batch in eval_dataloader:
            source_ids, source_mask = (t.to(device) for t in batch)
            source_length = source_ids.shape[-1]
            preds = model(source_ids=source_ids, source_mask=source_mask)
            for pred in preds:
                token_ids = pred[0].cpu().tolist()
                # Id 0 marks the padding appended after the hypothesis.
                if 0 in token_ids:
                    token_ids = token_ids[: token_ids.index(0)]
                text = tokenizer.decode(token_ids, clean_up_tokenization_spaces=False)
                summaries.append(text)
    return summaries, source_length


def get_features(examples, tokenizer):
    """Turn raw code snippets into a `TensorDataset` of (ids, mask) pairs.

    The snippets are encoded with `convert_examples_to_features`; the
    source ids and source masks (cut to at most 256 entries each) become
    two long tensors wrapped in one dataset.
    """
    id_rows = []
    mask_rows = []
    for feature in convert_examples_to_features(examples, tokenizer):
        id_rows.append(feature.source_ids[: 256])
        mask_rows.append(feature.source_mask[: 256])

    ids_tensor = torch.tensor(id_rows, dtype=torch.long)
    mask_tensor = torch.tensor(mask_rows, dtype=torch.long)
    return TensorDataset(ids_tensor, mask_tensor)


def build_model(model_class, config, tokenizer,
        checkpoint_path="pytorch_model.bin", beam_size=10,
        max_length=128, num_decoder_layers=6):
    """Assemble the encoder-decoder model and load its weights.

    Parameters:

    * `model_class`- encoder class, instantiated as `model_class(config=config)`.
    * `config`- encoder configuration; `hidden_size` and
      `num_attention_heads` also size the decoder layers.
    * `tokenizer`- supplies `cls_token_id` / `sep_token_id`, used as the
      start and end symbols of the beam search.
    * `checkpoint_path`- file with the saved state dict.
    * `beam_size`- beam size for beam search.
    * `max_length`- max length of target for beam search.
    * `num_decoder_layers`- number of stacked decoder layers.

    Returns the `Seq2Seq` model with the checkpoint loaded onto the CPU.
    """
    encoder = model_class(config=config)
    decoder_layer = nn.TransformerDecoderLayer(
        d_model=config.hidden_size, nhead=config.num_attention_heads
    )
    decoder = nn.TransformerDecoder(decoder_layer, num_layers=num_decoder_layers)
    model = Seq2Seq(
        encoder=encoder,
        decoder=decoder,
        config=config,
        beam_size=beam_size,
        max_length=max_length,
        sos_id=tokenizer.cls_token_id,
        eos_id=tokenizer.sep_token_id,
    )

    # NOTE(security): torch.load unpickles the file; only load checkpoints
    # obtained from a trusted source.
    state_dict = torch.load(
        checkpoint_path,
        map_location=torch.device("cpu"),
    )
    # strict=False skips missing/unexpected keys without raising, so a
    # checkpoint that does not match the model is not reported here.
    model.load_state_dict(state_dict, strict=False)
    return model

In [None]:
# Name under which the pretrained configuration and tokenizer are looked up.
model_name = "microsoft/codebert-base"

tokenizer = RobertaTokenizer.from_pretrained(model_name, do_lower_case=False)
config = RobertaConfig.from_pretrained(model_name)

# Build the encoder-decoder with its checkpoint loaded, then keep it on the CPU.
model = build_model(model_class=RobertaModel, config=config, tokenizer=tokenizer)
model = model.to('cpu')

## Вычисление

In [None]:
def evaluate(example):
    """Summarize one code snippet with the module-level model and tokenizer.

    Returns the list of decoded summaries produced by `inference` for the
    single-example dataset.
    """
    dataset = get_features([example], tokenizer)
    summaries, _source_length = inference(dataset, model, tokenizer)
    return summaries

## Примеры

In [None]:
# The first three snippets share the same body and differ only in the
# function name.
print(evaluate("""
def add_numbers(a, b):
    return a + b
"""))

print(evaluate("""
def add(a, b):
    return a + b
"""))

print(evaluate("""
def f(a, b):
    return a + b
"""))

# The same median computation under a non-descriptive and a descriptive name.
print(evaluate("""
def smart_algo(*args):
    l = len(args)
    args = list(sorted(args))

    if l % 2 == 1:
        return args[l // 2]

    return (args[l // 2 - 1] + args[l // 2]) / 2
"""))

print(evaluate("""
def get_median(*args):
    l = len(args)
    args = list(sorted(args))

    if l % 2 == 1:
        return args[l // 2]

    return (args[l // 2 - 1] + args[l // 2]) / 2
"""))

# A function whose only content is its docstring.
print(evaluate("""
def f():
    '''
    Does completely nothing
    '''
    pass
"""))

# Graph traversal snippets.
print(evaluate("""
def BFS(graph, start):
    visited = [False] * len(graph)
    distances = [-1] * len(graph)
    queue = []

    queue.append(start)
    visited[start] = True
    distances[start] = 0

    while queue:
        n = queue.pop(0)

        for i in graph[n]:
            if not visited[i]:
                queue.append(i)
                visited[i] = True
                distances[i] = distances[n] + 1

    return distances
"""))

print(evaluate("""
def DFS(graph, node, visited=None):
    if not visited:
        visited = set()
        visited.add(node)

    yield node

    for i in graph[node]:
        if i not in visited:
            visited.add(i)
            DFS(graph, i, visited)
"""))

# The last two snippets are identical except for the function name
# (DFS vs. algo).
print(evaluate("""
def DFS(graph, node):
    visited = {node}
    stack = [node]

    yield node

    while stack:
        n = stack[-1]

        for i in graph[n]:
            if i not in visited:
                visited.add(i)
                yield i
"""))

print(evaluate("""
def algo(graph, node):
    visited = {node}
    stack = [node]

    yield node

    while stack:
        n = stack[-1]

        for i in graph[n]:
            if i not in visited:
                visited.add(i)
                yield i
"""))

```
['Add two numbers .']
['Add two arrays .']
['Greater common multiplication .']
['Calculate the objective function .']
['Calculate the median value of an iterable .']
['This function will be called on every time .']
['BFS traversal .']
['DFS DFS iterator .']
['DFS DFS iterator .']
['Iterate over the graph .']
```