# Test with a holdout dataset
This notebook contains only a subset of the code present in the "*solution notebook*" to simplify the testing on the holdout dataset.

You just need to execute all the cells that follows these steps:
1.   Upload all the needed file on the notebook or connect it to a Google Drive account in which these files are accesible. 
2.   Extract features from samples and save them loacally on the VM.
3.   Load the features extracted and give them as input to the ensemble model or chose the model you want to test and then feed it with the extracted features. 
4. Save the result on a csv file. 

In [None]:
# Import all the needed libraries
import os
import tqdm
import librosa
import numpy as np
import pandas as pd
import tensorflow as tf

from tensorflow import keras
from google.colab import files
from tensorflow.keras import layers
from tensorflow.keras.utils import normalize
from sklearn.preprocessing import StandardScaler

In [None]:
# Gloval variable definition 
TARGET_COLUMN = "emotion"
NUM_CLASSES = 7
CLASS_NAMES = ['angry', 'disgust', 'fear', 'happy', 'neutral', 'sadness', 'surprise']
TEST_SET_PREDICTIONS_FILE='/path/to/the file with prediction'   #@param {type: "string"}

# Dictionaries that maps class code to class name and vice versa
id2label = {str(i): label for i, label in enumerate(CLASS_NAMES)}
label2id = {v: k for k, v in id2label.items()}

# Get needed file
I have prepared two ways to get the file needed:


1.  **Upload data** from your file system.
2.  **Connect a Google Drive** in which you have this directory to the Notebook to get data.

Execute one of the following section, depending on your preference.
The value of some global variable depends on this choice.



## Upload data from your FS

Upload the dataframe with the holdout test along the directory with the audio files and the ensemble models directory, which contains all the best trained model I have submitted.

In [None]:
# upload the csv with the test set
uploaded = files.upload()

In [None]:
# upload the ensemble model directory
uploaded = files.upload()

In [None]:
# upload the stacking model directory
uploaded = files.upload()

In [None]:
# upload the all_models model directory
uploaded = files.upload()

In [None]:
# upload the directory with the audio files
uploaded = files.upload()

In [None]:
# Global variable definition
TEST_FILE = "2022challengeA_test.csv"
TEST_DIR = "test/"

ENSEMBLE_MODELS_PATH = "ensemble_models/"
STACKING_MODELS_PATH = "stacking_models/"
ALL_MODELS_PATH = "all_models/"

# List with all the names of models that join the ensemble
MODELS_TO_ENSEMBLE = [model_name for model_name in os.listdir(ENSEMBLE_MODELS_PATH) if 'h5' in model_name]

## Connect Google Drive

In [None]:
# Connect to Google Drive
from google.colab import drive
drive.mount('/content/drive')

# Global variable definition
TEST_FILE = "path/to/your/csv with the test set"  #@param {type: "string"}
TEST_DIR = "path/to/your/directory with all the audios present in the test set"   #@param {type: "string"}                 

ALL_MODELS_PATH = "path/to/your/directory with all the models"        #@param {type: "string"}
STACKING_MODELS_PATH = "path/to/your/directory with all the stacking models"  #@param {type: "string"}
ENSEMBLE_MODELS_PATH = "path/to/your/directory with all the ensemble models"  #@param {type: "string"}

# List with all the names of models that join the ensemble
MODELS_TO_ENSEMBLE = [model_name for model_name in os.listdir(ENSEMBLE_MODELS_PATH) if 'h5' in model_name]

# Extract Feature

In [None]:
def extract_feature_1D(file_name, mfcc=True, chroma=True, mel=True):
  '''Extract 1D features from the audio file given as input..
    Parameters
    ----------
    file_name: audio file to be processed.
    
    mfcc: boolean to specify if the Mel-frequency cepstral coefficients features has to be extracted.

    chroma: boolean to specify if the chromagram features has to be extracted.

    mel: boolean to specify if the log mel spectogram features has to be extracted.

    Return
    ----------
    features: numpy.ndarray with the extracted features.
  '''
  X, sample_rate = librosa.load(file_name)
  features=np.array([])  
  if mfcc:
    mfccs=np.mean(librosa.feature.mfcc(y=X, sr=sample_rate, n_mfcc=40).T, axis=0)
    features=np.hstack((features, mfccs))
  if chroma:
    stft=np.abs(librosa.stft(X))
    chroma=np.mean(librosa.feature.chroma_stft(S=stft, sr=sample_rate).T,axis=0)
    features=np.hstack((features, chroma))
  if mel:
    mel=np.mean(librosa.feature.melspectrogram(X, sr=sample_rate).T,axis=0)
    features=np.hstack((features, mel))

  return features


def extract_feature_2D(path, n_fft, hop_length, n_mels):
    """ Extract log mel spectrogram to audio file, which are all padded to 8 seconds to get same length features.
    Return:
        log_mel_spectrogram: nd.array with the log mel spectogram of the audio specified with the path
    """
    y, sr = librosa.load(path, sr=16000, duration=8)

    file_length = np.size(y)
    if file_length != 128000:
        y = np.concatenate((y, np.zeros(128000-file_length)), axis=0)

    mel_spectrogram = librosa.feature.melspectrogram(y, sr, n_fft=n_fft, hop_length=hop_length, n_mels=n_mels)
    log_mel_spectrogram = librosa.amplitude_to_db(mel_spectrogram)
    log_mel_spectrogram = log_mel_spectrogram.reshape((-1,))

    return log_mel_spectrogram


def extract_features(annot, feature_type):
  '''Extract and save and save in a file the features from all the audios contained in the dataset, which has to be passed as a pandas dataframe.
    Parameters
    ----------
    annot: pandas dataframe which contains the dataset with labels on column "emotion" and file name in the column "file_id".

    
    feature_type: type of feature you need, Possibilities are "1D" for 1D-feature and "2D" for 2D-features.

    Return
    ----------
    dataset: tf.data.Dataset cached and batched
    features2D: numpy.ndarray with the 2D features needed.
    labels: numpy.ndarray with the ground thruth.
  '''
  files = annot['file_id'].to_list()
  features = []

  for i in tqdm.tqdm(range(len(files))):
    try:
      file = files[i]
      fname = os.path.join(TEST_DIR, file)
      
      if feature_type == "1D":
          sample = extract_feature_1D(fname, mfcc=True, chroma=False, mel=False)
      elif feature_type == "2D":
        sample = extract_feature_2D(fname, n_fft=2048, hop_length=512, n_mels=128)

      features.append(sample)
    except Exception as e:
      print("\n", file)
      print(e)
    
  features = np.array(features)
  
  if feature_type == "2D":
    features = features.reshape(-1, 128, 251, 1)

  feature_file = feature_type + ".npy" 

  with open(feature_file, 'wb') as f:
    np.save(f, features)


def get_features(feature_type):
  '''Read the features stored in the npy, scale and return them as a numpy.ndarray'''
  with open(feature_type + ".npy", 'rb') as f:
    features = np.load(f)
  
  features = scale_features(features)
  return features


def scale_features(features):
  '''Scale the features given as input using the standard scaler, thus obtaining zero-mean and unitary std'''
  scaler = StandardScaler()
  
  features_shape = features.shape
  
  features = np.reshape(features, (features_shape[0],-1)) 
  scaled_features = scaler.fit_transform(features)
  scaled_features = np.reshape(scaled_features, features_shape)

  return scaled_features


def weighted_average(models_scores):
  '''Returns the weighted score predictions.'''
  # Use the weights found with the Genetic Algorithm 
  weights=[0.2011812,  0.27912898, 0.12519793, 0.07452854, 0.1335278,  0.18643554]
  weighted_scores = []
  for i in range(len(models_scores[0])):
    weighted_score = [weight*scores[i] for weight, scores in zip(weights, models_scores)]
    weighted_average = np.sum(weighted_score, axis = 0)
    weighted_scores.append(weighted_average)
  return weighted_scores


class TransformerBlock(layers.Layer):
    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, **kwargs):
        super(TransformerBlock, self).__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.rate = rate
        self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.ffn = keras.Sequential(
            [layers.Dense(ff_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def call(self, inputs, training):
        attn_output = self.att(inputs, inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

    def get_config(self):
        config = super().get_config().copy()
        config.update({
              'embed_dim': self.embed_dim,
              'num_heads': self.num_heads,
              'ff_dim': self.ff_dim,
              'rate': self.rate
        })
        return config

In [None]:
test_annot = pd.read_csv(TEST_FILE, index_col=0)
extract_features(test_annot, "1D")   # extract features and save them in a file called 1D.npy
extract_features(test_annot, "2D")   # extract features and save them in a file called 2D.npy

# Ensemble

In [None]:
# Define an array with all the ensemble models loaded from the directory "ensemble_models"
models = []
for i, model_name in enumerate(MODELS_TO_ENSEMBLE):
  model = tf.keras.models.load_model(os.path.join(ENSEMBLE_MODELS_PATH, model_name), custom_objects={"TransformerBlock": TransformerBlock})
  model._name = f'model_{i}'
  models.append(model)

In [None]:
# Get the features saved in the files 1D.npy and 2D.npy
MONO_DIM_TEST_DS = get_features("1D")   
TWO_DIM_TEST_DS = get_features("2D")

FUSED_TEST_DS = {"features_1D": MONO_DIM_TEST_DS, "features_2D": TWO_DIM_TEST_DS}

In [None]:
# Define a model with the predictions (probability distribution over all the labels) of all the models ensemble 
TEST_SCORES = []
for i, model_name in enumerate(MODELS_TO_ENSEMBLE):
  if '1D' in model_name:
    TEST_SCORES.append(models[i].predict(MONO_DIM_TEST_DS, verbose=1))
  elif 'fused' in model_name :
    TEST_SCORES.append(models[i].predict(FUSED_TEST_DS, verbose=1))
  else:
    TEST_SCORES.append(models[i].predict(TWO_DIM_TEST_DS, verbose=1))

In [None]:
# Make an average of all the predictions using the weights found with the genetic algorithm
ensemble_scores = weighted_average(TEST_SCORES)

# Select the label to assign to each sample as the one with the highest probability 
ensemble_predictions = np.argmax(ensemble_scores, axis=1)

In [None]:
# Convert the code of the label predicted with the name of the class
test_annot = pd.read_csv(TEST_FILE, index_col=0)
label_prediction = [id2label[str(prediction)] for prediction in ensemble_predictions]
test_annot['predicted_emotion'] = label_prediction

# Store the result into a pandas Dataframe
test_annot.to_csv(TEST_SET_PREDICTIONS_FILE, index=False)
test_annot = pd.read_csv(TEST_SET_PREDICTIONS_FILE)
test_annot

Unnamed: 0,file_id,origin,predicted_emotion
0,030472df-9d70-4d76-a1a5-acb4c33537d3.wav,crema,sadness
1,ac4720de-e0d9-4667-86a7-4236d410ed25.wav,crema,happy
2,264928af-cb15-4125-abf7-9408369d83b2.wav,crema,fear
3,2233ce2b-35ae-483c-9397-1058f681b6ef.wav,crema,disgust
4,472aa1eb-b4dc-452c-84b7-934ed61285da.wav,crema,fear
...,...,...,...
1380,1656443f-b726-49a7-b572-534bfdecc6c8.wav,tess,angry
1381,d4243d13-0ba7-41e1-90ad-7881e946fce6.wav,tess,disgust
1382,d92d1675-aba4-4c57-a48e-6ee8037a9d36.wav,tess,neutral
1383,34c3505d-4325-4d3d-8207-723950e7268d.wav,tess,surprise


# Single model

In [None]:
# Get the features saved in the files 1D.npy and 2D.npy
MONO_DIM_TEST_DS = get_features("1D")   
TWO_DIM_TEST_DS = get_features("2D")

FUSED_TEST_DS = {"features_1D": MONO_DIM_TEST_DS, "features_2D": TWO_DIM_TEST_DS}

In [None]:
# Define a dictionary that maps each model with its input 
map_model_to_ds = {'cnn_lstm_1D.h5':MONO_DIM_TEST_DS, 'stacking_ensemble.h5': TWO_DIM_TEST_DS, 'cnn_transformer.h5':TWO_DIM_TEST_DS, 'cnn_lstm_2D.h5':TWO_DIM_TEST_DS, 'cnn_lstm_fused.h5':FUSED_TEST_DS, '2D_lstm_cnn.h5':TWO_DIM_TEST_DS, '1D_lstm_cnn.h5':MONO_DIM_TEST_DS}

In [None]:
# Chose the model you want to load (from the ensemble directory), but if you can change it to the all models directory and test anything you want
MODEL_NAME = "cnn_transformer.h5"

MODEL_TO_LOAD = ENSEMBLE_MODELS_PATH + MODEL_NAME   # you can also change the directory, e.g. STACKING_MODELS_PATH + MODEL_NAME

# Load the model 
model_loaded = tf.keras.models.load_model((MODEL_TO_LOAD), custom_objects={"TransformerBlock": TransformerBlock})

# Get the probability distribution over all the labels for each sample
model_scores = model_loaded.predict(map_model_to_ds[MODEL_NAME])

# Get the code of the class with the highest probability 
model_predictions = np.argmax(model_scores, axis=1)