In [1]:
import tensorflow as tf
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.models import Sequential
import collections
import glob
import numpy as np
from numpy import expand_dims 
import pandas as pd
import pathlib
import pretty_midi
from sklearn.model_selection import train_test_split 
from collections import Counter

dataset

In [2]:
data_dir = pathlib.Path('data/maestro-v2.0.0')
if not data_dir.exists():
  tf.keras.utils.get_file(
      'maestro-v2.0.0-midi.zip',
      origin='https://storage.googleapis.com/magentadata/datasets/maestro/v2.0.0/maestro-v2.0.0-midi.zip',
      extract=True,
      cache_dir='.', cache_subdir='data',
  )

drum notation is differnet and needs it's own set of instructions

In [3]:
is_drum = False
data_dir = pathlib.Path('data/maestro-v2.0.0')
filenames = glob.glob(str(data_dir/'**/*.mid*'))
print('Number of files:', len(filenames))

Number of files: 1282


read in the notes

In [4]:
def notes_in(notes,instrument,track_end,is_drum):
# Sort the notes by start time
    sorted_notes = sorted(instrument.notes, key=lambda note: note.start)
    prev_start = sorted_notes[0].start

    for note in sorted_notes:
        start = note.start
        end = note.end
        pitch = note.pitch
        notes['pitch'].append(pitch)
        notes['start'].append(float(start)+track_end)
        if is_drum == False:
            notes['duration'].append(end - start)
        else:
            notes['duration'].append(1/4)
        if note == sorted_notes[-1]:
            track_end = note.end + track_end
            
    return notes,track_end

read in the files

In [9]:
# Extracting the notes from the sample MIDI file

def midi_to_notes(filenames: str,is_drum) -> pd.DataFrame:
    instrument = None
    track_end = 0
    notes = collections.defaultdict(list)
    #for midi_file in glob.glob(f'{midi_file}/*.mid'):
    for midi_file in filenames:
        print(midi_file)
        pm = pretty_midi.PrettyMIDI(midi_file)
        for instrument in pm.instruments:
            if is_drum == False and instrument.is_drum == False:
                notes,track_end = notes_in(notes,instrument,track_end,is_drum)
            if is_drum == True and instrument.is_drum == True:
                notes,track_end = notes_in(notes,instrument,track_end,is_drum)

    return pd.DataFrame({name: np.array(value) for name, value in notes.items()})


raw_notes = midi_to_notes(filenames,is_drum)
raw_notes.head(10)

data\maestro-v2.0.0\2004\MIDI-Unprocessed_SMF_02_R1_2004_01-05_ORIG_MID--AUDIO_02_R1_2004_05_Track05_wav.midi
data\maestro-v2.0.0\2004\MIDI-Unprocessed_SMF_02_R1_2004_01-05_ORIG_MID--AUDIO_02_R1_2004_06_Track06_wav.midi
data\maestro-v2.0.0\2004\MIDI-Unprocessed_SMF_02_R1_2004_01-05_ORIG_MID--AUDIO_02_R1_2004_08_Track08_wav.midi
data\maestro-v2.0.0\2004\MIDI-Unprocessed_SMF_02_R1_2004_01-05_ORIG_MID--AUDIO_02_R1_2004_10_Track10_wav.midi
data\maestro-v2.0.0\2004\MIDI-Unprocessed_SMF_05_R1_2004_01_ORIG_MID--AUDIO_05_R1_2004_02_Track02_wav.midi


Unnamed: 0,pitch,start,duration
0,71,1.092708,0.096875
1,55,1.279167,0.217708
2,71,1.288542,0.505208
3,59,1.463542,0.167708
4,62,1.633333,0.119792
5,72,1.786458,0.041667
6,67,1.803125,0.196875
7,74,1.983333,0.114583
8,57,1.983333,0.539583
9,72,2.0375,0.06875


In [10]:
raw_notes.tail(10)

Unnamed: 0,pitch,start,duration
37231,54,3145.107292,0.5875
37232,49,3145.110417,0.73125
37233,42,3145.113542,0.935417
37234,83,3146.541667,1.726042
37235,78,3146.546875,1.709375
37236,47,3146.547917,1.664583
37237,74,3146.547917,1.702083
37238,35,3146.551042,1.680208
37239,42,3146.551042,1.683333
37240,71,3146.552083,1.695833


output notes to midi

In [11]:
def notes_to_midi(notes: pd.DataFrame, out_file: str, instrument_program,drums,
                  velocity: int = 100) -> pretty_midi.PrettyMIDI:

    pm = pretty_midi.PrettyMIDI()
    instrument = pretty_midi.Instrument(
      program=instrument_program,is_drum=drums)

    for _, note in notes.iterrows():
        start = float(note['start'])
        if is_drum == False:
          end = float(start + note['duration'])
        else:
           end = float(start + 1/4)
        pitch = int(note['pitch'])
        note = pretty_midi.Note(velocity=velocity, pitch=pitch,
                                start=start, end=end)
        instrument.notes.append(note)

    pm.instruments.append(instrument)
    pm.write(out_file)
    return pm

# piano roll
# test data preprocessing by recreeating input data
piano = 0
example_file = f'test_drums_{is_drum}.midi'
example_pm = notes_to_midi(raw_notes,example_file,piano,is_drum)

normalize pitch + whatever else & save normalization constants into max scalars

In [12]:
max_scalars = []
note_parameters = ['pitch', 'start', 'duration']
if is_drum == True:
    note_parameters.remove('duration')
print(note_parameters)
note_scales = ['pitch','start']
for i in note_scales:
    param_max = max(raw_notes[i])
    max_scalars.append(param_max)
    raw_notes[i] = raw_notes[i]/param_max

['pitch', 'start', 'duration']


In [13]:
raw_notes.head(10)

Unnamed: 0,pitch,start,duration
0,0.67619,0.000347,0.009452
1,0.52381,0.000407,0.021242
2,0.67619,0.00041,0.049294
3,0.561905,0.000465,0.016363
4,0.590476,0.000519,0.011688
5,0.685714,0.000568,0.004065
6,0.638095,0.000573,0.019209
7,0.704762,0.00063,0.01118
8,0.542857,0.00063,0.052648
9,0.685714,0.000648,0.006708


In [14]:
raw_notes.tail(10)

Unnamed: 0,pitch,start,duration
37231,0.514286,0.999541,0.057323
37232,0.466667,0.999542,0.071349
37233,0.4,0.999543,0.091269
37234,0.790476,0.999997,0.168411
37235,0.742857,0.999998,0.166785
37236,0.447619,0.999999,0.162415
37237,0.704762,0.999999,0.166074
37238,0.333333,1.0,0.163939
37239,0.4,1.0,0.164244
37240,0.67619,1.0,0.165464


make lists of data

In [15]:
lists = {}
for parameter in note_parameters:
    lists[parameter] = raw_notes[parameter]

train test split

In [16]:
# train test split
test_size = 0.1
pitch_train,pitch_test,start_train,start_test = train_test_split(lists['pitch'],lists['start'],test_size=test_size,shuffle=False)
if is_drum == False:
    duration_train,duration_test = train_test_split(lists['duration'], test_size=test_size,shuffle=False)

make lists of lists to loop over later

In [22]:
list_train_data = [pitch_train, start_train]
list_test_data = [pitch_test, start_test]
list_train_data_str = ['pitch_train', 'start_train']
if is_drum == False:
    list_train_data.append(duration_train)
    list_test_data.append(duration_test)
    list_train_data_str.append('duration_train')

seqences function, note duration map for potential note duration rounding function later

In [17]:
def create_sequences(data,input,output):
    x = []
    y = []
    for i in range(len(data)-input-output):
        x.append(data[i:i+input])
        y.append(data[i:i+input:i+input+output])
    return np.array(x),np.array(y)

def mse_with_positive_pressure(y_true: tf.Tensor, y_pred: tf.Tensor):
    mse = (float(y_true) -float(y_pred)) ** 2
    positive_pressure = 10 * tf.maximum(-y_pred, 0.0)
    return tf.reduce_mean(mse + positive_pressure)

def create_rnn(x,y,loss,final_filter,layers,filters,epochs):
    model = Sequential()
    model.add(LSTM(filters[0], return_sequences=True, input_shape=(x.shape[1], x.shape[2])))
    for i in range(layers):
        if i != layers - 1:
            # Do not return sequences in the last LSTM layer
            model.add(LSTM(filters[i],activation='relu',return_sequences=True))
        else:
            model.add(LSTM(filters[i], return_sequences=False))
    # Add the final Dense layer
    model.add(Dense(final_filter,activation='relu',))
    model.compile(loss=loss, optimizer='adam',metrics='accuracy')

    model.summary()
    history = model.fit(x,y, 
                    epochs=epochs, 
                    validation_split=0.1)
    return history,model

# Define the mapping dictionary for durations
mapping = {0: 1/64,
           1: 1/32,
           2: 1/16,
           3: 1/8,
           4: 1/4,
           5: 1/2,
           6: 1,
           7: 2,
           8: 4,
           9: 8}

for each list inside list_train_data  
- make sequences of the train_data
- fix train data shape 
- train model 
- generate a new note using last sequence in test data of the same type as the train data 
- recreate last sequence, bin first note of last sequence, add generate note to end of last sequenece
- generate lots of notes
- recreate midi

In [70]:
# sequence dimensions
Dinput = 15
output = 1

# final amount of options for pitch,start,duration
final_filter_list = [128, 200]

# layers,layer filters and epochs
layers = 1
layer_filters = [25]
epochs = 250

# duration isn't important for drums
if is_drum == False:
    final_filter_list.append(10)

# for each list in lists of data
for count in range(len(list_train_data)):
    Dtype = list_train_data_str[count]
    
    # makes sequences from train lists to train model and generate notes
    xs,y = create_sequences(list_train_data[count],Dinput,output)
    print(xs,y)
    xs_test,_ = create_sequences(list_test_data[count],Dinput,output)
    print(xs_test)
    # reformat x for LSTM
    x = expand_dims(xs,axis=1)

    # Flatten xs_test into a list of tuples
    list_of_tuples = [tuple(lst) for lst in xs_test]

    # Count the frequency of each tuple
    tuple_counts = Counter(list_of_tuples)
    
    # Get the most common tuple
    most_common_tuple = tuple_counts.most_common(1)[0][0]

    # Convert the most common tuple back to a list
    most_common_list = [list(most_common_tuple)]

    # x_test is now most common sequence, model can continue from here
    x_test = expand_dims(most_common_list,axis=1)

    # train model 
    history, model = create_rnn(x,y,mse_with_positive_pressure,final_filter_list[count],layers,layer_filters,epochs)
    
    # genereate notes
    pred_note = model.predict(x_test)
    print(pred_note)
    # output midi file

[[0.85964912 0.77192982 0.63157895 ... 0.66666667 0.66666667 0.85964912]
 [0.77192982 0.63157895 0.77192982 ... 0.66666667 0.85964912 0.77192982]
 [0.63157895 0.77192982 0.77192982 ... 0.85964912 0.77192982 0.63157895]
 ...
 [0.70175439 0.73684211 0.73684211 ... 0.70175439 0.73684211 0.85964912]
 [0.73684211 0.73684211 0.61403509 ... 0.73684211 0.85964912 0.61403509]
 [0.73684211 0.61403509 0.73684211 ... 0.85964912 0.61403509 0.73684211]] [[0.85964912]
 [0.77192982]
 [0.63157895]
 ...
 [0.70175439]
 [0.73684211]
 [0.73684211]]
[[0.73684211 0.73684211 0.61403509 ... 1.         0.61403509 1.        ]
 [0.73684211 0.61403509 0.73684211 ... 0.61403509 1.         0.61403509]
 [0.61403509 0.73684211 0.73684211 ... 1.         0.61403509 0.73684211]
 ...
 [0.73684211 0.61403509 0.73684211 ... 0.61403509 0.85964912 0.61403509]
 [0.61403509 0.73684211 0.73684211 ... 0.85964912 0.61403509 0.73684211]
 [0.73684211 0.73684211 0.70175439 ... 0.61403509 0.73684211 0.85964912]]
Model: "sequential_10"

KeyboardInterrupt: 

In [None]:
for i in x:
    print(i)

[[49 44 36 44 44 44 44 44 44 44 38 38 38 38 49 44 36 44 44 44 44 44 44 44
  38 38 36 49 36 44 38 38 44 44 45 45 44 38 38 44 44 38 38 44 38 38 38 38
  49 44 36 44 38 38 44 44 38 38 44 38]]
[[44 36 44 44 44 44 44 44 44 38 38 38 38 49 44 36 44 44 44 44 44 44 44 38
  38 36 49 36 44 38 38 44 44 45 45 44 38 38 44 44 38 38 44 38 38 38 38 49
  44 36 44 38 38 44 44 38 38 44 38 38]]
[[36 44 44 44 44 44 44 44 38 38 38 38 49 44 36 44 44 44 44 44 44 44 38 38
  36 49 36 44 38 38 44 44 45 45 44 38 38 44 44 38 38 44 38 38 38 38 49 44
  36 44 38 38 44 44 38 38 44 38 38 45]]
[[44 44 44 44 44 44 44 38 38 38 38 49 44 36 44 44 44 44 44 44 44 38 38 36
  49 36 44 38 38 44 44 45 45 44 38 38 44 44 38 38 44 38 38 38 38 49 44 36
  44 38 38 44 44 38 38 44 38 38 45 45]]
[[44 44 44 44 44 44 38 38 38 38 49 44 36 44 44 44 44 44 44 44 38 38 36 49
  36 44 38 38 44 44 45 45 44 38 38 44 44 38 38 44 38 38 38 38 49 44 36 44
  38 38 44 44 38 38 44 38 38 45 45 44]]
[[44 44 44 44 44 38 38 38 38 49 44 36 44 44 44 44 44 44 44 3

In [None]:
predict_notes = None
predict_notes = collections.defaultdict(list)
for count in range(len(list_train_data)):
    Dtype = list_train_data_str[count]
    print(Dtype)

    Dinput = 150
    x,y = create_sequences(list_train_data[count],Dinput,1)
    xs = expand_dims(x,axis=1)
    xs.shape
    x,x_test = train_test_split(xs,test_size=0.01)
    y,_ = train_test_split(y,test_size=0.01)

    note_range,song_length,power = 128,200,10
    final_filter = [note_range,song_length,power]
    layers = 3
    filters = [32,64,128]
    epochs = 50
    print('final filter count',final_filter[count])
    history,model = create_rnn(x,y,mse_with_positive_pressure,final_filter[count],layers,filters,Dtype,epochs)
    to_generate = 400
    next_char = None
    last_array = []

    for loops in range(0,to_generate):
        if loops == 0:
            input_seq = x_test[-1].reshape(1, x_test.shape[1], x_test.shape[2])
            print('pred sequence: ',input_seq)
            predictions = model.predict(input_seq)
            
            if Dtype == 'pitch_train':
                existing_chars = (input_seq*max_scalars[0])
                predictions_list = []

                for i in predictions[0]:
                    predictions_list.append(i)
                for i in range(len(predictions_list)):
                    if i in existing_chars:
                        pass
                    else:
                        predictions_list[i]=0

                predictions_list = predictions_list / np.sum(predictions_list)
                next_char = np.random.choice(len(predictions_list), p=predictions_list)
                predict_notes[Dtype].append(next_char)
            else:
                for i in predictions:
                    predict_notes[Dtype].append(np.argmax(i))
                    print('generated character:', np.argmax(i))
        else:
            if loops == 1:
                last_array = x_test[-1][0]
                last_array = last_array[1:]
            else:
                last_array = last_array[1:]
        
            if Dtype == 'pitch_train':
                last_array = np.append(last_array,predict_notes[Dtype][-1]/max_scalars[0])
            if Dtype == 'start_train':
                last_array = np.append(last_array,predict_notes[Dtype][-1])
            if Dtype == 'duration_train':
                last_array = np.append(last_array,mapping[predict_notes[Dtype][-1]])
            
            input_seq = last_array.reshape(1, x_test.shape[1], x_test.shape[2])
            predictions = model.predict(input_seq)
            
            if Dtype == 'pitch_train':
                existing_chars = (input_seq*max_scalars[0])
                predictions_list = []

                for i in predictions[0]:
                    predictions_list.append(i)
                for i in range(len(predictions_list)):
                    if i in existing_chars:
                        pass
                    else:
                        predictions_list[i]=0

                predictions_list = predictions_list / np.sum(predictions_list)
                next_char = np.random.choice(len(predictions_list), p=predictions_list)
                predict_notes[Dtype].append(next_char)
            else:
                for i in predictions:
                    predict_notes[Dtype].append(np.argmax(i))


pitch_train
final filter count 128
Model: "sequential_4"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 lstm_13 (LSTM)              (None, 1, 32)             23424     
                                                                 
 lstm_14 (LSTM)              (None, 1, 32)             8320      
                                                                 
 lstm_15 (LSTM)              (None, 1, 64)             24832     
                                                                 
 lstm_16 (LSTM)              (None, 128)               98816     
                                                                 
 dense_3 (Dense)             (None, 128)               16512     
                                                                 
Total params: 171,904
Trainable params: 171,904
Non-trainable params: 0
_________________________________________________________________
Epoch 1/50
Ep

In [None]:
df = pd.DataFrame.from_dict(predict_notes)
tempo_scalar = 25
df['start_train']=df['start_train']/(max(df['start_train'])+max(df['duration_train']))
df['start_train'] = df['start_train']*tempo_scalar
df['duration_train'] = df['duration_train'].replace(mapping)
df = df.rename(columns={'pitch_train': 'pitch', 'start_train': 'start','duration_train':'duration'})
df.head(20)

Unnamed: 0,pitch,start,duration
0,40,4.086538,4.0
1,89,22.475962,0.5
2,84,23.918269,1.0
3,77,17.908654,4.0
4,71,22.475962,4.0
5,56,22.475962,0.5
6,31,19.711538,0.125
7,65,16.706731,4.0
8,65,3.966346,4.0
9,58,3.365385,1.0


In [None]:
import pandas as pd

def shiftcols(df):
    # create a shifted column
    df['shifted'] = df['start'].shift(3)

    # compare the shifted column with the original column
    df['identical'] = df['start'] == df['shifted']

    # create a boolean mask that selects the rows where 'identical' is True
    mask = df['identical'] == True

    # increase the values in column 'A' for the selected rows
    df.loc[mask, 'start'] += 0.5
    return df
df['start'] = df['start'].round(1)
df = df.sort_values('start')
for i in range(25):
    df = shiftcols(df)
df.head(20)

Unnamed: 0,pitch,start,duration,shifted,identical
279,87,0.1,1.0,,False
254,78,0.2,1.0,,False
193,89,0.2,0.5,,False
271,75,0.2,0.125,0.1,False
154,89,0.7,0.0625,0.2,False
70,73,0.6,1.0,0.2,False
71,73,0.6,1.0,0.2,False
238,70,0.6,0.5,0.7,False
247,68,1.1,0.5,0.6,False
182,39,1.1,0.03125,0.6,False


In [None]:
raw_notes.head(10)

Unnamed: 0,pitch,start,duration
0,49,7.714278,0.428571
1,44,7.714278,0.428571
2,36,7.714278,0.428571
3,44,8.142849,0.428571
4,44,8.57142,0.428571
5,44,8.999991,0.428571
6,44,9.428562,0.428571
7,44,9.857133,0.428571
8,44,10.285704,0.428571
9,44,10.714275,0.053571
