In [None]:
import tensorflow as tf
print(tf.__version__)

from tensorflow import keras
from keras import backend as K
from keras import Sequential, Input, Model
from keras.layers import Conv1D, BatchNormalization, Dropout, Dense, Softmax, ReLU, Lambda, Activation
from keras import optimizers

import pandas as pd
import librosa
import numpy as np
import sklearn

2.8.0


In [None]:
# Connect Google Colab to Google Drive.
# The mount point '/content/gdrive/' is the prefix of TRAIN_PATH, so this cell
# has to run before any cell that reads the dataset.
# NOTE(review): google.colab is presumably only importable inside a Colab runtime.
from google.colab import drive
drive.mount('/content/gdrive/')

Mounted at /content/gdrive/


## **Load data into a pandas DataFrame**

In [None]:
# Folder on the mounted Drive holding the audio files and their label table
TRAIN_PATH = '/content/gdrive/My Drive/Gita/RositaNorm/'
# CSV listing every audio filename together with its label
dataset_file = f"{TRAIN_PATH}labels.csv"

In [None]:
# Read the label table; the "Filename" and "Disease" columns are the ones the
# train/test split consumes.
data = pd.read_csv(dataset_file)
# Bare expression so the notebook renders the frame as a table
data

Unnamed: 0.1,Unnamed: 0,Filename,Gender,Disease
0,0,AVPEPUDEAC0001_rosita.wav,,0
1,1,AVPEPUDEAC0003_rosita.wav,,0
2,2,AVPEPUDEAC0004_rosita.wav,,0
3,3,AVPEPUDEAC0005_rosita.wav,,0
4,4,AVPEPUDEAC0006_rosita.wav,,0
...,...,...,...,...
95,95,AVPEPUDEA0055_rosita.wav,,1
96,96,AVPEPUDEA0056_rosita.wav,,1
97,97,AVPEPUDEA0057_rosita.wav,,1
98,98,AVPEPUDEA0058_rosita.wav,,1


In [None]:
from sklearn.model_selection import train_test_split

# Explicit integer seed. The original passed `random_state=True`; a bool is an
# int subclass with value 1, so seeding with 1 is expected to reproduce the
# same split while making the intent (a seed, not a flag) readable.
SPLIT_SEED = 1

# 70 % of the recordings go to training, 30 % to test. The outputs are NumPy
# arrays of filenames (df_X_*) and of 0/1 labels (df_y_*).
# TODO(review): the split is not stratified on "Disease"; consider `stratify=`.
df_X_train, df_X_test, df_y_train, df_y_test = train_test_split(
    data["Filename"].to_numpy(),
    data["Disease"].to_numpy(),
    test_size=0.3,
    random_state=SPLIT_SEED,
    shuffle=True,
)
#df_X_train, df_X_val, df_y_train, df_y_val = train_test_split(df_X_train, df_y_train, test_size=0.2, random_state=SPLIT_SEED, shuffle=True)

In [None]:
# Filenames assigned to the training split by train_test_split
df_X_train

array(['AVPEPUDEA0022_rosita.wav', 'AVPEPUDEA0059_rosita.wav',
       'AVPEPUDEA0006_rosita.wav', 'AVPEPUDEA0055_rosita.wav',
       'AVPEPUDEA0048_rosita.wav', 'AVPEPUDEAC0046_rosita.wav',
       'AVPEPUDEAC0054_rosita.wav', 'AVPEPUDEA0011_rosita.wav',
       'AVPEPUDEAC0026_rosita.wav', 'AVPEPUDEAC0040_rosita.wav',
       'AVPEPUDEA0046_rosita.wav', 'AVPEPUDEA0005_rosita.wav',
       'AVPEPUDEA0034_rosita.wav', 'AVPEPUDEAC0018_rosita.wav',
       'AVPEPUDEA0042_rosita.wav', 'AVPEPUDEAC0047_rosita.wav',
       'AVPEPUDEAC0051_rosita.wav', 'AVPEPUDEA0051_rosita.wav',
       'AVPEPUDEAC0029_rosita.wav', 'AVPEPUDEA0058_rosita.wav',
       'AVPEPUDEAC0049_rosita.wav', 'AVPEPUDEA0007_rosita.wav',
       'AVPEPUDEAC0027_rosita.wav', 'AVPEPUDEAC0006_rosita.wav',
       'AVPEPUDEA0010_rosita.wav', 'AVPEPUDEAC0057_rosita.wav',
       'AVPEPUDEAC0024_rosita.wav', 'AVPEPUDEA0047_rosita.wav',
       'AVPEPUDEAC0005_rosita.wav', 'AVPEPUDEA0030_rosita.wav',
       'AVPEPUDEAC0034_rosita.wav', 'AVPE

In [None]:
# Labels of the training split, aligned index-by-index with df_X_train
df_y_train

array([1, 1, 1, 1, 1, 0, 0, 1, 0, 0, 1, 1, 1, 0, 1, 0, 0, 1, 0, 1, 0, 1,
       0, 0, 1, 0, 0, 1, 0, 1, 0, 1, 1, 0, 0, 1, 0, 1, 0, 1, 1, 0, 1, 1,
       0, 1, 0, 1, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 1, 0, 0, 1, 1, 0, 1,
       0, 1, 0, 0])

## **Perform Data Augmentation**

In [None]:
!pip install audiomentations

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting audiomentations
  Downloading audiomentations-0.24.0-py3-none-any.whl (67 kB)
[K     |████████████████████████████████| 67 kB 3.6 MB/s 
Installing collected packages: audiomentations
Successfully installed audiomentations-0.24.0


Data augmentation was applied only to the audios in the training split, and not to the ones in the test split. These transformations include Noise Addition, Pitch Scaling (change of the pitch of the voice), Time Stretching (changing the speed of the sound but without changing the pitch) and Polarity Inversion (multiply the waveform by -1).

In [None]:
from audiomentations import Compose, AddGaussianNoise, PitchShift, TimeStretch, PolarityInversion, Normalize, HighPassFilter

# How many augmented copies are generated for every training signal
NUM_AUGMENTATIONS = 10

# Transforms in the order they are chained; `p` is the probability argument
# handed to each individual transform.
augmentation_steps = [
    AddGaussianNoise(min_amplitude=0.1, max_amplitude=0.2, p=0.5),  # noise addition
    PitchShift(min_semitones=-4, max_semitones=4, p=0.7),           # pitch scaling
    TimeStretch(min_rate=0.8, max_rate=1.25, p=0.7),                # time stretching
    PolarityInversion(p=0.7),                                       # polarity inversion
]

# Callable used as augment(audio, sample_rate) by the training-set loop
augment = Compose(augmentation_steps)


## **Obtain MFCCs and create train and test data subsets with them**

In [None]:
# Sampling rate passed as `sr` to librosa.feature.mfcc in audio_to_mfcc
SAMPLE_RATE = 44100
# Number of MFCC coefficients requested per frame (`n_mfcc`)
NUM_MFCC = 40
# Every MFCC matrix is zero-padded or truncated to this many columns
MFCC_MAX_LEN = 500

In [None]:
def normalize_audio(audio):
    """Peak-normalize a waveform so its largest absolute sample equals 1.

    Args:
        audio: array-like of audio samples.

    Returns:
        ``audio / max(|audio|)`` as a NumPy array. Empty or all-zero (silent)
        input has no peak to scale by and is returned unscaled, instead of
        raising or filling the result with NaNs.
    """
    audio = np.asarray(audio)

    # np.max raises on an empty array, so there is nothing to normalize
    if audio.size == 0:
        return audio

    peak = np.max(np.abs(audio))

    # A silent signal would otherwise be divided by zero and become all-NaN
    if peak == 0:
        return audio

    return audio / peak

Function to extract MFCCs from audio signal

In [None]:
def audio_to_mfcc(audio, max_len=MFCC_MAX_LEN):
    """Turn a waveform into an MFCC matrix with a fixed number of columns.

    The signal is peak-normalized, NUM_MFCC coefficients are computed at
    SAMPLE_RATE, and the second axis of the result is forced to ``max_len``.

    Args:
        audio: waveform samples.
        max_len: number of columns of the returned matrix.

    Returns:
        Array whose second axis has exactly ``max_len`` entries.
    """
    coeffs = librosa.feature.mfcc(
        y=normalize_audio(audio), sr=SAMPLE_RATE, n_mfcc=NUM_MFCC
    )

    n_frames = coeffs.shape[1]

    # Already long enough: keep only the first max_len columns
    if n_frames >= max_len:
        return coeffs[:, :max_len]

    # Too short: pad the end of the second axis with constant (zero) values
    missing = max_len - n_frames
    return np.pad(coeffs, pad_width=((0, 0), (0, missing)), mode='constant')

In [None]:
def append_X_Y(X, y, label, audio):
    """Append one sample to the parallel feature and label lists.

    Args:
        X: list receiving the MFCC matrix of ``audio`` (mutated in place).
        y: list receiving ``label`` (mutated in place).
        label: class label of the sample.
        audio: waveform converted with ``audio_to_mfcc``.
    """
    # Extract the features first: if this raises, neither list has been
    # touched, so X and y can never end up with different lengths.
    mfcc = audio_to_mfcc(audio)
    X.append(mfcc)
    y.append(label)

Create new training dataset with the MFCC coefficients obtained from the original and augmented audios

In [None]:
from tqdm import tqdm

X_train = []
y_train = []

# After the loop these hold the df_X_train index of the last recording seen
# with label 1 (PD) and with any other label (HC). Nothing in this cell reads
# them; they are kept because they are notebook-level names that cells
# further down may rely on.
PD_idx = 0
HC_idx = 0

# `total` lets tqdm draw a real progress bar / ETA (enumerate has no length)
for idx, audio_filename in tqdm(enumerate(df_X_train), total=len(df_X_train)):
    label = df_y_train[idx]
    # Load at SAMPLE_RATE, the same rate audio_to_mfcc uses for the MFCCs, so
    # the two can never drift apart the way a repeated literal could.
    audio, sr = librosa.load(TRAIN_PATH + audio_filename, sr=SAMPLE_RATE)

    if label == 1:
        PD_idx = idx
    else:
        HC_idx = idx

    # Original recording first ...
    append_X_Y(X_train, y_train, label, audio)

    # ... followed by NUM_AUGMENTATIONS augmented copies with the same label
    # (no copies at all when NUM_AUGMENTATIONS is 0)
    for i in range(NUM_AUGMENTATIONS):
        augmented_audio = augment(audio, sr)
        append_X_Y(X_train, y_train, label, augmented_audio)


70it [04:52,  4.18s/it]


In [None]:
# Stack the per-sample MFCC matrices and labels into NumPy arrays
X_train = np.array(X_train)
y_train = np.array(y_train)

# Fail loudly, rather than just displaying False, if features and labels
# ever get out of step.
assert X_train.shape[0] == len(y_train), "X_train and y_train differ in length"

True

In [None]:
# Shape of a single training sample: one MFCC matrix produced by audio_to_mfcc
X_train[0].shape

(40, 500)

In [None]:
# Labels before shuffling: each recording contributes one original plus
# NUM_AUGMENTATIONS augmented rows, hence the runs of identical labels.
y_train

array([1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1,

In [None]:
# Random shuffle of the new dataset.
# Seeded so the row order is identical on every fresh run. The order matters
# downstream because cross_validate is called with an integer `cv`, whose
# folds are built from the existing row order.
SHUFFLE_SEED = 1
X_train, y_train = sklearn.utils.shuffle(X_train, y_train, random_state=SHUFFLE_SEED)

In [None]:
# Labels after shuffling: the runs of identical labels should be gone
y_train

array([1, 1, 0, 1, 0, 1, 1, 0, 1, 0, 1, 0, 0, 1, 0, 1, 0, 0, 0, 1, 0, 1,
       1, 0, 1, 1, 0, 1, 0, 0, 1, 0, 1, 1, 1, 1, 0, 0, 1, 1, 0, 1, 1, 1,
       1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 1, 0, 0, 1, 0,
       1, 0, 1, 0, 1, 1, 1, 1, 1, 1, 0, 0, 1, 1, 0, 0, 1, 1, 1, 1, 0, 0,
       1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 1, 1, 1, 0, 1, 0, 1, 1, 1, 1, 1, 0,
       1, 1, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1,
       1, 0, 1, 1, 0, 0, 1, 0, 1, 0, 0, 0, 0, 1, 1, 1, 0, 0, 1, 1, 0, 0,
       1, 0, 1, 1, 0, 1, 1, 0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0,
       0, 1, 0, 0, 0, 0, 0, 1, 0, 1, 1, 1, 0, 0, 1, 0, 0, 1, 0, 1, 0, 1,
       0, 0, 1, 0, 1, 0, 1, 0, 1, 0, 0, 1, 0, 1, 1, 0, 0, 0, 0, 0, 0, 1,
       1, 1, 0, 0, 1, 1, 1, 0, 1, 0, 0, 1, 0, 1, 1, 0, 0, 1, 1, 1, 1, 0,
       0, 1, 1, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 1, 1, 1, 0, 1, 0, 0, 1, 0,
       0, 0, 1, 1, 1, 0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 1, 1, 1, 0,
       0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 1,

In [None]:
#X_val = []
#y_val = []

#for idx, audio_filename in tqdm(enumerate(df_X_val)):
#    label = df_y_val[idx]
#    audio, sr = librosa.load(TRAIN_PATH + audio_filename, sr=44100)
    
#    append_X_Y(X_val, y_val, label, audio)

In [None]:
#X_val = np.array(X_val)
#y_val = np.array(y_val)
#X_val.shape[0] == len(y_val)

Create new test dataset with the MFCC coefficients obtained from the original audios

In [None]:
X_test = []
y_test = []

# Test recordings are converted as-is: no augmentation is applied here.
# `total` lets tqdm draw a real progress bar / ETA (enumerate has no length).
for idx, audio_filename in tqdm(enumerate(df_X_test), total=len(df_X_test)):
    label = df_y_test[idx]
    # Load at SAMPLE_RATE, the same rate audio_to_mfcc uses for the MFCCs
    audio, sr = librosa.load(TRAIN_PATH + audio_filename, sr=SAMPLE_RATE)

    append_X_Y(X_test, y_test, label, audio)

30it [00:12,  2.45it/s]


In [None]:
# Stack the per-sample MFCC matrices and labels into NumPy arrays
X_test = np.array(X_test)
y_test = np.array(y_test)

# Fail loudly, rather than just displaying False, if features and labels
# ever get out of step.
assert X_test.shape[0] == len(y_test), "X_test and y_test differ in length"

True

In [None]:
# Shape of a single training sample.
# NOTE(review): this cell follows the construction of the test set but
# inspects X_train, not X_test — confirm which one was intended.
X_train[0].shape

(40, 500)

## **Build TDNN as a sequential model**

In [None]:
# Per-sample input shape handed to the model's Input layer as
# (feature_dim_1, feature_dim_2).
feature_dim_1 = NUM_MFCC  #number of MFCCs
feature_dim_2 = MFCC_MAX_LEN  #max length for the MFCCs

In [None]:
# TDNN model
# Number of filters of the first four Conv1D layers (the fifth uses 1500)
numFilters = 32
# Rate of the Dropout layer that follows every BatchNormalization
dropout_rate = 0.2


In [None]:
def tdnn_layer(inputs, filters, kernel_size, dilation_rate):
    """Apply one TDNN unit: Conv1D -> BatchNormalization -> Dropout -> ReLU.

    Args:
        inputs: tensor laid out as (batch, steps, channels).
        filters: number of Conv1D output channels.
        kernel_size: number of consecutive (dilated) steps seen by one filter.
        dilation_rate: spacing between the steps seen by one filter.

    Returns:
        The output tensor of the final ReLU.
    """
    out = Conv1D(filters, kernel_size, dilation_rate=dilation_rate)(inputs)
    out = BatchNormalization()(out)
    out = Dropout(dropout_rate)(out)
    return ReLU()(out)


# Layer 1: Input — one (NUM_MFCC, MFCC_MAX_LEN) MFCC matrix per sample
input_tensor = Input(shape=(feature_dim_1, feature_dim_2))

# Conv1D slides along axis 1 and treats the last axis as channels. Fed
# directly, the MFCC matrix would therefore be convolved across the 40
# coefficients with the 500 frames acting as channels. Swapping the axes to
# (frames, coefficients) makes the convolutions run over time, which is what a
# time-delay network requires. The model still accepts (NUM_MFCC, MFCC_MAX_LEN)
# inputs, so callers are unaffected.
x = tf.transpose(input_tensor, perm=[0, 2, 1])

# Layers 2-6: (filters, kernel_size, dilation_rate) of each TDNN unit
tdnn_spec = [
    (numFilters, 5, 1),  # Layer 2
    (numFilters, 3, 2),  # Layer 3
    (numFilters, 3, 3),  # Layer 4
    (numFilters, 1, 1),  # Layer 5
    (1500, 1, 1),        # Layer 6
]
for filters, kernel_size, dilation in tdnn_spec:
    x = tdnn_layer(x, filters, kernel_size, dilation)

# Layer 7: stats pooling — collapse the step axis into a per-channel mean and
# standard deviation (the original computed the variance under the name `std`).
mean = tf.math.reduce_mean(x, axis=1)
std = tf.math.reduce_std(x, axis=1)
stat_pooling = tf.concat((mean, std), axis=1)
x_vector = Activation('linear')(stat_pooling)  #x-vectors

# Embedding model: MFCC matrix in, x-vector of size 2 * 1500 out
x_vec_model = Model(inputs=input_tensor, outputs=x_vector)

In [None]:
# Print the layer-by-layer output shapes and parameter counts of the embedder
x_vec_model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 40, 500)]    0           []                               
                                                                                                  
 conv1d (Conv1D)                (None, 36, 32)       80032       ['input_1[0][0]']                
                                                                                                  
 batch_normalization (BatchNorm  (None, 36, 32)      128         ['conv1d[0][0]']                 
 alization)                                                                                       
                                                                                                  
 dropout (Dropout)              (None, 36, 32)       0           ['batch_normalization[0][0]']

In [None]:
# Per-sample shape about to be fed to the model; it has to match the
# (feature_dim_1, feature_dim_2) shape declared on the Input layer.
X_train[0].shape

(40, 500)

The x-vectors are extracted for each of the training and test samples. These x-vectors will be the input to the LDA (or PLDA) model, which will be trained to fit the training data. The TDNN model is not trained as there aren't any dense layers (the model works only as an embedder to extract the x-vectors).

In [None]:
# Embed every training sample as an x-vector.
# No compile/fit call on x_vec_model appears in the cells visible here, so
# these embeddings come from the weights the layers were created with.
x_vectors_training = x_vec_model.predict(X_train)

In [None]:
# (n_training_samples, 3000): two pooled statistics for each of the 1500
# channels of the last Conv1D layer.
x_vectors_training.shape

(770, 3000)

In [None]:
#x_vectors_val = x_vec_model.predict(X_val)

In [None]:
#x_vectors_val.shape

In [None]:
# Embed every (un-augmented) test sample with the same embedder
x_vectors_test = x_vec_model.predict(X_test)

In [None]:
# (n_test_samples, 3000): same embedding size as the training x-vectors
x_vectors_test.shape

(30, 3000)

In [None]:
!python -m pip install scikeras

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting scikeras
  Downloading scikeras-0.8.0-py3-none-any.whl (27 kB)
Installing collected packages: scikeras
Successfully installed scikeras-0.8.0


## **Train LDA model**

The input to this model is the x-vector embedding of each audio, and its output is the predicted class of that audio (1 for PD and 0 for HC).

In [None]:
from sklearn.pipeline import Pipeline
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.model_selection import cross_validate, cross_val_predict
from sklearn.metrics import confusion_matrix

#epochs = 20
#batch_size = 5

# LDA model definition
LDA_model = LinearDiscriminantAnalysis()

# Create pipeline
#pipeline = Pipeline(steps=[('x_vec_model',x_vec),('LDA_model',LDA_model)])


def _print_mean_std(title, scores, tail=""):
    """Print ``title: mean +/- std`` for an array of per-fold scores."""
    print("{title}: {mean} +/- {std}{tail}".format(
        title=title, mean=np.mean(scores), std=np.std(scores), tail=tail))


def _safe_ratio(numerator, denominator):
    """Return numerator / denominator, or 0.0 when the denominator is zero."""
    return numerator / denominator if denominator else 0.0


# --- 5-fold cross-validation on the training x-vectors -----------------------
# NOTE(review): the training set holds 1 + NUM_AUGMENTATIONS rows per recording
# and was shuffled, so copies of one recording can land in both the fitting and
# the validation part of a fold. The validation scores are therefore likely
# optimistic; consider grouping the folds by recording.
cv_results = cross_validate(LDA_model, x_vectors_training, y_train, cv=5,
                            scoring=["accuracy", "precision_macro", "recall_macro", "roc_auc"], return_train_score=True, verbose=1)

_print_mean_std("\nValidation accuracy", cv_results['test_accuracy'])
_print_mean_std("Training accuracy", cv_results['train_accuracy'])
_print_mean_std("Validation recall", cv_results['test_recall_macro'])
_print_mean_std("Training recall", cv_results['train_recall_macro'])
_print_mean_std("Validation precision", cv_results['test_precision_macro'])
_print_mean_std("Training precision", cv_results['train_precision_macro'], tail=" \n")

_print_mean_std("Validation ROC AUC", cv_results['test_roc_auc'])
_print_mean_std("Training ROC AUC", cv_results['train_roc_auc'], tail=" \n")

print("Fit mean time: {fit_time}".format(fit_time=np.mean(cv_results['fit_time'])))
print("Score mean time: {score_time} \n".format(score_time=np.mean(cv_results['score_time'])))

#LDA_acc = np.mean(cv_results['test_accuracy'])
#LDA_auc = np.mean(cv_results['test_roc_auc'])

# --- Held-out test evaluation -------------------------------------------------
# Fit on ALL training x-vectors and predict the untouched test x-vectors.
# (The original cross-validated on the test set alone, i.e. it fitted fresh LDA
# models on the test samples and never evaluated the model trained on the
# training data.)
LDA_model.fit(x_vectors_training, y_train)
y_pred = LDA_model.predict(x_vectors_test)

# Confusion matrix; `labels` pins the 2x2 layout even if a class is absent
cm = confusion_matrix(y_test, y_pred, labels=[0, 1])
cm_results = {'tn': cm[0, 0], 'fp': cm[0, 1], 'fn': cm[1, 0], 'tp': cm[1, 1]}
print("Confusion Matrix:")
print("True Positives: {tp}".format(tp=cm_results['tp']))
print("False Positives: {fp}".format(fp=cm_results['fp']))
print("True Negatives: {tn}".format(tn=cm_results['tn']))
print("False Negatives: {fn}\n".format(fn=cm_results['fn']))

# Ratios are guarded so an empty denominator (e.g. no positive prediction at
# all) yields 0.0 instead of a division-by-zero warning and NaN.
test_acc = _safe_ratio(cm_results['tp'] + cm_results['tn'],
                       cm_results['tp'] + cm_results['fp'] + cm_results['tn'] + cm_results['fn'])
test_precision = _safe_ratio(cm_results['tp'], cm_results['tp'] + cm_results['fp'])
test_recall = _safe_ratio(cm_results['tp'], cm_results['tp'] + cm_results['fn'])

print("Test accuracy: {}".format(test_acc))
print("Test precision: {}".format(test_precision))
print("Test recall: {}".format(test_recall))

[Parallel(n_jobs=1)]: Using backend SequentialBackend with 1 concurrent workers.
[Parallel(n_jobs=1)]: Done   5 out of   5 | elapsed:    6.7s finished



Validation accuracy: 0.6064935064935064 +/- 0.01995102791654187
Training accuracy: 0.9581168831168831 +/- 0.006194410398811342
Validation recall: 0.6063311095906033 +/- 0.02062456175465307
Training recall: 0.9580395880067332 +/- 0.006253509962032595
Validation precision: 0.6067161007882844 +/- 0.02048359708058242
Training precision: 0.9581493949025086 +/- 0.006150986933280926 

Validation ROC AUC: 0.6707579256471663 +/- 0.030610682396348978
Training ROC AUC: 0.982313808855235 +/- 0.003405844001211975 

Fit mean time: 1.2902331829071045
Score mean time: 0.015572452545166015 

Confusion Matrix:
True Positives: 9
False Positives: 5
True Negatives: 9
False Negatives: 7

Test accuracy: 0.6
Test precision: 0.6428571428571429
Test recall: 0.5625


Execute the following only for the cosine similarity measure (ultimately not used)