# PatchCore

In [None]:
import common
import sampler
import patchcore
import backbones
import utils

In [None]:
import os

# os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
# os.environ["CUDA_VISIBLE_DEVICES"] = "0"

import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
# %matplotlib inline

from sklearn.model_selection import KFold

import torch
import torch.nn as nn
from torch.nn import functional as F
from torch.utils.data import Dataset, DataLoader
import pytorch_lightning as pl

from PIL import Image
from torchvision.transforms import v2

from glob import glob
from tqdm import tqdm

import warnings
warnings.filterwarnings('ignore')

from argparse import ArgumentParser

# Run configuration. Each entry pairs an option flag with the keyword
# arguments handed to ``ArgumentParser.add_argument``; options are registered
# in the order listed here.
_CLI_OPTIONS = (
    ("--image_size", {"default": 224, "type": int}),
    ("--backbone", {"default": "wideresnet101", "type": str}),
    ("--layers_to_extract_from", {"nargs": "+", "default": ["layer3"], "type": str}),
    ("--pretrain_embed_dimension", {"default": 1024, "type": int}),
    ("--target_embed_dimension", {"default": 1024, "type": int}),
    ("--patchsize", {"default": 3, "type": int}),
    ("--anomaly_scorer_num_nn", {"default": 5, "type": int}),
    ("--batch_size", {"default": 32, "type": int}),
    ("--cv", {"default": 5, "type": int}),
    ("--seed", {"default": 826, "type": int}),
    ("--device", {"nargs": "+", "default": [0], "type": int}),
    ("--num_workers", {"default": 0, "type": int}),
)

parser = ArgumentParser(description="patchcore")
for _flag, _options in _CLI_OPTIONS:
    parser.add_argument(_flag, **_options)

# Parse an explicitly empty argument list so that only the defaults declared
# above are used (nothing is taken from the process command line).
args = parser.parse_args([])

# Short module-level aliases for the settings used most often below.
image_size, BATCH_SIZE, CV, SEED = (
    args.image_size,
    args.batch_size,
    args.cv,
    args.seed,
)

def set_seeds(seed=SEED):
    """Seed every random number generator this notebook relies on.

    Args:
        seed: Integer seed applied to NumPy, Python's ``random`` module,
            PyTorch (CPU and CUDA) and PyTorch Lightning. Defaults to the
            module-level ``SEED``.
    """
    np.random.seed(seed)
    random.seed(seed)
    torch.manual_seed(seed)
    # Seed all CUDA devices rather than only the current one, since the
    # ``--device`` option accepts several GPU ids.
    torch.cuda.manual_seed_all(seed)
    # Request deterministic cuDNN kernels and turn off the benchmark mode.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    # Forward the argument itself: the global SEED must not override an
    # explicitly requested seed.
    pl.seed_everything(seed)


set_seeds()

## data_loader.py

In [None]:
# Collect the image paths. glob() yields matches in arbitrary,
# platform-dependent order, so the paths are sorted to keep the K-fold splits
# and the row order of the test predictions reproducible between runs.
X = np.array(sorted(glob("data/train/*")))
# Every training image is assigned label 0.
y = np.zeros(len(X), dtype=int)
X_test = np.array(sorted(glob("data/test/*")))

len(X), len(X_test)

In [None]:
class TransistorDataset(Dataset):
    """Dataset of image files, optionally paired with labels.

    Args:
        X: Indexable collection of image file paths.
        y: Optional indexable collection of labels aligned with ``X``. When
            it is ``None`` the returned items carry no ``"label"`` entry.
        transform: Optional callable applied to each loaded image.
    """

    def __init__(self, X, y=None, transform=None):
        super().__init__()
        self.X = X
        self.y = y
        self.transform = transform

    def __len__(self):
        """Return the number of image paths."""
        return len(self.X)

    def __getitem__(self, idx):
        """Load, convert and transform the sample at position ``idx``.

        Returns:
            ``{"image": image}`` when the dataset has no labels, otherwise
            ``{"image": image, "label": label}``.
        """
        # Image.open is lazy; decoding inside the context manager releases the
        # file handle immediately. Converting to RGB guarantees three
        # channels, which the 3-channel normalisation in this notebook
        # requires even for grayscale, palette or RGBA files.
        with Image.open(self.X[idx]) as source:
            image = source.convert("RGB")

        if self.transform is not None:
            image = self.transform(image)

        if self.y is None:
            return {"image": image}

        return {"image": image, "label": self.y[idx]}

In [None]:
def _build_transform(size):
    """Return the preprocessing pipeline shared by all three data splits.

    The input is resized to a square of ``round(size*1.143)`` pixels per side,
    center-cropped to ``size`` x ``size``, converted to scaled float32 and
    normalised per channel.
    """
    enlarged = round(size*1.143)
    steps = [
        v2.ToImage(),
        v2.Resize(size=(enlarged, enlarged)),
        v2.CenterCrop(size=(size, size)),
        v2.ToDtype(torch.float32, scale=True),
        v2.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]
    return v2.Compose(steps)


# Train, validation and test use the same deterministic preprocessing; each
# split still gets its own pipeline object.
train_transform = _build_transform(image_size)
val_transform = _build_transform(image_size)
test_transform = _build_transform(image_size)

## train

In [None]:
# Per-fold results: the threshold derived from each validation fold and the
# test-set scores produced by the model fitted on that fold.
threshold_list = []
scores_list = []

kf = KFold(n_splits=CV, shuffle=True, random_state=SEED)

# The device and the test loader do not depend on the fold, so they are
# created once instead of being rebuilt on every iteration.
device = utils.set_torch_device(gpu_ids=args.device)

test_dataset = TransistorDataset(X_test, None, test_transform)
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=args.num_workers)

for train_index, val_index in kf.split(X):

    X_train = X[train_index]
    y_train = y[train_index]
    X_val = X[val_index]
    y_val = y[val_index]

    train_dataset = TransistorDataset(X_train, y_train, train_transform)
    train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=args.num_workers)

    val_dataset = TransistorDataset(X_val, y_val, val_transform)
    val_dataloader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=args.num_workers)

    # A fresh model (and freshly loaded backbone) is built for every fold.
    patch_core = patchcore.PatchCore(device)

    patch_core.load(
        backbone                 = backbones.load(args.backbone),
        layers_to_extract_from   = args.layers_to_extract_from,
        device                   = device,
        input_shape              = (3, image_size, image_size),
        pretrain_embed_dimension = args.pretrain_embed_dimension,
        target_embed_dimension   = args.target_embed_dimension,
        patchsize                = args.patchsize,
        anomaly_scorer_num_nn    = args.anomaly_scorer_num_nn,
        featuresampler           = sampler.GreedyCoresetSampler(percentage=0.1, device=device),
        nn_method                = common.FaissNN(on_gpu=False, num_workers=args.num_workers)
    )

    patch_core.fit(train_dataloader)

    # Only the first element returned by predict() is used here.
    scores, _ = patch_core.predict(
        val_dataloader
    )

    # The fold threshold is the largest score seen on the held-out
    # validation images.
    threshold = np.max(scores)
    print(f"threshold: {threshold}")

    scores, _ = patch_core.predict(
        test_dataloader
    )

    threshold_list.append(threshold)
    scores_list.append(scores)

In [None]:
# Combine the folds: the decision threshold is the average of the per-fold
# thresholds, and every test sample keeps its largest score over all folds.
threshold = np.asarray(threshold_list).mean()
print(f"threshold: {threshold}")

scores = np.asarray(scores_list).max(axis=0)
# 0 for scores strictly below the threshold, 1 for everything else.
prediction = np.logical_not(scores < threshold).astype(int)

print(f"n_anomaly: {prediction.sum()}")

## submission

In [None]:
# Load the sample submission, store the predictions in its "label" column
# (row i receives prediction i) and write the result to disk.
sample_submission = pd.read_csv('data/sample_submission.csv').assign(label=prediction)
sample_submission.to_csv('patchcore.csv', index=False)

sample_submission.head()