In [None]:
import numpy as np
import tqdm as tqdm

In [None]:
##### Data #####
# Tiny training corpus, lower-cased so the vocabulary stays small.
data = """Hi nice to see you! My name is Ryan""".lower()

# Unique characters that occur in the corpus.
chars = set(data)

data_size, char_size = len(data), len(chars)

print(f'Data size: {data_size}, Char Size: {char_size}')

# Index the vocabulary in sorted order. Enumerating the set directly would make
# the char <-> index mapping depend on set iteration order, which for strings
# can differ from one interpreter run to the next.
char_to_idx = {c: i for i, c in enumerate(sorted(chars))}
idx_to_char = {i: c for c, i in char_to_idx.items()}

# Next-character prediction: the target sequence is the input shifted by one.
X_train, y_train = data[:-1], data[1:]

Data size: 35, Char Size: 15


In [None]:
##### Helper Functions #####

# one-hot encoding
def oneHotEncoding(char):
  """Encode a single character as a one-hot column vector.

  Uses the module-level `char_size` and `char_to_idx` tables.

  Args:
    char: a character that is a key of `char_to_idx`.

  Returns:
    A float array of shape (char_size, 1) that is 1 at the row given by
    `char_to_idx[char]` and 0 everywhere else.

  Raises:
    KeyError: if `char` is not in `char_to_idx`.
  """
  position = char_to_idx[char]
  vector = np.zeros((char_size, 1))
  vector[position] = 1
  return vector

# Xavier (Glorot) normalized initialization
def initWeights(input_size, output_size):
  """Draw a weight matrix from the Xavier/Glorot uniform distribution.

  Args:
    input_size: number of input units (columns of the matrix).
    output_size: number of output units (rows of the matrix).

  Returns:
    An array of shape (output_size, input_size) whose entries are uniform in
    [-limit, limit) with limit = sqrt(6 / (input_size + output_size)).
  """
  shape = (output_size, input_size)
  limit = np.sqrt(6 / (input_size + output_size))
  unit_samples = np.random.uniform(-1, 1, shape)
  return unit_samples * limit

##### Activation Function #####

# sigmoid
def sigmoid(x, derivative=False):
  """Logistic sigmoid, or its derivative expressed through the activation.

  Args:
    x: pre-activation values; when `derivative` is true, `x` must already be
      the sigmoid output s, not the pre-activation.
    derivative: when true, return s * (1 - s) instead of the activation.

  Returns:
    1 / (1 + exp(-x)) element-wise, or x * (1 - x) in derivative mode.
  """
  if not derivative:
    return 1 / (1 + np.exp(-x))

  # x is already sigmoid(z), so d(sigmoid)/dz = x * (1 - x)
  return x * (1 - x)

# hyperbolic tangent
def tanh(x, derivative=False):
  """Hyperbolic tangent, or its derivative expressed through the activation.

  Args:
    x: pre-activation values; when `derivative` is true, `x` must already be
      the tanh output t, not the pre-activation.
    derivative: when true, return 1 - t**2 instead of the activation.

  Returns:
    np.tanh(x) element-wise, or 1 - x**2 in derivative mode.
  """
  # in derivative mode x is already tanh(z), so d(tanh)/dz = 1 - x**2
  return (1 - x**2) if derivative else np.tanh(x)

# softmax
def softmax(x):
  """Numerically stable softmax over all elements of `x`.

  The maximum is subtracted before exponentiating. This leaves the result
  mathematically unchanged but keeps np.exp from overflowing (and the ratio
  from turning into NaN) when the scores are large.

  Args:
    x: array-like of scores; normalisation runs over every element, so a
      (n, 1) column vector yields one distribution over its n rows.

  Returns:
    An ndarray with the same shape as `x` whose entries are non-negative and
    sum to 1. An empty input yields an empty array.
  """
  x = np.asarray(x)
  if x.size == 0:
    return x.astype(float)

  exps = np.exp(x - np.max(x))  # largest exponent is 0, so exp() <= 1
  return exps / np.sum(exps)

In [None]:
class LSTM():
  """Single-layer character-level LSTM trained with plain gradient descent.

  The class relies on helpers defined at notebook level: `initWeights`,
  `sigmoid`, `tanh`, `softmax`, `oneHotEncoding` and the `idx_to_char` table.
  Every vector handled here is a column vector of shape (n, 1).
  """

  def __init__(self, input_size, hidden_size, output_size, learning_rate, epochs):
    """Create the parameters of the four gates and the output projection.

    Args:
      input_size: length of each one-hot input vector.
      hidden_size: number of hidden (and cell-state) units.
      output_size: number of output scores per time step.
      learning_rate: step size used by `backward` for gradient descent.
      epochs: number of full passes `train` performs over the sequence.
    """
    self.hidden_size = hidden_size
    self.learning_rate = learning_rate
    self.epochs = epochs

    # every gate sees the previous hidden state stacked on top of the input
    concat_size = input_size + hidden_size

    # input gate
    self.Wi = initWeights(concat_size, hidden_size)
    self.bi = np.zeros((hidden_size, 1))
    # forget gate
    self.Wf = initWeights(concat_size, hidden_size)
    self.bf = np.zeros((hidden_size, 1))
    # output gate
    self.Wo = initWeights(concat_size, hidden_size)
    self.bo = np.zeros((hidden_size, 1))
    # candidate ("gate gate") values written into the cell state
    self.Wg = initWeights(concat_size, hidden_size)
    self.bg = np.zeros((hidden_size, 1))
    # output projection from hidden state to scores
    self.Wy = initWeights(hidden_size, output_size)
    self.by = np.zeros((output_size, 1))

  def forward(self, inputs, targets=None):
    """Run the LSTM over a sequence, starting from zero hidden/cell state.

    Args:
      inputs: sequence of (input_size, 1) column vectors.
      targets: optional sequence of one-hot (output_size, 1) vectors, one per
        input. When given, the cross-entropy loss is accumulated.

    Returns:
      A tuple (outputs, loss, cache):
        outputs: list of unnormalised (output_size, 1) score vectors.
        loss: mean cross-entropy per time step, or 0 when no targets are given.
        cache: (hidden, context, concat_inputs, input_gates, forget_gates,
          output_gates, gate_gates) dictionaries keyed by time step, as
          consumed by `backward`. `hidden` and `context` also hold key -1
          for the initial state.
    """
    hidden = {-1: np.zeros((self.hidden_size, 1))}
    context = {-1: np.zeros((self.hidden_size, 1))}

    concat_inputs = {}
    forget_gates = {}
    input_gates = {}
    output_gates = {}
    gate_gates = {}

    # explicit check: plain truthiness raises for array-valued targets
    has_targets = targets is not None and len(targets) > 0

    outputs = []
    loss = 0
    for i in range(len(inputs)):
      concat_inputs[i] = np.concatenate((hidden[i-1], inputs[i])) # (hidden_size + input_size, 1)

      forget_gates[i] = sigmoid(np.dot(self.Wf, concat_inputs[i]) + self.bf)
      input_gates[i] = sigmoid(np.dot(self.Wi, concat_inputs[i]) + self.bi)
      output_gates[i] = sigmoid(np.dot(self.Wo, concat_inputs[i]) + self.bo)
      gate_gates[i] = tanh(np.dot(self.Wg, concat_inputs[i]) + self.bg)

      context[i] = forget_gates[i] * context[i-1] + input_gates[i] * gate_gates[i]
      hidden[i] = output_gates[i] * tanh(context[i])

      # unnormalised scores; softmax is only needed for the loss
      outputs.append(np.dot(self.Wy, hidden[i]) + self.by)

      if has_targets:
        probs = softmax(outputs[i])
        loss += -np.log(probs[targets[i] == 1][0])

    if has_targets and len(inputs) > 0:
      loss /= len(inputs)
    cache = (hidden, context, concat_inputs, input_gates, forget_gates, output_gates, gate_gates)

    return outputs, loss, cache

  def backward(self, inputs, douts, cache):
    """Backpropagate through time and apply one gradient-descent step.

    The gradients are those of the summed (not averaged) per-step loss, so
    `douts[i]` is expected to be d(loss_i)/d(outputs[i]).

    Args:
      inputs: the sequence passed to `forward` (only its length is used).
      douts: list of (output_size, 1) gradients w.r.t. each score vector.
      cache: the third value returned by `forward` for the same sequence.
    """
    hidden, context, concat_inputs, input_gates, forget_gates, output_gates, gate_gates = cache

    dWy, dby = np.zeros_like(self.Wy), np.zeros_like(self.by)
    dWo, dbo = np.zeros_like(self.Wo), np.zeros_like(self.bo)
    dWf, dbf = np.zeros_like(self.Wf), np.zeros_like(self.bf)
    dWi, dbi = np.zeros_like(self.Wi), np.zeros_like(self.bi)
    dWg, dbg = np.zeros_like(self.Wg), np.zeros_like(self.bg)

    # gradients flowing back from step i+1 into hidden[i] / context[i]
    dh_next = np.zeros((self.hidden_size, 1))
    dc_next = np.zeros((self.hidden_size, 1))

    for i in reversed(range(len(inputs))):
      dWy += np.dot(douts[i], hidden[i].T)
      dby += douts[i]

      # hidden[i] feeds both the output layer and the gates of step i+1
      dh = np.dot(self.Wy.T, douts[i]) + dh_next
      tanh_c = tanh(context[i])

      # output gate (gradient w.r.t. its pre-activation)
      do = dh * tanh_c * sigmoid(output_gates[i], derivative=True)
      dWo += np.dot(do, concat_inputs[i].T)
      dbo += do

      # context[i] feeds hidden[i] and, through the forget gate, context[i+1]
      dc = dh * output_gates[i] * tanh(tanh_c, derivative=True) + dc_next

      # forget gate
      df = dc * context[i-1] * sigmoid(forget_gates[i], derivative=True)
      dWf += np.dot(df, concat_inputs[i].T)
      dbf += df

      # input gate
      di = dc * gate_gates[i] * sigmoid(input_gates[i], derivative=True)
      dWi += np.dot(di, concat_inputs[i].T)
      dbi += di

      # candidate values
      dg = dc * input_gates[i] * tanh(gate_gates[i], derivative=True)
      dWg += np.dot(dg, concat_inputs[i].T)
      dbg += dg

      # pass gradients on to step i-1; the first hidden_size rows of the
      # concatenated vector are hidden[i-1]
      dconcat = (np.dot(self.Wf.T, df) + np.dot(self.Wi.T, di)
                 + np.dot(self.Wo.T, do) + np.dot(self.Wg.T, dg))
      dh_next = dconcat[:self.hidden_size]
      dc_next = dc * forget_gates[i]

    # update using gradient descent
    self.Wy -= self.learning_rate * dWy
    self.by -= self.learning_rate * dby
    self.Wf -= self.learning_rate * dWf
    self.bf -= self.learning_rate * dbf
    self.Wi -= self.learning_rate * dWi
    self.bi -= self.learning_rate * dbi
    self.Wo -= self.learning_rate * dWo
    self.bo -= self.learning_rate * dbo
    self.Wg -= self.learning_rate * dWg
    self.bg -= self.learning_rate * dbg

  def train(self, inputs, targets):
    """Fit the model to one sequence for `self.epochs` full-sequence updates.

    Prints the mean loss every 100 epochs.

    Args:
      inputs: sequence of characters known to `oneHotEncoding`.
      targets: sequence of next characters, aligned with `inputs`.
    """
    onehot_inputs = [oneHotEncoding(ch) for ch in inputs] # one hot encoding for inputs
    onehot_targets = [oneHotEncoding(ch) for ch in targets] # one hot encoding for targets

    for epo in range(self.epochs):
      outputs, loss, cache = self.forward(onehot_inputs, onehot_targets)
      if epo % 100 == 0:
        msg = f"epoch = {epo}, loss = {loss}"
        print(msg)

      # gradient of softmax + cross-entropy w.r.t. the scores: probs - one_hot
      douts = [softmax(out) - target for out, target in zip(outputs, onehot_targets)]

      self.backward(onehot_inputs, douts, cache)

  def test(self, test_inputs):
    """Print one sampled character per input step.

    Each character is drawn from the softmax distribution of that step, so the
    printed text is random unless the distributions are sharply peaked.

    Args:
      test_inputs: sequence of characters known to `oneHotEncoding`.
    """
    onehot_test_inputs = [oneHotEncoding(ch) for ch in test_inputs] # one hot encoding for test inputs

    outputs, _, _ = self.forward(onehot_test_inputs)
    output_sentence = ''
    for scores in outputs:
      prob = softmax(scores)
      idx = np.random.choice(len(prob), p=prob.reshape(-1))
      output_sentence += idx_to_char[idx]

    print(output_sentence)

In [None]:
# Build a model whose input and output sizes both equal the vocabulary size,
# then fit it to the (input, next-character) pair of sequences.
hidden_size = 50
# NOTE(review): this rebinds the name `LSTM` from the class to the instance, so
# re-running this cell in the same kernel fails (the instance is not callable).
# Later cells call methods through this same name; consider a distinct
# instance name and update those cells together.
LSTM  = LSTM(input_size=char_size, hidden_size=hidden_size, output_size=char_size, learning_rate=0.05, epochs=5000)
LSTM.train(X_train, y_train)

epoch = 0, loss = 2.715551027951377
epoch = 100, loss = 1.2337891451164413
epoch = 200, loss = 0.45910197262826985
epoch = 300, loss = 0.14760032668903716
epoch = 400, loss = 0.06988381978472889
epoch = 500, loss = 0.039789174185169536
epoch = 600, loss = 0.026377071396366778
epoch = 700, loss = 0.01926072096497533
epoch = 800, loss = 0.014960101927254936
epoch = 900, loss = 0.012121088131169409
epoch = 1000, loss = 0.010125090353813867
epoch = 1100, loss = 0.008654323994921746
epoch = 1200, loss = 0.00753056998598098
epoch = 1300, loss = 0.006646886632895559
epoch = 1400, loss = 0.005935564198458209
epoch = 1500, loss = 0.005351820080635571
epoch = 1600, loss = 0.004864943950238699
epoch = 1700, loss = 0.004453223682779984
epoch = 1800, loss = 0.004100903966734013
epoch = 1900, loss = 0.0037962921129027754
epoch = 2000, loss = 0.00353053873813585
epoch = 2100, loss = 0.003296829964385403
epoch = 2200, loss = 0.0030898385384734574
epoch = 2300, loss = 0.00290534242926737
epoch = 2400, 

In [None]:
# `LSTM` here is the trained instance bound in the training cell, not the class.
# test() samples one character per time step, so the printed text can differ
# between runs.
LSTM.test(X_train)

i nice to see you! my name is ryan
