In [None]:
import tensorflow as tf
from tensorflow import keras
import os
import copy
import matplotlib.pyplot as plt

import pandas as pd
import numpy as np
import librosa
from numpy import random

import librosa.display

# Ground-truth table for the recordings, read from the Kaggle input dataset.
training_groundtruth = pd.read_csv(
    '/kaggle/input/training-groundtruth/training-groundtruth-new.csv'
)
AUDIO_FILES_DIR = '/kaggle/input/mp3files-nodoctor/rmDoc-audio/'
# Alphabetically sorted names of the .wav files located directly in AUDIO_FILES_DIR.
AUDIO_FILES = sorted(
    name for name in os.listdir(AUDIO_FILES_DIR) if name.endswith('.wav')
)

In [None]:
def noise(data):
    """Return a copy of ``data`` with additive Gaussian noise.

    The noise amplitude is ``0.03 * u * np.amax(data)`` where ``u`` is drawn
    uniformly from ``[0, 1)``, so each call uses a different noise level.

    Parameters
    ----------
    data : np.ndarray
        Signal to perturb. Any shape is accepted; an independent noise sample
        is drawn for every element.

    Returns
    -------
    np.ndarray
        New array with the same shape as ``data``; the input is not modified.
    """
    if data.size == 0:
        # np.amax raises on empty input; there is nothing to perturb anyway.
        return data.copy()
    noise_amp = 0.03 * np.random.uniform() * np.amax(data)
    # size=data.shape (not data.shape[0]) keeps multi-dimensional input working
    # and yields the same random stream as before for 1-D signals.
    return data + noise_amp * np.random.normal(size=data.shape)
def shift(data):
    """Circularly rotate ``data`` by a random number of positions.

    The offset is ``int(u * 1000)`` with ``u`` drawn uniformly from
    ``[-5, 5)``; elements rolled past one end re-enter at the other
    (``np.roll`` semantics).
    """
    offset = np.random.uniform(low=-5, high=5) * 1000
    return np.roll(data, int(offset))
def pitch(data, sampling_rate, pitch_factor=0.7):
    """Pitch-shift ``data`` with ``librosa.effects.pitch_shift``.

    ``sampling_rate`` is forwarded as ``sr`` and ``pitch_factor`` as
    ``n_steps`` (the size of the shift in librosa's step units).
    """
    shifted = librosa.effects.pitch_shift(
        data,
        sr=sampling_rate,
        n_steps=pitch_factor,
    )
    return shifted
def _plot_signal_and_augmented_signal(signal, augmented_signal, sr):
    """Show the original and the augmented waveform in two stacked panels."""
    _, axes = plt.subplots(nrows=2)
    panels = (
        (signal, 'Original Signal'),
        (augmented_signal, 'Augmented Signal'),
    )
    # Top panel is the original, bottom panel the augmented version.
    for axis, (wave, title) in zip(axes, panels):
        librosa.display.waveshow(wave, sr=sr, ax=axis)
        axis.set(title=title)
    plt.show()
# Code copied and edited from https://www.kaggle.com/code/davids1992/specaugment-quick-implementation
def spec_augment(original_melspec,
                 freq_masking_max_percentage=0.05,
                 time_masking_max_percentage=0.05):
    """Return a copy of a 2-D spectrogram with one frequency band and one time span zeroed.

    The input is read as ``(frames, freqs)``: axis 0 is treated as time and
    axis 1 as frequency. For each axis a mask width is drawn uniformly between
    0 and the given maximum fraction of that axis, then a start position is
    drawn uniformly so the mask fits inside the array. The input is not
    modified.
    """
    def _pick_band(axis_len, max_fraction):
        # Width is drawn before the start position; keeping this order keeps
        # the random stream identical for a given seed.
        width = int(np.random.uniform(0.0, max_fraction) * axis_len)
        start = int(np.random.uniform(low=0.0, high=(axis_len - width)))
        return slice(start, start + width)

    masked = original_melspec.copy()
    n_frames, n_freqs = masked.shape

    # Frequency mask first (columns), then time mask (rows).
    masked[:, _pick_band(n_freqs, freq_masking_max_percentage)] = 0
    masked[_pick_band(n_frames, time_masking_max_percentage), :] = 0

    return masked

In [None]:
# https://www.kaggle.com/code/abduallahhussien/speech-emotion-recognition-93-accuracy
def extract_features(data, sr, SpecAugment = False):
    """Compute 13 MFCCs for an audio signal, optionally with SpecAugment masking.

    Parameters
    ----------
    data : np.ndarray
        Audio samples, passed to ``librosa.feature.mfcc`` as ``y``.
    sr : int
        Sampling rate of ``data``.
    SpecAugment : bool, optional
        When true, the MFCC matrix is passed through ``spec_augment`` (one
        random frequency mask and one random time mask) before being returned.

    Returns
    -------
    np.ndarray
        MFCC matrix as returned by librosa, i.e. ``(n_mfcc, frames)``.
    """
    mfccs = librosa.feature.mfcc(y=data, sr=sr, n_mfcc=13)
    if SpecAugment:
        # librosa returns (n_mfcc, frames) while spec_augment reads its input
        # as (frames, freqs); transpose in and out so the frequency and time
        # masks are applied to the axes they are named after.
        mfccs = spec_augment(mfccs.T).T

    return mfccs

def get_features(path, duration=70, offset=0.5):
    """Load one audio file and return MFCC features for four variants of it.

    The file is read at its native sampling rate starting ``offset`` seconds
    in, for at most ``duration`` seconds, then zero-padded at the end to
    exactly ``duration`` seconds so every file yields the same number of
    samples.

    Parameters
    ----------
    path : str
        Path of the audio file to load.
    duration : float, optional
        Length in seconds of the analysed window (default 70).
    offset : float, optional
        Seconds skipped at the start of the file (default 0.5).

    Returns
    -------
    np.ndarray
        The four feature matrices stacked along a new leading axis, in this
        order: unmodified signal, noise-added signal, pitch-shifted signal,
        SpecAugment-masked features of the unmodified signal.
    """
    data, sample_rate = librosa.load(path, duration=duration, offset=offset, sr=None)
    target_len = int(sample_rate * duration)
    # Shorter recordings are right-padded with zeros; longer ones were already
    # truncated by librosa.load(duration=...).
    data = np.pad(data, (0, max(0, target_len - len(data))), 'constant')

    # The variants are computed in a fixed order so the random draws made by
    # noise() and the SpecAugment masking happen in a stable sequence.
    plain = extract_features(data, sample_rate)
    noised = extract_features(noise(data), sample_rate)
    pitched = extract_features(pitch(data, sample_rate), sample_rate)
    masked = extract_features(data, sample_rate, True)

    return np.stack((plain, noised, pitched, masked), axis=0)

In [None]:
# Ground-truth rows per split, plus feature/label containers. The x_*/y_*
# names are rebound to NumPy arrays by the end of this cell.
data_train = []
data_test = []
x_train = []
x_test = []
y_train = []
y_test = []
length = 0
cou = 0

# True: load pre-computed features from LoadAddress.
# False: recompute them from the audio files and save them to 'mfcc13.npy'.
LoadSave = True
LoadAddress = '/kaggle/input/mfcc13-70s/mfcc13.npy'

if not LoadSave:
    # Bucket the ground-truth rows by split, remembering each row's index label.
    for index, row in training_groundtruth.iterrows():
        # Work on a copy: rows yielded by iterrows() should not be modified.
        row = row.copy()
        row['index'] = index
        length += 1
        if row['type'] == "train":
            data_train.append(row)
        else:
            data_test.append(row)

    for row in data_train + data_test:
        cou += 1
        print(f'Status {int(cou/length*100)}%', end='\r')
        # NOTE(review): the row's index label is used as a position in the
        # sorted AUDIO_FILES list - this presumes the CSV row order matches the
        # sorted file names; confirm against the dataset.
        variants = get_features(AUDIO_FILES_DIR + AUDIO_FILES[row['index']])
        label = 1 if row['dx'] == 'ProbableAD' else 0
        if row['type'] == "train":
            # Every augmented variant becomes a training sample with the same label.
            x_train.extend(variants)
            y_train.extend([label] * len(variants))
        else:
            # Only the first (un-augmented) variant is kept for non-train rows.
            x_test.append(variants[0])
            y_test.append(label)

    x_train = np.array(x_train)
    x_test = np.array(x_test)
    y_train = np.array(y_train)
    y_test = np.array(y_test)
    # Append a trailing channel axis for the Conv2D layers.
    x_train = np.expand_dims(x_train, axis=3)
    x_test = np.expand_dims(x_test, axis=3)
    np.save('mfcc13.npy', {'x_train': x_train, 'x_test': x_test, 'y_train': y_train, 'y_test': y_test})
else:
    # SECURITY: allow_pickle=True unpickles arbitrary objects and can execute
    # code embedded in the file; only load caches you produced yourself.
    cached = np.load(LoadAddress, allow_pickle=True).item()
    # np.asarray avoids holding a second copy of each array in memory.
    x_train = np.asarray(cached['x_train'])
    x_test = np.asarray(cached['x_test'])
    y_train = np.asarray(cached['y_train'])
    y_test = np.asarray(cached['y_test'])

In [None]:
# Sanity check: display the shapes of the feature and label arrays.
x_train.shape, x_test.shape, y_train.shape, y_test.shape

In [None]:
# Grid search over a sequential CNN: 2 Conv2D blocks (conv -> batch norm -> max pool), then flatten, dropout, dense, and a sigmoid output
from keras.layers import BatchNormalization, Dropout
from keras import regularizers
from sklearn.model_selection import KFold
import json
from gc import collect

from itertools import product

num_folds = 5
SEED = 42
# A fixed seed scores every hyperparameter combination on the same folds and
# makes reruns repeatable.
# NOTE(review): the training array holds several augmented variants per
# recording, so a plain KFold can place variants of one recording on both sides
# of a split; consider a group-aware splitter.
kfold = KFold(n_splits=num_folds, shuffle=True, random_state=SEED)

# Search space; combinations are visited with the last factor varying fastest.
FILTER1_OPTIONS = [4, 8, 16, 32]
FILTER2_OPTIONS = [4, 8, 16, 32]
UNIT_OPTIONS = [4, 8, 16, 32]
FAC1_OPTIONS = [0.01, 0.02, 0.04]
FAC2_OPTIONS = [0.01, 0.02, 0.04]
FAC3_OPTIONS = [0.01, 0.02, 0.04]

# Combination (filter1, filter2, unit, fac1, fac2, fac3) after which the search
# resumes; it and everything before it are skipped. Set to None to run the
# whole grid.
RESUME_AFTER = (32, 16, 16, 0.02, 0.02, 0.01)


def build_model(filter1, filter2, unit, fac1, fac2, fac3, input_shape):
    """Build and compile the two-block CNN binary classifier.

    ``filter1``/``filter2`` are the Conv2D filter counts, ``unit`` the width of
    the hidden dense layer, and ``fac1``/``fac2``/``fac3`` the regularization
    factors of the first conv, second conv and dense layer respectively.
    """
    model = keras.Sequential()
    model.add(keras.layers.Conv2D(filter1, kernel_size=(3, 3), activation='relu',
                                  input_shape=input_shape,
                                  kernel_regularizer=keras.regularizers.l1(fac1)))
    model.add(keras.layers.BatchNormalization())
    model.add(keras.layers.MaxPooling2D((2, 2)))
    # NOTE(review): l1_l2(fac2) sets only the L1 factor; the L2 factor keeps the
    # library default - confirm this is intended.
    model.add(keras.layers.Conv2D(filter2, kernel_size=(3, 3), activation='relu',
                                  kernel_regularizer=keras.regularizers.l1_l2(fac2)))
    model.add(keras.layers.BatchNormalization())
    model.add(keras.layers.MaxPooling2D((2, 2)))
    model.add(keras.layers.Flatten())
    model.add(keras.layers.Dropout(0.3))
    model.add(keras.layers.Dense(unit, activation='relu',
                                 kernel_regularizer=keras.regularizers.l2(fac3)))
    model.add(keras.layers.Dense(1, activation='sigmoid'))
    model.compile(optimizer='sgd',
                  loss='binary_crossentropy',
                  metrics=['accuracy'])
    return model


checkpointreach = RESUME_AFTER is None
search_space = product(FILTER1_OPTIONS, FILTER2_OPTIONS, UNIT_OPTIONS,
                       FAC1_OPTIONS, FAC2_OPTIONS, FAC3_OPTIONS)
for params in search_space:
    if not checkpointreach:
        # The checkpoint combination itself is skipped too: the search resumes
        # with the combination that follows it.
        checkpointreach = params == RESUME_AFTER
        continue

    filter1, filter2, unit, fac1, fac2, fac3 = params
    print(f'filter1: {filter1}, filter2: {filter2}, unit: {unit}, fac1: {fac1}, fac2: {fac2}, fac3: {fac3}')
    acc_per_fold = []
    loss_per_fold = []

    for fold_no, (train, test) in enumerate(kfold.split(x_train, y_train), start=1):
        # Drop the previous fold's graph and garbage before building a new model.
        keras.backend.clear_session()
        collect()
        model = build_model(filter1, filter2, unit, fac1, fac2, fac3,
                            input_shape=x_train.shape[1:])

        rlrp = keras.callbacks.ReduceLROnPlateau(monitor='val_accuracy',
                                                 patience=3,
                                                 verbose=0,
                                                 factor=0.5,
                                                 min_lr=0.00001)
        earlystopping = keras.callbacks.EarlyStopping(monitor="val_accuracy",
                                                      mode='auto', patience=30,
                                                      restore_best_weights=True)

        history = model.fit(x_train[train], y_train[train],
                            batch_size=64, epochs=100,
                            validation_data=(x_train[test], y_train[test]),
                            callbacks=[rlrp, earlystopping], verbose=0)

        # NOTE(review): the recorded score comes from the held-out test set, so
        # ranking combinations by it selects hyperparameters on test data.
        scores = model.evaluate(x_test, y_test, verbose=0)
        print(f'Score for fold {fold_no}: {model.metrics_names[0]} of {scores[0]}; {model.metrics_names[1]} of {scores[1]*100}%')
        acc_per_fold.append(scores[1] * 100)
        loss_per_fold.append(scores[0])

    result_row = {
        'filter1': filter1,
        'filter2': filter2,
        'unit': unit,
        'fac1': fac1,
        'fac2': fac2,
        'fac3': fac3,
        'acc': acc_per_fold,
        'loss': loss_per_fold,
    }
    # One JSON object per line, appended so interrupted runs keep their results.
    with open('result.json', 'a') as fp:
        json.dump(result_row, fp)
        fp.write('\n')
    print('------------------------------------------------------------------------')

In [None]:
# with open('result.json', 'r') as fp:
#     ParamSet = fp.readlines()
# ParamSet = [json.loads(x) for x in ParamSet]
# ParamSet = sorted(ParamSet, key = lambda i: sum(i['acc'])/num_folds,reverse=True)
# print(ParamSet[0])
# print(ParamSet[1])
# print(ParamSet[2])
# print(ParamSet[3])
# print(ParamSet[4])