[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/babbu3682/Med_ChatGPT_tutorial/blob/main/Notebook/template/Template_Test.ipynb)

In [1]:
# Change the notebook's working directory (IPython automagic `cd`).
# NOTE(review): absolute, machine-specific path — presumably the local checkout of
# this tutorial repository; adjust it when running on another machine.
cd /workspace/sunggu/7.Mentor/Med_ChatGPT_tutorial

/workspace/sunggu/7.Mentor/Med_ChatGPT_tutorial


In [2]:
!nvidia-smi
import os
os.environ["CUDA_DEVICE_ORDER"]     =  'PCI_BUS_ID'
os.environ["CUDA_VISIBLE_DEVICES"]  =  '0'
print("CPU 갯수 = ", os.cpu_count())

Tue Jul  4 10:49:38 2023       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 470.182.03   Driver Version: 470.182.03   CUDA Version: 11.4     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla P40           Off  | 00000000:1B:00.0 Off |                    0 |
| N/A   34C    P8    10W / 250W |      2MiB / 22919MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
|   1  Tesla P40           Off  | 00000000:1C:00.0 Off |                    0 |
| N/A   38C    P8    12W / 250W |      2MiB / 22919MiB |      0%      Default |
|       

In [None]:
# !pip install torch==1.13.1+cu117 torchvision==0.14.1+cu117 torchaudio==0.13.1 --extra-index-url https://download.pytorch.org/whl/cu117
# !pip freeze > /workspace/sunggu/0.Challenge/requirements.txt
!pip install -r requirements.txt

# 1. Fix Seed

In [3]:
import random
import numpy as np
import torch

# Seed value shared by every random number generator seeded below
seed = 42

# Apply the same seed, in order, to Python's random module, NumPy and the
# PyTorch seeding functions (manual_seed, cuda.manual_seed, cuda.manual_seed_all).
for _seed_fn in (
    random.seed,
    np.random.seed,
    torch.manual_seed,
    torch.cuda.manual_seed,
    torch.cuda.manual_seed_all,
):
    _seed_fn(seed)

# cuDNN flags: benchmark mode off, deterministic mode on
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

# 2. Log 조사

In [None]:
'''
[Log analysis]
I want you to act as an AI developer who writes PyTorch and Python code for me.
Please help me implement code that reads data from a log file and visualizes it in a graph.

Please conduct step by step using the following procedure:
	Step 1: Import the necessary libraries and modules: glob, numpy, matplotlib.pyplot.
	Step 2: Define a read_log function to read a log file. This function takes a file path as an argument, reads the log file, and returns it as a list.
	Step 3: Read the log file using the read_log function. Specify the path as '/workspace/sunggu/0.Challenge/Med_tutorial_ChatGPT/checkpoints/230615_ResNet50_L2_Reg/log.txt'. The read log file is stored in the log_list variable.
	Step 4: Create a dictionary, result_dict, to store the results. Initialize result_dict with the keys from the first dictionary in log_list.
	Step 5: Extract the value corresponding to each key in result_dict from log_list and save it as a list.
	Step 6: Generate a graph. To visualize the results, use matplotlib.pyplot's subplots function. The number of graphs is equal to the number of keys in result_dict, and they are laid out in a vertical orientation. Set the size of the graph via figsize.
	Step 7: Generate a graph for each key in result_dict through a for loop. axs[idx] indicates the subplot to plot at the position corresponding to idx.
	Step 8: Plot the data on the generated graphs. result_dict[key] takes the data corresponding to key and plots the graph. axs[idx].set_title(key) sets the title of the graph to key.
	Step 9: Display the graph. After adjusting the spacing of the graph with plt.tight_layout(), call plt.show() to output the graph.
'''

In [4]:
from transformers import ViTForImageClassification
import torch.nn as nn

# Load a pretrained ViT image classifier by its checkpoint name.
# NOTE(review): `model` is rebound to a ResNet50 instance in section 4, so this
# ViT instance looks unused by the rest of the notebook — confirm it is needed.
vit_checkpoint = "google/vit-base-patch16-224"
model = ViTForImageClassification.from_pretrained(vit_checkpoint)

# 3. Dataset

In [None]:
import os
import re
import cv2
import skimage
import pydicom
import pandas as pd
import albumentations as A
from pydicom.pixel_data_handlers.util import apply_modality_lut, apply_voi_lut
from albumentations.pytorch import ToTensorV2
from torch.utils.data import Dataset

def list_sort_nicely(l):
    """Return the strings in ``l`` sorted in natural (human) order.

    Each string is split into alternating text and digit runs; digit runs are
    compared as integers, so ``"img2"`` sorts before ``"img10"``.
    """
    digit_runs = re.compile('([0-9]+)')

    def natural_key(name):
        # Digit-only pieces become ints, everything else stays a string.
        return [int(piece) if piece.isdigit() else piece for piece in digit_runs.split(name)]

    return sorted(l, key=natural_key)

def fixed_clahe(image, **kwargs):
    """Apply CLAHE with fixed settings (clipLimit=2.0, 8x8 tile grid) to ``image``.

    Extra keyword arguments are accepted and ignored.
    """
    return cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8)).apply(image)

def change_to_uint8(image, **kwargs):
    """Return ``image`` converted with ``skimage.util.img_as_ubyte``.

    Extra keyword arguments are accepted and ignored.
    """
    as_ubyte = skimage.util.img_as_ubyte(image)
    return as_ubyte

def change_to_float32(image, **kwargs):
    """Return ``image`` converted with ``skimage.util.img_as_float32``.

    Extra keyword arguments are accepted and ignored.
    """
    as_float32 = skimage.util.img_as_float32(image)
    return as_float32

def min_max_normalization(image, **kwargs):
    """Rescale ``image`` to the [0, 1] range as a float32 array.

    An image containing a single distinct value is returned unchanged
    (same object, same dtype). Extra keyword arguments are ignored.
    """
    # Constant image: nothing to rescale, hand it back untouched.
    if len(np.unique(image)) == 1:
        return image

    as_float = image.astype('float32')
    shifted = as_float - as_float.min()
    return shifted / shifted.max()

def get_transforms():
    """Build the albumentations pipeline applied to every image.

    Steps, in order: resize to 224x224, min-max normalization, conversion to
    uint8, fixed CLAHE, conversion to float32, a second min-max normalization,
    ``A.Normalize`` with mean 0.5 / std 0.5 (max_pixel_value=1.0), and
    ``ToTensorV2``.
    """
    # Preprocessing: resize first, then the per-image lambdas in this exact order.
    preprocessing_fns = (
        min_max_normalization,
        change_to_uint8,
        fixed_clahe,
        change_to_float32,
    )
    preprocessing = [A.Resize(224, 224)]
    preprocessing += [A.Lambda(image=fn, always_apply=True) for fn in preprocessing_fns]

    # Normalization and tensor conversion
    normalization = [
        A.Lambda(image=min_max_normalization, always_apply=True),
        A.Normalize(max_pixel_value=1.0, mean=0.5, std=0.5),
        ToTensorV2(),
    ]

    return A.Compose(preprocessing + normalization)


class RSNA_Dataset(Dataset):
    """Dataset of DICOM images and ``cancer`` labels listed in a CSV file.

    The CSV must provide the columns ``mode``, ``path`` and ``cancer``; only
    the rows whose ``mode`` equals the requested split are kept.

    Args:
        mode: value of the ``mode`` column selecting the split (default "test").
        csv_path: CSV file to read. ``None`` (default) uses ``DEFAULT_CSV``,
            the location that was previously hard-coded.
        transforms: callable invoked as ``transforms(image=array)`` that returns
            a mapping with an ``'image'`` entry. ``None`` (default) uses
            ``get_transforms()``.
    """

    # Machine-specific default kept for backward compatibility; pass
    # ``csv_path`` to read the split table from another location.
    DEFAULT_CSV = '/workspace/sunggu/0.Challenge/Med_tutorial_ChatGPT/dataset/rsna_data.csv'

    def __init__(self, mode="test", csv_path=None, transforms=None):
        self.root       = csv_path if csv_path is not None else self.DEFAULT_CSV
        temp_df         = pd.read_csv(self.root)
        self.data_df    = temp_df[temp_df['mode'] == mode]
        self.transforms = transforms if transforms is not None else get_transforms()
        print(f"len of data: {len(self.data_df)}")

    def __len__(self):
        """Number of rows in the selected split."""
        return len(self.data_df)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the row at position ``idx``.

        ``label`` is a float tensor of shape (1,) built from the ``cancer`` column.
        """
        # Positional lookup (.iloc) because the filtered frame keeps its original index.
        img_path = self.data_df['path'].iloc[idx]
        dcm_data = pydicom.dcmread(img_path)

        # Apply the modality LUT first, then the VOI LUT, to the raw pixel array.
        temp_img = apply_modality_lut(dcm_data.pixel_array, dcm_data)
        image    = apply_voi_lut(temp_img, dcm_data)

        label    = self.data_df['cancer'].iloc[idx]
        label    = torch.tensor(label).float().unsqueeze(0)

        # add channel axis
        image    = np.expand_dims(image, axis=-1)
        image    = self.transforms(image=image)['image']

        return image, label
    

In [None]:
from torch.utils.data import DataLoader

# 1. Dataset restricted to the rows of the "test" split
test_dataset = RSNA_Dataset(mode="test")

# 2. DataLoader over that dataset: one sample per batch, fixed order, 4 worker processes
test_loader = DataLoader(
    dataset=test_dataset,
    batch_size=1,
    shuffle=False,
    num_workers=4,
)

# 4. Network

In [None]:
import torch.nn as nn
import torchvision

class ResNet50(nn.Module):
    """torchvision ResNet-50 with a 1-channel first convolution and a single-output head.

    Args:
        pretrained: forwarded to ``torchvision.models.resnet50``. The replaced
            ``conv1`` and ``fc`` layers are freshly constructed either way.
    """

    def __init__(self, pretrained=True):
        super().__init__()
        backbone = torchvision.models.resnet50(pretrained=pretrained)
        # Swap the stem convolution for one that accepts 1 input channel.
        backbone.conv1 = nn.Conv2d(
            in_channels=1, out_channels=64, kernel_size=7, stride=2, padding=3, bias=False
        )
        # Swap the classification head for a single output unit.
        backbone.fc = nn.Linear(in_features=2048, out_features=1)
        self.model = backbone

    def forward(self, x):
        """Run ``x`` through the wrapped network and return its output."""
        output = self.model(x)
        return output


In [None]:
def count_parameters(model):
    """Return the total number of elements in ``model``'s parameters that require gradients."""
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total

# Instantiate the network (this rebinds `model`) and report its trainable parameter count.
model = ResNet50()
num_params = count_parameters(model=model)
print("Number of learnable parameters: ", num_params)

# 5. Using the GPU for testing

In [None]:
# Pick the CUDA device when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# Move the model's parameters and buffers to the selected device.
model = model.to(device)

# 6. Loss

In [None]:
import torch
import torch.nn as nn

# Binary cross-entropy computed on raw logits; 'mean' is the default reduction, spelled out here.
criterion = nn.BCEWithLogitsLoss(reduction='mean')

# 7. Check the resume point

In [None]:
# Directory holding the trained checkpoints, and directory receiving this run's predictions
checkpoint_path = '/workspace/sunggu/0.Challenge/Med_tutorial_ChatGPT/checkpoints/230615_ResNet50_L2_Reg'
save_dir        = '/workspace/sunggu/0.Challenge/Med_tutorial_ChatGPT/predictions/230615_ResNet50_L2_Reg'

# Create both folders when they do not exist yet
for _directory in (checkpoint_path, save_dir):
    os.makedirs(_directory, exist_ok=True)

In [None]:
# Resume
# Metric  =  valid_F1
# Argsort =  [ 7 10 14 12  6]
# Value   =  [0.6940299 0.6783217 0.6756757 0.6748971 0.6641791]
# NOTE(review): presumably these are the top-5 epochs ranked by validation F1,
# which is why the epoch-7 checkpoint is the one loaded below — confirm.

filename = 'epoch_7_checkpoint.pth'
print("=> loading checkpoint '{}'".format(filename))

# map_location remaps the stored tensors onto the device selected earlier, so a
# checkpoint saved on a different GPU index (or on GPU, when running on CPU)
# still loads instead of failing on a device that is not available here.
checkpoint  = torch.load(os.path.join(checkpoint_path, filename), map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])

# 8. Metric

In [None]:
import numpy as np
from sklearn.metrics import roc_auc_score, accuracy_score, f1_score, recall_score, confusion_matrix, precision_score

def calculate_metrics(predictions, targets):
    """Compute binary-classification metrics from scores and ground-truth labels.

    Args:
        predictions: array-like of scores; they are passed as-is to
            ``roc_auc_score`` and rounded with ``np.round`` for the
            threshold-based metrics.
        targets: array-like of ground-truth labels (0 or 1).

    Returns:
        Tuple ``(auc, accuracy, f1, sensitivity, specificity)``. ``specificity``
        is NaN when ``targets`` contains no negative sample.
    """
    # Round once and reuse the hard predictions for every thresholded metric.
    binary_predictions = np.round(predictions)

    # AUC uses the raw scores
    auc = roc_auc_score(targets, predictions)

    # Accuracy
    accuracy = accuracy_score(targets, binary_predictions)

    # F1 score
    f1 = f1_score(targets, binary_predictions)

    # Sensitivity (recall)
    sensitivity = recall_score(targets, binary_predictions)

    # Specificity. labels=[0, 1] guarantees a 2x2 matrix, so the four-way
    # unpack cannot fail when a class is absent from both arrays.
    tn, fp, fn, tp = confusion_matrix(targets, binary_predictions, labels=[0, 1]).ravel()
    negatives = tn + fp
    specificity = tn / negatives if negatives > 0 else float('nan')

    return auc, accuracy, f1, sensitivity, specificity

# 9. Test Loop

In [None]:
'''
[Define Test Loop function]
I want you to act as an AI developer who writes PyTorch and Python code for me.
Please help me write code that tests a model using a testing loop.
In this code, you will use the AverageMeter class to calculate the average of the metrics, and the test_loop_fn function to perform the testing process.
The code includes functionalities such as calculating metrics, generating Grad-CAM visualizations, and saving the visualizations.
In addition, the test_loop_fn function will utilize a loss function (criterion) to calculate the loss, which will be used to calculate the metrics using 'calculate_metrics' function (accuracy, F1, AUC, sensitivity, and specificity).
The model predicts cancer within an image in a binary classification manner.

Please conduct step by step using the following procedure:
	Step 1: Import the necessary libraries and modules for the code, including time, math, json, tqdm, datetime, matplotlib.pyplot, and defaultdict.
	Step 2: Define the AverageMeter class, which will be used to calculate the average of metrics.
	Step 3: Define the forward_hook function, which serves as the forward hook for capturing intermediate activations.
	Step 4: Define the backward_hook function, which serves as the backward hook for capturing gradients.
	Step 5: Define the min_max_normalization function, which performs min-max normalization on the input image.
	Step 6: Define the test_loop_fn function, which performs testing using the provided model, criterion, and data. The function takes the test_loader, model, criterion, device, and save_dir as arguments.
	Step 7: Set the model to evaluation mode.
	Step 8: Initialize the metric_logger to track the metrics during testing.
	Step 9: Iterate over the batches in the test_loader using the tqdm iterator.
	Step 10: Get the input images and targets from the batch.
	Step 11: Register the forward_hook and backward_hook on the specified layer of the model (resnet50's layer4[2].conv3).
	Step 12: Pass the input images through the model to obtain the logits.
	Step 13: Calculate the loss using the criterion.
	Step 14: Update the metric_logger with the loss value.
	Step 15: Perform post-processing steps, such as storing the predicted probabilities and ground truth values.
	Step 16: Calculate the Grad-CAM visualization by computing the gradients and activations.
	Step 17: Resize the Grad-CAM heatmap to the original image size.
	Step 18: Visualize the image and Grad-CAM heatmap using matplotlib.pyplot.
	Step 19: Save the visualization to a file in the specified save_dir.
	Step 20: Remove the forward_hook and backward_hook.
	Step 21: Concatenate the predicted probabilities and ground truth values to calculate metrics.
	Step 22: Calculate the metrics (AUC, accuracy, F1, sensitivity, specificity) using the calculate_metrics function.
	Step 23: Update the metric_logger with the calculated metrics.
	Step 24: Return the average of the metrics and the ground truth values.

Here is a valid_loop example, so you can refer to it:
<
@torch.no_grad()
def valid_loop_fn(valid_loader, model, criterion, device):
    model.eval()
    metric_logger = AverageMeter()
    # epoch_iterator = tqdm(valid_loader, desc="Validating")
    epoch_iterator = tqdm(valid_loader, desc="Validating (X / X Steps) (loss=X.X)", dynamic_ncols=True, total=len(valid_loader))

    preds = []
    gts = []
    for batch_data in epoch_iterator:
        image, target = batch_data
        image, target = image.to(device), target.to(device)

        logit = model(image)
        loss = criterion(logit, target)
        loss_value = loss.item()

        if not math.isfinite(loss_value):
            print("Loss is {}, stopping training".format(loss_value))

        metric_logger.update(key='valid_loss', value=loss_value, n=image.shape[0])
        # epoch_iterator.set_postfix(loss=loss_value)
        epoch_iterator.set_description("Validating: Epochs %d (%d / %d Steps), (valid_loss=%2.5f)" % (epoch, step, len(valid_loader), loss_value))

        preds.append(logit.sigmoid().squeeze().detach().cpu().numpy())
        gts.append(target.squeeze().detach().cpu().numpy())

    preds = np.concatenate(preds)
    gts = np.concatenate(gts)

    # Calculate metrics
    auc, accuracy, f1, sensitivity, specificity = calculate_metrics(preds, gts)

    metric_logger.update(key='valid_loss', value=loss.item(), n=image.size(0))
    metric_logger.update(key='valid_auc', value=auc, n=image.size(0))
    metric_logger.update(key='valid_accuracy', value=accuracy, n=image.size(0))
    metric_logger.update(key='valid_f1', value=f1, n=image.size(0))
    metric_logger.update(key='valid_sensitivity', value=sensitivity, n=image.size(0))
    metric_logger.update(key='valid_specificity', value=specificity, n=image.size(0))

    return metric_logger.average()
>
'''

In [None]:
# Exercise template: each placeholder comment below marks code that still has to be written.

# AverageMeter

# Define the forward hook

# Define the backward hook

# Define min_max_normalization

# Define test_loop_fn
def test_loop_fn(valid_loader, model, criterion, device, save_dir):
    """Skeleton of the test loop; the body is still to be filled in.

    NOTE(review): as it stands this cell is not valid Python (the ``for`` loop
    has no body) and ``epoch_iterator`` is never defined — both must be
    supplied when the steps listed in the comments are implemented.
    """
    # Set model to evaluation mode
    
    # Initialize metric_logger to track metrics during testing 

    # Iterate over batches in the valid_loader using tqdm iterator
    for step, batch_data in enumerate(epoch_iterator):
        
        # register forward hook and backward hook
        
        # forward

        # post-processing

        # remove hook

        # get gradients

        # upsampling

        # visualization

        # remove the hooks

    # post-processing

    # Calculate metrics



In [None]:
# `time` and `datetime` are imported here so this cell does not depend on
# another cell having imported them first.
import datetime
import time
import warnings

# Suppress all warnings for the duration of the test run
warnings.filterwarnings(action='ignore')

# This notebook evaluates a trained model, so the messages say "testing".
print("Start testing")
start_time = time.time()

test_stats = test_loop_fn(test_loader, model, criterion, device, save_dir)
print('==> Averaged Test stats: ' + str(test_stats))

# Finish: report the elapsed wall-clock time as H:MM:SS
total_time = time.time() - start_time
total_time_str = str(datetime.timedelta(seconds=int(total_time)))
print('Testing time {}'.format(total_time_str))