## InfoGAN based Respiratory Disease Detection from respiratory sounds with Deep neural network

This is a deep learning based model using a deep neural network to detect respiratory diseases from the respiratory sounds. 

Before being fed to the deep network, the sounds are pre-processed for noise reduction. 

They are then fed to an independent component analysis module for removing heart sounds.

The processed audio is then sliced into frames targeting the respiratory cycles, and MFCC features are extracted from each frame. 

The extracted features are then fed into an Information-maximising GAN (InfoGAN), which augments the data with new synthetic samples to counter the class imbalance in the dataset.

These audios are now fed to a deep neural network to detect the respiratory disease present in the audio.

The proposed model performs better in prediction in both training and test sets.

Importing Required Libraries

In [None]:
from scipy import signal
from scipy.signal import kaiserord, lfilter, firwin, freqz
import numpy as np
import librosa
from pylab import figure, clf, plot, xlabel, ylabel, xlim, ylim, title, grid, axes, show
import matplotlib.pyplot as plt
import soundfile as sf
import os
import librosa
from sklearn.decomposition import PCA

Independent Component Analysis

In [None]:
def g(x):
    """Contrast function of the ICA update: element-wise hyperbolic tangent of ``x``."""
    squashed = np.tanh(x)
    return squashed
def g_der(x):
    """Return the derivative of the tanh contrast function, ``1 - tanh(x)**2``.

    Args:
        x: Scalar or array-like input.

    Returns:
        The element-wise derivative, with the same shape as ``x``.
    """
    # Evaluate tanh once and reuse it; the formula is specific to the tanh
    # contrast function implemented by ``g``.
    t = np.tanh(x)
    return 1 - t * t
def center(X):
    """Subtract from every signal its own mean.

    The ICA code in this file stores one signal per row (``ica`` uses
    ``X.shape[0]`` as the number of components and unmixes with
    ``np.dot(W, X)``), so the mean is taken along the last axis (over samples).
    Averaging over axis 0 would instead subtract the mean *across signals* at
    each time step and leave the individual signals non-zero-mean.

    Args:
        X: Array-like of shape (n_signals, n_samples); a 1-D input is treated
            as a single signal.

    Returns:
        A new ``numpy.ndarray`` of the same shape in which every row has zero mean.
    """
    X = np.array(X)
    return X - X.mean(axis=-1, keepdims=True)
def whitening(X):
    """Whiten the signals in ``X`` so that their covariance becomes the identity.

    Computes ``E @ diag(d ** -0.5) @ E.T @ X`` where ``d`` and ``E`` are the
    eigenvalues and eigenvectors of ``np.cov(X)``.

    Args:
        X: Array of shape (n_signals, n_samples), one signal per row.

    Returns:
        The whitened array, same shape as ``X``.
    """
    cov = np.cov(X)
    d, E = np.linalg.eigh(cov)
    # Floor the eigenvalues at machine epsilon: a rank-deficient covariance
    # (e.g. an all-zero row in X) has zero eigenvalues, and round-off can make
    # them slightly negative. Inverting / taking the square root of those
    # would raise or produce NaNs.
    inv_sqrt = 1.0 / np.sqrt(np.maximum(d, np.finfo(d.dtype).eps))
    # E * inv_sqrt scales the columns of E, i.e. it equals E @ diag(inv_sqrt).
    return np.dot(E * inv_sqrt, np.dot(E.T, X))

def calculate_new_w(w, X):
    """Perform one fixed-point update of an unmixing vector and normalise it.

    Args:
        w: Current unmixing vector, one weight per row of ``X``.
        X: Signal matrix with one signal per row.

    Returns:
        The updated vector, scaled to unit Euclidean length.
    """
    projection = np.dot(w.T, X)
    updated = (X * g(projection)).mean(axis=1) - g_der(projection).mean() * w
    length = np.sqrt(np.square(updated).sum())
    return updated / length

In [None]:
def ica(X, iterations, tolerance=1e-5):
    """Estimate independent components of ``X`` one unmixing vector at a time.

    Args:
        X: Array of shape (n_signals, n_samples), one mixed signal per row.
        iterations: Maximum number of update steps per component.
        tolerance: An update loop stops early once ``| |w . w_new| - 1 |``
            drops below this value.

    Returns:
        ``np.dot(W, X)`` computed on the centered data, where row ``k`` of
        ``W`` is the k-th estimated unmixing vector.
    """
    X = center(X)
    # The whitening step is disabled in this implementation.
    n_components = X.shape[0]
    W = np.zeros((n_components, n_components), dtype=X.dtype)
    for row in range(n_components):
        # Every component starts from a fresh random vector (unseeded).
        w = np.random.rand(n_components)
        for _ in range(iterations):
            candidate = calculate_new_w(w, X)
            if row > 0:
                # Deflation: remove the projections onto the vectors found so far.
                found = W[:row]
                candidate -= np.dot(np.dot(candidate, found.T), found)
            converged = np.abs(np.abs((w * candidate).sum()) - 1) < tolerance
            w = candidate
            if converged:
                break
        W[row, :] = w
    return np.dot(W, X)

In [None]:
def plot_mixture_sources_predictions(X,  S):
    """Plot the mixed signals and the estimated sources in one figure.

    The figure uses a 3-row grid; the mixtures go in the first row and the
    predicted sources in the third row (the middle row is left unused).

    Args:
        X: Iterable of mixed signals, each drawn as one line.
        S: Iterable of predicted source signals, each drawn as one line.
    """
    figure = plt.figure()
    panels = ((1, X, "mixtures"), (3, S, "predicted sources"))
    for slot, signals, heading in panels:
        plt.subplot(3, 1, slot)
        for trace in signals:
            plt.plot(trace)
        plt.title(heading)

    figure.tight_layout()
    plt.show()

In [None]:
def get_label(pid, label_file='C:\\Users\\Dell\\Desktop\\CIP\\data.txt'):
  """Look up the label recorded for a patient id.

  The label file is whitespace separated; the first field of a line is the
  patient id and the second field is the label.

  Args:
    pid: Patient id; compared as ``str(pid)`` against the first field.
    label_file: Path of the label file. Defaults to the path this notebook
      originally hard-coded.

  Returns:
    The label of the first matching line, or ``None`` when no line matches.
  """
  wanted = str(pid)
  with open(label_file) as f:
    # Stream the file instead of loading every line into memory.
    for line in f:
      fields = line.split()
      # Blank or single-field lines cannot carry a label; skip them instead
      # of failing with an IndexError.
      if len(fields) >= 2 and fields[0] == wanted:
        return fields[1]
  return None


In [None]:
def get_label(pid, label_file='/content/data.txt'):
  """Look up the label recorded for a patient id.

  The label file is whitespace separated; the first field of a line is the
  patient id and the second field is the label.

  Args:
    pid: Patient id; compared as ``str(pid)`` against the first field.
    label_file: Path of the label file. Defaults to the path this notebook
      originally hard-coded.

  Returns:
    The label of the first matching line, or ``None`` when no line matches.
  """
  wanted = str(pid)
  with open(label_file) as f:
    # Stream the file instead of loading every line into memory.
    for line in f:
      fields = line.split()
      # Blank or single-field lines cannot carry a label; skip them instead
      # of failing with an IndexError.
      if len(fields) >= 2 and fields[0] == wanted:
        return fields[1]
  return None

Frame Slicing And Feature Extraction

In [None]:
def get_mfcc(s):
  """Turn one audio signal into a (40, 1) feature vector.

  The signal is cut into non-overlapping frames of 11025 samples. Each frame
  is cut again into sub-frames of 1103 samples (hop 1102); 13 MFCCs are
  computed per sub-frame and averaged over time. The per-frame MFCCs are
  flattened to 130 values, and a 40-component PCA is fitted on the resulting
  (40, 130) matrix.

  The reshapes are hard-coded, so ``s`` must yield exactly 40 outer frames of
  10 sub-frames each (441000 samples satisfy this).

  Args:
    s: 1-D audio signal.

  Returns:
    ``numpy.ndarray`` of shape (40, 1) holding the singular values of the
    fitted PCA.
  """
  # librosa arguments are passed by keyword: recent librosa releases make
  # them keyword-only, and the keyword form is accepted by older ones too.
  frames = librosa.util.frame(s, frame_length=11025, hop_length=11025, axis=0)
  nm = []
  for frame in frames:
    in_frames = librosa.util.frame(frame, frame_length=1103, hop_length=1102, axis=0)
    mfccs = [
        np.mean(librosa.feature.mfcc(y=in_frame, n_mfcc=13, sr=22050), axis=1)
        for in_frame in in_frames
    ]
    nm.append(np.array(mfccs).reshape(130*1))

  # Principal Component Analysis
  pca = PCA(n_components=40)
  pca.fit(np.array(nm).reshape(40,130))
  return np.array(pca.singular_values_).reshape(40,1)

In [None]:
# Mount Google Drive inside the Colab runtime at /content/drive.
from google.colab import drive  
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# File names of the recordings whose features are additionally collected into
# the held-out lists TEST_DATA / TEST_LABELS by the extraction loop.
TEST_NAMES=[
            "122_2b3_Tc_mc_LittC2SE.wav",
            "226_1b1_Al_sc_Meditron.wav",
            "219_2b2_Ar_mc_LittC2SE.wav",
            "191_2b1_Pl_mc_LittC2SE.wav",
            "191_2b1_Pr_mc_LittC2SE.wav",
            "140_2b3_Ll_mc_LittC2SE.wav",
            "111_1b3_Tc_sc_Meditron.wav",
            "168_1b1_Al_sc_Meditron.wav",
            "201_1b3_Ar_sc_Meditron.wav",
            "116_1b2_Pl_sc_Meditron.wav",
            "196_1b1_Pr_sc_Meditron.wav",
            "169_1b2_Ll_sc_Meditron.wav",
            "169_1b1_Lr_sc_Meditron.wav",
            "173_1b1_Al_sc_Meditron.wav",
            "206_1b1_Ar_sc_Meditron.wav",
            "161_1b1_Pl_sc_Meditron.wav",
            "167_1b1_Pr_sc_Meditron.wav",
            "149_1b1_Lr_sc_Meditron.wav",
            "105_1b1_Tc_sc_Meditron.wav",
            "131_1b1_Al_sc_Meditron.wav",
            "119_1b1_Ar_sc_Meditron.wav",
            "165_1b1_Pl_sc_Meditron.wav",
            "101_1b1_Pr_sc_Meditron.wav",
            "137_1b1_Ll_sc_Meditron.wav",
            "121_1p1_Tc_sc_Meditron.wav",
            "123_1b1_Al_sc_Meditron.wav",
            "102_1b1_Ar_sc_Meditron.wav",
            "183_1b1_Pl_sc_Meditron.wav",
            "159_1b1_Pr_sc_Meditron.wav",
            "187_1b1_Ll_sc_Meditron.wav",
            "194_1b1_Lr_sc_Meditron.wav",
            "117_1b2_Tc_mc_LittC2SE.wav",
            "113_1b1_Al_sc_Litt3200.wav",
            "104_1b1_Ar_sc_Litt3200.wav",
            "107_2b4_Pl_mc_AKGC417L.wav",
            "106_2b1_Pr_mc_LittC2SE.wav",
            "112_1p1_Ll_sc_Litt3200.wav",
            "118_1b1_Lr_sc_Litt3200.wav",
            "110_1p1_Lr_sc_Meditron.wav"
            ]
# Filled by the extraction loop: feature arrays and labels of the test files.
TEST_DATA=[]
TEST_LABELS=[]

Implementing ICA and MFCC extraction 

In [None]:
# Extract one (40, 1) feature array per recording: load -> pad to 20 s ->
# high-pass FIR filter -> ICA -> framed MFCC/PCA features.
path = '/content/drive/MyDrive/ICBHI_final_database'
#endpath = 'D:\\Users\\HP\\Desktop\\CIP\\ICBHI_final_database\\Preprocessed'
sounds = []
labels = []
i=1
for file in os.listdir(path):
  # Only .wav files are processed; patients 103, 108 and 115 are excluded.
  if file[-3:] == 'wav' and file[:3]!='103' and file[:3]!='108' and file[:3]!='115':

    # The first three characters of the file name are the patient id.
    pid = file[:3]
    data_x, sampling_rate = librosa.load(os.path.join(path,file),res_type='kaiser_fast', duration=20)

    #adjusting audio length
    # Zero-pad short recordings to 441000 samples (20 s at librosa's default
    # 22050 Hz) because get_mfcc needs a fixed number of frames.
    if len(data_x)<441000:
      diff = 441000-len(data_x)
      data_x = np.pad(data_x, (0, diff))

    #FILTERING AND NOISE REDUCTION
    # High-pass FIR filter (pass_zero=False) with a 100 Hz cutoff. "hann" is
    # the canonical SciPy name of the window formerly spelled "hanning".
    a = signal.firwin(1081, cutoff = 100, window = "hann", fs=sampling_rate,pass_zero=False)
    filtered_x = lfilter(a, 1.0, data_x)

    #ICA
    # ica() expects one signal per row, so the single filtered channel is
    # stacked with an all-zero second row.
    row_new = np.zeros((1, len(filtered_x)), dtype=filtered_x.dtype)
    new_filtered_x=np.vstack((filtered_x,row_new))
    print(new_filtered_x.shape)
    S = ica(new_filtered_x, iterations=10000)
    #plot_mixture_sources_predictions(new_filtered_x, S)

    #FRAME SLICING AND FEATURE EXTRACTION
    mfccs = np.array(get_mfcc(S[1]))
    # Look the label up once and reuse it for both lists.
    label = get_label(pid)
    sounds.append( mfccs )
    labels.append(label)

    #BUILDING TEST SET
    # NOTE(review): test recordings are also appended to `sounds`/`labels`
    # above, so they are part of the full set as well -- confirm this is intended.
    if file in TEST_NAMES:
      TEST_DATA.append(mfccs)
      TEST_LABELS.append(label)
      print(file)

    i=i+1

In [None]:
# Sanity check: shapes of the held-out features and labels collected so far.
print(np.array(TEST_DATA).shape)
print(np.array(TEST_LABELS).shape)

(0,)
(0,)


In [None]:
# Sanity check: number of feature arrays and the size/shape of the first one.
print(len(sounds))
print(len(sounds[0]))
print(np.array(sounds[0]).shape)

319
40
(40, 1)


In [None]:
# Persist the extracted features and labels (np.save appends the ".npy"
# extension) so the slow extraction loop does not have to be re-run.
np.save('features_sounds',np.array(sounds))
np.save('features_labels',np.array(labels))
np.save('TEST_sounds',np.array(TEST_DATA))
np.save('TEST_labels',np.array(TEST_LABELS))

In [None]:
# Reload the cached features and labels written by the save cell.
# NOTE(review): if any label is None (get_label found no match) the label
# arrays would presumably be object arrays and need allow_pickle=True -- confirm.
sounds = np.load("features_sounds.npy")
labels = np.load("features_labels.npy")
TEST_DATA = np.load("TEST_sounds.npy")
TEST_LABELS = np.load("TEST_labels.npy")

In [None]:
# Sanity check: shapes of the reloaded held-out features and labels.
print(np.array(TEST_DATA).shape)
print(np.array(TEST_LABELS).shape)

(26, 40, 1)
(26,)


Augmentation - Base paper technique

In [None]:
def add_noise(data,x):
    """Return ``data`` plus standard-normal noise scaled by ``x``.

    One noise value is drawn per ``len(data)`` entry using NumPy's global
    random state.
    """
    perturbation = x * np.random.randn(len(data))
    return data + perturbation

def shift(data,x):
    """Circularly shift ``data`` by ``x`` positions (wrapper around ``np.roll``)."""
    rolled = np.roll(data, shift=x)
    return rolled

def stretch(data, rate):
    """Time-stretch an audio signal with ``librosa.effects.time_stretch``.

    Args:
        data: Audio time series.
        rate: Stretch factor forwarded to librosa.

    Returns:
        The time-stretched signal.
    """
    # `rate` is passed by keyword: recent librosa releases accept it only as
    # a keyword argument, and older ones accept the keyword form as well.
    return librosa.effects.time_stretch(data, rate=rate)

In [None]:
# Copy every feature array and its label into r_sounds / r_labels.
r_sounds=[]
r_labels=[]
for i in range(len(sounds)):
  r_sounds.append(sounds[i])
  r_labels.append(labels[i])
  if(labels[i]!="COPD"):
    # NOTE(review): `data_x` is whatever raw audio the extraction loop loaded
    # last, not sample i, and `data_noise` is never stored -- this cell looks
    # unfinished; confirm the intended augmentation before relying on it.
    data_noise = add_noise(data_x,0.005)


Baseline for InfoGAN

In [None]:
%tensorflow_version 2.x
import tensorflow as tf

In [None]:
from sklearn.model_selection import train_test_split
from keras.callbacks import ModelCheckpoint, EarlyStopping
import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder
import tensorflow_probability as tfp

In [None]:
def create_gen_input(batch_size=32, noise_size=40, n_class=6, seed=None):
  """Sample the three parts of a generator input batch.

  Args:
    batch_size: Number of rows to sample.
    noise_size: Width of the Gaussian noise vector.
    n_class: Number of categories of the discrete latent code.
    seed: Optional seed forwarded to each TensorFlow random op.

  Returns:
    Tuple ``(label, c_1, noise)``: a one-hot categorical code of shape
    (batch_size, n_class), a continuous code drawn uniformly from [-1, 1) of
    shape (batch_size, 1), and normal noise of shape (batch_size, noise_size).
  """
  # create noise input
  noise = tf.random.normal([batch_size, noise_size], seed=seed)
  # Create categorical latent code. The upper bound follows n_class so that
  # every category can be drawn and none falls outside the one-hot depth.
  label = tf.random.uniform([batch_size], minval=0, maxval=n_class, dtype=tf.int32, seed=seed)
  label = tf.one_hot(label, depth=n_class)
  # Create one continuous latent code
  c_1 = tf.random.uniform([batch_size, 1], minval=-1, maxval=1, seed=seed)
  return label, c_1, noise
  

In [None]:
#take
import keras.layers
def create_generator_continuous(input_size = 47):
    """Build the (uncompiled) generator as a Keras functional model.

    The network is three blocks of Dense (no bias) -> BatchNormalization ->
    ReLU with 512, 128 and 47 units. The output width is fixed at 47
    regardless of ``input_size``, and the final ReLU makes every output
    non-negative.

    Args:
        input_size: Width of the generator input vector.

    Returns:
        A ``keras.models.Model`` mapping (input_size,) inputs to 47 outputs.
    """
    latent = keras.layers.Input(shape=(input_size,))

    hidden = latent
    for width in (512, 128, 47):
        hidden = keras.layers.Dense(units=width, use_bias=False)(hidden)
        hidden = keras.layers.BatchNormalization()(hidden)
        hidden = keras.layers.ReLU()(hidden)

    return keras.models.Model(inputs=latent, outputs=hidden)

In [None]:
#take
def create_discriminator_continuous(n_class=6, input_size=47):
    """Build the discriminator and the auxiliary (Q) model on a shared trunk.

    The trunk is three blocks of Dense (no bias) -> BatchNormalization -> ReLU
    with 512, 128 and 256 units. The discriminator adds a single sigmoid unit
    on top of it. The auxiliary model adds a Dense(128) -> BatchNormalization
    -> LeakyReLU(0.1) block followed by three heads: a softmax over ``n_class``
    categories, a linear mean, and a standard deviation with an exponential
    activation (which keeps it positive).

    Args:
        n_class: Number of categories predicted by the auxiliary model.
        input_size: Width of the input vector.

    Returns:
        Tuple ``(d_model, q_model)`` of uncompiled Keras models. Both take the
        same input tensor and share the trunk layers.
    """
    sample = keras.layers.Input(shape=(input_size,))

    # Shared trunk.
    trunk = sample
    for width in (512, 128, 256):
        trunk = keras.layers.Dense(units=width, use_bias=False)(trunk)
        trunk = keras.layers.BatchNormalization()(trunk)
        trunk = keras.layers.ReLU()(trunk)

    # Real/fake head.
    real_prob = keras.layers.Dense(1, activation='sigmoid')(trunk)

    # Auxiliary branch.
    aux = keras.layers.Dense(128, use_bias=False)(trunk)
    aux = keras.layers.BatchNormalization()(aux)
    aux = keras.layers.LeakyReLU(alpha=0.1)(aux)

    category = keras.layers.Dense(n_class, activation="softmax")(aux)
    mean = keras.layers.Dense(1)(aux)
    std = keras.layers.Dense(1, activation=lambda x: tf.math.exp(x))(aux)

    discriminator = keras.models.Model(inputs=sample, outputs=real_prob)
    auxiliary = keras.models.Model(inputs=sample, outputs=[category, mean, std])
    return discriminator, auxiliary

In [None]:
#take
# Module-level placeholder; train_step does not write to it.
fake_batch=[]
class InfoGAN_Continuous(keras.Model):
    """InfoGAN with one categorical and one continuous latent code.

    Wraps a discriminator, a generator and an auxiliary (Q) model and trains
    them with a custom ``train_step``.
    """

    def __init__(self, d_model, g_model, q_model,noise_size, num_classes):
        """Store the three sub-models and the latent sizes.

        Args:
            d_model: Discriminator; maps a sample to a real/fake probability.
            g_model: Generator; maps concatenated [label, c_1, noise] to a sample.
            q_model: Auxiliary model; returns (category probabilities, mu, sigma).
            noise_size: Width of the noise vector fed to the generator.
            num_classes: Number of categories of the discrete latent code.
        """
        super(InfoGAN_Continuous, self).__init__()
        self.d_model = d_model
        self.g_model = g_model
        self.q_model = q_model
        self.noise_size = noise_size
        self.num_classes = num_classes

    def compile(self, d_optimizer, g_optimizer, q_optimizer):
        """Register one optimizer per sub-model."""
        super(InfoGAN_Continuous, self).compile()
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer
        self.q_optimizer = q_optimizer

    def create_gen_input(self, batch_size, noise_size, n_class, seed=None):
        """Sample (one-hot label, continuous code, noise) for the generator.

        Returns:
            Tuple ``(label, c_1, noise)`` with shapes (batch_size, n_class),
            (batch_size, 1) and (batch_size, noise_size).
        """
        # create noise input
        noise = tf.random.normal([batch_size, noise_size], seed=seed)
        # Create categorical latent code. The upper bound follows n_class so
        # the sampled ids always match the one-hot depth.
        label = tf.random.uniform([batch_size], minval=0, maxval=n_class, dtype=tf.int32, seed=seed)
        label = tf.one_hot(label, depth=n_class)
        # Create one continuous latent code
        c_1 = tf.random.uniform([batch_size, 1], minval=-1, maxval=1, seed=seed)
        return label, c_1, noise

    def concat_inputs(self, input):
        """Concatenate a list of tensors along the last axis."""
        concat_input = keras.layers.Concatenate()(input)
        return concat_input

    def train_step(self, real_image_batch):
        """Run one discriminator update followed by one generator/Q update.

        Args:
            real_image_batch: Batch of real samples fed to the discriminator.

        Returns:
            Dict with the scalar losses ``d_loss_real``, ``d_loss_fake``,
            ``g_img_loss``, ``cat_loss`` and ``c_1_loss``.
        """
        # Define loss functions
        binary_loss = keras.losses.BinaryCrossentropy()
        categorical_loss = keras.losses.CategoricalCrossentropy()
        # The discriminator sees `batch` real and `batch` fake samples; the
        # generator and auxiliary model are trained on 2 * batch fake samples.
        batch = tf.shape(real_image_batch)[0]
        # Create generator input
        g_label, c_1, g_noise = self.create_gen_input(batch, self.noise_size, self.num_classes, seed=None)
        g_input = self.concat_inputs([g_label, c_1, g_noise])
        with tf.GradientTape() as d_tape:
            self.d_model.trainable = True
            d_tape.watch(self.d_model.trainable_variables)
            # Discriminator loss on real samples (target 1)
            y_disc_real = tf.ones((batch, 1))
            d_real_output = self.d_model(real_image_batch, training=True)
            d_loss_real = binary_loss(y_disc_real, d_real_output)
            # Discriminator loss on generated samples (target 0)
            y_disc_fake = tf.zeros((batch, 1))
            # Create fake image batch
            fake_image_batch = self.g_model(g_input, training=True)
            d_fake_output = self.d_model(fake_image_batch, training=True)
            d_loss_fake = binary_loss(y_disc_fake, d_fake_output)
            d_loss = d_loss_real + d_loss_fake
        # Calculate gradients
        d_gradients = d_tape.gradient(d_loss, self.d_model.trainable_variables)
        # Optimize
        self.d_optimizer.apply_gradients(zip(d_gradients, self.d_model.trainable_variables))
        with tf.GradientTape() as g_tape, tf.GradientTape() as q_tape:
            # Create generator input
            g_label, c_1, g_noise = self.create_gen_input(batch*2, self.noise_size, self.num_classes, seed=None)
            g_input = self.concat_inputs([g_label, c_1, g_noise])
            g_tape.watch(self.g_model.trainable_variables)
            q_tape.watch(self.q_model.trainable_variables)
            # Create fake image batch
            fake_image_batch = self.g_model(g_input, training=True)
            d_fake_output = self.d_model(fake_image_batch, training=True)
            # Generator image loss: the generator wants fakes classified as real
            y_gen_fake = tf.ones((batch*2, 1))
            g_img_loss = binary_loss(y_gen_fake, d_fake_output)
            # Auxiliary loss
            cat_output, mu, sigma = self.q_model(fake_image_batch, training=True)
            # Categorical loss
            cat_loss = categorical_loss(g_label, cat_output)
            # Use Gaussian distributions to represent the output
            dist = tfp.distributions.Normal(loc=mu, scale=sigma)
            # Negative log-density of the sampled code: minimizing it maximizes
            # the likelihood of c_1 under the predicted Gaussian.
            c_1_loss = tf.reduce_mean(-dist.log_prob(c_1))
            # Generator total loss
            g_loss = g_img_loss + (cat_loss + 0.1*c_1_loss)
            # Auxiliary function loss
            q_loss = (cat_loss + 0.1*c_1_loss)
        # Calculate gradients
        # We do not want to modify the neurons in the discriminator when training the generator and the auxiliary model.
        # NOTE(review): the flag is set before q_model.trainable_variables is
        # read below; if q_model shares layers with d_model this presumably
        # drops the shared layers from the Q update -- keep this order, confirm.
        self.d_model.trainable=False
        g_gradients = g_tape.gradient(g_loss, self.g_model.trainable_variables)
        q_gradients = q_tape.gradient(q_loss, self.q_model.trainable_variables)
        # Optimize
        self.g_optimizer.apply_gradients(zip(g_gradients, self.g_model.trainable_variables))
        self.q_optimizer.apply_gradients(zip(q_gradients, self.q_model.trainable_variables))
        return {"d_loss_real": d_loss_real, "d_loss_fake": d_loss_fake, "g_img_loss": g_img_loss ,
                "cat_loss": cat_loss, "c_1_loss": c_1_loss}

In [None]:
#take
def concat_inputs(input):
    """Concatenate a list of tensors/arrays with a Keras ``Concatenate`` layer."""
    return keras.layers.Concatenate()(input)

In [None]:
#take
from sklearn.model_selection import train_test_split
def load_real_image(batch_size=32):
    """Assemble the real training samples fed to the InfoGAN.

    Reads the module-level ``sounds`` and ``labels``, keeps the 80 % training
    part of a fixed split (``random_state=0``) and builds one 47-wide row per
    sample: one-hot class (6) + a constant continuous code of 1 (1) + the 40
    features rescaled with ``(x / 255) * 2 - 1``.

    Args:
        batch_size: Unused; kept so existing calls keep working.

    Returns:
        Tensor of shape (n_train, 47).
    """
    features = np.array(sounds).reshape(len(sounds), 40)
    X_train, _, y_train, _ = train_test_split(features, labels, test_size=0.2, random_state=0)
    # Rescale the features with the same formula that is applied before
    # generation in transform_data.
    train_images = (np.array(X_train)/255.0) * 2 - 1
    # Encode the class names as integers, then one-hot them.
    # NOTE(review): the encoder is fitted on the training split only; a class
    # missing from the split would shift the ids -- confirm this cannot happen.
    y = LabelEncoder().fit_transform(y_train)
    label1 = tf.one_hot(y, depth=6)
    print(label1.shape)
    # Constant continuous code column. Built directly with the intended
    # (n, 1) shape instead of passing 1 as the dtype argument of tf.ones.
    c1 = np.ones((len(X_train), 1), dtype=np.float32)
    gen_input = concat_inputs([label1, c1, train_images])
    print(gen_input.shape)
    return gen_input
# Build the generator, the discriminator and the auxiliary model, wrap them in
# the InfoGAN trainer (40 noise dims + 6 classes + 1 continuous code = 47
# generator inputs), give each sub-model its own Adam optimizer and train for
# 500 epochs on the real samples.
g_model_continuous=create_generator_continuous()
d_model_continuous,q_model_continuous=create_discriminator_continuous()
infogan = InfoGAN_Continuous(d_model_continuous, g_model_continuous, q_model_continuous, noise_size=40, num_classes=6)
infogan.compile(d_optimizer=tf.keras.optimizers.Adam(learning_rate=2e-4),
                g_optimizer=tf.keras.optimizers.Adam(learning_rate=5e-4),
                q_optimizer=tf.keras.optimizers.Adam(learning_rate=2e-4))
# NOTE(review): load_real_image ignores batch_size; fit() uses its own default.
real_images = load_real_image(batch_size=32)
infogan.fit(real_images, epochs=500)


(255, 6)
(255, 47)
Epoch 1/500
Epoch 2/500
Epoch 3/500
Epoch 4/500
Epoch 5/500
Epoch 6/500
Epoch 7/500
Epoch 8/500
Epoch 9/500
Epoch 10/500
Epoch 11/500
Epoch 12/500
Epoch 13/500
Epoch 14/500
Epoch 15/500
Epoch 16/500
Epoch 17/500
Epoch 18/500
Epoch 19/500
Epoch 20/500
Epoch 21/500
Epoch 22/500
Epoch 23/500
Epoch 24/500
Epoch 25/500
Epoch 26/500
Epoch 27/500
Epoch 28/500
Epoch 29/500
Epoch 30/500
Epoch 31/500
Epoch 32/500
Epoch 33/500
Epoch 34/500
Epoch 35/500
Epoch 36/500
Epoch 37/500
Epoch 38/500
Epoch 39/500
Epoch 40/500
Epoch 41/500
Epoch 42/500
Epoch 43/500
Epoch 44/500
Epoch 45/500
Epoch 46/500
Epoch 47/500
Epoch 48/500
Epoch 49/500
Epoch 50/500
Epoch 51/500
Epoch 52/500
Epoch 53/500
Epoch 54/500
Epoch 55/500
Epoch 56/500
Epoch 57/500
Epoch 58/500
Epoch 59/500
Epoch 60/500
Epoch 61/500
Epoch 62/500
Epoch 63/500
Epoch 64/500
Epoch 65/500
Epoch 66/500
Epoch 67/500
Epoch 68/500
Epoch 69/500
Epoch 70/500
Epoch 71/500
Epoch 72/500
Epoch 73/500
Epoch 74/500
Epoch 75/500
Epoch 76/500
Ep

<keras.callbacks.History at 0x7f30de4a8390>

In [None]:
#take
# Run the trained generator on the real samples (used here as the generator
# input) and inspect the output and its length.
x = infogan.g_model.predict(real_images)
print(x)
print(len(x))

[[0.         0.         0.         ... 0.1201743  0.         0.        ]
 [0.         0.         0.         ... 0.14276014 0.         0.        ]
 [0.         0.         0.         ... 0.         0.         0.        ]
 ...
 [0.         0.         0.         ... 0.13946953 0.         0.        ]
 [0.23485887 0.         0.         ... 0.         0.         0.40678263]
 [0.         0.         0.         ... 0.         0.         0.        ]]
255


In [None]:
#take
def get_samples(label):
  """Build the generator inputs needed to bring one class up to 2000 samples.

  Collects every entry of the module-level ``sounds`` whose encoded label in
  ``enc_labels`` equals ``label``, prints how many were found, and asks
  ``transform_data`` for ``2000 - count`` generator input rows.

  Args:
    label: Encoded (integer) class id.

  Returns:
    ``numpy.ndarray`` built from the output of ``transform_data``.
  """
  matching = [sound for idx, sound in enumerate(sounds) if enc_labels[idx] == label]
  print(len(matching))
  missing = 2000 - len(matching)
  return np.array(transform_data(matching, label, missing))

In [None]:
#take
def transform_data(data, className, num):
    """Build ``num`` generator input rows for one class from its real samples.

    The real samples are repeated cyclically (in their original order) until
    ``num`` rows exist. Each row is the concatenation of the one-hot class
    (depth 6), a constant continuous code of 1, and the 40 features rescaled
    with ``(x / 255) * 2 - 1``.

    Args:
        data: Sequence of feature arrays, each reshapeable to 40 values.
        className: Encoded (integer) class id used for the one-hot code.
        num: Number of rows to produce; values <= 0 produce no rows.

    Returns:
        Tensor of shape (num, 47).

    Raises:
        ValueError: If ``data`` is empty, since there is nothing to repeat.
    """
    if len(data) == 0:
        raise ValueError("transform_data needs at least one sample to replicate")
    # Cycle through the samples; range() is empty for a non-positive num, so a
    # class that already has enough samples yields no extra rows.
    temp = [data[k % len(data)] for k in range(num)]
    batch_size = len(temp)
    label = tf.one_hot(np.full(batch_size, className, dtype=int), depth=6)
    # Constant continuous code column. Built directly with the intended
    # (n, 1) shape instead of passing 1 as the dtype argument of tf.ones.
    c1 = np.ones((batch_size, 1), dtype=np.float32)
    temp1 = np.array(temp).reshape(batch_size, 40)
    temp1 = (temp1/255.0) * 2 - 1
    return concat_inputs([label, c1, temp1])


In [None]:
#take
# Build, class by class, the generator inputs (gen_x) and the matching class
# ids (gen_y) needed to top every class up to 2000 samples.
gen_x=[]
gen_y=[]
df = pd.DataFrame(labels, columns=['Class'])
    # creating instance of labelencoder
labelencoder = LabelEncoder()
    # Assigning numerical values and storing in another column
df['Class_cat'] = labelencoder.fit_transform(df['Class'])
# enc_labels is read by get_samples to select the samples of one class.
enc_labels = df[['Class_cat']].to_numpy().reshape(len(sounds))

for i in range(6):
  samples = get_samples(i)
  # One class id per generated row.
  y = np.arange(len(samples))
  y.fill(i)
  if i==0:
    gen_x = np.array(samples)
    gen_y = np.array(y)
  else:
    gen_x = np.concatenate((gen_x, samples),axis=0)
    gen_y = np.concatenate((np.array(gen_y), np.array(y)), axis=0)
gen_x = np.array(gen_x).reshape(len(gen_x),47)
# Map the integer ids back to the original class names.
gen_y = labelencoder.inverse_transform(gen_y)

4
3
263
13
26
10


In [None]:
#take
# Inspect the generator inputs, their labels and the encoder's class order.
print(gen_x)
print(gen_y)
gen_y1 = np.sort(gen_y)
# Spot check of one entry near the end of the sorted labels (index hard-coded).
print(gen_y1[11000])
print(labelencoder.classes_)

[[ 1.          0.          0.         ... -0.80080944 -0.8300862
  -1.        ]
 [ 1.          0.          0.         ... -0.80813736 -0.81651944
  -1.        ]
 [ 1.          0.          0.         ... -0.8183458  -0.84656763
  -1.        ]
 ...
 [ 0.          0.          0.         ... -0.8017075  -0.8287334
  -1.        ]
 [ 0.          0.          0.         ... -0.76861525 -0.77409464
  -1.        ]
 [ 0.          0.          0.         ... -0.79243153 -0.81853276
  -1.        ]]
['Bronchiectasis' 'Bronchiectasis' 'Bronchiectasis' ... 'URTI' 'URTI'
 'URTI']
URTI
['Bronchiectasis' 'Bronchiolitis' 'COPD' 'Healthy' 'Pneumonia' 'URTI']


In [None]:
#take
# Replace the generator inputs with the samples the trained generator produces.
gen_x=infogan.g_model.predict(gen_x)


In [None]:
#take
# Number of generated samples.
print(len(gen_x))

11681


In [None]:
#take
# Drop the first 7 of the 47 generator output columns (the positions that hold
# the class code and the continuous code in the generator input layout), then
# append the remaining 40 columns and their labels to the real data.
new_gen_x = np.delete(np.array(gen_x).reshape(len(gen_x), 47), [0,1,2,3,4,5,6], axis=1)
new_x = np.concatenate((np.array(sounds).reshape(len(sounds), 40), new_gen_x), axis=0)
print(new_x.shape)
new_y = np.concatenate((np.array(labels).reshape(len(labels),1), np.array(gen_y).reshape(len(gen_y),1)), axis=0)
print(new_y.shape)

(12000, 40)
(12000, 1)


In [None]:
# Persist the augmented features and labels (np.save appends ".npy").
np.save('gan_sounds',np.array(new_x))
np.save('gan_labels',np.array(new_y))

In [None]:
# Reload the augmented data set; the labels are loaded with allow_pickle=True.
new_x = np.load("gan_sounds.npy")
print(np.array(new_x).shape)
new_y = np.load("gan_labels.npy", allow_pickle=True)
print(np.array(new_y).shape)

(12000, 40)
(12000, 1)


Baseline Implementation of Deep neural Network

In [None]:
#take
from keras.utils import np_utils
from keras.layers import add, Conv2D,Input,BatchNormalization,TimeDistributed,Embedding,LSTM,GRU,Dense,MaxPooling1D,Dropout,LeakyReLU,ReLU,Flatten,concatenate,Bidirectional
from keras.layers.merge import concatenate
from keras.models import Model,load_model

def InstantiateModel(in_):
    """Wire the GRU/Dense classifier graph on top of the input tensor ``in_``.

    Layout: two parallel two-layer GRU branches are summed; two further
    parallel GRU branches run on that sum and are summed together with the raw
    output of the very first GRU; two parallel Dense heads are summed,
    flattened and passed through Dense(16) -> LeakyReLU -> Dropout(0.5) ->
    Dense(6, softmax). Every GRU returns sequences, has no built-in activation
    and reads its input backwards.

    Args:
        in_: Keras input tensor of shape (time_steps, features).

    Returns:
        Output tensor with 6 softmax class probabilities.
    """
    def gru(units, tensor):
        # Linear, sequence-returning GRU that processes the input backwards.
        return GRU(units, return_sequences=True, activation=None, go_backwards=True)(tensor)

    def gru_stack(tensor, first_units, second_units):
        # GRU -> LeakyReLU -> GRU -> LeakyReLU; also hands back the raw first GRU output.
        first_raw = gru(first_units, tensor)
        hidden = LeakyReLU()(first_raw)
        hidden = gru(second_units, hidden)
        return first_raw, LeakyReLU()(hidden)

    def dense_head(tensor, width):
        # Dense(width) -> LeakyReLU -> Dropout(0.2) -> Dense(16) -> LeakyReLU.
        hidden = Dense(width, activation=None)(tensor)
        hidden = LeakyReLU()(hidden)
        hidden = Dropout(0.2)(hidden)
        hidden = Dense(16, activation=None)(hidden)
        return LeakyReLU()(hidden)

    # Stage 1: two GRU branches on the raw input.
    skip, branch_a = gru_stack(in_, 32, 128)
    _, branch_b = gru_stack(in_, 64, 128)
    stage_1 = add([branch_b, branch_a])

    # Stage 2: two GRU branches on the summed features, plus the skip path.
    _, branch_c = gru_stack(stage_1, 128, 32)
    _, branch_d = gru_stack(stage_1, 64, 32)
    stage_2 = add([branch_c, branch_d, skip])

    # Stage 3: two Dense heads, summed and flattened.
    head_a = dense_head(stage_2, 64)
    head_b = dense_head(stage_2, 32)
    merged = add([head_a, head_b])
    merged = tf.keras.layers.Flatten()(merged)

    # Classifier.
    logits = Dense(16, activation=None)(merged)
    logits = LeakyReLU()(logits)
    logits = Dropout(0.5)(logits)
    return Dense(6, activation="softmax")(logits)

In [None]:
#take
from keras import backend as K
#from models import InstantiateModel
from keras.models import Model
from tensorflow.keras.optimizers import Adamax
from keras.layers import Input
# Module-level placeholders; trainModel returns its results instead of
# writing to these names.
y_pred=[]
DNN_model = []
def trainModel(X, y):
  """Train the GRU classifier on a fixed split and predict the held-out part.

  Args:
    X: Feature array of shape (n_samples, time_steps, data_dim).
    y: Integer class ids, one per sample.

  Returns:
    Tuple ``(y_pred, y_test, model)``: predicted class probabilities for the
    held-out samples, their true class ids, and the trained Keras model.
  """
  K.clear_session()
  # The whole data set size is used as the batch size (full-batch training).
  batch_size=X.shape[0]
  time_steps=X.shape[1]
  data_dim=X.shape[2]
  Input_Sample = Input((time_steps,data_dim))
  Output_ = InstantiateModel(Input_Sample)
  Model_Enhancer = Model(inputs=Input_Sample, outputs=Output_)
  Model_Enhancer.compile(loss='categorical_crossentropy', metrics=['accuracy'], optimizer=Adamax())
  # With metrics=['accuracy'] the validation metric is logged as
  # 'val_accuracy'; monitoring 'val_acc' would never find a value to compare.
  MC = ModelCheckpoint('best_model.h5', monitor='val_accuracy', mode='auto', verbose=0, save_best_only=True)

  # Fixed 46 % / 54 % split of the flattened samples.
  X_train, X_test, y_train, y_test = train_test_split(np.array(X).reshape(len(X), -1), np.array(y), test_size=0.54, random_state=500)
  from tensorflow.keras.utils import to_categorical

  # Pin the one-hot width to the model's output size so a split that misses
  # the highest class id still produces targets of the right shape.
  num_classes = int(Output_.shape[-1])
  y_binary_train = to_categorical(y_train, num_classes=num_classes)
  y_binary_test = to_categorical(y_test, num_classes=num_classes)
  print(y_binary_test.shape)

  train_inputs = np.array(X_train).reshape(len(X_train), time_steps, data_dim)
  test_inputs = np.array(X_test).reshape(len(X_test), time_steps, data_dim)
  # NOTE(review): the held-out part doubles as the validation set that drives
  # the checkpoint -- confirm this is intended.
  Model_Enhancer.fit(train_inputs, y_binary_train, batch_size=batch_size, epochs=200,
                     validation_data=(test_inputs, y_binary_test),
                     callbacks = [MC],
                     verbose=1)
  y_pred = Model_Enhancer.predict(test_inputs, batch_size = batch_size, verbose = 1)
  return y_pred, y_test, Model_Enhancer

In [None]:
#take
from sklearn.metrics import classification_report
from sklearn.metrics import accuracy_score,precision_score,recall_score,f1_score,matthews_corrcoef
from sklearn.metrics import cohen_kappa_score,roc_auc_score,confusion_matrix,classification_report

def evalModel(y_test, y_pred):
    """Print and return classification metrics for a set of predictions.

    Each metric is printed right after it is computed, followed by the
    scikit-learn classification report.

    Args:
        y_test: True class labels.
        y_pred: Predicted class labels.

    Returns:
        Dict with the keys "Accuracy", "Precision", "Recall", "F1 score",
        "Cohens kappa" and "Matthews correlation coefficient". Precision,
        recall and F1 use weighted averaging.
    """
    def weighted(metric):
        # Bind average='weighted' for the per-class metrics.
        return lambda truth, guess: metric(truth, guess, average='weighted')

    metric_table = (
        ("Accuracy", 'Accuracy: %f', accuracy_score),
        ("Precision", 'Precision: %f', weighted(precision_score)),
        ("Recall", 'Recall: %f', weighted(recall_score)),
        ("F1 score", 'F1 score: %f', weighted(f1_score)),
        ("Cohens kappa", 'Cohens kappa: %f', cohen_kappa_score),
        ("Matthews correlation coefficient", 'Matthews correlation coefficient: %f', matthews_corrcoef),
    )

    results = {}
    for key, template, metric in metric_table:
        value = metric(y_test, y_pred)
        print(template % value)
        results[key] = value

    # Per-class precision / recall / F1 table.
    print(classification_report(y_test, y_pred))

    return results

In [None]:
#take
from sklearn.model_selection import train_test_split
from keras.callbacks import ModelCheckpoint, EarlyStopping
import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder

# Encode the class names of the real (non-augmented) data as integers and
# train/evaluate the classifier on the real features only.
bridge_df = pd.DataFrame(labels, columns=['Class'])
# creating instance of labelencoder
labelencoder = LabelEncoder()
# Assigning numerical values and storing in another column
bridge_df['Class_cat'] = labelencoder.fit_transform(bridge_df['Class'])

y = bridge_df[['Class_cat']].to_numpy()
y_pred, y_test, DNN_model = trainModel(np.array(sounds).reshape(len(sounds),40,1), np.array(y))


(173, 6)
Epoch 1/200
Epoch 2/200
Epoch 3/200
Epoch 4/200
Epoch 5/200
Epoch 6/200
Epoch 7/200
Epoch 8/200
Epoch 9/200
Epoch 10/200
Epoch 11/200
Epoch 12/200
Epoch 13/200
Epoch 14/200
Epoch 15/200
Epoch 16/200
Epoch 17/200
Epoch 18/200
Epoch 19/200
Epoch 20/200
Epoch 21/200
Epoch 22/200
Epoch 23/200
Epoch 24/200
Epoch 25/200
Epoch 26/200
Epoch 27/200
Epoch 28/200
Epoch 29/200
Epoch 30/200
Epoch 31/200
Epoch 32/200
Epoch 33/200
Epoch 34/200
Epoch 35/200
Epoch 36/200
Epoch 37/200
Epoch 38/200
Epoch 39/200
Epoch 40/200
Epoch 41/200
Epoch 42/200
Epoch 43/200
Epoch 44/200
Epoch 45/200
Epoch 46/200
Epoch 47/200
Epoch 48/200
Epoch 49/200
Epoch 50/200
Epoch 51/200
Epoch 52/200
Epoch 53/200
Epoch 54/200
Epoch 55/200
Epoch 56/200
Epoch 57/200
Epoch 58/200
Epoch 59/200
Epoch 60/200
Epoch 61/200
Epoch 62/200
Epoch 63/200
Epoch 64/200
Epoch 65/200
Epoch 66/200
Epoch 67/200
Epoch 68/200
Epoch 69/200
Epoch 70/200
Epoch 71/200
Epoch 72/200
Epoch 73/200
Epoch 74/200
Epoch 75/200
Epoch 76/200
Epoch 77/200

In [None]:
#take
# Show the raw per-class scores followed by the model object, one per line.
print(y_pred, DNN_model, sep="\n")

[[9.1125043e-16 1.0728270e-15 1.0000000e+00 5.4740378e-14 6.1422532e-18
  4.0402298e-22]
 [4.2972743e-04 4.3157146e-05 9.7648990e-01 2.3020977e-02 1.6224541e-05
  1.6321364e-07]
 [1.5469562e-18 1.7283180e-17 1.0000000e+00 5.2903550e-16 3.0832994e-21
  1.1101750e-25]
 ...
 [2.3248536e-03 1.0226683e-02 8.5135043e-01 3.8598277e-02 9.6671060e-02
  8.2862645e-04]
 [1.4188667e-03 5.4730936e-03 9.3846369e-01 3.4702558e-02 1.9782707e-02
  1.5909955e-04]
 [2.1369283e-14 1.0005576e-14 1.0000000e+00 4.5619444e-13 2.4366973e-16
  1.2082587e-19]]
<keras.engine.functional.Functional object at 0x7f30dc141650>


In [None]:
#take
# Reduce each row of per-class scores to the index of its largest entry and
# store the result as a column vector so its shape can be compared with y_test.
yt_pred = np.argmax(y_pred, axis=1).reshape(-1, 1)
print(yt_pred.shape, y_test.shape)

(173, 1) (173, 1)


In [None]:
#take
# Score the predictions of the model trained on `sounds`; evalModel prints the
# classification report and returns the metrics as a dict, which the notebook
# displays because it is the cell's last expression.
evalModel(y_test, yt_pred)

Accuracy: 0.763006
Precision: 0.665920
Recall: 0.763006
F1 score: 0.705586
Cohens kappa: 0.029685
Matthews correlation coefficient: 0.036758
              precision    recall  f1-score   support

           0       0.00      0.00      0.00         3
           1       0.00      0.00      0.00         3
           2       0.80      0.94      0.86       138
           3       0.50      0.17      0.25         6
           4       0.14      0.07      0.09        15
           5       0.00      0.00      0.00         8

    accuracy                           0.76       173
   macro avg       0.24      0.20      0.20       173
weighted avg       0.67      0.76      0.71       173



  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


{'Accuracy': 0.7630057803468208,
 'Cohens kappa': 0.029685362517099967,
 'F1 score': 0.7055855154389903,
 'Matthews correlation coefficient': 0.036757570874220616,
 'Precision': 0.6659202707289519,
 'Recall': 0.7630057803468208}

In [None]:
#take
# Repeat the label encoding for the second data set (new_x / new_y) with a
# freshly fitted encoder; bridge_df and labelencoder are rebound here.
labelencoder = LabelEncoder()
bridge_df = pd.DataFrame(new_y, columns=['Class'])
bridge_df['Class_cat'] = labelencoder.fit_transform(bridge_df['Class'])

# Double brackets select a one-column frame, so y_new is a 2-D column of ids.
y_new = bridge_df[['Class_cat']].to_numpy()

# Same input layout as the first training run: 40 values per sample plus a
# trailing singleton axis.
# NOTE(review): presumably new_x contains GAN-generated samples and the
# train/test split happens inside trainModel -- confirm that the held-out
# labels returned here are not dominated by synthetic samples.
y_pred_new, y_test_new, GAN_DNN_model = trainModel(
    np.array(new_x).reshape((len(new_x), 40, 1)),
    np.array(y_new),
)


(6480, 6)
Epoch 1/200
Epoch 2/200
Epoch 3/200
Epoch 4/200
Epoch 5/200
Epoch 6/200
Epoch 7/200
Epoch 8/200
Epoch 9/200
Epoch 10/200
Epoch 11/200
Epoch 12/200
Epoch 13/200
Epoch 14/200
Epoch 15/200
Epoch 16/200
Epoch 17/200
Epoch 18/200
Epoch 19/200
Epoch 20/200
Epoch 21/200
Epoch 22/200
Epoch 23/200
Epoch 24/200
Epoch 25/200
Epoch 26/200
Epoch 27/200
Epoch 28/200
Epoch 29/200
Epoch 30/200
Epoch 31/200
Epoch 32/200
Epoch 33/200
Epoch 34/200
Epoch 35/200
Epoch 36/200
Epoch 37/200
Epoch 38/200
Epoch 39/200
Epoch 40/200
Epoch 41/200
Epoch 42/200
Epoch 43/200
Epoch 44/200
Epoch 45/200
Epoch 46/200
Epoch 47/200
Epoch 48/200
Epoch 49/200
Epoch 50/200
Epoch 51/200
Epoch 52/200
Epoch 53/200
Epoch 54/200
Epoch 55/200
Epoch 56/200
Epoch 57/200
Epoch 58/200
Epoch 59/200
Epoch 60/200
Epoch 61/200
Epoch 62/200
Epoch 63/200
Epoch 64/200
Epoch 65/200
Epoch 66/200
Epoch 67/200
Epoch 68/200
Epoch 69/200
Epoch 70/200
Epoch 71/200
Epoch 72/200
Epoch 73/200
Epoch 74/200
Epoch 75/200
Epoch 76/200
Epoch 77/20

In [None]:
#take
# Reduce each row of per-class scores to the index of its largest entry and
# store the result as a column vector so its shape can be compared with
# y_test_new.
yt_pred_new = np.argmax(y_pred_new, axis=1).reshape(-1, 1)
print(yt_pred_new.shape, y_test_new.shape)

(6480, 1) (6480, 1)


In [None]:
#take
# Score the predictions of the model trained on new_x / new_y; evalModel prints
# the classification report and returns the metrics as a dict, which the
# notebook displays because it is the cell's last expression.
evalModel(y_test_new, yt_pred_new)

Accuracy: 0.982562
Precision: 0.983005
Recall: 0.982562
F1 score: 0.982538
Cohens kappa: 0.979071
Matthews correlation coefficient: 0.979174
              precision    recall  f1-score   support

           0       1.00      1.00      1.00      1107
           1       1.00      1.00      1.00      1067
           2       0.97      0.92      0.95      1085
           3       1.00      0.99      0.99      1062
           4       0.93      0.99      0.96      1112
           5       1.00      1.00      1.00      1047

    accuracy                           0.98      6480
   macro avg       0.98      0.98      0.98      6480
weighted avg       0.98      0.98      0.98      6480



{'Accuracy': 0.9825617283950617,
 'Cohens kappa': 0.9790709268180418,
 'F1 score': 0.9825379379259611,
 'Matthews correlation coefficient': 0.9791744073571553,
 'Precision': 0.9830051805830259,
 'Recall': 0.9825617283950617}

Testing using all possible Input Cases

In [None]:
# Encode the hand-picked test labels with the encoder that is ALREADY fitted
# on the training labels. Re-fitting here (fit_transform) would rebuild the
# label -> id mapping from TEST_LABELS alone, so a class missing from the test
# set would shift the remaining ids and make them disagree with the model's
# output indices. transform() keeps the mapping fixed and raises ValueError if
# TEST_LABELS contains a label the encoder has never seen.
bridge_df = pd.DataFrame(TEST_LABELS, columns=['Class'])
bridge_df['Class_cat'] = labelencoder.transform(bridge_df['Class'])

# Double brackets select a one-column frame, so Y_TEST is a 2-D column of ids.
Y_TEST = bridge_df[['Class_cat']].to_numpy()
print(Y_TEST)

[[5]
 [3]
 [5]
 [2]
 [2]
 [2]
 [0]
 [2]
 [5]
 [3]
 [5]
 [2]
 [2]
 [2]
 [0]
 [2]
 [0]
 [2]
 [5]
 [3]
 [4]
 [3]
 [5]
 [5]
 [4]
 [1]]


In [None]:
# Run the second model on the hand-picked test samples and keep, for each
# sample, the index of the highest-scoring class as a column vector.
# NOTE(review): the input is reshaped to (n, 40) here, while the training call
# reshapes to (n, 40, 1) -- confirm the model accepts both layouts.
TEST_PREDICTED = np.argmax(
    GAN_DNN_model.predict(np.array(TEST_DATA).reshape(len(TEST_DATA), 40)),
    axis=1,
).reshape(-1, 1)
print(TEST_PREDICTED)

[[5]
 [3]
 [5]
 [2]
 [2]
 [2]
 [2]
 [2]
 [5]
 [3]
 [5]
 [2]
 [2]
 [2]
 [2]
 [2]
 [0]
 [2]
 [5]
 [2]
 [2]
 [2]
 [2]
 [5]
 [4]
 [1]]


Metrics for this test set

In [None]:
from sklearn.metrics import accuracy_score,precision_score,confusion_matrix
# Compare the encoded ground truth with the predicted class ids.
print("Accuracy score : ", accuracy_score(y_true=Y_TEST, y_pred=TEST_PREDICTED))
# "weighted" averages the per-class precision using each class's support.
print(
    "Precision : ",
    precision_score(y_true=Y_TEST, y_pred=TEST_PREDICTED, average="weighted"),
)
# Rows are true classes, columns are predicted classes.
print(
    "Confusion Matrix : \n",
    confusion_matrix(y_true=Y_TEST, y_pred=TEST_PREDICTED),
)

Accuracy score :  0.7692307692307693
Precision :  0.8615384615384615
Confusion Matrix : 
 [[1 0 2 0 0 0]
 [0 1 0 0 0 0]
 [0 0 9 0 0 0]
 [0 0 2 2 0 0]
 [0 0 1 0 1 0]
 [0 0 1 0 0 6]]


In [None]:
# Display the label names known to the encoder; the position of each name in
# this array is the integer id the encoder assigns to it.
labelencoder.classes_

array(['Bronchiectasis', 'Bronchiolitis', 'COPD', 'Healthy', 'Pneumonia',
       'URTI'], dtype=object)

In [None]:
# File names of the recordings used for the manual test run.
# NOTE(review): the results table pairs these names with the predictions by
# position, so the order here presumably follows TEST_DATA -- confirm.
TEST_NAMES = [
    "105_1b1_Tc_sc_Meditron.wav",
    "102_1b1_Ar_sc_Meditron.wav",
    "101_1b1_Pr_sc_Meditron.wav",
    "104_1b1_Ar_sc_Litt3200.wav",
    "107_2b4_Pl_mc_AKGC417L.wav",
    "106_2b1_Pr_mc_LittC2SE.wav",
    "169_1b1_Lr_sc_Meditron.wav",
    "118_1b1_Lr_sc_Litt3200.wav",
    "119_1b1_Ar_sc_Meditron.wav",
    "194_1b1_Lr_sc_Meditron.wav",
    "165_1b1_Pl_sc_Meditron.wav",
    "117_1b2_Tc_mc_LittC2SE.wav",
    "113_1b1_Al_sc_Litt3200.wav",
    "112_1p1_Ll_sc_Litt3200.wav",
    "196_1b1_Pr_sc_Meditron.wav",
    "201_1b3_Ar_sc_Meditron.wav",
    "110_1p1_Lr_sc_Meditron.wav",
    "116_1b2_Pl_sc_Meditron.wav",
    "131_1b1_Al_sc_Meditron.wav",
    "183_1b1_Pl_sc_Meditron.wav",
    "191_2b1_Pr_mc_LittC2SE.wav",
    "159_1b1_Pr_sc_Meditron.wav",
    "137_1b1_Ll_sc_Meditron.wav",
    "219_2b2_Ar_mc_LittC2SE.wav",
    "167_1b1_Pr_sc_Meditron.wav",
    "122_2b3_Tc_mc_LittC2SE.wav",
    "226_1b1_Al_sc_Meditron.wav",
    "191_2b1_Pl_mc_LittC2SE.wav",
    "206_1b1_Ar_sc_Meditron.wav",
    "111_1b3_Tc_sc_Meditron.wav",
    "121_1p1_Tc_sc_Meditron.wav",
    "123_1b1_Al_sc_Meditron.wav",
    "168_1b1_Al_sc_Meditron.wav",
    "169_1b2_Ll_sc_Meditron.wav",
    "140_2b3_Ll_mc_LittC2SE.wav",
    "161_1b1_Pl_sc_Meditron.wav",
    "173_1b1_Al_sc_Meditron.wav",
    "149_1b1_Lr_sc_Meditron.wav",
    "187_1b1_Ll_sc_Meditron.wav",
]


In [None]:
from prettytable import PrettyTable
# Build a table listing, for every test recording, its file name next to the
# actual and the predicted disease name.
table = PrettyTable(["Audio", "Actual label", "Predicted label"])

# Decode all class ids in one call each. ravel() flattens the (n, 1) column
# vectors into the 1-D input inverse_transform expects, and iterating the
# result yields plain label strings, so cells read `URTI` rather than the
# one-element array repr `['URTI']`.
actual_names = labelencoder.inverse_transform(np.asarray(Y_TEST).ravel())
predicted_names = labelencoder.inverse_transform(np.asarray(TEST_PREDICTED).ravel())

# zip() stops at the shortest input, so check the lengths first to avoid
# silently dropping rows. TEST_NAMES may be longer than the predictions; only
# its leading entries are used.
if len(actual_names) != len(predicted_names) or len(TEST_NAMES) < len(predicted_names):
    raise ValueError(
        "TEST_NAMES, Y_TEST and TEST_PREDICTED do not line up: "
        f"{len(TEST_NAMES)} names, {len(actual_names)} labels, "
        f"{len(predicted_names)} predictions"
    )

for audio_name, actual, predicted in zip(TEST_NAMES, actual_names, predicted_names):
    table.add_row([audio_name, actual, predicted])
print(table)

+----------------------------+--------------------+--------------------+
|           Audio            |    Actual label    |  Predicted label   |
+----------------------------+--------------------+--------------------+
| 105_1b1_Tc_sc_Meditron.wav |      ['URTI']      |      ['URTI']      |
| 102_1b1_Ar_sc_Meditron.wav |    ['Healthy']     |    ['Healthy']     |
| 101_1b1_Pr_sc_Meditron.wav |      ['URTI']      |      ['URTI']      |
| 104_1b1_Ar_sc_Litt3200.wav |      ['COPD']      |      ['COPD']      |
| 107_2b4_Pl_mc_AKGC417L.wav |      ['COPD']      |      ['COPD']      |
| 106_2b1_Pr_mc_LittC2SE.wav |      ['COPD']      |      ['COPD']      |
| 169_1b1_Lr_sc_Meditron.wav | ['Bronchiectasis'] |      ['COPD']      |
| 118_1b1_Lr_sc_Litt3200.wav |      ['COPD']      |      ['COPD']      |
| 119_1b1_Ar_sc_Meditron.wav |      ['URTI']      |      ['URTI']      |
| 194_1b1_Lr_sc_Meditron.wav |    ['Healthy']     |    ['Healthy']     |
| 165_1b1_Pl_sc_Meditron.wav |      ['URTI']      |

  
