#### Encoder
- transformer의 인코더는 multihead attention, layerNorm, feedfoward 층이 반복되는 형태이다.
- 안정적인 학습이 가능하도록 residual connection이 있다.
- 이러한 인코더 블록을 반복해서 쌓아 transformer 인코더가 된다.

In [1]:
# 띄어쓰기 단위로 분리하는 토큰화 코드
input_text = "나는 최근 파리 여행을 다녀왔다"
input_text_list = input_text.split()
print("input_text_list: ", input_text_list)

# 토큰 -> 아이디 딕셔너리와 아이디 -> 토큰 딕셔너리 만들기
str2idx = {word:idx for idx, word in enumerate(input_text_list)}
idx2str = {idx:word for idx, word in enumerate(input_text_list)}
print("str2idx: ", str2idx)
print("idx2str: ", idx2str)

# 토큰을 토큰 아이디로 변환
input_ids = [str2idx[word] for word in input_text_list]
print("input_ids: ", input_ids)

input_text_list:  ['나는', '최근', '파리', '여행을', '다녀왔다']
str2idx:  {'나는': 0, '최근': 1, '파리': 2, '여행을': 3, '다녀왔다': 4}
idx2str:  {0: '나는', 1: '최근', 2: '파리', 3: '여행을', 4: '다녀왔다'}
input_ids:  [0, 1, 2, 3, 4]


In [2]:
# 토큰 임베딩 (토큰 아이디 -> 벡터)
import torch
import torch.nn as nn

embedding_dim = 16
# 사전 크기가 ?이고, 차원이 ?인 embed_layer 생성
embed_layer = nn.Embedding(len(str2idx), embedding_dim)

# 입력 토큰을 임베딩으로 변환
# 즉, 토큰 1개를 16차원의 벡터로 변환
input_embeddings = embed_layer(torch.tensor(input_ids))     # (5, 16)
input_embeddings = input_embeddings.unsqueeze(0)        #(1, 5, 16)
input_embeddings.shape

torch.Size([1, 5, 16])

In [3]:
# 쿼리, 키, 값 벡터를 만드는 nn.Linear 층

head_dim = 16

# q, k, v를 계산하기 위한 변환
# 각 가중치
weight_q = nn.Linear(embedding_dim, head_dim)
weight_k = nn.Linear(embedding_dim, head_dim)
weight_v = nn.Linear(embedding_dim, head_dim)

# 변환 수행
# 각 가중치에 입력값 넣어 쿼리, 키, 값 생성
querys = weight_q(input_embeddings)
keys = weight_k(input_embeddings)
values = weight_v(input_embeddings)

In [4]:
# 스케일 내적 방식의 어텐션
from math import sqrt
import torch.nn.functional as F

def compute_attention(qureys, keys, values, is_casual=False):
    dim_k = querys.size(-1) # 16
    # 분산이 커지는 것을 방지하기 위해 임베딩 차원 수의 제곱근으로 나눔
    scores = qureys @ keys.transpose(-2, -1) / sqrt(dim_k)
    # scores를 합이 1이 되도록 softmax를 취해 weight로 바꿈
    weights = F.softmax(scores, dim=-1)
    # 가중치와 값을 곱해 입력과 동일한 형태의 출력을 반환함.
    # 이는 새로운 토큰 임베딩이다.
    return weights @ values

In [5]:
class MultiheadAttention(nn.Module):
    def __init__(self, token_embed_dim, d_model, n_head, is_causal=False):
        super().__init__()
        self.n_head = n_head  # 사용할 어텐션 헤드 수
        self.is_causal = is_causal  # 인과적 어텐션 여부
        # 쿼리, 키, 값 벡터를 생성하는 선형 변환 레이어
        self.weight_q = nn.Linear(token_embed_dim, d_model)
        self.weight_k = nn.Linear(token_embed_dim, d_model)
        self.weight_v = nn.Linear(token_embed_dim, d_model)
        # 여러 헤드를 concat한 후 차원 맞추는 선형 변환 레이어
        self.concat_linear = nn.Linear(d_model, d_model)

    def forward(self, querys, keys, values):
        B, T, C = querys.size()  # B: 배치 크기, T: 시퀀스 길이, C: 임베딩 차원
        # print(B)
        # print(T)
        # print(C)

        # 헤드의 수만큼 연산하기 위해 q, k, v값을 n_head개로 쪼갬
        querys = self.weight_q(querys).view(B, T, self.n_head, C // self.n_head).transpose(1, 2)
        keys = self.weight_k(keys).view(B, T, self.n_head, C // self.n_head).transpose(1, 2)
        values = self.weight_v(values).view(B, T, self.n_head, C // self.n_head).transpose(1, 2)

        # 어텐션 연산 수행, 결과 크기는 (B, n_head, T, head_dim)
        attention = compute_attention(querys, keys, values, self.is_causal)

        # 헤드 축과 시퀀스 길이 축을 교환 후 (B, T, C)로 reshape
        # 즉, 입력과 같은 형태로 다시 변환
        output = attention.transpose(1, 2).contiguous().view(B, T, C)
        
        # 마지막으로 선형 layer를 통과시키고 최종 결과를 반환 
        output = self.concat_linear(output)
        
        return output

n_head = 4  # 어텐션 헤드 수
mh_attention = MultiheadAttention(embedding_dim, embedding_dim, n_head)
# 멀티헤드 어텐션 연산 수행 후 최종 출력
after_attention_embeddings = mh_attention(input_embeddings, input_embeddings, input_embeddings)
after_attention_embeddings.shape  # (B, T, C)

torch.Size([1, 5, 16])

In [7]:
# feed foward layer
class PreLayerNormFeedFoward(nn.Module):
    def __init__(self, d_model, dim_feedforward, dropout):
        super().__init__()
        # 첫 번째 linear layer
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        # 두 번째 linear layer
        self.linear2 = nn.Linear(dim_feedforward, d_model)
        # 첫 번째 dropout layer
        self.dropout1 = nn.Dropout(dropout)
        # 두 번째 dropout layer
        self.dropout2 = nn.Dropout(dropout)
        # 활성화 함수
        self.activation = nn.GELU()
        # 층 정규화
        self.norm = nn.LayerNorm(d_model)

    def forward(self, src):
        x = self.norm(src)
        x = x + self.linear2(self.dropout1(self.activation(self.linear1(x))))
        x = self.dropout2(x)
        return x

In [None]:
# encoder layer
import torch.nn as nn

class TransformerEncoderLayer(nn.Module):
    def __init__(self, d_model, nhead, dim_feedforward, dropout):
        super().__init__()
        self.attn = MultiheadAttention(d_model, d_model, nhead)
        self.norm1 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.feed_forward = PreLayerNormFeedFoward(d_model, dim_feedforward, dropout)

    def forward(self, src):
        norm_x = self.norm1(src)
        attn_output = self.attn(norm_x, norm_x, norm_x)
        x = src + self.dropout1(attn_output)

        # feed foward
        x = self.feed_forward(x)
        return x

In [9]:
# encoder 구현
import copy

def get_clones(module, N):
    return nn.Modulelist([copy.deepcopy(module) for i in range(N)])

class TransformerEncoder(nn.Module):
    def __init__(self, encoder_layer, num_layers):
        super().__init__()
        self.layers = get_clones(encoder_layer, num_layers)
        self.num_layers = num_layers

    def forward(self, src):
        output = src
        for mod in self.layers:
            output = mod(output)
        return output