**POETRY PREDICTOR**


We use an encoder-decoder architecture. Before writing the functions for either part, we need to preprocess the dataset.

**1. IMPORT DATASET**

Download the dataset from a URL using the Keras API in the TensorFlow library. First, silence Python warnings and set the TensorFlow log level.

In [None]:
import os
import warnings
warnings.filterwarnings("ignore")
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"

**HOW TO CHOOSE LOG LEVEL:**

**0:** Show all logs (the default).

**1:** Filter out INFO messages, showing WARNING and ERROR messages.

**2:** Filter out INFO and WARNING messages, showing only ERROR messages.

**3:** Filter out INFO, WARNING, and ERROR messages.

In [None]:
import tensorflow as tf
import numpy as np

dataset_file= tf.keras.utils.get_file("shakespeare.txt",
    "https://storage.googleapis.com/download.tensorflow.org/data/shakespeare.txt")

**2. READ THE FILE**

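Read the file in binary mode and decode it into a UTF-8 string, which gives us one giant string. Then calculate the length of the string in characters.


In [None]:
extracted_string = open(dataset_file, 'rb').read().decode('UTF-8')
# Print the length and a short preview rather than the entire text
print(f"Length of text: {len(extracted_string)} characters")
print(extracted_string[:250])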

**3. UNIQUE CHARACTERS**

Find the unique characters in the document and store them, sorted, in a variable ‘unique’. This is our **vocabulary**.

In [None]:
unique= sorted(set(extracted_string))
print(unique)

**4. GIVE IDS TO CHARACTERS**

A neural network cannot work on raw characters, so we need to assign a numeric ID to each character. Note that, despite its name, `ids_to_chars` is the layer that maps characters to IDs.

In [None]:
ids_to_chars= tf.keras.layers.StringLookup(vocabulary= list(unique), mask_token=None)

Since this is a text generator, we need to be able to convert these IDs back to human-readable characters. So we create a second lookup layer, with `invert=True`, that converts IDs to characters.

In [None]:
chars_from_ids= tf.keras.layers.StringLookup(vocabulary= ids_to_chars.get_vocabulary(), invert= True, mask_token= None)
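
To make the two lookups concrete, here is a minimal plain-Python sketch of the same round trip on a toy string. It does not use `StringLookup`; the names `toy_text`, `char_to_id`, `id_to_char`, `encode`, and `decode` are illustrative and not part of the notebook. It assumes, as `StringLookup` does with `mask_token=None` and a single out-of-vocabulary slot, that ID 0 is reserved for `[UNK]`.

```python
# Toy text standing in for the Shakespeare corpus (illustrative only)
toy_text = "hello world"

# Sorted unique characters, like `unique` in the notebook
toy_vocab = sorted(set(toy_text))

# ID 0 is reserved for unknown characters; real characters start at 1
char_to_id = {ch: i + 1 for i, ch in enumerate(toy_vocab)}
id_to_char = {i: ch for ch, i in char_to_id.items()}
id_to_char[0] = "[UNK]"


def encode(text):
    """Map each character to its ID, using 0 for characters outside the vocabulary."""
    return [char_to_id.get(ch, 0) for ch in text]


def decode(ids):
    """Map IDs back to characters and join them into one string."""
    return "".join(id_to_char[i] for i in ids)


ids = encode("hello")
print(ids)          # [4, 3, 5, 5, 6]
print(decode(ids))  # hello
```

A character that never appeared in the toy text, such as "z", is encoded as 0 and decodes to `[UNK]`.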

Now apply the character-to-ID lookup to the actual extracted data and not just the unique characters, i.e. **give IDs to all characters in the dataset**.

In [None]:
all_ids_to_data= ids_to_chars(tf.strings.unicode_split(extracted_string, "UTF-8"))

**Create a stream of these character indices** obtained from actual extracted data.

In [None]:
char_indices_stream=tf.data.Dataset.from_tensor_slices(all_ids_to_data)

Now define the inverse: a helper that converts a sequence of IDs to characters and joins them into a single string.

In [None]:
def readable_text_from_indices(ids):
   return tf.strings.reduce_join(chars_from_ids(ids), axis=-1)

With this inverse function we can recover characters from the IDs. However, the encoder takes a sequence as input, so we need to group the stream of individual characters into sequences of a fixed length. To do that we use the `batch` method.

In [None]:
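# Suppose the sequence length is 100
seq_length = 100

# From the character index stream, generate sequences of seq_length + 1 characters.
# The extra character lets the input and target be shifted by one position later on.
sequences = char_indices_stream.batch(seq_length + 1, drop_remainder=True)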
for seq in sequences.take(1):
   print(chars_from_ids(seq))

The output above is a tensor of individual characters obtained from the indices. Now, we join these characters into readable text.

In [None]:
for seq in sequences.take(1):
   print(readable_text_from_indices(seq).numpy())
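
The grouping done by `batch` with `drop_remainder=True` can be sketched in plain Python. This only illustrates the result, not how `tf.data` works internally; `chunk_ids` and the toy values are hypothetical.

```python
def chunk_ids(ids, chunk_size):
    """Split a flat list of IDs into consecutive chunks, dropping an incomplete last chunk."""
    n_full = len(ids) // chunk_size
    return [ids[i * chunk_size:(i + 1) * chunk_size] for i in range(n_full)]


# Ten IDs grouped with a toy sequence length of 3, so each chunk holds 3 + 1 = 4 IDs
toy_ids = list(range(10))
toy_seq_length = 3
chunks = chunk_ids(toy_ids, toy_seq_length + 1)
print(chunks)  # [[0, 1, 2, 3], [4, 5, 6, 7]]
```

IDs 8 and 9 do not fill a whole chunk, so they are dropped, just as the leftover characters at the end of the text are.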

**5. INPUT AND TARGET**

To train a predictor, every input character needs a target: the character that comes next. So for each sequence, the input is all elements except the last, and the target is all elements except the first.

In [None]:
def split_input_sequence(sequence):
  input_text = sequence[:-1]
  target_text =sequence[1:]
  return input_text, target_text

In [None]:
dataset = sequences.map(split_input_sequence)
# Use names that do not shadow the built-in input()
for input_example, target_example in dataset.take(1):
   print("Input :", readable_text_from_indices(input_example).numpy())
   print("Target:", readable_text_from_indices(target_example).numpy())
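
As a small worked example, the same slicing applied to the word "Hello" gives the input "Hell" and the target "ello". The helper name `split_example` is illustrative; it applies the same slicing as `split_input_sequence` to a plain Python list.

```python
def split_example(sequence):
    """Return (input, target): everything but the last item, and everything but the first."""
    return sequence[:-1], sequence[1:]


input_chars, target_chars = split_example(list("Hello"))
print("".join(input_chars))   # Hell
print("".join(target_chars))  # ello

# Each input character is paired with the character that comes right after it
for current_char, expected_next in zip(input_chars, target_chars):
    print(repr(current_char), "->", repr(expected_next))
```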

**6. CREATE A BATCH**

In [None]:
BATCH_SIZE = 64
BUFFER_SIZE = 10000

dataset = (
    dataset.shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE, drop_remainder=True)
    .prefetch(tf.data.AUTOTUNE)
)
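
To get a feel for what this pipeline produces, the arithmetic below estimates how many sequences and batches come out of a text of a given length. It is a plain-Python sketch; `count_batches` is a hypothetical helper, and the character count in the usage line is made up rather than measured from the Shakespeare file.

```python
def count_batches(n_chars, seq_length, batch_size):
    """Return (number of sequences, number of batches) when remainders are dropped at both steps."""
    n_sequences = n_chars // (seq_length + 1)  # each sequence holds seq_length + 1 characters
    n_batches = n_sequences // batch_size      # an incomplete final batch is dropped
    return n_sequences, n_batches


# Hypothetical text of 1,000,000 characters with the notebook's settings
n_sequences, n_batches = count_batches(1_000_000, seq_length=100, batch_size=64)
print(n_sequences, n_batches)  # 9900 154
```

With these settings, each batch holds 64 input sequences and 64 target sequences of 100 character IDs each.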

**7. BUILDING THE ENCODER**

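•	Embedding layer: maps each character ID to a trainable vector of size `embedding_dim`.

•	Recurrent layer (GRU): reads the embedded sequence and updates its hidden state at every step.

•	Dense layer: maps each GRU output to one logit per vocabulary entry. The softmax that turns logits into probabilities is applied later, inside the loss.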


In [None]:
# Vocabulary size, including the [UNK] token that StringLookup adds
vocab_size = len(ids_to_chars.get_vocabulary())
# The embedding dimension
embedding_dim = 256

# Number of RNN units
rnn_units = 1024

Encoder inherits from **tf.keras.Model**, which is a base class for building models in TensorFlow. This means it will have all the methods and properties of tf.keras.Model.

**self** is a conventional name used to represent the instance of the class within its own methods.

In [None]:
class Encoder(tf.keras.Model):
  def __init__(self, vocab_size, embedding_dim, rnn_units):
    super().__init__()  # initialize the parent tf.keras.Model
    self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
    # return_sequences gives the output at every step; return_state also returns the final hidden state
    self.gru = tf.keras.layers.GRU(rnn_units, return_sequences=True, return_state=True)
    self.dense = tf.keras.layers.Dense(vocab_size)

  # The call method handles the forward pass of the model, applying each layer to the inputs.
  # If no previous state is provided, the GRU starts from its default initial state; otherwise, it uses the given state.
  # The method can also return the updated state if requested, which is useful for generating text iteratively.
  def call(self, inputs, states=None, return_state=False, training=False):
    x = self.embedding(inputs, training=training)
    # initial_state=None lets the GRU use its default (zero) initial state
    x, states = self.gru(x, initial_state=states, training=training)
    x = self.dense(x, training=training)

    if return_state:
      return x, states
    else:
      return x


In [None]:
Encoder_model= Encoder(vocab_size= len(ids_to_chars.get_vocabulary()), embedding_dim=embedding_dim, rnn_units=rnn_units)

**8. TEST THE MODEL ON ONE BATCH**



In [None]:
for input_batch, target_batch in dataset.take(1):
    target_predictions= Encoder_model(input_batch)
    print(target_predictions.shape,"# (batch_size, sequence_length, vocab_size)")

Now, we sample predicted character IDs for the first sequence in the batch.

In [None]:
# target_predictions[0] holds the logits (unnormalized scores, not probabilities) for each possible
# next character at every position of the first sequence. tf.random.categorical samples from them.
sampled_indices = tf.random.categorical(target_predictions[0], num_samples=1)
# Drop the last axis to get a 1D tensor, then convert it to a NumPy array
sampled_indices = tf.squeeze(sampled_indices, axis=-1).numpy()

In [None]:
print("Input:\n", readable_text_from_indices(input_batch[0]).numpy())
print()
print("Next Char Predictions:\n", readable_text_from_indices(sampled_indices).numpy())
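
The difference between sampling from the logits and always taking the highest-scoring character can be sketched without TensorFlow. The example below is only an illustration: `softmax` and `sample_index` are hypothetical helpers, and the logits are made up.

```python
import math
import random


def softmax(logits):
    """Convert logits to probabilities, subtracting the max for numerical stability."""
    peak = max(logits)
    exps = [math.exp(value - peak) for value in logits]
    total = sum(exps)
    return [e / total for e in exps]


def sample_index(logits, rng):
    """Draw one index with probability proportional to softmax(logits)."""
    probs = softmax(logits)
    return rng.choices(range(len(logits)), weights=probs, k=1)[0]


toy_logits = [2.0, 1.0, 0.1]  # made-up scores for three characters
rng = random.Random(0)        # seeded so the run is repeatable

argmax_index = max(range(len(toy_logits)), key=lambda i: toy_logits[i])
samples = [sample_index(toy_logits, rng) for _ in range(1000)]

print("argmax always picks:", argmax_index)
print("sampled counts:", [samples.count(i) for i in range(3)])
```

Sampling picks the highest-scoring index most often but still returns the other indices some of the time, which is what gives generated text its variety.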

**9. LOSS FUNCTION**

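Determine the loss from the targets and the predicted values. The model returns logits, so we set `from_logits=True`.

In [None]:
loss = tf.losses.SparseCategoricalCrossentropy(from_logits=True)
# Compare the predictions with the targets (the next characters), not with the inputs
mean_loss_for_batch = loss(target_batch, target_predictions)
print("Mean Loss: ", mean_loss_for_batch)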
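
What this loss computes for a single position can be written out by hand: the negative log of the softmax probability assigned to the true class. The sketch below is plain Python rather than the Keras implementation, and `sparse_ce_from_logits` is a hypothetical name. It also shows why a model with no preference among characters has a loss of ln(vocabulary size).

```python
import math


def sparse_ce_from_logits(target_id, logits):
    """Cross-entropy for one position: -log(softmax(logits)[target_id])."""
    peak = max(logits)
    log_sum_exp = peak + math.log(sum(math.exp(v - peak) for v in logits))
    return log_sum_exp - logits[target_id]


toy_vocab_size = 5                       # made-up vocabulary size
uniform_logits = [0.0] * toy_vocab_size  # a model with no preference
loss_value = sparse_ce_from_logits(2, uniform_logits)

print(loss_value)            # equals ln(5)
print(math.exp(loss_value))  # approximately 5, the vocabulary size
```

When the logit of the true class is much larger than the others, the same function returns a value close to zero.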

**10. TRAINING THE ENCODER**

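Configure the optimizer, checkpoints, and epochs.

In [None]:
# Sanity check: for a newly initialized model, exp(mean loss) should be close to the vocabulary size
print(tf.exp(mean_loss_for_batch).numpy())
Encoder_model.compile(optimizer="adam", loss=loss)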
checkpoint_dir = "./training_checkpoints"
# Name of the checkpoint files
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_prefix, save_weights_only=True
)


In [None]:
EPOCHS= 10
train_history= Encoder_model.fit(dataset, epochs= EPOCHS, callbacks=[checkpoint_callback])

**11. BUILDING THE DECODER**


*   Create a function to convert between characters and token ids.
*   Create a mask to prevent [UNK] token generation.
*   Create a function to take input tokens and previous hidden states.
*   Convert input tokens to token IDs.
*   Run the model with the input token IDs to get the predicted logits (scores) for the next token.
*   Divide the logits by the temperature parameter to control randomness in token sampling.
*   Apply the prediction mask to the logits to exclude certain tokens.
*   Sample token IDs from the logits and convert them back to characters.
*   Return the predicted characters and updated model states.
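
The temperature, masking, and sampling steps listed above can be sketched in plain Python. This is an illustration under stated assumptions, not the notebook's decoder: `sample_next_id` is a hypothetical helper, the logits are made up, and ID 0 is assumed to be the `[UNK]` token.

```python
import math
import random


def sample_next_id(logits, temperature, banned_ids, rng):
    """Scale logits by temperature, mask banned IDs with -inf, then sample one ID."""
    scaled = [value / temperature for value in logits]
    masked = [-math.inf if i in banned_ids else value for i, value in enumerate(scaled)]
    peak = max(masked)
    weights = [math.exp(value - peak) for value in masked]  # exp(-inf) is 0.0
    return rng.choices(range(len(logits)), weights=weights, k=1)[0]


toy_logits = [5.0, 1.0, 2.0, 3.0]  # made-up scores; ID 0 plays the role of [UNK]
rng = random.Random(0)

# Even though ID 0 has the highest score, the mask keeps it from being sampled
draws = [sample_next_id(toy_logits, 1.0, {0}, rng) for _ in range(500)]
print(sorted(set(draws)))

# A low temperature sharpens the distribution toward the best remaining ID
cold_draws = [sample_next_id(toy_logits, 0.05, {0}, rng) for _ in range(500)]
print(sorted(set(cold_draws)))
```

A temperature below 1 makes the output more predictable, while a temperature above 1 flattens the distribution and makes the output more random.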




In [None]:
class Decoder(tf.keras.Model):
  def __init__(self, Encoder_model, chars_from_ids, ids_to_chars, temperature=1.0):
    super().__init__()
    self.temperature = temperature
    self.model = Encoder_model
    self.chars_from_ids = chars_from_ids
    self.ids_to_chars = ids_to_chars
    # Create a mask to prevent "[UNK]" from being generated
    skip_ids = self.ids_to_chars(["[UNK]"])[:, None]
    # At each bad index, place -inf. Also, match the shape to the vocabulary.
    sparse_mask = tf.SparseTensor(
        values=[-float("inf")] * len(skip_ids),
        indices=skip_ids,
        dense_shape=[len(self.ids_to_chars.get_vocabulary())],
    )
    self.prediction_mask = tf.sparse.to_dense(sparse_mask)

  @tf.function
  def generate_one_step(self, inputs, states=None):
    # Convert the input strings to token IDs
    input_characters = tf.strings.unicode_split(inputs, "UTF-8")
    input_ids = self.ids_to_chars(input_characters).to_tensor()

    # Run the model; predicted_logits has shape [batch, characters, vocabulary]
    predicted_logits, states = self.model(inputs=input_ids, states=states, return_state=True)

    # Only use the prediction for the last character
    predicted_logits = predicted_logits[:, -1, :]
    predicted_logits = predicted_logits / self.temperature
    # Add the mask to prevent [UNK] from being generated
    predicted_logits = predicted_logits + self.prediction_mask

    # Sample the token IDs
    predicted_ids = tf.random.categorical(predicted_logits, num_samples=1)
    predicted_ids = tf.squeeze(predicted_ids, axis=-1)

    # Convert the IDs back to characters
    predicted_characters = self.chars_from_ids(predicted_ids)

    return predicted_characters, states


In [None]:
import time

Decoder_model = Decoder(Encoder_model, chars_from_ids, ids_to_chars)
# Give a prompt to start from
start = time.time()
states = None
next_char = tf.constant(["ROMEO:"])
result = [next_char]

# Generate 2000 characters, feeding each prediction back in as the next input
for n in range(2000):
  next_char, states = Decoder_model.generate_one_step(next_char, states=states)
  result.append(next_char)

result = tf.strings.join(result)
end = time.time()
print(result[0].numpy().decode("utf-8"), "\n\n" + "_" * 80)
print("\nRun time:", end - start)
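
The shape of this loop, where each prediction and state is fed back into the next step, can be shown with a stand-in for the model. In the sketch below, `toy_step` is a hypothetical replacement for `generate_one_step` that simply returns the next letter of the alphabet and counts calls as its state; it assumes lowercase input and has nothing to do with the trained network.

```python
def toy_step(prev_text, state):
    """Stand-in for one generation step: return the letter after the last character, plus a new state."""
    calls_so_far = 0 if state is None else state
    last = prev_text[-1]
    following = chr((ord(last) - ord("a") + 1) % 26 + ord("a"))
    return following, calls_so_far + 1


def generate(prompt, n_steps, step_fn):
    """Feed the prompt in once, then feed each generated character back in with the carried state."""
    pieces = [prompt]
    current, state = prompt, None
    for _ in range(n_steps):
        current, state = step_fn(current, state)
        pieces.append(current)
    return "".join(pieces), state


text, final_state = generate("ab", 3, toy_step)
print(text)         # abcde
print(final_state)  # 3
```

As in the loop above, only the first call sees the whole prompt. Every later call receives just the previous character, and the state carries the context forward.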

