In [None]:
!python -m spacy download en
!python -m spacy download de

In [None]:
import spacy

spacy_en = spacy.load('en')
spacy_de = spacy.load('de')

In [None]:
spacy_en

<spacy.lang.en.English at 0x7f4266a8bd90>

In [None]:
# tokenization 

tokenized = spacy_en.tokenizer("I am a graduate student")

for i, token in enumerate(tokenized):
    print(f"index {i}: {token.text}")

index 0: I
index 1: am
index 2: a
index 3: graduate
index 4: student


In [None]:
tokenized

I am a graduate student

In [None]:
# 우리는 독일어를 영어로 번역하는 task를 하고있다. 

def tokenize_de(text):
    '''
    text를 받아서 array형태로 만든다. 그 array는 reversed된 array이다.
    '''
    return [token.text for token in spacy_de.tokenizer(text)][::-1] # source data를 reverse해준다. source == input

def tokenize_en(text):
    '''
    text를 받아서 array형태로 만든다. 이 array는 reversed되지 않았다. 
    '''
    return [token.text for token in spacy_en.tokenizer(text)]       # target data는 reverse를 해주지 않는다. target == output

In [None]:
!pip install -U torchtext==0.8.0

# 그냥 전처리임 
# Field 라이브러리를 이용하여 데이터셋에 대한 구체적인 전처리 내용을 명시 
from torchtext.data import Field, BucketIterator

SRC = Field(tokenize=tokenize_de, init_token="<sos>", eos_token="<eos>", lower=True)   # source = 독일어
TRG = Field(tokenize=tokenize_en, init_token="<sos>", eos_token="<eos>", lower=True)   # target = 영어 





In [None]:
from torchtext.datasets import Multi30k

train_dataset, valid_dataset, test_dataset = Multi30k.splits(exts=(".de", ".en"), fields=(SRC, TRG))

downloading training.tar.gz


training.tar.gz: 100%|██████████| 1.21M/1.21M [00:01<00:00, 810kB/s] 


downloading validation.tar.gz


validation.tar.gz: 100%|██████████| 46.3k/46.3k [00:00<00:00, 250kB/s]


downloading mmt_task1_test2016.tar.gz


mmt_task1_test2016.tar.gz: 100%|██████████| 66.2k/66.2k [00:00<00:00, 238kB/s]


In [None]:
print("len(train_dataset.examples)", len(train_dataset.examples))
print("len(validation_dataset.examples)", len(valid_dataset.examples))
print("len(test_dataset.examples)", len(test_dataset.examples))

len(train_dataset.examples) 29000
len(validation_dataset.examples) 1014
len(test_dataset.examples) 1000


In [None]:
# 학습 데이터 중 하나를 선택해 출력해 본다.
print(vars(train_dataset.examples[10])['src'])  # source이기 때문에 reversed 된 것을 확인 할 수 있다.
print(vars(train_dataset.examples[10])['trg'])  # target이기 때문에 reversed가 되지 않은 것을 확인 할 수 있다. 

['.', 'springen', 'nacheinander', 'die', ',', 'mädchen', 'fünf', 'mit', 'ballettklasse', 'eine']
['a', 'ballet', 'class', 'of', 'five', 'girls', 'jumping', 'in', 'sequence', '.']


In [None]:
# Field객체의 build_vocab메서드를 활용하여 영어와 독어의 단어 사전을 생성. 최소 2번 이상 등장한 단어만을 사용 

SRC.build_vocab(train_dataset, min_freq=2)
TRG.build_vocab(train_dataset, min_freq=2)

print("len(SRC.vocab):", len(SRC.vocab), "단어가 2번 나옴")
print("len(TRG.vocab):", len(TRG.vocab), "단어가 2번 나옴")

len(SRC.vocab): 7855 단어가 2번 나옴
len(TRG.vocab): 5893 단어가 2번 나옴


In [None]:
# Targets

print(TRG.vocab.stoi["abcabc"])         # 없는 단어는 0으로 설정해줌
print(TRG.vocab.stoi[TRG.pad_token])    # padding: 1
print(TRG.vocab.stoi["<sos>"])          # 2
print(TRG.vocab.stoi["<eos>"])          # 3
print(TRG.vocab.stoi["hello"])          # 
print(TRG.vocab.stoi["world"])          # 

0
1
2
3
4112
1752


In [None]:
# Sources

print(SRC.vocab.stoi["abcabc"])         # 없는 단어는 0으로 설정해줌
print(SRC.vocab.stoi[SRC.pad_token])    # padding: 1
print(SRC.vocab.stoi["<sos>"])          # 2
print(SRC.vocab.stoi["<eos>"])          # 3
print(SRC.vocab.stoi["eine"])           # 

0
1
2
3
8


In [None]:
import torch

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

BATCH_SIZE = 128 

# 한문장에 포함된 단어가 연속적으로 LSTM에 입력 되어야 한다.
# 따라서 하나의 배치에 포함된 문장들이 가지는 단어의 개수가 유사하도록 만들면 좋다.
# 이를 위해 BucketIterator를 사용한다.

train_iterator, valid_iterator, test_iterator = BucketIterator.splits(
    (
        train_dataset,
        valid_dataset,
        test_dataset,
    ),
    batch_size = BATCH_SIZE,
    device = device
)



In [None]:
for i, batch in enumerate(train_iterator):
    src = batch.src 
    trg = batch.trg 

    print("=========0th sentence=========")
    for i in range(src.shape[0]):
        print(src[i][0].item())    # 0 번째 문장 
    
    print("=========12th sentence=========")
    for i in range(src.shape[0]):
        print(src[i][12].item())   # 12번째 문장

    # 첫번째 배치만 확인해 준다.
    break



2
4
0
15
59
6996
23
1263
25
492
3
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
2
4
245
22
2180
58
60
16
8
3
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1


In [None]:
import torch.nn as nn 

Encoder

- 주어진 소스 문장을 context vector로 인코딩 
- hidden state와 cell state를 반환 


In [None]:
class Encoder(nn.Module):
    def __init__(self, input_dim, embed_dim, hidden_dim, n_layers, dropout_ratio):
        super().__init__()    # nn.Module에 있는 모든 '__init__에 정의되어 있는 것들'을 가져와준다. 

        # embedding layer 
        # embedding은 one-hot encoding을 특정 차원의 임베딩으로 매핑하는 레이어 
        self.embedding = nn.Embedding(input_dim, embed_dim)

        # LSTM layer 
        self.hidden_dim = hidden_dim 
        self.n_layers   = n_layers 
        self.rnn        = nn.LSTM(embed_dim, hidden_dim, n_layers, dropout=dropout_ratio)

        # dropout
        self.dropout = nn.Dropout(dropout_ratio)

    def forward(self, src):

        # src:[단어 개수, 배치 크기] --> embedded:[단어 개수, 배치 크기, embedded]
        embedded = self.dropout(self.embedding(src))

        # embedded --> RNN
        outputs, (hidden, cell) = self.rnn(embedded)
        # outputs:[단어 개수, 배치 크기, 히든 차원]. 현재 단어의 출력 정보 
        # hidden:[레이어 개수, 배치 크기, 히든 차원]. 현재까지의 모든 단어의 정보 
        # cell:[레이어 개수, 배치 크기, 히든 차원]. 현재까지의 모든 단어의 정보 
        
        # context vector를 반환 
        return hidden, cell 

Decoder

- context vector를 타겟 문장으로 디코딩 
- hidden state, cell state, prediction을 반환

In [None]:
class Decoder(nn.Module):
    def __init__(self, output_dim, embed_dim, hidden_dim, n_layers, dropout_ratio):
        super().__init__()

        self.embedding = nn.Embedding(output_dim, embed_dim)

        self.hidden_dim = hidden_dim
        self.n_layers = n_layers
        self.rnn = nn.LSTM(embed_dim, hidden_dim, n_layers, dropout=dropout_ratio)

        self.dropout = nn.Dropout(dropout_ratio)

        # 밑의 것들이 Encoder과 다르게 Decoder에 추가되었다. Encoder와 구조적으로 다른 부분이다. 
        self.output_dim = output_dim
        self.fc_out = nn.Linear(hidden_dim, output_dim) # hidden_dim == input, output_dim == output

    
    # 디코더는 현재까지 출력된 문장에 대한 정보를 입력으로 받아 타겟 문장을 반환 
    def forward(self, input, hidden, cell):
        '''
        input:[배치 크기]. 단어의 개수는 항상 1개이도록 구현
        hidden:[레이어 개수, 배치 크기, 히든 차원]
        cell=context:[레이어 개수, 배치 크기, 히든 차원]
        '''
        input = input.unsqueeze(0)  # input:[단어 개수=1, 배치 크기]

        embedded = self.dropout(self.embedding(input))  # embedded:[단어 개수, 배치 크기, 임베딩 차원]

        output, (hidden, cell) = self.rnn(embedded, (hidden, cell))
        # output:[단어 개수 = 1, 배치 크기, 히든 차원]. 현재 단어의 출력 정보
        # hidden:[레이어 개수, 배치 크기, 히든 차원]. 현재까지의 모든 단어의 정보
        # cell:[레이어 개수, 배치 크기, 히든 차원]. 현재까지의 모든 단어의 정보

        # 단어 개수는 어차피 1개이므로 차원 제거 
        prediction = self.fc_out(output.squeeze(0))
        # prediction:[배치 크기, 출력 차원]

        # prediction=현재 출력 단어, hidden=현재까지의 모든 단어 정보, cell=현재까지의 모든 단어의 정보 
        return prediction, hidden, cell 

Seq2Seq

- 앞서 정의한 인코더와 디코더를 가지고 있는 하나의 아키텍쳐임
- 인코더: 주어진 소스 문장을 context vector로 인코딩 
- 디코더: 주어진 context vector를 타겟 문장으로 디코딩 
- teacher forcing: 디코더의 predicition을 다음 입력으로 사용하지 않고, ground-truth를 다음 입력으로 사용한다.

In [None]:
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super().__init__()

        self.encoder = encoder
        self.decoder = decoder 
        self.device  = device 

    def forward(self, src, trg, teacher_forcing_ratio=0.5):

        # 먼저 인코더를 거쳐 context vector를 추출 
        hidden, cell = self.encoder(src)

        # decoder의 최종 결과를 담을 텐서 객체 만들기 
        trg_len = trg.shape[0]  # 단어 갯수 
        batch_size = trg.shape[1] # 배치 크기 
        trg_vocab_size = self.decoder.output_dim # 출력 차원 

        outputs = torch.zeros(trg_len, batch_size, trg_vocab_size).to(self.device)

        input = trg[0, :]

        # 타겟 단어의 개수만큼 반복하여 디코더에 forwarding
        for t in range(1, trg_len):
            output, hidden, cell = self.decoder(input, hidden, cell)

            outputs[t] = output # FC를 거쳐서 나온 현재의 출력 단어 정보 
            top1 = output.argmax(1) # 가장 확률이 높은 단어의 인덱스를 추출 

            # teacher_forcing_ratio: 학습할 때 ground truth를 사용하는 비율 
            teacher_force = random.random() < teacher_forcing_ratio 
            input = trg[t] if teacher_force else top1 

        return outputs 

Training

In [None]:
INPUT_DIM  = len(SRC.vocab)
OUTPUT_DIM = len(TRG.vocab)

ENCODER_EMBED_DIM = 256
DECODER_EMBED_DIM = 256
HIDDEN_DIM = 512
N_LAYERS = 2
ENC_DROPOUT_RATIO = 0.5
DEC_DROPOUT_RATIO = 0.5

In [None]:
print(INPUT_DIM)
print(OUTPUT_DIM)

7855
5893


In [None]:
# 인코더(encoder)와 디코더(decoder) 객체 선언
enc = Encoder(INPUT_DIM,  ENCODER_EMBED_DIM, HIDDEN_DIM, N_LAYERS, ENC_DROPOUT_RATIO)
dec = Decoder(OUTPUT_DIM, DECODER_EMBED_DIM, HIDDEN_DIM, N_LAYERS, DEC_DROPOUT_RATIO)

# Seq2Seq 객체 선언
model = Seq2Seq(enc, dec, device).to(device)

In [None]:
# 논문의 내용대로 u(-0.08, 0.08)의 값으로 모델 가중치 파라미터 초기화 
def init_weights(m):
    for name, param in m.named_parameters():
        nn.init.uniform_(param.data, -0.08, 0.08)
        
model.apply(init_weights)

Seq2Seq(
  (encoder): Encoder(
    (embedding): Embedding(7855, 256)
    (rnn): LSTM(256, 512, num_layers=2, dropout=0.5)
    (dropout): Dropout(p=0.5, inplace=False)
  )
  (decoder): Decoder(
    (embedding): Embedding(5893, 256)
    (rnn): LSTM(256, 512, num_layers=2, dropout=0.5)
    (dropout): Dropout(p=0.5, inplace=False)
    (fc_out): Linear(in_features=512, out_features=5893, bias=True)
  )
)

In [None]:
import torch.optim as optim

# Adam optimizer로 학습 최적화
optimizer = optim.Adam(model.parameters())

# 뒷 부분의 패딩(padding)에 대해서는 값 무시
TRG_PAD_IDX = TRG.vocab.stoi[TRG.pad_token]
criterion = nn.CrossEntropyLoss(ignore_index=TRG_PAD_IDX)

In [None]:
def train(model, iterator, optimizer, criterion, clip):
    model.train()
    epoch_loss = 0 

    for i, batch in enumerate(iterator):
        src = batch.src 
        trg = batch.trg 

        optimizer.zero_grad()

        output = model(src, trg)

        output_dim = output.shape[-1]

        # 출력 단어의 인덱스 0은 사용하지 않음
        output = output[1:].view(-1, output_dim)

        trg = trg[1:].view(-1)  # trg = [(타겟 단어의 개수 - 1) * batch size]

        loss = criterion(output, trg)
        loss.backward()

        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)

        optimizer.step()

        epoch_loss += loss.item()

    return epoch_loss / len(iterator)

In [None]:
def evaluate(model, iterator, criterion):
    model.eval()
    epoch_loss = 0 

    with torch.no_grad():
        for i, batch in enumerate(iterator):
            src = batch.src 
            trg = batch.trg 

            output = model(src, trg, 0) # 0 inidicates no teacher forcing 

            output_dim = output.shape[-1]

            # 출력 단어의 인덱스 0은 사용하지 않는다. 
            # output = [(출력 단어의 개수 - 1) * batch size, output dim]
            output = output[1:].view(-1, output_dim)   

            # trg = [(타겟 단어의 개수 - 1) * batch size]
            trg = trg[1:].view(-1)

            loss = criterion(output, trg)

            epoch_loss += loss.item()

    return epoch_loss / len(iterator)

In [None]:
def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time 
    elapsed_mins = int(elapsed_time / 60)

    return elapsed_mins

In [None]:
import time
import math
import random

In [None]:
N_EPOCHS = 20
CLIP = 1
best_valid_loss = float('inf')

for epoch in range(N_EPOCHS):
    start_time = time.time() # 시작 시간 기록


    print(f"Training epoch-{epoch} started")
    train_loss = train(model, train_iterator, optimizer, criterion, CLIP)

    print(f"Validation epoch-{epoch} started")
    valid_loss = evaluate(model, valid_iterator, criterion)

    end_time = time.time() # 종료 시간 기록
    epoch_mins  = epoch_time(start_time, end_time)

    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'seq2seq.pt')

    print(f'Epoch: {(epoch + 1):02} | Time: {epoch_mins}m')
    print(f'\tTrain Loss: {train_loss:.3f} | Train PPL: {math.exp(train_loss):.3f}')
    print(f'\tValidation Loss: {valid_loss:.3f} | Validation PPL: {math.exp(valid_loss):.3f}')


In [None]:
# from google.colab import files 

# files.download('seq2seq.pt')

In [None]:
model.load_state_dict(torch.load('seq2seq.pt'))

test_loss = evaluate(model, test_iterator, criterion)

print(f'Test Loss: {test_loss:.3f} | Test PPL: {math.exp(test_loss):.3f}')



Test Loss: 3.730 | Test PPL: 41.697


Use my own data

In [None]:
# 번역(translation) 함수
def translate_sentence(sentence, src_field, trg_field, model, device, max_len=50):
    model.eval() # 평가 모드

    tokens = [token.lower() for token in sentence]

    # 처음에 <sos> 토큰, 마지막에 <eos> 토큰 붙이기
    tokens = [src_field.init_token] + tokens + [src_field.eos_token]
    print(f"전체 소스 토큰: {tokens}")

    src_indexes = [src_field.vocab.stoi[token] for token in tokens]
    print(f"소스 문장 인덱스: {src_indexes}")

    src_tensor = torch.LongTensor(src_indexes).unsqueeze(1).to(device)

    # 인코더(endocer)에 소스 문장을 넣어 문맥 벡터(context vector) 계산
    with torch.no_grad():
        hidden, cell = model.encoder(src_tensor)    # hidden, cell == context vector

    # 처음에는 <sos> 토큰 하나만 가지고 있도록 하기
    trg_indexes = [trg_field.vocab.stoi[trg_field.init_token]]

    for i in range(max_len):
        # 이전에 출력한 단어가 현재 단어로 입력될 수 있도록
        trg_tensor = torch.LongTensor([trg_indexes[-1]]).to(device)

        with torch.no_grad():
            output, hidden, cell = model.decoder(trg_tensor, hidden, cell)

        pred_token = output.argmax(1).item()
        trg_indexes.append(pred_token) # 출력 문장에 더하기

        # <eos>를 만나는 순간 끝
        if pred_token == trg_field.vocab.stoi[trg_field.eos_token]:
            break

    # 각 출력 단어 인덱스를 실제 단어로 변환
    trg_tokens = [trg_field.vocab.itos[i] for i in trg_indexes]

    # 첫 번째 <sos>는 제외하고 출력 문장 반환
    return trg_tokens[1:]

In [None]:
example_idx = 10

src = vars(test_dataset.examples[example_idx])['src']
trg = vars(test_dataset.examples[example_idx])['trg']

print("Target:", trg)
print()

print("모델 출력 결과:", " ".join(translate_sentence(src, SRC, TRG, model, device)))

Target: ['a', 'mother', 'and', 'her', 'young', 'song', 'enjoying', 'a', 'beautiful', 'day', 'outside', '.']

전체 소스 토큰: ['<sos>', '.', 'freien', 'im', 'tag', 'schönen', 'einen', 'genießen', 'sohn', 'kleiner', 'ihr', 'und', 'mutter', 'eine', '<eos>']
소스 문장 인덱스: [2, 4, 88, 20, 200, 780, 19, 565, 624, 70, 134, 10, 364, 8, 3]
모델 출력 결과: a girl and her mother are enjoying a stroll in the woods . <eos>


In [None]:
src = tokenize_de("Guten Abend.")
trg = "Good evening"

print("Target:", trg)
print()

print("모델 출력 결과:", " ".join(translate_sentence(src, SRC, TRG, model, device)))

Target: Good evening

전체 소스 토큰: ['<sos>', '.', 'abend', 'guten', '<eos>']
소스 문장 인덱스: [2, 4, 1163, 3799, 3]
모델 출력 결과: <unk> . <eos>
