# [영어-한국어 번역을 위한 Transformer 모델](https://github.com/crimsonjoo/Easy-Transformer/blob/main/Model/Transformer_translator_enkr.py)

이 노트북은 영어 문장을 한국어로 번역하는 Transformer 모델을 구현하고 학습하는 과정을 단계별로 설명합니다.

해당 Transformer 모델은 인코더와 디코더를 모두 사용합니다.

또한, Wordbase 토크나이저 구조를 통해 영어와 한국어를 번역하는 매우 기본적인 번역 구조의 확인이 가능합니다.

* 성능 고도화: BBPE(Byte-Level Byte Pair) SentencePiece 등의 토크나이저 사용 추천

&nbsp;

&nbsp;

## 1. 필요한 라이브러리 불러오기

먼저, 이 구현에 필요한 PyTorch 라이브러리를 불러옵니다.

In [None]:
import torch
import numpy as np
from torch import nn
from torch.utils.data import Dataset, DataLoader
from transformer import Transformer

&nbsp;

&nbsp;

## 2. 데이터셋 준비

영어와 한국어 문장 쌍이 포함된 데이터셋을 준비합니다. 이 예시에서는 데이터셋 파일 경로가 이미 정의되어 있다고 가정합니다.


In [None]:
english_file = 'path/to/english_data.txt'
korean_file = 'path/to/korean_data.txt'

&nbsp;

## 3. 어휘집(Vocabulary)과 토큰화(Tokenization)

Transformer 모델에서는 각 언어별로 고유한 토큰들이 정의된 어휘집이 필요합니다.

어휘집은 모델이 이해할 수 있는 형태로 문장을 변환하는 데 사용됩니다.


실제 어플리케이션에서는 각 언어의 다양한 단어와 구두점, 특수 문자 등을 포함하는 훨씬 더 큰 어휘집이 필요합니다.

여기서는 간단한 예시를 위해 축소된 어휘집을 사용합니다.


In [None]:
# 실제 어플리케이션에서는 더 많은 토큰이 포함되어야 합니다.
# Token (추후 개선: BBPE SentencePiece)
START_TOKEN = '<start>'
PADDING_TOKEN = '<pad>'
END_TOKEN = '<end>'



english_vocabulary = [START_TOKEN, PADDING_TOKEN, END_TOKEN, ...]
korean_vocabulary = [START_TOKEN, PADDING_TOKEN, END_TOKEN, ...]

# 어휘집에서 토큰과 인덱스 사이의 매핑 생성
english_to_index = {token: index for index, token in enumerate(english_vocabulary)}
index_to_english = {index: token for index, token in enumerate(english_vocabulary)}
korean_to_index = {token: index for index, token in enumerate(korean_vocabulary)}
index_to_korean = {index: token for index, token in enumerate(korean_vocabulary)}

&nbsp;

## 4. 데이터셋 클래스 정의

PyTorch의 `Dataset` 클래스를 상속받아, 우리의 번역 데이터셋을 위한 커스텀 데이터셋 클래스를 정의합니다.

이 클래스는 모델 학습에 필요한 데이터를 적절히 로딩하고 전처리하는 역할을 합니다.



In [None]:
# 데이터셋 클래스 정의
class TranslationDataset(Dataset):
    def __init__(self, source_sentences, target_sentences, src_vocab, tgt_vocab):
        self.source_sentences = source_sentences
        self.target_sentences = target_sentences
        self.src_vocab = src_vocab
        self.tgt_vocab = tgt_vocab

    def __len__(self):
        return len(self.source_sentences)

    def __getitem__(self, idx):
        src_sentence = self.source_sentences[idx]
        tgt_sentence = self.target_sentences[idx]
        src_indices = [self.src_vocab[token] if token in self.src_vocab else self.src_vocab['<pad>'] for token in src_sentence.split(' ')]
        tgt_indices = [self.tgt_vocab[token] if token in self.tgt_vocab else self.tgt_vocab['<pad>'] for token in tgt_sentence.split(' ')]
        return torch.tensor(src_indices), torch.tensor(tgt_indices)


# 데이터셋 로딩 및 전처리
def load_and_preprocess_data(src_file, tgt_file):
    with open(src_file, 'r', encoding='utf-8') as f:
        src_sentences = f.readlines()
    with open(tgt_file, 'r', encoding='utf-8') as f:
        tgt_sentences = f.readlines()
    src_sentences = [line.strip() for line in src_sentences]
    tgt_sentences = [line.strip() for line in tgt_sentences]
    return src_sentences, tgt_sentences


# 데이터셋 및 데이터로더 선언
english_sentences, korean_sentences = load_and_preprocess_data(english_file, korean_file)
dataset = TranslationDataset(english_sentences, korean_sentences, english_to_index, korean_to_index)
data_loader = DataLoader(dataset, batch_size=32, shuffle=True)

&nbsp;

## 5. 모델 정의

Transformer 모델을 정의합니다.
모델은 영어 문장을 입력으로 받아 해당하는 한국어 문장을 출력합니다.


In [None]:
import numpy as np
import torch
import math
from torch import nn
import torch.nn.functional as F

def get_device():
    return torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

def scaled_dot_product(q, k, v, mask=None):
    d_k = q.size()[-1]
    scaled = torch.matmul(q, k.transpose(-1, -2)) / math.sqrt(d_k)
    if mask is not None:
        scaled = scaled.permute(1, 0, 2, 3) + mask
        scaled = scaled.permute(1, 0, 2, 3)
    attention = F.softmax(scaled, dim=-1)
    values = torch.matmul(attention, v)
    return values, attention

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_sequence_length):
        super().__init__()
        self.max_sequence_length = max_sequence_length
        self.d_model = d_model

    def forward(self):
        even_i = torch.arange(0, self.d_model, 2).float()
        denominator = torch.pow(10000, even_i/self.d_model)
        position = (torch.arange(self.max_sequence_length)
                          .reshape(self.max_sequence_length, 1))
        even_PE = torch.sin(position / denominator)
        odd_PE = torch.cos(position / denominator)
        stacked = torch.stack([even_PE, odd_PE], dim=2)
        PE = torch.flatten(stacked, start_dim=1, end_dim=2)
        return PE

class SentenceEmbedding(nn.Module):
    "For a given sentence, create an embedding"
    def __init__(self, max_sequence_length, d_model, language_to_index, START_TOKEN, END_TOKEN, PADDING_TOKEN):
        super().__init__()
        self.vocab_size = len(language_to_index)
        self.max_sequence_length = max_sequence_length
        self.embedding = nn.Embedding(self.vocab_size, d_model)
        self.language_to_index = language_to_index
        self.position_encoder = PositionalEncoding(d_model, max_sequence_length)
        self.dropout = nn.Dropout(p=0.1)
        self.START_TOKEN = START_TOKEN
        self.END_TOKEN = END_TOKEN
        self.PADDING_TOKEN = PADDING_TOKEN

    def batch_tokenize(self, batch, start_token, end_token):

        def tokenize(sentence, start_token, end_token):
            sentence_word_indicies = [self.language_to_index[token] for token in list(sentence)]
            if start_token:
                sentence_word_indicies.insert(0, self.language_to_index[self.START_TOKEN])
            if end_token:
                sentence_word_indicies.append(self.language_to_index[self.END_TOKEN])
            for _ in range(len(sentence_word_indicies), self.max_sequence_length):
                sentence_word_indicies.append(self.language_to_index[self.PADDING_TOKEN])
            return torch.tensor(sentence_word_indicies)

        tokenized = []
        for sentence_num in range(len(batch)):
           tokenized.append( tokenize(batch[sentence_num], start_token, end_token) )
        tokenized = torch.stack(tokenized)
        return tokenized.to(get_device())

    def forward(self, x, start_token, end_token): # sentence
        x = self.batch_tokenize(x, start_token, end_token)
        x = self.embedding(x)
        pos = self.position_encoder().to(get_device())
        x = self.dropout(x + pos)
        return x


class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super().__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads
        self.qkv_layer = nn.Linear(d_model , 3 * d_model)
        self.linear_layer = nn.Linear(d_model, d_model)

    def forward(self, x, mask):
        batch_size, sequence_length, d_model = x.size()
        qkv = self.qkv_layer(x)
        qkv = qkv.reshape(batch_size, sequence_length, self.num_heads, 3 * self.head_dim)
        qkv = qkv.permute(0, 2, 1, 3)
        q, k, v = qkv.chunk(3, dim=-1)
        values, attention = scaled_dot_product(q, k, v, mask)
        values = values.permute(0, 2, 1, 3).reshape(batch_size, sequence_length, self.num_heads * self.head_dim)
        out = self.linear_layer(values)
        return out


class LayerNormalization(nn.Module):
    def __init__(self, parameters_shape, eps=1e-5):
        super().__init__()
        self.parameters_shape=parameters_shape
        self.eps=eps
        self.gamma = nn.Parameter(torch.ones(parameters_shape))
        self.beta =  nn.Parameter(torch.zeros(parameters_shape))

    def forward(self, inputs):
        dims = [-(i + 1) for i in range(len(self.parameters_shape))]
        mean = inputs.mean(dim=dims, keepdim=True)
        var = ((inputs - mean) ** 2).mean(dim=dims, keepdim=True)
        std = (var + self.eps).sqrt()
        y = (inputs - mean) / std
        out = self.gamma * y + self.beta
        return out


class PositionwiseFeedForward(nn.Module):
    def __init__(self, d_model, hidden, drop_prob=0.1):
        super(PositionwiseFeedForward, self).__init__()
        self.linear1 = nn.Linear(d_model, hidden)
        self.linear2 = nn.Linear(hidden, d_model)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=drop_prob)

    def forward(self, x):
        x = self.linear1(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.linear2(x)
        return x


class EncoderLayer(nn.Module):
    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob):
        super(EncoderLayer, self).__init__()
        self.attention = MultiHeadAttention(d_model=d_model, num_heads=num_heads)
        self.norm1 = LayerNormalization(parameters_shape=[d_model])
        self.dropout1 = nn.Dropout(p=drop_prob)
        self.ffn = PositionwiseFeedForward(d_model=d_model, hidden=ffn_hidden, drop_prob=drop_prob)
        self.norm2 = LayerNormalization(parameters_shape=[d_model])
        self.dropout2 = nn.Dropout(p=drop_prob)

    def forward(self, x, self_attention_mask):
        residual_x = x.clone()
        x = self.attention(x, mask=self_attention_mask)
        x = self.dropout1(x)
        x = self.norm1(x + residual_x)
        residual_x = x.clone()
        x = self.ffn(x)
        x = self.dropout2(x)
        x = self.norm2(x + residual_x)
        return x

class SequentialEncoder(nn.Sequential):
    def forward(self, *inputs):
        x, self_attention_mask  = inputs
        for module in self._modules.values():
            x = module(x, self_attention_mask)
        return x

class Encoder(nn.Module):
    def __init__(self,
                 d_model,
                 ffn_hidden,
                 num_heads,
                 drop_prob,
                 num_layers,
                 max_sequence_length,
                 language_to_index,
                 START_TOKEN,
                 END_TOKEN,
                 PADDING_TOKEN):
        super().__init__()
        self.sentence_embedding = SentenceEmbedding(max_sequence_length, d_model, language_to_index, START_TOKEN, END_TOKEN, PADDING_TOKEN)
        self.layers = SequentialEncoder(*[EncoderLayer(d_model, ffn_hidden, num_heads, drop_prob)
                                      for _ in range(num_layers)])

    def forward(self, x, self_attention_mask, start_token, end_token):
        x = self.sentence_embedding(x, start_token, end_token)
        x = self.layers(x, self_attention_mask)
        return x


class MultiHeadCrossAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super().__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads
        self.kv_layer = nn.Linear(d_model , 2 * d_model)
        self.q_layer = nn.Linear(d_model , d_model)
        self.linear_layer = nn.Linear(d_model, d_model)

    def forward(self, x, y, mask):
        batch_size, sequence_length, d_model = x.size() # in practice, this is the same for both languages...so we can technically combine with normal attention
        kv = self.kv_layer(x)
        q = self.q_layer(y)
        kv = kv.reshape(batch_size, sequence_length, self.num_heads, 2 * self.head_dim)
        q = q.reshape(batch_size, sequence_length, self.num_heads, self.head_dim)
        kv = kv.permute(0, 2, 1, 3)
        q = q.permute(0, 2, 1, 3)
        k, v = kv.chunk(2, dim=-1)
        values, attention = scaled_dot_product(q, k, v, mask) # We don't need the mask for cross attention, removing in outer function!
        values = values.permute(0, 2, 1, 3).reshape(batch_size, sequence_length, d_model)
        out = self.linear_layer(values)
        return out


class DecoderLayer(nn.Module):
    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob):
        super(DecoderLayer, self).__init__()
        self.self_attention = MultiHeadAttention(d_model=d_model, num_heads=num_heads)
        self.layer_norm1 = LayerNormalization(parameters_shape=[d_model])
        self.dropout1 = nn.Dropout(p=drop_prob)

        self.encoder_decoder_attention = MultiHeadCrossAttention(d_model=d_model, num_heads=num_heads)
        self.layer_norm2 = LayerNormalization(parameters_shape=[d_model])
        self.dropout2 = nn.Dropout(p=drop_prob)

        self.ffn = PositionwiseFeedForward(d_model=d_model, hidden=ffn_hidden, drop_prob=drop_prob)
        self.layer_norm3 = LayerNormalization(parameters_shape=[d_model])
        self.dropout3 = nn.Dropout(p=drop_prob)

    def forward(self, x, y, self_attention_mask, cross_attention_mask):
        _y = y.clone()
        y = self.self_attention(y, mask=self_attention_mask)
        y = self.dropout1(y)
        y = self.layer_norm1(y + _y)

        _y = y.clone()
        y = self.encoder_decoder_attention(x, y, mask=cross_attention_mask)
        y = self.dropout2(y)
        y = self.layer_norm2(y + _y)

        _y = y.clone()
        y = self.ffn(y)
        y = self.dropout3(y)
        y = self.layer_norm3(y + _y)
        return y


class SequentialDecoder(nn.Sequential):
    def forward(self, *inputs):
        x, y, self_attention_mask, cross_attention_mask = inputs
        for module in self._modules.values():
            y = module(x, y, self_attention_mask, cross_attention_mask)
        return y

class Decoder(nn.Module):
    def __init__(self,
                 d_model,
                 ffn_hidden,
                 num_heads,
                 drop_prob,
                 num_layers,
                 max_sequence_length,
                 language_to_index,
                 START_TOKEN,
                 END_TOKEN,
                 PADDING_TOKEN):
        super().__init__()
        self.sentence_embedding = SentenceEmbedding(max_sequence_length, d_model, language_to_index, START_TOKEN, END_TOKEN, PADDING_TOKEN)
        self.layers = SequentialDecoder(*[DecoderLayer(d_model, ffn_hidden, num_heads, drop_prob) for _ in range(num_layers)])

    def forward(self, x, y, self_attention_mask, cross_attention_mask, start_token, end_token):
        y = self.sentence_embedding(y, start_token, end_token)
        y = self.layers(x, y, self_attention_mask, cross_attention_mask)
        return y


class Transformer(nn.Module):
    def __init__(self,
                d_model,
                ffn_hidden,
                num_heads,
                drop_prob,
                num_layers,
                max_sequence_length,
                kn_vocab_size,
                english_to_index,
                kannada_to_index,
                START_TOKEN,
                END_TOKEN,
                PADDING_TOKEN
                ):
        super().__init__()
        self.encoder = Encoder(d_model, ffn_hidden, num_heads, drop_prob, num_layers, max_sequence_length, english_to_index, START_TOKEN, END_TOKEN, PADDING_TOKEN)
        self.decoder = Decoder(d_model, ffn_hidden, num_heads, drop_prob, num_layers, max_sequence_length, kannada_to_index, START_TOKEN, END_TOKEN, PADDING_TOKEN)
        self.linear = nn.Linear(d_model, kn_vocab_size)
        self.device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

    def forward(self,
                x,
                y,
                encoder_self_attention_mask=None,
                decoder_self_attention_mask=None,
                decoder_cross_attention_mask=None,
                enc_start_token=False,
                enc_end_token=False,
                dec_start_token=False, # We should make this true
                dec_end_token=False): # x, y are batch of sentences
        x = self.encoder(x, encoder_self_attention_mask, start_token=enc_start_token, end_token=enc_end_token)
        out = self.decoder(x, y, decoder_self_attention_mask, decoder_cross_attention_mask, start_token=dec_start_token, end_token=dec_end_token)
        out = self.linear(out)
        return out


In [None]:
# Transformer 모델 선언
transformer = Transformer(
    d_model=512,
    ffn_hidden=2048,
    num_heads=8,
    drop_prob=0.1,
    num_layers=6,
    max_sequence_length=200,
    source_vocab_size=len(english_vocabulary),
    target_vocab_size=len(korean_vocabulary),
    target_pad_idx=korean_to_index[PADDING_TOKEN],
    source_pad_idx=english_to_index[PADDING_TOKEN]
)

&nbsp;

## 6. 모델 학습

학습 데이터를 사용하여 모델을 학습시킵니다.

이 과정에서 모델은 영어 문장과 그에 해당하는 한국어 번역 사이의 매핑을 학습합니다.


In [None]:
# 모델 훈련 로직
def train(model, data_loader, epochs=10):
    model.train()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)
    criterion = nn.CrossEntropyLoss(ignore_index=korean_to_index['<pad>'])

    for epoch in range(epochs):
        total_loss = 0
        for src_indices, tgt_indices in data_loader:
            optimizer.zero_grad()
            output = model(src_indices, tgt_indices[:, :-1])  # 마지막 <end> 토큰을 제외한 타겟 문장
            loss = criterion(output.view(-1, model.target_vocab_size), tgt_indices[:, 1:].reshape(-1))  # 첫 <start> 토큰을 제외한 타겟 문장
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f"Epoch {epoch+1}, Loss: {total_loss / len(data_loader)}")


# 학습을 시작합니다.
train(transformer, train_loader)

&nbsp;

## 7. 모델 평가

학습된 모델을 사용하여 새로운 영어 문장의 한국어 번역을 생성합니다.


In [None]:
# 모델 평가 로직을 정의합니다. 실제 번역 결과를 생성하고 평가하는 과정이 포함됩니다.
def translate(model, src_sentence, src_vocab, tgt_vocab, max_length=50):
    model.eval()
    src_indices = [src_vocab[token] if token in src_vocab else src_vocab['<pad>'] for token in src_sentence.split(' ')]
    src_tensor = torch.tensor(src_indices).unsqueeze(0)
    tgt_indices = [tgt_vocab['<start>']]

    for _ in range(max_length):
        tgt_tensor = torch.tensor(tgt_indices).unsqueeze(0)
        with torch.no_grad():
            output = model(src_tensor, tgt_tensor)
        next_token_index = output.argmax(2)[:,-1].item()
        tgt_indices.append(next_token_index)
        if next_token_index == tgt_vocab['<end>']:
            break

    translated_sentence = ' '.join([index_to_korean[idx] for idx in tgt_indices[1:-1]])  # <start>와 <end> 토큰 제외
    return translated_sentence


# 모델 훈련
train(transformer, data_loader)


# 테스트 문장 번역
test_sentence = "Hello world!"
print("영어:", test_sentence)
print("번역된 한국어:", translate(transformer, test_sentence, english_to_index, korean_to_index))

&nbsp;

&nbsp;

&nbsp;

---



## 실제 어플리케이션에서의 고려사항

위 예시에서는 간소화된 어휘집을 사용했지만, 실제 어플리케이션에서는 훨씬 더 다양하고 복잡한 어휘가 사용됩니다. 또한, 실제 텍스트에서는 드물게 사용되는 단어나 표현도 포함될 수 있어, 모델이 이를 잘 처리할 수 있도록 충분히 크고 다양한 데이터셋으로 학습시키는 것이 중요합니다.

더 나아가, 효율적인 어휘집 관리와 모델의 일반화 능력을 향상시키기 위해, Byte Pair Encoding(BPE)이나 SentencePiece 같은 고급 토크나이징 방법을 사용할 수 있습니다. 이 방법들은 희귀 단어를 하위 단위(subword units)로 분해하여 처리함으로써, 모델이 미처본 단어에도 강건하게 대응할 수 있게 합니다.

이러한 고려사항들은 모델의 성능과 범용성을 크게 향상시킬 수 있으며,

&nbsp;

## BPE와 SentencePiece의 중요성

### - BPE(Byte Pair Encoding)
:BPE는 데이터에서 가장 자주 나타나는 바이트 쌍을 반복적으로 병합하여 어휘집을 구성하는 방식입니다. 이 방법은 초기에 데이터 압축 알고리즘으로 개발되었지만, 후에 자연어 처리에서 효율적인 서브워드 토큰화 방법으로 사용되고 있습니다. BPE를 사용하면, 어휘집의 크기를 고정하면서도 희귀 단어를 효과적으로 처리할 수 있으며, 새로운 단어나 외래어에 대한 모델의 대응 능력을 향상시킬 수 있습니다.

### - SentencePiece
:SentencePiece는 언어에 의존하지 않는 모델 학습을 위한 토큰화 도구입니다. 이 도구는 BPE와 유사한 방식으로 작동하지만, 언어의 사전 지식 없이도 사용할 수 있다는 장점이 있습니다. SentencePiece는 원시 텍스트 데이터로부터 직접 서브워드 단위로 토큰화를 수행하며, 이 과정에서 공백 정보도 함께 학습합니다. 이는 다양한 언어와 도메인에 걸쳐 모델의 일반화 능력을 향상시키는 데 도움을 줍니다.

&nbsp;

## 모델의 일반화 능력 향상

위에서 설명한 토크나이징 방법 외에도, 효과적인 데이터 전처리, 데이터 증강, 모델 정규화 기법 등 다양한 방법을 통해 모델의 일반화 능력을 향상시킬 수 있습니다. 이러한 기법들은 모델이 학습 데이터에만 과도하게 적응하는 것을 방지하고, 실제 세계에서 발생할 수 있는 다양한 시나리오에 더 잘 대응할 수 있도록 만듭니다.


&nbsp;

## 결론

영어-한국어어 번역기를 비롯한 자연어 처리 모델을 개발할 때, 단순히 모델의 구조만이 아니라 데이터의 특성과 전처리, 토큰화 방법, 모델의 일반화 능력 등 다양한 요소들을 종합적으로 고려해야 합니다. 이러한 과정을 통해 더욱 정확하고 범용적인 자연어 처리 시스템을 구축할 수 있습니다. 초보자들이 이 분야에 입문할 때, 이러한 다양한 요소들을 이해하고 실험해 보는 것이 매우 중요합니다.





---

효율적인 어휘집 관리와 모델의 일반화 능력을 향상시키기 위해, Byte Pair Encoding(BPE)이나 SentencePiece 같은 고급 토크나이징 방법을 사용할 수 있습니다. 이 방법들은 희귀 단어를 하위 단위(subword units)로 분해하여 처리함으로써, 모델이 미처본 단어에도 강건하게 대응할 수 있게 합니다.

이러한 고려사항들은 모델의 성능과 범용성을 크게 향상시킬 수 있으며, 이러한 토크나이저들을 사용함으로써, 모델은 더 넓은 범위의 언어적 다양성을 이해하고,실제 사용자가 입력할 수 있는 다양한 문장에 대해 더 정확한 번역을 제공할 수 있습니다.