In [2]:
import numpy as np

np.set_printoptions(suppress=True)

In [4]:
def positional_encoding(embeddings: np.array, d_model: int = 4, verbose: bool = False) -> np.array:
    mat = np.zeros_like(a=embeddings, dtype=embeddings.dtype)
    for pos, embedding in enumerate(embeddings):
        if verbose:
            print(f"\nEmbedding {embedding}")
        # for i, _ in enumerate(embedding): 
        # Pos encoding should be word agnostic and only looking at the 
        # position in the embedding matrix pos and dimension inside each embedding i.
        for i in range(d_model):
            func = np.sin if not i % 2 else np.cos
            mat[pos][i] = func(pos / 10000 ** ((2 * i) / d_model))

            # Copilot generated print horror for sanity check
            if verbose:
                print(
                    f"i = {i} ({'even' if i % 2 == 0 else 'odd'}): PE({pos},{i}) = sin({pos} / 10000^({2 * i} / {d_model})) = sin({pos / 10000 ** ((2 * i) / d_model)}) = {func(pos / 10000 ** ((2 * i) / d_model))}"
                )
    return mat

In [6]:
sentence: str = "Hello World"
# The original transformer method used a vector of size 512, I will do 4.
# The embedding is random for now and I will hardcode a matrix. I might apply word2vec or GloVe later.
E = np.array([
    [1,2,3,4],
    [2,3,4,5]
], dtype=np.float64)

In [9]:
E + positional_encoding(E)

array([[1.        , 3.        , 3.        , 5.        ],
       [2.84147098, 3.99995   , 4.0001    , 6.        ]])

The decoder block receives two inputs: the output of the encoder and the generated output sequence. The output of the encoder is the representation of the input sequence. During inference, the generated output sequence starts with a special start-of-sequence token (SOS). During training, the target output sequence is the actual output sequence, shifted by one position. This will be clearer soon!

The decoder is autoregressive, that means that the decoder will take the previously generated tokens and again generate the second token.
- Iteration 1: Input is SOS, output is “hola”
- Iteration 2: Input is SOS + “hola”, output is “mundo”
- Iteration 3: Input is SOS + “hola” + “mundo”, output is EOS

Here, SOS is the start-of-sequence token and EOS is the end-of-sequence token. The decoder will stop when it generates the EOS token. It generates one token at a time. Note that all iterations use the embedding generated by the encoder.

This autoregressive design makes decoder slow. The encoder is able to generate its embedding in a single forward pass while **the decoder needs to do many forward passes**. This is one of the reasons why architectures that only use the encoder (such as BERT or sentence similarity models) are much faster than decoder-only architectures (such as GPT-2 or BART).

## Text embedding and positional encoding
The first text of the decoder is to embed the input tokens. The input token is SOS, so we’ll embed it. We’ll use the same embedding dimension as the encoder. Let’s assume the embedding vector for SOS is the following:

In [14]:
E = np.array([[1, 0, 0, 0]], dtype=np.float64)
E

array([[1., 0., 0., 0.]])

In [15]:
positional_encoding(E)

array([[0., 1., 0., 1.]])

In [17]:
E = E + positional_encoding(E)
E

array([[1., 2., 0., 2.]])

## Self-attention

In [18]:
d_embedding = 4
d_key = d_value = d_query = 4
d_feed_forward = 8
n_attention_heads = 2
epsilon = 1e-6
gamma = 1
beta = 0

def softmax(mat: np.array) -> np.array:
    return np.exp(mat) / np.sum(a=np.exp(mat), axis=1, keepdims=True)


def attention(E: np.array, WQ: np.array, WK: np.array, WV: np.array) -> np.array:
    Q = E @ WQ
    K = E @ WK
    V = E @ WV

    return softmax((Q @ K.T) / np.sqrt(K.shape[1])) @ V


def multihead_attention(x, WQs, WKs, WVs) -> np.array:
    attentions = np.concatenate(
        [attention(x, WQ, WK, WV) for WQ, WK, WV in zip(WQs, WKs, WVs)], axis=1
    )
    W = np.random.randn(n_attention_heads * d_value, d_embedding)
    return attentions @ W


def relu(x: np.array) -> np.array:
    return np.maximum(0, x)


def feed_forward(
    x: np.array, W1: np.array, W2: np.array, b1: np.array, b2: np.array
) -> np.array:
    return relu(x @ W1 + b1) @ W2 + b2


def layer_norm(x: np.array, epsilon: float=1e-6) -> np.array:
    mean = x.mean(axis=1, keepdims=True)
    # var = x.var(axis=1, keepdims=True)  # NOTE: variance is the standard deviation squared
    # To lower the amount of calcuations (avoid calc. var and then sqrt in the denominator) we calculate the std dev
    std_dev = x.std(axis=1, keepdims=True)
    return (x - mean) / std_dev + epsilon * gamma + beta

In [19]:
WQs = [np.random.randn(d_embedding, d_query) for _ in range(n_attention_heads)]
WKs = [np.random.randn(d_embedding, d_key) for _ in range(n_attention_heads)]
WVs = [np.random.randn(d_embedding, d_value) for _ in range(n_attention_heads)]

Z_self_attention = multihead_attention(E, WQs, WKs, WVs)
Z_self_attention

array([[-21.50369539,   7.5471242 ,  -5.23365504,  -3.92245733]])



Things are quite simple for inference. For training, things are a bit tricky. During training, we use unlabeled data: just a bunch of text data, frequentyl scraped from the web. While the encoder’s goal is to capture all information of the input, the decoder’s goal is to predict the most likely next token. This means that the decoder can only use the tokens that have been generated so far (it cannot cheat and see the next tokens).

Because of this, we use masked self-attention: **we mask the tokens that have not been generated yet. This is done by setting the attention scores to -inf. This is done in the original paper (section 3.2.3.1)**. We’ll skip this for now, but it’s important to keep in mind that the decoder is a bit more complex during training.


## Residual connection and normalization


In [20]:
Z_self_attention = layer_norm(Z_self_attention)
Z_self_attention

array([[-1.51918584,  1.28731024,  0.05260473,  0.17927487]])

## Encoder-decoder attention

In the self-attention mechanism, we calculate the queries, keys, and values from the input embedding.


In the encoder-decoder attention, we calculate the queries from the previous decoder layer and the keys and values from the encoder output! All the math is the same as before; the only difference is what embedding to use for the queries. Let’s look at some code:

In [89]:
def encoder_decoder_attention(
    encoder_output: np.array,
    attention_input: np.array,
    WQ: np.array,
    WK: np.array,
    WV: np.array,
) -> np.array:
    """
    attention_input: the output of the previous attention!
    """
    K = encoder_output @ WK
    V = encoder_output @ WV
    Q = attention_input @ WQ

    return softmax((Q @ K.T) / np.sqrt(K.shape[1])) @ V


def multihead_encoder_decoder_attention(
    encoder_output: np.array,
    attention_input: np.array,
    WQs: np.array,
    WKs: np.array,
    WVs: np.array,
) -> np.array:
    attentions = np.concatenate(
        [
            encoder_decoder_attention(encoder_output, attention_input, WQ, WK, WV)
            for WQ, WK, WV in zip(WQs, WKs, WVs)
        ],
        axis=1,
    )

    W = np.random.randn(n_attention_heads * d_value, d_embedding)
    return attentions @ W

In [90]:
WQs = [np.random.randn(d_embedding, d_query) for _ in range(n_attention_heads)]
WKs = [np.random.randn(d_embedding, d_key) for _ in range(n_attention_heads)]
WVs = [np.random.randn(d_embedding, d_value) for _ in range(n_attention_heads)]

# We assume this is the encoder's output
encoder_output = np.array([[-1.5, 1.0, -0.8, 1.5], [1.0, -1.0, -0.5, 1.0]], dtype=np.float64)

Z_encoder_decoder = multihead_encoder_decoder_attention(
    encoder_output, Z_self_attention, WQs, WKs, WVs
)

Z_encoder_decoder

array([[4.10578179, 0.99425408, 7.74487914, 4.23812386]])

The reason is that we want the decoder to focus on the relevant parts of the input text (e.g., “hello world”). The encoder-decoder attention allows each position in the decoder to attend over all positions in the input sequence. This is very helpful for tasks such as translation, where the decoder needs to focus on the relevant parts of the input sequence. The decoder will learn to focus on the relevant parts of the input sequence by learning to generate the correct output tokens.

In [33]:
Z_encoder_decoder = layer_norm(Z_encoder_decoder + Z_self_attention)
Z_encoder_decoder

array([[ 1.49096473, -0.94517828, -0.87899475,  0.3332123 ]])

## Feed forward

In [36]:
W1 = np.random.randn(4, 8)
W2 = np.random.randn(8, 4)
b1 = np.random.randn(8)
b2 = np.random.randn(4)

output = layer_norm(feed_forward(Z_encoder_decoder, W1, W2, b1, b2) + Z_encoder_decoder)
output

array([[ 0.71906955, -1.72527844,  0.51088732,  0.49532556]])

## Encapsulating everything: The Random Decoder

In [91]:
d_embedding = 4
d_key = d_value = d_query = 4
d_feed_forward = 8
n_attention_heads = 2
epsilon = 1e-6
gamma = 1
beta = 0


def softmax(mat: np.array) -> np.array:
    return np.exp(mat) / np.sum(a=np.exp(mat), axis=1, keepdims=True)


def attention(E: np.array, WQ: np.array, WK: np.array, WV: np.array) -> np.array:
    Q = E @ WQ
    K = E @ WK
    V = E @ WV

    return softmax((Q @ K.T) / np.sqrt(K.shape[1])) @ V


def multihead_attention(x, WQs, WKs, WVs) -> np.array:
    attentions = np.concatenate(
        [attention(x, WQ, WK, WV) for WQ, WK, WV in zip(WQs, WKs, WVs)], axis=1
    )
    W = np.random.randn(n_attention_heads * d_value, d_embedding)
    return attentions @ W


def relu(x: np.array) -> np.array:
    return np.maximum(0, x)


def feed_forward(
    x: np.array, W1: np.array, W2: np.array, b1: np.array, b2: np.array
) -> np.array:
    return relu(x @ W1 + b1) @ W2 + b2


def layer_norm(x: np.array, epsilon: float = 1e-6) -> np.array:
    mean = x.mean(axis=1, keepdims=True)
    # var = x.var(axis=1, keepdims=True)  # NOTE: variance is the standard deviation squared
    # To lower the amount of calcuations (avoid calc. var and then sqrt in the denominator) we calculate the std dev
    std_dev = x.std(axis=1, keepdims=True)
    return (x - mean) / std_dev + epsilon * gamma + beta


def encoder_decoder_attention(
    attention_input: np.array,
    encoder_output: np.array,
    WQ: np.array,
    WK: np.array,
    WV: np.array,
) -> np.array:
    """
    attention_input: the output of the previous attention!
    """
    K = encoder_output @ WK
    V = encoder_output @ WV
    Q = attention_input @ WQ

    return softmax((Q @ K.T) / np.sqrt(K.shape[1])) @ V


def multihead_encoder_decoder_attention(
    encoder_output: np.array,
    attention_input: np.array,
    WQs: np.array,
    WKs: np.array,
    WVs: np.array,
) -> np.array:
    attentions = np.concatenate(
        [
            encoder_decoder_attention(attention_input, encoder_output, WQ, WK, WV)
            for WQ, WK, WV in zip(WQs, WKs, WVs)
        ],
        axis=1
    )

    W = np.random.randn(n_attention_heads * d_value, d_embedding)
    return attentions @ W


def decoder_block(
    x,
    encoder_output,
    WQs_self_attention,
    WKs_self_attention,
    WVs_self_attention,
    WQs_ed_attention,
    WKs_ed_attention,
    WVs_ed_attention,
    W1,
    b1,
    W2,
    b2,
) -> np.array:
    Z = multihead_attention(
        x, WQs_self_attention, WKs_self_attention, WVs_self_attention
    )
    Z = layer_norm(x + Z)  # LayerNorm of the Residual on attention output

    enc_dec_attn = multihead_encoder_decoder_attention(
        encoder_output, Z, WQs_ed_attention, WKs_ed_attention, WVs_ed_attention
    )
    enc_dec_attn = layer_norm(enc_dec_attn + Z)

    output = feed_forward(enc_dec_attn, W1, W2, b1, b2)

    return layer_norm(output + enc_dec_attn)  # LayerNorm of the Residual on FFN output


def random_decoder_block(x: np.array, encoder_output: np.array) -> np.array:
    WQs_self_attention = [
        np.random.randn(d_embedding, d_query) for _ in range(n_attention_heads)
    ]
    WKs_self_attention = [
        np.random.randn(d_embedding, d_key) for _ in range(n_attention_heads)
    ]
    WVs_self_attention = [
        np.random.randn(d_embedding, d_value) for _ in range(n_attention_heads)
    ]

    WQs_ed_attention = [
        np.random.randn(d_embedding, d_query) for _ in range(n_attention_heads)
    ]
    WKs_ed_attention = [
        np.random.randn(d_embedding, d_key) for _ in range(n_attention_heads)
    ]
    WVs_ed_attention = [
        np.random.randn(d_embedding, d_value) for _ in range(n_attention_heads)
    ]

    W1 = np.random.randn(d_embedding, d_feed_forward)
    b1 = np.random.randn(d_feed_forward)
    W2 = np.random.randn(d_feed_forward, d_embedding)
    b2 = np.random.randn(d_embedding)

    return decoder_block(
        x,
        encoder_output,
        WQs_self_attention,
        WKs_self_attention,
        WVs_self_attention,
        WQs_ed_attention,
        WKs_ed_attention,
        WVs_ed_attention,
        W1,
        b1,
        W2,
        b2,
    )


def decoder(x, decoder_embedding, n=6) -> np.array:
    for _ in range(n):
        x = random_decoder_block(x, decoder_embedding)
    return x

In [103]:
E = np.array([[-1.5, 1.0, -0.8, 1.5], [1.0, -1.0, -0.5, 1.0]], dtype=np.float64)
E = np.random.randn(2, 4)

# NOTE: He is getting two rows while I am getting only one. What the fuck is going on? I get as many rows as there are in E. Should this happen? 
decoder(E, encoder_output)

array([[-1.03733016,  0.43541475,  1.43082842, -0.82890901],
       [-1.03733416,  0.43540516,  1.43083342, -0.82890042]])

In [98]:
E = np.array([[1, 0, 0, 0]], dtype=np.float64)
E += positional_encoding(E)

encoder_output = np.array([[-1.5, 1.0, -0.8, 1.5], [1.0, -1.0, -0.5, 1.0]], dtype=np.float64)

decoder(E, decoder_embedding=encoder_output)

array([[ 0.34707193, -0.6623798 , -1.14447372,  1.45978559]])