In [29]:
# Performance Improvement in Deep Learning Architecture for Phonocardiogram Signal Classification using Spectrogram

In [30]:
import os
import shutil

# Create the output folders for the spectrogram images, one sub-folder per class.
# exist_ok=True keeps this cell re-runnable: os.mkdir raised FileExistsError on a
# second run unless the folders had been removed by hand first.
# To force every image to be regenerated, delete the tree beforehand with
# shutil.rmtree('/kaggle/working/image_data').
for class_dir in ('abnormal', 'normal'):
    os.makedirs(os.path.join('/kaggle/working/image_data', class_dir), exist_ok=True)

In [31]:
#Imports used
import numpy as np

import warnings 
warnings.filterwarnings('ignore')

import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
import os
import wave
import pylab
from pathlib import Path
from scipy import signal
from scipy.io import wavfile
from sklearn.metrics import confusion_matrix
import itertools
import gc


plt.style.use('dark_background')

## Loading the signals from the .wav format

In [32]:
#Input directory where the wav files are stored
input_directory = '../input/spectogram/data'

#Keep only the .wav recordings, in a stable (sorted) order
wav = sorted(name for name in os.listdir(input_directory) if name.endswith(".wav"))

#Preview the class label of the first few recordings.
#Slicing (instead of range(10)) avoids an IndexError when fewer than 10 files exist.
for wav_name in wav[:10]:

    #The header shares the recording's base name; Path.stem drops the ".wav" suffix
    #whatever the length of the name (the fixed 5/6-character slices did not)
    file_name = Path(wav_name).stem

    with open(input_directory+'/'+file_name+'.hea') as f:
        #The label is the second token on the last line of the header
        lines = f.read().splitlines()
        last_line = lines[-1].split()[1]
        print(file_name + " " + last_line)

## Reading the signals and normalizing them

In [33]:
#Recording used for the preview plot (index 6, or the last one if fewer files exist)
preview_name = wav[min(6, len(wav) - 1)]

#Maximum number of frames to read for the preview
max_preview_frames = 50000

#Opening the wav file; the context manager closes the handle once the frames are read
with wave.open(os.path.join(input_directory, preview_name), 'r') as signal_wave:
    #Use the rate stored in the file so the spectrogram axes are in real units
    #(previously Fs was hard-coded to 50000 regardless of the recording)
    sample_rate = signal_wave.getframerate()
    sig = np.frombuffer(signal_wave.readframes(max_preview_frames), dtype=np.int16)

#Min-max normalise to [-0.5, 0.5]. Cast to float first: int16 arithmetic wraps around
#when max - min exceeds 32767, which corrupts the normalised signal.
sig = sig.astype(np.float64)
max_data = np.max(sig)
min_data = np.min(sig)
data_range = max_data - min_data
if data_range == 0:
    #A constant signal has no range; use the mid-point instead of dividing by zero
    norm_signal = np.full_like(sig, 0.5)
else:
    norm_signal = (sig - min_data)/data_range
sig = norm_signal - 0.5

#Creating the figure
plt.figure(figsize=(12,12))
sig_plot = plt.subplot(211)
sig_plot.set_title(preview_name)
sig_plot.plot(sig)
sig_plot.set_xlabel('Sample Rate * Time')
sig_plot.set_ylabel('Energy')

spectogram_plot = plt.subplot(212)
spectogram_plot.specgram(sig, NFFT=1024, Fs=sample_rate, noverlap=900)
spectogram_plot.set_xlabel('Time')
spectogram_plot.set_ylabel('Frequency')

plt.show()
gc.collect()


## Step 1 : Read all the .wav files, convert them to spectrograms and save them as png images in their respective class folders

In [34]:
#To limit the signal to a fixed duration (5 seconds by default)
def Limit(S,Fs,seconds=5):
    """Truncate or zero-pad a 1-D signal to exactly ``seconds * Fs`` samples.

    Parameters
    ----------
    S : array-like
        One-dimensional signal samples.
    Fs : int
        Sampling rate of ``S`` in samples per second.
    seconds : int, optional
        Target duration; defaults to 5, the length that was hard-coded before.

    Returns
    -------
    numpy.ndarray or the sliced input
        The first ``seconds * Fs`` samples of ``S`` when it is long enough,
        otherwise ``S`` followed by zeros up to that length.
    """
    target_len = int(seconds*Fs)
    missing = target_len - len(S)
    if missing <= 0:
        return S[:target_len]
    #Pad in one call: appending one zero per iteration copied the whole array each
    #time (quadratic). The zeros use the default integer dtype, exactly like the
    #scalar 0 appended before, so the dtype of the result is unchanged.
    return np.append(S, np.zeros(missing, dtype=int))

#Function to get sound and frame info
def get_wav_info(wav_file):
    """Read every frame of a wav file and return the samples with the frame rate.

    Parameters
    ----------
    wav_file : str or file-like
        Path (or open binary file) handed to ``wave.open``.

    Returns
    -------
    sound_info : numpy.ndarray
        The raw frame bytes reinterpreted as int16 values.
    frame_rate : int
        Sampling frequency reported by the wav header.
    """
    #The context manager closes the file even if reading raises; the reader is not
    #named ``wav`` so it no longer shadows the module-level list of file names
    with wave.open(wav_file, 'r') as wav_reader:
        frames = wav_reader.readframes(wav_reader.getnframes())
        frame_rate = wav_reader.getframerate()
    #NOTE(review): the bytes are always read as int16 - assumes 16-bit samples; confirm for new data
    sound_info = np.frombuffer(frames, 'int16')
    return sound_info, frame_rate

#Look up the class label of a recording
def get_class(hea_file):
    """Return the label stored in the header that belongs to ``hea_file``.

    ``hea_file`` is the recording's base name (no extension); the header is read
    from ``input_directory``. The label is the second whitespace-separated token
    on the header's last line.
    """
    header_path = f"{input_directory}/{hea_file}.hea"
    with open(header_path) as header:
        final_line = header.read().splitlines()[-1]
    return final_line.split()[1]

#Output directory where the images of spectrogram are stored
output_directory = './image_data/'

for filename in wav:

    #Making sure only wav files are read (suffix check rather than a substring match)
    if not filename.endswith(".wav"):
        continue

    file_path = os.path.join(input_directory, filename)
    file_stem = Path(file_path).stem

    #Getting the target directory (the class label in lower case)
    target_dir = f'{get_class(file_stem)}'
    class_name = target_dir.lower()

    #Fail with a clear message on an unexpected label instead of an obscure savefig error
    if class_name not in ('abnormal', 'normal'):
        raise ValueError(f"Unexpected class label {target_dir!r} for {filename}")

    dist_dir = output_directory+class_name

    #Make sure the class folder exists so savefig cannot fail on a missing directory
    os.makedirs(dist_dir, exist_ok=True)

    file_dist_path = os.path.join(dist_dir, file_stem)

    #Images that already exist are kept, so re-running only converts new recordings
    if os.path.exists(file_dist_path + '.png'):
        continue

    #Converting signal to spectrogram and saving as png
    sound_info, frame_rate = get_wav_info(file_path)

    #Cast to float before normalising: the samples are int16 and int16 arithmetic
    #wraps around when max - min exceeds 32767
    sig = np.asarray(Limit(sound_info,frame_rate), dtype=np.float64)
    max_data = np.max(sig)
    min_data = np.min(sig)
    data_range = max_data - min_data
    if data_range == 0:
        #Constant (e.g. silent) recording: avoid 0/0 and use the mid-point everywhere
        norm_signal = np.full_like(sig, 0.5)
    else:
        norm_signal = (sig - min_data)/data_range
    sig = norm_signal - 0.5

    pylab.specgram(sig, Fs=frame_rate)
    pylab.savefig(f'{file_dist_path}.png')
    pylab.close()

gc.collect()

## Step 2 : Read all images and their class labels and convert them to numpy arrays

#### 1. Read the image
#### 2. Resize image to 128 x 128 x 3 
#### 3. Scale pixel values to the [0, 1] float range (done by `transform.resize`, which converts the image to float; no separate per-image min/max step is applied)
#### 4. Oversample the chosen class if needed (to disable augmentation, pass a 3rd input that is greater than or equal to the number of classes)

In [None]:
#To read the images into an array
from tqdm import tqdm
import cv2

#Shape every spectrogram image is resized to: height, width, colour channels
IMAGE_HEIGHT, IMAGE_WIDTH, N_CHANNELS = 128, 128, 3

#Number of target classes
N_CLASSES = 2

from skimage import transform
from tensorflow.keras import layers

def load_data(data,num_classes,class_label,augmen_times):
    """Load the spectrogram images under ``data`` into arrays, optionally oversampling one class.

    Parameters
    ----------
    data : str
        Folder containing one sub-folder per class ('abnormal' and 'normal').
    num_classes : int
        Number of classes; augmentation only happens when ``class_label < num_classes``.
    class_label : int
        Label (1 = abnormal, 0 = normal) of the class to oversample. Pass a value
        greater than or equal to ``num_classes`` to disable augmentation.
    augmen_times : int
        Number of randomly flipped/rotated copies added per image of that class.

    Returns
    -------
    X : numpy.ndarray
        The images, each resized to (IMAGE_HEIGHT, IMAGE_WIDTH, N_CHANNELS).
    y : numpy.ndarray
        Integer label of every image in ``X``.
    """
    #Folder name -> label. It is binary classification: ABNORMAL is 1, NORMAL is 0
    folder_labels = {'abnormal': 1, 'normal': 0}

    X = []
    y = []

    #Build the augmentation pipeline once instead of once per image
    data_augmentation = None
    if class_label < num_classes:
        data_augmentation = tf.keras.Sequential([
            layers.experimental.preprocessing.RandomFlip("horizontal_and_vertical"),
            layers.experimental.preprocessing.RandomRotation(0.2)])

    #To traverse every spectrogram image in data
    for file_type in os.listdir(data):

        #Skip hidden entries and anything that is not a known class folder. Previously an
        #unknown folder reused the label of the preceding one (or raised NameError if first).
        if file_type.startswith('.') or file_type not in folder_labels:
            continue

        class_dir = os.path.join(data, file_type)
        if not os.path.isdir(class_dir):
            continue

        label = folder_labels[file_type]

        for filename in tqdm(os.listdir(class_dir)):
            #To read every image from the folders
            image = cv2.imread(os.path.join(class_dir, filename))

            #cv2.imread returns None for files it cannot decode; skip those
            if image is None:
                continue

            #To resize the random sized images into a fixed size of 128x128x3
            image = transform.resize(image, (IMAGE_HEIGHT, IMAGE_WIDTH, N_CHANNELS))

            #Changing the datatype into array to process through the cnn algorithm
            image_data_as_arr = np.asarray(image)

            del image

            #Add augmented copies for the class selected for oversampling
            if data_augmentation is not None and label == class_label:
                for _ in range(augmen_times):
                    #training=True explicitly requests the random transformations
                    augmented_image = data_augmentation(image_data_as_arr, training=True)
                    #Store a numpy array so X does not mix tensors with arrays
                    X.append(np.asarray(augmented_image))
                    y.append(label)

            #Appending the original image and its label
            X.append(image_data_as_arr)
            y.append(label)

    X = np.asarray(X)
    y = np.asarray(y)

    return X,y


#Loading the train data; the abnormal class (label 1) gets 3 augmented copies per image.
#NOTE(review): augmentation runs before the train/test split further down, so augmented
#copies of one image can end up in both splits - confirm this is intended.
X_train, y_train = load_data(r'./image_data/',
                             num_classes=2,
                             class_label=1,
                             augmen_times=3)
gc.collect()

## Print the count of each class to check for class imbalance in the dataset

In [None]:
#Count how many samples carry each label and show the result as {label: count}
unique, counts = np.unique(y_train, return_counts=True)
{label: count for label, count in zip(unique, counts)}

In [None]:
import random
#Plotting a random sample of the spectrogram images
plt.figure(figsize=(12, 12))

for i in range(9):
    ax = plt.subplot(3, 3, i + 1)
    #randrange excludes len(X_train); randint(0, len(X_train)) could return it and
    #raise an IndexError
    rand_num = random.randrange(len(X_train))
    #The images were read with cv2.imread (BGR channel order) while imshow expects RGB,
    #so reverse the channel axis for display only
    plt.imshow(X_train[rand_num][..., ::-1])
    plt.title(int(y_train[rand_num]))
    plt.axis("off")

plt.show()

## Step 3 : Split the dataset into train and test and perform one hot encoding on class labels

In [None]:
from sklearn.model_selection import train_test_split
#keras.utils.np_utils is not available in recent Keras releases; this path exposes
#the same to_categorical function
from tensorflow.keras.utils import to_categorical

#Splitting the train and test data. Stratifying keeps the normal/abnormal ratio
#the same in both splits.
xTrain, xTest, yTrain, yTest = train_test_split(X_train, y_train,
                                                random_state=42,
                                                test_size=0.2,
                                                stratify=y_train)

#One hot encoding the labels
y_trainHot = np.uint8(to_categorical(yTrain, num_classes = 2))
y_testHot = np.uint8(to_categorical(yTest, num_classes = 2))

gc.collect()

## Step 4 : Make the CNN model and train the model with the respective parameters

In [None]:
#The main CNN architecture
import keras
from keras import models
from keras.models import Sequential
from keras.layers import Dense, Dropout, Activation, Flatten, Conv2D, MaxPooling2D, MaxPool2D
from sklearn.metrics import classification_report, confusion_matrix

#Human-readable name for each binary class index used by the CNN
bin_labels = {
    0: 'Normal',
    1: 'Abnormal',
}

def CNN(imgs,img_labels,test_imgs,test_labels,stride,epochs=30):
    """Build, train and evaluate the spectrogram CNN.

    Parameters
    ----------
    imgs : numpy.ndarray
        Training images; the shape after the first (sample) axis is used as the
        network's input shape.
    img_labels : array-like
        One-hot training labels; the length of a row sets the number of output units.
    test_imgs, test_labels
        Held-out images and one-hot labels, used for the evaluations and as
        ``validation_data`` while fitting.
    stride : int or tuple
        Stride of the first convolution layer.
    epochs : int, optional
        Number of training epochs (default 30, the value that was hard-coded before).

    Returns
    -------
    model : keras.models.Sequential
        The trained model.
    confusion_mtx : list
        ``[Y_true, Y_pred_classes]`` - the true and predicted class indices of the
        test set (the raw label vectors, not a tabulated confusion matrix).
    """
    #Number of classes (2)
    num_classes = len(img_labels[0])

    #Input shape taken from the data (rows, cols, channels) instead of assuming 3 channels
    input_shape = tuple(imgs.shape[1:])

    #Creating the model
    model = Sequential()

    #First convolution layer
    model.add(Conv2D(32, kernel_size=(3, 3),
                     activation='relu',
                     input_shape=input_shape,
                     strides=stride))

    #First maxpooling layer
    model.add(MaxPooling2D(pool_size=(2, 2)))

    model.add(Dropout(0.2))

    #Second convolution layer
    model.add(Conv2D(64, (3, 3), activation='relu'))

    #Second maxpooling layer
    model.add(MaxPooling2D(pool_size=(2, 2)))

    #Third convolution layer
    model.add(Conv2D(128, (3, 3), activation='relu'))

    #Third maxpooling layer
    model.add(MaxPooling2D(pool_size=(2, 2)))

    model.add(Dropout(0.2))

    #Flatten the feature maps into a vector for the dense layers
    model.add(Flatten())

    #First fully connected layer with 128 units
    model.add(Dense(128, activation='relu'))

    model.add(Dropout(0.2))

    #Second fully connected layer with 128 units
    model.add(Dense(128, activation='relu'))

    #Third fully connected layer with 128 units
    model.add(Dense(128, activation='relu'))

    #Final dense layer on which softmax function is performed
    model.add(Dense(num_classes, activation='softmax'))

    #Model parameters
    model.compile(loss='categorical_crossentropy',
                  optimizer='adamax',
                  metrics=['binary_accuracy'])

    #Evaluate the untrained model on the test data to get a baseline
    score = model.evaluate(test_imgs,test_labels, verbose=1)

    print('\nKeras CNN binary accuracy:', score[1],'\n')

    #Train the model; the test data doubles as validation data for per-epoch monitoring
    history = model.fit(imgs,img_labels,
                        shuffle = True,
                        epochs=epochs,
                        validation_data = (test_imgs, test_labels))

    #Evaluate the model on the test data after training
    score = model.evaluate(test_imgs,test_labels, verbose=1)
    print('\nKeras CNN binary accuracy:', score[1],'\n')

    #Predict the labels from test data
    y_pred = model.predict(test_imgs)
    Y_pred_classes = np.argmax(y_pred,axis=1)
    Y_true = np.argmax(test_labels,axis=1)

    #Print the first correctly classified test sample as a sanity check
    for i in range(len(Y_true)):
        if(Y_pred_classes[i] == Y_true[i]):
            print("The predicted class is : " , Y_pred_classes[i])
            print("The real class is : " , Y_true[i])
            break

    #True and predicted class indices, returned together so the caller can build a
    #confusion matrix from them
    confusion_mtx = [Y_true, Y_pred_classes]

    #Summary of the model
    model.summary()

    return model,confusion_mtx
   
#Train the CNN with a stride of 1 in the first convolution layer
model, conf_mat = CNN(xTrain, y_trainHot, xTest, y_testHot, stride=1)

In [None]:
gc.collect()

from sklearn.metrics import precision_recall_fscore_support

#Class indices predicted for the test images and the matching true indices
y_pred = model.predict(xTest)
Y_pred_classes = y_pred.argmax(axis=1)
Y_true = y_testHot.argmax(axis=1)

#Precision, recall and F1 with average='binary'; the fourth return value is discarded
prec, recall, f1, _ = precision_recall_fscore_support(Y_true, Y_pred_classes, average='binary')

for metric_label, metric_value in (("Precision : ", prec),
                                   ("Recall : ", recall),
                                   ("F1 score : ", f1)):
    print(metric_label, metric_value)