<a href="https://colab.research.google.com/github/Bepo2002/Humbug-4YP/blob/main/Test_bench_V4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Test Bench V4

### Test bench V3 with genus classification

## Setup env

In [None]:
%pip install visualkeras
%pip install pycodestyle pycodestyle_magic
%pip install flake8
%pip install -q -U tensorboard-plugin-profile
%load_ext pycodestyle_magic
%pip install scikit-plot



In [None]:
import pandas as pd
import zipfile
import matplotlib.pyplot as plt
from scipy.io import wavfile
import numpy as np
import tensorflow as tf
from IPython.display import Audio
import matplotlib.colors as mclr
import librosa
import visualkeras
from sklearn.model_selection import train_test_split
from keras.callbacks import EarlyStopping
from pathlib import Path
from time import strftime
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import scikitplot as skplt
from sklearn.metrics import RocCurveDisplay
import csv
from collections import Counter

## Functions

In [None]:
# Finds specific sound in zip and extracts it
def extract_sound(filename, silent=False):
    zip_files = ['./drive/MyDrive/4YP/train.zip',
                 './drive/MyDrive/4YP/dev.zip']
    folders = ['train/', 'dev/a/', 'dev/b/']
    dest_dir = './drive/MyDrive/4YP/Data'
    filename_to_extract = str(filename) + '.wav'

    for zip_path in zip_files:
        for folder in folders:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                filename_to_extract = folder + filename_to_extract
                if filename_to_extract in zip_ref.namelist():
                    zip_ref.extract(filename_to_extract, dest_dir)
                    if not silent:
                        print(f"{filename_to_extract} has been extracted",
                              f"to {dest_dir}")
                    final = './drive/MyDrive/4YP/Data/' + filename_to_extract
                    return wavfile.read(final)
    print(f"{filename_to_extract} not found in any of the provided ZIP files.")


# Read in .csv
def csv_read():
    metadata = pd.read_csv('./drive/MyDrive/4YP/Data/humbugdb_zenodo_edited.csv')
    metadata = metadata.set_index('id')
    return metadata


def gaussian(x, mu, sigma):
    return np.exp(-((x - mu)**2) / (2 * sigma**2))


# Normalise audio
def normalize_audio(audio, target_dBFS=0):
    # Calculate the current dBFS of the audio
    rms = np.sqrt(np.mean(np.square(audio)))
    current_dBFS = 20 * np.log10(rms)

    # Calculate the required gain to achieve the target dBFS
    gain = target_dBFS - current_dBFS

    # Apply gain to the audio data
    normalized_audio = audio * (10 ** (gain / 20))

    return normalized_audio


def progress_bar(progress, total):
    percent = 100 * (progress / float(total))
    bar = '#' * int(percent) + '-' * (100 - int(percent))
    print("[%s] %d%%" % (bar, percent))

In [None]:
# Full function for generating noise + mosquito
def long_sound_gen(mos_num, SNR=1, silent=False):
    metadata = csv_read()

    # Choose a long noise sound
    subset = metadata[(metadata['sound_type'] == 'background') &
                      (metadata['length'] > 10) &
                      (metadata['sample_rate'] == 44100) &
                      (metadata['clean'] == "Y")]

    noise_id = subset.sample(random_state=np.random.randint(0,100)).index.values[0]
    print("noise_id: ", noise_id)

    # Extract the sound wave and find samplerate
    n_samplerate, noise = extract_sound(noise_id, silent)

    # Normalise
    noise = normalize_audio(noise/100)

    # Choose a shorter mosquito sound
    subset = metadata[(metadata['sound_type'] == 'mosquito') &
                      (metadata['length'] < 10) &
                      (metadata['length'] > 1) &
                      (metadata['sample_rate'] == 44100)]

    chosen = subset.sample(random_state=np.random.randint(0,100))
    mos_id = chosen.index.values[0]
    audio_spec = chosen.species.values[0]
    print("mos_id: ", mos_id)
    print("Species: ", audio_spec)

    # Import the sound wave and find samplerate
    samplerate, data = extract_sound(mos_id, silent)

    # Normalise
    data = normalize_audio(data/100)

    # Add sound to random time in noise
    # Initilise vectors to store new audio and whether each sample
    # is from mosquito or noise
    new_audio = np.zeros(noise.size)
    sound_cat = np.zeros(noise.size)
    old_audio = np.zeros(noise.size)

    for i in range(0, mos_num):

        # Choose a 'time' to put the middle of mosquito sound wave
        mosquito_time = np.random.randint(data.size//2,
                                          high=noise.size-data.size//2,
                                          dtype=int)
        if not silent:
            print("Adding mosquito noise at ",
                  str(round(mosquito_time/samplerate, 3)), "s")

        # Put wave into long array
        new_audio[mosquito_time - data.size//2:(mosquito_time - data.size//2)+data.size] = data
        sound_cat[mosquito_time - data.size//2:mosquito_time + data.size//2] = 1

        # Generate gaussian in same place as mos audio
        x = np.linspace(0, noise.size, noise.size, dtype=int)
        y = gaussian(x, mosquito_time, data.size/6)

        # Multiply to apply gaussian to audio
        old_audio = y*new_audio + old_audio

    # Add faded mosquito sound to background noise
    full_audio = noise/SNR + old_audio

    return full_audio, samplerate, sound_cat, audio_spec

In [None]:
# Convert numbers to 0 or 1 depending on threshold
def binarize(input_array, thresh=0):
    output_array = np.zeros(len(input_array))
    for i in range(len(input_array)):
        if input_array[i] > thresh:
            output_array[i] = 1
        else:
            output_array[i] = 0

    return output_array

In [None]:
# Chops the audio clip into sections and creates a vector of clips
def chop_chop(audio, samplerate, seconds_per_clip=1):
    # Choose bin size (default 1 second clip)
    bin_size = int(samplerate*seconds_per_clip)

    # Reshape Data
    rows = int(audio.size/bin_size)
    audio = audio[:rows*bin_size]
    audio = np.reshape(audio, (rows, bin_size))
    audio = np.concatenate((audio, np.zeros((rows, 1))), axis=1)
    return audio

In [None]:
def display_images(images, y_true, y_pred, num_images=10):
    plt.figure(figsize=(10, 10))
    plt.suptitle("Mislabelled images:")
    for i in range(num_images):
        plt.subplot(5, 5, i+1)
        plt.xticks([])
        plt.yticks([])
        plt.grid(False)
        plt.imshow(images[i], cmap=plt.cm.binary)
        plt.xlabel(f"True: {y_true[i]}, Pred: {y_pred[i]}")
    plt.show()

## More helper functions

In [None]:
def generate_training_data(mosquitoness=1, SNR=2, chop_size=3,
                           silent=False):
    # Create the long mixed audio (mosquito + noise)
    full_audio, samplerate, sound_cat, audio_spec = long_sound_gen(mosquitoness,
                                                       SNR, silent)

    # Chop audio into X second clips
    chopped_audio = chop_chop(full_audio, samplerate, chop_size)

    # Create an average 'mosquitoness' of each clip
    chopped_cat = chop_chop(sound_cat, samplerate, chop_size)
    av_chopped_cat = np.mean(chopped_cat, axis=1)

    # Generate spectrogram for each clip
    mel_spect = librosa.feature.melspectrogram(y=chopped_audio[0],
                                               sr=samplerate, n_fft=1024,
                                               hop_length=256)
    spectro = np.zeros((len(chopped_audio), 128, mel_spect.shape[1]))
    for i in range(0, len(chopped_audio)):
        mel_spect = librosa.feature.melspectrogram(y=chopped_audio[i],
                                                   sr=samplerate, n_fft=1024,
                                                   hop_length=256)
        mel_spect = librosa.power_to_db(mel_spect, ref=np.max)
        mel_spect = (mel_spect + 80) / 80
        spectro[i] = mel_spect

    # Returns a number of seconds in audio x n_mfcc matrix and an array
    # of sound category (how much mosquito) in each X second chop
    return spectro, av_chopped_cat, audio_spec

In [None]:
def multiple_training_data(amount_of_audio, mosquitoness=2, SNR=2,
                           chop_size=3, silent=False):
    X, Y, audio_spec = generate_training_data(mosquitoness, SNR, chop_size,
                                  silent)
    audio_specs = np.tile(audio_spec, len(Y))
    for i in range(0, amount_of_audio-1):
        specto, av_chopped_cat, audio_spec = generate_training_data(mosquitoness,
                                                        SNR,
                                                        chop_size, silent)
        X = np.concatenate((X, specto))
        Y = np.concatenate((Y, av_chopped_cat))
        audio_specs = np.append(audio_specs, np.tile(audio_spec, len(av_chopped_cat)))
        progress_bar(i + 1, amount_of_audio)
    return X, Y, audio_specs

In [None]:
def get_run_logdir(root_logdir="./drive/MyDrive/my_logs", name="run_%Y_%m_%d_%H_%M_%S"):
    return Path(root_logdir) / strftime(name)

## Test Bench Function

In [None]:
def test_bench(test_model, sound_count, mosquitoness, SNR, chop_size, thresh,
               learning_rate, patient, name, kern=1):

    tf.keras.utils.set_random_seed(42)

    # Calculate the number of time bins to be generated per image
    time_size = int(1+((44100*chop_size)/256))
    print("Time size: ", time_size)


    # Create the keras model
    tf.keras.backend.clear_session()
    if test_model == 1:
        model = tf.keras.Sequential([
            tf.keras.layers.Conv2D(filters=16, kernel_size=3, activation="relu",
                                   input_shape=(128, time_size, 1)),
            tf.keras.layers.MaxPooling2D(),
            tf.keras.layers.Dropout(0.5),
            tf.keras.layers.Conv2D(filters=32, kernel_size=3, activation="relu"),
            tf.keras.layers.MaxPooling2D(),
            tf.keras.layers.Dropout(0.5),
            tf.keras.layers.Conv2D(filters=64, kernel_size=3, activation="relu"),
            tf.keras.layers.MaxPooling2D(),
            tf.keras.layers.Dropout(0.5),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(units=128, activation="relu"),
            tf.keras.layers.Dropout(0.5),
            tf.keras.layers.Dense(units=1)
        ])
    else:
        model = tf.keras.Sequential([
            tf.keras.layers.Conv2D(filters=16, kernel_size=(128,kern),
                                   activation="relu",
                                   input_shape=(128, time_size, 1)),
            tf.keras.layers.MaxPooling2D(pool_size=(1, 2)),
            tf.keras.layers.Dropout(0.5),

            tf.keras.layers.Conv2D(filters=32, kernel_size=(1,3),
                                   activation="relu"),
            tf.keras.layers.MaxPooling2D(pool_size=(1, 2)),
            tf.keras.layers.Dropout(0.5),

            tf.keras.layers.Conv2D(filters=64, kernel_size=(1,3),
                                   activation="relu"),
            tf.keras.layers.MaxPooling2D(pool_size=(1, 2)),
            tf.keras.layers.Dropout(0.5),

            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(units=20, activation="relu"),
            tf.keras.layers.Dropout(0.5),
            tf.keras.layers.Dense(units=1)
        ])

    #test_model_2.summary()
    #visualkeras.layered_view(test_model_2, legend=True)


    # Create train and dev/val set
    print("----GENERATING DATA FOR TEST----")
    X, Y, audio_specs = multiple_training_data(sound_count, mosquitoness, SNR,
                                  chop_size, silent=True)

    print("")
    print("----BEFORE BINARISATION----")
    print("Mosquito Clips: ", np.count_nonzero(Y))
    print("Background Clips: ", len(Y)-np.count_nonzero(Y))
    print("Total: ", len(Y))

    Y_bin = binarize(Y, thresh)
    print("----AFTER BINARISATION----")
    print("Mosquito Clips: ", np.count_nonzero(Y_bin))
    print("Background Clips: ", len(Y_bin)-np.count_nonzero(Y_bin))
    print("Total: ", len(Y_bin), '\n')
    X_train, X_dev, y_train, y_dev, specs_train, specs_dev = train_test_split(X, Y_bin, audio_specs, test_size=0.2,
                                                      random_state=42)
    X_train = X_train.reshape(len(X_train), 128, -1, 1)
    X_dev = X_dev.reshape(len(X_dev), 128, -1, 1)

    print("Training set size: ", X_train.shape[0])
    print("Validation set size: ", X_dev.shape[0])
    print("")
    print("Input Shape: ", X_train.shape)
    print("----TRAINING----")

    # Train model

    model.compile(loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
                  optimizer=tf.keras.optimizers.Adam(learning_rate),
                  metrics=["accuracy"])

    early_stopping = EarlyStopping(monitor="val_loss",
                                   patience=patient,
                                   verbose=1,
                                   restore_best_weights=True)
    run_logdir = get_run_logdir(name=name)  # e.g., my_logs/run_2022_08_01_17_25_59
    tensorboard_cb = tf.keras.callbacks.TensorBoard(run_logdir,
                                                    profile_batch=(100, 200))
    history = model.fit(X_train, y_train, epochs=100,
                        validation_data=(X_dev, y_dev),
                        callbacks=[tensorboard_cb])

    # Evalute model performance

    y_pred = model.predict(X_dev)

    # Plotting of ROC curve
    y_prob = 1 / (1 + np.exp(-y_pred))
    y_prob = y_prob.flatten()
    y_pred = y_prob.round().astype(int)
    RocCurveDisplay.from_predictions(y_dev, y_prob)
    plt.show()

    # Species performance

    correct = y_pred == y_dev

    # Count the frequency of each class in both arrays
    counts1 = Counter(specs_dev[correct])
    counts2 = Counter(specs_dev)

    print(counts1)
    print(counts2)

    # Get a combined set of keys from both counters to include all possible classes
    all_classes = list(set(counts1.keys()) | set(counts2.keys()))

    # Prepare frequencies, setting to 0 for classes that don't appear in one of the arrays
    frequencies1 = [counts1[cls] for cls in all_classes]
    frequencies2 = [counts2[cls] for cls in all_classes]

    # Set the width of the bars
    barWidth = 0.35

    # Set position of bar on X axis
    r1 = (np.arange(len(all_classes)))*2
    r2 = [x + barWidth for x in r1]

    # Make the plot
    plt.figure(figsize=(12, 8))
    plt.bar(r1, frequencies1, color='blue', width=barWidth, edgecolor='grey', label='Correct identification')
    plt.bar(r2, frequencies2, color='red', width=barWidth, edgecolor='grey', label='Occurance in validation set')

    # Add xticks on the middle of the group bars
    plt.xlabel('Species', fontweight='bold')
    plt.xticks([(r + barWidth/2)*2 for r in range(len(all_classes))], all_classes)
    plt.ylabel('Frequency')
    plt.title('Accuracy of mosquito detection by species')

    # Specify the CSV file name
    csv_file = 'species_correct.csv'

    # Open the CSV file for writing
    with open(csv_file, 'w', newline='') as file:
        writer = csv.writer(file)

        # Write the header
        writer.writerow(['Species', 'Correct'])

        # Write the counts
        for item, count in counts1.items():
            writer.writerow([item, count])

    # Specify the CSV file name
    csv_file = 'species_total.csv'

    # Open the CSV file for writing
    with open(csv_file, 'w', newline='') as file:
        writer = csv.writer(file)

        # Write the header
        writer.writerow(['Species', 'Total'])

        # Write the counts
        for item, count in counts2.items():
            writer.writerow([item, count])

    # Create legend & Show graphic
    plt.legend()
    plt.show()

    correctly_classified_images = X_dev[correct]
    incorrectly_classified_images = X_dev[~correct]
    print("")
    print("----RESULTS----")
    print("Correct: ", len(correctly_classified_images))
    print("Incorrect: ", len(incorrectly_classified_images))

    true_labels = y_dev[~correct].astype(int)
    predicted_labels = y_pred[~correct]

    # Display first 10 incorrectly classified images
    display_images(incorrectly_classified_images, true_labels, predicted_labels,
                   num_images=min(25, len(incorrectly_classified_images)))

    confusion = confusion_matrix(y_dev, y_pred)
    true_pos = confusion[1][1]
    true_neg = confusion[0][0]
    false_pos = confusion[0][1]
    false_neg = confusion[1][0]

    print("True positive: " + str(true_pos))
    print("True negative: " + str(true_neg))
    print("False positive: " + str(false_pos))
    print("False negative: " + str(false_neg))

    sensitivity = (true_pos/(true_pos+false_neg)).round(3)
    specificity = (true_neg/(true_neg+false_pos)).round(3)

    print("Sensitivity: " + str(sensitivity))
    print("Specificity: " + str(specificity))
    print("Score: " + str((sensitivity+specificity).round(3)))
    disp = ConfusionMatrixDisplay(confusion,
                                  display_labels=["Background", "Mosquito"])
    disp.plot()

    return model, X_train, X_dev, y_train, y_dev, specs_train, specs_dev, y_pred

## Working Code

### Test Bench

In [None]:
# Run the test bench
comp_model, X_train, X_dev, y_train, y_dev, specs_train, specs_dev, y_pred = test_bench(test_model=1,
                                                                                   sound_count=500,
                                                                                   mosquitoness=1,
                                                                                   SNR=1,
                                                                                   chop_size=3,
                                                                                   thresh=0.1,
                                                                                   learning_rate=0.0005,
                                                                                   patient=10,
                                                                                   name="gen1")

### Mosquito classifier (Binary)

In [None]:
# Generate training set from only positive anopheles and culex

y_train_n = []
genera_train = []
index = []

# Only include if 1 and "an" or "culex"
for i in range(len(y_train)):
    genus = specs_train[i].partition(' ')[0]
    cat = y_train[i]
    if cat:
        if genus == "an":
            index.append(True)
            y_train_n.append(0)
            genera_train.append("anopheles")
        elif genus == "culex":
            index.append(True)
            y_train_n.append(1)
            genera_train.append("culex")
        else:
            index.append(False)
    else:
        index.append(False)

X_train_n = X_train[index]


# Only include if predicted 1 and "an" or "culex"
y_dev_n = []
genera_dev = []
index = []

for i in range(len(y_dev)):
    genus = specs_dev[i].partition(' ')[0]
    cat = y_pred[i]
    if cat:
        if genus == "an":
            index.append(True)
            y_dev_n.append(0)
            genera_dev.append("anopheles")
        elif genus == "culex":
            index.append(True)
            y_dev_n.append(1)
            genera_dev.append("culex")
        else:
            index.append(False)
    else:
        index.append(False)

X_dev_n = X_dev[index]
y_train_n = np.array(y_train_n)
y_dev_n = np.array(y_dev_n)
model = tf.keras.models.clone_model(comp_model)

print("----Training----")
print("Culex Clips: ", np.count_nonzero(y_train_n))
print("Anopheles Clips: ", len(y_train_n)-np.count_nonzero(y_train_n))
print("Total: ", len(y_train_n), '\n')

print("----Dev----")
print("Culex Clips: ", np.count_nonzero(y_dev_n))
print("Anopheles Clips: ", len(y_dev_n)-np.count_nonzero(y_dev_n))
print("Total: ", len(y_dev_n), '\n')

In [None]:
# Train model
model.compile(loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
                optimizer=tf.keras.optimizers.Adam(learning_rate=0.0005),
                metrics=["accuracy"])

early_stopping = EarlyStopping(monitor="val_loss",
                                patience=10,
                                verbose=1,
                                restore_best_weights=True)
run_logdir = get_run_logdir(name="find_the_species")  # e.g., my_logs/run_2022_08_01_17_25_59
tensorboard_cb = tf.keras.callbacks.TensorBoard(run_logdir,
                                                profile_batch=(100, 200))
history = model.fit(X_train_n, y_train_n, epochs=100,
                    validation_data=(X_dev_n, y_dev_n),
                    callbacks=[tensorboard_cb])

In [None]:
# Evalute model performance

y_pred_n = model.predict(X_dev_n)

# Plotting of ROC curve
y_prob_n = 1 / (1 + np.exp(-y_pred_n))
y_prob_n = y_prob_n.flatten()
y_pred_n = y_prob_n.round().astype(int)
RocCurveDisplay.from_predictions(y_dev_n, y_prob_n)
plt.show()

correct = y_pred_n == y_dev_n

correctly_classified_images = X_dev_n[correct]
incorrectly_classified_images = X_dev_n[~correct]
print("")
print("----RESULTS----")
print("Correct: ", len(correctly_classified_images))
print("Incorrect: ", len(incorrectly_classified_images))

true_labels = y_dev_n[~correct].astype(int)
predicted_labels = y_pred_n[~correct]

# Display first 10 incorrectly classified images
display_images(incorrectly_classified_images, true_labels, predicted_labels,
                num_images=min(25, len(incorrectly_classified_images)))

confusion = confusion_matrix(y_dev_n, y_pred_n)
true_pos = confusion[1][1]
true_neg = confusion[0][0]
false_pos = confusion[0][1]
false_neg = confusion[1][0]

print("True positive: " + str(true_pos))
print("True negative: " + str(true_neg))
print("False positive: " + str(false_pos))
print("False negative: " + str(false_neg))

sensitivity = (true_pos/(true_pos+false_neg)).round(3)
specificity = (true_neg/(true_neg+false_pos)).round(3)

print("Sensitivity: " + str(sensitivity))
print("Specificity: " + str(specificity))
print("Score: " + str((sensitivity+specificity).round(3)))
disp = ConfusionMatrixDisplay(confusion,
                                display_labels=["Anopheles", "Culex"])
disp.plot()

### Analyse results

In [None]:
%load_ext tensorboard
%tensorboard --logdir=./drive/MyDrive/my_logs