### Importing necessary libraries

In [None]:
import pandas as pd
import numpy as np
from pydub import AudioSegment
#import matplotlib.pyplot as plt
import os
import librosa
#%matplotlib inline
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split

import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import ImageDataGenerator

from sklearn.metrics import classification_report, confusion_matrix

In [None]:
# List the contents of the dataset root. The path is assembled with os.path.join
# so that no backslash appears inside a string literal ("\R" is an invalid escape
# sequence) and the separator matches the host operating system.
os.listdir(os.path.join("Respiratory_Sound_Database", "Respiratory_Sound_Database"))

Inside the audio_and_txt_files folder, each .wav file has a corresponding .txt file that lists the start time and end time of each breathing cycle, together with 0/1 flags showing whether crackles or wheezes are present in that cycle.

The .wav audio files have different lengths, so here we split each recording into several shorter clips of 5 seconds each and label every clip according to whether it contains wheezes, crackles, both, or none.

In [None]:

# Directory containing the .wav recordings and their .txt annotation files.
# os.path.join picks the separator for the host OS; the trailing "" keeps the
# trailing separator that the previously hard-coded string ended with.
directory_path = os.path.join(
    "Respiratory_Sound_Database", "Respiratory_Sound_Database", "audio_and_txt_files", ""
)
# Base names (text before the first '.') of every entry whose name contains '.txt'.
# Not used by the cells visible here; kept in case a later cell relies on it.
filenames = [s.split('.')[0] for s in os.listdir(path = directory_path) if '.txt' in s]

# Length of each exported clip, in milliseconds (5000 ms = 5 seconds).
SEGMENT_MS = 5000

# All clips are written to one sub-folder; create it once, not once per clip.
segments_dir = os.path.join(directory_path, 'segments')
os.makedirs(segments_dir, exist_ok=True)


def _label_for_window(annotations, window_start, window_end):
    """Return the label for the time window [window_start, window_end) in seconds.

    Parameters
    ----------
    annotations : pandas.DataFrame
        Frame with 'start', 'end', 'crackles' and 'wheezes' columns.
    window_start, window_end : float
        Window boundaries, in the same unit as the 'start' / 'end' columns.

    Returns
    -------
    str
        'Both', 'Crackles', 'Wheezes' or 'None', depending on which flags are
        set to 1 in any annotation row that overlaps the window.
    """
    overlapping = annotations[
        (annotations['start'] < window_end) & (annotations['end'] > window_start)
    ]
    has_crackles = bool((overlapping['crackles'] == 1).any())
    has_wheezes = bool((overlapping['wheezes'] == 1).any())

    if has_crackles and has_wheezes:
        return 'Both'
    if has_crackles:
        return 'Crackles'
    if has_wheezes:
        return 'Wheezes'
    return 'None'


# One record (clip path + label) per exported clip.
data = []

# sorted() makes the row order of `df` independent of the order os.listdir returns.
for filename in sorted(os.listdir(directory_path)):
    stem, extension = os.path.splitext(filename)
    if extension != '.wav':
        continue

    audio_path = os.path.join(directory_path, filename)
    # Swap only the extension; str.replace would rewrite every '.wav' in the path.
    annotation_path = os.path.join(directory_path, stem + '.txt')
    if not os.path.exists(annotation_path):
        # Without annotations the clips cannot be labelled, so skip the recording.
        print(f"Skipping {filename}: annotation file not found: {annotation_path}")
        continue

    # Load the audio file
    audio = AudioSegment.from_wav(audio_path)

    # Load the annotations with dtype specified
    annotations = pd.read_csv(annotation_path, sep="\t", header=None,
                              names=['start', 'end', 'crackles', 'wheezes'],
                              dtype={'start': float, 'end': float, 'crackles': int, 'wheezes': int})

    # Split the audio into fixed-length clips and label them.
    # The final clip of a recording may be shorter than SEGMENT_MS.
    for offset_ms in range(0, len(audio), SEGMENT_MS):
        segment = audio[offset_ms:offset_ms + SEGMENT_MS]
        segment_filename = f"{stem}_segment_{offset_ms//1000}.wav"
        segment_path = os.path.join(segments_dir, segment_filename)

        # Export segment
        segment.export(segment_path, format="wav")

        # Clip boundaries in seconds, to compare against the annotation times
        segment_start = offset_ms / 1000.0
        segment_end = (offset_ms + SEGMENT_MS) / 1000.0
        label = _label_for_window(annotations, segment_start, segment_end)

        # Append to data list
        data.append({'filename': segment_path, 'label': label})

# Convert data to a DataFrame
df = pd.DataFrame(data)
pd.set_option('display.max_colwidth', None)
df.head(20)


In [None]:

# Map each string label to an integer class id, store the ids on the frame,
# and build the one-hot target matrix from those same ids.
label_encoder = LabelEncoder()
encoded_ids = label_encoder.fit_transform(df['label'])
df['label_encoded'] = encoded_ids
labels = to_categorical(df['label_encoded'])


In [None]:
def extract_features(file_path):
    """Return the time-averaged MFCC vector of one audio file, or None on failure.

    Parameters
    ----------
    file_path : str
        Path to an audio file that ``librosa.load`` can read.

    Returns
    -------
    numpy.ndarray or None
        The 40 MFCC coefficients requested below, each averaged over all
        frames of the file. None is returned (after printing a message) when
        the file does not exist or when loading / feature extraction raises.
    """
    try:
        # Ensure the file exists to avoid FileNotFoundError
        if not os.path.exists(file_path):
            print(f"File does not exist: {file_path}")
            return None

        audio, sample_rate = librosa.load(file_path, res_type='kaiser_fast')

        # Each clip is its own file, so the features do not depend on the
        # segment index encoded in the file name; the name is not parsed here.
        mfccs = librosa.feature.mfcc(y=audio, sr=sample_rate, n_mfcc=40)
        # Transpose so that axis 0 is time, then average each coefficient over time.
        return np.mean(mfccs.T, axis=0)
    except Exception as e:
        # Best effort: report the problem and let the caller drop this file.
        print(f"Error encountered while parsing file: {file_path}, Error: {e}")
        return None

# Compute one feature vector per clip; `df['filename']` holds the path of each
# audio segment, and each path is handed directly to extract_features.
df['features'] = df['filename'].apply(extract_features)


In [None]:


# extract_features returns None for clips it could not process. Such rows must be
# removed from BOTH the features and the labels, otherwise np.array would build a
# ragged object array and the two would no longer line up row-for-row.
assert len(labels) == len(df), "labels must have one row per row of df"
valid_mask = np.array([feat is not None for feat in df['features']], dtype=bool)
n_failed = int((~valid_mask).sum())
if n_failed:
    print(f"Dropping {n_failed} segment(s) without features")
if not valid_mask.any():
    raise ValueError("No segment produced features; nothing to train on")

# Ensure features are in a numpy array form suitable for training
features = np.stack(df.loc[valid_mask, 'features'].tolist())
features = np.expand_dims(features, axis=2)  # Reshape for CNN input
# New name (rather than overwriting `labels`) so re-running this cell is safe.
labels_valid = labels[valid_mask]

# Split data
X_train, X_test, y_train, y_test = train_test_split(features, labels_valid, test_size=0.2, random_state=42)


In [None]:


# Seed the NumPy and TensorFlow random generators before any layer is created so
# that repeated runs of this cell start from the same random state.
SEED = 42
np.random.seed(SEED)
tf.random.set_seed(SEED)

# One sample is treated as a (n_features, 1, 1) "image" with a single channel.
# NOTE(review): X_train is built with one trailing axis only; this relies on Keras
# accepting that for an input declared with two trailing axes of size 1 - confirm.
input_shape = (X_train.shape[1], 1, 1)

# The width of the softmax layer must equal the width of the one-hot targets.
n_classes = y_train.shape[1]

# Three identical convolutional stages that differ only in filter count:
# Conv -> Conv -> MaxPool -> BatchNorm. Only the very first layer gets input_shape.
conv_stack = []
for n_filters in (64, 128, 256):
    first_layer_kwargs = {} if conv_stack else {'input_shape': input_shape}
    conv_stack.extend([
        Conv2D(n_filters, (3, 1), activation='relu', padding='same', **first_layer_kwargs),
        Conv2D(n_filters, (3, 1), activation='relu', padding='same'),
        MaxPooling2D((2, 1)),
        BatchNormalization(),
    ])

# Define model
model = Sequential(conv_stack + [
    Flatten(),
    Dense(512, activation='relu'),
    Dropout(0.5),
    Dense(256, activation='relu'),
    Dropout(0.5),
    Dense(128, activation='relu'),
    Dense(n_classes, activation='softmax')
])


model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Train the model; 20% of the training rows are held out for validation
history = model.fit(X_train, y_train, epochs=200, batch_size=32, validation_split=0.2)

# Evaluate the model on the held-out test split
loss, accuracy = model.evaluate(X_test, y_test)

print("Test Loss:", loss)
print("Test Accuracy:", accuracy)


In [None]:
# Score the held-out samples with the trained model (one row of class scores each)
predictions = model.predict(X_test)

# The predicted class of a sample is the column holding its largest score
predicted_labels = predictions.argmax(axis=1)

# Show the predicted class ids
print("Predicted Labels:", predicted_labels)


In [None]:


# Convert one-hot encoded labels to class labels
true_labels = np.argmax(y_test, axis=1)

# The integer ids were assigned by `label_encoder`, so the display names must be
# read from the fitted encoder (classes_[i] is the label encoded as i). A
# hand-written list in a different order would attach the wrong name to each row.
class_names = [str(name) for name in label_encoder.classes_]
# Passing the full id list keeps the report and the matrix aligned with
# class_names even when a class does not occur in the test split.
class_ids = np.arange(len(class_names))

print(classification_report(true_labels, predicted_labels, labels=class_ids, target_names=class_names))
print(confusion_matrix(true_labels, predicted_labels, labels=class_ids))
