Positional Encoding

In [20]:
import numpy as np
import math
import torch
import torch.nn.functional as F
def positional_encoding(max_len, d_model):
    # max_len: độ dài tối đa của 1 chuỗi
    # d_model: kích thước embedding
    pe = np.zeros((max_len, d_model))
    for pos in range(max_len):
        for i in range(0, d_model, 2):
            angle = pos / (10000 ** (i / d_model))
            # Vị trí chẵn
            pe[pos, i] = math.sin(angle)
            # Vị trí lẻ
            pe[pos, i+1] = math.cos(angle)
    return pe


In [3]:
max_len = 5
d_model = 4
pe = np.zeros((5, 4))
print(f"ma trận ban đầu:\n{pe}")
print(f"------------------")
for pos in range(max_len):
    for i in range(0, d_model, 2):
        angle = pos / (10000 ** (i / d_model))
        pe[pos][i] = math.sin(angle)
        pe[pos][i+1] = math.cos(angle)
print(f"ma trận positional encoding:\n{pe}")
print(f"--------------------------")
for i in range(max_len):
    print(f"vector PE cho vị trí thứ: {i} là:\n{pe[i]}")
    print(f"------------------------------")


ma trận ban đầu:
[[0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]]
------------------
ma trận positional encoding:
[[ 0.          1.          0.          1.        ]
 [ 0.84147098  0.54030231  0.00999983  0.99995   ]
 [ 0.90929743 -0.41614684  0.01999867  0.99980001]
 [ 0.14112001 -0.9899925   0.0299955   0.99955003]
 [-0.7568025  -0.65364362  0.03998933  0.99920011]]
--------------------------
vector PE cho vị trí thứ: 0 là:
[0. 1. 0. 1.]
------------------------------
vector PE cho vị trí thứ: 1 là:
[0.84147098 0.54030231 0.00999983 0.99995   ]
------------------------------
vector PE cho vị trí thứ: 2 là:
[ 0.90929743 -0.41614684  0.01999867  0.99980001]
------------------------------
vector PE cho vị trí thứ: 3 là:
[ 0.14112001 -0.9899925   0.0299955   0.99955003]
------------------------------
vector PE cho vị trí thứ: 4 là:
[-0.7568025  -0.65364362  0.03998933  0.99920011]
------------------------------


Sau khi tính PE, ta cộng vào embedding ban đầu: X' = X + PE

In [3]:
# Giả sử vector X ban đầu có giá trị:
x = np.random.randn(max_len, d_model)
print(f"vector embedding của input:\n{x}")
x = x + pe
print(f"---------------------------------")
print(f"vector x khi thêm positional encoding:\n{x}")

vector embedding của input:
[[ 0.75207762  0.39295297  0.79099217  1.87580961]
 [ 0.35367123  0.5852904   1.61532946 -0.90602122]
 [ 0.74855612  0.69010519 -0.12972793 -1.72377946]
 [ 2.24647999 -1.12874016 -1.06103864 -1.01515607]
 [ 0.87578281  0.93191164 -0.57107056  0.27969836]]
---------------------------------
vector x khi thêm positional encoding:
[[ 0.75207762  1.39295297  0.79099217  2.87580961]
 [ 1.19514222  1.12559271  1.62532929  0.09392878]
 [ 1.65785354  0.27395836 -0.10972926 -0.72397945]
 [ 2.3876     -2.11873266 -1.03104314 -0.01560604]
 [ 0.11898031  0.27826802 -0.53108122  1.27889847]]


Residual and Normalization Layer

In [4]:
import torch
import torch.nn as nn
# Layer normalization
class LayerNorm(nn.Module):
    def __init__(self, d_model, eps=1e-6):
        super().__init__()
        # gamma và beta
        self.gamma = nn.Parameter(torch.ones(d_model))
        self.beta = nn.Parameter(torch.zeros(d_model))
        self.eps = eps
    def forward(self, x):
        # x shape: (batch_size, seq_len, d_model)
        mean = x.mean(dim=-1, keepdim=True)
        var = x.var(dim=-1, keepdim=True, unbiased= False)

        x_hat = (x - mean) / torch.sqrt(var + self.eps)
        return self.gamma * x_hat + self.beta

In [5]:
d_model = 5
gamma1 = nn.Parameter(torch.zeros(d_model))
gamma2 = torch.zeros(d_model)
print(f"gamma1:\n{gamma1}")
print(f"gamma2:\n{gamma2}")
print(f"---------------------------")
matrix = torch.zeros(3,4)
print(matrix)

gamma1:
Parameter containing:
tensor([0., 0., 0., 0., 0.], requires_grad=True)
gamma2:
tensor([0., 0., 0., 0., 0.])
---------------------------
tensor([[0., 0., 0., 0.],
        [0., 0., 0., 0.],
        [0., 0., 0., 0.]])


In [6]:
# Tạo input gồm 2 câu batch_size = 2
batch_size = 2
seq_len = 3
d_model = 5
x = torch.randn(batch_size,seq_len, d_model)
print(x)
mean = x.mean(dim=-1, keepdim=True)
print(f"Trung bình của từng hàng: \n{mean}")
var = x.var(dim = -1, keepdim=True, unbiased=False)
print(f"Phương sai của từng hàng:\n{var}")

tensor([[[-0.7360, -1.2692, -1.3932,  1.6605, -0.7983],
         [-0.2400, -1.1686, -0.8228, -0.7886,  0.1682],
         [ 0.0861, -1.8974, -1.6468, -1.1555, -1.0670]],

        [[ 0.8839, -0.9669,  0.5603,  1.5008,  1.8502],
         [-0.0784, -0.8687, -0.4307, -2.7221, -1.1999],
         [-0.5919,  0.1197, -1.8306, -1.3393, -0.7754]]])
Trung bình của từng hàng: 
tensor([[[-0.5072],
         [-0.5704],
         [-1.1362]],

        [[ 0.7657],
         [-1.0600],
         [-0.8835]]])
Phương sai của từng hàng:
tensor([[[1.2404],
         [0.2248],
         [0.4679]],

        [[0.9549],
         [0.8357],
         [0.4416]]])


In [7]:
gamma = torch.randn(d_model)
print(f"gamma:\n{gamma}")
x_hat = (x - mean) / torch.sqrt(var + 1e-6)
print(f"Chuẩn hóa các giá trị của input:\n{x_hat}")
ans = gamma * x_hat + gamma
print(f"Đầu ra layernorm của input:\n{ans}")

gamma:
tensor([-0.1824,  0.3139,  1.1220,  0.6295,  1.2129])
Chuẩn hóa các giá trị của input:
tensor([[[-0.2054, -0.6842, -0.7955,  1.9464, -0.2614],
         [ 0.6968, -1.2618, -0.5325, -0.4604,  1.5578],
         [ 1.7869, -1.1130, -0.7466, -0.0283,  0.1010]],

        [[ 0.1210, -1.7730, -0.2101,  0.7523,  1.1098],
         [ 1.0737,  0.2092,  0.6884, -1.8182, -0.1531],
         [ 0.4388,  1.5097, -1.4253, -0.6859,  0.1627]]])
Đầu ra layernorm của input:
tensor([[[-0.1450,  0.0992,  0.2295,  1.8548,  0.8959],
         [-0.3096, -0.0822,  0.5245,  0.3397,  3.1024],
         [-0.5084, -0.0355,  0.2843,  0.6117,  1.3355]],

        [[-0.2045, -0.2427,  0.8862,  1.1031,  2.5590],
         [-0.3783,  0.3796,  1.8944, -0.5151,  1.0272],
         [-0.2625,  0.7879, -0.4772,  0.1977,  1.4102]]])


In [5]:
# Sublayer: Feed forward network: 2 hidden layer
class FeedForward(nn.Module):
    def __init__(self, d_model, d_ff):
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.relu = nn.ReLU()
        self.linear2 = nn.Linear(d_ff, d_model)
    
    def forward(self, x):
        x = self.linear1(x)
        x = self.relu(x)
        output = self.linear2(x)
        return output

In [9]:
d_model = 4
d_ff = 6
linear1 = nn.Linear(d_model, d_ff)
print(linear1)

Linear(in_features=4, out_features=6, bias=True)


In [11]:
dropout = nn.Dropout(0.3)
print(dropout)

Dropout(p=0.3, inplace=False)


Cài đặt Multi-head Attention

In [21]:
def softmax(x):
    # softmax theo hàng
    e_x = np.exp(x - np.max(x, axis=-1, keepdims=True))
    return e_x/np.sum(e_x, axis=-1, keepdims=True)
def scaled_dot_product_attention(Q, K, V, mask=None):
    # Q: (n, d_k) - n query vectors
    # K: (m, d_k) - m key vectors
    # V: (m, d_v) - m value vectors
    # Lấy ra d_k cho bước tính căn bậc 2
    d_k = Q.size(-1)
    # Tính score
    scores = torch.matmul(Q, K.transpose(-2, -1)) / torch.sqrt(torch.tensor(d_k, dtype=torch.float32))
    # Kiểm tra điều kiện để lựa chọn masked attention/self attention
    if mask is not None:
        # mask = 0 => j <= i
        # mask = -inf => j > i
        scores = scores + mask
    # Tính trọng số attention
    weights = F.softmax(scores, dim=-1)
    # Tổ hợp tuyến tính với V
    output = np.matmul(weights, V)
    print("DEBUG - return:", output.shape, weights.shape)
    return output, weights

In [22]:
q = torch.randn(4,4)
print(q)
k = torch.randn(4,4)
v = torch.randn(4,4)
ans, weight = scaled_dot_product_attention(q, k , v, mask=None)
print(ans)

tensor([[-0.9091,  0.2061, -0.4546,  0.6367],
        [-1.2448,  0.4964,  1.6776, -0.1488],
        [ 0.5368, -0.0883,  0.4763,  0.2788],
        [-0.5657, -0.2928, -1.3897, -0.5829]])
DEBUG - return: torch.Size([4, 4]) torch.Size([4, 4])
tensor([[-1.3124, -0.7470, -0.2218, -0.5469],
        [-1.4794, -0.0801, -0.3178, -0.9315],
        [-1.3056, -0.7213, -0.4257, -1.0807],
        [-0.8558, -0.5707, -0.5549, -0.4428]])


In [None]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super().__init__()
        # d_model phải chia hết cho num_heads
        assert d_model % num_heads == 0
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads
        # Khởi tạo ma trận trọng số
        self.W_Q = nn.Linear(d_model, d_model)
        self.W_K = nn.Linear(d_model, d_model)
        self.W_V = nn.Linear(d_model, d_model)
        # Ma trận trọng số được sử dụng cuối cùng
        self.W_O = nn.Linear(d_model, d_model)
    def split_heads(self, x):
        # x: (seq_len, d_model)
        # return: (num_heads, seq_len, d_k)
        seq_len = x.shape[0]
        # Tách X thành 1 mảng gồm seq_len ma trận, mỗi ma trận có num_heads hàng, d_k cột
        x = x.reshape(seq_len, self.num_heads, self.d_k)
        x = x.transpose(1, 0, 2)
        return x
    def combine_heads(self, x):
        # x: (num_heads, seq_len, d_k)
        # return: (seq_len, d_model)
        num_heads, seq_len, d_k = x.shape
        x = x.transpose(1, 0, 2).reshape(seq_len, self.d_model)
        return x
    def forward(self, Q, K, V, mask=None):
        # linear projection
        Q_proj = self.W_Q(Q)
        K_proj = self.W_K(K)
        V_proj = self.W_V(V)
        # Chia thành nhiều head
        Q_heads = self.split_heads(Q_proj)
        K_heads = self.split_heads(K_proj)
        V_heads = self.split_heads(V_proj)
        # Attention trên từng head
        head_outputs = []
        for i in range(self.num_heads):
            out, weights = scaled_dot_product_attention(Q_heads[i], K_heads[i], V_heads[i],mask=mask)
            head_outputs.append(out)
         # Chuyển đổi sang numpy
        head_outputs = np.array(head_outputs)
        #Ghép lại các head
        concat_output = self.combine_heads(head_outputs)
        #Linear cuối
        output = np.matmul(concat_output, self.W_O)
        return output


Pipeline của Encoder trong transformer

In [16]:
import numpy as np
class EncoderLayer:
    def __init__(self, d_model, d_ff, num_heads, dropout=0.1):
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = FeedForward(d_model, d_ff)
        # Residual + layer Normalize (2 lần: sau MHA và sau FFN)
        self.norm1 = LayerNorm(d_model)
        self.norm2 = LayerNorm(d_model)

    def forward(self, x, mask=None):
        # self attention + residual và layer norm
        attn_ouput = self.self_attn.forward(x, x, x)
        x = torch.from_numpy(x).float()
        attn_ouput = torch.from_numpy(attn_ouput).float()
        x = self.norm1(x + attn_ouput)

        # ffn + residual và layer norm
        ff_output = self.feed_forward.forward(x)
        x = self.norm2(x + ff_output)
        return x

In [17]:
seq_len, d_model = 5, 9
d_ff = 16
num_heads = 3
# input
x = np.random.randn(seq_len, d_model)
print(f"input:\n{x}")

input:
[[-0.07509377 -0.3698525  -0.22432049  1.0959547   0.01523754  0.96403111
   0.02157748  0.4108375   0.25443107]
 [ 0.0676794   1.01323695 -0.93309202  2.40139986 -0.14957959  0.28381189
   0.5762631   0.10601228  1.40813583]
 [-0.03030701 -0.21323273  0.35084385  1.61485775 -1.21254393  0.26825369
  -1.2500518   0.10461103  0.92282904]
 [ 0.95577117  0.33804922  0.44740942 -2.00273896 -1.72381536  0.29851096
   0.90978136  2.21275475  1.40961909]
 [-1.76709122 -0.60497435 -0.38778948 -0.54013972  1.06609982  0.28589675
  -1.77708134  0.40230019 -1.2294474 ]]


In [18]:
# encoder layer
encoder_layer = EncoderLayer(d_model, d_ff, num_heads=num_heads)
# Forward
output = encoder_layer.forward(x)
print(f"output:\n{output}")

DEBUG - return: (5, 3) (5, 5)
DEBUG - return: (5, 3) (5, 5)
DEBUG - return: (5, 3) (5, 5)
output:
tensor([[ 1.1378, -0.0626, -1.1250,  0.7099,  0.2746, -0.5009, -0.4574, -1.6307,
          1.6544],
        [ 0.3400,  0.6331,  0.8598,  1.0983, -1.5449, -1.0693, -0.7974,  1.3024,
         -0.8221],
        [ 1.5158,  0.8121, -1.7491, -0.2766, -0.0488,  0.5146, -1.5276,  0.2849,
          0.4747],
        [ 1.3896,  0.7909, -1.0754,  0.0485, -0.4628,  0.2025, -2.0191,  0.1638,
          0.9621],
        [ 1.1174, -0.2628, -0.7621, -0.6514,  1.1118, -0.6572,  0.5800, -1.7477,
          1.2721]], grad_fn=<AddBackward0>)


Decoder

In [None]:
# Masked Attention
import torch
import torch.nn.functional as F
def generate_subsequent_mask(seq_len):
    # Tạo mask tam giác dưới: (seq_len, seq_len)
    # 1. Tạo ma trận toàn 1 (seq_len, seq_len)
    mask = torch.triu(torch.ones(seq_len, seq_len), diagnol= 1)
    # 2. Các phần tử bằng 1 được thay bằng -inf
    mask = mask.masked_fill(mask == 1, float('-inf'))
    return mask

In [24]:
d_k, d_v = 4, 8
batch_size, seq_len = 2, 5
Q = torch.randn(batch_size, seq_len, d_k)
print(f"ma trận Q:\n{Q}")
print(f"size of Q: {Q.size()}")
K = torch.randn(batch_size, seq_len, d_k)
V = torch.randn(batch_size, seq_len, d_v)
weights = F.softmax(Q, dim= -1)
print(f"Softmax theo từng hàng của ma trận Q:\n{weights}")


ma trận Q:
tensor([[[ 2.0861,  0.9458,  0.1764, -0.4620],
         [ 0.6028, -1.5721, -0.3679,  0.2793],
         [-0.9556, -0.5626, -2.6054,  1.1850],
         [ 0.2986,  2.6248, -1.9058,  1.1733],
         [-1.5534, -0.9213, -0.5887,  0.7975]],

        [[ 0.2087, -1.1785,  0.2484,  0.4529],
         [ 1.6350,  0.8900, -1.3365, -0.3949],
         [ 2.0959,  0.0076,  0.4040, -0.6172],
         [ 0.4017,  0.3683,  1.4506,  0.1693],
         [-0.9754,  0.3130,  0.6612, -0.0763]]])
size of Q: torch.Size([2, 5, 4])
Softmax theo từng hàng của ma trận Q:
tensor([[[0.6468, 0.2068, 0.0958, 0.0506],
         [0.4512, 0.0513, 0.1709, 0.3265],
         [0.0895, 0.1325, 0.0172, 0.7608],
         [0.0727, 0.7448, 0.0080, 0.1744],
         [0.0625, 0.1176, 0.1640, 0.6559]],

        [[0.2804, 0.0700, 0.2917, 0.3579],
         [0.6034, 0.2864, 0.0309, 0.0793],
         [0.7276, 0.0901, 0.1340, 0.0483],
         [0.1781, 0.1723, 0.5084, 0.1412],
         [0.0818, 0.2968, 0.4203, 0.2011]]])


In [31]:
matrix = torch.ones(seq_len, seq_len)
mask = torch.triu(matrix, diagonal= 1)
print(f"mask:\n{mask}")
mask = mask.masked_fill(mask == 1, float('-inf'))
print(f"mask sau thay đổi:\n{mask}")

mask:
tensor([[0., 1., 1., 1., 1.],
        [0., 0., 1., 1., 1.],
        [0., 0., 0., 1., 1.],
        [0., 0., 0., 0., 1.],
        [0., 0., 0., 0., 0.]])
mask sau thay đổi:
tensor([[0., -inf, -inf, -inf, -inf],
        [0., 0., -inf, -inf, -inf],
        [0., 0., 0., -inf, -inf],
        [0., 0., 0., 0., -inf],
        [0., 0., 0., 0., 0.]])


In [39]:
# Kiểm tra với input cụ thể
text = ["I", "go", "to", "the", "park", "near", "my", "house"]
c2i = {ch:i for i, ch in enumerate(text)}
print(c2i)
i2c = {i:ch for ch, i in c2i.items()}
print(i2c)
seq_len = len(text)
d_model = 6
embedding_matrix = torch.randn(seq_len, d_model)
print(f"Ma trận embedding của input:\n{embedding_matrix}")

{'I': 0, 'go': 1, 'to': 2, 'the': 3, 'park': 4, 'near': 5, 'my': 6, 'house': 7}
{0: 'I', 1: 'go', 2: 'to', 3: 'the', 4: 'park', 5: 'near', 6: 'my', 7: 'house'}
Ma trận embedding của input:
tensor([[-0.8859,  0.2058,  1.1791,  0.0449,  0.4983,  1.0668],
        [ 0.8939, -0.5181, -0.9900, -0.1671, -1.0639,  0.6621],
        [ 0.4418,  1.5516, -0.4362,  1.4317, -0.3831,  0.9559],
        [-1.2453, -0.5026,  0.1953, -1.1089, -1.2330, -0.6130],
        [ 1.5907,  0.6140, -1.6030, -1.1067,  1.7049, -0.2767],
        [ 0.1596, -0.2986,  1.3326, -0.0433, -0.0813,  0.1333],
        [ 2.3909,  0.7044,  0.1906, -0.0279,  0.8173, -0.7082],
        [-1.3154,  1.1149,  0.6866, -0.2226,  1.6634, -0.6156]])


In [40]:
d_k = 8
W_Q = torch.randn(d_model, d_k)
W_K = torch.randn(d_model, d_k)
W_V = torch.randn(d_model, d_v)
# Ma trận Q, K, V
Q = torch.matmul(embedding_matrix, W_Q)
K = torch.matmul(embedding_matrix, W_K)
V = torch.matmul(embedding_matrix, W_V)

In [None]:
class DecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads) # masked attention
        self.enc_dec_attn = MultiHeadAttention(d_model, num_heads)
        self.ff = FeedForward(d_model, d_ff)

        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)

        self.dropout = nn.Dropout(dropout)
    
    # self_mask -> masked attention
    # enc_mask -> self attention
    def forward(self, x, enc_out, self_mask=None, enc_mask=None):
        # x: decoder input embeddings (B, T_dec, d_model)
        # enc_out: encoder outputs (B, T_enc, d_model)
        
        # 1, masked self-attention (queries=keys=values = x)
        residual = x
        # sublayer
        attn_out, self_attn_map = self.self_attn(x, x, x, self_mask)
        x = residual + self.dropout(attn_out)
        x = self.norm1(x)
        # 2, encoder-decoder attention (queries from x, keys/values from enc_out)
        residual = x
        attn_out2, encdec_attn_map = self.enc_dec_attn(x, enc_out, enc_out, enc_mask)
        x = residual + self.dropout(attn_out2)
        x = self.norm2(x)
        # 3, feed-forward
        residual = x
        ff_out = self.ff(x)
        x = residual + self.dropout(ff_out)
        x = self.norm3(x)

        return x, self_attn_map, encdec_attn_map
    

NameError: name 'nn' is not defined

In [None]:
class Decoder(nn.Module):
    def __init__(self, vocab_size, d_model, num_layers, num_heads, d_ff, max_len = 10, dropout=0.1):
        super().__init__()
        self.token_emb = nn.Embedding(vocab_size, d_model)
        self.pos_emb = nn.Embedding(max_len, d_model)
        # Tạo num_layers lớp Decoderlayer
        layers = []
        for i in range(num_layers):
            layers.append(DecoderLayer(d_model=d_model, num_heads=num_heads, d_ff=d_ff, dropout=dropout))
        self.layers = nn.ModuleList(layers)

        self.norm = nn.LayerNorm(d_model)
        self.d_model = d_model
    
    def forward(self, tgt_tokens, enc_out, tgt_mask=None, enc_mask=None):
        # tgt_tokens: (B, T_dec) token ids
        B, T = tgt_tokens.size() # Batch, T tokens
        # Positinal encoding
        positions = torch.arange(0, T, device=tgt_tokens.device).unsqueeze(0).expand(B, T)
        x = self.token_emb(tgt_tokens) * math.sqrt(self.d_model) + self.pos_emb(positions)

        attn_maps = {"masked_self": [], "encdec": []}
        for layer in self.layers:
            x, masked_map, encdec_map = layer(x, enc_out, self_mask=tgt_mask, enc_mask=enc_mask)
            attn_maps["masked_self"].append(masked_map)
            attn_maps["encdec"].append(encdec_map)
        # Chuẩn hóa trước khi đưa vào linear + softmax để sinh từ.
        x = self.norm(x)