In [None]:
from pathlib import Path
from typing import Any, Dict
import torch
import numpy as np
from IPython.display import display
from PIL import Image
from pytorch_lightning import Trainer
from torchvision.transforms import ToPILImage
import random
from functools import partial, update_wrapper
from types import MethodType
from typing import Any
import os
from torch import Tensor
import pandas
import seaborn as sns
from pandas import DataFrame

from torch.utils.data import DataLoader

from anomalib.config import get_configurable_parameters
from anomalib.data import get_datamodule
from anomalib.models import get_model
from anomalib.pre_processing.transforms import Denormalize
from anomalib.utils.callbacks import LoadModelCallback, get_callbacks



from matplotlib import pyplot as plt
import numpy as np
from pytorch_lightning import LightningModule, Trainer
from pytorch_lightning.callbacks import EarlyStopping, ModelCheckpoint
from torch.optim import Optimizer
from torch.optim.adam import Adam
from torch.utils.data import DataLoader

from anomalib.data import InferenceDataset
from anomalib.data.folder import Folder
from anomalib.models.fastflow.lightning_model import Fastflow
from anomalib.post_processing import superimpose_anomaly_map
from anomalib.pre_processing.transforms import Denormalize
from anomalib.utils.callbacks.visualizer import BaseVisualizerCallback
from anomalib.utils.callbacks import (
    LoadModelCallback,
    MetricsConfigurationCallback,
    MinMaxNormalizationCallback,
    )




In [None]:
# Model name. NOTE(review): no other cell visible in this notebook reads MODEL.
MODEL = "fastflow"  
# Replace the <...> placeholder with the path to the config file before running;
# as written, this line is not valid Python.
CONFIG_PATH = <PATH TO THE CONFIG FILE>

In [None]:
# Load the configuration stored at CONFIG_PATH.
config = get_configurable_parameters(config_path=CONFIG_PATH)
# Keep only the first entry of the dataset's image_size setting; the cells below pass it
# to InferenceDataset as image_size. NOTE(review): presumably images are square — confirm.
IMAGE_SIZE = config["dataset"]["image_size"][0]


In [None]:
# Build the model and the callbacks from the same configuration object.
model = get_model(config)
callbacks = get_callbacks(config)

In [None]:
# Create the trainer from the config's trainer settings plus the callbacks built from the same config.
trainer = Trainer(callbacks=callbacks, **config.trainer)


In [None]:
# Helper function to denormalize the image for visualization

# Per-channel mean and standard deviation used to undo the input normalization.
IMG_MEAN = [0.485, 0.456, 0.406]
IMG_STD = [0.229, 0.224, 0.225]


def denormalize2(tensor):
    """Undo the per-channel normalization of an image tensor and return a uint8 image array.

    Args:
        tensor: Normalized image of shape (3, H, W), or a single-image batch of shape
            (1, 3, H, W). H and W may differ.

    Returns:
        numpy.ndarray of shape (H, W, 3) and dtype uint8 with values in [0, 255].

    Raises:
        ValueError: If the input is not a single 3-channel image (for example a batch
            holding more than one image).
    """
    # Only a batch of exactly one image can be unambiguously reduced to a single image.
    if tensor.dim() == 4 and tensor.size(0) == 1:
        tensor = tensor.squeeze(0)
    if tensor.dim() != 3 or tensor.size(0) != len(IMG_MEAN):
        raise ValueError(
            "denormalize2 expects a (3, H, W) image or a (1, 3, H, W) batch, "
            f"got shape {tuple(tensor.shape)}"
        )

    # Work on a detached float32 CPU copy so the result can be converted to numpy.
    tensor = tensor.detach().to(dtype=torch.float32, device="cpu")

    # Shape (3, 1, 1) broadcasts over any spatial size, so no fixed image size is needed.
    mean = torch.tensor(IMG_MEAN).view(-1, 1, 1)
    std = torch.tensor(IMG_STD).view(-1, 1, 1)

    # Clamp to [0, 1] first: casting out-of-range floats to uint8 would wrap around
    # and show up as speckle artifacts in the displayed image.
    restored = (tensor * std + mean).clamp(0.0, 1.0)

    array = (restored * 255).permute(1, 2, 0).numpy().astype(np.uint8)
    return array


In [None]:
# Fill in the paths below before running: the two folders containing the images to run
# inference on, and the checkpoint file to load. The <...> placeholders are not valid
# Python and must be replaced.

NORMAL_IMAGES_PATH = <PATH TO NORMAL IMAGES>
ABNORMAL_IMAGES_PATH = <PATH TO ABNORMAL IMAGES>
CHECKPOINT_PATH = <PATH TO MODEL CHECKPOINT FILE>

# Pixel Threshold

The trainer's `predict` method returns a list of dictionaries, one per batch, each with the following keys:

- *image* contains a tensor of the image itself
- *image_path* contains the image's path
- *anomaly_maps* contains a tensor of the anomaly map of the image
- *pred_scores* contains a tensor of the anomaly score
- *pred_labels* contains a tensor holding a boolean that indicates the predicted class
- *pred_masks* contains a tensor of boolean values: pixels of the anomaly map greater than 0.5 are set to true and the rest to false


In [None]:
# Predict all the images of NORMAL_IMAGES_PATH and ABNORMAL_IMAGES_PATH and store the results in a DataFrame.
# Rows are collected in a plain list and turned into a DataFrame once at the end:
# DataFrame.append was removed in pandas 2.0 and re-copied the whole frame on every call.

prediction_records = []

# Pair each folder with its label instead of re-deriving the label by comparing paths.
for images_path, target in [(NORMAL_IMAGES_PATH, "Normal"), (ABNORMAL_IMAGES_PATH, "Abnormal")]:
    if target == "Abnormal":
        print("starting to predict abnormal images")
    print(images_path)
    # Number of directory entries in the folder, printed as a quick size check.
    print(len(os.listdir(images_path)))

    inference_dataset = InferenceDataset(path=images_path, image_size=IMAGE_SIZE)
    inference_dataloader = DataLoader(dataset=inference_dataset)

    predictions = trainer.predict(model=model, ckpt_path=CHECKPOINT_PATH, dataloaders=inference_dataloader)

    # .item() only works on a single-element tensor, i.e. one image per prediction.
    # 'image_path' is stored exactly as returned; the visualisation cell reads its first element.
    prediction_records.extend(
        {
            'prediction': prediction['pred_scores'].item(),
            'target': target,
            'image_path': prediction['image_path'],
        }
        for prediction in predictions
    )

# Passing `columns` keeps the expected schema even when no prediction was produced.
df = DataFrame(prediction_records, columns=['prediction', 'target', 'image_path'])


In [None]:
# Show the collected prediction scores, targets and image paths.
df

In [None]:
# Violin plot of the prediction scores, one violin per target class, with quartile lines inside.

sns.set_theme(style="whitegrid")
ax = sns.violinplot(data=df, x="target", y="prediction", inner="quartile", split=True)


In [None]:
NUMBER_OF_IMAGES_TO_SHOW = 5


def _add_panel(figure, position, picture, title):
    """Draw `picture` without axes in slot `position` of a 1x4 grid on `figure`."""
    panel_ax = figure.add_subplot(1, 4, position)
    panel_ax.axis("off")
    panel_ax.imshow(picture)
    panel_ax.set_title(title)


# Shuffle the row positions of `df`; the first few become the random sample to display.
row_order = list(range(len(df)))
random.shuffle(row_order)

# Extract the columns once rather than on every iteration.
all_image_paths = df["image_path"].tolist()
all_targets = df["target"].tolist()
all_scores = df["prediction"].tolist()

for row in row_order[:NUMBER_OF_IMAGES_TO_SHOW]:
    sample_path = all_image_paths[row][0]  # the stored entry is a sequence; the path is its first element
    sample_target = all_targets[row]
    sample_score = all_scores[row]

    fig = plt.figure(figsize=(16, 4), linewidth=0)

    # Run inference again on this single file to get its image tensor, anomaly map and mask.
    inference_dataset = InferenceDataset(path=sample_path, image_size=IMAGE_SIZE)
    inference_dataloader = DataLoader(dataset=inference_dataset)
    predictions = trainer.predict(
        model=model, ckpt_path=CHECKPOINT_PATH, dataloaders=inference_dataloader
    )[0]

    print(sample_path)
    print(sample_target)
    print(sample_score)

    # Panel 1: the input image, converted back to a displayable array.
    image = denormalize2(predictions["image"][0])
    _add_panel(fig, 1, image, f'\n\n {sample_target} Original Image')

    # Panel 2: the raw anomaly map, titled with the score from the DataFrame.
    anomaly_map = predictions["anomaly_maps"][0].cpu().numpy().squeeze()
    _add_panel(fig, 2, anomaly_map, f'\n\nAnomaly map with AS = {sample_score * 100:.1f}%')

    # Panel 3: the anomaly map superimposed on the image.
    heat_map = superimpose_anomaly_map(anomaly_map=anomaly_map, image=image, normalize=True)
    _add_panel(fig, 3, heat_map, '\n\nHeat map')

    # Panel 4: the predicted mask.
    pred_masks = predictions["pred_masks"][0].squeeze().cpu().numpy()
    _add_panel(fig, 4, pred_masks, '\n\nMask')

    plt.show()


# Area Threshold

Here the anomaly score is defined as the number of anomaly-map pixels with a value above 0.5, divided by the total number of pixels in the image.

In [None]:
# Run inference on both folders and keep, besides the score table, the raw prediction
# dictionaries in `lstpred` (same order as the rows of `dfarea`).
# Rows are collected in a plain list and turned into a DataFrame once at the end:
# DataFrame.append was removed in pandas 2.0 and re-copied the whole frame on every call.
lstpred = []
area_records = []

# Pair each folder with its label instead of re-deriving the label by comparing paths.
for images_path, target in [(NORMAL_IMAGES_PATH, "Normal"), (ABNORMAL_IMAGES_PATH, "Abnormal")]:
    if target == "Abnormal":
        print("starting to predict abnormal images")
    print(images_path)
    # Number of directory entries in the folder, printed as a quick size check.
    print(len(os.listdir(images_path)))

    inference_dataset = InferenceDataset(path=images_path, image_size=IMAGE_SIZE)
    inference_dataloader = DataLoader(dataset=inference_dataset, num_workers=16)

    predictions = trainer.predict(model=model, ckpt_path=CHECKPOINT_PATH, dataloaders=inference_dataloader)

    # Extend in place; `lstpred + predictions` would copy the accumulated list each time.
    lstpred.extend(predictions)

    # .item() only works on a single-element tensor, i.e. one image per prediction.
    area_records.extend(
        {
            'prediction': prediction['pred_scores'].item(),
            'target': target,
            'image_path': prediction['image_path'],
        }
        for prediction in predictions
    )

# Passing `columns` keeps the expected schema even when no prediction was produced.
dfarea = DataFrame(area_records, columns=['prediction', 'target', 'image_path'])



In [None]:
# Calculate the anomaly score as defined above and rewrite the prediction score in the dataframe.

pixthresh = 0.5 # Set pixel threshold value equal to 0.5 which is equal to the threshold used for the mask.


def area_anomaly_score(anomaly_map, threshold):
    """Return the fraction of pixels in `anomaly_map` whose value is strictly above `threshold`.

    Args:
        anomaly_map: Tensor holding one anomaly map; singleton dimensions are squeezed away.
        threshold: Pixel value above which a pixel counts as anomalous.

    Returns:
        float: Number of pixels above the threshold divided by the total number of pixels.
    """
    anomaly_map = anomaly_map.cpu().numpy().squeeze()
    # Vectorised count; dividing by the map's own size also covers non-square maps.
    return np.count_nonzero(anomaly_map > threshold) / anomaly_map.size


# Work on a copy so `dfarea` keeps the model's original scores.
anomaldf = dfarea.copy(deep=True)
# `lstpred` and the rows of `dfarea` were filled in the same order, so the scores line up
# row by row; pandas raises a ValueError if the two lengths ever differ.
anomaldf['prediction'] = [area_anomaly_score(pred["anomaly_maps"], pixthresh) for pred in lstpred]

In [None]:
# Show the table whose prediction column now holds the area-based scores.
anomaldf

In [None]:
# Violin plot of the area-based scores, one violin per target class, with quartile lines inside.

sns.set_theme(style="whitegrid")
ax = sns.violinplot(data=anomaldf, x="target", y="prediction", inner="quartile", split=True)
