In [1]:
#Research references:
#1) Dry/wet cough classification: https://link.springer.com/article/10.1007/s10439-013-0741-6
#2) Pneumonia classification: https://ieeexplore.ieee.org/stamp/stamp.jsp?arnumber=6987276

In [2]:
#from scipy.io.wavfile import read
#import wave
import numpy as np
import os
import sox
#import pywt #wavelets
from pydub import AudioSegment
from pydub.silence import split_on_silence
from pydub.utils import mediainfo
import matplotlib.pyplot as plt
import python_speech_features as spe_feats
import pandas as pd
from scipy.stats import kurtosis, skew
from scipy.signal import lfilter
import librosa
import math
import sys

## Functions

In [10]:
#Apply pre-emphasis (high-pass) filter
def apply_preEmph(x):
    """Pre-emphasise a signal with the FIR filter y[n] = x[n] - 0.63*x[n-1].

    x: 1-D sequence of samples.
    Returns the filtered signal produced by scipy.signal.lfilter.
    """
    numerator = [1., -0.63]  #feed-forward taps
    denominator = 1          #no feedback term
    return lfilter(numerator, denominator, x)
        
#Obtain autocorrelation
def autocorr(x):
    """Return the one-sided autocorrelation of x for lags 1 .. len(x)-1.

    np.correlate(..., mode='full') yields 2*len(x)-1 values with lag 0 at
    index len(x)-1. The slice below starts at index len(x), so the zero-lag
    term is NOT part of the result: element i of the output is lag i+1.
    """
    full = np.correlate(x, x, mode='full')
    #full.size is always odd, so (size+1)//2 == len(x) == index of lag 1
    return full[(full.size + 1) // 2:]

#Get fundamental frequency (F0)
def get_F0(x,fs):
    """Estimate the fundamental frequency of x by autocorrelation peak picking.

    The peak is searched among the lags that correspond to 50-500 Hz, i.e.
    between round(fs/500) and round(fs/50) samples.

    x:  1-D signal (converted to float so integer PCM cannot overflow).
    fs: sampling rate in Hz (int or float).

    Returns fs / best_lag in Hz, or NaN when x is too short to contain any
    lag inside the search range.
    """
    x = np.asarray(x, dtype=float)
    if x.size < 2:
        #no non-zero lag exists, so there is nothing to search
        return float('nan')

    xcorr_arr = autocorr(x)

    #looking for F0 in the frequency interval 50-500Hz, but we search in time domain
    min_lag = max(1, int(round(fs/500)))  #shortest period, in samples
    max_lag = int(round(fs/50))           #longest period, in samples

    #autocorr() output starts at lag 1, so lag k lives at index k-1
    xcorr_slot = xcorr_arr[min_lag-1:max_lag]
    if xcorr_slot.size == 0:
        return float('nan')

    t0 = int(np.argmax(xcorr_slot))
    #xcorr_slot[0] is lag min_lag, hence the peak lag is min_lag + t0
    F0 = fs/(min_lag+t0)
    return F0

#Get formants
def get_formants(x, lp_order, fs=None):
    """Estimate formant frequencies of x from the roots of its LP polynomial.

    x:        1-D floating-point signal.
    lp_order: order of the linear-prediction model passed to librosa.lpc.
    fs:       sampling rate in Hz used to map root angles to frequencies.
              When omitted, the notebook-level fs_targ is used, which is what
              this function always did before the parameter existed.

    Returns the candidate frequencies (Hz) as a list sorted in ascending order.
    Real roots pass the imag >= 0 filter, so entries at 0 Hz or fs/2 can
    appear -- NOTE(review): confirm whether those should be discarded.
    """
    if fs is None:
        fs = fs_targ

    #compute lp coefficients (order is passed by keyword: it is keyword-only
    #in recent librosa releases and accepted by keyword in older ones)
    a = librosa.lpc(x, order=lp_order)

    #get roots from lp coefficients, keeping one of each conjugate pair
    rts = np.roots(a)
    rts = [r for r in rts if np.imag(r) >= 0]

    #get angles
    angz = np.arctan2(np.imag(rts), np.real(rts))

    #get formant frequencies: angle (rad/sample) -> Hz
    formants = sorted(angz * (fs / (2 * math.pi)))

    return formants

#Extract features
def feature_extraction(x,fs,feats_df,lp_ord,ID):
    """Compute the features of one segment and add them as a row to feats_df.

    Features' reference: https://link.springer.com/article/10.1007/s10439-013-0741-6

    x:        1-D numpy array holding the segment samples.
    fs:       sampling rate of x in Hz.
    feats_df: DataFrame collected so far (may be empty); it is not modified.
    lp_ord:   linear-prediction order forwarded to get_formants.
    ID:       identifier of the recording, stored in the 'Id' column.

    Returns a new DataFrame: feats_df followed by one row with the columns
    Id, mfcc, kurtosis, logEnergy, zcr, formants, F0 and skew.
    """
    #TODO:
    #0)Wavelets

    #DOUBT: if log-energy feature is included, should I also include the first mfcc coefficient (c0) ?
    #1)mfcc
    mfcc_feat = spe_feats.mfcc(x,fs)

    #2)zero-crossing rate: fraction of consecutive sample pairs whose sign differs
    zcr_feat = (((x[:-1] * x[1:]) < 0).sum())/len(x)

    #3)Formant frequencies
    #using LP-coeffcs-based method
    form = get_formants(x, lp_ord)

    #we keep just the first 4 formants
    formant_feats = form[0:4]

    #4)Log-energy (eps keeps the argument of log10 strictly positive)
    logEnergy_feat = np.log10( ( (np.power(x,2)).sum()/len(x) ) + eps)

    #5)Pitch (F0)
    F0_feat = get_F0(x,fs)

    #6)Kurtosis
    kurt_feat = kurtosis(x)

    #7)Bispectrum Score (BGS)
    #TODO

    #8)Non-Gaussianity Score (NGS)
    #TODO
    # Do kernel density estimation
    #p = KernelDensity(kernel='gaussian', bandwidth=0.75).fit(seg_iarray)
    #q generate Gaussian distribution? using mean and standard dev of reference data, plus min and max values?

    #9) Adding skewness as measure of non-gaussianity (not in paper)
    skew_feat = skew(x)

    #array/list valued features are wrapped in a one-element list so that each
    #of them occupies a single cell of the row
    row = pd.DataFrame({'Id': ID, 'mfcc': [mfcc_feat],
                        'kurtosis': kurt_feat, 'logEnergy': logEnergy_feat, 'zcr': zcr_feat,
                        'formants': [formant_feats], 'F0': F0_feat, 'skew': skew_feat},
                       index=[0])

    #DataFrame.append was removed in pandas 2.0; pd.concat is the replacement
    if feats_df.shape == (0, 0):
        #nothing collected yet: the new row is the whole table
        return row
    return pd.concat([feats_df, row], ignore_index=True, sort=False)


## Settings

In [11]:
#Table that accumulates one row of features per processed segment
feats = pd.DataFrame([])

#Machine epsilon: tiny constant added before taking logarithms
eps = sys.float_info.epsilon

#Audio format every recording is converted to
fs_targ = 16000      #target sampling frequency (Hz)
n_channels_targ = 1  #target number of channels (mono)

#Framing (seconds)
winlen = 0.025
winstep = 0.01

#Number of mfcc coefficients, as in the paper
#(https://link.springer.com/article/10.1007/s10439-013-0741-6)
mfcc_coeffcs = 12

#LP order: standard rule of thumb, 2 + one coefficient per kHz of sampling rate
lp_ord = int(round(2 + fs_targ/1000))

## Main

In [12]:
norm_skip = False #skip normalization step (because it has been done previously)

#Folder with the raw recordings, and folder receiving the normalized copies
FOLDER_PATH = 'data/YT_set/wavs/1/'
#TODO: put normalized wavs in other folder
NORM_FOLDER_PATH = 'data/YT_set/wavs/norm/'


def AudioSegment2numpy_arr(x):
    """Return the samples of a pydub AudioSegment as a numpy array."""
    return np.asarray(x.get_array_of_samples())


if not norm_skip:
    #make sure the output folder exists before anything is written into it
    os.makedirs(NORM_FOLDER_PATH, exist_ok=True)

#sorted() makes the row order of `feats` reproducible across runs/machines
for file_name in sorted(os.listdir(FOLDER_PATH)):

    fname_noExt, file_ext = os.path.splitext(file_name) #file name without extension
    if file_ext.lower() != '.wav':
        #skip hidden files, sub-folders and anything else that is not a wav
        continue

    #full path file name
    full_fname = os.path.join(FOLDER_PATH, file_name)

    #name for normalization
    norm_fname = os.path.join(NORM_FOLDER_PATH, fname_noExt + '_NORM.wav')

    if not norm_skip:
        ## Normalization

        #level to same dB
        #NOTE(review): gain_db=0.0 together with normalize=False looks like it
        #leaves the level untouched -- confirm against the pysox docs whether
        #normalize=True (or tfm.norm) was intended here
        tfm = sox.Transformer()
        tfm.gain(gain_db=0.0, normalize=False, limiter=False, balance=None)
        #downsample to 16kHz and 1 channel
        tfm.convert(samplerate=fs_targ, n_channels=n_channels_targ, bitdepth=None)
        #tfm.norm(db_level=0.0)

        # create the output normalized audio
        print(norm_fname)
        tfm.build(full_fname, norm_fname)

    # load normalized audio
    s = AudioSegment.from_wav(norm_fname)
    #sampling rate:
    info = mediainfo(norm_fname)
    fs = float(info['sample_rate'])

    #get ID of recording
    #NOTE(review): takes the text between the first and second '-' of the
    #file name; a name without '-' raises IndexError and a name with several
    #'-' gives a partial ID -- verify against the naming of the data set
    ID = fname_noExt.split('-')[1] #for the current type of naming

    #get label
    ## TODO: probably better to insert label in the file name, during creation of data set

    ## Segmentation of cough streams (silence-based)
    #min_silence_len in ms, silence_thresh in dB
    s_segments = split_on_silence(s, min_silence_len=600, silence_thresh=-30)
    ## TODO: set more accurate thresholds, or find other way to split (variance-based?)

    #convert s_segments to numpy array format
    s_segments_np = [AudioSegment2numpy_arr(seg) for seg in s_segments]

    print('High-pass filtering...')
    #pre-emphasis filtering to each segment
    s_segments_filt = [apply_preEmph(seg) for seg in s_segments_np]

    #TODO
    #2) the segment is divided into X non-overlapping subsegments (X=3 for dry/wet cough paper,
    #X=12 for pneumonia paper)
    #TODO: window framing: I think maybe the segments need to be windowed with non-overlapping frames?
    #(see frame settings at beginning of notebook)

    print('Computing features...')
    #Feature extraction for each segment; feature_extraction returns the
    #grown table, so it has to be re-assigned on every call
    for idx, seg_i in enumerate(s_segments_filt):
        print(idx)
        feats = feature_extraction(seg_i,fs,feats,lp_ord,ID)
    
       

output_file: data/YT_set/wavs/norm/Dry Afternoon Cough-6LK6yHtIung_NORM.wav already exists and will be overwritten on build


data/YT_set/wavs/norm/Dry Afternoon Cough-6LK6yHtIung_NORM.wav
High-pass filtering...
Computing features...
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19


### Pre-processing of features

In [None]:
#TODO: Pre-process the dataframe prior to use with a model (normalize if needed for the model, check there are no NaNs...)





## Model training

In [None]:
#Cough sound
#Breathing rate
#Breathing rhythm (consistency, smoothness)
#Cough rate
#Panic level
#Hoarseness