<a href="https://colab.research.google.com/github/MohamedAfham/COVID-19-Cough-Detection/blob/master/Cough_Detection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
import math
import os
import random

import cv2 as cv
import librosa
import matplotlib.pyplot as plt
import numpy as np
import scipy
import scipy.signal
import seaborn as sns
from IPython.display import Audio
from librosa import display

from sklearn.metrics import roc_curve
from sklearn.metrics import auc
from sklearn.metrics import confusion_matrix, classification_report

import tensorflow.compat.v1 as tf
#import tensorflow as tf
#tf.enable_eager_execution()
#tf.disable_v2_behavior()
import tensorflow.keras as keras
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense, Dropout, Flatten,BatchNormalization
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Activation,ReLU,LeakyReLU
from tensorflow.keras.models import load_model
#from tensorflow.keras.layers.advanced_activations import LeakyReLU
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, LearningRateScheduler,Callback

In [109]:
# Mount Google Drive at /content/gdrive; the audio files, saved models and plots
# used by the cells below all live under "gdrive/My Drive/Cough Detection/".
# NOTE(review): later cells use the relative prefix "gdrive/...", which only resolves
# when the working directory is /content -- confirm for non-default runtimes.
from google.colab import drive 
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [0]:
def read_audio(audio_directory,audio_file):
  """Load one audio file with ``librosa.load``.

  Args:
    audio_directory: Directory containing the file, with or without a trailing
      path separator.
    audio_file: File name inside ``audio_directory``.

  Returns:
    Tuple ``(audio, sampling_rate)`` exactly as returned by ``librosa.load``.
  """
  # os.path.join (instead of `+`) so a directory without a trailing "/" still works.
  audio_path = os.path.join(audio_directory, audio_file)
  audio, sampling_rate = librosa.load(audio_path)
  return (audio, sampling_rate)

def volume_aug(audio):
  """Volume augmentation: scale the waveform by a randomly chosen gain.

  The gain is picked with ``np.random.randint`` from (0.5, 1, 1.5, 2), so the
  result depends on NumPy's global random state.

  Args:
    audio: Waveform samples (anything supporting ``* float``, e.g. an ndarray).

  Returns:
    The scaled samples.
  """
  gains = (0.5, 1, 1.5, 2)
  gain = gains[np.random.randint(0, 4)]
  return audio * gain

def bg_noise_aug(audio,bg_noise):
  """Mix a randomly positioned excerpt of ``bg_noise`` into ``audio``.

  Args:
    audio: 1-D waveform to augment.
    bg_noise: 1-D background-noise waveform. If it is shorter than ``audio`` it
      is repeated end-to-end until it is long enough.

  Returns:
    ``audio`` plus a ``len(audio)``-sample slice of the noise.

  Raises:
    ValueError: If ``bg_noise`` is empty.
  """
  length = len(audio)
  noise_length = len(bg_noise)
  if noise_length == 0:
    raise ValueError("bg_noise must contain at least one sample")

  if noise_length < length:
    # Repeat the noise just enough times to cover the clip (ceil division).
    repeats = -(-length // noise_length)
    bg_noise = np.tile(bg_noise, repeats)
    noise_length = len(bg_noise)

  # randint's upper bound is exclusive: +1 makes the last valid offset reachable
  # and lets noise that is exactly as long as the clip work (offset 0).
  rand_start = np.random.randint(0, noise_length - length + 1)
  return audio + bg_noise[rand_start:rand_start + length]

def spect(audio, fs=None):
  """Magnitude spectrogram in dB, shaped (257, 321, 1).

  Args:
    audio: 1-D waveform. Its length must produce exactly 321 STFT frames with a
      512-sample Hamming window and 170-sample overlap, otherwise the final
      reshape raises ValueError.
    fs: Sampling rate in Hz. When omitted, the notebook-level global
      ``sampling_rate`` is used, which is what the original implementation
      always did.

  Returns:
    ``20 * log10(magnitude)`` as an array of shape (257, 321, 1).
  """
  if fs is None:
    fs = sampling_rate  # hidden notebook state: set by whichever cell last unpacked read_audio()
  NFFT = 512
  _, _, Sxx = scipy.signal.spectrogram(
      x=audio,
      fs=fs,
      window=np.hamming(NFFT),
      nfft=NFFT,
      noverlap=int(NFFT/3),
      nperseg=NFFT,
      mode='magnitude',
  )
  # Floor the magnitude so silent bins map to -200 dB instead of -inf
  # (-inf would turn the whole array into NaN once it is mean/std normalised).
  spectrum = 20*np.log10(np.maximum(Sxx, 1e-10))
  return spectrum.reshape(257, 321, 1)

def melspectrogram(audio,sampling_rate):
    """Compute the 128-band mel feature map fed to the CNN, shaped (128, 287, 1).

    Pipeline: zero-pad the waveform to a whole number of hops, take the STFT
    magnitude with a Hamming window (25 ms frame, hop = 70 % of the frame),
    convert it to dB relative to its maximum, then project it onto a mel
    filter bank.

    NOTE(review): the mel filter bank is applied to dB values rather than to
    linear magnitude/power. That is unusual, but it is kept as-is because the
    saved models were trained on exactly these features.

    Args:
        audio: 1-D waveform samples.
        sampling_rate: Sampling rate of ``audio`` in Hz.

    Returns:
        Array of shape (128, 287, 1). The reshape raises ValueError when the
        padded clip does not yield exactly 287 STFT frames; the batch generator
        in this notebook fixes clips to 5 s before calling this function.
    """
    samples = audio
    frame_size = 0.025                  # seconds per analysis frame
    frame_stride = frame_size * 0.7     # seconds between frame starts
    no_of_mels = 128                    # number of mel filters
    frame_length = int(round(frame_size * sampling_rate))
    frame_step = int(round(frame_stride * sampling_rate))
    samples_length = len(samples)
    num_frames = int(np.ceil(float(np.abs(samples_length - frame_length)) / frame_step))

    # Zero-pad so the signal spans num_frames hops plus one full frame.
    pad_samples_length = num_frames * frame_step + frame_length
    z = np.zeros((pad_samples_length - samples_length))
    pad_samples = np.append(samples, z)

    spect_samples = np.abs(librosa.stft(pad_samples, n_fft=frame_length, hop_length=frame_step, window=np.hamming(frame_length)))
    spect_samples = librosa.amplitude_to_db(spect_samples, ref=np.max)
    # `sr` and `n_fft` are passed by keyword: librosa >= 0.10 rejects them positionally.
    mel_filter = librosa.filters.mel(sr=sampling_rate, n_fft=frame_length, n_mels=no_of_mels, fmin=0, fmax=None)

    mel_spect_samples = np.dot(mel_filter, spect_samples)
    spectrum = mel_spect_samples.ravel().reshape(no_of_mels,287,1)
    return spectrum

def normalize_spectrum(spectrum):
  """Standardise a spectrum to zero mean and unit standard deviation.

  Args:
    spectrum: Array of any shape.

  Returns:
    ``(spectrum - mean) / std`` computed over all elements. A constant input
    (std == 0) returns an all-zero array instead of the NaNs that 0/0 would give.
  """
  centered = spectrum - np.mean(spectrum)
  std = np.std(spectrum)
  if std == 0:
    # Every element equals the mean, so the centred array is already all zeros.
    return centered
  return centered / std

In [0]:
# Background-noise recordings that bg_noise_aug() mixes into the clips.
bg_noise_dir = ['Station.wav','NeighbourSpeaking.wav','Metro.wav','Hallway.wav','FootSteps.wav','AirportAnnouncements.wav']
audio_directory = 'gdrive/My Drive/Cough Detection/Sounds/Background Noise/'

# Keep only the waveform ([0]) of each recording; the sampling rate is discarded.
# A comprehension avoids leaking loop temporaries into the notebook namespace.
bg_noise_list = [read_audio(audio_directory, noise_file)[0] for noise_file in bg_noise_dir]


In [85]:
# Sanity check: feature shape for a single cough recording.
audio_directory = "gdrive/My Drive/Cough Detection/Sounds/Cough/"
audio,sampling_rate = read_audio(audio_directory,"cough - 56a.wav")
# melspectrogram() takes the sampling rate as a required second argument;
# calling it with the waveform alone raises TypeError.
spectrum = melspectrogram(audio,sampling_rate)
spectrum.shape

(128, 287)

In [73]:
# File names of the two classes: "Audio/" holds non-cough clips, "Cough/" holds coughs.
audio_directory = "gdrive/My Drive/Cough Detection/Sounds/"
# os.listdir() returns entries in arbitrary order; sorting makes every later
# split/shuffle depend only on the random state, not on filesystem order.
non_cough = sorted(os.listdir(audio_directory+"Audio/"))
cough = sorted(os.listdir(audio_directory+"Cough/"))

3309


In [0]:
# Hold out 45 files per class as the test set.
# A dedicated, seeded RNG keeps the hold-out identical across kernel restarts.
# This matters because later cells load a previously saved checkpoint: if the
# test files were re-drawn on every run they could overlap with files that
# checkpoint was trained on.
TEST_SPLIT_SEED = 42
N_TEST_PER_CLASS = 45
split_rng = random.Random(TEST_SPLIT_SEED)

# sorted() so the draw does not depend on directory listing order.
test_cough = split_rng.sample(sorted(cough), N_TEST_PER_CLASS)
test_non_cough = split_rng.sample(sorted(non_cough), N_TEST_PER_CLASS)

# Everything not held out is available for training/validation. A list
# comprehension (rather than a set difference) keeps a deterministic order.
held_out_cough = set(test_cough)
held_out_non_cough = set(test_non_cough)
train_val_cough = [name for name in cough if name not in held_out_cough]
train_val_non_cough = [name for name in non_cough if name not in held_out_non_cough]

In [0]:
#train_val_cough = np.load('gdrive/My Drive/Cough Detection/Numpy Arrays/train_val_cough.npy').tolist()
#train_val_cough = [np.array(xi) for xi in train_val_cough]

#non_cough_audio = np.load('gdrive/My Drive/Cough Detection/Numpy Arrays/non_cough_train_val.npy').tolist()
#non_cough_audio = [np.array(xi) for xi in non_cough_audio]

#test_cough = np.load('gdrive/My Drive/Cough Detection/Numpy Arrays/test_cough.npy').tolist()
#test_cough = [np.array(xi) for xi in test_cough]

#test_non_cough_audio = np.load('gdrive/My Drive/Cough Detection/Numpy Arrays/non_cough_test.npy').tolist()
#test_non_cough_audio = [np.array(xi) for xi in test_non_cough_audio]

In [0]:
# Shuffle each class in place, then cut it 90 % / 10 % into train / validation.
random.shuffle(train_val_cough)
cough_cut = int(len(train_val_cough)*0.9)
train_cough = train_val_cough[:cough_cut]
val_cough = train_val_cough[cough_cut:]

random.shuffle(train_val_non_cough)
non_cough_cut = int(len(train_val_non_cough)*0.9)
train_non_cough = train_val_non_cough[:non_cough_cut]
val_non_cough = train_val_non_cough[non_cough_cut:]

# Index == class label: 0 -> non-cough, 1 -> cough.
train_files = [train_non_cough, train_cough]
val_files = [val_non_cough, val_cough]

**Training on augmented batches produced on the fly by a Python generator (Keras generator-based training)**

In [0]:
#Generator Function
def generator(batch_size, is_train):
  """Endlessly yield augmented, normalised mel-spectrogram batches.

  Each sample is built by picking a random class and a random file of that
  class (no file is used twice within one batch), forcing the clip to 5 s,
  applying volume and background-noise augmentation, and converting it with
  melspectrogram() followed by normalize_spectrum().

  Args:
    batch_size: Number of samples per yielded batch.
    is_train: True to draw from ``train_files``, False to draw from ``val_files``.

  Yields:
    Tuple ``(batch_features, batch_labels)`` with shapes
    ``(batch_size, 128, 287, 1)`` and ``(batch_size, 1)``; label 1 = cough,
    0 = non-cough.

  Raises:
    ValueError: If ``batch_size`` exceeds the number of distinct files, in which
      case the no-repeat sampling could never fill a batch.
  """
  audio_list = train_files if is_train else val_files

  # Index == label.
  class_directories = (
      "gdrive/My Drive/Cough Detection/Sounds/Audio/",
      "gdrive/My Drive/Cough Detection/Sounds/Cough/",
  )
  clip_seconds = 5.0

  available = len(set(audio_list[0])) + len(set(audio_list[1]))
  if batch_size > available:
    # Without this guard the rejection loop below would spin forever.
    raise ValueError("batch_size (%d) exceeds the %d available files" % (batch_size, available))

  while True:
    batch_features = np.zeros((batch_size,128,287,1))
    batch_labels = np.zeros((batch_size,1))
    used_files = [set(), set()]   # per-class files already placed in this batch
    for n in range(batch_size):
      while True:
        label = np.random.randint(0,2)
        rand_audio = random.choice(audio_list[label])
        if rand_audio in used_files[label]:
          continue   # already in this batch (or class exhausted): draw again
        used_files[label].add(rand_audio)

        selected_audio,sampling_rate = read_audio(class_directories[label],rand_audio)
        # 5 s of samples; equals 110250 at the 22050 Hz rate librosa.load uses by default.
        length = int(clip_seconds * sampling_rate)
        if len(selected_audio) != length:
          # `size` must be passed by keyword on librosa >= 0.10.
          audio = librosa.util.fix_length(selected_audio, size=length)
          # Low-level Gaussian noise over the whole clip, including any zero padding.
          noise = np.random.randn(len(audio)) * 0.0005
          selected_audio = audio + noise

        volume_aug_audio = volume_aug(selected_audio)
        bg_noise = bg_noise_list[random.randrange(len(bg_noise_list))]
        bg_noise_added = bg_noise_aug(volume_aug_audio,bg_noise)
        spectrum = melspectrogram(bg_noise_added,sampling_rate)
        batch_features[n] = normalize_spectrum(spectrum)
        batch_labels[n] = label
        break

    yield batch_features,batch_labels

#Callbacks
# Checkpoints: write a model file whenever validation accuracy reaches a new best.
# The accuracy is embedded in the file name, so each improvement that changes the
# two-decimal value produces a separate file.
# The deprecated `period=1` argument is omitted: saving once per epoch is the
# default (`save_freq='epoch'`), and newer Keras versions reject `period`.
trained_model_path_Adam = 'gdrive/My Drive/Cough Detection/Pre trained Models/model-{val_accuracy:.2f}-Adam.h5'
checkpoint_Adam = ModelCheckpoint(trained_model_path_Adam,
                                  monitor='val_accuracy',
                                  save_best_only=True,
                                  mode='max')

trained_model_path_SGD= 'gdrive/My Drive/Cough Detection/Pre trained Models/model-{val_accuracy:.2f}-SGD.h5'
checkpoint_SGD = ModelCheckpoint(trained_model_path_SGD,
                                 monitor='val_accuracy',
                                 save_best_only=True,
                                 mode='max')

# Stop when validation accuracy has not improved by at least 0.001 for 50 epochs.
# The final-epoch weights are kept (restore_best_weights=False); the best ones
# are available from the checkpoint files above.
early_stop = EarlyStopping(monitor='val_accuracy',
                           min_delta=0.001,
                           patience=50,
                           mode='max',
                           restore_best_weights=False)

def step_decay(epoch):
   """Step learning-rate schedule for the Adam run.

   Returns ``0.01 * 0.5 ** floor((epoch + 1) / 10)``: the rate starts at 0.01
   and is halved each time ``epoch + 1`` reaches a multiple of 10.

   Args:
      epoch: Zero-based epoch index supplied by LearningRateScheduler.

   Returns:
      Learning rate to use for that epoch.
   """
   initial_rate, drop, epochs_drop = 0.01, 0.5, 10.0
   completed_drops = math.floor((1 + epoch) / epochs_drop)
   return initial_rate * math.pow(drop, completed_drops)

step_scheduler  = LearningRateScheduler(step_decay)

#Model
# Input: one normalised mel feature map of shape (128, 287, 1).
shape = (128,287,1)
model = Sequential()

#Layer 1: the input is down-sampled 2x2 before the first convolution.
model.add(MaxPooling2D(pool_size=(2,2),input_shape=shape,name='MaxPooling2D_1'))
model.add(Conv2D(32, kernel_size=(5,5),padding='same'))
model.add(BatchNormalization())
model.add(LeakyReLU(alpha=0.1))
model.add(MaxPooling2D(pool_size=(4,4)))

#Layer 2
model.add(Conv2D(64, kernel_size=(5,5),padding='same',name='Conv2D_2'))
model.add(BatchNormalization())
model.add(LeakyReLU(alpha=0.1))
model.add(MaxPooling2D(pool_size=(2,2),name='MaxPooling2D_3'))

model.add(Flatten(name = 'Flatten'))

#Layer 3 (disabled in the original experiments):
# Dense(256, 'Dense_1') -> BatchNormalization('BatchNormalization_2') -> LeakyReLU(0.1) -> Dropout(0.5, 'Dropout_1')

#Layer 4
model.add(Dense(128,name = 'Dense_2'))
model.add(BatchNormalization(name = 'BatchNormalization_3'))
model.add(LeakyReLU(alpha=0.1))
model.add(Dropout(0.5,name = 'Dropout_2'))

#Layer 5 (its Dropout(0.5, 'Dropout_3') was disabled in the original experiments)
model.add(Dense(128,name = 'Dense_3'))
model.add(BatchNormalization(name = 'BatchNormalization_4'))
model.add(LeakyReLU(alpha=0.1))

# Single sigmoid unit: predicted probability of the "cough" class (label 1).
model.add(Dense(1,activation='sigmoid',name = 'Dense_4'))

# The learning rate given here matches step_decay's initial rate; the
# LearningRateScheduler callback sets the rate at the start of every epoch.
adam = keras.optimizers.Adam(learning_rate =1e-2, beta_1=0.9, beta_2=0.999, amsgrad=False)

model.compile(optimizer= adam, loss='binary_crossentropy', metrics=['accuracy'])
batch_size = 17
# Model.fit accepts Python generators directly; fit_generator is deprecated
# and has been removed from recent Keras releases.
history = model.fit(generator(batch_size,True),
                    epochs=150,
                    steps_per_epoch=50,
                    validation_data=generator(batch_size,False),
                    validation_steps=20,
                    callbacks = [checkpoint_Adam,step_scheduler,early_stop])
x = model.optimizer.lr
tf.print(x)

In [0]:
# Load a previously saved Adam checkpoint; the rest of this cell recompiles it
# with SGD and continues training it.
# NOTE(review): the "_best" file name does not match the ModelCheckpoint
# template used above, so it was presumably renamed by hand -- confirm the file exists.
model_cont = keras.models.load_model('gdrive/My Drive/Cough Detection/Pre trained Models/model-0.97-Adam_best.h5')

def step_decay(epoch):
   """Step learning-rate schedule for the SGD fine-tuning run.

   Rebinds the name ``step_decay`` (previously the Adam schedule) with a lower
   base rate. Returns ``3e-3 * 0.5 ** floor((epoch + 1) / 10)``.

   Args:
      epoch: Zero-based epoch index supplied by LearningRateScheduler.

   Returns:
      Learning rate to use for that epoch.
   """
   initial_rate, drop, epochs_drop = 3e-3, 0.5, 10.0
   completed_drops = math.floor((1 + epoch) / epochs_drop)
   return initial_rate * math.pow(drop, completed_drops)

# Fine-tune the loaded model with SGD + momentum under the step schedule defined above.
step_scheduler  = LearningRateScheduler(step_decay)
sgd = keras.optimizers.SGD(learning_rate=3e-3, momentum=0.98, nesterov=False)
model_cont.compile(optimizer= sgd, loss='binary_crossentropy', metrics=['accuracy'])
batch_size = 17
# Model.fit accepts Python generators directly; fit_generator is deprecated
# and has been removed from recent Keras releases.
# `history` is overwritten here, so the plotting cells below show this SGD run.
history = model_cont.fit(generator(batch_size,True),
                         epochs=150,
                         steps_per_epoch=50,
                         validation_data=generator(batch_size,False),
                         validation_steps=17,
                         callbacks = [checkpoint_SGD,early_stop,step_scheduler])
x = model_cont.optimizer.lr
tf.print(x)

In [0]:
  fig ,ax = plt.subplots()
ax.plot(history.history['accuracy'])
ax.plot(history.history['val_accuracy'])
plt.title('Model accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend(['Train', 'Val'], loc='upper left')
plt.show()
fig.savefig('gdrive/My Drive/Cough Detection/Accuracy Loss plots/acc_sgd.jpg')

In [0]:
# Training vs. validation loss of the most recent fit (`history`).
fig, ax = plt.subplots()
for curve in ('loss', 'val_loss'):
    ax.plot(history.history[curve])
ax.set_title('Model loss')
ax.set_ylabel('Loss')
ax.set_xlabel('Epoch')
ax.legend(['Train', 'Val'], loc='upper left')
plt.show()
fig.savefig('gdrive/My Drive/Cough Detection/Accuracy Loss plots/loss_sgd.png')

In [0]:
#Prepare the test data
# Index == class label: 0 -> non-cough, 1 -> cough.
test_data = [test_non_cough, test_cough]

# Shuffle each class list in place (same list objects as the test_* names).
random.shuffle(test_data[0])
random.shuffle(test_data[1])

In [0]:
# Build the test tensors with the same preprocessing the training generator uses.
# Fixes relative to the original cell:
#   * test_data holds file NAMES, so each clip has to be loaded before augmentation;
#   * features must be mel spectrograms of shape (128, 287, 1), the input shape the
#     model is trained on in this notebook, not spect()'s (257, 321, 1);
#   * every file is visited exactly once in shuffled order instead of by
#     rejection-sampling random labels and indices.
test_class_directories = (
    "gdrive/My Drive/Cough Detection/Sounds/Audio/",   # label 0: non-cough
    "gdrive/My Drive/Cough Detection/Sounds/Cough/",   # label 1: cough
)
test_items = [(label, file_name) for label in (0, 1) for file_name in test_data[label]]
random.shuffle(test_items)

n_input = len(test_items)
model_data = np.zeros((n_input,128,287,1))
model_label = np.zeros((n_input,1))

for i, (label, file_name) in enumerate(test_items):
  selected_audio, sampling_rate = read_audio(test_class_directories[label], file_name)

  # Force every clip to 5 s, mirroring generator().
  target_length = int(5.0 * sampling_rate)
  if len(selected_audio) != target_length:
    selected_audio = librosa.util.fix_length(selected_audio, size=target_length)
    selected_audio = selected_audio + np.random.randn(target_length) * 0.0005

  # NOTE(review): the test clips receive the same random volume/noise augmentation
  # as training data (as in the original cell), so metrics vary between runs.
  volume_aug_audio = volume_aug(selected_audio)
  bg_noise = bg_noise_list[random.randrange(len(bg_noise_list))]
  bg_noise_added = bg_noise_aug(volume_aug_audio,bg_noise)
  spectrum = melspectrogram(bg_noise_added, sampling_rate)
  model_data[i] = normalize_spectrum(spectrum)
  model_label[i] = label

In [0]:
# Evaluate a saved checkpoint on the prepared test tensors.
test_model = keras.models.load_model('gdrive/My Drive/Cough Detection/Pre trained Models/model-0.97-Adam_best.h5')
#test_model = keras.models.load_model('gdrive/My Drive/Cough Detection/Pre trained Models/model-0.97-SGD.h5')

# The whole test set is scored as a single batch.
pred = test_model.predict(x = model_data,batch_size = n_input)
print (test_model.evaluate(x = model_data, y = model_label,batch_size=n_input))

# Hard labels: a clip is called "cough" when probability + 0.05 exceeds 0.5,
# i.e. an effective decision threshold of about 0.45. This is the vectorised
# equivalent of the original per-element int(np.round(p + 0.05)), which relied on
# int() of a 1-element array (deprecated in recent NumPy).
# NOTE(review): evaluate() above uses Keras' own accuracy threshold, so its
# accuracy can differ slightly from the confusion matrix below.
preds = (pred.ravel() + 0.05 > 0.5).astype(int)


print (confusion_matrix(model_label,preds))
print ('\n')
print (classification_report(model_label,preds))

In [0]:
# Flatten the (n, 1) prediction and label arrays to 1-D for the ROC computation.
y_pred_keras = pred.ravel()
y_test = model_label.ravel()

In [0]:
# False/true positive rates at each score threshold chosen by sklearn's roc_curve.
fpr_keras, tpr_keras, thresholds_keras = roc_curve(y_test, y_pred_keras)

In [0]:
# Area under the ROC curve, computed from the (fpr, tpr) points above.
auc_keras = auc(fpr_keras, tpr_keras)

In [0]:
# Legend entry shared by both ROC figures.
roc_label = 'Keras (area = {:.3f})'.format(auc_keras)

# Figure 1: full ROC curve with the chance diagonal; saved to Drive.
fig = plt.figure(1)
roc_ax = fig.gca()
roc_ax.plot([0, 1], [0, 1], 'k--')
roc_ax.plot(fpr_keras, tpr_keras, label=roc_label)
roc_ax.set_xlabel('False positive rate')
roc_ax.set_ylabel('True positive rate')
roc_ax.set_title('ROC curve')
roc_ax.legend(loc='best')
plt.show()
fig.savefig('gdrive/My Drive/Cough Detection/Accuracy Loss plots/ROC_adam.png')

# Figure 2: same curve zoomed into the top-left corner (displayed only, not saved).
zoom_ax = plt.figure(2).gca()
zoom_ax.set_xlim(0, 0.2)
zoom_ax.set_ylim(0.8, 1)
zoom_ax.plot([0, 1], [0, 1], 'k--')
zoom_ax.plot(fpr_keras, tpr_keras, label=roc_label)
zoom_ax.set_xlabel('False positive rate')
zoom_ax.set_ylabel('True positive rate')
zoom_ax.set_title('ROC curve (zoomed in at top left)')
zoom_ax.legend(loc='best')
plt.show()