In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import os
from IPython.core.display_functions import clear_output
import matplotlib.pyplot as plt
import math
import wandb
import re
from tqdm import tqdm,trange
# Pick the compute backend: Apple MPS takes priority, then CUDA, otherwise the CPU.
if torch.backends.mps.is_available():
    device = "mps:0"
else:
    device = "cuda:0" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")

Using device: mps:0


In [None]:
# Load the full training corpus into memory. The encoding is pinned to UTF-8 so the
# file decodes the same way on every platform instead of depending on the OS locale
# default.
with open('input.txt', encoding='utf-8') as f:
    text = f.read()
    

# Show the beginning of the corpus as a quick sanity check.
print("----Sample Shakespeare----")
print(text[:250])

In [None]:
# Token grammar, tried left to right at each position:
#   1. a run of word characters, optionally followed by an apostrophe suffix ("world's"),
#   2. a single punctuation character from the listed set,
#   3. a newline (kept as its own token so line breaks survive tokenization).
_TOKEN_PATTERN = re.compile(r"\w+(?:'\w+)?|[.,!?;:\"()\[\]{}<>\\/\-—–…]|\n")


def split_to_words(text):
    """
    Split raw text into a list of word, punctuation and newline tokens.
    Characters that match none of the alternatives (for example spaces) are dropped.
    """
    return _TOKEN_PATTERN.findall(text)

# Tokenize the corpus once and reuse the result for both statistics below.
corpus_words = split_to_words(text)

# The vocabulary is sorted so that every word gets the same index on every run.
# Iterating over a bare set of strings gives an order that can change between kernel
# restarts (string hashing is randomized per process), which would silently change the
# token ids and make previously computed embeddings / model weights inconsistent.
vocab = sorted(set(corpus_words))
c = len(vocab)  # vocabulary size
print("Number of words: {}".format(len(corpus_words)))
print("Number of distinct words in text: {}".format(c))

In [None]:
# Lookup tables between vocabulary words and their integer token ids.
itos = dict(enumerate(vocab))                      # token id -> word
stoi = {word: idx for idx, word in itos.items()}   # word -> token id

def words_to_tokens(words):
    """
    Map each word in `words` to its integer token id using the `stoi` table.
    A word that is not in the vocabulary raises KeyError.
    """
    token_ids = []
    for word in words:
        token_ids.append(stoi[word])
    return token_ids

def tokens_to_words(index_list):
    """
    Decode a list of token ids into a single string.
    The words are joined with single spaces, then any whitespace directly in front of a
    punctuation mark is removed so that punctuation attaches to the preceding word.
    """
    words = (itos[token] for token in index_list)
    spaced_text = " ".join(words)
    return re.sub(r'\s+([.,!?;:"(){}\[\]<>\\/\-—–…])', r'\1', spaced_text)

# Checking that the word to token and back conversion works.
# The first 36 characters of the corpus are tokenized, mapped to ids, and decoded again.
# The recovered text is not expected to match character-for-character: tokens_to_words
# re-joins the tokens with single spaces and strips whitespace in front of punctuation.
sample_words = text[:36]
token_ids = words_to_tokens(split_to_words(sample_words))
recovered_words = tokens_to_words(token_ids)
print(f"Original text: {sample_words}\n")
print(f"Encoded text: {token_ids}\n")
print(f"Recovered text: {recovered_words}\n")

In [None]:
# Encode the whole corpus as a Python list of token ids and show the first ten,
# both as ids and decoded back to text.
tokenized_text = words_to_tokens(split_to_words(text))
print("Encoded text sample: {}".format(tokenized_text[:10]))
print(tokens_to_words(tokenized_text[:10]))

# The works of Shakespeare are now a sequence of integers representing the words in the text. Sorry, William.
# NOTE: from this line on `tokenized_text` is a 1-D tensor of token ids, no longer a Python list.
tokenized_text = torch.tensor(tokenized_text)
tokenized_text.shape

In [None]:
# Create co-occurrence matrix
# The co-occurrence matrix C is a c x c (c is our vocab size) symmetric matrix where C_ij is how many
# times the jth word appears at most W//2 positions away from an occurrence of the ith word
# (the occurrence itself is counted too, so every token adds 1 to its own diagonal entry).
with torch.no_grad():
    W = 10
    half_window = W // 2
    num_tokens = len(tokenized_text)
    C = torch.zeros(len(vocab),len(vocab))
    one = torch.ones(())
    # Instead of visiting every (token, neighbour) pair in a Python loop, handle one offset at a
    # time: for a fixed offset, pair every position i with position i + offset for all valid i at once.
    for offset in range(-half_window, half_window + 1):
        distance = abs(offset)
        if distance >= num_tokens:
            continue  # the text is too short to contain any pair at this distance
        earlier = tokenized_text[:num_tokens - distance]
        later = tokenized_text[distance:]
        centre, neighbour = (earlier, later) if offset >= 0 else (later, earlier)
        # accumulate=True makes repeated (row, column) pairs add up instead of overwriting each other.
        C.index_put_((centre, neighbour), one, accumulate=True)
    C = C.to(device)
    
# C should be a symmetric matrix
torch.isclose(C, C.T, atol=1e-3).all()

In [None]:
# Build n-dimensional word embeddings from the co-occurrence matrix C.
# n is the number of eigenvectors we want to keep (and therefore the embedding dimension).
n = 256
with torch.no_grad():
    # Normalize the data: every row of C is shifted to zero mean and scaled to unit standard deviation.
    # NOTE(review): a row whose entries are all equal has zero std and would turn into NaN here — confirm this cannot happen.
    Z = C - C.mean(dim=1, keepdim=True)
    Z /= Z.std(dim=1, keepdim=True)

    # Compute the covariance matrix (c x c, built from inner products between the rows of Z).
    cov = (Z @ Z.T)/(Z.shape[0] - 1)
    # Compute the eigenvectors and eigenvalues.
    # torch.linalg.eigh returns the eigenvalues in ascending order, with the matching eigenvectors as columns of Q.
    # NOTE: the name L holds the eigenvalues here; the training cells later reassign L to the number of layers.
    L, Q = torch.linalg.eigh(cov)
    # Get the eigenvectors of the n largest eigenvalues (the last n columns of Q), stored as rows: (n, c).
    principal_eigv = Q[:, -n:].T

    # PCA embeddings for training: row i is the embedding of the word with token id i.
    pca_embeddings = Z @ principal_eigv.T # (c, n)

In [None]:
# This is the same as the MultiheadLayer in the lab 6 notebook. It corresponds to the equations in Section 3 of this lab's writeup.
class MultiHeadLayer(nn.Module):
    """
    Multi-head attention layer with a residual connection.

    Each of the H heads owns a query, key and value matrix of shape (m, n) and an output
    matrix of shape (n, m). For an input X of shape (B, n, T) every head computes a (T, T)
    attention matrix, mixes its value vectors with it and maps the result back to n features.
    The head outputs are summed, passed through a ReLU and added to the input.
    Args:
        m (int): Number of rows of the per-head Q, K and V matrices.
        n (int): Number of features of each vector in the input sequence.
        H (int): The number of heads.
    """
    def __init__(self, m, n, H):
        super().__init__()
        self.m = m
        self.H = H

        # Per-head projections, stacked along the first dimension.
        self.Q = nn.Parameter(torch.empty(H, m, n))
        self.K = nn.Parameter(torch.empty(H, m, n))
        self.V = nn.Parameter(torch.empty(H, m, n))

        # Per-head map from the m attention features back to the n input features.
        self.W = nn.Parameter(torch.empty(H, n, m))

        self.nonlinearity = nn.ReLU()
        self.initialize_parameters()

    def initialize_parameters(self):
        """
        Fill Q, K, V and W (in that order) with Kaiming-uniform random values,
        using a = sqrt(5) as the negative-slope argument for all of them.
        """
        for weight in (self.Q, self.K, self.V, self.W):
            nn.init.kaiming_uniform_(weight, a=math.sqrt(5))

    def forward(self, X):
        """
        Args:
            X (torch.Tensor): Input embeddings of shape (B, n, T).
        Returns:
            torch.Tensor: Output of the layer, same shape (B, n, T) as the input.
        """
        # Unpacking doubles as a check that the input is 3-dimensional.
        batch_size, num_features, seq_len = X.shape

        # (B, n, T) -> (B, 1, n, T) so the stacked per-head matrices broadcast over the batch.
        X_heads = X.unsqueeze(1)

        queries = self.Q.unsqueeze(0) @ X_heads   # (B, H, m, T)
        keys = self.K.unsqueeze(0) @ X_heads      # (B, H, m, T)
        values = self.V.unsqueeze(0) @ X_heads    # (B, H, m, T)

        # scores[..., i, j] is the inner product of query i with key j; softmax runs over j.
        scores = queries.transpose(-2, -1) @ keys      # (B, H, T, T)
        attention = F.softmax(scores, dim=-1)

        # Column i of `mixed` is the attention-weighted sum of the value vectors for position i.
        mixed = values @ attention.transpose(-2, -1)   # (B, H, m, T)
        per_head_output = self.W @ mixed               # (B, H, n, T)

        # Sum over the heads, apply the nonlinearity and add the residual connection.
        return X + self.nonlinearity(per_head_output.sum(dim=1))
    
    
# Smoke test: run one random input of shape (1, 256, 64) through a freshly initialised layer.
# Because forward() adds its result to the input, the output has the same shape as the input.
model = MultiHeadLayer(m=32, n=256, H=8).to(device)
X_tilde = torch.randn(1,256,64).to(device)
out = model(X_tilde)

print(f"out.shape: {out.shape}")

In [None]:
# Language transformer
class LanguageTransformer(nn.Module):
    """
    Stack of L multi-head attention layers operating on PCA word embeddings.

    Word indices are looked up in the module-level `pca_embeddings` table. The mean of the
    sequence is appended as one extra token, and the value of that extra token after the
    last layer is returned as the prediction.
    Args:
        m (int): Number of rows of the per-head Q, K and V matrices.
        n (int): Number of features of each embedding vector.
        L (int): The number of layers.
        H (int): The number of heads.
    """
    def __init__(self, m, n, L, H):
        super().__init__()
        self.layers = nn.ModuleList()
        for _ in range(L):
            self.layers.append(MultiHeadLayer(m, n, H))
        # Word embedding table: the fixed PCA embeddings, indexed by token id.
        # It is a plain tensor attribute, not a parameter, so it is not trained.
        self.embedding_table = pca_embeddings

    def forward(self, E):
        """
        Args:
            E (torch.Tensor): Input word indices of shape (B, T).
        Returns:
            torch.Tensor: The last vector of the final layer's output, shape (B, n).
        """
        # Look up the embeddings, (B, T, n), and move the features in front of time: (B, n, T).
        X = self.embedding_table[E].transpose(1, 2)
        batch_size, num_features, seq_len = X.shape

        # Append the time-average of the sequence as an extra token: (B, n, T+1).
        mean_token = X.mean(dim=2, keepdim=True)
        hidden = torch.cat((X, mean_token), dim=-1)

        for layer in self.layers:
            hidden = layer(hidden)

        # Read the prediction off the appended token.
        return hidden[:, :, -1]

# Test
# Feed a batch of one sequence of five random word indices through a small model.
# The model returns one vector per sequence (the last token of the final layer).
model = LanguageTransformer(L=2, H=2, m=32, n=256).to(device)
E = torch.randint(0, pca_embeddings.shape[0], (1,5)).to(device).long()
out = model(E)
print(f"output.shape: {out.shape}")

In [None]:
T = 64 # context size
split_factor = 0.9
split_index = int(split_factor * len(tokenized_text))

# Splitting into train and test sets: the first `split_factor` share of the token stream is
# used for training and the remainder for testing. Both parts are moved to the compute device.
train, test = (
    part.to(device)
    for part in (tokenized_text[:split_index], tokenized_text[split_index:])
)

In [None]:
# Dataset
class WordIndexDataset(Dataset):
    """
    Sliding-window dataset over an encoded sequence of word indices.

    Item `idx` is the window of T consecutive indices starting at position `idx`, paired
    with the same window shifted one position to the right (the next word for every word
    in the window). The indices are returned as they are, without one-hot encoding.
    """
    def __init__(self, text, T):
        self.text, self.T = text, T
        assert self.T < len(text), "context_size (T) must be less than len(text)"

    def __len__(self):
        # Number of start positions that leave room for a full window plus one more word.
        return len(self.text) - self.T

    def __getitem__(self, idx):
        """
        Return the pair (window, shifted window) that starts at position `idx`.
        """
        start, stop = idx, idx + self.T
        inputs = self.text[start:stop]
        targets = self.text[start + 1:stop + 1]
        return inputs, targets

# Wrap the train and test token streams in sliding-window datasets with context size T.
train_dataset = WordIndexDataset(train, T)
test_dataset = WordIndexDataset(test, T)


# Example of a batch
# B is the batch size. Training batches are shuffled; test batches keep their original order.
B = 64
train_loader = DataLoader(train_dataset, batch_size=B, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=B, shuffle=False)

# Draw a single batch: E holds the input windows, y_idx the same windows shifted by one word.
E, y_idx = next(iter(train_loader))
print(f"X_idx shape: {E.shape}")
print(f"y_idx shape: {y_idx.shape}")

In [None]:
# Training
# Train the LanguageTransformer to predict the PCA embedding of the word that follows each
# context window, then report the loss on the test set.
n_epochs = 3
m = 32
n = 256
L = 6
T = 64
H = 8

estimator = LanguageTransformer(m, n, L, H).float().to(device)
optimizer = torch.optim.SGD(estimator.parameters(), lr=1e-5)

train_loader = DataLoader(train_dataset, batch_size=B, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=B, shuffle=False)

# NOTE: despite the variable name, this is a mean-squared-error loss between the predicted
# and the true embedding vector. The name is kept so other cells that refer to it still work.
cross_entropy_loss = nn.MSELoss()
estimator.train()
train_loss = []

for epoch in range(n_epochs): # Iterate over n_epochs epochs

    for x_batch, y_batch in tqdm(train_loader): # Iterate over all batches in the dataset 
        # (Step i) Load the data. These commands send the data to the GPU memory.
        x_batch = x_batch.to(device)
        y_batch = y_batch.to(device)

        # Load the embeddings for the target word.
        # We want to predict the last word of the context window for this exercise.
        y_word_to_predict = y_batch[:,-1]
        # Indexing with a (B,) vector of token ids already gives (B, n); no transpose is needed.
        Y_embeddings = pca_embeddings[y_word_to_predict].to(device) # (B, n)

        # (Step ii) Compute the gradients. We use automated differentiation.
        optimizer.zero_grad() # Gradient reset to indicate where the backward computation stops.

        # Call the neural network. The model appends the mean token itself and returns its
        # final value, so the output is already the (B, n) embedding prediction. Averaging it
        # again over the features would leave a (B,) vector that only matches the target
        # through silent broadcasting.
        y_hat = estimator(x_batch) # (B, n)
        cross_entropy_value = cross_entropy_loss(y_hat,Y_embeddings)

        cross_entropy_value.backward() # Compute gradients moving backwards until the gradient reset.

        # (Step iii) Update parameters by taking an SGD (or other optimizer) step.
        optimizer.step()

        train_loss.append(cross_entropy_value.item())
    # End of batch loop. Report the loss of the last batch of the epoch.
    print(f"Epoch {epoch}/{n_epochs} Loss: {train_loss[-1]}")

# Evaluate test loss
estimator.eval()
with torch.no_grad():
    test_losses = []
    for x_batch, y_batch in tqdm(test_loader):
        x_batch = x_batch.to(device)
        y_batch = y_batch.to(device)
        y_word_to_predict = y_batch[:,-1]
        Y_embeddings = pca_embeddings[y_word_to_predict].to(device) # (B, n)
        # Same prediction and target shapes as in training, so the two losses are comparable.
        y_hat = estimator(x_batch) # (B, n)
        test_losses.append(cross_entropy_loss(y_hat,Y_embeddings).item())
    test_loss = torch.tensor(test_losses).mean().item()

print(f"Train loss: {train_loss[-1]}")
print(f"Test loss: {test_loss}")

In [None]:
# adding readout
class LanguageTransformerWithReadout(nn.Module):
    """
    LanguageTransformer variant that ends in a linear readout over the vocabulary.

    No mean token is appended here. The output of the last attention layer is mapped,
    position by position, to one unnormalized score per vocabulary word.
    Args:
        m (int): Number of rows of the per-head Q, K and V matrices.
        n (int): Number of features of each embedding vector.
        L (int): The number of layers.
        H (int): The number of heads.
        c (int): The vocabulary size.
    """
    def __init__(self, m, n, L, H, c):
        super().__init__()
        self.layers = nn.ModuleList()
        for _ in range(L):
            self.layers.append(MultiHeadLayer(m, n, H))

        # Fixed PCA word embeddings, indexed by token id (a plain tensor, not a parameter).
        self.embedding_table = pca_embeddings

        # Readout matrix: maps the n features of every position to c vocabulary scores.
        self.readout = nn.Parameter(torch.empty(c, n).to(device))
        nn.init.kaiming_uniform_(self.readout, a=math.sqrt(5))

    def forward(self, E):
        """
        Args:
            E (torch.Tensor): Input word indices of shape (B, T).
        Returns:
            torch.Tensor: Unnormalized vocabulary scores of shape (B, c, T).
        """
        word_vectors = self.embedding_table[E].transpose(1, 2)   # (B, n, T)
        batch_size, num_features, seq_len = word_vectors.shape

        hidden = word_vectors   # stays (B, n, T) through every layer
        for layer in self.layers:
            hidden = layer(hidden)

        # Linear readout applied to each position: (c, n) @ (B, n, T) -> (B, c, T).
        # No softmax here: the scores are left unnormalized and the loss function
        # normalizes them, for numerical stability.
        return torch.matmul(self.readout, hidden)

# testing. Now the transformer outputs a vector of unnormalized scores (one per vocabulary word)
# for each word in the sequence, so the output shape is (1, c, 5) for this input.
# NOTE: `model` here is freshly initialised and untrained.
E = torch.randint(0, len(vocab), (1,5)).to(device).long()
model = LanguageTransformerWithReadout(m=32, n=256, L=6, H=8, c=c).to(device)
out = model(E)
print(f"out.shape: {out.shape}")

In [None]:
# Training
# Train the readout model to classify the word that follows each context window, then
# report the cross-entropy on the test set.
n_epochs = 5
B = 64
m = 32
n = 256
L = 6
# NOTE: train_dataset / test_dataset are reused below as they were built earlier, so this
# assignment does not change the length of the context windows in the batches.
T = 32
H = 8

estimator = LanguageTransformerWithReadout(m, n, L, H, c).float().to(device)
optimizer = torch.optim.SGD(estimator.parameters(), lr=1e-5)

train_loader = DataLoader(train_dataset, batch_size=B, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=B, shuffle=False)

# We use the Cross Entropy loss for estimating the probabilities of the next word.
cross_entropy_loss = nn.CrossEntropyLoss()
estimator.train()
train_loss = []

for epoch in range(n_epochs): # Iterate over n_epochs epochs

    for x_batch, y_batch in tqdm(train_loader): # Iterate over all batches in the dataset 
        # (Step i) Load the data. These commands send the data to the GPU memory.
        x_batch = x_batch.to(device)
        y_batch = y_batch.to(device)

        # We want to predict the last word of the context window for this exercise.
        y_word_to_predict = y_batch[:,-1] # (B,) token ids

        # (Step ii) Compute the gradients. We use automated differentiation.
        optimizer.zero_grad() # Gradient reset to indicate where the backward computation stops.

        # Call the neural network. Get the prediction for the last word.
        # The model looks up the word embeddings itself, so only the indices are passed in.
        y_hat = estimator(x_batch)[:,:,-1] # (B, c) scores at the last position

        # The softmax function is applied internally to the transformer's output y_hat.
        cross_entropy_value = cross_entropy_loss(y_hat,y_word_to_predict)

        cross_entropy_value.backward() # Compute gradients moving backwards until the gradient reset.

        # (Step iii) Update parameters by taking an SGD (or other optimizer) step.
        optimizer.step()

        train_loss.append(cross_entropy_value.item())

    # End of batch loop. Report the loss of the last batch of the epoch.
    print(f"Epoch {epoch}/{n_epochs} Loss: {train_loss[-1]}")

# Evaluate test loss at the end of training
estimator.eval()
with torch.no_grad():
    test_losses = []
    for x_batch, y_batch in tqdm(test_loader):
        x_batch = x_batch.to(device)
        y_batch = y_batch.to(device)
        y_word_to_predict = y_batch[:,-1]
        # Use the scores at the last position, exactly as in training. Averaging the scores
        # over all positions would evaluate a different predictor than the one trained.
        y_hat = estimator(x_batch)[:,:,-1] # (B, c)
        cross_entropy_value = cross_entropy_loss(y_hat,y_word_to_predict)
        test_losses.append(cross_entropy_value.item())
    test_loss = torch.tensor(test_losses).mean().item()

print(f"Train loss: {train_loss[-1]}")
print(f"Test loss: {test_loss}")

In [None]:
# Taking a snippet of the test set to try out the trained model.
starting_point = torch.randint(0, len(test)-T, (1,))
initial_indices = test[starting_point:starting_point+T].unsqueeze(0) # (1, T)

# Use the trained `estimator`; `model` at this point is only the untrained smoke-test instance.
# NOTE: the values are unnormalized scores (logits); the variable names are kept as they were.
with torch.no_grad():
    log_probabilities = estimator(initial_indices) # (1, c, T)
print(f"log_probabilities.shape: {log_probabilities.shape}")
last_word_probabilities = log_probabilities[:,:,-1] # (1, c): scores for the word after the snippet
# Normalize over the vocabulary (the last dimension). Normalizing over dim=-2 would run the
# softmax over the batch dimension of size 1 and turn every entry into 1.0.
probabilities = F.softmax(last_word_probabilities, dim=-1)

print(f"Input text: {tokens_to_words(initial_indices.reshape(-1).tolist())}")
print(f"\nThe most likely next word is: {tokens_to_words([torch.argmax(probabilities).item()])}")

print("\nSampled words according to a multinomial distribution (either could be the next word when using sampling):")
for _ in range(10):
    sampled_word = torch.multinomial(probabilities, num_samples=1).item()
    print(f"{tokens_to_words([sampled_word])}", end=" ")

In [None]:
def generate_text(model, X, max_generate_tokens=500):
    """
    Generate text from a model given an initial input token sequence.

    At every step the model is run on the current context, the next token is sampled from
    the softmax of the scores at the last position, and the context window slides forward
    by one token (its first token is dropped and the sampled one is appended).
    Args:
        model (nn.Module): The model to use for generation. It is expected to return scores
            with the vocabulary on dimension 1 and the sequence position on the last
            dimension, as the readout models in this notebook do.
        X (torch.Tensor): The initial token sequence, shape (1, T) (a batch of one prompt).
        max_generate_tokens (int): The number of tokens to generate.
    Returns:
        str: The decoded text of the prompt followed by the generated tokens.
    """
    with torch.no_grad():
        context = X.clone()
        # Flatten instead of squeeze: squeezing a one-token prompt gives a 0-d tensor whose
        # .tolist() is a bare int, which cannot be appended to.
        generated_sequence = X.cpu().reshape(-1).tolist()
        for _ in range(max_generate_tokens):
            logits = model(context)

            last_position_scores = logits[:,:,-1]               # (1, c)
            probs = F.softmax(last_position_scores, dim=-1)     # normalize over the vocabulary
            next_token = torch.multinomial(probs, num_samples=1)  # (1, 1)

            # Slide context window: remove the first token and append the next token
            context = torch.cat([context[:, 1:], next_token], dim=1)
            generated_sequence.append(next_token.item())  # Add new token to generated sequence
        # tokens_to_words already returns the decoded text as a single string.
        return tokens_to_words(generated_sequence)

# Test generate: draw a random window from the held-out text and let the model continue it.
starting_point = torch.randint(0, len(test)-T, (1,))
initial_indices = test[starting_point:starting_point+T].unsqueeze(0)
print("========== INPUT TEXT ==========")
prompt_text = tokens_to_words(initial_indices.reshape(-1).tolist())
print(f"{prompt_text}\n")

# This is the model from task 7
print("========== INPUT + GENERATED TEXT ==========")
generated_text = generate_text(estimator, initial_indices, max_generate_tokens=100)
print(generated_text)
print("====================================")

In [None]:
def generate_from_prompt(estimator,prompt,max_generate_tokens=100):
    """
    Tokenize a text prompt and continue it with `generate_text`.
    Every token of the prompt must be in the vocabulary; words_to_tokens raises KeyError otherwise.
    """
    prompt_ids = words_to_tokens(split_to_words(prompt))
    # Shape (1, number of prompt tokens): a batch holding the single prompt.
    prompt_batch = torch.tensor(prompt_ids).to(device).unsqueeze(0)
    return generate_text(estimator, prompt_batch, max_generate_tokens=max_generate_tokens)

# Example: continue a short prompt with 10 generated tokens.
print(generate_from_prompt(estimator,"Alas mother,",max_generate_tokens=10))

In [None]:
# Some examples of generated text with the trained model
shakespeare_quotes = [
    "All the world's a stage, and all the men and women merely players.",  # As You Like It (Act 2, Scene 7)
    "A fool thinks himself to be wise, but a wise man knows himself to be a fool.",  # As You Like It (Act 5, Scene 1)
    "How beauteous mankind is! O brave new world!",  # The Tempest (Act 5, Scene 1) – Miranda.
    "O brave new world, that has such people in't!",  # The Tempest (Act 5, Scene 1) – Miranda.
    "Love all, trust a few, do wrong to none.",  # All's Well That Ends Well (Act 1, Scene 1)
    "To be or not to be, that is the question.",  # Hamlet (Act 3, Scene 1)
]

# Continue every quote with 15 generated tokens. A failure on one quote is reported and the
# loop moves on to the next quote.
for quote in shakespeare_quotes:
    try:
        print("========== INPUT ==========")
        print(' '.join(split_to_words(quote)))
        print("========== INPUT + GENERATED TEXT ==========")
        continuation = generate_from_prompt(estimator,quote,max_generate_tokens=15)
        print(continuation)
        print("====================================")
    except Exception as e:
        # Some of those words weren't on our vocabulary so the model doesn't know what to do.
        print(f"Error generating from prompt: {e}")

In [None]:
# incorporate positional encoding
class LanguageTransformerWithReadoutAndPositionalEncoding(nn.Module):
    """
    Modification of the LanguageTransformerWithReadout class of Task 7 to include positional encoding.
    Positional encoding is a learnable matrix that is added to the embeddings of the input tokens.

    Each entry in the positional encoding matrix is a vector of size n that represents a position in the sequence.
    Args:
        m (int): Number of rows of the per-head Q, K and V matrices.
        n (int): Number of features of each embedding vector.
        L (int): The number of layers.
        H (int): The number of heads.
        c (int): The vocabulary size.
        T (int, optional): Number of positions in the positional encoding table, i.e. the longest
            sequence the model accepts. When omitted, the module-level variable T is read at
            construction time (the behaviour before this parameter existed).
    """
    def __init__(self, m, n, L, H, c, T=None):
        super(LanguageTransformerWithReadoutAndPositionalEncoding, self).__init__()
        self.layers = nn.ModuleList([
            MultiHeadLayer(m, n, H) for _ in range(L)
        ])

        # Learnable parameters for positional encoding.
        # Each entry in the positional encoding matrix is a vector of size n that represents a position in the sequence.
        if T is None:
            # Backward-compatible default: fall back to the notebook-level context size.
            # The parameter shadows the global name, so the global has to be looked up explicitly.
            T = globals()["T"]
        self.position_embedding = nn.Embedding(T, n)

        # Fixed PCA word embeddings, indexed by token id (a plain tensor, not a parameter).
        self.embedding_table = pca_embeddings

        # Adding readout layer
        self.readout = nn.Parameter(torch.empty(c, n).to(device))
        nn.init.kaiming_uniform_(self.readout, a=math.sqrt(5))
        
    def forward(self, E):
        """
        Add a positional encoding to the word embeddings, apply the attention layers and
        map every position to unnormalized vocabulary scores.
        Args:
            E (torch.Tensor): The input word indices, shape (B, T).
        Returns:
            Y_hat (torch.Tensor): The output of the transformer, passed through the readout layer, shape (B, c, T).
        Raises:
            IndexError: If the sequence is longer than the positional encoding table.
        """
        B, T = E.shape

        # Fail with a readable message instead of an out-of-range lookup inside nn.Embedding.
        max_positions = self.position_embedding.num_embeddings
        if T > max_positions:
            raise IndexError(
                f"sequence length {T} exceeds the {max_positions} positions of the positional encoding"
            )

        # Word embeddings
        X = self.embedding_table[E].transpose(1,2) # (B, n, T)

        # To create positional encodings, we need to create a vector for each position in the sequence.
        # The position indices are created on the device that holds the embedding weights, so the
        # lookup works wherever the module has been moved to.
        positions = torch.arange(T, device=self.position_embedding.weight.device)
        P = self.position_embedding(positions).transpose(0,1) # (n, T)
        
        # Adding word embeddings and positional encoding
        # Although P is (n,T), this is broadcasted to (B, n, T), which means that the same 
        # positional encoding is added to every sequence in the batch.
        X_tilde = X + P
        
        # X_l has shape (B, n, T)
        X_l = X_tilde
        for layer in self.layers:
            X_l = layer(X_l)

        # We implement the readout layer as a linear mapping on each word in the sequence.
        Y_hat = torch.matmul(self.readout, X_l) # (B, c, T)

        # Notice, we don't apply the softmax here, because we keep the probabilities unnormalized until 
        # we call the loss function, for numerical stability.
        return Y_hat

# testing. Now the transformer outputs a vector of unnormalized scores (one per vocabulary word)
# for each word in the sequence.
# The input is a batch of one sequence of five random word indices.
E = torch.randint(0, len(vocab), (1,5)).to(device).long()
print(f"E.shape: {E.shape}")
model = LanguageTransformerWithReadoutAndPositionalEncoding(m=32, n=256, L=6, H=8, c=c).to(device)
out = model(E)
print(f"out.shape: {out.shape}")

In [None]:
# We now need to modify both MultiHeadLayer and the LanguageTransformer class to include layer normalization.
class MultiHeadLayer(nn.Module):
    """
    Multi-head attention layer with two layer normalizations.

    The input is normalized over its n features before the attention is computed
    (layer_norm1). Each head's output is normalized over its n features (layer_norm2)
    before the heads are summed, passed through the ReLU and added to the normalized input.
    Unlike the per-head Q, K and V matrices, the output matrix W is shared by all heads.
    Args:
        m (int): Number of rows of the per-head Q, K and V matrices.
        n (int): Number of features of each vector in the input sequence.
        H (int): The number of heads.
    """
    def __init__(self, m, n, H):
        super().__init__()
        self.m = m
        self.H = H

        # Per-head projections, stacked along the first dimension.
        self.Q = nn.Parameter(torch.empty(H, m, n))
        self.K = nn.Parameter(torch.empty(H, m, n))
        self.V = nn.Parameter(torch.empty(H, m, n))

        # Single output matrix mapping the m attention features back to n features.
        self.W = nn.Parameter(torch.empty(n, m))

        # Normalization applied to the layer input, over the n features of each position.
        self.layer_norm1 = nn.LayerNorm(n)

        self.nonlinearity = nn.ReLU()

        # Normalization applied to each head's output, over the n features of each position.
        self.layer_norm2 = nn.LayerNorm(n)

        self.initialize_parameters()

    def initialize_parameters(self):
        """
        Fill Q, K, V and W (in that order) with Kaiming-uniform random values,
        using a = sqrt(5) as the negative-slope argument for all of them.
        """
        for weight in (self.Q, self.K, self.V, self.W):
            nn.init.kaiming_uniform_(weight, a=math.sqrt(5))

    def forward(self, X):
        """
        Forward pass of the multihead attention layer with layer normalization.

        Args:
            X (torch.Tensor): Input embeddings of shape (B, n, T).
        Returns:
            torch.Tensor: Output of the layer, same shape (B, n, T) as the input.
        """
        # Unpacking doubles as a check that the input is 3-dimensional.
        batch_size, num_features, seq_len = X.shape

        # nn.LayerNorm normalizes over the last dimension, so the features are moved to the
        # end, (B, T, n), normalized, and moved back to (B, n, T).
        X = self.layer_norm1(X.transpose(-2, -1)).transpose(-2, -1)

        # (B, n, T) -> (B, 1, n, T) so the stacked per-head matrices broadcast over the batch.
        X_heads = X.unsqueeze(1)

        queries = self.Q.unsqueeze(0) @ X_heads   # (B, H, m, T)
        keys = self.K.unsqueeze(0) @ X_heads      # (B, H, m, T)
        values = self.V.unsqueeze(0) @ X_heads    # (B, H, m, T)

        # scores[..., i, j] is the inner product of query i with key j; softmax runs over j.
        scores = queries.transpose(-2, -1) @ keys      # (B, H, T, T)
        attention = F.softmax(scores, dim=-1)

        mixed = values @ attention.transpose(-2, -1)   # (B, H, m, T)
        per_head_output = self.W @ mixed               # (B, H, n, T)

        # Second normalization, again over the n features of every position (and head).
        per_head_output = self.layer_norm2(per_head_output.transpose(-2, -1)).transpose(-2, -1)

        # Sum over the heads, apply the nonlinearity and add the (normalized) input.
        return X + self.nonlinearity(per_head_output.sum(dim=1))

# Testing the change
# One random input of shape (1, 256, 5) goes through the layer-normalized attention layer.
# The output is expected to keep the input shape.
model = MultiHeadLayer(m=32, n=256, H=2).to(device)
X = torch.randn(1,256,5).to(device)
out = model(X)
print(f"out.shape: {out.shape}")

In [None]:
class LanguageTransformer(nn.Module):
    """
    Language transformer with positional encoding, layer normalization and a vocabulary readout.
    Taken from Task 10 and added layer normalization. This is the final version of this class.
    Args:
        m (int): Number of rows of the per-head Q, K and V matrices.
        n (int): Number of features of each embedding vector.
        L (int): The number of layers.
        H (int): The number of heads.
        c (int): The vocabulary size.
        T (int): Number of positions in the positional encoding table.
    """
    def __init__(self, m, n, L, H, c, T):
        super().__init__()

        self.layers = nn.ModuleList()
        for _ in range(L):
            self.layers.append(MultiHeadLayer(m, n, H))

        # Fixed PCA word embeddings, indexed by token id (a plain tensor, not a parameter).
        self.embedding_table = pca_embeddings

        # Learnable positional encoding: one n-dimensional vector per position.
        self.position_embedding = nn.Embedding(T, n)

        # Normalization applied to the output of the last layer, before the readout.
        self.layer_norm = nn.LayerNorm(n)

        # Readout matrix: maps the n features of every position to c vocabulary scores.
        self.readout = nn.Parameter(torch.empty(c, n).to(device))
        nn.init.kaiming_uniform_(self.readout, a=math.sqrt(5))

    def forward(self, E):
        """
        Args:
            E (torch.Tensor): The input word indices, shape (B, T).
        Returns:
            torch.Tensor: Unnormalized vocabulary scores of shape (B, c, T).
        """
        batch_size, seq_len = E.shape

        word_vectors = self.embedding_table[E].transpose(1, 2)   # (B, n, T)

        # One positional vector per position; (n, T) broadcasts over the batch when added.
        positions = torch.arange(seq_len, device=device)
        position_vectors = self.position_embedding(positions).transpose(0, 1)   # (n, T)

        hidden = word_vectors + position_vectors   # (B, n, T)
        for layer in self.layers:
            hidden = layer(hidden)

        # nn.LayerNorm normalizes over the last dimension, hence the transposes around it.
        hidden = self.layer_norm(hidden.transpose(-2, -1)).transpose(-2, -1)

        # Linear readout applied to each position: (c, n) @ (B, n, T) -> (B, c, T).
        return torch.matmul(self.readout, hidden)

# testing. 
# A batch of one sequence of five random word indices; the model is built with T=5 positions
# so the positional encoding table covers exactly this sequence length.
E = torch.randint(0, pca_embeddings.shape[0], (1,5)).to(device).long()
print(f"E.shape: {E.shape}")
model = LanguageTransformer(m=32, n=256, L=6, H=8, c=c, T=5).to(device)
out = model(E)
print(f"out.shape: {out.shape}")

In [None]:
# future masking
# this is a small example to gain an intuition of how masking will work. 
# NOTE: this cell rebinds the global name B (used as the batch size in the training cells)
# to a 5 x 5 tensor of example attention scores.
B = torch.randn(5,5)
print("B:")
display(B)
# FUTURE MASKING: 
# To mask attention, we create a matrix that indicates if an entry in B is a word in the future
# (the entries strictly above the diagonal). The mask is sized from B itself and created on
# B's device: a mask built from the global context size T on the compute device would match
# neither the 5 x 5 shape nor the device of this example.
num_positions = B.shape[-1]
mask = torch.triu(torch.ones(num_positions, num_positions, device=B.device), diagonal=1)
print()

# If an entry is in the future, we set it to -inf, 
# so that when we apply softmax, the probability of that word is 0, while 
# the rest of the words sum to 1.
B = B.masked_fill(mask == 1, float('-inf'))

In [None]:
# We need to modify both MultiHeadLayer and the LanguageTransformer class to include layer normalization.
class MultiHeadLayer(nn.Module):
    """
    A modified version of the MultiHeadLayer class with layer normalization and
    future (causal) masking.

    It has two normalization layers: one applied to the layer input before the
    attention is computed, and one applied to each head's attention output before
    the heads are summed and passed through the nonlinearity.

    Args:
        m (int): Number of rows of each head's Q, K and V matrices.
        n (int): Dimension of each element in the sequence.
        H (int): Number of attention heads.
    """
    def __init__(self, m, n, H):
        super().__init__()
        self.m = m
        self.H = H

        # One (m, n) matrix per head for queries, keys and values.
        self.Q = nn.Parameter(torch.empty(H, m, n))
        self.K = nn.Parameter(torch.empty(H, m, n))
        self.V = nn.Parameter(torch.empty(H, m, n))

        # Output map from the m-dimensional head space back to n dimensions.
        # A single W is shared by all heads.
        self.W = nn.Parameter(torch.empty(n, m))

        # First layer normalization object.
        # Layernorm will average over the n dimensions of each element in the sequence.
        self.layer_norm1 = nn.LayerNorm(n)

        self.nonlinearity = nn.ReLU()

        # Second layer normalization object.
        self.layer_norm2 = nn.LayerNorm(n)

        self.initialize_parameters()

    def initialize_parameters(self):
        """
        Initialize the values of the learnable parameter matrices.
        Kaiming uniform is just a type of random initialization, you don't need to
        worry about it. It is a good default initialization for linear layers.
        """
        for weight in (self.Q, self.K, self.V, self.W):
            nn.init.kaiming_uniform_(weight, a=math.sqrt(5))

    def forward(self, X):
        """
        Forward pass of the multihead attention layer with layer normalization.

        Args:
            X (torch.Tensor): The input embeddings, of shape (B, n, T).
        Returns:
            X_l (torch.Tensor): The output of the multihead attention layer, of shape (B, n, T).
        """
        _, _, T = X.shape  # X: (B, n, T)

        # First layer normalization.
        # An annoying Pytorch detail: layer norm expects the normalization to be over the last dimension.
        # Therefore, we transpose the last two dimensions to shape (B, T, n) each time we normalize,
        # then transpose back.
        # Note that X is overwritten here, so the residual connection at the end of this
        # method adds the *normalized* input.
        X = self.layer_norm1(X.transpose(-2, -1)).transpose(-2, -1)

        # Expand X to include the head dimension
        X_expanded = X.unsqueeze(1)  # (B, 1, n, T)

        # Compute QX, KX, VX for each head
        QX = torch.matmul(self.Q.unsqueeze(0), X_expanded)  # (B, H, m, T)
        KX = torch.matmul(self.K.unsqueeze(0), X_expanded)  # (B, H, m, T)
        VX = torch.matmul(self.V.unsqueeze(0), X_expanded)  # (B, H, m, T)

        QX_t = QX.transpose(-2, -1)  # (B, H, T, m)

        # Attention scores per head: scores[..., t, s] is the inner product of query t and key s.
        # (Named `scores` rather than `B` so it is not confused with the batch size.)
        scores = torch.matmul(QX_t, KX)  # (B, H, T, T)

        # FUTURE MASKING:
        # To mask attention, we create a matrix that indicates if an entry of the scores is a
        # word in the future (s > t). It is created directly on the input's device, so the layer
        # does not depend on the notebook-level `device` variable.
        mask = torch.triu(torch.ones(T, T, device=X.device), diagonal=1)

        # If an entry is in the future, we set it to -inf,
        # so that when we apply softmax, the probability of that word is 0, while
        # the rest of the words sum to 1.
        scores = scores.masked_fill(mask == 1, float('-inf'))

        # Now when we apply softmax, only the words in the past have nonzero probability.
        A = F.softmax(scores, dim=-1)

        A_t = A.transpose(-2, -1)
        VXA_t = torch.matmul(VX, A_t)  # (B, H, m, T)
        Y = torch.matmul(self.W, VXA_t)  # (B, H, n, T)

        # Second layer normalization. Transpose over the last two dimensions
        Y = self.layer_norm2(Y.transpose(-2, -1)).transpose(-2, -1)

        # Sum over heads, apply the nonlinearity and add the residual connection.
        X_l = X + self.nonlinearity(Y.sum(dim=1))

        return X_l

# Smoke test for the layer above: a single sequence of 5 elements with n=256
# should come out with the same shape it went in with.
model = MultiHeadLayer(m=32, n=256, H=2)
model = model.to(device)
X = torch.randn((1, 256, 5)).to(device)
out = model(X)
print(f"out.shape: {out.shape}")

In [None]:
# Training
# Hyperparameters for the run.
n_epochs = 5
lr = 1e-4
B = 32  # batch size handed to the data loaders
T = 64
L, H = 6, 8
m, n = 32, 256

# Only the training batches are shuffled.
train_loader = DataLoader(dataset=train_dataset, batch_size=B, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=B, shuffle=False)

estimator = LanguageTransformer(m=m, n=n, L=L, H=H, c=c, T=T)
estimator = estimator.to(device)
estimator.train()

# The optimizer is AdamW rather than plain SGD.
optimizer = optim.AdamW(params=estimator.parameters(), lr=lr)

# Cross entropy scores the predicted next-word distribution against the true next word.
cross_entropy_loss = nn.CrossEntropyLoss()

train_loss = []  # cross entropy of every training batch, in order
for epoch in range(n_epochs): # Iterate over n_epochs epochs

    for x_batch, y_batch in tqdm(train_loader): # Iterate over all batches in the dataset
        # (Step i) Load the data. These commands send the data to the device memory.
        x_batch = x_batch.to(device)
        y_batch = y_batch.to(device)

        # (Step ii) Compute the gradients. We use automated differentiation.
        optimizer.zero_grad() # Gradient reset to indicate where the backward computation stops.

        # Call the neural network. The output holds one score per vocabulary word
        # for every position in the sequence: (batch, c, sequence).
        y_hat = estimator(x_batch)

        # Reshape logits and y to be able to evaluate cross entropy on
        # each token in the sequence: logits become (batch * sequence, c).
        # The sizes are inferred with -1, so the hyperparameter T is not overwritten
        # by the loop.
        y_hat = y_hat.permute(0, 2, 1).reshape(-1, c)

        # Y should also be condensed into one dimension: (batch * sequence,).
        y_batch = y_batch.reshape(-1)

        # When using cross entropy loss, we need to pass the target as a 1D tensor of class indices.
        # The softmax function is applied internally to the transformer's output y_hat.
        cross_entropy_value = cross_entropy_loss(y_hat, y_batch)

        cross_entropy_value.backward() # Compute gradients moving backwards until the gradient reset.

        # (Step iii) Update parameters by taking an optimizer step.
        optimizer.step()

        train_loss.append(cross_entropy_value.item())
    # End of batch loop.

    # Epochs are reported 1-based so the last line reads n_epochs/n_epochs.
    # The loss shown is that of the last batch of the epoch.
    print(f"Epoch {epoch + 1}/{n_epochs} Loss: {train_loss[-1]}")

# Evaluate on the test set. No gradients are needed, so there is no optimizer call here.
estimator.eval()
with torch.no_grad():
    test_losses = []        # mean cross entropy of each test batch
    test_token_counts = []  # number of target tokens in each test batch
    for x_batch, y_batch in tqdm(test_loader):
        x_batch = x_batch.to(device)
        y_batch = y_batch.to(device)

        y_hat = estimator(x_batch)  # (batch, c, sequence)

        # Flatten to one row of logits and one target index per token.
        # Sizes are inferred with -1 so the hyperparameter T is not overwritten.
        y_hat = y_hat.permute(0, 2, 1).reshape(-1, c)
        y_batch = y_batch.reshape(-1)

        cross_entropy_value = cross_entropy_loss(y_hat, y_batch)
        test_losses.append(cross_entropy_value.item())
        test_token_counts.append(y_batch.shape[0])

    # Each entry of test_losses is already a per-token mean for its batch. Weighting by
    # the batch's token count gives the mean over all test tokens even when the last
    # batch is smaller than the others.
    total_tokens = sum(test_token_counts)
    if total_tokens > 0:
        test_loss = sum(
            loss * count for loss, count in zip(test_losses, test_token_counts)
        ) / total_tokens
    else:
        # Empty test loader: there is nothing to average.
        test_loss = float("nan")

print(f"Train loss: {train_loss[-1]}")
print(f"Test loss: {test_loss}")

In [None]:
# repeating the generative task
# Display the shape of `initial_indices`.
# NOTE(review): `initial_indices` is not assigned in this cell, so it must already
# exist in the kernel from another cell — confirm this survives Restart & Run All.
initial_indices.shape

In [None]:
# Draw a random start offset and take the T consecutive tokens that follow it
# in `test` as the prompt (with a leading batch dimension of 1).
starting_point = torch.randint(low=0, high=len(test) - T, size=(1,))
initial_indices = test[starting_point:starting_point + T].unsqueeze(0)
print("========== INPUT TEXT ==========")
print(f"{tokens_to_words(initial_indices.flatten().tolist())}\n")

# This is the model from task 7
print("========== INPUT + GENERATED TEXT ==========")
print(
    generate_text(estimator, initial_indices, max_generate_tokens=100)
)
print("====================================")

In [None]:
# Prompt the trained model with a handful of well-known lines and print what it continues with.
shakespeare_quotes = [
    "All the world's a stage, and all the men and women merely players.",  # As You Like It (Act 2, Scene 7)
    "A fool thinks himself to be wise, but a wise man knows himself to be a fool.",  # As You Like It (Act 5, Scene 1)
    "How beauteous mankind is! O brave new world!",  # The Tempest (Act 5, Scene 1) – Miranda.
    "O brave new world, that has such people in't!",  # The Tempest (Act 5, Scene 1) – Miranda.
    "Love all, trust a few, do wrong to none.",  # All's Well That Ends Well (Act 1, Scene 1)
    "To be or not to be, that is the question.",  # Hamlet (Act 3, Scene 1)
]

for quote in shakespeare_quotes:
    try:
        print("========== INPUT ==========")
        # Show the prompt the way the tokenizer splits it.
        print(" ".join(split_to_words(quote)))
        print("========== INPUT + GENERATED TEXT ==========")
        print(
            generate_from_prompt(estimator, quote, max_generate_tokens=15)
        )
        print("====================================")
    except Exception as err:
        # A failure is reported and the loop moves on to the next quote; the usual
        # cause is a word in the quote that is not in our vocabulary.
        print(f"Error generating from prompt: {err}")