# IMPORT SECTION

In [None]:
!pip install --upgrade music21==6.7.1
from music21 import converter, instrument, note, chord, stream
import os
from tqdm import tqdm
import numpy as np
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Flatten, Dropout
%load_ext tensorboard 
from tensorboard import notebook
from tensorflow.keras.callbacks import TensorBoard, EarlyStopping, ReduceLROnPlateau

Collecting music21==6.7.1
  Downloading music21-6.7.1.tar.gz (19.2 MB)
[K     |████████████████████████████████| 19.2 MB 208 kB/s 
Collecting webcolors
  Downloading webcolors-1.11.1-py3-none-any.whl (9.9 kB)
Building wheels for collected packages: music21
  Building wheel for music21 (setup.py) ... [?25l[?25hdone
  Created wheel for music21: filename=music21-6.7.1-py3-none-any.whl size=21941721 sha256=e03b39466a8dbc077c1590277a889f0fef483d6ad74dfca448f3babb60ad20e6
  Stored in directory: /root/.cache/pip/wheels/72/44/61/90e4e65262ca1b4d9f707527b540729ce3f64e00fc6b38d54c
Successfully built music21
Installing collected packages: webcolors, music21
  Attempting uninstall: music21
    Found existing installation: music21 5.5.0
    Uninstalling music21-5.5.0:
      Successfully uninstalled music21-5.5.0
Successfully installed music21-6.7.1 webcolors-1.11.1


In [None]:
from google.colab import drive

drive.mount('/content/drive', force_remount=False)

MIDI_PATH="/content/drive/My Drive/MLFolder/Onlab/MIDI_Iron_Maiden/"

Mounted at /content/drive


In [None]:
!nvidia-smi

Thu Nov 11 15:17:22 2021       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 495.44       Driver Version: 460.32.03    CUDA Version: 11.2     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla P100-PCIE...  Off  | 00000000:00:04.0 Off |                    0 |
| N/A   36C    P0    26W / 250W |      0MiB / 16280MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Proces

# UTILITY FUNCTIONS SECTION

In [None]:
def most_frequent(paramlist): 
    #https://www.geeksforgeeks.org/python-find-most-frequent-element-in-a-list/
    counter = 0
    num = paramlist[0] 
      
    for i in paramlist: 
        curr_frequency = paramlist.count(i) 
        if(curr_frequency> counter): 
            counter = curr_frequency 
            num = i 
  
    return num 

def get_notes_from_chord(chord):
    if chord.startswith("<music21.chord.Chord "):
        chord = chord[len("<music21.chord.Chord "):]
    if chord.endswith(">"):
        chord = chord[:-1]
    chord = chord.replace(" ", ",")
    return chord

def get_number_from_duration(duration):
    if duration.startswith("<music21.duration.Duration "):
        duration = duration[len("<music21.duration.Duration "):]
    if duration.endswith(">"):
        duration = duration[:-1]
    duration = duration.replace(" ", ",")
    return duration

def create_mapper(chords):
    pitchnames = sorted(set(str(item) for item in chords))
    mapper = dict((note, number) for number, note in enumerate(pitchnames))

    return mapper

def encode_using_mapper(chords, mapper):
    encodedsong=[]
    for c in chords:
        encodedsong.append(mapper[str(c)])

    return encodedsong


def decode_chords_using_mapper(numbers, mapper):
    outputnotes = []
    for number in numbers:
        outputnotes.append(chord_from_string(get_notes_from_chord(get_key_from_value(number, mapper))))

    return outputnotes

def combine_chords_with_durations(chords, durations):
    combined = []

    for i, j in zip(chords, durations):
        i = get_notes_from_chord(str(i))
        j = get_number_from_duration(str(j))
        combined.append(i + ';' + j)

    return combined

def make_slices(data, slice_length):
    for song in tqdm(data):
        if len(song) > slice_length:

            input = []
            output = []
            slice = []

            for idx, number in enumerate(song):
                if idx < slice_length:
                    slice.append(number)

            input.append(slice.copy())
            output.append(song[slice_length])

            # Sliding window
            for idx, number in enumerate(song):
                if idx >= slice_length and (idx + 1) < len(song):
                    slice.pop(0)
                    slice.append(number)
                    input.append(slice.copy())  # Copy is necessary, because of how pointers and lists work in Python
                    output.append(song[idx + 1])

    return input, output

def parse_everything_together(data, slice_length):

    notes=[]
    input=[]
    output=[]
    slice = []

    for song in tqdm(data):
        for number in song:
            notes.append(number)
    
    for idx, note in tqdm(enumerate(notes)):
        if idx < slice_length:
            slice.append(number)

    input.append(slice.copy())
    output.append(notes[slice_length])

    # Sliding window
    for idx, number in tqdm(enumerate(notes)):
        if idx >= slice_length and (idx + 1) < len(notes):
            slice.pop(0)
            slice.append(number)
            input.append(slice.copy())  # Copy is necessary, because of how pointers and lists work in Python
            output.append(notes[idx + 1])
        
    return input, output

def get_key_from_value(value, dict):
    return list(dict.keys())[list(dict.values()).index(value)]

def get_notes_from_chord(chord):
    if chord.startswith("<music21.chord.Chord "):
        chord = chord[len("<music21.chord.Chord "):]
    if chord.endswith(">"):
        chord = chord[:-1]
    chord = chord.replace(" ", ",")
    return chord

def get_number_from_duration(duration):
    if duration.startswith("<music21.duration.Duration "):
        duration = duration[len("<music21.duration.Duration "):]
    if duration.endswith(">"):
        duration = duration[:-1]
    duration = duration.replace(" ", ",")
    return duration

def chord_from_string(chordstring):
    notes = chordstring.split(";")
    return chord.Chord(notes)


def convert_to_float(frac_str):
    #From: https://stackoverflow.com/questions/1806278/convert-fraction-to-float
    try:
        return float(frac_str)
    except ValueError:
        num, denom = frac_str.split('/')
        try:
            leading, num = num.split(' ')
            whole = float(leading)
        except ValueError:
            whole = 0
        frac = float(num) / float(denom)
        return whole - frac if whole < 0 else whole + frac

#Source: https://github.com/alexissa32/DataScienceMusic
def create_midi_without_chords(prediction_output, target_instrument = instrument.Piano(), filename = 'test_output.mid'):
    '''
    First step:
    Only notes, no chords
    Static 4/4 beat
    Rests
    '''
    offset = 0
    output_notes = []

    # create note and chord objects based on the values generated by the model
    for pattern in prediction_output:
        # pattern is a rest
        if('rest' in pattern):
            new_rest = note.Rest(pattern)
            new_rest.offset = offset
            new_rest.storedInstrument = target_instrument #???
            output_notes.append(new_rest)
        # pattern is a note
        else:
            new_note = note.Note(pattern)
            new_note.offset = offset
            new_note.storedInstrument = target_instrument
            output_notes.append(new_note)
        # increase offset each iteration so that notes do not stack
        offset += 0.5

    midi_stream = stream.Stream(output_notes)

    midi_stream.write('midi', fp=filename)

def create_midi_without_durations(prediction_output, target_instrument = instrument.Piano(), filename = 'test_output.mid'):
    '''
    Second step:
    Chords and notes
    Static 4/4 beat
    Rests
    '''
    offset = 0
    output_notes = []

    # create note and chord objects based on the values generated by the model
    for pattern in prediction_output:
        # pattern is a chord
        if ('chord' in pattern):
            notes = []
            pattern = get_notes_from_chord(pattern)
            patternpitches = pattern.split(',')
            for current_note in patternpitches:
                new_note = note.Note(current_note)
                new_note.storedInstrument = target_instrument
                notes.append(new_note)
            new_chord = chord.Chord(notes)
            new_chord.offset = offset
            output_notes.append(new_chord)
        # pattern is a rest
        elif('rest' in pattern):
            new_rest = note.Rest(pattern)
            new_rest.offset = offset
            new_rest.storedInstrument = target_instrument #???
            output_notes.append(new_rest)
        # pattern is a note
        else:
            new_note = note.Note(pattern)
            new_note.offset = offset
            new_note.storedInstrument = target_instrument
            output_notes.append(new_note)
        # increase offset each iteration so that notes do not stack
        offset += 0.5

    midi_stream = stream.Stream(output_notes)

    midi_stream.write('midi', fp=filename)

def create_midi_with_durations(prediction_output, output_durations, target_instrument = instrument.Piano(), filename = 'test_output.mid'):
    offset = 0
    output_notes = []

    # create note and chord objects based on the values generated by the model
    for i in range(len(prediction_output)):
        pattern = prediction_output[i]
        duration = get_number_from_duration(output_durations[i])
        # pattern is a chord
        if ('chord' in pattern):
            notes = []
            pattern = get_notes_from_chord(pattern)
            patternpitches = pattern.split(',')
            for current_note in patternpitches:
                new_note = note.Note(current_note)
                new_note.storedInstrument = target_instrument
                notes.append(new_note)
            new_chord = chord.Chord(notes)
            new_chord.offset = offset
            output_notes.append(new_chord)
        # pattern is a rest
        elif('rest' in pattern):
            new_rest = note.Rest(pattern)
            new_rest.offset = offset
            new_rest.storedInstrument = target_instrument #???
            output_notes.append(new_rest)
        # pattern is a note
        else:
            new_note = note.Note(pattern)
            new_note.offset = offset
            new_note.storedInstrument = target_instrument
            output_notes.append(new_note)
        # increase offset each iteration so that notes do not stack
        offset += convert_to_float(duration)

    midi_stream = stream.Stream(output_notes)

    midi_stream.write('midi', fp=filename)

def create_midi_with_embedded_durations(prediction_output, target_instrument = instrument.Piano(), filename = 'test_output.mid'):
    offset = 0
    output_notes = []

    # create note and chord objects based on the values generated by the model
    for i in range(len(prediction_output)):
        pattern = prediction_output[i]
        splitpattern = pattern.split(";")
        pattern = splitpattern[0]

        duration = get_number_from_duration(splitpattern[1])
        # pattern is a rest
        if('rest' in pattern):
            new_rest = note.Rest(pattern)
            new_rest.offset = offset
            new_rest.storedInstrument = target_instrument #???
            output_notes.append(new_rest)
        # pattern is a chord
        elif (',' in pattern):
            notes = []
            pattern = get_notes_from_chord(pattern)
            patternpitches = pattern.split(',')
            for current_note in patternpitches:
                new_note = note.Note(current_note)
                new_note.storedInstrument = target_instrument
                notes.append(new_note)
            new_chord = chord.Chord(notes)
            new_chord.offset = offset
            output_notes.append(new_chord)
        # pattern is a note
        else:
            new_note = note.Note(pattern)
            new_note.offset = offset
            new_note.storedInstrument = target_instrument
            output_notes.append(new_note)
        # increase offset each iteration so that notes do not stack
        offset += convert_to_float(duration)

    midi_stream = stream.Stream(output_notes)

    midi_stream.write('midi', fp=filename)

def parse_midi_notes_and_durations():
    midiparts = []

    for file in tqdm(os.listdir(path)):
        midi = converter.parse(os.path.join(path, file))

        for part in midi.parts:
            chords=[]
            durations=[]
            for element in part.notesAndRests:
                if isinstance(element, note.Note):
                    chords.append(chord.Chord([element]))
                    durations.append(element.duration)
                elif isinstance(element, chord.Chord):
                    chords.append(element)
                    durations.append(element.duration)
                elif isinstance(element, note.Rest):
                    chords.append(element)
                    durations.append(element.duration)

            if len(chords) > 0:
                midiparts.append(MidiPart(file, part.partName, chords, durations))
            else:
                for voice in part.voices:
                    chords=[]
                    durations=[]
                    for element in voice.notesAndRests:
                        if isinstance(element, note.Note):
                            chords.append(chord.Chord([element]))
                            durations.append(element.duration)
                        elif isinstance(element, chord.Chord):
                            chords.append(element)
                            durations.append(element.duration)
                        elif isinstance(element, note.Rest):
                            chords.append(element)
                            durations.append(element.duration)

                    midiparts.append(MidiPart(file, part.partName, chords, durations))

    return midiparts

# PREPROCESSING SECTION


In [None]:
class MidiPart:
    def __init__(self, song, instrument, chords, durations):
        self.song = song
        self.instrument = instrument
        self.chords = chords
        self.durations = durations


path = MIDI_PATH

IRON_MAIDEN_INSTRUMENTS = ['Acoustic Guitar', 'Viola', 'Electric Bass', 'Brass', 'Sampler', 'Electric Guitar', 'Piano', 'StringInstrument']

SLICE_LEN = 20

In [None]:
midiparts = parse_midi_notes_and_durations()

100%|██████████| 93/93 [14:13<00:00,  9.18s/it]


In [None]:
allchords = []
alldurations = []

TARGET_INSTRUMENT = 'Electric Bass'
for i in midiparts:
    if i.instrument == TARGET_INSTRUMENT:
        if len(i.chords)>0 :
            allchords.append(i.chords)
            alldurations.append(i.durations)

In [None]:
assert(len(allchords) == len(alldurations))

combined = []
for i in range(len(allchords)):
    combined.append(combine_chords_with_durations(allchords[i], alldurations[i]))

mapperdata = []

for i in combined:
    for j in i:
        mapperdata.append(j)

mapper = create_mapper(mapperdata)

encoded_data = []

for c in combined:
    encoded = encode_using_mapper(c, mapper)
    encoded_data.append(encoded)

restkeysvalues = []
for j in mapper.keys():
    if ( 'rest' in j):
        restkeysvalues.append(mapper[j])

cleared_encoded_data=[]

for i in range(len(encoded_data)):
    if most_frequent(encoded_data[i]) not in restkeysvalues:
        cleared_encoded_data.append(encoded_data[i])
    else:
        pass

In [None]:
#Creating the input data

input, output = parse_everything_together(cleared_encoded_data, SLICE_LEN)

100%|██████████| 102/102 [00:00<00:00, 8069.93it/s]
158198it [00:00, 2855970.59it/s]
158198it [00:00, 739969.34it/s]


In [None]:
import matplotlib.pyplot as plt
from collections import Counter

outputcnt = Counter(output)

function_like_array=[]
for val in outputcnt.values():
    function_like_array.append(val)

function_like_array.sort()

outliers = []
OUTLIER_CONSTANT = 40

for i in outputcnt.keys():
    if outputcnt[i] < OUTLIER_CONSTANT:
        outliers.append(i)

In [None]:
#Because I don't want to mess up my inputs and outputs, I test their lengths before and after the outlier filtering.
assert(len(input) == len(output))

newinput=[]
newoutput=[]

for i in range(len(output)):
    if(output[i] not in outliers):
        newinput.append(input[i])
        newoutput.append(output[i])

input = newinput
output = newoutput

assert(len(input) == len(output))

In [None]:
#However, this outlier filtering made things complicated. Now I have to make a new mapper, so that i won't end up with output classes containing 0 elements.

mapper_list = [] #Idx of the mapper list is the new value, the element is the old value.
new_output_elements = set(output)

for i in new_output_elements:
    mapper_list.append(i)

newoutput = []

for i in output:
    newoutput.append(mapper_list.index(i))

output = newoutput

In [None]:
input = np.reshape(np.asarray(input), (len(input), SLICE_LEN, 1))
output=np.asarray(output)

In [None]:
SEED = 54
#np.random.seed(SEED)

from sklearn.model_selection import train_test_split

X_train, X_test, Y_train, Y_test = train_test_split(input, output, test_size=0.01, random_state=SEED)
X_train, X_val, Y_train, Y_val = train_test_split(X_train, Y_train, test_size=0.2, random_state=SEED)

startidx = np.random.randint(0, len(X_test)-1)
starting_slice = X_test[startidx]

print(X_train.shape, X_val.shape, X_test.shape, Y_train.shape, Y_val.shape, Y_test.shape)
print(startidx)
print(starting_slice)

(121952, 20, 1) (30489, 20, 1) (1540, 20, 1) (121952,) (30489,) (1540,)
417
[[748]
 [748]
 [748]
 [748]
 [748]
 [748]
 [748]
 [748]
 [748]
 [748]
 [217]
 [217]
 [217]
 [217]
 [309]
 [398]
 [708]
 [708]
 [708]
 [  1]]


# MODEL TRAINING SECTION

In [None]:
from tensorflow.keras.utils import Sequence

# DATA GENERATOR
NUM_CLASSES = to_categorical(Y_train).shape[1]
SLICE_SIZE = SLICE_LEN

class MyDatagen(Sequence):
  def __init__(self, list_IDs, batch_size=16, dim=(SLICE_SIZE), shuffle=True, validation=False):
    'Initialization'
    self.dim = dim
    self.batch_size = batch_size
    self.list_IDs = list_IDs
    self.shuffle = shuffle
    self.validation=validation
    self.on_epoch_end()

  def __len__(self):
    return int(np.floor(len(self.list_IDs) / self.batch_size))

  def __getitem__(self, index):
    # Generate indexes of the batch
    indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]

    # Find list of IDs
    list_IDs_temp = [self.list_IDs[k] for k in indexes]

    # Generate data
    X, y = self.__data_generation(list_IDs_temp)

    return X, y

  def on_epoch_end(self):
      #Updates indexes after each epoch
    self.indexes = np.arange(len(self.list_IDs))
    if self.shuffle == True:
      np.random.shuffle(self.indexes)

  def __data_generation(self, list_IDs_temp):
    #Generates data containing batch_size samples
    if self.validation:
      X = np.empty((self.batch_size, self.dim, 1))
      y = np.empty((self.batch_size, NUM_CLASSES))

      # Generate data
      for i, ID in enumerate(list_IDs_temp):
        X[i] = X_val[ID]

        y[i] = to_categorical(Y_val[ID], num_classes=NUM_CLASSES)

      return X, y
    else:
      X = np.empty((self.batch_size, self.dim, 1))
      y = np.empty((self.batch_size, NUM_CLASSES))

      # Generate data
      for i, ID in enumerate(list_IDs_temp):
        X[i] = X_train[ID]

        y[i] = to_categorical(Y_train[ID], num_classes=NUM_CLASSES)

      return X, y

In [None]:
MODEL_NAME = "Transformer_bass_1"

MODEL_SAVE_PATH = "/content/drive/MyDrive/MLFolder/Onlab/modelsaves/"

TBPATH = "/content/tblogs/"+MODEL_NAME

In [None]:
#Before creating the neural network, I define some important callbacks

tb = TensorBoard(log_dir = TBPATH, write_images=True, histogram_freq=1)

plateau = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=2, min_lr=0.00005, verbose=1)

es = EarlyStopping(monitor='val_loss', patience = 5, restore_best_weights = True, verbose=1)

callbacks = [plateau, es, tb]

In [None]:
#CONSTANTS:

EMBEDDING_DIM = 256
encoder_layers = 1
decoder_layers = 1
dropout_rate = 0.1
layernorm_epsilon = 1e-6
num_heads = 4
key_dim = 10


In [None]:
from tensorflow.keras.layers import Layer
import tensorflow as tf
import math as m

#source: https://github.com/jason9693/MusicTransformer-tensorflow2.0
class DynamicPositionEmbedding(Layer):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        embedding_dim = EMBEDDING_DIM
        max_seq=2048
        embed_sinusoid_list = np.array([[
            [
                m.sin(
                    pos * m.exp(-m.log(10000) * i/embedding_dim) *
                    m.exp(m.log(10000)/embedding_dim * (i % 2)) + 0.5 * m.pi * (i % 2)
                )
                for i in range(embedding_dim)
            ]
            for pos in range(max_seq)
        ]])
        self.positional_embedding = tf.constant(embed_sinusoid_list, dtype=tf.float32)

    def call(self, inputs, **kwargs):
        return tf.add(inputs, self.positional_embedding[:,:inputs.shape[1],:])

In [None]:
def make_encoder_block(x):
    #attn_out = MultiHeadAttention(num_heads=num_heads, key_dim=key_dim, dropout=dropout_rate)(x,x)
    attn_out = Attention()([x,x])
    lnorm = LayerNormalization(epsilon=layernorm_epsilon)(attn_out+x)
    x = Dense(EMBEDDING_DIM // 2, activation='relu')(lnorm)
    x = Dense(EMBEDDING_DIM)(x)
    x = Dropout(dropout_rate)(x)
    #x = LayerNormalization(epsilon=layernorm_epsilon)(lnorm+x)
    return x

In [None]:
def make_decoder_block(x, encoder_out):
    #attn_out = MultiHeadAttention(num_heads=num_heads, key_dim=key_dim, dropout=dropout_rate)(x,x)
    attn_out = Attention()([x,x])
    x = LayerNormalization(epsilon=layernorm_epsilon)(attn_out+x)
    #attn_out = MultiHeadAttention(num_heads=num_heads, key_dim=key_dim, dropout=dropout_rate)(x, encoder_out, encoder_out)
    attn_out = Attention()([x,x])
    lnorm = LayerNormalization(epsilon=layernorm_epsilon)(attn_out+x)
    x = Dense(EMBEDDING_DIM // 2, activation='relu')(lnorm)
    x = Dense(EMBEDDING_DIM)(x)
    x = Dropout(dropout_rate)(x)
    #x = LayerNormalization(epsilon=layernorm_epsilon)(lnorm+x)
    return x

In [None]:
#I create the neural network model here.
from tensorflow.keras.layers import MultiHeadAttention, Attention
from tensorflow.keras.layers import Embedding, Reshape, Dropout
from tensorflow.keras.layers import Input, LayerNormalization
from tensorflow.keras.models import Model



input_layer = Input(shape=(SLICE_LEN,1))
x = Embedding(len(mapper), EMBEDDING_DIM, input_length=SLICE_LEN)(input_layer)
x = Reshape((SLICE_LEN, EMBEDDING_DIM))(x)
x = DynamicPositionEmbedding()(x)
enc_out = make_encoder_block(x)

for i in range(encoder_layers-1):
    enc_out = make_encoder_block(enc_out)

for i in range(decoder_layers):
    x = make_decoder_block(x, enc_out)

x = Flatten()(x)
output = Dense(NUM_CLASSES, activation='softmax')(x)
model = Model(input_layer, output)

model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['accuracy'])

training_gen=MyDatagen(range(X_train.shape[0]), dim = SLICE_LEN, batch_size=16)
val_gen=MyDatagen(range(X_val.shape[0]), dim = SLICE_LEN, batch_size=16, validation=True)

model.fit(training_gen, epochs=10000, validation_data=val_gen, callbacks=callbacks)

model.evaluate(X_test, to_categorical(Y_test, num_classes=NUM_CLASSES))

model.save(MODEL_SAVE_PATH + MODEL_NAME + ".h5")

Epoch 1/10000
Epoch 2/10000
Epoch 3/10000
Epoch 4/10000
Epoch 5/10000
Epoch 6/10000
Epoch 7/10000
Epoch 8/10000
Epoch 9/10000
Epoch 10/10000
Epoch 11/10000
Epoch 12/10000
Epoch 13/10000
Epoch 14/10000
Epoch 15/10000
Epoch 16/10000
Epoch 17/10000
Epoch 18/10000
Epoch 19/10000
Epoch 20/10000
Epoch 21/10000
Epoch 22/10000
Epoch 00022: ReduceLROnPlateau reducing learning rate to 0.00020000000949949026.
Epoch 23/10000
Epoch 24/10000
Epoch 25/10000
Epoch 00025: ReduceLROnPlateau reducing learning rate to 5e-05.
Epoch 26/10000
Epoch 27/10000
Epoch 28/10000
Epoch 00028: early stopping


In [None]:
import gc
gc.collect()

1937509