**Predicting Respiratory Diseases by the Patient’s Breathing Sounds**

Kaarel Tamuri<br>
Stina-Marie Maripuu

Set within the healthcare industry, this project focuses on supporting the diagnostic process for respiratory diseases. It aims to harness the potential of machine learning to analyze breathing sounds, to help cure patients more effectively and reliably.

We are using CNN (Convolutional Neural Network), which is a network architecture for deep learning that learns directly from data

In [None]:
#LIBARIES
!pip3 install resampy

In [None]:
#IMPORTS
import os
import pandas as pd
import numpy as np
import librosa
import matplotlib.pyplot as plt
import random
from tqdm import tqdm
import resampy


from imblearn.over_sampling import SMOTE

from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report, roc_curve, auc

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Conv2D, MaxPooling2D, GlobalAveragePooling2D
from tensorflow.keras import metrics
from tensorflow.keras.callbacks import ModelCheckpoint

from keras.utils import to_categorical


from datetime import datetime
import seaborn as sns

In [None]:
#All PATHS TO DATA
path_files = "archive/Respiratory_Sound_Database/Respiratory_Sound_Database/audio_and_txt_files/"
path_diagnose = "archive/Respiratory_Sound_Database/Respiratory_Sound_Database/patient_diagnosis.csv"

In [None]:
#.wav filenames
wav_files = []
for file in os.listdir(path_files):
    if file.endswith(".wav"):
        wav_files.append(file)


#patient_diagnosis.csv
diagnose = pd.read_csv(path_diagnose, header=None)

In [None]:
#Get all file paths
filepaths = []
for i in wav_files:
    path = path_files+i
    filepaths.append(path)


#Find diagnose for every file

file_patient =[]
for i in wav_files:
    pn = int (i[:3])
    file_patient.append(pn)

file_patient = np.array(file_patient)

labels = []
for i in file_patient:
    row = diagnose[diagnose[0] == i]
    labels.append(row[1].values[0])
labels = np.array(labels)



In [None]:
unique_elements, counts_elements = np.unique(labels, return_counts=True)

plt.figure(figsize=(10, 6))
bars = plt.bar(unique_elements, counts_elements, color='skyblue')

# Adding the count labels on top of the bars
for bar in bars:
    yval = bar.get_height()
    plt.text(bar.get_x() + bar.get_width()/2, yval, int(yval), va='bottom', ha='center')

plt.xlabel('Illness')
plt.ylabel('Count')
plt.title('Count of Each Illness Based On Audio Files Before Data Balancing')
plt.xticks(rotation=45)
plt.show()

In [None]:
def add_noise(data, noise_level=0.005):
    noise = np.random.randn(len(data))
    return data + noise_level * noise

def time_shift(data, sampling_rate, shift_max=0.2):
    shift_amount = int(sampling_rate * shift_max)
    shift = np.random.randint(-shift_amount, shift_amount)
    return np.roll(data, shift)

def change_pitch(audio, sample_rate, n_steps=2.0, bins_per_octave=12, res_type='soxr_hq', scale=False):
    return librosa.effects.pitch_shift(y=audio, sr=sample_rate, n_steps=n_steps, 
                                       bins_per_octave=bins_per_octave, res_type=res_type, scale=scale)

def change_speed(audio, speed_factor=1.2):
    return librosa.effects.time_stretch(y=audio, rate=speed_factor)



def data_augmentation(file_path):
    audio, sample_rate = librosa.load(file_path, res_type='kaiser_fast')
    augmented_audio = [
        add_noise(audio),
        time_shift(audio, sample_rate),
        change_pitch(audio, sample_rate),
        change_speed(audio),
    ]
    return augmented_audio, sample_rate



In [None]:
def audioLen(file_path):
    audio, sample_rate = librosa.load(file_path, sr=None)
    duration = len(audio) / sample_rate
    return duration

durations = []
for i in filepaths:
    durations.append(audioLen(i))
    

max_dur = max(durations)
min_dur = min(durations)
mean_dur = np.mean(durations)
median_dur = np.median(durations)
percentile_90 = np.percentile(durations, 90)

# Print the results
print(f"Maximum: {max_dur}")
print(f"Minimum: {min_dur}")
print(f"Mean: {mean_dur}")
print(f"Median: {median_dur}")
print(f"90th Percentile: {percentile_90}")

plt.hist(durations, bins='auto', color='blue', alpha=0.7, rwidth=0.85)
plt.title('Audiofile Durations')
plt.xlabel('Value')
plt.ylabel('Frequency')
plt.grid(axis='y', alpha=0.75)

# Display the histogram
plt.show()

    
    

In [None]:
#I take 90th Percentile to cover as much as possible, but to not be as slow
durLen = 20
paddingLen = int(np.ceil(durLen*22050/512))


def extract(file_path):
    try:
        audio, sample_rate = librosa.load(file_path, res_type='kaiser_fast', duration=durLen) 
        mfccs = librosa.feature.mfcc(y=audio, sr=sample_rate, n_mfcc=40)
        pad_width = paddingLen - mfccs.shape[1]
        mfccs = np.pad(mfccs, pad_width=((0, 0), (0, pad_width)), mode='constant')

    except Exception as e:
        print(file_path, e)
        return None 

    return mfccs


def extract_features_from_augmented_audio(file_path):
    augmented_audios, sample_rate = data_augmentation(file_path)
    augmented_features = []  # Initialize an empty list to store features
    for aug_audio in augmented_audios:
        mfccs = librosa.feature.mfcc(y=aug_audio, sr=sample_rate, n_mfcc=40)
        pad_width = paddingLen - mfccs.shape[1]
        mfccs = np.pad(mfccs, pad_width=((0, 0), (0, pad_width)), mode='constant')
        augmented_features.append(mfccs)
    return augmented_features

    

In [None]:
labelsNew = []
featuresNew = []

# Use tqdm for the progress bar
for file_path, label in tqdm(zip(filepaths, labels), total=len(filepaths), desc="Processing files"):
    
    #augment data
    if label not in ["COPD", "LRTI", "Asthma"]:
        augmented_audios, sample_rate = data_augmentation(file_path)
        for aug_audio in augmented_audios:
            mfccs = librosa.feature.mfcc(y=aug_audio, sr=sample_rate, n_mfcc=40)
            pad_width = paddingLen - mfccs.shape[1]
            mfccs_padded = np.pad(mfccs, pad_width=((0, 0), (0, pad_width)), mode='constant')
            featuresNew.append(mfccs_padded)
            labelsNew.append(label)


    mfccs = extract(file_path)
    featuresNew.append(mfccs)
    labelsNew.append(label)

featuresNew = np.array(featuresNew)
labelsNew = np.array(labelsNew)


In [None]:

features = featuresNew
labels = labelsNew

#We can not train a model, when there are too few samples. 
#Therefore, we do not include Asthma and LTRI.
features1 = np.delete(features, np.where((labels == 'Asthma') | (labels == 'LRTI'))[0], axis=0) 
labels1 = np.delete(labels, np.where((labels == 'Asthma') | (labels == 'LRTI'))[0], axis=0)

# features per ilness
unique_elements, counts_elements = np.unique(labels1, return_counts=True)

for i in range(len(unique_elements)):
    print("Ilness: "+unique_elements[i]+"; Count: "+str(counts_elements[i]))



In [None]:
from imblearn.over_sampling import SMOTE

#One hot
le = LabelEncoder()
labelDummies = le.fit_transform(labels1)  # Change Labels string to int
labelDummiesList = to_categorical(labelDummies)
# Make list by using labelDummies


features = np.reshape(features1, (*features1.shape, 1))
#ERROR
# Make Train/Test sets
X_train, X_test, y_train, y_test = train_test_split(features1, labelDummiesList, stratify=labelDummiesList, test_size=0.2, random_state=1)

# Apply SMOTE to the training data
smote = SMOTE()
X_train_smote, y_train_smote = smote.fit_resample(X_train.reshape(X_train.shape[0], -1), y_train)

# Reshape the features back after SMOTE
X_train_smote = X_train_smote.reshape(X_train_smote.shape[0], features1.shape[1], features1.shape[2], 1)
len(labelDummiesList)


In [None]:
# Convert one-hot encoded labels back to label encoding
y_train_smote_labels = np.argmax(y_train_smote, axis=1)

# Count the occurrences of each class
unique_classes, counts_classes = np.unique(y_train_smote_labels, return_counts=True)

# Map the integer labels back to the original class names
class_names = le.inverse_transform(unique_classes)

    
plt.figure(figsize=(10, 6))
bars = plt.bar(class_names, counts_classes, color='skyblue')

# Adding count labels on top of the bars
for bar in bars:
    yval = bar.get_height()
    plt.text(bar.get_x() + bar.get_width()/2, yval, int(yval), va='bottom', ha='center')

plt.xlabel('Illness')
plt.ylabel('Count')
plt.title('Count of Each Illness Based On Audio Files After Data Balancing')
plt.xticks(rotation=45)
plt.show()



In [None]:
rows = 40 #MFCC number
columns = paddingLen #frames
channels = 1

num_labels = labelDummiesList.shape[1]
filter_size = 2

model = Sequential()
model.add(Conv2D(filters=16, kernel_size=filter_size,
                 input_shape=(rows, columns, channels), activation='relu'))
model.add(MaxPooling2D(pool_size=2))
model.add(Dropout(0.2))

model.add(Conv2D(filters=32, kernel_size=filter_size, activation='relu'))
model.add(MaxPooling2D(pool_size=2))
model.add(Dropout(0.2))

model.add(Conv2D(filters=64, kernel_size=filter_size, activation='relu'))
model.add(MaxPooling2D(pool_size=2))
model.add(Dropout(0.2))

model.add(Conv2D(filters=128, kernel_size=filter_size, activation='relu'))
model.add(MaxPooling2D(pool_size=2))
model.add(Dropout(0.2))

model.add(GlobalAveragePooling2D())

model.add(Dense(num_labels, activation='softmax')) 


In [None]:
model.compile(loss='categorical_crossentropy', metrics=['accuracy', metrics.Recall()], optimizer='adam')


In [None]:
# train model
num_epochs = 200
num_batch_size = 128

model.fit(X_train_smote, y_train_smote, batch_size=num_batch_size, epochs=num_epochs,
          validation_data=(X_test, y_test), verbose=1)




In [None]:
# Evaluating the model on the training and testing set
train_loss, train_accuracy, train_recall = model.evaluate(X_train_smote, y_train_smote, verbose=1)
test_loss, test_accuracy, test_recall = model.evaluate(X_test, y_test, verbose=1)

print("Training:")
print("\tAccuracy:", train_accuracy)
print("\tRecall:", train_recall)


print("Testing:")
print("\tAccuracy:", test_accuracy)
print("\tRecall:", test_recall)




In [None]:
# Data for the bar chart
metrics = ['Accuracy', 'Recall']
train_values = [train_accuracy, train_recall]
test_values = [test_accuracy, test_recall]

x = np.arange(len(metrics))  # the label locations
width = 0.35  # the width of the bars

# Creating the bar chart
plt.figure(figsize=(10, 6))
train_bars = plt.bar(x - width/2, train_values, width, label='Train', color='blue', alpha=0.7)
test_bars = plt.bar(x + width/2, test_values, width, label='Test', color='green', alpha=0.7)

# Adding labels to the bars
def add_labels(bars):
    for bar in bars:
        yval = bar.get_height()
        plt.text(bar.get_x() + bar.get_width()/2, yval, round(yval, 2), va='bottom', ha='center')

add_labels(train_bars)
add_labels(test_bars)

plt.xlabel('Metrics')
plt.ylabel('Scores')
plt.title('Model Performance on Train vs Test Set')
plt.xticks(x, metrics)
plt.legend()
plt.show()

In [None]:
predictions = model.predict(X_test)

predClass = np.argmax(predictions, axis=1)

realClass = np.argmax(y_test, axis=1)



In [None]:
# Compute ROC curve and ROC area for each class
fpr = {}
tpr = {}
roc_auc = {}
for i in range(len(unique_classes)):
    fpr[i], tpr[i], _ = roc_curve(y_test[:, i], predictions[:, i])
    roc_auc[i] = auc(fpr[i], tpr[i])

In [None]:
c_names = ['Bronchiectasis', 'Bronchiolitis', 'COPD', 'Healthy', 'Pneumonia', 'URTI']

In [None]:
plt.figure(figsize = (8,6))

for i in range(len(c_names)):
    plt.plot(fpr[i], tpr[i], label=f'ROC curve (area = {roc_auc[i]:.2f}) for {c_names[i]}')
    
plt.plot([0, 1], [0, 1], 'k--')

plt.title('ROC')
plt.xlabel('FPR')
plt.ylabel('TPR')

plt.legend()
plt.grid()
plt.show()


In [None]:
print(classification_report(realClass, predClass, target_names=c_names))