In [1885]:
import glob
import os
import pickle

import numpy as np
from music21 import converter, instrument, stream, note, chord
from keras import backend as K
from keras.callbacks import ModelCheckpoint
from keras.layers import (
    Dense, Dropout, LSTM, Activation, Embedding,
    Lambda, Input, concatenate, Reshape, Permute, RepeatVector, Multiply,
)
from keras.layers import BatchNormalization as BatchNorm
from keras.models import Model
from keras.models import Sequential
from keras.utils import np_utils

In [1886]:
def convert_dataset_to_notes(flag, dataset_glob="classical_piano_dataset/*.mid"):
    """Parse MIDI files into a flat list of "<pattern> <length>" string tokens.

    Parameters
    ----------
    flag : int
        Element flag. For 0 or 2 the length part of every token is left blank
        (the token then ends with a single space). Rests are emitted only for
        2 or 3.
    dataset_glob : str, optional
        Glob pattern selecting the MIDI files to parse. Defaults to the path
        that was previously hard-coded.

    Returns
    -------
    list of str
        One token per note, chord or (optionally) rest, in file order.
        Notes use the pitch name, chords use their normal order joined by
        '.', rests use the element name.
    """
    keep_length = flag not in (0, 2)
    keep_rests = flag in (2, 3)

    notes = []
    # Sorted so the token order does not depend on filesystem enumeration order.
    for file in sorted(glob.glob(dataset_glob)):
        print("Parsing %s" % file)
        midi = converter.parse(file)
        try:
            # File has instrument parts: use the first one.
            s2 = instrument.partitionByInstrument(midi)
            notes_to_parse = s2.parts[0].recurse()
        except Exception:
            # File has notes in a flat structure. Only ordinary errors trigger
            # the fallback; KeyboardInterrupt / SystemExit still propagate.
            notes_to_parse = midi.flat.notes

        for element in notes_to_parse:
            if keep_length:
                length = str(element.quarterLength)
                if "/" in length:
                    # Fractional lengths print as e.g. "1/3"; store one decimal.
                    length = str('%.1f' % float(element.quarterLength))
            else:
                length = ""

            if isinstance(element, note.Note):
                notes.append(str(element.pitch) + " " + length)
            elif isinstance(element, chord.Chord):
                notes.append('.'.join(str(n) for n in element.normalOrder) + " " + length)
            elif isinstance(element, note.Rest) and keep_rests:
                notes.append(str(element.name) + " " + length)

    return notes

In [1887]:
def create_att_network(network_input, n_vocab):
    """Build and compile the LSTM network with an attention layer.

    Parameters
    ----------
    network_input : array-like with a ``.shape`` attribute
        Only ``network_input.shape[1]`` (the sequence length) is read.
    n_vocab : int
        Number of distinct tokens; used as the embedding input dimension and
        as the size of the softmax output.

    Returns
    -------
    keras Model
        Compiled with categorical cross-entropy and the Adam optimizer.

    NOTE(review): the Embedding layer looks up integer token indices, but the
    prepare_* helpers in this file divide the indices by ``n_vocab`` before
    they reach this model -- confirm that this scaling is intended for the
    attention variant.
    """
    units = 512
    seq_len = network_input.shape[1]

    # shape must be a tuple; "(seq_len)" without the trailing comma is an int.
    notes_in = Input(shape=(seq_len,))
    embedded = Embedding(n_vocab, 5, input_length=seq_len)(notes_in)

    x = LSTM(units, return_sequences=True)(embedded)
    x = LSTM(units, return_sequences=True)(x)

    # One tanh score per timestep, flattened to (batch, seq_len) and
    # normalised with a softmax to give the attention weights.
    e = Dense(1, activation='tanh')(x)
    e = Reshape([-1])(e)
    alpha = Activation('softmax')(e)

    # Repeat the weights across the hidden units, swap axes so they line up
    # with the LSTM output, and sum the weighted states over time.
    c = Permute([2, 1])(RepeatVector(units)(alpha))
    c = Multiply()([x, c])
    c = Lambda(lambda xin: K.sum(xin, axis=1), output_shape=(units,))(c)

    notes_out = Dense(n_vocab, activation='softmax')(c)
    model = Model(notes_in, notes_out)
    model.compile(loss='categorical_crossentropy', optimizer='adam')
    return model

In [1888]:
def create_seq_network(network_input, n_vocab, temperature=0.6):
    """Build and compile the stacked-LSTM sequential network.

    Parameters
    ----------
    network_input : array-like with a ``.shape`` attribute
        Only ``shape[1]`` and ``shape[2]`` are read, as the input shape of the
        first LSTM layer.
    n_vocab : int
        Size of the softmax output.
    temperature : float, optional
        Divisor applied to the final Dense outputs before the softmax.
        Defaults to 0.6, the value that was previously hard-coded. Must be
        positive.

    Returns
    -------
    keras Sequential
        Compiled with categorical cross-entropy and the Adam optimizer.

    Raises
    ------
    ValueError
        If ``temperature`` is not strictly positive.
    """
    if temperature <= 0:
        raise ValueError("temperature must be > 0, got %r" % (temperature,))

    model = Sequential()
    model.add(LSTM(
        512,
        input_shape=(network_input.shape[1], network_input.shape[2]),
        recurrent_dropout=0.3,
        return_sequences=True
    ))
    # No input_shape here: only the first layer of a Sequential model needs
    # one, and this layer receives the previous LSTM's output anyway.
    model.add(LSTM(
        512,
        recurrent_dropout=0.3,
        return_sequences=False
    ))
    model.add(BatchNorm())
    model.add(Dropout(0.3))
    model.add(Dense(256))
    model.add(Activation('relu'))
    model.add(BatchNorm())
    model.add(Dropout(0.3))
    model.add(Dense(n_vocab))
    # Scale the pre-softmax outputs by 1/temperature.
    model.add(Lambda(lambda x: x / temperature))
    model.add(Activation('softmax'))
    model.compile(optimizer='adam', loss='categorical_crossentropy')
    return model

In [1889]:
def train_model(epochs_num, notes, flag):
    """Train the selected network on ``notes`` and checkpoint its weights.

    Parameters
    ----------
    epochs_num : int
        Number of epochs passed to ``model.fit``.
    notes : list of str
        Token sequence to train on.
    flag : int
        Model flag: 0 builds the sequential network, 1 the attention network.

    Raises
    ------
    ValueError
        If ``flag`` is neither 0 nor 1. Raised before any data preparation so
        a wrong flag fails fast instead of leaving ``model`` unbound.

    Side effects
    ------------
    Writes "weights.hdf5" whenever the training loss improves.
    """
    if flag not in (0, 1):
        raise ValueError(
            "flag must be 0 (sequential) or 1 (attention), got %r" % (flag,)
        )

    n_vocab = len(set(notes))
    network_input, network_output = prepare_sequences(notes, n_vocab, flag)

    if flag == 0:
        model = create_seq_network(network_input, n_vocab)
    else:
        model = create_att_network(network_input, n_vocab)

    # Keep only the weights of the epoch with the lowest training loss.
    checkpoint = ModelCheckpoint(
        "weights.hdf5",
        monitor='loss',
        verbose=0,
        save_best_only=True,
        mode='min'
    )
    model.fit(network_input, network_output, epochs=epochs_num, callbacks=[checkpoint])

In [1890]:
def prepare_sequences(notes, n_vocab, flag, sequence_length=4):
    """Turn a token list into (input windows, one-hot next-token targets).

    Parameters
    ----------
    notes : list of str
        Token sequence.
    n_vocab : int
        Vocabulary size; used to scale the inputs and as the width of the
        one-hot targets.
    flag : int
        Model flag: 0 shapes the input as (patterns, sequence_length, 1),
        1 as (patterns, sequence_length).
    sequence_length : int, optional
        Number of tokens per input window. Defaults to 4, the value that was
        previously hard-coded.

    Returns
    -------
    (numpy.ndarray, numpy.ndarray)
        Inputs divided by ``n_vocab``, and float32 one-hot targets of shape
        (patterns, n_vocab).

    Raises
    ------
    ValueError
        If ``flag`` is neither 0 nor 1.

    NOTE(review): for flag 1 the scaled (fractional) values are fed to the
    Embedding layer built in create_att_network, which looks up integer
    indices -- confirm whether the scaling should be skipped for that model.
    """
    if flag not in (0, 1):
        raise ValueError(
            "flag must be 0 (sequential) or 1 (attention), got %r" % (flag,)
        )

    pitchnames = sorted(set(notes))
    note_to_int = {name: number for number, name in enumerate(pitchnames)}
    encoded = [note_to_int[token] for token in notes]

    n_patterns = max(len(encoded) - sequence_length, 0)
    windows = [encoded[i:i + sequence_length] for i in range(n_patterns)]
    targets = [encoded[i + sequence_length] for i in range(n_patterns)]

    if flag == 0:
        network_input = np.reshape(windows, (n_patterns, sequence_length, 1))
    else:
        network_input = np.reshape(windows, (n_patterns, sequence_length))
    network_input = network_input / float(n_vocab)

    # Build the one-hot matrix with an explicit width of n_vocab. Inferring
    # the width from the largest target index yields too few columns when the
    # highest-numbered token never occurs as a target.
    network_output = np.zeros((n_patterns, n_vocab), dtype="float32")
    network_output[np.arange(n_patterns), np.asarray(targets, dtype=int)] = 1.0
    return (network_input, network_output)

In [1891]:
def prepare_sequences_prediction(notes, pitchnames, n_vocab, flag, sequence_length=4):
    """Build the seed windows used for generation.

    Parameters
    ----------
    notes : list of str
        Token sequence.
    pitchnames : sequence of str
        Vocabulary; a token's position in this sequence is its integer index.
    n_vocab : int
        Divisor used to scale the integer indices.
    flag : int
        Model flag: 0 shapes the scaled input as
        (patterns, sequence_length, 1), 1 as (patterns, sequence_length).
    sequence_length : int, optional
        Number of tokens per window. Defaults to 4, the value that was
        previously hard-coded.

    Returns
    -------
    (list of list of int, numpy.ndarray)
        The raw integer windows (plain Python lists) and the same windows
        reshaped and divided by ``n_vocab``.

    Raises
    ------
    ValueError
        If ``flag`` is neither 0 nor 1.
    """
    if flag not in (0, 1):
        raise ValueError(
            "flag must be 0 (sequential) or 1 (attention), got %r" % (flag,)
        )

    note_to_int = {name: number for number, name in enumerate(pitchnames)}

    # Only the input windows are needed here; the next-token targets that
    # training uses are not built.
    network_input = [
        [note_to_int[token] for token in notes[i:i + sequence_length]]
        for i in range(0, len(notes) - sequence_length, 1)
    ]
    n_patterns = len(network_input)

    if flag == 0:
        normalized_input = np.reshape(network_input, (n_patterns, sequence_length, 1))
    else:
        normalized_input = np.reshape(network_input, (n_patterns, sequence_length))

    normalized_input = normalized_input / float(n_vocab)
    return (network_input, normalized_input)

In [1892]:
def predict_notes(model, network_input, pitchnames, n_vocab, flag, num_notes=200):
    """Sample a token sequence from ``model``, starting from a random seed window.

    Parameters
    ----------
    model : object with a ``predict`` method
        Called once per generated token with a batch of one window; the first
        row of its result is used as a probability distribution over tokens.
    network_input : sequence of sequences of int
        Candidate seed windows of integer token indices. Not modified.
    pitchnames : sequence of str
        Vocabulary; index ``i`` maps to ``pitchnames[i]``.
    n_vocab : int
        Divisor used to scale the window before prediction.
    flag : int
        Model flag: 0 feeds shape (1, len, 1), 1 feeds shape (1, len).
    num_notes : int, optional
        Number of tokens to generate. Defaults to 200, the value that was
        previously hard-coded.

    Returns
    -------
    list of str
        The generated tokens.

    Raises
    ------
    ValueError
        If ``flag`` is neither 0 nor 1, or ``network_input`` is empty.
    """
    if flag not in (0, 1):
        raise ValueError(
            "flag must be 0 (sequential) or 1 (attention), got %r" % (flag,)
        )
    if len(network_input) == 0:
        raise ValueError("network_input is empty; no seed window to start from")

    # randint's upper bound is exclusive, so len(network_input) lets every
    # window (including the last, or the only one) be chosen.
    start = np.random.randint(0, len(network_input))
    int_to_note = dict(enumerate(pitchnames))

    # Copy the seed so the caller's list is not modified while sliding the window.
    pattern = list(network_input[start])
    prediction_output = []

    for _ in range(num_notes):
        if flag == 0:
            prediction_input = np.reshape(pattern, (1, len(pattern), 1))
        else:
            prediction_input = np.reshape(pattern, (1, len(pattern)))
        prediction_input = prediction_input / float(n_vocab)

        prediction = model.predict(prediction_input)
        # Sample from the predicted distribution rather than taking the argmax.
        index = np.random.choice(len(prediction[0]), p=prediction[0])
        prediction_output.append(int_to_note[index])

        # Slide the window: drop the oldest index, append the new one.
        pattern = pattern[1:] + [index]
    return prediction_output

In [1893]:
def perform_rnn(notes, flag):
    """Rebuild the selected network, load its saved weights and generate tokens.

    Parameters
    ----------
    notes : list of str
        Token sequence; provides the vocabulary and the seed windows.
    flag : int
        Model flag: 0 for the sequential network, 1 for the attention network.
        This argument is what selects the model (not the notebook-level
        ``model_flag`` global).

    Returns
    -------
    list of str
        Tokens produced by ``predict_notes``.

    Raises
    ------
    ValueError
        If ``flag`` is neither 0 nor 1.

    Side effects
    ------------
    Reads 'weights.hdf5' from the working directory.
    """
    if flag not in (0, 1):
        raise ValueError(
            "flag must be 0 (sequential) or 1 (attention), got %r" % (flag,)
        )

    pitchnames = sorted(set(notes))
    n_vocab = len(pitchnames)

    network_input, normalized_input = prepare_sequences_prediction(
        notes, pitchnames, n_vocab, flag
    )
    if flag == 0:
        model = create_seq_network(normalized_input, n_vocab)
    else:
        model = create_att_network(normalized_input, n_vocab)
    model.load_weights('weights.hdf5')

    prediction_output = predict_notes(model, network_input, pitchnames, n_vocab, flag)
    return prediction_output

In [1894]:
def generate_music_file(prediction_output, flag, fp='output/test_output.mid'):
    """Convert generated "<pattern> <duration>" tokens to a MIDI file.

    Parameters
    ----------
    prediction_output : iterable of str
        Tokens of the form "<pattern> <duration>". A pattern that contains
        '.' or is all digits is read as a chord of integer pitches; a pattern
        containing 'rest' is read as a rest when rests are enabled; anything
        else is passed to ``note.Note``.
    flag : int
        Element flag. Durations from the tokens are applied for 1 or 3; rests
        are recognised for 2 or 3.
    fp : str, optional
        Output path. Defaults to 'output/test_output.mid', the path that was
        previously hard-coded.

    Side effects
    ------------
    Creates the parent directory of ``fp`` if needed and writes the MIDI file.
    """
    use_durations = flag in (1, 3)
    use_rests = flag in (2, 3)

    offset = 0
    output_notes = []
    for pattern_and_duration in prediction_output:
        pattern, _, duration = pattern_and_duration.partition(" ")

        if ('.' in pattern) or pattern.isdigit():
            chord_notes = []
            for current_note in pattern.split('.'):
                new_note = note.Note(int(current_note))
                new_note.storedInstrument = instrument.Piano()
                chord_notes.append(new_note)
            element = chord.Chord(chord_notes)
        elif 'rest' in pattern and use_rests:
            # NOTE(review): the token text is no longer passed positionally;
            # some music21 releases appear to read a positional string as a
            # duration type -- confirm against the installed version.
            element = note.Rest()
            element.storedInstrument = instrument.Piano()
        else:
            element = note.Note(pattern)
            element.storedInstrument = instrument.Piano()

        element.offset = offset
        if use_durations:
            step = float(duration)
            element.quarterLength = step
        else:
            # Tokens carry an empty duration for these flags, so float(duration)
            # would fail; advance by the element's own length instead.
            step = float(element.quarterLength)
        output_notes.append(element)
        offset += step

    out_dir = os.path.dirname(fp)
    if out_dir:
        # Writing fails if the target directory does not exist yet.
        os.makedirs(out_dir, exist_ok=True)

    midi_stream = stream.Stream(output_notes)
    midi_stream.write('midi', fp=fp)

In [1895]:
########### MAIN ###########

# IMPORTANT: flag values used by the cells below.
#
# element_flag selects what each token carries
# (passed to convert_dataset_to_notes and generate_music_file):
#   0: no duration, no rests
#   1: duration only
#   2: rests only
#   3: duration and rests
#
# model_flag selects the network (passed to train_model and perform_rnn):
#   0: sequential model
#   1: model with attention
#   Other model_flag values are not supported and will fail.

# Number of training epochs.
epochs = 1
model_flag = 0
element_flag = 3

# Parse the MIDI dataset into the token list shared by training and generation.
notes = convert_dataset_to_notes(element_flag)

Parsing classical/appass_2_format0.mid
Parsing classical/beethoven_hammerklavier_1_format0.mid
Parsing classical/beethoven_hammerklavier_2_format0.mid
Parsing classical/appass_1_format0.mid
Parsing classical/appass_3_format0.mid
Parsing classical/beethoven_hammerklavier_3_format0.mid


In [1896]:
# Train the selected network; train_model's checkpoint callback saves the
# weights to "weights.hdf5".
train_model(epochs, notes, model_flag)
# Rebuild the network, load 'weights.hdf5' and sample a token sequence.
prediction_output = perform_rnn(notes, model_flag)

Train on 10208 samples


In [1897]:
# Convert the sampled tokens to MIDI (written to output/test_output.mid).
generate_music_file(prediction_output, element_flag)

In [1898]:
# Experimental, unfinished code for multiple instruments (disabled: every line below is commented out)

#     midi = converter.parse('test_output.mid')
#     for el in midi.recurse():
#         if 'Instrument' in el.classes: # or 'Piano'
#             el.activeSite.replace(el, instrument.SteelDrum())
#     midi.parts[0].insert(0, instrument.SteelDrum())
#     midi.parts[1].insert(0, instrument.Guitar())
#     for p in midi.parts:
#         p.insert(0, instrument.SteelDrum())
    
#     midi.write('midi', fp='test_output2.mid')
    
# def generate_second_instrument():
#     notes = pickle.load(open('notes.p', 'rb'))
#     pitchnames = sorted(set(item for item in notes))
#     n_vocab = len(set(notes))
#     network_input, normalized_input = prepare_sequences_prediction(notes, pitchnames, n_vocab)
#     model = create_network(normalized_input, n_vocab)
#     model.load_weights('double_weights.hdf5')
#     prediction_output = generate_notes(model, network_input, pitchnames, n_vocab)
#     create_midi(prediction_output)
# def get_notes_second_instrument():
#     notes = []

#     for file in glob.glob("rock_test/*.mid"):
#         midi = converter.parse(file)

#         print("Parsing %s" % file)

#         elements_to_parse = None

#         try: # file has instrument parts
#             s2 = instrument.partitionByInstrument(midi) 
#             guitar = s2.parts[0].recurse() 
#             drums = s2.parts[1].recurse()
#         except: # file has notes in a flat structure
#             elements_to_parse = midi.flat.notes

#         for i in range(len(drums)):
#             length = str(drums[i].quarterLength)
#             if "/" in str(drums[i].quarterLength):
#                 length = str('%.1f' % float(drums[i].quarterLength))
                
#             if isinstance(drums[i], note.Note):
#                 notes.append(str(drums[i].pitch) + " " +  length + str(guitar[i].pitch))
#             elif isinstance(drums[i], chord.Chord):
#                 notes.append('.'.join(str(n) for n in drums[i].normalOrder) + " " + length 
#                     + '.'.join(str(n) for n in guitar[i].normalOrder))
#             elif isinstance(drums[i], note.Rest):
#                 notes.append(str(drums[i].name)  + " " + length + str(guitar[i].name))
#     pickle.dump(notes, open('notes.p', 'wb'))
#     return notes
# def create_midi(prediction_output):
#     properly unpack
