In [3]:
from utils import *
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

Question 6

In [4]:

def padding(x, w2i, batch_size=15):
    batches_x = []

    # step over x met steps of batch_size
    for i in range(0, len(x), batch_size):

        start = i
        end = i + batch_size

        # get the batch
        batch_x = x[start:end]
        batch = []

        # Adding start/end
        for sentence in batch_x:
            sentence.insert(0, w2i['.start'])
            sentence.append(w2i['.end'])

        for i, sentence in enumerate(batch_x):
            longest_sentence = max([len(sentence) for sentence in batch_x])
            if len(sentence) < longest_sentence:
                sentence += [w2i['.pad']] * (longest_sentence - len(sentence))
            # print(len(sentence))
            batch.append(sentence)

        batches_x.append(batch)

    # transform all batches to tensors
    batches_x = [torch.tensor(batch, dtype=torch.long) for batch in batches_x]

    return batches_x


In [5]:
x_train, (i2w, w2i) = load_ndfa(n=150_000)
batches_x = padding(x_train, w2i)

# Set up the model
class AutoregressiveLSTM(nn.Module):
    def __init__(self, vocab_size, embedding_dim=32, hidden_size=16, num_layers=1):
        super(AutoregressiveLSTM, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_size, num_layers, batch_first=True)
        self.linear = nn.Linear(hidden_size, vocab_size)

    def forward(self, x):
        embedded = self.embedding(x)
        lstm_out, _ = self.lstm(embedded)
        output = self.linear(lstm_out)
        return output

In [42]:


def train(batches_x, model, epochs=10, lr=0.001, optimizer='Adam', do_seed=False):

    if optimizer == 'Adam':
        optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    elif optimizer == 'SGD':
        optimizer = torch.optim.SGD(model.parameters(), lr=lr)
      
    # Define loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    optimizer.zero_grad()
    # Training loop
    for epoch in range(epochs):
        losses = []
        accuracies = []
        
        
        random.shuffle(batches_x)
        for batch in batches_x:
              # Zero the gradients
            target_batch = batch[:, 1:]  # Target sequence (exclude first token)
            zeroes = torch.zeros(batch.shape[0], 1, dtype=torch.long)
            target_batch = torch.cat((target_batch, zeroes), dim=1)

            # Forward pass
            predictions = model(batch)
            # Calculate loss
            loss = criterion(predictions.reshape(-1, len(w2i)), target_batch.reshape(-1))
            # Backward pass and optimization
            loss.backward()
            optimizer.step()

            losses.append(loss.item())
            predictions = predictions.argmax(dim=2)
            print(f'predictions: {predictions}, target: {target_batch}')
            # calculate accuracy: number of correct predictions / number of predictions
            n_correct = (predictions == target_batch).sum().item()
            accuracy = n_correct / (predictions.shape[0]*predictions.shape[1])
            print(f'accuracy: {accuracy}')
            accuracies.append(accuracy)
        
        if do_seed:
            seed(model)

        # Print the average loss for the epoch
        print('Epoch: ', epoch, 'Loss: ', np.mean(losses), 'Accuracy: ', np.mean(accuracies))  



In [30]:
model = AutoregressiveLSTM(len(w2i))
train(batches_x[:1000], model, epochs=5, lr=0.001, optimizer='Adam')

Epoch:  0 Loss:  0.14719433334185714 Accuracy:  0.98875
Epoch:  1 Loss:  6.06505567908755e-07 Accuracy:  1.0
Epoch:  2 Loss:  0.0 Accuracy:  1.0
Epoch:  3 Loss:  0.0 Accuracy:  1.0
Epoch:  4 Loss:  0.0 Accuracy:  1.0


Question 7

In [26]:
import torch.distributions as dist
def sample(lnprobs, temperature=1.0):
    if temperature == 0.0:
        return lnprobs.argmax()
    
    p = torch.nn.functional.softmax(lnprobs / temperature, dim=0)
    cd = dist.Categorical(p)
    
    return cd.sample()

In [37]:
def seed(model):
    for _ in range(10):  # Generate 10 sequences after each epoch

        seed_sequence = [w2i['.start'], w2i['('], w2i['('], w2i[')']]
        seed_input = torch.tensor([seed_sequence], dtype=torch.long)

        # Generate samples
        max_length = 30  # Maximum sequence length
        generated_sequence = seed_sequence.copy()

        while True:
            output_logits = model(seed_input)
            next_token_logits = output_logits[0, -1, :]
            next_token_index = sample(next_token_logits, temperature=0.5)  # Adjust temperature as needed

            generated_sequence.append(next_token_index.item())
            if next_token_index.item() == w2i['.end'] or len(generated_sequence) >= max_length:
                break

            seed_input = torch.tensor([generated_sequence], dtype=torch.long)

        # Convert indices back to tokens and print the generated sequence
        generated_sequence_tokens = [i2w[index] for index in generated_sequence]
        print('Generated Sequence:', ' '.join(generated_sequence_tokens))


In [60]:
x_train, (i2w, w2i) = load_brackets(n=150_000)
batches_x = padding(x_train, w2i)
print(x_train[0], x_train[1000], x_train[10000])
model = AutoregressiveLSTM(len(w2i))
# train(batches_x, model, epochs=5, lr=0.001, optimizer='Adam', do_seed=True)

[1, 4, 5, 2] [1, 4, 5, 2] [1, 4, 5, 2]
