We will first implement the embed function. Remember that the input to the transformer is text, but the model does not operate on raw text directly; it represents everything as tensors. First, a tokenizer splits a chunk of text into a sequence of tokens. For example, suppose our input is HelloWorld and our tokenizer decides that Hello is the first token and World is the second token. Replacing each token with its ID, our real input tensor is [0, 1]. We use a d_model-dimensional vector to represent each token. With V tokens in total, this gives us a (V, d_model) matrix in which each row is a token vector. The embed function should look up the relevant vectors for our input tokens. The inputs to embed are x with size (1, m) (technically, it's (m, ) in NumPy) and the embedding matrix W_E with size (V, d_model). The output should have size (m, d_model).

In [3]:
import numpy as np

def embed(x, W_E):
    return W_E[x]
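
As a quick sanity check, the row lookup can be compared against an explicit one-hot matrix product. The toy sizes (V = 3, d_model = 4) and the values in W_E below are made up for illustration.

```python
import numpy as np

# Toy embedding matrix (assumed values): V = 3 tokens, d_model = 4
W_E = np.arange(12, dtype=float).reshape(3, 4)
x = np.array([0, 1])  # token IDs for "Hello" and "World"

looked_up = W_E[x]  # the same indexing that embed(x, W_E) performs

# Equivalent (but slower) formulation: one-hot rows times W_E
one_hot = np.eye(W_E.shape[0])[x]  # shape (m, V)
via_matmul = one_hot @ W_E

print(looked_up.shape)                     # (2, 4)
print(np.allclose(looked_up, via_matmul))  # True
```

Indexing avoids building the (m, V) one-hot matrix, which matters once V is large.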


Now that we have implemented embed, we will implement the positional embedding, which gives the Transformer information about each token's position. We will use the sinusoidal position embedding. You can refer to page 6 of the attention paper for the exact equations. The input m is the sequence length and d_model is the model dimension; the output should be a NumPy array of size (m, d_model).
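
For reference, the sinusoidal encoding is usually written as follows, where pos is the token position and i indexes pairs of dimensions (both symbols follow the paper's notation rather than names used in this notebook):

```latex
PE_{(pos,\,2i)} = \sin\!\left(\frac{pos}{10000^{2i/d_{\text{model}}}}\right), \qquad
PE_{(pos,\,2i+1)} = \cos\!\left(\frac{pos}{10000^{2i/d_{\text{model}}}}\right)
```

Even columns hold the sine terms and odd columns hold the cosine terms, with each pair sharing one frequency.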

In [4]:
def pos_embed(m, d_model):
    pe = np.zeros((m, d_model))

    pos = np.arange(m)[:, np.newaxis]
    dim = np.arange(0, d_model, 2)[np.newaxis, :]

    pe_temp = pos / (10000 ** (dim / d_model))

    pe[:, 0::2] = np.sin(pe_temp)
    pe[:, 1::2] = np.cos(pe_temp)

    return pe

In [5]:
print(pos_embed(5, 8))

[[ 0.00000000e+00  1.00000000e+00  0.00000000e+00  1.00000000e+00
   0.00000000e+00  1.00000000e+00  0.00000000e+00  1.00000000e+00]
 [ 8.41470985e-01  5.40302306e-01  9.98334166e-02  9.95004165e-01
   9.99983333e-03  9.99950000e-01  9.99999833e-04  9.99999500e-01]
 [ 9.09297427e-01 -4.16146837e-01  1.98669331e-01  9.80066578e-01
   1.99986667e-02  9.99800007e-01  1.99999867e-03  9.99998000e-01]
 [ 1.41120008e-01 -9.89992497e-01  2.95520207e-01  9.55336489e-01
   2.99955002e-02  9.99550034e-01  2.99999550e-03  9.99995500e-01]
 [-7.56802495e-01 -6.53643621e-01  3.89418342e-01  9.21060994e-01
   3.99893342e-02  9.99200107e-01  3.99998933e-03  9.99992000e-01]]


Implementation of RMSNorm

In [6]:
def rmsnorm(x, w=1, epsilon=1e-9):
    term = np.mean(x*x, axis=1, keepdims = True) + epsilon
    denominator = np.sqrt(term)
    rms = (x / denominator) * w
    return rms
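
One way to check an RMSNorm implementation is to confirm that every row of the output has a root mean square close to 1 when w = 1. The sketch below repeats the normalization inline so that it runs on its own; the input values are arbitrary.

```python
import numpy as np

rng = np.random.default_rng(0)
x = rng.normal(size=(3, 8)) * 5.0  # arbitrary (m, d_model) input

# Same computation as rmsnorm(x) with w = 1
epsilon = 1e-9
normed = x / np.sqrt(np.mean(x * x, axis=-1, keepdims=True) + epsilon)

# Each row should now have RMS approximately equal to 1
row_rms = np.sqrt(np.mean(normed * normed, axis=-1))
print(np.allclose(row_rms, 1.0))  # True
```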

Next, we will implement softmax with temperature t, which is a key operation in attention. softmax normalizes a raw vector into a probability distribution. The input is of size (n_head, m, m) or (m, m) or (V, ), and the output has the same size. Your code should handle all cases robustly. The temperature t rescales logits before normalization.

In [7]:
def softmax(x, t=1):
    if t != 0:
        x = x / t
        x_new = x - np.max(x, axis = -1, keepdims = True)
    
        x_exp = np.exp(x_new)
    
        denominator = np.sum(x_exp, axis=-1, keepdims = True)
    
        return x_exp / denominator
        
    if t == 0:
        x_maxs = np.max(x, axis=-1, keepdims = True)
        mask = (x == x_maxs).astype(float)
        counts = np.sum(mask, axis=-1, keepdims = True)
        return mask/counts
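
The max-subtraction step matters for numerical stability. The sketch below contrasts a naive softmax with the shifted version on large logits; naive_softmax and stable_softmax are illustrative helpers, not part of the assignment API.

```python
import numpy as np

def naive_softmax(x):
    # Direct formula: np.exp overflows when entries of x are large
    e = np.exp(x)
    return e / np.sum(e, axis=-1, keepdims=True)

def stable_softmax(x, t=1.0):
    # Subtracting the row max does not change the result mathematically,
    # but it keeps np.exp within floating-point range
    z = x / t
    z = z - np.max(z, axis=-1, keepdims=True)
    e = np.exp(z)
    return e / np.sum(e, axis=-1, keepdims=True)

logits = np.array([1000.0, 1001.0, 1002.0])

with np.errstate(over="ignore", invalid="ignore"):
    print(np.isnan(naive_softmax(logits)).any())  # True: inf / inf gives nan

probs = stable_softmax(logits)
print(np.isclose(probs.sum(), 1.0))     # True
print(stable_softmax(logits, t=100.0))  # close to uniform at high temperature
```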

Next, we will implement the apply_causal_mask function. The input x is of size (n_head, m, m). Remember that in the lecture we discussed that token i can only attend to token j if $j \le i$. The input x[_][i][j] stores the raw score of how much token i attends to token j, without applying the causal mask. The output should be the same size as the input (still the raw score), but with the causal mask applied.

In [19]:
def apply_causal_mask(x, very_negative_number=-1e9):
    n_head = x.shape[0]
    m = x.shape[-1]
    res = x.copy()

    # good[i, j] is True when token i may attend to token j (j <= i)
    good = np.tril(np.ones((m, m), dtype=bool))

    # Repeat the (m, m) mask once per head so it matches x's shape
    good = np.repeat(good[None, :, :], n_head, axis=0)

    # Overwrite the disallowed (future) positions
    res[~good] = very_negative_number

    return res

In [20]:
x = np.ones((1, 2, 2))

res = apply_causal_mask(x)

print(res)

[[[ 1.e+00 -1.e+09]
  [ 1.e+00  1.e+00]]]
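
To see why a large negative value is enough, the sketch below masks a toy score matrix and applies a row-wise softmax: masked positions end up with zero weight. It uses np.where rather than boolean indexing, which is just one alternative way to write the same mask.

```python
import numpy as np

m = 3
scores = np.zeros((1, m, m))  # (n_head, m, m) with equal raw scores

# Lower-triangular mask: entry (i, j) is True when j <= i
allowed = np.tril(np.ones((m, m), dtype=bool))
masked = np.where(allowed, scores, -1e9)  # broadcasts over the head axis

# Row-wise softmax over the key axis
shifted = masked - masked.max(axis=-1, keepdims=True)
weights = np.exp(shifted) / np.exp(shifted).sum(axis=-1, keepdims=True)

print(weights[0])
# Row 0 attends only to token 0, row 1 splits evenly over tokens 0 and 1,
# and row 2 splits evenly over all three tokens
```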


You're going to implement the attention function in pure numpy! This is arguably the most complex part of this assignment. We will walk you through some details. The attention has n_head heads, and each head has d_head dimensions.

In [21]:
def attention(x, W_Q, W_K, W_V, W_O):
    
    d_model, d_head, n_head = W_Q.shape
    m = x.shape[0]

    Q = np.einsum("md,dhn->mhn", x, W_Q)
    K = np.einsum("md,dhn->mhn", x, W_K)
    V = np.einsum("md,dhn->mhn", x, W_V)

    Q = Q.transpose(2, 0, 1)
    K = K.transpose(2, 0, 1)
    V = V.transpose(2, 0, 1)

    scores = (Q @ K.transpose(0, 2, 1)) / np.sqrt(d_head)

    masked_s = apply_causal_mask(scores)

    attn_weights = softmax(masked_s)

    context = attn_weights @ V

    context = context.transpose(1, 0, 2).reshape(m, -1)

    res = context @ W_O
    return res
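
The shape bookkeeping is the part most likely to go wrong, so here is a hedged shape trace. It assumes, as the code above does, that W_Q and W_K have shape (d_model, d_head, n_head); the concrete sizes and random weights are made up, and the causal mask and value projection are left out to keep the focus on shapes.

```python
import numpy as np

rng = np.random.default_rng(0)
m, d_model, d_head, n_head = 3, 8, 4, 2  # toy sizes (assumed)

x = rng.normal(size=(m, d_model))
W_Q = rng.normal(size=(d_model, d_head, n_head))
W_K = rng.normal(size=(d_model, d_head, n_head))

# Project, then move the head axis to the front:
# (m, d_head, n_head) -> (n_head, m, d_head)
Q = np.einsum("md,dhn->mhn", x, W_Q).transpose(2, 0, 1)
K = np.einsum("md,dhn->mhn", x, W_K).transpose(2, 0, 1)

# One (m, m) score matrix per head
scores = (Q @ K.transpose(0, 2, 1)) / np.sqrt(d_head)

print(Q.shape)       # (2, 3, 4)
print(scores.shape)  # (2, 3, 3)

# Head 0 computed by hand matches the batched result
q0 = x @ W_Q[:, :, 0]
k0 = x @ W_K[:, :, 0]
print(np.allclose(scores[0], q0 @ k0.T / np.sqrt(d_head)))  # True
```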

Next, we will implement the gelu function using tanh approximation, which many state-of-the-art LLMs use as the activation function for the MLP. Please refer to page 2 of the original GELU paper for exact equations. The input x is of size (m, d_mlp), and the output is of the same size.

In [22]:
def gelu(x):
    res = 0.5*x*(1 + np.tanh(np.sqrt(2/np.pi) * (x + 0.044715*x**3)))
    return res
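
To get a feel for how close the tanh approximation is, it can be compared with the exact definition x * Phi(x), where Phi is the standard normal CDF. The helper names gelu_tanh and gelu_exact and the evaluation grid are choices made for this sketch only.

```python
import math
import numpy as np

def gelu_tanh(x):
    # Tanh approximation, same formula as gelu above
    return 0.5 * x * (1 + np.tanh(np.sqrt(2 / np.pi) * (x + 0.044715 * x**3)))

def gelu_exact(x):
    # Exact definition written with the error function
    erf = np.vectorize(math.erf)
    return 0.5 * x * (1 + erf(x / math.sqrt(2)))

xs = np.linspace(-5, 5, 201)
max_gap = np.max(np.abs(gelu_tanh(xs) - gelu_exact(xs)))
print(max_gap)  # small: the two curves are nearly indistinguishable on this grid
```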


Next, we will implement the mlp layer. You are expected to use the gelu function that you just implemented. The MLP has two layers (i.e., one hidden layer and one output layer, with gelu applied only at the hidden layer).

In [23]:
def mlp(x, W_1, b_1, W_2, b_2):
    hiddenL = x @ W_1 + b_1
    hiddenL = gelu(hiddenL)

    res = hiddenL @ W_2 + b_2

    return res


Congratulations on your achievements so far! You have all the mechanisms to implement a single transformer block. As before, our input is of size (m, d_model) and the output should have the same size.

In [24]:
from collections import namedtuple
BlockParameter = namedtuple(
    "BlockParameter",
    [
        "W_1",
        "b_1",
        "W_2",
        "b_2",
        "W_Q",
        "W_K",
        "W_V",
        "W_O"
    ]
)
    

In [26]:
def block(x, para: BlockParameter):
    x_normalized = rmsnorm(x)

    w_sum = attention(x_normalized, para.W_Q, para.W_K, para.W_V, para.W_O)

    res_connection = x + w_sum

    y_norm = rmsnorm(res_connection)

    mlp_res = mlp(y_norm, para.W_1, para.b_1, para.W_2, para.b_2)

    res_connection2 = res_connection + mlp_res

    return res_connection2
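
Written as equations, the block above follows a pre-norm residual layout. Here y is just a name for the intermediate residual stream (res_connection in the code):

```latex
y = x + \mathrm{Attention}(\mathrm{RMSNorm}(x)), \qquad
\mathrm{block}(x) = y + \mathrm{MLP}(\mathrm{RMSNorm}(y))
```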


The only piece left before we have a toy but functional transformer model is the unembed layer. The unembed layer is a linear projection into the vocabulary space that produces an unnormalized probability distribution. Specifically, the input has size (m, d_model) and the output has size (m, V). W_U has size (d_model, V).

In [25]:
def unembed(x, W_U):
    return np.einsum("md,dv->mv", x, W_U)
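
The einsum above is an ordinary matrix product, which the following sketch confirms on random data. The sizes are toy values chosen for illustration.

```python
import numpy as np

rng = np.random.default_rng(0)
m, d_model, V = 3, 4, 5  # toy sizes (assumed)
x = rng.normal(size=(m, d_model))
W_U = rng.normal(size=(d_model, V))

logits = np.einsum("md,dv->mv", x, W_U)

print(logits.shape)                  # (3, 5)
print(np.allclose(logits, x @ W_U))  # True
```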


We now have all the mechanisms to implement a toy but functional transformer architecture! You are expected to use embed, pos_embed, block, and unembed. The input x is a list of token IDs of size (m, ), and the output is an unnormalized probability distribution over the next possible token, of size (m, V). We have n_block blocks chained one after another, and paras is a list of BlockParameters.



In [27]:
from typing import List
def transformer(x, W_E, paras: List[BlockParameter], n_block):
    
    x_embed = embed(x, W_E)
    m, d_model = x_embed.shape

    pe = pos_embed(m, d_model)

    embeds = x_embed + pe

    for i in range(n_block):
        embeds = block(embeds, paras[i])

    logits = unembed(embeds, W_E.T)

    return logits

We will implement the self-supervised version of compute_loss. Suppose x is a list of tokens of length m (i.e., the input to the transformer function we have implemented). We already know that the transformer will output y of size (m, V): the unnormalized probability of the next token. In other words, we can view language modeling as m - 1 V-class classification problems, where the ground truth is x[i] and the predicted unnormalized probability is y[i - 1], for $i = 1, \dots, m - 1$. For each i, this is just an NLL loss. Your total loss should be the arithmetic mean of the losses over all m - 1 tokens. You should return a float. All other inputs are the same as in the transformer function.

In [28]:
def compute_loss(x, W_E, paras: List[BlockParameter], n_block):
    y = transformer(x, W_E, paras, n_block)

    # Position i - 1 predicts token x[i]: drop the first token and the last row
    gt = x[1:]
    logits = y[:-1, :]

    probs = softmax(logits)

    # Probability assigned to each ground-truth token
    correct = probs[np.arange(len(gt)), gt]

    loss = -np.log(correct)

    return float(np.mean(loss))
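
A handy sanity check for this kind of loss: when every logit is equal, the predicted distribution is uniform and the loss must equal log V regardless of the targets. The sketch below uses hand-made logits in place of the transformer output. nll_from_logits is a hypothetical helper, and it uses a log-sum-exp form rather than softmax followed by log, an alternative chosen here for numerical stability.

```python
import numpy as np

def nll_from_logits(logits, targets):
    # Log-softmax via log-sum-exp, then pick the log-probability of each target
    shifted = logits - logits.max(axis=-1, keepdims=True)
    log_probs = shifted - np.log(np.exp(shifted).sum(axis=-1, keepdims=True))
    return float(-log_probs[np.arange(len(targets)), targets].mean())

x = np.array([2, 0, 3, 1])  # toy token IDs, m = 4
V = 4
y = np.zeros((len(x), V))   # stand-in for transformer output: all logits equal

# Position i - 1 predicts token x[i], so drop the last row and the first token
loss = nll_from_logits(y[:-1], x[1:])
print(np.isclose(loss, np.log(V)))  # True
```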

Our final function is topk_sample, which transforms a probability distribution into actual tokens. To make sampling high-quality and efficient, topk_sample will only consider the top k tokens, as the name suggests, where $1 \le k \le V$. In this function, you are given a prompt x of m tokens: run it through the transformer function you implemented, sample a new token from the top k candidates, append that token to x, and repeat this process until you have sampled n new tokens in total. You are expected to use transformer and softmax. All inputs are the same as in transformer or softmax. Note that t here only refers to the temperature during sampling; you don't need to change the temperature inside the attention layer. You should return a NumPy array of size (n, ) containing the n new token IDs.



In [30]:
def topk_sample(x, W_E, paras, n_block, n, k, t=1):
    ### Don't change the following line for auto-grading reasons
    rng = np.random.default_rng(42)

    res = []
    curr_seq = x.copy()

    for _ in range(n):
        logits_all = transformer(curr_seq, W_E, paras, n_block)

        # Only the last position predicts the next token
        last = logits_all[-1, :]

        if t == 0:
            # Zero temperature: greedy decoding, no sampling
            token = np.argmax(last)
        else:
            # Keep the k largest logits, then sample with temperature t
            top_k = np.argsort(last)[-k:]
            probs = softmax(last[top_k], t)

            sample_idx = rng.choice(len(top_k), p=probs)
            token = top_k[sample_idx]

        curr_seq = np.concatenate((curr_seq, [token]))
        res.append(token)

    return np.array(res)
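
The sampling step on its own can be sketched on a hand-written logit vector. sample_top_k is a hypothetical helper and the seed is arbitrary; the point is that only the k largest logits can ever be drawn.

```python
import numpy as np

def sample_top_k(logits, k, t, rng):
    # Indices of the k largest logits (order within the top k does not matter)
    top_k = np.argsort(logits)[-k:]
    # Temperature-scaled softmax restricted to those k logits
    z = logits[top_k] / t
    z = z - z.max()
    probs = np.exp(z) / np.exp(z).sum()
    return int(top_k[rng.choice(len(top_k), p=probs)])

rng = np.random.default_rng(0)
logits = np.array([0.1, 2.0, -1.0, 1.5, 0.3])

draws = {sample_top_k(logits, k=2, t=1.0, rng=rng) for _ in range(200)}
print(draws <= {1, 3})  # True: only the two highest-scoring tokens are ever sampled
```

Lowering t concentrates the probability mass on the largest of the k logits, while raising it moves the distribution over the k candidates toward uniform.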