In [2]:
import os
import cv2
import torch
import numpy as np
from tqdm import tqdm
from torchvision import models, transforms
from torch import nn
from sklearn.metrics import roc_auc_score
from sklearn.metrics import pairwise_distances
from sklearn.random_projection import SparseRandomProjection

In [22]:
# --- Settings ---
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
TRAIN_DIR = "masks/good"
TEST_DIR_GOOD = "masks/test/good"
TEST_DIR_DEFECT = "masks/test/defective"



In [23]:
# --- Model Setup ---
resnet = models.wide_resnet50_2(pretrained=True).to(DEVICE)
resnet.eval()

features = {}

def save_hook(name):
    def fn(_, __, output): features[name] = output
    return fn

resnet.layer2[-1].register_forward_hook(save_hook("layer2"))
resnet.layer3[-1].register_forward_hook(save_hook("layer3"))

# --- Image Transform ---
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((256, 256)),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406],
                         [0.229, 0.224, 0.225])
])



In [24]:
# --- Feature Extraction ---
def extract_patch_features(img_path):
    img = cv2.imread(img_path)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    x = transform(img).unsqueeze(0).to(DEVICE)

    with torch.no_grad():
        _ = resnet(x)
        f1 = nn.functional.avg_pool2d(features['layer2'], 3, 1, 1)
        f2 = nn.functional.avg_pool2d(features['layer3'], 3, 1, 1)
        f2 = nn.functional.interpolate(f2, size=f1.shape[-2:], mode="bilinear", align_corners=False)
        emb = torch.cat([f1, f2], dim=1)
        emb = emb.permute(0, 2, 3, 1).reshape(-1, emb.shape[1])
        return emb.cpu().numpy()  # [N_patches, C]

In [25]:
# --- Build Memory Bank from Good Training Images ---
print("🔍 Extracting features from training (good) images...")
all_patches = []
for fname in tqdm(os.listdir(TRAIN_DIR)):
    if fname.lower().endswith((".jpg", ".png", ".jpeg")):
        path = os.path.join(TRAIN_DIR, fname)
        all_patches.append(extract_patch_features(path))
all_patches = np.vstack(all_patches)  # [N, C]

🔍 Extracting features from training (good) images...


100%|██████████| 32/32 [00:01<00:00, 19.42it/s]


In [26]:
# --- Coreset Sampling ---
print("🎯 Building coreset (memory bank)...")
projector = SparseRandomProjection(eps=0.5)
projected = projector.fit_transform(all_patches)
mem_bank = []

🎯 Building coreset (memory bank)...


In [27]:
# K-Center Greedy
n_keep = int(len(projected) * 0.01)  # 1% of patches
selected = []
distances = None
for _ in tqdm(range(n_keep)):
    if distances is None:
        idx = np.random.randint(0, len(projected))
        centroid = projected[idx:idx+1]
        distances = pairwise_distances(projected, centroid)
        selected.append(idx)
    else:
        idx = np.argmax(distances)
        centroid = projected[idx:idx+1]
        new_dist = pairwise_distances(projected, centroid)
        distances = np.minimum(distances, new_dist)
        selected.append(idx)
memory_bank = torch.tensor(all_patches[selected]).float().to(DEVICE)

100%|██████████| 250/250 [00:07<00:00, 31.62it/s]


In [30]:
# --- Anomaly Scoring ---
def anomaly_score(img_path):
    patch_feats = extract_patch_features(img_path)
    with torch.no_grad():
        dists = torch.cdist(torch.tensor(patch_feats).float().to(DEVICE), memory_bank)
        min_dists = dists.min(dim=1).values  # distance to closest memory patch
    return min_dists.max().item()  # worst-case patch distance

# --- Scoring Function for a Folder ---
def score_folder(folder):
    scores = []
    for fname in tqdm(os.listdir(folder)):
        if fname.lower().endswith((".jpg", ".png", ".jpeg")):
            path = os.path.join(folder, fname)
            score = anomaly_score(path)
            scores.append((fname, score))
    return scores


In [29]:
# --- 1. Setup ---
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Feature extractor: ResNet50 up to layer2 & layer3
res = models.wide_resnet50_2(pretrained=True).to(device)
layers = ['layer2', 'layer3']
features = {}
def hook(name):
    def fn(m, i, o): features[name] = o
    return fn
for name in layers:
    getattr(res, name)[-1].register_forward_hook(hook(name))
res.eval()

# Image transform
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((256, 256)),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize([0.485,0.456,0.406],[0.229,0.224,0.225]),
])




In [16]:

# --- 2. Extract patch-features from good images ---
def extract_patches(img):
    img = cv2.cvtColor(cv2.imread(img), cv2.COLOR_BGR2RGB)
    x = transform(img).unsqueeze(0).to(device)
    with torch.no_grad():
        _ = res(x)
        f1 = nn.functional.avg_pool2d(features['layer2'], 3, 1, 1)
        f2 = nn.functional.avg_pool2d(features['layer3'], 3, 1, 1)
    f2 = nn.functional.interpolate(f2, size=f1.shape[-2:])
    emb = torch.cat([f1, f2], dim=1)  # b×C×H×W
    emb = emb.permute(0,2,3,1).reshape(-1, emb.shape[1])
    return emb.cpu().numpy()

good = "good/"
all_patches = []
for fn in os.listdir(good):
    if fn.lower().endswith(('.jpg','.png','jpeg')):
        all_patches.append(extract_patches(os.path.join(good, fn)))
all_patches = np.vstack(all_patches)  # [N, C]

In [17]:
# --- 3. Coreset subsampling via k-Center Greedy ---
rp = SparseRandomProjection(eps=0.5)
proj = rp.fit_transform(all_patches)
selected = []
dist = None
total = proj.shape[0]
sub = int(total * 0.01)  # keep 1%
for _ in range(sub):
    if dist is None:
        idx = np.random.randint(0, total)
        cent = proj[idx:idx+1]
        dist = pairwise_distances(proj, cent).reshape(-1,1)
        selected = [idx]
    else:
        idx = np.argmax(dist)
        cent = proj[idx:idx+1]
        newd = pairwise_distances(proj, cent).reshape(-1,1)
        dist = np.minimum(dist, newd)
        selected.append(idx)
memory = torch.from_numpy(all_patches[selected]).float().to(device)  # memory bank


In [18]:
# --- 4. Test images ---
def anomaly_score(img_path):
    patches = extract_patches(img_path)
    with torch.no_grad():
        d = torch.cdist(torch.from_numpy(patches).float().to(device), memory)
        dmin = d.min(dim=1).values
    return dmin.max().item()  # take worst patch

In [31]:
# --- Score Good and Defective Images ---
print("📊 Scoring GOOD test images...")
scores_good = score_folder(TEST_DIR_GOOD)

print("📊 Scoring DEFECTIVE test images...")
scores_defect = score_folder(TEST_DIR_DEFECT)

📊 Scoring GOOD test images...


100%|██████████| 1/1 [00:00<00:00,  5.32it/s]


📊 Scoring DEFECTIVE test images...


100%|██████████| 1/1 [00:00<00:00,  6.71it/s]


In [32]:
# --- Threshold Selection ---
all_good_values = [s for _, s in scores_good]
threshold = np.percentile(all_good_values, 95)
print(f"✅ Threshold set at 95th percentile of GOOD scores: {threshold:.4f}")

# --- Final Decision and Output ---
print("\n--- Results ---")
def print_results(scores, label):
    for fname, score in scores:
        result = "ANOMALY 🚨" if score > threshold else "Normal ✅"
        print(f"{label} | {fname:<30} | Score: {score:.4f} | {result}")

print_results(scores_good, "GOOD")
print_results(scores_defect, "DEFECT")


✅ Threshold set at 95th percentile of GOOD scores: 2.1202

--- Results ---
GOOD | Image__2025-05-05__10-06-44.jpg | Score: 2.1202 | Normal ✅
DEFECT | 33af61dd-Image__2025-05-02__10-36-24.jpg | Score: 2.5506 | ANOMALY 🚨


In [21]:
import cv2
import numpy as np
import os

def auto_crop_mask(img_path, output_crop_path=None, output_mask_path=None):
    img = cv2.imread(img_path)
    img_hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)

    # Define white-ish color range in HSV (tweak as needed)
    lower_white = np.array([0, 0, 200])
    upper_white = np.array([180, 40, 255])

    mask = cv2.inRange(img_hsv, lower_white, upper_white)

    # Morphological cleanup
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (15,15))
    mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
    mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)

    # Find contours on the mask
    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    if not contours:
        print("No white part detected.")
        return None, None

    # Find largest contour (assuming it’s your part)
    largest_contour = max(contours, key=cv2.contourArea)

    # Get bounding box around largest contour
    x,y,w,h = cv2.boundingRect(largest_contour)

    # Crop original image to bounding box
    cropped = img[y:y+h, x:x+w]

    # Crop mask to same bounding box
    mask_cropped = mask[y:y+h, x:x+w]

    # Save if paths provided
    if output_crop_path:
        cv2.imwrite(output_crop_path, cropped)
    if output_mask_path:
        cv2.imwrite(output_mask_path, mask_cropped)

    return cropped, mask_cropped

# Example batch processing
input_dir = "test\defective"
crop_dir = "cropped/"
mask_dir = "masks/"
os.makedirs(crop_dir, exist_ok=True)
os.makedirs(mask_dir, exist_ok=True)

for fname in os.listdir(input_dir):
    if fname.lower().endswith(('.jpg', '.png', '.jpeg')):
        img_path = os.path.join(input_dir, fname)
        crop_path = os.path.join(crop_dir, fname)
        mask_path = os.path.join(mask_dir, fname)
        cropped_img, mask_img = auto_crop_mask(img_path, crop_path, mask_path)