# In [4]:
""" Improving Anomaly Detection in Audio by Learning How it Flows in Time - Yishai Shor """

#########################################
########## Import Libraries #############
#########################################

import os

import numpy as np

# Packages for loading and visualizing audio files
import librosa
import librosa.display

# Packages for deep learning processing
import tensorflow as tf
from keras.models import Sequential, Model
from keras.layers import Dense, Flatten, Conv3D, Conv3DTranspose, MaxPooling3D, Dropout, BatchNormalization, Reshape, Input, InputLayer, GlobalAveragePooling3D
from tensorflow.keras.losses import MeanAbsoluteError


#########################################
########## Defining functions ###########
#########################################
def load_audio(kind):
    """Load every clip of the requested split as a dB-scaled magnitude spectrogram.

    Args:
        kind: ``'train'`` reads the training folder; ``'test'`` reads the
            normal-test folder followed by the anomaly-test folder. Any
            other value selects no folder, so an empty list is returned.

    Returns:
        A list with one ``librosa.amplitude_to_db(abs(librosa.stft(...)))``
        array per file, ordered by folder and then by ``os.listdir`` order.
    """
    if kind == 'train':
        folders = ["/content/google_colab/train1"]
    elif kind == 'test':
        folders = ["/content/google_colab/test_normal1", "/content/google_colab/test_anomaly1"]
    else:
        folders = []

    spectrograms = []
    for folder in folders:
        for file_name in os.listdir(folder):
            # Every clip is resampled to 41 kHz while loading.
            samples, _ = librosa.load(os.path.join(folder, file_name), sr=41000)
            magnitude = abs(librosa.stft(samples))
            spectrograms.append(librosa.amplitude_to_db(magnitude))
    return spectrograms


def create_regular_x(single_x):
    """Build forward-ordered windows of five consecutive chunks.

    ``single_x`` is cut along its first axis into ``n = len(single_x) // 41``
    chunks of ``n`` rows each; rows past ``n * n`` are not used. One window
    is produced per centre chunk ``i`` in ``2 .. n - 3`` and holds chunks
    ``i - 2`` through ``i + 2`` in their original order.

    Returns:
        A list of ``max(n - 4, 0)`` windows, each a list of five chunks.
    """
    size = len(single_x) // 41  # used both as chunk height and chunk count
    chunks = []
    for k in range(size):
        chunks.append(single_x[k * size:(k + 1) * size])

    windows = []
    for first in range(size - 4):
        windows.append(chunks[first:first + 5])
    return windows


def create_twice_fast_x(single_x):
    """Build five-chunk windows that reach four chunks to either side of the centre.

    ``single_x`` is cut along its first axis into ``n = len(single_x) // 41``
    chunks of ``n`` rows each. For every centre chunk ``i`` in ``4 .. n - 5``
    the window holds the chunks at offsets -4, -1, 0, +1 and +4 from ``i``.

    Returns:
        A list of ``max(n - 8, 0)`` windows, each a list of five chunks.
    """
    size = len(single_x) // 41  # used both as chunk height and chunk count
    chunks = [single_x[k * size:(k + 1) * size] for k in range(size)]

    # NOTE(review): these offsets are not an even stride of two
    # (-4, -2, 0, +2, +4), which the function name might suggest -- confirm
    # that the +/-1 neighbours are intended.
    offsets = (-4, -1, 0, 1, 4)
    windows = []
    for centre in range(4, size - 4):
        windows.append([chunks[centre + offset] for offset in offsets])
    return windows


def create_oposite_order_x(single_x):
    """Build windows of five consecutive chunks listed in reverse order.

    ``single_x`` is cut along its first axis into ``n = len(single_x) // 41``
    chunks of ``n`` rows each. For every centre chunk ``i`` in ``2 .. n - 3``
    the window holds chunks ``i + 2`` down to ``i - 2``.

    Returns:
        A list of ``max(n - 4, 0)`` windows, each a list of five chunks.
    """
    size = len(single_x) // 41  # used both as chunk height and chunk count
    chunks = [single_x[k * size:(k + 1) * size] for k in range(size)]

    windows = []
    for centre in range(2, size - 2):
        forward = chunks[centre - 2:centre + 3]
        windows.append(forward[::-1])
    return windows


def create_without_i(single_x):
    """Pair each centre chunk with its four neighbouring chunks.

    ``single_x`` is cut along its first axis into ``n = len(single_x) // 41``
    chunks of ``n`` rows each. For every centre chunk ``i`` in ``2 .. n - 3``
    the context is chunks ``i - 2, i - 1, i + 1, i + 2`` and the target is
    chunk ``i`` itself.

    Returns:
        ``(contexts, targets)``: two lists of equal length, the first holding
        four-chunk lists and the second the matching centre chunks.
    """
    size = len(single_x) // 41  # used both as chunk height and chunk count
    chunks = [single_x[k * size:(k + 1) * size] for k in range(size)]

    contexts = []
    targets = []
    for centre in range(2, size - 2):
        contexts.append([
            chunks[centre - 2],
            chunks[centre - 1],
            chunks[centre + 1],
            chunks[centre + 2],
        ])
        targets.append(chunks[centre])
    return contexts, targets

def create_labels(length, label_kind):
    """Return a list containing ``label_kind`` repeated ``length`` times."""
    return [label_kind for _ in range(length)]


def create_data_for_run(model_num, kind):
    """Assemble the sample and target arrays for one model and one data split.

    Args:
        model_num: Which experiment the data is for. With ``kind='train'``:
            1 labels regular windows 1 and reversed-order windows 0;
            2 labels regular windows 1 and ``create_twice_fast_x`` windows 0.
            3 (either split) pairs four-chunk contexts with the centre chunk
            they leave out. With ``kind='test'`` every value other than 3
            yields regular windows.
        kind: ``'train'`` or ``'test'``; forwarded to ``load_audio``.

    Returns:
        ``(x, y)`` as float32 NumPy arrays. For the classifier test split the
        first half of the windows is labelled 1 and the second half 0, which
        relies on ``load_audio`` returning as many normal clips as anomalous
        ones, normal clips first.

    Raises:
        ValueError: If ``kind`` is not ``'train'``/``'test'``, or if a
            training set is requested for a model other than 1, 2 or 3.
    """
    if kind not in ('train', 'test'):
        raise ValueError(f"kind must be 'train' or 'test', got {kind!r}")
    if kind == 'train' and model_num not in (1, 2, 3):
        raise ValueError(f"model_num must be 1, 2 or 3 for training, got {model_num!r}")

    spectrograms = load_audio(kind)

    # Only the window variants the selected branch needs are built, which
    # keeps peak memory down.
    if model_num == 3:
        # create_without_i returns an (inputs, targets) pair per clip, so the
        # pairs are unpacked clip by clip rather than all at once.
        x_train, y_train = [], []
        for spectrogram in spectrograms:
            contexts, targets = create_without_i(spectrogram)
            x_train.extend(contexts)
            y_train.extend(targets)
    elif kind == 'train':
        make_negative = create_oposite_order_x if model_num == 1 else create_twice_fast_x
        x_train = [window for s in spectrograms for window in create_regular_x(s)]
        y_train = create_labels(len(x_train), 1)
        negatives = [window for s in spectrograms for window in make_negative(s)]
        x_train += negatives
        y_train += create_labels(len(negatives), 0)
    else:
        x_train = [window for s in spectrograms for window in create_regular_x(s)]
        count_regular = len(x_train) // 2
        y_train = create_labels(count_regular, 1) + create_labels(count_regular, 0)

    x_train = np.asarray(x_train).astype('float32')
    y_train = np.asarray(y_train).astype('float32')
    return x_train, y_train


def anomaly_decision(prediction,threshold):
    """Turn per-window scores into one 0/1 decision per block of 21 scores.

    Only complete blocks of 21 consecutive entries are used. A block is
    marked 0 when the sum of its three smallest scores is below
    ``threshold`` and 1 otherwise.

    Returns:
        A list with one decision per complete block.
    """
    block_starts = range(0, len(prediction) - 20, 21)
    lowest_three_sums = (
        sum(sorted(prediction[start:start + 21])[:3]) for start in block_starts
    )
    return [0 if total < threshold else 1 for total in lowest_three_sums]


def anomaly_decision_model_3(mae_prediction, limit=2):
    """Flag values lying ``limit`` or more standard deviations from the mean.

    Args:
        mae_prediction: Sequence or array of error values.
        limit: Cut-off on ``|value - mean| / std``; values strictly below it
            are marked 1, all others 0.

    Returns:
        A (nested) list of 0/1 decisions with the same shape as the input.
        When the input is empty or all values are identical (zero standard
        deviation) no value deviates from the mean, so every entry is 1
        instead of the result of a 0/0 division.
    """
    errors = np.asarray(mae_prediction)
    # Guard the degenerate cases first: np.std of an empty array warns, and
    # a zero spread would make every z-score NaN.
    if errors.size == 0 or np.std(errors) == 0:
        return np.ones(errors.shape, dtype=int).tolist()

    z_scores = np.abs(errors - np.mean(errors)) / np.std(errors)
    return np.where(z_scores < limit, 1, 0).tolist()


def test_observation_min_value(new_list):
    model_prediction = [round(min(new_list[i:i+21])) for i in range(0, len(new_list)-20, 21)]
    return model_prediction


def accuracy(list1, list2):
    """Return one minus the summed absolute difference divided by ``len(list1)``.

    Pairs are formed with ``zip``, so surplus entries of the longer input
    are ignored in the sum. An empty ``list1`` raises ``ZeroDivisionError``.
    """
    total_error = 0
    for left, right in zip(list1, list2):
        total_error += abs(left - right)
    return 1 - total_error / len(list1)

def compute_mae(list1, list2):
    """Compute one mean absolute error per aligned block of 21 entries.

    Both inputs are cut into consecutive, non-overlapping blocks of 21
    entries; trailing partial blocks are dropped, and only as many blocks as
    the shorter input provides are compared.

    Args:
        list1: First sequence or array, passed as the first argument of the
            Keras ``MeanAbsoluteError`` loss.
        list2: Second sequence or array, passed as the second argument.

    Returns:
        A list holding the loss value (``.numpy()`` of the result) for each
        block pair.
    """
    mae = tf.keras.losses.MeanAbsoluteError()
    # Block starts are derived from the shorter input instead of routing
    # blocks with an identity check, so passing the same object for both
    # arguments still produces one result per block.
    usable = min(len(list1), len(list2))
    return [
        mae(list1[start:start + 21], list2[start:start + 21]).numpy()
        for start in range(0, usable - 20, 21)
    ]

def apply_model():
    """Build and compile the 3D-convolutional binary classifier.

    Three Conv3D -> MaxPooling3D -> BatchNormalization -> Dropout stages
    (32, 64 and 128 filters) feed a Flatten layer, two ReLU dense layers
    (36 and 24 units) and a single sigmoid output unit. No input shape is
    set here, so the caller has to build the model before using it.

    Returns:
        The compiled ``Sequential`` model: binary cross-entropy loss, Adam
        optimizer with learning rate 1e-4, accuracy metric.
    """
    model = Sequential()

    conv_stages = ((32, (3, 3, 3)), (64, (2, 2, 2)), (128, (2, 2, 2)))
    for filters, kernel_size in conv_stages:
        model.add(Conv3D(filters, kernel_size=kernel_size, activation='relu', kernel_initializer='he_uniform'))
        # Pool size 1 on the first pooled axis: only the last two are halved.
        model.add(MaxPooling3D(pool_size=(1, 2, 2)))
        model.add(BatchNormalization(center=True, scale=True))
        model.add(Dropout(0.5))

    model.add(Flatten())
    model.add(Dense(36, activation='relu', kernel_initializer='he_uniform'))
    model.add(Dense(24, activation='relu', kernel_initializer='he_uniform'))
    model.add(Dense(1, activation='sigmoid'))

    # The file imports TensorFlow only under the alias ``tf``; the bare name
    # ``tensorflow`` is not defined here.
    model.compile(loss='binary_crossentropy',
                  optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001),
                  metrics=['accuracy'])
    return model

def build_autoencoder(audio_file_shape, code_size):
    """Build the (uncompiled) encoder and decoder ``Sequential`` models.

    Args:
        audio_file_shape: Four-entry per-sample shape. The whole tuple is the
            encoder's ``input_shape``; entries 1, 2 and 3 size the decoder's
            first Dense layer and its Reshape target.
        code_size: Number of units in the encoder's final Dense layer and the
            length of the decoder's input vector.

    Returns:
        ``(encoder, decoder)``.

    Raises:
        ValueError: If ``audio_file_shape`` does not have exactly four entries.
    """
    if len(audio_file_shape) != 4:
        raise ValueError(
            f"audio_file_shape must have four entries, got {tuple(audio_file_shape)!r}"
        )
    _, height, width, channels = audio_file_shape

    # The encoder
    encoder = Sequential()
    encoder.add(Conv3D(32, kernel_size=(1, 3, 3), activation='relu', kernel_initializer='he_uniform', input_shape=audio_file_shape))
    encoder.add(Dropout(0.5))
    encoder.add(Conv3D(32, kernel_size=(1, 2, 2), activation='relu', kernel_initializer='he_uniform'))
    encoder.add(Dropout(0.5))
    encoder.add(GlobalAveragePooling3D())
    encoder.add(Dense(code_size))

    # The decoder
    decoder = Sequential()
    decoder.add(Dense(height * width * channels, input_shape=(code_size,)))
    # NOTE(review): this Reshape yields a three-entry per-sample shape, while
    # Conv3DTranspose is documented to take a four-entry per-sample shape
    # (5D including the batch axis) -- confirm the decoder builds; a leading
    # length-1 axis may be needed. Also confirm the final output shape matches
    # the training targets, since no padding argument is passed to the
    # transposed convolutions.
    decoder.add(Reshape((height, width, channels)))
    decoder.add(Conv3DTranspose(32, kernel_size=(1, 2, 2), activation='relu', kernel_initializer='he_uniform'))
    decoder.add(Dropout(0.5))
    decoder.add(Conv3DTranspose(32, kernel_size=(1, 3, 3), activation='relu', kernel_initializer='he_uniform'))
    decoder.add(Dropout(0.5))
    decoder.add(Conv3DTranspose(1, kernel_size=(1, 3, 3), activation='relu', kernel_initializer='he_uniform'))

    return encoder, decoder

#### Evaluating Models ####

########################
####### Model 1 ########
########################
# Fit data to model 1
# Training set for model 1: regular windows (label 1) followed by
# reversed-order windows (label 0).
x_train, y_train = create_data_for_run(1,'train')
model = apply_model()
# NOTE(review): the fourth entry is 431 here, while the Model 3 section uses
# 801 for the same axis -- confirm which one matches the spectrograms that
# load_audio produces.
model.build(input_shape=(None,5,25,431,1))
model.summary()
# NOTE(review): Keras documents validation_split as taking the LAST fraction
# of the samples before shuffling; because the label-0 windows are appended
# after the label-1 windows, the held-out 10% presumably contains label 0
# only -- confirm, and shuffle beforehand if that is not intended.
history_1 = model.fit(x_train, y_train, batch_size=60, epochs=90, verbose=1, validation_split=0.1)
del x_train, y_train  # release the training arrays before loading the test set

# Results
x_test, y_test = create_data_for_run(1, 'test')
predictions_1_model = model.predict(x_test)
# One decision per block of 21 predictions: 0 when the three lowest scores of
# the block sum to less than 0.1, otherwise 1.
predictions_1_model_cons = anomaly_decision(predictions_1_model, 0.1)
# Reduce the per-window labels to one label per block of 21 (rounded minimum).
y_test = test_observation_min_value(y_test)

########################
####### Model 2 ########
########################

# Fit data to model 2
# Training set for model 2: regular windows (label 1) followed by the windows
# from create_twice_fast_x (label 0).
x_train, y_train = create_data_for_run(2,'train')
model = apply_model()  # rebinds `model`; the model 1 network is no longer referenced by this name
# NOTE(review): the fourth entry is 431 here, while the Model 3 section uses
# 801 for the same axis -- confirm which one matches the spectrograms that
# load_audio produces.
model.build(input_shape=(None,5,25,431,1))
model.summary()
# NOTE(review): Keras documents validation_split as taking the LAST fraction
# of the samples before shuffling; because the label-0 windows are appended
# after the label-1 windows, the held-out 10% presumably contains label 0
# only -- confirm, and shuffle beforehand if that is not intended.
history_2 = model.fit(x_train, y_train, batch_size=60, epochs=90, verbose=1, validation_split=0.1)
del x_train, y_train  # release the training arrays before loading the test set

# Checking results
x_test, y_test = create_data_for_run(2, 'test')
predictions_2_model = model.predict(x_test)
# One decision per block of 21 predictions: 0 when the three lowest scores of
# the block sum to less than 0.1, otherwise 1.
predictions_2_model_decision = anomaly_decision(predictions_2_model, 0.1)
# Reduce the per-window labels to one label per block of 21 (rounded minimum).
y_test = test_observation_min_value(y_test)

########################
####### Model 3 ########
########################
# Create train data
# x_train holds the four-chunk contexts, y_train the centre chunk each context leaves out.
x_train, y_train = create_data_for_run(3, 'train')
# build_autoencoder reads four entries from this shape, so the trailing
# channel axis is part of it; the same tuple defines the Input below.
audio_file_shape = (4, 25, 801, 1)

# Build Autoencoder
encoder, decoder = build_autoencoder(audio_file_shape, 1000)  # 1000 is the code size
inp = Input(audio_file_shape)  # Define input shape
code = encoder(inp)  # Encode the input
reconstruction = decoder(code)  # Reconstruct the target from the code

autoencoder = Model(inp, reconstruction)  # Create the autoencoder model
autoencoder.compile(optimizer='adamax', loss='mae')  # Compile the model with optimizer and loss function

history_3 = autoencoder.fit(x=x_train, y=y_train, epochs=20, validation_split=0.1)  # Train the autoencoder
MAE_model = history_3.history['loss']  # Per-epoch training loss
MAE = MAE_model[-1]  # Training loss of the last epoch
del x_train, y_train  # Delete variables to save memory

# Testing
x_test, y_test = create_data_for_run(3, 'test')  # Create test data using the 'create_data_for_run' function
predictions_3_method = autoencoder.predict(x_test)  # Predict using the autoencoder model

# Compute one MAE per block of 21 consecutive windows.
# The predictions are reshaped to the targets' own shape instead of a
# hard-coded sample count, so the code follows the size of the test set.
predictions_3_method = np.reshape(predictions_3_method, y_test.shape)
# compute_mae slices its inputs directly, so the arrays are passed as they
# are; converting them to Python lists first would only cost memory.
mae_test = compute_mae(predictions_3_method, y_test)
