In [1]:
%set_env CUDA_VISIBLE_DEVICES=1

env: CUDA_VISIBLE_DEVICES=1


In [2]:
import tensorflow as tf
import numpy as np
import os
from tensorflow.keras.datasets import imdb
from tensorflow.keras.preprocessing import sequence
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, Embedding, Conv1D, GlobalMaxPooling1D, Dropout 
from tensorflow.keras.layers import MaxPooling1D, Flatten, Activation
from tensorflow.keras.utils import to_categorical
from alibi.explainers import IntegratedGradients
from captum.attr import LayerIntegratedGradients, TokenReferenceBase
import torch 
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import matplotlib.pyplot as plt
print('TF version: ', tf.__version__)
print('Eager execution enabled: ', tf.executing_eagerly()) # True

TF version:  2.3.1
Eager execution enabled:  True


## Load data

In [3]:
# Vocabulary size: passed as `num_words` to `imdb.load_data` and used as the
# number of rows of both embedding tables below.
max_features = 10000
# Sequence length that every review is brought to by `sequence.pad_sequences`.
maxlen = 100

In [4]:
# Load the IMDB reviews as lists of integer word ids, restricted to `max_features` ids.
print('Loading data...')
(x_train, y_train), (x_test, y_test) = imdb.load_data(num_words=max_features)
# Keep copies of the integer labels before they are one-hot encoded below.
test_labels = y_test.copy()
train_labels = y_train.copy()
print(len(x_train), 'train sequences')
print(len(x_test), 'test sequences')
# One-hot encode the labels (the models below have a 2-unit output layer).
y_train, y_test = to_categorical(y_train), to_categorical(y_test)

# Bring every review to exactly `maxlen` token ids -> arrays of shape (n_samples, maxlen).
print('Pad sequences (samples x time)')
x_train = sequence.pad_sequences(x_train, maxlen=maxlen)
x_test = sequence.pad_sequences(x_test, maxlen=maxlen)
print('x_train shape:', x_train.shape)
print('x_test shape:', x_test.shape)

# `index` maps word -> id; invert it so ids can be decoded back to words.
index = imdb.get_word_index()
reverse_index = {value: key for (key, value) in index.items()} 

Loading data...
25000 train sequences
25000 test sequences
Pad sequences (samples x time)
x_train shape: (25000, 100)
x_test shape: (25000, 100)


In [5]:
def decode_sentence(x, reverse_index):
    """
    Turn a sequence of token ids back into a space-separated string of words.

    Parameters
    ----------
    x
        Iterable of integer token ids.
    reverse_index
        Mapping from word id to word.

    Returns
    -------
        The decoded sentence; ids that have no entry in `reverse_index`
        (after the offset is removed) are rendered as 'UNK'.

    """
    # Ids are shifted by 3 because of the special tokens used by keras, see
    # https://stackoverflow.com/questions/42821330/restore-original-text-from-keras-s-imdb-dataset
    words = []
    for token_id in x:
        words.append(reverse_index.get(token_id - 3, 'UNK'))
    return " ".join(words)

In [6]:
# Sanity check: decode the second (padded) test review back into words.
print(decode_sentence(x_test[1], reverse_index)) 

a powerful study of loneliness sexual UNK and desperation be patient UNK up the atmosphere and pay attention to the wonderfully written script br br i praise robert altman this is one of his many films that deals with unconventional fascinating subject matter this film is disturbing but it's sincere and it's sure to UNK a strong emotional response from the viewer if you want to see an unusual film some might even say bizarre this is worth the time br br unfortunately it's very difficult to find in video stores you may have to buy it off the internet


# Models

In [7]:
# Model hyperparameters. Only `embedding_dims` and `hidden_dims` are referenced by the
# models defined in the visible cells; `batch_size`, `filters` and `kernel_size`
# are not used there.
batch_size = 32
embedding_dims = 50  # size of each token embedding vector
filters = 250
kernel_size = 3
hidden_dims = 250  # width of the hidden dense/linear layer

In [8]:
# The PyTorch model and its input tensors are placed on the CPU.
device = torch.device("cpu")

### pytorch model

In [9]:
class Net(nn.Module):
    """
    PyTorch counterpart of the Keras model built in the next cell.

    Architecture: embedding -> flatten -> linear -> ReLU -> linear (2 outputs).
    The attribute names (`emb`, `linear1`, `linear2`) match the Keras layer
    names so that weights can be copied across by name.
    """

    def __init__(self):
        super().__init__()

        self.emb = nn.Embedding(max_features, embedding_dims)
        # The flattened embedding output holds `maxlen` tokens of `embedding_dims`
        # values each. Deriving the size (instead of hard-coding 5000) keeps the
        # layer valid when either constant is changed.
        self.linear1 = nn.Linear(maxlen * embedding_dims, hidden_dims)
        self.linear2 = nn.Linear(hidden_dims, 2)

    def forward(self, x):
        """
        Run the network on a batch of token ids.

        Parameters
        ----------
        x
            Integer tensor of token ids with `maxlen` ids per sample.

        Returns
        -------
            Tensor with 2 raw (un-normalised) outputs per sample.
        """
        x = self.emb(x)
        # collapse the (token, embedding) dimensions into one feature vector per sample
        x = torch.flatten(x, 1)
        x = self.linear1(x)
        x = F.relu(x)  # Adding relu layers makes the attributions different
        x = self.linear2(x)
        return x


In [10]:
    
# Keras twin of `Net`: the layer names 'emb', 'linear1' and 'linear2' mirror the
# PyTorch attribute names so weights can be transferred by name.
inputs = Input(shape=(maxlen,), dtype='int32', name='inputs')
embedded = Embedding(max_features, embedding_dims, name='emb')(inputs)
flattened = Flatten(name='Flat', data_format='channels_last')(embedded)
hidden = Dense(hidden_dims, name='linear1')(flattened)
# Adding relu layers makes the attributions different
activated = Activation('relu')(hidden)
out = Dense(2, name='linear2')(activated)
model = Model(inputs=inputs, outputs=out)

In [11]:
# Instantiate the PyTorch model and move it to `device`;
# the cell output is the module summary returned by `.to()`.
net = Net()
net.to(device)

Net(
  (emb): Embedding(10000, 50)
  (linear1): Linear(in_features=5000, out_features=250, bias=True)
  (linear2): Linear(in_features=250, out_features=2, bias=True)
)

# Transfer weights from the PyTorch model to the Keras model

In [12]:
# Unique sub-module names of `net` (the part of each state-dict key before the
# first '.'), kept in first-seen order.
lnames = list(dict.fromkeys(key.split('.')[0] for key in net.state_dict().keys()))

In [13]:
# Copy the parameters of every PyTorch sub-module into the Keras layer with the
# same name, converting the weight layout where the two frameworks differ.
state = net.state_dict()  # fetched once instead of being rebuilt on every access
for name in lnames:

    if 'conv' in name:
        # reverse the three kernel axes before handing the kernel to Keras
        ws = np.transpose(state[name + '.weight'].cpu().numpy(), (2, 1, 0))
        new_weights = [ws, state[name + '.bias'].cpu().numpy()]
    elif 'linear' in name:
        # nn.Linear stores (out_features, in_features); the Keras kernel is the transpose
        ws = state[name + '.weight'].cpu().numpy().T
        new_weights = [ws, state[name + '.bias'].cpu().numpy()]
    elif 'emb' in name:
        ws = state[name + '.weight'].cpu().numpy()
        new_weights = [ws]
    else:
        # No conversion rule for this sub-module: skip it explicitly rather than
        # falling through and printing the shape of a stale (or undefined) `ws`.
        print(name, 'skipped: no conversion rule')
        continue

    model.get_layer(name).set_weights(new_weights)
    print(name, ws.shape)

emb (10000, 50)
linear1 (5000, 250)
linear2 (250, 2)


In [14]:
# Fetch the embedding matrix from both frameworks to verify the transfer.
# `weights_emb_pt` is a torch tensor; `weights_emb_tf` is the list returned by
# `get_weights()` of the Keras embedding layer (`model.layers[1]`).
weights_emb_pt = net.state_dict()['emb.weight']
weights_emb_tf = model.layers[1].get_weights()

In [15]:
# Both embedding tables should hold the same values after the transfer.
np.allclose(weights_emb_pt, weights_emb_tf, rtol=1e-03)

True

In [16]:
# Number of test reviews used for the framework comparison.
nb_samples = 10
# Torch views of the padded test inputs and the one-hot test labels.
torch_X_test = torch.from_numpy(x_test)
torch_y_test = torch.from_numpy(y_test)
# First `nb_samples` reviews; the batch explained by both libraries below.
x_test_sample = torch_X_test[:nb_samples]

In [17]:
# Expect (nb_samples, maxlen).
x_test_sample.shape

torch.Size([10, 100])

TensorFlow predictions

In [18]:
# Keras outputs for the sample batch; should match the PyTorch outputs in the next cell.
model(x_test_sample.numpy())

<tf.Tensor: shape=(10, 2), dtype=float32, numpy=
array([[ 0.19532351, -0.15969233],
       [ 0.40878993,  0.29023722],
       [-0.263573  ,  0.17727989],
       [-0.02696468, -0.13478509],
       [-0.02748159,  0.15112618],
       [-0.00592033, -0.01442789],
       [-0.13127048,  0.12909648],
       [-0.18316656,  0.05256993],
       [-0.26749277,  0.10450753],
       [-0.24014267,  0.40070525]], dtype=float32)>

PyTorch predictions

In [19]:
# PyTorch outputs for the same batch, for comparison with the Keras outputs above.
net(x_test_sample.to(device))

tensor([[ 0.1953, -0.1597],
        [ 0.4088,  0.2902],
        [-0.2636,  0.1773],
        [-0.0270, -0.1348],
        [-0.0275,  0.1511],
        [-0.0059, -0.0144],
        [-0.1313,  0.1291],
        [-0.1832,  0.0526],
        [-0.2675,  0.1045],
        [-0.2401,  0.4007]], grad_fn=<AddmmBackward>)

# Integrated gradients comparison

In [20]:
# Integrated-gradients settings meant to be shared by both libraries.
n_steps = 50  # number of steps of the integral approximation
method = "gausslegendre"  # integral approximation method
# Only passed to the alibi explainer in the visible cells.
internal_batch_size = 100

### pytorch

In [21]:
def interpret_sentence(model, indexed, min_len = 100, label = 1):
    """
    Compute captum layer integrated-gradients attributions for a batch of token ids.

    The attributions come from the module-level `lig` explainer and use the
    notebook-level `method` and `n_steps` settings, so that the same
    configuration is shared with the alibi explainer. The baseline is an
    all-zero token-id tensor with the same shape as the input.

    Parameters
    ----------
    model
        Not used: the explained model is the one wrapped by the module-level `lig`.
        Kept so that existing calls keep working.
    indexed
        Tensor of token ids to explain.
    min_len
        Not used; kept for backward compatibility.
    label
        Target output index the attributions are computed for.

    Returns
    -------
        Tuple `(attributions, delta, reference_indices)` where `attributions` and
        `delta` are the tensors returned by `lig.attribute` and
        `reference_indices` is the baseline as a numpy array.

    """
    input_indices = indexed.to(device)

    # all-zero (token id 0) int64 baseline, created directly on the target device
    reference_indices = torch.zeros(input_indices.shape, dtype=torch.long, device=device)

    # compute attributions and approximation delta using layer integrated gradients
    attributions_ig, delta = lig.attribute(input_indices,
                                           reference_indices,
                                           target=label,
                                           method=method,
                                           n_steps=n_steps,  # shared setting, no longer hard-coded
                                           return_convergence_delta=True)

    # `.cpu()` keeps the numpy conversion valid when `device` is not the CPU
    return attributions_ig, delta, reference_indices.cpu().numpy()

In [22]:
# captum explainer attributing to the output of the embedding layer of `net`;
# `interpret_sentence` reads this module-level object.
lig = LayerIntegratedGradients(net, net.emb)

In [23]:
# NOTE(review): `token_reference` is not referenced again in the visible cells;
# the baseline actually used is built inside `interpret_sentence`.
token_reference = TokenReferenceBase(reference_token_idx=0)
# For simplicity, we compute the attribution relative to label = 1 for all samples
attributions_pt, delta, reference_indices = interpret_sentence(net, 
                                                               x_test_sample, 
                                                               label=1)
# Convert to numpy so the result can be compared with the alibi attributions.
attributions_pt = attributions_pt.numpy()
print('Attributions shape:', attributions_pt.shape)

Attributions shape: (10, 100, 50)


### tf

In [24]:
# Layer whose output is attributed on the Keras side: index 1 is the embedding
# layer (index 0 is the input layer), as the cell output confirms.
layer = model.layers[1]
layer

<tensorflow.python.keras.layers.embeddings.Embedding at 0x7f1dc3657d90>

In [25]:
# alibi explainer attributing to the embedding layer of the Keras model,
# configured with the shared integrated-gradients settings defined above.
ig  = IntegratedGradients(model,
                          layer=layer,
                          n_steps=n_steps, 
                          method=method,
                          internal_batch_size=internal_batch_size)

In [26]:
# Convert the sample batch to numpy for Keras/alibi. The guard makes this cell safe
# to re-run: an unconditional `x_test_sample.numpy()` raises AttributeError once the
# name has already been rebound to an ndarray by a previous run of this cell.
if isinstance(x_test_sample, torch.Tensor):
    x_test_sample = x_test_sample.numpy()
# Predicted class per sample.
predictions = model(x_test_sample).numpy().argmax(axis=1)
# Use the same baseline as on the PyTorch side and explain output index 1 for all samples.
explanation = ig.explain(x_test_sample,
                         baselines=reference_indices,
                         target=1)
# Get attributions values from the explanation object
attributions_tf = explanation.attributions[0]
print('Attributions shape:', attributions_tf.shape)

model output_shape (None, 2)
model output_shape (None, 2)
model output_shape (None, 2)
model output_shape (None, 2)
model output_shape (None, 2)
model output_shape (None, 2)
baselines [array([[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 

### Compare

In [27]:
# Element-wise comparison of the alibi and captum attributions.
# Only `rtol` is loosened; `atol` keeps numpy's default (1e-8), so entries that
# are close to zero are compared almost exactly.
np.allclose(attributions_tf, attributions_pt, rtol=1e-03)

False

# Gradients

### tensorflow

In [28]:
from typing import Callable, TYPE_CHECKING, Union, List, Tuple, Optional

In [29]:
def _run_forward(model: Union[tf.keras.models.Model, 'keras.models.Model'],
                 x: Union[List[tf.Tensor], List[np.ndarray]],
                 target: Union[None, tf.Tensor, np.ndarray, list]) -> tf.Tensor:
    """
    Evaluates the model on `x` and, for models whose last output dimension is
    larger than 1, keeps only the output selected by `target` for each sample.

    Parameters
    ----------
    model
        Tensorflow or keras model.
    x
        Input data point.
    target
        Per-sample index of the output to keep for classification models.

    Returns
    -------
        Model output, or the model output restricted to the target for classification models.

    """

    def _select_target(ps, ts):
        # Guard clauses first; the order of the checks matches the original logic.
        if ts is None:
            raise ValueError("target cannot be `None` if `model` output dimensions > 1")
        if not isinstance(ps, tf.Tensor):
            raise NotImplementedError
        # Gather the target columns for every row, then keep entry (i, i):
        # the output of sample i at its own target index.
        gathered = tf.gather(ps, ts, axis=1)
        return tf.linalg.diag_part(gathered)

    preds = model(x)
    output_shape = model.output_shape
    has_multiple_outputs = len(output_shape) > 1 and output_shape[-1] > 1
    if has_multiple_outputs:
        return _select_target(preds, target)

    return preds

def _gradients_input(model: Union[tf.keras.models.Model, 'keras.models.Model'],
                     x: List[tf.Tensor],
                     target: Union[None, tf.Tensor]) -> List[tf.Tensor]:
    """
    Differentiates the target output of the model (or its only output when the
    output dimension is 1) with respect to the input `x`.

    Parameters
    ----------
    model
        Tensorflow or keras model.
    x
        Input data point.
    target
        Target for which the gradients are calculated if the output dimension is higher than 1.

    Returns
    -------
        Gradients with respect to each input feature.

    """
    with tf.GradientTape() as grad_tape:
        # the input is not a variable, so it has to be watched explicitly
        grad_tape.watch(x)
        selected_output = _run_forward(model, x, target)

    return grad_tape.gradient(selected_output, x)


def _gradients_layer(model: Union[tf.keras.models.Model, 'keras.models.Model'],
                     layer: Union[tf.keras.layers.Layer, 'keras.layers.Layer'],
                     orig_call: Callable,
                     x: List[tf.Tensor],
                     target: Union[None, tf.Tensor]) -> tf.Tensor:
    """
    Calculates the gradients of the target class output (or the output if the output dimension is equal to 1)
    with respect to each element of `layer`.

    The layer's `call` method is temporarily wrapped so that its output is stored on
    the layer and watched by the gradient tape. The original `call` is restored and
    the stored output removed before returning, also when the forward pass or the
    gradient computation raises.

    Parameters
    ----------
    model
        Tensorflow or keras model.
    layer
        Layer of the model with respect to which the gradients are calculated.
    orig_call
        Original `call` method of the layer. This is necessary since the call method is modified by the function
        in order to make the layer output visible to the GradientTape.
    x
        Input data point.
    target
        Target for which the gradients are calculated if the output dimension is higher than 1.

    Returns
    -------
        Gradients for each element of layer.

    """

    def watch_layer(layer, tape):
        """
        Make an intermediate hidden `layer` watchable by the `tape`.
        After calling this function, you can obtain the gradient with
        respect to the output of the `layer` by calling:

            grads = tape.gradient(..., layer.result)

        """

        def decorator(func):
            def wrapper(*args, **kwargs):
                # Store the result of `layer.call` internally.
                layer.result = func(*args, **kwargs)
                # From this point onwards, watch this tensor.
                tape.watch(layer.result)
                # Return the result to continue with the forward pass.
                return layer.result

            return wrapper

        layer.call = decorator(layer.call)
        return layer

    try:
        with tf.GradientTape() as tape:
            watch_layer(layer, tape)
            preds = _run_forward(model, x, target)

        grads = tape.gradient(preds, layer.result)
    finally:
        # Always undo the monkey-patching, otherwise a failure above would leave
        # the layer permanently wrapped and holding a reference to a stale tensor.
        layer.call = orig_call
        if hasattr(layer, 'result'):
            delattr(layer, 'result')

    return grads

In [30]:
# Keep a reference to the unpatched `call` of the embedding layer;
# `_gradients_layer` needs it to restore the layer after wrapping it.
orig_call = layer.call

In [31]:
# One target index per sample: output index 1 for the whole batch.
target = [1] * len(x_test_sample)

In [32]:
# Gradients of output index 1 with respect to the embedding-layer output (TensorFlow side).
grads_tf = _gradients_layer(model, layer, orig_call, x_test_sample, target)

In [33]:
# Convert to numpy for the comparison below.
# NOTE(review): this rebinds `grads_tf`, so re-running this cell on its own fails;
# re-run the previous cell first.
grads_tf = grads_tf.numpy()

In [34]:
from captum._utils.gradient import compute_layer_gradients_and_eval

In [35]:
# `x_test_sample` is a numpy array at this point; wrap it as a tensor again for captum.
x_test_sample_pt = torch.from_numpy(x_test_sample)

In [36]:
# Gradients with respect to the embedding layer of `net` (PyTorch side), computed with a
# helper imported from captum's private `_utils` module; the same `target` list is used
# as on the TensorFlow side. The second returned value is discarded.
grads_pt, _ = compute_layer_gradients_and_eval(net,
    net.emb,
    x_test_sample_pt,
    target)
# The first returned value is indexed at 0 to get the gradient tensor, then converted to numpy.
grads_pt = grads_pt[0].numpy()

In [37]:
# The raw layer gradients of both frameworks agree (default tolerances),
# unlike the integrated-gradients attributions compared earlier.
np.allclose(grads_pt, grads_tf)

True