<a href="https://colab.research.google.com/github/anhnguyenvv/NLP-A/blob/main/lab_2_attention.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

- Nguyễn Thị Lan Anh
- MSSV: 21120198

# Lab 2: Coding Attention Mechanisms



Bài tập 1: Hiện thực cơ chế Attention theo hướng dẫn dưới đây


In [None]:
# Optional setup step: uncomment the line below to install the tokenizer package
# that the next cell imports (e.g. on a fresh Colab runtime).
#!pip install tiktoken


In [None]:
from importlib.metadata import version
import tiktoken
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torch

print("torch version:", version("torch"))

torch version: 2.2.1+cu121


In [None]:

class GPTDatasetV1(Dataset):
    """Sliding-window dataset of (input, target) token-id pairs.

    The text is tokenized once. Every sample is a window of ``max_length``
    token ids, and its target is the same window shifted one position to the
    right (i.e. the "next token" at each position).

    Args:
        txt: Raw text to tokenize.
        tokenizer: Object exposing ``encode(text, allowed_special=...)`` whose
            result can be sliced and passed to ``torch.tensor``.
        max_length: Number of token ids per window.
        stride: Offset between the starts of two consecutive windows.
    """

    def __init__(self, txt, tokenizer, max_length, stride):
        self.tokenizer = tokenizer

        # Encode the whole text in one go; '<|endoftext|>' is let through as a special token.
        ids = self.tokenizer.encode(txt, allowed_special={'<|endoftext|>'})

        # The last start is chosen so the shifted target window never runs past the end.
        window_starts = range(0, len(ids) - max_length, stride)

        self.input_ids = [
            torch.tensor(ids[start:start + max_length])
            for start in window_starts
        ]
        self.target_ids = [
            torch.tensor(ids[start + 1:start + max_length + 1])
            for start in window_starts
        ]

    def __len__(self):
        """Return the number of windows produced from the text."""
        return len(self.input_ids)

    def __getitem__(self, idx):
        """Return the ``(input_ids, target_ids)`` tensor pair for window ``idx``."""
        return self.input_ids[idx], self.target_ids[idx]


def create_dataloader(txt, batch_size=4, max_length=256, stride=128, shuffle=True):
    """Build a ``DataLoader`` of sliding-window samples over ``txt``.

    Args:
        txt: Raw text to turn into training windows.
        batch_size: Number of windows per batch.
        max_length: Number of token ids per window.
        stride: Offset between the starts of consecutive windows.
        shuffle: Whether the loader shuffles the windows.

    Returns:
        A ``DataLoader`` wrapping a ``GPTDatasetV1`` built with the "gpt2"
        tiktoken encoding.
    """
    return DataLoader(
        GPTDatasetV1(txt, tiktoken.get_encoding("gpt2"), max_length, stride),
        batch_size=batch_size,
        shuffle=shuffle,
    )


# Load the sample text that the rest of this cell tokenizes and embeds.
with open("small-text-sample.txt", "r", encoding="utf-8") as f:
    raw_text = f.read()

# NOTE(review): `tokenizer` and `encoded_text` are not referenced again in the
# visible cells; create_dataloader() builds its own tokenizer internally.
tokenizer = tiktoken.get_encoding("gpt2")
encoded_text = tokenizer.encode(raw_text)

vocab_size = 50257  # presumably the vocabulary size of the "gpt2" encoding -- TODO confirm
output_dim = 256    # width of every embedding vector
max_len = 1024      # number of positions the positional table can index
context_length = max_len


# NOTE(review): no random seed is set in the visible cells before these layers
# are created, and the loader below shuffles by default, so the embedding
# values differ between runs (the printed shape does not).
token_embedding_layer = nn.Embedding(vocab_size, output_dim)
pos_embedding_layer = torch.nn.Embedding(context_length, output_dim)

# stride == max_length, so consecutive windows do not overlap.
max_length = 4
dataloader = create_dataloader(raw_text, batch_size=8, max_length=max_length, stride=max_length)

# Only the first batch is needed: embed it once and leave the loop.
for batch in dataloader:
    x, y = batch

    token_embeddings = token_embedding_layer(x)
    # One positional vector per position 0..max_length-1, shared by every sequence.
    pos_embeddings = pos_embedding_layer(torch.arange(max_length))

    # Broadcasting adds the same positional vectors to each sequence in the batch.
    input_embeddings = token_embeddings + pos_embeddings

    break
print(input_embeddings.shape)


torch.Size([8, 4, 256])


### Stacking multiple single-head attention layers

In [None]:
class CausalAttention(nn.Module):
    """Single-head scaled dot-product self-attention with a causal mask.

    Each position may attend only to itself and to earlier positions; dropout
    is applied to the attention weights.

    Args:
        d_in: Size of the last dimension of the input.
        d_out: Size of the query/key/value projections (and of the output).
        context_length: Side length of the square causal mask, i.e. the
            longest sequence the mask covers.
        dropout: Dropout probability applied to the attention weights.
        qkv_bias: Whether the three projections carry a bias term.
    """

    def __init__(self, d_in, d_out, context_length, dropout, qkv_bias=False):
        super().__init__()
        self.d_out = d_out
        # Projections are created in query, key, value order so that seeded
        # initialisation yields the same weights as before.
        self.W_query = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_key = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_value = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.dropout = nn.Dropout(dropout)
        # Ones strictly above the diagonal mark the "future" positions to hide.
        future_positions = torch.triu(torch.ones(context_length, context_length), diagonal=1)
        self.register_buffer('mask', future_positions)

    def forward(self, x):
        """Return context vectors for a 3-D input (batch, tokens, d_in)."""
        _, seq_len, _ = x.shape

        k = self.W_key(x)
        q = self.W_query(x)
        v = self.W_value(x)

        # Pairwise query/key similarity within each sequence of the batch.
        scores = q @ k.transpose(1, 2)

        # Crop the mask to the actual sequence length and blank out future positions in place.
        hide_future = self.mask.bool()[:seq_len, :seq_len]
        scores.masked_fill_(hide_future, -torch.inf)

        scale = k.shape[-1] ** 0.5
        weights = self.dropout(torch.softmax(scores / scale, dim=-1))

        return weights @ v


In [None]:
class MultiHeadAttentionWrapper(nn.Module):
    """Multi-head attention built by stacking independent ``CausalAttention`` heads.

    Every head receives the same input; their outputs are concatenated along
    the last dimension, so the output width is ``num_heads * d_out``.

    Args:
        d_in: Size of the last dimension of the input.
        d_out: Output size of each individual head.
        context_length: Mask size handed to every head.
        dropout: Dropout probability handed to every head.
        num_heads: Number of heads to create.
        qkv_bias: Whether the heads' projections carry a bias term.
    """

    def __init__(self, d_in, d_out, context_length, dropout, num_heads, qkv_bias=False):
        super().__init__()
        head_modules = []
        for _ in range(num_heads):
            head_modules.append(
                CausalAttention(d_in, d_out, context_length, dropout, qkv_bias)
            )
        self.heads = nn.ModuleList(head_modules)

    def forward(self, x):
        """Run every head on ``x`` and join the results feature-wise."""
        per_head_outputs = [head(x) for head in self.heads]
        return torch.cat(per_head_outputs, dim=-1)

# Seed before constructing the module so its randomly initialised weights are repeatable.
torch.manual_seed(123)
# Reuse the embedded batch computed earlier as the attention input.
batch = input_embeddings
# NOTE: this rebinds the notebook-level `context_length` set in the embedding cell.
context_length = batch.shape[1] # This is the number of tokens
d_in = batch.shape[2]
# Each of the 2 heads outputs d_in//2 features; the wrapper concatenates the heads.
d_out = d_in//2
mha = MultiHeadAttentionWrapper(d_in, d_out, context_length, 0.0, num_heads=2)

context_vecs = mha(batch)

print(context_vecs)
print("context_vecs.shape:", context_vecs.shape)

tensor([[[ 4.6033e-02, -6.3286e-01,  1.7111e-01,  ...,  6.4656e-02,
           9.8718e-01, -2.6798e+00],
         [ 6.5834e-04,  2.3516e-01,  4.7861e-01,  ..., -8.1290e-01,
           1.4536e+00,  2.1630e-01],
         [-4.5651e-03, -5.3619e-01,  6.0358e-01,  ..., -1.0656e-01,
           1.0925e+00, -9.9287e-01],
         [-9.8647e-01, -5.9767e-02,  5.8819e-01,  ...,  5.4085e-02,
           1.0313e+00, -1.7786e-01]],

        [[-5.7155e-01,  2.6873e-01,  2.6834e-01,  ...,  1.3315e+00,
           1.3944e+00, -1.8267e+00],
         [-2.9561e-01,  8.0231e-01,  7.7059e-01,  ..., -7.8213e-01,
           1.3960e+00, -4.4450e-01],
         [-3.4341e-01,  5.5711e-01,  5.3558e-01,  ...,  8.3890e-01,
           1.3271e+00, -5.1539e-01],
         [-4.2369e-01,  3.0098e-01, -1.1571e-01,  ..., -1.4267e-01,
           1.0688e+00, -4.6010e-01]],

        [[-8.8587e-01, -1.3805e+00,  3.7421e-01,  ..., -2.0886e-01,
           5.6081e-01, -1.2400e+00],
         [-7.3261e-02, -9.8847e-01,  4.8681e-01,  .

### Implementing multi-head attention with weight splits

In [None]:
class MultiHeadAttention(nn.Module):
    """Causal multi-head self-attention using one shared projection per role.

    Queries, keys and values are each projected to ``d_out`` features once and
    then split into ``num_heads`` slices of ``head_dim`` features, instead of
    keeping a separate module per head. The heads' outputs are merged back
    together and passed through a final linear layer.

    Args:
        d_in: Size of the last dimension of the input.
        d_out: Total output width across all heads; must be divisible by
            ``num_heads``.
        context_length: Side length of the square causal mask.
        dropout: Dropout probability applied to the attention weights.
        num_heads: Number of attention heads.
        qkv_bias: Whether the query/key/value projections carry a bias term.

    Raises:
        AssertionError: If ``d_out`` is not divisible by ``num_heads``.
    """

    def __init__(self, d_in, d_out, context_length, dropout, num_heads, qkv_bias=False):
        super().__init__()
        assert d_out % num_heads == 0, "d_out must be divisible by num_heads"

        self.d_out = d_out
        self.num_heads = num_heads
        # Each head works on an equal share of the projected features.
        self.head_dim = d_out // num_heads

        # Layers are created in this order so seeded initialisation is unchanged.
        self.W_query = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_key = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_value = nn.Linear(d_in, d_out, bias=qkv_bias)
        # Mixes the concatenated head outputs.
        self.out_proj = nn.Linear(d_out, d_out)
        self.dropout = nn.Dropout(dropout)

        # Ones strictly above the diagonal mark the "future" positions to hide.
        future_positions = torch.triu(torch.ones(context_length, context_length), diagonal=1)
        self.register_buffer('mask', future_positions)

    def _split_heads(self, projected, batch, seq_len):
        """Reshape (batch, tokens, d_out) into (batch, num_heads, tokens, head_dim)."""
        per_head = projected.view(batch, seq_len, self.num_heads, self.head_dim)
        return per_head.transpose(1, 2)

    def forward(self, x):
        """Return context vectors of shape (batch, tokens, d_out) for a 3-D input."""
        batch, seq_len, _ = x.shape

        k = self._split_heads(self.W_key(x), batch, seq_len)
        q = self._split_heads(self.W_query(x), batch, seq_len)
        v = self._split_heads(self.W_value(x), batch, seq_len)

        # Per-head similarity scores: (batch, num_heads, tokens, tokens).
        scores = q @ k.transpose(2, 3)

        # Crop the mask to the sequence length; it broadcasts over batch and heads.
        hide_future = self.mask.bool()[:seq_len, :seq_len]
        scores.masked_fill_(hide_future, -torch.inf)

        scale = k.shape[-1] ** 0.5
        weights = self.dropout(torch.softmax(scores / scale, dim=-1))

        # Back to (batch, tokens, num_heads, head_dim), then flatten the heads together.
        per_head_context = (weights @ v).transpose(1, 2)
        merged = per_head_context.contiguous().view(batch, seq_len, self.d_out)

        return self.out_proj(merged)

# Seed before constructing the module so its randomly initialised weights are repeatable.
torch.manual_seed(123)

# Reuse the embedded batch computed earlier as the attention input.
batch = input_embeddings
# NOTE: this rebinds the notebook-level `context_length` and `d_in`.
batch_size, context_length, d_in = batch.shape
# Here d_out is the total width across heads; the class splits it into
# num_heads slices of d_out // num_heads features each.
d_out = d_in
mha = MultiHeadAttention(d_in, d_out, context_length, 0.0, num_heads=2)

context_vecs = mha(batch)

print(context_vecs)
print("context_vecs.shape:", context_vecs.shape)

tensor([[[-0.9617,  0.1650, -0.3285,  ..., -0.1773, -0.1679,  0.6039],
         [-0.4813,  0.1987, -0.2177,  ..., -0.0611, -0.2748,  0.1040],
         [-0.8232, -0.0221, -0.1746,  ..., -0.6942, -0.0821,  0.2019],
         [-0.4854,  0.1744, -0.2682,  ..., -0.1140, -0.3442,  0.2085]],

        [[-1.1217,  0.2088, -0.2023,  ..., -0.6521, -0.5238,  0.5701],
         [-0.8093,  0.3071,  0.1316,  ..., -0.5678, -0.4888,  0.3124],
         [-0.7436,  0.2415, -0.1342,  ..., -0.5415, -0.3638,  0.0940],
         [-0.4893,  0.4997,  0.0320,  ..., -0.4244, -0.3667, -0.0307]],

        [[-0.7319,  0.0512,  0.2716,  ...,  0.0594, -0.3071,  0.3349],
         [-0.7018,  0.4336,  0.0914,  ..., -0.2570, -0.0165, -0.4193],
         [-0.5730,  0.3150,  0.0230,  ..., -0.4784, -0.1448, -0.2210],
         [-0.3104,  0.3939,  0.2216,  ..., -0.4126, -0.1577, -0.1642]],

        ...,

        [[-1.1467, -0.3973, -0.5450,  ..., -0.0322, -0.6292,  0.2600],
         [-0.6773,  0.2409, -0.1651,  ..., -0.0339, -0.62

# Bài tập 2:

So sánh SelfAttention\_v1 và SelfAttention\_v2

Lưu ý rằng nn.Linear trong SelfAttention\_v2 sử dụng sơ đồ khởi tạo trọng số khác với nn.Parameter\(torch.rand\(d\_in, d\_out\)\) được sử dụng trong SelfAttention\_v1, khiến cả hai cơ chế tạo ra các kết quả khác nhau. Để kiểm tra xem cả hai cách triển khai SelfAttention\_v1 và SelfAttention\_v2 có giống nhau hay không, chúng ta có thể chuyển ma trận trọng số từ đối tượng SelfAttention\_v2 sang SelfAttention\_v1, sao cho cả hai đối tượng đều tạo ra kết quả giống nhau.

Nhiệm vụ của bạn là gán chính xác các trọng số từ một instance của SelfAttention\_v2 cho một instance của SelfAttention\_v1. Để làm được điều này, bạn cần hiểu mối quan hệ giữa các trọng số trong cả hai phiên bản. \(Gợi ý: nn.Linear lưu trữ ma trận trọng số ở dạng chuyển vị, tức là có kích thước \(d\_out, d\_in\)\). Sau khi gán, kết quả đầu ra của cả hai phải giống nhau.

Gợi ý: sử dụng input "Your journey starts with one step" có ở phần đầu.


In [None]:
import torch

# Toy input: one 3-dimensional embedding vector per token of the sentence
# "Your journey starts with one step" (6 tokens -> shape (6, 3)).
inputs = torch.tensor(
  [[0.43, 0.15, 0.89], # Your     (x^1)
   [0.55, 0.87, 0.66], # journey  (x^2)
   [0.57, 0.85, 0.64], # starts   (x^3)
   [0.22, 0.58, 0.33], # with     (x^4)
   [0.77, 0.25, 0.10], # one      (x^5)
   [0.05, 0.80, 0.55]] # step     (x^6)
)
# Bare last expression so the notebook displays the shape as the cell output.
inputs.shape

torch.Size([6, 3])

In [None]:
import torch.nn as nn

class SelfAttention_v1(nn.Module):
    """Unmasked scaled dot-product self-attention with raw weight matrices.

    The query, key and value projections are plain ``(d_in, d_out)``
    parameters drawn from ``torch.rand`` and applied as ``x @ W``.

    Args:
        d_in: Number of features per input token.
        d_out: Number of features per projected token (and per output row).
    """

    def __init__(self, d_in, d_out):
        super().__init__()
        self.d_out = d_out

        def fresh_weight():
            # Each call draws a new matrix from the global RNG.
            return nn.Parameter(torch.rand(d_in, d_out))

        # Drawn in query, key, value order so seeded runs give the same weights.
        self.W_query = fresh_weight()
        self.W_key = fresh_weight()
        self.W_value = fresh_weight()

    def forward(self, x):
        """Return one context vector per row of the 2-D input (tokens, d_in)."""
        k = x @ self.W_key
        q = x @ self.W_query
        v = x @ self.W_value

        # Token-to-token similarity, scaled by sqrt(d_out) before normalising each row.
        scale = k.shape[-1] ** 0.5
        weights = torch.softmax((q @ k.T) / scale, dim=-1)

        return weights @ v
# Input width comes from the toy `inputs` tensor; project down to 2 features.
d_in, d_out = inputs.shape[1], 2
# Seed right before construction so the torch.rand-initialised weights are repeatable.
torch.manual_seed(123)
sa_v1 = SelfAttention_v1(d_in, d_out)

In [None]:
class SelfAttention_v2(nn.Module):
    """Unmasked scaled dot-product self-attention built on ``nn.Linear`` layers.

    Args:
        d_in: Number of features per input token.
        d_out: Number of features per projected token (and per output row).
        qkv_bias: Whether the three projections carry a bias term.
    """

    def __init__(self, d_in, d_out, qkv_bias=False):
        super().__init__()
        self.d_out = d_out

        def projection():
            # Each call builds a freshly initialised d_in -> d_out layer.
            return nn.Linear(d_in, d_out, bias=qkv_bias)

        # Built in query, key, value order so seeded runs give the same weights.
        self.W_query = projection()
        self.W_key = projection()
        self.W_value = projection()

    def forward(self, x):
        """Return one context vector per row of the 2-D input (tokens, d_in)."""
        k = self.W_key(x)
        q = self.W_query(x)
        v = self.W_value(x)

        scaled_scores = (q @ k.T) / k.shape[-1] ** 0.5
        # For the 2-D score matrix, dim=1 normalises across each row (over the keys).
        weights = torch.softmax(scaled_scores, dim=1)

        return weights @ v

# Seed right before construction so the nn.Linear weights are repeatable.
# Uses the notebook-level d_in / d_out set when sa_v1 was created.
torch.manual_seed(123)
sa_v2 = SelfAttention_v2(d_in, d_out)

In [None]:
# nn.Linear stores its weight as (d_out, d_in), whereas SelfAttention_v1
# multiplies by (d_in, d_out) matrices, hence the transpose.
# detach().clone() gives sa_v1 its own copy of the values: wrapping the
# transposed view directly in nn.Parameter would make both models share the same
# underlying storage, so an in-place update to one would silently change the other.
sa_v1.W_query = torch.nn.Parameter(sa_v2.W_query.weight.T.detach().clone())
sa_v1.W_key = torch.nn.Parameter(sa_v2.W_key.weight.T.detach().clone())
sa_v1.W_value = torch.nn.Parameter(sa_v2.W_value.weight.T.detach().clone())

In [None]:
# Show both outputs side by side for a visual comparison.
print(sa_v1(inputs))
print(sa_v2(inputs))

# Printed tensors are rounded to four decimals, so also check the match numerically.
assert torch.allclose(sa_v1(inputs), sa_v2(inputs)), \
    "SelfAttention_v1 and SelfAttention_v2 outputs differ"


tensor([[-0.5337, -0.1051],
        [-0.5323, -0.1080],
        [-0.5323, -0.1079],
        [-0.5297, -0.1076],
        [-0.5311, -0.1066],
        [-0.5299, -0.1081]], grad_fn=<MmBackward0>)
tensor([[-0.5337, -0.1051],
        [-0.5323, -0.1080],
        [-0.5323, -0.1079],
        [-0.5297, -0.1076],
        [-0.5311, -0.1066],
        [-0.5299, -0.1081]], grad_fn=<MmBackward0>)


=> SelfAttention_v1 và SelfAttention_v2 cho ra kết quả giống nhau.

# Bài tập 3:

Trả về vectơ nhúng 2 chiều

Thay đổi đối số đầu vào cho lệnh gọi MultiHeadAttentionWrapper\(..., num\_heads=2\) sao cho vectơ ngữ cảnh đầu ra là 2 chiều thay vì 4 chiều trong khi vẫn giữ cài đặt num\_heads=2. Gợi ý: Bạn không phải sửa đổi cách triển khai lớp; bạn chỉ cần thay đổi một trong các đối số đầu vào khác.



In [None]:
# Stack the toy `inputs` tensor twice along a new leading dimension to form a batch of two.
# NOTE: this rebinds the notebook-level name `batch` used by earlier cells.
batch = torch.stack((inputs, inputs), dim=0)
print(batch.shape) # 2 inputs with 6 tokens each, and each token has embedding dimension 3

torch.Size([2, 6, 3])


- Để vectơ ngữ cảnh đầu ra là 2 chiều thay vì 4 chiều trong khi vẫn giữ cài đặt *num_heads* = 2, ta đặt *d_out* = 1, vì vectơ ngữ cảnh đầu ra của lớp MultiHeadAttentionWrapper(..., num_heads=2) có số chiều là *d_out × num_heads* (ở đây là 1 × 2 = 2).

In [None]:
class CausalAttention(nn.Module):
    """Single-head scaled dot-product self-attention with a causal mask.

    Each position may attend only to itself and to earlier positions; dropout
    is applied to the attention weights.

    Args:
        d_in: Size of the last dimension of the input.
        d_out: Size of the query/key/value projections (and of the output).
        context_length: Side length of the square causal mask, i.e. the
            longest sequence the mask covers.
        dropout: Dropout probability applied to the attention weights.
        qkv_bias: Whether the three projections carry a bias term.
    """

    def __init__(self, d_in, d_out, context_length, dropout, qkv_bias=False):
        super().__init__()
        self.d_out = d_out
        # Projections are created in query, key, value order so that seeded
        # initialisation yields the same weights as before.
        self.W_query = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_key = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.W_value = nn.Linear(d_in, d_out, bias=qkv_bias)
        self.dropout = nn.Dropout(dropout)
        # Ones strictly above the diagonal mark the "future" positions to hide.
        future_positions = torch.triu(torch.ones(context_length, context_length), diagonal=1)
        self.register_buffer('mask', future_positions)

    def forward(self, x):
        """Return context vectors for a 3-D input (batch, tokens, d_in)."""
        _, seq_len, _ = x.shape

        k = self.W_key(x)
        q = self.W_query(x)
        v = self.W_value(x)

        # Pairwise query/key similarity within each sequence of the batch.
        scores = q @ k.transpose(1, 2)

        # Crop the mask to the actual sequence length and blank out future positions in place.
        hide_future = self.mask.bool()[:seq_len, :seq_len]
        scores.masked_fill_(hide_future, -torch.inf)

        scale = k.shape[-1] ** 0.5
        weights = self.dropout(torch.softmax(scores / scale, dim=-1))

        return weights @ v


In [None]:
class MultiHeadAttentionWrapper(nn.Module):
    """Multi-head attention built by stacking independent ``CausalAttention`` heads.

    Every head receives the same input; their outputs are concatenated along
    the last dimension, so the output width is ``num_heads * d_out``.

    Args:
        d_in: Size of the last dimension of the input.
        d_out: Output size of each individual head.
        context_length: Mask size handed to every head.
        dropout: Dropout probability handed to every head.
        num_heads: Number of heads to create.
        qkv_bias: Whether the heads' projections carry a bias term.
    """

    def __init__(self, d_in, d_out, context_length, dropout, num_heads, qkv_bias=False):
        super().__init__()
        head_modules = []
        for _ in range(num_heads):
            head_modules.append(
                CausalAttention(d_in, d_out, context_length, dropout, qkv_bias)
            )
        self.heads = nn.ModuleList(head_modules)

    def forward(self, x):
        """Run every head on ``x`` and join the results feature-wise."""
        per_head_outputs = [head(x) for head in self.heads]
        return torch.cat(per_head_outputs, dim=-1)


# Seed before constructing the module so its randomly initialised weights are repeatable.
torch.manual_seed(123)

context_length = batch.shape[1] # This is the number of tokens
# d_out=1 per head: with num_heads=2 the concatenated output has 1 * 2 = 2 features.
d_in, d_out = 3, 1
mha = MultiHeadAttentionWrapper(d_in, d_out, context_length, 0.0, num_heads=2)

context_vecs = mha(batch)

print(context_vecs)
print("context_vecs.shape:", context_vecs.shape)

tensor([[[-0.5740,  0.2216],
         [-0.7320,  0.0155],
         [-0.7774, -0.0546],
         [-0.6979, -0.0817],
         [-0.6538, -0.0957],
         [-0.6424, -0.1065]],

        [[-0.5740,  0.2216],
         [-0.7320,  0.0155],
         [-0.7774, -0.0546],
         [-0.6979, -0.0817],
         [-0.6538, -0.0957],
         [-0.6424, -0.1065]]], grad_fn=<CatBackward0>)
context_vecs.shape: torch.Size([2, 6, 2])
