In [1]:
import numpy as np

In [2]:
def softmax(x):
    """Numerically stable softmax taken over the last axis of ``x``.

    The maximum along the last axis is subtracted before exponentiating so
    that large inputs do not overflow in ``np.exp``. The returned array has
    the same shape as ``x`` and sums to 1 along the last axis.
    """
    shifted = x - np.max(x, axis=-1, keepdims=True)
    exponentials = np.exp(shifted)
    normaliser = np.sum(exponentials, axis=-1, keepdims=True)
    return exponentials / normaliser


In [3]:
def relu(x):
    """Rectified linear unit: the element-wise maximum of zero and ``x``."""
    rectified = np.maximum(0, x)
    return rectified

In [4]:
class Linear:
    """Affine layer computing ``x . weights + biases``.

    Attributes:
        weights: ``(input_dim, output_dim)`` array drawn from a standard
            normal and scaled by ``sqrt(2 / input_dim)`` (He-style scaling).
        biases: ``(output_dim,)`` array initialised to zeros.
    """

    def __init__(self, input_dim, output_dim):
        # Scale factor keeps the variance of the outputs independent of fan-in.
        init_scale = np.sqrt(2.0 / input_dim)
        self.weights = np.random.randn(input_dim, output_dim) * init_scale
        self.biases = np.zeros(output_dim)

    def forward(self, x):
        """Apply the affine map to ``x``, contracting over its last axis."""
        projected = np.dot(x, self.weights)
        return projected + self.biases

In [5]:
class TokenEmbedding:
    """Lookup table that maps integer token ids to ``d_model``-sized vectors."""

    def __init__(self, vocab_size, d_model):
        # One row per vocabulary entry, drawn from a standard normal.
        table_shape = (vocab_size, d_model)
        self.embedding_matrix = np.random.randn(*table_shape)

    def forward(self, token_ids):
        """Return the rows of the embedding table selected by ``token_ids``."""
        table = self.embedding_matrix
        return table[token_ids]

In [6]:
class PositionalEncoding:
    """Fixed sinusoidal positional encoding added to a batch of embeddings.

    Even feature columns hold ``sin(position * div_term)`` and odd columns
    hold ``cos(position * div_term)``, where ``div_term`` decays
    geometrically from 1 towards 1/10000 across the feature axis.

    Args:
        d_model: Size of the feature axis. May be even or odd.
        max_seq_len: Number of positions to precompute.

    Attributes:
        pe: Precomputed table of shape ``(1, max_seq_len, d_model)``.
    """

    def __init__(self, d_model, max_seq_len=5000):
        pe = np.zeros((max_seq_len, d_model))  # positional encoding matrix
        position = np.arange(0, max_seq_len, dtype=np.float32).reshape(-1, 1)
        div_term = np.exp(np.arange(0, d_model, 2).astype(np.float32) * -(np.log(10000.0) / d_model))

        angles = position * div_term
        pe[:, 0::2] = np.sin(angles)
        # For odd d_model there is one fewer odd column than even columns, so
        # drop the surplus frequency instead of failing on a shape mismatch.
        pe[:, 1::2] = np.cos(angles[:, : d_model // 2])

        # Leading axis of size 1 lets the table broadcast over the batch.
        self.pe = pe[np.newaxis, ...]

    def forward(self, x):
        """Add the encoding for the first ``x.shape[1]`` positions to ``x``.

        Args:
            x: Array whose axis 1 is the sequence axis and whose last axis
                has size ``d_model``.

        Returns:
            ``x`` plus the matching slice of the precomputed table.

        Raises:
            ValueError: If the sequence is longer than ``max_seq_len``.
        """
        seq_len = x.shape[1]
        max_seq_len = self.pe.shape[1]
        if seq_len > max_seq_len:
            raise ValueError(
                f"Sequence length {seq_len} exceeds max_seq_len {max_seq_len}"
            )
        return x + self.pe[:, :seq_len, :]

In [7]:
class LayerNormalization:
    """Layer normalisation over the last axis with a scale and a shift.

    Computes ``gamma * (x - mean) / sqrt(var + epsilon) + beta`` where the
    mean and (population) variance are taken over the last axis.

    Args:
        d_model: Size of the normalised (last) axis.
        epsilon: Small constant added to the variance for numerical stability.
    """

    def __init__(self, d_model, epsilon=1e-5):
        self.gamma = np.ones(d_model)   # scale parameter
        self.beta = np.zeros(d_model)   # shift parameter
        self.epsilon = epsilon

    def forward(self, x):
        """Normalise ``x`` along its last axis, then apply gamma and beta."""
        mean = np.mean(x, axis=-1, keepdims=True)
        var = np.var(x, axis=-1, keepdims=True)
        # Epsilon goes inside the square root (added to the variance), which
        # is the standard LayerNorm definition.
        inv_std = 1.0 / np.sqrt(var + self.epsilon)
        return self.gamma * (x - mean) * inv_std + self.beta

In [8]:
def scaled_dot_product_attention(q, k, v, mask=None):
    """Compute ``softmax(q k^T / sqrt(d_k) + mask) v``.

    Args:
        q: Queries; the last axis has size ``d_k``.
        k: Keys; the last two axes are swapped before the product with ``q``.
        v: Values, combined using the attention weights.
        mask: Optional additive mask that is broadcast against the score
            matrix (large negative entries suppress positions). It may have
            more leading axes than the scores.

    Returns:
        Tuple ``(output, attention_weights)``.
    """
    d_k = q.shape[-1]
    scores = np.matmul(q, k.swapaxes(-2, -1)) / np.sqrt(d_k)

    if mask is not None:
        # Out-of-place add: an in-place ``+=`` cannot broadcast when the mask
        # has more axes than the scores. Casting the mask keeps the result in
        # the dtype of the scores, as the in-place form did.
        scores = scores + np.asarray(mask, dtype=scores.dtype)

    attention_weights = softmax(scores)
    output = np.matmul(attention_weights, v)
    return output, attention_weights

In [9]:
class MultiHeadAttention:
    """Multi-head attention: project, split into heads, attend, merge, project.

    Args:
        d_model: Feature size of the inputs and outputs.
        num_heads: Number of heads; must divide ``d_model`` evenly.
    """

    def __init__(self, d_model, num_heads):
        assert d_model % num_heads == 0
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads  # per-head feature size

        # Built in q, k, v, o order so the random weights are drawn in a
        # fixed sequence.
        self.W_q, self.W_k, self.W_v, self.W_o = (
            Linear(d_model, d_model) for _ in range(4)
        )

    def split_heads(self, x, batch_size):
        """Reshape ``x`` to ``(batch_size, num_heads, seq_len, d_k)``."""
        per_head = x.reshape(batch_size, -1, self.num_heads, self.d_k)
        # Swap the sequence and head axes so heads sit next to the batch axis.
        return per_head.swapaxes(1, 2)

    def forward(self, q, k, v, mask=None):
        """Run attention over ``q``, ``k``, ``v`` and return the projected result.

        The batch size is read from axis 0 of ``q``. ``mask`` is passed
        through unchanged to ``scaled_dot_product_attention``.
        """
        n_batch = q.shape[0]

        q_heads = self.split_heads(self.W_q.forward(q), n_batch)
        k_heads = self.split_heads(self.W_k.forward(k), n_batch)
        v_heads = self.split_heads(self.W_v.forward(v), n_batch)

        context, _ = scaled_dot_product_attention(q_heads, k_heads, v_heads, mask)

        # Undo the head split: move heads back beside d_k, then flatten them.
        merged = context.swapaxes(1, 2).reshape(n_batch, -1, self.d_model)
        return self.W_o.forward(merged)

In [10]:
class FeedForwardNetwork:
    """Two-layer feed-forward network: Linear -> ReLU -> Linear.

    Args:
        d_model: Input and output feature size.
        d_ff: Hidden feature size between the two linear layers.
    """

    def __init__(self, d_model, d_ff):
        self.linear1 = Linear(d_model, d_ff)   # expand to the hidden size
        self.linear2 = Linear(d_ff, d_model)   # project back to d_model

    def forward(self, x):
        """Apply the first layer, the ReLU, then the second layer to ``x``."""
        hidden = self.linear1.forward(x)
        activated = relu(hidden)
        return self.linear2.forward(activated)

In [11]:
class TransformerDecoderBlock:
    """Decoder block with pre-normalised self-attention and feed-forward sublayers.

    Each sublayer receives a layer-normalised copy of its input and its
    output is added back onto the un-normalised input (residual connection).
    """

    def __init__(self, d_model, num_heads, d_ff):
        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = FeedForwardNetwork(d_model, d_ff)
        self.ln1 = LayerNormalization(d_model)
        self.ln2 = LayerNormalization(d_model)

    def forward(self, x, mask):
        """Run the block on ``x`` and return an array of the same shape.

        x shape: (batch_size, seq_len, d_model)
        ``mask`` is forwarded to the attention sublayer.
        """
        # Self-attention sublayer: query, key and value are the same tensor.
        normed = self.ln1.forward(x)
        after_attention = x + self.mha.forward(normed, normed, normed, mask)

        # Feed-forward sublayer.
        normed = self.ln2.forward(after_attention)
        return after_attention + self.ffn.forward(normed)

In [12]:
class DecoderOnlyTransformer:
    """Stack of decoder blocks mapping token ids to vocabulary logits.

    Pipeline: token embedding -> positional encoding -> ``num_layers``
    decoder blocks -> final layer normalisation -> linear projection to
    ``vocab_size``.
    """

    def __init__(self, vocab_size, d_model, num_heads, d_ff, num_layers, max_seq_len):
        self.embedding = TokenEmbedding(vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model, max_seq_len)

        self.decoder_blocks = []
        for _ in range(num_layers):
            self.decoder_blocks.append(TransformerDecoderBlock(d_model, num_heads, d_ff))

        self.final_ln = LayerNormalization(d_model)
        self.output_layer = Linear(d_model, vocab_size)

    def forward(self, x):
        """Return logits of shape ``(batch_size, seq_len, vocab_size)``.

        ``x`` must be a 2-D array of token ids, ``(batch_size, seq_len)``.
        """
        _, seq_len = x.shape

        # Causal mask: a large negative value strictly above the diagonal so
        # a position cannot attend to later positions. The two leading axes
        # of size 1 broadcast over batch and heads.
        future = np.triu(np.ones((seq_len, seq_len)), k=1) * -1e9
        mask = future[np.newaxis, np.newaxis, :, :]

        # Input embedding + positional encoding
        hidden = self.pos_encoding.forward(self.embedding.forward(x))

        for block in self.decoder_blocks:
            hidden = block.forward(hidden, mask)

        return self.output_layer.forward(self.final_ln.forward(hidden))

In [15]:
# Simple test case: checks output shapes, softmax validity and causal masking.
if __name__ == '__main__':
    # Hyperparameters
    vocab_size = 1000  # Size of our vocabulary
    d_model = 512      # Embedding dimension
    num_heads = 8      # Number of attention heads
    d_ff = 2048        # Hidden dimension of the FFN
    num_layers = 6     # Number of decoder blocks
    max_seq_len = 100  # Maximum sequence length

    batch_size = 4
    seq_len = 20

    print("Transformer Model Test")
    print(f"Hyperparameters: d_model={d_model}, num_heads={num_heads}, num_layers={num_layers}\n")

    model = DecoderOnlyTransformer(
        vocab_size=vocab_size,
        d_model=d_model,
        num_heads=num_heads,
        d_ff=d_ff,
        num_layers=num_layers,
        max_seq_len=max_seq_len
    )

    dummy_input = np.random.randint(0, vocab_size, size=(batch_size, seq_len))
    print(f"Input token IDs shape: {dummy_input.shape} (batch_size, seq_len)")

    logits = model.forward(dummy_input)
    print(f"Output logits shape: {logits.shape} (batch_size, seq_len, vocab_size)")
    # Check the shape instead of only printing it, so the success message
    # below is backed by an actual test.
    assert logits.shape == (batch_size, seq_len, vocab_size), "Unexpected logits shape"

    last_token_logits = logits[:, -1, :]
    next_token_probs = softmax(last_token_logits)
    print(f"Probabilities for next token shape: {next_token_probs.shape} (batch_size, vocab_size)")
    assert next_token_probs.shape == (batch_size, vocab_size), "Unexpected probabilities shape"

    prob_sums = np.sum(next_token_probs, axis=-1)
    with np.printoptions(precision=20, suppress=True):
        print(f"Sum of probabilities for each batch item: {prob_sums}")
    assert np.allclose(prob_sums, 1.0), "Probabilities should sum to 1"
    print("\nOutput dimensions are correct and softmax is valid.")

    print("\nCausal Masking Check")
    q_test = k_test = v_test = np.random.rand(1, 4, 32) # (batch, seq_len, d_k)
    test_seq_len = q_test.shape[1]
    causal_mask = np.triu(np.ones((test_seq_len, test_seq_len)), k=1) * -1e9

    _, attention_weights = scaled_dot_product_attention(q_test, k_test, v_test, causal_mask)
    print("Attention weights matrix for a single head (shape {}):".format(attention_weights.shape))
    print(np.round(attention_weights.squeeze(), 2))
    # Verify the claim printed below: every weight strictly above the
    # diagonal (a future position) must be zero.
    future_weights = np.triu(attention_weights.squeeze(), k=1)
    assert np.allclose(future_weights, 0.0), "Future positions must receive zero attention"
    print("\nThe upper triangle is all zeros, proving the mask works.")

Transformer Model Test
Hyperparameters: d_model=512, num_heads=8, num_layers=6

Input token IDs shape: (4, 20) (batch_size, seq_len)
Output logits shape: (4, 20, 1000) (batch_size, seq_len, vocab_size)
Probabilities for next token shape: (4, 1000) (batch_size, vocab_size)
Sum of probabilities for each batch item: [1. 1. 1. 1.]

Output dimensions are correct and softmax is valid.

Causal Masking Check
Attention weights matrix for a single head (shape (1, 4, 4)):
[[1.   0.   0.   0.  ]
 [0.39 0.61 0.   0.  ]
 [0.29 0.26 0.45 0.  ]
 [0.22 0.19 0.27 0.33]]

The upper triangle is all zeros, proving the mask works.
