# Dataset preparation

Install the packages that are necessary to handle audio files.

In [1]:
# On Colab, uncomment the following line to install FluidSynth:
#!sudo apt install -y fluidsynth

# On macOS, run this instead:
#!brew install fluidsynth

In [2]:
!pip3 install --upgrade pyfluidsynth
!pip3 install fluidsynth
!pip3 install pretty_midi
!pip3 install mido
!pip3 install miditok
# !pip3 install -Iv https://pypi.python.org/packages/source/p/pyFluidSynth/pyFluidSynth-1.2.4.tar.gz#md5=60079310701d0b9298b65a8b6728ffcc



In [3]:
import sys
import pathlib
import glob
import os
import shutil
import math
sys.path.insert(0, os.path.abspath('utils/'))
import midi_util

# Directory, relative to the working directory, that holds the raw MIDI files.
dir_path = pathlib.Path('midis')

# glob returns its matches in arbitrary, filesystem-dependent order. Sorting makes
# every later index-based use of this list (sample file, dataset split)
# reproducible across runs and machines.
filenames = sorted(glob.glob(str(dir_path/'*.mid*')))
print('Number of files:', len(filenames))

Number of files: 10854


In [4]:
import pretty_midi

# Load the second entry of the file listing as a sample to inspect
# (raises IndexError when fewer than two files were found).
pm = pretty_midi.PrettyMIDI(filenames[1])

In [5]:
import fluidsynth
from IPython import display

# Sampling rate passed both to the synthesizer and to the audio player.
_SAMPLING_RATE = 16000


def display_audio(pm: pretty_midi.PrettyMIDI, seconds=30):
    """Synthesize ``pm`` and return an inline player for its first ``seconds``.

    Args:
        pm: MIDI object whose ``fluidsynth`` method renders the waveform.
        seconds: Maximum length of the excerpt in seconds; may be fractional.

    Returns:
        An ``IPython.display.Audio`` object wrapping the truncated waveform.
    """
    waveform = pm.fluidsynth(fs=_SAMPLING_RATE)
    # Slice bounds must be integers; converting explicitly keeps fractional
    # durations such as ``seconds=2.5`` from raising a TypeError.
    sample_count = int(seconds * _SAMPLING_RATE)
    waveform_short = waveform[:sample_count]
    return display.Audio(waveform_short, rate=_SAMPLING_RATE)

#display_audio(pm)

# TODO: change
# NOTE(review): the TODO does not say what should change; presumably the
# hard-coded sample index or the playback helper. Confirm with the author.
# Open the same sample file with the project-local helper from utils/midi_util
# and invoke its play_midi() method.
midi = midi_util.MidiUtils(filenames[1])
midi.play_midi()

Divide the dataset into training, validation and test sets.

In [6]:
def make_subset(dir=None, start_index=0, end_index=0, files=None) -> None:
    """Copy a slice of the MIDI file list into a fresh ``dataset/<dir>`` directory.

    An existing ``dataset/<dir>`` directory is removed first, so the call can be
    repeated. Every selected path that is a regular file is copied into the
    subset directory under its own base name.

    Args:
        dir: Name of the subset directory to (re)create under ``dataset/``.
        start_index: Index of the first file of the slice (inclusive).
        end_index: Index one past the last file of the slice (exclusive).
        files: Source file paths to slice. Defaults to the notebook-level
            ``filenames`` list when omitted.
    """
    source_files = filenames if files is None else files
    dir = "dataset/" + dir
    if os.path.exists(dir):
        shutil.rmtree(dir)
    os.makedirs(dir)
    selected = source_files[start_index:end_index]
    for filename in selected:
        if os.path.isfile(filename):
            # Derive the destination from the base name. Substituting the literal
            # "midis/" prefix leaves the path unchanged whenever that prefix is
            # absent (other source directory, other path separator), which makes
            # copyfile fail because source and destination are the same file.
            destination = os.path.join(dir, os.path.basename(filename))
            shutil.copyfile(src=filename, dst=destination)

    # Reports the size of the requested slice, as before.
    print(f"{dir}: {len(selected)}")


# Fraction of the file list assigned to each subset.
train_size = 0.7
validation_size = 0.1
test_size = 0.2

# Locations of the three subset directories.
train_path = pathlib.Path('dataset/training')
test_path = pathlib.Path('dataset/test')
validation_path = pathlib.Path('dataset/validation')

# Slice boundaries, computed once. Each subset starts where the previous one
# ends, so the subsets are contiguous and do not overlap.
total_files = len(filenames)
train_end = math.floor(total_files*train_size)
validation_end = math.floor(total_files*(train_size+validation_size))
test_end = math.floor(total_files*(train_size+validation_size + test_size))

make_subset("training", 0, train_end)
make_subset("validation", train_end, validation_end)
make_subset("test", validation_end, test_end)

dataset/training: 7597
dataset/validation: 1086
dataset/test: 2171


In [19]:
from miditok import MIDILike

# Pitches 21..108 inclusive; the vocabulary printed below runs NoteOn_21 .. NoteOn_108.
pitch_range = range(21, 109)
nb_velocities = 32

# Optional token families: only 'Rest' is switched on.
additional_tokens = {
    'Chord': False,
    'Rest': True,
    'Tempo': False,
    'Program': False,
    'TimeSignature': False,
    'rest_range': (2, 8),  # (half, 8 beats)
    # nb of tempo bins
    # NOTE(review): 'Tempo' is False and no 'tempo_range' is given, so this value
    # is presumably unused - confirm against the miditok version in use.
    'nb_tempos': 64,
}

tokenizer = MIDILike(
    pitch_range=pitch_range,
    nb_velocities=nb_velocities,
    additional_tokens=additional_tokens,
    pad=True,
    sos_eos=True,
)

In [20]:
# Print the head of the vocabulary. The loop stops after the entry at
# position 101, so at most 102 entries are shown.
tokens = 0
for position, token in enumerate(tokenizer.vocab):
    print(token)
    if position > 100:
        break
    # Number of entries fully processed so far.
    tokens = position + 1

PAD_None
SOS_None
EOS_None
NoteOn_21
NoteOn_22
NoteOn_23
NoteOn_24
NoteOn_25
NoteOn_26
NoteOn_27
NoteOn_28
NoteOn_29
NoteOn_30
NoteOn_31
NoteOn_32
NoteOn_33
NoteOn_34
NoteOn_35
NoteOn_36
NoteOn_37
NoteOn_38
NoteOn_39
NoteOn_40
NoteOn_41
NoteOn_42
NoteOn_43
NoteOn_44
NoteOn_45
NoteOn_46
NoteOn_47
NoteOn_48
NoteOn_49
NoteOn_50
NoteOn_51
NoteOn_52
NoteOn_53
NoteOn_54
NoteOn_55
NoteOn_56
NoteOn_57
NoteOn_58
NoteOn_59
NoteOn_60
NoteOn_61
NoteOn_62
NoteOn_63
NoteOn_64
NoteOn_65
NoteOn_66
NoteOn_67
NoteOn_68
NoteOn_69
NoteOn_70
NoteOn_71
NoteOn_72
NoteOn_73
NoteOn_74
NoteOn_75
NoteOn_76
NoteOn_77
NoteOn_78
NoteOn_79
NoteOn_80
NoteOn_81
NoteOn_82
NoteOn_83
NoteOn_84
NoteOn_85
NoteOn_86
NoteOn_87
NoteOn_88
NoteOn_89
NoteOn_90
NoteOn_91
NoteOn_92
NoteOn_93
NoteOn_94
NoteOn_95
NoteOn_96
NoteOn_97
NoteOn_98
NoteOn_99
NoteOn_100
NoteOn_101
NoteOn_102
NoteOn_103
NoteOn_104
NoteOn_105
NoteOn_106
NoteOn_107
NoteOn_108
NoteOff_21
NoteOff_22
NoteOff_23
NoteOff_24
NoteOff_25
NoteOff_26
NoteOff_27
NoteOff

In [23]:
# Total number of entries in the tokenizer vocabulary.
len(tokenizer.vocab)

284

### Tokenization
We transform our MIDI files into sequences of tokens.

In [24]:
from miditoolkit import MidiFile
from tqdm import tqdm

def tokenize_directory(dir=None):
    """Tokenize every MIDI file found in ``dataset/<dir>``.

    Args:
        dir: Name of the subset directory under ``dataset/`` (e.g. "training").

    Returns:
        A list obtained by extending an empty list with the result of
        ``tokenizer.midi_to_tokens`` for each file, in sorted file-name order.
    """
    all_tokens = []
    path = pathlib.Path('dataset/' + dir)
    # glob yields files in arbitrary order; sorting keeps the result reproducible.
    files_to_tokenize = sorted(glob.glob(str(path/'*.mid*')))
    for file in tqdm(files_to_tokenize):
        # NOTE(review): the recorded run produced 7596 elements from 7598 files,
        # so each element appears to be a whole token sequence rather than a
        # single token - confirm whether callers expect a flat token stream.
        all_tokens.extend(tokenizer.midi_to_tokens(MidiFile(file)))
    return all_tokens

# Tokenize the three subsets in the same order as before.
training_tokens = tokenize_directory("training")
validation_tokens = tokenize_directory("validation")
test_tokens = tokenize_directory("test")

# Report how many elements each subset produced, one line per subset.
print(
    f"training tokens: {len(training_tokens)}",
    f"validation tokens: {len(validation_tokens)}",
    f"test tokens: {len(test_tokens)}",
    sep="\n",
)

100%|██████████| 7598/7598 [57:41<00:00,  2.20it/s]  
100%|██████████| 1086/1086 [07:50<00:00,  2.31it/s]
100%|██████████| 2171/2171 [15:43<00:00,  2.30it/s]

training tokens: 7596
validation tokens: 1086
test tokens: 2171





Transform the sequences of tokens into TensorFlow datasets (`tf.data.Dataset`) so that they can be saved.

In [None]:
import numpy as np
import tensorflow as tf

# Wrap the token list of each subset in a tf.data.Dataset.
training_ds = tf.data.Dataset.from_tensor_slices(training_tokens)
validation_ds = tf.data.Dataset.from_tensor_slices(validation_tokens)
test_ds = tf.data.Dataset.from_tensor_slices(test_tokens)

# save to file
for split_ds, target_dir in (
    (training_ds, "tf_dataset/training/"),
    (validation_ds, "tf_dataset/validation/"),
    (test_ds, "tf_dataset/test/"),
):
    split_ds.save(target_dir)

2023-01-09 12:44:21.366940: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-01-09 12:47:48.902780: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [None]:
# TODO: create the sequence by shifting of 1 and by adding the target
def create_sequences(
        dataset: tf.data.Dataset,
        seq_len: int,
        vocab_size,
) -> tf.data.Dataset:
    """Build (inputs, label) pairs from sliding windows over ``dataset``.

    Windows of ``seq_len + 1`` consecutive elements are taken with a shift of
    one; the first ``seq_len`` elements of a window are the inputs and the last
    element is the label.

    Args:
        dataset: Source dataset to slide the window over.
        seq_len: Number of input elements per example.
        vocab_size: Currently unused by this function.

    Returns:
        A dataset yielding ``(inputs, label)`` tuples.
    """
    # One extra element per window serves as the label.
    window_size = seq_len + 1

    windowed = dataset.window(window_size, shift=1, stride=1, drop_remainder=True)

    def to_dense(window):
        # Collapse each nested window dataset into a single batched element.
        return window.batch(window_size, drop_remainder=True)

    dense_sequences = windowed.flat_map(to_dense)

    def split_label(sequence):
        # Everything but the last element is input; the last element is the label.
        return sequence[:-1], sequence[-1]

    return dense_sequences.map(split_label, num_parallel_calls=tf.data.AUTOTUNE)

In [None]:
seq_length = 25 # TODO tuning
# len() is the call this notebook already uses successfully to obtain the
# vocabulary size; `.count()` without an argument is not a size accessor.
vocab_size = len(tokenizer.vocab)
seq_ds = create_sequences(training_ds, seq_length, vocab_size)
seq_ds.element_spec

In [None]:
# Peek at one (inputs, label) pair to sanity-check the sequence dataset.
for seq, target in seq_ds.take(1):
    print('sequence shape:', seq.shape)
    print('sequence elements (first 10):', seq[:10])
    print()
    print('target:', target)

In [None]:
batch_size = 64
buffer_size = len(training_ds) - seq_length
# cache() replays exactly the elements it saw on the first pass. Placing it
# before shuffle() lets every epoch draw a fresh shuffle order; with cache()
# after shuffle()/batch() the first epoch's batches would be reused unchanged.
train_ds = (seq_ds
            .cache()
            .shuffle(buffer_size)
            .batch(batch_size, drop_remainder=True)
            .prefetch(tf.data.AUTOTUNE))

In [None]:
# Show the structure (shapes and dtypes) of one batch of the training pipeline.
train_ds.element_spec

In [None]:
def mse_positive_pressure(y_true: tf.Tensor, y_pred: tf.Tensor):
    """Mean squared error with an extra penalty on negative predictions.

    Each negative entry of ``y_pred`` adds ten times its magnitude to the
    element-wise loss before the mean over all elements is taken.
    """
    squared_error = (y_true - y_pred) ** 2
    # -y_pred is positive exactly where the prediction is negative.
    negative_penalty = 10 * tf.maximum(-y_pred, 0.0)
    combined = squared_error + negative_penalty
    return tf.reduce_mean(combined)

In [None]:
# Each training example is a window of `seq_length` token ids, as produced by
# create_sequences, so the model input is 1-D rather than (seq_length, 3).
input_shape = (seq_length,)
learning_rate = 0.005
embedding_dim = 64 # TODO tuning

inputs = tf.keras.Input(input_shape)
# Map every integer token id to a dense vector; this gives the LSTM the
# (batch, time, features) input it requires.
x = tf.keras.layers.Embedding(vocab_size, embedding_dim)(inputs)
x = tf.keras.layers.LSTM(vocab_size)(x)

# One unnormalized score (logit) per vocabulary entry.
output = tf.keras.layers.Dense(vocab_size)(x)

model = tf.keras.Model(inputs, output)

# Labels are integer token ids and the model emits logits.
loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

model.compile(loss=loss, optimizer=optimizer)

model.summary()

In [None]:
# Save the model weights to a checkpoint file whose name includes the epoch.
checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath='./training_checkpoints/ckpt_{epoch}',
    save_weights_only=True,
)

# Stop when the training loss has not improved for 5 epochs and restore the
# best weights seen so far.
early_stopping_callback = tf.keras.callbacks.EarlyStopping(
    monitor='loss',
    patience=5,
    verbose=1,
    restore_best_weights=True,
)

callbacks = [checkpoint_callback, early_stopping_callback]

In [None]:
# Train for at most 50 epochs; the callbacks may end the run earlier.
fit_options = dict(epochs=50, callbacks=callbacks)
history = model.fit(train_ds, **fit_options)

In [None]:
from matplotlib import pyplot as plt

# Plot the training loss per epoch on an explicit figure/axes pair.
fig, ax = plt.subplots()
ax.plot(history.epoch, history.history['loss'], label='total loss')
ax.set(title='Training loss', xlabel='epoch', ylabel='loss')
# Without a legend the label passed to plot() is never displayed.
ax.legend()
plt.show()

In [None]:
def predict_next_note(
    notes: np.ndarray,
    keras_model: tf.keras.Model
) -> int:
    """Return the index with the highest score predicted for ``notes``.

    Args:
        notes: A single input sequence, without a batch dimension.
        keras_model: Model used for the prediction.

    Returns:
        The position of the largest value in the model output for the sequence.
    """
    # Add batch dimension
    inputs = tf.expand_dims(notes, 0)

    # Use the model supplied by the caller instead of the notebook-level `model`.
    scores = keras_model.predict(inputs)

    # The output holds one row per batch entry. int() cannot convert a whole row
    # of scores, so pick the best-scoring index of the only row instead.
    return int(np.argmax(scores[0]))