***
### __Darius Petermann__ <br>
Email: dariusarthur.petermann01@estudiant.upf.edu <br>
Project Git Page: https://github.com/darius522/lstm_rl_music_generator.git

How this notebook works:

This notebook is divided into two main sections:
* 1. [LSTM Training Stage](#part1)
* 2. [Deep Q-Learning Stage](#part2)

In [4]:
import random
import numpy as np

from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import Dense, LSTM, Dropout, Activation
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import ModelCheckpoint

In [10]:
from mingus.midi import fluidsynth
import mingus.core.notes as notes
from mingus.containers import NoteContainer
from mingus.containers import Note
import time

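def play_sequence(seq):
    """Play a sequence of pitch classes (0-11) through FluidSynth, one note every 0.25 s."""
    SF2 = './1.sf2'               # path to the SoundFont file
    fluidsynth.init(SF2, 'alsa')  # start the synth on the ALSA audio driver
    time.sleep(1)                 # give the audio driver time to start
    for pitch in seq:
        # Convert the pitch class to a note name and place it in octave 4
        note = Note(notes.int_to_note(pitch), 4)
        fluidsynth.play_Note(note)
        time.sleep(0.25)
        fluidsynth.stop_Note(note)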

<h1><center>LSTM Training Stage</center></h1>

***

### Data Handling

In [5]:
from music21 import converter, instrument, chord
import glob
from tqdm import tqdm

# music21 only gives note names, so this dict maps each name to a pitch class (0-11);
# enharmonic spellings (e.g. C# and D-) share the same class
note2cat = {'C':0,'C#':1,'D-':1,'D':2,'D#':3,'E-':3,'E':4,'F':5,'F#':6,
             'G-':6,'G':7,'G#':8,'A-':8,'A':9,'A#':10,'B-':10,'B':11}

all_notes = []
for file in tqdm(glob.glob("./midifiles/*.mid")):
    midi = converter.parse(file)
    parts = instrument.partitionByInstrument(midi)
    for part in parts:
        nn = part.flat.notes.stream()
        for n in nn:
            if not isinstance(n, chord.Chord):
                all_notes.append(note2cat[n.name])
        
print(len(all_notes))

100%|██████████| 104/104 [01:18<00:00,  1.32it/s]

108655





In [6]:
# Sequence length (number of consecutive notes in each input window)
SEQ_LEN = 50

X = []
y = []

for i in range(0, len(all_notes) - SEQ_LEN, 1):
    seq = all_notes[i:i + SEQ_LEN] # Take the input sequence
    out = all_notes[i + SEQ_LEN]   # Take the output note
    X.append(seq)
    y.append(out)

X = np.asarray(X)
X = np.reshape(X, (np.shape(X)[0],np.shape(X)[1],1))
y_onehot = np.asarray(to_categorical(y))
# Printing Shapes
print('Initial Data Shape: '+str(np.shape(all_notes)))
print('Training Data Shape: '+str(np.shape(X)))
print('Ground Truth Data Shape: '+str(np.shape(y_onehot)))

Initial Data Shape: (108655,)
Training Data Shape: (108605, 50, 1)
Ground Truth Data Shape: (108605, 12)


### Define an LSTM Network 

In [7]:
def createModel(network_input):

    model = Sequential()
    model.add(LSTM(
        256,
        input_shape=(network_input.shape[1], network_input.shape[2]),
        return_sequences=True
    ))
    model.add(Dropout(0.3))
    model.add(LSTM(512, return_sequences=True))
    model.add(Dropout(0.3))
    model.add(LSTM(256))
    model.add(Dense(256))
    model.add(Dropout(0.3))
    model.add(Dense(12))
    model.add(Activation('softmax'))
    model.compile(loss='categorical_crossentropy', optimizer='rmsprop')

    return model

In [8]:
path_to_hdf5 = './lstm_weight-119-0.0526.hdf5'

if path_to_hdf5 == '':
    filepath = "lstm_weight-{epoch:02d}-{loss:.4f}.hdf5"    
    checkpoint = ModelCheckpoint(
        filepath,
        monitor='loss', 
        verbose=0,        
        save_best_only=True,        
        mode='min'
    )    
    callbacks_list = [checkpoint]
    model = createModel(X)
    model.fit(X, y_onehot, epochs=60, batch_size=512, callbacks=callbacks_list)
else:
    model = createModel(X)
    model.load_weights(path_to_hdf5)

In [15]:
# 1. Pick a random seed sequence from the training set
test_sequence   = X[np.random.randint(0, np.shape(X)[0])]
pred_sequence   = []
TEST_SEQ_LENGTH = 16

for i in range(TEST_SEQ_LENGTH):
    # 2. Reshape the window to (batch, time steps, features), as the LSTM expects
    test_sequence = np.reshape(test_sequence, (1, len(test_sequence), 1))
    # 3. Predict the next note: the softmax output holds one probability per pitch class, so we can argmax
    prob = model.predict(test_sequence, verbose=0)
    pred_sequence.append(np.argmax(prob))
    # 4. Slide the window: drop the oldest note and append the new prediction
    test_sequence = np.append(test_sequence[:,1:SEQ_LEN,:],pred_sequence[-1])

play_sequence(pred_sequence)

<h1><center>Deep Q-Learning Stage</center></h1>
We feed our environment with sequences generated by our pretrained LSTM. The reward system is based on the music theory rules described below (very basic for now).

***

First let our total future discounted reward be:

\begin{align}
R_t = \sum_{t'=t}^{T} \gamma^{t'-t} r_{t'}
\end{align}

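Here \\(\gamma \in [0, 1]\\) is the discount factor and \\(r_{t'}\\) is the reward received at step \\(t'\\). As explained in the companion slides of this notebook, this reward is computed from the sequences generated by our LSTM model.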
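
As a small illustration of the discounted reward above, the sum can be accumulated backwards over a list of per-step rewards. This is only a sketch: the function name `discounted_return`, the reward values, and the discount factor used here are hypothetical and chosen for readability, not taken from the project.

```python
def discounted_return(rewards, gamma=0.9):
    """Compute R_t for t = 0: the sum of gamma**k * rewards[k] over all steps k."""
    total = 0.0
    # Walking backwards lets each step reuse the discounted sum of the steps after it
    for r in reversed(rewards):
        total = r + gamma * total
    return total

# Hypothetical per-note rewards: one out-of-key note (penalty -1) at the second step
example_rewards = [0.0, -1.0, 0.0, 0.0]
R0 = discounted_return(example_rewards, gamma=0.5)
print(R0)
```

With these values the only non-zero term is the penalty at step 1, discounted once, so the later a wrong note occurs, the less it weighs on the return seen from the start of the sequence.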

We will try to maximize this reward by learning a function \\(Q\\), from which we can pick the best action \\(a\\) for a given state \\(s\\). \\(Q\\) is the maximum expected return obtainable after taking action \\(a\\) in state \\(s\\) and then following a policy \\(\pi\\):

\begin{align}
Q(s, a) = \max_\pi \mathbb{E}[R_t \mid s_t = s, a_t = a, \pi]
\end{align}

These well-known components of Q-learning can be interpreted musically, for our case, as follows:

* \\(s\\): current state of the composition
* \\(a\\): latest note event generated by the LSTM
* \\(r\\): reward assigned according to music theory rules
* \\(a\\): rectified action
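
To make the roles of \\(s\\), \\(a\\) and \\(r\\) concrete, here is a minimal tabular Q-learning sketch. It is not the deep Q-network this notebook is heading towards: it assumes, purely for illustration, that the state is the previous pitch class and the action is the next pitch class, and the names `q_update` and `greedy_action` as well as the learning rate and discount factor are hypothetical.

```python
import numpy as np

N_PITCH_CLASSES = 12  # one state and one action per pitch class (assumption)

def q_update(q_table, state, action, reward, next_state, alpha=0.5, gamma=0.9):
    """Apply one temporal-difference update to Q(state, action), in place."""
    # Target: immediate reward plus the discounted value of the best next action
    td_target = reward + gamma * np.max(q_table[next_state])
    q_table[state, action] += alpha * (td_target - q_table[state, action])
    return q_table

def greedy_action(q_table, state):
    """Return the action with the highest Q-value in the given state."""
    return int(np.argmax(q_table[state]))

q_table = np.zeros((N_PITCH_CLASSES, N_PITCH_CLASSES))
# Hypothetical transition: from C (0) we play E (4) and receive a reward of 1
q_update(q_table, state=0, action=4, reward=1.0, next_state=4)
best = greedy_action(q_table, state=0)
print(q_table[0, 4], best)
```

A table is enough here because there are only 12 x 12 state-action pairs; a function approximator becomes necessary once the state is a whole composition rather than a single note.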

In [1]:
from copy import deepcopy
import gym
import matplotlib
import matplotlib.pyplot as plt

from insoco.Environment import Environment
from insoco.Plotting import plotQ

from insoco.FunctionApprox import Q_function, Q_function_count

# Pitch classes of the C major scale (C = 0)
KEY = [0,2,4,5,7,9,11]

To use gym-minigrid install from the local master code with : pip3 install -e . 


Using TensorFlow backend.


In [16]:
#@Todo: Define environment configuration for our music reward policy
conf = {"name":"Composition", "stats":{}}
env = Environment(conf)

Setting default max_steps per episode: 100000
Default number of agents: 1
No environment found


In [None]:
#@Todo: Implement the reward policy system
def evaluate_model(num_trials=100):
    """Used to evaluate the rewards the model receives.
    Generates num_trials compositions with the LSTM; computing their music
    theory rewards is still to be implemented.

    Args:
      num_trials: The number of compositions to use for evaluation.
    Returns:
      A list of num_trials generated note sequences.
    """
    TEST_SEQ_LENGTH = 16
    compositions = []

    for _ in range(num_trials):
        # 1. Pick a random seed sequence from the training set
        test_sequence = X[np.random.randint(0, np.shape(X)[0])]
        pred_sequence = []

        for i in range(TEST_SEQ_LENGTH):
            # 2. Reshape the window to (batch, time steps, features), as the LSTM expects
            test_sequence = np.reshape(test_sequence, (1, len(test_sequence), 1))
            # 3. Predict the next note: the softmax output holds one probability per pitch class, so we can argmax
            prob = model.predict(test_sequence, verbose=0)
            pred_sequence.append(np.argmax(prob))
            # 4. Slide the window: drop the oldest note and append the new prediction
            test_sequence = np.append(test_sequence[:,1:SEQ_LEN,:],pred_sequence[-1])

        compositions.append(pred_sequence)

    return compositions
        

In [None]:
#@ToDo: Update environment variables based on the reward/penalty
def collect_reward(obs, action):
    """All reward policies are called from this function.
    Their reward/penalty amounts are gathered here based on the current state of the LSTM sequence.

    Args:
      obs: the observed note.
      action: the chosen action.
    Returns:
      Float reward value.
    """
    reward = 0.0

    # Accumulate the music theory reward (the only policy so far)
    reward += reward_music_theory(action)

    return reward

Our reward policy is based solely on music theory (at least for now, as an initial step). We'll start simple by relying on what the pre-trained LSTM generates, and only rectify when the new action note \\(a\\) falls outside a pre-defined scale. The reward is defined as follows:

\begin{align}
r_t(a,s) = \frac{1}{c}r_{MT}(a,s)
\end{align}

Here \\(r_{MT}\\) is the music theory reward and \\(c\\) is a pre-defined constant controlling how much emphasis is put on the music theory policy (a larger \\(c\\) shrinks its contribution).
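
The scaled reward can be sketched in a few lines. The scale and the penalty of -1 for an out-of-key note follow the values used elsewhere in this notebook; the function names `music_theory_reward` and `scaled_reward` and the value `c=2.0` are hypothetical, picked only to show the effect of the constant.

```python
# Pitch classes of the C major scale, as defined earlier in the notebook
KEY = [0, 2, 4, 5, 7, 9, 11]

def music_theory_reward(action, key=KEY, penalty=-1.0):
    """r_MT: penalize an action note that falls outside the key, otherwise 0."""
    return 0.0 if action in key else penalty

def scaled_reward(action, c=2.0, key=KEY):
    """r_t = (1 / c) * r_MT, with c an assumed weighting constant."""
    return music_theory_reward(action, key) / c

in_key_reward = scaled_reward(4)   # E belongs to C major
out_key_reward = scaled_reward(1)  # C# does not
print(in_key_reward, out_key_reward)
```

With `c=2.0` the out-of-key penalty is halved, which is the kind of trade-off the constant is meant to control once other reward terms are added.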

In [None]:
#@ToDo: Observe composition state and reward/penalize accordingly
def reward_music_theory(action, penalty_amount=-1.0):
    """Return penalty_amount if the action note is outside KEY, else 0."""
    reward = 0.0

    # @ToDo: Define Global Var. for composition; until then, judge the action note itself
    action_note = action

    if action_note not in KEY:
        reward = penalty_amount

    return reward