In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow import keras
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score
from skimage.metrics import structural_similarity as ssim
from skimage.metrics import peak_signal_noise_ratio as psnr
from skimage import exposure
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
import os
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import cv2
import math

In [None]:
# CONSTANT VARIABLES

ROOT_FOLDER = # INCLUDE PATH TO ROOT FOLDER
DATASET_FOLDER = # INCLUDE NAME OF DATASET ROOT FOLDER
DEFECT_TYPE_FOLDER = # INCLUDE NAME OF DEFECT TYPE FOLDER
IMG_SIZE = (256, 256) # not too big or might run out of memory
COLOR_IMAGES = False
INPUT_SHAPE = (256, 256, 3 if COLOR_IMAGES else 1)
SEG_OUTPUT_SIZE = (256, 256) # same as input shape for this network

LATENT_DIM = 100 # z

In [None]:
# IMAGE TRANSFORMATIONS

# Crop the center of an image
def crop_center_opencv(image, crop_width, crop_height):
    height, width = image.shape[:2]
    start_x = width // 2 - (crop_width // 2)
    start_y = height // 2 - (crop_height // 2)
    cropped_image = image[start_y:start_y + crop_height, start_x:start_x + crop_width]
    return cropped_image

# Load reference image for contrast
def load_ref_img_contrast(img_size=IMG_SIZE):
    for filename in os.listdir(os.path.join(ROOT_FOLDER, DATASET_FOLDER, DEFECT_TYPE_FOLDER, "test_normal")):
        img_path = os.path.join(os.path.join(ROOT_FOLDER, DATASET_FOLDER, DEFECT_TYPE_FOLDER, "test_normal"), filename)
        img = cv2.imread(img_path, cv2.COLOR_BGR2RGB if COLOR_IMAGES else cv2.IMREAD_GRAYSCALE)  # Convert to RGB/grey scale
        img = cv2.resize(img, img_size)  # Resize to desired dimensions
        img = img.astype(np.float32) / 255.0  # Normalize to [0, 1]
        return img
        

# Load images from a folder
def load_images_from_folder(folder, img_size=IMG_SIZE):
    images = []
    filenames = []
    for filename in os.listdir(folder):
        img_path = os.path.join(folder, filename)

        img = cv2.imread(img_path, cv2.COLOR_BGR2RGB if COLOR_IMAGES else cv2.IMREAD_GRAYSCALE)  # Convert to RGB/grey scale
        img = cv2.resize(img, img_size)  # Resize to desired dimensions
        img = img.astype(np.float32) / 255.0  # Normalize to [0, 1]  
             
        img = exposure.equalize_adapthist(img, clip_limit=0.01)
        #img = exposure.equalize_hist(img)
 
        img = img if COLOR_IMAGES else np.expand_dims(img, axis=-1)  # Add channel dimension since it's grey scale

        images.append(img)
        filenames.append(filename)
        
    return np.array(images), filenames

# Load ground truth masks from a folder
def load_ground_truth_masks(folder, filenames, img_size=SEG_OUTPUT_SIZE): # masks are downsampled in the segmentation network so they need to be the same to compare
    masks = []
    for filename in filenames:
        mask_path = os.path.join(folder, filename)
        if os.path.exists(mask_path):

            mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)  # Convert to grayscale
            mask = cv2.resize(mask, img_size)  # Resize to match test images
            mask = mask.astype(np.float32) / 255.0  # Normalize to [0, 1]
            mask = (mask > 0.5).astype(np.float32) # Binarize mask
            mask = np.expand_dims(mask, axis=-1)  # Add channel dimension since it's grey scale
            
        else:
            # If no ground truth mask exists, assume it's a good image (all zeros)
            mask = np.zeros((img_size[0], img_size[1], 1))
        masks.append(mask)

    return np.array(masks)

In [None]:
# RANDOM SEEDS

# Keep model seed fixed
tf.random.set_seed(42)
np.random.seed(42)

# Change data split seed instead
# to evaluate how the different data split affects performance
# random_state=None in scikit-learn split

In [None]:
# LOAD DATA

# Paths to the folders
abnormal_dir = os.path.join(ROOT_FOLDER, DATASET_FOLDER, DEFECT_TYPE_FOLDER, "test_abnormal") # Defective images
mask_dir = os.path.join(ROOT_FOLDER, DATASET_FOLDER, DEFECT_TYPE_FOLDER, "ground_truth") # Ground truth masks
normal_dir = os.path.join(ROOT_FOLDER, DATASET_FOLDER, DEFECT_TYPE_FOLDER, "test_normal") # Normal (good) images
train_dir = os.path.join(ROOT_FOLDER, DATASET_FOLDER, DEFECT_TYPE_FOLDER, "train") # Train (good) images

# Load training images
train_images, _ = load_images_from_folder(train_dir)
print(f"# train_images: {len(train_images)}")
# Load defective test images and their ground truth masks
defective_images, defective_filenames = load_images_from_folder(abnormal_dir)
ground_truth_masks = load_ground_truth_masks(mask_dir, defective_filenames)
print(f"# ground_truth_masks: {len(ground_truth_masks)}")
# Load normal test images
normal_images, _ = load_images_from_folder(normal_dir)
# Combine defective and normal images for testing
test_images = np.concatenate([defective_images, normal_images], axis=0)
print(f"# test_images: {len(test_images)}")
test_labels = np.concatenate([np.ones(len(defective_images)), np.zeros(len(normal_images))], axis=0)
print(f"# test_labels: {len(test_labels)}")

In [None]:
# SHOW RANDOM DATA SAMPLES

# Randomly select 2 defective and 2 normal samples
num_samples = 2
defective_indices = np.random.choice(len(defective_images), num_samples, replace=False)
normal_indices = np.random.choice(len(normal_images), num_samples, replace=False)

# Visualize random samples
plt.figure(figsize=(8, 4))

# Display 2 random defective images and their ground truth masks
for i, idx in enumerate(defective_indices):
    plt.subplot(2, 4, i + 1)
    plt.imshow(defective_images[idx].squeeze(), cmap='gray')
    plt.title(f"Defective Image {idx}")
    plt.axis('off')

    plt.subplot(2, 4, i + 5)
    plt.imshow(ground_truth_masks[idx].squeeze(), cmap='gray')
    plt.title(f"Ground Truth Mask {idx}")
    plt.axis('off')

# Display 2 random normal images
for i, idx in enumerate(normal_indices):
    plt.subplot(2, 4, i + 3)
    plt.imshow(normal_images[idx].squeeze(), cmap='gray')
    plt.title(f"Normal Image {idx}")
    plt.axis('off')

    plt.subplot(2, 4, i + 7)
    plt.imshow(np.zeros((128, 128)), cmap='gray')  # No mask for normal images
    plt.title("No Mask (Normal)")
    plt.axis('off')

plt.tight_layout()
plt.show()

In [None]:
# SPLIT DATA INTO TRAINING - VALIDATION

val_size = 0.2  # 20% of the data for testing
train_images, val_images, = train_test_split(
    train_images,
    test_size=val_size, 
    random_state=None # Random split
)

In [None]:
# ARCHITECTURE (ORIGINAL PAPER + RESIDUAL BLOCKS +  A BIT DEEPER + DROPOUT + SMALL BOTTLENECK)

def residual_block(x, filters, kernel=3):
    # If dimensions don't match, adjust the shortcut
    if x.shape[-1] != filters:
        x = layers.Conv2D(filters, (1, 1), padding="same")(x)
    
    # Residual path
    y = layers.Conv2D(filters, kernel, padding="same")(x)
    y = layers.LeakyReLU(alpha=0.2)(y)
    y = layers.Conv2D(filters, kernel, padding="same")(y)

    # Add shortcut
    y = layers.add([x, y])
    y = layers.LeakyReLU(alpha=0.2)(y)    
    
    return y

def build_autoencoder(input_shape=INPUT_SHAPE, latent_dim=100):
    # Encoder
    encoder_input = layers.Input(shape=input_shape, name="encoder_input")
    
    # Conv1: input_shape / 2
    x = layers.Conv2D(32, (4, 4), strides=2, padding="same")(encoder_input)
    x = layers.LeakyReLU(alpha=0.2)(x)
    x = residual_block(x, 32, 3)  # Add a residual block
    
    # Conv2: input_shape / 4
    x = layers.Conv2D(32, (4, 4), strides=2, padding="same")(x)
    x = layers.LeakyReLU(alpha=0.2)(x)
    x = residual_block(x, 32, 3)  # Add a residual block
    
    # Conv3: input_shape / 4
    x = layers.Conv2D(64, (3, 3), strides=1, padding="same")(x)
    x = layers.LeakyReLU(alpha=0.2)(x)
    
    # Conv4: input_shape / 8
    x = layers.Conv2D(128, (4, 4), strides=2, padding="same")(x)
    x = layers.LeakyReLU(alpha=0.2)(x)
    x = residual_block(x, 128, 3)  # Add a residual block
    
    # Conv5: input_shape / 8
    x = layers.Conv2D(128, (3, 3), strides=1, padding="same")(x)
    x = layers.LeakyReLU(alpha=0.2)(x)
    
    # Conv6: input_shape / 16
    x = layers.Conv2D(256, (4, 4), strides=2, padding="same")(x)
    x = layers.LeakyReLU(alpha=0.2)(x)
    x = residual_block(x, 256, 3)  # Add a residual block
    
    # Conv7: input_shape / 16
    x = layers.Conv2D(128, (3, 3), strides=1, padding="same")(x)
    x = layers.LeakyReLU(alpha=0.2)(x)
    
    # Conv8: input_shape / 16
    x = layers.Conv2D(64, (3, 3), strides=1, padding="same")(x)
    x = layers.LeakyReLU(alpha=0.2)(x)

    # Dropout
    x = layers.Dropout(0.3)(x)
    
    # Final bottleneck layer 16 x 16 -> 3 x 3
    encoder_output = layers.Conv2D(latent_dim, (12, 12), strides=2, padding="valid",  activation="linear")(x)
    
    encoder = models.Model(
        inputs=encoder_input, 
        outputs=encoder_output,
        name="encoder"
    )
    
    # Get encoder output shape for decoder input
    encoder_output_shape = encoder.output.shape[1:]
    
    # Decoder
    decoder_input = layers.Input(shape=encoder_output_shape, name="decoder_input")
    
    # Reverse of Conv8 3 x 3 -> 16 x 16
    x = layers.Conv2DTranspose(64, (12, 12), strides=2, padding="valid")(decoder_input)
    x = layers.LeakyReLU(alpha=0.2)(x)
    
    # Reverse of Conv7
    x = layers.Conv2DTranspose(128, (3, 3), strides=1, padding="same")(x)
    x = layers.LeakyReLU(alpha=0.2)(x)
    
    # Reverse of Conv6    
    x = layers.Conv2DTranspose(256, (4, 4), strides=1, padding="same")(x)
    x = layers.LeakyReLU(alpha=0.2)(x)    
    x = residual_block(x, 256, 3)  # Add a residual block
    
    # Reverse of Conv5
    x = layers.Conv2DTranspose(128, (3, 3), strides=2, padding="same")(x)
    x = layers.LeakyReLU(alpha=0.2)(x)    
    
    # Reverse of Conv4    
    x = layers.Conv2DTranspose(128, (4, 4), strides=1, padding="same")(x)
    x = layers.LeakyReLU(alpha=0.2)(x)  
    x = residual_block(x, 128, 3)  # Add a residual block  
    
    # Reverse of Conv3
    x = layers.Conv2DTranspose(64, (3, 3), strides=2, padding="same")(x)
    x = layers.LeakyReLU(alpha=0.2)(x)
    
    # Reverse of Conv2    
    x = layers.Conv2DTranspose(32, (4, 4), strides=1, padding="same")(x)
    x = layers.LeakyReLU(alpha=0.2)(x)   
    x = residual_block(x, 32, 3)  # Add a residual block 
    
    # Reverse of Conv1    
    x = layers.Conv2DTranspose(32, (4, 4), strides=2, padding="same")(x)
    x = layers.LeakyReLU(alpha=0.2)(x)
    x = residual_block(x, 32, 3)  # Add a residual block
    
    # Final output layer
    decoder_output = layers.Conv2DTranspose(input_shape[-1], (4, 4), strides=2, padding="same", activation="sigmoid")(x)
    
    decoder = models.Model(
        inputs=decoder_input,
        outputs=decoder_output,
        name="decoder"
    )
    
    return encoder, decoder

In [None]:
# DCAE MODEL

class DCAE(models.Model):
    def __init__(self, encoder, decoder, **kwargs):
        super(DCAE, self).__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder

    def call(self, inputs):
        latent = self.encoder(inputs)
        reconstructed = self.decoder(latent)
        return reconstructed

    def compile(self, optimizer, **kwargs):
        super().compile(**kwargs)
        self.optimizer = optimizer

    def train_step(self, data):
        # Unpack the data since for unsupervised learning there are no labels
        x = data if isinstance(data, tf.Tensor) else data[0]
        
        with tf.GradientTape() as tape:
            # Forward pass
            x_reconstructed = self(x, training=True)
            # Compute SSIM loss (1 - SSIM)
            ssim_loss = 1 - tf.reduce_mean(
                tf.image.ssim(x, x_reconstructed, max_val=1.0)
            )
            # Compute MSE for comparison
            mse_loss = tf.reduce_mean(tf.square(x - x_reconstructed))

            # Weight the losses (adjust these weights based on experiments)
            #combined_loss = 0.7 * ssim_loss + 0.3 * mse_loss
        
        # Compute gradients and update weights
        gradients = tape.gradient(ssim_loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))
        
        # Return metrics
        return {
            "loss": ssim_loss,  # SSIM-based loss
            "ssim": 1 - ssim_loss,  # SSIM metric (higher is better)
            "mse": mse_loss,  # MSE metric (lower is better)
        }
    
    def test_step(self, data):
        # Unpack the data since for unsupervised learning there are no labels
        x = data if isinstance(data, tf.Tensor) else data[0]
        
        # Compute reconstructions
        x_reconstructed = self(x, training=False)
        
        # Compute SSIM loss
        ssim_loss = 1 - tf.reduce_mean(
            tf.image.ssim(x, x_reconstructed, max_val=1.0)
        )
        
        # Compute MSE
        mse_loss = tf.reduce_mean(tf.square(x - x_reconstructed))
        
        # Return metrics (same as train_step to ensure consistency)
        return {
            "loss": ssim_loss,
            "ssim": 1 - ssim_loss,
            "mse": mse_loss,
        }

In [None]:
# COMPILE THE MODEL

# Create model instance
# Build autoencoder
encoder, decoder = build_autoencoder(input_shape=INPUT_SHAPE, latent_dim=LATENT_DIM)
model = DCAE(encoder, decoder)

# Compile model with Dice loss and coefficient metric
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001)  
)

# Print model summary
# Summary of the architecture
encoder.summary()
decoder.summary()

In [None]:
# TRAIN THE MODEL

# Data augmentation (optional)
datagen = ImageDataGenerator(
    #rotation_range=10,
    #width_shift_range=0.1,
    #height_shift_range=0.1,
    #shear_range=0.1,
    #zoom_range=0.2,
    vertical_flip=True,
    horizontal_flip=True,
    fill_mode='nearest'
)

# Fit the data generator to your training data
train_generator = datagen.flow(
    train_images, train_images,  # For autoencoder, input = target
    batch_size=16,
    shuffle=True
)

# Early stopping callback
early_stopping_callback = keras.callbacks.EarlyStopping(
    monitor="val_ssim",  # Monitor SSIM on validation data
    patience=10,
    mode="max",         # Higher SSIM is better
    restore_best_weights=True,
    verbose=1,
)
# Learning rate reducer
lr_reducer = keras.callbacks.ReduceLROnPlateau(
    monitor="val_ssim",
    factor=0.5,
    patience=5,
    mode="max",
    min_lr=1e-6,
    verbose=1
)

# Train with validation data
history = model.fit(
    #train_generator,
    train_images,
    validation_data=(val_images, val_images),  # Validation needs both X and y
    epochs=150, 
    steps_per_epoch=math.ceil(len(train_images) / 16),  # Needed when using a generator
    callbacks=[
        early_stopping_callback,
        lr_reducer
        ]
    )

In [None]:
# ANALYZE TRAINING

# Extract metrics from history object
training_loss = history.history['loss']
training_ssim = history.history['ssim']
training_mse = history.history['mse']
val_loss = history.history['val_loss']
val_ssim = history.history['val_ssim']
val_mse = history.history['val_mse']

# Create figure with three subplots
fig, (ax1, ax2, ax3) = plt.subplots(1, 3, figsize=(15, 3))

# Plot training & validation loss
ax1.plot(training_loss, label='loss', color='blue')
ax1.plot(val_loss, label='val_loss', color='green')
ax1.set_title('Loss')
ax1.set_xlabel('Epochs')
ax1.set_ylabel('Loss')
ax1.legend()
ax1.grid(True, linestyle='--', alpha=0.7)

# Plot training & validation accuracy
ax2.plot(training_ssim, label='ssim', color='blue')
ax2.plot(val_ssim, label='val_ssim', color='green')
ax2.set_title('SSIM')
ax2.set_xlabel('Epochs')
ax2.set_ylabel('Accuracy')
ax2.legend()
ax2.grid(True, linestyle='--', alpha=0.7)

# Plot training & validation accuracy
ax3.plot(training_mse, label='mse', color='blue')
ax3.plot(val_mse, label='val_mse', color='green')
ax3.set_title('MSE')
ax3.set_xlabel('Epochs')
ax3.set_ylabel('Accuracy')
ax3.legend()
ax3.grid(True, linestyle='--', alpha=0.7)

# Adjust layout and display
plt.tight_layout()
plt.show()

In [None]:
# ANOMALY SCORE

def calculate_anomaly_score(image, dcae):
    # Generate the reconstructed image DCAE(x)
    image_batch = tf.expand_dims(image, axis=0)
    reconstructed_batch = dcae(image_batch)
    reconstructed_image = tf.squeeze(reconstructed_batch, axis=0)

    anomaly_score = tf.reduce_mean(tf.square(image - reconstructed_image)) # ME DA MEJORES NÚMEROS

    anomaly_map = tf.sqrt(tf.square(image - reconstructed_image))

    #anomaly_map = tf.abs(image - reconstructed_image) # For when anomaly maps are black
    
    # Optionally compute SSIM and PSNR with slightly better parameters
    ssim_score = ssim(
        image, 
        reconstructed_image.numpy(), 
        win_size=7,  # Increased from 3 for better structural assessment
        channel_axis=-1,
        data_range=1.0
    )
    psnr_score = psnr(image, reconstructed_image.numpy(), data_range=1.0)
    
    return reconstructed_image, anomaly_map, anomaly_score, ssim_score, psnr_score

In [None]:
# FIND BEST ANOMALY THRESHOLD (+ ACCURACY)

j = 0
step = 0.0001  
limit =  0.1

best_acc = 0
best_prec = 0
best_rec = 0
best_f1 = 0
best_fpr = 1
best_weighted_mean = 0
best_t = j

# Evaluate on test images
generated_images = []
anomaly_maps = []
anomaly_scores = []
ssim_scores = []
psnr_scores = []

for i, img in enumerate(test_images):
    generated_image, anomaly_map, anomaly_score, ssim_score, psnr_score = calculate_anomaly_score(img, model)
    generated_images.append(generated_image)
    anomaly_maps.append(anomaly_map)
    anomaly_scores.append(anomaly_score)
    ssim_scores.append(ssim_score)
    psnr_scores.append(psnr_score)

while j < limit: 

    # Compute AUC score
    auc_score = roc_auc_score(test_labels, anomaly_scores) # use raw decision scores for ROC AUC

    # Set a threshold for anomaly detection
    predictions = (np.array(anomaly_scores) > j).astype(int)

    # Confusion matrix
    cm_values = confusion_matrix(test_labels, predictions)
    tn, fp, fn, tp = confusion_matrix(test_labels, predictions).ravel()
    fpr = 0 if (fp + tn) == 0 else fp / (fp + tn)

    # Compute additional metrics
    accuracy = accuracy_score(test_labels, predictions)
    precision = precision_score(test_labels, predictions, zero_division=0)
    recall = recall_score(test_labels, predictions, zero_division=0)
    f1 = f1_score(test_labels, predictions, zero_division=0)

    specificity = tn / (tn + fp) if (tn + fp) > 0 else 0  # Same as 1-FPR
    # Weighted harmonic mean - gives more weight to accuracy
    w_acc = 2
    w_spec = 1
    # Weighted arithmetic mean
    weighted_mean = (w_acc * accuracy + w_spec * specificity) / (w_acc + w_spec)

    #if weighted_mean > best_weighted_mean:    
    if accuracy > best_acc:      

        best_acc = accuracy 
        best_prec = precision 
        best_rec = recall 
        best_f1 = f1 
        best_fpr = fpr
        #best_weighted_mean = weighted_mean
        best_t = j

    j += step    

# Show evaluation metrics
"""print(f"Accuracy: {best_acc:.4f}")
print(f"Precision: {best_prec:.4f}")
print(f"Recall: {best_rec:.4f}")
print(f"F1 Score: {best_f1:.4f}")
print(f"FPR: {best_fpr:.4f}")
print(f"weighted_mean: {best_weighted_mean:.4f}")"""

print(f"\nt: {best_t:.4f}")


In [None]:
ANOMALY_THRESHOLD = 0.0144 # Adjust this value based on the data

In [None]:
# INFERENCE AND EVALUATION METRICS

# Evaluate on test images
generated_images = []
anomaly_maps = []
anomaly_scores = []
ssim_scores = []
psnr_scores = []

for i, img in enumerate(test_images):
    generated_image, anomaly_map, anomaly_score, ssim_score, psnr_score = calculate_anomaly_score(img, model)
    generated_images.append(generated_image)
    anomaly_maps.append(anomaly_map)
    anomaly_scores.append(anomaly_score)
    ssim_scores.append(ssim_score)
    psnr_scores.append(psnr_score)

# Compute AUC score
auc_score = roc_auc_score(test_labels, anomaly_scores) # use raw decision scores for ROC AUC

# Set a threshold for anomaly detection
predictions = (np.array(anomaly_scores) > ANOMALY_THRESHOLD).astype(int)

# Confusion matrix
cm_values = confusion_matrix(test_labels, predictions)
tn, fp, fn, tp = confusion_matrix(test_labels, predictions).ravel()
fpr = fp / (fp + tn)

# Compute additional metrics
accuracy = accuracy_score(test_labels, predictions)
precision = precision_score(test_labels, predictions)
recall = recall_score(test_labels, predictions)
f1 = f1_score(test_labels, predictions)

# Show evaluation metrics
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")
print(f"FPR: {fpr:.4f}")

print(f"AUROC: {auc_score:.4f}")
print(f"Mean SSIM: {np.mean(ssim_scores):.4f}")
print(f"Mean PSNR: {np.mean(psnr_scores):.4f}")

# Plot the confusion matrix
plt.figure(figsize=(4, 3))
sns.heatmap(cm_values, annot=True, fmt='d', cmap='Blues', 
            xticklabels=['Normal', 'Anomaly'],
            yticklabels=['Normal', 'Anomaly'])
plt.ylabel('True Label')
plt.xlabel('Predicted Label')
plt.title('Confusion Matrix')

In [None]:
# VISUALIZE RESULTS

# Get indices of normal and anomalous samples
normal_indices = np.where(test_labels == 0)[0]
anomalous_indices = np.where(test_labels == 1)[0]

# Randomly sample 5 normal and 5 anomalous examples
#np.random.seed(42)  # for reproducibility
random_normal_indices = np.random.choice(normal_indices, size=5, replace=False)
random_anomalous_indices = np.random.choice(anomalous_indices, size=5, replace=False)

# Combine them
random_indices = np.concatenate([random_normal_indices, random_anomalous_indices])

# Visualize random samples (5 normal and 5 anomalous)
plt.figure(figsize=(25, 12))
for i, idx in enumerate(random_indices):
    # First row: original image
    plt.subplot(4, 10, i + 1)
    plt.imshow(test_images[idx], cmap='gray')
    plt.title(
        f"Anomaly Score: {anomaly_scores[idx]:.4f}\n"
        f"Prediction: {'anomalous' if predictions[idx] == 1 else 'normal'}\n"
        f"Ground Truth: {'anomalous' if test_labels[idx] == 1 else 'normal'}"
    )
    plt.axis('off')

    # Second row: ground truth mask
    plt.subplot(4, 10, i + 11)
    if test_labels[idx] == 1:  # Defective image
        mask = ground_truth_masks[idx].squeeze()
        plt.imshow(mask, cmap='gray')
    else:  # Normal image
        plt.imshow(np.zeros((128, 128)), cmap='gray')
    plt.title("Ground truth mask")
    plt.axis('off')

    # Third row: generated image G(E(x))
    plt.subplot(4, 10, i + 21)
    plt.imshow(generated_images[idx], cmap='gray' if COLOR_IMAGES == False else None)
    plt.title("Generated image")
    plt.axis('off')

    # Fourth row: anomaly map
    plt.subplot(4, 10, i + 31)
    plt.imshow(anomaly_maps[idx], cmap='jet')
    plt.title("Anomaly map")
    plt.axis('off')

# Add overall titles for normal and anomalous sections
plt.figtext(0.25, 0.98, 'Random Normal Examples', fontsize=16, ha='center')
plt.figtext(0.75, 0.98, 'Random Anomalous Examples', fontsize=16, ha='center')

plt.tight_layout(rect=[0, 0, 1, 0.96])  # Adjust layout to make room for the overall titles
plt.show()