Loading modules

In [10]:
import librosa
import numpy as np
from pydub import AudioSegment
import tensorflow as tf
from tensorflow.keras.callbacks import ModelCheckpoint,ReduceLROnPlateau,EarlyStopping
from tensorflow.keras.layers import BatchNormalization,Dropout,Conv1D,Add,MaxPooling1D,Lambda,TimeDistributed,Dense,Activation,GlobalMaxPooling1D

Loading audio files

In [11]:
path='/home/bigpenguin/projects/project_covid/gathrd_data/COHORT1/cough/cough_pos/pos-0609-002-cough-m-27.wav'

In [12]:
sound=AudioSegment.from_file(path)
sound=sound.set_frame_rate(22050)
channel_sounds = sound.split_to_mono()
samples = [s.get_array_of_samples() for s in channel_sounds]
fp_arr = np.array(samples).T.astype(np.float32)
fp_arr /= np.iinfo(samples[0].typecode).max

fp_arr=np.array([x[0] for x in fp_arr])
x=librosa.util.normalize(fp_arr)

sr=sound.frame_rate
len(x),sr

(330750, 22050)

Cough filtering model

In [13]:
def _bn_relu(layer, dropout=0, **params):
    '''
    This function returns a Batch Normalization Layer with desired activation function
    '''
    layer = BatchNormalization()(layer)
    layer = Activation(params["conv_activation"])(layer)

    if dropout > 0:
        
        layer = Dropout(params["conv_dropout"])(layer)

    return layer

def add_conv_weight(
        layer,
        filter_length,
        num_filters,
        subsample_length=1,
        **params):
    '''
    This function returns a Convolution Layer with desired params
    '''
    
    layer = Conv1D(
        filters=num_filters,
        kernel_size=filter_length,
        strides=subsample_length,
        padding='same',
        kernel_initializer=params["conv_init"])(layer)
    return layer


def add_conv_layers(layer, **params):
    '''
    This function returns a Conv1D and BN Layer stacked together
    '''
    for subsample_length in params["conv_subsample_lengths"]:
        layer = add_conv_weight(
                    layer,
                    params["conv_filter_length"],
                    params["conv_num_filters_start"],
                    subsample_length=subsample_length,
                    **params)
        layer = _bn_relu(layer, **params)
    return layer

def resnet_block(
        layer,
        num_filters,
        subsample_length,
        block_index,
        **params):
    '''
    This function returns a Resnet Block with desired activation function
    '''

    def zeropad(x):
        '''
        This function pads zeros to the input vector by a zero vector of same shape in 3rd Dimension
        This is used when convolution filters are doubled every 4th Residual Block to match the dimensions
        '''
        y = tf.zeros_like(x)
        return tf.concat([x, y], axis=2)

    def zeropad_output_shape(input_shape):
        '''
        This function checks the shape of input then doubles the 3rd dimension and returns the shape as tuple
        This is used to get dimesion shape for the zeropad function
        '''
        shape = list(input_shape)
        assert len(shape) == 3
        shape[2] *= 2
        return tuple(shape)

    # Adding Skip Connections
    shortcut = MaxPooling1D(pool_size=subsample_length)(layer)
    # At each 4th residual block, double the convolution filters, pad the shortcut so that dimensions match
    zero_pad = (block_index % params["conv_increase_channels_at"]) == 0 \
        and block_index > 0
    if zero_pad is True:
        shortcut = Lambda(zeropad, output_shape=zeropad_output_shape)(shortcut)
    for i in range(params["conv_num_skip"]):
        if not (block_index == 0 and i == 0):
            layer = _bn_relu(
                layer,
                dropout=params["conv_dropout"] if i > 0 else 0,
                **params)
        layer = add_conv_weight(
            layer,
            params["conv_filter_length"],
            num_filters,
            subsample_length if i == 0 else 1,
            **params)
    layer = Add()([shortcut, layer])
    return layer

def get_num_filters_at_index(index, num_start_filters, **params):
    '''
    This function returns the convolution filters for the specified layer
    '''
    return 2**int(index / params["conv_increase_channels_at"]) \
        * num_start_filters

def add_resnet_layers(layer, **params):
    '''
    This Function addds the residual blocks that make up the structure
    The first and last layers of the network are special-cased due to this pre-activation block structure.
    '''
    layer = add_conv_weight(
        layer,
        params["conv_filter_length"],
        params["conv_num_filters_start"],
        subsample_length=1,
        **params)
    layer = _bn_relu(layer, **params)
    for index, subsample_length in enumerate(params["conv_subsample_lengths"]):
        num_filters = get_num_filters_at_index(
            index, params["conv_num_filters_start"], **params)
        layer = resnet_block(
            layer,
            num_filters,
            subsample_length,
            index,
            **params)
    layer = _bn_relu(layer, **params)
    return layer
    
def add_output_layer(layer, **params):
    '''
    This Function adds the output layer which is a Dense Layer wrapped in a TimeDistributed Layer.
    We use TimeDistributed layer so that the model outputs a prediction for each timestep and the temporal information is retained 
    Because TimeDistributed applies the same instance of Dense to each of the timestamps, the same set of weights are used at each timestamp. 
    '''
    layer = GlobalMaxPooling1D(name = 'feats')(layer)
    layer = Dense(params["num_categories"])(layer)
    return Activation('sigmoid')(layer)

def add_compile(model, **params):
    '''
    This functions adds the compiler to the model
    We have used Adam Optimizer and Categorical Cross-Entropy Loss'''
    
    optimizer = Adam(
        learning_rate=params["learning_rate"],
        clipnorm=params.get("clipnorm", 1))

    model.compile(loss='binary_crossentropy',
                  optimizer=optimizer,
                  metrics=['accuracy'])
    return model

def build_network(**params):
    '''
    This function builds the entire network based on given parameters'''
    inputs = tf.keras.Input(shape=params['input_shape'],
                   dtype='float32',
                   name='inputs')

    if params.get('is_regular_conv', False):
        layer = add_conv_layers(inputs, **params)
    else:
        layer = add_resnet_layers(inputs, **params)

    output = add_output_layer(layer, **params)
    model = tf.keras.Model(inputs=[inputs], outputs=[output])
    if params.get("compile", True):
        model = add_compile(model, **params)
    return model


In [14]:
# Model Parameters
params = {
"conv_subsample_lengths": [1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2],
"conv_filter_length": 16,
"conv_num_filters_start": 32,
"conv_init": "he_normal",
"conv_activation": "relu",
"conv_dropout": 0.2,
"conv_num_skip": 2,
"conv_increase_channels_at": 4,

"learning_rate": 0.001,
"input_shape": [44288, 1],
"num_categories": 1,
"compile":False
}

# Cough Model
cough_model = build_network(**params)
model_path = '/home/bigpenguin/projects/surfboard_works/CNN_Weights/cnn_model3_cough_vs_non_cough.h5'
cough_model.load_weights(model_path)

In [15]:
X_s = []

if len(x)>=44288:
    x = x[0:44288]
else:
    pad_num = 44288-len(y)
    x = np.pad(x, [(0), (pad_num)], mode='constant')

X_s.append(x)

In [18]:
X_s=np.array(X_s)

In [19]:
indices = []
y_pred_proba = cough_model.predict(X_s)
y_pred = np.where(y_pred_proba>0.5,1,0)
for idx,(i,j) in enumerate(zip(y_pred,y_pred_proba)):
    if i[0] ==0:
        indices.append(idx)
    elif i[0]==1:
        continue
print(len(indices))

0


In [20]:
y_pred

array([[1]])