<a href="https://colab.research.google.com/github/YejinS/SimCLR_prostateSet_eval/blob/main/SimCLR_prostateSet_eval_recent.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import sys
import numpy as np
import os
import yaml
import matplotlib.pyplot as plt
import torchvision

In [None]:
!pip install gdown

In [None]:
def get_file_id_by_model(folder_name):
  """Return the Google Drive file id for a named pretrained-model archive.

  Args:
    folder_name: archive name, e.g. 'resnet18_100-epochs_stl10'.

  Returns:
    The Drive file id string for a known archive; otherwise the literal
    string "Model not found." (no exception is raised, so callers have to
    check for that sentinel themselves).
  """
  drive_ids_by_archive = {
      'resnet18_100-epochs_stl10': '14_nH2FkyKbt61cieQDiSbBVNP8-gtwgF',
      'resnet18_100-epochs_cifar10': '1lc2aoVtrAetGn0PnTkOyFzPCIucOJq7C',
      'resnet50_50-epochs_stl10': '1ByTKAUsdm_X7tLcii6oAEl5qFRqRMZSu',
  }
  try:
    return drive_ids_by_archive[folder_name]
  except KeyError:
    return "Model not found."

In [None]:
# Choose which pretrained archive to fetch and resolve its Google Drive file id.
folder_name = 'resnet18_100-epochs_stl10'
# NOTE(review): for an unknown folder_name this is the string "Model not found.",
# which the download cell below would splice into the URL unchecked.
file_id = get_file_id_by_model(folder_name)
print(folder_name, file_id)

In [None]:
# download and extract model files
# The os.system return codes are ignored, so a failed download or unzip is not
# detected here; the directory listing below is the only feedback.
os.system('gdown https://drive.google.com/uc?id={}'.format(file_id))
# unzip is given the bare folder name (no extension) — presumably relying on
# unzip resolving the ".zip" suffix itself; confirm against the downloaded file name.
os.system('unzip {}'.format(folder_name))
!ls

In [None]:
from torch.utils.data import DataLoader, Dataset, TensorDataset
import torchvision.transforms as transforms
from torchvision import datasets

In [None]:
from torch.utils.tensorboard import SummaryWriter
# TensorBoard event writer used by the training loop. No log_dir is passed, so
# the library default location is used (the TensorBoard cell at the end of the
# notebook points at --logdir=runs).
writer = SummaryWriter()

In [None]:
from google.colab import drive
# Mount Google Drive; the dataset CSVs, images and checkpoints used below are
# read from paths under ./drive/MyDrive/ (relative paths — presumably the
# working directory is /content; confirm).
drive.mount('/content/drive')

In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU.
# `device` is reused for the model, the loss and every batch below.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print("Using device:", device)

## Custom Dataset (Prostate Dataset) 정의 & 불러오기

In [None]:
#Prostate Dataset 클래스 정의

import pandas as pd
from skimage import io
import cv2
from PIL import Image

class prostate_dataset(Dataset):
    """Map-style dataset over a CSV index of prostate images.

    The CSV is read with ``pd.read_csv``; column 0 holds the image path
    (joined onto ``root``) and column 1 holds the label. Each item is an
    ``(image, label)`` pair where the image read from disk gets a leading
    channel axis that is repeated three times.

    Attributes:
        train_image, valid_image, test_image, train_label, valid_label,
        test_label: all refer to the same DataFrame parsed from ``csv_file``
            (kept under these names for backward compatibility).
        root: directory prefix for the image paths in column 0.
        download: stored as given; not used by this class.
        split: one of "train", "valid", "test".
        transform: stored as given; ``__getitem__`` does NOT apply it.
        shape: ``(len(dataset), *shape_of_first_image)``.
    """

    # Split names accepted by the constructor.
    _SPLITS = ("train", "valid", "test")

    def __init__(self, csv_file, root, download, split, transform=None):
        """Parse the CSV index and record the dataset shape.

        Args:
            csv_file: path to the CSV index (image path in column 0, label in column 1).
            root: directory the image paths are relative to.
            download: kept for signature parity with torchvision datasets; unused.
            split: "train", "valid" or "test".
            transform: stored on the instance but not applied to items.

        Raises:
            ValueError: if ``split`` is not one of the accepted names.
        """
        # Fail early with a clear message instead of the UnboundLocalError the
        # per-split branches used to produce for an unknown split.
        if split not in self._SPLITS:
            raise ValueError(
                "split must be one of {}, got {!r}".format(self._SPLITS, split))

        # The same file backs every attribute, so parse it once and share it
        # rather than reading it six times.
        table = pd.read_csv(csv_file)
        self.train_image = self.valid_image = self.test_image = table
        self.train_label = self.valid_label = self.test_label = table

        self.root = root
        self.download = download
        self.split = split
        self.transform = transform
        # Reads the first image from disk to learn the per-item shape.
        self.shape = self.__getshape__()

    def _tables_for_split(self):
        """Return the (image_table, label_table) pair for the active split."""
        if self.split not in self._SPLITS:
            raise ValueError(
                "split must be one of {}, got {!r}".format(self._SPLITS, self.split))
        return (getattr(self, self.split + "_image"),
                getattr(self, self.split + "_label"))

    def __len__(self):
        """Number of rows in the CSV index for the active split."""
        image_table, _ = self._tables_for_split()
        return len(image_table)

    def __getitem__(self, idx):
        """Load one sample.

        Args:
            idx: row position in the CSV; a tensor index is converted with ``tolist()``.

        Returns:
            ``(image, label)``: the image as a tensor with a new leading axis
            repeated 3 times, and the raw value from CSV column 1.
        """
        if torch.is_tensor(idx):
            idx = idx.tolist()

        image_table, label_table = self._tables_for_split()
        img_name = os.path.join(self.root, image_table.iloc[idx, 0])
        image = torch.tensor(io.imread(img_name))
        # unsqueeze + repeat(3, 1, 1) only works for a single-channel (2-D)
        # image; it is replicated into 3 identical channels.
        image = torch.unsqueeze(image, 0).repeat(3, 1, 1)
        label = label_table.iloc[idx, 1]

        # NOTE: self.transform is deliberately not applied (unchanged from the
        # previous behaviour); the image keeps the dtype produced by io.imread.
        return image, label

    def __getshape__(self):
        """Return ``(number_of_samples, *shape_of_sample_0)``."""
        return (self.__len__(), *self.__getitem__(0)[0].shape)

In [None]:
#Data Loader 정의
def get_stl10_data_loaders(download, shuffle=False, batch_size=256):
  """Create DataLoaders for the STL10 'train' and 'test' splits.

  Args:
    download: forwarded to ``datasets.STL10`` (data lives under './data').
    shuffle: forwarded to both loaders.
    batch_size: train batch size; the test loader uses twice this value.

  Returns:
    ``(train_loader, test_loader)``. The train loader runs with 0 workers,
    the test loader with 10.
  """
  def _stl10(split_name):
    # Images are converted with ToTensor(); no other augmentation is applied.
    return datasets.STL10('./data', split=split_name, download=download,
                          transform=transforms.ToTensor())

  train_loader = DataLoader(_stl10('train'), batch_size=batch_size,
                            shuffle=shuffle, num_workers=0, drop_last=False)
  test_loader = DataLoader(_stl10('test'), batch_size=2 * batch_size,
                           shuffle=shuffle, num_workers=10, drop_last=False)
  return train_loader, test_loader

def get_cifar10_data_loaders(download, shuffle=False, batch_size=256):
  """Create DataLoaders for the CIFAR10 train and test sets.

  Args:
    download: forwarded to ``datasets.CIFAR10`` (data lives under './data').
    shuffle: forwarded to both loaders.
    batch_size: train batch size; the test loader uses twice this value.

  Returns:
    ``(train_loader, test_loader)``. The train loader runs with 0 workers,
    the test loader with 10.
  """
  def _cifar10(is_train):
    # Images are converted with ToTensor(); no other augmentation is applied.
    return datasets.CIFAR10('./data', train=is_train, download=download,
                            transform=transforms.ToTensor())

  train_loader = DataLoader(_cifar10(True), batch_size=batch_size,
                            shuffle=shuffle, num_workers=0, drop_last=False)
  # TODO (from the original author): double-check the 2*batch_size choice.
  test_loader = DataLoader(_cifar10(False), batch_size=2 * batch_size,
                           shuffle=shuffle, num_workers=10, drop_last=False)
  return train_loader, test_loader

# What the (tutorial) video calls `trainset` is `train_dataset` here.
def get_prostate_data_loaders(download, shuffle=False, batch_size=16):
  """Create train/valid/test DataLoaders for the prostate dataset on Drive.

  Args:
    download: forwarded to ``prostate_dataset``.
    shuffle: forwarded to all three loaders.
    batch_size: batch size for the train and valid loaders; the test loader
      always uses a batch size of 1.

  Returns:
    ``(train_loader, valid_loader, test_loader)``, all with 0 workers and
    ``drop_last=False``.
  """
  image_root = './drive/MyDrive/ColabNotebooks/data/datasets/'

  def _make_loader(csv_path, split_name, loader_batch_size):
    # One dataset per split, each backed by its own CSV index.
    split_dataset = prostate_dataset(csv_path, image_root, download=download,
                                     split=split_name, transform=transforms.ToTensor())
    return DataLoader(split_dataset, batch_size=loader_batch_size, num_workers=0,
                      drop_last=False, shuffle=shuffle)

  train_loader = _make_loader('./drive/MyDrive/ColabNotebooks/data/prostate_trainSet.csv',
                              'train', batch_size)
  valid_loader = _make_loader('./drive/MyDrive/ColabNotebooks/data/prostate_valSet.csv',
                              'valid', batch_size)
  test_loader = _make_loader('./drive/MyDrive/ColabNotebooks/data/prostate_testSet.csv',
                             'test', 1)

  return train_loader, valid_loader, test_loader

In [None]:
# Load the run configuration from ./config.yml (presumably extracted from the
# model archive downloaded above — confirm).
# NOTE(review): yaml.Loader can construct arbitrary Python objects, so only load
# config files from a trusted source. Later cells read config.arch and
# config.dataset_name as attributes, so the file is expected to deserialize to
# an object rather than a plain dict, which a safe loader would not produce.
with open(os.path.join('./config.yml')) as file:
  config = yaml.load(file, Loader=yaml.Loader)

In [None]:
# Instantiate the backbone named in the config, without torchvision's
# pretrained weights, and move it to the selected device.
# NOTE(review): the two branches use different head sizes (2 vs 10 classes) —
# confirm that 10 is intended for resnet50.
# NOTE(review): any other config.arch value leaves `model` undefined.
if config.arch == 'resnet18':
  model = torchvision.models.resnet18(pretrained=False, num_classes=2).to(device)
elif config.arch == 'resnet50':
  model = torchvision.models.resnet50(pretrained=False, num_classes=10).to(device)

In [None]:
#checkpoint = torch.load('./drive/MyDrive/ColabNotebooks/checkpoint_0750_stl10+promiseX.pth.tar', map_location=device)
#checkpoint = torch.load('./drive/MyDrive/ColabNotebooks/checkpoint_0400.pth.tar', map_location=device)
#checkpoint = torch.load('./drive/MyDrive/ColabNotebooks/epoch500ES338lr0001wd0004batch8_T2W.pth.tar', map_location=device)
# Load the selected checkpoint from Drive onto the active device.
checkpoint = torch.load('./drive/MyDrive/ColabNotebooks/epoch550ES550lr0001wd0004batch8_DWIBEST.pth.tar', map_location=device)

state_dict = checkpoint['state_dict']

# Rewrite the keys in place so they can be loaded into `model`:
#   * 'backbone.<name>' (except 'backbone.fc*') is copied to '<name>'
#   * every original key is then deleted, including 'backbone.fc*' and any key
#     that does not start with 'backbone.'
# Iterating over a list() snapshot makes it safe to add/delete keys in the loop.
for k in list(state_dict.keys()): 
  #print(k)
  if k.startswith('backbone.'):
    if k.startswith('backbone') and not k.startswith('backbone.fc'):
      # remove prefix ex. backbone.layer -> layer
      state_dict[k[len("backbone."):]] = state_dict[k] #copy the parameter tensor stored under e.g. backbone.layer3.0.bn2.bias to a new key without the "backbone." prefix (layer3.0.bn2.bias); the original prefixed key is removed below
  del state_dict[k] #after this cell the keys no longer carry the "backbone" prefix and the fc layer entries are gone

In [None]:
# strict=False lets the load succeed even though the fc head has no entry in
# state_dict; the assert then verifies the fc weight/bias are the ONLY
# parameters the checkpoint did not provide.
log = model.load_state_dict(state_dict, strict=False) #load the renamed keys created above (no "backbone." prefix, no fc layer)
assert log.missing_keys == ['fc.weight', 'fc.bias'] 

In [None]:
# Build the data loaders for the dataset named in the config.
# NOTE(review): only the 'prostate' branch defines valid_loader, which the
# training loop below iterates over; the cifar10/stl10 branches would fail there.
if config.dataset_name == 'cifar10':
  train_loader, test_loader = get_cifar10_data_loaders(download=True)
elif config.dataset_name == 'stl10':
  train_loader, test_loader = get_stl10_data_loaders(download=True)
elif config.dataset_name == 'prostate':
  train_loader, valid_loader, test_loader = get_prostate_data_loaders(download=False)
print("Dataset:", config.dataset_name)

Dataset: prostate


## 네트워크 & 모델 정의

In [None]:
# # freeze all layers but the last fc
# for name, param in model.named_parameters():
#     #print(name)
#     if name not in ['fc.weight', 'fc.bias']:
#         param.requires_grad = False 

# Full fine-tuning: every parameter (backbone and fc head) is left trainable,
# instead of the linear-probe setup commented out above.
for name, param in model.named_parameters():
      param.requires_grad = True #with requires_grad = True, gradients are tracked for this tensor.
      #print(name)

parameters = list(filter(lambda p: p.requires_grad, model.parameters()))
# NOTE(review): 62 is hard-coded — presumably the number of parameter tensors in
# resnet18; another config.arch would trip this assert. Confirm.
assert len(parameters) == 62  #sanity check on the number of parameter tensors with requires_grad = True

In [None]:
# Adam over all model parameters (learning rate 5e-5, weight decay 0.1) and a
# cross-entropy loss on the raw logits, placed on the same device as the model.
optimizer = torch.optim.Adam(model.parameters(), lr=0.00005, weight_decay=0.1)
criterion = torch.nn.CrossEntropyLoss().to(device)

## 정확도 정의

In [None]:
import torch.nn as nn

def accuracy(output, target, topk=(1,)):
    """Compute top-k accuracy (in percent) for each k in ``topk``.

    Args:
        output: score/logit tensor indexed as (sample, class).
        target: tensor of true class indices, one per sample.
        topk: iterable of k values to evaluate.

    Returns:
        A list with one 1-element tensor per k: the percentage of samples
        whose target is among the k highest-scoring classes.

    Side effects:
        Stores the top-``max(topk)`` scores on ``accuracy._`` and the matching
        class indices on ``accuracy.prediction`` so callers can read the
        predictions (and their confidence values) of the most recent call.
    """
    with torch.no_grad():
        largest_k = max(topk)
        num_samples = target.size(0)

        # Highest scores per sample, sorted in descending order along the class axis.
        top_scores, top_classes = output.topk(largest_k, dim=1, largest=True, sorted=True)
        accuracy._ = top_scores
        accuracy.prediction = top_classes

        # Transpose to (k, sample) so that row j holds every sample's j-th guess,
        # e.g. [[1,2,3],[4,5,6]] -> [[1,4],[2,5],[3,6]].
        ranked = top_classes.t()
        hits = ranked.eq(target.view(1, -1).expand_as(ranked))

        percent_per_hit = 100.0 / num_samples
        return [
            hits[:k].reshape(-1).float().sum(0, keepdim=True).mul_(percent_per_hit)
            for k in topk
        ]

## train & valid & test

In [None]:
# Fine-tune for a fixed number of epochs; after each epoch evaluate on the
# validation loader and log per-epoch loss / top-1 accuracy to TensorBoard.
epochs = 150
for epoch in range(epochs):
  # Per-epoch accumulators (reset every epoch).
  top1_train_accuracy = 0
  train_loss =0
  valid_loss =0
  model.train()
  for counter, (x_batch, y_batch) in enumerate(train_loader):
    # Cast to float before the forward pass (the prostate dataset returns the
    # image tensor as read from disk, without any dtype conversion).
    x_batch = x_batch.float()
    x_batch = x_batch.to(device)
    y_batch = y_batch.to(device)

    logits = model(x_batch)
    loss = criterion(logits, y_batch)
    train_loss +=loss.item()
    top1 = accuracy(logits, y_batch, topk=(1,))
    # writer.add_scalar("loss/train", loss, epoch)
    # writer.add_scalar("top1/train", top1[0], epoch)
    top1_train_accuracy += top1[0]

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

  # Mean of the per-batch accuracies: every batch counts equally, so a smaller
  # final batch is weighted the same as a full one.
  top1_train_accuracy /= (counter + 1)
  top1_valid_accuracy = 0

  # evaluate model:
  model.eval()
  with torch.no_grad():
    for counter, (x_batch, y_batch) in enumerate(valid_loader):
      x_batch = x_batch.float()
      x_batch = x_batch.to(device)
      y_batch = y_batch.to(device)

      logits = model(x_batch)
      loss = criterion(logits, y_batch)
      valid_loss +=loss.item()
      top1 = accuracy(logits, y_batch, topk=(1,))
      # writer.add_scalar("loss/valid", loss, epoch)
      # writer.add_scalar("top1/valid", top1[0], epoch)
      top1_valid_accuracy += top1[0]
  
    # `counter` now refers to the validation loop, so this is the mean over
    # validation batches.
    top1_valid_accuracy /= (counter + 1)

  # Mean per-batch loss for the epoch.
  train_loss = train_loss/(len(train_loader))
  valid_loss = valid_loss/(len(valid_loader))

  # writer.add_scalar("top1", {'train_acc':top1_train_accuracy.item(), 'val_acc':top1_valid_accuracy.item()}, epoch)
  # writer.add_scalar("loss", {'train_loss':train_loss, 'val_loss':valid_loss}, epoch)
  # One scalar per metric per epoch; the accuracies are 1-element tensors,
  # hence .item().
  writer.add_scalar("top1/train", top1_train_accuracy.item(), epoch)
  writer.add_scalar("top1/valid", top1_valid_accuracy.item(), epoch)
  writer.add_scalar("loss/train", train_loss, epoch)
  writer.add_scalar("loss/valid", valid_loss, epoch)
  print(f"Epoch {epoch}\tTop1 Train accuracy {top1_train_accuracy.item()}\tTop1 Valid accuracy: {top1_valid_accuracy.item()}")

In [None]:
# Evaluate the model on the test loader and collect, per sample, the true
# label, the predicted label and the softmax class probabilities.
y_true = []
y_pred = []
probability=[]

top1_test_accuracy = 0
# Set inference mode explicitly so this cell does not depend on the training
# cell having ended in eval mode, and skip autograd bookkeeping.
model.eval()
with torch.no_grad():
  for counter, (x_batch, y_batch) in enumerate(test_loader):
    x_batch = x_batch.float()
    x_batch = x_batch.to(device)
    y_batch = y_batch.to(device)

    y_true.extend(y_batch.cpu().numpy()) #the .numpy() method is pretty much straightforward. It converts a tensor object into an numpy.ndarray object

    logits = model(x_batch)

    test_top1 = accuracy(logits, y_batch, topk=(1,))
    # accuracy() leaves the predicted class indices of its latest call on
    # accuracy.prediction.
    y_pred.extend(accuracy.prediction.cpu().numpy().tolist())
    softmax = nn.functional.softmax(logits, dim=1) #apply softmax to turn logits into class probabilities
    probability.extend(softmax.cpu().detach().numpy())

    top1_test_accuracy += test_top1[0]

# Flatten once after the loop (previously rebuilt from the full lists on every
# batch, which was quadratic in the number of test samples).
y_pred2 = np.array(y_pred).flatten().tolist()
probability2 = np.array(probability).flatten()

# Mean of the per-batch accuracies.
top1_test_accuracy /= (counter + 1)

print(f"Total \tTop1 Test accuracy: {top1_test_accuracy.item()}")

In [None]:
# Raw test results: true labels, predicted labels, and the flattened per-class
# softmax probabilities rounded to two decimals.
print(y_true)
print(y_pred2)
print(probability2.round(2))

In [None]:
from sklearn.metrics import confusion_matrix
# scikit-learn convention: rows index the true label, columns the predicted
# label, i.e. cf_matrix[i, j] counts samples of true class i predicted as j.
cf_matrix = confusion_matrix(y_true, y_pred2)

In [None]:
# Row/column captions for the confusion matrix.
class_names = ('0','1') #label0==g1 label1==g2

# Create pandas dataframe
# (index = rows of cf_matrix, columns = columns of cf_matrix, both labelled with class_names)
dataframe = pd.DataFrame(cf_matrix, index=class_names, columns=class_names)
dataframe

In [None]:
import seaborn as sns
# Draw the confusion matrix as an annotated heatmap (integer counts, no colour bar).
matrixGraph = sns.heatmap(
    dataframe,
    annot=True,
    fmt="d",
    cmap="YlGnBu",
    cbar=None,
)
# Move the x tick labels and the x axis label to the top edge.
matrixGraph.xaxis.tick_top()
matrixGraph.xaxis.set_label_position('top')
# Reverse the direction of both axes.
matrixGraph.invert_yaxis()
matrixGraph.invert_xaxis()

plt.title("Confusion Matrix")
plt.tight_layout()
plt.ylabel("True Class")
plt.xlabel("Predicted Class")
plt.show()

In [None]:
# Unpack the 2x2 confusion matrix with class 1 as the positive class.
# scikit-learn's confusion_matrix is indexed [true label, predicted label], so
# the off-diagonal cells are:
#   cf_matrix[0,1] -> true 0 predicted 1 -> false positive
#   cf_matrix[1,0] -> true 1 predicted 0 -> false negative
# (these two were previously swapped, which corrupted every derived metric).
TP = cf_matrix[1,1] # true positives:  true 1, predicted 1
TN = cf_matrix[0,0] # true negatives:  true 0, predicted 0
FP = cf_matrix[0,1] # false positives: true 0, predicted 1
FN = cf_matrix[1,0] # false negatives: true 1, predicted 0

# Totals of actual positives / actual negatives.
P=TP+FN
N=TN+FP

TP, TN, FP, FN

(4, 23, 24, 11)

In [None]:
# Binary-classification metrics derived from the confusion-matrix counts.
# A zero denominator (e.g. no positive predictions) is not guarded against.
sensitivity = TP/(TP+FN)  # true-positive rate (recall)
specificity = TN/(FP+TN)  # true-negative rate
ppv = TP/(TP+FP)          # positive predictive value (precision)
npv = TN/(TN+FN)          # negative predictive value

In [None]:
# Print the test accuracy and the four metrics as percentages, each followed by
# a comma (presumably so the lines can be pasted into a table — confirm).
print(round(top1_test_accuracy.item(),2),", ,") 
print(sensitivity.round(4)*100,",")
print(specificity.round(4)*100,",")
print(ppv.round(4)*100,",")
print(npv.round(4)*100)

In [None]:
#print(sensitivity.round(4)*100,",  ",specificity.round(4)*100,",  ",ppv.round(4)*100,", ",npv.round(4)*100)

In [None]:
writer.close()

## 학습 결과 나타내기(Tensorboard)

In [None]:
%reload_ext tensorboard
%tensorboard --logdir=runs

In [None]:
#!pwd

In [None]:
#!rm -r ./runs