In [None]:
!pip install nlp
!pip install alibi-detect
!pip install transformers


import nlp
import numpy as np
import os
from transformers import AutoTokenizer
from alibi_detect.cd import KSDrift


In [None]:
# Identifier of the pretrained checkpoint; the same name is reused further
# down when the embedding model is built, so tokenizer and model stay in sync.
model_name = 'bert-base-cased'
# Tokenizer that belongs to the checkpoint named above.
tokenizer = AutoTokenizer.from_pretrained(model_name)

In [None]:
def load_dataset(dataset: str, split: str = 'test'):
    """Read one split of a labelled text dataset into NumPy arrays.

    Parameters
    ----------
    dataset
        Dataset name handed to ``nlp.load_dataset``.
    split
        Key of the split to read from the loaded dataset.

    Returns
    -------
    Tuple ``(X, y)``: ``X`` collects the ``'text'`` field and ``y`` the
    ``'label'`` field of every record in the split, each as ``np.ndarray``.
    """
    # Materialise the split once so both fields are read from the same pass.
    records = list(nlp.load_dataset(dataset)[split])
    texts = np.array([record['text'] for record in records])
    targets = np.array([record['label'] for record in records])
    return texts, targets


In [None]:
# Pull the IMDB training split: X holds the review texts, Y the matching labels.
X, Y = load_dataset('imdb', split='train')

In [None]:
def random_sample(X: np.ndarray, y: np.ndarray, proba_zero: float, n: int):
    """Draw ``n`` instances without replacement with a chosen class balance.

    Parameters
    ----------
    X
        Array of instances, indexable by an integer index array.
    y
        Labels. Either 1-D with values 0/1, or 2-D one-hot where column 0
        marks class 0 and column 1 marks class 1.
    proba_zero
        Fraction of the returned sample that should belong to class 0.
    n
        Total number of instances to return.

    Returns
    -------
    Tuple ``(X_out, y_out)`` of Python lists; the class-0 instances come
    first, followed by the class-1 instances.

    Raises
    ------
    ValueError
        Raised by ``np.random.choice`` when a class has fewer instances than
        requested from it.
    """
    if len(y.shape) == 1:
        idx_0 = np.where(y == 0)[0]
        idx_1 = np.where(y == 1)[0]
    else:
        idx_0 = np.where(y[:, 0] == 1)[0]
        idx_1 = np.where(y[:, 1] == 1)[0]
    n_0 = int(n * proba_zero)
    # Take the remainder instead of int(n * (1 - proba_zero)): the latter
    # truncates floating point error (1000 * (1 - .9) == 99.99999999999997)
    # and silently returns fewer than n instances.
    n_1 = n - n_0
    idx_0_out = np.random.choice(idx_0, n_0, replace=False)
    idx_1_out = np.random.choice(idx_1, n_1, replace=False)
    X_out = np.concatenate([X[idx_0_out], X[idx_1_out]])
    y_out = np.concatenate([y[idx_0_out], y[idx_1_out]])
    return X_out.tolist(), y_out.tolist()


def padding_last(x: np.ndarray, seq_len: int) -> tuple:
    """Bounds of the replaceable token span when padding trails the sequence.

    Parameters
    ----------
    x
        Token ids of one instance; id 0 is treated as the padding token.
    seq_len
        Length of the (padded) sequence.

    Returns
    -------
    Tuple ``(first_token, last_token)``. ``first_token`` is always 1, so
    position 0 is never selected. ``last_token`` is the position of the first
    padding token, or ``seq_len - 1`` when the instance has no padding.
    """
    pad_positions = np.where(x == 0)[0]
    # An explicit emptiness check replaces the former bare ``except:``, which
    # would also have swallowed unrelated errors such as KeyboardInterrupt.
    last_token = pad_positions[0] if pad_positions.size else seq_len - 1
    return 1, last_token


def padding_first(x: np.ndarray, seq_len: int) -> tuple:
    """Bounds of the replaceable token span when padding leads the sequence.

    Parameters
    ----------
    x
        Token ids of one instance; id 0 is treated as the padding token.
    seq_len
        Length of the (padded) sequence.

    Returns
    -------
    Tuple ``(first_token, last_token)``. ``first_token`` is two positions
    past the last padding token, or 0 when the instance has no padding.
    ``last_token`` is always ``seq_len - 1``.
    """
    pad_positions = np.where(x == 0)[0]
    # An explicit emptiness check replaces the former bare ``except:``, which
    # would also have swallowed unrelated errors such as KeyboardInterrupt.
    # The +2 skips the token right after the padding (presumably the
    # start-of-sequence marker - TODO confirm).
    # NOTE(review): the no-padding fallback of 0 does not skip position 0,
    # unlike padding_last which always starts at 1 - confirm this is intended.
    first_token = pad_positions[-1] + 2 if pad_positions.size else 0
    return first_token, seq_len - 1


def inject_word(token: int, X: np.ndarray, perc_chg: float, padding: str = 'last'):
    """Overwrite a percentage of token positions in every row with ``token``.

    Parameters
    ----------
    token
        Token id written into the selected positions.
    X
        2-D array of token ids, one row per instance. It is copied, not
        modified in place.
    perc_chg
        Percentage (0-100) of the sequence length to overwrite per row.
    padding
        ``'last'`` uses ``padding_last`` to find the eligible span; any other
        value uses ``padding_first``.

    Returns
    -------
    The perturbed token ids as a nested Python list.
    """
    n_rows, seq_len = X.shape[0], X.shape[1]
    n_chg = int(perc_chg * .01 * seq_len)
    locate_span = padding_last if padding == 'last' else padding_first
    X_cp = X.copy()
    for row in range(n_rows):
        first_token, last_token = locate_span(X_cp[row, :], seq_len)
        # If the span end leaves too few positions, sample up to the full length.
        choice_len = seq_len if last_token <= n_chg else last_token
        candidates = np.arange(first_token, choice_len)
        idx = np.random.choice(candidates, n_chg, replace=False)
        X_cp[row, idx] = token
    return X_cp.tolist()

In [None]:
# Number of instances in every sample drawn in this cell.
sample_size = 1000


def _sample_texts(proba_zero):
    """Draw ``sample_size`` texts from (X, Y) with the given class-0 fraction."""
    # random_sample returns (texts, labels); only the texts are kept.
    return random_sample(X, Y, proba_zero=proba_zero, n=sample_size)[0]


# Two independent balanced draws: one is the reference set, the other a
# no-drift (H0) sample.
# NOTE(review): no random seed is set in the visible cells, so these draws
# differ between runs.
h0_sample = _sample_texts(.5)
reference_sample = _sample_texts(.5)

# Class-imbalanced draws, keyed by the fraction of class-0 instances.
imbalance_fractions = [.1, .9]
imbalance_samples = dict(
    (frac, _sample_texts(frac)) for frac in imbalance_fractions
)

In [None]:
words = ['fantastic', 'good', 'bad', 'horrible']
perc_chg = [1., 5.]  # % of tokens to change in an instance

# Keep the first id of each word once the first and last ids the tokenizer
# adds around it have been stripped.
words_tf = tokenizer(words)['input_ids']
words_tf = [token[1:-1][0] for token in words_tf]
max_len = 100
# `padding='max_length', truncation=True` is the supported spelling of the
# deprecated `pad_to_max_length=True`; every row ends up exactly max_len long.
tokens = tokenizer(reference_sample, padding='max_length', truncation=True,
                   max_length=max_len, return_tensors='tf')
# Convert once; inject_word works on its own copy, so the array can be reused.
input_ids = tokens['input_ids'].numpy()

# X_word[word][pct] -> decoded texts with `pct` % of tokens replaced by `word`.
X_word = {}
for word, token_id in zip(words, words_tf):
    X_word[word] = {}
    for p in perc_chg:
        x = inject_word(token_id, input_ids, p)
        X_word[word][p] = tokenizer.batch_decode(x, skip_special_tokens=True)

In [None]:
# Use the TensorFlow implementation: the tokenizer output elsewhere in this
# notebook is requested with return_tensors='tf', and the UAE / preprocess_drift
# helpers are imported from alibi_detect.cd.tensorflow, so the PyTorch
# embedding would not fit the rest of the pipeline.
from alibi_detect.models.tensorflow import TransformerEmbedding

emb_type = 'hidden_state'
n_layers = 8
# [-1, -2, ..., -n_layers]; passed as the hidden-layer selection
# (presumably indices counted from the last layer - see the alibi-detect docs).
layers = [-layer for layer in range(1, n_layers + 1)]

embedding = TransformerEmbedding(model_name, emb_type, layers)

In [None]:
from alibi_detect.cd.tensorflow import UAE

# Embed a handful of instances only to discover the embedding width.
# `padding='max_length', truncation=True` is the supported spelling of the
# deprecated `pad_to_max_length=True`.
tokens = tokenizer(list(X[:5]), padding='max_length', truncation=True,
                   max_length=max_len, return_tensors='tf')
x_emb = embedding(tokens)
enc_dim = 32  # size of the encoding produced by the UAE
shape = (x_emb.shape[1],)

# Untrained autoencoder stacked on the transformer embedding.
uae = UAE(input_layer=embedding, shape=shape, enc_dim=enc_dim)

In [None]:
from functools import partial
from alibi_detect.cd.tensorflow import preprocess_drift

# define preprocessing function: tokenize to max_len, then run the UAE in batches
preprocess_fn = partial(preprocess_drift, model=uae, tokenizer=tokenizer,
                        max_len=max_len, batch_size=32)

# initialize detector on the reference sample drawn earlier
# (the previous `X_ref` was never defined and raised NameError)
drift_detector = KSDrift(reference_sample, p_val=.05, preprocess_fn=preprocess_fn,
                         input_shape=(max_len,))

In [None]:
# Human-readable names for the detector verdict; indexed by `is_drift`.
labels = ['No Drift Detected', 'Drift Detected!']

# Score the held-out sample drawn with the same class balance as the reference.
preds_h0 = drift_detector.predict(h0_sample)
h0_data = preds_h0['data']
print('Drift: {}'.format(labels[h0_data['is_drift']]))
print('Probability Value: {}'.format(h0_data['p_val']))

In [None]:
# Score each class-imbalanced sample; the dict key is the class-0 fraction.
for neg_fraction, sample in imbalance_samples.items():
    preds = drift_detector.predict(sample)
    result = preds['data']
    print('% Negative Sentiment: {}'.format(neg_fraction * 100))
    print('Drift: {}'.format(labels[result['is_drift']]))
    print('Probability Value: {}'.format(result['p_val']))