In [None]:
import numpy as np
# Learning rate used for every SGD step. The backward() methods below read this
# module-level name at call time; note that the training cell rebinds it to
# 5e-4 before the model is fit, so this value only acts as an initial default.
lr = 1e-4
# Running total of trainable parameters. Each class's __init__ adds the size of
# the weights it creates to this counter via `global params`.
params = 0

In [None]:
class Head:
    """One scaled dot-product self-attention head with hand-written backprop.

    ``forward`` projects the input into keys, queries and values, applies a
    row-wise softmax over the scaled query/key scores and caches every
    intermediate on ``self``. ``backward`` reuses that cache to compute the
    gradients and immediately applies an SGD step scaled by the module-level
    ``lr``.
    """

    def __init__(self, d_model, d_k, d_v):
        """Create the three random projection matrices.

        Args:
            d_model: width of each input row.
            d_k: width of the key and query projections.
            d_v: width of the value projection.

        Side effects:
            Adds the number of weights created here to the global ``params``.
        """
        global params

        self.d_k = d_k
        # Every projection is drawn from N(0, 1) and scaled by sqrt(2 / d_model).
        init_scale = np.sqrt(2 / d_model)
        self.W_k = np.random.randn(d_model, d_k) * init_scale
        self.W_q = np.random.randn(d_model, d_k) * init_scale
        self.W_v = np.random.randn(d_model, d_v) * init_scale

        params += 2 * (d_model * d_k) + (d_model * d_v)

    def softmax(self, x):
        """Return the softmax of ``x`` taken along its last axis."""
        # Subtracting the row maximum makes the largest exponent 0, which
        # avoids overflow in exp without changing the result.
        x_max = np.max(x, axis=-1, keepdims=True)
        x_shifted = x - x_max
        exp_x = np.exp(x_shifted)
        return exp_x / np.sum(exp_x, axis=-1, keepdims=True)

    def forward(self, inp):
        """Run attention over ``inp`` and cache the intermediates.

        Args:
            inp: 2-D array whose rows have width ``d_model`` (one row per
                sequence position).

        Returns:
            Array with one row per input row and ``d_v`` columns.
        """
        self.inp = inp

        self.K = inp @ self.W_k
        self.Q = inp @ self.W_q
        self.V = inp @ self.W_v

        # Row i holds the scaled scores of query i against every key.
        self.att_kq = (self.Q @ self.K.T) / np.sqrt(self.d_k)
        self.att = self.softmax(self.att_kq)
        self.att_v = self.att @ self.V

        return self.att_v

    def backward(self, prev_grad):
        """Back-propagate through the head and apply one SGD step.

        Must be called after ``forward``; it relies on the cached ``inp``,
        ``K``, ``Q``, ``V`` and ``att``.

        Args:
            prev_grad: gradient of the loss w.r.t. the output of ``forward``
                (same shape as that output).

        Side effects:
            Stores ``d_Q``, ``d_K`` and ``d_V`` (gradients w.r.t. the cached
            projections) on ``self`` and updates ``W_k``, ``W_q`` and ``W_v``
            in place using the module-level ``lr``.
        """
        # out = att @ V, hence dL/dV = att^T @ dL/dout. The transpose matters:
        # att is square, so omitting it is shape-compatible but wrong.
        self.d_V = self.att.T @ prev_grad
        d_att = prev_grad @ self.V.T

        # Row-wise softmax Jacobian-vector product:
        # dS = att * (dA - sum(dA * att, axis=-1)).
        row_dot = (d_att * self.att).sum(axis=-1, keepdims=True)
        d_att_kq = (d_att - row_dot) * self.att

        # Scores are Q @ K^T / sqrt(d_k).
        self.d_Q = (d_att_kq @ self.K) / np.sqrt(self.d_k)
        self.d_K = (d_att_kq.T @ self.Q) / np.sqrt(self.d_k)

        d_W_k = self.inp.T @ self.d_K
        d_W_q = self.inp.T @ self.d_Q
        d_W_v = self.inp.T @ self.d_V

        self.W_k -= lr * d_W_k
        self.W_q -= lr * d_W_q
        self.W_v -= lr * d_W_v

In [None]:
class MultiHead:
    """Multi-head self-attention: several ``Head`` objects plus an output mix.

    The outputs of all heads are written side by side into one matrix and
    multiplied by ``W_o``. ``backward`` trains the heads and ``W_o`` with SGD
    steps scaled by the module-level ``lr`` and returns the gradient with
    respect to the layer input.
    """

    def __init__(self, d_model, d_k, d_v, num_heads):
        """Build ``num_heads`` heads and the output projection ``W_o``.

        Args:
            d_model: width of each input/output row.
            d_k: key/query width passed to every head.
            d_v: value width passed to every head.
            num_heads: number of heads to create.

        Side effects:
            Adds the size of ``W_o`` to the global ``params`` (each ``Head``
            registers its own weights when it is constructed).
        """
        global params

        self.num_heads = num_heads
        self.d_v = d_v
        self.d_k = d_k
        self.d_model = d_model
        self.heads = [Head(d_model, d_k, d_v) for _ in range(num_heads)]
        self.W_o = np.random.randn(num_heads * d_v, d_model) * np.sqrt(2 / (num_heads * d_v))

        params += (num_heads * d_v * d_model)

    def get_d_X(self, head_weights=None):
        """Sum every head's contribution to the gradient w.r.t. the input.

        Args:
            head_weights: optional list with one ``(W_q, W_k, W_v)`` tuple per
                head. When omitted, the heads' current weights are used.
                ``backward`` passes the weights as they were *before* the
                heads applied their SGD step, because those are the weights
                the forward pass actually used.

        Returns:
            Array of shape ``(L, d_model)``; it is also stored as ``self.d_X``.
        """
        if head_weights is None:
            head_weights = [(head.W_q, head.W_k, head.W_v) for head in self.heads]

        self.d_X = np.zeros((self.L, self.d_model))
        for head, (W_q, W_k, W_v) in zip(self.heads, head_weights):
            self.d_X += (head.d_Q @ W_q.T) + (head.d_K @ W_k.T) + (head.d_V @ W_v.T)
        return self.d_X

    def forward(self, x):
        """Run every head on ``x`` and mix the side-by-side outputs with ``W_o``.

        Args:
            x: 2-D array; its number of rows is remembered as ``self.L``.

        Returns:
            ``head_outputs @ W_o``, also stored as ``self.out``.
        """
        self.L = x.shape[0]
        self.head_outputs = np.zeros((self.L, self.num_heads * self.d_v))
        for idx, head in enumerate(self.heads):
            # Head idx owns columns [idx * d_v, (idx + 1) * d_v).
            self.head_outputs[:, idx*self.d_v : (idx+1)*self.d_v] = head.forward(x)
        self.out = self.head_outputs @ self.W_o
        return self.out

    def backward(self, prev_grad):
        """Back-propagate through ``W_o`` and all heads, then update ``W_o``.

        Args:
            prev_grad: gradient of the loss w.r.t. the output of ``forward``.

        Returns:
            Gradient of the loss w.r.t. the input of ``forward``.
        """
        d_W_o = self.head_outputs.T @ prev_grad
        d_head_outputs = prev_grad @ self.W_o.T

        # head.backward() updates the projection matrices in place, so keep
        # copies of the pre-update weights: the input gradient must be formed
        # with the weights that produced the forward pass.
        weights_before = [
            (head.W_q.copy(), head.W_k.copy(), head.W_v.copy())
            for head in self.heads
        ]
        for idx, head in enumerate(self.heads):
            head.backward(d_head_outputs[:, idx*self.d_v : (idx+1)*self.d_v])

        self.W_o -= lr * d_W_o
        d_X = self.get_d_X(weights_before)
        return d_X

In [None]:
class Layer:
    """One encoder block: multi-head attention followed by a two-layer MLP.

    The feed-forward part is ``relu(a @ W_1 + b_1) @ W_2 + b_2`` where ``a``
    is the attention output. ``forward`` applies neither residual connections
    nor ``layer_norm``; the latter is only provided as a helper. All weight
    updates are SGD steps scaled by the module-level ``lr``.
    """

    def __init__(self, d_model, d_k, d_v, num_heads, d_ff):
        """Create the attention sub-layer and the feed-forward weights.

        Args:
            d_model: width of each input/output row.
            d_k: key/query width for the attention heads.
            d_v: value width for the attention heads.
            num_heads: number of attention heads.
            d_ff: hidden width of the feed-forward network.

        Side effects:
            Adds the feed-forward weight and bias counts to the global
            ``params``.
        """
        global params

        self.multi_head = MultiHead(d_model, d_k, d_v, num_heads)
        self.W_1 = np.random.randn(d_model, d_ff) * np.sqrt(2 / d_model)
        self.W_2 = np.random.randn(d_ff, d_model) * np.sqrt(2 / d_ff)
        self.b_1 = np.zeros((d_ff,))
        self.b_2 = np.zeros((d_model,))

        params += 2 * (d_model * d_ff) + d_ff + d_model

    def relu(self, x):
        """Element-wise ``max(0, x)``."""
        return np.maximum(0, x)

    def layer_norm(self, x, eps=1e-5):
        """Normalise ``x`` to zero mean and unit variance along the last axis.

        Args:
            x: array to normalise.
            eps: small constant added to the variance so that rows whose
                entries are all equal yield zeros instead of NaN (0 / 0).

        Returns:
            Array with the same shape as ``x``.
        """
        mean = x.mean(axis=-1, keepdims=True)
        var = x.var(axis=-1, keepdims=True)
        return (x - mean) / np.sqrt(var + eps)

    def forward(self, x):
        """Apply attention and then the feed-forward network.

        Intermediates (``m_att``, ``ff_1``, ``ff_1_dash``, ``ff_2``) are cached
        on ``self`` for use by ``backward``.

        Returns:
            The feed-forward output ``ff_2``.
        """
        self.m_att = self.multi_head.forward(x)

        self.ff_1 = (self.m_att @ self.W_1) + self.b_1
        self.ff_1_dash = self.relu(self.ff_1)
        self.ff_2 = (self.ff_1_dash @ self.W_2) + self.b_2

        return self.ff_2

    def backward(self, prev_grad):
        """Back-propagate through the block and apply one SGD step.

        Args:
            prev_grad: gradient of the loss w.r.t. the output of ``forward``.

        Returns:
            Gradient of the loss w.r.t. the input of ``forward``, as returned
            by the attention sub-layer's ``backward``.
        """
        # Second linear layer.
        d_W_2 = self.ff_1_dash.T @ prev_grad
        d_b_2 = np.sum(prev_grad, axis=0)
        d_ff_1_dash = prev_grad @ self.W_2.T

        # ReLU passes gradient only where its input was strictly positive.
        relu_mask = (self.ff_1 > 0).astype(int)
        d_ff_1 = d_ff_1_dash * relu_mask

        # First linear layer.
        d_W_1 = self.m_att.T @ d_ff_1
        d_b_1 = np.sum(d_ff_1, axis=0)

        # All gradients above are formed before any weight is touched, so the
        # updates below cannot contaminate them.
        d_m_att = d_ff_1 @ self.W_1.T
        d_X = self.multi_head.backward(d_m_att)

        self.W_1 -= lr * d_W_1
        self.b_1 -= lr * d_b_1
        self.W_2 -= lr * d_W_2
        self.b_2 -= lr * d_b_2

        return d_X

In [None]:
class Transformer:
    """Binary sequence classifier: embedding, encoder layers, sigmoid read-out.

    A row of zeros is prepended to the one-hot input to act as a
    classification slot; after the encoder layers, row 0 is fed through a
    single logistic unit. Training is per-example SGD with the module-level
    learning rate ``lr``.
    """

    def __init__(self, d_model, d_k, d_v, num_heads, d_ff, num_layers, d_vocab):
        """Create the encoder stack, the embedding and the classifier weights.

        Args:
            d_model: embedding / hidden width.
            d_k: key/query width inside each attention head.
            d_v: value width inside each attention head.
            num_heads: attention heads per layer.
            d_ff: hidden width of each layer's feed-forward network.
            num_layers: number of encoder layers.
            d_vocab: number of distinct token ids.

        Side effects:
            Adds the embedding and classifier parameter counts to the global
            ``params``.
        """
        global params

        self.layers = [Layer(d_model, d_k, d_v, num_heads, d_ff) for _ in range(num_layers)]
        self.W_emb = np.random.randn(d_vocab, d_model) * np.sqrt(2 / d_vocab)
        self.W_cls = np.random.randn(d_model, 1) * np.sqrt(2 / d_model)
        self.b_cls = np.zeros((1,))
        self.d_vocab = d_vocab
        self.d_model = d_model

        params += (d_vocab * d_model) + d_model + 1

    def sigmoid(self, x):
        """Logistic function evaluated without overflowing for large ``|x|``.

        ``exp`` is only ever applied to non-positive numbers: for ``x >= 0``
        the usual ``1 / (1 + exp(-x))`` is used, otherwise the algebraically
        equal ``exp(x) / (1 + exp(x))``.
        """
        x = np.asarray(x, dtype=float)
        z = np.exp(-np.abs(x))
        return np.where(x >= 0, 1 / (1 + z), z / (1 + z))

    def one_hot(self, x):
        """Turn the ``self.L`` token ids in ``x`` into one-hot rows of width ``d_vocab``."""
        one_hot = np.zeros((self.L, self.d_vocab), dtype=np.float32)
        one_hot[np.arange(self.L), x] = 1.0
        return one_hot

    def pos_enc(self, x):
        """Add sinusoidal position encodings to ``x``.

        Even columns receive sines and odd columns cosines of
        ``position * 10000 ** (-2i / d_model)``. When ``d_model`` is odd there
        is one fewer odd column than even columns, so the cosine block is
        trimmed to fit.
        """
        PE = np.zeros((self.L, self.d_model), dtype=np.float32)
        position = np.arange(self.L)[:, np.newaxis]
        div_term = np.exp(np.arange(0, self.d_model, 2) * -(np.log(10000.0) / self.d_model))

        angles = position * div_term
        PE[:, 0::2] = np.sin(angles)
        PE[:, 1::2] = np.cos(angles[:, : self.d_model // 2])
        return x + PE

    def add_cls_vec(self, x):
        """Prepend an all-zero row to ``x`` and increase ``self.L`` by one."""
        arr = np.zeros((x.shape[0] + 1, x.shape[1]))
        self.L += 1
        arr[1:, :] = x
        return arr

    def forward(self, x):
        """Return the predicted probability for one sequence of token ids.

        Args:
            x: 1-D integer array of token ids, each in ``[0, d_vocab)``.

        Returns:
            Array of shape ``(1,)`` holding the sigmoid output.
        """
        self.L = x.shape[0]
        x = self.one_hot(x)
        x = self.add_cls_vec(x)
        self.one_hot_x = x  # kept for the embedding gradient
        x = x @ self.W_emb
        x = self.pos_enc(x)
        for layer in self.layers:
            x = layer.forward(x)
        # Only the prepended row 0 is used for classification.
        self.cls = x[0, :]
        out = (self.cls @ self.W_cls) + self.b_cls
        return self.sigmoid(out)

    def backward(self, output, label):
        """Back-propagate the cross-entropy loss and apply one SGD step.

        Args:
            output: the value returned by the preceding ``forward`` call.
            label: target, 0 or 1.
        """
        # For sigmoid + binary cross-entropy, d loss / d logit = output - label.
        d_out = np.asarray(output - label, dtype=float).reshape(1, 1)
        # Work on a reshaped view instead of overwriting self.cls, so that the
        # cached forward state stays valid if backward is called again.
        cls_row = np.reshape(self.cls, (1, -1))
        d_W_cls = cls_row.T @ d_out
        d_b_cls = d_out.reshape(1,)

        # Only row 0 fed the classifier, so every other row has zero gradient.
        d_cls = np.zeros((self.L, self.d_model))
        d_cls[0, :] = (d_out @ self.W_cls.T)[0]

        prev_grad = d_cls
        for layer in reversed(self.layers):
            prev_grad = layer.backward(prev_grad)

        # The position encoding is an additive constant, so the gradient
        # passes straight through to the embedding product.
        d_W_emb = self.one_hot_x.T @ prev_grad

        self.W_emb -= lr * d_W_emb
        self.W_cls -= lr * d_W_cls
        self.b_cls -= lr * d_b_cls

    def fit(self, epochs, train_dataset):
        """Train with per-example SGD and print the mean loss of every epoch.

        Args:
            epochs: number of passes over the data.
            train_dataset: object exposing ``data``, ``labels`` and ``len()``.

        Returns:
            List with the mean cross-entropy loss of each epoch.
        """
        eps = 1e-12  # keeps log() finite when the prediction saturates at 0 or 1
        history = []
        for epoch in range(epochs):
            loss = 0
            for example, label in zip(train_dataset.data, train_dataset.labels):
                pred = self.forward(example)
                safe_pred = np.clip(pred, eps, 1 - eps)
                loss += -(label * np.log(safe_pred) + (1 - label) * np.log(1 - safe_pred))
                # The gradient uses the unclipped prediction.
                self.backward(pred, label)
            mean_loss = loss / len(train_dataset)
            print('loss:', mean_loss)
            history.append(float(np.ravel(mean_loss)[0]))
        return history

    def predict(self, test_dataset):
        """Print and return the accuracy on ``test_dataset`` at threshold 0.5.

        Args:
            test_dataset: object exposing ``data``, ``labels`` and ``len()``.

        Returns:
            Fraction of examples whose thresholded prediction equals the label.
        """
        correct = 0
        for example, label in zip(test_dataset.data, test_dataset.labels):
            pred = self.forward(example)
            predicted = 1 if pred > 0.5 else 0
            if predicted == label:
                correct += 1

        accuracy = correct / len(test_dataset)
        print(accuracy)
        return accuracy


In [None]:
class ContainsTokenDataset:
    """Synthetic binary task: does a random token sequence contain a given token?

    Rows labelled 1 have the special token written at one random position;
    rows labelled 0 have every occurrence of it replaced by the next token id
    (wrapping around at ``vocab_size``).
    """

    def __init__(self, vocab_size=20, seq_len=10, special_token=7, n_samples=1000):
        """Draw ``n_samples`` random sequences and labels using ``np.random``."""
        self.vocab_size, self.seq_len, self.special_token = vocab_size, seq_len, special_token

        targets = np.random.randint(0, 2, size=n_samples, dtype=np.int32)
        tokens = np.random.randint(0, vocab_size, size=(n_samples, seq_len), dtype=np.int32)

        # Iterating a 2-D array yields row views, so writes land in `tokens`.
        for row, target in zip(tokens, targets):
            if target == 1:
                # Positive example: force the special token in at a random slot.
                row[np.random.randint(0, seq_len)] = special_token
            else:
                # Negative example: scrub any accidental special tokens.
                row[row == special_token] = (special_token + 1) % vocab_size

        self.data = tokens
        self.labels = targets

    def __len__(self):
        """Number of sequences in the dataset."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the ``(sequence, label)`` pair stored at ``idx``."""
        return self.data[idx], self.labels[idx]


# Vocabulary size shared by both datasets and, later, by the model's embedding.
d_vocab = 48
# Build the training split first and the test split second; both use
# sequences of 32 tokens and token 7 as the one to detect.
train_dataset, test_dataset = (
    ContainsTokenDataset(vocab_size=d_vocab, seq_len=32, special_token=7, n_samples=split_size)
    for split_size in (1000, 250)
)

In [None]:
# Model hyperparameters.
d_model = 64
d_k = 16
d_v = 16
d_ff = 64
num_heads = 4
num_layers = 3
# Rebinds the module-level learning rate that every backward() reads.
lr = 5e-4

# Every constructor adds to the global `params` counter, so reset it here;
# otherwise re-running this cell would print an ever-growing total.
params = 0

t = Transformer(d_model, d_k, d_v, num_heads, d_ff, num_layers, d_vocab)
print(params)
t.fit(10, train_dataset)
t.predict(test_dataset)