In [1]:
%matplotlib inline
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import torch
import torchaudio
from keras.models import *
from keras.layers import *
from keras.callbacks import EarlyStopping, History
from keras.optimizers import Adadelta
import os
import copy
from livelossplot import PlotLossesKeras
from sklearn.model_selection import train_test_split
from tqdm.notebook import tqdm
import tensorflow_addons as tfa
tf.config.list_physical_devices()


TensorFlow Addons (TFA) has ended development and introduction of new features.
TFA has entered a minimal maintenance and release mode until a planned end of life in May 2024.
Please modify downstream libraries to take dependencies from other repositories in our TensorFlow community (e.g. Keras, Keras-CV, and Keras-NLP). 

For more information see: https://github.com/tensorflow/addons/issues/2807 

 The versions of TensorFlow you are currently using is 2.10.1 and is not supported. 
Some things might work, some things might not.
If you were to encounter a bug, do not file an issue.
If you want to make sure you're using a tested and supported configuration, either change the TensorFlow version or the TensorFlow Addons's version. 
You can find the compatibility matrix in TensorFlow Addon's readme:
https://github.com/tensorflow/addons


[PhysicalDevice(name='/physical_device:CPU:0', device_type='CPU'),
 PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]

In [2]:
#LOAD AUDIOS
def load_audios(folder_path):
    """Load every non-hidden audio file found directly inside ``folder_path``.

    Args:
        folder_path: Directory that contains the audio files.

    Returns:
        A tuple ``(LOADED, samplerate)``. ``LOADED`` maps each filename to row 0
        of the waveform returned by ``torchaudio.load`` (presumably the first
        channel -- confirm against the data). ``samplerate`` is the sample rate
        shared by all loaded files.

    Raises:
        ValueError: If the folder holds no non-hidden files, or if the files do
            not all share the same sample rate.
    """
    # Hidden entries (names starting with '.') are filtered out up front so they
    # can neither be picked as the sample-rate reference nor reach the
    # sample-rate comparison with no freshly loaded audio.
    filenames = [name for name in os.listdir(folder_path) if not name.startswith('.')]
    if not filenames:
        raise ValueError('ERROR: No audio files found in ' + str(folder_path))

    LOADED = {}
    samplerate = None
    for filename in tqdm(filenames, desc='LOAD AUDIOS'):
        waveform, rate = torchaudio.load(os.path.join(folder_path, filename), normalize=True)
        if samplerate is None:
            # The first loaded file defines the reference rate.
            samplerate = rate
        elif rate != samplerate:
            # Raise instead of returning a message: callers unpack a 2-tuple,
            # so a returned string could never be handled meaningfully.
            raise ValueError('ERROR: All audios in folder must have the same samplerate.')
        LOADED[filename] = waveform[0]
    return LOADED, samplerate

#RESAMPLE
def resampling(audio_dict, original_samplerate, new_samplerate):
    """Resample every waveform in ``audio_dict`` to ``new_samplerate``.

    Args:
        audio_dict: Mapping of filename -> waveform. When resampling happens,
            its values are replaced in place.
        original_samplerate: Sample rate the waveforms currently have.
        new_samplerate: Target sample rate. ``None``, or a value equal to
            ``original_samplerate``, leaves the data untouched.

    Returns:
        A tuple ``(audio_dict, samplerate)`` where ``samplerate`` is the rate the
        returned waveforms actually have.
    """
    # Identity check for None (an equality test would defer to the operand's
    # __eq__); checked first so the no-op path never compares None to a number.
    if new_samplerate is None or new_samplerate == original_samplerate:
        return audio_dict, original_samplerate

    resample = torchaudio.transforms.Resample(orig_freq=original_samplerate, new_freq=new_samplerate)
    # Only existing keys are reassigned, so mutating while iterating is safe.
    for filename, audio in tqdm(audio_dict.items(), desc='RESAMPLING'):
        audio_dict[filename] = resample(audio)
    return audio_dict, new_samplerate

#Fragment audios FILL WITH ZEROS
def fragment_audio(audio_dict, samplerate, time=1):
    """Cut every waveform into fixed-length clips of ``time`` seconds.

    A waveform longer than one clip is truncated to a whole number of clips (the
    trailing partial clip is dropped). A waveform no longer than one clip is
    right-padded with zeros to exactly one clip. Each entry of ``audio_dict`` is
    replaced in place by a NumPy array of shape ``(n_clips, clip_len)``.

    Args:
        audio_dict: Mapping of filename -> 1-D torch waveform.
        samplerate: Sample rate of the waveforms, in samples per second.
        time: Clip duration in seconds. Fractional values are accepted.

    Returns:
        A tuple ``(audio_dict, time)``.

    Raises:
        ValueError: If ``samplerate * time`` rounds to less than one sample.
    """
    # Compute the clip length once, as an integer: a fractional `time` would
    # otherwise reach torch.zeros as a float size, and a zero length would
    # reach the modulo below as a division by zero.
    clip_len = int(round(samplerate * time))
    if clip_len <= 0:
        raise ValueError('samplerate * time must give at least one sample per clip.')

    for filename, audio in tqdm(audio_dict.items(), desc='FRAGMENT AUDIOS'):
        n_samples = len(audio)
        if n_samples > clip_len:
            # Keep only whole clips.
            audio = audio[:n_samples - n_samples % clip_len]
        else:
            # Pad up to exactly one clip (zero-length padding when already exact).
            audio = torch.cat((audio, torch.zeros((clip_len - n_samples,))), 0)
        audio_dict[filename] = np.array(audio.reshape([-1, clip_len]))
    return audio_dict, time

#Convert to Mel Spectrogram
def MELspectrogram(audio_dict, samplerate):
    """Convert fragmented waveforms into min-max normalised log-mel spectrograms.

    Args:
        audio_dict: Mapping of filename -> NumPy array of clips. The input
            mapping and its arrays are left unmodified.
        samplerate: Sample rate of the clips; it sets the FFT, hop and window
            sizes (29 ms, 10 ms and 25 ms worth of samples respectively).

    Returns:
        A new dict mapping each filename to its spectrogram array, scaled to
        ``[0, 1]`` using the minimum and maximum over all of that file's clips.
        A file whose spectrogram is constant maps to all zeros.
    """
    n_mels = 128
    n_fft = int(samplerate * 0.029)
    hop_length = int(samplerate * 0.010)
    win_length = int(samplerate * 0.025)

    # Both transforms depend only on the sample rate, so build them once
    # instead of once per file.
    to_mel = torchaudio.transforms.MelSpectrogram(
        sample_rate=samplerate,
        n_fft=n_fft,
        n_mels=n_mels,
        hop_length=hop_length,
        win_length=win_length,
    )
    to_db = torchaudio.transforms.AmplitudeToDB()

    # Results go into a fresh dict; nothing below writes to the input arrays,
    # so a deep copy of the whole input is unnecessary.
    mel_dict = {}
    for filename, waveform in tqdm(audio_dict.items(), desc='MELSPECTROGRAM'):
        spec = to_db(to_mel(torch.from_numpy(waveform))).numpy()
        low = spec.min()
        span = spec.max() - low
        if span > 0:
            spec = (spec - low) / span
        else:
            # Constant spectrogram: min-max scaling would be 0/0 and fill the
            # array with NaN.
            spec = np.zeros_like(spec)
        mel_dict[filename] = spec
    return mel_dict

#Train Test split AUDIO
def train_test(audio_dict):
    """Label clips from ``Dataset.csv`` and split them by participant.

    Participants are read from ``Dataset.csv`` and labelled 1 when their
    'PHQ-9 Score' is >= 10, otherwise 0. The participant table is then split
    80/20 in file order (no shuffling), and every clip inherits the label and
    the split of the participant whose ID matches the first three characters
    of the clip's filename. Files whose ID is in neither split are ignored.

    Args:
        audio_dict: Mapping of filename -> array of clips.

    Returns:
        ``(X_train, Y_train, X_test, Y_test)`` as TensorFlow tensors.
    """
    participants = pd.read_csv('Dataset.csv', usecols=['Participant_ID','PHQ-9 Score',],dtype={1:str})
    # 1 for scores >= 10, 0 for anything else (including missing scores).
    participants['labels'] = (participants['PHQ-9 Score'] >= 10).astype(int)
    train_rows, test_rows = train_test_split(participants, test_size=0.2, train_size=0.8,shuffle=False)

    train_labels = train_rows.set_index('Participant_ID').to_dict()['labels']
    test_labels = test_rows.set_index('Participant_ID').to_dict()['labels']

    X_train, Y_train = [], []
    X_test, Y_test = [], []

    for filename, data in tqdm(audio_dict.items(), 'LABEL'):
        ID = filename[:3]
        if ID in train_labels:
            label = 1 if train_labels[ID] != 0 else 0
            X_train.extend(data)
            Y_train.extend([label] * len(data))
        if ID in test_labels:
            label = 1 if test_labels[ID] != 0 else 0
            X_test.extend(data)
            Y_test.extend([label] * len(data))

    return (
        tf.convert_to_tensor(X_train),
        tf.convert_to_tensor(Y_train),
        tf.convert_to_tensor(X_test),
        tf.convert_to_tensor(Y_test),
    )

def train_test2(audio_dict):
    """Split clips into train/test sets using a fixed participant assignment.

    The two tables below map an integer participant ID to a score (presumably
    the PHQ-9 score -- confirm against Dataset.csv). A clip is labelled 1 when
    its participant's score is >= 10 and 0 otherwise. The participant ID is the
    integer value of the first three characters of the filename; files whose ID
    appears in neither table are ignored.

    Args:
        audio_dict: Mapping of filename -> array of clips.

    Returns:
        ``(X_train, Y_train, X_test, Y_test)`` as TensorFlow tensors.
    """
    train_set = {116: 1, 148: 5, 112: 5, 152: 5, 5: 7, 149: 12, 3: 3, 128: 11, 138: 14, 9: 21, 134: 6, 106: 24, 142: 4, 139: 8, 114: 23, 146: 10, 151: 4, 118: 8, 130: 5, 135: 9, 4: 13, 137: 10, 143: 11, 133: 11, 140: 4, 147: 3, 153: 19, 119: 1, 121: 13, 8: 12, 107: 5, 132: 20, 103: 15, 136: 19, 117: 7, 129: 2, 123: 10, 122: 15, 131: 13, 102: 6, 145: 5}

    test_set =  {111: 16, 115: 2, 108: 14, 120: 12, 113: 12, 124: 11, 125: 10, 126: 2, 144: 3, 141: 2, 127: 9}

    X_train, Y_train = [], []
    X_test, Y_test = [], []

    for filename, data in tqdm(audio_dict.items(), 'LABEL'):
        participant = int(filename[:3])
        if participant in train_set:
            label = 1 if train_set[participant] >= 10 else 0
            X_train.extend(data)
            Y_train.extend([label] * len(data))
        if participant in test_set:
            label = 1 if test_set[participant] >= 10 else 0
            X_test.extend(data)
            Y_test.extend([label] * len(data))

    return (
        tf.convert_to_tensor(X_train),
        tf.convert_to_tensor(Y_train),
        tf.convert_to_tensor(X_test),
        tf.convert_to_tensor(Y_test),
    )

#XY Split Depression
def XY_dep(audio_dict, Gender = None):
    """Build depression-labelled ``(X, Y)`` tensors, optionally for one gender.

    Participants are read from ``Dataset.csv`` and labelled 1 when their
    'PHQ-9 Score' is >= 10, otherwise 0. Each clip inherits the label of the
    participant whose ID matches the first three characters of its filename.

    Args:
        audio_dict: Mapping of filename -> array of clips.
        Gender: When given, only participants whose 'Gender' column equals this
            value are kept; clips of all other participants are skipped.

    Returns:
        ``(X, Y)`` as TensorFlow tensors.

    Raises:
        KeyError: If a file's participant ID does not appear in ``Dataset.csv``
            at all.
    """
    df = pd.read_csv('Dataset.csv', usecols=['Participant_ID','PHQ-9 Score','Gender'],dtype={1:str})
    # Remember every ID in the CSV before filtering, so a participant that was
    # excluded by the filter can be told apart from one that is truly unknown.
    known_ids = set(df['Participant_ID'])
    if Gender is not None:
        # .copy() so the label column is written to an independent frame rather
        # than to a slice of the original (avoids SettingWithCopyWarning).
        df = df[df.Gender == Gender].copy()
    # 1 for scores >= 10, 0 for anything else (including missing scores).
    df['labels'] = (df['PHQ-9 Score'] >= 10).astype(int)

    labels = df.set_index('Participant_ID').to_dict()['labels']
    X = []
    Y = []

    for filename, data in tqdm(audio_dict.items(), 'LABEL'):
        ID = filename[:3]
        if ID not in labels:
            if ID in known_ids:
                # Excluded by the Gender filter: drop these clips instead of
                # failing on the label lookup.
                continue
            raise KeyError(ID)
        X.extend(data)
        Y.extend([labels[ID]] * len(data))

    X = tf.convert_to_tensor(X)
    Y = tf.convert_to_tensor(Y)
    return X, Y

#XY Split by gender
def XY_gender(audio_dict, Gender = None):
    """Build gender-labelled ``(X, Y)`` tensors.

    Participants are read from ``Dataset.csv``; those whose 'Gender' column is
    'Female' get label 1 and every other participant gets label 0. Each clip
    inherits the label of the participant whose ID matches the first three
    characters of its filename.

    Args:
        audio_dict: Mapping of filename -> array of clips.
        Gender: Accepted but never read by this function.

    Returns:
        ``(X, Y)`` as TensorFlow tensors.
    """
    table = pd.read_csv('Dataset.csv', usecols=['Participant_ID','PHQ-9 Score','Gender'],dtype={1:str})
    table['labels'] = (table['Gender'] == 'Female').astype(int)

    labels = table.set_index('Participant_ID').to_dict()['labels']
    X, Y = [], []

    for filename, data in tqdm(audio_dict.items(), 'LABEL'):
        participant = filename[:3]
        label = 1 if labels[participant] != 0 else 0
        X.extend(data)
        Y.extend([label] * len(data))

    return tf.convert_to_tensor(X), tf.convert_to_tensor(Y)

# Shared F1 metric for a single output class with a 0.5 decision threshold;
# CNN_LF passes it to model.compile alongside accuracy, precision and recall.
F1 = tfa.metrics.F1Score(num_classes=1, threshold=0.5)

In [3]:
def CNN_LF(X_train,Y_train, X_test, Y_test, time, epochs, verbose=0, optimizer= 'adadelta'):
    """Train a small 2-D CNN binary classifier and evaluate it on the test set.

    The network resizes each input to 128 x (10 * time), then applies two
    Conv2D -> Dropout -> BatchNormalization -> MaxPool2D stages, a 256-unit
    dense layer with dropout and a single sigmoid output.

    Args:
        X_train: Training inputs; ``X_train[0].shape`` is used as the input shape.
        Y_train: Training labels.
        X_test: Inputs used both for validation during training and for the
            final evaluation.
        Y_test: Labels matching ``X_test``.
        time: Scales the width the inputs are resized to (``10 * time``).
        epochs: Maximum number of training epochs.
        verbose: Verbosity passed to ``model.fit``.
        optimizer: Optimizer passed to ``model.compile``.

    Returns:
        ``(accuracy, precision, recall, f1)`` from ``model.evaluate`` on the
        test data, with ``f1`` converted to a plain float.
    """
    model = Sequential([
        Resizing(128, 10 * time, input_shape=X_train[0].shape),
        Conv2D(30, (3, 3), strides=1, padding="same", activation="relu"),
        Dropout(0.2),
        BatchNormalization(),
        MaxPool2D((2, 2), strides=2, padding="same"),
        Conv2D(15, (3, 3), strides=1, padding="same", activation="relu"),
        Dropout(0.2),
        BatchNormalization(),
        MaxPool2D((2, 2), strides=2, padding="same"),
        Flatten(),
        Dense(units=256, activation="relu"),
        Dropout(0.3),
        Dense(1, activation="sigmoid"),
    ])

    model.compile(loss="binary_crossentropy", metrics=["accuracy", 'Precision', 'Recall', F1], optimizer=optimizer)

    # NOTE(review): early stopping monitors the validation F1 on
    # (X_test, Y_test) and restores the best weights, and the same data is
    # evaluated below -- the reported metrics are therefore selected on the
    # test set. Consider holding out a separate validation split.
    early_stopping = EarlyStopping(monitor='val_f1_score', patience=2, restore_best_weights=True, mode='max')

    # validation_data is passed as a tuple and callbacks as a list, the forms
    # documented for Model.fit.
    model.fit(
        X_train,
        Y_train,
        epochs=epochs,
        verbose=verbose,
        validation_data=(X_test, Y_test),
        shuffle=True,
        callbacks=[early_stopping],
    )

    # evaluate returns the loss first, then the metrics in compile order.
    _, accuracy, prec, rec, f1 = model.evaluate(X_test, Y_test)

    return accuracy, prec, rec, float(f1)

In [4]:
def preprocess(directory, time, sample_rate):
    """Run the full audio pipeline for one folder and return CNN-ready tensors.

    Steps: load the audio files, resample them, fragment them into clips of
    ``time`` seconds, convert the clips to mel spectrograms, split them with
    ``train_test2``, and finally append a trailing axis of size 1 to the
    spectrogram tensors for the 2-D CNN.

    Args:
        directory: Folder containing the audio files.
        time: Clip duration in seconds.
        sample_rate: Target sample rate passed to ``resampling``.

    Returns:
        ``(X_train, Y_train, X_test, Y_test)``.
    """
    waveforms, native_rate = load_audios(directory)
    waveforms, rate = resampling(waveforms, native_rate, new_samplerate=sample_rate)
    clips, time = fragment_audio(waveforms, rate, time=time)
    spectrograms = MELspectrogram(clips, rate)
    X_train, Y_train, X_test, Y_test = train_test2(spectrograms)

    def _add_channel_axis(batch):
        # (N, H, W) -> (N, H, W, 1)
        return tf.reshape(batch, shape = (-1, batch.shape[1], batch.shape[2], 1))

    X_train = _add_channel_axis(X_train)
    X_test = _add_channel_axis(X_test)

    return X_train, Y_train, X_test, Y_test

### DEVICE TEST

In [5]:
# Cross-device experiment: train on one microphone's recordings and test on
# either microphone's recordings, for every train/test combination.
training_data = ["SHURE SM-27", "iPhoneSE2020"]
test_data = ["SHURE SM-27", "iPhoneSE2020"]
time = 5
sample_rate = None
experiment_count = 1
results_list = []  # one dict per run, converted to a DataFrame at the end

highQuality = preprocess('./SM-27', time, sample_rate)
lowQuality = preprocess('./iPhoneSE2020', time, sample_rate)
splits_by_device = {"SHURE SM-27": highQuality, "iPhoneSE2020": lowQuality}

for train in training_data:
    for test in test_data:
        # Each preprocess result is (X_train, Y_train, X_test, Y_test).
        X_train, Y_train = splits_by_device[train][:2]
        X_test, Y_test = splits_by_device[test][2:]
        print(f'TRAIN = {train}  |   TEST = {test}\n\n')

        for run in range(1, experiment_count + 1):
            accuracy, prec, rec, f1 = CNN_LF(X_train, Y_train, X_test, Y_test, time=time, epochs=50)
            results_list.append({
                'Data': f"Train {train} / Test: {test}",
                'Tests': f'Ex_{run}',
                'Accuracy': accuracy,
                'Precision': prec,
                'Recall': rec,
                'F1_Score': f1,
            })
        print('\n---------------------------------------------------------------------------\n')

# One row per run, then the per-combination mean of the numeric columns.
results_MIC = pd.DataFrame(results_list)
average_metrics_MIC = results_MIC.groupby('Data').mean(numeric_only=True)

LOAD AUDIOS:   0%|          | 0/1674 [00:00<?, ?it/s]

FRAGMENT AUDIOS:   0%|          | 0/1674 [00:00<?, ?it/s]

MELSPECTROGRAM:   0%|          | 0/1674 [00:00<?, ?it/s]

LABEL:   0%|          | 0/1674 [00:00<?, ?it/s]

LOAD AUDIOS:   0%|          | 0/1674 [00:00<?, ?it/s]

FRAGMENT AUDIOS:   0%|          | 0/1674 [00:00<?, ?it/s]

MELSPECTROGRAM:   0%|          | 0/1674 [00:00<?, ?it/s]

LABEL:   0%|          | 0/1674 [00:00<?, ?it/s]

TRAIN = SHURE SM-27  |   TEST = SHURE SM-27



---------------------------------------------------------------------------

TRAIN = SHURE SM-27  |   TEST = iPhoneSE2020



---------------------------------------------------------------------------

TRAIN = iPhoneSE2020  |   TEST = SHURE SM-27



---------------------------------------------------------------------------

TRAIN = iPhoneSE2020  |   TEST = iPhoneSE2020



---------------------------------------------------------------------------



In [6]:
# Show the per-combination averages, then write both tables to CSV.
print(average_metrics_MIC)
for frame, csv_path in (
    (results_MIC, './results_MIC.csv'),
    (average_metrics_MIC, './average_metrics_MIC.csv'),
):
    frame.to_csv(csv_path)

                                         Accuracy  Precision    Recall  \
Data                                                                     
Train SHURE SM-27 / Test: SHURE SM-27    0.597135   0.597135  1.000000   
Train SHURE SM-27 / Test: iPhoneSE2020   0.402865   0.000000  0.000000   
Train iPhoneSE2020 / Test: SHURE SM-27   0.572068   0.605351  0.814093   
Train iPhoneSE2020 / Test: iPhoneSE2020  0.541629   0.586207  0.790105   

                                         F1_Score  
Data                                               
Train SHURE SM-27 / Test: SHURE SM-27    0.747758  
Train SHURE SM-27 / Test: iPhoneSE2020   0.000000  
Train iPhoneSE2020 / Test: SHURE SM-27   0.694373  
Train iPhoneSE2020 / Test: iPhoneSE2020  0.673052  


### SAMPLE RATE TEST

In [9]:
# Sample-rate experiment: preprocess the SM-27 recordings at each target
# sample rate and train/evaluate the CNN on the result.
training_data = ["SHURE SM-27", "iPhoneSE2020"]
test_data = ["SHURE SM-27", "iPhoneSE2020"]
time = 5
sample_rate = [2000, 4000, 8000, 16000, 44100]
experiment_count = 1
results_list = []  # one dict per run, converted to a DataFrame at the end

for sr in sample_rate:
    X_train, Y_train, X_test, Y_test = preprocess('./SM-27', time, sample_rate=sr)

    print('SAMPLE RATE = ', sr, '\n\n')
    for run in range(1, experiment_count + 1):
        accuracy, prec, rec, f1 = CNN_LF(X_train, Y_train, X_test, Y_test, time=time, epochs=50)
        results_list.append({
            'Sample Rate': sr,
            'Tests': f'Ex_{run}',
            'Accuracy': accuracy,
            'Precision': prec,
            'Recall': rec,
            'F1_Score': f1,
        })
    print('\n---------------------------------------------------------------------------\n')

# One row per run, then the per-sample-rate mean of the numeric columns.
results_SR = pd.DataFrame(results_list)
average_metrics_SR = results_SR.groupby('Sample Rate').mean(numeric_only=True)

LOAD AUDIOS:   0%|          | 0/1674 [00:00<?, ?it/s]

RESAMPLING:   0%|          | 0/1674 [00:00<?, ?it/s]

FRAGMENT AUDIOS:   0%|          | 0/1674 [00:00<?, ?it/s]

MELSPECTROGRAM:   0%|          | 0/1674 [00:00<?, ?it/s]



LABEL:   0%|          | 0/1674 [00:00<?, ?it/s]

SAMPLE RATE =  2000 



---------------------------------------------------------------------------



LOAD AUDIOS:   0%|          | 0/1674 [00:00<?, ?it/s]

RESAMPLING:   0%|          | 0/1674 [00:00<?, ?it/s]

FRAGMENT AUDIOS:   0%|          | 0/1674 [00:00<?, ?it/s]

MELSPECTROGRAM:   0%|          | 0/1674 [00:00<?, ?it/s]



LABEL:   0%|          | 0/1674 [00:00<?, ?it/s]

SAMPLE RATE =  4000 



---------------------------------------------------------------------------



LOAD AUDIOS:   0%|          | 0/1674 [00:00<?, ?it/s]

RESAMPLING:   0%|          | 0/1674 [00:00<?, ?it/s]

FRAGMENT AUDIOS:   0%|          | 0/1674 [00:00<?, ?it/s]

MELSPECTROGRAM:   0%|          | 0/1674 [00:00<?, ?it/s]



LABEL:   0%|          | 0/1674 [00:00<?, ?it/s]

SAMPLE RATE =  8000 



---------------------------------------------------------------------------



LOAD AUDIOS:   0%|          | 0/1674 [00:00<?, ?it/s]

RESAMPLING:   0%|          | 0/1674 [00:00<?, ?it/s]

FRAGMENT AUDIOS:   0%|          | 0/1674 [00:00<?, ?it/s]

MELSPECTROGRAM:   0%|          | 0/1674 [00:00<?, ?it/s]



LABEL:   0%|          | 0/1674 [00:00<?, ?it/s]

SAMPLE RATE =  16000 



---------------------------------------------------------------------------



LOAD AUDIOS:   0%|          | 0/1674 [00:00<?, ?it/s]

FRAGMENT AUDIOS:   0%|          | 0/1674 [00:00<?, ?it/s]

MELSPECTROGRAM:   0%|          | 0/1674 [00:00<?, ?it/s]

LABEL:   0%|          | 0/1674 [00:00<?, ?it/s]

SAMPLE RATE =  44100 



---------------------------------------------------------------------------



In [10]:
# Show the per-sample-rate averages, then write both tables to CSV.
print(average_metrics_SR)
for frame, csv_path in (
    (results_SR, './results_SR.csv'),
    (average_metrics_SR, './average_metrics_SR.csv'),
):
    frame.to_csv(csv_path)

             Accuracy  Precision    Recall  F1_Score
Sample Rate                                         
2000         0.595345   0.596413  0.997002  0.746352
4000         0.600716   0.600546  0.989505  0.747452
8000         0.598030   0.597670  1.000000  0.748177
16000        0.637422   0.625239  0.980510  0.763573
44100        0.607878   0.623517  0.866567  0.725220


### QUALITY TEST

In [None]:
# Audio-format experiment: each entry of `paths` names a sub-folder holding the
# recordings in one format; preprocess and train/evaluate the CNN on each.
paths = ['44.1_24', '44.1_16', '16_16', 'MP3_320', 'MP3_96', 'OGG']
time = 5
sample_rate = None
experiment_count = 1
results_list = []  # one dict per run, converted to a DataFrame at the end

for path in paths:
    directory = f'/Volumes/SESSIONS_2024 1/Quality_TEST/{path}'
    X_train, Y_train, X_test, Y_test = preprocess(directory, time, sample_rate)
    print(f'DATA = {path}\n')

    for run in range(1, experiment_count + 1):
        accuracy, prec, rec, f1 = CNN_LF(X_train, Y_train, X_test, Y_test, time=time, epochs=50)
        results_list.append({
            'Data': path,
            'Tests': f'Ex_{run}',
            'Accuracy': accuracy,
            'Precision': prec,
            'Recall': rec,
            'F1_Score': f1,
        })
    print('\n---------------------------------------------------------------------------\n')

# One row per run, then the per-format mean of the numeric columns.
results_QUALITY = pd.DataFrame(results_list)
average_metrics_QUALITY = results_QUALITY.groupby('Data').mean(numeric_only=True)

In [None]:
# Show the per-format averages, then write both tables to CSV in the working
# directory. Relative paths replace the machine-specific absolute ones so the
# cell runs on any machine, like the other './results_*.csv' outputs.
print(average_metrics_QUALITY)
results_QUALITY.to_csv('./results_QUALITY.csv')
average_metrics_QUALITY.to_csv('./average_metrics_QUALITY.csv')