In [1]:
import numpy as np
import tensorflow as tf

## Introduction



## Reading the data
To prepare our data for use by our neural net, we first needed to split it into groups of data that follow specific rules. To streamline the process, we used the `Dataset` class to store and manage our input data. This class was responsible for splitting the data into strings of the correct length and for turning them into one hot encoded arrays that the neural net could better understand. We stored this pre-prepared data in a `Batch` object, which has `inputs` and `targets` attributes for our model to use in training.

In [2]:
class Batch:
    def __init__(self, seqs):
        """Create a batch using the sequence
        Arguments
        ======================================================================
            seqs: int[][]
                The one-hot encoded sequences.
        """
        self.inputs = [seq[:-1] for seq in seqs]
        self.targets = [seq[1:] for seq in seqs]


In [3]:
class Dataset:
    def __init__(
            self,
            filenames,
            seq_length,
            shuffle=True,
            buffer_size=10000,
    ):
        """Creates a dataset
        Arguments
        ======================================================================
            filenames: string
                Path to one or more plain text files.
                The file contents are concatenated in the given order.

            seq_length: int
                The length of the text sequence.

            shuffle: boolean
                Whether to shuffle the sequences for the batches.

            buffer_size: int
                 The number of elements from this dataset from which the new
                 dataset will sample.
        """
        text = ''
        vocab = set()
        for filename in filenames:
            content = open(filename).read()
            text += content
            vocab = vocab.union(set(content))
        self.seq_length = seq_length
        self.vocab_size = len(vocab)
        self.char_to_ix = {c: i for i, c in enumerate(vocab)}
        self.ix_to_char = list(vocab)
        self.text = text
        self.data = np.array([self.char_to_ix[c] for c in text])
        self.shuffle = shuffle

    def batch(
            self,
            batch_size,
            drop_remainder=True
    ):
        """Batch the instances
        Arguments
        ======================================================================
            batch_size: int
                The number of instances in a single batch.

            drop_remainder: boolean
                Whether the last batch should be dropped in the case its has
                fewer than batch_size elements.
        """
        n_seq = len(self.data) // self.seq_length
        n_batch = n_seq // batch_size
        seq_ids = np.arange(n_seq)
        if self.shuffle:
            np.random.shuffle(seq_ids)
        i = 0
        for _ in range(n_batch):
            seqs = [None] * batch_size
            for j in range(batch_size):
                k = seq_ids[i] * self.seq_length
                seqs[j] = self._create_seq(k, k + self.seq_length + 1)
                i += 1
            yield Batch(seqs)
        if not drop_remainder:
            seqs = []
            for j in range(n_seq % batch_size):
                k = seq_ids[i] * self.seq_length
                seqs[j] = self._create_seq(k, k + self.seq_length + 1)
                i += 1
            yield Batch(seqs)

    def _create_seq(self, i, j):
        return list(map(self._to_label, self.data[i:j]))

    def _to_label(self, index):
        label = np.zeros(self.vocab_size)
        label[index] = 1.0
        return label

    def encode(self, text):
        """One-hot encode the text
        Arguments
        ======================================================================
            text: string
                The text to encode.

        Returns
        ======================================================================
            seq: int[]
                The one-hot encoded sequence.
        """
        return [self._to_label(self.char_to_ix[c]) for c in text]

    def decode(self, seq):
        """Decode the one-hot encoded sequence to text format
        Arguments
        ======================================================================
            text: string
                The text to encode.

        Returns
        ======================================================================
            seq: int[]
                The one-hot encoded sequence.
        """
        text = ''
        for label in seq:
            text += self.ix_to_char[np.argmax(label)]
        return text

The text generator itself is stored in the `RNNTextGenerator` class. Among other things, storing the generator in the class allows the session to be stored and used again and prevents accidental loss of information. It also allows multiple generators to exist simultaniously for testing or training with different data. 

The class also internalizes the methods needed to save and restore a model, allowing the generator to pick up where it previously left off.

The text generator does not take batches when training, however, and needs to be fed the inputs and targets seperately. 

In [4]:
class RNNTextGenerator:
    """A text generator using basic cell and dynamic rnn
    """
    def __init__(
            self,
            seq_length,
            vocab_size,
            rnn_cell=tf.nn.rnn_cell.BasicRNNCell,
            n_neurons=100,
            optimizer=tf.train.AdamOptimizer,
            learning_rate=0.001,
            name='RNNTextGenerator',
            logdir=None
    ):
        """Initialize the text generator and contruct the tf graph
        Arguments
        ======================================================================
        seq_length: int
            The number of characters in a sequence.

        vocab_size: int
            The number of unique characters in the text.

        neurons_per_cell: int
            The number of neurons in each RNN cell.

        name: string
            The name of the net (for graph visualization in tensorboard).
        """
        self.name = name
        self.tf_graph = tf.Graph()
        with self.tf_graph.as_default():
            self.tf_sess = tf.Session()
            # One-hot encoded input and targets
            """placeholder
            Example
            [
                batch_0: [
                    seq_0: [
                        # encoded labels with 5 categories
                        [0, 0, 0, 1, 0],  # i = 0
                        [0, 0, 1, 0, 0],  # i = 1
                    ],
                    ...
                ],
                ...
            ]
            """
            self.tf_input = tf.placeholder(
                tf.float32, shape=(None, seq_length, vocab_size)
            )
            self.tf_target = tf.placeholder(
                tf.float32, shape=(None, seq_length, vocab_size)
            )
            with tf.variable_scope(name):
                self.tf_rnn_cell = rnn_cell(n_neurons)
                outputs, _ = tf.nn.dynamic_rnn(
                    self.tf_rnn_cell,
                    tf.cast(self.tf_input, tf.float32),
                    dtype=tf.float32,
                )
                logits = tf.layers.dense(outputs, vocab_size)
                self.tf_loss = tf.reduce_mean(
                    tf.nn.softmax_cross_entropy_with_logits(
                        logits=logits,
                        labels=self.tf_target,
                    )
                )
                self.tf_train = optimizer(
                    learning_rate=learning_rate
                ).minimize(self.tf_loss)
                # Normilize the probablities
                y = tf.math.exp(logits)
                self.tf_prob = y / tf.reduce_sum(y, 2, keep_dims=True)
                self.tf_acc = tf.reduce_mean(tf.cast(
                    tf.equal(
                        tf.argmax(logits, 2),
                        tf.argmax(self.tf_target, 2),
                    ),
                    tf.float32
                ))
                self.tf_saver = tf.train.Saver()
                if logdir is not None:
                    self.logger = tf.summary.FileWriter(logdir, self.tf_graph)
            # Initialize the tf session
            self.tf_sess.run(tf.global_variables_initializer())
            self.tf_sess.run(tf.local_variables_initializer())

    def fit(self, inputs, targets):
        """Fit and train the classifier with a batch of inputs and targets
        Arguments
        ======================================================================
        inputs: np.ndarray
            A batch of input sequences.

        targets: np.ndarray
            A batch of target sequences.
        """
        self.tf_sess.run(
            self.tf_train,
            feed_dict={
                self.tf_input: inputs,
                self.tf_target: targets,
            },
        )
        return self

    def score(self, inputs, targets):
        """Get the score for the batch
        Arguments
        ======================================================================
        inputs: np.ndarray
            A batch of input sequences.

        targets: np.ndarray
            A batch of target sequences.

        Returns
        ======================================================================
        accuracy: tf.float32
            The accuracy on this batch.

        loss: tf.float32
            The loss on this batch.
        """
        return self.tf_sess.run(
            [self.tf_acc, self.tf_loss],
            feed_dict={
                self.tf_input: inputs,
                self.tf_target: targets,
            },
        )

    def predict(self, inputs):
        """Predict the probablities for the labels, for a batch of inputs
        Arguments
        ======================================================================
        inputs: np.ndarray
            A batch of input sequences.


        Returns
        ======================================================================
        predictions: np.ndarray
            A batch of sequences of probablities.
        """
        return self.tf_sess.run(
            self.tf_prob,
            feed_dict={
                self.tf_input: inputs,
            },
        )

    def save(self, path='./model'):
        """Save the model
        Arguments
        ======================================================================
        path: string
            The path to store the model.
        """
        self.tf_saver.save(
            self.tf_sess,
            path + '/' + self.name
        )

    def restore(self, path='./model'):
        """Restore the model
        Arguments
        ======================================================================
        path: string
            The path to store the weights.
        """
        self.tf_saver.restore(
            self.tf_sess,
            path + '/' + self.name
        )

    @staticmethod
    def sample(model, dataset, start_seq, length):
        """Generate the text using a saved model
        Arguments
        ======================================================================
        model: RNNTextGenerator
            The model to sample from.

        dataset: Dataset
            The dataset to encode and decode the labels.

        start_seq: int[]
            The sequence to begin with.

        length: int
            The length of the generated text.

        Returns
        ======================================================================
        text: int[]
            The one-hot encoded character labels.
        """
        text = [None] * length
        seq = dataset.encode(start_seq)
        for i in range(length):
            ix = np.random.choice(
                range(dataset.vocab_size),
                # pred[batch 0][last item in the sequence]
                p=model.predict([seq])[0][-1]
            )
            x = np.zeros(dataset.vocab_size)
            x[ix] = 1
            del seq[0]
            seq.append(x)
            text[i] = x
        return dataset.decode(text)

## Testing the code

### Testing the dataset generator

In [19]:
def test_batch_a_seq():
        print("-----------Testing Dataset generator-----------")
        batch_size = 5
        seq_length = 100
        filename = '../data/alice.txt'
        dataset = Dataset([filename], seq_length)
        for batch in dataset.batch(batch_size):
            assert(len(batch.inputs) == batch_size)
            assert(len(batch.targets) == batch_size)
            assert(len(batch.inputs[-1]) == seq_length)
            assert(len(batch.targets[-1]) == seq_length)
test_batch_a_seq()

-----------Testing Dataset generator-----------


### Testing the text generator
Before using any neural net, it's importamt to make sure that it is correctly processing the data it is given. To make sure this is the case, we fed our text generator some randomly generated data. While a fresh model will be needed to train on the text, this model is used to verify that the code is working correctly.
In this case, we are checking to ensure the model is providing sufficient variance in its outputs. This shows that the model is opperating on its inputs and successfully completing its operations.

In [21]:
def random_label(vocab_size):
    """randomly assign a label
    """
    label = np.random.randint(vocab_size)
    seq = np.zeros(vocab_size)
    seq[label] = 1.0
    return seq


def random_data(batch_size, seq_length, vocab_size):
    """generate random data
    """
    inputs = []
    targets = []
    for _ in range(batch_size):
        labels = [random_label(vocab_size) for _ in range(seq_length + 1)]
        inputs.append(labels[:-1])
        targets.append(labels[1:])
    return np.array(inputs), np.array(targets)

def test_on_random_data():
    print("---------------Testing text generator with randomly generated data-------------")
    seq_length = 10
    vocab_size = 4
    batch_size = 2
    text_gen = RNNTextGenerator(
        seq_length,
        vocab_size,
    )
    print('first fit')
    inputs, targets = random_data(batch_size, seq_length, vocab_size)
    print('fit:', text_gen.fit(inputs, targets))
    print('score:', text_gen.score(inputs, targets))
    print('predictions:', text_gen.predict(inputs))
    print('true targets:', targets)
    text_gen.save()

    seq_length = 5
    text_gen = RNNTextGenerator(
        seq_length,
        vocab_size
    )
    text_gen.restore()

def test_log():
    print("-------------Testing logs---------------")
    seq_length = 10
    vocab_size = 4
    batch_size = 2
    text_gen = RNNTextGenerator(
        seq_length,
        vocab_size,
        logdir='./tf_logs'
    )

In [22]:
test_on_random_data()
test_log()

---------------Testing text generator with randomly generated data-------------
Instructions for updating:
This class is equivalent as tf.keras.layers.SimpleRNNCell, and will be replaced by that in Tensorflow 2.0.
Instructions for updating:

Future major versions of TensorFlow will allow gradients to flow
into the labels input on backprop by default.

See `tf.nn.softmax_cross_entropy_with_logits_v2`.

Instructions for updating:
keep_dims is deprecated, use keepdims instead
first fit
fit: <__main__.RNNTextGenerator object at 0x7fe5622454a8>
score: [0.3, 1.3757889]
predictions: [[[0.25362095 0.20889768 0.25565118 0.28183022]
  [0.21944472 0.19302015 0.3110655  0.27646965]
  [0.22584444 0.2517532  0.26989004 0.2525123 ]
  [0.28246793 0.20647362 0.3006791  0.21037933]
  [0.2666454  0.27356318 0.21455306 0.2452383 ]
  [0.24525431 0.15664294 0.31523573 0.28286698]
  [0.23202464 0.18302342 0.27027434 0.3146777 ]
  [0.32244042 0.20671801 0.24668536 0.22415623]
  [0.22302635 0.27802995 0.293710

As seen above, the model'spredictions as to what the target are have sufficient variance so as to provide seemingly random probabilities of each output. This suggests that the model, while in need of training, is making its predictions appropriatly.
We also verified that tensorflow's logs were being stored in the correct location, as that is also managed by our model.

## Training the model

In [7]:
"""An end to end test using 
 ALICE'S ADVENTURES IN WONDERLAND
"""

class TestAlice(unittest.TestCase):
    def test_alice(self):
        print("----------------Testing dataset Alice---------------")
        seq_length = 25
        batch_size = 25
        learning_rate = 0.01
        epoch = 10
        dataset = Dataset(['./data/alice.txt'], seq_length)
        model = RNNTextGenerator(
            seq_length,
            dataset.vocab_size,
            learning_rate=learning_rate
        )
        for _ in range(epoch):
            for batch in dataset.batch(batch_size):
                model.fit(batch.inputs, batch.targets)
        model.save()
        start_seq = 'hello'
        model = RNNTextGenerator(
            len(start_seq),
            dataset.vocab_size,
        )
        model.restore()
        print('>>>>> {}'.format(start_seq), RNNTextGenerator.sample(
            model,
            dataset,
            start_seq,
            50
        ))
        print('<<<<<<')