### Seq2Seq) Word Level 번역기 만들기

-  데이터셋 : 영어 및 프랑스어 문장 병렬 코퍼스 활용 (http://www.manythings.org/anki)

In [16]:
import os
import shutil
import zipfile
import requests
import warnings
warnings.filterwarnings('ignore')

import pandas as pd
import numpy as np


### 1. Load the data
# Uses the English-French parallel corpus fra-eng.zip from http://www.manythings.org/anki
# HTTP request headers passed to requests.get() by download_zip; only a
# browser-style User-Agent is set.
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}

def download_zip(url, output_path):
    """Download ``url`` to ``output_path``, streaming the body in 8 KiB chunks.

    The request is sent with the module-level ``headers``. On HTTP 200 the
    body is written to ``output_path`` and a confirmation is printed; on any
    other status code a failure message is printed and no file is written.

    Args:
        url: Address of the file to fetch.
        output_path: Destination file path. Its parent directory is created
            when it does not exist yet.

    Raises:
        requests.exceptions.RequestException: On connection errors or when
            the server does not respond within the timeout.
    """
    # open() fails with FileNotFoundError if the destination folder is missing.
    parent_dir = os.path.dirname(output_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    # The context manager releases the streamed connection even if writing
    # fails; the timeout keeps a stalled server from hanging the notebook.
    with requests.get(url, headers=headers, stream=True, timeout=30) as response:
        if response.status_code != 200:
            print(f"Failed to download. HTTP Response Code: {response.status_code}")
            return

        with open(output_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)

    print(f"ZIP file downloaded to {output_path}")

# Fetch the corpus archive into ./data/
url = "http://www.manythings.org/anki/fra-eng.zip"
output_path = "./data/fra-eng.zip"
download_zip(url, output_path)

# Directory the archive is extracted into
extract_path = "./data/"
with zipfile.ZipFile(output_path, 'r') as zip_ref:
    zip_ref.extractall(extract_path)  # unpack into ./data/

# Read the tab-separated corpus file; the third column ('lic') is dropped
lines = pd.read_csv(os.path.join(extract_path, 'fra.txt'), names=['src', 'tar', 'lic'], sep='\t')
del lines['lic']

# Inspect the first rows
lines.head()

ZIP file downloaded to ./data/fra-eng.zip


Unnamed: 0,src,tar
0,Go.,Va !
1,Go.,Marche.
2,Go.,En route !
3,Go.,Bouge !
4,Hi.,Salut !


#### Pre-processing

1. 프랑스어 코퍼스의 악센트 및 영어 외 문자 제거
2. 기존 target data에 시작 토큰 "\<sos>" 및 끝 토큰 "\<eos>"을 추가한 뒤 띄어쓰기를 기준으로 split
3. 띄어쓰기된 Word 단위로 정수 인코딩을 진행하여 Encoder Input을 생성
4. 동일하게 띄어쓰기된 Word 단위로 정수 인코딩을 진행하여 Decoder Input을 생성하고, 시작 토큰 "\<sos>"를 제외한 Decoder Target을 생성

In [17]:
### 전처리 진행
import re
import unicodedata

# Strip accents (combining marks) from a Unicode string
def to_ascii(s):
  """Return ``s`` in NFD form with every combining mark (category ``Mn``) removed.

  Characters that have no decomposition are returned unchanged, so the
  result is not guaranteed to be pure ASCII.
  """
  decomposed = unicodedata.normalize('NFD', s)
  kept = [ch for ch in decomposed if unicodedata.category(ch) != 'Mn']
  return ''.join(kept)

# Lowercase, strip accents, isolate punctuation, and drop other non-letter characters
def preprocess_sentence(sent):
  """Normalize one sentence into space-separated lowercase tokens.

  The result may still carry a single leading or trailing space; callers
  in this file tokenize it with ``str.split()``.
  """
  normalized = to_ascii(sent.lower())

  # Insert a space in front of each of ? . ! , ¿ so punctuation becomes its own token.
  spaced = re.sub(r"([?.!,¿])", r" \1", normalized)

  # Replace every run of characters other than a-z, A-Z, "!", "." and "?"
  # with one space (this also removes "," and "¿").
  letters_only = re.sub(r"[^a-zA-Z!.?]+", r" ", spaced)

  # Collapse consecutive whitespace into a single space.
  return re.sub(r"\s+", " ", letters_only)


# Dataset preprocessing: returns Encoder Input, Decoder Input, Decoder Target
def load_preprocessed_data(path="./data/fra.txt", max_samples=None):
  """Read the tab-separated corpus and build tokenized training triples.

  Each line is expected to hold at least ``source<TAB>target``; any further
  columns are ignored and lines with fewer than two columns are skipped.

  Args:
    path: Corpus file to read. Defaults to the location used by this notebook.
    max_samples: Number of sentence pairs to load. When ``None`` the
      module-level ``num_samples`` is used, so existing zero-argument calls
      behave as before.

  Returns:
    Tuple ``(encoder_input, decoder_input, decoder_target)`` of token lists.
    ``decoder_input`` entries start with ``"<sos>"`` and ``decoder_target``
    entries end with ``"<eos>"``.
  """
  limit = num_samples if max_samples is None else max_samples
  encoder_input, decoder_input, decoder_target = [], [], []

  # Explicit UTF-8: the platform default encoding is not UTF-8 everywhere,
  # and the French side contains accented characters.
  with open(path, "r", encoding="utf-8") as lines:
    for line in lines:
      fields = line.strip().split('\t')
      if len(fields) < 2:
        # Blank or malformed line: nothing to pair up.
        continue
      src_line, tar_line = fields[0], fields[1]

      # Source side: tokenized as-is
      src_tokens = preprocess_sentence(src_line).split()

      # Target side: <sos> prefix for the decoder input, <eos> suffix for the label
      tar_line = preprocess_sentence(tar_line)
      tar_line_in = ("<sos> " + tar_line).split()
      tar_line_out = (tar_line + " <eos>").split()

      encoder_input.append(src_tokens)
      decoder_input.append(tar_line_in)
      decoder_target.append(tar_line_out)

      if len(encoder_input) == limit:
        break

  return encoder_input, decoder_input, decoder_target


# Run the preprocessing
# Number of sentence pairs to load; load_preprocessed_data reads this module-level name.
num_samples = 33000

sents_en_in, sents_fra_in, sents_fra_out = load_preprocessed_data()
# Show the first five tokenized samples of each list
print('인코더의 입력 :',sents_en_in[:5])
print('디코더의 입력 :',sents_fra_in[:5])
print('디코더의 레이블 :',sents_fra_out[:5])

인코더의 입력 : [['go', '.'], ['go', '.'], ['go', '.'], ['go', '.'], ['hi', '.']]
디코더의 입력 : [['<sos>', 'va', '!'], ['<sos>', 'marche', '.'], ['<sos>', 'en', 'route', '!'], ['<sos>', 'bouge', '!'], ['<sos>', 'salut', '!']]
디코더의 레이블 : [['va', '!', '<eos>'], ['marche', '.', '<eos>'], ['en', 'route', '!', '<eos>'], ['bouge', '!', '<eos>'], ['salut', '!', '<eos>']]


#### Dataset Preparation

1. Encoder Input
2. Decoder Input : <sos> 포함된 Decoder training용 input(교사 강요)
3. Decoder Target : <eos> 포함된 Decoder accuracy 측정을 위한 정답 output

In [18]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from sklearn.model_selection import train_test_split
from collections import Counter
from tqdm import tqdm

# Select the compute device: CUDA when available, otherwise CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 1. Vocabulary construction
def build_vocab(sents):
  """Map every word in ``sents`` to an integer id, most frequent first.

  Ids 0 and 1 are reserved for ``'<PAD>'`` and ``'<UNK>'``; real words start
  at 2. Words with equal counts keep the order in which they first appeared.

  Args:
    sents: Iterable of tokenized sentences (each an iterable of words).

  Returns:
    dict mapping word -> integer id.
  """
  frequencies = Counter(token for sent in sents for token in sent)

  # Reserved ids for padding and out-of-vocabulary words
  word_to_index = {'<PAD>': 0, '<UNK>': 1}

  # most_common() yields words by descending count, so frequent words get low ids
  for token_id, (token, _) in enumerate(frequencies.most_common(), start=2):
    word_to_index[token] = token_id

  return word_to_index


# Build the vocabularies and record their sizes.
# The target vocabulary is built from decoder inputs and labels together,
# so it contains both <sos> and <eos>.
src_vocab = build_vocab(sents_en_in)
tar_vocab = build_vocab(sents_fra_in + sents_fra_out)

src_vocab_size = len(src_vocab)
tar_vocab_size = len(tar_vocab)



# 2. Reverse lookups (integer id -> word) for both vocabularies
index_to_src = {v: k for k, v in src_vocab.items()}
index_to_tar = {v: k for k, v in tar_vocab.items()}

def texts_to_sequences(sents, word_to_index):
  """Convert tokenized sentences into lists of integer ids.

  Words missing from ``word_to_index`` are encoded as the ``'<UNK>'`` id.
  A tqdm progress bar is shown while iterating over ``sents``.

  Args:
    sents: Iterable of tokenized sentences.
    word_to_index: dict mapping word -> integer id; must contain ``'<UNK>'``
      whenever an unknown word is encountered.

  Returns:
    List of integer-id lists, one per sentence.
  """
  def encode(word):
    # The <UNK> entry is only looked up when the word is actually unknown.
    if word in word_to_index:
      return word_to_index[word]
    return word_to_index['<UNK>']

  return [[encode(word) for word in sent] for sent in tqdm(sents)]


# Integer-encode the three token lists with their respective vocabularies
encoder_input = texts_to_sequences(sents_en_in, src_vocab)
decoder_input = texts_to_sequences(sents_fra_in, tar_vocab)
decoder_target = texts_to_sequences(sents_fra_out, tar_vocab)




# 3. Padding: right-pad every sequence with zeros up to a common length
def pad_sequences(sentences, max_len=None):
    """Return a 2-D integer array holding ``sentences`` right-padded with 0.

    Args:
        sentences: Sequence of integer-id sequences.
        max_len: Number of columns in the result. When ``None`` the longest
            sentence decides it. Longer sentences are truncated to ``max_len``.

    Returns:
        ``numpy.ndarray`` of shape ``(len(sentences), max_len)``; an empty
        input yields an array of shape ``(0, 0)``.
    """
    if max_len is None:
        # default=0 keeps an empty batch from raising ValueError inside max().
        max_len = max((len(sentence) for sentence in sentences), default=0)

    features = np.zeros((len(sentences), max_len), dtype=int)
    for index, sentence in enumerate(sentences):
        if len(sentence) != 0:
            # The destination slice is clipped to max_len columns by NumPy, and
            # the source is truncated to match.
            features[index, :len(sentence)] = np.array(sentence)[:max_len]
    return features

# Pad each dataset to the length of its own longest sequence
encoder_input = pad_sequences(encoder_input)
decoder_input = pad_sequences(decoder_input)
decoder_target = pad_sequences(decoder_target)




# Report the padded array shapes and the vocabulary sizes
print('인코더의 입력의 크기(shape) :',encoder_input.shape)
print('디코더의 입력의 크기(shape) :',decoder_input.shape)
print('디코더의 레이블의 크기(shape) :',decoder_target.shape)
print(f'영어 단어 집합 크기: {str(src_vocab_size)}, 프랑스어 단어 집합 크기: {str(tar_vocab_size)}')


100%|██████████| 33000/33000 [00:00<00:00, 4022319.37it/s]
100%|██████████| 33000/33000 [00:00<00:00, 3339494.58it/s]
100%|██████████| 33000/33000 [00:00<00:00, 410157.12it/s]

인코더의 입력의 크기(shape) : (33000, 7)
디코더의 입력의 크기(shape) : (33000, 16)
디코더의 레이블의 크기(shape) : (33000, 16)
영어 단어 집합 크기: 4486, 프랑스어 단어 집합 크기: 7879





In [19]:
### Train / Test split
# Shuffle all three arrays with one shared permutation so rows stay aligned.
n_total = encoder_input.shape[0]
indices = np.arange(n_total)
np.random.shuffle(indices)

encoder_input = encoder_input[indices]
decoder_input = decoder_input[indices]
decoder_target = decoder_target[indices]

# Hold out 10% of the samples that were actually loaded, rather than a
# hard-coded 33000.
n_of_val = int(n_total*0.1)

# Slice with an explicit boundary: `[:-n]` / `[-n:]` would swap train and
# test when n_of_val is 0.
split_at = n_total - n_of_val

encoder_input_train = encoder_input[:split_at]
decoder_input_train = decoder_input[:split_at]
decoder_target_train = decoder_target[:split_at]

encoder_input_test = encoder_input[split_at:]
decoder_input_test = decoder_input[split_at:]
decoder_target_test = decoder_target[split_at:]



#### Tensor Dataset 설정 -> Pytorch Dataset 변환
from torch.utils.data import DataLoader, TensorDataset

# Convert the NumPy splits to int64 tensors (token ids)
encoder_input_train_tensor = torch.tensor(encoder_input_train, dtype=torch.long)
decoder_input_train_tensor = torch.tensor(decoder_input_train, dtype=torch.long)
decoder_target_train_tensor = torch.tensor(decoder_target_train, dtype=torch.long)

encoder_input_test_tensor = torch.tensor(encoder_input_test, dtype=torch.long)
decoder_input_test_tensor = torch.tensor(decoder_input_test, dtype=torch.long)
decoder_target_test_tensor = torch.tensor(decoder_target_test, dtype=torch.long)

# Each dataset item is (encoder input, decoder input, decoder target).
# The held-out "test" split is the one used as the validation set below.
train_dataset = TensorDataset(encoder_input_train_tensor, decoder_input_train_tensor, decoder_target_train_tensor)
valid_dataset = TensorDataset(encoder_input_test_tensor, decoder_input_test_tensor, decoder_target_test_tensor)

#### Training

- 학습 방법
    1. Encoder를 Encoder Input을 이용해서 학습 진행
    2. Encoder 내 LSTM 셀 중 마지막 출력을 Decoder의 Initial State로 활용
    3. Decoder를 Decoder Input을 활용하여 학습 진행
    4. Decoder의 결과와 Decoder Target 사이의 Cross Entropy Loss로 Backpropagation 진행

In [20]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import time

# Device selection (use the GPU when CUDA is available)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Hyperparameters
embedding_dim = 256   # embedding size, passed to both Encoder and Decoder
hidden_units = 256    # LSTM hidden size, passed to both Encoder and Decoder
batch_size = 128
num_epochs = 10


# 1. Encoder
class Encoder(nn.Module):
    """Embed source token ids and summarize them with a single-layer LSTM.

    Args:
        src_vocab_size: Number of rows in the embedding table.
        embedding_dim: Size of each embedding vector.
        hidden_units: LSTM hidden-state size.
    """

    def __init__(self, src_vocab_size, embedding_dim, hidden_units):
        super().__init__()
        # Index 0 is the padding id
        self.embedding = nn.Embedding(
            num_embeddings=src_vocab_size,
            embedding_dim=embedding_dim,
            padding_idx=0,
        )
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_units,
            batch_first=True,
        )

    def forward(self, x):
        """Return the final ``(hidden, cell)`` state for a batch of id sequences.

        ``x`` is ``(batch_size, seq_len)``; both returned tensors are
        ``(1, batch_size, hidden_units)``. The per-step LSTM outputs are discarded.
        """
        embedded = self.embedding(x)   # (batch_size, seq_len, embedding_dim)
        _, final_state = self.lstm(embedded)
        hidden, cell = final_state
        return hidden, cell

# 2. Decoder
class Decoder(nn.Module):
    """Embed target token ids, run an LSTM from a given state, and project to vocabulary logits.

    Args:
        tar_vocab_size: Number of rows in the embedding table and size of the output layer.
        embedding_dim: Size of each embedding vector.
        hidden_units: LSTM hidden-state size.
    """

    def __init__(self, tar_vocab_size, embedding_dim, hidden_units):
        super().__init__()
        # Index 0 is the padding id
        self.embedding = nn.Embedding(
            num_embeddings=tar_vocab_size,
            embedding_dim=embedding_dim,
            padding_idx=0,
        )
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_units,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_units, out_features=tar_vocab_size)

    def forward(self, x, hidden, cell):
        """Decode ``x`` starting from the LSTM state ``(hidden, cell)``.

        Returns ``(logits, hidden, cell)`` where ``logits`` is
        ``(batch_size, seq_len, tar_vocab_size)`` and the states are
        ``(1, batch_size, hidden_units)``.
        """
        embedded = self.embedding(x)   # (batch_size, seq_len, embedding_dim)
        initial_state = (hidden, cell)

        # lstm_out: (batch_size, seq_len, hidden_units)
        lstm_out, (hidden, cell) = self.lstm(embedded, initial_state)
        logits = self.fc(lstm_out)

        return logits, hidden, cell
    

# 3. Seq2Seq model
class Seq2Seq(nn.Module):
    """Chain an encoder and a decoder: the encoder's final state seeds the decoder."""

    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, encoder_input, decoder_input):
        """Return the decoder's first output (the predictions) for ``decoder_input``.

        The decoder is started from the ``(hidden, cell)`` pair the encoder
        produces for ``encoder_input``; the decoder's final state is dropped.
        """
        state = self.encoder(encoder_input)
        hidden, cell = state
        predictions, _hidden, _cell = self.decoder(decoder_input, hidden, cell)
        return predictions


# Build the model and move it to the selected device
encoder = Encoder(src_vocab_size, embedding_dim, hidden_units)
decoder = Decoder(tar_vocab_size, embedding_dim, hidden_units)
model = Seq2Seq(encoder, decoder)
model.to(device)

# ignore_index=0 leaves padded target positions out of the loss
loss_function = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Print the module structure
print("Model 정보")
print(model)

print("-"*70)

Model 정보
Seq2Seq(
  (encoder): Encoder(
    (embedding): Embedding(4486, 256, padding_idx=0)
    (lstm): LSTM(256, 256, batch_first=True)
  )
  (decoder): Decoder(
    (embedding): Embedding(7879, 256, padding_idx=0)
    (lstm): LSTM(256, 256, batch_first=True)
    (fc): Linear(in_features=256, out_features=7879, bias=True)
  )
)
----------------------------------------------------------------------


In [21]:
# 4. training by epoch & loss update
def evaluation(model, dataloader, loss_function, device):
    """Compute loss and token accuracy over ``dataloader`` without updating weights.

    Args:
        model: Callable as ``model(encoder_inputs, decoder_inputs)`` returning
            logits of shape ``(batch_size, seq_len, tar_vocab_size)``.
        dataloader: Iterable of ``(encoder_inputs, decoder_inputs, decoder_targets)``
            batches that also supports ``len()``.
        loss_function: Called with flattened logits and flattened targets.
        device: Device every batch is moved to.

    Returns:
        ``(mean_loss, accuracy)``: the average of the per-batch losses, and the
        fraction of non-padding target tokens (id != 0) predicted correctly.
    """
    model.eval()
    loss_sum = 0.0
    n_correct = 0
    n_tokens = 0

    with torch.no_grad():
        for batch in dataloader:
            encoder_inputs, decoder_inputs, decoder_targets = (t.to(device) for t in batch)

            # Forward pass
            logits = model(encoder_inputs, decoder_inputs)

            # Flatten to (batch_size * seq_len, tar_vocab_size) vs (batch_size * seq_len,)
            vocab_dim = logits.size(-1)
            batch_loss = loss_function(logits.view(-1, vocab_dim), decoder_targets.view(-1))
            loss_sum += batch_loss.item()

            # Accuracy counts only positions whose target is not padding
            predictions = logits.argmax(dim=-1)
            not_pad = decoder_targets != 0
            hits = (predictions == decoder_targets) & not_pad
            n_correct += hits.sum().item()
            n_tokens += not_pad.sum().item()

    mean_loss = loss_sum / len(dataloader)
    accuracy = n_correct / n_tokens
    return mean_loss, accuracy



############################### 5. Training ##################################

# Training batches are reshuffled every epoch; validation order stays fixed.
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
valid_dataloader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False)

def train_seq2seq(model, train_dataloader, valid_dataloader, optimizer, loss_function, device, num_epochs):
    """
    Trains a Seq2Seq model and returns it with its best-validation weights loaded.

    After every epoch the model is evaluated on both dataloaders and the
    metrics are printed. Whenever the validation loss improves, a copy of
    the weights is kept in memory; once training ends those weights are
    loaded back, so the returned model is the best one seen rather than
    simply the last one.

    Args:
        model: PyTorch model (Seq2Seq)
        train_dataloader: DataLoader for training data
        valid_dataloader: DataLoader for validation data
        optimizer: Optimizer for training
        loss_function: Loss function
        device: Device to run the model on ("cuda" or "cpu")
        num_epochs: Number of epochs to train

    Returns:
        The same ``model`` object. With ``num_epochs == 0``, or if the
        validation loss never compares below infinity (e.g. NaN), the weights
        are left as they are.
    """
    import copy  # local import: only needed to snapshot the best weights

    best_val_loss = float('inf')
    best_state = None
    model.to(device)

    for epoch in range(num_epochs):
        model.train()

        for encoder_inputs, decoder_inputs, decoder_targets in train_dataloader:
            encoder_inputs = encoder_inputs.to(device)
            decoder_inputs = decoder_inputs.to(device)
            decoder_targets = decoder_targets.to(device)

            optimizer.zero_grad()

            outputs = model(encoder_inputs, decoder_inputs)

            # Flatten (batch, seq, vocab) logits and (batch, seq) targets for the loss.
            loss = loss_function(outputs.view(-1, outputs.size(-1)), decoder_targets.view(-1))
            loss.backward()
            optimizer.step()

        train_loss, train_acc = evaluation(model, train_dataloader, loss_function, device)
        valid_loss, valid_acc = evaluation(model, valid_dataloader, loss_function, device)

        # Snapshot the weights at every new best validation loss. deepcopy is
        # required because state_dict() returns references to live tensors.
        if valid_loss < best_val_loss:
            best_val_loss = valid_loss
            best_state = copy.deepcopy(model.state_dict())

        print(f'Epoch: {epoch+1}/{num_epochs} | Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.4f} | Valid Loss: {valid_loss:.4f} | Valid Acc: {valid_acc:.4f}')

    if best_state is not None:
        model.load_state_dict(best_state)

    return model


# Train for num_epochs epochs; per-epoch metrics are printed by train_seq2seq.
model = train_seq2seq(model, train_dataloader, valid_dataloader, optimizer, loss_function, device, num_epochs)

Epoch: 1/10 | Train Loss: 2.9462 | Train Acc: 0.5296 | Valid Loss: 3.0746 | Valid Acc: 0.5267
Epoch: 2/10 | Train Loss: 2.2702 | Train Acc: 0.6028 | Valid Loss: 2.5088 | Valid Acc: 0.5910
Epoch: 3/10 | Train Loss: 1.8591 | Train Acc: 0.6467 | Valid Loss: 2.2164 | Valid Acc: 0.6206
Epoch: 4/10 | Train Loss: 1.5585 | Train Acc: 0.6835 | Valid Loss: 2.0304 | Valid Acc: 0.6418
Epoch: 5/10 | Train Loss: 1.2996 | Train Acc: 0.7201 | Valid Loss: 1.8792 | Valid Acc: 0.6604
Epoch: 6/10 | Train Loss: 1.0843 | Train Acc: 0.7586 | Valid Loss: 1.7636 | Valid Acc: 0.6793
Epoch: 7/10 | Train Loss: 0.9019 | Train Acc: 0.7942 | Valid Loss: 1.6698 | Valid Acc: 0.6908
Epoch: 8/10 | Train Loss: 0.7436 | Train Acc: 0.8287 | Valid Loss: 1.6059 | Valid Acc: 0.7012
Epoch: 9/10 | Train Loss: 0.6230 | Train Acc: 0.8503 | Valid Loss: 1.5710 | Valid Acc: 0.7074
Epoch: 10/10 | Train Loss: 0.5218 | Train Acc: 0.8755 | Valid Loss: 1.5327 | Valid Acc: 0.7144


In [22]:
# Compute loss and accuracy on the validation data for the model returned by train_seq2seq
val_loss, val_accuracy = evaluation(model, valid_dataloader, loss_function, device)

print(f'Best model validation loss: {val_loss:.4f}')
print(f'Best model validation accuracy: {val_accuracy:.4f}')

Best model validation loss: 1.5327
Best model validation accuracy: 0.7144


#### Translator Operation

In [23]:
#### Inference

# Reverse lookups (integer id -> word) used to turn predictions back into text
index_to_src = {v: k for k, v in src_vocab.items()}
index_to_tar = {v: k for k, v in tar_vocab.items()}

# Convert a source-side integer sequence back into text
def seq_to_src(input_seq):
  """Return the words of ``input_seq`` looked up in ``index_to_src``, skipping id 0.

  Every kept word is followed by one space, so a non-empty result ends with
  a trailing space.
  """
  pieces = [index_to_src[token_id] + ' ' for token_id in input_seq if token_id != 0]
  return ''.join(pieces)

# Convert a target-side integer sequence back into text
def seq_to_tar(input_seq):
  """Return the words of ``input_seq`` looked up in ``index_to_tar``.

  Id 0 and the ids of ``'<sos>'`` and ``'<eos>'`` (taken from ``tar_vocab``)
  are skipped. Every kept word is followed by one space, so a non-empty
  result ends with a trailing space.
  """
  pieces = []
  for token_id in input_seq:
    if token_id == 0 or token_id == tar_vocab['<sos>'] or token_id == tar_vocab['<eos>']:
      continue
    pieces.append(index_to_tar[token_id] + ' ')
  return ''.join(pieces)


def decode_sequence(input_seq, model, src_vocab_size, tar_vocab_size, max_output_len, int_to_src_token, int_to_tar_token):
    """Greedily translate one encoded source sentence.

    The encoder's final state seeds the decoder. The decoder is then fed one
    token at a time, starting from ``<sos>``, and each step's argmax
    prediction becomes the next input. Decoding stops at ``<eos>`` or after
    ``max_output_len`` steps.

    Args:
        input_seq: 1-D sequence of source token ids (list or NumPy array).
        model: Object exposing ``encoder`` and ``decoder`` sub-modules.
        src_vocab_size: Unused; kept for call compatibility.
        tar_vocab_size: Unused; kept for call compatibility.
        max_output_len: Maximum number of decoding steps.
        int_to_src_token: Unused; kept for call compatibility.
        int_to_tar_token: dict mapping target id -> token. The ids of
            ``'<sos>'`` and ``'<eos>'`` are looked up here.

    Returns:
        The decoded target tokens joined by single spaces (without
        ``<sos>``/``<eos>``).
    """
    # Look the special-token ids up instead of hard-coding them: their ids
    # depend on word frequencies in the corpus. 3 and 4 remain as fallbacks.
    tar_token_to_int = {token: index for index, token in int_to_tar_token.items()}
    sos_index = tar_token_to_int.get('<sos>', 3)
    eos_index = tar_token_to_int.get('<eos>', 4)

    # Run on whatever device the model's weights live on.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else torch.device("cpu")

    decoded_tokens = []

    # Inference only: skip autograd bookkeeping.
    with torch.no_grad():
        # unsqueeze(0) adds the batch dimension.
        encoder_inputs = torch.tensor(input_seq, dtype=torch.long).unsqueeze(0).to(model_device)

        # The encoder's final state is the decoder's initial state.
        hidden, cell = model.encoder(encoder_inputs)

        # <sos> is the decoder's first input, shaped (batch=1, seq_len=1).
        decoder_input = torch.tensor([[sos_index]], dtype=torch.long, device=model_device)

        # One loop iteration == one decoder time step.
        for _ in range(max_output_len):
            output, hidden, cell = model.decoder(decoder_input, hidden, cell)

            # Greedy choice: index of the highest-scoring token.
            output_token = output.argmax(dim=-1).item()

            if output_token == eos_index:
                break

            decoded_tokens.append(output_token)

            # The current prediction is the next step's input.
            decoder_input = torch.tensor([[output_token]], dtype=torch.long, device=model_device)

    return ' '.join(int_to_tar_token[token] for token in decoded_tokens)



# Translate a few training samples and print source, reference and prediction.
for seq_index in [3, 50, 100, 300, 1001]:
  input_seq = encoder_input_train[seq_index]
  # 20 is the maximum number of decoding steps
  translated_text = decode_sequence(input_seq, model, src_vocab_size, tar_vocab_size, 20, index_to_src, index_to_tar)

  print("입력문장 :",seq_to_src(encoder_input_train[seq_index]))
  # The reference is rebuilt from the decoder input; seq_to_tar drops <sos>/<eos>.
  print("정답문장 :",seq_to_tar(decoder_input_train[seq_index]))
  print("번역문장 :",translated_text)
  print("-"*50)

입력문장 : i am good . 
정답문장 : je suis bon . 
번역문장 : je suis bon .
--------------------------------------------------
입력문장 : i m getting old . 
정답문장 : je commence a devenir vieux . 
번역문장 : je me fais vieux .
--------------------------------------------------
입력문장 : this is my horse . 
정답문장 : ce cheval est a moi . 
번역문장 : c est mon cheval .
--------------------------------------------------
입력문장 : we have no proof . 
정답문장 : nous n avons aucune preuve . 
번역문장 : nous n avons aucune preuve .
--------------------------------------------------
입력문장 : you re very wise . 
정답문장 : vous etes tres avises . 
번역문장 : vous etes tres sages .
--------------------------------------------------
