## 토큰화 코드(단어 단위)

In [None]:
# 띄어쓰기 단위로 분리
input_text = "나는 최근 파리 여행을 다녀왔다"
input_text_list = input_text.split()
print("input_text_list: ", input_text_list)

# 토큰 -> 아이디 딕셔너리와 아이디 -> 토큰 딕셔너리 만들기
str2idx = {word:idx for idx, word in enumerate(input_text_list)}
idx2str = {idx:word for idx, word in enumerate(input_text_list)}
print("str2idx: ", str2idx)
print("idx2str: ", idx2str)

# 토큰을 토큰 아이디로 변환
input_ids = [str2idx[word] for word in input_text_list]
print("input_ids: ", input_ids)

input_text_list:  ['나는', '최근', '파리', '여행을', '다녀왔다']
str2idx:  {'나는': 0, '최근': 1, '파리': 2, '여행을': 3, '다녀왔다': 4}
idx2str:  {0: '나는', 1: '최근', 2: '파리', 3: '여행을', 4: '다녀왔다'}
input_ids:  [0, 1, 2, 3, 4]


## 토큰 아이디에서 벡터로 변환

In [None]:
import torch
import torch.nn as nn

embedding_dim = 16 # 토큰 하나를 16차원의 임베딩으로 변환

# 사전 크기가 len(str2idx)이고 embedding_dim 차원의 임베딩을 생성하는 layer
# nn.Embedding: 입력으로 주어진 정수 인덱스를 고정된 차원의 벡터로 변환하는 역할
embed_layer = nn.Embedding(len(str2idx), embedding_dim)

input_embeddings = embed_layer(torch.tensor(input_ids)) # (5, 16)
input_embeddings = input_embeddings.unsqueeze(0) # (1, 5, 16) 차원 추가: 한 개의 문장, 다섯 개의 토큰, 각 토큰 당 16차원의 임베딩

print(input_embeddings.shape)

torch.Size([1, 5, 16])


## 절대적 위치 인코딩

In [None]:
# 논문과 달리 sin, cos 함수 사용X, 위치에 따른 임베딩 층을 추가해 위치 정보 학습 -> 이 방법 더 많이 씀

embedding_dim = 16
max_position = 12 # 최대 토큰 수

# 토큰 임베딩 층 생성
embed_layer = nn.Embedding(len(str2idx), embedding_dim)
# 위치 인코딩 층 생성
position_embed_layer = nn.Embedding(max_position, embedding_dim) # 각 위치(0~11)에 대해 16차원의 벡터로 변환

# input_ids의 길이 만큼 position_ids 생성하고 차원 추가
position_ids = torch.arange(len(input_ids), dtype=torch.long).unsqueeze(0) # tensor([[0, 1, 2, 3, 4]])
position_encodings = position_embed_layer(position_ids) # 각 위치에 해당하는 16차원의 임베딩 벡터 얻음

# 토큰 임베딩 생성
token_embeddings = embed_layer(torch.tensor(input_ids)) # (5, 16)
token_embeddings = token_embeddings.unsqueeze(0) # (1, 5, 16)

# 토큰 임베딩과 위치 인코딩을 더해 최종 입력 임베딩 생성
input_embeddings = token_embeddings + position_encodings

print(input_embeddings.shape)
print(input_embeddings)

torch.Size([1, 5, 16])
tensor([[[-1.9056,  0.4020, -0.6074,  2.6805,  3.7087, -0.6620, -1.2765,
           0.7662, -0.5810, -0.0544,  0.0943, -1.5781,  0.8282,  0.2502,
           1.8761, -2.7067],
         [ 1.1659, -0.9646, -0.1141,  0.8070, -0.1789, -1.8791,  0.4617,
          -1.0719,  0.2543, -0.5698,  0.6213, -1.4007,  0.6660, -0.0462,
          -1.5086, -3.6410],
         [ 3.1076,  1.3957,  1.5555,  0.9944,  1.4069, -2.1569, -0.5288,
           0.2961,  1.9556,  2.6009, -1.2951, -4.2932,  2.6212, -1.7372,
          -0.8859, -1.2518],
         [-2.4530, -1.4019, -1.1893,  2.2507,  2.3488, -2.5711,  0.7487,
           0.1037,  2.3612, -0.4340,  1.3498, -2.3379,  2.0035, -2.2868,
          -1.6020,  0.9593],
         [-1.3929, -0.7861,  0.9235, -2.5473,  1.9899, -1.3423,  1.8607,
          -0.7400, -2.9066,  1.3532,  0.4943,  1.7047,  1.0707, -0.5448,
          -1.0468, -0.3217]]], grad_fn=<AddBackward0>)


## 쿼리, 키, 값 벡터를 만드는 nn.Linear 층

In [None]:
head_dim = 16

# 쿼리, 키, 값을 계산하기 위한 변환
weight_q = nn.Linear(embedding_dim, head_dim) # nn.Linear(input dim, output dim): 선형 변환 수행
weight_k = nn.Linear(embedding_dim, head_dim)
weight_v = nn.Linear(embedding_dim, head_dim)

# 토큰(단어)마다 query, key, value 벡터 생성
querys = weight_q(input_embeddings) # (1, 5, 16) input_embeddings를 weight_q로 선형 변환하여 쿼리 벡터 생성
keys = weight_k(input_embeddings)   # (1, 5, 16) input_embeddings를 weight_k로 선형 변환하여 키 벡터 생성
values = weight_v(input_embeddings) # (1, 5, 16) input_embeddings를 weight_v로 선형 변환하여 값 벡터 생성

## Scaled Dot-Product Attention

In [None]:
from math import sqrt
import torch.nn.functional as F

def compute_attention(querys, keys, values, is_causal=False):
	dim_k = querys.size(-1) # 16, key 벡터의 차원
	# 내적으로 각 쿼리와 키 간의 유사도 계산, 어텐션 스코어의 분산 조정하기 위해 sqrt(dim_k)로 나눠주기
	scores = querys @ keys.transpose(-2, -1) / sqrt(dim_k) # (1, 5, 5) query와 key 내적한 후 dim_k의 제곱근으로 나누기
	# 각 쿼리 벡터가 어느 키에 집중하는지 계산
	weights = F.softmax(scores, dim=-1) # (1, 5, 5) 행 별로 softmax 취해
	return weights @ values # 어텐션 가중치와 value 벡터의 가중합 반환

## 어텐션 연산의 입력과 출력

In [None]:
print("원본 입력 형태: ", input_embeddings.shape)

# 주변 단어의 맥락 반영 가능
after_attention_embeddings = compute_attention(querys, keys, values)

print("어텐션 적용 후 형태: ", after_attention_embeddings.shape)

# 입력과 동일한 형태의 출력 반환
# 원본 입력 형태: torch.Size([1, 5, 16])
# 어텐션 적용 후 형태: torch.Size([1, 5, 16]) 동일하당!

원본 입력 형태:  torch.Size([1, 5, 16])
어텐션 적용 후 형태:  torch.Size([1, 5, 16])


## 어텐션 연산을 수행하는 AttentionHead 클래스

In [None]:
# 이제 class로 만들어보자!

class AttentionHead(nn.Module):
  def __init__(self, token_embed_dim, head_dim, is_causal=False):
    super().__init__()
    self.is_causal = is_causal
    self.weight_q = nn.Linear(token_embed_dim, head_dim) # 쿼리 벡터 생성을 위한 선형 층
    self.weight_k = nn.Linear(token_embed_dim, head_dim) # 키 벡터 생성을 위한 선형 층
    self.weight_v = nn.Linear(token_embed_dim, head_dim) # 값 벡터 생성을 위한 선형 층

  def forward(self, querys, keys, values):
    outputs = compute_attention(
        self.weight_q(querys),  # 쿼리 벡터
        self.weight_k(keys),    # 키 벡터
        self.weight_v(values),  # 값 벡터
        is_causal=self.is_causal
    )
    return outputs

attention_head = AttentionHead(embedding_dim, embedding_dim)
after_attention_embeddings = attention_head(input_embeddings, input_embeddings, input_embeddings)

## 멀티 헤드 어텐션 구현

In [None]:
class MultiheadAttention(nn.Module):
  def __init__(self, token_embed_dim, d_model, n_head, is_causal=False):
    super().__init__()
    self.n_head = n_head # head 수
    self.is_causal = is_causal
    self.weight_q = nn.Linear(token_embed_dim, d_model)
    self.weight_k = nn.Linear(token_embed_dim, d_model)
    self.weight_v = nn.Linear(token_embed_dim, d_model)
    self.concat_linear = nn.Linear(d_model, d_model)

  def forward(self, querys, keys, values):
    B, T, C = querys.size() # B: 배치 크기(문장 수), T: 토큰 수(단어 수), C: input 임베딩 차원
    # view를 통해 멀티-헤드 구조로 변경
    # n_head만큼 연산을 수행하기 위해 쿼리, 키, 값 벡터의 차원을 n_head개로 쪼갠다
    # (1, 5, 16) -> (1, 4(n_head), 5, 4(C//n.head))
    querys = self.weight_q(querys).view(B, T, self.n_head, C // self.n_head).transpose(1, 2)
    keys = self.weight_k(keys).view(B, T, self.n_head, C // self.n_head).transpose(1, 2)
    values = self.weight_v(values).view(B, T, self.n_head, C // self.n_head).transpose(1, 2)
    # 각 헤드에서 어텐션을 계산
    attention = compute_attention(querys, keys, values, self.is_causal)
    # 그 결과를 다시 결합하여 입력과 같은 형태로 되돌리기
    # (1, 4, 5, 4) -> (1, 5, 4, 4) -> (1, 5, 16)
    output = attention.transpose(1, 2).contiguous().view(B, T, C)
    # 최종적으로 선형 변환을 한 번 더 적용하여 출력 차원을 조정
    output = self.concat_linear(output)
    return output

n_head = 4
mh_attention = MultiheadAttention(embedding_dim, embedding_dim, n_head)
after_attention_embeddings = mh_attention(input_embeddings, input_embeddings, input_embeddings)
after_attention_embeddings.shape

torch.Size([1, 5, 16])

## Layer Normalization

In [None]:
# 각 토큰 임베딩 별로 평균과 표준편차를 구해 정규화 수행
# 논문에서는 사후 정규화, 여기서는 사전 정규화(학습이 더 안정적)
norm = nn.LayerNorm(embedding_dim)
norm_x = norm(input_embeddings) # 입력 임베딩을 층 정규화 레이어에 통과시켜 정규화된 임베딩(norm_x)로 만들기
print(norm_x.shape) # torch.Size([1, 5, 16])

# 실제로 평균과 표준편차 확인하기
print(norm_x.mean(dim=-1).data)
print(norm_x.std(dim=-1).data)

torch.Size([1, 5, 16])
tensor([[ 1.4901e-08,  4.4703e-08, -4.8429e-08,  4.4703e-08, -1.8626e-08]])
tensor([[1.0328, 1.0328, 1.0328, 1.0328, 1.0328]])


## 피드 포워드 층 코드

In [None]:
# feed forward layer: 데이터의 특징을 학습하는 fully connected layer
# 멀티 헤드 어텐션이 단어 사이의 관계를 파악하는 역할이라면 feed forward layer는 입력 텍스트 전체를 이해하는 역할 담당

class PreLayerNormFeedForward(nn.Module):
  def __init__(self, d_model, dim_feedforward, dropout):
    super().__init__()
    # d_model 차원에서 dim_feedforward 차원으로 확장했다가 다시 d_model로 변환
    self.linear1 = nn.Linear(d_model, dim_feedforward) # 선형 층 1
    self.linear2 = nn.Linear(dim_feedforward, d_model) # 선형 층 2
    # 드랍아웃 층을 추가하여 과적합을 방지
    # 드랍아웃은 학습 과정에서 무작위로 네트워크의 일부 노드를 비활성화(무시)하여 모델이 특정 노드에 의존하지 않도록 함
    self.dropout1 = nn.Dropout(dropout) # 드랍아웃 층 1
    self.dropout2 = nn.Dropout(dropout) # 드랍아웃 층 2
    self.activation = nn.GELU() # 활성 함수
    self.norm = nn.LayerNorm(d_model) # 층 정규화

  def forward(self, src):
    x = self.norm(src)
    x = x + self.linear2(self.dropout1(self.activation(self.linear1(x))))
    # 입력 x와 연산 결과를 더해 Residual Connection(잔차 연결)을 적용
    # Residual Connection은 정보가 소실되지 않도록, 원래 입력을 연산 결과에 더하는 역할
    x = self.dropout2(x)
    return x

## 인코더 층

In [None]:
class TransformerEncoderLayer(nn.Module):
  def __init__(self, d_model, nhead, dim_feedforward, dropout):
    super().__init__()
    self.attn = MultiheadAttention(d_model, d_model, nhead) # 멀티 헤드 어텐션 클래스
    self.norm1 = nn.LayerNorm(d_model) # 층 정규화
    self.dropout1 = nn.Dropout(dropout) # 드랍아웃
    self.feed_forward = PreLayerNormFeedForward(d_model, dim_feedforward, dropout) # 피드포워드

  def forward(self, src):
    norm_x = self.norm1(src) # 층 정규화
    attn_output = self.attn(norm_x, norm_x, norm_x) # 멀티 헤드 어텐션
    x = src + self.dropout1(attn_output) # 잔차 연결

    # 피드 포워드
    x = self.feed_forward(x) # 층 정규화, 피드 포워드, 잔차 연결 포함
    return x

## 인코더 구현
언어를 이해하는 역할

In [None]:
import copy

# get_clone 함수는 입력한 모듈을 deepcopy를 통해 N번 반복해 모듈 리스트에 담음
def get_clones(module, N):
  return nn.ModuleList([copy.deepcopy(module) for i in range(N)])

# 인자로 전달받은 encoder_layer를 get_clones 함수를 통해 num_layers번 반복해 nn.ModuleList에 넣고,
# forward 메서드에서 for문을 통해 돌면서 인코더 층 연산을 반복 수행하도록 만든다.
class TransformerEncoder(nn.Module):
  def __init__(self, encoder_layer, num_layers):
    super().__init__()
    self.layers = get_clones(encoder_layer, num_layers)
    self.num_layers = num_layers
    self.norm = norm

  def forward(self, src):
    output = src
    for mod in self.layers:
        output = mod(output)
    return output

## 디코더에서 어텐션 연산(마스크 어텐션)
학습할 때는 인코더와 디코더 모두 완성된 텍스트를 입력으로 받는다. 따라서 어텐션을 그대로 활용할 경우 작성해야 하는 텍스트를 미리 확인하게 되는 문제가 생긴다. 이를 막기 위해 특정 시점에는 그 이전에 생성된 토큰까지만 확인할 수 있도록 마스크를 추가한다.

In [None]:
def compute_attention(querys, keys, values, is_causal=False):
	dim_k = querys.size(-1) # 16
	scores = querys @ keys.transpose(-2, -1) / sqrt(dim_k) # (1, 5, 5)

	# is_causal이 true인 경우 마스크 연산 추가
	if is_causal:
		query_length = querys.size(2) # 5
		key_length = keys.size(2) # 5
		# 마스크 생성: torch.ones로 모두 1인 5x5 행렬에 tril 함수를 취해 대각선 포함한 아래 부분은 1, 나머지는 0
		temp_mask = torch.ones(query_length, key_length, dtype=torch.bool).tril(diagonal=0)
		"""
		(마스크 예시)
		1 0 0 0 0
    1 1 0 0 0
  	1 1 1 0 0
    1 1 1 1 0
    1 1 1 1 1
    """
		# scores에서 마스크의 false(0) 위치에 음의 무한대(-inf) 채워넣기 -> softmax 계산 시 0에 가까운 값 갖게 됨
		scores = scores.masked_fill(temp_mask == False, float("-inf"))
		"""
		(마스크 예시)
		12 -inf -inf -inf -inf
    2    4  -inf -inf -inf
  	-8   1    9  -inf -inf
    -10  4    6    4  -inf
    2    1   -6    12  -1
    """

	weights = F.softmax(scores, dim=-1) # (1, 5, 5)
	return weights @ values # (1, 5, 16)

## 크로스 어텐션이 포함된 디코더 층

In [None]:
# 크로스 어텐션: 인코더의 결과를 디코더가 활용
# 이때 query는 디코더의 잠재 상태를 사용하고, key와 value는 인코더의 결과를 사용한다.

class TransformerDecoderLayer(nn.Module):
  def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1):
    super().__init__()
    self.self_attn = MultiheadAttention(d_model, d_model, nhead)
    self.multihead_attn = MultiheadAttention(d_model, d_model, nhead)
    self.feed_forward = PreLayerNormFeedForward(d_model, dim_feedforward, dropout)

    self.norm1 = nn.LayerNorm(d_model)
    self.norm2 = nn.LayerNorm(d_model)
    self.dropout1 = nn.Dropout(dropout)
    self.dropout2 = nn.Dropout(dropout)

  def forward(self, tgt, encoder_output, is_causal=True):
    # 셀프 어텐션 연산
    x = self.norm1(tgt)
    x = x + self.dropout1(self.self_attn(x, x, x, is_causal=is_causal))
    # 크로스 어텐션 연산
    x = self.norm2(x)
    x = x + self.dropout2(self.multihead_attn(x, encoder_output, encoder_output)) # key, value에 encoder_output 사용
    # 피드 포워드 연산
    x = self.feed_forward(x)
    return x

## 디코더 구현
* 언어를 생성하는 역할
* 앞에서 생성한 토큰을 기반으로 다음 토큰 생성:  auto-regressive(자기 회귀적) 또는 causal(인과적)인 특징

In [None]:
import copy
def get_clones(module, N):
  return nn.ModuleList([copy.deepcopy(module) for i in range(N)])

class TransformerDecoder(nn.Module):
  def __init__(self, decoder_layer, num_layers):
    super().__init__()
    self.layers = get_clones(decoder_layer, num_layers)
    self.num_layers = num_layers

  def forward(self, tgt, src):
    output = tgt
    for mod in self.layers:
        output = mod(tgt, src)
    return output