0) Parámetros globales (ruta + semilla)

In [5]:
# === CELDA 0: CONFIG GLOBAL ===
import os, random, numpy as np, torch
# Dataset root. Defaults to the original local folder; set the PATCHCORE_BASE
# environment variable to run the notebook elsewhere without editing this cell.
BASE = os.environ.get(
    "PATCHCORE_BASE",
    r"C:\Users\DELL\Desktop\UNI-LEON\DP ULE\dataset_gua_crops\cropped_images",
)
SEED = 42

# Seed every RNG used by the notebook: Python, NumPy, PyTorch (CPU and CUDA).
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(SEED)

print("BASE:", BASE)


BASE: C:\Users\DELL\Desktop\UNI-LEON\DP ULE\dataset_gua_crops\cropped_images


1) Organizar dataset (raíz → defectuosas / normales + labels.csv)

In [6]:
# === CELDA 1: ORGANIZAR DATASET ===
import os, shutil, csv

# Lower-case extensions (dot included) that are treated as images.
IMG_EXTS = {".png", ".jpg", ".jpeg", ".bmp"}

# Class folders under BASE: defective samples and normal samples.
DEF_DIR = os.path.join(BASE, "defectuosas")
NORM_DIR = os.path.join(BASE, "normales")
for _class_dir in (DEF_DIR, NORM_DIR):
    os.makedirs(_class_dir, exist_ok=True)

def find_image(base_dir, basename, exts=None):
    """Return the path of the image in ``base_dir`` whose stem is ``basename``.

    The extension is compared case-insensitively, so ``foo.PNG`` is found on
    case-sensitive filesystems as well. When several candidates exist, the
    lexicographically smallest path is returned so the choice is deterministic.

    Args:
        base_dir: Directory to search (not recursive).
        basename: File name without extension.
        exts: Optional iterable of allowed extensions (with leading dot).
            Defaults to the notebook-level ``IMG_EXTS``.

    Returns:
        The full path of the matching file, or ``None`` when there is no match
        or ``base_dir`` is not a directory.
    """
    allowed = {e.lower() for e in (IMG_EXTS if exts is None else exts)}
    if not os.path.isdir(base_dir):
        return None

    # normcase lower-cases on Windows and is the identity on POSIX, so stem
    # matching follows the case rules of the platform's filesystem.
    wanted_stem = os.path.normcase(basename)
    matches = []
    for entry in os.listdir(base_dir):
        stem, ext = os.path.splitext(entry)
        if os.path.normcase(stem) != wanted_stem or ext.lower() not in allowed:
            continue
        candidate = os.path.join(base_dir, entry)
        if os.path.isfile(candidate):
            matches.append(candidate)
    return min(matches) if matches else None

# Move every annotation (.json) and the image sharing its stem into the defective folder.
annotation_names = [name for name in os.listdir(BASE) if name.lower().endswith(".json")]
for annotation_name in annotation_names:
    # Look the image up before anything is moved out of BASE.
    image_path = find_image(BASE, os.path.splitext(annotation_name)[0])
    shutil.move(os.path.join(BASE, annotation_name), os.path.join(DEF_DIR, annotation_name))
    if image_path:
        shutil.move(image_path, os.path.join(DEF_DIR, os.path.basename(image_path)))

# Every image still sitting directly in BASE has no annotation: move it to the normal folder.
leftover_images = [
    name
    for name in os.listdir(BASE)
    if os.path.isfile(os.path.join(BASE, name))
    and os.path.splitext(name)[1].lower() in IMG_EXTS
]
for leftover_name in leftover_images:
    shutil.move(os.path.join(BASE, leftover_name), os.path.join(NORM_DIR, leftover_name))

# Write BASE/labels.csv: defective images first (label 1), then normal ones (label 0).
root_labels_path = os.path.join(BASE, "labels.csv")
with open(root_labels_path, "w", newline="", encoding="utf-8") as f:
    w = csv.writer(f)
    w.writerow(["filename","label"])
    for subdir, folder, label in (("defectuosas", DEF_DIR, 1), ("normales", NORM_DIR, 0)):
        w.writerows(
            [os.path.join(subdir, fimg), label]
            for fimg in sorted(os.listdir(folder))
            if os.path.splitext(fimg)[1].lower() in IMG_EXTS
        )

print("✅ Dataset organizado.")


✅ Dataset organizado.


2) Crear splits: train (solo normales), val y test (mixtos)

In [7]:
# === CELDA 2: SPLITS ===
import os, shutil, csv, math, random
# Root of the split folders; each split keeps its files under <split>/images.
SPLITS = os.path.join(BASE, "splits")
for split_name in ("train", "val", "test"):
    split_images_dir = os.path.join(SPLITS, split_name, "images")
    os.makedirs(split_images_dir, exist_ok=True)

def list_imgs(folder):
    """Return the sorted entry names in ``folder`` whose extension is in IMG_EXTS.

    The extension check is case-insensitive; names are returned without the
    folder prefix.
    """
    names = []
    for entry in os.listdir(folder):
        extension = os.path.splitext(entry)[1]
        if extension.lower() in IMG_EXTS:
            names.append(entry)
    names.sort()
    return names

norm = list_imgs(NORM_DIR)
defc = list_imgs(DEF_DIR)

# Shuffle with a private generator seeded from SEED instead of the global RNG:
# the split then depends only on SEED and the (sorted) file lists, so re-running
# this cell — or running other RNG-consuming cells first — yields the same split.
split_rng = random.Random(SEED)
split_rng.shuffle(norm)
split_rng.shuffle(defc)

# Normal images: 70% train / 15% val / remainder test.
n = len(norm)
n_tr, n_val = math.floor(0.7*n), math.floor(0.15*n)
norm_train = norm[:n_tr]
norm_val   = norm[n_tr:n_tr+n_val]
norm_test  = norm[n_tr+n_val:]

# Defective images: 40% val / 60% test (none go to train).
m = len(defc)
m_val = math.floor(0.4*m)
def_val  = defc[:m_val]
def_test = defc[m_val:]

def cp(files, src, split):
    """Copy each name in ``files`` from ``src`` into ``SPLITS/<split>/images``.

    Uses ``shutil.copy2``; the file name is kept unchanged.
    """
    dest_dir = os.path.join(SPLITS, split, "images")
    for fn in files:
        source_path = os.path.join(src, fn)
        shutil.copy2(source_path, os.path.join(dest_dir, fn))

# Copy each group of files from its class folder into its split.
for _names, _src_dir, _split in (
    (norm_train, NORM_DIR, "train"),
    (norm_val,   NORM_DIR, "val"),
    (norm_test,  NORM_DIR, "test"),
    (def_val,    DEF_DIR,  "val"),
    (def_test,   DEF_DIR,  "test"),
):
    cp(_names, _src_dir, _split)

def write_labels(split, normals, defects):
    """Write ``SPLITS/<split>/labels.csv`` with a ``filename,label`` header.

    Normal files come first with label 0, then defective files with label 1;
    each group is sorted by name and prefixed with ``images``.
    """
    rows = [[os.path.join("images", fn), 0] for fn in sorted(normals)]
    rows += [[os.path.join("images", fn), 1] for fn in sorted(defects)]
    csv_path = os.path.join(SPLITS, split, "labels.csv")
    with open(csv_path, "w", newline="", encoding="utf-8") as f:
        out = csv.writer(f)
        out.writerow(["filename","label"])
        out.writerows(rows)

# One labels.csv per split; train holds normal images only.
for _split, _normals, _defects in (
    ("train", norm_train, []),
    ("val",   norm_val,   def_val),
    ("test",  norm_test,  def_test),
):
    write_labels(_split, _normals, _defects)

# Summary of how many images of each class ended up in every split.
print("✅ Splits listos.")
print("train:", len(norm_train), "normales / 0 defectuosas")
print("val:",   len(norm_val),   "normales +", len(def_val),  "defectuosas")
print("test:",  len(norm_test),  "normales +", len(def_test), "defectuosas")


✅ Splits listos.
train: 109 normales / 0 defectuosas
val: 23 normales + 18 defectuosas
test: 24 normales + 27 defectuosas


3) PatchCore – cargar extractor y funciones comunes

In [8]:
# === CELDA 3: PATCHCORE - EXTRACTOR Y UTILES ===
import cv2, numpy as np, torch, torch.nn as nn
import torchvision.models as models
from sklearn.neighbors import NearestNeighbors
from tqdm import tqdm, trange
import json, os

# Side length (pixels) that images are resized to before feature extraction.
IMG_SIZE = 256

# Use the GPU when PyTorch can see one, otherwise the CPU.
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"
print("Device:", DEVICE)

# ResNet18 with ImageNet weights, moved to DEVICE and switched to eval mode;
# FE_LAYERS names the intermediate layers used as features.
FE_LAYERS = ["layer2","layer3"]
_pretrained_weights = models.ResNet18_Weights.IMAGENET1K_V1
backbone = models.resnet18(weights=_pretrained_weights).to(DEVICE).eval()

class FeatHook:
    """Forward hook that remembers the latest output of a module.

    ``h`` is the handle returned by ``register_forward_hook``; ``feat`` holds
    the most recent detached output, or ``None`` before the module has run.
    """

    def __init__(self, m):
        self.feat = None
        self.h = m.register_forward_hook(self.hook)

    def hook(self, m, i, o):
        # Store a detached copy of the output so no autograd history is kept.
        self.feat = o.detach()

    def close(self):
        """Remove the hook from the module."""
        self.h.remove()

# Attach one hook per intermediate layer; the name -> module map is built once.
_backbone_modules = dict(backbone.named_modules())
hook2 = FeatHook(_backbone_modules["layer2"])
hook3 = FeatHook(_backbone_modules["layer3"])

def load_img(path, size=IMG_SIZE):
    """Load an image as a ``(1, 3, size, size)`` float32 tensor on DEVICE.

    The file is read as grayscale, resized to ``size`` x ``size`` with
    ``INTER_AREA``, scaled to [0, 1] and replicated over three channels.

    Raises:
        FileNotFoundError: if OpenCV cannot read ``path``.
    """
    # NOTE(review): no ImageNet mean/std normalisation is applied before the
    # pretrained backbone — confirm this is intentional.
    gray = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
    if gray is None:
        raise FileNotFoundError(path)
    gray = cv2.resize(gray, (size, size), interpolation=cv2.INTER_AREA)
    scaled = gray.astype(np.float32) / 255.0
    # Replicate the single channel three times -> (3, size, size).
    three_channel = np.repeat(scaled[np.newaxis, :, :], 3, axis=0)
    tensor = torch.from_numpy(three_channel)
    return tensor.unsqueeze(0).to(DEVICE)

def extract_concat_features(img_path):
    """Return the concatenated layer2 + layer3 feature map of one image, as (C, H, W).

    The image goes through ``backbone`` once; the activations captured by
    ``hook2`` and ``hook3`` are read back, the layer3 map is bilinearly resized
    to the layer2 spatial size, and both are concatenated along channels.
    """
    batch = load_img(img_path)
    with torch.no_grad():
        backbone(batch)  # output unused: only the hooked activations matter
    shallow, deep = hook2.feat, hook3.feat
    deep_resized = torch.nn.functional.interpolate(
        deep, size=shallow.shape[-2:], mode="bilinear", align_corners=False
    )
    # Drop the batch dimension after the channel-wise concatenation.
    return torch.cat([shallow, deep_resized], dim=1).squeeze(0)

def patchify(fmap):  # (C,H,W) -> (N,C)
    """Flatten a (C, H, W) feature map into N = H*W row vectors of length C.

    Rows follow row-major spatial order (index ``h * W + w``); the result is contiguous.
    """
    channels, height, width = fmap.shape
    flat = fmap.reshape(channels, height * width)
    return flat.transpose(0, 1).contiguous()

def read_csv(csv_path):
    """Read a ``filename,label`` CSV and return a list of ``(filename, int(label))``.

    The first row is treated as a header and skipped. Parsing goes through the
    ``csv`` module, so quoted file names that contain commas (as written by
    ``csv.writer``) are read back intact; blank lines are ignored and an empty
    file yields an empty list.

    Raises:
        ValueError: if a data row does not have exactly two columns or the
            label is not an integer.
    """
    import csv  # local import keeps this function usable without earlier cells

    items = []
    # newline="" lets the csv module handle line endings itself.
    with open(csv_path, "r", newline="", encoding="utf-8") as f:
        reader = csv.reader(f)
        next(reader, None)  # skip the header; tolerate an empty file
        for row in reader:
            if not any(cell.strip() for cell in row):
                continue  # blank line
            rel, lbl = row
            items.append((rel, int(lbl)))
    return items


Device: cuda


4) Construir Memory Bank (solo train normales) + CORESET y guardar

In [9]:
# === CELDA 4: MEMORY BANK + CORESET ===
# Folder of the train split and output folder for the PatchCore artifacts.
TRAIN_DIR = os.path.join(SPLITS, "train")
PATCHCORE_DIR = os.path.join(BASE, "patchcore")
os.makedirs(PATCHCORE_DIR, exist_ok=True)

# Keep only the rows labelled 0 (normal) and resolve them to full paths.
train_items = read_csv(os.path.join(TRAIN_DIR, "labels.csv"))
train_imgs = []
for rel, lbl in train_items:
    if lbl == 0:
        train_imgs.append(os.path.join(TRAIN_DIR, rel))

print("Extrayendo parches de TRAIN (solo normales)...")
# One (N, C) array of L2-normalised patch embeddings per training image.
all_patches = [
    torch.nn.functional.normalize(
        patchify(extract_concat_features(img_path)), p=2, dim=1
    ).cpu().numpy()
    for img_path in tqdm(train_imgs)
]

# Stack every image's patches into a single (total_patches, C) float32 bank.
bank_full = np.concatenate(all_patches, axis=0).astype(np.float32)
print("Banco completo:", bank_full.shape)

# Simple coreset: random projection followed by greedy k-center selection.
def random_projection(X, out_dim=128, seed=SEED):
    """Project the rows of ``X`` to ``out_dim`` dimensions and L2-normalise them.

    The projection matrix is drawn from a standard normal distribution with a
    generator seeded by ``seed`` and cast to float32.
    """
    generator = np.random.default_rng(seed)
    projection = generator.standard_normal((X.shape[1], out_dim)).astype(np.float32)
    projected = X @ projection
    row_norms = np.linalg.norm(projected, axis=1, keepdims=True)
    projected /= row_norms + 1e-8  # epsilon guards against division by zero
    return projected

def kcenter_greedy(Z, m, seed=SEED):
    """Pick ``m`` row indices of ``Z`` by greedy farthest-point selection.

    The first index is drawn at random (generator seeded by ``seed``); each
    following index is the row whose Euclidean distance to its nearest already
    chosen row is largest. Returns an int64 array of indices in pick order.
    """
    generator = np.random.default_rng(seed)
    num_rows = Z.shape[0]
    first = int(generator.integers(0, num_rows))
    chosen = [first]
    # Distance from every row to its closest chosen centre so far.
    nearest_dist = np.linalg.norm(Z - Z[first], axis=1)
    for _step in trange(1, m):
        farthest = int(np.argmax(nearest_dist))
        chosen.append(farthest)
        np.minimum(nearest_dist, np.linalg.norm(Z - Z[farthest], axis=1), out=nearest_dist)
    return np.array(chosen, dtype=np.int64)

# Fraction of the full patch bank kept in the coreset (at least one patch).
CORESET_RATIO = 0.05
m = max(1, int(CORESET_RATIO * bank_full.shape[0]))

# Indices are selected in the projected space, but the stored bank keeps the
# original full-dimensional embeddings.
Z = random_projection(bank_full, out_dim=128)
sel = kcenter_greedy(Z, m=m)
bank = bank_full[sel]

# `layers` is stored as a plain string array (not dtype=object) so the archive
# contains no pickled data and can be read with allow_pickle=False.
np.savez(os.path.join(PATCHCORE_DIR, "memory_bank_core.npz"),
         bank=bank, img_size=np.array([IMG_SIZE], dtype=np.int32),
         layers=np.array(FE_LAYERS))
print("✅ Memory bank guardado:", bank.shape)


Extrayendo parches de TRAIN (solo normales)...


100%|██████████| 109/109 [00:01<00:00, 83.86it/s]


Banco completo: (111616, 384)


100%|██████████| 5579/5579 [03:20<00:00, 27.88it/s]

✅ Memory bank guardado: (5580, 384)





5) Validación: calibrar umbral y guardar config

In [10]:
# === CELDA 5: VALIDACIÓN (calibrar umbral) ===
VAL_DIR = os.path.join(SPLITS, "val")
VAL_CSV = os.path.join(VAL_DIR, "labels.csv")
items = read_csv(VAL_CSV)

# Load the memory bank and fit the k-NN index on it.
# The archive is opened in a `with` block so its file handle is closed, and
# allow_pickle=False is enough because only the numeric "bank" array is read
# (npz members are loaded lazily, one key at a time).
with np.load(os.path.join(PATCHCORE_DIR, "memory_bank_core.npz"), allow_pickle=False) as mb:
    bank = mb["bank"]  # (M, C)
knn = NearestNeighbors(n_neighbors=3, algorithm="auto").fit(bank)

def anomaly_map_and_score(img_path):
    """Return the image-level anomaly score of ``img_path`` as a float.

    Each L2-normalised patch embedding is scored by the mean distance to its
    neighbours returned by ``knn``; the image score is the maximum patch score.
    Only the score is returned — the per-patch map is not.
    """
    features = extract_concat_features(img_path)
    grid_h, grid_w = features.shape[-2:]
    embeddings = torch.nn.functional.normalize(patchify(features), p=2, dim=1)
    neighbor_dists, _ = knn.kneighbors(embeddings.cpu().numpy(), return_distance=True)
    score_map = neighbor_dists.mean(axis=1).reshape(grid_h, grid_w).astype(np.float32)
    return float(score_map.max())

# Collect the ground-truth labels and the anomaly score of every validation image.
y_true = np.array([lbl for _rel, lbl in items])
s = np.array([
    anomaly_map_and_score(os.path.join(VAL_DIR, rel))
    for rel, _lbl in tqdm(items)
])

from sklearn.metrics import roc_auc_score, precision_recall_curve, auc
auc_roc = roc_auc_score(y_true, s)
prec, rec, thr = precision_recall_curve(y_true, s)
# precision/recall carry one extra trailing point (precision=1, recall=0) that
# has no matching threshold; drop it so every F1 index is a valid index of thr.
f1s = 2*prec[:-1]*rec[:-1]/(prec[:-1]+rec[:-1]+1e-8)
best_idx = int(np.argmax(f1s))
thr_f1 = float(thr[best_idx])
print(f"VAL -> ROC-AUC={auc_roc:.4f} | bestF1_thr={thr_f1:.6f}")

# 95th percentile of the scores of normal validation images.
thr_p95 = float(np.percentile(s[y_true==0], 95))
print("Umbral p95 normales:", round(thr_p95,6))

# Persist the threshold used at test time (the p95 one, not the best-F1 one).
cfg = {"threshold": thr_p95}
with open(os.path.join(PATCHCORE_DIR, "config.json"), "w", encoding="utf-8") as f:
    json.dump(cfg, f)
print("✅ Umbral guardado.")


100%|██████████| 41/41 [00:05<00:00,  7.54it/s]

VAL -> ROC-AUC=0.8865 | bestF1_thr=0.419504
Umbral p95 normales: 0.356087
✅ Umbral guardado.





6) Test: métricas finales

In [11]:
# === CELDA 6: TEST (métricas finales) ===
from sklearn.metrics import confusion_matrix, classification_report

TEST_DIR = os.path.join(SPLITS, "test")
TEST_CSV = os.path.join(TEST_DIR, "labels.csv")
# Read the calibrated threshold; the `with` block closes the file handle
# (the previous json.load(open(...)) left it open).
with open(os.path.join(PATCHCORE_DIR, "config.json"), "r", encoding="utf-8") as f:
    cfg = json.load(f)
thr = cfg["threshold"]

# Score every test image and keep its ground-truth label.
items = read_csv(TEST_CSV)
y_true = np.array([lbl for _rel, lbl in items])
s = np.array([
    anomaly_map_and_score(os.path.join(TEST_DIR, rel))
    for rel, _lbl in tqdm(items)
])

# An image is predicted defective (1) when its score is strictly above the threshold.
y_pred = (s > thr).astype(int)
auc_roc = roc_auc_score(y_true, s)

print("Confusion matrix [[TN FP],[FN TP]]:\n", confusion_matrix(y_true, y_pred))
print(classification_report(y_true, y_pred, digits=4))
print("ROC-AUC (scores):", round(auc_roc, 4))


100%|██████████| 51/51 [00:03<00:00, 12.76it/s]

Confusion matrix [[TN FP],[FN TP]]:
 [[23  1]
 [ 3 24]]
              precision    recall  f1-score   support

           0     0.8846    0.9583    0.9200        24
           1     0.9600    0.8889    0.9231        27

    accuracy                         0.9216        51
   macro avg     0.9223    0.9236    0.9215        51
weighted avg     0.9245    0.9216    0.9216        51

ROC-AUC (scores): 0.9506





7) Visuales (overlay, heatmap, polígonos) para demo

In [12]:
# === CELDA 7: VISUALIZACIÓN ===
# Output folder for the demo images.
viz_dir = os.path.join(PATCHCORE_DIR, "viz")
os.makedirs(viz_dir, exist_ok=True)

def heat_and_polys(img_path, score_map, percent=98):
    """Build the overlay, colour heatmap and binary mask for one image.

    Args:
        img_path: Path of the image; it is read as grayscale.
        score_map: 2-D array of patch scores; it is resized to the image size.
        percent: Percentile of the 8-bit heatmap used as the mask threshold.

    Returns:
        ``(overlay, heat_color, mask)``: the image blended with the JET heatmap
        and with contours drawn in green, the JET heatmap, and the binary mask.

    Raises:
        FileNotFoundError: if OpenCV cannot read ``img_path``.
    """
    raw = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
    # cv2.imread returns None instead of raising; fail the same way load_img does.
    if raw is None:
        raise FileNotFoundError(img_path)

    # Resize the score map to the image size and min-max normalise it to 0..255.
    heat = cv2.resize(score_map, (raw.shape[1], raw.shape[0]), interpolation=cv2.INTER_CUBIC)
    heat_norm = (heat - heat.min()) / (heat.max() - heat.min() + 1e-8)
    heat_u8 = (heat_norm*255).astype(np.uint8)
    heat_color = cv2.applyColorMap(heat_u8, cv2.COLORMAP_JET)
    overlay = cv2.addWeighted(cv2.cvtColor(raw, cv2.COLOR_GRAY2BGR), 0.6, heat_color, 0.4, 0)

    # Keep the pixels above the requested percentile, then clean the mask with
    # a 3x3 opening followed by a 3x3 closing.
    t = np.percentile(heat_u8, percent)
    _, mask = cv2.threshold(heat_u8, t, 255, cv2.THRESH_BINARY)
    mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, np.ones((3,3),np.uint8), 1)
    mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, np.ones((3,3),np.uint8), 1)

    # Outline every external contour with an area of at least 25 pixels.
    cnts,_ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    for c in cnts:
        if cv2.contourArea(c) < 25: continue
        cv2.polylines(overlay, [c], True, (0,255,0), 2)
    return overlay, heat_color, mask

# Demo on the first 8 test images: save overlay, heatmap and mask for each.
for rel, _lbl in items[:8]:
    p = os.path.join(TEST_DIR, rel)
    fcat = extract_concat_features(p)
    grid_shape = tuple(fcat.shape[-2:])
    unit_patches = torch.nn.functional.normalize(patchify(fcat), p=2, dim=1)
    dists, _idx = knn.kneighbors(unit_patches.cpu().numpy(), return_distance=True)
    # Mean neighbour distance per patch, laid back out on the feature grid.
    patch_scores = dists.mean(axis=1).reshape(grid_shape).astype(np.float32)

    ov, heat_c, mask = heat_and_polys(p, patch_scores, percent=98)
    base = os.path.splitext(os.path.basename(p))[0]
    outputs = {
        f"{base}_overlay.png": ov,
        f"{base}_heat.png": heat_c,
        f"{base}_mask.png": mask,
    }
    for out_name, out_img in outputs.items():
        cv2.imwrite(os.path.join(viz_dir, out_name), out_img)

print("✅ Visuales guardadas en:", viz_dir)


✅ Visuales guardadas en: C:\Users\DELL\Desktop\UNI-LEON\DP ULE\dataset_gua_crops\cropped_images\patchcore\viz
