In [None]:
import os
import numpy as np
import math 

import tensorflow.compat.v1 as tf 
from tensorflow.compat.v1 import keras
tf.enable_eager_execution()

from pathlib import Path
from IPython.display import display, Audio

import matplotlib.pyplot as plt
from pydub import AudioSegment

In [None]:
RAW_DATASET_PATH = os.path.join("..", "animal_sounds")
DATASET_AUDIO_PATH = os.path.join("..", "animal_sounds_clips")
# The sampling rate to use.
# This is the one used in all of the audio samples.
# This will also be the output size of the audio wave samples
# (since all samples are of 1 second long)
SAMPLING_RATE = 44100
# percentage of recordings used for validation
TEST_RATIO = 0.15
CLIP_LEN = 500

In [None]:
# Seed to use when shuffling the dataset and the noise
SHUFFLE_SEED = 5123
BATCH_SIZE = 64
EPOCHS = 100

In [None]:
def paths_and_labels_to_dataset(audio_paths, labels):
    """Constructs a dataset of audios and labels."""
    path_ds = tf.data.Dataset.from_tensor_slices(audio_paths)
    audio_ds = path_ds.map(lambda x: path_to_audio(x))
    label_ds = tf.data.Dataset.from_tensor_slices(labels)
    return tf.data.Dataset.zip((audio_ds, label_ds))


def path_to_audio(path):
    """Reads and decodes an audio file."""
    audio = tf.io.read_file(path)
    audio, _ = tf.audio.decode_wav(audio, 1, SAMPLING_RATE)
    return audio

def audio_to_fft(audio):
    # Since tf.signal.fft applies FFT on the innermost dimension,
    # we need to squeeze the dimensions and then expand them again
    # after FFT
    audio = tf.squeeze(audio, axis=-1)
    fft = tf.signal.fft(
        tf.cast(tf.complex(real=audio, imag=tf.zeros_like(audio)), tf.complex64)
    )
    fft = tf.expand_dims(fft, axis=-1)

    # Return the absolute value of the first half of the FFT
    # which represents the positive frequencies
    return tf.math.abs(fft[:, : (audio.shape[1] // 2), :])


# Get the list of audio file paths along with their corresponding labels

class_names = os.listdir(DATASET_AUDIO_PATH)
print("Our class names: {}".format(class_names,))

to_skip = ["skin_animal", "skin_animal_valid", "liver_animal", "liver_animal_valid"]

train_audio_paths = []
valid_audio_paths = []
train_labels = []
valid_labels = []
for label, name in enumerate([c for c in class_names if c not in to_skip]):
    print("Processing material {}".format(name,))
    dir_path = Path(DATASET_AUDIO_PATH) / name
    speaker_sample_paths = [
        os.path.join(dir_path, filepath)
        for filepath in os.listdir(dir_path)
        if filepath.lower().endswith(".wav")
    ]
    label = label // 2 # coz every dir has a _valid copy
    if name.endswith("_valid"):
        valid_audio_paths += speaker_sample_paths
        valid_labels += [label] * len(speaker_sample_paths)
    else:
        train_audio_paths += speaker_sample_paths
        train_labels += [label] * len(speaker_sample_paths)
    print(f"Loaded {len(speaker_sample_paths)} files from class {label}.")
    
print(
    "Found {} files belonging to {} classes.".format(len(train_audio_paths) + len(valid_audio_paths), len(class_names)//2)
)

# Shuffle
rng = np.random.RandomState(SHUFFLE_SEED)
rng.shuffle(train_audio_paths)
rng = np.random.RandomState(SHUFFLE_SEED)
rng.shuffle(valid_audio_paths)
rng = np.random.RandomState(SHUFFLE_SEED)
rng.shuffle(train_labels)
rng = np.random.RandomState(SHUFFLE_SEED)
rng.shuffle(valid_labels)

print("Using {} files for training.".format(len(train_labels)))
print("Using {} files for validation.".format(len(valid_labels)))

# Create 2 datasets, one for training and the other for validation
train_ds = paths_and_labels_to_dataset(train_audio_paths, train_labels)
train_ds = train_ds.shuffle(buffer_size=BATCH_SIZE * 8, seed=SHUFFLE_SEED).batch(
    BATCH_SIZE
)

valid_ds = paths_and_labels_to_dataset(valid_audio_paths, valid_labels)
valid_ds = valid_ds.shuffle(buffer_size=32 * 8, seed=SHUFFLE_SEED).batch(32)

# Transform audio wave to the frequency domain using `audio_to_fft`
train_ds = train_ds.map(
    lambda x, y: (audio_to_fft(x), y), num_parallel_calls=tf.data.experimental.AUTOTUNE
)
train_ds = train_ds.prefetch(tf.data.experimental.AUTOTUNE)

valid_ds = valid_ds.map(
    lambda x, y: (audio_to_fft(x), y), num_parallel_calls=tf.data.experimental.AUTOTUNE
)
valid_ds = valid_ds.prefetch(tf.data.experimental.AUTOTUNE)


In [None]:
def residual_block(x, filters, conv_num=3, activation="relu"):
    # Shortcut
    s = keras.layers.Conv1D(filters, 1, padding="same")(x)
    for i in range(conv_num - 1):
        x = keras.layers.Conv1D(filters, 3, padding="same")(x)
        x = keras.layers.Activation(activation)(x)
    x = keras.layers.Conv1D(filters, 3, padding="same")(x)
    x = keras.layers.Add()([x, s])
    x = keras.layers.Activation(activation)(x)
    return keras.layers.MaxPool1D(pool_size=2, strides=2)(x)


def build_model(input_shape, num_classes):
    inputs = keras.layers.Input(shape=input_shape, name="input")

    x = residual_block(inputs, 16, 2)
    x = residual_block(x, 32, 2)
    x = residual_block(x, 64, 3)

    x = keras.layers.AveragePooling1D(pool_size=3, strides=3)(x)
    x = keras.layers.Flatten()(x)
    x = keras.layers.Dense(64, activation="relu", kernel_regularizer=keras.regularizers.L1L2(l1=1e-5, l2=1e-4))(x)

    outputs = keras.layers.Dense(num_classes, activation="softmax", name="output")(x)

    return keras.models.Model(inputs=inputs, outputs=outputs)


model = build_model((SAMPLING_RATE // 2, 1), len(class_names))

model.summary()

# Compile the model using Adam's default learning rate
model.compile(
    optimizer="Adam",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"],
)

# Add callbacks:
# 'EarlyStopping' to stop training when the model is not enhancing anymore
# 'ModelCheckPoint' to always keep the model that has the best val_accuracy
model_save_filename = "model_41kHz_no_skin_no_liver.h5"

earlystopping_cb = keras.callbacks.EarlyStopping(patience=15, restore_best_weights=True)
mdlcheckpoint_cb = keras.callbacks.ModelCheckpoint(
    model_save_filename, monitor="accuraccy", save_best_only=True
)

In [None]:
CHECKPOINT_PATH = Path("model_41kHz_no_skin_no_liver.h5")
history = None
if os.path.exists(CHECKPOINT_PATH):
    model.load_weights(CHECKPOINT_PATH)
else:
    history = model.fit(
        train_ds,
        epochs=EPOCHS,
        validation_data=valid_ds,
        callbacks=[earlystopping_cb, mdlcheckpoint_cb],
    )

In [None]:
if history is not None:
    i = len(history.history["val_acc"])-1
    print("epoch", i, "\nacc", np.max(history.history["acc"][i]), "\nval_acc", history.history["val_acc"][i])

    fig, axs = plt.subplots(1, 2, figsize=(16, 6))

    axs[0].plot(history.history['acc'])
    axs[0].plot(history.history['val_acc'])
    axs[0].set_title('Model accuracy')
    axs[0].set_ylabel('Accuracy')
    axs[0].set_xlabel('Epoch')
    axs[0].legend(['training', 'validation'], loc='upper left')

    axs[1].plot(history.history['loss'])
    axs[1].plot(history.history['val_loss'])
    axs[1].set_title('Model loss')
    axs[1].set_ylabel('Loss')
    axs[1].set_xlabel('Epoch')
    axs[1].legend(['train', 'val'], loc='upper left')
    plt.show()

In [None]:
test_ds = paths_and_labels_to_dataset(train_audio_paths + valid_audio_paths, train_labels + valid_labels)
test_ds = test_ds.shuffle(buffer_size=BATCH_SIZE * 8, seed=SHUFFLE_SEED).batch(
    BATCH_SIZE
)

actual_class_names = [n for n in class_names if not n.endswith("_valid") and n not in to_skip]

confusion = np.zeros((len(actual_class_names),)*2, dtype=np.float32)
counter = 0
for audios, labels in test_ds.take(100):
    # Get the signal FFT
    ffts = audio_to_fft(audios)
    # Predict
    y_pred = model.predict(ffts)
    y_pred = np.argmax(y_pred, axis=-1)
    for idx, pred in enumerate(y_pred):
        counter += 1
        confusion[labels[idx], pred] += 1

print(counter)
diag_sum = np.sum(np.diag(confusion))
print(f"total={np.sum(confusion)}, on diagonal={diag_sum} totalAcc={diag_sum/np.sum(confusion)}")

for idx, label in enumerate(actual_class_names):
    confusion[idx, :] /= np.sum(confusion[idx, :])
    print(f"{idx} - {label}")

In [None]:
import seaborn as sn
import pandas as pd
import matplotlib.pyplot as plt
labels = [x.split("_")[0] for x in actual_class_names]
# labels = ["ribs", "kidney", "liver", "muscle", "skin"]
df_cm = pd.DataFrame(confusion, index=labels, columns=labels)
plt.figure(figsize=(12, 8))
plt.rcParams.update({'font.size': 16})
sn.heatmap(df_cm, annot=True, fmt=".3f", square=True, cbar=False, cmap="Blues", linewidths=3, vmin=0, vmax=1)
plt.xlabel("Predicted label", labelpad=16)
plt.ylabel("True label", labelpad=12)
plt.tick_params(axis='y', rotation=0)
plt.show()