# Practical 2: Text Generator with RNN

### Setup
Import tensorflow

In [1]:
import tensorflow as tf
import numpy as np
import os
import time

Download the Shakespeare dataset

In [2]:
path_to_file = tf.keras.utils.get_file('shakespeare.txt', 'https://storage.googleapis.com/download.tensorflow.org/data/shakespeare.txt')

Downloading data from https://storage.googleapis.com/download.tensorflow.org/data/shakespeare.txt


Load Data

In [3]:
# Read, then decode for py2 compat.
text = open(path_to_file, 'rb').read().decode(encoding='utf-8')
# length of text is the number of characters in it
print(f'Length of text: {len(text)} characters')

Length of text: 1115394 characters


In [4]:
# Take a look at the first 250 characters in text
print(text[:250])

First Citizen:
Before we proceed any further, hear me speak.

All:
Speak, speak.

First Citizen:
You are all resolved rather to die than to famish?

All:
Resolved. resolved.

First Citizen:
First, you know Caius Marcius is chief enemy to the people.



In [5]:
# The unique characters in the file
vocab = sorted(set(text))
print(f'{len(vocab)} unique characters')

65 unique characters


### Text Processing

Vectorize Text <br>
Before training, you need to convert the strings to a numeric representation. The tf.keras.layers.StringLookup layer can convert each character into a numeric ID, but the text must first be split into tokens.

In [6]:
example_texts = ['abcdefg', 'xyz']
chars = tf.strings.unicode_split(example_texts, input_encoding='UTF-8')
chars

<tf.RaggedTensor [[b'a', b'b', b'c', b'd', b'e', b'f', b'g'], [b'x', b'y', b'z']]>

Now create the tf.keras.layers.StringLookup layer:

In [7]:
ids_from_chars = tf.keras.layers.StringLookup(
    vocabulary=list(vocab), mask_token=None)

This layer converts tokens to character IDs:

In [8]:
ids = ids_from_chars(chars)
ids

<tf.RaggedTensor [[40, 41, 42, 43, 44, 45, 46], [63, 64, 65]]>
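
The IDs are positions in the sorted vocabulary, shifted by one because the layer reserves index 0 for the out-of-vocabulary token `[UNK]`. This is also why the vocabulary size later comes out as 66 for 65 unique characters. Below is a minimal plain-Python sketch of that lookup on a toy vocabulary. The names `toy_vocab`, `char_to_id`, and `lookup` are hypothetical and are not part of the tutorial or of Keras.

```python
# Toy vocabulary (an assumption for illustration; the tutorial uses the Shakespeare text).
toy_vocab = sorted(set("hello world"))

# Index 0 is reserved for the out-of-vocabulary token, so real characters start at 1.
UNK_ID = 0
char_to_id = {ch: i + 1 for i, ch in enumerate(toy_vocab)}

def lookup(text):
    """Map each character to its ID, falling back to UNK_ID for unseen characters."""
    return [char_to_id.get(ch, UNK_ID) for ch in text]

print(toy_vocab)
print(lookup("hello"))
print(lookup("xyz"))  # none of these characters are in the toy vocabulary
```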

Since the goal of this tutorial is to generate text, it is also important to invert this representation and recover human-readable strings from the IDs. For this you can use tf.keras.layers.StringLookup with invert=True:

In [9]:
chars_from_ids = tf.keras.layers.StringLookup(
    vocabulary=ids_from_chars.get_vocabulary(), invert=True, mask_token=None)

This layer recovers the characters from the vectors of IDs and returns them as a tf.RaggedTensor of characters:

In [10]:
chars = chars_from_ids(ids)
chars

<tf.RaggedTensor [[b'a', b'b', b'c', b'd', b'e', b'f', b'g'], [b'x', b'y', b'z']]>

You can use tf.strings.reduce_join to rejoin characters into a string.

In [11]:
tf.strings.reduce_join(chars, axis=-1).numpy()

array([b'abcdefg', b'xyz'], dtype=object)

In [12]:
def text_from_ids(ids):
    return tf.strings.reduce_join(chars_from_ids(ids), axis=-1)

### Predictions
Create Training Sets and Targets

In [13]:
all_ids = ids_from_chars(tf.strings.unicode_split(text, 'UTF-8'))
all_ids

<tf.Tensor: shape=(1115394,), dtype=int64, numpy=array([19, 48, 57, ..., 46,  9,  1])>

In [14]:
ids_dataset = tf.data.Dataset.from_tensor_slices(all_ids)

In [15]:
for ids in ids_dataset.take(10):
    print(chars_from_ids(ids).numpy().decode('utf-8'))

F
i
r
s
t
 
C
i
t
i


In [16]:
seq_length = 100

The batch method lets you easily convert these individual characters into sequences of the desired size.

In [17]:
sequences = ids_dataset.batch(seq_length+1, drop_remainder=True)

for seq in sequences.take(1):
  print(chars_from_ids(seq))

tf.Tensor(
[b'F' b'i' b'r' b's' b't' b' ' b'C' b'i' b't' b'i' b'z' b'e' b'n' b':'
 b'\n' b'B' b'e' b'f' b'o' b'r' b'e' b' ' b'w' b'e' b' ' b'p' b'r' b'o'
 b'c' b'e' b'e' b'd' b' ' b'a' b'n' b'y' b' ' b'f' b'u' b'r' b't' b'h'
 b'e' b'r' b',' b' ' b'h' b'e' b'a' b'r' b' ' b'm' b'e' b' ' b's' b'p'
 b'e' b'a' b'k' b'.' b'\n' b'\n' b'A' b'l' b'l' b':' b'\n' b'S' b'p' b'e'
 b'a' b'k' b',' b' ' b's' b'p' b'e' b'a' b'k' b'.' b'\n' b'\n' b'F' b'i'
 b'r' b's' b't' b' ' b'C' b'i' b't' b'i' b'z' b'e' b'n' b':' b'\n' b'Y'
 b'o' b'u' b' '], shape=(101,), dtype=string)


It is easier to see what this is doing if you join the tokens back into strings:

In [18]:
for seq in sequences.take(5):
    print(text_from_ids(seq).numpy())

b'First Citizen:\nBefore we proceed any further, hear me speak.\n\nAll:\nSpeak, speak.\n\nFirst Citizen:\nYou '
b'are all resolved rather to die than to famish?\n\nAll:\nResolved. resolved.\n\nFirst Citizen:\nFirst, you k'
b"now Caius Marcius is chief enemy to the people.\n\nAll:\nWe know't, we know't.\n\nFirst Citizen:\nLet us ki"
b"ll him, and we'll have corn at our own price.\nIs't a verdict?\n\nAll:\nNo more talking on't; let it be d"
b'one: away, away!\n\nSecond Citizen:\nOne word, good citizens.\n\nFirst Citizen:\nWe are accounted poor citi'
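
What `batch(seq_length+1, drop_remainder=True)` does to the character stream can be sketched in plain Python. This is only an illustration on a short string. `chunk` is a hypothetical helper, not a tf.data function, and the toy sequence length is an assumption chosen to keep the output small.

```python
def chunk(items, size):
    """Split items into consecutive groups of `size`, dropping a shorter final group."""
    n_full = len(items) // size
    return [items[i * size:(i + 1) * size] for i in range(n_full)]

toy_text = "First Citizen:"
toy_seq_length = 4  # each chunk holds toy_seq_length + 1 characters

chunks = chunk(toy_text, toy_seq_length + 1)
print(chunks)  # the trailing "zen:" is shorter than 5 characters and is dropped
```

The extra character in each chunk is what later allows it to be split into an input and a target that are each seq_length characters long.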


For training, you need a dataset of (input, label) pairs, where input and label are both sequences. At each time step the input is the current character and the label is the next character. The following function takes a sequence, duplicates it, and shifts it by one position to align the input and label for each time step:

In [19]:
def split_input_target(sequence):
  input_text = sequence[:-1]
  target_text = sequence[1:]
  return input_text, target_text

In [20]:
split_input_target(list("Tensorflow"))

(['T', 'e', 'n', 's', 'o', 'r', 'f', 'l', 'o'],
 ['e', 'n', 's', 'o', 'r', 'f', 'l', 'o', 'w'])

In [21]:
dataset = sequences.map(split_input_target)

In [22]:
for input_example, target_example in dataset.take(1):
  print("Input :", text_from_ids(input_example).numpy())
  print("Target:", text_from_ids(target_example).numpy())

Input : b'First Citizen:\nBefore we proceed any further, hear me speak.\n\nAll:\nSpeak, speak.\n\nFirst Citizen:\nYou'
Target: b'irst Citizen:\nBefore we proceed any further, hear me speak.\n\nAll:\nSpeak, speak.\n\nFirst Citizen:\nYou '


Create Training Batches

You used tf.data to split the text into manageable sequences. Before feeding this data into the model, you need to shuffle it and pack it into batches.

In [23]:
# Batch size
BATCH_SIZE = 64

# Buffer size to shuffle the dataset
# (TF data is designed to work with possibly infinite sequences,
# so it doesn't attempt to shuffle the entire sequence in memory. Instead,
# it maintains a buffer in which it shuffles elements).
BUFFER_SIZE = 10000

dataset = (
    dataset
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE, drop_remainder=True)
    .prefetch(tf.data.AUTOTUNE))

dataset

<_PrefetchDataset element_spec=(TensorSpec(shape=(64, 100), dtype=tf.int64, name=None), TensorSpec(shape=(64, 100), dtype=tf.int64, name=None))>
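
The dataset sizes can be checked with integer arithmetic on the numbers printed earlier (1,115,394 characters, seq_length = 100, BATCH_SIZE = 64). The variable names below exist only for this worked example.

```python
n_chars = 1115394
seq_length = 100
batch_size = 64

# drop_remainder=True discards the incomplete tail at both batching stages.
n_sequences = n_chars // (seq_length + 1)
dropped_chars = n_chars % (seq_length + 1)
batches_per_epoch = n_sequences // batch_size

print(n_sequences, dropped_chars, batches_per_epoch)
```

This gives 11,043 sequences (51 trailing characters are dropped) and 172 batches per epoch.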

### Create Model

In [24]:
# Length of the vocabulary in StringLookup Layer
vocab_size = len(ids_from_chars.get_vocabulary())

# The embedding dimension
embedding_dim = 256

# Number of RNN units
rnn_units = 1024

In [25]:
class MyModel(tf.keras.Model):
  def __init__(self, vocab_size, embedding_dim, rnn_units):
    super().__init__()
    self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
    self.gru = tf.keras.layers.GRU(rnn_units,
                                   return_sequences=True,
                                   return_state=True)
    self.dense = tf.keras.layers.Dense(vocab_size)

  def call(self, inputs, states=None, return_state=False, training=False):
    x = inputs
    x = self.embedding(x, training=training)
    if states is None:
      states = self.gru.get_initial_state(x)
    x, states = self.gru(x, initial_state=states, training=training)
    x = self.dense(x, training=training)

    if return_state:
      return x, states
    else:
      return x

In [26]:
model = MyModel(
    vocab_size=vocab_size,
    embedding_dim=embedding_dim,
    rnn_units=rnn_units)

### Model Test

In [27]:
for input_example_batch, target_example_batch in dataset.take(1):
    example_batch_predictions = model(input_example_batch)
    print(example_batch_predictions.shape, "# (batch_size, sequence_length, vocab_size)")

(64, 100, 66) # (batch_size, sequence_length, vocab_size)


In [28]:
model.summary()

Model: "my_model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 embedding (Embedding)       multiple                  16896     
                                                                 
 gru (GRU)                   multiple                  3938304   
                                                                 
 dense (Dense)               multiple                  67650     
                                                                 
Total params: 4022850 (15.35 MB)
Trainable params: 4022850 (15.35 MB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________
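
The parameter counts in the summary can be reproduced by hand. The sketch below assumes the Keras GRU default `reset_after=True`, under which each of the three gates carries two bias vectors. The variable names are illustrative.

```python
vocab_size = 66
embedding_dim = 256
rnn_units = 1024

# One embedding vector per vocabulary entry.
embedding_params = vocab_size * embedding_dim

# Three gates, each with input weights, recurrent weights, and two bias vectors.
gru_params = 3 * (rnn_units * (embedding_dim + rnn_units) + 2 * rnn_units)

# Weight matrix plus one bias per output logit.
dense_params = rnn_units * vocab_size + vocab_size

total = embedding_params + gru_params + dense_params
print(embedding_params, gru_params, dense_params, total)
```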


To get actual predictions from the model, you need to sample from the output distribution to obtain character indices. This distribution is defined by the logits over the character vocabulary. Note: it is important to sample from this distribution, because taking the argmax can easily get the model stuck in an infinite loop. Try it for the first example in the batch:

In [29]:
sampled_indices = tf.random.categorical(example_batch_predictions[0],num_samples=1)
sampled_indices = tf.squeeze(sampled_indices, axis=-1).numpy()

This gives us, at each time step, a prediction of the next character index:

In [30]:
sampled_indices

array([53, 58, 17, 50,  0, 36,  0,  2, 57, 32,  3,  5,  0, 24, 49, 21, 11,
       24, 17, 27, 12, 25, 37,  8, 61, 34, 27, 20, 29, 32, 46, 29, 39,  9,
        3, 33, 10, 48, 56, 40, 53, 10, 50, 34, 11, 12, 23, 41, 43,  1, 49,
       48,  7, 36, 48, 15, 63, 52, 26, 51, 55, 39, 56, 39, 44, 11, 40,  0,
       12, 60, 30, 28, 24, 22, 49, 17, 17, 65, 38,  9, 63, 22, 54, 62, 51,
       36, 31,  1, 21, 26, 56,  1,  6, 14, 65, 22, 30, 42, 27, 47])

Decode these indices to see the text predicted by this untrained model:

In [31]:
print("Input:\n", text_from_ids(input_example_batch[0]).numpy())
print()
print("Next Char Predictions:\n", text_from_ids(sampled_indices).numpy())

Input:
 b"The Earl of Wiltshire hath the realm in farm.\n\nLORD WILLOUGHBY:\nThe king's grown bankrupt, like a br"

Next Char Predictions:
 b"nsDk[UNK]W[UNK] rS!&[UNK]KjH:KDN;LX-vUNGPSgPZ.!T3iqan3kU:;Jbd\nji,WiBxmMlpZqZe:a[UNK];uQOKIjDDzY.xIowlWR\nHMq\n'AzIQcNh"
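
The difference between sampling from the logits and taking the argmax can be sketched in plain Python. This is not how tf.random.categorical is implemented. `softmax` and `sample_from_logits` are hypothetical helpers, and the logits are made up.

```python
import math
import random

def softmax(logits):
    """Convert logits to probabilities, subtracting the max for numerical stability."""
    m = max(logits)
    exps = [math.exp(x - m) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]

def sample_from_logits(logits, rng):
    """Draw one index with probability proportional to softmax(logits)."""
    return rng.choices(range(len(logits)), weights=softmax(logits), k=1)[0]

toy_logits = [2.0, 1.0, 0.1]
rng = random.Random(0)

greedy = [toy_logits.index(max(toy_logits)) for _ in range(10)]
sampled = [sample_from_logits(toy_logits, rng) for _ in range(1000)]

print(greedy)                # argmax picks the same index every time
print(sorted(set(sampled)))  # sampling visits more than one index
```

Because the argmax is deterministic, a generator that feeds its own output back in can end up repeating the same characters, whereas sampling keeps lower-probability characters reachable.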


### Train Models
Attach an optimizer and a loss function

The standard tf.keras.losses.sparse_categorical_crossentropy loss function works in this case because it is applied across the final dimension of the prediction. Since your model returns logits, you need to set the from_logits flag.

In [32]:
loss = tf.losses.SparseCategoricalCrossentropy(from_logits=True)

In [33]:
example_batch_mean_loss = loss(target_example_batch, example_batch_predictions)
print("Prediction shape: ", example_batch_predictions.shape, " # (batch_size, sequence_length, vocab_size)")
print("Mean loss:        ", example_batch_mean_loss)

Prediction shape:  (64, 100, 66)  # (batch_size, sequence_length, vocab_size)
Mean loss:         tf.Tensor(4.1891813, shape=(), dtype=float32)


A newly initialized model should not be too sure of itself; the output logits should all have similar magnitudes. To confirm this, check that the exponential of the mean loss is approximately equal to the vocabulary size. A much higher loss means the model is confident in its wrong answers and is badly initialized:

In [34]:
tf.exp(example_batch_mean_loss).numpy()

65.968765
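
The reason this value lands near the vocabulary size: if all 66 logits are equal, every character gets probability 1/66 and the cross-entropy is ln 66 ≈ 4.1897, close to the 4.1892 measured above. The plain-Python sketch below uses a hypothetical helper, `sparse_ce_from_logits`, as a stand-in for the Keras loss on a single time step.

```python
import math

def sparse_ce_from_logits(logits, label):
    """Cross-entropy of one integer label against unnormalized logits."""
    m = max(logits)
    log_sum_exp = m + math.log(sum(math.exp(x - m) for x in logits))
    return log_sum_exp - logits[label]

vocab_size = 66
uniform_logits = [0.0] * vocab_size  # a maximally unsure model

loss_value = sparse_ce_from_logits(uniform_logits, label=5)
print(loss_value, math.exp(loss_value))
```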

Configure the training procedure using the tf.keras.Model.compile method. Use tf.keras.optimizers.Adam with default arguments and the loss function.

In [35]:
model.compile(optimizer='adam', loss=loss)

Configure Checkpoints

Use tf.keras.callbacks.ModelCheckpoint to ensure that checkpoints are saved during training:

In [36]:
# Directory where the checkpoints will be saved
checkpoint_dir = './training_checkpoints'
# Name of the checkpoint files
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_prefix,
    save_weights_only=True)
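
The `{epoch}` placeholder in the prefix is filled in with the epoch number when a checkpoint is written. The formatting itself is ordinary Python string formatting, as this small sketch shows. The epoch values are arbitrary.

```python
import os

checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

# Each epoch number yields a distinct checkpoint path.
for epoch in (1, 2, 10):
    print(checkpoint_prefix.format(epoch=epoch))
```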

Carry out the Training Process

In [37]:
EPOCHS = 10

In [38]:
history = model.fit(dataset, epochs=EPOCHS, callbacks=[checkpoint_callback])

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


### Generate Text

In [39]:
class OneStep(tf.keras.Model):
  def __init__(self, model, chars_from_ids, ids_from_chars, temperature=1.0):
    super().__init__()
    self.temperature = temperature
    self.model = model
    self.chars_from_ids = chars_from_ids
    self.ids_from_chars = ids_from_chars

    # Create a mask to prevent "[UNK]" from being generated.
    skip_ids = self.ids_from_chars(['[UNK]'])[:, None]
    sparse_mask = tf.SparseTensor(
        # Put a -inf at each bad index.
        values=[-float('inf')]*len(skip_ids),
        indices=skip_ids,
        # Match the shape to the vocabulary
        dense_shape=[len(ids_from_chars.get_vocabulary())])
    self.prediction_mask = tf.sparse.to_dense(sparse_mask)

  @tf.function
  def generate_one_step(self, inputs, states=None):
    # Convert strings to token IDs.
    input_chars = tf.strings.unicode_split(inputs, 'UTF-8')
    input_ids = self.ids_from_chars(input_chars).to_tensor()

    # Run the model.
    # predicted_logits.shape is [batch, char, next_char_logits]
    predicted_logits, states = self.model(inputs=input_ids, states=states,
                                          return_state=True)
    # Only use the last prediction.
    predicted_logits = predicted_logits[:, -1, :]
    predicted_logits = predicted_logits/self.temperature
    # Apply the prediction mask: prevent "[UNK]" from being generated.
    predicted_logits = predicted_logits + self.prediction_mask

    # Sample the output logits to generate token IDs.
    predicted_ids = tf.random.categorical(predicted_logits, num_samples=1)
    predicted_ids = tf.squeeze(predicted_ids, axis=-1)

    # Convert from token ids to characters
    predicted_chars = self.chars_from_ids(predicted_ids)

    # Return the characters and model state.
    return predicted_chars, states

In [40]:
one_step_model = OneStep(model, chars_from_ids, ids_from_chars)
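
generate_one_step makes two adjustments to the logits: it divides them by the temperature and adds a mask with -inf at the `[UNK]` index. Both can be illustrated without TensorFlow. In the sketch below the logits are made up, and `softmax` and `adjusted_probs` are hypothetical helpers.

```python
import math

def softmax(logits):
    """Convert logits to probabilities; exp(-inf) evaluates to 0.0."""
    m = max(logits)
    exps = [math.exp(x - m) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]

toy_logits = [1.0, 3.0, 2.0, 0.5]   # index 0 plays the role of "[UNK]"
mask = [-math.inf, 0.0, 0.0, 0.0]   # -inf at the index that must never be sampled

def adjusted_probs(logits, temperature):
    """Scale by temperature, apply the mask, then normalize."""
    scaled = [x / temperature for x in logits]
    masked = [s + m for s, m in zip(scaled, mask)]
    return softmax(masked)

for t in (0.5, 1.0, 2.0):
    print(t, [round(p, 3) for p in adjusted_probs(toy_logits, t)])
```

The masked index always receives probability zero. Temperatures below 1 concentrate probability on the most likely character, while temperatures above 1 flatten the distribution and make the generated text more random.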

In [41]:
start = time.time()
states = None
next_char = tf.constant(['ROMEO:'])
result = [next_char]

for n in range(1000):
  next_char, states = one_step_model.generate_one_step(next_char, states=states)
  result.append(next_char)

result = tf.strings.join(result)
end = time.time()
print(result[0].numpy().decode('utf-8'), '\n\n' + '_'*80)
print('\nRun time:', end - start)

ROMEO:
Not rooting to the bloody, he just more fire.

DUKE OF YORK:
She, who holds me sour me from that word with her?
My son Pargaret, which divinest a widow,
Exceeding that hath nothing of good stopp'd with the
discourteous, live you, worsensio, for he should have more beart:
My daughter, and Menenius, whose hands and me
Have twenting to be consul: but one this bringdal trud
Made jest his wife's night.

DUKE VINCENTIO:
Your manny, my brother?

COMINIUS:
But, let him slain,
Where are your ladys and blood with a gracital buttobles
As it as my sister unake!
Go take them again: I shall well exame to such is fool:
Unse of goodsming smoths so tidow 'gainst thy profess
On your hearts do and with his son-socreach. Fie, I would no.

GLOUCESTER:
I cannot jeaud waste of one.

Provost:
Swift, let him?

PETRUCHIO:
What, nor none, that, I know not. My sovereign;
Metting the Catis and honour Sich
Shasing our actions and wims within the liss,
Like templessed with a dispatch, or life
Some league and 

In [42]:
start = time.time()
states = None
next_char = tf.constant(['ROMEO:', 'ROMEO:', 'ROMEO:', 'ROMEO:', 'ROMEO:'])
result = [next_char]

for n in range(1000):
  next_char, states = one_step_model.generate_one_step(next_char, states=states)
  result.append(next_char)

result = tf.strings.join(result)
end = time.time()
print(result, '\n\n' + '_'*80)
print('\nRun time:', end - start)

tf.Tensor(
[b"ROMEO:\nHow now!\nThe Duke of Bolingbroke, for if you set my son,\nI will true that shall be arms'd with a togh\nOf gracious for a line, lady,\nRight two God will obe thee, Ere your last\nUnder your coward, were indubed, trust.\n\nPROSPERO:\nBroke, thou art dety?\nAnd he is noble grumious mother?'\n\nFirst Citizen:\nHow now, my lord; fear that, in goie, horse; confride thee.\n\nPETRUCHIO:\nWhy, then, drange me, my lord: scape you, pray!\n\nFirst Lady:\nIf you, Joss' is for a fetter\nHad indeed, but indeed, but for what's the master;\nFor Allibe in our mother, she is in the least,\nThat I am a gine as well as you!\n\nCAMILLO:\nGo to the warleys King Histings, one father,\nA kinder matter, he is before thyself;\nFor chreits I heard thou warr'd the first.\n\nThird Servingman:\nDo it well.\n\nANGELO:\nThere? if you may well! if you nothing.\n\nCAMILLO:\nBy sweeting, this a gloot,\nYet I comal of his. Do him my sorrow: if all she doth mother approach:\nI willing at the place, 

### Export Generator Model

In [43]:
tf.saved_model.save(one_step_model, 'one_step')
one_step_reloaded = tf.saved_model.load('one_step')



In [44]:
states = None
next_char = tf.constant(['ROMEO:'])
result = [next_char]

for n in range(100):
  next_char, states = one_step_reloaded.generate_one_step(next_char, states=states)
  result.append(next_char)

print(tf.strings.join(result)[0].numpy().decode("utf-8"))

ROMEO:
Ay, what I cannot proceed the
offences? why, most warrio!
Is it softended-mouth, Treachedine I'ld
w


# Task

Use tf.GradientTape to track the gradients. You can learn more about this approach by reading the eager execution guide.

The procedure is:
1. Run the model and compute the loss under a tf.GradientTape.
2. Compute the updates and apply them to the model with the optimizer.

In [45]:
class CustomTraining(MyModel):
  @tf.function
  def train_step(self, inputs):
    inputs, labels = inputs
    # Record only the forward pass and the loss on the tape.
    with tf.GradientTape() as tape:
      predictions = self(inputs, training=True)
      loss = self.loss(labels, predictions)
    # Use this model's own variables rather than the global `model`.
    grads = tape.gradient(loss, self.trainable_variables)
    self.optimizer.apply_gradients(zip(grads, self.trainable_variables))

    return {'loss': loss}

In [46]:
model = CustomTraining(
    vocab_size=len(ids_from_chars.get_vocabulary()),
    embedding_dim=embedding_dim,
    rnn_units=rnn_units)

In [47]:
model.compile(optimizer = tf.keras.optimizers.Adam(), loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True))

In [48]:
model.fit(dataset, epochs=1)



<keras.src.callbacks.History at 0x7ee8900dfac0>

In [49]:
EPOCHS = 10

mean = tf.metrics.Mean()

for epoch in range(EPOCHS):
  start = time.time()

  mean.reset_states()
  for (batch_n, (inp, target)) in enumerate(dataset):
    logs = model.train_step([inp, target])
    mean.update_state(logs['loss'])

    if batch_n % 50 == 0:
      template = f"Epoch {epoch+1} Batch {batch_n} Loss {logs['loss']:.4f}"
      print(template)

  # saving (checkpoint) the model every 5 epochs
  if (epoch + 1) % 5 == 0:
    model.save_weights(checkpoint_prefix.format(epoch=epoch))

  print()
  print(f'Epoch {epoch+1} Loss: {mean.result().numpy():.4f}')
  print(f'Time taken for 1 epoch {time.time() - start:.2f} sec')
  print("_"*80)

model.save_weights(checkpoint_prefix.format(epoch=epoch))

Epoch 1 Batch 0 Loss 2.1800
Epoch 1 Batch 50 Loss 2.0702
Epoch 1 Batch 100 Loss 1.9699
Epoch 1 Batch 150 Loss 1.8791

Epoch 1 Loss: 1.9944
Time taken for 1 epoch 16.10 sec
________________________________________________________________________________
Epoch 2 Batch 0 Loss 1.8015
Epoch 2 Batch 50 Loss 1.7387
Epoch 2 Batch 100 Loss 1.6774
Epoch 2 Batch 150 Loss 1.6475

Epoch 2 Loss: 1.7140
Time taken for 1 epoch 13.46 sec
________________________________________________________________________________
Epoch 3 Batch 0 Loss 1.5875
Epoch 3 Batch 50 Loss 1.5583
Epoch 3 Batch 100 Loss 1.5670
Epoch 3 Batch 150 Loss 1.5454

Epoch 3 Loss: 1.5538
Time taken for 1 epoch 20.47 sec
________________________________________________________________________________
Epoch 4 Batch 0 Loss 1.4734
Epoch 4 Batch 50 Loss 1.4529
Epoch 4 Batch 100 Loss 1.4420
Epoch 4 Batch 150 Loss 1.4389

Epoch 4 Loss: 1.4535
Time taken for 1 epoch 11.93 sec
_____________________________________________________________________

# Lab Assignment

Explain the code above and state how it differs from Practical 2.

- The assignment code and Practical 2 differ in the training approach. Practical 2 trains with a plain **'model.fit'** call. The assignment code instead overrides the **'train_step'** method in a subclass of the model, which controls training at the batch level: it explicitly computes the loss and the gradients and applies the weight updates. In addition, a **'tf.metrics.Mean'** object tracks the average loss over each epoch in the hand-written training loop. This approach gives more control and flexibility over training than the default loop.
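
The role of the tf.metrics.Mean object in the custom loop can be pictured as a running average that is reset at the start of each epoch. The class below is a hypothetical plain-Python stand-in, not the Keras implementation, and the loss values are made up.

```python
class RunningMean:
    """Accumulate values and report their arithmetic mean."""

    def __init__(self):
        self.reset_states()

    def reset_states(self):
        # Forget everything seen so far.
        self.total = 0.0
        self.count = 0

    def update_state(self, value):
        self.total += float(value)
        self.count += 1

    def result(self):
        return self.total / self.count if self.count else 0.0

mean = RunningMean()
for epoch_losses in ([2.0, 1.5, 1.0], [0.9, 0.7]):
    mean.reset_states()  # start each epoch from scratch
    for batch_loss in epoch_losses:
        mean.update_state(batch_loss)
    print(mean.result())
```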