In [2]:
# https://towardsdatascience.com/simplifing-image-outlier-detection-with-alibi-detect-6aea686bf7ba

import os
import glob
import numpy as np
import tensorflow as tf
import keras_tuner as kt
from tensorflow.keras.layers import Conv2D, Conv2DTranspose, Dense, Flatten, Reshape, Input
from tensorflow.keras.models import Sequential
from alibi_detect.od import OutlierAE
from sklearn.metrics import accuracy_score
from PIL import Image
from sklearn.model_selection import train_test_split
import gc
import logging

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
# Prepare image data 
def img_to_np(path, resize=True, extract_labels=False):
    """Load every image file matching a glob pattern into one numpy array.

    Each file is opened with PIL, converted to 8-bit grayscale ("L") and,
    when ``resize`` is true, resized to 64x64 pixels.

    Args:
        path: Glob pattern passed to ``glob.glob(..., recursive=True)``.
        resize: If true, resize every image to 64x64. If false, images keep
            their own size, so they only stack into a regular array when
            they all share the same dimensions.
        extract_labels: If true, also return one label per image: 1 when the
            file's base name contains ``'_bad'`` (outlier), otherwise 0.

    Returns:
        ``images`` as a numpy array with one entry per loaded file, or the
        tuple ``(images, labels)`` when ``extract_labels`` is true.
    """
    img_array = []
    labels = []
    # Sort the matches: glob's ordering is unspecified, and downstream code
    # splits the data with a fixed random seed, which is only reproducible
    # when the input order is.
    for fname in sorted(glob.glob(path, recursive=True)):
        # A recursive '**/*' pattern also matches the directories themselves;
        # those cannot be opened as images, so skip anything that is not a file.
        if not os.path.isfile(fname):
            continue
        # The context manager closes the underlying file handle as soon as the
        # pixel data has been converted.
        with Image.open(fname) as raw:
            img = raw.convert("L")  # Grayscale
        if resize:
            img = img.resize((64, 64))
        img_array.append(np.asarray(img))
        if extract_labels:
            # 1 for outlier, 0 for non-outlier
            labels.append(1 if '_bad' in os.path.basename(fname) else 0)
    images = np.array(img_array)
    if extract_labels:
        return images, np.array(labels)
    return images

# Paths to data.
# The dataset root can be overridden with the ELPV_DATA_DIR environment
# variable; the default is the directory this notebook was originally run from.
data_dir = os.environ.get('ELPV_DATA_DIR', r'C:\Users\Ossi\Desktop\ImageML\elpv-dataset')
path_train = os.path.join(data_dir, 'train_without_bad_images', '**', '*')
path_test = os.path.join(data_dir, 'test_images', '**', '*')

# Load data
train = img_to_np(path_train)
test, test_labels = img_to_np(path_test, extract_labels=True)

# Fail early with a readable message instead of an obscure shape error later
# when a pattern matched nothing (e.g. wrong dataset root).
if train.size == 0 or test.size == 0:
    raise FileNotFoundError(
        f"No images found under {data_dir!r}; set ELPV_DATA_DIR to the dataset root."
    )

# Scale 8-bit pixel values to [0, 1]
train = train.astype('float32') / 255.0
test = test.astype('float32') / 255.0

# Reshape to include the channel dimension
train = np.expand_dims(train, axis=-1)
test = np.expand_dims(test, axis=-1)

# Split the test data into two parts: one for validation (used to pick
# hyperparameters) and one for the final evaluation.
# Stratify on the outlier label so both halves keep the same good/bad ratio;
# stratification needs at least two samples per class, so fall back to a plain
# random split otherwise.
_, label_counts = np.unique(test_labels, return_counts=True)
stratify_labels = test_labels if label_counts.min() >= 2 else None
val_data, final_test_data, val_labels, final_test_labels = train_test_split(
    test, test_labels, test_size=0.5, random_state=42, stratify=stratify_labels
)

In [4]:
# Function to convert a dataset to a numpy array
def dataset_to_numpy(dataset):
    """Join every element yielded by ``dataset`` into one array along axis 0."""
    batches = list(dataset)
    return np.concatenate(batches, axis=0)

# Function to build and train the model
def build_and_train_model(encoding_dim, dense_dims, learning_rate, threshold, train_data, val_data, val_labels, epochs=10):
    """Build a convolutional autoencoder outlier detector, train it and score it.

    Args:
        encoding_dim: Size of the latent vector produced by the encoder.
        dense_dims: ``(height, width, channels)`` shape the decoder's dense
            layer is reshaped to. The decoder upsamples by 2 three times, so
            height and width must both be 8 to reconstruct 64x64 inputs.
        learning_rate: Learning rate for the Adam optimizer.
        threshold: Threshold handed to ``OutlierAE``.
        train_data: Training images, shaped ``(n, 64, 64, 1)`` to match the
            encoder's input layer.
        val_data: Validation images with the same per-image shape.
        val_labels: Ground-truth labels for ``val_data`` (1 = outlier, 0 = normal).
        epochs: Number of training epochs (defaults to 10).

    Returns:
        Tuple ``(od, val_accuracy)``: the fitted ``OutlierAE`` detector and the
        accuracy of its ``is_outlier`` predictions on the validation data.

    Raises:
        ValueError: If ``dense_dims`` cannot be upsampled to a 64x64 image.
    """
    dense_dims = tuple(int(d) for d in dense_dims)
    # Three stride-2 transposed convolutions scale height and width by 8, so
    # anything other than an 8x8 starting grid cannot match the 64x64 input.
    if len(dense_dims) != 3 or dense_dims[0] * 8 != 64 or dense_dims[1] * 8 != 64:
        raise ValueError(
            f"dense_dims must be (8, 8, channels) to decode to 64x64, got {dense_dims}"
        )

    # Define the encoder: 64x64 -> 32x32 -> 16x16 -> 8x8, then a dense latent layer
    encoder_net = Sequential([
        Input(shape=(64, 64, 1)),  # Input shape for grayscale images
        Conv2D(64, 4, strides=2, padding='same', activation='relu'),
        Conv2D(128, 4, strides=2, padding='same', activation='relu'),
        Conv2D(512, 4, strides=2, padding='same', activation='relu'),
        Flatten(),
        Dense(encoding_dim)
    ])

    # Define the decoder: dense -> dense_dims grid -> three 2x upsampling steps
    decoder_net = Sequential([
        Input(shape=(encoding_dim,)),
        # np.prod returns a NumPy integer; pass a plain int as the unit count
        Dense(int(np.prod(dense_dims))),
        Reshape(target_shape=dense_dims),
        Conv2DTranspose(128, 4, strides=2, padding='same', activation='relu'),
        Conv2DTranspose(64, 4, strides=2, padding='same', activation='relu'),
        # Sigmoid keeps reconstructed pixel values in [0, 1]
        Conv2DTranspose(1, 4, strides=2, padding='same', activation='sigmoid')
    ])

    # Create the OutlierAE model
    od = OutlierAE(
        threshold=threshold,
        encoder_net=encoder_net,
        decoder_net=decoder_net
    )

    adam = tf.keras.optimizers.Adam(learning_rate=learning_rate)

    # Train the model
    od.fit(
        train_data,
        epochs=epochs,
        verbose=True,
        optimizer=adam
    )

    # Evaluate on the validation set. The array is passed to predict()
    # directly: batching it through tf.data only to concatenate the batches
    # again yields the same array.
    preds = od.predict(np.asarray(val_data), outlier_type='instance', return_instance_score=True)
    predicted_labels = preds['data']['is_outlier']

    # Calculate accuracy on the validation set
    val_accuracy = accuracy_score(val_labels, predicted_labels)
    return od, val_accuracy

In [5]:
# Hyperparameters to search over
# Size of the latent space: it determines how much information is compressed
# into the latent representation.
# low value -> more compression, losing details | high value -> could lead to overfitting
encoding_dims = [512, 768, 1024]
# Shape the decoder's dense layer is reshaped to before upsampling.
# high values -> better capture of details | low values -> more aggressive compression
dense_dims = [(8, 8, 64), (8, 8, 128), (8, 8, 256)]
# Controls the step size during the optimization process.
# high value -> faster convergence but risk of overshooting | low value -> more stable but slower training
learning_rates = [1e-3, 1e-4, 1e-5]
# Cutoff value used to decide whether a data point is considered an outlier.
# low value -> more false positives
thresholds = [0.001, 0.01, 0.05]

# Custom hyperparameter tuning loop
best_val_accuracy = 0
best_params = None
best_model = None

# Set up logging
logging.basicConfig(filename='model_training.log', level=logging.INFO, 
                    format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger()

for encoding_dim in encoding_dims:
    for dense_dim in dense_dims:
        for learning_rate in learning_rates:
            # Clear session and force garbage collection before every new
            # model so state from discarded models does not pile up.
            tf.keras.backend.clear_session()
            gc.collect()

            # The threshold is a setting on the detector, not on the
            # autoencoder being fitted, so train once per architecture /
            # learning rate and sweep the thresholds on that single trained
            # detector instead of retraining for each of them.
            logger.info(f"Training with encoding_dim={encoding_dim}, dense_dim={dense_dim}, learning_rate={learning_rate}")
            model, _ = build_and_train_model(encoding_dim, dense_dim, learning_rate, thresholds[0], train, val_data, val_labels)

            for threshold in thresholds:
                logger.info(f"Testing with encoding_dim={encoding_dim}, dense_dim={dense_dim}, learning_rate={learning_rate}, threshold={threshold}")
                model.threshold = threshold
                preds = model.predict(val_data, outlier_type='instance', return_instance_score=True)
                val_accuracy = accuracy_score(val_labels, preds['data']['is_outlier'])
                # accuracy_score returns a fraction in [0, 1]; scale it to match the '%' suffix
                logger.info(f"Validation accuracy: {val_accuracy * 100:.2f}%")

                # Check if the current model is the best. The first result is
                # always accepted so a best model exists even if every
                # accuracy is 0.
                if best_model is None or val_accuracy > best_val_accuracy:
                    best_val_accuracy = val_accuracy
                    best_params = (encoding_dim, dense_dim, learning_rate, threshold)
                    best_model = model

if best_model is None:
    raise RuntimeError("Hyperparameter search evaluated no configuration; check that the search grids are non-empty.")

# The winning detector object was reused for later thresholds in the sweep,
# so restore the threshold that actually produced the best validation score.
best_model.threshold = best_params[3]

# Use the best model to predict on the final test set
final_preds = best_model.predict(final_test_data, outlier_type='instance', return_instance_score=True)
final_predicted_labels = final_preds['data']['is_outlier']

# Calculate accuracy on the final test set
final_accuracy = accuracy_score(final_test_labels, final_predicted_labels)
logger.info(f'Best Validation Accuracy: {best_val_accuracy * 100:.2f}%')
logger.info(f'Best Hyperparameters: encoding_dim={best_params[0]}, dense_dim={best_params[1]}, learning_rate={best_params[2]}, threshold={best_params[3]}')
logger.info(f'Final Test Accuracy: {final_accuracy * 100:.2f}%')

KeyboardInterrupt: 