In [1]:
import os
import glob
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.applications.resnet50 import preprocess_input, ResNet50
from sklearn.decomposition import PCA
from sklearn.neighbors import NearestNeighbors
from skimage.transform import resize
from skimage.io import imread
from sklearn.metrics import average_precision_score
from tqdm import tqdm


In [2]:
def process_images(folder_path, img_size=(224, 224)):
    images, labels = [], []
    for subfolder in os.listdir(folder_path):
        subfolder_path = os.path.join(folder_path, subfolder)
        if not os.path.isdir(subfolder_path):
            continue
        label = 0 if subfolder.lower() == "good" else 1
        for filename in os.listdir(subfolder_path):
            try:
                img_path = os.path.join(subfolder_path, filename)
                img = load_img(img_path, target_size=img_size)
                img = img_to_array(img)
                img = preprocess_input(img)
                images.append(img)
                labels.append(label)
            except Exception as e:
                print(f"Failed to load image: {img_path}, error: {e}")
    return np.array(images), np.array(labels)

def get_test_image_paths(test_folder):
    paths = []
    for subfolder in os.listdir(test_folder):
        sub_path = os.path.join(test_folder, subfolder)
        if not os.path.isdir(sub_path):
            continue
        paths += sorted(glob.glob(os.path.join(sub_path, '*.png')))
    return paths

def load_ground_truth_masks(gt_mask_folder, test_image_paths, target_size=(28, 28)):
    masks = []
    for path in test_image_paths:
        cls = os.path.basename(os.path.dirname(path))
        fname = os.path.basename(path)

        if cls == 'good':
            mask = np.zeros(target_size, dtype=np.uint8)
        else:
            defect_type = cls
            name, ext = os.path.splitext(fname)
            mask_name = name + '_mask' + ext
            mask_path = os.path.join(gt_mask_folder, defect_type, mask_name)

            if os.path.exists(mask_path):
                mask = imread(mask_path, as_gray=True)
                mask = resize(mask, target_size, anti_aliasing=False, preserve_range=True)
                mask = (mask > 0.5).astype(np.uint8)
            else:
                mask = np.zeros(target_size, dtype=np.uint8)
        masks.append(mask)
    return np.array(masks)

In [3]:
def get_feature_extractor(input_shape=(224, 224, 3)):
    base = ResNet50(weights='imagenet', include_top=False, input_shape=input_shape)
    base.trainable = False
    layer_names = ['conv2_block3_out', 'conv3_block4_out', 'conv4_block6_out']
    outputs = [base.get_layer(name).output for name in layer_names]
    model = tf.keras.Model(inputs=base.input, outputs=outputs)
    return model


def extract_patch_features(images, model):
    features_list = model.predict(images, verbose=0)
    resized_features = [
        tf.image.resize(f, size=(28, 28)).numpy()
        for f in features_list
    ]
    combined = np.concatenate(resized_features, axis=-1)
    N, H, W, C = combined.shape
    patches = combined.reshape(N, H * W, C)
    return patches, H, W


def compute_anomaly_maps(memory_bank, test_patches, H, W, k=1):
    nn = NearestNeighbors(n_neighbors=k, metric='euclidean', n_jobs=-1)
    nn.fit(memory_bank)
    maps = []
    for patch in tqdm(test_patches, desc="Computing anomaly maps"):
        dist, _ = nn.kneighbors(patch)
        scores = dist.mean(axis=1).reshape(H, W)
        maps.append(scores)
    return np.array(maps)


def compute_auprc(y_true, y_score):
    return average_precision_score(y_true.flatten(), y_score.flatten())


def run_patchcore(product):
    train_dir = f'data/{product}/train'
    test_dir = f'data/{product}/test'
    gt_dir = f'data/{product}/ground_truth'

    train_imgs, _ = process_images(train_dir)
    test_imgs, _ = process_images(test_dir)
    test_paths = get_test_image_paths(test_dir)

    model = get_feature_extractor()
    train_patches, H, W = extract_patch_features(train_imgs, model)
    test_patches, _, _ = extract_patch_features(test_imgs, model)

    memory_bank = train_patches.reshape(-1, train_patches.shape[-1])

    pca = PCA(n_components=0.95)
    memory_bank_processed = pca.fit_transform(memory_bank)
    test_patches_processed = pca.transform(test_patches.reshape(-1, test_patches.shape[-1]))

    test_patches_processed_reshaped = test_patches_processed.reshape(test_imgs.shape[0], H * W, -1)

    maps = compute_anomaly_maps(
        memory_bank_processed,
        test_patches_processed_reshaped,
        H, W,
        k=1
    )

    gt_masks = load_ground_truth_masks(gt_dir, test_paths, (H, W))
    score = compute_auprc(gt_masks, maps)
    return score

In [4]:
products = ['carpet', 'grid', 'leather', 'tile', 'wood', 'bottle', 'cable', 'capsule', 'hazelnut',
            'metal_nut', 'pill', 'screw', 'toothbrush', 'transistor', 'zipper']

all_products_auprc = {}

IMG_SIZE = (224, 224)

for prod in products:
    auprc_score = run_patchcore(prod)
    all_products_auprc[prod] = auprc_score
    print(f"{prod:<15}: AUPRC = {auprc_score:.3f}")

if all_products_auprc:
    average_auprc = np.mean(list(all_products_auprc.values()))
    print(f"Average Pixel-level AUPRC across all products: {average_auprc:.3f}")

Computing anomaly maps: 100%|██████████| 117/117 [02:20<00:00,  1.20s/it]


carpet         : AUPRC = 0.587


Computing anomaly maps: 100%|██████████| 78/78 [01:01<00:00,  1.27it/s]


grid           : AUPRC = 0.335


Computing anomaly maps: 100%|██████████| 124/124 [01:59<00:00,  1.04it/s]


leather        : AUPRC = 0.521


Computing anomaly maps: 100%|██████████| 117/117 [02:10<00:00,  1.11s/it]


tile           : AUPRC = 0.423


Computing anomaly maps: 100%|██████████| 79/79 [01:32<00:00,  1.17s/it]


wood           : AUPRC = 0.468


Computing anomaly maps: 100%|██████████| 83/83 [00:35<00:00,  2.36it/s]


bottle         : AUPRC = 0.773


Computing anomaly maps: 100%|██████████| 150/150 [02:49<00:00,  1.13s/it]


cable          : AUPRC = 0.547


Computing anomaly maps: 100%|██████████| 132/132 [01:00<00:00,  2.19it/s]


capsule        : AUPRC = 0.450


Computing anomaly maps: 100%|██████████| 110/110 [02:41<00:00,  1.47s/it]


hazelnut       : AUPRC = 0.601


Computing anomaly maps: 100%|██████████| 115/115 [01:18<00:00,  1.46it/s]


metal_nut      : AUPRC = 0.845


Computing anomaly maps: 100%|██████████| 167/167 [01:31<00:00,  1.82it/s]


pill           : AUPRC = 0.563


Computing anomaly maps: 100%|██████████| 160/160 [02:27<00:00,  1.09it/s]


screw          : AUPRC = 0.386


Computing anomaly maps: 100%|██████████| 42/42 [00:04<00:00,  9.88it/s]


toothbrush     : AUPRC = 0.585


Computing anomaly maps: 100%|██████████| 100/100 [00:59<00:00,  1.69it/s]


transistor     : AUPRC = 0.443


Computing anomaly maps: 100%|██████████| 151/151 [00:47<00:00,  3.18it/s]


zipper         : AUPRC = 0.643
Average Pixel-level AUPRC across all products: 0.545
