# 1. 트랜스포머 아키텍처란

**RNN vs 트랜스포머**
* RNN: 텍스트를 순차적으로 하나씩 입력
    * 이전 토큰의 출력을 다시 모델에 입력으로 사용 - 순차적 처리
    * 느린 학습 속도
    * 입력이 길어지면 토큰의 정보가 희석되면서 성능이 떨어짐
    * 층을 깊이 쌓으면 gradient vanishing, gradient exploding 발생

* 트랜스포머
    * **self-attention**: 입력된 문장 내 **각 단어의 관련성을 계산해**, 각 단어의 표현을 조정
    * 확장성 용이, 학습 시간 단축, 입력이 길어져도 성능 유지
    * 인코더(언어를 이해) + 디코더(언어를 생성) 구조

# 2. 텍스트를 임베딩으로 변환하기

텍스트를 모델에 입력할 수 있는 숫자형 데이터인 임베딩으로 변환
* **tokenisation**: 텍스트를 적절한 단위로 잘라, 숫자형 id를 부여
* **토큰 임베딩 층**: 토큰 id를 여러 숫자의 집합인 토큰 임베딩으로 변환
* **위치 인코딩 층**: 토큰의 위치 정보 추가

## 2.1 tokenisation / 토큰화

* 텍스트를 적절한 단위로 나누고 숫자 ID를 부여
* subword 토큰화 방식
    * 자주 나오는 단어는 단어 단위 그대로, 가끔 나오는 단어는 작은 단위로 나눔
    * 사전의 크기를 작고 효율적으로 유지
    

In [None]:
# 단어 단위 토큰화
input_text = "백두산 맑은 정기 타고 난 우리"
input_text_list = input_text.split()
print(f"input_text_list: {input_text_list}")

# 토큰 -> ID 및 ID -> 토큰 딕셔너리 만들기
word2idx = {word: idx for idx, word in enumerate(input_text_list)}
idx2word = {idx: word for idx, word in enumerate(input_text_list)}
print(f"word2idx: {word2idx}")
print(f"idx2word: {idx2word}")

# 토큰을 토큰 ID로 변화
input_ids = [word2idx[word] for word in input_text_list]
print(f"input_ids: {input_ids}")

input_text_list: ['백두산', '맑은', '정기', '타고', '난', '우리']
word2idx: {'백두산': 0, '맑은': 1, '정기': 2, '타고': 3, '난': 4, '우리': 5}
idx2word: {0: '백두산', 1: '맑은', 2: '정기', 3: '타고', 4: '난', 5: '우리'}
input_ids: [0, 1, 2, 3, 4, 5]


## 2.2 token embedding / 토큰 임베딩으로 변환하기
* 토큰과 토큰 사이의 관계를 계산할 수 있어야 함
* **토큰 임베딩**: 토큰 id를 의미를 담아 숫자 집합으로 변환

In [None]:
import torch
torch.tensor(input_ids)

tensor([0, 1, 2, 3, 4, 5])

In [None]:
import torch
import torch.nn as nn

embedding_dim = 16
embed_layer = nn.Embedding(len(word2idx), embedding_dim)

input_embeddings = embed_layer(torch.tensor(input_ids)) # (6, 16)
input_embeddings = input_embeddings.unsqueeze(0) # (1, 6, 16)
input_embeddings.shape
# 1개의 문장, 5개의 토큰, 16차원의 임베딩

torch.Size([1, 6, 16])

* 현재 임베딩 층은 토큰 아이디를 16차원의 임의의 숫자 집합으로 바꿈
* 딥러닝 모델이 학습되는 과정에서, 임베딩 층도 데이터의 의미를 잘 담은 임베딩을 만들도록 학습됨

## 2.3 position encoding / 위치 인코딩

* 트랜스포머는 모든 입력을 동시에 처리 - 순서 정보가 사라짐
* 순서 정보 처리 역할을 위치 인코딩이 담당
* **absolute position encoding**: 토큰 위치에 따라 고정된 임베딩을 더해줌 (e.g., 수식 or 위치에 따른 임베딩 층)
* **relative position encoding**: 토큰과 토큰 사이 상대적 위치 정보를 활용해, 긴 텍스트 추론 가능

In [None]:
# 위치 인덱스에 따라 임베딩을 더하도록 구현한 레이어
embedding_dim = 16
max_position = 12 # 최대 토큰 수

embed_layer = nn.Embedding(len(word2idx), embedding_dim)
position_embed_layer = nn.Embedding(max_position, embedding_dim)

position_ids = torch.arange(len(input_ids), dtype=torch.long).unsqueeze(0) # (1, 6)
position_encodings = position_embed_layer(position_ids) # (1, 6, 16)
token_embeddings = embed_layer(torch.tensor(input_ids)) # (6, 16)
token_embeddings = token_embeddings.unsqueeze(0) # (1, 6, 16)

input_embeddings = token_embeddings + position_encodings
input_embeddings.shape

torch.Size([1, 6, 16])

# 3. 어텐션 이해하기

## 3.1 사람이 글을 읽는 방법과 어텐션

* 어텐션: 단어와 단어 사이의 관계를 계산
* 관련이 깊은 단어와 그렇지 않은 단어 구분
* 관련이 깊을수록 더 많이 맥락을 반영해야 함

## 3.2 쿼리, 키, 값 이해하기

e.g., '나는 최근 파리 여행을 다녀왔다'

* 쿼리: '파리'와 관련 있는 단어를 찾고자 함 -> '파리'
* 키: 쿼리와 관련이 있는지 계산하기 위해 문서가 가진 특징 -> 문장 속의 각 단어
* 값: 쿼리와 관련 있는 키 -> '여행을', '다녀왔다'

'파리'와 키 집합의 관계 개산하기
* '파리'의 임베딩과 각 토큰의 임베딩의 관계 계산
* $W_Q$, $W_K$ 가중치를 통해 토큰 임베딩 변환
* 변환된 토큰 임베딩끼리 벡터곱을 통해 관련도 계산
* 관련도와 $W_V$를 통해 변환된 토큰 임베딩 값을 가중합하여 주변 맥락을 반영한 결과를 얻을 수 있음


# 3.3 코드로 보는 어텐션

In [None]:
# 쿼리, 키, 값 벡터를 만드는 nn.Linear 층의 출력 차원
head_dim = 16

# 쿼리, 키, 값을 계산하기 위한 변환
weight_q = nn.Linear(embedding_dim, head_dim)
weight_k = nn.Linear(embedding_dim, head_dim)
weight_v = nn.Linear(embedding_dim, head_dim)

querys = weight_q(input_embeddings) # (1, 6, 16)
keys = weight_k(input_embeddings) # (1, 6, 16)
values = weight_v(input_embeddings) # (1, 6, 16)

In [None]:
# 스케일 점곱 방식의 어텐션
from math import sqrt
import torch.nn.functional as F

def compute_attention(querys, keys, values, is_causal=False):
    dim_k = querys.size(-1) # 16

    # 1. 쿼리와 키를 곱한다.
    # 분산이 커지는 것을 방지하기 위해 임베딩 차원 수의 제곱으로 나눈다.
    scores = querys @ keys.transpose(-2, -1) / sqrt(dim_k)

    # 2. 스코어의 합이 1이 되도록 소프트맥스를 취해 가중치로 바꾼다.
    weights = F.softmax(scores, dim=-1)

    # 3. 가중치와 값을 곱해 입력과 동일한 형태로 출력한다.
    return weights @ values

In [None]:
print(f"원본 입력 형태: {input_embeddings.shape}")
after_attention_embeddings = compute_attention(querys, keys, values)
print(f"어텐션 적용 후 형태: {after_attention_embeddings.shape}")

원본 입력 형태: torch.Size([1, 6, 16])
어텐션 적용 후 형태: torch.Size([1, 6, 16])


In [None]:
# 지금까지의 어텐션 과정을 클래스로 나타내기
class AttentionHead(nn.Module):
    def __init__(self, token_embed_dim, head_dim, is_causal=False):
        super().__init__()
        self.is_causal = is_causal
        self.weight_q = nn.Linear(token_embed_dim, head_dim)
        self.weight_k = nn.Linear(token_embed_dim, head_dim)
        self.weight_v = nn.Linear(token_embed_dim, head_dim)

    def forward(self, querys, keys, values):
        outputs = compute_attention(
            self.weight_q(querys),
            self.weight_k(keys),
            self.weight_v(values),
            is_causal=self.is_causal
        )
        return outputs

attention_head = AttentionHead(embedding_dim, embedding_dim)
after_attention_embeddings = attention_head(input_embeddings, input_embeddings, input_embeddings)

## 3.4 멀티헤드 어텐션
* 여러 어텐션 연산을 동시에 적용하기
* 여러 측면을 동시에 고려하기 위한 절차
* 헤드의 수만큼 연산을 수행하고, 각각의 어텐션을 계산한 뒤, 입력과 같은 형태로 반환하고, 선형 층을 통과시켜 최종 결과 반환

In [None]:
# 지금까지의 어텐션 과정을 클래스로 나타내기
class MultiHeadAttention(nn.Module):
    def __init__(self, token_embed_dim, d_model, n_head, is_causal=False):
        super().__init__()
        self.n_head = n_head
        self.is_causal = is_causal
        self.weight_q = nn.Linear(token_embed_dim, d_model)
        self.weight_k = nn.Linear(token_embed_dim, d_model)
        self.weight_v = nn.Linear(token_embed_dim, d_model)
        self.concat_linear = nn.Linear(d_model, d_model)

    def forward(self, querys, keys, values):
        B, T, C = querys.size()

        # Q, K, V가 처음 통과하는 선형 층
        # 쿼리, 키, 값을 n_head개로 쪼개고 각각의 어텐션을 계산
        querys = self.weight_q(querys).view(B, T, self.n_head, C // self.n_head).transpose(1, 2)
        keys = self.weight_k(keys).view(B, T, self.n_head, C // self.n_head).transpose(1, 2)
        values = self.weight_v(values).view(B, T, self.n_head, C // self.n_head).transpose(1, 2)
        print(querys.shape)

        # h(헤드 수)번의 스케일 점곱 어텐션
        attention = compute_attention(querys, keys, values, self.is_causal)
        print(attention.shape)

        # 어텐션 결과의 연결
        output = attention.transpose(1, 2).contiguous().view(B, T, C)

        # 마지막 선형 층
        output = self.concat_linear(output)
        return output

n_head = 4
mh_attention = MultiHeadAttention(embedding_dim, embedding_dim, n_head)
after_attention_embeddings = mh_attention(input_embeddings, input_embeddings, input_embeddings)
after_attention_embeddings.shape

torch.Size([1, 4, 6, 4])
torch.Size([1, 4, 6, 4])


torch.Size([1, 6, 16])

# 4. 정규화와 피드 포워드 층

## 4.1 층 정규화 (layer normalisation)

* 데이터 정규화를 통해 모든 입력 변수가 비슷한 범위와 분포를 갖도록 조정할 수 있음
* $\textbf{norm_x} = (\textbf{x} - 평균) / 표준편차$

* 배치 정규화: 모델에 입력으로 들어가는 미니 배치 사이에 정규화 수행
    * 자연어 처리에서는 입력 문장의 길이가 다양
    * 정규화에 포함되는 데이터의 수가 제각각이라 효과 낮음 (패딩 토큰 존재)

* 층 정규화: 각 샘플(토큰 임베딩) 내부의 평균과 표준편차를 구해 정규화를 수행
    * 각각 샘플별로 정규화를 수행하기 때문에, 정규화 효과에 차이 없음

* 사후 정규화에 비해, 어텐션 및 피드 포워드 이전에 정규화를 하는 사전 정규화가 주로 활용됨

In [None]:
norm = nn.LayerNorm(embedding_dim)
norm_x = norm(input_embeddings)
print(norm_x.shape) # 1, 6, 16

print(norm_x.mean(axis=-1).data, norm_x.std(axis=-1).data)
# 스케일링이 이루어져 완전히 0과 1이랑 동일하지 않을 수 있음.

torch.Size([1, 6, 16])
tensor([[ 2.3283e-08,  2.2352e-08,  4.4703e-08, -7.4506e-09,  3.7253e-08,
          2.9802e-08]]) tensor([[1.0328, 1.0328, 1.0328, 1.0328, 1.0328, 1.0328]])


## 4.2 피드 포워드 층 (feed forward layer)

* 데이터의 특징을 학습하는 fully connected layer
* MHA가 단어 사이 관계를 파악한 역할이라면, 피드 포워드는 입력 텍스트 전체를 이해하는 역할
* 층을 쌓고 확장하기 위해 입력층, 출력층의 차원은 동일하게 설정

In [49]:
class PreLayerNormFeedForward(nn.Module):
    def __init__(self, d_model, dim_feedforward, dropout):
        super().__init__()
        # 입출력 차원을 동일하게
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.linear2 = nn.Linear(dim_feedforward, d_model)
        self.dropout1 = nn.Dropout(dropout) # 드롭아웃 층
        self.dropout2 = nn.Dropout(dropout) # 드롭아웃 층
        self.activation = nn.GELU() # 활성함수
        self.norm = nn.LayerNorm(d_model) # 층 정규화

    def forward(self, src):
        x = self.norm(src) # 사전 정규화
        x = x + self.linear2(self.dropout1(self.activation(self.linear1(x)))) # 잔차 연결-다음절 참고
        x = self.dropout2(x)
        return x


# 5. 인코더

* layer normalisation -> multi-head attention -> layer normalisation -> feed-forward network 의 반복 형태
* 잔차 연결: 출력값에 입력을 다시 더해주는 형태, 안정적 학습에 도움을 줌

In [50]:
class TransformerEncoderLayer(nn.Module):
    def __init__(self, d_model, nhead, dim_feedforward, dropout):
        super().__init()
        self.attn = MultiHeadAttention(d_model, d_model, nhead) # 멀티헤드 어텐션 클래스
        self.norm1 = nn.LayerNorm(d_model) # 층 정규화
        self.dropout1 = nn.Dropout(dropout) # 드롭아웃
        self.feed_forward = PreLayerNormFeedForward(d_model, dim_feedforward, dropout) # 피드 포워드

    def forward(self, src):
        norm_x = self.norm1(src)
        attn_output = self.attn(norm_x, norm_x, norm_x)
        x = src + self.dropout1(attn_output)
        x = self.feed_forward(x)
        return x

In [None]:
# 인코더 층을 반복하여 쌓기

import copy

def get_clones(module, N):
    return nn.ModuleList([copy.deepcopy(module) for i in range(N)])

class TransformerEncoder(nn.Module):
    def __init__(self, encoder_layer, num_layers):
        super().__init__()
        self.layers = get_clones(encoder_layer, num_layers)
        self.num_layers = num_layers
        self.norm = norm

    def forward(self, src):
        output = src
        for mod in self.layers:
            output = mod(output)
        return output

# 6. 디코더
* 인코더와 다르게 **마스크 멀티헤드 어텐션** 사용
* 생성을 담당: 앞에서 생성한 토큰을 기반으로 다음 토큰 생성 (인과적, causal)
* 실제 텍스트를 생성할 때 디코더는 이전까지 생성한 텍스트만 확인할 수 있음
* 단, 학습할 때는 인코더와 디코더 모두 완성된 테스트를 입력으로 받음
* 미래 시점에 작성해야 하는 텍스트를 미리 확인하게 되는 문제를 막기 위해, 특정 시점엔 그 이전에 생성된 토큰까지만 확인할 수 있게 함 -> **마스킹**

In [None]:
# 디코더에서 어텐션 연산 - 마스크 어텐션

def compute_attention(querys, keys, values, is_causal=False):
    dim_k = querys.size(-1) # 16
    scores = querys @ keys.transpose(-2, -1) / sqrt(dim_k)

    if is_causal:
        query_length = querys.size(-2)
        key_length = keys.size(-2)

        # 대각선 아래는 True, 위는 False
        temp_mask = torch.ones(query_length, key_length, dtype=torch.bool).tril(diagonal=0)

        # False인 위치는 -inf로 채움
        scores = scores.masked_fill(temp_mask == False, float("-inf"))

    # softmax를 취하면 -inf였던 원소는 0이 됨
    weights = F.softmax(scores, dim=-1)

    return weights @ values

* **크로스 어텐션**: 인코더의 결과를 디코더가 활용하는 연산
* 영->한 번역 시, 인코더는 영어 문장 입력으로 받고, 처리한 결과를 번역한 한국어를 생성하는 디코더가 받아 활용
    * 키, 값: 인코더의 결과
    * 쿼리: 디코더의 잠재 상태

In [51]:
class TransformerDecoderLayer(nn.Module):
    def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1):
        super().__init()
        self.attn1 = MultiHeadAttention(d_model, d_model, nhead) # 멀티헤드 어텐션 클래스
        self.attn2 = MultiHeadAttention(d_model, d_model, nhead) # 크로스 어텐션 구현
        self.feed_forward = PreLayerNormFeedForward(d_model, dim_feedforward, dropout) # 피드 포워드

        self.norm1 = nn.LayerNorm(d_model) # 층 정규화
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout) # 드롭아웃
        self.dropout2 = nn.Dropout(dropout)

    def forward(self, tgt, encoder_output, is_causal=True):
        # 셀프 어텐션
        x = self.norm1(tgt)
        x = x + self.dropout1(self.attn1(x, x, x, is_causal=is_causal))

        # 크로스 어텐션
        x = self.norm2(x)
        x = x + self.dropout2(self.attn2(x, encoder_output, encoder_output))

        # 피드 포워드 연산
        x = self.feed_forward(x)
        return x

In [None]:
# 디코더 구현
import copy
def get_clones(module, N):
    return nn.ModuleList([copy.deepcopy(module) for i in range(N)])

class TransformerDecoder(nn.Module):
    def __init__(self, decoder_layer, num_layers):
        super().__init__()
        self.layers = get_clones(decoder_layer, num_layers)
        self.num_layers = num_layers

    def forward(self, tgt, src):
        output = tgt
        for mod in self.layers:
            output = mod(tgt, src)
        return output