# Integrated gradients for transformer models

In this example, we apply the integrated gradients method to two different sentiment analysis models. The first one is a pretrained sentiment analysis model from the [transformers](https://github.com/huggingface/transformers) library. The second model is a combination of a pretrained BERT model and a simple feed-forward network. The feed-forward network is trained on the IMDB dataset using the BERT output embeddings as features.

In text classification models, integrated gradients defines an attribution value for each word in the input sentence. The attributions are calculated as the integral of the model gradients with respect to the word embedding layer along a straight path from a baseline instance $x^\prime$ to the input instance $x$. A description of the method can be found [here](https://docs.seldon.io/projects/alibi/en/latest/methods/IntegratedGradients.html). Integrated gradients was originally proposed in Sundararajan et al., ["Axiomatic Attribution for Deep Networks"](https://arxiv.org/abs/1703.01365).
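
To make the definition concrete, here is a minimal NumPy sketch of the path integral for a toy logistic model. It is not the alibi implementation: `integrated_gradients`, `f` and `grad_f` are hypothetical helpers introduced only for illustration. The integral is approximated with Gauss-Legendre quadrature, the rule selected by `method="gausslegendre"` later in this notebook.

```python
import numpy as np

def integrated_gradients(grad_fn, x, baseline, n_steps=50):
    """Approximate IG_i = (x_i - x'_i) * integral_0^1 df/dx_i(x' + a (x - x')) da."""
    # Gauss-Legendre nodes and weights on [-1, 1], rescaled to [0, 1]
    nodes, weights = np.polynomial.legendre.leggauss(n_steps)
    alphas = 0.5 * (nodes + 1.0)
    weights = 0.5 * weights

    # points on the straight path from the baseline to the input
    path = baseline + alphas[:, None] * (x - baseline)
    grads = np.stack([grad_fn(p) for p in path])

    # weighted average of the gradients, scaled by the input difference
    return (x - baseline) * (weights[:, None] * grads).sum(axis=0)

# toy model (an assumption for illustration): logistic regression with fixed weights
w = np.array([1.0, -2.0, 0.5])

def f(x):
    return 1.0 / (1.0 + np.exp(-(w @ x)))

def grad_f(x):
    s = f(x)
    return s * (1.0 - s) * w

x = np.array([1.0, 0.5, -1.0])
baseline = np.zeros_like(x)
attributions = integrated_gradients(grad_f, x, baseline)

# completeness: the attributions sum to f(x) - f(baseline)
print(attributions.sum(), f(x) - f(baseline))
```

The two printed numbers should agree up to numerical error, which is a convenient sanity check for any integrated gradients computation.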

In [1]:
import re
import os
os.environ["CUDA_DEVICE_ORDER"]="PCI_BUS_ID"   # see issue #152
os.environ["CUDA_VISIBLE_DEVICES"]="2"

import numpy as np
import tensorflow as tf
import matplotlib as mpl
import matplotlib.cm

from tqdm import tqdm
from transformers import TFAutoModelForSequenceClassification, AutoTokenizer
from transformers import BertTokenizerFast, TFBertModel, BertConfig
from alibi.explainers import IntegratedGradients
from tensorflow.keras.datasets import imdb
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.preprocessing import sequence
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import SparseCategoricalCrossentropy

gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
    
%load_ext autoreload
%autoreload 2

Here we define some functions needed to process the data. For consistency with other [text examples](https://github.com/SeldonIO/alibi/blob/master/examples/integrated_gradients_imdb.ipynb) in alibi, we will use the IMDB dataset provided by keras. Since the dataset consists of reviews that are already tokenized, we need to decode each sentence and re-convert them into tokens using the BERT tokenizer.

In [2]:
def decode_sentence(x, reverse_index, unk_token: str = '[UNK]'):
    """ 
    Decodes the tokenized sentences from keras IMDB dataset into plain text.
    """
    # the `-3` offset is due to the special tokens used by keras
    # see https://stackoverflow.com/questions/42821330/restore-original-text-from-keras-s-imdb-dataset
    return " ".join([reverse_index.get(i - 3, unk_token) for i in x])

def preprocess_reviews(reviews):
    """
    Preprocess the text.
    """
    # raw strings avoid invalid escape sequence warnings in the patterns
    REPLACE_NO_SPACE = re.compile(r"[.;:,!'?\"()\[\]]")
    REPLACE_WITH_SPACE = re.compile(r"(<br\s*/><br\s*/>)|(\-)|(\/)")
    
    reviews = [REPLACE_NO_SPACE.sub("", line.lower()) for line in reviews]
    reviews = [REPLACE_WITH_SPACE.sub(" ", line) for line in reviews]
    
    return reviews

def process_sentences(sentence, tokenizer, max_len):
    """
    Tokenize the text sentences.
    """
    # since we are using the model for classification, we need to include the special tokens (i.e. '[CLS]', '[SEP]')
    # check the example here: https://huggingface.co/transformers/v4.4.2/quicktour.html
    z = tokenizer(sentence, 
                  add_special_tokens=True, 
                  padding='max_length', 
                  max_length=max_len, 
                  truncation=True,
                  return_token_type_ids=True, 
                  return_attention_mask = True,  
                  return_tensors='np')
    return z
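
As a small illustration of the `-3` offset, the sketch below decodes a made-up encoded review. The three-word `reverse_index` is a hypothetical vocabulary, not the real IMDB index; the helper is repeated so that the example is self-contained.

```python
def decode_sentence(x, reverse_index, unk_token='[UNK]'):
    # same logic as the helper above: shift ids by 3 before the lookup
    return " ".join([reverse_index.get(i - 3, unk_token) for i in x])

# hypothetical three-word vocabulary, ranked by frequency starting at 1
reverse_index = {1: 'the', 2: 'movie', 3: 'great'}

# keras reserves 0 (padding), 1 (start of sequence) and 2 (out of vocabulary),
# so the word with rank r is encoded as r + 3
encoded = [6, 5, 2]
print(decode_sentence(encoded, reverse_index))  # great movie [UNK]
```

The out-of-vocabulary id `2` has no entry in the index after the shift, so it is replaced by the unknown token.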

## Automodel

In this section, we will use the tensorflow auto model for sequence classification provided by the [transformers](https://github.com/huggingface/transformers) library. 

The model is pre-trained on the [Stanford Sentiment Treebank (SST)](https://huggingface.co/datasets/sst) dataset. The Stanford Sentiment Treebank is the first corpus with fully labeled parse trees that allows for a complete analysis of the compositional effects of sentiment in language.

Each phrase is labelled as either negative, somewhat negative, neutral, somewhat positive or positive. The corpus with all 5 labels is referred to as SST-5 or SST fine-grained. Binary classification experiments on full sentences (negative or somewhat negative vs somewhat positive or positive with neutral sentences discarded) refer to the dataset as SST-2 or SST binary.  In this example, we will use a text classifier pre-trained on the SST-2 dataset.

In [3]:
from transformers import TFAutoModelForSequenceClassification, AutoTokenizer

# load model and tokenizer
model_name = "distilbert-base-uncased-finetuned-sst-2-english"
auto_model_bert = TFAutoModelForSequenceClassification.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained(model_name)

# special tokens string and int representation
special_tokens = list(tokenizer.special_tokens_map.values())
special_tokens_ids = [tokenizer.encode(stok, add_special_tokens=False)[0] for stok in special_tokens]

All model checkpoint layers were used when initializing TFDistilBertForSequenceClassification.

All the layers of TFDistilBertForSequenceClassification were initialized from the model checkpoint at distilbert-base-uncased-finetuned-sst-2-english.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFDistilBertForSequenceClassification for predictions without further training.


The automodel output is a custom object containing the output logits. We use a wrapper to transform the output into a tensor and apply a softmax function to the logits.

In [4]:
class AutoModelWrapper(tf.keras.Model):

    def __init__(self, model_bert, **kwargs):
        super().__init__()
        self.model_bert = model_bert

    def call(self, inputs, attention_mask=None):
        inputs = tf.cast(inputs, tf.int32)
        out = self.model_bert(inputs, attention_mask=attention_mask)
        return tf.nn.softmax(out.logits)
    
    def get_config(self):
        return {}

    @classmethod
    def from_config(cls, config):
        return cls(**config)

In [5]:
auto_model = AutoModelWrapper(auto_model_bert)
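
The effect of the softmax in the wrapper can be sketched with plain NumPy. The logits below are made up, and `softmax` is a hypothetical stand-in for `tf.nn.softmax`.

```python
import numpy as np

def softmax(logits):
    # subtract the row-wise maximum for numerical stability
    shifted = logits - logits.max(axis=-1, keepdims=True)
    exp = np.exp(shifted)
    return exp / exp.sum(axis=-1, keepdims=True)

# made-up logits for two sentences and two classes (negative, positive)
logits = np.array([[-2.0, 3.0],
                   [1.5, -0.5]])
probs = softmax(logits)

print(probs.sum(axis=1))     # each row sums to 1
print(probs.argmax(axis=1))  # same classes as logits.argmax(axis=1)
```

The softmax does not change the predicted class. It only rescales the output to probabilities, so the attributions explain a probability rather than an unbounded logit.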

### Calculate integrated gradients

In [6]:
max_features = 10000
max_len = 128

Here we consider some simple sentences such as "I love you, I like you" and "I love you, I like you, but I also kind of dislike you".

In [7]:
z_test_sample = ['I love you, I like you', 
                 'I love you, I like you, but I also kind of dislike you',
                 'Everything is so nice about you']
z_test_sample = [z.lower() for z in z_test_sample]
z_test_sample = process_sentences(z_test_sample, tokenizer, max_len)
x_test_sample = z_test_sample['input_ids'].astype(np.int32)

# the values of the kwargs have to be `tf.Tensor`. 
# see transformers issue #14404: https://github.com/huggingface/transformers/issues/14404
kwargs = {k: tf.constant(v) for k,v in z_test_sample.items() if k == 'attention_mask'}

The auto model consists of a main DistilBERT layer (layer 0) followed by two dense layers and a dropout layer.

In [8]:
auto_model.layers[0].layers

[<transformers.models.distilbert.modeling_tf_distilbert.TFDistilBertMainLayer at 0x7f8a90dccc10>,
 <keras.layers.core.dense.Dense at 0x7f86f8714cd0>,
 <keras.layers.core.dense.Dense at 0x7f86f8714f40>,
 <keras.layers.core.dropout.Dropout at 0x7f86f86bb2b0>]

We extract the embedding layer of the main DistilBERT layer; the attributions are calculated with respect to this layer.

In [61]:
# Extract the embedding layer. To attribute to the first transformer block instead, use:
# bl = auto_model.layers[0].layers[0].transformer.layer[0]
bl = auto_model.layers[0].layers[0].embeddings

In [67]:
n_steps = 50
method = "gausslegendre"
internal_batch_size = 5
ig  = IntegratedGradients(auto_model,
                          layer=bl,
                          n_steps=n_steps, 
                          method=method,
                          internal_batch_size=internal_batch_size)

In [70]:
# get predictions
predictions = auto_model(x_test_sample, **kwargs).numpy().argmax(axis=1)

# get the baselines. Note that the baselines keep the special tokens and
# only the regular tokens are set to zero.
baselines = x_test_sample *  np.isin(x_test_sample, special_tokens_ids)

# get explanation
explanation = ig.explain(x_test_sample, 
                         forward_kwargs=kwargs,
                         baselines=baselines, 
                         target=predictions)
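
The baseline construction can be checked on a toy input. The ids below are illustrative and assume the usual BERT uncased convention (`[PAD]` = 0, `[CLS]` = 101, `[SEP]` = 102); the remaining ids stand for arbitrary regular tokens.

```python
import numpy as np

# assumed special token ids: [PAD] = 0, [CLS] = 101, [SEP] = 102
special_ids = [0, 101, 102]

# one padded sentence: [CLS], three regular tokens, [SEP], two [PAD]
input_ids = np.array([[101, 1045, 2293, 2017, 102, 0, 0]])

# the boolean mask is True only at special-token positions, so the product
# keeps those ids and sets every regular token to 0 (the [PAD] id)
baseline = input_ids * np.isin(input_ids, special_ids)
print(baseline)
```

With this choice, the path from baseline to input only moves the embeddings of the regular tokens, while the special tokens stay identical at both ends of the path.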



In [71]:
# Get attributions values from the explanation object
attrs = explanation.attributions[0]
print('Attributions shape:', attrs.shape)

Attributions shape: (3, 128, 768)


In [72]:
attrs = attrs.sum(axis=2)
print('Attributions shape:', attrs.shape)

Attributions shape: (3, 128)
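
The reduction above can be reproduced on random data of the same shape. In the sketch below, `attrs` is a random stand-in for `explanation.attributions[0]`, and ranking tokens by absolute attribution is an optional extra step, not part of the original workflow.

```python
import numpy as np

rng = np.random.default_rng(0)
# stand-in attributions: (instances, tokens, embedding dimensions)
attrs = rng.normal(size=(3, 128, 768))

# one signed score per token: sum the contributions of all embedding dimensions
token_attrs = attrs.sum(axis=2)
print(token_attrs.shape)  # (3, 128)

# positions of the five tokens with the largest absolute attribution, per instance
top_tokens = np.argsort(-np.abs(token_attrs), axis=1)[:, :5]
print(top_tokens.shape)  # (3, 5)
```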


In [86]:
i = 1
x_i = x_test_sample[i]
attrs_i = attrs[i]
pred = predictions[i]
pred_dict = {1: 'Positive review', 0: 'Negative review'}

In [87]:
from IPython.display import HTML
def hlstr(string, color='white'):
    """
    Return HTML markup highlighting text with the desired color.
    """
    return f"<mark style=background-color:{color}>{string} </mark>"

In [88]:
def colorize(attrs, cmap='PiYG'):
    """
    Compute hex colors based on the attributions for a single instance.
    Uses a diverging colorscale by default and normalizes and scales
    the colormap so that colors are consistent with the attributions.
    """
    cmap_bound = np.abs(attrs).max()
    norm = mpl.colors.Normalize(vmin=-cmap_bound, vmax=cmap_bound)
    cmap = mpl.cm.get_cmap(cmap)
    
    # now compute hex values of colors
    colors = list(map(lambda x: mpl.colors.rgb2hex(cmap(norm(x))), attrs))
    return colors
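
The symmetric normalization used in `colorize` can be sketched without matplotlib. `symmetric_norm` is a hypothetical helper that mirrors `Normalize(vmin=-cmap_bound, vmax=cmap_bound)`, and the attribution values are made up.

```python
import numpy as np

def symmetric_norm(attrs):
    # map [-bound, bound] linearly onto [0, 1], where bound = max |attribution|
    bound = np.abs(attrs).max()
    return (attrs + bound) / (2.0 * bound)

attrs = np.array([-0.4, 0.0, 0.1, 0.2])
scaled = symmetric_norm(attrs)
print(scaled)
```

A zero attribution always lands on 0.5, the neutral midpoint of a diverging colormap such as `PiYG`. Negative and positive attributions fall on opposite sides of the midpoint, so the color of a token reflects the sign of its attribution.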

In [89]:
words = [tokenizer.decode([x_i[i]]) for i in range(len(x_i))]
colors = colorize(attrs_i)

In [90]:
print('Predicted label =  {}: {}'.format(pred, pred_dict[pred]))

Predicted label =  0: Negative review


In [91]:
HTML("".join(list(map(hlstr, words, colors))))
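
Because every input is padded to `max_len` tokens, the rendered string contains many `[PAD]` entries. One option, sketched here with hypothetical token and color lists, is to drop the padding tokens before building the markup.

```python
def hlstr(string, color='white'):
    # same markup as the helper above
    return f"<mark style=background-color:{color}>{string} </mark>"

# hypothetical decoded tokens and hex colors for one padded sentence
words = ['[CLS]', 'i', 'love', 'you', '[SEP]', '[PAD]', '[PAD]']
colors = ['#f7f7f7', '#e6f5d0', '#4d9221', '#f1b6da', '#f7f7f7', '#f7f7f7', '#f7f7f7']

# keep only the (token, color) pairs that are not padding
pairs = [(w, c) for w, c in zip(words, colors) if w != '[PAD]']
html = "".join(hlstr(w, c) for w, c in pairs)
print(html)
```

In a notebook, the resulting string can be passed to `IPython.display.HTML` exactly as in the cell above.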

## Sentiment analysis on IMDB with fine-tuned model head

We consider a text classifier fine-tuned on the IMDB dataset. We train a feed-forward network which uses the output embedding of the `[CLS]` token from a pretrained BERT model as input features. The BERT model and the trained feed-forward network are combined to obtain an end-to-end text classifier.

It must be noted that training an end-to-end text classifier (i.e. combining the BERT model and the feed-forward network before training) instead of training the feed-forward network separately is likely to lead to better model performance. However, training the feed-forward network separately is considerably faster and lighter. We use this approach here since performance optimization is beyond the scope of this notebook and the purpose of this example is to illustrate the integrated gradients method applied to a custom classifier.

In [94]:
def get_embeddings(X_train, model, batch_size=50):
    args = X_train['input_ids']
    # the values of the kwargs have to be `tf.Tensor`. 
    # see transformers issue #14404: https://github.com/huggingface/transformers/issues/14404
    kwargs = {k: tf.constant(v) for k, v in  X_train.items() if k != 'input_ids'}
    dataset = tf.data.Dataset.from_tensor_slices((args, kwargs)).batch(batch_size)
    
    embeddings = []
    for X_batch in tqdm(dataset):
        args_b, kwargs_b = X_batch
        batch_embeddings = model(args_b, **kwargs_b)
        
        # extract hidden representation for [CLS] token
        embedding = batch_embeddings.last_hidden_state[:, 0, :].numpy()
        embeddings.append(embedding)
        
    return np.concatenate(embeddings, axis=0)
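
The slicing used to pick the `[CLS]` representation, and the final concatenation over batches, can be illustrated with a small NumPy array. The shapes are deliberately tiny and the values are made up.

```python
import numpy as np

# stand-in for last_hidden_state: (batch, sequence length, hidden size)
last_hidden_state = np.arange(2 * 4 * 3).reshape(2, 4, 3)

# position 0 of every sequence holds the [CLS] token
cls_embedding = last_hidden_state[:, 0, :]
print(cls_embedding.shape)  # (2, 3)

# stacking the per-batch results along axis 0 gives one row per review
all_embeddings = np.concatenate([cls_embedding, cls_embedding], axis=0)
print(all_embeddings.shape)  # (4, 3)
```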

### Load and process data

Loading the IMDB dataset. 

In [95]:
(x_train, y_train), (x_test, y_test) = imdb.load_data(num_words=max_features)

# remove the start-of-sequence token
x_train = [x[1:] for x in x_train]
x_test = [x[1:] for x in x_test]

# get mappings
index = imdb.get_word_index()
reverse_index = {value: key for (key, value) in index.items()}

### Extract embeddings for training

In order to speed up the training, the BERT embeddings are pre-extracted and used as features by the feed forward network.

In [96]:
# load config
config = BertConfig.from_pretrained("bert-base-uncased")

# load model
modelBert = TFBertModel.from_pretrained("bert-base-uncased", config=config)
modelBert.trainable = False

# load tokenizer
tokenizer = BertTokenizerFast.from_pretrained('bert-base-uncased', config=config)

# special tokens string and int representation
special_tokens = list(tokenizer.special_tokens_map.values())
special_tokens_ids = [tokenizer.encode(stok, add_special_tokens=False)[0] for stok in special_tokens]

Some layers from the model checkpoint at bert-base-uncased were not used when initializing TFBertModel: ['nsp___cls', 'mlm___cls']
- This IS expected if you are initializing TFBertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing TFBertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
All the layers of TFBertModel were initialized from the model checkpoint at bert-base-uncased.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFBertModel for predictions without further training.


Decoding each sentence in the keras IMDB tokenized dataset to obtain the corresponding plain text.

In [97]:
X_train, X_test = [], []

# decode training sentences
for i in range(len(x_train)):
    tr_sentence = decode_sentence(x_train[i], reverse_index, unk_token=tokenizer.unk_token)
    X_train.append(tr_sentence)

# decode testing sentences
for i in range(len(x_test)):
    te_sentence = decode_sentence(x_test[i], reverse_index, unk_token=tokenizer.unk_token)
    X_test.append(te_sentence)

Re-tokenizing the plain text using the BERT tokenizer.

In [98]:
# tokenize datasets
X_train = process_sentences(X_train, tokenizer, max_len)
X_test = process_sentences(X_test, tokenizer, max_len)

Extracting the BERT embeddings.

In [99]:
train_embeddings = get_embeddings(X_train, modelBert, batch_size=100)
test_embeddings = get_embeddings(X_test, modelBert, batch_size=100)

100%|██████████| 250/250 [01:36<00:00,  2.59it/s]
100%|██████████| 250/250 [01:37<00:00,  2.56it/s]


### Train model

Here we train the model head using the BERT output embeddings as features. Each review is represented by a single 768-dimensional vector, the last hidden state of the `[CLS]` token extracted above. The model head consists of one dense layer with 128 hidden units, followed by a dropout layer and a 2-unit dense layer with softmax activation.

In [100]:
dropout = 0.1
hidden_dims = 128

In [125]:
class ModelOut(tf.keras.Model):

    def __init__(self, dropout=0.2, hidden_dims=128):
        super().__init__()
        self.dropout = dropout
        self.hidden_dims = hidden_dims
        
        self.dense_1 =  tf.keras.layers.Dense(hidden_dims, activation='relu')
        self.dropoutl = tf.keras.layers.Dropout(dropout)
        self.dense_2 = tf.keras.layers.Dense(2, activation='softmax')

    def call(self, x, training=False):
        x = self.dense_1(x)
        x = self.dropoutl(x, training=training)
        x = self.dense_2(x)
        return x
    
    def get_config(self):
        return {"dropout": self.dropout, "hidden_dims": self.hidden_dims}

    @classmethod
    def from_config(cls, config):
        return cls(**config)

In [102]:
model_out = ModelOut(dropout=dropout, hidden_dims=hidden_dims)

Training the model. If the model has already been trained, it can be loaded from the checkpoint directory by setting `load_model=True`.

In [103]:
load_model = False

# paper's recommendation
learning_rate = 2e-5
batch_size = 16
epochs = 5

In [104]:
filepath = './model_transformers/'  # change to desired save directory

model_out.compile(optimizer=Adam(learning_rate), 
                  loss=SparseCategoricalCrossentropy(from_logits=False), 
                  metrics=['accuracy'])

if not load_model:
    
    checkpoint_path = os.path.join(filepath, "training/cp-{epoch:04d}.ckpt")
    checkpoint_dir = os.path.dirname(checkpoint_path)

    # Create a callback that saves the model's weights every epoch
    cp_callback = tf.keras.callbacks.ModelCheckpoint(
        filepath=checkpoint_path, 
        verbose=1, 
        save_weights_only=True,
        save_freq='epoch')

    # using the entire testing dataset might result in memory issues when running on GPU
    model_out.fit(train_embeddings, y_train, 
                  validation_data=(test_embeddings, y_test),
                  epochs=epochs, 
                  batch_size=batch_size,
                  callbacks=[cp_callback],
                  verbose=1)
else:
    epoch = 3
    load_path = os.path.join(filepath, f"training/cp-{epoch:04d}.ckpt")
    model_out.load_weights(load_path)

Epoch 1/5
Epoch 00001: saving model to ./model_transformers/training/cp-0001.ckpt
Epoch 2/5
Epoch 00002: saving model to ./model_transformers/training/cp-0002.ckpt
Epoch 3/5
Epoch 00003: saving model to ./model_transformers/training/cp-0003.ckpt
Epoch 4/5
Epoch 00004: saving model to ./model_transformers/training/cp-0004.ckpt
Epoch 5/5
Epoch 00005: saving model to ./model_transformers/training/cp-0005.ckpt
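
The checkpoint names in the log follow from the `{epoch:04d}` placeholder in `checkpoint_path`. The short sketch below uses the same save directory as above and shows which file is read when `load_model=True` and `epoch = 3`.

```python
import os

filepath = './model_transformers/'
template = os.path.join(filepath, "training/cp-{epoch:04d}.ckpt")

# the epoch number is zero-padded to four digits by the :04d format spec
load_path = template.format(epoch=3)
print(load_path)  # ./model_transformers/training/cp-0003.ckpt
```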


### Combine BERT and feed forward network

Here we combine the BERT model with the model head to obtain an end-to-end text classifier. 

In [105]:
class TextClassifier(tf.keras.Model):

    def __init__(self, model_bert, model_out):
        super().__init__()
        self.model_bert = model_bert
        self.model_out = model_out

    def call(self, inputs, attention_mask=None, training=False):
        out = self.model_bert(inputs, attention_mask=attention_mask, training=training)
        out = self.model_out(out.last_hidden_state[:, 0, :], training=training)
        return out
    
    def get_config(self):
        return {}

    @classmethod
    def from_config(cls, config):
        return cls(**config)

In [106]:
text_classifier = TextClassifier(modelBert, model_out)

### Calculate integrated gradients

We pick the first 10 sentences from the test set as examples.

In [107]:
z_test_sample = [decode_sentence(x_test[i], reverse_index, unk_token=tokenizer.unk_token) for i in range(10)]
z_test_sample = process_sentences(z_test_sample, tokenizer, max_len)

x_test_sample = z_test_sample['input_ids']
# the values of the kwargs have to be `tf.Tensor`. 
# see transformers issue #14404: https://github.com/huggingface/transformers/issues/14404
kwargs = {k:tf.constant(v) for k,v in z_test_sample.items() if k == 'attention_mask'}

We calculate the attributions with respect to the embedding layer of the BERT model.

In [111]:
# bl = text_classifier.layers[0].bert.encoder.layer[0]
bl = text_classifier.layers[0].bert.embeddings

In [112]:
n_steps = 50
method = "gausslegendre"
internal_batch_size = 5
ig  = IntegratedGradients(text_classifier,
                          layer=bl,
                          n_steps=n_steps, 
                          method=method,
                          internal_batch_size=internal_batch_size)

In [113]:
predictions = text_classifier(x_test_sample, **kwargs).numpy().argmax(axis=1)
baselines = x_test_sample * np.isin(x_test_sample, special_tokens_ids)

explanation = ig.explain(x_test_sample, 
                         forward_kwargs=kwargs,
                         baselines=baselines, 
                         target=predictions)



In [114]:
# Get attributions values from the explanation object
attrs = explanation.attributions[0]
print('Attributions shape:', attrs.shape)

Attributions shape: (10, 128, 768)


In [115]:
attrs = attrs.sum(axis=2)
print('Attributions shape:', attrs.shape)

Attributions shape: (10, 128)


In [116]:
i = 1
x_i = x_test_sample[i]
attrs_i = attrs[i]
pred = predictions[i]
pred_dict = {1: 'Positive review', 0: 'Negative review'}

In [117]:
from IPython.display import HTML
def hlstr(string, color='white'):
    """
    Return HTML markup highlighting text with the desired color.
    """
    return f"<mark style=background-color:{color}>{string} </mark>"

In [118]:
def colorize(attrs, cmap='PiYG'):
    """
    Compute hex colors based on the attributions for a single instance.
    Uses a diverging colorscale by default and normalizes and scales
    the colormap so that colors are consistent with the attributions.
    """
    import matplotlib as mpl
    cmap_bound = np.abs(attrs).max()
    norm = mpl.colors.Normalize(vmin=-cmap_bound, vmax=cmap_bound)
    cmap = mpl.cm.get_cmap(cmap)
    
    # now compute hex values of colors
    colors = list(map(lambda x: mpl.colors.rgb2hex(cmap(norm(x))), attrs))
    return colors

In [119]:
words = [tokenizer.decode([x_i[i]]) for i in range(len(x_i))]
colors = colorize(attrs_i)

In [120]:
print('Predicted label =  {}: {}'.format(pred, pred_dict[pred]))

Predicted label =  1: Positive review


In [121]:
HTML("".join(list(map(hlstr, words, colors))))