In [1]:
!git clone https://github.com/nlbse2024/code-comment-classification.git

Cloning into 'code-comment-classification'...
remote: Enumerating objects: 217, done.[K
remote: Counting objects: 100% (56/56), done.[K
remote: Compressing objects: 100% (37/37), done.[K
remote: Total 217 (delta 28), reused 33 (delta 17), pack-reused 161[K
Receiving objects: 100% (217/217), 20.29 MiB | 20.11 MiB/s, done.
Resolving deltas: 100% (72/72), done.


In [2]:
!pip install datasets transformers

Collecting datasets
  Downloading datasets-2.15.0-py3-none-any.whl (521 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m521.2/521.2 kB[0m [31m7.9 MB/s[0m eta [36m0:00:00[0m
Collecting pyarrow-hotfix (from datasets)
  Downloading pyarrow_hotfix-0.6-py3-none-any.whl (7.9 kB)
Collecting dill<0.3.8,>=0.3.0 (from datasets)
  Downloading dill-0.3.7-py3-none-any.whl (115 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m115.3/115.3 kB[0m [31m8.9 MB/s[0m eta [36m0:00:00[0m
Collecting multiprocess (from datasets)
  Downloading multiprocess-0.70.15-py310-none-any.whl (134 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m134.8/134.8 kB[0m [31m10.2 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: pyarrow-hotfix, dill, multiprocess, datasets
Successfully installed datasets-2.15.0 dill-0.3.7 multiprocess-0.70.15 pyarrow-hotfix-0.6


In [3]:
import pandas as pd
import time
from datasets import Dataset
from tqdm.auto import tqdm
## Workaround for dashes in name
from importlib import import_module
nlbse_statistics = import_module('code-comment-classification.nlbse_statistics')
from datasets import load_dataset, get_dataset_split_names
from sklearn.metrics import f1_score, confusion_matrix
import numpy as np
tqdm.pandas()

# Model declaration

In [4]:
from transformers.models.roberta.modeling_roberta import RobertaPreTrainedModel, RobertaModel, RobertaClassificationHead, RobertaLayer
from transformers.modeling_outputs import SequenceClassifierOutput
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss
import torch.nn.init as init
from typing import *

class CustomRobertaClassificationHead(nn.Module):
    """Head for sentence-level classification tasks."""

    def __init__(self, config, infeat):
        super().__init__()
        self.linear = nn.Linear(infeat, config.hidden_size)
        classifier_dropout = (
            config.classifier_dropout if config.classifier_dropout is not None else config.hidden_dropout_prob
        )
        self.dropout = nn.Dropout(classifier_dropout)
        self.out = nn.Linear(config.hidden_size, config.num_labels)

    def forward(self, features, **kwargs):
        x = self.dropout(features)
        x = self.linear(x)
        x = torch.tanh(x)
        x = self.dropout(x)
        x = self.out(x)
        return x


class CodeBERTForSequenceClassification(RobertaPreTrainedModel):
    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.config = config

        self.roberta = RobertaModel(config, add_pooling_layer=False)
        self.classifier = CustomRobertaClassificationHead(config, 768)

        # Initialize weights and apply final processing
        self.post_init()

    def forward(
        self,
        input_ids: Optional[torch.LongTensor] = None,
        attention_mask: Optional[torch.FloatTensor] = None,
        token_type_ids: Optional[torch.LongTensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
        head_mask: Optional[torch.FloatTensor] = None,
        inputs_embeds: Optional[torch.FloatTensor] = None,
        labels: Optional[torch.LongTensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
        weighted_loss = None
    ) -> Union[Tuple[torch.Tensor], SequenceClassifierOutput]:
        r"""
        labels (`torch.LongTensor` of shape `(batch_size,)`, *optional*):
            Labels for computing the sequence classification/regression loss. Indices should be in `[0, ...,
            config.num_labels - 1]`. If `config.num_labels == 1` a regression loss is computed (Mean-Square loss), If
            `config.num_labels > 1` a classification loss is computed (Cross-Entropy).
        """
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        outputs = self.roberta(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        sequence_output = outputs[0][:,0,:]

        logits = self.classifier(sequence_output)

        loss = None
        if labels is not None:
            # move labels to correct device to enable model parallelism
            labels = labels.to(logits.device)
            if self.config.problem_type is None:
                if self.num_labels == 1:
                    self.config.problem_type = "regression"
                elif self.num_labels > 1 and (labels.dtype == torch.long or labels.dtype == torch.int):
                    self.config.problem_type = "single_label_classification"
                else:
                    self.config.problem_type = "multi_label_classification"

            if self.config.problem_type == "regression":
                loss_fct = MSELoss()
                if self.num_labels == 1:
                    loss = loss_fct(logits.squeeze(), labels.squeeze())
                else:
                    loss = loss_fct(logits, labels)
            elif self.config.problem_type == "single_label_classification":
                if weighted_loss is not None:
                    loss_fct = CrossEntropyLoss(weight=weighted_loss[0][:self.num_labels])
                else:
                    loss_fct = CrossEntropyLoss()

                loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))
            elif self.config.problem_type == "multi_label_classification":
                loss_fct = BCEWithLogitsLoss()
                loss = loss_fct(logits, labels)

        if not return_dict:
            output = (logits,) + outputs[2:]
            return ((loss,) + output) if loss is not None else output

        return SequenceClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )


class HSUM(nn.Module):
    def __init__(self, config):
        super(HSUM, self).__init__()
        self.config = config

        self.pre_layers = torch.nn.ModuleList()
        for i in range(config.count):
            self.pre_layers.append(RobertaLayer(config))

        self.apply(self._init_weights)

    def _init_weights(self, module):
        """Initialize the weights"""
        if isinstance(module, nn.Linear):
            # Slightly different from the TF version which uses truncated_normal for initialization
            # cf https://github.com/pytorch/pytorch/pull/5617
            module.weight.data.normal_(mean=0.0, std=self.config.initializer_range)
            if module.bias is not None:
                module.bias.data.zero_()
        elif isinstance(module, nn.Embedding):
            module.weight.data.normal_(mean=0.0, std=self.config.initializer_range)
            if module.padding_idx is not None:
                module.weight.data[module.padding_idx].zero_()
        elif isinstance(module, nn.LayerNorm):
            module.bias.data.zero_()
            module.weight.data.fill_(1.0)

    def forward(self, layers, attention_mask, return_list= False):
        logitses = []
        output = torch.zeros_like(layers[0])

        for i in range(self.config.count):
            output = output + layers[-i-1]
            logits = self.pre_layers[i](output, attention_mask)[0]
            logitses.append(logits)

        if return_list:
            return logitses

        avg_logits = torch.sum(torch.stack(logitses), dim=0) / self.config.count
        return avg_logits


class CodeBERTHSUMForSequenceClassification(RobertaPreTrainedModel):
    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.config = config

        self.roberta = RobertaModel(config, add_pooling_layer=False)
        self.classifier = CustomRobertaClassificationHead(config, 768)

        self.mixlayer = HSUM(config)

        # Initialize weights and apply final processing
        self.post_init()

    def forward(
        self,
        input_ids: Optional[torch.LongTensor] = None,
        attention_mask: Optional[torch.FloatTensor] = None,
        token_type_ids: Optional[torch.LongTensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
        head_mask: Optional[torch.FloatTensor] = None,
        inputs_embeds: Optional[torch.FloatTensor] = None,
        labels: Optional[torch.LongTensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
        weighted_loss = None
    ) -> Union[Tuple[torch.Tensor], SequenceClassifierOutput]:
        r"""
        labels (`torch.LongTensor` of shape `(batch_size,)`, *optional*):
            Labels for computing the sequence classification/regression loss. Indices should be in `[0, ...,
            config.num_labels - 1]`. If `config.num_labels == 1` a regression loss is computed (Mean-Square loss), If
            `config.num_labels > 1` a classification loss is computed (Cross-Entropy).
        """
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict
        # if labels is not None:
        output_hidden_states = True

        outputs = self.roberta(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        if return_dict:
            layers = outputs.hidden_states
        else:
            layers = outputs[2]
        extend_attention_mask = (1.0 - attention_mask[:,None, None, :]) * -10000.0

        sequence_output = self.mixlayer(layers, extend_attention_mask)[:,0,:]

        logits = self.classifier(sequence_output)

        loss = None
        if labels is not None:
            # move labels to correct device to enable model parallelism
            labels = labels.to(logits.device)
            if self.config.problem_type is None:
                if self.num_labels == 1:
                    self.config.problem_type = "regression"
                elif self.num_labels > 1 and (labels.dtype == torch.long or labels.dtype == torch.int):
                    self.config.problem_type = "single_label_classification"
                else:
                    self.config.problem_type = "multi_label_classification"

            if self.config.problem_type == "regression":
                loss_fct = MSELoss()
                if self.num_labels == 1:
                    loss = loss_fct(logits.squeeze(), labels.squeeze())
                else:
                    loss = loss_fct(logits, labels)
            elif self.config.problem_type == "single_label_classification":
                if weighted_loss is not None:
                    loss_fct = CrossEntropyLoss(weight=weighted_loss[0][:self.num_labels])
                else:
                    loss_fct = CrossEntropyLoss()

                loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))
            elif self.config.problem_type == "multi_label_classification":
                loss_fct = BCEWithLogitsLoss()
                loss = loss_fct(logits, labels)

        if not return_dict:
            output = (logits,) + outputs[2:]
            return ((loss,) + output) if loss is not None else output

        return SequenceClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )


# Evaluation

In [5]:
from transformers import AutoTokenizer
import torch

In [6]:
langs = ['java', 'python', 'pharo']
lan_cats = []
datasets = {}
for lan in langs: # for each language
    df = pd.read_csv(f'./code-comment-classification/{lan}/input/{lan}.csv')
    df['combo'] = df[['class', 'comment_sentence']].agg('</s>'.join, axis=1)
    df['label'] = df.instance_type
    df["combo_len"] = [len(x.split()) for x in df["combo"]]
    cats = list(map(lambda x: lan + '_' + x, list(set(df.category))))

    for cat in list(set(df.category)): # for each category
        filtered =  df[df.category == cat].sort_values(by='combo_len')
        train_data = Dataset.from_pandas(filtered[filtered.partition == 0])
        test_data = Dataset.from_pandas(filtered[filtered.partition == 1])
        datasets[f'{lan}_{cat}'.lower()] = {'train_data': train_data, 'test_data' : test_data}
        lan_cats.append(f'{lan}_{cat}'.lower())

In [8]:
def get_prediction(x, model, tokenizer, num_iter):
  y_hat = []
  for i in range(int(num_iter)):
      inputs = tokenizer(x[i*batch_size:(i+1)*batch_size], max_length=32, padding=True, truncation=True, return_tensors="pt")
      logits = model(**inputs).logits
      y_hat.extend(np.argmax(logits.cpu().numpy(), axis=1).tolist())
  return y_hat

In [14]:
scores = []
batch_size = 64
torch.set_default_device('cuda')
for lan_cat in lan_cats:
    # load models and data
    tokenizer = AutoTokenizer.from_pretrained("Fsoft-AIC/dopamin-{}".format(lan_cat.replace("_","-")))
    model = CodeBERTHSUMForSequenceClassification.from_pretrained("Fsoft-AIC/dopamin-{}".format(lan_cat.replace("_","-")))
    model.eval()
    test_data = datasets[lan_cat]['test_data']
    x = test_data["combo"]
    y = test_data['label']
    num_iter = np.ceil(len(x) / batch_size)

    # # run and time 10 times for each cat
    with torch.no_grad():
      for it in range(10):
        ############# TIME BLOCK #####################
        num_iter = np.ceil(len(x) / batch_size)
        start = time.time()
        y_hat = get_prediction(x, model, tokenizer, num_iter)
        elapsed_time = time.time() - start
        time_per_sample = elapsed_time / len(y)

        # # ############# TIME BLOCK #####################
        _, fp, fn, tp = confusion_matrix(y_hat, y).ravel()
        wf1 = f1_score(y, y_hat, average='weighted')
        precision, recall, f1 = nlbse_statistics.get_precision_recall_f1(tp, fp, fn)
        scores.append({'lan_cat': lan_cat.lower(),'precision': precision,'recall': recall,'f1': f1,'wf1': wf1, 'avg_runtime': time_per_sample, 'iteration': it, 'len': len(test_data)})

In [17]:
df = pd.DataFrame(scores).groupby('lan_cat').mean()
df['time_std'] = pd.DataFrame(scores).groupby('lan_cat').std()['avg_runtime']
avg_runtime = round(df['avg_runtime'].mean(), 5)
avg_score = round(df['f1'].mean(), 2)
print(f"Average runtime: {round(df['avg_runtime'].mean(), 5)}")
print(f"Average f1: {round(df['f1'].mean(), 2)}")
df.precision = df.precision.round(2)
df.recall = df.recall.round(2)
df.f1 = df.f1.round(4)
df.avg_runtime = df.avg_runtime.round(4)

df[['precision','recall', 'f1', 'avg_runtime']]


Average runtime: 0.00203
Average f1: 0.74


Unnamed: 0_level_0,precision,recall,f1,avg_runtime
lan_cat,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
java_deprecation,0.81,0.88,0.8451,0.0022
java_expand,0.52,0.63,0.5677,0.0022
java_ownership,1.0,0.99,0.9956,0.0022
java_pointer,0.89,0.81,0.8488,0.0022
java_rational,0.49,0.59,0.5361,0.0022
java_summary,0.94,0.87,0.9031,0.0022
java_usage,0.87,0.95,0.9073,0.0023
pharo_classreferences,0.76,0.62,0.6842,0.0021
pharo_collaborators,0.57,0.64,0.6038,0.0021
pharo_example,0.93,0.93,0.9342,0.002


In [18]:
def score(f1, avg_time_per_sample):
  return 0.75 * f1 + 0.25 * ((max((0.005 - avg_time_per_sample), 0)/0.005))
score(avg_score,avg_runtime)

0.7034999999999999