In [38]:
import numpy as np
from tqdm import tqdm
import os

np.random.seed(42)

<h1>Load dataset</h1>

In [39]:
dataset_file_path = "data/The Complete Works of William Shakespeare.txt"

with open(dataset_file_path, 'r', encoding= 'utf-8') as file:
    text = file.read()

print(text[1500:1600])

chars = sorted(list(set(text)))
vocab_size = len(chars)
print(chars)
print(f'vocab_size: {vocab_size}')

# encoder and decoder
char_to_idx = {ch: idx for idx, ch in enumerate(chars)}
idx_to_char = {idx: ch for idx, ch in enumerate(chars)}

# convert dataset from text to index of char
dataset = np.array([char_to_idx[ch] for ch in text], dtype= np.long)

# save a bit of memory why not
del text

e riper should by time decease,
His tender heir might bear his memory:
But thou contracted to thine 
['\t', '\n', ' ', '!', '&', "'", '(', ')', '*', ',', '-', '.', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ':', ';', '?', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '[', ']', '_', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'À', 'Æ', 'Ç', 'É', 'à', 'â', 'æ', 'ç', 'è', 'é', 'ê', 'ë', 'î', 'œ', '—', '‘', '’', '“', '”', '…', '\ufeff']
vocab_size: 101


## Crate Parameters and Hyperparameters

In [40]:
# hyperparameters
CONTEXT_LENGHT = 64
EMBEDDING_DIM = 32
ATTENTION_HEAD_DIM = 16
MLP_HIDDEN_DIM = 128

In [41]:
# This LLM transformer do SA --> MLP --> SA --> MLP

def init_parameters(vocab_size: int,
                    embedding_dim: int,
                    attention_head_dim: int,
                    mlp_hidden_dim: int,
                    scaling_factor: float) -> dict[str:np.array]:
    """
    Initialize all model parameters (Only for this LLM or)

    Parameters
    vocab_size:         int
    embeding_dim:       int
    attention_head_dim: int
    mlp_hidden_dim:     int
    scaling_factor:     int

    Output
    parameters: dict[str:np.array]
    parameters that contain the weight and bias matrix (np.array) of all
    LLM parameters, can be access by name (str) of parameter
    """
    parameters = {}

    # Map token index to embedding vector
    parameters['embedding'] = np.random.randn(vocab_size, embedding_dim) * scaling_factor           # (vocab_size, embed)

    # 1st SA
    parameters['W_k1'] = np.random.randn(embedding_dim, attention_head_dim) * scaling_factor        # (embed, attend)
    parameters['W_q1'] = np.random.randn(embedding_dim, attention_head_dim) * scaling_factor        # (embed, attend)
    parameters['W_Vup1'] = np.random.randn(embedding_dim, attention_head_dim) * scaling_factor      # (embed, attend)
    parameters['W_Vdown1'] = np.random.randn(attention_head_dim, embedding_dim) * scaling_factor    # (attend, embed)

    # 1st MLP
    parameters['W_mlp1_up'] = np.random.randn(embedding_dim, mlp_hidden_dim) * scaling_factor       # (embed, hidden)
    parameters['b_mlp1_up'] = np.zeros((1, mlp_hidden_dim))                                         # (1, hidden)
    parameters['W_mlp1_down'] = np.random.randn(mlp_hidden_dim, embedding_dim) * scaling_factor     # (hidden, embed)
    parameters['b_mlp1_down'] = np.zeros((1, embedding_dim))                                        # (1, embed)

    # 2nd SA
    parameters['W_k2'] = np.random.randn(embedding_dim, attention_head_dim) * scaling_factor        # (embed, attend)
    parameters['W_q2'] = np.random.randn(embedding_dim, attention_head_dim) * scaling_factor        # (embed, attend)
    parameters['W_Vup2'] = np.random.randn(embedding_dim, attention_head_dim) * scaling_factor      # (embed, attend)
    parameters['W_Vdown2'] = np.random.randn(attention_head_dim, embedding_dim) * scaling_factor    # (attend, embed)

    # 2nd MLP
    parameters['W_mlp2_up'] = np.random.randn(embedding_dim, mlp_hidden_dim) * scaling_factor       # (embed, hidden)
    parameters['b_mlp2_up'] = np.zeros((1, mlp_hidden_dim))                                         # (1, hidden)
    parameters['W_mlp2_down'] = np.random.randn(mlp_hidden_dim, embedding_dim) * scaling_factor     # (hidden, embed)
    parameters['b_mlp2_down'] = np.zeros((1, embedding_dim))                                        # (1, embed)

    # Unembedding
    parameters['unembedding'] = np.random.randn(embedding_dim, vocab_size) * scaling_factor         # (embed, vocab_size)

    return parameters

In [42]:
parameters = init_parameters(vocab_size, EMBEDDING_DIM, ATTENTION_HEAD_DIM, MLP_HIDDEN_DIM, 0.01)

In [43]:
sum_of_parameters = 0
for key, value in parameters.items():
    print(f'{key}: {value.shape}')
    sum_of_parameters += value.shape[0] * value.shape[1]
print(f'Total number of parameters: {sum_of_parameters}')
del sum_of_parameters

embedding: (101, 32)
W_k1: (32, 16)
W_q1: (32, 16)
W_Vup1: (32, 16)
W_Vdown1: (16, 32)
W_mlp1_up: (32, 128)
b_mlp1_up: (1, 128)
W_mlp1_down: (128, 32)
b_mlp1_down: (1, 32)
W_k2: (32, 16)
W_q2: (32, 16)
W_Vup2: (32, 16)
W_Vdown2: (16, 32)
W_mlp2_up: (32, 128)
b_mlp2_up: (1, 128)
W_mlp2_down: (128, 32)
b_mlp2_down: (1, 32)
unembedding: (32, 101)
Total number of parameters: 27264


# Activation function

$\text{let}\ v = \begin{bmatrix}x_1\ x_2 \ ...\ x_n\end{bmatrix}\\\text{softmax}(v) = \begin{bmatrix}\frac{e^{x_1}}{\Sigma_{i=1}^ne^{x_i}}\ \frac{e^{x_2}}{\Sigma_{i=1}^ne^{x_i}}\ ...\ \frac{e^{x_n}}{\Sigma_{i=1}^ne^{x_i}}\end{bmatrix}$

$\text{ReLU}(x) = \begin{cases}x, &x>0\\0,&x\leq0\end{cases}$

In [44]:
def softmax(matrix: np.array) -> np.array:
    """
    Compute softmax value for each row

    Input
    matrix: np.array

    Output
    matrix: np.array
    """
    # Prevent overflow but still have the same softmax value
    exp_matrix = np.exp(matrix - np.max(matrix, axis=-1, keepdims=True))
    return exp_matrix / np.sum(exp_matrix, axis=-1, keepdims=True)

def relu(matrix: np.array) -> np.array:
    """
    Compute ReLU for each value in the matrix

    Input
    matrix: np.array

    Output
    matrix: np.array
    """
    return np.maximum(0, matrix)

# Functions

In [45]:
def crate_casual_mask(seq_len: int) -> np.array:
    """
    Crate casual mask for self-attention

    Input
    seq_len:    int

    Output
    masking: np.array[[bool]]
    """
    mask = np.triu(np.ones(seq_len, seq_len))
    return mask == 0

In [46]:
def cross_entropy_loss_all_position(probability:    np.array,
                                    target_tokens:  np.array) -> float:
    """
    Map the result with prediction and return the average cross entropy loss of all position

    Input
    probability:    np.array
    target_tokens:  np.array

    Output
    average_error:  float
    """
    correct_probability = probability[np.arange(len(target_tokens)), target_tokens]

    losses = -np.log(correct_probability + 1e-8) # Add 1e-8 for numerical stability

    return np.mean(losses)

# Transformer block

In [47]:
def self_attention_block(context:   np.array,
                         W_k:       np.array,
                         W_q:       np.array,
                         W_Vup:     np.array,
                         W_Vdown:   np.array) -> tuple[np.array, np.array]:
    """
    Self-attention block

    Input
    context:   np.array,
    W_k:       np.array,
    W_q:       np.array,
    W_Vup:     np.array,
    W_Vdown:   np.array,

    Output
    tuple[output, attention_weight]
    """
    context_lenght, embedding_dim = context.shape # (context_lenght, embedding_dim)

    K = context @ W_k # K: (context_lenght, attention_head_dim)
    Q = context @ W_q # Q: (context_lenght, attention_head_dim)

    attention_scores = (Q @ K.T) / np.sqrt(Q.shape[0]) # (context_lenght, context_lenght)

    # Masking attention scores by replace the lower triangle with -inf
    causal_mask = crate_casual_mask(context_lenght)
    attention_scores = np.where(causal_mask, attention_scores, -1e9)

    attention_weights = softmax(attention_scores)

    V_up = context @ W_Vup # (context_lenght, attention_head_dim)

    attention_output = attention_weights @ V_up # (context_lenght, attention_head_dim)

    output = attention_output @ W_Vdown # (context_lenght, embedding_dim)

    return output, attention_weights

In [48]:
def mlp_block(context:  np.array,
              W_up:     np.array,
              b_up:     np.array,
              W_down:   np.array,
              b_down:   np.array) -> np.array:
    """
    MLP block

    Input
    context: np.array
    W_up:    np.array
    b_up:    np.array
    W_down:  np.array
    b_down:  np.array

    Output
    output: np.array
    """
    hidden = context @ W_up + b_up
    hidden = relu(hidden)
    output = hidden @ W_down + b_down
    return output

# Forward Pass

In [49]:
def forward_pass(tokens: np.array,
                 parameters: dict[str: np.array]) -> tuple[np.array, dict[str: np.array]]:
    """
    Complete forward pass

    Input
    tokens:     np.array
    parameters: dict[str: np.array]

    Output
    probability of each token:  np.array
    cache: dict[str: np.array]
    """

    # 1. Embedding lookup
    token = parameters['embedding'][tokens]

    cache = {'token_0': token.copy()}

    # 2. 1st SA block
    token_sa1, attention_weights1 = self_attention_block(token,
                                                         parameters['W_k1'],
                                                         parameters['W_q1'],
                                                         parameters['W_Vup1'],
                                                         parameters['W_Vdown1'])
    cache['token_sa1'] = token_sa1.copy()
    cache['attention_weights1'] = attention_weights1.copy()

    token = token + token_sa1

    # 3. 1st MLP block
    token_mlp1 = mlp_block(token,
                           parameters['W_mlp1_up'],
                           parameters['b_mlp1_up'],
                           parameters['W_mlp1_down'],
                           parameters['b_mlp1_down'])
    cache['token_mlp1'] = token_mlp1.copy()

    token = token + token_mlp1

    # 4. 2nd SA block
    token_sa2, attention_weights2 = self_attention_block(token,
                                                         parameters['W_k2'],
                                                         parameters['W_q2'],
                                                         parameters['W_Vup2'],
                                                         parameters['W_Vdown2'])
    cache['token_sa2'] = token_sa2.copy()
    cache['attention_weights2'] = attention_weights2.copy()

    token = token + token_sa2

    # 5. 2nd MLP block
    token_mlp2 = mlp_block(token,
                           parameters['W_mlp2_up'],
                           parameters['b_mlp2_up'],
                           parameters['W_mlp2_down'],
                           parameters['b_mlp2_down'])
    cache['token_mlp2'] = token_mlp2.copy()

    token = token + token_mlp2

    # 6. Prediction (unembedding)
    logits = token @ parameters['unembedding'] # (32, vocab_size)
    probability = softmax(logits)

    cache['logits'] = logits.copy()
    cache['probability'] = probability.copy()
    cache['final_embedding'] = token.copy()

    return probability, cache

# Training functions

In [50]:
def get_batch(data:             np.array,
              batch_size:       int,
              context_length:   int) -> tuple[np.array, np.array]:
    """
    Get a batch of training examples

    Input
    data:           np.array
    batch_size:     int
    context_length: int

    Output
    (contexts, targets)
    contexts:   np.array
    targets:    np.array
    """

    indices = np.random.randint(0, len(data) - context_length - 1, batch_size)

    contexts = np.zeros((batch_size, context_length), dtype=np.int32)
    targets = np.zeros(batch_size, dtype=np.int32)

    for index, data_index in enumerate(indices):
        contexts[index] = data[data_index: data_index + context_length]
        targets[index] = data[data_index + 1: data_index + context_length + 1]
    
    return contexts, targets

In [51]:
def train_step_batch(context_tokens:    np.array,
                     target_tokens:     np.array,
                     parameters:        dict[str: np.array],
                     learning_rate:     float   = 0.001) -> tuple[float, np.array]:
    """
    Perform one training step for all positions
    

    Input
    context_tokens: np.array
    target_tokens:  np.array
    parameters:     dict[str : np.array]
    learning_rateL  float

    Output
    (loss, probability)
    loss:           float
    probability:    np.array
    """
    # Forward pass yay
    probability, cache = forward_pass(context_tokens, parameters)

    # Compute loss
    loss = cross_entropy_loss_all_position(probability, target_tokens)

    IT'S NOT DONE YET

SyntaxError: unterminated string literal (detected at line 26) (894065645.py, line 26)