In [3]:
import numpy as np

def sigmoid(x):
    return 1.0 / (1.0 + np.exp(-x))

def sigmoid_prime(y):
    # y = sigmoid(x) already
    return y * (1.0 - y)

def tanh(x):
    return np.tanh(x)

def tanh_prime(y):
    # y = tanh(x) already
    return 1.0 - y**2

In [4]:
import numpy as np

# -------- Activation functions (as in the paper experiments) --------
def f(x):
    np.clip(x, -500, 500, out=x)  # prevent overflow
    return 1.0 / (1.0 + np.exp(-x))  # sigmoid

def h(x):
    return 2.0 * f(x) - 1.0          # squashed to [-1, 1]

def g(x):
    return 2.0 * h(x)                # scaled version


class LSTM1997:
    def __init__(self, input_size, hidden_size, output_size, learning_rate=0.1):
        
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.lr = learning_rate

        # Weights initialization
        self.W_i   = np.random.randn(hidden_size, input_size + hidden_size) * 0.1  # input gate
        self.W_in  = np.random.randn(hidden_size, input_size + hidden_size) * 0.1  # input to cell
        self.W_out = np.random.randn(hidden_size, input_size + hidden_size) * 0.1  # output gate
        self.W_c   = np.random.randn(hidden_size, input_size + hidden_size) * 0.1  # memory cells
        self.W_k   = np.random.randn(output_size, hidden_size + hidden_size) * 0.1 # outputs

        # States
        self.h_prev = np.zeros(hidden_size)  # hidden activations
        self.s_c    = np.zeros(hidden_size)  # cell states
        self.y_c    = np.zeros(hidden_size)  # cell outputs
        self.caches = []                     # store all timesteps for BPTT

    def forward(self, X):
        """
        Forward pass through a sequence X of shape (T, input_size).
        Returns predictions Y of shape (T, output_size).
        """
        T = len(X)
        Y = []

        self.caches = []   # reset for new sequence
        for t in range(T):
            x_t = X[t]
            cache = {}
            y_prev = np.concatenate((x_t, self.h_prev))

            net_i = self.W_i @ y_prev
            y_i = f(net_i)

            net_in = self.W_in @ y_prev
            y_in = g(net_in)

            net_out = self.W_out @ y_prev
            y_out = f(net_out)

            net_c = self.W_c @ y_prev
            self.s_c = self.s_c + y_in * g(net_c)   # cell state
            h_s = h(self.s_c)                       # squashed state
            self.y_c = y_out * h_s                  # cell output

            y_mem_hidden = np.concatenate([self.h_prev, self.y_c])
            net_k = self.W_k @ y_mem_hidden
            y_k = f(net_k)

            self.h_prev = self.y_c   # update recurrent hidden state

            # Save everything for BPTT
            cache["y_prev"] = y_prev
            cache["net_i"], cache["y_i"] = net_i, y_i
            cache["net_in"], cache["y_in"] = net_in, y_in
            cache["net_out"], cache["y_out"] = net_out, y_out
            cache["net_c"], cache["g_net_c"] = net_c, g(net_c)
            cache["s_c"], cache["h_s"], cache["y_c"] = self.s_c.copy(), h_s, self.y_c.copy()
            cache["y_mem_hidden"], cache["net_k"], cache["y_k"] = y_mem_hidden, net_k, y_k

            self.caches.append(cache)
            Y.append(y_k)

        return np.array(Y)

    def backward(self, X, Y_true, Y_pred, truncation=5):
        """
        Backward pass with truncated BPTT.
        X: (T, input_size)
        Y_true: (T, output_size)
        Y_pred: (T, output_size)
        """
        T = len(X)

        # Initialize weight gradients
        dW_k   = np.zeros_like(self.W_k)
        dW_out = np.zeros_like(self.W_out)
        dW_in  = np.zeros_like(self.W_in)
        dW_i   = np.zeros_like(self.W_i)
        dW_c   = np.zeros_like(self.W_c)

        # Backpropagate through time
        for t in reversed(range(T)):
            cache = self.caches[t]
            y_k = cache['y_k']

            # Output layer gradient
            f_prime_k = y_k * (1 - y_k)
            if t == len(self.caches) - 1:
                e_k = f_prime_k * (Y_pred[0] - Y_true[0])
            else:
                e_k = np.zeros_like(y_k)   # no direct error from loss here
            # e_k = f_prime_k * (Y_pred[t] - Y_true[t])

            y_mem = cache['y_mem_hidden']
            dW_k += self.lr * np.outer(e_k, y_mem)

            # Propagate error into memory
            delta_mem = self.W_k.T @ e_k
            delta_h = delta_mem[:self.hidden_size]
            delta_y_c = delta_mem[self.hidden_size:]

            # Input gate error
            y_i = cache['y_i']
            f_prime_i = y_i * (1 - y_i)
            e_i = f_prime_i * delta_h
            y_prev = cache['y_prev']
            dW_i += self.lr * np.outer(e_i, y_prev)

            # Output gate error
            y_out = cache['y_out']
            f_prime_out = y_out * (1 - y_out)
            e_out = f_prime_out * (cache['h_s'] * delta_y_c)
            dW_out += self.lr * np.outer(e_out, y_prev)

            # Cell state error
            hprime_s = (1 - cache['h_s']**2) * 0.5
            e_sc = y_out * hprime_s * delta_y_c

            # Input to cell error
            g_net_c = cache['g_net_c']
            y_in = cache['y_in']
            g_prime_c = 1 - (y_in/2.0)**2  # approx derivative
            coeff_in = e_sc * g_net_c * g_prime_c
            dW_in += self.lr * (coeff_in[:, None] @ y_prev[None, :])

            # Cell input error
            gprime_net_c = 1 - (g_net_c/2.0)**2
            coeff_c = e_sc * y_in * gprime_net_c
            dW_c += self.lr * (coeff_c[:, None] @ y_prev[None, :])

            # Truncate BPTT: stop after 'truncation' steps
            if T - t > truncation:
                break

        # Update weights
        self.W_k   += dW_k
        self.W_out += dW_out
        self.W_in  += dW_in
        self.W_i   += dW_i
        self.W_c   += dW_c

In [6]:
import torch
torch.cuda.is_available()

True

In [None]:
import torch
from torch import nn

class LSTMlatest():
    def __init__(self, input_size, hidden_size, output_size, learning_rate=0.1):
        
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.lr = learning_rate

        self.Wf = np.random.randn(hidden_size, input_size + hidden_size) * 0.1  # forget gate
        self.Wi = np.random.randn(hidden_size, input_size + hidden_size) * 0.1  # input gate
        self.Wo = np.random.randn(hidden_size, input_size + hidden_size) * 0.1  # output gate
        self.Wc = np.random.randn(hidden_size, input_size + hidden_size) * 0.1  #   cell gate
        self.Wy = np.random.randn(output_size, hidden_size) * 0.1

        self.dWf = np.zeros_like(self.Wf)
        self.dWi = np.zeros_like(self.Wi)
        self.dWo = np.zeros_like(self.Wo)
        self.dWc = np.zeros_like(self.Wc)
        self.dWy = np.zeros_like(self.Wy)

        self.bf = np.zeros(hidden_size)
        self.bi = np.zeros(hidden_size)
        self.bo = np.zeros(hidden_size)
        self.bc = np.zeros(hidden_size)
        self.by = np.zeros(output_size)

        self.dbf = np.zeros_like(self.bi)
        self.dbi = np.zeros_like(self.bi)
        self.dbo = np.zeros_like(self.bi)
        self.dbc = np.zeros_like(self.bi)
        self.dby = np.zeros_like(self.by)

        self.h_prev = np.zeros(hidden_size)  # hidden state
        self.c_prev = np.zeros(hidden_size)  # cell state
        self.caches = []                     # store all timesteps for BPTT



    def forward(self, x):

        concat = np.concatenate((self.h_prev, x))  # concatenate h_prev and x
        f_t = sigmoid(np.dot(self.Wf, concat) + self.bf)  # forget gate

        i_t = sigmoid(np.dot(self.Wi, concat) + self.bi)  # input gate
        c_tilda_t = np.tanh(np.dot(self.Wc, concat) + self.bc)  # candidate cell state

        C_prev = self.c_prev
        C_t = f_t * C_prev + i_t * c_tilda_t  # new cell state
        self.c_prev = C_t

        o_t = sigmoid(np.dot(self.Wo, concat) + self.bo)  # output gate
        h_t = o_t * np.tanh(C_t)  # new hidden state
        self.h_prev = h_t

        self.caches.append((concat, f_t, i_t, c_tilda_t, C_t, o_t, h_t, C_prev))  # store cache for backpropagation
        y_t = np.dot(self.Wy, h_t) + self.by

        return y_t, h_t

    def backward(self, dy, dh_next, dc_next):

        concat, f_t, i_t, c_tilda_t, C_t, o_t, h_t, C_prev = self.caches.pop()

        do_t = dh_next * np.tanh(C_t)
        dC_total = dh_next * o_t * (1 - np.power(np.tanh(C_t), 2))
        dc_t = dC_total + dc_next

        dWy = np.outer(dy, h_t)
        dby = dy
        dh = np.dot(self.Wy.T, dy) + dh_next
        self.dWy += dWy
        self.dby += dby

        df_t = dc_t * C_prev
        di_t = dc_t * c_tilda_t
        dc_tilda_t = dc_t * i_t
        dc_prev = dc_t * f_t

        df_t_raw = df_t * f_t * (1 - f_t)
        di_t_raw = di_t * i_t * (1 - i_t)
        do_t_raw = do_t * o_t * (1 - o_t)
        dc_tilda_t_raw = dc_tilda_t * (1 - np.power(c_tilda_t, 2 ))

        dWf = np.outer(df_t_raw, concat)
        dWi = np.outer(di_t_raw, concat)
        dWo = np.outer(do_t_raw, concat)
        dWc = np.outer(dc_tilda_t_raw, concat)

        self.dWc += dWc
        self.dWi += dWi
        self.dWo += dWo
        self.dWf += dWf

        dbf = df_t_raw
        dbi = di_t_raw
        dbc = dc_tilda_t_raw        
        dbo = do_t_raw

        self.dbf += dbf
        self.dbo += dbo
        self.dbc += dbc
        self.dbi += dbi
        
        dconcat = (
            np.dot(self.Wf.T, df_t_raw)
            + np.dot(self.Wi.T, di_t_raw)
            + np.dot(self.Wo.T, do_t_raw)
            + np.dot(self.Wc.T, dc_tilda_t_raw)
        )

        dh_prev = dconcat[:self.hidden_size]
        dx = dconcat[self.hidden_size:]

        return dh_prev, dc_prev, dx

    def update(self):
        for param, dparam in zip(
            [self.Wf, self.Wi, self.Wo, self.Wc, self.bf, self.bi, self.bo, self.bc, self.Wy, self.by],
            [self.dWf, self.dWi, self.dWo, self.dWc, self.dbf, self.dbi, self.dbo, self.dbc, self.dWy, self.dby],
        ):
            param -= self.lr * dparam


        for dparam in [self.dWf, self.dWi, self.dWo, self.dWc, self.dbf, self.dbi, self.dbo, self.dbc]:
            dparam.fill(0)


lstm = LSTMlatest(20, 10, 5)


In [None]:
import numpy as np

with open("shakespeare.txt", "r") as f:
    data = f.read()
    
chars = sorted(list(set(data)))
data_size, char_size = len(data), len(chars)
print(f'Data size: {data_size}, Char Size: {char_size}')

char_to_idx = {c:i for i, c in enumerate(chars)}
idx_to_char = {i:c for i, c in enumerate(chars)}

train_X = data[:-1]
train_y = data[1:]

# One-hot encoding
def char_to_onehot(c):
    vec = np.zeros(len(chars))
    vec[char_to_idx[c]] = 1
    return vec

X_seq = [char_to_onehot(c) for c in train_X]
Y_seq = [char_to_onehot(c) for c in train_y]

def sample(lstm, seed_char, length=200):
    lstm.h_prev = np.zeros(lstm.hidden_size)
    lstm.c_prev = np.zeros(lstm.hidden_size)
    x = char_to_onehot(seed_char)
    result = [seed_char]
    
    for _ in range(length):
        y_pred, _ = lstm.forward(x)
        probs = np.exp(y_pred) / np.sum(np.exp(y_pred))
        idx = np.random.choice(len(probs), p=probs.ravel())
        char = idx_to_char[idx]
        result.append(char)
        x = char_to_onehot(char)
    return ''.join(result)



Data size: 1115394, Char Size: 65


In [14]:
torch.cuda.is_available()

False

In [None]:
input_size = output_size = char_size
hidden_size = 50
lr = 0.1
epochs = 5  # increase later
seq_len = len(X_seq)
seq_length = 50  # truncated BPTT length

# Initialize LSTM
lstm = LSTMlatest(input_size, hidden_size, output_size, learning_rate=lr)

# for epoch in range(epochs):
#     lstm.h_prev = np.zeros(hidden_size)
#     lstm.c_prev = np.zeros(hidden_size)
#     lstm.caches = []

#     outputs = []
for i in range(0, len(X_seq) - seq_length, seq_length):
    X_batch = X_seq[i:i+seq_length]
    Y_batch = Y_seq[i:i+seq_length]
    
    outputs = []
    for t in range(seq_length):
        y_pred, _ = lstm.forward(X_batch[t])
        outputs.append(y_pred)

    # ---------- Forward Pass ----------
    for t in range(seq_len):
        y_pred, _ = lstm.forward(X_seq[t])
        outputs.append(y_pred)

    outputs = np.array(outputs)
    Y_true = np.array(Y_seq)

    # ---------- Compute Loss ----------
    exp_scores = np.exp(outputs - np.max(outputs, axis=1, keepdims=True))
    probs = exp_scores / np.sum(exp_scores, axis=1, keepdims=True)

    # Compute cross-entropy loss
    eps = 1e-8
    loss = -np.mean(np.sum(Y_true * np.log(probs + eps), axis=1))

    # Gradient of softmax + cross entropy
    dY = (probs - Y_true) / output_size

    # ---------- Compute initial gradient ----------
    # dY = 2 * (outputs - Y_true) / output_size  # dL/dy

    dh_next = np.zeros(hidden_size)
    dc_next = np.zeros(hidden_size)

    # ---------- Backward Pass (BPTT) ----------
    for t in reversed(range(seq_len)):
        dy = dY[t]
        dh_next, dc_next, _ = lstm.backward(dy, dh_next, dc_next)

    # ---------- Update ----------
    lstm.lr = 0.001  # reduce LR
    max_norm = 5

    # Before lstm.update()
    for dparam in [lstm.dWf, lstm.dWi, lstm.dWo, lstm.dWc, lstm.dbf, lstm.dbi, lstm.dbo, lstm.dbc, lstm.dWy, lstm.dby]:
        np.clip(dparam, -max_norm, max_norm, out=dparam)

    lstm.update()
    lstm.lr *= 0.97

    # if epoch % 1 == 0:
    print(f'Epoch - Loss: {loss:.6f}')
    print(sample(lstm, seed_char='t', length=200))


In [None]:
h = np.zeros(hidden_size)
c = np.zeros(hidden_size)
x = X_seq[0]  # start with first character

generated_text = ''
for _ in range(200):
    y_pred, h = lstm.forward(x)
    c = lstm.c_prev
    idx = np.argmax(y_pred)  # pick most probable character
    generated_text += idx_to_char[idx]
    x = np.zeros(char_size)
    x[idx] = 1

print("Generated text:")
print(generated_text)


Generated text:
uuuinltnltnanlanand

 rhlalanlanand

 rhlalanlanand

 rhlalanlanand

 rhlalanlanand

 rhlalanlanand

 rhlalanlanand

 rhlalanlanand

 rhlalanlanand

 rhlalanlanand

 rhlalanlanand

 rhlalanlanand

 rh
