[nn.Transformer와 Torchtext로 언어 번역하기](https://tutorials.pytorch.kr/beginner/translation_transformer.html)

In [1]:
# Transformer를 사용한 번역 모델을 스크래치부터 학습
# Multi30k 데이터셋을 사용한 독일어 -> 영어 번역 모델 학습

In [2]:
# Install required dependencies
!pip install -U torchdata
!pip install -U spacy
!python -m spacy download en_core_web_sm
!python -m spacy download de_core_news_sm
!pip install portalocker

Collecting spacy
  Downloading spacy-3.5.4-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (6.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.6/6.6 MB[0m [31m62.0 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: spacy
  Attempting uninstall: spacy
    Found existing installation: spacy 3.5.3
    Uninstalling spacy-3.5.3:
      Successfully uninstalled spacy-3.5.3
Successfully installed spacy-3.5.4
2023-07-05 14:57:20.743303: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-07-05 14:57:22.984428: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:996] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node ze

## 데이터 구하고 처리하기

In [3]:
"""
torchtext 라이브러리: 언어 번역 모델 생성 위한 데이터셋을 만들기 위한 도구들 존재

수행
    ㄴ torchtext의 내장 데이터셋 활용
    ㄴ 원시 텍스트 문장을 토큰화하고, 텐서로 수치화하기
"""

from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from torchtext.datasets import multi30k, Multi30k

# Override the dataset URLs (the original download links no longer work)
multi30k.URL["train"] = "https://raw.githubusercontent.com/neychev/small_DL_repo/master/datasets/Multi30k/training.tar.gz"
multi30k.URL["valid"] = "https://raw.githubusercontent.com/neychev/small_DL_repo/master/datasets/Multi30k/validation.tar.gz"

# Source / target language codes; also used as dictionary keys below
SRC_LANGUAGE = 'de'
TGT_LANGUAGE = 'en'

# Placeholders that are filled in later, keyed by language code:
# token_transform holds the tokenizers, vocab_transform the vocabularies
token_transform = {}
vocab_transform = {}

In [4]:
from typing import Iterable, List

# Register one spaCy-backed tokenizer per language, using the spaCy models
# named in the `language` argument
token_transform[SRC_LANGUAGE] = get_tokenizer('spacy', language='de_core_news_sm')
token_transform[TGT_LANGUAGE] = get_tokenizer('spacy', language='en_core_web_sm')

# Helper that produces the token lists used to build a vocabulary
def yield_tokens(data_iter: Iterable, language: str) -> Iterable[List[str]]:
    """Lazily tokenize one side of every sample in ``data_iter``.

    Args:
        data_iter: Iterable of samples indexable as ``sample[0]`` (source
            text) and ``sample[1]`` (target text).
        language: Either ``SRC_LANGUAGE`` or ``TGT_LANGUAGE``; selects both
            the tokenizer and which element of each sample is tokenized.

    Yields:
        The output of ``token_transform[language]`` for each sample.

    Raises:
        KeyError: If ``language`` is neither of the two known languages
            (raised when the generator is first advanced).
    """
    # Both lookups are loop-invariant, so resolve them once up front.
    sample_position = {SRC_LANGUAGE: 0, TGT_LANGUAGE: 1}[language]
    tokenize = token_transform[language]

    for data_sample in data_iter:
        yield tokenize(data_sample[sample_position])

# Special symbols; each one's position in the list is its vocabulary index
special_symbols = ['<unk>', '<pad>', '<bos>', '<eos>']
UNK_IDX = 0
PAD_IDX = 1
BOS_IDX = 2
EOS_IDX = 3

for ln in (SRC_LANGUAGE, TGT_LANGUAGE):
    # Create a fresh iterator over the training split for every language
    train_iter = Multi30k(split='train', language_pair=(SRC_LANGUAGE, TGT_LANGUAGE))

    # Build the vocabulary from the tokens of the training split,
    # with the special symbols placed first
    vocab_transform[ln] = build_vocab_from_iterator(
        yield_tokens(train_iter, ln),
        min_freq=1,
        specials=special_symbols,
        special_first=True,
    )

    # Map out-of-vocabulary tokens to UNK_IDX; if no default index is set,
    # looking up an unknown token raises a RuntimeError
    vocab_transform[ln].set_default_index(UNK_IDX)

## Transformer를 이용한 seq2seq 신경망

In [5]:
"""
Transformer
    ㄴ seq2seq 모델
    ㄴ 구성
        1. embedding layer: 위치 인코딩 추가
        2. transformer 모델
        3. generator: un-normalized probabilities (logits)
"""

from torch import Tensor
import torch
import torch.nn as nn
from torch.nn import Transformer
import math
# Use the GPU when CUDA is available, otherwise fall back to the CPU
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Helper module that adds positional encodings to token embeddings
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal positional encodings, then apply dropout.

    The encoding table is precomputed once for ``maxlen`` positions and stored
    as a buffer, so it is not a trainable parameter.

    Args:
        emb_size: Size of the embedding (last) dimension. Odd sizes are
            supported.
        dropout: Dropout probability applied to the summed embeddings.
        maxlen: Number of positions for which encodings are precomputed.
    """

    def __init__(self,
                 emb_size: int,
                 dropout: float,
                 maxlen: int = 5000):
        super().__init__()

        # torch.arange(0, emb_size, 2) yields 0, 2, 4, ... below emb_size:
        # one frequency per (sin, cos) column pair.
        den = torch.exp(-torch.arange(0, emb_size, 2) * math.log(10000) / emb_size)
        pos = torch.arange(0, maxlen).reshape(maxlen, 1)
        angles = pos * den  # shape: (maxlen, number of even columns)

        pos_embedding = torch.zeros((maxlen, emb_size))
        pos_embedding[:, 0::2] = torch.sin(angles)
        # With an odd emb_size there is one fewer odd column than even
        # columns, so the last frequency is simply not used for the cosine.
        pos_embedding[:, 1::2] = torch.cos(angles[:, :emb_size // 2])
        # Insert a singleton axis in the middle: (maxlen, 1, emb_size)
        pos_embedding = pos_embedding.unsqueeze(-2)

        self.dropout = nn.Dropout(dropout)

        # Registered as a buffer so it is not treated as a model parameter
        self.register_buffer('pos_embedding', pos_embedding)

    def forward(
        self,
        token_embedding: Tensor
    ):
        """Return ``dropout(token_embedding + encoding)``.

        The first ``token_embedding.size(0)`` rows of the table are used, so
        dimension 0 of ``token_embedding`` is treated as the position axis.
        """
        return self.dropout(
            token_embedding + self.pos_embedding[:token_embedding.size(0), :]
        )

In [6]:
# Helper module that converts a tensor of token indices into the
# corresponding tensor of token embeddings
class TokenEmbedding(nn.Module):
    """Embedding lookup whose output is scaled by ``sqrt(emb_size)``."""

    def __init__(
        self,
        vocab_size: int,
        emb_size
    ):
        super().__init__()
        self.emb_size = emb_size
        self.embedding = nn.Embedding(vocab_size, emb_size)

    def forward(self, tokens: Tensor):
        """Embed ``tokens`` (cast to long first) and scale the result."""
        # The paper scales the embeddings by sqrt(d_model) (section 3.4)
        scale = math.sqrt(self.emb_size)
        embedded = self.embedding(tokens.long())
        return embedded * scale

In [7]:
# Seq2Seq network
class Seq2SeqTransformer(nn.Module):
    """Encoder-decoder model built around ``torch.nn.Transformer``.

    Token indices are embedded, positional encodings are added, the result is
    passed through the transformer, and a linear ``generator`` layer projects
    the output to ``tgt_vocab_size`` scores.
    """

    def __init__(
        self,
        num_encoder_layers: int,
        num_decoder_layers: int,
        emb_size: int,
        nhead: int,
        src_vocab_size: int,
        tgt_vocab_size: int,
        dim_feedforward: int = 512,
        dropout: float = 0.1,
    ):
        super().__init__()

        # Sub-modules are created in a fixed order so that parameter order and
        # random initialisation stay reproducible.
        self.transformer = Transformer(
            d_model=emb_size,
            nhead=nhead,
            num_encoder_layers=num_encoder_layers,
            num_decoder_layers=num_decoder_layers,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
        )

        # Projects transformer outputs onto the target vocabulary
        self.generator = nn.Linear(emb_size, tgt_vocab_size)

        # Separate embedding tables for source and target tokens
        self.src_tok_emb = TokenEmbedding(src_vocab_size, emb_size)
        self.tgt_tok_emb = TokenEmbedding(tgt_vocab_size, emb_size)

        # A single positional-encoding module shared by both sides
        self.positional_encoding = PositionalEncoding(emb_size, dropout=dropout)

    def forward(
        self,
        src: Tensor,
        trg: Tensor,
        src_mask: Tensor,
        tgt_mask: Tensor,
        src_padding_mask: Tensor,
        tgt_padding_mask: Tensor,
        memory_key_padding_mask: Tensor,
    ):
        """Run the full encoder-decoder pass and return generator scores."""
        embedded_src = self.positional_encoding(self.src_tok_emb(src))
        embedded_tgt = self.positional_encoding(self.tgt_tok_emb(trg))

        # Mask semantics are explained at
        # https://stackoverflow.com/questions/62170439/difference-between-src-mask-and-src-key-padding-mask
        decoded = self.transformer(
            embedded_src,
            embedded_tgt,
            src_mask=src_mask,
            tgt_mask=tgt_mask,
            memory_mask=None,  # no additive mask on the encoder output
            src_key_padding_mask=src_padding_mask,
            tgt_key_padding_mask=tgt_padding_mask,
            memory_key_padding_mask=memory_key_padding_mask,
        )
        return self.generator(decoded)

    def encode(
        self,
        src: Tensor,
        src_mask: Tensor,
    ):
        """Embed ``src`` and run only the transformer encoder."""
        embedded_src = self.positional_encoding(self.src_tok_emb(src))
        return self.transformer.encoder(embedded_src, mask=src_mask)

    def decode(
        self,
        tgt: Tensor,
        memory: Tensor,
        tgt_mask: Tensor,
    ):
        """Embed ``tgt`` and run only the transformer decoder over ``memory``."""
        embedded_tgt = self.positional_encoding(self.tgt_tok_emb(tgt))
        return self.transformer.decoder(embedded_tgt, memory, tgt_mask=tgt_mask)

In [8]:
"""
1. 학습 중, 모델이 예측할 때 이후 출현 단어를 볼 수 없도록 후속 단어 마스크가 필요
2. 출발어와 도착어의 패딩 토큰들도 숨겨야 함
"""

def generate_square_subsequent_mask(sz):
    """Build an ``sz`` x ``sz`` additive mask that hides later positions.

    Entries strictly above the main diagonal are ``-inf``; the diagonal and
    everything below it are ``0.0``. The tensor is float32 and lives on
    ``DEVICE``.
    """
    blocked = torch.full((sz, sz), float('-inf'), dtype=torch.float32, device=DEVICE)
    # diagonal=1 keeps only the strictly-upper triangle and zeroes the rest
    return torch.triu(blocked, diagonal=1)

def create_mask(src, tgt):
    """Create the attention and padding masks for one source/target batch.

    Dimension 0 of ``src`` and ``tgt`` is taken as the sequence length.

    Returns:
        A tuple ``(src_mask, tgt_mask, src_padding_mask, tgt_padding_mask)``:
        an all-False boolean square mask for the source, the subsequent-position
        mask for the target, and two boolean masks (dims 0 and 1 swapped
        relative to the inputs) that are True where the token equals PAD_IDX.
    """
    src_len, tgt_len = src.shape[0], tgt.shape[0]

    # Nothing is hidden on the source side: an all-False boolean mask
    src_mask = torch.zeros((src_len, src_len), dtype=torch.bool, device=DEVICE)
    tgt_mask = generate_square_subsequent_mask(tgt_len)

    # Mark padding positions (PAD_IDX), then swap the first two dimensions
    src_is_pad = src == PAD_IDX
    tgt_is_pad = tgt == PAD_IDX
    src_padding_mask = src_is_pad.transpose(0, 1)
    tgt_padding_mask = tgt_is_pad.transpose(0, 1)

    return src_mask, tgt_mask, src_padding_mask, tgt_padding_mask

In [9]:
# Define the model hyperparameters and create the model object
# Define the loss function and the optimizer

# Seed the RNG before the model is constructed and initialised
torch.manual_seed(0)

# vocab_transform[LANGUAGE] is the Vocab built for that language
SRC_VOCAB_SIZE = len(vocab_transform[SRC_LANGUAGE])
TGT_VOCAB_SIZE = len(vocab_transform[TGT_LANGUAGE])
EMB_SIZE = 512
NHEAD = 8
FFN_HID_DIM = 512
BATCH_SIZE = 128
NUM_ENCODER_LAYERS = 3
NUM_DECODER_LAYERS = 3

transformer = Seq2SeqTransformer(
    NUM_ENCODER_LAYERS,
    NUM_DECODER_LAYERS,
    EMB_SIZE,
    NHEAD,
    SRC_VOCAB_SIZE,
    TGT_VOCAB_SIZE,
    FFN_HID_DIM
)

# Xavier-uniform initialisation for every parameter with more than one
# dimension; 1-D parameters keep their default initialisation
for p in transformer.parameters():
    if p.dim() > 1:
        nn.init.xavier_uniform_(p)

transformer = transformer.to(DEVICE)

# ignore_index: specify target value that is ignored
#    and doesn't contribute to input gradient
# (padding positions therefore do not affect the loss)
loss_fn = torch.nn.CrossEntropyLoss(ignore_index=PAD_IDX)
optimizer = torch.optim.Adam(transformer.parameters(),
                             lr=0.0001,
                             betas=(0.9, 0.98),
                             eps=1e-9)

## 대조

In [11]:
"""
데이터 반복자: 원시 문자열의 쌍 생성
문자열 쌍을 신경망에서 처리할 수 있도록 텐서 묶음으로 변환

대응어 함수
    ㄴ 원시 문자열들의 묶음을 텐서 묶음으로 변환하여 모델에 직접 전달하기 위해 정의
"""
from torch.nn.utils.rnn import pad_sequence

# Helper that chains several transforms into a single callable
def sequential_transforms(*transforms):
    """Compose ``transforms`` into one function applied left to right.

    The returned function feeds its argument to the first transform, passes
    each result to the next one, and returns the final result. With no
    transforms it returns its argument unchanged.
    """
    def apply_in_order(txt_input):
        result = txt_input
        for step in transforms:
            result = step(result)
        return result

    return apply_in_order

# Add BOS/EOS and build the tensor of token indices for one sequence
def tensor_transform(token_ids: List[int]):
    """Wrap ``token_ids`` with BOS_IDX / EOS_IDX and return a 1-D long tensor.

    Args:
        token_ids: Vocabulary indices of one tokenized sentence; may be empty.

    Returns:
        A tensor ``[BOS_IDX, *token_ids, EOS_IDX]`` with dtype ``torch.long``.
    """
    # The dtype is explicit because torch.tensor([]) defaults to float32, so
    # an empty sentence would otherwise not produce an integer index tensor.
    return torch.tensor([BOS_IDX, *token_ids, EOS_IDX], dtype=torch.long)

# Per-language pipeline that turns a raw src / tgt string into a tensor of
# token indices
text_transform = {
    ln: sequential_transforms(
        token_transform[ln],  # tokenize (spaCy)
        vocab_transform[ln],  # numericalize (Vocab)
        tensor_transform,     # add BOS/EOS and build the tensor
    )
    for ln in (SRC_LANGUAGE, TGT_LANGUAGE)
}

# Collate function that combines raw samples into padded tensors
def collate_fn(batch):
    """Convert a batch of (source, target) string pairs into two padded tensors.

    Trailing newlines are stripped from each string before it is passed
    through the language's ``text_transform`` pipeline.

    Returns:
        ``(src_batch, tgt_batch)``, each produced by ``pad_sequence`` with
        PAD_IDX as the padding value.
    """
    encode_src = text_transform[SRC_LANGUAGE]
    encode_tgt = text_transform[TGT_LANGUAGE]

    encoded_pairs = [
        (encode_src(src_sample.rstrip("\n")), encode_tgt(tgt_sample.rstrip("\n")))
        for src_sample, tgt_sample in batch
    ]
    src_tensors = [pair[0] for pair in encoded_pairs]
    tgt_tensors = [pair[1] for pair in encoded_pairs]

    # pad_sequence pads the variable-length tensors with padding_value so
    # that all sequences in the batch have the same length
    return (
        pad_sequence(src_tensors, padding_value=PAD_IDX),
        pad_sequence(tgt_tensors, padding_value=PAD_IDX),
    )

In [12]:
# Training and evaluation steps
from torch.utils.data import DataLoader

def train_epoch(model, optimizer):
    """Run one training pass over the Multi30k training split.

    Args:
        model: Network called as ``model(src, tgt_input, src_mask, tgt_mask,
            src_padding_mask, tgt_padding_mask, memory_key_padding_mask)``.
        optimizer: Optimizer whose ``zero_grad`` / ``step`` update ``model``.

    Returns:
        The mean of the per-batch losses.

    Raises:
        ZeroDivisionError: If the data loader yields no batches.
    """
    model.train()
    losses = 0
    num_batches = 0
    # Build a fresh iterator for this epoch and actually pass it to the loader
    train_iter = Multi30k(split='train',
                          language_pair=(SRC_LANGUAGE, TGT_LANGUAGE))
    train_dataloader = DataLoader(train_iter,
                                  batch_size=BATCH_SIZE,
                                  collate_fn=collate_fn)

    for src, tgt in train_dataloader:
        src = src.to(DEVICE)
        tgt = tgt.to(DEVICE)

        # The decoder input drops the last position and the expected output
        # drops the first, so each position is trained on the next token.
        tgt_input = tgt[:-1, :]
        tgt_out = tgt[1:, :]

        src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(
            src, tgt_input
        )

        optimizer.zero_grad()

        # The source padding mask is also used as the memory padding mask
        logits = model(src,
                       tgt_input,
                       src_mask,
                       tgt_mask,
                       src_padding_mask,
                       tgt_padding_mask,
                       src_padding_mask)

        loss = loss_fn(logits.reshape(-1, logits.shape[-1]), tgt_out.reshape(-1))
        loss.backward()
        optimizer.step()

        losses += loss.item()
        num_batches += 1

    # Count batches while iterating instead of running a second full pass
    # over the data loader just to measure its length.
    return losses / num_batches

def evaluate(model):
    """Compute the mean per-batch loss on the Multi30k validation split.

    Args:
        model: Network called as ``model(src, tgt_input, src_mask, tgt_mask,
            src_padding_mask, tgt_padding_mask, memory_key_padding_mask)``.

    Returns:
        The mean of the per-batch losses.

    Raises:
        ZeroDivisionError: If the data loader yields no batches.
    """
    model.eval()
    losses = 0
    num_batches = 0
    val_iter = Multi30k(split='valid',
                        language_pair=(SRC_LANGUAGE, TGT_LANGUAGE))
    val_dataloader = DataLoader(val_iter, batch_size=BATCH_SIZE, collate_fn=collate_fn)

    # No parameter updates happen here, so gradient tracking is disabled
    with torch.no_grad():
        for src, tgt in val_dataloader:
            src = src.to(DEVICE)
            tgt = tgt.to(DEVICE)

            # The decoder input drops the last position and the expected
            # output drops the first (next-token prediction).
            tgt_input = tgt[:-1, :]
            tgt_out = tgt[1:, :]

            src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(
                src, tgt_input
            )

            # The source padding mask is also used as the memory padding mask
            logits = model(src,
                           tgt_input,
                           src_mask,
                           tgt_mask,
                           src_padding_mask,
                           tgt_padding_mask,
                           src_padding_mask)

            loss = loss_fn(logits.reshape(-1, logits.shape[-1]), tgt_out.reshape(-1))
            losses += loss.item()
            num_batches += 1

    # Count batches while iterating instead of running a second full pass
    # over the data loader just to measure its length.
    return losses / num_batches

In [14]:
# Run the training loop
from timeit import default_timer as timer
NUM_EPOCHS = 18

for epoch in range(1, NUM_EPOCHS + 1):
    start_time = timer()
    train_loss = train_epoch(transformer, optimizer)
    # The timer stops before validation, so "Epoch time" covers training only
    end_time = timer()
    val_loss = evaluate(transformer)
    # Two adjacent f-strings joined by ", "; a backslash continuation inside
    # the literal would embed the next line's indentation in the output.
    print(f"Epoch: {epoch}, Train loss: {train_loss:.3f}, Val loss: {val_loss:.3f}, "
          f"Epoch time = {(end_time - start_time):.3f}s")




Epoch: 1, Train loss: 5.344, Val loss: 4.114,        Epoch time = 56.197s
Epoch: 2, Train loss: 3.761, Val loss: 3.320,        Epoch time = 49.366s
Epoch: 3, Train loss: 3.162, Val loss: 2.894,        Epoch time = 50.164s
Epoch: 4, Train loss: 2.768, Val loss: 2.640,        Epoch time = 45.690s
Epoch: 5, Train loss: 2.481, Val loss: 2.441,        Epoch time = 44.190s
Epoch: 6, Train loss: 2.250, Val loss: 2.317,        Epoch time = 44.835s
Epoch: 7, Train loss: 2.060, Val loss: 2.204,        Epoch time = 44.073s
Epoch: 8, Train loss: 1.897, Val loss: 2.115,        Epoch time = 43.956s
Epoch: 9, Train loss: 1.754, Val loss: 2.062,        Epoch time = 44.767s
Epoch: 10, Train loss: 1.631, Val loss: 2.003,        Epoch time = 43.884s
Epoch: 11, Train loss: 1.524, Val loss: 1.973,        Epoch time = 46.211s
Epoch: 12, Train loss: 1.420, Val loss: 1.944,        Epoch time = 43.629s
Epoch: 13, Train loss: 1.333, Val loss: 1.964,        Epoch time = 44.684s
Epoch: 14, Train loss: 1.251, Val 

In [17]:
# Generate an output sequence using greedy decoding
def greedy_decode(model, src, src_mask, max_len, start_symbol):
    """Decode one sequence by repeatedly taking the highest-scoring next token.

    Args:
        model: Model exposing ``encode``, ``decode`` and ``generator``.
        src: Source token tensor passed to ``model.encode``.
        src_mask: Source attention mask passed to ``model.encode``.
        max_len: Upper bound on the output length, start symbol included.
        start_symbol: Index of the token that begins the output.

    Returns:
        A long tensor of generated token indices on ``DEVICE`` with one row
        per token, beginning with ``start_symbol``. Generation stops after
        EOS_IDX is appended or once ``max_len`` tokens exist.
    """
    src = src.to(DEVICE)
    src_mask = src_mask.to(DEVICE)

    # Inference only: skip autograd bookkeeping
    with torch.no_grad():
        # Encode once; the memory is reused unchanged at every decoding step
        memory = model.encode(src, src_mask).to(DEVICE)

        # Start from a (1, 1) tensor holding only start_symbol
        ys = torch.full((1, 1), start_symbol, dtype=torch.long, device=DEVICE)
        for _ in range(max_len - 1):
            # Boolean mask: True marks positions that must not be attended to
            tgt_mask = (generate_square_subsequent_mask(ys.size(0))
                        .type(torch.bool)).to(DEVICE)
            out = model.decode(ys, memory, tgt_mask)
            out = out.transpose(0, 1)
            # Scores over the target vocabulary for the last position only
            prob = model.generator(out[:, -1])

            # next_word = vocabulary index of the highest-scoring token
            _, next_word = torch.max(prob, dim=1)
            next_word = next_word.item()

            # Append to the tokens generated so far, e.g. [BOS] -> [BOS] I.
            # dtype/device follow ys so the concatenation is always consistent.
            next_token = torch.full((1, 1), next_word, dtype=ys.dtype, device=ys.device)
            ys = torch.cat([ys, next_token], dim=0)
            if next_word == EOS_IDX:
                break
    return ys

# Translate an input sentence into the target language
def translate(model: torch.nn.Module, src_sentence: str):
    """Translate ``src_sentence`` with ``model`` using greedy decoding.

    The model is switched to eval mode. The output is allowed to be at most
    five tokens longer than the encoded source. The "<bos>" and "<eos>"
    markers are removed from the joined string, but the spaces that
    surrounded them are left in place.
    """
    model.eval()

    # Encode the sentence and reshape it into a single column
    src = text_transform[SRC_LANGUAGE](src_sentence).view(-1, 1)
    num_tokens = src.shape[0]
    # All-False boolean mask: no source position is hidden
    src_mask = (torch.zeros(num_tokens, num_tokens)).type(torch.bool)

    decoded = greedy_decode(
        model, src, src_mask, max_len=num_tokens + 5, start_symbol=BOS_IDX
    )
    token_ids = list(decoded.flatten().cpu().numpy())
    words = vocab_transform[TGT_LANGUAGE].lookup_tokens(token_ids)

    sentence = " ".join(words)
    for marker in ("<bos>", "<eos>"):
        sentence = sentence.replace(marker, "")
    return sentence

In [18]:
# Translate one German example sentence with the trained model and print it
print(translate(transformer, "Eine Gruppe von Menschen steht vor einem Iglu ."))

 A group of people stand in front of an igloo . 
