In [None]:
# Installation
# -------------------
#### Uncomment and run this cell if you need to install the requirements ####
"""
!pip install anomalib[full]==1.2.0  # Anomalib version 1.2.0 was used for this benchmark

# Fix possible version problems
!pip install matplotlib==3.6.0
!pip uninstall -y ollama  # Fix anomalib.models import problem ("cannot import name '_encode_image' from 'ollama._client'")
"""

In [None]:
# Anomalib Benchmark Notebook
# ===========================
# This notebook allows you to benchmark different anomaly detection models on various datasets with customizable settings.


#### Configuration Section (Modify these settings) ####
# ------------------------------------------------------------
### Choose your dataset:
# - "mvtec" (MVTec AD dataset)
# - "rubber_mats" (Dataspree Rubber Mats dataset)
# - "mvtec_multiclass" (Multi-class MVTec AD dataset)
DATASET = "mvtec"

### For MVTec AD, choose category:
# bottle/cable/capsule/carpet/grid/hazelnut/leather/metal_nut/pill/screw/tile/toothbrush/transistor/wood/zipper
CATEGORY = "leather"  # Only used for MVTec dataset

### Image size (width, height)
# All Models were tested on: 256x256, 320x320, 448x448. For PatchCore-1% also 360x360, 512x512 were tested
IMAGE_SIZE = (256, 256)

### Choose model:
# - "efficientad_s" (EfficientAD-S)
# - "efficientad_m" (EfficientAD-M)
# - "fastflow" (FastFlow)
# - "patchcore_default" (PatchCore-10%)
# - "patchcore_1percent" (PatchCore-1%)
MODEL = "patchcore_default"

# Number of epochs (for FastFlow) or steps (for EfficientAD)
MAX_EPOCHS = 100  # used for FastFlow
MAX_STEPS = 70000 # used for EfficientAD

# Batch sizes
TRAIN_BATCH_SIZE = 8  # EfficientAD=1 / FastFlow=32 / Patchcore=8
EVAL_BATCH_SIZE = 8   # EfficientAD=32 / FastFlow=32 / Patchcore=8

### Setup Dataset paths ###
# The Benchmark was done on MVTec AD, on Rubber Mats, and on MVTec AD in a multi-class setting
# If left empty, the datasets will be downloaded to default locations
# If you have the datasets already, specify the paths here

## MVTec AD dataset ##
MVTEC_ROOT = "/content/drive/MyDrive/MVTec"  # Will download automatically if empty

"""
## Rubber Mats dataset ##
# Needs to be downloaded manually: You can contact Data Spree GmbH for access to the Rubber Mats dataset! https://www.data-spree.com/de/kontakt
#RUBBER_MATS_ROOT = "./dataspree-rubber-mats-dataset-S/datasets/dataspree"

## Multi-class MVTec AD ##
# Needs to be downloaded and set up manually for now. 
# Therefore, you can put together the 5 structures and 10 objects of MVTec AD into Train and Test Folders. Also, Please configure the Anomalib Folder according to your setup.
MVTEC_MULTICLASS_ROOT = "./MVTec-generalized/all" # Needs to be downloaded and set up manually for now. 

# configure to match your folder structure
TRAIN_DIR = "train"
TEST_DIR = "test/defect"
NORMAL_TEST_DIR = "test/good"

"""

# Optional: Comet ML configuration
USE_COMET = False  # Set to True to enable Comet ML logging
COMET_API_KEY = "YOUR_API_KEY"  # Replace with your API key
PROJECT_NAME = "ad-benchmark"

# Optional imports for Comet ML
if USE_COMET:
    import comet_ml
    comet_ml.init(api_key=COMET_API_KEY)

# ------------------------------------------------------------


# Main imports
import os
import torch
import numpy as np
from pathlib import Path
import matplotlib
from matplotlib import pyplot as plt
import time
from tqdm import tqdm
import random
from pytorch_lightning import seed_everything

import anomalib
from anomalib.data import MVTec, Folder
from anomalib.models import Patchcore, EfficientAd, Fastflow
from anomalib.engine import Engine
from anomalib import TaskType
from anomalib.data.utils import ValSplitMode, TestSplitMode
from anomalib.metrics import AUROC, AUPR, F1Score
from lightning.pytorch.callbacks import ModelCheckpoint


# Check environment
print("\n=== Environment Information ===")
print("Anomalib version:", anomalib.__version__)
print("Torch version:", torch.__version__)
print("Matplotlib version:", matplotlib.__version__)
print("CUDA version:", torch.version.cuda if hasattr(torch.version, 'cuda') else "Not available")
print("GPU availability:", torch.cuda.is_available())
if torch.cuda.is_available():
    print("Number of GPU devices:", torch.cuda.device_count())
    print("Name of current GPU:", torch.cuda.get_device_name(0))
    print(f"CUDA memory allocated: {torch.cuda.memory_allocated()}")
    print(f"CUDA memory reserved: {torch.cuda.memory_reserved()}")
print("Matplotlib version:", matplotlib.__version__)

# Set random seed
seed_everything(42)
print("\n=== Random Seeds ===")
print(f"PyTorch seed: {torch.initial_seed()}")
print(f"NumPy seed: {np.random.get_state()[1][0]}")
print(f"Random seed: {random.getstate()[1][0]}")

""" 
Can be uncommented and configured when access to rubber mats dataset is given.
# Import Rubber Mats dataset class if needed
if DATASET == "rubber_mats":
    print("\nImporting Rubber Mats dataset class...")
    rubber_mats_path = os.path.dirname(RUBBER_MATS_ROOT) if RUBBER_MATS_ROOT else "./dataspree-rubber-mats-dataset-S"
    sys.path.append(rubber_mats_path)
    try:
        from dataspree_anomalib_data import DataspreeDataModule
    except ImportError:
        print("ERROR: Could not import DataspreeDataModule. Please make sure the Rubber Mats dataset is downloaded.")
        print("The Dataspree Rubber Mats dataset must be downloaded manually from the official source.")
        raise
"""

# Setup Dataset
# ------------
print(f"\n=== Setting up {DATASET} dataset ===")

task = TaskType.CLASSIFICATION
seed = 42

# Load the appropriate dataset
if DATASET == "mvtec":
    print(f"Loading MVTec AD dataset, category: {CATEGORY}")

    # If path is not provided, use default download location
    if not MVTEC_ROOT:
        MVTEC_ROOT = "./datasets/MVTec"
        print(f"No path provided for MVTec dataset. Will download to {MVTEC_ROOT} if needed.")

    # MVTec will be automatically downloaded by anomalib if not found at the specified path
    datamodule = MVTec(
        root=MVTEC_ROOT,
        category=CATEGORY,
        image_size=IMAGE_SIZE,
        train_batch_size=TRAIN_BATCH_SIZE,
        eval_batch_size=EVAL_BATCH_SIZE,
        num_workers=2,
        task=task,
        seed=seed,
    )

    # ensure that dataset is there. If not it is automatically downloaded here.
    datamodule.prepare_data() # you can uncomment this line if download is not needed


"""
Uncomment if acces to Rubber Mats dataset is given.
elif DATASET == "rubber_mats":
    print("Loading Rubber Mats dataset")
    if not RUBBER_MATS_ROOT:
        print("ERROR: Path for Rubber Mats dataset is required")
        print("The Dataspree Rubber Mats dataset must be downloaded manually from the official source.")
        raise ValueError("RUBBER_MATS_ROOT path must be specified")

    datamodule = DataspreeDataModule(
        ds_dataset_id=561,
        root=RUBBER_MATS_ROOT,
        task=task,
        train_batch_size=TRAIN_BATCH_SIZE,
        eval_batch_size=EVAL_BATCH_SIZE,
        image_size=IMAGE_SIZE,
        num_workers=2,
        seed=seed,
    )
"""


elif DATASET == "mvtec_multiclass":
    print("Loading Multi-class MVTec AD dataset")
    if not MVTEC_MULTICLASS_ROOT:
        print("ERROR: Path for MVTec Multiclass dataset is required")
        print("The MVTec Multiclass dataset must be downloaded manually.")
        raise ValueError("MVTEC_MULTICLASS_ROOT path must be specified")

    datamodule = Folder(
        name="MVTec-generalized",
        root=MVTEC_MULTICLASS_ROOT,
        normal_dir=TRAIN_DIR,
        abnormal_dir=TEST_DIR,
        normal_test_dir=NORMAL_TEST_DIR,
        task=task,
        train_batch_size=TRAIN_BATCH_SIZE,
        eval_batch_size=EVAL_BATCH_SIZE,
        num_workers=2,
        image_size=IMAGE_SIZE,
        seed=seed,
        val_split_mode=ValSplitMode.SAME_AS_TEST,
        normal_split_ratio=None,
    )

else:
    raise ValueError(f"Unknown dataset: {DATASET}")


# Setup the datamodule
try:
    datamodule.setup()
    print("\n=== Dataset Information ===")
    print(f"Total training images: {len(datamodule.train_dataloader().dataset)}")
    print(f"Total validation images: {len(datamodule.val_dataloader().dataset)}")
    print(f"Total test images: {len(datamodule.test_dataloader().dataset)}")
except Exception as e:
    print(f"Error setting up dataset: {e}")
    if DATASET == "mvtec":
        print("\nTip: MVTec dataset will be automatically downloaded if not found.")
        print("If you're having connection issues, you can manually download the dataset from:")
        print("https://www.mvtec.com/company/research/datasets/mvtec-ad")
        print("and place it in the specified MVTEC_ROOT directory.")
    raise


# Setup Model
# ----------
print(f"\n=== Setting up {MODEL} model ===")

# Initialize the selected model
if MODEL == "efficientad_s":
    model = EfficientAd()
elif MODEL == "efficientad_m":
    from anomalib.models.image.efficient_ad.torch_model import EfficientAdModelSize
    model = EfficientAd(model_size=EfficientAdModelSize.M)
elif MODEL == "fastflow":
    model = Fastflow(backbone="wide_resnet50_2")
elif MODEL == "patchcore_default":
    model = Patchcore()
elif MODEL == "patchcore_1percent":
    model = Patchcore(
        backbone="wide_resnet101_2",
        layers=("layer2", "layer3"),
        pre_trained=True,
        coreset_sampling_ratio=0.01,
        num_neighbors=9,
    )
else:
    raise ValueError(f"Unknown model: {MODEL}")


# Setup Logger (optional)
# ---------------------
if USE_COMET:
    from anomalib.loggers import AnomalibCometLogger

    # Generate experiment name based on settings
    if DATASET == "mvtec":
        experiment_name = f"{MODEL}_{CATEGORY}_{IMAGE_SIZE[0]}"
    else:
        experiment_name = f"{MODEL}_{DATASET}_{IMAGE_SIZE[0]}"

    comet_logger = AnomalibCometLogger(
        project_name=PROJECT_NAME,
        experiment_name=experiment_name
    )
    logger = comet_logger
else:
    logger = None


# Setup Callbacks
# --------------
# Create results directory
results_dir = "./results"
os.makedirs(results_dir, exist_ok=True)

if MODEL in ["efficientad_s", "efficientad_m", "fastflow"]:
    # These models benefit from callbacks
    save_dir = f"{results_dir}/{MODEL}"
    if DATASET == "mvtec":
        save_dir += f"/MVTec/{CATEGORY}"
    else:
        save_dir += f"/{DATASET}"

    os.makedirs(save_dir, exist_ok=True)

    checkpoint_callback = ModelCheckpoint(
        monitor="image_AUROC",
        mode="max",
        dirpath=save_dir,
        save_top_k=1,
        save_last=True,
    )
    callbacks = [checkpoint_callback]
else:
    # PatchCore doesn't need callbacks, and needs only 1 Epoch
    callbacks = None
    checkpoint_callback = None


# Setup Engine
# -----------
print("\n=== Setting up training engine ===")

# Configure engine based on model
engine_kwargs = {
    "task": task,
    "image_metrics": {
        "Precision": {
            "class_path": "torchmetrics.Precision",
            "init_args": {"task": "binary"},
        },
        "Recall": {
            "class_path": "torchmetrics.Recall",
            "init_args": {"task": "binary"},
        },
        "AUROC": {
            "class_path": "torchmetrics.AUROC",
            "init_args": {"task": "binary"},
        },
        "F1Score": {
            "class_path": "torchmetrics.F1Score",
            "init_args": {"task": "binary"},
        },
        "AUPR": {
            "class_path": "torchmetrics.AveragePrecision",
            "init_args": {"task": "binary"},
        },
    },
    "accelerator": "auto",
    "devices": 1,
    "logger": logger,
}


if callbacks:
    engine_kwargs["callbacks"] = callbacks

if MODEL == "fastflow":
    engine_kwargs["max_epochs"] = MAX_EPOCHS
    #engine_kwargs["log_every_n_steps"] = 2

if MODEL in ["efficientad_s", "efficientad_m"]:
    engine_kwargs["max_steps"] = MAX_STEPS

engine = Engine(**engine_kwargs)


# Train Model
# ----------
print("\n=== Training model ===")
try:
    engine.fit(model=model, datamodule=datamodule)

    # Print checkpoint information if applicable
    if MODEL in ["efficientad_s", "efficientad_m", "fastflow"] and checkpoint_callback:
        print(f"\nBEST model saved: {checkpoint_callback.best_model_path}")
        print(f"Last model saved: {checkpoint_callback.last_model_path}")
except Exception as e:
    print(f"Error during training: {e}")
    raise


# Test Model
# ---------
print("\n=== Testing model ===")
test_kwargs = {
    "model": model,
    "datamodule": datamodule,
}

if MODEL in ["efficientad_s", "efficientad_m", "fastflow"] and checkpoint_callback and checkpoint_callback.best_model_path:
    test_kwargs["ckpt_path"] = checkpoint_callback.best_model_path

try:
    test_results = engine.test(**test_kwargs)
    print("\nTest Results:")
    for metric_name, value in test_results[0].items():
        if isinstance(value, (int, float)):
            print(f"{metric_name}: {value:.4f}")
except Exception as e:
    print(f"Error during testing: {e}")
    raise


# Measure Model Efficiency
# ----------------------
if torch.cuda.is_available():
    print("\n=== Measuring model efficiency ===")

    # FPS and Latency measurement
    def measure_fps_latency(model, input_size=(1, 3, 256, 256), num_runs=100, exclude_first=10):
        model.eval().cuda()
        x = torch.rand(*input_size).cuda()
        time_list = []

        for _ in tqdm(range(num_runs), desc="Measuring FPS and Latency"):
            torch.cuda.synchronize()
            start_time = time.time()
            with torch.no_grad():
                model(x)
            torch.cuda.synchronize()
            time_list.append(time.time() - start_time)

        time_list = time_list[exclude_first:]
        if not time_list:
            raise ValueError(f"`exclude_first` ({exclude_first}) is too high for `num_runs` ({num_runs}).")

        latency_ms = (sum(time_list) / len(time_list)) * 1000
        fps = 1 / (sum(time_list) / len(time_list))

        return fps, latency_ms

    # Throughput measurement
    def measure_throughput(model, input_size=(16, 3, 256, 256), num_runs=100, batch_size=16, exclude_first=10):
        model.eval().cuda()
        x = torch.rand(*input_size).cuda()
        throughput_list = []

        for _ in tqdm(range(num_runs), desc="Measuring Throughput"):
            torch.cuda.synchronize()
            start_time = time.time()
            with torch.no_grad():
                model(x)
            torch.cuda.synchronize()
            throughput_list.append(time.time() - start_time)

        throughput_list = throughput_list[exclude_first:]
        throughput = (batch_size * len(throughput_list)) / sum(throughput_list)

        return throughput

    # Measure performance metrics (reduced number of runs demo. Use num_runs=1000 for proper results.)
    try:
        input_size = (1, 3, IMAGE_SIZE[0], IMAGE_SIZE[1])
        fps, latency_ms = measure_fps_latency(model, input_size=input_size, num_runs=100)
        fps = round(fps, 2)
        latency_ms = round(latency_ms, 2)

        input_size_throughput = (16, 3, IMAGE_SIZE[0], IMAGE_SIZE[1])
        throughput = measure_throughput(model, input_size=input_size_throughput, num_runs=100)
        throughput = round(throughput, 2)

        # Print results
        print("\n=== Performance Metrics ===")
        print(f"FPS: {fps}")
        print(f"Latency: {latency_ms} ms")
        print(f"Throughput: {throughput} images/second")

        # Log metrics to Comet if enabled
        if USE_COMET:
            comet_logger.experiment.log_metric(name="FPS", value=fps)
            comet_logger.experiment.log_metric(name="Latency(ms)", value=latency_ms)
            comet_logger.experiment.log_metric(name="Throughput(img/s)", value=throughput)

            # End experiment logging
            comet_logger.experiment.end()
    except Exception as e:
        print(f"Error measuring performance: {e}")
else:
    print("\nSkipping performance measurements as CUDA is not available")


print("\n=== Benchmark completed successfully ===")