In this task we implement a very simple version of GPT ourselves.

## Grammar Description

This grammar generates sentences where a subject (a cat, dog, or bird) performs an action (jumps, runs, or flies) and either goes over another noun or goes through the air.

The grammar rules are:

- `<start>` ::= `<sentence>`
- `<sentence>` ::= `<subject>` `<verb>` `<object>` '.'
- `<subject>` ::= 'the' `<noun>`
- `<noun>` ::= 'cat' | 'dog' | 'bird'
- `<verb>` ::= 'jumps' | 'runs' | 'flies'
- `<object>` ::= 'over' 'the' `<noun>` | 'through' 'the' 'air'

### Example Sentences

- the dog runs over the cat.
- the bird flies through the air.
- the cat jumps over the bird.
- the dog runs through the air.
- the bird flies over the dog.

In [None]:
import random

def generate_sentence():
    """Build one sentence: subject, verb and object followed by a '.'.

    Each helper already returns its words with a trailing space, so the
    pieces are simply concatenated in grammar order.
    """
    pieces = [generate_subject(), generate_verb(), generate_object(), '.']
    return ''.join(pieces)

def generate_subject():
    """Return 'the <noun> ' with a randomly chosen noun (trailing space kept)."""
    noun = random.choice(['cat', 'dog', 'bird'])
    return f'the {noun} '

def generate_verb():
    """Return a randomly chosen verb followed by a single space."""
    verb = random.choice(['jumps', 'runs', 'flies'])
    return f'{verb} '

def generate_object():
    """Return the object phrase of a sentence (with a trailing space).

    With probability 0.5 the phrase is 'over the <noun> ' with a random
    noun; otherwise it is the fixed phrase 'through the air '.
    """
    # Guard clause for the fixed phrase; the noun is only drawn when needed.
    if random.random() >= 0.5:
        return 'through the air '
    target = random.choice(['cat', 'dog', 'bird'])
    return f'over the {target} '

# Write 100 generated sentences to sentences.txt, one per line, and echo
# every tenth sentence (indices 0, 10, ..., 90) to stdout as a sanity check.
with open('sentences.txt', 'w') as f:
    # The generator is lazy, so each sentence is produced right before it is
    # written, exactly as in a plain counting loop.
    for i, sentence in enumerate(generate_sentence() for _ in range(100)):
        f.write(f'{sentence}\n')
        if not i % 10:
            print(sentence)

Next we define a PyTorch `Dataset` that loads the generated sentences and converts each one into a fixed-length tensor of token IDs:

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader

class SentenceDataset(Dataset):
    """Dataset of whitespace-tokenized sentences read from a text file.

    Every line of the file is one sentence. Items are returned as 1-D
    tensors of token IDs with exactly ``max_len`` entries.
    """

    def __init__(self, filename, tokenizer, max_len=10):
        """Read all lines of *filename* (stripped) and store the settings.

        Args:
            filename: path of the text file, one sentence per line.
            tokenizer: mapping from token string to integer ID; it must
                contain '<S>', '<EOS>' and every word in the file.
            max_len: number of tokens in every returned item.
        """
        self.max_len = max_len
        self.tokenizer = tokenizer

        with open(filename, 'r') as handle:
            self.sentences = [raw.strip() for raw in handle]

    def __len__(self):
        """Return the number of sentences that were read."""
        return len(self.sentences)

    def __getitem__(self, idx):
        """Return sentence *idx* as a tensor of ``max_len`` token IDs.

        The sentence is wrapped in '<S>' ... '<EOS>', cut to ``max_len``
        tokens and, when shorter, right-padded with further '<EOS>' tokens.
        """
        tokens = f"<S> {self.sentences[idx]} <EOS>".split()
        tokens = tokens[:self.max_len]
        # A non-positive repeat count yields an empty list, so nothing is
        # appended when the sentence already fills the window.
        tokens.extend(['<EOS>'] * (self.max_len - len(tokens)))
        ids = [self.tokenizer[token] for token in tokens]
        return torch.tensor(ids)

Now we implement the actual GPT model:

In [None]:
import torch
import math
import torch.nn as nn
import torch.nn.functional as F

class GPT(nn.Module):
    """A minimal decoder-only transformer language model.

    The model embeds token IDs and runs ``num_layers`` blocks. Each block
    adds a sinusoidal positional encoding, applies causally masked
    multi-head self-attention and a position-wise feed-forward layer (each
    followed by a residual connection and layer norm). The final linear
    layer projects the hidden states to vocabulary logits.

    Args:
        vocab_size: number of distinct token IDs.
        embedding_dim: width of the token embeddings / hidden states.
        num_heads: number of attention heads (must divide ``embedding_dim``).
        num_layers: number of attention + feed-forward blocks.
    """

    def __init__(self, vocab_size, embedding_dim, num_heads, num_layers):
        super().__init__()

        # Embedding layer
        self.embedding = nn.Embedding(vocab_size, embedding_dim)

        # Multi-head attention layers
        self.num_heads = num_heads
        self.attention_layers = nn.ModuleList([
            nn.MultiheadAttention(embed_dim=embedding_dim, num_heads=num_heads, batch_first=True)
            for _ in range(num_layers)
        ])
        self.layer_norms1 = nn.ModuleList([
            torch.nn.LayerNorm(embedding_dim)
            for _ in range(num_layers)
        ])

        # Position-wise feedforward layers
        self.feedforward_layers = nn.ModuleList([
            nn.Sequential(
                nn.Linear(embedding_dim, embedding_dim),
                nn.ReLU(),
            )
            for _ in range(num_layers)
        ])
        self.layer_norms2 = nn.ModuleList([
            torch.nn.LayerNorm(embedding_dim)
            for _ in range(num_layers)
        ])

        self.unembed = nn.Linear(embedding_dim, vocab_size)

    @staticmethod
    def _positional_encoding(seq_len, dim):
        """Return the (1, seq_len, dim) sinusoidal positional encoding."""
        pos_enc = torch.zeros(seq_len, dim)
        pos = torch.arange(seq_len, dtype=torch.float).unsqueeze(1)
        div = torch.exp(
            torch.arange(0, dim, 2).float() * (-math.log(10000.0) / dim)
        )
        angles = pos * div
        pos_enc[:, 0::2] = torch.sin(angles)
        # For an odd ``dim`` there is one fewer cosine column than sine
        # column, so drop the surplus frequency.
        pos_enc[:, 1::2] = torch.cos(angles[:, : dim // 2])
        return pos_enc.unsqueeze(0)

    @staticmethod
    def _causal_mask(seq_len, device):
        """Return a (seq_len, seq_len) bool mask, True where attention is forbidden."""
        # Strict upper triangle: position i must not attend to any j > i.
        return torch.triu(
            torch.ones(seq_len, seq_len, dtype=torch.bool, device=device),
            diagonal=1,
        )

    def forward(self, input_ids):
        """Compute next-token logits for every position.

        Args:
            input_ids: integer tensor of token IDs, shape (batch, seq_len).

        Returns:
            Tensor of logits with shape (batch, seq_len, vocab_size). The
            logits at position ``t`` depend only on tokens ``0..t``.
        """
        # Embed input sequence
        x = self.embedding(input_ids)

        # Both tensors depend only on the sequence length and width, so they
        # are built once and reused by every layer.
        seq_len, dim = x.shape[1], x.shape[-1]
        pos_enc = self._positional_encoding(seq_len, dim).to(x)
        causal_mask = self._causal_mask(seq_len, x.device)

        for attention_layer, feedforward_layer, layer_norm1, layer_norm2 in zip(
            self.attention_layers,
            self.feedforward_layers,
            self.layer_norms1,
            self.layer_norms2,
        ):
            # The positional encoding is re-added at the input of every block.
            x = x + pos_enc

            # Causally masked self-attention (query = key = value = x); the
            # attention weights are not needed.
            attn_output, _ = attention_layer(
                x, x, x, attn_mask=causal_mask, need_weights=False
            )

            attn_output = attn_output + x
            attn_output = layer_norm1(attn_output)

            # Position-wise feedforward layer
            ff_output = feedforward_layer(attn_output)
            ff_output = ff_output + attn_output
            ff_output = layer_norm2(ff_output)

            # Update input embeddings for next layer
            x = ff_output

        return self.unembed(x)



We implement and run the training loop:

In [None]:
import torch.optim as optim

# Define training hyperparameters
learning_rate = 0.001
batch_size = 32
num_epochs = 10

# Define tokenizer and GPT model
tokenizer = {"<S>": 0, "<EOS>": 1, "the": 2, "dog": 3, "cat": 4, "bird": 5, "flies": 6, "jumps": 7, "through": 8, "air": 9, "runs": 10, "over":11, ".": 12}
gpt_model = GPT(vocab_size=len(tokenizer), embedding_dim=64, num_heads=4, num_layers=4)

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(gpt_model.parameters(), lr=learning_rate)

# Load training data
training_data = SentenceDataset("sentences.txt", tokenizer)

# Define dataloader
dataloader = torch.utils.data.DataLoader(training_data, batch_size=batch_size, shuffle=True)

# Train model
gpt_model.train()
for epoch in range(num_epochs):
    running_loss = 0.0
    for batch in dataloader:
        # Zero gradients
        optimizer.zero_grad()

        # Teacher forcing over all time steps at once: the model sees tokens
        # 0..T-2 and must predict tokens 1..T-1 (the same sequence shifted
        # left by one position).
        input_ids = batch[:, :-1]
        target_ids = batch[:, 1:]

        # Forward pass: logits of shape (batch, T-1, vocab)
        outputs = gpt_model(input_ids)

        # Compute loss and backpropagate. CrossEntropyLoss expects the class
        # dimension second, i.e. (batch, vocab, T-1).
        loss = criterion(outputs.permute(0, 2, 1), target_ids)
        loss.backward()
        optimizer.step()

        # Accumulate loss statistics
        running_loss += loss.item()

    # Report the mean loss per batch for this epoch (guarding against an
    # empty dataloader).
    print(f"Epoch {epoch + 1}, Loss: {running_loss / max(len(dataloader), 1)}")


Write a function for sampling a sentence from the model:

In [None]:
def sample_sentence(model, tokenizer, max_len=10):
    """Sample one sentence from *model* token by token.

    Generation starts from the '<S>' token. At every step the model's
    logits for the last position are turned into a probability distribution
    and the next token is drawn from it. Sampling stops when '<EOS>' is
    drawn or when the sequence (including '<S>') reaches ``max_len`` tokens.

    Args:
        model: module mapping a (1, seq_len) tensor of token IDs to logits
            of shape (1, seq_len, vocab_size).
        tokenizer: mapping from token string to integer ID; must contain
            '<S>' and '<EOS>'.
        max_len: maximum number of tokens in the sequence, counting '<S>'.

    Returns:
        The sampled tokens joined by single spaces, without '<S>'/'<EOS>'.
    """
    id_to_token = {token_id: token for token, token_id in tokenizer.items()}
    start_id = tokenizer["<S>"]
    eos_id = tokenizer["<EOS>"]

    # Build inputs on the model's device (None means the default device).
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else None

    # Sample in eval mode without gradients, then restore the previous mode.
    was_training = model.training
    model.eval()
    token_ids = [start_id]
    try:
        with torch.no_grad():
            while len(token_ids) < max_len:
                input_ids = torch.tensor([token_ids], device=device)
                logits = model(input_ids)
                # Only the distribution after the last token is needed.
                probs = torch.softmax(logits[0, -1], dim=-1)
                next_id = torch.multinomial(probs, num_samples=1).item()
                if next_id == eos_id:
                    break
                token_ids.append(next_id)
    finally:
        model.train(was_training)

    sentence = " ".join(id_to_token[token_id] for token_id in token_ids[1:])
    return sentence

Now we sample 10 sentences from GPT:

In [None]:
def sample_sentences(model, tokenizer, num_sentences=10, max_len=10):
    """Return a list of ``num_sentences`` sentences sampled from *model*.

    Each sentence comes from an independent call to ``sample_sentence``
    with the given ``max_len``.
    """
    return [
        sample_sentence(model, tokenizer, max_len=max_len)
        for _ in range(num_sentences)
    ]

# Draw 10 sentences from the trained model and print them one per line.
sampled_sentences = sample_sentences(
    model=gpt_model,
    tokenizer=tokenizer,
    num_sentences=10,
)

for sentence in sampled_sentences:
    print(sentence)