In [1]:
import torch
import torch.nn as nn
from torch import optim
import torch.nn.functional as F

import numpy as np

In [3]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [4]:
SOS_token = 0
EOS_token = 1

In [5]:
MAX_LENGTH = 10

In [6]:
# 인코더 RNN 클래스 (GRU 기반)
# 입력 문장을 임베딩 후, GRU를 이용해 시퀀스 전체를 인코딩하고, 디코더가 사용할 context 정보를 반환한다.
class EncoderRNN(nn.Module):
    """
    Args:
        input_size (int): 입력 어휘 사전의 크기 (vocab size)
        hidden_size (int): GRU hidden state의 차원
        dropout_p (float, optional): 드롭아웃 비율 (기본값 0.1)

    Inputs:
        input (Tensor): [batch_size, seq_len] 형태의 입력 시퀀스 (단어 인덱스)

    Returns:
        output (Tensor): [batch_size, seq_len, hidden_size], 각 time step의 hidden state
        hidden (Tensor): [1, batch_size, hidden_size], 마지막 hidden state (디코더 초기값)
    """

    def __init__(self, input_size, hidden_size, dropout_p=0.1):
        super().__init__()
        self.hidden_size = hidden_size

        # 단어 인덱스를 임베딩 벡터로 변환 (임베딩 차원 = hidden_size)
        self.embedding = nn.Embedding(input_size, hidden_size)

        # GRU 셀 (batch_first=True: 입력이 [batch, seq, feature] 형태)
        self.gru = nn.GRU(hidden_size, hidden_size, batch_first=True)

        # Dropout: 학습 중 일부 뉴런 무작위 비활성화로 과적합 방지
        self.dropout = nn.Dropout(dropout_p)


    def forward(self, input):    # input: [batch_size, seq_len]
        """
        Args:
            input (Tensor): [batch_size, seq_len] 형태의 단어 인덱스 텐서

        Returns:
            output (Tensor): GRU의 전체 출력, [batch_size, seq_len, hidden_size]
            hidden (Tensor): 마지막 hidden state, [1, batch_size, hidden_size]
        """

        # 입력 임베딩 후 드롭아웃 적용
        embedded = self.dropout(self.embedding(input))    # [batch_size, seq_len, hidden_size]

        # GRU를 통해 시퀀스를 따라 상태 계산
        output, hidden = self.gru(embedded)

        # 전체 시퀀스 출력 + 마지막 hidden state 반환
        return output, hidden

In [7]:
# Bahdanau (Additive) Attention 구현 클래스
# 쿼리(디코더 상태)와 키(인코더 출력)를 비교하여 어텐션 가중치(집중 정도)를 계산하고, 컨텍스트 벡터를 생성한다.

class BahdanauAttention(nn.Module):
    """
    Args:
        hidden_size (int): 쿼리 및 키의 hidden state 차원
    """
    
    def __init__(self, hidden_size):
        super().__init__()

        # query (디코더 hidden) 변환
        self.Wa = nn.Linear(hidden_size, hidden_size)

        # key (인코더 출력) 변환
        self.Ua = nn.Linear(hidden_size, hidden_size)

        # 유사도 점수 계산용 선형 변환
        self.Va = nn.Linear(hidden_size, 1)

    def forward(self, query, keys):
        """
        Args:
            query (Tensor): 디코더 hidden state  
                            shape = [batch_size, 1, hidden_size]

            keys (Tensor): 인코더의 전체 출력 시퀀스  
                           shape = [batch_size, seq_len, hidden_size]

        Returns:
            context (Tensor): 가중합된 인코더 출력 (컨텍스트 벡터)  
                              shape = [batch_size, 1, hidden_size]

            weights (Tensor): 어텐션 가중치  
                              shape = [batch_size, 1, seq_len]
        """

        # Wa(query): [batch, 1, hidden], Ua(keys): [batch, seq_len, hidden]
        # broadcast 되어 결과 shape: [batch, seq_len, hidden]
        scores = self.Va(torch.tanh(self.Wa(query) + self.Ua(keys)))    # [batch, seq_len, 1]

        # softmax를 위해 차원 조정
        scores = scores.squeeze(2).unsqueeze(1)    # [batch, 1, seq_len]

        # softmax를 통해 정규화된 어텐션 가중치 계산
        weights = F.softmax(scores, dim=-1)    # [batch, 1, seq_len]

        # 어텐션 가중치를 인코더 출력(keys)에 곱해 컨텍스트 벡터 계산
        # keys: [batch, seq_len, hidden], weights: [batch, 1, seq_len]
        context = torch.bmm(weights, keys)    # [batch, 1, hidden]

        return context, weights

### **[INSIGHT]**
- Additive 방식은 내적(dot-product) 방식보다 벡터 간 관계를 비선형적으로 표현할 수 있어 표현력이 풍부하다.
- 각 입력 위치에 얼마나 집중할지를 학습 가능한 방식으로 계산한다.

#### **<attention score 계산 시, **tanh**을 사용하는 이유>**
여기서 tanh를 제거하면 두 벡터 간 선형 관계를 측정하는 단순 모델이 되지만, tanh를 넣으면 쿼리와 키 사이의 복잡한 상호작용(ex.문맥적 중요성)의 반영이 가능해진다. 이를 통해 attention 가중치가 더 세밀하게 조정되어 모델의 성능이 향상된다.

In [None]:
# Bahdanau 어텐션을 활용한 GRU 기반 디코더 클래스
# 인코더의 출력과 디코더의 현재 상태를 이용해 어텐션을 적용하고, 이를 통해 매 시점마다 단어를 하나씩 생성한다.
class AttnDecoderRNN(nn.Module):
    """
    Args:
        hidden_size (int): hidden state 차원
        output_size (int): 출력 어휘 수 (target vocabulary size)
        dropout_p (float): 드롭아웃 확률
    """

    def __init__(self, hidden_size, output_size, dropout_p=0.1):
        super(AttnDecoderRNN, self).__init__()
        self.embedding = nn.Embedding(output_size, hidden_size)

        # Bahdanau 어텐션 모듈
        self.attention = BahdanauAttention(hidden_size)

        # context vector (hidden_size) + embedding (hidden_size) → 2 * hidden_size
        self.gru = nn.GRU(2 * hidden_size, hidden_size, batch_first=True)

        # GRU 출력 → 단어 분포 예측
        self.out = nn.Linear(hidden_size, output_size)

        self.dropout = nn.Dropout(dropout_p)

    def forward(self, encoder_outputs, encoder_hidden, target_tensor=None):
        """
        Args:
            encoder_outputs (Tensor): 인코더의 전체 출력 [batch, seq_len, hidden_size]
            encoder_hidden (Tensor): 인코더의 마지막 hidden state [1, batch, hidden_size]
            target_tensor (Tensor or None): 교사 강요용 정답 시퀀스 [batch, MAX_LENGTH]

        Returns:
            decoder_outputs (Tensor): 전체 출력 시퀀스의 단어 분포 [batch, MAX_LENGTH, output_size]
            decoder_hidden (Tensor): 마지막 hidden state [1, batch, hidden_size]
            attentions (Tensor): 어텐션 가중치 모음 [batch, MAX_LENGTH, seq_len]
        """
        batch_size = encoder_outputs.size(0)

        # 디코더의 첫 입력 = <SOS> 토큰
        decoder_input = torch.empty(batch_size, 1, dtype=torch.long, device=device).fill_(SOS_token)

        decoder_hidden = encoder_hidden
        decoder_outputs = []  # 예측된 단어 분포 저장
        attentions = []       # 어텐션 가중치 저장

        for i in range(MAX_LENGTH):
            # 1 step 디코딩
            decoder_output, decoder_hidden, attn_weights = self.forward_step(
                decoder_input, decoder_hidden, encoder_outputs
            )

            decoder_outputs.append(decoder_output)    # decoder_output: [batch, 1, output_size]
            attentions.append(attn_weights)           # attn_weights: [batch, 1, seq_len]

            if target_tensor is not None:
                # Teacher forcing: 정답 토큰을 다음 입력으로 사용
                decoder_input = target_tensor[:, i].unsqueeze(1)  # [batch, 1]
            else:
                # 예측 결과를 다음 입력으로 사용
                _, topi = decoder_output.topk(1)  # topi: [batch, 1, 1]
                decoder_input = topi.squeeze(-1).detach()  # [batch, 1]

        # 시간 차원으로 연결
        decoder_outputs = torch.cat(decoder_outputs, dim=1)  # [batch, MAX_LENGTH, output_size]
        decoder_outputs = F.log_softmax(decoder_outputs, dim=-1)

        attentions = torch.cat(attentions, dim=1)  # [batch, MAX_LENGTH, seq_len]

        return decoder_outputs, decoder_hidden, attentions

    def forward_step(self, input, hidden, encoder_outputs):
        """
        Args:
            input (Tensor): 현재 입력 토큰 [batch, 1]
            hidden (Tensor): 이전 hidden state [1, batch, hidden_size]
            encoder_outputs (Tensor): 인코더 전체 출력 [batch, seq_len, hidden_size]

        Returns:
            output (Tensor): 예측 단어 분포 [batch, 1, output_size]
            hidden (Tensor): 업데이트된 hidden state [1, batch, hidden_size]
            attn_weights (Tensor): 어텐션 가중치 [batch, 1, seq_len]
        """

        # 입력 토큰을 임베딩 후 드롭아웃 적용
        embedded = self.dropout(self.embedding(input))    # [batch, 1, hidden_size]

        # hidden: [1, batch, hidden_size] → [batch, 1, hidden_size] (쿼리로 사용하기 위해 차원 전치)
        query = hidden.permute(1, 0, 2)

        # 어텐션 계산: context: [batch, 1, hidden_size], attn_weights: [batch, 1, seq_len]
        context, attn_weights = self.attention(query, encoder_outputs)

        # context와 임베딩을 연결 → GRU 입력 준비
        input_gru = torch.cat((embedded, context), dim=2)    # [batch, 1, 2 * hidden_size]

        # GRU 계산
        output, hidden = self.gru(input_gru, hidden)  # output: [batch, 1, hidden_size]

        # 출력층: 단어 분포 계산
        output = self.out(output)  # [batch, 1, output_size]

        return output, hidden, attn_weights

### **[INSIGHT]**
- forward_step() 분리로 재사용성과 가독성 향상
- 어텐션은 decoder가 매 시점마다 encoder의 전체 정보를 동적으로 참고하게 한다.

#### **<"decoder_input = topi.squeeze(-1).detach()"에서 '.detach()'을 사용한 이유>**
해당 텐서를 계산 그래프에서 분리해 역전파시 사용하지 않도록 한다. 그 이유는 학습 시, 이 출력 값을 다음 입력으로 사용하는 것일 뿐, 다시 학습 대상으로 삼을 필요가 없기 때문이다. 만약 학습 대상으로 하게 될 경우, gradient가 두 번 흐르게 됨으로 비효율적이고 잘못된 학습으로 이어질 수 있다.

-----
#### **<"input_gru = torch.cat((embedded, context), dim=2)"에서 GRU 입력으로 context와 임베딩을 cat해서 사용한 이유>**
디코더는 현재 시점의 토큰(=임베딩)만으로 다음 토큰을 예측하기엔 정보가 부족할 수 있다. 그래서 context와 토큰(임베딩)을 결합하여 더 풍부한 정보를 기반으로 디코딩하기 위함이다.
