<a href="https://colab.research.google.com/github/Sanj-R/Music-Genre-Prediction-with-kNNs-and-Random-Forests/blob/main/all_code.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**CODE FOR DATA EXPLORATION SECTION**

In [None]:
import pandas as pd
import numpy as np

In [None]:
# Load the training metadata table; later cells read its 'Genre' column by row position
# (row i is assumed to describe train{i:03}.wav -- TODO confirm against train.csv).
all_train_names_labels = pd.read_csv('train.csv')

In [None]:
# Genre names; a genre's position in this list is used as its numeric class label
# (genres.index(...) when building labels, np.array(genres)[...] when decoding predictions).
genres = ['blues', 'classical', 'country', 'disco', 'hiphop', 'jazz', 'metal', 'pop', 'reggae', 'rock']

In [None]:
# Pairs of training clip indices treated as duplicates of each other.
# Cells that exclude duplicates skip the index stored in the second column.
duplicates = np.array([
  (6, 179), (20, 785), (49, 431), (50, 288),
  (60, 437), (82, 390), (113, 261), (116, 165),
  (123, 395), (270, 312), (293, 737), (343, 584),
])

In [None]:
# Build the ground-truth label vector: the genre of every kept training clip, encoded as its
# position in `genres` ('blues' -> 0, 'classical' -> 1, ...).
# Clips whose index appears in the second column of `duplicates` are left out.

kept_rows = [i for i in range(800) if i not in duplicates[:,1]]
genre_column = all_train_names_labels['Genre']
all_train_numer_labels = np.array([genres.index(genre_column.iloc[i]) for i in kept_rows])
print(all_train_numer_labels.shape)
print(all_train_numer_labels.dtype)
print(all_train_numer_labels)

In [None]:
# Count how many kept training clips belong to each numeric genre label

nums_labels = {}
for label in all_train_numer_labels:
  if label in nums_labels:
    nums_labels[label] += 1
  else:
    nums_labels[label] = 1
print(nums_labels)

In [None]:
import scipy

In [None]:
# Tally the sample rate, array shape, and sample dtype of every non-duplicate training clip

# A bare `import scipy` does not load scipy.io.wavfile on every SciPy version,
# so the submodule is imported explicitly before it is used.
import scipy.io.wavfile

sample_rates = {}  # sample rate -> number of clips
lengths = {}       # sample-array shape -> number of clips
types = {}         # sample dtype -> number of clips

for i in range(800):
  if i in duplicates[:,1]:
    continue
  # wavfile.read returns (sample rate, sample array)
  data = scipy.io.wavfile.read(f'train{i:03}.wav')
  sample_rates[data[0]] = sample_rates.get(data[0], 0) + 1
  lengths[data[1].shape] = lengths.get(data[1].shape, 0) + 1
  types[data[1].dtype] = types.get(data[1].dtype, 0) + 1

print(sample_rates)
print(lengths)
print(types)
print()
# Distinct clip shapes in ascending order, to read off the shortest and longest clip
print(sorted(list(lengths)))

In [None]:
# Durations in seconds (sample count / sample rate) for clips of 660000 and 675808 samples,
# followed by the number of samples in exactly 30 seconds.
# Per the original note these are the min and max durations, and 9 of the samples are shorter than 30 s.

sample_rate = 22050
for sample_count in (660000, 675808):
  print(sample_count/sample_rate)
print(sample_rate*30)

In [None]:
# Bring every non-duplicate training clip to exactly 661500 samples (22050 * 30):
# shorter clips are zero-padded at the end, longer clips are cut off.

import scipy

clip_rows = []   # one single-row array per kept clip
lengths = set()  # distinct row shapes seen; should end up with a single entry

for i in range(800):
  if i in duplicates[:,1]:
    continue
  data = scipy.io.wavfile.read(f'train{i:03}.wav')[1]
  shortfall = 661500 - data.size
  if shortfall > 0:
    data = np.pad(data, (0,shortfall))
  elif shortfall < 0:
    data = data[:661500]
  data = data.reshape((1,-1))
  lengths.add(data.shape)
  clip_rows.append(data)

# Stack the rows into one (clips x samples) matrix
all_train_raw_data_matrix = np.concatenate(clip_rows, axis=0)
print(lengths)
print(all_train_raw_data_matrix.shape)

**CODE FOR FEATURE EXTRACTION SECTION**

CODE FOR RAW DATA SUBSECTION

In [None]:
# Stratified 75/25 split of the labelled clips into training and validation parts
# (per the original note: overall 60% train, 20% val, 10% public test, 10% private test)

import sklearn.model_selection

train_raw_data_matrix, val_raw_data_matrix, train_labels, val_labels = sklearn.model_selection.train_test_split(
  all_train_raw_data_matrix,
  all_train_numer_labels,
  train_size=0.75,
  random_state=123456,
  stratify=all_train_numer_labels,
)

for part in (train_raw_data_matrix, val_raw_data_matrix, train_labels, val_labels):
  print(part.shape)
print()

# Per-class counts of each part, to check the stratification
for part_labels in (train_labels, val_labels):
  nums_labels = {}
  for label in part_labels:
    nums_labels[label] = nums_labels.get(label, 0) + 1
  print(nums_labels)

In [None]:
# Store color array for plotting later: colors[k] is the marker color used for numeric genre label k
# (the scatter plots index this array directly with the label vectors).

import sklearn.decomposition
import matplotlib.pyplot as plt

colors = np.array(['red', 'blue', 'green', 'yellow', 'black', 'gray', 'magenta', 'pink', 'orange', 'brown'])

In [None]:
# Convert raw data into 2-component PCA projection
# random_state is fixed because, for a matrix this large, PCA's default 'auto' solver can pick the
# randomized SVD, whose output otherwise varies between runs. The outlier thresholds and indices used
# in the following cells were read off this projection, so it needs to be reproducible.

pca = sklearn.decomposition.PCA(n_components=2, random_state=123456)
pca_train_raw_data_matrix = pca.fit_transform(train_raw_data_matrix)

In [None]:
# Scatter plot of the 2-component PCA projection of the (centered) raw training data, colored by genre

first_pc = pca_train_raw_data_matrix[:,0]
second_pc = pca_train_raw_data_matrix[:,1]
plt.scatter(first_pc, second_pc, c=colors[train_labels])
plt.show()

In [None]:
# List the row index of every projected point lying beyond the outlier thresholds
# (second component above 8e6 or first component above 1e7), with its coordinates
# rescaled so they are easy to compare against the plot axes.

rescale = np.array([1/1e7, 1/1e6])
for i, point in enumerate(pca_train_raw_data_matrix):
  first_pc, second_pc = point
  if second_pc > 8*1e6 or first_pc > 1*1e7:
    print(i, np.multiply(point, rescale))

In [None]:
# Re-plot the same PCA projection with the two outlying training rows masked out

# Boolean keep-mask over the training rows; rows 205 and 242 are the hand-picked outliers
trim_boolean = np.array([True]*train_labels.size)
trim_boolean[[205, 242]] = False

kept_points = pca_train_raw_data_matrix[trim_boolean]
kept_labels = train_labels[trim_boolean]
plt.scatter(kept_points[:,0], kept_points[:,1], c=colors[kept_labels])
plt.show()

In [None]:
# Plot the PCA (2 components) Projection of Centered Non-Outlier Raw Data
# The PCA is refitted on the rows kept by trim_boolean. random_state is fixed because the default
# 'auto' solver can pick the randomized SVD for a matrix this large, and the next cells hard-code
# outlier indices taken from this projection.

pca = sklearn.decomposition.PCA(n_components=2, random_state=123456)
pca_trim_train_raw_data_matrix = pca.fit_transform(train_raw_data_matrix[trim_boolean])

plt.scatter(pca_trim_train_raw_data_matrix[:,0], pca_trim_train_raw_data_matrix[:,1], c=colors[train_labels[trim_boolean]])

In [None]:
# List the row index (within the trimmed data) of every point whose first or second
# component exceeds 6e6, with its coordinates expressed in units of 1e6.

rescale = np.array([1/1e6, 1/1e6])
for i, point in enumerate(pca_trim_train_raw_data_matrix):
  if point[1] > 6*1e6 or point[0] > 6*1e6:
    print(i, np.multiply(point, rescale))

In [None]:
# Re-plot the trimmed PCA projection with two further outlying rows masked out

# Keep-mask over the already-trimmed rows; 186 and 489 index into the trimmed data, not the full training set
trim_boolean2 = np.array([True]*train_labels[trim_boolean].size)
trim_boolean2[[186, 489]] = False

kept_points = pca_trim_train_raw_data_matrix[trim_boolean2]
kept_labels = train_labels[trim_boolean][trim_boolean2]
plt.scatter(kept_points[:,0], kept_points[:,1], c=colors[kept_labels])

CODE FOR 3-SECOND AUDIO CLIPS SUBSECTION

In [None]:
# Cut every 30-second clip into ten consecutive 3-second clips.
# Rows are stacked piece by piece (all first pieces, then all second pieces, ...),
# so the label vector is simply repeated ten times.

n_samples = all_train_raw_data_matrix.shape[1]
clip_len = int(n_samples/10)
ts_clips = [all_train_raw_data_matrix[:,start:start+clip_len] for start in range(0, n_samples, clip_len)]
ts_all_train_raw_data_matrix = np.concatenate(ts_clips, axis=0)
ts_all_train_numer_labels = np.concatenate([all_train_numer_labels for _ in range(10)], axis=0).copy()

print(ts_all_train_raw_data_matrix.shape)
print(ts_all_train_numer_labels.shape)

In [None]:
# Stratified 75/25 split of the 3-second clips into training and validation parts
# (per the original note: ignoring the 10x splits, overall 60% train, 20% val, 10% public test, 10% private test)

import sklearn.model_selection

ts_train_raw_data_matrix, ts_val_raw_data_matrix, ts_train_labels, ts_val_labels = sklearn.model_selection.train_test_split(
  ts_all_train_raw_data_matrix,
  ts_all_train_numer_labels,
  train_size=0.75,
  random_state=123456,
  stratify=ts_all_train_numer_labels,
)

for part in (ts_train_raw_data_matrix, ts_val_raw_data_matrix, ts_train_labels, ts_val_labels):
  print(part.shape)
print()

# Per-class counts of each part, to check the stratification
for part_labels in (ts_train_labels, ts_val_labels):
  nums_labels = {}
  for label in part_labels:
    nums_labels[label] = nums_labels.get(label, 0) + 1
  print(nums_labels)

In [None]:
# Write the 3-second train/validation matrices and labels to a .mat file
# (per the original note, this saved copy was never actually needed)

import scipy.io

mdic = dict(
  ts_train_raw_data_matrix=ts_train_raw_data_matrix,
  ts_val_raw_data_matrix=ts_val_raw_data_matrix,
  ts_train_labels=ts_train_labels,
  ts_val_labels=ts_val_labels,
)

scipy.io.savemat('3sec_train_val_data.mat', mdic)

In [None]:
# Plot the PCA (2 components) Projection of Centered Raw 3-Second Clips

import sklearn.decomposition
import matplotlib.pyplot as plt

colors = np.array(['red', 'blue', 'green', 'yellow', 'black', 'gray', 'magenta', 'pink', 'orange', 'brown'])
# random_state is fixed so the projection is reproducible: for a matrix this large the default
# 'auto' solver can pick the randomized SVD, which is otherwise seeded differently on every run.
pca = sklearn.decomposition.PCA(n_components=2, random_state=123456)
# NOTE: this reuses (and overwrites) the name that held the 30-second projection earlier in the notebook.
pca_train_raw_data_matrix = pca.fit_transform(ts_train_raw_data_matrix)
plt.scatter(pca_train_raw_data_matrix[:,0], pca_train_raw_data_matrix[:,1], c=colors[ts_train_labels])
plt.show()

CODE FOR MFCCs of 5-Second Audio Clips SUBSECTION

In [None]:
# Rebuild the ground-truth label vector ('blues' -> 0, 'classical' -> 1, ...) for all 800 training
# clips. Unlike the earlier version, the duplicate clips are kept, and the earlier vector is overwritten.

genre_column = all_train_names_labels['Genre']
all_train_numer_labels = np.array([genres.index(genre_column.iloc[i]) for i in range(800)])
print(all_train_numer_labels.shape)
print(all_train_numer_labels.dtype)
print(all_train_numer_labels)

In [None]:
# Bring all 800 training clips (duplicates included) to exactly 661500 samples (22050 * 30):
# shorter clips are zero-padded at the end, longer clips are cut off.
# This overwrites the duplicate-free matrix built earlier.

import scipy

clip_rows = []   # one single-row array per clip
lengths = set()  # distinct row shapes seen; should end up with a single entry

for i in range(800):
  data = scipy.io.wavfile.read(f'train{i:03}.wav')[1]
  shortfall = 661500 - data.size
  if shortfall > 0:
    data = np.pad(data, (0,shortfall))
  elif shortfall < 0:
    data = data[:661500]
  data = data.reshape((1,-1))
  lengths.add(data.shape)
  clip_rows.append(data)

all_train_raw_data_matrix = np.concatenate(clip_rows, axis=0)
print(lengths)
print(all_train_raw_data_matrix.shape)

In [None]:
# Allows splitting 30 second songs into (30/shrink_factor) second clips

def split_to_smaller_data(X, y, shrink_factor):
  """Cut every row of X into `shrink_factor` consecutive pieces of equal length.

  Args:
    X: 2-D array with one recording per row.
    y: array of per-row labels; its first axis lines up with the rows of X.
    shrink_factor: positive integer number of pieces to cut each row into.

  Returns:
    A tuple (new_X, new_y). new_X has shape
    (shrink_factor * X.shape[0], X.shape[1] // shrink_factor) and is ordered piece by piece:
    the first piece of every row, then the second piece of every row, and so on.
    new_y is y repeated shrink_factor times, matching that row order.
    If X.shape[1] is not a multiple of shrink_factor, the leftover trailing columns are dropped.

  Raises:
    ValueError: if shrink_factor is smaller than 1 or larger than the number of columns of X.
  """
  if shrink_factor < 1:
    raise ValueError('shrink_factor must be at least 1')
  piece_len = X.shape[1] // shrink_factor
  if piece_len < 1:
    raise ValueError('shrink_factor must not exceed the number of columns of X')
  # Slice exactly shrink_factor pieces. Stepping a range over the columns (as before) produced an
  # extra, shorter piece whenever the width was not a multiple of shrink_factor, which made the
  # concatenation fail and left new_y out of step with new_X.
  pieces = [X[:, k*piece_len:(k+1)*piece_len] for k in range(shrink_factor)]
  new_X = np.concatenate(pieces, axis=0)
  new_y = np.concatenate(shrink_factor*[y], axis=0)
  return new_X, new_y

In [None]:
# Check shapes if data was split to 5 second clips, to make sure it's working correctly
# The split is computed once and unpacked: each call copies the whole data set, so calling it
# separately for every print doubled the time and peak memory of this check.

check_X, check_y = split_to_smaller_data(all_train_raw_data_matrix, all_train_numer_labels, 6)
print(check_X.shape)
print(check_y.shape)
# Release the temporary copy straight away; it is only needed for the shape check
del check_X, check_y

In [None]:
# Save 5-second training data ('X': clips, 'y': labels) to 5sec_train.mat.
# Later cells load this file back to compute and save the MFCC features.

import scipy.io

# Split once and reuse both outputs; every call copies the whole data set
five_sec_X, five_sec_y = split_to_smaller_data(all_train_raw_data_matrix, all_train_numer_labels, 6)
mdic = {'X':five_sec_X,
        'y':five_sec_y}

scipy.io.savemat('5sec_train.mat', mdic)

In [None]:
# Read the saved 5-second clips back from disk as a float matrix

import scipy.io

# Only the 'X' entry is kept; the loaded dictionary itself is not retained
all_X = scipy.io.loadmat('5sec_train.mat')['X'].astype(float)

print(all_X.shape)

In [None]:
# Compute one MFCC feature row per 5-second clip: the MFCC matrix of the clip averaged over time
# (per the original note, 20 features for each datapoint)

import librosa.feature
import numpy as np

full_mfcc_X = []
for cnt, data_point in enumerate(all_X, start=1):
  clip_mfcc = librosa.feature.mfcc(y=data_point)
  full_mfcc_X.append(np.mean(clip_mfcc, axis=1).reshape((1,-1)))
  # Progress indicator: report every 100 clips
  if cnt % 100 == 0:
    print(cnt)

In [None]:
# Convert MFCC data into ndarray, and check shape of ndarray
# np.vstack is used so the cell can safely be run more than once: on the list of (1, n_features)
# rows it stacks them into a 2-D matrix, and on the already-stacked matrix it returns the same
# 2-D matrix again. np.concatenate(..., axis=0) flattened the matrix to 1-D on a second run.

full_mfcc_X = np.vstack(full_mfcc_X)

print()
print(full_mfcc_X.shape)

In [None]:
# Save MFCC data ('X': MFCC features, 'y': labels re-read from the 5-second data file)

# Imported before first use so the cell does not depend on an earlier cell having imported scipy.io
import scipy.io

mdic = {'X':full_mfcc_X,
        'y':scipy.io.loadmat('5sec_train.mat')['y']}

scipy.io.savemat('mfcc_5sec_train.mat', mdic)

In [None]:
# Load the saved MFCC features, standardize each feature (zero mean, unit variance), and shuffle
# the rows together with their labels using a fixed seed

import scipy.io
import sklearn.preprocessing
import sklearn.utils

mfcc_f_sec_data = scipy.io.loadmat('mfcc_5sec_train.mat')

# The fitted scaler is kept so the test features can be transformed identically later
scaler = sklearn.preprocessing.StandardScaler()
all_mfcc_X, all_y = sklearn.utils.shuffle(
  scaler.fit_transform(mfcc_f_sec_data['X'].astype(float)),
  mfcc_f_sec_data['y'].reshape(-1,),  # flatten the stored labels to 1-D
  random_state=123456,
)

In [None]:
# Scatter plot of the 2-component PCA projection of the standardized 5-second MFCC features, colored by genre

import sklearn.decomposition
import numpy as np
import matplotlib.pyplot as plt

colors = np.array(['red', 'blue', 'green', 'yellow', 'black', 'gray', 'magenta', 'pink', 'orange', 'brown'])

pca_2 = sklearn.decomposition.PCA(n_components=2)
pca_2_all_mfcc_X = pca_2.fit_transform(all_mfcc_X)

# Transposing lets the two component columns be unpacked directly
mfcc_pc_first, mfcc_pc_second = pca_2_all_mfcc_X.T
plt.scatter(mfcc_pc_first, mfcc_pc_second, c=colors[all_y])
plt.show()

**CODE FOR K-Nearest Neighbors SUBSECTION**


In [None]:
# Sweep over (number of PCA components, number of neighbors) and print the cross-validation
# scores of a KNN classifier on the MFCC 5-second data for every combination
# (5 folds per the original note; cross_validate is called with its default splitting).
# NOTE(review): the rows are 5-second pieces cut from whole clips and shuffled before this cell,
# and no `groups` are passed to cross_validate, so pieces of one recording can presumably fall into
# both the training and the test fold -- confirm whether the scores are optimistic because of this.

import sklearn.model_selection
import sklearn.neighbors

neighbor_counts = [1,2,5,10,25,50,100,250,500,1000,2500,3800]

for n_components in (2,3,5,7,9,11,14,17,20):
  pca = sklearn.decomposition.PCA(n_components=n_components)
  pca_all_mfcc_X = pca.fit_transform(all_mfcc_X)
  for n_neighbors in neighbor_counts:
    knn = sklearn.neighbors.KNeighborsClassifier(n_neighbors=n_neighbors)
    fold_scores = sklearn.model_selection.cross_validate(knn, pca_all_mfcc_X, all_y)['test_score']
    print(n_components, n_neighbors, fold_scores, np.mean(fold_scores))
  print()

In [None]:
# Reload the saved MFCC 5-second training data, then standardize and shuffle it with the fixed seed
# (this rebuilds `scaler`, `all_mfcc_X` and `all_y` for the cells that follow)

import scipy.io
import sklearn.preprocessing
import sklearn.utils

mfcc_f_sec_data = scipy.io.loadmat('mfcc_5sec_train.mat')

raw_mfcc_X = mfcc_f_sec_data['X'].astype(float)
flat_y = mfcc_f_sec_data['y'].reshape(-1,)

scaler = sklearn.preprocessing.StandardScaler()
scaled_mfcc_X = scaler.fit_transform(raw_mfcc_X)

all_mfcc_X, all_y = sklearn.utils.shuffle(scaled_mfcc_X, flat_y, random_state=123456)

In [None]:
# Build the 5-second MFCC feature matrix for the 200 test clips.
# The predictions themselves are made in a later cell, after the KNN has been fitted. This cell used
# to end with an rf.predict(...) call, but `rf` is only created in the Random Forest section, so a
# fresh top-to-bottom run stopped here with a NameError.

import librosa.feature
import numpy as np

test_data_matrix = []

for i in range(200):
  data = scipy.io.wavfile.read(f'test{i:03}.wav')[1]
  # Same length normalisation as the training clips: zero-pad short clips, truncate long ones
  if data.size < 661500:
    data = np.pad(data, (0,661500-data.size))
  elif data.size > 661500:
    data = data[:661500]
  data = data.reshape((1,-1))
  test_data_matrix.append(data)

test_data_matrix = np.concatenate(test_data_matrix, axis=0)
print(test_data_matrix.shape)

# Cut every test clip into six pieces; the zero vector is a placeholder for the labels argument
# and the returned labels are discarded
test_data_matrix = split_to_smaller_data(test_data_matrix, np.zeros((test_data_matrix.shape[0],)), 6)[0]

test_data_matrix = test_data_matrix.astype(float)

# One MFCC feature row per piece: the piece's MFCC matrix averaged over time
test_full_mfcc_X = []
cnt = 0
for data_point in test_data_matrix:
  a = np.mean(librosa.feature.mfcc(y=data_point), axis=1).reshape((1,-1))
  test_full_mfcc_X.append(a)
  cnt+=1
  if cnt % 100 == 0:
    print(cnt)

test_full_mfcc_X = np.concatenate(test_full_mfcc_X, axis=0)

print()
print(test_full_mfcc_X.shape)

In [None]:
# Fit the final KNN (a single nearest neighbor, the setting chosen as optimal) on all standardized
# MFCC training data; the cell's output is its accuracy on that same training data

import sklearn.neighbors

knn = sklearn.neighbors.KNeighborsClassifier(n_neighbors=1).fit(all_mfcc_X, all_y)
knn.score(all_mfcc_X, all_y)

In [None]:
# Predict a genre for every 5-second test piece with the KNN, after applying the training scaler.
# Printed as 6 rows: one row per piece position, one column per test clip.

scaled_test_mfcc = scaler.transform(test_full_mfcc_X)
small_test_predictions = knn.predict(scaled_test_mfcc).reshape((-1,))
print(small_test_predictions.reshape((6,-1)))

In [None]:
# Turn the per-piece predictions into one genre per test clip (majority vote over its six pieces)
# and build the submission table with columns 'ID' and 'Genre'

import scipy.stats
import pandas as pd

votes_per_song = small_test_predictions.reshape((6,-1))
test_pred = scipy.stats.mode(votes_per_song, axis=0).mode

file_ids = np.array([f'test{i:03}.wav' for i in range(200)]).reshape((-1,1))
genre_names = np.array(genres)[test_pred].reshape((-1,1))
test_predictions_1 = pd.DataFrame(np.concatenate([file_ids, genre_names], axis=1), columns=['ID', 'Genre'])

In [None]:
# Output predictions as csv
# Writes the KNN prediction table (columns 'ID' and 'Genre') without the DataFrame index column.

test_predictions_1.to_csv('knn_mfcc_test_predictions_1.csv', index=False)

**CODE FOR RANDOM FOREST SUBSECTION**

In [None]:
# Try Random Forest on MFCC 5-second data with default hyperparameters, to check if random forest is a viable method
# Scored on all_mfcc_X, the standardized MFCC matrix the final forest is fitted on. The cell used to
# read pca_all_mfcc_X, which only holds whatever the last iteration of the KNN sweep left behind.

import sklearn.ensemble

rf_cv = sklearn.model_selection.cross_validate(sklearn.ensemble.RandomForestClassifier(), all_mfcc_X, all_y)
print(rf_cv['test_score'], np.mean(rf_cv['test_score']))

In [None]:
# Do Random Forest with different hyperparameter combinations
# (grid search over the number of trees and the maximum tree depth; None means unlimited depth)
# Searched on all_mfcc_X, the standardized MFCC matrix the final forest is fitted on, rather than on
# pca_all_mfcc_X, a leftover from the last iteration of the KNN sweep.

hyperparam_choices = {
'n_estimators' : [25, 50, 100, 250],
'max_depth' : [3, 5, 10, 15, None],
}

rf_grid_search = sklearn.model_selection.GridSearchCV(sklearn.ensemble.RandomForestClassifier(), param_grid=hyperparam_choices)
rf_grid_search.fit(all_mfcc_X, all_y)
print(rf_grid_search.best_estimator_)
print(rf_grid_search.cv_results_['mean_test_score'])

In [None]:
# Do Random Forest with 10k decision trees (WARNING, THIS TOOK ME 20-30 MIN TO RUN)
# Scored on all_mfcc_X, the standardized MFCC matrix the final forest is fitted on, rather than on
# pca_all_mfcc_X, a leftover from the last iteration of the KNN sweep.

hyperparam_choices = {
'n_estimators' : [10000],
}

rf_grid_search = sklearn.model_selection.GridSearchCV(sklearn.ensemble.RandomForestClassifier(), param_grid=hyperparam_choices)
rf_grid_search.fit(all_mfcc_X, all_y)
print(rf_grid_search.best_estimator_)
print(rf_grid_search.cv_results_['mean_test_score'])

In [None]:
# Reload the saved MFCC 5-second training data, standardize it, and shuffle it with the fixed seed,
# so that `scaler`, `all_mfcc_X` and `all_y` are freshly defined for the Random Forest cells below

import scipy.io
import sklearn.preprocessing
import sklearn.utils

mfcc_f_sec_data = scipy.io.loadmat('mfcc_5sec_train.mat')
features_unscaled = mfcc_f_sec_data['X'].astype(float)
labels_flat = mfcc_f_sec_data['y'].reshape(-1,)

scaler = sklearn.preprocessing.StandardScaler()
features_scaled = scaler.fit_transform(features_unscaled)

all_mfcc_X, all_y = sklearn.utils.shuffle(features_scaled, labels_flat, random_state=123456)

In [None]:
# Train the Random Forest model with the settings which we found to work before
# random_state makes the fitted forest (and so the submitted predictions) reproducible, using the
# seed already used for the splits and shuffles in this notebook. n_jobs=-1 builds the 10000 trees
# on all available cores and does not affect the result.

import sklearn.ensemble

rf = sklearn.ensemble.RandomForestClassifier(n_estimators=10000, random_state=123456, n_jobs=-1)
rf.fit(all_mfcc_X, all_y)
# Accuracy on the training data itself, shown as the cell output
rf.score(all_mfcc_X, all_y)

In [None]:
# Build the 5-second MFCC features of the 200 test clips and predict a genre for every piece
# with the Random Forest

import librosa.feature
import numpy as np

# Load each test clip and bring it to exactly 661500 samples (zero-pad short clips, truncate long ones)
test_clip_rows = []
for i in range(200):
  data = scipy.io.wavfile.read(f'test{i:03}.wav')[1]
  shortfall = 661500 - data.size
  if shortfall > 0:
    data = np.pad(data, (0,shortfall))
  elif shortfall < 0:
    data = data[:661500]
  test_clip_rows.append(data.reshape((1,-1)))

test_data_matrix = np.concatenate(test_clip_rows, axis=0)
print(test_data_matrix.shape)

# Cut every clip into six pieces; the zero vector only fills the labels argument and its output is dropped
placeholder_labels = np.zeros((test_data_matrix.shape[0],))
test_data_matrix = split_to_smaller_data(test_data_matrix, placeholder_labels, 6)[0]
test_data_matrix = test_data_matrix.astype(float)

# One MFCC feature row per piece: the piece's MFCC matrix averaged over time
test_mfcc_rows = []
for cnt, data_point in enumerate(test_data_matrix, start=1):
  piece_mfcc = librosa.feature.mfcc(y=data_point)
  test_mfcc_rows.append(np.mean(piece_mfcc, axis=1).reshape((1,-1)))
  if cnt % 100 == 0:
    print(cnt)

test_full_mfcc_X = np.concatenate(test_mfcc_rows, axis=0)

print()
print(test_full_mfcc_X.shape)

# Apply the training scaler, predict, and print as 6 rows (one per piece position, one column per clip)
scaled_test_mfcc = scaler.transform(test_full_mfcc_X)
small_test_predictions = rf.predict(scaled_test_mfcc).reshape((-1,))
print(small_test_predictions.reshape((6,-1)))

In [None]:
# Reduce the per-piece Random Forest predictions to one genre per test clip (most frequent
# prediction among its six pieces) and assemble the table with columns 'ID' and 'Genre'

import scipy.stats
import pandas as pd

piece_votes = small_test_predictions.reshape((6,-1))
test_pred = scipy.stats.mode(piece_votes, axis=0).mode

id_column = np.array([f'test{i:03}.wav' for i in range(200)]).reshape((-1,1))
genre_column_out = np.array(genres)[test_pred].reshape((-1,1))
table_values = np.concatenate([id_column, genre_column_out], axis=1)
test_predictions_1 = pd.DataFrame(table_values, columns=['ID', 'Genre'])

In [None]:
# Output predictions to csv
# Writes the Random Forest prediction table (columns 'ID' and 'Genre') without the DataFrame index column.

test_predictions_1.to_csv('rf_test_predictions_2.csv', index=False)