In [None]:
import os
import time
import warnings

# TF_CPP_MIN_LOG_LEVEL is consumed by TensorFlow's native logging, so it has
# to be in the environment *before* TensorFlow is imported; setting it after
# the import is too late to quiet the messages emitted while TF loads.
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"

import numpy as np
import tensorflow as tf

# Silence Python-level warnings for the remainder of the notebook.
warnings.filterwarnings("ignore")

In [None]:
# Download the Shakespeare corpus and keep the local path it was saved to.
path_to_file = tf.keras.utils.get_file(
    fname="shakespeare.txt",
    origin="https://storage.googleapis.com/download.tensorflow.org/data/shakespeare.txt",
)

Downloading data from https://storage.googleapis.com/download.tensorflow.org/data/shakespeare.txt
[1m1115394/1115394[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step


In [None]:
# Read the corpus as raw bytes and decode it explicitly as UTF-8. The context
# manager closes the file handle as soon as the contents are in memory
# (a bare open(...).read() leaves the handle to the garbage collector).
with open(path_to_file, "rb") as corpus_file:
    text = corpus_file.read().decode(encoding="utf-8")

print(f"Length of text: {len(text)} characters")
print(text[:250])

# The vocabulary is the sorted list of distinct characters in the corpus.
vocab = sorted(set(text))
print(f"{len(vocab)} unique characters")

Length of text: 1115394 characters
First Citizen:
Before we proceed any further, hear me speak.

All:
Speak, speak.

First Citizen:
You are all resolved rather to die than to famish?

All:
Resolved. resolved.

First Citizen:
First, you know Caius Marcius is chief enemy to the people.

65 unique characters


In [None]:
# TODO 1
# Split every example string into its individual UTF-8 characters.
example_texts = ["abcdefg", "xyz"]
chars = tf.strings.unicode_split(input=example_texts, input_encoding="UTF-8")
chars

<tf.RaggedTensor [[b'a', b'b', b'c', b'd', b'e', b'f', b'g'], [b'x', b'y', b'z']]>

In [None]:
# Lookup layer that maps each character of `vocab` to an integer id
# (configured without a mask token).
ids_from_chars = tf.keras.layers.StringLookup(mask_token=None, vocabulary=list(vocab))

In [None]:
# Map the split example characters to their integer ids.
ids = ids_from_chars(chars)
ids

<tf.RaggedTensor [[40, 41, 42, 43, 44, 45, 46], [63, 64, 65]]>

In [None]:
# Inverse lookup (id -> character). It reuses the forward layer's vocabulary
# so that both directions agree on the id assignment.
chars_from_ids = tf.keras.layers.StringLookup(
    invert=True,
    mask_token=None,
    vocabulary=ids_from_chars.get_vocabulary(),
)

In [None]:
# Round-trip check: turn the example ids back into characters.
chars = chars_from_ids(ids)
chars

<tf.RaggedTensor [[b'a', b'b', b'c', b'd', b'e', b'f', b'g'], [b'x', b'y', b'z']]>

In [None]:
# Join the characters of each example along the last axis to recover the strings.
tf.strings.reduce_join(chars, axis=-1).numpy()

array([b'abcdefg', b'xyz'], dtype=object)

In [None]:
def text_from_ids(ids):
    """Convert token ids to characters and join them along the last axis."""
    decoded_chars = chars_from_ids(ids)
    return tf.strings.reduce_join(decoded_chars, axis=-1)

In [None]:
# TODO 2
# Tokenise the whole corpus: split it into characters, then look up each id.
all_ids = ids_from_chars(
    tf.strings.unicode_split(input=text, input_encoding="UTF-8")
)
all_ids

<tf.Tensor: shape=(1115394,), dtype=int64, numpy=array([19, 48, 57, ..., 46,  9,  1])>

In [None]:
# Wrap the id tensor in a tf.data pipeline that yields one character id per element.
ids_dataset = tf.data.Dataset.from_tensor_slices(all_ids)

In [None]:
# Peek at the first ten tokens of the stream. The loop variable is called
# `char_id` so that it does not overwrite the notebook-level `ids` created in
# an earlier cell (which would change what that cell's consumers see on re-run).
for char_id in ids_dataset.take(10):
    print(chars_from_ids(char_id).numpy().decode("utf-8"))

F
i
r
s
t
 
C
i
t
i


In [None]:
# Number of input characters per training example; each stored chunk holds
# one extra character so the target can be the input shifted by one.
seq_length = 100
examples_per_epoch = len(text) // (seq_length + 1)

In [None]:
# Group the individual character ids into chunks of seq_length + 1 ids,
# dropping the final chunk if it comes up short.
sequences = ids_dataset.batch(seq_length + 1, drop_remainder=True)

# Show the first few chunks as characters.
for seq in sequences.take(5):
    print(chars_from_ids(seq))

tf.Tensor(
[b'F' b'i' b'r' b's' b't' b' ' b'C' b'i' b't' b'i' b'z' b'e' b'n' b':'
 b'\n' b'B' b'e' b'f' b'o' b'r' b'e' b' ' b'w' b'e' b' ' b'p' b'r' b'o'
 b'c' b'e' b'e' b'd' b' ' b'a' b'n' b'y' b' ' b'f' b'u' b'r' b't' b'h'
 b'e' b'r' b',' b' ' b'h' b'e' b'a' b'r' b' ' b'm' b'e' b' ' b's' b'p'
 b'e' b'a' b'k' b'.' b'\n' b'\n' b'A' b'l' b'l' b':' b'\n' b'S' b'p' b'e'
 b'a' b'k' b',' b' ' b's' b'p' b'e' b'a' b'k' b'.' b'\n' b'\n' b'F' b'i'
 b'r' b's' b't' b' ' b'C' b'i' b't' b'i' b'z' b'e' b'n' b':' b'\n' b'Y'
 b'o' b'u' b' '], shape=(101,), dtype=string)
tf.Tensor(
[b'a' b'r' b'e' b' ' b'a' b'l' b'l' b' ' b'r' b'e' b's' b'o' b'l' b'v'
 b'e' b'd' b' ' b'r' b'a' b't' b'h' b'e' b'r' b' ' b't' b'o' b' ' b'd'
 b'i' b'e' b' ' b't' b'h' b'a' b'n' b' ' b't' b'o' b' ' b'f' b'a' b'm'
 b'i' b's' b'h' b'?' b'\n' b'\n' b'A' b'l' b'l' b':' b'\n' b'R' b'e' b's'
 b'o' b'l' b'v' b'e' b'd' b'.' b' ' b'r' b'e' b's' b'o' b'l' b'v' b'e'
 b'd' b'.' b'\n' b'\n' b'F' b'i' b'r' b's' b't' b' ' b'C' b'i' b't' b'

In [None]:
# Same preview, but with each chunk's tokens joined back into one string.
for seq in sequences.take(5):
    joined = text_from_ids(seq)
    print(joined.numpy())

b'First Citizen:\nBefore we proceed any further, hear me speak.\n\nAll:\nSpeak, speak.\n\nFirst Citizen:\nYou '
b'are all resolved rather to die than to famish?\n\nAll:\nResolved. resolved.\n\nFirst Citizen:\nFirst, you k'
b"now Caius Marcius is chief enemy to the people.\n\nAll:\nWe know't, we know't.\n\nFirst Citizen:\nLet us ki"
b"ll him, and we'll have corn at our own price.\nIs't a verdict?\n\nAll:\nNo more talking on't; let it be d"
b'one: away, away!\n\nSecond Citizen:\nOne word, good citizens.\n\nFirst Citizen:\nWe are accounted poor citi'


In [None]:
def split_input_target(sequence):
    """Split a sequence into an (input, target) pair offset by one step.

    The input is everything except the last element and the target is
    everything except the first, so target[i] is the element that follows
    input[i] in the original sequence.
    """
    return sequence[:-1], sequence[1:]

In [None]:
# Sanity-check the splitter on a plain Python list of characters.
split_input_target(list("Hello World"))

(['H', 'e', 'l', 'l', 'o', ' ', 'W', 'o', 'r', 'l'],
 ['e', 'l', 'l', 'o', ' ', 'W', 'o', 'r', 'l', 'd'])

In [None]:
# Turn every chunk into an (input, target) pair.
dataset = sequences.map(map_func=split_input_target)

In [None]:
# Show the first (input, target) pair as text; the target is the input shifted by one character.
for input_example, target_example in dataset.take(1):
    input_bytes = text_from_ids(input_example).numpy()
    target_bytes = text_from_ids(target_example).numpy()
    print("Input :", input_bytes)
    print("Target :", target_bytes)

Input : b'First Citizen:\nBefore we proceed any further, hear me speak.\n\nAll:\nSpeak, speak.\n\nFirst Citizen:\nYou'
Target : b'irst Citizen:\nBefore we proceed any further, hear me speak.\n\nAll:\nSpeak, speak.\n\nFirst Citizen:\nYou '


In [None]:
# Shuffle the (input, target) pairs and pack them into fixed-size batches.
BATCH_SIZE = 64

# Number of elements held in the shuffle buffer.
BUFFER_SIZE = 10000

# The pipeline is rebuilt from `sequences` instead of from `dataset` itself,
# so re-running this cell yields the same result rather than batching data
# that has already been batched. `tf.data.AUTOTUNE` replaces the deprecated
# `tf.data.experimental.AUTOTUNE` alias.
dataset = (
    sequences.map(split_input_target)
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE, drop_remainder=True)
    .prefetch(tf.data.AUTOTUNE)
)
dataset

<_PrefetchDataset element_spec=(TensorSpec(shape=(64, 100), dtype=tf.int64, name=None), TensorSpec(shape=(64, 100), dtype=tf.int64, name=None))>

In [None]:
# Number of distinct characters in the corpus.
vocab_size = len(vocab)

# Width of the embedding vectors.
embedding_dim = 256

# Number of units in the recurrent layer.
rnn_units = 1024

In [None]:
class MyModel(tf.keras.Model):
    """Character-level language model: Embedding -> GRU -> Dense.

    Args:
        vocab_size: size of the embedding table and of the output layer.
        embedding_dim: width of each embedding vector.
        rnn_units: number of GRU units.
    """

    def __init__(self, vocab_size, embedding_dim, rnn_units):
        super().__init__()
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        # The GRU returns both the full output sequence and its final state.
        self.gru = tf.keras.layers.GRU(
            rnn_units, return_sequences=True, return_state=True
        )
        self.dense = tf.keras.layers.Dense(vocab_size)

    def call(self, inputs, states=None, return_state=False, training=False):
        """Run the model on a batch of token ids.

        Args:
            inputs: batch of token ids; the first axis is used as the batch size.
            states: initial GRU state, or None to start from zeros.
            return_state: when true, also return the final GRU state.
            training: forwarded to every layer.

        Returns:
            The dense layer's output, or an (output, states) tuple when
            `return_state` is true.
        """
        embedded = self.embedding(inputs, training=training)

        if states is None:
            # No state supplied: start from zeros of shape (batch_size, rnn_units).
            states = tf.zeros([tf.shape(inputs)[0], self.gru.units])

        sequence_out, states = self.gru(
            embedded, initial_state=states, training=training
        )
        logits = self.dense(sequence_out, training=training)

        return (logits, states) if return_state else logits

In [None]:
# The output size comes from the lookup layer's vocabulary (not len(vocab)),
# so it matches the ids the lookup layer can actually produce.
model = MyModel(
    rnn_units=rnn_units,
    embedding_dim=embedding_dim,
    vocab_size=len(ids_from_chars.get_vocabulary()),
)

In [None]:
# Run one batch through the untrained model and confirm the output shape.
for input_example_batch, target_example_batch in dataset.take(1):
    example_batch_predictions = model(input_example_batch)
    print(
        example_batch_predictions.shape,
        "(batch_size, sequence_length, vocab_size)",
    )

(64, 100, 66) (batch_size, sequence_length, vocab_size)


In [None]:
# Print the layer-by-layer summary of the model.
model.summary()

In [None]:
# Sample one next-character id per time step from the first example's logits,
# then drop the trailing sample axis and convert to a NumPy array.
sampled_indices = tf.squeeze(
    tf.random.categorical(example_batch_predictions[0], num_samples=1),
    axis=-1,
).numpy()

In [None]:
# Display the sampled ids (one per input position).
sampled_indices

array([14, 42, 32, 21, 64, 11, 60,  5, 45, 40, 55, 55, 21, 13, 31, 50, 10,
        1,  6,  2, 20,  3, 39, 44, 31, 16, 44, 16, 36, 54, 45,  4, 14,  1,
        1, 53, 45, 21, 53, 28,  1, 31, 33,  4,  1, 12, 65,  9, 25, 16, 41,
       56, 12, 33, 15, 33, 56, 59, 46, 61,  1, 64, 25, 36, 20, 62,  1, 57,
       46, 39, 22, 39, 48,  3,  5, 36, 19, 62, 35, 23, 30, 31, 62,  6, 19,
        7, 13, 56, 10, 61, 15, 27,  8, 22, 30, 44, 21, 15, 55, 16])

In [None]:
# Decode the input and the untrained model's sampled predictions side by side.
input_bytes = text_from_ids(input_example_batch[0]).numpy()
predicted_bytes = text_from_ids(sampled_indices).numpy()

print("Input:\n", input_bytes)
print()
print("Next Char Predictions:\n", predicted_bytes)

Input:
 b' give your spirits comfort!\nBy and by.\nI hope it is some pardon or reprieve\nFor the most gentle Clau'

Next Char Predictions:
 b"AcSHy:u&fappH?Rk3\n' G!ZeRCeCWof$A\n\nnfHnO\nRT$\n;z.LCbq;TBTqtgv\nyLWGw\nrgZIZi!&WFwVJQRw'F,?q3vBN-IQeHBpC"


In [None]:
# The model outputs raw logits, so the loss is created with from_logits=True.
loss = tf.losses.SparseCategoricalCrossentropy(
    from_logits=True,
)

In [None]:
# Evaluate the loss of the untrained model on the example batch.
example_batch_mean_loss = loss(target_example_batch, example_batch_predictions)

print(
    "Prediction shape:",
    example_batch_predictions.shape,
    "# (batch_size,sequence_length,vocab_size)",
)
print("Mean Loss:", example_batch_mean_loss)

Prediction shape: (64, 100, 66) # (batch_size,sequence_length,vocab_size)
Mean Loss: tf.Tensor(4.190802, shape=(), dtype=float32)


In [None]:
# Exponential of the mean loss; in the recorded run this is about 66,
# i.e. roughly the size of the lookup layer's vocabulary.
tf.exp(example_batch_mean_loss).numpy()

np.float32(66.07577)

In [None]:
# Attach the optimizer and the loss defined above.
model.compile(loss=loss, optimizer="adam")

In [None]:
# Directory where the checkpoints will be saved.
checkpoint_dir = "./training_checkpoints"

# Checkpoint file name template; "{epoch}" is left as a placeholder in the path.
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_epoch_{epoch}" + ".weights.h5")

# Callback that saves only the weights to the path above.
checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    save_weights_only=True,
    filepath=checkpoint_prefix,
)

In [None]:
# Number of passes over the training dataset.
EPOCHS = 5

In [None]:
# Train the model, writing a weights checkpoint through the callback.
history = model.fit(
    dataset,
    epochs=EPOCHS,
    callbacks=[checkpoint_callback],
)

Epoch 1/5
[1m172/172[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m989s[0m 6s/step - loss: 1.3913
Epoch 2/5
[1m172/172[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m979s[0m 6s/step - loss: 1.3244
Epoch 3/5
[1m172/172[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1013s[0m 6s/step - loss: 1.2781
Epoch 4/5
[1m103/172[0m [32m━━━━━━━━━━━[0m[37m━━━━━━━━━[0m [1m6:53[0m 6s/step - loss: 1.2280

In [None]:
class OneStep(tf.keras.Model):
    """Wrap a trained model so that each call samples one next character.

    Args:
        model: model called with `inputs`, `states` and `return_state=True`.
        chars_from_ids: lookup mapping token ids to characters.
        ids_from_chars: lookup mapping characters to token ids.
        temperature: divisor applied to the logits before sampling.
    """

    def __init__(self, model, chars_from_ids, ids_from_chars, temperature=1.0):
        super().__init__()
        self.temperature = temperature
        self.model = model
        self.chars_from_ids = chars_from_ids
        self.ids_from_chars = ids_from_chars

        # Additive mask that stops "[UNK]" from being sampled: -inf at the
        # id of "[UNK]", with one entry per vocabulary item.
        unk_ids = self.ids_from_chars(["[UNK]"])[:, None]
        vocab_len = len(ids_from_chars.get_vocabulary())
        self.prediction_mask = tf.sparse.to_dense(
            tf.SparseTensor(
                indices=unk_ids,
                values=[-float("inf")] * len(unk_ids),
                dense_shape=[vocab_len],
            )
        )

    @tf.function
    def generate_one_step(self, inputs, states=None):
        """Sample the next character for each input string.

        Args:
            inputs: strings to continue.
            states: model state from a previous step, or None.

        Returns:
            A (predicted_chars, states) tuple.
        """
        # Strings -> characters -> dense tensor of token ids.
        split_chars = tf.strings.unicode_split(inputs, "UTF-8")
        token_ids = self.ids_from_chars(split_chars).to_tensor()

        # logits has shape [batch, char, next_char_logits].
        logits, states = self.model(
            inputs=token_ids, states=states, return_state=True
        )

        # Keep only the last position, apply the temperature, then add the
        # mask so "[UNK]" cannot be drawn.
        last_logits = logits[:, -1, :] / self.temperature + self.prediction_mask

        # Draw one token id per batch entry and drop the sample axis.
        sampled_ids = tf.squeeze(
            tf.random.categorical(last_logits, num_samples=1), axis=-1
        )

        # Token ids -> characters, returned together with the model state.
        return self.chars_from_ids(sampled_ids), states

In [None]:
# Wrap the model for step-by-step generation (default temperature).
one_step_model = OneStep(
    model=model,
    chars_from_ids=chars_from_ids,
    ids_from_chars=ids_from_chars,
)