In [1]:
from abc import ABC

import tensorflow as tf

import os
import re
import pandas as pd

In [2]:
DELIMITER = '|'
seq_length = 100

BATCH_SIZE = 64
BUFFER_SIZE = 10000
EPOCHS = 1

In [3]:
dataset_df = pd.read_csv('dataset.csv', dtype=str, delimiter=DELIMITER).sample(frac=1)
lyrics = dataset_df['lyrics'].str.cat()
lyrics = re.sub(r'\n{2,}', '\n', lyrics)

In [4]:
vocab = sorted(set(lyrics))
print(f'{len(vocab)} unique characters')

ids_from_chars = tf.keras.layers.StringLookup(
    vocabulary=list(vocab), mask_token=None)
chars_from_ids = tf.keras.layers.StringLookup(
    vocabulary=ids_from_chars.get_vocabulary(), invert=True, mask_token=None)

def text_from_ids(_ids):
    return tf.strings.reduce_join(chars_from_ids(_ids), axis=-1)

91 unique characters


In [5]:
all_ids = ids_from_chars(tf.strings.unicode_split(lyrics, 'UTF-8'))
all_ids = tf.cast(all_ids, dtype=tf.int8)
print(all_ids)

ids_dataset = tf.data.Dataset.from_tensor_slices(all_ids)

sequences = ids_dataset.batch(seq_length+1, drop_remainder=True)
for seq in sequences.take(1):
  print(chars_from_ids(seq))

tf.Tensor([47 69 14 ... 80 66 79], shape=(13364544,), dtype=int8)
tf.Tensor(
[b'O' b'h' b',' b' ' b'o' b'h' b',' b' ' b'o' b'h' b'\n' b'O' b'h' b','
 b' ' b'o' b'h' b'\n' b'O' b'h' b',' b' ' b'o' b'h' b',' b' ' b'o' b'h'
 b'\n' b'B' b'a' b'b' b'y' b',' b' ' b't' b'h' b'i' b's' b' ' b'l' b'o'
 b'v' b'e' b',' b' ' b'I' b"'" b'l' b'l' b' ' b'n' b'e' b'v' b'e' b'r'
 b' ' b'l' b'e' b't' b' ' b'i' b't' b' ' b'd' b'i' b'e' b'\n' b'C' b'a'
 b'n' b"'" b't' b' ' b'b' b'e' b' ' b't' b'o' b'u' b'c' b'h' b'e' b'd'
 b' ' b'b' b'y' b' ' b'n' b'o' b' ' b'o' b'n' b'e' b'\n' b'I' b"'" b'd'
 b' ' b'l' b'i'], shape=(101,), dtype=string)


In [6]:
def split_input_target(sequence: list[str]) -> tuple[list[str], list[str]]:
    input_text = sequence[:-1]
    target_text = sequence[1:]
    return input_text, target_text

In [7]:
dataset = (
    sequences
    .map(split_input_target)
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE, drop_remainder=True)
    .prefetch(tf.data.experimental.AUTOTUNE))

dataset

<PrefetchDataset element_spec=(TensorSpec(shape=(64, 100), dtype=tf.int8, name=None), TensorSpec(shape=(64, 100), dtype=tf.int8, name=None))>

In [8]:
class MyModel(tf.keras.Model):
  def __init__(self, vocab_size, embedding_dim, rnn_units):
    super().__init__(self)
    self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
    self.lstm = tf.keras.layers.LSTM(rnn_units,
                                   return_sequences=True,
                                   return_state=True)
    self.dense = tf.keras.layers.Dense(vocab_size)

  def call(self, inputs, states=None, return_state=False, training=False):
    x = inputs
    x = self.embedding(x, training=training)
    if states is None:
      states = self.lstm.get_initial_state(x)
    x, state1, state2  = self.lstm(x, initial_state=states, training=training)
    states = (state1, state2)
    x = self.dense(x, training=training)

    if return_state:
      return x, states
    else:
      return x

In [9]:
# Length of the vocabulary in StringLookup Layer
vocab_size = len(ids_from_chars.get_vocabulary())

# The embedding dimension
embedding_dim = 256

# Number of RNN units
rnn_units = 1024

model = MyModel(
    vocab_size=vocab_size,
    embedding_dim=embedding_dim,
    rnn_units=rnn_units)

In [10]:
loss = tf.losses.SparseCategoricalCrossentropy(from_logits=True)
model.compile(optimizer='adam', loss=loss)

In [11]:
# Directory where the checkpoints will be saved
checkpoint_dir = './training_checkpoints'
# Name of the checkpoint files
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_prefix,
    save_weights_only=True)

In [12]:
history = model.fit(dataset, epochs=EPOCHS, callbacks=[checkpoint_callback])



In [13]:
class OneStep(tf.keras.Model, ABC):
  def __init__(self, model, chars_from_ids, ids_from_chars, temperature=1.0):
    super().__init__()
    self.temperature = temperature
    self.model = model
    self.chars_from_ids = chars_from_ids
    self.ids_from_chars = ids_from_chars

    # Create a mask to prevent "[UNK]" from being generated.
    skip_ids = self.ids_from_chars(['[UNK]'])[:, None]
    sparse_mask = tf.SparseTensor(
        # Put a -inf at each bad index.
        values=[-float('inf')]*len(skip_ids),
        indices=skip_ids,
        # Match the shape to the vocabulary
        dense_shape=[len(ids_from_chars.get_vocabulary())])
    self.prediction_mask = tf.sparse.to_dense(sparse_mask)

  @tf.function
  def generate_one_step(self, inputs, states=None):
    # Convert strings to token IDs.
    input_chars = tf.strings.unicode_split(inputs, 'UTF-8')
    input_ids = self.ids_from_chars(input_chars).to_tensor()

    # Run the model.
    # predicted_logits.shape is [batch, char, next_char_logits]
    predicted_logits, states = self.model(inputs=input_ids, states=states,
                                          return_state=True)
    # Only use the last prediction.
    predicted_logits = predicted_logits[:, -1, :]
    predicted_logits = predicted_logits/self.temperature
    # Apply the prediction mask: prevent "[UNK]" from being generated.
    predicted_logits = predicted_logits + self.prediction_mask

    # Sample the output logits to generate token IDs.
    predicted_ids = tf.random.categorical(predicted_logits, num_samples=1)
    predicted_ids = tf.squeeze(predicted_ids, axis=-1)

    # Convert from token ids to characters
    predicted_chars = self.chars_from_ids(predicted_ids)

    # Return the characters and model state.
    return predicted_chars, states

In [14]:
one_step_model = OneStep(model, chars_from_ids, ids_from_chars, temperature=0.40)

<tf.Tensor: shape=(92,), dtype=float32, numpy=
array([-inf,   0.,   0.,   0.,   0.,   0.,   0.,   0.,   0.,   0.,   0.,
         0.,   0.,   0.,   0.,   0.,   0.,   0.,   0.,   0.,   0.,   0.,
         0.,   0.,   0.,   0.,   0.,   0.,   0.,   0.,   0.,   0.,   0.,
         0.,   0.,   0.,   0.,   0.,   0.,   0.,   0.,   0.,   0.,   0.,
         0.,   0.,   0.,   0.,   0.,   0.,   0.,   0.,   0.,   0.,   0.,
         0.,   0.,   0.,   0.,   0.,   0.,   0.,   0.,   0.,   0.,   0.,
         0.,   0.,   0.,   0.,   0.,   0.,   0.,   0.,   0.,   0.,   0.,
         0.,   0.,   0.,   0.,   0.,   0.,   0.,   0.,   0.,   0.,   0.,
         0.,   0.,   0.,   0.], dtype=float32)>

In [15]:
states = None

while True:
    string_input = input("Enter seed text: ")
    #string_input = "i am capable of "
    if string_input == "":
        break
    string_input = tf.constant([string_input])
    result = [string_input]
    for n in range(1000):
      next_char, states = one_step_model.generate_one_step(string_input, states=states)
      string_input = string_input + next_char

    print(string_input.numpy()[0].decode())
    break
