In [4]:
import torch
from torch import nn, optim
from torch.utils.data import TensorDataset, Dataset, DataLoader
from tqdm import tqdm

import re
import collections
import itertools

In [5]:
# Characters that are deleted outright: commas, brackets, asterisks, colons,
# semicolons, inverted Spanish marks, and anything enclosed in <...>.
# Raw strings keep the backslashes as regex escapes instead of (invalid)
# Python string escapes.
remove_marks_regex = re.compile(r'[\,\(\)\[\]\*:;¿¡]|<.*?>')
# Marks that get a space inserted in front of them so that a later
# whitespace split turns them into separate tokens.
shift_marks_regex = re.compile(r'([?!\.])')

# Indices of the special tokens; they match the order
# ['<UNK>', '<SOS>', '<EOS>'] that build_vocab puts at the head of a vocabulary.
unk = 0
sos = 1
eos = 2


def normalize(text):
    """Return a lower-cased, cleaned-up copy of ``text`` ready for tokenizing.

    Everything matched by ``remove_marks_regex`` is deleted, then every mark
    matched by ``shift_marks_regex`` gets a single space put in front of it.

    Args:
        text: the string to clean.

    Returns:
        The normalized string.
    """
    stripped = remove_marks_regex.sub('', text.lower())
    return shift_marks_regex.sub(r' \1', stripped)


def parse_line(line):
    """Split one tab-separated corpus line into source and target tokens.

    The line is stripped and normalized first, then split on tabs. The first
    column is the source sentence and the second the target sentence; any
    further columns are ignored, so lines with two, three or more columns
    are all accepted.

    Args:
        line: one raw line of the corpus file.

    Returns:
        ``(src_tokens, trg_tokens)``, two lists of whitespace-separated tokens.

    Raises:
        ValueError: if the line has fewer than two tab-separated columns.
    """
    line = normalize(line.strip())
    # Starred target: tolerate a missing or empty trailing column (strip()
    # above removes a dangling tab) as well as extra columns.
    src, trg, *_ = line.split('\t')
    src_tokens = src.strip().split()
    trg_tokens = trg.strip().split()
    return src_tokens, trg_tokens


def build_vocab(tokens):
    """Create the index->token list and token->index dict for a token stream.

    Args:
        tokens: iterable of tokens from every sentence of one language.

    Returns:
        ``(list_word, dict_word)``. ``list_word`` starts with the three tags
        '<UNK>', '<SOS>', '<EOS>' followed by the distinct tokens in order of
        decreasing frequency; ``dict_word`` maps each entry of ``list_word``
        to its position.
    """
    # How often each token occurs across the whole stream.
    frequency = collections.Counter(tokens)
    # Most frequent first; the sort is stable, so tokens with equal counts
    # stay in first-seen order.
    ranked = sorted(frequency, key=frequency.get, reverse=True)
    # The three tags take indices 0-2, the real tokens follow.
    list_word = ['<UNK>', '<SOS>', '<EOS>']
    list_word.extend(ranked)
    dict_word = dict(zip(list_word, range(len(list_word))))

    return list_word, dict_word

def words2tensor(words, word_dict, max_len, padding=0):
    """Encode a token list as an int64 index tensor terminated by '<EOS>'.

    Args:
        words: list of tokens (not modified).
        word_dict: token -> index mapping; tokens missing from it map to 0.
        max_len: maximum number of tokens before the end tag; shorter
            sequences are right-padded to ``max_len + 1`` entries. Longer
            sequences are left at their own length.
        padding: index used for the padded positions.

    Returns:
        ``(tensor, seq_len)`` where ``seq_len`` counts the real entries
        including the end tag, i.e. excluding padding.
    """
    # Append the end tag, then look every token up (unknown -> 0).
    ids = [word_dict.get(token, 0) for token in words + ['<EOS>']]
    seq_len = len(ids)
    # A non-positive repeat count yields an empty list, so sequences that
    # already fill max_len + 1 slots are left untouched.
    ids.extend([padding] * (max_len + 1 - seq_len))

    return torch.tensor(ids, dtype=torch.int64), seq_len

In [6]:
class TranslationPairDataset(Dataset):
    """Dataset of (source, target) sentence pairs read from a tab-separated file.

    Each non-blank line is parsed with ``parse_line``; pairs in which either
    side has more than ``max_len`` tokens are dropped. A separate vocabulary
    is built for each side from the pairs that remain, and every sentence is
    encoded with ``words2tensor``.

    Attributes set by ``__init__``:
        list_src_word / dict_src_word: source vocabulary (index -> token list
            and token -> index dict).
        list_trg_word / dict_trg_word: the same for the target side.
        src_data / trg_data: per-sentence results of ``words2tensor``.

    Args:
        path: path of the corpus file, read as UTF-8.
        max_len: maximum number of tokens allowed on either side of a pair.
    """

    def __init__(self, path, max_len=15):
        def filter_pair(p):
            # Keep a pair only when both sides fit within max_len tokens.
            return not (len(p[0]) > max_len or len(p[1]) > max_len)

        # Decode explicitly as UTF-8: the platform default encoding is not
        # UTF-8 everywhere and would garble or reject accented characters.
        with open(path, encoding='utf-8') as fp:
            # Blank lines carry no pair and would make parse_line fail.
            lines = (line for line in fp if line.strip())
            pairs = map(parse_line, lines)
            pairs = filter(filter_pair, pairs)
            # Materialize while the file is still open; map/filter are lazy.
            pairs = list(pairs)

        src = [p[0] for p in pairs]
        trg = [p[1] for p in pairs]

        # Build one vocabulary per language.
        self.list_src_word, self.dict_src_word = build_vocab(
            itertools.chain.from_iterable(src)
        )
        self.list_trg_word, self.dict_trg_word = build_vocab(
            itertools.chain.from_iterable(trg)
        )

        self.src_data = [words2tensor(words, self.dict_src_word, max_len) for words in src]
        self.trg_data = [words2tensor(words, self.dict_trg_word, max_len) for words in trg]

    def __len__(self):
        """Number of sentence pairs kept after filtering."""
        return len(self.src_data)

    def __getitem__(self, idx):
        """Return ``(src, src_len, trg, trg_len)`` for pair ``idx``."""
        src, lsrc = self.src_data[idx]
        trg, ltrg = self.trg_data[idx]

        return src, lsrc, trg, ltrg

In [8]:
# Mini-batch size handed to the DataLoader below.
batch_size = 64
# Sentence pairs with more than this many tokens on either side are dropped
# by TranslationPairDataset.
max_len = 10
# Tab-separated parallel corpus; the path is relative to the working directory.
path_data = 'data/spa-eng/spa.txt'
# Reads and tokenizes the file and builds both vocabularies.
ds = TranslationPairDataset(path_data, max_len=max_len)
# Shuffled mini-batches of the dataset's (src, lsrc, trg, ltrg) items.
loader = DataLoader(ds, batch_size=batch_size, shuffle=True)

In [13]:
class Encoder(nn.Module):
    """Embeds source token indices and runs them through an LSTM.

    Only the final LSTM state is returned; the per-step outputs are discarded.

    Args:
        num_embeddings: size of the source vocabulary.
        embedding_dim: width of the token embeddings.
        hidden_size: width of the LSTM hidden state.
        num_layers: number of stacked LSTM layers.
        dropout: dropout probability applied between stacked LSTM layers.
            It is not passed on when ``num_layers`` is 1, where ``nn.LSTM``
            cannot apply it and would only emit a warning.
    """

    def __init__(
        self,
        num_embeddings,
        embedding_dim=50,
        hidden_size=50,
        num_layers=1,
        dropout=.2
    ):
        super().__init__()
        # Index 0 is the padding index; its embedding stays a zero vector.
        self.emb = nn.Embedding(num_embeddings, embedding_dim, padding_idx=0)
        # nn.LSTM applies dropout after every layer except the last, so with
        # a single layer a non-zero value does nothing but raise a UserWarning.
        lstm_dropout = dropout if num_layers > 1 else 0.0
        self.lstm = nn.LSTM(
            embedding_dim, hidden_size, num_layers, batch_first=True, dropout=lstm_dropout
        )

    def forward(self, x, h0=None, l=None):
        """Encode a batch of index sequences.

        Args:
            x: integer tensor of token indices, batch dimension first.
            h0: optional initial LSTM state, passed straight to the LSTM.
            l: optional sequence lengths. When given, the embedded batch is
                packed so the padded positions are skipped;
                ``pack_padded_sequence`` is called with its default settings,
                which expect the batch ordered by decreasing length.

        Returns:
            The final LSTM state as returned by ``nn.LSTM`` (the
            ``(h_n, c_n)`` pair).
        """
        x = self.emb(x)
        if l is not None:
            x = nn.utils.rnn.pack_padded_sequence(x, l, batch_first=True)
        _, h = self.lstm(x, h0)

        return h

In [55]:
class Decoder(nn.Module):
    """Embeds target token indices, runs an LSTM and projects to vocabulary scores.

    Args:
        num_embeddings: size of the target vocabulary; also the width of the
            output scores.
        embedding_dim: width of the token embeddings.
        hidden_size: width of the LSTM hidden state.
        num_layers: number of stacked LSTM layers.
        dropout: dropout probability applied between stacked LSTM layers.
            It is not passed on when ``num_layers`` is 1, where ``nn.LSTM``
            cannot apply it and would only emit a warning.
    """

    def __init__(
        self,
        num_embeddings,
        embedding_dim=50,
        hidden_size=50,
        num_layers=1,
        dropout=.2
    ):
        super().__init__()
        # Index 0 is the padding index; its embedding stays a zero vector.
        self.emb = nn.Embedding(num_embeddings, embedding_dim, padding_idx=0)
        # nn.LSTM applies dropout after every layer except the last, so with
        # a single layer a non-zero value does nothing but raise a UserWarning.
        lstm_dropout = dropout if num_layers > 1 else 0.0
        self.lstm = nn.LSTM(
            embedding_dim, hidden_size, num_layers, batch_first=True, dropout=lstm_dropout
        )
        self.linear = nn.Linear(hidden_size, num_embeddings)

    def forward(self, x, h, l=None):
        """Score the next token at every position of ``x``.

        Args:
            x: integer tensor of token indices, batch dimension first.
            h: initial LSTM state, e.g. the state returned by the encoder.
            l: optional sequence lengths. When given, the embedded batch is
                packed before the LSTM and unpacked afterwards;
                ``pack_padded_sequence`` is called with its default settings,
                which expect the batch ordered by decreasing length.

        Returns:
            ``(scores, h)``: the vocabulary scores per position and the final
            LSTM state. With ``l`` given, the time dimension of ``scores`` is
            the longest length in ``l`` and positions past a sequence's own
            length come from a zero LSTM output.
        """
        x = self.emb(x)
        if l is not None:
            x = nn.utils.rnn.pack_padded_sequence(x, l, batch_first=True)
        x, h = self.lstm(x, h)
        if l is not None:
            # Turn the PackedSequence back into a padded tensor; [0] drops the
            # lengths that pad_packed_sequence returns alongside it.
            x = nn.utils.rnn.pad_packed_sequence(x, batch_first=True, padding_value=0)[0]
        x = self.linear(x)
        return x, h

In [69]:
def translate(input_str, enc, dec, max_len=15, device='cpu'):
    """Greedily translate ``input_str`` with an encoder/decoder pair.

    The sentence is normalized, encoded with the source vocabulary of the
    module-level dataset ``ds`` and fed to ``enc``. Starting from the <SOS>
    index, ``dec`` is then called one token at a time, always feeding back
    the highest-scoring token, until it produces <EOS> or ``max_len`` tokens
    have been generated.

    The modules are used in whatever train/eval mode the caller left them in.

    Args:
        input_str: source sentence as plain text.
        enc: encoder; called as ``enc(input_tensor, l=[seq_len])``.
        dec: decoder; called as ``dec(token, state)`` and expected to return
            ``(scores, state)``.
        max_len: padding length for the input and upper bound on the number
            of generated tokens.
        device: device the input tensors are moved to.

    Returns:
        The generated target tokens joined by single spaces.
    """
    words = normalize(input_str).split()
    input_tensor, seq_len = words2tensor(words, ds.dict_src_word, max_len=max_len)
    # Add a batch dimension of size 1.
    input_tensor = input_tensor.unsqueeze(0).to(device)

    # `sos` is the module-level index of the '<SOS>' tag (defined next to
    # `unk` and `eos`); it is the first token fed to the decoder.
    z = torch.tensor(sos, dtype=torch.int64).to(device)

    results = []
    # Decoding only needs forward passes, so no autograd graph is recorded.
    with torch.no_grad():
        # The encoder's final state is the decoder's initial state.
        h = enc(input_tensor, l=[seq_len])
        for _ in range(max_len):
            o, h = dec(z.view(1, 1), h)
            # Index of the highest score over the vocabulary.
            wi = o.view(-1).max(0)[1]

            if wi.item() == eos:
                break
            results.append(wi.item())
            # Feed the chosen token back in as the next input.
            z = wi

    return ' '.join(ds.list_trg_word[idx] for idx in results)
    
    

In [70]:
# Two-layer encoder/decoder with 100-dim embeddings and 100 hidden units,
# each sized to its own vocabulary.
enc = Encoder(len(ds.list_src_word), 100, 100, 2)
dec = Decoder(len(ds.list_trg_word), 100, 100, 2)
# Smoke test: no training has happened between construction and this call,
# so the translation is expected to be meaningless. translate() is called
# with its own default max_len=15 here, not the max_len used for the dataset.
translate('I am a student.', enc, dec)

'dispararan tropezó beicon güey leerse leerse verte verte avaricioso avaricioso güey tofu prometedores prometedores güey'

### <b style="color: #0eab81">모델학습</b>

In [None]:
from statistics import mean

def to2D(x):
    """Collapse the first two axes of ``x`` into one and flatten the rest.

    Args:
        x: array-like with a ``shape`` of at least two axes and a
            ``reshape`` method.

    Returns:
        ``x`` reshaped to ``(shape[0] * shape[1], -1)``; an input with
        exactly two axes therefore gets a trailing axis of size 1.
    """
    merged_rows = x.shape[0] * x.shape[1]
    return x.reshape(merged_rows, -1)

for epoc in range(30):
    enc.train(), dec.train()
    losses = []