In [1]:
import os
import shutil

from jupyter_client.manager import KernelManager
from IPython import display
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import tensorflow as tf
import tensorflow_hub as hub
import tensorflow_io as tfio
#

import keras.backend as K


from keras import Model
from keras.callbacks import  ModelCheckpoint
from keras.layers import *
from keras.utils.vis_utils import plot_model
from keras import layers
from keras.models import Sequential

from keras.optimizers import *
from keras.callbacks import *
import random
from keras.metrics import *


##
import librosa
import soundfile as sf
import sounddevice as sd
import audio2numpy as a2n
import numpy as np
import math

## arduino control
import pyfirmata
import serial
import time



In [20]:
## Final assemble


# Load the YAMNet model from TensorFlow Hub once, at module level.
# do_prediction() calls it and feeds its embeddings output to the cough classifier.
yamnet_model_path = 'https://tfhub.dev/google/yamnet/1'
yamnet_model = hub.load(yamnet_model_path)
    


def load_mymodel_and_weights(weights_path):
    """Build the cough classifier and load its trained weights.

    The network is a small dense head: a 1024-wide float32 input, two hidden
    ``Dense`` layers (512 and 256 units, each with a ``LeakyReLU(0.1)``
    activation) and a 2-unit output layer that emits raw logits (the loss is
    configured with ``from_logits=True``).

    Args:
        weights_path: Path of the saved weights file, passed unchanged to
            ``Model.load_weights``.

    Returns:
        A ``(my_model, weights)`` tuple: the compiled Keras model with the
        weights loaded, and whatever ``load_weights`` returned.
    """
    # The shape has to be a tuple; ``(1024)`` without the trailing comma is
    # just the integer 1024.
    input_segment = tf.keras.layers.Input(shape=(1024,), dtype=tf.float32,
                                          name='input_segment')
    X = Dense(512, LeakyReLU(0.1))(input_segment)
    X = Dense(256, LeakyReLU(0.1))(X)
    output_layer = Dense(2)(X)
    my_model = Model(input_segment, output_layer)

    my_model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                     optimizer=tf.keras.optimizers.Adam(learning_rate=1e-5),
                     metrics=['accuracy'])

    weights = my_model.load_weights(weights_path)
    return my_model, weights

## Predicts whether the given audio clip contains a cough.

def do_prediction(testing_wav_data, sr= 16000):
    """Classify one audio clip as cough / not cough.

    Args:
        testing_wav_data: Audio samples; forwarded to ``load_wav_16k_mono``,
            which squeezes away the last axis.
        sr: Sample rate of ``testing_wav_data`` in Hz.

    Returns:
        The index of the winning class after averaging the classifier output
        over all YAMNet embedding frames. With
        ``my_classes = ["Not_Cough", "Cough"]`` that is 1 for a cough and 0
        otherwise.
    """
    waveform = load_wav_16k_mono(testing_wav_data, sr)
    # YAMNet returns (scores, embeddings, spectrogram); only the embeddings
    # are consumed by the classifier.
    _, frame_embeddings, _ = yamnet_model(waveform)
    frame_outputs = my_model.predict(frame_embeddings)
    winner = frame_outputs.mean(axis=0).argmax()

    inferred_class = my_classes[winner]
    print(f'The main sound is: {inferred_class}')

    return winner


def recording_audio(duration, fs=16000):
    """Record ``duration`` seconds of single-channel audio at ``fs`` Hz.

    Waits (``sd.wait()``) until the recording has finished and then returns
    the buffer produced by ``sd.rec``.
    """
    frame_count = int(duration * fs)
    capture = sd.rec(frame_count, samplerate=fs, channels=1)
    sd.wait()  # sd.rec returns immediately; wait for the buffer to be filled
    return capture

def loading_audio(audio_path):
    """Read an audio file and return it as a single-channel column of samples.

    Args:
        audio_path: Path of the audio file handed to ``sf.read``.

    Returns:
        A ``(loaded_audio, sr, duration)`` tuple where ``loaded_audio`` has
        shape ``(n_samples, 1)``, ``sr`` is the sample rate reported by
        ``sf.read`` and ``duration`` is the clip length rounded to whole
        seconds (so a clip shorter than half a second reports 0).
    """
    loaded_audio, sr = sf.read(audio_path)
    if loaded_audio.ndim > 1:
        # Multi-channel file: average the channels down to mono. The plain
        # reshape(n_samples, 1) used before raised ValueError for such files.
        loaded_audio = loaded_audio.mean(axis=1)
    loaded_audio = loaded_audio.reshape(-1, 1)
    duration = round(len(loaded_audio) / sr)

    return loaded_audio, sr, duration


def load_wav_16k_mono(audio_file, sr= 16000):
    """Convert in-memory audio samples to a float32, 16 kHz waveform tensor.

    Args:
        audio_file: Audio samples whose last axis has size 1; that axis is
            squeezed away.
        sr: Sample rate of ``audio_file`` in Hz.

    Returns:
        The waveform resampled from ``sr`` to 16000 Hz with
        ``tfio.audio.resample``.
    """
    rate_in = tf.cast(sr, dtype=tf.int64)
    mono = tf.squeeze(tf.cast(audio_file, dtype=tf.float32), axis=-1)
    return tfio.audio.resample(mono, rate_in=rate_in, rate_out=16000)

def load_wav_for_map(audio_file, sr, label):
    """Resample ``audio_file`` with ``load_wav_16k_mono`` and pass ``label`` through.

    Returns:
        A ``(waveform, label)`` pair.
    """
    waveform = load_wav_16k_mono(audio_file, sr)
    return waveform, label



In [None]:
import pyfirmata
import time

# Serial port the Arduino is attached to.
# NOTE(review): the same port name is configured again as `port` in the
# configuration cell further down — confirm the two are meant to stay in sync.
comport='COM9'

# Open the board through pyFirmata (presumably the board runs a Firmata sketch — TODO confirm).
board=pyfirmata.Arduino(comport)

# Pin spec 'd:13:o' = digital pin 13 in output mode; arduino_led() writes to this pin.
led_1=board.get_pin('d:13:o')


In [15]:
# def arduino_led(val):
#     # try: 
#     #     serialcomm= serial.Serial(port, baud_rate)
#     #     serialcomm.timeout= 1
#     # except:
#     #     serialcomm.close()
#     #     serialcomm= serial.Serial(port, baud_rate)
#     #     serialcomm.timeout= 1
    
#     serialcomm= serial.Serial(port, baudrate= baud_rate)
#     serialcomm.timeout = 1
#     serialcomm.write(val.encode())
#     time.sleep(0.5)
#     serialcomm.close()

def arduino_led(val):
    """Drive the LED pin ``led_1`` from a prediction value.

    ``1`` flashes the LED (on, 0.1 s pause, off); ``0`` writes 0 to the pin.
    Any other value leaves the pin untouched.
    """
    if val == 1:
        # Short blink to signal a detected cough.
        led_1.write(1)
        time.sleep(0.1)
        led_1.write(0)
    elif val == 0:
        led_1.write(0)

In [22]:
def cough_rate(wav_file, sr , duration, sec_per_audio):
    """Split a recording into fixed-length windows and count the cough windows.

    Every window is classified with ``do_prediction``; the result is also
    forwarded to ``arduino_led`` so the LED reflects each window's verdict.

    Args:
        wav_file: Audio samples, sliced along the first axis.
        sr: Sample rate of ``wav_file`` in Hz.
        duration: Length of the recording in seconds.
        sec_per_audio: Length of each analysis window in seconds.

    Returns:
        The sum of the per-window predictions (number of windows classified
        as cough).

    Raises:
        ValueError: If ``sec_per_audio`` is not positive.
    """
    if sec_per_audio <= 0:
        raise ValueError(f"sec_per_audio must be positive, got {sec_per_audio}")

    count = 0
    n_windows = int(np.ceil(duration / sec_per_audio))
    for i in range(n_windows):
        # Window i covers [i * sec_per_audio, (i + 1) * sec_per_audio) seconds.
        # The start used to be sr * i, which only lines up with the end of the
        # previous window when sec_per_audio == 1.
        start = round(sr * i * sec_per_audio)
        stop = round(sr * (i + 1) * sec_per_audio)
        temp_wav_data = wav_file[start:stop]

        result = do_prediction(temp_wav_data, sr)
        arduino_led(result)  # mirror the verdict on the Arduino LED
        count = count + result

    # Avoid dividing by zero for an empty / zero-length recording.
    rate = count / duration if duration else 0.0
    print(f"Current Cough rate is {rate} coughs per sec")

    return count


## cough rate for every sec
def cough_rate_new(wav_file, sr= 16000):
    """Classify a single clip and return the verdict as a plain ``int``.

    Thin wrapper around ``do_prediction``; no LED signalling is done here.
    """
    return int(do_prediction(wav_file, sr))



Workflow:

1. Load the configuration.

2. Load the model.

3. Record the audio.

4. Split the recording into segments and run the prediction on each segment.



In [6]:
#### Configuration variables

duration= 1  # length of each recording in seconds (passed to recording_audio)
fs= 44000 # recording sample rate in Hz (previously 16000). NOTE(review): 44100 is the usual audio rate — confirm 44000 is intended
sec_per_audio= 1 ## length in seconds of each audio segment handed to the predictor
my_classes= ["Not_Cough", "Cough"]  # class labels, indexed by the argmax computed in do_prediction

port= "COM9" # serial port the Arduino is connected to
baud_rate= 9600  # Arduino serial baud rate


In [7]:
# Build the classifier and load its trained weights.
# NOTE(review): absolute, machine-specific path — adjust before running on another machine.
weights_path= "D:\\Python Codes\\Python Projects\\Cough Detector Project\\Weights\\try4_ep_135_val_acc_0.874.h5"

# `weights` is whatever Model.load_weights returned; do_prediction only uses `my_model`.
my_model, weights= load_mymodel_and_weights(weights_path)

In [None]:
## Record a clip from the microphone (or load one from disk) and determine its cough rate.

# Record `duration` seconds at `fs` Hz and score the recording as one single window
# (sec_per_audio equals the full duration).
loaded_audio= recording_audio(duration=duration, fs= fs)
cough_rate(loaded_audio, fs, duration, sec_per_audio= duration)

# Alternative: score a file from disk instead of a live recording.
# audio_path= 'D:\Python Codes\Python Projects\Cough Detector Project\EZTMC4Z-coughing_mono.wav'
# loaded_audio, sr, duration= loading_audio(audio_path)
# cough_rate(loaded_audio,sr, duration, sec_per_audio= 1.75)


In [23]:
## real time implementaton
# duration= 2
# fs= 16000 #smaple rate
# sec_per_audio= 1 ## audio length for giving to prediction
# my_classes= ["Not_Cough", "Cough"]

# Length of the counting window in seconds.
count_time = 60

# Record clip after clip until the window has elapsed, collecting one
# 0/1 verdict per clip.
start_time = time.time()
cough_rate_array = []
while True:
    current_time = time.time()
    elapsed_time = current_time - start_time
    if elapsed_time > count_time:
        break

    loaded_audio = recording_audio(duration=duration, fs=fs)
    cr = cough_rate_new(loaded_audio, fs)
    cough_rate_array.append(cr)

# Report the number of clips classified as cough within the window.
final_cough_rate = sum(cough_rate_array)
print(f'Current cough count is {final_cough_rate} coughs  in {elapsed_time} sec')


# n= round(3600/duration)
# # cough_rate_array= list()
# for i in range(n):
#     loaded_audio= recording_audio(duration=duration, fs= fs)
#     cr= cough_rate(loaded_audio, fs, duration, sec_per_audio= duration)
#     cough_rate_array.append(cr)

# final_cough_rate= sum(cough_rate_array) #/(duration*n)

# print(f'Current cough count {final_cough_rate} for the last hour')





The main sound is: Not_Cough
The main sound is: Not_Cough
The main sound is: Cough
The main sound is: Not_Cough
The main sound is: Not_Cough
The main sound is: Not_Cough
The main sound is: Not_Cough
The main sound is: Not_Cough
The main sound is: Not_Cough
The main sound is: Not_Cough
The main sound is: Not_Cough
The main sound is: Not_Cough
The main sound is: Not_Cough
The main sound is: Not_Cough
The main sound is: Not_Cough
The main sound is: Not_Cough
The main sound is: Not_Cough
The main sound is: Not_Cough
The main sound is: Cough
The main sound is: Not_Cough
The main sound is: Not_Cough
The main sound is: Not_Cough
The main sound is: Not_Cough
The main sound is: Not_Cough
The main sound is: Not_Cough
The main sound is: Not_Cough
The main sound is: Not_Cough
The main sound is: Not_Cough
The main sound is: Not_Cough
The main sound is: Not_Cough
The main sound is: Not_Cough
The main sound is: Not_Cough
The main sound is: Not_Cough
The main sound is: Not_Cough
The main sound is: Not

In [28]:
# Microphone/speaker check: record 5-second clips at 16 kHz and play each one
# back, starting new clips until more than `count_time` seconds have elapsed.
count_time = 10
starting_time = time.time()

current_time = time.time()
elapsed_time = current_time - starting_time
while elapsed_time <= count_time:
    recorded_audio = recording_audio(duration=5, fs=16000)
    sd.play(recorded_audio, 16000)
    sd.wait()  # wait for playback to finish before the next recording

    current_time = time.time()
    elapsed_time = current_time - starting_time

In [29]:
# loaded_audio, sr= sf.read('D:\Python Codes\Python Projects\Cough Detector Project\EZTMC4Z-coughing_mono.wav')
# loaded_audio= loaded_audio.reshape(loaded_audio.shape[0],1)
# # testing_wav_data= load_wav_16k_mono(loaded_audio.reshape(loaded_audio.shape[0],1), sr)
# # sd.play(loaded_audio)
# duration= round(len(loaded_audio)/sr)

# audio_path= 'D:\Python Codes\Python Projects\Cough Detector Project\EZTMC4Z-coughing_mono.wav'
# Score a single clip from the evaluation set.
# Raw string: the Windows path is full of backslashes that a normal string
# literal would read as (invalid) escape sequences such as "\D" or "\e".
audio_path = r"F:\Downloads\Documents\Cough Detector\Segmented Flusense Data\eval\Cough\0_0Dh4NhF27jc_cough-27.wav"
loaded_audio, sr, duration = loading_audio(audio_path)
cough_rate(loaded_audio, sr, duration, sec_per_audio=1)

The main sound is: Not_Cough
Current Cough rate is 0.0 coughs per sec


0

### Evaluation

In [43]:
def pos_neg_count(folder_path):
    """Run the detector on every entry of a folder and tally the verdicts.

    Each entry is loaded with ``loading_audio`` and passed to ``cough_rate``
    as one single window spanning its whole (rounded) duration.

    Args:
        folder_path: Directory whose entries are scored.

    Returns:
        ``(pos_count, neg_count)``: how many entries produced a result of 1
        and of 0 respectively. Any other result is counted in neither.
    """
    positives = 0
    negatives = 0
    for entry in os.listdir(folder_path):
        clip, clip_sr, clip_seconds = loading_audio(os.path.join(folder_path, entry))
        verdict = cough_rate(clip, clip_sr, clip_seconds, sec_per_audio=clip_seconds)
        if verdict == 1:
            positives += 1
        elif verdict == 0:
            negatives += 1

    return positives, negatives


In [44]:
# Evaluation folders. Raw strings keep the Windows backslashes literal instead
# of relying on invalid escape sequences ("\D", "\C", "\S", "\e", "\O").
cough_folder_path = r"F:\Downloads\Documents\Cough Detector\Segmented Flusense Data\eval\Cough"
others_folder_path = r"F:\Downloads\Documents\Cough Detector\Segmented Flusense Data\eval\Others"

# Naming: first letter = true class of the folder (p = cough, n = other),
# second letter = predicted class.
pp_count, pn_count = pos_neg_count(cough_folder_path)
np_count, nn_count = pos_neg_count(others_folder_path)



The main sound is: Not_Cough
Current Cough rate is 0.0 coughs per sec
The main sound is: Not_Cough
Current Cough rate is 0.0 coughs per sec
The main sound is: Cough
Current Cough rate is 1.0 coughs per sec
The main sound is: Cough
Current Cough rate is 1.0 coughs per sec
The main sound is: Not_Cough
Current Cough rate is 0.0 coughs per sec
The main sound is: Not_Cough
Current Cough rate is 0.0 coughs per sec
The main sound is: Not_Cough
Current Cough rate is 0.0 coughs per sec
The main sound is: Cough
Current Cough rate is 1.0 coughs per sec
The main sound is: Cough
Current Cough rate is 1.0 coughs per sec
The main sound is: Cough
Current Cough rate is 1.0 coughs per sec
The main sound is: Cough
Current Cough rate is 1.0 coughs per sec
The main sound is: Cough
Current Cough rate is 1.0 coughs per sec
The main sound is: Cough
Current Cough rate is 1.0 coughs per sec
The main sound is: Cough
Current Cough rate is 1.0 coughs per sec
The main sound is: Not_Cough
Current Cough rate is 0.0 c

In [53]:
print(pp_count, pn_count, np_count, nn_count)

# Confusion-matrix cells derived from the two evaluation folders.
TP= pp_count  # cough clips predicted as cough
TN= nn_count  # other clips predicted as not-cough
FN= pn_count  # cough clips predicted as not-cough
FP= np_count  # other clips predicted as cough


def _ratio(numerator, denominator):
    """Return numerator / denominator, or NaN when the denominator is zero.

    Keeps the cell from crashing with ZeroDivisionError when a class has no
    samples or no predictions; the affected metric is reported as NaN.
    """
    return numerator / denominator if denominator else float("nan")


accur = _ratio(TP + TN, TP + FN + TN + FP)
precision = _ratio(TP, TP + FP)
recall = _ratio(TP, TP + FN)

F1 = _ratio(2 * precision * recall, precision + recall)
specificity = _ratio(TN, TN + FP)

print(f"accuracy= {accur}, precision= {precision}, recall= {recall}, F1= {F1}, specificity= {specificity}")

948 283 65 1712
accuracy= 0.8843085106382979, precision= 0.9358341559723593, recall= 0.7701056051990252, F1= 0.8449197860962566, specificity= 0.9634214969048959


TP= 948,  FN= 283, FP= 65, TN= 1712

accuracy= 0.8843085106382979, precision= 0.9358341559723593, recall= 0.7701056051990252, F1= 0.8449197860962566, specificity= 0.9634214969048959

In [None]:
# count= 0
# for i in range(int(np.ceil(duration/sec_per_audio))):
#     temp_wav_data= myrecord[fs*i : fs*(i+sec_per_audio)]
#     result= do_prediction(temp_wav_data)
#     count= count + result

## Arduino control

In [67]:
## arduino control
import serial
import time

port= "COM9" # port of the arduino connected to the computer
baud_rate= 9600  # arduino baud rate

## Open the port for serial control. A port can only be opened once, so a
## handle left over from an earlier run of this cell has to be closed first.
try:
    serialcomm = serial.Serial(port, baud_rate)
except serial.SerialException:
    # Opening failed. If an earlier run left a handle behind, release it and
    # retry; on a fresh kernel there is nothing to close, so re-raise the
    # original error (the bare `except:` used to hit a NameError here).
    stale_handle = globals().get("serialcomm")
    if stale_handle is None:
        raise
    stale_handle.close()
    serialcomm = serial.Serial(port, baud_rate)

serialcomm.timeout = 1
    



In [81]:
# serialcomm.close()
def arduino_led(val):
    """Signal a detected cough to the Arduino over the serial link.

    Opens the serial port configured by ``port`` / ``baud_rate``. When ``val``
    is 1 it sends ``"on"``, waits 2 seconds and sends ``"off"``; for any other
    value nothing is written. The port is always closed again, even if a
    write fails.

    NOTE(review): this redefines ``arduino_led`` and therefore replaces the
    pyFirmata-based version defined earlier in the notebook once this cell
    has been run. How the board reacts to "on"/"off" depends on the sketch
    running on it.
    """
    serialcomm = serial.Serial(port, baudrate=baud_rate)
    try:
        if val == 1:
            serialcomm.write("on".encode())
            time.sleep(2)
            serialcomm.write("off".encode())
    finally:
        # Release the port even when a write raises, so the next call can
        # open it again.
        serialcomm.close()
    
    

In [83]:
# Exercise arduino_led with a fixed sequence of 0/1 values.
array = [0, 1, 0, 1, 0, 1, 0, 1, 1, 1, 1, 0, 1]
l = len(array)

for i, led_state in enumerate(array):
    arduino_led(led_state)

# serialcomm.close()
