<a href="https://colab.research.google.com/github/asuka4649/cv/blob/main/text_generation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Text generation with an RNN

In [None]:
import tensorflow as tf

import numpy as np
import os
import time

In [None]:
# Path of the text corpus used for training (an absolute path at the filesystem root).
path_to_file = '/SteveJobs_StanfordSpeech.txt'

In [None]:
# Read the raw bytes and decode them as windows-1252.
# The context manager guarantees the file handle is closed after reading.
with open(path_to_file, 'rb') as corpus_file:
    text = corpus_file.read().decode(encoding='windows-1252')
# length of text is the number of characters in it
print(f'Length of text: {len(text)} characters')

Length of text: 11934 characters


In [None]:
# Preview the first 250 characters of the corpus as a sanity check
print(text[:250])

Thank you. I’m honored to be with you today for your commencement from one of the finest universities in the world. Truth be told, I never graduated from college, and this is the closest I’ve ever gotten to a college graduation today. I want to tell 


In [None]:
# The unique characters in the file, in sorted order; this list is the
# character vocabulary handed to the lookup layers below.
vocab = sorted(set(text))
print(f'{len(vocab)} unique characters')

68 unique characters


In [None]:
# Two sample strings used to demonstrate the character tokenization
example_texts = ['abcdefg', 'xyz']

# Split each string into its individual UTF-8 characters
chars = tf.strings.unicode_split(example_texts, input_encoding='UTF-8')
chars

<tf.RaggedTensor [[b'a', b'b', b'c', b'd', b'e', b'f', b'g'], [b'x', b'y', b'z']]>

In [None]:
# Lookup layer that maps each vocabulary character to an integer ID.
# mask_token=None: no mask token is reserved in the vocabulary.
ids_from_chars = tf.keras.layers.StringLookup(
    vocabulary=list(vocab), mask_token=None)

In [None]:
# Convert the example characters to their integer IDs
ids = ids_from_chars(chars)
ids

<tf.RaggedTensor [[40, 41, 42, 43, 44, 45, 46], [63, 64, 65]]>

In [None]:
# Inverse lookup (IDs -> characters). It reuses the forward layer's vocabulary
# so both directions agree on the ID assignment.
chars_from_ids = tf.keras.layers.StringLookup(
    vocabulary=ids_from_chars.get_vocabulary(), invert=True, mask_token=None)

In [None]:
# Round trip: map the example IDs back to characters
chars = chars_from_ids(ids)
chars

<tf.RaggedTensor [[b'a', b'b', b'c', b'd', b'e', b'f', b'g'], [b'x', b'y', b'z']]>

In [None]:
# Join the characters of each row back into a single string
tf.strings.reduce_join(chars, axis=-1).numpy()

array([b'abcdefg', b'xyz'], dtype=object)

In [None]:
def text_from_ids(ids):
  """Convert character IDs back to text.

  Looks up the character for every ID with `chars_from_ids` and joins the
  characters along the last axis.
  """
  decoded_chars = chars_from_ids(ids)
  return tf.strings.reduce_join(decoded_chars, axis=-1)

In [None]:
# Encode the whole corpus as one tensor of character IDs
all_ids = ids_from_chars(tf.strings.unicode_split(text, 'UTF-8'))
all_ids

<tf.Tensor: shape=(11934,), dtype=int64, numpy=array([36, 47, 40, ...,  1,  2,  1])>

In [None]:
# Dataset that yields the corpus one character ID at a time
ids_dataset = tf.data.Dataset.from_tensor_slices(all_ids)

In [None]:
# Show the first 10 characters of the ID stream.
# The loop variable is deliberately not named `ids`, so the `ids` tensor
# created in an earlier cell is not overwritten and that cell stays re-runnable.
for char_id in ids_dataset.take(10):
    print(chars_from_ids(char_id).numpy().decode('utf-8'))

T
h
a
n
k
 
y
o
u
.


In [None]:
# Number of characters in each model input sequence
seq_length = 100


In [None]:
# Group the ID stream into chunks of seq_length+1 characters: the extra
# character lets each chunk be split into an input and a target shifted by one.
# drop_remainder=True discards the trailing partial chunk.
sequences = ids_dataset.batch(seq_length+1, drop_remainder=True)

# Inspect the first chunk as individual characters
for seq in sequences.take(1):
  print(chars_from_ids(seq))

tf.Tensor(
[b'T' b'h' b'a' b'n' b'k' b' ' b'y' b'o' b'u' b'.' b' ' b'I'
 b'\xe2\x80\x99' b'm' b' ' b'h' b'o' b'n' b'o' b'r' b'e' b'd' b' ' b't'
 b'o' b' ' b'b' b'e' b' ' b'w' b'i' b't' b'h' b' ' b'y' b'o' b'u' b' '
 b't' b'o' b'd' b'a' b'y' b' ' b'f' b'o' b'r' b' ' b'y' b'o' b'u' b'r'
 b' ' b'c' b'o' b'm' b'm' b'e' b'n' b'c' b'e' b'm' b'e' b'n' b't' b' '
 b'f' b'r' b'o' b'm' b' ' b'o' b'n' b'e' b' ' b'o' b'f' b' ' b't' b'h'
 b'e' b' ' b'f' b'i' b'n' b'e' b's' b't' b' ' b'u' b'n' b'i' b'v' b'e'
 b'r' b's' b'i' b't' b'i' b'e' b's'], shape=(101,), dtype=string)


In [None]:
# Print the first five chunks joined back into byte strings
for seq in sequences.take(5):
  print(text_from_ids(seq).numpy())

b'Thank you. I\xe2\x80\x99m honored to be with you today for your commencement from one of the finest universities'
b' in the world. Truth be told, I never graduated from college, and this is the closest I\xe2\x80\x99ve ever gotte'
b'n to a college graduation today. I want to tell you three stories from my life. That\xe2\x80\x99s it. No big dea'
b'l. Just three stories. The first story is about connecting the dots. I dropped out of Reed College af'
b'ter the first six months, but then stayed around as a drop-in for another 18 months or so before I re'


In [None]:
def split_input_target(sequence):
    """Split a sequence into an (input, target) pair offset by one step.

    The input is every element except the last; the target is every element
    except the first, so target[i] is the element that follows input[i].
    """
    return sequence[:-1], sequence[1:]

In [None]:
# Quick check of the split on a plain Python list of characters
split_input_target(list("Tensorflow"))

(['T', 'e', 'n', 's', 'o', 'r', 'f', 'l', 'o'],
 ['e', 'n', 's', 'o', 'r', 'f', 'l', 'o', 'w'])

In [None]:
# Turn every chunk into an (input, target) pair
dataset = sequences.map(split_input_target)

In [None]:
# Show the first (input, target) pair as text; the target is the input
# shifted by one character.
for input_example, target_example in dataset.take(1):
    print("Input :", text_from_ids(input_example).numpy())
    print("Target:", text_from_ids(target_example).numpy())

Input : b'Thank you. I\xe2\x80\x99m honored to be with you today for your commencement from one of the finest universitie'
Target: b'hank you. I\xe2\x80\x99m honored to be with you today for your commencement from one of the finest universities'


In [None]:
# Batch size
BATCH_SIZE = 64

# Buffer size to shuffle the dataset
# (TF data is designed to work with possibly infinite sequences,
# so it doesn't attempt to shuffle the entire sequence in memory. Instead,
# it maintains a buffer in which it shuffles elements).
BUFFER_SIZE = 10000

# Shuffle the (input, target) pairs, group them into fixed-size batches and
# prefetch so the next batch is prepared while the current one is consumed.
# tf.data.AUTOTUNE replaces the deprecated tf.data.experimental.AUTOTUNE alias.
# NOTE(review): the corpus is small (11934 characters per the earlier output,
# i.e. 118 chunks of 101), so with drop_remainder=True only one full batch of
# 64 remains per epoch -- confirm this is intended.
dataset = (
    dataset
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE, drop_remainder=True)
    .prefetch(tf.data.AUTOTUNE))

dataset

<_PrefetchDataset element_spec=(TensorSpec(shape=(64, 100), dtype=tf.int64, name=None), TensorSpec(shape=(64, 100), dtype=tf.int64, name=None))>

In [None]:
# Length of the vocabulary in StringLookup Layer
# (taken from the layer rather than from `vocab`, so it counts every ID the
# layer can emit)
vocab_size = len(ids_from_chars.get_vocabulary())

# The embedding dimension
embedding_dim = 256

# Number of RNN units
rnn_units = 1024

In [None]:
class MyModel(tf.keras.Model):
  """Character-level language model: Embedding -> GRU -> Dense.

  The final Dense layer has no activation, so the model outputs raw logits
  over the vocabulary for every position of the input sequence.
  """

  def __init__(self, vocab_size, embedding_dim, rnn_units):
    """Build the layers.

    Args:
      vocab_size: number of distinct token IDs (embedding rows and output logits).
      embedding_dim: size of each embedding vector.
      rnn_units: number of units in the GRU layer.
    """
    # Do not pass `self` explicitly: it would arrive as an extra positional
    # argument to the base-class constructor, which newer Keras versions reject.
    super().__init__()
    self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
    # return_sequences: emit an output per timestep;
    # return_state: also return the final state so generation can carry it over.
    self.gru = tf.keras.layers.GRU(rnn_units,
                                   return_sequences=True,
                                   return_state=True)
    self.dense = tf.keras.layers.Dense(vocab_size)

  def call(self, inputs, states=None, return_state=False, training=False):
    """Run the model on a batch of token IDs.

    Args:
      inputs: batch of token-ID sequences.
      states: optional GRU state to start from; when None the GRU's initial
        state is used.
      return_state: when True, also return the GRU state after the last step.
      training: forwarded to the layers.

    Returns:
      The logits, or a `(logits, states)` tuple when `return_state` is True.
    """
    x = inputs
    x = self.embedding(x, training=training)
    if states is None:
      states = self.gru.get_initial_state(x)
    x, states = self.gru(x, initial_state=states, training=training)
    x = self.dense(x, training=training)

    if return_state:
      return x, states
    else:
      return x

In [None]:
# Instantiate the model with the hyperparameters defined above
model = MyModel(
    vocab_size=vocab_size,
    embedding_dim=embedding_dim,
    rnn_units=rnn_units)

In [None]:
# Run one batch through the untrained model to check the output shape
for input_example_batch, target_example_batch in dataset.take(1):
    example_batch_predictions = model(input_example_batch)
    print(example_batch_predictions.shape, "# (batch_size, sequence_length, vocab_size)")

(64, 100, 69) # (batch_size, sequence_length, vocab_size)


In [None]:
# Print the layers and their parameter counts
model.summary()

Model: "my_model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 embedding (Embedding)       multiple                  17664     
                                                                 
 gru (GRU)                   multiple                  3938304   
                                                                 
 dense (Dense)               multiple                  70725     
                                                                 
Total params: 4026693 (15.36 MB)
Trainable params: 4026693 (15.36 MB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________


In [None]:
# Sample one next-character ID per timestep from the logits of the first
# example in the batch.
sampled_indices = tf.random.categorical(example_batch_predictions[0], num_samples=1)
# Drop the trailing num_samples axis and convert to a NumPy array
sampled_indices = tf.squeeze(sampled_indices, axis=-1).numpy()

In [None]:
# Display the sampled IDs
sampled_indices

array([39, 16, 68, 15, 11, 54, 30, 15, 58,  3, 53, 32, 38, 26, 42, 36, 37,
       49, 52,  5, 67,  6, 37, 48, 51, 41,  7, 17, 42, 56, 18, 33, 42, 50,
       46, 49, 68, 53, 63, 66, 58, 50, 56, 41, 26, 53, 24,  5,  9, 58, 46,
       45, 64, 40, 59, 42,  8, 63, 39, 36, 23, 11, 16, 30, 50, 13,  6, 47,
       21, 51, 34,  8, 16, 20, 44, 16, 52, 38, 43, 16, 60, 68, 22, 47, 28,
       56, 65, 44,  1, 26,  5,  3, 18, 11, 12, 31, 40, 15, 22, 55])

In [None]:
# Compare the input text with the characters sampled from the untrained model
print("Input:\n", text_from_ids(input_example_batch[0]).numpy())
print()
print("Next Char Predictions:\n", text_from_ids(sampled_indices).numpy())

Input:
 b' was beautifully hand calligraphed. Because I had dropped out and didn\xe2\x80\x99t have to take the normal cla'

Next Char Predictions:
 b'Y9\xe2\x80\x9d84oM8s nOWIcTVjm-\xe2\x80\x9c.Vilb0:cq?Pckgj\xe2\x80\x9dnx\xe2\x80\x99skqbInG-2sgfyatc1xYTE49Mk6.hClR19Be9mWd9u\xe2\x80\x9dDhKqze\nI- ?45Na8Dp'


In [None]:
# The model's final Dense layer has no activation, so it outputs logits;
# hence from_logits=True.
loss = tf.losses.SparseCategoricalCrossentropy(from_logits=True)

In [None]:
# Loss of the untrained model on the example batch
example_batch_mean_loss = loss(target_example_batch, example_batch_predictions)
print("Prediction shape: ", example_batch_predictions.shape, " # (batch_size, sequence_length, vocab_size)")
print("Mean loss:        ", example_batch_mean_loss)

Prediction shape:  (64, 100, 69)  # (batch_size, sequence_length, vocab_size)
Mean loss:         tf.Tensor(4.2340627, shape=(), dtype=float32)


In [None]:
# Exponential of the mean loss; for the untrained model it comes out close to
# the vocabulary size (compare with the printed prediction shape).
tf.exp(example_batch_mean_loss).numpy()

68.99697

In [None]:
# Attach the Adam optimizer and the loss defined above to the model
model.compile(optimizer='adam', loss=loss)

In [None]:
# Directory where the checkpoints will be saved
checkpoint_dir = './training_checkpoints'
# Name of the checkpoint files; the {epoch} placeholder is filled in when a
# checkpoint is written.
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

# save_weights_only=True: store only the model weights, not the full model
checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_prefix,
    save_weights_only=True)

In [None]:
# Number of passes over the training dataset
EPOCHS = 100

In [None]:
# Train the model; checkpoint_callback writes the checkpoints and the
# returned training history is kept in `history`.
history = model.fit(dataset, epochs=EPOCHS, callbacks=[checkpoint_callback])

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 46/100
Epoch 47/100
Epoch 48/100
Epoch 49/100
Epoch 50/100
Epoch 51/100
Epoch 52/100
Epoch 53/100
Epoch 54/100
Epoch 55/100
Epoch 56/100
Epoch 57/100
Epoch 58/100
Epoch 59/100
Epoch 60/100
Epoch 61/100
Epoch 62/100
Epoch 63/100
Epoch 64/100
Epoch 65/100
Epoch 66/100
Epoch 67/100
Epoch 68/100
Epoch 69/100
Epoch 70/100
Epoch 71/100
Epoch 72/100
Epoch 73/100
Epoch 74/100
Epoch 75/100
Epoch 76/100
Epoch 77/100
Epoch 78

In [None]:
class OneStep(tf.keras.Model):
  """Wraps a trained model to generate text one character at a time.

  Holds the model together with the two lookup layers and a logits mask that
  prevents the "[UNK]" token from being sampled.
  """

  def __init__(self, model, chars_from_ids, ids_from_chars, temperature=1.0):
    """Store the collaborators and precompute the "[UNK]" mask.

    Args:
      model: model called with `inputs`, `states` and `return_state=True`,
        returning `(logits, states)`.
      chars_from_ids: layer mapping token IDs to characters.
      ids_from_chars: layer mapping characters to token IDs.
      temperature: divisor applied to the logits before sampling.
    """
    super().__init__()
    self.temperature = temperature
    self.model = model
    self.chars_from_ids = chars_from_ids
    self.ids_from_chars = ids_from_chars

    # Create a mask to prevent "[UNK]" from being generated.
    # [:, None] adds a trailing axis so the IDs can serve as sparse indices.
    skip_ids = self.ids_from_chars(['[UNK]'])[:, None]
    sparse_mask = tf.SparseTensor(
        # Put a -inf at each bad index.
        values=[-float('inf')]*len(skip_ids),
        indices=skip_ids,
        # Match the shape to the vocabulary
        dense_shape=[len(ids_from_chars.get_vocabulary())])
    # Dense vector added to the logits in generate_one_step.
    self.prediction_mask = tf.sparse.to_dense(sparse_mask)

  @tf.function
  def generate_one_step(self, inputs, states=None):
    """Generate the next character for every string in `inputs`.

    Args:
      inputs: batch of strings holding the text generated so far (or the
        latest characters when `states` carries the history).
      states: model state returned by the previous call, or None to start
        from the model's initial state.

    Returns:
      A `(predicted_chars, states)` tuple: the sampled next character for
      each input string and the updated model state.
    """
    # Convert strings to token IDs.
    input_chars = tf.strings.unicode_split(inputs, 'UTF-8')
    input_ids = self.ids_from_chars(input_chars).to_tensor()

    # Run the model.
    # predicted_logits.shape is [batch, char, next_char_logits]
    predicted_logits, states = self.model(inputs=input_ids, states=states,
                                          return_state=True)
    # Only use the last prediction.
    predicted_logits = predicted_logits[:, -1, :]
    # Scale the logits by the temperature before sampling.
    predicted_logits = predicted_logits/self.temperature
    # Apply the prediction mask: prevent "[UNK]" from being generated.
    predicted_logits = predicted_logits + self.prediction_mask

    # Sample the output logits to generate token IDs.
    predicted_ids = tf.random.categorical(predicted_logits, num_samples=1)
    predicted_ids = tf.squeeze(predicted_ids, axis=-1)

    # Convert from token ids to characters
    predicted_chars = self.chars_from_ids(predicted_ids)

    # Return the characters and model state.
    return predicted_chars, states

In [None]:
# Wrap the trained model for single-step generation (default temperature of 1.0)
one_step_model = OneStep(model, chars_from_ids, ids_from_chars)

In [None]:
# Generate 1000 characters from a single prompt and time the run.
start = time.time()
states = None
next_char = tf.constant(['Steve:'])
result = [next_char]

# Feed each sampled character back in, carrying the model state forward.
for n in range(1000):
  next_char, states = one_step_model.generate_one_step(next_char, states=states)
  result.append(next_char)

result = tf.strings.join(result)
end = time.time()
# TensorFlow string tensors hold UTF-8 bytes, so they must be decoded as
# UTF-8; decoding them as windows-1252 garbles non-ASCII characters such as
# the curly apostrophe.
print(result[0].numpy().decode('utf-8'), '\n\n' + '_'*80)
print('\nRun time:', end - start)

Steve:stir theveanmustogy I wofiso befe. Ato fom orourt one. I han jured ce whe ted tiney I ate titheap0rhat. 3utin. Ied onors thated tofele ut theo tet torer iyithar at as tha ceryrave koale pap bat ineogingnter eat byat roo becooul inserenpli ancyou. Whar fhe on hoveâ€™ts rom. Sreong eapl sid leas at if afail eveo Therich ang nife foullif an four tutimu de lin thy n ousy, ly lou fato ded the tha the have war socer ho and wasawaw wuth I fopo as wnes bevy hocrathe outt onecinatn yuaf ore, wa fe. Whet ent fessn y ot andrpagn prepla tiwkee in what om0 doonge, I I tour anes luod et thar terne thok the kl carot thed sf ill. Iteren wad cacu toligat dos tâ€™t aved yomwat whecto, fow it huy te wins le thaâ€™nd the Daat ontite d istut an, Yost alligeâ€™net to blot ast Neve thaathel iticge touly at pollinner sre Mof st curene ave hant autid0e hy amy ig sorype beise fured stop athe, I depasbe ye. 15 Nhak ceeletayto ngat wasry mur felok thelly anog be wiln lit coutid me befely bet ow erine bithy 

In [None]:
# Same generation loop with a batch of five identical prompts, producing five
# independent samples in one run.
start = time.time()
states = None
next_char = tf.constant(['Steve:', 'Steve:', 'Steve:', 'Steve:', 'Steve:'])
result = [next_char]

# Feed each sampled character back in, carrying the model state forward.
for n in range(1000):
  next_char, states = one_step_model.generate_one_step(next_char, states=states)
  result.append(next_char)

result = tf.strings.join(result)
end = time.time()
# Prints the string tensor itself (raw byte strings), not decoded text.
print(result, '\n\n' + '_'*80)
print('\nRun time:', end - start)

tf.Tensor(
[b'Steve:Mlacicveat. It as. Whe fom, ththe..,. DSThe cout ifer moclaru an, creat. I inaf whour ant tithe, Burp dout I tore. dtici0 cat tour thor onishint mo pe dot open co go thind cad, I wime I wase cong, inne com of on anve nte se dan sot snd ougn ayoud thou low inppomy I lourse loullito fice tuthe ek fok cind it, lirery fouss ned tof irste bey hrte in he cole lhathing pisted atingr alvathyolove pply at futint en theasmes edolry, Shakand., I bhas fopom tos I jut r, at turdy soungo he men haen hhens oid lyan cand an thid for Siovyo beled ciat dol counsivisthat ines turezi\xe2\x80\x99st ry comewase fof inpicg yowher 1vdac oupy sor to gnid Whel bo wan ce dom on an the yre lost of monlend y ever ithu wher icle andacl wicr als aw ou cithel fiwant aneal wast I weent op het pup gam if to f.\xe2\x80\x99steat I thart at eat ifn torlreang so on wary af my anoud Ien soweid ya. I cou g that eapkef bed I domex toses wat in as at tour hoe Aofant thing daad I mout. Ned con thad bpat oude

In [None]:
# Export the generator to the 'one_step' directory and load it back
tf.saved_model.save(one_step_model, 'one_step')
one_step_reloaded = tf.saved_model.load('one_step')



In [None]:
# Generate 1000 characters with the reloaded generator to confirm it works
states = None
next_char = tf.constant(['Steve:'])
result = [next_char]

for n in range(1000):
  next_char, states = one_step_reloaded.generate_one_step(next_char, states=states)
  result.append(next_char)

# Join the pieces and decode the UTF-8 bytes of the first (only) sample
print(tf.strings.join(result)[0].numpy().decode("utf-8"))

Steve:of barea moan doo warl’tur, aringe go lergalplivist wawat yt hat readdermat da put hogerhD. Deant avothe e woot gyet oo mo, pe. Sout Ma ferethe Senrllllderg thy af tha thel yuls, wad on th ou’st wow st ed acont thed round vor and be thalk Wery tapsy l’l Ba avet whe sowr aned ath dele datha’t casr, she the un wouthas tou tha gho madpte to d aveathe tuly I de tove me werit wryer ons call sowhe nereiEg st me he tors foutht ouse, Slom eacloned the bemy I doxtinery frtew inis wren ting arihakd oppere biint hae. zulnd, I canorirert ufd at cate th fount to pned po wo mise wary, dacige eveengastrey 19MvWce gells. It sis math myean dn nor ind nove. ging overt, Catidut yopfer I th cof lo stang Mo. I epnothe medeeve Benthut nathid coll ou cisty hew wht ionime ined piwad. Irrsit of the, I wha be as alirne theand th in list oug m ou warcy mopu belkou uty ourenr ghy at erond could have eatyer fighete d oroun, thas mawiee blunther foes yomed the srrathets dheint of me lavet yowe harim be, morst

In [None]:
class CustomTraining(MyModel):
  """MyModel with a hand-written training step."""

  @tf.function
  def train_step(self, inputs):
      """Run one optimization step on a batch.

      Args:
        inputs: an `(inputs, labels)` pair.

      Returns:
        A dict with the batch loss under the key 'loss'.
      """
      inputs, labels = inputs
      with tf.GradientTape() as tape:
          predictions = self(inputs, training=True)
          loss = self.loss(labels, predictions)
      # Differentiate and update this instance's own variables; relying on a
      # global named `model` would silently train whichever object that name
      # happens to be bound to.
      grads = tape.gradient(loss, self.trainable_variables)
      self.optimizer.apply_gradients(zip(grads, self.trainable_variables))

      return {'loss': loss}

In [None]:
# Fresh, untrained instance that uses the custom train_step
# (this rebinds the name `model`).
model = CustomTraining(
    vocab_size=len(ids_from_chars.get_vocabulary()),
    embedding_dim=embedding_dim,
    rnn_units=rnn_units)

In [None]:
# Attach an Adam optimizer and the logits-based cross-entropy loss;
# train_step reads both from the model.
model.compile(optimizer = tf.keras.optimizers.Adam(),
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True))

In [None]:
# Train for a single epoch through the standard fit() entry point
model.fit(dataset, epochs=1)

In [None]:
EPOCHS = 10

# Running mean of the batch losses within one epoch
mean = tf.metrics.Mean()

for epoch in range(EPOCHS):
    start = time.time()

    # Start every epoch with an empty accumulator
    # (reset_state replaces the deprecated reset_states).
    mean.reset_state()
    for (batch_n, (inp, target)) in enumerate(dataset):
        logs = model.train_step([inp, target])
        mean.update_state(logs['loss'])

        if batch_n % 50 == 0:
            template = f"Epoch {epoch+1} Batch {batch_n} Loss {logs['loss']:.4f}"
            print(template)

    # saving (checkpoint) the model every 5 epochs
    # Checkpoints are numbered with the 1-based epoch, matching the epoch
    # numbers printed below.
    if (epoch + 1) % 5 == 0:
        model.save_weights(checkpoint_prefix.format(epoch=epoch + 1))

    print()
    print(f'Epoch {epoch+1} Loss: {mean.result().numpy():.4f}')
    print(f'Time taken for 1 epoch {time.time() - start:.2f} sec')
    print("_"*80)

# Save the final weights unless the last epoch was already checkpointed by the
# loop; when EPOCHS is 0 nothing was trained and nothing is saved.
if EPOCHS % 5 != 0:
    model.save_weights(checkpoint_prefix.format(epoch=EPOCHS))