In [97]:
!pip install -q music21 tqdm

In [1]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
from midi2audio import FluidSynth
from music21 import corpus, converter, instrument, note, stream, chord, tempo,meter 
import IPython
from IPython.display import Image, Audio
import glob
import os
import pickle
from tqdm import tqdm
import tensorflow as tf

In [2]:
def get_distinct(elements):
    # Get all pitch names
    elements = sorted(set(elements))
    return (elements, len(elements))

def create_element_map(elements):
    # create dictionary to map notes and durations to integers
    mapping = dict((element, number) for number, element in enumerate(elements))
    reverse = dict((number, element) for number, element in enumerate(elements))
    return (mapping, reverse) 

In [3]:
# data params
intervals = range(1)
seq_len = 64

# model params
embed_size = 100
rnn_units = 256
use_attention = True

In [101]:

def get_music_list(data_folder):    
    file_list = glob.glob(f"{data_folder}/**/*.mid",recursive=True)
    return file_list

In [102]:
import json
def bin_tempo(tempo, bin_size=5):
    # Bin size determines the range of each bin
    return f"tempo_{int(bin_size * round(tempo / bin_size))}"
def extract_note_durations(scores,notes=[],durations=[]):
  for element in scores.flatten(): 
    if isinstance(element, chord.Chord):
      notes.append('.'.join(n.nameWithOctave for n in element.pitches))
      durations.append(element.duration.quarterLength)
    if isinstance(element, tempo.MetronomeMark):
      if(element.text is None):
        notes.append(str(bin_tempo(element.number)))
        durations.append(element.duration.quarterLength)
      else:
        notes.append(bin_tempo(element.number)+"::"+ element.text)
    if isinstance(element, meter.TimeSignature):
      notes.append(f"timesig_{element.ratioString}")
      durations.append(element.duration.quarterLength)
    if isinstance(element, note.Note):
      if element.isRest:
        notes.append(str(element.name))
        durations.append(element.duration.quarterLength)
      else:
        notes.append(str(element.nameWithOctave))
        durations.append(element.duration.quarterLength)
 
  return durations,notes 

In [None]:
files=get_music_list("./music")
notes = []
durations = []
with open('notes.corpus', 'wb') as f:
    progress=tqdm(files,"Processing")
    for file in progress:  
        progress.set_description("Processing " + file)
        scores = converter.parse(file).chordify()
        for interval in intervals:
                transposed = scores.transpose(interval)
                notes.extend(['START'] * seq_len)
                durations.extend([0]* seq_len)
                extract_note_durations(transposed,notes,durations)
    pickle.dump(zip(durations,notes),f)
            

            
    


In [8]:

import random
with open("notes_tempo.corpus", 'rb') as f:
  data = pickle.load(f) 
durations,notes= zip(*data)

corpus=0.175

seed_set=set()

num_notes_to_select = int(len(notes) * corpus)
start_index = random.randint(0, len(notes) - num_notes_to_select)
notes_seed=notes[start_index:start_index+num_notes_to_select]
durations_seed=durations[start_index:start_index+num_notes_to_select]
for i in range(len(notes_seed)):
  seed_note= notes_seed[i]
  seed_set.add(seed_note)

train_notes=[]
train_durations=[]
for i in range(0,len(durations)-seq_len,seq_len):
  seq_duration=durations[i:i+seq_len]
  seq_note=notes[i:i+seq_len]
  append=True
  for j in range(seq_len):
    seed_note= seq_note[j]
    if seed_note not in seed_set:
      append=False
      break 
  if(append):
    train_durations.append(seq_duration)
    train_notes.append(seq_note)

notes=[]
durations=[]
for train_note in train_notes:
  for note_name in train_note:
    notes.append(note_name)
for train_duration in train_durations:
  for duration_value in train_duration:
    durations.append(duration_value)


print(len(notes),len(seed_set))

with open('notes_train.corpus', 'wb') as f:
  pickle.dump(zip(durations,notes), f)





113856 7771


In [9]:
note_names, n_notes = get_distinct(notes)
duration_names, n_durations = get_distinct(durations)
distincts = [note_names, n_notes, duration_names, n_durations]
note_map, note_reverse = create_element_map(note_names)
duration_map, duration_reverse = create_element_map(duration_names)
lookups = [note_map, note_reverse, duration_map, duration_reverse]
with open("weights/lookup.pkl", 'wb') as f:
     pickle.dump(lookups,f)
tempos=[note_name for note_name in note_names if note_name.startswith("tempo")]
len(tempos),n_notes




(83, 7759)

### Don't execute the above code if reading from corpus

In [5]:
with open("notes_train.corpus", 'rb') as f:
  data = pickle.load(f) 
durations,notes=zip(*data)
tf.keras.config.enable_unsafe_deserialization()
note_names, n_notes = get_distinct(notes)
duration_names, n_durations = get_distinct(durations)
distincts = [note_names, n_notes, duration_names, n_durations]
with open('weights/lookup.pkl', 'rb') as f:
    [note_map, note_reverse, duration_map, duration_reverse] = pickle.load(f)
    lookups = [note_map, note_reverse, duration_map, duration_reverse]


In [7]:
from keras.utils import to_categorical

def prepare_sequences(notes, durations, lookups, distincts, seq_len =64,ratio=1):
    note_to_int, int_to_note, duration_to_int, int_to_duration = lookups
    note_names, n_notes, duration_names, n_durations = distincts

    notes_network_input = []
    notes_network_output = []
    durations_network_input = []
    durations_network_output = []

    # create input sequences and the corresponding outputs
    for i in range(int(len(notes)*ratio) - seq_len):
        notes_sequence_in = notes[i:i + seq_len]
        notes_sequence_out = notes[i + seq_len]
        notes_network_input.append([note_to_int[char] for char in notes_sequence_in])
        notes_network_output.append(note_to_int[notes_sequence_out])

        durations_sequence_in = durations[i:i + seq_len]
        durations_sequence_out = durations[i + seq_len]
        durations_network_input.append([duration_to_int[char] for char in durations_sequence_in])
        durations_network_output.append(duration_to_int[durations_sequence_out])

    n_patterns = len(notes_network_input)

    # reshape the input into a format compatible with LSTM layers
    notes_network_input = np.reshape(notes_network_input, (n_patterns, seq_len))
    durations_network_input = np.reshape(durations_network_input, (n_patterns, seq_len))
    network_input = [notes_network_input, durations_network_input]

    notes_network_output = to_categorical(notes_network_output, num_classes=n_notes)
    durations_network_output = to_categorical(durations_network_output, num_classes=n_durations)
    network_output = [notes_network_output, durations_network_output]
    return (network_input, network_output)

In [8]:
network_input, network_output = prepare_sequences(notes, durations, lookups, distincts, seq_len)
network_input,network_output, len(network_input[0])


([array([[7656, 7656, 7656, ..., 7656, 7656, 7656],
         [7656, 7656, 7656, ..., 7656, 7656, 2357],
         [7656, 7656, 7656, ..., 7656, 2357, 7680],
         ...,
         [7207, 1072, 3976, ..., 7656, 7737, 7752],
         [1072, 3976, 7656, ..., 7737, 7752,  932],
         [3976, 7656, 7656, ..., 7752,  932, 3777]]),
  array([[0, 0, 0, ..., 0, 0, 0],
         [0, 0, 0, ..., 0, 0, 9],
         [0, 0, 0, ..., 0, 9, 0],
         ...,
         [5, 5, 5, ..., 9, 0, 9],
         [5, 5, 9, ..., 0, 9, 9],
         [5, 9, 9, ..., 9, 9, 9]])],
 [array([[0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         ...,
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.]]),
  array([[0., 0., 0., ..., 0., 0., 0.],
         [1., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0., ..., 0., 0., 0.],
         ...,
         [0., 0., 0., ..., 0., 0., 0.],
         [0., 0., 0.,

In [9]:
print('pitch input')
print(network_input[0][0])
print('duration input')
print(network_input[1][0])
print('pitch target')
print(network_output[0][0])
print('duration target')
print(network_output[1][0])

pitch input
[7656 7656 7656 7656 7656 7656 7656 7656 7656 7656 7656 7656 7656 7656
 7656 7656 7656 7656 7656 7656 7656 7656 7656 7656 7656 7656 7656 7656
 7656 7656 7656 7656 7656 7656 7656 7656 7656 7656 7656 7656 7656 7656
 7656 7656 7656 7656 7656 7656 7656 7656 7656 7656 7656 7656 7656 7656
 7656 7656 7656 7656 7656 7656 7656 7656]
duration input
[0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
pitch target
[0. 0. 0. ... 0. 0. 0.]
duration target
[0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]


In [10]:
import tensorflow as tf

from keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras.utils import plot_model

import os


from keras.layers import LSTM, Input, Dropout, Dense, Activation, Embedding, Concatenate, Reshape
from keras.layers import Flatten, RepeatVector, Permute, TimeDistributed
from keras.layers import Multiply, Lambda, Softmax, Layer
import keras.backend as K 
from keras.models import Model
from tensorflow.keras.optimizers import RMSprop
from keras.utils import to_categorical

In [11]:
def create_network(n_notes, n_durations, embed_size=100, rnn_units=512, use_attention=False):
    notes_in = Input(shape=(None,))
    durations_in = Input(shape=(None,))

    x1 = Embedding(n_notes, embed_size)(notes_in)
    x2 = Embedding(n_durations, embed_size)(durations_in)
    x = Concatenate()([x1, x2])
    x = LSTM(rnn_units, return_sequences=True)(x)
    x = Dropout(0.2)(x)
    x = LSTM(int(rnn_units/2))(x)


    notes_out = Dense(n_notes, activation='softmax', name='pitch')(x)
    durations_out = Dense(n_durations, activation='softmax', name='duration')(x)

    model = Model([notes_in, durations_in], [notes_out, durations_out])

    
    att_model = None

    opti = RMSprop(learning_rate=0.001)
    model.compile(loss=['categorical_crossentropy', 'categorical_crossentropy'], optimizer=opti, metrics=['accuracy','accuracy']
 )

    return model, att_model

In [12]:
model, att_model = create_network(n_notes, n_durations, 100, 256, True)
if att_model:
    att_model.save('weights/att_model.keras')

model.summary()

2024-08-06 21:51:13.150209: I metal_plugin/src/device/metal_device.cc:1154] Metal device set to: Apple M2 Pro
2024-08-06 21:51:13.150245: I metal_plugin/src/device/metal_device.cc:296] systemMemory: 16.00 GB
2024-08-06 21:51:13.150257: I metal_plugin/src/device/metal_device.cc:313] maxCacheSize: 5.33 GB
2024-08-06 21:51:13.150492: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:305] Could not identify NUMA node of platform GPU ID 0, defaulting to 0. Your kernel may not have been built with NUMA support.
2024-08-06 21:51:13.150512: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:271] Created TensorFlow device (/job:localhost/replica:0/task:0/device:GPU:0 with 0 MB memory) -> physical PluggableDevice (device: 0, name: METAL, pci bus id: <undefined>)


In [13]:
import pickle

initial_epoch=None
if(initial_epoch):
  model=tf.keras.models.load_model("weights/weights-improvement-615-2.0041-bigger.keras")
  print("Loaded Model")
else:
  model.save("weights/weights.keras")


checkpoint1 = ModelCheckpoint(
    "weights/weights-improvement-{epoch:02d}-{loss:.4f}-bigger.keras",
    monitor='loss',
    verbose=0,
    save_best_only=True,
    mode='min'
)

checkpoint2 = ModelCheckpoint(
    "weights/weights.keras",
    monitor='loss',
    verbose=0,
    save_best_only=True,
    mode='min'
)

early_stopping = EarlyStopping(
    monitor='loss',
    restore_best_weights=True,
    patience=50
)

lr_scheduler = tf.keras.callbacks.ReduceLROnPlateau(
    monitor='loss',
    factor=0.5,
    patience=10,
    min_lr=1e-6,
    verbose=1
)


callbacks_list = [
    checkpoint1,
    checkpoint2,
    lr_scheduler,
    early_stopping]

# Ensure the weights file has the correct extension for saving weights

history = model.fit(
    network_input, network_output,
    epochs=750, batch_size=32,
    validation_split=0.2,
    callbacks=callbacks_list,
    initial_epoch=0 if initial_epoch is None else initial_epoch,
    shuffle=True
)
with open('training_history.pkl', 'wb') as file:
    pickle.dump(history.history, file)

Epoch 1/750


2024-08-06 21:51:24.370742: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:117] Plugin optimizer for device_type GPU is enabled.


[1m2845/2845[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m116s[0m 40ms/step - duration_accuracy: 0.5624 - loss: 8.2438 - pitch_accuracy: 0.0454 - val_duration_accuracy: 0.6082 - val_loss: 6.6204 - val_pitch_accuracy: 0.1446 - learning_rate: 0.0010
Epoch 2/750
[1m2845/2845[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m117s[0m 41ms/step - duration_accuracy: 0.6561 - loss: 7.0935 - pitch_accuracy: 0.1391 - val_duration_accuracy: 0.6184 - val_loss: 6.2666 - val_pitch_accuracy: 0.1964 - learning_rate: 0.0010
Epoch 3/750
[1m2845/2845[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m131s[0m 46ms/step - duration_accuracy: 0.6793 - loss: 6.7111 - pitch_accuracy: 0.1700 - val_duration_accuracy: 0.6226 - val_loss: 6.1091 - val_pitch_accuracy: 0.2182 - learning_rate: 0.0010
Epoch 4/750
[1m2845/2845[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m132s[0m 46ms/step - duration_accuracy: 0.6991 - loss: 6.4758 - pitch_accuracy: 0.1837 - val_duration_accuracy: 0.6280 - val_loss: 6.0770 - val_pit

KeyboardInterrupt: 

In [None]:
import tensorflow as tf

from keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras.utils import plot_model

import os


from keras.layers import LSTM, Input, Dropout, Dense, Activation, Embedding, Concatenate, Reshape
from keras.layers import Flatten, RepeatVector, Permute, TimeDistributed
from keras.layers import Multiply, Lambda, Softmax, Layer
import keras.backend as K 
from keras.models import Model
from tensorflow.keras.optimizers import RMSprop
from keras.utils import to_categorical

In [None]:
def sample_with_temp(preds, temperature):
    if temperature == 0:
        return np.argmax(preds)
    else:
        preds = np.log(preds) / temperature
        exp_preds = np.exp(preds)
        preds = exp_preds / np.sum(exp_preds)
        return np.random.choice(len(preds), p=preds)

In [None]:
# prediction params
notes_temp=0.8
duration_temp = 0.8
max_extra_notes = 100
max_seq_len = 64
seq_len = 64

notes = ['START']
durations = [0]

if seq_len is not None:
    notes = ['START'] * (seq_len - len(notes)) + notes
    durations = [0] * (seq_len - len(durations)) + durations 

sequence_length = len(notes)
print(notes)

In [None]:
prediction_output = []
notes_input_sequence = []
durations_input_sequence = []
for n, d in zip(notes,durations):
    note_int = note_map[n]
    duration_int = duration_map[d]
    notes_input_sequence.append(note_int)
    durations_input_sequence.append(duration_int)

for note_index in range(max_extra_notes):

    prediction_input = [
        np.array([notes_input_sequence])
        , np.array([durations_input_sequence])
       ]

    notes_prediction, durations_prediction = model.predict(prediction_input, verbose=0)
    
    
    i1 = sample_with_temp(notes_prediction[0], 0.8)
    i2 = sample_with_temp(durations_prediction[0], 1)    

    note_result = note_reverse[i1]
    duration_result = duration_reverse[i2]
    
    prediction_output.append([note_result, duration_result])

    notes_input_sequence.append(i1)
    durations_input_sequence.append(i2)
    
    if len(notes_input_sequence) > max_seq_len:
        notes_input_sequence = notes_input_sequence[1:]
        durations_input_sequence = durations_input_sequence[1:]
        
    if note_result == 'START':
        break

print('Generated sequence of {} notes :- {}'.format(len(prediction_output),prediction_output))

[<music21.chord.Chord C3 E-6>, <music21.chord.Chord C3 E-6>, <music21.chord.Chord C3 E-6>, <music21.chord.Chord C3 E-6>, <music21.chord.Chord C3 E-6>, <music21.chord.Chord C3 E-6>, <music21.chord.Chord C3 E-6>, <music21.chord.Chord C3 E-6>, <music21.chord.Chord C3 E-6>, <music21.chord.Chord C3 E-6>, <music21.chord.Chord C3 E-6>, <music21.chord.Chord C3 E-6>, <music21.chord.Chord C3 E-6>, <music21.chord.Chord C3 E-6>, <music21.chord.Chord C3 E-6>, <music21.chord.Chord C3 E-6>, <music21.chord.Chord C3 E-6>, <music21.chord.Chord C3 E-6>, <music21.chord.Chord C3 E-6>, <music21.chord.Chord C3 E-6>, <music21.chord.Chord C3 E-6>, <music21.chord.Chord C3 E-6>, <music21.chord.Chord C3 E-6>, <music21.chord.Chord C3 E-6>, <music21.chord.Chord C3 E-6>, <music21.chord.Chord C3 E-6>, <music21.chord.Chord C3 E-6>, <music21.chord.Chord C3 E-6>, <music21.chord.Chord C3 E-6>, <music21.chord.Chord C3 E-6>, <music21.chord.Chord C3 E-6>, <music21.chord.Chord C3 E-6>, <music21.chord.Chord C3 E-6>, <music21.

In [None]:
midi_stream = stream.Stream()
# create note and chord objects based on the values generated by the model
for pattern in prediction_output:
    note_pattern, duration_pattern = pattern
    # pattern is a chord
    if ('.' in note_pattern):
        notes_in_chord = note_pattern.split('.')
        chord_notes = []
        
        for current_note in notes_in_chord:
            chord_note = note.Note(current_note)
            chord_note.duration = duration.Duration(duration_pattern)
            chord_note.storedInstrument = instrument.Violoncello()
            chord_notes.append(chord_note)
        new_note = chord.Chord(chord_notes)
        midi_stream.append(new_note)
    elif note_pattern == 'rest':
    # pattern is a rest
        new_note = note.Rest()
        new_note.duration = duration.Duration(duration_pattern)
        new_note.storedInstrument = instrument.Violoncello()
        midi_stream.append(new_note)
    elif note_pattern.startswith("tempo_"):
        value=note_pattern.split("_")[1]
        tempo_notes=value.split("::")
        new_note = tempo.MetronomeMark(value)
        if(len(tempo_notes)>1):
            value=tempo_notes[0] + " "+ tempo_notes[1]
            new_note = tempo.MetronomeMark(value)
        new_note.duration = duration.Duration(duration_pattern)
        new_note.storedInstrument = instrument.Violoncello()
        midi_stream.append(new_note)
    elif note_pattern.startswith("timesig_"):
        value=note_pattern.split("_")[1]
        new_note = meter.TimeSignature(value)
        new_note.duration = duration.Duration(duration_pattern)
        new_note.storedInstrument = instrument.Violoncello()
        midi_stream.append(new_note)
    elif note_pattern != 'START':
    # pattern is a note
        new_note = note.Note(note_pattern)
        new_note.duration = duration.Duration(duration_pattern)
        new_note.storedInstrument = instrument.Violoncello()
        midi_stream.append(new_note)
    else:
        new_note=None
        pass
    if(new_note is not None):
        print(new_note,new_note.duration)




midi_stream = midi_stream.chordify()
midi_stream.write('midi', "generated_melody.mid")
fs = FluidSynth("FluidR3_GM.sf2")
wav_file = "generated_melody.wav"
fs.midi_to_audio("generated_melody.mid", wav_file)
IPython.display.Audio("generated_melody.wav") 
