In [None]:
# 딥러닝 파이토치 교과서 5.3, 5.4, 5.5장 정리
https://www.notion.so/3-112e4b05256f80689ce5deff74e2499a?pvs=4

In [None]:
# 라이브러리 호출
import os
import time
import copy
import glob
import cv2
import shutil

import torch
import torchvision
import torchvision.transforms as transforms # 데이터 전처리를 위해 사용되는 패키지
import torchvision.models as models
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader

import matplotlib.pyplot as plt

In [None]:
# 이미지 데이터 전처리 방법 정의
data_path = '../chap05/data/catanddog/train'

transform = transforms.Compose(
    [
       transforms.Resize([256, 256]),
       transforms.RandomResizedCrop(224),
       transforms.RandomHorizontalFlip(),
       transforms.ToTensor() 
    ])
train_dataset = torchvision.datasets.ImageFolder(
                data_path,
                transform=transform
)
train_loader = torch.utils.data.DataLoader(
                train_dataset,
                batch_size=32,
                num_workers=8,
                shuffle=True
)

print(len(train_dataset))

In [None]:
# 학습에 사용될 이미지 출력
samples, labels = iter(train_loader).next()
classes = {0:'cat', 1:'dog'}
fig = plt.figure(figsize=(16,24))
for i in range(24):
    a = fig.add_subplot(4, 6, i+1)
    a.set_title(classes[labels[i].item()])
    a.axis('off')
    a.imshow(np.transpose(samples[i].numpy(), (1,2,0)))
plt.subplots_adjust(bottom=0.2, top=0.6, hspace=0)


In [None]:
# 사전 훈련된 모델 내려받기기
resnet18 = models.resnet18(pretrained=True) # pretrained=True는 사전 학습된 가중치를 사용하겠다는 의미

In [None]:
# 사전 훈련된 모델의 파라미터 학습 유무 지정
def set_parameter_requires_grad(model, feature_extracting=True):
    if feature_extracting:
        for param in model.parameters():
            param.requires_grad = False
set_parameter_requires_grad(renet18)

In [None]:
# ResNet18에 완전연결층 추가
resnet18.fc = nn.Linear(512, 2)

In [None]:
# 모델의 파라미터 값 할인
for name, param in resnet18.named_parameters(): # model.named_parameters()는 모델에 접근하여 파라미터 값들을 가져올 때 사용
    if param.requries_grad:
        print(name, param.data)

In [None]:
# 모델 객체 생성 및 손실 함수 정의
model = models.resnet18(pretrained=True) # 모델의 객체 생성

for param in model.parameters(): # 모델의 합성곱층 가중치 고정
    param.requires_grad = False

model.fc = torch.nn.Linear(512, 2)
for param in model.fc.parameters():
    param.requires_grad = True

optimizer = torch.optim.Adam(model.fc.parameters())
cost = torch.nn.CrossEntropyLoss() # 손실 함수 정의
print(model)

In [None]:
# 모델 학습을 위한 함수 생성
def train_model(model, dataloaders, criterion, optimizer, device, num_epochs=13, is_train=True):
    since = time.time()  # 컴퓨터의 현재 시간을 구하는 함수
    acc_history = []
    loss_history = []
    best_acc = 0.0

    for epoch in range(num_epochs):  # 에포크(전체 학습 데이터셋을 모두 학습하는 과정) 반복
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        running_loss = 0.0
        running_corrects = 0

        for inputs, labels in dataloaders:  # 데이터로더에 전달된 데이터만큼 반복
            inputs = inputs.to(device)
            labels = labels.to(device)

            model.to(device)
            optimizer.zero_grad()  # 기울기를 0으로 설정

            # 순전파 학습
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            _, preds = torch.max(outputs, 1)

            # 역전파 학습
            loss.backward()
            optimizer.step()

            # 출력 과정의 손실값과 예측 결과를 저장
            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels.data)

        epoch_loss = running_loss / len(dataloaders.dataset)  # 평균 손실 계산
        epoch_acc = running_corrects.double() / len(dataloaders.dataset)  # 평균 정확도 계산

        print('Loss: {:.4f} Acc: {:.4f}'.format(epoch_loss, epoch_acc))

        # 최고의 성능을 기록한 모델 저장
        if epoch_acc > best_acc:
            best_acc = epoch_acc

        acc_history.append(epoch_acc.item())
        loss_history.append(epoch_loss)
        torch.save(model.state_dict(), os.path.join('../chap05/data/catanddog/',
                    '{0:0=2d}.pth'.format(epoch))) # 모델 재사용을 위해 저장해 둠.
        print()

    time_elapsed = time.time() - since
    print('Training complete in {:0f}s'.format(time_elapsed // 60,
            time_elapsed % 60))
    print('Best Acc: {:4f}'.format(best_acc))
    return acc_history, loss_history # 모델의 정확도와 오차를 반환


In [None]:
# 파라미터 학습 결과를 옵티마이저에 전달
params_to_update = []
for name, param in resnet18.named_parameters():
    if param.requires_grad == True:
        params_to_update.append(param) # 파라미터 학습 결과를 저장
        print("/t", name)

optimizer = optim.Adam(params_to_update) # 학습 결과를 옵티마이저에 전달

In [None]:
# 모델 학습
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
criterion = nn.CrossEntropyLoss() # 손실 함수 지점
train_acc_hist, train_loss_hist = train_model(resnet18, train_loader, criterion,
                                              optimizer, device)

In [None]:
# 테스트 데이터 로딩 및 전처리
test_path = '../chap05/data/catanddog/test'

transform = transforms.Compose([
    transforms.Resize(224),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
])

test_dataset = torchvision.datasets.ImageFolder(
    root=test_path,
    transform=transform
)

test_loader = torch.utils.data.DataLoader(
    test_dataset,
    batch_size=32,
    num_workers=1,
    shuffle=True
)

print(len(test_dataset))


In [None]:
# 테스트 데이터 평가 함수 생성
def eval_model(model, dataloaders, device):
    since = time.time()
    acc_history = []
    best_acc = 0.0

    saved_models = glob.glob('../chap05/data/catanddog/' + '*.pth')  # 저장된 .pth 파일 목록을 정렬
    saved_models.sort()
    print('saved_models:', saved_models)

    for model_path in saved_models:
        print('Loading model', model_path)
        model.load_state_dict(torch.load(model_path))
        model.eval()
        model.to(device)
        running_corrects = 0

        for inputs, labels in dataloaders:  # 테스트 반복
            inputs = inputs.to(device)
            labels = labels.to(device)

            with torch.no_grad():  # autograd를 사용하지 않겠다는 의미
                outputs = model(inputs)  # 데이터로 모델에 전달한 결과를 outputs에 저장

            _, preds = torch.max(outputs.data, 1)  # torch.max를 통해 출력 값이 가장 큰 인덱스를 예측
            preds[preds >= 0.5] = 1  # torch.max로 출력 값이 0.5보다 크면 클라스로 예측
            preds[preds < 0.5] = 0  # torch.max로 출력 값이 0.5보다 작으면 클라스로 예측
            running_corrects += preds.eq(labels.cpu().int()).sum()  # 예측 결과와 실제 값을 비교하여 맞춘 개수를 계산

        epoch_acc = running_corrects.double() / len(dataloaders.dataset)  # 테스트 데이터의 정확도 계산
        print('Acc: {:.4f}'.format(epoch_acc))

        if epoch_acc > best_acc:
            best_acc = epoch_acc

        acc_history.append(epoch_acc.item())
    
    print()
    time_elapsed = time.time() - since
    print('Validation complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    print('Best Acc: {:.4f}'.format(best_acc))

    return acc_history  # 계산된 정확도 반환


In [None]:
# 테스트 데이터를 평가 함수에 적용
val_acc_hist = eval_model(resnet18, test_loader, device)

In [None]:
# 훈련과 테스트 데이터의 정확도를 그래프로 확인
plt.plot(train_acc_hist)
plt.plot(val_acc_hist)
plt.show()

In [None]:
# 훈련 데이터의 오차에 대한 그래프 확인
plt.plot(train_loss_hist)
plt.show()

In [None]:
# 예측 이미지 출력을 위한 전처리 함수
def im_convert(tensor):
    image = tensor.clone().detach().numpy()
    image = image.transpose(1, 2, 0)
    image = image * (np.array((0.5, 0.5, 0.5)) + np.array((0.5, 0.5, 0.5)))
    image = image.clip(0, 1)
    return image

In [None]:
# 개와 고양이 예측 결과 출력
classes = {0: 'cat', 1: 'dog'}  # 개와 고양이 두 개에 대한 레이블

dataiter = iter(test_loader)  # 테스트 데이터셋을 가져옴
images, labels = dataiter.next()  # 테스트 데이터에서 이미지와 레이블을 한 번에 가져옴
output = model(images)
_, preds = torch.max(output, 1)

fig = plt.figure(figsize=(25, 4))
for idx in np.arange(20):
    ax = fig.add_subplot(2, 10, idx+1, xticks=[], yticks=[])  # 2행 10열 형태로 서브플롯 생성
    plt.imshow(im_convert(images[idx]))  # 이미지를 출력 (im_convert 함수로 전처리)
    ax.set_title(f"{classes[preds[idx].item()]} ({classes[labels[idx].item()]})", 
                 color="green" if preds[idx] == labels[idx] else "red")

plt.show()
plt.subplots_adjust(bottom=0.2, top=0.6, hspace=0)


In [None]:
# 필요한 라이브러리 호출
import matplotlib.pyplot as plt
from PIL import Image
import cv2
import torch
import torch.nn.functional as F
import torch.nn as nn
from torchvision.transforms import ToTensor
import torchvision
import torchvision.transforms as transforms
import torchvision.models as models

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# 설명 가능한 네트워크 생성
class XAI(nn.Module):
    def __init__(self, num_classes=2):
        super(AttrNet, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1),

            nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),

            nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),

            nn.Conv2d(256, 512, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(512),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
        )
        self.classifier = nn.Sequential(
            nn.Dropout(),
            nn.Linear(512 * 7 * 7, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(),
            nn.Linear(4096, 4096),
            nn.ReLU(inplace=True),
            nn.Linear(4096, num_classes)
        )

    def forward(self, x):
        x = self.features(x)
        x = x.view(x.size(0), -1)
        x = self.classifier(x)
        return F.log_softmax(x, dim=1)


In [None]:
# 모델 객체화화
model = XAI() # model이라는 이름의 객체를 생성
model.to(device) # model을 장치(CPU 혹은 GPU)에 할당
model.eval() # 테스트 데이터에 대한 모델 평가 용도로 사용

In [None]:
# 특징 맵을 확인하기 위한 클래스 정의
class LayerActivations:
    features = []

    def __init__(self, model, layer_num):
        self.hook = model[layer_num].register_forward_hook(self.hook_fn)  # Hook 함수 등록

    def hook_fn(self, module, input, output):
        self.features = output.detach().numpy()  # 출력값을 Numpy 배열로 변환하여 저장

    def remove(self):  # Hook 제거
        self.hook.remove()


In [None]:
# 이미지 호출
img = cv2.imread('../chap05/data/cat.jpg')  # 이미지 파일을 읽어옴
plt.imshow(img)  # 이미지를 출력

# 이미지 크기 조정 및 Tensor 변환
img = cv2.resize(img, (100, 100), interpolation=cv2.INTER_LINEAR)  # 이미지를 100x100 크기로 리사이즈
img = ToTensor()(img).unsqueeze(0)  # 이미지를 Tensor로 변환하고 차원을 추가
print(img.shape)  # 변환된 이미지의 shape 출력

In [None]:
# (0): Conv2d 특성 맵 확인
result = LayerActivations(model.features, 0) # 0번째 Conv2d 특성 맵 확인

model(img)
activations = result.features

In [None]:
# 특성 맵 확인
fig, axes = plt.subplots(4, 4)
fig = plt.figure(figsize=(12, 8))
fig.subplots_adjust(left=0, right=1, bottom=0, top=1, hspace=0.05, wspace=0.05)
for row in range(4):
    for column in range(4):
        axis = axes[row][column]
        axis.get_xaxis().set_ticks([])
        axis.get_xaxis().set_ticks([])
        axis.imshow(activations[0][row*10+column])
plt.show()

In [None]:
# 20번째 계층에 대한 특성 맵
result = LayerActivations(model.features, 20) # 20번째 Conv2d 특성 맵 확인

model(img)
activations = result.features

In [None]:
# 특성 맵 확인
fig, axes = plt.subplots(4, 4)
fig = plt.figure(figsize=(12, 8))
fig.subplots_adjust(left=0, right=1, bottom=0, top=1, hspace=0.05, wspace=0.05)

for row in range(4):
    for column in range(4):
        axis = axes[row][column]
        axis.get_xaxis().set_ticks([])  # x축 눈금을 없앰
        axis.get_yaxis().set_ticks([])  # y축 눈금을 없앰
        axis.imshow(activations[0][row * 10 + column])  # 활성화 맵을 시각화

plt.show()

In [None]:
# 40번째 계층에 대한 특성 맵
result = LayerActivations(model.features, 40) # 40번째 Conv2d 특성 맵 확인

model(img)
activations = result.features

In [None]:
# 특성 맵 확인
fig, axes = plt.subplots(4, 4)
fig = plt.figure(figsize=(12,8))
fig.subplots_adjust(left=0, right=1, bottom=0, top=1, hspace=0.05, wspace=0.05)
for row in range(4):
    for column in range(4):
        axis = axes[row][column]
        axis.get_xaxis().set_ticks([])
        axis.get_yaxis().set_ticks([])  # y축 눈금을 없앰
        axis.imshow(activations[0][row * 10 + column])  # 활성화 맵을 시각화

plt.show()