In [1]:
import numpy as np
import common.time_layers as TL


class Encoder:
    def __init__(self, vocab_size: int, wordvec_size: int, hidden_size: int) -> None:
        V, D, H = vocab_size, wordvec_size, hidden_size
        normal = np.random.normal
        mu = 0
        embed_W = normal(mu, 1/100, (V, D))
        lstm_Wx = normal(mu, 1/np.sqrt(D), (D, 4*H))
        lstm_Wh = normal(mu, 1/np.sqrt(H), (H, 4*H))
        lstm_b = np.zeros(4*H, dtype=np.float_)

        self.embed = TL.TimeEmbedding(embed_W)
        self.lstm = TL.TimeLSTM(lstm_Wx, lstm_Wh, lstm_b, stateful=False)

        self.params = self.embed.params
        self.params.extend(self.lstm)
        self.grads = self.embed.grads
        self.grads.extend(self.lstm.grads)
        self.hs = None

    def forward(self, xs):
        xs = self.embed.forward(xs)
        hs = self.lstm.forward(xs)
        self.hs = hs
        return hs[:, -1, :]

    def backward(self, dh):
        dhs = np.zeros_like(self.hs)
        dhs[:, -1, :] = dh
        dout = self.lstm.backward(dhs)
        dout = self.embed.backward(dout)
        return dout


In [2]:
class Decoder:
    def __init__(self, vocab_size, wordvec_size, hiddne_size) -> None:
        V, H, D = vocab_size, wordvec_size, hiddne_size
        normal = np.random.normal
        mu = 0
        embed_W = normal(mu, 1/100, (V, D))
        lstm_Wx = normal(mu, 1/np.sqrt(D), (D, 4*H))
        lstm_Wh = normal(mu, 1/np.sqrt(H), (H, 4*H))
        lstm_b = np.zeros(4*H, dtype=np.float_)
        affine_W = normal(mu, 1/H**0.5, (H, V))
        affine_b = np.zeros(V, dtype=np.float_)

        self.embed = TL.TimeEmbedding(embed_W)
        self.lstm = TL.TimeLSTM(lstm_Wx, lstm_Wh, lstm_b, stateful=True)
        self.affine = TL.TimeAffine(affine_W, affine_b)

        self.params, self.grads = [], []

        for layer in (self.embed, self.lstm, self.affine):
            self.params .extend(layer.params)
            self.grads.extend(layer.grads)

    def forward(self, xs, h):
        self.lstm.set_state(h)

        out = self.embed.forward(xs)
        out = self.lstm.forward(out)
        score = self.affine.forward(out)
        return score

    def backward(self, dscore):
        dout = self.affine.backward(dscore)
        dout = self.lstm.backward(dout)
        dout = self.embed.backward(dout)
        dh = self.lstm.dh
        return dh

    def generate(self, h, start_id, sample_size):
        sampled = []
        sample_id = start_id
        self.lstm.set_state(h)

        for _ in range(sample_size):
            x = np.array(sample_id).reshape((1, 1))
            out = self.embed.forward(x)
            out = self.lstm.forward(out)
            score = self.affine.forward(out)

            sample_id = np.argmax(score.flatten())
            sampled.append(int(sample_id))

        return sampled


In [3]:
from common.base_model import BaseModel
class Seq2seq(BaseModel):
    def __init__(self, vocab_size, wordvec_size, hidden_size):
        V, D, H = vocab_size, wordvec_size, hidden_size
        self.encoder = Encoder(V, D, H)
        self.decoder = Decoder(V, D, H)
        self.softmax = TL.TimeSoftmaxWithLoss()

        self.params = self.encoder.params + self.decoder.params
        self.grads = self.encoder.grads + self.decoder.grads

    def forward(self, xs, ts):
        decoder_xs, decoder_ts = ts[:, :-1], ts[:, 1:]

        h = self.encoder.forward(xs)
        score = self.decoder.forward(decoder_xs, h)
        loss = self.softmax.forward(score, decoder_ts)
        return loss

    def backward(self, dout=1):
        dout = self.softmax.backward(dout)
        dh = self.decoder.backward(dout)
        dout = self.encoder.backward(dh)
        return dout

    def generate(self, xs, start_id, sample_size):
        h = self.encoder.forward(xs)
        sampled = self.decoder.generate(h, start_id, sample_size)
        return sampled

In [5]:
import numpy as np
import matplotlib.pyplot as plt
from dataset import sequence
from common.optimizers import Adam
from common.trainer import Trainer
from common.util import eval_seq2seq


# データセットの読み込み
(x_train, t_train), (x_test, t_test) = sequence.load_data('addition.txt')
char_to_id, id_to_char = sequence.get_vocab()

# Reverse input? =================================================
is_reverse = False  # True
if is_reverse:
    x_train, x_test = x_train[:, ::-1], x_test[:, ::-1]
# ================================================================

# ハイパーパラメータの設定
vocab_size = len(char_to_id)
wordvec_size = 16
hidden_size = 128
batch_size = 128
max_epoch = 25
max_grad = 5.0

# Normal or Peeky? ==============================================
model = Seq2seq(vocab_size, wordvec_size, hidden_size)
# model = PeekySeq2seq(vocab_size, wordvec_size, hidden_size)
# ================================================================
optimizer = Adam()
trainer = Trainer(model, optimizer)

acc_list = []
for epoch in range(max_epoch):
    trainer.fit(x_train, t_train, max_epoch=1,
                batch_size=batch_size, max_grad=max_grad)

    correct_num = 0
    for i in range(len(x_test)):
        question, correct = x_test[[i]], t_test[[i]]
        verbose = i < 10
        correct_num += eval_seq2seq(model, question, correct,
                                    id_to_char, verbose, is_reverse)

    acc = float(correct_num) / len(x_test)
    acc_list.append(acc)
    print('val acc %.3f%%' % (acc * 100))

# グラフの描画
x = np.arange(len(acc_list))
plt.plot(x, acc_list, marker='o')
plt.xlabel('epochs')
plt.ylabel('accuracy')
plt.ylim(0, 1.0)
plt.show()

No file: ./dataset/addition.txt


TypeError: cannot unpack non-iterable NoneType object