### Imports

In [None]:
import numpy as np
import shap
import math
from scipy.stats import entropy
from sklearn.metrics import classification_report

import tensorflow as tf
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)
tf.compat.v1.disable_v2_behavior()

from tensorflow.keras.preprocessing import sequence
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Embedding
from tensorflow.keras.layers import LSTM
from tensorflow.keras.datasets import imdb

from tqdm import tqdm

from IPython.display import Markdown, display

### Global Variables

In [None]:
# Seed handed to imdb.load_data and to np.random.seed before sampling.
RANDOM_SEED = 42
# Vocabulary size: passed as num_words to imdb.load_data and as the Embedding input dimension.
max_features = 20000
maxlen = 80                        # length every review is padded/truncated to by sequence.pad_sequences
# Batch size used for both model.fit and model.evaluate.
batch_size = 32
# Number of training epochs per active-learning iteration.
EPOCHS = 3
# Maps the integer class label to a human-readable sentiment name.
class_sentiment_map = {0:'Negative', 1:'Positive'}
# Placeholders (all 0) for notebook-wide state that later code overwrites:
# init_workflow assigns p, b, k, initial_seed_size, initial_seed, j, S_indices,
# U_indices and train_indices; the experiment cells assign model_theta; and
# get_num2word caches the index-to-word dictionary in num2word.
q, c, p, b, k, initial_seed_size, initial_seed, j, S_indices, U_indices, train_indices, model_theta, num2word = 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0

### Helper Functions

In [None]:
def printmd(string):
    """Render ``string`` as Markdown in the notebook output area."""
    rendered = Markdown(string)
    display(rendered)

def load_data():
    """Load the IMDB review dataset and pad every review to ``maxlen`` tokens.

    The vocabulary is limited to ``max_features`` words and the dataset
    shuffle is seeded with ``RANDOM_SEED``. Progress and shapes are printed.

    Returns:
        tuple: ``(x_train, y_train, x_test, y_test)`` where the ``x_*`` values
        are the outputs of ``sequence.pad_sequences``.
    """
    print('Loading data...')
    train_split, test_split = imdb.load_data(num_words=max_features, seed=RANDOM_SEED)
    x_train, y_train = train_split
    x_test, y_test = test_split
    print(len(x_train), 'train sequences')
    print(len(x_test), 'test sequences')

    print('Pad sequences (samples x time)')
    # Bring both splits to the same fixed sequence length.
    x_train, x_test = (
        sequence.pad_sequences(split, maxlen=maxlen) for split in (x_train, x_test)
    )
    print('x_train shape:', x_train.shape)
    print('x_test shape:', x_test.shape)

    return x_train, y_train, x_test, y_test

def get_lstm_model():
    """Build and compile a fresh, untrained two-class LSTM sentiment classifier.

    Architecture: Embedding(max_features, 128) -> LSTM(128, dropout 0.2,
    recurrent dropout 0.2) -> Dense(2, softmax). Compiled with the Adam
    optimizer, sparse categorical cross-entropy loss and an accuracy metric.

    Returns:
        The compiled Keras ``Sequential`` model.
    """
    print('Build model...')
    layer_stack = [
        Embedding(max_features, 128),
        LSTM(128, dropout=0.2, recurrent_dropout=0.2),
        Dense(2, activation='softmax'),
    ]
    model_lstm = Sequential(layer_stack)

    compile_options = {
        'loss': 'sparse_categorical_crossentropy',
        'optimizer': 'adam',
        'metrics': ['accuracy'],
    }
    model_lstm.compile(**compile_options)
    return model_lstm

def init_workflow(x_train):
    """Reset the notebook-wide active-learning state for a new experiment.

    Seeds NumPy's global RNG with ``RANDOM_SEED`` and then (re)assigns these
    module-level variables:

    * ``p``                 - number of active-learning iterations (10).
    * ``train_indices``     - list of all row indices of ``x_train``.
    * ``initial_seed_size`` - 0.5% of the training set size (truncated to int).
    * ``b``                 - equal to ``initial_seed_size``.
    * ``k``                 - twice ``initial_seed_size``.
    * ``initial_seed`` / ``S_indices`` - randomly drawn labelled indices.
    * ``j``                 - iteration counter, starting at 1.
    * ``U_indices``         - remaining (unlabelled) indices.

    Args:
        x_train: training samples; only ``len(x_train)`` is used.
    """
    global p, b, k, initial_seed_size, initial_seed, j, S_indices, U_indices, train_indices
    np.random.seed(RANDOM_SEED)
    p = 10
    train_indices = list(range(len(x_train)))
    initial_seed_size = int(len(x_train) * 0.005)
    b = initial_seed_size
    k = 2 * initial_seed_size
    initial_seed = np.random.choice(train_indices, size=initial_seed_size, replace=False)
    j = 1
    S_indices = initial_seed
    # Sets are unordered: sort the difference so the unlabelled pool has a
    # well-defined order and later seeded draws from it are reproducible.
    U_indices = sorted(set(train_indices) - set(S_indices.tolist()))

def train_model_lstm(model_lstm, x_train, y_train, eval_data=None):
    """Fit ``model_lstm`` and report its loss/accuracy on an evaluation set.

    Args:
        model_lstm: compiled Keras model (see ``get_lstm_model``).
        x_train: training inputs.
        y_train: training labels.
        eval_data: optional ``(x_eval, y_eval)`` tuple used both as
            ``validation_data`` during fitting and for the final evaluation.
            When omitted, the notebook-level ``x_test`` / ``y_test`` globals
            are used, which is the original behaviour.

    Returns:
        The same model instance, after training.
    """
    print('Train...')
    if eval_data is None:
        # Backward-compatible default: fall back to the notebook globals.
        eval_data = (x_test, y_test)
    x_eval, y_eval = eval_data

    model_lstm.fit(x_train, y_train,
              batch_size=batch_size,
              epochs=EPOCHS,
              validation_data=(x_eval, y_eval))
    score, acc = model_lstm.evaluate(x_eval, y_eval,
                                batch_size=batch_size)
    print('Test score:', score)
    print('Test accuracy:', acc)

    return model_lstm


def compute_cosine_similarity(term_dict_1, term_dict_2):
    """Cosine similarity between two sparse vectors stored as dictionaries.

    Args:
        term_dict_1: mapping ``term -> weight``.
        term_dict_2: mapping ``term -> weight``.

    Returns:
        The dot product over the shared keys divided by the product of the
        two Euclidean norms. Returns ``0.0`` when either vector is empty or
        has zero norm, instead of dividing by zero.
    """
    dot_product = sum(
        weight * term_dict_2[term]
        for term, weight in term_dict_1.items()
        if term in term_dict_2
    )
    norm_1 = math.sqrt(sum(weight ** 2 for weight in term_dict_1.values()))
    norm_2 = math.sqrt(sum(weight ** 2 for weight in term_dict_2.values()))

    denominator = norm_1 * norm_2
    if denominator == 0:
        # An all-zero (or empty) vector has no direction; treat it as
        # dissimilar rather than raising ZeroDivisionError / producing NaN.
        return 0.0

    return dot_product / denominator

def compute_fidelity(model_theta, test_samples_i):
    """Count how many predictions survive when only the top SHAP features are kept.

    For the rows of the notebook-level ``x_test`` selected by
    ``test_samples_i`` this function:

    1. predicts the class of every selected sample,
    2. computes SHAP values with a ``shap.DeepExplainer`` whose background is
       that same subset,
    3. for each sample keeps the 10% of positions with the largest SHAP value
       for the predicted class and sets every other position to 0
       (0 is the id that ``get_num2word`` maps to ``<PAD>``),
    4. predicts again on the masked inputs.

    The mean per-class change in predicted probability is printed.

    Args:
        model_theta: trained classifier exposing ``predict``.
        test_samples_i: indices into ``x_test``.

    Returns:
        The number (a count, not a ratio) of samples whose predicted class is
        identical before and after masking.
    """
    x_subset = x_test[test_samples_i]
    TOP_FEATURE_PCT = 0.1
    sequence_length = x_subset.shape[1]
    TOP_FEATURES_LEN = int(sequence_length * TOP_FEATURE_PCT)

    print(f'{len(x_subset)=} {TOP_FEATURES_LEN=}')
    pred_proba_orig = model_theta.predict(x_subset)
    pred_orig = np.argmax(pred_proba_orig, axis=1)
    explainer = shap.DeepExplainer(model_theta, x_subset)
    # Indexed below as shap_values_labelled[class][sample].
    shap_values_labelled = explainer.shap_values(x_subset)

    x_new_subset = np.copy(x_subset)
    for i in range(len(shap_values_labelled[0])):
        # Positions ordered by decreasing SHAP value for the predicted class.
        sort_index = np.argsort(-shap_values_labelled[pred_orig[i]][i])
        top_features_index = sort_index[:TOP_FEATURES_LEN]
        # True marks positions to blank out; the top features stay untouched.
        mask = np.ones(sequence_length, dtype=bool)
        mask[top_features_index] = False
        x_new_subset[i, mask] = 0

    pred_proba_masked = model_theta.predict(x_new_subset)
    pred_masked = np.argmax(pred_proba_masked, axis=1)

    fidelity = np.sum(pred_orig == pred_masked)

    avg_chg = np.mean((pred_proba_orig - pred_proba_masked), axis=0)
    print(avg_chg)

    return fidelity

def get_num2word():
    """Return the token-id -> word dictionary for the IMDB dataset.

    The dictionary is built on first use and cached in the module-level
    ``num2word`` variable; later calls return the cached object. Word ranks
    from ``imdb.get_word_index()`` are shifted by 3, and ids 0-3 are assigned
    to ``<PAD>``, ``<START>``, ``<UNK>`` and ``<UNUSED>``.
    """
    global num2word

    if num2word:
        return num2word

    offset = 3
    words = {word: rank + offset for word, rank in imdb.get_word_index().items()}
    words.update({"<PAD>": 0, "<START>": 1, "<UNK>": 2, "<UNUSED>": 3})

    # Invert word -> id into id -> word.
    num2word = {token_id: word for word, token_id in words.items()}
    return num2word

def shap_explanations(model_theta, x_test, y_test, sample_num):
    """Display a SHAP force plot for one test review.

    A ``shap.DeepExplainer`` is built with the first 100 rows of ``x_test``
    as background. The plot shows the SHAP values of the sample's *true*
    class (``y_test[sample_num]``), with each position labelled by its
    decoded word (``"NONE"`` for ids missing from the vocabulary).

    Args:
        model_theta: trained classifier to explain.
        x_test: padded token-id sequences.
        y_test: integer class labels aligned with ``x_test``.
        sample_num: row index of the review to explain.
    """
    explainer = shap.DeepExplainer(model_theta, x_test[:100])

    shap_values = explainer.shap_values(np.array([x_test[sample_num]]))
    shap.initjs()

    num2word = get_num2word()

    # Decode the review once; only a single row of labels is needed.
    sample_words = np.array([num2word.get(token_id, "NONE") for token_id in x_test[sample_num]])

    true_class = y_test[sample_num]
    display(shap.force_plot(explainer.expected_value[true_class], shap_values[true_class][0], sample_words))
    
def decode_sentence(sample_num, x_test):
    """Return review ``x_test[sample_num]`` as a space-separated string of words.

    Raises:
        KeyError: if a token id is missing from the ``get_num2word`` mapping.
    """
    id_to_word = get_num2word()
    tokens = [id_to_word[token_id] for token_id in x_test[sample_num]]
    return " ".join(tokens)

### ALEX Method

In [None]:
%%time

global q, c, p, b, k, initial_seed_size, initial_seed, j, S_indices, U_indices, model_theta, train_indices
x_train, y_train, x_test, y_test = load_data()
init_workflow(x_train)

while(j <= p):
    print(f'==========Active Learning Iteration {j}/{p}==========')
    x_train_subset = x_train[S_indices]
    y_train_subset = y_train[S_indices]

    print('Training Prediction Model')
    model_theta = get_lstm_model()
    model_theta = train_model_lstm(model_theta, x_train_subset, y_train_subset)

    train_preds = np.argmax(model_theta.predict(x_train_subset), axis=1)

    print('Generating Classification Report')
    y_pred = np.argmax(model_theta.predict(x_test), axis=1)
    print(classification_report(y_test, y_pred, target_names=['Negative', 'Positive']))

    if(j == p):
        break

    print('Training Explainer Model on Labelled Set')
    explainer = shap.DeepExplainer(model_theta, x_train_subset)

    print(f'Computing Shap values for labelled instances - {len(x_train_subset)}')
    shap_values_labelled = explainer.shap_values(x_train_subset)

    print('Predicting unlabelled instances')
    s_x = np.ndarray((len(U_indices), 3))
    preds = model_theta.predict(x_train[U_indices])
    # s_x[:, 0] = np.amax(preds, axis=1)
    s_x[:, 0] = entropy(np.array(preds), base=2,axis=1)
    s_x[:, 1] = U_indices
    s_x[:, 2] = np.argmax(preds, axis=1)

    sort_index = np.argsort(-s_x[:, 0])
    s_x = s_x[sort_index]
    c_indxs = list(map(int, s_x[:k, 1]))

    pred_y_dict = dict(zip(s_x[:k, 1], s_x[:k, 2]))

    print('Training Explainer Model on Unlabelled Candidate Set')
    explainer = shap.DeepExplainer(model_theta, x_train[c_indxs])

    print(f'Computing Shap values for unlabelled candidate set instances - {len(x_train[c_indxs])}')
    shap_values_unlabelled = explainer.shap_values(x_train[c_indxs])

    print('Computing Similarities')
    xu_kld = np.ndarray((len(c_indxs), 2))
    for i, index_xu in enumerate(tqdm(c_indxs)):
        term_dict_U = dict(zip(x_train[int(index_xu)], shap_values_unlabelled[int(pred_y_dict[int(index_xu)])][i]))
        for q, index_x in enumerate(S_indices):
            term_dict_S = dict(zip(x_train[int(index_x)], shap_values_labelled[y_train[int(index_x)]][q]))
            xu_kld[i][0] += compute_cosine_similarity(term_dict_S, term_dict_U)
        xu_kld[i][0] /= len(S_indices)
        xu_kld[i][1] = index_xu

    sort_index = np.argsort(xu_kld[:, 0])
    xu_kld = xu_kld[sort_index]                                             #least similar first; get such top k samples
    delta_S = xu_kld[:b, 1]
    print(f'Adding {len(delta_S)} samples to labelled set.')
    S_indices = np.append(S_indices, list(map(int, delta_S)))
    print(f'Total samples in labelled set {len(S_indices)}')
    print(f'Removing {len(delta_S)} samples from unlabelled set')
    U_indices = list(set(train_indices) - set(S_indices))
    print(f'Total samples in unlabelled set {len(U_indices)}')
    j += 1

### ALEX Fidelity

In [None]:
# Fidelity of the model left in `model_theta` by the cell above, measured on
# a seeded random sample of 100 test reviews.
np.random.seed(RANDOM_SEED)
test_indices = list(range(len(x_test)))
test_samples_i = np.random.choice(test_indices, size=100, replace=False)

fidelity = compute_fidelity(model_theta, test_samples_i)
print(f'Fidelity {fidelity}')

### Random Sampling

In [None]:
%%time

global q, c, p, b, k, initial_seed_size, initial_seed, j, S_indices, U_indices, model_theta, train_indices
x_train, y_train, x_test, y_test = load_data()
init_workflow(x_train)

while(j <= p):
    printmd(f'==========Active Learning Iteration {j}/{p}==========')
    x_train_subset = x_train[S_indices]
    y_train_subset = y_train[S_indices]
    
    print('Training Prediction Model')
    model_theta = get_lstm_model()
    model_theta = train_model_lstm(model_theta, x_train_subset, y_train_subset)
    
    if(j == p):
        break
    
    print(f'Selecting random {initial_seed_size} samples')
    delta_S = np.random.choice(U_indices, size=initial_seed_size, replace=False)
    
    print(f'Adding {len(delta_S)} samples to labelled set.')
    S_indices = np.append(S_indices, list(map(int, delta_S)))
    print(f'Total samples in labelled set {len(S_indices)}')
    
    print(f'Removing {len(delta_S)} samples from unlabelled set')
    U_indices = list(set(train_indices) - set(S_indices))
    print(f'Total samples in unlabelled set {len(U_indices)}')

    U_indices = list(set(train_indices) - set(S_indices))
    j += 1

### Random Sampling Fidelity

In [None]:
# Fidelity of the model left in `model_theta` by the random-sampling cell,
# measured on a seeded random sample of 100 test reviews.
np.random.seed(RANDOM_SEED)
test_indices = list(range(len(x_test)))
test_samples_i = np.random.choice(test_indices, size=100, replace=False)

fidelity = compute_fidelity(model_theta, test_samples_i)
print(f'Fidelity {fidelity}')

### Uncertainty Sampling Least Confidence

In [None]:
%%time

global q, c, p, b, k, initial_seed_size, initial_seed, j, S_indices, U_indices, model_theta, train_indices
x_train, y_train, x_test, y_test = load_data()
init_workflow(x_train)

while(j <= p):
    printmd(f'==========Active Learning Iteration {j}/{p}==========')
    x_train_subset = x_train[S_indices]
    y_train_subset = y_train[S_indices]
    
    print('Training Prediction Model')
    model_theta = get_lstm_model()
    model_theta = train_model_lstm(model_theta, x_train_subset, y_train_subset)
    
    if(j == p):
        break
    
    print('Predicting unlabelled instances and computing uncertainty by least confidence')
    s_x = np.ndarray((len(U_indices), 2))
    s_x[:, 0] = 1-np.amax(model_theta.predict(x_train[U_indices]), axis=1)
    s_x[:, 1] = U_indices
    sort_index = np.argsort(s_x[:, 0])
    s_x = s_x[-sort_index]   #larger the (1-model_top_pred) difference, lower the confidence of the model in that prediction, get such top k samples
    delta_S = s_x[:k, 1]

    print(f'Adding {len(delta_S)} samples to labelled set.')
    S_indices = np.append(S_indices, list(map(int, delta_S)))
    print(f'Total samples in labelled set {len(S_indices)}')
    
    print(f'Removing {len(delta_S)} samples from unlabelled set')
    U_indices = list(set(train_indices) - set(S_indices))
    print(f'Total samples in unlabelled set {len(U_indices)}')
    
    j += 1

### Uncertainty Sampling Least Confidence Fidelity

In [None]:
# Fidelity of the model left in `model_theta` by the least-confidence cell,
# measured on a seeded random sample of 100 test reviews.
np.random.seed(RANDOM_SEED)
test_indices = list(range(len(x_test)))
test_samples_i = np.random.choice(test_indices, size=100, replace=False)

fidelity = compute_fidelity(model_theta, test_samples_i)
print(f'Fidelity {fidelity}')

### Uncertainty Sampling Smallest Margin

In [None]:
%%time

global q, c, p, b, k, initial_seed_size, initial_seed, j, S_indices, U_indices, model_theta, train_indices
x_train, y_train, x_test, y_test = load_data()
init_workflow(x_train)

while(j <= p):
    printmd(f'==========Active Learning Iteration {j}/{p}==========')
    x_train_subset = x_train[S_indices]
    y_train_subset = y_train[S_indices]
    
    print('Training Prediction Model')
    model_theta = get_lstm_model()
    model_theta = train_model_lstm(model_theta, x_train_subset, y_train_subset)
    
    if(j == p):
        break
    
    print('Predicting unlabelled instances and computing uncertainty by smallest margin')
    s_x = np.ndarray((len(U_indices), 2))
    pred = model_theta.predict(x_train[U_indices])
    s_x[:, 0] = abs(pred[:, 0] - pred[:, 1])
    s_x[:, 1] = U_indices
    sort_index = np.argsort(s_x[:, 0])
    s_x = s_x[sort_index]   #smallest difference (margin) means model is struggling to differentiate between the two classes; take such top k samples
    delta_S = s_x[:k, 1]

    print(f'Adding {len(delta_S)} samples to labelled set.')
    S_indices = np.append(S_indices, list(map(int, delta_S)))
    print(f'Total samples in labelled set {len(S_indices)}')
    
    print(f'Removing {len(delta_S)} samples from unlabelled set')
    U_indices = list(set(train_indices) - set(S_indices))
    print(f'Total samples in unlabelled set {len(U_indices)}')
    
    j += 1

### Uncertainty Sampling Smallest Margin Fidelity

In [None]:
# Fidelity of the model left in `model_theta` by the smallest-margin cell,
# measured on a seeded random sample of 100 test reviews.
np.random.seed(RANDOM_SEED)
test_indices = list(range(len(x_test)))
test_samples_i = np.random.choice(test_indices, size=100, replace=False)

fidelity = compute_fidelity(model_theta, test_samples_i)
print(f'Fidelity {fidelity}')

### SHAP Explanations

In [None]:
# Explain a single test review with the most recently trained model.
sample_num = 0
prediction = model_theta.predict(np.array([x_test[sample_num]]))

true_sentiment = class_sentiment_map[y_test[sample_num]]
print(f'Review True Sentiment - {true_sentiment}')

predicted_sentiment = class_sentiment_map[np.argmax(prediction)]
print(f'Review Predicted Sentiment - {predicted_sentiment} with Probability - {np.max(prediction)}')

shap_explanations(model_theta, x_test, y_test, sample_num)

In [None]:
# Print the review selected by `sample_num` as text, decoded with decode_sentence.
print(decode_sentence(sample_num, x_test))