In [1]:
import os
import pathlib
import site

In [2]:
import logging
import warnings

from anomalib.utils.loggers import configure_logger, get_experiment_logger


# Handle to the "anomalib" logger; later cells send their progress messages through it.
logger = logging.getLogger("anomalib")
# Only ERROR-level output is requested here, so the logger.info(...) calls further
# down are presumably suppressed — lower the level to see them.
configure_logger(level="ERROR")  # accepted levels: DEBUG, INFO, WARNING, ERROR

To use wandb logger install it using `pip install wandb`


In [3]:
import cv2
import numpy as np

In [4]:
from pytorch_lightning import Trainer, seed_everything

In [5]:
from anomalib.config import get_configurable_parameters
from anomalib.data import get_datamodule
from anomalib.data.utils import TestSplitMode
from anomalib.models import get_model
from anomalib.utils.callbacks import LoadModelCallback, get_callbacks

In [6]:
"""
Monkey Patch
 * FeatureExtractor
"""
# MyFeatureExtractor is not an anomalib module — presumably a project-local file that
# must be importable from the notebook's working directory (TODO confirm).
from MyFeatureExtractor import FeatureExtractor
import anomalib.models.patchcore.torch_model as ptm
# Rebind the FeatureExtractor name inside anomalib's PatchCore torch_model module.
# NOTE(review): this only affects lookups made after this line, so the cell has to run
# before the model is built with get_model().
ptm.FeatureExtractor = FeatureExtractor

In [7]:
"""
Monkey Patch
 * Visualizer
"""
# MyVisualizer is not an anomalib module — presumably a project-local file that must be
# importable from the notebook's working directory (TODO confirm).
from MyVisualizer import Visualizer
import anomalib.utils.callbacks.visualizer.visualizer_base as vb
# Rebind the Visualizer name inside anomalib's visualizer_base callback module.
# NOTE(review): this only affects lookups made after this line, so the cell has to run
# before the callbacks are created with get_callbacks().
vb.Visualizer = Visualizer

## Config

In [8]:
def find_package_root(package_name):
    """Return the directory that contains the importable package ``package_name``.

    The import system is asked where the package actually lives, so the result is
    correct for regular, user-site and editable installs alike. Joining the returned
    directory with ``"<package_name>/..."`` yields a path inside the package.

    Args:
        package_name: Top-level name of the package or module to locate.

    Returns:
        The parent directory of the package (or of the module file). If the package
        cannot be located, the first ``site.getsitepackages()`` entry whose path
        contains ``"site-package"`` is returned, or the first entry if none matches.
    """
    import importlib.util

    try:
        spec = importlib.util.find_spec(package_name)
    except (ImportError, ValueError):
        # find_spec raises for some malformed or half-imported modules; treat as "not found".
        spec = None

    if spec is not None:
        search_locations = list(spec.submodule_search_locations or [])
        if search_locations:
            # A package: its first search location is ".../<root>/<package_name>".
            return os.path.dirname(os.path.abspath(search_locations[0]))
        if spec.origin and os.path.isfile(spec.origin):
            # A single-file module: ".../<root>/<package_name>.py".
            return os.path.dirname(os.path.abspath(spec.origin))

    # Fallback: best-effort guess among the interpreter's site-packages directories.
    candidates = site.getsitepackages()
    for candidate in candidates:
        if "site-package" in candidate:
            return candidate
    return candidates[0]


# Directory that holds the installed anomalib package; the model config is read from it.
package_path = find_package_root("anomalib")

package_path

'/work/myenv/lib/python3.9/site-packages'

In [9]:
"""
SET MODEL NAME
"""
# Name of the anomalib model; it selects the config file below and is reused later
# when the results directory is named.
model_name = "patchcore"
# Read the config.yaml stored under the model's folder in the package directory found above.
config_path = os.path.join(package_path, f"anomalib/models/{model_name}/config.yaml")
# Every later cell adjusts or reads this config object.
config = get_configurable_parameters(model_name=model_name, config_path=config_path)

  warn(
  warn(


In [10]:
"""
DATASET SETTING 
"""
# Point the loaded config at a user-supplied dataset. The dataset section printed by
# this cell reports format 'mvtec', so the folder is presumably laid out MVTec-style
# (<path>/<category>/test, .../ground_truth) — TODO confirm.
config.dataset.name = "custom"
config.dataset.path = "./datasets/Custom/"
config.dataset.category = "my_bottle"

# Batch sizes used for training and for evaluation.
config.dataset.train_batch_size = 4
config.dataset.eval_batch_size = 4

# Display the resulting dataset section.
dict(config.dataset)

{'name': 'custom',
 'format': 'mvtec',
 'path': './datasets/Custom/',
 'task': 'segmentation',
 'category': 'my_bottle',
 'train_batch_size': 4,
 'eval_batch_size': 4,
 'num_workers': 8,
 'image_size': [256, 256],
 'center_crop': [224, 224],
 'normalization': 'imagenet',
 'transform_config': {'train': None, 'eval': None},
 'test_split_mode': 'from_dir',
 'test_split_ratio': 0.2,
 'val_split_mode': 'same_as_test',
 'val_split_ratio': 0.5,
 'tiling': {'apply': False, 'tile_size': None, 'stride': None, 'remove_border_count': 0, 'use_random_tiling': False, 'random_tile_count': 16}}

In [11]:
dict(config.logging)

{'logger': [], 'log_graph': False}

In [12]:
# Disable all pixel-level metrics; the image-level metrics stay as configured.
# NOTE(review): presumably because the ground-truth masks generated in this notebook
# are all-black placeholders, which would make pixel scores meaningless — confirm.
config.metrics.pixel = []

# Display the resulting metrics section.
dict(config.metrics)

{'image': ['F1Score', 'AUROC'],
 'pixel': [],
 'threshold': {'method': 'adaptive', 'manual_image': None, 'manual_pixel': None}}

In [13]:
"""
MODEL SETTING
"""
# Active model setup: backbone, the layers to extract features from, the coreset
# sampling ratio and the neighbour count.
# NOTE(review): the '<layer>.-1' layer names are presumably interpreted by the patched
# FeatureExtractor installed earlier in this notebook — confirm.
config.model.backbone = "resnet18"
config.model.layers = ['layer2.-1', 'layer3.-1']
config.model.coreset_sampling_ratio = 0.1
config.model.num_neighbors = 9

# Alternative (inactive): wide_resnet50_2 backbone. Swap with the active block to try it.
# config.model.backbone = "wide_resnet50_2"
# config.model.layers = ['layer2.-1', 'layer3.-1']
# config.model.coreset_sampling_ratio = 0.1
# config.model.num_neighbors = 9

# Alternative (inactive): tf_efficientnet_b7_ns backbone with its own layer names.
# config.model.backbone = "tf_efficientnet_b7_ns"
# config.model.layers = ['blocks.3.-1', 'blocks.4.-1']
# config.model.coreset_sampling_ratio = 0.1
# config.model.num_neighbors = 9

# Display the resulting model section.
dict(config.model)

{'name': 'patchcore',
 'backbone': 'resnet18',
 'pre_trained': True,
 'layers': ['layer2.-1', 'layer3.-1'],
 'coreset_sampling_ratio': 0.1,
 'num_neighbors': 9,
 'normalization_method': 'min_max',
 'input_size': [224, 224]}

In [14]:
dict(config.optimization)

{'export_mode': None}

In [16]:
# Results directory named after the dataset category, the model and the backbone.
config.project.path = f"./results/{config.dataset.category}_{model_name}_{config.model.backbone}"
# Display the resulting project section.
dict(config.project)

{'seed': 0,
 'path': './results/my_bottle_patchcore_resnet18',
 'unique_dir': False}

In [17]:
# Use the same "<category>_<model>_<backbone>" results directory for the Trainer's
# default_root_dir as was assigned to config.project.path.
config.trainer.default_root_dir = f"./results/{config.dataset.category}_{model_name}_{config.model.backbone}"

# Display the resulting trainer section.
dict(config.trainer)

{'enable_checkpointing': True,
 'default_root_dir': './results/my_bottle_patchcore_resnet18',
 'gradient_clip_val': 0,
 'gradient_clip_algorithm': 'norm',
 'num_nodes': 1,
 'devices': 1,
 'enable_progress_bar': True,
 'overfit_batches': 0.0,
 'track_grad_norm': -1,
 'check_val_every_n_epoch': 1,
 'fast_dev_run': False,
 'accumulate_grad_batches': 1,
 'max_epochs': 1,
 'min_epochs': None,
 'max_steps': -1,
 'min_steps': None,
 'max_time': None,
 'limit_train_batches': 1.0,
 'limit_val_batches': 1.0,
 'limit_test_batches': 1.0,
 'limit_predict_batches': 1.0,
 'val_check_interval': 1.0,
 'log_every_n_steps': 50,
 'accelerator': 'auto',
 'strategy': None,
 'sync_batchnorm': False,
 'precision': 32,
 'enable_model_summary': True,
 'num_sanity_val_steps': 0,
 'profiler': None,
 'benchmark': False,
 'deterministic': False,
 'reload_dataloaders_every_n_epochs': 0,
 'auto_lr_find': False,
 'replace_sampler_ddp': True,
 'detect_anomaly': False,
 'auto_scale_batch_size': False,
 'plugins': None,


In [18]:
dict(config.visualization)

{'show_images': False,
 'save_images': True,
 'log_images': True,
 'image_save_path': None,
 'mode': 'full'}

## Generate Fake Mask

In [19]:
# Create an all-black placeholder mask for every image of every defect class, mirroring
# the layout of test/<label>/<file> under ground_truth/<label>/<file>.
#
# Paths are built with os.path.join so the result does not depend on whether
# config.dataset.path ends with a slash.
dataset_root = os.path.join(config.dataset.path, config.dataset.category)
test_dir_path = os.path.join(dataset_root, "test")
ground_truth_dir_path = os.path.join(dataset_root, "ground_truth")

# Every sub-folder of test/ except "good" is treated as a defect class.
bad_labels = sorted(
    d for d in os.listdir(test_dir_path)
    if d != 'good' and os.path.isdir(os.path.join(test_dir_path, d))
)

for bad_label in bad_labels:
    current_test_dir_path = os.path.join(test_dir_path, bad_label)
    current_ground_truth_dir_path = os.path.join(ground_truth_dir_path, bad_label)
    os.makedirs(current_ground_truth_dir_path, exist_ok=True)

    img_labels = sorted(
        f for f in os.listdir(current_test_dir_path)
        if os.path.isfile(os.path.join(current_test_dir_path, f))
    )

    for img_label in img_labels:
        black_img_path = os.path.join(current_ground_truth_dir_path, img_label)
        if os.path.exists(black_img_path):
            # Never clobber a mask that is already there: it may be a real annotation,
            # and skipping also makes re-running this cell harmless.
            continue

        test_img_path = os.path.join(current_test_dir_path, img_label)
        # The image is only read to learn its height and width.
        test_img = cv2.imread(test_img_path, cv2.IMREAD_GRAYSCALE)
        if test_img is None:
            # cv2.imread signals an unreadable or non-image file by returning None.
            warnings.warn(f"Skipping unreadable image: {test_img_path}")
            continue

        black_img = np.zeros(test_img.shape, dtype=np.uint8)
        if not cv2.imwrite(black_img_path, black_img):
            warnings.warn(f"Could not write mask: {black_img_path}")

## Train

In [20]:
# Apply the configured seed before any stochastic component is created, so repeated
# runs of the notebook are reproducible. A seed of None means "do not seed".
if config.project.get("seed") is not None:
    seed_everything(config.project.seed)

# Build the four objects the Trainer needs, all driven by the same config.
datamodule = get_datamodule(config)
model = get_model(config)
experiment_logger = get_experiment_logger(config)
callbacks = get_callbacks(config)



In [21]:
"""
Monkey Patch
 * _compute_adaptive_threshold
"""

def _compute_adaptive_threshold(self, outputs) -> None:
    """Recompute the image threshold and copy its value onto the pixel threshold.

    Both threshold objects are reset and fed with ``outputs`` through
    ``self._collect_outputs``. Only the image threshold is then computed; the pixel
    threshold is never computed on its own and simply receives the image value.
    An earlier variant, which computed a separate pixel threshold when the outputs
    carried both "mask" and "anomaly_maps", is deliberately not used.

    Args:
        outputs: Collected step outputs, passed through to ``self._collect_outputs``.
    """
    image_metric = self.image_threshold
    pixel_metric = self.pixel_threshold

    for metric in (image_metric, pixel_metric):
        metric.reset()

    self._collect_outputs(image_metric, pixel_metric, outputs)
    image_metric.compute()

    # Mirror the image-level value instead of deriving one from pixel data.
    pixel_metric.value = image_metric.value

# Bind the function to this one model object via the descriptor protocol and store it
# as an instance attribute; the model's class itself is left unchanged.
model._compute_adaptive_threshold = _compute_adaptive_threshold.__get__(model, model.__class__)

In [22]:
# Build the Trainer from the trainer section of the config, plus the experiment logger
# and callbacks created earlier.
trainer = Trainer(**config.trainer, logger=experiment_logger, callbacks=callbacks)
logger.info("Training the model.")
# The trainer section shown earlier sets max_epochs to 1.
trainer.fit(model=model, datamodule=datamodule)

  rank_zero_warn(


Training: 0it [00:00, ?it/s]



Validation: 0it [00:00, ?it/s]



Selecting Coreset Indices.:   0%|                                                                                                                                                                      | 0/16385 [00:00<?, ?it/s][A[A

Selecting Coreset Indices.:   0%|                                                                                                                                                              | 1/16385 [00:00<34:07,  8.00it/s][A[A

Selecting Coreset Indices.:   0%|▌                                                                                                                                                           | 56/16385 [00:00<00:55, 295.42it/s][A[A

Selecting Coreset Indices.:   1%|█                                                                                                                                                          | 114/16385 [00:00<00:39, 414.68it/s][A[A

Selecting Coreset Indices.:   1%|█▋                               

## Test

In [23]:
# Path of the best checkpoint as recorded by the trainer's checkpoint callback.
weight_file_path = trainer.checkpoint_callback.best_model_path
# Display it so the reader can see which file will be loaded for testing.
weight_file_path

'/work/results/my_bottle_patchcore_resnet18/weights/lightning/model.ckpt'

In [24]:
logger.info("Loading the best model weights.")
load_model_callback = LoadModelCallback(weights_path=weight_file_path)
# Remove any loader left over from a previous run of this cell before adding the new
# one, so re-running the cell does not pile up duplicate callbacks.
trainer.callbacks[:] = [cb for cb in trainer.callbacks if not isinstance(cb, LoadModelCallback)]
# Put the loader first in the callback list, ahead of the other callbacks.
trainer.callbacks.insert(0, load_model_callback)

# The metrics table is printed by the test run; the returned value is discarded.
_ = trainer.test(model=model, datamodule=datamodule)

Testing: 0it [00:00, ?it/s]

────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
       Test metric             DataLoader 0
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
       image_AUROC                  1.0
      image_F1Score                 1.0
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────


## Check

In [25]:
model.model

PatchcoreModel(
  (feature_extractor): FeatureExtractor(
    (feature_extractor): FeatureListNet(
      (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (act1): ReLU(inplace=True)
      (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
      (layer1): Sequential(
        (0): BasicBlock(
          (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (drop_block): Identity()
          (act1): ReLU(inplace=True)
          (aa): Identity()
          (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (act2): ReLU(inplace=True)
        )
     

In [26]:
# Print one line per extracted layer: its name, its reported out_dims entry and its
# reported out_sizes entry.
torch_model = model.model
extractor = torch_model.feature_extractor
layer_summary = zip(torch_model.layers, extractor.out_dims, extractor.out_sizes)
for layer_name, out_dim, out_size in layer_summary:
    print(f"{layer_name} [dim:{out_dim}, size:{out_size}]")

layer2.-1 [dim:128, size:28]
layer3.-1 [dim:256, size:14]


## Appendix

In [None]:
import timm

In [None]:
# timm.create_model(
#     "convnext_base_384_in22ft1k",
#     pretrained=True,
#     features_only=True,
#     exportable=True,
# )

In [None]:
# List the timm model names that come with pretrained weights, as candidates for
# config.model.backbone. The output can be long.
timm.list_models(pretrained=True)