# DD2424 Deep Learning in Data Science - Lab 4

## Diar Sabri - July 2021

In [None]:
# imports
import numpy as np
import pickle
import decimal
from google.colab import drive
drive.mount('/content/drive')
import copy
import matplotlib.pyplot as plt
import math

In [None]:
# Load the whole book into memory; the context manager closes the file handle
# as soon as the text has been read.
with open('/content/drive/MyDrive/Colab Notebooks/goblet-of-fire/goblet_book.txt', 'r', encoding='utf8') as book_file:
  text = book_file.read()

# Sort the alphabet so the char <-> index mapping is identical on every run
# (iteration order of a set of strings varies between interpreter sessions).
chars = sorted(set(text))
char_to_ind = {char: idx for idx, char in enumerate(chars)}
ind_to_char = {idx: char for idx, char in enumerate(chars)}

# Dense one-hot encoding of the text: one row per character class, one column
# per position in the text.
labels = np.fromiter((char_to_ind[char] for char in text), dtype=np.intp, count=len(text))
onehot = np.zeros((len(chars), len(text)))
onehot[labels, np.arange(len(text))] = 1

# Inputs are the characters themselves; targets are the same columns shifted
# one step to the left. np.roll wraps around, so the target of the very last
# character is the first character of the text.
all_X = onehot
all_Y = np.roll(all_X, -1, axis=1)

In [None]:
def InitalizeLayers(K, m=100, sig=0.01):
  """Create a fresh parameter dictionary for the RNN.

  Args:
    K: number of character classes (rows of the one-hot input/output).
    m: number of hidden units.
    sig: standard deviation of the zero-mean Gaussian used for the weights.

  Returns:
    dict with zero biases 'b' (m, 1) and 'c' (K, 1), Gaussian weights
    'U' (m, K), 'W' (m, m) and 'V' (K, m), plus the sizes under 'm' and 'K'.
  """
  # The weights are drawn in the order U, W, V so that a seeded global
  # generator always yields the same parameters.
  input_weights = np.random.normal(0, sig, size=(m, K))
  recurrent_weights = np.random.normal(0, sig, size=(m, m))
  output_weights = np.random.normal(0, sig, size=(K, m))

  NetParams = {}
  NetParams['b'] = np.zeros((m, 1))
  NetParams['c'] = np.zeros((K, 1))
  NetParams['U'] = input_weights
  NetParams['W'] = recurrent_weights
  NetParams['V'] = output_weights
  NetParams['m'] = m
  NetParams['K'] = K
  return NetParams

In [None]:
def ForwardPass(X, h0, NetParams):
  """Run the RNN forward over a sequence of one-hot columns.

  For every column x_t of X:
    a_t = W h_{t-1} + U x_t + b
    h_t = tanh(a_t)
    o_t = V h_t + c
    p_t = softmax(o_t)

  Args:
    X: array of shape (K, n), one input column per time step.
    h0: initial hidden state with m entries (normally shape (m, 1)).
    NetParams: dict holding 'W', 'U', 'V', 'b' and 'c'.

  Returns:
    dict of 2-D arrays: 'a' (m, n), 'h' (m, n + 1) whose first column is h0,
    'o' (K, n) and 'P' (K, n). The arrays stay 2-D for any n, including a
    single time step, so callers can always index columns.
  """
  W, U, V = NetParams['W'], NetParams['U'], NetParams['V']
  b, c = NetParams['b'], NetParams['c']
  n = X.shape[1]
  m, K = W.shape[0], V.shape[0]

  a = np.zeros((m, n))
  h = np.zeros((m, n + 1))
  o = np.zeros((K, n))
  P = np.zeros((K, n))

  h_t = np.reshape(h0, (m, 1))
  h[:, [0]] = h_t
  for t in range(n):
    a_t = W @ h_t + U @ X[:, [t]] + b
    h_t = np.tanh(a_t)
    o_t = V @ h_t + c
    # Subtracting the largest logit leaves the softmax unchanged but keeps
    # np.exp from overflowing to inf (which would turn the column into NaN).
    exp_t = np.exp(o_t - np.max(o_t))
    p_t = exp_t / np.sum(exp_t)

    a[:, [t]] = a_t
    h[:, [t + 1]] = h_t
    o[:, [t]] = o_t
    P[:, [t]] = p_t

  RNNParams = {
      'a': a,
      'h': h,
      'o': o,
      'P': P,
  }

  return RNNParams

In [None]:
def BackwardPass(X, Y, NetParams, RNNParams):
  """Back-propagate the summed cross-entropy loss through time.

  Args:
    X: one-hot inputs, shape (K, n) with n >= 1.
    Y: one-hot targets, shape (K, n).
    NetParams: dict holding the weights 'W' and 'V'.
    RNNParams: forward-pass cache with 'a', 'h' and 'P'. Arrays that lost a
      singleton axis (e.g. a one-step sequence stored as 1-D) are accepted.

  Returns:
    dict with the gradients for 'b', 'c', 'U', 'W' and 'V', every entry
    clipped to the interval [-5, 5].
  """
  n = X.shape[1]
  W, V = NetParams['W'], NetParams['V']

  # Normalise the cached activations to 2-D (features x time) so that the
  # column indexing and broadcasting below are valid for any sequence length.
  P = np.reshape(RNNParams['P'], Y.shape)
  a = np.reshape(RNNParams['a'], (-1, n))
  h = np.reshape(RNNParams['h'], (-1, n + 1))

  grad_o = (P - Y).T  # (n, K): softmax + cross-entropy gradient per step
  grad_V = grad_o.T @ h[:, 1:].T

  # Derivative of tanh for every step at once. Scaling element-wise equals
  # multiplying by diag(1 - tanh(a)^2) without building an m x m matrix.
  dtanh = 1 - np.square(np.tanh(a))

  grad_a = np.zeros((n, a.shape[0]))
  # The last step only receives gradient from its own output.
  grad_a[-1] = (grad_o[-1] @ V) * dtanh[:, -1]
  for t in reversed(range(n - 1)):
    # Earlier steps also receive gradient flowing back from step t + 1.
    grad_h_t = grad_o[t] @ V + grad_a[t + 1] @ W
    grad_a[t] = grad_h_t * dtanh[:, t]

  grad_W = grad_a.T @ h[:, :-1].T
  grad_U = grad_a.T @ X.T

  grad_b = np.sum(grad_a, axis=0, keepdims=True).T
  grad_c = np.sum(grad_o, axis=0, keepdims=True).T

  grads = {
    'b': grad_b,
    'c': grad_c,
    'U': grad_U,
    'W': grad_W,
    'V': grad_V,
  }

  # Clip every entry to limit the effect of exploding gradients.
  for grad in grads:
    grads[grad] = np.clip(grads[grad], -5, 5)

  return grads

In [None]:
def SynthesizeText(x0, h0, text_len, NetParams):
  """Sample a string of text_len characters from the network.

  Each iteration runs ForwardPass on the current one-hot column, draws the
  next character index from the resulting probabilities with
  np.random.choice, and feeds that character back in as the next input.

  Args:
    x0: one-hot column, shape (K, 1), used as the first input.
    h0: hidden state used for the first step.
    text_len: number of characters to generate.
    NetParams: parameter dict; 'K' is read here, the rest by ForwardPass.

  Returns:
    The generated characters joined into one string (x0 itself is not
    part of the output).
  """
  n_classes = NetParams['K']
  state, current = h0, x0
  sampled = []
  for _ in range(text_len):
    step_out = ForwardPass(current, state, NetParams)
    # Keep the newest hidden column as a 2-D column vector.
    state = step_out['h'][:, [-1]]
    ix = np.random.choice(range(n_classes), p=step_out['P'].flat)

    # The sampled character becomes the next one-hot input.
    current = np.zeros((n_classes, 1))
    current[ix] = 1
    sampled.append(ind_to_char[ix])

  return ''.join(sampled)

In [None]:
def GenerateText():
  """Sample 15 characters from a freshly initialised (untrained) network.

  The character with index 0 is printed and used as the one-hot start input;
  the initial hidden state is all zeros.

  Returns:
    The string produced by SynthesizeText.
  """
  n_classes = len(ind_to_char)
  first_char = ind_to_char[0]
  print('Starting char: ' + first_char)

  net = InitalizeLayers(n_classes)

  start_input = np.zeros((n_classes, 1))
  start_input[0] = 1
  start_state = np.zeros((net['m'], 1))

  return SynthesizeText(start_input, start_state, 15, net)

In [None]:
# Smoke test: sample a short string from a randomly initialised network.
GenerateText()

# Gradients

In [None]:
def ComputeGradsNumSlow(X, Y, NetParams, h):
  """Estimate the loss gradients with centred finite differences.

  Every scalar entry of 'b', 'c', 'U', 'W' and 'V' is moved by -h and +h in
  turn; the gradient estimate is (loss(+h) - loss(-h)) / (2 * h), where the
  loss is the summed cross-entropy of a forward pass started from a zero
  hidden state.

  Args:
    X: one-hot inputs, shape (K, n).
    Y: one-hot targets, shape (K, n).
    NetParams: parameter dict; it is not modified.
    h: perturbation size.

  Returns:
    dict of arrays with the same shapes as the corresponding parameters.
  """
  param_names = ('b', 'c', 'U', 'W', 'V')
  grads = {par: np.zeros(NetParams[par].shape) for par in param_names}

  h0 = np.zeros((NetParams['m'], 1))

  # Work on a single private copy and restore each entry after use, instead
  # of deep-copying the whole parameter set twice per scalar.
  NetParams_try = copy.deepcopy(NetParams)
  for par in param_names:
    values = NetParams_try[par]
    for i in range(values.size):
      original = values.flat[i]

      values.flat[i] = original - h
      RNNParams_try = ForwardPass(X, h0, NetParams_try)
      c1 = -np.sum(Y*np.log(RNNParams_try['P'])) #l

      values.flat[i] = original + h
      RNNParams_try = ForwardPass(X, h0, NetParams_try)
      c2 = -np.sum(Y*np.log(RNNParams_try['P'])) #l

      values.flat[i] = original
      grads[par].flat[i] = (c2 - c1) / (2*h)

  return grads

In [None]:
def CompareGradients():
  """Check the analytic gradients against finite-difference estimates.

  Uses the first 25 characters of the data set and a small network
  (m = 5). For every parameter, each entry whose absolute difference is at
  least the tolerance is printed, followed by a line with the parameter
  name and whether all of its entries matched.

  Returns:
    Tuple (grads, grads_num) with the analytic and numerical gradients.
  """
  h = 1e-4
  tolerance = 1e-6

  X, Y = all_X[:, :25], all_Y[:, :25]
  K = len(ind_to_char)

  # InitalizeLayers only accepts K, m and sig; passing training settings
  # such as seq_len or eta raises a TypeError.
  NetParams = InitalizeLayers(K, m=5, sig=0.01)

  h0 = np.zeros((NetParams['m'], 1))

  RNNParams = ForwardPass(X, h0, NetParams)
  grads = BackwardPass(X, Y, NetParams, RNNParams)

  grads_num = ComputeGradsNumSlow(X, Y, NetParams, h)

  for par in grads:
    equal = True
    for elem, elem_num in zip(grads[par].flat, grads_num[par].flat):
      diff = abs(elem - elem_num)
      if diff >= tolerance:
        print(par, elem, elem_num, diff)
        equal = False

    print(par, equal)

  return grads, grads_num

In [None]:
# Run the gradient check; keep both the analytic and the numerical gradients
# for inspection.
grads_a, grads_n = CompareGradients()

# Train on Goblet of Fire 

In [None]:
def SGC(tra_X, tra_Y, GDP, NetParams):
  """Train the RNN with AdaGrad on consecutive fixed-length sequences.

  Args:
    tra_X: one-hot inputs, shape (K, N).
    tra_Y: one-hot targets, shape (K, N).
    GDP: dict with 'eta' (learning rate), 'epochs' and 'seq_len'.
    NetParams: parameter dict; its arrays are updated in place.

  Returns:
    Tuple (NetParams, smooth_loss) where smooth_loss is a list of
    (step, smoothed loss) pairs, one per update step.
  """
  param_names = ('b', 'c', 'U', 'W', 'V')
  # AdaGrad accumulator: running sum of squared gradients per parameter.
  mem = {par: np.zeros(NetParams[par].shape) for par in param_names}
  eps = np.finfo(float).eps
  seq_len = GDP['seq_len']

  smooth_loss = []
  step = 0
  for i in range(GDP['epochs']):
    # The hidden state is reset at the start of every pass over the text.
    hprev = np.zeros((NetParams['m'], 1))
    for j in range(tra_X.shape[1] // seq_len):
      window = slice(j * seq_len, (j + 1) * seq_len)
      X_batch = tra_X[:, window]
      Y_batch = tra_Y[:, window]

      RNNParams = ForwardPass(X_batch, hprev, NetParams)
      grads = BackwardPass(X_batch, Y_batch, NetParams, RNNParams)

      for par in grads:
        mem[par] += np.square(grads[par])
        NetParams[par] -= GDP['eta'] * grads[par] / np.sqrt(mem[par] + eps)

      loss = -np.sum(Y_batch*np.log(RNNParams['P']))
      if step == 0:
        smooth_loss.append((step, loss))
      else:
        smooth_loss.append((step, 0.999 * smooth_loss[-1][1] + 0.001 * loss))

      if step % 1000 == 0:
        print('Step ' + str(step) + ': ' + str(smooth_loss[-1]))

      if step % 10000 == 0:
        # Sample from the state the network had when entering this
        # sequence, seeded with the sequence's first character.
        txt = SynthesizeText(X_batch[:, [0]], hprev, 200, NetParams)
        print(txt)

      # Carry the final hidden state over to the next sequence, so that
      # consecutive chunks of the text are processed as one continuous stream.
      hprev = RNNParams['h'][:, [-1]]
      step += 1

  return NetParams, smooth_loss

In [None]:
def TrainGOF():
  """Train on the full data set and sample a long text from the result.

  Settings: m = 100 hidden units, sigma = 0.01, eta = 0.1, 10 epochs and
  sequences of 25 characters. After training, 1000 characters are sampled
  starting from the character with index 0 and a zero hidden state, and
  printed.

  Returns:
    Tuple (NetParams, loss, txt): trained parameters, the smoothed loss
    history returned by SGC, and the sampled text.
  """
  K = len(ind_to_char)
  m = 100
  sigma = 0.01
  GDP = dict(eta=0.1, epochs=10, seq_len=25)

  NetParams, loss = SGC(all_X, all_Y, GDP, InitalizeLayers(K, m, sigma))

  # Synthesize a long text with the final settings
  text_len = 1000
  x0 = np.zeros((K, 1))
  x0[0] = 1
  h0 = np.zeros((m, 1))
  txt = SynthesizeText(x0, h0, text_len, NetParams)

  print('Best text: ' + txt)

  return NetParams, loss, txt

In [None]:
def PlotLoss(loss):
  """Plot a loss history given as a sequence of (step, value) pairs.

  Args:
    loss: iterable of 2-item pairs; the first item goes on the x-axis and
      the second on the y-axis.
  """
  steps, values = [], []
  for step, value in loss:
    steps.append(step)
    values.append(value)

  plt.plot(steps, values, label='Loss')
  plt.legend()

  plt.show()

In [None]:
# Train the network on the whole book, then plot the smoothed loss history.
final_model, final_loss, final_txt = TrainGOF()
PlotLoss(final_loss)