In [42]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Dropout
from tensorflow.keras.layers import LSTM
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import ModelCheckpoint
from IPython.display import clear_output
import numpy as np
import json

%run midi_utils.ipynb

# Turn on memory growth for every GPU TensorFlow can see. Per the TensorFlow
# docs this makes TF claim GPU memory as needed instead of reserving it all
# up front. With no GPU present the list is empty and the loop does nothing.
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

In [43]:
# https://www.tensorflow.org/api_docs/python/tf/keras/utils/Sequence
# Dynamically grabs data for the model, since the whole thing wouldn't fit into memory all at once.

class DataGenerator(tf.keras.utils.Sequence):
    """Keras ``Sequence`` that one-hot encodes training samples one batch at a time.

    Every entry of ``training_data`` is a string of the form
    ``"<int> <int> ... <int>, <int>"``: the space-separated ids of the input
    tokens, then ``", "``, then the id of the target token.

    Args:
        training_data: indexable collection of sample strings in the format above.
        batch_size: number of samples per batch.
        num_classes: width of the one-hot encoding (the vocabulary size).
        shuffle: when true, the sample order is re-drawn at the end of each epoch.
    """

    def __init__(self, training_data, batch_size, num_classes, shuffle=True):
        super().__init__()
        self.training_data = training_data
        self.batch_size = batch_size
        self.num_classes = num_classes
        self.shuffle = shuffle
        # Shuffle positions rather than the samples themselves, so the list
        # owned by the caller is never reordered as a side effect of training.
        self.indices = np.arange(len(training_data))

    def __len__(self):
        """Return the number of full batches per epoch (a trailing partial batch is dropped)."""
        # Must use the instance's own data, not a global that happens to share its name.
        return len(self.training_data) // self.batch_size

    def __getitem__(self, idx):
        """Return batch ``idx`` as a tuple ``(X, y)`` of one-hot encoded arrays."""
        batch_indices = self.indices[idx * self.batch_size:(idx + 1) * self.batch_size]

        X = []
        y = []
        for sample_index in batch_indices:
            # Split each line once: parts[0] is the input sequence, parts[1] the target.
            parts = self.training_data[sample_index].split(", ")
            X.append([int(integer) for integer in parts[0].split(" ")])
            y.append(int(parts[1]))

        return to_categorical(X, num_classes=self.num_classes), to_categorical(y, num_classes=self.num_classes)

    def on_epoch_end(self):
        """Draw a new sample order for the next epoch when shuffling is enabled."""
        if self.shuffle:
            np.random.shuffle(self.indices)

In [44]:
# Load vocabulary (token -> integer id). The file is opened in a `with` block so
# the handle is closed again; every id is passed through int().
with open("./dictionary.json") as vocabulary_file:
    vocabulary = {token: int(token_int) for token, token_int in json.load(vocabulary_file).items()}

# Read the preprocessed training samples, one sample per line
with open("./training_data_preprocessed.txt") as f:
    training_data = f.read().splitlines()

# FOR TESTING THE LEARNING CAPABILITY OF THE MODEL: keep only the first 100000 samples
training_data = training_data[:100000]

# Instantiate generator with batch size 512, shuffling the data each epoch
training_generator = DataGenerator(training_data, 512, len(vocabulary), True)

In [45]:
# Checkpointing: at the end of an epoch the full model (not only its weights) is
# written to disk, but only if the training loss beats every previous epoch.
# The epoch number and the loss become part of the file name.
filepath = "../models/simple/SimpleLSTM-{epoch:02d}-{loss:.4f}.hdf5"
checkpoint = ModelCheckpoint(
    filepath=filepath,
    monitor='loss',
    mode='min',
    save_best_only=True,
    save_weights_only=False,
    verbose=0,
)

In [46]:
# If you want to continue training an existing model, load it here.
# This cell and the from-scratch cell below both assign `model`, so run only
# one of them: whichever runs last decides what gets trained.
model = tf.keras.models.load_model("../models/simple/SimpleLSTM-08-1.7862.hdf5")

In [8]:
# Start from scratch instead of loading a checkpoint.
# Architecture: a 256-unit LSTM over windows of 100 one-hot encoded tokens,
# 20% dropout, then a softmax over the vocabulary for the next token.
model = Sequential([
    LSTM(256, input_shape=(100, len(vocabulary))),
    Dropout(0.2),
    Dense(len(vocabulary), activation='softmax'),
])
model.compile(optimizer='adam', loss='categorical_crossentropy')

In [47]:
# Train for up to 100 epochs on batches from the generator; the checkpoint
# callback takes care of saving.
# use_multiprocessing=True is deliberately not passed: it can't be used in a jupyter notebook.
model.fit(
    training_generator,
    epochs=100,
    workers=6,
    callbacks=[checkpoint],
)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 46/100
Epoch 47/100
Epoch 48/100
Epoch 49/100
Epoch 50/100

KeyboardInterrupt: 

In [96]:
def print_loading(i, n_tokens_to_generate, stop_at_EOS):
    """Replace the current cell output with a one-line progress counter.

    Prints "<i> / <n_tokens_to_generate> generated". When stop_at_EOS is set
    the total is shown as "?" instead.
    """
    clear_output(wait=True)
    total = "?" if stop_at_EOS else str(n_tokens_to_generate)
    print(str(i), "/", total, "generated")

def generate_music(model, vocab_size, vocabulary, n_tokens_to_generate, stop_at_EOS = False,
                   sequence_length = 100, min_unique_tokens = 40):
    """Generate a list of tokens by repeatedly asking the model for the next token.

    The model is fed a sliding window of token ids. The first window is random;
    after each prediction the oldest id is dropped and the predicted id appended:
    [0, 1, 2] -predict-> [3]
    [1, 2, 3] -predict-> [4]
    [2, 3, 4] and so on

    Args:
        model: callable taking a one-hot batch of shape (1, sequence_length, vocab_size)
            and returning one row of per-token scores.
        vocab_size: number of distinct tokens (width of the one-hot encoding).
        vocabulary: dict mapping token -> integer id.
        n_tokens_to_generate: number of tokens after which generation stops. A value
            <= 0 returns an empty list, unless stop_at_EOS is set, in which case it
            means "no limit".
        stop_at_EOS: also stop as soon as the token "<EOS>" has been generated.
        sequence_length: length of the sliding window fed to the model.
        min_unique_tokens: if the window holds fewer unique ids than this, the next
            token is drawn at random from the two best predictions.

    Returns:
        The generated tokens, in generation order.
    """
    # Nothing to generate. Without this guard the loop below would never stop,
    # because the counter is already past the limit after the first prediction.
    if not stop_at_EOS and n_tokens_to_generate <= 0:
        return []

    # Inverse of the vocabulary, because the predicted integers need to be converted back to tokens
    int_to_token_dict = {token_int: token for token, token_int in vocabulary.items()}

    # The first input is a bunch of random tokens from the vocabulary
    sliding_window = np.random.randint(0, vocab_size, size=sequence_length).tolist()

    # List that holds the final output. Grows by each prediction.
    prediction_output = []

    # Keeps track of the number of tokens generated so far
    i = 0

    while True:
        # Convert to the same format as the one the model saw during training (a batch of one)
        prediction_input = to_categorical([sliding_window], num_classes = vocab_size)

        # Predict next token depending on the current sequence
        prediction = np.asarray(model(prediction_input))[0]
        i += 1

        # Get the integer variant of the most likely token
        index = int(np.argmax(prediction))

        # Check if previous tokens were "varied" enough: if they had at least min_unique_tokens unique tokens.
        # If not, choose a random prediction from the top 2 predictions. This avoids getting stuck in a short melody.
        if len(np.unique(sliding_window)) < min_unique_tokens:
            top_k = min(2, prediction.size)
            indexes_of_top2_predictions = np.argpartition(prediction, -top_k)[-top_k:]
            index = int(np.random.choice(indexes_of_top2_predictions))

        # Grab the token variant of the integer and append the resulting token to prediction output
        result = int_to_token_dict[index]
        prediction_output.append(result)

        # Slide the input 1 int to the right, appending the current prediction and removing one token from the start,
        # so the sequence length will stay the same
        sliding_window = sliding_window[1:] + [index]

        # A loading bar for the impatient
        print_loading(i, n_tokens_to_generate, stop_at_EOS)

        reached_limit = n_tokens_to_generate > 0 and i >= n_tokens_to_generate
        if (stop_at_EOS and result == "<EOS>") or reached_limit:
            break

    return prediction_output

In [54]:
# Generate 3000 tokens with the current model, convert them to MIDI and show the result.
generated_tokens = generate_music(
    model=model,
    vocab_size=len(vocabulary),
    vocabulary=vocabulary,
    n_tokens_to_generate=3000,
)
convert_tokens_to_midi(generated_tokens).show("midi")

3000 / 3000 generated


In [97]:
# Generate three separate 5000-token samples and write each one to its own MIDI file.
for i in range(3):
    generated_tokens = generate_music(
        model=model,
        vocab_size=len(vocabulary),
        vocabulary=vocabulary,
        n_tokens_to_generate=5000,
    )
    generated_midi_stream = convert_tokens_to_midi(generated_tokens)

    midi_filepath = "../generated_samples/sample" + str(i) + ".mid"
    generated_midi_stream.write('midi', fp=midi_filepath)

5000 / 5000 generated
