# Combined Dataset - Feed Forward Neural Network
## openSMILE GeMAPS Featureset

## Import relevant libraries

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torcheval.metrics import R2Score, MeanSquaredError

from sklearn.model_selection import train_test_split

import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

import math

import sys
sys.path.insert(1, '../../utils')
from paths import *

sys.path.insert(1, '../../models')
from feedforward_nn_combined import NeuralNetworkCombined

## Change featureset here!

In [None]:
featureset = NORMALISED_OPENSMILE_GEMAPS_FEATURES_CSV
featureset_path = f'{COMBINED_EXTRACTED_FEATURES_FOLDER}/scaled/{featureset}'

## Neural Network Training

### Import annotations dataset

In [None]:
df_annotations = pd.read_csv(f'{COMBINED_STATIC_ANNOTATIONS_CSV}')
df_annotations

### Import the featureset

This is where you should change between normalised and standardised, and untouched featuresets!

In [None]:
df_opensmile_gemaps = pd.read_csv(featureset_path)

df_opensmile_gemaps

#### Prepare dataframes for the neural network

Perform splitting of the dataframe into training and testing sets

In [None]:
features = df_opensmile_gemaps.drop('song_id', axis=1)
features

In [None]:
targets = df_annotations.drop('song_id', axis=1)
targets

Perform 80-20 train-test split

In [None]:
## FIXME: Rewrote the dataset as a PyTorch Dataset Class
class MusicEmoDataset(Dataset):

    def __init__(self, df_features, df_targets, train=True):
        self.features = df_features
        self.targets = df_targets

        # Train-Test Split (80/20) consisting of features and targets dataset
        X_train, X_test, y_train, y_test = train_test_split(self.features, self.targets, test_size=0.2, random_state=42)

        if train:
            self.X, self.y = torch.tensor(X_train.values, dtype=torch.float), torch.tensor(y_train.values, dtype=torch.float)
        else:
            self.X, self.y = torch.tensor(X_test.values, dtype=torch.float), torch.tensor(y_test.values, dtype=torch.float)
    
    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]
    
    def __len__(self):
        if len(self.X) == len(self.y):
            return len(self.X)
        else:
            raise Exception("Size of Features and Targets do not match.")        

In [None]:
## FIXME: Instantiate DEAM datasets for Train and Test split
MusicEmo_train = MusicEmoDataset(features, targets, train=True)
MusicEmo_test = MusicEmoDataset(features, targets, train=False)
print(len(MusicEmo_train))
print(len(MusicEmo_test))

In [None]:
## FIXME: Setup Dataloader for both splits
train_dataloader = DataLoader(MusicEmo_train, batch_size=64, shuffle=True) # batch_size=len(MusicEmo_train)
test_dataloader = DataLoader(MusicEmo_test, batch_size=32) # batch_size=len(MusicEmo_test)

In [None]:
print(len(test_dataloader))

Define neural network parameters and instantitate neural network

In [None]:
## FIXME: What is the purpose of defining this parameters if most of them are not being used? Removed Hidden size as a variable as it is implicitly defined via input_size

input_size = features.shape[1]
output_size = targets.shape[1]  # Output size for valence and arousal
num_epochs = 2000

print(input_size, output_size)

Set a random seed to ensure consistent initial weights of the neural network

In [None]:
# # Set the seed
# seed = 42
# torch.manual_seed(seed)

#### Training

Training loop

In [None]:
def train_one_loop(model, train_dataloader, optimiser, criterion, device):
    # Training Phase
    model.train()
    train_loss = 0.0
    train_count = 0
    
    for batch in train_dataloader:
        # Zero gradients
        optimiser.zero_grad()

        # Unpack batch
        inputs, targets = batch
        inputs_re = inputs.to(device)
        outputs_re = targets.to(device)

        # Forward pass
        preds = model(inputs_re)

        # Calculate loss
        loss = criterion(preds, outputs_re)
        
        # Add to total losses
        train_loss += loss.item() * outputs_re.shape[0]
        train_count += outputs_re.shape[0]

        # Backward pass
        loss.backward()

        # Update weights
        optimiser.step()
    
    # Compute total training loss (RMSE)
    train_loss /= train_count
    train_rmse = math.sqrt(train_loss)

    return train_rmse # Need to check whether to return optimiser, model and criterion

In [None]:
def test_one_loop(model, test_dataloader, device="cpu"):
    # Evaluation Phase
    model.eval()

    full_preds = []
    full_outputs = []
    num_features = 0
    with torch.no_grad():
        for batch in test_dataloader:
            # Unpack batch
            inputs, targets = batch
            inputs_re = inputs.to(device)
            outputs_re = targets.to(device)

            # Capture number of features
            num_features = inputs.shape[1]

            # Forward pass
            preds = model(inputs_re)
            
            # Append outputs together
            full_preds.append(preds)
            full_outputs.append(outputs_re)

    # Combine into a single tensor for preds and outputs
    preds_tensor = torch.cat(full_preds, 0)
    outputs_tensor = torch.cat(full_outputs, 0)
    
    # Calculate Mean Squared Error
    mse_metric = MeanSquaredError(multioutput="raw_values")
    mse_metric.update(preds_tensor, outputs_tensor)
    mse = mse_metric.compute()
    print("Test MSE Metric:", mse)

    average_rmse = torch.sqrt(torch.mean(mse))
    print("Test RMSE:", average_rmse.item())

    valence_rmse = torch.sqrt(mse[0])
    print("Valence RMSE:", valence_rmse.item())

    arousal_rmse = torch.sqrt(mse[1])
    print("Arousal RMSE:", arousal_rmse.item())

    # Calculate R^2 Score
    r2_metric = R2Score(multioutput="raw_values") # can be adjusted using multioutput and num_regressors (adjusted r^2 score)
    r2_metric.update(preds_tensor, outputs_tensor)
    r2_score = r2_metric.compute()
    print(f"Test R^2 Score (Valence, Arousal): {r2_score}")

    combined_r2_score = torch.mean(r2_score)
    print(f"Test R^2 Score (combined): {combined_r2_score.item():.4f}")
    
    # Calculate Adjusted R^2 Score
    r2_metric = R2Score(multioutput="raw_values", num_regressors=num_features) # can be adjusted using multioutput and num_regressors (adjusted r^2 score)
    r2_metric.update(preds_tensor, outputs_tensor)
    adjusted_r2_score = r2_metric.compute()
    print(f"Adjusted Test R^2 Score (Valence, Arousal): {adjusted_r2_score}")

    valence_adj_r2_score = adjusted_r2_score[0]
    arousal_adj_r2_score = adjusted_r2_score[1]

    combined_adj_r2_score = torch.mean(adjusted_r2_score)
    print(f"Adjusted Test R^2 Score (combined): {combined_adj_r2_score.item():.4f}")

    return average_rmse.item(), combined_adj_r2_score.item(), valence_adj_r2_score.item(), arousal_adj_r2_score.item(), preds_tensor, outputs_tensor

In [None]:
def train_model(model_kwargs, train_dataloader, test_dataloader, num_epochs, criterion=None, optimiser=None, device="cpu"):
    # Set the seed
    torch.manual_seed(seed=42)

    model = NeuralNetworkCombined(**model_kwargs)

    if optimiser is None:
        optimiser = torch.optim.Adam(model.parameters(), lr=1e-3, betas=(0.9, 0.999), eps=1e-8)
    
    if criterion is None:
        criterion = nn.MSELoss()
    
    rmse_list = []
    r2_scores_list = []
    adjusted_r2_scores_valence_list = []
    adjusted_r2_scores_arousal_list = []

    for epoch in range(num_epochs):    
        train_rmse = train_one_loop(model, train_dataloader, optimiser, criterion, device)
        test_rmse, adj_r2_score, valence_adj_r2_score, arousal_adj_r2_score, predictions, targets = test_one_loop(model, test_dataloader, device)
        
        rmse_list.append(test_rmse)
        r2_scores_list.append(adj_r2_score)
        adjusted_r2_scores_valence_list.append(valence_adj_r2_score)
        adjusted_r2_scores_arousal_list.append(arousal_adj_r2_score)

        print(f'Epoch {epoch + 1}/{num_epochs}, Training Loss (RMSE): {train_rmse:.4f}, Test Loss (RMSE): {test_rmse:.4f}, Adjusted Test R^2 Score: {adj_r2_score:.4f}', end="\n\n")

    print("Training completed.")
    return model, rmse_list, r2_scores_list, adjusted_r2_scores_valence_list, adjusted_r2_scores_arousal_list, predictions, targets

In [None]:
model_dict = {"input_size": input_size, "output_size":output_size, "dropout_prob":0.01}

In [None]:
trained_model, rmse_list, r2_scores_list, adjusted_r2_scores_valence_list, adjusted_r2_scores_arousal_list, predictions, targets = train_model(model_dict, train_dataloader, test_dataloader, num_epochs)

## Neural Network Testing

Generating scores

In [None]:
test_rmse, adj_r2_score, valence_adj_r2_score, arousal_adj_r2_score, preds_tensor, outputs_tensor = test_one_loop(trained_model, test_dataloader)

## Analyse relationship between epochs and r^2 score

### Plot the graph to visualise the relationship the evaluation metrics

In [None]:
num_epochs_list = [i for i in range(num_epochs)]

#### RMSE vs. num_epochs

In [None]:
plt.plot(num_epochs_list, rmse_list, color='b', linestyle='-')
plt.title('num_epochs vs. Test RMSE')
plt.xlabel('num_epochs')
plt.ylabel('Tets RMSE') 
plt.grid(True)
plt.show() 

In [None]:
min_rmse = min(rmse_list)
corresponding_r2_score = r2_scores_list[rmse_list.index(min_rmse)]
corresponding_num_epochs = num_epochs_list[rmse_list.index(min_rmse)] + 1 

print(f'Min RMSE score: {min_rmse}')
print(f'Corresponding R^2 SCore: {corresponding_r2_score}')
print(f'Corresponding num_epochs: {corresponding_num_epochs}')

Test Adjusted R^2 Score vs. num_epochs

In [None]:
plt.plot(num_epochs_list, r2_scores_list, color='b', linestyle='-')
plt.title('num_epochs vs. Test Adjusted R^2 Score')
plt.xlabel('num_epochs')
plt.ylabel('Test R^2 SCore') 
plt.grid(True)
plt.show() 

In [None]:
max_r2_score = max(r2_scores_list)
corresponding_rmse = rmse_list[r2_scores_list.index(max_r2_score)]
corresponding_num_epochs = num_epochs_list[r2_scores_list.index(max_r2_score)] + 1
optimal_num_epocs = corresponding_num_epochs

print(f'Max R^2 score: {max_r2_score}')
print(f'Corresponding RMSE: {corresponding_rmse}')
print(f'Corresponding num_epochs: {corresponding_num_epochs}')

Test Adjusted R^2 Score (Valence) vs. num_epochs

In [None]:
plt.plot(num_epochs_list, adjusted_r2_scores_valence_list, color='b', linestyle='-')
plt.title('num_epochs vs. Test Adjusted R^2 Score (Valence)')
plt.xlabel('num_epochs')
plt.ylabel('Test Adjusted R^2 Score (Valence)') 
plt.grid(True)
plt.show() 

In [None]:
max_r2_score_valence = max(adjusted_r2_scores_valence_list)
corresponding_rmse = rmse_list[adjusted_r2_scores_valence_list.index(max_r2_score_valence)]
corresponding_num_epochs = num_epochs_list[adjusted_r2_scores_valence_list.index(max_r2_score_valence)] + 1

print(f'Max R^2 score: {max_r2_score_valence}')
print(f'Corresponding RMSE: {corresponding_rmse}')
print(f'Corresponding num_epochs: {corresponding_num_epochs}')

#### Test Adjusted R^2 Score (Arousal) vs. num_epochs

In [None]:
plt.plot(num_epochs_list, adjusted_r2_scores_arousal_list, color='b', linestyle='-')
plt.title('num_epochs vs. Test Adjusted R^2 Score (Arousal)')
plt.xlabel('num_epochs')
plt.ylabel('Test Adjusted R^2 SCore (Arousal)') 
plt.grid(True)
plt.show() 

In [None]:
max_r2_score_arousal = max(adjusted_r2_scores_arousal_list)
corresponding_rmse = rmse_list[adjusted_r2_scores_arousal_list.index(max_r2_score_arousal)]
corresponding_num_epochs = num_epochs_list[adjusted_r2_scores_arousal_list.index(max_r2_score_arousal)] + 1

print(f'Max R^2 score: {max_r2_score_arousal}')
print(f'Corresponding RMSE: {corresponding_rmse}')
print(f'Corresponding num_epochs: {corresponding_num_epochs}')

Getting the model with the highest test adjusted R^2 score

In [None]:
trained_model, rmse_list, r2_scores_list, adjusted_r2_scores_valence_list, adjusted_r2_scores_arousal_list, predictions, targets = train_model(model_dict, train_dataloader, test_dataloader, optimal_num_epocs)

### Plot the predicted values vs true values

In [None]:
print(predictions)

In [None]:
print(targets)

In [None]:
# Create a figure and axis
fig, ax = plt.subplots(figsize=(8, 6))

# Plot the tensors from the predictions in orange
for tensor in predictions:
    predictions_scatter = ax.scatter(tensor[0], tensor[1], color='orange')

# Plot the tensors from the targets in green
for tensor in targets:
    targets_scatter = ax.scatter(tensor[0], tensor[1], color='green')

# Set the axis labels and title
ax.set_xlabel('Valence')
ax.set_ylabel('Arousal')
ax.set_title('Scatter Plot of Predictions vs. Targets')

# set the legend
legend_elements = [
    predictions_scatter,
    targets_scatter
]
ax.legend(legend_elements, ['Predictions', 'True Values'], loc='upper left')

# Show the plot
plt.show()

### Save the model weights

In [None]:
# torch.save(trained_model.state_dict(), f'../../models/opensmile_gemaps_normalised/combined_feedforward_nn_opensmile_gemaps_normalised.pt')