In [1]:
import os
import csv
from tqdm import tqdm
from scipy.io import wavfile
import numpy as np
import librosa
from sklearn.preprocessing import MinMaxScaler
import pickle
import tensorflow as tf
import keras.backend as K

# --- Paths -------------------------------------------------------------------
# Directory whose entries are listed and read as WAV recordings by the
# inference loop.
wav_dir = '/home/mabikana/Desktop/wav/2021/cleaned_wav/'
# Pickle paths for valence / arousal data; not referenced by the cells visible
# in this notebook section.
pickle_valence_file = '/home/mabikana/Documents/PhD/SER Code/RECOLA Processing/valence.pickle'
pickle_arousal_file = '/home/mabikana/Documents/PhD/SER Code/RECOLA Processing/arousal.pickle'
# Pickled scaler objects, loaded with load_from_pickled_file(): the spectrogram
# scaler is applied via .transform() to the model input, the arousal / valence
# scalers via .inverse_transform() to the model outputs.
pickle_spectrogram_scaler_file = '/home/mabikana/Documents/PhD/SER Code/RECOLA Processing/melspectrograms_scaler.pickle'
pickle_arousal_scaler_file = '/home/mabikana/Documents/PhD/SER Code/RECOLA Processing/arousal_scaler.pickle'
pickle_valence_scaler_file = '/home/mabikana/Documents/PhD/SER Code/RECOLA Processing/valence_scaler.pickle'
# NOTE: despite the name this is a file path (.h5), passed to
# tf.keras.models.load_model().
model_dir = '/home/mabikana/Documents/PhD/SER Code/RECOLA Processing/model.h5'
# Directory that receives the temporary per-chunk 'test_<n>.wav' files.
test_dir = '/home/mabikana/Documents/PhD/SER Code/RECOLA Processing/test'

# --- Audio parameters ----------------------------------------------------------
# Target sample rate (Hz) that clips are resampled to before feature extraction.
speech_rate = 44100
# Clip / chunk duration in seconds (also the librosa.load duration limit).
dur = 4
# Not referenced by the cells visible in this notebook section.
dur_multiplier = 25

In [2]:
# create and save spectrograms
def get_log_mel_spectrogram(path, n_fft, hop_length, n_mels):
    """Load an audio clip and return its log-mel spectrogram as a column vector.

    The clip is read at 8 kHz (at most ``dur`` seconds), resampled to
    ``speech_rate`` and forced to exactly ``speech_rate * dur`` samples
    (zero-padded when shorter, truncated when longer) before the mel
    spectrogram is computed and converted to decibels.

    Args:
        path: Audio file to read with ``librosa.load``.
        n_fft: FFT window size forwarded to ``librosa.feature.melspectrogram``.
        hop_length: Hop length forwarded to ``librosa.feature.melspectrogram``.
        n_mels: Number of mel bands forwarded to
            ``librosa.feature.melspectrogram``.

    Returns:
        A 2-D numpy array with a single column (``reshape(-1, 1)`` of the
        dB-scaled mel spectrogram).
    """
    audio, sr = librosa.load(path, sr=8000, duration=dur)
    y = librosa.resample(audio, orig_sr=8000, target_sr=speech_rate)
    audio_len = speech_rate * dur

    file_length = np.size(y)
    if file_length < audio_len:
        # Zero-pad short clips up to the fixed length.
        y = np.concatenate((y, np.zeros(audio_len - file_length)), axis=0)
    elif file_length > audio_len:
        # Previously this case reached np.zeros() with a negative size and
        # raised ValueError; truncate instead so the output size stays fixed.
        y = y[:audio_len]

    # NOTE(review): ``sr`` is the 8 kHz load rate although ``y`` has been
    # resampled to ``speech_rate``. Left unchanged on purpose: the pickled
    # scaler and the trained model were presumably fitted on features computed
    # this way -- confirm against the training pipeline before changing it.
    mel_spectrogram = librosa.feature.melspectrogram(y=y, sr=sr, n_fft=n_fft, hop_length=hop_length, n_mels=n_mels)
    log_mel_spectrogram = librosa.power_to_db(mel_spectrogram)
    log_mel_spectrogram = log_mel_spectrogram.reshape(-1,1)

    return log_mel_spectrogram


# Concordance correlation coefficient (CCC)-based loss function - using non-inductive statistics
def ccc(gold, pred):
    """Concordance-correlation-style score between targets and predictions.

    Both inputs have their trailing axis squeezed away; means and variances
    are then taken over the (new) last axis with ``keepdims=True``.

    Note that the numerator uses the element-wise product of the two
    deviations (it is not averaged here), so the returned tensor holds one
    value per element of the squeezed input rather than a single scalar per
    sequence; any further reduction is left to the caller.

    Args:
        gold: Ground-truth tensor with a trailing axis of size 1.
        pred: Predicted tensor with a trailing axis of size 1.

    Returns:
        Tensor of per-element CCC terms.
    """
    y_true = K.squeeze(gold, axis=-1)
    y_pred = K.squeeze(pred, axis=-1)

    mu_true = K.mean(y_true, axis=-1, keepdims=True)
    mu_pred = K.mean(y_pred, axis=-1, keepdims=True)
    dev_true = y_true - mu_true
    dev_pred = y_pred - mu_pred

    var_true = K.mean(K.square(dev_true), axis=-1, keepdims=True)
    var_pred = K.mean(K.square(dev_pred), axis=-1, keepdims=True)

    # epsilon keeps the denominator away from zero for constant signals.
    numerator = K.constant(2.) * (dev_true * dev_pred)
    denominator = var_true + var_pred + K.square(mu_true - mu_pred) + K.epsilon()
    return numerator / denominator


def ccc_loss(gold, pred):
    """CCC-based loss: ``1 - ccc(gold, pred)``.

    Args:
        gold: Ground-truth tensor; per the original note the expected layout
            is (num_batches, seq_len, 1).
        pred: Predicted tensor with the same layout as ``gold``.

    Returns:
        Tensor holding one minus the value returned by ``ccc``.
    """
    return K.constant(1.) - ccc(gold, pred)


def load_from_pickled_file(file_name):
    """Read ``file_name`` in binary mode and return the unpickled object.

    Args:
        file_name: Path of the pickle file to open.

    Returns:
        Whatever object ``pickle.load`` reconstructs from the file.
    """
    # SECURITY: unpickling can execute arbitrary code; only use on trusted files.
    with open(file_name, 'rb') as pickled:
        return pickle.load(pickled)

In [3]:
# Restore the trained network; the custom CCC loss/metric must be registered
# by name so Keras can deserialize the saved model.
model = tf.keras.models.load_model(
    model_dir,
    custom_objects={"ccc_loss": ccc_loss, "ccc": ccc},
)

# Restore the pickled scalers (spectrogram, valence, arousal -- in that order).
melspect_scaler, valence_scaler, arousal_scaler = map(
    load_from_pickled_file,
    (
        pickle_spectrogram_scaler_file,
        pickle_valence_scaler_file,
        pickle_arousal_scaler_file,
    ),
)


In [4]:
# Split every recording in ``wav_dir`` into overlapping chunks of ``dur``
# seconds (50 % overlap), run the model on each chunk and collect the
# de-normalized valence / arousal predictions per file.
chunk_length_ms = dur * 1000
slice_length = int(chunk_length_ms / 1000) # in seconds
overlap =  int((chunk_length_ms / 1000) / 2) # in seconds

# The temporary chunk files are written here; make sure the folder exists.
os.makedirs(test_dir, exist_ok=True)

# Predictions for every processed file, keyed by file name. Previously the
# per-file lists were overwritten on each iteration, so only the last file's
# results survived the loop.
valence_by_file = {}
arousal_by_file = {}

# Only consider WAV files, in a deterministic order (os.listdir order is
# arbitrary and other files would make wavfile.read fail).
wav_files = sorted(name for name in os.listdir(wav_dir) if name.lower().endswith('.wav'))

for wav_file in wav_files:
    frequency, signal = wavfile.read(os.path.join(wav_dir, wav_file))
    # Chunk start times in whole seconds, spaced by (slice_length - overlap).
    slices = np.arange(0, len(signal)/frequency, slice_length-overlap, dtype=int)

    valence = []
    arousal = []

    for chunk_counter, (start, end) in enumerate(zip(slices[:-1], slices[1:])):
        start_audio = start * frequency
        # Each chunk spans slice_length seconds: [start, end + overlap).
        end_audio = (end + overlap) * frequency
        audio_slice = signal[int(start_audio): int(end_audio)]

        # Round-trip through a WAV file so the chunk goes through the same
        # librosa loading path as get_log_mel_spectrogram expects.
        chunk_name = os.path.join(test_dir, 'test_{0}.wav'.format(chunk_counter))
        # Write with the recording's real sample rate. The chunk was sliced in
        # units of ``frequency`` samples per second, so a hard-coded 8000 Hz
        # header mislabels any recording that is not 8 kHz (identical result
        # for 8 kHz inputs).
        wavfile.write(chunk_name, frequency, audio_slice)

        spect = get_log_mel_spectrogram(path=chunk_name, n_fft=2048, hop_length=512, n_mels=128)
        spect = spect.reshape(1,-1)
        normalized_audio_slice = melspect_scaler.transform(spect)
        input_mel = np.reshape(normalized_audio_slice,(1,128, -1,1))
        predictions = model.predict(input_mel)

        # The model output is indexed as [valence, arousal]
        # (assumed from this indexing -- TODO confirm against the model definition).
        valence.append(valence_scaler.inverse_transform(predictions[0]))
        arousal.append(arousal_scaler.inverse_transform(predictions[1]))

    valence_by_file[wav_file] = valence
    arousal_by_file[wav_file] = arousal
    

Predictions  [array([[-0.158096]], dtype=float32), array([[-0.00997522]], dtype=float32)]
Predictions  [array([[-0.29140925]], dtype=float32), array([[0.07460787]], dtype=float32)]
Predictions  [array([[-0.16084649]], dtype=float32), array([[0.29166558]], dtype=float32)]
Predictions  [array([[-0.429802]], dtype=float32), array([[0.11677528]], dtype=float32)]
Predictions  [array([[0.03683661]], dtype=float32), array([[0.17612834]], dtype=float32)]
Predictions  [array([[0.07435983]], dtype=float32), array([[0.23790424]], dtype=float32)]
Predictions  [array([[0.08525831]], dtype=float32), array([[0.20854019]], dtype=float32)]
Predictions  [array([[-0.06891292]], dtype=float32), array([[0.26207522]], dtype=float32)]
Predictions  [array([[-0.41961613]], dtype=float32), array([[0.20721011]], dtype=float32)]
Predictions  [array([[-0.39948097]], dtype=float32), array([[0.12804277]], dtype=float32)]
Predictions  [array([[-0.08424418]], dtype=float32), array([[0.22141093]], dtype=float32)]
Predi