# Music Generation with RNN 

In [None]:
# Print a marker so it is visible that the notebook has started executing.
print("Start")

## Setup

In [None]:
!pip install -r requirements.txt && echo "Dependencies installed."

In [None]:
import collections
import datetime
import fluidsynth
import glob
import numpy as np
import pathlib
import pandas as pd
import pretty_midi
import seaborn as sns
import tensorflow as tf

from IPython import display
from matplotlib import pyplot as plt
from typing import Optional

## Download the Maestro dataset

In [None]:
# Seed TensorFlow and NumPy so that random operations are repeatable.
seed = 42
tf.random.set_seed(seed)
np.random.seed(seed)

# Sampling rate for audio playback
_SAMPLING_RATE = 16000

# Fraction (0-1) of the dataset's MIDI files to use for training.
FILE_COUNT = 0.75

# Number of input notes in each training sequence.
SEQ_LENGTH = 25

# Maximum number of training epochs (early stopping may end training sooner).
EPOCHS = 27

# Number of sequences per training batch.
BATCH_SIZE = 512
# Backward-compatible alias: the constant was originally misspelled, and
# existing cells still refer to it under this name.
BATHC_SIZE = BATCH_SIZE

# Number of elements held in the shuffle buffer of the training pipeline.
BUFFER_SIZE = 200000

In [None]:
# Dataset constants: archive name, download location and local cache layout.
DATASET_NAME = "maestro-v2.0.0"
DATASET_FILE = DATASET_NAME + "-midi.zip"
DATASET_URL = "/".join([
    "https://storage.googleapis.com/magentadata/datasets/maestro/v2.0.0",
    DATASET_FILE,
])
CACHE_DIR, CACHE_SUBDIR = ".", "data"

In [None]:
# Fetch and extract the MAESTRO archive unless the expected folder is present.
data_dir = pathlib.Path(CACHE_SUBDIR) / DATASET_NAME
if data_dir.exists():
    print(f"{DATASET_NAME} already exists and skipping download.")
else:
    download_options = dict(
        origin=DATASET_URL,
        extract=True,
        cache_dir=CACHE_DIR,
        cache_subdir=CACHE_SUBDIR,
    )
    tf.keras.utils.get_file(DATASET_FILE, **download_options)
    print(f"Downloaded and extracted {DATASET_NAME}")

In [None]:
import glob
import os
import pathlib

# Auto-detect the extracted dataset folder: any sub-directory of the cache
# folder whose name contains "maestro" is accepted (presumably because the
# extracted folder name can differ from DATASET_NAME -- confirm).
CACHE_SUBDIR = "data"
data_root = pathlib.Path(CACHE_SUBDIR)
if not data_root.is_dir():
    raise FileNotFoundError("MAESTRO dataset folder not found!")

# Directory listing order is filesystem-dependent; sort so that the same
# folder is picked on every run when several candidates exist.
possible_dirs = sorted(
    d for d in data_root.iterdir()
    if d.is_dir() and "maestro" in d.name.lower()
)

if possible_dirs:
    data_dir = possible_dirs[0]
    print(f"Using dataset folder: {data_dir}")
else:
    raise FileNotFoundError("MAESTRO dataset folder not found!")

# Collect all MIDI file paths. glob returns them in arbitrary order, so sort
# them: otherwise seeded random sampling over this list is not reproducible.
filenames = sorted(
    glob.glob(os.path.join(str(data_dir), "**", "*.mid*"), recursive=True)
)
print(f"Found {len(filenames)} MIDI files.")
if not filenames:
    # Fail with a clear message instead of an IndexError on the lookup below.
    raise FileNotFoundError(f"No MIDI files found under {data_dir}")

# Alias
midi_files = filenames  # same list object, not a copy
sample_file = midi_files[0]
print(f"Sample file: {sample_file}")


## Functions

### Audio Display Function

In [None]:
def display_audio(pm: pretty_midi.PrettyMIDI, seconds=30):
  """Synthesize a MIDI object and return an audio player for its beginning.

  Args:
    pm: MIDI object to synthesize with fluidsynth at `_SAMPLING_RATE`.
    seconds: Length of the excerpt to keep, in seconds. May be fractional.

  Returns:
    An `IPython.display.Audio` widget playing the first `seconds` seconds.
  """
  waveform = pm.fluidsynth(fs=_SAMPLING_RATE)
  # Slice bounds must be integers; the cast lets a fractional `seconds`
  # (e.g. 2.5) work instead of raising a TypeError.
  num_samples = int(seconds * _SAMPLING_RATE)
  waveform_short = waveform[:num_samples]
  return display.Audio(waveform_short, rate=_SAMPLING_RATE)

### MIDI -> Notes Function

In [None]:
def midi_to_notes(midi_file: str) -> pd.DataFrame:
  """Extract the notes of the first instrument of a MIDI file as a table.

  Args:
    midi_file: Path of the MIDI file to parse.

  Returns:
    A DataFrame with one row per note, ordered by start time, with columns
    `pitch`, `start`, `end`, `step` (start minus the previous note's start,
    0 for the first note) and `duration` (end minus start).

  Raises:
    ValueError: If the file has no instruments, or its first instrument has
      no notes.
  """
  pm = pretty_midi.PrettyMIDI(midi_file)
  if not pm.instruments:
    # Explicit error instead of an opaque IndexError on `instruments[0]`.
    raise ValueError(f"No instruments found in MIDI file: {midi_file}")
  instrument = pm.instruments[0]

  # Sort the notes by start time
  sorted_notes = sorted(instrument.notes, key=lambda note: note.start)
  if not sorted_notes:
    # Explicit error instead of an opaque IndexError on `sorted_notes[0]`.
    raise ValueError(
        f"No notes found in the first instrument of MIDI file: {midi_file}")

  notes = collections.defaultdict(list)
  # Starting from the first note's own start makes its step exactly 0.
  prev_start = sorted_notes[0].start

  for note in sorted_notes:
    start = note.start
    end = note.end
    notes['pitch'].append(note.pitch)
    notes['start'].append(start)
    notes['end'].append(end)
    notes['step'].append(start - prev_start)
    notes['duration'].append(end - start)
    prev_start = start

  return pd.DataFrame({name: np.array(value) for name, value in notes.items()})

### Number -> Human-Readable Note Names Function

In [None]:
# Element-wise wrapper around pretty_midi.note_number_to_name, so a whole
# array of note numbers can be converted to note names in a single call.
get_note_names = np.vectorize(pretty_midi.note_number_to_name)


### Plot Piano Roll Function

In [None]:
def plot_piano_roll(notes: pd.DataFrame, count: Optional[int] = None):
  """Plot notes as horizontal segments of pitch over time.

  Args:
    notes: DataFrame with `pitch`, `start` and `end` columns.
    count: Number of leading notes to plot. `None` plots the whole track.
  """
  # Compare against None explicitly so that an explicit `count=0` is honoured
  # rather than being treated as "not given".
  if count is not None:
    title = f'First {count} notes'
  else:
    title = 'Whole track'
    count = len(notes['pitch'])
  plt.figure(figsize=(20, 4))
  # Each note becomes a 2-point line: (start, pitch) -> (end, pitch).
  plot_pitch = np.stack([notes['pitch'], notes['pitch']], axis=0)
  plot_start_stop = np.stack([notes['start'], notes['end']], axis=0)
  plt.plot(
      plot_start_stop[:, :count], plot_pitch[:, :count], color="b", marker=".")
  plt.xlabel('Time [s]')
  plt.ylabel('Pitch')
  plt.title(title)

### Plot Distribution Function

In [None]:
def plot_distributions(notes: pd.DataFrame, drop_percentile=2.5):
  """Draw side-by-side histograms of pitch, step and duration.

  Args:
    notes: DataFrame with `pitch`, `step` and `duration` columns.
    drop_percentile: Upper tail (in percent) of `step` and `duration` that is
      left out of the histogram range.
  """
  plt.figure(figsize=[15, 5])

  # Panel 1: pitch, binned over its full range.
  plt.subplot(1, 3, 1)
  sns.histplot(notes, x="pitch", bins=20)

  # Panels 2 and 3: step and duration, with bins ending at the upper cutoff.
  upper_percentile = 100 - drop_percentile
  for panel, column in enumerate(("step", "duration"), start=2):
    plt.subplot(1, 3, panel)
    cutoff = np.percentile(notes[column], upper_percentile)
    sns.histplot(notes, x=column, bins=np.linspace(0, cutoff, 21))

### Notes -> MIDI Function

In [None]:
def notes_to_midi(
  notes: pd.DataFrame,
  out_file: str,
  instrument_name: str,
  velocity: int = 100,  # note loudness
) -> pretty_midi.PrettyMIDI:
  """Build a single-instrument MIDI object from a note table and write it.

  Args:
    notes: DataFrame with `pitch`, `step` and `duration` columns. Each note
      starts `step` after the previous note's start (the first one `step`
      after time 0) and lasts `duration`.
    out_file: Path the MIDI file is written to.
    instrument_name: Instrument name, resolved to a program number by
      `pretty_midi.instrument_name_to_program`.
    velocity: Velocity given to every note.

  Returns:
    The `pretty_midi.PrettyMIDI` object that was written to `out_file`.
  """
  program = pretty_midi.instrument_name_to_program(instrument_name)
  track = pretty_midi.Instrument(program=program)

  # Rebuild absolute times by accumulating the relative steps.
  onset = 0
  for _, row in notes.iterrows():
    onset = float(onset + row['step'])
    offset = float(onset + row['duration'])
    track.notes.append(
        pretty_midi.Note(
            velocity=velocity,
            pitch=int(row['pitch']),
            start=onset,
            end=offset,
        )
    )

  midi = pretty_midi.PrettyMIDI()
  midi.instruments.append(track)
  midi.write(out_file)
  return midi

### Sequence creating function

In [None]:
def create_sequences(
    dataset: tf.data.Dataset,
    seq_length: int,
    vocab_size = 128,
    key_order = ('pitch', 'step', 'duration'),
) -> tf.data.Dataset:
  """Returns TF Dataset of sequence and label examples.

  Args:
    dataset: Dataset of per-note feature vectors. The features are expected
      in `key_order` order with pitch first (only the first feature is
      scaled, and exactly three features are assumed by the scaling).
    seq_length: Number of notes in each input sequence.
    vocab_size: Divisor used to normalize the pitch feature of the inputs.
    key_order: Names given, in order, to the features of the label note.
      This used to be read from a module-level global; it is now an explicit
      parameter with the same value so the function is self-contained.

  Returns:
    A dataset of `(inputs, labels)` pairs. `inputs` holds `seq_length`
    consecutive notes with the pitch divided by `vocab_size`; `labels` maps
    each name in `key_order` to the unscaled feature of the following note.
  """
  # Take 1 extra for the labels
  window_size = seq_length + 1
  windows = dataset.window(window_size, shift=1, stride=1,
                           drop_remainder=True)

  # `flat_map` flattens the "dataset of datasets" into a dataset of tensors
  sequences = windows.flat_map(
      lambda window: window.batch(window_size, drop_remainder=True))

  # Normalize note pitch
  def scale_pitch(x):
    return x / [vocab_size, 1.0, 1.0]

  # Split the labels: the last note of each window is the prediction target.
  def split_labels(window):
    inputs = window[:-1]
    labels_dense = window[-1]
    labels = {key: labels_dense[i] for i, key in enumerate(key_order)}
    return scale_pitch(inputs), labels

  return sequences.map(split_labels, num_parallel_calls=tf.data.AUTOTUNE)

## Creating the training dataset


In [None]:
import pandas as pd
import numpy as np
from tqdm import tqdm


# Draw a random subset (without replacement) of the MIDI files; FILE_COUNT is
# the fraction of the available files to use.
num_files = int(len(filenames) * FILE_COUNT) #total files to use from dataset for training;
subset = np.random.choice(filenames, num_files, replace=False)

print(f"Processing {num_files} random MIDI files...")

# Parse every selected file; a file that fails to parse is reported and
# skipped so that one bad file does not abort the whole run.
results = []
for f in tqdm(subset, desc="Processing MIDI files"):
    try:
        df = midi_to_notes(f)  
        results.append(df)
    except Exception as e:
        print(f"Error processing {f}: {e}")

#Combine all
# The per-file tables are concatenated into one table with a fresh index.
all_notes = pd.concat(results, ignore_index=True)
print(f"Done. Combined shape: {all_notes.shape}")


In [None]:
# Total number of notes (rows) collected from all parsed files.
n_notes = len(all_notes)
print('Number of notes parsed:', n_notes)

In [None]:
# Order of the feature columns; create_sequences uses the same order to name
# the label features.
key_order = ['pitch', 'step', 'duration']
# Stack the selected columns side by side into one float32 array with one
# row per note and one column per feature.
train_notes = np.stack([all_notes[key] for key in key_order], axis=1).astype(np.float32)

In [None]:
# Wrap the note array in a tf.data pipeline that yields one note (one row of
# train_notes) per element.
notes_ds = tf.data.Dataset.from_tensor_slices(train_notes)
print(notes_ds.element_spec)

In [None]:
# Turn the stream of notes into (input sequence, next-note labels) examples.
seq_length = SEQ_LENGTH
vocab_size = 128  # divisor used by create_sequences to normalize pitch
seq_ds = create_sequences(notes_ds, seq_length, vocab_size)
seq_ds.element_spec

In [None]:
# Inspect a single example: the input sequence and its label dictionary.
for seq, target in seq_ds.take(1):
  print('sequence shape:', seq.shape)
  print('sequence elements (first 10):', seq[0: 10])
  print()
  print('target:', target)

In [None]:
# Training input pipeline: cache the generated sequences, shuffle them, group
# them into fixed-size batches and prefetch batches ahead of consumption.
batch_size = BATHC_SIZE
buffer_size = BUFFER_SIZE

train_ds = (seq_ds
            .cache()
            .shuffle(buffer_size)
            # drop_remainder keeps every batch at exactly batch_size examples.
            .batch(batch_size, drop_remainder=True)
            # Use tf.data.AUTOTUNE, consistent with create_sequences, rather
            # than the older tf.data.experimental alias.
            .prefetch(tf.data.AUTOTUNE))

## Create and train the model

In [None]:
def mse_with_positive_pressure(y_true: tf.Tensor, y_pred: tf.Tensor):
  """Mean squared error plus a penalty on negative predictions.

  Args:
    y_true: Target values.
    y_pred: Predicted values.

  Returns:
    The mean over all elements of the squared error plus 10 times the amount
    by which each prediction falls below zero.
  """
  squared_error = (y_true - y_pred) ** 2
  # Zero for non-negative predictions, 10 * |y_pred| for negative ones.
  negative_penalty = 10 * tf.maximum(-y_pred, 0.0)
  return tf.reduce_mean(squared_error + negative_penalty)

In [None]:
import tensorflow as tf
# Set the global Keras dtype policy to 'mixed_float16'; it applies to layers
# created after this call.
tf.keras.mixed_precision.set_global_policy('mixed_float16')

In [None]:
# Model: one LSTM over the input sequence feeding three output heads.
input_shape = (seq_length, 3)  # seq_length notes x 3 features per note
learning_rate = 0.005

inputs = tf.keras.Input(input_shape)
x = tf.keras.layers.LSTM(128)(inputs)

# The output heads are pinned to float32. When a mixed_float16 policy is
# active the heads would otherwise emit float16 values into the losses, which
# the TensorFlow mixed-precision guide advises against for numeric stability.
# With the default float32 policy this setting changes nothing.
outputs = {
  'pitch': tf.keras.layers.Dense(128, name='pitch', dtype='float32')(x),
  'step': tf.keras.layers.Dense(1, name='step', dtype='float32')(x),
  'duration': tf.keras.layers.Dense(1, name='duration', dtype='float32')(x),
}

model = tf.keras.Model(inputs, outputs)

# Pitch is trained as a 128-way classification on logits; step and duration
# are regressions whose loss also penalizes negative predictions.
loss = {
      'pitch': tf.keras.losses.SparseCategoricalCrossentropy(
          from_logits=True),
      'step': mse_with_positive_pressure,
      'duration': mse_with_positive_pressure,
}

optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

model.compile(loss=loss, optimizer=optimizer)

model.summary()

In [None]:
# Evaluate the not-yet-trained model on the training data to see the starting
# value of each loss.
losses = model.evaluate(train_ds, return_dict=True)
losses

In [None]:
# Recompile with per-output loss weights: the pitch loss is scaled down to
# 0.05 while step and duration keep a weight of 1.0.
model.compile(
    loss=loss,
    loss_weights={
        'pitch': 0.05,
        'step': 1.0,
        'duration':1.0,
    },
    optimizer=optimizer,
)

In [None]:
# Evaluate on the training data again and return the losses as a dictionary.
model.evaluate(train_ds, return_dict=True)

In [None]:
import tensorflow as tf

# Report the GPUs visible to TensorFlow. The availability flag is derived
# from the device list because tf.test.is_gpu_available() is deprecated in
# favour of tf.config.list_physical_devices('GPU').
gpus = tf.config.list_physical_devices('GPU')
print(gpus)
print("Built with CUDA:", tf.test.is_built_with_cuda())
print("GPU available:", bool(gpus))


In [None]:
# Show the names of the model outputs.
print(model.output_names)


In [None]:
# Training callbacks: early stopping that monitors the training loss with a
# patience of 5 epochs and restores the best weights seen.
callbacks = [
    tf.keras.callbacks.EarlyStopping(
        monitor='loss',
        patience=5,
        verbose=1,
        restore_best_weights=True
    ),
]

In [None]:
# Compile the model with the weighted losses before training (same arguments
# as the earlier weighted compile call in this notebook).
model.compile(
    loss=loss,
    loss_weights={
        'pitch': 0.05,
        'step': 1.0,
        'duration':1.0,
    },
    optimizer=optimizer,
)

In [None]:
%%time
# Train for at most EPOCHS epochs; the callbacks may stop training earlier.
epochs = EPOCHS

history = model.fit(
    train_ds,
    epochs=epochs,
    callbacks=callbacks,
)

In [None]:
# Plot the total training loss recorded for each epoch.
plt.plot(history.epoch, history.history['loss'], label='total loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
# Without a legend the `label` passed to plt.plot is never displayed.
plt.legend()
plt.show()

In [None]:
# Save the trained model to disk under FILE_NAME.
FILE_NAME = "music_rnn_model.keras"
model.save(FILE_NAME)
print(f"Model saved successfully as {FILE_NAME}")
