In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
!pip install torch==2.4.0

Collecting torch==2.4.0
  Using cached torch-2.4.0-cp311-cp311-manylinux1_x86_64.whl.metadata (26 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch==2.4.0)
  Using cached nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-nccl-cu12==2.20.5 (from torch==2.4.0)
  Using cached nvidia_nccl_cu12-2.20.5-py3-none-manylinux2014_x86_64.whl.metadata (1.8 kB)
Collecting triton==3.0.0 (from torch==2.4.0)
  Using cached triton-3.0.0-1-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (1.3 kB)
Using cached torch-2.4.0-cp311-cp311-manylinux1_x86_64.whl (797.3 MB)
Using cached nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl (664.8 MB)
Using cached nvidia_nccl_cu12-2.20.5-py3-none-manylinux2014_x86_64.whl (176.2 MB)
Using cached triton-3.0.0-1-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.whl (209.4 MB)
Installing collected packages: triton, nvidia-nccl-cu12, nvidia-cudnn-cu12, torch
  Attempting uninstall: trito

In [19]:
import re
import os
import unicodedata
import urllib3
import zipfile
import shutil
import numpy as np
import pandas as pd
import torch
from collections import Counter
from tqdm import tqdm
from torch.utils.data import DataLoader, TensorDataset

In [20]:
num_samples = 33000

In [21]:
def preprocess_sentence(sent):

    # 단어와 구두점 사이에 공백을 만듭니다.
    # Ex) "he is a boy." => "he is a boy ."
    sent = re.sub(r"([?.!,¿])", r" \1", sent)

    # (가-힣, a-z, A-Z, ".", "?", "!", ",") 이들을 제외하고는 전부 공백으로 변환합니다.
    sent = re.sub(r"[^가-힣a-zA-Z?.!]+", r" ", sent)

    # 다수 개의 공백을 하나의 공백으로 치환
    sent = re.sub(r"\s+", " ", sent)
    return sent

In [22]:
def load_preprocessed_data_from_excel():
    encoder_input, decoder_input, decoder_target = [], [], []

    # 엑셀 파일 읽기 (기본: 첫 번째 시트)
    df = pd.read_excel("/content/drive/MyDrive/Colab Notebooks/kor.xlsx")

    # 열 이름이 없다면 수동으로 지정해줘야 함
    # 예: df.columns = ["src", "tar"]

    for i in range(min(num_samples, len(df))):
        tar_line = df.iloc[i, 0]  # 첫 번째 열: 한국어
        src_line = df.iloc[i, 1]  # 두 번째 열: 영어

        # 전처리
        src_line = [w for w in preprocess_sentence(str(src_line)).split()]
        tar_line = preprocess_sentence(str(tar_line))
        tar_line_in = [w for w in ("<sos> " + tar_line).split()]
        tar_line_out = [w for w in (tar_line + " <eos>").split()]

        encoder_input.append(src_line)
        decoder_input.append(tar_line_in)
        decoder_target.append(tar_line_out)

    return encoder_input, decoder_input, decoder_target


In [23]:
# 전처리 테스트
en_sent = u"Have you had dinner?"
kr_sent = u"저녁 먹었니?"

print('전처리 전 영어 문장 :', en_sent)
print('전처리 후 영어 문장 :',preprocess_sentence(en_sent))
print('전처리 전 한국어 문장 :', kr_sent)
print('전처리 후 한국어 문장 :', preprocess_sentence(kr_sent))

전처리 전 영어 문장 : Have you had dinner?
전처리 후 영어 문장 : Have you had dinner ?
전처리 전 한국어 문장 : 저녁 먹었니?
전처리 후 한국어 문장 : 저녁 먹었니 ?


In [24]:
sents_en_in, sents_kr_in, sents_kr_out = load_preprocessed_data_from_excel()
print('인코더의 입력 :',sents_en_in[:5])
print('디코더의 입력 :',sents_kr_in[:5])
print('디코더의 레이블 :',sents_kr_out[:5])

인코더의 입력 : [['Do', 'you', 'work', 'at', 'a', 'City', 'bank', '?'], ['PURITO', 's', 'bestseller', 'which', 'recorded', 'th', 'rough', 'cuts', 'by', 'words', 'of', 'mouth', 'from', 'abroad', '.'], ['In', 'Chapter', 'Jesus', 'called', 'Lazarus', 'from', 'the', 'tomb', 'and', 'raised', 'him', 'from', 'the', 'dead', '.'], ['I', 'would', 'feel', 'grateful', 'to', 'know', 'how', 'many', 'stocks', 'will', 'be', 'secured', 'of', 'size', '.', 'and', '.'], ['fw', 'Kenzo', 'Tiger', 'Kids', 'and', 'refund', 'for', 'lacking', 'quantity', 'of', 'Kids', 'which', 'was', 'ordered', 'this', 'time', '.']]
디코더의 입력 : [['<sos>', '씨티은행에서', '일하세요', '?'], ['<sos>', '푸리토의', '베스트셀러는', '해외에서', '입소문만으로', '차', '완판을', '기록하였다', '.'], ['<sos>', '장에서는', '예수님이', '이번엔', '나사로를', '무덤에서', '불러내어', '죽은', '자', '가운데서', '살리셨습니다', '.'], ['<sos>', '.', '사이즈가', '몇', '개나', '더', '재입고', '될지', '제게', '알려주시면', '감사하겠습니다', '.'], ['<sos>', 'F', 'W', '겐조타이거', '키즈와', '그리고', '이번에', '주문한', '키즈', '중', '부족한', '수량에', '대한', '환불입니다', '.']]
디코더의 레이블 : 

In [25]:
def build_vocab(sents):
    word_list = []

    for sent in sents:
        for word in sent:
            word_list.append(word)

    # 각 단어별 등장 빈도를 계산하여 등장 빈도가 높은 순서로 정렬
    word_counts = Counter(word_list)
    vocab = sorted(word_counts, key=word_counts.get, reverse=True)

    word_to_index = {}
    word_to_index['<PAD>'] = 0
    word_to_index['<UNK>'] = 1

    # 등장 빈도가 높은 단어일수록 낮은 정수를 부여
    for index, word in enumerate(vocab) :
        word_to_index[word] = index + 2

    return word_to_index

In [27]:
src_vocab = build_vocab(sents_en_in)
tar_vocab = build_vocab(sents_kr_in + sents_kr_out)

src_vocab_size = len(src_vocab)
tar_vocab_size = len(tar_vocab)
print("영어 단어 집합의 크기 : {:d}, 한국어 단어 집합의 크기 : {:d}".format(src_vocab_size, tar_vocab_size))

영어 단어 집합의 크기 : 20505, 한국어 단어 집합의 크기 : 64787


In [28]:
index_to_src = {v: k for k, v in src_vocab.items()}
index_to_tar = {v: k for k, v in tar_vocab.items()}

def texts_to_sequences(sents, word_to_index):
    encoded_X_data = []
    for sent in tqdm(sents):
        index_sequences = []
        for word in sent:
            try:
                index_sequences.append(word_to_index[word])
            except KeyError:
                index_sequences.append(word_to_index['<UNK>'])
        encoded_X_data.append(index_sequences)
    return encoded_X_data

In [29]:
encoder_input = texts_to_sequences(sents_en_in, src_vocab)
decoder_input = texts_to_sequences(sents_kr_in, tar_vocab)
decoder_target = texts_to_sequences(sents_kr_out, tar_vocab)

100%|██████████| 33000/33000 [00:00<00:00, 239815.70it/s]
100%|██████████| 33000/33000 [00:00<00:00, 59975.57it/s]
100%|██████████| 33000/33000 [00:00<00:00, 301378.58it/s]


In [30]:
# 상위 5개의 샘플에 대해서 정수 인코딩 전, 후 문장 출력
# 인코더 입력이므로 <sos>나 <eos>가 없음
for i, (item1, item2) in zip(range(5), zip(sents_en_in, encoder_input)):
    print(f"Index: {i}, 정수 인코딩 전: {item1}, 정수 인코딩 후: {item2}")

Index: 0, 정수 인코딩 전: ['Do', 'you', 'work', 'at', 'a', 'City', 'bank', '?'], 정수 인코딩 후: [209, 11, 95, 32, 5, 1873, 800, 16]
Index: 1, 정수 인코딩 전: ['PURITO', 's', 'bestseller', 'which', 'recorded', 'th', 'rough', 'cuts', 'by', 'words', 'of', 'mouth', 'from', 'abroad', '.'], 정수 인코딩 후: [6173, 17, 10635, 83, 3437, 74, 1944, 5245, 38, 585, 8, 1393, 34, 1512, 2]
Index: 2, 정수 인코딩 전: ['In', 'Chapter', 'Jesus', 'called', 'Lazarus', 'from', 'the', 'tomb', 'and', 'raised', 'him', 'from', 'the', 'dead', '.'], 정수 인코딩 후: [114, 5246, 2772, 252, 10636, 34, 3, 5247, 9, 1806, 113, 34, 3, 2201, 2]
Index: 3, 정수 인코딩 전: ['I', 'would', 'feel', 'grateful', 'to', 'know', 'how', 'many', 'stocks', 'will', 'be', 'secured', 'of', 'size', '.', 'and', '.'], 정수 인코딩 후: [7, 75, 200, 2325, 4, 73, 175, 109, 2462, 26, 24, 6174, 8, 383, 2, 9, 2]
Index: 4, 정수 인코딩 전: ['fw', 'Kenzo', 'Tiger', 'Kids', 'and', 'refund', 'for', 'lacking', 'quantity', 'of', 'Kids', 'which', 'was', 'ordered', 'this', 'time', '.'], 정수 인코딩 후: [3764, 6175,

In [31]:
def pad_sequences(sentences, max_len=None):
    # 최대 길이 값이 주어지지 않을 경우 데이터 내 최대 길이로 패딩
    if max_len is None:
        max_len = max([len(sentence) for sentence in sentences])

    features = np.zeros((len(sentences), max_len), dtype=int)
    for index, sentence in enumerate(sentences):
        if len(sentence) != 0:
            features[index, :len(sentence)] = np.array(sentence)[:max_len]
    return features

In [32]:
encoder_input = pad_sequences(encoder_input)
decoder_input = pad_sequences(decoder_input)
decoder_target = pad_sequences(decoder_target)

In [33]:
print('인코더의 입력의 크기(shape) :',encoder_input.shape)
print('디코더의 입력의 크기(shape) :',decoder_input.shape)
print('디코더의 레이블의 크기(shape) :',decoder_target.shape)

인코더의 입력의 크기(shape) : (33000, 43)
디코더의 입력의 크기(shape) : (33000, 25)
디코더의 레이블의 크기(shape) : (33000, 25)


In [34]:
indices = np.arange(encoder_input.shape[0])
np.random.shuffle(indices)
print('랜덤 시퀀스 :',indices)

랜덤 시퀀스 : [ 3496 12292 20228 ... 12734   309 18328]


In [35]:
encoder_input = encoder_input[indices]
decoder_input = decoder_input[indices]
decoder_target = decoder_target[indices]

In [36]:
print([index_to_src[word] for word in encoder_input[30997]])
print([index_to_tar[word] for word in decoder_input[30997]])
print([index_to_tar[word] for word in decoder_target[30997]])

['I', 'had', 'to', 'sleep', 'in', 'a', 'place', 'called', 'the', 'season', 'room', 'there', '.', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>']
['<sos>', '그곳에서', '나는', '시즌', '방이라는', '곳에서', '자게', '되었어', '.', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>']
['그곳에서', '나는', '시즌', '방이라는', '곳에서', '자게', '되었어', '.', '<eos>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>']


In [37]:
n_of_val = int(33000*0.1)
print('검증 데이터의 개수 :',n_of_val)

검증 데이터의 개수 : 3300


In [38]:
encoder_input_train = encoder_input[:-n_of_val]
decoder_input_train = decoder_input[:-n_of_val]
decoder_target_train = decoder_target[:-n_of_val]

encoder_input_test = encoder_input[-n_of_val:]
decoder_input_test = decoder_input[-n_of_val:]
decoder_target_test = decoder_target[-n_of_val:]

In [39]:
print('훈련 source 데이터의 크기 :',encoder_input_train.shape)
print('훈련 target 데이터의 크기 :',decoder_input_train.shape)
print('훈련 target 레이블의 크기 :',decoder_target_train.shape)
print('테스트 source 데이터의 크기 :',encoder_input_test.shape)
print('테스트 target 데이터의 크기 :',decoder_input_test.shape)
print('테스트 target 레이블의 크기 :',decoder_target_test.shape)

훈련 source 데이터의 크기 : (29700, 43)
훈련 target 데이터의 크기 : (29700, 25)
훈련 target 레이블의 크기 : (29700, 25)
테스트 source 데이터의 크기 : (3300, 43)
테스트 target 데이터의 크기 : (3300, 25)
테스트 target 레이블의 크기 : (3300, 25)


In [None]:
# Google Colab에서 노트북을 실행하실 때에는
# https://tutorials.pytorch.kr/beginner/colab 를 참고하세요.
%matplotlib inline

`nn.Transformer` 와 torchtext로 언어 번역하기
=============================================

이 튜토리얼에서는,

:   -   Transformer(트랜스포머)를 사용한 번역 모델을 바닥부터 학습하는
        방법을 배워보겠습니다.
    -   [Multi30k](http://www.statmt.org/wmt16/multimodal-task.html#task1)
        데이터셋을 사용하여 독일어(German)를 영어(English)로 번역하는
        모델을 학습해보겠습니다.


데이터 구하고 처리하기
======================

[torchtext 라이브러리](https://pytorch.org/text/stable/)에는 언어 번역
모델을 생성하기 위한 데이터셋을 쉽게 만들 수 있는 도구들이 있습니다. 이
튜토리얼에서는 torchtext의 내장(inbuilt) 데이터셋을 어떻게 사용하고,
원시(raw) 텍스트 문장을 토큰화(tokenize)하고, 토큰을 텐서로 수치화하는
방법을 살펴보겠습니다. 출발어(source)-도착어(target) 원시(raw) 문장을
생성하기 위해서는 [torchtext 라이브러리의 Multi30k
데이터셋](https://pytorch.org/text/stable/datasets.html#multi30k) 을
사용하겠습니다.

torchtext 데이터셋에 접근하기 전에, <https://github.com/pytorch/data> 을
참고하여 torchdata를 설치하시기 바랍니다.


In [3]:
#!pip install torch==2.4.0
!pip install torchdata==0.8.0
!pip install torchtext==0.17.0
!pip install portalocker
!pip install openpyxl

Collecting torchdata==0.8.0
  Using cached torchdata-0.8.0-cp311-cp311-manylinux1_x86_64.whl.metadata (5.4 kB)
Using cached torchdata-0.8.0-cp311-cp311-manylinux1_x86_64.whl (2.7 MB)
Installing collected packages: torchdata
  Attempting uninstall: torchdata
    Found existing installation: torchdata 0.7.1
    Uninstalling torchdata-0.7.1:
      Successfully uninstalled torchdata-0.7.1
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
torchtext 0.17.0 requires torch==2.2.0, but you have torch 2.4.0 which is incompatible.
torchtext 0.17.0 requires torchdata==0.7.1, but you have torchdata 0.8.0 which is incompatible.[0m[31m
[0mSuccessfully installed torchdata-0.8.0
Collecting torch==2.2.0 (from torchtext==0.17.0)
  Using cached torch-2.2.0-cp311-cp311-manylinux1_x86_64.whl.metadata (25 kB)
Collecting torchdata==0.7.1 (from torchtext==0.17.0)
  Using cached t

In [8]:
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from typing import Iterable, List
import pandas as pd

# 엑셀에서 열 이름 없이 불러오기
df = pd.read_excel("/content/drive/MyDrive/Colab Notebooks/kor.xlsx", header=None)  # 열 이름 없음
df = df.iloc[:100000]  # 앞에서 5000개만 사용
train_data = list(zip(df[1], df[0]))  # 영어, 한국어 순으로 zip

# 1. 언어 설정 (영어-한국어)
SRC_LANGUAGE = 'en'
TGT_LANGUAGE = 'ko'

token_transform = {}
vocab_transform = {}

출발어(source)와 목적어(target)의 토크나이저(tokenizer)를 생성합니다.
아래 필요 사항(dependency)을 모두 설치해주세요.

``` {.sourceCode .python}
pip install -U torchdata
pip install -U spacy
python -m spacy download ko_core_web_sm
python -m spacy download en_core_news_sm
```
위와 같은 내용으로 수정하려 했으나 'en_core_news_sm', 'ko_core_web_sm'은 공식적으로 존재하지 않음

In [10]:
# 간단한 띄어쓰기 기반 토크나이저 사용
token_transform[SRC_LANGUAGE] = lambda x: x.strip().split()
token_transform[TGT_LANGUAGE] = lambda x: x.strip().split()


# 토큰 목록을 생성하기 위한 헬퍼(helper) 함수
def yield_tokens(data_iter: Iterable, language: str) -> List[str]:
    language_index = {SRC_LANGUAGE: 0, TGT_LANGUAGE: 1}

    for data_sample in data_iter:
        yield token_transform[language](data_sample[language_index[language]])

# 특수 기호(symbol)와 인덱스를 정의합니다
UNK_IDX, PAD_IDX, BOS_IDX, EOS_IDX = 0, 1, 2, 3
# 토큰들이 어휘집(vocab)에 인덱스 순서대로 잘 삽입되어 있는지 확인합니다
special_symbols = ['<unk>', '<pad>', '<bos>', '<eos>']

for ln in [SRC_LANGUAGE, TGT_LANGUAGE]:
    vocab_transform[ln] = build_vocab_from_iterator(
        yield_tokens(train_data, ln),
        min_freq=1,
        specials=special_symbols,
        special_first=True
    )
    vocab_transform[ln].set_default_index(UNK_IDX)

# 확인용 출력
print(f"English vocab size: {len(vocab_transform[SRC_LANGUAGE])}")
print(f"Korean vocab size: {len(vocab_transform[TGT_LANGUAGE])}")
print(f"Sample tokens (EN): {token_transform['en'](train_data[0][0])}")
print(f"Sample tokens (KO): {token_transform['ko'](train_data[0][1])}")
print(f"Sample indices (EN): {vocab_transform['en'](token_transform['en'](train_data[0][0]))}")
print(f"Sample indices (KO): {vocab_transform['ko'](token_transform['ko'](train_data[0][1]))}")

English vocab size: 51455
Korean vocab size: 127510
Sample tokens (EN): ['Bible', "Coloring'", 'is', 'a', 'coloring', 'application', 'that', 'allows', 'you', 'to', 'experience', 'beautiful', 'stories', 'in', 'the', 'Bible.']
Sample tokens (KO): ["'Bible", "Coloring'은", '성경의', '아름다운', '이야기를', '체험', '할', '수', '있는', '컬러링', '앱입니다.']
Sample indices (EN): [7431, 27332, 10, 7, 9292, 1133, 13, 3492, 8, 6, 370, 376, 2213, 12, 5, 11515]
Sample indices (KO): [43809, 49017, 35660, 447, 370, 7461, 20, 5, 13, 117720, 12878]


Transformer를 사용한 시퀀스-투-시퀀스(Seq2Seq) 신경망
=====================================================

Transformer(트랜스포머)는 기계번역 작업(task)을 위해 [\"Attention is all
you
need\"](https://papers.nips.cc/paper/2017/file/3f5ee243547dee91fbd053c1c4a845aa-Paper.pdf)
논문에 소개된 Seq2Seq 모델입니다. 아래에서 Transformer를 사용한 Seq2Seq
신경망을 만들어보겠습니다. 신경망은 세 부분으로 구성되는데, 첫번째
부분은 임베딩 계층(embedding layer)입니다. 이 계층은 입력 인덱스의
텐서를 입력 임베딩의 해당하는 텐서로 변환합니다. 이러한 임베딩은 입력
토큰의 위치 정보(position information)를 모델에 전달하기 위해 위치
인코딩(positional encoding)을 추가합니다. 두번째 부분은 실제
[Transformer](https://pytorch.org/docs/stable/generated/torch.nn.Transformer.html)
모델입니다. 마지막으로 Transformer 모델의 출력을 선형 계층에 통과시켜
도착어의 각 토큰에 대한 정규화되지 않은 확률(un-normalized
probability)로 제공합니다.


In [4]:
from torch import Tensor
import torch
import torch.nn as nn
from torch.nn import Transformer
import math

DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# 단어 순서 개념(notion)을 토큰 임베딩에 도입하기 위한 위치 인코딩(positional encoding)을 위한 헬퍼 모듈(Module)
class PositionalEncoding(nn.Module):
    def __init__(self,
                 emb_size: int,
                 dropout: float,
                 maxlen: int = 5000):
        super(PositionalEncoding, self).__init__()
        den = torch.exp(- torch.arange(0, emb_size, 2)* math.log(10000) / emb_size)
        pos = torch.arange(0, maxlen).reshape(maxlen, 1)
        pos_embedding = torch.zeros((maxlen, emb_size))
        pos_embedding[:, 0::2] = torch.sin(pos * den)
        pos_embedding[:, 1::2] = torch.cos(pos * den)
        pos_embedding = pos_embedding.unsqueeze(-2)

        self.dropout = nn.Dropout(dropout)
        self.register_buffer('pos_embedding', pos_embedding)

    def forward(self, token_embedding: Tensor):
        return self.dropout(token_embedding + self.pos_embedding[:token_embedding.size(0), :])

# 입력 인덱스의 텐서를 해당하는 토큰 임베딩의 텐서로 변환하기 위한 헬퍼 모듈(Module)
class TokenEmbedding(nn.Module):
    def __init__(self, vocab_size: int, emb_size):
        super(TokenEmbedding, self).__init__()
        self.embedding = nn.Embedding(vocab_size, emb_size)
        self.emb_size = emb_size

    def forward(self, tokens: Tensor):
        return self.embedding(tokens.long()) * math.sqrt(self.emb_size)

# Seq2Seq 신경망
class Seq2SeqTransformer(nn.Module):
    def __init__(self,
                 num_encoder_layers: int,
                 num_decoder_layers: int,
                 emb_size: int,
                 nhead: int,
                 src_vocab_size: int,
                 tgt_vocab_size: int,
                 dim_feedforward: int = 512,
                 dropout: float = 0.1):
        super(Seq2SeqTransformer, self).__init__()
        self.transformer = Transformer(d_model=emb_size,
                                       nhead=nhead,
                                       num_encoder_layers=num_encoder_layers,
                                       num_decoder_layers=num_decoder_layers,
                                       dim_feedforward=dim_feedforward,
                                       dropout=dropout)
        self.generator = nn.Linear(emb_size, tgt_vocab_size)
        self.src_tok_emb = TokenEmbedding(src_vocab_size, emb_size)
        self.tgt_tok_emb = TokenEmbedding(tgt_vocab_size, emb_size)
        self.positional_encoding = PositionalEncoding(
            emb_size, dropout=dropout)

    def forward(self,
                src: Tensor,
                trg: Tensor,
                src_mask: Tensor,
                tgt_mask: Tensor,
                src_padding_mask: Tensor,
                tgt_padding_mask: Tensor,
                memory_key_padding_mask: Tensor):
        src_emb = self.positional_encoding(self.src_tok_emb(src))
        tgt_emb = self.positional_encoding(self.tgt_tok_emb(trg))
        outs = self.transformer(src_emb, tgt_emb, src_mask, tgt_mask, None,
                                src_padding_mask, tgt_padding_mask, memory_key_padding_mask)
        return self.generator(outs)

    def encode(self, src: Tensor, src_mask: Tensor):
        return self.transformer.encoder(self.positional_encoding(
                            self.src_tok_emb(src)), src_mask)

    def decode(self, tgt: Tensor, memory: Tensor, tgt_mask: Tensor):
        return self.transformer.decoder(self.positional_encoding(
                          self.tgt_tok_emb(tgt)), memory,
                          tgt_mask)


A module that was compiled using NumPy 1.x cannot be run in
NumPy 2.0.2 as it may crash. To support both 1.x and 2.x
versions of NumPy, modules must be compiled with NumPy 2.0.
Some module may need to rebuild instead e.g. with 'pybind11>=2.12'.

If you are a user of the module, the easiest solution will be to
downgrade to 'numpy<2' or try to upgrade the affected module.
We expect that some modules will need time to support NumPy 2.

Traceback (most recent call last):  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "/usr/local/lib/python3.11/dist-packages/colab_kernel_launcher.py", line 37, in <module>
    ColabKernelApp.launch_instance()
  File "/usr/local/lib/python3.11/dist-packages/traitlets/config/application.py", line 992, in launch_instance
    app.start()
  File "/usr/local/lib/python3.11/dist-packages/ipykernel/kernelapp.py", line 712, in start
    self.io_loop.start()
  File "/usr/local/lib/python3.11/dist-package

학습하는 동안, 모델이 예측할 때 정답(이후 출현하는 단어)을 보지 못하도록
하는 후속 단어 마스크(subsequent word mask)가 필요합니다. 또한, 출발어와
도착어의 패딩(padding) 토큰들 또한 숨겨야 합니다. 아래에 두 가지 모두를
처리할 함수를 정의해보겠습니다.


In [5]:
def generate_square_subsequent_mask(sz):
    mask = (torch.triu(torch.ones((sz, sz), device=DEVICE)) == 1).transpose(0, 1)
    mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
    return mask


def create_mask(src, tgt):
    src_seq_len = src.shape[0]
    tgt_seq_len = tgt.shape[0]

    tgt_mask = generate_square_subsequent_mask(tgt_seq_len)
    src_mask = torch.zeros((src_seq_len, src_seq_len),device=DEVICE).type(torch.bool)

    src_padding_mask = (src == PAD_IDX).transpose(0, 1)
    tgt_padding_mask = (tgt == PAD_IDX).transpose(0, 1)
    return src_mask, tgt_mask, src_padding_mask, tgt_padding_mask

이제 모델의 매개변수를 정의하고 객체를 생성(instantiate)해보겠습니다.
아래처럼 학습 단계에서 사용할 손실 함수(loss function)를 교차 엔트로피
손실(cross-entropy loss)로 정의하고, 옵티마이저(optimizer)도 정의합니다.


In [11]:
torch.manual_seed(0)

SRC_VOCAB_SIZE = len(vocab_transform[SRC_LANGUAGE])
TGT_VOCAB_SIZE = len(vocab_transform[TGT_LANGUAGE])
EMB_SIZE = 512
NHEAD = 8
FFN_HID_DIM = 512
BATCH_SIZE = 128 #128에서 수정
NUM_ENCODER_LAYERS = 3
NUM_DECODER_LAYERS = 3

transformer = Seq2SeqTransformer(NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS, EMB_SIZE,
                                 NHEAD, SRC_VOCAB_SIZE, TGT_VOCAB_SIZE, FFN_HID_DIM)

for p in transformer.parameters():
    if p.dim() > 1:
        nn.init.xavier_uniform_(p)

transformer = transformer.to(DEVICE)

loss_fn = torch.nn.CrossEntropyLoss(ignore_index=PAD_IDX)

optimizer = torch.optim.Adam(transformer.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)



대조(Collation)
===============

위의 `데이터 구하고 처리하기` 장에서 봤듯이, 데이터 반복자(iterator)는
원시 문자열의 쌍을 생성합니다. 이 문자열 쌍들을 이전에 정의한 `Seq2Seq`
신경망에서 처리할 수 있도록 텐서 묶음(batched tensor)으로 변환해야
합니다. 이제 원시 문자열들의 묶음(batch)을 텐서 묶음으로 변환하여 모델에
직접 전달할 수 있도록 하는 대응어(collate) 함수를 정의해보겠습니다.


In [12]:
from torch.nn.utils.rnn import pad_sequence

# 순차적인 작업들을 하나로 묶는 헬퍼 함수
def sequential_transforms(*transforms):
    def func(txt_input):
        for transform in transforms:
            txt_input = transform(txt_input)
        return txt_input
    return func

# BOS/EOS를 추가하고 입력 순서(sequence) 인덱스에 대한 텐서를 생성하는 함수
def tensor_transform(token_ids: List[int]):
    return torch.cat((torch.tensor([BOS_IDX]),
                      torch.tensor(token_ids),
                      torch.tensor([EOS_IDX])))

# 출발어(src)와 도착어(tgt) 원시 문자열들을 텐서 인덱스로 변환하는 변형(transform)
text_transform = {}
for ln in [SRC_LANGUAGE, TGT_LANGUAGE]:
    text_transform[ln] = sequential_transforms(token_transform[ln], # 토큰화(Tokenization)
                                               vocab_transform[ln], # 수치화(Numericalization)
                                               tensor_transform) # BOS/EOS를 추가하고 텐서를 생성


# 데이터를 텐서로 조합(collate)하는 함수
def collate_fn(batch):
    src_batch, tgt_batch = [], []
    for src_sample, tgt_sample in batch:
        src_batch.append(text_transform[SRC_LANGUAGE](src_sample.rstrip("\n")))
        tgt_batch.append(text_transform[TGT_LANGUAGE](tgt_sample.rstrip("\n")))

    src_batch = pad_sequence(src_batch, padding_value=PAD_IDX)
    tgt_batch = pad_sequence(tgt_batch, padding_value=PAD_IDX)
    return src_batch, tgt_batch

각 에폭(epoch)마다 호출할 학습 및 검증(evaluation) 단계를
정의해보겠습니다.


In [13]:
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split

class CustomTranslationDataset(Dataset):
    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        src, tgt = self.data[idx]
        return src, tgt

train_data, val_data = train_test_split(train_data, test_size=0.1, random_state=42)

def train_epoch(model, optimizer):
    model.train()
    losses = 0
    train_dataset = CustomTranslationDataset(train_data)
    train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)

    for src, tgt in train_dataloader:
        src = src.to(DEVICE)
        tgt = tgt.to(DEVICE)

        tgt_input = tgt[:-1, :]

        src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(src, tgt_input)

        logits = model(src, tgt_input, src_mask, tgt_mask,src_padding_mask, tgt_padding_mask, src_padding_mask)

        optimizer.zero_grad()

        tgt_out = tgt[1:, :]
        loss = loss_fn(logits.reshape(-1, logits.shape[-1]), tgt_out.reshape(-1))
        loss.backward()

        optimizer.step()
        losses += loss.item()

    return losses / len(list(train_dataloader))


def evaluate(model):
    model.eval()
    losses = 0

    val_dataset = CustomTranslationDataset(val_data)
    val_dataloader = DataLoader(val_dataset, batch_size=BATCH_SIZE, collate_fn=collate_fn)

    for src, tgt in val_dataloader:
        src = src.to(DEVICE)
        tgt = tgt.to(DEVICE)

        tgt_input = tgt[:-1, :]

        src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(src, tgt_input)

        logits = model(src, tgt_input, src_mask, tgt_mask,src_padding_mask, tgt_padding_mask, src_padding_mask)

        tgt_out = tgt[1:, :]
        loss = loss_fn(logits.reshape(-1, logits.shape[-1]), tgt_out.reshape(-1))
        losses += loss.item()

    return losses / len(list(val_dataloader))

이제 모델 학습을 위한 모든 요소가 준비되었습니다. 학습을 해보겠습니다!


In [None]:
from timeit import default_timer as timer
NUM_EPOCHS = 18

# Initialize best_valid_loss with a very large value
best_valid_loss = float('inf')

for epoch in range(1, NUM_EPOCHS+1):
    start_time = timer()
    train_loss = train_epoch(transformer, optimizer)
    end_time = timer()
    val_loss = evaluate(transformer)
    print((f"Epoch: {epoch}, Train loss: {train_loss:.3f}, Val loss: {val_loss:.3f}, "f"Epoch time = {(end_time - start_time):.3f}s"))

    # ✅ 검증 손실이 더 좋아졌을 때 모델 저장
    if val_loss < best_valid_loss:
        best_valid_loss = val_loss
        torch.save(transformer.state_dict(), '/content/drive/MyDrive/Colab Notebooks/best_transformer_model_EN-KR.pth')
        print("✔️ 모델 저장 완료 (성능 향상됨)")



Epoch: 1, Train loss: 8.448, Val loss: 7.827, Epoch time = 298.824s
✔️ 모델 저장 완료 (성능 향상됨)
Epoch: 2, Train loss: 7.408, Val loss: 7.355, Epoch time = 303.669s
✔️ 모델 저장 완료 (성능 향상됨)
Epoch: 3, Train loss: 6.831, Val loss: 7.026, Epoch time = 304.520s
✔️ 모델 저장 완료 (성능 향상됨)
Epoch: 4, Train loss: 6.377, Val loss: 6.819, Epoch time = 303.620s
✔️ 모델 저장 완료 (성능 향상됨)
Epoch: 5, Train loss: 6.005, Val loss: 6.646, Epoch time = 303.690s
✔️ 모델 저장 완료 (성능 향상됨)
Epoch: 6, Train loss: 5.690, Val loss: 6.558, Epoch time = 303.086s
✔️ 모델 저장 완료 (성능 향상됨)
Epoch: 7, Train loss: 5.412, Val loss: 6.448, Epoch time = 303.776s
✔️ 모델 저장 완료 (성능 향상됨)
Epoch: 8, Train loss: 5.163, Val loss: 6.374, Epoch time = 303.942s
✔️ 모델 저장 완료 (성능 향상됨)
Epoch: 9, Train loss: 4.932, Val loss: 6.336, Epoch time = 303.860s
✔️ 모델 저장 완료 (성능 향상됨)
Epoch: 10, Train loss: 4.720, Val loss: 6.347, Epoch time = 304.016s
Epoch: 11, Train loss: 4.521, Val loss: 6.405, Epoch time = 304.147s
Epoch: 12, Train loss: 4.332, Val loss: 6.315, Epoch time = 3

In [None]:
import numpy
from torch.utils.data import DataLoader

# 탐욕(greedy) 알고리즘을 사용하여 출력 순서(sequence)를 생성하는 함수
def greedy_decode(model, src, src_mask, max_len, start_symbol):
    src = src.to(DEVICE)
    src_mask = src_mask.to(DEVICE)

    memory = model.encode(src, src_mask)
    ys = torch.ones(1, 1).fill_(start_symbol).type(torch.long).to(DEVICE)
    for i in range(max_len-1):
        memory = memory.to(DEVICE)
        tgt_mask = (generate_square_subsequent_mask(ys.size(0))
                    .type(torch.bool)).to(DEVICE)
        out = model.decode(ys, memory, tgt_mask)
        out = out.transpose(0, 1)
        prob = model.generator(out[:, -1])
        _, next_word = torch.max(prob, dim=1)
        next_word = next_word.item()

        ys = torch.cat([ys,
                        torch.ones(1, 1).type_as(src.data).fill_(next_word)], dim=0)
        if next_word == EOS_IDX:
            break
    return ys


# 입력 문장을 도착어로 번역하는 함수
def translate(model: torch.nn.Module, src_sentence: str):
    model.eval()
    src = text_transform[SRC_LANGUAGE](src_sentence).view(-1, 1)
    num_tokens = src.shape[0]
    src_mask = (torch.zeros(num_tokens, num_tokens)).type(torch.bool)
    tgt_tokens = greedy_decode(model,  src, src_mask, max_len=num_tokens + 5, start_symbol=BOS_IDX).flatten()
    #return " ".join(vocab_transform[TGT_LANGUAGE].lookup_tokens(list(tgt_tokens.cpu().numpy()))).replace("<bos>", "").replace("<eos>", "") # numpy()문제발생으로 인해
    return " ".join(vocab_transform[TGT_LANGUAGE].lookup_tokens(tgt_tokens.cpu().tolist())).replace("<bos>", "").replace("<eos>", "") # numpy() 제거 .tolist()로 수정

In [None]:
print(translate(transformer, "I don't think that you are expert."))

NameError: name 'transformer' is not defined

In [14]:
!pip install sacrebleu



In [15]:
def greedy_decode(model, src, src_mask, max_len, start_symbol):
    memory = model.encode(src, src_mask)
    ys = torch.ones(1, 1).fill_(start_symbol).type(torch.long).to(DEVICE)
    for i in range(max_len - 1):
        tgt_mask = generate_square_subsequent_mask(ys.size(0)).to(DEVICE)
        out = model.decode(ys, memory, tgt_mask)
        out = out.transpose(0, 1)
        prob = model.generator(out[:, -1])
        next_word = torch.argmax(prob, dim=1).item()
        ys = torch.cat([ys, torch.ones(1, 1).type_as(src.data).fill_(next_word)], dim=0)
        if next_word == EOS_IDX:
            break
    return ys

In [16]:
def translate(model: nn.Module, src_sentence: str):
    model.eval()
    src = text_transform[SRC_LANGUAGE](src_sentence).view(-1, 1).to(DEVICE)
    src_mask = torch.zeros((src.size(0), src.size(0))).type(torch.bool).to(DEVICE)

    tgt_tokens = greedy_decode(
        model, src, src_mask, max_len=50, start_symbol=BOS_IDX
    ).flatten()

    # 🐞 .cpu().numpy() 대신 .cpu().tolist()를 사용하여 NumPy 의존성 제거
    return " ".join(
        vocab_transform[TGT_LANGUAGE].lookup_tokens(tgt_tokens.cpu().tolist())
    ).replace("<bos>", "").replace("<eos>", "")

In [40]:
import sacrebleu

# 모델 불러오기
model = Seq2SeqTransformer(NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS, EMB_SIZE,
                           NHEAD, SRC_VOCAB_SIZE, TGT_VOCAB_SIZE, FFN_HID_DIM).to(DEVICE)
# model.load_state_dict(torch.load('/content/drive/MyDrive/Colab Notebooks/best_transformer_model_EN-KR.pth'))
# 🐞 map_location='cpu'를 추가하여 CPU에서 모델 로드
model.load_state_dict(torch.load('/content/drive/MyDrive/Colab Notebooks/best_transformer_model_EN-KR.pth', map_location='cpu'))

# 번역 예측 및 참조 데이터 준비
references = []
hypotheses = []

# Using test data for evaluation
for src_sentence, tgt_sentence in zip(sents_en_in[-n_of_val:], sents_kr_out[-n_of_val:]):
    translated = translate(model, " ".join(src_sentence)) # Join source sentence tokens
    hypotheses.append(translated.strip())

    # 참조는 리스트 안에 리스트로 (BLEU의 다중 참조 구조 때문)
    references.append([" ".join(tgt_sentence).strip()])  # Join target sentence tokens


# BLEU 점수 계산
bleu = sacrebleu.corpus_bleu(hypotheses, references)
print(f"BLEU score: {bleu.score:.2f}")



BLEU score: 34.57


참고자료
========

1.  Attention is all you need 논문.
    <https://papers.nips.cc/paper/2017/file/3f5ee243547dee91fbd053c1c4a845aa-Paper.pdf>
2.  Transformer에 대한 설명.
    <https://nlp.seas.harvard.edu/2018/04/03/attention.html#positional-encoding>
