### 0. MHA
- source : https://github.com/huggingface/transformers/blob/98dda8ed03ac3f4af5733bdddaa1dab6a81e15c1/src/transformers/models/ctrl/modeling_ctrl.py#L88

In [10]:
import torch
import torch.nn as nn

class MultiHeadAttention(nn.Module):
    def __init__(self, hidden_size, num_heads):
        super(MultiHeadAttention, self).__init__()
        self.hidden_size = hidden_size
        self.num_heads = num_heads
        self.head_dim = hidden_size // num_heads
        
        assert (
            self.head_dim * num_heads == hidden_size
        )
        
        self.v_proj = nn.Linear(self.hidden_size, self.hidden_size, bias=False)
        self.k_proj = nn.Linear(self.hidden_size, self.hidden_size, bias=False)
        self.q_proj = nn.Linear(self.hidden_size, self.hidden_size, bias=False)
        self.o_proj = nn.Linear(self.hidden_size, self.hidden_size)
        
    def split_into_heads(self, x, batch_size):
        x = x.reshape(batch_size, -1, self.num_heads, self.head_dim)
        return x.permute([0, 2, 1, 3])
    
    def forward(self, values, keys, query, mask):
        print(f"==== 기본 정보 ====")
        print(f"embedding size는 {self.hidden_size} 입니다.")
        print(f"heads 수는 {self.num_heads} 입니다.")
        print(f"head의 dimension은 {self.head_dim} 입니다.")
        print('\n')
        print(f"==== 초기 정보 ====")
        print(f"query.shape : {query.shape}")
        print(f"keys.shape : {keys.shape}")
        print(f"values.shape : {values.shape}")
        if mask != None:
            print(f"mask.shape : {mask.shape}")

        query = self.q_proj(query)
        keys = self.k_proj(keys)
        values = self.v_proj(values)
        print('\n')
        print("==== 선형레이어를 통과합니다. ====")
        print(f"query.shape : {query.shape}")
        print(f"keys.shape : {keys.shape}")
        print(f"values.shape : {values.shape}")

        batch_size = query.shape[0]
        query = self.split_into_heads(query, batch_size)
        keys = self.split_into_heads(keys, batch_size)
        values = self.split_into_heads(values, batch_size)
        print('\n')
        print(f"==== Multi-Head에 의해 분할합니다 ====")
        print(f"query.shape : {query.shape}")
        print(f"keys.shape : {keys.shape}")
        print(f"values.shape : {values.shape}")
        
        
        # Attention 수행 (최종 shape 형태 : [batch, head, query, key])
        print('\n')
        print("==== Attention 연산을 수행합니다. ====")
        attention_score = torch.einsum("nqhd,nkhd->nhqk", [query, keys])
        print(f"attention_score shape: {attention_score.shape}")
        if mask is not None:
            attention_score = attention_score.masked_fill(mask == 0, float("-1e20"))
            
        attention_distribution = torch.softmax(attention_score / (self.hidden_size ** (1/2)), dim=3)
        print(f"attention_distribution shape: {attention_distribution.shape}")

        attention_value_matrix = torch.einsum("nhql,nlhd->nqhd", [attention_distribution, values]).reshape(
            batch_size, -1, self.num_heads * self.head_dim
        )
        print(f"attention_value_matrix shape : {attention_value_matrix.shape}")
        
        out = self.o_proj(attention_value_matrix)
        print(f"out shape : {out.shape}")
        return out


In [11]:
# MHA 레이어 객체 지정
hidden_size = 512
num_heads = 8
attention = MultiHeadAttention(hidden_size, num_heads)

In [14]:
# 더미 입력 데이터 생성
N, seq_length = 1, 1  # 배치 크기와 시퀀스 길이
dummy = torch.rand((N, seq_length, hidden_size))
dummy_mask = None  # 필요한 경우 마스크를 사용할 수 있습니다.

# 멀티 헤드 주의 레이어 실행
output = attention(dummy, dummy, dummy, dummy_mask)

==== 기본 정보 ====
embedding size는 512 입니다.
heads 수는 8 입니다.
head의 dimension은 64 입니다.


==== 초기 정보 ====
query.shape : torch.Size([1, 1, 512])
keys.shape : torch.Size([1, 1, 512])
values.shape : torch.Size([1, 1, 512])


==== 선형레이어를 통과합니다. ====
query.shape : torch.Size([1, 1, 512])
keys.shape : torch.Size([1, 1, 512])
values.shape : torch.Size([1, 1, 512])


==== Multi-Head에 의해 분할합니다 ====
query.shape : torch.Size([1, 8, 1, 64])
keys.shape : torch.Size([1, 8, 1, 64])
values.shape : torch.Size([1, 8, 1, 64])


==== Attention 연산을 수행합니다. ====
attention_score shape: torch.Size([1, 1, 8, 8])
attention_distribution shape: torch.Size([1, 1, 8, 8])
attention_value_matrix shape : torch.Size([1, 1, 512])
out shape : torch.Size([1, 1, 512])


### 1. MQA, GQA 
- source : https://github.com/huggingface/transformers/blob/98dda8ed03ac3f4af5733bdddaa1dab6a81e15c1/src/transformers/models/llama/modeling_llama.py#L317 
- LlamaAttention 클래스 부분확인

In [19]:
# num_key_value_heads가 1이면, MQA
# num_key_value_heads가 2이상이면, GQA

import math
class MultiQueryAttention(nn.Module):
    def __init__(self, 
                 hidden_size, 
                 num_heads,
                 num_key_value_heads):
        super().__init__()
        self.hidden_size = hidden_size # 512
        self.num_heads = num_heads # 8
        self.head_dim = self.hidden_size // self.num_heads # 64
        self.num_key_value_heads = num_key_value_heads
        self.num_key_value_groups = self.num_heads // self.num_key_value_heads
        self.q_proj = nn.Linear(self.hidden_size, self.num_heads * self.head_dim, bias=False)
        self.k_proj = nn.Linear(self.hidden_size, self.num_key_value_heads * self.head_dim, bias=False)
        self.v_proj = nn.Linear(self.hidden_size, self.num_key_value_heads * self.head_dim, bias=False)
        self.o_proj = nn.Linear(self.num_heads * self.head_dim, self.hidden_size, bias=False)
        
    def repeat_kv(self, 
                  hidden_states: torch.Tensor, 
                  n_rep: int) -> torch.Tensor:
        
        batch, num_key_value_heads, slen, head_dim = hidden_states.shape
        if n_rep == 1:
            return hidden_states
        hidden_states = hidden_states[:, :, None, :, :].expand(batch, num_key_value_heads, n_rep, slen, head_dim)
        return hidden_states.reshape(batch, num_key_value_heads * n_rep, slen, head_dim)


    def forward(self,values,keys,query):
        print(f"==== 기본 정보 ====")
        print(f"embedding size는 {self.hidden_size} 입니다.")
        print(f"heads 수는 {self.num_heads} 입니다.")
        print(f"head의 dimension은 {self.head_dim} 입니다.")
        print(f"head의 dimension은 {self.head_dim} 입니다.")
        print('\n')
        print(f"==== 초기 정보 ====")
        print(f"query.shape : {query.shape}")
        print(f"keys.shape : {keys.shape}")
        print(f"values.shape : {values.shape}")
        
        bsz, q_len, _ = query.size()
        query_states = self.q_proj(query)
        key_states = self.k_proj(keys)
        value_states = self.v_proj(values)
        print('\n')
        print("==== 선형레이어를 통과합니다. ====")
        print(f"query.shape : {query_states.shape}")
        print(f"keys.shape : {key_states.shape}")
        print(f"values.shape : {value_states.shape}")

        query_states = query_states.view(bsz, q_len, self.num_heads, self.head_dim).transpose(1, 2)
        key_states = key_states.view(bsz, q_len, self.num_key_value_heads, self.head_dim).transpose(1, 2)
        value_states = value_states.view(bsz, q_len, self.num_key_value_heads, self.head_dim).transpose(1, 2)
        key_states = self.repeat_kv(key_states, self.num_key_value_groups)
        value_states = self.repeat_kv(value_states, self.num_key_value_groups)
        print('\n')
        print(f"==== Query는 Multi-Head에 의해 분할하며, Key와 Value는 동일한 값들로 복제합니다. ====")
        print(f"query.shape : {query_states.shape}")
        print(f"keys.shape : {key_states.shape}")
        print(f"values.shape : {value_states.shape}")
        attention_score = torch.matmul(query_states, key_states.transpose(2, 3)) / math.sqrt(self.head_dim)
        print(f"attention_score shape: {attention_score.shape}")
        attention_distribution = nn.functional.softmax(attention_score, dim=-1, dtype=torch.float32).to(query_states.dtype)
        print(f"attention_distribution shape: {attention_distribution.shape}")
        attention_value_matrix = torch.matmul(attention_distribution, value_states)
        print(f"attention_value_matrix shape: {attention_value_matrix.shape}")
        attention_value_matrix = attention_value_matrix.transpose(1, 2).contiguous()
        print(f"attention_value_matrix shape: {attention_value_matrix.shape}")
        attention_value_matrix = attention_value_matrix.reshape(bsz, q_len, self.hidden_size)
        print(f"attention_value_matrix shape: {attention_value_matrix.shape}")
        attn_output = self.o_proj(attention_value_matrix)
        print(f"out shape : {attn_output.shape}")
        return attn_output

In [20]:
# 모델 초기화
hidden_size = 512
num_heads = 8
num_key_value_heads = 2 
model = MultiQueryAttention(hidden_size, 
                             num_heads,
                             num_key_value_heads)

# 더미 입력 데이터 생성
N = 1  # 배치 크기
seq_length = 1  # 시퀀스 길이

dummy = torch.rand((N, seq_length, hidden_size))
dummy_mask = None  # 필요한 경우 마스크를 사용할 수 있습니다.

# 모델에 데이터 전달
attn_output = model(dummy, dummy, dummy)
print(attn_output.shape)  # 출력의 차원 확인

==== 기본 정보 ====
embedding size는 512 입니다.
heads 수는 8 입니다.
head의 dimension은 64 입니다.
head의 dimension은 64 입니다.


==== 초기 정보 ====
query.shape : torch.Size([1, 1, 512])
keys.shape : torch.Size([1, 1, 512])
values.shape : torch.Size([1, 1, 512])


==== 선형레이어를 통과합니다. ====
query.shape : torch.Size([1, 1, 512])
keys.shape : torch.Size([1, 1, 128])
values.shape : torch.Size([1, 1, 128])


==== Query는 Multi-Head에 의해 분할하며, Key와 Value는 동일한 값들로 복제합니다. ====
query.shape : torch.Size([1, 8, 1, 64])
keys.shape : torch.Size([1, 8, 1, 64])
values.shape : torch.Size([1, 8, 1, 64])
attention_score shape: torch.Size([1, 8, 1, 1])
attention_distribution shape: torch.Size([1, 8, 1, 1])
attention_value_matrix shape: torch.Size([1, 8, 1, 64])
attention_value_matrix shape: torch.Size([1, 1, 8, 64])
attention_value_matrix shape: torch.Size([1, 1, 512])
out shape : torch.Size([1, 1, 512])
torch.Size([1, 1, 512])
