# Transformer 
Transformer from scratch with NumPy

## Library

In [None]:
# run these if you haven't installed the libraries yet
!pip install numpy
!pip install matplotlib

In [None]:
import numpy as np
import matplotlib.pyplot as plt

## Token Embedding

In [None]:
# make a list of sentences
sentences = [
    "noodle is delicious",
    "i like hot rice",
    "the weather is cold",
    "that rice is cold",
    "i eat hot noodle",
    "good weather today",
    "noodle and rice are food",
    "i eat food",
    "the food is good",
    "the noodle is good"
]
sentences

In [None]:
len(sentences)

In [None]:
# collect every distinct word that appears in any sentence
words = {word for sentence in sentences for word in sentence.split()}

words

In [None]:
# map each word to a unique id; the first ids are reserved for special tokens
vocab = {
    "<S>": 0,  # start of the sentence
    "<E>": 1,  # end of the sentence
    "<P>": 2,  # padding/empty word
}

# Assign ids in sorted order. Iterating the set directly follows string hash
# order, which changes between interpreter runs, so the same word would get a
# different id every time the notebook is restarted.
for i, word in enumerate(sorted(words), start=len(vocab)):
    vocab[word] = i

# map each id back to its word (derived from vocab so the two can never disagree)
rvocab = {i: word for word, i in vocab.items()}

vocab # word to id

In [None]:
rvocab # id to word

In [None]:
# function to tokenize the sentence based on our vocab
def tokenize(sentence, vocab, seq_len=5):
    """Convert a sentence into token ids framed by <S>/<E> and padded with <P>.

    Args:
        sentence: whitespace-separated text; each word is lower-cased before
            it is looked up.
        vocab: mapping from word to id; must contain "<S>", "<E>" and "<P>".
        seq_len: number of word slots to pad up to. A sentence with more words
            than this is not truncated and simply receives no padding.

    Returns:
        A list ``[<S>] + word ids + [<E>] + padding``, which has
        ``seq_len + 2`` entries whenever the sentence has at most ``seq_len``
        words.

    Raises:
        KeyError: if a word of the sentence is not present in ``vocab``.
    """
    tokens = []
    for word in sentence.split():
        key = word.lower()
        if key not in vocab:
            # A plain vocab.get() would put None into the token list here and
            # the mistake would only surface later, far from its cause.
            raise KeyError(f"word {key!r} is not in the vocabulary")
        tokens.append(vocab[key])

    padding = [vocab["<P>"]] * max(0, seq_len - len(tokens))
    return [vocab["<S>"], *tokens, vocab["<E>"], *padding]

In [None]:
# tokenize each sentences
tokenized_sentence = [tokenize(sentence,vocab) for sentence in sentences]
tokenized_sentence

In [None]:
# create a randomly initialised embedding table with one row per vocab entry
embedding_dimension = 4
# np.random.rand samples uniformly from [0, 1); the 0.01 factor keeps the initial values small
embedding_matrix = np.random.rand(len(vocab), embedding_dimension) * 0.01
# look up one table row per token id (the nested list of ids is used as an index array)
embedding = embedding_matrix[tokenized_sentence]
embedding

In [None]:
embedding.shape # (batch, seq_len, d_model)

## Positional Encoding

In [None]:
# function for the sinusoidal positional encoding
def sinusoidalPositionalEncoding(seq_len, d, n=10000):
    """Build the sinusoidal positional encoding table.

    For every position ``pos`` and pair index ``i`` in ``range(int(d / 2))``:
        encoding[pos, 2*i]     = sin(pos / n**(2*i/d))
        encoding[pos, 2*i + 1] = cos(pos / n**(2*i/d))

    Args:
        seq_len: number of positions (rows).
        d: encoding width (columns). When ``d`` is odd the last column has no
            sine/cosine partner and stays zero.
        n: base of the geometric progression of wavelengths.

    Returns:
        Array of shape ``(seq_len, d)``.
    """
    encoding = np.zeros((seq_len, d))
    pairs = int(d / 2)

    # The denominator depends only on the pair index, so compute it once for
    # all positions instead of once per (position, pair) combination.
    denominators = np.power(n, 2 * np.arange(pairs) / d)   # (pairs,)
    positions = np.arange(seq_len)[:, np.newaxis]          # (seq_len, 1)
    theta = positions / denominators                       # (seq_len, pairs)

    # even columns carry the sine, odd columns the cosine of the same angle
    encoding[:, 0:2 * pairs:2] = np.sin(theta)
    encoding[:, 1:2 * pairs:2] = np.cos(theta)
    return encoding

In [None]:
# get token length & embedding dimension
# (the length is read from the first tokenized sentence only; assumes all sentences were padded to the same length)
token_len = len(tokenized_sentence[0])
embedding_dimension = embedding.shape[-1]

# create positional encoding based on token_len & embedding_dimension
positional_enc = sinusoidalPositionalEncoding(token_len, embedding_dimension)

# add positional encoding to embedding
# positional_enc is (token_len, embedding_dimension) and is broadcast over the leading batch axis
embedding_with_positional_enc = embedding + positional_enc
embedding_with_positional_enc

In [None]:
embedding_with_positional_enc.shape # (batch, seq_len, d_model)

## Causal Masking

In [None]:
# additive causal mask: -1e9 strictly above the diagonal, 0 on and below it
# (meant to be added to attention scores so that later positions get ~0 weight after softmax)
mask_len = embedding_with_positional_enc.shape[1]
causal_mask = np.triu(np.ones((mask_len, mask_len)) * -1e9, k=1)
causal_mask

## Scaled Dot-Product Attention

In [None]:
def softmax(x, axis=-1):
    """Return the softmax of ``x`` along ``axis``.

    Args:
        x: array-like of scores.
        axis: axis along which the outputs sum to 1 (default: last axis).

    Returns:
        Array with the same shape as ``x`` holding non-negative values that
        sum to 1 along ``axis``.
    """
    # Subtracting the per-slice maximum leaves the result unchanged but keeps
    # np.exp from overflowing on large scores.
    shifted = x - np.max(x, axis=axis, keepdims=True)
    e_x = np.exp(shifted)
    return e_x / np.sum(e_x, axis=axis, keepdims=True)

In [None]:
# sanity-check softmax on a handful of random vectors
for _ in range(10):
    random_input = np.random.randn(5)
    softmax_output = softmax(random_input, axis=-1)
    # np.isclose instead of == because a floating point sum is rarely exactly 1
    assert np.isclose(softmax_output.sum(), 1.0), "sum of all softmax output must equal to 1"
    assert 0 <= softmax_output.min() and softmax_output.max() <= 1, "softmax output must be around 0<=x<=1"
"Softmax test passed"

In [None]:
# function for scaled dot product attention
def scaled_dot_product_attention(Q, K, V, mask=None, multi=False):
    """Compute ``softmax(Q @ K^T / sqrt(d_k) + mask) @ V``.

    Args:
        Q, K, V: arrays whose last two axes are ``(seq_len, depth)``. Any
            leading axes (batch, heads, ...) are treated as batch axes by
            ``np.matmul``.
        mask: optional additive mask that broadcasts against the score
            matrix; large negative entries suppress the matching weights.
        multi: kept only for backward compatibility. K is now transposed over
            its last two axes whatever its rank, so the flag has no effect.

    Returns:
        ``(output, weights)`` where ``weights`` is the softmax over the last
        axis of the scores and ``output`` is ``weights @ V``.
    """
    d_k = Q.shape[-1]
    # swapaxes(-1, -2) transposes the trailing (seq_len, depth) pair for both
    # the single-head (3-D) and the multi-head (4-D) layout.
    scores = np.matmul(Q, np.swapaxes(K, -1, -2)) / np.sqrt(d_k)
    if mask is not None:
        scores = scores + mask
    weights = softmax(scores)
    output = np.matmul(weights, V)
    return output, weights

In [None]:
# test single head function 
W_Q = np.random.rand(4, 4)*0.01
W_K = np.random.rand(4, 4)*0.01
W_V = np.random.rand(4, 4)*0.01

Q = np.matmul(embedding_with_positional_enc, W_Q)
K = np.matmul(embedding_with_positional_enc, W_K)
V = np.matmul(embedding_with_positional_enc, W_V)

In [None]:
Q

In [None]:
K

In [None]:
V

In [None]:
Q.shape # (batch, seq_len, d_model)

In [None]:
K.shape # (batch, seq_len, d_model)

In [None]:
V.shape # (batch, seq_len, d_model)

In [None]:
# test without the causal mask
single_output, single_weights = scaled_dot_product_attention(Q, K, V)
single_output

In [None]:
single_weights

In [None]:
# test with mask
single_masked_output, single_masked_weights = scaled_dot_product_attention(Q, K, V, mask=causal_mask)
single_masked_output

In [None]:
single_masked_weights

In [None]:
s_idx = 1
v_sentence = sentences[s_idx]
# plot weight before being masked
plt.figure(figsize=(8, 6))
plt.imshow(single_weights[s_idx], cmap='inferno', aspect='auto', vmin=0, vmax=1)
plt.colorbar(label='Attention Weight')
plt.xlabel('Position')
plt.ylabel('Position')
plt.title(f"Single Unmasked Attention Weights '{v_sentence}'")

plt.show()

In [None]:
# plot weight after being masked
plt.figure(figsize=(8, 6))
plt.imshow(single_masked_weights[s_idx], cmap='inferno', aspect='auto')
plt.colorbar(label='Attention Weight')
plt.xlabel('Position')
plt.ylabel('Position')
plt.title(f"Single Masked Attention Weights '{v_sentence}'")

plt.show()

In [None]:
single_masked_output.shape # (batch, seq_len, d_model)

In [None]:
single_masked_weights.shape # (batch, seq_len, seq_len)

## Multi-Head Attention 

In [None]:
def split_heads(x, num_heads):
    """Split the feature axis of a (batch, seq_len, d_model) array into heads.

    Returns an array of shape (batch, num_heads, seq_len, d_model // num_heads).
    """
    n_batch, n_tokens, width = x.shape
    per_head = width // num_heads
    # carve the feature axis into (head, per-head feature) ...
    by_head = np.reshape(x, (n_batch, n_tokens, num_heads, per_head))
    # ... then move the head axis in front of the token axis
    return np.swapaxes(by_head, 1, 2)

In [None]:
def combine_heads(x):
    """Merge the heads of a (batch, num_heads, seq_len, head_dim) array.

    Returns an array of shape (batch, seq_len, num_heads * head_dim) in which
    each token's per-head features are laid side by side.
    """
    n_batch, n_heads, n_tokens, per_head = x.shape
    # bring the token axis back in front of the head axis before flattening
    tokens_first = np.swapaxes(x, 1, 2)
    return tokens_first.reshape(n_batch, n_tokens, n_heads * per_head)

In [None]:
# function for multi head attention
def multi_head_attention(X, num_heads):
    """Run causally masked multi-head self-attention over ``X``.

    Args:
        X: array of shape (batch, seq_len, d_model).
        num_heads: number of attention heads; must be positive and divide
            d_model evenly.

    Returns:
        ``(output, attn_weights)`` where ``output`` has shape
        (batch, seq_len, d_model) and ``attn_weights`` has shape
        (batch, num_heads, seq_len, seq_len).

    Note:
        The projection matrices are re-sampled from ``np.random`` on every
        call, so results differ between calls unless NumPy is seeded. No
        output projection is applied after the heads are combined.
    """
    assert num_heads>0, "number of heads must be more than 0"

    seq_len, d_model = X.shape[1], X.shape[-1]
    assert d_model % num_heads == 0, "input dimension must be divisible by number of heads"

    # init random projection weights for Q, K, V
    W_Q = np.random.randn(d_model, d_model) * 0.01
    W_K = np.random.randn(d_model, d_model) * 0.01
    W_V = np.random.randn(d_model, d_model) * 0.01

    Q = np.matmul(X, W_Q)  # [batch, seq_len, d_model]
    K = np.matmul(X, W_K)
    V = np.matmul(X, W_V)

    # split into heads
    Q = split_heads(Q, num_heads)  # [batch, heads, seq_len, head_dim]
    K = split_heads(K, num_heads)
    V = split_heads(V, num_heads)

    # Build the causal mask from this input's own sequence length rather than
    # reading a notebook-level variable, so the function works for any input
    # and cannot pick up a stale mask of the wrong size.
    mask = np.triu(np.ones((seq_len, seq_len)) * -1e9, k=1)

    # attention for each head (multi=True selects the 4-D layout)
    attention_output, attn_weights = scaled_dot_product_attention(Q, K, V, mask=mask, multi=True)

    # combine heads
    output = combine_heads(attention_output)

    return output, attn_weights

In [None]:
# get multi head attention output & weights based on our embedding input
heads = 2
out, weights = multi_head_attention(embedding_with_positional_enc, heads)
(out, weights)

In [None]:
out.shape # (batch, seq_len, d_model)

In [None]:
weights.shape # (batch, num_heads, seq_len, seq_len)

## Feed-Forward Network

In [None]:
class FeedForward:
    """Two-layer feed-forward network: linear -> ReLU -> linear."""

    def __init__(self, d_model, d_ff=16):
        """Sample small random weights and zero biases.

        d_model is the input/output width, d_ff the hidden width.
        """
        expand_shape, project_shape = (d_model, d_ff), (d_ff, d_model)
        # weights: first layer is drawn before the second one
        self.W1 = 0.01 * np.random.randn(*expand_shape)
        self.W2 = 0.01 * np.random.randn(*project_shape)
        # biases start at zero
        self.b1 = np.zeros(d_ff)
        self.b2 = np.zeros(d_model)

    def __call__(self, x):
        """Apply the network along the last axis of x."""
        pre_activation = np.matmul(x, self.W1) + self.b1
        hidden = np.maximum(0, pre_activation)  # ReLU
        return np.matmul(hidden, self.W2) + self.b2

In [None]:
ffn = FeedForward(embedding_dimension) # init the ffn

In [None]:
ffn_out = ffn(out)
ffn_out

In [None]:
ffn_out.shape # (batch, seq_len, d_model)

## Layer Normalization

In [None]:
def layer_norm(x, eps=1e-6):
    """Standardize x along its last axis.

    Each slice along the last axis is shifted to zero mean and divided by its
    standard deviation plus eps; eps keeps the division finite when a slice
    is constant. No learnable scale or shift is applied.
    """
    last_axis = dict(axis=-1, keepdims=True)
    centered = x - np.mean(x, **last_axis)
    spread = np.std(x, **last_axis) + eps
    return centered / spread

## Residual Connection

In [None]:
# Residual + LayerNorm after multi-head attention
attn_residual = layer_norm(embedding_with_positional_enc + out)

# The feed-forward sub-layer must consume the normalized attention sub-layer
# output, not the raw attention output, so recompute it from attn_residual.
ffn_out = ffn(attn_residual)

# Residual + LayerNorm after feed-forward
ffn_residual = layer_norm(attn_residual + ffn_out)

ffn_residual

In [None]:
ffn_residual.shape # (batch, seq_len, d_model)

## Output Layer

In [None]:
# Treat ffn_residual as the final transformer block output
vocab_size = len(vocab)
d_model = ffn_residual.shape[-1]
# randomly initialised (untrained) projection from d_model features to one score per vocab entry
W_out = np.random.randn(d_model, vocab_size) * 0.01
b_out = np.zeros(vocab_size)

# Project to vocabulary size (logits for each token)
logits = ffn_residual @ W_out + b_out
logits.shape  # (batch, seq_len, vocab_size)

In [None]:
logits

In [None]:
# pick one sentence from the list by its index
sentence_idx = 2
print(f"The sentence is {sentences[sentence_idx]}")
sentence_token = tokenized_sentence[sentence_idx]

# walk backwards over the trailing special tokens (ids 0-2) to the last real word
idx = len(sentence_token) - 1
while sentence_token[idx] <= 2:
    idx -= 1

# everything in front of the last word is the context we predict from
predicted_sentence = [rvocab[token_id] for token_id in sentence_token[:idx]]

# the last word itself is what the prediction should be
print(f"Based on the sentence, the last word of sequence {predicted_sentence} should be {rvocab[sentence_token[idx]]}")

# step back one position: the logits at the word before the last one are the
# scores for the last word
idx -= 1

# take the logits at that position and drop the special tokens (<S>, <E>, <P>), which occupy ids 0-2
last_logits_token = logits[sentence_idx, idx][3:]

# turn the remaining logits into next-token probabilities
probs = softmax(last_logits_token)

# all words except the special tokens, in id order so they line up with probs
word_list = [rvocab[i] for i in range(3, vocab_size)]

# print the probability of each word
for candidate, prob in zip(word_list, probs):
    print(f"Word: '{candidate}' have a probability of {prob}")

In [None]:
plt.figure(figsize=(10, 4))
plt.bar(word_list, probs)
plt.xlabel('Word')
plt.ylabel('Probability')
plt.title(f"Next Word Probability Distribution After '{predicted_sentence}'")
plt.xticks(rotation=45)
plt.show()