<center><h1>Feature Extractor</h1></center>

In [226]:
import os, glob
import pandas as pd
import numpy as np
import librosa
import librosa.display
import matplotlib.pyplot as plt
from typing import List
from IPython.display import Audio, display, clear_output

In [236]:
DATA_PATH = "challengeA_data"
TRAIN_PATH = "challengeA_data/train"
TEST_PATH = "challengeA_data/test"
TRAIN_METADATA_PATH = "challengeA_data/2022challengeA_train_modified.csv"
TEST_METADATA_PATH = "challengeA_data/2022challengeA_test.csv"

In [121]:
train_df = pd.read_csv(TRAIN_METADATA_PATH, index_col=[0])
train_df = train_df.sample(frac=1).reset_index(drop=True)
train_df.head()

Unnamed: 0,file_id,emotion,encoded_emotion,origin,duration,sample_rate,n_channels
0,08b0b344-3f9d-40a4-a275-424877a6cc3f.wav,angry,0,tess,1.781068,24414.0,1
1,47e77950-a2d6-4abc-9bba-343f28bf9bb7.wav,sadness,5,ravdess,3.937271,48000.0,1
2,a42b6fba-c7b2-4e62-a735-5dc522efeb9b.wav,disgust,1,crema,2.268937,16000.0,1
3,0db38767-1286-42c6-925f-b69cf82fb6d2.wav,neutral,4,crema,3.203187,16000.0,1
4,e59e47b7-f7ed-4c68-a03d-c059fc48d7a0.wav,fear,2,crema,2.202187,16000.0,1


In [227]:
test_df = pd.read_csv(TEST_METADATA_PATH, index_col=[0])
test_df.head()

Unnamed: 0,file_id,origin,predicted_emotion
0,030472df-9d70-4d76-a1a5-acb4c33537d3.wav,crema,
1,ac4720de-e0d9-4667-86a7-4236d410ed25.wav,crema,
2,264928af-cb15-4125-abf7-9408369d83b2.wav,crema,
3,2233ce2b-35ae-483c-9397-1058f681b6ef.wav,crema,
4,472aa1eb-b4dc-452c-84b7-934ed61285da.wav,crema,


In [229]:
emotions = sorted(train_df["emotion"].unique())
emotion_encoder = dict(zip(emotions, np.arange(7)))
emotion_decoder = dict(zip(list(emotion_encoder.values()), list(emotion_encoder.keys())))
emotion_decoder

{0: 'angry',
 1: 'disgust',
 2: 'fear',
 3: 'happy',
 4: 'neutral',
 5: 'sadness',
 6: 'surprise'}

In [228]:
class Util:
    def __init__(self, path, df, sr, n_fft, hop_length, n_mfcc=13, norm_method="min_max"):
        self.sr = sr
        self.path = path 
        self.df = df
        self.n_fft = n_fft
        self.hop_length = hop_length
        self.norm_method = norm_method
        self.n_mfcc = n_mfcc
    
    def get_waveform(self, waveform_file):
        waveform_path = os.path.join(self.path, waveform_file)
        waveform, _ = librosa.load(waveform_path, sr=self.sr)

        return waveform
            
    def fix_length(self, array, target_length):
        """Truncates to obtain the middle of an array or pads an array with zeros
        to achieve a target length.
            :param np.ndarray array: Either a waveform or gram.
            :param int target_length: The target length in number of samples, number of frames.
        """
        assert array.ndim == 1 or array.ndim == 2, f"This function does not accommodate fixing the length\
        of an array with dimensions {array.shape}. The array must either be 1- or 2-dimensional."
        
        modified_array = array.copy()
        
        if array.ndim == 2:
            # Then it must be a gram and target length should then be in number of frames
            num_frames = array.shape[1]
            if num_frames > target_length:
                modified_array = Util.truncate(array, target_length)
            elif num_frames < target_length:
                modified_array = Util.pad_with_zeros(array, target_length)
        elif array.ndim == 1:
            # Then it must be a waveform and the target_length must be in number of samples.
            num_samples = len(array)
            if num_samples > target_length:
                modified_array = Util.truncate(array, target_length)
            elif num_samples < target_length:
                modified_array = Util.pad_with_zeros(array, target_length)
        
        return modified_array
    
    @staticmethod
    def truncate(array, target_length):
        assert array.ndim == 1 or array.ndim == 2, f"This function doesn't accommodate arrays with dimensions\
         {array.shape}. The array must either be 1- or 2-dimensional"
        
        modified_array = array.copy()
        
        if array.ndim == 1:
            samples_to_trunc = len(array) - target_length
            assert samples_to_trunc > 0, "Can't truncate an array whose length is lesser than the target length."
            trunc_left = trunc_right = samples_to_trunc // 2
            # if the number of samples to truncate is odd, then one more sample must be truncated on one of the sides.
            # The side picked is trivial, so I pick the left arbitrarily.
            if samples_to_trunc % 2 == 1:
                trunc_left += 1

            modified_array = array[trunc_left:len(array)-trunc_right]
            assert len(modified_array) == target_length, f"The length of the waveform, {len(modified_array)} is not equal\
            to the target number of samples, {target_length}."
        elif array.ndim == 2:
            num_frames = array.shape[1]
            frames_to_trunc = num_frames - target_length
            assert frames_to_trunc > 0, "Can't truncate an array whose length is lesser than the target length."
            trunc_left = trunc_right = frames_to_trunc // 2
            # if the number of samples to truncate is odd, then one more sample must be truncated on one of the sides.
            # The side picked is trivial, so I pick the left arbitrarily.
            if frames_to_trunc % 2 == 1:
                trunc_left += 1

            modified_array = array[:, trunc_left:num_frames-trunc_right]
            assert modified_array.shape[1] == target_length, f"The length of the waveform, {modified_array.shape[1]} is not equal\
            to the target number of samples, {target_length}."
            
        return modified_array
                
    @staticmethod
    def pad_with_zeros(array, target_length):
        """Pads either a waveform or gram with 0s to the target length.
        In the case of a waveform, target_length is the target number of samples;
        With grams, it's the target number of frames."""
        modified_array = array.copy()
        if array.ndim == 2:
            num_frames = array.shape[1]
            num_frames_to_pad = target_length - num_frames
            # The first tuple in the tuple of tuples indicates the number of rows of zeros
            # to pad on top of and below the gram; the second the number of columns of zeros
            # to place before and after.
            npad = ((0, 0), (0, num_frames_to_pad))
            modified_array = np.pad(array, npad)
        elif array.ndim == 1:
            num_samples = len(array)
            num_samples_to_pad = target_length - num_samples
            modified_array = np.pad(array, (0, num_samples_to_pad))
            
        return modified_array
    
    def get_specgram(self, file):
        """Returns the magnitudes of the fourier coefficients of a short time fourier transform.
        Note that the coefficients have not been converted to decibels."""
        waveform = self.get_waveform(file)
        
        stft = librosa.stft(y=waveform, n_fft=self.n_fft, hop_length=self.hop_length)
        specgram = np.abs(stft)
        specgram = librosa.amplitude_to_db(specgram, ref=np.max)
        
        return specgram
    
    def get_mel_specgram(self, file):
        """Returns log-mel-spectrogram given a file name."""
        waveform = self.get_waveform(file)
        mel_specgram = librosa.feature.melspectrogram(y=waveform, sr=self.sr, 
                                                      n_fft=self.n_fft, hop_length=self.hop_length)
        mel_specgram = librosa.amplitude_to_db(mel_specgram, ref=np.max)
        return mel_specgram
    
    def get_mfccs(self, file, n_mfcc=13):
        mel_specgram = self.get_mel_specgram(file)
        mfccs = librosa.feature.mfcc(S=mel_specgram, n_mfcc=n_mfcc)
        
        # discard the first coefficient
        return mfccs[1:]
    
    def get_combined_mfccs(self, file):
        mfccs = self.get_mfccs(file)
        deltas = librosa.feature.delta(mfccs)
        delta_deltas = librosa.feature.delta(mfccs, order=2)
        
        combined_mfccs = np.concatenate((mfccs, deltas, delta_deltas), axis=0)
        return combined_mfccs
    
    def normalize(self, array, method=None):
        if method == None: method = self.norm_method
        assert method == "min_max" or method == "standard", f"'{method}' is not a recognized normalization method."
        epsilon = 1e-9
        def normalize_(vec, method=method):
            if method == "min_max":
                return (vec - vec.min())/(vec.max()-vec.min() + epsilon)
            elif method == "standard":
                return (vec - vec.mean())/(vec.std()+epsilon)
            
        norm_arr = []
        for layer in array:
            norm_layer = normalize_(layer)
            norm_arr.append(norm_layer)
        
        return np.array(norm_arr, dtype=np.float32)
    
    def get_label(self, file):
        return self.df[self.df["file_id"] == file]["encoded_emotion"].values[0]

<h1>Saving features to file</h1>

In [240]:
class SavingUtil(Util):
    emotions = sorted(train_df["emotion"].unique())
    emotion_encoder = dict(zip(emotions, np.arange(7)))
    emotion_decoder = dict(zip(list(emotion_encoder.values()), list(emotion_encoder.keys())))
    _accepted_features = ["mel_spectrogram", "spectrogram", "mfccs", "combined_mfccs"]
    
    def __init__(self, train_path=TRAIN_PATH,
                 df=train_df,
                 sr=16000, 
                 target_duration=4,
                 n_fft=512, 
                 hop_length=512, 
                 n_mfcc=13, 
                 norm_method="min_max"):
        super().__init__(train_path=train_path, df=df, sr=sr, n_fft=n_fft, hop_length=hop_length, n_mfcc=n_mfcc, norm_method=norm_method)
        self.target_duration = target_duration # seconds
        
        # used for fixing the length of waveforms
        self.target_samples = target_duration * sr 
        
        # used for fixing the length of grams
        self.target_frames = (sr * target_duration - n_fft)//hop_length + 1 
        
    # wrapper for Util's get_mel_specgram() method     
    def get_mel_specgram(self, file):
        mel_specgram = super().get_mel_specgram(file)
        mel_specgram = super().normalize(mel_specgram, method=self.norm_method)
        mel_specgram = super().fix_length(mel_specgram, self.target_frames)
        return mel_specgram
    
    # Wrapper for Util's get_mfccs() method
    def get_mfccs(self, file):
        """Gets the mfccs of the file, normalizes them, and then fixes their length."""
        # for some reason was causing issues when calling super().get_mfccs() so I just compute the mfccs
        # from the spectrogram. 
        mel_specgram = super().get_mel_specgram(file)
        mfccs = librosa.feature.mfcc(S=mel_specgram, n_mfcc=self.n_mfcc)
        mfccs = super().normalize(mfccs, method=self.norm_method)
        mfccs = super().fix_length(mfccs, self.target_frames)
        
        return mfccs[1:]
    
    # Wrapper for Util's get_combined_mfccs() method 
    def get_combined_mfccs(self, file):
        # Had the same issue with this function as the one above, so I just compute the combined
        # mfccs here as opposed to in the parent class.
        mel_specgram = super().get_mel_specgram(file)
        mfccs = librosa.feature.mfcc(S=mel_specgram, n_mfcc=self.n_mfcc)
        mfccs = mfccs[1:]
        deltas = librosa.feature.delta(mfccs)
        delta_deltas = librosa.feature.delta(mfccs, order=2)
        combined_mfccs = np.concatenate((mfccs, deltas, delta_deltas), axis=0)
        combined_mfccs = super().normalize(combined_mfccs, method=self.norm_method)
        combined_mfccs = super().fix_length(combined_mfccs, self.target_frames)
        
        return combined_mfccs 
        
    def save_to_file(self, file, array, feature_save_dir, category):
        """Saves the file to the feature save directory as a .npy file.

            :param str file: - File name with .wav extension
            :param np.ndarray array: The array of data to be saved
            :param str feature_save_dir: the relative path to the save directory for the features
            :param str category: The class of the label. Used to determine in which class subdirectory the file 
                should be saved.
        """
        # removing the extension
        file = file.replace(".wav", ".npy")

        save_dir = os.path.join(os.getcwd(), feature_save_dir, category)
        save_path = os.path.join(save_dir, file)

        # Creates a directory for a particular feature type
        if not os.path.isdir(feature_save_dir):
            os.mkdir(feature_save_dir)

        if not os.path.isdir(save_dir):
            os.mkdir(save_dir)

        # The absolute path must be used for saving for whatever reason.
        np.save(save_path, array) # The format of the file is .npy, which is a binary file
        
    # modification of above function for testing data
    def save_test_features_to_file(self, file, array, feature_save_dir):
        """Saves the file to the feature save directory as a .npy file.
            :param str file: - File name with .wav extension
            :param np.ndarray array: The array of data to be saved
            :param str feature_save_dir: the relative path to the save directory for the features
        """
        # removing the extension
        file = file.replace(".wav", ".npy")

        save_dir = os.path.join(os.getcwd(), feature_save_dir)
        save_path = os.path.join(save_dir, file)

        # Creates a directory for a particular feature type
        if not os.path.isdir(feature_save_dir):
            os.mkdir(feature_save_dir)

        # The absolute path must be used for saving for whatever reason.
        np.save(save_path, array) # The format of the file is .npy, which is a binary file
    
    @staticmethod
    def get_num_files_in_directory(directory, suffix, excluded: List[str]=None):
        total = 0
        for root, dirs, files in os.walk(directory):
            total += len([file for file in files if file.endswith(suffix) and not any([file.endswith(sub) for sub in excluded])])
        return total
    
    @staticmethod
    def get_noise_files(directory, suffix, no_augmented=True):
        total = 0
        for root, dirs, files in os.walk(directory):
            if no_augmented:
                total += len([file for file in files if file.endswith(suffix) and not (file.endswith("_1.npy") or file.endswith("_2.npy") or file.endswith("_3.npy") or file.endswith("_noise.npy"))])
            else:
                total += len([file for file in files if file.endswith(suffix)])
        return total

    def save_files(self, files, data_path, feature_type):
        """Saves files in batches in case saving gets disrupted midway. It only accommodates non-augmented data.
            - files: List of .wav file names
            - data_path: The relative path to the feature's save directory
            - feature_type: The type of feature to extract
        """
        assert feature_type in SavingUtil._accepted_features, f"'{feature_type}' is not a recognized feature type."
        feature_save_dir = os.path.join(data_path, feature_type)
        # sort the files so that if saving is stopped before completion, it's easy to pick up from where it left off.
        files = sorted(files)
        num_files = len(files)
        excluded=["_1.npy", "_2.npy", "_3.npy", "_noise.npy"]
        num_files_already_processed = SavingUtil.get_num_files_in_directory(feature_save_dir, ".npy", excluded)
        num_files_left_to_process = num_files - num_files_already_processed
        batch_size = 10
        num_batches = num_files_left_to_process // batch_size
        # if the number of batches * the batch_size doesn't capture all the data, then
        # add an additional batch which will accommodate the leftover files
        if num_batches * batch_size < num_files_left_to_process:
            num_batches += 1
        
        files_processed = 0
        # start processing files from the index after the last file that was processed, which is essentially
        # the number of files already processed because of 0-based indexing.
        for batch in range(num_batches):
            start_index = num_files_already_processed + batch * batch_size
            for file_index in range(start_index, min(num_files, start_index + batch_size)):
                file = files[file_index]
                encoded_emotion = super().get_label(file)
                decoded_emotion = SavingUtil.emotion_decoder[encoded_emotion]
                
                if feature_type == "mel_spectrogram":
                    features = self.get_mel_specgram(file)
                elif feature_type == "mfccs":
                    features = self.get_mfccs(file)
                elif feature_type == "combined_mfccs":
                    features = self.get_combined_mfccs(file)
                elif feature_type == "spectrogram":
                    features = self.get_spectrogram(file)
                    
                self.save_to_file(file, features, feature_save_dir, decoded_emotion)
                files_processed += 1
                
            portion_complete = round((files_processed / num_files_left_to_process)*100, 2)
            clear_output(wait=True)
            display(f"Saving files to {feature_save_dir}" + '.'*(batch%3+1))
            display(f"Progress: {portion_complete}%")
            
        print("Complete!")
        
    # modification of above function for testing data
    def save_test_files(self, files, data_path, feature_type):
        """Saves test files in batches in case saving gets disrupted midway. It only accommodates non-augmented data.
            - files: List of .wav file names
            - data_path: The relative path to the feature's save directory
            - feature_type: The type of feature to extract
        """
        assert feature_type in SavingUtil._accepted_features, f"'{feature_type}' is not a recognized feature type."
        if not os.path.isdir(os.path.join(data_path, "test_features")):
            os.mkdir(os.path.join(data_path, "test_features"))
        feature_save_dir = os.path.join(data_path, "test_features", feature_type)
        # sort the files so that if saving is stopped before completion, it's easy to pick up from where it left off.
        files = sorted(files)
        num_files = len(files)
        num_files_already_processed = SavingUtil.get_num_files_in_directory(feature_save_dir, ".npy")
        num_files_left_to_process = num_files - num_files_already_processed
        batch_size = 10
        num_batches = num_files_left_to_process // batch_size
        # if the number of batches * the batch_size doesn't capture all the data, then
        # add an additional batch which will accommodate the leftover files
        if num_batches * batch_size < num_files_left_to_process:
            num_batches += 1
        
        files_processed = 0
        # start processing files from the index after the last file that was processed, which is essentially
        # the number of files already processed because of 0-based indexing.
        for batch in range(num_batches):
            start_index = num_files_already_processed + batch * batch_size
            for file_index in range(start_index, min(num_files, start_index + batch_size)):
                file = files[file_index]
                
                if feature_type == "mel_spectrogram":
                    features = self.get_mel_specgram(file)
                elif feature_type == "mfccs":
                    features = self.get_mfccs(file)
                elif feature_type == "combined_mfccs":
                    features = self.get_combined_mfccs(file)
                elif feature_type == "spectrogram":
                    features = self.get_spectrogram(file)
                    
                self.save_test_features_to_file(file, features, feature_save_dir)
                files_processed += 1
                
            portion_complete = round((files_processed / num_files_left_to_process)*100, 2)
            clear_output(wait=True)
            display(f"Saving files to {feature_save_dir}" + '.'*(batch%3+1))
            display(f"Progress: {portion_complete}%")
            
        print("Complete!")
        
    def save_augmented_files(self, files, data_path, feature_type):
        """Essentially the same as the save_files() method, but it's modified to work for only augmented data – the noisy waveforms."""
        """Saves files in batches in case saving gets disrupted midway.
            - files: List of .wav file names
            - data_path: The relative path to the feature's save directory
            - feature_type: The type of feature to extract
        """
        assert feature_type in SavingUtil._accepted_features, f"'{feature_type}' is not a recognized feature type."
        feature_save_dir = os.path.join(data_path, feature_type)
        # sort the files so that if saving is stopped before completion, it's easy to pick up from where it left off.
        files = sorted(files)
        num_files = len(files)
        num_files_already_processed = SavingUtil.get_num_files_in_directory(feature_save_dir, "_noise.npy")
        num_files_left_to_process = num_files - num_files_already_processed
        batch_size = 10
        num_batches = num_files_left_to_process // batch_size
        # if the number of batches * the batch_size doesn't capture all the data, then
        # add an additional batch which will accommodate the leftover files
        if num_batches * batch_size < num_files_left_to_process:
            num_batches += 1
        
        files_processed = 0
        # start processing files from the index after the last file that was processed, which is essentially
        # the number of files already processed because of 0-based indexing.
        for batch in range(num_batches):
            start_index = num_files_already_processed + batch * batch_size
            for file_index in range(start_index, min(num_files, start_index + batch_size)):
                file = files[file_index]
                encoded_emotion = super().get_label(file)
                decoded_emotion = SavingUtil.emotion_decoder[encoded_emotion]
                
                if feature_type == "mel_spectrogram":
                    features = self.get_mel_specgram(file)
                elif feature_type == "mfccs":
                    features = self.get_mfccs(file)
                elif feature_type == "combined_mfccs":
                    features = self.get_combined_mfccs(file)
                elif feature_type == "spectrogram":
                    features = self.get_spectrogram(file)
                    
                # Adding in the suffix _noise so that the file can be differentiated from the original data
                # when saved to file. 
                file = file.replace(".wav", "") + "_noise" + ".wav"
                self.save_to_file(file, features, feature_save_dir, decoded_emotion)
                files_processed += 1
                
            portion_complete = round((files_processed / num_files_left_to_process)*100, 2)
            clear_output(wait=True)
            display(f"Saving files to {feature_save_dir}" + '.'*(batch%3+1))
            display(f"Progress: {portion_complete}%")
            
        print("Complete!")

<h3>Mel-spectrograms</h3>

In [249]:
files = list(train_df["file_id"])
util = SavingUtil()
feature_type = "mel_spectrogram"
util.save_files(files, DATA_PATH, feature_type)

Complete!


In [241]:
# testing data
test_files = list(test_df["file_id"])
util = SavingUtil(TEST_PATH)
feature_type = "mel_spectrogram"
util.save_test_files(test_files, DATA_PATH, feature_type)

'Saving files to challengeA_data/test_features/mel_spectrogram.'

'Progress: 100.0%'

Complete!


<h3>MFCCs</h3>

In [352]:
files = list(train_df["file_id"])
util = SavingUtil()
feature_type = "mfccs"
util.save_files(files, DATA_PATH, feature_type)

'Saving files to challengeA_data/mfccs...'

'Progress: 100.0%'

Complete!


<h3>MFCCs with deltas and delta-deltas</h3>

In [373]:
files = list(train_df["file_id"])
util = SavingUtil()
feature_type = "combined_mfccs"
util.save_files(files, DATA_PATH, feature_type)

'Saving files to challengeA_data/combined_mfccs...'

'Progress: 100.0%'

Complete!


<h3>Extracting mel-spectrograms from the augmented data</h3>

In the data augmentation section, I created some examples from the raw audio by adding noise. I want to convert those audio files into mel-spectrograms, so that they can be loaded efficiently in the final pipeline. I will make some modifications to the SavingUtil.savefiles() method so that this can be achieved.

In [410]:
files = glob.glob("challengeA_data/train/augmented_data/*.wav")
files = [file.split('/')[-1] for file in files]
util = SavingUtil()
feature_type = "mel_spectrogram"
util.save_augmented_files(files, DATA_PATH, feature_type)

'Saving files to challengeA_data/mel_spectrogram...'

'Progress: 100.0%'

Complete!


<h1>Extracting static features</h1>

<h2>Training data</h2>

This feature extraction method is a modification of a feature extraction method from [this](https://github.com/Renovamen/Speech-Emotion-Recognition) github page. <br>
<br>
The idea of using mel-energy spectrum dynamic coefficients came from [this](https://www.researchgate.net/publication/43785303_Speech_Emotion_Recognition_Using_Support_Vector_Machines) paper; and the use of the fundamental frequency from [this](https://ieeexplore.ieee.org/document/6512793).

In [220]:
def features(X, sample_rate: float) -> np.ndarray:
    """X is an amplitude time series"""
    frame_length = 512
    hop_length = frame_length // 4
    ext_features = {}
    stft = np.abs(librosa.stft(X, n_fft=frame_length, hop_length=hop_length))

    pitches, magnitudes = librosa.piptrack(y=X, sr=sample_rate, S=stft, fmin=70, fmax=400)
    pitch = []
    for i in range(magnitudes.shape[1]):
        index = magnitudes[:, 1].argmax()
        pitch.append(pitches[index, i])

    pitch_tuning_offset = librosa.pitch_tuning(pitches)
    pitchmean = np.mean(pitch)
    pitchstd = np.std(pitch)
    pitchmax = np.max(pitch)
    pitchmin = np.min(pitch)
    ext_features["pitch_mean"] = pitchmean
    ext_features["pitch_std"] = pitchstd
    ext_features["pitch_max"] = pitchmax
    ext_features["pitch_min"] = pitchmin
    
    # mean fundamental frequency
    f0 = librosa.yin(X, sr = sr, fmin = librosa.note_to_hz('C2'), fmax= librosa.note_to_hz('C7'), frame_length=512)
    f0_mean = np.mean(f0)
    ext_features["f0_mean"] = f0_mean

    cent = librosa.feature.spectral_centroid(y=X, sr=sample_rate)
    cent = cent / np.sum(cent)
    meancent = np.mean(cent)
    stdcent = np.std(cent)
    maxcent = np.max(cent)
    ext_features["cent_mean"] = meancent
    ext_features["cent_std"] = stdcent
    ext_features["cent_max"] = maxcent

    flatness = np.mean(librosa.feature.spectral_flatness(y=X))
    ext_features["flatness"] = flatness
    
    mfccs = librosa.feature.mfcc(y=X, sr=sample_rate, n_mfcc=13)
    mfcc_mean = np.mean(mfccs.T, axis=0)
    for i, val in enumerate(mfcc_mean): ext_features["mfcc_mean_"+str(i)] = val
    mfccsstd = np.std(mfccs.T, axis=0)
    for i, val in enumerate(mfccsstd): ext_features["mfcc_std_"+str(i)] = val
    mfccmax = np.max(mfccs.T, axis=0)
    for i, val in enumerate(mfccmax): ext_features["mfcc_max_"+str(i)] = val
        
    # Mel-energy spectrum dynamic coefficients. From DOI:10.1007/978-3-642-21402-8_35
    medc = np.mean(mfccs, axis=1)
    for i, val in enumerate(medc): ext_features["medc_"+str(i)] = val
    medc_deltas = librosa.feature.delta(medc)
    for i, val in enumerate(medc_deltas): ext_features["medc_delta1_"+str(i)] = val
    medc_delta_deltas = librosa.feature.delta(medc, order=2)
    for i, val in enumerate(medc_delta_deltas): ext_features["medc_delta2_"+str(i)] = val

    chroma = np.mean(librosa.feature.chroma_stft(S=stft, sr=sample_rate).T, axis=0)
    for i, val in enumerate(chroma): ext_features["chroma_"+str(i)] = val

    mel = np.mean(librosa.feature.melspectrogram(y=X, sr=sample_rate).T, axis=0)
    for i, val in enumerate(mel): ext_features["mel_"+str(i)] = val

    contrast = np.mean(librosa.feature.spectral_contrast(S=stft, sr=sample_rate).T, axis=0)
    for i, val in enumerate(contrast): ext_features["contrast_"+str(i)] = val

    zerocr = np.mean(librosa.feature.zero_crossing_rate(X))
    ext_features["zero_crossing_rate"] = zerocr

    S, phase = librosa.magphase(stft)
    meanMagnitude = np.mean(S)
    ext_features["magnitude_mean"] = meanMagnitude
    stdMagnitude = np.std(S)
    ext_features["magnitude_std"] = stdMagnitude
    maxMagnitude = np.max(S)
    ext_features["magnitude_max"] = maxMagnitude

    rms = librosa.feature.rms(S=S, frame_length=frame_length, hop_length=hop_length)[0]
    meanrms = np.mean(rms)
    ext_features["rms_mean"] = meanrms
    stdrms = np.std(rms)
    ext_features["rms_std"] = stdrms
    maxrms = np.max(rms)
    ext_features["rms_max"] = maxrms
    
    return ext_features

def get_label(file, df):
    return df[df["file_id"] == file]["encoded_emotion"].values[0]

In [199]:
files = glob.glob("challengeA_data/train/*.wav")

df_list = []
num_files = len(files)
batch_size = 50
num_batches = num_files // batch_size
if num_batches * batch_size < num_files:
    num_batches += 1
files_processed = 0
for batch in range(num_batches):
    start_index = batch*batch_size
    for file_index in range(start_index, min(num_files, start_index+batch_size)):
        file = files[file_index]
        file_id = file.split('/')[-1]
        label = get_label(file_id, train_df)
        X, sr = librosa.load(file, sr=16000)
        ext_features = features(X, sr)
        ext_features["encoded_emotion"] = label
        ext_features["file_id"] = file_id
        df_list.append(ext_features)
        files_processed += 1
        
    clear_output(wait=True)
    progress = files_processed/num_files*100
    display(f"Extracting static features"+'.'*(batch%3+1))
    display(f"Progress: {round(progress, 2)}")

ext_features_df = pd.DataFrame(df_list)

'Extracting static features..'

'Progress: 100.0'

In [205]:
ext_features_df

Unnamed: 0,pitch_mean,pitch_std,pitch_max,pitch_min,f0_mean,cent_mean,cent_std,cent_max,flatness,mfcc_mean_0,...,contrast_6,zero_crossing_rate,magnitude_mean,magnitude_std,magnitude_max,rms_mean,rms_std,rms_max,encoded_emotion,file_id
0,21.363703,39.539577,108.381409,0.0,201.633808,0.015625,0.003854,0.030538,0.009262,-264.383972,...,16.490656,0.069748,0.343575,1.960476,108.180984,0.055346,0.068585,0.396134,3,03d02c0b-599b-41ac-bcea-efce75e1a64d.wav
1,0.000000,0.000000,0.000000,0.0,172.674204,0.013889,0.007211,0.037036,0.037028,-446.079895,...,15.031881,0.109002,0.027601,0.071469,1.569534,0.002847,0.001841,0.009539,2,8f6d2894-bb3f-4fee-8ee6-8c6f74fa2d88.wav
2,24.783173,64.267929,202.607910,0.0,155.116020,0.011494,0.005017,0.025381,0.023962,-386.664032,...,14.833075,0.078260,0.040636,0.131825,3.759614,0.005263,0.003013,0.018523,1,e06de6b7-5fcc-4d1b-81ea-8e37a7e2db11.wav
3,2.909268,18.647245,133.430908,0.0,413.060910,0.015873,0.007603,0.038214,0.027647,-303.695984,...,30.097169,0.201381,0.264241,1.047829,20.467342,0.033354,0.034311,0.129928,0,4557ddd7-2779-4fef-bc82-7ec3fcdb9199.wav
4,8.907910,54.515553,351.490479,0.0,573.440503,0.014706,0.007923,0.030084,0.048945,-340.717773,...,29.568332,0.261274,0.174153,0.649212,19.466547,0.020545,0.021535,0.094402,0,4e61d5d1-04c3-494d-bb9b-061def5cfe7e.wav
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
10105,22.562132,61.431889,200.441757,0.0,564.067067,0.015873,0.008028,0.034278,0.040569,-320.214905,...,28.761959,0.234739,0.182952,0.644933,17.329002,0.022467,0.019401,0.075627,0,95433daf-9b23-4725-b76c-a7cb0e4a1474.wav
10106,24.003160,49.084866,140.544632,0.0,201.604130,0.014085,0.008569,0.039060,0.035927,-350.843719,...,15.298714,0.141801,0.099290,0.316607,7.413186,0.011161,0.009548,0.042489,5,03672751-408f-4a63-a731-756910195b97.wav
10107,17.383577,54.917084,202.148438,0.0,145.836110,0.015152,0.005121,0.042695,0.008157,-405.525604,...,13.622684,0.052512,0.040902,0.158767,6.231894,0.005977,0.004056,0.024115,4,a95aa915-334e-4909-813a-179cb1a48085.wav
10108,79.311813,105.619774,234.317245,0.0,314.145166,0.012658,0.007323,0.027025,0.048782,-326.102173,...,14.962163,0.229566,0.080395,0.219080,7.538439,0.007969,0.006542,0.037295,1,002c87b5-52b1-4d87-991a-b4d3915317b2.wav


In [204]:
ext_features_df[ext_features_df.isna().any(axis=1)]

Unnamed: 0,pitch_mean,pitch_std,pitch_max,pitch_min,f0_mean,cent_mean,cent_std,cent_max,flatness,mfcc_mean_0,...,contrast_6,zero_crossing_rate,magnitude_mean,magnitude_std,magnitude_max,rms_mean,rms_std,rms_max,encoded_emotion,file_id
1661,0.0,0.0,0.0,0.0,2285.714286,,,,1.000001,-1131.370972,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,5,436ff098-f41f-482e-8662-ec270f535b90.wav


There are some null values. I will impute them by using the averages of the columns. Although this method is not advisable since it can alter the variance of the dataset, using methods like regression or a KNN would be overkill since it's only one instance that will be affected; thus, it will have a negligble effect on the dataset.

In [206]:
ext_features_df = ext_features_df.fillna(ext_features_df.mean())

  ext_features_df = ext_features_df.fillna(ext_features_df.mean())


In [210]:
ext_features_df[ext_features_df.isna().any(axis=1)]

Unnamed: 0,pitch_mean,pitch_std,pitch_max,pitch_min,f0_mean,cent_mean,cent_std,cent_max,flatness,mfcc_mean_0,...,contrast_6,zero_crossing_rate,magnitude_mean,magnitude_std,magnitude_max,rms_mean,rms_std,rms_max,encoded_emotion,file_id


The null values have successfully been imputed.

I want the file_id and the encoded emotion to be at the beginning of the dataframe, so I reverse the order of columns.

In [211]:
cols = ext_features_df.columns.to_list()
cols = cols[::-1] # reverse the column order
ext_features_df = ext_features_df[cols]
ext_features_df.head()

Unnamed: 0,file_id,encoded_emotion,rms_max,rms_std,rms_mean,magnitude_max,magnitude_std,magnitude_mean,zero_crossing_rate,contrast_6,...,mfcc_mean_0,flatness,cent_max,cent_std,cent_mean,f0_mean,pitch_min,pitch_max,pitch_std,pitch_mean
0,03d02c0b-599b-41ac-bcea-efce75e1a64d.wav,3,0.396134,0.068585,0.055346,108.180984,1.960476,0.343575,0.069748,16.490656,...,-264.383972,0.009262,0.030538,0.003854,0.015625,201.633808,0.0,108.381409,39.539577,21.363703
1,8f6d2894-bb3f-4fee-8ee6-8c6f74fa2d88.wav,2,0.009539,0.001841,0.002847,1.569534,0.071469,0.027601,0.109002,15.031881,...,-446.079895,0.037028,0.037036,0.007211,0.013889,172.674204,0.0,0.0,0.0,0.0
2,e06de6b7-5fcc-4d1b-81ea-8e37a7e2db11.wav,1,0.018523,0.003013,0.005263,3.759614,0.131825,0.040636,0.07826,14.833075,...,-386.664032,0.023962,0.025381,0.005017,0.011494,155.11602,0.0,202.60791,64.267929,24.783173
3,4557ddd7-2779-4fef-bc82-7ec3fcdb9199.wav,0,0.129928,0.034311,0.033354,20.467342,1.047829,0.264241,0.201381,30.097169,...,-303.695984,0.027647,0.038214,0.007603,0.015873,413.06091,0.0,133.430908,18.647245,2.909268
4,4e61d5d1-04c3-494d-bb9b-061def5cfe7e.wav,0,0.094402,0.021535,0.020545,19.466547,0.649212,0.174153,0.261274,29.568332,...,-340.717773,0.048945,0.030084,0.007923,0.014706,573.440503,0.0,351.490479,54.515553,8.90791


I'll now save the features as a .csv file so that they can be used both for analysis and training later.

In [212]:
ext_features_df.to_csv("challengeA_data/ext_features.csv")

<h2>Extracting the static features from the testing data</h2>

In [221]:
files = glob.glob("challengeA_data/test/*.wav")

df_list = []
num_files = len(files)
batch_size = 50
num_batches = num_files // batch_size
if num_batches * batch_size < num_files:
    num_batches += 1
files_processed = 0
for batch in range(num_batches):
    start_index = batch*batch_size
    for file_index in range(start_index, min(num_files, start_index+batch_size)):
        file = files[file_index]
        file_id = file.split('/')[-1]
        X, sr = librosa.load(file, sr=16000)
        ext_features = features(X, sr)
        ext_features["file_id"] = file_id
        df_list.append(ext_features)
        files_processed += 1
        
    clear_output(wait=True)
    progress = files_processed/num_files*100
    display(f"Extracting static features"+'.'*(batch%3+1))
    display(f"Progress: {round(progress, 2)}")

test_ext_features_df = pd.DataFrame(df_list)

'Extracting static features.'

'Progress: 100.0'

In [224]:
cols = test_ext_features_df.columns
cols = cols[::-1]
test_ext_features_df = test_ext_features_df[cols]
test_ext_features_df.head()

Unnamed: 0,file_id,rms_max,rms_std,rms_mean,magnitude_max,magnitude_std,magnitude_mean,zero_crossing_rate,contrast_6,contrast_5,...,mfcc_mean_0,flatness,cent_max,cent_std,cent_mean,f0_mean,pitch_min,pitch_max,pitch_std,pitch_mean
0,75e70b60-343d-409f-a956-bf900f0539e8.wav,0.030247,0.005467,0.006978,7.646789,0.19208,0.059084,0.095326,14.980575,15.734339,...,-369.9953,0.030664,0.033477,0.006532,0.01087,144.252273,0.0,170.727966,68.689148,41.313015
1,823ca09a-648d-46ee-bfd7-069a35baeb92.wav,0.250561,0.043656,0.029509,65.521172,1.167429,0.231272,0.109998,14.094052,17.068421,...,-276.970001,0.033838,0.033964,0.005954,0.014493,240.167107,0.0,202.795929,70.466469,31.061205
2,fc3342c6-5c5d-4e7d-9210-b425dfae9408.wav,0.034399,0.009672,0.009711,9.812358,0.305776,0.051465,0.134193,31.265947,17.917684,...,-438.711243,0.023048,0.050212,0.013555,0.017241,567.144093,0.0,0.0,0.0,0.0
3,a458a4f0-0500-4d65-b8ad-131e21fe528e.wav,0.038927,0.005907,0.008298,8.294876,0.224092,0.054525,0.0563,14.647235,15.893116,...,-382.502869,0.009365,0.04051,0.005021,0.011628,167.822575,0.0,108.398911,36.647083,17.988691
4,b55ea059-c5ed-433e-9bdd-13d9ce24b3ad.wav,0.112187,0.02182,0.016809,28.510019,0.611545,0.115927,0.071067,14.089204,17.714378,...,-343.037231,0.00947,0.033004,0.003945,0.012658,200.973899,0.0,202.232788,83.071732,50.256897


In [225]:
test_ext_features_df.to_csv("challengeA_data/test_ext_features.csv")