In [1]:
import os
import xml.etree.ElementTree as ET
import numpy as np
import cv2
import tensorflow as tf
from tensorflow.keras import layers, models
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import matplotlib.pyplot as plt
import shutil
from pathlib import Path
import pandas as pd
import time
import seaborn as sns
from sklearn.metrics import confusion_matrix, classification_report, roc_curve, auc
from itertools import cycle
from scipy.interpolate import interp1d

# Define paths to your data folders
data_dir = "signs1"  # Change this to your data directory
image_dir = os.path.join(data_dir, "images")
annotation_dir = os.path.join(data_dir, "annotations")

# Create directories for train/val/test split
def create_split_directories():
    split_dirs = {
        "train": os.path.join(data_dir, "train"),
        "val": os.path.join(data_dir, "val"),
        "test": os.path.join(data_dir, "test")
    }
    
    for split_name, split_path in split_dirs.items():
        # Create main directory
        os.makedirs(split_path, exist_ok=True)
        
        # Create subdirectories for images and annotations
        os.makedirs(os.path.join(split_path, "images"), exist_ok=True)
        os.makedirs(os.path.join(split_path, "annotations"), exist_ok=True)
    
    return split_dirs

def parse_annotation(annotation_path):
    """Parse XML annotation file to extract object class and bounding box."""
    tree = ET.parse(annotation_path)
    root = tree.getroot()
    
    objects = []
    for obj in root.findall('./object'):
        name = obj.find('name').text
        bbox = obj.find('bndbox')
        xmin = int(bbox.find('xmin').text)
        ymin = int(bbox.find('ymin').text)
        xmax = int(bbox.find('xmax').text)
        ymax = int(bbox.find('ymax').text)
        objects.append({
            'class': name,
            'bbox': [xmin, ymin, xmax, ymax]
        })
    
    return objects

def split_dataset(train_ratio=0.7, val_ratio=0.15, test_ratio=0.15):
    """Split the dataset into train, validation and test sets."""
    # Verify ratios
    assert train_ratio + val_ratio + test_ratio == 1.0, "Ratios must sum to 1"
    
    # Get image files with matching annotations
    image_files = []
    image_classes = []
    
    for img_file in os.listdir(image_dir):
        if not img_file.endswith('.png'):
            continue
        
        # Check if annotation exists
        annotation_file = os.path.splitext(img_file)[0] + '.xml'
        annotation_path = os.path.join(annotation_dir, annotation_file)
        
        if os.path.exists(annotation_path):
            image_files.append(img_file)
            
            # Get classes for stratification
            objects = parse_annotation(annotation_path)
            # Use the first object's class for stratification
            if objects:
                image_classes.append(objects[0]['class'])
            else:
                image_classes.append("unknown")
    
    # First split: separate train+val from test
    train_val_files, test_files = train_test_split(
        image_files,
        test_size=test_ratio,
        random_state=42,
        stratify=image_classes
    )
    
    # Update classes for second split
    train_val_classes = [image_classes[image_files.index(file)] for file in train_val_files]
    
    # Second split: separate train from val
    train_files, val_files = train_test_split(
        train_val_files,
        test_size=val_ratio/(train_ratio+val_ratio),
        random_state=42,
        stratify=train_val_classes
    )
    
    print(f"Split dataset: {len(train_files)} training, {len(val_files)} validation, {len(test_files)} test images")
    
    # Calculate class distribution for each split
    class_distribution = calculate_class_distribution(train_files, val_files, test_files)
    
    return train_files, val_files, test_files, class_distribution

def calculate_class_distribution(train_files, val_files, test_files):
    """Calculate the distribution of classes in each split."""
    splits = {
        "train": train_files,
        "val": val_files,
        "test": test_files
    }
    
    distributions = {}
    
    for split_name, files in splits.items():
        class_counts = {}
        
        for img_file in files:
            # Get annotation file
            annotation_file = os.path.splitext(img_file)[0] + '.xml'
            annotation_path = os.path.join(annotation_dir, annotation_file)
            
            if os.path.exists(annotation_path):
                objects = parse_annotation(annotation_path)
                
                # Count each class in this image
                for obj in objects:
                    class_name = obj['class']
                    if class_name in class_counts:
                        class_counts[class_name] += 1
                    else:
                        class_counts[class_name] = 1
        
        distributions[split_name] = class_counts
    
    return distributions

def visualize_class_distribution(class_distribution):
    """Generate Table B1: Distribution of classes in training, validation, and test sets."""
    # Convert the dictionary to a DataFrame for easier visualization
    all_classes = set()
    for split in class_distribution.values():
        all_classes.update(split.keys())
    
    # Create empty DataFrame with all classes
    df = pd.DataFrame(index=sorted(all_classes), columns=["Train", "Val", "Test", "Total"])
    
    # Fill in the data
    for class_name in all_classes:
        df.loc[class_name, "Train"] = class_distribution["train"].get(class_name, 0)
        df.loc[class_name, "Val"] = class_distribution["val"].get(class_name, 0)
        df.loc[class_name, "Test"] = class_distribution["test"].get(class_name, 0)
        df.loc[class_name, "Total"] = (df.loc[class_name, "Train"] + 
                                    df.loc[class_name, "Val"] + 
                                    df.loc[class_name, "Test"])
    
    # Add totals row
    df.loc["Total", :] = df.sum()
    
    # Calculate percentages
    total_images = df.loc["Total", "Total"]
    df["Train %"] = (df["Train"] / df["Total"] * 100).round(1)
    df["Val %"] = (df["Val"] / df["Total"] * 100).round(1)
    df["Test %"] = (df["Test"] / df["Total"] * 100).round(1)
    
    # Save as CSV
    df.to_csv("class_distribution.csv")
    
    # Create a visual representation
    plt.figure(figsize=(14, 10))
    
    # Get classes excluding the "Total" row
    classes = df.index[:-1]
    
    # Set up bar positions
    width = 0.25
    x = np.arange(len(classes))
    
    # Create grouped bars
    plt.bar(x - width, df.loc[classes, "Train"], width, label="Train")
    plt.bar(x, df.loc[classes, "Val"], width, label="Val")
    plt.bar(x + width, df.loc[classes, "Test"], width, label="Test")
    
    # Customize plot
    plt.xlabel("Sign Classes")
    plt.ylabel("Number of Instances")
    plt.title("Distribution of Classes in Train/Val/Test Sets")
    plt.xticks(x, classes, rotation=90)
    plt.legend()
    plt.tight_layout()
    
    # Save the figure
    plt.savefig("class_distribution.png")
    plt.close()
    
    print("Class distribution saved to 'class_distribution.csv' and 'class_distribution.png'")
    
    return df

def organize_data_split(split_dirs, train_files, val_files, test_files):
    """Copy files to their respective directories."""
    # Map of split names to file lists
    splits = {
        "train": train_files,
        "val": val_files,
        "test": test_files
    }
    
    for split_name, files in splits.items():
        split_img_dir = os.path.join(split_dirs[split_name], "images")
        split_ann_dir = os.path.join(split_dirs[split_name], "annotations")
        
        for img_file in files:
            # Copy image
            src_img = os.path.join(image_dir, img_file)
            dst_img = os.path.join(split_img_dir, img_file)
            shutil.copy2(src_img, dst_img)
            
            # Copy annotation
            ann_file = os.path.splitext(img_file)[0] + '.xml'
            src_ann = os.path.join(annotation_dir, ann_file)
            dst_ann = os.path.join(split_ann_dir, ann_file)
            shutil.copy2(src_ann, dst_ann)
    
    print("Data organized into train/val/test directories")

def load_processed_dataset(image_dir, annotation_dir):
    """Load and preprocess images and annotations from a directory."""
    images = []
    classes = []
    filenames = []
    
    for img_file in os.listdir(image_dir):
        if not img_file.endswith('.png'):
            continue
        
        # Get corresponding annotation file
        annotation_file = os.path.splitext(img_file)[0] + '.xml'
        annotation_path = os.path.join(annotation_dir, annotation_file)
        
        if not os.path.exists(annotation_path):
            print(f"Warning: No annotation found for {img_file}")
            continue
        
        # Load and preprocess image
        img_path = os.path.join(image_dir, img_file)
        img = cv2.imread(img_path)
        if img is None:
            print(f"Warning: Could not read image {img_path}")
            continue
            
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        
        # Parse annotation
        objects = parse_annotation(annotation_path)
        
        for obj in objects:
            # Extract the road sign using bounding box
            xmin, ymin, xmax, ymax = obj['bbox']
            
            # Check if bbox is within image bounds
            h, w = img.shape[:2]
            xmin, ymin = max(0, xmin), max(0, ymin)
            xmax, ymax = min(w, xmax), min(h, ymax)
            
            # Skip invalid boxes
            if xmin >= xmax or ymin >= ymax:
                print(f"Warning: Invalid bbox in {img_file}")
                continue
                
            sign = img[ymin:ymax, xmin:xmax]
            
            # Resize to consistent dimensions
            try:
                sign = cv2.resize(sign, (64, 64))
                
                # Normalize pixel values
                sign = sign / 255.0
                
                images.append(sign)
                classes.append(obj['class'])
                filenames.append(img_file)
            except Exception as e:
                print(f"Error processing {img_file}: {e}")
    
    return np.array(images), np.array(classes), np.array(filenames)

def create_data_generators(split_dirs):
    """Create data generators for each split."""
    # Load train data
    train_images, train_classes, _ = load_processed_dataset(
        os.path.join(split_dirs["train"], "images"),
        os.path.join(split_dirs["train"], "annotations")
    )
    
    # Load validation data
    val_images, val_classes, _ = load_processed_dataset(
        os.path.join(split_dirs["val"], "images"),
        os.path.join(split_dirs["val"], "annotations")
    )
    
    # Load test data
    test_images, test_classes, test_filenames = load_processed_dataset(
        os.path.join(split_dirs["test"], "images"),
        os.path.join(split_dirs["test"], "annotations")
    )
    
    # Check if we have data
    if len(train_images) == 0:
        raise ValueError("No training data found or processed")
    if len(val_images) == 0:
        raise ValueError("No validation data found or processed")
    if len(test_images) == 0:
        raise ValueError("No test data found or processed")
    
    # Encode labels
    label_encoder = LabelEncoder()
    label_encoder.fit(np.concatenate([train_classes, val_classes, test_classes]))
    
    train_encoded = label_encoder.transform(train_classes)
    val_encoded = label_encoder.transform(val_classes)
    test_encoded = label_encoder.transform(test_classes)
    
    num_classes = len(label_encoder.classes_)
    print(f"Found {num_classes} classes: {label_encoder.classes_}")
    
    # Create data generators
    train_gen = tf.keras.preprocessing.image.ImageDataGenerator(
        rotation_range=15,
        width_shift_range=0.1,
        height_shift_range=0.1,
        zoom_range=0.1,
        horizontal_flip=True,
        fill_mode='nearest'
    ).flow(train_images, train_encoded, batch_size=32)
    
    # No augmentation for validation and test sets
    val_data = (val_images, val_encoded)
    test_data = (test_images, test_encoded)
    
    return train_gen, val_data, test_data, label_encoder, test_filenames

def create_model(num_classes):
    """Create a CNN model for road sign classification."""
    model = models.Sequential()
    
    # Convolutional layers
    model.add(layers.Conv2D(32, (3, 3), activation='relu', input_shape=(64, 64, 3)))
    model.add(layers.MaxPooling2D((2, 2)))
    model.add(layers.Conv2D(64, (3, 3), activation='relu'))
    model.add(layers.MaxPooling2D((2, 2)))
    model.add(layers.Conv2D(128, (3, 3), activation='relu'))
    model.add(layers.MaxPooling2D((2, 2)))
    
    # Fully connected layers
    model.add(layers.Flatten())
    model.add(layers.Dense(128, activation='relu'))
    model.add(layers.Dropout(0.5))
    model.add(layers.Dense(num_classes, activation='softmax'))
    
    # Compile the model
    model.compile(optimizer='adam',
                loss='sparse_categorical_crossentropy',
                metrics=['accuracy'])
    
    return model

def train_model(model, train_gen, val_data, epochs=11):
    """Train the model with the given data generators."""
    # Use early stopping to prevent overfitting
    early_stopping = tf.keras.callbacks.EarlyStopping(
        monitor='val_loss', patience=5, restore_best_weights=True
    )
    
    # Checkpoint to save best model - FIXED: use .keras extension
    model_checkpoint = tf.keras.callbacks.ModelCheckpoint(
        filepath='best_road_sign_model.keras',
        monitor='val_accuracy',
        save_best_only=True,
        verbose=1
    )
    
    # Reduce learning rate when plateauing
    reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.2,
        patience=3,
        min_lr=1e-6
    )
    
    # Train the model
    try:
        history = model.fit(
            train_gen,
            epochs=epochs,
            validation_data=val_data,
            callbacks=[early_stopping, model_checkpoint, reduce_lr]
        )
        
        # [Figure 1: Training and validation accuracy/loss curves]
        plt.figure(figsize=(12, 10))
        
        # Plot training & validation accuracy
        plt.subplot(2, 1, 1)
        plt.plot(history.history['accuracy'], marker='o', linestyle='-', label='Train Accuracy')
        plt.plot(history.history['val_accuracy'], marker='s', linestyle='--', label='Validation Accuracy')
        plt.grid(True, linestyle='--', alpha=0.7)
        plt.xlabel('Epoch', fontsize=12)
        plt.ylabel('Accuracy', fontsize=12)
        plt.title('Training and Validation Accuracy', fontsize=14)
        plt.legend(loc='lower right', fontsize=12)
        
        # Plot training & validation loss
        plt.subplot(2, 1, 2)
        plt.plot(history.history['loss'], marker='o', linestyle='-', label='Train Loss', color='tab:orange')
        plt.plot(history.history['val_loss'], marker='s', linestyle='--', label='Validation Loss', color='tab:green')
        plt.grid(True, linestyle='--', alpha=0.7)
        plt.xlabel('Epoch', fontsize=12)
        plt.ylabel('Loss', fontsize=12)
        plt.title('Training and Validation Loss', fontsize=14)
        plt.legend(loc='upper right', fontsize=12)
        
        plt.tight_layout()
        plt.savefig('training_history.png', dpi=300, bbox_inches='tight')
        plt.close()
        
        # Additional display of learning rate if it changed during training
        if 'lr' in history.history:
            plt.figure(figsize=(10, 4))
            plt.plot(history.history['lr'], marker='o')
            plt.grid(True, linestyle='--', alpha=0.7)
            plt.xlabel('Epoch', fontsize=12)
            plt.ylabel('Learning Rate', fontsize=12)
            plt.title('Learning Rate Schedule', fontsize=14)
            plt.yscale('log')
            plt.savefig('learning_rate_schedule.png', dpi=300, bbox_inches='tight')
            plt.close()
        
        return history
    except Exception as e:
        print(f"Error during training: {e}")
        return None

def evaluate_model(model, test_data, label_encoder, test_filenames):
    """Evaluate model on test data and visualize results."""
    # Unpack test data
    test_images, test_labels = test_data
    
    # Evaluate overall performance
    test_loss, test_acc = model.evaluate(test_images, test_labels)
    print(f"Test accuracy: {test_acc:.4f}")
    
    # Make predictions
    predictions = model.predict(test_images)
    predicted_classes = np.argmax(predictions, axis=1)
    predicted_labels = label_encoder.inverse_transform(predicted_classes)
    true_labels = label_encoder.inverse_transform(test_labels)
    
    # Create confusion matrix
    cm = confusion_matrix(test_labels, predicted_classes)
    
    # [Figure 2: Visualization of confusion matrix]
    plt.figure(figsize=(12, 10))
    
    # Plot normalized confusion matrix
    cm_normalized = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
    sns.heatmap(cm_normalized, annot=True, fmt='.2f', cmap='Blues',
            xticklabels=label_encoder.classes_,
            yticklabels=label_encoder.classes_)
    plt.title('Normalized Confusion Matrix', fontsize=16)
    plt.xlabel('Predicted Label', fontsize=14)
    plt.ylabel('True Label', fontsize=14)
    plt.tight_layout()
    plt.savefig('confusion_matrix.png', dpi=300, bbox_inches='tight')
    plt.close()
    
    # [Table C1: Extended classification report with additional metrics]
    # Calculate per-class metrics
    class_metrics = {}
    
    # Calculate metrics for each class
    for i, class_name in enumerate(label_encoder.classes_):
        # True positives, false positives, false negatives
        tp = cm[i, i]
        fp = np.sum(cm[:, i]) - tp
        fn = np.sum(cm[i, :]) - tp
        tn = np.sum(cm) - tp - fp - fn
        
        # Calculate metrics
        precision = tp / (tp + fp) if (tp + fp) > 0 else 0
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0
        specificity = tn / (tn + fp) if (tn + fp) > 0 else 0
        f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0
        
        class_metrics[class_name] = {
            'Precision': precision,
            'Recall': recall,
            'F1-score': f1,
            'Specificity': specificity,
            'Support': np.sum(test_labels == i)
        }
    
    # Create DataFrame for extended metrics
    extended_metrics_df = pd.DataFrame.from_dict(class_metrics, orient='index')
    extended_metrics_df.to_csv('extended_classification_metrics.csv')
    
    # Print classification report
    print("\nClassification Report:")
    print(classification_report(test_labels, predicted_classes, target_names=label_encoder.classes_))
    
    # [Figure C1: ROC curves for multi-class classification]
    plt.figure(figsize=(12, 10))
    
    # Compute ROC curve and ROC area for each class
    n_classes = len(label_encoder.classes_)
    
    # Binarize the output for ROC calculation
    y_test_bin = tf.keras.utils.to_categorical(test_labels, n_classes)
    
    # Variables to store per-class performance
    fpr = {}
    tpr = {}
    roc_auc = {}
    
    # Calculate ROC for each class
    for i in range(n_classes):
        fpr[i], tpr[i], _ = roc_curve(y_test_bin[:, i], predictions[:, i])
        roc_auc[i] = auc(fpr[i], tpr[i])
    
    # Calculate macro-average ROC curve
    all_fpr = np.unique(np.concatenate([fpr[i] for i in range(n_classes)]))
    mean_tpr = np.zeros_like(all_fpr)
    
    for i in range(n_classes):
        # Fix: Use numpy's interp instead of scipy's interp
        if len(fpr[i]) > 1:  # Make sure we have points to interpolate
            interpolator = interp1d(fpr[i], tpr[i], kind='linear', bounds_error=False, fill_value=(0, 1))
            mean_tpr += interpolator(all_fpr)
        else:
            # If only one point, can't interpolate, just add zeros
            mean_tpr += 0
    
    mean_tpr /= n_classes
    
    # Plot ROC curves
    colors = plt.colormaps['tab10'].resampled(n_classes)
    
    # Plot individual ROC curves if there are fewer than 10 classes, otherwise just plot the average
    if n_classes <= 10:
        for i, color in zip(range(n_classes), colors.colors):
            plt.plot(fpr[i], tpr[i], color=color, lw=2,
                    label=f'{label_encoder.classes_[i]} (AUC = {roc_auc[i]:.2f})')
    
    # Plot macro-average ROC curve
    plt.plot(all_fpr, mean_tpr, color='deeppink', linestyle=':', linewidth=4,
            label=f'Macro-average (AUC = {auc(all_fpr, mean_tpr):.2f})')
    
    # Plot chance level
    plt.plot([0, 1], [0, 1], 'k--', lw=2)
    
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate', fontsize=14)
    plt.ylabel('True Positive Rate', fontsize=14)
    plt.title('Multi-class ROC Curves', fontsize=16)
    plt.legend(loc="lower right", fontsize=10)
    plt.grid(alpha=0.3)
    plt.savefig('roc_curves.png', dpi=300, bbox_inches='tight')
    plt.close()
    
    # Visualize some predictions
    num_samples_to_show = min(10, len(test_images))
    fig, axes = plt.subplots(2, 5, figsize=(15, 6))
    axes = axes.flatten()
    
    for i in range(num_samples_to_show):
        axes[i].imshow(test_images[i])
        true_class = true_labels[i]
        pred_class = predicted_labels[i]
        confidence = predictions[i][predicted_classes[i]]
        
        color = "green" if true_class == pred_class else "red"
        axes[i].set_title(f"True: {true_class}\nPred: {pred_class}\nConf: {confidence:.2f}", color=color)
        axes[i].axis('off')
    
    plt.tight_layout()
    plt.savefig('prediction_samples.png')
    plt.close()
    
    # Return prediction results
    return {
        'accuracy': test_acc,
        'predictions': [
            {
                'filename': test_filenames[i],
                'true_class': true_labels[i],
                'predicted_class': predicted_labels[i],
                'confidence': predictions[i][predicted_classes[i]]
            }
            for i in range(len(test_images))
        ]
    }

def analyze_inference_time(model, test_images, hardware_platforms=None):
    """
    [Table C2: Inference time analysis on different hardware platforms]
    Analyze inference time on the current platform and simulate others.
    """
    if hardware_platforms is None:
        # Define hardware platforms to simulate
        hardware_platforms = {
            "Current Platform": 1.0,  # Baseline
            "CPU (Lower Spec)": 2.5,  # Simulated: 2.5x slower
            "CPU (Higher Spec)": 0.6,  # Simulated: 40% faster
            "GPU (Entry Level)": 0.3,  # Simulated: 70% faster
            "GPU (High End)": 0.1,    # Simulated: 90% faster
            "TPU/Specialized": 0.05    # Simulated: 95% faster
        }
    
    # Measure baseline inference time
    print("\nMeasuring inference time...")
    
    # Warm-up run
    _ = model.predict(test_images[:10])
    
    # Measure time with different batch sizes
    batch_sizes = [1, 4, 8, 16, 32]
    results = []
    
    for batch_size in batch_sizes:
        times = []
        
        # Measure batch inference times
        num_batches = max(1, min(10, len(test_images) // batch_size))
        
        for i in range(num_batches):
            start_idx = i * batch_size
            end_idx = min((i + 1) * batch_size, len(test_images))
            batch = test_images[start_idx:end_idx]
            
            # Ensure batch is correctly sized
            if len(batch) != batch_size:
                continue
            
            # Time the inference
            start_time = time.time()
            _ = model.predict(batch, verbose=0)
            end_time = time.time()
            
            times.append(end_time - start_time)
        
        # Calculate average time
        if times:
            avg_time = np.mean(times)
            per_image_time = avg_time / batch_size
            fps = batch_size / avg_time
            
            print(f"Batch size {batch_size}: {avg_time:.4f}s ({fps:.2f} images/s)")
            
            # Add to results
            batch_result = {
                'Batch Size': batch_size,
                'Avg Inference Time (s)': avg_time,
                'Per Image Time (s)': per_image_time,
                'FPS': fps
            }
            
            # Simulate other hardware platforms
            for platform, factor in hardware_platforms.items():
                if platform != "Current Platform":
                    simulated_time = avg_time * factor
                    simulated_per_image = per_image_time * factor
                    simulated_fps = fps / factor
                    
                    batch_result[f'{platform} Time (s)'] = simulated_time
                    batch_result[f'{platform} Per Image (s)'] = simulated_per_image
                    batch_result[f'{platform} FPS'] = simulated_fps
            
            results.append(batch_result)
    
    # Create DataFrame and save results
    inference_df = pd.DataFrame(results)
    inference_df.to_csv('inference_time_analysis.csv')
    
    # Create visualization of inference time vs batch size
    plt.figure(figsize=(14, 8))
    
    # Plot per-image inference time
    plt.subplot(1, 2, 1)
    for platform in hardware_platforms.keys():
        if platform == "Current Platform":
            plt.plot(inference_df['Batch Size'], inference_df['Per Image Time (s)'], 
                    marker='o', linewidth=2, label=platform)
        else:
            column = f'{platform} Per Image (s)'
            plt.plot(inference_df['Batch Size'], inference_df[column], 
                    marker='s', linewidth=2, linestyle='--', label=platform)
    
    plt.grid(True, linestyle='--', alpha=0.7)
    plt.xlabel('Batch Size', fontsize=12)
    plt.ylabel('Per Image Inference Time (s)', fontsize=12)
    plt.title('Inference Time by Batch Size', fontsize=14)
    plt.legend()
    plt.xscale('log', base=2)
    plt.yscale('log')
    
    # Plot FPS
    plt.subplot(1, 2, 2)
    for platform in hardware_platforms.keys():
        if platform == "Current Platform":
            plt.plot(inference_df['Batch Size'], inference_df['FPS'], 
                    marker='o', linewidth=2, label=platform)
        else:
            column = f'{platform} FPS'
            plt.plot(inference_df['Batch Size'], inference_df[column], 
                    marker='s', linewidth=2, linestyle='--', label=platform)
    
    plt.grid(True, linestyle='--', alpha=0.7)
    plt.xlabel('Batch Size', fontsize=12)
    plt.ylabel('Frames Per Second (FPS)', fontsize=12)
    plt.title('Throughput by Batch Size', fontsize=14)
    plt.legend()
    plt.xscale('log', base=2)
    plt.yscale('log')
    
    plt.tight_layout()
    plt.savefig('inference_time_analysis.png', dpi=300, bbox_inches='tight')
    plt.close()
    
    return inference_df

def visualize_sample_predictions(model, test_data, label_encoder, test_filenames, num_samples=20):
    """
    [Figure: Sample test images with predictions]
    Visualize a grid of test images with their true and predicted labels.
    """
    # Unpack test data
    test_images, test_labels = test_data
    
    # Make predictions
    predictions = model.predict(test_images)
    predicted_classes = np.argmax(predictions, axis=1)
    predicted_labels = label_encoder.inverse_transform(predicted_classes)
    true_labels = label_encoder.inverse_transform(test_labels)
    
    # Determine number of samples to show (minimum of requested or available)
    num_samples_to_show = min(num_samples, len(test_images))
    
    # Calculate grid dimensions
    grid_size = int(np.ceil(np.sqrt(num_samples_to_show)))
    fig, axes = plt.subplots(grid_size, grid_size, figsize=(15, 15))
    
    # Flatten axes for easier indexing
    axes = axes.flatten()
    
    # Select a mix of correct and incorrect predictions
    correct_indices = np.where(predicted_classes == test_labels)[0]
    incorrect_indices = np.where(predicted_classes != test_labels)[0]
    
    # Ensure we have some of each if possible
    selected_indices = []
    if len(correct_indices) > 0 and len(incorrect_indices) > 0:
        # Try to get half correct and half incorrect
        num_correct = min(num_samples_to_show // 2, len(correct_indices))
        num_incorrect = min(num_samples_to_show - num_correct, len(incorrect_indices))
        
        selected_indices = np.random.choice(correct_indices, num_correct, replace=False).tolist()
        selected_indices += np.random.choice(incorrect_indices, num_incorrect, replace=False).tolist()
    else:
        # If we have only correct or only incorrect, just use what we have
        available_indices = np.arange(len(test_images))
        selected_indices = np.random.choice(available_indices, num_samples_to_show, replace=False)
    
    # Ensure we don't exceed the number of samples to show
    selected_indices = selected_indices[:num_samples_to_show]
    
    # Visualize selected samples
    for i, idx in enumerate(selected_indices):
        if i < len(axes):
            axes[i].imshow(test_images[idx])
            true_class = true_labels[idx]
            pred_class = predicted_labels[idx]
            confidence = predictions[idx][predicted_classes[idx]]
            
            # Set title color based on prediction correctness
            color = "green" if true_class == pred_class else "red"
            
            # Create informative title
            title = f"True: {true_class}\nPred: {pred_class}\nConf: {confidence:.2f}"
            
            # Add filename if available
            if test_filenames is not None and idx < len(test_filenames):
                title += f"\n{test_filenames[idx]}"
                
            axes[i].set_title(title, color=color, fontsize=10)
            axes[i].axis('off')
    
    # Hide any unused subplots
    for i in range(len(selected_indices), len(axes)):
        axes[i].axis('off')
    
    plt.tight_layout()
    plt.savefig('sample_predictions.png', dpi=300, bbox_inches='tight')
    plt.close()
    
    print(f"Sample predictions visualization saved as 'sample_predictions.png'")

def main():
    # Create split directories
    split_dirs = create_split_directories()
    
    # Split dataset
    train_files, val_files, test_files, class_distribution = split_dataset()
    
    # [Table B1: Distribution of classes in training, validation, and test sets]
    distribution_df = visualize_class_distribution(class_distribution)
    
    # Organize data into splits
    organize_data_split(split_dirs, train_files, val_files, test_files)
    
    # Create data generators
    train_gen, val_data, test_data, label_encoder, test_filenames = create_data_generators(split_dirs)
    
    # Create model
    num_classes = len(label_encoder.classes_)
    model = create_model(num_classes)
    
    # Print model summary
    model.summary()
    
    # Train model
    print("Training model...")
    history = train_model(model, train_gen, val_data)
    
    # Save model and label encoder
    model.save("road_sign_model.keras")
    np.save("label_classes.npy", label_encoder.classes_)
    print("Model saved as 'road_sign_model.keras'")
    
    # Evaluate on test data
    print("Evaluating model on test data...")
    results = evaluate_model(model, test_data, label_encoder, test_filenames)
    
    visualize_sample_predictions(model, test_data, label_encoder, test_filenames)
    
    # Analyze inference time on different simulated hardware platforms
    inference_analysis = analyze_inference_time(model, test_data[0])
    
    # Save results to file
    with open("test_results.txt", "w") as f:
        f.write(f"Test Accuracy: {results['accuracy']:.4f}\n\n")
        f.write("Individual Predictions:\n")
        for pred in results['predictions']:
            f.write(f"File: {pred['filename']}\n")
            f.write(f"True Class: {pred['true_class']}\n")
            f.write(f"Predicted Class: {pred['predicted_class']}\n")
            f.write(f"Confidence: {pred['confidence']:.4f}\n")
            f.write("-" * 40 + "\n")
    
    print(f"Test results saved to 'test_results.txt'")
    print("All visualizations and analysis complete!")


if __name__ == "__main__":
    main()

Split dataset: 613 training, 132 validation, 132 test images
Class distribution saved to 'class_distribution.csv' and 'class_distribution.png'
Data organized into train/val/test directories
Found 4 classes: ['crosswalk' 'speedlimit' 'stop' 'trafficlight']


  super().__init__(


Training model...
Epoch 1/11


  self._warn_if_super_not_called()


[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 68ms/step - accuracy: 0.5565 - loss: 1.0717
Epoch 1: val_accuracy improved from -inf to 0.76440, saving model to best_road_sign_model.keras
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 91ms/step - accuracy: 0.5594 - loss: 1.0663 - val_accuracy: 0.7644 - val_loss: 0.6540 - learning_rate: 0.0010
Epoch 2/11
[1m26/27[0m [32m━━━━━━━━━━━━━━━━━━━[0m[37m━[0m [1m0s[0m 83ms/step - accuracy: 0.7934 - loss: 0.5519
Epoch 2: val_accuracy improved from 0.76440 to 0.92147, saving model to best_road_sign_model.keras
[1m27/27[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 91ms/step - accuracy: 0.7962 - loss: 0.5459 - val_accuracy: 0.9215 - val_loss: 0.2150 - learning_rate: 0.0010
Epoch 3/11
[1m26/27[0m [32m━━━━━━━━━━━━━━━━━━━[0m[37m━[0m [1m0s[0m 73ms/step - accuracy: 0.9187 - loss: 0.2952
Epoch 3: val_accuracy improved from 0.92147 to 0.95812, saving model to best_road_sign_model.keras
[1m27/27[0

In [1]:
import cv2
import numpy as np
import time

class DrivingDetectionSystem:
    def __init__(self, detection_model="yolov3.weights", 
                detection_config="yolov3.cfg",
                classes_file="coco.names",
                traffic_sign_model=None,
                confidence_threshold=0.5):
        """
        Initialize the driving detection system
        
        Parameters:
        - detection_model: Path to YOLOv3 weights file
        - detection_config: Path to YOLOv3 config file
        - classes_file: Path to class names file
        - traffic_sign_model: Path to traffic sign classifier (if available)
        - confidence_threshold: Minimum confidence for detections
        """
        self.confidence_threshold = confidence_threshold
        self.nms_threshold = 0.4  # Non-maximum suppression threshold
        
        # Load YOLO network for general object detection
        print("Loading YOLO object detection model...")
        try:
            self.net = cv2.dnn.readNetFromDarknet(detection_config, detection_model)
            self.net.setPreferableBackend(cv2.dnn.DNN_BACKEND_OPENCV)
            # Use CPU by default, but can be changed to CUDA for better performance
            # if GPU is available
            self.net.setPreferableTarget(cv2.dnn.DNN_TARGET_CPU)
        except Exception as e:
            print(f"Error loading detection model: {str(e)}")
            print("Using simulation mode - model weights not required for demo.")
            self.net = None
            
        # Load class names
        try:
            with open(classes_file, 'r') as f:
                self.classes = [line.strip() for line in f.readlines()]
        except Exception as e:
            print(f"Error loading class names: {str(e)}")
            # Fallback to common class names for driving scenarios
            self.classes = ['person', 'bicycle', 'car', 'motorcycle', 'bus', 
                        'truck', 'traffic light', 'stop sign', 'parking meter']
            
        self.traffic_sign_model = traffic_sign_model
        self.colors = np.random.uniform(0, 255, size=(len(self.classes), 3))
        
        # Define which objects are especially relevant for driving
        self.critical_objects = ['person', 'bicycle', 'car', 'motorcycle', 'bus', 
                                'truck', 'traffic light', 'stop sign']
                                
        # Initialize video writer for saving detections
        self.output_writer = None
    
    def _get_output_layers(self):
        """Get the output layer names of the YOLO network"""
        if self.net is None:
            return []
            
        layer_names = self.net.getLayerNames()
        try:
            # Different versions of OpenCV have different indexing
            try:
                output_layers = [layer_names[i - 1] for i in self.net.getUnconnectedOutLayers()]
            except:
                output_layers = [layer_names[i[0] - 1] for i in self.net.getUnconnectedOutLayers()]
            return output_layers
        except Exception as e:
            print(f"Error getting output layers: {str(e)}")
            return []
    
    def _simulate_detections(self, frame):
        """
        Simulate detections for demo purposes when no model is available
        """
        height, width = frame.shape[:2]
        boxes = []
        confidences = []
        class_ids = []
        
        # Simulate a car detection
        if np.random.random() > 0.3:
            car_x = int(width * (0.5 + 0.1 * np.sin(time.time())))
            car_y = int(height * 0.7)
            car_w = int(width * 0.2)
            car_h = int(height * 0.2)
            boxes.append([car_x, car_y, car_w, car_h])
            confidences.append(0.85)
            class_ids.append(2)  # Car
        
        # Simulate a person detection occasionally
        if np.random.random() > 0.7:
            person_x = int(width * (0.7 + 0.05 * np.cos(time.time())))
            person_y = int(height * 0.6)
            person_w = int(width * 0.05)
            person_h = int(height * 0.2)
            boxes.append([person_x, person_y, person_w, person_h])
            confidences.append(0.75)
            class_ids.append(0)  # Person
            
        # Simulate a traffic sign or light occasionally
        if np.random.random() > 0.8:
            sign_x = int(width * 0.8)
            sign_y = int(height * 0.4)
            sign_w = int(width * 0.06)
            sign_h = int(height * 0.06)
            boxes.append([sign_x, sign_y, sign_w, sign_h])
            confidences.append(0.65)
            class_ids.append(9 if np.random.random() > 0.5 else 7)  # Traffic light or stop sign
            
        return boxes, confidences, class_ids
    
    def detect_objects(self, frame):
        """
        Detect objects in a frame using YOLOv3
        
        Parameters:
        - frame: Input video frame
        
        Returns:
        - boxes: Bounding boxes of detected objects
        - confidences: Confidence scores
        - class_ids: Class IDs of detected objects
        """
        if self.net is None:
            # Simulation mode if model not loaded
            return self._simulate_detections(frame)
            
        height, width = frame.shape[:2]
        
        # Create a blob from the frame and perform a forward pass
        blob = cv2.dnn.blobFromImage(frame, 1/255.0, (416, 416), 
                                    swapRB=True, crop=False)
        self.net.setInput(blob)
        
        outputs = self.net.forward(self._get_output_layers())
        
        # Process the outputs
        boxes = []
        confidences = []
        class_ids = []
        
        for output in outputs:
            for detection in output:
                scores = detection[5:]
                class_id = np.argmax(scores)
                confidence = scores[class_id]
                
                if confidence > self.confidence_threshold:
                    # Scale the bounding box coordinates back to the original frame
                    box_x = int(detection[0] * width)
                    box_y = int(detection[1] * height)
                    box_width = int(detection[2] * width)
                    box_height = int(detection[3] * height)
                    
                    # Store the detection results
                    boxes.append([box_x, box_y, box_width, box_height])
                    confidences.append(float(confidence))
                    class_ids.append(class_id)
        
        return boxes, confidences, class_ids
    
    def detect_traffic_signs(self, frame, boxes):
        """
        Detect specific traffic signs using a dedicated classifier
        (This is a placeholder - would be implemented with a specific model)
        
        Parameters:
        - frame: Input video frame
        - boxes: Detected bounding boxes from general object detection
        
        Returns:
        - sign_types: List of detected sign types
        """
        # This would be implemented with a dedicated traffic sign classifier
        # For now, it's just a placeholder
        sign_types = []
        if self.traffic_sign_model:
            # Logic to classify traffic signs would go here
            pass
        return sign_types
    
    def process_frame(self, frame):
        """
        Process a single frame for object and sign detection
        
        Parameters:
        - frame: Input video frame
        
        Returns:
        - processed_frame: Frame with detections drawn
        - detections: Dictionary of detection information
        """
        start_time = time.time()
        
        # Detect objects
        boxes, confidences, class_ids = self.detect_objects(frame)
        
        # Apply non-maximum suppression to remove overlapping bounding boxes
        indices = cv2.dnn.NMSBoxes(boxes, confidences, 
                                self.confidence_threshold, 
                                self.nms_threshold)
        
        # Prepare the processed frame and detections dictionary
        processed_frame = frame.copy()
        detections = {
            'objects': [],
            'critical_warnings': []
        }
        
        # Draw bounding boxes and labels
        if len(indices) > 0:
            for i in indices.flatten():
                try:
                    box = boxes[i]
                    x, y, w, h = box
                    
                    # Ensure coordinates are within frame boundaries
                    x = max(0, min(x, frame.shape[1] - 1))
                    y = max(0, min(y, frame.shape[0] - 1))
                    w = max(1, min(w, frame.shape[1] - x))
                    h = max(1, min(h, frame.shape[0] - y))
                    
                    # Get class information
                    class_id = class_ids[i]
                    if class_id < len(self.classes):
                        label = f"{self.classes[class_id]}: {confidences[i]:.2f}"
                        color = self.colors[class_id]
                        
                        # Draw bounding box
                        cv2.rectangle(processed_frame, (x, y), (x + w, y + h), color, 2)
                        
                        # Draw label background
                        cv2.rectangle(processed_frame, (x, y - 20), (x + len(label) * 9, y), color, -1)
                        
                        # Draw label text
                        cv2.putText(processed_frame, label, (x, y - 5),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1)
                        
                        # Add to detections list
                        detections['objects'].append({
                            'class': self.classes[class_id],
                            'confidence': confidences[i],
                            'box': [x, y, w, h]
                        })
                        
                        # Check if this is a critical object for driving
                        if self.classes[class_id] in self.critical_objects:
                            # Calculate the position in the frame (center, left, right)
                            center_x = x + w/2
                            frame_center = frame.shape[1] / 2
                            position = "ahead"
                            if center_x < frame_center * 0.7:
                                position = "left"
                            elif center_x > frame_center * 1.3:
                                position = "right"
                                
                            # Add critical warning based on object type and position
                            if self.classes[class_id] == 'stop sign':
                                detections['critical_warnings'].append(f"STOP SIGN {position}")
                            elif self.classes[class_id] == 'traffic light':
                                detections['critical_warnings'].append(f"TRAFFIC LIGHT {position}")
                            elif self.classes[class_id] in ['person', 'bicycle']:
                                # Person or cyclist is closer if the box is larger
                                if (w * h) > (frame.shape[0] * frame.shape[1] * 0.05):
                                    detections['critical_warnings'].append(
                                        f"WARNING: {self.classes[class_id].upper()} {position}")
                except Exception as e:
                    print(f"Error processing detection {i}: {str(e)}")
        
        # Calculate and display FPS
        fps = 1.0 / (time.time() - start_time)
        cv2.putText(processed_frame, f"FPS: {fps:.2f}", (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        
        # Display critical warnings
        for i, warning in enumerate(detections['critical_warnings']):
            cv2.putText(processed_frame, warning, (10, 70 + 40*i),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
        
        return processed_frame, detections
    
    def process_video(self, input_source, output_path=None, display=True):
        """
        Process a video source (file or camera) for object and sign detection
        
        Parameters:
        - input_source: Path to video file or camera index
        - output_path: Path to save processed video (optional)
        - display: Whether to display the processed frames
        """
        # Open the video source
        video = cv2.VideoCapture(input_source)
        if not video.isOpened():
            print(f"Error: Could not open video source {input_source}")
            return
        
        # Get video properties
        width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = video.get(cv2.CAP_PROP_FPS)
        
        # Initialize video writer if output path is given
        if output_path:
            fourcc = cv2.VideoWriter_fourcc(*'XVID')
            self.output_writer = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        
        print("Starting video processing. Press 'q' to quit.")
        
        while True:
            # Read a frame
            ret, frame = video.read()
            if not ret:
                print("End of video or error reading frame.")
                break
            
            # Process the frame
            processed_frame, detections = self.process_frame(frame)
            
            # Display the frame if requested
            if display:
                cv2.imshow("Driving Detection System", processed_frame)
                
                # Break on 'q' key press
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
            
            # Write the frame to output video if requested
            if self.output_writer:
                self.output_writer.write(processed_frame)
        
        # Clean up
        video.release()
        if self.output_writer:
            self.output_writer.release()
        cv2.destroyAllWindows()
        print("Video processing complete.")


def main():
    """
    Main function to run the driving detection system
    """
    import argparse
    
    parser = argparse.ArgumentParser(description='Driving Object and Sign Detection')
    parser.add_argument('--input', type=str, default='0',
                        help='Input video file path or camera index (default: 0 for webcam)')
    parser.add_argument('--output', type=str, default=None,
                        help='Output video file path (optional)')
    parser.add_argument('--model', type=str, default='yolov3.weights',
                        help='Path to YOLO weights file')
    parser.add_argument('--config', type=str, default='yolov3.cfg',
                        help='Path to YOLO config file')
    parser.add_argument('--classes', type=str, default='coco.names',
                        help='Path to class names file')
    parser.add_argument('--confidence', type=float, default=0.5,
                        help='Minimum confidence threshold for detections')
    parser.add_argument('--no-display', action='store_true',
                        help='Do not display video output')
    
    args = parser.parse_args()
    
    # Initialize the detection system
    detector = DrivingDetectionSystem(
        detection_model=args.model,
        detection_config=args.config,
        classes_file=args.classes,
        confidence_threshold=args.confidence
    )
    
    # Process the video
    try:
        # If input is a number, convert to integer for camera index
        input_source = args.input
        if input_source.isdigit():
            input_source = int(input_source)
        
        detector.process_video(
            input_source=input_source,
            output_path=args.output,
            display=not args.no_display
        )
    except KeyboardInterrupt:
        print("Processing interrupted by user.")
    except Exception as e:
        print(f"Error processing video: {str(e)}")


if __name__ == "__main__":
    main()

usage: ipykernel_launcher.py [-h] [--input INPUT] [--output OUTPUT]
                             [--model MODEL] [--config CONFIG]
                             [--classes CLASSES] [--confidence CONFIDENCE]
                             [--no-display]
ipykernel_launcher.py: error: unrecognized arguments: --f=c:\Users\00har\AppData\Roaming\jupyter\runtime\kernel-v3f8b632730f8cf6b6c1bac3b724224b8eb975c639.json


SystemExit: 2

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)
