In [101]:
import numpy as np
import random as  rnd

import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.layers import Input

# Ask TensorFlow to allocate GPU memory on demand instead of reserving it all
# up front. Every visible GPU gets the same setting; on a CPU-only machine the
# list is empty and the loop is a no-op (indexing gpus[0] would raise
# IndexError there).
gpus = tf.config.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

# Data preprocessing

## Load data

In [102]:
# Read the corpus, keeping only non-blank lines with surrounding whitespace
# removed. The encoding is pinned so that the result does not depend on the
# platform's default text encoding.
with open('./data/shakespeare_data.txt', encoding='utf-8') as file:
  lines = [stripped for stripped in (raw.strip() for raw in file) if stripped]

print(lines[:5])

["A LOVER'S COMPLAINT", 'FROM off a hill whose concave womb reworded', 'A plaintful story from a sistering vale,', 'My spirits to attend this double voice accorded,', 'And down I laid to list the sad-tuned tale;']


## Create vocabulary

In [103]:
# Character-level vocabulary: every distinct character of the corpus in sorted
# order, preceded by two special entries.
#   index 0 -> "[UNK]", the unknown (out-of-vocabulary) token
#   index 1 -> "",      the empty entry reserved for padding
text = '\n'.join(lines)
vocab = ["[UNK]", ""] + sorted(set(text))

print(f'{len(vocab)} words')
print(vocab)

82 words
['[UNK]', '', '\t', '\n', ' ', '!', '$', '&', "'", '(', ')', ',', '-', '.', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', ':', ';', '?', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '[', ']', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '|']


## Encode sentence

In [104]:
# Very simple implementation
def encode_(string):
  """Map each character of `string` to its index in the global `vocab` list.

  Raises ValueError (from list.index) for a character that is not in `vocab`.
  """
  return [vocab.index(char) for char in string]


def decode_(nums):
  """Map vocabulary indices back to characters and concatenate them.

  The characters are joined without a separator, so
  decode_(encode_(s)) == s for any string made of vocabulary characters.
  """
  return ''.join(vocab[index] for index in nums)

Two issues with this implementation:
- It does not take UTF-8 characters into account
- It throws an error when encountering an unknown character instead of returning the index of the UNK token

To handle these properly, use:
- `tf.strings.unicode_split`: this decodes the UTF-8 input and splits it into individual characters
- `tf.keras.layers.StringLookup`: this maps characters to IDs and takes care of the UNK token.

In [105]:
def line_to_tensor(line, vocab):
  """Convert text into a tensor of character IDs.

  `line` is split into characters with `tf.strings.unicode_split` (input
  decoded as UTF-8), and each character is mapped to an ID by a
  `StringLookup` layer built from `vocab`. A new lookup layer is created on
  every call.
  """
  char_to_id = tf.keras.layers.StringLookup(vocabulary=vocab)
  chars = tf.strings.unicode_split(line, input_encoding='UTF-8')

  return char_to_id(chars)

def text_from_ids(ids, vocab):
  """Convert character IDs back into text.

  The IDs are mapped to characters by an inverted `StringLookup` layer built
  from `vocab`, and the characters are concatenated with
  `tf.strings.reduce_join`. The result is a string tensor, not a Python str.
  """
  id_to_char = tf.keras.layers.StringLookup(vocabulary=vocab, invert=True)

  return tf.strings.reduce_join(id_to_char(ids))

In [106]:
# Round-trip check: encode a sample string to IDs, then decode it back.
ids = line_to_tensor('hello world', vocab)
print(f'IDs: {ids}')

# Stored under its own name so the corpus string held in `text` is not
# overwritten by a tensor.
decoded_text = text_from_ids(ids, vocab)
print(f'Text: {decoded_text}')

IDs: [62 59 66 66 69  4 77 69 72 66 58]
Text: b'hello world'


## Create dataset

In [107]:
# Hold out the final 1000 lines for evaluation; train on everything before them.
train_lines, eval_lines = lines[:-1000], lines[-1000:]

### Dataset creation procedure
1. Convert text to IDs
2. Group IDs into sequences of SEQUENCE_LENGTH + 1 (the extra ID lets each sequence yield both an input and a target)
3. Map each sequence of IDs to text input and target
4. Batch again into batches of BATCH_SIZE

In [108]:
# Convert text to IDs: join the sample lines with newlines, encode the result,
# and wrap the IDs in a dataset with one ID per element.
sample_lines = ['hello world', 'generative AI']
all_ids = line_to_tensor('\n'.join(sample_lines), vocab)
ids_dataset = tf.data.Dataset.from_tensor_slices(all_ids)
print(ids_dataset)

<TensorSliceDataset element_spec=TensorSpec(shape=(), dtype=tf.int64, name=None)>


In [109]:
# Group IDs into chunks of seq_length + 1: the extra ID is what allows each
# chunk to be split into an input and a target shifted by one position.
# Chunks shorter than that (the tail of the data) are dropped.
seq_length = 5
data_generator = ids_dataset.batch(batch_size=seq_length + 1, drop_remainder=True)

In [110]:
# Show (up to) the first five chunks of IDs.
for chunk in data_generator.take(5):
  print(chunk)

tf.Tensor([62 59 66 66 69  4], shape=(6,), dtype=int64)
tf.Tensor([77 69 72 66 58  3], shape=(6,), dtype=int64)
tf.Tensor([61 59 68 59 72 55], shape=(6,), dtype=int64)
tf.Tensor([74 63 76 59  4 27], shape=(6,), dtype=int64)


In [111]:
def split_input_target(sequence):
  """Split a sequence into an (input, target) pair offset by one position.

  The input is every element except the last; the target is every element
  except the first, so target[i] is the element that follows input[i].
  """
  return sequence[:-1], sequence[1:]

split_input_target(list('tensorflow'))

(['t', 'e', 'n', 's', 'o', 'r', 'f', 'l', 'o'],
 ['e', 'n', 's', 'o', 'r', 'f', 'l', 'o', 'w'])

In [112]:
# Map each sequence to input and target
data_xy = data_generator.map(split_input_target)

# Batch again: group the (input, target) pairs two at a time and show the
# first two batches.
batches = data_xy.batch(batch_size=2)
for pair_batch in batches.take(2):
  print(pair_batch)

(<tf.Tensor: shape=(2, 5), dtype=int64, numpy=
array([[62, 59, 66, 66, 69],
       [77, 69, 72, 66, 58]], dtype=int64)>, <tf.Tensor: shape=(2, 5), dtype=int64, numpy=
array([[59, 66, 66, 69,  4],
       [69, 72, 66, 58,  3]], dtype=int64)>)
(<tf.Tensor: shape=(2, 5), dtype=int64, numpy=
array([[61, 59, 68, 59, 72],
       [74, 63, 76, 59,  4]], dtype=int64)>, <tf.Tensor: shape=(2, 5), dtype=int64, numpy=
array([[59, 68, 59, 72, 55],
       [63, 76, 59,  4, 27]], dtype=int64)>)


In [113]:
def create_batch_dataset(lines, vocab, seq_length=20, batch_size=64, shuffle_buffer_size=10000):
  """Build a shuffled, batched dataset of (input, target) ID sequences.

  The lines are joined with newlines into one string, encoded to character
  IDs, cut into consecutive chunks of seq_length + 1 IDs, and each chunk is
  split into an input and a target that are offset by one position.

  Args:
    lines: list of text lines.
    vocab: vocabulary list, passed on to line_to_tensor.
    seq_length: number of IDs in each input (and each target) sequence.
    batch_size: number of (input, target) pairs per batch.
    shuffle_buffer_size: size of the shuffle buffer. Defaults to 10000, the
      value that used to be hard-coded.

  Returns:
    A tf.data.Dataset yielding (input, target) batches. Incomplete chunks
    and an incomplete final batch are dropped.
  """
  single_line_data = '\n'.join(lines)

  all_ids = line_to_tensor(single_line_data, vocab)
  ids_dataset = tf.data.Dataset.from_tensor_slices(all_ids)

  # Split IDs into chunks of seq_length + 1 (one extra ID for the shifted target)
  data_generator = ids_dataset.batch(seq_length + 1, drop_remainder=True)

  # Map chunks of IDs into (x, y) tuples
  dataset_xy = data_generator.map(split_input_target)

  # Shuffle the (x, y) tuples, then group them into batches of batch_size
  dataset = (
      dataset_xy
        .shuffle(shuffle_buffer_size)
        .batch(batch_size, drop_remainder=True)
  )

  return dataset

In [114]:
# Training dataset: sequences of 100 characters, 64 sequences per batch.
dataset = create_batch_dataset(
    lines=train_lines,
    vocab=vocab,
    seq_length=100,
    batch_size=64,
)

In [115]:
# Each dataset element is one batch holding batch_size (input, target) pairs.
# take(1) yields a single batch, so the loop body runs once.
# The loop variables are named so that the builtin input() is not shadowed.
for input_batch, target_batch in dataset.take(1):
  print(f'Batch size: {len(input_batch)}')

  print(text_from_ids(input_batch[0], vocab))
  print(text_from_ids(target_batch[0], vocab))

Batch size: 64
tf.Tensor(b"rime\nRot and consume themselves in little time.\n'Were I hard-favour'd, foul, or wrinkled-old,\nIll-nu", shape=(), dtype=string)
tf.Tensor(b"ime\nRot and consume themselves in little time.\n'Were I hard-favour'd, foul, or wrinkled-old,\nIll-nur", shape=(), dtype=string)


# Define model

In [116]:
class GRULM(tf.keras.Model):
    """Character-level language model: Embedding -> GRU -> Dense(log_softmax).

    For every position of the input sequence the model outputs a vector of
    log-probabilities over the vocabulary.
    """

    def __init__(self, vocab_size=256, embedding_dim=256, rnn_units=128):
        """
        Args:
            vocab_size: number of token IDs; size of the embedding table and
                of the output layer.
            embedding_dim: size of each embedding vector.
            rnn_units: number of GRU units.
        """
        # `self` must not be passed explicitly: super() already binds it, and
        # an extra positional argument is rejected by Keras versions whose
        # layer constructor accepts keyword arguments only.
        super().__init__()

        # NOTE: the attribute name (spelled with three d's) is kept unchanged
        # in case anything outside this class refers to it.
        self.embeddding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(rnn_units, return_sequences=True, return_state=True)
        self.dense = tf.keras.layers.Dense(units=vocab_size, activation=tf.nn.log_softmax)

    def call(self, inputs, states=None, return_state=False, training=True):
        """Run the model on a batch of ID sequences.

        Args:
            inputs: integer token IDs (called with shape (batch, seq_len) in
                this notebook).
            states: initial GRU state, or None to start from the GRU's
                default initial state.
            return_state: if True, also return the final GRU state so that
                generation can continue from it.
            training: forwarded to the sub-layers.

        Returns:
            Log-probabilities with one vocabulary-sized vector per input
            position, or the tuple (log-probabilities, final GRU state) when
            return_state is True.
        """
        x = self.embeddding(inputs, training=training)

        # With initial_state=None the GRU builds its own default (zero)
        # initial state, so no explicit get_initial_state() call is needed.
        x, states = self.gru(x, initial_state=states, training=training)

        x = self.dense(x, training=training)
        if return_state:
            return x, states
        return x

In [117]:
# Number of token IDs. Derived from the vocabulary instead of being
# hard-coded; `vocab` already starts with the "[UNK]" entry, so its length
# covers every ID produced by the StringLookup layer.
vocab_size = len(vocab)

# The embedding dimension
embedding_dim = 256

# Number of GRU units
rnn_units = 512

model = GRULM(
    vocab_size=vocab_size,
    embedding_dim=embedding_dim,
    rnn_units=rnn_units)

## Test prediction on untrained model

In [118]:
# Run the untrained model on the first sequence of one batch to check shapes.
# The loop variables are named so that the builtin input() is not shadowed.
for input_batch, target_batch in dataset.take(1):
    x = input_batch[0].numpy()

    # Wrap the single sequence in a list to add the batch dimension
    yhat = model(tf.constant([x]), training=False)

    print(f'x: {x.shape}')
    print(f'y: {yhat.shape}')

x: (100,)
y: (1, 100, 82)


In the output shape `(1, 100, 82)`, 1 is the batch size, 100 is the sequence length, and 82 is the vocab size.

In [119]:
# Greedy decoding of the model output: for the first (only) sequence in the
# batch, pick the highest-scoring vocabulary entry at every position.
first_sequence_scores = yhat[0]
sampled_indices = tf.math.argmax(first_sequence_scores, axis=-1)
print(sampled_indices)
print(text_from_ids(sampled_indices, vocab))

tf.Tensor(
[69 69 57  9  7 68 47 68 33  7 47 34 69 54 72 54 69 37 56 67 69 54 72 54
 48 57 57 28 25 48 34 69 68 34 69 57 54 47 70 68 45 47 47 26 47 13 13 29
 48  7 37 54 42 48 34  7 54 69 34  9  5 27 49 34  7 69 34 69 69 69 77 76
 76 68 23  5 47 23  2  1 37 74 76  7 65 29 68  2 68 55 23 47 34 69 57 57
 27 27 48 69], shape=(100,), dtype=int64)
tf.Tensor(b'ooc(&nUnG&UHo]r]oKbmo]r]VccB;VHonHoc]UpnSUU?U..CV&K]PVH&]oH(!AWH&oHooowvvn9!U9\tKtv&kCn\tna9UHoccAAVo', shape=(), dtype=string)


# Training

In [120]:
def compile_model(model):
    """Compile `model` with Adam and sparse categorical cross-entropy.

    The loss is created with from_logits=True, i.e. it applies a softmax to
    the model outputs itself. For GRULM, whose output layer emits
    log-softmax values, that softmax recovers the same probabilities.

    Returns the same model object, compiled in place.
    """
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.00125),
        loss=tf.losses.SparseCategoricalCrossentropy(from_logits=True),
    )

    return model

In [121]:
# Compile the model, then train it for 20 passes over the training dataset.
model = compile_model(model)
history = model.fit(x=dataset, epochs=20)

Epoch 1/20


Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [122]:
# Run the trained model on the first sequence of one batch.
# The loop variables are named so that the builtin input() is not shadowed.
for input_batch, target_batch in dataset.take(1):
    x = input_batch[0].numpy()

    # Inference call: pass training=False explicitly, as in the check made
    # before training (the model's own default is training=True).
    yhat = model(tf.constant([x]), training=False)

    print(f'x: {x.shape}')
    print(f'y: {yhat.shape}')

x: (100,)
y: (1, 100, 82)


## Log perplexity

In [123]:
# GRADED FUNCTION: log_perplexity
def log_perplexity(preds, target, padding_id=1):
    """Compute the log perplexity of `target` under the predictions `preds`.

    Args:
        preds (tf.Tensor or array-like): log-probabilities of shape
            (batch, number of predicted chars, vocab size).
        target (tf.Tensor or array-like): integer token IDs of shape
            (batch, number of predicted chars).
        padding_id (int): ID of the padding token; positions holding it are
            excluded from the average. Defaults to 1, the index of the empty
            padding entry in this notebook's `vocab` (index 0 is "[UNK]").

    Returns:
        The negative mean log-probability per non-padding token, averaged
        over the sequences in the batch. A sequence consisting only of
        padding contributes 0.
    """
    preds = np.asarray(preds)
    target = np.asarray(target)

    # pick the log probability log P(w_i | w_1, ..., w_i-1) of each target token
    log_p = np.take_along_axis(preds, target[..., np.newaxis], axis=-1)[..., 0]

    # if target has the form [ 5, 7, padding_id ]
    # non_pad will be [ 1, 1, 0 ]
    non_pad = 1.0 - np.equal(target, padding_id)

    # zero out the log probabilities of padded positions
    # for example if log_p = [ -1, -2, -3 ] and non_pad = [ 1, 1, 0 ]
    # then log_p = [ -1, -2, 0 ]
    log_p = log_p * non_pad

    # average over the non-padding positions only: dividing by the full
    # sequence length would understate the perplexity of padded sequences
    token_counts = np.maximum(np.sum(non_pad, axis=-1), 1.0)
    log_ppx_per_sequence = np.sum(log_p, axis=-1) / token_counts

    # finally take the mean over the batch
    log_ppx = np.mean(log_ppx_per_sequence)

    return -log_ppx

# Evaluate on the held-out lines: encode them as one long sequence, predict
# every next character in a single forward pass, and score the predictions.
eval_text = "\n".join(eval_lines)
eval_ids = line_to_tensor([eval_text], vocab)
flat_eval_ids = tf.squeeze(eval_ids, axis=0)  # drop the leading axis added by the list wrapper
input_ids, target_ids = split_input_target(flat_eval_ids)

# The model and log_perplexity both expect a leading batch axis
batched_inputs = tf.expand_dims(input_ids, 0)
batched_targets = tf.expand_dims(target_ids, 0)

preds, status = model(batched_inputs, states=None, return_state=True, training=False)

log_ppx = log_perplexity(preds, batched_targets)
print(log_ppx)

1.209201705553149


# Generate texts

If we always pick the most likely next character, the GRU generates the same text every time for a given prefix, so we use random sampling to make the output less repetitive. The technique is called temperature random sampling.

In [124]:
def temperature_random_sampling(log_probs, temperature=1.0):
    """Pick an index from `log_probs` after adding scaled random noise.

    Noise of the form -log(-log(u)), with u drawn uniformly from
    (1e-6, 1 - 1e-6), is multiplied by `temperature` and added to
    `log_probs`; the index of the largest perturbed value along the last
    axis is returned. With temperature 0 the noise vanishes and the result
    is the plain argmax; larger temperatures give more random choices.
    """
    uniform = tf.random.uniform(shape=log_probs.shape, minval=1e-6, maxval=1.0 - 1e-6)
    noise = -tf.math.log(-tf.math.log(uniform))
    perturbed = log_probs + noise * temperature
    return tf.math.argmax(perturbed, axis=-1)

In [125]:
class GenerativeModel(tf.keras.Model):
    """Wraps a trained language model to generate text one character at a time."""

    def __init__(self, model, vocab, temperature=1.0):
        """
        Args:
            model: language model called as
                model(ids, states, return_state=True) and returning
                (log-probabilities, states).
            vocab: vocabulary list used for encoding and decoding text.
            temperature: passed to temperature_random_sampling.
        """
        super().__init__()
        self.temperature = temperature
        self.model = model
        self.vocab = vocab

    def generate_one_step(self, inputs, states=None):
        """Sample the character that follows `inputs`.

        Args:
            inputs: string tensor holding the text to condition on.
            states: recurrent state from the previous step, or None.

        Returns:
            (next_char, states): the sampled character as a string tensor
            with one element, and the updated recurrent state.
        """
        input_ids = line_to_tensor(inputs, self.vocab)

        # Generation is inference, so run the wrapped model with training=False.
        yhat, states = self.model(input_ids, states, return_state=True, training=False)
        # Keep the prediction for the last position of the first sequence
        yhat = yhat[0, -1, :]

        predicted_ids = temperature_random_sampling(yhat, self.temperature)
        # Decode with the vocabulary this instance was built with, not with
        # whatever global `vocab` happens to exist.
        next_char = text_from_ids(predicted_ids, self.vocab)

        return tf.expand_dims(next_char, 0), states

    def generate_n_chars(self, num_chars, prefix):
        """Generate `num_chars` characters following `prefix`.

        After the first step only the newly sampled character is fed to the
        model; the earlier context is carried by the recurrent state.

        Returns:
            str: `prefix` followed by the generated characters.
        """
        states = None
        next_char = tf.constant([prefix])
        result = [next_char]
        for _ in range(num_chars):
            next_char, states = self.generate_one_step(next_char, states=states)
            result.append(next_char)

        return tf.strings.join(result)[0].numpy().decode('utf-8')

In [131]:
# Generate ten independent samples of 50 characters, each starting from "T".
gen = GenerativeModel(model, vocab)

for sample_index in range(10):
    print(gen.generate_n_chars(num_chars=50, prefix="T"))

Though they are lawful than I hear
Are many other m
TIN]
PRINCE HENRY	Well, here comes the bird.
PISTOL
The standing time of seven devil did.
HORTENSIO	[As
The walls!
An if thine ragges grow asher to try so

TIV	Thou art Alacum, father. Farewell; come to the 
The place,
And awe him about contriding water. You 
The gib my faults.
VALENTINE	I see unto thy greater
TNEmb, and Art thou gone to pass her roasted answer
To sayion that I say about thy working;
And if you'
There are all beard; and here she may,
Yet I will s
