# Chroma Histogram Generation Model - training notebook

This notebook implements training of an LSTM model for generating chroma histograms.

It uses two objects from the custom module `model_training_utils`:
- `ChordsDatasetManager` - a class which handles all the pre-processing steps needed to transform the dataset, a CSV file of melody chroma and chord chroma histograms, into `tf.data.Dataset` objects.
- `ChordGeneratorModel` - a custom model consisting of three LSTM layers and a dense, fully connected output layer.

### Handle imports and mounting Google Drive

In [None]:
import tensorflow as tf
from tensorflow import keras
import keras
import os
import numpy as np
import pandas as pd
import sklearn
import matplotlib.pyplot as plt
import datetime
import json

# # For running on Colab - import Google Drive
# from google.colab import drive
# drive.mount('/content/drive')

# # Update CWD to project folder
# print(f"Current working directory: {os.getcwd()}")
# if os.getcwd() != "/content/drive/My Drive/Norway/Thesis":
#     os.chdir("./drive/My Drive/Norway/Thesis/")
#     print(f"Working directory updated to: {os.getcwd()}")

# Load the dataset handler
from model_training_utils.dataset_manager import ChordsDatasetManager

# Load the untrained model
from model_training_utils.chord_generator_model import ChordGeneratorModel

### Print useful info about TensorFlow, including any GPU/TPU connected to the runtime

In [None]:
# Report the TensorFlow version and whether this build has CUDA support
print(f"TensorFlow version: {tf.__version__}")
built_with_cuda = tf.test.is_built_with_cuda()
print("Built with CUDA:", built_with_cuda)

# Count the physical CPU and GPU devices TensorFlow can see
cpu_count = len(tf.config.list_physical_devices('CPU'))
print("# of CPUs Available: ", cpu_count)
gpu_count = len(tf.config.list_physical_devices('GPU'))
print("# of GPUs Available: ", gpu_count)

# A TPU counts as available if any logical device reports the 'TPU' type
devices = tf.config.list_logical_devices()
tpu_available = any(device.device_type == 'TPU' for device in devices)

print(f"TPU available: {tpu_available}")

### Define handy functions for getting system time (in GMT if using Google Colab) and creating necessary save paths

In [None]:
def now():
    """Return the current system date and time as a ``YYYY-MM-DD_HH-MM-SS`` string."""
    return datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")


def model_save_paths(sequence_length, batch_size, learning_rate, LSTM_dropout, epochs):
    """Build the save locations for a trained model and its training history.

    Both paths embed one shared timestamp followed by the given
    hyperparameters, so the two artefacts of a run can be matched by name.

    Args:
        sequence_length: Sequence length used for training (written after ``sq-``).
        batch_size: Batch size used for training (written after ``btch-``).
        learning_rate: Learning rate used for training (written after ``lr-``).
        LSTM_dropout: Dropout value used for training (written after ``dropout-``).
        epochs: Number of training epochs (written after ``epoch-``).

    Returns:
        A ``(model_save_location, history_save_location)`` tuple of path
        strings under ``./trained_models/``; the history path ends in ``.json``.
    """
    # Take the timestamp once: calling now() separately for each path could
    # give the two files different names if the clock ticks in between.
    run_name = (
        f"{now()}_chord_generator_sq-{sequence_length}_btch-{batch_size}"
        f"_lr-{learning_rate}_dropout-{LSTM_dropout}_epoch-{epochs}"
    )
    model_save_location = f"./trained_models/{run_name}"
    history_save_location = f"./trained_models/history/{run_name}.json"

    return (model_save_location, history_save_location)

### Define model parameters

In [None]:
# Fraction of the dataset reserved for the test split
test_size = 0.3

# Sequence length and batch size are handed to the dataset split;
# the learning rate goes to the optimizer and the dropout to the model
sequence_length, batch_size = 8, 64
learning_rate, dropout = 0.0001, 0.375

# Feature counts given to the model as num_inputs / num_outputs
num_inputs, num_outputs = 13, 12

# Unit counts for the model's three LSTM layers, in order
lstm1_units, lstm2_units, lstm3_units = 512, 1024, 512

# Shape used to build the model; the leading None leaves that dimension unspecified
input_shape = (None, sequence_length, num_inputs)

### Load and process dataset - requires a chroma histograms dataset (not included in this repo)

In [None]:
# Location of the chroma histograms CSV
dataset_path = "./datasets/Lakh/Lakh_chords_dataset_2024_ALL.csv"

# Create the dataset handler for that file and run its formatting step
dataset_manager = ChordsDatasetManager(dataset_path)
dataset_manager.format_dataset()

# Split into train and test sections using the parameters defined earlier
dataset_manager.test_train_split(
    test_size=test_size,
    sequence_length=sequence_length,
    batch_size=batch_size,
)

# Fetch the training split first, then the test split
dataset_train, dataset_test = (
    dataset_manager.get_training_data(),
    dataset_manager.get_test_data(),
)

# Last expression of the cell: show the raw dataset
dataset_manager.get_raw_dataset()

### Create, build, compile, and analyse model

In [None]:
# Initialise model
model = ChordGeneratorModel(
    lstm1_units,
    lstm2_units,
    lstm3_units,
    num_inputs=num_inputs,
    num_outputs=num_outputs,
    dropout=dropout
)

model.build(input_shape)

# Define evaluation metrics. Variable names match the metric each one holds
# (mean absolute error / mean absolute percentage error).
# NOTE(review): Accuracy measures how often predictions exactly equal the
# targets, which is probably uninformative for continuous histogram
# outputs - confirm whether it should stay in the list.
metric_accuracy = tf.keras.metrics.Accuracy()
metric_mae = tf.keras.metrics.MeanAbsoluteError()
metric_mape = tf.keras.metrics.MeanAbsolutePercentageError()
metric_r2 = tf.keras.metrics.R2Score()
metric_rmse = tf.keras.metrics.RootMeanSquaredError()
metrics = [metric_accuracy, metric_mae, metric_mape, metric_r2, metric_rmse]

# Compile model
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
    loss=tf.keras.losses.MeanSquaredLogarithmicError(),
    metrics=metrics
)

# Print model summary
model.summary()

### Train model

In [None]:
# Set number of training epochs
epochs = 25

# Define callbacks
# Early stopping watches the training loss (no validation data is passed to fit)
callback_stop = tf.keras.callbacks.EarlyStopping(monitor='loss', patience=10)
callback_checkpoint = tf.keras.callbacks.ModelCheckpoint(
    filepath=f"./trained_models/model_checkpoints/{now()}_chord_generator_colab_sql-{sequence_length}_btch-{batch_size}_lr-{learning_rate}_dr-{dropout}",
    verbose=1
)

# Train model
# batch_size is deliberately not passed here: Keras documents that it must not
# be specified when the input is a dataset, because the dataset produces the
# batches itself (batch_size was already given to the dataset split).
history = model.fit(
    dataset_train,
    epochs=epochs,
    callbacks=[callback_stop, callback_checkpoint]
)

### Save trained model and model history to Google Drive

In [None]:
# Generate save paths for model and history json
model_save_path, history_save_path = model_save_paths(sequence_length, batch_size, learning_rate, dropout, epochs)

# Make sure the history folder (and its parent folders) exist before writing
os.makedirs(os.path.dirname(history_save_path), exist_ok=True)

# Save model
model.save(model_save_path)
print(f"Saved model to {model_save_path}")

# Save model history dict to json; the context manager closes the file
# even if serialisation fails part-way through
model_history_data = history.history
with open(history_save_path, "w") as history_file:
    json.dump(model_history_data, history_file)
print(f"Saved model history to {history_save_path}")

### Run trained model on test dataset, then evaluate model

In [None]:
# Import the function explicitly: a bare `import sklearn` does not guarantee
# that the `sklearn.metrics` submodule has been loaded
from sklearn.metrics import r2_score

output_data_pred = model.predict(dataset_test)
output_data_test = dataset_manager.get_output_data_test()

# R2 score: trim the test targets to the number of predictions and flatten the
# predictions to 2D using their own last dimension rather than a hard-coded 12
num_predictions = output_data_pred.shape[0]
output_data_pred_2d = output_data_pred.reshape(-1, output_data_pred.shape[-1])
r2_value = r2_score(output_data_test[:num_predictions], output_data_pred_2d)
print(f"Coefficient of determination (r2 score): {r2_value}")

# Print each predicted chroma column's share of the total predicted output
pred_total = output_data_pred.sum()
for i, column in enumerate(output_data_pred.T):
    print(i, ":\t", column.sum() / pred_total)

# Plot total loss for model
loss_plot_path = f"./trained_models/loss_plots/{now()}_chord_generator_loss_plot_sq-{sequence_length}_btch-{batch_size}_lr={learning_rate}_dropout-{dropout}_epoch-{epochs}.png"
# Create the plot folder first so savefig does not fail on a fresh checkout
os.makedirs(os.path.dirname(loss_plot_path), exist_ok=True)
plt.figure(figsize=(6,6))
plt.plot(history.epoch, history.history['loss'], label='total loss')
plt.xlabel("Epoch")
plt.ylabel("Total Loss")
plt.title("Total Loss for LSTM Chord Generation Model")
plt.savefig(loss_plot_path, bbox_inches="tight", dpi=100, transparent=True)
plt.show()

### Print evaluation results

In [None]:
# Evaluate on the test data; the first returned value is printed as the loss
# and the remaining values are labelled with the names from `metrics`, in order
score = model.evaluate(dataset_manager.get_test_data())

for position, value in enumerate(score):
    label = "loss" if position == 0 else metrics[position - 1].name
    print(f"{label}: {value}")