Using a dataset of mfccs with a RNN-LSTM model with oversampling.

The minority class is oversampled to match the majority class.

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import warnings
warnings.simplefilter("ignore")
import os
import numpy as np
import matplotlib.pyplot as plt
import math
import json
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.utils import plot_model
from contextlib import redirect_stdout
import numpy as np
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras import optimizers
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_curve

In [3]:
folder = '/content/drive/My Drive/Colab Notebooks/COVID-19'
if not os.path.exists(folder):
  print(folder + ' does not exist')

mfcc_folder = os.path.join(folder, 'mfcc_json_files')
if not os.path.exists(mfcc_folder):
  print(mfcc_folder + ' does not exist')

Read in json files created previously in Extract_rawMFCCs.ipynb.


In [4]:
def load_jsonfile(path, filename):
  with open(os.path.join(path, filename)) as json_file:
    data = json.load(json_file)
  print("{} loaded".format(filename))
  return data

In [5]:
test_data = load_jsonfile(mfcc_folder, 'test_mfccs.json')
train_data = load_jsonfile(mfcc_folder, 'train_mfccs.json')

test_mfccs.json loaded
train_mfccs.json loaded


Change labels to 0s and 1s

In [6]:
test_data_labels = []
for i in range (len(test_data["labels"])):
  if test_data["labels"][i] == 'healthy':
    test_data_labels.append(1)
  else:
    test_data_labels.append(0)

train_data_labels = []
for i in range (len(train_data["labels"])):
  if train_data["labels"][i] == 'healthy':
    train_data_labels.append(1)
  else:
    train_data_labels.append(0)


Create X_train, y_train, X_test, y_test

In [7]:
# Test data
X_test = np.array(test_data["mfcc"])
y_test = np.array(test_data_labels)

# Train data
X_train = np.array(train_data["mfcc"])
y_train = np.array(train_data_labels)

print("X_train shape: {}, y_train shape: {}, X_test shape: {}, y_test shape: {}".format(
    X_train.shape, y_train.shape, X_test.shape, y_test.shape))


X_train shape: (11401, 301, 13), y_train shape: (11401,), X_test shape: (1000, 301, 13), y_test shape: (1000,)


Split out a validation set

In [8]:
validation_size = 0.2
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=validation_size)

Check shape of X_train, y_train, X_val, y_val, X_test and y_test

In [9]:
print(X_train.shape)
print(y_train.shape)
print(X_val.shape)
print(y_val.shape)
print(X_test.shape)
print(y_test.shape)

(9120, 301, 13)
(9120,)
(2281, 301, 13)
(2281,)
(1000, 301, 13)
(1000,)


Oversample minority class in training set only.

In [10]:
# How many of each class in training set.
count0 = 0
count1 = 0
for i in range (y_train.shape[0]):
  if y_train[i] == 0:
    count0 += 1
  else:
    count1 +=1
print("0: {}, 1: {}".format(count0, count1))
print()

0: 921, 1: 8199



In [11]:
data_0 = []
label_0 = []
data_1 = []
label_1 = []

for i in range (y_train.shape[0]):
  if y_train[i] == 0:
    data_0.append(X_train[i])
    label_0.append(y_train[i])
  else:
    data_1.append(X_train[i])
    label_1.append(y_train[i])

data_new = data_0 + data_1
label_new = label_0 + label_1

for i in range (int(count1/count0)-1):
  data_new += data_0
  label_new += label_0

oversampled_X_train = np.array(data_new)
oversampled_y_train = np.array(label_new)

print(oversampled_X_train)
print(oversampled_y_train)

print(oversampled_X_train.shape)
print(oversampled_y_train.shape)


[[[-2.17569926e+02  0.00000000e+00 -5.41198678e-15 ... -3.95310394e-15
   -1.33393494e-15  0.00000000e+00]
  [-2.17569926e+02  0.00000000e+00 -5.41198678e-15 ... -3.95310394e-15
   -1.33393494e-15  0.00000000e+00]
  [-2.17569926e+02  0.00000000e+00 -5.41198678e-15 ... -3.95310394e-15
   -1.33393494e-15  0.00000000e+00]
  ...
  [-2.06444460e+02  1.02993226e+01  7.54447435e+00 ...  4.25425647e+00
    2.10960833e+00  2.06684785e+00]
  [-2.06720698e+02  1.17412240e+01  8.44674721e+00 ...  4.76979114e+00
    1.81245188e+00  1.75587211e+00]
  [-2.07403337e+02  1.14943011e+01  8.35166892e+00 ...  4.49844522e+00
    1.90752901e+00  5.82236666e-01]]

 [[-1.98068343e+02 -1.10666308e-14 -5.41198678e-15 ... -1.97655197e-15
   -1.33393494e-15 -1.34373226e-15]
  [-1.98068343e+02 -1.10666308e-14 -5.41198678e-15 ... -1.97655197e-15
   -1.33393494e-15 -1.34373226e-15]
  [-1.62734593e+02  2.28890620e+01 -3.07410973e+00 ... -4.27176613e+00
   -1.18730215e+00  7.56545374e-01]
  ...
  [-1.92067523e+02  7.4

In [12]:
# How many of each class in  oversampled training set.
count0 = 0
count1 = 0
for i in range (oversampled_y_train.shape[0]):
  if oversampled_y_train[i] == 0:
    count0 += 1
  else:
    count1 +=1
print("0: {}, 1: {}".format(count0, count1))
print()

0: 7368, 1: 8199



In [13]:
#Replace existing X_train and y_train
X_train = oversampled_X_train
y_train = oversampled_y_train
print(X_train.shape)
print(y_train.shape)
print(X_val.shape)
print(y_val.shape)
print(X_test.shape)
print(y_test.shape)

(15567, 301, 13)
(15567,)
(2281, 301, 13)
(2281,)
(1000, 301, 13)
(1000,)


Create datasets

In [14]:
train_dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train))
val_dataset = tf.data.Dataset.from_tensor_slices((X_val, y_val))
test_dataset = tf.data.Dataset.from_tensor_slices((X_test, y_test))

In [15]:
# Make sure the training data is shuffled when fed into the model

BATCH_SIZE = 100
SHUFFLE_BUFFER_SIZE = 10000 # Needs to be bigger than number of samples in training set

train_dataset = train_dataset.shuffle(SHUFFLE_BUFFER_SIZE).batch(BATCH_SIZE)
val_dataset = val_dataset.batch(BATCH_SIZE)
test_dataset = test_dataset.batch(BATCH_SIZE)

# RNN-LSTM Network



Although there are built in measures for precision and recall, there are not the equivalent measures of specificity and negative prediction rate so I have had to create custom metrics for those.

I have used example in (tf.keras.metrics.Metric | TensorFlow Core v2.7.0) for Precision
 and changed it to calculate the metrics I need.

In [16]:
# Always run this code.
from keras.utils import metrics_utils
from keras.utils.generic_utils import to_list
from keras import backend
class Specificity(tf.keras.metrics.Metric):
  def __init__(self,
               thresholds=None,
               top_k=None,
               class_id=None,
               name=None,
               dtype=None):
    super(Specificity, self).__init__(name=name, dtype=dtype)
    self.init_thresholds = thresholds
    self.top_k = top_k
    self.class_id = class_id

    default_threshold = 0.5 if top_k is None else metrics_utils.NEG_INF
    self.thresholds = metrics_utils.parse_init_thresholds(
        thresholds, default_threshold=default_threshold)
    self._thresholds_distributed_evenly = (
        metrics_utils.is_evenly_distributed_thresholds(self.thresholds))
    self.true_negatives = self.add_weight(
        'true_negatives',
        shape=(len(self.thresholds),),
        initializer=tf.compat.v1.zeros_initializer)
    self.false_positives = self.add_weight(
        'false_positives',
        shape=(len(self.thresholds),),
        initializer=tf.compat.v1.zeros_initializer)

  def update_state(self, y_true, y_pred, sample_weight=None):
    """Accumulates true positive and false positive statistics.
    Args:
      y_true: The ground truth values, with the same dimensions as `y_pred`.
        Will be cast to `bool`.
      y_pred: The predicted values. Each element must be in the range `[0, 1]`.
      sample_weight: Optional weighting of each example. Defaults to 1. Can be a
        `Tensor` whose rank is either 0, or the same rank as `y_true`, and must
        be broadcastable to `y_true`.
    Returns:
      Update op.
    """
    return metrics_utils.update_confusion_matrix_variables(
        {
            metrics_utils.ConfusionMatrix.TRUE_NEGATIVES: self.true_negatives,
            metrics_utils.ConfusionMatrix.FALSE_POSITIVES: self.false_positives
        },
        y_true,
        y_pred,
        thresholds=self.thresholds,
        thresholds_distributed_evenly=self._thresholds_distributed_evenly,
        top_k=self.top_k,
        class_id=self.class_id,
        sample_weight=sample_weight)

  def result(self):
    result = tf.math.divide_no_nan(
        self.true_negatives,
        tf.math.add(self.true_negatives, self.false_positives))
    return result[0] if len(self.thresholds) == 1 else result

  def reset_state(self):
    num_thresholds = len(to_list(self.thresholds))
    backend.batch_set_value([(v, np.zeros((num_thresholds,)))
                             for v in (self.true_negatives,
                                       self.false_positives)])

  def get_config(self):
    config = {
        'thresholds': self.init_thresholds,
        'top_k': self.top_k,
        'class_id': self.class_id
    }
    base_config = super(Specificity, self).get_config()
    return dict(list(base_config.items()) + list(config.items()))

In [17]:
# Always run this code.
from keras.utils import metrics_utils
from keras.utils.generic_utils import to_list
from keras import backend
class NPV(tf.keras.metrics.Metric):
  def __init__(self,
               thresholds=None,
               top_k=None,
               class_id=None,
               name=None,
               dtype=None):
    super(NPV, self).__init__(name=name, dtype=dtype)
    self.init_thresholds = thresholds
    self.top_k = top_k
    self.class_id = class_id

    default_threshold = 0.5 if top_k is None else metrics_utils.NEG_INF
    self.thresholds = metrics_utils.parse_init_thresholds(
        thresholds, default_threshold=default_threshold)
    self._thresholds_distributed_evenly = (
        metrics_utils.is_evenly_distributed_thresholds(self.thresholds))
    self.true_negatives = self.add_weight(
        'true_negatives',
        shape=(len(self.thresholds),),
        initializer=tf.compat.v1.zeros_initializer)
    self.false_negatives = self.add_weight(
        'false_negatives',
        shape=(len(self.thresholds),),
        initializer=tf.compat.v1.zeros_initializer)

  def update_state(self, y_true, y_pred, sample_weight=None):
    """Accumulates true positive and false positive statistics.
    Args:
      y_true: The ground truth values, with the same dimensions as `y_pred`.
        Will be cast to `bool`.
      y_pred: The predicted values. Each element must be in the range `[0, 1]`.
      sample_weight: Optional weighting of each example. Defaults to 1. Can be a
        `Tensor` whose rank is either 0, or the same rank as `y_true`, and must
        be broadcastable to `y_true`.
    Returns:
      Update op.
    """
    return metrics_utils.update_confusion_matrix_variables(
        {
            metrics_utils.ConfusionMatrix.TRUE_NEGATIVES: self.true_negatives,
            metrics_utils.ConfusionMatrix.FALSE_NEGATIVES: self.false_negatives
        },
        y_true,
        y_pred,
        thresholds=self.thresholds,
        thresholds_distributed_evenly=self._thresholds_distributed_evenly,
        top_k=self.top_k,
        class_id=self.class_id,
        sample_weight=sample_weight)

  def result(self):
    result = tf.math.divide_no_nan(
        self.true_negatives,
        tf.math.add(self.true_negatives, self.false_negatives))
    return result[0] if len(self.thresholds) == 1 else result

  def reset_state(self):
    num_thresholds = len(to_list(self.thresholds))
    backend.batch_set_value([(v, np.zeros((num_thresholds,)))
                             for v in (self.true_negatives,
                                       self.false_negatives)])

  def get_config(self):
    config = {
        'thresholds': self.init_thresholds,
        'top_k': self.top_k,
        'class_id': self.class_id
    }
    base_config = super(NPV, self).get_config()
    return dict(list(base_config.items()) + list(config.items()))

# **Metrics available for use**

In [18]:
# Always run this cell.
precision = tf.keras.metrics.Precision(name = "precision")
recall = tf.keras.metrics.Recall(name = "recall")
specificity = Specificity(name = "specificity")
TP = tf.keras.metrics.TruePositives(name = "TP") 
FP = tf.keras.metrics.FalsePositives(name = "FP")
FN = tf.keras.metrics.FalseNegatives(name = "FN")
TN = tf.keras.metrics.TrueNegatives(name = "TN")
auc = tf.keras.metrics.AUC(name = "auc")
npv = NPV(name = "NPV")

# **Fit model on the training dataset**

In [19]:
#Run this cell to create model or use next cell to load existing model.
model_name = "Covid_Model_l2.h5" #Change name here for each new model

INPUT_SHAPE = (X_train.shape[1], X_train.shape[2])

model = keras.Sequential([              
   
    # LSTM layers
    layers.LSTM(64, input_shape=INPUT_SHAPE, return_sequences=True),
    layers.LSTM(64),
    
    # Head
    layers.Dense(64, activation='relu'),
    layers.Dropout(0.3),
    layers.Dense(1, activation='sigmoid')
])

model._name= model_name[:-3]

model_plots_folder = os.path.join(folder,'model_plots')

# Save model summaries to a text file.
with open(os.path.join(model_plots_folder,'Model_summaries.txt'), 'a') as f:
    with redirect_stdout(f):
        model.summary()

# Save model plots to individual png files.
plot_model(
    model,
    to_file=(os.path.join(model_plots_folder, model_name[:-3] + ".png")),
    show_shapes=True,
    show_dtype=False,
    show_layer_names=True,
    rankdir="TB",
    expand_nested=False,
    dpi=96,
    layer_range=None,
    show_layer_activations=False,
)

model.summary()
plot_model(model, show_shapes=True)

es_callback = EarlyStopping(monitor = 'val_auc',
                            mode='max',
                            min_delta = 0.01,
                            patience = 10,
                            restore_best_weights = True)

model.compile(
    optimizer=tf.keras.optimizers.Adam(epsilon=0.01),
    loss='binary_crossentropy',
    metrics=[TP, FP, FN, TN, 
             npv, specificity, 
             recall, auc
             ]
)

model_folder = '/content/drive/My Drive/Colab Notebooks/models/'

history = model.fit(
    train_dataset,
    validation_data=val_dataset,
    callbacks = [es_callback],
    epochs = 30
    )

model.save(model_folder + model_name)


Model: "Covid_Model_l2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 lstm (LSTM)                 (None, 301, 64)           19968     
                                                                 
 lstm_1 (LSTM)               (None, 64)                33024     
                                                                 
 dense (Dense)               (None, 64)                4160      
                                                                 
 dropout (Dropout)           (None, 64)                0         
                                                                 
 dense_1 (Dense)             (None, 1)                 65        
                                                                 
Total params: 57,217
Trainable params: 57,217
Non-trainable params: 0
_________________________________________________________________
Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoc

In [None]:
#Only run this cell if you want to reload model run previously.
model_folder = '/content/drive/My Drive/Colab Notebooks/models/'

custom_objects = {"NPV": NPV, "Specificity": Specificity}
with keras.utils.custom_object_scope(custom_objects):
    model = tf.keras.models.load_model(model_folder + "Covid_Model_m3.h5")

# **Apply model to test dataset.**

In [20]:
_,TP,FP,FN,TN, Spec, npv, Recall, AUC = model.evaluate(test_dataset)
print('Confusion Matrix: ', TP, FP, FN, TN)
print('Test NPV: %.4f' % npv)
print('Test Specificity: %.4f' % Spec)
print('Test Sensitivity: %.4f' % Recall)
print('Test ROC-AUC: %.4f' % AUC)

Confusion Matrix:  529.0 47.0 357.0 67.0
Test NPV: 0.5877
Test Specificity: 0.1580
Test Sensitivity: 0.5971
Test ROC-AUC: 0.6295
