<a href="https://colab.research.google.com/github/arminak6/Control-Quality/blob/Dev/Code/anomalib.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install anomalib



In [2]:
!pip install lightning
!pip install kornia
!pip install FrEIA
!pip install python-dotenv
!pip install open-clip-torch
!pip install openvino
!pip install openvino-dev
!pip install onnx



In [18]:
from anomalib.data import Folder
from anomalib.models import EfficientAd
from anomalib.engine import Engine
from lightning.pytorch.callbacks import ModelCheckpoint, EarlyStopping
from lightning.pytorch.loggers import CSVLogger
import pandas as pd
import matplotlib.pyplot as plt
from anomalib.models import Patchcore
from anomalib.models import Padim



# Dataset Configuration
datamodule = Folder(
    name="custom_dataset",
    root="/content/drive/MyDrive/praeciso/anomalib_dataset",
    normal_dir="train/good",
    abnormal_dir="test/anomalous",
    extensions=[".bmp"],
    task="classification"
)

# Set batch sizes
datamodule.train_batch_size = 1
datamodule.eval_batch_size = 1

# Initialize Model and Callbacks
# model = EfficientAd()
# model = Patchcore()
model = Padim()



# Define Model Checkpoint Callback
checkpoint_callback = ModelCheckpoint(
    monitor="image_AUROC",  # Save the model with the highest AUROC
    mode="max",
    save_top_k=1
)

# Define Early Stopping Callback
early_stopping_callback = EarlyStopping(
    monitor="image_AUROC",  # Monitor AUROC instead of val_loss
    patience=15,
    mode="max",
    verbose=True
)

# Define CSV Logger for Logging Metrics
csv_logger = CSVLogger("logs", name="EfficientAd_training")

# Initialize Engine with Callbacks and Logger
engine = Engine(
    max_epochs=15,
    accelerator="gpu",
    devices=1,
    callbacks=[checkpoint_callback, early_stopping_callback],
    # callbacks=[checkpoint_callback],
    logger=csv_logger
)

# Train the Model
engine.fit(datamodule=datamodule, model=model)


model.safetensors:   0%|          | 0.00/46.8M [00:00<?, ?B/s]

INFO: GPU available: True (cuda), used: True
INFO:lightning.pytorch.utilities.rank_zero:GPU available: True (cuda), used: True
INFO: TPU available: False, using: 0 TPU cores
INFO:lightning.pytorch.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO: HPU available: False, using: 0 HPUs
INFO:lightning.pytorch.utilities.rank_zero:HPU available: False, using: 0 HPUs
INFO: `Trainer(val_check_interval=1.0)` was configured so validation will run at the end of the training epoch..
INFO:lightning.pytorch.utilities.rank_zero:`Trainer(val_check_interval=1.0)` was configured so validation will run at the end of the training epoch..
INFO: LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
INFO:lightning.pytorch.accelerators.cuda:LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
/usr/local/lib/python3.10/dist-packages/lightning/pytorch/core/optimizer.py:183: `LightningModule.configure_optimizers` returned `None`, this fit will run with no optimizer
INFO: 
  | Name                  | Type               

Training: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

INFO: Metric image_AUROC improved. New best score: 0.857
INFO:lightning.pytorch.callbacks.early_stopping:Metric image_AUROC improved. New best score: 0.857
INFO: `Trainer.fit` stopped: `max_epochs=1` reached.
INFO:lightning.pytorch.utilities.rank_zero:`Trainer.fit` stopped: `max_epochs=1` reached.


In [5]:
from anomalib.models import *
print(dir())


['AiVad', 'CSVLogger', 'Cfa', 'Cflow', 'Csflow', 'Dfkde', 'Dfm', 'Draem', 'Dsr', 'EarlyStopping', 'EfficientAd', 'Engine', 'Fastflow', 'Folder', 'Fre', 'Ganomaly', 'In', 'ModelCheckpoint', 'Out', 'Padim', 'Patchcore', 'ReverseDistillation', 'Rkde', 'Stfpm', 'Uflow', 'VlmAd', 'WinClip', '_', '__', '___', '__builtin__', '__builtins__', '__doc__', '__loader__', '__name__', '__package__', '__spec__', '_dh', '_exit_code', '_i', '_i1', '_i2', '_i3', '_i4', '_i5', '_ih', '_ii', '_iii', '_oh', 'exit', 'get_ipython', 'pd', 'plt', 'quit']


In [19]:
# Evaluate the best checkpoint after training
test_results = engine.test(
    model=model,
    datamodule=datamodule,
    ckpt_path=engine.trainer.checkpoint_callback.best_model_path
)

print("Test Results:", test_results)

  checkpoint = torch.load(ckpt_path, map_location=model.device)
INFO: Restoring states from the checkpoint path at /content/results/Padim/custom_dataset/v0/weights/lightning/model.ckpt
INFO:lightning.pytorch.utilities.rank_zero:Restoring states from the checkpoint path at /content/results/Padim/custom_dataset/v0/weights/lightning/model.ckpt
INFO: LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
INFO:lightning.pytorch.accelerators.cuda:LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
INFO: Loaded model weights from the checkpoint at /content/results/Padim/custom_dataset/v0/weights/lightning/model.ckpt
INFO:lightning.pytorch.utilities.rank_zero:Loaded model weights from the checkpoint at /content/results/Padim/custom_dataset/v0/weights/lightning/model.ckpt


Testing: |          | 0/? [00:00<?, ?it/s]

Test Results: [{'image_AUROC': 0.96875, 'image_F1Score': 0.9166666865348816}]


In [20]:
import os
import pandas as pd
import matplotlib.pyplot as plt

# Directory to save plots
output_dir = "/content/drive/MyDrive/praeciso/show2"
os.makedirs(output_dir, exist_ok=True)

# Check logger directory and log file
log_file = os.path.join(csv_logger.log_dir, "metrics.csv")
if not os.path.exists(log_file):
    raise FileNotFoundError(f"Log file not found: {log_file}")

# Load training logs
logs = pd.read_csv(log_file)

# Check available metrics
print("Available metrics in logs:")
print(logs.columns)

# Plot AUROC if available
if "image_AUROC" in logs:
    plt.figure(figsize=(10, 5))
    plt.plot(logs["epoch"], logs["image_AUROC"], label="Image AUROC", marker="o")
    plt.xlabel("Epoch")
    plt.ylabel("AUROC")
    plt.title("Image AUROC During Training")
    plt.legend()
    plt.grid()
    plt.savefig(os.path.join(output_dir, "image_auroc.png"))  # Save plot
    plt.close()  # Close the figure

# Plot F1 Score if available
if "image_F1Score" in logs:
    plt.figure(figsize=(10, 5))
    plt.plot(logs["epoch"], logs["image_F1Score"], label="Image F1 Score", marker="o")
    plt.xlabel("Epoch")
    plt.ylabel("F1 Score")
    plt.title("Image F1 Score During Training")
    plt.legend()
    plt.grid()
    plt.savefig(os.path.join(output_dir, "image_f1_score.png"))  # Save plot
    plt.close()  # Close the figure

# Plot train_loss or val_loss if available
if "train_loss" in logs or "val_loss" in logs:
    plt.figure(figsize=(10, 5))
    if "train_loss" in logs:
        plt.plot(logs["epoch"], logs["train_loss"], label="Training Loss", marker="o")
    if "val_loss" in logs:
        plt.plot(logs["epoch"], logs["val_loss"], label="Validation Loss", marker="o")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.title("Training and Validation Loss")
    plt.legend()
    plt.grid()
    plt.savefig(os.path.join(output_dir, "loss_curve.png"))  # Save plot
    plt.close()  # Close the figure

print(f"Plots saved successfully in {output_dir}")


Available metrics in logs:
Index(['epoch', 'image_AUROC', 'image_F1Score', 'step'], dtype='object')
Plots saved successfully in /content/drive/MyDrive/praeciso/show2


In [21]:

import os
import matplotlib.pyplot as plt

# Directory to save plots
output_dir = "/content/drive/MyDrive/praeciso/show2"
os.makedirs(output_dir, exist_ok=True)

# Check available metrics and handle missing data
if "epoch" in logs.columns:
    # Filter rows with non-NaN values for AUROC
    if "image_AUROC" in logs:
        valid_logs_auroc = logs[~logs["image_AUROC"].isna()]
        if not valid_logs_auroc.empty:
            plt.figure(figsize=(10, 5))
            plt.plot(valid_logs_auroc["epoch"], valid_logs_auroc["image_AUROC"], label="Image AUROC", marker="o")
            plt.xlabel("Epoch")
            plt.ylabel("AUROC")
            plt.title("Image AUROC During Training")
            plt.legend()
            plt.grid()
            plt.savefig(os.path.join(output_dir, "image_auroc_fixed.png"))
            plt.close()
        else:
            print("No valid data for Image AUROC found in logs.")

    # Filter rows with non-NaN values for F1 Score
    if "image_F1Score" in logs:
        valid_logs_f1 = logs[~logs["image_F1Score"].isna()]
        if not valid_logs_f1.empty:
            plt.figure(figsize=(10, 5))
            plt.plot(valid_logs_f1["epoch"], valid_logs_f1["image_F1Score"], label="Image F1 Score", marker="o")
            plt.xlabel("Epoch")
            plt.ylabel("F1 Score")
            plt.title("Image F1 Score During Training")
            plt.legend()
            plt.grid()
            plt.savefig(os.path.join(output_dir, "image_f1_score_fixed.png"))
            plt.close()
        else:
            print("No valid data for Image F1 Score found in logs.")

    # Plot training loss if available
    if "train_loss_step" in logs:
        valid_logs_loss = logs[~logs["train_loss_step"].isna()]
        if not valid_logs_loss.empty:
            plt.figure(figsize=(10, 5))
            plt.plot(valid_logs_loss["epoch"], valid_logs_loss["train_loss_step"], label="Training Loss", marker="o")
            plt.xlabel("Epoch")
            plt.ylabel("Loss")
            plt.title("Training Loss During Training")
            plt.legend()
            plt.grid()
            plt.savefig(os.path.join(output_dir, "train_loss_fixed.png"))
            plt.close()
        else:
            print("No valid data for Training Loss found in logs.")
else:
    print("`epoch` column not found in logs.")

print(f"Plots saved successfully in {output_dir}")

Plots saved successfully in /content/drive/MyDrive/praeciso/show2


In [22]:
import os
import cv2
import numpy as np
import matplotlib.pyplot as plt
import random

# Directory to save outputs
output_dir = "combined_visualizations"
os.makedirs(output_dir, exist_ok=True)

# Set threshold for segmentation
threshold_value = 0.5  # Adjust this based on your data

# Set the number of images to process
num_samples = 10

# Randomly select images from defected dataset
test_loader = datamodule.test_dataloader()
defected_images = []
defected_predictions = []

for batch, prediction in zip(test_loader, engine.trainer.predict(model=model, dataloaders=test_loader)):
    labels = batch["label"].detach().cpu().numpy()  # Get labels (0: normal, 1: anomalous)
    for i in range(len(labels)):
        if labels[i] == 1:  # Anomalous images only
            defected_images.append(batch["image"][i])
            defected_predictions.append(prediction["anomaly_maps"][i])

# Randomly select 10 images if available
random_indices = random.sample(range(len(defected_images)), min(num_samples, len(defected_images)))
selected_images = [defected_images[i] for i in random_indices]
selected_predictions = [defected_predictions[i] for i in random_indices]

# Process each selected image
for idx, (image, anomaly_map) in enumerate(zip(selected_images, selected_predictions)):
    unique_id = f"{idx+1:03d}"  # Unique number for each image

    # Normalize anomaly map
    anomaly_map = anomaly_map.squeeze().detach().cpu().numpy()
    anomaly_map = (anomaly_map - anomaly_map.min()) / (anomaly_map.max() - anomaly_map.min())

    # Convert image for visualization
    image_np = image.permute(1, 2, 0).cpu().numpy()
    image_np = (image_np * 255).astype(np.uint8)

    # Create heatmap
    anomaly_map_resized = cv2.resize(anomaly_map, (image_np.shape[1], image_np.shape[0]))
    heatmap = cv2.applyColorMap((anomaly_map_resized * 255).astype(np.uint8), cv2.COLORMAP_JET)
    heatmap_overlay = cv2.addWeighted(image_np, 0.7, heatmap, 0.3, 0)

    # Create segmentation mask
    binary_mask = (anomaly_map > threshold_value).astype(np.uint8)
    binary_mask_resized = cv2.resize(binary_mask, (image_np.shape[1], image_np.shape[0]), interpolation=cv2.INTER_NEAREST)
    segmentation_overlay = cv2.addWeighted(image_np, 0.7, cv2.applyColorMap(binary_mask_resized * 255, cv2.COLORMAP_JET), 0.3, 0)

    # Apply colormap to anomaly map before saving
    anomaly_map_colored = cv2.applyColorMap((anomaly_map_resized * 255).astype(np.uint8), cv2.COLORMAP_JET)

    # Save all outputs
    cv2.imwrite(os.path.join(output_dir, f"image_{unique_id}.png"), cv2.cvtColor(image_np, cv2.COLOR_RGB2BGR))
    cv2.imwrite(os.path.join(output_dir, f"anomaly_map_{unique_id}.png"), anomaly_map_colored)  # Save as colored map
    cv2.imwrite(os.path.join(output_dir, f"heatmap_{unique_id}.png"), heatmap_overlay)
    cv2.imwrite(os.path.join(output_dir, f"segmentation_{unique_id}.png"), segmentation_overlay)


    # Optional: Display outputs
    plt.figure(figsize=(20, 5))

    plt.subplot(1, 4, 1)
    plt.title("Original Image")
    plt.imshow(image_np)
    plt.axis("off")

    plt.subplot(1, 4, 2)
    plt.title("Anomaly Map")
    plt.imshow(anomaly_map, cmap="jet")
    plt.axis("off")

    plt.subplot(1, 4, 3)
    plt.title("Heatmap Overlay")
    plt.imshow(heatmap_overlay)
    plt.axis("off")

    plt.subplot(1, 4, 4)
    plt.title("Segmentation Map")
    plt.imshow(segmentation_overlay)
    plt.axis("off")

    plt.show()

INFO: LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
INFO:lightning.pytorch.accelerators.cuda:LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


Predicting: |          | 0/? [00:00<?, ?it/s]

In [23]:
import os
import cv2
import numpy as np
import matplotlib.pyplot as plt
import random

# Directory to save outputs
parent_dir = "/content/drive/MyDrive/praeciso/show2"
output_dir = os.path.join(parent_dir, "combined_visualizations")
os.makedirs(output_dir, exist_ok=True)

# Set threshold for segmentation
threshold_value = 0.5  # Adjust this based on your data

# Set the number of images to process
num_samples = 10

# Randomly select images from defected dataset
test_loader = datamodule.test_dataloader()
defected_images = []
defected_predictions = []

for batch, prediction in zip(test_loader, engine.trainer.predict(model=model, dataloaders=test_loader)):
    labels = batch["label"].detach().cpu().numpy()  # Get labels (0: normal, 1: anomalous)
    for i in range(len(labels)):
        if labels[i] == 1:  # Anomalous images only
            defected_images.append(batch["image"][i])
            defected_predictions.append(prediction["anomaly_maps"][i])

# Randomly select images
random_indices = random.sample(range(len(defected_images)), min(num_samples, len(defected_images)))
selected_images = [defected_images[i] for i in random_indices]
selected_predictions = [defected_predictions[i] for i in random_indices]

# Process each selected image
for idx, (image, anomaly_map) in enumerate(zip(selected_images, selected_predictions)):
    unique_id = f"{idx+1:03d}"  # Unique identifier for each image

    # Normalize anomaly map
    anomaly_map = anomaly_map.squeeze().detach().cpu().numpy()
    anomaly_map = (anomaly_map - anomaly_map.min()) / (anomaly_map.max() - anomaly_map.min())

    # Convert image for visualization
    image_np = image.permute(1, 2, 0).cpu().numpy()
    image_np = (image_np * 255).astype(np.uint8)

    # Create heatmap
    anomaly_map_resized = cv2.resize(anomaly_map, (image_np.shape[1], image_np.shape[0]))
    heatmap = cv2.applyColorMap((anomaly_map_resized * 255).astype(np.uint8), cv2.COLORMAP_JET)
    heatmap_overlay = cv2.addWeighted(image_np, 0.7, heatmap, 0.3, 0)

    # Create segmentation mask
    binary_mask = (anomaly_map > threshold_value).astype(np.uint8)
    binary_mask_resized = cv2.resize(binary_mask, (image_np.shape[1], image_np.shape[0]), interpolation=cv2.INTER_NEAREST)
    segmentation_overlay = cv2.addWeighted(image_np, 0.7, cv2.applyColorMap(binary_mask_resized * 255, cv2.COLORMAP_JET), 0.3, 0)

    # Combine all visuals into one image
    fig, axes = plt.subplots(1, 4, figsize=(20, 5))
    axes[0].imshow(image_np)
    axes[0].set_title("Original Image")
    axes[0].axis("off")

    axes[1].imshow(anomaly_map, cmap="jet")
    axes[1].set_title("Anomaly Map")
    axes[1].axis("off")

    axes[2].imshow(heatmap_overlay)
    axes[2].set_title("Heatmap Overlay")
    axes[2].axis("off")

    axes[3].imshow(segmentation_overlay)
    axes[3].set_title("Segmentation Map")
    axes[3].axis("off")

    # Save combined image
    combined_image_path = os.path.join(output_dir, f"combined_visualization_{unique_id}.png")
    plt.savefig(combined_image_path)
    plt.close(fig)

print(f"Combined visualizations saved in {output_dir}")


INFO: LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
INFO:lightning.pytorch.accelerators.cuda:LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


Predicting: |          | 0/? [00:00<?, ?it/s]

  self.figure, self.axis = plt.subplots(1, num_cols, figsize=figure_size)
  fig, axes = plt.subplots(1, 4, figsize=(20, 5))


Combined visualizations saved in /content/drive/MyDrive/praeciso/show2/combined_visualizations
