### Import Libraries

In [2]:
import collections
import datetime
import fluidsynth
import glob
import numpy as np
import pathlib
import pandas as pd
import pretty_midi
import seaborn as sns
import os
import random as rn
import numpy as np
import tensorflow as tf

from IPython import display
from matplotlib import pyplot as plt
from typing import Optional

### Ensure Reproducibility

*To get reproducible results, a global seed is set.*

In [3]:
# Single seed shared by every random number generator used in this notebook.
SEED_VALUE = 888888

# TF_DETERMINISTIC_OPS is an on/off switch, not a seed: '1' asks TensorFlow to
# use deterministic op implementations where they exist.
os.environ['TF_DETERMINISTIC_OPS'] = '1'

# NOTE: Python reads PYTHONHASHSEED at interpreter start-up, so assigning it
# here only reaches child processes; the running kernel's str hashing is
# already fixed.
os.environ['PYTHONHASHSEED'] = str(SEED_VALUE)

# CUDA_VISIBLE_DEVICES expects a list of device indices, not a seed. The seed
# value that used to be written here matches no device index, which should have
# hidden every GPU; the empty string states that CPU-only choice explicitly.
# NOTE(review): remove this line if GPU training is actually wanted.
os.environ["CUDA_VISIBLE_DEVICES"] = ''

# Seed the Python, NumPy and TensorFlow random number generators.
rn.seed(SEED_VALUE)
np.random.seed(SEED_VALUE)
tf.random.set_seed(SEED_VALUE)

# Sampling rate for audio playback
SAMPLING_RATE = 16000

### Create the Training Set

*The model predicts the next note(s) given an input MIDI file, so the MIDI files in the dataset must first be converted to notes. The function below does this conversion:*


In [4]:
def convert_to_notes(midi_file):
    """
    Parse a MIDI file into a table of notes.

    Only the first instrument of the file is read.

    Args:
        midi_file: The MIDI file to load, given as a str; it is passed
            unchanged to ``pretty_midi.PrettyMIDI``.

    Returns:
        A pandas DataFrame with one row per note, ordered by note start, with
        the columns ``start``, ``end``, ``pitch``, ``duration``
        (``end - start``) and ``step`` (the difference between this note's
        start and the previous note's start; 0 for the first note).
        If the file has no instruments, or its first instrument has no notes,
        an empty DataFrame with the same columns is returned.
    """
    columns = ('start', 'end', 'pitch', 'duration', 'step')

    pm = pretty_midi.PrettyMIDI(midi_file)
    raw_notes = pm.instruments[0].notes if pm.instruments else []

    if not raw_notes:
        # Without this guard an empty file raises IndexError below and aborts
        # the whole loading loop. The empty frame keeps the usual columns so
        # it can still be concatenated with frames from other files.
        return pd.DataFrame({
            name: np.array([], dtype=int if name == 'pitch' else float)
            for name in columns
        })

    notes = collections.defaultdict(list)

    # Sort by start time
    sorted_notes = sorted(raw_notes, key=lambda note: note.start)
    # The first note is compared against itself, giving it a step of 0.
    prev_start = sorted_notes[0].start

    for note in sorted_notes:
        start, end = note.start, note.end
        notes['start'].append(start)
        notes['end'].append(end)
        notes['pitch'].append(note.pitch)
        notes['duration'].append(end - start)
        notes['step'].append(start - prev_start)
        prev_start = start

    return pd.DataFrame({name: np.array(value) for name, value in notes.items()})


*Load the downloaded maestro dataset:*

In [5]:
# Location of the downloaded dataset, relative to this notebook.
data_dir = pathlib.Path('../data/maestro-v2.0.0')
# glob.glob returns matches in an arbitrary, filesystem-dependent order.
# Sorting makes later slices such as files[:limit] select the same files on
# every machine and every run.
files = sorted(glob.glob(str(data_dir/'**/*.mid*')))


*Convert to notes:*

In [6]:
# Number of MIDI files to parse.
limit = 5

# Parse each selected file into its own note table, then join the tables once.
per_file_notes = []
for f in files[:limit]:
    notes = convert_to_notes(f)
    per_file_notes.append(notes)

all_notes = pd.concat(per_file_notes)
print(len(all_notes))

37241


*Create a tensorflow.data.Dataset using the parsed notes:*

In [7]:
# Feature columns, in the order they appear along the last axis of each element.
key_order = ['pitch', 'step', 'duration']

# Place the selected columns side by side: one row per note, one column per key.
feature_columns = [all_notes[key].to_numpy() for key in key_order]
training_notes = np.column_stack(feature_columns)

notes_ds = tf.data.Dataset.from_tensor_slices(training_notes)
notes_ds.element_spec

TensorSpec(shape=(3,), dtype=tf.float64, name=None)

*Define a function for creating sequences. Each training example is a sequence of notes, and the note that immediately follows the sequence is used as its label/target.*

In [8]:
def create_sequences(dataset, seq_length, vocab_size = 128):
  """Builds (input sequence, label) examples from a dataset of notes.

  Args:
    dataset: A tf.data.Dataset of individual notes; each element is indexed in
      the order given by the module-level `key_order`.
    seq_length: Number of notes in every input sequence.
    vocab_size: Divisor applied to the first feature of the inputs.

  Returns:
    A tf.data.Dataset of `(inputs, labels)` pairs. `inputs` holds `seq_length`
    consecutive notes with the first feature divided by `vocab_size`; `labels`
    is a dict keyed by `key_order` holding the unscaled features of the note
    that follows them.
  """
  # Every window carries one note more than the inputs: the label.
  window_size = seq_length + 1

  def to_dense_window(window):
    # Each window arrives as a nested dataset; batching turns it into a tensor.
    return window.batch(window_size, drop_remainder=True)

  def normalize(features):
    # Only the first feature is rescaled; the other two are divided by 1.
    return features / [vocab_size, 1.0, 1.0]

  def to_example(window):
    context = window[:-1]
    target = window[-1]
    labels = {name: target[idx] for idx, name in enumerate(key_order)}
    return normalize(context), labels

  sliding_windows = dataset.window(
      window_size, shift=1, stride=1, drop_remainder=True)
  dense_windows = sliding_windows.flat_map(to_dense_window)

  return dense_windows.map(to_example, num_parallel_calls=tf.data.AUTOTUNE)

*Initialize the Hyperparameters.*

In [10]:
# Number of notes in each input sequence.
SEQ_LENGTH = 10
# Passed to create_sequences as the divisor that scales the pitch feature.
VOCAB_SIZE = 128

seq_ds = create_sequences(
    notes_ds,
    seq_length=SEQ_LENGTH,
    vocab_size=VOCAB_SIZE,
)
seq_ds.element_spec

(TensorSpec(shape=(10, 3), dtype=tf.float64, name=None),
 {'pitch': TensorSpec(shape=(), dtype=tf.float64, name=None),
  'step': TensorSpec(shape=(), dtype=tf.float64, name=None),
  'duration': TensorSpec(shape=(), dtype=tf.float64, name=None)})

In [11]:
batch_size = 2
buffer_size = len(all_notes) - SEQ_LENGTH  # the number of items in the dataset

# cache() must come before shuffle(): a cache placed after shuffle/batch stores
# the first epoch's batches and replays them in the same order on every later
# epoch. Caching the unshuffled examples lets shuffle() draw a new order each
# epoch while the window/map work is still computed only once.
train_ds = (seq_ds
            .cache()
            .shuffle(buffer_size)
            .batch(batch_size, drop_remainder=True)
            .prefetch(tf.data.AUTOTUNE))

train_ds.element_spec

(TensorSpec(shape=(2, 10, 3), dtype=tf.float64, name=None),
 {'pitch': TensorSpec(shape=(2,), dtype=tf.float64, name=None),
  'step': TensorSpec(shape=(2,), dtype=tf.float64, name=None),
  'duration': TensorSpec(shape=(2,), dtype=tf.float64, name=None)})

### Train the Model

In [12]:
def mse_with_positive_pressure(y_true, y_pred):
  """Mean squared error plus a penalty on negative predictions.

  Args:
    y_true: Target values.
    y_pred: Predicted values.

  Returns:
    The mean over all elements of the squared error plus ten times the
    magnitude of any negative prediction (zero for non-negative predictions).
  """
  # max(-y_pred, 0) is non-zero only where the prediction is negative.
  negative_penalty = 10 * tf.maximum(-y_pred, 0.0)
  squared_error = (y_true - y_pred) ** 2
  return tf.reduce_mean(squared_error + negative_penalty)

input_shape = (SEQ_LENGTH, 3)
learning_rate = 0.005

# Shared trunk: a single LSTM layer reads the whole input sequence.
inputs = tf.keras.Input(input_shape)
x = tf.keras.layers.LSTM(128)(inputs)

# One Dense head per predicted feature, created in the order listed here.
head_units = {'pitch': 128, 'step': 1, 'duration': 1}
outputs = {
    name: tf.keras.layers.Dense(units, name=name)(x)
    for name, units in head_units.items()
}

model = tf.keras.Model(inputs, outputs)

# The pitch head emits logits and is scored with cross-entropy; step and
# duration use the custom MSE that also penalises negative predictions.
loss = {
    'pitch': tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    'step': mse_with_positive_pressure,
    'duration': mse_with_positive_pressure,
}
loss_weights = {'pitch': 0.05, 'step': 1.0, 'duration': 1.0}

opt = tf.keras.optimizers.Adam(learning_rate=learning_rate)
model.compile(optimizer=opt, loss=loss, loss_weights=loss_weights)

model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 10, 3)]      0           []                               
                                                                                                  
 lstm (LSTM)                    (None, 128)          67584       ['input_1[0][0]']                
                                                                                                  
 duration (Dense)               (None, 1)            129         ['lstm[0][0]']                   
                                                                                                  
 pitch (Dense)                  (None, 128)          16512       ['lstm[0][0]']                   
                                                                                              

In [13]:
# Writes the model weights to a checkpoint path that includes the epoch number.
checkpoint_cb = tf.keras.callbacks.ModelCheckpoint(
    filepath='../models/training_checkpoints/ckpt_{epoch}',
    save_weights_only=True,
)

# Stops training once the training loss has not improved for 5 epochs, then
# restores the weights from the best epoch.
early_stopping_cb = tf.keras.callbacks.EarlyStopping(
    monitor='loss',
    patience=5,
    verbose=1,
    restore_best_weights=True,
)

callbacks = [checkpoint_cb, early_stopping_cb]

In [14]:
%%time
epochs = 1

history = model.fit(
    train_ds,
    epochs=epochs,
    callbacks=callbacks,
)

: 

: 

In [None]:
# Draw the total training loss recorded for each epoch.
fig, ax = plt.subplots()
ax.plot(history.epoch, history.history['loss'], label='total loss')
plt.show()

In [4]:
# Seed the NumPy and TensorFlow random number generators.
seed = 42
np.random.seed(seed)
tf.random.set_seed(seed)

# Sampling rate for audio playback
_SAMPLING_RATE = 16000

In [5]:
# Location of the downloaded dataset, relative to this notebook.
data_dir = pathlib.Path('../data/maestro-v2.0.0')
# glob.glob returns matches in an arbitrary, filesystem-dependent order.
# Sorting makes index-based picks such as filenames[1] refer to the same file
# on every machine and every run.
filenames = sorted(glob.glob(str(data_dir/'**/*.mid*')))
print('Number of files:', len(filenames))

Number of files: 1282


In [6]:
# Pick the second file of the listing as the example to inspect.
sample_index = 1
sample_file = filenames[sample_index]
print(sample_file)

../data/maestro-v2.0.0/2004/MIDI-Unprocessed_SMF_02_R1_2004_01-05_ORIG_MID--AUDIO_02_R1_2004_06_Track06_wav.midi


In [7]:
# Parse the sample MIDI file into a PrettyMIDI object.
pm = pretty_midi.PrettyMIDI(midi_file=sample_file)

In [8]:
def display_audio(pm: pretty_midi.PrettyMIDI, seconds=30):
  """Synthesizes a PrettyMIDI object and returns an audio player for it.

  Args:
    pm: The PrettyMIDI object to synthesize.
    seconds: Length of the excerpt to keep, counted from the start of the
      waveform. May be fractional.

  Returns:
    An IPython `display.Audio` object holding at most `seconds` of audio at
    `_SAMPLING_RATE`.
  """
  waveform = pm.fluidsynth(fs=_SAMPLING_RATE)
  # Take a sample of the generated waveform to mitigate kernel resets.
  # int() keeps the slice bound an integer, so a fractional `seconds`
  # (e.g. 2.5) no longer raises TypeError; integer values behave as before.
  num_samples = int(seconds * _SAMPLING_RATE)
  waveform_short = waveform[:num_samples]
  return display.Audio(waveform_short, rate=_SAMPLING_RATE)

In [9]:
# Render an audio player for the first 30 seconds of the sample file.
display_audio(pm, seconds=30)

fluidsynth: error: Unknown integer parameter 'synth.sample-rate'


In [10]:
# Report how many instruments the sample file contains.
num_instruments = len(pm.instruments)
print('Number of instruments:', num_instruments)

# Look up the readable name of the first instrument from its program number.
instrument = pm.instruments[0]
instrument_name = pretty_midi.program_to_instrument_name(instrument.program)
print('Instrument name:', instrument_name)

Number of instruments: 1
Instrument name: Acoustic Grand Piano


In [11]:
# Show pitch, note name and duration for the first ten notes of the instrument.
first_notes = instrument.notes[:10]
for i, note in enumerate(first_notes):
  duration = note.end - note.start
  note_name = pretty_midi.note_number_to_name(note.pitch)
  line = (f'{i}: pitch={note.pitch}, note_name={note_name},'
          f' duration={duration:.4f}')
  print(line)

0: pitch=31, note_name=G1, duration=0.0656
1: pitch=43, note_name=G2, duration=0.0792
2: pitch=44, note_name=G#2, duration=0.0740
3: pitch=32, note_name=G#1, duration=0.0729
4: pitch=34, note_name=A#1, duration=0.0708
5: pitch=46, note_name=A#2, duration=0.0948
6: pitch=48, note_name=C3, duration=0.6260
7: pitch=36, note_name=C2, duration=0.6542
8: pitch=53, note_name=F3, duration=1.7667
9: pitch=56, note_name=G#3, duration=1.7688
