In [None]:
# Mount Google Drive at /content/drive so the project files stored there can be read and written.
# The google.colab module is only available inside a Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# Aux.ai Project
**Milestone 3**

## Imports

In [None]:
from music21 import converter, instrument, note, chord, stream
from keras.utils.np_utils import to_categorical
from tensorflow import keras
from tensorflow.keras.callbacks import ModelCheckpoint
import numpy as np
import glob 
import os
import json

# Root folder of the project on the mounted Google Drive.
PROJECT_PATH = "/content/drive/MyDrive/Term 8/DS2.4/final-project"
# Composer whose MIDI files are used; also embedded in the model/JSON file names written later.
COMPOSER = "rachmaninov"
# Folder that holds this composer's .mid files.
MIDI_PATH = os.path.join(PROJECT_PATH, "midi", COMPOSER)
# JSON cache of the encoded notes, stored next to the MIDI files.
NOTES_PATH = os.path.join(MIDI_PATH, "notes.json")
# NOTE(review): not referenced by the visible cells — presumably a folder for saved weights; confirm or remove.
WEIGHTS_PATH = os.path.join(PROJECT_PATH, "weights")

## Encode notes
Because we use MIDI files as input data, they have to be encoded into notes that we can feed to the neural network. For this project I am only using the piano part and am encoding the notes as follows:  
- Single Note - pitch:duration
- Chord - pitch,pitch,pitch:duration

In [None]:
# Encoded notes of all songs, in order. Each entry is "pitch:duration" for a
# single note or "pc,pc,pc:duration" (normal-order pitch classes) for a chord.
notes = []
# Maximum number of MIDI files to encode.
song_limit = 100

if os.path.exists(NOTES_PATH):
  # Reuse the cached encoding instead of re-parsing every MIDI file
  print("Notes already exist")
  with open(NOTES_PATH, 'r') as f:
    data = json.load(f)
    notes = data['notes']
else:
  # Notes do not exist yet
  print("Notes do not exist yet")
  # Sort so that the same files are encoded in the same order on every run
  midi_files = sorted(glob.glob(os.path.join(MIDI_PATH, "*.mid")))[:song_limit]
  for file in midi_files:
    print(f"Looking at file: {file}")
    try:
      midi = converter.parse(file)
    except Exception:
      # Skip this file; without the `continue`, `midi` would be undefined
      # or still hold the previously parsed song.
      print(f"Could not parse file: {file}")
      continue

    try:
      partitioned = instrument.partitionByInstrument(midi)
      # partitionByInstrument may return None when it finds no instruments;
      # in that case fall through to the flat-notes branch below.
      parts = partitioned.parts if partitioned is not None else None
    except Exception:
      print(f"Skipping {file} because of parts issue")
      continue

    if parts:
      # Prefer the part named "Piano"; default to the first part otherwise
      piano_index = 0
      for part_index, part in enumerate(parts):
        if part.partName == "Piano":
          piano_index = part_index
          break
      notes_to_parse = parts[piano_index].recurse()
    else:
      notes_to_parse = midi.flat.notes

    # Encode each note / chord; every other element type is ignored
    for element in notes_to_parse:
      if isinstance(element, note.Note):
        notes.append(f"{element.pitch}:{element.duration.quarterLength}")
      elif isinstance(element, chord.Chord):
        pitch_classes = ','.join(str(n) for n in element.normalOrder)
        notes.append(f"{pitch_classes}:{element.duration.quarterLength}")

  # Write notes.json once, after all files were processed, so an interrupted
  # run cannot leave a partial cache that later runs would treat as complete.
  if notes:
    data = {
        "notes": notes
    }
    with open(NOTES_PATH, 'w') as f:
      json.dump(data, f)

print(f"Notes: {notes}")

Notes already exist
Notes: ['C#5:0.5', 'E-5:0.75', 'E5:0.25', 'E5:1/3', 'F#5:1/3', 'G5:0.25', 'G5:1/3', 'A5:1/3', 'B5:0.25', 'B5:0.5', '3,6:0.5', '7,11:0.5', '7,10,2:0.5', '11,2,6:0.5', '6,10,1:0.5', '9,1,4:1/3', '6,10,0:1/3', '7,11:0.25', '2,4:0.75', 'C#5:0.0', 'B4:0.0', 'C#5:0.0', '6,9,11,2:0.25', '4,7,11:1/3', '7,9,0,3:0.25', '6,9,11,2:0.5', 'C#4:0.5', '0,4,7:0.5', 'F#4:0.5', '9,0:0.5', '11,3,6:0.5', 'G#4:0.75', 'G4:0.25', '3,7,10:0.5', 'G2:2.75', 'B-4:0.5', 'F#4:0.5', 'G4:0.5', 'F4:1/3', 'D4:1/3', 'E-4:0.25', '2,4,7:0.75', 'B-2:0.75', '1,4,7:2.75', '9:2.75', 'D2:1/3', 'A2:0.25', '2,6:1/3', 'A3:1/3', 'E4:0.25', '2,6:0.75', 'A4:0.75', 'C#5:1/3', 'F#3:1/3', 'B2:2/3', 'A3:0.25', 'C#4:1/3', 'A3:1/3', 'F4:0.25', '3,6:0.75', 'A4:0.75', 'B4:0.25', '9,0,4:0.75', 'F#2:2/3', 'E3:0.25', 'G#3:1/3', 'A3:1/3', 'C4:0.25', '11,0,4:1/3', '6,9:1/3', '7:0.25', '6,11:0.75', 'A4:0.25', 'F#4:0.75', 'B1:2/3', 'F#3:0.25', 'C4:1/3', 'A3:1/3', 'F#3:0.25', 'G#4:0.5', '9,11:0.75', '9,10:0.25', 'G#3:0.0', '11,4

## Create Network Input

The model expects input in the form of sequences of encoded notes and the correct next note after that sequence. Therefore I define a sequence length and create a network input and output variables which essentially act like  X_train, y_train. I do not split the data into test and validation sets because I do not have any evaluation established for the model besides loss.  

An important point here is that I am not using the string representation (encoded version) of the notes as input/output of the model; instead, I convert them to numbers using a dictionary mapping. This is what `note_to_int` is.



In [None]:
# Number of consecutive notes the model sees before predicting the next one.
sequence_length = 40

# Vocabulary: every distinct encoded note, sorted so the ids are stable
pitch_names = sorted(set(notes))
note_to_int = {name: number for number, name in enumerate(pitch_names)}

n_vocab = len(note_to_int)

if len(notes) <= sequence_length:
  raise ValueError(
      f"Need more than {sequence_length} notes to build training sequences, got {len(notes)}")

# Map every note to its id once instead of once per sliding window
encoded_notes = [note_to_int[item] for item in notes]

network_input = []
network_output = []

# Sliding window: `sequence_length` ids as input, the id that follows as target
for i in range(len(encoded_notes) - sequence_length):
  network_input.append(encoded_notes[i:i + sequence_length])
  network_output.append(encoded_notes[i + sequence_length])

n_patterns = len(network_input)

# Reshape to (patterns, sequence_length, 1) and scale the ids into [0, 1)
network_input = np.reshape(network_input, (n_patterns, sequence_length, 1))
network_input = network_input / float(n_vocab)

print(f"Vocab Length: {n_vocab}")

# Fix the one-hot width to n_vocab. Without num_classes the width is
# max(target id) + 1, which is smaller than n_vocab whenever the highest id
# never occurs as a target, and then no longer matches the softmax layer.
network_output = to_categorical(network_output, num_classes=n_vocab)

Vocab Length: 2344


## Create network_model_object which is used in the API.  
This is specific to each model/composer

In [None]:
# Create network_object json for the API

# Only the first quarter of the patterns is exported. Slice the array before
# converting so the full array is never turned into nested Python lists.
n_exported_patterns = len(network_input) // 4

input_json_model_object = {
    "network_input": network_input[:n_exported_patterns].tolist(),
    # Shape of the full array, not of the exported quarter
    "network_input_shape": network_input.shape,
    "pitch_names": pitch_names,
    "sequence_length": int(sequence_length),
    "n_vocab": int(n_vocab)
}

# Make sure the target folder exists before writing into it
models_dir = os.path.join(PROJECT_PATH, "models")
os.makedirs(models_dir, exist_ok=True)

with open(os.path.join(models_dir, f"{COMPOSER}_network_object_seqlen_{sequence_length}.json"), 'w') as f:
  json.dump(input_json_model_object, f)

## Define Model  
This is my LSTM model which has been proven to work pretty well for the input data that I have. 

In [None]:
from tensorflow.keras import backend as K

class Attention(keras.layers.Layer):
  """Attention layer that re-weights its input along axis 1.

  A score is computed for every position along axis 1 as
  ``tanh(x . W + b)``, the scores are normalised with a softmax over that
  axis, and the input is multiplied by the resulting weights.

  Args:
    return_sequences: if True, return the weighted input with axis 1 kept;
      if False, return the sum of the weighted input over axis 1.
    **kwargs: standard ``keras.layers.Layer`` arguments (``name``, ``dtype``,
      ...). Accepting them is required for ``from_config`` to work, because
      ``get_config`` includes the base-layer entries.
  """

  def __init__(self, return_sequences=True, **kwargs):
    # Initialise the base layer first, then attach our own attributes
    super(Attention, self).__init__(**kwargs)
    self.return_sequences = return_sequences

  def build(self, input_shape):
    """Create the score weights.

    W has shape (input_shape[-1], 1); b has shape (input_shape[1], 1), so the
    layer is tied to the axis-1 length it was built with.
    """
    self.W = self.add_weight(name="att_weight", shape=(input_shape[-1], 1),
                             initializer="normal")
    self.b = self.add_weight(name="att_bias", shape=(input_shape[1], 1),
                             initializer="zeros")

    super(Attention, self).build(input_shape)

  def call(self, x):
    """Apply the attention weights to ``x``."""
    e = K.tanh(K.dot(x, self.W) + self.b)
    # Normalise the scores across axis 1
    a = K.softmax(e, axis=1)
    output = x * a

    if self.return_sequences:
      return output

    return K.sum(output, axis=1)

  def get_config(self):
    """Return the layer config including ``return_sequences``."""
    config = super().get_config().copy()
    config.update({
        'return_sequences': self.return_sequences,
    })
    return config

In [None]:
# LSTM -> attention -> LSTM, flattened into a dense classifier that outputs a
# probability for every note in the vocabulary.
model = keras.models.Sequential()

# Recurrent part: both LSTMs and the attention layer keep the time axis
model.add(keras.layers.LSTM(512, input_shape=network_input.shape[1:], return_sequences=True))
model.add(keras.layers.Dropout(0.2))
model.add(Attention(return_sequences=True))
model.add(keras.layers.LSTM(512, return_sequences=True))
model.add(keras.layers.Dropout(0.2))

# Classifier head
model.add(keras.layers.Flatten(input_shape=(sequence_length, 512)))
model.add(keras.layers.Dense(256))
model.add(keras.layers.Dropout(0.2))
model.add(keras.layers.Dense(n_vocab, activation='softmax'))

model.compile(optimizer='rmsprop', loss='categorical_crossentropy')

## Fit the model

In [None]:
model_fn = f"{COMPOSER}_seqlen_{sequence_length}_2LSTM_1Attention_2Dense.hdf5"
model_fp = os.path.join(PROJECT_PATH, "models", COMPOSER, model_fn)

# Resume from previously saved weights when they exist. On a first run there is
# nothing to load yet, so training simply starts from the initial weights.
if os.path.exists(model_fp):
  model.load_weights(model_fp)
else:
  print(f"No saved weights found at {model_fp}; starting from scratch")

In [None]:
model_fn = f"{COMPOSER}_seqlen_{sequence_length}_2LSTM_1Attention_2Dense.hdf5"
model_fp = os.path.join(PROJECT_PATH, "models", COMPOSER, model_fn)
# Make sure the target folder exists before anything is saved into it
os.makedirs(os.path.dirname(model_fp), exist_ok=True)

# Checkpoint: save the model every time the training loss improves, so that an
# interrupted session keeps its progress instead of losing the whole run.
checkpoint = ModelCheckpoint(model_fp, monitor='loss', save_best_only=True, mode='min', verbose=0)

model.fit(network_input, network_output, epochs=200, batch_size=512, callbacks=[checkpoint])

# Save the model as it is after the final epoch
model.save(model_fp)

Epoch 1/200
Epoch 2/200
Epoch 3/200
Epoch 4/200
Epoch 5/200
Epoch 6/200
Epoch 7/200
Epoch 8/200
Epoch 9/200
Epoch 10/200
Epoch 11/200
Epoch 12/200
Epoch 13/200
Epoch 14/200
Epoch 15/200
Epoch 16/200
Epoch 17/200
Epoch 18/200
Epoch 19/200
Epoch 20/200
Epoch 21/200
Epoch 22/200
Epoch 23/200
Epoch 24/200
Epoch 25/200
Epoch 26/200
Epoch 27/200
Epoch 28/200
Epoch 29/200
Epoch 30/200
Epoch 31/200
Epoch 32/200
Epoch 33/200
Epoch 34/200
Epoch 35/200
Epoch 36/200
Epoch 37/200
Epoch 38/200
Epoch 39/200
Epoch 40/200
Epoch 41/200
Epoch 42/200
Epoch 43/200
Epoch 44/200
Epoch 45/200
Epoch 46/200
Epoch 47/200
Epoch 48/200
Epoch 49/200
Epoch 50/200
Epoch 51/200
Epoch 52/200
Epoch 53/200
Epoch 54/200
Epoch 55/200
Epoch 56/200
Epoch 57/200
Epoch 58/200
Epoch 59/200
Epoch 60/200
Epoch 61/200
Epoch 62/200
Epoch 63/200
Epoch 64/200
Epoch 65/200
Epoch 66/200
Epoch 67/200
Epoch 68/200
Epoch 69/200
Epoch 70/200
Epoch 71/200
Epoch 72/200
Epoch 73/200
Epoch 74/200
Epoch 75/200
Epoch 76/200
Epoch 77/200
Epoch 78

## Generate Notes

This part of the code generates notes using the model we just trained. All it needs as input is a random sequence of notes of the same sequence length as was used in training, and then I essentially use that as a starting point and generate n notes, giving the model sequences and getting a note one by one as the prediction. 

In [None]:
# Pick a random training pattern as the seed sequence. The upper bound of
# randint is exclusive, so len(network_input) makes every pattern eligible.
start = np.random.randint(0, len(network_input))


print(f"start: {start}")

# Inverse of note_to_int: vocabulary id -> encoded note string
int_to_note = dict(enumerate(pitch_names))

# Work on a flat copy of the seed; values are ids scaled by 1 / n_vocab
pattern = network_input[start].flatten()
prediction_output = []

# Generate n notes
notes_to_generate = 400
for _ in range(notes_to_generate):

  prediction_input = np.reshape(pattern, (1, len(pattern), 1))
  prediction = model.predict(prediction_input, verbose=0)[0]

  # Greedy decoding: always take the most probable next note
  prediction_index = int(np.argmax(prediction))
  prediction_output.append(int_to_note[prediction_index])

  # Slide the window: drop the oldest value and append the new note,
  # scaled the same way as the training input
  pattern = np.append(pattern[1:], prediction_index / float(n_vocab))

print(f"Output notes: {prediction_output}")

start: 21014
Output notes: ['3,6:0.25', 'C#4:0.25', 'G#3:0.25', '8,1:0.75', '8,1:0.25', 'F4:0.25', 'C#4:0.25', 'G#3:0.25', 'E-4:0.25', 'B-3:0.25', 'F4:0.25', 'G#3:0.25', 'G#4:0.25', 'F3:0.25', 'E-5:0.25', 'G#2:0.25', 'F5:1/3', 'F5:0.25', 'C#2:0.25', 'G#4:0.25', 'G#2:0.25', 'F#5:1/3', 'F#5:0.25', 'E-3:0.25', 'C#5:0.25', 'E3:0.25', 'G#5:1/3', 'G#5:0.25', 'F3:0.25', 'F5:0.25', 'G#3:0.25', 'G#5:1/3', 'A5:0.25', '6,11:0.25', 'F#5:0.25', 'D4:0.25', 'A5:1/3', 'B5:0.25', 'F#4:0.25', 'D5:0.25', 'A4:0.25', 'C6:1/3', 'B5:0.25', 'B3:0.5', 'E-5:0.25', 'E-5:0.25', 'G#4:0.25', '1,5:1/3', 'F5:0.25', '8,1:2/3', 'G#2:0.25', 'B4:0.0', 'C#5:0.0', 'C#6:0.0', 'C5:1.0', '1,3:1.0', 'F3:1/3', 'F4:0.25', 'C#5:0.0', 'C5:0.25', 'F4:0.25', 'G4:0.25', '10,3:0.25', 'F#4:1/3', 'G#2:0.25', 'C2:0.25', 'G#4:1/3', 'F4:1/3', 'F4:1/3', 'F4:1/3', 'G#4:1/3', 'F4:0.25', 'G#4:1/3', 'G#4:1/3', 'G#4:1/3', 'B-3:1/3', 'C#4:1/3', 'G#3:0.25', 'G5:0.25', 'B-3:1/3', 'F4:0.25', 'D5:1/3', 'D5:1/3', 'G#3:0.25', 'F4:0.25', 'G3:0.25', 'G3:

## Convert back to MIDI file

Here I decode the generated notes using the inverse of the note_to_int dictionary I had previously, and write a MIDI file.

In [None]:
import random
from fractions import Fraction

offset = 0
output_notes = []
# create note and chord objects based on the values generated by the model
for token in prediction_output:
    # Every token is "<pitch part>:<duration>"
    pitch, duration = token.split(":")[:2]

    # A chord is encoded as comma-separated pitch-class numbers; a chord with a
    # single pitch class is just one number. The check must look at the pitch
    # part only: the full token always contains ':' and so is never all digits.
    if (',' in pitch) or pitch.isdigit():
        chord_notes = []
        for current_note in pitch.split(','):
            new_note = note.Note(int(current_note))
            new_note.storedInstrument = instrument.AltoSaxophone()
            chord_notes.append(new_note)
        new_chord = chord.Chord(chord_notes)
        new_chord.offset = offset
        output_notes.append(new_chord)
    else:
        try:
            new_note = note.Note(pitch)
        except Exception:
            # Skip tokens music21 cannot turn into a note (offset is not advanced)
            continue
        new_note.offset = offset
        new_note.storedInstrument = instrument.AltoSaxophone()
        output_notes.append(new_note)

    # increase offset each iteration so that notes do not stack.
    # Fraction parses both decimal ("0.25") and fractional ("1/3") durations.
    # NOTE(review): the duration only advances the offset; the note/chord
    # objects themselves keep their default duration — confirm this is intended.
    offset += float(Fraction(duration))

midi_stream = stream.Stream(output_notes)
midi_stream.write('midi', fp='test_output.mid')

'test_output.mid'

## A lot of work for this milestone was done on the Frontend of the application, and the link for the github repo is below:

- Github repo: https://github.com/APNovichkov/aux-ai-frontend