In [76]:
%load_ext autoreload
%autoreload 2

In [2]:
# Go up one directory so the relative paths used later in this notebook
# ('./models/...' and 'outputs/...') resolve against the parent directory.
# NOTE(review): os.chdir('..') is relative to the current working directory,
# so re-running this cell without restarting the kernel moves up one more
# level each time.
import os
os.chdir('..')

In [3]:
from openxai.Explainer import Explainer
import torch
from utils import get_model_names
from openxai.dataloader import return_loaders, get_tokenizer_and_vocab
from sklearn.metrics import auc
import numpy as np
from openxai.ML_Models.LR.model import LogisticRegression
import openxai.ML_Models.ANN.MLP as model_MLP
import openxai.ML_Models.ANN.Text_MLP as model_MLP_Text
import datetime

In [4]:
def getExperimentID():
    """Return a minute-resolution timestamp string identifying the current run.

    The value is built from the current local time in the form
    ``YYYYMMDD_HHMM`` (for example ``20240307_0905``).
    """
    now = datetime.datetime.now()
    return f'{now:%Y%m%d_%H%M}'

In [20]:
# Experiment configuration: which dataset/model pair to evaluate, where the
# trained weights live, and how many test sentences to explain.
data_name, model_name = 'yelp', 'text_ann'
base_model_dir = './models/ClassWeighted/'
num_test_samps, batch_size = 100, 1

# Directory and file name of the saved model weights for this configuration.
model_dir, model_file_name = get_model_names(model_name, data_name, base_model_dir)

In [21]:
# Load the train / validation / test loaders for the configured dataset.
loader_train, loader_val, loader_test = return_loaders(
    data_name=data_name,
    batch_size=batch_size,
    download=False,
)


def _features_and_labels(dataset):
    """Split a dataset of (features, label, ...) items into a feature list and a label array."""
    features = [item[0] for item in dataset]
    labels = np.array([item[1] for item in dataset])
    return features, labels


# Materialise every split as (list of features, numpy array of labels).
X_train, y_train = _features_and_labels(loader_train.dataset)
X_val, y_val = _features_and_labels(loader_val.dataset)
X_test, y_test = _features_and_labels(loader_test.dataset)

Vocabulary size: 1721


In [22]:
def DefineModel(model_name, dim_per_layer=None, activation_per_layer=None, vocab_size=None, embed_dim=None, num_class=None):
    """Instantiate an untrained model matching ``model_name``.

    Args:
        model_name: Selects the architecture. A name containing ``'text_ann'``
            builds ``model_MLP_Text.Text_MLP``; any other name containing
            ``'ann'`` builds ``model_MLP.MLP``; exactly ``'lr'`` builds
            ``LogisticRegression``.
        dim_per_layer: List of layer sizes *excluding* the input size (the
            input size is read from ``loader_train`` and prepended). Required
            for the ``'ann'`` and ``'lr'`` models; for ``'lr'`` its first
            entry is used as the output size.
        activation_per_layer: Forwarded to ``model_MLP.MLP``.
        vocab_size: Forwarded to ``Text_MLP``.
        embed_dim: Accepted for backward compatibility; currently unused.
        num_class: Accepted for backward compatibility; currently unused.

    Returns:
        The constructed model instance.

    Raises:
        ValueError: If ``model_name`` matches none of the supported models, or
            if ``dim_per_layer`` is missing for a model that needs it.
    """
    if 'text_ann' in model_name:
        # Alternative constructor call kept for reference:
        # model_MLP_Text.Text_MLP(vocab_size, embed_dim, num_class)
        return model_MLP_Text.Text_MLP(vocab_size)

    is_ann = 'ann' in model_name
    if not is_ann and model_name != 'lr':
        # Previously this fell through and failed with UnboundLocalError on `model`.
        raise ValueError(
            f"Unknown model_name {model_name!r}: expected a name containing "
            "'text_ann' or 'ann', or exactly 'lr'."
        )
    if dim_per_layer is None:
        raise ValueError(f"dim_per_layer is required for model_name {model_name!r}.")

    # Tabular models take their input width from the (global) training loader.
    input_size = loader_train.dataset.get_number_of_features()
    layer_dims = [input_size] + dim_per_layer

    if is_ann:
        return model_MLP.MLP(layer_dims, activation_per_layer)
    return LogisticRegression(layer_dims[0], layer_dims[1])

In [23]:
# Build the tokenizer and the vocabulary from the training split.
# `tokenizer` and `voc` are used as globals by PG_words, classifier_fn and the
# evaluation loops below.
tokenizer, voc = get_tokenizer_and_vocab(X_train, y_train)

Vocabulary size: 1721


In [24]:
# Build the text model sized to the vocabulary, then restore the trained weights.
model = DefineModel(model_name, vocab_size=len(voc))

# map_location='cpu': every tensor created in this notebook lives on the CPU,
# so map the checkpoint there; this also lets a checkpoint saved from a GPU
# load on a CPU-only machine.
# NOTE(review): torch.load unpickles the file - only load trusted checkpoints.
model.load_state_dict(torch.load(model_dir + model_file_name, map_location='cpu'))
model.eval()  # inference mode: disables dropout
model

Text_MLP(
  (embeddings): TokenEmbedding(
    (embedding): Embedding(1721, 8)
  )
  (pos_encoder): PositionalEncoding(
    (dropout): Dropout(p=0.3, inplace=False)
  )
  (encoder): TransformerEncoder(
    (layers): ModuleList(
      (0): TransformerEncoderLayer(
        (self_attn): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=8, out_features=8, bias=True)
        )
        (linear1): Linear(in_features=8, out_features=32, bias=True)
        (dropout): Dropout(p=0.1, inplace=False)
        (linear2): Linear(in_features=32, out_features=8, bias=True)
        (norm1): LayerNorm((8,), eps=1e-05, elementwise_affine=True)
        (norm2): LayerNorm((8,), eps=1e-05, elementwise_affine=True)
        (dropout1): Dropout(p=0.1, inplace=False)
        (dropout2): Dropout(p=0.1, inplace=False)
      )
    )
  )
  (linear): Linear(in_features=8, out_features=2, bias=True)
  (dropout): Dropout(p=0.3, inplace=False)
)

In [25]:
# Hyper-parameters for the explanation methods. They are copied into the
# per-method parameter dictionaries in the next cell.

# LIME
# (n_samples_LIME is only the initial value: the LIME evaluation loop
# overwrites param_dicts['lime']['n_samples'] for each variant it runs.)
kernel_width           = 0.75
std_LIME               = 0.1
mode                   = 'text'
sample_around_instance = True
n_samples_LIME         = 1000
discretize_continuous  = False

# grad
absolute_value = True

# Smooth grad
n_samples_SG = 100#16
std_SG       = 0.005

# Integrated gradients
method             = 'gausslegendre'
multiply_by_inputs = False
n_steps            = 50#16

#SHAP
n_samples = 500#16

In [26]:
# Per-method parameter dictionaries, assembled from the hyper-parameters
# defined in the previous cell.
param_dict_lime = {
    'dataset_tensor': None,
    'kernel_width': kernel_width,
    'std': std_LIME,
    'mode': mode,
    'sample_around_instance': sample_around_instance,
    'n_samples': n_samples_LIME,
    'discretize_continuous': discretize_continuous,
    'categorical_features': None,
}

param_dict_grad = {'absolute_value': absolute_value}

param_dict_sg = {
    'n_samples': n_samples_SG,
    'standard_deviation': std_SG,
}

param_dict_ig = {
    'method': method,
    'multiply_by_inputs': multiply_by_inputs,
    'baseline': None,  # torch.mean(X_train, dim=0).reshape(1, -1).float()
    'n_steps': n_steps,
}

param_dict_shap = {'n_samples': n_samples}

# Lookup table: explanation method name -> its parameter dictionary.
param_dicts = {
    'lime': param_dict_lime,
    'grad': param_dict_grad,
    'sg': param_dict_sg,
    'ig': param_dict_ig,
    'shap': param_dict_shap,
    'itg': {},
    'random': {},
}

In [27]:
# Unwrap element 0 of every feature entry (presumably the raw sentence
# string - TODO confirm against the dataset class); these lists feed the
# tokenizer and the LIME explainer below.
X_train_lime = [entry[0] for entry in X_train]
X_test_lime = [entry[0] for entry in X_test]

In [39]:
def calculateFaithfulnessAUC_text(test_sentences, explanations, text_classifier, min_idx, max_idx, max_k, do_pgu=False, do_random_baseline=False):
    """Compute a per-sentence faithfulness score over the top-1..top-``max_k`` words.

    For every evaluated sentence, ``PG_words`` is called once per
    ``top_k in 1..max_k`` with the first ``top_k`` entries of
    ``explanations[index][0]``. When ``max_k > 1`` the resulting curve is
    reduced with ``sklearn.metrics.auc`` over an x-axis normalised to
    ``[0, 1]``; otherwise the raw list of ``PG_words`` results is stored.

    Args:
        test_sentences: Sequence of sentences; tokenised with the global
            ``tokenizer``.
        explanations: Indexable aligned with ``test_sentences``;
            ``explanations[index][0]`` must be sliceable and hold the ranked
            explanation words for that sentence.
        text_classifier: Model forwarded to ``PG_words``.
        min_idx: First sentence index (inclusive) to evaluate.
        max_idx: Sentence index (exclusive) at which evaluation stops.
        max_k: Largest number of top-ranked words to evaluate.
        do_pgu: Forwarded to ``PG_words``.
        do_random_baseline: Forwarded to ``PG_words``.

    Returns:
        A list with one entry per evaluated sentence (AUC value when
        ``max_k > 1``, else the list of ``PG_words`` results). Sentences whose
        token count is ``<= max_k + 2`` are skipped and contribute no entry.
    """
    # The AUC x-axis depends only on max_k, so build it once.
    use_auc = max_k > 1
    auc_x = np.arange(max_k) / (max_k - 1) if use_auc else None

    PG_AUC = []
    for index, test_sentence in enumerate(test_sentences):
        # Range checks come first: the stop condition used to sit behind the
        # short-sentence `continue` and was an equality test, so a short
        # sentence at index == max_idx disabled it for the rest of the loop.
        if index >= max_idx:
            break
        if index < min_idx:
            continue
        if len(tokenizer(test_sentence)) <= max_k + 2:
            print(f'Less than {max_k} tokens in sentence. Skipping the sentence:', test_sentence)
            continue

        PG = [
            PG_words(test_sentence, explanations[index][0][:top_k], text_classifier, do_pgu, do_random_baseline)
            for top_k in range(1, max_k + 1)
        ]
        PG_AUC.append(auc(auc_x, PG) if use_auc else PG)

    return PG_AUC

def PG_words(test_sentence, topk_exp_words, text_classifier, do_pgu, do_random_baseline=False):
    """Measure how much the classifier output changes when tokens are dropped.

    The sentence is converted to vocabulary ids with the global ``tokenizer``
    and ``voc``, a boolean keep-mask over its tokens is built, and the
    classifier is run on both the full and the filtered id sequence.

    Mask construction:
        * ``do_random_baseline=True``: each token is kept with probability
          about 0.5 (``topk_exp_words`` is ignored). The mask is redrawn until
          at least one token survives in the filtered input.
        * otherwise: every occurrence of the vocabulary id of each word in
          ``topk_exp_words`` (first token of the word) is dropped.
        * ``do_pgu=True`` inverts the mask, so only the tokens that would have
          been dropped are kept.

    Returns:
        A tensor holding the absolute difference between the two predictions
        for class index 0 (a binary classifier is assumed).
    """
    token_ids = torch.tensor([voc[tok] for tok in tokenizer(test_sentence)])

    def _predict(ids):
        # Single-sentence batch, no gradient tracking.
        with torch.no_grad():
            return text_classifier(ids.unsqueeze(0))

    pred_original = _predict(token_ids)

    if do_random_baseline:
        # The mask is inverted below when do_pgu is set, so the draw to reject
        # is "all True" in that case and "all False" otherwise.
        rejected_count = len(token_ids) if do_pgu else 0
        keep = torch.rand(token_ids.shape) > 0.5
        while keep.sum() == rejected_count:
            keep = torch.rand(token_ids.shape) > 0.5
    else:
        # Vocabulary ids of the explanation words; True in `keep` marks tokens
        # that are NOT among them.
        ids_to_remove = [voc[tokenizer(word)[0]] for word in topk_exp_words]
        keep = ~torch.isin(token_ids, torch.tensor(ids_to_remove))

    if do_pgu:
        keep = ~keep

    pred_removed = _predict(token_ids[keep])

    # [0]: first class only, assuming a binary classifier (this is what openxai does).
    return torch.abs(pred_original.squeeze() - pred_removed.squeeze())[0]

In [40]:
def classifier_fn(perturbed_texts):
    """Run the global ``model`` on a batch of raw text strings.

    Each text is tokenised with the global ``tokenizer`` and mapped to
    vocabulary ids through ``voc``; shorter sequences are right-padded with id
    0 so the batch can be stacked into one tensor. The model is put in eval
    mode and called without gradient tracking.

    Args:
        perturbed_texts: Iterable of text strings (e.g. LIME perturbations).

    Returns:
        The model output for the stacked batch.
    """
    id_rows = [[voc[tok] for tok in tokenizer(text)] for text in perturbed_texts]

    longest = max(len(row) for row in id_rows)
    # Right-pad with 0 (pad token) up to the longest sequence in the batch.
    padded_rows = [torch.tensor(row + [0] * (longest - len(row))) for row in id_rows]
    inputs = torch.stack(padded_rows)

    model.eval()
    with torch.no_grad():
        return model(inputs)

In [41]:
# k: number of top-ranked explanation words kept per sentence; it is passed as
# max_k to calculateFaithfulnessAUC_text and appears in the result file names.
k = 3
# Timestamp ID shared by all result files written during this run.
experiment_id = getExperimentID()

In [47]:
# Evaluate faithfulness (PGI / PGU AUC) for two LIME variants and a random
# baseline, print the mean +/- standard error, and save it to a text file.
algos = ['lime16', 'lime1000', 'random'] #'random', 'lime'

# Number of LIME perturbation samples used by each LIME variant.
lime_n_samples = {'lime16': 16, 'lime1000': 1000}

# Create the output directory up front so a missing folder cannot make the run
# fail only after the expensive explanation loop has finished.
results_dir = f'outputs/TextFaithfulnessResults/{data_name}'
os.makedirs(results_dir, exist_ok=True)

for algo in algos:
    print('algo:', algo)
    if algo in lime_n_samples:
        param_dicts['lime']['n_samples'] = lime_n_samples[algo]

    do_random_baseline = algo == 'random'
    if do_random_baseline:
        # Placeholder explanations: PG_words ignores the explanation words
        # when the random baseline is requested.
        all_exps = np.zeros((num_test_samps*10,1,k))
    else:  # lime
        # all_exps stays index-aligned with X_test_lime: skipped sentences get
        # an empty placeholder entry.
        all_exps = []
        num_valid_exps = 0
        for test_sentence in X_test_lime:
            if num_valid_exps == num_test_samps:
                print('num_test_samps reached')
                break
            if len(tokenizer(test_sentence)) <= k:
                print(f'Less than {k} tokens in sentence. Skipping the sentence:', test_sentence)
                all_exps.append([])
                continue

            explainer = Explainer(method='lime', model=classifier_fn, dataset_tensor=X_train_lime, param_dict=param_dicts['lime'])
            explanations = explainer.get_explanation([test_sentence], seed=0, disable_tqdm=True)

            # as_list() yields (word, importance) tuples sorted by importance;
            # keep the words of the top-k entries.
            exps = [[pair[0] for pair in exp.as_list()[:k]] for exp in explanations]
            all_exps.append(exps)
            num_valid_exps += 1

    PGI_AUC = calculateFaithfulnessAUC_text(X_test_lime[:num_test_samps], all_exps, model, 0, num_test_samps, k, do_pgu=False, do_random_baseline=do_random_baseline)
    PGU_AUC = calculateFaithfulnessAUC_text(X_test_lime[:num_test_samps], all_exps, model, 0, num_test_samps, k, do_pgu=True, do_random_baseline=do_random_baseline)

    # mean +/- standard error of the mean, formatted once and reused below.
    pgi_summary = 'PGI:' + str(round(np.mean(PGI_AUC), 3)) + '+/-' + str(round(np.std(PGI_AUC)/np.sqrt(len(PGI_AUC)), 3))
    pgu_summary = 'PGU:' + str(round(np.mean(PGU_AUC), 3)) + '+/-' + str(round(np.std(PGU_AUC)/np.sqrt(len(PGU_AUC)), 3))
    print(pgi_summary)
    print(pgu_summary)

    # save out to file
    with open(f'{results_dir}/faithfulness_{experiment_id}_{algo}_{data_name}_{model_name}_AUC-k{k}.txt', 'w') as f:
        f.write(pgi_summary + '\n')
        f.write(pgu_summary + '\n')

algo: lime16
text_instance:  the presentation of the food was awful.
text_instance:  Worst food/service I've had in a while.
text_instance:  Never again will I be dining at this place!
text_instance:  I guess maybe we went on an off night but it was disgraceful.
text_instance:  As a sushi lover avoid this place by all means.
text_instance:  The ambiance isn't much better.
text_instance:  This hole in the wall has great Mexican street tacos, and friendly staff.
text_instance:  If the food isn't bad enough for you, then enjoy dealing with the world's worst/annoying drunk people.
text_instance:  Will never, ever go back.
text_instance:  The atmosphere here is fun.
text_instance:  The pancake was also really good and pretty large at that.
text_instance:  All of the tapas dishes were delicious!
text_instance:  The chains, which I'm no fan of, beat this place easily.
text_instance:  Everyone is very attentive, providing excellent customer service.
text_instance:  The staff are also very frie

In [43]:
from captum.attr import LayerIntegratedGradients, LayerGradientShap, LayerDeepLift, LayerGradientXActivation, LayerActivation, LayerConductance

In [44]:
# lig is LayerIntegratedGradients
# lgshap is LayerGradientShap
# ldl is LayerDeepLift
# lgxa is LayerGradientXActivation
# la is LayerActivation
# lc is LayerConductance

In [45]:
# Evaluate faithfulness (PGI / PGU AUC) for Captum layer-attribution methods
# applied to the model's token-embedding layer.
grad_algos = ['lig', 'lgshap', 'ldl', 'lgxa', 'la', 'lc']

# Short algorithm name -> Captum attribution class.
attr_method_classes = {
    'lig': LayerIntegratedGradients,
    'lgshap': LayerGradientShap,
    'ldl': LayerDeepLift,
    'lgxa': LayerGradientXActivation,
    'la': LayerActivation,
    'lc': LayerConductance,
}

# Create the output directory up front so a missing folder cannot make the run
# fail only after the attributions have been computed.
results_dir = f'outputs/TextFaithfulnessResults/{data_name}'
os.makedirs(results_dir, exist_ok=True)

model = model.to('cpu')
embedding_layer = model.embeddings.embedding

for algo in grad_algos:
    print('algo', algo)
    attr_cls = attr_method_classes.get(algo)
    if attr_cls is None:
        print('algo not implemented yet')
        continue
    attr_method = attr_cls(model, embedding_layer)

    # all_exps stays index-aligned with X_test_lime: skipped sentences get an
    # empty placeholder entry.
    all_exps = []
    num_valid_exps = 0
    for test_sentence in X_test_lime:
        if num_valid_exps == num_test_samps:
            print('num_test_samps reached')
            break
        if len(tokenizer(test_sentence)) <= k:
            print(f'Less than {k} tokens in sentence. Skipping the sentence:', test_sentence)
            all_exps.append([])
            continue

        tokenized_input = torch.tensor([voc[t] for t in tokenizer(test_sentence)])
        x = tokenized_input.unsqueeze(0)
        # Baseline: an all-zero id sequence of the same length as the input.
        ref = torch.zeros((1, len(tokenized_input)), dtype=torch.long)

        if algo == 'lig':
            attributions, delta = attr_method.attribute(x.float(), ref, n_steps=500, return_convergence_delta=True, target=0)
        elif algo == 'lgxa':
            attributions = attr_method.attribute(x.float(), target=0)
        elif algo == 'la':
            attributions = attr_method.attribute(x.float())
        else:
            attributions, delta = attr_method.attribute(x.float(), ref, return_convergence_delta=True, target=0)

        # Collapse the last (embedding) dimension to one score per token,
        # then L2-normalise the score vector.
        attributions = attributions.sum(dim=2).squeeze(0)
        attributions = attributions / torch.norm(attributions)

        # Words at the k highest-scoring token positions.
        topk_inds = torch.argsort(attributions, descending=True)[:k]
        top_k_voc_inds = tokenized_input[topk_inds]
        top_k_words = [voc.itos[ind] for ind in top_k_voc_inds]
        all_exps.append([top_k_words])
        num_valid_exps += 1

    PGI_AUC = calculateFaithfulnessAUC_text(X_test_lime[:num_test_samps], all_exps, model, 0, num_test_samps, k, do_pgu=False, do_random_baseline=False)
    PGU_AUC = calculateFaithfulnessAUC_text(X_test_lime[:num_test_samps], all_exps, model, 0, num_test_samps, k, do_pgu=True, do_random_baseline=False)

    # mean +/- standard error of the mean, formatted once and reused below.
    pgi_summary = 'PGI:' + str(round(np.mean(PGI_AUC), 3)) + '+/-' + str(round(np.std(PGI_AUC)/np.sqrt(len(PGI_AUC)), 3))
    pgu_summary = 'PGU:' + str(round(np.mean(PGU_AUC), 3)) + '+/-' + str(round(np.std(PGU_AUC)/np.sqrt(len(PGU_AUC)), 3))
    print(pgi_summary)
    print(pgu_summary)

    # save out to file
    with open(f'{results_dir}/faithfulness_{experiment_id}_{algo}_{data_name}_{model_name}_AUC-k{k}.txt', 'w') as f:
        f.write(pgi_summary + '\n')
        f.write(pgu_summary + '\n')

algo lig
Less than 3 tokens in sentence. Skipping the sentence: Fantastic food!
num_test_samps reached
Less than 3 tokens in sentence. Skipping the sentence: Fantastic food!
Less than 3 tokens in sentence. Skipping the sentence: Crust is not good.
Less than 3 tokens in sentence. Skipping the sentence: An absolute must visit!
Less than 3 tokens in sentence. Skipping the sentence: Service was fantastic.
Less than 3 tokens in sentence. Skipping the sentence: Appetite instantly gone.
Less than 3 tokens in sentence. Skipping the sentence: I consider this theft.
Less than 3 tokens in sentence. Skipping the sentence: Food was below average.
Less than 3 tokens in sentence. Skipping the sentence: Weird vibe from owners.
Less than 3 tokens in sentence. Skipping the sentence: Fantastic food!
Less than 3 tokens in sentence. Skipping the sentence: Crust is not good.
Less than 3 tokens in sentence. Skipping the sentence: An absolute must visit!
Less than 3 tokens in sentence. Skipping the sentence: 

In [5]:
# all_labels_train = []
# all_tokenized_inputs_w_offsets_train = []
# # all_offsets_train = []
# # all_tokenized_inputs_no_offsets_train = []
# for index, input_tuple in enumerate(loader_train):
#     (labels, inputs) = input_tuple
#     all_labels_train.append(labels[0])
#     all_tokenized_inputs_w_offsets_train.append(inputs[0])
#     # all_offsets_train.append(offsets)
#     # all_tokenized_inputs_no_offsets_train.append(tokenized_list[0])
#     

In [6]:
# from torch.nn.utils.rnn import pad_sequence
# 
# padded_inputs_train = pad_sequence(all_tokenized_inputs_w_offsets_train, batch_first=True)

In [12]:
# # Generate embeddings
# def generate_embeddings(texts):
#     embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
#     batch_size = 256  # Adjust based on your system's memory capacity
#     embeddings = []
#     # print('transforming data...')
#     num_batches = len(texts) // batch_size
#     for i in range(0, len(texts), batch_size):
#         # if i % 100 == 0:
#             # print(f'Processing batch {i // batch_size + 1}/{num_batches}')
#         batch = texts[i:i + batch_size]
#         batch_embeddings = embedding_model.encode(batch)
#         embeddings.extend(batch_embeddings)
#     return embeddings

In [13]:
# def classifier_fn(sentences):
#     embeddings = generate_embeddings(sentences)
#     with torch.no_grad():
#         preds = model(torch.Tensor(embeddings))
#     # return the class argmax
#     # preds = torch.argmax(preds, dim=1)
#     return preds

In [7]:
# padded_inputs_train.shape

In [8]:
# all_labels_test = []
# all_tokenized_inputs_w_offsets_test = []
# # all_offsets_test = []
# # all_tokenized_inputs_no_offsets_test = []
# for index, input_tuple in enumerate(loader_test):
#     (labels, inputs) = input_tuple
#     all_labels_test.append(labels[0])
#     all_tokenized_inputs_w_offsets_test.append(inputs[0])
#     # all_offsets_test.append(offsets)
#     # all_tokenized_inputs_no_offsets_test.append(tokenized_list[0])
# padded_inputs_test = pad_sequence(all_tokenized_inputs_w_offsets_test, batch_first=True)
# padded_inputs_test.shape

In [29]:
# #get the top k indices given a list of strings
# def get_top_k_indices(top_k, feature_names):
#     top_k_indices = []
#     for feature in top_k:
#         top_k_indices.append(feature_names.index(feature))
#     return top_k_indices
# 
# # top_k_inds_0 = get_top_k_indices(LIME_exps[0], X_test_sentences[0].split()) # [10, 34, 12]

In [40]:
# from nltk.corpus import wordnet
# 
# def get_synonyms(word):
#     synonyms = set()
#     for syn in wordnet.synsets(word):
#         for lemma in syn.lemmas():
#             synonyms.add(lemma.name())  # add the synonyms
#     return list(synonyms)
# 
# def get_antonyms(word):
#     antonyms = set()
#     for syn in wordnet.synsets(word):
#         for lemma in syn.lemmas():
#             if lemma.antonyms():  # Check if antonyms are available
#                 antonyms.add(lemma.antonyms()[0].name())  # add the antonyms
#     return list(antonyms)
# 
# get_synonyms('good')

['commodity',
 'salutary',
 'thoroughly',
 'undecomposed',
 'honorable',
 'safe',
 'unspoiled',
 'skilful',
 'proficient',
 'dependable',
 'practiced',
 'respectable',
 'well',
 'near',
 'full',
 'goodness',
 'soundly',
 'secure',
 'just',
 'serious',
 'beneficial',
 'sound',
 'in_effect',
 'upright',
 'right',
 'estimable',
 'in_force',
 'honest',
 'trade_good',
 'dear',
 'effective',
 'good',
 'expert',
 'ripe',
 'adept',
 'unspoilt',
 'skillful']

In [60]:
# # get the words at those indices
# X_test_sentences_temp = X_test_sentences[0].split().copy()
# print('original sentence', X_test_sentences_temp)
# for idx, top_k_word in enumerate(LIME_exps[0]):
#     print(top_k_word)
#     # print(get_synonyms(top_k_word))
#     print('index', top_k_inds_0[idx])
#     synonym = get_synonyms(top_k_word)[0]
#     X_test_sentences_temp[top_k_inds_0[idx]] = synonym
#     print('synonym', synonym)
#     print(X_test_sentences_temp)

original sentence ['I', 'bought', 'this', 'for', 'my', 'wife', 'She', 'says', 'that', 'it', 'makes', 'the', 'process', 'easier', 'and', 'much', 'faster', 'I', 'say', 'that', 'the', 'results', 'are', 'great', 'a', 'very', 'noticeable', 'difference', 'My', 'daughter', 'says', 'shes', 'wants', 'one', 'too']
makes
index 10
synonym piddle
['I', 'bought', 'this', 'for', 'my', 'wife', 'She', 'says', 'that', 'it', 'piddle', 'the', 'process', 'easier', 'and', 'much', 'faster', 'I', 'say', 'that', 'the', 'results', 'are', 'great', 'a', 'very', 'noticeable', 'difference', 'My', 'daughter', 'says', 'shes', 'wants', 'one', 'too']
too
index 34
synonym excessively
['I', 'bought', 'this', 'for', 'my', 'wife', 'She', 'says', 'that', 'it', 'piddle', 'the', 'process', 'easier', 'and', 'much', 'faster', 'I', 'say', 'that', 'the', 'results', 'are', 'great', 'a', 'very', 'noticeable', 'difference', 'My', 'daughter', 'says', 'shes', 'wants', 'one', 'excessively']
process
index 12
synonym sue
['I', 'bought', 

In [61]:
#note: it’s actually not possible to do perturbations in the embedding space. I am asking the LLM (and post hoc explainers) for the most important WORDS for classification. This is only a problem because I’m generating SENTENCE level embeddings and training the model on that. I don’t see a way to generate perturbations in the embedding space because what would I be perturbing?

# # Function to perturb the important features
# def perturb_features(embedding, top_k_indices, std_dev=0.1):
#     perturbed_embedding = embedding.clone()
#     perturbed_embedding[0, top_k_indices] += torch.randn(top_k_indices.size(0)) * std_dev
#     return perturbed_embedding
# perturb_features(torch.tensor(X_test[0]), torch.tensor(top_k_inds_0))

In [27]:
# import torch
# class WrapperModel(torch.nn.Module):
#     def __init__(self, original_model):
#         super().__init__()
#         self.original_model = original_model
# 
#     def forward(self, texts):
#         print('texts', texts)
#         # Assume preprocessing and tokenization here
#         # processed_inputs = [torch.tensor([voc[t] for t in tokens]) for tokens in texts]
#         # inputs = torch.cat(processed_inputs)
#         offsets = torch.tensor([0])
#         print('offsets', offsets)
#         
#         # Ensure the model is in the right mode (e.g., eval)
#         self.original_model.eval()
#         with torch.no_grad():
#             predictions = self.original_model(texts, offsets)
#         print('predictions', predictions)
#         return predictions
# 
# # Usage
# wrapper_model = WrapperModel(model)
# # Now you can use wrapper_model to make predictions

In [247]:
# def classifier_fn(inputs): # TODO finish this...... rework this for LIME
#     print('texts',inputs)
#     # Assuming `tokenizer` and `vocab` are defined elsewhere and accessible here
#     # tokenized_texts = [tokenizer(text) for text in texts]
#     # processed_inputs = [torch.tensor([voc.stoi[token] for token in text if token in voc.stoi]) for text in texts]
#     # processed_inputs = [torch.tensor([voc[t] for t in tokens]) for tokens in texts]
#     # offsets = [0] + [len(text) for text in texts[:-1]]
#     # offsets = torch.tensor(offsets).cumsum(dim=0)
# 
#     # assuming we're passing in 1 tokenized sentence at a time!
#     offsets = torch.tensor([0])
#     # inputs = torch.cat(processed_inputs)
#     # offsets = torch.tensor(offsets)
#     print('inputs',inputs)
#     print('offsets',offsets)
#     # Assuming the model is already on the correct device and in evaluation mode
#     # model.eval()
#     # with torch.no_grad():
#     predictions = model(inputs, offsets)
# 
#     print('predictions',predictions)
#     return predictions

In [248]:
# from captum.attr import Lime, LimeBase
# from captum._utils.models.linear_model import SkLearnLinearRegression, SkLearnLasso
# from torch.nn import functional as F

In [160]:
# # remove the batch dimension for the embedding-bag model
# def forward_func(text):
#     print('text', text)
#     print('text.shape', text.shape)
#     return model(text)
# 
# # encode text indices into latent representations & calculate cosine similarity
# def exp_embedding_cosine_distance(original_inp, perturbed_inp, _, **kwargs):
#     print('Making embeddings!')
#     print('original_inp', original_inp)
#     print('perturbed_inp', perturbed_inp)
#     original_emb = model.embeddings.embedding(original_inp.unsqueeze(0))
#     perturbed_emb = model.embeddings.embedding(perturbed_inp)
#     print('embedding shapes:')
#     print('original_emb.shape', original_emb.shape)
#     print('perturbed_emb.shape', perturbed_emb.shape)
#     distance = 1 - F.cosine_similarity(original_emb.squeeze(), perturbed_emb.squeeze(), dim=0)
#     print('distance:', distance)
#     return torch.exp(-1 * (distance ** 2) / 2)
# 
# # binary vector where each word is selected independently and uniformly at random
# def bernoulli_perturb(text, **kwargs):
#     probs = torch.ones_like(text) * 0.5
#     return torch.bernoulli(probs).long()
# 
# # remove absent tokens based on the interpretable representation sample
# def interp_to_input(interp_sample, original_input, **kwargs):
#     # temp = original_input[interp_sample.bool()].view(original_input.size(0), -1)
#     temp = original_input[interp_sample.bool()].view(1, -1)
# 
#     print("interp_sample",interp_sample)
#     print('temp',temp)
#     print('original_input',original_input)
#     print(interp_sample.shape, temp.shape)
#     len_original = len(original_input.squeeze())
#     len_temp = temp.squeeze().shape[0]
#     # pad with 0s temp.shape torch.Size([1, 5]) to torch.Size([1, 10])
#     if len_original > len_temp:
#         temp = F.pad(temp, (0, len_original - len_temp), 'constant', 0)
#     
#     print('temp',temp)
#     print('temp.unsqueeze(0).shape', temp.unsqueeze(0).shape)
#     return temp.unsqueeze(0)
# 
# lasso_lime_base = LimeBase(
#     forward_func, 
#     interpretable_model=SkLearnLasso(alpha=0.08),
#     similarity_func=exp_embedding_cosine_distance,
#     perturb_func=bernoulli_perturb,
#     perturb_interpretable_space=True,
#     from_interp_rep_transform=interp_to_input,
#     to_interp_rep_transform=None
# )

In [161]:
# def collate_batch(batch):
#     labels = torch.tensor([label for _, label in batch]) 
#     text_list = [tokenizer(line) for line, _ in batch]
#     
#     # flatten tokens across the whole batch
#     text = torch.tensor([voc[t] for tokens in text_list for t in tokens])
#     tokenized_list = [torch.tensor([voc[t] for t in tokens]) for tokens in text_list]
#     # the offset of each example
#     offsets = torch.tensor(
#         [0] + [len(tokens) for tokens in text_list][:-1]
#     ).cumsum(dim=0)
# 
#     return labels, text, offsets, tokenized_list

In [162]:
# def collate_batch(batch):
#     print(batch)
#     labels = torch.tensor([label for _, label in batch])
#     text_list = [tokenizer(line[0]) for line, _ in batch]
#     #pad the text_list to have the same length
#     max_len = max([len(tokens) for tokens in text_list])
#     for tokens in text_list:
#         while len(tokens) < max_len:
#             tokens.append('<pad>')
# 
#     inputs = torch.stack([torch.tensor([voc[t] for t in tokens]) for tokens in text_list])
# 
#     return labels, inputs

In [166]:
# test_label = 1 
# test_line = [('I think this is a bad product, but needs work')]
# 
# test_labels, test_text = collate_batch([(test_line, test_label)])
# print(test_labels, test_text)
# probs = model(test_text).squeeze()
# print('Prediction probability:', round(probs[test_labels[0]].item(), 4))

[(['I think this is a bad product, but needs work'], 1)]
tensor([1]) tensor([[   4,  256,    8,    9,   10,  100,   31,    7,   40, 1213,   45]])
Prediction probability: 0.0


In [168]:
# attrs = lasso_lime_base.attribute(
#     test_text.unsqueeze(0), # add batch dimension for Captum
#     target=test_labels,
#     # additional_forward_args=(test_offsets,),
#     n_samples=16,
#     show_progress=True
# ).squeeze(0)
# 
# print('Attribution range:', attrs.min().item(), 'to', attrs.max().item())

Lime Base attribution:   0%|          | 0/16 [00:00<?, ?it/s]

interp_sample tensor([[[0, 0, 0, 0, 1, 1, 0, 0, 1, 1, 1]]])
temp tensor([[  10,  100,   40, 1213,   45]])
original_input tensor([[[   4,  256,    8,    9,   10,  100,   31,    7,   40, 1213,   45]]])
torch.Size([1, 1, 11]) torch.Size([1, 5])
temp tensor([[  10,  100,   40, 1213,   45,    0,    0,    0,    0,    0,    0]])
temp.unsqueeze(0).shape torch.Size([1, 1, 11])
Making embeddings!
original_inp tensor([[[   4,  256,    8,    9,   10,  100,   31,    7,   40, 1213,   45]]])
perturbed_inp tensor([[[  10,  100,   40, 1213,   45,    0,    0,    0,    0,    0,    0]]])
embedding shapes:
original_emb.shape torch.Size([1, 1, 1, 11, 8])
perturbed_emb.shape torch.Size([1, 1, 11, 8])
distance: tensor([0.7507, 0.6202, 0.9325, 0.8418, 1.6475, 0.8301, 1.2224, 1.4550])
text tensor([[[  10,  100,   40, 1213,   45,    0,    0,    0,    0,    0,    0]]])
text.shape torch.Size([1, 1, 11])


AssertionError: query should be unbatched 2D or batched 3D tensor but received 4-D query tensor

In [175]:
# test_label = 1  # {1: World, 2: Sports, 3: Business, 4: Sci/Tec}
# test_line = [('I love this product, it is amazing')]
# 
# test_labels, test_text = collate_batch([(test_line, test_label)])
# 
# probs = model(test_text).squeeze(0)
# print('Prediction probability:', round(probs[test_labels[0]].item(), 4))

[(['I love this product, it is amazing'], 1)]
Prediction probability: 1.0


In [76]:
# # remove the batch dimension for the embedding-bag model
# def forward_func(text):
#     return model(text.squeeze(0))
# 
# # encode text indices into latent representations & calculate cosine similarity
# def exp_embedding_cosine_distance(original_inp, perturbed_inp, _, **kwargs):
#     original_emb = model.embeddings.embedding(original_inp)
#     perturbed_emb = model.embeddings.embedding(perturbed_inp)
#     distance = 1 - F.cosine_similarity(original_emb, perturbed_emb, dim=1)
#     return torch.exp(-1 * (distance ** 2) / 2)
# 
# # binary vector where each word is selected independently and uniformly at random
# def bernoulli_perturb(text, **kwargs):
#     probs = torch.ones_like(text) * 0.5
#     return torch.bernoulli(probs).long()
# 
# # remove absent tokens based on the interpretable representation sample
# def interp_to_input(interp_sample, original_input, **kwargs):
#     return original_input[interp_sample.bool()].view(original_input.size(0), -1)
# 
# lasso_lime_base = LimeBase(
#     forward_func, 
#     interpretable_model=SkLearnLasso(alpha=0.08),
#     similarity_func=exp_embedding_cosine_distance,
#     perturb_func=bernoulli_perturb,
#     perturb_interpretable_space=True,
#     from_interp_rep_transform=interp_to_input,
#     to_interp_rep_transform=None
# )