In [None]:
import os
import numpy as np
import pandas as pd
from collections import deque

!pip install music21
!pip install pygame

from music21 import converter, instrument, note, chord, midi, stream, meter

from keras.models import Model, Sequential
from keras.layers import *
from keras.models import load_model
import keras.backend as K
import keras.callbacks
from keras.utils import Sequence
from keras.utils import to_categorical
from keras.preprocessing.sequence import TimeseriesGenerator

In [None]:
# grab / parse data

In [None]:
directory = "midi/Piano"
# specify the number of track in each song (should be the same, in same order, for each song)
tracks = (0,)
instru = instrument.Piano()

notes = [[] for track in tracks]
durations = [[] for track in tracks]
for i, file in enumerate(os.listdir(directory)):
    midi_part = converter.parse(os.path.join(directory, file))
    # Parse the midi file by the notes/chords it contains
    for track, _ in enumerate(tracks):
        notes_to_parse = midi_part[tracks[track]].flat.notesAndRests
        notes_to_parse = notes_to_parse.makeMeasures()
        for mes in notes_to_parse.measures(0, None):
            if isinstance(mes, stream.Measure):
                mes = mes.chordify()
                notes_mes_tab = []
                durations_mes_tab = []
                for elem in mes:    
                    if isinstance(elem, note.Note):
                        notes_mes_tab.append([str(elem.pitch)])
                    elif isinstance(elem, chord.Chord):
                        notes_mes_tab.append([str(n.nameWithOctave) for n in elem.pitches])
                    elif isinstance(elem, note.Rest):
                        notes_mes_tab.append([elem.name])
                    durations_mes_tab.append(elem.quarterLength)
                notes[track].append(notes_mes_tab)
                durations[track].append(durations_mes_tab)
    print("Song %s Loaded" % file)
print("DONE LOADING SONGS") 
# Get all notes
np_notes = [[] for track in tracks]
for track, _ in enumerate(tracks):
    for elem in notes[track]:
        np_notes[track] += elem
np_notes = [np.array(np_notes[track]) for track in tracks]
np_notes = [np_notes[track].flatten() for track in tracks]
notes_vocab = [[] for track in tracks]
for track, _ in enumerate(tracks):
    for item in np_notes[track]:
        if len(item) > 1 and item != "rest":
            notes_vocab[track].append(",".join([note for note in item]))
        else:
            notes_vocab[track].append(item[0])
    notes_vocab[track] = sorted(set(notes_vocab[track]))
# Get all durations
np_durations = [[] for track in tracks]
for track, _ in enumerate(tracks):
    for elem in durations[track]:
        np_durations[track] += elem
durations_vocab = [[] for track in tracks]
for track, _ in enumerate(tracks):
    durations_vocab[track] = sorted(set(np.array(np_durations[track]).flatten()))

for track, _ in enumerate(tracks):
    print("%s Measures, notes_Vocab : %s ; durations_Vocab : %s" % (len(notes[track]), len(notes_vocab[track]), len(durations_vocab[track])))
print(notes_vocab, notes)
print(durations_vocab, durations)

In [None]:
maxLen = 0
for track, _ in enumerate(tracks):
    currMax = max(len(elem) for elem in notes[track])
    if currMax > maxLen:
        maxLen = currMax
for track, _ in enumerate(tracks):
    currMax = max(len(elem) for elem in durations[track])
    if currMax > maxLen:
        maxLen = currMax
        
print(maxLen)

In [None]:
cat_notes = [[] for track in tracks]
cat_durations = [[] for track in tracks]

for track, _ in enumerate(tracks):
    for mes in notes[track]:
        cat_tab = []
        for elem in mes:
            int_note = notes_vocab[track].index(",".join(elem))
            cat = np.zeros((len(notes_vocab[track])))
            cat[int_note] = 1
            cat_tab.append(cat)
        while len(cat_tab) < maxLen:
            cat_tab.append(np.zeros((len(notes_vocab[track]))))
        cat_notes[track].append(cat_tab)
    for mes in durations[track]:
        cat_tab = []
        for elem in mes:
            int_duration = durations_vocab[track].index(elem)
            cat = np.zeros((len(durations_vocab[track])))
            cat[int_duration] = 1
            cat_tab.append(cat)
        while len(cat_tab) < maxLen:
            cat_tab.append(np.zeros((len(durations_vocab[track]))))
        cat_durations[track].append(cat_tab)
# merge
x = [cat_notes, cat_durations]

In [None]:
# Build generator

In [None]:
batch_size = 32
#split = int(0.8 * len(x))

class dataGenerator(Sequence):
    def __init__(self, x, batch_size):
        self.batch_size = batch_size
        self.x = x
    def __len__(self):
        return len(self.x[0][0]) // self.batch_size - 1
    def __getitem__(self, idx):
        X_note = [[] for track in tracks]
        X_duration = [[] for track in tracks]
        Y_note = [[] for track in tracks]
        Y_duration = [[] for track in tracks]
        res = [[], []]
        for track, _ in enumerate(tracks):
            for i in range(self.batch_size):
                X_note[track].append(self.x[0][track][idx + i])
                X_duration[track].append(self.x[1][track][idx + i])
                Y_note[track].append(self.x[0][track][idx + i + 1])
                Y_duration[track].append(self.x[1][track][idx + i + 1])
            X_note[track] = np.array(X_note[track])
            X_duration[track] = np.array(X_duration[track])
            Y_note[track] = np.array(Y_note[track])
            Y_duration[track] = np.array(Y_duration[track])
            res[0].append(X_note[track])
            res[0].append(X_duration[track])
            res[1].append(Y_note[track])
            res[1].append(Y_duration[track])
        return res
data_gen = dataGenerator(x, batch_size)

In [None]:
data_gen[0][0][0].shape

In [None]:
# Build Model

In [None]:
K.clear_session()

inputs_list = []
tracks_list = []
outputs_list = []

for track, _ in enumerate(tracks):
    in_note = Input(shape=(maxLen, len(notes_vocab[track])))
    inputs_list.append(in_note)
    x1 = LSTM(64, return_sequences=True)(in_note)
    tracks_list.append(x1)
    in_duration = Input(shape=(maxLen, len(durations_vocab[track])))
    inputs_list.append(in_duration)
    x2 = LSTM(64, return_sequences=True)(in_duration)
    tracks_list.append(x2)

concat = Concatenate()(tracks_list)
shared = TimeDistributed(Dense(64, activation='relu'))(concat)

for track, _ in enumerate(tracks):
    
    out_note = Dense(len(notes_vocab[track]), activation='softmax')(shared)
    outputs_list.append(out_note)
    
    out_duration = Dense(len(durations_vocab[track]), activation='softmax')(shared)
    outputs_list.append(out_duration)

model = Model(inputs=inputs_list, 
              outputs=outputs_list)
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['acc'])

model.summary()

In [None]:
#save best model if model improved
model_name = "Piano_Mozart_Chopin_Batch_Beethoven_measured.h5"
best_checkpoint = keras.callbacks.ModelCheckpoint(model_name, monitor='val_loss', verbose=1, save_best_only=True, mode='min')

In [None]:
h = model.fit_generator(data_gen, epochs = 100, callbacks=[best_checkpoint])

In [None]:
#print History graph
historydf = pd.DataFrame(h.history, index=h.epoch)
historydf.plot(ylim=(0,1))

In [None]:
# Load / save

In [None]:
model_name = "Piano_Mozart_Chopin_Batch_Beethoven_measured.h5"
model.save(model_name)
#model = load_model(model_name)

In [None]:
# Test

In [None]:
# Build seed
idx = 42
x_test, _ = data_gen[idx]
x_test = [x_test[i][0] for i in range(len(x_test))]
x_test_stream = [stream.Stream() for track in tracks]

for track, _ in enumerate(tracks):
    for i in range(seq_len):
        if x_test[2*track][i] != np.zeros((len(x_test[2*track][i]))):
            str_note = notes_vocab[track][np.argmax(x_test[2*track][i])]
            duration = durations_vocab[track][np.argmax(x_test[2*track+1][i])]
            if len(str_note.split(",")) > 1:
                _chord = chord.Chord(str_note.split(","))
                _chord.quarterLength = duration
                x_test_stream[track].append(_chord)
            else:
                if str_note != "rest":
                    _note = note.Note(str_note)
                    _note.quarterLength = duration
                    x_test_stream[track].append(_note)
                else:
                    _rest = note.Rest()
                    _rest.quarterLength = duration
                    x_test_stream[track].append(_rest)
    x_test_stream[track].insert(0, instru)

In [None]:
# make seq_len predictions from seed
#preds = [[] for track in tracks]
x = x_test

for _ in range(seq_len*2):
    pred = model.predict([np.array([x[i]]) for i in range(len(x))])
    """_note = [pred[i] for i in range(0, len(pred), 2)]
    _duration = [pred[i] for i in range(1, len(pred), 2)]
    for track, _ in enumerate(tracks):
        cat_note = np.zeros((len(notes_vocab[track])))
        _note[track] = np.argmax(_note[track])
        cat_note[_note[track]] = 1
        cat_duration = np.zeros((len(durations_vocab[track])))
        _duration[track] = np.argmax(_duration[track])
        cat_duration[_duration[track]] = 1
        x[2*track] = x[2*track][1:]
        x[2*track] = list(x[2*track]) + [cat_note]
        x[2*track+1] = x[2*track+1][1:]
        x[2*track+1] = list(x[2*track+1]) + [cat_duration]
        preds[track].append((cat_note, cat_duration))"""

In [None]:
# Build predicted stream
y_test_stream = [stream.Stream() for track in tracks]

for track, _ in enumerate(tracks):
    for i in range(len(pred[track])):
        if x_test[2*track][i] != np.zeros((len(x_test[2*track][i]))):
            str_note = notes_vocab[track][np.argmax(pred[track][i][0])]
            duration = durations_vocab[track][np.argmax(pred[track][i][1])]
            if len(str_note.split(",")) > 1:
                _chord = chord.Chord(str_note.split(","))
                _chord.quarterLength = duration
                y_test_stream[track].append(_chord)
            else:
                if str_note != "rest":
                    _note = note.Note(str_note)
                    _note.quarterLength = duration
                    y_test_stream[track].append(_note)
                else:
                    _rest = note.Rest()
                    _rest.quarterLength = duration
                    y_test_stream[track].append(_rest)
    y_test_stream[track].insert(0, instru)

In [None]:
# play seed
x_full_score = stream.Score()
for track, _ in enumerate(tracks):
    p = stream.Part()
    p.append(x_test_stream[track])
    x_full_score.insert(0, p)
x_full_score.makeMeasures()#.show('musicxml')
x_full_score.chordify()
x_full_score.write("midi", "x_test.mid")
sp = midi.realtime.StreamPlayer(x_full_score)
sp.play()

In [None]:
# play generated music
y_full_score = stream.Score()
for track, _ in enumerate(tracks):
    p = stream.Part()
    p.append(y_test_stream[track])
    y_full_score.insert(0, p)
y_full_score.makeMeasures()#.show('musicxml')
y_full_score.chordify()
y_full_score.write("midi", "x_test.mid")
sp = midi.realtime.StreamPlayer(y_full_score)
sp.play()