# 04. Convolutional Autoencoder (Unsupervised)

## Introduction
This notebook implements a Convolutional Autoencoder (CAE) for unsupervised anomaly detection.
The model is trained ONLY on normal images to learn to reconstruct them.
Anomalies are detected by high reconstruction error.

## Setup

In [None]:
import glob
import logging
import os

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers, models

# Seed NumPy's and TensorFlow's global random generators with the same fixed value.
np.random.seed(42)
tf.random.set_seed(42)

## 1. Data Loading
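For the Autoencoder, we train ONLY on normal images, i.e. the folders whose names end in `_good`. 80% of those images are used for training; the remaining normal images are held out, and the test set combines held-out normal images with all anomalous ones.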

In [None]:

def load_data(processed_dir, target_category=None, img_size=(256, 256), batch_size=32):
    """
    Build the training and test pipelines from the augmented image folders.

    Every ``<processed_dir>/<folder>/*.png`` file is collected. A folder whose
    name ends with ``_good`` holds normal images; every other folder is
    treated as anomalous. The normal images are shuffled with a fixed seed
    and split 80/10/10:

      * Train: 80% of the normal images, yielded as ``(image, image)`` pairs.
      * Val:   10% of the normal images; held out and NOT returned.
      * Test:  the remaining normal images plus ALL anomalous images,
               yielded as ``(image, label)`` with 0 = normal, 1 = anomaly.

    Args:
        processed_dir: Root directory containing one sub-folder per class.
        target_category: Optional folder-name prefix; folders that do not
            start with it are skipped. ``None`` keeps every folder.
        img_size: ``(height, width)`` every image is resized to.
        batch_size: Batch size of the training dataset (the test dataset
            always uses batches of 1).

    Returns:
        ``(train_ds, test_ds)`` as ``tf.data.Dataset`` objects.

    Raises:
        ValueError: If ``processed_dir`` does not exist, contains no matching
            PNG files, or contains no ``*_good`` images.
    """
    log = logging.getLogger(__name__)
    log.info(f"Loading augmented data for category: {target_category}...")

    if not os.path.exists(processed_dir):
        raise ValueError(f"Processed directory {processed_dir} not found! Please run augmentation first.")

    # 1. Load All Augmented Data
    log.info(f"Scanning processed data in {processed_dir}...")
    search_path = os.path.join(processed_dir, "*", "*.png")

    records = []
    # glob returns files in filesystem order; sorting makes the seeded split
    # below reproducible from one machine/run to the next.
    for filepath in sorted(glob.glob(search_path)):
        folder_name = os.path.basename(os.path.dirname(filepath))
        if target_category and not folder_name.startswith(target_category):
            continue
        records.append({
            'filepath': filepath,
            'label_str': folder_name,
            # folder_name ends with '_good' for normal images
            'is_good': folder_name.endswith('_good'),
        })

    if not records:
        raise ValueError("No data found in processed directory!")
    df = pd.DataFrame(records)

    # 2. Split Data
    good_mask = df['is_good'].astype(bool)
    good_df = df[good_mask]
    anomaly_df = df[~good_mask]

    if good_df.empty:
        raise ValueError("No normal (good) data found!")

    # Seeded shuffle followed by positional slicing gives the 80/10/10 split
    # without relying on any helper outside pandas.
    shuffled_good = good_df.sample(frac=1.0, random_state=42)
    n_good = len(shuffled_good)
    n_train = max(1, (n_good * 4) // 5)   # integer maths: exact 80%, never empty
    n_holdout = n_good - n_train
    n_test = (n_holdout + 1) // 2         # test gets the extra sample on odd counts
    n_val = n_holdout - n_test

    train_good = shuffled_good.iloc[:n_train]
    # Rows [n_train, n_train + n_val) are the validation hold-out; they are
    # deliberately left out of both returned datasets.
    test_good = shuffled_good.iloc[n_train + n_val:]

    # Test set includes test_good and ALL anomalies
    test_df = pd.concat([test_good, anomaly_df], ignore_index=True)

    # Create labels for Test: 0 for good, 1 for anomaly
    test_df['label'] = (~test_df['is_good'].astype(bool)).astype('int64')

    log.info(f"Train (Normal): {len(train_good)}")
    log.info(f"Test (Normal+Anomaly): {len(test_df)} (Normal: {len(test_good)}, Anomaly: {len(anomaly_df)})")

    # 3. Create Datasets
    def process_path(filepath):
        """Read, decode, resize and divide one image by 255."""
        img = tf.io.read_file(filepath)
        img = tf.io.decode_image(img, channels=3, expand_animations=False)
        img = tf.image.resize(img, img_size)
        img = tf.cast(img, tf.float32) / 255.0
        return img

    def process_path_label(filepath, label):
        """Same as process_path, but passes the label through unchanged."""
        return process_path(filepath), label

    # Train DS: (x, x) for Autoencoder
    train_paths = train_good['filepath'].values
    train_ds = tf.data.Dataset.from_tensor_slices(train_paths)
    # Shuffle the path strings BEFORE decoding: the buffer then holds short
    # strings instead of decoded float images, and can span the whole set.
    train_ds = train_ds.shuffle(len(train_paths))
    train_ds = train_ds.map(process_path, num_parallel_calls=tf.data.AUTOTUNE)
    train_ds = train_ds.map(lambda x: (x, x))
    train_ds = train_ds.batch(batch_size).prefetch(tf.data.AUTOTUNE)

    # Test DS: (x, label) for evaluation
    test_ds = tf.data.Dataset.from_tensor_slices((test_df['filepath'].values, test_df['label'].values))
    test_ds = test_ds.map(process_path_label, num_parallel_calls=tf.data.AUTOTUNE)
    test_ds = test_ds.batch(1)  # Batch 1 for individual prediction

    return train_ds, test_ds

# --- Configuration ---
PROCESSED_DIR = "../data/processed/augmented"
TARGET_CATEGORY = 'bottle'  # Train only on this category
IMG_SIZE = (256, 256)
BATCH_SIZE = 16

# --- Execution ---
# Keyword arguments make it explicit which setting feeds which parameter.
train_ds, test_ds = load_data(
    PROCESSED_DIR,
    target_category=TARGET_CATEGORY,
    img_size=IMG_SIZE,
    batch_size=BATCH_SIZE,
)
print("Data loaded successfully.")


## 2. Model Architecture
Encoder-Decoder architecture.

In [None]:
def create_autoencoder(input_shape, latent_dim=128):
    """
    Build the convolutional autoencoder.

    The encoder applies three stride-2 convolutions (each halves height and
    width), flattens the result and projects it to a dense latent vector.
    The decoder mirrors this with a dense layer, a reshape and three stride-2
    transposed convolutions, followed by a sigmoid output layer with as many
    channels as the input.

    Args:
        input_shape: ``(height, width, channels)`` of the input images.
            Height and width must be known and divisible by 8; otherwise the
            three down/up-sampling steps would not restore the input size.
        latent_dim: Size of the dense latent vector (default 128).

    Returns:
        An uncompiled Keras ``Model`` named ``'autoencoder'``.

    Raises:
        ValueError: If ``input_shape`` is not 3-dimensional or its spatial
            size is not a multiple of 8.
    """
    if len(input_shape) != 3:
        raise ValueError(
            f"input_shape must be (height, width, channels), got {tuple(input_shape)}"
        )
    height, width, channels = input_shape
    if height is None or width is None or height % 8 or width % 8:
        # Three 'same'-padded stride-2 convolutions round the size up at each
        # step, so only multiples of 8 come back out at the original size.
        raise ValueError(
            f"height and width must be multiples of 8, got {height}x{width}"
        )

    # Encoder
    inputs = layers.Input(shape=input_shape)
    x = layers.Conv2D(32, (3, 3), activation='relu', padding='same', strides=2)(inputs)
    x = layers.Conv2D(64, (3, 3), activation='relu', padding='same', strides=2)(x)
    x = layers.Conv2D(128, (3, 3), activation='relu', padding='same', strides=2)(x)

    # Latent space
    # Remember the feature-map shape (without the batch axis) so the decoder
    # can reshape back to it. Plain ints keep Dense/Reshape arguments valid.
    shape_before_flattening = tuple(int(dim) for dim in x.shape[1:])
    x = layers.Flatten()(x)
    latent = layers.Dense(latent_dim, name='latent_vector')(x)

    # Decoder
    # np.prod returns a NumPy integer; cast it to a built-in int for `units`.
    x = layers.Dense(int(np.prod(shape_before_flattening)))(latent)
    x = layers.Reshape(shape_before_flattening)(x)

    x = layers.Conv2DTranspose(128, (3, 3), activation='relu', padding='same', strides=2)(x)
    x = layers.Conv2DTranspose(64, (3, 3), activation='relu', padding='same', strides=2)(x)
    x = layers.Conv2DTranspose(32, (3, 3), activation='relu', padding='same', strides=2)(x)

    # Sigmoid keeps the reconstruction in [0, 1]; the channel count follows
    # the input so the output can be compared with it element-wise.
    outputs = layers.Conv2DTranspose(channels, (3, 3), activation='sigmoid', padding='same')(x)

    model = models.Model(inputs, outputs, name='autoencoder')
    return model

# Append a 3-channel axis to the 2-D image size to form the model input shape.
autoencoder = create_autoencoder((*IMG_SIZE, 3))
autoencoder.summary()

## 3. Training
Loss function is Mean Squared Error (MSE) between input and output.

In [None]:
# Reconstruction objective: mean squared error, optimised with Adam.
autoencoder.compile(loss='mse', optimizer='adam')

# In unsupervised setting, we often use a split of train set as validation,
# or just monitor loss. No validation data is passed here, so only the
# training loss is recorded in `history`.
history = autoencoder.fit(train_ds, epochs=20)

# Create Test Dataset (with labels for evaluation)
def process_path_label(filepath, label):
    """Load the image at `filepath`, resize it to IMG_SIZE, divide by 255 and return it with `label`."""
    raw_bytes = tf.io.read_file(filepath)
    # expand_animations=False keeps animated files to a single 3-D frame.
    decoded = tf.io.decode_image(raw_bytes, channels=3, expand_animations=False)
    resized = tf.image.resize(decoded, IMG_SIZE)
    scaled = tf.cast(resized, tf.float32) / 255.0
    return scaled, label

def create_test_dataset(dataframe):
    """
    Build a labelled evaluation dataset from a DataFrame.

    Args:
        dataframe: DataFrame with a 'filepath' column (image paths) and a
            'label' column (one label per image).

    Returns:
        A `tf.data.Dataset` yielding `(image, label)` in batches of 1, with
        each image loaded through `process_path_label`.
    """
    filepaths = dataframe['filepath'].values
    labels = dataframe['label'].values
    ds = tf.data.Dataset.from_tensor_slices((filepaths, labels))
    # Use the fully-qualified constant: a bare `AUTOTUNE` is never defined.
    ds = ds.map(process_path_label, num_parallel_calls=tf.data.AUTOTUNE)
    ds = ds.batch(1)  # Batch size 1 for individual prediction
    return ds

# `load_data` already returned a labelled `test_ds`, and its `test_df` is a
# local variable of that function. Rebuild the dataset only when a
# module-level `test_df` actually exists; otherwise keep the existing one
# instead of failing with a NameError.
if 'test_df' in globals():
    test_ds = create_test_dataset(test_df)

def predict_anomaly(model, dataset, threshold=None):
    """
    Compute one reconstruction error per image.

    Each batch is passed through `model.predict`; the error of an image is
    the mean absolute difference between the image and its reconstruction,
    averaged over every axis except the batch axis. Batches of any size are
    supported.

    Args:
        model: Object exposing `predict(batch, verbose=0)` that returns an
            array of the same shape as `batch`.
        dataset: Iterable of `(images, labels)` batches.
        threshold: Unused; kept so existing callers keep working.

    Returns:
        `(errors, labels)` as NumPy arrays with one entry per image, in
        dataset order.
    """
    reconstruction_errors = []
    labels = []

    for image, label in dataset:
        batch = np.asarray(image)
        reconstructed = np.asarray(model.predict(image, verbose=0))
        # Reduce over everything but axis 0 so each image gets its own score.
        per_image_axes = tuple(range(1, batch.ndim))
        reconstruction_errors.extend(
            np.mean(np.abs(batch - reconstructed), axis=per_image_axes)
        )
        labels.extend(np.asarray(label).reshape(-1))

    return np.array(reconstruction_errors), np.array(labels)

print("Predicting anomalies on test set...")
errors, labels = predict_anomaly(autoencoder, test_ds)

# Determine threshold (e.g., 90th percentile of errors)
# NOTE: this percentile is taken over ALL test errors (normal and anomalous);
# it is a demonstration value, not a tuned operating point.
threshold = np.percentile(errors, 90)
print(f"Threshold: {threshold}")

# Visualize
plt.figure(figsize=(10, 5))
# The test labels built in load_data are binary (0 = good, 1 = anomaly), so
# the normal samples are simply those labelled 0; no class-name lookup is
# needed.
is_normal = labels == 0

plt.hist(errors[is_normal], bins=20, alpha=0.5, label='Normal')
plt.hist(errors[~is_normal], bins=20, alpha=0.5, label='Anomaly')
plt.axvline(threshold, color='r', linestyle='--', label='Threshold')
plt.legend()
plt.title("Reconstruction Error Distribution")
plt.show()

In [None]:
# Load Test Data
# One image per batch, in a fixed (unshuffled) order, with integer labels.
# NOTE(review): TEST_DIR is not defined anywhere in the visible notebook -
# confirm where it is supposed to come from.
test_ds = tf.keras.utils.image_dataset_from_directory(
    TEST_DIR,
    shuffle=False,
    batch_size=1,
    image_size=IMG_SIZE,
    label_mode='int',
)

def predict_anomaly(model, dataset, threshold=None):
    """
    Compute one reconstruction error per image, preprocessing each batch first.

    Each batch is passed through `preprocess`, then through `model.predict`;
    the error of an image is the mean absolute difference between the
    preprocessed image and its reconstruction, averaged over every axis
    except the batch axis. Batches of any size are supported.

    Args:
        model: Object exposing `predict(batch, verbose=0)` that returns an
            array of the same shape as the preprocessed batch.
        dataset: Iterable of `(images, labels)` batches.
        threshold: Unused; kept so existing callers keep working.

    Returns:
        `(errors, labels)` as NumPy arrays with one entry per image, in
        dataset order.
    """
    reconstruction_errors = []
    labels = []

    for image, label in dataset:
        # NOTE(review): `preprocess` is not defined in the visible notebook;
        # presumably it rescales pixels to the range the model was trained
        # on - confirm before relying on these scores.
        image = preprocess(image)
        batch = np.asarray(image)
        reconstructed = np.asarray(model.predict(image, verbose=0))
        # Reduce over everything but axis 0 so each image gets its own score.
        per_image_axes = tuple(range(1, batch.ndim))
        reconstruction_errors.extend(
            np.mean(np.abs(batch - reconstructed), axis=per_image_axes)
        )
        labels.extend(np.asarray(label).reshape(-1))

    return np.array(reconstruction_errors), np.array(labels)

# Score every test image with the trained autoencoder.
errors, labels = predict_anomaly(autoencoder, test_ds)

# Determine threshold (90th percentile of the reconstruction errors)
# Ideally this is done on a validation set of normal images
threshold = np.percentile(errors, 90)
print(f"Threshold: {threshold}")

# Visualize
plt.figure(figsize=(10, 5))
# NOTE(review): the two histograms treat label 0 as "normal" and every other
# label as "anomaly" - confirm that label 0 really is the normal class for
# the dataset in use.
plt.hist(errors[labels==0], bins=20, alpha=0.5, label='Normal')
plt.hist(errors[labels!=0], bins=20, alpha=0.5, label='Anomaly')
plt.axvline(threshold, color='r', linestyle='--', label='Threshold')
plt.legend()
plt.title("Reconstruction Error Distribution")
plt.show()

In [None]:
# Advanced Evaluation for Autoencoder
# Scores every test image by reconstruction MSE, then reports AUC-ROC and the
# best F1 found by sweeping a threshold over the observed score range.
import numpy as np

print("Evaluating Autoencoder...")

# 1. Calculate Reconstruction Error (MSE) for all test images
# predict() runs over the whole dataset once; the loop below walks the same
# dataset a second time and pairs each batch with its slice of the results,
# so it relies on test_ds yielding batches in the same order on both passes.
reconstructions = autoencoder.predict(test_ds)
mse_scores = []
y_true_labels = []

# We need to iterate through test_ds to get original images and labels
# Note: test_ds yields (images, labels)
# NOTE: the loop variable rebinds the module-level name `labels`.
idx = 0
for images, labels in test_ds:
    batch_recon = reconstructions[idx : idx + len(images)]
    # Mean over height, width and channels -> one MSE value per image.
    batch_mse = np.mean(np.square(images - batch_recon), axis=(1, 2, 3))
    mse_scores.extend(batch_mse)
    y_true_labels.extend(labels.numpy())
    idx += len(images)

mse_scores = np.array(mse_scores)
y_true_labels = np.array(y_true_labels)

# 2. Calculate AUC-ROC using MSE as anomaly score
# Note: Higher MSE = Anomaly (1), Lower MSE = Normal (0)
# Ensure labels are 0 (Normal) and 1 (Anomaly)
# NOTE(review): calculate_auc is not defined in the visible notebook -
# presumably it takes (true labels, scores); confirm its contract.
auc = calculate_auc(y_true_labels, mse_scores)
print(f"AUC-ROC: {auc:.4f}")

# 3. Calculate F1-Score (requires thresholding)
# Simple strategy: use mean + 2*std of normal samples from validation set as threshold
# For now, we'll just pick a threshold that maximizes F1 on test set for demonstration
# (selecting the threshold on the test set makes the reported F1 optimistic).
best_f1 = 0
best_thresh = 0
# 100 evenly spaced candidate thresholds between the lowest and highest score.
thresholds = np.linspace(mse_scores.min(), mse_scores.max(), 100)

for thresh in thresholds:
    # Scores strictly above the threshold are predicted as anomalies (1).
    y_pred_bin = (mse_scores > thresh).astype(int)
    # NOTE(review): calculate_f1 is not defined in the visible notebook -
    # presumably it takes (true labels, predicted labels); confirm.
    f1 = calculate_f1(y_true_labels, y_pred_bin)
    if f1 > best_f1:
        best_f1 = f1
        best_thresh = thresh

print(f"Best F1-Score: {best_f1:.4f} (at threshold {best_thresh:.4f})")


## 5. Export Model
We save the trained model for later use.

In [None]:
# Create models directory if it doesn't exist
models_dir = "../models"
os.makedirs(models_dir, exist_ok=True)

# Save the model
model_name = "autoencoder"
# Fall back to a generic suffix when no target category was configured.
category_name = TARGET_CATEGORY if 'TARGET_CATEGORY' in locals() else 'all_categories'
save_path = os.path.join(models_dir, f"{model_name}_{category_name}.keras")
# The trained network is bound to `autoencoder`; `model` only exists as a
# local variable inside create_autoencoder.
autoencoder.save(save_path)
print(f"Model saved to {save_path}")