In [None]:
import numpy as np
import _pickle as pickle
import glob
from sklearn.preprocessing import OneHotEncoder, LabelEncoder
from music21 import note, chord, instrument, converter
from keras.layers import LSTM, Dense, Dropout, Activation, Input
from keras.models import Model, Sequential
from keras.callbacks import ModelCheckpoint
import matplotlib.pyplot as plt
%matplotlib inline 

In [None]:
# Set to True for debugging.
# While this is True, the `if debug :` line at the end of each function cell
# below calls that function as soon as the cell is executed (this includes
# both training cells).
debug = True

In [None]:
# Main function to call all subfunctions in the notebook.
def train_network():
    '''Run the full pipeline: read the notes, build the arrays, build the model, train.

    The MIDI files are always re-read (``quick=False``); the cached pickle is
    not used here. Nothing is returned.
    '''
    # Note strings and the number of distinct notes.
    note_sequence, vocab_size = get_notes(quick=False)
    # The integer-encoded notes (first return value) are not needed here.
    _, network_input, network_output = prepare_data(note_sequence, vocab_size)
    # The model input shape is everything but the sample axis.
    network = get_model(network_input.shape[1:], vocab_size)
    train(network, network_input, network_output)
#--------------------------------------------------------------------
# Uncomment after running all cells.
#train_network()

In [None]:
def get_notes(quick=False, input_folder='./music/Mozart/',
              output_notes_file='./data/third_piece_MozartNotes.pkl'):
    '''Read the MIDI files in ``input_folder`` and convert them to note strings.

    A single note is stored as its pitch string; a chord is stored as the
    pitch strings of its members joined with '.'.

    Parameters
    ----------
    quick : bool
        If True, try to load the notes from ``output_notes_file`` instead of
        parsing the MIDI files. If that fails for any reason, fall back to
        parsing.
    input_folder : str
        Folder that is searched (non-recursively) for ``*.mid`` files.
    output_notes_file : str
        Pickle file the parsed notes are written to (and read from when
        ``quick`` is True). Its parent folder is created if missing.

    Returns
    -------
    (notes, n_vocab) : (list of str, int)
        All notes in file order, and the number of distinct note strings.
    '''
    import os  # local import: `os` is not imported in the notebook's import cell

    if quick:
        try:
            # NOTE: unpickling can execute arbitrary code; only load cache
            # files that this notebook wrote itself.
            with open(output_notes_file, 'rb') as f:
                notes = pickle.load(f)
            n_vocab = len(set(notes))
        except Exception as err:
            # `Exception` (not a bare except) so Ctrl-C still interrupts.
            print('It\'s not possible to do it quick,\n reading midi files....')
            print('(reason: {!r})'.format(err))
            return get_notes(quick=False, input_folder=input_folder,
                             output_notes_file=output_notes_file)
        print('Notes are loaded properly using the saved pickle')
        return notes, n_vocab

    notes = []
    # Sorted so the note order (and therefore the training data) does not
    # depend on the file-system's listing order.
    midi_files = sorted(glob.glob(os.path.join(input_folder, '*.mid')))

    for index, file in enumerate(midi_files):
        # Progress output: the index every 10 files, one dot per file.
        if index % 10 == 0:
            print(index, end='')
        print('.', end='')

        midi = converter.parse(file)
        parts = instrument.partitionByInstrument(midi)

        if parts:
            # Only the first instrument part is used.
            notes_to_parse = parts.parts[0].recurse()
        else:
            # No instrument partition: take the notes of the flattened score.
            # (`parts` is unusable in this branch, so read from `midi`.)
            # `flatten()` is used when the installed music21 provides it,
            # otherwise the older `.flat` property.
            flat_score = midi.flatten() if hasattr(midi, 'flatten') else midi.flat
            notes_to_parse = flat_score.notes

        for element in notes_to_parse:
            if isinstance(element, note.Note):
                notes.append(str(element.pitch))
            elif isinstance(element, chord.Chord):
                notes.append('.'.join(str(e) for e in element.pitches))
            # Anything else (rests, metadata, ...) is skipped.

    # Make sure the target folder exists so the parsed notes are not lost.
    output_dir = os.path.dirname(output_notes_file)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    with open(output_notes_file, 'wb') as f:
        pickle.dump(notes, f)

    n_vocab = len(set(notes))
    print('\nThere are {} notes and {} vocabs'.format(len(notes), n_vocab))
    print('data loaded properly and saved to disk as {}.'.format(output_notes_file))
    return notes, n_vocab
#--------------------------------------------------------
# Debug run. The results are stored as `notes` / `n_vocab` because those are
# the names the later debug cells read.
if debug:
    notes, n_vocab = get_notes(quick=False)
    # The original names are kept as aliases for anything that still uses them.
    new_notes, new_n_vocab = notes, n_vocab

In [None]:
def prepare_data(notes, n_vocab, sequence_length=100):
    '''Create the input sequences and the matching one-hot output notes.

    Every window of ``sequence_length`` consecutive notes becomes one input
    sample, and the note that follows the window is its target.

    Parameters
    ----------
    notes : sequence of str
        Note strings, in order.
    n_vocab : int
        Divisor used to scale the integer-encoded inputs.
    sequence_length : int
        Number of notes per input window (default 100).

    Returns
    -------
    (mapped_notes, NetworkInput, NetworkOutput)
        ``mapped_notes`` is the integer encoding of ``notes``;
        ``NetworkInput`` has shape (n_patterns, sequence_length, 1) and holds
        the encoded notes divided by ``n_vocab``;
        ``NetworkOutput`` is the one-hot encoding of each window's next note.

    Raises
    ------
    ValueError
        If ``notes`` does not contain more than ``sequence_length`` items, so
        that not even one (window, next note) pair exists.
    '''
    if len(notes) <= sequence_length:
        raise ValueError(
            'Need more than {} notes to build a training pattern, got {}.'.format(
                sequence_length, len(notes)))

    # create a mapping to the notes
    mapper = LabelEncoder()
    mapped_notes = mapper.fit_transform(notes)

    n_patterns = len(mapped_notes) - sequence_length
    NetworkInput = [mapped_notes[i: i + sequence_length] for i in range(n_patterns)]
    # The target of window i is the note at i + sequence_length.
    NetworkOutput = mapped_notes[sequence_length:]

    NetworkInput = np.reshape(NetworkInput, (n_patterns, sequence_length, 1))
    NetworkInput = NetworkInput / float(n_vocab)

    NetworkOutput = np.reshape(NetworkOutput, (-1, 1))
    try:
        # Newer scikit-learn names this argument `sparse_output`.
        hotencoder = OneHotEncoder(sparse_output=False)
    except TypeError:
        # Older scikit-learn only knows `sparse`.
        hotencoder = OneHotEncoder(sparse=False)
    # Fit on all encoded notes so every note gets an output column, even one
    # that never appears as a target.
    hotencoder.fit(mapped_notes.reshape(-1, 1))
    NetworkOutput = hotencoder.transform(NetworkOutput)

    # save the mapper and hotencoder to disk for prediction.
    #with open('./data/mapper.pkl','wb') as f:
    #    pickle.dump(mapper, f)
    #with open('./data/hotencoder.pkl','wb') as f:
    #    pickle.dump(hotencoder, f)

    print('Input shape = ',NetworkInput.shape, '\nOutput shape = ', NetworkOutput.shape)
    return mapped_notes, NetworkInput, NetworkOutput
#---------------------------------------
# Debug run: build the training arrays from `notes` / `n_vocab`.
if debug:
    mapped_notes, NetworkInput, NetworkOutput = prepare_data(notes, n_vocab)

In [None]:
def get_model(input_shape, n_vocab):
    '''Build and compile the two-layer LSTM network.

    input_shape : shape of one input sample (without the sample axis).
    n_vocab     : size of the softmax output layer.

    The model summary is printed and the compiled model is returned.
    '''
    inputs = Input(input_shape)

    # Layers in the order they are applied. A 512-unit LSTM (+ 0.3 dropout)
    # between the two LSTMs and a 256-unit tanh Dense (+ 0.3 dropout) before
    # the output layer are currently disabled.
    layer_stack = [
        LSTM(128, activation='tanh', return_sequences=True),
        Dropout(.2),
        LSTM(128, activation='tanh', return_sequences=False),
        Dropout(.2),
        Dense(n_vocab, activation='softmax'),
    ]

    outputs = inputs
    for layer in layer_stack:
        outputs = layer(outputs)

    model = Model(inputs, outputs)
    model.compile(loss='categorical_crossentropy', optimizer='Adam')

    model.summary()
    return model
#----------------------------------------------
# Debug run: build the model for the prepared input shape.
if debug:
    model = get_model(NetworkInput.shape[1:], n_vocab)

In [None]:
def train(model, NetworkInput, NetworkOutput):
    '''Fit ``model`` for 300 epochs (batch size 24) and return it.

    A checkpoint callback monitors the training loss and writes a file under
    ./checkpoints/2/ only when that loss improves; this function does not
    create that folder.

    NOTE: a later cell of this notebook defines another function that is also
    named ``train``; once that cell has run, it replaces this one.
    '''
    best_loss_saver = ModelCheckpoint(
        './checkpoints/2/ckpt-{loss:.4f}-{epoch:02d}.hdf5',
        monitor='loss',
        verbose=0,
        save_best_only=True,
        mode='min',
    )

    model.fit(NetworkInput, NetworkOutput, epochs=300, batch_size=24, callbacks=[best_loss_saver])
    return model
#-----------------------------------
# Debug run: train right away and keep the returned model.
if debug:
    trained_model = train(model, NetworkInput, NetworkOutput)

In [None]:
# Loss of every chunk that has been fitted so far, in training order.
loss_stack = []
def train(model, NetworkInput, NetworkOutput, epochs=100, batch_size=500):
    '''Train ``model`` chunk by chunk, saving it whenever a chunk reaches a new best loss.

    In every epoch the data is walked in consecutive chunks of ``batch_size``
    samples; each chunk is fitted with one ``model.fit`` call (one pass,
    mini-batches of 32). The loss of every chunk is appended to the
    module-level ``loss_stack``.

    NOTE: this definition has the same name as the ``train`` defined in the
    previous cell and replaces it once this cell has run.

    Parameters
    ----------
    model : object with Keras-style ``fit`` and ``save`` methods.
    NetworkInput, NetworkOutput : arrays with one sample per row.
    epochs : int
        Number of passes over the whole data (default 100).
    batch_size : int
        Number of samples per chunk handed to one ``fit`` call (default 500).
        This is not the mini-batch size, which is fixed at 32.

    Returns
    -------
    The trained ``model`` (same object that was passed in).

    Raises
    ------
    ValueError
        If ``NetworkInput`` contains no samples.
    '''
    n_samples = NetworkInput.shape[0]
    if n_samples == 0:
        raise ValueError('NetworkInput is empty; there is nothing to train on.')

    # Start at infinity so the first chunk always counts as an improvement.
    min_loss = float('inf')
    steps_per_epoch = int(np.ceil(n_samples / float(batch_size)))
    #steps_per_epoch = 5
    # Target path for model.save(); the folder is not created here.
    file_to_save = './checkpoints/2/ckpt-{loss:.4f}-{epoch:02d}'

    for epoch in range(epochs):
        avg_loss_per_epoch = 0
        print('starting epoch {:02d} with {} steps'.format(epoch, steps_per_epoch),end='')

        for step in range(steps_per_epoch):
            print('.',end='')

            batch_start = step * batch_size
            # Slice ends are exclusive, so clamp to n_samples (not n_samples - 1)
            # to keep the last sample in the final chunk.
            batch_end = min(batch_start + batch_size, n_samples)

            history = model.fit(NetworkInput[batch_start:batch_end], NetworkOutput[batch_start:batch_end],
                                epochs=1, batch_size=32, verbose=0)

            current_loss = history.history['loss'][0]
            loss_stack.append(current_loss)
            avg_loss_per_epoch += current_loss
            if current_loss < min_loss:
                min_loss = current_loss
                print('new best loss {:.4f}'.format(min_loss),end=' ')
                model.save(file_to_save.format(loss=min_loss, epoch=epoch))

        avg_loss_per_epoch /= steps_per_epoch
        print("\nAverage loss is {:.4f}".format(avg_loss_per_epoch))

    return model
#----------------------------------------------------------------------
# Debug run: start the chunk-wise training right away.
if debug:
    train(model, NetworkInput, NetworkOutput)

In [None]:
#with open('./checkpoints/1/loss_stack.pkl','wb') as f:
#    pickle.dump(loss_stack, f)

In [None]:
#plt.plot(range(len(loss_stack)),loss_stack)
#plt.show()