In [None]:
import os
import traceback
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
import numpy as np
import random as  rnd
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.layers import Input
rnd.seed(32)

In [None]:
dirname = 'data/'
filename = 'shakespeare_data.txt'
lines = []
counter = 0  # not read by any visible cell; kept in case another cell relies on it

# Decode explicitly as UTF-8 instead of the platform default encoding: the text is
# later split into characters with input_encoding='UTF-8', so the file must be
# read the same way on every OS.
with open(os.path.join(dirname, filename), encoding='utf-8') as files:
    for line in files:
        pure_line = line.strip()
        if pure_line:  # keep only non-blank lines
            lines.append(pure_line)

n_lines = len(lines)
print(f"Number of lines: {n_lines}")

In [None]:
# Peek at a few consecutive lines of the corpus, one per output row.
print(*lines[506:514], sep="\n")

In [None]:
# Character-level vocabulary: every distinct character of the corpus in sorted
# order, preceded by the "[UNK]" token at index 0 and the empty string at index 1.
text = "\n".join(lines)
vocab = ["[UNK]", ""] + sorted(set(text))

print(f'{len(vocab)} unique characters')
print(" ".join(vocab))

In [None]:
# Split a sample sentence into a tensor of single-character strings.
# `chars` is reused by the lookup demo in the next cell.
line = "Hello world!"
chars = tf.strings.unicode_split(line, input_encoding='UTF-8')
print(chars)

In [None]:
# Show where a few individual characters sit in the vocabulary.
for ch in ('a', 'u', ' ', '2', '3'):
    print(vocab.index(ch))

# Map the sample characters in `chars` to their vocabulary indices.
char_lookup = tf.keras.layers.StringLookup(vocabulary=list(vocab), mask_token=None)
ids = char_lookup(chars)
print(ids)

In [None]:
def line_to_tensor(line, vocab):
    """
    Encodes text as the vocabulary indices of its characters.

    Args:
        line (str): Text to encode; it is split into UTF-8 characters.
        vocab (list): Vocabulary of characters; a character's id is its lookup index.

    Returns:
        tf.Tensor: The ids produced by a `StringLookup` layer built from `vocab`
            (no mask token), one per character of `line`.
    """
    lookup = tf.keras.layers.StringLookup(vocabulary=list(vocab), mask_token=None)
    return lookup(tf.strings.unicode_split(line, input_encoding='UTF-8'))

In [None]:
def text_from_ids(ids, vocab):
    """
    Decodes vocabulary indices back into text.

    Args:
        ids (tf.Tensor or list): Vocabulary indices to decode.
        vocab (list): Vocabulary of characters used for the inverse lookup.

    Returns:
        tf.Tensor: String tensor in which the characters along the last axis
            have been joined together.
    """
    inverse_lookup = tf.keras.layers.StringLookup(
        vocabulary=vocab, invert=True, mask_token=None)
    decoded_chars = inverse_lookup(ids)
    return tf.strings.reduce_join(decoded_chars, axis=-1)

In [None]:
# Round-trip check: decode the sample ids back into text (shown as the cell output).
text_from_ids(ids, vocab).numpy()

In [None]:
# Hold out the final 1000 lines for evaluation; everything before them is training data.
n_eval_lines = 1000
train_lines, eval_lines = lines[:-n_eval_lines], lines[-n_eval_lines:]

print(f"Number of training lines: {len(train_lines)}")
print(f"Number of validation lines: {len(eval_lines)}")

In [None]:
# Encode a tiny two-line sample (lines separated by a newline) to walk through the pipeline.
all_ids = line_to_tensor("Hello world!\nGenerative AI", vocab)
all_ids

In [None]:
# One dataset element per character id; decode the first ten back to text.
ids_dataset = tf.data.Dataset.from_tensor_slices(all_ids)
first_chars = []
for char_id in ids_dataset.take(10):
    first_chars.append(text_from_ids([char_id], vocab).numpy())
print(first_chars)

In [None]:
# Group the character ids into chunks of seq_length + 1 ids, dropping a shorter
# final chunk. The extra id lets each chunk be split into an input and a target
# shifted by one position (see split_input_target).
seq_length = 10
data_generator = ids_dataset.batch(seq_length + 1, drop_remainder=True)

In [None]:
# Show the raw id tensors of the first two chunks.
for seq in data_generator.take(2):
    print(seq)

In [None]:
# Same two chunks, decoded back to text and numbered from 1.
for i, seq in enumerate(data_generator.take(2), start=1):
    print(f"{i}. {text_from_ids(seq, vocab).numpy()}")

In [None]:
def split_input_target(sequence):
    """
    Builds an (input, target) pair in which the target is the input shifted by one.

    Args:
        sequence (tf.Tensor or list): Any sliceable sequence of tokens.

    Returns:
        tuple: `(sequence[:-1], sequence[1:])` - everything but the last element,
            and everything but the first element.
    """
    return sequence[:-1], sequence[1:]

In [None]:
# Quick check on a plain list of characters: the second list is the first shifted by one.
split_input_target(list("Tensorflow"))

In [None]:
def create_batch_dataset(lines, vocab, seq_length=100, batch_size=64):
    """
    Turns text lines into a shuffled, batched dataset of (input, target) id sequences.

    Args:
        lines (list): Input strings; they are joined with newlines into one text.
        vocab (list): Vocabulary used to encode characters as ids.
        seq_length (int): Length of each input/target sequence.
        batch_size (int): Number of sequences per batch.

    Returns:
        tf.data.Dataset: Batches of `(input, target)` pairs, where the target is the
            input shifted by one character. Incomplete sequences and incomplete
            batches are dropped.
    """
    shuffle_buffer = 10000

    # Encode the whole corpus as one long stream of character ids.
    corpus_ids = line_to_tensor("\n".join(lines), vocab)

    # Cut the stream into windows of seq_length + 1 ids so each window can be
    # split into an input and a one-step-shifted target of length seq_length.
    windows = tf.data.Dataset.from_tensor_slices(corpus_ids).batch(
        seq_length + 1, drop_remainder=True)
    pairs = windows.map(split_input_target)

    shuffled = pairs.shuffle(shuffle_buffer)
    batched = shuffled.batch(batch_size, drop_remainder=True)
    return batched.prefetch(tf.data.experimental.AUTOTUNE)

In [None]:
# Seed TensorFlow before building the dataset, then build a small dataset
# (short sequences, batch of 2) from a slice of the training lines.
tf.random.set_seed(1)
dataset = create_batch_dataset(train_lines[1:100], vocab, seq_length=16, batch_size=2)

print("Prints the elements into a single batch. The batch contains 2 elements: ")

# Decode both (input, target) pairs of the first batch; the \033[..m sequences
# are ANSI colour codes for the terminal/notebook output.
for input_example, target_example in dataset.take(1):
    print("\n\033[94mInput0\t:", text_from_ids(input_example[0], vocab).numpy())
    print("\n\033[93mTarget0\t:", text_from_ids(target_example[0], vocab).numpy())
    
    print("\n\n\033[94mInput1\t:", text_from_ids(input_example[1], vocab).numpy())
    print("\n\033[93mTarget1\t:", text_from_ids(target_example[1], vocab).numpy())

In [None]:
# Training dataset over all training lines: 100-character sequences in batches of BATCH_SIZE.
# Note that this rebinds `dataset`, replacing the small demo dataset.
BATCH_SIZE = 64
dataset = create_batch_dataset(train_lines, vocab, seq_length=100, batch_size=BATCH_SIZE)

In [None]:
class GRULM(tf.keras.Model):
    """
    Character language model: Embedding -> GRU -> Dense with log-softmax.

    Args:
        vocab_size (int, optional): Number of distinct tokens; sets both the embedding
            input size and the width of the output layer. Defaults to 256.
        embedding_dim (int, optional): Size of each token embedding. Defaults to 256.
        rnn_units (int, optional): Number of units in the GRU layer. Defaults to 128.
    """
    def __init__(self, vocab_size=256, embedding_dim=256, rnn_units=128):
        super().__init__()

        # Sub-layers, created in the order they are applied in call().
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(
            rnn_units,
            return_sequences=True,  # one output per input position
            return_state=True,      # also return the final hidden state
        )
        self.dense = tf.keras.layers.Dense(vocab_size, activation='log_softmax')

    def call(self, inputs, training=False):
        """
        Runs the model on a batch of token ids.

        Args:
            inputs (tf.Tensor): Token ids fed to the embedding layer.
            training (bool): Forwarded to every sub-layer.

        Returns:
            tuple: `(log_probs, final_state)` - the dense layer's log-softmax output
                for every position, and the state returned by the GRU.
        """
        embedded = self.embedding(inputs, training=training)
        gru_outputs, final_state = self.gru(embedded, training=training)
        log_probs = self.dense(gru_outputs, training=training)
        return log_probs, final_state

In [None]:
# Model hyperparameters.
# Derive the vocabulary size from `vocab` rather than hard-coding it: the embedding
# and output layers must cover every id the lookup can produce, and a fixed number
# silently goes stale if the corpus (and therefore the vocabulary) changes.
vocab_size = len(vocab)
embedding_dim = 256
rnn_units = 512

In [None]:
# Instantiate the model and print its summary. Any failure is reported (message
# plus full traceback) instead of aborting the notebook run.
try:
    # Symbolic input: variable-length sequences in batches of BATCH_SIZE.
    input_layer = tf.keras.Input(shape=(None,), batch_size=BATCH_SIZE)
    model = GRULM(vocab_size=vocab_size, embedding_dim=embedding_dim, rnn_units=rnn_units)
    # Run the layers once on the symbolic input - presumably so the weights exist
    # before summary() is called (TODO confirm).
    model.call(input_layer)
    model.summary()
except Exception as e:
    print(e)
    # print("\033[91mError! \033[0mA problem occurred while building your model. This error can occur due to wrong initialization of the return_sequences parameter\n\n")
    traceback.print_exc()

In [None]:
# Run the model on the first sequence of one training batch to check the output shape.
for input_example_batch, target_example_batch in dataset.take(1):
    first_sequence = input_example_batch[0].numpy()  # only the first sequence of the batch
    print("Input: ", first_sequence)
    # Wrap the sequence in a list to get a batch of size one.
    example_batch_predictions, _ = model(tf.constant([first_sequence]))
    print("\n", example_batch_predictions.shape, "# (batch_size, sequence_length, vocab_size)")

In [None]:
# Highest-scoring vocabulary index at every position of the single example sequence.
sampled_indices = tf.math.argmax(example_batch_predictions[0], axis=1)
print(sampled_indices.numpy())

In [None]:
# Compare the input text with the characters picked by the argmax above.
print("Input:\n", text_from_ids(input_example_batch[0], vocab))
print()
print("Next Char Predictions:\n", text_from_ids(sampled_indices, vocab))

In [None]:
def compile_model(model):
    """
    Attaches the training loss and optimizer to `model`.

    The model is compiled in place with sparse categorical cross-entropy
    (`from_logits=True`) and Adam with a learning rate of 0.00125.

    Args:
        model (tf.keras.Model): The model to compile.

    Returns:
        tf.keras.Model: The same model object, now compiled.
    """
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.00125),
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    )
    return model

In [None]:
# Compile and train on the full training dataset for EPOCHS passes.
# NOTE(review): GRULM.call returns a (log_probs, states) tuple while the dataset
# yields a single target tensor - confirm that fit() pairs the loss with the
# first output as intended on the Keras version in use.
EPOCHS = 10
model = compile_model(model)
history = model.fit(dataset, epochs=EPOCHS)

In [None]:
# import shutil
# # Define the output directory and file path
# output_dir = './model/'
# output_file = os.path.join(output_dir, 'model.weights.h5')

# # Remove the directory if it exists
# try:
#     shutil.rmtree(output_dir)
# except OSError as e:
#     pass

# # Create the directory
# os.makedirs(output_dir, exist_ok=True)

# # Save model weights to the specified file
# model.save_weights(output_file)


In [None]:
def log_perplexity(preds, target):
    """
    Computes the log perplexity of predictions against the true targets.

    Positions whose target equals the padding id (1) are excluded from both
    the sum and the count.

    Args:
        preds (tf.Tensor or tuple): Per-position log-probabilities over the vocabulary;
            if a tuple is given, its first element is used.
        target (tf.Tensor): True token ids, one per position.

    Returns:
        tf.Tensor: Negative mean log-probability of the non-padding targets,
            reduced over the last axis.
    """
    PADDING_ID = 1
    log_probs = preds[0] if isinstance(preds, tuple) else preds

    # Select the log-probability assigned to the true token at every position.
    target_one_hot = tf.one_hot(target, depth=log_probs.shape[-1])
    token_log_p = tf.reduce_sum(log_probs * target_one_hot, axis=-1)

    # 1.0 where the target is a real token, 0.0 where it is padding.
    keep = tf.cast(tf.not_equal(target, PADDING_ID), dtype=token_log_p.dtype)

    total_log_p = tf.reduce_sum(token_log_p * keep, axis=-1)
    n_tokens = tf.reduce_sum(keep, axis=-1)

    return -total_log_p / n_tokens

In [None]:
# Re-create the model with the vocabulary's actual size and load weights from disk.
# This rebinds `model`, so the instance trained above is no longer referenced.
vocab_size = len(vocab)
embedding_dim = 256
rnn_units = 512

model = GRULM(
    vocab_size=vocab_size,
    embedding_dim=embedding_dim,
    rnn_units = rnn_units)
# NOTE(review): the model is fed (batch, sequence) id tensors elsewhere in this
# notebook; confirm that (100, vocab_size) is the intended build shape.
model.build(input_shape=(100, vocab_size))
# NOTE(review): the cell that would write this file is commented out above, so
# the weights file must already exist for this line to succeed.
model.load_weights('./model/model.weights.h5')

In [None]:
# Encode the held-out lines as one text and run the model over it in a single pass.
eval_text = "\n".join(eval_lines)
# The text is passed as a one-element list, so the ids carry a leading
# dimension that tf.squeeze removes again on the next line.
eval_ids = line_to_tensor([eval_text], vocab)
input_ids, target_ids = split_input_target(tf.squeeze(eval_ids, axis=0))

# Add a batch dimension of one before calling the model.
preds = model(tf.expand_dims(input_ids, 0), training=False)
# Sanity print: container type, its length, and the shape of the predictions.
print(type(preds), len(preds), preds[0].shape if isinstance(preds, tuple) else preds.shape)


In [None]:
# Log perplexity on the held-out text (targets get the same batch dimension as
# the inputs); perplexity itself is the exponential of the log perplexity.
log_ppx = log_perplexity(preds, tf.expand_dims(target_ids, 0))
print(f'The log perplexity and perplexity of your model are {log_ppx} and {np.exp(log_ppx)} respectively')

In [None]:
def temperature_random_sampling(log_probs, temperature=1.0):
    """
    Samples an index by adding temperature-scaled random noise to the log scores.

    The noise is `-log(-log(u))` with `u` drawn uniformly from
    (1e-6, 1 - 1e-6). The higher the temperature, the more the noise dominates
    and the more random the result; with a temperature close to 0 the result is
    simply the index of the highest log score.

    Args:
        log_probs (tf.Tensor): The log scores for each character in the dictionary.
        temperature (number): Weight applied to the random noise.

    Returns:
        tf.Tensor: Index of the selected character (argmax over the last axis).
    """
    uniform_noise = tf.random.uniform(
        shape=log_probs.shape, minval=1e-6, maxval=1.0 - 1e-6)
    gumbel_noise = -tf.math.log(-tf.math.log(uniform_noise))
    perturbed_scores = log_probs + gumbel_noise * temperature
    return tf.math.argmax(perturbed_scores, axis=-1)

In [None]:
class GenerativeModel:
    """
    Character-level text generator on top of a trained language model.

    Args:
        model (tf.keras.Model): Callable as `model(ids, training=False)` on a
            (1, sequence) tensor of character ids; returns per-position scores over
            the vocabulary, either directly or as the first element of a tuple.
        vocab (list): Vocabulary used to build the char <-> id lookup layers.
        temperature (float): Positive divisor applied to the scores before sampling;
            smaller values make the output more deterministic.

    Raises:
        ValueError: If `temperature` is not positive.
    """
    def __init__(self, model, vocab, temperature=1.0):
        if temperature <= 0:
            # Scores are divided by the temperature, so zero or negative values
            # would yield inf/nan or inverted distributions.
            raise ValueError("temperature must be positive")
        self.model = model
        self.vocab = vocab
        self.temperature = temperature
        self.idx_to_char = tf.keras.layers.StringLookup(
            vocabulary=vocab, invert=True, mask_token=None)
        self.char_to_idx = tf.keras.layers.StringLookup(
            vocabulary=vocab, mask_token=None)

    def generate_one_step(self, inputs, states=None):
        """
        Samples the character that follows `inputs`.

        Args:
            inputs (str or tf.Tensor): The text to condition on; must contain at
                least one character.
            states: Unused. The model is called without any recurrent state; the
                parameter is kept so existing call sites keep working.

        Returns:
            tf.Tensor: String tensor holding the sampled character.
        """
        chars = tf.strings.unicode_split(inputs, 'UTF-8')
        ids = tf.expand_dims(self.char_to_idx(chars), 0)  # add the batch dimension

        preds = self.model(ids, training=False)
        if isinstance(preds, tuple):
            preds = preds[0]

        # Only the scores at the last position describe the next character,
        # so sample from that position alone.
        last_step_scores = preds[:, -1, :] / self.temperature
        predicted_id = tf.random.categorical(last_step_scores, num_samples=1)[0, 0].numpy()
        return self.idx_to_char(predicted_id)

    def generate_n_chars(self, num_chars, start_string):
        """
        Extends `start_string` by `num_chars` sampled characters.

        Args:
            num_chars (int): Number of sampling steps to run.
            start_string (str): Non-empty prompt the generation starts from.

        Returns:
            str: The prompt followed by the generated characters.

        Raises:
            ValueError: If `start_string` is empty.
        """
        if not start_string:
            raise ValueError("start_string must contain at least one character")

        text = start_string
        for _ in range(num_chars):
            # Feed the whole text generated so far: the model is called without
            # any carried-over state, so passing only the last character would
            # discard all earlier context. The cost grows with the text length.
            next_char = self.generate_one_step(text)
            text += self._as_text(next_char)

        return text

    @staticmethod
    def _as_text(char_tensor):
        """Converts a string tensor of any rank into one Python str."""
        flat = tf.reshape(char_tensor, [-1])
        return tf.strings.reduce_join(flat).numpy().decode('utf-8')

In [None]:
# Fixed seed so the three short samples below are repeatable.
tf.random.set_seed(272)
gen = GenerativeModel(model, vocab, temperature=0.5)

# Generate 32 characters after each prompt, followed by a separator line.
for prompt in (" ", "Dear", "KING"):
    print(gen.generate_n_chars(32, prompt), '\n\n' + '_'*80)

In [None]:
import time

# Random seed this time, so every run produces a different sample.
tf.random.set_seed(np.random.randint(1, 1000))
gen = GenerativeModel(model, vocab, temperature=0.8)

# Time the generation of a longer passage.
start = time.time()
generated_text = gen.generate_n_chars(1000, "ROMEO ")
print(generated_text, '\n\n' + '_'*80)
print('\nRun time:', time.time() - start)