In [None]:
import tensorflow as tf
import pandas as pd
import numpy as np
import re
import os
from tqdm import tqdm
import requests

# Check that we are using a GPU, if not switch runtimes
#   using Runtime > Change Runtime Type > GPU
# assert len(tf.config.list_physical_devices('GPU')) > 0

# Taylor Swift lyrics dataset courtesy of https://www.kaggle.com/PromptCloudHQ/taylor-swift-song-lyrics-from-all-the-albums

In [None]:
# Download the raw lyrics corpus (a single plain-text file) from GitHub.
url = 'https://raw.githubusercontent.com/jwlibre/lyrics/master/lyrics.txt'
# A timeout keeps the cell from hanging forever on a stalled connection.
page = requests.get(url, timeout=30)
# Fail loudly on a 4xx/5xx answer instead of silently using an error page as the corpus.
page.raise_for_status()
all_lyrics = page.text
# Show a short preview only; printing the whole corpus floods the notebook output.
print(all_lyrics[:500])

In [None]:
# Normalise case and turn every line break into a space so the corpus becomes
# one long, lower-case, space-separated string.
all_lyrics = all_lyrics.lower().replace("\n", " ")
print(all_lyrics)

In [None]:
# Tokenise the corpus into words.
# `split()` with no argument splits on any run of whitespace, so consecutive
# spaces (e.g. where blank lines were replaced by spaces) no longer produce
# empty-string tokens, and stray tabs / carriage returns do not stay glued to
# words, both of which happened with `split(' ')`.
all_lyrics = all_lyrics.split()
# Preview the first tokens rather than dumping the whole list.
print(all_lyrics[:50])

In [None]:
# Punctuation and extraction artefacts that are stripped from every token.
# The curly quotes and the ellipsis are written as plain literals: the earlier
# "\“" / "\…" spellings are not valid escape sequences, so Python kept the
# backslash and those patterns never matched anything in the lyrics.
# The former "" entry was a no-op and has been dropped.
useless_characters = ["(", ")", "\"", ":", ",", ".", "!", "?", "“", "”", "…", "<u+203d>"]

# Matches a trailing dropped-g ending (e.g. "lovin'") so it can be restored to "-ing".
regex = r"in'$"


def _clean_word(word):
    """Return `word` with punctuation removed and contractions normalised."""
    for character in useless_characters:
        word = word.replace(character, "")
    # Un-escape the HTML ampersand entity.
    word = word.replace("&amp;", "and")
    # Must run before apostrophes are removed, otherwise "in'" can no longer be detected.
    word = re.sub(regex, "ing", word)
    return word.replace("'", "")


# Tokens that consisted only of punctuation (or came from repeated spaces) end
# up empty after cleaning; drop them so "" does not become a vocabulary entry.
clean_lyrics = [cleaned for cleaned in map(_clean_word, all_lyrics) if cleaned]

# Preview the first cleaned tokens rather than dumping the whole list.
print(clean_lyrics[:50])

In [None]:
# Collect the distinct words of the cleaned corpus in sorted order;
# this sorted list is the vocabulary.
vocab = list(set(clean_lyrics))
vocab.sort()
print(vocab)
print("There are", len(vocab), "unique words in the lyrics")

In [None]:
# Map every vocabulary word to its position in `vocab` (word -> integer id)
word2idx = dict(zip(vocab, range(len(vocab))))

In [None]:
# Inverse lookup (integer id -> word): indexing this array with an id, or an
# array of ids, gives back the corresponding word(s).
idx2word = np.asarray(vocab)

In [None]:
# Vectorize the lyrics: replace each word by its integer id from `word2idx`
vectorized_lyrics = np.array([word2idx[token] for token in clean_lyrics])

print(vectorized_lyrics)

In [None]:
def get_batch(vectorized_lyrics, seq_length, batch_size):
  """Sample a random batch of (input, target) index sequences.

  Args:
    vectorized_lyrics: 1-D array of word ids.
    seq_length: number of consecutive ids in every sequence.
    batch_size: number of sequences to draw.

  Returns:
    A tuple (x_batch, y_batch) of arrays shaped (batch_size, seq_length).
    Each row of y_batch is the matching row of x_batch shifted one position
    to the right in the corpus, i.e. the next-word targets.
  """
  # index of the last element; a target window must not run past it
  last_index = vectorized_lyrics.shape[0] - 1
  # random start offsets, one per example in the batch
  starts = np.random.choice(last_index - seq_length, batch_size)

  inputs, targets = [], []
  for start in starts:
    stop = start + seq_length
    # input window and the same window moved forward by one position
    inputs.append(vectorized_lyrics[start:stop])
    targets.append(vectorized_lyrics[start + 1:stop + 1])

  # stack the windows into (batch_size, seq_length) arrays
  x_batch = np.reshape(inputs, [batch_size, seq_length])
  y_batch = np.reshape(targets, [batch_size, seq_length])
  return x_batch, y_batch

In [None]:
# Demonstrate the batching over the timesteps: at every step the expected
# output is the word that follows the input word in the corpus.
x_batch, y_batch = get_batch(vectorized_lyrics, seq_length=5, batch_size=1)
print(x_batch)
print(y_batch)
# batch_size is 1, so squeezing leaves one flat sequence of ids per array
step_pairs = zip(np.squeeze(x_batch), np.squeeze(y_batch))
for i, (input_idx, target_idx) in enumerate(step_pairs):
    input_word = repr(idx2word[input_idx])
    target_word = repr(idx2word[target_idx])
    print("Step {:3d}".format(i))
    print("  input: {} ({:s})".format(input_idx, input_word))
    print("  expected output: {} ({:s})".format(target_idx, target_word))

In [None]:
def LSTM(rnn_units):
  """Create the recurrent layer used by the model.

  Args:
    rnn_units: number of units of the LSTM layer.

  Returns:
    A `tf.keras.layers.LSTM` that returns the full output sequence and is
    configured as stateful.
  """
  layer_options = dict(
    return_sequences=True,
    recurrent_initializer='glorot_uniform',
    recurrent_activation='sigmoid',
    stateful=True,
  )
  return tf.keras.layers.LSTM(rnn_units, **layer_options)

In [None]:
### Defining the RNN Model ###

def build_model(vocab_size, embedding_dim, rnn_units, batch_size):
  """Assemble the Embedding -> LSTM -> Dense sequence model.

  Args:
    vocab_size: number of distinct word ids; also the width of the output layer.
    embedding_dim: size of the dense vector each word id is embedded into.
    rnn_units: number of units of the LSTM layer.
    batch_size: fixed batch dimension of the model input.

  Returns:
    A `tf.keras.Sequential` model whose Dense output has `vocab_size` units.
  """
  # Layer 1: turns word ids into dense vectors; the sequence length is left open
  embedding = tf.keras.layers.Embedding(
    vocab_size, embedding_dim, batch_input_shape=[batch_size, None]
  )

  # Layer 2: LSTM with `rnn_units` units
  recurrent = LSTM(rnn_units)

  # Layer 3: projects every LSTM output onto the vocabulary
  projection = tf.keras.layers.Dense(vocab_size)

  return tf.keras.Sequential([embedding, recurrent, projection])

# Build a simple model with default hyperparameters; these are only used for
#   the sanity checks below and can be changed later.
model = build_model(
  vocab_size=len(vocab),
  embedding_dim=256,
  rnn_units=1024,
  batch_size=32,
)

In [None]:
# Print an overview of the model that was just built
model.summary()

In [None]:
# Shape sanity check: draw one batch and run it through the untrained model.
# batch_size=32 matches the batch size the example model was built with.
x, y = get_batch(vectorized_lyrics, seq_length=100, batch_size=32)
# One vector of vocab_size logits per input position
pred = model(x)
print("Input shape:      ", x.shape, " # (batch_size, sequence_length)")
print("Prediction shape: ", pred.shape, "# (batch_size, sequence_length, vocab_size)")

In [None]:
# Obtain predictions from the untrained model: for the first sequence of the
# batch, sample one word id per timestep from the logits.
sampled_indices = tf.squeeze(
    tf.random.categorical(pred[0], num_samples=1), axis=-1
).numpy()
sampled_indices

In [None]:
# Decode the untrained model's predictions back into words. Without any
# training they are expected to be essentially random.
print(x)
input_words = " ".join(idx2word[x[0]])
print("Input: \n", repr(input_words))
print()
predicted_words = " ".join(idx2word[sampled_indices])
print("Next Word Predictions: \n", repr(predicted_words))

In [None]:
### TRAINING THE MODEL: Part 1: Defining the loss function ###

# define the loss function to compute and return the loss between the true labels and predictions (logits). 
# Set the argument from_logits=True.
def compute_loss(labels, logits):
  """Sparse categorical cross-entropy between true word ids and raw logits.

  Args:
    labels: integer ids of the true next words.
    logits: unnormalised model outputs (hence `from_logits=True`).

  Returns:
    The loss tensor produced by
    `tf.keras.losses.sparse_categorical_crossentropy`.
  """
  return tf.keras.losses.sparse_categorical_crossentropy(
    labels, logits, from_logits=True
  )

# Loss of the untrained model on the example batch drawn several cells above:
# `y` holds the true next words and `pred` the model's logits.
example_batch_loss = compute_loss(labels=y, logits=pred)

print("Prediction shape: ", pred.shape, " # (batch_size, sequence_length, vocab_size)") 
print("scalar_loss:      ", example_batch_loss.numpy().mean())

In [None]:
### Hyperparameter setting and optimization ###

# Optimization parameters:
# number of training steps (one randomly sampled batch per step)
num_training_iterations = 2000  # Increase this to train longer
# number of sequences per training batch
batch_size = 4  # Experiment between 1 and 64
# number of words in every training sequence
seq_length = 100  # Experiment between 50 and 500
# step size handed to the optimizer
learning_rate = 5e-3  # Experiment between 1e-5 and 1e-1

# Model parameters: 
# one output logit per distinct word in the lyrics
vocab_size = len(vocab)
# size of the dense vector each word id is embedded into
embedding_dim = 256 
# number of units of the LSTM layer
rnn_units = 1024  # Experiment between 1 and 2048

# Checkpoint location: 
checkpoint_dir = './training_checkpoints'
# path prefix that the training loop passes to model.save_weights
checkpoint_prefix = os.path.join(checkpoint_dir, "my_ckpt")

In [None]:
### Define optimizer and training operation ###

# Instantiate a fresh model for training from the hyperparameters set above.
model = build_model(
  vocab_size=vocab_size,
  embedding_dim=embedding_dim,
  rnn_units=rnn_units,
  batch_size=batch_size,
)

# Adam optimizer with the configured learning rate. Other supported
#   optimizers are listed on the tensorflow website:
#   https://www.tensorflow.org/api_docs/python/tf/keras/optimizers/
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)


@tf.function
def train_step(x, y):
  """Run one gradient-descent update of `model` on a single batch.

  Args:
    x: batch of input word ids.
    y: batch of target word ids (the true next words for `x`).

  Returns:
    The loss tensor returned by `compute_loss` for this batch.
  """
  # Record the forward pass so that gradients can be derived from it
  with tf.GradientTape() as tape:
    logits = model(x)
    loss = compute_loss(y, logits)

  # Gradient of the loss with respect to every trainable model parameter
  params = model.trainable_variables
  grads = tape.gradient(loss, params)

  # Let the optimizer update the parameters from their gradients
  optimizer.apply_gradients(zip(grads, params))
  return loss

##################
# Begin training!#
##################

# Mean loss of every training step, in order
history = []
if hasattr(tqdm, '_instances'): tqdm._instances.clear() # clear if it exists

pbar = tqdm(range(num_training_iterations))
# The loop variable is called `step`, not `iter`: a top-level `iter` would
# shadow the builtin `iter()` for the rest of the kernel session.
for step in pbar:

  # Grab a batch and propagate it through the network
  x_batch, y_batch = get_batch(vectorized_lyrics, seq_length, batch_size)
  loss = train_step(x_batch, y_batch)

  # Reduce the loss to a scalar once and reuse it for both the
  # history and the progress bar
  mean_loss = loss.numpy().mean()
  history.append(mean_loss)
  pbar.set_description("loss: {}".format(mean_loss))

  # Checkpoint the current weights every 100 steps
  if step % 100 == 0:
    model.save_weights(checkpoint_prefix)


# Save the final trained weights
model.save_weights(checkpoint_prefix)