In [None]:
import os
# TensorFlow-related settings, written into the process environment ahead of
# the `import tensorflow` statement further down in this cell.
os.environ.update(
    {
        "TF_GPU_THREAD_MODE": "gpu_private",
        "TF_AUTOTUNE_THRESHOLD": "3",
        "TF_GPU_ALLOCATOR": "cuda_malloc_async",
        "TF_ENABLE_XLA": "true",
        "TF_USE_CUDNN_BATCHNORM_SPATIAL_PERSISTENT": "1",
        "TF_FORCE_GPU_ALLOW_GROWTH": "true",
    }
)

import pandas as pd
import io
import matplotlib.pyplot as plt
import librosa
import random
import itertools

# import librosa.display
from sklearn.metrics import confusion_matrix
from sklearn.preprocessing import normalize
from tensorflow.keras.regularizers import l2
import warnings

warnings.filterwarnings("ignore")
import numpy as np
import pickle
import joblib
from sklearn.model_selection import train_test_split
from tensorflow.keras import models, layers
import tensorflow.keras as keras
import tensorflow as tf
from datetime import datetime

import asyncio
import concurrent.futures
import functools

# settings
# Configure the first GPU TensorFlow reports: enable on-demand memory growth
# and create a single logical device with memory_limit=3500.
# The guard keeps the notebook runnable on machines where no GPU is visible;
# indexing `[0]` on an empty list would raise IndexError.
physical_devices = tf.config.experimental.list_physical_devices('GPU')
if physical_devices:
    tf.config.experimental.set_memory_growth(physical_devices[0], True)
    tf.config.set_logical_device_configuration(
        physical_devices[0],
        [tf.config.LogicalDeviceConfiguration(memory_limit=3500)],
    )
else:
    print("No GPU visible to TensorFlow; continuing without GPU configuration.")




<h1>Audio ML with multiple features</h1>

<h2>Loading the Data</h2>

In [None]:
# Root directory of the dataset, models and logs. Every path in this notebook
# is built by string concatenation onto this value, so it must end with a
# path separator; os.path.join(..., "") guarantees that.
# Set the BIRD_DATA_DIR environment variable to run on another machine; the
# default is the original hard-coded location.
basePath = os.path.join(
    os.environ.get("BIRD_DATA_DIR", "/home/birdo/MachineLearning/Data/"), ""
)

In [None]:
# Read bird_sounds.csv from the data directory. Its `species_id` column is
# used later to count the classes and to derive the class weights.
df = pd.read_csv(basePath + "bird_sounds.csv")

In [None]:
# Display the loaded table as the cell output.
df

In [None]:
# Distinct species ids, in order of first appearance in the table.
df.species_id.unique()

## Train/val/test split

In [None]:
# Collect every image file that has a label file of the same name, reading the
# label text alongside it so that valid_files[i] pairs with labels[i].
allImPath = basePath + "Images-512/all/"
allLabelPath = basePath + "Labels/all/"
valid_files = []
labels = []
# NOTE(review): os.listdir returns entries in arbitrary order, so the fixed
# random_state below reproduces the same split only while the directory
# listing order stays the same - confirm whether a sorted listing is wanted.
for img_file in os.listdir(allImPath):
    if not os.path.isfile(allImPath + img_file):
        # Skip sub-directories and other non-file entries.
        continue
    if not os.path.isfile(allLabelPath + img_file):
        # Report images without a label file instead of dropping them silently.
        print("No label could be found of: " + img_file)
        continue
    # `with` closes the handle even when reading raises.
    with open(allLabelPath + img_file, "r") as f:
        label = f.read()
    valid_files.append(img_file)
    labels.append(label)

# First hold out 20% of all samples as the test set, stratified by label.
X_train, X_test, y_train, y_test = train_test_split(
    valid_files, labels, test_size=0.2, random_state=42, stratify=labels
)

# Then take 25% of the remaining 80% as validation (0.25 x 0.8 = 0.2),
# giving a 60/20/20 train/val/test split.
X_train, X_val, y_train, y_val = train_test_split(
    X_train, y_train, test_size=0.25, random_state=42, stratify=y_train
)
print(len(y_train), len(y_test), len(y_val))

## Prepare data generators

In [None]:
# Dimensions of one stored feature array. Every sample file is reshaped to
# (MFCC_SIZE, NUM_DCT_OUTPUTS, CHANNELS), which is also the model input shape.
# NOTE(review): presumably MFCC_SIZE is the number of time frames and
# NUM_DCT_OUTPUTS the number of MFCC coefficients per frame - confirm against
# the feature-extraction code.
NUM_DCT_OUTPUTS = 32
MFCC_SIZE = 623
CHANNELS = 1

In [None]:
class Sound_Generator(keras.utils.Sequence):
    """Keras ``Sequence`` that serves batches of feature arrays read from disk.

    Each sample is a raw binary file of float64 values located under
    ``basePath + "Images-512/" + directory``; it is reshaped to
    ``(MFCC_SIZE, NUM_DCT_OUTPUTS, CHANNELS)``. The files of one batch are
    read concurrently on a thread pool.

    Args:
        image_filenames: Sequence of file names relative to the image directory.
        labels: Sequence of labels aligned with ``image_filenames``; each value
            must be convertible to ``float32``.
        batch_size: Maximum number of samples per batch.
        directory: Sub-directory (including trailing slash) appended to the
            image and label base paths.
        shuffle: When True, samples are reshuffled every time Keras calls
            ``on_epoch_end``. Defaults to False so the sample order stays
            fixed; ``__get_all_labels__`` is then guaranteed to line up with
            the output of ``model.predict(generator)``.
    """

    def __init__(self, image_filenames, labels, batch_size, directory, shuffle=False):
        self.image_filenames = image_filenames
        self.labels = labels
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.imBasePath = basePath + str("Images-512/") + directory
        self.labelBasePath = basePath + str("Labels/") + directory
        # Worker threads used to read the files of a batch in parallel.
        self.executor = concurrent.futures.ThreadPoolExecutor(8)
        # Seeds the module-level `random` generator (a global side effect) so
        # that shuffles are repeatable between runs.
        random.seed(42)

    def __get_all_labels__(self):
        """Return all labels, in current sample order, as a float32 array."""
        y = np.asarray(self.labels, dtype=np.float32)
        return y

    def __on_epoch_end(self):
        """Shuffle file names and labels together, keeping them aligned."""
        c = list(zip(self.image_filenames, self.labels))
        if not c:
            # zip(*[]) cannot be unpacked into two names; nothing to shuffle.
            return
        random.shuffle(c)
        self.image_filenames, self.labels = zip(*c)

    def on_epoch_end(self):
        """Epoch-end hook invoked by Keras; reshuffles only when enabled.

        The double-underscore method above is name-mangled and therefore never
        called by Keras, so this public hook forwards to it.
        """
        if self.shuffle:
            self.__on_epoch_end()

    def __len__(self):
        """Return the number of batches per epoch as a plain Python int."""
        # `np.int` was removed in NumPy 1.24; the built-in int is equivalent.
        return int(np.ceil(len(self.image_filenames) / float(self.batch_size)))

    def __getitem__(self, idx):
        """Return batch ``idx`` as ``(X, y)`` float32 arrays.

        The last batch may hold fewer than ``batch_size`` samples.
        """
        batch_x = self.image_filenames[
            idx * self.batch_size : (idx + 1) * self.batch_size
        ]
        batch_y = self.labels[idx * self.batch_size : (idx + 1) * self.batch_size]

        X = self.get_images_data(batch_x, self.imBasePath)

        X = np.asarray(X, dtype=np.float32)
        y0 = np.asarray(batch_y, dtype=np.float32)

        return X, y0

    def get_image_data(self, file_path):
        """Read one raw float64 file and reshape it to the model input shape."""
        _data = np.fromfile(file_path, dtype=np.float64)
        return _data.reshape((MFCC_SIZE, NUM_DCT_OUTPUTS, CHANNELS))

    def get_images_data(self, _batch_x, _imBasePath):
        """Load all files of a batch in parallel, preserving input order."""
        tasks = [
            self.executor.submit(self.get_image_data, _imBasePath + _image_f)
            for _image_f in _batch_x
        ]
        # Results are collected in submission order, so they match _batch_x.
        return [task.result() for task in tasks]

## Load an image

In [None]:
# Preview one test sample. The index is clamped to the last element so the
# cell also works when the test split holds 15000 samples or fewer.
sample_index = min(15000, len(X_test) - 1)
X_test_test = X_test[sample_index]
print(X_test_test)
# Sample files hold raw float64 values; reshape them to the model input shape.
image = np.fromfile((allImPath + X_test_test), dtype=np.float64).reshape(
    (MFCC_SIZE,NUM_DCT_OUTPUTS, CHANNELS)
)
print(image.shape)
plt.figure(figsize=(8, 8))
plt.imshow(image)

In [None]:
batch_size = 512

# One generator per split; all of them read from the "all/" sub-directory.
train_batch_generator = Sound_Generator(
    image_filenames=X_train,
    labels=y_train,
    batch_size=batch_size,
    directory="all/",
)
val_batch_generator = Sound_Generator(
    image_filenames=X_val,
    labels=y_val,
    batch_size=batch_size,
    directory="all/",
)
test_batch_generator = Sound_Generator(
    image_filenames=X_test,
    labels=y_test,
    batch_size=batch_size,
    directory="all/",
)

# Small subsets for quick smoke runs: the first 400 training samples and the
# first 100 validation samples (the "mini test" generator is built from the
# validation split, not the test split).
mini_train_batch_generator = Sound_Generator(
    image_filenames=X_train[:400],
    labels=y_train[:400],
    batch_size=batch_size,
    directory="all/",
)
mini_test_batch_generator = Sound_Generator(
    image_filenames=X_val[:100],
    labels=y_val[:100],
    batch_size=batch_size,
    directory="all/",
)

## Build the model

In [None]:
# Model input matches the per-sample array shape produced by the generators;
# the output layer has one unit per distinct species_id in the CSV.
input_shape = (MFCC_SIZE, NUM_DCT_OUTPUTS, CHANNELS)
n_classes = df.species_id.unique().shape[0]

# Five convolutional stages (the first four followed by 2x2 max pooling, all
# followed by dropout), then a small dense head ending in a softmax.
CNNmodel = models.Sequential(
    [
        # Stage 1: 16 filters, tanh.
        layers.Conv2D(
            16,
            kernel_size=(3, 3),
            input_shape=input_shape,
            activation='tanh',
            padding='same',
            name='conv2d_tanh',
            kernel_regularizer=l2(0.0001),
        ),
        layers.MaxPooling2D(pool_size=(2, 2), name='maxpool2d_1'),
        layers.Dropout(rate=0.1, name='dropout_1'),
        # Stage 2: 32 filters, relu.
        layers.Conv2D(
            32,
            kernel_size=(3, 3),
            activation='relu',
            padding='same',
            name='conv2d_relu_1',
            kernel_regularizer=l2(0.0001),
        ),
        layers.MaxPooling2D(pool_size=(2, 2), name='maxpool2d_2'),
        layers.Dropout(rate=0.1, name='dropout_2'),
        # Stage 3: 32 filters, relu; pooling pads odd dimensions.
        layers.Conv2D(
            32,
            kernel_size=(3, 3),
            activation='relu',
            padding='same',
            name='conv2d_relu_2',
            kernel_regularizer=l2(0.0001),
        ),
        layers.MaxPooling2D(pool_size=(2, 2), padding='same', name='max_pool_2d_3'),
        layers.Dropout(rate=0.1, name='dropout_3'),
        # Stage 4: 64 filters, relu; pooling pads odd dimensions.
        layers.Conv2D(
            64,
            kernel_size=(3, 3),
            activation='relu',
            padding='same',
            name='conv2d_relu_3',
            kernel_regularizer=l2(0.0001),
        ),
        layers.MaxPooling2D(pool_size=(2, 2), padding='same', name='max_pool_2d_4'),
        layers.Dropout(rate=0.1, name='dropout_4'),
        # Stage 5: 64 filters, relu, no pooling.
        layers.Conv2D(
            64,
            kernel_size=(3, 3),
            activation='relu',
            padding='same',
            name='conv2d_relu_4',
            kernel_regularizer=l2(0.0001),
        ),
        layers.Dropout(rate=0.1, name='dropout_5'),
        layers.Flatten(name='flatten'),
        # Dense head.
        layers.Dense(
            64,
            activation='relu',
            activity_regularizer=l2(0.0001),
            kernel_regularizer=l2(0.0001),
            name='dense',
        ),
        layers.Dropout(rate=0.1, name='dropout_6'),
        layers.Dense(32, activation="relu", name='dense_2'),
        layers.Dropout(rate=0.1, name='dropout_7'),
        layers.Dense(n_classes, activation='softmax', name='softmax'),
    ]
)
CNNmodel.summary()

In [None]:
# Either resume from the saved model (with a lower learning rate) or compile
# the freshly built model for training from scratch.
load_model = True
model_path = basePath + str("/Models/MFCCOnly")
optimizer = tf.keras.optimizers.Adamax(learning_rate=0.001)

# Only resume when a saved model is actually present; otherwise a first run
# on a fresh setup would fail inside tf.keras.models.load_model.
resume_from_saved = load_model and os.path.exists(model_path)
if resume_from_saved:
    CNNmodel = tf.keras.models.load_model(model_path)
    # set learning rate
    CNNmodel.optimizer.learning_rate = 0.0001
else:
    if load_model:
        print("No saved model found at " + model_path + "; compiling a new model instead.")
    CNNmodel.compile(
        optimizer=optimizer,
        # The final layer applies softmax, so the loss receives probabilities.
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
        metrics=["accuracy"],
        jit_compile=True,
    )

## Define TensorFlow callbacks

In [None]:
# Each run logs into its own timestamped directory below logs/MFCCOnly/.
run_stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
log_dir = basePath + "logs/MFCCOnly/" + run_stamp

# TensorBoard logging with per-epoch histograms and a profile_batch setting
# of '600, 620'.
tensorboard_callback = tf.keras.callbacks.TensorBoard(
    log_dir=log_dir,
    histogram_freq=1,
    profile_batch='600, 620',
)

# Separate summary writer for the confusion-matrix images.
file_writer_cm = tf.summary.create_file_writer(log_dir + "/cm")

# Keep only the full model with the best validation accuracy seen so far.
model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=basePath + str("/Models/MFCCOnly"),
    monitor="val_accuracy",
    save_best_only=True,
    save_weights_only=False,
)

### Define Confusion Matrix callback

In [None]:
def plot_confusion_matrix(cm, class_names):
    """
    Returns a matplotlib figure containing the plotted confusion matrix.

    The matrix is normalized per row (true label) and rounded to two decimals
    before plotting. Rows that contain no samples are shown as all zeros
    rather than NaN.

    Args:
       cm (array, shape = [n, n]): a confusion matrix of integer classes
       class_names (array, shape = [n]): String names of the integer classes

    Returns:
       The matplotlib figure the matrix was drawn on. It is left open and is
       the current pyplot figure when the function returns.
    """

    figure = plt.figure(figsize=(10, 10))

    # Normalize the confusion matrix row by row. A class with no true samples
    # has a row sum of 0; dividing only where the sum is non-zero avoids
    # 0/0 = NaN, which would also turn the colour threshold below into NaN.
    cm = np.asarray(cm, dtype="float")
    row_totals = cm.sum(axis=1)[:, np.newaxis]
    cm = np.around(
        np.divide(cm, row_totals, out=np.zeros_like(cm), where=row_totals != 0),
        decimals=2,
    )

    plt.imshow(cm, interpolation="nearest", cmap=plt.cm.Blues)
    plt.title("Confusion matrix")
    plt.colorbar()
    tick_marks = np.arange(len(class_names))
    plt.xticks(tick_marks, class_names, rotation=45)
    plt.yticks(tick_marks, class_names)

    # Use white text if squares are dark; otherwise black.
    threshold = cm.max() / 2.0

    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        color = "white" if cm[i, j] > threshold else "black"
        plt.text(j, i, cm[i, j], horizontalalignment="center", color=color)

    plt.ylabel("True label")
    plt.xlabel("Predicted label")
    # Lay out after the axis labels exist so they are included in the layout.
    plt.tight_layout()
    return figure

In [None]:
def plot_to_image(figure):
    """
    Converts the matplotlib plot specified by 'figure' to a PNG image and
    returns it. The supplied figure is closed and inaccessible after this call.

    Args:
       figure: the matplotlib figure to render.

    Returns:
       A uint8 image tensor of shape [1, height, width, 4] (batch dimension
       added, 4 channels as requested from the PNG decoder).
    """

    buf = io.BytesIO()

    # Save the figure that was passed in to a PNG in memory. plt.savefig would
    # save whichever figure is current, which need not be this one.
    figure.savefig(buf, format="png")

    # Closing the figure prevents it from being displayed directly inside
    # the notebook.
    plt.close(figure)
    buf.seek(0)

    # Use tf.image.decode_png to convert the PNG buffer
    # to a TF image. Make sure you use 4 channels.
    image = tf.image.decode_png(buf.getvalue(), channels=4)

    # Use tf.expand_dims to add the batch dimension
    image = tf.expand_dims(image, 0)

    return image

In [None]:
def log_confusion_matrix(epoch, logs):
    """Write the validation-set confusion matrix to TensorBoard.

    Intended as an ``on_epoch_end`` callback: ``epoch`` is used as the summary
    step and ``logs`` is accepted but not used.
    """
    # Render the confusion matrix of the validation generator to an image.
    cm_image = plot_to_image(generate_cm(val_batch_generator))

    # Store it as an image summary under the confusion-matrix writer.
    with file_writer_cm.as_default():
        tf.summary.image("Confusion Matrix", cm_image, step=epoch)

In [None]:
def generate_cm(generator, class_names=None):
    """Predict on ``generator`` and plot the resulting confusion matrix.

    Args:
        generator: A ``Sound_Generator``; its sample order must not change
            between the prediction pass and the label lookup.
        class_names: Display names indexed by integer class. Defaults to the
            eleven names used in this notebook.

    Returns:
        The matplotlib figure produced by ``plot_confusion_matrix``.
    """
    if class_names is None:
        # NOTE(review): assumes class index i corresponds to class_names[i]
        # and that the model has exactly this many classes - confirm.
        class_names = ["Wilde Eend", "Tjiftjaf", "Koolmees", "Houtduif", "Huismus", "Merel", "Winterkoning", "Fitis", "Vink", "Spreeuw", "Geen Vogel"]

    y_pred_raw = CNNmodel.predict(generator)
    y_pred = np.argmax(y_pred_raw, axis=1)
    # Labels are stored as float32; cast so both arrays hold integer classes.
    y_true = generator.__get_all_labels__().astype(int)

    # Pin the label set so the matrix always has one row/column per class
    # name, even when a class is absent from this split. Without `labels`,
    # a missing class shrinks the matrix and shifts it against the tick names.
    cm = confusion_matrix(y_true, y_pred, labels=np.arange(len(class_names)))
    figure = plot_confusion_matrix(cm, class_names=class_names)
    return figure

In [None]:
# Run log_confusion_matrix(epoch, logs) at the end of every training epoch.
cm_callback = keras.callbacks.LambdaCallback(on_epoch_end=log_confusion_matrix)

## Calculate class weights

In [None]:
# Balanced class weights: weight = (1 / class_count) * (n_rows / n_classes),
# so rarer species receive proportionally larger weights.
species_in_order = df.species_id.unique()
weights = [
    (1 / len(df[df["species_id"] == specie])) * (len(df) / len(species_in_order))
    for specie in species_in_order
]

# NOTE(review): the key is the position of the species in
# df.species_id.unique() (order of first appearance), not the species_id
# value itself - verify that this matches the integer labels in the label
# files.
class_weights = dict(enumerate(weights))
print(class_weights)

## Train the model

In [None]:
# Train on the full training generator and validate on the validation
# generator. Swap in the mini generators here for a quick smoke run.
train_gen = train_batch_generator
val_gen = val_batch_generator

# Callbacks: TensorBoard logging, checkpointing of the model with the best
# val_accuracy, and a confusion-matrix image at the end of each epoch.
# class_weight applies the per-class weights computed above to the loss.
CNNmodel.fit(
    train_gen,
    epochs=256,
    verbose=1,
    validation_data=val_gen,
    callbacks=[tensorboard_callback, model_checkpoint_callback, cm_callback],
    class_weight=class_weights,
)

## Test the model

In [None]:
# Evaluate on the held-out test generator.
# NOTE(review): unpacking into (loss, accuracy) assumes the model was compiled
# with exactly one metric, as in the compile call of this notebook - confirm
# for a model loaded from disk.
loss, accuracy = CNNmodel.evaluate(test_batch_generator, verbose=1)
print("Test: accuracy = %f  ;  loss = %f" % (accuracy, loss))

# generate_cm returns the matplotlib figure of the test-set confusion matrix;
# plt.show() displays it.
conf_matrix = generate_cm(test_batch_generator)
plt.show()

In [None]:
# Convert the trained Keras model to a TensorFlow Lite flatbuffer.
converter = tf.lite.TFLiteConverter.from_keras_model(CNNmodel)
# Allow built-in TFLite ops and, where needed, selected TensorFlow ops.
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS, tf.lite.OpsSet.SELECT_TF_OPS]
converter.inference_input_type = tf.float32
converter.inference_output_type = tf.float32
# `optimizations` is documented as a list, so pass a list rather than a set.
converter.optimizations = [tf.lite.Optimize.DEFAULT]
# Set to support only float32 input and output.
converter.target_spec.supported_types = [tf.float32]

tflite_model = converter.convert()
# Print the size of the converted model (bytes / 1024).
print("Size of model: ", len(tflite_model) / 1024, " kb")

# Write the flatbuffer next to the dataset.
with open(basePath+'model.tflite', 'wb') as f:
    f.write(tflite_model)