In [56]:
#| default_exp inference.anomaly_score_organizer

# Anomaly Score Organizer

> Organize and save images based on their anomaly scores into customizable threshold folders

In [57]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [58]:
#| hide
from nbdev.showdoc import *

In [59]:
#| export
import os
import json
import shutil
import random
from pathlib import Path
from functools import lru_cache
from typing import Union, List, Dict, Any, Optional, Tuple
import numpy as np
import pandas as pd
from tqdm import tqdm
from PIL import Image, ImageDraw, ImageFont
import matplotlib.pyplot as plt
import matplotlib.patches as patches

from fastcore.all import *
from fastcore.test import *

## Data Setup for Trial and Error

> Helper functions to set up training and testing data for experimentation


In [60]:
#| export
def setup_trial_data(
    data_root: Union[str, Path],  # Root directory containing data
    normal_dir: str = "good",  # Normal images subdirectory
    abnormal_dir: str = "bad",  # Abnormal images subdirectory
    train_split: float = 0.7,  # Training split ratio
    val_split: float = 0.15,  # Validation split ratio
    test_split: float = 0.15,  # Test split ratio
    output_dir: Optional[Union[str, Path]] = None,  # Output directory for data lists
    seed: int = 42  # Random seed for reproducibility
) -> Dict[str, Path]:  # Returns dict with paths to train/val/test image list files
    """
    Set up training, validation, and test data splits for trial and error.

    Images found under `data_root/normal_dir` and `data_root/abnormal_dir` are
    shuffled with a private, seeded random generator (the global `random` state
    is left untouched) and cut into train/val/test portions. Each portion is
    written as a text file with one image path per line, and the split settings
    and counts are stored in `data_split_metadata.json`.

    The train and val sizes are truncated to whole images; the test split
    receives whatever remains.

    Returns a dict with keys 'train_normal', 'train_abnormal', 'val_normal',
    'val_abnormal', 'test_normal', 'test_abnormal', 'test_all' and 'metadata',
    each mapping to the written file.

    Raises:
        FileNotFoundError: if `data_root` or one of the two class folders is missing.
        ValueError: if a split ratio lies outside [0, 1] or the ratios do not sum to 1.

    Example:
        data_paths = setup_trial_data(
            data_root="/path/to/data",
            normal_dir="good",
            abnormal_dir="bad",
            train_split=0.7,
            val_split=0.15,
            test_split=0.15
        )
        # Use data_paths['train_normal'] for training normal images
    """
    data_root = Path(data_root)
    if not data_root.exists():
        raise FileNotFoundError(f"Data root not found: {data_root}")

    normal_path = data_root / normal_dir
    abnormal_path = data_root / abnormal_dir

    if not normal_path.exists():
        raise FileNotFoundError(f"Normal directory not found: {normal_path}")
    if not abnormal_path.exists():
        raise FileNotFoundError(f"Abnormal directory not found: {abnormal_path}")

    # Validate splits: each ratio must be a fraction, and together they must cover everything.
    # Without the range check, values such as (1.2, -0.1, -0.1) would pass the sum test.
    for split_name, ratio in (
        ("train_split", train_split),
        ("val_split", val_split),
        ("test_split", test_split),
    ):
        if not 0.0 <= ratio <= 1.0:
            raise ValueError(f"{split_name} must be between 0 and 1, got {ratio}")
    if abs(train_split + val_split + test_split - 1.0) > 1e-6:
        raise ValueError(f"Splits must sum to 1.0, got {train_split + val_split + test_split}")

    # Set output directory
    if output_dir is None:
        output_dir = data_root / "data_splits"
    else:
        output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    # Get all images (get_images_ is defined elsewhere in the project)
    normal_images = get_images_(normal_path)
    abnormal_images = get_images_(abnormal_path)

    print(f"üìä Found {len(normal_images)} normal images")
    print(f"üìä Found {len(abnormal_images)} abnormal images")

    # Shuffle with a dedicated generator: same order as seeding the global RNG,
    # but without changing random state for the rest of the program.
    rng = random.Random(seed)
    rng.shuffle(normal_images)
    rng.shuffle(abnormal_images)

    # Split normal images
    n_train_norm = int(len(normal_images) * train_split)
    n_val_norm = int(len(normal_images) * val_split)

    train_normal = normal_images[:n_train_norm]
    val_normal = normal_images[n_train_norm:n_train_norm + n_val_norm]
    test_normal = normal_images[n_train_norm + n_val_norm:]

    # Split abnormal images
    n_train_abnorm = int(len(abnormal_images) * train_split)
    n_val_abnorm = int(len(abnormal_images) * val_split)

    train_abnormal = abnormal_images[:n_train_abnorm]
    val_abnormal = abnormal_images[n_train_abnorm:n_train_abnorm + n_val_abnorm]
    test_abnormal = abnormal_images[n_train_abnorm + n_val_abnorm:]

    def save_image_list(images: List[Path], filepath: Path) -> None:
        """Save list of image paths to text file, one path per line."""
        with open(filepath, 'w') as f:
            for img in images:
                f.write(f"{img}\n")

    # One list file per split; 'test_all' combines both test sets for inference.
    split_lists = {
        'train_normal': train_normal,
        'train_abnormal': train_abnormal,
        'val_normal': val_normal,
        'val_abnormal': val_abnormal,
        'test_normal': test_normal,
        'test_abnormal': test_abnormal,
        'test_all': test_normal + test_abnormal,
    }

    result_paths = {}
    for split_name, images in split_lists.items():
        list_file = output_dir / f"{split_name}.txt"
        save_image_list(images, list_file)
        result_paths[split_name] = list_file

    # Print summary
    print(f"\n‚úÖ Data splits created in {output_dir}")
    print(f"   Train: {len(train_normal)} normal, {len(train_abnormal)} abnormal")
    print(f"   Val:   {len(val_normal)} normal, {len(val_abnormal)} abnormal")
    print(f"   Test:  {len(test_normal)} normal, {len(test_abnormal)} abnormal")

    # Save metadata so the split can be reproduced later
    metadata = {
        'data_root': str(data_root),
        'normal_dir': normal_dir,
        'abnormal_dir': abnormal_dir,
        'splits': {
            'train': train_split,
            'val': val_split,
            'test': test_split
        },
        'counts': {
            'train_normal': len(train_normal),
            'train_abnormal': len(train_abnormal),
            'val_normal': len(val_normal),
            'val_abnormal': len(val_abnormal),
            'test_normal': len(test_normal),
            'test_abnormal': len(test_abnormal)
        },
        'seed': seed
    }

    metadata_file = output_dir / "data_split_metadata.json"
    with open(metadata_file, 'w') as f:
        json.dump(metadata, f, indent=2)

    result_paths['metadata'] = metadata_file

    return result_paths


In [61]:
#| export
def load_image_fast(
    image_path: Union[str, Path],  # Path to image
    cache: bool = True  # Whether to cache loaded images
) -> Image.Image:  # Returns PIL Image in RGB mode
    """
    Load an image as RGB, optionally through an LRU cache.

    With `cache=True` the decoded image is kept in an in-memory cache keyed by
    the path string (up to 1000 entries) and a copy of the cached image is
    returned, so in-place edits by the caller never leak into the cache.
    With `cache=False` the file is decoded on every call.
    """
    image_path = Path(image_path)

    if cache:
        # Copy on the way out: the cached object is shared between all callers.
        return _load_image_cached(str(image_path)).copy()

    # The context manager closes the underlying file once the pixels are converted;
    # convert() returns a fully loaded image that no longer needs the file.
    with Image.open(image_path) as img:
        return img.convert('RGB')

@lru_cache(maxsize=1000)
def _load_image_cached(image_path_str: str) -> Image.Image:
    """Cached image loader (internal use); the returned object is shared, do not modify it."""
    with Image.open(image_path_str) as img:
        return img.convert('RGB')


In [62]:
#| export
def save_image_with_metadata(
    image: Image.Image,  # PIL Image to save
    output_path: Union[str, Path],  # Output path
    metadata: Optional[Dict[str, Any]] = None,  # Optional metadata dict
    format: str = "JPEG",  # Image format
    quality: int = 95,  # JPEG quality (1-100)
    optimize: bool = True  # Whether to optimize image
) -> Path:  # Returns saved path
    """
    Save image with optional metadata for reproducibility.

    The parent directory is created if needed. When `metadata` is given it is
    written next to the image as `<image stem>_metadata.json`.

    For JPEG output (format matched case-insensitively) the `quality` setting is
    applied, and images with an alpha channel or a palette (modes RGBA, LA, P, PA)
    are converted to RGB first because JPEG cannot store them.

    Example:
        img = Image.open("test.jpg")
        save_image_with_metadata(
            img, "output/test.jpg",
            metadata={"anomaly_score": 0.75, "model": "padim"}
        )
        # Creates output/test.jpg and output/test_metadata.json
    """
    output_path = Path(output_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # Save image. "jpeg" and "JPEG" must be treated alike, otherwise quality is silently dropped.
    is_jpeg = isinstance(format, str) and format.upper() == "JPEG"
    if is_jpeg:
        if image.mode in ("RGBA", "LA", "P", "PA"):
            # JPEG has no alpha/palette support; saving these modes directly raises an error.
            image = image.convert("RGB")
        image.save(output_path, format=format, quality=quality, optimize=optimize)
    else:
        image.save(output_path, format=format, optimize=optimize)

    # Save metadata if provided
    if metadata is not None:
        metadata_path = output_path.with_name(f"{output_path.stem}_metadata.json")
        with open(metadata_path, 'w') as f:
            json.dump(metadata, f, indent=2)

    return output_path


In [63]:
#| export
def create_poster_from_folder(
    folder_path: Union[str, Path],  # Folder containing images
    image_index_df: pd.DataFrame,  # DataFrame with image indices
    output_path: Union[str, Path],  # Path to save poster
    images_per_poster: int = 20,  # Number of images per poster
    poster_index: int = 0,  # Index of this poster
    image_size: Tuple[int, int] = (224, 224),  # Size of each image
    grid_cols: int = 5,  # Number of columns
    annotate_with_index: bool = True,  # Whether to annotate with index
    font_size: int = 30,  # Font size for annotations
    title: Optional[str] = None  # Poster title
) -> Optional[Path]:  # Returns path to saved poster or None
    """
    Create a grid poster from the images in a folder, optionally annotated with their index.

    Images are listed in sorted order and paged: poster `poster_index` shows the
    images `[poster_index * images_per_poster, (poster_index + 1) * images_per_poster)`.
    Each image is resized to `image_size` and placed in a grid with `grid_cols`
    columns. When `annotate_with_index` is set, the image's file name is looked up
    in the `image_name` column of `image_index_df` and the matching `index` value
    is drawn onto the image via `annotate_image_with_index`.

    An image that cannot be loaded or drawn does not abort the poster; its cell
    shows an error label and the reason is printed.

    Returns the poster path, or None when the folder is missing or holds no
    images for the requested poster.

    Raises:
        ValueError: if `grid_cols` or `images_per_poster` is below 1, or `poster_index` is negative.

    Example:
        poster_path = create_poster_from_folder(
            folder_path="output/0.5",
            image_index_df=df,
            output_path="poster.png",
            images_per_poster=20,
            grid_cols=5
        )
    """
    folder_path = Path(folder_path)
    output_path = Path(output_path)

    if grid_cols < 1 or images_per_poster < 1:
        raise ValueError("grid_cols and images_per_poster must both be at least 1")
    if poster_index < 0:
        raise ValueError(f"poster_index must not be negative, got {poster_index}")

    if not folder_path.exists():
        print(f"‚ö†Ô∏è  Folder not found: {folder_path}")
        return None

    # Get all images in folder; the suffix is compared case-insensitively so that
    # names such as "a.Jpg" are picked up as well.
    image_extensions = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif'}
    images = sorted(
        candidate for candidate in folder_path.iterdir()
        if candidate.is_file() and candidate.suffix.lower() in image_extensions
    )

    if not images:
        print(f"‚ö†Ô∏è  No images found in {folder_path}")
        return None

    # Select the page of images that belongs to this poster.
    first = poster_index * images_per_poster
    images = images[first:first + images_per_poster]
    if not images:
        print(f"Poster {poster_index + 1}: no images left in {folder_path}")
        return None

    # Calculate grid dimensions
    grid_rows = int(np.ceil(len(images) / grid_cols))

    # Create figure
    fig_width = grid_cols * (image_size[0] / 100)
    fig_height = grid_rows * (image_size[1] / 100) + 1  # Extra space for title

    # squeeze=False always yields a 2-D array of axes, also for a single row or column.
    fig, axes = plt.subplots(
        grid_rows, grid_cols, figsize=(fig_width, fig_height), squeeze=False
    )

    try:
        # Set title
        if title:
            fig.suptitle(f"{title} - Poster {poster_index + 1}", fontsize=14, weight='bold')

        # Load and display images
        for idx, img_path in enumerate(images):
            ax = axes[idx // grid_cols][idx % grid_cols]

            try:
                # Fast image loading
                img = load_image_fast(img_path, cache=False)
                img = img.resize(image_size, Image.Resampling.LANCZOS)

                # Annotate with index if requested
                if annotate_with_index:
                    df_match = image_index_df[image_index_df['image_name'] == img_path.name]
                    if not df_match.empty:
                        img_index = df_match.iloc[0]['index']
                        img = annotate_image_with_index(img, img_index, font_size=font_size)

                ax.imshow(np.array(img))
                ax.set_title(img_path.stem[:20], fontsize=8)  # Truncate long names
            except Exception as e:
                # Best effort: keep building the poster, but say why this cell is empty.
                print(f"Could not render {img_path}: {e}")
                ax.text(0.5, 0.5, f"Error\n{img_path.name}",
                        ha='center', va='center', transform=ax.transAxes,
                        fontsize=8, color='red')
            ax.axis('off')

        # Hide empty cells
        for idx in range(len(images), grid_rows * grid_cols):
            axes[idx // grid_cols][idx % grid_cols].axis('off')

        # Save poster
        output_path.parent.mkdir(parents=True, exist_ok=True)
        fig.tight_layout()
        fig.savefig(output_path, dpi=150, bbox_inches='tight', facecolor='white')
    finally:
        # Always release the figure, even when saving fails, to avoid piling up open figures.
        plt.close(fig)

    return output_path


In [64]:
#| export
# Import from existing modules
from be_vision_ad_tools.inference.prediction_system import (
    predict_image_list_from_file_enhanced,
    predict_image_list
)

from be_vision_ad_tools.inference.multinode_inference import (
    create_smart_batches,
    scan_folder_structure,
    create_batch_list_file
)

## Data

> Sample image folders, model checkpoint and output directory used by the examples below

In [65]:
# Locate the sample data through the DATA_PATH environment variable so that
# no machine-specific path is hard-coded in the notebook.
DATA_ROOT = os.getenv('DATA_PATH')
if DATA_ROOT is None:
    # Path(None, ...) would fail with an unhelpful TypeError, so stop with a clear message.
    raise EnvironmentError("DATA_PATH environment variable is not set")

good_im_path = Path(DATA_ROOT, 'malacca', 'g_imgs')
bad_im_path = Path(DATA_ROOT, 'malacca', 'b_imgs')
MODEL_PATH = Path(DATA_ROOT, 'malacca', 'model.pt')
print(MODEL_PATH)
print(MODEL_PATH.exists())

# One sample image for the single-image examples (.ls() is provided by fastcore).
sm_img = Path(good_im_path).ls()[0]
print(sm_img)

OUTPUT_DIR = Path(DATA_ROOT, 'malacca', 'output')
Path(OUTPUT_DIR).mkdir(parents=True, exist_ok=True)
print(f'OUTPUT_DIR: {OUTPUT_DIR.exists()}')


/home/hasan/Schreibtisch/projects/data/malacca/model.pt
True
/home/hasan/Schreibtisch/projects/data/malacca/g_imgs/2462401115552714.png
OUTPUT_DIR: True


In [66]:
# Sanity check: both sample image folders should exist before the examples below are run.
good_im_path.exists(),bad_im_path.exists()


(True, True)

## Core Functions

In [67]:
# Scratch example: sort the thresholds ascending so they can be scanned from smallest to largest.
score_thrs = [0.5, 1]
sorted_score_thrs = sorted(score_thrs)
sorted_score_thrs

[0.5, 1]

In [68]:
# Scratch example: pick the folder name for one score.
anomaly_score = 0.788
# Fall back to the largest threshold when the score exceeds all of them.
fn_name = str(sorted_score_thrs[-1])
for score_thr in sorted_score_thrs:
    if anomaly_score <= score_thr:
        print(score_thr)
        print(str(score_thr))
        # The first threshold that covers the score names the folder; stop searching.
        fn_name = str(score_thr)
        break
print(fn_name)

1
1
1


In [69]:
#| export
def normalize_score_thresholds(
    score_thresholds: Optional[List[float]]  # Thresholds supplied by the caller, or None for the defaults
) -> List[float]:  # Returns the thresholds in ascending order
    """
    Produce an ascending list of score thresholds.

    `None` selects the built-in defaults (0.3 up to 1.0 in steps of 0.1).
    Any other input is copied and sorted; the caller's list is left untouched.
    """
    if score_thresholds is not None:
        ordered = list(score_thresholds)
        ordered.sort()
        return ordered
    return [0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]


In [70]:
# Example: an explicit threshold list comes back sorted ascending.
normalize_score_thresholds(
	score_thresholds=[0.5,1]
)

[0.5, 1]

In [71]:
#| export
def determine_score_folder(
    anomaly_score: float,  # Anomaly score of the image
    score_thresholds: List[float]  # Score thresholds, in any order
) -> str:  # Returns the name of the folder the image belongs to
    """
    Map an anomaly score to the name of its threshold folder.

    The folder is named after the smallest threshold that is greater than or
    equal to the score. Scores above every threshold land in the folder of the
    largest threshold.

    Example:
        score_thresholds = [0.5, 1.0]
        - score 0.3 -> folder "0.5"
        - score 0.7 -> folder "1.0"
    """
    ascending = normalize_score_thresholds(score_thresholds)

    # Bucket used when no threshold covers the score.
    overflow_bucket = ascending[-1]

    covering = (limit for limit in ascending if anomaly_score <= limit)
    return str(next(covering, overflow_bucket))

In [72]:
# Example: look up the folder for the current `anomaly_score` with two thresholds.
score_thresholds = [0.5, 1]
determine_score_folder(anomaly_score, score_thresholds)

'1'

In [73]:
#| export
def get_image_parent_folder(
    image_path: Union[str, Path]  # Location of the image file
) -> str:  # Returns the name of the directory that holds the image
    """
    Return the name of the directory that directly contains the image.

    Example: 'first/second/image.png' -> 'second'
    """
    containing_dir = Path(image_path).parent
    return containing_dir.name


In [74]:
# Example: the sample image's parent folder name.
print(sm_img)
get_image_parent_folder(sm_img)

/home/hasan/Schreibtisch/projects/data/malacca/g_imgs/2462401115552714.png


'g_imgs'

In [75]:
#| export
def build_target_folder_path(
    output_dir: Path,  # Root of the organized output
    parent_folder: str,  # Name of the folder the image came from
    folder_name: str  # Name of the score folder
) -> Path:  # Returns the combined folder path
    """
    Join the three components into the folder an image is stored in.

    Result: output_dir/parent_folder/folder_name
    """
    return Path(output_dir) / parent_folder / folder_name


In [76]:
# Example: combine output directory, parent folder and score folder into the target path.
parent_folder = get_image_parent_folder(sm_img)
folder_name = '0.5'

fn_path = build_target_folder_path(
	OUTPUT_DIR, parent_folder, folder_name)
print(f'fn_path: {fn_path}')

fn_path: /home/hasan/Schreibtisch/projects/data/malacca/output/g_imgs/0.5


In [77]:
#| export

def copy_or_move_file(
    source_path: Union[str, Path],  # File to transfer
    dest_path: Union[str, Path],    # Where the file should end up
    copy_mode: bool = True,         # True copies the file, False moves it
    dry_run: bool = False           # True only reports the planned action
) -> None:
    """
    Transfer a file to `dest_path` by copying or moving it.

    The destination's parent directory is created when missing. In a dry run
    nothing is touched on disk; the planned action is printed instead.

    Example:
        copy_or_move_file('a.png', 'b/c.png', copy_mode=True, dry_run=True)
        # Dry run: Would copy a.png to b/c.png
    """
    src, dst = Path(source_path), Path(dest_path)

    if dry_run:
        verb = "copy" if copy_mode else "move"
        print(f"Dry run: Would {verb} {src} to {dst}")
        return

    dst.parent.mkdir(parents=True, exist_ok=True)

    if not copy_mode:
        shutil.move(str(src), str(dst))
        return
    shutil.copy2(src, dst)


In [None]:
# predict_image is not imported by any earlier cell, so a fresh kernel would fail here.
# NOTE(review): the module is inferred from the logger name in the recorded output — confirm.
from be_vision_ad_tools.inference.prediction_system import predict_image

# Run the model on the sample image; the result feeds the examples below.
rs = predict_image(
    model_path=MODEL_PATH,
    image_path=sm_img,
    heatmap_style='side_by_side',
    save_heatmap=False,
    show_heatmap=False,
    output_dir=OUTPUT_DIR,
    compress=True,
    jpeg_quality=95,
    device='cpu'
)

2025-11-10 16:05:56,728 - be_vision_ad_tools.inference.prediction_system - INFO - Predicting with .pt model on 2462401115552714.png


2025-11-10 16:05:57,550 - be_vision_ad_tools.inference.prediction_system - INFO - Prediction: NORMAL (Score: 0.0000)


In [94]:
# The prediction result is a dict; 'image_path' holds the path of the predicted image.
im_path = rs.get('image_path')
print(im_path)

/home/hasan/Schreibtisch/projects/data/malacca/g_imgs/2462401115552714.png


In [95]:
# 'anomaly_score' holds the score the images are organized by (rebinds the scratch value used above).
anomaly_score = rs.get('anomaly_score')
print(anomaly_score)

0.0


In [96]:
#| export
def validate_prediction_result(
    result: Dict[str, Any]  # One prediction result
) -> Tuple[Optional[str], Optional[float]]:  # Returns (image_path, anomaly_score), or (None, None)
    """
    Pull `image_path` and `anomaly_score` out of a prediction result.

    Both entries must be present and not None; otherwise the result counts as
    invalid and (None, None) is returned.
    """
    path_entry = result.get('image_path')
    score_entry = result.get('anomaly_score')

    if path_entry is not None and score_entry is not None:
        return path_entry, score_entry
    return None, None


In [99]:
# Example: extract path and score from the prediction result obtained above.
im_p, a_s = validate_prediction_result(rs)
print(im_p)
print(a_s)



/home/hasan/Schreibtisch/projects/data/malacca/g_imgs/2462401115552714.png
0.0


In [100]:
# Parent folder name of the predicted image.
get_image_parent_folder(im_p)

'g_imgs'

In [102]:
# Walk-through for one prediction: parent folder -> score folder -> target folder path.
score_thresholds = [0.5, 1]
parent_folder = get_image_parent_folder(im_p)
print(parent_folder)
folder_name = determine_score_folder(a_s, score_thresholds)
print(folder_name)
target_folder = build_target_folder_path(
	OUTPUT_DIR, parent_folder, folder_name)
print(target_folder)


g_imgs
0.5
/home/hasan/Schreibtisch/projects/data/malacca/output/g_imgs/0.5


In [103]:
#| export
def save_image_by_score(
    image_path: Union[str, Path],  # Path to the source image
    anomaly_score: float,  # Anomaly score for the image
    output_dir: Path,  # Base output directory
    score_thresholds: List[float],  # List of score thresholds
    dry_run: bool = False,  # If True, do not move or copy files
    copy_mode: bool = True  # If True, copy files; if False, move files
) -> Path:  # Returns the destination path
    """
    Save (copy or move) an image to the appropriate score folder.

    The destination is `output_dir/<image's parent folder>/<score folder>/<image name>`.
    With `dry_run=True` nothing is written or created on disk; the planned
    action (copy or move) is printed instead.

    Returns the destination path where the image was (or would be) saved.

    Raises:
        FileNotFoundError: if `image_path` does not exist.
    """
    image_path = Path(image_path)

    if not image_path.exists():
        raise FileNotFoundError(f"Image not found: {image_path}")

    # Get parent folder name
    im_folder = get_image_parent_folder(image_path)

    # Determine target folder
    folder_name = determine_score_folder(anomaly_score, score_thresholds)
    target_folder = build_target_folder_path(output_dir, im_folder, folder_name)

    # Create destination path
    dest_path = target_folder / image_path.name

    # copy_or_move_file creates the target folder for real transfers and, in a dry run,
    # only reports the action (naming copy vs. move correctly) without touching the disk.
    copy_or_move_file(image_path, dest_path, copy_mode=copy_mode, dry_run=dry_run)

    return dest_path

In [104]:
# Dry run: report the planned destination without copying the image.
dest = save_image_by_score(
	im_p,
	a_s,
	OUTPUT_DIR,
	score_thresholds,
	dry_run=True
)

Dry run: Would move /home/hasan/Schreibtisch/projects/data/malacca/g_imgs/2462401115552714.png to /home/hasan/Schreibtisch/projects/data/malacca/output/g_imgs/0.5/2462401115552714.png


In [105]:
# Thresholds currently used by the examples.
score_thresholds

[0.5, 1]

In [106]:
# Scratch version of the folder map: threshold string -> folder directly under OUTPUT_DIR.
f_map = {}
for i in score_thresholds:
	nm = str(i)
	f_p = Path(OUTPUT_DIR,nm)
	f_map[nm] = f_p
f_map

{'0.5': Path('/home/hasan/Schreibtisch/projects/data/malacca/output/0.5'),
 '1': Path('/home/hasan/Schreibtisch/projects/data/malacca/output/1')}

In [107]:
#| export
def create_score_folders(
    output_dir: Path,  # Directory that receives the score folders
    score_thresholds: List[float],  # Thresholds to create a folder for
) -> Dict[str, Path]:  # Returns {threshold as string: folder path}
    """
    Make one subdirectory of `output_dir` per score threshold.

    Each folder is named after `str(threshold)`. Existing folders are kept.
    The created mapping is printed and returned.
    """
    output_dir = Path(output_dir)

    folder_map = {}
    for value in score_thresholds:
        key = str(value)
        score_dir = output_dir / key
        score_dir.mkdir(parents=True, exist_ok=True)
        folder_map[key] = score_dir

    print(f"‚úÖ Created {len(folder_map)} score folders in {output_dir}")
    for key in sorted(folder_map):
        print(f"   üìÅ {key}: {folder_map[key]}")

    return folder_map

In [None]:

#| export
def process_single_image_result(
    result: Dict[str, Any],  # One prediction result
    output_dir: Path,  # Root of the organized output
    score_thresholds: List[float],  # Score thresholds
    copy_mode: bool  # True copies the image, False moves it
) -> Optional[Dict[str, Any]]:  # Returns info about the stored image, or None on failure
    """
    Store the image of one prediction result in its score folder.

    On success the returned dict holds 'folder_name', 'dest_path' (as string)
    and 'anomaly_score' (as float). Results without a path or score, and images
    that cannot be stored, are reported on stdout and yield None.
    """
    image_path, anomaly_score = validate_prediction_result(result)

    if image_path is None or anomaly_score is None:
        print(f"‚ö†Ô∏è  Skipping result with missing data: {result}")
        return None

    try:
        stored_at = save_image_by_score(
            image_path=image_path,
            anomaly_score=anomaly_score,
            output_dir=output_dir,
            score_thresholds=score_thresholds,
            copy_mode=copy_mode
        )
        return dict(
            folder_name=determine_score_folder(anomaly_score, score_thresholds),
            dest_path=str(stored_at),
            anomaly_score=float(anomaly_score),
        )
    except Exception as err:
        print(f"‚ùå Error processing {image_path}: {err}")
        return None

#| export
def initialize_folder_stats(
    score_thresholds: List[float]  # Score thresholds
) -> Dict[str, Dict[str, Any]]:  # Returns one empty stats record per threshold
    """
    Build the empty statistics table used while organizing images.

    Layout: {str(threshold): {'count': 0, 'images': [], 'scores': []}}
    Every threshold gets its own, independent lists.
    """
    stats_table = {}
    for value in score_thresholds:
        stats_table[str(value)] = {'count': 0, 'images': [], 'scores': []}
    return stats_table

#| export
def update_folder_stats(
    folder_stats: Dict[str, Dict[str, Any]],  # Statistics table, modified in place
    folder_name: str,  # Folder the image was stored in
    dest_path: str,  # Where the image was stored
    anomaly_score: float  # Score of the image
) -> None:
    """
    Record one stored image in the statistics of its folder.

    Increments the folder's count and appends the path and the score to its lists.
    """
    folder_stats[folder_name]['count'] += 1
    record = folder_stats[folder_name]
    record['images'].append(dest_path)
    record['scores'].append(anomaly_score)

#| export
def create_folder_metadata(
    folder_name: str,  # Name of the score folder (its threshold)
    stats: Dict[str, Any]  # Statistics record of that folder
) -> Dict[str, Any]:  # Returns the metadata record
    """
    Summarize a folder's statistics as a metadata record.

    The record holds the threshold, the image count, the average, minimum and
    maximum score (all 0.0 when the folder has no scores) and the image list.
    """
    scores = stats['scores']
    summary: Dict[str, Any] = {'threshold': folder_name, 'count': stats['count']}

    if scores:
        summary['avg_score'] = float(np.mean(scores))
        summary['min_score'] = float(np.min(scores))
        summary['max_score'] = float(np.max(scores))
    else:
        summary['avg_score'] = summary['min_score'] = summary['max_score'] = 0.0

    summary['images'] = stats['images']
    return summary

#| export
def save_folder_metadata(
    metadata_path: Path,  # File the metadata is written to
    metadata: Dict[str, Any]  # Metadata record
) -> None:
    """
    Write a metadata record to `metadata_path` as indented JSON.
    """
    with Path(metadata_path).open('w') as handle:
        json.dump(metadata, handle, indent=2)

#| export
def save_all_folder_metadata(
    folder_map: Dict[str, Path],  # Folder name -> folder path
    folder_stats: Dict[str, Dict[str, Any]]  # Folder name -> statistics record
) -> None:
    """
    Write a `metadata.json` into every folder that received at least one image.

    Folders with a count of zero are skipped.
    """
    print("\nüíæ Saving metadata...")
    for name, record in folder_stats.items():
        if not record['count'] > 0:
            continue
        save_folder_metadata(
            folder_map[name] / "metadata.json",
            create_folder_metadata(name, record),
        )

#| export
def print_organization_summary(
    score_thresholds: List[float],  # Score thresholds
    folder_stats: Dict[str, Dict[str, Any]],  # Folder name -> statistics record
    failed_count: int  # Number of images that could not be organized
) -> None:
    """
    Print a per-folder overview of the organized images.

    Folders are listed in ascending threshold order; empty folders are left
    out. The number of failed images is shown only when it is above zero.
    """
    print("\nüìä ORGANIZATION SUMMARY")
    print("=" * 70)

    for folder_name in map(str, sorted(score_thresholds)):
        n_images = folder_stats[folder_name]['count']
        if not n_images > 0:
            continue
        mean_score = np.mean(folder_stats[folder_name]['scores'])
        print(f"üìÅ Folder '{folder_name}': {n_images} images (avg score: {mean_score:.4f})")

    if failed_count > 0:
        print(f"\n‚ö†Ô∏è  Failed: {failed_count} images")

    print("\n‚úÖ Organization complete!")

#| export
def build_organization_stats(
    output_dir: Path,  # Root of the organized output
    score_thresholds: List[float],  # Score thresholds
    folder_stats: Dict[str, Dict[str, Any]],  # Folder name -> statistics record
    total_processed: int,  # Number of images organized successfully
    failed_count: int  # Number of images that failed
) -> Dict[str, Any]:  # Returns the overall statistics record
    """
    Assemble the statistics record returned after organizing images.

    Per folder only the count and the average score (0.0 without scores) are
    kept; the image lists are not included.
    """
    per_folder = {}
    for name, record in folder_stats.items():
        entry = {'count': record['count']}
        entry['avg_score'] = float(np.mean(record['scores'])) if record['scores'] else 0.0
        per_folder[name] = entry

    return {
        'output_dir': str(output_dir),
        'score_thresholds': score_thresholds,
        'folder_stats': per_folder,
        'total_processed': total_processed,
        'failed_count': failed_count
    }


In [32]:
#| export
# NOTE(review): `create_score_folders` is already defined in an earlier exported cell.
# This later definition is the one that stays bound to the name, so any change made
# only to the earlier copy has no effect. Consider keeping a single definition.
def create_score_folders(
    output_dir: Path,  # Base output directory
    score_thresholds: List[float],  # List of score thresholds
) -> Dict[str, Path]:  # Returns dict mapping threshold strings to folder paths
    """
    Create subdirectories for each score threshold.

    Each folder is created directly under `output_dir` and named `str(threshold)`;
    folders that already exist are left as they are. A summary of the created
    folders is printed.

    Returns a dictionary mapping threshold values (as strings) to their folder paths.
    """
    output_dir = Path(output_dir)
    folder_map = {}

    for threshold in score_thresholds:
        folder_name = str(threshold)
        folder_path = Path(output_dir, folder_name)
        folder_path.mkdir(parents=True, exist_ok=True)
        folder_map[folder_name] = folder_path

    print(f"‚úÖ Created {len(folder_map)} score folders in {output_dir}")
    # Sorted by folder name, i.e. as strings, not by numeric threshold value.
    for threshold, path in sorted(folder_map.items()):
        print(f"   üìÅ {threshold}: {path}")

    return folder_map

In [33]:
# Example: create the score folders below a relative 'output_dir' directory.
# NOTE(review): the relative path means the folders are created in the current working
# directory each time this cell runs — consider a temporary directory.
output_dir = Path('output_dir')
create_score_folders(output_dir, score_thresholds)

‚úÖ Created 2 score folders in output_dir
   üìÅ 0.5: output_dir/0.5
   üìÅ 1: output_dir/1


{'0.5': Path('output_dir/0.5'), '1': Path('output_dir/1')}

In [None]:
#| export
def organize_images_by_score(
    prediction_results: List[Dict[str, Any]],  # List of prediction results from predict_image_list
    output_dir: Union[str, Path],  # Base output directory
    score_thresholds: Optional[List[float]] = None,  # Score thresholds (None -> 0.3, 0.4, ..., 1.0)
    copy_mode: bool = True,  # If True, copy files; if False, move files
    save_metadata: bool = True  # If True, save metadata JSON for each folder
) -> Dict[str, Any]:  # Returns organization statistics
    """
    Organize images into folders based on their anomaly scores.

    Args:
        prediction_results: List of prediction results, each containing 'image_path' and 'anomaly_score'
        output_dir: Base directory where score folders will be created
        score_thresholds: List of threshold values (e.g., [0.5, 1.0] for simple two-folder setup).
            When None, the defaults from `normalize_score_thresholds` are used.
        copy_mode: Whether to copy (True) or move (False) images
        save_metadata: Whether to save JSON metadata for each folder

    Returns:
        Dictionary with organization statistics: output directory, thresholds,
        per-folder count and average score, number of processed and failed images.

    Results that lack a path or score, or whose image cannot be stored, are
    counted as failed and do not stop the run.

    NOTE(review): the score folders (and their metadata.json) are created at
    `output_dir/<threshold>`, while `save_image_by_score` stores each image at
    `output_dir/<image's parent folder>/<threshold>`. The metadata therefore does
    not sit next to the images it lists — confirm which layout is intended.
    """
    # A list literal as default would be shared between calls and is handed back to the
    # caller inside the returned statistics, so resolve the default per call instead.
    if score_thresholds is None:
        score_thresholds = normalize_score_thresholds(None)

    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    print("\nüóÇÔ∏è  ORGANIZING IMAGES BY ANOMALY SCORE")
    print("="*70)
    print(f"üìÇ Output directory: {output_dir}")
    print(f"üìä Score thresholds: {score_thresholds}")
    print(f"üìã Total images: {len(prediction_results)}")
    print(f"üîÑ Mode: {'COPY' if copy_mode else 'MOVE'}")

    # Create score folders
    folder_map = create_score_folders(output_dir, score_thresholds)

    # Initialize statistics
    folder_stats = initialize_folder_stats(score_thresholds)
    failed_count = 0

    print("\nüì¶ Processing images...")

    # Process each image
    for result in tqdm(prediction_results, desc="Organizing images"):
        processed = process_single_image_result(
            result, output_dir, score_thresholds, copy_mode
        )

        if processed is None:
            failed_count += 1
            continue

        # Update statistics
        update_folder_stats(
            folder_stats,
            processed['folder_name'],
            processed['dest_path'],
            processed['anomaly_score']
        )

    # Save metadata if requested
    if save_metadata:
        save_all_folder_metadata(folder_map, folder_stats)

    # Print summary
    print_organization_summary(score_thresholds, folder_stats, failed_count)

    # Build and return statistics
    return build_organization_stats(
        output_dir, score_thresholds, folder_stats,
        len(prediction_results) - failed_count, failed_count
    )

In [None]:
#| export
def create_posters_for_score_folders(
    output_dir: Union[str, Path],  # Base output directory with score folders
    image_index_df: pd.DataFrame,  # Dataframe with image indices
    score_thresholds: List[float],  # List of score thresholds
    images_per_poster: int = 20,  # Number of images per poster
    image_size: Tuple[int, int] = (224, 224),  # Size of each image in the poster
    grid_cols: int = 5,  # Number of columns in the grid
    annotate_with_index: bool = True,  # Whether to add index numbers
    font_size: int = 30  # Font size for index numbers
) -> Dict[str, List[Path]]:  # Returns dict mapping folder names to poster paths
    """
    Create posters for all score folders.

    For every threshold the folder ``output_dir / str(threshold)`` is scanned for
    image files. The images are split into chunks of `images_per_poster`, each
    chunk is copied into a throw-away sub-folder and handed to
    `create_poster_from_folder`, which writes ``poster_<NNN>.png`` into the score
    folder. Posters written by an earlier run are not treated as input images.

    Args:
        output_dir: Base directory containing score folders
        image_index_df: DataFrame with image indices (forwarded to the poster helper)
        score_thresholds: List of threshold values; ``str(threshold)`` is the folder name
        images_per_poster: How many images per poster (must be >= 1)
        image_size: Size of each image in the poster
        grid_cols: Number of columns in the grid
        annotate_with_index: Whether to annotate images with indices
        font_size: Font size for annotations

    Returns:
        Dictionary mapping folder names to list of poster paths. Folders that are
        missing, contain no images, or produced no poster are left out.

    Raises:
        ValueError: If `images_per_poster` is smaller than 1.
    """
    if images_per_poster < 1:
        raise ValueError(f"images_per_poster must be >= 1, got {images_per_poster}")

    output_dir = Path(output_dir)
    poster_paths = {}
    # Compared against the lower-cased suffix, so any letter case is accepted.
    image_extensions = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif'}

    print("\nüñºÔ∏è  CREATING POSTERS FOR SCORE FOLDERS")
    print("="*70)

    for threshold in score_thresholds:
        folder_name = str(threshold)
        folder_path = output_dir / folder_name

        if not folder_path.exists():
            print(f"‚ö†Ô∏è  Folder {folder_name} does not exist, skipping...")
            continue

        # Collect the image files of this folder. metadata.json is excluded by its
        # suffix; posters from a previous run are excluded by name so that running
        # the function twice does not build posters of posters.
        images_in_folder = sorted(
            entry for entry in folder_path.iterdir()
            if entry.is_file()
            and entry.suffix.lower() in image_extensions
            and not _is_generated_poster(entry)
        )

        if not images_in_folder:
            print(f"üìÅ Folder '{folder_name}': No images found")
            continue

        print(f"\nüìÅ Processing folder '{folder_name}': {len(images_in_folder)} images")

        # Ceiling division: the last poster may hold fewer images.
        num_posters = (len(images_in_folder) + images_per_poster - 1) // images_per_poster
        folder_poster_paths = []

        for poster_idx in range(num_posters):
            start_idx = poster_idx * images_per_poster
            poster_images = images_in_folder[start_idx:start_idx + images_per_poster]
            poster_output_path = folder_path / f"poster_{poster_idx + 1:03d}.png"

            # Throw-away folder holding only the images of this poster.
            temp_folder = folder_path / f"_temp_poster_{poster_idx}"
            # A crashed earlier run may have left this folder behind with other files.
            shutil.rmtree(temp_folder, ignore_errors=True)

            try:
                temp_folder.mkdir(exist_ok=True)
                for img in poster_images:
                    shutil.copy2(img, temp_folder / img.name)

                # NOTE(review): temp_folder already contains only this poster's images,
                # yet images_per_poster and poster_index are forwarded as well. The
                # helper is defined outside this cell - confirm it does not slice the
                # folder contents by poster_index a second time.
                poster_path = create_poster_from_folder(
                    folder_path=temp_folder,
                    image_index_df=image_index_df,
                    output_path=poster_output_path,
                    images_per_poster=images_per_poster,
                    poster_index=poster_idx,
                    image_size=image_size,
                    grid_cols=grid_cols,
                    annotate_with_index=annotate_with_index,
                    font_size=font_size,
                    title=f"Score Folder {folder_name}"
                )

                if poster_path:
                    folder_poster_paths.append(poster_path)
                    print(f"  ‚úÖ Created poster {poster_idx + 1}/{num_posters}: {len(poster_images)} images")

            finally:
                # Always remove the temp folder, also when copying or rendering failed.
                shutil.rmtree(temp_folder, ignore_errors=True)

        if folder_poster_paths:
            poster_paths[folder_name] = folder_poster_paths
            print(f"üìä Folder '{folder_name}': Created {len(folder_poster_paths)} poster(s)")

    print("\n‚úÖ Poster creation complete!")
    print(f"   Total folders processed: {len(poster_paths)}")
    print(f"   Total posters created: {sum(len(p) for p in poster_paths.values())}")

    return poster_paths


def _is_generated_poster(path: Path) -> bool:
    """Return True if `path` is named like a poster written by `create_posters_for_score_folders`.

    Posters are saved inside the score folder as ``poster_<digits>.png``.
    """
    prefix = "poster_"
    stem = path.stem
    return (
        path.suffix.lower() == ".png"
        and stem.startswith(prefix)
        and stem[len(prefix):].isdigit()
    )

In [None]:
#| export
def predict_and_organize_by_score(
    model_path: Union[str, Path],  # Path to the trained model
    image_list_file: Union[str, Path],  # Text file with image paths (one per line)
    output_dir: Union[str, Path],  # Base output directory for organized images
    score_thresholds: List[float] = [0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0],  # Score thresholds
    batch_id: Optional[str] = None,  # Optional batch identifier
    copy_mode: bool = True,  # If True, copy files; if False, move files
    save_metadata: bool = True,  # If True, save metadata JSON for each folder
    create_posters: bool = True,  # If True, create posters for each score folder
    images_per_poster: int = 20,  # Number of images per poster
    image_size: Tuple[int, int] = (224, 224),  # Size of each image in the poster
    grid_cols: int = 5,  # Number of columns in poster grid
    annotate_with_index: bool = True,  # Whether to add index numbers to images in posters
    font_size: int = 30,  # Font size for index annotations
    device: str = "auto",  # Device for inference ("auto", "cpu", "cuda")
    **kwargs  # Additional arguments passed to prediction function
) -> Dict[str, Any]:  # Returns combined prediction and organization results
    """
    Complete workflow: Predict anomaly scores, organize images, and create indexed posters.

    Steps performed, in order:
    0. Build the image index dataframe from `image_list_file` and save it as
       ``image_index.csv`` inside `output_dir`
    1. Run `predict_image_list_from_file_enhanced`
    2. Organize the images into score folders with `organize_images_by_score`
    3. Create posters with `create_posters_for_score_folders` (optional)

    NOTE(review): this function name is defined a second time further down in the
    notebook; the later definition is the one that stays bound once all cells
    have run. Confirm which of the two is meant to be exported.

    Args:
        model_path: Path to the trained anomaly detection model
        image_list_file: Text file containing paths to images (one per line)
        output_dir: Directory where score-based folders will be created
        score_thresholds: List of threshold values (customize to your needs)
            Examples:
            - [0.5, 1.0] for simple two-folder setup
            - [0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0] for fine-grained organization
        batch_id: Optional identifier for this batch
        copy_mode: Whether to copy (True) or move (False) images
        save_metadata: Whether to save JSON metadata for each folder
        create_posters: Whether to create image posters for each score folder
        images_per_poster: Number of images to include in each poster
        image_size: Size to resize each image to in posters
        grid_cols: Number of columns in the poster grid
        annotate_with_index: Whether to annotate images with their dataframe index
        font_size: Font size for index annotations
        device: Device to use for inference
        **kwargs: Additional arguments forwarded to the prediction function

    Returns:
        Dictionary containing:
        - image_index_df: DataFrame with image indices
        - image_index_df_path: Path (as string) of the saved ``image_index.csv``
        - prediction_results: The full output of the prediction function
        - organization_stats: Statistics about image organization
          (None when there were no prediction results)
        - poster_paths: Paths to created posters
          (None when create_posters=False or there were no prediction results)
    """
    print("\nüöÄ PREDICT AND ORGANIZE BY ANOMALY SCORE WITH INDEXED POSTERS")
    print("="*70)

    # Step 0: Create image index dataframe
    print("\nüìä Step 0: Creating image index dataframe...")
    image_index_df = create_image_index_dataframe(image_list_file)

    # Save the dataframe next to the organized images
    output_dir_path = Path(output_dir)
    output_dir_path.mkdir(parents=True, exist_ok=True)
    df_path = output_dir_path / "image_index.csv"
    image_index_df.to_csv(df_path, index=False)
    print(f"üíæ Saved image index dataframe to {df_path}")

    # Step 1: Run predictions
    print("\nüìä Step 1: Running predictions...")
    prediction_output = predict_image_list_from_file_enhanced(
        model_path=model_path,
        image_list_file=image_list_file,
        batch_id=batch_id,
        output_dir=output_dir,
        device=device,
        save_results=True,
        **kwargs
    )

    # Every exit path returns the same set of keys, so callers can index the
    # result without first checking which branch was taken.
    workflow_result = {
        'image_index_df': image_index_df,
        'image_index_df_path': str(df_path),
        'prediction_results': prediction_output,
        'organization_stats': None,
        'poster_paths': None
    }

    # Extract results
    prediction_results = prediction_output.get('results', [])

    if not prediction_results:
        print("‚ö†Ô∏è  No prediction results to organize!")
        return workflow_result

    print(f"‚úÖ Predictions complete: {len(prediction_results)} images processed")

    # Step 2: Organize images by score
    print("\nüìÅ Step 2: Organizing images by score...")
    workflow_result['organization_stats'] = organize_images_by_score(
        prediction_results=prediction_results,
        output_dir=output_dir,
        score_thresholds=score_thresholds,
        copy_mode=copy_mode,
        save_metadata=save_metadata
    )

    # Step 3: Create posters (optional)
    poster_paths = None
    if create_posters:
        print("\nüñºÔ∏è  Step 3: Creating indexed posters...")
        poster_paths = create_posters_for_score_folders(
            output_dir=output_dir,
            image_index_df=image_index_df,
            score_thresholds=score_thresholds,
            images_per_poster=images_per_poster,
            image_size=image_size,
            grid_cols=grid_cols,
            annotate_with_index=annotate_with_index,
            font_size=font_size
        )
    workflow_result['poster_paths'] = poster_paths

    print("\nüéâ WORKFLOW COMPLETE!")
    print("="*70)
    print(f"üìä Image index dataframe: {df_path}")
    print(f"üìÅ Organized images: {output_dir}")
    if poster_paths:
        total_posters = sum(len(p) for p in poster_paths.values())
        print(f"üñºÔ∏è  Created {total_posters} poster(s)")

    return workflow_result

In [None]:
#| export
def annotate_image_with_index(
    image: Union[Image.Image, np.ndarray],  # PIL Image or numpy array
    index: int,  # Index number to display
    font_size: int = 40,  # Font size for the index number
    position: str = "top_left",  # Position: "top_left", "top_right", "bottom_left", "bottom_right"
    text_color: Tuple[int, int, int] = (255, 255, 0),  # RGB color for text (yellow)
    bg_color: Tuple[int, int, int, int] = (0, 0, 0, 180)  # RGBA color for background (semi-transparent black)
) -> Image.Image:  # Returns annotated PIL Image
    """
    Add an index number to an image.

    The label ``#<index>`` is drawn on a filled rectangle in one corner of the
    image. Drawing happens on a transparent overlay that is alpha-composited onto
    a copy of the input, so the input image itself is not modified.

    Args:
        image: Input image (PIL Image or numpy array)
        index: Index number to display
        font_size: Size of the font
        position: Where to place the index number; unknown values fall back to "top_left"
        text_color: RGB tuple for text color
        bg_color: RGBA tuple for background color (includes alpha for transparency)

    Returns:
        PIL Image (mode "RGB") with index number annotated
    """
    # Convert to PIL Image if numpy array
    if isinstance(image, np.ndarray):
        image = Image.fromarray(image)

    # Work on an RGBA copy so the original is untouched and alpha blending works
    img_copy = image.copy().convert("RGBA")

    # Fully transparent layer that receives the box and the text
    overlay = Image.new('RGBA', img_copy.size, (255, 255, 255, 0))
    draw = ImageDraw.Draw(overlay)

    font = _load_index_font(font_size)
    text = f"#{index}"

    # textbbox reports where the ink lands relative to the drawing origin; the
    # left/top values are usually not zero, so they are needed again when drawing.
    ink_left, ink_top, ink_right, ink_bottom = draw.textbbox((0, 0), text, font=font)
    text_width = ink_right - ink_left
    text_height = ink_bottom - ink_top

    # Background box = text extent plus padding on every side
    padding = 10
    box_width = text_width + 2 * padding
    box_height = text_height + 2 * padding

    img_width, img_height = img_copy.size
    far_x = img_width - box_width - padding
    far_y = img_height - box_height - padding
    corners = {
        "top_left": (padding, padding),
        "top_right": (far_x, padding),
        "bottom_left": (padding, far_y),
        "bottom_right": (far_x, far_y),
    }
    x, y = corners.get(position, corners["top_left"])
    # On images smaller than the label the far corners would become negative;
    # keep the box anchored inside the image instead.
    x, y = max(x, 0), max(y, 0)

    # Draw semi-transparent background rectangle
    draw.rectangle(
        [x, y, x + box_width, y + box_height],
        fill=bg_color
    )

    # Shift the drawing origin by the ink offset so the glyphs sit exactly
    # `padding` pixels inside the box on every side.
    draw.text(
        (x + padding - ink_left, y + padding - ink_top),
        text,
        font=font,
        fill=text_color
    )

    # Composite the overlay onto the image and drop the alpha channel again
    return Image.alpha_composite(img_copy, overlay).convert("RGB")


@lru_cache(maxsize=8)
def _load_index_font(font_size: int):
    """Load the font used for index labels, best effort, cached per font size.

    Tries two common Linux TrueType fonts first, then Pillow's built-in default
    font at the requested size (the `size` argument exists in newer Pillow
    releases only), and finally the fixed-size built-in font.
    """
    font_files = (
        "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
        "/usr/share/fonts/truetype/liberation/LiberationSans-Bold.ttf",
    )
    for font_file in font_files:
        try:
            return ImageFont.truetype(font_file, font_size)
        except Exception:
            # Missing font file, missing FreeType support or invalid size: try the
            # next option. `Exception` keeps Ctrl-C / SystemExit working.
            continue

    try:
        return ImageFont.load_default(size=font_size)
    except Exception:
        # Older Pillow (no `size` argument) or no FreeType support.
        return ImageFont.load_default()

In [None]:
#| export
def create_image_index_dataframe(
    image_list: Union[List[Union[str, Path]], str, Path]  # List of images or path to text file
) -> pd.DataFrame:  # Returns dataframe with index and image paths
    """
    Build the lookup table that assigns a running index number to every image.

    Posters refer to images by this index number.

    Args:
        image_list: Either a list of image paths, or the path of a text file with
            one image path per line. In a file, blank lines and lines starting
            with ``#`` are ignored and surrounding whitespace is stripped.

    Returns:
        DataFrame with columns: ['index', 'image_path', 'image_name']

    Raises:
        FileNotFoundError: If a str/Path is given that is not an existing file.
    """
    if not isinstance(image_list, (str, Path)):
        # Already a collection of paths: normalise every entry to str
        entries = [str(item) for item in image_list]
    else:
        # A single str/Path is interpreted as the list file itself
        list_file = Path(image_list)
        if not (list_file.exists() and list_file.is_file()):
            raise FileNotFoundError(f"Image list file not found: {image_list}")

        with open(list_file, 'r') as handle:
            stripped_lines = (raw.strip() for raw in handle)
            entries = [
                entry for entry in stripped_lines
                if entry and not entry.startswith('#')
            ]

    file_names = [Path(entry).name for entry in entries]
    df = pd.DataFrame({
        'index': range(len(entries)),
        'image_path': entries,
        'image_name': file_names
    })

    print(f"üìä Created image index dataframe with {len(df)} images")

    return df

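## Image Indexing and Poster Creation

> The helpers for this section (`annotate_image_with_index`, `create_image_index_dataframe`, `create_posters_for_score_folders`) are defined in the cells above.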

## High-Level Workflow Function

In [None]:
#| export
def predict_and_organize_by_score(
    model_path: Union[str, Path],  # Path to the trained model
    image_list_file: Union[str, Path],  # Text file with image paths (one per line)
    output_dir: Union[str, Path],  # Base output directory for organized images
    score_thresholds: List[float] = [0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0],  # Score thresholds
    batch_id: Optional[str] = None,  # Optional batch identifier
    copy_mode: bool = True,  # If True, copy files; if False, move files
    save_metadata: bool = True,  # If True, save metadata JSON for each folder
    device: str = "auto",  # Device for inference ("auto", "cpu", "cuda")
    create_posters: bool = False,  # If True, also create indexed posters for each score folder
    images_per_poster: int = 20,  # Number of images per poster
    image_size: Tuple[int, int] = (224, 224),  # Size of each image in the poster
    grid_cols: int = 5,  # Number of columns in poster grid
    annotate_with_index: bool = True,  # Whether to add index numbers to images in posters
    font_size: int = 30,  # Font size for index annotations
    **kwargs  # Additional arguments passed to prediction function
) -> Dict[str, Any]:  # Returns combined prediction and organization results
    """
    Complete workflow: Predict anomaly scores and organize images into score-based folders.

    Steps performed, in order:
    0. (only with create_posters=True) Build the image index dataframe and save it
       as ``image_index.csv`` inside `output_dir`
    1. Run `predict_image_list_from_file_enhanced`
    2. Organize the images into score folders with `organize_images_by_score`
    3. (only with create_posters=True) Create posters with
       `create_posters_for_score_folders`

    This definition replaces any earlier definition of the same name in the
    module, so it accepts the poster options itself. Without them, passing
    e.g. ``create_posters=True`` would be forwarded through ``**kwargs`` to the
    prediction function. The poster options are appended after `device` and
    poster creation is opt-in, so existing calls behave as before.

    Args:
        model_path: Path to the trained anomaly detection model
        image_list_file: Text file containing paths to images (one per line)
        output_dir: Directory where score-based folders will be created
        score_thresholds: List of threshold values (customize to your needs)
            Examples:
            - [0.5, 1.0] for simple two-folder setup
            - [0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0] for fine-grained organization
        batch_id: Optional identifier for this batch
        copy_mode: Whether to copy (True) or move (False) images
        save_metadata: Whether to save JSON metadata for each folder
        device: Device to use for inference
        create_posters: Whether to create image posters for each score folder
        images_per_poster: Number of images to include in each poster
        image_size: Size to resize each image to in posters
        grid_cols: Number of columns in the poster grid
        annotate_with_index: Whether to annotate images with their dataframe index
        font_size: Font size for index annotations
        **kwargs: Additional arguments forwarded to the prediction function

    Returns:
        Dictionary containing:
        - prediction_results: The full output of the prediction function
        - organization_stats: Statistics about image organization
          (None when there were no prediction results)
        - image_index_df: DataFrame with image indices (None unless create_posters=True)
        - image_index_df_path: Path (as string) of the saved index CSV
          (None unless create_posters=True)
        - poster_paths: Paths to created posters
          (None unless create_posters=True and there were prediction results)
    """
    print("\nüöÄ PREDICT AND ORGANIZE BY ANOMALY SCORE")
    print("="*70)

    # Step 0 (posters only): index the images before anything is copied or moved,
    # so the numbers on the posters refer to the order of the input list.
    image_index_df = None
    df_path = None
    if create_posters:
        print("\nüìä Step 0: Creating image index dataframe...")
        image_index_df = create_image_index_dataframe(image_list_file)

        output_dir_path = Path(output_dir)
        output_dir_path.mkdir(parents=True, exist_ok=True)
        df_path = output_dir_path / "image_index.csv"
        image_index_df.to_csv(df_path, index=False)
        print(f"üíæ Saved image index dataframe to {df_path}")

    # Step 1: Run predictions
    print("\nüìä Step 1: Running predictions...")
    prediction_output = predict_image_list_from_file_enhanced(
        model_path=model_path,
        image_list_file=image_list_file,
        batch_id=batch_id,
        output_dir=output_dir,
        device=device,
        save_results=True,
        **kwargs
    )

    # Same set of keys on every exit path
    workflow_result = {
        'prediction_results': prediction_output,
        'organization_stats': None,
        'image_index_df': image_index_df,
        'image_index_df_path': str(df_path) if df_path is not None else None,
        'poster_paths': None
    }

    # Extract results
    prediction_results = prediction_output.get('results', [])

    if not prediction_results:
        print("‚ö†Ô∏è  No prediction results to organize!")
        return workflow_result

    print(f"‚úÖ Predictions complete: {len(prediction_results)} images processed")

    # Step 2: Organize images by score
    print("\nüìÅ Step 2: Organizing images by score...")
    workflow_result['organization_stats'] = organize_images_by_score(
        prediction_results=prediction_results,
        output_dir=output_dir,
        score_thresholds=score_thresholds,
        copy_mode=copy_mode,
        save_metadata=save_metadata
    )

    # Step 3 (posters only)
    if create_posters:
        print("\nüñºÔ∏è  Step 3: Creating indexed posters...")
        workflow_result['poster_paths'] = create_posters_for_score_folders(
            output_dir=output_dir,
            image_index_df=image_index_df,
            score_thresholds=score_thresholds,
            images_per_poster=images_per_poster,
            image_size=image_size,
            grid_cols=grid_cols,
            annotate_with_index=annotate_with_index,
            font_size=font_size
        )

    print("\nüéâ WORKFLOW COMPLETE!")
    print("="*70)
    if workflow_result['poster_paths']:
        total_posters = sum(len(p) for p in workflow_result['poster_paths'].values())
        print(f"üñºÔ∏è  Created {total_posters} poster(s)")

    return workflow_result

## Example Usage

```python
# Example 1: Simple two-folder organization (low vs high anomaly)
results = predict_and_organize_by_score(
    model_path="path/to/model.ckpt",
    image_list_file="path/to/images.txt",
    output_dir="organized_output",
    score_thresholds=[0.5, 1.0],  # Two folders: 0.5 (normal) and 1.0 (anomaly)
    copy_mode=True
)

# Example 2: Fine-grained organization with 8 score folders
results = predict_and_organize_by_score(
    model_path="path/to/model.ckpt",
    image_list_file="path/to/images.txt",
    output_dir="organized_output",
    score_thresholds=[0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0],
    copy_mode=True,
    save_heatmap=True,
    heatmap_style="side_by_side"
)

# Example 3: Custom thresholds
results = predict_and_organize_by_score(
    model_path="path/to/model.ckpt",
    image_list_file="path/to/images.txt",
    output_dir="organized_output",
    score_thresholds=[0.25, 0.5, 0.75, 1.0],  # Four folders
    copy_mode=False  # Move files instead of copying
)
```

## Tests

In [None]:
#| hide
# Table-driven checks for determine_score_folder:
# each case is (anomaly score, thresholds, expected folder name)
two_folder_setup = (0.5, 1.0)
fine_grained_setup = (0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0)
folder_cases = (
    (0.3, two_folder_setup, "0.5"),
    (0.7, two_folder_setup, "1.0"),
    (0.45, fine_grained_setup, "0.5"),
    (0.85, fine_grained_setup, "0.9"),
)
for case_score, case_thresholds, expected_folder in folder_cases:
    # A fresh list per call, exactly like passing a list literal
    test_eq(determine_score_folder(case_score, list(case_thresholds)), expected_folder)
print("‚úÖ All tests passed!")

In [None]:
#| hide
# Write the `#| export` cells of this notebook to the Python module
import nbdev
nbdev.nbdev_export()