In [None]:
# reference
# © MIT Introduction to Deep Learning
# http://introtodeeplearning.com

In [None]:
# Read the Comet API key through Colab's `userdata` helper rather than hard-coding it in the notebook.
from google.colab import userdata
COMET_API_KEY = userdata.get('COMET_API_KEY')

In [None]:
!pip install comet_ml > /dev/null 2>&1
!pip install mitdeeplearning --quiet
import warnings
warnings.filterwarnings('ignore')

import comet_ml
import tensorflow as tf

import mitdeeplearning as mdl

import numpy as np
import os
import time
import functools
from IPython import display as ipythondisplay
from tqdm import tqdm
from scipy.io.wavfile import write
!apt-get install abcmidi timidity > /dev/null 2>&1

# Fail fast: stop here unless TensorFlow sees at least one GPU and the Comet key is non-empty.
assert tf.config.list_physical_devices('GPU'), "GPU not found"
assert COMET_API_KEY != "", "Please insert your Comet API Key"

In [None]:
# Load the training data: a music dataset of melodies written in ABC notation.
songs = mdl.lab1.load_training_data()

# Print the first song as a sample of the raw text.
example_song = songs[0]
print("\nExample song 0: ", example_song, sep="\n")

NameError: name 'mdl' is not defined

In [None]:
# Show the container type of the dataset and the type of a single song, one per line.
print(type(songs), type(songs[0]), sep="\n")

NameError: name 'songs' is not defined

In [None]:
# Play the example song through the lab helper; as the last expression, its result is shown as the cell output.
mdl.lab1.play_song(example_song) # takes some time

In [None]:
# Concatenate all songs into one string, with a blank line between consecutive songs.
songs_joined = "\n\n".join(songs)

# The vocabulary is the sorted list of distinct characters in the joined text.
vocab = list(set(songs_joined))
vocab.sort()
print("There are", len(vocab), "unique characters in the dataset")

*Vectorizing text data*

In [None]:
# Forward lookup: character -> its position in the sorted vocabulary.
char2idx = dict(zip(vocab, range(len(vocab))))
# Reverse lookup: position -> character, as a numpy array so it can be indexed by integer ids.
idx2char = np.array(vocab)

In [None]:
# Pretty-print the first five entries of the character -> index mapping.
print('{')
for char in list(char2idx)[:5]:
    print('  {:4s}: {:3d},'.format(repr(char), char2idx[char]))
print('  ...\n}')

In [None]:
def vectorize_string(songs):
  """Convert a string into a numpy array of character indices.

  Args:
    songs: the text to encode; every character must be a key of `char2idx`.

  Returns:
    A 1-D numpy array holding `char2idx[c]` for each character `c` of `songs`, in order.

  Raises:
    KeyError: if a character is missing from `char2idx`.
  """
  indices = [char2idx[ch] for ch in songs]
  return np.array(indices)
# Encode the whole joined dataset as a single array of character indices.
vectorized_songs = vectorize_string(songs_joined)

In [None]:
# One index per character of `songs_joined`, so this shows the dataset length in characters.
print(vectorized_songs.shape)

In [None]:
# Show the first ten characters of the text next to their integer encoding.
print ('{}   mapped to {}\n'.format(repr(songs_joined[:10]), vectorized_songs[:10]))
# Check that vectorized_songs is a numpy array.
assert isinstance(vectorized_songs, np.ndarray), "returned result should be a numpy array"

**Preparing Data for model**

*Now we divide the text into example sequences to use during training. Each input sequence that we feed into our RNN contains `seq_length` characters from the text. We also need to define a target sequence for each input sequence, which is used to train the RNN to predict the next character. For each input, the corresponding target contains the same length of text, shifted one character to the right.*

In [None]:
def get_batch(vectorized_songs,seq_length,batch_size):
  """Sample a random batch of (input, target) sequences for next-character training.

  Each target is its input shifted one position to the right; e.g. for the text
  "Jainil" with seq_length=5 the input is "Jaini" and the target is "ainil".

  Args:
    vectorized_songs: 1-D array of character indices.
    seq_length: number of elements in every input and target sequence.
    batch_size: number of sequences to draw (start positions are sampled with replacement).

  Returns:
    (x_batch, y_batch): two numpy arrays of shape [batch_size, seq_length].

  Raises:
    ValueError: if the data has fewer than seq_length + 1 elements, so not even
      one input/target pair fits.
  """
  data = np.asarray(vectorized_songs)

  # A window starting at `s` reads data[s : s + seq_length + 1] (inputs plus the one
  # extra element needed by the shifted target), so valid starts are
  # 0 .. len(data) - seq_length - 1 inclusive.
  num_starts = data.shape[0] - seq_length
  if num_starts <= 0:
    raise ValueError(
      "need at least seq_length + 1 = {} elements, got {}".format(seq_length + 1, data.shape[0])
    )

  # Random start index of every example in the batch.
  starts = np.random.choice(num_starts, batch_size)

  # Row b of `positions` is starts[b], starts[b] + 1, ..., starts[b] + seq_length - 1.
  positions = starts[:, None] + np.arange(seq_length)

  x_batch = data[positions]
  y_batch = data[positions + 1]  # same windows, shifted one step to the right
  return x_batch, y_batch


# Run the mdl.lab1 batch-function checks against get_batch; evaluation stops at the first failing check.
test_args = (vectorized_songs, 10, 2)
passed_all = (
  mdl.lab1.test_batch_func_types(get_batch, test_args)
  and mdl.lab1.test_batch_func_shapes(get_batch, test_args)
  and mdl.lab1.test_batch_func_next_step(get_batch, test_args)
)
if passed_all:
  print("======\n[PASS] passed all tests!")
else:
  print("======\n[FAIL] could not pass tests")


In [None]:
# Draw a single short example and walk through it one time step at a time.
x_batch, y_batch = get_batch(vectorized_songs, seq_length=5, batch_size=1)

# With batch_size=1 the only row of each array is the whole sequence.
for i, (input_idx, target_idx) in enumerate(zip(x_batch[0], y_batch[0])):
    input_char = repr(idx2char[input_idx])
    target_char = repr(idx2char[target_idx])
    print("Step {:3d}".format(i))
    print("  input: {} ({:s})".format(input_idx, input_char))
    print("  expected output: {} ({:s})".format(target_idx, target_char))

**RNN Model**

In [None]:
def LSTM(rnn_units):
  """Create the stateful Keras LSTM layer used by the model.

  Args:
    rnn_units: number of units in the LSTM layer.

  Returns:
    A tf.keras.layers.LSTM configured to return the full output sequence
    (return_sequences=True) and to keep its state between calls (stateful=True).
  """
  layer_options = dict(
    return_sequences=True,
    # Glorot (Xavier) initialization aims to keep the variance of the activations
    # the same across layers, which helps prevent exploding or vanishing gradients.
    recurrent_initializer='glorot_uniform',
    recurrent_activation='sigmoid',
    stateful=True,
  )
  return tf.keras.layers.LSTM(rnn_units, **layer_options)

In [None]:
def build_model(vocab_size, embedding_dim, rnn_units, batch_size):
  """Define the RNN model with the Keras Sequential API.

  Args:
    vocab_size: number of distinct characters; used as the embedding's input size
      and as the width of the final Dense layer.
    embedding_dim: size of each embedding vector.
    rnn_units: number of units in the LSTM layer.
    batch_size: part of the signature that callers pass, but not used in this body.

  Returns:
    A tf.keras.Sequential model: Embedding -> LSTM -> Dense(vocab_size).
  """
  # Layer 1: turn character indices into dense vectors of a fixed embedding size.
  embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
  # Layer 2: LSTM with `rnn_units` units.
  recurrent = LSTM(rnn_units)
  # Layer 3: fully-connected layer mapping the LSTM output to the vocabulary size.
  to_vocab = tf.keras.layers.Dense(vocab_size)

  return tf.keras.Sequential([embedding, recurrent, to_vocab])

# Build a simple model with default hyperparameters.
# These values are hard-coded here; the training model later in the notebook is built from `params` instead.
model = build_model(len(vocab), embedding_dim=256, rnn_units=1024, batch_size=32)

In [None]:
# Print the layer-by-layer summary of the model built above.
model.summary()

In [None]:
# Sanity check: run one random batch through the untrained model and compare input and output shapes.
x, y = get_batch(vectorized_songs, seq_length=100, batch_size=32)
pred = model(x)
print("Input shape:      ", x.shape, " # (batch_size, sequence_length)")
print("Prediction shape: ", pred.shape, "# (batch_size, sequence_length, vocab_size)")

In [None]:
def compute_loss(labels, logits):
  """Sparse categorical cross-entropy between true indices and predicted logits.

  Args:
    labels: the true character indices.
    logits: the model's unnormalized predictions (hence from_logits=True).

  Returns:
    The loss tensor produced by Keras, not averaged here; callers in this
    notebook take `.numpy().mean()` of it.
  """
  return tf.keras.losses.sparse_categorical_crossentropy(
    y_true=labels,
    y_pred=logits,
    from_logits=True,
  )

# Loss of the untrained model on the example batch (`y`, `pred`) computed above.
example_batch_loss = compute_loss(y, pred)

print("Prediction shape: ", pred.shape, " # (batch_size, sequence_length, vocab_size)")
# Average the loss values into a single number for display.
print("scalar_loss:      ", example_batch_loss.numpy().mean())

In [None]:
vocab_size = len(vocab)

# Hyperparameters for training; these are also logged to Comet by create_experiment().
params = {
  "num_training_iterations": 3000,  # Increase this to train longer
  "batch_size": 8,  # Experiment between 1 and 64
  "seq_length": 100,  # Experiment between 50 and 500
  "learning_rate": 5e-3,  # Experiment between 1e-5 and 1e-1
  "embedding_dim": 256,
  "rnn_units": 1024,  # Experiment between 1 and 2048
}

# Where the model weights are written during and after training.
# NOTE(review): this path is under /content/drive, which presumably requires Google Drive
# to be mounted first; no mount call is visible in this notebook -- confirm.
checkpoint_dir = '/content/drive/MyDrive/Colab Notebooks/'
checkpoint_prefix = os.path.join(checkpoint_dir, "music_generation_ckpt.weights.h5")

print(checkpoint_prefix)

In [None]:
# When hyperparameters change, run create_experiment() to start a new experiment.
# All experiments created with the same project_name live under that project in the Comet interface.
def create_experiment():
  """Start a new Comet experiment and log the current hyperparameters to it.

  If the notebook's global namespace already holds an `experiment`, it is ended
  first so that only one experiment is active at a time.

  Returns:
    The newly created comet_ml.Experiment, after every entry of the global
    `params` dict has been logged and flushed.
  """
  # End any prior experiment. `experiment` is assigned further down, which makes it a
  # local name that is still unbound here, so checking `locals()` can never find it;
  # the previous run is the one stored in the global (notebook) namespace.
  previous_experiment = globals().get('experiment')
  if previous_experiment is not None:
    previous_experiment.end()

  # Initiate the Comet experiment for tracking.
  experiment = comet_ml.Experiment(
                  api_key=COMET_API_KEY,
                  project_name="MusicGeneration")

  # Log our hyperparameters to the experiment.
  for param, value in params.items():
    experiment.log_parameter(param, value)
  experiment.flush()

  return experiment

In [None]:
### Define the model and the optimizer used for training ###
# Optimizers to try: Adam, Adagrad.
model = build_model(
  vocab_size,
  params["embedding_dim"],
  params["rnn_units"],
  params["batch_size"],
)

optimizer = tf.keras.optimizers.Adam(params["learning_rate"])
# optimizer = tf.keras.optimizers.Adagrad(params["learning_rate"])


@tf.function
def train_step(x, y):
  """Run one optimization step on a single batch.

  Args:
    x: batch of input sequences fed to the global `model`.
    y: batch of target sequences compared against the model output.

  Returns:
    The loss returned by compute_loss for this batch (not averaged here).
  """
  # Record the forward pass so gradients can be taken with respect to the weights.
  with tf.GradientTape() as tape:
    logits = model(x)
    loss = compute_loss(y, logits)

  # Differentiate the loss with respect to every trainable variable ...
  trainable = model.trainable_variables
  gradients = tape.gradient(loss, trainable)

  # ... and let the optimizer update the model from those gradients.
  optimizer.apply_gradients(zip(gradients, trainable))
  return loss

##################
# Begin training!#
##################

history = []
plotter = mdl.util.PeriodicPlotter(sec=2, xlabel='Iterations', ylabel='Loss')
experiment = create_experiment()

if hasattr(tqdm, '_instances'): tqdm._instances.clear() # clear if it exists
# The loop variable is named `step` (not `iter`) so the builtin iter() is not
# overwritten in the notebook namespace for every later cell.
for step in tqdm(range(params["num_training_iterations"])):

  # Grab a batch and propagate it through the network
  x_batch, y_batch = get_batch(vectorized_songs, params["seq_length"], params["batch_size"])
  loss = train_step(x_batch, y_batch)

  # Reduce the loss to one number once and reuse it for logging and plotting.
  mean_loss = loss.numpy().mean()

  # Log the loss to the Comet interface so it can be tracked there.
  experiment.log_metric("loss", mean_loss, step=step)
  # Visualize the running loss history within the notebook.
  history.append(mean_loss)
  plotter.plot(history)

  # Periodically save the current weights (every 100 iterations, starting at 0).
  if step % 100 == 0:
    model.save_weights(checkpoint_prefix)

# Save the final trained weights
model.save_weights(checkpoint_prefix)
experiment.flush()

In [None]:
# Rebuild the model for generation, one sequence at a time.
model = build_model(vocab_size, params["embedding_dim"], params["rnn_units"], batch_size=1)

# Build the model first so its variables exist before the weights are loaded into them.
model.build(tf.TensorShape([1, None]))

# Restore the weights written by training. Training saves a single Keras weights
# file at `checkpoint_prefix` (".weights.h5"), not a TensorFlow checkpoint, so
# tf.train.latest_checkpoint(checkpoint_dir) would find nothing and return None.
model.load_weights(checkpoint_prefix)

model.summary()

In [None]:
def generate_text(model, start_string, generation_length=1000):
  """Generate text by repeatedly sampling the next character from the model.

  Args:
    model: the trained model; called on a batch of one sequence of character indices.
    start_string: non-empty seed text; every character must be a key of `char2idx`.
    generation_length: number of characters to generate after the seed.

  Returns:
    `start_string` followed by the `generation_length` generated characters.

  Raises:
    ValueError: if `start_string` is empty (there is nothing to feed the model).
    KeyError: if `start_string` contains a character missing from `char2idx`.
  """
  if not start_string:
    raise ValueError("start_string must contain at least one character")

  # Convert the seed to indices and add a leading batch dimension of size 1.
  input_eval = [char2idx[s] for s in start_string]
  input_eval = tf.expand_dims(input_eval, 0)

  text_generated = []

  # Start from a clean recurrent state (the LSTM layer is created with stateful=True).
  model.reset_states()
  # Same guarded cleanup of leftover progress bars as in the training cell.
  if hasattr(tqdm, '_instances'): tqdm._instances.clear()

  for _ in tqdm(range(generation_length)):
      predictions = model(input_eval)

      # Drop the batch dimension, then sample from the distribution of the last time step.
      predictions = tf.squeeze(predictions, 0)
      predicted_id = tf.random.categorical(predictions, num_samples=1)[-1,0].numpy()

      # Feed only the sampled character back in as the next input.
      input_eval = tf.expand_dims([predicted_id], 0)
      text_generated.append(idx2char[predicted_id])

  return (start_string + ''.join(text_generated))

In [None]:
# Generate 1000 characters starting from the seed "X".
generated_text = generate_text(model, start_string="X", generation_length=1000)

In [None]:
# Pull the individual song snippets out of the generated text.
generated_songs = mdl.lab1.extract_song_snippet(generated_text)

for i, song in enumerate(generated_songs):
  # Synthesize the waveform from the song.
  waveform = mdl.lab1.play_song(song)

  # Skip anything that did not produce a waveform (i.e. not a valid song).
  if not waveform:
    continue

  print("Generated song", i)
  ipythondisplay.display(waveform)

  # Reinterpret the raw audio bytes as 16-bit samples and write them to a .wav file.
  # NOTE(review): 88200 is passed to scipy's `write` as the sample rate -- confirm it
  # matches the audio produced by play_song.
  numeric_data = np.frombuffer(waveform.data, dtype=np.int16)
  wav_file_path = f"output_{i}.wav"
  write(wav_file_path, 88200, numeric_data)

  # Save the song to the Comet interface so it can be accessed there.
  experiment.log_asset(wav_file_path)

In [None]:
# Mark the Comet experiment as finished now that training and generation are done.
experiment.end()