### Tokenization

In [12]:
# Split the input text on whitespace to obtain a list of word tokens.
input_text = "나는 최근 파리 여행을 다녀왔다"
input_text_list = input_text.split()
print("input_text_list: ", input_text_list)


def build_vocab(tokens):
    """Build token -> id and id -> token lookup tables.

    Every distinct token receives exactly one id, assigned in order of first
    appearance. The ids are therefore contiguous (0 .. vocab_size - 1) even
    when a token occurs more than once, and the two tables are exact inverses.

    Args:
        tokens: Iterable of hashable tokens (for example a list of words).

    Returns:
        A ``(token_to_id, id_to_token)`` tuple of dictionaries.
    """
    # dict.fromkeys removes duplicates while keeping first-seen order.
    unique_tokens = list(dict.fromkeys(tokens))
    token_to_id = {token: idx for idx, token in enumerate(unique_tokens)}
    id_to_token = dict(enumerate(unique_tokens))
    return token_to_id, id_to_token


# Token -> id dictionary and id -> token dictionary.
str2idx, idx2str = build_vocab(input_text_list)
print("str2idx: ", str2idx)
print("idx2str: ", idx2str)

# Convert each token to its token id.
input_ids = [str2idx[word] for word in input_text_list]
print("input_ids: ", input_ids)

input_text_list:  ['나는', '최근', '파리', '여행을', '다녀왔다']
str2idx:  {'나는': 0, '최근': 1, '파리': 2, '여행을': 3, '다녀왔다': 4}
idx2str:  {0: '나는', 1: '최근', 2: '파리', 3: '여행을', 4: '다녀왔다'}
input_ids:  [0, 1, 2, 3, 4]


### Token Embedding

In [13]:
import torch
import torch.nn as nn

# Width of every token embedding vector.
embedding_dim = 16
# Embedding table with one row per vocabulary entry.
embed_layer = nn.Embedding(num_embeddings=len(str2idx), embedding_dim=embedding_dim)

# Look up the embedding of each token id, then prepend a batch dimension.
input_embeddings = torch.unsqueeze(embed_layer(torch.tensor(input_ids)), dim=0)
input_embeddings.shape

torch.Size([1, 5, 16])

### Position Encoding

In [14]:
embedding_dim = 16
max_position = 12

# Token embedding table (created first, as before, so seeded initialization is unchanged).
embed_layer = nn.Embedding(num_embeddings=len(str2idx), embedding_dim=embedding_dim)
# Learned position embedding table with max_position rows.
position_embed_layer = nn.Embedding(num_embeddings=max_position, embedding_dim=embedding_dim)

# Token embeddings with a leading batch dimension.
token_embeddings = embed_layer(torch.tensor(input_ids)).unsqueeze(0)

# Position ids 0 .. len(input_ids) - 1, also with a leading batch dimension.
position_ids = torch.arange(len(input_ids), dtype=torch.long).unsqueeze(0)
position_encodings = position_embed_layer(position_ids)

# Final input embedding: token embedding + position embedding.
input_embeddings = token_embeddings + position_encodings
input_embeddings.shape

torch.Size([1, 5, 16])

### nn.Linear Layer (create Query, Key, Value)

In [15]:
head_dim = 16

# Three independent linear projections (built in query, key, value order)
# that map the input embeddings to Q, K and V.
weight_q, weight_k, weight_v = (
    nn.Linear(embedding_dim, head_dim) for _ in range(3)
)

# Project the same input embeddings through each layer.
querys, keys, values = (
    projection(input_embeddings) for projection in (weight_q, weight_k, weight_v)
)

### Scaled Dot-Product Attention

1. query와 key를 곱한다. (이때, 분산이 커지는 것을 방지하기 위해 임베딩 차원 수의 제곱근으로 나눔)
2. scores를 합이 1이 되도록 softmax를 취해 가중치로 바꾼다.
3. 가중치와 값을 곱해 입력과 동일한 형태의 출력을 반환한다.

In [16]:
from math import sqrt
import torch.nn.functional as F

def compute_attention(querys, keys, values, is_causal=False):
    """Compute scaled dot-product attention.

    Scores are the query/key dot products divided by the square root of the
    query feature size, turned into weights with a softmax over the key axis,
    and used to take a weighted sum of ``values``.

    Args:
        querys: Tensor whose last two dims are (query_length, dim_k).
        keys: Tensor whose last two dims are (key_length, dim_k).
        values: Tensor whose last two dims are (key_length, dim_v).
        is_causal: When True, a query at position ``i`` only attends to key
            positions ``<= i``. Previously this flag was accepted but ignored.

    Returns:
        Tensor whose last two dims are (query_length, dim_v).
    """
    dim_k = querys.size(-1)
    scores = querys @ keys.transpose(-2, -1) / sqrt(dim_k)
    if is_causal:
        # Index the sequence axes from the end so both (B, T, D) and
        # (B, H, T, D) inputs are handled.
        query_length, key_length = querys.size(-2), keys.size(-2)
        causal_mask = torch.ones(
            query_length, key_length, dtype=torch.bool, device=scores.device
        ).tril()
        # Positions above the diagonal get -inf so softmax gives them weight 0.
        scores = scores.masked_fill(~causal_mask, float("-inf"))
    weights = F.softmax(scores, dim=-1)
    return weights @ values

### Attention Cal input/output

In [17]:
# Shape of the embeddings before attention is applied.
print("원본 입력 형태: ", input_embeddings.shape)

# Run attention on the projected query / key / value tensors.
after_attention_embeddings = compute_attention(querys=querys, keys=keys, values=values)

# Shape of the attention output, printed for comparison with the input shape.
print("어텐션 적용 후 형태: ", after_attention_embeddings.shape)

원본 입력 형태:  torch.Size([1, 5, 16])
어텐션 적용 후 형태:  torch.Size([1, 5, 16])


### AttentionHead Class (Attention Mechanism)

In [18]:
class AttentionHead(nn.Module):
    """Single attention head: project the inputs to Q/K/V, then attend.

    Args:
        token_embed_dim: Feature size of the incoming embeddings.
        head_dim: Feature size produced by each of the three projections.
        is_causal: Flag forwarded to ``compute_attention`` on every call.
    """

    def __init__(self, token_embed_dim, head_dim, is_causal=False):
        super().__init__()
        self.is_causal = is_causal
        # Linear layers that produce the query, key and value vectors.
        self.weight_q = nn.Linear(in_features=token_embed_dim, out_features=head_dim)
        self.weight_k = nn.Linear(in_features=token_embed_dim, out_features=head_dim)
        self.weight_v = nn.Linear(in_features=token_embed_dim, out_features=head_dim)

    def forward(self, querys, keys, values):
        """Project the three inputs and return the attention output."""
        projected_q = self.weight_q(querys)
        projected_k = self.weight_k(keys)
        projected_v = self.weight_v(values)
        return compute_attention(
            projected_q, projected_k, projected_v, is_causal=self.is_causal
        )

# Self-attention: the same embeddings serve as query, key and value input.
attention_head = AttentionHead(token_embed_dim=embedding_dim, head_dim=embedding_dim)
after_attention_embeddings = attention_head(
    input_embeddings,
    input_embeddings,
    input_embeddings,
)

### Multi-Head Attention

In [22]:
class MultiheadAttention(nn.Module):
    """Multi-head attention built on top of ``compute_attention``.

    The inputs are projected to ``d_model`` features, split into ``n_head``
    heads, attended per head, merged back and passed through a final linear
    layer.

    Args:
        token_embed_dim: Feature size of the incoming embeddings.
        d_model: Feature size after projection; must be divisible by n_head.
        n_head: Number of attention heads.
        is_causal: Default masking flag handed to ``compute_attention``.

    Raises:
        ValueError: If ``d_model`` is not divisible by ``n_head``.
    """

    def __init__(self, token_embed_dim, d_model, n_head, is_causal=False):
        super().__init__()
        if d_model % n_head != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by n_head ({n_head})"
            )
        self.n_head = n_head
        self.is_causal = is_causal
        self.weight_q = nn.Linear(token_embed_dim, d_model)
        self.weight_k = nn.Linear(token_embed_dim, d_model)
        self.weight_v = nn.Linear(token_embed_dim, d_model)
        self.concat_linear = nn.Linear(d_model, d_model)

    def _split_heads(self, x):
        """Reshape (B, T, d_model) into (B, n_head, T, d_model // n_head)."""
        batch, length, width = x.size()
        return x.view(batch, length, self.n_head, width // self.n_head).transpose(1, 2)

    def forward(self, querys, keys, values, is_causal=None):
        """Apply multi-head attention.

        Args:
            querys: 3-D tensor (batch, query_length, token_embed_dim).
            keys: 3-D tensor (batch, key_length, token_embed_dim).
            values: 3-D tensor (batch, key_length, token_embed_dim).
            is_causal: Optional per-call override; ``None`` (the default)
                uses the flag given to the constructor.

        Returns:
            Tensor of shape (batch, query_length, d_model).
        """
        if is_causal is None:
            is_causal = self.is_causal
        # Each tensor is split using its own sequence length and the projected
        # width, so keys/values may be longer or shorter than the queries
        # (cross-attention) and token_embed_dim may differ from d_model.
        querys = self._split_heads(self.weight_q(querys))
        keys = self._split_heads(self.weight_k(keys))
        values = self._split_heads(self.weight_v(values))
        attention = compute_attention(querys, keys, values, is_causal)
        # (B, n_head, T_q, head_dim) -> (B, T_q, n_head * head_dim)
        batch, _, query_length, head_dim = attention.size()
        output = attention.transpose(1, 2).contiguous().view(
            batch, query_length, self.n_head * head_dim
        )
        output = self.concat_linear(output)
        return output

# Number of attention heads.
n_head = 4
mh_attention = MultiheadAttention(
    token_embed_dim=embedding_dim,
    d_model=embedding_dim,
    n_head=n_head,
)
# Self-attention: the same embeddings are used as query, key and value input.
after_attention_embeddings = mh_attention(
    input_embeddings,
    input_embeddings,
    input_embeddings,
)
after_attention_embeddings.shape

torch.Size([1, 5, 16])

### Layer Normalization

In [24]:
# Layer normalization over the last (embedding) dimension.
norm = nn.LayerNorm(normalized_shape=embedding_dim)
norm_x = norm(input_embeddings)
norm_x.shape

# Per-token mean and standard deviation of the normalized embeddings.
# NOTE: torch.std applies Bessel's correction by default while LayerNorm
# normalizes with the biased variance, so the reported std is about
# sqrt(16 / 15) ~= 1.0328 rather than exactly 1.
torch.mean(norm_x, dim=-1).data, torch.std(norm_x, dim=-1).data

(tensor([[3.7253e-08, 7.4506e-09, 0.0000e+00, 5.4017e-08, 1.4901e-08]]),
 tensor([[1.0328, 1.0328, 1.0328, 1.0328, 1.0328]]))

### Feed Forward Layer

In [25]:
class PreLayerNormFeedForward(nn.Module):
    """Feed-forward block that applies layer normalization to its input first.

    Args:
        d_model: Feature size of the input and of the output.
        dim_feedforward: Hidden feature size between the two linear layers.
        dropout: Dropout probability used by both dropout layers.
    """

    def __init__(self, d_model, dim_feedforward, dropout):
        super().__init__()
        # Expansion and projection linear layers.
        self.linear1 = nn.Linear(in_features=d_model, out_features=dim_feedforward)
        self.linear2 = nn.Linear(in_features=dim_feedforward, out_features=d_model)
        # Dropout after the activation and on the block output.
        self.dropout1 = nn.Dropout(p=dropout)
        self.dropout2 = nn.Dropout(p=dropout)
        # Non-linearity between the two linear layers.
        self.activation = nn.GELU()
        # Layer normalization applied to the block input.
        self.norm = nn.LayerNorm(normalized_shape=d_model)

    def forward(self, src):
        """Normalize ``src``, run the two-layer MLP and add it to the normalized input."""
        normalized = self.norm(src)
        hidden = self.linear1(normalized)
        hidden = self.activation(hidden)
        hidden = self.dropout1(hidden)
        # The residual is added to the normalized tensor, not to the raw src.
        combined = normalized + self.linear2(hidden)
        return self.dropout2(combined)

### Encoder Layer

1. self.norm1으로 층 정규화
2. self.attn으로 멀티 헤드 어텐션 연산
3. 잔차 연결을 위해 어텐션 결과에 드롭아웃을 적용한 self.dropout1(attn_output)과 입력 src를 더해준다.
4. self.feed_forward(x)로 피드 포워드 연산

In [27]:
class TransformerEncoderLayer(nn.Module):
    """One encoder layer: normalized self-attention with a residual, then feed-forward.

    Args:
        d_model: Feature size of the layer input and output.
        nhead: Number of attention heads.
        dim_feedforward: Hidden size handed to the feed-forward block.
        dropout: Dropout probability for the attention output and the
            feed-forward block.
    """

    def __init__(self, d_model, nhead, dim_feedforward, dropout):
        super().__init__()
        # Multi-head self-attention.
        self.attn = MultiheadAttention(d_model, d_model, nhead)
        # Layer normalization applied before attention.
        self.norm1 = nn.LayerNorm(normalized_shape=d_model)
        # Dropout on the attention output.
        self.dropout1 = nn.Dropout(p=dropout)
        # Feed-forward block.
        self.feed_forward = PreLayerNormFeedForward(d_model, dim_feedforward, dropout)

    def forward(self, src):
        """Run self-attention and the feed-forward block on ``src``."""
        normalized = self.norm1(src)
        attended = self.attn(normalized, normalized, normalized)
        # Residual connection: add the attention output to the un-normalized input.
        residual = src + self.dropout1(attended)
        return self.feed_forward(residual)

### Encoder

In [28]:
import copy

def get_clones(module, N):
    """Return an ``nn.ModuleList`` holding ``N`` independent deep copies of ``module``."""
    clones = nn.ModuleList()
    for _ in range(N):
        clones.append(copy.deepcopy(module))
    return clones

class TransformerEncoder(nn.Module):
    """Stack of ``num_layers`` deep copies of one encoder layer, applied in sequence.

    Args:
        encoder_layer: Layer module to clone.
        num_layers: Number of clones in the stack.
    """

    def __init__(self, encoder_layer, num_layers):
        super().__init__()
        self.num_layers = num_layers
        self.layers = get_clones(encoder_layer, num_layers)

    def forward(self, src):
        """Feed ``src`` through every layer in order and return the last output."""
        hidden = src
        for layer in self.layers:
            hidden = layer(hidden)
        return hidden

### Mask Attention (Attention Mechanism in Decoder)

아래 코드에서 `if is_causal`

-> torch.ones로 만든 모두 1(True)인 행렬에 tril 함수를 적용해 대각선과 그 아래 부분만 True로 남긴 마스크를 생성하고, 마스크가 False인 위치(대각선 위)의 scores 값을 음의 무한대(-inf)로 바꾼다.

In [29]:
def compute_attention(querys, keys, values, is_causal=False):
    """Compute scaled dot-product attention with an optional causal mask.

    Args:
        querys: Tensor whose last two dims are (query_length, dim_k).
        keys: Tensor whose last two dims are (key_length, dim_k).
        values: Tensor whose last two dims are (key_length, dim_v).
        is_causal: When True, a query at position ``i`` only attends to key
            positions ``<= i``.

    Returns:
        Tensor whose last two dims are (query_length, dim_v).
    """
    dim_k = querys.size(-1)
    scores = querys @ keys.transpose(-2, -1) / sqrt(dim_k)
    if is_causal:
        # Read the sequence lengths from the second-to-last axis so the mask
        # is correct for 3-D (B, T, D) as well as 4-D (B, H, T, D) inputs.
        query_length = querys.size(-2)
        key_length = keys.size(-2)
        # Build the mask on the same device as the scores it is applied to.
        temp_mask = torch.ones(
            query_length, key_length, dtype=torch.bool, device=scores.device
        ).tril(diagonal=0)
        # Entries above the diagonal become -inf, i.e. weight 0 after softmax.
        scores = scores.masked_fill(~temp_mask, float("-inf"))
    weights = F.softmax(scores, dim=-1)
    return weights @ values

### Decoder Layer (Cross Attention)

In [30]:
class TransformerDecoderLayer(nn.Module):
    """One decoder layer: self-attention, cross-attention, then feed-forward.

    Args:
        d_model: Feature size of the layer input and output.
        nhead: Number of attention heads in both attention modules.
        dim_feedforward: Hidden size handed to the feed-forward block.
        dropout: Dropout probability for both attention outputs and the
            feed-forward block.
    """

    def __init__(self, d_model, nhead, dim_feedforward, dropout):
        super().__init__()
        # Self-attention over the target sequence.
        self.self_attn = MultiheadAttention(d_model, d_model, nhead)
        # Cross-attention: queries from the decoder, keys/values from the encoder.
        self.multihead_attn = MultiheadAttention(d_model, d_model, nhead)
        self.feed_forward = PreLayerNormFeedForward(d_model, dim_feedforward, dropout)

        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

    def forward(self, tgt, encoder_output, is_causal=True):
        """Run the decoder layer.

        Args:
            tgt: Decoder-side input tensor.
            encoder_output: Encoder result used as key and value input of the
                cross-attention.
            is_causal: Whether the self-attention masks future positions.

        Returns:
            The output of the feed-forward block.
        """
        # self attention
        # MultiheadAttention.forward is defined as (querys, keys, values) and
        # reads the masking flag from its `is_causal` attribute, so set the
        # attribute instead of passing a keyword that would raise TypeError.
        self.self_attn.is_causal = is_causal
        # NOTE(review): the residual below is added to the normalized tensor,
        # whereas TransformerEncoderLayer adds it to the un-normalized input;
        # confirm this difference is intended.
        x = self.norm1(tgt)
        x = x + self.dropout1(self.self_attn(x, x, x))

        # cross attention
        x = self.norm2(x)
        x = x + self.dropout2(self.multihead_attn(x, encoder_output, encoder_output))

        # feed forward
        x = self.feed_forward(x)
        return x

### Decoder

In [33]:
import copy

def get_clones(module, N):
    """Deep-copy ``module`` ``N`` times and collect the copies in an ``nn.ModuleList``."""
    copies = []
    for _ in range(N):
        copies.append(copy.deepcopy(module))
    return nn.ModuleList(copies)

class TransformerDecoder(nn.Module):
    """Stack of ``num_layers`` deep copies of one decoder layer, applied in sequence.

    Args:
        decoder_layer: Layer module to clone.
        num_layers: Number of clones in the stack.
    """

    def __init__(self, decoder_layer, num_layers):
        super().__init__()
        self.num_layers = num_layers
        self.layers = get_clones(decoder_layer, num_layers)

    def forward(self, tgt, src):
        """Feed ``tgt`` through every layer, passing ``src`` to each one as its second argument."""
        hidden = tgt
        for layer in self.layers:
            hidden = layer(hidden, src)
        return hidden