In [1]:
# Mount Google Drive at /content/drive; the dataset, the cached embeddings and
# the project folder used by the cells below are all addressed under that path.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
!pip install ngt

Collecting ngt
  Downloading ngt-2.1.6-cp310-cp310-manylinux_2_28_x86_64.whl (10.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.9/10.9 MB[0m [31m86.3 MB/s[0m eta [36m0:00:00[0m
Collecting pybind11 (from ngt)
  Downloading pybind11-2.11.1-py3-none-any.whl (227 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m227.7/227.7 kB[0m [31m31.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pybind11, ngt
Successfully installed ngt-2.1.6 pybind11-2.11.1


In [3]:
import os
import sys
import numpy as np
# Switch the working directory to the project folder on Drive. The next cell
# imports the local `codes` package, which presumably lives in this folder
# (TODO confirm), and later cells write their output relative to it.
os.chdir('/content/drive/MyDrive/MRI_Anomaly/colab_practice')
# Debug aid: uncomment to inspect the module search path.
# for path in sys.path:
#     print(path)

In [4]:
# Contents of inspection.py, inlined into this notebook

from codes import mvtecad
import numpy as np
import torch
from torch.utils.data import DataLoader
from codes.utils import PatchDataset_NCHW, NHWC2NCHW, distribute_scores
from codes.nearest_neighbor import search_NN
import os

# Public names of the inlined inspection module.
__all__ = ['eval_encoder_NN_multiK', 'eval_embeddings_NN_multiK']

# Folder on Drive where eval_encoder_NN_multiK stores / loads the cached
# training embeddings (*.npy).
DATASET_PATH = '/content/drive/MyDrive/MRI_Anomaly'
# Run identifier.
# NOTE(review): this value ends with an underscore, while a later cell rebinds
# `name` to the same string without it; the functions below take `name` as a
# parameter, so this module-level value looks unused - confirm and remove.
name = 'lambda0.1_epoch50_repeat150_'

def infer(x, enc, K, S):
    """Embed every K x K patch (stride S) of the images in ``x`` with ``enc``.

    Args:
        x: Image batch; converted with ``NHWC2NCHW`` before patch extraction
            (presumably laid out as [N, H, W, C] - TODO confirm).
        enc: Encoder called on each patch batch. It must expose ``D`` (used as
            the embedding dimension) and ``eval()``, and must accept CUDA
            tensors because every batch is moved to the GPU.
        K: Patch size handed to ``PatchDataset_NCHW``.
        S: Patch stride handed to ``PatchDataset_NCHW``.

    Returns:
        float32 array of shape ``(dataset.N, dataset.row_num, dataset.col_num,
        enc.D)`` holding one embedding per (image, patch row, patch column).
    """
    patches = PatchDataset_NCHW(NHWC2NCHW(x), K=K, S=S)
    batches = DataLoader(patches, batch_size=64, shuffle=False, pin_memory=True)

    grid_shape = (patches.N, patches.row_num, patches.col_num, enc.D)  # [-1, I, J, D]
    embs = np.empty(grid_shape, dtype=np.float32)

    enc = enc.eval()
    with torch.no_grad():
        for patch_batch, image_ids, row_ids, col_ids in batches:
            vectors = enc(patch_batch.cuda()).detach().cpu().numpy()

            # Each sample carries its own (image, row, column) position, so the
            # result is written straight into its slot of the output grid.
            for vector, image_id, row_id, col_id in zip(vectors, image_ids, row_ids, col_ids):
                embs[image_id, row_id, col_id] = np.squeeze(vector)
    return embs

def assess_anomaly_maps(anomaly_maps):
    """Score a stack of anomaly maps for segmentation and detection.

    Args:
        anomaly_maps: Array whose last two axes are the spatial axes of each map.

    Returns:
        Tuple ``(auroc_det, auroc_seg)``: the value returned by
        ``mvtecad.detection_auroc`` for the per-map maxima, and the value
        returned by ``mvtecad.segmentation_auroc`` for the maps themselves.
    """
    auroc_seg = mvtecad.segmentation_auroc(anomaly_maps)

    # One score per map: the largest value over both spatial axes.
    per_image_peak = anomaly_maps.max(axis=(-2, -1))
    auroc_det = mvtecad.detection_auroc(per_image_peak)

    return auroc_det, auroc_seg


#########################
def eval_encoder_NN_multiK(name, enc, load_emb=False):
    """Evaluate an encoder at two patch scales (64 and 32) via nearest neighbours.

    Training embeddings are cached as ``{name}_emb64_tr.npy`` and
    ``{name}_emb32_tr.npy`` under ``DATASET_PATH``. Test embeddings are always
    recomputed.

    Args:
        name: Run identifier used as the prefix of the cache file names.
        enc: Encoder used for the 64-pixel patches; its ``enc`` attribute is
            used for the 32-pixel patches.
        load_emb: If True, reuse the cached training embeddings when both cache
            files exist. If either file is missing the embeddings are computed
            and saved instead of failing, so a cold cache no longer raises
            ``FileNotFoundError``. If False, always recompute and overwrite.

    Returns:
        The dict produced by ``eval_embeddings_NN_multiK``.
    """
    # Training-set embeddings of the 64x64 patches.
    emb64_save_path = os.path.join(DATASET_PATH, f'{name}_emb64_tr.npy')
    # Training-set embeddings of the 32x32 patches.
    emb32_save_path = os.path.join(DATASET_PATH, f'{name}_emb32_tr.npy')

    cache_available = os.path.exists(emb64_save_path) and os.path.exists(emb32_save_path)

    if load_emb and cache_available:
        embs64_tr = np.load(emb64_save_path)
        embs32_tr = np.load(emb32_save_path)
    else:
        x_tr = mvtecad.get_x_standardized(mode='train')

        # NOTE(review), carried over from the original author: with 182x182
        # images, (182 - 64) / 16 + 1 and (182 - 32) / 4 + 1 are not integers,
        # so the patch grid does not tile the image exactly. Using 192x192
        # inputs (or padding by 5 pixels) was suggested as a remedy.
        embs64_tr = infer(x_tr, enc, K=64, S=16)
        np.save(emb64_save_path, embs64_tr)
        embs32_tr = infer(x_tr, enc.enc, K=32, S=4)
        np.save(emb32_save_path, embs32_tr)

    x_te = mvtecad.get_x_standardized(mode='test')
    embs64_te = infer(x_te, enc, K=64, S=16)
    embs32_te = infer(x_te, enc.enc, K=32, S=4)

    embs64 = embs64_tr, embs64_te
    embs32 = embs32_tr, embs32_te

    return eval_embeddings_NN_multiK(embs64, embs32)

# This function performs the actual evaluation of detection and segmentation performance.
def eval_embeddings_NN_multiK(embs64, embs32, NN=1, image_shape=(182, 182)):
    """Build anomaly maps at two patch scales and score them alone and combined.

    Args:
        embs64: Pair ``(train_embeddings, test_embeddings)`` for the 64-pixel
            patches (stride 16).
        embs32: Pair ``(train_embeddings, test_embeddings)`` for the 32-pixel
            patches (stride 4).
        NN: Number of nearest neighbours forwarded to ``measure_emb_NN``.
        image_shape: Target ``(height, width)`` handed to ``distribute_scores``
            when patch scores are spread back over the image. Defaults to the
            previously hard-coded ``(182, 182)``.

    Returns:
        Dict with the detection / segmentation scores returned by
        ``assess_anomaly_maps`` (``det_*`` / ``seg_*``) and the maps themselves
        (``maps_*``) for the suffixes ``64``, ``32``, ``sum`` (element-wise sum
        of both scales) and ``mult`` (element-wise product of both scales).
    """
    target_shape = tuple(image_shape)

    def _scale_maps(embs, K, S, method):
        # Patch-level NN distances for one scale, spread back over the image.
        emb_tr, emb_te = embs
        patch_scores = measure_emb_NN(emb_te, emb_tr, method=method, NN=NN)
        return distribute_scores(patch_scores, target_shape, K=K, S=S)

    maps_64 = _scale_maps(embs64, K=64, S=16, method='kdt')
    det_64, seg_64 = assess_anomaly_maps(maps_64)

    maps_32 = _scale_maps(embs32, K=32, S=4, method='ngt')
    det_32, seg_32 = assess_anomaly_maps(maps_32)

    maps_sum = maps_64 + maps_32
    det_sum, seg_sum = assess_anomaly_maps(maps_sum)

    maps_mult = maps_64 * maps_32
    det_mult, seg_mult = assess_anomaly_maps(maps_mult)

    return {
        'det_64': det_64,
        'seg_64': seg_64,

        'det_32': det_32,
        'seg_32': seg_32,

        'det_sum': det_sum,
        'seg_sum': seg_sum,

        'det_mult': det_mult,
        'seg_mult': seg_mult,

        'maps_64': maps_64,
        'maps_32': maps_32,
        'maps_sum': maps_sum,
        'maps_mult': maps_mult
    }


########################

def measure_emb_NN(emb_te, emb_tr, method='kdt', NN=1):
    """Score test embeddings by their distance to the nearest training embeddings.

    Args:
        emb_te: Test embeddings, forwarded unchanged to ``search_NN``.
        emb_tr: Training embeddings; flattened to ``(-1, D)`` where ``D`` is the
            size of the last axis.
        method: Search backend name forwarded to ``search_NN``.
        NN: Number of neighbours forwarded to ``search_NN``.

    Returns:
        The first value returned by ``search_NN`` averaged over its last axis.
    """
    # All training embeddings form one flat bank of D-dimensional vectors.
    train_bank = emb_tr.reshape(-1, emb_tr.shape[-1])
    neighbour_dists, _ = search_NN(emb_te, train_bank, method=method, NN=NN)
    return np.mean(neighbour_dists, axis=-1)

In [6]:
# Contents of main_visualize.py
import matplotlib.pyplot as plt
from codes import mvtecad
from PIL import Image
from imageio import imread
from tqdm import tqdm
from codes.utils import resize, makedirpath
from skimage.segmentation import mark_boundaries
# from codes.inspection import eval_encoder_NN_multiK
from codes.networks import EncoderHier
import cv2

# Run identifier: main() passes it to enc.load(), to eval_encoder_NN_multiK()
# (prefix of the cached embedding files) and to save_overlay_images() (name of
# the output sub-folder).
name='lambda0.1_epoch50_repeat150'

def save_overlay_images(name, maps):
    """Save one side-by-side PNG per anomaly map under ``anomaly_maps/test_0303/<name>``.

    The left panel shows the test image with the boundary of its mask drawn in
    red; the right panel shows the anomaly map with the ``Reds`` colormap,
    scaled to that map's own maximum.

    Args:
        name: Sub-folder name for the output images.
        maps: Anomaly maps indexed along the first axis; ``maps[n]`` is paired
            with the n-th test image and the n-th mask returned by ``mvtecad``.
    """
    N = maps.shape[0]
    images = mvtecad.get_x(mode='test').squeeze()
    masks = mvtecad.get_mask()

    # Create the output directory (relative to the current working directory).
    overlay_dir = os.path.join('anomaly_maps', 'test_0303', name)
    os.makedirs(overlay_dir, exist_ok=True)

    for n in tqdm(range(N)):
        fig, (ax_image, ax_map) = plt.subplots(ncols=2, figsize=(6, 3))

        outlined = mark_boundaries(
            resize(images[n], (182, 182)),
            resize(masks[n], (182, 182)),
            color=(1, 0, 0),
        )
        ax_image.imshow(outlined)
        ax_image.set_axis_off()

        anomaly_map = maps[n]
        ax_map.imshow(anomaly_map, vmax=anomaly_map.max(), cmap='Reds')
        ax_map.set_axis_off()

        fig.tight_layout()
        fpath = os.path.join(overlay_dir, f'n{n:03d}.png')
        makedirpath(fpath)
        fig.savefig(fpath)
        # Close each figure so open figures do not pile up across the loop.
        plt.close(fig)

    # Drop the local references to the large arrays (kept from the original as
    # a precaution against memory build-up).
    del maps, masks

#########################

def main():
    """Load the trained encoder, compute the anomaly maps and save the overlays.

    Uses the module-level ``name`` as the run identifier, reuses the cached
    training embeddings (``load_emb=True``) and visualises the ``maps_mult``
    entry of the evaluation result. Requires a CUDA device.
    """
    encoder = EncoderHier(K=64, D=64).cuda()
    encoder.load(name)
    encoder.eval()

    evaluation = eval_encoder_NN_multiK(name, encoder, load_emb=True)
    save_overlay_images(name, evaluation['maps_mult'])


# Entry-point guard kept from the original script; the captured output below
# shows that main() runs when this cell is executed in the notebook.
if __name__ == '__main__':
    main()

fpattern : /content/drive/MyDrive/MRI_Anomaly/test/*/*/*.png
fpattern 내 개수 : 600
Abnormal 경로 : 300
Normal 경로 : 300
(300, 182, 182)
(300, 182, 182)
이미지 텐서 shape : (600, 182, 182)
텐서 shape 변환!!! 

이미지 텐서 차원 : (600, 182, 182, 1)

fpattern : /content/drive/MyDrive/MRI_Anomaly/test/*/*/*.png
fpattern 내 개수 : 600
Abnormal 경로 : 300
Normal 경로 : 300
(300, 182, 182)
(300, 182, 182)
이미지 텐서 shape : (600, 182, 182)
텐서 shape 변환!!! 

이미지 텐서 차원 : (600, 182, 182, 1)



100%|██████████| 600/600 [01:02<00:00,  9.57it/s]
