# Wheeze and Crackle Probability Classification

Credit: https://www.kaggle.com/code/monkeypox2022/wheeze-crackles-detection#Imports

In [None]:
import os
import math
import wave
import librosa
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

%matplotlib inline

import seaborn as sns

import tensorflow as tf
import matplotlib.pyplot as plt
# np_utils change to to_categorical
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.layers import Embedding,LSTM,GRU,Dense,MaxPooling1D,Dropout
from tensorflow.keras.layers import LeakyReLU,ReLU,Flatten,concatenate,Bidirectional,TimeDistributed
from tensorflow.keras.layers import add,Conv1D,SeparableConv1D, GlobalMaxPooling2D,GlobalMaxPooling1D
from tensorflow.keras.layers import Conv2D,Input,Activation,BatchNormalization,MaxPooling2D
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import concatenate
from tensorflow.keras.models import Model,load_model
from tensorflow.keras import backend as K
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input
from tensorflow.keras.optimizers import Adamax
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras import regularizers
from tensorflow.keras.applications.densenet import DenseNet201

from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score,precision_score,recall_score,f1_score,matthews_corrcoef
from sklearn.metrics import cohen_kappa_score,roc_auc_score,confusion_matrix,classification_report

In [None]:
# Mount Google Drive inside the Colab VM at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Root folder of the ICBHI database on the mounted Drive (trailing slash is
# required because later cells build paths by string concatenation).
dataset_path = "/content/drive/MyDrive/ICBHI_final_database/"

In [None]:
# Per-patient demographic table: space-delimited file without a header row,
# so the column names are supplied explicitly.
df_no_diagnosis = pd.read_csv(
    dataset_path + 'demographic_info.txt',
    names=[
        'Patient number',
        'Age',
        'Sex',
        'Adult BMI (kg/m2)',
        'Child Weight (kg)',
        'Child Height (cm)',
    ],
    delimiter=' ',
)

# Per-patient diagnosis table: comma-separated file without a header row.
diagnosis = pd.read_csv(
    dataset_path + 'patient_diagnosis.csv',
    names=['Patient number', 'Diagnosis'],
)

In [None]:
# Attach each patient's diagnosis to the demographic rows (left join keeps
# every demographic row), then show how many patients fall in each diagnosis.
df =  df_no_diagnosis.join(diagnosis.set_index('Patient number'), on = 'Patient number', how = 'left')
df['Diagnosis'].value_counts()

Unnamed: 0_level_0,count
Diagnosis,Unnamed: 1_level_1
COPD,64
Healthy,26
URTI,14
Bronchiectasis,7
Pneumonia,6
Bronchiolitis,6
LRTI,2
Asthma,1


In [None]:
# Folder holding the recordings and their annotation files.
root = dataset_path + 'audio_and_txt_files/'

# Stems of all annotation files. Match on the real extension (not a substring
# anywhere in the name) and strip only the extension, so names containing
# additional dots or files such as "x.txt.bak" are handled correctly.
filenames = [
    os.path.splitext(s)[0]
    for s in os.listdir(path=root)
    if s.endswith('.txt')
]

## Read Respiratory Cycle data

In [None]:
def Extract_Annotation_Data(file_name, root):
    """Parse a recording's file name and load its annotation table.

    Args:
        file_name: Recording stem (no extension) made of five
            underscore-separated fields.
        root: Directory containing ``<file_name>.txt``.

    Returns:
        Tuple ``(recording_info, recording_annotations)``: a one-row
        DataFrame with the five name fields, and a DataFrame read from the
        tab-separated annotation file with columns Start, End, Crackles,
        Wheezes.
    """
    info_columns = [
        'Patient number',
        'Recording index',
        'Chest location',
        'Acquisition mode',
        'Recording equipment',
    ]
    annotation_columns = ['Start', 'End', 'Crackles', 'Wheezes']

    name_fields = file_name.split('_')
    recording_info = pd.DataFrame(data=[name_fields], columns=info_columns)

    annotation_path = os.path.join(root, file_name + '.txt')
    recording_annotations = pd.read_csv(
        annotation_path, names=annotation_columns, delimiter='\t'
    )
    return (recording_info, recording_annotations)

In [None]:
# Parse every annotation file once, keeping the results both as parallel
# lists and as a dict keyed by recording stem.
i_list, rec_annotations, rec_annotations_dict = [], [], {}
for s in filenames:
    i, a = Extract_Annotation_Data(s, root)
    rec_annotations_dict[s] = a
    rec_annotations.append(a)
    i_list.append(i)

# One row per recording with the fields parsed from its file name.
recording_info = pd.concat(i_list, axis=0)
recording_info.head()

# Recording stems in dict insertion order.
my_index = pd.Series(list(rec_annotations_dict))

## Get Respiratory Cycle wheeze and crackles

In [None]:
# For every recording, count respiratory cycles by annotation combination:
# neither sound, crackles only, wheezes only, or both.
no_label_list, crack_list, wheeze_list, both_sym_list, filename_list = [], [], [], [], []
for f in filenames:
    d = rec_annotations_dict[f]

    crackles_absent = d['Crackles'] == 0
    crackles_present = d['Crackles'] == 1
    wheezes_absent = d['Wheezes'] == 0
    wheezes_present = d['Wheezes'] == 1

    # Summing a boolean mask counts the rows it selects.
    no_labels = int((crackles_absent & wheezes_absent).sum())
    n_crackles = int((crackles_present & wheezes_absent).sum())
    n_wheezes = int((crackles_absent & wheezes_present).sum())
    both_sym = int((crackles_present & wheezes_present).sum())

    no_label_list.append(no_labels)
    crack_list.append(n_crackles)
    wheeze_list.append(n_wheezes)
    both_sym_list.append(both_sym)
    # The first three characters of the stem are used as the patient key.
    filename_list.append(f[:3])

### เราพยายามนับจำนวนครั้งของ Wheeze และ Crackles ที่เจอในผู้ป่วยนั้นๆ ผ่านการนับข้อมูลจาก Audio

In [None]:
# One row per recording: patient key plus the four cycle counts.
file_label_df = pd.DataFrame(
    data={
        'filename': filename_list,
        'no label': no_label_list,
        'crackles only': crack_list,
        'wheezes only': wheeze_list,
        'crackles and wheezees': both_sym_list,
    }
)

In [None]:
# sort_values returns a new frame; the original call discarded it, so the
# sort never took effect. Keep the sorted result.
lables = file_label_df.sort_values(by=['filename'])

# Total cycle counts per patient key.
# NOTE: `sum` shadows the Python builtin; the name is kept because the
# following cells refer to it.
sum = lables.groupby('filename').sum()

In [None]:
# Rebuild the per-patient totals BEFORE evaluating the conditions, so that
# re-running this cell never sees the string 'diagnosis' column added below.
sum = lables.groupby('filename').sum()

# Largest of the four counts for each patient, computed once.
row_max = sum.max(axis=1)
n_crackles_only = sum['crackles only']
n_wheezes_only = sum['wheezes only']
n_both = sum['crackles and wheezees']
no_label_is_max = sum['no label'] == row_max

# np.select takes the FIRST matching condition, so the order matters:
# patients with no abnormal cycle at all, then patients whose most frequent
# count is an abnormal category, then patients dominated by unlabelled cycles,
# decided by their most frequent abnormal category.
conditions = [
    (n_crackles_only == 0) & (n_wheezes_only == 0) & (n_both == 0),
    (n_crackles_only == row_max),
    (n_wheezes_only == row_max),
    (n_both == row_max),
    no_label_is_max & (n_crackles_only > n_wheezes_only) & (n_crackles_only > n_both),
    no_label_is_max & (n_wheezes_only >= n_crackles_only) & (n_wheezes_only > n_both),
    no_label_is_max & (n_both >= n_crackles_only) & (n_both >= n_wheezes_only),
]

values = ['Healthy' , 'Crackles' , 'Wheezes','Wheezes & Crackles', 'Crackles' , 'Wheezes','Wheezes & Crackles']

# The conditions above cover every row, so the default is never selected; it
# is given as a string so the choices and the default share one dtype instead
# of relying on the implicit integer default of np.select.
sum['diagnosis'] = np.select(conditions, values, default='Unknown')
sum

Unnamed: 0_level_0,no label,crackles only,wheezes only,crackles and wheezees,diagnosis
filename,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
101,23,0,0,0,Healthy
102,13,0,0,0,Healthy
103,2,0,4,0,Wheezes
104,44,1,10,0,Wheezes
105,8,0,0,0,Healthy
...,...,...,...,...,...
222,24,13,8,0,Crackles
223,29,19,13,0,Crackles
224,14,0,0,0,Healthy
225,14,0,0,0,Healthy


สังเกตว่าด้านบน แต่ละผู้ป่วย หากมีแต่อาการเพียง Wheeze เราก็จะ Classify ว่าเป็น Wheeze

In [None]:
# Number of patients per derived class, then the same as percentages.
print (sum.diagnosis.value_counts())
print ('')
print (sum.diagnosis.value_counts(normalize=True) *100)

diagnosis
Crackles              50
Wheezes               36
Healthy               34
Wheezes & Crackles     6
Name: count, dtype: int64

diagnosis
Crackles              39.682540
Wheezes               28.571429
Healthy               26.984127
Wheezes & Crackles     4.761905
Name: proportion, dtype: float64


### Save Diagnosis Data

In [None]:
# Write the per-patient class (index = patient key) to a CSV in the current
# working directory.
sum.diagnosis.to_csv('wheezes_crackles_diagnosis.csv')

## Data Augmentation

In [None]:
# Directory with the .wav recordings; the feature-extraction functions append
# file names to it directly, so the trailing slash is required.
audio_data = dataset_path + 'audio_and_txt_files/'

In [None]:
# Read back the per-patient classes using the same relative path they were
# saved to, rather than a hard-coded '/content/' location that only matches
# when the working directory happens to be /content.
# header=0 discards the saved header row and applies the names below.
diagnosis_df = pd.read_csv(
    'wheezes_crackles_diagnosis.csv',
    names=['patient_id', 'disease'],
    header=0,
)
diagnosis_df.head(4)

Unnamed: 0,patient_id,disease
0,101,Healthy
1,102,Healthy
2,103,Wheezes
3,104,Wheezes


In [None]:
def add_noise(data,x):
    """Return ``data`` with standard-normal noise, scaled by ``x``, added.

    One noise sample is drawn per element of ``data`` from NumPy's global
    random state.
    """
    n_samples = len(data)
    gaussian = np.random.randn(n_samples)
    return data + gaussian * x

def shift(data,x):
    """Return ``data`` rolled circularly by ``x`` positions (wraps around)."""
    rolled = np.roll(data, shift=x)
    return rolled

def stretch(data, rate):
    """Time-stretch an audio signal with librosa.

    Args:
        data: Audio time series.
        rate: Stretch factor forwarded to ``librosa.effects.time_stretch``.

    Returns:
        The time-stretched audio series.
    """
    # `rate` must be passed by keyword: current librosa releases declare it
    # keyword-only, and a positional argument raises TypeError.
    return librosa.effects.time_stretch(data, rate=rate)

def pitch_shift(data, rate, sr=22050):
    """Pitch-shift an audio signal with librosa.

    Args:
        data: Audio time series.
        rate: Value forwarded as ``n_steps`` to
            ``librosa.effects.pitch_shift`` (despite the parameter name).
        sr: Sampling rate of ``data``. Defaults to 22050; the previous
            hard-coded 220250 was a typo of that value.

    Returns:
        The pitch-shifted audio series.
    """
    return librosa.effects.pitch_shift(data, sr=sr, n_steps=rate)

เราทำการเพิ่ม Noise, Shift บางวินาที, Stretch ยืดเสียง และเพิ่มคลื่นความถี่ Pitch ให้มีคีย์ที่สูงขึ้น

In [None]:
def mfcc_feature_exteraction_wheezes(dir_):
    """Build MFCC features and labels for the wheeze / not-wheeze task.

    Every file in ``dir_`` whose name ends in ``wav`` is labelled from the
    patient-level class in the module-level ``diagnosis_df`` (patient id =
    first three characters of the file name). For each accepted recording the
    frame-averaged MFCC vector of the raw audio is added, followed by one
    vector per time-stretched copy:

    * ``Wheezes``            -> label ``'Wheezes'``, stretches 1.2 and 0.8
    * ``Crackles``           -> label ``'Not Wheezes'``, stretch 1.2
    * ``Healthy``            -> label ``'Not Wheezes'``, stretch 1.2
    * ``Wheezes & Crackles`` -> label ``'Not Wheezes'``, no augmentation

    Crackles recordings are throttled: a recording is used when its
    7-character name prefix has not been seen before (which resets the shared
    counter), otherwise only while the counter is below 2.

    Args:
        dir_: Directory containing the ``.wav`` recordings.

    Returns:
        Tuple ``(X_data, y_data)`` of NumPy arrays: one 52-value MFCC vector
        per sample and the matching string labels.

    Raises:
        IndexError: If a recording's patient id is missing from
            ``diagnosis_df``.
    """
    n_mfcc = 52

    # patient class -> (label for this task, time-stretch rates to add)
    # NOTE(review): 'Wheezes & Crackles' patients are labelled 'Not Wheezes',
    # as in the original code - confirm this is intended.
    augmentation_plan = {
        'Crackles': ('Not Wheezes', (1.2,)),
        'Wheezes': ('Wheezes', (1.2, 0.8)),
        'Wheezes & Crackles': ('Not Wheezes', ()),
        'Healthy': ('Not Wheezes', (1.2,)),
    }

    def mean_mfcc(signal, sr):
        # Average the MFCC matrix over frames -> a single n_mfcc-long vector.
        return np.mean(librosa.feature.mfcc(y=signal, sr=sr, n_mfcc=n_mfcc).T, axis=0)

    X_ = []
    y_ = []
    seen_crackles = set()
    crackles_count = 0

    for soundDir in os.listdir(dir_):
        if soundDir[-3:] != 'wav':
            continue

        matches = diagnosis_df[diagnosis_df['patient_id'] == int(soundDir[:3])]
        p = list(matches['disease'])[0]
        if p not in augmentation_plan:
            continue

        if p == 'Crackles':
            prefix = soundDir[:7]
            if prefix not in seen_crackles:
                # First recording with this prefix: always used, counter reset.
                seen_crackles.add(prefix)
                crackles_count = 0
            elif crackles_count < 2:
                crackles_count += 1
            else:
                # Throttle reached for an already-seen prefix: skip the file.
                continue

        label, stretch_rates = augmentation_plan[p]
        data_x, sampling_rate = librosa.load(os.path.join(dir_, soundDir), res_type='kaiser_fast')

        X_.append(mean_mfcc(data_x, sampling_rate))
        y_.append(label)

        # Each stretched copy gets its own features. The original code
        # recomputed the 1.2x features for the 0.8x copy, so the second
        # augmentation was a duplicate of the first.
        for rate in stretch_rates:
            X_.append(mean_mfcc(stretch(data_x, rate), sampling_rate))
            y_.append(label)

    X_data = np.array(X_)
    y_data = np.array(y_)

    return X_data ,y_data

In [None]:
def mfcc_feature_exteraction_crackles(dir_):
    """Build MFCC features and labels for the crackle / not-crackle task.

    Every file in ``dir_`` whose name ends in ``wav`` is labelled from the
    patient-level class in the module-level ``diagnosis_df`` (patient id =
    first three characters of the file name). For each recording the
    frame-averaged MFCC vector of the raw audio is added first, then one
    vector per time-stretched copy, in the order listed:

    * ``Crackles``           -> ``'Crackles'``, stretch 1.2
    * ``Wheezes``            -> ``'Not Crackles'``, stretches 1.2, 0.8
    * ``Wheezes & Crackles`` -> ``'Not Crackles'``, stretch 1.2
    * ``Healthy``            -> ``'Not Crackles'``, stretches 1.2, 0.8

    Args:
        dir_: Directory path; file names are appended to it directly, so it
            must end with a path separator.

    Returns:
        Tuple ``(X_data, y_data)`` of NumPy arrays: one 52-value MFCC vector
        per sample and the matching string labels.
    """
    features = 52

    # patient class -> (label for this task, time-stretch rates in order)
    recipe_by_class = {
        'Crackles': ('Crackles', (1.2,)),
        'Wheezes': ('Not Crackles', (1.2, 0.8)),
        'Wheezes & Crackles': ('Not Crackles', (1.2,)),
        'Healthy': ('Not Crackles', (1.2, 0.8)),
    }

    def frame_mean_mfcc(signal, sr):
        # Collapse the MFCC matrix over time into a single vector.
        return np.mean(librosa.feature.mfcc(y=signal, sr=sr, n_mfcc=features).T, axis=0)

    X_ = []
    y_ = []
    for soundDir in os.listdir(dir_):
        if soundDir[-3:] != 'wav' or soundDir[:3] == '':
            continue

        patient_rows = diagnosis_df[diagnosis_df['patient_id'] == int(soundDir[:3])]
        p = list(patient_rows['disease'])[0]

        recipe = recipe_by_class.get(p)
        if recipe is None:
            # Classes outside the table contribute no samples.
            continue
        tag, rates = recipe

        data_x, sampling_rate = librosa.load(dir_+soundDir,res_type='kaiser_fast')
        X_.append(frame_mean_mfcc(data_x, sampling_rate))
        y_.append(tag)

        for factor in rates:
            X_.append(frame_mean_mfcc(stretch(data_x, factor), sampling_rate))
            y_.append(tag)

    return np.array(X_), np.array(y_)

ทำการ Feature Extraction ทั้ง Crackles และ Wheezes

In [None]:
# Extract MFCC features and string labels for both binary tasks from the
# recordings directory.
crackles_features , crackles_lables = mfcc_feature_exteraction_crackles(audio_data)
wheezes_features , wheezes_lables = mfcc_feature_exteraction_wheezes(audio_data)

ModuleNotFoundError: No module named 'resampy'

This error is lazily reported, having originally occured in
  File /usr/local/lib/python3.10/dist-packages/librosa/core/audio.py, line 33, in <module>

----> resampy = lazy.load("resampy")

In [None]:
# Install resampy, which the feature-extraction cell reported as missing
# (ModuleNotFoundError raised from librosa); re-run that cell afterwards.
pip install resampy



In [None]:
def augmented_lables_count(lables):
    """Plot a bar chart of label frequencies and print the counts.

    Args:
        lables: Array-like of labels; unique values become the bars.

    Returns:
        None. Shows a matplotlib figure and prints the label -> count dict.
    """
    names, frequencies = np.unique(lables, return_counts=True)
    data_count = dict(zip(names, frequencies))

    plt.figure(figsize = (10, 5))

    # One bar per distinct label, drawn in the order np.unique returns them.
    plt.bar(
        list(data_count.keys()),
        list(data_count.values()),
        color =['orange','green','blue','red','yellow','black'],
        width = 0.4,
    )

    plt.xlabel("Diseases")
    plt.ylabel("Count")
    plt.title("Count of each disease")
    plt.show()

    print (data_count)

In [None]:
# Number of samples (original + augmented) in the wheeze dataset.
wheezes_lables.shape[0]

In [None]:
# Show the class balance of each binary task after augmentation.
augmented_lables_count(crackles_lables)
augmented_lables_count(wheezes_lables)

## แปลงให้กลายเป็น X_train, y_train

In [None]:
def _one_hot_binary(labels, positive):
    """Return an (N, 2) float64 one-hot array: [1, 0] where label == positive, else [0, 1]."""
    is_positive = np.asarray(labels) == positive
    return np.stack([is_positive, ~is_positive], axis=1).astype('float64')


# Encode directly from boolean masks. The previous version mixed integer
# arrays into a string array through np.where (implicit int-to-string
# promotion) and then parsed the strings back to floats.
# NOTE: run once per extraction - after this cell the *_lables variables hold
# the encoded arrays, not the string labels.
crackles_lables_encode = _one_hot_binary(crackles_lables, 'Crackles')
wheezes_lables_encode = _one_hot_binary(wheezes_lables, 'Wheezes')

crackles_lables = crackles_lables_encode.astype('float64')
wheezes_lables = wheezes_lables_encode.astype('float64')

---

## Testing Model

In [None]:
# Pin TensorFlow to 2.10.0 before loading the saved .h5 models below.
# NOTE(review): presumably the version the models were saved with - confirm.
!pip install tensorflow==2.10.0

Collecting tensorflow==2.10.0
  Downloading tensorflow-2.10.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.1 kB)
Collecting gast<=0.4.0,>=0.2.1 (from tensorflow==2.10.0)
  Downloading gast-0.4.0-py3-none-any.whl.metadata (1.1 kB)
Collecting keras<2.11,>=2.10.0 (from tensorflow==2.10.0)
  Downloading keras-2.10.0-py2.py3-none-any.whl.metadata (1.3 kB)
Collecting keras-preprocessing>=1.1.1 (from tensorflow==2.10.0)
  Downloading Keras_Preprocessing-1.1.2-py2.py3-none-any.whl.metadata (1.9 kB)
Collecting protobuf<3.20,>=3.9.2 (from tensorflow==2.10.0)
  Downloading protobuf-3.19.6-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (787 bytes)
Collecting tensorboard<2.11,>=2.10 (from tensorflow==2.10.0)
  Downloading tensorboard-2.10.1-py3-none-any.whl.metadata (1.9 kB)
Collecting tensorflow-estimator<2.11,>=2.10.0 (from tensorflow==2.10.0)
  Downloading tensorflow_estimator-2.10.0-py2.py3-none-any.whl.metadata (1.3 kB)
Collecting google-auth-oaut

In [None]:
# Same dataset root as earlier in the notebook, redefined so the testing
# section can be run on its own.
dataset_path = "/content/drive/MyDrive/ICBHI_final_database/"

In [None]:
from tensorflow.keras.models import load_model
import tensorflow as tf
import librosa
import numpy as np

In [None]:
# Load the two saved Keras models from the dataset folder: `w_model` is used
# for the wheeze prediction and `model` for the crackle prediction in
# wheezes_crackels_detection.
# NOTE(review): LSTM is passed via custom_objects, presumably to help
# deserialization of the .h5 files - confirm it is still required.
w_model = load_model(dataset_path + 'LSTM_W_97.78.h5', custom_objects={'LSTM': tf.keras.layers.LSTM})
model = load_model(dataset_path + 'C_LSTM_CNN_1.h5', custom_objects={'LSTM': tf.keras.layers.LSTM})

In [None]:
def wheezes_crackels_detection(audio_path):
    """Print the wheeze and crackle predictions for one audio file.

    The recording is reduced to a frame-averaged 52-coefficient MFCC vector,
    reshaped to (1, 52, 1) and passed to the module-level ``w_model``
    (wheezes) and ``model`` (crackles). For each model the predicted class
    name and its mean probability are printed.

    Args:
        audio_path: Path of the audio file to analyse.

    Returns:
        None. Results are printed, wheezes first, then crackles.
    """
    w_classes = ["Wheezes", "No Wheezes"]
    c_classes = ["Crackles", "No Crackles"]

    signal, sr = librosa.load(audio_path)
    mfcc_by_frame = librosa.feature.mfcc(y=signal, sr=sr, n_mfcc=52).T
    features = np.mean(mfcc_by_frame, axis = 0).reshape(1,52)
    # Add a trailing axis: (1, 52) -> (1, 52, 1).
    model_input = np.expand_dims(features, axis = 2)

    def _top_class(net, class_names):
        # Run one model and return (predicted class name, its mean probability).
        probabilities = net.predict(model_input)
        label = class_names[np.argmax(probabilities, axis=1)[0]]
        best_column = probabilities.mean(axis=0).argmax()
        confidence = probabilities.T[best_column].mean()
        return label, confidence

    w_classpreds, w_confidence = _top_class(w_model, w_classes)
    c_classpreds, c_confidence = _top_class(model, c_classes)

    print (w_classpreds, w_confidence)
    print (c_classpreds, c_confidence)

In [None]:
# Example: recording from patient 101.
wheezes_crackels_detection('/content/drive/MyDrive/ICBHI_final_database/audio_and_txt_files/101_1b1_Al_sc_Meditron.wav')

No Wheezes 0.99999785
No Crackles 0.9975858


170_1b4_Tc_mc_AKGC417L.wav

In [None]:
# Example: recording from patient 104.
wheezes_crackels_detection('/content/drive/MyDrive/ICBHI_final_database/audio_and_txt_files/104_1b1_Al_sc_Litt3200.wav')

Wheezes 0.99999857
No Crackles 0.99995196
