In [None]:
%cd ../

In [None]:
import numpy as np
import torch
import torch.nn.functional as F

In [None]:
book_fname = "./data/goblet_book.txt"
with open(book_fname, 'r') as book:
    book_data = book.read()
len(book_data)

In [None]:
word_list = book_data.split()
chars = [[*word] for word in word_list]
max_len = max(len(word) for word in chars)
for wordl in chars:
    while len(wordl) < max_len:
        wordl.append(' ')
chars = np.array(chars)

In [None]:
unique_chars = list(np.unique(chars))
unique_chars.append('\n')
unique_chars.append('\t')
K = len(unique_chars)  # dimensionality of the input and output vectors
K

In [None]:
char_to_ind = {}
ind_to_char = {}
for idx, char in enumerate(unique_chars):
    char_to_ind[char] = idx
    ind_to_char[idx] = char

In [None]:
m = 100  # dimensionality of the hidden state
eta = 0.1  # learning rate
seq_length = 25  # length of input sequences used during training
epsilon = 1e-8  # for AdaGrad

In [None]:
sig = 0.01
RNN = {'b': torch.zeros((m, 1), dtype=torch.double), 'c': torch.zeros((K, 1), dtype=torch.double), 'U': torch.normal(0.0, sig, (m, K), dtype=torch.double), 'W': torch.normal(0.0, sig, (m, m), dtype=torch.double), 'V': torch.normal(0.0, sig, (K, m), dtype=torch.double), 'h0': torch.zeros((m, 1), dtype=torch.double)}

In [None]:
def encode_char(char):
    oh = [0]*K
    oh[char_to_ind[char]] = 1
    return oh

In [None]:
def synthetize_seq(rnn, h0, x0, n):
    t, ht, xt = 0, h0, x0
    indexes = []
    while t < n:
        xt = xt.reshape((K, 1))
        at = torch.mm(rnn['W'], ht) + torch.mm(rnn['U'], xt) + rnn['b']
        ht = torch.tanh(at)
        ot = torch.mm(rnn['V'], ht) + rnn['c']
        pt = F.softmax(ot, dim=0)
        cp = torch.cumsum(pt, dim=0)
        a = torch.rand(1)
        ixs = torch.where(cp - a > 0)
        ii = ixs[0][0].item()
        indexes.append(ii)
        xt = torch.zeros((K, 1), dtype=torch.double)
        xt[ii, 0] = 1
        t += 1
    Y = []
    for idx in indexes:
        oh = [0]*K
        oh[idx] = 1
        Y.append(oh)
    Y = torch.tensor(Y).t()
    
    s = ''
    for i in range(Y.shape[1]):
        idx = torch.where(Y[:, i] == 1)[0].item()
        s += ind_to_char[idx]
    
    return Y, s

In [None]:
X_chars = book_data[:seq_length]
Y_chars = book_data[1:seq_length+1]

In [None]:
X_chars

In [None]:
Y_chars

In [None]:
def encode_string(chars):
    M = []
    for i in range(len(chars)):
        M.append(encode_char(chars[i]))
    M = torch.tensor(M, dtype=torch.double).t()
    return M

In [None]:
X = encode_string(X_chars)
Y = encode_string(Y_chars)

In [None]:
print(X.shape, Y.shape)

In [None]:
Y0, s0 = synthetize_seq(RNN, RNN['h0'], X[:,0], 200)
print(s0)

In [None]:
def forward(rnn, X, hprev):
    ht = hprev.clone()
    indexes = []
    P = torch.zeros((K, seq_length), dtype=torch.double)
    A = torch.zeros((m, seq_length), dtype=torch.double)
    H = torch.zeros((m, seq_length), dtype=torch.double)
    for i in range(seq_length):
        xt = X[:, i].reshape((K, 1))
        at = torch.mm(rnn['W'], ht) + torch.mm(rnn['U'], xt) + rnn['b']
        ht = torch.tanh(at)
        ot = torch.mm(rnn['V'], ht) + rnn['c']
        pt = F.softmax(ot, dim=0)

        H[:, i] = ht.squeeze()
        P[:, i] = pt.squeeze()
        A[:, i] = at.squeeze()
        cp = torch.cumsum(pt, dim=0)
        a = torch.rand(1)
        ixs = torch.where(cp - a > 0)
        ii = ixs[0][0].item()
        indexes.append(ii)

    Y_pred = []
    for idx in indexes:
        oh = [0]*K
        oh[idx] = 1
        Y_pred.append(oh)
    Y_pred = torch.tensor(Y_pred, dtype=torch.double).t()

    s_pred = ''
    for i in range(Y_pred.shape[1]):
        idx = torch.where(Y_pred[:, i] == 1)[0].item()
        s_pred += ind_to_char[idx]

    return s_pred, Y_pred, A, H, P, ht

In [None]:
s_pred, Y_pred, A, H, P, ht = forward(RNN, X, RNN['h0'])
print(s_pred, Y_pred.shape, A.shape, H.shape, P.shape, ht.shape, sep='\n')

In [None]:
def compute_loss(Y, P):
    log_probs = torch.log(P)
    cross_entropy = -torch.sum(Y * log_probs)
    loss = cross_entropy.item()
    return loss

In [None]:
print(compute_loss(Y, P))

In [None]:
def backward(rnn, X, Y, A, H, P, hprev):
    dA = torch.zeros_like(A)
    dH = torch.zeros_like(H)

    G = -(Y - P)
    dV = torch.matmul(G, H.t())
    dhtau = torch.matmul(G[:, -1], rnn['V'])
    datau = (1 - torch.pow(torch.tanh(A[:, -1]), 2)) * dhtau
    dH[:, -1] = dhtau.squeeze()
    dA[:, -1] = datau.squeeze()

    for i in range(seq_length - 2, -1, -1):
        dht = torch.matmul(G[:, i], rnn['V']) + torch.matmul(dA[:, i+1].reshape(1, -1), rnn['W'])
        dat = (1 - torch.pow(torch.tanh(A[:, i]), 2)) * dht
        dH[:, i] = dht.squeeze()
        dA[:, i] = dat.squeeze()

    Hd = torch.cat((hprev, H[:, :-1]), dim=1)
    dW = torch.matmul(dA, Hd.t())
    dU = torch.matmul(dA, X.t())
    dc = G.sum(1).reshape((-1, 1))
    db = dA.sum(1).reshape((-1, 1))
    grads = {'U': dU, 'W': dW, 'V': dV, 'c': dc, 'b': db}
    grads_clamped = {k: torch.clamp(v, min=-5.0, max=5.0) for (k,v) in grads.items()}
    return grads, grads_clamped

In [None]:
grads, grads_clamped = backward(RNN, X, Y, A, H, P, ht)
print(*list(map(lambda v: grads_clamped[v].shape, grads_clamped)), sep='\n')

In [None]:
def ComputeGradNum(X, Y, param_name, rnn, h=1e-4):
    """
    Compute the numerical gradient of the rnn's parameter specified by param_name.
    """
    grad = torch.zeros_like(rnn[param_name])
    hprev = torch.zeros(rnn['W'].shape[0], 1, dtype=torch.double)
    n = torch.numel(rnn[param_name])
    
    for i in range(n):
        old_val = rnn[param_name].view(-1)[i].item()
        rnn[param_name].view(-1)[i] = old_val - h
        s_pred, Y_pred, A, H, P, ht = forward(rnn, X, hprev)
        l1 = compute_loss(Y, P)
        
        rnn[param_name].view(-1)[i] = old_val + h
        s_pred, Y_pred, A, H, P, ht = forward(rnn, X, hprev)
        l2 = compute_loss(Y, P)
        
        grad.view(-1)[i] = (l2 - l1) / (2 * h)
        rnn[param_name].view(-1)[i] = old_val  # Reset to original value

    return grad

def ComputeGradsNum(X, Y, rnn, h=1e-4):
    num_grads = {}
    for param_name in rnn:
        if param_name != 'h0':
            print('Computing numerical gradient for')
            print(f'Field name: {param_name}')
            num_grads[param_name] = ComputeGradNum(X, Y, param_name, rnn, h)
    return num_grads

In [None]:
"""
num_grads = ComputeGradsNum(X, Y, RNN, 1e-4)
print("-------- Gradient validation --------")
print("Max diff for gradient of b:", torch.max(torch.abs(num_grads['b'] - grads['b'])).item())
print("Max diff for gradient of c:", torch.max(torch.abs(num_grads['c'] - grads['c'])).item())
print("Max diff for gradient of W:", torch.max(torch.abs(num_grads['W'] - grads['W'])).item())
print("Max diff for gradient of U:", torch.max(torch.abs(num_grads['U'] - grads['U'])).item())
print("Max diff for gradient of V:", torch.max(torch.abs(num_grads['V'] - grads['V'])).item())
"""

In [None]:
e, step, epoch = 0, 0, 0
n_epochs = 10
smooth_loss = 0
losses = []
hprev = RNN['h0']

mb = torch.zeros_like(RNN['b'], dtype=torch.float)
mc = torch.zeros_like(RNN['c'], dtype=torch.float)
mU = torch.zeros_like(RNN['U'], dtype=torch.float)
mV = torch.zeros_like(RNN['V'], dtype=torch.float)
mW = torch.zeros_like(RNN['W'], dtype=torch.float)
ms = {'b': mb, 'c': mc, 'U': mU, 'V': mV, 'W': mW}

while epoch < n_epochs:
    X_chars = book_data[e:e+seq_length]
    Y_chars = book_data[e+1:e+seq_length+1]
    X_train = encode_string(X_chars)
    Y_train = encode_string(Y_chars)

    _, _, A_train, H_train, P_train, ht = forward(RNN, X_train, hprev)
    loss = compute_loss(Y_train, P_train)
    grads, grads_clamped = backward(RNN, X_train, Y_train, A_train, H_train, P_train, hprev)

    for k in ms.keys():
        ms[k] += grads_clamped[k]**2
        RNN[k] -= (eta/torch.sqrt(ms[k] + epsilon))*grads_clamped[k]

    if step == 0:
        smooth_loss = loss
    else:
        smooth_loss = 0.999*smooth_loss + 0.001*loss

    losses.append(smooth_loss)

    if step % 1000 == 0:
        print(f"Step: {step}")
        print(f"\t * Smooth loss: {smooth_loss}")
    if step % 5000 == 0:
        _, s_syn = synthetize_seq(RNN, hprev, X_train[:, 0], 200)
        print("-" * 100)
        print(f"Synthetized sequence: \n{s_syn}")
        print("-" * 100)
    if step % 100000 == 0 and step > 0:
        _, s_lsyn = synthetize_seq(RNN, hprev, X_train[:, 0], 1000)
        print("-" * 100)
        print(f"Long synthetized sequence: \n{s_lsyn}")
        print("-" * 100)

    step += 1
    e += seq_length
    if e > len(book_data) - seq_length:
        e = 0
        epoch += 1
        hprev = RNN['h0']
    else:
        hprev = ht