<a href="https://colab.research.google.com/github/Chubbyman2/MIT_Labs/blob/master/RNN_Music_Generation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import tensorflow as tf

!pip install mitdeeplearning
import mitdeeplearning as mdl

import numpy as np
import os 
import time
import functools
from IPython import display as ipythondisplay
from tqdm import tqdm
!apt-get install abcmidi timidity > /dev/null 2>&1

# Fail fast, with an actionable message, if TensorFlow cannot see a GPU.
assert len(tf.config.list_physical_devices("GPU")) > 0, (
    "No GPU detected. In Colab, choose Runtime > Change runtime type > GPU, "
    "then re-run this cell."
)

In [None]:
# Load the training songs through the lab helper.
songs = mdl.lab1.load_training_data()

# Pick the first song and print it under a heading so it can be inspected in detail.
example_song = songs[0]
print("\nExample song: ", example_song, sep="\n")

In [None]:
# Convert the example song (ABC notation) to audio with the lab helper.
# As the last expression of the cell, the helper's return value is what the
# notebook displays.
mdl.lab1.play_song(example_song)

In [None]:
# Join list of song strings into a single string containing all the songs,
# separating consecutive songs with a blank line ("\n\n").
songs_joined = "\n\n".join(songs)

# Find all unique characters in the joined string. Sorting fixes the order, so
# each character's position in `vocab` can serve as its integer index.
vocab = sorted(set(songs_joined))
print("There are", len(vocab), "unique characters in the dataset.")

**Vectorize the Text**

To turn the text into numbers (and back again), we build two lookup tables:

1. Maps characters to numbers

2. Maps numbers back to characters

In [None]:
# Character -> integer lookup: each character maps to its position in `vocab`,
# e.g. the index of character "d" is char2idx["d"].
char2idx = dict(zip(vocab, range(len(vocab))))

# Integer -> character lookup: element i of the array is the i-th vocab entry.
idx2char = np.array(vocab)

In [None]:
# Preview the first 20 entries of the character -> index table.
# The key list is sliced (rather than zipped against range(20) with a throwaway
# `_`) so the loop does not rebind `_`, which IPython uses for the last output.
print("{")
for char in list(char2idx)[:20]:
  print(" {:4s}: {:3d},".format(repr(char), char2idx[char]))
print(" ...\n}")

In [None]:
def vectorize_string(string):
  """Map every character of `string` to its integer index.

  Indices are looked up in the notebook-level `char2idx` table, so a character
  that is missing from the table raises KeyError.

  Returns:
    A NumPy array with one index per character, in the original order.
  """
  indices = list(map(char2idx.__getitem__, string))
  return np.array(indices)

# Encode the whole joined corpus as one integer array (one index per character).
vectorized_songs = vectorize_string(songs_joined)

In [None]:
# Show the first 10 characters of the corpus next to their integer encoding.
print ('{} ---- characters mapped to int ----> {}'.format(repr(songs_joined[:10]), vectorized_songs[:10]))
# check that vectorized_songs is a numpy array
assert isinstance(vectorized_songs, np.ndarray), "returned result should be a numpy array"

**Create training examples and targets**

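The text is split into chunks of `seq_length + 1` characters. Each chunk yields an input sequence and a target sequence that is shifted one character ahead.

For example, if `seq_length = 4` and the text is "Hello", the input sequence is "Hell" and the target sequence is "ello".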

In [None]:
def get_batch(vectorized_songs, seq_length, batch_size):
  """Draw a random training batch of input/target index sequences.

  Args:
    vectorized_songs: NumPy array of character indices, indexed along axis 0.
    seq_length: number of characters in each input (and each target) sequence.
    batch_size: number of sequences to draw.

  Returns:
    (x_batch, y_batch), each reshaped to [batch_size, seq_length]. Row k of
    y_batch is row k of x_batch shifted one character ahead in the corpus.
  """
  # Index of the last character in the corpus
  last_index = vectorized_songs.shape[0] - 1
  # Random start positions, chosen so that start + seq_length + 1 stays in range
  starts = np.random.choice(last_index - seq_length, batch_size)

  inputs = []
  targets = []
  for start in starts:
    stop = start + seq_length
    inputs.append(vectorized_songs[start:stop])
    # The target is the same window moved one step to the right
    targets.append(vectorized_songs[start + 1 : stop + 1])

  # x_batch, y_batch provide the true inputs and targets for network training
  x_batch = np.reshape(inputs, [batch_size, seq_length])
  y_batch = np.reshape(targets, [batch_size, seq_length])
  return x_batch, y_batch


# Run the lab's three checks on get_batch (types, shapes and next-step, going
# by the helper names). Evaluation stops at the first check that fails.
test_args = (vectorized_songs, 10, 2)
if (mdl.lab1.test_batch_func_types(get_batch, test_args) and
    mdl.lab1.test_batch_func_shapes(get_batch, test_args) and
    mdl.lab1.test_batch_func_next_step(get_batch, test_args)):
   print("======\n[PASS] passed all tests!")
else:
   print("======\n[FAIL] could not pass tests")

In [None]:
# Draw a single short sequence (batch of 1, 5 characters) and walk through it
# step by step to show which target character belongs to each input character.
x_batch, y_batch = get_batch(vectorized_songs, seq_length=5, batch_size=1)

# np.squeeze drops the size-1 batch axis so the loop iterates over timesteps.
for i, (input_idx, target_idx) in enumerate(zip(np.squeeze(x_batch), np.squeeze(y_batch))):
    print("Step {:3d}".format(i))
    print("  input: {} ({:s})".format(input_idx, repr(idx2char[input_idx])))
    print("  expected output: {} ({:s})".format(target_idx, repr(idx2char[target_idx])))

**The RNN Model**

In [None]:
def LSTM(rnn_units):
  """Create the notebook's recurrent layer: a Keras LSTM with `rnn_units` units.

  The layer is configured to emit an output at every timestep
  (return_sequences=True) and to be stateful (stateful=True).
  """
  layer_options = dict(
      return_sequences=True,
      recurrent_initializer="glorot_uniform",
      recurrent_activation="sigmoid",
      stateful=True,
  )
  return tf.keras.layers.LSTM(rnn_units, **layer_options)

In [None]:
def build_model(vocab_size, embedding_dim, rnn_units, batch_size):
  """Assemble the character-level model: Embedding -> LSTM -> Dense.

  Args:
    vocab_size: number of distinct characters; also the width of the output layer.
    embedding_dim: size of the dense vector each character index is mapped to.
    rnn_units: number of units in the LSTM layer.
    batch_size: fixed batch size baked into the model's input shape.

  Returns:
    A tf.keras.Sequential model.
  """
  # Transforms indices into dense vectors of a fixed embedding size; the input
  # shape fixes the batch size and leaves the sequence length open (None).
  embedding = tf.keras.layers.Embedding(
      vocab_size, embedding_dim, batch_input_shape=[batch_size, None]
  )
  # Recurrent layer built by the notebook's LSTM helper
  recurrent = LSTM(rnn_units)
  # One output value per vocabulary entry at every timestep
  projection = tf.keras.layers.Dense(vocab_size)

  return tf.keras.Sequential([embedding, recurrent, projection])

# Build a first model with default hyperparameters; its batch size is fixed at 32.
model = build_model(len(vocab), embedding_dim=256, rnn_units=1024, batch_size=32)

In [None]:
# Print the model's layer-by-layer summary.
model.summary()

In [None]:
# Push one batch through the (still untrained) model to check the output shape.
# batch_size must be 32 here because the model above was built with batch_size=32.
x, y = get_batch(vectorized_songs, seq_length=100, batch_size=32)
pred = model(x)
print("Input shape: ", x.shape, "# (batch_size, sequence_length)")
print("Prediction shape: ", pred.shape, "# (batch_size, sequence_length, vocab_size)")

In [None]:
# Sample one character index per timestep from the model's outputs for the
# first sequence in the batch (pred[0] has shape (sequence_length, vocab_size)).
sampled_indices = tf.random.categorical(pred[0], num_samples=1)
# Drop the trailing num_samples axis and convert to a NumPy array.
sampled_indices = tf.squeeze(sampled_indices, axis=-1).numpy()
sampled_indices

In [None]:
# Decode the first input sequence and the sampled indices back into text so
# the untrained model's predictions can be compared with the input.
print("Input: \n", repr("".join(idx2char[x[0]])))
print()
print("Next Char Predictions: \n", repr("".join(idx2char[sampled_indices])))

**Training the Model**

The untrained model's predictions are essentially random, so we now need to train it until its output resembles real songs.

In [None]:
def compute_loss(labels, logits):
  """Sparse categorical cross-entropy between integer `labels` and `logits`.

  `logits` are treated as unnormalised scores (from_logits=True). The loss
  tensor is returned as produced by Keras; no further reduction is applied here.
  """
  return tf.keras.losses.sparse_categorical_crossentropy(
      labels, logits, from_logits=True
  )

# Loss of the untrained model on the example batch drawn earlier (y, pred).
example_batch_loss = compute_loss(y, pred)

print("Prediction shape: ", pred.shape, " # (batch_size, sequence_length, vocab_size)") 
# compute_loss does not reduce its result, so average it to report one number.
print("scalar_loss: ", example_batch_loss.numpy().mean())

In [None]:
# Optimization Parameters
num_training_iterations = 2000  # number of training steps (one batch per step)
batch_size = 4                  # sequences per training batch
seq_length = 100                # characters per training sequence
learning_rate = 5e-3            # passed to the Adam optimizer

# Model parameters
vocab_size = len(vocab)         # one class per distinct character
embedding_dim = 256
rnn_units = 1024

# Checkpoint Location
checkpoint_dir = "./training_checkpoints"
# Weights are saved under this prefix inside checkpoint_dir.
checkpoint_prefix = os.path.join(checkpoint_dir, "my_ckpt")

In [None]:
# Rebuild the model with the training hyperparameters; this replaces the
# earlier model that was built with batch_size=32.
model = build_model(vocab_size, embedding_dim, rnn_units, batch_size)

# Adam optimizer using the configured learning rate.
optimizer = tf.keras.optimizers.Adam(learning_rate)

@tf.function
def train_step(x, y):
  """Run one optimisation step on a batch and return the batch's loss tensor.

  Uses the notebook-level `model`, `optimizer` and `compute_loss`.

  Args:
    x: batch of input index sequences.
    y: batch of target index sequences aligned with `x`.
  """
  with tf.GradientTape() as tape:
    # Forward pass and loss are computed inside the tape so they are recorded
    logits = model(x)
    batch_loss = compute_loss(y, logits)

  # Differentiate the loss with respect to every trainable parameter
  variables = model.trainable_variables
  gradients = tape.gradient(batch_loss, variables)

  # Let the optimizer update the parameters from their gradients
  optimizer.apply_gradients(zip(gradients, variables))
  return batch_loss

# Begin training

history = []  # mean loss of every training step, in order
plotter = mdl.util.PeriodicPlotter(sec=2, xlabel="Iterations", ylabel="Loss")
if hasattr(tqdm, "_instances"): tqdm._instances.clear() # Clear if it exists

# The loop variable is named `iteration`, not `iter`: at notebook scope `iter`
# would rebind the builtin iter() for every cell executed afterwards.
for iteration in tqdm(range(num_training_iterations)):

  # Grab a batch and propagate it through the network
  x_batch, y_batch = get_batch(vectorized_songs, seq_length, batch_size)
  loss = train_step(x_batch, y_batch)

  # Record the mean loss of this step and refresh the live plot
  history.append(loss.numpy().mean())
  plotter.plot(history)

  # Checkpoint the current weights every 100 iterations
  if iteration % 100 == 0:
    model.save_weights(checkpoint_prefix)

# Save trained model and weights
model.save_weights(checkpoint_prefix)

In [None]:
# Rebuild the model with batch_size=1 so it can process a single sequence at a
# time during generation.
model = build_model(vocab_size, embedding_dim, rnn_units, batch_size=1)

# Restore model weights for the last checkpoint after training.
# This is required when you change the batch_size: the freshly built model
# starts with new weights, so the trained ones are loaded from checkpoint_dir.

model.load_weights(tf.train.latest_checkpoint(checkpoint_dir))
model.build(tf.TensorShape([1, None]))

model.summary()

**The Prediction Procedure**

Initialize a "seed" start string and the RNN state, and set the number of characters we want to generate.

Use the start string and the RNN state to obtain the probability distribution over the next predicted character.

Sample from the resulting categorical (multinomial) distribution to obtain the index of the predicted character. This predicted character is then used as the next input to the model.

At each time step, the updated RNN state is fed back into the model, so that it has more context when making the next prediction. After each character is predicted, the updated state is carried forward again; this is how the model makes use of the sequence dependencies it learned during training, since every new prediction draws on the information accumulated from the previous ones.



In [None]:
def generate_text(model, start_string, generation_length=1000):
  """Generate text one character at a time, starting from `start_string`.

  Args:
    model: the model to sample from; it is called with a batch of one sequence
      and its recurrent state is reset before generation starts.
    start_string: non-empty seed text; every character must be in `char2idx`.
    generation_length: number of characters to generate after the seed.

  Returns:
    `start_string` followed by the generated characters.

  Raises:
    ValueError: if `start_string` is empty.
    KeyError: if `start_string` contains a character missing from `char2idx`.
  """
  if not start_string:
    raise ValueError("start_string must contain at least one character")

  # Encode the seed and add a leading batch axis of size 1
  input_eval = [char2idx[s] for s in start_string]
  input_eval = tf.expand_dims(input_eval, 0)

  text_generated = []

  model.reset_states()
  # Guarded like the training cell: only clear the registry if tqdm has one
  if hasattr(tqdm, "_instances"): tqdm._instances.clear()

  for _ in tqdm(range(generation_length)):
    predictions = model(input_eval)

    # Remove batch dimension
    predictions = tf.squeeze(predictions, 0)

    # Sample an index for every timestep, then keep only the last timestep's sample
    predicted_id = tf.random.categorical(predictions, num_samples=1)[-1, 0].numpy()

    # Feed the sampled character back in as the next input; the layer is
    # stateful, so its state carries over between calls
    input_eval = tf.expand_dims([predicted_id], 0)

    text_generated.append(idx2char[predicted_id])

  return (start_string + "".join(text_generated))

In [None]:
# Generate 1000 characters from the seed "X".
# NOTE(review): presumably "X" is how each song in the dataset begins - confirm
# against the example song printed earlier.
generated_text = generate_text(model, start_string="X", generation_length=1000)

In [None]:
# Pull the individual song snippets out of the generated text with the lab helper.
generated_songs = mdl.lab1.extract_song_snippet(generated_text)

for i, song in enumerate(generated_songs):
  # Try to turn the snippet into audio; a falsy result is treated as invalid.
  waveform = mdl.lab1.play_song(song)

  # If the song is valid, play it
  if waveform:
    print("Generated song", i)
    ipythondisplay.display(waveform)