# Package Installation

In [1]:
%%capture
# Install the NLP / evaluation dependencies; %%capture keeps pip's output out of the notebook.
!python -m pip install seqeval==1.2.2 bertviz==1.2.0 umap-learn==0.5.1
!python -m pip install transformers==4.27.4 datasets==2.8.0
!python -m pip install matplotlib ipywidgets
# NOTE(review): the packages on the three lines around this note (matplotlib, ipywidgets,
# scikit-multilearn, evaluate) are unpinned, so a fresh install may resolve different versions.
!pip install scikit-multilearn
!pip install evaluate

### wandb

In [None]:
# Install the Weights & Biases client (unpinned).
!pip install wandb

In [None]:
import wandb
# Authenticate this session with W&B; credentials are requested interactively, not stored in the notebook.
wandb.login()

In [None]:
# set up project name
%env WANDB_PROJECT=EH(pos&neg)

# If load_best_model_at_end=True is passed to Trainer, then W&B will save the best performing model to Artifacts.
# https://docs.wandb.ai/guides/integrations/huggingface#turn-on-model-versioning
%env WANDB_LOG_MODEL='end'

### wandb end

In [None]:
import torch
import os
# Record the PyTorch build in the notebook output for reproducibility.
print(f"PyTorch has version {torch.__version__}")

PyTorch has version 2.0.0+cu118


In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import pdb

In [None]:
# Install the PyG scatter/sparse extensions from the wheel index built for torch 2.0.0 + CUDA 11.8.
# NOTE(review): the wheel URL is hard-coded; it must match the torch version printed earlier in the notebook.
! pip install torch-scatter torch-sparse -f https://data.pyg.org/whl/torch-2.0.0+cu118.html

In [None]:
import logging
import sys
import os

import datasets
import huggingface_hub
import transformers
from sklearn.preprocessing import MultiLabelBinarizer

In [None]:
# Let pandas show up to 1000 characters per cell so long text columns are not truncated.
pd.set_option('display.max_colwidth',1000)

In [None]:
# Report the CUDA device when one is visible. `device_name` and `n_gpu` are only
# defined inside this branch, so they do not exist on CPU-only hosts.
if torch.cuda.is_available():
  # Get the GPU device name.
  device_name = torch.cuda.get_device_name()
  n_gpu = torch.cuda.device_count()
  print(f"Found device: {device_name}, n_gpu: {n_gpu}")

In [None]:
# set seed
# The same seed (0) is applied through the transformers helper, PyTorch and NumPy.
from transformers import set_seed
set_seed(0)
torch.manual_seed(0)
np.random.seed(0)

In [None]:
# Colab-only: mount Google Drive at /content/gdrive.
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
# Exception class names used as the multi-label target vocabulary.
# len(all_labels) sets `num_excep_labels` on the model config further down.
all_labels = ['ClassCastException',
 'SecurityException',
 'UnsupportedOperationException',
 'NoSuchAlgorithmException',
 'SQLException',
 'IOException',
 'NoSuchMethodException',
 'IllegalArgumentException',
 'NullPointerException',
 'FileNotFoundException',
 'MalformedURLException',
 'InterruptedException',
 'JSONException',
 'UnsupportedEncodingException',
 'com.google.protobuf.InvalidProtocolBufferException',
 'IllegalStateException',
 'IllegalAccessException',
 'URISyntaxException',
 'ExecutionException',
 'InvalidArgumentException',
 'SAXException',
 'NumberFormatException',
 'ClassNotFoundException',
 'RuntimeException',
 'GenericEntityException',
 'InvocationTargetException',
 'ParseException',
 'IndexOutOfBoundsException',
 'InstantiationException',
 'com.google.protobuf.UninitializedMessageException']

In [None]:
# Sanity check: number of exception classes.
len(all_labels)

30

In [None]:
# Fit the binarizer on a single sample containing every label so its class set covers all of `all_labels`.
mlb = MultiLabelBinarizer()
mlb.fit([all_labels])

# Preparing Dataset

In [None]:
from datasets import load_dataset, concatenate_datasets, load_from_disk
from datasets import DatasetDict, Dataset
from datasets import ClassLabel, Sequence, Value
from random import randint

In [None]:
from datasets.load import load_from_disk
# startIndex starts with 0, and the endIndex is inclusive
# Duplicates have been removed from "train" and "valid" splits, but not from the "test" split
# NOTE(review): the path below is a placeholder and must be replaced with the real dataset folder.
encoded_ds = load_from_disk("PATH-TO-THE-TRAINING-DATASET-FOLDER")

In [None]:
# Display the splits, feature names and row counts of the loaded dataset.
encoded_ds

DatasetDict({
    train: Dataset({
        features: ['attention_mask', 'excep_count', 'excep_ids', 'excep_index', 'id', 'input_ids', 'label_cls', 'labels'],
        num_rows: 246118
    })
    valid: Dataset({
        features: ['attention_mask', 'excep_count', 'excep_ids', 'excep_index', 'id', 'input_ids', 'label_cls', 'labels'],
        num_rows: 30764
    })
    test: Dataset({
        features: ['attention_mask', 'excep_count', 'excep_ids', 'excep_index', 'id', 'input_ids', 'label_cls', 'labels'],
        num_rows: 30764
    })
})

# Model

In [None]:
from transformers import AutoTokenizer
# Hub checkpoint used for the tokenizer here and for the config / model weights in later cells.
codebert_model_name = "microsoft/codebert-base-mlm"
codebert_tokenizer = AutoTokenizer.from_pretrained(codebert_model_name)

In [None]:
# Token-level tag vocabulary (IOB-style tags for "Try" spans) and its inverse mapping.
index2tag = {0:"O", 1:"B-Try", 2:"I-Try"}
tag2index = {v:k for k, v in index2tag.items()}

In [None]:
from transformers import AutoConfig

# Number of token-level tags; used as `num_labels` for the token classification head.
n = len(index2tag)
print("num_labels = ", n)

# Start from the pretrained checkpoint's config and attach the tag vocabulary.
codebert_config = AutoConfig.from_pretrained(codebert_model_name,
                                             num_labels=n,
                                             id2label=index2tag, label2id=tag2index,
                                             output_hidden_states=False
                                            )

# Extra attributes read by CodebertForExcepPrediction.__init__ to size its two additional heads.
codebert_config.num_cls_labels=2
codebert_config.num_excep_labels=len(all_labels)

num_labels =  3


In [None]:
import torch.nn as nn
from transformers import RobertaConfig
from transformers.models.roberta.modeling_roberta import RobertaModel
from transformers.models.roberta.modeling_roberta import RobertaPreTrainedModel
from torch_scatter import scatter

class CodebertForExcepPrediction(RobertaPreTrainedModel):
    """RoBERTa/CodeBERT encoder with three jointly trained linear heads.

    Heads (all applied to the dropout-regularised last hidden state):
      * ``cls_classifier``   -- sequence-level classifier on the first token's
        hidden state (``config.num_cls_labels`` classes).
      * ``token_classifier`` -- per-token tagger (``config.num_labels`` classes).
      * ``excep_classifier`` -- multi-label classifier over pooled token groups
        (``config.num_excep_labels`` classes).
    """
    config_class = RobertaConfig

    def __init__(self, config):
        """Build the encoder body and the three heads from ``config``.

        ``config`` must carry ``num_labels``, ``num_cls_labels`` and
        ``num_excep_labels`` in addition to the standard RoBERTa fields.
        """
        super().__init__(config)
        self.num_labels = config.num_labels
        self.num_cls_labels = config.num_cls_labels
        # Load model body
        self.roberta = RobertaModel(config, add_pooling_layer=False)

        # Set up heads for three tasks
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        self.token_classifier = nn.Linear(config.hidden_size, config.num_labels)
        self.cls_classifier = nn.Linear(config.hidden_size, config.num_cls_labels)
        self.excep_classifier = nn.Linear(config.hidden_size, config.num_excep_labels)

        # Load and initialize weights
        self.init_weights()

    def forward(self, input_ids=None, attention_mask=None, token_type_ids=None,
                labels=None, label_cls=None,
                excep_ids=None, excep_count=None, excep_index=None, **kwargs):
        """Run the encoder and heads; compute the combined loss when targets are given.

        Args:
            input_ids, attention_mask, token_type_ids: forwarded to the RoBERTa body.
            labels: token-level targets for ``token_classifier`` (optional).
            label_cls: sequence-level targets for ``cls_classifier`` (optional). It is
                also used as a boolean mask: the token and exception losses are only
                computed for examples whose ``label_cls`` is non-zero.
            excep_ids: multi-label targets for the exception head; sliced as
                ``ids[:count, :]`` per example.
            excep_count: per-example number of pooled groups to score.
            excep_index: per-example index assigning each token to a pooled group
                (passed to ``torch_scatter.scatter``).
            **kwargs: forwarded to the RoBERTa body.

        Returns:
            dict with keys ``loss`` (``None`` when ``label_cls`` is not supplied),
            ``logits`` (token logits), ``cls_logits``, ``last_hidden_state``,
            ``attentions``, and the ``label_cls`` / ``labels`` inputs echoed back.
        """
        outputs = self.roberta(input_ids, attention_mask=attention_mask,
                               token_type_ids=token_type_ids, **kwargs)

        # Apply classifier to encoder representation
        sequence_output = self.dropout(outputs.last_hidden_state)
        token_logits = self.token_classifier(sequence_output)
        cls_logits = self.cls_classifier(sequence_output[:, 0])

        loss_fct = nn.CrossEntropyLoss()
        # reduction="none" keeps one loss value per pooled group; they are summed below.
        loss_fct2 = nn.MultiLabelSoftMarginLoss(reduction="none")

        # Calculate sentence classification losses
        cls_loss = None
        if label_cls is not None:
            cls_loss = loss_fct(cls_logits, label_cls)

        # Calculate stmts classification losses
        stmt_loss = None
        excep_loss = 0.0
        # Without `label_cls` (plain inference) there is nothing to mask on, so the
        # token / exception losses are skipped instead of dereferencing None.
        mask = None
        if label_cls is not None:
            mask = label_cls.clone().detach().bool().requires_grad_(False)

        if (labels is not None) and (mask is not None) and torch.any(mask):
            # Calculate stmts classification loss (positive examples only)
            token_logits_masked = token_logits[mask]
            labels_masked = labels[mask]
            stmt_loss = loss_fct(token_logits_masked.view(-1, self.num_labels), labels_masked.view(-1))

            # Calculate the exception prediction loss
            sequence_output_masked = sequence_output[mask]
            excep_ids_masked = excep_ids[mask]
            excep_count_masked = excep_count[mask]
            excep_index_masked = excep_index[mask]

            for out, ids, count, index in zip(sequence_output_masked, excep_ids_masked,
                                              excep_count_masked, excep_index_masked):
                # Sum the hidden states of tokens that share a group index, then keep
                # only the first `count` groups before scoring them.
                h = scatter(out, index, dim=0, reduce="sum")[:count, :]
                excep_loss += torch.sum(loss_fct2(self.excep_classifier(h), ids[:count, :]))

        loss = None
        if stmt_loss is None:
            loss = cls_loss
        else:
            # The exception loss is averaged over the number of positive examples in the batch.
            loss = cls_loss + stmt_loss + excep_loss / mask.sum()
        # Return model output object
        return {"loss": loss, "logits":token_logits, "cls_logits": cls_logits,
                "last_hidden_state": outputs.last_hidden_state, "attentions": outputs.attentions,
                "label_cls": label_cls, "labels": labels}

In [None]:
import sklearn.metrics as metrics
import evaluate

def compute_metrics(eval_pred):
    """Compute sequence-level and token-level metrics for one evaluation pass.

    Args:
        eval_pred: object exposing ``predictions`` and ``label_ids``. Index 0 of each
            holds the token-level logits / labels, index 1 the sequence-level
            logits / labels.

    Returns:
        dict with accuracy / precision / recall / F1 of the sequence classifier
        (``cls_*`` keys) and seqeval's overall accuracy plus precision / recall / F1
        for the ``Try`` entity type. The ``Try`` scores are reported as 0.0 when
        seqeval returns no entry for that type.
    """
    seqeval = evaluate.load('seqeval')
    logits = eval_pred.predictions[0]
    labels = eval_pred.label_ids[0]
    cls_logits = eval_pred.predictions[1]
    cls_true = eval_pred.label_ids[1]

    y_pred, y_true = align_predictions(logits, labels)
    cls_pred = np.argmax(cls_logits, axis=-1).flatten()

    results = seqeval.compute(predictions=y_pred, references=y_true, mode="strict", scheme="IOB2")
    # seqeval only emits a per-type entry for entity types it encountered; fall back
    # to zeros instead of raising KeyError when no "Try" span was seen.
    try_scores = results.get("Try", {})

    return {"cls_acc": metrics.accuracy_score(cls_true, cls_pred), "cls_precision": metrics.precision_score(cls_true, cls_pred),
            "cls_recall": metrics.recall_score(cls_true, cls_pred), "cls_f1": metrics.f1_score(cls_true, cls_pred),
            "overall_accuracy": results["overall_accuracy"], "precision": try_scores.get("precision", 0.0),
            "recall": try_scores.get("recall", 0.0), "f1": try_scores.get("f1", 0.0)}

In [None]:
def align_predictions(logits, label_ids):
    """Turn token logits and label ids into per-example lists of tag strings.

    Positions whose label id is -100 are dropped from both outputs. Tag strings
    are looked up in the module-level ``index2tag`` mapping.

    Returns:
        (preds_list, labels_list): two lists with one list of tag strings per example.
    """
    predicted_ids = np.argmax(logits, axis=2)
    n_examples, n_tokens = predicted_ids.shape  # (bs, 512)
    preds_list, labels_list = [], []

    for row in range(n_examples):
        # Columns that carry a real label (label id != -100).
        kept = [col for col in range(n_tokens) if label_ids[row, col] != -100]
        labels_list.append([index2tag[label_ids[row][col]] for col in kept])
        preds_list.append([index2tag[predicted_ids[row][col]] for col in kept])

    return preds_list, labels_list

In [None]:
import torch

# Prefer the GPU when one is available, otherwise run on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")


def model_init():
    """Build the multi-task model from the pretrained checkpoint and move it to `device`."""
    model = CodebertForExcepPrediction.from_pretrained(codebert_model_name, config=codebert_config)
    return model.to(device)

In [None]:
from transformers import DataCollatorForTokenClassification

# Batch collator built from the CodeBERT tokenizer.
# NOTE(review): presumably the extra features (excep_ids, excep_count, excep_index, label_cls)
# are already uniformly shaped in the dataset — confirm this collator handles them as intended.
data_collator = DataCollatorForTokenClassification(codebert_tokenizer)

## Custom Trainer

In [None]:
from collections.abc import Mapping
from transformers import Trainer
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Union

In [None]:
def atleast_1d(tensor_or_array: Union[torch.Tensor, np.ndarray]):
    """Return the input with at least one dimension.

    Torch tensors go through ``torch.atleast_1d`` when that function exists, and
    otherwise get a leading axis added only if they are 0-dimensional. Anything
    that is not a torch tensor is handed to ``np.atleast_1d``.
    """
    if not isinstance(tensor_or_array, torch.Tensor):
        return np.atleast_1d(tensor_or_array)
    if hasattr(torch, "atleast_1d"):
        return torch.atleast_1d(tensor_or_array)
    if tensor_or_array.ndim < 1:
        return tensor_or_array[None]
    return tensor_or_array

In [None]:
def torch_pad_and_concatenate(tensor1, tensor2, padding_index=-100):
    """Stack two tensors along dim 0, right-padding dim 1 with `padding_index` when their widths differ."""
    first = atleast_1d(tensor1)
    second = atleast_1d(tensor2)

    if len(first.shape) != 1 and first.shape[1] != second.shape[1]:
        # Widths differ: allocate the padded result, then copy each input into its rows.
        rows_first = first.shape[0]
        new_shape = (rows_first + second.shape[0], max(first.shape[1], second.shape[1])) + first.shape[2:]
        padded = first.new_full(new_shape, padding_index)
        padded[:rows_first, : first.shape[1]] = first
        padded[rows_first:, : second.shape[1]] = second
        return padded

    # 1-D inputs or matching widths: a plain concatenation is enough.
    return torch.cat((first, second), dim=0)

In [None]:
def numpy_pad_and_concatenate(array1, array2, padding_index=-100):
    """Stack two arrays along axis 0, right-padding axis 1 with `padding_index` when their widths differ."""
    first = atleast_1d(array1)
    second = atleast_1d(array2)

    if len(first.shape) != 1 and first.shape[1] != second.shape[1]:
        # Widths differ: allocate the padded result, then copy each input into its rows.
        rows_first = first.shape[0]
        new_shape = (rows_first + second.shape[0], max(first.shape[1], second.shape[1])) + first.shape[2:]
        padded = np.full_like(first, padding_index, shape=new_shape)
        padded[:rows_first, : first.shape[1]] = first
        padded[rows_first:, : second.shape[1]] = second
        return padded

    # 1-D inputs or matching widths: a plain concatenation is enough.
    return np.concatenate((first, second), axis=0)

In [None]:
def nested_concat(tensors, new_tensors, padding_index=-100):
    """
    Recursively append `new_tensors` to `tensors` along the first dimension, padding the second
    dimension where needed. Accepts torch tensors, numpy arrays, and lists / tuples / mappings of them;
    both arguments must have the same type. Raises `TypeError` for any other type.
    """
    container_type = type(tensors)
    assert container_type == type(new_tensors), (
        f"Expected `tensors` and `new_tensors` to have the same type but found {type(tensors)} and {type(new_tensors)}."
    )

    if isinstance(tensors, (list, tuple)):
        merged_items = (
            nested_concat(old, new, padding_index=padding_index) for old, new in zip(tensors, new_tensors)
        )
        return container_type(merged_items)

    if isinstance(tensors, torch.Tensor):
        return torch_pad_and_concatenate(tensors, new_tensors, padding_index=padding_index)

    if isinstance(tensors, Mapping):
        merged_map = {
            key: nested_concat(old, new_tensors[key], padding_index=padding_index)
            for key, old in tensors.items()
        }
        return container_type(merged_map)

    if isinstance(tensors, np.ndarray):
        return numpy_pad_and_concatenate(tensors, new_tensors, padding_index=padding_index)

    raise TypeError(f"Unsupported type for concatenation: got {type(tensors)}")

In [None]:
def nested_detach(tensors):
    """Call `.detach()` on every tensor in a (possibly nested) list / tuple / mapping, keeping the container types."""
    if isinstance(tensors, (list, tuple)):
        return type(tensors)(map(nested_detach, tensors))
    if isinstance(tensors, Mapping):
        detached = {key: nested_detach(value) for key, value in tensors.items()}
        return type(tensors)(detached)
    return tensors.detach()

In [None]:
class CustomTrainer(Trainer):
    """Trainer whose evaluation step returns only the model's logit tensors as predictions.

    The multi-task model returns a dict that also echoes labels and hidden states;
    those entries are always excluded from the gathered predictions.
    """

    def prediction_step(
        self,
        model: nn.Module,
        inputs: Dict[str, Union[torch.Tensor, Any]],
        prediction_loss_only: bool,
        ignore_keys: Optional[List[str]] = ['last_hidden_state', 'attentions', 'label_cls', 'labels'],
    ) -> Tuple[Optional[torch.Tensor], Optional[torch.Tensor], Optional[torch.Tensor]]:
        """
        Perform an evaluation step on `model` using `inputs`.

        Subclass and override to inject custom behavior.

        Args:
            model (`nn.Module`):
                The model to evaluate.
            inputs (`Dict[str, Union[torch.Tensor, Any]]`):
                The inputs and targets of the model.

                The dictionary will be unpacked before being fed to the model. Most models expect the targets under the
                argument `labels`. Check your model's documentation for all accepted arguments.
            prediction_loss_only (`bool`):
                Whether or not to return the loss only.
            ignore_keys (`List[str]`, *optional*):
                Additional keys in the output of your model (if it is a dictionary) that should be ignored when
                gathering predictions. `last_hidden_state`, `attentions`, `label_cls` and `labels` are always
                ignored, whether or not they are listed here (passing `None` ignores exactly those four).

        Return:
            Tuple[Optional[torch.Tensor], Optional[torch.Tensor], Optional[torch.Tensor]]: A tuple with the loss,
            logits and labels (each being optional).
        """
        has_labels = False if len(self.label_names) == 0 else all(inputs.get(k) is not None for k in self.label_names)
        # For CLIP-like models capable of returning loss values.
        # If `return_loss` is not specified or being `None` in `inputs`, we check if the default value of `return_loss`
        # is `True` in `model.forward`.
        return_loss = inputs.get("return_loss", None)
        if return_loss is None:
            return_loss = self.can_return_loss
        loss_without_labels = True if len(self.label_names) == 0 and return_loss else False

        # These model outputs are never predictions. Caller-supplied keys are added on top
        # instead of being discarded, and the default argument list is never mutated.
        always_ignored = ['last_hidden_state', 'attentions', 'label_cls', 'labels']
        ignore_keys = always_ignored + [k for k in (ignore_keys or []) if k not in always_ignored]

        inputs = self._prepare_inputs(inputs)
        # labels may be popped when computing the loss (label smoothing for instance) so we grab them first.
        if has_labels or loss_without_labels:
            labels = nested_detach(tuple(inputs.get(name) for name in self.label_names))
            if len(labels) == 1:
                labels = labels[0]
        else:
            labels = None

        with torch.no_grad():
            if has_labels or loss_without_labels:
                with self.compute_loss_context_manager():
                    loss, outputs = self.compute_loss(model, inputs, return_outputs=True)
                loss = loss.mean().detach()

                if isinstance(outputs, dict):
                    # The loss is returned separately, so it is dropped from the predictions too.
                    skipped = ignore_keys + ["loss"]
                    logits = tuple(v for k, v in outputs.items() if k not in skipped)
                else:
                    logits = outputs[1:]
            else:
                loss = None
                with self.compute_loss_context_manager():
                    outputs = model(**inputs)
                if isinstance(outputs, dict):
                    logits = tuple(v for k, v in outputs.items() if k not in ignore_keys)
                else:
                    logits = outputs

                if self.args.past_index >= 0:
                    self._past = outputs[self.args.past_index - 1]

        if prediction_loss_only:
            return (loss, None, None)

        logits = nested_detach(logits)
        if len(logits) == 1:
            logits = logits[0]

        return (loss, logits, labels)

## End: Custom Trainer

In [None]:
from transformers import TrainingArguments

num_epochs = 15
batch_size = 32
# Number of batches in one pass over the training split at `batch_size` examples per step,
# so metrics are logged roughly once per epoch.
# NOTE(review): this assumes a single device; with several GPUs an epoch has fewer steps.
logging_steps = len(encoded_ds["train"]) // batch_size

training_args = TrainingArguments(
    output_dir="PATH-TO-FOLDER", # save model ckpts
    log_level="error",
    num_train_epochs=num_epochs,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    evaluation_strategy="epoch",
    learning_rate=6e-6,
    weight_decay=0.01,
    # The trailing comma was missing here, which made the whole cell a SyntaxError.
    metric_for_best_model="loss",
    load_best_model_at_end=True,
    logging_steps=logging_steps,
    # Must match `evaluation_strategy` so the best checkpoint can be reloaded at the end.
    save_strategy = "epoch",
    gradient_accumulation_steps=1,
    disable_tqdm=False,
    report_to="wandb"
)

In [None]:
# Build the trainer. `model_init` is passed instead of a model instance, so the trainer
# constructs the model itself. Evaluation runs on the "valid" split.
trainer = CustomTrainer(model_init=model_init, args=training_args,
                  data_collator=data_collator,
                  compute_metrics=compute_metrics,
                  train_dataset=encoded_ds["train"],
                  eval_dataset=encoded_ds["valid"],
                  tokenizer=codebert_tokenizer)

In [None]:
# Run the full training loop (long-running cell).
trainer.train()

In [None]:
# post-training analysis, testing, other logged code
# Close the active W&B run.
wandb.finish()

In [None]:
# Path of the checkpoint the trainer recorded as best during training.
best_ckpt_path = trainer.state.best_model_checkpoint
print(best_ckpt_path)

In [None]:
# Save the model currently held by the trainer.
# NOTE(review): "PATH-TO-FOLDER" is a placeholder — replace it before running.
trainer.save_model("PATH-TO-FOLDER")