<a href="https://colab.research.google.com/github/linkjavier/disco/blob/felipe/felipe/lstm_attention_295_rmsprop.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

From book: Generative AI with Python and TensorFlow 2 By Joseph Babcock and Raghav Bali

LSTM-Attention Model for Music Generation

In [1]:
import os
import pickle
import numpy as np
from music21 import note, chord, corpus, converter, stream, instrument
import glob
from tensorflow.keras.utils import to_categorical
from tqdm.notebook import tqdm

In [2]:
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras.layers import LSTM, Input, Dropout, Dense, Activation, Embedding, Concatenate, Reshape
from tensorflow.keras.layers import Flatten, RepeatVector, Permute, TimeDistributed
from tensorflow.keras.layers import Multiply, Lambda, Softmax
import tensorflow.keras.backend as K 
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import RMSprop

In [3]:
!unzip '/content/drive/MyDrive/classical_music_midi_295.zip'

Archive:  /content/drive/MyDrive/classical_music_midi_295.zip
   creating: classical_music_midi_295/
  inflating: classical_music_midi_295/alb_esp1.mid  
  inflating: classical_music_midi_295/alb_esp2.mid  
  inflating: classical_music_midi_295/alb_esp3.mid  
  inflating: classical_music_midi_295/alb_esp4.mid  
  inflating: classical_music_midi_295/alb_esp5.mid  
  inflating: classical_music_midi_295/alb_esp6.mid  
  inflating: classical_music_midi_295/alb_se1.mid  
  inflating: classical_music_midi_295/alb_se2.mid  
  inflating: classical_music_midi_295/alb_se3.mid  
  inflating: classical_music_midi_295/alb_se4.mid  
  inflating: classical_music_midi_295/alb_se5.mid  
  inflating: classical_music_midi_295/alb_se6.mid  
  inflating: classical_music_midi_295/alb_se7.mid  
  inflating: classical_music_midi_295/alb_se8.mid  
  inflating: classical_music_midi_295/bach_846.mid  
  inflating: classical_music_midi_295/bach_847.mid  
  inflating: classical_music_midi_295/bach_850.mid  
  infl

In [4]:
data_dir = 'classical_music_midi_295'

# list of files
midi_list = os.listdir(data_dir)

# Load and make list of stream objects
original_scores = []
for midi in tqdm(midi_list):
    score = converter.parse(os.path.join(data_dir,midi))
    original_scores.append(score)

  0%|          | 0/295 [00:00<?, ?it/s]

In [5]:
# Merge notes into chords
original_scores = [midi.chordify() for midi in tqdm(original_scores)]

  0%|          | 0/295 [00:00<?, ?it/s]

In [6]:
# Define empty lists of lists
original_chords = [[] for _ in original_scores]
original_durations = [[] for _ in original_scores]
original_keys = []

# Extract notes, chords, durations, and keys
for i, midi in tqdm(enumerate(original_scores)):
    original_keys.append(str(midi.analyze('key')))
    for element in midi:
        if isinstance(element, note.Note):
            original_chords[i].append(element.pitch)
            original_durations[i].append(element.duration.quarterLength)
        elif isinstance(element, chord.Chord):
            original_chords[i].append('.'.join(str(n) for n in element.pitches))
            original_durations[i].append(element.duration.quarterLength)

0it [00:00, ?it/s]

In [7]:
# Create list of chords and durations from songs in C major
major_chords = [c for (c, k) in tqdm(zip(original_chords, original_keys)) if (k == 'C major')]
major_durations = [c for (c, k) in tqdm(zip(original_durations, original_keys)) if (k == 'C major')]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

In [8]:
def get_distinct(elements):
    # Get all pitch names
    element_names = sorted(set(elements))
    n_elements = len(element_names)
    return (element_names, n_elements)

def create_lookups(element_names):
    # create dictionary to map notes and durations to integers
    element_to_int = dict((element, number) for number, element in enumerate(element_names))
    int_to_element = dict((number, element) for number, element in enumerate(element_names))

    return (element_to_int, int_to_element)

In [9]:
store_folder = 'parsed_data'
os.mkdir(store_folder)

In [10]:
# get the distinct sets of notes and durations
note_names, n_notes = get_distinct([n for chord in major_chords for n in chord])
duration_names, n_durations = get_distinct([d for dur in major_durations for d in dur])
distincts = [note_names, n_notes, duration_names, n_durations]

with open(os.path.join(store_folder, 'distincts'), 'wb') as f:
    pickle.dump(distincts, f)

In [11]:
# make the lookup dictionaries for notes and dictionaries and save
note_to_int, int_to_note = create_lookups(note_names)
duration_to_int, int_to_duration = create_lookups(duration_names)
lookups = [note_to_int, int_to_note, duration_to_int, int_to_duration]

with open(os.path.join(store_folder, 'lookups'), 'wb') as f:
    pickle.dump(lookups, f)

In [12]:
# Set sequence length
sequence_length = 64

# Define empty array for train data
train_chords = []
train_durations = []
target_chords = []
target_durations = []

# Construct train and target sequences for chords and durations
for s in range(len(major_chords)):
    chord_list = [note_to_int[c] for c in major_chords[s]]
    duration_list = [duration_to_int[d] for d in major_durations[s]]
    for i in range(len(chord_list) - sequence_length):
        train_chords.append(chord_list[i:i+sequence_length])
        train_durations.append(duration_list[i:i+sequence_length])
        target_chords.append(chord_list[i+1])
        target_durations.append(duration_list[i+1])

In [13]:
train_chords = np.array(train_chords)
train_durations = np.array(train_durations)
target_chords = np.array(target_chords)
target_durations = np.array(target_durations)

In [14]:
def create_network(n_notes, n_durations, embed_size = 100, rnn_units = 256):
    """ create the structure of the neural network """

    notes_in = Input(shape = (None,))
    durations_in = Input(shape = (None,))

    x1 = Embedding(n_notes, embed_size)(notes_in)
    x2 = Embedding(n_durations, embed_size)(durations_in) 

    x = Concatenate()([x1,x2])

    x = LSTM(rnn_units, return_sequences=True)(x)

    x = LSTM(rnn_units, return_sequences=True)(x)

    # attention
    e = Dense(1, activation='tanh')(x)
    e = Reshape([-1])(e)
    alpha = Activation('softmax')(e)

    alpha_repeated = Permute([2, 1])(RepeatVector(rnn_units)(alpha))

    c = Multiply()([x, alpha_repeated])
    c = Lambda(lambda xin: K.sum(xin, axis=1), output_shape=(rnn_units,))(c)
    
                                    
    notes_out = Dense(n_notes, activation = 'softmax', name = 'pitch')(c)
    durations_out = Dense(n_durations, activation = 'softmax', name = 'duration')(c)
   
    model = Model([notes_in, durations_in], [notes_out, durations_out])
    model.compile(loss=['sparse_categorical_crossentropy', 
                        'sparse_categorical_crossentropy'], optimizer=RMSprop(learning_rate = 0.001))

    return model

In [15]:
embed_size = 128
rnn_units = 128

In [16]:
model = create_network(n_notes, n_durations, embed_size, rnn_units)

In [17]:
model.summary()

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, None)]       0                                            
__________________________________________________________________________________________________
input_2 (InputLayer)            [(None, None)]       0                                            
__________________________________________________________________________________________________
embedding (Embedding)           (None, None, 128)    860800      input_1[0][0]                    
__________________________________________________________________________________________________
embedding_1 (Embedding)         (None, None, 128)    2688        input_2[0][0]                    
______________________________________________________________________________________________

In [18]:
os.mkdir('outputs')
os.mkdir('outputs/output')
os.mkdir('outputs/weights')

In [19]:
output_folder = 'outputs'
weights_folder = os.path.join(output_folder, 'weights')


checkpoint = ModelCheckpoint(
    os.path.join(weights_folder, "weights.h5"),
    monitor='loss',
    verbose=0,
    save_best_only=True,
    mode='min'
)

epoch_checkpoint = ModelCheckpoint(
    os.path.join(weights_folder, "weights-{epoch:02d}-{loss:.4f}.h5"),
    monitor='loss',
    verbose=0,
    save_best_only=True,
    mode='min'
)

early_stopping = EarlyStopping(
    monitor='loss'
    , restore_best_weights=True
    , patience = 5
)


callbacks_list = [
    checkpoint
    , epoch_checkpoint
    , early_stopping
 ]

model.save_weights(os.path.join(weights_folder, "weights.h5"))

In [20]:
history = model.fit([train_chords, train_durations], 
                    [target_chords, target_durations]
                    , epochs=100, batch_size=128
                    , callbacks=callbacks_list
                    , shuffle=True
                  )

Epoch 1/100




Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 46/100
Epoch 47/100
Epoch 48/100
Epoch 49/100
Epoch 50/100
Epoch 51/100
Epoch 52/100


In [21]:
def sample_with_temp(preds, temperature):

    if temperature == 0:
        return np.argmax(preds)
    else:
        preds = np.log(preds) / temperature
        exp_preds = np.exp(preds)
        preds = exp_preds / np.sum(exp_preds)
        return np.random.choice(len(preds), p=preds)

In [22]:
# chord and duration sequences
initial_chords = np.expand_dims(train_chords[0,:].copy(), 0)
initial_durations = np.expand_dims(train_durations[0,:].copy(), 0)

In [23]:
initial_chords

array([[2510, 4125, 2424, 6702, 6702, 1360, 2552, 2424, 6702, 6702, 4125,
        2510, 6392, 6392, 6297, 6297, 6162, 6593, 3220, 1308, 5099, 6702,
        2484, 4125, 5099, 5089, 3220, 4125, 4092, 2510, 4914, 4914, 6373,
        6373, 2323, 4025, 6642, 2273, 3993, 6593, 2273, 3993, 6593, 2323,
        4025, 6642, 6671, 2493, 2273, 3993, 6593, 2493, 2273, 3993, 6593,
        2493, 2273, 3993, 6593, 6639, 2335, 4036, 6654, 2493]])

In [24]:
initial_durations

array([[7, 2, 2, 2, 2, 2, 2, 2, 2, 2, 7, 2, 2, 2, 2, 2, 8, 7, 2, 2, 7, 2,
        2, 7, 2, 2, 7, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
        3, 0, 2, 2, 0, 3, 3, 2, 0, 3, 3, 2, 0, 3, 0, 2, 3, 3, 3, 2]])

In [25]:
# Function to predict chords and durations
def predict_chords(chord_sequence, duration_sequence,model,temperature=1.0):
    predicted_chords, predicted_durations = model.predict([chord_sequence, duration_sequence])
    return sample_with_temp(predicted_chords[0],temperature), sample_with_temp(predicted_durations[0],temperature)

In [26]:
# Define empty lists for generated chords and durations
new_chords, new_durations = [], []

# Generate chords and durations using 120 rounds of prediction
for j in range(120):
    new_chord, new_duration = predict_chords(initial_chords, 
                                             initial_durations,
                                             model,
                                             temperature=0.8)
    new_chords.append(new_chord)
    new_durations.append(new_duration)
    initial_chords[0][:-1] = initial_chords[0][1:]
    initial_chords[0][-1] = new_chord
    initial_durations[0][:-1] = initial_durations[0][1:]
    initial_durations[0][-1] = new_duration

In [27]:
# Create stream object
generated_stream = stream.Stream()
generated_stream.append(instrument.Piano())

# Add notes and durations to stream
for j in range(len(new_chords)):
    try:
        generated_stream.append(note.Note(int_to_note[new_chords[j]].replace('.', ' '), 
                                          quarterType = int_to_duration[new_durations[j]]))
    except:
        generated_stream.append(chord.Chord(int_to_note[new_chords[j]].replace('.', ' '), 
                                            quarterType = int_to_duration[new_durations[j]]))

# Export as MIDI file
generated_stream.write('midi', fp='lstm_att_295_100_ep.mid')

'lstm_att_295_100_ep.mid'