# Imports and configuration

In [2]:
import torch
from torch import nn
from torch.nn import functional as F
import numpy as np
from matplotlib import pyplot as plt
import time
import pandas as pd
import random
from dataclasses import dataclass, field

In [3]:
# Create a config file for the cup shuffling task
@dataclass
class MASTER_CONFIG:
    # Training
    seed: int = 1337
    batch_size: int = 32
    training_split: float = 0.8
    
    max_iters = 100
    eval_interval = 10
    learning_rate = 2e-5
    eval_iters = 50
    
    # Model parameters
    dtype: torch.dtype = torch.float32
    d_model: int = 128
    n_embed: int = 1024
    l_max: int = 32
    n_heads: int = 8
    head_size: int = 128 # n_embed/n_heads?
    n_layers: int = 8
    dropout: float = 0.1
    
    # Data parameters
    n_cups: int = 3
    n_moves: int = 3
    n_samples: int = 1000

    # Tokenizer
    vocab: list[str] = field(default_factory = lambda: [])
    vocab_size: int = 0
    
    # Use CUDA or MPS if available else CPU
    if (torch.cuda.is_available()):
        device = torch.device("cuda")
        print("Using CUDA")
    elif (torch.backends.mps.is_available()):
        device = torch.device("mps")
        print("Using Apple Silicon MPS")
    else:
        device = torch.device("cpu")
        print("Using CPU")

Using Apple Silicon MPS


# Ball Shuffler

In [27]:
import random

def initial_ball_position(n=3):
    return random.randint(1, n)

def generate_shuffle_moves(n=3, num_moves=3):
    moves = []
    
    for _ in range(num_moves):
        # Randomly pick two different cups
        cup1, cup2 = random.sample(range(1, n + 1), 2)
        moves.append((cup1, cup2))
    
    return moves

def final_ball_position(initial_position, shuffle_moves):
    position = initial_position
    for move in shuffle_moves:
        # If the ball's current position matches one of the cups in the move, swap it.
        if position == move[0]:
            position = move[1]
        elif position == move[1]:
            position = move[0]
    
    return position

# Method for generating data and labels for batches.
# TODO: This can be done much better
def generate_batch(split: str, config=MASTER_CONFIG):
    B = config.batch_size
    T = config.l_max
    vocab_size = config.vocab_size
    
    # Generate the data
    data = generate_cup_data(config.n_cups, config.n_moves, config.n_samples)
    
    # Split the data into training and validation sets
    if split == "train":
        data = data[:int(config.training_split * len(data))]
    elif split == "val":
        data = data[int(config.training_split * len(data)):]
    else:
        raise ValueError("split must be either 'train' or 'val'")
    
    # Return input and output tensors
    inputs = torch.zeros((B, T), dtype=torch.long)
    outputs = torch.zeros((B, T), dtype=torch.long)
    
    

def generate_cup_shuffling_scenario(n=3, num_moves=3):
    # Generate initial ball position and shuffle moves
    initial_position = initial_ball_position(n)
    shuffle_moves = generate_shuffle_moves(n, num_moves)
    
    # Calculate the final ball position
    final_position = final_ball_position(initial_position, shuffle_moves)
    
    # Construct the input and output strings
    str = f"There are {n} cups\n"
    str += f"Ball is in cup {initial_position}\n"
    str += "\n".join([f"Switch cup {move[0]} and cup {move[1]}" for move in shuffle_moves])
    str += f"\nBall is now in cup {final_position}"
    
    return str

# Generate batches of data
def generate_cup_data(n_cups = 3, num_examples=1000, num_moves=range(1, 5)):
    data = []
    
    for _ in range(num_examples):
        moves = random.choice(num_moves)
        input = generate_cup_shuffling_scenario(n_cups, moves)

        # Finds the last cup number in the string
        output = input.split()[-1][:-5]
        output = f" cup {output}"
        
        data.append((input, output))
    
    return data

In [33]:
initial_position = initial_ball_position()
print(initial_position)

shuffle_moves = generate_shuffle_moves()
print(shuffle_moves)

final_position = final_ball_position(initial_position, shuffle_moves)
print(final_position)

scenario = generate_cup_shuffling_scenario()
print(scenario)

print(generate_cup_data(n_cups=3, num_moves=range(1,5), num_examples=1))

2
[(3, 2), (1, 3), (3, 2)]
1
There are 3 cups
Ball is in cup 1
Switch cup 2 and cup 1
Switch cup 2 and cup 3
Switch cup 3 and cup 2
Ball is now in cup 2
[('There are 3 cups\nBall is in cup 2\nSwitch cup 2 and cup 1\nSwitch cup 2 and cup 3\nBall is now in cup 1', ' cup ')]


# Tokenizer

In [35]:
# Tokenizer, we want BOS, EOS tokens
vocab = ['<BOS>', '<EOS>', '\n', 'Ball', 'There are', ' is now in', ' is in', 'Switch', ' and', ' cup', ' 1', ' 2', ' 3', ' 4', ' 5', ' 6', ' 7', ' 8', ' 9', ' cups']

setattr(MASTER_CONFIG, "vocab", vocab)
setattr(MASTER_CONFIG, "vocab_size", len(vocab))

idx_to_s = {i:ch for i, ch in enumerate(vocab)}
s_to_idx = {ch:i for i, ch in enumerate(vocab)}

def encode(s):
    ids = []
    i = 0
    while i < len(s):
        max_len = -1
        max_token = None
        for token in s_to_idx.keys():
            token_len = len(token)
            if s[i:i+token_len] == token:
                if token_len > max_len:
                    max_len = token_len
                    max_token = token
        if max_token:
            ids.append(s_to_idx[max_token])
            i += max_len
        else:
            print(f"Unrecognized sequence at index {i}, {s[i:i+1]}")
            
            break
    return ids

def decode(ids):
    return "".join([idx_to_s[i] for i in ids])


s = generate_cup_shuffling_scenario(4, 5)

print(f"The string is {len(encode(s))} tokens long:\n\n{s}\n\nAnd has the following tokenization:\n{encode(s)}")


The string is 48 tokens long:

There are 4 cups
Ball is in cup 1
Switch cup 1 and cup 2
Switch cup 3 and cup 4
Switch cup 3 and cup 2
Switch cup 1 and cup 4
Switch cup 3 and cup 4
Ball is now in cup 4

And has the following tokenization:
[4, 13, 19, 2, 3, 6, 9, 10, 2, 7, 9, 10, 8, 9, 11, 2, 7, 9, 12, 8, 9, 13, 2, 7, 9, 12, 8, 9, 11, 2, 7, 9, 10, 8, 9, 13, 2, 7, 9, 12, 8, 9, 13, 2, 3, 5, 9, 13]


# Models, functions and layers

In [7]:
def positional_embedding_naive(t: int, config=MASTER_CONFIG) -> torch.Tensor:
    """
    Input: t the position of the token in a sequence
    Output: the positional embedding of the position
    Parameters: W_p in R^{d_model x l_max} where l_max is the maximum sequence length or context size
    Return: e_p in R^{d_model}

    Using the sine positional embedding from [VSP17] paper "Attention is all you need".
    """
    d_model = config.d_model
    l_max = config.l_max
    
    # Create the positional embedding matrix
    W_p = torch.zeros((d_model, l_max))
    for t in range(l_max):
        for i in range(d_model):
            if i % 2 == 0:
                W_p[i, t] = np.sin(t / 10000 ** (i / d_model)) # TODO: check if this is correct, or should it be l_max instead of 10000?
            else:
                W_p[i, t] = np.cos(t / 10000 ** ((i - i % 2) / d_model))
    
    # Return the positional embedding vector
    return W_p[:, t]

class positional_encoding(nn.Module):
    """ 
    Positional encoding according to [VSP17] paper "Attention is all you need" based on sine and cosine functions.
    
    Input: x a sequence of tokens of shape (B, T, d_model)
    Output: x + p, where p is the positional encoding, of shape (B, T, d_model)
    """
    def __init__(self, config=MASTER_CONFIG):
        super().__init__()
        d_model = config.d_model
        l_max = config.l_max
        dtype = config.dtype
        
        self.p = torch.zeros((1, d_model, l_max))
        
        # Creates X = [[0], [1], ..., [l_max - 1]]
        num = torch.arange(l_max, dtype=dtype).reshape(-1, 1)
        # Creates Y = [10000^0/d_model, 10000^2/d_model, ..., 10000^(d_model - 1)/d_model]
        denum = torch.pow(10000, torch.arange(0, d_model, 2, dtype=dtype) / d_model)
        
        self.p[:, :, 0::2] = torch.sin(num / denum)
        self.p[:, :, 1::2] = torch.cos(num / denum)
        
    def forward(self, x):
        return x + self.p[:, :x.shape[1], :].to(x.device)  
    
def masked_softmask(x: torch.Tensor, mask: torch.Tensor, value: float = 1e6) -> torch.Tensor:
    """
    Softmax with masking on the last dimension. Replaces the masked values with value.
    param x: input tensor of (B, T, C) dimension
    param mask: mask tensor of (B) or (B, T) dimension
    param dim: dimension to apply softmax, default is -1 which is the last dimension
    """
    
    #x = x.masked_fill(~mask, float('-inf'))
    #x = F.softmax(x, dim=dim)
    #return x
    
    # Return dummy until finished
    return x
 
class DummyModel(nn.Module):
    """
    Dummy model for testing purposes. It takes a sequence of tokens and returns a probability distribution over the vocabulary.
    
    Input: x a sequence of tokens of shape (B, T)
    Output: a probability distribution over the vocabulary of shape (B, T, vocab_size)
    Parameters: 
    """
    def __init__(self, config=MASTER_CONFIG):
        super().__init__()
        self.config = config
        
        vocab_size = config.vocab_size
        d_model = config.d_model
        
        # Embedding layer from the vocabulary to the d_model dimension, parameter matrix W_e in R^{vocab_size x d_model}
        self.embedding = nn.Embedding(vocab_size, d_model)
        # Linear layer from d_model to vocab_size dimension. Parameter matrices W_l1, W_l2 in R^{d_model x d_model}, R^{d_model x vocab_size}
        self.linear = nn.Sequential(
            nn.Linear(d_model, d_model),
            nn.ReLU(),
            nn.Linear(d_model, vocab_size)
        )
        
        # Returns the number of parameters in the model
        self.num_parameters = sum(p.numel() for p in self.parameters() if p.requires_grad)
        
    def forward(self, x, targets=None):
        # x is of shape (B, T), each element is an integer in the range [0, vocab_size) representing a token. By seeing each token as a one-hot vector, x can be seen as a tensor of shape (B, T, vocab_size)
        B, T = x.shape
        
        # Embedd x into the d_model dimension which makes x of shape (B, T, d_model)
        x = self.embedding(x)
        
        # MLP layer into an unembedding layer, which makes x of shape (B, T, vocab_size)
        x = self.linear(x)
        
        # Softmax over the vocabulary dimension which is the last dimension, returns a probability distribution over the vocabulary, it is of shape (B, T, vocab_size)
        logits = F.softmax(x, dim=-1)
        
        if targets is not None:
            # Calculate the loss
            
            # Flatten the logits and targets to be of shape (B*T, vocab_size) and (B*T) respectively
            x = x.view(B*T, self.vocab_size)
            targets = targets.view(B*T)
            
            loss = F.cross_entropy(logits, targets)
            return loss, logits
        
        return logits


# Training pass

In [None]:
# Training of DummyModel
def train(model, optimizer, config=MASTER_CONFIG):
    # Set the model to training mode
    model.train()
    
    # Generate a batch of data
    inputs, targets = generate_batch("train", config)
    
    # Forward pass
    loss, logits = model(inputs, targets)
    
    # Backward pass
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    
    return loss.item()

# Initializing dummy model

In [8]:
config = MASTER_CONFIG
data = generate_cup_data(config.n_cups, config.n_moves, config.n_samples)

model = DummyModel(config)
#model.to(config.device)

optimizer = torch.optim.Adam(model.parameters(), lr=config.learning_rate)

print(MASTER_CONFIG.vocab_size)

# Test model
x = torch.randint(MASTER_CONFIG.vocab_size, (1, 3))
y = model(x)

print(x, y)

16
tensor([[4, 0, 1]]) tensor([[[0.0710, 0.0456, 0.0653, 0.0625, 0.0715, 0.0597, 0.0652, 0.0384,
          0.0535, 0.0717, 0.0579, 0.0632, 0.0618, 0.0659, 0.0708, 0.0762],
         [0.0469, 0.0616, 0.0535, 0.0587, 0.0566, 0.0597, 0.0599, 0.0584,
          0.1185, 0.0436, 0.0832, 0.0605, 0.0621, 0.0517, 0.0552, 0.0698],
         [0.0578, 0.0639, 0.0602, 0.0485, 0.0570, 0.0542, 0.0706, 0.0554,
          0.0867, 0.0536, 0.0731, 0.0751, 0.0796, 0.0503, 0.0543, 0.0599]]],
       grad_fn=<SoftmaxBackward0>)


In [9]:
def test_auto_regressive(model, config=MASTER_CONFIG):
    '''
    Tests if the model is auto-regressive by comparing the output of the model when given the entire input sequence and when given the input sequence one token at a time.
    '''
    
    # shape vocab_size, batch_size, block_size
    x = torch.randint(config.vocab_size, (1, 3))
    y1 = model(x)
    
    y2 = torch.zeros_like(y1)
    for b in range(x.size(1)):
        y_b = model(x[:, :b + 1])
        y2[:, b] = y_b[:, b]
            
    error = ((y1 - y2).norm() / (y1.norm() + y2.norm())).item()
    
    if error < 1e-5:
        print("Auto-regressive test passed")
    else:
        print("Auto-regressive test failed")
        
    # print(f"Error={error}")
    
test_auto_regressive(model)

Auto-regressive test passed


In [10]:
# Method for evaluating the loss of the PyTorch model on the validation set without defining the model.
@torch.no_grad()
def evaluate_model(model, criterion):
    out = {}
    model.eval()
    
    for split in ["train", "val"]:
        total_loss = 0
        total_tokens = 0
        
        for batch in generate_batch(split=split):
            # Get the inputs and targets
            inputs = batch["inputs"]
            targets = batch["targets"]
            
            # Get the model outputs
            outputs = model(inputs)
            
            # Calculate the loss
            loss = criterion(outputs, targets)
            
            # Update the total loss and tokens
            total_loss += loss.item()
            total_tokens += targets.shape[0] * targets.shape[1]
        
        # Calculate the average loss
        avg_loss = total_loss / total_tokens
        
        # Store the average loss
        out[f"{split}_loss"] = avg_loss
    