# AnomaVision Pipeline
Complete workflow: Train → Export → Detect → Evaluate

In [None]:
import os
import time
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
import torch
from torch.utils.data import DataLoader
from easydict import EasyDict as edict

import anomavision
from anomavision.config import load_config
from anomavision.general import Profiler, determine_device, increment_path
from anomavision.inference.model.wrapper import ModelWrapper
from anomavision.inference.modelType import ModelType
from anomavision.utils import (
    adaptive_gaussian_blur,
    get_logger,
    setup_logging,
    save_args_to_yaml
)

%matplotlib inline

## 1. Training

In [None]:
# Read the pipeline settings from config.yml and wrap them in an EasyDict so
# individual options can be read as attributes (config.<name>).
config = edict(load_config("config.yml"))

# Enable logging at the configured level with file logging switched on, then
# fetch the logger used by the training cells.
setup_logging(
    enabled=True,
    log_level=config.log_level,
    log_to_file=True,
)
logger = get_logger("anomavision.train")

In [None]:
# Training images are read from <dataset_path>/<class_name>/train/good, with
# the dataset path resolved to its real location first.
root = os.path.join(
    os.path.realpath(config.dataset_path), config.class_name, "train", "good"
)

# Preprocessing options forwarded to the dataset, all taken from the config.
train_preprocessing = dict(
    resize=config.resize,
    crop_size=config.crop_size,
    normalize=config.normalize,
    mean=config.norm_mean,
    std=config.norm_std,
)
ds = anomavision.AnodetDataset(root, **train_preprocessing)

# shuffle=False: batches are served in dataset order.
dl = DataLoader(ds, shuffle=False, batch_size=config.batch_size)
logger.info(f"Dataset: {len(ds)} images | batch_size={config.batch_size}")

In [None]:
# Pick the training device: CUDA when torch reports it as available, CPU
# otherwise. (config.device is not consulted in this cell.)
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
logger.info(f"Device: {device.type}")

# Build the PaDiM model; backbone, layers and feature dimension come from the
# loaded configuration.
padim = anomavision.Padim(
    device=device,
    backbone=config.backbone,
    layer_indices=config.layer_indices,
    feat_dim=config.feat_dim,
)

In [None]:
# Fit the model on the training loader and report the elapsed wall-clock time.
t0 = time.perf_counter()
padim.fit(dl)
train_seconds = time.perf_counter() - t0
logger.info(f"Training completed in {train_seconds:.2f}s")

In [None]:
# Resolve (and create) the directory that holds this run's artifacts.
run_root = Path(config.model_data_path) / config.run_name
run_dir = increment_path(run_root, exist_ok=True, mkdir=True)

# Persist the fitted model object itself.
model_path = run_dir / config.output_model
torch.save(padim, str(model_path))

# Store the statistics file (half=True) beside it, using a ".pth" suffix.
# NOTE(review): if config.output_model already ends in ".pth", stats_path is
# the same file as model_path and this call writes over it -- confirm that
# this is intended.
stats_path = model_path.with_suffix(".pth")
padim.save_statistics(str(stats_path), half=True)

# Keep a snapshot of the configuration in the run directory; the later cells
# reload their settings from this file.
config_snapshot = run_dir / "config.yml"
save_args_to_yaml(config, str(config_snapshot))

logger.info(f"Model saved: {model_path}")
logger.info(f"Statistics saved: {stats_path}")

## 2. Export

In [None]:
from export import ModelExporter

# Work from the configuration snapshot stored in the run directory.
config = edict(load_config(str(run_dir / "config.yml")))
logger = get_logger("anomavision.export")

# Location of the saved model and the base name reused for exported files.
model_path = run_dir / config.output_model
model_stem = Path(config.output_model).stem

# Export input shape is (1, 3, h, w): the crop size when one is configured,
# otherwise the resize size. Both are unpacked as an (h, w) pair.
if config.crop_size:
    h, w = config.crop_size
else:
    h, w = config.resize
input_shape = (1, 3, h, w)

# Folder with the "good" training images, kept as the calibration directory.
calib_dir = os.path.join(config.dataset_path, config.class_name, "train", "good")

logger.info(f"Exporting model: {model_path}")
logger.info(f"Input shape: {input_shape}")

In [None]:
# Exporter for the saved model. run_dir is passed as the second argument
# (presumably the output directory -- confirm against ModelExporter).
exporter = ModelExporter(
    model_path,
    run_dir,
    logger,
    device=config.device,
)

In [None]:
# ONNX export settings; both quantization flags are switched off here.
onnx_options = dict(
    input_shape=input_shape,
    output_name=f"{model_stem}.onnx",
    opset_version=config.opset,
    dynamic_batch=config.dynamic_batch,
    quantize_dynamic_flag=False,
    quantize_static_flag=False,
)
onnx_path = exporter.export_onnx(**onnx_options)
logger.info(f"ONNX exported: {onnx_path}")

In [None]:
# TorchScript export, named after the saved model's stem.
ts_output_name = f"{model_stem}.torchscript"
ts_path = exporter.export_torchscript(
    input_shape=input_shape, output_name=ts_output_name, optimize=config.optimize
)
logger.info(f"TorchScript exported: {ts_path}")

In [None]:
# OpenVINO export. FP16 is requested unless the config asks for FP32.
use_fp16 = not config.fp32
ov_path = exporter.export_openvino(
    input_shape=input_shape,
    output_name=f"{model_stem}_openvino",
    fp16=use_fp16,
    dynamic_batch=config.dynamic_batch,
)
logger.info(f"OpenVINO exported: {ov_path}")

## 3. Detection

In [None]:
# Reload the configuration snapshot from the run directory and switch to the
# detection logger.
detect_config_file = run_dir / "config.yml"
config = edict(load_config(str(detect_config_file)))
logger = get_logger("anomavision.detect")

# Resolved locations of the images to scan and of the run's model files, plus
# the device string derived from the configured device.
DATASET_PATH = os.path.realpath(config.img_path)
MODEL_DATA_PATH = os.path.realpath(str(run_dir))
device_str = determine_device(config.device)

logger.info(f"Device: {device_str}")
logger.info(f"Dataset: {DATASET_PATH}")

In [None]:
# Detection runs on the ONNX file exported into the run directory.
model_file = f"{model_stem}.onnx"
model_path_detect = os.path.join(MODEL_DATA_PATH, model_file)

# Wrap the model for inference on the selected device and derive the model
# type from the file extension (used below for logging and output paths).
model = ModelWrapper(model_path_detect, device_str)
model_type = ModelType.from_extension(model_path_detect)

logger.info(f"Model loaded: {model_type.value.upper()}")

In [None]:
# Dataset over the images to scan; preprocessing options come from the config
# so they match the ones used for training.
test_dataset = anomavision.AnodetDataset(
    DATASET_PATH,
    resize=config.resize,
    crop_size=config.crop_size,
    normalize=config.normalize,
    mean=config.norm_mean,
    std=config.norm_std,
)

test_dataloader = DataLoader(
    test_dataset,
    batch_size=config.batch_size,
    num_workers=config.num_workers,
    # Pinned host memory only speeds up host-to-GPU copies, so request it only
    # when inference runs on CUDA. This matches the evaluation loader and
    # avoids PyTorch's pinned-memory warning on CPU-only machines.
    pin_memory=bool(config.pin_memory and device_str == "cuda"),
)

logger.info(f"Dataset: {len(test_dataset)} images")

In [None]:
# Prepare the folder for saved figures. RESULTS_PATH is only defined when
# config.save_visualizations is set.
if config.save_visualizations:
    viz_root = Path(config.viz_output_dir) / model_type.value.upper() / config.run_name
    RESULTS_PATH = increment_path(viz_root, exist_ok=config.overwrite, mkdir=True)
    logger.info(f"Results path: {RESULTS_PATH}")

In [None]:
# Warm up the model with the first batch of the loader. This is best-effort:
# an empty loader or any other Exception is logged as a warning instead of
# propagating.
try:
    first = next(iter(test_dataloader))  # element 0 is the image batch
    # On CUDA the batch is converted to half precision, as in the inference loop.
    first_batch = first[0].half() if device_str == "cuda" else first[0]
    first_batch = first_batch.to(device_str)

    model.warmup(batch=first_batch, runs=2)
    logger.info(
        "AnomaVision warm-up done with first batch %s.", tuple(first_batch.shape)
    )
except StopIteration:
    logger.warning("Dataset empty; skipping warm-up.")
except Exception as e:
    logger.warning(f"Warm-up skipped due to error: {e}")

In [None]:
def _to_numpy(value):
    """Return ``value`` as a NumPy array when it can be converted.

    NumPy arrays pass through unchanged. Torch tensors are detached and moved
    to host memory first, because ``Tensor.numpy()`` raises for CUDA tensors.
    Other objects exposing ``numpy()`` are converted with it; anything else is
    returned as-is.
    """
    if isinstance(value, np.ndarray):
        return value
    if isinstance(value, torch.Tensor):
        return value.detach().cpu().numpy()
    return value.numpy() if hasattr(value, "numpy") else value


# Parse visualization color: a comma-separated string of integers in the config.
viz_color = tuple(map(int, config.viz_color.split(",")))

# Initialize profiler (only the model.predict calls are timed).
inference_profiler = Profiler()

# Run inference on every batch; only the first batch is visualized.
for batch_idx, (batch, images, _, _) in enumerate(test_dataloader):
    if device_str == "cuda":
        batch = batch.half()
    batch = batch.to(device_str)

    # Inference
    with inference_profiler:
        image_scores, score_maps = model.predict(batch)

    # Postprocessing: smooth the score maps, then threshold maps and scores.
    score_maps = adaptive_gaussian_blur(score_maps, kernel_size=33, sigma=4)
    score_map_classifications = anomavision.classification(score_maps, config.thresh)
    image_classifications = anomavision.classification(image_scores, config.thresh)

    # Visualization (only first batch)
    if not (config.enable_visualization and batch_idx == 0):
        continue

    test_images = np.array(images)
    # Convert once and reuse: tensor results are brought to host memory first.
    pixel_classes = _to_numpy(score_map_classifications)
    image_classes = _to_numpy(image_classifications)

    boundary_images = anomavision.visualization.framed_boundary_images(
        test_images,
        pixel_classes,
        image_classes,
        padding=config.viz_padding,
    )

    heatmap_images = anomavision.visualization.heatmap_images(
        test_images,
        _to_numpy(score_maps),
        alpha=config.viz_alpha,
    )

    highlighted_images = anomavision.visualization.highlighted_images(
        list(images),
        pixel_classes,
        color=viz_color,
    )

    # Display the first image of the batch in four side-by-side panels.
    fig, axs = plt.subplots(1, 4, figsize=(16, 8))
    fig.suptitle("AnomaVision Detection Results", fontsize=14)

    panels = (
        ("Original", images[0]),
        ("Boundary", boundary_images[0]),
        ("Heatmap", heatmap_images[0]),
        ("Highlighted", highlighted_images[0]),
    )
    for ax, (panel_title, panel_image) in zip(axs, panels):
        ax.imshow(panel_image)
        ax.set_title(panel_title)
        ax.axis("off")

    # Apply the layout before saving so the file on disk matches what is shown.
    plt.tight_layout()
    if config.save_visualizations:
        plt.savefig(RESULTS_PATH / f"batch_{batch_idx}.png", dpi=100, bbox_inches="tight")

    plt.show()
    plt.close()

# Close model
model.close()

# Performance summary
inference_fps = inference_profiler.get_fps(len(test_dataset))
logger.info(f"Inference FPS: {inference_fps:.2f}")
logger.info(f"Total time: {inference_profiler.accumulated_time:.4f}s")

## 4. Evaluation

In [None]:
# Reload the configuration snapshot from the run directory and switch to the
# evaluation logger.
eval_config_file = run_dir / "config.yml"
config = edict(load_config(str(eval_config_file)))
logger = get_logger("anomavision.eval")

# Evaluation reads from the dataset root (config.dataset_path), not from the
# detection image folder.
DATASET_PATH = os.path.realpath(config.dataset_path)
MODEL_DATA_PATH = os.path.realpath(str(run_dir))
device_str = determine_device(config.device)

logger.info(f"Device: {device_str}")
logger.info(f"Dataset: {DATASET_PATH}")
logger.info(f"Class: {config.class_name}")

In [None]:
# Open the model again for evaluation. model_file is the file name chosen in
# the detection section, so that cell must have been run first.
model_path_eval = os.path.join(MODEL_DATA_PATH, model_file)

model = ModelWrapper(model_path_eval, device_str)
model_type = ModelType.from_extension(model_path_eval)

logger.info(f"Model loaded: {model_type.value.upper()}")

In [None]:
# Test split (is_train=False) of the configured class, with the preprocessing
# options taken from the config.
eval_preprocessing = dict(
    resize=config.resize,
    crop_size=config.crop_size,
    normalize=config.normalize,
    mean=config.norm_mean,
    std=config.norm_std,
)
test_dataset = anomavision.MVTecDataset(
    DATASET_PATH, config.class_name, is_train=False, **eval_preprocessing
)

# Pinned memory is requested only when it is enabled in the config and the
# device string is "cuda".
use_pinned_memory = config.pin_memory and device_str == "cuda"
test_dataloader = DataLoader(
    test_dataset,
    batch_size=config.batch_size,
    num_workers=config.num_workers,
    pin_memory=use_pinned_memory,
)

logger.info(f"Test dataset: {len(test_dataset)} images")

In [None]:
# Accumulators filled batch by batch and converted to arrays after the loop.
all_images = []
all_image_classifications_target = []
all_masks_target = []
all_image_scores = []
all_score_maps = []

evaluation_profiler = Profiler()

for batch_idx, (batch, images, image_targets, mask_targets) in enumerate(test_dataloader):
    batch = batch.to(device_str)

    # Only the prediction call is timed.
    with evaluation_profiler:
        image_scores, score_maps = model.predict(batch)

    all_images.extend(images)

    # Ground-truth labels and masks: converted with numpy() when available.
    if hasattr(image_targets, "numpy"):
        all_image_classifications_target.extend(image_targets.numpy())
    else:
        all_image_classifications_target.extend(image_targets)

    if hasattr(mask_targets, "numpy"):
        all_masks_target.extend(mask_targets.numpy())
    else:
        all_masks_target.extend(mask_targets)

    # NumPy scores: both outputs are stored without further conversion.
    if isinstance(image_scores, np.ndarray):
        all_image_scores.extend(image_scores.tolist())
        all_score_maps.extend(score_maps)
        continue

    # Otherwise move results to host memory first when they expose cpu().
    if hasattr(image_scores, "cpu"):
        all_image_scores.extend(image_scores.cpu().numpy().tolist())
    else:
        all_image_scores.extend(image_scores.tolist())

    if hasattr(score_maps, "cpu"):
        all_score_maps.extend(score_maps.cpu().numpy())
    else:
        all_score_maps.extend(score_maps)

model.close()

# Convert the accumulated lists to arrays; the masks lose their axis of
# length one at position 1.
all_images = np.array(all_images)
all_image_classifications_target = np.array(all_image_classifications_target)
all_masks_target = np.array(all_masks_target).squeeze(axis=1)
all_image_scores = np.array(all_image_scores)
all_score_maps = np.array(all_score_maps)

# Smooth the score maps with the same blur settings used during detection.
all_score_maps = adaptive_gaussian_blur(all_score_maps, kernel_size=33, sigma=4)

logger.info("Evaluation completed")

In [None]:
# Plot the evaluation results. Pixel-level inputs are flattened to 1-D, with
# the ground-truth masks cast to uint8 first.
flat_mask_targets = all_masks_target.astype(np.uint8).flatten()
flat_score_maps = all_score_maps.flatten()

anomavision.visualize_eval_data(
    all_image_classifications_target,
    flat_mask_targets,
    all_image_scores,
    flat_score_maps,
)

plt.tight_layout()
plt.show()

In [None]:
# Throughput is computed over the number of images, average time over the
# number of batches.
evaluation_fps = evaluation_profiler.get_fps(len(test_dataset))
avg_time = evaluation_profiler.get_avg_time_ms(len(test_dataloader))

# Assemble the report and print it in one go.
rule = "=" * 60
summary_lines = [
    rule,
    "EVALUATION PERFORMANCE SUMMARY",
    rule,
    f"Evaluation FPS: {evaluation_fps:.2f} images/sec",
    f"Average time: {avg_time:.2f} ms/batch",
    f"Total time: {evaluation_profiler.accumulated_time:.4f}s",
    rule,
]
print("\n".join(summary_lines))