# Custom LSTM Implementation

Relevant equations:

$
    i_t = \sigma ( W_{ii} x_t + b_{ii} + W_{hi} h_{t-1} + b_{hi} ) \\
    f_t = \sigma ( W_{if} x_t + b_{if} + W_{hf} h_{t-1} + b_{hf} ) \\
    g_t = tanh ( W_{ig} x_t + b_{ig} + W_{hg} h_{t-1} + b_{hg} ) \\
    o_t = \sigma ( W_{io} x_t + b_{io} + W_{ho} h_{t-1} + b_{ho} ) \\
    c_t = f_t \odot c_{t-1} + i_t \odot g_t \\
    h_t = o_t \odot tanh(c_t)
$

Where:

$x_t$: input to LSTM cell at time $t$\
$i_t$: input gate at time $t$\
$f_t$: forget gate at time $t$\
$g_t$: candidate gate at time $t$\
$o_t$: output gate at time $t$\
$c_t$: cell state at time $t$\
$h_t$: hidden state at time $t$

$W$: weight matrices\
$b$: bias vectors\
$\sigma$: sigmoid activation function\
$tanh$: tanh activation function\
$\odot$: elementwise multiplication

Thoughts on shapes (ignoring batch dimension for now):

- Input character sequences get one-hot-encoded into a vector of dimension (vocabulary_size).
- One-hot-vectors get transformed to n-dimensional space via embedding lookup table. Shape: (vocabulary_size) @ (vocabulary_size, embedding_dims) = (embedding_dims) 
- $W_{i*}$ matrices need to be of shape (embedding_dims, hidden_units)
- $W_{h*}$ matrices need to be of shape (hidden_units, hidden_units)
- All vectors $b_{**}$ need to be of shape (hidden_units)
- Vectors $i$, $f$, $g$, $o$, $c$ and $h$ will be of shape (hidden_units)

## init

In [1]:
import pandas as pd
import numpy as np
import torch
import os
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from sqlalchemy import create_engine

device = (
    "cuda"
    if torch.cuda.is_available()
    else "mps"
    if torch.backends.mps.is_available()
    else "cpu"
)
print(f"Using {device} device")

Using mps device


## hyper params

In [2]:
context_size = 24
number_of_strings = 1000
embedding_dims = 16
hidden_units = 256
max_grad = 3.0

## load data

In [3]:
vocabulary = [
    '<>', 'a', 'b', 'c', 'd', 'e', \
    'f', 'g', 'h', 'i', 'j', 'k', \
    'l', 'm', 'n', 'o', 'p', 'q', \
    'r', 's', 't', 'u', 'v', 'w', \
    'x', 'y', 'z', '.', ',', ' '
]

ctoi = {c:i for i, c in enumerate(vocabulary)}
itoc = {i:c for c, i in ctoi.items()}

engine = create_engine(f'postgresql://{os.environ["DBUSER"]}:{os.environ["DBPW"]}@localhost:5432/hn')
with engine.begin() as con:
    df = pd.read_sql(sql=f'''SELECT text FROM comments ORDER BY random() LIMIT {number_of_strings}''', con=con)
    
contexts = []
ys = []
for text in df['text'].str.lower():
    if text is not None:
        context = ['<>'] * context_size
        charlist = list(text) + ['<>']
        for char in charlist:
            if char in vocabulary:
                contexts.append([ctoi[c] for c in context])
                ys.append([ctoi[c] for c in (context[1:] + [char])])
                context = context[1:] + [char]
X = F.one_hot(torch.tensor(contexts)).float()
y = F.one_hot(torch.tensor(ys)).float()

X = X.to(device)
y = y.to(device)

## model definition

In [4]:
class EmbeddingLayer(nn.Module):
    def __init__(self, vocab_size, embedding_dims):
        super().__init__()
        self.vocab_size = vocab_size
        self.embedding_dims = embedding_dims
        self.C = nn.Linear(self.vocab_size, self.embedding_dims)

    def forward(self, x):
        output = self.C(x)
        return output

class LSTM(nn.Module):
    """Custom LSTM Implementation"""

    def __init__(self, input_size, hidden_units):
        super().__init__()
        self.input_size = input_size
        self.hidden_units = hidden_units
        self.Wii = nn.Linear(self.input_size, self.hidden_units, bias=True)
        self.Wif = nn.Linear(self.input_size, self.hidden_units, bias=True)
        self.Wig = nn.Linear(self.input_size, self.hidden_units, bias=True)
        self.Wio = nn.Linear(self.input_size, self.hidden_units, bias=True)
        self.Whi = nn.Linear(self.hidden_units, self.hidden_units, bias=True)
        self.Whf = nn.Linear(self.hidden_units, self.hidden_units, bias=True)
        self.Whg = nn.Linear(self.hidden_units, self.hidden_units, bias=True)
        self.Who = nn.Linear(self.hidden_units, self.hidden_units, bias=True)

    def forward(self, x, h, c):
        i = F.sigmoid(self.Wii(x) + self.Whi(h))
        f = F.sigmoid(self.Wif(x) + self.Whf(h))
        g = F.tanh(self.Wig(x) + self.Whg(h))
        o = F.sigmoid(self.Wio(x) + self.Who(h))
        c_new = f * c + i * g
        h_new = o * F.tanh(c_new)
        return h_new, c_new
    
    
class Model(nn.Module):
    def __init__(self, vocab_size, embedding_dims, hidden_units):
        super().__init__()
        self.vocab_size = vocab_size
        self.embedding_dims = embedding_dims
        self.hidden_units = hidden_units
        self.emb = EmbeddingLayer(self.vocab_size, self.embedding_dims)
        self.lstm1 = LSTM(self.embedding_dims, self.hidden_units)
        #self.linear12 = nn.Linear(self.hidden_units, self.hidden_units)
        #self.lstm2 = LSTM(self.hidden_units, self.hidden_units)
        #self.linear23 = nn.Linear(self.hidden_units, self.hidden_units)
        #self.lstm3 = LSTM(self.hidden_units, self.hidden_units)
        self.output_layer = nn.Linear(self.hidden_units, self.vocab_size)

    def forward(self, x, h, c):
        embedding = self.emb(x)
        h_new, c_new = self.lstm1(embedding, h, c)
        #h_new, c_new = self.lstm2(F.tanh(self.linear12(h_new)), h_new, c_new)
        #h_new, c_new = self.lstm3(F.tanh(self.linear23(h_new)), h_new, c_new)
        output = F.softmax(F.tanh(self.output_layer(h_new)), dim=0)
        return output, h_new, c_new


In [5]:
model = Model(
    vocab_size=len(vocabulary),
    embedding_dims=embedding_dims,
    hidden_units=hidden_units
)
model = model.to(device)
optimizer = optim.AdamW(model.parameters(), lr=1e-3)
loss_fn = nn.CrossEntropyLoss()

print(sum(p.numel() for p in model.parameters()), 'parameters')

288782 parameters


In [None]:
number_of_examples = X.shape[0]
losses = []

for seqi in range(number_of_examples):
    h = torch.zeros(hidden_units, requires_grad=False).to(device)
    c = torch.zeros(hidden_units, requires_grad=False).to(device)
    for coni in range(context_size):
        x = X[seqi, coni, :]
        ytrue = y[seqi, coni, :]
        ypred, h, c = model(x, h, c)
    loss = loss_fn(ytrue, ypred)
    optimizer.zero_grad()
    losses.append(loss.item())
    loss.backward()
    nn.utils.clip_grad_norm_(model.parameters(), max_norm=max_grad)
    optimizer.step()
    if seqi%1000==0:
        print(f'sequence {seqi+1}: loss {np.mean(losses[-1000:]):.5f}')
