<a href="https://colab.research.google.com/github/Jpw306/Music-Genre-RNN/blob/main/Music_Genre_RNN_with_LSTM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Import Libraries

In [59]:
import numpy as np
import librosa
import matplotlib.pyplot as plt
import pandas as pd
import math
import random

Parse Music Data

In [60]:
# Import CSV File
df = pd.read_csv("train_data.csv")

# Seperate columns into features
chroma_mean = df["chroma_stft_mean"].to_numpy()
chroma_var = df["chroma_stft_var"].to_numpy()
rms_mean = df["rms_mean"].to_numpy()
rms_var = df["rms_var"].to_numpy()
scm = df["spectral_centroid_mean"].to_numpy()
scv = df["spectral_centroid_var"].to_numpy()
sbm = df["spectral_bandwidth_mean"].to_numpy()
sbv = df["spectral_bandwidth_var"].to_numpy()

# Convert label from string to int
map = {}
y = []
for label in df['label']:
  if label not in map:
    map.update({label: len(map)})
  y.append(map.get(label))

Normalize Music Data

In [61]:
features = [chroma_mean, chroma_var, rms_mean, rms_var, scm, scv, sbm, sbv]

# normalize between -1 and 1
def normalize_range(arr):
  arr_min = arr.min()
  arr_max = arr.max()
  for i in range(len(arr)):
    arr[i] = (2 * (arr[i] - arr_min) / (arr_max - arr_min)) -1

# normalize all data
for x in features:
  normalize_range(x)

Some Constants

In [62]:
input_size = 8 # number of features
output_size = len(df["label"].unique()) # number of genres to detect (subject to change)
hidden_size = math.floor(input_size * 2 / 3) + output_size # (2/3 input size) + output size

Some Functions

In [63]:
"""
Given an index of test data, return all features for that sample
"""
def get_features(sample_index):
  return_arr = [x[sample_index] for x in features]
  return np.array(return_arr).reshape(input_size, 1).T

Set up RNN / LSTM

In [64]:
"""WEIGHTS:"""

# calculate range for weights
# using xavier initialization
lower_x, upper_x = -1 / math.sqrt(input_size), 1 / math.sqrt(input_size) # input range for x_t
lower_h, upper_h = -1 / math.sqrt(hidden_size), 1 / math.sqrt(hidden_size) # input range for h_t-1

# function for h_size feedback neurons
def h_size_feedback():
  return np.random.uniform(lower_h, upper_h, (hidden_size, hidden_size))

def x_size_feedback():
  return np.random.uniform(lower_x, upper_x, (input_size, hidden_size))

# notation: (W)eight_(t)o(f)rom
W_fh = h_size_feedback() # weight into forget gate from h_t-1
W_fx = x_size_feedback() # weight into forget gate from x_t (current input)
W_ih = h_size_feedback() # weight into input gate from h_t-1 (For input gate)
W_ix = x_size_feedback() # weight into input gate from x_t (For input gate)
W_ch = h_size_feedback() # weight into input gate from h_t-1 (For candidate memory)
W_cx = x_size_feedback() # weight into input gate from x_t (For candidate memory)
W_oh = h_size_feedback() # weight into output gate from h_t-1
W_ox = x_size_feedback() # weight into output gate from x_t

weights = [W_fh, W_fx, W_ih, W_ix, W_ch, W_cx, W_oh, W_ox]

"""BIASES:"""

# notation: (B)ias_(t)o
b_f = np.ones([hidden_size, 1]) # bias for forget gate
b_i = np.ones([hidden_size, 1]) # bias for input gate
b_c = np.ones([hidden_size, 1]) # bias for candidate memory
b_o = np.ones([hidden_size, 1]) # bias for output gate

biases = [b_f, b_i, b_c, b_o]

Common Functions

In [65]:
def sigmoid(x):
  return 1 / (1 + np.exp(-x))

""" BUILT IN NUMPY FUNCTIONS TO USE """
# np.add() for element wise addition
# np.tanh() for tanh
# np.multiply for element-wise multiplication

' BUILT IN NUMPY FUNCTIONS TO USE '

LSTM Cell

In [66]:
def LSTM_cell(c_t, h_t, x_t):
  # Forget Gate
  f_t = np.dot(W_fh, h_t) + np.dot(x_t, W_fx).T + b_f
  f_out = sigmoid(f_t)
  c_t = np.multiply(c_t, f_out)

  # Input Gate
  i_t = np.dot(W_ih, h_t) + np.dot(x_t, W_ix).T + b_i
  i_out = sigmoid(i_t)
  cand_t = np.dot(W_ch, h_t) + np.dot(x_t, W_cx).T + b_c
  cand_out = np.multiply(i_out, np.tanh(cand_t))
  c_t = np.add(c_t, cand_out)

  # Output Gate
  o_t = np.dot(W_oh, h_t) + np.dot(x_t, W_ox).T + b_o
  o_out = sigmoid(o_t)
  h_t = np.multiply(np.tanh(c_t), o_out)

  return c_t.flatten(), h_t.flatten(), f_out.flatten(), i_out.flatten(), cand_out.flatten(), o_out.flatten()

def LSTM_wrapper(num_rounds, data_index):
  # Define Cell and Hidden states
  c_t_mem = np.zeros([num_rounds + 1, hidden_size])
  h_t_mem = np.zeros([num_rounds + 1, hidden_size])
  f_out_mem = np.zeros([num_rounds + 1, hidden_size])
  i_out_mem = np.zeros([num_rounds + 1, hidden_size])
  cand_out_mem = np.zeros([num_rounds + 1, hidden_size])
  o_out_mem = np.zeros([num_rounds + 1, hidden_size])

  # Create -1st entry for memory lines
  c_t_mem[0] = np.zeros(hidden_size)
  h_t_mem[0] = np.zeros(hidden_size)
  f_out_mem[0] = np.zeros(hidden_size)
  i_out_mem[0] = np.zeros(hidden_size)
  cand_out_mem[0] = np.zeros(hidden_size)
  o_out_mem[0] = np.zeros(hidden_size)

  actual_index = data_index * 10

  for i in range(num_rounds):
    c_t_cur = c_t_mem[i].reshape(hidden_size, 1)
    h_t_cur = h_t_mem[i].reshape(hidden_size, 1)
    f_out_cur = f_out_mem[i].reshape(hidden_size, 1)
    i_out_cur = i_out_mem[i].reshape(hidden_size, 1)
    cand_out_cur = cand_out_mem[i].reshape(hidden_size, 1)
    o_out_cur = o_out_mem[i].reshape(hidden_size, 1)
    c_t_mem[i + 1], h_t_mem[i + 1], f_out_mem[i + 1], i_out_mem[i + 1], cand_out_mem[i + 1], o_out_mem[i + 1] = LSTM_cell(c_t_cur, h_t_cur, get_features(actual_index + i))

  return c_t_mem, h_t_mem, f_out_mem, i_out_mem, cand_out_mem, o_out_mem

Back Propagation Through Time (BPTT)

In [67]:
def bptt(res, x_t, i):
  '''
  res[0] -> c_t_mem
  res[1] -> h_t_mem
  res[2] -> f_out_mem
  res[3] -> i_out_mem
  res[4] -> cand_out_mem
  res[5] -> o_out_mem
  '''

  # Constants
  learningRate = 0.0001

  # Find components of gradient
  delta_b_f = (y - res[1][i]) * res[5][i] * (1 - np.tanh(res[0][i])) * res[0][i - 1] * res[2][i] * (1 - res[2][i])
  delta_b_i = (y - res[1][i]) * res[5][i] * (1 - np.tanh(res[0][i])) * res[0][i - 1] * res[3][i] * (1 - res[3][i])
  delta_b_c = (y - res[1][i]) * res[5][i] * (1 - np.tanh(res[0][i])) * res[0][i - 1] * res[4][i] * (1 - res[4][i])
  delta_b_o = (y - res[1][i]) * np.tanh(res[0][i]) * res[5][i] * (1 - res[5][i])

  b_f = delta_b_f
  b_i = delta_b_i
  b_c = delta_b_c
  b_o = delta_b_o

  # Update weights and biases based off of gradient calculation
  w_fx -= learningRate * delta_b_f * x_t
  w_ix -= learningRate * delta_b_i * x_t
  w_cx -= learningRate * delta_b_c * x_t
  w_ox -= learningRate * delta_b_o * x_t
  w_oh -= learningRate * delta_b_f * res[1][i - 1]
  w_fh -= learningRate * delta_b_i * res[1][i - 1]
  w_ih -= learningRate * delta_b_c * res[1][i - 1]
  w_ch -= learningRate * delta_b_o * res[1][i - 1]

  return

Gradient Descent

In [68]:
for epoch in range(10000):
  for point in range(len(df)):
    if point % 10 == 0:
      res = LSTM_wrapper(10, point)
      for i in reversed(range(10)):
        bptt(res, get_features(point), i+1)

ValueError: operands could not be broadcast together with shapes (2500,) (15,) 

Validating the LSTM RNN

Parse Music Data

In [None]:
# Import CSV File
df = pd.read_csv("test_data.csv")

# Seperate columns into features
chroma_mean = df["chroma_stft_mean"].to_numpy()
chroma_var = df["chroma_stft_var"].to_numpy()
rms_mean = df["rms_mean"].to_numpy()
rms_var = df["rms_var"].to_numpy()
scm = df["spectral_centroid_mean"].to_numpy()
scv = df["spectral_centroid_var"].to_numpy()
sbm = df["spectral_bandwidth_mean"].to_numpy()
sbv = df["spectral_bandwidth_var"].to_numpy()

# Convert label from string to int
y = []
for label in df['label']:
  y.append(map.get(label))

Normalize Music Data

In [None]:
features = [chroma_mean, chroma_var, rms_mean, rms_var, scm, scv, sbm, sbv]

# normalize between -1 and 1
def normalize_range(arr):
  arr_min = arr.min()
  arr_max = arr.max()
  for i in range(len(arr)):
    arr[i] = (2 * (arr[i] - arr_min) / (arr_max - arr_min)) -1

# normalize all data
for x in features:
  normalize_range(x)

Collect expected outputs from RNN

In [None]:
for i in range(len(df)):
  if point % 10 == 0:
    res = LSTM_wrapper(10, point)
    print(res)