# Image Preprocessing evaluation

The goal of this experiment is to evaluate different methods of pre-processing the image before detection.

Methods to be tested:

- Nothing/Base;
- black-white;
- 8-bit color;
- clusterisation into 256 colors;
- clusterisation into 16 colors;
- clusterisation into 8 colors.

The main metrics we are concerned about are mAP@50, mAP@50-95 and FPS.


In [None]:
from ultralytics import YOLO
import cv2
import numpy as np
import time
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans

import glob
import os
import tempfile
import shutil
import yaml

In [None]:
MODEL_PLATE = YOLO("./yolo_plate.pt")
VAL_DIR = "./Dataset V2/val/"

In [None]:
# Define preprocessing functions
def preprocess_base(img):
    """No preprocessing, return original image"""
    return img.copy()


def preprocess_black_white(img):
    """Convert to black and white"""
    return cv2.cvtColor(cv2.cvtColor(img, cv2.COLOR_BGR2GRAY), cv2.COLOR_GRAY2BGR)


def preprocess_8bit_color(img):
    """Convert to 8-bit color (3 bits for R, 3 bits for G, 2 bits for B)"""
    img_8bit = img.copy()
    # Reduce color depth
    img_8bit[:, :, 0] = (img_8bit[:, :, 0] >> 5) << 5  # Blue channel: 3 bits
    img_8bit[:, :, 1] = (img_8bit[:, :, 1] >> 6) << 6  # Green channel: 2 bits
    img_8bit[:, :, 2] = (img_8bit[:, :, 2] >> 5) << 5  # Red channel: 3 bits
    return img_8bit


def preprocess_cluster(img, n_clusters):
    """Color clustering using K-means"""
    # Reshape the image to be a list of pixels
    h, w, c = img.shape
    reshaped = img.reshape((h * w, c))

    # Apply k-means clustering
    kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
    labels = kmeans.fit_predict(reshaped)

    # Replace each pixel with its centroid
    clustered_img = kmeans.cluster_centers_[labels].reshape((h, w, c))
    return clustered_img.astype(np.uint8)


def preprocess_cluster_16(img):
    """Cluster colors to 16 colors"""
    return preprocess_cluster(img, 16)


def preprocess_cluster_8(img):
    """Cluster colors to 8 colors"""
    return preprocess_cluster(img, 8)


def preprocess_cluster_4(img):
    """Cluster colors to 4 colors"""
    return preprocess_cluster(img, 4)

In [None]:
def visualize_preprocessing(original_img, processed_img, method_name):
    """Visualize original and processed images side by side."""
    plt.figure(figsize=(12, 6))
    plt.subplot(1, 2, 1)
    plt.imshow(cv2.cvtColor(original_img, cv2.COLOR_BGR2RGB))
    plt.title("Original")
    plt.axis("off")

    plt.subplot(1, 2, 2)
    plt.imshow(cv2.cvtColor(processed_img, cv2.COLOR_BGR2RGB))
    plt.title(f"Processed ({method_name})")
    plt.axis("off")
    plt.tight_layout()
    plt.show()


def measure_performance(preprocess_fn, test_images, model, num_runs=3):
    """Measure preprocessing and inference times."""
    processing_times = []
    inference_times = []

    for img_path in test_images:
        img = cv2.imread(img_path)
        if img is None:
            print(f"Warning: Unable to read image {img_path}. Skipping.")
            continue

        # Measure preprocessing time
        start_time = time.time()
        processed_img = preprocess_fn(img)
        preprocessing_time = time.time() - start_time

        # Measure inference time (average of num_runs)
        total_inference_time = 0
        for _ in range(num_runs):
            start_time = time.time()
            result = model(processed_img)
            total_inference_time += time.time() - start_time
        avg_inference_time = total_inference_time / num_runs

        processing_times.append(preprocessing_time)
        inference_times.append(avg_inference_time)

    # Calculate speed metrics
    avg_preprocessing_time = (
        sum(processing_times) / len(processing_times) if processing_times else 0
    )
    avg_inference_time = (
        sum(inference_times) / len(inference_times) if inference_times else 0
    )
    fps = (
        1.0 / (avg_preprocessing_time + avg_inference_time)
        if (avg_preprocessing_time + avg_inference_time) > 0
        else 0
    )

    return avg_preprocessing_time, avg_inference_time, fps


def create_validation_dataset(preprocess_fn, val_dir, temp_dir):
    """Create a temporary validation dataset with preprocessed images."""
    # Copy all label files
    for label_file in glob.glob(os.path.join(val_dir, "*.txt")):
        if os.path.basename(label_file) == "classes.txt":
            continue  # Skip classes.txt if present
        shutil.copy(label_file, temp_dir)

    # Preprocess and save images
    for img_file in glob.glob(os.path.join(val_dir, "*.png")):
        img = cv2.imread(img_file)
        if img is not None:
            processed_img = preprocess_fn(img)
            cv2.imwrite(
                os.path.join(temp_dir, os.path.basename(img_file)), processed_img
            )


def create_dataset_config(val_dir, temp_dir):
    """Create a YAML configuration file for the dataset."""
    yaml_path = os.path.join(temp_dir, "dataset.yaml")

    # Get list of class names if available
    class_names = []
    classes_file = os.path.join(val_dir, "classes.txt")
    if os.path.exists(classes_file):
        with open(classes_file, "r") as f:
            class_names = [line.strip() for line in f.readlines()]
    else:
        # Default to generic class name
        class_names = ["object"]

    # Create YAML content
    dataset_config = {
        "path": temp_dir,  # Dataset root directory
        "train": "",  # No train images in validation
        "val": temp_dir,  # Validation images
        "test": "",  # No test images
        "names": {i: name for i, name in enumerate(class_names)},
    }

    # Write YAML file
    with open(yaml_path, "w") as f:
        yaml.dump(dataset_config, f)

    return yaml_path


def evaluate_preprocessing(preprocess_fn, name, val_dir, model, num_runs=3):
    """
    Evaluates preprocessing method on validation images with labels.

    Args:
        preprocess_fn: preprocessing function to apply
        name: name of the preprocessing method
        val_dir: validation directory containing images and labels
        model: YOLO model to use for detection
        num_runs: number of runs for FPS calculation

    Returns:
        dict: metrics including mAP and FPS
    """
    # First calculate speed metrics (FPS) on individual images
    test_images = glob.glob(os.path.join(val_dir, "*.png"))[
        :10
    ]  # Limit to 10 images for speed test

    # Example image visualization
    if test_images:
        img_path = test_images[0]
        img = cv2.imread(img_path)
        if img is not None:
            processed_img = preprocess_fn(img)
            visualize_preprocessing(img, processed_img, name)

    # Measure processing and inference times
    avg_preprocessing_time, avg_inference_time, fps = measure_performance(
        preprocess_fn, test_images, model, num_runs
    )

    # Run model validation on the entire validation set with preprocessing
    with tempfile.TemporaryDirectory() as temp_dir:
        # Create preprocessed validation dataset
        create_validation_dataset(preprocess_fn, val_dir, temp_dir)

        # Create dataset configuration
        yaml_path = create_dataset_config(val_dir, temp_dir)

        # Run validation on the preprocessed dataset
        results = model.val(data=yaml_path, project="preprocessing_eval", name=name)

        # Extract metrics
        metrics = {}
        if hasattr(results, "results_dict"):
            metrics = results.results_dict

    return {
        "name": name,
        "mAP50": metrics.get("metrics/mAP50(B)", 0),
        "mAP50-95": metrics.get("metrics/mAP50-95(B)", 0),
        "preprocessing_time": avg_preprocessing_time,
        "inference_time": avg_inference_time,
        "fps": fps,
    }

In [None]:
# Create a list to store evaluation results
eval_results = []

# Evaluate each preprocessing method
preprocessing_methods = [
    (preprocess_base, "Base (No preprocessing)"),
    (preprocess_black_white, "Black-White"),
    (preprocess_8bit_color, "8-bit Color"),
    # (preprocess_cluster_16, "Color Clustering (16 colors)"),
    (preprocess_cluster_8, "Color Clustering (8 colors)"),
    (preprocess_cluster_4, "Color Clustering (4 colors)"),
]

for preprocess_fn, name in preprocessing_methods:
    print(f"Evaluating {name}...")
    result = evaluate_preprocessing(
        preprocess_fn, name, VAL_DIR, MODEL_PLATE, num_runs=5
    )
    eval_results.append(result)
    print(
        f"Completed {name} evaluation with results: mAP50={result['mAP50']:.4f}, FPS={result['fps']:.2f}"
    )

In [None]:
# Visualize results
def plot_metrics(eval_results):
    """Plot comparison of preprocessing methods"""
    # Create a figure with multiple subplots
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))

    # Extract data for plotting
    names = [r["name"] for r in eval_results]
    map50 = [r["mAP50"] for r in eval_results]
    map50_95 = [r["mAP50-95"] for r in eval_results]
    fps = [r["fps"] for r in eval_results]
    preproc_time = [r["preprocessing_time"] for r in eval_results]

    # Plot mAP50
    axes[0, 0].bar(names, map50, color="skyblue")
    axes[0, 0].set_title("mAP50")
    axes[0, 0].set_ylim(0, 1.0)
    plt.setp(axes[0, 0].get_xticklabels(), rotation=45, ha="right")

    # Plot mAP50-95
    axes[0, 1].bar(names, map50_95, color="lightgreen")
    axes[0, 1].set_title("mAP50-95")
    axes[0, 1].set_ylim(0, 1.0)
    plt.setp(axes[0, 1].get_xticklabels(), rotation=45, ha="right")

    # Plot FPS
    axes[1, 0].bar(names, fps, color="salmon")
    axes[1, 0].set_title("Frames Per Second (FPS)")
    plt.setp(axes[1, 0].get_xticklabels(), rotation=45, ha="right")

    # Plot preprocessing time
    axes[1, 1].bar(names, preproc_time, color="purple")
    axes[1, 1].set_title("Preprocessing Time (seconds)")
    plt.setp(axes[1, 1].get_xticklabels(), rotation=45, ha="right")

    plt.tight_layout()
    plt.show()

    # Create a summary table
    plt.figure(figsize=(12, 6))
    cell_text = []
    for r in eval_results:
        cell_text.append(
            [
                r["name"],
                f"{r['mAP50']:.4f}",
                f"{r['mAP50-95']:.4f}",
                f"{r['fps']:.2f}",
                f"{r['preprocessing_time'] * 1000:.2f}",
            ]
        )

    table = plt.table(
        cellText=cell_text,
        colLabels=["Method", "mAP50", "mAP50-95", "FPS", "Preproc Time (ms)"],
        loc="center",
    )
    table.auto_set_font_size(False)
    table.set_fontsize(12)
    table.scale(2, 2)
    plt.axis("off")
    plt.title("Preprocessing Methods Comparison")
    plt.show()


# Plot the results
plot_metrics(eval_results)

In [None]:
# Analyze results to recommend preprocessing methods
def recommend_methods(eval_results):
    """Recommend preprocessing methods based on different priorities"""
    # Sort by different metrics
    by_map50 = sorted(eval_results, key=lambda x: x["mAP50"], reverse=True)
    by_map50_95 = sorted(eval_results, key=lambda x: x["mAP50-95"], reverse=True)
    by_fps = sorted(eval_results, key=lambda x: x["fps"], reverse=True)

    # Get best balance (normalize and sum metrics)
    for r in eval_results:
        max_map50 = max(x["mAP50"] for x in eval_results)
        max_map50_95 = max(x["mAP50-95"] for x in eval_results)
        max_fps = max(x["fps"] for x in eval_results)

        r["balanced_score"] = (
            0.4 * r["mAP50"] / max_map50
            + 0.4 * r["mAP50-95"] / max_map50_95
            + 0.2 * r["fps"] / max_fps
        )

    by_balanced = sorted(eval_results, key=lambda x: x["balanced_score"], reverse=True)

    print("Recommended preprocessing methods:")
    print(f"Best accuracy (mAP50): {by_map50[0]['name']}")
    print(f"Best accuracy (mAP50-95): {by_map50_95[0]['name']}")
    print(f"Best speed (FPS): {by_fps[0]['name']}")
    print(f"Best balance of accuracy and speed: {by_balanced[0]['name']}")


recommend_methods(eval_results)