In [1]:
import ast
import pandas as pd
import numpy as np
import sklearn
import joblib
from sklearn.model_selection import train_test_split
from sklearn.neural_network import MLPRegressor
from sklearn.model_selection import cross_validate
from sklearn.model_selection import RepeatedKFold, GridSearchCV
from sklearn import preprocessing, decomposition, manifold
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis

#plotting
import matplotlib
import matplotlib.pyplot as plt

#3d plotting
from mpl_toolkits.mplot3d import Axes3D
%matplotlib qt
#plt.ion()

# Gather data (.csv) produced 

In [2]:
#read the data as pandas dataframe
path_to_features = "./datasets/bach_features.csv"
path_to_ratings = "./datasets/bach_ratings.csv"
data = pd.read_csv(path_to_features)
rating = pd.read_csv(path_to_ratings)

#Convert dataset to np array
features = data[['track_name','duration_ms','energy','danceability',
                 'loudness','valence','tempo','time_signature','pitch_avg', 
                 'timbre_avg','key_change_percentage','mode_avg']].to_numpy()

target = rating['rating']

# Some dataset reformatting

In [3]:
cleaned_features = features

# Substitute titles with integers
for i in range(cleaned_features.shape[0]):
    cleaned_features[i][0] = i

#convert the timbre and pitch vectors, which are actually strings in the dataset imported, to lists.
for row in range(cleaned_features.shape[0]):
    for col in range(cleaned_features.shape[1]):
        if type(cleaned_features[row][col]) == str:
            cleaned_features[row][col] = ast.literal_eval(cleaned_features[row][col])

In [4]:
#The do a little "hack" to unpack the lists within the feature array,
#and subesequently extent the coloumn size of the feature array.
def flatten(x):
    for item in x:
        try:
            yield from flatten(item) #if x has a member (item) it means its a a list or array, therefore we feed the item back into the function.
        except TypeError: #so if x has no members to iterate on (i.e its a float or integer), we return it (yield)
            yield item

#the list features are actually imported as string, so we need to convert them back
array = np.empty([])
for i in range(cleaned_features.shape[0]):
    row = list(flatten(cleaned_features[i]))
    row = [round(elem, 2) for elem in row]
    row = np.array(row)
    if i == 0:
        array = row
    else:
        array = np.vstack((array, row))

cleaned_features = array

In [5]:
print(cleaned_features[0])
print('**************')
print(cleaned_features.shape)
print(target.shape)
target

[ 0.00000e+00  2.19947e+05  2.00000e-02  1.70000e-01 -2.78800e+01
  7.00000e-02  1.35470e+02  4.00000e+00  3.40000e-01  2.40000e-01
  3.20000e-01  1.40000e-01  3.10000e-01  2.40000e-01  1.40000e-01
  2.90000e-01  1.30000e-01  3.90000e-01  2.60000e-01  1.40000e-01
  2.70900e+01 -2.55000e+00  4.36200e+01 -1.15400e+01  4.74600e+01
 -3.23900e+01  5.96000e+00  1.30000e+00  9.31000e+00  5.99000e+00
 -9.92000e+00 -8.05000e+00  9.00000e+01  6.00000e-01]
**************
(100, 34)
(100,)


0     7
1     8
2     7
3     8
4     8
     ..
95    3
96    3
97    4
98    4
99    3
Name: rating, Length: 100, dtype: int64

# Dimensionality reduction

## Find how many components you need

In [6]:
#Keep 99% of the variance from the original features

feat_variance = np.var(cleaned_features, axis=0).sum()
for i in range(cleaned_features.shape[1]):
        temp = np.var(cleaned_features[:,0:i+1], axis=0).sum()
        percentage = temp/feat_variance
        if percentage > 0.99:
            print("componenets needed: ", i+1)
            print("reached: ", percentage, "%")
            break

componenets needed:  2
reached:  0.9999996347104886 %


### LDA

In [7]:
lda = LinearDiscriminantAnalysis(n_components=2)
lda.fit(cleaned_features, target)
projected_features = lda.transform(cleaned_features)

#joblib_file = "dimred.pkl"
#joblib.dump(lda, joblib_file)

print(projected_features.shape)

(100, 2)


### Scaling the feature data

In [8]:
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()
scaler.fit(projected_features)

#joblib_file = "scaler.pkl"
#joblib.dump(scaler, joblib_file)

projected_features = sklearn.preprocessing.scale(projected_features)

# Grid search

In [9]:
mlp = MLPRegressor()
crossval = RepeatedKFold(n_splits=5, n_repeats=20)
params = {
    'activation': ['identity', 'logistic', 'tanh', 'relu'],
    'hidden_layer_sizes': [[1], [2], [2, 3, 2], [2,3,4,3,2], [2, 2]],
    'max_iter': [2000, 5000, 10000, 20000]
}

gd_sr = GridSearchCV(estimator=mlp,
                     param_grid=params,
                     scoring='r2',
                     cv=crossval,  
                     n_jobs=-1) 

gd_sr.fit(projected_features, target)

print('best set of parameters', gd_sr.best_params_)
print('associated best score',gd_sr.best_score_)


best set of parameters {'activation': 'logistic', 'hidden_layer_sizes': [2, 2], 'max_iter': 10000}
associated best score 0.7492433337451194


# MLP instance

In [12]:
feat_train, feat_test, tar_train, tar_test = train_test_split(projected_features, target, test_size=0.2, random_state=5)
mlp = MLPRegressor(hidden_layer_sizes=(2, 2), max_iter=20000, activation='logistic', verbose=False)
mlp.fit(feat_train, tar_train)
tar_pred = mlp.predict(feat_test)

#joblib_file = "model.pkl"
#joblib.dump(mlp, joblib_file)


print('Mean squared error: %.4f'% sklearn.metrics.mean_squared_error(tar_test, tar_pred))
print('Mean absolute error: %.4f'% sklearn.metrics.mean_absolute_error(tar_test, tar_pred))
print('Median absolute error: %.4f'% sklearn.metrics.median_absolute_error(tar_test, tar_pred))
print('Coefficient of determination (r2 score): %.4f'% sklearn.metrics.r2_score(tar_test, tar_pred))
print('Explained variance score: %.4f'% sklearn.metrics.explained_variance_score(tar_test, tar_pred))
print('r2 score on individual targets',sklearn.metrics.r2_score(tar_test, tar_pred, multioutput='variance_weighted') )

Mean squared error: 1.3170
Mean absolute error: 0.9543
Median absolute error: 0.9196
Coefficient of determination (r2 score): 0.7631
Explained variance score: 0.7632
r2 score on individual targets 0.7631341161606593


# Evaluating the model

In [13]:
mlp = MLPRegressor(hidden_layer_sizes=(2, 2), max_iter=20000, activation='logistic', verbose=False)
cv = RepeatedKFold(n_splits=5, n_repeats=20)
scores = cross_validate(mlp, projected_features, target, cv=cv ,scoring=('neg_mean_squared_error', 'r2'),return_train_score=True)

print('MSE mean and variance', abs(np.mean(scores['test_neg_mean_squared_error'])),abs(np.var(scores['test_neg_mean_squared_error'])))
print('R2 mean and variance', np.mean(scores['train_r2']),np.var(scores['train_r2']),'\n')

#Compute the fitting degree
print('R2 mean difference from test and training set: ', np.mean(scores['train_r2']-scores['test_r2']))
print('MSE mean difference from test and training set: ', abs(np.mean(scores['train_neg_mean_squared_error']-scores['test_neg_mean_squared_error'])))

MSE mean and variance 1.2198372801317818 0.2512870008910244
R2 mean and variance 0.7951102268777557 0.0004786477643060768 

R2 mean difference from test and training set:  0.05399342385619638
MSE mean difference from test and training set:  0.20736568624666055


## Add various amounts of Noise to do further testing

In [15]:
#An ext_factor (extension factor) of 1 will add 100% more noisy training data, if 2 then 50% is added, etc..
ext_factor = 1
split = 5
size_of_feat_train = int((projected_features.shape[0]/split)*(split-1))
size_of_feat_train = round(size_of_feat_train/ext_factor)

r2_score_feat_ext = np.array([])
r2_score_train = np.array([])
mse_score_feat_ext = np.array([])
mse_score_train = np.array([])

#shuffle the rows of two np.arrays in unison
def unison_shuffle(x, y):
    if len(x) == len(y):
        p = np.random.permutation(len(x))
        return x[p], y[p]
    else:
        print('inputs to unison_shuffle have different lengths..')

mlp = MLPRegressor(hidden_layer_sizes=(2, 2), max_iter=20000, activation='logistic', verbose=False)
rkf = RepeatedKFold(n_splits=split, n_repeats=20)

for train_index, test_index in rkf.split(projected_features, target):
    #creating splits manually
    feat_train, feat_test = projected_features[train_index], projected_features[test_index]
    tar_train, tar_test = target[train_index], target[test_index]
    
    #extending by adding noise
    feat_train_ext = np.append(feat_train, feat_train[:size_of_feat_train,:] + np.random.normal(0, 0.1, (feat_train[:size_of_feat_train,:].shape)), axis=0)
    tar_train_ext = np.append(tar_train, tar_train[:size_of_feat_train], axis=0)
    
    #Shuffling the extended features and labels in unison
    feat_train_ext, tar_train_ext = unison_shuffle(feat_train_ext, tar_train_ext) 

    #training and testing
    mlp.fit(feat_train_ext, tar_train_ext)
    tar_predict_feat_ext = mlp.predict(feat_test)
    tar_predict_train = mlp.predict(feat_train)
    
    #Add the scores to arrays MAKE R2 or neg mean squared error
    r2_score_feat_ext = np.append(r2_score_feat_ext, sklearn.metrics.r2_score(tar_test, tar_predict_feat_ext))
    r2_score_train = np.append(r2_score_train, sklearn.metrics.r2_score(tar_train, tar_predict_train))
    mse_score_feat_ext = np.append(mse_score_feat_ext, sklearn.metrics.mean_squared_error(tar_test, tar_predict_feat_ext))
    mse_score_train = np.append(mse_score_train, sklearn.metrics.mean_squared_error(tar_train, tar_predict_train))


#plotting the loss curve over training iteration 
#plt.plot(mlp.loss_curve_)
#plt.xlabel('iteration')
#plt.ylabel('loss')
#plt.show()    

r2_collected_score_diff = np.array([])
for i in range(r2_score_feat_ext.size-1):
    r2_collected_score_diff = np.append(r2_collected_score_diff, abs(r2_score_feat_ext[i]-r2_score_train[i]))

print('R2 mean and variance of feat_ext (normal method): ', np.mean(r2_score_feat_ext), np.var(r2_score_feat_ext))
print('R2 mean and variance of train: ', np.mean(r2_score_train),np.var(r2_score_train))
print('R2 mean difference: ', abs(np.mean(r2_score_feat_ext)-np.mean(r2_score_train)))
print('Mean of all the differences between the R2 scores: ', np.mean(r2_collected_score_diff),'\n')

mse_collected_score_diff = np.array([])
for i in range(mse_score_feat_ext.size-1):
    mse_collected_score_diff = np.append(mse_collected_score_diff, abs(mse_score_feat_ext[i]-mse_score_train[i]))

print('MSE mean and variance of feat_ext (normal method): ', np.mean(mse_score_feat_ext), np.var(mse_score_feat_ext))
print('MSE mean and variance of train: ', np.mean(mse_score_train),np.var(mse_score_train))
print('MSE mean difference: ', abs(np.mean(mse_score_feat_ext)-np.mean(mse_score_train)))
print('Mean of all the differences between the MSE scores: ', np.mean(mse_collected_score_diff),'\n')


R2 mean and variance of feat_ext (normal method):  0.7444465404840893 0.01158962702661096
R2 mean and variance of train:  0.7941197249031795 0.0005552083690935509
R2 mean difference:  0.04967318441909019
Mean of all the differences between the R2 scores:  0.10016554108837107 

MSE mean and variance of feat_ext (normal method):  1.2224452029915458 0.3075329449669512
MSE mean and variance of train:  1.0181210607000801 0.014921189318534596
MSE mean difference:  0.20432414229146567
Mean of all the differences between the MSE scores:  0.5425875030647367 

