In [None]:
import torch
import seqgen.seq_gen as g
import random

%load_ext autoreload
%autoreload 2

In [None]:
# Run on the GPU when PyTorch reports at least one CUDA device, otherwise on the CPU.
device = "cuda" if torch.cuda.device_count() else "cpu"
print("Device", device)

In [None]:
# Generate a small demo batch of 10 sequences with a maximum length of 10 and no token swaps.
features, target_seqs = g.generate_synthetic_training_data(10, max_length=10, device=device, swap_times=0)

# Slice the feature tensor directly instead of wrapping the slices in the legacy
# torch.Tensor(...) constructor: that constructor is not device-aware and fails
# when `features` lives on the GPU. Slicing keeps device and dtype of `features`.
# Channel 0 holds the token ids (cast to int64 for the embedding layer),
# the remaining channels hold the coordinates of each token.
input_seqs = features[:, :, 0].to(torch.int64)
coordinates = features[:, :, 1:]

In [None]:
# Inspect the shapes of the raw features, the token ids, the coordinates and the targets
features.shape, input_seqs.shape, coordinates.shape, target_seqs.shape

In [None]:
# Show the first three encoded input sequences (integer token ids taken from feature channel 0)
input_seqs[0:3]

In [None]:
# Show the coordinates of the tokens of the first input sequence
# (one row per token, taken from feature channels 1 and up)
coordinates[0]

In [None]:
# Show the first three encoded output (target) sequences
target_seqs[0:3]

## Embedding Layer

The embedding layer maps each token to a vector in $\mathbb{R}^{D_{emb}}$, where $D_{emb}$ is the embedding dimension.
If we have an input sequence `[5,3,4]` and $D_{emb} = 2$ the output may look like this: `[[0.319, 0.841], [0.781, 0.682], [0.432,0.968]]`.

The embedding layer expects an input sequence of type `int` where each integer in the input sequence represents a class. The total number of distinct possible classes of the input sequence is called the vocabulary size $N_{vocab}$.

In [None]:
# Embedding table with 17 rows (one per token id) and 2-dimensional vectors.
# NOTE(review): 17 is hard-coded; it must cover every id in input_seqs — confirm against the input vocabulary.
emb = torch.nn.Embedding(num_embeddings=17, embedding_dim=2).to(device)
# Look up the embedding vector of every token of every sequence
x_emb = emb(input_seqs)
x_emb.shape

In [None]:
# Show embedding of first input sequence (one 2-dimensional vector per token)
x_emb[0]

In [None]:
# Concatenate embeddings and coordinates along the last (feature) axis, so every
# token is described by its embedding vector followed by its coordinates
emb_cat = torch.cat([x_emb, coordinates], dim=2)
emb_cat[0]

## LSTM Layer

The LSTM layer implements recurrence in a neural net. It needs three hyperparameters:
- **input_size**: This is the dimension of the input vectors that are run through the LSTM layer. If the vectors come directly from an embedding layer, input_size must be equal to the argument embedding_dim of the embedding layer. In this notebook the embeddings are concatenated with the token coordinates first, so input_size is embedding_dim plus the number of coordinate values per token.
- **hidden_size**: This is the dimension of the internal state vector $h_n$, which is identical to the dimension of the cell state $c_n$ and the dimension of the output vectors $out$. The hidden size can be freely chosen by you. Small values for hidden_size may lead to underfitting, but large values can cause overfitting.
- **num_layers**: This parameter defines how many layers of LSTMs are stacked in the network. The more layers you stack, the more complex patterns the LSTM is able to model, but this also comes with the risk of overfitting the data.

There is also another important parameter:
- **batch_first**: If the input tensor of the LSTM layer is of shape `(batch_size, sequence_length, input_size)` you have to set this parameter to True. Otherwise, if the input is of shape `(sequence_length, batch_size, input_size)`, you have to set this parameter to False (the default).

Now let's look at the outputs of the LSTM layer:
- **output**: This is the predicted tensor of the LSTM layer which will be passed to the next layer. You may add a linear classification and a softmax layer after the LSTM layer. The output tensor is of shape `(batch_size, sequence_length, hidden_size)` if `batch_first` is set to true.
- **h_n**: Hidden state, tensor of shape `(num_layers, batch_size, hidden_size)`
- **c_n**: Cell state, tensor of shape `(num_layers, batch_size, hidden_size)`

In [None]:
hidden_size = 4

# The LSTM consumes the concatenated embedding + coordinate vectors, so its
# input_size is read from the last dimension of emb_cat instead of being hard-coded.
# This keeps the cell working when the embedding dimension or the number of
# coordinate channels changes.
lstm = torch.nn.LSTM(input_size=emb_cat.shape[-1], hidden_size=hidden_size, num_layers=7, batch_first=True).to(device)

# lstm_output holds the output for every position of every sequence;
# h_n / c_n hold the final hidden and cell state of each of the 7 stacked layers.
lstm_output, (h_n, c_n) = lstm(emb_cat)

In [None]:
# Compare the shapes of the per-position output with the final hidden and cell states
lstm_output.shape, h_n.shape, c_n.shape

## Linear classifier

After the tensors have been passed through the LSTM layer, it is time to classify them. The linear layer's task is to take the output of the LSTM layer and map it to the output classes. In language models these classes would be the characters or words of the output vocabulary. There are two hyperparameters of the linear layer that we have to set:

- **in_features**: This is the dimension of the vectors that represent the words in our sequences. When these vectors come from an LSTM layer the dimension of the input features is equal to the hidden_size value of the LSTM layer.
- **out_features**: The dimension of the output vectors of the linear layer is equal to the number of characters / words of our output vocabulary. If we want to produce English sentences with our model and there are 5000 possible words in our vocabulary, this parameter's value would be 5000.

The output of the linear layer is of shape `(batch_size, sequence_length, target_vocab_size)`

In [None]:
# Size of the (hypothetical) output vocabulary, matching the 5000-word example above
output_size = 5000
# Map every hidden_size-dimensional LSTM output vector to one score per output class
linear = torch.nn.Linear(in_features=hidden_size, out_features=output_size).to(device)
linear_output = linear(lstm_output)
linear_output.shape

## Softmax function

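The purpose of the softmax layer is to turn the scores of the linear layer into a probability distribution over the output vocabulary for each position of each sequence. We use `LogSoftmax`, which returns the logarithm of these probabilities; this is the form that the negative log-likelihood loss (`NLLLoss`) used below expects.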

In [None]:
# linear_output has shape (batch_size, sequence_length, target_vocab_size).
# The log-probabilities must be normalized over the vocabulary, i.e. the last
# axis; dim=1 would normalize across the positions of a sequence instead.
softmax = torch.nn.LogSoftmax(dim=-1)
softmax_output = softmax(linear_output)
softmax_output.shape

# The Encoder

In [None]:
from seqgen.model import seq2seq_lstm
from seqgen.vocabulary import *

In [None]:
# Default hyperparameters. When resuming from a checkpoint, the values that
# determine the model architecture are replaced by the ones stored in the checkpoint.
lr = 1e-2
num_layers = 2
embedding_dim = 200
hidden_size = 200
batch_size = 50
max_length = 25
bidirectional = True

load_from_checkpoint = False
checkpoint_file = "model_2022-12-24_10-29-55.pt"

vocab_in = Vocabulary(vocab_filename="seqgen/vocab_in.txt")
vocab_out = Vocabulary(vocab_filename="seqgen/vocab_out.txt")

# Read the checkpoint BEFORE building the models: the stored architecture
# hyperparameters must be used to construct the encoder/decoder, otherwise
# load_state_dict fails as soon as they differ from the defaults above.
# map_location makes a checkpoint saved on one device loadable on another.
checkpoint = None
if load_from_checkpoint:
    checkpoint = torch.load(checkpoint_file, map_location=features.device)
    num_layers = checkpoint['num_layers']
    embedding_dim = checkpoint['embedding_dim']
    hidden_size = checkpoint['hidden_size']
    bidirectional = checkpoint['bidirectional']

encoder = seq2seq_lstm.EncoderRNN(vocab_size=len(vocab_in), embedding_dim=embedding_dim, num_layers=num_layers, hidden_size=hidden_size, bidirectional=bidirectional).to(features.device)
decoder = seq2seq_lstm.DecoderRNN(embedding_dim=embedding_dim, num_layers=num_layers, hidden_size=hidden_size, vocab_size=len(vocab_out), bidirectional=bidirectional).to(features.device)

# Initialize optimizer for encoder and decoder
encoder_optimizer = torch.optim.SGD(encoder.parameters(), lr=lr)
decoder_optimizer = torch.optim.SGD(decoder.parameters(), lr=lr)

# Loss function (negative log-likelihood; expects log-probabilities as input)
criterion = torch.nn.NLLLoss()

# Restore model weights and optimizer state from the checkpoint
if checkpoint is not None:
    encoder.load_state_dict(checkpoint['encoder_model_state_dict'])
    decoder.load_state_dict(checkpoint['decoder_model_state_dict'])
    encoder_optimizer.load_state_dict(checkpoint['encoder_optimizer_state_dict'])
    decoder_optimizer.load_state_dict(checkpoint['decoder_optimizer_state_dict'])

In [None]:
# Initialize the encoder hidden state and cell state with zeros
# (both come from encoder.initHidden, one state per sequence of the batch)
hn = encoder.initHidden(input_seqs.shape[0], device=features.device)
cn = encoder.initHidden(input_seqs.shape[0], device=features.device)
print(hn.shape, cn.shape)

# Iterate over the sequence words and run every word through the encoder
for i in range(input_seqs.shape[1]):
    # Run the i-th word of the input sequence through the encoder.
    # As a result we will get the prediction (output), the hidden state and the cell state.
    # The hidden state and cell state will be used as inputs in the next round.
    # `output` is overwritten in every round, so only the output of the last word is kept.
    print(f"Run word {i+1} of all {input_seqs.shape[0]} sequences through the encoder")
    output, (hn, cn) = encoder(input_seqs[:, i].unsqueeze(dim=1), coordinates[:, i], (hn, cn))

In [None]:
# Shapes of the encoder output for the last word and of the final hidden and cell state
output.shape, hn.shape, cn.shape

# The Decoder

In [None]:
loss = 0

# Iterate over words of target sequence and run words through the decoder.
# In every step the decoder receives the true word i-1 and its output is
# scored against the true word i, i.e. the model is trained to predict the
# NEXT word (the same scheme the training loop below uses).
for i in range(1, target_seqs.size(1)):
    print(f"Run word {i} through decoder")
    output, (hn, cn) = decoder(
        x=target_seqs[:, i-1].unsqueeze(dim=1),
        coordinates=coordinates[:, i-1],
        hidden=(hn, cn)
    )
    # Flatten the single decoding step to (batch_size, vocab_size) for NLLLoss.
    # Unlike a bare squeeze() this also works for a batch of size one.
    loss += criterion(output.reshape(-1, output.size(-1)), target_seqs[:, i])

# Normalize by the length of the sequences that were actually decoded
# (not by the max_length hyperparameter that is meant for the training run)
print("LOSS", loss.item() / target_seqs.size(1))

In [None]:
history = []

# Probability that an epoch uses teacher forcing, i.e. that the decoder is fed
# the true previous token instead of its own previous prediction
use_teacher_forcing_prob = 0.5

# Number of training iterations; every iteration trains on one freshly generated batch
num_epochs = 100000

for epoch in range(num_epochs):
    # Decide once per epoch whether teacher forcing is used
    use_teacher_forcing = random.random() < use_teacher_forcing_prob

    # Get a batch of training data
    features, target_seqs = g.generate_synthetic_training_data(batch_size, max_length=max_length, continue_prob=0.99, device=device, swap_times=8)
    features = features.to(device)
    target_seqs = target_seqs.to(device)
    # Slice the feature tensor directly: the legacy torch.Tensor(...) constructor
    # is not device-aware and fails for tensors that live on the GPU.
    # Channel 0 holds the token ids, the remaining channels the coordinates.
    input_seqs = features[:, :, 0].to(torch.int64)
    coordinates = features[:, :, 1:]

    # Initialize the encoder hidden state and cell state with zeros
    hn_enc = encoder.initHidden(input_seqs.shape[0], device=features.device)
    cn_enc = encoder.initHidden(input_seqs.shape[0], device=features.device)

    # Set gradients of all model parameters to zero
    encoder_optimizer.zero_grad()
    decoder_optimizer.zero_grad()

    # Initialize loss
    loss = 0

    ####################
    #     ENCODING     #
    ####################

    # Iterate over the sequence words and run every word through the encoder
    for i in range(input_seqs.shape[1]):
        # Run the i-th word of the input sequence through the encoder.
        # As a result we will get the prediction (output), the hidden state (hn) and the cell state (cn).
        # The hidden state and cell state will be used as inputs in the next round
        output, (hn_enc, cn_enc) = encoder(
            input_seqs[:, i].unsqueeze(dim=1),
            coordinates[:, i],
            (hn_enc, cn_enc)
        )

    ####################
    #     DECODING     #
    ####################

    # The first word that is presented to the decoder is the first token of
    # the target sequence (the '<start>' token)
    prediction = target_seqs[:, 0]

    # The initial hidden state of the decoder is the final hidden state of the encoder
    hn_dec, cn_dec = hn_enc, cn_enc

    # Iterate over words of target sequence and run words through the decoder.
    # This will produce a prediction for the next word in the sequence
    for i in range(1, target_seqs.size(1)):
        # Feed either the true previous word (teacher forcing) or the word the
        # model predicted in the previous step
        decoder_input = target_seqs[:, i-1] if use_teacher_forcing else prediction

        # Run word i-1 through the decoder and get the scores for word i
        # together with the new hidden state
        output, (hn_dec, cn_dec) = decoder(
            decoder_input.unsqueeze(dim=1),
            coordinates[:, i-1],
            (hn_dec, cn_dec)
        )

        if not use_teacher_forcing:
            # Get the predicted classes of the model, one per sequence.
            # reshape(-1) keeps the batch dimension even for a batch of size one.
            prediction = output.argmax(dim=-1).reshape(-1)

        # Flatten the single decoding step to (batch_size, vocab_size) for NLLLoss
        loss += criterion(output.reshape(-1, output.size(-1)), target_seqs[:, i])

    history.append(loss.item())
    if not epoch % 100:
        print(f"LOSS after epoch {epoch}", loss.item() / target_seqs.size(1))

    # Compute gradient
    loss.backward()

    # Update weights of encoder and decoder
    encoder_optimizer.step()
    decoder_optimizer.step()

#### Save model history

In [None]:
import pickle
from datetime import datetime

# Training metadata; stored inside the checkpoint and additionally dumped
# into a separate pickle file
model_data = {
    "history": history,
    "lr": lr,
    "embedding_dim": embedding_dim,
    "hidden_size": hidden_size,
    "batch_size": batch_size,
    "max_length": max_length
}

# Timestamp that makes the file names of this run unique
now = datetime.now()
date_time = now.strftime("%Y-%m-%d_%H-%M-%S")

# Checkpoint: model and optimizer states, the metadata from above and the
# hyperparameters that are needed to rebuild the architecture
checkpoint_data = {
    'epoch': epoch,
    'encoder_model_state_dict': encoder.state_dict(),
    'decoder_model_state_dict': decoder.state_dict(),
    'encoder_optimizer_state_dict': encoder_optimizer.state_dict(),
    'decoder_optimizer_state_dict': decoder_optimizer.state_dict(),
    'loss': loss,
    **model_data,
    "num_layers": num_layers,
    "bidirectional": bidirectional,
}
torch.save(checkpoint_data, "model_" + date_time + ".pt")

with open("training_" + date_time + '.pkl', 'wb') as f:
    pickle.dump(model_data, f)

## Make predictions

We run our input sequences through the model and get output sequences. Then we decode the output sequences with the Vocabulary class to obtain the final LaTeX code.

In [None]:
def predict(input_seqs, coordinates, target_seqs, teacher_forcing=True):
    """Run a batch through the encoder and the decoder and return the predicted token ids.

    Uses the notebook-level ``encoder`` and ``decoder`` models. Gradients are
    disabled while predicting.

    Args:
        input_seqs: Integer tensor of shape (batch_size, input_length) with the
            encoded input tokens.
        coordinates: Tensor whose first two axes are (batch_size, sequence_length);
            ``coordinates[:, i]`` is passed to the models together with token i.
        target_seqs: Integer tensor of shape (batch_size, target_length). It
            defines the number of decoding steps and provides the decoder inputs.
        teacher_forcing: If True (default, the original behaviour) the decoder
            is fed the true token ``target_seqs[:, i]`` in step i. If False,
            only the first target token is fed and every later step receives
            the token predicted in the previous step (greedy decoding).

    Returns:
        int64 tensor with the shape and device of ``target_seqs``; entry
        ``[b, i]`` is the most likely token the decoder produced in step i,
        i.e. the prediction for the token that follows the input of step i.
    """
    batch_size = input_seqs.shape[0]
    # Create the states on the device of the inputs instead of relying on a notebook global
    device = input_seqs.device

    # Token ids are integers, so store them as int64 next to the targets
    predictions = torch.zeros(target_seqs.shape, dtype=torch.int64, device=target_seqs.device)

    with torch.no_grad():
        # Initialize the encoder hidden state and cell state
        hn = encoder.initHidden(batch_size, device=device)
        cn = encoder.initHidden(batch_size, device=device)

        # Iterate over the sequence words and run every word through the encoder.
        # Only the final hidden and cell state are needed afterwards.
        for i in range(input_seqs.shape[1]):
            _, (hn, cn) = encoder(
                input_seqs[:, i].unsqueeze(dim=1),
                coordinates[:, i],
                (hn, cn)
            )

        # Predict tokens of the target sequence by running the hidden state through
        # the decoder
        decoder_input = None
        for i in range(target_seqs.size(1)):
            if teacher_forcing or i == 0:
                decoder_input = target_seqs[:, i]

            output, (hn, cn) = decoder(
                decoder_input.unsqueeze(dim=1),
                coordinates[:, i],
                (hn, cn)
            )

            # Select the indices of the most likely tokens. reshape(-1) yields one
            # id per sequence and keeps the batch dimension for a batch of size one.
            step_prediction = torch.argmax(output, dim=2).reshape(-1)
            predictions[:, i] = step_prediction

            # Only used in the next step when teacher forcing is disabled
            decoder_input = step_prediction

    return predictions

In [None]:
# Predict the output for the first sequence only; the 0:1 slices keep the batch dimension
prediction = predict(input_seqs[0:1], coordinates[0:1], target_seqs[0:1])
input_seqs[0:1], prediction

In [None]:
# Swap tokens of the first input sequence and predict again, to compare with the prediction above.
# NOTE(review): tokens and coordinates are swapped by two separate random_swap calls;
# presumably i=2 makes both calls swap the same positions — verify in seqgen.seq_gen,
# otherwise tokens and coordinates no longer belong together.
in_swapped = g.random_swap(input_seqs[0], i=2).unsqueeze(dim=0)
coords_swapped = g.random_swap(coordinates[0], i=2).unsqueeze(dim=0)
prediction_swapped = predict(in_swapped, coords_swapped, target_seqs[0:1])
in_swapped, prediction_swapped

In [None]:
# Element-wise comparison: False marks the positions whose token changed through the swap
input_seqs[0:1] == in_swapped

In [None]:
# Element-wise comparison of the predictions for the original and the swapped input
prediction == prediction_swapped

In [None]:
# Pick random sequence and its prediction from the model
import random

vocab_in = Vocabulary(vocab_filename="seqgen/vocab_in.txt")
vocab_out = Vocabulary(vocab_filename="seqgen/vocab_out.txt")

predictions = predict(input_seqs, coordinates, target_seqs)

# randrange excludes the upper bound. random.randint(0, n) includes n and
# would occasionally index one past the last sequence of the batch.
i = random.randrange(predictions.size(0))
print("MODEL INPUT", vocab_in.decode_sequence(input_seqs[i].cpu().numpy()))
print("MODEL OUTPUT", vocab_out.decode_sequence(predictions[i].cpu().numpy()))
# The first target token is skipped because the model predicts the token that follows its input
print("TARGET OUTPUT", vocab_out.decode_sequence(target_seqs[i][1:].cpu().numpy()))

In [None]:
# Decode the predicted token ids of the selected sequence, drop every '<end>'
# token and join the remaining tokens into a single string
decoded_tokens = vocab_out.decode_sequence(predictions[i].cpu().numpy())
prediction = "".join(token for token in decoded_tokens if token != '<end>')
print("MODEL OUTPUT", prediction)