# **토큰화 코드**

In [12]:
# 띄어쓰기 단위로 분리
input_text = "나는 최근 파리 여행을 다녀왔다"
input_text_list = input_text.split()

print("input_text_list:", input_text_list)

# 토큰 -> 아이디 딕셔너리와 아이디 -> 토큰 딕셔너리 만들기
str2idx = {word:idx for idx , word in enumerate(input_text_list)}
idx2str = {idx:word for idx , word in enumerate(input_text_list)}

print("str2idx:", str2idx)
print("idx2str:", idx2str)

# 토큰을 토큰 아이디로 변환
input_ids = [str2idx[word] for word in input_text_list]
print("input_ids:", input_ids)

# 출력 결과
# input_text_list: ['나는', '최근', '파리', '여행을', '다녀왔다']
# str2idx: {'나는': 0, '최근': 1, '파리': 2, '여행을': 3, '다녀왔다': 4}
# idx2str: {0: '나는', 1: '최근', 2: '파리', 3: '여행을', 4: '다녀왔다'}
# input_ids: [0, 1, 2, 3, 4]

input_text_list: ['나는', '최근', '파리', '여행을', '다녀왔다']
str2idx: {'나는': 0, '최근': 1, '파리': 2, '여행을': 3, '다녀왔다': 4}
idx2str: {0: '나는', 1: '최근', 2: '파리', 3: '여행을', 4: '다녀왔다'}
input_ids: [0, 1, 2, 3, 4]


# **토큰 아이디에서 벡터로 변환**


In [13]:
import torch
import torch.nn as nn

embedding_dim = 16
# len(str2idx) = 전체 단어의 길이 추출
# str2idx: {'나는': 0, '최근': 1, '파리': 2, '여행을': 3, '다녀왔다': 4} = 길이 5

# embedding_dim = 차원 = 16

# embed_layer = 위의 설정으로 임베딩 층을 만든다
embed_layer = nn.Embedding(len(str2idx), embedding_dim)

# torch.tensor = 텐서로 변환 / 리스트나 배열은 GPU 사용이 불가능해서 모델이 이해할 수 있게 바꿈
# torch.tensor(input_ids) = 토큰 아이디를 텐서로 변환
# embed_layer(torch.tensor(input_ids)) = 토큰 아이디를 16차원의 임의의 숫자 집합의 벡터로 변환
input_embeddings = embed_layer(torch.tensor(input_ids)) #(5, 16)

# unsqueeze = 텐서에 차원을 추가
# 딥러닝 모델은 데이터를 배치로 처리하기 때문에 한개가 있더라도 배치 형태로 만들어야 모델이 처리할 수 있다
input_embeddings = input_embeddings.unsqueeze(0) # (1, 5, 16)
input_embeddings.shape

torch.Size([1, 5, 16])

# **절대적 위치 인코딩**

In [14]:
embedding_dim = 16
max_position = 12
embed_layer = nn.Embedding(len(str2idx), embedding_dim)
# 기존의 임베딩 층에 추가할 위치 임베딩 층
# 위치 인코딩을 생성하는 위치 임베딩 층
# max_position = 12 = 최대 토큰 수 -> 12개
position_embed_layer = nn.Embedding(max_position, embedding_dim)

# torch.arange = 0 부터 시작하는 1차원 텐서 생성
# dtype=torch.long = long 타입으로 생성 -> 위치 인덱스를 나타내기에 알맞은 타입
# unsqueeze = 배치 처리하기 위해 차원 추가
position_ids = torch.arange(len(input_ids), dtype=torch.long).unsqueeze(0)
# 위치 아이디를 임베딩 층에 입력해 위치 인코딩 설정 (벡터)
position_encodings = position_embed_layer(position_ids)
token_embeddings = embed_layer(torch.tensor(input_ids))
token_embeddings = token_embeddings.unsqueeze(0)

# 토큰 임베딩에 위치 인코딩을 더해 모델에 입력할 최종 입력 임베딩 준비
input_embeddings = token_embeddings + position_encodings
input_embeddings.shape

torch.Size([1, 5, 16])

# **쿼리, 키, 값 벡터를 만드는 nn.Linear층**

In [15]:
head_dim = 16

# 쿼리, 키, 값을 계산하기 위한 변환
# 학습 가능한 가중치를 만들기 위해 선형 층을 사용
# 모델이 학습을 통해 어떤 특징이 중요한지 알아서 학습하도록 도와줌
# 선형층은 단순히 가중치를 곱하는 연산이지만,
# 이 가중치가 학습됨에 따라 입력 벡터에서 중요한 특징을 강조하고, 덜 중요한 특징을 약화할 수 있다.

# embedding_dim = 입력차원, head_dim = 출력차원?
weight_q = nn.Linear(embedding_dim, head_dim)
weight_k = nn.Linear(embedding_dim, head_dim)
weight_v = nn.Linear(embedding_dim, head_dim)

# 변환 수행
# 만들어낸 가중치에 임베딩을 통과시킴
querys = weight_q(input_embeddings) # (1, 5, 16)
keys = weight_k(input_embeddings) # (1, 5, 16)
values = weight_v(input_embeddings) # (1, 5, 16)

# **스케일 점곱 방식의 어텐션**

In [16]:
from math import sqrt
import torch.nn.functional as F

def compute_attention(querys, keys, values, is_causal=False):
  # querys.size(-1) querys의 마지막 차원의 크기를 가져옴
  # dim_k = 16과 같음(head_dim)
  dim_k = querys.size(-1) #16

  # @ 연산자는 행렬 곱셈(MatMul)을 수행하는 연산자
  # Python 3.5 이상에서는 NumPy와 PyTorch에서 @연산자가 matmul()과 동일하게 동작한다
  # transpose 두개의 차원을 서로 바꿔주는 PyTorch 함수 (안의 인자는 substring 작동방식과 유사)

  # 분산이 커지는 것을 방지하기 위해 임베딩 차원 수(dim_k)의 제곱근으로 나눈다 sqrt(dim_k)
  # 정규화 과정 진행
  scores = querys @ keys.transpose(-2, -1) / sqrt(dim_k)
  # 소프트 맥스 함수를 사용하여 다중 분류의 여러 선형 방정식의 출력 값을 정규화 (확률 분포로 변환 -> 각 쿼리에 대해 모든 키의 중요도를 확률 값으로 변환)
  # dim=1 -> 시퀀스 길이 차원에 대해 정규화

  # 결과적으로 weights는 가중치 행렬이 된다
  weights = F.softmax(scores, dim=1)

  # 가중치와 값을 곱해 입력과 동일한 형태의 출력을 반환한다
  return weights @ values

# **어텐션 연산의 입력과 출력**

In [17]:
print("원본 입력 형태: ", input_embeddings.shape)

after_attention_embeddings = compute_attention(querys, keys, values)

print("어텐션 적용 후 형태: ", after_attention_embeddings.shape)

원본 입력 형태:  torch.Size([1, 5, 16])
어텐션 적용 후 형태:  torch.Size([1, 5, 16])


# **어텐션 연산을 수행하는 AttentionHead 클래스**

In [18]:
# 파이썬 상속
# PyTorch의 nn.Module 상속
class AttentionHead(nn.Module):
  # 생성자
  def __init__(self, token_embed_dim, head_dim, is_causal=False):
    super().__init__()
    self.is_causal = is_causal
    self.weight_q = nn.Linear(token_embed_dim, head_dim) #쿼리 벡터 생성을 위한 선형 층
    self.weight_k = nn.Linear(token_embed_dim, head_dim) #키 벡터 생성을 위한 선형 층
    self.weight_v = nn.Linear(token_embed_dim, head_dim) #값 벡터 생성을 위한 선형 층

  def forward(self, querys, keys, values):
    # 위에서 관계도 계산을 위해 만들어둔 함수 재활용
    outputs = compute_attention(
        self.weight_q(querys), #쿼리 벡터
        self.weight_k(keys), #키 벡터
        self.weight_v(values), #값 벡터
        is_causal = self.is_causal
    )

    return outputs

# 클래스 생성
attention_head = AttentionHead(embedding_dim, embedding_dim)
after_attention_embeddings = attention_head(input_embeddings, input_embeddings, input_embeddings)


# **멀티 헤드 어텐션 구현**

In [19]:
class MultiheadAttention(nn.Module):
  def __init__(self, token_embed_dim, d_model, n_head, is_causal=False):
    super().__init__()
    # 헤드의 개수
    self.n_head = n_head
    # 미래 단어를 가릴지 여부
    self.is_causal = is_causal
    self.weight_q = nn.Linear(token_embed_dim, d_model)
    self.weight_k = nn.Linear(token_embed_dim, d_model)
    self.weight_v = nn.Linear(token_embed_dim, d_model)
    # 멀티 헤드의 결과를 합치는 선형층
    self.concat_linear = nn.Linear(d_model, d_model)

  def forward(self, querys, keys, values):
    # B: 배치크기, T: 시퀀스 길이, C: 임베딩 차원이 들어가게 된다
    # querys: (1, 5, 64)일때 B = 1, T = 5, C = 64
    B, T, C = querys.size()

    # view는 PyTorch의 텐서 변환 함수로 텐서의 크기를 바꿀 때 사용
    # view를 사용할 때 기존 데이터의 순서는 유지되어야 한다

    # 멀티 헤드 어텐션에서는 입력 백터(C)를 여러 개의 Head로 나눠야 한다
    # 즉 C 차원을 n_head 개의 작은 벡터로 쪼개야 한다

    # querys의 (B, T, C) 값을 가졌다고 할 때 .view(B, T, self.n_head, C // self.n_head) 연산을 거치면
    # (B, T, n_head, head_dim)로 변환된다. ex -> (2, 5, 64) → (2, 5, 4, 16)
    # 즉 멀티 헤드 개수를 4로 뒀으므로 입력 벡터를 동등하게 처리하기 위해 나눈다? -> 64차원이 4개의 작은 묶음으로 쪼개짐
    # .transpose(1, 2) -> Head 차원을 앞으로 옮겨 연산을 쉽게 만듦 / (B, T, n_head, head_dim) → (B, n_head, T, head_dim)
    querys = self.weight_q(querys).view(B, T, self.n_head, C // self.n_head).transpose(1, 2)
    keys = self.weight_k(keys).view(B, T, self.n_head, C // self.n_head).transpose(1, 2)
    values = self.weight_v(values).view(B, T, self.n_head, C // self.n_head).transpose(1, 2)

    # Self-Attention을 수행
    # 여기서 각 Head별로 독립적으로 Attention이 계산된다
    attention = compute_attention(querys, keys, values, self.is_causal)

    # 여러 개의 Head에서 나온 Attention 결과를 다시 합치는 과정
    # contiguous() PyTorch에서 텐서를 메모리에 연속된 상태로 정렬하는 함수
    # view()를 사용시 연속된 메모리 형태가 아니면 에러가 발생할 수 있음
    # 즉 view()를 안전하게 사용하기 위해 contiguous()를 호출하는 것이다
    output = attention.transpose(1, 2).contiguous().view(B, T, C)

    # 여러 개의 Head에서 계산한 정보를 최종적으로 하나의 벡터로 압축하기 위해 사용
    # Head의 정보를 조합하는 선형 변환 / 이렇게 하면 멀티 헤드 어텐션에서 최종적인 출력 벡터가 완성된다
    output = self.concat_linear(output)

    return output

n_head = 4
mh_attention = MultiheadAttention(embedding_dim, embedding_dim, n_head)
after_attention_embeddings = mh_attention(input_embeddings, input_embeddings, input_embeddings)
after_attention_embeddings.shape

torch.Size([1, 5, 16])

# **층 정규화 코드**

In [20]:
# 정규화 레이어 생성
norm = nn.LayerNorm(embedding_dim)
# 입력 임베딩을 층 정규화 레이어에 통과시켜 정규화된 임베딩으로 만든다
norm_x = norm(input_embeddings)
norm_x.shape # torch.Size([1, 5, 16])

# mean -> 텐서의 평균 계산, dim=-1 -> 마지막 차원
# std -> 표준편차 계산
# data -> 텐서의 원본 데이터를 가져오는 기능
norm_x.mean(dim=-1).data, norm_x.std(dim=-1).data

# (tensor([[ 2.2352e-08, -1.1176e-08, -7.4506e-09, -3.9116e-08, -1.8626e-08]]),
#  tensor([[1.0328, 1.0328, 1.0328, 1.0328, 1.0328]]))

(tensor([[ 0.0000e+00, -1.1176e-08,  1.4901e-08,  0.0000e+00, -2.2352e-08]]),
 tensor([[1.0328, 1.0328, 1.0328, 1.0328, 1.0328]]))

# **피드 포워드 층 코드**

In [21]:
class PreLayerNormFeedForward(nn.Module):
  def __init__(self, d_model, dim_feedforward, dropout):
    super().__init__()
    # 차원 확장을 위한 선형 층 -> d_model에서 dim_feedforward로 차원 확장
    # 차원 확장을 통해 더 많은 특징을 학습할 수 있도록 정보를 풍부하게 변환
    # 선형 변환(행렬 곱셈 + 덧셈)하는 신경망 층
    # 신경망의 기본 연산이 행렬 연산이기 때문,
    # 의미 있는 특징을 추출하고 불피요한 정보를 줄이기 위해 사용
    self.linear1 = nn.Linear(d_model, dim_feedforward) # 선형 층 1

    # 차원을 원래 크기로 다시 축소하는 용도
    # 위 선형층은 일시적으로 확장해서 정보를 풍부하게 만드는 역할
    # 다시 원래 크기로 줄여 입력과 동일한 크기로 유지
    self.linear2 = nn.Linear(dim_feedforward, d_model) # 선형 층 2

    # 과적합 방지를 위해 일부 뉴런을 랜덤하게 제거
    # 모델이 특정 특징에 과도하게 의존하는 것을 방지
    self.dropout1 = nn.Dropout(dropout) # 드롭아웃 층 1 (과대적합 방지를 위해)
    self.dropout2 = nn.Dropout(dropout) # 드롭아웃 층 2

    # 선형 변환 후 비선형 활성화 함수를 적용하여 복잡한 패턴을 학습할 수 있도록 함
    # 신경망이 선형 함수만 사용하면 아무리 층을 쌓아도 하나의 선형 변환과 동일하기 때문
    # GELU -> 최신 모델에서 사용, ReLU보다 부드럽고 학습 성능이 좋음
    # 트랜스포머 모델에서 사용됨
    self.activation - nn.GELU() # 활성 함수 -> 비선형층

    # 위에 쓴 정규화 코드인듯?
    self.norm = nn.LayerNorm(d_model) # 층 정규화

  def forward(self, src):
    # 입력 임베딩 정규화 수행
    x = self.norm(src)

    # (4) 선형 층으로 차원 축소
    # x = x + slef... (5) 원래 입력과 변환된 값을 더함
    # 잔차 연결(Residual Connection)이라고 부름
    # 원래 정보를 유지하면서 새로운 정보를 추가하는 효과
    # FFN (피드 포워드)이 너무 큰 변화를 주면 모델 학습이 불안정해질 수 있음
    # 원래 입력을 더해서 초기 정보를 보존하면서 학습할 수 있음
    x = x + self.linear2(
        # (3) 드롭아웃 적용
        self.dropout1(
            # (2) 활성화 함수 적용
            self.activation(
                # (1) 선형 층으로 차원 확장
                self.linear1(x)
            )
        )
    )

    # (6) 다시 드롭아웃 적용
    # 최종 출력을 얻기 전에 마지막으로 적용
    x = self.dropout2(x)

    return x

# **인코더 층**

In [22]:
class TransformerEncoderLayer(nn.Module):
  def __init__(self, d_model, nhead, dim_feedforward, dropout):
    super().__init__()

    self.attn = MultiheadAttention(d_model, d_model, nhead) # 멀티 헤드 어텐션 클래스
    self.norm1 = nn.LayerNorm(d_model) # 층 정규화
    self.dropout1 = nn.Dropout(dropout) #드롭아웃
    self.feed_forward = PreLayerNormFeedForward(d_model, dim_feedforward, dropout) # 피드포워드

  def forward(self, src):
    # 층 정규화
    norm_x = self.norm1(src)

    # 멀티 헤드 어텐션 연산 수행
    attn_output = self.attn(norm_x, norm_x, norm_x)

    #  잔차 연결을 위해 어텐션 결과에 드롭아웃을 취한 값과 입력을 더한다
    x = src + self.dropout1(attn_output) # 잔차 연결

    # 피드 포워드
    x = self.feed_forward(x)

    return x

# **인코더 구현**

In [23]:
# 파이썬 객체 복사 모듈
import copy

# module -> PyTorch에서 신경망의 구성 요소
# 트랜스포머에서는 하나의 인코더 레이어도 하나의 모듈이다
def get_clones(module, N):
  # ModuleList -> 여러 개의 신경망 레이어(모듈)을 리스트 형태로 저장하는 컨테이너
  # ModuleList를 사용해야 내부 레이어의 파라미터도 포함됨
  return nn.ModuleList([copy.deepcopy(module) for i in range(N)])

class TransformerEncoder(nn.Module):
  def __init__(self, encoder_layer, num_layers):
    super().__init__()

    self.layers = get_clones(encoder_layer, num_layers)
    self.num_layers = num_layers
    self.norm = norm

  def forward(self, src):
    output = src

    # 바뀐 output을 재투입해 n번희 인코더 층을 반복
    for mod in self.layers:
      output = mod(output)

    return output

# **디코더에서 어텐션 연산(마스크 어텐션)**

In [None]:
def compute_attention(querys, keys, values, is_causal=False):
  dim_k = querys.size(-1) # 16
  scores = querys @ keys.transpose(-2, -1) / sqrt(dim_k) # (1, 5, 5)

  if is_causal:
    query_length = querys.size(2)
    key_length = keys.size(2)
    temp_mask = torch.ones(query_length, key_length, dtype=torch.bool).tril(diagonal=0)
    scores = scores.masked_fill(temp_mask == False, float("-inf"))

  weights = F.softmax(scores, dim=-1) # (1, 5, 5)

  return weights @ values # (1, 5, 16)