<a href="https://colab.research.google.com/github/MarlonGrandy/LSTMMusicGeneration/blob/main/LSTMMusicGenerationPipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# from google.colab import drive
# drive.mount('/content/drive')

In [8]:
# import statements
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
from music21 import converter, instrument, note, chord, stream, duration
import glob
import os
from itertools import chain
import copy
import numpy as np
from tensorflow.keras.callbacks import LambdaCallback
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, LSTM, Dropout
from keras.layers import CuDNNLSTM, Bidirectional
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import layers
from sklearn.model_selection import train_test_split
from numpy import argmax
from fractions import Fraction
import gc
import random
import keras.utils

In [4]:
def data_extractor(directory = "./midi_files/default_dataset/*.mid"):
    """
    Summary:
    Parses every MIDI file matched by a glob pattern and flattens all songs into
    three parallel lists describing each note/chord in playing order.

    Parameters:
    directory: glob pattern (string) selecting the MIDI files to read

    Returns:
    list: [notes, offsets, durations] where, for each extracted element,
        notes[i]     is the pitch name (single note) or the chord's normal-order
                     pitch classes joined with "." (e.g. "0.4.7"),
        offsets[i]   is the element's offset minus the previously extracted
                     element's offset (starting from 0 for every file), rounded
                     to 3 decimal places,
        durations[i] is str(quarterLength) of the element.
    All three lists are empty when the pattern matches no files.
    """

    notes = []
    offsets = []
    durations = []

    # Sort the matches so the order of the concatenated dataset is reproducible;
    # glob itself returns files in arbitrary (filesystem) order.
    for file in sorted(glob.glob(directory)):
        mid = converter.parse(file)
        prev_offset = 0  # offsets are stored as differences, not absolute positions

        # Prefer the instrument-partitioned view; fall back to the flattened notes
        # when partitioning fails or yields nothing (best-effort, as before).
        try:
            parts = instrument.partitionByInstrument(mid)
        except Exception:
            parts = None

        if parts is not None:
            notes_to_parse = parts.recurse()
        else:
            # NOTE(review): newer music21 releases favour mid.flatten() over
            # mid.flat - confirm against the installed version.
            notes_to_parse = mid.flat.notes

        for element in notes_to_parse:
            if isinstance(element, note.Note):
                symbol = str(element.pitch)
            elif isinstance(element, chord.Chord):
                symbol = ".".join(str(n) for n in element.normalOrder)
            else:
                # Anything that is neither a note nor a chord is ignored.
                continue

            notes.append(symbol)
            durations.append(str(element.quarterLength))
            offsets.append(round(float(element.offset - prev_offset), 3))
            prev_offset = element.offset

    return [notes, offsets, durations]

./midi_files/*.mid


In [7]:
# Ask for the folder that holds the MIDI files. The "/*.mid" suffix is only appended
# after checking for a blank answer, so pressing Enter really does fall back to
# data_extractor's default dataset.
user_input = input("Please input directory of files without including file names ").strip()

if user_input:
    user_dir = user_input + "/*.mid"
    print(user_dir)
    data = data_extractor(user_dir)
else:
    user_dir = ""
    data = data_extractor()

# data_extractor returns [notes, offsets, durations]
note_data = data[0]
offset_data = data[1]
duration_data = data[2]

# Sorted vocabularies (one entry per distinct value) and their sizes.
unique_notes = sorted(set(note_data))
unique_note_number = len(unique_notes)
unique_offsets = sorted(set(offset_data))
unique_offset_number = len(unique_offsets)
unique_durations = sorted(set(duration_data))
unique_duration_number = len(unique_durations)

./midi_files/default_dataset/*.mid




KeyError: 6365042528

In [None]:
#6417368928

In [44]:
def one_hot_encode(vector, all_values):
    """Return one one-hot row (list of 0/1 ints) per item of `vector`.

    `all_values` is the vocabulary; the 1 in each row sits at the position the
    item occupies in `all_values`. An item missing from the vocabulary raises
    KeyError.
    """
    # vocabulary value -> column index
    position_of = {value: position for position, value in enumerate(all_values)}
    width = len(all_values)

    rows = []
    for item in vector:
        hot = position_of[item]
        # zeros before the hot column, the 1 itself, zeros after it
        rows.append([0] * hot + [1] + [0] * (width - 1 - hot))

    return rows

In [45]:
from numpy import argmax

# turns one hot back to midi notes
def one_hot_decode(vector, all_values):
    """Map a single one-hot (or probability) vector back to its vocabulary value.

    The position of the largest entry of `vector` selects the entry of
    `all_values`; the result is returned wrapped in a one-element list.
    """
    value_at = dict(enumerate(all_values))  # column index -> vocabulary value
    hottest = argmax(vector)
    return [value_at[hottest]]

In [46]:
segment_length = 64  # window size used throughout the notebook
# Each window of `segment_length` consecutive events is used to predict the single
# event that follows it, so generating longer output requires predicting in a loop.


# Works for note, offset or duration data alike.
def make_segments(data_array, unique_values, seq_length=segment_length):
    """Build sliding-window training pairs from a sequence of events.

    Parameters:
    data_array: sequence of raw events (each must appear in `unique_values`)
    unique_values: vocabulary used for one-hot encoding
    seq_length: number of consecutive events in every input window

    Returns:
    tuple: (input_seq, output_seq)
        input_seq  - array of shape (n_segments, seq_length, len(unique_values))
        output_seq - array holding, for each window, the one-hot row of the event
                     that immediately follows it
    where n_segments = max(len(data_array) - seq_length, 0).
    """
    processed_data = one_hot_encode(data_array, unique_values)
    n_segments = max(len(processed_data) - seq_length, 0)

    # Window i covers events [i, i + seq_length); its target is event i + seq_length.
    input_seq = [processed_data[i : i + seq_length] for i in range(n_segments)]
    output_seq = [processed_data[seq_length + i] for i in range(n_segments)]

    del processed_data
    gc.collect()

    # Reshape with the seq_length actually used to cut the windows (not the global
    # default), so callers may pass a different window size.
    input_seq = np.reshape(
        input_seq, (n_segments, seq_length, len(unique_values))
    )
    output_seq = np.array(output_seq)

    return input_seq, output_seq

In [47]:
# Build (inputs, targets) window arrays for each of the three event streams.
note_model_data = make_segments(note_data, unique_notes)
offset_model_data = make_segments(offset_data, unique_offsets)
duration_model_data = make_segments(duration_data, unique_durations)

In [48]:
# Split all three datasets in ONE call so they share the same shuffle: index k of the
# note, offset and duration splits then refers to the same original segment. Separate
# calls shuffle independently, which breaks code that indexes the three test sets
# with a common position. Element [0] of each *_model_data is the input windows,
# element [1] the targets; 20% of the segments are held out for testing.
(
    X_train_note, X_test_note, y_train_note, y_test_note,
    X_train_off, X_test_off, y_train_off, y_test_off,
    X_train_dur, X_test_dur, y_train_dur, y_test_dur,
) = train_test_split(
    note_model_data[0], note_model_data[1],
    offset_model_data[0], offset_model_data[1],
    duration_model_data[0], duration_model_data[1],
    test_size=0.2,
)

In [49]:
# The split arrays are all that is needed from here on; drop the originals and
# ask the garbage collector to run.
del note_model_data, offset_model_data, duration_model_data
gc.collect()

0

In [50]:
# inputs model parameter; 1 lstm and 2 dense layers
def make_model(input_shape, output_shape):
    """Build and compile the next-event classifier: one LSTM layer and two dense layers.

    Parameters:
    input_shape: (timesteps, features) shape of one input window
    output_shape: number of classes (size of the softmax output)

    Returns:
    A compiled Sequential model using categorical cross-entropy loss, the Adam
    optimizer and an accuracy metric.
    """
    model = Sequential()
    # Plain LSTM instead of CuDNNLSTM: CuDNNLSTM is not exposed by TF2-era Keras and
    # only runs on CUDA devices, whereas LSTM works on any backend.
    # NOTE(review): with default arguments Keras is documented to pick the cuDNN
    # kernel automatically when a compatible GPU is present - confirm for the
    # installed version.
    model.add(LSTM(512, input_shape=input_shape, return_sequences=False))
    model.add(Dropout(0.25))
    model.add(Dense(128))
    # softmax turns the output into a probability vector over the possible next events
    model.add(Dense(output_shape, activation="softmax"))

    model.compile(
        loss="categorical_crossentropy",
        optimizer=Adam(learning_rate=0.001),
        metrics=["accuracy"],
    )
    return model

In [52]:
# One independent model per event stream; each consumes windows of
# `segment_length` one-hot rows and predicts a distribution over its own vocabulary.
model_notes = make_model((segment_length, unique_note_number), unique_note_number)
model_offsets = make_model((segment_length, unique_offset_number), unique_offset_number)
model_durations = make_model(
    (segment_length, unique_duration_number), unique_duration_number
)

In [53]:
# Early stopping with a patience of 2 epochs.
# NOTE(review): no `monitor` is given, so Keras' default metric applies (presumably
# validation loss, hence the validation_split passed to fit) - confirm.
early_stopping = tf.keras.callbacks.EarlyStopping(patience=2)
my_callbacks = [early_stopping]

In [54]:
# Train the note model; 20% of the training data is held out for validation.
history_note = model_notes.fit(
    x=X_train_note,
    y=y_train_note,
    epochs=150,
    validation_split=0.2,
    callbacks=my_callbacks,
)

Epoch 1/150


2022-12-05 20:50:10.887781: W tensorflow/core/platform/profile_utils/cpu_utils.cc:128] Failed to get CPU frequency: 0 Hz
2022-12-05 20:50:11.373753: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:114] Plugin optimizer for device_type GPU is enabled.




2022-12-05 20:50:52.102532: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:114] Plugin optimizer for device_type GPU is enabled.


Epoch 2/150
Epoch 3/150
Epoch 4/150
Epoch 5/150
Epoch 6/150
Epoch 7/150


In [55]:
# Train the offset model; 20% of the training data is held out for validation.
history_offset = model_offsets.fit(
    x=X_train_off,
    y=y_train_off,
    epochs=150,
    validation_split=0.2,
    callbacks=my_callbacks,
)

Epoch 1/150


2022-12-05 20:55:15.556140: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:114] Plugin optimizer for device_type GPU is enabled.




2022-12-05 20:55:52.469607: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:114] Plugin optimizer for device_type GPU is enabled.


Epoch 2/150
Epoch 3/150
Epoch 4/150
Epoch 5/150
Epoch 6/150
Epoch 7/150
Epoch 8/150
Epoch 9/150
Epoch 10/150


In [56]:
# Train the duration model; 20% of the training data is held out for validation.
history_duration = model_durations.fit(
    x=X_train_dur,
    y=y_train_dur,
    epochs=150,
    validation_split=0.2,
    callbacks=my_callbacks,
)

Epoch 1/150


2022-12-05 21:02:07.931059: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:114] Plugin optimizer for device_type GPU is enabled.




2022-12-05 21:02:45.270210: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:114] Plugin optimizer for device_type GPU is enabled.


Epoch 2/150
Epoch 3/150
Epoch 4/150
Epoch 5/150
Epoch 6/150
Epoch 7/150
Epoch 8/150
Epoch 9/150
Epoch 10/150
Epoch 11/150
Epoch 12/150
Epoch 13/150
Epoch 14/150
Epoch 15/150


In [57]:
# output for model is probability vector which is mapped to note
# noramlly take the highest probabilty and that is the note that is next
# however, we skew the probabilities by a certain amount so different probabilities will be chosen to prevent the repeated notes
# alters probability vector and takes largest probability
def sample(preds, temperature=1.0):
    """Sample an index from a probability array, reshaped by `temperature`.

    Parameters:
    preds: array-like of probabilities (the model's softmax output)
    temperature: values below 1 sharpen the distribution (more conservative
        picks), values above 1 flatten it (more varied picks); a value <= 0
        means "no randomness" and returns the most likely index.

    Returns:
    The sampled index into `preds`.
    """
    preds = np.asarray(preds).astype("float64")

    # A zero/negative temperature would divide by zero and produce NaN
    # probabilities; treat it as greedy selection instead.
    if temperature <= 0:
        return np.argmax(preds)

    # log(0) is -inf, which exp() maps back to probability 0 - the right result,
    # so only the divide-by-zero warning is silenced.
    with np.errstate(divide="ignore"):
        preds = np.exp(np.log(preds) / temperature)
    preds = preds / np.sum(preds)  # renormalise so the values sum to 1

    probas = np.random.multinomial(1, preds, 1)  # a single one-hot draw
    return np.argmax(probas)

In [58]:
# use predicted note and append to end of input sequence, then cut off first note of the input sequence and keep shifting
def music_maker(seed_vec, model_type, number_unique, unique, diversity, num_notes=64):
    """Generate `num_notes` new events by repeatedly predicting the next one.

    A buffer holds the one-hot seed followed by room for the generated events.
    At every step the model sees a window as long as the seed, shifted one
    position further along the buffer, so each new prediction is appended and
    the oldest event drops out of view.

    Parameters:
    seed_vec: sequence of one-hot rows used as the starting window
    model_type: model exposing predict(batch, verbose=0)
    number_unique: width of each one-hot row
    unique: vocabulary used to decode a one-hot row back to its value
    diversity: temperature handed to sample()
    num_notes: how many events to generate

    Returns:
    list: the decoded generated events (the seed itself is not included)
    """
    seed_len = len(seed_vec)
    arr = np.zeros((seed_len + num_notes, number_unique))
    for row, encoded in enumerate(seed_vec):
        arr[row] = encoded

    music = []
    for step in range(num_notes):
        # current window: seed_len rows starting at `step`
        window = arr[step : seed_len + step]
        batch = np.reshape(window, (1, len(window), number_unique))
        pred = model_type.predict(batch, verbose=0)[0]

        # pick the next event from the (temperature-adjusted) prediction
        chosen = sample(pred, diversity)
        d_arr = np.zeros(number_unique)
        d_arr[chosen] = 1
        music.append(one_hot_decode(d_arr, all_values=unique)[0])

        # feed the choice back in so later windows include it
        arr[seed_len + step] = d_arr

    return music

In [1]:
# Pick one random position and take the seed window at that position from each test
# set. (These are corresponding segments only if the three test sets were shuffled
# identically when they were split.)
randnum = random.randrange(len(X_test_note))
test_note, test_offset, test_dur = (
    X_test_note[randnum],
    X_test_off[randnum],
    X_test_dur[randnum],
)

# Notes and offsets are sampled with a higher temperature (0.7) for more variety.
generated_music_note = music_maker(
    seed_vec=test_note,
    model_type=model_notes,
    number_unique=unique_note_number,
    unique=unique_notes,
    diversity=0.7,
)
generated_music_offset = music_maker(
    seed_vec=test_offset,
    model_type=model_offsets,
    number_unique=unique_offset_number,
    unique=unique_offsets,
    diversity=0.7,
)
# Durations use a lower temperature (0.2), i.e. picks stay close to the most likely value.
generated_music_duration = music_maker(
    seed_vec=test_dur,
    model_type=model_durations,
    number_unique=unique_duration_number,
    unique=unique_durations,
    diversity=0.2,
)

NameError: name 'random' is not defined

In [71]:
# turn one of encoded back to original note form
# Decode the one-hot seed windows back into their original note / offset / duration values.
seed_notes = [one_hot_decode(row, unique_notes)[0] for row in test_note]
seed_offsets = [one_hot_decode(row, unique_offsets)[0] for row in test_offset]
seed_durations = [one_hot_decode(row, unique_durations)[0] for row in test_dur]

In [72]:
def _parse_quarter_length(value):
    """Convert a stored duration such as "0.5" or "1/3" into a float or Fraction."""
    try:
        return float(value)
    except (TypeError, ValueError):
        # Not a plain decimal (e.g. "1/3"): parse it as an exact fraction instead.
        return Fraction(value)


def to_midi(notes, offsets, durations):
    """
    Summary:
    Takes midi metadata and converts it to a Music21 stream. The stream can then
    easily be converted into a midi file.

    Parameters:
    notes: sequence of strings; a value starting with a letter is a pitch name,
        anything else is a chord written as "."-separated integers
    offsets: sequence of numbers; offsets[i] is the distance between element i and
        the element before it
    durations: sequence of quarter-length values, as decimal or fraction strings

    Returns:
    Music21 Stream containing one note/chord per entry of `notes` (an empty
    stream when `notes` is empty)
    """
    s = stream.Stream()
    offset = 0  # running absolute position in the stream

    for i, ele in enumerate(notes):
        if ele[0].isalpha():
            element = note.Note(ele)
        else:
            element = chord.Chord(list(map(int, ele.split("."))))

        element.quarterLength = _parse_quarter_length(durations[i])

        # Advance by THIS element's delta before placing it, so every element is
        # spaced from its predecessor by its own offset value. (Advancing after the
        # insert would apply each delta to the following element instead.)
        offset += offsets[i]
        s.insert(offset, element)

    return s

In [73]:
# Render the generated sequence and the seed it was grown from to MIDI files.
generated_stream = to_midi(
    generated_music_note,
    generated_music_offset,
    generated_music_duration,
)
generated_stream.write("midi", "generated_classical.mid")

seed_stream = to_midi(seed_notes, seed_offsets, seed_durations)
seed_stream.write("midi", "seed_classical.mid")

# Possible improvements:
# - The biggest one would be splitting the process into separate files/stages so RAM is easier to manage.
# - Right now all songs are concatenated into one massive array of notes, so some segments straddle two
#   songs (e.g. 16 notes from one song and 48 from the next); segments could instead be drawn from
#   within individual songs.

'seed_classical.mid'

In [None]:
def to_sheetmusic(stream, fmt=None):
    """
    Summary:
    Displays a Stream object by calling its show() method.

    Parameters:
    stream: object to display (a Music21 stream)
    fmt: optional format name forwarded to show(), for example "text";
        when omitted, show() is called without arguments and uses its default
        renderer.
        NOTE(review): the default renderer appears to need an external notation
        program (MuseScore) - confirm for the target environment.

    Returns:
    None
    """
    if fmt is None:
        stream.show()
    else:
        stream.show(fmt)

In [9]:
# Smoke test for notation rendering: a stream containing a single D5.
d5 = note.Note("D5")
s1 = stream.Stream()
s1.append(d5)
s1.show()

SubConverterException: Cannot find a path to the 'mscore' file at /Applications/MuseScore 3.app/Contents/MacOS/mscore -- download MuseScore