# Recurrent Neural Networks
The following Recurrent Neural Networks (RNN) will be used for musical genre classification. This is because the task of classify all the new music that is released nowadays it is impossible to be done by a human being.

## Initialization

### Libraries

In [ ]:
# Import general purpose python libraries
import os
from pathlib import Path
import numpy as np
import tensorflow as tf
from keras.models import Sequential
from keras.layers import Embedding, LSTM, Bidirectional, Dense, Attention
from keras.optimizers import SGD, Adam

# Import function to plot the results
import plots

### Data Configuration Parameters
Configuration variables related to the data

In [ ]:
# Randomize the initial network weights
random_seed = True

# Paths to where training, testing, and validation images are
database_dir = 'dataset/'
train_dir = 'dataset/training/audio_vectors'
val_dir = 'dataset/val/audio_vectors'
test_dir = 'dataset/test/audio_vectors'

# Directory where to store weights of the model and results
root_dir = "results"
# Create root directory for results if it does not exist
if not os.path.exists(root_dir):
    os.makedirs(root_dir)

# Input dimension (number of subjects in our problem)
num_classes = 6

# Name of each gesture of the database
CLASSES = [x for x in os.listdir(train_dir) if os.path.isdir(os.path.join(train_dir, x))]
#CLASSES = ['Alternative', 'Pop', 'Rock', 'Dance', 'Classical', 'Techno']

# Parameters that characterize the audios
output_dim = 256
sequence_length = 661500

### Configuration Training Parameters & Loading of training, validation and test datasets

In [ ]:
# Parameters that configures the training process
batch_size = 1  # Batch size
epochs = 5  # Number of epochs
initial_lr = 1e-5   # Learning rate
seed = 42  # Random number
modelRNN = 'LSTM'  # RNN model which will be used
version = f'{modelRNN}_BS{batch_size}_E{epochs}_LR{initial_lr}'
experiment_dir = f'{root_dir}/{modelRNN}'
# Create experiment directory if it does not exist
if not os.path.exists(experiment_dir):
    os.makedirs(experiment_dir)

X_train, X_val, X_test = [], [], []
y_train, y_val, y_test = [], [], []

for genre in CLASSES:
    if genre == '.idea':
        continue
    # 1. Generate train dataset (ds) from directory of samples
    dataset = f'{train_dir}/{genre}'
    vectrs = os.listdir(dataset)
    print(f'Number of files in {genre} folder: {len(vectrs)}')
    for vectr in vectrs:
        file, extension = os.path.splitext(vectr)
        if not extension == '.npy': # Sometimes folders contains file automatically generated by the OS.
            print(f'\tNot valid file: {file}')
            vectrs.remove(vectr)
            continue

        data = np.load(vectr)
        X_train.append(data)
        y_train.append(genre)

    # 2. Generate validation dataset (ds) from directory of samples
    dataset = f'{val_dir}/{genre}'
    vectrs = os.listdir(dataset)
    print(f'Number of files in {genre} folder: {len(vectrs)}')
    for vectr in vectrs:
        file, extension = os.path.splitext(vectr)
        if not extension == '.npy':
            print(f'\tNot valid file: {file}')
            vectrs.remove(vectr)
            continue

        data = np.load(vectr)
        X_val.append(data)
        y_val.append(genre)

    # 3. Generate test dataset (ds) from directory of samples
    dataset = f'{test_dir}/{genre}'
    vectrs = os.listdir(dataset)
    print(f'Number of files in {genre} folder: {len(vectrs)}')
    for vectr in vectrs:
        file, extension = os.path.splitext(vectr)
        if not extension == '.npy':
            print(f'\tNot valid file: {file}')
            vectrs.remove(vectr)
            continue

        data = np.load(vectr)
        X_test.append(data)
        y_test.append(genre)

train_ds = tf.data.Dataset.from_tensor_slices((X_train, y_train))
train_ds = train_ds.shuffle(buffer_size=len(X_train), seed=seed).batch(batch_size)
print(f'Total files in training dataset:{len(X_train)}')

val_ds = tf.data.Dataset.from_tensor_slices((X_val, y_val))
val_ds = val_ds.shuffle(buffer_size=len(X_val), seed=seed).batch(batch_size)
print(f'Total files in validation dataset:{len(X_val)}')

test_ds = tf.data.Dataset.from_tensor_slices((X_test, y_test))
test_ds = test_ds.shuffle(buffer_size=len(X_test), seed=seed).batch(batch_size)
print(f'Total files in test dataset:{len(X_test)}')

# Training process
#### Available Models: LSTM & BiLSTM

### Long Short-Term Memory (LSTM)

In [ ]:
def LSTM(num_clases, output_dim, sequence_length):
    # Sequential Model
    model = Sequential()
    
    # Embedding layer
    model.add(Embedding(input_dim=num_clases, output_dim=output_dim, input_length=sequence_length))
    
    # LSTM layer
    model.add(LSTM(output_dim, return_sequences=True))

    # Attention layer
    model.add(Attention())
    
    # Dense layer
    model.add(Dense(1, activation='relu'))
    
    return model

### Bidirectional Long Short-Term Memory (BiLSTM)

In [ ]:
def BiLSTM(num_clases, output_dim, sequence_length):
    # Sequential Model
    model = Sequential()

    # Embedding layer
    model.add(Embedding(input_dim=num_clases, output_dim=output_dim, input_length=sequence_length))

    # LSTM layer
    model.add(Bidirectional(LSTM(output_dim, return_sequences=True)))

    # Attention layer
    model.add(Attention())

    # Dense layer
    model.add(Dense(1, activation='relu'))

    return model

## Model execution

In [ ]:
#Model
if modelRNN == 'LSTM':
    model = LSTM(num_classes, output_dim, sequence_length)
elif modelRNN == 'BiLSTM':
    model = BiLSTM(num_classes, output_dim, sequence_length)
else:
    print('Wrong model selection or Model no available\n')

# Print the architecture of the model
model.summary()

## Set model training process
#### Configuration of several training decisions:
1. Optimizer using `Adam`
2. Model training configuration using `compile` with `categorical_crossentropy` due to the classification labeling

In [ ]:
# Set random seed
if random_seed:
    seed = np.random.randint(0,2*31-1)
else:
    seed = 5
np.random.seed(seed)
tf.random.set_seed(seed)


# 1. Configure optimizer
adam = Adam(learning_rate=initial_lr)

# 2. Configure training process
model.compile(loss='categorical_crossentropy', optimizer=adam, metrics=['categorical_accuracy'])


## Train the model
1. Load parameters from previous trainings if they exist.
2. Fit the model
3. Save the weights

In [ ]:
# Load pretrained model
weights_path = f"weights_{version}.h5" # Name of the file to store the weights
weights_file = Path(weights_path)
weights_load_path = f'{experiment_dir}/{weights_path}'
if weights_load_path:
    try:
        model.load_weights(weights_load_path)
        print("Loaded model from {}".format(weights_load_path))
    except:
        print("Impossible to find weight path. Returning untrained model")

# Fit the model
history = model.fit(train_ds, validation_data=val_ds, epochs=epochs, batch_size=batch_size)

# Save weights
weights_save_path = os.path.join(experiment_dir, weights_path)
model.save_weights(weights_save_path)

## Training Results
Accuracy and Loss obtained along the training process

In [ ]:
plots.accloss(history, modelRNN, experiment_dir, version)

# Testing
### Model Testing
1. Compute the loss function and accuracy for the test data
2. Confusion Matrix obtained from testing results

In [ ]:
# Evaluate model
scores = model.evaluate(test_ds, verbose=0)
print("Accuracy: %.2f%%" % (scores[1]*100))
print("Loss: %.2f" % scores[0])

# Obtain results to present the confusion matrix
prob_class = model.predict(test_ds, batch_size=batch_size)
# Classified labels
y_pred = tf.argmax(prob_class, axis=-1)
# Ground truth
y_true = tf.argmax(tf.concat([label for image, label in test_ds], axis=0), axis=1)
# Visualize confusion matrix                                           
plots.cm(y_true, y_pred, modelRNN, CLASSES, experiment_dir, version)