In [10]:
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()

class QRNN():

    def __init__(self, in_size, size, conv_size=2):
        self.kernel = None
        self.batch_size = -1
        self.conv_size = conv_size
        self.c = None
        self.h = None
        self._x = None
        self.kernel = QRNNConvolution(in_size, size, conv_size)

    def _step(self, f, z, o):
        with tf.variable_scope("fo-Pool"):
            # f,z,o is batch_size x size
            self.c = tf.multiply(f, self.c) + tf.multiply(1 - f, z)
            self.h = tf.multiply(o, self.c)  # h is size vector
        return self.h

    def forward(self, x):
        length = lambda mx: int(mx.get_shape()[0])

        with tf.variable_scope("QRNN/Forward"):
            if self.c is None:
                # init context cell
                self.c = tf.zeros([length(x), self.kernel.size], dtype=tf.float32)

            c_f, c_z, c_o = self.kernel.conv(x)
            for i in range(length(c_f)):
                f, z, o = c_f[i], c_z[i], c_o[i]
                f = tf.sigmoid(f)
                z = tf.tanh(z)
                o = tf.sigmoid(o)
                self._step(f, z, o)

        return self.h

class QRNNConvolution():

    def __init__(self, in_size, size, conv_size):
        self.in_size = in_size
        self.size = size
        self.conv_size = conv_size
        self._weight_size = self.size * 3  # z, f, o

        with tf.variable_scope("QRNN/Variable/Convolution"):
            initializer = tf.random_normal_initializer()
            self.conv_filter = tf.get_variable("conv_filter", [conv_size, in_size, self._weight_size], initializer=initializer)

    def conv(self, x):
        # !! x is batch_size x sentence_length x word_length(=channel) !!
        _weighted = tf.nn.conv1d(x, self.conv_filter, stride=1, padding="SAME", data_format="NWC")

        # _weighted is batch_size x conved_size x output_channel
        _w = tf.transpose(_weighted, [1, 0, 2])  # conved_size x  batch_size x output_channel
        _ws = tf.split(_w, num_or_size_splits=3, axis=2) # make 3(f, z, o) conved_size x  batch_size x size
        return _ws

In [13]:
import os
import unittest
import time
import functools
import numpy as np
from sklearn.datasets import load_digits
from sklearn.metrics import accuracy_score

def measure_time(func):
    @functools.wraps(func)
    def _measure_time(*args, **kwargs):
        start = time.time()
        func(*args, **kwargs)
        elapse = time.time() - start
        print("takes {} seconds.".format(elapse))
    return _measure_time

class TestQRNNWork(unittest.TestCase):

    #@measure_time
    def test_qrnn(self):
        print("QRNN Working check")
        with tf.Graph().as_default() as qrnn:
            self.experiment(qrnn, qrnn=5)


    def experiment(self, graph, qrnn=-1, baseline=False, random=False):
        digits = load_digits()
        horizon, vertical, n_class = (8, 8, 10)  # 8 x 8 image, 0~9 number(=10 class)
        size = 128  # state vector size
        batch_size = 128
        images = digits.images / np.max(digits.images)  # simple normalization
        target = np.array([[1 if t == i else 0 for i in range(n_class)] for t in digits.target])  # to 1 hot vector
        learning_rate = 0.001
        train_iter = 1000

        with tf.name_scope("placeholder"):
            X = tf.placeholder(tf.float32, [batch_size, vertical, horizon])
            y = tf.placeholder(tf.float32, [batch_size, n_class])

        pred = self.qrnn_forward(X, size, n_class, batch_size, conv_size=qrnn)
        
        with tf.name_scope("optimization"):
            loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits_v2(logits=pred, labels=y))
            optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(loss)

        with tf.name_scope("evaluation"):
            correct_pred = tf.equal(tf.argmax(pred, 1), tf.argmax(y, 1))
            accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))

        with tf.Session() as sess:
            sess.run(tf.global_variables_initializer())
            for i in range(train_iter):
                indices = np.random.randint(len(digits.target) - batch_size, size=batch_size)
                _X = images[indices]
                _y = target[indices]
                sess.run(optimizer, feed_dict={X: _X, y: _y})

                if i % 100 == 0:
                    _loss, _accuracy = sess.run([loss, accuracy], feed_dict={X: _X, y: _y})
                    print("Iter {}: loss={}, accuracy={}".format(i, _loss, _accuracy))

            with tf.name_scope("test-evaluation"):
                acc = sess.run(accuracy, feed_dict={X: images[-batch_size:], y: target[-batch_size:]})
                print("Testset Accuracy={}".format(acc))


    def qrnn_forward(self, X, size, n_class, batch_size, conv_size):
        in_size = int(X.get_shape()[2])

        qrnn = QRNN(in_size=in_size, size=size, conv_size=conv_size)
        hidden = qrnn.forward(X)

        with tf.name_scope("QRNN-Classifier"):
            W = tf.Variable(tf.random_normal([size, n_class]), name="W")
            b = tf.Variable(tf.random_normal([n_class]), name="b")
            output = tf.add(tf.matmul(hidden, W), b)

        return output

if __name__ == "__main__":
    unittest.main(argv=['first-arg-is-ignored'], exit=False)