<a href="https://colab.research.google.com/github/ShinAsakawa/ShinAsakawa.github.io/blob/master/2022notebooks/2022_0201Onomatopea_generator.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import sys
import numpy as np
import random

# Install the libraries this notebook needs when it is run on Google Colaboratory.
# NOTE(review): the check below is true on every Linux host, not only on Colab —
# confirm that cloning/installing on a local Linux machine is intended.
import platform
isColab = platform.system() == 'Linux'
if isColab:
    # IPython shell escapes: fetch the ccap repository and install two packages quietly.
    !git clone https://github.com/ShinAsakawa/ccap.git
    !pip install japanize_matplotlib > /dev/null 2>&1
    !pip install jaconv > /dev/null 2>&1


In [None]:
%reload_ext autoreload
%autoreload 2

# Import the onomatopoeia module from the repository cloned above.
from ccap import onomatope

# The Onomatopea instance is referred to as `O` from here on.
O = onomatope.Onomatopea()

# Longer of the orthographic and phonological maximum lengths, plus one.
MAX_LENGTH = max(O.orth_max_length, O.phon_max_length) + 1

In [None]:
#O.draw_phoneme_freq()
#O.draw_grapheme_freq()

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Dataset wrapper class
class onmtpDataset(torch.utils.data.Dataset):
    """Expose an encoder object through the ``torch.utils.data.Dataset`` protocol.

    ``encoder`` must be callable with an index and return a mapping that holds
    the keys ``'orth_ids'`` and ``'phon_ids'``; it must also provide an
    indexable ``vocab`` attribute and support ``len()``.
    """

    def __init__(self, encoder):
        self.encoder = encoder

    def __getitem__(self, idx):
        """Return ``(orth_ids, phon_ids, vocab_entry)`` for item ``idx``."""
        # Encode once and reuse the result instead of invoking the encoder
        # twice for the same index.
        encoded = self.encoder(idx)
        return encoded['orth_ids'], encoded['phon_ids'], self.encoder.vocab[idx]

    def __len__(self):
        """Return the number of items reported by the wrapped encoder."""
        return len(self.encoder)


class EncoderRNN(nn.Module):
    """GRU-based encoder that consumes one token id per call."""

    def __init__(self, input_size, hidden_size):
        """Build the embedding table and the GRU.

        Args:
            input_size: number of entries in the embedding table.
            hidden_size: embedding dimension and GRU hidden-state dimension.
        """
        super().__init__()
        self.hidden_size = hidden_size

        self.embedding = nn.Embedding(input_size, hidden_size)
        self.gru = nn.GRU(hidden_size, hidden_size)

    def forward(self, input, hidden):
        """Advance the GRU by one step.

        Args:
            input: tensor holding a single token id.
            hidden: previous hidden state, shape ``(1, 1, hidden_size)``.

        Returns:
            ``(output, hidden)`` as returned by the GRU for this step.
        """
        # Reshape the embedding to (1, 1, hidden_size) before feeding the GRU.
        embedded = self.embedding(input).view(1, 1, -1)
        output, hidden = self.gru(embedded, hidden)
        return output, hidden

    def initHidden(self):
        """Return an all-zero hidden state of shape ``(1, 1, hidden_size)``.

        The tensor is allocated on the device that holds this module's
        weights, so it always matches the module and does not depend on a
        module-level ``device`` variable.
        """
        return torch.zeros(1, 1, self.hidden_size,
                           device=self.embedding.weight.device)


class AttnDecoderRNN(nn.Module):
    """GRU decoder with attention over a fixed number of encoder positions."""

    def __init__(self, hidden_size, output_size, dropout_p=0.1, max_length=MAX_LENGTH):
        """Build the decoder layers.

        Args:
            hidden_size: embedding dimension and GRU hidden-state dimension.
            output_size: number of output symbols (embedding rows and width
                of the final linear layer).
            dropout_p: dropout probability applied to the input embedding.
            max_length: number of encoder positions the attention layer
                produces weights for.
        """
        super().__init__()
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.dropout_p = dropout_p
        self.max_length = max_length

        self.embedding = nn.Embedding(self.output_size, self.hidden_size)
        self.attn = nn.Linear(self.hidden_size * 2, self.max_length)
        self.attn_combine = nn.Linear(self.hidden_size * 2, self.hidden_size)
        self.dropout = nn.Dropout(self.dropout_p)
        self.gru = nn.GRU(self.hidden_size, self.hidden_size)
        self.out = nn.Linear(self.hidden_size, self.output_size)

    def forward(self, input, hidden, encoder_outputs):
        """Decode one step.

        Args:
            input: tensor holding a single token id (the previous symbol).
            hidden: previous hidden state, shape ``(1, 1, hidden_size)``.
            encoder_outputs: encoder states, shape
                ``(max_length, hidden_size)``.

        Returns:
            ``(log_probs, hidden, attn_weights)`` where ``log_probs`` has
            shape ``(1, output_size)`` and ``attn_weights`` has shape
            ``(1, max_length)``.
        """
        embedded = self.embedding(input).view(1, 1, -1)
        embedded = self.dropout(embedded)

        # Attention weights are computed from the current input embedding and
        # the previous hidden state, one weight per encoder position.
        attn_weights = F.softmax(
            self.attn(torch.cat((embedded[0], hidden[0]), 1)), dim=1)
        # Weighted sum of the encoder states.
        attn_applied = torch.bmm(attn_weights.unsqueeze(0),
                                 encoder_outputs.unsqueeze(0))

        # Merge the embedding with the attended context into the GRU input.
        output = torch.cat((embedded[0], attn_applied[0]), 1)
        output = self.attn_combine(output).unsqueeze(0)

        output = F.relu(output)
        output, hidden = self.gru(output, hidden)

        output = F.log_softmax(self.out(output[0]), dim=1)
        return output, hidden, attn_weights

    def initHidden(self):
        """Return an all-zero hidden state of shape ``(1, 1, hidden_size)``.

        The tensor is allocated on the device that holds this module's
        weights, so it does not depend on a module-level ``device`` variable.
        """
        return torch.zeros(1, 1, self.hidden_size,
                           device=self.embedding.weight.device)

In [None]:
def tensorFromIds(sentence_ids):
    """Turn a sequence of integer ids into a long tensor of shape (N, 1) on ``device``."""
    ids = torch.tensor(sentence_ids, dtype=torch.long, device=device)
    return ids.view(-1, 1)

teacher_forcing_ratio = 0.5  # Teacher-forcing ratio. Some papers gradually decay this value during training.

def train(input_tensor, target_tensor, encoder, decoder, encoder_optimizer, decoder_optimizer, criterion, max_length=MAX_LENGTH):
    """Run one optimisation step on a single (input, target) pair.

    Args:
        input_tensor: input token ids, one id per row.
        target_tensor: target token ids, one id per row.
        encoder: encoder module providing ``initHidden()`` and ``hidden_size``.
        decoder: decoder module called as
            ``decoder(input, hidden, encoder_outputs)``.
        encoder_optimizer: optimizer for the encoder parameters.
        decoder_optimizer: optimizer for the decoder parameters.
        criterion: loss applied to each decoder output and its target id.
        max_length: number of rows of the encoder-output buffer.

    Returns:
        The summed loss of this step divided by the target length (the full
        target length is used even when decoding stopped early).
    """
    encoder_hidden = encoder.initHidden()  # fresh encoder hidden state
    encoder_optimizer.zero_grad()          # clear gradients from the previous step
    decoder_optimizer.zero_grad()

    input_length = input_tensor.size(0)
    target_length = target_tensor.size(0)
    # Rows beyond input_length stay zero.
    encoder_outputs = torch.zeros(max_length, encoder.hidden_size, device=device)
    loss = 0

    for ei in range(input_length):
        encoder_output, encoder_hidden = encoder(
            input_tensor[ei], encoder_hidden)
        encoder_outputs[ei] = encoder_output[0, 0]

    # The decoder works on the phonology vocabulary, so the start token must be
    # looked up there (as evaluate() does), not in the orthography vocabulary.
    decoder_input = torch.tensor([[O.phonology.index('<SOW>')]], device=device)
    decoder_hidden = encoder_hidden
    eow_id = O.phonology.index('<EOW>')

    # Decide once per call whether the ground truth or the model's own
    # prediction is fed back as the next decoder input.
    use_teacher_forcing = random.random() < teacher_forcing_ratio

    for di in range(target_length):
        decoder_output, decoder_hidden, _ = decoder(
            decoder_input, decoder_hidden, encoder_outputs)
        loss += criterion(decoder_output, target_tensor[di])

        if use_teacher_forcing:
            # Teacher forcing: feed the target as the next input.
            decoder_input = target_tensor[di]
        else:
            # Feed the model's own best guess, detached from the graph.
            _, topi = decoder_output.topk(1)
            decoder_input = topi.squeeze().detach()
            if decoder_input.item() == eow_id:
                break

    loss.backward()
    encoder_optimizer.step()
    decoder_optimizer.step()

    return loss.item() / target_length

In [None]:
import time
import math

def asMinutes(s):
    """Format a duration given in seconds as minutes and seconds for display.

    Args:
        s: duration in seconds.

    Returns:
        A string with the whole minutes and the remaining whole seconds, each
        right-aligned to a width of two.
    """
    m = math.floor(s / 60)
    s -= m * 60
    return f'{int(m):2d}分 {int(s):2d}秒'


def timeSince(since, percent):
    """Describe the elapsed time and the estimated remaining time.

    Args:
        since: start time as returned by ``time.time()``.
        percent: fraction of the whole job completed so far.

    Returns:
        A string containing the elapsed time and the estimated remaining time.
    """
    elapsed = time.time() - since
    # Projected total duration minus what has already been spent.
    remaining = elapsed / percent - elapsed
    return f'経過時間:{asMinutes(elapsed)} (残り時間 {asMinutes(remaining)})'

In [None]:
import matplotlib.pyplot as plt
import japanize_matplotlib
#backend が 'agg' だと動作しない場合があるのでコメントアウト
#plt.switch_backend('agg')
import matplotlib.ticker as ticker
import numpy as np

def showPlot(points):
    """Plot ``points`` as a line chart with y-axis ticks every 0.2."""
    # plt.subplots() already creates the figure; an additional plt.figure()
    # call would leave a second, empty figure behind.
    fig, ax = plt.subplots()
    # This locator puts ticks at regular intervals.
    ax.yaxis.set_major_locator(ticker.MultipleLocator(base=0.2))
    ax.plot(points)

In [None]:
def fit(encoder, decoder, epochs=20, lr=0.01):
    """Train the encoder/decoder pair on every word in ``O.vocab``.

    Each epoch visits the vocabulary in a new random order, prints the mean
    loss with timing information, and the per-epoch losses are plotted at the
    end.

    Args:
        encoder: encoder module.
        decoder: decoder module.
        epochs: number of passes over the vocabulary.
        lr: learning rate of the two SGD optimizers.
    """
    start_time = time.time()

    encoder.train()
    decoder.train()

    encoder_optimizer = optim.SGD(encoder.parameters(), lr=lr)
    decoder_optimizer = optim.SGD(decoder.parameters(), lr=lr)
    criterion = nn.NLLLoss()
    losses = []

    for epoch in range(epochs):
        epoch_loss = 0

        # Shuffle the training order every epoch.
        for x in np.random.permutation(len(O.vocab)):
            inputs = O.tokenize(O.vocab[x])  # tokenize one onomatopoeia
            input_tensor = tensorFromIds(inputs['input_ids'])
            target_tensor = tensorFromIds(inputs['teach_ids'])

            # One training step on this word.
            epoch_loss += train(input_tensor, target_tensor,
                                encoder, decoder,
                                encoder_optimizer, decoder_optimizer,
                                criterion)

        mean_loss = epoch_loss / len(O.vocab)
        losses.append(mean_loss)
        # Fraction of the whole job finished after this epoch.
        progress = (epoch+1) * len(O.vocab)/(epochs * len(O.vocab))
        print(f'エポック:{epoch:2d} 損失:{mean_loss:.2f} {timeSince(start_time, progress)}')

    showPlot(losses)

In [None]:
%%time
# Dimension shared by the embeddings and GRU hidden states of both networks.
hidden_size = 256
# The encoder is sized by the orthography vocabulary, the decoder by the phonology vocabulary.
encoder1 = EncoderRNN(len(O.orthography), hidden_size).to(device)
attn_decoder1 = AttnDecoderRNN(hidden_size, len(O.phonology), dropout_p=0.1).to(device)

# Train both networks for 10 epochs.
fit(encoder1, attn_decoder1, epochs=10)

In [None]:
def evaluate(encoder, decoder, input_ids, max_length=MAX_LENGTH):
    """Greedily decode the output sequence for ``input_ids``.

    Both modules are switched to evaluation mode while decoding (the decoder
    in this file contains a dropout layer that would otherwise stay active
    after ``fit()``), and their previous modes are restored afterwards.

    Args:
        encoder: encoder module.
        decoder: attention decoder module.
        input_ids: sequence of input token ids.
        max_length: size of the encoder-output buffer and upper bound on the
            number of decoding steps.

    Returns:
        ``(decoded_words, attentions)``: the decoded symbols (ending with
        ``'<EOW>'`` when it was produced) and one row of attention weights
        per decoding step.
    """
    # Remember the current modes so they can be restored on exit.
    encoder_was_training = encoder.training
    decoder_was_training = decoder.training
    encoder.eval()
    decoder.eval()
    try:
        with torch.no_grad():
            input_tensor = tensorFromIds(input_ids)
            input_length = input_tensor.size()[0]
            encoder_hidden = encoder.initHidden()

            encoder_outputs = torch.zeros(max_length, encoder.hidden_size, device=device)

            for ei in range(input_length):
                encoder_output, encoder_hidden = encoder(input_tensor[ei],
                                                         encoder_hidden)
                encoder_outputs[ei] = encoder_output[0, 0]

            decoder_input = torch.tensor([[O.phonology.index('<SOW>')]], device=device)
            decoder_hidden = encoder_hidden
            eow_id = O.phonology.index('<EOW>')

            decoded_words = []
            decoder_attentions = torch.zeros(max_length, max_length)

            for di in range(max_length):
                decoder_output, decoder_hidden, decoder_attention = decoder(
                    decoder_input, decoder_hidden, encoder_outputs)
                decoder_attentions[di] = decoder_attention
                _, topi = decoder_output.topk(1)
                if topi.item() == eow_id:
                    decoded_words.append('<EOW>')
                    break
                decoded_words.append(O.phonology[topi.item()])

                # Feed the predicted symbol back in as the next input.
                decoder_input = topi.squeeze().detach()

            # Keep only the attention rows of the steps that were executed.
            return decoded_words, decoder_attentions[:di + 1]
    finally:
        encoder.train(encoder_was_training)
        decoder.train(decoder_was_training)

In [None]:
def evaluateRandomly(encoder, decoder, n=5):
    """Print input, target and model output for ``n`` randomly drawn words.

    Args:
        encoder: encoder module.
        decoder: attention decoder module.
        n: number of words to sample from ``O.vocab`` (with replacement).
    """
    for x in np.random.randint(len(O.vocab), size=n):
        word = O.vocab[x]
        tokens = O.tokenize(word)
        input_ids, teach_ids = tokens['input_ids'], tokens['teach_ids']
        print(f'入力: {input_ids} :{word}')
        print(f'正解: {teach_ids}')
        output_words, _ = evaluate(encoder, decoder, input_ids)
        # evaluate() draws its symbols from O.phonology, so map them back to
        # ids through the same vocabulary.
        output_ids = ", ".join(str(O.phonology.index(c)) for c in output_words)
        print(f'出力:  {output_ids}',
              f'{output_words}')
        print()

In [None]:
# Print the trained model's output for 10 randomly drawn words.
evaluateRandomly(encoder1, attn_decoder1, n=10)