In [None]:
import os
from argparse import Namespace
from collections import Counter
import json
import re
import string

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.nn import functional as F
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import tqdm

# Vocabulary와 SequenceVocabulary

In [None]:
class Vocabulary(object):
  """매핑을 위해 텍스트를 처리하고 어휘 사전을 만드는 클래스 """

  def __init__(self, token_to_idx=None):
      """
      매개변수:
          token_to_idx (dict): 기존 토큰-인덱스 매핑 딕셔너리
      """

      if token_to_idx is None:
          token_to_idx = {}
      self._token_to_idx = token_to_idx

      self._idx_to_token = {idx: token
                            for token, idx in self._token_to_idx.items()}

  def to_serializable(self):
      """ 직렬화할 수 있는 딕셔너리를 반환합니다 """
      return {'token_to_idx': self._token_to_idx}

  @classmethod
  def from_serializable(cls, contents):
      """ 직렬화된 딕셔너리에서 Vocabulary 객체를 만듭니다 """
      return cls(**contents)

  def add_token(self, token):
        """ 토큰을 기반으로 매핑 딕셔너리를 업데이트합니다

        매개변수:
            token (str): Vocabulary에 추가할 토큰
        반환값:
            index (int): 토큰에 상응하는 정수
        """
        if token in self._token_to_idx:
            index = self._token_to_idx[token]
        else:
            index = len(self._token_to_idx)
            self._token_to_idx[token] = index
            self._idx_to_token[index] = token
        return index

  def add_many(self, tokens):
      """토큰 리스트를 Vocabulary에 추가합니다.

      매개변수:
          tokens (list): 문자열 토큰 리스트
      반환값:
          indices (list): 토큰 리스트에 상응되는 인덱스 리스트
      """
      return [self.add_token(token) for token in tokens]

  def lookup_token(self, token):
      """토큰에 대응하는 인덱스를 추출합니다.

      매개변수:
          token (str): 찾을 토큰
      반환값:
          index (int): 토큰에 해당하는 인덱스
      """
      return self._token_to_idx[token]

  def lookup_index(self, index):
        """ 인덱스에 해당하는 토큰을 반환합니다.

        매개변수:
            index (int): 찾을 인덱스
        반환값:
            token (str): 인텍스에 해당하는 토큰
        에러:
            KeyError: 인덱스가 Vocabulary에 없을 때 발생합니다.
        """
        if index not in self._idx_to_token:
            raise KeyError("the index (%d) is not in the Vocabulary" % index)
        return self._idx_to_token[index]

  def __str__(self):
      return "<Vocabulary(size=%d)>" % len(self)

  def __len__(self):
      return len(self._token_to_idx)


In [None]:
class SequenceVocabulary(Vocabulary):
  def __init__(self, token_to_idx=None, unk_token="<UNK>",
                mask_token="<MASK>", begin_seq_token="<BEGIN>",
                end_seq_token="<END>"):

      super(SequenceVocabulary, self).__init__(token_to_idx)

      self._mask_token = mask_token
      self._unk_token = unk_token
      self._begin_seq_token = begin_seq_token
      self._end_seq_token = end_seq_token

      self.mask_index = self.add_token(self._mask_token)
      self.unk_index = self.add_token(self._unk_token)
      self.begin_seq_index = self.add_token(self._begin_seq_token)
      self.end_seq_index = self.add_token(self._end_seq_token)

  def to_serializable(self):
      contents = super(SequenceVocabulary, self).to_serializable()
      contents.update({'unk_token': self._unk_token,
                        'mask_token': self._mask_token,
                        'begin_seq_token': self._begin_seq_token,
                        'end_seq_token': self._end_seq_token})
      return contents

  def lookup_token(self, token):
      """ 토큰에 대응하는 인덱스를 추출합니다.
      토큰이 없으면 UNK 인덱스를 반환합니다.

      매개변수:
          token (str): 찾을 토큰
      반환값:
          index (int): 토큰에 해당하는 인덱스
      노트:
          UNK 토큰을 사용하려면 (Vocabulary에 추가하기 위해)
          `unk_index`가 0보다 커야 합니다.
      """
      if self.unk_index >= 0:
          return self._token_to_idx.get(token, self.unk_index)
      else:
          return self._token_to_idx[token]

# Vectorizer

In [None]:
class NMTVectorizer(object):
  def __init__(self, source_vocab, max_source_length, max_target_length):
    '''
    source_vocab : 소스 단어 정수 매핑
    target_vocab : 타깃 단어 정수 매핑
    max_source_length : 소스 데이터셋에서 가장 긴 시퀀스 길이
    max_target_length : 타깃 데이터셋에서 가장 긴 시퀀스 길이
    '''

    self.source_vocab = source_vocab
    self.max_source_length = max_source_length
    self.max_target_length = max_target_length

  def _vectorize(self, indices, vector_length=-1, mask_index=0):
    '''
    index를 벡터로 변환

    indices : 시퀀스를 나타내는 정수 리스트
    vector_length : 인덱스 벡터 길이
    mask_index : 사용할 MASK 인덱스
    '''

    if vector_length < 0:             # vector_length 파라미터 전달이 안됐을 경우, indices길이로 지정
      vector_length = len(indices)

    vector = np.zeros(vector_length, dtype=np.int64)
    vector[:len(indices)] = indices
    vector[len(indices):] = mask_index  # MASK 인덱스로 패딩

    return vector

  def _get_source_indices(self, text):
    '''
    벡터로 변환된 소스 텍스트를 반환 (소스 텍스트의 벡터를 반환)
    '''
    indices = [self.source_vocab.begin_seq_index]
    indices.extend(self.source_vocab.lookup_token(token) for token in text.split(" ")) # token의 인덱스를 찾음
    indices.append(self.source_vocab.end_seq_index)
    return indices

  def _get_target_indices(self, text):
    '''
    벡터로 변환된 타깃 텍스트를 반환
    '''

    indices = [self.target_vocab.lookup_token(token) for token in text.split(' ')]
    x_indices = [self.target_vocab.begin_seq_index] + indices
    y_indices = indices + [self.taget_vocab.end_seq_index]
    return x_indices, y_indices # x_indices : 디코더에서 샘플을 나타내는 정수 리스트, y_indices : 디코더에서 예측을 나타내는 정수 리스트

  def vectorize(self, source_text, target_text, use_dataset_max_lengths=True):
    '''
    벡터화된 소스 텍스트와 타깃 텍스트 반환
    '''
    source_vector_length = -1
    target_vector_length = -1

    if use_dataset_max_lengths:
      source_vector_length = self.max_source_length + 2
      target_vector_length = self.max_target_length + 1

    source_indices = self._get_source_indices(source_text)
    source_vector = self._vectorize(source_indices,
                                    vector_length=source_vector_length,
                                    mask_index=self.source_vocab.mask_index)

    target_x_indices, target_y_indices = self._get_target_indices(target_text)
    target_x_vector = self._vectorize(target_x_indices,
                                      vector_length=target_vector_length,
                                      mask_index=self.target_vocab.mask_index)
    target_y_vector = self._vectorize(target_y_indices,
                                      vector_length=target_vector_length,
                                      mask_index=self.target_vocab.mask_index)
    return {
        "source_vector":source_vector,
        "target_x_vector":target_x_vector,
        "target_y_vector":target_y_vector,
        "source_length":len(source_indices)
    }

  @classmethod
  def from_dataframe(cls, bitext_df):
    '''
    데이터셋 데이터프레임으로 Vectorizer 초기화
    '''

    source_vocab = SequenceVocabulary()
    target_vocab = SequenceVocabulary()

    max_source_length, max_target_length = 0, 0

    for _, row in bitext_df.iterrows():
      source_tokens = row["source_language"].split(' ')
      if len(source_tokens) > max_source_length:
        max_source_length = len(source_tokens)
      for token in source_tokens:
        source_vocab.add_token(token)

      target_tokens = row["target_language"].split(' ')
      if len(target_tokens) > max_target_length:
        max_target_length = len(target_tokens)
      for token in target_tokens:
        target_vocab.add_token(token)

    return cls(source_vocab, max_source_length, max_target_length)

# 데이터셋

In [None]:
class NMTDataset(Dataset):
  def __init__(self, text_df, vectorizer):
      """
      매개변수:
          text_df (pandas.DataFrame): 데이터셋
          vectorizer (SurnameVectorizer): 데이터셋에서 만든 Vectorizer 객체
      """
      self.text_df = text_df
      self._vectorizer = vectorizer

      self.train_df = self.text_df[self.text_df.split=='train']
      self.train_size = len(self.train_df)

      self.val_df = self.text_df[self.text_df.split=='val']
      self.validation_size = len(self.val_df)

      self.test_df = self.text_df[self.text_df.split=='test']
      self.test_size = len(self.test_df)

      self._lookup_dict = {'train': (self.train_df, self.train_size),
                            'val': (self.val_df, self.validation_size),
                            'test': (self.test_df, self.test_size)}

      self.set_split('train')

  @classmethod
  def load_dataset_and_make_vectorizer(cls, dataset_csv):
      """데이터셋을 로드하고 새로운 Vectorizer를 만듭니다

      매개변수:
          dataset_csv (str): 데이터셋의 위치
      반환값:
          NMTDataset의 객체
      """
      text_df = pd.read_csv(dataset_csv)
      train_subset = text_df[text_df.split=='train']
      return cls(text_df, NMTVectorizer.from_dataframe(train_subset))

  @classmethod
  def load_dataset_and_load_vectorizer(cls, dataset_csv, vectorizer_filepath):
      """데이터셋과 새로운 Vectorizer 객체를 로드합니다.
      캐싱된 Vectorizer 객체를 재사용할 때 사용합니다.

      매개변수:
          dataset_csv (str): 데이터셋의 위치
          vectorizer_filepath (str): Vectorizer 객체의 저장 위치
      반환값:
          NMTDataset의 객체
      """
      text_df = pd.read_csv(dataset_csv)
      vectorizer = cls.load_vectorizer_only(vectorizer_filepath)
      return cls(text_df, vectorizer)

  @staticmethod
  def load_vectorizer_only(vectorizer_filepath):
      """파일에서 Vectorizer 객체를 로드하는 정적 메서드

      매개변수:
          vectorizer_filepath (str): 직렬화된 Vectorizer 객체의 위치
      반환값:
          NMTVectorizer의 인스턴스
      """
      with open(vectorizer_filepath) as fp:
          return NMTVectorizer.from_serializable(json.load(fp))

  def save_vectorizer(self, vectorizer_filepath):
      """Vectorizer 객체를 json 형태로 디스크에 저장합니다

      매개변수:
          vectorizer_filepath (str): Vectorizer 객체의 저장 위치
      """
      with open(vectorizer_filepath, "w") as fp:
          json.dump(self._vectorizer.to_serializable(), fp)

  def get_vectorizer(self):
      """ 벡터 변환 객체를 반환합니다 """
      return self._vectorizer

  def set_split(self, split="train"):
      self._target_split = split
      self._target_df, self._target_size = self._lookup_dict[split]

  def __len__(self):
      return self._target_size

  def __getitem__(self, index):
      """파이토치 데이터셋의 주요 진입 메서드

      매개변수:
          index (int): 데이터 포인트에 대한 인덱스
      반환값:
          데이터 포인트(x_source, x_target, y_target, x_source_length)를 담고 있는 딕셔너리
      """
      row = self._target_df.iloc[index]

      vector_dict = self._vectorizer.vectorize(row.source_language, row.target_language)

      return {"x_source": vector_dict["source_vector"],
              "x_target": vector_dict["target_x_vector"],
              "y_target": vector_dict["target_y_vector"],
              "x_source_length": vector_dict["source_length"]}

  def get_num_batches(self, batch_size):
      """배치 크기가 주어지면 데이터셋으로 만들 수 있는 배치 개수를 반환합니다

      매개변수:
          batch_size (int)
      반환값:
          배치 개수
      """
      return len(self) // batch_size

# NMT를 위한 미니배치 생성 함수 (PackedSequence 데이터 구조를 만듦)

In [None]:
 def generate_nmt_batches(dataset, batch_size, shuffle=True, drop_last=True, device="cpu"):
  dataloader = DataLoader(dataset=dataset, batch_size=batch_size, shuffle=shuffle, drop_last=drop_last)

  for data_dict in dataloader:
    lengths = data_dict['x_source_length'].numpy()           # 각 시퀀스 길이
    sorted_length_indices = lengths.argsort()[::-1].tolist() # 시퀀스 길이 순서대로 내림차순 정렬

    out_data_dict = {}
    for name, tensor in data_dict.items():
      out_data_dict[name] = data_dict[name][sorted_length_indices].to(device)
    yield out_data_dict

# NMT 모델

# NMT 인코더

# NMT 디코더