### 토큰화 코드 작성 
- 최근의 토큰화는 서브워드 토큰화가 기본이지만 실습 편의를 위해 단어 단위(띄어쓰기) 토큰화 수행
- 문장을 띄어쓰기 단위로 분리하고 
- 각 토큰에 0부터 토큰 아이디 부여 -> str2idx 저장
- 각 토큰을 str2idx 딕셔너리로 토큰 아이디로 변환해서 input_ids에 저장 


In [6]:
# 띄어쓰기 단위로 분리
input_text = "나는 최근 상해 여행을 다녀왔다."
input_text_list = input_text.split()
print("input_text_list: ", input_text_list)

# 토큰 -> 아이디
# 딕셔너리와 아이디 -> 토큰 딕셔너리 만들기
str2dix = {word: idx for idx, word in enumerate(input_text_list)}
idx2str = {idx: word for idx, word in enumerate(input_text_list)}
print("str2dix: ", str2dix)
print("idx2str: ", idx2str)

# 토큰을 아이디로 변환
input_ids = [str2dix[word] for word in input_text_list]
print("input_ids: ", input_ids)

input_text_list:  ['나는', '최근', '상해', '여행을', '다녀왔다.']
str2dix:  {'나는': 0, '최근': 1, '상해': 2, '여행을': 3, '다녀왔다.': 4}
idx2str:  {0: '나는', 1: '최근', 2: '상해', 3: '여행을', 4: '다녀왔다.'}
input_ids:  [0, 1, 2, 3, 4]


###  토큰 임베딩


In [7]:
import torch
import torch.nn as nn

embedding_dim = 16
embed_layer = nn.Embedding(len(str2dix), embedding_dim)

input_embeddings = embed_layer(torch.tensor(input_ids)) # (5, 16)
input_embeddings =input_embeddings.unsqueeze(0) # (1, 5, 16) 
input_embeddings.shape

torch.Size([1, 5, 16])

### 절대적 위치 인코딩 
- 새로운 임베딩 층 하나 추가 후 위치 인덱스(position_ids)에 따라 임베딩 더하도록 구현 
- position_ids 를 위치 임베딩 층에 입력해 위치 인코딩 생성
- 토큰 임베딩에 위치 인코딩을 더해서 모델에 입력할 최종 입력 임베딩 준비 

In [8]:
embedding_dim = 16
max_position = 12
embed_layer = nn.Embedding(len(str2dix), embedding_dim)
position_embed_layer = nn.Embedding(max_position, embedding_dim)

position_ids = torch.arange(len(input_ids), dtype=torch.long).unsqueeze(0)
position_encodings = position_embed_layer(position_ids)
token_embeddings = embed_layer(torch.tensor(input_ids)) # (5, 16)
token_embeddings = token_embeddings.unsqueeze(0) # (1, 5, 16)
input_embeddings = token_embeddings + position_encodings
input_embeddings.shape


torch.Size([1, 5, 16])

## attention

### 쿼리, 키, 값에 가중치를 가지고 변환 

In [9]:
head_dim = 16

# 쿼리, 키, 값을 계산하기 위한 변환
weight_q = nn.Linear(embedding_dim, head_dim)
weight_k = nn.Linear(embedding_dim, head_dim)
weight_v = nn.Linear(embedding_dim, head_dim)

# 변환 수행 
querys = weight_q(input_embeddings) # (1, 5, 16)
keys = weight_k(input_embeddings) # (1, 5, 16)
values = weight_v(input_embeddings) # (1, 5, 16)

In [10]:
from math import sqrt
import torch.nn.functional as F

# Q * K^T * V
# 스케일 점곱 방식의 어텐션  
def compute_attention(querys, keys, values, is_causal=False):
    dim_k = querys.size(-1) # 16
    scores = querys @ keys.transpose(-2, -1) / sqrt(dim_k)
    weights = F.softmax(scores, dim=-1)
    return weights @ values 

In [11]:
print("원본 입력 형태: ", input_embeddings.shape)

after_attention_embeddings = compute_attention(querys, keys, values)

print("어텐션 후 입력 형태: ", after_attention_embeddings.shape)

원본 입력 형태:  torch.Size([1, 5, 16])
어텐션 후 입력 형태:  torch.Size([1, 5, 16])


In [12]:
class AttentionHead(nn.Module):
    def __init__(self, token_embed_dim, head_dim, is_causal=False):
        super().__init__()
        self.is_causal = is_causal
        self.weight_q = nn.Linear(token_embed_dim, head_dim) # 쿼리 벡터 생성을 위한 선형 층
        self.weight_k = nn.Linear(token_embed_dim, head_dim) # 키 벡터 생성을 위한 선형 층
        self.weight_v = nn.Linear(token_embed_dim, head_dim) # 값 벡터 생성을 위한 선형 층
    
    def forward(self, querys, keys, values):
        output = compute_attention(
            self.weight_q(querys),
            self.weight_k(keys),
            self.weight_v(values)
        )
        return output

In [13]:
attention_head = AttentionHead(embedding_dim, head_dim)
after_attention_embeddings = attention_head(input_embeddings, input_embeddings, input_embeddings)

In [14]:
# 멀티 헤드 어텐션 구현
class MultiheadAttention(nn.Module):
    def __init__(self, token_embed_dim, d_model, n_head, is_causal=False):
        super().__init__()
        self.n_head = n_head
        self.is_causal = is_causal
        self.weight_q = nn.Linear(token_embed_dim, d_model)
        self.weight_k = nn.Linear(token_embed_dim, d_model)
        self.weight_v = nn.Linear(token_embed_dim, d_model)
        self.concat_linear = nn.Linear(d_model, d_model)

    def forward(self, querys, keys, values):
        B, T, C = querys.size()
        # 선형층 통과
        querys = self.weight_q(querys).view(B, T, self.n_head, C // self.n_head).transpose(1, 2)
        keys = self.weight_k(keys).view(B, T, self.n_head, C // self.n_head).transpose(1, 2)
        values = self.weight_v(values).view(B, T, self.n_head, C // self.n_head).transpose(1, 2)
        # h번의 스케일 점곱 어텐션 수행
        attention = compute_attention(querys, keys, values, self.is_causal)
        # 어텐션 결과 연결
        output = attention.transpose(1, 2).contiguous().view(B, T, C)
        # 선형 층 대응
        output = self.concat_linear(output)
        return output
    
n_head = 4
mh_attemtnion = MultiheadAttention(embedding_dim, embedding_dim, n_head)
after_attention_embeddings = mh_attemtnion(input_embeddings, input_embeddings, input_embeddings)
after_attention_embeddings.shape

torch.Size([1, 5, 16])

In [15]:
# 층 정규화
norm = nn.LayerNorm(embedding_dim)
norm_x = norm(input_embeddings)
norm_x.shape 
 
norm_x.mean(dim=-1).data, norm_x.std(dim=-1).data # 실제 평균과 표준편차 확인

(tensor([[-1.8626e-08,  1.1176e-08,  1.4901e-08, -1.1176e-08, -2.2352e-08]]),
 tensor([[1.0328, 1.0328, 1.0328, 1.0328, 1.0328]]))

In [15]:
# 피드 포워드 층

class PreLayerNormFeedForward(nn.Module):
    def __init__(self, d_model, dim_feedforward, dropout):
        super().__init__()
        self.linear1 = nn.Linear(d_model, dim_feedforward) # 선형 층 1
        self.linear2 = nn.Linear(dim_feedforward, d_model) # 선형 층 2
        self.dropout1 = nn.Dropout(dropout) # 드롭아웃 층 1
        self.dropout2 = nn.Dropout(dropout) # 드롭아웃 층 2
        self.activation = nn.GELU() # 활성 함수
        self.norm = nn.LayerNorm(d_model) # 층 정규화 
        
        def forward(self, src):
            x = self.norm(src)
            x = self.linear2(self.dropout1(self.activation(self.linear1(x))))
            x = self.dropout2(x)
            return x 

In [None]:
class TransformerEncoderLayer(nn.Module):
    def __init__(self, d_model, nhead, dim_feedforward, dropout):
        super().__init__()
        self.attn = MultiheadAttention(d_model, d_model, nhead) # 멀티 헤드 어텐션 클래스
        self.norm1 = nn.LayerNorm(d_model) # 층 정규화
        self.dropout1 = nn.Dropout(dropout) # 드롭아웃
        self.feed_forward = PreLayerNormFeedForward(d_model, dim_feedforward, dropout) # 피드 포워드 층
        
    def forward(self, src):
        norm_x = self.norm1(src)
        attn_output = self.attn(norm_x, norm_x, norm_x)
        x = src + self.dropout1(attn_output)
        
        # 피드 포워드 
        x = self.feed_forward(x)
        return x
        

In [16]:
import copy

def get_clones(module, N):
    return nn.ModuleList([copy.deepcopy(module) for i in range(N)])


In [None]:
class TransformerEncoder(nn.Module):
    def __init__(self, encoder_layer, num_layers):
        super().__init__()
        self.layers = get_clones(encoder_layer, num_layers)
        self.num_layers = num_layers
        self.norm = norm
    
    def forward(self, src):
        output = src
        for mod in self.layers:
            output = mod(output)
        return output

In [None]:
# 디코더의 어텐션 연산 (마스크 어텐션)
def compute_attention(querys, keys, values, is_causal=False):
    dim_k = querys.size(-1) # 16
    scores = querys @ keys.transpose(-2, -1) / sqrt(dim_k) # (1, 5, 5)
    if is_causal:
        query_length = querys.size(-2)
        key_length = keys.size(-2)
        temp_mask = torch.ones(query_length, key_length, dtype=torch.bool).tril(diagonal=0)
        scores = scores.masked_fill(scores, dim=-1) # (1, 5, 5)
    weights = F.softmax(scores, dim=-1) # (1, 5, 5)
    return weights @ values # (1, 5, 16)

In [None]:
# 크로스 어텐션이 포함된 디코더 층
class TransformerDecoderLayer(nn.Module):
    def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1):
        super().__init__()
        self.self_attn = MultiheadAttention(d_model, d_model, nhead)
        self.multihead_attn = MultiheadAttention(d_model, d_model, nhead)
        self.feed_forward = PreLayerNormFeedForward(d_model, dim_feedforward, dropout)
        
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        
    def forward(self, tgt, encoder_output, is_causal=True):
        # 셀프 어텐션 연산
        x = self.norm1(tgt)
        x = x + self.dropout1(self.self_attn(x, x, x, is_causal=is_causal))
        
        # 크로스 어텐션 연산
        x = self.norm2(x)
        x = x + self.dropout2(self.multihead_attn(x, encoder_output, encoder_output))
        
        # 피드 포워드 연산
        x = self.feed_forward(x)
        return x

In [None]:
import copy

def get_clones(module, N):
    return nn.ModuleList([copy.deepcopy(module) for i in range(N)])

class TransformerDecoder(nn.Module):
    def __init__(self, decoder_layer, num_layers):
        super().__init__()
        self.layers = get_clones(decoder_layer, num_layers)
        self.num_layers = num_layers
        
    def forward(self, tgt, src):
        output = tgt
        for mod in self.layers:
            output = mod(output, src)
        return output