# LSTM


In [None]:
# Progress bar utility for loops (training), and numpy for arrays/ops
from tqdm import tqdm
import numpy as np

# Helper functions provided in the local `utils.py`:
# - initWeights: initialize weight matrices with small random values
# - sigmoid, tanh: activation functions (sigmoid supports derivative flag)
# - softmax: convert raw logits to probability distributions
from utils import initWeights, sigmoid, tanh, softmax

In [None]:
# Load a sample corpus (Hamlet soliloquy) and normalize to lowercase.
# This small text is used for character-level language modeling.
data = """To be, or not to be, that is the question: Whether \
'tis nobler in the mind to suffer The slings and arrows of ou\
trageous fortune, Or to take arms against a sea of troubles A\
nd by opposing end them. To die—to sleep, No more; and by a s\
leep to say we end The heart-ache and the thousand natural sh\
ocks That flesh is heir to: 'tis a consummation Devoutly to b\
e wish'd. To die, to sleep; To sleep, perchance to dream—ay, \
there's the rub: For in that sleep of death what dreams may c\
ome, When we have shuffled off this mortal coil, Must give us\
 pause—there's the respect That makes calamity of so long lif\
e. For who would bear the whips and scorns of time, Th'oppres\
sor's wrong, the proud man's contumely, The pangs of dispriz'\
d love, the law's delay, The insolence of office, and the spu\
rns That patient merit of th'unworthy takes, When he himself \
might his quietus make""".lower()

# Vocabulary: the distinct characters of the corpus, in sorted order.
# Sorting matters: iterating a bare `set` of strings yields an order that can
# change between interpreter runs (string hash randomization), which would
# silently reshuffle the char <-> index mapping after every kernel restart.
chars = sorted(set(data))

data_size, char_size = len(data), len(chars)

print(f"Data size: {data_size}, Char Size: {char_size}")

# Bidirectional lookup tables between characters and their integer ids.
char_to_idx = {c: i for i, c in enumerate(chars)}
idx_to_char = {i: c for i, c in enumerate(chars)}

# Next-character prediction: the label for position t is the character at t+1.
train_X, train_y = data[:-1], data[1:]

Data size: 866, Char Size: 32


In [None]:
def oneHotEncode(text):
    """Return the one-hot column vector for a single character.

    `text` is looked up in the module-level `char_to_idx` table; the result
    has shape `(char_size, 1)` with a 1 in the row of that character and
    zeros everywhere else. A character missing from the table raises KeyError.
    """
    row = char_to_idx[text]
    one_hot = np.zeros((char_size, 1))
    one_hot[row, 0] = 1
    return one_hot

In [None]:
class LSTM:
    """Single-layer character-level LSTM trained with full-sequence BPTT.

    The network consumes a sequence of column vectors, keeps every
    intermediate activation in per-time-step dictionaries, and updates its
    weights once per pass over the whole sequence.

    Relies on names defined outside the class: `initWeights`, `sigmoid`,
    `tanh`, `softmax` (from `utils`), `tqdm`, and the notebook-level
    `oneHotEncode`, `char_to_idx`, `idx_to_char`, `char_size`.

    NOTE(review): `sigmoid(..., derivative=True)` / `tanh(..., derivative=True)`
    are called here on *already activated* values, i.e. they are assumed to
    compute a*(1-a) and 1-a**2 respectively -- confirm in utils.py.
    """

    def __init__(self, input_size, hidden_size, output_size, num_epochs, learning_rate):
        """Create the gate and readout parameters.

        Args:
            input_size: length of the concatenated [hidden state; input]
                vector fed to every gate (hidden size + input vector length).
            hidden_size: number of hidden / cell units.
            output_size: number of output logits per time step.
            num_epochs: number of full passes `train` makes over the sequence.
            learning_rate: step size applied to the (clipped) gradients.
        """
        # Hyperparameters: learning rate, hidden layer size, number of epochs
        self.learning_rate = learning_rate
        self.hidden_size = hidden_size
        self.num_epochs = num_epochs

        # Gate weights act on the concatenation of the previous hidden state
        # and the current input. `forward` left-multiplies that column vector
        # by each weight matrix, so initWeights(input_size, hidden_size) is
        # assumed to return a (hidden_size, input_size) matrix -- TODO confirm.
        # Forget Gate
        self.wf = initWeights(input_size, hidden_size)
        self.bf = np.zeros((hidden_size, 1))

        # Input Gate
        self.wi = initWeights(input_size, hidden_size)
        self.bi = np.zeros((hidden_size, 1))

        # Candidate Gate (cell update proposal)
        self.wc = initWeights(input_size, hidden_size)
        self.bc = np.zeros((hidden_size, 1))

        # Output Gate
        self.wo = initWeights(input_size, hidden_size)
        self.bo = np.zeros((hidden_size, 1))

        # Final readout layer that maps hidden state to output logits over chars
        self.wy = initWeights(hidden_size, output_size)
        self.by = np.zeros((output_size, 1))

    # Reset Network Memory: clear stored states and gate activations between runs
    def reset(self):
        """Discard all cached per-time-step values and zero the initial state."""
        self.concat_inputs = {}

        # Use -1 key to represent the t-1 state before the sequence starts
        self.hidden_states = {-1: np.zeros((self.hidden_size, 1))}
        self.cell_states = {-1: np.zeros((self.hidden_size, 1))}

        # Containers to store intermediate values needed for backpropagation
        self.activation_outputs = {}
        self.candidate_gates = {}
        self.output_gates = {}
        self.forget_gates = {}
        self.input_gates = {}
        self.outputs = {}

    # Forward Propagation: process a sequence of one-hot encoded inputs
    def forward(self, inputs):
        """Run the sequence through the LSTM from a zero initial state.

        Args:
            inputs: sequence of column vectors, one per time step.

        Returns:
            List with one column vector of raw logits per time step.
            Every intermediate value is cached on `self`, keyed by time step,
            for use by `backward`.
        """
        self.reset()

        outputs = []
        for q, x in enumerate(inputs):
            # Concatenate previous hidden state and current input vector
            # (hidden state first -- `backward` relies on this ordering).
            z = np.concatenate((self.hidden_states[q - 1], x))
            self.concat_inputs[q] = z

            # Compute gate activations using learned weights and biases
            self.forget_gates[q] = sigmoid(np.dot(self.wf, z) + self.bf)
            self.input_gates[q] = sigmoid(np.dot(self.wi, z) + self.bi)
            self.candidate_gates[q] = tanh(np.dot(self.wc, z) + self.bc)
            self.output_gates[q] = sigmoid(np.dot(self.wo, z) + self.bo)

            # Update cell state and hidden state according to LSTM equations
            self.cell_states[q] = (
                self.forget_gates[q] * self.cell_states[q - 1]
                + self.input_gates[q] * self.candidate_gates[q]
            )
            self.hidden_states[q] = self.output_gates[q] * tanh(self.cell_states[q])

            # Compute raw output logits (converted to probabilities by callers)
            outputs.append(np.dot(self.wy, self.hidden_states[q]) + self.by)

        return outputs

    # Backward Propagation: accumulate gradients through time and update weights
    def backward(self, errors, inputs):
        """Backpropagate through time and apply one clipped update.

        Args:
            errors: per-time-step column vectors `one_hot(label) - softmax(logits)`,
                i.e. the gradient of the log-likelihood w.r.t. the logits.
                Because this is the *ascent* direction, the update below adds
                the gradients to the parameters.
            inputs: per-time-step concatenated [hidden; input] vectors as
                cached by `forward` (`self.concat_inputs`), indexable by step.

        The state cached by the most recent `forward` call must correspond to
        `errors` / `inputs`. An empty sequence is a no-op.
        """
        n_steps = len(inputs)
        if n_steps == 0:
            # Nothing to learn from; also avoids indexing state that was never cached.
            return

        d_wf, d_bf = np.zeros_like(self.wf), np.zeros_like(self.bf)
        d_wi, d_bi = np.zeros_like(self.wi), np.zeros_like(self.bi)
        d_wc, d_bc = np.zeros_like(self.wc), np.zeros_like(self.bc)
        d_wo, d_bo = np.zeros_like(self.wo), np.zeros_like(self.bo)
        d_wy, d_by = np.zeros_like(self.wy), np.zeros_like(self.by)

        # Gradients flowing back from the (non-existent) step after the last one
        dh_next = np.zeros_like(self.hidden_states[0])
        dc_next = np.zeros_like(self.cell_states[0])

        for q in reversed(range(n_steps)):
            error = errors[q]
            z = inputs[q]

            # Gradients for readout (final) layer
            d_wy += np.dot(error, self.hidden_states[q].T)
            d_by += error

            # Backprop into hidden state (sum of readout gradient and next-step gradient)
            d_hs = np.dot(self.wy.T, error) + dh_next

            # Output gate gradient (w.r.t. the gate's pre-activation)
            tanh_cs = tanh(self.cell_states[q])
            d_o = tanh_cs * d_hs * sigmoid(self.output_gates[q], derivative=True)
            d_wo += np.dot(d_o, z.T)
            d_bo += d_o

            # Cell state gradient: through h = o * tanh(c), plus the gradient
            # carried back from the next step's cell state.
            d_cs = (
                tanh(tanh_cs, derivative=True) * self.output_gates[q] * d_hs
                + dc_next
            )

            # Forget gate gradient
            d_f = (
                d_cs
                * self.cell_states[q - 1]
                * sigmoid(self.forget_gates[q], derivative=True)
            )
            d_wf += np.dot(d_f, z.T)
            d_bf += d_f

            # Input gate gradient
            d_i = (
                d_cs
                * self.candidate_gates[q]
                * sigmoid(self.input_gates[q], derivative=True)
            )
            d_wi += np.dot(d_i, z.T)
            d_bi += d_i

            # Candidate gate gradient
            d_c = (
                d_cs
                * self.input_gates[q]
                * tanh(self.candidate_gates[q], derivative=True)
            )
            d_wc += np.dot(d_c, z.T)
            d_bc += d_c

            # Gradient wrt concatenated inputs (sum of gradients from each gate)
            d_z = (
                np.dot(self.wf.T, d_f)
                + np.dot(self.wi.T, d_i)
                + np.dot(self.wc.T, d_c)
                + np.dot(self.wo.T, d_o)
            )

            # The first `hidden_size` rows of the concatenation are the previous
            # hidden state; that slice is what flows back to step q - 1.
            dh_next = d_z[: self.hidden_size, :]
            dc_next = self.forget_gates[q] * d_cs

        # Clip gradients element-wise to avoid exploding gradients, then update
        for d_ in (d_wf, d_bf, d_wi, d_bi, d_wc, d_bc, d_wo, d_bo, d_wy, d_by):
            np.clip(d_, -1, 1, out=d_)

        self.wf += d_wf * self.learning_rate
        self.bf += d_bf * self.learning_rate

        self.wi += d_wi * self.learning_rate
        self.bi += d_bi * self.learning_rate

        self.wc += d_wc * self.learning_rate
        self.bc += d_bc * self.learning_rate

        self.wo += d_wo * self.learning_rate
        self.bo += d_bo * self.learning_rate

        self.wy += d_wy * self.learning_rate
        self.by += d_by * self.learning_rate

    # Train
    def train(self, inputs, labels):
        """Fit the network to one sequence for `num_epochs` full passes.

        Args:
            inputs: sequence of characters; each is one-hot encoded via the
                notebook-level `oneHotEncode`.
            labels: target character for each position of `inputs`.
        """
        # Convert inputs (characters) to one-hot vectors for the whole sequence
        inputs = [oneHotEncode(char) for char in inputs]

        for _ in tqdm(range(self.num_epochs)):
            # Forward pass to get logits for each time-step
            predictions = self.forward(inputs)

            # Per-step error = one_hot(label) - softmax(logits)
            errors = []
            for q, logits in enumerate(predictions):
                # Start with negative softmax (probabilities) and add 1 to the true class
                error = -softmax(logits)
                error[char_to_idx[labels[q]]] += 1
                errors.append(error)

            # Backpropagate errors through time
            self.backward(errors, self.concat_inputs)

    # Test
    def test(self, inputs, labels):
        """Sample one character per step and print it against the ground truth.

        Predictions are drawn at random from the softmax distribution of each
        step (not the arg-max), so the printed text and accuracy vary between
        calls unless NumPy's global RNG is seeded. Returns nothing.
        """
        accuracy = 0
        probabilities = self.forward([oneHotEncode(char) for char in inputs])

        output = ""
        for q in range(len(labels)):
            prediction = idx_to_char[
                np.random.choice(
                    [*range(char_size)], p=softmax(probabilities[q].reshape(-1))
                )
            ]

            output += prediction

            if prediction == labels[q]:
                accuracy += 1

        # Show ground truth and sampled predictions and compute accuracy
        print(f"Ground Truth:\n{labels}\n")
        print(f"Predictions:\n{output}\n")

        print(f"Accuracy: {round(accuracy * 100 / len(inputs), 2)}%")


In [5]:
# Seed NumPy's global RNG so a Restart & Run All reproduces the same run.
# `LSTM.test` samples predictions with `np.random.choice`, which reads this
# global state. NOTE(review): this also fixes the initial weights only if
# `initWeights` in utils.py draws from the same global RNG -- confirm.
np.random.seed(42)

# Initialize Network
hidden_size = 25

# Each gate sees the previous hidden state concatenated with the one-hot
# input, hence input_size = vocabulary size + hidden size.
lstm = LSTM(
    input_size=char_size + hidden_size,
    hidden_size=hidden_size,
    output_size=char_size,
    num_epochs=1_000,
    learning_rate=0.05,
)

##### Training #####
lstm.train(train_X, train_y)

##### Testing #####
# Evaluated on the training sequence itself, so the reported accuracy
# measures memorization of the corpus, not generalization.
lstm.test(train_X, train_y)

100%|██████████| 1000/1000 [01:06<00:00, 15.02it/s]

Ground Truth:
to be, or not to be, that is the question: whether 'tis nobler in the mind to suffer the slings and arrows of outrageous fortune, or to take arms against a sea of troubles and by opposing end them. to die—to sleep, no more; and by a sleep to say we end the heart-ache and the thousand natural shocks that flesh is heir to: 'tis a consummation devoutly to be wish'd. to die, to sleep; to sleep, perchance to dream—ay, there's the rub: for in that sleep of death what dreams may come, when we have shuffled off this mortal coil, must give us pause—there's the respect that makes calamity of so long life. for who would bear the whips and scorns of time, th'oppressor's wrong, the proud man's contumely, the pangs of dispriz'd love, the law's delay, the insolence of office, and the spurns that patient merit of th'unworthy takes, when he himself might his quietus make

Predictions:
to be, or not to be, that is the question: whether 'tis nobler in the mind to suffer the slings and arrow


