In [None]:
import numpy as np
import keras
import tensorflow as tf
import torch
from sklearn.metrics import f1_score, accuracy_score, confusion_matrix

# Device used for PyTorch inference: the first CUDA GPU when CUDA is available, otherwise the CPU.
DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [None]:
from enum import Enum, auto

class ModelType(Enum):
    """Serialization format / runtime of a model file."""

    # Values are spelled out; they match what auto() would assign (1, 2, ...).
    TORCH = 1
    TFLITE = 2

class InputType(Enum):
    """Kind of input representation a model consumes."""

    # Values are spelled out; they match what auto() would assign (1, 2, ...).
    MEL_SPEC = 1
    TIME_SERIES = 2

class ModelMeta:
    """Plain record describing one model that the notebook evaluates.

    Attributes:
        name: label of the model, stored as given.
        path: location of the serialized model file.
        type: the ``model_type`` constructor argument.
        input_type: the ``input_type`` constructor argument.
        quantized: flag stored as given; defaults to False.
    """

    def __init__(self, name, path, model_type, input_type, quantized=False):
        self.name, self.path = name, path
        # Note the rename: the argument is ``model_type``, the attribute is ``type``.
        self.type, self.input_type = model_type, input_type
        self.quantized = quantized

# 1 - Target, 0 - Non-target
def _safe_ratio(numerator, denominator):
    """Return numerator / denominator as a float, or 0.0 when the denominator is zero."""
    return float(numerator) / float(denominator) if denominator else 0.0


def calculate_binary_classification_performance(predicted_labels, true_labels):
    """Compute confusion-matrix counts and derived metrics for binary labels.

    Label convention: 1 is the target (positive) class, 0 is non-target.

    Args:
        predicted_labels: array-like of predicted labels (0/1 or booleans).
        true_labels: array-like of ground-truth labels (0/1), same length.

    Returns:
        dict with keys "TP", "TN", "FP", "FN", "Accuracy", "Recall",
        "Precision", "TPR", "FPR", "F1" and "F2". Any ratio whose denominator
        is zero is reported as 0.0 instead of NaN.
    """
    y_true, y_pred = true_labels, predicted_labels

    # Pin the label set: without it sklearn returns a 1x1 matrix when only one
    # class occurs in both arrays, and the 4-way unpacking below would fail.
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred, labels=[0, 1]).ravel()

    accuracy = accuracy_score(y_true, y_pred)

    # zero_division=0 keeps the previous value (0.0) for the undefined case
    # but without emitting a warning for every threshold of a sweep.
    f1 = f1_score(y_true, y_pred, zero_division=0)

    recall = _safe_ratio(tp, tp + fn)
    precision = _safe_ratio(tp, tp + fp)
    # F-beta with beta = 2 (recall weighted higher than precision).
    # Guarded because precision == recall == 0 would otherwise give 0 / 0.
    beta_sq = 2 ** 2
    f2 = _safe_ratio((1 + beta_sq) * precision * recall, beta_sq * precision + recall)

    return {
        "TP": tp,
        "TN": tn,
        "FP": fp,
        "FN": fn,
        "Accuracy": accuracy,
        "Recall": recall,
        "Precision": precision,
        "TPR": recall,
        "FPR": _safe_ratio(fp, fp + tn),
        "F1": f1,
        "F2": f2
    }

def inference_torch_model(model, data_loader, quantized=False):
    """Run ``model`` over ``data_loader`` and collect softmax scores and labels.

    Pipeline: data -> model -> softmax.

    Both the class axis of the scores and the labels are flipped before
    returning, so that in the returned arrays 1 means Target and 0 means
    Non-target (presumably the dataset encodes Target as 0 -- confirm against
    the dataset class).

    Args:
        model: torch module; put into eval mode and moved to ``DEVICE``.
        data_loader: iterable yielding ``(inputs, targets)`` tensor batches.
        quantized: accepted for call-site compatibility; not used here.

    Returns:
        Tuple ``(scores, labels)`` of numpy arrays concatenated over all
        batches; ``scores[:, 1]`` is the softmax score of the Target class.
    """
    model.eval()
    model.to(DEVICE)

    score_batches, label_batches = [], []
    with torch.no_grad():
        for batch_inputs, batch_targets in data_loader:
            logits = model(batch_inputs.to(DEVICE))
            score_batches.append(torch.softmax(logits, dim=1).cpu().numpy())
            label_batches.append(batch_targets.cpu().numpy())

    scores = np.concatenate(score_batches)
    labels = np.concatenate(label_batches)

    # Swap columns 0 and 1 (the right-hand side is a copy, so this is safe)
    # and invert the labels to match.
    scores[:, [0, 1]] = scores[:, [1, 0]]
    return scores, 1 - labels

def evaluate_torch_model(model, data_loader, quantized=False, threshold_vals=None):
    """Evaluate a PyTorch model at one or more decision thresholds.

    Args:
        model: torch module to evaluate.
        data_loader: iterable of ``(inputs, targets)`` batches.
        quantized: forwarded to ``inference_torch_model``.
        threshold_vals: iterable of thresholds on the Target score. ``None``
            (the default) evaluates the single threshold 0.5.

    Returns:
        List with one metrics dict per threshold (see
        ``calculate_binary_classification_performance``); each dict also
        carries the threshold under the key ``'t'``.
    """
    if threshold_vals is None:
        threshold_vals = [0.5]

    prediction_score, true_labels = inference_torch_model(model, data_loader, quantized)
    # Column 1 holds the Target-class score; it does not depend on the threshold.
    target_scores = prediction_score[:, 1]

    metrics_on_t = []
    for t in threshold_vals:
        # ">=" matches evaluate_tflite_model, so both model families are
        # scored under the same decision rule.
        preds = target_scores >= t
        metrics = calculate_binary_classification_performance(preds, true_labels)
        metrics['t'] = t
        metrics_on_t.append(metrics)
    return metrics_on_t


In [None]:
# Models under evaluation. The names of the TORCH entries are matched literally
# by the evaluation loops to pick the architecture class to instantiate.
metas = [
    ModelMeta(
        name="StreamingCNNTiny",
        path="./AudioClassifierCNNNoReluTiny_16K_3s_raw_signal/model.51.0.9442029595375061.pth",
        model_type=ModelType.TORCH,
        input_type=InputType.TIME_SERIES,
    ),
    ModelMeta(
        name="StreamingTransformer",
        path="./model_transformer.pth",
        model_type=ModelType.TORCH,
        input_type=InputType.TIME_SERIES,
    ),
    ModelMeta(
        name="SqueezNet-Time-Series",
        path="./tf_implementation/time_series_models_to_test_with_RIOT_ML/squeezenet/squeezenet30%_time_series_16kHz_full_int_q.tflite",
        model_type=ModelType.TFLITE,
        input_type=InputType.TIME_SERIES,
        quantized=True,
    ),
    ModelMeta(
        name="CNN-Mel-Spec",
        path="./tf_implementation/spectrogram_models_to_test_with_RIOT_ML/cnn/cnn_mel_spec_16kHz_full_int_q.tflite",
        model_type=ModelType.TFLITE,
        input_type=InputType.MEL_SPEC,
        quantized=True,
    ),
    ModelMeta(
        name="SqueezNet-Mel-Spec",
        path="./tf_implementation/spectrogram_models_to_test_with_RIOT_ML/squeezenet/squeezenet_spec_16kHz_full_int_q.tflite",
        model_type=ModelType.TFLITE,
        input_type=InputType.MEL_SPEC,
        quantized=True,
    ),
]

In [None]:
#pytorch init of data and model class
import librosa
def resample_audio(y, orig_sr, target_sr):
    """Resample signal ``y`` from ``orig_sr`` to ``target_sr`` via librosa.

    The resampling type is fixed to ``'zero_order_hold'``.
    """
    return librosa.resample(
        y,
        orig_sr=orig_sr,
        target_sr=target_sr,
        res_type='zero_order_hold',
    )

from CNNModels import AudioClassifierCNNNoRelu, AudioClassifierCNNNoReluSmall, AudioClassifierCNNNoReluTiny, AudioClassifierCNNNoReluPico
from TransformerModel import RawAudioTransformerModel

from torch.utils.data import DataLoader
from AudioDataset import AudioDataset, RawAudioDataset

# Dataset locations: one folder per class for each split.
train_target_folder = "../BirdSound/roman_data/cut-data/training/target/"
train_non_target_folder = "../BirdSound/roman_data/cut-data/training/non_target/"

validation_target_folder = "../BirdSound/roman_data/cut-data/validation/target/"
validation_non_target_folder = "../BirdSound/roman_data/cut-data/validation/non_target/"

test_target_folder = "../BirdSound/roman_data/cut-data/testing/target/"
test_non_target_folder = "../BirdSound/roman_data/cut-data/testing/non_target/"

SAMPLE_RATE = 48000      # source rate handed to the resampler
RESAMPLE_RATE = 16000    # rate the waveforms are resampled to
FIXED_LEN_IN_SEC = 3
FIXED_LEN_WAVE = RESAMPLE_RATE * FIXED_LEN_IN_SEC  # clip length in samples after resampling
NUM_WORKERS = 10
BATCH_SIZE = 256
# Tensor -> numpy -> resample -> tensor; applied by the dataset to each waveform.
RESAMPLE_TRANSFORM = lambda x : torch.from_numpy(resample_audio(x.numpy(), SAMPLE_RATE, RESAMPLE_RATE))


train_dataset = RawAudioDataset(train_target_folder, train_non_target_folder, fixed_length_wave=FIXED_LEN_WAVE, transform=RESAMPLE_TRANSFORM)
train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=NUM_WORKERS)

validation_dataset = RawAudioDataset(validation_target_folder, validation_non_target_folder, fixed_length_wave=FIXED_LEN_WAVE, transform=RESAMPLE_TRANSFORM)
validation_dataloader = DataLoader(validation_dataset, batch_size=BATCH_SIZE, num_workers=NUM_WORKERS, shuffle=True)

test_dataset = RawAudioDataset(test_target_folder, test_non_target_folder, fixed_length_wave=FIXED_LEN_WAVE, transform=RESAMPLE_TRANSFORM)
# Must wrap test_dataset: the loader used to be built from validation_dataset,
# so "test" metrics of the PyTorch models were computed on validation data.
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, num_workers=NUM_WORKERS, shuffle=False)

In [None]:
import tensorflow as tf
import os
import numpy as np
import tensorflow_io as tfio

from tf_implementation.helper_functions import (
    create_spectrogram_features,
    lite_model_from_file_predicts_dataset,
)

# Root folder of the testing split of the audio dataset.
# The (commented-out) code below walks this folder and converts every audio file
# into model inputs -- log-mel spectrograms and fixed-length raw audio -- used to test the models.
directory = '../BirdSound/roman_data/cut-data/testing/'

#create mel-spec

# x_data = []
# y_data = []
# for root, dirs, files in os.walk(directory):
#     for file in files:
#         full_file_name = os.path.join(root, file)
#         if "non_target" in str(full_file_name):
#             class_encoded = 0
#         elif "target" in str(full_file_name):
#             class_encoded = 1

#         audio, sr = tf.audio.decode_wav(tf.io.read_file(full_file_name))
#         audio = tf.squeeze(audio, axis=-1)
#         resampled_audio = tfio.audio.resample(audio, rate_in=SAMPLE_RATE, rate_out=RESAMPLE_RATE)
#         # Prepare log mel spectrogram from audio
#         spectrogram_feature = create_spectrogram_features(resampled_audio, desired_length=FIXED_LEN_WAVE, sample_rate = RESAMPLE_RATE)
#         x_data.append(spectrogram_feature)
#         y_data.append(class_encoded)

# # input data should be in numpy array, not in list
# x_data_mel = np.array(x_data)
# y_data_labels = np.array(y_data)

# np.save("x_data_mel_test_tf.npy", x_data_mel)
# np.save("y_data_labels_test_tf.npy", y_data_labels)


#create raw audio
# x_data = []
# y_data = []
# desired_length_of_audio = FIXED_LEN_WAVE
# for root, dirs, files in os.walk(directory):
#     for file in files:
#         full_file_name = os.path.join(root, file)

#         if "non_target" in str(full_file_name):
#             class_encoded = 0
#         elif "target" in str(full_file_name):
#             class_encoded = 1

#         audio, sr = tf.audio.decode_wav(tf.io.read_file(full_file_name))
#         audio = tf.squeeze(audio, axis=-1)
#         resampled_audio = tfio.audio.resample(audio, rate_in=SAMPLE_RATE, rate_out=RESAMPLE_RATE)
#         audio_length = tf.shape(resampled_audio)[0]
#         if audio_length < desired_length_of_audio:
#             resampled_audio = tf.pad(resampled_audio, [[0, desired_length_of_audio - audio_length]], mode='CONSTANT')
#         else:
#             resampled_audio = resampled_audio[:desired_length_of_audio]
#         resampled_audio = tf.expand_dims(resampled_audio, axis=-1).numpy()

#         x_data.append(resampled_audio)
#         y_data.append(class_encoded)

# # input data should be in numpy array, not in list
# x_data_raw = np.array(x_data)
# y_data_raw = np.array(y_data)

In [None]:
# Pre-computed inputs and labels for the TFLite models, one set per split.
x_data_mel_train = np.load("x_data_mel_train_tf.npy")
x_data_ts_train = np.load("x_data_ts_train_tf.npy")
y_data_labels_train = np.load("y_data_labels_train_tf.npy")
x_data_mel_test = np.load("x_data_mel_test_tf.npy")
x_data_ts_test = np.load("x_data_ts_test_tf.npy")
y_data_labels_test = np.load("y_data_labels_test_tf.npy")

# The mel-spectrogram and time-series inputs of a split share one label array,
# so all three must have the same length. Check now rather than after a long
# inference run has already finished.
assert len(x_data_mel_train) == len(y_data_labels_train), "train mel features / labels length mismatch"
assert len(x_data_ts_train) == len(y_data_labels_train), "train time-series features / labels length mismatch"
assert len(x_data_mel_test) == len(y_data_labels_test), "test mel features / labels length mismatch"
assert len(x_data_ts_test) == len(y_data_labels_test), "test time-series features / labels length mismatch"

def run_full_int_q_tflite_model(tflite_file, test_image_indices, x_data):
    """Run a TFLite model sample by sample and return the class-1 softmax score.

    Args:
        tflite_file: path of the ``.tflite`` model file.
        test_image_indices: sized iterable of indices into ``x_data`` to score.
        x_data: indexable collection of (float) input samples, one per index.

    Returns:
        1-D float numpy array with one score per requested index: element 1
        of the softmax over the model output.

    Notes:
        * Inputs are quantized with the model's input scale / zero point for
          any integer input type (``uint8`` and ``int8`` alike), not only
          ``uint8``.
        * Integer outputs are dequantized with the output scale / zero point
          before the softmax; the raw integer codes are not logits.
        * NOTE(review): the softmax assumes the model emits logits. If the
          exported graph already ends in a softmax layer, the dequantized
          output should be used directly -- confirm against the model export.
    """
    # Initialize the interpreter
    interpreter = tf.lite.Interpreter(model_path=str(tflite_file))
    interpreter.allocate_tensors()

    input_details = interpreter.get_input_details()[0]
    output_details = interpreter.get_output_details()[0]

    input_dtype = input_details["dtype"]
    input_scale, input_zero_point = input_details["quantization"]
    output_scale, output_zero_point = output_details["quantization"]

    # A scale of 0 means the tensor carries no quantization parameters.
    quantize_input = np.issubdtype(input_dtype, np.integer) and input_scale != 0
    dequantize_output = np.issubdtype(output_details["dtype"], np.integer) and output_scale != 0

    predictions = np.zeros((len(test_image_indices),), dtype=float)
    for i, test_image_index in enumerate(test_image_indices):
        test_data_point = x_data[test_image_index]

        # Map real-valued input onto the model's integer input grid.
        if quantize_input:
            test_data_point = test_data_point / input_scale + input_zero_point

        test_data_point = np.expand_dims(test_data_point, axis=0).astype(input_dtype)
        interpreter.set_tensor(input_details["index"], test_data_point)
        interpreter.invoke()
        output = interpreter.get_tensor(output_details["index"])[0].astype(np.float32)

        # Map integer output codes back to real values before the softmax.
        if dequantize_output:
            output = (output - output_zero_point) * output_scale

        predictions[i] = float(tf.nn.softmax(output)[1])

    return predictions

def full_int_model_predict(tflite_file, x_data):
    """Score every sample of ``x_data`` with the TFLite model at ``tflite_file``.

    Thin wrapper that passes all indices ``0 .. len(x_data) - 1`` to
    ``run_full_int_q_tflite_model`` and returns its result unchanged.
    """
    every_index = range(len(x_data))
    return run_full_int_q_tflite_model(tflite_file, every_index, x_data)

# data -> model -> softmax
# returns the softmax value at index 1 (1 - Target, 0 - Non-target)
def inference_tflite_model(model_path, data, quantized=False):
    """Return the Target-class score of a TFLite model for every sample in ``data``.

    Args:
        model_path: path of the ``.tflite`` model file.
        data: indexable collection of input samples.
        quantized: must be True; only the integer-quantized inference path
            is implemented.

    Returns:
        Whatever ``full_int_model_predict`` returns for ``(model_path, data)``.

    Raises:
        NotImplementedError: if ``quantized`` is falsy. This used to surface
            as an UnboundLocalError on the return statement.
    """
    if not quantized:
        raise NotImplementedError(
            "inference_tflite_model only supports integer-quantized models "
            "(quantized=True)"
        )
    return full_int_model_predict(model_path, data)


def evaluate_tflite_model(model_path, x_data, y_data, quantized=False, threshold_vals=None):
    """Evaluate a TFLite model at one or more decision thresholds.

    Args:
        model_path: path of the ``.tflite`` model file.
        x_data: input samples passed to ``inference_tflite_model``.
        y_data: ground-truth labels (1 - Target, 0 - Non-target), aligned with ``x_data``.
        quantized: forwarded to ``inference_tflite_model``.
        threshold_vals: iterable of thresholds on the Target score. ``None``
            (the default) evaluates the single threshold 0.5.

    Returns:
        List with one metrics dict per threshold (see
        ``calculate_binary_classification_performance``); each dict also
        carries the threshold under the key ``'t'``.
    """
    if threshold_vals is None:
        threshold_vals = [0.5]

    # Inference runs once; only the thresholding is repeated.
    prediction_score = inference_tflite_model(model_path, x_data, quantized)
    true_labels = y_data

    metrics_on_t = []
    for t in threshold_vals:
        preds = prediction_score >= t
        metrics = calculate_binary_classification_performance(preds, true_labels)
        metrics['t'] = t
        metrics_on_t.append(metrics)
    return metrics_on_t

In [None]:
#Metrics on training dataset


import pickle

# Decision thresholds 0.00, 0.01, ..., 1.00.
t = [x * 0.01 for x in range(0, 101)]

metrics_diff_models = []

for meta in metas:
    if meta.type == ModelType.TORCH:
        # The architecture is selected by name. Fail loudly on an unknown name
        # instead of silently reusing the model left over from a previous iteration.
        if meta.name == "StreamingCNNTiny":
            model = AudioClassifierCNNNoReluTiny()
        elif meta.name == "StreamingTransformer":
            model = RawAudioTransformerModel(num_classes=2, n_embd=16, n_head=1, block_size=16, hidden_size=32, n_layers=1)
        else:
            raise ValueError(f"No PyTorch architecture registered for model {meta.name!r}")
        # map_location lets a checkpoint saved on a GPU load on a CPU-only machine.
        model.load_state_dict(torch.load(meta.path, map_location=DEVICE))
        metrics = evaluate_torch_model(model, train_dataloader, False, t)

    elif meta.type == ModelType.TFLITE:
        if meta.input_type == InputType.MEL_SPEC:
            metrics = evaluate_tflite_model(meta.path, x_data_mel_train, y_data_labels_train, meta.quantized, t)
        elif meta.input_type == InputType.TIME_SERIES:
            metrics = evaluate_tflite_model(meta.path, x_data_ts_train, y_data_labels_train, meta.quantized, t)
        else:
            raise ValueError(f"Unsupported input type {meta.input_type!r} for model {meta.name!r}")

    else:
        raise ValueError(f"Unsupported model type {meta.type!r} for model {meta.name!r}")

    metrics_diff_models.append({'name': meta.name, 'path':  meta.path, 'metrics': metrics})

with open('metrics_diff_models_train.pkl', 'wb') as f:
    pickle.dump(metrics_diff_models, f)

In [None]:
#Metrics on test dataset


import pickle

# Decision thresholds 0.00, 0.01, ..., 1.00.
t = [x * 0.01 for x in range(0, 101)]

metrics_diff_models = []

for meta in metas:
    if meta.type == ModelType.TORCH:
        # The architecture is selected by name. Fail loudly on an unknown name
        # instead of silently reusing the model left over from a previous iteration.
        if meta.name == "StreamingCNNTiny":
            model = AudioClassifierCNNNoReluTiny()
        elif meta.name == "StreamingTransformer":
            model = RawAudioTransformerModel(num_classes=2, n_embd=16, n_head=1, block_size=16, hidden_size=32, n_layers=1)
        else:
            raise ValueError(f"No PyTorch architecture registered for model {meta.name!r}")
        # map_location lets a checkpoint saved on a GPU load on a CPU-only machine.
        model.load_state_dict(torch.load(meta.path, map_location=DEVICE))
        metrics = evaluate_torch_model(model, test_dataloader, False, t)

    elif meta.type == ModelType.TFLITE:
        if meta.input_type == InputType.MEL_SPEC:
            metrics = evaluate_tflite_model(meta.path, x_data_mel_test, y_data_labels_test, meta.quantized, t)
        elif meta.input_type == InputType.TIME_SERIES:
            metrics = evaluate_tflite_model(meta.path, x_data_ts_test, y_data_labels_test, meta.quantized, t)
        else:
            raise ValueError(f"Unsupported input type {meta.input_type!r} for model {meta.name!r}")

    else:
        raise ValueError(f"Unsupported model type {meta.type!r} for model {meta.name!r}")

    metrics_diff_models.append({'name': meta.name, 'path':  meta.path, 'metrics': metrics})

with open('metrics_diff_models_test.pkl', 'wb') as f:
    pickle.dump(metrics_diff_models, f)

In [None]:
# Transformer instance built with the same hyperparameters as in the evaluation loops.
# No checkpoint is loaded here, so the weights are whatever the constructor initializes.
audio_transformer = RawAudioTransformerModel(num_classes=2, n_embd=16, n_head=1, block_size=16, hidden_size=32, n_layers=1)

In [None]:
from torchinfo import summary
# 48000 samples equals FIXED_LEN_WAVE (16 kHz * 3 s), the clip length fed to the datasets.
# Presumably the shape is (batch, channels, samples) -- TODO confirm against the model's forward().
summary(audio_transformer, input_size=(1, 1, 48000))