## 토큰화 

In [2]:
input_text = "바다 코끼리는 바다에 살 수 있도록 진화한 코끼리이다."

input_text_list = input_text.split()
print("input_text_list: ", input_text_list)

# 토큰 -> 아이디 딕셔너리와 아이디 -> 토큰 딕셔너리 만들기
str2idx = {word:idx for idx, word in enumerate(input_text_list)}
idx2str = {idx:word for idx, word in enumerate(input_text_list)}
print("str2idx: ", str2idx)
print("idx2str: ", idx2str)

# 토큰을 토큰 아이디로 변환
input_ids = [str2idx[word] for word in input_text_list]
print("input_ids: ", input_ids)

input_text_list:  ['바다', '코끼리는', '바다에', '살', '수', '있도록', '진화한', '코끼리이다.']
str2idx:  {'바다': 0, '코끼리는': 1, '바다에': 2, '살': 3, '수': 4, '있도록': 5, '진화한': 6, '코끼리이다.': 7}
idx2str:  {0: '바다', 1: '코끼리는', 2: '바다에', 3: '살', 4: '수', 5: '있도록', 6: '진화한', 7: '코끼리이다.'}
input_ids:  [0, 1, 2, 3, 4, 5, 6, 7]


## 토큰화 -> 임베딩

In [3]:
import torch
import torch.nn as nn

embedding_dim = 16
embed_layer = nn.Embedding(len(str2idx), embedding_dim)

# 이때 tensor로 감싼 후 처리 
input_embeddings = embed_layer(torch.tensor(input_ids))
# 딥러닝 프레임워크는 보통 가장 첫 번째 요소를 "배치 차원"으로 기대하므로, unsqueeze(0)을 통해 크기가 1인 차원을 하나 추가
input_embeddings = input_embeddings.unsqueeze(0)
input_embeddings.shape

torch.Size([1, 8, 16])

## 위치 인코딩 
- Transformer는 RNN과는 다르게 전체 Sequence를 입력으로 받으므로, 각 토큰의 위치 정보를 추가로 저장해야됨

In [5]:
embedding_dim = 16
max_position = 12 # 최대 토큰의 길이

embed_layer = nn.Embedding(len(str2idx), embedding_dim)
position_embed_layer = nn.Embedding(max_position, embedding_dim) 
# input_ids의 길이만큼 position_ids를 생성 
position_ids = torch.arange(len(input_ids), dtype = torch.long).unsqueeze(0)
position_encodings = position_embed_layer(position_ids)

token_embeddings = embed_layer(torch.tensor(input_ids)).unsqueeze(0)

input_embeddings = token_embeddings + position_encodings
input_embeddings.shape

torch.Size([1, 8, 16])

## Query, Key, Value 벡터 만들기
- 선형층을 통해 쿼리, 키, 값 벡터 생성

In [None]:
head_dim = 16

# 벡터 생성 
weight_q = nn.Linear(embedding_dim, head_dim)
weight_k = nn.Linear(embedding_dim, head_dim)
weight_v = nn.Linear(embedding_dim, head_dim)

# 변환 수행
querys = weight_q(input_embeddings) # torch.Size([1, 8, 16])
keys = weight_k(input_embeddings) 
values = weight_v(input_embeddings)

querys.shape, keys.shape, values.shape

(torch.Size([1, 8, 16]), torch.Size([1, 8, 16]), torch.Size([1, 8, 16]))

## Scaled dot Attention

In [7]:
from math import sqrt
import torch.nn.functional as F

def compute_attention(querys, keys, values, is_casual = False):
  dim_k = querys.size(-1) # 16 -> dim_k로 스케일링하기 위해 추출
  scores = querys @ keys.transpose(-2,-1) / sqrt(dim_k)
  weights = F.softmax(scores, dim = -1) # 가장 마지막 차원을 기준으로 연산 
  return weights @ values

print("원본 입력 형태: ", input_embeddings.shape)

after_attention_embeddings = compute_attention(querys, keys, values)

print("어텐션 적용 후 형태: ", after_attention_embeddings.shape)

원본 입력 형태:  torch.Size([1, 8, 16])
어텐션 적용 후 형태:  torch.Size([1, 8, 16])


## AttentionHead 클래스

In [None]:
class AttentionHead(nn.Module):
  def __init__(self, token_embed_dim, head_dim, is_casual = False):
    super().__init__()
    self.is_casual = is_casual
    # query, key, value 벡터 생성 위한 선형 층 
    self.weight_q = nn.Linear(token_embed_dim, head_dim)
    self.weight_k = nn.Linear(token_embed_dim, head_dim)
    self.weight_v = nn.Linear(token_embed_dim, head_dim)
    
  def forward(self, querys, keys, values) -> torch.Tensor:
    outputs = compute_attention(
      self.weight_q(querys),
      self.weight_k(keys),
      self.weight_v(values),
      is_casual=self.is_casual
    )
    
    return outputs
    
attention_head = AttentionHead(embedding_dim, embedding_dim)
# input_embeddings로 query, key, value값 모두 동일 -> self-attention 
after_attention_embeddings = attention_head(input_embeddings, input_embeddings, input_embeddings)

after_attention_embeddings.shape

torch.Size([1, 8, 16])

## MultiheadAttention

In [None]:
class MultiheadAttention(nn.Module):
  def __init__(self, token_embed_dim, d_model, nhead, is_causal = False):
    super().__init__()
    self.nhead = nhead
    self.is_causal = is_causal
    self.weight_q = nn.Linear(token_embed_dim, d_model) # 입력한 토큰에 대한 임베딩 차원과 모델 은닉 차원 
    self.weight_k = nn.Linear(token_embed_dim, d_model)
    self.weight_v = nn.Linear(token_embed_dim, d_model)
    # 어텐션 헤드의 출력들을 하나로 합친 후에 최종 출력 변환 
    self.concat_linear = nn.Linear(d_model, d_model)
    
  def forward(self, querys, keys, values) -> torch.Tensor:
    B, T, C = querys.size() # B: 배치 차원, T: 토큰, C: 임베딩 차원
    
    # 멀티헤드 어텐션이므로, 차원 분할
    # C차원이던 벡터를 nhead개의 헤드로 나누어서 연산, 계산 후에 view를 통해 재배열 
    querys = self.weight_q(querys).view(B, T, self.nhead, C // self.nhead).transpose(1,2)
    keys = self.weight_k(keys).view(B, T, self.nhead, C // self.nhead).transpose(1, 2)
    values = self.weight_v(values).view(B, T, self.nhead, C // self.nhead).transpose(1, 2)
    # view -> [B, T, nhead, head_dim]
    # transpose(1,2) -> [B, nhead, T, head_dim]
    
    attention = compute_attention(querys, keys, values, self.is_causal)
    output = attention.transpose(1, 2).contiguous().view(B, T, C)
    output = self.concat_linear(output)
    return output
  
nhead = 4
mh_attention = MultiheadAttention(embedding_dim, embedding_dim, nhead)
after_attention_embeddings = mh_attention(input_embeddings, input_embeddings, input_embeddings)
after_attention_embeddings.shape

torch.Size([1, 8, 16])

## Layer Normalization & FeedForward Layer

In [24]:
norm = nn.LayerNorm(embedding_dim)
norm_x = norm(input_embeddings)
norm_x.shape # torch.Size([1, 5, 16])

norm_x.mean(dim=-1).data, norm_x.std(dim=-1).data

(tensor([[-1.4901e-08, -1.4901e-08,  0.0000e+00,  2.2352e-08, -1.4901e-08,
           3.3528e-08, -2.9802e-08,  1.4901e-08]]),
 tensor([[1.0328, 1.0328, 1.0328, 1.0328, 1.0328, 1.0328, 1.0328, 1.0328]]))

In [26]:
class PreLayerNormFeedForward(nn.Module):
  def __init__(self, d_model, dim_feedforward, dropout):
    super().__init__()
    self.linear1 = nn.Linear(d_model, dim_feedforward) # 선형 층 1
    self.linear2 = nn.Linear(dim_feedforward, d_model) # 선형 층 2
    self.dropout1 = nn.Dropout(dropout) # 드랍아웃 층 1
    self.dropout2 = nn.Dropout(dropout) # 드랍아웃 층 2
    self.activation = nn.GELU() # 활성 함수
    self.norm = nn.LayerNorm(d_model) # 층 정규화

  def forward(self, src):
    x = self.norm(src)
    x = x + self.linear2(self.dropout1(self.activation(self.linear1(x))))
    x = self.dropout2(x)
    return x

## Transformer Encoder Layer

In [27]:
class TransformerEncoder(nn.Module):
  def __init__(self, d_model, nhead, dim_feedforward, dropout):
    super().__init__()
    self.attn = MultiheadAttention(d_model, d_model, nhead)
    self.norm1 = nn.LayerNorm(d_model) 
    self.dropout1 = nn.Dropout(dropout) # 여기서 dropout의 비율은 이미 정의됨 
    self.feed_forward = PreLayerNormFeedForward(d_model, dim_feedforward, dropout)
  
  # 입력 src 
  def forward(self, src) -> torch.Tensor:
    norm_x = self.norm1(src)
    # query, key, value
    attn_output = self.attn(norm_x, norm_x, norm_x)
    # 잔차 연결, 입력 값 src에 어텐션 + 드롭아웃에 통과시킨 후 그 출력
    x = src + self.dropout1(attn_output) 
    
    # 피드 포워드
    x = self.feed_forward(x)
    return x

## Encoder

In [None]:
import copy

# 인코더 레이어 클론 함수  
def get_clones(module, N):
  return nn.ModuleList([copy.deepcopy(module) for i in range(N)])

class TransformerEncoder(nn.Module):
  def __init__(self, encoder_layer, num_layers):
    super().__init__()
    self.layers = get_clones(encoder_layer, num_layers)
    self.num_layers = num_layers
    self.norm = norm

  def forward(self, src):
    output = src
    for mod in self.layers:
        output = mod(output)
    return output

## Attention in Decoder

In [28]:
def compute_attention(querys, keys, values, is_causal=False):
	dim_k = querys.size(-1) # 16
	scores = querys @ keys.transpose(-2, -1) / sqrt(dim_k)
  # causal model or AR인 경우 
	if is_causal:
		query_length = querys.size(2)
		key_length = keys.size(2)
    # torch.ones로 모두 1인 행렬을 만든 후에, tril 메서드를 통해 대각선 아래 부분만 1로 유지되는 역삼각행렬 생성 
    # diahonal은 기준선 포함  
		temp_mask = torch.ones(query_length, key_length, dtype=torch.bool).tril(diagonal=0)
    # temp_mask가 False, 즉 행렬값이 0인 부분은 음의 무한대로 
    # softmax 연산 시, 값이 클 수록 softmax의 값이 크므로 음의 무한대 값은 0으로 처리된다. -> mask됨
		scores = scores.masked_fill(temp_mask == False, float("-inf"))
	weights = F.softmax(scores, dim=-1) 
	return weights @ values 

## Cross Attention Decoder

In [31]:
class TransformerDecoderLayer(nn.Module):
  def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1):
    super().__init__()
    # self-attention과 디코더에는 cross-attention이 있으므로 따로 정의 
    self.self_attn = MultiheadAttention(d_model, d_model, nhead)
    self.multihead_attn = MultiheadAttention(d_model, d_model, nhead)
    self.feed_forward = PreLayerNormFeedForward(d_model, dim_feedforward, dropout)

    self.norm1 = nn.LayerNorm(d_model)
    self.norm2 = nn.LayerNorm(d_model)
    self.dropout1 = nn.Dropout(dropout)
    self.dropout2 = nn.Dropout(dropout)

  def forward(self, tgt, encoder_output, is_causal=True):
    # 셀프 어텐션 연산
    x = self.norm1(tgt)
    x = x + self.dropout1(self.self_attn(x, x, x, is_causal=is_causal))
    # 크로스 어텐션 연산
    x = self.norm2(x)
    x = x + self.dropout2(self.multihead_attn(x, encoder_output, encoder_output))
    # 피드 포워드 연산
    x = self.feed_forward(x)
    return x

In [32]:
import copy
def get_clones(module, N):
  return nn.ModuleList([copy.deepcopy(module) for i in range(N)])

class TransformerDecoder(nn.Module):
  def __init__(self, decoder_layer, num_layers):
    super().__init__()
    self.layers = get_clones(decoder_layer, num_layers)
    self.num_layers = num_layers

  def forward(self, tgt, src):
    output = tgt
    for mod in self.layers:
        output = mod(output, src)
    return output