In [1]:
import re
import collections
import itertools

# Encoder-Decoderモデルによる機械翻訳
# 1. 翻訳元のデータ(ソース、src)をEncoderに入力し、特徴量ベクトルを得る
# 2. 特徴量ベクトルをDecoderに入力し、翻訳先(ターゲット、trg)のデータを得る
# http://www.manythings.org/anki/spa-eng.zip
# タブ区切りで公開されたデータセット
remove_marks_regex = re.compile('[,\.\(\)\[\]\*:;¿¡]|<.*?>')
shift_marks_regex = re.compile('([?!\.])')
# unknown, start of sentence, end of sentenceのid
unk = 0
sos = 1
eos = 2

def normalize(text):
    # 全ての文字を小文字に変換
    text = text.lower()
    # 不要な文字を削除
    text = remove_marks_regex.sub('', text)
    # ?!.と単語の間に空白を挿入
    text = shift_marks_regex.sub(r' \1', text)
    return text

def parse_line(line):
    # 文字列を正規化
    line = normalize(line.strip())
    # 翻訳元と翻訳先がタブで分かれている
    # 翻訳元(src)と翻訳先(trg)それぞれのトークンのリストを作る
    src, trg = line.split('\t')
    # 文章を単語の配列にする
    src_tokens = src.strip().split()
    trg_tokens = trg.strip().split()
    return src_tokens, trg_tokens

def build_vocab(tokens):
    # ファイル内全ての文章でのトークンの出現数を数え、多い順に並べる
    counts = collections.Counter(tokens)
    sorted_counts = sorted(counts.items(), key=lambda c: c[1], reverse=True)
    # 3つのタグを追加して正引きリストと逆引き用辞書を作る
    # unknown, start of sentence, end of sentence
    word_list = ['<UNK>', '<SOS>', '<EOS>'] + [x[0] for x in sorted_counts]
    word_dict = dict((w, i) for i, w in enumerate(word_list))
    return word_list, word_dict

def words2tensor(words, word_dict, max_len, padding=0):
    # 末尾に終了タグをつける
    words = words + ['<EOS>']
    # 辞書を利用して数値のリストに変換する
    words = [word_dict.get(w, 0) for w in words]
    seq_len = len(words)
    # 長さがmax_len以下の場合はパディングする
    if seq_len < max_len + 1:
        words = words + [padding] * (max_len + 1 - seq_len)
    # Tensorに変換して渡す
    return torch.LongTensor(words), seq_len

In [2]:
from torch.utils.data import Dataset


class TranslationPairDataset(Dataset):
    def __init__(self, path, max_len=15):
        # 単語数が多い文章をフィルタリングする関数
        def filter_pair(p):
            return not (len(p[0]) > max_len or len(p[1]) > max_len)
        
        # ファイルを開き、パース/フィルタリングをする
        with open(path) as fp:
            pairs = map(parse_line, fp)
            pairs = filter(filter_pair, pairs)
            pairs = list(pairs)
        # 文章のペアをソースとターゲットに分ける
        src = [p[0] for p in pairs]
        trg = [p[1] for p in pairs]
        # それぞれの語彙集を作成する
        self.src_word_list, self.src_word_dict = build_vocab(itertools.chain.from_iterable(src))
        self.trg_word_list, self.trg_word_dict = build_vocab(itertools.chain.from_iterable(trg))
        # 語彙集を使用してTensorに変換する
        self.src_data = [words2tensor(words, self.src_word_dict, max_len) for words in src]
        self.trg_data = [words2tensor(words, self.trg_word_dict, max_len, -100) for words in trg]
        
    def __len__(self):
        return len(self.src_data)
    
    def __getitem__(self, idx):
        src, lsrc = self.src_data[idx]
        trg, ltrg = self.trg_data[idx]
        return src, lsrc, trg, ltrg

In [3]:
import torch
from torch.utils.data import DataLoader


batch_size = 64
max_len = 10
path = './spa.txt'
ds = TranslationPairDataset(path, max_len=max_len)
loader = DataLoader(ds, batch_size=batch_size, shuffle=True, num_workers=4)

In [4]:
from torch import nn


# encoderは内部状態だけ必要なので出力層は必要ない
class Encoder(nn.Module):
    def __init__(self, num_embeddings, embedding_dim=50, hidden_size=50, num_layers=1, dropout=0.2):
        super().__init__()
        self.emb = nn.Embedding(num_embeddings, embedding_dim, padding_idx=0)
        self.lstm = nn.LSTM(embedding_dim, hidden_size, num_layers, batch_first=True, dropout=dropout)
        
    def forward(self, x, h0=None, l=None):
        x = self.emb(x)
        if l is not None:
            x = nn.utils.rnn.pack_padded_sequence(x, l, batch_first=True)
        _, h = self.lstm(x, h0)
        return h

In [5]:
# decoderは出力層が必要
class Decoder(nn.Module):
    def __init__(self, num_embeddings, embedding_dim=50, hidden_size=50, num_layers=1, dropout=0.2):
        super().__init__()
        self.emb = nn.Embedding(num_embeddings, embedding_dim, padding_idx=0)
        self.lstm = nn.LSTM(embedding_dim, hidden_size, num_layers, batch_first=True, dropout=dropout)
        self.linear = nn.Linear(hidden_size, num_embeddings)
    
    def forward(self, x, h):
        x = self.emb(x)
        # lstmの内部状態の初期値はencoderの最後の内部状態にする
        x, h = self.lstm(x, h)
        x = x.view(-1, self.lstm.hidden_size)
        x = self.linear(x)
        return x, h

In [6]:
def translate(input_str, enc, dec, max_len=15):
    # 入力文字列を数値化してTensorに変換
    words = normalize(input_str).split()
    input_tensor, seq_len = words2tensor(words, ds.src_word_dict, max_len=max_len)
    input_tensor = input_tensor.unsqueeze(0)
    # Encoderで使用するので入力の長さもリストにしておく
    seq_len = [seq_len]
    # 開始トークンを準備
    # 開始トークンはstart of sentence
    sos_inputs = torch.LongTensor([sos]).unsqueeze(1)
    # 入力文字列をEncoderにいれてコンテキストを得る
    ctx = enc(V(input_tensor), l=seq_len)
    # 開始トークンとコンテキストをDecoderの初期値にセット
    z = V(sos_inputs)
    h = ctx
    results = []
    for i in range(max_len):
        # Decoderで次の単語を予測
        o, h = dec(z, h)
        # 線形層の出力が最も大きい場所が次の単語のid
        wi = o.data.max(1)[1].view(1)
        if wi[0] == eos:
            break
        results.append(wi[0])
        # 次の入力は今回の出力のidを使用する
        z = V(wi.view(1, 1))
    # 記録しておいた出力のidを文字列に変換
    return " ".join(ds.trg_word_list[i] for i in results)

In [7]:
from torch.autograd import Variable as V


enc = Encoder(len(ds.src_word_list), 100, 100, 2)
dec = Decoder(len(ds.trg_word_list), 100, 100, 2)
translate('I am a student.', enc, dec)

'violentamente dr rugiendo rugiendo 1955 1955 meticulosa fiar causa causa causa causa causa causa causa'

In [8]:
from torch import optim


enc = Encoder(len(ds.src_word_list), 100, 100, 1, dropout=0.1)
dec = Decoder(len(ds.trg_word_list), 100, 100, 1, dropout=0.1)
opt_enc = optim.Adam(enc.parameters(), 0.002)
opt_dec = optim.Adam(dec.parameters(), 0.01)
loss_f = nn.CrossEntropyLoss()

In [None]:
from statistics import mean


for epoc in range(10):
    # ネットワークを訓練モードにする
    enc.train()
    dec.train()
    losses = []
    for x, lx, y, ly in loader:
        # 開始トークンの準備
        sos_inputs = torch.LongTensor([sos] * len(x)).unsqueeze(1)
        # PackedSequenceを作るために翻訳元の長さで降順にソート
        lx, sort_idx = lx.sort(descending=True)
        x, y = x[sort_idx], y[sort_idx]
        x, y = V(x), V(y)
        loss = 0
        # 翻訳元をEncoderにいれてコンテキストを得る
        ctx = enc(x, l=list(lx))
        # Decoderの初期値をセット
        z = V(sos_inputs)
        h = ctx
        # Decoderで繰り返し予測して損失関数を積み重ねていく
        for i in range(max_len):
            o, h = dec(z, h)
            # 損失関数を計算
            loss += loss_f(o, y[:, i])
            wi = o.data.max(1)[1].unsqueeze(1)
            z = V(wi)
        # バックプロパゲートを実行
        enc.zero_grad()
        dec.zero_grad()
        loss.backward()
        opt_enc.step()
        opt_dec.step()
        losses.append(loss.data[0])
    # データセットに対して一通り計算したら現在の損失関数の値や翻訳結果を表示
    enc.eval()
    dec.eval()
    print('===================================================================================')
    print(epoc, mean(losses))
    print(translate('I am a student.', enc, dec, max_len=max_len))
    print(translate('He likes to eat pizza.', enc, dec, max_len=max_len))
    print(translate('She is my mother.', enc, dec, max_len=max_len))

0 47.569277028376746
estoy un
él no gusta que a
ella está mi mi
1 37.78607394313754
soy estudiante
a gusta gusta gusta gusta
ella está mi mi
2 33.177613713856786
soy un estudiante
le gusta le gusta comer
ella está mi mi
3 30.027399156665158
soy estudiante
a gusta gusta gusta pizza comer
ella está mi madre
