# Calculate Anomaly Maps & Performance Metrics

Author(s): Peer Schütt

In [None]:
import sys, os, glob

parent_dir = os.path.abspath(os.path.join(os.getcwd(), '..'))
sys.path.append(parent_dir)
import json
import numpy as np
import torch
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from tqdm import tqdm


from autoencoder import Autoencoder
from MultispectralImageDataset import MultispectralImageDataset
from utils import save_img_comparisons, loss_func
import cv2 as cv
from utils import find_image, load_all_img_paths, labelme_plot_with_custom_cmap, set_global_random_seed
import matplotlib.pyplot as plt

from pathlib import Path

from labelme import utils as labelme_utils

device = "cpu"

## Model selection and image prediction

### Select the model

In [None]:
# pretrained_model_name = "2025-04-18_16-14-34_musero_autoencoder_NIR" # NIR 1024
# pretrained_model_name = "2025-04-19_12-45-55_musero_autoencoder_NIR" # NIR 2048
pretrained_model_name = "2025-04-18_09-48-55_musero_autoencoder_VIS" # VIS 1024
# pretrained_model_name = "2025-04-24_13-32-54_musero_autoencoder_VIS" # VIS 2048

img_type = "VIS" if "VIS" in pretrained_model_name else "NIR"

### Get all image names for which we have labels

In [None]:
labelled_files_folder_path = "../sample_images/"
all_labelled_file_names = []
for filename in glob.glob(f"{labelled_files_folder_path}*.json"):
    if (img_type in filename) and ("2025" in filename):
        all_labelled_file_names.append(os.path.basename(filename).replace(".json", ""))


### Load the model

In [None]:
# Parent folder of all the data folders
data_folder = "../sample_images/"

pretrained_model_folder = "../trained_models/"
pretrained_model_full_path = pretrained_model_folder + pretrained_model_name

whole_dict = None
with open(pretrained_model_full_path+".json") as f:
    whole_dict = json.load(f)
config = whole_dict["config"]
config["batch_size"] = int(np.min([config["batch_size"], len(all_labelled_file_names)]))
dense_layer_dim = config["dense_layer_dim"]

set_global_random_seed(config)

transform = transforms.Compose([
        transforms.CenterCrop([config['img_size_x'], config['img_size_y']])
    ])
model = Autoencoder(**config).to(device)

model.load_state_dict(torch.load(pretrained_model_full_path+".pth", weights_only=True))
model.eval()

dataset_params = {
    'transform': transform,
    'img_type': config["img_type"],
    'channels_to_use': config['channels_to_use'],
    'device': device,
    'overfit': False,
    'augment_data': False
}   

# Create data loaders with common parameters
dataloader_params = {
    'batch_size': config['batch_size'],
    'num_workers': 4,
    'pin_memory': True
}

### Load all original .npy images for each labelled image we have 

In [None]:
test_img_paths = [find_image(testing_img_name) for testing_img_name in all_labelled_file_names]
 
test_dataset = MultispectralImageDataset(test_img_paths, **dataset_params)
test_data_loader = DataLoader(test_dataset, shuffle=False, **dataloader_params)

In [None]:
img_save_path = "../images_paper/"

### Predict all images

In [None]:
with torch.no_grad():
    for batch_idx, (corrected_images, raw_images, img_full_path) in enumerate(tqdm(test_data_loader)):
        raw_images = raw_images.to(device)
        batch_size = raw_images.shape[0]
        
        # Forward pass
        outputs = model(raw_images)

        loss_image = loss_func(raw_images, outputs, per_img_and_pixel=True).cpu().detach().numpy()

        # save_img_comparisons(
        #     raw_images, outputs, img_full_path, [0,1,2,3,4,5,6,7], prefix=f"{img_save_path}channel_comparison_{testing_img_name}.jpg", dpi = 100
        # )

## Postprocessing

### Load thresholds

In [None]:
thresholds = np.load(f"{pretrained_model_full_path}/thresholds_val_95.npy", allow_pickle=True)
thresholds_reshaped = thresholds[:, np.newaxis, np.newaxis]

### What component analysis do you want to do?

In [None]:
### for component analysis ###
# specific channels
use_RGB_only = False # default False
use_single_channel = False  # default False
single_channel = 1

# votings
use_intersection = False # default False
use_majority = False # default False
use_union = True # default True
assert (int(use_intersection) + int(use_majority) + int(use_union)) == 1, "You can only use one voting approach!"

# denoising
use_denoising = True # default True

save_images_for_paper = False

In [None]:
if use_RGB_only is True:
    print("Performing ablation study: ", "Only using RGB channels")
    
    if img_type != "VIS":
        print("A VIS model is required!")
        exit(-1)

### Perform postprocessing

In [None]:
# threshold the values
# loss_image_squeeze = loss_image
thresholded_loss_image = loss_image.copy()
thresholded_loss_image[loss_image<thresholds_reshaped] = 0.
thresholded_loss_image[loss_image>=thresholds_reshaped] = 1.

if use_union is True:
    vote_threshold = 1
elif use_majority is True:
    if use_RGB_only is True:
        vote_threshold = 2
    elif use_single_channel is True:
        vote_threshold = 1
    else:
        vote_threshold = 4
elif use_intersection is True:
    if use_RGB_only is True:
        vote_threshold = 3
    elif use_single_channel is True:
        vote_threshold = 1
    else:
        vote_threshold = 8

# perform voting
if use_RGB_only:
    channel_sum = np.sum(thresholded_loss_image[:,[0,2,6]], axis=1)
elif use_single_channel:
    channel_sum = thresholded_loss_image[:,single_channel].copy()
else:
    channel_sum = np.sum(thresholded_loss_image[:,0:8], axis=1)

threshold_vote = (channel_sum >= vote_threshold).astype(np.uint8)

postprocessing_result = threshold_vote.copy()

if use_denoising is True:
    # perform denoising
    
    ## MORPHOLOGICAL
    # https://docs.opencv.org/4.x/d9/d61/tutorial_py_morphological_ops.html
    open_kernel = np.ones((2,2),np.uint8)
    # opening_vote = cv.morphologyEx(threshold_vote, cv.MORPH_OPEN, open_kernel)
    opening_vote = np.stack([cv.morphologyEx(img, cv.MORPH_OPEN, open_kernel) 
                            for img in threshold_vote])

    # close_kernel = np.ones((3,3),np.uint8)
    # # closing_vote = cv.morphologyEx(opening_vote, cv.MORPH_CLOSE, close_kernel, iterations=2)
    # closing_vote = np.stack([cv.morphologyEx(img, cv.MORPH_CLOSE, close_kernel, iterations=1) 
    #                         for img in opening_vote])
    
    # postprocessing_result = closing_vote.copy()
    # postprocessing_result = opening_vote.copy()
    
    ## CV MEDIAN BLUR
    median = np.stack([cv.medianBlur(img, 5)
                            for img in opening_vote])  # 5x5 kernel
    # median_tresh = np.stack([cv.medianBlur(img, 5)
    #                         for img in threshold_vote])  # 5x5 kernel
    postprocessing_result = median.copy()   

In [None]:
if save_images_for_paper is True:
    channel_id_for_paper = 1
    batch_id_to_save = 0
    testing_img_name = Path(test_img_paths[batch_id_to_save]).stem

    # input image
    input_image = np.load(test_img_paths[batch_id_to_save])
    plt.axis('off')
    plt.imshow(input_image, cmap="viridis")
    plt.savefig(f'{img_save_path}input_{testing_img_name}.jpg', dpi = 100, bbox_inches="tight", pad_inches = 0)
    plt.close()

    # reconstructed image channel_id_for_paper
    out_pan_image = outputs[batch_id_to_save,channel_id_for_paper,:,:].cpu().detach().numpy()
    plt.axis('off')
    plt.imshow(out_pan_image, cmap="viridis")
    plt.savefig(f'{img_save_path}recon_c{channel_id_for_paper}_{testing_img_name}.jpg', bbox_inches='tight', dpi = 100, pad_inches = 0)
    plt.close()

    # loss image channel_id_for_paper
    plt.axis('off')
    plt.imshow(loss_image[batch_id_to_save,channel_id_for_paper,:,:], cmap="viridis_r")
    plt.savefig(f'{img_save_path}loss_c{channel_id_for_paper}_{testing_img_name}.jpg', bbox_inches='tight', dpi = 100, pad_inches = 0)
    plt.close()

    # thresholded image channel_id_for_paper
    plt.axis('off')
    plt.imshow(thresholded_loss_image[batch_id_to_save, channel_id_for_paper,:,:], cmap="viridis_r")
    plt.savefig(f'{img_save_path}threshold_c{channel_id_for_paper}_{testing_img_name}.jpg', bbox_inches='tight', dpi = 100, pad_inches = 0)
    plt.close()

    # voting image
    plt.axis('off')
    plt.imshow(threshold_vote[batch_id_to_save,:,:], cmap="viridis_r")
    plt.savefig(f'{img_save_path}vote{vote_threshold}_{testing_img_name}.jpg', bbox_inches='tight', dpi = 100, pad_inches = 0)
    plt.close()
    
    # denoised image
    plt.axis('off')
    plt.imshow(postprocessing_result[batch_id_to_save], cmap="viridis_r")
    plt.savefig(f'{img_save_path}morph_{testing_img_name}.jpg', bbox_inches='tight', dpi = 100, pad_inches = 0)
    plt.close()

In [None]:
plt.axis("off")
plt.imshow(postprocessing_result[0], cmap="viridis_r")
plt.show()

### Load labelled images

In [None]:
evaluate_keys = ['ErdeXDuenger_1p1_100', 'ErdeXDuenger_1p1_50', 'ErdeXDuenger_1p4_100', 'ErdeXDuenger_1p4_50', 'Erde_100', 'Erde_50', 'Sand_20', 'Sand_50', 'Waschpulver_25', 'Waschpulver_50', "Ethanol", 'Spectralon_x']
label_name_to_value = dict(zip(evaluate_keys, np.arange(2, len(evaluate_keys)+2)))

value_to_label_name = {v: k for k, v in label_name_to_value.items()}

# keys_present = list(set(evaluate_keys) & set(label_name_to_value.keys()))
# labels_to_evaluate = [label_name_to_value[key] for key in keys_present]

batched_label_mask = np.zeros(postprocessing_result.shape, dtype=np.int8)
batched_label_mask_road = np.zeros(postprocessing_result.shape, dtype=np.int8)

for idx, filename in enumerate(all_labelled_file_names):
# for idx, filename in enumerate([testing_img_name]):
    json_file = labelled_files_folder_path + filename + ".json"

    data = json.load(open(json_file))
    input_img = labelme_utils.img_b64_to_arr(data.get("imageData"))

    label_mask = np.zeros(input_img.shape, dtype=np.int8)
    label_mask_road = np.zeros(input_img.shape, dtype=np.int8)

    for i, shape in enumerate(data['shapes']):
        if shape["label"] == "road":
            points = shape['points']
            points = np.array(points, dtype=np.int16)
            shape_mask = labelme_utils.shape_to_mask(
                input_img.shape, points, shape_type=shape['shape_type']
            )
            label_mask[shape_mask] = 1
            label_mask_road[shape_mask] = 1
        else:
            continue

    # Fill in the labeled regions
    for i, shape in enumerate(data['shapes']):
        
        label = shape["label"]    
        points = shape['points']
        # Convert to numpy array
        points = np.array(points, dtype=np.int32)
        
        # Create binary mask for this shape (1 for shape, 0 elsewhere)
        shape_mask = labelme_utils.shape_to_mask(
            input_img.shape, points, shape_type=shape['shape_type']
        )
        mask_value = 0
        try: 
            mask_value = label_name_to_value[label]
        except:
            mask_value = 0
            if label != "road":
                print("No entry found for ", label) 
            continue
        
        # Set class index (leave 0 and 1 for background and road)
        label_mask[shape_mask] = mask_value   
    
    label_mask_road = torch.tensor(label_mask_road)
    label_mask_road = transform(label_mask_road)
    label_mask_road = label_mask_road.numpy()
    
    label_mask = torch.tensor(label_mask)
    label_mask = transform(label_mask)
    label_mask = label_mask.numpy()
    
    batched_label_mask[idx] = label_mask
    batched_label_mask_road[idx] = label_mask_road


# how many pixels are on the road?
# y_pred = postprocessing_result
road_mask = batched_label_mask_road & postprocessing_result
b = np.logical_and(batched_label_mask_road, postprocessing_result)
    
final_prediction = road_mask.copy()

In [None]:
# road mask
plt.imshow(batched_label_mask_road[0,:,:])
plt.show()

In [None]:
# postprocessing result before road mask is applied
plt.imshow(postprocessing_result[0,:,:])
plt.show()

In [None]:
# postprocessing result after road mask is applied
plt.imshow(road_mask[0])
plt.show()

In [None]:
if save_images_for_paper is True:
    plt.axis('off')
    plt.imshow(road_mask[batch_id_to_save], cmap="viridis_r")
    plt.savefig(f'{img_save_path}final_AoI_{testing_img_name}.jpg', bbox_inches='tight', dpi = 100, pad_inches = 0)
    plt.close()

## Calculate performance measures

### Define which substances we want to test

In [None]:
# only report these labels for the paper
# the spectralon isn't a substance we want to test
value_to_paper_label_name = {
2: 'SoFe 1p1 100',
3: 'SoFe 1p1 50',
4: 'SoFe 1p4 100',
5: 'SoFe 1p4 50',
6: 'So 100',
7: 'So 50',
8: 'Sa 20',
9: 'Sa 50',
10: 'WP 25',
11: 'WP 50',
12: 'Ethanol',
13: 'Spectralon'
}

### Performance across images over all classes

We can use these to report all measures (Precision, Recall, F1, IoU)

In [None]:
batched_TP_across_all_classes = np.zeros((len(all_labelled_file_names)))
batched_FP_across_all_classes = np.zeros((len(all_labelled_file_names)))
batched_FN_across_all_classes = np.zeros((len(all_labelled_file_names)))
batched_TN_across_all_classes = np.zeros((len(all_labelled_file_names)))

for batch_idx, label_mask in enumerate(batched_label_mask): 
    
    y_pred = final_prediction[batch_idx]
    
    gt_mask_all_classes = np.zeros_like(y_pred)
    gt_mask_all_classes[np.isin(label_mask, list(value_to_paper_label_name.keys()))] = 1
    
    TP_across_all_classes = np.sum((y_pred == 1) & gt_mask_all_classes)
    FP_across_all_classes = np.sum((y_pred == 1) & ~gt_mask_all_classes)
    FN_across_all_classes = np.sum((y_pred == 0) & gt_mask_all_classes)
    TN_across_all_classes = np.sum((y_pred == 0) & ~gt_mask_all_classes)
    
    batched_TP_across_all_classes[batch_idx] = TP_across_all_classes
    batched_FP_across_all_classes[batch_idx] = FP_across_all_classes
    batched_FN_across_all_classes[batch_idx] = FN_across_all_classes
    batched_TN_across_all_classes[batch_idx] = TN_across_all_classes

TP_sum_all_images_across_all_classes = np.sum(batched_TP_across_all_classes)
FP_sum_all_images_across_all_classes = np.sum(batched_FP_across_all_classes)
FN_sum_all_images_across_all_classes = np.sum(batched_FN_across_all_classes)
TN_sum_all_images_across_all_classes = np.sum(batched_TN_across_all_classes)

precision_all_images_across_all_classes = TP_sum_all_images_across_all_classes / (TP_sum_all_images_across_all_classes + FP_sum_all_images_across_all_classes) if (TP_sum_all_images_across_all_classes + FP_sum_all_images_across_all_classes) > 0 else 0
recall_all_images_across_all_classes = TP_sum_all_images_across_all_classes / (TP_sum_all_images_across_all_classes + FN_sum_all_images_across_all_classes) if (TP_sum_all_images_across_all_classes + FN_sum_all_images_across_all_classes) > 0 else 0
F1_all_images_across_all_classes = 2 * precision_all_images_across_all_classes * recall_all_images_across_all_classes / (precision_all_images_across_all_classes + recall_all_images_across_all_classes)
F2_all_images_across_all_classes = 5 * precision_all_images_across_all_classes * recall_all_images_across_all_classes / (4*precision_all_images_across_all_classes + recall_all_images_across_all_classes)
iou_all_images_across_all_classes = TP_sum_all_images_across_all_classes / (TP_sum_all_images_across_all_classes + FP_sum_all_images_across_all_classes + FN_sum_all_images_across_all_classes) if (TP_sum_all_images_across_all_classes + FP_sum_all_images_across_all_classes + FN_sum_all_images_across_all_classes) > 0 else 0

### LATEX: Precision / Recall / F1 / IoU overall

In [None]:
column_names = " & Precision & Recall & F1 & F2 & IoU"
entries = f"{img_type} {dense_layer_dim} & {precision_all_images_across_all_classes:.3f} & {recall_all_images_across_all_classes:.3f} & {F1_all_images_across_all_classes:.3f} & {F2_all_images_across_all_classes:.3f} & {iou_all_images_across_all_classes:.3f}"

print(f"RESULTS for {pretrained_model_name}")
print(column_names +  "\n" + entries)

### Performance per image and class

In [None]:

batched_results = []

batched_TP = np.zeros((len(all_labelled_file_names), max(label_name_to_value.values())+1))
batched_FP = np.zeros((len(all_labelled_file_names), max(label_name_to_value.values())+1))
batched_FN = np.zeros((len(all_labelled_file_names), max(label_name_to_value.values())+1))
batched_TN = np.zeros((len(all_labelled_file_names), max(label_name_to_value.values())+1))

for batch_idx, label_mask in enumerate(batched_label_mask):
    
    y_true = label_mask
    y_pred = final_prediction[batch_idx]
    
    # Assuming:
    # y_true: ground truth with multiple classes (2-13)
    # y_pred: binary prediction (0/1)
    results = {}
    for class_id in label_name_to_value.values():
        # Create binary mask for this ground truth class
        gt_mask = (y_true == class_id)
        
        # Calculate metrics for this class
        TP = np.sum((y_pred == 1) & gt_mask)
        FP = np.sum((y_pred == 1) & ~gt_mask)
        FN = np.sum((y_pred == 0) & gt_mask)
        TN = np.sum((y_pred == 0) & ~gt_mask)
                
        # Calculate metrics
        precision = TP / (TP + FP) if (TP + FP) > 0 else 0
        recall = TP / (TP + FN) if (TP + FN) > 0 else 0
        iou = TP / (TP + FP + FN) if (TP + FP + FN) > 0 else 0
        
        # Store results
        results[class_id] = {
            'true_positives': TP,
            'false_positives': FP,
            'false_negatives': FN,
            'precision': precision,
            'recall': recall,
            'iou': iou
        }    
        
        batched_TP[batch_idx, class_id] = TP
        batched_FP[batch_idx, class_id] = FP
        batched_FN[batch_idx, class_id] = FN
        batched_TN[batch_idx, class_id] = TN
        
    # # Print results
    # for class_id, metrics in results.items():
    #     print(f"Class {value_to_label_name[class_id]}:")
    #     print(f"  TP: {metrics['true_positives']}, FN: {metrics['false_negatives']}")
    #     print(f"  FP: {metrics['false_positives']}")
    #     # print(f"  Precision: {metrics['precision']:.4f}")
    #     print(f"  Recall: {metrics['recall']:.4f}")
    #     # print(f"  IoU: {metrics['iou']:.4f}")


### Performance across images over specific class

Only take Recall from these values!

In [None]:
results_over_all_images = {}
results_over_all_images_specific_classes = {}
TP_sum_all_images, FP_sum_all_images, FN_sum_all_images, TN_sum_all_images = 0,0,0,0

for class_id in label_name_to_value.values():
    TP_sum = np.sum(batched_TP[:,class_id])
    FP_sum = np.sum(batched_FP[:,class_id])
    FN_sum = np.sum(batched_FN[:,class_id])
    TN_sum = np.sum(batched_TN[:,class_id])
    
    TP_sum_all_images += TP_sum
    FP_sum_all_images += FP_sum
    FN_sum_all_images += FN_sum
    TN_sum_all_images += TN_sum
    
    # Calculate metrics
    precision_all = TP_sum / (TP_sum + FP_sum) if (TP_sum + FP_sum) > 0 else 0
    recall_all = TP_sum / (TP_sum + FN_sum) if (TP_sum + FN_sum) > 0 else 0
    iou_all = TP_sum / (TP_sum + FP_sum + FN_sum) if (TP_sum + FP_sum + FN_sum) > 0 else 0
    
    results_over_all_images[class_id] = {
        'true_positives': TP_sum,
        'false_positives': FP_sum,
        'false_negatives': FN_sum,
        "true_negatives": TN_sum,
        'precision': precision_all,
        'recall': recall_all,
        'iou': iou_all
    }

# Calculate metrics
precision_all_images = TP_sum_all_images / (TP_sum_all_images + FP_sum_all_images) if (TP_sum_all_images + FP_sum_all_images) > 0 else 0
recall_all_images = TP_sum_all_images / (TP_sum_all_images + FN_sum_all_images) if (TP_sum_all_images + FN_sum_all_images) > 0 else 0
F1_all_images = 2 * precision_all_images * recall_all_images / (precision_all_images + recall_all_images)
iou_all_images = TP_sum_all_images / (TP_sum_all_images + FP_sum_all_images + FN_sum_all_images) if (TP_sum_all_images + FP_sum_all_images + FN_sum_all_images) > 0 else 0
    
results_over_all_images_specific_classes = {
    'true_positives': TP_sum_all_images,
    'false_positives': FP_sum_all_images,
    'false_negatives': FN_sum_all_images,
    "true_negatives": TN_sum_all_images,
    'precision': precision_all_images,
    'recall': recall_all_images,
    'iou': iou_all_images
}

### LATEX: Print resulting table for paper

In [None]:
print(f"RESULTS for {pretrained_model_name}")

import pandas as pd
# Extract only the recall values for each class
recall_values = {f"{img_type} {dense_layer_dim}": {f"{value_to_paper_label_name[class_id]}": results_over_all_images[class_id]['recall'] for class_id in value_to_paper_label_name.keys()}}

# recall_values_all = {"Overall": results_over_all_images_specific_classes['recall']}

# # merge both dicts with "Overall" as first entry
# recall_values[f"{img_type} {dense_layer_dim}"] = {**recall_values_all, **recall_values[f"{img_type} {dense_layer_dim}"]}

# Create the DataFrame with classes as columns
df = pd.DataFrame(recall_values).T
# Format to 3 decimal places
df = df.round(3)
# Convert to LaTeX table
# latex_table = df.to_latex(index=True, index_names=False)
# print(latex_table)
# For a more professional table with booktabs style
latex_table_booktabs = df.to_latex(
    index=True,
    index_names=False,
    caption="Recall Values by Class",
    label="tab:recall_values",
    position="htbp",
    column_format="l" + "c" * len(df.columns), 
    float_format="{:0.3f}".format
)
print(latex_table_booktabs)