# Validate the trained model and choose a threshold

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import json
import os
import random
import sys

import tensorflow as tf
from keras.models import load_model
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, UpSampling2D
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn.neighbors import KernelDensity

current_dir = os.getcwd()
utils_dir = os.path.join(current_dir, 'source')
sys.path.append(utils_dir)
from utils import calculate_percentile, density, read_data_from_file


## Define parameters and the generators

In [None]:
# Specify path to result model and plots
results_path = "/u/pa/nb/tourei/scratch/caserm/spectrum_analysis/background_noise/results/"

# Size of the input images
size = 128

# Define generators for training, validation and also anomaly data.
batch_size = 64
datagen = ImageDataGenerator(rescale=1./255)

# path to training PSD plots (seen data)
train_path = '/u/pa/nb/tourei/scratch/caserm/spectrum_analysis/background_noise/plots/train/'
num_train_data = 768
train_generator = datagen.flow_from_directory(
    train_path,
    target_size=(size, size),
    batch_size=batch_size,
    class_mode='input'
    )

# path to testing PSD plots (unseen data)
test_path = '/u/pa/nb/tourei/scratch/caserm/spectrum_analysis/background_noise/plots/test/'
num_test_data = 192
validation_generator = datagen.flow_from_directory(
    test_path,
    target_size=(size, size),
    batch_size=batch_size,
    class_mode='input'
    )

# path to known anomalies or events
events_path = '/u/pa/nb/tourei/scratch/caserm/spectrum_analysis/seismic_events/plots/obvious_seismic_events/'
anomaly_generator = datagen.flow_from_directory(
    events_path,
    target_size=(size, size),
    batch_size=batch_size,
    class_mode='input'
    )


In [None]:
# Load load the saved trained model and its history
model_path = os.path.join(results_path, "model_1_128")
loaded_model = tf.keras.models.load_model(model_path)

with open(os.path.join(results_path, 'history_1_128.json'), 'r') as json_file:
    history_dict = json.load(json_file)


## Sanity check anomolous and normal data

In [None]:
# Get all batches generated by the datagen and pick a batch for prediction
data_batch = []  
batch_num = 0
while batch_num <= validation_generator.batch_index: # Gets each generated batch of size batch_size
    data = next(validation_generator)
    data_batch.append(data[0])
    batch_num =+ 1

predicted = loaded_model.predict(data_batch[0]) # Predict on the first batch of images


In [None]:
# Sanity check normal data and view an image and corresponding reconstruction
image_number = random.randint(0, predicted.shape[0])
plt.figure(figsize=(12, 6))
plt.subplot(121)
plt.imshow(data_batch[0][image_number])
plt.title("original data")
plt.subplot(122)
plt.imshow(predicted[image_number])
plt.title("predicted data")
plt.show()

In [None]:
# Examine the reconstruction error between our validation data (normal images)
# and the anomaly images
validation_error = loaded_model.evaluate(validation_generator)

# Evaluate the model using the anomaly data generator
anomaly_error = loaded_model.evaluate(anomaly_generator)

print("Reconstruction error for the normal data is: ", validation_error)
print("Reconstruction error for the anomaly data is: ", anomaly_error)


In [None]:
# Get all batches generated by the datagen and pick a batch for prediction
data_batch = []  
img_num = 0
while img_num <= anomaly_generator.batch_index: # Gets each generated batch of size batch_size
    data = next(anomaly_generator)
    data_batch.append(data[0])
    img_num = img_num + 1

predicted = loaded_model.predict(data_batch[0]) # Predict on the first batch of images


In [None]:
# Sanity check anomalous data: plot an image and corresponding reconstructions
image_number = random.randint(0, predicted.shape[0])
plt.figure(figsize=(12, 6))
plt.subplot(121)
plt.imshow(data_batch[0][image_number])
plt.title("original data")
plt.subplot(122)
plt.imshow(predicted[image_number])
plt.title("predicted data")
plt.show()

In [None]:
# Extract (or build) the encoder network, with trained weights.
#This is used to get the compressed output (latent space) of the input image. 
#The compressed output is then used to calculate the KDE

encoder_model = Sequential()
# Add the convolutional layer without weights
encoder_model.add(Conv2D(64, (3, 3), activation='relu', padding='same', input_shape=(size, size, 3)))
# Set the weights from the corresponding layer of the loaded model
encoder_model.layers[-1].set_weights(loaded_model.layers[0].get_weights())
encoder_model.add(MaxPooling2D((2, 2), padding='same'))
encoder_model.add(Conv2D(32, (3, 3), activation='relu', padding='same'))
encoder_model.layers[-1].set_weights(loaded_model.layers[2].get_weights())
encoder_model.add(MaxPooling2D((2, 2), padding='same'))
encoder_model.add(Conv2D(16, (3, 3), activation='relu', padding='same'))
encoder_model.layers[-1].set_weights(loaded_model.layers[4].get_weights())

encoder_model.add(MaxPooling2D((2, 2), padding='same'))
encoder_model.summary()


## Calculate kernel density estimation (KDE) 

In [None]:
# Get encoded output of input images = Latent space
encoded_images = encoder_model.predict(train_generator)

# Flatten the encoder output because KDE from sklearn takes 1D vectors as input
encoder_output_shape = encoder_model.output_shape # Here, we have 16x16x16
out_vector_shape = encoder_output_shape[1] * encoder_output_shape[2] * encoder_output_shape[3]

encoded_images_vector = [np.reshape(img, (out_vector_shape)) for img in encoded_images]

# Fit KDE to the image latent data
kde = KernelDensity(kernel='gaussian', bandwidth=0.2).fit(encoded_images_vector)

# Get average and std dev. of density and recon. error for normal and anomaly images. 
# Need to generate a batch of images for each. 
train_batch = next(train_generator)[0]
anomaly_batch = next(anomaly_generator)[0]

normal_values_128 = density(encoder_model, train_batch, kde)
anomolous_values_128 = density(encoder_model, anomaly_batch, kde)

np.savetxt('normal_values_128.txt', normal_values_128, delimiter=',')
np.savetxt('anomolous_values_128.txt', anomolous_values_128, delimiter=',')


In [None]:
all_train_batches = []
batch_num = 0
# Loop through the train_generator to collect all batches
while batch_num <= train_generator.batch_index: # Gets each generated batch of size batch_size
    print("batch_num = ", batch_num)
    data = next(train_generator)
    all_train_batches.append(data[0])
    batch_num =+ 1 

In [None]:
all_anomaly_batches = []
batch_num = 0
# Loop through the anomaly_generator to collect all batches
while batch_num <= anomaly_generator.batch_num: # Gets each generated batch of size batch_size
    print("batch_num = ", batch_num)
    data = next(anomaly_generator)
    all_anomaly_batches.append(data[0])
    batch_num =+ 1 

In [None]:
# Process all_train_batches and all_anomaly_batches as needed
normal_values = density(encoder_model, all_train_batches, kde)
anomolous_values = density(encoder_model, all_anomaly_batches, kde)

## Plot the histogram and choose a thereshold

In [None]:
# Plotting the histogram for density score
plt.hist(normal_values, bins=2, alpha=0.5, label='normal_values')
plt.hist(anomolous_values, bins=10, alpha=0.5, label='anomolous_values')

# Adding labels and legend
plt.xlabel('Density score')
plt.ylabel('Frequency')
plt.legend(loc='upper right')

plt.show()

In [None]:
# Choose a thereshold based on desired percentile
file_path = 'anomaly_values_128.txt' 
percentile = 95
data = read_data_from_file(file_path)
percentile_value = calculate_percentile(data, percentile)

print(f"The {percentile}th percentile is: {percentile_value}")
