In [1]:
# Import necessary libraries
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import cv2
import requests
import torch
from tqdm.auto import tqdm
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, roc_curve, auc, precision_recall_curve
from skimage.metrics import structural_similarity
import tensorflow as tf
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D, Dense, Flatten, Reshape, BatchNormalization, Dropout, LeakyReLU
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau, TensorBoard
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import time
import pickle
import sys

In [2]:
# Ensure TensorFlow is using GPU if available
physical_devices = tf.config.list_physical_devices('GPU')
if len(physical_devices) > 0:
    tf.config.experimental.set_memory_growth(physical_devices[0], True)
    print(f"Using GPU: {physical_devices[0]}")
else:
    print("No GPU found, using CPU instead")

Using GPU: PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')


In [3]:
# Set random seed for reproducibility
np.random.seed(42)
tf.random.set_seed(42)
torch.manual_seed(42)

# Define paths and parameters
IMAGE_SIZE = 64  # Resize images to this size
BATCH_SIZE = 32
EPOCHS = 100
LATENT_DIM = 256  # Increased size of the latent space for better feature capture
LEARNING_RATE = 0.001
PATIENCE = 15     # Increased patience for early stopping

# Create directories for storing data and models
for directory in ['data', 'models', 'results', 'logs']:
    os.makedirs(directory, exist_ok=True)

In [4]:
# Function to download a file with progress bar
def download_file(url, filename):
    response = requests.get(url, stream=True)
    total_size = int(response.headers.get('content-length', 0))
    block_size = 1024

    with open(filename, 'wb') as file, tqdm(
            desc=filename,
            total=total_size,
            unit='B',
            unit_scale=True,
            unit_divisor=1024,
        ) as bar:
        for data in response.iter_content(block_size):
            bar.update(len(data))
            file.write(data)

    if total_size != 0 and bar.n != total_size:
        print(f"WARNING: Downloaded {bar.n} bytes, expected {total_size} bytes")

    return filename

# Download and prepare the dataset
def download_and_prepare_dataset():
    print("Downloading the German Traffic Sign Recognition Benchmark (GTSRB) dataset...")

    # Check if dataset already exists to avoid redownloading
    if os.path.exists('data/GTSRB') and len(os.listdir('data/GTSRB')) > 0:
        print("Dataset already exists, skipping download.")
        return

    # Use a more reliable direct download link
    try:
        url = 'https://sid.erda.dk/public/archives/daaeac0d7ce1152aea9b61d9f1e19370/GTSRB-Training_fixed.zip'
        output = 'data/GTSRB-Training_fixed.zip'

        # Download the dataset
        download_file(url, output)

        # Extract the dataset
        print("Extracting dataset...")
        import zipfile
        with zipfile.ZipFile(output, 'r') as zip_ref:
            zip_ref.extractall('data/')

        print("Dataset downloaded and extracted successfully.")

    except Exception as e:
        print(f"Error downloading dataset: {e}")
        print("Attempting alternative download method...")

        # Alternative method using direct download
        try:
            # Create data directory if it doesn't exist
            os.makedirs('data/GTSRB', exist_ok=True)

            # Use the torchvision dataset if available
            from torchvision.datasets import GTSRB
            train_dataset = GTSRB(root='data', split='train', download=True)
            print("Dataset downloaded successfully using torchvision.")

        except Exception as inner_e:
            print(f"Error with alternative download: {inner_e}")
            print("Please download the GTSRB dataset manually and place it in the 'data/GTSRB' directory.")
            sys.exit(1)

In [5]:
# Function to check if CUDA is available for PyTorch
def check_cuda():
    if torch.cuda.is_available():
        print(f"PyTorch CUDA is available. Using device: {torch.cuda.get_device_name(0)}")
        return True
    else:
        print("PyTorch CUDA is not available. Using CPU.")
        return False

In [6]:
# Function to load and preprocess the GTSRB dataset
def load_gtsrb_dataset():
    print("Loading and preprocessing the dataset...")
    data_dir = 'data/GTSRB/Final_Training/Images/'

    # Check if preprocessed data already exists
    cache_file = 'data/preprocessed_gtsrb.pkl'
    if os.path.exists(cache_file):
        print(f"Loading preprocessed data from {cache_file}")
        with open(cache_file, 'rb') as f:
            data = pickle.load(f)
        return data['X'], data['y']

    # Check if the data directory exists
    if not os.path.exists(data_dir):
        alternatives = [
            'data/GTSRB-Training_fixed/GTSRB/Final_Training/Images/',
            'data/GTSRB/Training/'
        ]

        for alt_dir in alternatives:
            if os.path.exists(alt_dir):
                data_dir = alt_dir
                print(f"Using alternative data directory: {data_dir}")
                break
        else:
            raise FileNotFoundError(f"Could not find GTSRB dataset directory. Please check the dataset installation.")

    images = []
    labels = []

    # Data augmentation for training
    datagen = ImageDataGenerator(
        rotation_range=15,
        width_shift_range=0.1,
        height_shift_range=0.1,
        shear_range=0.1,
        zoom_range=0.1,
        horizontal_flip=False,  # Don't flip traffic signs horizontally
        brightness_range=[0.8, 1.2],
        fill_mode='nearest'
    )

    # Loop through each class directory
    class_dirs = [d for d in os.listdir(data_dir) if os.path.isdir(os.path.join(data_dir, d))]

    for class_dir in tqdm(class_dirs, desc="Loading classes"):
        class_path = os.path.join(data_dir, class_dir)

        if os.path.isdir(class_path):
            # Load all images from this class
            img_paths = [os.path.join(class_path, f) for f in os.listdir(class_path)
                        if f.lower().endswith(('.png', '.jpg', '.jpeg', '.ppm'))]

            for img_path in tqdm(img_paths, desc=f"Class {class_dir}", leave=False):
                try:
                    img = cv2.imread(img_path)
                    if img is None:
                        continue

                    # Convert BGR to RGB
                    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

                    # Resize image
                    img = cv2.resize(img, (IMAGE_SIZE, IMAGE_SIZE))

                    # Normalize to [0,1]
                    img_array = img.astype(np.float32) / 255.0

                    # Check if image is RGB
                    if len(img_array.shape) < 3 or img_array.shape[2] != 3:
                        continue

                    images.append(img_array)
                    labels.append(int(class_dir))

                except Exception as e:
                    print(f"Error processing {img_path}: {e}")

    # Convert to numpy arrays
    X = np.array(images)
    y = np.array(labels)

    # Save preprocessed data
    with open(cache_file, 'wb') as f:
        pickle.dump({'X': X, 'y': y}, f)

    print(f"Dataset loaded: {X.shape[0]} images with shape {X.shape[1:]}.")
    print(f"Number of classes: {len(np.unique(y))}")
    return X, y

In [7]:
# Build an improved Autoencoder model with residual connections
def build_autoencoder():
    # Encoder
    input_img = Input(shape=(IMAGE_SIZE, IMAGE_SIZE, 3))

    # Encoder (with BatchNorm and LeakyReLU for better training)
    x = Conv2D(32, (3, 3), strides=1, padding='same')(input_img)
    x = BatchNormalization()(x)
    x = LeakyReLU(0.2)(x)

    x = Conv2D(64, (3, 3), strides=2, padding='same')(x)  # Downsample
    x = BatchNormalization()(x)
    x = LeakyReLU(0.2)(x)

    x = Conv2D(64, (3, 3), strides=1, padding='same')(x)  # Same dimension
    x = BatchNormalization()(x)
    x = LeakyReLU(0.2)(x)

    x = Conv2D(128, (3, 3), strides=2, padding='same')(x)  # Downsample
    x = BatchNormalization()(x)
    x = LeakyReLU(0.2)(x)

    x = Conv2D(128, (3, 3), strides=1, padding='same')(x)  # Same dimension
    x = BatchNormalization()(x)
    x = LeakyReLU(0.2)(x)

    x = Conv2D(256, (3, 3), strides=2, padding='same')(x)  # Downsample
    x = BatchNormalization()(x)
    x = LeakyReLU(0.2)(x)

    # Bottleneck
    x = Flatten()(x)
    x = Dense(LATENT_DIM)(x)
    encoded = LeakyReLU(0.2)(x)

    # Decoder
    x = Dense(8*8*128)(encoded)
    x = LeakyReLU(0.2)(x)
    x = Reshape((8, 8, 128))(x)

    x = Conv2D(128, (3, 3), padding='same')(x)
    x = BatchNormalization()(x)
    x = LeakyReLU(0.2)(x)

    x = UpSampling2D((2, 2))(x)  # 16x16
    x = Conv2D(64, (3, 3), padding='same')(x)
    x = BatchNormalization()(x)
    x = LeakyReLU(0.2)(x)

    x = UpSampling2D((2, 2))(x)  # 32x32
    x = Conv2D(32, (3, 3), padding='same')(x)
    x = BatchNormalization()(x)
    x = LeakyReLU(0.2)(x)

    x = UpSampling2D((2, 2))(x)  # 64x64
    decoded = Conv2D(3, (3, 3), activation='sigmoid', padding='same')(x)

    # Create autoencoder model
    autoencoder = Model(input_img, decoded)

    # Use MSE loss for reconstruction
    autoencoder.compile(
        optimizer=Adam(learning_rate=LEARNING_RATE),
        loss='mse',
        metrics=['mae']  # Track mean absolute error
    )

    # Also create an encoder model for feature extraction
    encoder = Model(input_img, encoded)

    return autoencoder, encoder

In [8]:
# Train the autoencoder using "normal" data with data augmentation
def train_autoencoder(X_train_normal, X_val_normal):
    print("Building and training the autoencoder...")
    autoencoder, encoder = build_autoencoder()

    # Print model summary
    autoencoder.summary()

    # Data augmentation for training
    datagen = ImageDataGenerator(
        rotation_range=10,
        width_shift_range=0.1,
        height_shift_range=0.1,
        zoom_range=0.1,
        brightness_range=[0.85, 1.15],
        fill_mode='nearest'
    )

    # Prepare the data generator
    train_generator = datagen.flow(
        X_train_normal, X_train_normal,
        batch_size=BATCH_SIZE,
        shuffle=True
    )

    # Define callbacks
    callbacks = [
        ModelCheckpoint(
            filepath='models/autoencoder_best.h5',
            monitor='val_loss',
            save_best_only=True,
            verbose=1
        ),
        EarlyStopping(
            monitor='val_loss',
            patience=PATIENCE,
            restore_best_weights=True,
            verbose=1
        ),
        ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.5,
            patience=7,
            min_lr=1e-6,
            verbose=1
        ),
        TensorBoard(
            log_dir='logs/autoencoder',
            histogram_freq=1,
            write_graph=True
        )
    ]

    # Train the model
    start_time = time.time()

    # Train with data augmentation
    history = autoencoder.fit(
        train_generator,
        steps_per_epoch=len(X_train_normal) // BATCH_SIZE,
        epochs=EPOCHS,
        validation_data=(X_val_normal, X_val_normal),
        callbacks=callbacks
    )

    training_time = time.time() - start_time
    print(f"Training completed in {training_time/60:.2f} minutes.")

    # Save the final model
    autoencoder.save('models/autoencoder_final.h5')
    encoder.save('models/encoder_final.h5')

    # Plot training history
    plt.figure(figsize=(12, 5))

    # Plot training & validation loss
    plt.subplot(1, 2, 1)
    plt.plot(history.history['loss'], label='Training Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.title('Model Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()

    # Plot training & validation MAE
    plt.subplot(1, 2, 2)
    plt.plot(history.history['mae'], label='Training MAE')
    plt.plot(history.history['val_mae'], label='Validation MAE')
    plt.title('Model MAE')
    plt.xlabel('Epoch')
    plt.ylabel('MAE')
    plt.legend()

    plt.tight_layout()
    plt.savefig('results/training_history.png')
    plt.close()

    return autoencoder, encoder, history

In [9]:
# Find the optimal threshold for anomaly detection with advanced visualization
def find_optimal_threshold(autoencoder, X_val_normal, X_val_anomaly):
    print("Finding optimal threshold for anomaly detection...")

    # Reconstruct validation data
    X_val_normal_pred = autoencoder.predict(X_val_normal, batch_size=BATCH_SIZE, verbose=1)
    X_val_anomaly_pred = autoencoder.predict(X_val_anomaly, batch_size=BATCH_SIZE, verbose=1)

    # Calculate MSE for each image
    mse_normal = np.mean(np.square(X_val_normal - X_val_normal_pred), axis=(1, 2, 3))
    mse_anomaly = np.mean(np.square(X_val_anomaly - X_val_anomaly_pred), axis=(1, 2, 3))

    # Create ground truth labels for ROC curve (0 for normal, 1 for anomaly)
    y_true = np.concatenate([np.zeros(len(mse_normal)), np.ones(len(mse_anomaly))])
    scores = np.concatenate([mse_normal, mse_anomaly])

    # Calculate ROC curve and AUC
    fpr, tpr, thresholds = roc_curve(y_true, scores)
    roc_auc = auc(fpr, tpr)

    # Calculate precision-recall curve
    precision, recall, pr_thresholds = precision_recall_curve(y_true, scores)
    pr_auc = auc(recall, precision)

    # Find the optimal threshold based on Youden's J statistic (maximizing TPR - FPR)
    optimal_idx = np.argmax(tpr - fpr)
    youden_threshold = thresholds[optimal_idx]

    # Also calculate the 95th percentile threshold
    percentile_threshold = np.percentile(mse_normal, 95)

    # Compare thresholds and choose the better one, Evaluate both and pick the one with better F1 score
    y_pred_youden = scores >= youden_threshold
    y_pred_percentile = scores >= percentile_threshold

    from sklearn.metrics import f1_score
    f1_youden = f1_score(y_true, y_pred_youden)
    f1_percentile = f1_score(y_true, y_pred_percentile)

    if f1_youden >= f1_percentile:
        optimal_threshold = youden_threshold
        threshold_method = "Youden's J statistic"
        f1_score_val = f1_youden
    else:
        optimal_threshold = percentile_threshold
        threshold_method = "95th percentile"
        f1_score_val = f1_percentile

    # Plot distributions
    plt.figure(figsize=(15, 10))

    # Plot 1: Distribution of reconstruction errors
    plt.subplot(2, 2, 1)
    plt.hist(mse_normal, bins=50, alpha=0.5, label='Normal', density=True)
    plt.hist(mse_anomaly, bins=50, alpha=0.5, label='Anomaly', density=True)
    plt.axvline(x=optimal_threshold, color='red', linestyle='--',
                label=f'Optimal Threshold: {optimal_threshold:.4f}')
    plt.title('Distribution of Reconstruction Errors')
    plt.xlabel('Mean Squared Error')
    plt.ylabel('Density')
    plt.legend()

    # Plot 2: ROC curve
    plt.subplot(2, 2, 2)
    plt.plot(fpr, tpr, label=f'ROC curve (AUC = {roc_auc:.4f})')
    plt.plot([0, 1], [0, 1], 'k--', label='Random')
    plt.scatter(fpr[optimal_idx], tpr[optimal_idx], marker='o', color='red',
                label=f'Optimal point (TPR: {tpr[optimal_idx]:.2f}, FPR: {fpr[optimal_idx]:.2f})')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver Operating Characteristic (ROC) Curve')
    plt.legend(loc="lower right")

    # Plot 3: Precision-Recall curve
    plt.subplot(2, 2, 3)
    plt.plot(recall, precision, label=f'PR curve (AUC = {pr_auc:.4f})')
    plt.xlabel('Recall')
    plt.ylabel('Precision')
    plt.title('Precision-Recall Curve')
    plt.legend(loc="lower left")

    # Plot 4: Decision boundary visualization
    plt.subplot(2, 2, 4)
    plt.scatter(range(len(scores)), np.sort(scores), alpha=0.5, s=5)
    plt.axhline(y=optimal_threshold, color='red', linestyle='--',
                label=f'Optimal Threshold: {optimal_threshold:.4f}')
    plt.title('Sorted Anomaly Scores with Decision Boundary')
    plt.xlabel('Sample Index (sorted)')
    plt.ylabel('Anomaly Score (MSE)')
    plt.legend()

    plt.tight_layout()
    plt.savefig('results/threshold_analysis.png')
    plt.close()

    print(f"Optimal threshold determined ({threshold_method}): {optimal_threshold:.6f}")
    print(f"F1 Score at optimal threshold: {f1_score_val:.4f}")
    print(f"ROC AUC: {roc_auc:.4f}")
    print(f"Precision-Recall AUC: {pr_auc:.4f}")

    # Save the threshold for later use
    threshold_info = {
        'optimal_threshold': float(optimal_threshold),
        'method': threshold_method,
        'f1_score': float(f1_score_val),
        'roc_auc': float(roc_auc),
        'pr_auc': float(pr_auc)
    }

    with open('models/threshold_info.pkl', 'wb') as f:
        pickle.dump(threshold_info, f)

    return optimal_threshold

In [10]:
def detect_anomalies(autoencoder, X_test, threshold):
    print("Detecting anomalies...")

    # Reconstruct test data
    X_test_pred = autoencoder.predict(X_test, batch_size=BATCH_SIZE, verbose=1)

    # Calculate MSE for each image
    mse = np.mean(np.square(X_test - X_test_pred), axis=(1, 2, 3))

    # Try to calculate SSIM if available
    try:
        # Import from skimage instead of using cv2.compareSSIM
        from skimage.metrics import structural_similarity as ssim

        ssim_scores = []
        for i in range(len(X_test)):
            ssim_value = ssim(
                X_test[i],
                X_test_pred[i],
                multichannel=True,
                channel_axis=2
            )
            ssim_scores.append(1 - ssim_value)  # Convert to dissimilarity

        ssim_scores = np.array(ssim_scores)

        # Normalize the scores
        mse_norm = (mse - np.min(mse)) / (np.max(mse) - np.min(mse) + 1e-10)
        ssim_norm = (ssim_scores - np.min(ssim_scores)) / (np.max(ssim_scores) - np.min(ssim_scores) + 1e-10)

        # Combine the scores (weighted average)
        anomaly_scores = 0.7 * mse_norm + 0.3 * ssim_norm

        # Re-normalize to match the threshold scale
        min_mse, max_mse = np.min(mse), np.max(mse)
        anomaly_scores = min_mse + anomaly_scores * (max_mse - min_mse)

    except Exception as e:
        print(f"Warning: Could not calculate SSIM: {e}")
        print("Using MSE only for anomaly detection")
        # Fallback to using just MSE
        anomaly_scores = mse

    # Determine if it's an anomaly based on threshold
    anomaly_pred = anomaly_scores > threshold

    return anomaly_pred, anomaly_scores, X_test_pred

In [11]:
# Function to visualize results
def visualize_results(X_test, X_test_pred, anomaly_pred, anomaly_scores, threshold, y_test=None, n_samples=10):
    # Create figure for visualization
    plt.figure(figsize=(20, 4 * n_samples))


    if y_test is not None:
        # Get indices for different prediction categories
        true_pos_idx = np.where((anomaly_pred == 1) & (y_test == 1))[0]
        false_pos_idx = np.where((anomaly_pred == 1) & (y_test == 0))[0]
        true_neg_idx = np.where((anomaly_pred == 0) & (y_test == 0))[0]
        false_neg_idx = np.where((anomaly_pred == 0) & (y_test == 1))[0]

        # Select samples from each category
        categories = {
            'True Positive (Anomaly)': true_pos_idx,
            'False Positive (Normal)': false_pos_idx,
            'True Negative (Normal)': true_neg_idx,
            'False Negative (Anomaly)': false_neg_idx
        }

        # Get balanced samples from each category
        indices = []
        for cat, idx_array in categories.items():
            if len(idx_array) > 0:
                # Take up to n_samples/4 from each category
                cat_samples = np.random.choice(idx_array, min(int(n_samples/4), len(idx_array)), replace=False)
                indices.extend(cat_samples)

        if len(indices) < n_samples:
            remaining = n_samples - len(indices)
            all_idx = np.arange(len(X_test))
            mask = np.ones(len(all_idx), dtype=bool)
            mask[indices] = False
            remaining_idx = all_idx[mask]
            additional = np.random.choice(remaining_idx, min(remaining, len(remaining_idx)), replace=False)
            indices.extend(additional)

        # Limit to n_samples
        indices = indices[:n_samples]
    else:

        indices = np.random.choice(range(len(X_test)), n_samples, replace=False)

    # Plot each sample
    for i, idx in enumerate(indices):
        # Original image
        plt.subplot(n_samples, 3, i*3 + 1)
        plt.imshow(X_test[idx])
        title = f"Original (Score: {anomaly_scores[idx]:.4f})"
        if y_test is not None:
            true_label = "Anomaly" if y_test[idx] == 1 else "Normal"
            title += f"\nTrue: {true_label}"
        plt.title(title)
        plt.axis('off')

        # Reconstructed image
        plt.subplot(n_samples, 3, i*3 + 2)
        plt.imshow(X_test_pred[idx])
        plt.title(f"Reconstructed")
        plt.axis('off')

        # Difference (heatmap)
        plt.subplot(n_samples, 3, i*3 + 3)
        diff = np.abs(X_test[idx] - X_test_pred[idx])
        plt.imshow(diff, cmap='hot')
        status = "Anomaly" if anomaly_pred[idx] else "Normal"
        plt.title(f"Difference ({status})")
        plt.axis('off')

    plt.tight_layout()
    plt.savefig('results/anomaly_detection_samples.png', dpi=200)
    plt.close()

    # Also create a confusion matrix visualization if we have labels
    if y_test is not None:
        cm = confusion_matrix(y_test, anomaly_pred)
        plt.figure(figsize=(8, 6))
        plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
        plt.title('Confusion Matrix')
        plt.colorbar()
        tick_marks = np.arange(2)
        plt.xticks(tick_marks, ['Normal', 'Anomaly'])
        plt.yticks(tick_marks, ['Normal', 'Anomaly'])

        # Add text annotations
        thresh = cm.max() / 2.
        for i in range(cm.shape[0]):
            for j in range(cm.shape[1]):
                plt.text(j, i, format(cm[i, j], 'd'),
                        ha="center", va="center",
                        color="white" if cm[i, j] > thresh else "black")

        plt.ylabel('True label')
        plt.xlabel('Predicted label')
        plt.tight_layout()
        plt.savefig('results/confusion_matrix.png')
        plt.close()

In [12]:
# Function to install YOLOv8 if not present
def setup_yolo():
    print("Setting up YOLOv8 for traffic sign detection...")

    try:
        # Import to check if already installed
        import ultralytics
        print(f"Ultralytics (YOLOv8) already installed. Version: {ultralytics.__version__}")
    except ImportError:
        print("Installing Ultralytics (YOLOv8)...")
        import os
        os.system('pip install ultralytics')

    # Create models directory if it doesn't exist
    import os
    os.makedirs('models', exist_ok=True)

    # Download YOLOv8 weights if not present
    yolo_model_path = 'models/yolov8s.pt'
    if not os.path.exists(yolo_model_path):
        print("Downloading YOLOv8 weights...")
        from ultralytics import YOLO
        # download the model to the default location
        YOLO('yolov8s.pt')
        # Move it to our models directory if necessary
        import shutil
        if os.path.exists('yolov8s.pt') and not os.path.exists(yolo_model_path):
            shutil.move('yolov8s.pt', yolo_model_path)

    print("YOLOv8 setup complete.")

In [13]:
# Function to perform detection using YOLOv8 with our anomaly detection
def detect_and_analyze_image(image_path, autoencoder, threshold):
    # First make sure YOLO is set up
    setup_yolo()

    # Import necessary libraries
    from ultralytics import YOLO
    import numpy as np
    import cv2

    try:
        # Check if image path exists
        if not os.path.exists(image_path):
            raise FileNotFoundError(f"Image not found: {image_path}")

        # Load image
        img = cv2.imread(image_path)
        if img is None:
            raise ValueError(f"Could not read image: {image_path}")

        img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        # Clone original image for visualization
        original_img = img_rgb.copy()

        # Load YOLO model
        yolo_model_path = 'models/yolov8s.pt'
        yolo_model = YOLO(yolo_model_path)

        # Run detection
        results = yolo_model(img_rgb, conf=0.25, iou=0.45)

        # Process results
        traffic_signs = []

        # Filter for traffic signs, traffic lights, and other relevant classes
        relevant_classes = ['traffic sign', 'traffic light', 'stop sign', 'signal']

        # Process each detection
        for result in results:
            boxes = result.boxes
            for box in boxes:
                # Get class name
                class_id = int(box.cls.item())
                class_name = result.names[class_id]

                # Check if the class is relevant or if confidence is high enough
                is_relevant = any(cls in class_name.lower() for cls in relevant_classes)
                confidence = float(box.conf.item())
                high_confidence = confidence > 0.6

                if is_relevant or high_confidence:
                    # Extract bounding box
                    x1, y1, x2, y2 = box.xyxy.squeeze().tolist()
                    x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2)

                    # Make sure box has valid dimensions
                    if x2 <= x1 or y2 <= y1 or x2 - x1 < 10 or y2 - y1 < 10:
                        continue

                    # Crop the traffic sign with padding
                    padding = int(min((x2-x1), (y2-y1)) * 0.1)  # 10% padding
                    x1_pad = max(0, x1 - padding)
                    y1_pad = max(0, y1 - padding)
                    x2_pad = min(img_rgb.shape[1], x2 + padding)
                    y2_pad = min(img_rgb.shape[0], y2 + padding)

                    sign_img = img_rgb[y1_pad:y2_pad, x1_pad:x2_pad]

                    if sign_img.size == 0:  # Skip if empty image
                        continue

                    # Resize to our model's input size
                    sign_img_resized = cv2.resize(sign_img, (IMAGE_SIZE, IMAGE_SIZE))
                    sign_img_normalized = sign_img_resized.astype(np.float32) / 255.0

                    # Check for anomaly using our autoencoder
                    sign_img_expanded = np.expand_dims(sign_img_normalized, axis=0)
                    sign_img_pred = autoencoder.predict(sign_img_expanded, verbose=0)[0]

                    # Calculate both MSE and SSIM for more robust detection
                    mse = np.mean(np.square(sign_img_normalized - sign_img_pred))

                    # Calculate SSIM (Structural Similarity Index)
                    try:
                        from skimage.metrics import structural_similarity as ssim
                        ssim_value = ssim(
                            sign_img_normalized,
                            sign_img_pred,
                            multichannel=True,
                            channel_axis=2
                        )
                        ssim_score = 1 - ssim_value  # Convert to dissimilarity
                    except:
                        # Fall back to MSE if SSIM calculation fails
                        ssim_score = 0

                    # Combined anomaly score (weighted average)
                    anomaly_score = 0.7 * mse + 0.3 * ssim_score

                    # Generate difference map for visualization
                    diff_map = np.abs(sign_img_normalized - sign_img_pred)
                    diff_map_colored = cv2.applyColorMap((diff_map * 255).astype(np.uint8).mean(axis=2).astype(np.uint8), cv2.COLORMAP_JET)
                    diff_map_colored = cv2.cvtColor(diff_map_colored, cv2.COLOR_BGR2RGB)

                    # Determine if it's an anomaly
                    is_anomaly = anomaly_score > threshold

                    # Save relevant info
                    traffic_signs.append({
                        'bbox': (x1_pad, y1_pad, x2_pad, y2_pad),
                        'original_bbox': (x1, y1, x2, y2),
                        'class': class_name,
                        'confidence': confidence,
                        'is_anomaly': is_anomaly,
                        'anomaly_score': anomaly_score,
                        'mse': mse,
                        'ssim_score': ssim_score,
                        'original_img': sign_img_normalized,
                        'reconstructed_img': sign_img_pred,
                        'diff_map': diff_map_colored
                    })

        return original_img, traffic_signs

    except Exception as e:
        print(f"Error in detect_and_analyze_image: {e}")
        import traceback
        traceback.print_exc()
        return None, []

In [14]:
# Function to visualize detection and anomaly results with improved graphics
def visualize_detection_results(img, traffic_signs, output_path='results/detection_results.png'):
    if len(traffic_signs) == 0:
        print("No traffic signs detected.")
        plt.figure(figsize=(10, 8))
        plt.imshow(img)
        plt.title('No Traffic Signs Detected')
        plt.axis('off')
        plt.savefig(output_path, dpi=200)
        plt.close()
        return

    # Create a figure with appropriate size
    n_signs = min(5, len(traffic_signs))
    fig = plt.figure(figsize=(15, 5 + 3 * n_signs))

    # Main image with bounding boxes - larger subplot
    ax_main = plt.subplot2grid((n_signs + 2, 3), (0, 0), colspan=3, rowspan=2)
    ax_main.imshow(img)
    ax_main.set_title('Traffic Sign Detection', fontsize=16)

    # Draw bounding boxes with different colors for normal vs anomaly
    for sign in traffic_signs:
        x1, y1, x2, y2 = sign['bbox']

        # Color based on anomaly status (red for anomaly, green for normal)
        color = 'red' if sign['is_anomaly'] else 'green'

        # Create rectangle patch
        from matplotlib.patches import Rectangle
        rect = Rectangle((x1, y1), x2-x1, y2-y1,
                         linewidth=2, edgecolor=color, facecolor='none', alpha=0.8)
        ax_main.add_patch(rect)

        # Add text with class and score
        text = f"{sign['class']}: {sign['confidence']:.2f}\nAnomaly: {sign['is_anomaly']}"

        # Add text with background
        from matplotlib.patches import Rectangle
        text_bg = Rectangle((x1, y1-30), len(text)*5, 25, facecolor='black', alpha=0.5)
        ax_main.add_patch(text_bg)

        ax_main.text(x1, y1-10, text, color='white', fontsize=8,
                    bbox=dict(facecolor=color, alpha=0.5, boxstyle='round'))

    ax_main.axis('off')

    # Create header row for sign details
    col_headers = ['Original Sign', 'Reconstruction', 'Difference Map']
    for i, header in enumerate(col_headers):
        plt.figtext(0.2 + i*0.25, 0.75, header, ha='center', fontsize=14, fontweight='bold')

    # Show details for each sign - in a grid below
    for i in range(min(n_signs, len(traffic_signs))):
        sign = traffic_signs[i]

        # Original sign
        ax1 = plt.subplot2grid((n_signs + 2, 3), (i + 2, 0))
        ax1.imshow(sign['original_img'])
        label = f"{sign['class']} ({sign['confidence']:.2f})"
        ax1.set_title(label, fontsize=10)
        ax1.axis('off')

        # Reconstructed sign
        ax2 = plt.subplot2grid((n_signs + 2, 3), (i + 2, 1))
        ax2.imshow(sign['reconstructed_img'])
        recon_title = f"MSE: {sign['mse']:.4f}"
        ax2.set_title(recon_title, fontsize=10)
        ax2.axis('off')

        # Difference map
        ax3 = plt.subplot2grid((n_signs + 2, 3), (i + 2, 2))
        ax3.imshow(sign['diff_map'])
        anomaly_status = "ANOMALY" if sign['is_anomaly'] else "Normal"
        diff_title = f"{anomaly_status} (score: {sign['anomaly_score']:.4f})"
        ax3.set_title(diff_title, fontsize=10,
                     color='red' if sign['is_anomaly'] else 'green')
        ax3.axis('off')

    plt.tight_layout()
    plt.savefig(output_path, dpi=200, bbox_inches='tight')
    plt.close()

    # Also create a version with just the image and bounding boxes for quick reference
    plt.figure(figsize=(10, 8))
    plt.imshow(img)
    plt.title('Traffic Sign Detection Summary', fontsize=14)

    # Draw bounding boxes
    for sign in traffic_signs:
        x1, y1, x2, y2 = sign['bbox']
        color = 'red' if sign['is_anomaly'] else 'green'
        plt.gca().add_patch(plt.Rectangle((x1, y1), x2-x1, y2-y1,
                            fill=False, edgecolor=color, linewidth=3))

        text = f"{sign['class']}"
        if sign['is_anomaly']:
            text += " (ANOMALY)"

        plt.text(x1, y1-10, text, color='white', fontsize=10,
                bbox=dict(facecolor=color, alpha=0.7, boxstyle='round'))

    plt.axis('off')
    summary_path = output_path.replace('.png', '_summary.png')
    plt.savefig(summary_path, dpi=200, bbox_inches='tight')
    plt.close()

In [15]:
# Create a simple dashboard for results
def create_dashboard(results_dir='results', output_path='results/dashboard.html'):
    # Get all image files in results directory
    image_files = [f for f in os.listdir(results_dir) if f.endswith('.png')]

    # Create HTML content
    html_content = """
    <!DOCTYPE html>
    <html>
    <head>
        <meta charset="UTF-8">
        <meta name="viewport" content="width=device-width, initial-scale=1.0">
        <title>Traffic Sign Anomaly Detection Dashboard</title>
        <style>
            body {
                font-family: Arial, sans-serif;
                margin: 0;
                padding: 20px;
                background-color: #f5f5f5;
            }
            h1 {
                color: #333;
                text-align: center;
            }
            .dashboard {
                display: flex;
                flex-direction: column;
                gap: 20px;
                max-width: 1200px;
                margin: 0 auto;
            }
            .section {
                background-color: white;
                border-radius: 10px;
                box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
                padding: 20px;
            }
            .section-title {
                margin-top: 0;
                border-bottom: 1px solid #eee;
                padding-bottom: 10px;
                color: #444;
            }
            .image-container {
                text-align: center;
                margin: 15px 0;
            }
            img {
                max-width: 100%;
                height: auto;
                border-radius: 5px;
            }
            .description {
                color: #666;
                margin-bottom: 15px;
            }
        </style>
    </head>
    <body>
        <h1>Traffic Sign Anomaly Detection Dashboard</h1>
        <div class="dashboard">
    """

    # Add Training Section
    html_content += """
            <div class="section">
                <h2 class="section-title">Model Training</h2>
                <div class="description">
                    Training performance metrics and learning curves for the autoencoder model.
                </div>
    """

    if 'training_history.png' in image_files:
        html_content += """
                <div class="image-container">
                    <img src="training_history.png" alt="Training History">
                </div>
        """

    html_content += """
            </div>
    """

    # Add Threshold Analysis Section
    html_content += """
            <div class="section">
                <h2 class="section-title">Threshold Analysis</h2>
                <div class="description">
                    Analysis of anomaly detection thresholds showing error distributions and ROC curves.
                </div>
    """

    if 'threshold_analysis.png' in image_files:
        html_content += """
                <div class="image-container">
                    <img src="threshold_analysis.png" alt="Threshold Analysis">
                </div>
        """

    html_content += """
            </div>
    """

    # Add Detection Results Section
    html_content += """
            <div class="section">
                <h2 class="section-title">Detection Results</h2>
                <div class="description">
                    Results of traffic sign detection and anomaly classification.
                </div>
    """

    # Add detection-related images
    detection_images = [f for f in image_files if 'detection' in f]
    for image_file in detection_images:
        html_content += f"""
                <div class="image-container">
                    <img src="{image_file}" alt="{image_file.replace('.png', '')}">
                </div>
        """

    html_content += """
            </div>
    """

    # Add Anomaly Samples Section
    html_content += """
            <div class="section">
                <h2 class="section-title">Anomaly Samples</h2>
                <div class="description">
                    Examples of detected normal and anomalous traffic signs.
                </div>
    """

    if 'anomaly_detection_samples.png' in image_files:
        html_content += """
                <div class="image-container">
                    <img src="anomaly_detection_samples.png" alt="Anomaly Detection Samples">
                </div>
        """

    if 'confusion_matrix.png' in image_files:
        html_content += """
                <div class="image-container">
                    <img src="confusion_matrix.png" alt="Confusion Matrix">
                </div>
        """

    html_content += """
            </div>
        </div>
    </body>
    </html>
    """

    # Write the HTML file
    with open(output_path, 'w') as f:
        f.write(html_content)

    print(f"Dashboard created at {output_path}")

In [16]:
# Run automated tests to verify functionality
def run_tests():
    print("Running system tests...")

    test_results = {
        "data_loading": False,
        "model_building": False,
        "cuda_available": torch.cuda.is_available(),
        "yolov8_setup": False
    }

    # Test data loading
    try:
        # Create a small test dataset
        os.makedirs('data/test', exist_ok=True)
        test_img = np.ones((IMAGE_SIZE, IMAGE_SIZE, 3)) * 255
        test_img = test_img.astype(np.uint8)
        cv2.imwrite('data/test/test_image.jpg', test_img)

        # Try loading it
        test_img_loaded = cv2.imread('data/test/test_image.jpg')
        if test_img_loaded is not None:
            test_results["data_loading"] = True
    except Exception as e:
        print(f"Data loading test failed: {e}")

    # Test model building
    try:
        # Build a tiny model for testing
        input_img = Input(shape=(IMAGE_SIZE, IMAGE_SIZE, 3))
        x = Conv2D(4, (3, 3), padding='same')(input_img)
        x = MaxPooling2D((2, 2))(x)
        x = Flatten()(x)
        encoded = Dense(8)(x)
        x = Dense(8*8*4)(encoded)
        x = Reshape((8, 8, 4))(x)
        x = Conv2D(4, (3, 3), padding='same')(x)
        x = UpSampling2D((2, 2))(x)
        decoded = Conv2D(3, (3, 3), activation='sigmoid', padding='same')(x)

        test_model = Model(input_img, decoded)
        test_model.compile(optimizer='adam', loss='mse')

        # Try a forward pass
        dummy_input = np.random.rand(1, IMAGE_SIZE, IMAGE_SIZE, 3)
        out = test_model.predict(dummy_input)

        if out.shape == (1, IMAGE_SIZE, IMAGE_SIZE, 3):
            test_results["model_building"] = True
    except Exception as e:
        print(f"Model building test failed: {e}")

    # Test YOLOv8 setup
    try:
        # Check if ultralytics is installed
        try:
            import ultralytics
            test_results["yolov8_setup"] = True
        except ImportError:
            # Check if pip is available for installation
            import subprocess
            subprocess.check_call(['pip', '--version'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            test_results["yolov8_setup"] = "Ready to install"
    except Exception as e:
        print(f"YOLOv8 setup test failed: {e}")

    # Print test summary
    print("\nTest Results Summary:")
    for test, result in test_results.items():
        status = "✅ PASSED" if result else "❌ FAILED"
        if isinstance(result, str):
            status = f"⚠️ {result}"
        print(f"{test}: {status}")

    all_passed = all(result for result in test_results.values() if not isinstance(result, str))
    if all_passed:
        print("\n✅ All tests passed! System is ready to run.")
    else:
        print("\n⚠️ Some tests failed. Please check the issues before running the full system.")

    return test_results

In [17]:
def main():
    print("=" * 80)
    print("Traffic Sign Anomaly Detection System (YOLOv8 Version)")
    print("=" * 80)

    # Run tests first
    test_results = run_tests()

    try:
        # Download and prepare the dataset
        download_and_prepare_dataset()

        # Load the dataset
        X, y = load_gtsrb_dataset()

        # For anomaly detection, allow selecting a specific class as "normal"
        # Let's use class 1 (speed limit 30) as our normal class by default
        normal_class = 1
        normal_classes = np.unique(y)

        print(f"Available classes: {normal_classes}")
        print(f"Using class {normal_class} as normal (non-anomalous) class")

        # Split the data with proper stratification
        X_normal = X[y == normal_class]
        X_anomaly = X[y != normal_class]
        y_anomaly = y[y != normal_class]  # Keep track of anomaly classes for stratification

        print(f"Normal samples: {len(X_normal)}")
        print(f"Anomaly samples: {len(X_anomaly)}")

        # Split the normal data into train, validation, and test
        X_train_normal, X_temp_normal = train_test_split(X_normal, test_size=0.3, random_state=42)
        X_val_normal, X_test_normal = train_test_split(X_temp_normal, test_size=0.5, random_state=42)

        # Split the anomaly data into validation and test, stratified by class
        X_val_anomaly, X_test_anomaly, y_val_anomaly, y_test_anomaly = train_test_split(
            X_anomaly, y_anomaly, test_size=0.5, random_state=42, stratify=y_anomaly
        )

        # Initialize autoencoder variable
        autoencoder = None
        model_path = 'models/autoencoder_final.h5'

        # Ensure models directory exists
        os.makedirs('models', exist_ok=True)

        # Attempt to load existing model with comprehensive error handling
        try:
            if os.path.exists(model_path):
                print(f"Attempting to load model from {model_path}")
                try:
                    # First, try loading full model
                    autoencoder = load_model(model_path)
                    print("Successfully loaded full model!")
                except Exception as full_model_error:
                    print(f"Full model load failed: {full_model_error}")
                    print("Attempting to rebuild model and load weights...")

                    # Rebuild model architecture
                    temp_autoencoder, temp_encoder = build_autoencoder()

                    try:
                        # Try loading just the weights
                        temp_autoencoder.load_weights(model_path)
                        autoencoder = temp_autoencoder
                        print("Successfully loaded model weights!")
                    except Exception as weights_error:
                        print(f"Weight loading failed: {weights_error}")
                        autoencoder = None
        except Exception as e:
            print(f"Unexpected error in model loading: {e}")
            autoencoder = None

        # If no existing model was successfully loaded, train a new one
        if autoencoder is None:
            print("Training a new autoencoder model...")
            autoencoder, encoder, history = train_autoencoder(X_train_normal, X_val_normal)

            # Save the newly trained model
            autoencoder.save(model_path)
            print(f"New model saved to {model_path}")

        # Check if we have a saved threshold
        threshold_path = 'models/threshold_info.pkl'
        if os.path.exists(threshold_path):
            print(f"Loading threshold information from {threshold_path}")
            with open(threshold_path, 'rb') as f:
                threshold_info = pickle.load(f)
            optimal_threshold = threshold_info['optimal_threshold']
            print(f"Using saved threshold: {optimal_threshold:.6f} (method: {threshold_info['method']})")
        else:
            # Find optimal threshold
            optimal_threshold = find_optimal_threshold(autoencoder, X_val_normal, X_val_anomaly)

        # Create test set with mixed normal and anomaly samples
        n_test = min(len(X_test_normal), len(X_test_anomaly))
        X_test = np.concatenate([X_test_normal[:n_test], X_test_anomaly[:n_test]])
        y_test = np.array([0] * n_test + [1] * n_test)  # 0 for normal, 1 for anomaly

        # Detect anomalies
        anomaly_pred, anomaly_scores, X_test_pred = detect_anomalies(autoencoder, X_test, optimal_threshold)

        # Evaluate performance
        print("Anomaly Detection Performance:")
        print(classification_report(y_test, anomaly_pred))
        print("Confusion Matrix:")
        print(confusion_matrix(y_test, anomaly_pred))

        # Visualize results
        visualize_results(X_test, X_test_pred, anomaly_pred, anomaly_scores, optimal_threshold, y_test)

        # Setup YOLOv8 for traffic sign detection
        setup_yolo()

        # Process a test image if available
        test_image_paths = [
            'sample_traffic_image.jpg',
            'data/sample_traffic.jpg',
            'data/traffic_signs_test.jpg'
        ]

        test_image_found = False
        for image_path in test_image_paths:
            if os.path.exists(image_path):
                print(f"Processing test image: {image_path}")
                img, traffic_signs = detect_and_analyze_image(image_path, autoencoder, optimal_threshold)

                if img is not None:
                    visualize_detection_results(img, traffic_signs)
                    test_image_found = True
                    break

        if not test_image_found:
            print("No test images found. Please provide a sample image to test the full pipeline.")
            print("You can place an image in one of these locations:", test_image_paths)

        # Create dashboard
        create_dashboard()

        print("\n" + "=" * 80)
        print("Traffic Sign Anomaly Detection System (YOLOv8) completed successfully!")
        print("=" * 80)
        print("You can now:")
        print("1. View the results in the 'results' directory")
        print("2. Open the dashboard at 'results/dashboard.html'")
        print("3. Process your own images using the detect_and_analyze_image() function")

    except Exception as e:
        print(f"Error in main execution: {e}")
        import traceback
        traceback.print_exc()

In [18]:
if __name__ == "__main__":
    main()

Traffic Sign Anomaly Detection System (YOLOv8 Version)
Running system tests...
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2s/step

Test Results Summary:
data_loading: ✅ PASSED
model_building: ❌ FAILED
cuda_available: ✅ PASSED
yolov8_setup: ⚠️ Ready to install

⚠️ Some tests failed. Please check the issues before running the full system.
Downloading the German Traffic Sign Recognition Benchmark (GTSRB) dataset...


data/GTSRB-Training_fixed.zip:   0%|          | 0.00/179M [00:00<?, ?B/s]

Extracting dataset...
Dataset downloaded and extracted successfully.
Loading and preprocessing the dataset...
Using alternative data directory: data/GTSRB/Training/


Loading classes:   0%|          | 0/43 [00:00<?, ?it/s]

Class 00013:   0%|          | 0/1440 [00:00<?, ?it/s]

Class 00002:   0%|          | 0/1500 [00:00<?, ?it/s]

Class 00041:   0%|          | 0/180 [00:00<?, ?it/s]

Class 00028:   0%|          | 0/360 [00:00<?, ?it/s]

Class 00008:   0%|          | 0/960 [00:00<?, ?it/s]

Class 00032:   0%|          | 0/180 [00:00<?, ?it/s]

Class 00024:   0%|          | 0/180 [00:00<?, ?it/s]

Class 00016:   0%|          | 0/300 [00:00<?, ?it/s]

Class 00018:   0%|          | 0/810 [00:00<?, ?it/s]

Class 00021:   0%|          | 0/240 [00:00<?, ?it/s]

Class 00027:   0%|          | 0/180 [00:00<?, ?it/s]

Class 00031:   0%|          | 0/540 [00:00<?, ?it/s]

Class 00022:   0%|          | 0/270 [00:00<?, ?it/s]

Class 00035:   0%|          | 0/810 [00:00<?, ?it/s]

Class 00037:   0%|          | 0/150 [00:00<?, ?it/s]

Class 00036:   0%|          | 0/270 [00:00<?, ?it/s]

Class 00007:   0%|          | 0/960 [00:00<?, ?it/s]

Class 00009:   0%|          | 0/990 [00:00<?, ?it/s]

Class 00006:   0%|          | 0/300 [00:00<?, ?it/s]

Class 00020:   0%|          | 0/240 [00:00<?, ?it/s]

Class 00017:   0%|          | 0/750 [00:00<?, ?it/s]

Class 00004:   0%|          | 0/1320 [00:00<?, ?it/s]

Class 00033:   0%|          | 0/480 [00:00<?, ?it/s]

Class 00030:   0%|          | 0/300 [00:00<?, ?it/s]

Class 00025:   0%|          | 0/1020 [00:00<?, ?it/s]

Class 00019:   0%|          | 0/150 [00:00<?, ?it/s]

Class 00010:   0%|          | 0/1350 [00:00<?, ?it/s]

Class 00042:   0%|          | 0/180 [00:00<?, ?it/s]

Class 00026:   0%|          | 0/420 [00:00<?, ?it/s]

Class 00029:   0%|          | 0/180 [00:00<?, ?it/s]

Class 00012:   0%|          | 0/1410 [00:00<?, ?it/s]

Class 00011:   0%|          | 0/900 [00:00<?, ?it/s]

Class 00034:   0%|          | 0/300 [00:00<?, ?it/s]

Class 00005:   0%|          | 0/1260 [00:00<?, ?it/s]

Class 00001:   0%|          | 0/1500 [00:00<?, ?it/s]

Class 00039:   0%|          | 0/210 [00:00<?, ?it/s]

Class 00000:   0%|          | 0/150 [00:00<?, ?it/s]

Class 00015:   0%|          | 0/420 [00:00<?, ?it/s]

Class 00014:   0%|          | 0/540 [00:00<?, ?it/s]

Class 00023:   0%|          | 0/360 [00:00<?, ?it/s]

Class 00038:   0%|          | 0/1380 [00:00<?, ?it/s]

Class 00040:   0%|          | 0/240 [00:00<?, ?it/s]

Class 00003:   0%|          | 0/960 [00:00<?, ?it/s]

Dataset loaded: 26640 images with shape (64, 64, 3).
Number of classes: 43
Available classes: [ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42]
Using class 1 as normal (non-anomalous) class
Normal samples: 1500
Anomaly samples: 25140
Training a new autoencoder model...
Building and training the autoencoder...


  self._warn_if_super_not_called()


Epoch 1/100
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 273ms/step - loss: 0.0772 - mae: 0.2255
Epoch 1: val_loss improved from inf to 0.17191, saving model to models/autoencoder_best.h5




[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m29s[0m 438ms/step - loss: 0.0770 - mae: 0.2251 - val_loss: 0.1719 - val_mae: 0.3608 - learning_rate: 0.0010
Epoch 2/100
[1m 1/32[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m1s[0m 32ms/step - loss: 0.0842 - mae: 0.2372
Epoch 2: val_loss improved from 0.17191 to 0.14561, saving model to models/autoencoder_best.h5




[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 75ms/step - loss: 0.0842 - mae: 0.2372 - val_loss: 0.1456 - val_mae: 0.3287 - learning_rate: 0.0010
Epoch 3/100
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 58ms/step - loss: 0.0592 - mae: 0.1937
Epoch 3: val_loss improved from 0.14561 to 0.07484, saving model to models/autoencoder_best.h5




[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21s[0m 135ms/step - loss: 0.0592 - mae: 0.1938 - val_loss: 0.0748 - val_mae: 0.2357 - learning_rate: 0.0010
Epoch 4/100
[1m 1/32[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m0s[0m 32ms/step - loss: 0.0564 - mae: 0.1912
Epoch 4: val_loss did not improve from 0.07484
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 61ms/step - loss: 0.0564 - mae: 0.1912 - val_loss: 0.0837 - val_mae: 0.2493 - learning_rate: 0.0010
Epoch 5/100
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 58ms/step - loss: 0.0618 - mae: 0.2010
Epoch 5: val_loss did not improve from 0.07484
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 122ms/step - loss: 0.0618 - mae: 0.2010 - val_loss: 0.0756 - val_mae: 0.2391 - learning_rate: 0.0010
Epoch 6/100
[1m 1/32[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m0s[0m 31ms/step - loss: 0.0461 - mae: 0.1759
Epoch 6: val_lo



[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 72ms/step - loss: 0.0461 - mae: 0.1759 - val_loss: 0.0717 - val_mae: 0.2332 - learning_rate: 0.0010
Epoch 7/100
[1m31/32[0m [32m━━━━━━━━━━━━━━━━━━━[0m[37m━[0m [1m0s[0m 64ms/step - loss: 0.0564 - mae: 0.1890
Epoch 7: val_loss improved from 0.07169 to 0.04742, saving model to models/autoencoder_best.h5




[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 139ms/step - loss: 0.0566 - mae: 0.1892 - val_loss: 0.0474 - val_mae: 0.1878 - learning_rate: 0.0010
Epoch 8/100
[1m 1/32[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m0s[0m 31ms/step - loss: 0.0542 - mae: 0.1834
Epoch 8: val_loss did not improve from 0.04742
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 62ms/step - loss: 0.0542 - mae: 0.1834 - val_loss: 0.0485 - val_mae: 0.1903 - learning_rate: 0.0010
Epoch 9/100
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 58ms/step - loss: 0.0584 - mae: 0.1944
Epoch 9: val_loss did not improve from 0.04742
[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 122ms/step - loss: 0.0584 - mae: 0.1943 - val_loss: 0.0490 - val_mae: 0.1802 - learning_rate: 0.0010
Epoch 10/100
[1m 1/32[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m0s[0m 31ms/step - loss: 0.0590 - mae: 0.1841
Epoch 10: val_l



Training completed in 2.11 minutes.




New model saved to models/autoencoder_final.h5
Finding optimal threshold for anomaly detection...
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 235ms/step
[1m393/393[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 8ms/step
Optimal threshold determined (Youden's J statistic): 0.050768
F1 Score at optimal threshold: 0.8182
ROC AUC: 0.7194
Precision-Recall AUC: 0.9926
Detecting anomalies...
[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 88ms/step
Using MSE only for anomaly detection
Anomaly Detection Performance:
              precision    recall  f1-score   support

           0       0.72      0.67      0.69       225
           1       0.69      0.73      0.71       225

    accuracy                           0.70       450
   macro avg       0.70      0.70      0.70       450
weighted avg       0.70      0.70      0.70       450

Confusion Matrix:
[[151  74]
 [ 60 165]]
Setting up YOLOv8 for traffic sign detection...
Installing Ultralytics (YOLO

100%|██████████| 21.5M/21.5M [00:00<00:00, 211MB/s]


YOLOv8 setup complete.
No test images found. Please provide a sample image to test the full pipeline.
You can place an image in one of these locations: ['sample_traffic_image.jpg', 'data/sample_traffic.jpg', 'data/traffic_signs_test.jpg']
Dashboard created at results/dashboard.html

Traffic Sign Anomaly Detection System (YOLOv8) completed successfully!
You can now:
1. View the results in the 'results' directory
2. Open the dashboard at 'results/dashboard.html'
3. Process your own images using the detect_and_analyze_image() function
