# Dataset Preprocessing and Feature Extraction for the Berlin Database of Emotional Speech

### This notebook handles the preprocessing pipeline
It includes:
1.  **Data Loading & Statistics**: Analyzing file durations and sample rates.
2.  **Data Cleaning**: Removing silence, normalizing, and padding audio.
3.  **Feature Extraction**: Computing Mel Spectrograms, MFCCs, and other spectral/prosodic features.
4.  **Visualization**: Visualizing various feature representations.

In [None]:
import numpy as np
import os
import librosa as lr
import h5py
import soundfile as sf
import pandas as pd
import re
import wave
import scipy
from scipy import interpolate
from scipy.stats import zscore
import scipy.stats
from scipy.signal import firwin, lfilter, savgol_filter
from scipy.fftpack import dct
import json
import random
import shutil
import matplotlib.pyplot as plt
import torch
import torchaudio
import torchaudio.functional as F
import torchaudio.transforms as T
from torchaudio.transforms import Vad
import torchaudio.compliance.kaldi as ta_kaldi
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import StandardScaler
import pywt
from pydub import AudioSegment
import datetime 
import python_speech_features
import librosa.display

print(torch.__version__)
print(torchaudio.__version__)

current_dir = os.getcwd()
data_base_path = os.path.abspath(os.path.join(current_dir, "../../../data"))

audio_data_path = os.path.join(data_base_path, "EmoDB/wav/")
silb_data_path = os.path.join(data_base_path, "EmoDB/silb/")
audio_data_path_with_silence = os.path.join(data_base_path, "EmoDB/BerlinEMO_DB_Silence/")
usm_data_path = os.path.join(data_base_path, "usm_eval/")

if not os.path.exists(audio_data_path_with_silence):
    os.makedirs(audio_data_path_with_silence)

print(f"Audio Data Path: {audio_data_path}")
print(f"Output Path: {audio_data_path_with_silence}")
      
usm_classes=["airplane",
"alarm",
"birds",
"bus",
"car", 
"cheering",
"church bell",
"dogs",
"drilling",
"glass break",
"gunshot",
"hammer",
"helicopter",
"jackhammer",
"lawn mower",
"motorcycle",
"music",
"rain",
"sawing",
"scream",
"siren",
"speech",
"thunderstorm",
"train",
"truck",
"wind"]

selected_classes=["wind", "thunderstorm", "rain", "bus", "birds"]

## Helper functions

### Class and Sentence Mapping
Helper functions to extract emotion labels and sentence text from the specific filename format of Emo-DB.

In [None]:
def get_class(filename):
    if "W" in filename[5]:
        return "Anger"
    if "L" in filename[5]:
        return "Boring"
    if "E" in filename[5]:
        return "Disgust"
    if "A" in filename[5]:
        return "Fear"
    if "F" in filename[5]:
        return "Happy"
    if "T" in filename[5]:
        return "Sad"
    if "N" in filename[5]:
        return "Neutral"


def get_sentence(filename):
    if "a01" in filename[2:5]:
        return "Der Lappen liegt auf dem Eisschrank."
    if "a02" in filename[2:5]:
        return "Das will sie am Mittwoch abgeben."
    if "a04" in filename[2:5]:
        return "Heute abend könnte ich es ihm sagen."
    if "a05" in filename[2:5]:
        return "Das schwarze Stück Papier befindet sich da oben neben dem Holzstück."
    if "a07" in filename[2:5]:
        return "In sieben Stunden wird es soweit sein."
    if "b01" in filename[2:5]:
        return "Was sind denn das für Tüten, die da unter dem Tisch stehen?"
    if "b02" in filename[2:5]:
        return "Sie haben es gerade hochgetragen und jetzt gehen sie wieder runter."
    if "b03" in filename[2:5]:
        return "An den Wochenenden bin ich jetzt immer nach Hause gefahren und habe Agnes besucht."
    if "b09" in filename[2:5]:
        return "Ich will das eben wegbringen und dann mit Karl was trinken gehen."
    if "b10" in filename[2:5]:
        return "Die wird auf dem Platz sein, wo wir sie immer hinlegen.	"

### Compute dataset statistics

### Database Statistics
We iterate through all `.wav` files to calculate the distribution of audio durations and verify the sample rate. We also calculate the average decibel level for normalization.

In [None]:
def database_stats(audio_data_path):
    file_names=[]
    samples=[]
    durations=[]
    for path in os.listdir(audio_data_path):
        if path[-3:]=="wav":
            file_path=os.path.join(audio_data_path, path)
            file_names.append(path)
            y, sr = sf.read(file_path)
            # print(sr)
            durations.append((len(y)/sr))
            samples.append(y)
    max_duration=np.max(np.array(durations))
    print(f'Es sind {len(file_names)} Dateien geladen worden.')
    print(f'Die Längste Datei ist {np.max(np.array(durations)):.3f} Sekunden und die kürzeste {np.min(np.array(durations)):.3f} Sekunden lang. Durchschnittlich sind die Dateien {np.mean(np.array(durations)):.3f} Sekunden lang.')
    print(f'Die Samplerate ist {sr}.')
    return samples, file_names, durations

def calculate_avg_db(audio_data_path):
    samples, _, _ = database_stats(audio_data_path)
    avg_db = [np.mean(np.abs(y)) for y in samples]
    avg_db= np.mean(np.array(avg_db))
    avg_db_db = 20 * np.log10(avg_db)
    print(f"Die durchschnittliche Lautstärke beträgt {avg_db_db:.3f} dB.")
    return avg_db_db

In [None]:
samples, file_names, durations=database_stats(audio_data_path)

## Count emotions

### Emotion Distribution
We count the occurrences of each emotion category (Anger, Boredom, Disgust, Fear, Happy, Sad, Neutral) to check dataset balance.

In [None]:
def count_emotions(filenames):
    anger, boring, disgust, fear, happy, sad, neutral = 0,0,0,0,0,0,0
    for filename in filenames:
        if "W" in filename[5]:
            anger+=1
        if "L" in filename[5]:
            boring+=1
        if "E" in filename[5]:
            disgust+=1
        if "A" in filename[5]:
            fear+=1
        if "F" in filename[5]:
            happy+=1
        if "T" in filename[5]:
            sad+=1
        if "N" in filename[5]:
            neutral+=1
    return np.array([disgust, anger, fear, happy, neutral, boring, sad])

_, file_names, _ = database_stats(audio_data_path)
list_count=count_emotions(file_names)
print(f"Emotion classes: [E_n={list_count[0]}, W_n={list_count[1]}, A_n={list_count[2]}, F_n={list_count[3]}, N_n={list_count[4]}, L_n={list_count[5]}, T_n={list_count[6]}]")

## Visualization Helper Functions

### Plotting Functions
Functions to visualize Waveforms, Log Mel Spectrograms, MFCCs, Tonnetz, and Spectral Contrast.

In [None]:
def plot_or_save_waveforms(y,y_emphasized, task, path, folder_path_to_save):
    file_name_wave = folder_path_to_save+f"{path[:-4]}_wave.png"
    time1 = lr.times_like(y, sr=sr)
    time2 = lr.times_like(y_emphasized, sr=sr)
    plt.figure()
    plt.plot(time1, y, color='midnightblue', label='Signal')  
    plt.plot(time2, y_emphasized, color='cornflowerblue', label='Emphasized Signal', alpha=0.6) 
    plt.tick_params(axis='both', which='both', bottom=False, top=False, left=False, right=False)
    plt.xticks([], [])
    plt.yticks([], [])
    plt.title(f"Waveforms of {path[:-4]}")
    plt.xlabel('Time')
    plt.ylabel('Amplitude')
    plt.legend()
    if task=="plot":
        plt.show()
    elif task=="save":
        plt.savefig(file_name_wave)

In [None]:
def plot_log_mel_spectrogram(mel_spec_log, y, sr, path, folder_path_to_save, task):
    sentence = get_sentence(path[:-4])
    emotion_class = get_class(path[:-4])
    font_settings = {
                'family': 'sans-serif',
                'size': 16,
                'weight': 'normal',
            }
    plt.figure(figsize=(10, 4))
    plt.imshow(mel_spec_log, origin='lower', aspect='auto', extent=[0, len(y)/sr, 0, sr/2]) # cmap='Spectral'
    plt.colorbar(format='%+2.0f dB')
    plt.title("Log Mel Frequency Spectrogram", fontdict=font_settings)
    plt.xlabel('Time in s', fontdict=font_settings)
    plt.ylabel('Frequency in Hz', fontdict=font_settings)
    plt.tight_layout()

    if task == "plot":
        plt.show()
    elif task == "save":
        plt.savefig(folder_path_to_save + f"{path[:-4]}_log_mel_spectrogram.png")


def plot_mfcc(mfcc, y, sr, n_mfcc, path, folder_path_to_save, task):
    utterance = get_sentence(path[:-4])
    emotion_class = get_class(path[:-4])
    font_settings = {
                'family': 'sans-serif',
                'size': 16,
                'weight': 'normal',
            }
    plt.figure(figsize=(10, 4))
    plt.imshow(mfcc, aspect='auto', extent=[0, len(y)/sr, 0, n_mfcc], origin='lower', cmap="Spectral")
    plt.colorbar(format='%+2.0f dB')
    plt.title("Mel Frequency Cepstral Coefficients", fontdict=font_settings)
    plt.ylabel('Coefficient', fontdict=font_settings)
    plt.xlabel('Time in s', fontdict=font_settings)
    plt.tight_layout()

    if task == "plot":
        plt.show()
    elif task == "save":
        plt.savefig(folder_path_to_save + f"{path[:-4]}_mfcc.png")

In [None]:
def plot_tonnetz(tonnetz):
    fifths_octaves = tonnetz[0:2, :]
    minor_thirds_octaves = tonnetz[2:4, :]
    major_thirds_octaves = tonnetz[4:6, :]
    def plot_tonal_space(x, y, title):
        plt.figure(figsize=(6,6))
        plt.scatter(x, y)
        plt.xlim([-1, 1])
        plt.ylim([-1, 1])
        plt.title(title)
        plt.xlabel('dimension 1')
        plt.ylabel('dimension 2')
        plt.gca().set_aspect('equal', adjustable='box')
    # plot dimensions
    plot_tonal_space(fifths_octaves[0], fifths_octaves[1], 'fifths and octaves')
    plot_tonal_space(minor_thirds_octaves[0], minor_thirds_octaves[1], 'minor thirds and octaves')
    plot_tonal_space(major_thirds_octaves[0], major_thirds_octaves[1], 'major thirds and octaves')
    plt.show()
    plt.figure(figsize=(10, 6))


def plot_spectral_contrast(spectral_contrast, hop_length, sr):
    times = np.arange(spectral_contrast.shape[1]) * hop_length / sr  
    for i, band in enumerate(spectral_contrast, 1):
        plt.plot(times, band, label=f'contrast band {i}')
    plt.xlabel('time in seconds')
    plt.ylabel('spectral contrast (intensity?)')
    plt.title('spectral contrast (time)')
    plt.legend()
    plt.show()


def plot_standard(input, name):
    plt.figure(figsize=(15, 5))
    plt.imshow(input, aspect='auto', origin='lower', cmap='coolwarm')
    plt.colorbar(format='%+2.0f dB')
    plt.title(name)
    plt.tight_layout()
    plt.show()

## Feature Visualization

### Visualizations of Mel Frequency Cepstral Coefficients and Log Mel Frequency Spectrograms

We run a test extraction on a single file to visualize Mel Spectrograms and MFCCs.

In [None]:
folder_path_to_save = "./VisualsEMO_DB/"
if os.path.isdir(folder_path_to_save):
    shutil.rmtree(folder_path_to_save)
os.makedirs(folder_path_to_save)

In [None]:
def create_or_plot_visualization(x, task, n_fft=440, n_mfcc=13, hop_length=160, n_mels=128, pre_coef=0.96):
    file_names=[]
    i=1
    for path in os.listdir(audio_data_path):
        if path[-3:]=="wav":
            file_path=os.path.join(audio_data_path, path)
            file_names.append(path)
            y, sr = lr.load(file_path)
            y_emphasized=lr.effects.preemphasis(y, coef=pre_coef)
            # y_emphasized = np.append(y[0], y[1:] - pre_emphasis * y[:-1])
            stft = lr.stft(y_emphasized, n_fft=n_fft, hop_length=hop_length)
            mel_spec = lr.feature.melspectrogram(y=y, sr=sr, n_fft=n_fft, hop_length=hop_length, n_mels=n_mels, norm=2, window='hamming')
            mel_spec_log = np.array(lr.power_to_db(mel_spec, ref=np.max).astype(np.float32))
            mfcc = lr.feature.mfcc(S=lr.power_to_db(mel_spec), n_mfcc=n_mfcc)
            plot_log_mel_spectrogram(mel_spec_log,y,sr, path, folder_path_to_save, task)
            print(mfcc.shape)
            plot_mfcc(mfcc, y, sr, n_mfcc, path, folder_path_to_save, task)
            if x>=i:
                i+=1
            else: break
create_or_plot_visualization(x=1, task="plot")

## Data Preprocessing Class

### Preprocessing Pipeline
This class handles:
1.  **Silence Removal**: Removing silence from audio clips.
2.  **Normalization**: Normalizing audio volume to a target dB.
3.  **Augmentation**: Options for White Noise and USM dataset augmentation.
4.  **Saving**: Saving the processed files to a new directory.

In [None]:
class Data_Preprocessing:
    def __init__(self, sr, origin_folder_path, audio_data_path_with_silence, usm_data_path, usm_classes, selected_classes, avg_db):
        self.origin_folder_path = origin_folder_path
        self.audio_data_path_with_silence = audio_data_path_with_silence
        self.usm_data_path = usm_data_path
        self.usm_classes = usm_classes
        self.selected_classes = selected_classes
        self.file_list = os.listdir(self.origin_folder_path)
        self.y_usm_array, self.sr_usm = self.get_y_usm()
        self.sr_usm=44100
        self.target_sr=16000
        self.sr=sr
        self.avg_db=avg_db
        self.arb_length=4 # in seconds
        self.pre_emp=0.97

    def get_y_usm(self):
        # Load USM Datensatz
        samples_usm = []
        for i in range(0,2000):
            path=f"{i}_mix.wav"
            path2=f"{i}_mix_target.npy"
            file_path=os.path.join(self.usm_data_path, path)
            file_path2=os.path.join(self.usm_data_path, path2)
            data = np.load(file_path2, allow_pickle=True)
            if self.usm_classes[np.argmax(data)] in self.selected_classes:
                # print(self.usm_classes[np.argmax(data)])
                y_usm=[]
                y_usm, sr_usm = lr.load(file_path, sr=None)
                samples_usm.append(y_usm)
        return samples_usm, sr_usm
    

    def pitch_shift(self, samples_filtered, sr):
        steps = random.uniform(-4, 4)
        samples_filtered = lr.effects.pitch_shift(samples_filtered, sr=sr, n_steps=steps)
        return samples_filtered
    

    def resample(self, y, orig_sr, target_sr):
        y_resample = lr.resample(y.T, orig_sr, target_sr)
        if len(y_resample.shape) > 1:
            y_resample = np.average(y_resample, axis=0)
        return y_resample, sr


    def da_white(self, y, sr):
        # Normalverteilung für White Noise
        white_noise = random.uniform(0.001,0.015) * np.random.rand(len(y))
        samples_white = y + white_noise
        return samples_white
    

    def da_usm(self, y, sr):
        # USM Datensatz Augmentierung
        random_sample_idx = random.randrange(len(self.y_usm_array))
        y_usm=[]
        y_usm = self.y_usm_array[random_sample_idx]
        y_usm, sr_usm = self.resample(y_usm, self.sr_usm, self.sr)
        start = int(random.uniform(0, 1) * (len(y_usm) - len(y)))
        clipped_y = y_usm[start:(start + len(y))]
        # Aktuell wird keine Audio Datei aus dem USM Datensatz genutzt, wenn die Audio Datei länger als 5 Sekunden ist (maximale Länge der USM Clips)
        if clipped_y.shape != y.shape:
            return y
        y += clipped_y
        return y    
            

    def add_silence_noise_and_save(self):
        idx=0
        for file in self.file_list:
            file_path = os.path.join(self.origin_folder_path, file)
            target_file_path= os.path.join(self.audio_data_path_with_silence, file)

            if os.path.isfile(file_path) and file_path[-3:]=="wav":
                y, sr = sf.read(file_path)
                # for resampling
                # y,sr=self.resample(y, self.sr, self.target_sr)
                # self.sr=self.target_sr
                if idx==0: print(sr)
                
                # set arbritary length of clips
                arb_length=self.arb_length
                if len(y)/sr>arb_length:
                    num_chunks = len(y) // (arb_length*sr)
                    for i in range(num_chunks):
                        chunk = y[i*arb_length*sr : (i+1)*arb_length*sr]
                        self.process_and_save(arb_length, str(i), chunk, sr, target_file_path)
                        idx += 1
                else:
                    self.process_and_save(arb_length, '', y, sr, target_file_path)
                    idx += 1
    

    def normalize(self, y):
        avg_amplitude_y = np.mean(np.abs(y))
        if avg_amplitude_y > 0:
            vol_y_db = 20 * np.log10(avg_amplitude_y)
        else:
            vol_y_db = 0 
        db_diff = self.avg_db - vol_y_db
        scaling_factor = 10 ** (db_diff / 20)
        y_norm = y * scaling_factor
        return y_norm
    

    def process_and_save(self, arb_length, i, y, sr, target_file_path):
        if i!='':
            target_file_path=target_file_path[:-4]+"_"+str(i)+".wav"
        # high pass filter with firwin
        y = self.normalize(y)
        y=np.append(y[0], y[1:] - self.pre_emp * y[:-1])
        # y = savgol_filter(y, 9, 2)
        # overlay white noise and sounds from the USM dataset
        # y = da_usm(y,sr) and da_white(y,sr) to augment data
        # z-score normalization
        # y = zscore(y)
        # pre emphasis
        # y = lr.effects.preemphasis(y, coef=0.97)
        # silence padding before and behind
        right_array_length = int((sr*arb_length-len(y)))
        right_array = np.zeros(right_array_length)
        samples = np.concatenate((y,right_array), axis=0)
        sf.write(target_file_path, samples, sr)

## Prepare Output Directory

### Remove and create folder for the data preprocessing

In [None]:
shutil.rmtree(audio_data_path_with_silence)
os.makedirs(audio_data_path_with_silence)

## Execute Preprocessing
### Run Preprocessing
Instantiate the preprocessing class and run the pipeline to clean and prepare the audio files.

In [None]:
avg_db=calculate_avg_db(audio_data_path)
processor = Data_Preprocessing(sr, audio_data_path, audio_data_path_with_silence, usm_data_path, usm_classes, selected_classes, avg_db)
processor.add_silence_noise_and_save()

## Padding Function

### Load function to pad the audio sequence by https://github.com/Jiaxin-Ye/TIM-Net_SER/

In [None]:
def padding(signal, sr, mean_signal_length=96000): # avg signal Emo-DB: 2,78 seconds
    if len(signal.shape) > 1:
        signal = np.average(signal, axis=0)
    s_len = len(signal)
    if s_len < mean_signal_length:
        pad_len = mean_signal_length - s_len
        pad_rem = pad_len % 2
        pad_len //= 2
        signal = np.pad(signal, (pad_len, pad_len + pad_rem), 'constant', constant_values = 0)
    else:
        pad_len = s_len - mean_signal_length
        pad_len //= 2
        signal = signal[pad_len:pad_len + mean_signal_length]
    return signal

## Time Domain Statistics

### Statistical Features
Function to compute statistical measures (mean, std, min, max, skew, kurtosis, etc.) from feature arrays.

Adapted from https://maelfabien.github.io/machinelearning/Speech9/#

In [None]:
def compute_stats(freqs):
    mean = np.mean(freqs)
    std = np.std(freqs) 
    min = np.min(freqs)
    max = np.max(freqs)
    maxv = np.amax(freqs) 
    minv = np.amin(freqs) 
    median = np.median(freqs)
    cov=np.cov(freqs)
    var=np.var(freqs)
    rms=np.sqrt(np.mean(np.square(freqs)))
    skew = scipy.stats.skew(freqs)
    kurt = scipy.stats.kurtosis(freqs)
    q1 = np.quantile(freqs, 0.25)
    q2 = np.quantile(freqs, 0.5)
    q3 = np.quantile(freqs, 0.75)
    mode = scipy.stats.mode(freqs)[0][0]
    iqr = scipy.stats.iqr(freqs)
    return [min, max, mean, median, mode, std, cov, var, rms, q1, q2, q3]

## Experimental Feature Visualization
### Comprehensive Feature Test
Testing a wide range of feature extraction methods (CQT, VQT, Chromagram, Spectral Contrast, Tonnetz) to explore potential features for the model.

In [None]:
# find suitable parameters (Paper suggested: n_fft should be between 20 and 30 ms with an overlap of 30% to 50%)
def experimental_feature_visualization(audio_data_path, x=1, sr=16000, n_fft=400, hop_length=200, n_mels=128, n_mfcc=40, fmin_pitch=lr.note_to_hz('A2'), fmax_pitch=lr.note_to_hz('A7'), fmin=800, fmax=8000, bins_per_octave=12, pre_emp=0.96):
    idx=1
    n_bins= int(np.log2(fmax / fmin) * bins_per_octave)-5
    for file in os.listdir(audio_data_path):
        file_path = os.path.join(audio_data_path, file)
        if os.path.isfile(file_path):
            if file_path[-3:]=="wav":
                y, sr = sf.read(file_path)
                print(f"sr={sr}")
                y_pre = np.append(y[0], y[1:] - pre_emp * y[:-1])
                window = scipy.signal.windows.general_hamming(n_fft, alpha=0.46)
                mel = lr.feature.melspectrogram(y=y_pre, sr=sr, n_fft=n_fft, fmin=fmin, fmax=fmax, hop_length=hop_length, n_mels=n_mels, norm=2, window=window)
                mel=lr.amplitude_to_db(mel, ref=np.max)
                mfcc = lr.feature.mfcc(y=y, S=mel, n_mfcc=n_mfcc)
                cqt = lr.cqt(y, sr=sr, n_bins=n_bins, fmin=fmin, bins_per_octave=bins_per_octave, hop_length=hop_length)
                log_cqt = lr.amplitude_to_db(np.abs(cqt), ref=np.max)  
                spectral_contrast = lr.feature.spectral_contrast(y=y, sr=sr, n_fft=n_fft, hop_length=hop_length)
                vqt = lr.vqt(y=y, sr=sr, hop_length=hop_length, fmin=fmin, n_bins=n_bins, bins_per_octave=bins_per_octave, gamma=0)
                chromagram = lr.feature.chroma_cqt(C=cqt, sr=sr, n_chroma=12)
                delta_mfccs = lr.feature.delta(mel)
                delta2_mfccs = lr.feature.delta(mel, order=2)
                plot_standard(log_cqt, 'constant-q power spectrum')
                pitches = lr.yin(y, sr=sr, hop_length=hop_length, fmin=fmin_pitch, fmax=fmax_pitch)            
                tonnetz = lr.feature.tonnetz(y=y, sr=sr, chroma=vqt, hop_length=hop_length, n_chroma=bins_per_octave, n_octaves=5)
                plot_tonnetz(tonnetz)
                plot_spectral_contrast(spectral_contrast, hop_length, sr)
                plot_standard(lr.amplitude_to_db(vqt, ref=np.max), 'Variable-Q Transform (VQT)')
                plot_standard(lr.amplitude_to_db(cqt, ref=np.max), 'Constant-Q Transform (CQT)')
                plot_standard(np.abs(chromagram), 'Chromagram')
                mel=np.mean(a=mel, axis=1)
                cqt=np.mean(a=cqt, axis=1)
                log_cqt=np.mean(a=log_cqt, axis=1)
                vqt=np.mean(a=vqt.real, axis=1)
                mfcc=np.mean(a=mfcc, axis=1)
                spectral_contrast=np.mean(a=spectral_contrast, axis=1)
                delta_mfccs=np.mean(a=delta_mfccs, axis=1)
                delta2_mfccs=np.mean(a=delta2_mfccs, axis=1)
                tonnetz=np.mean(a=tonnetz.real, axis=1)
                chromagram=np.mean(a=chromagram.real, axis=1)
                mixed_features=np.hstack((delta_mfccs, delta2_mfccs, mfcc, spectral_contrast, tonnetz, chromagram, delta_mfccs, mel, cqt, log_cqt))
                mixed_features = np.array(torch.from_numpy(mixed_features).unsqueeze(1))
                print(f"pitches yin shape: {pitches.shape}")
                print(f"mel: {mel.shape}")
                print(f"mfccs: {mfcc.shape}")
                print(f"delta_mfccs: {delta_mfccs.shape}")
                print(f"delta_delta_mfccs: {delta2_mfccs.shape}")
                print(f"log_cqt: {log_cqt.shape}")
                print(f"spectral_contrast: {spectral_contrast.shape}")
                print(f"chromagram: {chromagram.shape}")
                print(f"chroma: {chromagram.shape}")
                print(f"tonnetz: {tonnetz.shape}")
                print(f"mixed_features: {mixed_features.shape}")
                if idx==x:
                    break
                idx+=1

In [None]:
experimental_feature_visualization(x=1, audio_data_path=audio_data_path)

## Prosodic Feature Extraction
### Prosodic Features
Extracting Pitch (F0), RMS Energy, and Zero Crossing Rate.

In [None]:
def extract_prosodic_features(y, sr, n_fft, hop_length, norm, n_mfcc=40, n_mels=40, pre_emp=0.97, fmin_pitch=lr.note_to_hz('A2'), fmax_pitch=lr.note_to_hz('A7')):
    try:
        y = np.append(y[0], y[1:] - pre_emp * y[:-1])
        f0 = lr.yin(y, sr=sr, hop_length=hop_length, fmin=fmin_pitch, fmax=fmax_pitch)  
        # print(f0.shape)      
        # pitches = np.array(torch.from_numpy(pitches).unsqueeze(1))
        rms_energy = librosa.feature.rms(y=y, frame_length=n_fft, hop_length=hop_length)[0]
        rms_energy = np.array(torch.from_numpy(rms_energy).unsqueeze(1))
        zcr = lr.feature.zero_crossing_rate(y=y, frame_length=n_fft, hop_length=hop_length)
        zcr = np.array(torch.from_numpy(zcr).permute(1,0))
        if norm=="zscore":
            pitch = zscore(f0)
            rms_energy = zscore(rms_energy)
            zcr = zscore(zcr)
        # print(zcr.shape)
        # print(rms_energy.shape)
        prosodic_features = np.concatenate((zcr, rms_energy), axis=1)
        prosodic_features = np.array(torch.from_numpy(prosodic_features).permute(1,0))
    except Exception as e:
        print("none",e)
        return None 
    return prosodic_features, f0


## Spectral Feature Extraction
### Spectral Features
Extracting comprehensive spectral features including MFCCs (and deltas), Spectral Contrast, Centroid, Bandwidth, Rolloff, and Chroma.

In [None]:
def extract_spectral_features(y, sr, n_fft, hop_length, norm, n_mfcc=40, n_mels=40, pre_emp=0.97, fmin=20, fmax=int(sr/2), bins_per_octave=12, fmin_pitch=lr.note_to_hz('A2'), fmax_pitch=lr.note_to_hz('A7')):
    try:
        y = np.append(y[0], y[1:] - pre_emp * y[:-1])
        mfccs = lr.feature.mfcc(y=y, sr=sr,  n_mfcc=n_mfcc, n_fft=n_fft, hop_length=hop_length)
        mfccs_d = lr.feature.delta(mfccs)
        mfccs_2d = lr.feature.delta(mfccs, order=2)
        spectral_contrast = lr.feature.spectral_contrast(y=y, sr=sr, n_fft=n_fft, hop_length=hop_length)
        spectral_centroids = lr.feature.spectral_centroid(y=(y+0.001), sr=sr, n_fft=n_fft, hop_length=hop_length)
        spectral_bandwidth = lr.feature.spectral_bandwidth(y=(y+0.001), sr=sr, n_fft=n_fft, hop_length=hop_length)
        spectral_rolloff = lr.feature.spectral_rolloff(y=(y+0.001), sr=sr, n_fft=n_fft, hop_length=hop_length)
        window = scipy.signal.windows.general_hamming(n_fft, alpha=0.46)
        mel = lr.feature.melspectrogram(y=y, sr=sr, n_fft=n_fft, hop_length=hop_length, n_mels=n_mels, norm=2, window=window) 
        log_mel = lr.amplitude_to_db(mel, ref=np.max)
        chromagram = lr.feature.chroma_cqt(y=y, sr=sr, hop_length=hop_length)
        tonnetz = lr.feature.tonnetz(y=lr.effects.harmonic(y), sr=sr, hop_length=hop_length)
        n_bins= int(np.log2(fmax / fmin)* bins_per_octave)-5
        cqt = lr.cqt(y, sr=sr, n_bins=n_bins, fmin=fmin, bins_per_octave=bins_per_octave, hop_length=hop_length)
        log_cqt = lr.amplitude_to_db(np.abs(cqt), ref=np.max)  
        cqcc = lr.feature.mfcc(S=log_cqt, sr=sr, n_mfcc=n_mfcc, n_mels=n_mels, n_fft=n_fft, hop_length=hop_length)
        if norm=="zscore":
            cqcc = zscore(cqcc, axis=1)
            tonnetz = zscore(tonnetz, axis=1)
            log_mel = zscore(log_mel, axis=1)
            mfccs = zscore(mfccs, axis=1)
            mfccs_d = zscore(mfccs_d, axis=1)
            mfccs_2d = zscore(mfccs_2d, axis=1)
            spectral_contrast = zscore(spectral_contrast, axis=1)
            spectral_centroids = zscore(spectral_centroids, axis=1)
            spectral_rolloff = zscore(spectral_rolloff, axis=1)
            spectral_bandwidth = zscore(spectral_bandwidth, axis=1)
            chromagram = zscore(chromagram, axis=1)
        spectral_features = np.concatenate((cqcc, tonnetz, log_mel, mfccs, mfccs_d, mfccs_2d, spectral_contrast, spectral_centroids, spectral_rolloff, spectral_bandwidth, chromagram), axis=0)
    except Exception as e:
        print("none",e)
        return None 
    return spectral_features


### Mel Frequency Cepstral Coeffients

In [None]:
def get_mfcc(y, sr, n_fft, hop_length,  n_mfcc, window_size=400, pre_emp=0.97, n_mels=84, fmin=20,fmax=300, fmin_pitch=lr.note_to_hz('A2'), fmax_pitch=lr.note_to_hz('A7')):
    try:
        # optionally with padding
        # y=padding(y,sr)
        y=np.append(y[0], y[1:] - pre_emp * y[:-1])
        # stft = lr.stft(y, n_fft=n_fft, hop_length=hop_length, win_length=window_size, window='hamming').real
        mfcc = lr.feature.mfcc(y, sr=sr, n_mfcc=n_mfcc, n_fft=n_fft, hop_length=hop_length,  fmin=fmin, fmax=fmax, n_mels=n_mels, dct_type=2)
        # print(mfcc.shape)
        deltas=lr.feature.delta(mfcc)
        deltasdeltas=lr.feature.delta(mfcc, order=2)
        mfcc=zscore(mfcc)
        deltas=zscore(deltas)
        deltasdeltas=zscore(deltasdeltas)
        # print(mfcc.shape, deltas.shape, deltasdeltas.shape)
        mfcc_combined=np.concatenate([mfcc, deltas, deltasdeltas], axis=0)
    except Exception as e:
        print("none", e)
        return None
    return mfcc, mfcc_combined

### Test feature extraction

In [None]:
def test(audio_data_path, n_mfcc=13, n_fft=445, hop_length=220):
    for file in os.listdir(audio_data_path):
        file_path = os.path.join(audio_data_path, file)
        if os.path.isfile(file_path):
            if file_path[-3:]=="wav":
                y, sr = sf.read(file_path)
                mfcc, mfcc_combined=get_mfcc(y,sr, n_fft=n_fft, hop_length=hop_length, n_mfcc=n_mfcc)
                features=mfcc
                print(features.shape)
                stats=[compute_stats(features[i, :]) for i in range(features.shape[0])] 
                features_combined=np.concatenate([stats, features], axis=1)
                print(features_combined.shape)
                plot_mfcc(features, y, sr, n_mfcc, file_path, folder_path_to_save, "plot")
                break
test(audio_data_path)

### Create dataset

In [None]:
def create_features_dataset(dataset_name, audio_data_path, train_list, valid_list, test_list, n_fft, hop_length, n_mfcc=40, n_mels=128, fmin=100, fmax=10000):
    def get_class(filename):
        if "W" in filename[5]:
            return 0
        if "L" in filename[5]:
            return 1
        if "E" in filename[5]:
            return 2
        if "A" in filename[5]:
            return 3
        if "F" in filename[5]:
            return 4
        if "T" in filename[5]:
            return 5
        if "N" in filename[5]:
            return 6
    # assert dataset_name[-3:]!=".h5"
    # dataset = h5py.File(dataset_name, mode="w", libver="latest")
    train_dataset = h5py.File(("train_"+dataset_name), mode="w", libver="latest")
    for file in train_list:
        file_path = os.path.join(audio_data_path, file)
        if os.path.isfile(file_path):
            if file_path[-3:]=="wav":
                y, sr = lr.load(file_path)
                features, _ =get_mfcc(y,sr, n_fft=n_fft, hop_length=hop_length)
                stats=[compute_stats(features[i, :]) for i in range(features.shape[0])] 
                features_combined=np.concatenate([stats, features], axis=1)
                features_combined=np.array(torch.from_numpy(features_combined).permute(1,0))
                filename = file.replace(".wav", "_")
                file_class = get_class(filename)

                f = [features_combined]
                print(features_combined.shape)
                idx=0
                for _f in f:
                    filename += str(idx)
                    idx+=1
                    h5spec = train_dataset.create_dataset(filename, data=_f)
                    h5spec.attrs["class_label"] = file_class
                    # i+=1
    train_dataset.close()
    valid_dataset = h5py.File(("valid_"+dataset_name), mode="w", libver="latest")
    for file in valid_list:
        file_path = os.path.join(audio_data_path, file)
        if os.path.isfile(file_path):
            if file_path[-3:]=="wav":
                y, sr = lr.load(file_path)
                features, _ =get_mfcc(y,sr, n_fft=n_fft, hop_length=hop_length)
                stats=[compute_stats(features[i, :]) for i in range(features.shape[0])] 
                features_combined=np.concatenate([stats, features], axis=1)
                features_combined=np.array(torch.from_numpy(features_combined).permute(1,0))
                filename = file.replace(".wav", "_")
                file_class = get_class(filename)

                f = [features_combined]
                idx=0
                print(features_combined.shape)
                    
                for _f in f:
                    filename += str(idx)
                    idx+=1
                    h5spec = valid_dataset.create_dataset(filename, data=_f)
                    h5spec.attrs["class_label"] = file_class
    valid_dataset.close()
    test_dataset = h5py.File(("test_"+dataset_name), mode="w", libver="latest")
    for file in test_list:
        file_path = os.path.join(audio_data_path, file)
        if os.path.isfile(file_path):
            if file_path[-3:]=="wav":
                y, sr = lr.load(file_path)
                features, _ =get_mfcc(y,sr, n_fft=n_fft, hop_length=hop_length)
                stats=[compute_stats(features[i, :]) for i in range(features.shape[0])] 
                features_combined=np.concatenate([stats, features], axis=1)
                features_combined=np.array(torch.from_numpy(features_combined).permute(1,0))
                filename = file.replace(".wav", "_")
                file_class = get_class(filename)

                f = [features_combined]
                print(features_combined.shape)

                filename = file.replace(".wav", "_")
                file_class = get_class(filename)
                idx=0
                for _f in f:
                    filename += str(idx)
                    idx+=1
                    h5spec = test_dataset.create_dataset(filename, data=_f)
                    h5spec.attrs["class_label"] = file_class
    test_dataset.close()
    print("Train, valid and test dataset created")

## Data Parameters

### Create dataset names

In [None]:
n_fft=400
hop_length=200

vers=f"BerlinEMO_DB_spectral_features_with_timestats_n_fft_{n_fft}_hop_{hop_length}_sr16.txt"
versh5=f"BerlinEMO_DB_spectral_features_with_timestatse_n_fft_{n_fft}_hop_{hop_length}_sr16.h5"

trainset="train_"+versh5
testset="test_"+versh5
validset="valid_"+versh5
print(trainset)

### Delete dataset

In [None]:
file_path1=os.path.join(os.getcwd(), trainset)
if os.path.isfile(file_path1):
    os.remove(file_path1)

file_path2=os.path.join(os.getcwd(), testset)
if os.path.isfile(file_path2):
    os.remove(file_path2)

file_path3=os.path.join(os.getcwd(), validset)
if os.path.isfile(file_path3):
    os.remove(file_path3)

### Randomize dataset

In [None]:
file_list=os.listdir(audio_data_path_with_silence)
random.shuffle(file_list)

train_split=0.8
valid_test_split=(100-train_split*100)/2*0.01
train_len = int(train_split * len(file_list))
valid_len = int(valid_test_split * len(file_list))

train_list = file_list[:train_len]
valid_list = file_list[train_len:train_len+valid_len]
test_list = file_list[train_len+valid_len:]

### Create dataset

In [None]:
create_features_dataset(versh5, audio_data_path_with_silence, train_list, valid_list, test_list, n_fft=n_fft, hop_length=hop_length)