## Setup

### Environment

In [None]:
!python -c "import monai" || pip install -q "monai-weekly[tqdm]"
!python -c "import matplotlib" || pip install -q matplotlib
%matplotlib inline

### Library

In [None]:
import os
import sys
import csv
import PIL
import copy
import torch
import random
import tempfile
import numpy as np
import pandas as pd
import nibabel as nib

from PIL import Image
from tqdm import tqdm

import keras
import tensorflow as tf
from tensorflow.keras.models import load_model

from scipy.stats import pearsonr

import monai
import matplotlib.pyplot as plt
from monai.data import Dataset, CacheDataset, DataLoader
from monai.utils import first, set_determinism

from monai.transforms import (
    Compose,
    Lambdad,
    Resized,
    Randomizable,
    EnsureChannelFirstd,
    ScaleIntensityRanged,
    RepeatChanneld,
    Transposed
)

In [None]:
directory = os.environ.get("MONAI_DATA_DIRECTORY")
root_dir = tempfile.mkdtemp() if directory is None else directory
print(root_dir)

set_determinism(42)

keras.utils.set_random_seed(42)
tf.config.experimental.enable_op_determinism()

In [None]:
physical_devices = tf.config.experimental.list_physical_devices('GPU')
if len(physical_devices) > 0:
    print("We got a GPU")
    tf.config.experimental.set_memory_growth(physical_devices[0], True)
else:
    print("Sorry, no GPU for you...")

### Pytorch to Tensorflow

In [None]:
def pytorch_to_numpy(data_loader):
    images, ages = [], []
    
    for batch in data_loader:
        batch_cp = copy.deepcopy(batch)  
        
        img_batch = batch_cp["image"].numpy()  # (batch, 96, 128, 3)
        age_batch = batch_cp["age"].numpy()  # (batch, 1)
        
        del batch
        images.append(img_batch)
        ages.append(age_batch)
        
    return np.concatenate(images), np.concatenate(ages)

## Evaluate

In [None]:
# 모델 불러오기
# finetuned
checkpoint_path = './DBN_finetuned/best_90.h5'

# original
#checkpoint_path = './DeepBrainNet/Models/DBN_model.h5'
model = load_model(checkpoint_path, compile=False)

model.summary()

### Dataset & Transform

In [None]:
class TestDataset(Randomizable, CacheDataset):
    def __init__(
        self,
        root_dir,
        csv_file,
        section,
        transform=None,
        seed=0,
        cache_num=sys.maxsize,
        cache_rate=1.0,
        num_workers=0,
        progress: bool = True,
    ) -> None:
        if not os.path.isdir(root_dir):
            raise ValueError("Root directory root_dir must be a directory.")
        self.root_dir = root_dir
        self.csv_file = csv_file
        self.section = section
        self.set_random_state(seed=seed)

        data = self._generate_data_list()

        CacheDataset.__init__(
            self,
            data=data,
            transform=transform,
            cache_num=cache_num,
            cache_rate=cache_rate,
            num_workers=num_workers,
            progress=progress,
        )

    def randomize(self, data: np.ndarray) -> None:
        self.R.shuffle(data)

    def _generate_data_list(self):
        datalist = []
        with open(self.csv_file, mode='r') as file:
            reader = csv.DictReader(file)
            for row in reader:
                if self.section == 'real':
                    image_path = os.path.join(self.root_dir, f"real_{row['Filename']}")
                elif self.section == 'generated':
                    image_path = os.path.join(self.root_dir, f"generated_{row['Filename']}")
                if not os.path.exists(image_path):
                    continue
                image = np.array(Image.open(image_path).convert("L"))
                age = np.array([float(row['Age'])]).astype('float32')
                datalist.append({
                    "image": image,
                    "age": age
                })

        return datalist

    def __getitem__(self, index):
        sample = self.data[index]

        if self.transform:
            sample = self.transform(sample)
        return sample

In [None]:
# Test DataLoader
transforms = Compose(
    [
        EnsureChannelFirstd(keys=["image"], channel_dim='no_channel'),
        ScaleIntensityRanged(keys=["image"], a_min=0.0, a_max=255.0, b_min=0.0, b_max=1.0),
        Lambdad(keys=["age"], func=lambda x: torch.tensor(x, dtype=torch.float32)),
        RepeatChanneld(keys=["image"], repeats=3),  # (1, H, W) -> (3, H, W)
        Transposed(keys=["image"], indices=(1, 2, 0)),
    ]
)

In [None]:
output_dir = "./prediction_results"
csv_file = "./validation.csv"

guidance_scales = [7.0, 5.0, 3.0, 1.0]

### Real

In [None]:
# Real
real_data_dir = "./real_images/275"
csv_file = "./validation.csv"

real_ds = TestDataset(root_dir=real_data_dir, csv_file=csv_file, transform=transforms, section='real')
real_loader = DataLoader(real_ds, batch_size=8, shuffle=False, num_workers=8, persistent_workers=True)

real_images, real_ages = pytorch_to_numpy(real_loader)

# 예측 수행 및 결과 저장
predictions = []
ground_truths = []

y_pred = model.predict(real_images)
predictions = np.array(y_pred.flatten(), dtype=np.float32)  # 모델이 반환하는 형태에 따라 조정 가능
ground_truths = np.array([gt.item() for gt in real_ages], dtype=np.float32)

# 예측값과 실제값을 데이터프레임으로 저장
prediction_data = pd.DataFrame({'Ground_Truth': ground_truths, 'Prediction': predictions})
prediction_data.to_csv('./prediction_results/real.csv', index=False)

# 정확도 평가 (MAE, MSE, RMSE)
print("Real")
mae = np.mean(np.abs(predictions - ground_truths))
mse = np.mean((predictions - ground_truths)**2)
rmse = np.sqrt(mse)
r, _ = pearsonr(ground_truths, predictions)
print(f'MAE: {mae:.2f}, MSE: {mse:.2f}, RMSE: {rmse:.2f}, Pearson r: {r:.3f}')

### Generated

In [None]:
for guidance in guidance_scales:
    print(f"\n### Processing Guidance Scale: {guidance} ###")

    generated_data_dir = f"./generated_images/275_{guidance}"

    generated_ds = TestDataset(root_dir=generated_data_dir, csv_file=csv_file, transform=transforms, section='generated')
    generated_loader = DataLoader(generated_ds, batch_size=8, shuffle=False, num_workers=8, persistent_workers=True)
    
    generated_images, generated_ages = pytorch_to_numpy(generated_loader)
    
    # 예측 수행 및 결과 저장
    predictions = []
    ground_truths = []
    
    y_pred = model.predict(generated_images)
    predictions = np.array(y_pred.flatten(), dtype=np.float32)  # 모델이 반환하는 형태에 따라 조정 가능
    ground_truths = np.array([gt.item() for gt in generated_ages], dtype=np.float32)
    
    # 예측값과 실제값을 데이터프레임으로 저장
    prediction_data = pd.DataFrame({'Ground_Truth': ground_truths, 'Prediction': predictions})
    prediction_data.to_csv(f'./{output_dir}/{guidance}.csv', index=False)
    
    # 정확도 평가 (MAE, MSE, RMSE)
    mae = np.mean(np.abs(predictions - ground_truths))
    mse = np.mean((predictions - ground_truths)**2)
    rmse = np.sqrt(mse)
    r, _ = pearsonr(ground_truths, predictions)
    print(f'MAE: {mae:.2f}, MSE: {mse:.2f}, RMSE: {rmse:.2f}, Pearson r: {r:.3f}')