# Text generator based on RNN

### Import libraries

In [1]:
import tensorflow as tf
import numpy as np
import random

### Configurations & data encoding

In [2]:
vocab = (" $%'()+,-./0123456789:;=?ABCDEFGHIJKLMNOPQRSTUVWXYZ"
            "\\^_abcdefghijklmnopqrstuvwxyz{|}\n")
graphPath=r".\graphs"
testTextPath=r"..\DataSet\arvix_abstracts.txt"
batchSize=1
modelParamPath=r".\model_checkpoints"
def Encode(string):
    tensor=[vocab.find(ch)+1 for ch in string]
    tensor=tf.one_hot(tensor,depth=len(vocab)+1,on_value=1.0,off_value=0.0,axis=-1, dtype=tf.float32)
    with tf.Session() as sess:
        nparray=tensor.eval()
    return nparray
def Decode(nparray):
    return "".join([vocab[index-1] if index>0 else "[INVALID]" for index in np.argmax(nparray,axis=1)])

### Define model class

In [3]:
class MyRNN():
    def __init__(self,inputVectorDim,modelCheckpointPath,stateSize=100,scope="RNN",outputVectorDim=None):
        if outputVectorDim==None:
            outputVectorDim=inputVectorDim
        self.__inputVectorDim=inputVectorDim
        self.__outputVetorDim=outputVectorDim
        self.__stateSize=stateSize
        self.__mdlCkptPath=modelCheckpointPath
        with tf.variable_scope(scope) as modelScope:
            with tf.variable_scope("structure") as structureScope:
                #One batch at a time
                self.inputs=tf.placeholder(dtype=tf.float32,shape=[None,inputVectorDim])
                self.outputs=tf.placeholder(dtype=tf.float32,shape=[None,outputVectorDim])
                self.__cell=tf.contrib.rnn.GRUCell(num_units=stateSize)
                self.initState = tf.placeholder_with_default(self.__cell.zero_state(batch_size=1,dtype=tf.float32),
                                                        shape=[None,self.__stateSize])
                #S20:State to output
                self.__wS2O=tf.get_variable(name="Weight_s2o",shape=[stateSize,outputVectorDim],
                                            initializer=tf.truncated_normal_initializer())
                self.__bS2O=tf.get_variable(name="Bias_s2o",shape=[1,outputVectorDim],
                                            initializer=tf.truncated_normal_initializer())
                #add a "batch dimension" to the front
                cellInputs=tf.reshape(tensor=self.inputs,shape=[1,-1,inputVectorDim])
                self.__stateOutput,self.finalState=tf.nn.dynamic_rnn(cell=self.__cell,inputs=cellInputs,
                                                     dtype=tf.float32,initial_state=self.initState)
                outputs=tf.reshape(tensor=self.__stateOutput, shape=[-1, stateSize])#Remove the batch dimension ((
                self.netOutputs=tf.add(tf.matmul(outputs,self.__wS2O),self.__bS2O)
            with tf.name_scope("training"):
                loss=tf.losses.softmax_cross_entropy(logits=self.netOutputs,onehot_labels=self.outputs)
                loss=tf.reduce_mean(loss,name="loss")
                self.globalStep=tf.Variable(0,dtype=tf.int32,trainable=False,name='globalStep')
                self.optimizer=tf.train.AdamOptimizer(learning_rate=0.001,
                                                      name="optimizer").minimize(loss,global_step=self.globalStep)
            with tf.name_scope("summary") as sumScope:
                lossSum=tf.summary.scalar(tensor=loss,name="loss")
                self.summary_op=tf.summary.merge(tf.get_collection(tf.GraphKeys.SUMMARIES,sumScope))

    def onlineInference(self,timeSteps=100,state=None,seed=None,modelParamPath=None):
        feedDict={}
        saver=tf.train.Saver()
        if modelParamPath==None:
            modelParamPath=self.__mdlCkptPath
        if state!=None:
            feedDict[self.initState]=state
        if seed==None:
            #Create a random seed
            seed=tf.one_hot([random.randint(0,self.__inputVectorDim)],
                            depth=self.__inputVectorDim,on_value=1.0,off_value=0.0,axis=-1,dtype=tf.float32)
            with tf.Session() as sess:
                seed=sess.run(seed)
        feedDict[self.inputs]=seed
        seq=[]
        with tf.Session() as sess:
            ckpt = tf.train.get_checkpoint_state(self.__mdlCkptPath)
            print("Loading model")
            if ckpt and ckpt.model_checkpoint_path:  
                saver.restore(sess,ckpt.model_checkpoint_path)
                print("Successfully loaded")
            for length in range(timeSteps):
                input_,state=sess.run(fetches=[self.netOutputs,self.finalState],
                                      feed_dict=feedDict)
                #Since the input vectors are discrete, so we need to convert it into a
                #new one hot vector
                input_=tf.one_hot(indices=[int(np.argmax(input_[0]))],depth=self.__outputVetorDim,
                                  on_value=1.0,off_value=0.0,axis=-1,dtype=tf.float32).eval()
                seq.append(input_)
                feedDict[self.inputs],feedDict[self.initState]=input_,state
            seq=tf.concat(values=seq,axis=0).eval()
        return seq,state

### Make an instance

In [4]:
Test=MyRNN(scope="RNN",inputVectorDim=len(vocab)+1,modelCheckpointPath=modelParamPath,stateSize=200)

### Training

In [None]:
n_epochs=6
with tf.Session() as sess:
    writer=tf.summary.FileWriter(logdir=graphPath,graph=sess.graph)
    saver=tf.train.Saver()
    sess.run(tf.global_variables_initializer())
    ckpt=tf.train.get_checkpoint_state(modelParamPath)
    if ckpt and ckpt.model_checkpoint_path:
        saver.restore(sess,ckpt.model_checkpoint_path)
        iteration=Test.globalStep.eval()
        print("Successfully loaded session at iteration {}. Continue training now. ".format(iteration))
    else:
        iteration=0
    for i in range(n_epochs):
        with open(testTextPath,'r') as fileObj:
            feedDict={}
            for line in fileObj:
                sample=line.strip()
                if len(sample)<2:
                    continue
                X=Encode(sample) 
                Y=X[1:]
                X=X[:-1]
                iteration+=1
                feedDict[Test.inputs]=X
                feedDict[Test.outputs]=Y
                _,summary,state=sess.run([Test.optimizer,Test.summary_op,Test.finalState],feed_dict=feedDict)
                if (iteration+1)%50==0:
                    writer.add_summary(summary,global_step=iteration)
                    saver.save(sess=sess,save_path=modelParamPath+r"\rnnMDL",global_step=iteration)
        saver.save(sess=sess,save_path=modelParamPath+r"\rnnMDL",global_step=iteration)
        result=sess.run(Test.netOutputs,{Test.inputs:X})#See what happens if fed a zero state
    print("Session terminated. ")
    writer.close()
    saver.save(sess=sess,save_path=modelParamPath+r"\rnnMDL",global_step=Test.globalStep)

### Test online inference

In [5]:
result,_=Test.onlineInference(timeSteps=500)
print(Decode(result))

Loading model
Successfully loaded
hing the model structured starding to convergence that our atching typerformmentic gradient descent on MNIST and CIFAR-10 classification, in addition to the standard neural networks, which the descricies, (GAvalttem to training deep neural networks (DNNs). In practice, beep neural networks (DNNs) as a performarict of computation problem formal DNN undersing is input-traima. The architecture based on the training simple neural networks (DNNs) as a performance of the model analysis formertion, and


### Evaluate transition performance of the model

In [None]:
with tf.Session() as sess:
    print("Loading model")
    ckpt=tf.train.get_checkpoint_state(modelParamPath)
    if ckpt and ckpt.model_checkpoint_path:  
        saver.restore(sess,ckpt.model_checkpoint_path)
        print("Successfully loaded")
    result=sess.run(Test.netOutputs,{Test.inputs:X})
print(Decode(result))