In [2]:
import pandas as pd
import numpy as np
import kaggle

import os
import sys
from zipfile import ZipFile

from scipy.io import wavfile
import librosa
import librosa.display
import seaborn as sns
import matplotlib.pyplot as plt

from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.metrics import confusion_matrix, classification_report
from sklearn.model_selection import train_test_split

# to play the audio files
from IPython.display import Audio

import keras
from keras.callbacks import ReduceLROnPlateau, ModelCheckpoint, EarlyStopping
from keras.models import Sequential
from keras.layers import Dense, Conv1D, MaxPooling1D, Flatten, Dropout, BatchNormalization
from tensorflow.keras.utils import to_categorical
from keras.callbacks import ModelCheckpoint

import warnings
# Silence every warning unless the interpreter was started with explicit -W options.
# NOTE(review): a blanket "ignore" also hides deprecation/future warnings raised by
# the libraries used below, which makes API breakage harder to notice.
if not sys.warnoptions:
    warnings.simplefilter("ignore")
# DeprecationWarning is ignored regardless of how the interpreter was started.
warnings.filterwarnings("ignore", category=DeprecationWarning)

In [3]:
# Download the "kinguistics/heartbeat-sounds" dataset archive with the Kaggle CLI.
# NOTE(review): presumably requires Kaggle API credentials to be configured — confirm.
!kaggle datasets download -d "kinguistics/heartbeat-sounds"

Downloading heartbeat-sounds.zip to c:\Users\Muhlis\OneDrive - Universitas Negeri Jakarta (UNJ)\Documents\Eksperimen Fisika\Heartbeat-Sound-Classification




  0%|          | 0.00/110M [00:00<?, ?B/s]
  1%|          | 1.00M/110M [00:00<01:07, 1.68MB/s]
  2%|▏         | 2.00M/110M [00:01<01:03, 1.78MB/s]
  3%|▎         | 3.00M/110M [00:01<01:03, 1.76MB/s]
  4%|▎         | 4.00M/110M [00:02<01:03, 1.76MB/s]
  5%|▍         | 5.00M/110M [00:02<01:01, 1.80MB/s]
  5%|▌         | 6.00M/110M [00:03<00:59, 1.83MB/s]
  6%|▋         | 7.00M/110M [00:04<01:05, 1.66MB/s]
  7%|▋         | 8.00M/110M [00:04<01:05, 1.65MB/s]
  8%|▊         | 9.00M/110M [00:05<01:05, 1.61MB/s]
  9%|▉         | 10.0M/110M [00:06<01:04, 1.63MB/s]
 10%|▉         | 11.0M/110M [00:06<01:02, 1.66MB/s]
 11%|█         | 12.0M/110M [00:07<01:02, 1.63MB/s]
 12%|█▏        | 13.0M/110M [00:08<01:00, 1.67MB/s]
 13%|█▎        | 14.0M/110M [00:08<01:01, 1.65MB/s]
 14%|█▎        | 15.0M/110M [00:09<00:58, 1.70MB/s]
 15%|█▍        | 16.0M/110M [00:09<00:55, 1.77MB/s]
 15%|█▌        | 17.0M/110M [00:10<00:52, 1.85MB/s]
 16%|█▋        | 18.0M/110M [00:10<00:52, 1.85MB/s]
 17%|█▋        | 19.

In [None]:
# Unpack the downloaded dataset archive into the current working directory.
# ZipFile is already imported in the notebook's import cell.
file_name = "heartbeat-sounds.zip"
with ZipFile(file_name, mode='r') as archive:
    # List the archive contents first so the extracted layout is visible in the output.
    archive.printdir()
    print('Extracting all the files now...')
    archive.extractall()
    print('Done!')

In [None]:
# Load the metadata table for set A from the extracted archive and preview it.
set_a = pd.read_csv("./set_a.csv")
set_a.head()

In [None]:
# Build the (fname, label) table for set B from the file names found on disk.
# The label is the text before the first underscore; files whose prefix is not one
# of the three known labels are skipped.
# The listing is sorted because os.listdir returns entries in arbitrary order and
# later cells pick example rows by fixed index.
set_b_directory_list = sorted(os.listdir("./set_b"))
known_set_b_labels = ('extrastole', 'murmur', 'normal')

file_label = []
file_path = []

for file in set_b_directory_list:
    prefix = file.split('_')[0]
    if prefix in known_set_b_labels:
        file_path.append('set_b/' + file)
        file_label.append(prefix)

label_df = pd.DataFrame(file_label, columns=['label'])
path_df = pd.DataFrame(file_path, columns=['fname'])
set_b = pd.concat([path_df, label_df], axis=1)
set_b.head()

In [None]:
# Combine set A and set B into one table of labelled recordings.
# errors="ignore" keeps the cell re-runnable: once the two columns are gone, a second
# run would otherwise raise KeyError.
set_a = set_a.drop(columns=["sublabel", "dataset"], errors="ignore")

data_ab = (
    pd.concat([set_a, set_b])
    .dropna()                  # drop rows with any missing value (e.g. no label)
    .reset_index(drop=True)    # contiguous 0..n-1 index, old index discarded
)
data_ab

In [None]:
# Summary statistics of the merged table.
data_ab.describe()

In [None]:
# Count missing values per column of the merged table.
data_ab.isnull().sum()

In [None]:
def create_waveplot(data, samplingrate, label:str):
    """Draw the waveform of an audio signal, titled with its heart-condition label.

    Args:
        data: audio samples, passed straight to librosa.display.waveshow.
        samplingrate: sample rate forwarded to waveshow as `sr`.
        label: heart-condition name interpolated into the figure title.

    Returns:
        None. The figure is shown with plt.show().
    """
    _, ax = plt.subplots(figsize=(14, 4))
    ax.set_title(f"Waveplot audio untuk kondisi jantung : {label}", fontsize=25, pad=20)
    librosa.display.waveshow(data, sr=samplingrate, ax=ax)
    plt.show()

In [None]:
# Report how many rows of the merged table come from each source folder,
# judged by the folder prefix in the file name.
fname_column = data_ab['fname']
rows_from_a = len(data_ab[fname_column.str.contains('set_a/')])
rows_from_b = len(data_ab[fname_column.str.contains('set_b/')])

print("Pembagian dataset:")
print(f"  - data a = {rows_from_a}")
print(f"  - data b = {rows_from_b}")

In [None]:
# Print the number of distinct labels, then the row count for each label.
unique_labels = data_ab.label.unique()
rows_per_label = data_ab['label'].value_counts()  # computed once, reused in the loop

print("Jumlah label kondisi jantung =",str(len(unique_labels)),"kategori, diantaranya:")

for i in unique_labels:
    print(f"- {i}\t: {rows_per_label[i]}")

In [None]:
# Plot and play one example recording for the first label in data_ab.
heart_label = data_ab.label.unique()[0]
# 19 is an index label of data_ab; it must belong to a row carrying this label.
path = data_ab.fname[data_ab.label == heart_label][19]
# Use the sample rate returned by librosa.load: it resamples while loading, so the
# rate stored in the WAV header would not describe the returned samples.
data, rate = librosa.load(path)
create_waveplot(data, rate, heart_label)
Audio(path)

In [None]:
# Plot and play one example recording for the second label in data_ab.
heart_label = data_ab.label.unique()[1]
# 50 is an index label of data_ab; it must belong to a row carrying this label.
path = data_ab.fname[data_ab.label == heart_label][50]
# Use the sample rate returned by librosa.load: it resamples while loading, so the
# rate stored in the WAV header would not describe the returned samples.
data, rate = librosa.load(path)
create_waveplot(data, rate, heart_label)
Audio(path)

In [None]:
# Plot and play one example recording for the third label in data_ab.
heart_label = data_ab.label.unique()[2]
# 63 is an index label of data_ab; it must belong to a row carrying this label.
path = data_ab.fname[data_ab.label == heart_label][63]
# Use the sample rate returned by librosa.load: it resamples while loading, so the
# rate stored in the WAV header would not describe the returned samples.
data, rate = librosa.load(path)
create_waveplot(data, rate, heart_label)
Audio(path)

In [None]:
# Plot and play one example recording for the fourth label in data_ab.
heart_label = data_ab.label.unique()[3]
# 97 is an index label of data_ab; it must belong to a row carrying this label.
path = data_ab.fname[data_ab.label == heart_label][97]
# Use the sample rate returned by librosa.load: it resamples while loading, so the
# rate stored in the WAV header would not describe the returned samples.
data, rate = librosa.load(path)
create_waveplot(data, rate, heart_label)
Audio(path)

In [None]:
# Plot and play one example recording for the fifth label in data_ab.
heart_label = data_ab.label.unique()[4]
# 146 is an index label of data_ab; it must belong to a row carrying this label.
path = data_ab.fname[data_ab.label == heart_label][146]
# Use the sample rate returned by librosa.load: it resamples while loading, so the
# rate stored in the WAV header would not describe the returned samples.
data, rate = librosa.load(path)
create_waveplot(data, rate, heart_label)
Audio(path)

In [None]:
def noise(data):
    """Return a noisy copy of a 1-D signal; the input array is left untouched.

    Gaussian noise is scaled by a random amplitude: 0.009 times a uniform draw
    times the maximum value of `data`.

    Args:
        data: 1-D numpy array of samples.

    Returns:
        New array of the same length with the noise added.
    """
    # The uniform draw must come before the normal draw to keep the RNG sequence.
    amplitude = 0.009 * np.random.uniform() * np.amax(data)
    gaussian = np.random.normal(size=data.shape[0])
    return data + amplitude * gaussian

def stretch(data, rate=0.6):
    """Time-stretch a signal with librosa.

    Args:
        data: audio samples.
        rate: stretch factor forwarded to librosa.effects.time_stretch.

    Returns:
        The time-stretched samples.
    """
    # `rate` is passed by keyword: newer librosa releases reject it positionally.
    return librosa.effects.time_stretch(data, rate=rate)

def shift(data, samplingrate):
    """Circularly shift a signal by one tenth of `samplingrate` samples.

    Args:
        data: numpy array of samples.
        samplingrate: sample rate; a tenth of it (truncated to an integer) is the
            number of positions the signal is rolled.

    Returns:
        The rolled array (same shape as `data`).
    """
    # np.roll needs an integer shift; samplingrate/10 is a float in Python 3.
    return np.roll(data, int(samplingrate / 10))

def pitch(data, samplingrate, pitch_factor=0.7):
    """Pitch-shift a signal with librosa.

    Args:
        data: audio samples.
        samplingrate: sample rate of `data`, forwarded as `sr`.
        pitch_factor: forwarded as `n_steps` to librosa.effects.pitch_shift.

    Returns:
        The pitch-shifted samples.
    """
    # `sr` and `n_steps` are passed by keyword: newer librosa releases reject them
    # positionally.
    return librosa.effects.pitch_shift(data, sr=samplingrate, n_steps=pitch_factor)

In [None]:
def extract_features(data, samplerate):
    """Summarise an audio signal as one flat feature vector.

    Five librosa features are computed, each averaged over axis 0 of its transpose,
    and concatenated in this order: zero-crossing rate, chroma STFT, MFCC, RMS,
    mel spectrogram.

    Args:
        data: audio samples.
        samplerate: sample rate passed to the features that accept `sr`.

    Returns:
        1-D numpy array holding the concatenated averaged features.
    """
    def averaged(feature_matrix):
        # Same reduction for every feature: mean over axis 0 of the transpose.
        return np.mean(feature_matrix.T, axis=0)

    # Magnitude spectrogram used as the chroma input.
    magnitude = np.abs(librosa.stft(data))

    pieces = [
        np.array([]),  # empty float64 seed, keeps the dtype promotion of the stack
        averaged(librosa.feature.zero_crossing_rate(y=data)),
        averaged(librosa.feature.chroma_stft(S=magnitude, sr=samplerate)),
        averaged(librosa.feature.mfcc(y=data, sr=samplerate)),
        averaged(librosa.feature.rms(y=data)),
        averaged(librosa.feature.melspectrogram(y=data, sr=samplerate)),
    ]
    return np.hstack(pieces)

def get_features(path):
    """Load one audio file and extract features for it and two augmented variants.

    Args:
        path: path of the audio file to load.

    Returns:
        2-D numpy array with three rows, in this order: features of the original
        signal, of the noise-augmented signal, and of the time-stretched then
        pitch-shifted signal.
    """
    # librosa.load resamples while loading, so the matching sample rate is the one it
    # returns; the rate in the file header does not describe the returned samples.
    data, rate = librosa.load(path)

    variants = (
        data,                        # without augmentation
        noise(data),                 # additive noise
        pitch(stretch(data), rate),  # stretching followed by pitch shifting
    )
    return np.vstack([extract_features(variant, rate) for variant in variants])

In [None]:
# Extract features for every recording. get_features returns several rows per file
# (original plus augmented variants); each row gets the file's label.
X, Y = [], []
for path, label in zip(data_ab.fname, data_ab.label):
    feature_rows = get_features(path)
    X.extend(feature_rows)
    Y.extend([label] * len(feature_rows))

In [None]:
# Assemble the feature table (one row per feature vector, label in the last column)
# and write it to disk so the extraction does not have to be repeated.
Features = pd.DataFrame(X).assign(label=Y)
Features.to_csv('features.csv', index=False)
Features.head()

In [None]:
# Print the number of distinct labels, then the number of feature rows per label.
feature_rows_per_label = Features['label'].value_counts()  # computed once

print("Jumlah data fitur setiap label =",str(len(data_ab.label.unique())),"kategori, diantaranya:")

for i in Features.label.unique():
    print(f"- {i}\t: {feature_rows_per_label[i]}")

In [None]:
# Separate the feature matrix from the label column and one-hot encode the labels.
feature_part = Features.iloc[:, :-1]   # every column except the trailing label
X = feature_part.values
Y = Features['label'].values

scaler = StandardScaler()
enc = OneHotEncoder()

# OneHotEncoder expects a 2-D input, hence the single-column reshape.
label_column = np.array(Y).reshape(-1, 1)
Y_Encode = enc.fit_transform(label_column).toarray()

In [None]:
# Show the dimensions of the feature matrix and of the one-hot label matrix.
feature_shape = X.shape
label_shape = Y_Encode.shape
print(f"Ukuran data fitur : {feature_shape}")
print(f"Ukuran data label : {label_shape}")

In [None]:
# Hold out 20% of the rows for testing (fixed seed for a repeatable split).
# NOTE(review): X holds several rows per recording (original + augmented variants) and
# the split is row-wise, so variants of one recording can land on both sides.
x_train, x_test, y_train, y_test = train_test_split(X, Y_Encode, train_size=0.8, random_state=4, shuffle=True)

# Standardise the features with the scaler created alongside the encoder.
# It is fitted on the training rows only so no test statistics leak into training.
x_train = scaler.fit_transform(x_train)
x_test = scaler.transform(x_test)

x_train.shape, y_train.shape, x_test.shape, y_test.shape

In [None]:
# Add a trailing axis of size 1 so each feature vector has shape (n_features, 1),
# matching the input_shape given to the first Conv1D layer.
# Guarded on ndim so that re-running the cell does not append yet another axis.
if x_train.ndim == 2:
    x_train = np.expand_dims(x_train, axis=2)
if x_test.ndim == 2:
    x_test = np.expand_dims(x_test, axis=2)

In [None]:
# Show the dimensions of the training and test inputs after adding the channel axis.
train_shape = x_train.shape
test_shape = x_test.shape
print(f"Ukuran data train : {train_shape}")
print(f"Ukuran data test : {test_shape}")

In [None]:
# Training callbacks.

# Stops training once the training loss has not improved for 3 epochs.
# NOTE(review): only takes effect if it is included in the callbacks passed to fit().
Early_Stopper = EarlyStopping(monitor="loss",patience=3,mode="min")

# Multiplies the learning rate by 0.4 after 2 epochs without val_loss improvement.
# min_lr must be below the optimizer's starting rate (Adam defaults to 0.001);
# with min_lr=0.001 the rate could never actually be lowered.
rlrp = ReduceLROnPlateau(monitor='val_loss', factor=0.4, verbose=0, patience=2, min_lr=1e-5)

# Keeps the weights of the epoch with the best validation accuracy.
# NOTE(review): newer Keras releases may require a ".weights.h5" suffix on filepath
# when save_weights_only=True — confirm against the installed version.
Checkpoint_Model = ModelCheckpoint(monitor="val_accuracy",
                                   mode='max',
                                   save_best_only=True,
                                   save_weights_only=True,
                                   filepath="./modelcheck")

In [None]:
# 1-D convolutional classifier over the feature vector.
# Layout: five Conv1D + MaxPooling1D stages (dropout after all but the first), then a
# dense layer and a softmax output.

# One output unit per one-hot label column instead of a hard-coded class count.
num_classes = y_train.shape[1]

model = Sequential()

# First stage: takes the (n_features, 1) input, no dropout.
model.add(Conv1D(256, kernel_size=5, strides=1, padding='same', activation='relu', input_shape=(x_train.shape[1], 1)))
model.add(MaxPooling1D(pool_size=3, strides=2, padding='same'))

# Remaining stages share one layout and differ only in the number of filters.
for filters in (256, 128, 64, 32):
    model.add(Conv1D(filters, kernel_size=4, strides=1, padding='same', activation='relu'))
    model.add(MaxPooling1D(pool_size=3, strides=2, padding='same'))
    model.add(Dropout(0.3))

model.add(Flatten())
model.add(Dense(units=1024, activation='relu'))
model.add(Dropout(0.3))

model.add(Dense(units=num_classes, activation='softmax'))
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

model.summary()

In [None]:
# Train for 50 epochs in mini-batches of 12, validating on the held-out split.
# Only the learning-rate scheduler and the checkpoint callback are active here.
training_callbacks = [rlrp, Checkpoint_Model]
Conv1D_Model = model.fit(
    x_train,
    y_train,
    batch_size=12,
    epochs=50,
    validation_data=(x_test, y_test),
    callbacks=training_callbacks,
)

In [None]:
# Evaluate on the held-out split; indices 0 and 1 of the result are read downstream
# as loss and accuracy respectively.
Model_Results = model.evaluate(x_test,y_test)

In [None]:
# Report the final test metrics and plot the training history.
# The loss is a cross-entropy value, not a percentage, so it is printed as-is.
print(f"LOSS\t\t: {round(Model_Results[0],4)}")
print(f"ACCURACY\t: {round(Model_Results[1]*100,2)}%")

train_acc = Conv1D_Model.history['accuracy']
train_loss = Conv1D_Model.history['loss']
test_acc = Conv1D_Model.history['val_accuracy']
test_loss = Conv1D_Model.history['val_loss']

# Derive the x axis from the recorded history so the plot still works when training
# ran for a different number of epochs than originally configured.
epochs = list(range(len(train_loss)))

fig , ax = plt.subplots(1,2)
fig.set_size_inches(20,6)

ax[0].plot(epochs , train_loss , label = 'Training Loss')
ax[0].plot(epochs , test_loss , label = 'Testing Loss')
ax[0].set_title('Training & Testing Loss')
ax[0].legend()
ax[0].set_xlabel("Epochs")

ax[1].plot(epochs , train_acc , label = 'Training Accuracy')
ax[1].plot(epochs , test_acc , label = 'Testing Accuracy')
ax[1].set_title('Training & Testing Accuracy')
ax[1].legend()
ax[1].set_xlabel("Epochs")
plt.show()

In [None]:
# Predict on the held-out split and map predictions and targets back to label names.
pred_test = model.predict(x_test)
y_pred = enc.inverse_transform(pred_test)

# y_test is rebound from one-hot rows to label names for the cells below.
# Decode only while it is still numeric, so re-running this cell does not try to
# decode the already-decoded labels.
if np.asarray(y_test).dtype.kind == "f":
    y_test = enc.inverse_transform(y_test)

df = pd.DataFrame(columns=['Predicted Labels', 'Actual Labels'])
df['Predicted Labels'] = y_pred.flatten()
df['Actual Labels'] = y_test.flatten()
df.head(10)

In [None]:
# Confusion matrix of actual vs. predicted labels, drawn as a heatmap.
# enc.categories_ is a list holding one array per encoded column; the label names
# are its first (only) entry.
class_names = enc.categories_[0]

# Passing labels= fixes the row/column order and keeps the matrix the same size as
# the tick labels even if some class is absent from the test split.
cm_counts = confusion_matrix(np.ravel(y_test), np.ravel(y_pred), labels=class_names)
cm = pd.DataFrame(cm_counts, index=class_names, columns=class_names)

plt.figure(figsize = (12, 10))
sns.heatmap(cm, linecolor='white', cmap='Blues', linewidth=1, annot=True, fmt='')
plt.title('Confusion Matrix', size=20)
plt.xlabel('Predicted Labels', size=10)
plt.ylabel('Actual Labels', size=10)
plt.show()

In [None]:
# Per-class metrics; expects y_test and y_pred to hold decoded label names here.
print(classification_report(y_test, y_pred))

In [None]:
# Persist the trained model under a name recording the callbacks used in training.
# NOTE(review): newer Keras releases may require a ".keras" or ".h5" suffix on the
# path — confirm against the installed version.
model.save('Conv1D_model(rlrp,Checkpoint_Model)')