In [1]:
import tensorflow as tf
from pathlib import Path
import os
import glob
import tensorflow_io as tfio
import numpy as np
import keras
from IPython.display import display, Audio
import matplotlib.pyplot as plt
import keras
import heartpy as hp


In [2]:
trainingFolder = "./heartbeats/classifications"
trainingpath = Path(trainingFolder)
paths = [Path(dir[0]) for dir in os.walk(trainingpath)][1:]

In [29]:
INPUT_SAMPLING_RATE = 44100 # standardized sampling rate
RESAMPLING_RATE = 20000
SAMPLING_RATE = 10000 # more than enough for low frequency heart beats
DATASET_AUDIO_PATH = "./heartbeats/classifications"
SHUFFLE_SEED=1234
VALID_SPLIT=.9
BATCH_SIZE=32

In [4]:
def paths_and_labels_to_dataset(audio_paths, labels):
    """Constructs a dataset of audios and labels."""
    path_ds = tf.data.Dataset.from_tensor_slices(audio_paths)
    audio_ds = path_ds.map(lambda x: path_to_audio(x))
    audio_ds = tf.data.Dataset.from_tensor_slices(tf.keras.preprocessing.sequence.pad_sequences( ## PAD to normalized Size
        audio_ds, dtype='float64', padding='post',
        ))
    for element in audio_ds:
        print(element)
    label_ds = tf.data.Dataset.from_tensor_slices(labels)
    return tf.data.Dataset.zip((audio_ds, label_ds))


def path_to_audio(path):
    """Reads and decodes an audio file."""
    audio = tf.io.read_file(path)
    # audio, _  = tf.audio.decode_wav(audio, 1, SAMPLING_RATE) # decode at normalized rate of 44100
    audio, _  = tf.audio.decode_wav(audio, 1, SAMPLING_RATE) # decode at normalized rate of 44100
    audio = tfio.audio.resample(audio,INPUT_SAMPLING_RATE,RESAMPLING_RATE) ## Resample audio to manageable rate
    return audio


def audio_to_fft(audio):
    # Since tf.signal.fft applies FFT on the innermost dimension,
    # we need to squeeze the dimensions and then expand them again
    # after FFT
    audio = tf.squeeze(audio, axis=-1)
    fft = tf.signal.fft(
        tf.cast(tf.complex(real=audio, imag=tf.zeros_like(audio)), tf.complex64)
    )
    fft = tf.expand_dims(fft, axis=-1)

    # Return the absolute value of the first half of the FFT
    # which represents the positive frequencies
    return tf.math.abs(fft[:, : (audio.shape[1] // 2), :])


# Get the list of audio file paths along with their corresponding labels

class_names = os.listdir(DATASET_AUDIO_PATH)
print("Our class names: {}".format(class_names,))

audio_paths = []
labels = []
for label, name in enumerate(class_names):
    print("Processing speaker {}".format(name,))
    dir_path = Path(DATASET_AUDIO_PATH) / name
    speaker_sample_paths = [
        os.path.join(dir_path, filepath)
        for filepath in os.listdir(dir_path)
        if filepath.endswith(".wav")
    ]
    audio_paths += speaker_sample_paths
    labels += [label] * len(speaker_sample_paths)

print(
    "Found {} files belonging to {} classes.".format(len(audio_paths), len(class_names))
)

# Shuffle
rng = np.random.RandomState(SHUFFLE_SEED)
rng.shuffle(audio_paths)
rng = np.random.RandomState(SHUFFLE_SEED)
rng.shuffle(labels)

# Split into training and validation
num_val_samples = int(VALID_SPLIT * len(audio_paths))
print("Using {} files for training.".format(len(audio_paths) - num_val_samples))
train_audio_paths = audio_paths[:-num_val_samples]
train_labels = labels[:-num_val_samples]

print("Using {} files for validation.".format(num_val_samples))
valid_audio_paths = audio_paths[-num_val_samples:]
valid_labels = labels[-num_val_samples:]

# Create 2 datasets, one for training and the other for validation
train_ds = paths_and_labels_to_dataset(train_audio_paths, train_labels)
train_ds = train_ds.shuffle(buffer_size=BATCH_SIZE * 8, seed=SHUFFLE_SEED).batch(
    BATCH_SIZE
)

valid_ds = paths_and_labels_to_dataset(valid_audio_paths, valid_labels)
valid_ds = valid_ds.shuffle(buffer_size=32 * 8, seed=SHUFFLE_SEED).batch(32)


# Transform audio wave to the frequency domain using `audio_to_fft`
train_ds = train_ds.map(
    lambda x, y: (audio_to_fft(x), y), num_parallel_calls=tf.data.AUTOTUNE
)
train_ds = train_ds.prefetch(tf.data.AUTOTUNE)

valid_ds = valid_ds.map(
    lambda x, y: (audio_to_fft(x), y), num_parallel_calls=tf.data.AUTOTUNE
)
valid_ds = valid_ds.prefetch(tf.data.AUTOTUNE)

NameError: name 'DATASET_AUDIO_PATH' is not defined

In [87]:
class RandomShift(tf.keras.layers.Layer):
    def __init__(self, sampling_rate=RESAMPLING_RATE, 
                 shift_max=.4, shift_direction='both', **kwargs):
        super(RandomShift, self).__init__(**kwargs)
        self.sampling_rate = sampling_rate
        self.shift_max = shift_max
        self.shift_direction = shift_direction
    
    def call(self, audio, training=None):
        if not training:
            return audio
        shift = tf.experimental.numpy.random.randint(self.sampling_rate * self.shift_max)
        if self.shift_direction == 'right':
            shift = -shift
        elif self.shift_direction == 'both':
            direction = tf.experimental.numpy.random.randint(0, 2)
            if direction == 1:
                shift = -shift
        augmented_data = tf.roll(audio, shift,0)
        # Set to silence for heading/ tailing
        # if shift > 0:
        #     augmented_data[:shift] = 0
        # else:
        #     augmented_data[shift:] = 0
        return augmented_data

In [94]:
model = keras.models.Sequential()
# model.add(keras.layers.Input(, name="input"))
# model.add(RandomShift())
weight_decay = 1e-4
model.add(keras.layers.Conv1D(32, 3, padding='same', kernel_regularizer=keras.regularizers.l2(weight_decay), activation='selu'))
model.add(keras.layers.BatchNormalization())
model.add(keras.layers.Conv1D(32, 3, padding='same', kernel_regularizer=keras.regularizers.l2(weight_decay), activation='selu'))
model.add(keras.layers.BatchNormalization())

model.add(keras.layers.MaxPooling1D(pool_size=1))
model.add(keras.layers.Dropout(0.2))

model.add(keras.layers.Conv1D(64, 3, padding='same', kernel_regularizer=keras.regularizers.l2(weight_decay), activation='selu'))
model.add(keras.layers.BatchNormalization())
model.add(keras.layers.Conv1D(64, 3, padding='same', kernel_regularizer=keras.regularizers.l2(weight_decay), activation='selu'))
model.add(keras.layers.BatchNormalization())
model.add(keras.layers.MaxPooling1D(pool_size=1))
model.add(keras.layers.Dropout(0.3))
# model.add(keras.layers.LSTM(200))
model.add(keras.layers.Conv1D(128, 3, padding='same', kernel_regularizer=keras.regularizers.l2(weight_decay), activation='selu'))
model.add(keras.layers.BatchNormalization())
model.add(keras.layers.Conv1D(128, 3, padding='same', kernel_regularizer=keras.regularizers.l2(weight_decay), activation='selu'))
model.add(keras.layers.BatchNormalization())
model.add(keras.layers.MaxPooling1D(pool_size=1))
model.add(keras.layers.Dropout(0.3))

model.add(keras.layers.Conv1D(256, 3, padding='same', kernel_regularizer=keras.regularizers.l2(weight_decay), activation='selu'))
model.add(keras.layers.BatchNormalization())
model.add(keras.layers.Conv1D(256, 3, padding='same', kernel_regularizer=keras.regularizers.l2(weight_decay), activation='selu'))
model.add(keras.layers.BatchNormalization())
model.add(keras.layers.MaxPooling1D(pool_size=1))
model.add(keras.layers.Dropout(0.3))

model.add(keras.layers.Flatten())

model.add(keras.layers.Dense(128, activation='selu'))
model.add(keras.layers.BatchNormalization())

model.add(keras.layers.Dense(len(class_names)))


# model.build()

# model.summary()

# Compile the model using Adam's default learning rate
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-5), loss="sparse_categorical_crossentropy", metrics=["accuracy",]
)

# Add callbacks:
# 'EarlyStopping' to stop training when the model is not enhancing anymore
# 'ModelCheckPoint' to always keep the model that has the best val_accuracy
# model_save_filename = "model.h5"

# earlystopping_cb = keras.callbacks.EarlyStopping(patience=10, restore_best_weights=True)
# mdlcheckpoint_cb = keras.callbacks.ModelCheckpoint(
#     model_save_filename, monitor="val_accuracy", save_best_only=True
# )

In [95]:
history = model.fit(
    train_ds,
    epochs=200,
    validation_data=valid_ds,
    # callbacks=[earlystopping_cb, mdlcheckpoint_cb],
)

plt.plot(history.history['accuracy'], label='accuracy')
plt.plot(history.history['val_accuracy'], label = 'val_accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.ylim([0.5, 1])
plt.legend(loc='lower right')
plt.show()
# _, test_acc = model.evaluate(test_images,  test_labels, verbose=2)

Epoch 1/200
Epoch 2/200
Epoch 3/200
Epoch 4/200
Epoch 5/200
Epoch 6/200
Epoch 7/200
Epoch 8/200
Epoch 9/200
Epoch 10/200
Epoch 11/200
Epoch 12/200
Epoch 13/200
Epoch 14/200
Epoch 15/200
Epoch 16/200
Epoch 17/200
Epoch 18/200
Epoch 19/200
Epoch 20/200
Epoch 21/200
Epoch 22/200
Epoch 23/200
Epoch 24/200
Epoch 25/200
Epoch 26/200
Epoch 27/200
Epoch 28/200
Epoch 29/200
Epoch 30/200
Epoch 31/200
Epoch 32/200
Epoch 33/200

In [75]:
print(model.evaluate(valid_ds))


[6.282197952270508, 0.2405063360929489]


In [9]:
SAMPLES_TO_DISPLAY = 10

test_ds = paths_and_labels_to_dataset(valid_audio_paths, valid_labels)
test_ds = test_ds.shuffle(buffer_size=BATCH_SIZE * 8, seed=SHUFFLE_SEED).batch(
    BATCH_SIZE
)


for audios, labels in test_ds.take(1):
    # Get the signal FFT
    ffts = audio_to_fft(audios)
    # Predict
    y_pred = model.predict(ffts)
    # Take random samples
    rnd = np.random.randint(0, BATCH_SIZE, SAMPLES_TO_DISPLAY)
    audios = audios.numpy()[rnd, :, :]
    labels = labels.numpy()[rnd]
    y_pred = np.argmax(y_pred, axis=-1)[rnd]

    for index in range(SAMPLES_TO_DISPLAY):
        # For every sample, print the true and predicted label
        # as well as run the voice with the noise
        print(
            "Speaker: {} - Predicted: {}".format(
                class_names[labels[index]],
                class_names[y_pred[index]],
            )
        )
        display(Audio(audios[index, :, :].squeeze(), rate=RESAMPLING_RATE))

tf.Tensor(
[[-1.30805908e-08]
 [ 4.08500647e-08]
 [-4.30870912e-08]
 ...
 [ 9.89601016e-03]
 [ 1.07517801e-02]
 [ 9.87135991e-03]], shape=(3628, 1), dtype=float64)
tf.Tensor(
[[-1.22661390e-07]
 [ 3.43625970e-07]
 [-1.64817038e-07]
 ...
 [-9.54116322e-03]
 [-9.82972421e-03]
 [-1.00763217e-02]], shape=(3628, 1), dtype=float64)
tf.Tensor(
[[ 4.93607211e-10]
 [-3.43029627e-09]
 [ 7.22245153e-09]
 ...
 [-4.14034585e-04]
 [-3.72261216e-04]
 [-1.29947992e-04]], shape=(3628, 1), dtype=float64)
tf.Tensor(
[[ 2.22123253e-09]
 [-7.23183602e-09]
 [ 3.62155927e-09]
 ...
 [-6.33899763e-04]
 [ 6.59338373e-04]
 [ 1.93601160e-03]], shape=(3628, 1), dtype=float64)
tf.Tensor(
[[-3.03568441e-08]
 [ 8.25640498e-08]
 [-7.61660175e-08]
 ...
 [ 9.28931125e-03]
 [ 6.56927377e-03]
 [ 8.89969617e-03]], shape=(3628, 1), dtype=float64)
tf.Tensor(
[[ 7.45346895e-08]
 [-2.07500719e-07]
 [ 9.35172579e-08]
 ...
 [-7.79128540e-03]
 [-7.66059570e-03]
 [-7.47565553e-03]], shape=(3628, 1), dtype=float64)
tf.Tensor(
[[ 1.

Speaker: aunlabelled - Predicted: normal


Speaker: murmur - Predicted: normal


Speaker: aunlabelled - Predicted: extrahls


Speaker: normal - Predicted: normal


Speaker: murmur - Predicted: normal


Speaker: normal - Predicted: extrahls


Speaker: normal - Predicted: normal


Speaker: aunlabelled - Predicted: extrahls


Speaker: normal - Predicted: normal


In [60]:
train_ds.shape

AttributeError: 'PrefetchDataset' object has no attribute 'shape'

In [64]:
print([i for i,_ in enumerate(train_ds)][-1] + 1)


1
