In [89]:
import sys
sys.path.append('./MONAI')
import os
import random
import math
import numpy as np
import torch
import torch.nn as nn
import torch.backends.cudnn as cudnn
import torch.nn.functional as F
import cv2
import matplotlib.pyplot as plt
from datetime import datetime
from torchvision import models
from tensorboardX import SummaryWriter
from config import parse_arguments
from datasets import DiseaseDataset
from utils_folder.utils import AverageMeter, ProgressMeter
from utils_folder.eval_metric import *


In [90]:
# 배열로 변환하는 헬퍼 함수
# 모델의 출력을 numpy 배열로 변환하기 위한 함수
# 입력 텐서를 CPU로 이동시키고, detach하여 연산 그래프에서 분리한 뒤
# numpy로 변환한 후, (batch, height, width, channels) 형태로 차원을 재배치합니다.
fn_tonumpy = lambda x: x.to('cpu').detach().numpy().transpose(0, 2, 3, 1)

# ViT Attention Rollout 클래스
# Vision Transformer(ViT) 모델의 attention 맵을 추출하고 시각화하기 위한 클래스
class VITAttentionRollout:
    def __init__(self, model, attention_layer_name='self_attention', head_fusion="mean", discard_ratio=0.9):
        """
        VITAttentionRollout 초기화 함수
        Args:
            model: Vision Transformer 모델
            attention_layer_name: attention 레이어 이름, forward hook을 등록할 레이어
            head_fusion: 여러 attention head를 병합하는 방식 ('mean', 'max', 'min' 중 하나)
            discard_ratio: attention을 시각화할 때, 상위 discard_ratio 비율의 주의값을 제거하는 비율
        """
        self.model = model  # ViT 모델 저장
        self.head_fusion = head_fusion  # attention head 병합 방식
        self.discard_ratio = discard_ratio  # discard ratio 설정
        self.attentions = []  # attention 값을 저장할 리스트

        # 모델의 지정된 attention 레이어에 forward hook 등록
        for name, module in self.model.named_modules():
            if attention_layer_name in name:
                module.register_forward_hook(self.get_attention)  # get_attention 함수로 hook 설정

    def get_attention(self, module, input, output):
        """
        forward hook에서 호출되는 함수로, 모델의 attention 맵을 저장합니다.
        Args:
            module: 현재 레이어 모듈
            input: 레이어로 들어오는 입력 (사용되지 않음)
            output: 레이어의 출력, attention 맵을 포함
        """
        # 출력이 tuple 형태이면 첫 번째 요소를 사용하여 attention 맵을 추출하고, CPU로 이동한 뒤 리스트에 추가
        attention_map = output[0].cpu() if isinstance(output, tuple) else output.cpu()
        self.attentions.append(attention_map)  # attention 맵을 리스트에 저장

    def __call__(self, input_tensor):
        """
        모델의 attention 맵을 얻기 위해 객체를 함수처럼 호출합니다.
        Args:
            input_tensor: 모델에 입력할 이미지 텐서
        Returns:
            attention_map: attention rollout을 적용한 결과 attention 맵
        """
        self.attentions = []  # 새로운 호출마다 attention 리스트를 초기화
        with torch.no_grad():  # backpropagation 비활성화 (메모리 절약)
            output = self.model(input_tensor)  # 모델 실행
        # 저장된 attention 맵들에 rollout 함수 적용
        return rollout(self.attentions, self.discard_ratio, self.head_fusion)


In [91]:
def rollout(attentions, discard_ratio, head_fusion):
    """
    Rollout 함수를 사용하여 attention 레이어들의 attention 맵을 종합해 시각화 가능한 최종 attention 맵을 생성합니다.
    Args:
        attentions: attention 레이어들의 attention 맵 리스트
        discard_ratio: 낮은 attention 값의 일부를 제거하기 위한 비율
        head_fusion: 여러 attention head를 병합하는 방식 ('mean', 'max', 'min' 중 하나)
    Returns:
        최종적으로 생성된 attention 맵
    """
    # 첫 attention의 시퀀스 길이를 사용하여 초기 result 설정
    sequence_length = attentions[0].size(-1)
    result = torch.eye(sequence_length).unsqueeze(0).to(attentions[0].device)  # (1, 768, 768)로 초기화
    print("1. Initial result size:", result.size())

    # 각 attention 레이어를 처리하여 최종 attention 맵을 생성
    with torch.no_grad():
        for attention in attentions:
            # 여러 attention head 병합
            if head_fusion == "mean":
                attention_heads_fused = attention.mean(dim=1)  # 각 head의 평균
            elif head_fusion == "max":
                attention_heads_fused = attention.max(dim=1)[0]  # 각 head의 최대값
            elif head_fusion == "min":
                attention_heads_fused = attention.min(dim=1)[0]  # 각 head의 최소값
            else:
                raise ValueError("Unsupported attention head fusion type")

            print("2. attention_heads_fused size:", attention_heads_fused.size())

            # 행렬 곱을 수행하기 위해 a 생성
            a = (attention_heads_fused + 1.0) / 2  # attention 값을 0~1로 정규화
            a = a / a.sum(dim=-1, keepdim=True)  # 각 행의 합이 1이 되도록 정규화
            a = a.unsqueeze(-1)  # (1, 768, 1)로 만듦
            print("3. a size:", a.size())

            # result와 a의 차원을 맞추고 행렬 곱 수행
            print("Result before matmul:", result.size(), "  a size:", a.size())
            result = torch.matmul(result, a)
            print("4. result size after matmul:", result.size())  # 중간 결과 확인용

            # result의 마지막 차원이 축소되지 않았는지 확인
            if result.size(-1) != sequence_length:
                raise RuntimeError(f"Unexpected result shape: {result.size()}")

    # 최종적으로 클래스 토큰 제외 후 이미지 패치에 대한 attention 맵 생성
    mask = result.squeeze(-1)[0, 1:]  # 첫 번째 배치의 첫 번째 토큰 제외
    print("5. Final mask size:", mask.size())
    width = int(mask.size(-1) ** 0.5)  # 이미지 크기 추정
    mask = mask.reshape(width, width).cpu().numpy()  # 2D 형태로 변환 후 numpy로 변환
    mask = mask / np.max(mask)  # 0~1로 정규화
    return mask


In [92]:
def show_mask_on_image(img, mask):
    """
    입력 이미지 위에 attention 맵(mask)을 겹쳐서 시각화하는 함수입니다.
    Args:
        img: 원본 이미지 (numpy 배열)
        mask: attention 맵 (numpy 배열, 0~1 범위로 정규화된 형태)
    Returns:
        원본 이미지 위에 attention 맵을 덧씌운 결과 이미지 (numpy 배열, uint8 형식)
    """
    # 원본 이미지를 0~1 사이로 정규화
    img = np.float32(img) / 255

    # attention 맵을 히트맵으로 변환
    heatmap = cv2.applyColorMap(np.uint8(255 * mask), cv2.COLORMAP_JET)  # mask를 0~255 사이로 스케일링 후 COLORMAP_JET 적용
    heatmap = np.float32(heatmap) / 255  # 0~1 사이로 정규화하여 원본 이미지와 결합 준비

    # 원본 이미지에 히트맵을 겹침
    cam = heatmap + np.float32(img)
    cam = cam / np.max(cam)  # 전체 픽셀 값의 최대값을 1로 정규화

    # 최종 이미지를 uint8 형식으로 변환하여 반환 (0~255 사이 값)
    return np.uint8(255 * cam)


In [93]:
def evaluate_cam(args, loader, model, device, num_classes, class_list, log_dir):
    """
    ViT 모델의 attention 맵을 생성하고 원본 이미지와 결합하여 시각화하는 함수입니다.
    
    Args:
        args: 설정 파라미터를 포함한 객체
        loader: 데이터 로더 객체
        model: Vision Transformer(ViT) 모델
        device: 모델과 데이터를 실행할 장치(CPU 또는 GPU)
        num_classes: 분류할 클래스의 수
        class_list: 클래스 이름 목록
        log_dir: 시각화 결과를 저장할 디렉토리 경로
    """
    model.eval()  # 모델을 평가 모드로 전환
    save_dir = log_dir + '/vit_attention'  # 결과 저장 경로 설정
    os.makedirs(save_dir, exist_ok=True)  # 저장 경로가 없으면 생성

    # ViT Attention Rollout 객체 생성
    vit_attention = VITAttentionRollout(model)

    # 데이터 로더를 통해 이미지와 라벨을 가져옴
    for iter_, (imgs, labels) in enumerate(iter(loader)):
        print(f"Iteration {iter_}: imgs shape = {imgs.shape}, labels shape = {labels.shape}")  # 이미지와 라벨 크기 출력

        imgs = imgs.to(device)  # 이미지를 장치로 이동
        labels = labels.to(device, dtype=torch.long)  # 라벨을 장치로 이동

        # Attention 히트맵 생성
        attention_mask = vit_attention(imgs)  # ViT 모델로부터 attention 맵 생성
        print(f"Attention mask shape: {attention_mask.shape}")  # Attention 마스크 크기 확인

        # attention 맵을 원본 이미지 크기로 리사이즈
        attention_mask = cv2.resize(attention_mask, (imgs.shape[-1], imgs.shape[-2]))

        # 이미지를 numpy 배열로 변환
        gt_imgs = fn_tonumpy(imgs)
        print(f"gt_imgs shape: {gt_imgs.shape}")  # 배열 크기 확인

        cam_imgs = gt_imgs[0].squeeze()  # 첫 번째 배치의 이미지 선택 및 차원 축소

        # attention 맵을 원본 이미지에 겹침
        attention_result = show_mask_on_image(cam_imgs, attention_mask)

        # 결합된 이미지 생성: 왼쪽에 원본 이미지, 오른쪽에 attention 맵이 적용된 이미지
        full_image = np.zeros((224, 448, 3))  # 빈 이미지 생성
        full_image[:224, :224, :] = cam_imgs * 255  # 왼쪽에 원본 이미지 추가
        full_image[:224, 224:448, :] = attention_result  # 오른쪽에 attention 이미지 추가

        # 결과 이미지를 파일로 저장
        img_path = os.path.join(save_dir, f'{iter_}_attention.jpg')
        cv2.imwrite(img_path, full_image)

        # 이미지 시각화
        plt.figure(figsize=(12, 6))
        plt.subplot(1, 2, 1)
        plt.imshow(cam_imgs, cmap='gray')
        plt.title('Original Image')
        plt.axis('off')

        plt.subplot(1, 2, 2)
        plt.imshow(attention_result[..., ::-1])  # 컬러 순서 변경하여 출력
        plt.title('Attention Image')
        plt.axis('off')

        # 실제 라벨을 문자열로 변환하여 출력
        actual_labels = labels[0].cpu().numpy()  # 첫 번째 이미지의 라벨을 가져옴
        actual_labels_str = ', '.join([class_list[i] for i in range(num_classes) if actual_labels[i] == 1])
        plt.suptitle(f'Actual Labels: {actual_labels_str}')
        plt.show()

        # 실제 라벨 정보 출력
        print("Actual Labels: ", actual_labels)


In [94]:
import os
import random
import pathlib
import torch
import pandas as pd
import json  # json 모듈 임포트
import numpy as np


def main():
    ##### GPU 메모리 정리
    # 이전에 사용된 GPU 메모리를 정리
    if torch.cuda.is_available():
        print("[INFO] Clearing GPU memory...")
        torch.cuda.empty_cache()
        torch.cuda.reset_peak_memory_stats()
        print("[INFO] GPU memory cleared.")

    ##### GPU 1번 지정
    # 특정 GPU를 사용하도록 설정 (GPU 1번)
    device = torch.device('cuda:1' if torch.cuda.is_available() else 'cpu')
    print(f"[INFO] Using device: {device}")

    ##### Initial Settings

    filename = '/211113_vit_b16_uni_224_64_1e-3'
    class Args:
        def __init__(self):
            self.num_class = 14
            self.backbone = 'vit'
            self.log_dir = './runs' + filename
            self.img_size = 224
            self.bits = 8
            self.seed = 50
            self.w = 4
            self.resume = True
            self.pretrained = './runs' + filename + '/best.pth.tar'

    args = Args()

    # 클래스 리스트 정의
    class_list = ['Atelectasis', 'Cardiomegaly', 'Consolidation', 'Edema', 
                   'Enlarged Cardiomediastinum', 'Fracture', 'Lung Lesion',
                   'Lung Opacity', 'No Finding', 'Pleural Effusion', 'Pleural Other',
                   'Pneumonia', 'Pneumothorax', 'Support Devices']
    device = 'cuda' if torch.cuda.is_available() else 'cpu'

    # 모델 선택 및 로드
    print('[*] build network... backbone: {}'.format(args.backbone))
    if args.backbone == 'vit':
        vit = models.vit_b_16(pretrained=True)
        head_in_features = vit.heads.head.in_features
        vit.heads.head = nn.Linear(head_in_features, 14)
        model = vit
    else:
        raise ValueError('Have to set the backbone network in [resnet, vgg, densenet]')

    model = model.to(device)

    if args.resume:
        checkpoint = torch.load(args.pretrained, map_location=device)
        pretrained_dict = checkpoint['state_dict']
        pretrained_dict = {key.replace("module.", ""): value for key, value in pretrained_dict.items()}
        model.load_state_dict(pretrained_dict)
        print("Load model completed")
    else:
        raise ValueError('Have to input a pretrained network path')

    # 데이터셋 및 데이터로더 설정
    test_datasets = DiseaseDataset('./json' + filename + '.json', 'test', args.img_size, args.bits, args)
    test_loader = torch.utils.data.DataLoader(test_datasets, batch_size=1,
                                              num_workers=args.w, pin_memory=True, drop_last=True)

    # 테스트 시작
    print('[*] start a test')
    evaluate_cam(args, test_loader, model, device, args.num_class, class_list, args.log_dir)


if __name__ == '__main__':
    main()


[INFO] Clearing GPU memory...
[INFO] GPU memory cleared.
[INFO] Using device: cuda:1
[*] build network... backbone: vit
Load model completed
[*] start a test
Iteration 0: imgs shape = torch.Size([1, 3, 224, 224]), labels shape = torch.Size([1, 14])
1. Initial result size: torch.Size([1, 768, 768])
2. attention_heads_fused size: torch.Size([1, 768])
3. a size: torch.Size([1, 768, 1])
Result before matmul: torch.Size([1, 768, 768])   a size: torch.Size([1, 768, 1])
4. result size after matmul: torch.Size([1, 768, 1])


RuntimeError: Unexpected result shape: torch.Size([1, 768, 1])