In [None]:
%load_ext autoreload

%autoreload 2

In [None]:
import os
import logging

logger = logging.getLogger(__name__)

# Move one directory up (later cells import from `src` and read `./data/...`).
# The starting directory is remembered the first time this cell runs, so re-running
# the cell does not climb one more level each time as a bare relative chdir would.
if "NOTEBOOK_START_DIR" not in globals():
    NOTEBOOK_START_DIR = os.getcwd()
os.chdir(os.path.join(NOTEBOOK_START_DIR, os.pardir))

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from src.etl import *
from src.rfm import *
from src.utils import *

In [None]:
# Source page (Project Gutenberg HTML) handed to the dataset builder.
url = "https://www.gutenberg.org/cache/epub/1533/pg1533-images.html"

# `make_dataset` comes from one of the `src.*` star-imports; presumably it downloads the
# page and writes the text file under ./data that later cells read — TODO confirm.
make_dataset(url)

In [None]:
# `tokenizer` comes from one of the `src.*` star-imports; X is treated as a 3-D array below.
# NOTE(review): `vocab` is rebound to a flat token list in the n-gram section further down.
X, vocab = tokenizer("./data/Author: William Shakespeare.txt")

In [None]:
X.shape

In [None]:
# Merge axes 1 and 2 of X into a single axis, keeping one row per entry along axis 0.
data = X.reshape(X.shape[0], X.shape[1]*X.shape[2])

In [None]:
data.shape

In [None]:
vocab

## Baseline: Bigrams/Trigrams

In [None]:
from nltk.util import bigrams, trigrams, pad_sequence
from nltk.lm.preprocessing import padded_everygram_pipeline, flatten
from nltk.lm import MLE

In [None]:
# Number of tokens per example after the start token is added and the line is truncated/padded.
CONTEXT_SIZE = 32
# Fraction of the sentences held out for testing (taken from the end, no shuffling).
TEST_SPLIT = 0.2

In [None]:
def clean_sentence(sent, context_size=None):
    """Turn one raw text line into a fixed-length list of character tokens.

    The trailing newline (if present) is dropped, the line is split into
    characters, "<START>" is prepended and the result is truncated to
    ``context_size`` tokens. A shorter result gets one "<END>" token followed
    by as many "<PAD>" tokens as are needed to reach ``context_size``.

    Args:
        sent: A line of text, e.g. one element of ``file.readlines()``.
        context_size: Target length; defaults to the notebook-level ``CONTEXT_SIZE``.

    Returns:
        A list of ``context_size`` tokens.
    """
    if context_size is None:
        context_size = CONTEXT_SIZE
    # Strip only a real newline: the last line of a file often has none, and
    # unconditionally dropping the final character would eat a letter.
    if sent.endswith("\n"):
        sent = sent[:-1]
    # add start token and truncate to context size
    tokens = (["<START>"] + list(sent))[:context_size]
    if len(tokens) < context_size:
        # room left: mark the end of the sentence, then pad up to context size
        tokens.append("<END>")
        tokens.extend(["<PAD>"] * (context_size - len(tokens)))
    return tokens


In [None]:
# Read the corpus line by line and turn every line into a fixed-length token list.
fpath = "./data/Author: William Shakespeare.txt"
# The context manager closes the file handle as soon as the lines are read.
with open(fpath, "r") as corpus_file:
    raw = corpus_file.readlines()
sentences = list(map(clean_sentence, raw))
# Ordered (unshuffled) split: the first (1 - TEST_SPLIT) share is used for training.
n_train = int(len(sentences) * (1 - TEST_SPLIT))
train_sentences = sentences[:n_train]
test_sentences = sentences[n_train:]
len(train_sentences)

In [None]:
np.array(train_sentences)[:, :16].shape

In [None]:
# Chop each example in two: the first CONTEXT_SPLIT_SIZE tokens condition the prediction
# of the remaining tokens.
CONTEXT_SPLIT_SIZE = 16
# Convert each split to an array once instead of once per slice.
train_tokens = np.array(train_sentences)
test_tokens = np.array(test_sentences)
X_train = train_tokens[:, :CONTEXT_SPLIT_SIZE]
y_train = train_tokens[:, CONTEXT_SPLIT_SIZE:]
X_test = test_tokens[:, :CONTEXT_SPLIT_SIZE]
y_test = test_tokens[:, CONTEXT_SPLIT_SIZE:]

In [None]:
# Bigrams of every training context: one list of (token, next_token) pairs per sentence.
ngrams = [list(bigrams(context)) for context in X_train]


In [None]:
# Flat list of every token in the training contexts; passed as the vocabulary text when
# the language model is fitted. NOTE(review): this rebinds the `vocab` returned by `tokenizer`.
vocab = list(flatten(sent for sent in X_train))

In [None]:
# Maximum-likelihood n-gram model fitted on the per-sentence bigram lists in `ngrams`.
# NOTE(review): the model order is 3 but only bigrams are supplied, so no trigram counts
# exist — confirm whether order 2 (or trigram training data) was intended.
lm = MLE(3)
lm.fit(ngrams, vocab)

In [None]:
X_train[1]

In [None]:
def generate_text(lm, X_test, length=16, random_seed=None):
    """Sample a continuation from the language model for every seed sequence.

    Args:
        lm: A fitted model exposing ``generate(num_words, text_seed=..., random_seed=...)``
            (the NLTK language-model interface).
        X_test: Iterable of token sequences; each one seeds one continuation.
        length: Number of tokens to generate per sequence.
        random_seed: Optional seed. When given, a single seeded generator is shared by
            all sequences so the whole batch is reproducible; when ``None`` (default)
            sampling is left unseeded.

    Returns:
        A list with one generated continuation per element of ``X_test``.
    """
    import random

    # One shared generator (rather than re-seeding for every sentence) keeps the draws
    # for different sentences distinct while the batch as a whole stays reproducible.
    rng = None if random_seed is None else random.Random(random_seed)
    return [lm.generate(length, text_seed=sent, random_seed=rng) for sent in X_test]

In [None]:
y_test.shape

In [None]:
y_test_pred = np.array(generate_text(lm, X_test))

In [None]:
"".join(y_test[0]), "".join(y_test_pred[0])

In [None]:
lm.vocab.unk_label

In [None]:
class Vocab:
    """Bidirectional token <-> index mapping with an "<UNK>" fallback.

    Tokens receive consecutive indices in first-seen order and "<UNK>" is always
    present, so every index lies in ``range(len(vocab))`` — which is what one-hot
    encoding via ``np.eye(len(vocab))[indices]`` requires.
    """

    def __init__(self, vals):
        """Build the mapping from an iterable of tokens.

        A repeated token keeps its first index and an "<UNK>" already contained
        in ``vals`` keeps its position, so indices stay unique and gap-free.
        """
        self._dict = {}
        for token in vals:
            # len() is taken before insertion, so new tokens get the next free index
            self._dict.setdefault(token, len(self._dict))
        self._dict.setdefault('<UNK>', len(self._dict))
        # reverse lookup used by decode()
        self.rev = {i: v for v, i in self._dict.items()}

    def __getitem__(self, key):
        """Return the index of ``key``, or the "<UNK>" index for an unknown token."""
        return self._dict.get(key, self._dict['<UNK>'])

    def __len__(self):
        """Number of distinct tokens, including "<UNK>"."""
        return len(self._dict)

    def __iter__(self):
        """Iterate over the tokens in index order."""
        return iter(self._dict)

    def __contains__(self, key):
        """True only for tokens that are actually stored (no "<UNK>" fallback)."""
        return key in self._dict

    def __repr__(self):
        return f"{self.__class__.__name__}({self._dict})"

    def decode(self, idx):
        """Return the token stored at ``idx``; raises ``KeyError`` for an unknown index."""
        return self.rev[idx]

In [None]:
train_vocab = Vocab(list(sorted(lm.vocab.counts.keys())))
train_vocab

In [None]:
def encode(sent, vocab=None):
    """Map a token sequence to a NumPy array of vocabulary indices.

    Args:
        sent: Iterable of tokens.
        vocab: Any object supporting ``vocab[token]``. Defaults to the notebook-level
            ``train_vocab``, looked up at call time so that a rebuilt ``train_vocab``
            is picked up without redefining this function.

    Returns:
        ``np.ndarray`` holding one index per token of ``sent``.
    """
    if vocab is None:
        vocab = train_vocab
    return np.array([vocab[w] for w in sent])

def evaluate(y_test, y_test_pred):
    """Average per-sentence metrics of generated text against the reference text.

    For every index of ``y_test`` this computes ``utils.bleu_score`` (called with n=2) of
    the prediction against the single reference, and ``utils.perplexity`` of the
    index-encoded prediction and of the index-encoded reference.

    NOTE(review): relies on a module-level name ``utils``; the visible imports only do
    ``from src.utils import *`` — confirm that ``utils`` is actually in scope.

    Returns:
        dict with the mean "bleu2", "perplexity" (predictions) and
        "true_perplexity" (references).
    """
    per_sentence = [
        (
            utils.bleu_score([y_test[idx]], y_test_pred[idx], n=2),
            utils.perplexity(encode(y_test_pred[idx])),
            utils.perplexity(encode(y_test[idx])),
        )
        for idx in range(len(y_test))
    ]
    return {
        "bleu2": np.mean([row[0] for row in per_sentence]),
        "perplexity": np.mean([row[1] for row in per_sentence]),
        "true_perplexity": np.mean([row[2] for row in per_sentence]),
    }

In [None]:
list(map(lambda x: "".join(x), y_test_pred))

In [None]:
evaluate(y_test, y_test_pred)

## Laplacian Kernel

In [None]:
# Token -> vocabulary-index arrays for the four splits, built in one pass.
X_train_enc, y_train_enc, X_test_enc, y_test_enc = (
    np.array([encode(sent) for sent in split])
    for split in (X_train, y_train, X_test, y_test)
)

In [None]:
def one_hot_flat(token_rows):
    """One-hot encode every token of every row and flatten each row into one vector.

    Args:
        token_rows: 2-D array-like of tokens (rows of equal length).

    Returns:
        Array with one row per input row and (tokens per row * len(train_vocab)) columns.
    """
    idx = np.array([encode(row) for row in token_rows])
    # indexing the identity matrix with integer indices yields the one-hot rows
    one_hot = np.eye(len(train_vocab))[idx]
    return one_hot.reshape(one_hot.shape[0], one_hot.shape[1] * one_hot.shape[2])


# one hot encode the data
# Built from the token arrays (not from the current *_enc values) so that this cell
# gives the same result when it is re-run.
X_train_enc = one_hot_flat(X_train)
y_train_enc = one_hot_flat(y_train)
X_test_enc = one_hot_flat(X_test)
y_test_enc = one_hot_flat(y_test)

In [None]:
from functools import partial

In [None]:
# The first len(train_vocab) columns of the flattened one-hot targets are the one-hot vector
# of the first target token, i.e. the character that directly follows the context.
next_char_train = y_train_enc[:, :len(train_vocab)]
next_char_test = y_test_enc[:, :len(train_vocab)]

In [None]:
# Kernel ridge regression from the flattened one-hot context to the next-character one-hot.
# `utils.K_laplace_mat` is a project helper; presumably it returns the Laplace-kernel matrix
# between the rows of its two arguments — TODO confirm.
lam = 1  # regularisation term added to the diagonal of K
kernel = partial(utils.K_laplace_mat, gamma=0.025)

K = kernel(X_train_enc, X_train_enc)
# Solve (K + lam * I) @ alpha_hat = next_char_train for the coefficients.
alpha_hat = np.linalg.solve(K + lam * np.eye(K.shape[0]), next_char_train)

In [None]:
yhat = kernel(X_test_enc, X_train_enc) @ alpha_hat
yhat

In [None]:
(next_char_test.argmax(axis=1) == yhat.argmax(axis=1)).mean()

In [None]:
# Collapse each prediction row to a one-hot vector at its arg-max and plot the result.
# The one-hot rows are built directly from the arg-max indices: writing 1s into the score
# matrix and then zeroing everything "!= 1" would also keep any raw score equal to 1.
max_index = np.argmax(yhat, axis=1)
yhat = np.eye(yhat.shape[1])[max_index]
plt.imshow(yhat, aspect=0.1)

In [None]:
# plot next_char_test (the true next-character one-hots of the test set), with a fixed aspect ratio
plt.imshow(next_char_test, aspect=0.1)

In [None]:
# Training-set predictions. K (computed when alpha_hat was fitted) already equals
# kernel(X_train_enc, X_train_enc), so it is reused instead of rebuilding the full
# train-by-train kernel matrix.
yhat_train = K @ alpha_hat

# share of training rows whose arg-max prediction matches the true next character
(next_char_train.argmax(axis=1) == yhat_train.argmax(axis=1)).mean()

In [None]:
def generate_text_kernel(kernel, alpha_hat, X, z, length=16):
    """Autoregressively generate ``length`` tokens per row of ``X`` with a kernel predictor.

    At each step the prediction ``kernel(window, z) @ alpha_hat`` is decoded with the
    notebook-level ``train_vocab`` (arg-max per row); the window then drops its first
    ``len(train_vocab)`` columns and gets the new prediction appended.

    NOTE(review): the raw prediction scores (not their arg-max one-hot) are fed back
    into the window — confirm this is intended.

    Args:
        kernel: Callable ``kernel(A, B)`` returning the kernel matrix between rows of A and B.
        alpha_hat: Coefficient matrix multiplied onto the kernel matrix.
        X: Initial windows, one row per sequence.
        z: Reference points passed as the second kernel argument.
        length: Number of tokens to generate per row.

    Returns:
        List with one tuple of generated tokens per row of ``X``.
    """
    steps = []
    window = X
    for _ in range(length):
        scores = kernel(window, z) @ alpha_hat
        # decode this step: one token per row
        steps.append([train_vocab.decode(idx) for idx in np.argmax(scores, axis=1)])
        # slide the window: drop the oldest token slot, append the new prediction
        window = np.concatenate([window[:, len(train_vocab):], scores], axis=1)

    # steps is (length x rows); flip it to (rows x length)
    return list(zip(*steps))

In [None]:
generated_test = generate_text_kernel(kernel, alpha_hat, X_test_enc, X_train_enc, length=16)

In [None]:
list(map(lambda x: "".join(x), generated_test))

In [None]:
evaluate(y_test, generated_test)

## RFM

In [None]:
# Fit the RFM on the flattened one-hot contexts. `train_rfm` is a project helper; as used
# below, `alpha` is multiplied onto the kernel matrix and `M` is passed to `utils.K_M`.
# The third return value is discarded.
# NOTE(review): binding `_` at top level shadows IPython's "last output" variable.
alpha, M, _ = train_rfm(X_train_enc, next_char_train)

In [None]:
# RFM predictions for the training contexts: kernel matrix (using the learned M) times alpha.
y_train_pred = utils.K_M(X_train_enc, X_train_enc, M, L=1.0) @ alpha

In [None]:
utils.mse(y_train_pred, next_char_train)


In [None]:
# Left: RFM training predictions passed through `utils.softmax` along axis 1.
# Right: the true next-character one-hots.
fig, ax = plt.subplots(ncols=2, figsize=(16, 9))
# columns / rows of the prediction matrix, passed to imshow as the aspect ratio
ar = (y_train_pred.shape[1]/y_train_pred.shape[0])
ax[0].imshow(utils.softmax(y_train_pred, axis=1), aspect=ar)
ax[1].imshow(next_char_train, aspect=ar)

In [None]:
# RFM next-character scores for the test contexts.
# NOTE(review): this rebinds `y_test_pred`, which earlier held the n-gram model's generated tokens.
y_test_pred = utils.K_M(X_test_enc, X_train_enc, M, L=1.0) @ alpha

In [None]:
utils.mse(y_test_pred, next_char_test)

In [None]:
# Left: RFM test predictions collapsed to arg-max one-hot rows.
# Right: the true next-character one-hots.
fig, ax = plt.subplots(ncols=2, figsize=(16, 9))
# columns / rows of the prediction matrix, passed to imshow as the aspect ratio
ar = (y_test_pred.shape[1]/y_test_pred.shape[0])
# picking row `argmax` of the identity matrix gives the one-hot vector of each prediction
y_test_argmax_ohc = np.eye(len(train_vocab))[y_test_pred.argmax(axis=1)]
ax[0].imshow(y_test_argmax_ohc, aspect=ar)
ax[1].imshow(next_char_test, aspect=ar)

In [None]:
# Difference of predicted and true one-hots: +1 where a character was wrongly predicted,
# -1 where the true character was missed, 0 elsewhere. Uses `ar` from the preceding plot cell.
plt.imshow(y_test_argmax_ohc - next_char_test, aspect=ar)
plt.colorbar()

In [None]:
from functools import partial

In [None]:
# Bind the learned matrix M (and L=1.0) so the RFM kernel can be called with two arguments,
# which is how generate_text_kernel calls its `kernel`.
kernel_rfm = partial(utils.K_M, M=M, L=1.0)

# Generate 16 tokens for every test context with the RFM predictor.
gen_test_rfm = generate_text_kernel(kernel_rfm, alpha, X_test_enc, X_train_enc, length=16)

In [None]:
evaluate(y_test, gen_test_rfm)

In [None]:
# Show the RFM generations as strings. `y_test_pred` holds the RFM's numeric next-character
# scores at this point and cannot be joined into text; the generated tokens are in `gen_test_rfm`.
list(map(lambda x: "".join(x), gen_test_rfm))

In [None]:
# Next-character accuracy of the RFM: share of rows whose arg-max prediction
# equals the arg-max of the true one-hot row.
train_hits = next_char_train.argmax(axis=1) == y_train_pred.argmax(axis=1)
test_hits = next_char_test.argmax(axis=1) == y_test_pred.argmax(axis=1)
train_acc = train_hits.mean()
test_acc = test_hits.mean()

print(f"Train accuracy: {train_acc:.2f}")
print(f"Test accuracy: {test_acc:.2f}")