# **Explicability for Chitinase Model**

# Install packages

In [None]:
!pip install datasets
!pip install evaluate
!pip install SentencePiece
!pip install transformers[torch]
!pip install captum

Collecting datasets
  Downloading datasets-3.1.0-py3-none-any.whl.metadata (20 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.5.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess<0.70.17 (from datasets)
  Downloading multiprocess-0.70.16-py310-none-any.whl.metadata (7.2 kB)
Collecting fsspec<=2024.9.0,>=2023.1.0 (from fsspec[http]<=2024.9.0,>=2023.1.0->datasets)
  Downloading fsspec-2024.9.0-py3-none-any.whl.metadata (11 kB)
Downloading datasets-3.1.0-py3-none-any.whl (480 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m480.6/480.6 kB[0m [31m10.4 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dill-0.3.8-py3-none-any.whl (116 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m116.3/116.3 kB[0m [31m6.7 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading fsspec-2024.9.0-py3-none-any.whl (

# Importing necesary libraries

In [None]:
from google.colab import drive
drive.mount('/content/drive')

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss
from torch.utils.data import DataLoader

import re
import numpy as np
import pandas as pd
import copy

import transformers, datasets
from transformers.modeling_outputs import SequenceClassifierOutput
from transformers.models.t5.modeling_t5 import T5Config, T5PreTrainedModel, T5Stack
from transformers.utils.model_parallel_utils import assert_device_map, get_device_map
from transformers import T5EncoderModel, T5Tokenizer
from transformers import TrainingArguments, Trainer, set_seed

# from evaluate import load
from datasets import Dataset

from tqdm import tqdm
import random

from scipy import stats
from sklearn.metrics import accuracy_score

import matplotlib.pyplot as plt

from captum.attr import LayerIntegratedGradients, TokenReferenceBase, visualization

Mounted at /content/drive


# Model definition

## LoRA

In [None]:
# Modifies an existing transformer and introduce the LoRA layers

class LoRAConfig:
    def __init__(self):
        self.lora_rank = 4
        self.lora_init_scale = 0.01
        self.lora_modules = ".*SelfAttention|.*EncDecAttention"
        self.lora_layers = "q|k|v|o"
        self.trainable_param_names = ".*layer_norm.*|.*lora_[ab].*"
        self.lora_scaling_rank = 1
        # lora_modules and lora_layers are speicified with regular expressions
        # see https://www.w3schools.com/python/python_regex.asp for reference

class LoRALinear(nn.Module):
    def __init__(self, linear_layer, rank, scaling_rank, init_scale):
        super().__init__()
        self.in_features = linear_layer.in_features
        self.out_features = linear_layer.out_features
        self.rank = rank
        self.scaling_rank = scaling_rank
        self.weight = linear_layer.weight
        self.bias = linear_layer.bias
        if self.rank > 0:
            self.lora_a = nn.Parameter(torch.randn(rank, linear_layer.in_features) * init_scale)
            if init_scale < 0:
                self.lora_b = nn.Parameter(torch.randn(linear_layer.out_features, rank) * init_scale)
            else:
                self.lora_b = nn.Parameter(torch.zeros(linear_layer.out_features, rank))
        if self.scaling_rank:
            self.multi_lora_a = nn.Parameter(
                torch.ones(self.scaling_rank, linear_layer.in_features)
                + torch.randn(self.scaling_rank, linear_layer.in_features) * init_scale
            )
            if init_scale < 0:
                self.multi_lora_b = nn.Parameter(
                    torch.ones(linear_layer.out_features, self.scaling_rank)
                    + torch.randn(linear_layer.out_features, self.scaling_rank) * init_scale
                )
            else:
                self.multi_lora_b = nn.Parameter(torch.ones(linear_layer.out_features, self.scaling_rank))

    def forward(self, input):
        if self.scaling_rank == 1 and self.rank == 0:
            # parsimonious implementation for ia3 and lora scaling
            if self.multi_lora_a.requires_grad:
                hidden = F.linear((input * self.multi_lora_a.flatten()), self.weight, self.bias)
            else:
                hidden = F.linear(input, self.weight, self.bias)
            if self.multi_lora_b.requires_grad:
                hidden = hidden * self.multi_lora_b.flatten()
            return hidden
        else:
            # general implementation for lora (adding and scaling)
            weight = self.weight
            if self.scaling_rank:
                weight = weight * torch.matmul(self.multi_lora_b, self.multi_lora_a) / self.scaling_rank
            if self.rank:
                weight = weight + torch.matmul(self.lora_b, self.lora_a) / self.rank
            return F.linear(input, weight, self.bias)

    def extra_repr(self):
        return "in_features={}, out_features={}, bias={}, rank={}, scaling_rank={}".format(
            self.in_features, self.out_features, self.bias is not None, self.rank, self.scaling_rank
        )


def modify_with_lora(transformer, config):
    for m_name, module in dict(transformer.named_modules()).items():
        if re.fullmatch(config.lora_modules, m_name):
            for c_name, layer in dict(module.named_children()).items():
                if re.fullmatch(config.lora_layers, c_name):
                    assert isinstance(
                        layer, nn.Linear
                    ), f"LoRA can only be applied to torch.nn.Linear, but {layer} is {type(layer)}."
                    setattr(
                        module,
                        c_name,
                        LoRALinear(layer, config.lora_rank, config.lora_scaling_rank, config.lora_init_scale),
                    )
    return transformer

## Configuration class

In [None]:
class ClassConfig:
    def __init__(self, dropout=0.2, num_labels=2):
        self.dropout_rate = dropout
        self.num_labels = num_labels

## Classification head

In [None]:
class T5EncoderClassificationHead(nn.Module):
    """Head for sentence-level classification tasks."""

    def __init__(self, config, class_config):
        super().__init__()
        self.dense = nn.Linear(config.hidden_size, config.hidden_size)
        self.dropout = nn.Dropout(class_config.dropout_rate)
        self.out_proj = nn.Linear(config.hidden_size, class_config.num_labels)
        self.sigmoid = nn.Sigmoid()

    def forward(self, hidden_states):

        hidden_states =  torch.mean(hidden_states,dim=1)  # avg embedding

        hidden_states = self.dropout(hidden_states)
        hidden_states = self.dense(hidden_states)
        hidden_states = torch.tanh(hidden_states)
        hidden_states = self.dropout(hidden_states)
        hidden_states = self.out_proj(hidden_states)
        hidden_states = self.sigmoid(hidden_states)
        return hidden_states

## T5 Encoder for sequence classification

In [None]:
class T5EncoderForSimpleSequenceClassification(T5PreTrainedModel):

    def __init__(self, config: T5Config, class_config):
        super().__init__(config)
        self.num_labels = class_config.num_labels
        self.config = config

        self.shared = nn.Embedding(config.vocab_size, config.d_model)

        encoder_config = copy.deepcopy(config)
        encoder_config.use_cache = False
        encoder_config.is_encoder_decoder = False
        self.encoder = T5Stack(encoder_config, self.shared)

        self.dropout = nn.Dropout(class_config.dropout_rate)
        self.classifier = T5EncoderClassificationHead(config, class_config)

        # Initialize weights and apply final processing
        self.post_init()

        # Model parallel
        self.model_parallel = False
        self.device_map = None

    def parallelize(self, device_map=None):
        self.device_map = (
            get_device_map(len(self.encoder.block), range(torch.cuda.device_count()))
            if device_map is None
            else device_map
        )
        assert_device_map(self.device_map, len(self.encoder.block))
        self.encoder.parallelize(self.device_map)
        self.classifier = self.classifier.to(self.encoder.first_device)
        self.model_parallel = True

    def deparallelize(self):
        self.encoder.deparallelize()
        self.encoder = self.encoder.to("cpu")
        self.model_parallel = False
        self.device_map = None
        torch.cuda.empty_cache()

    def get_input_embeddings(self):
        return self.shared

    def set_input_embeddings(self, new_embeddings):
        self.shared = new_embeddings
        self.encoder.set_input_embeddings(new_embeddings)

    def get_encoder(self):
        return self.encoder

    def _prune_heads(self, heads_to_prune):
        """
        Prunes heads of the model. heads_to_prune: dict of {layer_num: list of heads to prune in this layer} See base
        class PreTrainedModel
        """
        for layer, heads in heads_to_prune.items():
            self.encoder.layer[layer].attention.prune_heads(heads)

    def forward(
        self,
        input_ids=None,
        attention_mask=None,
        head_mask=None,
        inputs_embeds=None,
        labels=None,
        output_attentions=None,
        output_hidden_states=None,
        return_dict=None,
    ):
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        outputs = self.encoder(
            input_ids=input_ids,
            attention_mask=attention_mask,
            inputs_embeds=inputs_embeds,
            head_mask=head_mask,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        hidden_states = outputs[0]
        logits = self.classifier(hidden_states)

        loss = None
        if labels is not None:
            if self.config.problem_type is None:
                if self.num_labels == 1:
                    self.config.problem_type = "regression"
                elif self.num_labels > 1 and (labels.dtype == torch.long or labels.dtype == torch.int):
                    self.config.problem_type = "single_label_classification"
                else:
                    self.config.problem_type = "multi_label_classification"

            if self.config.problem_type == "regression":
                loss_fct = MSELoss()
                if self.num_labels == 1:
                    loss = loss_fct(logits.squeeze(), labels.squeeze())
                else:
                    loss = loss_fct(logits, labels)
            elif self.config.problem_type == "single_label_classification":
                loss_fct = CrossEntropyLoss()
                loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))
            elif self.config.problem_type == "multi_label_classification":
                loss_fct = BCEWithLogitsLoss()
                loss = loss_fct(logits, labels)
        if not return_dict:
            output = (logits,) + outputs[1:]
            return ((loss,) + output) if loss is not None else output

        return SequenceClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )

## Modified ProtT5

In [None]:
def PT5_classification_model(num_labels, half_precision):
    # Load PT5 and tokenizer
    # possible to load the half preciion model (thanks to @pawel-rezo for pointing that out)
    if not half_precision:
        model = T5EncoderModel.from_pretrained("Rostlab/prot_t5_xl_uniref50")
        tokenizer = T5Tokenizer.from_pretrained("Rostlab/prot_t5_xl_uniref50")
    elif half_precision and torch.cuda.is_available() :
        tokenizer = T5Tokenizer.from_pretrained('Rostlab/prot_t5_xl_half_uniref50-enc', do_lower_case=False)
        model = T5EncoderModel.from_pretrained("Rostlab/prot_t5_xl_half_uniref50-enc", torch_dtype=torch.float16).to(torch.device('cuda'))
    else:
          raise ValueError('Half precision can be run on GPU only.')

    # Create new Classifier model with PT5 dimensions
    class_config=ClassConfig(num_labels=num_labels)
    class_model=T5EncoderForSimpleSequenceClassification(model.config,class_config)

    # Set encoder and embedding weights to checkpoint weights
    class_model.shared=model.shared
    class_model.encoder=model.encoder

    # Delete the checkpoint model
    model=class_model
    del class_model

    # Print number of trainable parameters
    model_parameters = filter(lambda p: p.requires_grad, model.parameters())
    params = sum([np.prod(p.size()) for p in model_parameters])
    print("ProtT5_Classfier\nTrainable Parameter: "+ str(params))

    # Add model modification lora
    config = LoRAConfig()

    # Add LoRA layers
    model = modify_with_lora(model, config)

    # Freeze Embeddings and Encoder (except LoRA)
    for (param_name, param) in model.shared.named_parameters():
                param.requires_grad = False
    for (param_name, param) in model.encoder.named_parameters():
                param.requires_grad = False

    for (param_name, param) in model.named_parameters():
            if re.fullmatch(config.trainable_param_names, param_name):
                param.requires_grad = True

    # Print trainable Parameter
    model_parameters = filter(lambda p: p.requires_grad, model.parameters())
    params = sum([np.prod(p.size()) for p in model_parameters])
    print("ProtT5_LoRA_Classfier\nTrainable Parameter: "+ str(params) + "\n")

    return model, tokenizer

# Load the trained model

In [None]:
def load_model(filepath, num_labels=1):
# Creates a new PT5 model and loads the finetuned weights from a file

    # load a new model
    model, tokenizer = PT5_classification_model(num_labels=num_labels, half_precision=True)

    # Load the non-frozen parameters from the saved file
    non_frozen_params = torch.load(filepath)

    # Assign the non-frozen parameters to the corresponding parameters of the model
    for param_name, param in model.named_parameters():
        if param_name in non_frozen_params:
            param.data = non_frozen_params[param_name].data

    return tokenizer, model

In [None]:
tokenizer, model = load_model("/content/drive/MyDrive/Tesis/FineTuning/ProtT5_half_finetuned_nonFrozenParams_Full_modifiedDataset.pth", num_labels=2)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/25.0 [00:00<?, ?B/s]

spiece.model:   0%|          | 0.00/238k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/1.79k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/656 [00:00<?, ?B/s]

You are using the default legacy behaviour of the <class 'transformers.models.t5.tokenization_t5.T5Tokenizer'>. This is expected, and simply means that the `legacy` (previous) behavior will be used so nothing changes for you. If you want to use the new behaviour, set `legacy=False`. This should only be set if you understand what it means, and thoroughly read the reason why this was added as explained in https://github.com/huggingface/transformers/pull/24565


pytorch_model.bin:   0%|          | 0.00/2.42G [00:00<?, ?B/s]

ProtT5_Classfier
Trainable Parameter: 1209193474
ProtT5_LoRA_Classfier
Trainable Parameter: 3559426



  non_frozen_params = torch.load(filepath)


# Input data

In [None]:
my_test = pd.read_csv("/content/drive/MyDrive/Tesis/Data/Binary Datasets/No Data Augmentation/test.csv")
my_test.head(10)

Unnamed: 0,sequence,label
0,SVAYFVNWAIYGRNYNPQDMPADKLTHVLYAFANVRPDTGEVYLSD...,1
1,MYRRVMSLLVALGAIVAALIVLPATTAQAATCATAWSSSSVYTNGG...,1
2,MYSLWCFNNITLVFDNEAIYNICQRNLHIAKPDVNNINRLIAKVIS...,0
3,MLSPKLSLLALLVGGLCTTSAFAAAPGKPTIGSGPTKFAIVEVNQA...,1
4,ETEWLGNTPTGGEMRFHLFNLTPEAISGFSLCYTTQSRISGAAEIS...,0
5,MNQAVRFRPVITFALAFILIITWFAPRADAAAQWQAGTAYKQGDLV...,1
6,MSRKSQSRNLRRLGMGALALAMGLASVGSLATEAAPYFFTWGYGES...,1
7,MESLKKASLVLFPILVLSLFNHSNAAGIAVYWGQNGGEGSLADTCN...,1
8,MILNKQNKTNGERSITRDITGHYTEGAELIDNVLDVVRKNVESCDC...,0
9,MDNLVIFFALFAFVLGICVGSFSNVLIYRLPRNESINFPASHCPNC...,0


# Explicability

In [None]:
# Map predicted indices to the label
itop = ["Negative", "Positive"]

In [None]:
# make the code devide-agnostic
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [None]:
# change the mode to eval mode for inference and avoid dropouts and such
model.eval()
model = model.to(device)

In [None]:
# baseline generator
PAD_IND = tokenizer.pad_token_id
token_reference = TokenReferenceBase(reference_token_idx=PAD_IND)

In [None]:
# wraper around the model to return the softmaxed logits (probabilities for each class)
def custom_forward(input_ids, input_mask):
    logits = model(input_ids, attention_mask=input_mask).logits
    softmax = nn.Softmax(dim=1)
    return softmax(logits)

In [None]:
# create the explicability method
lig = LayerIntegratedGradients(custom_forward, model.shared)

In [None]:
# accumalate couple samples in this array for visualization purposes
vis_data_records_ig = []

def interpret_sentence(model, sequence, label = 0):
    seq_length = len(sequence)
    procesed_sequence = " ".join(list(re.sub(r"[UZOB]", "X", sequence)))

    indexed = tokenizer(procesed_sequence, max_length=seq_length, padding=False, truncation=True)
    model.zero_grad()

    input_indices = torch.tensor(indexed["input_ids"], device=device)
    input_indices = input_indices.unsqueeze(0)

    input_mask = torch.tensor(indexed["attention_mask"], device=device)
    input_mask = input_mask.unsqueeze(0)

    # predict
    pred = model(input_indices, attention_mask=input_mask).logits.tolist()
    np_array = np.array(pred)
    pred_ind = np_array.argmax()

    # generate reference indices for each sample
    reference_indices = token_reference.generate_reference(seq_length, device=device).unsqueeze(0)

    # compute attributions and approximation delta using layer integrated gradients
    attributions_ig, delta = lig.attribute(input_indices, baselines=reference_indices, target=label,\
                                           n_steps=5, additional_forward_args=(input_mask) ,return_convergence_delta=True)

    print('pred: ', itop[pred_ind], '(', '%.2f'%pred[0][pred_ind], ')', ', delta: ', abs(delta.item()))

    add_attributions_to_visualizer(attributions_ig, sequence, pred, pred_ind, label, delta, vis_data_records_ig)

def add_attributions_to_visualizer(attributions, sequence, pred, pred_ind, label, delta, vis_data_records):
    attributions = attributions.sum(dim=2).squeeze(0)
    attributions_score = attributions / torch.norm(attributions)
    attributions = attributions / torch.std(attributions) # re-scale values for visualization purposes

    attributions_score = attributions_score.cpu().detach().numpy()
    attributions = attributions.cpu().detach().numpy()

    # storing couple samples in an array for visualization purposes
    vis_data_records.append(visualization.VisualizationDataRecord(
                            word_attributions = attributions,  # attribution values
                            pred_prob = pred[0][label], # probability predicted for the class
                            pred_class = itop[pred_ind],  # predicted class
                            true_class = itop[label],  # real class label name
                            attr_class = itop[1],  # attr class
                            attr_score = attributions_score.sum(), # attribution score
                            raw_input_ids = sequence,  # raw text
                            convergence_score = delta))  # delta value => convergence score

In [None]:
# sequences to apply explicability
quantity = 2
sequences, labels = pd.Series.to_list(my_test["sequence"][:quantity]), pd.Series.to_list(my_test["label"][:quantity])

In [None]:
# apply explicability to sequences
for sequence, label in zip(sequences, labels):
    interpret_sentence(model, sequence, label=label)

pred:  Positive ( 1.00 ) , delta:  0.4407920345210722
pred:  Positive ( 0.99 ) , delta:  0.4457921482137538


# Visualize results

In [None]:
print('Visualize attributions based on Integrated Gradients')
_ = visualization.visualize_text(vis_data_records_ig)

Visualize attributions based on Integrated Gradients


True Label,Predicted Label,Attribution Label,Attribution Score,Word Importance
Positive,Positive (1.00),Positive,-3.49,S V A Y F V N W A I Y G R N Y N P Q D M P A D K L T H V L Y A F A N V R P D T G E V Y L S D T W S D I E K H Y P T D S W N D V G T N V Y G C V K Q L F L L K Q Q N R K L K V L L S I G G W T Y S P N F A Q A A S T D A G R T K F A E T A T K L V T D L G F D G I D I D W E Y P K D D T E A Q N M V L L L Q K C R E T L D A T A G A S R K F L L T I A C P A G P A N Y T K L K L S Q M T P Y L D F Y N L M A Y D Y A G S W D T V A G H Q A N L Y P S A D K P A S T P F S T N E A V N Y Y I Q N G G V P S S K I I L G M P L Y G R A F T N T D G P G T A F S G V G E G S W E Q G V W D Y K A L P R T G A T E H V D A N L G A S W S Y D P T A R T M V S Y D T V A V S E M K L D Y I T K L Q L G G G M W W E T S G D
,,,,
Positive,Positive (0.99),Positive,-4.33,M Y R R V M S L L V A L G A I V A A L I V L P A T T A Q A A T C A T A W S S S S V Y T N G G T V S Y N G R N Y T A K W W T Q N E R P G T S D V W A D K G A C G T G G E G P G G N N G F V V S E A Q F N Q M F P N R N A F Y T Y K G L T D A L S A Y P A F A K T G S D E V K K R E A A A F L A N V S H E T G G L F Y I K E V N E A N Y P H Y C D T T Q S Y G C P A G Q A A Y Y G R G P I Q L S W N F N Y K A A G D A L G I N L L A N P Y L V E Q D P A V A W K T G L W Y W N S Q N G P G T M T P H N A I V N N A G F G E T I R S I N G A L E C N G G N P A Q V Q S R I N K F T Q F T Q I L G T T T G P N L S C
,,,,
