<a href="https://colab.research.google.com/github/Wattage-Wisdom/Coil_Gun/blob/main/Jordan's%20Copy%20PCG_Binary_Label_Data_Preprocess.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Preprocess PCG Dataset 1

Import Proper Libraries and Define Directories

In [None]:
#Mount Google Drive in the Colab environment at /content/drive
from google.colab import drive
drive.mount('/content/drive')

In [None]:
#Import proper libraries
import os
import shutil
import numpy as np
import warnings
warnings.filterwarnings('ignore')
import pandas as pd
import matplotlib.pyplot as plt
import os
import wave
import pylab
from pathlib import Path
from scipy import signal
from scipy.io import wavfile
from sklearn.metrics import confusion_matrix
import itertools
import gc
from tqdm import tqdm
import cv2
from skimage import transform
import random
from sklearn.model_selection import train_test_split

#Use a dark background for the spectrogram images, as lighter colours relate to higher energy
plt.style.use('dark_background')

In [None]:
#Specify directories and create new directories for preprocessed data
input_directory = '/content/drive/MyDrive/Senior_Design/Datasets/PCG/dataset1'

#Create the image folder with one sub-folder per class label.
#os.makedirs(..., exist_ok=True) also creates the missing 'image_data' parent and
#does nothing when a folder already exists, so this cell can be re-run safely
#(os.mkdir raised FileExistsError on every run after the first).
for class_folder in ('abnormal', 'normal'):
    os.makedirs(os.path.join(input_directory, 'image_data', class_folder), exist_ok=True)

Read Wave Files, Convert to Spectrogram, Save in Respective Folder

In [None]:
#Collect every .wav recording found in the dataset folder, in sorted order
wav = sorted(name for name in os.listdir(input_directory) if name.endswith(".wav"))

#Print the label of the first ten recordings.
#Slicing copes with folders holding fewer than ten recordings, where indexing
#wav[i] for i in range(10) raised IndexError.
for wav_name in wav[:10]:
    #The header file shares the recording's base name (recording.wav -> recording.hea);
    #splitext handles a base name of any length
    file_name = os.path.splitext(wav_name)[0]
    with open(os.path.join(input_directory, file_name + '.hea')) as f:
        #The label is the second whitespace-separated token on the header's last line
        last_line = f.read().splitlines()[-1].split()[1]
    print(file_name + " " + last_line)

#Perform garbage collection, force system to reclaim maximum amount of available memory
gc.collect()

In [None]:
#Index (within the sorted wav list) of the recording to preview
preview_index = 6

#Specify sample_rate
sample_rate = 2000

#Open wav file; the context manager closes it even if reading fails
with wave.open(os.path.join(input_directory, wav[preview_index]), 'r') as signal_wave:
    #Read sample_rate frames and interpret the bytes as 16-bit integers
    raw_signal = np.frombuffer(signal_wave.readframes(sample_rate), dtype=np.int16)

#Normalize data into the range [-0.5, 0.5].
#Convert to float first: in int16 arithmetic both (max - min) and (sig - min)
#can exceed 32767 and silently wrap around.
#max_data / min_data are simply reassigned here; deleting them beforehand raised
#NameError on a fresh kernel where they did not exist yet.
sig = raw_signal.astype(np.float64)
max_data = np.max(sig)
min_data = np.min(sig)
norm_signal = (sig - min_data)/(max_data - min_data)
sig = norm_signal - 0.5

#Create one figure for visualization, plot amplitude vs time
plt.figure(figsize=(12,12))
sig_plot = plt.subplot(211)
sig_plot.set_title(wav[preview_index])
sig_plot.plot(sig)
sig_plot.set_xlabel('Sample Rate * Time')
sig_plot.set_ylabel('Energy')

#Plot the spectrogram of the signal above for visualization
spectogram_plot = plt.subplot(212)
spectogram_plot.specgram(sig, NFFT=1024, Fs=sample_rate, noverlap=900)
spectogram_plot.set_xlabel('Time')
spectogram_plot.set_ylabel('Frequency')

#Display the plot
plt.show()

#Perform garbage collection, force system to reclaim maximum amount of available memory
gc.collect()

In [None]:
#Define function to limit the signal to a fixed duration (5 seconds by default)
def Limit(S,Fs,seconds=5):
    """Truncate or zero-pad a signal so that it holds exactly seconds*Fs samples.

    Parameters
    ----------
    S : one-dimensional sequence of samples.
    Fs : int, sampling rate in samples per second.
    seconds : int, target duration in seconds (defaults to 5).

    Returns
    -------
    The first seconds*Fs samples of S when S is at least that long, otherwise
    S followed by zeros up to that length.
    """
    target_len = seconds*Fs
    if(len(S) >= target_len):
        return S[:target_len]

    #Pad with a single np.append call. Appending one zero per missing sample
    #copied the whole array on every iteration (quadratic time).
    #The block of integer zeros gives the same dtype promotion as appending
    #the Python int 0 did.
    return np.append(S, np.zeros(target_len - len(S), dtype=int))

#Define function to retrieve sound and frame info from sample
def get_wav_info(wav_file):
    """Read every frame of a wav file.

    Parameters
    ----------
    wav_file : path of the wav file to read.

    Returns
    -------
    (sound_info, frame_rate) : the file's frames interpreted as an int16
    numpy array, and the frame rate stored in the file.
    """
    #The context manager closes the file even when reading raises
    with wave.open(wav_file, 'r') as wav_reader:
        frames = wav_reader.readframes(wav_reader.getnframes())
        frame_rate = wav_reader.getframerate()
    sound_info = np.frombuffer(frames, 'int16')
    return sound_info, frame_rate

#Define function to retrieve a sample's class label
def get_class(hea_file):
    """Return the second whitespace-separated token found on the last line of
    the sample's .hea header file inside input_directory."""
    header_path = '/'.join((input_directory, hea_file + '.hea'))
    with open(header_path) as header:
        final_line = header.read().splitlines()[-1]
    return final_line.split()[1]

#Specify output directory where the images of spectrogram are stored
output_directory = './image_data/'

#Loop to convert all wave files to spectrogram
for filename in wav:

    #Make sure only wav files are read
    if not filename.endswith(".wav"):
        continue

    file_path = os.path.join(input_directory, filename)
    file_stem = Path(file_path).stem

    #Get the target directory (lower-cased class label) and create it when it
    #is missing; savefig fails if the folder does not exist
    target_dir = f'{get_class(file_stem)}'
    dist_dir = output_directory+target_dir.lower()
    os.makedirs(dist_dir, exist_ok=True)
    file_dist_path = os.path.join(dist_dir, file_stem)

    #Skip recordings whose spectrogram has already been saved
    if os.path.exists(file_dist_path + '.png'):
        continue

    #Convert signal to spectrogram
    sound_info, frame_rate = get_wav_info(file_path)

    #Normalize into [-0.5, 0.5] using floats: in int16 arithmetic both
    #(max - min) and (sig - min) can exceed 32767 and silently wrap around
    sig = np.asarray(Limit(sound_info,frame_rate), dtype=np.float64)
    max_data = np.max(sig)
    min_data = np.min(sig)
    norm_signal = (sig - min_data)/(max_data - min_data)
    sig = norm_signal - 0.5
    pylab.specgram(sig, Fs=frame_rate)

    #Save spectrogram as png, then close the figure so figures do not pile up
    pylab.savefig(f'{file_dist_path}.png')
    pylab.close()

#Perform garbage collection, force system to reclaim maximum amount of available memory
gc.collect()

Read All Images, Convert to NumPy Arrays with Class Labels

In [None]:
#Every image is resized to this fixed size
IMAGE_HEIGHT, IMAGE_WIDTH = 128, 128

#Number of colour channels kept per image
N_CHANNELS = 3

#Number of class labels
N_CLASSES = 2

#Define function to load data
def load_data(data,num_classes,class_label):
    """Load the spectrogram images stored under `data` into numpy arrays.

    Parameters
    ----------
    data : path of a folder containing 'abnormal' and/or 'normal' sub-folders.
    num_classes, class_label : kept so existing calls keep working; not used.

    Returns
    -------
    X : array of images, each resized to (IMAGE_HEIGHT, IMAGE_WIDTH, N_CHANNELS).
    y : array of labels, 1 for images from 'abnormal' and 0 for 'normal'.
    """
    #It is binary classification so we have two class labels 0 and 1
    label_by_folder = {'abnormal': 1, 'normal': 0}

    X = []
    y = []

    #Sorted listings make the sample order identical from run to run
    for file_type in sorted(os.listdir(data)):

        #Only the two class folders are read. Any other entry used to reuse the
        #previous folder's label, or raise NameError if it came first.
        if file_type.startswith('.') or file_type not in label_by_folder:
            continue

        class_dir = os.path.join(data, file_type)
        if not os.path.isdir(class_dir):
            continue

        label = label_by_folder[file_type]

        for filename in tqdm(sorted(os.listdir(class_dir))):
            #cv2.imread returns None for files it cannot decode; skip those.
            #NOTE: cv2.imread yields channels in BGR order; that order is kept unchanged.
            image = cv2.imread(os.path.join(class_dir, filename))
            if image is None:
                continue

            #Resize the image to the fixed size expected downstream
            image = transform.resize(image, (IMAGE_HEIGHT, IMAGE_WIDTH, N_CHANNELS))

            #ADD AUGMENTATION IN PYTORCH DATALOADER

            X.append(np.asarray(image))
            y.append(label)

    return np.asarray(X), np.asarray(y)

#Load every spectrogram image together with its label, ready to be split
X_data, y_data = load_data('./image_data/', N_CLASSES, 1)
gc.collect()

View Sample Data & Perform Test/Training Split

In [None]:
#Print count of each label to see imbalance
#y_data holds the labels returned by load_data
unique, counts = np.unique(y_data, return_counts=True)
dict(zip(unique, counts))

In [None]:
#Plot nine randomly chosen samples (with labels) for visualization
plt.figure(figsize=(12, 12))

for i in range(9):
    ax = plt.subplot(3, 3, i + 1)
    #randrange excludes the upper bound, so the index is always valid;
    #random.randint(0, n) can return n itself and raise IndexError
    rand_num = random.randrange(len(X_data))
    plt.imshow(X_data[rand_num])
    plt.title(int(y_data[rand_num]))
    plt.axis("off")

plt.show()

In [None]:
#Split dataset into training and test bins (X_data / y_data come from load_data)
xTrain, xTest, yTrain, yTest = train_test_split(X_data, y_data, random_state=42, shuffle=True, test_size=0.2) #If the class imbalance skews the split, try adding stratify=y_data

#Perform one hot encoding of the labels.
#Row k of the identity matrix is the one-hot vector of class k, so indexing the
#matrix with the integer label array encodes every label at once.
#This replaces to_categorical, which is not imported anywhere in this notebook.
one_hot_rows = np.eye(2, dtype=np.uint8)
y_trainHot = one_hot_rows[yTrain]
y_testHot = one_hot_rows[yTest]

#Perform garbage collection, force system to reclaim maximum amount of available memory
gc.collect()

Now That the Data Has Been Preprocessed, It Can Be Loaded Into a PyTorch Network & Trained