In [1]:
import numpy as np
import pretty_midi
import os
import csv
import math
import re
np.random.seed(42)
import tensorflow as tf
import keras
tf.set_random_seed(42)
from keras.models import Sequential, load_model
from keras.layers import Dense, Activation, Dropout
from keras.layers import LSTM, Dropout
from keras.layers import TimeDistributed
from keras.layers.core import Dense, Activation, Dropout, RepeatVector
from keras.optimizers import RMSprop
from keras.callbacks import ModelCheckpoint
import matplotlib.pyplot as plt
import pickle
import sys
import heapq
#import seaborn as sns
#from pylab import rcParams

%matplotlib inline

def noteVal(y):
    val = y
    val = val.split('-', 1)[0]
    val = re.sub('Tied', '', val)
    return noteList.get(val.lower(), 0)

#noteList = {'whole':48,'half':24,'dotted half':16,'triplet':16,'quarter':12,'dotted quarter':8,'eighth':6,'dotted eighth':4, 
#            'sixteenth':3, 'dotted sixteenth':3, 'thirty second':2, 'triplet thirty second':1, 'sixty fourth':1}

noteList = {'whole':24,'half':12,'dotted half':8,'triplet':8,'quarter':6,'dotted quarter':4,'eighth':3,'dotted eighth':3,
            'sixteenth':2, 'dotted sixteenth':2, 'thirty second':1, 'triplet thirty second':1, 'sixty fourth':1}
    

# What should the format hold?
# Simplest format is note pitch held for a note value amount of time, not allowing multiple notes at once


Using TensorFlow backend.


# Pre-processing the dataset
The training set is picked up from the open source musicnet dataset that has been published in the numpy _npz_ format  
The dataset contains annotations of music compositions from over 330 classical music pieces in 3 different instruments.  
The over 1 million annotated labels indicate the precise time of each note in every recording, the instrument that plays  
each note, and the note's position in the metrical structure of the composition. The labels are acquired from musical  
scores aligned to recordings by the state of the art *dynamic time warping* algorithm. The labels are verified by trained  
musicians, with an estimated labeling error rate of 4%

The dataset is stored hierarchically as an interval tree, which when queried with a time point provides the annotations  
for each note being played by each intrument at that moment. The annotations do not account for the decay in note volume  
for the duration it is sustained for (for some instruments), so the note is taken to be at constant volume through its life

The dataset is pre-processed and stored back as an *npz* array. The dataset ndarray is stored as binary encoded vectors at  
quantized time instant (bars quantized into 24 notes). The vector corresponding to a given 24<sup>th</sup> note has 1's at  
all locations where a particular note is being. Thus the ndarray has 2 dimensions:
    1. 128 - the total number of midi-note pitches possible for an instrument
    2. 24 * #BARS - as each bar is quantized into 24 locations for notes


In [None]:
#(start,end,(instrument,note,measure,beat,note_value))

train_data = np.load(open('../mnt/Downloads/musicnet.npz','rb'))

fs = 44100

print 'Number of recordings: ' + str(len(train_data.files))

totalNotesCount = 0
BarRoll = {}

for idx in train_data:
    #X is the audio, Y is the labels
    X,Y = train_data[idx]

    DURATION = (Y.end())/fs
    print "DURATION = " + str(DURATION) + "s"

    Yseq = sorted(Y)

    NOTES = []
    for y in Yseq:
        if (y.data)[0] == 41:
            NOTES.append(y)
    
    print 'Number of notes is: ' + str(len(NOTES))
    totalNotesCount += len(NOTES)
    if not NOTES:
        print 'it is empty, hence going to the next song'
        continue
    
    Measures = (NOTES[-1].data)[2]
    print "Total number of measures is = " + str(Measures)
    PianoRoll = np.zeros( (128, int(24+math.ceil(24*Measures))), dtype='float32' )

    for note in NOTES:
        #RHS is length of the note value
        start = int(round(24*( note.data[2]+note.data[3] )))
        PianoRoll[ note.data[1], start:start+noteVal(note.data[4]) ] = 1
    
    BarRoll[idx] = PianoRoll
    print 'done: ' + idx

np.savez_compressed('../mnt/Downloads/DATASET.npz', BarRoll=BarRoll)
print totalNotesCount

# Compressing the data-set to fit in memory
Since the dataset is stored sparsely (roughly 1-2%), we further preprocess to fit it in memory. Since not every pitch is  
realized in a musical composition (some notes may even lie outside the auditory spectrum), the array is truncated to the  
upper and lower pitch range corresponding to where notes are played. This roughly cuts the size of the ndarray in half.

We then convert the dataset into a format that makes the training examples and labels more apparent to the neural network.  
The dataset is parsed into segments of length **MEMORY** (in this case, we take a full bar (24) as MEMORY) that are offset  
by the parameter **OFFSET**, the immediate next **FUTURE** notes are the _labels_ for the data and are what the neural network  
would try to predict (for simplicity, we take FUTURE as being 1 24<sup>th</sup> note)


In [4]:
MEMORY = 24
OFFSET = 1
FUTURE = 1

def itercount(val):
    return 1+(range(0, val - MEMORY, OFFSET))[-1]/OFFSET

D = np.load('../mnt/Downloads/DATASET.npz')
PianoRoll = (D['BarRoll']).item()

MINnote = 128
MAXnote = -1
TRsize = 0
NUMBEROFNOTES = set()

for label in PianoRoll:
    song = PianoRoll[label]
    locations = set((np.where(song))[0])
    NUMBEROFNOTES = NUMBEROFNOTES | locations
    MAXnote = np.maximum( MAXnote, max(locations))
    MINnote = np.minimum( MINnote, min(locations) )
    TRsize = TRsize + itercount(song.shape[1])

NOTEspace = int(MAXnote-MINnote+2)

print 'Training Data Size = '  + str(TRsize)
print 'MAXnote = ' + str(MAXnote)
print 'MINnote = ' + str(MINnote)
print 'Number of notes = ' + str(len(NUMBEROFNOTES))


smp = 0
files = 0
Input = np.zeros( (TRsize, MEMORY, NOTEspace), dtype='float32' )
Output = np.zeros( (TRsize, FUTURE*NOTEspace), dtype='float32')

for label in PianoRoll:
    song = PianoRoll[label]
    print 'Unzipping training label: ' + label + '(with augmentation)'
    files = files+1
    for idx in range(0, song.shape[1] - MEMORY, OFFSET):
        cnt = min(idx,song.shape[1]-MEMORY-FUTURE)
        for i in range(0,FUTURE):
            Output[smp, i*NOTEspace:(i+1)*NOTEspace] = song[MINnote-1:MAXnote+1,cnt+MEMORY+i]
        Input[smp, :, :] = (song[MINnote-1:MAXnote+1,cnt:cnt+MEMORY]).transpose()
        smp+=1

print 'Sucessfully unzipped ' +str(files)+ ' files'

Training Data Size = 736416
MAXnote = 101
MINnote = 51
Number of notes = 50
Unzipping training label: 1788(with augmentation)
Unzipping training label: 1789(with augmentation)
Unzipping training label: 2659(with augmentation)
Unzipping training label: 1835(with augmentation)
Unzipping training label: 2482(with augmentation)
Unzipping training label: 2483(with augmentation)
Unzipping training label: 2480(with augmentation)
Unzipping training label: 2481(with augmentation)
Unzipping training label: 2154(with augmentation)
Unzipping training label: 2155(with augmentation)
Unzipping training label: 2156(with augmentation)
Unzipping training label: 2157(with augmentation)
Unzipping training label: 2150(with augmentation)
Unzipping training label: 2151(with augmentation)
Unzipping training label: 2244(with augmentation)
Unzipping training label: 2158(with augmentation)
Unzipping training label: 2159(with augmentation)
Unzipping training label: 2242(with augmentation)
Unzipping training label

In [3]:
print Input.shape
print Output.shape

(736416, 24, 52)
(736416, 52)


## Callback for real-time accuracy plot for Keras model
Keras Callback to plot validation set and training set accuracy for the model

In [4]:
from IPython.display import clear_output

class PlotLosses(keras.callbacks.Callback):
    def on_train_begin(self, logs={}):
        self.i = 0
        self.x = []
        self.losses = []
        self.val_losses = []
        self.invHdist = []
        
        self.logs = []

    def on_epoch_end(self, epoch, logs={}):
        
        self.logs.append(logs)
        self.x.append(self.i)
        self.losses.append(logs.get('loss'))
        self.val_losses.append(logs.get('val_loss'))
        self.invHdist.append(logs.get('inv_MSE_hamming_distance'))
        self.i += 1
        
        clear_output(wait=True)
        self.fig, self.ax = plt.subplots(nrows=1, ncols=2,figsize=(20,10))
        
        self.ax[0].plot(self.x, self.losses, label="loss")
        self.ax[0].plot(self.x, self.val_losses, label="val_loss")
        self.ax[0].legend(loc="right")

        self.ax[1].plot(self.x, self.invHdist, label="inv_MSE_hamming_distance")
        self.ax[1].legend(loc="upper right")
        
        plt.show()

plot_losses = PlotLosses()


# Model Architecture
The model we use for predicting future notes is a single layer LSTM with 256 input neurons. To prevent overfitting, a  
dropout of 20% is added at the output. The output later is fully connected of size **FUTURE***NOTEspace (number of  
notes between the highest and lowest observed pitch in the dataset). The output layer is a simple sigmoid classifier  
which would generate probabilities independently for each note which we can round to generate the prediction for  
the next note.

The loss function used is categorical cross-entropy. This is allowable for multi-labelled data (as in this case) as the  
trainer is heavily penalized for incorrectly predicting all the activated locations in a vector. The optimizer used is the  
Adam Optimizer with Nesterov Momentum that various sources [1,2,3] have established performs well on LSTM  
network training. The accuracy criterion being measured is a custom variant of top-K categorical accuracy, which is  
a bit more forgiving because it only checks that the note with highest predicted probability matches with at least  
one of the top 4 notes in the true note spectrum

In [48]:
for f in os.listdir('checkpoints/'):
    os.remove(os.path.join('checkpoints/', f))

#from keras.layers.normalization import BatchNormalization
from keras.callbacks import EarlyStopping, ModelCheckpoint
import keras.backend as K
from keras.utils import plot_model

def inv_MSE_hamming_distance(y_true, y_pred):
    return K.mean(K.abs(y_true) - K.abs(y_true - K.round(y_pred)), axis=-1)

# instantiate model
model = Sequential()

# input layer
model.add(LSTM(256, input_shape=(MEMORY, NOTEspace), return_sequences=True)) #, kernel_initializer='Zeros'))
model.add(Dropout(0.5))

# hidden layer 1
model.add(LSTM(256))
model.add(Dropout(0.5))

# output layer
model.add(Dense(FUTURE*NOTEspace))
model.add(Activation('sigmoid'))

model.compile(loss='binary_crossentropy', optimizer='nadam', metrics=['categorical_accuracy', inv_MSE_hamming_distance])

# checkpoint
filepath="checkpoints/weights-improvement-{epoch:02d}-{loss:.2f}.hdf5"
checkpointer = ModelCheckpoint(filepath, monitor='loss', verbose=1, save_best_only=False, mode='auto')

plot_model(model, to_file='model.png', show_shapes=True)


# Fitting the model to the training data
The sample data is split into training and validation sets in an 80:20 ratio  
Batch size is chosen as 128 and the model is trained for 20 epochs  
Shuffling is enabled on the input dataset due to the way it has been generated  
Checkpointing is enabled while training due to the long training time per epoch (~30 minutes)

In [None]:
# fit the model
history = model.fit(Input, Output, validation_split=0.2, batch_size=128, epochs=20, shuffle=True, callbacks=[checkpointer, plot_losses]).history

Train on 589132 samples, validate on 147284 samples
Epoch 1/20
 33792/589132 [>.............................] - ETA: 23:44 - loss: 0.0881 - categorical_accuracy: 0.0229 - inv_MSE_hamming_distance: -0.0030

# Saving the model to disk
The model is saved in the *HDF5* format which is good for storing (and compressing) sparse data/ model weights  
The model architecture is stored as a *yaml* file which allows easy analysis of the design post training

In [49]:
import h5py

model_yaml = model.to_yaml()
with open("tmp/model.yaml", "w") as yaml_file:
    yaml_file.write(model_yaml)
# serialize weights to HDF5
model.save_weights("model.hdf5")
print("Saved model to disk")

Saved model to disk


# Loading model
If the same model trained from before is used, then this cell can be skipped  
If a different model is used, then it can be loaded, providing the correct yaml  
architecture and HDF5 model weights

In [50]:
model = None

from keras.models import model_from_yaml

yaml_file = open('tmp/model.yaml', 'r')
loaded_model_yaml = yaml_file.read()
yaml_file.close()
model = model_from_yaml(loaded_model_yaml)
# load weights into new model
model.load_weights("checkpoints-new/weights-improvement-08-0.02.hdf5") #model.hdf5")
print("Loaded model from disk")

Loaded model from disk


# Post training analysis of the model
A new segment of music can be input to the model (1 bar) in the template format in *bar.csv*  
The cell below picks up a single bar of music, and uses the trained model to generate an  
estimate for the next bar of music. The generation process is sequential, in that each 24<sup>th</sup>  
note is generated sequentially and appended to the input bar to generate the next 24<sup>th</sup> note

In [116]:
# Model has only been trained for true notes in the range MINnote to MAXnote (in a width = NOTEspace)
from fractions import Fraction

pianoBar=None
pianoBar = np.zeros((1,MEMORY,NOTEspace), dtype='float32')

def prepareBar(pathToMusicTxt):
    print 'BAR    DUR    PITCH'
    with open(pathToMusicTxt, 'r') as csvfile:
        fileIn = csv.reader(csvfile, delimiter=',')
        for line in fileIn:
            notePosition = int(24*float(Fraction(line[0])))
            noteLength   = noteVal(line[1])
            noteFreq     = int(line[2])
            print str(notePosition/24.0)+'\t'+str(noteLength/24.0)+'\t'+str(noteFreq)
            pianoBar[0, notePosition:notePosition+noteLength, noteFreq-MINnote] = 1


#prepareBar('bar.csv')
pianoBar[0,:,:]=Input[133333,:,:]
originalPianoBar=pianoBar

print np.where(pianoBar)[1:3]

for iteration in range(0,MEMORY,FUTURE):
    indices=[]
    future=np.reshape( model.predict(pianoBar, batch_size=1, verbose=0), (NOTEspace,FUTURE) )
    indices = np.where(np.round(future))
    np.sort(future, axis=0)
    #if np.ndarray.max(future) > 0.5:
        #print 'ooooooooooooooo'
    #else:
        #print '---------------'
    pianoBar[0,0:MEMORY-FUTURE,:] = pianoBar[0,FUTURE:MEMORY,:]
    pianoBar[0,MEMORY-FUTURE:MEMORY,:] = np.zeros((FUTURE,pianoBar.shape[2]))
    for i,j in zip(indices[1],indices[0]):
        # print i,j
        pianoBar[0,MEMORY-FUTURE+i,j] = 1
        
print np.where(pianoBar)


(array([11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23]), array([18, 18, 18, 18, 18, 18, 21, 21, 21, 21, 21, 21, 24]))
(array([0, 0, 0, 0, 0]), array([0, 1, 2, 3, 4]), array([24, 24, 24, 24, 24]))


In [136]:
from midiutil import MIDIFile
import pygame
import time

track    = 0
channel  = 0
time     = 0    # In beats
duration = 1    # In beats
tempo    = 90   # In BPM (quarter notes per minute)
volume   = 100  # 0-127, as per the MIDI standard

MyMIDI = MIDIFile(1)  # One track, defaults to format 1 (tempo track is created
                      # automatically)
MyMIDI.addTempo(track, time, 24*tempo/4)

L = np.where(originalPianoBar)[1]
V = np.where(originalPianoBar)[2]
for i in range(0,len(L)):
    MyMIDI.addNote(track, channel, MINnote+L[i], L[i], duration, volume)
    
L = np.where(pianoBar)[1]
V = np.where(pianoBar)[2]    
for i in range(0,len(L)):
    MyMIDI.addNote(track, channel, MINnote+L[i], 24 + L[i], duration, volume)
    
with open("Mgen.mid", "w") as output_file:
    MyMIDI.writeFile(output_file)

pygame.mixer.init(frequency=44100, size=-16, channels=2, buffer=4096)
pygame.mixer.music.load("Mgen.mid")
pygame.mixer.music.play(loops=0, start=0.0)

error: No available audio device