In [5]:

# Load various imports
from datetime import datetime
from os import listdir
from os.path import isfile, join

import librosa
import librosa.display

import numpy as np
import pandas as pd

import tensorflow
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Conv2D, MaxPooling2D, GlobalAveragePooling2D, Conv1D
from tensorflow.keras.layers import LeakyReLU,  BatchNormalization, Activation, Flatten, MaxPooling1D, Input
from sincnet_tensorflow import SincConv1D, LayerNorm

import keras
from keras.utils import to_categorical
from tensorflow.keras.callbacks import ModelCheckpoint

from sklearn.metrics import confusion_matrix, classification_report, roc_curve, auc, ConfusionMatrixDisplay
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.preprocessing import MinMaxScaler

In [None]:
def add_noise(data):
    noise_value = 0.015 * np.random.uniform() * np.amax(data)   # 1.5% with variation from 0-1 with emphasis on 0.5 of the maximum value 
    data = data + noise_value * np.random.normal(size=data.shape[0])   # Add noise to the data
    return data

def stretch_process(data, rate=0.8):
    return librosa.effects.time_stretch(data, rate=0.8) #elongate data

def pitch_process(data, sampling_rate, pitch_factor=0.7):
    return librosa.effects.pitch_shift(data, sr=sampling_rate, n_steps=pitch_factor) # Shifts the pitch of the audio data by a specified number of semitones.
#this shift is very subtle, 0.7 of a semitone is less than 1/12 of a octave. barely any change jjust a little 

def extract_process(data, sample_rate, debug=False):
    output_result = np.array([])
    #amt of times  signal crosses zero (measuring silence) 
    mean_zero = np.mean(librosa.feature.zero_crossing_rate(y=data).T, axis=0) 
    if debug: print('mean_zero shape',mean_zero.shape) 
    #you can horizontally add elements that have the same amount of rows without having to flatten the array

    output_result = np.hstack((output_result, mean_zero)) 
    #positive of all values + short time ft (measuring loudness)  + zero crossing rate (measuring silence) 
    stft_out = np.abs(librosa.stft(data))
    if debug: print('stft_out shape', stft_out.shape) 

    chroma_stft = np.mean(librosa.feature.chroma_stft(S=stft_out, sr=sample_rate).T, axis=0)
    output_result = np.hstack((output_result, chroma_stft))
    if debug: print('chroma_stft shape',chroma_stft.shape) 
    
    mfcc_out = np.mean(librosa.feature.mfcc(y=data, sr=sample_rate, n_mfcc=40).T, axis=0)
    output_result = np.hstack((output_result, mfcc_out))
    if debug: print('mfcc_out shape',mfcc_out.shape) 

    root_mean_out = np.mean(librosa.feature.rms(y=data).T, axis=0)
    output_result = np.hstack((output_result, root_mean_out)) 
    if debug:  print('root_mean_out shape',root_mean_out.shape) 

    mel_spectogram = np.mean(librosa.feature.melspectrogram(y=data, sr=sample_rate).T, axis=0)
    output_result = np.hstack((output_result, mel_spectogram))
    if debug:  print('mel_spectogram shape',mel_spectogram.shape) 


    return output_result

def extract_features(file_name, debug=False, extract=True, length = 427770):
    try:
        # Load the original audio file
        audio, sample_rate = librosa.load(file_name, res_type='kaiser_fast', duration=42, offset=0.6)
        if debug: print(sample_rate)
        assert  sample_rate == 22050, "Sample rate is not 22050" 
        
        l = audio.shape[0]
        
        if(audio.shape[0]< length):
            audio = np.pad(audio, (0, length - audio.shape[0]))
        else:
            audio = audio[:length]
        

        if(extract):  # If extract is True, augment the audio file

            # Extract features from the original audio data
            extracted_features = extract_process(audio, sample_rate, debug=False)
            result = np.array(extracted_features)
            
        else: 
            result = np.array(audio)
            
        if debug: print("result shape before nouse",result.shape)
            

        # Add noise and extract features
        noise_out = add_noise(audio)
        if(extract): 
            
            output_2 = extract_process(noise_out, sample_rate)
            result = np.vstack((result, output_2))
        else: 
            result =  np.vstack((result, noise_out))
            
        if debug: print("result shape before stretch",result.shape, noise_out.shape) 

        # Time-stretch and then pitch-shift before extracting features
        new_out = stretch_process(audio,0.8)
        new_out2 = new_out[0:audio.shape[0]]
        stretch_pitch = pitch_process(new_out2, sample_rate,pitch_factor=0.7)
        if debug: print("before final:",result.shape,  stretch_pitch.shape) 
        if(extract): 
            output_3 = extract_process(stretch_pitch, sample_rate)
            result = np.vstack((result, output_3))
        else: 
            result = np.vstack((result, stretch_pitch))
            
        if debug: print("final:",result.shape,  stretch_pitch.shape) 

    except Exception as e:
        print("Error encountered while parsing file: ", file_name)
        print("Error Details:", e)
        return None

    return result, l

In [7]:
mypath = 'C:/Users/Agraw/Downloads/archive/Respiratory_Sound_Database/Respiratory_Sound_Database/audio_and_txt_files'
filenames = [f for f in listdir(mypath) if (isfile(join(mypath, f)) and f.endswith('.wav'))] 
print(len(filenames))

p_id_in_file = [] # patient IDs corresponding to each file
for name in filenames:
    p_id_in_file.append(int(name[:3]))

p_id_in_file = np.array(p_id_in_file)

filepaths = [join(mypath, f) for f in filenames] # full paths of files
p_diag=pd.read_csv('C:/Users/Agraw/Downloads/archive/Respiratory_Sound_Database/Respiratory_Sound_Database/patient_diagnosis.csv',header=None)
labels = np.array([p_diag[p_diag[0] == x][1].values[0] for x in p_id_in_file]) # labels for audio files

# delete the very rare diseases, rewrite into same array for space? 
new_filepaths = np.delete(filepaths, np.where((labels == 'Asthma') | (labels == 'LRTI'))[0], axis=0)
new_labels = np.delete(labels, np.where((labels == 'Asthma') | (labels == 'LRTI'))[0], axis=0)
assert len(new_filepaths) == len(new_labels)

# print class counts
unique_elements, counts_elements = np.unique(new_labels, return_counts=True) 
#amt of values that have same value,  i.e. same class 
print(np.asarray((unique_elements, counts_elements)))
print(len(new_filepaths), len(new_labels))

920
[['Bronchiectasis' 'Bronchiolitis' 'COPD' 'Healthy' 'Pneumonia' 'URTI']
 ['16' '13' '793' '35' '37' '23']]
917 917


In [8]:
class DataGenerator(keras.utils.Sequence):

  def __init__(self, list_IDs, labels, batch_size=32, shuffle=True, debug = False, extract_ = True):
    'Initialization'
    self.batch_size = batch_size
    self.labels = labels
    self.list_IDs = list_IDs
    self.shuffle = shuffle
    self.indexes = np.arange(len(self.list_IDs))
    self.debug = debug 
    self.extract_ =  extract_
    self.on_epoch_end()
    
      
  def  __len__(self):
    return int(np.floor(len(self.list_IDs) / self.batch_size)) #all files in epoch 1 batch at a time

  def __getitem__(self, index):
    indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size] #[0,31] [32,63] ..
    list_IDs_temp = [self.list_IDs[k] for k in indexes] 
    label_temp = [self.labels[k] for k in indexes]
    #does this work... 
    #list_IDs_temp = new_filepaths 
      
    X, y = self.__data_generation(list_IDs_temp, label_temp)
    return X, y

  def __data_generation(self, filepath, label):
    X = []
    y = []

    for i in range(self.batch_size):
      data, l = extract_features(filepath[i], debug=False, extract=self.extract_)
      X.append(data)
      y.append(label[i]) 
    
    X = np.array(X)
    y = np.array(y)

    
    if self.debug: print(X.shape, y.shape)
    X = X.reshape(-1, X.shape[2]) # flatten (batch_size, 3, <featrue_size>) -> (batch*3,  <feature_size>)

    y = np.repeat(label, 3, axis=0) # augment chagne this code to batch*3,6 not 54 
    if self.debug: print(X.shape,  y.shape)

    X = np.expand_dims(X,axis=2) #readable format for model
    if self.debug: print(X.shape,  y.shape)
    
    return X, y

  def on_epoch_end(self):
    if self.shuffle == True:
      np.random.shuffle(self.indexes)
  



In [9]:
le = LabelEncoder() #[0,0,0,0,0,1] []
new_labels_1hot = le.fit_transform(new_labels) 
new_labels_1hot = to_categorical(new_labels_1hot) #using onehot encoding to convert labels into 1hot format [0,0,0,0,0,1]  

train_fp, test_fp, label_train, label_test = train_test_split(new_filepaths, new_labels_1hot, stratify=new_labels_1hot, #doesnt work with features 
                                                     test_size=0.2, random_state = 42)
temp = label_train
temp2 = train_fp
label_train = label_train[:int(len(label_train)*.8)]
label_val = temp[int(len(temp)*.8):]
train_fp = train_fp[:int(len(train_fp)*.8)]
train_val = temp2[int(len(temp2)*.8):]

# for d in unique_elements:
#     files_trained = train_fp[np.argwhere(label_train ==d)]
#     files_tested = test_fp[np.argwhere(label_test ==d)]

#     print(len(files_trained), len(files_tested))
    
batch_size = 32
training_generator =  DataGenerator(train_fp, label_train, batch_size=batch_size, shuffle=True, extract_=True, debug = False)
testing_generator =  DataGenerator(test_fp, label_test, batch_size=batch_size, shuffle=False, extract_=True, debug = False)
validation_generator = DataGenerator(train_val, label_val, batch_size=batch_size, shuffle=True, extract_=True, debug = False)



In [12]:
assert True 
print(training_generator.__len__())
print(train_fp.__len__())
for k in range(2):
    for i in range(train_fp.__len__()): 
        x,y = training_generator.__getitem__(i)
        print(x.shape, y.shape)
    train_fp.on_epoch_end()   # Call on_epoch_end to reset the index

    
#training_generator.__getitem__(0)
# x 3 -> 3 features -> 3,3, <> 
# y 3, <>
# y 3,3,<> = 3,3,6 => 54... 9, 6
# (9, 427770) (54,)


18
586
(96, 182, 1) (96, 6)


  return pitch_tuning(


(96, 182, 1) (96, 6)
(96, 182, 1) (96, 6)
(96, 182, 1) (96, 6)
(96, 182, 1) (96, 6)
(96, 182, 1) (96, 6)
(96, 182, 1) (96, 6)
(96, 182, 1) (96, 6)
(96, 182, 1) (96, 6)
(96, 182, 1) (96, 6)
(96, 182, 1) (96, 6)
(96, 182, 1) (96, 6)
(96, 182, 1) (96, 6)
(96, 182, 1) (96, 6)
(96, 182, 1) (96, 6)
(96, 182, 1) (96, 6)
(96, 182, 1) (96, 6)
(96, 182, 1) (96, 6)


IndexError: list index out of range

In [15]:
tmp_x, tmp_y = training_generator.__getitem__(0)
#(32,182,1) (32, 6)
print(tmp_x.shape[1])

KeyboardInterrupt: 

In [None]:
from tensorflow.keras import layers, Sequential
sincmodel = False

num_labels = len(np.unique(new_labels))  # Determine the number of unique classes

'''
right now doing feature extract, want to use only raw data and make new model 
'''
if(sincmodel):
    sinc_layer = SincConv1D(N_filt=64, Filt_dim=129, fs=22050, stride=16, padding="SAME")
    inputs = Input((427770, 1))

    x = sinc_layer(inputs)
    x = LayerNorm()(x)

    x = LeakyReLU(alpha=0.2)(x)
    x = MaxPooling1D(pool_size=2)(x)


    x = Conv1D(2, 3, strides=1, padding='valid')(x)
    x = BatchNormalization(momentum=0.05)(x)
    x = LeakyReLU(alpha=0.2)(x)
    x = MaxPooling1D(pool_size=2)(x)

    x = Conv1D(2, 3, strides=1, padding='valid')(x)
    x = BatchNormalization(momentum=0.05)(x)
    x = LeakyReLU(alpha=0.2)(x)
    x = MaxPooling1D(pool_size=2)(x)

    x = Conv1D(2, 3, strides=1, padding='valid')(x)
    x = BatchNormalization(momentum=0.05)(x)
    x = LeakyReLU(alpha=0.2)(x)
    x = MaxPooling1D(pool_size=2)(x)

    x = Conv1D(2, 3, strides=1, padding='valid')(x)
    x = BatchNormalization(momentum=0.05)(x)
    x = LeakyReLU(alpha=0.2)(x)
    x = MaxPooling1D(pool_size=2)(x)

    x = Flatten()(x)

    x = Dense(256)(x)
    x = BatchNormalization(momentum=0.05, epsilon=1e-5)(x)
    x = LeakyReLU(alpha=0.2)(x)

    x = Dense(256)(x)
    x = BatchNormalization(momentum=0.05, epsilon=1e-5)(x)
    x = LeakyReLU(alpha=0.2)(x)

    prediction = Dense(num_labels, activation='softmax')(x)
    model = tensorflow.keras.models.Model(inputs=inputs, outputs=prediction)

    model.summary()
else:

    model = Sequential([
        layers.Conv1D(256, kernel_size=5, strides=1, padding='same', activation='relu', input_shape=(tmp_x.shape[1], 1)),
        layers.MaxPooling1D(pool_size=5, strides = 2, padding = 'same'),
        layers.Conv1D(256, kernel_size=5, strides=1, padding='same', activation='relu'),
        layers.MaxPooling1D(pool_size=5, strides = 2, padding = 'same'),
        layers.Conv1D(128, kernel_size=5, strides=1, padding='same', activation='relu'),
        layers.MaxPooling1D(pool_size=5, strides = 2, padding = 'same'),
        layers.Conv1D(64, kernel_size=5, strides=1, padding='same', activation='relu'),
        layers.MaxPooling1D(pool_size=5, strides = 2, padding = 'same'),
        layers.Conv1D(32, kernel_size=5, strides=1, padding='same', activation='relu'),
        layers.MaxPooling1D(pool_size=5, strides = 2, padding = 'same'),
        layers.Dropout(0.2),
        layers.Flatten(),
        layers.Dense(units=32, activation='relu'),
        layers.Dropout(0.3),
        layers.Dense(units=num_labels, activation='softmax')
    ])

In [9]:
model.compile(optimizer = 'adam' , loss = 'categorical_crossentropy' , metrics = ['accuracy'])
early_stop = tensorflow.keras.callbacks.EarlyStopping(monitor="loss",patience=100,mode="min")

In [10]:
evConv1D_Model = model.fit(training_generator, epochs=200, validation_data=validation_generator, callbacks=[early_stop])
# evConv1D_Model = model.fit_generator(generator=training_generator, validation_data=testing_generator, batch_size=64, epochs=200,)   

Epoch 1/200
Epoch 2/200
Epoch 3/200
Epoch 4/200
Epoch 5/200
Epoch 6/200
Epoch 7/200
Epoch 8/200
Epoch 9/200
Epoch 10/200
Epoch 11/200
Epoch 12/200
Epoch 13/200
Epoch 14/200
Epoch 15/200
Epoch 16/200
Epoch 17/200
Epoch 18/200
Epoch 19/200
Epoch 20/200
Epoch 21/200

KeyboardInterrupt: 

In [None]:
Model_Results = model.evaluate(testing_generator)#check
print("LOSS:  " + "%.4f" % Model_Results[0])
print("ACCURACY:  " + "%.4f" % Model_Results[1])

In [None]:
# Plotting loss
plt.figure(figsize=(14, 5))
plt.subplot(1, 2, 1)
plt.plot(evConv1D_Model.history['loss'], label='Training Loss')
plt.plot(evConv1D_Model.history['val_loss'], label='Validation Loss')
plt.legend()
plt.title('Training and Validation Loss')

# Plotting accuracy
plt.subplot(1, 2, 2)
plt.plot(evConv1D_Model.history['accuracy'], label='Training Accuracy')
plt.plot(evConv1D_Model.history['val_accuracy'], label='Validation Accuracy')
plt.legend()
plt.title('Training and Validation Accuracy')

plt.show()


In [None]:
# Getting the predictions
import sklearn
y_pred = Model.predict(testing_generator)
y_pred_classes = np.argmax(y_pred, axis=1)
y_true = np.argmax(y_test, axis=1)

# Generating the confusion matrix
confusion_mtx = confusion_matrix(y_true, y_pred_classes)

# Displaying the confusion matrix
cm_display = ConfusionMatrixDisplay(confusion_mtx, display_labels=le.classes_).plot()
print(sklearn.metrics.f1_score(y_true, y_pred_classes, average='weighted'))
