In [1]:
# Download the text file from a terminal with the command below
# curl https://www.gutenberg.org/cache/epub/2265/pg2265.txt > pg2265.txt

In [2]:
# Read the file into the Python session as plain text.
# Remove the beginning portion of the text (legal description), then build
# the character <-> integer dictionaries from the remaining text.
# `chars` holds the unique characters of the text.
import numpy as np
# Read and process text
# Number of leading characters dropped from the raw file before the
# vocabulary is built.
HEADER_LENGTH = 15858

with open('pg2265.txt', 'r', encoding='utf-8') as f:
    text = f.read()
text = text[HEADER_LENGTH:]

# Sort the unique characters so the char <-> int mapping is the same in every
# Python session. Iteration order of a set of strings can differ between
# interpreter runs (string hash randomization); with an unsorted set, a
# checkpoint trained in one kernel session would be decoded with a different
# mapping after a restart.
chars = sorted(set(text))
char2int = {ch: i for i, ch in enumerate(chars)}
int2char = dict(enumerate(chars))
text_ints = np.array([char2int[ch] for ch in text],
                     dtype=np.int32)

In [3]:
# Reshape into batches of sequences
# Shift input (x) and output (y) of neural network by one character
def reshape_data(sequence, batch_size, num_steps):
    """Arrange a 1-D sequence into input/target arrays with `batch_size` rows.

    The target array is the input array shifted forward by one element, so
    y[i, j] is the element that follows x[i, j] in `sequence`.

    Args:
        sequence: 1-D array of encoded items.
        batch_size: number of rows in the returned arrays.
        num_steps: number of elements per row in one mini-batch.

    Returns:
        Tuple (x, y); each has shape (batch_size, num_batches * num_steps).
    """
    chars_per_batch = batch_size * num_steps
    n_batches = int(len(sequence) / chars_per_batch)
    # The shifted target needs one element past the end of the input span;
    # give up the last batch when that extra element is not available.
    if n_batches * chars_per_batch >= len(sequence):
        n_batches -= 1

    # Trailing elements that do not fill a complete batch are discarded.
    used = n_batches * chars_per_batch

    # Cut each span into `batch_size` equal pieces and stack them as rows.
    inputs = np.stack(np.split(sequence[:used], batch_size))
    targets = np.stack(np.split(sequence[1:used + 1], batch_size))
    return inputs, targets

In [4]:
# Create batch generator
# Split arrays x and y into mini-batches where row is seq w/ len = steps
def create_batch_generator(data_x, data_y, num_steps):
    """Yield consecutive (x, y) mini-batches cut along the second axis.

    Args:
        data_x: 2-D input array.
        data_y: target array, sliced with the same column windows as data_x.
        num_steps: number of columns in each yielded mini-batch.

    Yields:
        Tuples (x_batch, y_batch) holding `num_steps` columns each. Columns
        left over after the last full window are not yielded.
    """
    _, total_length = data_x.shape
    n_batches = int(total_length / num_steps)
    for index in range(n_batches):
        window = slice(index * num_steps, (index + 1) * num_steps)
        yield data_x[:, window], data_y[:, window]

In [5]:
# Implement class 
# Constructs graph of RNN to predict next character after observing sequence of characters
import tensorflow._api.v2.compat.v1 as tf
import os

# Helper function: randomly pick a character id among the top-n most probable ones
def get_top_char(probas, char_size, top_n=5):
    """Randomly draw a character id from the `top_n` most probable entries.

    All but the `top_n` largest probabilities are set to zero, the remainder
    is renormalized to sum to one, and one id is drawn from that distribution
    using NumPy's global random generator.

    Args:
        probas: array of probabilities; singleton dimensions are squeezed away.
        char_size: number of ids to choose from (passed to np.random.choice);
            must match the number of probabilities.
        top_n: how many of the most probable entries stay eligible.

    Returns:
        The sampled id.
    """
    # np.squeeze returns a view of its input, so work on a copy: zeroing
    # entries below must not overwrite the caller's probability array.
    p = np.squeeze(probas).copy()
    # argsort is ascending, so everything before the last `top_n` indices
    # belongs to the less probable entries.
    p[np.argsort(p)[:-top_n]] = 0.0
    p = p / np.sum(p)
    ch_id = np.random.choice(char_size, 1, p=p)[0]
    return ch_id

class CharRNN(object):
    """Character-level LSTM language model built as a TensorFlow v1 graph.

    The graph one-hot encodes character ids, runs them through a stack of
    dropout-wrapped LSTM cells and a dense layer, and applies a softmax over
    `num_classes` outputs to predict the next character.

    An instance is built either for training (placeholders shaped
    batch_size x num_steps) or for sampling (placeholders shaped 1 x 1),
    selected with the `sampling` constructor flag.
    """

    def __init__(self, num_classes, batch_size=64,
                 num_steps=100, lstm_size=128,
                 num_layers=1, learning_rate=0.001,
                 keep_prob=0.5, grad_clip=5,
                 sampling=False):
        """Store the hyper-parameters and build the graph.

        Args:
            num_classes: size of the one-hot encoding and of the output layer.
            batch_size: number of sequences per training mini-batch.
            num_steps: number of characters per training sequence.
            lstm_size: number of units in each LSTM cell.
            num_layers: number of stacked LSTM cells.
            learning_rate: learning rate given to the Adam optimizer.
            keep_prob: dropout keep probability fed during training.
            grad_clip: global-norm threshold used to clip the gradients.
            sampling: False builds the training graph, True the 1 x 1
                sampling graph.
        """
        self.num_classes = num_classes
        self.batch_size = batch_size
        self.num_steps = num_steps
        self.lstm_size = lstm_size
        self.num_layers = num_layers
        self.learning_rate = learning_rate
        self.keep_prob = keep_prob
        self.grad_clip = grad_clip
        # Keep sampling boolean to determine whether instance is
        # training (false) or sampling (true)
        self.sampling = sampling

        self.g = tf.Graph()
        with self.g.as_default():
            tf.set_random_seed(123)

            self.build(sampling=sampling)

            self.saver = tf.train.Saver()

            self.init_op = tf.global_variables_initializer()

    # Build method doesn't use embedding layer to create salient representation for unique words
    # If sampling (testing), batch size = 1; training batch size = batch size
    # Uses one-hot encoding
    def build(self, sampling):
        """Add all ops of the model to the current default graph.

        Tensors and ops are later looked up by the names assigned here
        ('tf_x', 'tf_y', 'tf_keepprob', 'probabilities', 'cost', 'train_op').

        Args:
            sampling: when true, the placeholders are shaped 1 x 1; otherwise
                they are shaped self.batch_size x self.num_steps.
        """
        if sampling:
            batch_size, num_steps = 1, 1
        else:
            batch_size = self.batch_size
            num_steps = self.num_steps

        tf_x = tf.placeholder(tf.int32,
                              shape=[batch_size, num_steps],
                              name='tf_x')
        tf_y = tf.placeholder(tf.int32,
                              shape=[batch_size, num_steps],
                              name='tf_y')
        tf_keepprob = tf.placeholder(tf.float32,
                                     name='tf_keepprob')

        # One-hot encoding:
        x_onehot = tf.one_hot(tf_x, depth=self.num_classes)
        y_onehot = tf.one_hot(tf_y, depth=self.num_classes)

        # Build the multi-layer RNN cells
        cells = tf.compat.v1.nn.rnn_cell.MultiRNNCell(
                [tf.compat.v1.nn.rnn_cell.DropoutWrapper(
                    tf.compat.v1.nn.rnn_cell.BasicLSTMCell(self.lstm_size),
                    output_keep_prob=tf_keepprob)
                 for _ in range(self.num_layers)])

        # Define the initial state
        self.initial_state = cells.zero_state(
                    batch_size, tf.float32)

        # Run each sequence step through the RNN
        lstm_outputs, self.final_state = tf.nn.dynamic_rnn(
                    cells, x_onehot,
                    initial_state=self.initial_state)

        print('  << lstm_outputs  >>', lstm_outputs)

        # Flatten batch and time dimensions so the dense layer sees one row
        # per character position.
        seq_output_reshaped = tf.reshape(
                    lstm_outputs,
                    shape=[-1, self.lstm_size],
                    name='seq_output_reshaped')

        logits = tf.layers.dense(
                    inputs=seq_output_reshaped,
                    units=self.num_classes,
                    activation=None,
                    name='logits')

        proba = tf.nn.softmax(
                    logits,
                    name='probabilities')

        y_reshaped = tf.reshape(
                    y_onehot,
                    shape=[-1, self.num_classes],
                    name='y_reshaped')
        cost = tf.reduce_mean(
                    tf.nn.softmax_cross_entropy_with_logits(
                        logits=logits,
                        labels=y_reshaped),
                    name='cost')

        # Gradient clipping to avoid "exploding gradients"
        tvars = tf.trainable_variables()
        grads, _ = tf.clip_by_global_norm(
                    tf.gradients(cost, tvars),
                    self.grad_clip)
        optimizer = tf.train.AdamOptimizer(self.learning_rate)
        train_op = optimizer.apply_gradients(
                    zip(grads, tvars),
                    name='train_op')

    # Train Method
    def train(self, train_x, train_y,
                 num_epochs, ckpt_dir='./model/'):
        """Train the model and write a checkpoint after every epoch.

        Args:
            train_x: 2-D array of input ids with self.batch_size rows.
            train_y: 2-D array of target ids, same shape as train_x.
            num_epochs: number of passes over the data.
            ckpt_dir: directory that receives 'language_modeling.ckpt'.
        """
        # Create the checkpoint directory if it does not exist; makedirs
        # also handles nested paths whose parents are missing.
        os.makedirs(ckpt_dir, exist_ok=True)

        with tf.Session(graph=self.g) as sess:
            sess.run(self.init_op)

            n_batches = int(train_x.shape[1]/self.num_steps)
            for epoch in range(num_epochs):
                # Start every epoch from the zero state
                new_state = sess.run(self.initial_state)

                # Mini-batch generator:
                bgen = create_batch_generator(
                        train_x, train_y, self.num_steps)
                for b, (batch_x, batch_y) in enumerate(bgen, 1):
                    iteration = epoch*n_batches + b

                    # The final state of one mini-batch is fed back as the
                    # initial state of the next one.
                    feed = {'tf_x:0': batch_x,
                            'tf_y:0': batch_y,
                            'tf_keepprob:0' : self.keep_prob,
                            self.initial_state : new_state}
                    batch_cost, _, new_state = sess.run(
                            ['cost:0', 'train_op',
                                self.final_state],
                            feed_dict=feed)
                    if iteration % 10 == 0:
                        print('Epoch %d/%d Iteration %d'
                            '| Training loss: %.4f' % (
                            epoch + 1, num_epochs,
                            iteration, batch_cost))

                # Save the trained model
                self.saver.save(
                        sess, os.path.join(
                            ckpt_dir, 'language_modeling.ckpt'))

    # Sample Method
    # Similar to predict method
    def sample(self, output_length,
                  ckpt_dir, starter_seq="The "):
        """Generate text by repeatedly sampling the next character.

        The instance must have been built with sampling=True, since single
        characters (shape 1 x 1) are fed to the graph. Characters are
        converted with the module-level `char2int` / `int2char` dictionaries.

        Args:
            output_length: number of sampling iterations run after the
                starter sequence has been consumed.
            ckpt_dir: directory containing the checkpoint to restore.
            starter_seq: non-empty text used to prime the network state.

        Returns:
            The starter sequence followed by the sampled characters.

        Raises:
            ValueError: if starter_seq is empty or no checkpoint is found
                in ckpt_dir.
        """
        # Without at least one priming character there would be no
        # probabilities to draw the first sample from.
        if not starter_seq:
            raise ValueError('starter_seq must contain at least one character')

        ckpt_path = tf.train.latest_checkpoint(ckpt_dir)
        if ckpt_path is None:
            raise ValueError('No checkpoint found in %r' % (ckpt_dir,))

        observed_seq = list(starter_seq)
        with tf.Session(graph=self.g) as sess:
            self.saver.restore(sess, ckpt_path)

            # Single-character input buffer, reused for every step
            x = np.zeros((1, 1), dtype=np.int32)

            # 1: run the model using the starter sequence
            new_state = sess.run(self.initial_state)
            for ch in starter_seq:
                x[0, 0] = char2int[ch]
                feed = {'tf_x:0': x,
                        'tf_keepprob:0': 1.0,
                        self.initial_state: new_state}
                proba, new_state = sess.run(
                        ['probabilities:0', self.final_state],
                        feed_dict=feed)

            # The softmax has self.num_classes outputs, so that is the
            # number of ids to choose from.
            ch_id = get_top_char(proba, self.num_classes)
            observed_seq.append(int2char[ch_id])

            # 2: run the model using the updated observed_seq
            for i in range(output_length):
                x[0, 0] = ch_id
                feed = {'tf_x:0': x,
                        'tf_keepprob:0': 1.0,
                        self.initial_state: new_state}
                proba, new_state = sess.run(
                        ['probabilities:0', self.final_state],
                        feed_dict=feed)

                ch_id = get_top_char(proba, self.num_classes)
                observed_seq.append(int2char[ch_id])

        return ''.join(observed_seq)

In [6]:
# Create and train CharRNN Model
batch_size = 64
num_steps = 100
train_x, train_y = reshape_data(text_ints,
                                batch_size,
                                num_steps)

# Pass num_steps explicitly: the model's placeholders are shaped
# batch_size x num_steps, so they must use the same values as the reshaped
# data instead of relying on the constructor default happening to match.
rnn = CharRNN(num_classes=len(chars),
              batch_size=batch_size,
              num_steps=num_steps)
rnn.train(train_x, train_y,
          num_epochs=100,
          ckpt_dir='./model-100/')

Instructions for updating:
Please use `keras.layers.RNN(cell)`, which is equivalent to this API
Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor
  << lstm_outputs  >> Tensor("rnn/transpose_1:0", shape=(64, 100, 128), dtype=float32)
Instructions for updating:

Future major versions of TensorFlow will allow gradients to flow
into the labels input on backprop by default.

See `tf.nn.softmax_cross_entropy_with_logits_v2`.



  tf.compat.v1.nn.rnn_cell.BasicLSTMCell(self.lstm_size),
  logits = tf.layers.dense(
2022-08-23 17:23:29.097353: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2022-08-23 17:23:29.105621: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:354] MLIR V1 optimization pass is not enabled


Epoch 1/100 Iteration 10| Training loss: 3.7651
Epoch 1/100 Iteration 20| Training loss: 3.3743
INFO:tensorflow:./model-100/language_modeling.ckpt.index
INFO:tensorflow:0
INFO:tensorflow:./model-100/language_modeling.ckpt.meta
INFO:tensorflow:200
INFO:tensorflow:./model-100/language_modeling.ckpt.data-00000-of-00001
INFO:tensorflow:1500
Epoch 2/100 Iteration 30| Training loss: 3.3098
Epoch 2/100 Iteration 40| Training loss: 3.2421
Epoch 2/100 Iteration 50| Training loss: 3.2602
INFO:tensorflow:./model-100/language_modeling.ckpt.index
INFO:tensorflow:0
INFO:tensorflow:./model-100/language_modeling.ckpt.meta
INFO:tensorflow:200
INFO:tensorflow:./model-100/language_modeling.ckpt.data-00000-of-00001
INFO:tensorflow:1500
Epoch 3/100 Iteration 60| Training loss: 3.2013
Epoch 3/100 Iteration 70| Training loss: 3.2051
INFO:tensorflow:./model-100/language_modeling.ckpt.index
INFO:tensorflow:0
INFO:tensorflow:./model-100/language_modeling.ckpt.meta
INFO:tensorflow:200
INFO:tensorflow:./model-100

In [7]:
# CharRNN Model in Sampling Mode
# Specify that sampling=True
# Discard the training-mode model before building the sampling-mode one.
del rnn

# Seed NumPy's global generator, which get_top_char draws from.
np.random.seed(123)
rnn = CharRNN(num_classes=len(chars), sampling=True)
print(rnn.sample(500, './model-100/'))



  tf.compat.v1.nn.rnn_cell.BasicLSTMCell(self.lstm_size),
  logits = tf.layers.dense(


  << lstm_outputs  >> Tensor("rnn/transpose_1:0", shape=(1, 1, 128), dtype=float32)
INFO:tensorflow:Restoring parameters from ./model-100/language_modeling.ckpt
The wirle of the mise, whot' wise and sertor and my that wist of the murse, bus best the thas to the sind and the mint sheath, wotle some haues ant to me a makned

   Ham. No mo harke this tast is tiliue, aroue,
Tees thes theere and and tor his, on to heare

   Ham. I sand yor sies alaie, oue to hath as, and we mes ant her,
Ber it
the Sinder our sendes of my mere

   Ham. Whe is the why do sint in the worla beane:
I sare you hous singent, in him of in tho gaue

   Ham. I swore, and tere thee if hor m
