In [None]:
# Unzip the notMNIST data folder. The archive is read from the absolute path
# /content/notMNIST_small.zip and its contents are extracted into the current
# working directory (extractall() with no target path).
from zipfile import ZipFile

# The context manager closes the archive handle once extraction is done, and the
# handle gets a descriptive name so the built-in `zip` is not shadowed for the
# rest of the notebook.
with ZipFile('/content/notMNIST_small.zip') as archive:
    archive.extractall()

In [None]:
import numpy as np
from sklearn.model_selection import train_test_split
from PIL import Image
import os

import tensorflow as tf
#from tensorflow.python.keras.datasets import mnist
#from tensorflow.examples.tutorials.mnist import input_data
from tensorflow.contrib.eager.python import tfe
import matplotlib.pyplot as plt
import time

In [None]:
######################################### notMNIST Data ###############################################
class notMNIST:
    """Loads the notMNIST_small letter images (A-J) and splits them into train/test sets.

    Images are read from 'notMNIST_small/<letter>/' relative to the working directory,
    converted to greyscale, flattened and scaled to [0, 1]. Labels are one-hot integer
    vectors of length 10 (index 0 = 'A', ..., index 9 = 'J').

    Attributes:
        train: object with `images`, `labels` (lists) and `next_batch(num)`.
        test: object with `images` and `labels` (lists).
    """

    def __init__(self):
        images, labels = [], []

        for i, letter in enumerate(['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J']):
            directory = 'notMNIST_small/%s/' % letter
            # os.listdir returns entries in arbitrary order; sorting keeps the seeded
            # train/test split below identical across machines and runs.
            files = sorted(os.listdir(directory))
            label = np.array([0]*10)
            label[i] = 1
            for file in files:
                try:
                    # Image.open is lazy, so the pixel decode is kept inside the try as
                    # well: a truncated file fails on decode, not on open. The context
                    # manager releases the file handle for every image.
                    with Image.open(directory+file) as im:
                        pixels = np.array(im.convert('L').getdata())
                except Exception:
                    # Best-effort skip of unreadable/corrupted files. `Exception` (rather
                    # than a bare except) lets KeyboardInterrupt/SystemExit propagate.
                    continue
                images.append(pixels/255.0)
                labels.append(label)

        train_images, test_images, train_labels, test_labels = train_test_split(
            images, labels, test_size=0.2, random_state=0)

        class train:
            """Training split with a simple cyclic mini-batch cursor."""

            def __init__(self):
                self.images = []
                self.labels = []
                self.batch_counter = 0  # index of the first sample of the next batch

            def next_batch(self, num):
                """Return the next `num` samples as (images, labels) lists.

                When the request reaches the end of the data, the batch is completed
                with samples from the start and the cursor wraps around.
                """
                if self.batch_counter + num >= len(self.labels):
                    # list(...) makes an explicit copy so extend() never touches the
                    # stored data.
                    batch_images = list(self.images[self.batch_counter:])
                    batch_labels = list(self.labels[self.batch_counter:])
                    left = num - len(batch_labels)
                    batch_images.extend(self.images[:left])
                    batch_labels.extend(self.labels[:left])
                    self.batch_counter = left
                else:
                    batch_images = self.images[self.batch_counter:self.batch_counter+num]
                    batch_labels = self.labels[self.batch_counter:self.batch_counter+num]
                    self.batch_counter += num

                return (batch_images, batch_labels)

        class test:
            """Held-out split; plain container for images and labels."""

            def __init__(self):
                self.images = []
                self.labels = []

        self.train = train()
        self.test = test()

        self.train.images = train_images
        self.train.labels = train_labels
        self.test.images = test_images
        self.test.labels = test_labels

In [None]:
"""
Routine to create RNN cells in TensorFlow 1.x using eager execution.
Code adapted from Google AI Language Team
"""
# Switch TensorFlow to eager execution before any other TensorFlow op is created in
# this notebook. NOTE(review): `tf.enable_eager_execution` / `tf.set_random_seed` are
# TensorFlow 1.x top-level names - confirm the TensorFlow version this notebook targets.
tf.enable_eager_execution()
# Fix the TensorFlow and NumPy global seeds (both 0) so repeated runs are comparable.
tf.set_random_seed(0)
np.random.seed(0)

In [None]:
######################################### LSTM Model ###############################################
class BasicLSTM(tf.keras.Model):
    """LSTM layer unrolled step by step in eager mode.

    Args:
        units: size of the hidden and cell state.
        return_sequence: if True, return the hidden output of every time step
            instead of only the last one.
        return_states: if True, additionally return the states (see `call`).
    """

    def __init__(self, units, return_sequence=False, return_states=False, **kwargs):
        super(BasicLSTM, self).__init__(**kwargs)
        self.units = units
        self.return_sequence = return_sequence
        self.return_states = return_states

        def bias_initializer(_, *args, **kwargs):
            # Unit forget bias from the paper
            # - [Learning to forget: Continual prediction with LSTM](http://www.mitpressjournals.org/doi/pdf/10.1162/089976600300015015)
            # The requested shape (first argument) is ignored; the bias layout is
            # fixed to [input | forget | context + output] = 4 * units entries.
            zeros = tf.keras.initializers.Zeros()
            ones = tf.keras.initializers.Ones()
            pieces = [
                zeros((self.units,), *args, **kwargs),      # input gate
                ones((self.units,), *args, **kwargs),       # forget gate
                zeros((self.units * 2,), *args, **kwargs),  # context and output gates
            ]
            return tf.keras.backend.concatenate(pieces)

        # Input projection (no bias) and recurrent projection (carries the gate bias).
        self.kernel = tf.keras.layers.Dense(4 * units, use_bias=False)
        self.recurrent_kernel = tf.keras.layers.Dense(
            4 * units,
            kernel_initializer='glorot_uniform',
            bias_initializer=bias_initializer,
        )

    def call(self, inputs, training=None, mask=None, initial_states=None):
        """Run the LSTM over axis 1 of `inputs`.

        `inputs` is indexed as inputs[:, t, :]. `initial_states`, when given, must be
        a pair (h, c); otherwise both states start at zero.

        Returns, depending on the constructor flags:
            - last hidden output (default)
            - all hidden outputs stacked on axis 1 (return_sequence)
            - (last hidden output, [h, c]) (return_states)
            - (all hidden outputs, [all hidden outputs, all cell states]) (both)
        """
        if initial_states is not None:
            assert len(initial_states) == 2, "Must pass a list of 2 states when passing 'initial_states'"
            h_state, c_state = initial_states
        else:
            batch = inputs.shape[0]
            h_state = tf.zeros((batch, self.units))
            c_state = tf.zeros((batch, self.units))

        n = self.units
        step_outputs, step_cells = [], []

        for t in range(inputs.shape[1]):
            # Pre-activations of all four gates: input projection + recurrent projection.
            gates = self.kernel(inputs[:, t, :]) + self.recurrent_kernel(h_state)

            in_gate = tf.keras.activations.sigmoid(gates[:, :n])
            forget_gate = tf.keras.activations.sigmoid(gates[:, n: 2 * n])
            candidate = tf.nn.tanh(gates[:, 2 * n: 3 * n])
            out_gate = tf.keras.activations.sigmoid(gates[:, 3 * n:])

            # Cell state first, then the hidden output derived from it.
            c_state = forget_gate * c_state + in_gate * candidate
            h_state = out_gate * tf.nn.tanh(c_state)

            step_outputs.append(h_state)
            step_cells.append(c_state)

        hidden_outputs = tf.stack(step_outputs, axis=1)
        hidden_states = tf.stack(step_cells, axis=1)

        if self.return_sequence:
            if self.return_states:
                return hidden_outputs, [hidden_outputs, hidden_states]
            return hidden_outputs

        last_output = hidden_outputs[:, -1, :]
        if self.return_states:
            return last_output, [h_state, c_state]
        return last_output

class LSTM(tf.keras.Model):
    """Sequence classifier: BasicLSTM over the input followed by a dense layer.

    Args:
        num_units: hidden size of the LSTM.
        num_classes: number of output classes.
    """

    def __init__(self, num_units, num_classes):
        super(LSTM, self).__init__()
        self.units = num_units
        self.LSTM = BasicLSTM(num_units)
        self.classifier = tf.keras.layers.Dense(num_classes)

    def call(self, inputs, training = None, mask = None):
        """Return softmax class probabilities (not raw logits) for `inputs`."""
        scores = self.classifier(self.LSTM(inputs))

        # The softmax is pinned to the CPU device.
        with tf.device('/cpu:0'):
            return tf.nn.softmax(scores)

In [None]:
######################################### GRU Model ###############################################
class BasicGRU(tf.keras.Model):
    """GRU layer unrolled step by step in eager mode.

    Per time step, with x the input slice and h the previous hidden state:
        zt = sigmoid(W_z x + U_z h + b_z)           (update gate)
        r  = sigmoid(W_r x + U_r h + b_r)           (reset gate)
        s~ = tanh(W_s x + U_s (h * r) + b_s)        (candidate state)
        h' = (1 - zt) * h + zt * s~

    Args:
        units: size of the hidden state.
        return_sequence: stored for signature parity with BasicLSTM.
        return_states: stored for signature parity with BasicLSTM.

    NOTE: `call` always returns only the last hidden state; `return_sequence` and
    `return_states` are currently not honoured.
    """

    def __init__(self, units, return_sequence=False, return_states=False, **kwargs):
        super(BasicGRU, self).__init__(**kwargs)
        self.units = units
        self.return_sequence = return_sequence
        self.return_states = return_states

        def bias_initializer(_, *args, **kwargs):
            # Bias layout of the 4 * units recurrent projection (requested shape ignored).
            return tf.keras.backend.concatenate([
                tf.keras.initializers.Zeros()((self.units,), *args, **kwargs),  # update gate
                tf.keras.initializers.Ones()((self.units,), *args, **kwargs),  # reset gate
                tf.keras.initializers.Zeros()((self.units * 2,), *args, **kwargs),  # candidate + unused slice
            ])

        # Both projections are 4 * units wide, but only the first three slices are used
        # by `call`; the width is kept so the variable shapes stay unchanged.
        self.kernel = tf.keras.layers.Dense(4 * units, use_bias=False)
        self.recurrent_kernel = tf.keras.layers.Dense(4 * units, kernel_initializer='glorot_uniform', bias_initializer=bias_initializer)

    def call(self, inputs, training=None, mask=None, initial_states=None):
        """Run the GRU over axis 1 of `inputs` and return the last hidden state.

        Args:
            inputs: tensor indexed as inputs[:, t, :].
            initial_states: optional initial hidden state, either a tensor or a
                one-element list/tuple holding it. Defaults to zeros.
        """
        # A GRU carries a single state, so accept the tensor itself or a 1-element
        # container (the previous 2-state check was inherited from the LSTM).
        if initial_states is None:
            h_state = tf.zeros((inputs.shape[0], self.units))
        elif isinstance(initial_states, (list, tuple)):
            assert len(initial_states) == 1, "Must pass a single hidden state when passing 'initial_states'"
            h_state = initial_states[0]
        else:
            h_state = initial_states

        for t in range(inputs.shape[1]):
            ip = inputs[:, t, :]
            z = self.kernel(ip)

            # Input contributions to update gate, reset gate and candidate.
            z0 = z[:, :self.units]
            z1 = z[:, self.units: 2 * self.units]
            z2 = z[:, 2 * self.units: 3 * self.units]

            # Recurrent contributions to both gates. The reset gate must use the
            # recurrent projection z_1 (adding `z` again would make it ignore h).
            z_1 = self.recurrent_kernel(h_state)
            z0 += z_1[:, :self.units]
            z1 += z_1[:, self.units: 2 * self.units]

            zt = tf.keras.activations.sigmoid(z0)
            r = tf.keras.activations.sigmoid(z1)

            # Candidate state: recurrent projection of the reset-gated hidden state.
            z_2 = self.recurrent_kernel(h_state * r)
            z2 += z_2[:, 2 * self.units: 3 * self.units]

            s_tilde = tf.nn.tanh(z2)
            h_state = (1 - zt) * h_state + (zt * s_tilde)

        # Only the final state is returned, so per-step outputs are not stacked.
        return h_state

class GRU(tf.keras.Model):
    """Sequence classifier: BasicGRU over the input followed by a dense layer.

    Args:
        num_units: hidden size of the GRU.
        num_classes: number of output classes.
    """

    def __init__(self, num_units, num_classes):
        super(GRU, self).__init__()
        self.units = num_units
        self.gru = BasicGRU(num_units)
        self.classifier = tf.keras.layers.Dense(num_classes)

    def call(self, inputs, training = None, mask = None):
        """Return softmax class probabilities (not raw logits) for `inputs`."""
        scores = self.classifier(self.gru(inputs))

        # The softmax is pinned to the CPU device.
        with tf.device('/cpu:0'):
            return tf.nn.softmax(scores)

In [None]:
######################################### MGU Model ###############################################
class BasicMGU(tf.keras.Model):
    """Minimal Gated Unit layer unrolled step by step in eager mode.

    Per time step, with x the input slice and h the previous hidden state:
        f  = sigmoid(W_f x + U_f h + b_f)           (forget gate)
        s~ = tanh(W_s x + U_s (h * f) + b_s)        (candidate state)
        h' = (1 - f) * h + f * s~

    Args:
        num_hidden: size of the hidden state.
        return_sequence: stored for signature parity with BasicLSTM.
        return_states: stored for signature parity with BasicLSTM.

    NOTE: `call` always returns only the last hidden state; `return_sequence` and
    `return_states` are currently not honoured.
    """

    def __init__(self,num_hidden, return_sequence=False, return_states=False, **kwargs):
        super(BasicMGU, self).__init__(**kwargs)
        self.units            = num_hidden
        self.return_sequence  = return_sequence
        self.return_states    = return_states

        def bias_initializer(_, *args, **kwargs):
            # Bias layout of the 2 * units recurrent projection (requested shape ignored).
            return tf.keras.backend.concatenate([
                tf.keras.initializers.Zeros()((self.units,), *args, **kwargs),  # forget gate slice
                tf.keras.initializers.Ones()((self.units,), *args, **kwargs),  # candidate slice
            ])

        # Input projection (no bias) and recurrent projection (carries the bias).
        self.kernel           = tf.keras.layers.Dense(2 * num_hidden, use_bias=False)
        self.recurrent_kernel = tf.keras.layers.Dense(2 * num_hidden, kernel_initializer='glorot_uniform', bias_initializer=bias_initializer)

    def call(self, inputs, training=None, mask=None, initial_states=None):
        """Run the MGU over axis 1 of `inputs` and return the last hidden state.

        Args:
            inputs: tensor indexed as inputs[:, t, :].
            initial_states: optional initial hidden state, either a tensor or a
                one-element list/tuple holding it. Defaults to zeros.
        """
        # An MGU carries a single state, so accept the tensor itself or a 1-element
        # container (the previous 2-state check was inherited from the LSTM).
        if initial_states is None:
            h_state = tf.zeros((inputs.shape[0], self.units))
        elif isinstance(initial_states, (list, tuple)):
            assert len(initial_states) == 1, "Must pass a single hidden state when passing 'initial_states'"
            h_state = initial_states[0]
        else:
            h_state = initial_states

        for t in range(inputs.shape[1]):
            ip = inputs[:, t, :]
            z = self.kernel(ip)
            z0 = z[:, :self.units]
            z1 = z[:, self.units: 2 * self.units]

            # Forget gate from the input and the previous hidden state.
            z_0 = self.recurrent_kernel(h_state)
            z0 += z_0[:, :self.units]
            f = tf.keras.activations.sigmoid(z0)

            # Candidate state from the input and the gated hidden state.
            z_1 = self.recurrent_kernel(h_state * f)
            z1 += z_1[:, self.units: 2 * self.units]
            stilde = tf.nn.tanh(z1)

            h_state = ((1 - f) * h_state) + (f * stilde)

        # Only the final state is returned, so per-step outputs are not stacked.
        return h_state

class MGU(tf.keras.Model):
    """Sequence classifier: BasicMGU over the input followed by a dense layer.

    Args:
        units: hidden size of the MGU.
        num_classes: number of output classes.
    """

    def __init__(self,units,num_classes):
        super(MGU,self).__init__()
        self.units = units
        self.mgu = BasicMGU(units)
        self.classifier = tf.keras.layers.Dense(num_classes)

    def call(self,inputs,training=None,mask=None):
        """Return softmax class probabilities (not raw logits) for `inputs`."""
        scores = self.classifier(self.mgu(inputs))

        # The softmax is pinned to the CPU device.
        with tf.device('/cpu:0'):
            return tf.nn.softmax(scores)

In [None]:
# Parameters
# NOTE(review): num_inputs / num_timesteps are not referenced elsewhere in the visible
# notebook; the image reshapes use the literal 28 instead. Presumably they describe
# the 28x28 images fed to the RNN row by row - confirm.
num_inputs = 28
num_timesteps = 28
# Hidden-state size and class count passed to the model constructor.
num_hiddenunits = 128
num_classes = 10

# Number of passes of the training loop over the training data.
num_epochs = 10
# Mini-batch size used by the training loop.
batch_size = 64
# Learning rate handed to the Adam optimizer.
learning_rate = 0.01

In [None]:
def loss(model,X,Y):
    """Mean categorical cross-entropy of `model` on the batch (X, Y).

    The models in this notebook already apply a softmax in `call`, so their output
    is a probability distribution. It must therefore not be passed to
    `softmax_cross_entropy_with_logits` (which would apply a second softmax);
    the cross-entropy is computed directly from the probabilities instead.

    Args:
        model: callable returning per-class probabilities of shape (batch, classes).
        X: batch of inputs, convertible to a float32 tensor.
        Y: one-hot labels of shape (batch, classes); any numeric dtype.

    Returns:
        Scalar tensor: the batch mean of -sum(Y * log(p)).
    """
    probabilities = model(tf.convert_to_tensor(X, tf.float32))
    labels = tf.cast(tf.convert_to_tensor(Y), tf.float32)
    # Clip away exact zeros so log() stays finite for confidently wrong predictions.
    log_probabilities = tf.log(tf.clip_by_value(probabilities, 1e-7, 1.0))
    per_example = -tf.reduce_sum(labels * log_probabilities, axis=1)
    return tf.reduce_mean(per_example)

In [None]:
def accuracy(y_pred, y_true):
    """Fraction of rows whose arg-max class in `y_pred` matches that of `y_true`.

    A softmax is applied to `y_pred` before taking the arg-max along axis 1;
    `y_true` is expected in one-hot (or score) form with classes on axis 1.
    Returns a scalar float32 tensor.
    """
    predicted_class = tf.argmax(tf.nn.softmax(y_pred), 1)
    true_class = tf.argmax(y_true, 1)
    hits = tf.cast(tf.equal(predicted_class, true_class), tf.float32)
    return tf.reduce_mean(hits)

In [None]:
# Adam optimizer using the learning rate from the parameters cell.
optimizer = tf.train.AdamOptimizer(learning_rate = learning_rate)
# Wrap `loss` so that calling gradients(model, X, Y) yields both the loss value and the
# (gradient, variable) pairs that the training loop hands to optimizer.apply_gradients.
gradients = tfe.implicit_value_and_gradients(loss)

# Pick the recurrent architecture to train; exactly one line should be active.
#model = LSTM(num_hiddenunits, num_classes)
#model = GRU(num_hiddenunits, num_classes)
model = MGU(num_hiddenunits, num_classes)

In [None]:
# Load notMNIST and build the train/test splits.
data = notMNIST()

# Training pipeline: labels are cast to float32, samples are shuffled through a
# 1000-element buffer and grouped into mini-batches. Parentheses replace the
# backslash continuations (the last one ran into a blank line), and the batch size
# comes from the parameters cell instead of a hard-coded 64.
train_ds = (
    tf.data.Dataset.from_tensor_slices((data.train.images, data.train.labels))
    .map(lambda x, y: (x, tf.cast(y, tf.float32)))
    .shuffle(buffer_size=1000)
    .batch(batch_size=batch_size)
)

# Full test set, reshaped from flat pixel vectors to 28x28 images.
X_test = np.array(data.test.images[:]).reshape([-1, 28, 28])
y_test = data.test.labels[:]

# Reference point for the total-time report printed after training.
time_start = time.time()

In [None]:
######################################### Training Procedure ###############################################
# Accuracy histories live outside the epoch loop so the final plot covers the whole
# run rather than only the last epoch.
train_accuracy = []
test_accuracy = []

for epoch in range(num_epochs):
    print("Epoch: {}".format(epoch+1))
    for step, (image_batch, label_batch) in enumerate(tfe.Iterator(train_ds)):
        # Train on the shuffled batch produced by the dataset itself. The -1 in the
        # reshape also covers a smaller final batch; the cast is needed because the
        # stored pixels are float64.
        image_batch = tf.reshape(tf.cast(image_batch, tf.float32), [-1, 28, 28])

        # `loss_value` (not `loss`) so the module-level loss() function is not rebound
        # to a tensor, which would break re-running the optimizer cell.
        loss_value, grads_and_vars = gradients(model, image_batch, label_batch)
        optimizer.apply_gradients(grads_and_vars)

        # Every 50 steps: report the loss and record accuracy on the current training
        # batch and on the full test set.
        if (step%50 == 0):
            print("Step: {} Loss: {:.4f}".format(step, loss_value.numpy()))
            train_predictions = model(image_batch)
            train_accuracy.append(accuracy(train_predictions, label_batch).numpy())
            test_predictions = model(tf.convert_to_tensor(X_test, tf.float32))
            test_accuracy.append(accuracy(test_predictions, y_test).numpy())


plt.plot(train_accuracy, label ='train')
plt.plot(test_accuracy, label = 'test')
plt.ylim(0,1)
plt.xlabel('Evaluation (every 50 steps)')
plt.ylabel('Accuracy')
plt.title('Training vs Test Curve')
plt.legend()
plt.show()

# Loss of the last training step.
print("Loss:{:.4f}".format(loss_value.numpy()))

test_predictions = model(tf.convert_to_tensor(X_test, tf.float32))
print("Test Accuracy :{:.4f}".format(accuracy(test_predictions, y_test).numpy()))

time_taken = time.time() - time_start
print('\nTotal time taken (in seconds): {:.2f}'.format(time_taken))