# Main CNN model for bat call classification

In [2]:
# Imports
import math
import pickle
import time
from itertools import product
from pathlib import Path
from typing import Callable

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Conv2D, Flatten, MaxPool2D, Dropout
from tensorflow.keras.optimizers import SGD
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.models import load_model
from tensorflow_addons.metrics import F1Score
import cv2
from sklearn.model_selection import train_test_split


2023-12-18 14:29:23.348551: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-12-18 14:29:23.705908: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2023-12-18 14:29:23.705945: I tensorflow/compiler/xla/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
2023-12-18 14:29:24.814716: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory
2023-

In [None]:
# class to track execution time of certain code events
class track_time:
    """Record named checkpoints and report how long each one lasted.

    Every call to ``add`` stores the event name together with the current
    wall-clock time. The duration of an event is the gap until the next
    event was added; the most recent event is measured up to "now".

    Attributes:
        events: list of ``[name, timestamp]`` pairs in insertion order. The
            constructor inserts a first event called ``'Start'``.
        timed_events: dict of ``name -> seconds`` produced by the latest
            ``get_time`` call (only present after ``get_time`` or ``str()``).
    """

    def __init__(self):
        self.events = []
        self.add('Start')

    def add(self, name: str) -> None:
        """Start a new event called ``name`` at the current time.

        Raises:
            RuntimeError: if ``name`` is ``"total"``, which is reserved for
                the overall duration in the report.
        """
        if name == "total":
            raise RuntimeError("Cant use the name 'total'.")
        self.events.append([name, time.time()])

    def get_time(self):
        """Compute the duration of every event plus the overall total.

        Returns:
            dict mapping each event name to its duration in seconds, with an
            extra ``'total'`` entry measured from the first event until now.
            If a name was added more than once, the later interval replaces
            the earlier one because the result is keyed by name.
        """
        # Sample the clock once so the last event and the total share the same end.
        now = time.time()
        self.timed_events = {}
        last_index = len(self.events) - 1
        for n, (name, started) in enumerate(self.events):
            # The final event is still running, so it ends "now".
            ended = now if n == last_index else self.events[n + 1][1]
            self.timed_events[name] = ended - started
        self.timed_events['total'] = now - self.events[0][1]
        return self.timed_events

    def __str__(self):
        # Always recompute: a cached result would miss events added after the
        # previous report and would show outdated durations.
        durations = self.get_time()
        output = ""
        output += ("  Event tracked  |  Duration  \n")
        output += ("==============================\n")
        for name, duration in durations.items():
            output += (" "+name+"\t\t\t| "+str(round(duration,3))+"\n")
        return output

In [None]:
# timer
timer = track_time()
timer.add("Read in data")
# Load the pickled table that holds the flattened images and their labels.
# NOTE(review): unpickling can execute arbitrary code -- only load this file
# if it comes from a trusted source.
data = pd.read_pickle('images_df_numerical.pkl')
X, y = data['data'], data['Species']
# .iloc is positional, so this also works when the index does not start at 0
image_size = X.iloc[0].size
samples = X.size
image_shape = (216,334,3) # height, width , channel
# reshape every row to the image, swap BGR -> RGB and scale to 0-1
X = [
    cv2.cvtColor(row.reshape(image_shape), cv2.COLOR_BGR2RGB).astype('float32')/255.
    for row in X]
y = [row.astype('int32') for row in y]

# The classes come from the labels (the 'Species' column), not from the images.
# NOTE(review): assumes every label is either a one-hot vector (one class per
# position) or a single class id -- confirm against how the pickle was built.
label_matrix = np.asarray(y)
if label_matrix.ndim > 1:
    classes = np.arange(label_matrix.shape[-1])
else:
    classes = np.unique(label_matrix)

In [None]:
timer.add("Split Train/Test")
# TODO: consider cross-validation (the original note here was left unfinished)
# Hold out 20 % of all samples for testing ...
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=1)
# ... then 25 % of the remainder (20 % of all samples) for validation.
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.25, random_state=1)

# Turn every split into a numpy array.
X_train, y_train = np.array(X_train), np.array(y_train)
X_val, y_val = np.array(X_val), np.array(y_val)
X_test, y_test = np.array(X_test), np.array(y_test)

In [None]:
# hyperparameter
# `size` is an attribute of a numpy array, not a method; calling it raises TypeError.
number_of_classes = int(classes.size)
pooling_size = (2, 2)
# Stop when val_accuracy has not improved by at least 0.001 for 20 epochs
# (checked from epoch 15 on) and roll back to the best weights seen.
early_stopping = EarlyStopping(monitor='val_accuracy', patience=20, min_delta=0.001, start_from_epoch=15, restore_best_weights=True)
padding = "same"
epochs = 200
dropout_rate = 1 - 0.8 # adjust if the model overfits

def create_model(conv_kernel_sizes: list, conv_filter_nums: list, number_of_neurons: list, optimizer="adam", activation_function="relu") -> tf.keras.Model:
    """Build and compile the CNN classifier.

    The network consists of two Conv2D -> MaxPool2D -> Dropout stages, a
    Flatten layer, three Dense -> Dropout stages and a softmax output layer.
    It is compiled with categorical cross-entropy and reports accuracy and a
    micro-averaged F1 score.

    Besides its arguments the function reads the module-level settings
    ``number_of_classes``, ``image_shape``, ``padding``, ``pooling_size`` and
    ``dropout_rate``.

    Args:
        conv_kernel_sizes: kernel sizes of the convolution layers; entries 0
            and 1 are used.
        conv_filter_nums: filter counts of the convolution layers; entries 0
            and 1 are used.
        number_of_neurons: unit counts of the dense layers; entries 0, 1 and
            2 are used.
        optimizer: optimizer handed to ``model.compile``.
        activation_function: activation of every layer except the output.

    Returns:
        The compiled model.
    """
    f1 = F1Score(num_classes=number_of_classes, average="micro")

    model=Sequential()

    # Feature extraction: only the first layer needs to know the input shape.
    model.add(Conv2D(conv_filter_nums[0], conv_kernel_sizes[0],activation=activation_function,input_shape=image_shape,padding=padding))
    model.add(MaxPool2D(pooling_size))
    model.add(Dropout(dropout_rate))

    model.add(Conv2D(conv_filter_nums[1],conv_kernel_sizes[1],activation=activation_function, padding=padding))
    model.add(MaxPool2D(pooling_size))
    model.add(Dropout(dropout_rate))

    # Classification head
    model.add(Flatten())
    model.add(Dense(number_of_neurons[0], activation=activation_function))
    model.add(Dropout(dropout_rate))

    model.add(Dense(number_of_neurons[1], activation=activation_function))
    model.add(Dropout(dropout_rate))

    model.add(Dense(number_of_neurons[2], activation=activation_function))
    model.add(Dropout(dropout_rate))

    # Output layer: one softmax unit per class
    model.add(Dense(number_of_classes, activation="softmax"))
    model.compile(optimizer=optimizer, loss="categorical_crossentropy", metrics=["accuracy", f1])

    return model

In [None]:


batch_sizes = [8, 16, 32, 64, 128]
# NOTE(review): the list used to contain 0.001 twice, which trained the same
# configuration twice; 0.01 is presumably what was meant -- confirm.
learning_rates = [0.0001, 0.001, 0.01]
conv_kernel_sizes = [(7,7), (3, 3)] # check whether we should use more layers
conv_filter_nums = [32, 64]
number_of_neurons = [256, 128, 64]
histories_with_params = list()

# Train one model per (batch size, learning rate) combination.
for batch_size, learning_rate in product(batch_sizes, learning_rates):
    # Pass the dense layer sizes (a required argument) and an optimizer that
    # actually uses the learning rate of this run.
    model = create_model(
        conv_kernel_sizes,
        conv_filter_nums,
        number_of_neurons,
        optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
    )

    history = model.fit(
        X_train,
        y_train,
        epochs=epochs,
        batch_size=batch_size,
        # NOTE(review): per the Keras docs `workers` only applies to
        # generator/Sequence input, so it is presumably a no-op for these
        # arrays -- confirm before relying on it.
        workers=8,
        callbacks=[early_stopping],
        validation_data=(X_val, y_val),
        verbose=1)

    parameters = {"bs": batch_size, "lr": learning_rate}

    history_with_param = {"history": history, "parameters": parameters}

    histories_with_params.append(history_with_param)

    # Report every run, not only the last one. evaluate() returns the loss
    # followed by the compiled metrics, so index 1 is the accuracy (a
    # fraction), which is formatted as a real percentage here.
    test_accuracy = model.evaluate(X_test, y_test)[1]
    print(f"bs: {batch_size} lr: {learning_rate}")
    print(f"Epochs: {len(history.history['accuracy'])}")
    print(f"Test Score: {test_accuracy:.2%}")

In [None]:
# Make sure the target folder exists before the first figure is saved.
output_dir = Path("./testing")
output_dir.mkdir(parents=True, exist_ok=True)

for history_with_param in histories_with_params:
    run_history = history_with_param["history"]
    # The training cell stores the settings under "parameters".
    run_params = history_with_param["parameters"]
    batch_size, learning_rate = run_params["bs"], run_params["lr"]

    # Use the epoch count and the model of THIS run; the History callback
    # keeps a reference to the model it was attached to.
    number_of_epochs = len(run_history.history["accuracy"])
    test_accuracy = run_history.model.evaluate(X_test, y_test)[1]

    # One fresh figure per run so curves of earlier runs do not pile up.
    fig, ax = plt.subplots()
    ax.plot(run_history.history["accuracy"], label="train_data accuracy")
    ax.plot(run_history.history["val_accuracy"], label="val_data accuracy")
    ax.scatter(number_of_epochs, test_accuracy, label="test_data accuracy", marker="x", c="g")
    # Values are pulled into locals first: nesting double quotes inside an
    # f-string is a syntax error before Python 3.12.
    ax.set_title(f"bs: {batch_size} lr: {learning_rate}, Test Score: {test_accuracy:.2%}")
    ax.set_xlabel("Epochs")
    ax.set_ylabel("Accuracy")
    ax.legend(loc="upper left")
    fig.savefig(output_dir / f"{batch_size}_{learning_rate}.png", dpi=600)
    plt.show()
    plt.close(fig)