In [1]:
from os import listdir
from os.path import isfile, join
import pandas as pd
import librosa
import matplotlib.pyplot as plt
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sn
import matplotlib.pyplot as plt

from tensorflow.keras.models import Sequential, Model
from tensorflow.keras import regularizers, optimizers
from tensorflow.keras.layers import Dense, Conv1D, Flatten, Activation, MaxPooling1D, Dropout
from tensorflow.keras.utils import plot_model,to_categorical

In [2]:
%matplotlib inline 

In [3]:
class Diagnosis:
    """A single data point of the dataset.

    Holds three plain public attributes copied from the constructor
    arguments:

    id        -- identifier supplied by the caller
    diagnosis -- diagnosis label for the recording
    path      -- path to the audio file
    """

    def __init__(self, id, diagnosis, path):
        # `id` shadows the builtin, but it is part of the public signature
        # (callers may pass it by keyword), so the name is kept.
        self.id, self.diagnosis, self.path = id, diagnosis, path

In [4]:
def get_audio_files(audio_path='D:/Summer_Intern/archive/Respiratory_Sound_Database/Respiratory_Sound_Database/audio_and_txt_files/'):
    """List the .wav recordings that sit directly inside a directory.

    Parameters
    ----------
    audio_path : str, optional
        Directory to scan. Defaults to the dataset location this notebook
        was written against, so existing zero-argument calls are unchanged.

    Returns
    -------
    (list of str, str)
        The sorted *file names* (not full paths) ending in '.wav', and the
        directory that was scanned, returned exactly as it was passed in.
        Callers that build full paths by string concatenation rely on the
        default value ending with a '/'.

    Raises
    ------
    OSError
        Propagated from os.listdir when the directory cannot be read.
    """
    audio_files = sorted(
        name
        for name in listdir(audio_path)
        # isfile() filters out sub-directories that happen to end in '.wav'.
        if name.endswith('.wav') and isfile(join(audio_path, name))
    )
    return audio_files, audio_path

In [5]:
def diagnosis_data():
    """Build one Diagnosis object per audio file in the dataset.

    Reads the patient/diagnosis CSV, maps each patient id (first column) to
    its diagnosis (second column), and pairs every audio file returned by
    get_audio_files() with the diagnosis of the patient whose id is encoded
    in the first three characters of the file name.

    Returns
    -------
    list of Diagnosis
        One entry per audio file, in sorted file-name order. Each entry's
        `id` is its position in that list (0-based), `diagnosis` is the
        label string from the CSV, and `path` is directory + file name.

    Raises
    ------
    KeyError
        If a file name's patient id has no row in the CSV.
    ValueError
        If the first three characters of a file name are not an integer.
    """
    diagnosis = pd.read_csv('D:/Summer_Intern/archive/Respiratory_Sound_Database/Respiratory_Sound_Database/patient_diagnosis.csv')
    audio_files, audio_path = get_audio_files()

    # Seed entry kept from the original code.
    # NOTE(review): presumably the CSV has no header row, so read_csv
    # consumes patient 101 as column names and it must be re-added here --
    # confirm against the file before switching to header=None.
    diag_dict = {101: "URTI"}  # patient id -> diagnosis

    # Columns are addressed by position with .iloc: plain row[0] / row[1] on
    # a label-indexed Series relies on deprecated positional fallback.
    diag_dict.update(zip(diagnosis.iloc[:, 0].tolist(), diagnosis.iloc[:, 1].tolist()))

    return [
        Diagnosis(i, diag_dict[int(f[:3])], audio_path + f)
        for i, f in enumerate(audio_files)
    ]

In [6]:
def audio_features(filename):
    """Summarise one audio file as a single 1-D feature vector.

    The file is loaded with librosa's default options. Five per-frame
    feature matrices are computed, each is averaged over its frame axis
    (axis=1), and the resulting vectors are concatenated in this order:
    MFCCs (n_mfcc=40), chromagram, mel spectrogram, spectral contrast,
    tonnetz. Chroma and spectral contrast are derived from the magnitude
    STFT; tonnetz is computed on the harmonic component of the signal.

    Parameters
    ----------
    filename : path accepted by librosa.load

    Returns
    -------
    numpy.ndarray
        1-D array; its length depends on librosa's defaults for the
        features other than the MFCCs.
    """
    signal, sr = librosa.load(filename)
    magnitude = np.abs(librosa.stft(signal))  # magnitude of the short-time Fourier transform
    harmonic_part = librosa.effects.harmonic(signal)

    # Order matters: downstream code consumes the concatenated vector as-is.
    per_frame_features = (
        librosa.feature.mfcc(y=signal, sr=sr, n_mfcc=40),
        librosa.feature.chroma_stft(S=magnitude, sr=sr),
        librosa.feature.melspectrogram(y=signal, sr=sr),
        librosa.feature.spectral_contrast(S=magnitude, sr=sr),
        librosa.feature.tonnetz(y=harmonic_part, sr=sr),
    )
    return np.concatenate([np.mean(matrix, axis=1) for matrix in per_frame_features])

In [7]:
def data_points():
    """Extract integer labels and feature vectors for the whole dataset.

    Walks every entry returned by diagnosis_data(), prints the 0-based
    position of the entry as a progress indicator, encodes its diagnosis
    string as an integer, and computes its features with audio_features().

    Returns
    -------
    (numpy.ndarray, numpy.ndarray)
        `labels` (one integer per entry) and `images` (one feature vector
        per entry), in the same order as diagnosis_data().

    Raises
    ------
    KeyError
        If an entry carries a diagnosis that is not in the encoding table.
    """
    # Diagnosis string -> class index.
    encoding = { "COPD":0, "Healthy":1, "URTI":2, "Bronchiectasis":3, "Pneumonia":4, "Bronchiolitis":5, "Asthma":6, "LRTI":7 }

    labels, images = [], []
    for position, entry in enumerate(diagnosis_data()):
        print(position)
        labels.append(encoding[entry.diagnosis])
        images.append(audio_features(entry.path))

    return np.array(labels), np.array(images)

In [10]:
def preprocessing(labels, images):
    """Filter, split, and reshape the dataset for the 1-D CNN.

    Steps:
      1. Drop every sample labelled 6 (Asthma) or 7 (LRTI); there is too
         little data for those classes.
      2. Split 80/20 into train and test sets (random_state=10).
      3. One-hot encode the labels into exactly 6 columns.
      4. Add a trailing channel axis to the feature matrices.

    Parameters
    ----------
    labels : array-like of int, shape (n_samples,)
    images : array-like, shape (n_samples, n_features)

    Returns
    -------
    X_train, X_test : numpy.ndarray, shape (n, n_features, 1)
    y_train, y_test : numpy.ndarray, shape (n, 6)
    """
    num_classes = 6  # labels 0-5 remain once 6 and 7 are removed

    labels = np.asarray(labels)
    images = np.asarray(images)

    # Single boolean mask instead of two identical np.where/np.delete passes.
    keep = ~np.isin(labels, (6, 7))
    images = images[keep]
    labels = labels[keep]

    # Split data
    X_train, X_test, y_train, y_test = train_test_split(images, labels, test_size=0.2, random_state=10)

    # One-hot encode the labels. num_classes is passed explicitly: without
    # it the width is inferred from the largest label present, so a split
    # that happens to contain no class-5 sample would produce fewer than 6
    # columns and break everything that expects 6.
    y_train = to_categorical(y_train, num_classes=num_classes)
    y_test = to_categorical(y_test, num_classes=num_classes)

    # Conv1D expects (samples, steps, channels); each array is reshaped
    # from its own dimensions.
    X_train = np.reshape(X_train, (X_train.shape[0], X_train.shape[1], 1))
    X_test = np.reshape(X_test, (X_test.shape[0], X_test.shape[1], 1))

    return X_train, X_test, y_train, y_test

In [11]:
# Driver cell: runs feature extraction over every recording (data_points
# prints one counter line per file) and then builds the train/test arrays.
# The six names assigned here are globals consumed by the model,
# evaluation, and confusion-matrix cells further down.
print("Retrieving and processing data...")
labels, images = data_points()
X_train, X_test, y_train, y_test = preprocessing(labels, images)
print("Finished data retrieval and preprocessing!")

Retrieving and processing data...
0


  diag_dict[row[0]] = row[1]


TypeError: melspectrogram() takes 0 positional arguments but 1 positional argument (and 1 keyword-only argument) were given

In [None]:
# 1-D CNN classifier over the per-recording feature vector.
# The input shape and the number of output units are read from the prepared
# arrays instead of being hard-coded (previously 193 and 6), so the network
# stays in sync with whatever audio_features()/preprocessing() produce.
input_shape = X_train.shape[1:]  # (n_features, 1)
num_classes = y_train.shape[1]   # width of the one-hot labels

model = Sequential()
model.add(Conv1D(64, kernel_size=5, activation='relu', input_shape=input_shape))

model.add(Conv1D(128, kernel_size=5, activation='relu'))
model.add(MaxPooling1D(2))

model.add(Conv1D(256, kernel_size=5, activation='relu'))

model.add(Dropout(0.3))
model.add(Flatten())

model.add(Dense(512, activation='relu'))
model.add(Dense(num_classes, activation='softmax'))

model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
# NOTE: the test split doubles as validation data here, so the val_* curves
# and the evaluation cell below report on the same samples.
history = model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=70, batch_size=200, verbose=1)

In [None]:
# Evaluate on the test split; `score` is [loss, accuracy] because the model
# was compiled with a single 'accuracy' metric.
score = model.evaluate(X_test, y_test, batch_size=60, verbose=0)
test_loss, test_accuracy = score[0], score[1]
print('Accuracy: {0:.0%}'.format(test_accuracy))
print("Loss: %.4f\n" % test_loss)

# Training curves: accuracy on the left panel, loss on the right.
curves = history.history
_, (acc_ax, loss_ax) = plt.subplots(1, 2, figsize=(15, 5))

acc_ax.set_title('Accuracy')
acc_ax.plot(curves['accuracy'], label='training acc')
acc_ax.plot(curves['val_accuracy'], label='validation acc')
acc_ax.legend()

loss_ax.set_title('Loss')
loss_ax.plot(curves['loss'], label='training loss')
loss_ax.plot(curves['val_loss'], label='validation loss')
loss_ax.legend()

In [None]:
# Class names in label order (index == encoded class id).
matrix_index = ["COPD", "Healthy", "URTI", "Bronchiectasis", "Pneumonia", "Bronchiolitis"]
class_ids = list(range(len(matrix_index)))

preds = model.predict(X_test)
classpreds = np.argmax(preds, axis=1) # predicted classes
y_testclass = np.argmax(y_test, axis=1) # true classes

# labels= pins the matrix/report to all six classes. Without it, a class
# that appears in neither the test labels nor the predictions is silently
# dropped, and the 6-name index / target_names below no longer match.
cm = confusion_matrix(y_testclass, classpreds, labels=class_ids)
print(classification_report(y_testclass, classpreds, labels=class_ids, target_names=matrix_index))

# Row-wise percentages; rows with no true samples stay at 0 instead of
# producing NaN from a division by zero.
cm_sum = np.sum(cm, axis=1, keepdims=True)
cm_perc = np.divide(cm, cm_sum, out=np.zeros(cm.shape, dtype=float), where=cm_sum != 0) * 100

# Per-cell annotation text: "pct\ncount/row_total" on the diagonal,
# "pct\ncount" elsewhere, and blank for empty off-diagonal cells.
annot = np.empty(cm.shape, dtype=object)
nrows, ncols = cm.shape
for i in range(nrows):
    for j in range(ncols):
        c = cm[i, j]
        p = cm_perc[i, j]
        if i == j:
            # Index down to a scalar: formatting a 1-element array with %d
            # relies on deprecated array-to-scalar conversion.
            s = cm_sum[i, 0]
            annot[i, j] = '%.1f%%\n%d/%d' % (p, c, s)
        elif c == 0:
            annot[i, j] = ''
        else:
            annot[i, j] = '%.1f%%\n%d' % (p, c)


# Display confusion matrix
df_cm = pd.DataFrame(cm, index = matrix_index, columns = matrix_index)
df_cm.index.name = 'Actual'
df_cm.columns.name = 'Predicted'
fig, ax = plt.subplots(figsize=(10,7))
sn.heatmap(df_cm, annot=annot, fmt='', ax=ax)

In [None]:
# Total number of samples counted in the confusion matrix.
cm.sum()