In [14]:
import re
import numpy as np
import matplotlib.pyplot as plt

import tensorflow as tf
import tensorflow.keras as keras
from tensorflow.keras.preprocessing.text import Tokenizer

In [15]:
from tensorflow.keras.layers import Input, Embedding, Dense, LSTM, Bidirectional
from tensorflow.keras.layers import SpatialDropout1D, concatenate
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import LearningRateScheduler
from tensorflow.keras.preprocessing import sequence
from tensorflow.keras.preprocessing.text import Tokenizer, text_to_word_sequence

Main Reference:
https://github.com/minimaxir/textgenrnn/tree/master/textgenrnn

Attention Weighted Average Layer
https://gist.github.com/thomwolf/e309e779a08c1ba899514d44355cd6df  
https://github.com/minimaxir/textgenrnn/blob/1a271addb5894aaa485d3f23663a9afb813f7692/textgenrnn/AttentionWeightedAverage.py#L6


In [25]:
from tensorflow.keras.layers import Layer, InputSpec
from tensorflow.keras import backend as K
from tensorflow.keras import initializers


class AttentionWeightedAverage(Layer):
    """
    Computes a weighted average of the different channels across timesteps.
    Uses 1 parameter per channel to compute the attention value for
    a single timestep.

    Arguments:
        return_attention: when True, `call` returns `[result, att_weights]`
            instead of only `result`.
    """

    def __init__(self, return_attention=False, **kwargs):
        self.init = initializers.get('uniform')
        self.supports_masking = True
        self.return_attention = return_attention
        super(AttentionWeightedAverage, self).__init__(** kwargs)

    def build(self, input_shape):
        """Create the attention weight `W` of shape (input_shape[2], 1)."""
        self.input_spec = [InputSpec(ndim=3)]
        assert len(input_shape) == 3

        self.W = self.add_weight(shape=(input_shape[2], 1),
                                 name='{}_W'.format(self.name),
                                 trainable=True,
                                 initializer=self.init)
        super(AttentionWeightedAverage, self).build(input_shape)

    def call(self, x, mask=None):
        """Return the attention-weighted sum of `x` over axis 1.

        When `return_attention` is set, the attention weights are returned
        as a second element.
        """
        # computes a probability distribution over the timesteps
        # uses 'max trick' for numerical stability
        # reshape is done to avoid issue with Tensorflow
        # and 1-dimensional weights
        logits = K.dot(x, self.W)
        x_shape = K.shape(x)
        logits = K.reshape(logits, (x_shape[0], x_shape[1]))
        ai = K.exp(logits - K.max(logits, axis=-1, keepdims=True))

        # masked timesteps have zero weight
        if mask is not None:
            mask = K.cast(mask, K.floatx())
            ai = ai * mask
        # epsilon keeps the division defined when every weight is zero
        att_weights = ai / (K.sum(ai, axis=1, keepdims=True) + K.epsilon())
        weighted_input = x * K.expand_dims(att_weights)
        result = K.sum(weighted_input, axis=1)
        if self.return_attention:
            return [result, att_weights]
        return result

    def get_output_shape_for(self, input_shape):
        """Alias that forwards to `compute_output_shape`."""
        return self.compute_output_shape(input_shape)

    def compute_output_shape(self, input_shape):
        """Return (input_shape[0], input_shape[2]).

        With `return_attention`, the shape (input_shape[0], input_shape[1])
        of the attention weights is returned as well.
        """
        output_len = input_shape[2]
        if self.return_attention:
            return [(input_shape[0], output_len), (input_shape[0],
                                                   input_shape[1])]
        return (input_shape[0], output_len)

    def compute_mask(self, input, input_mask=None):
        """Never propagate a mask: return None (one None per mask for a list)."""
        if isinstance(input_mask, list):
            return [None] * len(input_mask)
        else:
            return None

    def get_config(self):
        """Return the layer config including `return_attention`.

        Without this override the constructor argument is not part of the
        config, so the layer cannot be rebuilt from a serialized model.
        """
        config = super(AttentionWeightedAverage, self).get_config()
        config['return_attention'] = self.return_attention
        return config

In [12]:
def create_dataset(data, n_step, batch_size):
    """Build batches of (inputs, target) pairs from sliding windows over `data`.

    Every window holds `n_step + 1` consecutive elements and consecutive
    windows are shifted by one element. Within a batch, the first `n_step`
    elements of each window are the inputs and the last element is the target.
    """
    span = n_step + 1

    def window_to_tensor(window):
        # each window is itself a dataset; batching it yields one tensor
        return window.batch(span)

    def split_inputs_target(windows):
        return (windows[:, :-1], windows[:, -1])

    return (
        tf.data.Dataset.from_tensor_slices(data)
        .window(span, shift=1, drop_remainder=True)
        .flat_map(window_to_tensor)
        .batch(batch_size)
        .map(split_inputs_target)
        .prefetch(1)
    )

# Character-level Models

## Textgenrnn's Weights and Vocabulary

Loading Textgenrnn Vocabulary:

In [4]:
# `json` is not imported by any other cell, so bring it into scope here;
# otherwise json.load raises NameError on a fresh kernel.
import json

# Load the textgenrnn vocabulary (a JSON object) into `vocab`.
vocab_path = './textgenrnn_vocab.json'
# errors='ignore' drops bytes that cannot be decoded as UTF-8
with open(vocab_path, 'r', encoding='utf8', errors='ignore') as json_file:
    vocab = json.load(json_file)

Reverse vocabulary and number of classes:

In [5]:
# Inverse lookup table: id -> character
indices_char = {idx: char for char, idx in vocab.items()}
# One class more than the vocabulary size, to allow for padding
num_classes = 1 + len(vocab)

Model characteristics:

In [6]:
# Hyperparameters of the character-level network built in the next cell.
MAX_LENGTH = 40  # ids per input sequence (Input shape and Embedding input_length)
DIM_EMBEDDINGS = 100  # output dimension of the Embedding layer
DROPOUT = 0.0  # SpatialDropout1D rate; the layer is only added when this is > 0
RNN_LAYERS = 2  # number of stacked LSTM layers
RNN_SIZE = 128  # units per LSTM layer

Building the model:

In [7]:
# Functional-API graph: ids -> embedding -> stacked LSTMs -> attention -> softmax.
# The layers are created in a fixed order with fixed names; keep both unchanged
# if pre-trained weights are loaded into this model afterwards.
input_layer = Input(shape=(MAX_LENGTH, ), name='input')

# Try a different embedding?
embedding_layer = Embedding(num_classes, DIM_EMBEDDINGS, 
                            input_length = MAX_LENGTH,
                            name='embedding')(input_layer)

# SpatialDropout after the Embedding layer drops one entire row/feature!
# (skipped entirely when DROPOUT is 0)
if DROPOUT > 0.0:
    embedding_layer = SpatialDropout1D(DROPOUT, 
                                       name='regularized_embedding')(embedding_layer)
    
# Try changing to bidirectional
# Each LSTM reads the output of the previous one (the first reads the embedding)
# and returns the full sequence so that it can be concatenated below.
rnn_layers_list = []
for i in range(RNN_LAYERS):
    prev_layer = embedding_layer if i==0 else rnn_layers_list[-1]
    next_layer = LSTM(RNN_SIZE, return_sequences=True, name = f"lstm{i}")
    rnn_layers_list.append(next_layer(prev_layer))
    
# Concatenating the layers created so far
# (the embedding output together with the output of every LSTM)
seq = concatenate([embedding_layer] + rnn_layers_list)

# Attention layer (see references above)
attention = AttentionWeightedAverage(name='attention_layer')(seq)

# Output
# One softmax probability per class (num_classes units)
output = Dense(num_classes, name='output', activation='softmax')(attention)

Compiling:

In [8]:
# Wrap the graph into a Model and configure the loss and optimizer.
model = Model(inputs=[input_layer], outputs=[output])
# 'adam' is passed by name, so the optimizer uses its default settings
model.compile(loss='sparse_categorical_crossentropy', optimizer='adam')

Loading Pre-Trained Weights:

In [9]:
# Load the pre-trained weights from the HDF5 file into the model built above.
model.load_weights('./textgenrnn_weights.hdf5')

Now let's generate some predictions. We need a function that returns the id of the next character given the text generated so far.

In [11]:
def next_letter_id(text, vocab, maxlen, model, temperature=0.5):
    """Pick the vocabulary id of the next character that follows `text`.

    Parameters
    ----------
    text : iterable of str
        Tokens generated so far. Tokens missing from `vocab` are encoded as 0.
    vocab : dict
        Maps each token to its integer id.
    maxlen : int
        Length passed to `pad_sequences` for the encoded input.
    model : object
        Must expose `predict`; the first row of its output is used as the
        probability of every class.
    temperature : float or None
        Divides the log-probabilities before sampling. `None` or `0` turns
        sampling off and picks the most likely class (greedy decoding).

    Returns
    -------
    int
        Index of the chosen class; never 0 (the padding index).
    """
    # Encode the text using the vocabulary and pad sequences to maxlen
    encoded = np.array([vocab.get(x, 0) for x in text])
    encoded_padded = sequence.pad_sequences([encoded], maxlen=maxlen)

    # Compute model predictions
    preds = model.predict(encoded_padded)[0]
    preds = np.asarray(preds).astype('float64')

    if temperature is None or temperature == 0:
        # Dividing by a zero temperature would produce NaN probabilities;
        # the limit of the sampling scheme is simply the most likely class.
        index = np.argmax(preds)
    else:
        # Re-scale the distribution with the temperature and sample from it.
        # The epsilon keeps log() finite for zero probabilities.
        preds = np.log(preds + K.epsilon()) / temperature
        exp_preds = np.exp(preds)
        preds = exp_preds / np.sum(exp_preds)
        probas = np.random.multinomial(1, preds, 1)

        # Index of the single class drawn by the multinomial sample
        index = np.argmax(probas)

    # Never return the padding index. Taking the second-best class overall is
    # not enough: padding may itself be the second-best class, so pick the
    # most likely class among the non-padding ones instead.
    if index == 0:
        index = np.argmax(preds[1:]) + 1

    return index

Now let's start with a text and see what it generates:

In [12]:
def generate_text(text, size=150, temperature=0.75):
    """Extend `text` by `size` generated tokens and print the result.

    Parameters
    ----------
    text : list of str or str
        Seed tokens (characters). The argument itself is left untouched.
    size : int
        Number of tokens to generate.
    temperature : float
        Forwarded to `next_letter_id`.

    Relies on the notebook-level `vocab`, `indices_char`, `MAX_LENGTH` and
    `model`. Returns None; the generated text is printed.
    """
    # Work on a copy: `+=` on the caller's list would modify it in place.
    generated = list(text)
    for _ in range(size):
        next_id = next_letter_id(generated, vocab, MAX_LENGTH,
                                 model,
                                 temperature=temperature)
        # append() keeps a multi-character token (e.g. '<s>') as one element,
        # so it is encoded as a single id again on the next step.
        generated.append(indices_char[next_id])
    print(''.join(generated))
    

In [13]:
# Seed the generator with the characters of "Today"
text = list('Today')
generate_text(text, temperature=0.6)

Today 4.0<s>shift in the third windown things you enjoy when I see them in the desperate elephants?<s>da and PS4 will have an online minute depressed on the dr


According to the documentation, the original model was trained on hundreds of thousands of text documents from Reddit submissions, covering a diverse variety of subreddits. The results are pretty interesting: the grammar and sentence structure are mostly right, even if the text is not necessarily meaningful. You can play with `size` and `temperature` to get different results.

## Fine-Tuning: War and Peace

The textgenrnn package is intended to be used with texts given by the user. According to the documentation, the approach is to train the model on the new text starting from the pre-trained weights used above. Although this is not the usual approach when using transfer learning, let's start with it. First, we need to load and parse our text accordingly.

Loading the text:

In [10]:
# Read the whole text file into one string (`war`).
filepath = './war_peace.txt'
with open(filepath, encoding='UTF-8') as f:
    war = f.read()

Tokenizing using the vocabulary from Textgenrnn:

In [11]:
# Character-level tokenizer that filters nothing and keeps the case.
tokenizer = Tokenizer(filters = '', lower = False, char_level=True)
# Use the textgenrnn vocabulary loaded earlier instead of fitting a new one
tokenizer.word_index = vocab
# A one-element list goes in; unpacking keeps its single encoded sequence
[encoded] = np.array(tokenizer.texts_to_sequences([war]))

In [12]:
# Number of output classes: vocabulary size plus one
num_classes = len(vocab) + 1
# Number of encoded ids available for the train/validation split
dataset_size = len(encoded)

Train and Validation Data:

In [13]:
# First 90% of the ids for training, the remaining ones for validation
train_size = (dataset_size * 90) // 100
train_data, val_data = encoded[:train_size], encoded[train_size:]

Creating the datasets:

In [15]:
BATCH_SIZE = 128  # number of windows per batch, passed to create_dataset

In [17]:
# Same windowing for both splits: MAX_LENGTH inputs per window, BATCH_SIZE windows per batch
train_dataset, val_dataset = (
    create_dataset(split, MAX_LENGTH, BATCH_SIZE)
    for split in (train_data, val_data)
)

Training the model using a variable learning rate this time:

In [56]:
# Learning rate at epoch 0. No other cell defines `base_lr`, so the schedule
# raised NameError on its first call.
# NOTE(review): 4e-3 is taken to be the value used by the textgenrnn reference
# implementation this schedule was adapted from; confirm against that source.
base_lr = 4e-3


def lr_linear_decay(epoch):
    """Return the learning rate for `epoch`, decaying linearly to zero.

    The rate starts at `base_lr` for epoch 0 and reaches 0 at epoch
    `num_epochs`. `num_epochs` is read from the notebook namespace when the
    function is called, so it has to be defined before training starts.
    """
    return (base_lr * (1 - (epoch / num_epochs)))

In [57]:
num_epochs = 20  # epochs passed to model.fit; also read by lr_linear_decay

In [None]:
# Continue training the model on the new text.
# LearningRateScheduler calls lr_linear_decay to obtain the learning rate.
model.fit(train_dataset, 
          validation_data=val_dataset, 
          epochs = num_epochs,
          callbacks=[LearningRateScheduler(lr_linear_decay)])

# Word-Level Model

In [1]:
import re
from tensorflow.keras.preprocessing.text import Tokenizer, text_to_word_sequence

In [3]:
# Read the whole text file into one string (`war`).
filepath = './war_peace.txt'
with open(filepath, encoding='UTF-8') as f:
    war = f.read()

Getting the list of words:

In [4]:
# https://stackoverflow.com/a/3645946/9314418
punct = '!"#$%&()*+,-./:;<=>?@[\]^_`{|}~\\n\\t\'‘’“”’–—…'
text = re.sub('([{}])'.format(punct), r' \1 ', war)
text = re.sub(' {2,}', ' ', text)
text = text_to_word_sequence(text, filters='')

Fitting a tokenizer:

In [5]:
# Word-level tokenizer: nothing is filtered and everything is lower-cased.
tokenizer = Tokenizer(filters = '', lower = True, 
                      char_level = False)
# `text` is the list of words produced in the previous cell
tokenizer.fit_on_texts(text)

Let's limit the vocabulary to the most common 10000 words:

In [6]:
# Size of the word vocabulary that is kept
max_words = 10000
# Keep the words whose index is at most `max_words`. The limit now comes from
# the constant above, which was defined but never used (a second hard-coded
# 10000 was used instead).
vocab = {word: idx for (word, idx) in tokenizer.word_index.items() if idx <= max_words}
# Restrict the tokenizer to the reduced vocabulary
tokenizer.word_index = vocab

Now tokenizing our words:

In [7]:
# Encode the word list as ids; a one-element list goes in and the unpacking
# keeps its single encoded sequence.
[encoded] = np.array(tokenizer.texts_to_sequences([text]))

Now that the words are encoded, preparing the data is similar to what we did for the character-level models. In this case, we can use a smaller MAX_LENGTH though. Defining constants:

In [9]:
BATCH_SIZE = 128  # number of windows per batch
# Number of words per input sequence for the word-level model. It has to be
# set before the datasets are created: when the notebook runs top to bottom,
# MAX_LENGTH still holds the character-level value (40) at that point, and the
# windows would not match the Input(shape=(MAX_LENGTH, )) of the word model.
MAX_LENGTH = 10
# Number of output classes: vocabulary size plus one
num_classes = len(vocab) + 1
# Number of encoded ids available for the train/validation split
dataset_size = len(encoded)

Train-validation split:

In [10]:
# First 90% of the ids for training, the remaining ones for validation
train_size = (dataset_size * 90) // 100
train_data, val_data = encoded[:train_size], encoded[train_size:]

Datasets:

In [16]:
# Windows of MAX_LENGTH inputs plus one target, BATCH_SIZE windows per batch.
# MAX_LENGTH here has to be the sequence length the model's Input layer uses.
train_dataset = create_dataset(train_data, MAX_LENGTH, BATCH_SIZE)
val_dataset = create_dataset(val_data, MAX_LENGTH, BATCH_SIZE)

Network:

In [23]:
# Hyperparameters of the word-level network built in the next cell.
MAX_LENGTH = 10  # ids per input sequence (Input shape and Embedding input_length)
DIM_EMBEDDINGS = 100  # output dimension of the Embedding layer
DROPOUT = 0.0  # SpatialDropout1D rate; the layer is only added when this is > 0
RNN_LAYERS = 2  # number of stacked LSTM layers
RNN_SIZE = 128  # units per LSTM layer

In [26]:
# Functional-API graph: ids -> embedding -> stacked LSTMs -> attention -> softmax.
# Same architecture as the character-level model, sized by the constants above.
input_layer = Input(shape=(MAX_LENGTH, ), name='input')

# Try a different embedding?
embedding_layer = Embedding(num_classes, DIM_EMBEDDINGS, 
                            input_length = MAX_LENGTH,
                            name='embedding')(input_layer)

# SpatialDropout after the Embedding layer drops one entire row/feature!
# (skipped entirely when DROPOUT is 0)
if DROPOUT > 0.0:
    embedding_layer = SpatialDropout1D(DROPOUT, 
                                       name='regularized_embedding')(embedding_layer)
    
# Try changing to bidirectional
# Each LSTM reads the output of the previous one (the first reads the embedding)
# and returns the full sequence so that it can be concatenated below.
rnn_layers_list = []
for i in range(RNN_LAYERS):
    prev_layer = embedding_layer if i==0 else rnn_layers_list[-1]
    next_layer = LSTM(RNN_SIZE, return_sequences=True, name = f"lstm{i}")
    rnn_layers_list.append(next_layer(prev_layer))
    
# Concatenating the layers created so far
# (the embedding output together with the output of every LSTM)
seq = concatenate([embedding_layer] + rnn_layers_list)

# Attention layer (see references above)
attention = AttentionWeightedAverage(name='attention_layer')(seq)

# Output
# One softmax probability per class (num_classes units)
output = Dense(num_classes, name='output', activation='softmax')(attention)

In [27]:
# Wrap the graph into a Model and configure the loss and optimizer.
# Note that this rebinds `model`, replacing the character-level model.
model = Model(inputs=[input_layer], outputs=[output])
# 'adam' is passed by name, so the optimizer uses its default settings
model.compile(loss='sparse_categorical_crossentropy', optimizer='adam')

In [None]:
# Train the word-level model for 5 epochs, evaluating on the validation split.
model.fit(train_dataset, 
          validation_data=val_dataset, 
          epochs = 5)

Epoch 1/5
    140/Unknown - 24s 169ms/step - loss: 6.5599